欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:go的分布式鏈路追蹤(1)

柚子快報激活碼778899分享:go的分布式鏈路追蹤(1)

http://yzkb.51969.com/

本文以go1.19.11、go-zero1.6.0下的user.api和user.rpc兩個服務為例子,來分析OpenTelemetry的用法。寫博客的目的,是用最通俗的語言記錄現在,為了讓自己不遺忘。

首先本文分析的鏈路是

我們通過日志的json格式打印出的 上面對應的span

//span(server-api)

{

"Name": "/api/user/userinfo",

"SpanContext": {

"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",

"SpanID": "88c30a0ca171abb4",

"TraceFlags": "01",

"TraceState": "",

"Remote": false

},

"Parent": {

"TraceID": "00000000000000000000000000000000",

"SpanID": "0000000000000000",

"TraceFlags": "00",

"TraceState": "",

"Remote": false

},

"SpanKind": 2,

"StartTime": "2023-12-19T18:01:27.8911484+08:00",

"EndTime": "2023-12-19T18:01:27.9021409+08:00",

"Attributes": [

{

"Key": "http.target",

"Value": {

"Type": "STRING",

"Value": "/api/user/userinfo"

}

},

{

"Key": "http.server_name",

"Value": {

"Type": "STRING",

"Value": "User"

}

},

{

"Key": "http.route",

"Value": {

"Type": "STRING",

"Value": "/api/user/userinfo"

}

},

{

"Key": "http.user_agent",

"Value": {

"Type": "STRING",

"Value": "PostmanRuntime/7.36.0"

}

},

{

"Key": "http.scheme",

"Value": {

"Type": "STRING",

"Value": "http"

}

},

{

"Key": "http.host",

"Value": {

"Type": "STRING",

"Value": "127.0.0.1:11000"

}

},

{

"Key": "http.flavor",

"Value": {

"Type": "STRING",

"Value": "1.1"

}

},

{

"Key": "http.method",

"Value": {

"Type": "STRING",

"Value": "POST"

}

},

{

"Key": "http.status_code",

"Value": {

"Type": "INT64",

"Value": 200

}

}

],

"Events": null,

"Links": null,

"Status": {

"Code": "Unset",

"Description": ""

},

"DroppedAttributes": 0,

"DroppedEvents": 0,

"DroppedLinks": 0,

"ChildSpanCount": 1,

"Resource": [

{

"Key": "service.name",

"Value": {

"Type": "STRING",

"Value": "user.api"

}

}

],

"InstrumentationLibrary": {

"Name": "go-zero",

"Version": "",

"SchemaURL": ""

}

}

//span(client-rpc)

{

"Name": "user.User/UserInfo",

"SpanContext": {

"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",

"SpanID": "22e1947a80132233",

"TraceFlags": "01",

"TraceState": "",

"Remote": false

},

"Parent": {

"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",

"SpanID": "88c30a0ca171abb4",

"TraceFlags": "01",

"TraceState": "",

"Remote": false

},

"SpanKind": 3,

"StartTime": "2023-12-19T18:01:27.891912+08:00",

"EndTime": "2023-12-19T18:01:27.9016198+08:00",

"Attributes": [

{

"Key": "rpc.system",

"Value": {

"Type": "STRING",

"Value": "grpc"

}

},

{

"Key": "rpc.service",

"Value": {

"Type": "STRING",

"Value": "user.User"

}

},

{

"Key": "rpc.method",

"Value": {

"Type": "STRING",

"Value": "UserInfo"

}

},

{

"Key": "rpc.grpc.status_code",

"Value": {

"Type": "INT64",

"Value": 0

}

}

],

"Events": [

{

"Name": "message",

"Attributes": [

{

"Key": "message.type",

"Value": {

"Type": "STRING",

"Value": "SENT"

}

},

{

"Key": "message.id",

"Value": {

"Type": "INT64",

"Value": 1

}

},

{

"Key": "message.uncompressed_size",

"Value": {

"Type": "INT64",

"Value": 2

}

}

],

"DroppedAttributeCount": 0,

"Time": "2023-12-19T18:01:27.891912+08:00"

},

{

"Name": "message",

"Attributes": [

{

"Key": "message.type",

"Value": {

"Type": "STRING",

"Value": "RECEIVED"

}

},

{

"Key": "message.id",

"Value": {

"Type": "INT64",

"Value": 1

}

},

{

"Key": "message.uncompressed_size",

"Value": {

"Type": "INT64",

"Value": 23

}

}

],

"DroppedAttributeCount": 0,

"Time": "2023-12-19T18:01:27.9016198+08:00"

}

],

"Links": null,

"Status": {

"Code": "Unset",

"Description": ""

},

"DroppedAttributes": 0,

"DroppedEvents": 0,

"DroppedLinks": 0,

"ChildSpanCount": 0,

"Resource": [

{

"Key": "service.name",

"Value": {

"Type": "STRING",

"Value": "user.api"

}

}

],

"InstrumentationLibrary": {

"Name": "go-zero",

"Version": "",

"SchemaURL": ""

}

}

