observer.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rpcmetrics
  15. import (
  16. "strconv"
  17. "sync"
  18. "time"
  19. "github.com/opentracing/opentracing-go"
  20. "github.com/opentracing/opentracing-go/ext"
  21. "github.com/uber/jaeger-lib/metrics"
  22. jaeger "github.com/uber/jaeger-client-go"
  23. )
  24. const defaultMaxNumberOfEndpoints = 200
  25. // Observer is an observer that can emit RPC metrics.
  26. type Observer struct {
  27. metricsByEndpoint *MetricsByEndpoint
  28. }
  29. // NewObserver creates a new observer that can emit RPC metrics.
  30. func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer {
  31. return &Observer{
  32. metricsByEndpoint: newMetricsByEndpoint(
  33. metricsFactory,
  34. normalizer,
  35. defaultMaxNumberOfEndpoints,
  36. ),
  37. }
  38. }
  39. // OnStartSpan creates a new Observer for the span.
  40. func (o *Observer) OnStartSpan(
  41. operationName string,
  42. options opentracing.StartSpanOptions,
  43. ) jaeger.SpanObserver {
  44. return NewSpanObserver(o.metricsByEndpoint, operationName, options)
  45. }
  46. // SpanKind identifies the span as inboud, outbound, or internal
  47. type SpanKind int
  48. const (
  49. // Local span kind
  50. Local SpanKind = iota
  51. // Inbound span kind
  52. Inbound
  53. // Outbound span kind
  54. Outbound
  55. )
  56. // SpanObserver collects RPC metrics
  57. type SpanObserver struct {
  58. metricsByEndpoint *MetricsByEndpoint
  59. operationName string
  60. startTime time.Time
  61. mux sync.Mutex
  62. kind SpanKind
  63. httpStatusCode uint16
  64. err bool
  65. }
  66. // NewSpanObserver creates a new SpanObserver that can emit RPC metrics.
  67. func NewSpanObserver(
  68. metricsByEndpoint *MetricsByEndpoint,
  69. operationName string,
  70. options opentracing.StartSpanOptions,
  71. ) *SpanObserver {
  72. so := &SpanObserver{
  73. metricsByEndpoint: metricsByEndpoint,
  74. operationName: operationName,
  75. startTime: options.StartTime,
  76. }
  77. for k, v := range options.Tags {
  78. so.handleTagInLock(k, v)
  79. }
  80. return so
  81. }
  82. // handleTags watches for special tags
  83. // - SpanKind
  84. // - HttpStatusCode
  85. // - Error
  86. func (so *SpanObserver) handleTagInLock(key string, value interface{}) {
  87. if key == string(ext.SpanKind) {
  88. if v, ok := value.(ext.SpanKindEnum); ok {
  89. value = string(v)
  90. }
  91. if v, ok := value.(string); ok {
  92. if v == string(ext.SpanKindRPCClientEnum) {
  93. so.kind = Outbound
  94. } else if v == string(ext.SpanKindRPCServerEnum) {
  95. so.kind = Inbound
  96. }
  97. }
  98. return
  99. }
  100. if key == string(ext.HTTPStatusCode) {
  101. if v, ok := value.(uint16); ok {
  102. so.httpStatusCode = v
  103. } else if v, ok := value.(int); ok {
  104. so.httpStatusCode = uint16(v)
  105. } else if v, ok := value.(string); ok {
  106. if vv, err := strconv.Atoi(v); err == nil {
  107. so.httpStatusCode = uint16(vv)
  108. }
  109. }
  110. return
  111. }
  112. if key == string(ext.Error) {
  113. if v, ok := value.(bool); ok {
  114. so.err = v
  115. } else if v, ok := value.(string); ok {
  116. if vv, err := strconv.ParseBool(v); err == nil {
  117. so.err = vv
  118. }
  119. }
  120. return
  121. }
  122. }
  123. // OnFinish emits the RPC metrics. It only has an effect when operation name
  124. // is not blank, and the span kind is an RPC server.
  125. func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) {
  126. so.mux.Lock()
  127. defer so.mux.Unlock()
  128. if so.operationName == "" || so.kind != Inbound {
  129. return
  130. }
  131. mets := so.metricsByEndpoint.get(so.operationName)
  132. latency := options.FinishTime.Sub(so.startTime)
  133. if so.err {
  134. mets.RequestCountFailures.Inc(1)
  135. mets.RequestLatencyFailures.Record(latency)
  136. } else {
  137. mets.RequestCountSuccess.Inc(1)
  138. mets.RequestLatencySuccess.Record(latency)
  139. }
  140. mets.recordHTTPStatusCode(so.httpStatusCode)
  141. }
  142. // OnSetOperationName records new operation name.
  143. func (so *SpanObserver) OnSetOperationName(operationName string) {
  144. so.mux.Lock()
  145. so.operationName = operationName
  146. so.mux.Unlock()
  147. }
  148. // OnSetTag implements SpanObserver
  149. func (so *SpanObserver) OnSetTag(key string, value interface{}) {
  150. so.mux.Lock()
  151. so.handleTagInLock(key, value)
  152. so.mux.Unlock()
  153. }