Procházet zdrojové kódy

完成对组件生命周期的处理

lijian před 4 roky
rodič
revize
7e52abcede

+ 3 - 1
README.md

@@ -29,4 +29,6 @@
 
 
 ## 开发计划:
-   1.实现规则引擎获取租户、规则链、规则节点相关的数据.
+   1.实现规则引擎获取租户、规则链、规则节点相关的数据.
+   
+   2.实现actor的生命周期消息事件支持。

+ 36 - 1
pkg/actors/app_actor.go

@@ -2,6 +2,7 @@ package actors
 
 import (
 	"github.com/gogf/gf/util/guid"
+	"sparrow/pkg/entities"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/ruleEngine"
 	"sparrow/pkg/server"
@@ -10,6 +11,7 @@ import (
 // AppActor 服务级actor
 type AppActor struct {
 	ruleEngine.ContextBasedCreator
+	deleteTenants map[string]string
 }
 
 func (a *AppActor) GetActorRef() ruleEngine.Ref {
@@ -29,12 +31,15 @@ func (a *AppActor) Process(msg protocol.ActorMsg) error {
 		}
 	case protocol.QUEUE_TO_RULE_ENGINE_MSG:
 		return a.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
+	case protocol.COMPONENT_LIFE_CYCLE_MSG:
+		return a.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
 	default:
 		server.Log.Debugf("未知的消息类型:%s", msg.GetMessageType())
 	}
 	return nil
 }
 
+// 消息队列向规则引擎的消息处理
 func (a *AppActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
 	if ref, err := a.getOrCreateTenantActor(msg.TenantId); err == nil {
 		ref.Tell(msg)
@@ -45,6 +50,34 @@ func (a *AppActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg)
 	return nil
 }
 
+// 组件生命周期事件处理
+func (a *AppActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
+	var target ruleEngine.Ref
+	var err error
+	if msg.EntityId.GetEntityType() == entities.TENANT {
+		tenantId := msg.EntityId.GetId()
+		if msg.EventType == ruleEngine.DELETED {
+			a.deleteTenants[tenantId] = tenantId
+			return a.Ctx.Stop(tenantId)
+		} else {
+			target, err = a.getOrCreateTenantActor(msg.TenantId)
+			if err != nil {
+				return err
+			}
+		}
+	} else {
+		target, err = a.getOrCreateTenantActor(msg.TenantId)
+		if err != nil {
+			return err
+		}
+	}
+	if target != nil {
+		target.TellWithHighPriority(msg)
+	} else {
+		server.Log.Debugf("invalid component lifecycle msg %s:", msg.TenantId)
+	}
+}
+
 func (a *AppActor) initTenants() error {
 	server.Log.Debug("starting main actor")
 	tenants, err := a.SystemCtx.TenantService.FindTenants()
@@ -97,7 +130,9 @@ func (a *AppActorCreator) CreateActorId() string {
 }
 
 func (a *AppActorCreator) CreateActor() ruleEngine.Actor {
-	appC := new(AppActor)
+	appC := &AppActor{
+		deleteTenants: make(map[string]string),
+	}
 	appC.SystemCtx = a.SystemCtx
 	return appC
 }

+ 114 - 8
pkg/actors/rule_chain_actor.go

@@ -67,6 +67,27 @@ func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
 		return r.onTellNextRuleNode(msg.(*ruleEngine.RuleNodeToRuleChanTellNextMsg))
 	case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
 		return r.onRuleChainToRuleChain(msg.(*ruleEngine.RuleChainToRuleChainMsg))
+	case protocol.COMPONENT_LIFE_CYCLE_MSG:
+		return r.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
+	}
+	return nil
+}
+
+// TODO:生命周期事件写入数据库
+func (r *RuleChainActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
+	server.Log.Debugf("%s,%s,%s onComponentLifecycleMsg", msg.TenantId, msg.EntityId.GetEntityType(), msg.EntityId.GetId())
+	switch msg.EventType {
+	case ruleEngine.CREATED:
+		return r.start()
+	case ruleEngine.UPDATED:
+		return r.update()
+	case ruleEngine.ACTIVATED:
+		return r.restart()
+	case ruleEngine.DELETED:
+		_ = r.stop()
+		return r.Ctx.Stop(r.ruleChainId)
+	default:
+
 	}
 	return nil
 }
@@ -247,7 +268,7 @@ func (r *RuleChainActor) start() error {
 				}
 				r.started = true
 			}