//span(client-redis)

{

"Name": "redis",

"SpanContext": {

"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",

"SpanID": "7d856ed633df50cf",

"TraceFlags": "01",

"TraceState": "",

"Remote": false

},

"Parent": {

"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",

"SpanID": "d76929b7c22e20f4",

"TraceFlags": "01",

"TraceState": "",

"Remote": false

},

"SpanKind": 3,

"StartTime": "2023-12-19T18:01:27.8929801+08:00",

"EndTime": "2023-12-19T18:01:27.9005968+08:00",

"Attributes": [

{

"Key": "redis.cmds",

"Value": {

"Type": "STRINGSLICE",

"Value": [

"get"

]

}

}

],

"Events": null,

"Links": null,

"Status": {

"Code": "Ok",

"Description": ""

},

"DroppedAttributes": 0,

"DroppedEvents": 0,

"DroppedLinks": 0,

"ChildSpanCount": 0,

"Resource": [

{

"Key": "service.name",

"Value": {

"Type": "STRING",

"Value": "user.rpc"

}

}

],

"InstrumentationLibrary": {

"Name": "go-zero",

"Version": "",

"SchemaURL": ""

}

}

//span(server-rpc)

{

"Name": "user.User/UserInfo",

"SpanContext": {

"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",

"SpanID": "d76929b7c22e20f4",

"TraceFlags": "01",

"TraceState": "",

"Remote": false

},

"Parent": {

"TraceID": "5204e4a305e5c4a4b2b86dc4d426e70b",

"SpanID": "22e1947a80132233",

"TraceFlags": "01",

"TraceState": "",

"Remote": true

},

"SpanKind": 2,

"StartTime": "2023-12-19T18:01:27.8924298+08:00",

"EndTime": "2023-12-19T18:01:27.9011+08:00",

"Attributes": [

{

"Key": "rpc.system",

"Value": {

"Type": "STRING",

"Value": "grpc"

}

},

{

"Key": "rpc.service",

"Value": {

"Type": "STRING",

"Value": "user.User"

}

},

{

"Key": "rpc.method",

"Value": {

"Type": "STRING",

"Value": "UserInfo"

}

},

{

"Key": "net.peer.ip",

"Value": {

"Type": "STRING",

"Value": "192.168.1.36"

}

},

{

"Key": "net.peer.port",

"Value": {

"Type": "STRING",

"Value": "53405"

}

},

{

"Key": "rpc.grpc.status_code",

"Value": {

"Type": "INT64",

"Value": 0

}

}

],

"Events": [

{

"Name": "message",

"Attributes": [

{

"Key": "message.type",

"Value": {

"Type": "STRING",

"Value": "RECEIVED"

}

},

{

"Key": "message.id",

"Value": {

"Type": "INT64",

"Value": 1

}

},

{

"Key": "message.uncompressed_size",

"Value": {

"Type": "INT64",

"Value": 2

}

}

],

"DroppedAttributeCount": 0,

"Time": "2023-12-19T18:01:27.8924298+08:00"

},

{

"Name": "message",

"Attributes": [

{

"Key": "message.type",

"Value": {

"Type": "STRING",

"Value": "SENT"

}

},

{

"Key": "message.id",

"Value": {

"Type": "INT64",

"Value": 1

}

},

{

"Key": "message.uncompressed_size",

"Value": {

"Type": "INT64",

"Value": 23

}

}

],

"DroppedAttributeCount": 0,

"Time": "2023-12-19T18:01:27.9011+08:00"

}

],

"Links": null,

"Status": {

"Code": "Unset",

"Description": ""

},

"DroppedAttributes": 0,

"DroppedEvents": 0,

"DroppedLinks": 0,

"ChildSpanCount": 1,

"Resource": [

{

"Key": "service.name",

"Value": {

"Type": "STRING",

"Value": "user.rpc"

}

}

],

"InstrumentationLibrary": {

"Name": "go-zero",

"Version": "",

"SchemaURL": ""

}

}

