skywalking源码解析系列二 : agent采集trace数据
在2020年3月份开始接触skywalking到现在,使用skywalking已经一年时间,期间对内部代码进行了详细阅读,并且由于项目需要,我们已经对源码进行了二开,新增了各种个性化需求,可以说,我们对skywalking底层源码了解程度已经相对较高。本来想通过笔记对这一年来的源码阅读及理解成果进行记录,无意中发现这篇文章写得相当的好,也懒得去写了,因此直接转载,后续该系列文章会夹杂着转载与原创,
在2020年3月份开始接触skywalking到现在,使用skywalking已经一年时间,期间对内部代码进行了详细阅读,并且由于项目需要,我们已经对源码进行了二开,新增了各种个性化需求,可以说,我们对skywalking底层源码了解程度已经相对较高。
本来想通过笔记对这一年来的源码阅读及理解成果进行记录,无意中发现这篇文章写得相当的好,也懒得去写了,因此直接转载,后续该系列文章会夹杂着转载与原创,欢迎各位码友交流探讨
1. 简介
本文源码解析使用的版本是 skywalking 7.0 , 不同版本实现上可能由一定差异,但是思想上大致相同
上篇文章介绍了skywalking-agent的整体架构以及插件的加载原理。
skywalking源码解析系列一: agent插件加载原理
但是仅仅知道了他如何去加载插件,那至于在他使用agent去修改业务代码后如何去收集trace数据 那么请继续往下看
2. trace数据结构
在看源码之前,我们先来了解一下 在skywalking中trace数据 是以什么样的数据结构去保存的,这里为了更清晰展示,这里选择从前端查看trace数据的结构
首先先来一个最简单的例子
其实就是一个 请求,访问的三次数据库,对应方法大概如下
请求A(){
访问数据库1()
访问数据库2()
访问数据库3()
}
然后我们来看看要显示这样一条 调用链 需要多少数据,这里把断点打在 trace-detail-chart-tree.vue 文件的 changeTree()方法上面
然后我们来查看 this.segmentId 中的数据
这里需要关注的数据我已经标注出来,但名字也大概能猜出这几个参数是干什么的,children 这个属性是用JS算出来挂载上去的对象,在实际的skywalking中并不是这么存储的,那么他是如何计算出children是谁,这里就要去关注一下那几个ID了
这里为了结构更清晰 我精简了一下
从上图可以看出 traceId / segmentId 都是相同的,不同的只是 spanId和parentSpanId 这里 可以看出 children的parentSpanId 属性指向了 父节点的 spanId属性。
嗯 这里可以看出 spanId 和 parentSpanId 的作用,就上面这个 例子来说可以认为 每一个记录点就是一个 span 他的唯一标识是 spanId , 然后父子关系通过parentSpanId 去关联。
然而真的是这样的吗?
我们看下面一个例子

这个调用链比较复杂 和上面最大的不同在于 SwCallableWapper 这是我自己封装的跨线程的插件(skywalking线程池插件,解决lamdba使用问题),也就是说 我这里的调用链上是跨了线程,同时 蓝色的点是远程调用了另外一个服务,也就是说我这边还跨了进程,
如果转成对应的方法就是
链路方法(){
多线程异步方法(){
访问数据库()
远程调用服务() --> 在远程服务这边执行了数据库
}
...后面同样的 异步方法 调用了 3次
}
那我们来看看他有什么不同
下面我只截取了 从链路入口到 SwCallableWapper 也就是跨线程 的一段数据
可以看出 segmentId 不同了,但是 traceId 父子还是相同的 , 同时 spanId 2个都是从 0 开始的,而parentSpandId 指向都是 -1 ,那么最开始那个例子的 使用spanId 来关联 父子 关系的逻辑失效了,同时为了标识出 children 的父亲到底是谁,新增加了一个属性 refs,这个属性里关联上了 父节点的所有 ID 信息。 同样跨进程 的远程调用也是一样的。
其实可以大概看出来 spanId 就像一个 链条一样把 一条链路 给串起来,但是如果是 跨线程/进程 又会创建出一个新的链条,然后用 refs 把这几段链条给接连接起来
那么最后调用实际上的链路结构是这样的

