rule_chain.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package services
  2. import (
  3. "encoding/json"
  4. "github.com/gogf/gf/util/guid"
  5. "sparrow/pkg/models"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/server"
  8. "sparrow/services/knowoapi/model"
  9. )
  10. // RuleChainService 业务接口
  11. type RuleChainService interface {
  12. Create(*models.RuleChain) error
  13. Delete(*models.RuleChain) error
  14. Update(*models.RuleChainParams) error
  15. Query(int, int, string, string) ([]models.RuleChain, int, error)
  16. Get(vendorId, recordId string) (models.RuleChain, error)
  17. UpdateChainRoot(*models.ChangeRootParams) error
  18. }
  19. type ruleChainService struct {
  20. model *model.All
  21. }
  22. // NewRuleChainService new ruleChain service
  23. func NewRuleChainService(model *model.All) RuleChainService {
  24. return ruleChainService{
  25. model: model,
  26. }
  27. }
  28. func (a ruleChainService) Query(pi, ps int, name, vendorId string) ([]models.RuleChain, int, error) {
  29. return a.model.RuleChain.Query(pi, ps, name, vendorId, false)
  30. }
  31. func (a ruleChainService) Get(vendorId, recordId string) (models.RuleChain, error) {
  32. return a.model.RuleChain.Get(vendorId, recordId)
  33. }
  34. func (a ruleChainService) Create(ruleChain *models.RuleChain) error {
  35. ruleChain.RecordId = guid.S()
  36. if ruleChain.Root {
  37. result, total, err := a.model.RuleChain.Query(1, 1, "", ruleChain.VendorID, true)
  38. if err != nil {
  39. return err
  40. }
  41. if total > 0 {
  42. _, err = a.model.RuleChain.UpdateChainRoot(result[0].RecordId, false)
  43. if err != nil {
  44. return err
  45. }
  46. }
  47. }
  48. err := a.model.RuleChain.Create(ruleChain)
  49. if err != nil {
  50. return err
  51. }
  52. params := rpcs.ArgsRuleChainAct{
  53. RuleChainId: ruleChain.RecordId,
  54. VendorId: ruleChain.VendorID,
  55. }
  56. reply := new(rpcs.ReplyEmptyResult)
  57. err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.CreateRuleChain", &params, &reply)
  58. if err != nil {
  59. server.Log.Errorf("create ruleChan error : %v", err)
  60. }
  61. return nil
  62. }
  63. func (a ruleChainService) create(params *models.RuleChainParams) (firstNodeId string, err error) {
  64. nodeMap := make(map[string]models.RuleNode)
  65. var inputNodeId string
  66. for _, v := range params.Cell {
  67. if v.GetString("shape") != "edge" {
  68. if v.GetString("shape") == "input-node" {
  69. inputNodeId = v.GetString("id")
  70. continue
  71. }
  72. ruleNode := models.RuleNode{
  73. RecordId: guid.S(),
  74. RuleChainID: params.RecordId,
  75. Configuration: v.GetString("data"),
  76. Type: v.GetString("data.type"),
  77. Name: v.GetString("data.name"),
  78. DebugModel: true,
  79. Intro: v.GetString("data.desc"),
  80. }
  81. //if v.Shape != "edge" {
  82. // if v.Shape == "input-node" {
  83. // inputNodeId = v.Source.Cell
  84. // continue
  85. // }
  86. // ruleNode := models.RuleNode{
  87. // RecordId: guid.S(),
  88. // RuleChainID: params.RecordId,
  89. // Type: nodeType[v.Data.Type],
  90. // Name: v.Data.Name,
  91. // DebugModel: true,
  92. // Intro: v.Data.Desc,
  93. // }
  94. //switch ruleNode.Type {
  95. //case "MsgTypeSwitchNode":
  96. // ruleNode.Name = "消息类型路由器"
  97. //case "MsgTypeFilter":
  98. // conMap := map[string]interface{}{
  99. // "msg_type": v.Data.MesType,
  100. // }
  101. // marshal, _ := json.Marshal(conMap)
  102. // ruleNode.Configuration = fmt.Sprintf("%s", marshal)
  103. //
  104. //case "FilterJavascriptNode":
  105. // conMap := map[string]string{
  106. // "func_body": v.Data.FuncBody,
  107. // }
  108. // marshal, _ := json.Marshal(conMap)
  109. // ruleNode.Configuration = fmt.Sprintf("%s", marshal)
  110. //case "RestApiRequestNode":
  111. // configuration := models.NodeConfiguration{
  112. // Url: v.Data.Url,
  113. // Method: v.Data.Method,
  114. // Retry: v.Data.Retry,
  115. // TimeOut: v.Data.TimeOut,
  116. // RetryWait: v.Data.RetryWait,
  117. // Headers: make(map[string]interface{}),
  118. // }
  119. // if len(v.Data.Headers) > 0 {
  120. // for _, header := range v.Data.Headers {
  121. // configuration.Headers[header.Key] = header.Value
  122. // }
  123. // }
  124. //}
  125. //str, err := gjson.DecodeToJson(v)
  126. //if err != nil {
  127. // return "", err
  128. //}
  129. nodeMap[v.GetString("id")] = ruleNode
  130. err = a.model.RuleNode.Create(&ruleNode)
  131. if err != nil {
  132. return firstNodeId, err
  133. }
  134. }
  135. }
  136. for _, v := range params.Cell {
  137. if v.GetString("shape") == "edge" {
  138. if v.GetString("source.cell") == inputNodeId {
  139. firstNodeId = nodeMap[v.GetString("target.cell")].RecordId
  140. continue
  141. }
  142. err = a.model.Relation.Create(&models.Relation{
  143. RecordId: guid.S(),
  144. //RuleChainId: params.RecordId,
  145. FromID: nodeMap[v.GetString("source.cell")].RecordId,
  146. //FromType: nodeMap[v.Source.Cell].Type,
  147. ToID: nodeMap[v.GetString("target.cell")].RecordId,
  148. ToType: nodeMap[v.GetString("target.cell")].Type,
  149. RelationType: v.GetString("data.label"),
  150. })
  151. if err != nil {
  152. return firstNodeId, err
  153. }
  154. }
  155. }
  156. return firstNodeId, err
  157. }
  158. func (a ruleChainService) Delete(ruleChain *models.RuleChain) error {
  159. err := a.delete(ruleChain)
  160. if err != nil {
  161. return err
  162. }
  163. err = a.model.RuleChain.Delete(ruleChain)
  164. if err != nil {
  165. return err
  166. }
  167. params := rpcs.ArgsRuleChainAct{
  168. RuleChainId: ruleChain.RecordId,
  169. VendorId: ruleChain.VendorID,
  170. }
  171. reply := new(rpcs.ReplyEmptyResult)
  172. err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.DeleteRuleChain", &params, &reply)
  173. if err != nil {
  174. server.Log.Errorf("delete ruleChan error : %v", err)
  175. }
  176. return nil
  177. }
  178. func (a ruleChainService) delete(ruleChain *models.RuleChain) error {
  179. node, _, err := a.model.RuleNode.Query(1, 100, ruleChain.RecordId)
  180. if err != nil {
  181. return err
  182. }
  183. if len(node) > 0 {
  184. for _, v := range node {
  185. _ = a.model.RuleNode.Delete(&v)
  186. }
  187. }
  188. //relation, _, err := a.model.Relation.Query(1, 100, ruleChain.RecordId)
  189. //if err != nil {
  190. // return err
  191. //}
  192. //if len(relation) > 0 {
  193. // for _, v := range relation {
  194. // _ = a.model.Relation.DeleteByChainId(&v)
  195. // }
  196. //}
  197. return nil
  198. }
  199. func (a ruleChainService) Update(params *models.RuleChainParams) error {
  200. ruleChain, err := a.Get(params.VendorId, params.RecordId)
  201. if err != nil {
  202. return err
  203. }
  204. err = a.delete(&ruleChain)
  205. if err != nil {
  206. return err
  207. }
  208. if params.Cell != nil {
  209. str, err := json.Marshal(params.Cell)
  210. if err != nil {
  211. return err
  212. }
  213. ruleChain.Configuration = string(str)
  214. ruleChain.FirstRuleNodeID, err = a.create(params)
  215. if err != nil {
  216. return err
  217. }
  218. }
  219. err = a.model.RuleChain.Update(&ruleChain)
  220. if err != nil {
  221. return err
  222. }
  223. args := rpcs.ArgsRuleChainAct{
  224. RuleChainId: ruleChain.RecordId,
  225. VendorId: ruleChain.VendorID,
  226. }
  227. reply := new(rpcs.ReplyEmptyResult)
  228. err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.UpdateRuleChain", &args, &reply)
  229. if err != nil {
  230. server.Log.Errorf("update ruleChan error : %v", err)
  231. }
  232. return nil
  233. }
  234. func (a ruleChainService) UpdateChainRoot(params *models.ChangeRootParams) error {
  235. //var isRoot int = 0
  236. //if params.Root {
  237. // isRoot = 1
  238. //}
  239. _, err := a.model.RuleChain.UpdateChainRoot(params.RecordId, params.Root)
  240. if err != nil {
  241. return err
  242. }
  243. if params.Root {
  244. data, total, err := a.model.RuleChain.Query(1, 1, "", params.VendorId, true)
  245. if err != nil {
  246. return err
  247. }
  248. if total > 0 {
  249. data[0].Root = false
  250. _, err := a.model.RuleChain.UpdateChainRoot(data[0].RecordId, data[0].Root)
  251. if err != nil {
  252. return err
  253. }
  254. }
  255. }
  256. return nil
  257. }