上面的span的屬性,我們先不介紹,等開始看源碼的時候再逐一說明。

1.Exporter

由于trace記錄的數據都要導出到一個具體的軟件去做分析,go的OpenTelemetry支持文件、終端、http、grpc、Jaeger、Zipkin等等導出方式。我們只是為了獲取數據,所以這里直接將數據輸出到文件。go-zero的user的api和rpc服務的配置如下:

//api的user.yaml

Telemetry:

Name: user.api

Endpoint: ./usertrace.log

Sampler: 1.0

Batcher: file

//rpc的user.yaml

Telemetry:

Name: user.rpc

Endpoint: ./usertrace.log

Sampler: 1.0

Batcher: file

無論是api還是rpc的服務,都會調用c.SetUp()函數,然后SetUp函數調用trace.StartAgent(sc.Telemetry)。進入StartAgent后,里面會將Endpoint保存到一個set結構里,然后執行內部的startAgent:

// startAgent builds a TracerProvider from c and installs it, together with an
// error handler, as the global OpenTelemetry configuration.
//
// It returns the error from createExporter when an endpoint is configured but
// the exporter cannot be created; otherwise it returns nil.
func startAgent(c Config) error {
	// Register the configured name as the service.name resource attribute.
	// This must happen before attrResources is read below.
	AddResources(semconv.ServiceNameKey.String(c.Name))

	opts := []sdktrace.TracerProviderOption{
		// Sample by trace-ID ratio c.Sampler, wrapped in ParentBased.
		sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(c.Sampler))),
		// Record information about this application in a Resource.
		sdktrace.WithResource(resource.NewSchemaless(attrResources...)),
	}

	// An exporter is only attached when an endpoint is configured.
	if len(c.Endpoint) > 0 {
		exp, err := createExporter(c)
		if err != nil {
			logx.Error(err)
			return err
		}

		// Always be sure to batch in production.
		opts = append(opts, sdktrace.WithBatcher(exp))
	}

	// Store the provider in the package-level tp and make it the global one.
	tp = sdktrace.NewTracerProvider(opts...)
	otel.SetTracerProvider(tp)
	// Route errors reported by OpenTelemetry to the logx error log.
	otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
		logx.Errorf("[otel] error: %v", err)
	}))

	return nil
}

其中

tp = sdktrace.NewTracerProvider(opts...)

otel.SetTracerProvider(tp)

這兩句代碼是設置TracerProvider,我們看一下TracerProvider有哪些字段

// TracerProvider groups the state a provider keeps: its named tracers, the
// registered span processors, a shutdown flag, and the configuration that is
// fixed at creation (sampler, ID generator, span limits and resource).
type TracerProvider struct {
	mu             sync.Mutex
	namedTracer    map[instrumentation.Scope]*tracer // tracers keyed by instrumentation scope
	spanProcessors atomic.Pointer[spanProcessorStates]

	isShutdown atomic.Bool

	// These fields are not protected by the lock mu. They are assumed to be
	// immutable after creation of the TracerProvider.
	sampler     Sampler // NOTE(review): presumably set via WithSampler — confirm
	idGenerator IDGenerator
	spanLimits  SpanLimits
	resource    *resource.Resource // NOTE(review): presumably set via WithResource — confirm
}

AddResources(semconv.ServiceNameKey.String(c.Name))

是將attribute.KeyValue的鍵值對存入,上面日志里的resource屬性,value就是配置里的Name值

"Resource": [

{

"Key": "service.name",

"Value": {

"Type": "STRING",

"Value": "user.rpc"

}

}

],

其中的