相同颜色的小球可以任何 是在同一个线程中 , 在同一个线程中 的 segmentId 都是相同的,并且在同一个线程中 , 使用 spanId以及parentSpanId去连接链路,线程、进程中不同的链路使用refs去连接,同时多个 线程链路 组成一个 trace,他们的 traceID 都是相同的
至于为什么 不同线程的链路 需要用refs连接而不是 spanId一直传递下去的方式连接,后面会给出答案
下面就总结一下 上面出现的 几个 ID 都是什么含义
spanId: 同一个线程中唯一, 从0始,按照调用链路从0递增parentSpanId: 在同一个线程的链路中,用来连接spansegmentId: 同一个线程链路中,这个值都是相同的,不同线程链路中 这个值不同traceId: 在一个链路中traceId唯一
3. trace 数据采集
上面讲了这么多,那么agent 到底怎么去采集数据嗯, 接下来将会已 spring-mvc-plugin 插件为例子 来讲解
以下 源码来源于 agent 插件 mvc-annotation-commons,这个插件是官方自带的
这个插件会去 代理 所有打了 @requestMapping 注解的方法,让其在进入对应前以及方法结束后做一些事件,至于他怎么代理请看 skywalking源码解析 (1) : agent插件加载原理
在类 AbstractMethodInterceptor#beforeMethod 方法里可以看到 当执行 @requestMapping 标注的的方法前将会做些什么 , 下面的代码是我删除过一些 代码,一遍能更清晰体现流程
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
String operationName;
//在之前提到过,在agent修改字节码的时候,会在对应的业务类里去 写入一个属性,然后让业务类实现接口 EnhancedInstance ,从此能有能力去获取这个属性
EnhanceRequireObjectCache pathMappingCache = (EnhanceRequireObjectCache) objInst.getSkyWalkingDynamicField();
//pathMappingCache 这个属性里存放是的是 method -> url 的数据,如果没有对应的缓存,就重新获取一下,然后存入缓存里
String requestURL = pathMappingCache.findPathMapping(method);
if (requestURL == null) {
requestURL = getRequestURL(method);
pathMappingCache.addPathMapping(method, requestURL);
requestURL = getAcceptedMethodTypes(method) + pathMappingCache.findPathMapping(method);
}
operationName = requestURL;
//获取了当前请求的 request
HttpServletRequest request = (HttpServletRequest) ContextManager.getRuntimeContext()
.get(REQUEST_KEY_IN_RUNTIME_CONTEXT);
if (request != null) {
// 获取 stackDepth, 用来记录本次调用链的深度
//比如 如果这个请求 第一站进入了这个 interceptor 那么他的深度就是0/null , 如果在此之前有其他的 interceptor提前执行了,那么深度 +1, 例如: tomcat 的插件
StackDepth stackDepth = (StackDepth) ContextManager.getRuntimeContext().get(CONTROLLER_METHOD_STACK_DEPTH);
//等于null 说明是入口 span
if (stackDepth == null) {
//获取了跨进程的 context
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
//从header 里获取了远程调用时候传送过来的数据,塞入contextCarrier
//setHeadValue 的时候会自动反序列化
next.setHeadValue(request.getHeader(next.getHeadKey()));
}
//创建一个入口 span
//下面都是在给这个span去塞入一些信息
AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);
Tags.URL.set(span, request.getRequestURL().toString());
Tags.HTTP.METHOD.set(span, request.getMethod());
span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
SpanLayer.asHttp(span);
//如果这个是入口类,那么创建一个新的 stackDepth
stackDepth = new StackDepth();
ContextManager.getRuntimeContext().put(CONTROLLER_METHOD_STACK_DEPTH, stackDepth);
} else {
AbstractSpan span = ContextManager.createLocalSpan(buildOperationName(objInst, method));
span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
}
//深度 +1,在afterMethod 的时候会去 -1
stackDepth.increment();
}
}
上面代码其实要关注的是 span 的创建上面,从上面代码可以看出 如果 进入这个方法之前没有创建过任何一个 span那么就将会使用一个 ContextManager.createEntrySpan() 去创建span,如果在之前就已经创建过 span(比如如果使用了 tomcat 的插件,那么 tomcat 才将是 这次调用链的第一站),那么使用ContextManager.createLocalSpan() 去创建 span,那么看名字是否猜到还有一个 ContextManager.createExitSpan() , 从这这三者 从名字上看 也就是标识 了 入口 -> 本地 -> 出口 上面,这也 暗示了 如果我们写插件,也需要 这么去定义 span
ContextManager.createEntrySpan(): 如果在一个进程内 这是第一个生成的span那么使用createEntrySpan()方法去创建,他除了会生成span还会帮你把之前短接的调用链给连接起来(比如 远程调用 A - > B 在 B服务 调用createEntrySpan才能和A 关联起来)ContextManager.createLocalSpan(): 本地span, 最普通的创建span的方法ContextManager.createExitSpan(): 如果在一个进程内 发现这已经是这个进程最后一个调用span, 使用createExitSpan去创建对应的span, 比如 使用okhttp去调用别的服务,那么在okhttp发送之前就已经是最后一个span了,方法createExitSpan除了 会帮你创建一个span,还会帮你把 一些id信息带给 被调用方(okhttp是把ID信息给序列化放在 header里),被调用方使用createEntrySpan()就能把整个请求给 连接起来,这里需要留意的是 这里带给的 id 其实就是上问提到的traceId/segmentId/spanId,这三者组成了一个 完成的refs属性,刚好对应上上文所讲,如果看不懂可以再回过去看一遍
既然有前置代理方法,那就肯定有 后置 代理方法,在后置代理方法上面,聪明的小伙伴都能想到会把方法执行的结果,异常 等信息存入 span, 那么如果我没有对应的 放回数据 , 也不想记录异常,那么是否可以不写后置处理方法嗯? 答案是 不可以
因为可以 把在同一线程中的 调用链路 看做是一个 栈 , 执行createSpan() 是入栈的过程,那么执行ContextManager.stopSpan() 就是出栈的过程

