lijian 4 gadi atpakaļ
vecāks
revīzija
a85d0246bf

+ 47 - 0
pkg/actor/actor_msg.go

@@ -0,0 +1,47 @@
+package actor
+
+import "sparrow/pkg/ruleEngine"
+
+type Msg interface {
+	GetMessageType() MsgType
+}
+
+// actor message type def
+type MsgType string
+
+const (
+	QUEUE_TO_RULE_ENGINE_MSG         MsgType = "QUEUE_TO_RULE_ENGINE_MSG"
+	RULE_CHAIN_TO_RULE_MSG           MsgType = "RULE_CHAIN_TO_RULE_MSG"
+	RULE_TO_RULE_CHAIN_TELL_NEXT_MSG MsgType = "RULE_TO_RULE_CHAIN_TELL_NEXT_MSG"
+	RULE_TO_SELF_MSG                 MsgType = "RULE_TO_SELF_MSG"
+)
+
+type QueueToRuleEngineMsg struct {
+	TenantId string
+	Message *ruleEngine.Message
+	RelationTypes  []ruleEngine.RelationType
+	FailureMessage error
+}
+
+func (q *QueueToRuleEngineMsg) GetMessageType() MsgType {
+	return QUEUE_TO_RULE_ENGINE_MSG
+}
+
+type RuleNodeToRuleChanTellNextMsg struct {
+	RuleNodeId     string
+	RelationTypes  []ruleEngine.RelationType
+	Message        *ruleEngine.Message
+	FailureMessage error
+}
+
+func (r *RuleNodeToRuleChanTellNextMsg) GetMessageType() MsgType {
+	return RULE_TO_RULE_CHAIN_TELL_NEXT_MSG
+}
+
+type RuleToSelfMsg struct {
+	Message *ruleEngine.Message
+}
+
+func (r *RuleToSelfMsg) GetMessageType() MsgType {
+	return RULE_TO_SELF_MSG
+}

+ 12 - 0
pkg/actor/actor_ref.go

@@ -0,0 +1,12 @@
+package actor
+
+
+// Ref a actor reference
+type Ref interface {
+	// get actor id
+	GetActorId() string
+	// tell actor a message
+	Tell(msg Msg)
+	// tell actor a message with high priority
+	TellWithHighPriority(msg Msg)
+}

+ 68 - 0
pkg/ruleEngine/context.go

@@ -0,0 +1,68 @@
+package ruleEngine
+
+import (
+	"sparrow/pkg/actor"
+	"time"
+)
+
+type Context interface {
+	// 向所有基于success类型的关系节点发消息
+	TellSuccess(msg *Message)
+	// 基于某个关系发消息
+	TellNext(msg *Message, relationType RelationType)
+	// 向当前节点发消息,duration 为延迟时间
+	TellSelf(msg *Message, duration time.Duration)
+	// 发送错误消息消息
+	TellError(msg *Message, err error)
+}
+
+// DefaultContext 默认的上下文
+type DefaultContext struct {
+	nodeCtx *RuleNodeCtx
+}
+
+func New(nodeCtx *RuleNodeCtx) *DefaultContext {
+	return &DefaultContext{nodeCtx: nodeCtx}
+}
+
+func (d *DefaultContext) TellSuccess(msg *Message) {
+	d.tellNext(msg, []RelationType{Success}, nil)
+}
+
+func (d *DefaultContext) TellNext(msg *Message, relationType RelationType) {
+	d.tellNext(msg, []RelationType{relationType}, nil)
+}
+
+func (d *DefaultContext) tellNext(msg *Message, relationTypes []RelationType, err error) {
+	if d.nodeCtx.self.IsDebug {
+		// TODO: 输出调试日志
+	}
+	msg.GetCallBack().onProcessingEnd(d.nodeCtx.self.RuleNodeId)
+	d.nodeCtx.chainActor.Tell(
+		&actor.RuleNodeToRuleChanTellNextMsg{
+			RuleNodeId:     d.nodeCtx.self.RuleNodeId,
+			RelationTypes:  relationTypes,
+			Message:        msg,
+			FailureMessage: err,
+		})
+}
+
+func (d *DefaultContext) TellSelf(msg *Message, duration time.Duration) {
+	if duration > 0 {
+		time.AfterFunc(duration, func() {
+			d.nodeCtx.selfActor.Tell(&actor.RuleToSelfMsg{Message: msg})
+		})
+	}
+}
+
+func (d *DefaultContext) TellError(msg *Message, err error) {
+	if d.nodeCtx.self.IsDebug {
+		// TODO: 处理调试
+	}
+	d.nodeCtx.chainActor.Tell(&actor.RuleNodeToRuleChanTellNextMsg{
+		RuleNodeId:     d.nodeCtx.self.RuleNodeId,
+		RelationTypes:  []RelationType{Failure},
+		Message:        msg,
+		FailureMessage: err,
+	})
+}

+ 55 - 0
pkg/ruleEngine/message.go

