router.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "container/list"
  17. "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets"
  18. "strings"
  19. "sync"
  20. )
  21. // route is a type which associates MQTT Topic strings with a
  22. // callback to be executed upon the arrival of a message associated
  23. // with a subscription to that topic.
  24. type route struct {
  25. topic string
  26. callback MessageHandler
  27. }
  28. // match takes a slice of strings which represent the route being tested having been split on '/'
  29. // separators, and a slice of strings representing the topic string in the published message, similarly
  30. // split.
  31. // The function determines if the topic string matches the route according to the MQTT topic rules
  32. // and returns a boolean of the outcome
  33. func match(route []string, topic []string) bool {
  34. if len(route) == 0 {
  35. if len(topic) == 0 {
  36. return true
  37. }
  38. return false
  39. }
  40. if len(topic) == 0 {
  41. if route[0] == "#" {
  42. return true
  43. }
  44. return false
  45. }
  46. if route[0] == "#" {
  47. return true
  48. }
  49. if (route[0] == "+") || (route[0] == topic[0]) {
  50. return match(route[1:], topic[1:])
  51. }
  52. return false
  53. }
  54. func routeIncludesTopic(route, topic string) bool {
  55. return match(strings.Split(route, "/"), strings.Split(topic, "/"))
  56. }
  57. // match takes the topic string of the published message and does a basic compare to the
  58. // string of the current Route, if they match it returns true
  59. func (r *route) match(topic string) bool {
  60. return r.topic == topic || routeIncludesTopic(r.topic, topic)
  61. }
  62. type router struct {
  63. sync.RWMutex
  64. routes *list.List
  65. defaultHandler MessageHandler
  66. messages chan *packets.PublishPacket
  67. stop chan bool
  68. }
  69. // newRouter returns a new instance of a Router and channel which can be used to tell the Router
  70. // to stop
  71. func newRouter() (*router, chan bool) {
  72. router := &router{routes: list.New(), messages: make(chan *packets.PublishPacket), stop: make(chan bool)}
  73. stop := router.stop
  74. return router, stop
  75. }
  76. // addRoute takes a topic string and MessageHandler callback. It looks in the current list of
  77. // routes to see if there is already a matching Route. If there is it replaces the current
  78. // callback with the new one. If not it add a new entry to the list of Routes.
  79. func (r *router) addRoute(topic string, callback MessageHandler) {
  80. r.Lock()
  81. defer r.Unlock()
  82. for e := r.routes.Front(); e != nil; e = e.Next() {
  83. if e.Value.(*route).match(topic) {
  84. r := e.Value.(*route)
  85. r.callback = callback
  86. return
  87. }
  88. }
  89. r.routes.PushBack(&route{topic: topic, callback: callback})
  90. }
  91. // deleteRoute takes a route string, looks for a matching Route in the list of Routes. If
  92. // found it removes the Route from the list.
  93. func (r *router) deleteRoute(topic string) {
  94. r.Lock()
  95. defer r.Unlock()
  96. for e := r.routes.Front(); e != nil; e = e.Next() {
  97. if e.Value.(*route).match(topic) {
  98. r.routes.Remove(e)
  99. return
  100. }
  101. }
  102. }
  103. // setDefaultHandler assigns a default callback that will be called if no matching Route
  104. // is found for an incoming Publish.
  105. func (r *router) setDefaultHandler(handler MessageHandler) {
  106. r.defaultHandler = handler
  107. }
  108. // matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
  109. // takes messages off the channel, matches them against the internal route list and calls the
  110. // associated callback (or the defaultHandler, if one exists and no other route matched). If
  111. // anything is sent down the stop channel the function will end.
  112. func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *Client) {
  113. go func() {
  114. for {
  115. select {
  116. case message := <-messages:
  117. sent := false
  118. r.RLock()
  119. for e := r.routes.Front(); e != nil; e = e.Next() {
  120. if e.Value.(*route).match(message.TopicName) {
  121. if order {
  122. r.RUnlock()
  123. e.Value.(*route).callback(client, messageFromPublish(message))
  124. r.RLock()
  125. } else {
  126. go e.Value.(*route).callback(client, messageFromPublish(message))
  127. }
  128. sent = true
  129. }
  130. }
  131. r.RUnlock()
  132. if !sent && r.defaultHandler != nil {
  133. if order {
  134. r.RLock()
  135. r.defaultHandler(client, messageFromPublish(message))
  136. r.RUnlock()
  137. } else {
  138. go r.defaultHandler(client, messageFromPublish(message))
  139. }
  140. }
  141. case <-r.stop:
  142. return
  143. }
  144. }
  145. }()
  146. }