入上图所示, 在这个情况下 如果我直接 ContextManager.stopSpan() 那么 停止的就是远程调用B创建出来的 span,所以如果自定义插件,一定要确保自己的 span成功出栈
4. trace 数据采集
上面讲了如何去创建 span 那么 这些数据会如何发送到 skywalking ?
首先,要发送这些span 数据不能够阻塞 我们的业务线程,而然后有一定的数据量,需要批量发送等功能,所以这边 skywalking 使用了 生产-消费 的模型
4.1 生产者
上面提到,每当一个方法结束后 都需要调用一下 ContextManager.stopSpan() 方法,没错这个方法就是 将 span塞入队列的 方法,但是 并不是 每次调用 ContextManager.stopSpan() 都会把出栈的 span扔入队列的
在 TracingContext#finish() 中可以看到
这里其实他就是去检查了一下 上面的 span栈空了 才会执行后面的方法 . 换句话说就是,需要等同一个线程里面所有的span 都出栈了,才会去把这整个 segment 给放入 消费队列中。
同样在 TracingContext#finish() 的方法中可以看到他如何塞入消费队列的
//这里会去通知所有注册了的 listener,本TraceSegment成功结束了
// 这里会有一个叫做 TraceSegmentServiceClient 的listener 收到这个事件后,会把 TraceSegment 放入队列 等待消费
TracingContext.ListenerManager.notifyFinish(finishedSegment);
4.2 消费者
在agent 启动的时候会去 加载 模块apm-agent-core中一个 叫做 TraceSegmentServiceClient( 至于他是如何加载的 下篇文章会讲,这里就不赘述了) 的类
在这个类初始化的时候,执行了boot() 方法。
@Override
public void boot() {
lastLogTime = System.currentTimeMillis();
segmentUplinkedCounter = 0;
segmentAbandonedCounter = 0;
//这里会去创建 消费队列 可以使用 buffer.channel_size 来指定 消费队列的长度,以及 buffer.buffer_size 来指定消费队列的大小
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
//定义了消费者,Trace 的数据将会由这个消费者去发送给 skywalking,这里传了 this 消费者就是自己
//参数2 定义了有几个消费线程,每个线程会持有不同的队列
carrier.consume(this, 1);
}
从上面可以看出 他可以配置 多个 队列以及多个线程,但实际上他写死了 使用 1个线程去发送数据,其实我觉得多个队列也没多少意义
但是如果有多个 多线的话 每个线程有自己的所属队列 去发送数据
消费者的 远程调用 直接逻辑 在 TraceSegmentServiceClient#consume(List<TraceSegment> data)
@Override
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<Commands>() {..GRPC 的一些回调..});
for (TraceSegment segment : data) {
//转换一下 segment 成 proto 数据
UpstreamSegment upstreamSegment = segment.transform();
//GRPC 发送
upstreamSegmentStreamObserver.onNext(upstreamSegment);
}
//告诉 GRPC 流已经完全写入进去了,等待他全部把数据发送后会回调上面的 StreamObserver定义的回调方法
upstreamSegmentStreamObserver.onCompleted();
status.wait4Finish();
segmentUplinkedCounter += data.size();
} else {
segmentAbandonedCounter += data.size();
}
}
直接用的 GRPC 批量发送了所有的 span 数据
更多推荐

所有评论(0)