Parcourir la source

更新规则引擎

lijian il y a 4 ans
Parent
commit
dc3c079417
47 fichiers modifiés avec 1367 ajouts et 272 suppressions
  1. 5 9
      pkg/actors/rule_chain_actor.go
  2. 8 10
      pkg/models/rulechain.go
  3. 1 0
      pkg/models/vendor.go
  4. 1 1
      pkg/mongo/recorder_test.go
  5. 11 7
      pkg/productconfig/productconfig.go
  6. 18 9
      pkg/productconfig/productconfig_test.go
  7. 17 6
      pkg/protocol/message.go
  8. 4 3
      pkg/protocol/structure.go
  9. 263 0
      pkg/queue/msgQueue/rabbitmq.go
  10. 43 0
      pkg/queue/msgQueue/rabbitmq_test.go
  11. 7 0
      pkg/queue/queue_admin.go
  12. 2 2
      pkg/queue/queue_consumer.go
  13. 52 12
      pkg/queue/queue_msg.go
  14. 4 2
      pkg/queue/queue_producer.go
  15. 7 3
      pkg/queue/topic_partition_info.go
  16. 15 3
      pkg/ruleEngine/actor_system.go
  17. 5 2
      pkg/ruleEngine/cluster_service.go
  18. 19 0
      pkg/ruleEngine/context.go
  19. 2 0
      pkg/ruleEngine/nodes/msg_type_filter_node.go
  20. 2 0
      pkg/ruleEngine/nodes/msg_type_switch_node.go
  21. 1 0
      pkg/ruleEngine/nodes/reg_types.go
  22. 112 0
      pkg/ruleEngine/nodes/rest_api_request_node.go
  23. 27 0
      pkg/ruleEngine/rule_chain_service.go
  24. 2 1
      pkg/server/config.go
  25. 3 3
      pkg/server/http_server.go
  26. 19 12
      pkg/server/netif.go
  27. 0 33
      pkg/server/netif_test.go
  28. 2 2
      pkg/server/server.go
  29. 23 15
      pkg/server/server_manager.go
  30. 3 3
      pkg/server/tcp_server.go
  31. 2 2
      pkg/server/udp_server.go
  32. 475 0
      pkg/utils/http_client.go
  33. 8 8
      run.sh
  34. 1 1
      services/apiprovider/response.go
  35. 125 67
      services/controller/controller.go
  36. 0 36
      services/controller/controller_test.go
  37. 1 5
      services/controller/flags.go
  38. 7 1
      services/controller/main.go
  39. 51 0
      services/controller/service.go
  40. 1 1
      services/devicemanager/flags.go
  41. 1 1
      services/knowoapi/model/device_test.go
  42. 2 0
      services/knowoapi/model/vendor.go
  43. 1 1
      services/registry/registry.go
  44. 1 0
      services/registry/vendor.go
  45. BIN
      tests/device/device
  46. 12 9
      tests/device/device.go
  47. 1 2
      tests/device/main.go

+ 5 - 9
pkg/actors/rule_chain_actor.go

@@ -125,9 +125,10 @@ func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId stri
 			}
 		}
 	}
