原理图一ContextManager源码分析一ContextManager上下文属性源码分析一ContextManager跨进行跨线程传输
跨线程跨进程 源码分析一ContextManager上下文创建源码分析一ContextManager创建span
createEntrySpancreateExitSpancreateLocalSpanspan关系图 源码分析一ContextManager.stopSpan
采样 总结
原理图一ContextManagerContextManager维护整个调用链路对Segment和Span的管理Segment表示调用链路对象,span表示该链路上节点的集合Segment放置于TraceingContext上下文中除了TraceingContext上下文,还有一个RuntimeContext运行时上下文辅助
源码分析一ContextManager上下文属性
* 这个上下文是核心上下文 用户Segment和span的管理
private static ThreadLocal ConTEXT = new ThreadLocal();
* 这个上下文是辅助功能m链路上下文中提供上下文数据传播
* 比如tomcat接收请求: 在某个拦截器判断是转发请求则设置转发请求标记而在之后某些拦截器判断是转发则不进行链路上一些处理
* 针对不同的拦截器可以提供不同的作用
private static ThreadLocal RUNTIME_ConTEXT = new ThreadLocal();
源码分析一ContextManager跨进行跨线程传输
跨线程
通过ContextSnapshot完成跨线程
public static ContextSnapshot capture() {
return get().capture();
}
public static void continued(ContextSnapshot snapshot) {
if (snapshot == null) {
throw new IllegalArgumentException("ContextSnapshot can't be null.");
}
if (snapshot.isValid() && !snapshot.isFromCurrent()) {
get().continued(snapshot);
}
}
跨进程
通过ContextCarrier完成跨进程会将ContextCarrier放置框架的额外数据体里,比如mq的properties,dubbo会话的attachment
public static void inject(ContextCarrier carrier) {
get().inject(carrier);
}
public static void extract(ContextCarrier carrier) {
if (carrier == null) {
throw new IllegalArgumentException("ContextCarrier can't be null.");
}
if (carrier.isValid()) {
get().extract(carrier);
}
}
源码分析一ContextManager上下文创建
如果服务和服务实例对应字典都存在 则创建tracecontext 否则还是创建ignore上下文
private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
AbstractTracerContext context = CONTEXT.get();
if (context == null) {
如果服务和服务实例对应字典都存在 则创建tracecontext 否则还是创建ignore上下文
if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
&& RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
) {
if (EXTEND_SERVICE == null) {
EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
}
创建tracecontext
context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
}
CONTEXT.set(context);
}
return context;
}
创建TracingContext创建TraceSegment设置segment是否按照采样配置进行上报
public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
AbstractTracerContext context ;
int suffixIdx = operationName.lastIndexOf(".");
如果配置名单忽略该endpoint/操作资源 则忽略
if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
context = new IgnoredTracerContext();
} else {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
如果强制采样则采样 如果不强制采样 则按照采样机制判断是否采样
if (forceSampling || samplingService.trySampling()) {
context = new TracingContext();
} else {
context = new IgnoredTracerContext();
}
}
return context;
}
构造函数
TracingContext(String firstOPName) {
this.segment = new TraceSegment();
this.spanIdGenerator = 0;
isRunningInAsyncMode = false;
createTime = System.currentTimeMillis();
running = true;
if (SAMPLING_SERVICE == null) {
SAMPLING_SERVICE = ServiceManager.INSTANCE.findService(SamplingService.class);
}
// profiling status
if (PROFILE_TASK_EXECUTION_SERVICE == null) {
PROFILE_TASK_EXECUTION_SERVICE = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
}
this.profileStatus = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(
this, segment.getTraceSegmentId(), firstOPName);
this.correlationContext = new CorrelationContext();
this.extensionContext = new ExtensionContext();
}
源码分析一ContextManager创建span
createEntrySpanEntrySpan: 链路刚进入一个新的进程则创建EntrySpan
ExitSpan: 链路刚即将进入下一个进程则创建ExitSpan
LocalSpan: 链路在entrySpan和ExitSpan之间的Span叫做LocalSpan
TracingContext.activeSpanStack增加创建的span设置EntrySpan的currentMaxDepth和stackDepth
下节会重点介绍栈机制
public AbstractSpan createEntrySpan(final String operationName) {
if (isLimitMechanismWorking()) {
NoopSpan span = new NoopSpan();
return push(span);
}
AbstractSpan entrySpan;
TracingContext owner = this;
final AbstractSpan parentSpan = peek();
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
if (parentSpan != null && parentSpan.isEntry()) {
profilingRecheck(parentSpan, operationName);
parentSpan.setOperationName(operationName);
entrySpan = parentSpan;
return entrySpan.start();
} else {
entrySpan = new EntrySpan(
spanIdGenerator++, parentSpanId,
operationName, owner
);
entrySpan.start();
return push(entrySpan);
}
}
设置activeSpanStack
private AbstractSpan push(AbstractSpan span) {
if (firstSpan == null) {
firstSpan = span;
}
activeSpanStack.addLast(span);
this.extensionContext.handle(span);
return span;
}
设置currentMaxDepth和stackDepth
public EntrySpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
super(spanId, parentSpanId, operationName, owner);
this.currentMaxDepth = 0;
}
createExitSpan
exitSpan加入activeSpanStackexitSpan.start设置stackDepth为1
public AbstractSpan createExitSpan(final String operationName, final String remotePeer) {
if (isLimitMechanismWorking()) {
NoopExitSpan span = new NoopExitSpan(remotePeer);
return push(span);
}
AbstractSpan exitSpan;
AbstractSpan parentSpan = peek();
TracingContext owner = this;
if (parentSpan != null && parentSpan.isExit()) {
exitSpan = parentSpan;
} else {
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
exitSpan = new ExitSpan(spanIdGenerator++, parentSpanId, operationName, remotePeer, owner);
push(exitSpan);
}
exitSpan.start();
return exitSpan;
}
createLocalSpan
创建LocalSpan添加到activeSpanStack
public AbstractSpan createLocalSpan(final String operationName) {
if (isLimitMechanismWorking()) {
NoopSpan span = new NoopSpan();
return push(span);
}
AbstractSpan parentSpan = peek();
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, parentSpanId, operationName, this);
span.start();
return push(span);
}
span关系图
EntrySpan和ExitSpan具有当前栈深度stackDepthEntrySpan具有最大栈深度currentMaxDepth
源码分析一ContextManager.stopSpan
通过context停止span
private static void stopSpan(AbstractSpan span, final AbstractTracerContext context) {
if (context.stopSpan(span)) {
CONTEXT.remove();
RUNTIME_CONTEXT.remove();
}
}
获取activeSpanStack栈上当前span最后一个span则添加到segment.spans属性从栈上移除当前span
public boolean stopSpan(AbstractSpan span) {
获取activeSpanStack栈上当前span
AbstractSpan lastSpan = peek();
if (lastSpan == span) {
if (lastSpan instanceof AbstractTracingSpan) {
AbstractTracingSpan toFinishSpan = (AbstractTracingSpan) lastSpan;
最后一个span则添加到segment
if (toFinishSpan.finish(segment)) {
从栈上移除当前span
pop();
}
} else {
pop();
}
} else {
throw new IllegalStateException("Stopping the unexpected span = " + span);
}
采样处理
finish();
return activeSpanStack.isEmpty();
}
采样
从栈上移除span添加到TracerSegment如果是最后一个span,则进行采样数据上报通过producer-consumer模型发送Segment到服务端
private void finish() {
...... 删除其他代码
TracingContext.ListenerManager.notifyFinish(finishedSegment);
}
static void notifyFinish(TracingContext finishedContext) {
for (TracingThreadListener listener : LISTENERS) {
通过listener处理采样机制
listener.afterFinished(finishedSegment);
}
}
public void afterFinished(TraceSegment traceSegment) {
如果忽略则不上报服务端[生成traceSegment设置该值]
if (traceSegment.isIgnore()) {
return;
}
通过carrier producer-consumer机制完成上报数据
if (!carrier.produce(traceSegment)) {
if (logger.isDebugEnable()) {
logger.debug("One trace segment has been abandoned, cause by buffer is full.");
}
}
}
总结
ContextManager一般用于各类框架的插件中,用于处理TracerSegment和Span新建的Span并不是直接加入TracerSegment而是加入TracingContext.activeSpanStackSpan根据是否离开进程分为Entry,Local,Exit三种类型activeSpanStack为空才会发送Segment到服务端



