123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- // Copyright The OpenTelemetry Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package trace // import "go.opentelemetry.io/otel/sdk/trace"
- import (
- "context"
- "runtime"
- "sync"
- "sync/atomic"
- "time"
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/internal/global"
- "go.opentelemetry.io/otel/sdk/internal/env"
- "go.opentelemetry.io/otel/trace"
- )
// Defaults for BatchSpanProcessorOptions.
//
// DefaultScheduleDelay and DefaultExportTimeout are expressed in
// milliseconds; NewBatchSpanProcessor multiplies them by time.Millisecond.
const (
	// DefaultMaxQueueSize is the default capacity of the span queue.
	DefaultMaxQueueSize = 2048

	// DefaultScheduleDelay is the default BatchTimeout, in milliseconds.
	DefaultScheduleDelay = 5000

	// DefaultExportTimeout is the default ExportTimeout, in milliseconds.
	DefaultExportTimeout = 30000

	// DefaultMaxExportBatchSize is the default maximum number of spans handed
	// to the exporter in a single call.
	DefaultMaxExportBatchSize = 512
)
// BatchSpanProcessorOption configures a BatchSpanProcessor.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)

// BatchSpanProcessorOptions is configuration settings for a
// BatchSpanProcessor.
//
// NewBatchSpanProcessor seeds these fields through the SDK's internal env
// package, passing the Default* constants as fallbacks, and only then applies
// any BatchSpanProcessorOption. The defaults quoted below may therefore be
// overridden by the environment.
type BatchSpanProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer spans for delayed
	// processing. If the queue gets full it drops the spans. Use
	// BlockOnQueueFull to change this behavior.
	// The default value of MaxQueueSize is 2048.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. The
	// processor forcefully sends the available spans when the timeout is
	// reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting spans. If the
	// timeout is reached, the context passed to the exporter is cancelled.
	// A value of zero or less disables this per-export timeout.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of spans to process in a single
	// batch. If there is more than one batch worth of spans, multiple batches
	// are processed one after the other without any delay.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// BlockOnQueueFull, when true, makes OnEnd wait for room in the queue
	// instead of dropping the span when the queue is full. OnStart never
	// enqueues anything and is not affected.
	// Blocking should be used carefully as it can severely affect the
	// performance of an application.
	BlockOnQueueFull bool
}
- // batchSpanProcessor is a SpanProcessor that batches asynchronously-received
- // spans and sends them to a trace.Exporter when complete.
- type batchSpanProcessor struct {
- e SpanExporter
- o BatchSpanProcessorOptions
- queue chan ReadOnlySpan
- dropped uint32
- batch []ReadOnlySpan
- batchMutex sync.Mutex
- timer *time.Timer
- stopWait sync.WaitGroup
- stopOnce sync.Once
- stopCh chan struct{}
- }
- var _ SpanProcessor = (*batchSpanProcessor)(nil)
- // NewBatchSpanProcessor creates a new SpanProcessor that will send completed
- // span batches to the exporter with the supplied options.
- //
- // If the exporter is nil, the span processor will preform no action.
- func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
- maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
- maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)
- if maxExportBatchSize > maxQueueSize {
- if DefaultMaxExportBatchSize > maxQueueSize {
- maxExportBatchSize = maxQueueSize
- } else {
- maxExportBatchSize = DefaultMaxExportBatchSize
- }
- }
- o := BatchSpanProcessorOptions{
- BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
- ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
- MaxQueueSize: maxQueueSize,
- MaxExportBatchSize: maxExportBatchSize,
- }
- for _, opt := range options {
- opt(&o)
- }
- bsp := &batchSpanProcessor{
- e: exporter,
- o: o,
- batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
- timer: time.NewTimer(o.BatchTimeout),
- queue: make(chan ReadOnlySpan, o.MaxQueueSize),
- stopCh: make(chan struct{}),
- }
- bsp.stopWait.Add(1)
- go func() {
- defer bsp.stopWait.Done()
- bsp.processQueue()
- bsp.drainQueue()
- }()
- return bsp
- }
- // OnStart method does nothing.
- func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
- // OnEnd method enqueues a ReadOnlySpan for later processing.
- func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
- // Do not enqueue spans if we are just going to drop them.
- if bsp.e == nil {
- return
- }
- bsp.enqueue(s)
- }
- // Shutdown flushes the queue and waits until all spans are processed.
- // It only executes once. Subsequent call does nothing.
- func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
- var err error
- bsp.stopOnce.Do(func() {
- wait := make(chan struct{})
- go func() {
- close(bsp.stopCh)
- bsp.stopWait.Wait()
- if bsp.e != nil {
- if err := bsp.e.Shutdown(ctx); err != nil {
- otel.Handle(err)
- }
- }
- close(wait)
- }()
- // Wait until the wait group is done or the context is cancelled
- select {
- case <-wait:
- case <-ctx.Done():
- err = ctx.Err()
- }
- })
- return err
- }
- type forceFlushSpan struct {
- ReadOnlySpan
- flushed chan struct{}
- }
- func (f forceFlushSpan) SpanContext() trace.SpanContext {
- return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
- }
- // ForceFlush exports all ended spans that have not yet been exported.
- func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
- var err error
- if bsp.e != nil {
- flushCh := make(chan struct{})
- if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
- select {
- case <-flushCh:
- // Processed any items in queue prior to ForceFlush being called
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- wait := make(chan error)
- go func() {
- wait <- bsp.exportSpans(ctx)
- close(wait)
- }()
- // Wait until the export is finished or the context is cancelled/timed out
- select {
- case err = <-wait:
- case <-ctx.Done():
- err = ctx.Err()
- }
- }
- return err
- }
- // WithMaxQueueSize returns a BatchSpanProcessorOption that configures the
- // maximum queue size allowed for a BatchSpanProcessor.
- func WithMaxQueueSize(size int) BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.MaxQueueSize = size
- }
- }
- // WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures
- // the maximum export batch size allowed for a BatchSpanProcessor.
- func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.MaxExportBatchSize = size
- }
- }
- // WithBatchTimeout returns a BatchSpanProcessorOption that configures the
- // maximum delay allowed for a BatchSpanProcessor before it will export any
- // held span (whether the queue is full or not).
- func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.BatchTimeout = delay
- }
- }
- // WithExportTimeout returns a BatchSpanProcessorOption that configures the
- // amount of time a BatchSpanProcessor waits for an exporter to export before
- // abandoning the export.
- func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.ExportTimeout = timeout
- }
- }
- // WithBlocking returns a BatchSpanProcessorOption that configures a
- // BatchSpanProcessor to wait for enqueue operations to succeed instead of
- // dropping data when the queue is full.
- func WithBlocking() BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.BlockOnQueueFull = true
- }
- }
- // exportSpans is a subroutine of processing and draining the queue.
- func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
- bsp.timer.Reset(bsp.o.BatchTimeout)
- bsp.batchMutex.Lock()
- defer bsp.batchMutex.Unlock()
- if bsp.o.ExportTimeout > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
- defer cancel()
- }
- if l := len(bsp.batch); l > 0 {
- global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
- err := bsp.e.ExportSpans(ctx, bsp.batch)
- // A new batch is always created after exporting, even if the batch failed to be exported.
- //
- // It is up to the exporter to implement any type of retry logic if a batch is failing
- // to be exported, since it is specific to the protocol and backend being sent to.
- bsp.batch = bsp.batch[:0]
- if err != nil {
- return err
- }
- }
- return nil
- }
// processQueue removes spans from the `queue` channel until processor
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
// waiting up to BatchTimeout to form a batch.
//
// It runs on the goroutine started by NewBatchSpanProcessor and returns as
// soon as stopCh is closed. Spans still buffered in queue at that point are
// left for drainQueue.
func (bsp *batchSpanProcessor) processQueue() {
	defer bsp.timer.Stop()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for {
		select {
		case <-bsp.stopCh:
			return
		case <-bsp.timer.C:
			// BatchTimeout elapsed: export the partial batch. exportSpans
			// restarts the timer.
			if err := bsp.exportSpans(ctx); err != nil {
				otel.Handle(err)
			}
		case sd := <-bsp.queue:
			// A ForceFlush sentinel is not a span. Every span queued ahead of
			// it has already been handled, so signal ForceFlush and keep it
			// out of the batch.
			if ffs, ok := sd.(forceFlushSpan); ok {
				close(ffs.flushed)
				continue
			}
			bsp.batchMutex.Lock()
			bsp.batch = append(bsp.batch, sd)
			shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
			bsp.batchMutex.Unlock()
			if shouldExport {
				// Stop the timer and drain any tick already sitting in C
				// before exportSpans resets it. Otherwise a stale tick would
				// trigger a second export right away.
				// NOTE(review): the blocking receive relies on this goroutine
				// being the only reader of timer.C, which holds in this file.
				if !bsp.timer.Stop() {
					<-bsp.timer.C
				}
				if err := bsp.exportSpans(ctx); err != nil {
					otel.Handle(err)
				}
			}
		}
	}
}
- // drainQueue awaits the any caller that had added to bsp.stopWait
- // to finish the enqueue, then exports the final batch.
- func (bsp *batchSpanProcessor) drainQueue() {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- for {
- select {
- case sd := <-bsp.queue:
- if sd == nil {
- if err := bsp.exportSpans(ctx); err != nil {
- otel.Handle(err)
- }
- return
- }
- bsp.batchMutex.Lock()
- bsp.batch = append(bsp.batch, sd)
- shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
- bsp.batchMutex.Unlock()
- if shouldExport {
- if err := bsp.exportSpans(ctx); err != nil {
- otel.Handle(err)
- }
- }
- default:
- close(bsp.queue)
- }
- }
- }
- func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
- ctx := context.TODO()
- if bsp.o.BlockOnQueueFull {
- bsp.enqueueBlockOnQueueFull(ctx, sd)
- } else {
- bsp.enqueueDrop(ctx, sd)
- }
- }
// recoverSendOnClosedChan swallows a "send on closed channel" runtime panic
// and re-panics with any other value. It must be invoked directly via defer,
// because recover only stops a panic when called from the deferred function
// itself.
func recoverSendOnClosedChan() {
	r := recover()
	if r == nil {
		return
	}
	if rtErr, ok := r.(runtime.Error); ok && rtErr.Error() == "send on closed channel" {
		return
	}
	panic(r)
}
- func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
- if !sd.SpanContext().IsSampled() {
- return false
- }
- // This ensures the bsp.queue<- below does not panic as the
- // processor shuts down.
- defer recoverSendOnClosedChan()
- select {
- case <-bsp.stopCh:
- return false
- default:
- }
- select {
- case bsp.queue <- sd:
- return true
- case <-ctx.Done():
- return false
- }
- }
- func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
- if !sd.SpanContext().IsSampled() {
- return false
- }
- // This ensures the bsp.queue<- below does not panic as the
- // processor shuts down.
- defer recoverSendOnClosedChan()
- select {
- case <-bsp.stopCh:
- return false
- default:
- }
- select {
- case bsp.queue <- sd:
- return true
- default:
- atomic.AddUint32(&bsp.dropped, 1)
- }
- return false
- }
- // MarshalLog is the marshaling function used by the logging system to represent this exporter.
- func (bsp *batchSpanProcessor) MarshalLog() interface{} {
- return struct {
- Type string
- SpanExporter SpanExporter
- Config BatchSpanProcessorOptions
- }{
- Type: "BatchSpanProcessor",
- SpanExporter: bsp.e,
- Config: bsp.o,
- }
- }
|