transport_udp.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger
  15. import (
  16. "errors"
  17. "fmt"
  18. "github.com/uber/jaeger-client-go/thrift"
  19. j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
  20. "github.com/uber/jaeger-client-go/utils"
  21. )
  22. // Empirically obtained constant for how many bytes in the message are used for envelope.
  23. // The total datagram size is:
  24. // sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize
  25. // There is a unit test `TestEmitBatchOverhead` that validates this number.
  26. // Note that due to the use of Compact Thrift protocol, overhead grows with the number of spans
  27. // in the batch, because the length of the list is encoded as varint32, as well as SeqId.
  28. const emitBatchOverhead = 30
  29. var errSpanTooLarge = errors.New("Span is too large")
  30. type udpSender struct {
  31. client *utils.AgentClientUDP
  32. maxPacketSize int // max size of datagram in bytes
  33. maxSpanBytes int // max number of bytes to record spans (excluding envelope) in the datagram
  34. byteBufferSize int // current number of span bytes accumulated in the buffer
  35. spanBuffer []*j.Span // spans buffered before a flush
  36. thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
  37. thriftProtocol thrift.TProtocol
  38. process *j.Process
  39. processByteSize int
  40. }
  41. // NewUDPTransport creates a reporter that submits spans to jaeger-agent
  42. func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
  43. if len(hostPort) == 0 {
  44. hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
  45. }
  46. if maxPacketSize == 0 {
  47. maxPacketSize = utils.UDPPacketMaxLength
  48. }
  49. protocolFactory := thrift.NewTCompactProtocolFactory()
  50. // Each span is first written to thriftBuffer to determine its size in bytes.
  51. thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
  52. thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
  53. client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize)
  54. if err != nil {
  55. return nil, err
  56. }
  57. sender := &udpSender{
  58. client: client,
  59. maxSpanBytes: maxPacketSize - emitBatchOverhead,
  60. thriftBuffer: thriftBuffer,
  61. thriftProtocol: thriftProtocol}
  62. return sender, nil
  63. }
  64. func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int {
  65. s.thriftBuffer.Reset()
  66. thriftStruct.Write(s.thriftProtocol)
  67. return s.thriftBuffer.Len()
  68. }
  69. func (s *udpSender) Append(span *Span) (int, error) {
  70. if s.process == nil {
  71. s.process = BuildJaegerProcessThrift(span)
  72. s.processByteSize = s.calcSizeOfSerializedThrift(s.process)
  73. s.byteBufferSize += s.processByteSize
  74. }
  75. jSpan := BuildJaegerThrift(span)
  76. spanSize := s.calcSizeOfSerializedThrift(jSpan)
  77. if spanSize > s.maxSpanBytes {
  78. return 1, errSpanTooLarge
  79. }
  80. s.byteBufferSize += spanSize
  81. if s.byteBufferSize <= s.maxSpanBytes {
  82. s.spanBuffer = append(s.spanBuffer, jSpan)
  83. if s.byteBufferSize < s.maxSpanBytes {
  84. return 0, nil
  85. }
  86. return s.Flush()
  87. }
  88. // the latest span did not fit in the buffer
  89. n, err := s.Flush()
  90. s.spanBuffer = append(s.spanBuffer, jSpan)
  91. s.byteBufferSize = spanSize + s.processByteSize
  92. return n, err
  93. }
  94. func (s *udpSender) Flush() (int, error) {
  95. n := len(s.spanBuffer)
  96. if n == 0 {
  97. return 0, nil
  98. }
  99. err := s.client.EmitBatch(&j.Batch{Process: s.process, Spans: s.spanBuffer})
  100. s.resetBuffers()
  101. return n, err
  102. }
  103. func (s *udpSender) Close() error {
  104. return s.client.Close()
  105. }
  106. func (s *udpSender) resetBuffers() {
  107. for i := range s.spanBuffer {
  108. s.spanBuffer[i] = nil
  109. }
  110. s.spanBuffer = s.spanBuffer[:0]
  111. s.byteBufferSize = s.processByteSize
  112. }