opts := []sdktrace.TracerProviderOption{

// Set the sampling rate based on the parent span to 100%

sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(c.Sampler))),

// Record information about this application in a Resource.

sdktrace.WithResource(resource.NewSchemaless(attrResources...)),

}

sdktrace.WithSampler賦值TracerProvider的sampler,我們來看一下sampler是什么東西,看源碼注釋

// TraceIDRatioBased returns a Sampler chosen from fraction:
//
//   - fraction >= 1 yields AlwaysSample();
//   - fraction <= 0 is clamped to 0;
//   - anything else yields a traceIDRatioSampler whose upper bound is
//     fraction scaled to the range [0, 1<<63).
func TraceIDRatioBased(fraction float64) Sampler {
	switch {
	case fraction >= 1:
		return AlwaysSample()
	case fraction <= 0:
		fraction = 0
	}

	upperBound := uint64(fraction * (1 << 63))
	desc := fmt.Sprintf("TraceIDRatioBased{%g}", fraction)
	return &traceIDRatioSampler{
		traceIDUpperBound: upperBound,
		description:       desc,
	}
}

// ShouldSample decides whether the span described by p is recorded.
//
// Bytes 8 through 15 of the trace ID are read as a big-endian uint64 and
// shifted right by one bit. The span is recorded and sampled when that value
// is below ts.traceIDUpperBound and dropped otherwise. In both cases the
// parent's TraceState is carried over into the result.
func (ts traceIDRatioSampler) ShouldSample(p SamplingParameters) SamplingResult {
	parent := trace.SpanContextFromContext(p.ParentContext)
	idValue := binary.BigEndian.Uint64(p.TraceID[8:16]) >> 1

	decision := Drop
	if idValue < ts.traceIDUpperBound {
		decision = RecordAndSample
	}
	return SamplingResult{
		Decision:   decision,
		Tracestate: parent.TraceState(),
	}
}

在這上面,由于我們的user.yaml的Sampler是1,所以是AlwaysSample,這個代表的是我們一直記錄span,當然也有alwaysOffSampler從來不記錄。

sdktrace.WithResource就是賦值了TracerProvider的resource

下面看怎么賦值TracerProvider的spanProcessors

if len(c.Endpoint) > 0 {
	exp, err := createExporter(c)
	if err != nil {
		logx.Error(err)
		return err
	}

	// Always be sure to batch in production.
	opts = append(opts, sdktrace.WithBatcher(exp))
}

createExporter(c)函數可以根據配置創建http、grpc、Jaeger、Zipkin、file等類型的exporter。由于user.yaml里配置的是Batcher: file,所以我們選擇的是file。無論哪種類型,返回的exp都要滿足下面的接口:

type SpanExporter interface { ? ? ExportSpans(ctx context.Context, spans []ReadOnlySpan) error ? ? Shutdown(ctx context.Context) error }

然后看看Exporter的機制是怎樣導出數據的

sdktrace.WithBatcher(exp)---->NewBatchSpanProcessor(e, opts...),導出機制就在這個類里實現的,看源碼注釋:

// Copyright The OpenTelemetry Authors

//

// Licensed under the Apache License, Version 2.0 (the "License");

// you may not use this file except in compliance with the License.

// You may obtain a copy of the License at

//

// http://www.apache.org/licenses/LICENSE-2.0

//

// Unless required by applicable law or agreed to in writing, software

// distributed under the License is distributed on an "AS IS" BASIS,

// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

// See the License for the specific language governing permissions and

// limitations under the License.

package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (

"context"

"sync"

"sync/atomic"

"time"

"go.opentelemetry.io/otel"

"go.opentelemetry.io/otel/internal/global"

"go.opentelemetry.io/otel/sdk/internal/env"

"go.opentelemetry.io/otel/trace"

)

// Defaults for BatchSpanProcessorOptions.
const (
	// DefaultMaxQueueSize is the default capacity of the span queue.
	DefaultMaxQueueSize = 2048
	// DefaultScheduleDelay is the default BatchTimeout, in milliseconds.
	DefaultScheduleDelay = 5000
	// DefaultExportTimeout is the default ExportTimeout, in milliseconds.
	DefaultExportTimeout = 30000
	// DefaultMaxExportBatchSize is the default number of spans per export.
	DefaultMaxExportBatchSize = 512
)

