123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- package protocol
- import (
- "bytes"
- "encoding/gob"
- "time"
- )
// MessageSerializer describes a type that can encode itself into a
// bytes.Buffer and decode itself in place.
//
// NOTE(review): the Encode and Decode methods declared on *Message in this
// file have different signatures (Encode() ([]byte, error) and
// Decode([]byte) error), so *Message does not satisfy this interface —
// confirm which contract is the intended one.
type MessageSerializer interface {
	// Encode returns the serialized form of the receiver.
	Encode() bytes.Buffer
	// Decode restores the receiver; the signature takes no input and reports
	// no error, so the data source is implementation-defined.
	Decode()
}
// Message is the data record defined by this package. It is serialized with
// encoding/gob by Encode and Decode, and copied by CopyWithRuleChainId and
// CopyWithRuleNodeId.
type Message struct {
	// QueueName is the name of the queue for this message. GetQueueName
	// returns "MAIN" when this field is empty.
	QueueName string
	// Id is presumably the unique identifier of the message — confirm.
	Id string
	// Ts is presumably the message timestamp — confirm. It is a pointer, so
	// copies produced by the CopyWith* helpers share the same time value.
	Ts *time.Time
	// Type is presumably a message type discriminator — confirm.
	Type string
	// Data is the message payload carried as a string.
	Data string
	// RuleChanId is the rule chain id; CopyWithRuleChainId overrides it.
	// Note the field is spelled "Chan", not "Chain".
	RuleChanId string
	// RuleNodeId is the rule node id; CopyWithRuleNodeId overrides it.
	RuleNodeId string
	// Callback is the optional callback attached to the message. It may be
	// nil; GetCallBack substitutes EmptyCallBack in that case.
	Callback IMessageCallBack
	// MetaData holds free-form key/value pairs. Being a map, it is shared
	// (not cloned) by the CopyWith* helpers.
	MetaData map[string]interface{}
	// Originator is presumably the source entity of the message — confirm.
	Originator string
}
- func (a *Message) Encode() ([]byte, error) {
- var network bytes.Buffer
- enc := gob.NewEncoder(&network)
- err := enc.Encode(a)
- if err != nil {
- return nil, err
- }
- return network.Bytes(), nil
- }
- func (a *Message) Decode(data []byte) error {
- var network bytes.Buffer
- network.Write(data)
- dec := gob.NewDecoder(&network)
- return dec.Decode(a)
- }
// IMessageCallBack is the set of hooks a Message can carry in its Callback
// field. When a message has no callback, GetCallBack hands out EmptyCallBack,
// whose hooks do nothing.
type IMessageCallBack interface {
	// OnSuccess is the hook for successful handling of the message.
	OnSuccess()
	// OnFailure is the hook for failed handling; err describes the failure.
	OnFailure(err error)
	// OnProcessingStart is the hook for the start of processing; it receives
	// the RuleNodeInfo of the node involved (presumably the node that begins
	// processing — confirm against callers).
	OnProcessingStart(ruleNodeInfo *RuleNodeInfo)
	// OnProcessingEnd is the hook for the end of processing; it receives the
	// id of the rule node involved.
	OnProcessingEnd(ruleNodeId string)
}
- type emptyCallBack struct {
- }
- func (e emptyCallBack) OnProcessingStart(ruleNodeInfo *RuleNodeInfo) {
- }
- func (e emptyCallBack) OnProcessingEnd(ruleNodeId string) {
- }
- var EmptyCallBack = emptyCallBack{}
- func (e emptyCallBack) OnSuccess() {
- }
- func (e emptyCallBack) OnFailure(err error) {
- }
- // SetCallBack
- func (a *Message) SetCallBack(callback IMessageCallBack) {
- a.Callback = callback
- }
- // GetCallBack get message call back
- func (a *Message) GetCallBack() IMessageCallBack {
- if a.Callback == nil {
- return EmptyCallBack
- } else {
- return a.Callback
- }
- }
- func (a *Message) GetQueueName() string {
- if a.QueueName == "" {
- return "MAIN"
- }
- return a.QueueName
- }
- func (a *Message) CopyWithRuleChainId(ruleChainId string) *Message {
- return &Message{
- QueueName: a.QueueName,
- Id: a.Id,
- Ts: a.Ts,
- Type: a.Type,
- Data: a.Data,
- RuleChanId: ruleChainId,
- RuleNodeId: a.RuleNodeId,
- Callback: a.Callback,
- MetaData: a.MetaData,
- Originator: a.Originator,
- }
- }
- func (a *Message) CopyWithRuleNodeId(ruleNodeId string) *Message {
- return &Message{
- QueueName: a.QueueName,
- Id: a.Id,
- Ts: a.Ts,
- Type: a.Type,
- Data: a.Data,
- RuleChanId: a.RuleChanId,
- RuleNodeId: ruleNodeId,
- Callback: a.Callback,
- MetaData: a.MetaData,
- Originator: a.Originator,
- }
- }
|