package protocol import ( "bytes" "encoding/gob" "sparrow/pkg/server" "sync/atomic" "time" ) type Message struct { QueueName string Id string Ts *time.Time Type string Data string RuleChanId string RuleNodeId string Callback IMessageCallBack MetaData map[string]interface{} Originator string execCounter int32 } func (a *Message) TransformMsg(msgType string, ori string, data string) *Message { return &Message{ QueueName: a.QueueName, Id: a.Id, Ts: a.Ts, Type: msgType, Data: data, RuleChanId: a.RuleChanId, RuleNodeId: a.RuleNodeId, Callback: a.Callback, MetaData: a.MetaData, Originator: ori, } } func (a *Message) GetAndIncrementRuleNodeCounter() int32 { return atomic.AddInt32(&a.execCounter, 1) } func (a *Message) Encode() ([]byte, error) { var network bytes.Buffer enc := gob.NewEncoder(&network) err := enc.Encode(a) if err != nil { return nil, err } return network.Bytes(), nil } func (a *Message) Decode(data []byte) error { var network bytes.Buffer network.Write(data) dec := gob.NewDecoder(&network) return dec.Decode(a) } // IMessageCallBack message call back type IMessageCallBack interface { // on success do sth. OnSuccess() // on failure do sth. OnFailure(err error) // on process start do sth. OnProcessingStart(ruleNodeInfo *RuleNodeInfo) // on process end do sth. OnProcessingEnd(ruleNodeId string) } type emptyCallBack struct { } func (e emptyCallBack) OnProcessingStart(ruleNodeInfo *RuleNodeInfo) { } func (e emptyCallBack) OnProcessingEnd(ruleNodeId string) { } var EmptyCallBack = emptyCallBack{} func (e emptyCallBack) OnSuccess() { } func (e emptyCallBack) OnFailure(err error) { server.Log.Error("消息出错:" + err.Error()) } // SetCallBack func (a *Message) SetCallBack(callback IMessageCallBack) { a.Callback = callback } // GetCallBack get message call back func (a *Message) GetCallBack() IMessageCallBack { if a.Callback == nil { return EmptyCallBack } else { return a.Callback } } func (a *Message) GetQueueName() string { if a.QueueName == "" { return "MAIN" } return a.QueueName } func (a *Message) CopyWithRuleChainId(ruleChainId string) *Message { return &Message{ QueueName: a.QueueName, Id: a.Id, Ts: a.Ts, Type: a.Type, Data: a.Data, RuleChanId: ruleChainId, RuleNodeId: a.RuleNodeId, Callback: a.Callback, MetaData: a.MetaData, Originator: a.Originator, } } func (a *Message) CopyWithRuleNodeId(ruleNodeId string) *Message { return &Message{ QueueName: a.QueueName, Id: a.Id, Ts: a.Ts, Type: a.Type, Data: a.Data, RuleChanId: a.RuleChanId, RuleNodeId: ruleNodeId, Callback: a.Callback, MetaData: a.MetaData, Originator: a.Originator, } }