Browse Source

修复服务发现

lijian 4 years ago
parent
commit
2d229347e3

+ 9 - 0
pkg/actor/app_actor.go

@@ -0,0 +1,9 @@
+package actor
+
+type AppActor struct {
+
+}
+
+func (a *AppActor) Receive(ctx actor.Context) {
+
+}

+ 1 - 1
pkg/ruleEngine/message.go

@@ -6,7 +6,7 @@ type Message struct {
 	QueueName  string
 	Id         string
 	Ts         time.Time
-	Type       RelationType
+	Type       string
 	Data       string
 	RuleChanId string
 	RuleNodeId string

+ 20 - 3
pkg/ruleEngine/nodes/msg_type_filter_node.go

@@ -16,12 +16,29 @@ type MsgTypeFilterNodeConfig struct {
 func (m *MsgTypeFilterNode) Init(ctx ruleEngine.Context, config string) error {
 	if config == "" {
 		m.config = &MsgTypeFilterNodeConfig{MessageTypes: []string{
-			"POST_ATTRIBUTES_REQUEST",
+			ruleEngine.POST_ATTRIBUTES_REQUEST,
+			ruleEngine.POST_EVENT_REQUEST,
 		}}
+		return nil
 	}
+	c := new(MsgTypeFilterNodeConfig)
+	err := json.Unmarshal([]byte(config), c)
+	if err != nil {
+		return err
+	}
+	m.config = c
+	return nil
 }
 
-func (m *MsgTypeFilterNode) OnMessage(ctx ruleEngine.Context, message ruleEngine.Message) error {
-	ctx.TellNext(message, )
+func (m *MsgTypeFilterNode) OnMessage(ctx ruleEngine.Context, message *ruleEngine.Message) error {
+	var relation ruleEngine.RelationType
+	for _, msgType := range m.config.MessageTypes {
+		if message.Type == msgType {
+			relation = ruleEngine.True
+		} else {
+			relation = ruleEngine.False
+		}
+	}
+	ctx.TellNext(message, relation)
 	return nil
 }

+ 30 - 0
pkg/ruleEngine/nodes/msg_type_switch_node.go

@@ -0,0 +1,30 @@
+package nodes
+
+import (
+	"sparrow/pkg/ruleEngine"
+)
+
+type MsgTypeSwitchNode struct {
+}
+
+func (m *MsgTypeSwitchNode) Init(ctx ruleEngine.Context, config string) error {
+	return nil
+}
+
+func (m *MsgTypeSwitchNode) OnMessage(ctx ruleEngine.Context, message *ruleEngine.Message) error {
+	var relationType ruleEngine.RelationType
+	switch message.Type {
+	case ruleEngine.POST_ATTRIBUTES_REQUEST:
+		relationType = ruleEngine.PostAttributes
+	case ruleEngine.POST_EVENT_REQUEST:
+		relationType = ruleEngine.PostAttributes
+	case ruleEngine.CONNECT_EVENT:
+		relationType = ruleEngine.ConnectEvent
+	case ruleEngine.DISCONNECT_EVENT:
+		relationType = ruleEngine.DisconnectEvent
+	default:
+		relationType = ruleEngine.Other
+	}
+	ctx.TellNext(message, relationType)
+	return nil
+}

+ 13 - 0
pkg/ruleEngine/schema.go

@@ -8,8 +8,21 @@ type RelationType string
 const (
 	Success RelationType = "Success" // 成功
 	Failure RelationType = "Failure" // 失败
+	Other   RelationType = "Other"   // 其他
 	True    RelationType = "True"    // 真
 	False   RelationType = "False"   // 假
+
+	PostAttributes  RelationType = "Post attributes"  // 上报属性
+	PostEvent       RelationType = "Post Event"       // 上报事件
+	ConnectEvent    RelationType = "Connect Event"    // 接入
+	DisconnectEvent RelationType = "Disconnect Event" // 断开
+)
+
+const (
+	POST_ATTRIBUTES_REQUEST = "POST_ATTRIBUTES_REQUEST" // 属性上报消息
+	POST_EVENT_REQUEST      = "POST_EVENT_REQUEST"      // 事件上报消息
+	CONNECT_EVENT           = "CONNECT_EVENT"           // 接入事件
+	DISCONNECT_EVENT        = "DISCONNECT_EVENT"        // 断开事件
 )
 
 // RuleNodeInfo rule node info for output

+ 29 - 25
pkg/server/server_manager.go

@@ -73,45 +73,46 @@ func (mgr *ServerManager) RegisterServer() error {
 	prefix := fmt.Sprintf("%s%s/", EtcdServersPrefix, mgr.serverName)
 	var (
 		addr string
-		key  string
+		keys []string
 	)
-
 	if serverInstance.tcpsvr != nil {
 		addr := os.Getenv(EnvTCPProxy)
 		if addr == "" {
 			addr, _ = fixHostIp(*confTCPHost)
 		}
-		key = fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr)
+		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr))
 	}
 	if serverInstance.rpcsvr != nil {
 		addr, _ := fixHostIp(*confRPCHost)
-		key = fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr)
+		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr))
 	}
 	if serverInstance.udpsvr != nil {
 		addr := os.Getenv(EnvUDPProxy)
 		if addr == "" {
 			addr, _ = fixHostIp(*confUDPHost)
 		}
-		key = fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr)
+		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr))
 	}
 	if serverInstance.httpsvr != nil {
 		addr := os.Getenv(EnvHTTPProxy)
 		if addr == "" {
 			addr, _ = fixHostIp(*confHTTPHost)
 		}
-		key = fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr)
-	}
-	_, err = mgr.cli.Put(ctx, key, addr, clientv3.WithLease(resp.ID))
-	if err != nil {
-		return nil
+		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr))
 	}
