123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- /*
- * Copyright (c) 2014 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Allan Stockdill-Mander
- */
- package mqtt
- import (
- "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets"
- "sync"
- "time"
- )
- //PacketAndToken is a struct that contains both a ControlPacket and a
- //Token. This struct is passed via channels between the client interface
- //code and the underlying code responsible for sending and receiving
- //MQTT messages.
- type PacketAndToken struct {
- p packets.ControlPacket
- t Token
- }
- //Token defines the interface for the tokens used to indicate when
- //actions have completed.
- type Token interface {
- Wait() bool
- WaitTimeout(time.Duration) bool
- flowComplete()
- Error() error
- }
- type baseToken struct {
- m sync.RWMutex
- complete chan struct{}
- ready bool
- err error
- }
- // Wait will wait indefinitely for the Token to complete, ie the Publish
- // to be sent and confirmed receipt from the broker
- func (b *baseToken) Wait() bool {
- b.m.Lock()
- defer b.m.Unlock()
- if !b.ready {
- <-b.complete
- b.ready = true
- }
- return b.ready
- }
- // WaitTimeout takes a time in ms to wait for the flow associated with the
- // Token to complete, returns true if it returned before the timeout or
- // returns false if the timeout occurred. In the case of a timeout the Token
- // does not have an error set in case the caller wishes to wait again
- func (b *baseToken) WaitTimeout(d time.Duration) bool {
- b.m.Lock()
- defer b.m.Unlock()
- if !b.ready {
- select {
- case <-b.complete:
- b.ready = true
- case <-time.After(d):
- }
- }
- return b.ready
- }
- func (b *baseToken) flowComplete() {
- close(b.complete)
- }
- func (b *baseToken) Error() error {
- b.m.RLock()
- defer b.m.RUnlock()
- return b.err
- }
- func newToken(tType byte) Token {
- switch tType {
- case packets.Connect:
- return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}}
- case packets.Subscribe:
- return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)}
- case packets.Publish:
- return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}}
- case packets.Unsubscribe:
- return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}}
- case packets.Disconnect:
- return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}}
- }
- return nil
- }
- //ConnectToken is an extension of Token containing the extra fields
- //required to provide information about calls to Connect()
- type ConnectToken struct {
- baseToken
- returnCode byte
- }
- //ReturnCode returns the acknowlegement code in the connack sent
- //in response to a Connect()
- func (c *ConnectToken) ReturnCode() byte {
- c.m.RLock()
- defer c.m.RUnlock()
- return c.returnCode
- }
- //PublishToken is an extension of Token containing the extra fields
- //required to provide information about calls to Publish()
- type PublishToken struct {
- baseToken
- messageID uint16
- }
- //MessageID returns the MQTT message ID that was assigned to the
- //Publish packet when it was sent to the broker
- func (p *PublishToken) MessageID() uint16 {
- return p.messageID
- }
- //SubscribeToken is an extension of Token containing the extra fields
- //required to provide information about calls to Subscribe()
- type SubscribeToken struct {
- baseToken
- subs []string
- subResult map[string]byte
- }
- //Result returns a map of topics that were subscribed to along with
- //the matching return code from the broker. This is either the Qos
- //value of the subscription or an error code.
- func (s *SubscribeToken) Result() map[string]byte {
- s.m.RLock()
- defer s.m.RUnlock()
- return s.subResult
- }
- //UnsubscribeToken is an extension of Token containing the extra fields
- //required to provide information about calls to Unsubscribe()
- type UnsubscribeToken struct {
- baseToken
- }
- //DisconnectToken is an extension of Token containing the extra fields
- //required to provide information about calls to Disconnect()
- type DisconnectToken struct {
- baseToken
- }
|