栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

skywalking源码分析第十七篇一agent端Trace三部曲一ContextManager追踪链路上下文管理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

skywalking源码分析第十七篇一agent端Trace三部曲一ContextManager追踪链路上下文管理

这里写目录标题

原理图一ContextManager源码分析一ContextManager上下文属性源码分析一ContextManager跨进行跨线程传输

跨线程跨进程 源码分析一ContextManager上下文创建源码分析一ContextManager创建span

createEntrySpancreateExitSpancreateLocalSpanspan关系图 源码分析一ContextManager.stopSpan

采样 总结

原理图一ContextManager

ContextManager维护整个调用链路对Segment和Span的管理Segment表示调用链路对象,span表示该链路上节点的集合Segment放置于TraceingContext上下文中除了TraceingContext上下文,还有一个RuntimeContext运行时上下文辅助
源码分析一ContextManager上下文属性

     *  这个上下文是核心上下文 用户Segment和span的管理
    private static ThreadLocal ConTEXT = new ThreadLocal();

     * 这个上下文是辅助功能m链路上下文中提供上下文数据传播
     * 比如tomcat接收请求: 在某个拦截器判断是转发请求则设置转发请求标记而在之后某些拦截器判断是转发则不进行链路上一些处理
     * 针对不同的拦截器可以提供不同的作用
    
    private static ThreadLocal RUNTIME_ConTEXT = new ThreadLocal();

源码分析一ContextManager跨进行跨线程传输 跨线程

通过ContextSnapshot完成跨线程

   

    public static ContextSnapshot capture() {
        return get().capture();
    }
    
    public static void continued(ContextSnapshot snapshot) {
        if (snapshot == null) {
            throw new IllegalArgumentException("ContextSnapshot can't be null.");
        }
        if (snapshot.isValid() && !snapshot.isFromCurrent()) {
            get().continued(snapshot);
        }
    }
跨进程

通过ContextCarrier完成跨进程会将ContextCarrier放置框架的额外数据体里,比如mq的properties,dubbo会话的attachment

    
    public static void inject(ContextCarrier carrier) {
        get().inject(carrier);
    }

    
    public static void extract(ContextCarrier carrier) {
        if (carrier == null) {
            throw new IllegalArgumentException("ContextCarrier can't be null.");
        }
        if (carrier.isValid()) {
            get().extract(carrier);
        }
    }
源码分析一ContextManager上下文创建

如果服务和服务实例对应字典都存在 则创建tracecontext 否则还是创建ignore上下文

private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
    AbstractTracerContext context = CONTEXT.get();
    if (context == null) {
     如果服务和服务实例对应字典都存在 则创建tracecontext 否则还是创建ignore上下文
    if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
        && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
    ) {
        if (EXTEND_SERVICE == null) {
            EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
        }
        创建tracecontext  
        context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
    }
    CONTEXT.set(context);
    }
    return context;
}

创建TracingContext创建TraceSegment设置segment是否按照采样配置进行上报

public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
    AbstractTracerContext context ;
    int suffixIdx = operationName.lastIndexOf(".");
    如果配置名单忽略该endpoint/操作资源 则忽略
    if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
        context = new IgnoredTracerContext();
    } else {

        SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
        如果强制采样则采样  如果不强制采样 则按照采样机制判断是否采样
        if (forceSampling || samplingService.trySampling()) {
            context = new TracingContext();
        } else {
            context = new IgnoredTracerContext();
        }
    }

    return context;
}


构造函数
TracingContext(String firstOPName) {
    this.segment = new TraceSegment();
    this.spanIdGenerator = 0;
    isRunningInAsyncMode = false;
    createTime = System.currentTimeMillis();
    running = true;

    if (SAMPLING_SERVICE == null) {
        SAMPLING_SERVICE = ServiceManager.INSTANCE.findService(SamplingService.class);
    }

    // profiling status
    if (PROFILE_TASK_EXECUTION_SERVICE == null) {
        PROFILE_TASK_EXECUTION_SERVICE = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
    }
    this.profileStatus = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(
        this, segment.getTraceSegmentId(), firstOPName);

    this.correlationContext = new CorrelationContext();
    this.extensionContext = new ExtensionContext();
}
源码分析一ContextManager创建span

EntrySpan: 链路刚进入一个新的进程则创建EntrySpan
ExitSpan: 链路刚即将进入下一个进程则创建ExitSpan
LocalSpan: 链路在entrySpan和ExitSpan之间的Span叫做LocalSpan

createEntrySpan

TracingContext.activeSpanStack增加创建的span设置EntrySpan的currentMaxDepth和stackDepth

下节会重点介绍栈机制