// BatchSpanProcessorOption configures a BatchSpanProcessor.

type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)

// BatchSpanProcessorOptions is configuration settings for a
// BatchSpanProcessor.
type BatchSpanProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
	// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
	// The default value of MaxQueueSize is 2048.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available spans when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting spans. If the timeout
	// is reached, the export will be cancelled.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of spans to process in a single batch.
	// If there are more than one batch worth of spans then it processes multiple batches
	// of spans one batch after the other without any delay.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// BlockOnQueueFull makes OnEnd wait for room in the queue instead of
	// dropping the span when the queue is full. OnStart is a no-op and is
	// never affected.
	// Blocking option should be used carefully as it can severely affect the performance of an
	// application.
	BlockOnQueueFull bool
}

// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
// spans and sends them to a trace.Exporter when complete.
type batchSpanProcessor struct {
	e SpanExporter              // exporter that receives each batch; may be nil
	o BatchSpanProcessorOptions // effective configuration

	queue   chan ReadOnlySpan // ended spans waiting to be batched; capacity o.MaxQueueSize
	dropped uint32            // spans discarded because the queue was full; accessed atomically

	batch      []ReadOnlySpan // spans collected for the next export
	batchMutex sync.Mutex     // held while batch is appended to or exported
	timer      *time.Timer    // fires after o.BatchTimeout to trigger an export
	stopWait   sync.WaitGroup // tracks the background queue-processing goroutine
	stopOnce   sync.Once      // makes the body of Shutdown run only once
	stopCh     chan struct{}  // closed by Shutdown to stop processQueue
	stopped    atomic.Bool    // set by Shutdown; checked by OnEnd and ForceFlush
}

var _ SpanProcessor = (*batchSpanProcessor)(nil)

// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
// span batches to the exporter with the supplied options.
//
// If the exporter is nil, the span processor will perform no action.
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
	// Queue capacity; DefaultMaxQueueSize (2048) is passed as the fallback.
	// NOTE(review): the env helpers presumably read overrides from the
	// environment — confirm.
	maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
	// Batch size; DefaultMaxExportBatchSize (512) is passed as the fallback.
	maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)

	// Validate: a batch must not be larger than the queue. Fall back to the
	// default batch size, or to the queue size when even the default is too big.
	if maxExportBatchSize > maxQueueSize {
		if DefaultMaxExportBatchSize > maxQueueSize {
			maxExportBatchSize = maxQueueSize
		} else {
			maxExportBatchSize = DefaultMaxExportBatchSize
		}
	}

	// The fallbacks passed here are 5000 ms for BatchTimeout and 30000 ms for
	// ExportTimeout.
	o := BatchSpanProcessorOptions{
		BatchTimeout:       time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
		ExportTimeout:      time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
		MaxQueueSize:       maxQueueSize,
		MaxExportBatchSize: maxExportBatchSize,
	}
	// Functional options: each one may overwrite a field of o.
	for _, opt := range options {
		opt(&o)
	}
	bsp := &batchSpanProcessor{
		e:      exporter,
		o:      o,
		batch:  make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
		timer:  time.NewTimer(o.BatchTimeout),
		queue:  make(chan ReadOnlySpan, o.MaxQueueSize),
		stopCh: make(chan struct{}),
	}

	// The background goroutine is tracked in stopWait so that Shutdown can
	// wait for it to finish.
	bsp.stopWait.Add(1)
	go func() {
		defer bsp.stopWait.Done()
		// Consume the queue and export batches until stopCh is closed.
		bsp.processQueue()
		// After stopCh is closed, export whatever is still queued.
		bsp.drainQueue()
	}()

	return bsp
}

// OnStart method does nothing.
//
// NOTE(review): per the article's annotation this hook is invoked when a span
// starts — confirm against the caller.
func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}

// OnEnd method enqueues a ReadOnlySpan for later processing.
//
// The span is ignored once Shutdown has been called, and also when no exporter
// is configured, because it would only be dropped later.
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
	if !bsp.stopped.Load() && bsp.e != nil {
		bsp.enqueue(s)
	}
}

// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
//
// It returns ctx.Err() if ctx is done before the shutdown work completes;
// an error from the exporter's Shutdown is passed to otel.Handle instead of
// being returned.
func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
	var err error
	bsp.stopOnce.Do(func() { // sync.Once: the body runs a single time
		bsp.stopped.Store(true) // mark the processor as stopped
		wait := make(chan struct{})
		go func() {
			close(bsp.stopCh)   // tell processQueue to stop
			bsp.stopWait.Wait() // wait for processQueue and drainQueue to finish before shutting the exporter down
			if bsp.e != nil {
				// Shut down the exporter (the file exporter in the article's setup).
				if err := bsp.e.Shutdown(ctx); err != nil {
					otel.Handle(err)
				}
			}
			close(wait) // signal that the shutdown work is complete
		}()
		// Wait until the wait group is done or the context is cancelled
		select {
		case <-wait: // the goroutine above, including the exporter Shutdown, has finished
		case <-ctx.Done():
			err = ctx.Err()
		}
	})
	return err
}

// forceFlushSpan is a marker value that ForceFlush puts on the span queue.
// It is not a real span: processQueue closes flushed when it reaches the
// marker, and drainQueue ignores it.
type forceFlushSpan struct {
	ReadOnlySpan
	flushed chan struct{} // closed by processQueue once the marker is dequeued
}

// SpanContext returns a span context whose only setting is the sampled trace
// flag.
//
// NOTE(review): this presumably lets the marker pass the IsSampled check in
// enqueueBlockOnQueueFull — confirm.
func (f forceFlushSpan) SpanContext() trace.SpanContext {
	cfg := trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}
	return trace.NewSpanContext(cfg)
}

// ForceFlush exports all ended spans that have not yet been exported.
//
// It returns ctx.Err() if ctx is already done or becomes done while waiting,
// nil if the processor has been shut down or has no exporter, and otherwise
// the result of the export.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
	// Interrupt if context is already canceled.
	if err := ctx.Err(); err != nil {
		return err
	}

	// Do nothing after Shutdown.
	if bsp.stopped.Load() {
		return nil
	}

	var err error
	if bsp.e != nil {
		// Enqueue a marker and wait until processQueue reaches it, which
		// means every span queued before this call has been moved into the
		// batch.
		flushCh := make(chan struct{})
		if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
			select {
			case <-bsp.stopCh:
				// The batchSpanProcessor is Shutdown.
				return nil
			case <-flushCh:
				// Processed any items in queue prior to ForceFlush being called
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		// The channel is buffered so the exporting goroutine can always
		// deliver its result and exit, even when this function has already
		// returned because ctx was done. With an unbuffered channel that
		// goroutine would block on the send forever.
		wait := make(chan error, 1)
		go func() {
			wait <- bsp.exportSpans(ctx)
		}()
		// Wait until the export is finished or the context is cancelled/timed out
		select {
		case err = <-wait:
		case <-ctx.Done():
			err = ctx.Err()
		}
	}
	return err
}

// The functions below are functional options for BatchSpanProcessorOptions.

// WithMaxQueueSize returns a BatchSpanProcessorOption that sets MaxQueueSize,
// the maximum queue size allowed for a BatchSpanProcessor, to size.
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
	apply := func(cfg *BatchSpanProcessorOptions) {
		cfg.MaxQueueSize = size
	}
	return apply
}

// WithMaxExportBatchSize returns a BatchSpanProcessorOption that sets
// MaxExportBatchSize, the maximum export batch size allowed for a
// BatchSpanProcessor, to size.
func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
	apply := func(cfg *BatchSpanProcessorOptions) {
		cfg.MaxExportBatchSize = size
	}
	return apply
}

// WithBatchTimeout returns a BatchSpanProcessorOption that sets BatchTimeout
// to delay: the longest a BatchSpanProcessor holds spans before exporting
// them, whether the queue is full or not.
func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
	apply := func(cfg *BatchSpanProcessorOptions) {
		cfg.BatchTimeout = delay
	}
	return apply
}

// WithExportTimeout returns a BatchSpanProcessorOption that sets ExportTimeout
// to timeout: how long a BatchSpanProcessor waits for an exporter before
// abandoning the export.
func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
	apply := func(cfg *BatchSpanProcessorOptions) {
		cfg.ExportTimeout = timeout
	}
	return apply
}

