tracer.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package jaeger
  15. import (
  16. "fmt"
  17. "io"
  18. "math/rand"
  19. "os"
  20. "reflect"
  21. "strconv"
  22. "sync"
  23. "time"
  24. "github.com/opentracing/opentracing-go"
  25. "github.com/opentracing/opentracing-go/ext"
  26. "github.com/uber/jaeger-client-go/internal/baggage"
  27. "github.com/uber/jaeger-client-go/internal/throttler"
  28. "github.com/uber/jaeger-client-go/log"
  29. "github.com/uber/jaeger-client-go/utils"
  30. )
  31. // Tracer implements opentracing.Tracer.
  32. type Tracer struct {
  33. serviceName string
  34. hostIPv4 uint32 // this is for zipkin endpoint conversion
  35. sampler Sampler
  36. reporter Reporter
  37. metrics Metrics
  38. logger log.Logger
  39. timeNow func() time.Time
  40. randomNumber func() uint64
  41. options struct {
  42. poolSpans bool
  43. gen128Bit bool // whether to generate 128bit trace IDs
  44. zipkinSharedRPCSpan bool
  45. highTraceIDGenerator func() uint64 // custom high trace ID generator
  46. maxTagValueLength int
  47. // more options to come
  48. }
  49. // pool for Span objects
  50. spanPool sync.Pool
  51. injectors map[interface{}]Injector
  52. extractors map[interface{}]Extractor
  53. observer compositeObserver
  54. tags []Tag
  55. process Process
  56. baggageRestrictionManager baggage.RestrictionManager
  57. baggageSetter *baggageSetter
  58. debugThrottler throttler.Throttler
  59. }
  60. // NewTracer creates Tracer implementation that reports tracing to Jaeger.
  61. // The returned io.Closer can be used in shutdown hooks to ensure that the internal
  62. // queue of the Reporter is drained and all buffered spans are submitted to collectors.
  63. func NewTracer(
  64. serviceName string,
  65. sampler Sampler,
  66. reporter Reporter,
  67. options ...TracerOption,
  68. ) (opentracing.Tracer, io.Closer) {
  69. t := &Tracer{
  70. serviceName: serviceName,
  71. sampler: sampler,
  72. reporter: reporter,
  73. injectors: make(map[interface{}]Injector),
  74. extractors: make(map[interface{}]Extractor),
  75. metrics: *NewNullMetrics(),
  76. spanPool: sync.Pool{New: func() interface{} {
  77. return &Span{}
  78. }},
  79. }
  80. for _, option := range options {
  81. option(t)
  82. }
  83. // register default injectors/extractors unless they are already provided via options
  84. textPropagator := newTextMapPropagator(getDefaultHeadersConfig(), t.metrics)
  85. t.addCodec(opentracing.TextMap, textPropagator, textPropagator)
  86. httpHeaderPropagator := newHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics)
  87. t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator)
  88. binaryPropagator := newBinaryPropagator(t)
  89. t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator)
  90. // TODO remove after TChannel supports OpenTracing
  91. interopPropagator := &jaegerTraceContextPropagator{tracer: t}
  92. t.addCodec(SpanContextFormat, interopPropagator, interopPropagator)
  93. zipkinPropagator := &zipkinPropagator{tracer: t}
  94. t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator)
  95. if t.baggageRestrictionManager != nil {
  96. t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics)
  97. } else {
  98. t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics)
  99. }
  100. if t.debugThrottler == nil {
  101. t.debugThrottler = throttler.DefaultThrottler{}
  102. }
  103. if t.randomNumber == nil {
  104. seedGenerator := utils.NewRand(time.Now().UnixNano())
  105. pool := sync.Pool{
  106. New: func() interface{} {
  107. return rand.NewSource(seedGenerator.Int63())
  108. },
  109. }
  110. t.randomNumber = func() uint64 {
  111. generator := pool.Get().(rand.Source)
  112. number := uint64(generator.Int63())
  113. pool.Put(generator)
  114. return number
  115. }
  116. }
  117. if t.timeNow == nil {
  118. t.timeNow = time.Now
  119. }
  120. if t.logger == nil {
  121. t.logger = log.NullLogger
  122. }
  123. // Set tracer-level tags
  124. t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
  125. if hostname, err := os.Hostname(); err == nil {
  126. t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname})
  127. }
  128. if ip, err := utils.HostIP(); err == nil {
  129. t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()})
  130. t.hostIPv4 = utils.PackIPAsUint32(ip)
  131. } else {
  132. t.logger.Error("Unable to determine this host's IP address: " + err.Error())
  133. }
  134. if t.options.gen128Bit {
  135. if t.options.highTraceIDGenerator == nil {
  136. t.options.highTraceIDGenerator = t.randomNumber
  137. }
  138. } else if t.options.highTraceIDGenerator != nil {
  139. t.logger.Error("Overriding high trace ID generator but not generating " +
  140. "128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
  141. }
  142. if t.options.maxTagValueLength == 0 {
  143. t.options.maxTagValueLength = DefaultMaxTagValueLength
  144. }
  145. t.process = Process{
  146. Service: serviceName,
  147. UUID: strconv.FormatUint(t.randomNumber(), 16),
  148. Tags: t.tags,
  149. }
  150. if throttler, ok := t.debugThrottler.(ProcessSetter); ok {
  151. throttler.SetProcess(t.process)
  152. }
  153. return t, t
  154. }
  155. // addCodec adds registers injector and extractor for given propagation format if not already defined.
  156. func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
  157. if _, ok := t.injectors[format]; !ok {
  158. t.injectors[format] = injector
  159. }
  160. if _, ok := t.extractors[format]; !ok {
  161. t.extractors[format] = extractor
  162. }
  163. }
  164. // StartSpan implements StartSpan() method of opentracing.Tracer.
  165. func (t *Tracer) StartSpan(
  166. operationName string,
  167. options ...opentracing.StartSpanOption,
  168. ) opentracing.Span {
  169. sso := opentracing.StartSpanOptions{}
  170. for _, o := range options {
  171. o.Apply(&sso)
  172. }
  173. return t.startSpanWithOptions(operationName, sso)
  174. }
  175. func (t *Tracer) startSpanWithOptions(
  176. operationName string,
  177. options opentracing.StartSpanOptions,
  178. ) opentracing.Span {
  179. if options.StartTime.IsZero() {
  180. options.StartTime = t.timeNow()
  181. }
  182. // Predicate whether the given span context is a valid reference
  183. // which may be used as parent / debug ID / baggage items source
  184. isValidReference := func(ctx SpanContext) bool {
  185. return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0
  186. }
  187. var references []Reference
  188. var parent SpanContext
  189. var hasParent bool // need this because `parent` is a value, not reference
  190. for _, ref := range options.References {
  191. ctx, ok := ref.ReferencedContext.(SpanContext)
  192. if !ok {
  193. t.logger.Error(fmt.Sprintf(
  194. "Reference contains invalid type of SpanReference: %s",
  195. reflect.ValueOf(ref.ReferencedContext)))
  196. continue
  197. }
  198. if !isValidReference(ctx) {
  199. continue
  200. }
  201. references = append(references, Reference{Type: ref.Type, Context: ctx})
  202. if !hasParent {
  203. parent = ctx
  204. hasParent = ref.Type == opentracing.ChildOfRef
  205. }
  206. }
  207. if !hasParent && isValidReference(parent) {
  208. // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
  209. // the FollowFromRef as the parent
  210. hasParent = true
  211. }
  212. rpcServer := false
  213. if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok {
  214. rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
  215. }
  216. var samplerTags []Tag
  217. var ctx SpanContext
  218. newTrace := false
  219. if !hasParent || !parent.IsValid() {
  220. newTrace = true
  221. ctx.traceID.Low = t.randomID()
  222. if t.options.gen128Bit {
  223. ctx.traceID.High = t.options.highTraceIDGenerator()
  224. }
  225. ctx.spanID = SpanID(ctx.traceID.Low)
  226. ctx.parentID = 0
  227. ctx.flags = byte(0)
  228. if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
  229. ctx.flags |= (flagSampled | flagDebug)
  230. samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
  231. } else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
  232. ctx.flags |= flagSampled
  233. samplerTags = tags
  234. }
  235. } else {
  236. ctx.traceID = parent.traceID
  237. if rpcServer && t.options.zipkinSharedRPCSpan {
  238. // Support Zipkin's one-span-per-RPC model
  239. ctx.spanID = parent.spanID
  240. ctx.parentID = parent.parentID
  241. } else {
  242. ctx.spanID = SpanID(t.randomID())
  243. ctx.parentID = parent.spanID
  244. }
  245. ctx.flags = parent.flags
  246. }
  247. if hasParent {
  248. // copy baggage items
  249. if l := len(parent.baggage); l > 0 {
  250. ctx.baggage = make(map[string]string, len(parent.baggage))
  251. for k, v := range parent.baggage {
  252. ctx.baggage[k] = v
  253. }
  254. }
  255. }
  256. sp := t.newSpan()
  257. sp.context = ctx
  258. sp.observer = t.observer.OnStartSpan(sp, operationName, options)
  259. return t.startSpanInternal(
  260. sp,
  261. operationName,
  262. options.StartTime,
  263. samplerTags,
  264. options.Tags,
  265. newTrace,
  266. rpcServer,
  267. references,
  268. )
  269. }
  270. // Inject implements Inject() method of opentracing.Tracer
  271. func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
  272. c, ok := ctx.(SpanContext)
  273. if !ok {
  274. return opentracing.ErrInvalidSpanContext
  275. }
  276. if injector, ok := t.injectors[format]; ok {
  277. return injector.Inject(c, carrier)
  278. }
  279. return opentracing.ErrUnsupportedFormat
  280. }
  281. // Extract implements Extract() method of opentracing.Tracer
  282. func (t *Tracer) Extract(
  283. format interface{},
  284. carrier interface{},
  285. ) (opentracing.SpanContext, error) {
  286. if extractor, ok := t.extractors[format]; ok {
  287. return extractor.Extract(carrier)
  288. }
  289. return nil, opentracing.ErrUnsupportedFormat
  290. }
  291. // Close releases all resources used by the Tracer and flushes any remaining buffered spans.
  292. func (t *Tracer) Close() error {
  293. t.reporter.Close()
  294. t.sampler.Close()
  295. if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
  296. mgr.Close()
  297. }
  298. if throttler, ok := t.debugThrottler.(io.Closer); ok {
  299. throttler.Close()
  300. }
  301. return nil
  302. }
  303. // Tags returns a slice of tracer-level tags.
  304. func (t *Tracer) Tags() []opentracing.Tag {
  305. tags := make([]opentracing.Tag, len(t.tags))
  306. for i, tag := range t.tags {
  307. tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value}
  308. }
  309. return tags
  310. }
  311. // newSpan returns an instance of a clean Span object.
  312. // If options.PoolSpans is true, the spans are retrieved from an object pool.
  313. func (t *Tracer) newSpan() *Span {
  314. if !t.options.poolSpans {
  315. return &Span{}
  316. }
  317. sp := t.spanPool.Get().(*Span)
  318. sp.context = emptyContext
  319. sp.tracer = nil
  320. sp.tags = nil
  321. sp.logs = nil
  322. return sp
  323. }
  324. func (t *Tracer) startSpanInternal(
  325. sp *Span,
  326. operationName string,
  327. startTime time.Time,
  328. internalTags []Tag,
  329. tags opentracing.Tags,
  330. newTrace bool,
  331. rpcServer bool,
  332. references []Reference,
  333. ) *Span {
  334. sp.tracer = t
  335. sp.operationName = operationName
  336. sp.startTime = startTime
  337. sp.duration = 0
  338. sp.references = references
  339. sp.firstInProcess = rpcServer || sp.context.parentID == 0
  340. if len(tags) > 0 || len(internalTags) > 0 {
  341. sp.tags = make([]Tag, len(internalTags), len(tags)+len(internalTags))
  342. copy(sp.tags, internalTags)
  343. for k, v := range tags {
  344. sp.observer.OnSetTag(k, v)
  345. if k == string(ext.SamplingPriority) && !setSamplingPriority(sp, v) {
  346. continue
  347. }
  348. sp.setTagNoLocking(k, v)
  349. }
  350. }
  351. // emit metrics
  352. if sp.context.IsSampled() {
  353. t.metrics.SpansStartedSampled.Inc(1)
  354. if newTrace {
  355. // We cannot simply check for parentID==0 because in Zipkin model the
  356. // server-side RPC span has the exact same trace/span/parent IDs as the
  357. // calling client-side span, but obviously the server side span is
  358. // no longer a root span of the trace.
  359. t.metrics.TracesStartedSampled.Inc(1)
  360. } else if sp.firstInProcess {
  361. t.metrics.TracesJoinedSampled.Inc(1)
  362. }
  363. } else {
  364. t.metrics.SpansStartedNotSampled.Inc(1)
  365. if newTrace {
  366. t.metrics.TracesStartedNotSampled.Inc(1)
  367. } else if sp.firstInProcess {
  368. t.metrics.TracesJoinedNotSampled.Inc(1)
  369. }
  370. }
  371. return sp
  372. }
  373. func (t *Tracer) reportSpan(sp *Span) {
  374. t.metrics.SpansFinished.Inc(1)
  375. if sp.context.IsSampled() {
  376. t.reporter.Report(sp)
  377. }
  378. if t.options.poolSpans {
  379. t.spanPool.Put(sp)
  380. }
  381. }
  382. // randomID generates a random trace/span ID, using tracer.random() generator.
  383. // It never returns 0.
  384. func (t *Tracer) randomID() uint64 {
  385. val := t.randomNumber()
  386. for val == 0 {
  387. val = t.randomNumber()
  388. }
  389. return val
  390. }
  391. // (NB) span must hold the lock before making this call
  392. func (t *Tracer) setBaggage(sp *Span, key, value string) {
  393. t.baggageSetter.setBaggage(sp, key, value)
  394. }
  395. // (NB) span must hold the lock before making this call
  396. func (t *Tracer) isDebugAllowed(operation string) bool {
  397. return t.debugThrottler.IsAllowed(operation)
  398. }