router.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. */
  18. package mqtt
  19. import (
  20. "container/list"
  21. "strings"
  22. "sync"
  23. "github.com/eclipse/paho.mqtt.golang/packets"
  24. )
  25. // route is a type which associates MQTT Topic strings with a
  26. // callback to be executed upon the arrival of a message associated
  27. // with a subscription to that topic.
  28. type route struct {
  29. topic string
  30. callback MessageHandler
  31. }
  32. // match takes a slice of strings which represent the route being tested having been split on '/'
  33. // separators, and a slice of strings representing the topic string in the published message, similarly
  34. // split.
  35. // The function determines if the topic string matches the route according to the MQTT topic rules
  36. // and returns a boolean of the outcome
  37. func match(route []string, topic []string) bool {
  38. if len(route) == 0 {
  39. return len(topic) == 0
  40. }
  41. if len(topic) == 0 {
  42. return route[0] == "#"
  43. }
  44. if route[0] == "#" {
  45. return true
  46. }
  47. if (route[0] == "+") || (route[0] == topic[0]) {
  48. return match(route[1:], topic[1:])
  49. }
  50. return false
  51. }
  52. func routeIncludesTopic(route, topic string) bool {
  53. return match(routeSplit(route), strings.Split(topic, "/"))
  54. }
  55. // removes $share and sharename when splitting the route to allow
  56. // shared subscription routes to correctly match the topic
  57. func routeSplit(route string) []string {
  58. var result []string
  59. if strings.HasPrefix(route, "$share") {
  60. result = strings.Split(route, "/")[2:]
  61. } else {
  62. result = strings.Split(route, "/")
  63. }
  64. return result
  65. }
  66. // match takes the topic string of the published message and does a basic compare to the
  67. // string of the current Route, if they match it returns true
  68. func (r *route) match(topic string) bool {
  69. return r.topic == topic || routeIncludesTopic(r.topic, topic)
  70. }
  71. type router struct {
  72. sync.RWMutex
  73. routes *list.List
  74. defaultHandler MessageHandler
  75. messages chan *packets.PublishPacket
  76. }
  77. // newRouter returns a new instance of a Router and channel which can be used to tell the Router
  78. // to stop
  79. func newRouter() *router {
  80. router := &router{routes: list.New(), messages: make(chan *packets.PublishPacket)}
  81. return router
  82. }
  83. // addRoute takes a topic string and MessageHandler callback. It looks in the current list of
  84. // routes to see if there is already a matching Route. If there is it replaces the current
  85. // callback with the new one. If not it add a new entry to the list of Routes.
  86. func (r *router) addRoute(topic string, callback MessageHandler) {
  87. r.Lock()
  88. defer r.Unlock()
  89. for e := r.routes.Front(); e != nil; e = e.Next() {
  90. if e.Value.(*route).topic == topic {
  91. r := e.Value.(*route)
  92. r.callback = callback
  93. return
  94. }
  95. }
  96. r.routes.PushBack(&route{topic: topic, callback: callback})
  97. }
  98. // deleteRoute takes a route string, looks for a matching Route in the list of Routes. If
  99. // found it removes the Route from the list.
  100. func (r *router) deleteRoute(topic string) {
  101. r.Lock()
  102. defer r.Unlock()
  103. for e := r.routes.Front(); e != nil; e = e.Next() {
  104. if e.Value.(*route).topic == topic {
  105. r.routes.Remove(e)
  106. return
  107. }
  108. }
  109. }
  110. // setDefaultHandler assigns a default callback that will be called if no matching Route
  111. // is found for an incoming Publish.
  112. func (r *router) setDefaultHandler(handler MessageHandler) {
  113. r.Lock()
  114. defer r.Unlock()
  115. r.defaultHandler = handler
  116. }
  117. // matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
  118. // takes messages off the channel, matches them against the internal route list and calls the
  119. // associated callback (or the defaultHandler, if one exists and no other route matched). If
  120. // anything is sent down the stop channel the function will end.
  121. func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
  122. var wg sync.WaitGroup
  123. ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
  124. var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel
  125. stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
  126. ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
  127. goRoutinesDone := make(chan struct{}) // closed on wg.Done()
  128. if order {
  129. ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
  130. } else {
  131. // When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
  132. ackInChan = make(chan *PacketAndToken)
  133. go func() { // go routine to copy from ackInChan to ackOutChan until stopped
  134. for {
  135. select {
  136. case a := <-ackInChan:
  137. ackOutChan <- a
  138. case <-stopAckCopy:
  139. close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
  140. for {
  141. select {
  142. case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
  143. DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
  144. case <-goRoutinesDone:
  145. close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
  146. DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
  147. return
  148. }
  149. }
  150. }
  151. }
  152. }()
  153. }
  154. go func() { // Main go routine handling inbound messages
  155. for message := range messages {
  156. // DEBUG.Println(ROU, "matchAndDispatch received message")
  157. sent := false
  158. r.RLock()
  159. m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
  160. var handlers []MessageHandler
  161. for e := r.routes.Front(); e != nil; e = e.Next() {
  162. if e.Value.(*route).match(message.TopicName) {
  163. if order {
  164. handlers = append(handlers, e.Value.(*route).callback)
  165. } else {
  166. hd := e.Value.(*route).callback
  167. wg.Add(1)
  168. go func() {
  169. hd(client, m)
  170. if !client.options.AutoAckDisabled {
  171. m.Ack()
  172. }
  173. wg.Done()
  174. }()
  175. }
  176. sent = true
  177. }
  178. }
  179. if !sent {
  180. if r.defaultHandler != nil {
  181. if order {
  182. handlers = append(handlers, r.defaultHandler)
  183. } else {
  184. wg.Add(1)
  185. go func() {
  186. r.defaultHandler(client, m)
  187. if !client.options.AutoAckDisabled {
  188. m.Ack()
  189. }
  190. wg.Done()
  191. }()
  192. }
  193. } else {
  194. DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
  195. }
  196. }
  197. r.RUnlock()
  198. for _, handler := range handlers {
  199. handler(client, m)
  200. if !client.options.AutoAckDisabled {
  201. m.Ack()
  202. }
  203. }
  204. // DEBUG.Println(ROU, "matchAndDispatch handled message")
  205. }
  206. if order {
  207. close(ackOutChan)
  208. } else { // Ensure that nothing further will be written to ackOutChan before closing it
  209. close(stopAckCopy)
  210. <-ackCopyStopped
  211. close(ackOutChan)
  212. go func() {
  213. wg.Wait() // Note: If this remains running then the user has handlers that are not returning
  214. close(goRoutinesDone)
  215. }()
  216. }
  217. DEBUG.Println(ROU, "matchAndDispatch exiting")
  218. }()
  219. return ackOutChan
  220. }