zipkin_thrift_span.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "encoding/binary"
  17. "fmt"
  18. "time"
  19. "github.com/opentracing/opentracing-go/ext"
  20. "github.com/uber/jaeger-client-go/internal/spanlog"
  21. z "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
  22. "github.com/uber/jaeger-client-go/utils"
  23. )
  // allowPackedNumbers gates the packed binary encoding of int32, int64 and int
  // tag values. It is switched off because the Zipkin UI does not work well with
  // non-string tag values, so such numbers are reported as strings instead.
  const allowPackedNumbers = false
  28. var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){
  29. string(ext.SpanKind): setSpanKind,
  30. string(ext.PeerHostIPv4): setPeerIPv4,
  31. string(ext.PeerPort): setPeerPort,
  32. string(ext.PeerService): setPeerService,
  33. TracerIPTagKey: removeTag,
  34. }
  35. // BuildZipkinThrift builds thrift span based on internal span.
  36. func BuildZipkinThrift(s *Span) *z.Span {
  37. span := &zipkinSpan{Span: s}
  38. span.handleSpecialTags()
  39. parentID := int64(span.context.parentID)
  40. var ptrParentID *int64
  41. if parentID != 0 {
  42. ptrParentID = &parentID
  43. }
  44. timestamp := utils.TimeToMicrosecondsSinceEpochInt64(span.startTime)
  45. duration := span.duration.Nanoseconds() / int64(time.Microsecond)
  46. endpoint := &z.Endpoint{
  47. ServiceName: span.tracer.serviceName,
  48. Ipv4: int32(span.tracer.hostIPv4)}
  49. thriftSpan := &z.Span{
  50. TraceID: int64(span.context.traceID.Low), // TODO upgrade zipkin thrift and use TraceIdHigh
  51. ID: int64(span.context.spanID),
  52. ParentID: ptrParentID,
  53. Name: span.operationName,
  54. Timestamp: &timestamp,
  55. Duration: &duration,
  56. Debug: span.context.IsDebug(),
  57. Annotations: buildAnnotations(span, endpoint),
  58. BinaryAnnotations: buildBinaryAnnotations(span, endpoint)}
  59. return thriftSpan
  60. }
  61. func buildAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.Annotation {
  62. // automatically adding 2 Zipkin CoreAnnotations
  63. annotations := make([]*z.Annotation, 0, 2+len(span.logs))
  64. var startLabel, endLabel string
  65. if span.spanKind == string(ext.SpanKindRPCClientEnum) {
  66. startLabel, endLabel = z.CLIENT_SEND, z.CLIENT_RECV
  67. } else if span.spanKind == string(ext.SpanKindRPCServerEnum) {
  68. startLabel, endLabel = z.SERVER_RECV, z.SERVER_SEND
  69. }
  70. if !span.startTime.IsZero() && startLabel != "" {
  71. start := &z.Annotation{
  72. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(span.startTime),
  73. Value: startLabel,
  74. Host: endpoint}
  75. annotations = append(annotations, start)
  76. if span.duration != 0 {
  77. endTs := span.startTime.Add(span.duration)
  78. end := &z.Annotation{
  79. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(endTs),
  80. Value: endLabel,
  81. Host: endpoint}
  82. annotations = append(annotations, end)
  83. }
  84. }
  85. for _, log := range span.logs {
  86. anno := &z.Annotation{
  87. Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(log.Timestamp),
  88. Host: endpoint}
  89. if content, err := spanlog.MaterializeWithJSON(log.Fields); err == nil {
  90. anno.Value = truncateString(string(content), span.tracer.options.maxTagValueLength)
  91. } else {
  92. anno.Value = err.Error()
  93. }
  94. annotations = append(annotations, anno)
  95. }
  96. return annotations
  97. }
  98. func buildBinaryAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.BinaryAnnotation {
  99. // automatically adding local component or server/client address tag, and client version
  100. annotations := make([]*z.BinaryAnnotation, 0, 2+len(span.tags))
  101. if span.peerDefined() && span.isRPC() {
  102. peer := z.Endpoint{
  103. Ipv4: span.peer.Ipv4,
  104. Port: span.peer.Port,
  105. ServiceName: span.peer.ServiceName}
  106. label := z.CLIENT_ADDR
  107. if span.isRPCClient() {
  108. label = z.SERVER_ADDR
  109. }
  110. anno := &z.BinaryAnnotation{
  111. Key: label,
  112. Value: []byte{1},
  113. AnnotationType: z.AnnotationType_BOOL,
  114. Host: &peer}
  115. annotations = append(annotations, anno)
  116. }
  117. if !span.isRPC() {
  118. componentName := endpoint.ServiceName
  119. for _, tag := range span.tags {
  120. if tag.key == string(ext.Component) {
  121. componentName = stringify(tag.value)
  122. break
  123. }
  124. }
  125. local := &z.BinaryAnnotation{
  126. Key: z.LOCAL_COMPONENT,
  127. Value: []byte(componentName),
  128. AnnotationType: z.AnnotationType_STRING,
  129. Host: endpoint}
  130. annotations = append(annotations, local)
  131. }
  132. for _, tag := range span.tags {
  133. // "Special tags" are already handled by this point, we'd be double reporting the
  134. // tags if we don't skip here
  135. if _, ok := specialTagHandlers[tag.key]; ok {
  136. continue
  137. }
  138. if anno := buildBinaryAnnotation(tag.key, tag.value, span.tracer.options.maxTagValueLength, nil); anno != nil {
  139. annotations = append(annotations, anno)
  140. }
  141. }
  142. return annotations
  143. }
  144. func buildBinaryAnnotation(key string, val interface{}, maxTagValueLength int, endpoint *z.Endpoint) *z.BinaryAnnotation {
  145. bann := &z.BinaryAnnotation{Key: key, Host: endpoint}
  146. if value, ok := val.(string); ok {
  147. bann.Value = []byte(truncateString(value, maxTagValueLength))
  148. bann.AnnotationType = z.AnnotationType_STRING
  149. } else if value, ok := val.([]byte); ok {
  150. if len(value) > maxTagValueLength {
  151. value = value[:maxTagValueLength]
  152. }
  153. bann.Value = value
  154. bann.AnnotationType = z.AnnotationType_BYTES
  155. } else if value, ok := val.(int32); ok && allowPackedNumbers {
  156. bann.Value = int32ToBytes(value)
  157. bann.AnnotationType = z.AnnotationType_I32
  158. } else if value, ok := val.(int64); ok && allowPackedNumbers {
  159. bann.Value = int64ToBytes(value)
  160. bann.AnnotationType = z.AnnotationType_I64
  161. } else if value, ok := val.(int); ok && allowPackedNumbers {
  162. bann.Value = int64ToBytes(int64(value))
  163. bann.AnnotationType = z.AnnotationType_I64
  164. } else if value, ok := val.(bool); ok {
  165. bann.Value = []byte{boolToByte(value)}
  166. bann.AnnotationType = z.AnnotationType_BOOL
  167. } else {
  168. value := stringify(val)
  169. bann.Value = []byte(truncateString(value, maxTagValueLength))
  170. bann.AnnotationType = z.AnnotationType_STRING
  171. }
  172. return bann
  173. }
  // stringify renders value as a string. Strings are returned unchanged; any
  // other value is formatted with the "%+v" verb.
  func stringify(value interface{}) string {
  	switch v := value.(type) {
  	case string:
  		return v
  	default:
  		return fmt.Sprintf("%+v", v)
  	}
  }
  // truncateString returns value cut down to at most maxLength bytes. Values
  // that already fit are returned unchanged.
  //
  // A negative maxLength is treated as zero and yields the empty string;
  // slicing with a negative bound would otherwise panic at run time.
  //
  // We ignore the problem of utf8 runes possibly being sliced in the middle,
  // as it is rather expensive to iterate through each tag just to find rune
  // boundaries.
  func truncateString(value string, maxLength int) string {
  	if maxLength < 0 {
  		maxLength = 0
  	}
  	if len(value) > maxLength {
  		return value[:maxLength]
  	}
  	return value
  }
  // boolToByte encodes b as a single byte: 1 for true, 0 for false.
  func boolToByte(b bool) byte {
  	var encoded byte
  	if b {
  		encoded = 1
  	}
  	return encoded
  }
  // int32ToBytes returns the 4-byte big-endian encoding of i, using the bit
  // pattern of i reinterpreted as an unsigned 32-bit value.
  func int32ToBytes(i int32) []byte {
  	var encoded [4]byte
  	binary.BigEndian.PutUint32(encoded[:], uint32(i))
  	return encoded[:]
  }
  // int64ToBytes returns the 8-byte big-endian encoding of i, using the bit
  // pattern of i reinterpreted as an unsigned 64-bit value.
  func int64ToBytes(i int64) []byte {
  	var encoded [8]byte
  	binary.BigEndian.PutUint64(encoded[:], uint64(i))
  	return encoded[:]
  }
  207. type zipkinSpan struct {
  208. *Span
  209. // peer points to the peer service participating in this span,
  210. // e.g. the Client if this span is a server span,
  211. // or Server if this span is a client span
  212. peer struct {
  213. Ipv4 int32
  214. Port int16
  215. ServiceName string
  216. }
  217. // used to distinguish local vs. RPC Server vs. RPC Client spans
  218. spanKind string
  219. }
  220. func (s *zipkinSpan) handleSpecialTags() {
  221. s.Lock()
  222. defer s.Unlock()
  223. if s.firstInProcess {
  224. // append the process tags
  225. s.tags = append(s.tags, s.tracer.tags...)
  226. }
  227. filteredTags := make([]Tag, 0, len(s.tags))
  228. for _, tag := range s.tags {
  229. if handler, ok := specialTagHandlers[tag.key]; ok {
  230. handler(s, tag.value)
  231. } else {
  232. filteredTags = append(filteredTags, tag)
  233. }
  234. }
  235. s.tags = filteredTags
  236. }
  237. func setSpanKind(s *zipkinSpan, value interface{}) {
  238. if val, ok := value.(string); ok {
  239. s.spanKind = val
  240. return
  241. }
  242. if val, ok := value.(ext.SpanKindEnum); ok {
  243. s.spanKind = string(val)
  244. }
  245. }
  246. func setPeerIPv4(s *zipkinSpan, value interface{}) {
  247. if val, ok := value.(string); ok {
  248. if ip, err := utils.ParseIPToUint32(val); err == nil {
  249. s.peer.Ipv4 = int32(ip)
  250. return
  251. }
  252. }
  253. if val, ok := value.(uint32); ok {
  254. s.peer.Ipv4 = int32(val)
  255. return
  256. }
  257. if val, ok := value.(int32); ok {
  258. s.peer.Ipv4 = val
  259. }
  260. }
  261. func setPeerPort(s *zipkinSpan, value interface{}) {
  262. if val, ok := value.(string); ok {
  263. if port, err := utils.ParsePort(val); err == nil {
  264. s.peer.Port = int16(port)
  265. return
  266. }
  267. }
  268. if val, ok := value.(uint16); ok {
  269. s.peer.Port = int16(val)
  270. return
  271. }
  272. if val, ok := value.(int); ok {
  273. s.peer.Port = int16(val)
  274. }
  275. }
  276. func setPeerService(s *zipkinSpan, value interface{}) {
  277. if val, ok := value.(string); ok {
  278. s.peer.ServiceName = val
  279. }
  280. }
  281. func removeTag(s *zipkinSpan, value interface{}) {}
  282. func (s *zipkinSpan) peerDefined() bool {
  283. return s.peer.ServiceName != "" || s.peer.Ipv4 != 0 || s.peer.Port != 0
  284. }
  285. func (s *zipkinSpan) isRPC() bool {
  286. s.RLock()
  287. defer s.RUnlock()
  288. return s.spanKind == string(ext.SpanKindRPCClientEnum) || s.spanKind == string(ext.SpanKindRPCServerEnum)
  289. }
  290. func (s *zipkinSpan) isRPCClient() bool {
  291. s.RLock()
  292. defer s.RUnlock()
  293. return s.spanKind == string(ext.SpanKindRPCClientEnum)
  294. }