@@ -0,0 +1,55 @@
+package ruleEngine
+
+import "time"
+
+type Message struct {
+	QueueName  string
+	Id         string
+	Ts         time.Time
+	Type       RelationType
+	Data       string
+	RuleChanId string
+	RuleNodeId string
+	Callback IMessageCallBack
+}
+
+// IMessageCallBack message call back
+type IMessageCallBack interface {
+	// on success do sth.
+	OnSuccess()
+	// on failure do sth.
+	OnFailure(err error)
+	// on process start do sth.
+	onProcessingStart(ruleNodeInfo *RuleNodeInfo)
+	// on process end do sth.
+	onProcessingEnd(ruleNodeId string)
+}
+
+type emptyCallBack struct {
+}
+
+func (e emptyCallBack) onProcessingStart(ruleNodeInfo *RuleNodeInfo) {
+
+}
+
+func (e emptyCallBack) onProcessingEnd(ruleNodeId string) {
+
+}
+
+var EmptyCallBack = emptyCallBack{}
+
+func (e emptyCallBack) OnSuccess() {
+
+}
+
+func (e emptyCallBack) OnFailure(err error) {
+
+}
+// GetCallBack get message call back
+func (a *Message) GetCallBack() IMessageCallBack {
+	if a.Callback == nil {
+		return EmptyCallBack
+	} else {
+		return a.Callback
+	}
+}

+ 15 - 0
pkg/ruleEngine/node.go

@@ -0,0 +1,15 @@
+package ruleEngine
+
+import "sparrow/pkg/actor"
+
+type Node interface {
+	Init(ctx Context, config string) error
+	OnMessage(ctx Context, message *Message) error
+}
+// RuleNodeCtx 节点上下文
+type RuleNodeCtx struct {
+	TenantId   string    // 租户Id
+	chainActor actor.Ref // 规则链 actor
+	selfActor  actor.Ref // 当前节点的 actor
+	self       *RuleNode // 当前节点
+}

+ 27 - 0
pkg/ruleEngine/nodes/msg_type_filter_node.go

@@ -0,0 +1,27 @@
+package nodes
+
+import (
+	"encoding/json"
+	"sparrow/pkg/ruleEngine"
+)
+
+type MsgTypeFilterNode struct {
+	config *MsgTypeFilterNodeConfig
+}
+
+type MsgTypeFilterNodeConfig struct {
+	MessageTypes []string `json:"message_types"`
+}
+
+func (m *MsgTypeFilterNode) Init(ctx ruleEngine.Context, config string) error {
+	if config == "" {
+		m.config = &MsgTypeFilterNodeConfig{MessageTypes: []string{
+			"POST_ATTRIBUTES_REQUEST",
+		}}
+	}
+}
+
+func (m *MsgTypeFilterNode) OnMessage(ctx ruleEngine.Context, message ruleEngine.Message) error {
+	ctx.TellNext(message, )
+	return nil
+}

+ 34 - 0
pkg/ruleEngine/rule_chain.go

@@ -0,0 +1,34 @@
+package ruleEngine
+
+type EntityType int
+
+const (
+	ruleChan EntityType = iota
+)
+
+
+type RuleChain struct {
+	TenantId    string
+	Name        string
+	FirstNodeId string
+	IsRoot      bool
+	IsDebug     bool
+	Config      string
+	ChainId     string
+}
+
+type RuleNode struct {
+	RuleChainId string
+	Type        string
+	Name        string
+	IsDebug     bool
+	Config      string
+	RuleNodeId  string
+}
+
+type Relation struct {
+	From           string
+	To             string
+	Type           string
+	AdditionalInfo string
+}

+ 17 - 0
pkg/ruleEngine/rule_chain_test.go

@@ -0,0 +1,17 @@
+package ruleEngine
+
+func initData() map[string][]*RuleNode {
+	nodes := map[string][]*RuleNode {
+		"chain1": {
+			&RuleNode{
+				RuleChainId: "chain1",
+				Type:        "",
+				Name:        "simple rule node",
+				IsDebug:     false,
+				Config:      "",
+				RuleNodeId:  "",
+			},
+		},
+	}
+	return nodes
+}

+ 30 - 0
pkg/ruleEngine/schema.go

@@ -0,0 +1,30 @@
+package ruleEngine
+
+import "fmt"
+
+// RelationType 默认节点关系
+type RelationType string
+
+const (
+	Success RelationType = "Success" // 成功
+	Failure RelationType = "Failure" // 失败
+	True    RelationType = "True"    // 真
+	False   RelationType = "False"   // 假
+)
+
+// RuleNodeInfo rule node info for output
+type RuleNodeInfo struct {
+	ruleNodeId    string
+	ruleChainName string
+	ruleNodeName  string
+}
+
+func (r *RuleNodeInfo) String() string {
+	return fmt.Sprintf("[RuleChain:%s|RuleNode:%s|RuleNodeId:%s]", r.ruleChainName, r.ruleNodeName, r.ruleNodeId)
+}
+
+type RuleNodeRelation struct {
+	In           string
+	Out          string
+	RelationType RelationType
+}

+ 1 - 1
pkg/server/server_manager.go

@@ -85,7 +85,7 @@ func (mgr *ServerManager) RegisterServer() error {
 	}
 	}
 	if serverInstance.rpcsvr != nil {
 	if serverInstance.rpcsvr != nil {
 		addr, _ := fixHostIp(*confRPCHost)
 		addr, _ := fixHostIp(*confRPCHost)
-		key = fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr)
+		key = fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr)
 	}
 	}
 	if serverInstance.udpsvr != nil {
 	if serverInstance.udpsvr != nil {
 		addr := os.Getenv(EnvUDPProxy)
 		addr := os.Getenv(EnvUDPProxy)