上图是一个下单接口的链路,在链路中首先要理解的概念是Segment,Segment表示一个JVM进程内的所有操作,上图中有6个Segment。Gateway Segment是Mall Segment的parent,通过parent关系就可以把多个Segment按顺序拼起来组装成一个链路
一个Segment里可能发生多个操作,如上图Segment中操作1是查Redis,操作2是查MySQL,这就是两个Span,Span表示一个具体的操作。Span之间也是基于parent的关系构建起来的,而Segment是Span的容器
多个Segment连接起来就组成了一个Trace,每个Trace都有一个全局唯一的ID
推荐阅读:
OpenTracing语义规范
OpenTracing语义规范译文
谷歌Dapper论文
谷歌Dapper论文译文
小结:
22、链路ID生成定义了traceId的抽象类DistributedTraceId,代码如下:
@RequiredArgsConstructor
@ToString
@EqualsAndHashCode
public abstract class DistributedTraceId {
@Getter
private final String id;
}
DistributedTraceId有两个实现PropagatedTraceId和NewDistributedTraceId
PropagatedTraceId构造函数需要传入traceId并赋值
public class PropagatedTraceId extends DistributedTraceId {
public PropagatedTraceId(String id) {
super(id);
}
}
NewDistributedTraceId会通过GlobalIdGenerator的generate()方法生成traceId并赋值
public class NewDistributedTraceId extends DistributedTraceId {
public NewDistributedTraceId() {
super(GlobalIdGenerator.generate());
}
}
GlobalIdGenerator的generate()方法,该方法用于生成TraceId和SegmentId,代码如下:
public final class GlobalIdGenerator {
private static final String PROCESS_ID = UUID.randomUUID().toString().replaceAll("-", "");
private static final ThreadLocal THREAD_ID_SEQUENCE = ThreadLocal.withInitial(
() -> new IDContext(System.currentTimeMillis(), (short) 0));
private GlobalIdGenerator() {
}
public static String generate() {
return StringUtil.join(
'.',
PROCESS_ID,
String.valueOf(Thread.currentThread().getId()),
String.valueOf(THREAD_ID_SEQUENCE.get().nextSeq())
);
}
生成的traceId由三部分组成,第一部分表示应用实例的Id,是一个UUID;第二部分表示线程Id;第三部分是一个毫秒级的时间戳+一个当前线程里的序号,该序号的范围是0-9999
第三部分调用ThreadLocal中IDContext的nextSeq()方法生成,代码如下:
public final class GlobalIdGenerator {
private static final ThreadLocal THREAD_ID_SEQUENCE = ThreadLocal.withInitial(
() -> new IDContext(System.currentTimeMillis(), (short) 0));
private static class IDContext {
// 上次生成sequence的时间戳
private long lastTimestamp;
// 线程的序列号
private short threadSeq;
// Just for considering time-shift-back only.
// 时钟回拨
private long lastShiftTimestamp;
private int lastShiftValue;
private IDContext(long lastTimestamp, short threadSeq) {
this.lastTimestamp = lastTimestamp;
this.threadSeq = threadSeq;
}
private long nextSeq() {
// 时间戳 * 10000 + 线程的序列号
return timestamp() * 10000 + nextThreadSeq();
}
private long timestamp() {
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis < lastTimestamp) { // 发生了时钟回拨
// Just for considering time-shift-back by Ops or OS. @hanahmily 's suggestion.
if (lastShiftTimestamp != currentTimeMillis) {
lastShiftValue++;
lastShiftTimestamp = currentTimeMillis;
}
return lastShiftValue;
} else { // 时钟正常
lastTimestamp = currentTimeMillis;
return lastTimestamp;
}
}
private short nextThreadSeq() {
if (threadSeq == 10000) {
threadSeq = 0;
}
return threadSeq++;
}
}
23、TraceSegment
先来看下TraceSegment中定义的属性:
public class TraceSegment {
private String traceSegmentId;
private TraceSegmentRef ref;
- traceSegmentId:每个Segment都有一个全局唯一的Id
- ref:指向当前Segment的Parent Segment的指针
TraceSegmentRef的代码如下:
@Getter
public class TraceSegmentRef {
private SegmentRefType type; // segment类型:跨进程或跨线程
private String traceId;
private String traceSegmentId; // segmentId
private int spanId;
private String parentService; // Mall -> Order 对于Order服务来讲,parentService就是Mall
private String parentServiceInstance; // parentService的具体一个实例
private String parentEndpoint; // 进入parentService的那个请求
private String addressUsedAtClient;
public enum SegmentRefType {
CROSS_PROCESS, CROSS_THREAD
}
public class TraceSegment {
private List spans;
private DistributedTraceId relatedGlobalTraceId;
- spans:保存Segment中所有的Span
- relatedGlobalTraceId:当前Segment所在Trace的Id
TraceSegment中的定义方法如下:
public class TraceSegment {
public TraceSegment() {
this.traceSegmentId = GlobalIdGenerator.generate();
this.spans = new LinkedList<>();
this.relatedGlobalTraceId = new NewDistributedTraceId();
this.createTime = System.currentTimeMillis();
}
public void ref(TraceSegmentRef refSegment) {
if (null == ref) {
this.ref = refSegment;
}
}
public void relatedGlobalTrace(DistributedTraceId distributedTraceId) {
if (relatedGlobalTraceId instanceof NewDistributedTraceId) {
this.relatedGlobalTraceId = distributedTraceId;
}
}
public void archive(AbstractTracingSpan finishedSpan) {
spans.add(finishedSpan);
}
public TraceSegment finish(boolean isSizeLimited) {
this.isSizeLimited = isSizeLimited;
return this;
}
finish()需要传入isSizeLimited,SkyWalking中限制了Segment中Span的数量,默认是最大是300,上层调用finish()方法时会判断此时该Segment中的Span是否到达上限
public class Config {
public static class Agent {
public static int SPAN_LIMIT_PER_SEGMENT = 300;
小结:
24、Span基本概念Span的最顶层实现为AsyncSpan,代码如下:
public interface AsyncSpan {
AbstractSpan prepareForAsync();
AbstractSpan asyncFinish();
}
AbstractSpan继承了AsyncSpan,代码如下:
public interface AbstractSpan extends AsyncSpan {
AbstractSpan setComponent(Component component);
AbstractSpan setLayer(SpanLayer layer);
setLayer()用于指定当前Span表示的操作所在的插件属于哪一种SkyWalking划分的类型,在SpanLayer中定义了五种类型:
public enum SpanLayer {
DB(1), RPC_FRAMEWORK(2), HTTP(3), MQ(4), CACHE(5);
public AbstractSpan extends AsyncSpan {
AbstractSpan tag(AbstractTag> tag, String value);
AbstractSpan log(Throwable t);
boolean isEntry();
boolean isExit();
AbstractSpan log(long timestamp, Map event);
AbstractSpan setOperationName(String operationName);
AbstractSpan start();
int getSpanId();
String getOperationName();
void ref(TraceSegmentRef ref);
AbstractSpan start(long startTime);
AbstractSpan setPeer(String remotePeer);
小结:
25、Span完整模型抽象类AbstractTracingSpan实现了AbstractSpan接口,代码如下:
public abstract class AbstractTracingSpan implements AbstractSpan {
protected int spanId;
protected int parentSpanId;
protected List tags;
protected String operationName;
protected SpanLayer layer;
protected volatile boolean isInAsyncMode = false;
private volatile boolean isAsyncStopped = false;
protected final TracingContext owner;
protected long startTime;
protected long endTime;
protected boolean errorOccurred = false;
protected int componentId = 0;
protected List logs;
protected List refs;
public boolean finish(TraceSegment owner) {
this.endTime = System.currentTimeMillis();
owner.archive(this);
return true;
}
SkyWalking中Trace数据模型的实现如下图:
一个Trace由多个TraceSegment组成,TraceSegment使用TraceSegmentRef指向它的上一个TraceSegment。每个TraceSegment中有多个Span,每个Span都有spanId和parentSpanId,spanId从0开始,parentSpanId指向上一个span的Id。一个TraceSegment中第一个创建的Span叫EntrySpan,调用的本地方法对应LocalSpan,离开当前Segment对应ExitSpan。每个Span都有一个refs,每个TraceSegment的第一个Span的refs会指向它所在TraceSegment的上一个TraceSegment
小结:
26、StackBasedTracingSpanStackBasedTracingSpan是一个内部具有栈结构的Span,它可以启动和关闭多次在一个类似栈的执行流程中
在看StackBasedTracingSpan源码之前,我们先来看下StackBasedTracingSpan的工作原理:
如上图,假设有一个应用部署在Tomcat上,使用SpringMVC提供一个getUser()的Controller方法,getUser()方法直接返回不会调用其他的第三方
在这样一个单体结构中,只会有一个TraceSegment,TraceSegment中会有一个EntrySpan
请求进来后,走到Tomcat,SkyWalking的Tomcat插件会尝试创建EntrySpan,如果发现自己是这个请求到达后第一个工作的插件就会创建EntrySpan,如果不是第一个就会复用之前插件创建的EntrySpan。Tomcat插件创建EntrySpan,并会在Span上记录tags、logs、component、layer等信息,代码如下:
public class TomcatInvokeInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Request request = (Request) allArguments[0];
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(request.getHeader(next.getHeadKey()));
}
// 创建EntrySpan
AbstractSpan span = ContextManager.createEntrySpan(request.getRequestURI(), contextCarrier);
Tags.URL.set(span, request.getRequestURL().toString());
Tags.HTTP.METHOD.set(span, request.getMethod());
span.setComponent(ComponentsDefine.TOMCAT);
SpanLayer.asHttp(span);
if (TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS) {
collectHttpParam(request, span);
}
}
请求经过Tomcat后交给SpringMVC,SpringMVC插件也会创建EntrySpan,代码如下:
public abstract class AbstractMethodInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Boolean forwardRequestFlag = (Boolean) ContextManager.getRuntimeContext().get(FORWARD_REQUEST_FLAG);
if (forwardRequestFlag != null && forwardRequestFlag) {
return;
}
String operationName;
if (SpringMVCPluginConfig.Plugin.SpringMVC.USE_QUALIFIED_NAME_AS_ENDPOINT_NAME) {
operationName = MethodUtil.generateOperationName(method);
} else {
EnhanceRequireObjectCache pathMappingCache = (EnhanceRequireObjectCache) objInst.getSkyWalkingDynamicField();
String requestURL = pathMappingCache.findPathMapping(method);
if (requestURL == null) {
requestURL = getRequestURL(method);
pathMappingCache.addPathMapping(method, requestURL);
requestURL = pathMappingCache.findPathMapping(method);
}
operationName = getAcceptedMethodTypes(method) + requestURL;
}
Object request = ContextManager.getRuntimeContext().get(REQUEST_KEY_IN_RUNTIME_CONTEXT);
if (request != null) {
StackDepth stackDepth = (StackDepth) ContextManager.getRuntimeContext().get(CONTROLLER_METHOD_STACK_DEPTH);
if (stackDepth == null) {
final ContextCarrier contextCarrier = new ContextCarrier();
if (IN_SERVLET_CONTAINER && HttpServletRequest.class.isAssignableFrom(request.getClass())) {
final HttpServletRequest httpServletRequest = (HttpServletRequest) request;
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(httpServletRequest.getHeader(next.getHeadKey()));
}
// 创建EntrySpan
AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);
Tags.URL.set(span, httpServletRequest.getRequestURL().toString());
Tags.HTTP.METHOD.set(span, httpServletRequest.getMethod());
span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
SpanLayer.asHttp(span);
if (SpringMVCPluginConfig.Plugin.SpringMVC.COLLECT_HTTP_PARAMS) {
RequestUtil.collectHttpParam(httpServletRequest, span);
}
if (!CollectionUtil.isEmpty(SpringMVCPluginConfig.Plugin.Http.INCLUDE_HTTP_HEADERS)) {
RequestUtil.collectHttpHeaders(httpServletRequest, span);
}
} else if (ServerHttpRequest.class.isAssignableFrom(request.getClass())) {
final ServerHttpRequest serverHttpRequest = (ServerHttpRequest) request;
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(serverHttpRequest.getHeaders().getFirst(next.getHeadKey()));
}
// 创建EntrySpan
AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);
Tags.URL.set(span, serverHttpRequest.getURI().toString());
Tags.HTTP.METHOD.set(span, serverHttpRequest.getMethodValue());
span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
SpanLayer.asHttp(span);
if (SpringMVCPluginConfig.Plugin.SpringMVC.COLLECT_HTTP_PARAMS) {
RequestUtil.collectHttpParam(serverHttpRequest, span);
}
if (!CollectionUtil.isEmpty(SpringMVCPluginConfig.Plugin.Http.INCLUDE_HTTP_HEADERS)) {
RequestUtil.collectHttpHeaders(serverHttpRequest, span);
}
} else {
throw new IllegalStateException("this line should not be reached");
}
stackDepth = new StackDepth();
ContextManager.getRuntimeContext().put(CONTROLLER_METHOD_STACK_DEPTH, stackDepth);
} else {
AbstractSpan span = ContextManager.createLocalSpan(buildOperationName(objInst, method));
span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
}
stackDepth.increment();
}
}
Tomcat已经创建了EntrySpan,SpringMVC就不能再创建EntrySpan了,SpringMVC会复用Tomcat创建的EntrySpan。Tomcat已经在Span上记录tags、logs、component、layer等信息,SpringMVC这里会覆盖掉之前Tomcat在Span上记录的信息
EntrySpan就是使用StackBasedTracingSpan这种基于栈的Span来实现的,EntrySpan中有两个属性:当前栈深stackDepth和当前最大栈深currentMaxDepth
Tomcat创建EntrySpan,EntrySpan中当前栈深=1,当前最大栈深=1
SpringMVC复用Tomcat创建的EntrySpan,会把当前栈深和当前最大栈深都+1,此时当前栈深=2,当前最大栈深=2
当getUser()方法执行完后,首先返回到SpringMVC,会把当前栈深-1,当前最大栈深是只增不减的,此时当前栈深=1,当前最大栈深=2
当返回到Tomcat时,当前栈深-1,此时当前栈深=0,当前最大栈深=2,当前栈深=0时,就代表EntrySpan出栈了
如何判断当前EntrySpan是复用前面的呢?只需要判断currentMaxDepth不等于1就是复用前面的EntrySpan,如果等于1就是当前插件创建的EntrySpan。记录Span信息的时候都是请求进来EntrySpan入栈的流程,只要stackDepth=currentMaxDepth时就是请求进来的流程,所以只有stackDepth=currentMaxDepth时才允许记录Span的信息
EntrySpan有如下几个特性:
- 在一个TraceSegment里面只能存在一个EntrySpan
- 后面的插件复用前面插件创建的EntrySpan时会覆盖掉前面插件设置的Span信息
- EntrySpan记录的信息永远是最靠近服务提供侧的信息
EntrySpan和ExitSpan都是通过StackBasedTracingSpan来实现的,继承关系如下:
StackBasedTracingSpan中包含stackDepth属性,代码如下:
public abstract class StackBasedTracingSpan extends AbstractTracingSpan {
protected int stackDepth;
@Override
public boolean finish(TraceSegment owner) {
if (--stackDepth == 0) {
return super.finish(owner);
} else {
return false;
}
}
finish()方法中,当stackDepth等于0时,栈就空了,就代表EntrySpan结束了
EntrySpan中包含currentMaxDepth属性,代码如下:
public class EntrySpan extends StackBasedTracingSpan {
private int currentMaxDepth;
public EntrySpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
super(spanId, parentSpanId, operationName, owner);
this.currentMaxDepth = 0;
}
@Override
public EntrySpan start() {
// currentMaxDepth = stackDepth = 1时,才调用start方法记录启动时间
if ((currentMaxDepth = ++stackDepth) == 1) {
super.start();
}
// 复用span时清空之前插件记录的span信息
clearWhenRestart();
return this;
}
@Override
public EntrySpan tag(String key, String value) {
// stackDepth = currentMaxDepth时,才记录span信息
if (stackDepth == currentMaxDepth || isInAsyncMode) {
super.tag(key, value);
}
return this;
}
@Override
public AbstractTracingSpan setLayer(SpanLayer layer) {
if (stackDepth == currentMaxDepth || isInAsyncMode) {
return super.setLayer(layer);
} else {
return this;
}
}
@Override
public AbstractTracingSpan setComponent(Component component) {
if (stackDepth == currentMaxDepth || isInAsyncMode) {
return super.setComponent(component);
} else {
return this;
}
}
@Override
public AbstractTracingSpan setOperationName(String operationName) {
if (stackDepth == currentMaxDepth || isInAsyncMode) {
return super.setOperationName(operationName);
} else {
return this;
}
}
@Override
public EntrySpan log(Throwable t) {
super.log(t);
return this;
}
@Override
public boolean isEntry() {
return true;
}
@Override
public boolean isExit() {
return false;
}
private void clearWhenRestart() {
this.componentId = DictionaryUtil.nullValue();
this.layer = null;
this.logs = null;
this.tags = null;
}
}
小结:
参考:
SkyWalking8.7.0源码分析(如果你对SkyWalking Agent源码感兴趣的话,强烈建议看下该教程)



