go的分布式鏈路追蹤(1)
本文以go1.19.11、go-zero 1.6.0下的user.api和user.rpc兩個服務為例,分析OpenTelemetry的用法。寫博客的目的,是用最通俗的語言記錄現在所學,讓自己不遺忘。
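首先,本文分析的鏈路是:Postman → user.api(HTTP服務端span)→ user.rpc的客戶端調用(gRPC客戶端span)→ user.rpc(gRPC服務端span)→ redis(客戶端span)。這四個span屬於同一個TraceID,並通過Parent.SpanID逐級關聯。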
下面是以JSON格式打印到日志中的、與上述鏈路對應的各個span:
//span(server-api)
{
"Name": "/api/user/userinfo",
"SpanContext": {
"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",
"SpanID": "88c30a0ca171abb4",
"TraceFlags": "01",
"TraceState": "",
"Remote": false
},
"Parent": {
"TraceID": "00000000000000000000000000000000",
"SpanID": "0000000000000000",
"TraceFlags": "00",
"TraceState": "",
"Remote": false
},
"SpanKind": 2,
"StartTime": "2023-12-19T18:01:27.8911484+08:00",
"EndTime": "2023-12-19T18:01:27.9021409+08:00",
"Attributes": [
{
"Key": "http.target",
"Value": {
"Type": "STRING",
"Value": "/api/user/userinfo"
}
},
{
"Key": "http.server_name",
"Value": {
"Type": "STRING",
"Value": "User"
}
},
{
"Key": "http.route",
"Value": {
"Type": "STRING",
"Value": "/api/user/userinfo"
}
},
{
"Key": "http.user_agent",
"Value": {
"Type": "STRING",
"Value": "PostmanRuntime/7.36.0"
}
},
{
"Key": "http.scheme",
"Value": {
"Type": "STRING",
"Value": "http"
}
},
{
"Key": "http.host",
"Value": {
"Type": "STRING",
"Value": "127.0.0.1:11000"
}
},
{
"Key": "http.flavor",
"Value": {
"Type": "STRING",
"Value": "1.1"
}
},
{
"Key": "http.method",
"Value": {
"Type": "STRING",
"Value": "POST"
}
},
{
"Key": "http.status_code",
"Value": {
"Type": "INT64",
"Value": 200
}
}
],
"Events": null,
"Links": null,
"Status": {
"Code": "Unset",
"Description": ""
},
"DroppedAttributes": 0,
"DroppedEvents": 0,
"DroppedLinks": 0,
"ChildSpanCount": 1,
"Resource": [
{
"Key": "service.name",
"Value": {
"Type": "STRING",
"Value": "user.api"
}
}
],
"InstrumentationLibrary": {
"Name": "go-zero",
"Version": "",
"SchemaURL": ""
}
}
//span(client-rpc)
{
"Name": "user.User/UserInfo",
"SpanContext": {
"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",
"SpanID": "22e1947a80132233",
"TraceFlags": "01",
"TraceState": "",
"Remote": false
},
"Parent": {
"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",
"SpanID": "88c30a0ca171abb4",
"TraceFlags": "01",
"TraceState": "",
"Remote": false
},
"SpanKind": 3,
"StartTime": "2023-12-19T18:01:27.891912+08:00",
"EndTime": "2023-12-19T18:01:27.9016198+08:00",
"Attributes": [
{
"Key": "rpc.system",
"Value": {
"Type": "STRING",
"Value": "grpc"
}
},
{
"Key": "rpc.service",
"Value": {
"Type": "STRING",
"Value": "user.User"
}
},
{
"Key": "rpc.method",
"Value": {
"Type": "STRING",
"Value": "UserInfo"
}
},
{
"Key": "rpc.grpc.status_code",
"Value": {
"Type": "INT64",
"Value": 0
}
}
],
"Events": [
{
"Name": "message",
"Attributes": [
{
"Key": "message.type",
"Value": {
"Type": "STRING",
"Value": "SENT"
}
},
{
"Key": "message.id",
"Value": {
"Type": "INT64",
"Value": 1
}
},
{
"Key": "message.uncompressed_size",
"Value": {
"Type": "INT64",
"Value": 2
}
}
],
"DroppedAttributeCount": 0,
"Time": "2023-12-19T18:01:27.891912+08:00"
},
{
"Name": "message",
"Attributes": [
{
"Key": "message.type",
"Value": {
"Type": "STRING",
"Value": "RECEIVED"
}
},
{
"Key": "message.id",
"Value": {
"Type": "INT64",
"Value": 1
}
},
{
"Key": "message.uncompressed_size",
"Value": {
"Type": "INT64",
"Value": 23
}
}
],
"DroppedAttributeCount": 0,
"Time": "2023-12-19T18:01:27.9016198+08:00"
}
],
"Links": null,
"Status": {
"Code": "Unset",
"Description": ""
},
"DroppedAttributes": 0,
"DroppedEvents": 0,
"DroppedLinks": 0,
"ChildSpanCount": 0,
"Resource": [
{
"Key": "service.name",
"Value": {
"Type": "STRING",
"Value": "user.api"
}
}
],
"InstrumentationLibrary": {
"Name": "go-zero",
"Version": "",
"SchemaURL": ""
}
}
//span(client-redis)
{
"Name": "redis",
"SpanContext": {
"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",
"SpanID": "7d856ed633df50cf",
"TraceFlags": "01",
"TraceState": "",
"Remote": false
},
"Parent": {
"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",
"SpanID": "d76929b7c22e20f4",
"TraceFlags": "01",
"TraceState": "",
"Remote": false
},
"SpanKind": 3,
"StartTime": "2023-12-19T18:01:27.8929801+08:00",
"EndTime": "2023-12-19T18:01:27.9005968+08:00",
"Attributes": [
{
"Key": "redis.cmds",
"Value": {
"Type": "STRINGSLICE",
"Value": [
"get"
]
}
}
],
"Events": null,
"Links": null,
"Status": {
"Code": "Ok",
"Description": ""
},
"DroppedAttributes": 0,
"DroppedEvents": 0,
"DroppedLinks": 0,
"ChildSpanCount": 0,
"Resource": [
{
"Key": "service.name",
"Value": {
"Type": "STRING",
"Value": "user.rpc"
}
}
],
"InstrumentationLibrary": {
"Name": "go-zero",
"Version": "",
"SchemaURL": ""
}
}
//span(server-rpc)
{
"Name": "user.User/UserInfo",
"SpanContext": {
"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",
"SpanID": "d76929b7c22e20f4",
"TraceFlags": "01",
"TraceState": "",
"Remote": false
},
"Parent": {
"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",
"SpanID": "22e1947a80132233",
"TraceFlags": "01",
"TraceState": "",
"Remote": true
},
"SpanKind": 2,
"StartTime": "2023-12-19T18:01:27.8924298+08:00",
"EndTime": "2023-12-19T18:01:27.9011+08:00",
"Attributes": [
{
"Key": "rpc.system",
"Value": {
"Type": "STRING",
"Value": "grpc"
}
},
{
"Key": "rpc.service",
"Value": {
"Type": "STRING",
"Value": "user.User"
}
},
{
"Key": "rpc.method",
"Value": {
"Type": "STRING",
"Value": "UserInfo"
}
},
{
"Key": "net.peer.ip",
"Value": {
"Type": "STRING",
"Value": "192.168.1.36"
}
},
{
"Key": "net.peer.port",
"Value": {
"Type": "STRING",
"Value": "53405"
}
},
{
"Key": "rpc.grpc.status_code",
"Value": {
"Type": "INT64",
"Value": 0
}
}
],
"Events": [
{
"Name": "message",
"Attributes": [
{
"Key": "message.type",
"Value": {
"Type": "STRING",
"Value": "RECEIVED"
}
},
{
"Key": "message.id",
"Value": {
"Type": "INT64",
"Value": 1
}
},
{
"Key": "message.uncompressed_size",
"Value": {
"Type": "INT64",
"Value": 2
}
}
],
"DroppedAttributeCount": 0,
"Time": "2023-12-19T18:01:27.8924298+08:00"
},
{
"Name": "message",
"Attributes": [
{
"Key": "message.type",
"Value": {
"Type": "STRING",
"Value": "SENT"
}
},
{
"Key": "message.id",
"Value": {
"Type": "INT64",
"Value": 1
}
},
{
"Key": "message.uncompressed_size",
"Value": {
"Type": "INT64",
"Value": 23
}
}
],
"DroppedAttributeCount": 0,
"Time": "2023-12-19T18:01:27.9011+08:00"
}
],
"Links": null,
"Status": {
"Code": "Unset",
"Description": ""
},
"DroppedAttributes": 0,
"DroppedEvents": 0,
"DroppedLinks": 0,
"ChildSpanCount": 1,
"Resource": [
{
"Key": "service.name",
"Value": {
"Type": "STRING",
"Value": "user.rpc"
}
}
],
"InstrumentationLibrary": {
"Name": "go-zero",
"Version": "",
"SchemaURL": ""
}
}
上面span中各個屬性的含義我們先不介紹,等開始看源碼的時候再逐一說明。
1.Exporter
由於trace記錄的數據都要導出到具體的軟件中去做分析,go的OpenTelemetry支持導出到文件、終端、http、grpc、Jaeger、Zipkin等等。我們這裡只是為了獲取數據,所以直接將數據輸出到文件。go-zero中user的api和rpc服務的配置如下:
# user.yaml of the api service (user.api)
Telemetry:
  Name: user.api
  Endpoint: ./usertrace.log
  Sampler: 1.0
  Batcher: file
---
# user.yaml of the rpc service (user.rpc) -- a separate file
Telemetry:
  Name: user.rpc
  Endpoint: ./usertrace.log
  Sampler: 1.0
  Batcher: file
無論是api還是rpc服務,都會調用c.SetUp()函數,SetUp中會調用trace.StartAgent(sc.Telemetry)。StartAgent會先把Endpoint保存到一個set結構中,然後執行內部的startAgent:
// startAgent (go-zero) registers the service name as a resource attribute,
// builds an OpenTelemetry TracerProvider with a parent-based ratio sampler,
// attaches a batching exporter when an Endpoint is configured, and installs
// the provider and an error handler as the otel globals.
// It returns the error from createExporter, if any. Without an Endpoint no
// exporter is registered, so spans are sampled but never exported.
func startAgent(c Config) error {
AddResources(semconv.ServiceNameKey.String(c.Name))
opts := []sdktrace.TracerProviderOption{
// Follow the parent span's sampling decision; root spans are sampled with
// probability c.Sampler (Sampler: 1.0 in the configs above, i.e. 100%).
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(c.Sampler))),
// Record information about this application in a Resource.
sdktrace.WithResource(resource.NewSchemaless(attrResources...)),
}
if len(c.Endpoint) > 0 {
exp, err := createExporter(c)
if err != nil {
logx.Error(err)
return err
}
// Always be sure to batch in production.
opts = append(opts, sdktrace.WithBatcher(exp))
}
// tp is a package-level variable declared elsewhere in go-zero's trace package.
tp = sdktrace.NewTracerProvider(opts...)
otel.SetTracerProvider(tp)
// Route errors reported by the OpenTelemetry SDK into go-zero's logger.
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
logx.Errorf("[otel] error: %v", err)
}))
return nil
}
其中
// Build the provider from the collected options and install it as the global TracerProvider.
tp = sdktrace.NewTracerProvider(opts...)
otel.SetTracerProvider(tp)
這兩句代碼用來創建TracerProvider,並把它設置為全局的TracerProvider。我們看一下TracerProvider有哪些字段:
// TracerProvider (go.opentelemetry.io/otel/sdk/trace), fields only, as quoted here.
type TracerProvider struct {
// mu presumably guards namedTracer (the fields below are documented as
// not protected by it).
mu sync.Mutex
// namedTracer caches one tracer per instrumentation scope.
namedTracer map[instrumentation.Scope]*tracer
// spanProcessors holds the registered span processors; sdktrace.WithBatcher
// is what adds the batch span processor discussed in this article.
spanProcessors atomic.Pointer[spanProcessorStates]
// isShutdown presumably records whether Shutdown has been called — confirm upstream.
isShutdown atomic.Bool
// These fields are not protected by the lock mu. They are assumed to be
// immutable after creation of the TracerProvider.
// sampler is set by sdktrace.WithSampler.
sampler Sampler
idGenerator IDGenerator
spanLimits SpanLimits
// resource is set by sdktrace.WithResource.
resource *resource.Resource
}
// Add service.name=<c.Name> to the resource attributes; it appears as the
// "Resource" entry with Key "service.name" in the exported spans.
AddResources(semconv.ServiceNameKey.String(c.Name))
這行代碼是把一個attribute.KeyValue鍵值對(key為service.name)存入resource中,對應上面日志裡的Resource屬性,其value就是配置裡的Name值:
"Resource": [
{
"Key": "service.name",
"Value": {
"Type": "STRING",
"Value": "user.rpc"
}
}
],
其中的
opts := []sdktrace.TracerProviderOption{
// Follow the parent span's sampling decision; root spans are sampled with
// probability c.Sampler (Sampler: 1.0 in the configs above, i.e. 100%).
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(c.Sampler))),
// Record information about this application in a Resource.
sdktrace.WithResource(resource.NewSchemaless(attrResources...)),
}
sdktrace.WithSampler用來給TracerProvider的sampler字段賦值。我們來看一下sampler是什麼東西,請看下面帶註釋的源碼:
// TraceIDRatioBased returns a Sampler that samples a fraction of traces,
// decided deterministically from the trace ID:
//   - fraction >= 1: AlwaysSample(), every span is recorded;
//   - fraction <= 0: clamped to 0, giving an upper bound of 0, so ShouldSample
//     never samples;
//   - otherwise: a traceIDRatioSampler with upper bound fraction * 2^63.
func TraceIDRatioBased(fraction float64) Sampler {
// fraction >= 1 (not just > 1): always record.
if fraction >= 1 {
return AlwaysSample()
}
if fraction <= 0 {
fraction = 0
}
// 0 <= fraction < 1: sample with probability fraction. The bound is scaled by
// 2^63 because ShouldSample compares it with a 63-bit value.
return &traceIDRatioSampler{
traceIDUpperBound: uint64(fraction * (1 << 63)),
description: fmt.Sprintf("TraceIDRatioBased{%g}", fraction),
}
}
// ShouldSample returns RecordAndSample when the low 8 bytes of the trace ID,
// read as a big-endian uint64 and shifted right by one, are below
// traceIDUpperBound, and Drop otherwise. Only p.TraceID feeds the decision, so
// samplers with the same fraction agree for the same trace ID. The parent's
// TraceState is carried into the result in both cases.
func (ts traceIDRatioSampler) ShouldSample(p SamplingParameters) SamplingResult {
psc := trace.SpanContextFromContext(p.ParentContext)
// Take bytes 8..15 of the 16-byte TraceID (eight bytes, not one) as a uint64,
// then shift out one bit so the value lies in [0, 2^63).
x := binary.BigEndian.Uint64(p.TraceID[8:16]) >> 1
// Below the bound: record and sample the span.
if x < ts.traceIDUpperBound {
return SamplingResult{
Decision: RecordAndSample,
Tracestate: psc.TraceState(),
}
}
// At or above the bound: drop the span.
return SamplingResult{
Decision: Drop,
Tracestate: psc.TraceState(),
}
}
在上面的代碼中,由於我們user.yaml中的Sampler配置為1,TraceIDRatioBased返回的是AlwaysSample,即根span一定會被記錄;外層的ParentBased則讓子span沿用父span的採樣決定。當然,與之相對的還有從不記錄span的NeverSample(alwaysOffSampler)。
sdktrace.WithResource就是賦值了TracerProvider的resource
下面看TracerProvider的spanProcessors字段是怎麼賦值的:
if len(c.Endpoint) > 0 { ?? ??? ?exp, err := createExporter(c) ?? ??? ?if err != nil { ?? ??? ??? ?logx.Error(err) ?? ??? ??? ?return err ?? ??? ?}
?? ??? ?// Always be sure to batch in production. ?? ??? ?opts = append(opts, sdktrace.WithBatcher(exp)) ?? ?}
createExporter(c)會根據配置創建http、grpc、Jaeger、Zipkin、file等不同類型的exporter。由於user.yaml中配置了Batcher: file,所以這裡創建的是file exporter。無論哪種類型,返回的exp都要實現下面的SpanExporter接口:
type SpanExporter interface { ? ? ExportSpans(ctx context.Context, spans []ReadOnlySpan) error ? ? Shutdown(ctx context.Context) error }
然後看看exporter是通過什麼機制導出數據的:
sdktrace.WithBatcher(exp) ----> NewBatchSpanProcessor(e, opts...),導出機制就是在batchSpanProcessor中實現的,請看下面帶註釋的源碼:
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"context"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/internal/env"
"go.opentelemetry.io/otel/trace"
)
// Defaults for BatchSpanProcessorOptions.
// DefaultScheduleDelay and DefaultExportTimeout are in milliseconds
// (NewBatchSpanProcessor multiplies them by time.Millisecond), i.e. 5s and 30s.
const (
DefaultMaxQueueSize = 2048
DefaultScheduleDelay = 5000
DefaultExportTimeout = 30000
DefaultMaxExportBatchSize = 512
)
// BatchSpanProcessorOption configures a BatchSpanProcessor.
// It is a functional option; the With* functions in this file build them.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
// BatchSpanProcessorOptions is configuration settings for a
// BatchSpanProcessor.
type BatchSpanProcessorOptions struct {
// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
// The default value of MaxQueueSize is 2048.
MaxQueueSize int
// BatchTimeout is the maximum duration for constructing a batch. Processor
// forcefully sends available spans when timeout is reached.
// The default value of BatchTimeout is 5000 msec.
BatchTimeout time.Duration
// ExportTimeout specifies the maximum duration for exporting spans. If the timeout
// is reached, the export will be cancelled.
// The default value of ExportTimeout is 30000 msec.
ExportTimeout time.Duration
// MaxExportBatchSize is the maximum number of spans to process in a single batch.
// If there are more than one batch worth of spans then it processes multiple batches
// of spans one batch after the other without any delay.
// The default value of MaxExportBatchSize is 512.
MaxExportBatchSize int
// BlockOnQueueFull blocks onEnd() and onStart() method if the queue is full
// AND if BlockOnQueueFull is set to true.
// NOTE(review): in the implementation quoted here OnStart is a no-op; only
// OnEnd (through enqueue) consults this flag.
// Blocking option should be used carefully as it can severely affect the performance of an
// application.
BlockOnQueueFull bool
}
// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
// spans and sends them to a trace.Exporter when complete.
type batchSpanProcessor struct {
// e receives the batches; when nil, OnEnd ignores spans.
e SpanExporter
o BatchSpanProcessorOptions
// queue buffers ended spans (capacity MaxQueueSize) between OnEnd and processQueue.
queue chan ReadOnlySpan
// dropped counts spans rejected by enqueueDrop because the queue was full (updated atomically).
dropped uint32
// batch holds the spans waiting for the next export; guarded by batchMutex.
batch []ReadOnlySpan
batchMutex sync.Mutex
// timer fires BatchTimeout after it was last reset (exportSpans resets it).
timer *time.Timer
// stopWait tracks the processing goroutine so Shutdown can wait for it.
stopWait sync.WaitGroup
// stopOnce makes Shutdown run only once.
stopOnce sync.Once
// stopCh is closed by Shutdown to make processQueue return.
stopCh chan struct{}
// stopped is set by Shutdown; afterwards OnEnd and ForceFlush do nothing.
stopped atomic.Bool
}
// Compile-time check that *batchSpanProcessor implements SpanProcessor.
var _ SpanProcessor = (*batchSpanProcessor)(nil)
// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
// span batches to the exporter with the supplied options.
//
// If the exporter is nil, the span processor will perform no action.
//
// It starts one background goroutine (tracked by bsp.stopWait) that runs
// processQueue and then drainQueue; Shutdown stops it and waits for it.
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
// Queue capacity: DefaultMaxQueueSize (2048) unless the env helper supplies an
// override (presumably from an environment variable — see sdk/internal/env).
maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
// Batch size: DefaultMaxExportBatchSize (512), with the same override mechanism.
maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)
// Validate: a batch must not exceed the queue. Fall back to the default batch
// size, or to the queue size when even the default is too large.
if maxExportBatchSize > maxQueueSize {
if DefaultMaxExportBatchSize > maxQueueSize {
maxExportBatchSize = maxQueueSize
} else {
maxExportBatchSize = DefaultMaxExportBatchSize
}
}
// The millisecond constants become durations: BatchTimeout 5s and
// ExportTimeout 30s by default.
o := BatchSpanProcessorOptions{
BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
MaxQueueSize: maxQueueSize,
MaxExportBatchSize: maxExportBatchSize,
}
// Apply the functional options (With*). NOTE(review): they run after the
// size check above, so values set through options are not re-validated.
for _, opt := range options {
opt(&o)
}
bsp := &batchSpanProcessor{
e: exporter,
o: o,
batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
timer: time.NewTimer(o.BatchTimeout),
queue: make(chan ReadOnlySpan, o.MaxQueueSize),
stopCh: make(chan struct{}),
}
bsp.stopWait.Add(1)
go func() {
defer bsp.stopWait.Done()
// Move queued spans into batches and export them until stopCh is closed.
bsp.processQueue()
// After stopCh is closed, export whatever is still left in the queue.
bsp.drainQueue()
}()
return bsp
}
// OnStart method does nothing.
// It is called when a span starts; this processor only acts when spans end (OnEnd).
func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
// OnEnd method enqueues a ReadOnlySpan for later processing.
// It is called when a span ends; this is where spans enter the export pipeline.
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
// Do not enqueue spans after Shutdown.
if bsp.stopped.Load() {
return
}
// Do not enqueue spans if we are just going to drop them.
if bsp.e == nil {
return
}
bsp.enqueue(s)
}
// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
//
// If ctx is done before that work finishes, Shutdown returns ctx.Err() while
// the background goroutine keeps running. An error from the exporter's own
// Shutdown is reported through otel.Handle, not returned.
func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
var err error
bsp.stopOnce.Do(func() { // Runs at most once.
bsp.stopped.Store(true) // From now on OnEnd and ForceFlush do nothing.
wait := make(chan struct{})
go func() {
close(bsp.stopCh) // Make processQueue return; drainQueue then empties the queue.
bsp.stopWait.Wait() // Wait for processQueue and drainQueue to finish before shutting the exporter down.
if bsp.e != nil {
// Shut down the exporter (the file exporter in this article's setup).
// This err shadows the outer err, so the failure only reaches otel.Handle.
if err := bsp.e.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}
close(wait) // Tell the select below that everything is done.
}()
// Wait until the wait group is done or the context is cancelled
select {
case <-wait: // The goroutine above finished, including the exporter's Shutdown.
case <-ctx.Done():
err = ctx.Err()
}
})
return err
}
// forceFlushSpan is a marker item that ForceFlush pushes through the queue.
// processQueue closes flushed when it reaches the marker, which shows that
// every span enqueued before it has been moved into the batch.
type forceFlushSpan struct {
ReadOnlySpan
flushed chan struct{}
}
// SpanContext reports the marker as sampled so that the IsSampled check in
// enqueueBlockOnQueueFull lets it into the queue.
func (f forceFlushSpan) SpanContext() trace.SpanContext {
return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
}
// ForceFlush exports all ended spans that have not yet been exported.
//
// It pushes a forceFlushSpan marker through the queue, waits until
// processQueue reaches it (everything enqueued earlier is then in bsp.batch),
// and exports the batch. It returns ctx.Err() if ctx finishes first. It returns
// nil when the processor is shut down, or when the exporter is nil.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
// Interrupt if context is already canceled.
if err := ctx.Err(); err != nil {
return err
}
// Do nothing after Shutdown.
if bsp.stopped.Load() {
return nil
}
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
// Blocking enqueue of the marker: waits for queue room unless ctx is done.
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
select {
case <-bsp.stopCh:
// The batchSpanProcessor is Shutdown.
return nil
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}
}
// NOTE(review): wait is unbuffered. If ctx.Done() wins the select below,
// nothing ever receives from wait, and this goroutine blocks forever on the
// send (a goroutine leak). make(chan error, 1) would avoid that.
wait := make(chan error)
go func() {
wait <- bsp.exportSpans(ctx)
close(wait)
}()
// Wait until the export is finished or the context is cancelled/timed out
select {
case err = <-wait:
case <-ctx.Done():
err = ctx.Err()
}
}
return err
}
// The functions below are functional options; each one sets a single field of
// BatchSpanProcessorOptions and is passed to NewBatchSpanProcessor.
// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the
// maximum queue size allowed for a BatchSpanProcessor.
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.MaxQueueSize = size
}
}
// WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures
// the maximum export batch size allowed for a BatchSpanProcessor.
func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.MaxExportBatchSize = size
}
}
// WithBatchTimeout returns a BatchSpanProcessorOption that configures the
// maximum delay allowed for a BatchSpanProcessor before it will export any
// held span (whether the queue is full or not).
func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.BatchTimeout = delay
}
}
// WithExportTimeout returns a BatchSpanProcessorOption that configures the
// amount of time a BatchSpanProcessor waits for an exporter to export before
// abandoning the export.
func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.ExportTimeout = timeout
}
}
// WithBlocking returns a BatchSpanProcessorOption that configures a
// BatchSpanProcessor to wait for enqueue operations to succeed instead of
// dropping data when the queue is full.
// In the code quoted here, this makes enqueue use enqueueBlockOnQueueFull.
func WithBlocking() BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.BlockOnQueueFull = true
}
}
// exportSpans is a subroutine of processing and draining the queue.
// ForceFlush also calls it.
//
// It restarts the BatchTimeout timer. Then, holding batchMutex, it hands the
// current batch (if non-empty) to the exporter under an ExportTimeout
// deadline. The batch is reset whether or not the export succeeded, and the
// exporter's error, if any, is returned.
func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
bsp.timer.Reset(bsp.o.BatchTimeout)
bsp.batchMutex.Lock()
defer bsp.batchMutex.Unlock()
// Bound the export by ExportTimeout (30s by default); the context is
// cancelled once the deadline passes. A value <= 0 adds no deadline.
if bsp.o.ExportTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
defer cancel()
}
// Export only when the batch is non-empty.
if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
// Call the exporter's ExportSpans (the file exporter in this article) with
// the deadline-bearing ctx.
err := bsp.e.ExportSpans(ctx, bsp.batch)
// A new batch is always created after exporting, even if the batch failed to be exported.
//
// It is up to the exporter to implement any type of retry logic if a batch is failing
// to be exported, since it is specific to the protocol and backend being sent to.
//
// Reset the batch to length 0 but keep its capacity. NOTE(review): the backing
// array is reused, so an exporter presumably must not retain the slice after
// ExportSpans returns.
bsp.batch = bsp.batch[:0]
if err != nil {
return err
}
}
return nil
}
// processQueue removes spans from the `queue` channel until processor
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
// waiting up to BatchTimeout to form a batch.
func (bsp *batchSpanProcessor) processQueue() {
defer bsp.timer.Stop()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
select {
case <-bsp.stopCh: // Shutdown closed stopCh; drainQueue takes over.
return
case <-bsp.timer.C: // BatchTimeout (5s by default) elapsed: export the partial batch.
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
case sd := <-bsp.queue: // An ended span (or a ForceFlush marker) arrived.
if ffs, ok := sd.(forceFlushSpan); ok { // ForceFlush marker: everything before it is batched, so release ForceFlush.
close(ffs.flushed)
continue
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd) // Add the span to the pending batch.
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()
if shouldExport { // The batch reached MaxExportBatchSize (512 by default): export now.
// Stop returns false when the timer has already fired. exportSpans re-arms
// the timer after every tick, so in that case the tick normally still sits
// in timer.C. Draining it does not wait for a new tick. It only prevents a
// stale tick from triggering an extra export after exportSpans resets the timer.
if !bsp.timer.Stop() {
<-bsp.timer.C
}
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
}
}
}
}
// drainQueue awaits any caller that had added to bsp.stopWait
// to finish the enqueue, then exports the final batch.
//
// It runs after processQueue has returned (stopCh closed). It moves every span
// still buffered in bsp.queue into the batch and exports whenever the batch
// reaches MaxExportBatchSize. Once the queue is empty it does one final export.
func (bsp *batchSpanProcessor) drainQueue() {
// Called once processQueue has returned because Shutdown closed stopCh.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
select {
case sd := <-bsp.queue: // A span that was still buffered in the queue.
if _, ok := sd.(forceFlushSpan); ok {
// Ignore flush requests as they are not valid spans.
// (A ForceFlush waiting on this marker is released by the closed stopCh.)
continue
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()
// Export each time the batch reaches MaxExportBatchSize (512 by default).
if shouldExport {
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
}
default: // Queue is empty: export whatever remains and stop.
// There are no more enqueued spans. Make final export.
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
return
}
}
}
// enqueue hands an ended span to the queue using the strategy selected by
// BlockOnQueueFull (set through WithBlocking).
func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
ctx := context.TODO()
// BlockOnQueueFull == true: wait for room in the queue instead of dropping.
if bsp.o.BlockOnQueueFull {
// Blocking send. ctx is context.TODO(), which is never cancelled, so this
// waits until the queue has room.
bsp.enqueueBlockOnQueueFull(ctx, sd)
} else {
// Non-blocking send: if the queue is full the span is dropped and counted.
bsp.enqueueDrop(ctx, sd)
}
}
// enqueueBlockOnQueueFull sends sd to bsp.queue, waiting for room if the queue
// is full. It returns true once sd is queued. It returns false, without
// queueing, when sd is not sampled or when ctx is done first.
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
return false
}
// Block until the queue accepts sd or ctx is done.
select {
case bsp.queue <- sd:
return true
case <-ctx.Done():
return false
}
}
// enqueueDrop tries to send sd to bsp.queue without blocking. It returns true
// if sd was queued. It returns false if sd is not sampled, or if the queue is
// full; only in the full-queue case is bsp.dropped incremented.
func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
return false
}
// Non-blocking send; when the queue is full, count the span as dropped.
select {
case bsp.queue <- sd:
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
}
return false
}
// MarshalLog is the marshaling function used by the logging system to represent this exporter.
// It exposes the processor type, its exporter and its resolved options, but no queue state.
func (bsp *batchSpanProcessor) MarshalLog() interface{} {
return struct {
Type string
SpanExporter SpanExporter
Config BatchSpanProcessorOptions
}{
Type: "BatchSpanProcessor",
SpanExporter: bsp.e,
Config: bsp.o,
}
}
簡單總結:batchSpanProcessor在待導出的batch長度達到MaxExportBatchSize(默認512)時會立即導出;另外還有一個定時器,每隔BatchTimeout(默認5s)觸發一次導出。導出的數據格式由exporter決定,本文使用的file exporter輸出的是JSON。需要特別注意它對外提供的兩個接口:OnStart在span開始時調用(這裡什麼也不做);OnEnd在span結束時調用,會把span數據放入隊列。隊列滿時,可以選擇阻塞等待(WithBlocking),也可以直接丟棄。
2.propagation
go-zero是在trace目錄下propagation.go文件的init函數中設置TextMapPropagator的:
// init installs the global TextMapPropagator as a composite of TraceContext
// (the "traceparent" header) followed by Baggage.
func init() {
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{}, propagation.Baggage{}))
}
// Inject writes the propagation data carried by ctx into carrier by calling
// Inject on every member propagator in order. In go-zero's setup that is
// TraceContext (writes "traceparent"), then Baggage.
func (p compositeTextMapPropagator) Inject(ctx context.Context, carrier TextMapCarrier) {
for _, i := range p {
i.Inject(ctx, carrier)
}
}
// Extract threads ctx through every member propagator's Extract in order.
// Each one returns a context enriched with what it read from carrier, and the
// final context is returned.
func (p compositeTextMapPropagator) Extract(ctx context.Context, carrier TextMapCarrier) context.Context {
for _, i := range p {
ctx = i.Extract(ctx, carrier)
}
return ctx
}
這裡設置的是一個由TextMapPropagator組成的數組,包含TraceContext和Baggage兩個實現。Inject和Extract時,會依次循環調用它們各自的Inject和Extract函數。
先看TraceContext:Extract就是從TextMapCarrier的header中取出key為"traceparent"的值,該值的格式由下面這段代碼生成:
// traceparent value: "<version>-<trace-id>-<span-id>-<flags>". For the gRPC
// call above it would be 00-5204e4a305e5c4a4b2b86dc4d426e70b-22e1947a80132233-01
// (assuming supportedVersion is 0).
h := fmt.Sprintf("%.2x-%s-%s-%s",
	supportedVersion,
	sc.TraceID(),
	sc.SpanID(),
	flags)
這種格式的數據。Extract會校驗其合法性,並將解析出的SpanContext放入ctx中。Inject則是把SpanContext的值按照上面的格式寫入TextMapCarrier的header中,key同樣是"traceparent"。所以其實就是一個put、一個get。Baggage也是類似的,只不過它傳輸的是其他的數據(baggage鍵值對),感興趣可以自己看看源碼。
最後,真正的鏈路源碼分析將在下一章展開。
go的分布式鏈路追蹤(1)
文章鏈接
本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯(lián)系刪除。