Преглед изворни кода

feat: 尝试修复 registry 服务

lijian пре 4 година
родитељ
комит
c4a820d1d9

+ 1 - 0
.gitignore

@@ -31,3 +31,4 @@ _testmain.go
 *.prof
 upload
 upload/*
+.vscode

+ 0 - 3
.vscode/settings.json

@@ -1,3 +0,0 @@
-{
-    "go.testFlags": ["-v"]
-}

+ 0 - 1
pkg/actors/rule_chain_actor.go

@@ -125,7 +125,6 @@ func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId stri
 			}
 		}
 	}
-	fmt.Printf("+++++++++nodeid:%s, creator:%s, %+v, types:%v", originatorNodeId, originatorId, relations, relationTypes)
 	if len(relations) == 0 {
 		server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
 		if contains(relationTypes, "Failure") {

+ 14 - 2
pkg/actors/rule_node_actor.go

@@ -3,6 +3,7 @@ package actors
 import (
 	"errors"
 	"fmt"
+	"sparrow/pkg/entities"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/ruleEngine"
 	"sparrow/pkg/ruleEngine/nodes"
@@ -92,6 +93,10 @@ func (r *RuleNodeActor) onRuleToSelfMsg(msg *ruleEngine.RuleToSelfMsg) error {
 	actorMsg := msg.Message
 	ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
 	if ruleNodeCount < 20 {
+		if r.ruleNode.IsDebug {
+			_ = r.SystemCtx.PersistDebugInput(r.tenantId,
+				&entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, "Self", nil)
+		}
 		if err := r.node.OnMessage(r.defaultContext, actorMsg); err != nil {
 			r.defaultContext.TellError(actorMsg, errors.New("onRuleToSelfMsg error"))
 			return err
@@ -108,6 +113,9 @@ func (r *RuleNodeActor) onRuleChainToNodeMsg(msg *ruleEngine.RuleChainToRuleNode
 	msg.Message.GetCallBack().OnProcessingStart(r.info)
 	actorMsg := msg.Message
 	// ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
+	if r.ruleNode.IsDebug {
+		_ = r.SystemCtx.PersistDebugInput(r.tenantId, &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, msg.FromRelationType, nil)
+	}
 	err := r.node.OnMessage(msg.Ctx, actorMsg)
 	if err != nil {
 		msg.Ctx.TellError(actorMsg, errors.New("onRuleChainToNodeMsg error"))
@@ -117,11 +125,15 @@ func (r *RuleNodeActor) onRuleChainToNodeMsg(msg *ruleEngine.RuleChainToRuleNode
 }
 
 func (r *RuleNodeActor) Destroy() error {
-	panic("implement me")
+	return nil
 }
 
 func (r *RuleNodeActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
-	panic("implement me")
+	if err != nil {
+		return ruleEngine.Stop()
+	} else {
+		return ruleEngine.Resume()
+	}
 }
 
 type RuleNodeActorCreator struct {

+ 16 - 0
pkg/entities/entities.go

@@ -15,6 +15,22 @@ const (
 	RULE_NODE
 )
 
+func (a EntityType) String() string {
+	switch a {
+	case 0:
+		return "TENANT"
+	case 1:
+		return "DEVICE"
+	case 2:
+		return "ALARM"
+	case 3:
+		return "RULE_CHAIN"
+	case 4:
+		return "RULE_NODE"
+	}
+	return ""
+}
+
 type RuleNodeId struct {
 	Id string
 }

+ 18 - 0
pkg/models/event.go

@@ -0,0 +1,18 @@
+package models
+
+import "github.com/jinzhu/gorm"
+
+type Event struct {
+	gorm.Model
+	RecordId     string `gorm:"column:record_id;size:32;index"`
+	ServerId     string `gorm:"column:server_id"`
+	EventType    string `gorm:"column:event_type;size:20"`    // 事件类型(IN,OUT)
+	MessageType  string `gorm:"column:type;size:50"`          // 消息类型
+	EntityType   string `gorm:"column:entity_type;size:20"`   // 实体类型
+	EntityId     string `gorm:"column:entity_id;size:32"`     // 实体Id
+	MessageId    string `gorm:"column:message_id;size:36"`    // 消息Id
+	RelationType string `gorm:"column:relation_type;size:30"` // 关系类型
+	Data         string `gorm:"column:data;size:2048"`        // 数据
+	MetaData     string `gorm:"column:meta_data;size:2048"`   // 元数据
+	Error        string `gorm:"column:error;size:2048"`       // 错误信息
+}

+ 1 - 0
pkg/mysql/migrate.go

@@ -42,6 +42,7 @@ func MigrateDatabase(dbhost, dbport, dbname, dbuser, dbpass string) error {
 		&models.Protocal{},
 		&models.AlarmRule{},
 		&models.Menu{},
+		&models.Event{},
 	).Error
 	if err != nil {
 		fmt.Printf("%s", err.Error())

+ 1 - 1
pkg/mysql/migrate_test.go

@@ -5,7 +5,7 @@ import (
 )
 
 func TestMigrate(t *testing.T) {
-	err := MigrateDatabase("localhost", "3306", "", "root", "")
+	err := MigrateDatabase("39.98.250.155", "3306", "gxt-iot-db", "root", "gEkYDPloQcp93t4WHr3X")
 	if err != nil {
 		t.Error(err)
 	}

+ 3 - 3
pkg/productconfig/productconfig.go

@@ -41,9 +41,9 @@ type ProductObject struct {
 
 // ProductConfig product config parses the JSON product config string.
 type ProductConfig struct {
-	Objects  []ProductObject
-	Commands []ProductCommandOrEvent
-	Events   []ProductCommandOrEvent
+	Objects  []ProductObject         `json:"objects"`
+	Commands []ProductCommandOrEvent `json:"commands"`
+	Events   []ProductCommandOrEvent `json:"events"`
 }
 
 // New create a new productConfig

+ 69 - 5
pkg/ruleEngine/actor_system.go

@@ -1,12 +1,18 @@
 package ruleEngine
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
 	"github.com/gogf/gf/os/grpool"
+	"github.com/gogf/gf/util/guid"
+	"golang.org/x/time/rate"
+	"sparrow/pkg/entities"
+	"sparrow/pkg/models"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/server"
 	"sync"
+	"time"
 )
 
 // SystemContext actor system context, with some func
@@ -16,26 +22,84 @@ type SystemContext struct {
 	ClusterService   ClusterService
 	RuleChainService RuleChainService
 	TenantService    TenantService
+	EventService     EventService
+	// 调试信息的限流器
+	debugPerTenantLimits map[string]*rate.Limiter
 }
 
 type SystemContextServiceConfig struct {
 	ClusterService   ClusterService
 	RuleChainService RuleChainService
 	TenantService    TenantService
+	EventService     EventService
 }
 
 func NewSystemContext(sys System, config SystemContextServiceConfig) *SystemContext {
-	if config.TenantService == nil || config.RuleChainService== nil || config.ClusterService== nil {
+	if config.TenantService == nil || config.RuleChainService == nil || config.ClusterService == nil ||
+		config.EventService == nil {
 		panic("RuleEngine init error: services is not set")
 	}
 	return &SystemContext{
-		ActorSystem: sys,
-		ClusterService: config.ClusterService,
-		RuleChainService: config.RuleChainService,
-		TenantService: config.TenantService,
+		ActorSystem:          sys,
+		ClusterService:       config.ClusterService,
+		RuleChainService:     config.RuleChainService,
+		TenantService:        config.TenantService,
+		debugPerTenantLimits: make(map[string]*rate.Limiter),
+		EventService:         config.EventService,
 	}
 }
 
+// PersistDebugInput 保存输入调试信息
+func (s *SystemContext) PersistDebugInput(tenantId string, entityId entities.EntityId, msg *protocol.Message, relationType string, err error) error {
+	return s.persistDebugAsync(tenantId, entityId, "IN", msg, relationType, err)
+}
+
+// PersistDebugOutput 保存输出调试信息
+func (s *SystemContext) PersistDebugOutput(tenantId string, entityId entities.EntityId, msg *protocol.Message, relationType string, err error) error {
+	return s.persistDebugAsync(tenantId, entityId, "OUT", msg, relationType, err)
+}
+
+func (s *SystemContext) persistDebugAsync(tenantId string, id entities.EntityId, eType string, msg *protocol.Message, rType string, errInfo error) error {
+	var limiter *rate.Limiter
+	if v, ok := s.debugPerTenantLimits[tenantId]; !ok {
+		limiter = rate.NewLimiter(10, 200)
+		s.debugPerTenantLimits[tenantId] = limiter
+	} else {
+		limiter = v
+	}
+
+	if limiter.AllowN(time.Now(), 200) {
+		var errStr string
+		if v, ok := msg.MetaData["error"]; ok {
+			errStr = v.(string)
+		}
+		if errInfo != nil {
+			errStr = errInfo.Error()
+		}
+		buf, err := json.Marshal(msg.MetaData)
+		if err != nil {
+			server.Log.WithField("method", "persistDebugAsync").Error(err)
+		}
+		if s.EventService != nil {
+			if err := s.EventService.SaveAsync(&models.Event{
+				RecordId:     guid.S(),
+				ServerId:     server.InternalIP,
+				EventType:    eType,
+				EntityType:   id.GetEntityType().String(),
+				EntityId:     id.GetId(),
+				MessageId:    msg.Id,
+				RelationType: rType,
+				Data:         msg.Data,
+				MetaData:     string(buf),
+				Error:        errStr,
+			}); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 func (s *SystemContext) Tell(msg protocol.ActorMsg) {
 	s.AppActor.Tell(msg)
 }

+ 41 - 16
pkg/ruleEngine/context.go

@@ -1,7 +1,9 @@
 package ruleEngine
 
 import (
+	"sparrow/pkg/entities"
 	"sparrow/pkg/protocol"
+	"sparrow/pkg/server"
 	"time"
 )
 
@@ -40,7 +42,15 @@ func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.R
 
 func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) {
 	if d.nodeCtx.Self.IsDebug {
-		// TODO: 输出调试日志
+		for _, item := range relationTypes {
+			if Err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
+				&entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
+				msg,
+				item.String(),
+				err); Err != nil {
+				continue
+			}
+		}
 	}
 	msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
 	d.nodeCtx.ChainActor.Tell(
@@ -62,7 +72,15 @@ func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration)
 
 func (d *DefaultContext) TellError(msg *protocol.Message, err error) {
 	if d.nodeCtx.Self.IsDebug {
-		// TODO: 处理调试
+		if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
+			&entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
+			msg,
+			protocol.Failure.String(),
+			err,
+		); err != nil {
+			server.Log.WithField("method", "DefaultContext.TellError").
+				Error(err)
+		}
 	}
 	d.nodeCtx.ChainActor.Tell(&RuleNodeToRuleChanTellNextMsg{
 		RuleNodeId:     d.nodeCtx.Self.RuleNodeId,
@@ -73,7 +91,15 @@ func (d *DefaultContext) TellError(msg *protocol.Message, err error) {
 }
 func (d *DefaultContext) Ack(msg *protocol.Message) {
 	if d.nodeCtx.Self.IsDebug {
-		// TODO: 处理调试
+		if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
+			&entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
+			msg,
+			"ACK",
+			nil,
+		); err != nil {
+			server.Log.WithField("method", "DefaultContext.Ack").
+				Error(err)
+		}
 	}
 	msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
 	msg.GetCallBack().OnSuccess()
@@ -81,17 +107,16 @@ func (d *DefaultContext) Ack(msg *protocol.Message) {
 
 func (d *DefaultContext) TransformMessage(msg *protocol.Message, msgType, originator string,
 	metaData map[string]interface{}, data string) *protocol.Message {
-		return &protocol.Message{
-			QueueName:  msg.QueueName,
-			Id:         msg.Id,
-			Ts:         msg.Ts,
-			Type:       msgType,
-			Data:       data,
-			RuleChanId: msg.RuleChanId,
-			RuleNodeId: msg.RuleNodeId,
-			Callback:   msg.Callback,
-			MetaData:   metaData,
-			Originator: originator,
-		}
+	return &protocol.Message{
+		QueueName:  msg.QueueName,
+		Id:         msg.Id,
+		Ts:         msg.Ts,
+		Type:       msgType,
+		Data:       data,
+		RuleChanId: msg.RuleChanId,
+		RuleNodeId: msg.RuleNodeId,
+		Callback:   msg.Callback,
+		MetaData:   metaData,
+		Originator: originator,
+	}
 }
-

+ 10 - 0
pkg/ruleEngine/event_service.go

@@ -0,0 +1,10 @@
+package ruleEngine
+
+import "sparrow/pkg/models"
+
+type EventService interface {
+	// 保存事件
+	Save(data *models.Event) error
+	// 异步保存事件
+	SaveAsync(data *models.Event) error
+}

+ 0 - 2
pkg/ruleEngine/nodes/rest_api_request_node.go

@@ -3,7 +3,6 @@ package nodes
 import (
 	"bytes"
 	"encoding/json"
-	"fmt"
 	"github.com/gogf/gf/os/grpool"
 	"net/http"
 	"sparrow/pkg/protocol"
@@ -56,7 +55,6 @@ func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol
 	for k, v := range r.config.Headers {
 		headers[k] = v
 	}
-	fmt.Printf("%+v\r\n", headers)
 	w := new(bytes.Buffer)
 	if err := json.NewEncoder(w).Encode(body); err != nil {
 		return err

+ 8 - 8
pkg/ruleEngine/rule_chain_service.go

@@ -45,7 +45,7 @@ func (t *TestRuleChainService) FindRuleChainById(tenantId, ruleChainId string) (
 		TenantId:    "tenant_1",
 		Name:        "test rule chain 1",
 		FirstNodeId: "node1",
-		IsDebug:     false,
+		IsDebug:     true,
 		IsRoot:      true,
 		Config:      "",
 		ChainId:     "chain id 1",
@@ -59,7 +59,7 @@ func (t *TestRuleChainService) FindRuleNodeById(tenantId, ruleNodeId string) (*R
 			RuleChainId: "chain id 1",
 			Type:        "MsgTypeFilterNode",
 			Name:        "filternode1",
-			IsDebug:     false,
+			IsDebug:     true,
 			Config:      "",
 			RuleNodeId:  "node1",
 		}, nil
@@ -68,7 +68,7 @@ func (t *TestRuleChainService) FindRuleNodeById(tenantId, ruleNodeId string) (*R
 			RuleChainId: "chain id 2",
 			Type:        "MsgTypeFilterNode",
 			Name:        "filternode2",
-			IsDebug:     false,
+			IsDebug:     true,
 			Config:      "",
 			RuleNodeId:  "node2",
 		}, nil
@@ -77,7 +77,7 @@ func (t *TestRuleChainService) FindRuleNodeById(tenantId, ruleNodeId string) (*R
 			RuleChainId: "chain id 2",
 			Type:        "RestApiRequestNode",
 			Name:        "rest api",
-			IsDebug:     false,
+			IsDebug:     true,
 			Config:      "",
 			RuleNodeId:  "node3",
 		}, nil
@@ -91,7 +91,7 @@ func (t *TestRuleChainService) GetRuleChainNodes(tenantId, ruleChainId string) (
 			RuleChainId: "chain id 1",
 			Type:        "MsgTypeFilterNode",
 			Name:        "filternode",
-			IsDebug:     false,
+			IsDebug:     true,
 			Config:      "",
 			RuleNodeId:  "node1",
 		},
@@ -99,7 +99,7 @@ func (t *TestRuleChainService) GetRuleChainNodes(tenantId, ruleChainId string) (
 			RuleChainId: "chain id 2",
 			Type:        "MsgTypeFilterNode",
 			Name:        "filternode",
-			IsDebug:     false,
+			IsDebug:     true,
 			Config:      "",
 			RuleNodeId:  "node2",
 		},
@@ -107,7 +107,7 @@ func (t *TestRuleChainService) GetRuleChainNodes(tenantId, ruleChainId string) (
 			RuleChainId: "chain id 2",
 			Type:        "RestApiRequestNode",
 			Name:        "rest api",
-			IsDebug:     false,
+			IsDebug:     true,
 			Config:      "",
 			RuleNodeId:  "node3",
 		},
@@ -120,7 +120,7 @@ func (t *TestRuleChainService) FindRuleChains(tenantId string) ([]*RuleChain, er
 			TenantId:    "tenant_1",
 			Name:        "test rule chain 1",
 			FirstNodeId: "node1",
-			IsDebug:     false,
+			IsDebug:     true,
 			IsRoot:      true,
 			Config:      "",
 			ChainId:     "chain id 1",

+ 4 - 3
pkg/server/server_manager.go

@@ -123,8 +123,9 @@ func (mgr *ServerManager) RegisterServer() error {
 	// print common keys info
 	Log.Infof("RegisterServer is done. leaseId is %v\n", mgr.leaseId)
 	go func() {
-		for leaseResp := range mgr.keepAliveChan {
-			Log.Infof("update lease success:%d", leaseResp.ID)
+		for {
+			leaseResp := <-mgr.keepAliveChan
+			Log.Infof("update lease success:%d", leaseResp.TTL)
 		}
 	}()
 	return nil
@@ -150,7 +151,7 @@ func (mgr *ServerManager) UpdateServerHosts() error {
 		if _, ok := servers[name]; !ok {
 			servers[name] = make(map[string][]string)
 		}
-		if _, ok := servers[name][host];!ok {
+		if _, ok := servers[name][host]; !ok {
 			servers[name][host] = []string{}
 		}
 		servers[name][host] = append(servers[name][host], addr)

+ 10 - 10
run.sh

@@ -1,19 +1,19 @@
 export GOPATH=/Users/lijian/go
-
+LEVEL="error"
 sudo killall -9 httpaccess registry apiprovider devicemanager controller mqttaccess knowoapi fileaccess coapaccess
 
 # start services
 #$GOPATH/bin/httpaccess -etcd http://localhost:2379 -httphost internal:443 -loglevel debug -usehttps -keyfile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/key.pem -cafile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/cert.pem &
-$GOPATH/bin/httpaccess -etcd http://127.0.0.1:2379 -httphost internal:8088 -loglevel debug &
-$GOPATH/bin/registry -etcd http://127.0.0.1:2379 -rpchost internal:20034 -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP -dbhost 39.98.250.155 -dbname gxt-iot-db -dbport 3306 -dbuser root -dbpass gEkYDPloQcp93t4WHr3X -loglevel debug &
-$GOPATH/bin/apiprovider -etcd http://127.0.0.1:2379 -loglevel debug  -httphost internal:8888 &
-$GOPATH/bin/devicemanager -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost internal:20033 &
-$GOPATH/bin/controller -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost internal:20032 &
+$GOPATH/bin/httpaccess -etcd http://127.0.0.1:2379 -httphost internal:8088 -loglevel $LEVEL &
+$GOPATH/bin/registry -etcd http://127.0.0.1:2379 -rpchost internal:20034 -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP -dbhost 39.98.250.155 -dbname gxt-iot-db -dbport 3306 -dbuser root -dbpass gEkYDPloQcp93t4WHr3X -loglevel $LEVEL &
+$GOPATH/bin/apiprovider -etcd http://127.0.0.1:2379 -loglevel $LEVEL  -httphost internal:8888 &
+$GOPATH/bin/devicemanager -etcd http://127.0.0.1:2379 -loglevel $LEVEL  -rpchost internal:20033 &
+$GOPATH/bin/controller -etcd http://127.0.0.1:2379 -loglevel $LEVEL  -rpchost internal:20032 &
 #$GOPATH/bin/mqttaccess -etcd http://localhost:2379 -loglevel debug  -rpchost localhost:20030 -tcphost internal:1883 -usetls -keyfile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/key.pem -cafile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/cert.pem &
-$GOPATH/bin/mqttaccess -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost internal:20030 -tcphost internal:1883  &
-$GOPATH/bin/knowoapi -etcd http://127.0.0.1:2379 -loglevel debug  -httphost internal:8889 -dbhost 39.98.250.155 -dbname gxt-iot-db -dbport 3306 -dbuser root -dbpass gEkYDPloQcp93t4WHr3X -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP &
-$GOPATH/bin/fileaccess -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost internal:20035 -httphost internal:9000 &
-$GOPATH/bin/coapaccess -etcd http://127.0.0.1:2379 -loglevel debug  -udphost internal:56883 &
+$GOPATH/bin/mqttaccess -etcd http://127.0.0.1:2379 -loglevel $LEVEL  -rpchost internal:20030 -tcphost internal:1883  &
+$GOPATH/bin/knowoapi -etcd http://127.0.0.1:2379 -loglevel $LEVEL  -httphost internal:8889 -dbhost 39.98.250.155 -dbname gxt-iot-db -dbport 3306 -dbuser root -dbpass gEkYDPloQcp93t4WHr3X -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP &
+$GOPATH/bin/fileaccess -etcd http://127.0.0.1:2379 -loglevel $LEVEL  -rpchost internal:20035 -httphost internal:9000 &
+$GOPATH/bin/coapaccess -etcd http://127.0.0.1:2379 -loglevel $LEVEL  -udphost internal:56883 &
 exit 0
 
 

+ 5 - 5
services/controller/controller.go

@@ -24,8 +24,8 @@ type Controller struct {
 	ift          *rule.Ifttt
 	actorContext *ruleEngine.SystemContext
 	consumer     queue.QueueConsumer
-	cluster *ClusterService
-	pool *grpool.Pool
+	cluster      *ClusterService
+	pool         *grpool.Pool
 }
 
 func NewController(rabbithost string) (*Controller, error) {
@@ -48,7 +48,7 @@ func NewController(rabbithost string) (*Controller, error) {
 		timer:    t,
 		ift:      ttt,
 		consumer: consumer,
-		cluster:&ClusterService{producer:producer},
+		cluster:  &ClusterService{producer: producer},
 		pool:     grpool.New(),
 	}, nil
 }
@@ -173,7 +173,6 @@ type ActorSystem struct {
 	rootActor ruleEngine.Ref
 }
 
-
 // 初始化actor system
 func (c *Controller) initActorSystem() (*ActorSystem, error) {
 
@@ -194,7 +193,7 @@ func (c *Controller) initActorSystem() (*ActorSystem, error) {
 		ClusterService:   c.cluster,
 		RuleChainService: ruleChainService,
 		TenantService:    tenantService,
-
+		EventService:     NewEventService(),
 	})
 	appActor, err := system.CreateRootActor(ruleEngine.APP_DISPATCHER_NAME,
 		actors.NewAppActorCreator(actorContext))
@@ -208,6 +207,7 @@ func (c *Controller) initActorSystem() (*ActorSystem, error) {
 	c.actorContext = actorContext
 	return &ActorSystem{rootActor: appActor}, nil
 }
+
 // 启动mq consumers
 func (c *Controller) launchConsumer() {
 	msgs, err := c.consumer.Pop(100 * time.Millisecond)

+ 26 - 5
services/controller/service.go

@@ -1,8 +1,11 @@
 package main
 
 import (
+	"github.com/gogf/gf/os/grpool"
+	"sparrow/pkg/models"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/queue"
+	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
 )
 
@@ -16,11 +19,33 @@ func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo,
 	if err != nil {
 		return
 	}
-	if err :=c.producer.Send(info, m, callback);err != nil {
+	if err := c.producer.Send(info, m, callback); err != nil {
 		server.Log.Error(err)
 	}
 }
 
+// EventService 事件持久化服务
+type EventService struct {
+	pool *grpool.Pool
+}
+
+// NewEventService create
+func NewEventService() *EventService {
+	return &EventService{pool: grpool.New(5)}
+}
+
+func (e *EventService) Save(data *models.Event) error {
+	return server.RPCCallByName(nil, "registry", "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{})
+}
+
+func (e *EventService) SaveAsync(data *models.Event) error {
+	return e.pool.Add(func() {
+		if err := server.RPCCallByName(nil, "registry", "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}); err != nil {
+			return
+		}
+	})
+}
+
 //type TenantService struct {
 //
 //}
@@ -45,7 +70,3 @@ func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo,
 //	var reply *models.Vendor
 //	err := server.
 //}
-
-
-
-

+ 1 - 1
services/fileaccess/flags.go

@@ -12,7 +12,7 @@ const (
 	defaultStaticPath = "upload"
 	defaultMaxSize    = 300 << 10         //默认300K
 	defaultAllowExt   = ".jpeg|.jpg|.png" //注意.号
-	defaultRedisHost  = "192.168.175.60:6379"
+	defaultRedisHost  = "127.0.0.1:6379"
 )
 
 var (

+ 0 - 1
services/registry/product.go

@@ -102,7 +102,6 @@ func (r *Registry) ValidateProduct(key string, reply *models.Product) error {
 	}
 
 	id, err := r.keygen.DecodeIDFromRandomKey(key)
-	server.Log.Debug(id)
 	if err != nil {
 		return err
 	}

+ 9 - 1
services/registry/registry.go

@@ -219,7 +219,7 @@ func (r *Registry) FindVendor(recordId string, reply *models.Vendor) error {
 }
 
 // GetVendors will get all vendors in the platform.
-func (r *Registry) GetVendors(noarg int, reply []*models.Vendor) error {
+func (r *Registry) GetVendors(noarg int, reply *[]*models.Vendor) error {
 	db, err := getDB()
 	if err != nil {
 		return err
@@ -482,3 +482,11 @@ func (r *Registry) QueryRules(args *models.Rule, reply *[]models.Rule) error {
 
 	return nil
 }
+// CreateEvent create event
+func (r *Registry) CreateEvent(args *models.Event, reply *rpcs.ReplyEmptyResult) error {
+	db, err := getDB()
+	if err != nil {
+		return err
+	}
+	return db.Save(args).Error
+}

BIN
tests/device/device


+ 2 - 2
tests/device/main.go

@@ -6,7 +6,7 @@ import (
 )
 
 var (
-	testURL        = flag.String("url", "http://192.168.1.100:8088", "login url")
+	testURL        = flag.String("url", "http://192.168.1.112:8088", "login url")
 	testProductKey = flag.String("productkey", "2e397f5599a3f6f6a5a3c8fcd45437169501b3c6e239042ad5e9b65303561e41ab5519e9d205facbcbe75a2784354501", "product key")
 	testProtocol   = flag.String("protocol", "mqtt", "access protocol")
 )
@@ -19,7 +19,7 @@ func main() {
 		return
 	}
 
-	dev := NewDevice(*testURL, *testProductKey, "123456", "1.2.3", *testProtocol)
+	dev := NewDevice(*testURL, *testProductKey, "3344", "1.25", *testProtocol)
 
 	err := dev.DoRegister()
 	if err != nil {