reporter.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "fmt"
  17. "sync"
  18. "sync/atomic"
  19. "time"
  20. "github.com/opentracing/opentracing-go"
  21. "github.com/uber/jaeger-client-go/log"
  22. )
  23. // Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
  24. type Reporter interface {
  25. // Report submits a new span to collectors, possibly asynchronously and/or with buffering.
  26. Report(span *Span)
  27. // Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory.
  28. Close()
  29. }
  30. // ------------------------------
  31. type nullReporter struct{}
  32. // NewNullReporter creates a no-op reporter that ignores all reported spans.
  33. func NewNullReporter() Reporter {
  34. return &nullReporter{}
  35. }
  36. // Report implements Report() method of Reporter by doing nothing.
  37. func (r *nullReporter) Report(span *Span) {
  38. // no-op
  39. }
  40. // Close implements Close() method of Reporter by doing nothing.
  41. func (r *nullReporter) Close() {
  42. // no-op
  43. }
  44. // ------------------------------
  45. type loggingReporter struct {
  46. logger Logger
  47. }
  48. // NewLoggingReporter creates a reporter that logs all reported spans to provided logger.
  49. func NewLoggingReporter(logger Logger) Reporter {
  50. return &loggingReporter{logger}
  51. }
  52. // Report implements Report() method of Reporter by logging the span to the logger.
  53. func (r *loggingReporter) Report(span *Span) {
  54. r.logger.Infof("Reporting span %+v", span)
  55. }
  56. // Close implements Close() method of Reporter by doing nothing.
  57. func (r *loggingReporter) Close() {
  58. // no-op
  59. }
  60. // ------------------------------
  61. // InMemoryReporter is used for testing, and simply collects spans in memory.
  62. type InMemoryReporter struct {
  63. spans []opentracing.Span
  64. lock sync.Mutex
  65. }
  66. // NewInMemoryReporter creates a reporter that stores spans in memory.
  67. // NOTE: the Tracer should be created with options.PoolSpans = false.
  68. func NewInMemoryReporter() *InMemoryReporter {
  69. return &InMemoryReporter{
  70. spans: make([]opentracing.Span, 0, 10),
  71. }
  72. }
  73. // Report implements Report() method of Reporter by storing the span in the buffer.
  74. func (r *InMemoryReporter) Report(span *Span) {
  75. r.lock.Lock()
  76. r.spans = append(r.spans, span)
  77. r.lock.Unlock()
  78. }
  79. // Close implements Close() method of Reporter by doing nothing.
  80. func (r *InMemoryReporter) Close() {
  81. // no-op
  82. }
  83. // SpansSubmitted returns the number of spans accumulated in the buffer.
  84. func (r *InMemoryReporter) SpansSubmitted() int {
  85. r.lock.Lock()
  86. defer r.lock.Unlock()
  87. return len(r.spans)
  88. }
  89. // GetSpans returns accumulated spans as a copy of the buffer.
  90. func (r *InMemoryReporter) GetSpans() []opentracing.Span {
  91. r.lock.Lock()
  92. defer r.lock.Unlock()
  93. copied := make([]opentracing.Span, len(r.spans))
  94. copy(copied, r.spans)
  95. return copied
  96. }
  97. // Reset clears all accumulated spans.
  98. func (r *InMemoryReporter) Reset() {
  99. r.lock.Lock()
  100. defer r.lock.Unlock()
  101. r.spans = nil
  102. }
  103. // ------------------------------
  104. type compositeReporter struct {
  105. reporters []Reporter
  106. }
  107. // NewCompositeReporter creates a reporter that ignores all reported spans.
  108. func NewCompositeReporter(reporters ...Reporter) Reporter {
  109. return &compositeReporter{reporters: reporters}
  110. }
  111. // Report implements Report() method of Reporter by delegating to each underlying reporter.
  112. func (r *compositeReporter) Report(span *Span) {
  113. for _, reporter := range r.reporters {
  114. reporter.Report(span)
  115. }
  116. }
  117. // Close implements Close() method of Reporter by closing each underlying reporter.
  118. func (r *compositeReporter) Close() {
  119. for _, reporter := range r.reporters {
  120. reporter.Close()
  121. }
  122. }
  123. // ------------- REMOTE REPORTER -----------------
  124. type reporterQueueItemType int
  125. const (
  126. defaultQueueSize = 100
  127. defaultBufferFlushInterval = 1 * time.Second
  128. reporterQueueItemSpan reporterQueueItemType = iota
  129. reporterQueueItemClose
  130. )
  131. type reporterQueueItem struct {
  132. itemType reporterQueueItemType
  133. span *Span
  134. close *sync.WaitGroup
  135. }
  136. type remoteReporter struct {
  137. // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
  138. // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
  139. queueLength int64
  140. closed int64 // 0 - not closed, 1 - closed
  141. reporterOptions
  142. sender Transport
  143. queue chan reporterQueueItem
  144. }
  145. // NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender.
  146. // Calls to Report(Span) return immediately (side effect: if internal buffer is full the span is dropped).
  147. // Periodically the transport buffer is flushed even if it hasn't reached max packet size.
  148. // Calls to Close() block until all spans reported prior to the call to Close are flushed.
  149. func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
  150. options := reporterOptions{}
  151. for _, option := range opts {
  152. option(&options)
  153. }
  154. if options.bufferFlushInterval <= 0 {
  155. options.bufferFlushInterval = defaultBufferFlushInterval
  156. }
  157. if options.logger == nil {
  158. options.logger = log.NullLogger
  159. }
  160. if options.metrics == nil {
  161. options.metrics = NewNullMetrics()
  162. }
  163. if options.queueSize <= 0 {
  164. options.queueSize = defaultQueueSize
  165. }
  166. reporter := &remoteReporter{
  167. reporterOptions: options,
  168. sender: sender,
  169. queue: make(chan reporterQueueItem, options.queueSize),
  170. }
  171. go reporter.processQueue()
  172. return reporter
  173. }
  174. // Report implements Report() method of Reporter.
  175. // It passes the span to a background go-routine for submission to Jaeger backend.
  176. // If the internal queue is full, the span is dropped and metrics.ReporterDropped counter is incremented.
  177. // If Report() is called after the reporter has been Close()-ed, the additional spans will not be
  178. // sent to the backend, but the metrics.ReporterDropped counter may not reflect them correctly,
  179. // because some of them may still be successfully added to the queue.
  180. func (r *remoteReporter) Report(span *Span) {
  181. select {
  182. case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span}:
  183. atomic.AddInt64(&r.queueLength, 1)
  184. default:
  185. r.metrics.ReporterDropped.Inc(1)
  186. }
  187. }
  188. // Close implements Close() method of Reporter by waiting for the queue to be drained.
  189. func (r *remoteReporter) Close() {
  190. if swapped := atomic.CompareAndSwapInt64(&r.closed, 0, 1); !swapped {
  191. r.logger.Error("Repeated attempt to close the reporter is ignored")
  192. return
  193. }
  194. r.sendCloseEvent()
  195. r.sender.Close()
  196. }
  197. func (r *remoteReporter) sendCloseEvent() {
  198. wg := &sync.WaitGroup{}
  199. wg.Add(1)
  200. item := reporterQueueItem{itemType: reporterQueueItemClose, close: wg}
  201. r.queue <- item // if the queue is full we will block until there is space
  202. atomic.AddInt64(&r.queueLength, 1)
  203. wg.Wait()
  204. }
  205. // processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer.
  206. // When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger.
  207. // Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped
  208. // reporting new spans.
  209. func (r *remoteReporter) processQueue() {
  210. // flush causes the Sender to flush its accumulated spans and clear the buffer
  211. flush := func() {
  212. if flushed, err := r.sender.Flush(); err != nil {
  213. r.metrics.ReporterFailure.Inc(int64(flushed))
  214. r.logger.Error(fmt.Sprintf("error when flushing the buffer: %s", err.Error()))
  215. } else if flushed > 0 {
  216. r.metrics.ReporterSuccess.Inc(int64(flushed))
  217. }
  218. }
  219. timer := time.NewTicker(r.bufferFlushInterval)
  220. for {
  221. select {
  222. case <-timer.C:
  223. flush()
  224. case item := <-r.queue:
  225. atomic.AddInt64(&r.queueLength, -1)
  226. switch item.itemType {
  227. case reporterQueueItemSpan:
  228. span := item.span
  229. if flushed, err := r.sender.Append(span); err != nil {
  230. r.metrics.ReporterFailure.Inc(int64(flushed))
  231. r.logger.Error(fmt.Sprintf("error reporting span %q: %s", span.OperationName(), err.Error()))
  232. } else if flushed > 0 {
  233. r.metrics.ReporterSuccess.Inc(int64(flushed))
  234. // to reduce the number of gauge stats, we only emit queue length on flush
  235. r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
  236. }
  237. case reporterQueueItemClose:
  238. timer.Stop()
  239. flush()
  240. item.close.Done()
  241. return
  242. }
  243. }
  244. }
  245. }