propagation.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "bytes"
  17. "encoding/binary"
  18. "fmt"
  19. "io"
  20. "log"
  21. "net/url"
  22. "strings"
  23. "sync"
  24. opentracing "github.com/opentracing/opentracing-go"
  25. )
  26. // Injector is responsible for injecting SpanContext instances in a manner suitable
  27. // for propagation via a format-specific "carrier" object. Typically the
  28. // injection will take place across an RPC boundary, but message queues and
  29. // other IPC mechanisms are also reasonable places to use an Injector.
  30. type Injector interface {
  31. // Inject takes `SpanContext` and injects it into `carrier`. The actual type
  32. // of `carrier` depends on the `format` passed to `Tracer.Inject()`.
  33. //
  34. // Implementations may return opentracing.ErrInvalidCarrier or any other
  35. // implementation-specific error if injection fails.
  36. Inject(ctx SpanContext, carrier interface{}) error
  37. }
  38. // Extractor is responsible for extracting SpanContext instances from a
  39. // format-specific "carrier" object. Typically the extraction will take place
  40. // on the server side of an RPC boundary, but message queues and other IPC
  41. // mechanisms are also reasonable places to use an Extractor.
  42. type Extractor interface {
  43. // Extract decodes a SpanContext instance from the given `carrier`,
  44. // or (nil, opentracing.ErrSpanContextNotFound) if no context could
  45. // be found in the `carrier`.
  46. Extract(carrier interface{}) (SpanContext, error)
  47. }
  48. type textMapPropagator struct {
  49. headerKeys *HeadersConfig
  50. metrics Metrics
  51. encodeValue func(string) string
  52. decodeValue func(string) string
  53. }
  54. func newTextMapPropagator(headerKeys *HeadersConfig, metrics Metrics) *textMapPropagator {
  55. return &textMapPropagator{
  56. headerKeys: headerKeys,
  57. metrics: metrics,
  58. encodeValue: func(val string) string {
  59. return val
  60. },
  61. decodeValue: func(val string) string {
  62. return val
  63. },
  64. }
  65. }
  66. func newHTTPHeaderPropagator(headerKeys *HeadersConfig, metrics Metrics) *textMapPropagator {
  67. return &textMapPropagator{
  68. headerKeys: headerKeys,
  69. metrics: metrics,
  70. encodeValue: func(val string) string {
  71. return url.QueryEscape(val)
  72. },
  73. decodeValue: func(val string) string {
  74. // ignore decoding errors, cannot do anything about them
  75. if v, err := url.QueryUnescape(val); err == nil {
  76. return v
  77. }
  78. return val
  79. },
  80. }
  81. }
  82. type binaryPropagator struct {
  83. tracer *Tracer
  84. buffers sync.Pool
  85. }
  86. func newBinaryPropagator(tracer *Tracer) *binaryPropagator {
  87. return &binaryPropagator{
  88. tracer: tracer,
  89. buffers: sync.Pool{New: func() interface{} { return &bytes.Buffer{} }},
  90. }
  91. }
  92. func (p *textMapPropagator) Inject(
  93. sc SpanContext,
  94. abstractCarrier interface{},
  95. ) error {
  96. textMapWriter, ok := abstractCarrier.(opentracing.TextMapWriter)
  97. if !ok {
  98. return opentracing.ErrInvalidCarrier
  99. }
  100. // Do not encode the string with trace context to avoid accidental double-encoding
  101. // if people are using opentracing < 0.10.0. Our colon-separated representation
  102. // of the trace context is already safe for HTTP headers.
  103. textMapWriter.Set(p.headerKeys.TraceContextHeaderName, sc.String())
  104. for k, v := range sc.baggage {
  105. safeKey := p.addBaggageKeyPrefix(k)
  106. safeVal := p.encodeValue(v)
  107. textMapWriter.Set(safeKey, safeVal)
  108. }
  109. return nil
  110. }
  111. func (p *textMapPropagator) Extract(abstractCarrier interface{}) (SpanContext, error) {
  112. textMapReader, ok := abstractCarrier.(opentracing.TextMapReader)
  113. if !ok {
  114. return emptyContext, opentracing.ErrInvalidCarrier
  115. }
  116. var ctx SpanContext
  117. var baggage map[string]string
  118. err := textMapReader.ForeachKey(func(rawKey, value string) error {
  119. key := strings.ToLower(rawKey) // TODO not necessary for plain TextMap
  120. if key == p.headerKeys.TraceContextHeaderName {
  121. var err error
  122. safeVal := p.decodeValue(value)
  123. if ctx, err = ContextFromString(safeVal); err != nil {
  124. return err
  125. }
  126. } else if key == p.headerKeys.JaegerDebugHeader {
  127. ctx.debugID = p.decodeValue(value)
  128. } else if key == p.headerKeys.JaegerBaggageHeader {
  129. if baggage == nil {
  130. baggage = make(map[string]string)
  131. }
  132. for k, v := range p.parseCommaSeparatedMap(value) {
  133. baggage[k] = v
  134. }
  135. } else if strings.HasPrefix(key, p.headerKeys.TraceBaggageHeaderPrefix) {
  136. if baggage == nil {
  137. baggage = make(map[string]string)
  138. }
  139. safeKey := p.removeBaggageKeyPrefix(key)
  140. safeVal := p.decodeValue(value)
  141. baggage[safeKey] = safeVal
  142. }
  143. return nil
  144. })
  145. if err != nil {
  146. p.metrics.DecodingErrors.Inc(1)
  147. return emptyContext, err
  148. }
  149. if !ctx.traceID.IsValid() && ctx.debugID == "" && len(baggage) == 0 {
  150. return emptyContext, opentracing.ErrSpanContextNotFound
  151. }
  152. ctx.baggage = baggage
  153. return ctx, nil
  154. }
  155. func (p *binaryPropagator) Inject(
  156. sc SpanContext,
  157. abstractCarrier interface{},
  158. ) error {
  159. carrier, ok := abstractCarrier.(io.Writer)
  160. if !ok {
  161. return opentracing.ErrInvalidCarrier
  162. }
  163. // Handle the tracer context
  164. if err := binary.Write(carrier, binary.BigEndian, sc.traceID); err != nil {
  165. return err
  166. }
  167. if err := binary.Write(carrier, binary.BigEndian, sc.spanID); err != nil {
  168. return err
  169. }
  170. if err := binary.Write(carrier, binary.BigEndian, sc.parentID); err != nil {
  171. return err
  172. }
  173. if err := binary.Write(carrier, binary.BigEndian, sc.flags); err != nil {
  174. return err
  175. }
  176. // Handle the baggage items
  177. if err := binary.Write(carrier, binary.BigEndian, int32(len(sc.baggage))); err != nil {
  178. return err
  179. }
  180. for k, v := range sc.baggage {
  181. if err := binary.Write(carrier, binary.BigEndian, int32(len(k))); err != nil {
  182. return err
  183. }
  184. io.WriteString(carrier, k)
  185. if err := binary.Write(carrier, binary.BigEndian, int32(len(v))); err != nil {
  186. return err
  187. }
  188. io.WriteString(carrier, v)
  189. }
  190. return nil
  191. }
  192. func (p *binaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, error) {
  193. carrier, ok := abstractCarrier.(io.Reader)
  194. if !ok {
  195. return emptyContext, opentracing.ErrInvalidCarrier
  196. }
  197. var ctx SpanContext
  198. if err := binary.Read(carrier, binary.BigEndian, &ctx.traceID); err != nil {
  199. return emptyContext, opentracing.ErrSpanContextCorrupted
  200. }
  201. if err := binary.Read(carrier, binary.BigEndian, &ctx.spanID); err != nil {
  202. return emptyContext, opentracing.ErrSpanContextCorrupted
  203. }
  204. if err := binary.Read(carrier, binary.BigEndian, &ctx.parentID); err != nil {
  205. return emptyContext, opentracing.ErrSpanContextCorrupted
  206. }
  207. if err := binary.Read(carrier, binary.BigEndian, &ctx.flags); err != nil {
  208. return emptyContext, opentracing.ErrSpanContextCorrupted
  209. }
  210. // Handle the baggage items
  211. var numBaggage int32
  212. if err := binary.Read(carrier, binary.BigEndian, &numBaggage); err != nil {
  213. return emptyContext, opentracing.ErrSpanContextCorrupted
  214. }
  215. if iNumBaggage := int(numBaggage); iNumBaggage > 0 {
  216. ctx.baggage = make(map[string]string, iNumBaggage)
  217. buf := p.buffers.Get().(*bytes.Buffer)
  218. defer p.buffers.Put(buf)
  219. var keyLen, valLen int32
  220. for i := 0; i < iNumBaggage; i++ {
  221. if err := binary.Read(carrier, binary.BigEndian, &keyLen); err != nil {
  222. return emptyContext, opentracing.ErrSpanContextCorrupted
  223. }
  224. buf.Reset()
  225. buf.Grow(int(keyLen))
  226. if n, err := io.CopyN(buf, carrier, int64(keyLen)); err != nil || int32(n) != keyLen {
  227. return emptyContext, opentracing.ErrSpanContextCorrupted
  228. }
  229. key := buf.String()
  230. if err := binary.Read(carrier, binary.BigEndian, &valLen); err != nil {
  231. return emptyContext, opentracing.ErrSpanContextCorrupted
  232. }
  233. buf.Reset()
  234. buf.Grow(int(valLen))
  235. if n, err := io.CopyN(buf, carrier, int64(valLen)); err != nil || int32(n) != valLen {
  236. return emptyContext, opentracing.ErrSpanContextCorrupted
  237. }
  238. ctx.baggage[key] = buf.String()
  239. }
  240. }
  241. return ctx, nil
  242. }
  243. // Converts a comma separated key value pair list into a map
  244. // e.g. key1=value1, key2=value2, key3 = value3
  245. // is converted to map[string]string { "key1" : "value1",
  246. // "key2" : "value2",
  247. // "key3" : "value3" }
  248. func (p *textMapPropagator) parseCommaSeparatedMap(value string) map[string]string {
  249. baggage := make(map[string]string)
  250. value, err := url.QueryUnescape(value)
  251. if err != nil {
  252. log.Printf("Unable to unescape %s, %v", value, err)
  253. return baggage
  254. }
  255. for _, kvpair := range strings.Split(value, ",") {
  256. kv := strings.Split(strings.TrimSpace(kvpair), "=")
  257. if len(kv) == 2 {
  258. baggage[kv[0]] = kv[1]
  259. } else {
  260. log.Printf("Malformed value passed in for %s", p.headerKeys.JaegerBaggageHeader)
  261. }
  262. }
  263. return baggage
  264. }
  265. // Converts a baggage item key into an http header format,
  266. // by prepending TraceBaggageHeaderPrefix and encoding the key string
  267. func (p *textMapPropagator) addBaggageKeyPrefix(key string) string {
  268. // TODO encodeBaggageKeyAsHeader add caching and escaping
  269. return fmt.Sprintf("%v%v", p.headerKeys.TraceBaggageHeaderPrefix, key)
  270. }
  271. func (p *textMapPropagator) removeBaggageKeyPrefix(key string) string {
  272. // TODO decodeBaggageHeaderKey add caching and escaping
  273. return key[len(p.headerKeys.TraceBaggageHeaderPrefix):]
  274. }