// WithBlocking returns a BatchSpanProcessorOption that turns on
// BlockOnQueueFull, so enqueue operations wait for room in the queue instead
// of dropping data when it is full.
func WithBlocking() BatchSpanProcessorOption {
	apply := func(cfg *BatchSpanProcessorOptions) {
		cfg.BlockOnQueueFull = true
	}
	return apply
}

// exportSpans is a subroutine of processing and draining the queue.
//
// It restarts the batch timer, then, while holding batchMutex, hands the
// current batch to the exporter and empties it. It returns the exporter's
// error, or nil when the batch was empty or the export succeeded.
func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
	bsp.timer.Reset(bsp.o.BatchTimeout)

	bsp.batchMutex.Lock()
	defer bsp.batchMutex.Unlock()

	// Bound the export by ExportTimeout (30000 ms by default); the derived
	// context is cancelled when the timeout is reached.
	if bsp.o.ExportTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
		defer cancel()
	}

	// Only call the exporter when there is something to export.
	if l := len(bsp.batch); l > 0 {
		global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
		// Call the configured exporter (the file exporter in the article's
		// setup), passing the timeout-bound context.
		err := bsp.e.ExportSpans(ctx, bsp.batch)

		// A new batch is always created after exporting, even if the batch failed to be exported.
		//
		// It is up to the exporter to implement any type of retry logic if a batch is failing
		// to be exported, since it is specific to the protocol and backend being sent to.
		// Empty the batch while keeping its backing array.
		bsp.batch = bsp.batch[:0]

		if err != nil {
			return err
		}
	}
	return nil
}

// processQueue removes spans from the `queue` channel until processor
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
// waiting up to BatchTimeout to form a batch.
//
// Export errors are reported through otel.Handle and do not stop the loop.
func (bsp *batchSpanProcessor) processQueue() {
	defer bsp.timer.Stop()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for {
		select {
		case <-bsp.stopCh:
			// Shutdown closed stopCh: stop consuming the queue.
			return
		case <-bsp.timer.C:
			// BatchTimeout (5000 ms by default) elapsed: export whatever
			// has accumulated so far.
			if err := bsp.exportSpans(ctx); err != nil {
				otel.Handle(err)
			}
		case sd := <-bsp.queue:
			if ffs, ok := sd.(forceFlushSpan); ok {
				// A ForceFlush marker rather than a span: closing flushed
				// tells the waiting ForceFlush that everything queued
				// before it has been moved into the batch.
				close(ffs.flushed)
				continue
			}
			bsp.batchMutex.Lock()
			bsp.batch = append(bsp.batch, sd)
			shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
			bsp.batchMutex.Unlock()
			if shouldExport {
				// The batch is full: export now instead of waiting for the
				// timer. exportSpans re-arms the timer, so stop it first.
				if !bsp.timer.Stop() {
					// The timer already fired. Discard a pending tick if
					// there is one, but never block: depending on the Go
					// version and timer implementation the channel may be
					// empty here, and a plain receive would then hang the
					// processing loop.
					select {
					case <-bsp.timer.C:
					default:
					}
				}
				if err := bsp.exportSpans(ctx); err != nil {
					otel.Handle(err)
				}
			}
		}
	}
}

// drainQueue empties what is left on the queue after processQueue has
// returned, exporting full batches as they form, and then exports the final
// partial batch.
func (bsp *batchSpanProcessor) drainQueue() {
	// Runs after processQueue returns, i.e. once stopCh has been closed.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for {
		select {
		case sd := <-bsp.queue: // a span is still waiting on the queue
			if _, ok := sd.(forceFlushSpan); ok {
				// Ignore flush requests as they are not valid spans.
				continue
			}

			bsp.batchMutex.Lock()
			bsp.batch = append(bsp.batch, sd)
			shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
			bsp.batchMutex.Unlock()

			// Export as soon as the batch reaches MaxExportBatchSize
			// (512 by default).
			if shouldExport {
				if err := bsp.exportSpans(ctx); err != nil {
					otel.Handle(err)
				}
			}
		default: // the queue is empty
			// There are no more enqueued spans. Make final export.
			if err := bsp.exportSpans(ctx); err != nil {
				otel.Handle(err)
			}
			return
		}
	}
}

