net.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "crypto/tls"
  17. "errors"
  18. "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets"
  19. "golang.org/x/net/websocket"
  20. "net"
  21. "net/url"
  22. "reflect"
  23. "time"
  24. )
  25. func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration) (net.Conn, error) {
  26. switch uri.Scheme {
  27. case "ws":
  28. conn, err := websocket.Dial(uri.String(), "mqtt", "ws://localhost")
  29. if err != nil {
  30. return nil, err
  31. }
  32. conn.PayloadType = websocket.BinaryFrame
  33. return conn, err
  34. case "wss":
  35. config, _ := websocket.NewConfig(uri.String(), "ws://localhost")
  36. config.Protocol = []string{"mqtt"}
  37. config.TlsConfig = tlsc
  38. conn, err := websocket.DialConfig(config)
  39. if err != nil {
  40. return nil, err
  41. }
  42. conn.PayloadType = websocket.BinaryFrame
  43. return conn, err
  44. case "tcp":
  45. conn, err := net.DialTimeout("tcp", uri.Host, timeout)
  46. if err != nil {
  47. return nil, err
  48. }
  49. return conn, nil
  50. case "ssl":
  51. fallthrough
  52. case "tls":
  53. fallthrough
  54. case "tcps":
  55. conn, err := tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", uri.Host, tlsc)
  56. if err != nil {
  57. return nil, err
  58. }
  59. return conn, nil
  60. }
  61. return nil, errors.New("Unknown protocol")
  62. }
  63. // actually read incoming messages off the wire
  64. // send Message object into ibound channel
  65. func incoming(c *Client) {
  66. defer c.workers.Done()
  67. var err error
  68. var cp packets.ControlPacket
  69. DEBUG.Println(NET, "incoming started")
  70. for {
  71. if cp, err = packets.ReadPacket(c.conn); err != nil {
  72. break
  73. }
  74. DEBUG.Println(NET, "Received Message")
  75. c.ibound <- cp
  76. }
  77. // We received an error on read.
  78. // If disconnect is in progress, swallow error and return
  79. select {
  80. case <-c.stop:
  81. DEBUG.Println(NET, "incoming stopped")
  82. return
  83. // Not trying to disconnect, send the error to the errors channel
  84. default:
  85. ERROR.Println(NET, "incoming stopped with error")
  86. c.errors <- err
  87. return
  88. }
  89. }
  90. // receive a Message object on obound, and then
  91. // actually send outgoing message to the wire
  92. func outgoing(c *Client) {
  93. defer c.workers.Done()
  94. DEBUG.Println(NET, "outgoing started")
  95. for {
  96. DEBUG.Println(NET, "outgoing waiting for an outbound message")
  97. select {
  98. case <-c.stop:
  99. DEBUG.Println(NET, "outgoing stopped")
  100. return
  101. case pub := <-c.obound:
  102. msg := pub.p.(*packets.PublishPacket)
  103. if msg.Qos != 0 && msg.MessageID == 0 {
  104. msg.MessageID = c.getID(pub.t)
  105. pub.t.(*PublishToken).messageID = msg.MessageID
  106. }
  107. //persist_obound(c.persist, msg)
  108. if c.options.WriteTimeout > 0 {
  109. c.conn.SetWriteDeadline(time.Now().Add(c.options.WriteTimeout))
  110. }
  111. if err := msg.Write(c.conn); err != nil {
  112. ERROR.Println(NET, "outgoing stopped with error")
  113. c.errors <- err
  114. return
  115. }
  116. if c.options.WriteTimeout > 0 {
  117. // If we successfully wrote, we don't want the timeout to happen during an idle period
  118. // so we reset it to infinite.
  119. c.conn.SetWriteDeadline(time.Time{})
  120. }
  121. if msg.Qos == 0 {
  122. pub.t.flowComplete()
  123. }
  124. c.lastContact.update()
  125. DEBUG.Println(NET, "obound wrote msg, id:", msg.MessageID)
  126. case msg := <-c.oboundP:
  127. switch msg.p.(type) {
  128. case *packets.SubscribePacket:
  129. msg.p.(*packets.SubscribePacket).MessageID = c.getID(msg.t)
  130. case *packets.UnsubscribePacket:
  131. msg.p.(*packets.UnsubscribePacket).MessageID = c.getID(msg.t)
  132. }
  133. DEBUG.Println(NET, "obound priority msg to write, type", reflect.TypeOf(msg.p))
  134. if err := msg.p.Write(c.conn); err != nil {
  135. ERROR.Println(NET, "outgoing stopped with error")
  136. c.errors <- err
  137. return
  138. }
  139. c.lastContact.update()
  140. switch msg.p.(type) {
  141. case *packets.DisconnectPacket:
  142. msg.t.(*DisconnectToken).flowComplete()
  143. DEBUG.Println(NET, "outbound wrote disconnect, stopping")
  144. return
  145. }
  146. }
  147. }
  148. }
  149. // receive Message objects on ibound
  150. // store messages if necessary
  151. // send replies on obound
  152. // delete messages from store if necessary
  153. func alllogic(c *Client) {
  154. DEBUG.Println(NET, "logic started")
  155. for {
  156. DEBUG.Println(NET, "logic waiting for msg on ibound")
  157. select {
  158. case msg := <-c.ibound:
  159. DEBUG.Println(NET, "logic got msg on ibound")
  160. //persist_ibound(c.persist, msg)
  161. switch msg.(type) {
  162. case *packets.PingrespPacket:
  163. DEBUG.Println(NET, "received pingresp")
  164. c.pingOutstanding = false
  165. case *packets.SubackPacket:
  166. sa := msg.(*packets.SubackPacket)
  167. DEBUG.Println(NET, "received suback, id:", sa.MessageID)
  168. token := c.getToken(sa.MessageID).(*SubscribeToken)
  169. DEBUG.Println(NET, "granted qoss", sa.GrantedQoss)
  170. for i, qos := range sa.GrantedQoss {
  171. token.subResult[token.subs[i]] = qos
  172. }
  173. token.flowComplete()
  174. go c.freeID(sa.MessageID)
  175. case *packets.UnsubackPacket:
  176. ua := msg.(*packets.UnsubackPacket)
  177. DEBUG.Println(NET, "received unsuback, id:", ua.MessageID)
  178. token := c.getToken(ua.MessageID).(*UnsubscribeToken)
  179. token.flowComplete()
  180. go c.freeID(ua.MessageID)
  181. case *packets.PublishPacket:
  182. pp := msg.(*packets.PublishPacket)
  183. DEBUG.Println(NET, "received publish, msgId:", pp.MessageID)
  184. DEBUG.Println(NET, "putting msg on onPubChan")
  185. switch pp.Qos {
  186. case 2:
  187. c.incomingPubChan <- pp
  188. DEBUG.Println(NET, "done putting msg on incomingPubChan")
  189. pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
  190. pr.MessageID = pp.MessageID
  191. DEBUG.Println(NET, "putting pubrec msg on obound")
  192. c.oboundP <- &PacketAndToken{p: pr, t: nil}
  193. DEBUG.Println(NET, "done putting pubrec msg on obound")
  194. case 1:
  195. c.incomingPubChan <- pp
  196. DEBUG.Println(NET, "done putting msg on incomingPubChan")
  197. pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
  198. pa.MessageID = pp.MessageID
  199. DEBUG.Println(NET, "putting puback msg on obound")
  200. c.oboundP <- &PacketAndToken{p: pa, t: nil}
  201. DEBUG.Println(NET, "done putting puback msg on obound")
  202. case 0:
  203. select {
  204. case c.incomingPubChan <- pp:
  205. DEBUG.Println(NET, "done putting msg on incomingPubChan")
  206. case err, ok := <-c.errors:
  207. DEBUG.Println(NET, "error while putting msg on pubChanZero")
  208. // We are unblocked, but need to put the error back on so the outer
  209. // select can handle it appropriately.
  210. if ok {
  211. go func(errVal error, errChan chan error) {
  212. errChan <- errVal
  213. }(err, c.errors)
  214. }
  215. }
  216. }
  217. case *packets.PubackPacket:
  218. pa := msg.(*packets.PubackPacket)
  219. DEBUG.Println(NET, "received puback, id:", pa.MessageID)
  220. // c.receipts.get(msg.MsgId()) <- Receipt{}
  221. // c.receipts.end(msg.MsgId())
  222. c.getToken(pa.MessageID).flowComplete()
  223. c.freeID(pa.MessageID)
  224. case *packets.PubrecPacket:
  225. prec := msg.(*packets.PubrecPacket)
  226. DEBUG.Println(NET, "received pubrec, id:", prec.MessageID)
  227. prel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket)
  228. prel.MessageID = prec.MessageID
  229. select {
  230. case c.oboundP <- &PacketAndToken{p: prel, t: nil}:
  231. case <-time.After(time.Second):
  232. }
  233. case *packets.PubrelPacket:
  234. pr := msg.(*packets.PubrelPacket)
  235. DEBUG.Println(NET, "received pubrel, id:", pr.MessageID)
  236. pc := packets.NewControlPacket(packets.Pubcomp).(*packets.PubcompPacket)
  237. pc.MessageID = pr.MessageID
  238. select {
  239. case c.oboundP <- &PacketAndToken{p: pc, t: nil}:
  240. case <-time.After(time.Second):
  241. }
  242. case *packets.PubcompPacket:
  243. pc := msg.(*packets.PubcompPacket)
  244. DEBUG.Println(NET, "received pubcomp, id:", pc.MessageID)
  245. c.getToken(pc.MessageID).flowComplete()
  246. c.freeID(pc.MessageID)
  247. }
  248. case <-c.stop:
  249. WARN.Println(NET, "logic stopped")
  250. return
  251. case err := <-c.errors:
  252. ERROR.Println(NET, "logic got error")
  253. c.internalConnLost(err)
  254. return
  255. }
  256. c.lastContact.update()
  257. }
  258. }