public AbstractSpan createEntrySpan(final String operationName) {
        if (isLimitMechanismWorking()) {
            NoopSpan span = new NoopSpan();
            return push(span);
        }
        AbstractSpan entrySpan;
        TracingContext owner = this;
        final AbstractSpan parentSpan = peek();
        final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
        if (parentSpan != null && parentSpan.isEntry()) {
            
            profilingRecheck(parentSpan, operationName);
            parentSpan.setOperationName(operationName);
            entrySpan = parentSpan;
            return entrySpan.start();
        } else {
            entrySpan = new EntrySpan(
                spanIdGenerator++, parentSpanId,
                operationName, owner
            );
            entrySpan.start();
            return push(entrySpan);
        }
    }
	设置activeSpanStack
    private AbstractSpan push(AbstractSpan span) {
        if (firstSpan == null) {
            firstSpan = span;
        }
        activeSpanStack.addLast(span);
        this.extensionContext.handle(span);
        return span;
    }
  设置currentMaxDepth和stackDepth
  public EntrySpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
        super(spanId, parentSpanId, operationName, owner);
        this.currentMaxDepth = 0;
    }
createExitSpan

exitSpan加入activeSpanStackexitSpan.start设置stackDepth为1

 public AbstractSpan createExitSpan(final String operationName, final String remotePeer) {
        if (isLimitMechanismWorking()) {
            NoopExitSpan span = new NoopExitSpan(remotePeer);
            return push(span);
        }

        AbstractSpan exitSpan;
        AbstractSpan parentSpan = peek();
        TracingContext owner = this;
        if (parentSpan != null && parentSpan.isExit()) {
            exitSpan = parentSpan;
        } else {
            final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
            exitSpan = new ExitSpan(spanIdGenerator++, parentSpanId, operationName, remotePeer, owner);
            push(exitSpan);
        }
        exitSpan.start();
        return exitSpan;
    }
createLocalSpan

创建LocalSpan添加到activeSpanStack

public AbstractSpan createLocalSpan(final String operationName) {
        if (isLimitMechanismWorking()) {
            NoopSpan span = new NoopSpan();
            return push(span);
        }
        AbstractSpan parentSpan = peek();
        final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
        
        AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, parentSpanId, operationName, this);
        span.start();
        return push(span);
    }
span关系图

EntrySpan和ExitSpan具有当前栈深度stackDepthEntrySpan具有最大栈深度currentMaxDepth
源码分析一ContextManager.stopSpan

通过context停止span

   private static void stopSpan(AbstractSpan span, final AbstractTracerContext context) {
        if (context.stopSpan(span)) {
            CONTEXT.remove();
            RUNTIME_CONTEXT.remove();
        }
    }

获取activeSpanStack栈上当前span最后一个span则添加到segment.spans属性从栈上移除当前span

    public boolean stopSpan(AbstractSpan span) {
        获取activeSpanStack栈上当前span
        AbstractSpan lastSpan = peek();
        if (lastSpan == span) {
            if (lastSpan instanceof AbstractTracingSpan) {
                AbstractTracingSpan toFinishSpan = (AbstractTracingSpan) lastSpan;
                最后一个span则添加到segment
                if (toFinishSpan.finish(segment)) {
                	从栈上移除当前span
                    pop();
                }
            } else {
                pop();
            }
        } else {
            throw new IllegalStateException("Stopping the unexpected span = " + span);
        }
		采样处理
        finish();

        return activeSpanStack.isEmpty();
    }
采样

从栈上移除span添加到TracerSegment如果是最后一个span,则进行采样数据上报通过producer-consumer模型发送Segment到服务端

private void finish() {
    ...... 删除其他代码
    TracingContext.ListenerManager.notifyFinish(finishedSegment);
}

static void notifyFinish(TracingContext finishedContext) {
    for (TracingThreadListener listener : LISTENERS) {
    通过listener处理采样机制
        listener.afterFinished(finishedSegment);

    }
}

public void afterFinished(TraceSegment traceSegment) {
    如果忽略则不上报服务端[生成traceSegment设置该值]
    if (traceSegment.isIgnore()) {
        return;
    }
    通过carrier producer-consumer机制完成上报数据
    if (!carrier.produce(traceSegment)) {
        if (logger.isDebugEnable()) {
            logger.debug("One trace segment has been abandoned, cause by buffer is full.");
        }
    }
}

总结

ContextManager一般用于各类框架的插件中,用于处理TracerSegment和Span新建的Span并不是直接加入TracerSegment而是加入TracingContext.activeSpanStackSpan根据是否离开进程分为Entry,Local,Exit三种类型activeSpanStack为空才会发送Segment到服务端

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/759674.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号