// enqueue puts sd on the span queue. With BlockOnQueueFull set it waits for
// room in the queue; otherwise the span is dropped when the queue is full.
func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
	ctx := context.TODO()
	if !bsp.o.BlockOnQueueFull {
		// Non-blocking mode.
		bsp.enqueueDrop(ctx, sd)
		return
	}
	// Blocking mode.
	bsp.enqueueBlockOnQueueFull(ctx, sd)
}

// enqueueBlockOnQueueFull sends sd to the queue, blocking until the send
// succeeds or ctx is done. It reports whether the span was enqueued; spans
// that are not sampled are rejected immediately.
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
	if sampled := sd.SpanContext().IsSampled(); !sampled {
		return false
	}

	select {
	case <-ctx.Done():
		return false
	case bsp.queue <- sd:
		return true
	}
}

// enqueueDrop tries to send sd to the queue without blocking. It reports
// whether the span was enqueued. Spans that are not sampled are rejected
// immediately; a span that does not fit in the queue is counted in dropped.
func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
	if sampled := sd.SpanContext().IsSampled(); !sampled {
		return false
	}

	select {
	case bsp.queue <- sd:
		return true
	default:
	}

	// The queue could not take the span: record the drop.
	atomic.AddUint32(&bsp.dropped, 1)
	return false
}

// MarshalLog is the marshaling function used by the logging system to
// represent this processor: a fixed type label, its exporter and its options.
func (bsp *batchSpanProcessor) MarshalLog() interface{} {
	view := struct {
		Type         string
		SpanExporter SpanExporter
		Config       BatchSpanProcessorOptions
	}{}
	view.Type = "BatchSpanProcessor"
	view.SpanExporter = bsp.e
	view.Config = bsp.o
	return view
}

簡單總結:span隊列的長度達到設定的批量大小(默認512)就會觸發導出,同時也有定時器(默認5s)觸發導出;可以設置導出的數據格式,本文是json。千萬注意其中對外提供的兩個接口:OnStart在span.start的時候調用(這里什么也不做);OnEnd在span.end的時候調用,會將span數據添加到隊列,并且可以選擇隊列滿時是否阻塞。

2.propagation

go-zero的設置的TextMapPropagator是在trace的目錄下的propagation.go文件下的初始化的

func init() {

otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(

propagation.TraceContext{}, propagation.Baggage{}))

}

// Inject calls Inject on every propagator in p, in order, with the same ctx
// and carrier.
func (p compositeTextMapPropagator) Inject(ctx context.Context, carrier TextMapCarrier) {
	for idx := range p {
		p[idx].Inject(ctx, carrier)
	}
}

// Extract runs every propagator in p, in order, over carrier. Each propagator
// receives the context returned by the previous one; the final context is
// returned.
func (p compositeTextMapPropagator) Extract(ctx context.Context, carrier TextMapCarrier) context.Context {
	result := ctx
	for idx := range p {
		result = p[idx].Extract(result, carrier)
	}
	return result
}

其中設置的是一個TextMapPropagator的數組,包含TraceContext和Baggage兩個實現,執行時就循環執行它們的Extract和Inject函數

TraceContext的分析,Extract就是TextMapCarrier的頭中得到key是"traceparent",value是

h := fmt.Sprintf("%.2x-%s-%s-%s",

supportedVersion,

sc.TraceID(),

sc.SpanID(),

flags))

這種格式的數據,然后檢驗其合法性,并將SpanContext的值加入到ctx中;Inject函數就是將SpanContext的值按照上面的這種格式寫入TextMapCarrier頭中,key是"traceparent"。其實就是一個put,一個get。Baggage也是類似的,只不過它傳輸的是其他的自定義數據,自己可以看看源碼

最后真正的分析將會在下章解析

柚子快報激活碼778899分享:go的分布式鏈路追蹤(1)

http://yzkb.51969.com/

文章鏈接

評論可見,查看隱藏內容

本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉載請注明,如有侵權,聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19295732.html

發(fā)布評論

您暫未設置收款碼

請在主題配置——文章設置里上傳

掃描二維碼手機訪問

文章目錄