-	mgr.leaseId = resp.ID
-	leaseRespChan, err := mgr.cli.KeepAlive(ctx, resp.ID)
-	if err != nil {
-		return err
+	for _, key := range keys {
+		_, err = mgr.cli.Put(ctx, key, addr, clientv3.WithLease(resp.ID))
+		if err != nil {
+			return nil
+		}
+		mgr.leaseId = resp.ID
+		leaseRespChan, err := mgr.cli.KeepAlive(ctx, resp.ID)
+		if err != nil {
+			return err
+		}
+		mgr.keepAliveChan = leaseRespChan
 	}
-	mgr.keepAliveChan = leaseRespChan
-	// print common key info
+	// print common keys info
 	Log.Infof("RegisterServer is done. leaseId is %v\n", mgr.leaseId)
 	go func() {
 		for leaseResp := range mgr.keepAliveChan {
@@ -126,21 +127,24 @@ func (mgr *ServerManager) UpdateServerHosts() error {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
 	}
-
 	prefix := EtcdServersPrefix
 	response, err := mgr.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
 	if err != nil {
 		return err
 	}
 
-	servers := make(map[string](map[string][]string))
-
-	for _, server := range response.Kvs {
-		name := strings.Split(string(server.Key), "/")[EtcdServersPrefixCnt+1]
-		servers[name] = make(map[string][]string)
-		host := strings.Split(string(server.Key), "/")[EtcdServersPrefixCnt+2]
-		servers[name][host] = []string{}
-		addr := strings.Split(string(server.Key), "/")[EtcdServersPrefixCnt+3]
+	servers := make(map[string]map[string][]string)
+	for _, kvs := range response.Kvs {
+		key := string(kvs.Key)
+		name := strings.Split(key, "/")[EtcdServersPrefixCnt+1]
+		host := strings.Split(key, "/")[EtcdServersPrefixCnt+2]
+		addr := strings.Split(key, "/")[EtcdServersPrefixCnt+3]
+		if _, ok := servers[name]; !ok {
+			servers[name] = make(map[string][]string)
+		}
+		if _, ok := servers[name][host];!ok {
+			servers[name][host] = []string{}
+		}
 		servers[name][host] = append(servers[name][host], addr)
 	}
 	mgr.mapServers = servers