+	fmt.Printf("+++++++++nodeid:%s, creator:%s, %+v, types:%v", originatorNodeId, originatorId, relations, relationTypes)
 	if len(relations) == 0 {
 		server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
-		if contains(relationTypes, string(protocol.Failure)) {
+		if contains(relationTypes, "Failure") {
 			if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
 				msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+
 					"[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId)))
@@ -152,7 +153,7 @@ func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId stri
 
 // push a message to target ctx
 func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
-	if !tpi.MyPartition {
+	if tpi.MyPartition {
 		switch entityId.GetEntityType() {
 		case entities.RULE_NODE:
 			targetCtx := r.nodeActors[entityId.GetId()]
@@ -181,17 +182,12 @@ func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol
 }
 
 func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
-	msgBytes, err := msg.Encode()
-	if err != nil {
-		server.Log.Error(err)
-	}
-	fmt.Printf("%v", tpi)
-	r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msgBytes, queueCallback)
+	r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msg, queueCallback)
 }
 
 func contains(relations []string, relation string) bool {
 	if len(relations) == 0 {
-		return true
+		return false
 	}
 	for _, item := range relations {
 		if strings.ToLower(item) == strings.ToLower(relation) {

+ 8 - 10
pkg/models/rulechain.go

@@ -7,14 +7,12 @@ import (
 // RuleChain 规则链
 type RuleChain struct {
 	gorm.Model
-	AdditionalInfo  string //节点属性信息
-	Configuration   string //配置信息
-	Name            string //名称
-	FirstRuleNodeID int    //第一个节点的ID
-	Root            bool   //是否为root chain
-	DebugModel      bool   //调试模式
-	Intro           string //描述
-	ProductID       uint   //产品ID
-	DeviceID        string //设备ID
-	VendorID        uint   //厂商ID
+	AdditionalInfo  string `gorm:"column:additional_info"`    //节点属性信息
+	Configuration   string `gorm:"column:configuration"`      //配置信息
+	Name            string `gorm:"column:name"`               //名称
+	FirstRuleNodeID int    `gorm:"column:first_rule_node_id"` //第一个节点的ID
+	Root            bool   `gorm:"column:root"`               //是否为root chain
+	DebugModel      bool   `gorm:"column:debug_model"`        //调试模式
+	Intro           string `gorm:"column:intro"`              //描述
+	VendorID        int    `gorm:"column:vendor_id"`          //厂商ID
 }

+ 1 - 0
pkg/models/vendor.go

@@ -10,6 +10,7 @@ import (
 // vendor is those who make products
 type Vendor struct {
 	gorm.Model
+	RecordId string `gorm:"column:record_id;size:32;index"`
 	// vendor name
 	VendorName string `sql:"type:varchar(200);not null;"`
 	// vendor key

+ 1 - 1
pkg/mongo/recorder_test.go

@@ -25,7 +25,7 @@ func TestRecorder(t *testing.T) {
 
 	subdata := protocol.SubData{
 		Head: protocol.SubDataHead{
-			SubDeviceid: 1,
+			SubDeviceid: "1",
 			PropertyNum: 2,
 			ParamsCount: 3,
 		},

+ 11 - 7
pkg/productconfig/productconfig.go

@@ -25,9 +25,9 @@ type ProductCommandOrEvent struct {
 
 // StatusParam 上报类型的参数
 type StatusParam struct {
-	ValueType int32 `json:"value_type"`
-	Name      string
-	ID        uint `json:"pid"` //protocal id 与后台对应
+	ValueType int32  `json:"value_type"`
+	Name      string `json:"name"`
+	ID        uint   `json:"pid"` //protocal id 与后台对应
 }
 
 // ProductObject 产品对象
@@ -120,21 +120,25 @@ func (config *ProductConfig) ValidateCommandOrEvent(name string, params []interf
 }
 
 // StatusToMap struct to map
-func (config *ProductConfig) StatusToMap(status []protocol.SubData) (map[string][]interface{}, error) {
-	result := make(map[string][]interface{})
-
+func (config *ProductConfig) StatusToMap(status []protocol.SubData) (map[string]interface{}, error) {
+	result := make(map[string]interface{})
 	for _, sub := range status {
 		val, err := tlv.ReadTLVs(sub.Params)
 		if err != nil {
 			return nil, err
 		}
 		label := ""
+		values := make(map[string]interface{})
 		for _, obj := range config.Objects {
 			if obj.No == int(sub.Head.PropertyNum) {
 				label = obj.Label
 			}
+			for k, v := range obj.Status {
+				values[v.Name] = val[k]
+			}
 		}
-		result[label] = val
+		result[label] = values
+		result["device_id"] = sub.Head.ExternalDeviceId
 	}
 
 	return result, nil

+ 18 - 9
pkg/productconfig/productconfig_test.go

@@ -12,7 +12,7 @@ func testStatus(c *ProductConfig, t *testing.T) {
 	status :=
 		`
     {
-      "switch": [1]
+      "location": [1,2]
     }
     `
 
@@ -38,13 +38,19 @@ func testStatus(c *ProductConfig, t *testing.T) {
 			t.Error(err)
 		}
 	}
-
-	params := []tlv.TLV{*one}
+	two, err := tlv.MakeTLV(uint8(2))
+	if err != nil {
+		if err != nil {
+			t.Error(err)
+		}
+	}
+	params := []tlv.TLV{*one, *two}
 	teststatus := []protocol.SubData{protocol.SubData{
 		Head: protocol.SubDataHead{
 			SubDeviceid: uint16(1),
 			PropertyNum: uint16(1),
-			ParamsCount: uint16(1),
+			ParamsCount: uint16(2),
+			ExternalDeviceId: [8]byte{0x1,0x2},
 		},
 		Params: params,
 	}}
@@ -53,11 +59,11 @@ func testStatus(c *ProductConfig, t *testing.T) {
 	if err != nil {
 		t.Error(err)
 	}
-
-	t.Log(res)
+	str, err :=json.Marshal(res)
+	t.Log(string(str))
 
 	m := make(map[string]interface{})
-	m["switch"] = []interface{}{float64(1)}
+	m["location"] = []interface{}{float64(1)}
 	_, err = c.MapToStatus(m)
 	if err != nil {
 		t.Error(err)
@@ -122,11 +128,14 @@ func TestParseProductConfig(t *testing.T) {
     {
       "objects": [{
         "no": 1,
-        "label": "switch",
+        "label": "location",
         "part": 1,
         "status": [{
           "value_type": 7,
-          "name": "onoff"
+          "name": "lgt"
+        },{
+          "value_type": 7,
+          "name": "lat"
         }]
       }],
       "commands": [{

+ 17 - 6
pkg/protocol/message.go

@@ -3,15 +3,11 @@ package protocol
 import (
 	"bytes"
 	"encoding/gob"
+	"sparrow/pkg/server"
 	"sync/atomic"
 	"time"
 )
 
-type MessageSerializer interface {
-	Encode() bytes.Buffer
-	Decode()
-}
-
 type Message struct {
 	QueueName   string
 	Id          string
@@ -26,6 +22,21 @@ type Message struct {
 	execCounter int32
 }
 
+func (a *Message) TransformMsg(msgType string, ori string, data string) *Message {
+	return &Message{
+		QueueName:  a.QueueName,
+		Id:         a.Id,
+		Ts:         a.Ts,
+		Type:       msgType,
+		Data:       data,
+		RuleChanId: a.RuleChanId,
+		RuleNodeId: a.RuleNodeId,
+		Callback:   a.Callback,
+		MetaData:   a.MetaData,
+		Originator: ori,
+	}
+}
+
 func (a *Message) GetAndIncrementRuleNodeCounter() int32 {
 	return atomic.AddInt32(&a.execCounter, 1)
 }
@@ -77,7 +88,7 @@ func (e emptyCallBack) OnSuccess() {
 }
 
 func (e emptyCallBack) OnFailure(err error) {
-
+	server.Log.Error("消息出错:" + err.Error())
 }
 
 // SetCallBack

+ 4 - 3
pkg/protocol/structure.go

@@ -36,9 +36,10 @@ type Data struct {
 }
 
 type SubDataHead struct {
-	SubDeviceid uint16
-	PropertyNum uint16
-	ParamsCount uint16
+	SubDeviceid      uint16
+	PropertyNum      uint16
+	ParamsCount      uint16
+	ExternalDeviceId [8]byte // 扩展设备Id为兼容网关类设备
 }
 
 type SubData struct {

+ 263 - 0
pkg/queue/msgQueue/rabbitmq.go

@@ -0,0 +1,263 @@
+package msgQueue
+
+import (
+	"errors"
+	"github.com/streadway/amqp"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/queue"
+	"sparrow/pkg/server"
+	"sync"
+	"time"
+)
+
+// RabbitMessageQueueAdmin rabbit mq 管理器
+type RabbitMessageQueueAdmin struct {
+	conn      *amqp.Connection
+	ch        *amqp.Channel
+	arguments map[string]interface{}
+}
+
+func (r *RabbitMessageQueueAdmin) GetChannel() *amqp.Channel {
+	return r.ch
+}
+
+func (r *RabbitMessageQueueAdmin) GetConn() *amqp.Connection {
+	return r.conn
+}
+
+// RabbitMqSettings 配置
+type RabbitMqSettings struct {
+	Host string
+}
+
+func NewRabbitMessageQueueAdmin(settings *RabbitMqSettings, args map[string]interface{}) *RabbitMessageQueueAdmin {
+	conn, err := amqp.Dial(settings.Host)
+	if err != nil {
+		panic(err)
+	}
+
+	ch, err := conn.Channel()
+	if err != nil {
+		panic(err)
+	}
+	return &RabbitMessageQueueAdmin{
+		conn:      conn,
+		ch:        ch,
+		arguments: args,
+	}
+}
+
+func (r *RabbitMessageQueueAdmin) CreateTopicIfNotExists(topic string) error {
+	_, err := r.ch.QueueDeclare(topic,
+		true,  // durable
+		false, // delete when unused
+		false, // exclusive
+		false, // no-wait
+		nil)
+	return err
+}
+
+func (r *RabbitMessageQueueAdmin) Destroy() error {
+	if r.ch != nil {
+		if err := r.ch.Close(); err != nil {
+			return err
+		}
+	}
+	if r.conn != nil {
+		return r.conn.Close()
+	}
+	return nil
+}
+
+// RabbitMqProducer rabbit mq message producer
+type RabbitMqProducer struct {
+	defaultTopic string
+	admin        *RabbitMessageQueueAdmin
+	settings     *RabbitMqSettings
+	channel      *amqp.Channel
+	conn         *amqp.Connection
+	topics       map[string]*queue.TopicPartitionInfo
+}
+
+func NewRabbitMqProducer(admin *RabbitMessageQueueAdmin, defaultTopic string) *RabbitMqProducer {
+	result := new(RabbitMqProducer)
+	result.admin = admin
+	result.defaultTopic = defaultTopic
+	result.conn = admin.conn
+	result.channel = admin.ch
+	result.topics = make(map[string]*queue.TopicPartitionInfo)
+	return result
+}
+
+func (r *RabbitMqProducer) Init() error {
+	return nil
+}
+
+func (r *RabbitMqProducer) GetDefaultTopic() string {
+	return r.defaultTopic
+}
+
+func (r *RabbitMqProducer) Send(info *queue.TopicPartitionInfo, payload protocol.Payload, callback queue.Callback) error {
+	r.createTopicIfNoExist(info)
+	if r.channel == nil {
+		return errors.New("rabbit mq channel is not initialized")
+	}
+	bytes, err := payload.Marshal()
+	if err != nil {
+		server.Log.Errorf("queue message marshal error:%s", err.Error())
+		return err
+	}
+	err = r.channel.Publish("", info.String(), false, false,
+		amqp.Publishing{
+			DeliveryMode: amqp.Persistent,
+			Body:         bytes,
+		})
+	if err != nil {
+		if callback != nil {
+			callback.OnFailure(err)
+		}
+		server.Log.Errorf("rabbit mq message publish error:%s", err.Error())
+		return err
+	}
+	if callback != nil {
+		callback.OnSuccess()
+	}
+	return nil
+}
+
+func (r *RabbitMqProducer) createTopicIfNoExist(tpi *queue.TopicPartitionInfo) {
+	if _, ok := r.topics[tpi.String()]; !ok {
+		_ = r.admin.CreateTopicIfNotExists(tpi.String())
+		r.topics[tpi.String()] = tpi
+	}
+}
+
+func (r *RabbitMqProducer) Stop() error {
+	if r.admin != nil {
+		return r.admin.Destroy()
+	}
+	return nil
+}
+
+type RabbitMqConsumer struct {
+	admin      *RabbitMessageQueueAdmin
+	topics     []string
+	topic      string
+	partitions []*queue.TopicPartitionInfo
+	subscribe  bool
+	mu         sync.Mutex
+	recvChan   chan []byte
+}
+
+func (r *RabbitMqConsumer) GetTopic() string {
+	return r.topic
+}
+
+func (r *RabbitMqConsumer) Subscribe() error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.partitions = append(r.partitions, &queue.TopicPartitionInfo{
+		Topic:       r.topic,
+		TenantId:    "",
+		Partition:   0,
+		MyPartition: true,
+	})
+	r.subscribe = false
+	return nil
+}
+
+func (r *RabbitMqConsumer) SubscribeWithPartitions(partitions []*queue.TopicPartitionInfo) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.partitions = partitions
+	r.subscribe = false
+	return nil
+}
+
+func (r *RabbitMqConsumer) UnSubscribe() {
+	_ = r.admin.Destroy()
+}
+
+func (r *RabbitMqConsumer) Pop(duration time.Duration) (<-chan queue.QueueMessage, error) {
+	result := make(chan queue.QueueMessage, 10)
+	if !r.subscribe && len(r.partitions) == 0 {
+		time.Sleep(duration)
+	} else {
+		r.mu.Lock()
+		defer r.mu.Unlock()
+		if !r.subscribe {
+			for _, p := range r.partitions {
+				r.topics = append(r.topics, p.String())
+			}
+			r.doSubscribe(r.topics)
+			r.subscribe = true
+		}
+		go r.doPop(duration)
+		go func() {
+			for {
+				select {
+				case msg := <-r.recvChan:
+					m := &queue.GobQueueMessage{}
+					err := m.UnMarshal(msg)
+					if err != nil {
+						server.Log.Error(err)
+						continue
+					}
+					result <- m
+				}
+			}
+		}()
+
+	}
+	return result, nil
+}
+
+func (r *RabbitMqConsumer) doSubscribe(topics []string) {
+	for _, item := range topics {
+		_ = r.admin.CreateTopicIfNotExists(item)
+	}
+}
+
+func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
+	if r.admin.ch == nil || r.admin.conn == nil {
+		return errors.New("ch and conn is not init")
+	}
+	for _, topic := range r.topics {
+		go func() {
+			msgs, err := r.admin.ch.Consume(
+				topic,
+				"",    // consumer
+				false, // auto-ack
+				false, // exclusive
+				false, // no-local
+				false, // no-wait
+				nil,   // args
+			)
+			if err != nil {
+				server.Log.Error(err)
+				return
+			}
+			for d := range msgs {
+				r.recvChan <- d.Body
+				d.Ack(true)
+			}
+		}()
+	}
+	return nil
+}
+
+func (r *RabbitMqConsumer) Commit() error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.admin.ch.Ack(0, true)
+}
+
+func NewRabbitConsumer(admin *RabbitMessageQueueAdmin, topic string) *RabbitMqConsumer {
+	return &RabbitMqConsumer{
+		admin:      admin,
+		topics:     make([]string, 0),
+		topic:      topic,
+		partitions: make([]*queue.TopicPartitionInfo, 0),
+		recvChan:   make(chan []byte, 10),
+	}
+}

+ 43 - 0
pkg/queue/msgQueue/rabbitmq_test.go

@@ -0,0 +1,43 @@
+package msgQueue
+
+import (
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/queue"
+	"sparrow/pkg/server"
+	"testing"
+)
+
+func TestNewRabbitMqProducer(t *testing.T) {
+	server.InitLog("test", "debug")
+	admin := NewRabbitMessageQueueAdmin(&RabbitMqSettings{
+		Host:"amqp://guest:guest@localhost:5672/",
+	}, nil)
+
+	producer := NewRabbitMqProducer(admin, "test.lijian")
+	tpi :=&queue.TopicPartitionInfo{
+		Topic:       "ruleengine",
+		TenantId:    "111",
+		Partition:   1,
+		MyPartition: false,
+	}
+	msg, err := queue.NewGobQueueMessage(&protocol.Message{
+		QueueName:  "111",
+		Id:         "1111",
+		Ts:         nil,
+		Type:       "1213",
+		Data:       "123213",
+		RuleChanId: "123",
+		RuleNodeId: "123123",
+		Callback:   nil,
+		MetaData:   nil,
+		Originator: "",
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	err = producer.Send(tpi, msg, nil)
+	if err != nil {
+		t.Error(err)
+	}
+}

+ 7 - 0
pkg/queue/queue_admin.go

@@ -0,0 +1,7 @@
+package queue
+
+// QueueAdmin 消息队列管理器
+type QueueAdmin interface {
+	CreateTopicIfNotExists(topic string) error
+	Destroy() error
+}

+ 2 - 2
pkg/queue/queue_consumer.go

@@ -10,11 +10,11 @@ type QueueConsumer interface {
 	// subscribe the topic
 	Subscribe() error
 	// subscribe with partitions, partitions is topics
-	SubscribeWithPartitions(partitions []TopicPartitionInfo) error
+	SubscribeWithPartitions(partitions []*TopicPartitionInfo) error
 	// unsubscribe
 	UnSubscribe()
 	// pop message from queue with time duration
-	Pop(time time.Duration) ([]QueueMessage, error)
+	Pop(time time.Duration) (<- chan QueueMessage, error)
 	// commit a message to queue if it is necessary
 	Commit() error
 }

+ 52 - 12
pkg/queue/queue_msg.go

@@ -1,5 +1,12 @@
 package queue
 
+import (
+	"bytes"
+	"encoding/gob"
+	"github.com/gogf/gf/util/guid"
+	"sparrow/pkg/protocol"
+)
+
 type QueueMessage interface {
 	GetKey() string
 	GetData() []byte
@@ -11,34 +18,67 @@ type QueueMsgHeaders interface {
 	Get(key string) []byte
 	GetData() map[string][]byte
 }
-type DefaultQueueMsg struct {
+
+// GobQueueMessage 发到消息队列中的消息
+type GobQueueMessage struct {
 	Key     string
-	Data    []byte
+	Value   []byte
 	Headers *DefaultQueueMsgHeader
 }
 
-func (d *DefaultQueueMsg) GetKey() string {
-	return d.Key
+func (g *GobQueueMessage) GetKey() string {
+	return g.Key
 }
 
-func (d *DefaultQueueMsg) GetData() []byte {
-	return d.Data
+func (g *GobQueueMessage) GetData() []byte {
+	return g.Value
+}
+
+func (g *GobQueueMessage) GetHeaders() QueueMsgHeaders {
+	return g.Headers
+}
+
+func NewGobQueueMessage(msg *protocol.Message) (*GobQueueMessage, error) {
+	b, err := msg.Encode()
+	if err != nil {
+		return nil, err
+	}
+	return &GobQueueMessage{
+		Key:     guid.S(),
+		Value:   b,
+		Headers: new(DefaultQueueMsgHeader),
+	}, nil
+}
+
+func (g *GobQueueMessage) Marshal() ([]byte, error) {
+	var network bytes.Buffer
+	enc := gob.NewEncoder(&network)
+	err := enc.Encode(g)
+	if err != nil {
+		return nil, err
+	}
+	return network.Bytes(), nil
 }
 
-func (d *DefaultQueueMsg) GetHeaders() QueueMsgHeaders {
-	return d.Headers
+func (g *GobQueueMessage) UnMarshal(data []byte) error {
+	var network bytes.Buffer
+	network.Write(data)
+	dec := gob.NewDecoder(&network)
+	return dec.Decode(g)
 }
 
+
+
 type DefaultQueueMsgHeader struct {
-	data map[string][]byte
+	Data map[string][]byte
 }
 
 func (d *DefaultQueueMsgHeader) Put(key string, value []byte) {
-	d.data[key] = value
+	d.Data[key] = value
 }
 
 func (d *DefaultQueueMsgHeader) Get(key string) []byte {
-	if v, ok := d.data[key]; !ok {
+	if v, ok := d.Data[key]; !ok {
 		return nil
 	} else {
 		return v
@@ -46,5 +86,5 @@ func (d *DefaultQueueMsgHeader) Get(key string) []byte {
 }
 
 func (d *DefaultQueueMsgHeader) GetData() map[string][]byte {
-	return d.data
+	return d.Data
 }

+ 4 - 2
pkg/queue/queue_producer.go

@@ -1,8 +1,10 @@
 package queue
 
-type RuleEngineQueueProducer interface {
+import "sparrow/pkg/protocol"
+
+type QueueProducer interface {
 	Init() error
 	GetDefaultTopic() string
-	Send(info TopicPartitionInfo, msg QueueMessage, callback Callback) error
+	Send(info *TopicPartitionInfo, msg protocol.Payload, callback Callback) error
 	Stop() error
 }

+ 7 - 3
pkg/queue/topic_partition_info.go

@@ -8,12 +8,12 @@ import (
 type TopicPartitionInfo struct {
 	Topic       string
 	TenantId    string
-	Partition   string
+	Partition   int
 	MyPartition bool
 }
 
 func (a *TopicPartitionInfo) String() string {
-	return fmt.Sprintf("%s.%s.%s", a.Topic, a.TenantId, a.Partition)
+	return fmt.Sprintf("%s", a.Topic)
 }
 
 func (a *TopicPartitionInfo) HashCode() string {
@@ -23,5 +23,9 @@ func (a *TopicPartitionInfo) HashCode() string {
 // ResolvePartition 生成info
 func ResolvePartition(serviceType, queueName, tenantId, entityId string) *TopicPartitionInfo {
 	// TODO:生成主题逻辑
-	return &TopicPartitionInfo{}
+	return &TopicPartitionInfo{
+		Topic: queueName,
+		TenantId: tenantId,
+		MyPartition: true,
+	}
 }

+ 15 - 3
pkg/ruleEngine/actor_system.go

@@ -18,17 +18,29 @@ type SystemContext struct {
 	TenantService    TenantService
 }
 
-func NewSystemContext(sys System) *SystemContext {
+type SystemContextServiceConfig struct {
+	ClusterService   ClusterService
+	RuleChainService RuleChainService
+	TenantService    TenantService
+}
+
+func NewSystemContext(sys System, config SystemContextServiceConfig) *SystemContext {
+	if config.TenantService == nil || config.RuleChainService== nil || config.ClusterService== nil {
+		panic("RuleEngine init error: services is not set")
+	}
 	return &SystemContext{
 		ActorSystem: sys,
+		ClusterService: config.ClusterService,
+		RuleChainService: config.RuleChainService,
+		TenantService: config.TenantService,
 	}
 }
 
-func (s *SystemContext) tell(msg protocol.ActorMsg) {
+func (s *SystemContext) Tell(msg protocol.ActorMsg) {
 	s.AppActor.Tell(msg)
 }
 
-func (s *SystemContext) tellWithHighPriority(msg protocol.ActorMsg) {
+func (s *SystemContext) TellWithHighPriority(msg protocol.ActorMsg) {
 	s.AppActor.TellWithHighPriority(msg)
 }
 

+ 5 - 2
pkg/ruleEngine/cluster_service.go

@@ -1,7 +1,10 @@
 package ruleEngine
 
-import "sparrow/pkg/queue"
+import (
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/queue"
+)
 
 type ClusterService interface {
-	PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg []byte, callback queue.Callback)
+	PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback)
 }

+ 19 - 0
pkg/ruleEngine/context.go

@@ -16,6 +16,8 @@ type Context interface {
 	TellError(msg *protocol.Message, err error)
 	// message ack
 	Ack(msg *protocol.Message)
+	// transform a message
+	TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message
 }
 
 // DefaultContext 默认的上下文
@@ -76,3 +78,20 @@ func (d *DefaultContext) Ack(msg *protocol.Message) {
 	msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
 	msg.GetCallBack().OnSuccess()
 }
+
+func (d *DefaultContext) TransformMessage(msg *protocol.Message, msgType, originator string,
+	metaData map[string]interface{}, data string) *protocol.Message {
+		return &protocol.Message{
+			QueueName:  msg.QueueName,
+			Id:         msg.Id,
+			Ts:         msg.Ts,
+			Type:       msgType,
+			Data:       data,
+			RuleChanId: msg.RuleChanId,
+			RuleNodeId: msg.RuleNodeId,
+			Callback:   msg.Callback,
+			MetaData:   metaData,
+			Originator: originator,
+		}
+}
+

+ 2 - 0
pkg/ruleEngine/nodes/msg_type_filter_node.go

@@ -6,6 +6,8 @@ import (
 	"sparrow/pkg/ruleEngine"
 )
 
+// MsgTypeFilterNode 消息类型过滤节点
+// 如果当前消息的消息类型包含在当前节点的配置中,则使用True关系链,否则使用False关系
 type MsgTypeFilterNode struct {
 	config *MsgTypeFilterNodeConfig
 }

+ 2 - 0
pkg/ruleEngine/nodes/msg_type_switch_node.go

@@ -5,6 +5,8 @@ import (
 	"sparrow/pkg/ruleEngine"
 )
 
+// MsgTypeSwitchNode 消息类型路由节点
+// 根据消息的不同类型,路由到不同的关系节点上
 type MsgTypeSwitchNode struct {
 }
 

+ 1 - 0
pkg/ruleEngine/nodes/reg_types.go

@@ -15,6 +15,7 @@ type item struct {
 func init() {
 	registerType((*MsgTypeFilterNode)(nil))
 	registerType((*MsgTypeSwitchNode)(nil))
+	registerType((*RestApiRequestNode)(nil))
 }
 
 func registerType(elem interface{}) {

+ 112 - 0
pkg/ruleEngine/nodes/rest_api_request_node.go

@@ -0,0 +1,112 @@
+package nodes
+
+import (
+	"bytes"
+	"encoding/json"
+	"github.com/gogf/gf/os/grpool"
+	"net/http"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/ruleEngine"
+	"sparrow/pkg/server"
+	"sparrow/pkg/utils"
+	"time"
+)
+
+// RestApiRequestNode 请求外部API节点
+type RestApiRequestNode struct {
+	pool   *grpool.Pool
+	config *RestApiRequestNodeConfig
+	client *utils.HttpClient
+}
+
+func (r *RestApiRequestNode) Init(ctx ruleEngine.Context, config string) error {
+	r.pool = grpool.New(10)
+
+	if config == "" {
+		r.config = &RestApiRequestNodeConfig{
+			Url:       "http://localhost/",
+			Headers:   make(map[string]string),
+			Retry:     1,
+			Method:    "GET",
+			TimeOut:   5,
+			RetryWait: 1,
+		}
+	} else {
+		c := new(RestApiRequestNodeConfig)
+		err := json.Unmarshal([]byte(config), c)
+		if err != nil {
+			return err
+		}
+		r.config = c
+	}
+	client := utils.NewHttpClientWithConfig(time.Duration(r.config.TimeOut)*time.Second,
+		r.config.Retry, time.Duration(r.config.RetryWait)*time.Second)
+	client.SetLogger(server.Log)
+	r.client = client
+	return nil
+}
+
+func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
+	body := message.Data
+	headers := make(map[string]interface{})
+	for k, v := range message.MetaData {
+		headers[k] = v
+	}
+	for k, v := range r.config.Headers {
+		headers[k] = v
+	}
+	w := new(bytes.Buffer)
+	if err := json.NewEncoder(w).Encode(body); err != nil {
+		return err
+	}
+	req, err := utils.NewRequest(r.config.Method, r.config.Url, w)
+	if err != nil {
+		return err
+	}
+	return r.pool.Add(func() {
+		res, err := r.client.Do(req)
+		if err != nil {
+			next := r.processError(ctx, message, err)
+			ctx.TellError(next, err)
+			return
+		}
+		if res.Response().StatusCode == http.StatusOK {
+			msg := r.processResponse(ctx, message, res)
+			ctx.TellSuccess(msg)
+		}
+	})
+}
+
+func (r *RestApiRequestNode) processError(ctx ruleEngine.Context, msg *protocol.Message, err error) *protocol.Message {
+	var metaData map[string]interface{}
+	if msg.MetaData != nil {
+		metaData = msg.MetaData
+	}
+	metaData = make(map[string]interface{})
+	metaData["error"] = err.Error()
+
+	return ctx.TransformMessage(msg, msg.Type, msg.Originator, metaData, msg.Data)
+}
+func (r *RestApiRequestNode) processResponse(ctx ruleEngine.Context, msg *protocol.Message, res utils.Responser) *protocol.Message {
+	var metaData map[string]interface{}
+	if msg.MetaData != nil {
+		metaData = msg.MetaData
+	}
+	metaData = make(map[string]interface{})
+	metaData["STATUS"] = res.Response().Status
+	metaData["STATUS_CODE"] = res.Response().StatusCode
+	var data string
+	if body, err := res.String(); err == nil {
+		data = body
+	}
+	return ctx.TransformMessage(msg, msg.Type, msg.Originator, metaData, data)
+}
+
+type RestApiRequestNodeConfig struct {
+	Url       string            `json:"url"`        // 请求的地址
+	Headers   map[string]string `json:"headers"`    // 请求头
+	Retry     int               `json:"retry"`      // 重试次数
+	Method    string            `json:"method"`     // 请求方法
+	TimeOut   int               `json:"time_out"`   // 超时时间(秒)
+	RetryWait int               `json:"retry_wait"` // 重试等待时间
+}

+ 27 - 0
pkg/ruleEngine/rule_chain_service.go

@@ -27,6 +27,16 @@ func (t *TestRuleChainService) GetRuleNodeRelations(tenantId, nodeId string) ([]
 			},
 		}, nil
 	}
+	if nodeId == "node2" {
+		return []*Relation {
+			{
+				From:              "node2",
+				To:                "node3",
+				Type:              "True",
+				RelationTypeGroup: COMMON,
+			},
+		}, nil
+	}
 	return nil, nil
 }
 
@@ -62,6 +72,15 @@ func (t *TestRuleChainService) FindRuleNodeById(tenantId, ruleNodeId string) (*R
 			Config:      "",
 			RuleNodeId:  "node2",
 		}, nil
+	case "node3":
+		return &RuleNode{
+			RuleChainId: "chain id 2",
+			Type:        "RestApiRequestNode",
+			Name:        "rest api",
+			IsDebug:     false,
+			Config:      "",
+			RuleNodeId:  "node3",
+		}, nil
 	}
 	return nil, nil
 }
@@ -84,6 +103,14 @@ func (t *TestRuleChainService) GetRuleChainNodes(tenantId, ruleChainId string) (
 			Config:      "",
 			RuleNodeId:  "node2",
 		},
+		{
+			RuleChainId: "chain id 2",
+			Type:        "RestApiRequestNode",
+			Name:        "rest api",
+			IsDebug:     false,
+			Config:      "",
+			RuleNodeId:  "node3",
+		},
 	}, nil
 }
 

+ 2 - 1
pkg/server/config.go

@@ -17,6 +17,7 @@ const (
 	FlagEtcd     = "etcd"
 	FlagLogLevel = "loglevel"
 	FlagUDPHost  = "udphost"
+	FlagExternalIp = "external"
 )
 
 var (
@@ -33,6 +34,6 @@ var (
 	confRPCHost = flag.String(FlagRPCHost, "", "rpc server listen address, format ip:port")
 
 	confEtcd = flag.String(FlagEtcd, "", "etcd service addr, format ip:port;ip:port")
-
+	confExIp = flag.String(FlagExternalIp, "", "服务使用的外网IP")
 	confLogLevel = flag.String(FlagLogLevel, "info", "default log level, options are panic|fatal|error|warn|info|debug")
 )

+ 3 - 3
pkg/server/http_server.go

@@ -6,7 +6,7 @@ import (
 )
 
 type HTTPServer struct {
-	addr     string
+	addr     *addr
 	handler  http.Handler
 	useHttps bool
 }
@@ -31,9 +31,9 @@ func (hs *HTTPServer) Start() error {
 	go func() {
 		var err error
 		if hs.useHttps == false {
-			err = http.ListenAndServe(hs.addr, hs.handler)
+			err = http.ListenAndServe(hs.addr.internalIp, hs.handler)
 		} else {
-			err = http.ListenAndServeTLS(hs.addr, *confCAFile, *confKeyFile, hs.handler)
+			err = http.ListenAndServeTLS(hs.addr.internalIp, *confCAFile, *confKeyFile, hs.handler)
 		}
 
 		if err != nil {

+ 19 - 12
pkg/server/netif.go

@@ -4,7 +4,6 @@
 package server
 
 import (
-	"errors"
 	"net"
 	"strings"
 )
@@ -62,23 +61,31 @@ func readNetInterfaces() {
 			ExternalIP = ip
 		}
 	}
+	if *confExIp != "" {
+		ExternalIP = *confExIp
+	}
 	return
 }
 
 // fix host ip with "internal:port" or "external:port" format
-func fixHostIp(addr string) (string, error) {
-	if strings.Contains(addr, confInternalIP) {
-		if InternalIP != "" {
-			addr = strings.Replace(addr, confInternalIP, InternalIP, -1)
-		} else {
-			return addr, errors.New("server has no internal ip")
+func fixHostIp(address string) (*addr, error) {
+	var ex, in string
+	if *confExIp != "" {
+		if strings.Contains(address, confExternalIP) {
+			in = strings.Replace(address, confExternalIP, "0.0.0.0", -1)
 		}
-	} else if strings.Contains(addr, confExternalIP) {
-		if ExternalIP != "" {
-			addr = strings.Replace(addr, confExternalIP, ExternalIP, -1)
+		ex = strings.Replace(address, confExternalIP, *confExIp, -1)
+	} else {
+		if strings.Contains(address, confInternalIP) {
+			in = strings.Replace(address, confInternalIP, InternalIP, -1)
+			ex = in
 		} else {
-			return addr, errors.New("server has no external ip")
+			in = address
+			ex = address
 		}
 	}
-	return addr, nil
+	return &addr{
+		externalIp: ex,
+		internalIp: in,
+	}, nil
 }

+ 0 - 33
pkg/server/netif_test.go

@@ -1,33 +0,0 @@
-package server
-
-import (
-	"testing"
-)
-
-func TestNetIf(t *testing.T) {
-	readNetInterfaces()
-	t.Logf("internal ip: %s", InternalIP)
-	t.Logf("external ip: %s", ExternalIP)
-}
-
-func TestIsInternalIP(t *testing.T) {
-	testIPs := []string{"127.0.0.1", "192.168.5.234", "10.23.45.56", "172.17.2.4"}
-	for _, ip := range testIPs {
-		if isInternalIP(ip) == false {
-			t.Errorf("test internal ip failed: %s", ip)
-		}
-	}
-}
-
-func TestFixHostIP(t *testing.T) {
-	InternalIP = "10.1.1.1"
-	ExternalIP = "5.1.1.1"
-	fixedIP, err := fixHostIp("internal:40")
-	if err != nil || fixedIP != "10.1.1.1:40" {
-		t.Errorf("test fix host ip failed: %s, %s", fixedIP, err)
-	}
-	fixedIP, err = fixHostIp("external:40")
-	if err != nil || fixedIP != "5.1.1.1:40" {
-		t.Errorf("test fix host ip failed: %s, %s", fixedIP, err)
-	}
-}

+ 2 - 2
pkg/server/server.go

@@ -230,7 +230,7 @@ func GetRPCHost() string {
 		return ""
 	}
 
-	return serverInstance.rpcsvr.addr
+	return serverInstance.rpcsvr.addr.externalIp
 }
 
 // GetHTTPHost get this server's http host addr
@@ -238,7 +238,7 @@ func GetHTTPHost() string {
 	if serverInstance == nil || serverInstance.httpsvr == nil {
 		return ""
 	}
-	return serverInstance.httpsvr.addr
+	return serverInstance.httpsvr.addr.externalIp
 }
 
 // Run start service

+ 23 - 15
pkg/server/server_manager.go

@@ -25,6 +25,11 @@ const (
 	lease = 90
 )
 
+type addr struct {
+	internalIp string
+	externalIp string
+}
+
 // ServerManager server manager
 type ServerManager struct {
 	serverName string
@@ -72,36 +77,39 @@ func (mgr *ServerManager) RegisterServer() error {
 	}
 	prefix := fmt.Sprintf("%s%s/", EtcdServersPrefix, mgr.serverName)
 	var (
-		addr string
+		addr *addr
 		keys []string
 	)
 	if serverInstance.tcpsvr != nil {
-		addr := os.Getenv(EnvTCPProxy)
-		if addr == "" {
-			addr, _ = fixHostIp(*confTCPHost)
+		addr, _ = fixHostIp(*confTCPHost)
+		envAddr := os.Getenv(EnvTCPProxy)
+		if envAddr != "" {
+			addr.externalIp = envAddr
 		}
-		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr))
+		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr.externalIp))
 	}
 	if serverInstance.rpcsvr != nil {
 		addr, _ := fixHostIp(*confRPCHost)
-		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr))
+		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr.externalIp))
 	}
 	if serverInstance.udpsvr != nil {
-		addr := os.Getenv(EnvUDPProxy)
-		if addr == "" {
-			addr, _ = fixHostIp(*confUDPHost)
+		addr, _ = fixHostIp(*confUDPHost)
+		envAddr := os.Getenv(EnvUDPProxy)
+		if envAddr != "" {
+			addr.externalIp = envAddr
 		}
-		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr))
+		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr.externalIp))
 	}
 	if serverInstance.httpsvr != nil {
-		addr := os.Getenv(EnvHTTPProxy)
-		if addr == "" {
-			addr, _ = fixHostIp(*confHTTPHost)
+		addr, _ = fixHostIp(*confHTTPHost)
+		envAddr := os.Getenv(EnvHTTPProxy)
+		if envAddr != "" {
+			addr.externalIp = envAddr
 		}
-		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr))
+		keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr.externalIp))
 	}
 	for _, key := range keys {
-		_, err = mgr.cli.Put(ctx, key, addr, clientv3.WithLease(resp.ID))
+		_, err = mgr.cli.Put(ctx, key, "server", clientv3.WithLease(resp.ID))
 		if err != nil {
 			return nil
 		}

+ 3 - 3
pkg/server/tcp_server.go

@@ -11,7 +11,7 @@ type TCPHandler interface {
 }
 
 type TCPServer struct {
-	addr    string
+	addr    *addr
 	handler TCPHandler
 	useTls  bool
 }
@@ -44,7 +44,7 @@ func (ts *TCPServer) Start() error {
 		config := tls.Config{Certificates: []tls.Certificate{cert}}
 
 		// listen for new connection
-		ln, err = tls.Listen("tcp", ts.addr, &config)
+		ln, err = tls.Listen("tcp4", ts.addr.internalIp, &config)
 
 		if err != nil {
 			return errorf(errListenFailed, ts.addr, err)
@@ -52,7 +52,7 @@ func (ts *TCPServer) Start() error {
 
 	} else {
 		// don't use tls, just listen
-		ln, err = net.Listen("tcp", ts.addr)
+		ln, err = net.Listen("tcp4", ts.addr.internalIp)
 		if err != nil {
 			return errorf(errListenFailed, ts.addr, err)
 		}

+ 2 - 2
pkg/server/udp_server.go

@@ -15,7 +15,7 @@ type UDPHandler interface {
 
 // UDPServer udp server
 type UDPServer struct {
-	addr    string
+	addr    *addr
 	handler UDPHandler
 }
 
@@ -26,7 +26,7 @@ func (us *UDPServer) Start() error {
 	}
 	var ln *net.UDPConn
 	var err error
-	a, err := net.ResolveUDPAddr("udp", us.addr)
+	a, err := net.ResolveUDPAddr("udp", us.addr.internalIp)
 	if err != nil {
 		return err
 	}

+ 475 - 0
pkg/utils/http_client.go

@@ -0,0 +1,475 @@
+package utils
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"regexp"
+	"time"
+)
+
+// 封装一个可以支持重试的http request client
+// admin server集群在某些机器宕机或者超时的情况下轮询重试
+//
+
+var (
+	// 默认重试等待时间
+	defaultRetryWaitTime = 1 * time.Second
+	// 默认重试次数
+	defaultRetryCount = 4
+	// 重定向次数过多的错误
+	redirectsErrorRe = regexp.MustCompile(`stopped after \d+ redirects\z`)
+	// 不支持的协议类型错误
+	schemeErrorRe = regexp.MustCompile(`unsupported protocol scheme`)
+	// 默认的客户端
+	DefaultClient = NewHttpClient()
+	// 默认log
+	// defaultLogger = Logger{log.New(os.Stdout, "\r\n", 0)}
+	// 请求超时时间
+	defaultRequestTimeout = 3 * time.Second
+)
+
+// ErrorResult 错误结果
+type ErrorResult struct {
+	Code    int    `json:"code"`
+	Message string `json:"message"`
+}
+
+func (r *ErrorResult) Error() string {
+	return r.Message
+}
+
+type ReaderFunc func() (io.Reader, error)
+
+type Request struct {
+	body ReaderFunc
+	*http.Request
+}
+
+// Responser response interface
+type Responser interface {
+	String() (string, error)
+	Bytes() ([]byte, error)
+	JSON(v interface{}) error
+	Response() *http.Response
+	Close()
+}
+
+func newResponse(resp *http.Response) *response {
+	return &response{resp}
+}
+
+type response struct {
+	resp *http.Response
+}
+
+func (r *response) Response() *http.Response {
+	return r.resp
+}
+
+func (r *response) String() (string, error) {
+	b, err := r.Bytes()
+	if err != nil {
+		return "", err
+	}
+	return string(b), nil
+}
+
+func (r *response) Bytes() ([]byte, error) {
+	defer r.resp.Body.Close()
+
+	buf, err := ioutil.ReadAll(r.resp.Body)
+	if err != nil {
+		return nil, err
+	}
+	return buf, nil
+}
+
+func (r *response) JSON(v interface{}) error {
+	defer r.resp.Body.Close()
+	return json.NewDecoder(r.resp.Body).Decode(v)
+}
+
+func (r *response) Close() {
+	if !r.resp.Close {
+		r.resp.Body.Close()
+	}
+}
+
+// ParseResponseJSON 解析响应JSON
+func ParseResponseJSON(resp Responser, result interface{}) error {
+	if resp.Response().StatusCode != 200 {
+		buf, err := resp.Bytes()
+		if err != nil {
+			return err
+		}
+
+		errResult := &ErrorResult{}
+		err = json.Unmarshal(buf, errResult)
+		if err == nil &&
+			(errResult.Code != 0 || errResult.Message != "") {
+			return errResult
+		}
+
+		return fmt.Errorf("%s", buf)
+	} else if result == nil {
+		resp.Close()
+		return nil
+	}
+	return resp.JSON(result)
+}
+
+func NewRequest(method, url string, rawBody interface{}) (*Request, error) {
+	bodyReader, contentLength, err := getBodyReaderAndContentLength(rawBody)
+	if err != nil {
+		return nil, err
+	}
+
+	httpReq, err := http.NewRequest(method, url, nil)
+	if err != nil {
+		return nil, err
+	}
+	httpReq.ContentLength = contentLength
+
+	return &Request{bodyReader, httpReq}, nil
+}
+
+type LenReader interface {
+	Len() int
+}
+
+func getBodyReaderAndContentLength(rawBody interface{}) (ReaderFunc, int64, error) {
+	var bodyReader ReaderFunc
+	var contentLength int64
+
+	if rawBody != nil {
+		switch body := rawBody.(type) {
+		// 如果注册了ReaderFunc,直接调用
+		case ReaderFunc:
+			bodyReader = body
+			tmp, err := body()
+			if err != nil {
+				return nil, 0, err
+			}
+			if lr, ok := tmp.(LenReader); ok {
+				contentLength = int64(lr.Len())
+			}
+			if c, ok := tmp.(io.Closer); ok {
+				_ = c.Close()
+			}
+
+		case func() (io.Reader, error):
+			bodyReader = body
+			tmp, err := body()
+			if err != nil {
+				return nil, 0, err
+			}
+			if lr, ok := tmp.(LenReader); ok {
+				contentLength = int64(lr.Len())
+			}
+			if c, ok := tmp.(io.Closer); ok {
+				_ = c.Close()
+			}
+
+		case []byte:
+			buf := body
+			bodyReader = func() (io.Reader, error) {
+				return bytes.NewReader(buf), nil
+			}
+			contentLength = int64(len(buf))
+
+		case *bytes.Buffer:
+			buf := body
+			bodyReader = func() (io.Reader, error) {
+				return bytes.NewReader(buf.Bytes()), nil
+			}
+			contentLength = int64(buf.Len())
+
+		case *bytes.Reader:
+			buf, err := ioutil.ReadAll(body)
+			if err != nil {
+				return nil, 0, err
+			}
+			bodyReader = func() (io.Reader, error) {
+				return bytes.NewReader(buf), nil
+			}
+			contentLength = int64(len(buf))
+
+		case io.ReadSeeker:
+			raw := body
+			bodyReader = func() (io.Reader, error) {
+				_, err := raw.Seek(0, 0)
+				return ioutil.NopCloser(raw), err
+			}
+			if lr, ok := raw.(LenReader); ok {
+				contentLength = int64(lr.Len())
+			}
+
+		case io.Reader:
+			buf, err := ioutil.ReadAll(body)
+			if err != nil {
+				return nil, 0, err
+			}
+			bodyReader = func() (io.Reader, error) {
+				return bytes.NewReader(buf), nil
+			}
+			contentLength = int64(len(buf))
+
+		default:
+			return nil, 0, fmt.Errorf("无法处理的的body类型 %T", rawBody)
+		}
+	}
+	return bodyReader, contentLength, nil
+}
+
+func (r *Request) WithContext(ctx context.Context) *Request {
+	r.Request = r.Request.WithContext(ctx)
+	return r
+}
+
+func (r *Request) BodyBytes() ([]byte, error) {
+	if r.body == nil {
+		return nil, nil
+	}
+	body, err := r.body()
+	if err != nil {
+		return nil, err
+	}
+	buf := new(bytes.Buffer)
+	_, err = buf.ReadFrom(body)
+	if err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+// 指定是否可以重试的策略,如果返回false,则客户端停止重试。
+type CheckRetry func(ctx context.Context, resp *http.Response, err error) (bool, error)
+
+// DefaultCheckRetry 默认的重试策略
+func DefaultCheckRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
+	if ctx.Err() != nil {
+		return false, ctx.Err()
+	}
+	if err != nil {
+		if v, ok := err.(*url.Error); ok {
+			if redirectsErrorRe.MatchString(v.Error()) {
+				return false, nil
+			}
+			if schemeErrorRe.MatchString(v.Error()) {
+				return false, nil
+			}
+			// 超时不重试
+			if v.Timeout() {
+				return false, nil
+			}
+		}
+		return true, nil
+	}
+	if resp.StatusCode == 0 || (resp.StatusCode >= 500 && resp.StatusCode != 501) {
+		return true, nil
+	}
+
+	return false, nil
+}
+
+type logger interface {
+	Info(v ...interface{})
+}
+
+// LogWriter log writer interface
+type LogWriter interface {
+	Info(v ...interface{})
+}
+
+// Logger default logger
+type Logger struct {
+	LogWriter
+}
+
+// Print format & print log
+func (logger Logger) Print(values ...interface{}) {
+	logger.Info(values...)
+}
+
+type HttpClient struct {
+	// 默认的http client
+	httpClient *http.Client
+	// 重试等待时长
+	retryWaitTime time.Duration
+	// 重试次数
+	retryCount int
+	// 重试判定策略
+	canRetry CheckRetry
+	// logger
+	logger logger
+	// request time out seconds
+	timeOut time.Duration
+}
+
+// NewHttpClient new http client with retry
+func NewHttpClient() *HttpClient {
+	return &HttpClient{
+		httpClient: &http.Client{
+			Timeout: defaultRequestTimeout,
+		},
+		retryWaitTime: defaultRetryWaitTime,
+		retryCount:    defaultRetryCount,
+		canRetry:      DefaultCheckRetry,
+		//logger:        defaultLogger,
+	}
+}
+
+func NewHttpClientWithConfig(timeOut time.Duration, retry int, retryWaitTime time.Duration) *HttpClient {
+	return &HttpClient{
+		httpClient: &http.Client{
+			Timeout: timeOut,
+		},
+		retryWaitTime: retryWaitTime,
+		retryCount:    retry,
+		canRetry:      DefaultCheckRetry,
+	}
+}
+
+// 设置外部注入logger,只要实现print方法
+func (a *HttpClient) SetLogger(l logger) {
+	a.logger = l
+}
+
+// SetLogger
+func SetLogger(l logger) {
+	DefaultClient.SetLogger(l)
+}
+
+// Do do http method with retries
+func (a *HttpClient) Do(req *Request) (Responser, error) {
+	if a.httpClient == nil {
+		a.httpClient = http.DefaultClient
+	}
+	var resp *http.Response
+	var err error
+	for i := 0; ; i++ {
+		var code int
+		if req.body != nil {
+			body, err := req.body()
+			if err != nil {
+				a.httpClient.CloseIdleConnections()
+				return newResponse(resp), err
+			}
+			if c, ok := body.(io.ReadCloser); ok {
+				req.Body = c
+			} else {
+				req.Body = ioutil.NopCloser(body)
+			}
+		}
+		resp, err = a.httpClient.Do(req.Request)
+		if resp != nil {
+			code = resp.StatusCode
+		}
+		checkOk, checkErr := a.canRetry(req.Context(), resp, err)
+		if err != nil {
+			a.logger.Info(fmt.Sprintf("请求出错:[URL]:%s [Method]:%s, [错误]:%s", req.URL, req.Method, err.Error()))
+		}
+		if !checkOk {
+			if checkErr != nil {
+				err = checkErr
+			}
+			a.httpClient.CloseIdleConnections()
+			return newResponse(resp), nil
+		}
+		remain := a.retryCount - i
+		if remain <= 0 {
+			break
+		}
+		//if err == nil &&resp !=nil {
+		//
+		//}
+		desc := fmt.Sprintf("%s %s", req.Method, req.URL)
+		if code > 0 {
+			desc = fmt.Sprintf("%s [status:%d]", desc, code)
+		}
+		a.logger.Info(desc)
+		select {
+		case <-req.Context().Done():
+			a.httpClient.CloseIdleConnections()
+			return nil, req.Context().Err()
+		case <-time.After(a.retryWaitTime):
+		}
+	}
+	if resp != nil {
+		_ = resp.Body.Close()
+	}
+	a.httpClient.CloseIdleConnections()
+	return nil, fmt.Errorf("%s %s giving up after %d attempts",
+		req.Method, req.URL, a.retryCount+1)
+}
+
+func Get(url string) (Responser, error) {
+	return DefaultClient.Get(url)
+}
+
+func (a *HttpClient) Get(url string) (Responser, error) {
+	req, err := NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+	return a.Do(req)
+}
+
+func Post(url string, body interface{}) (Responser, error) {
+	return DefaultClient.Post(url, "application/json", body)
+}
+
+func Put(url string, body interface{}) (Responser, error) {
+	return DefaultClient.Put(url, "application/json", body)
+}
+func Delete(url string, body interface{}) (Responser, error) {
+	return DefaultClient.Delete(url, "application/json", body)
+}
+
+func (a *HttpClient) Post(url, bodyType string, body interface{}) (Responser, error) {
+	w := new(bytes.Buffer)
+	if err := json.NewEncoder(w).Encode(body); err != nil {
+		return nil, err
+	}
+	req, err := NewRequest("POST", url, w)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", bodyType)
+	return a.Do(req)
+}
+func (a *HttpClient) Put(url, bodyType string, body interface{}) (Responser, error) {
+	w := new(bytes.Buffer)
+	if err := json.NewEncoder(w).Encode(body); err != nil {
+		return nil, err
+	}
+	req, err := NewRequest(http.MethodPut, url, w)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", bodyType)
+	return a.Do(req)
+}
+func (a *HttpClient) Delete(url, bodyType string, body interface{}) (Responser, error) {
+	req, err := NewRequest(http.MethodDelete, url, nil)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", bodyType)
+	return a.Do(req)
+}
+
+// GetForObject http get a json obj
+func GetForObject(url string, result interface{}) error {
+	resp, err := Get(url)
+	if err != nil {
+		return err
+	}
+	return ParseResponseJSON(resp, result)
+}

+ 8 - 8
run.sh

@@ -5,15 +5,15 @@ sudo killall -9 httpaccess registry apiprovider devicemanager controller mqttacc
 # start services
 #$GOPATH/bin/httpaccess -etcd http://localhost:2379 -httphost internal:443 -loglevel debug -usehttps -keyfile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/key.pem -cafile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/cert.pem &
 $GOPATH/bin/httpaccess -etcd http://127.0.0.1:2379 -httphost internal:8088 -loglevel debug &
-$GOPATH/bin/registry -etcd http://127.0.0.1:2379 -rpchost localhost:20034 -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP -dbhost 127.0.0.1 -dbname SparrowCloud -dbport 3306 -dbuser SparrowCloud -dbpass 123456 -loglevel debug &
-$GOPATH/bin/apiprovider -etcd http://127.0.0.1:2379 -loglevel debug  -httphost localhost:8888 &
-$GOPATH/bin/devicemanager -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost localhost:20033 &
-$GOPATH/bin/controller -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost localhost:20032 &
+$GOPATH/bin/registry -etcd http://127.0.0.1:2379 -rpchost internal:20034 -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP -dbhost 39.98.250.155 -dbname gxt-iot-db -dbport 3306 -dbuser root -dbpass gEkYDPloQcp93t4WHr3X -loglevel debug &
+$GOPATH/bin/apiprovider -etcd http://127.0.0.1:2379 -loglevel debug  -httphost internal:8888 &
+$GOPATH/bin/devicemanager -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost internal:20033 &
+$GOPATH/bin/controller -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost internal:20032 &
 #$GOPATH/bin/mqttaccess -etcd http://localhost:2379 -loglevel debug  -rpchost localhost:20030 -tcphost internal:1883 -usetls -keyfile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/key.pem -cafile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/cert.pem &
-$GOPATH/bin/mqttaccess -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost localhost:20030 -tcphost internal:1883  &
-$GOPATH/bin/knowoapi -etcd http://127.0.0.1:2379 -loglevel debug  -httphost localhost:8889 -dbhost 127.0.0.1 -dbname SparrowCloud -dbport 3306 -dbuser SparrowCloud -dbpass 123456 -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP &
-$GOPATH/bin/fileaccess -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost localhost:20035 -httphost localhost:9000 &
-$GOPATH/bin/coapaccess -etcd http://127.0.0.1:2379 -loglevel debug  -udphost localhost:56883 &
+$GOPATH/bin/mqttaccess -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost internal:20030 -tcphost internal:1883  &
+$GOPATH/bin/knowoapi -etcd http://127.0.0.1:2379 -loglevel debug  -httphost internal:8889 -dbhost 39.98.250.155 -dbname gxt-iot-db -dbport 3306 -dbuser root -dbpass gEkYDPloQcp93t4WHr3X -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP &
+$GOPATH/bin/fileaccess -etcd http://127.0.0.1:2379 -loglevel debug  -rpchost internal:20035 -httphost internal:9000 &
+$GOPATH/bin/coapaccess -etcd http://127.0.0.1:2379 -loglevel debug  -udphost internal:56883 &
 exit 0
 
 

+ 1 - 1
services/apiprovider/response.go

@@ -19,7 +19,7 @@ type DeviceInfoResponse struct {
 	Data DeviceInfoData `json:"data"`
 }
 
-type DeviceStatusData map[string][]interface{}
+type DeviceStatusData map[string]interface{}
 
 type DeviceStatusResponse struct {
 	Common

+ 125 - 67
services/controller/controller.go

@@ -1,73 +1,56 @@
 package main
 
 import (
+	"encoding/json"
+	"fmt"
+	"github.com/gogf/gf/os/grpool"
+	"github.com/gogf/gf/util/guid"
 	"sparrow/pkg/actors"
-	"sparrow/pkg/mongo"
+	"sparrow/pkg/models"
+	"sparrow/pkg/productconfig"
+	"sparrow/pkg/protocol"
 	"sparrow/pkg/queue"
+	"sparrow/pkg/queue/msgQueue"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/rule"
 	"sparrow/pkg/ruleEngine"
 	"sparrow/pkg/server"
+	"strconv"
 	"time"
 )
 
-const (
-	mongoSetName = "pandocloud"
-	topicEvents  = "events"
-	topicStatus  = "status"
-)
-
 type Controller struct {
-	commandRecorder *mongo.Recorder
-	eventRecorder   *mongo.Recorder
-	dataRecorder    *mongo.Recorder
-	eventsQueue     *queue.Queue
-	statusQueue     *queue.Queue
-	timer           *rule.Timer
-	ift             *rule.Ifttt
+	producer     queue.QueueProducer
+	timer        *rule.Timer
+	ift          *rule.Ifttt
+	actorContext *ruleEngine.SystemContext
+	consumer     queue.QueueConsumer
+	cluster *ClusterService
+	pool *grpool.Pool
 }
 
-func NewController(mongohost string, rabbithost string) (*Controller, error) {
-	cmdr, err := mongo.NewRecorder(mongohost, mongoSetName, "commands")
-	if err != nil {
-		return nil, err
-	}
-
-	ever, err := mongo.NewRecorder(mongohost, mongoSetName, "events")
-	if err != nil {
-		return nil, err
-	}
-
-	datar, err := mongo.NewRecorder(mongohost, mongoSetName, "datas")
-	if err != nil {
-		return nil, err
-	}
-
-	eq, err := queue.New(rabbithost, topicEvents)
-	if err != nil {
-		return nil, err
-	}
-
-	sq, err := queue.New(rabbithost, topicStatus)
-	if err != nil {
-		return nil, err
-	}
-
+func NewController(rabbithost string) (*Controller, error) {
+	admin := msgQueue.NewRabbitMessageQueueAdmin(&msgQueue.RabbitMqSettings{Host: rabbithost}, nil)
+	producer := msgQueue.NewRabbitMqProducer(admin, "default")
+	consumer := msgQueue.NewRabbitConsumer(admin, "MAIN")
+	_ = consumer.Subscribe()
 	// timer
 	t := rule.NewTimer()
 	t.Run()
 
 	// ifttt
 	ttt := rule.NewIfttt()
+	if err := producer.Init(); err != nil {
+		return nil, err
+	}
 
 	return &Controller{
-		commandRecorder: cmdr,
-		eventRecorder:   ever,
-		dataRecorder:    datar,
-		eventsQueue:     eq,
-		statusQueue:     sq,
-		timer:           t,
-		ift:             ttt,
+		producer: producer,
+		timer:    t,
+		ift:      ttt,
+		consumer: consumer,
+		cluster:&ClusterService{producer:producer},
+		pool:     grpool.New(),
 	}, nil
 }
 
@@ -90,15 +73,68 @@ func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStat
 }
 
 func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus) error {
-	err := c.dataRecorder.Insert(args)
+	t := time.Unix(int64(args.Timestamp/1000), 0)
+	data, err := c.processStatusToQueue(args)
 	if err != nil {
 		return err
 	}
-	err = c.statusQueue.Send(args)
+	msg := &protocol.Message{
+		Id:       guid.S(),
+		Ts:       &t,
+		Type:     protocol.POST_ATTRIBUTES_REQUEST,
+		Data:     data,
+		Callback: nil,
+		MetaData: map[string]interface{}{
+			"tenant_id": "tenant_1",
+			"device_id": args.DeviceId,
+		},
+		Originator: "device",
+	}
+	tpi := queue.ResolvePartition("RULE_ENGINE",
+		msg.GetQueueName(),
+		"tenant_1",
+		strconv.Itoa(int(args.DeviceId)))
+	g, err := queue.NewGobQueueMessage(msg)
 	if err != nil {
 		return err
 	}
-	return nil
+	return c.producer.Send(tpi, g, nil)
+}
+
+func (c *Controller) processStatusToQueue(args rpcs.ArgsOnStatus) (string, error) {
+	var result string
+	device := &models.Device{}
+	err := server.RPCCallByName(nil, "registry", "Registry.FindDeviceById", &rpcs.ArgsDeviceAuth{DeviceID: int64(args.DeviceId)}, device)
+	if err != nil {
+		server.Log.Errorf("find device error : %v", err)
+		return result, err
+	}
+
+	product := &models.Product{}
+	err = server.RPCCallByName(nil, "registry", "Registry.FindProduct", device.ProductID, product)
+	if err != nil {
+		server.Log.Errorf("find product error : %v", err)
+		return result, err
+	}
+
+	pc, err := productconfig.New(product.ProductConfig)
+	if err != nil {
+		server.Log.Errorf("product config error : %v", err)
+		return result, err
+	}
+	ev := &protocol.Data{}
+	ev.SubData = args.Subdata
+	m, err := pc.StatusToMap(ev.SubData)
+	if err != nil {
+		server.Log.Errorf("gen status json error : %v", err)
+		return result, err
+	}
+	b, err := json.Marshal(&m)
+	if err != nil {
+		server.Log.Errorf("marshal json error : %v", err)
+	}
+	result = string(b)
+	return result, nil
 }
 
 func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) error {
@@ -109,14 +145,6 @@ func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) er
 		}
 	}()
 
-	err := c.eventRecorder.Insert(args)
-	if err != nil {
-		return err
-	}
-	err = c.eventsQueue.Send(args)
-	if err != nil {
-		return err
-	}
 	return nil
 }
 
@@ -146,23 +174,29 @@ type ActorSystem struct {
 	rootActor ruleEngine.Ref
 }
 
-func initActorSystem() (*ActorSystem, error) {
-	actorContext := new(ruleEngine.SystemContext)
+
+// 初始化actor system
+func (c *Controller) initActorSystem() (*ActorSystem, error) {
+
 	system := ruleEngine.NewDefaultActorSystem(&ruleEngine.DefaultActorSystemConfig{
 		SchedulerPoolSize:            5,
 		AppDispatcherPoolSize:        4,
 		TenantDispatcherPoolSize:     4,
 		RuleEngineDispatcherPoolSize: 4,
 	})
-	system.CreateDispatcher(ruleEngine.APP_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(5))
-	system.CreateDispatcher(ruleEngine.TENANT_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(4))
-	system.CreateDispatcher(ruleEngine.RULE_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(4))
-	actorContext.ActorSystem = system
+	_ = system.CreateDispatcher(ruleEngine.APP_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(5))
+	_ = system.CreateDispatcher(ruleEngine.TENANT_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(4))
+	_ = system.CreateDispatcher(ruleEngine.RULE_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(4))
+
 	// init services
 	tenantService := &ruleEngine.TestTenantService{}
-	rulechainService := &ruleEngine.TestRuleChainService{}
-	actorContext.TenantService = tenantService
-	actorContext.RuleChainService = rulechainService
+	ruleChainService := &ruleEngine.TestRuleChainService{}
+	actorContext := ruleEngine.NewSystemContext(system, ruleEngine.SystemContextServiceConfig{
+		ClusterService:   c.cluster,
+		RuleChainService: ruleChainService,
+		TenantService:    tenantService,
+
+	})
 	appActor, err := system.CreateRootActor(ruleEngine.APP_DISPATCHER_NAME,
 		actors.NewAppActorCreator(actorContext))
 	if err != nil {
@@ -172,5 +206,29 @@ func initActorSystem() (*ActorSystem, error) {
 	server.Log.Debugln("actor system initialized")
 	time.Sleep(time.Second * 1)
 	appActor.Tell(&ruleEngine.AppInitMsg{})
+	c.actorContext = actorContext
 	return &ActorSystem{rootActor: appActor}, nil
 }
+// 启动mq consumers
+func (c *Controller) launchConsumer() {
+	msgs, err := c.consumer.Pop(100 * time.Millisecond)
+	if err != nil {
+		server.Log.Error(err)
+	}
+	for {
+		select {
+		case msg := <-msgs:
+			ruleEngineMsg := &protocol.Message{}
+			if err := ruleEngineMsg.Decode(msg.GetData()); err != nil {
+				fmt.Println("解析消息失败")
+			}
+			fmt.Println(ruleEngineMsg.Data)
+			if c.actorContext != nil {
+				c.actorContext.Tell(&ruleEngine.QueueToRuleEngineMsg{
+					TenantId: "tenant_1",
+					Message:  ruleEngineMsg,
+				})
+			}
+		}
+	}
+}

+ 0 - 36
services/controller/controller_test.go

@@ -1,36 +0,0 @@
-package main
-
-import (
-	"sparrow/pkg/protocol"
-	"sparrow/pkg/ruleEngine"
-	"sparrow/pkg/server"
-	"testing"
-	"time"
-)
-
-func TestInit(t *testing.T) {
-	server.InitLog("test", "debug")
-	as, err := initActorSystem()
-	if err != nil {
-		t.Error(err)
-	}
-	time.Sleep(5 * time.Second)
-	as.rootActor.Tell(&ruleEngine.QueueToRuleEngineMsg{
-		TenantId: "tenant_1",
-		Message: &protocol.Message{
-			QueueName:  "",
-			Id:         "123",
-			Ts:         nil,
-			Type:       "POST_ATTRIBUTES_REQUEST",
-			Data:       "",
-			RuleChanId: "",
-			RuleNodeId: "",
-			Callback:   nil,
-			MetaData:   nil,
-			Originator: "",
-		},
-		RelationTypes:  nil,
-		FailureMessage: nil,
-	})
-	select {}
-}

+ 1 - 5
services/controller/flags.go

@@ -5,14 +5,10 @@ import (
 )
 
 const (
-	flagMongoHost    = "mongohost"
-	defaultMongoHost = "192.168.175.60:27017"
-
 	flagRabbitHost    = "rabbithost"
-	defaultRabbitHost = "amqp://knowocloud:123456@192.168.175.60:5672/"
+	defaultRabbitHost = "amqp://guest:guest@localhost:5672/"
 )
 
 var (
-	confMongoHost  = flag.String(flagMongoHost, defaultMongoHost, "mongo host address, ip:port")
 	confRabbitHost = flag.String(flagRabbitHost, defaultRabbitHost, "rabbitmq host address, amqp://user:password@ip:port/")
 )

+ 7 - 1
services/controller/main.go

@@ -13,11 +13,17 @@ func main() {
 	}
 
 	// register a rpc service
-	controller, err := NewController(*confMongoHost, *confRabbitHost)
+	controller, err := NewController(*confRabbitHost)
 	if err != nil {
 		server.Log.Errorf("NewController Error: %s", err)
 		return
 	}
+	_, err =controller.initActorSystem()
+	if err != nil {
+		server.Log.Errorf("NewController Error: %s", err)
+		return
+	}
+	go controller.launchConsumer()
 
 	err = server.RegisterRPCHandler(controller)
 	if err != nil {

+ 51 - 0
services/controller/service.go

@@ -0,0 +1,51 @@
+package main
+
+import (
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/queue"
+	"sparrow/pkg/server"
+)
+
+// ClusterService 集群服务
+type ClusterService struct {
+	producer queue.QueueProducer
+}
+
+func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback) {
+	m, err := queue.NewGobQueueMessage(msg)
+	if err != nil {
+		return
+	}
+	if err :=c.producer.Send(info, m, callback);err != nil {
+		server.Log.Error(err)
+	}
+}
+
+//type TenantService struct {
+//
+//}
+//
+//func (t *TenantService) FindTenants() ([]*ruleEngine.Tenant, error) {
+//	var arg int
+//	var reply []*models.Vendor
+//	var result []*ruleEngine.Tenant
+//	err :=server.RPCCallByName(context.Background(), "registry", "registry.GetVendors", &arg, &reply)
+//	if err != nil {
+//		return nil, err
+//	}
+//	for _, vendor := range reply {
+//		result = append(result, &ruleEngine.Tenant{
+//			Id: vendor.RecordId,
+//		})
+//	}
+//	return result, nil
+//}
+//
+//func (t *TenantService) GetTenant(tId string) (*ruleEngine.Tenant, error) {
+//	var reply *models.Vendor
+//	err := server.
+//}
+
+
+
+

+ 1 - 1
services/devicemanager/flags.go

@@ -7,7 +7,7 @@ import (
 const (
 	flagRedisHost = "redishost"
 
-	defaultRedisHost = "192.168.175.60:6379"
+	defaultRedisHost = "127.0.0.1:6379"
 )
 
 var (

+ 1 - 1
services/knowoapi/model/device_test.go

@@ -105,7 +105,7 @@ func newDevice(t *testing.T) *Device {
 }
 
 func getDB() (*gorm.DB, error) {
-	db, err := mysql.GetClient("192.168.175.60", "3306", "SparrowCloud", "SparrowCloud", "123456")
+	db, err := mysql.GetClient("39.98.250.155", "3306", "gxt-iot-db", "root", "gEkYDPloQcp93t4WHr3X")
 	if err != nil {
 		return nil, err
 	}

+ 2 - 0
services/knowoapi/model/vendor.go

@@ -1,6 +1,7 @@
 package model
 
 import (
+	"github.com/gogf/gf/util/guid"
 	"sparrow/pkg/models"
 
 	"github.com/jinzhu/gorm"
@@ -19,6 +20,7 @@ func (a *Vendor) Init(db *gorm.DB) *Vendor {
 
 // Create 增加
 func (a *Vendor) Create(vendor *models.Vendor) error {
+	vendor.RecordId = guid.S()
 	return a.db.Save(vendor).Error
 }
 

+ 1 - 1
services/registry/registry.go

@@ -216,7 +216,7 @@ func (r *Registry) FindVendor(id int32, reply *models.Vendor) error {
 }
 
 // GetVendors will get all vendors in the platform.
-func (r *Registry) GetVendors(noarg int, reply *[]models.Vendor) error {
+func (r *Registry) GetVendors(noarg int, reply []*models.Vendor) error {
 	db, err := getDB()
 	if err != nil {
 		return err

+ 1 - 0
services/registry/vendor.go

@@ -0,0 +1 @@
+package main

BIN
tests/device/device


+ 12 - 9
tests/device/device.go

@@ -9,6 +9,7 @@ import (
 	"os"
 	"sparrow/pkg/coap"
 	"sparrow/pkg/protocol"
+	"sparrow/pkg/server"
 	"sparrow/pkg/tlv"
 	"time"
 
@@ -164,14 +165,13 @@ func (d *Device) DoLogin() error {
 }
 
 func (d *Device) reportStatus(client *MQTT.Client) {
-
 	for {
-		time.Sleep(10 * time.Second)
+		time.Sleep(2 * time.Second)
 		payloadHead := protocol.DataHead{
 			Flag:      0,
 			Timestamp: uint64(time.Now().Unix() * 1000),
 		}
-		param := []interface{}{1}
+		param := []interface{}{"lijian"}
 		params, err := tlv.MakeTLVs(param)
 		if err != nil {
 			fmt.Println(err)
@@ -179,9 +179,10 @@ func (d *Device) reportStatus(client *MQTT.Client) {
 		}
 		sub := protocol.SubData{
 			Head: protocol.SubDataHead{
-				SubDeviceid: uint16(1),
+				SubDeviceid: uint16(225),
 				PropertyNum: uint16(1),
 				ParamsCount: uint16(len(params)),
+				ExternalDeviceId: [8]byte{0x01},
 			},
 			Params: params,
 		}
@@ -198,7 +199,6 @@ func (d *Device) reportStatus(client *MQTT.Client) {
 			fmt.Println(err)
 			return
 		}
-
 		client.Publish("s", 1, false, payload)
 	}
 
@@ -383,12 +383,15 @@ func (d *Device) doMQTTAccess() error {
 
 	//create and start a client using the above ClientOptions
 	c := MQTT.NewClient(opts)
-	if token := c.Connect(); token.Wait() && token.Error() != nil {
-		return token.Error()
-	}
+	go func() {
+		if token := c.Connect(); token.Wait() && token.Error() != nil {
+			server.Log.Error(token.Error())
+			return
+		}
+	}()
 	go d.reportStatus(c)
 	// we just pause here to wait for messages
-	<-make(chan int)
+	 <-make(chan int)
 
 	defer c.Disconnect(250)
 

+ 1 - 2
tests/device/main.go

@@ -6,7 +6,7 @@ import (
 )
 
 var (
-	testURL        = flag.String("url", "http://192.168.175.60:8088", "login url")
+	testURL        = flag.String("url", "http://192.168.1.107:8088", "login url")
 	testProductKey = flag.String("productkey", "99b11b395c84435202692e36dada175c7af9452038a62a40b230b5e18b7d51ff", "product key")
 	testProtocol   = flag.String("protocol", "mqtt", "access protocol")
 )
@@ -32,7 +32,6 @@ func main() {
 		fmt.Printf("device login error %s", err)
 		return
 	}
-
 	err = dev.DoAccess()
 	if err != nil {
 		fmt.Printf("device access error %s", err)