-			r.initRoutes(r.ruleChain, nodes)
+			return r.initRoutes(r.ruleChain, nodes)
 		}
 	} else {
 		return r.update()
@@ -255,15 +276,104 @@ func (r *RuleChainActor) start() error {
 	return nil
 }
 
-func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ruleEngine.RuleNode) {
+func (r *RuleChainActor) update() error {
+	ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId)
+	if err != nil {
+		return err
+	}
+	if ruleChain == nil {
+		return errors.New(fmt.Sprintf("rule chain not found :%s", r.ruleChainId))
+	}
+	// 查询链上的所有节点
+	nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, r.ruleChainId)
+	if err != nil {
+		return err
+	}
+	for _, v := range nodes {
+		// 如果找不到节点actor,则创建
+		if actor, ok := r.nodeActors[v.RuleNodeId]; !ok {
+			server.Log.Debugf("creating rule node actor:%s", v.RuleNodeId)
+			ref, err := r.createNodeActor(v.RuleNodeId)
+			if err != nil {
+				return err
+			}
+			r.nodeActors[v.RuleNodeId] = &ruleEngine.RuleNodeCtx{
+				TenantId:   r.tenantId,
+				ChainActor: r.Ctx,
+				SelfActor:  ref,
+				Self:       v,
+			}
+		} else {
+			// 传递消息到node actor
+			server.Log.Debugf("updating rule node actor:%s", v.RuleNodeId)
+			actor.Self = v
+			actor.SelfActor.TellWithHighPriority(&ruleEngine.ComponentLifecycleMsg{
+				TenantId:  r.tenantId,
+				EntityId:  &entities.RuleNodeId{Id: v.RuleNodeId},
+				EventType: ruleEngine.UPDATED,
+			})
+		}
+	}
+
+	var removeNodes []string
+	// 对比已经有节点和最新节点列表,找出差集,并移除
+	for k := range r.nodeActors {
+		var found = false
+		for _, v := range nodes {
+			if v.RuleNodeId == k {
+				found = true
+				break
+			}
+		}
+		if !found {
+			removeNodes = append(removeNodes, k)
+		}
+	}
+	// remove actors
+	for _, v := range removeNodes {
+		server.Log.Debugf("remove rule node :%s", v)
+		if ref, ok := r.nodeActors[v]; ok {
+			ref.SelfActor.TellWithHighPriority(&ruleEngine.ComponentLifecycleMsg{
+				TenantId:  r.tenantId,
+				EntityId:  &entities.RuleNodeId{Id: v},
+				EventType: ruleEngine.DELETED,
+			})
+			delete(r.nodeActors, v)
+		}
+	}
+	return r.initRoutes(r.ruleChain, nodes)
+}
+
+func (r *RuleChainActor) restart() error {
+	if err := r.stop(); err != nil {
+		return err
+	}
+	if err := r.start(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (r *RuleChainActor) stop() error {
+	server.Log.Debugf("stopping rule chain with %d nodes, tenantId:%s, entityId:%s", len(r.nodeActors), r.tenantId, r.ruleChainId)
+	for actorId := range r.nodeActors {
+		err := r.Ctx.Stop(actorId)
+		if err != nil {
+			return err
+		}
+	}
+	r.started = false
+	return nil
+}
+
+func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ruleEngine.RuleNode) error {
 	for _, node := range nodes {
 		relations, err := r.SystemCtx.RuleChainService.GetRuleNodeRelations(r.tenantId, node.RuleNodeId)
 		if err != nil {
-			continue
+			return err
 		}
 		var rs []*ruleEngine.RuleNodeRelation
 		for _, relation := range relations {
-
 			rs = append(rs, &ruleEngine.RuleNodeRelation{
 				In:   &entities.RuleNodeId{Id: node.RuleNodeId},
 				Out:  &entities.RuleNodeId{Id: relation.To},
@@ -272,13 +382,9 @@ func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ru
 		}
 		r.nodeRoutes[node.RuleNodeId] = rs
 	}
-	fmt.Printf("%+v", r.nodeRoutes)
 	r.firstId = ruleChain.FirstNodeId
 	r.firstNode = r.nodeActors[r.firstId]
 	r.state = ruleEngine.ACTIVE
-}
-
-func (r *RuleChainActor) update() error {
 	return nil
 }
 

+ 62 - 9
pkg/actors/rule_node_actor.go

@@ -21,7 +21,7 @@ type RuleNodeActor struct {
 	tenantId       string
 	defaultContext ruleEngine.Context
 	parent         ruleEngine.Ref
-
+	state          ruleEngine.ComponentLifecycleState
 	info *protocol.RuleNodeInfo
 }
 
@@ -47,14 +47,7 @@ func (r *RuleNodeActor) Init(ctx ruleEngine.Ctx) error {
 		RuleChainName: r.ruleChainName,
 		RuleNodeName:  r.ruleNode.Name,
 	}
-	node, err := r.initComponent(r.ruleNode)
-	if err != nil {
-		return err
-	}
-	if node != nil {
-		r.node = node
-	}
-	return nil
+	return r.start()
 }
 
 // 实例化node
@@ -77,10 +70,35 @@ func (r *RuleNodeActor) Process(msg protocol.ActorMsg) error {
 		return r.onRuleToSelfMsg(msg.(*ruleEngine.RuleToSelfMsg))
 	case protocol.RULE_TO_SELF_ERROR_MSG:
 		return r.onRuleToSelfErrorMsg(msg.(*ruleEngine.RuleToSelfErrorMsg))
+	case protocol.COMPONENT_LIFE_CYCLE_MSG:
+		return r.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
 	default:
 		return errors.New("未知的消息类型")
 	}
 }
+// node actor lifecycle handle
+func (r *RuleNodeActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
+	switch msg.EventType {
+	case ruleEngine.CREATED:
+		return r.start()
+	case ruleEngine.UPDATED:
+		return r.update()
+	case ruleEngine.ACTIVATED:
+		if err := r.stop(); err != nil {
+			return err
+		}
+		if err := r.start();err != nil {
+			return err
+		}
+	case ruleEngine.DELETED:
+		if err := r.stop(); err != nil {
+			return err
+		}
+		return r.Ctx.Stop(r.Ctx.GetActorId())
+	default:
+
+	}
+}
 
 func (r *RuleNodeActor) onRuleToSelfErrorMsg(msg *ruleEngine.RuleToSelfErrorMsg) error {
 	server.Log.Error(msg.Err)
@@ -136,6 +154,41 @@ func (r *RuleNodeActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureSt
 	}
 }
 
+func (r *RuleNodeActor) start() error {
+	node, err := r.initComponent(r.ruleNode)
+	if err != nil {
+		return err
+	}
+	if node != nil {
+		r.state = ruleEngine.ACTIVE
+	}
+	return nil
+}
+
+func (r *RuleNodeActor) update() error {
+	node, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId)
+	if err != nil {
+		return err
+	}
+	if node == nil {
+		return errors.New(fmt.Sprintf("node not found %s", r.ruleNodeId))
+	}
+	r.info = &protocol.RuleNodeInfo{
+		RuleNodeId:    node.RuleNodeId,
+		RuleChainName: r.ruleChainName,
+		RuleNodeName:  node.Name,
+	}
+	r.ruleNode = node
+	r.defaultContext.GetRuleNodeCtx().Self = node
+	return r.start()
+}
+
+func (r *RuleNodeActor) stop() error {
+	if r.node != nil {
+		r.state = ruleEngine.SUSPENDED
+	}
+}
+
 type RuleNodeActorCreator struct {
 	RuleNodeActor
 }

+ 6 - 0
pkg/ruleEngine/context.go

@@ -20,6 +20,8 @@ type Context interface {
 	Ack(msg *protocol.Message)
 	// transform a message
 	TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message
+
+	GetRuleNodeCtx() *RuleNodeCtx
 }
 
 // DefaultContext 默认的上下文
@@ -32,6 +34,10 @@ func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *SystemContext) *DefaultCon
 	return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx}
 }
 
+func (d *DefaultContext) GetRuleNodeCtx() *RuleNodeCtx {
+	return d.nodeCtx
+}
+
 func (d *DefaultContext) TellSuccess(msg *protocol.Message) {
 	d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil)
 }