rule_chain.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package services
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/gogf/gf/util/guid"
  6. "github.com/jinzhu/gorm"
  7. "sparrow/pkg/models"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "sparrow/services/knowoapi/model"
  11. )
  12. var nodeType = map[string]string{
  13. "MsgTypeFilter": "MsgTypeFilterNode",
  14. "MsgTypeSwitchNode": "MsgTypeSwitchNode",
  15. "JavascriptFilter": "FilterJavascriptNode",
  16. "RestApiAction": "RestApiRequestNode",
  17. }
  18. // RuleChainService 业务接口
  19. type RuleChainService interface {
  20. Create(*models.RuleChain) error
  21. Delete(*models.RuleChain) error
  22. Update(*models.RuleChain) error
  23. Query(int, int, string) ([]models.RuleChain, int, error)
  24. Get(vendorId, recordId string) (models.RuleChain, error)
  25. }
  26. type ruleChainService struct {
  27. model *model.All
  28. }
  29. // NewRuleChainService new ruleChain service
  30. func NewRuleChainService(model *model.All) RuleChainService {
  31. return ruleChainService{
  32. model: model,
  33. }
  34. }
  35. func (a ruleChainService) Query(pi, ps int, name string) ([]models.RuleChain, int, error) {
  36. return a.model.RuleChain.Query(pi, ps, name)
  37. }
  38. func (a ruleChainService) Get(vendorId, recordId string) (models.RuleChain, error) {
  39. return a.model.RuleChain.Get(vendorId, recordId)
  40. }
  41. func (a ruleChainService) Create(ruleChain *models.RuleChain) error {
  42. ruleChain.RecordId = guid.S()
  43. err := a.create(ruleChain)
  44. if err != nil {
  45. return err
  46. }
  47. err = a.model.RuleChain.Create(ruleChain)
  48. if err != nil {
  49. return err
  50. }
  51. params := rpcs.ArgsRuleChainAct{
  52. RuleChainId: ruleChain.RecordId,
  53. VendorId: ruleChain.VendorID,
  54. }
  55. reply := new(rpcs.ReplyEmptyResult)
  56. err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.CreateRuleChain", &params, &reply)
  57. if err != nil {
  58. server.Log.Errorf("create ruleChan error : %v", err)
  59. }
  60. return nil
  61. }
  62. func (a ruleChainService) create(ruleChain *models.RuleChain) error {
  63. if len(ruleChain.Cell) == 0 {
  64. return nil
  65. }
  66. marshal, _ := json.Marshal(ruleChain.Cell)
  67. ruleChain.Configuration = fmt.Sprintf("%s", marshal)
  68. nodeMap := make(map[string]models.RuleNode)
  69. var inputNodeId string
  70. for _, v := range ruleChain.Cell {
  71. if v.Shape != "edge" {
  72. if v.Shape == "input-node" {
  73. inputNodeId = v.Source.Cell
  74. continue
  75. }
  76. ruleNode := models.RuleNode{
  77. Model: gorm.Model{},
  78. RecordId: guid.S(),
  79. RuleChainID: ruleChain.RecordId,
  80. Type: nodeType[v.Data.Type],
  81. Name: v.Data.Name,
  82. DebugModel: true,
  83. Intro: v.Data.Desc,
  84. }
  85. switch ruleNode.Type {
  86. case "MsgTypeSwitchNode":
  87. ruleNode.Name = "消息类型路由器"
  88. case "MsgTypeFilter":
  89. conMap := map[string]interface{}{
  90. "msg_type": v.Data.MesType,
  91. }
  92. marshal, _ := json.Marshal(conMap)
  93. ruleNode.Configuration = fmt.Sprintf("%s", marshal)
  94. case "FilterJavascriptNode":
  95. conMap := map[string]string{
  96. "func_body": v.Data.FuncBody,
  97. }
  98. marshal, _ := json.Marshal(conMap)
  99. ruleNode.Configuration = fmt.Sprintf("%s", marshal)
  100. case "RestApiRequestNode":
  101. configuration := models.NodeConfiguration{
  102. Url: v.Data.Url,
  103. Method: v.Data.Method,
  104. Retry: v.Data.Retry,
  105. TimeOut: v.Data.TimeOut,
  106. RetryWait: v.Data.RetryWait,
  107. Headers: make(map[string]interface{}),
  108. }
  109. if len(v.Data.Headers) > 0 {
  110. for _, header := range v.Data.Headers {
  111. configuration.Headers[header.Key] = header.Value
  112. }
  113. }
  114. marshal, _ := json.Marshal(configuration)
  115. ruleNode.Configuration = fmt.Sprintf("%s", marshal)
  116. }
  117. nodeMap[v.Id] = ruleNode
  118. err := a.model.RuleNode.Create(&ruleNode)
  119. if err != nil {
  120. return err
  121. }
  122. }
  123. }
  124. for _, v := range ruleChain.Cell {
  125. if v.Shape == "edge" && v.Source.Cell != inputNodeId {
  126. err := a.model.Relation.Create(&models.Relation{
  127. RecordId: guid.S(),
  128. RuleChainId: ruleChain.RecordId,
  129. FromID: nodeMap[v.Source.Cell].RecordId,
  130. //FromType: nodeMap[v.Source.Cell].Type,
  131. ToID: nodeMap[v.Target.Cell].RecordId,
  132. ToType: nodeMap[v.Target.Cell].Type,
  133. RelationType: v.Data.Label,
  134. })
  135. if err != nil {
  136. return err
  137. }
  138. }
  139. }
  140. return nil
  141. }
  142. func (a ruleChainService) Delete(ruleChain *models.RuleChain) error {
  143. err := a.delete(ruleChain)
  144. if err != nil {
  145. return err
  146. }
  147. err = a.model.RuleChain.Delete(ruleChain)
  148. if err != nil {
  149. return err
  150. }
  151. params := rpcs.ArgsRuleChainAct{
  152. RuleChainId: ruleChain.RecordId,
  153. VendorId: ruleChain.VendorID,
  154. }
  155. reply := new(rpcs.ReplyEmptyResult)
  156. err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.DeleteRuleChain", &params, &reply)
  157. if err != nil {
  158. server.Log.Errorf("delete ruleChan error : %v", err)
  159. }
  160. return nil
  161. }
  162. func (a ruleChainService) delete(ruleChain *models.RuleChain) error {
  163. node, _, err := a.model.RuleNode.Query(1, 100, ruleChain.RecordId)
  164. if err != nil {
  165. return err
  166. }
  167. if len(node) > 0 {
  168. for _, v := range node {
  169. _ = a.model.RuleNode.Delete(&v)
  170. }
  171. }
  172. relation, _, err := a.model.Relation.Query(1, 100, ruleChain.RecordId)
  173. if err != nil {
  174. return err
  175. }
  176. if len(relation) > 0 {
  177. for _, v := range relation {
  178. _ = a.model.Relation.DeleteByChainId(&v)
  179. }
  180. }
  181. return nil
  182. }
  183. func (a ruleChainService) Update(ruleChain *models.RuleChain) error {
  184. err := a.delete(ruleChain)
  185. if err != nil {
  186. return err
  187. }
  188. err = a.create(ruleChain)
  189. if err != nil {
  190. return err
  191. }
  192. err = a.model.RuleChain.Update(ruleChain)
  193. if err != nil {
  194. return err
  195. }
  196. params := rpcs.ArgsRuleChainAct{
  197. RuleChainId: ruleChain.RecordId,
  198. VendorId: ruleChain.VendorID,
  199. }
  200. reply := new(rpcs.ReplyEmptyResult)
  201. err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.UpdateRuleChain", &params, &reply)
  202. if err != nil {
  203. server.Log.Errorf("update ruleChan error : %v", err)
  204. }
  205. return nil
  206. }