SkyWalking Source Code Analysis, Part 19: Agent-Side Trace Trilogy (Trace Reporting)
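Source code analysis: sampling entry point
- Once all spans of the current Segment have finished, the sampling mechanism decides whether the Segment is reported
- A Segment that will be reported keeps ignore = false; a Segment that will not be reported gets ignore = true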
private void finish() {
    // ...... other code omitted
    try {
        // All spans of the current Segment have finished
        if (activeSpanStack.isEmpty() && running && (!isRunningInAsyncMode || asyncSpanCounter.get() == 0)) {
            TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
            // A segment with no upstream ref that consists of a single span goes through sampling
            if (!segment.hasRef() && segment.isSingleSpanSegment()) {
                // trySampling() is the sampling algorithm
                if (!samplingService.trySampling()) {
                    // Not sampled: mark the segment as ignored
                    finishedSegment.setIgnore(true);
                }
            }
            // Segment created before the agent instance finished registering: do not report it
            if (segment.createTime() < RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME) {
                finishedSegment.setIgnore(true);
            }
            // Notify listeners; whether the Segment is reported to the OAP depends on its ignore flag
            TracingContext.ListenerManager.notifyFinish(finishedSegment);
            running = false;
        }
    } finally {
        if (isRunningInAsyncMode) {
            asyncFinishLock.unlock();
        }
    }
}
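The ignore decision in finish() boils down to two independent rules. The sketch below restates them as a pure function so they can be checked in isolation. SegmentIgnoreRule and its parameters are hypothetical names, not SkyWalking code, and the sampling result is passed in as a boolean instead of calling SamplingService.

```java
// Hypothetical helper: restates the two ignore rules from TracingContext.finish()
final class SegmentIgnoreRule {

    private SegmentIgnoreRule() {
    }

    /**
     * @param hasRef         whether the segment references an upstream segment
     * @param singleSpan     whether the segment contains exactly one span
     * @param sampled        result of the sampling check (only matters for rule 1)
     * @param createTime     segment creation time in milliseconds
     * @param registeredTime time at which the agent instance finished registering
     * @return true if the segment should be marked ignore (not reported)
     */
    static boolean shouldIgnore(boolean hasRef, boolean singleSpan, boolean sampled,
                                long createTime, long registeredTime) {
        boolean ignore = false;
        // Rule 1: a single-span segment with no upstream ref is subject to sampling
        if (!hasRef && singleSpan && !sampled) {
            ignore = true;
        }
        // Rule 2: segments created before instance registration are dropped
        if (createTime < registeredTime) {
            ignore = true;
        }
        return ignore;
    }

    public static void main(String[] args) {
        System.out.println(shouldIgnore(false, true, false, 200L, 100L)); // true: not sampled
        System.out.println(shouldIgnore(true, true, false, 200L, 100L));  // false: has a ref, sampling not applied
        System.out.println(shouldIgnore(true, false, true, 50L, 100L));   // true: created before registration
    }
}
```

One difference from the original: in finish(), trySampling() is only called when the first two conditions hold (the if short-circuits), which matters because every successful call consumes one slot of the 3-second quota. Here the sampling result is computed by the caller up front.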
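Source code analysis: sampling mechanism
- SamplingService implements the sampling logic based on the setting Config.Agent.SAMPLE_N_PER_3_SECS
- By default the switch is off and every trace is sampled
- When SAMPLE_N_PER_3_SECS is greater than 0, the sampling logic is enabled
- Whether a trace is sampled is decided by comparing the current sample count samplingFactorHolder with SAMPLE_N_PER_3_SECS
- SamplingService runs an internal scheduled task that resets samplingFactorHolder every three seconds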
public class SamplingService implements BootService {
    // ...... other fields omitted: samplingFactorHolder (an AtomicInteger), scheduledFuture, logger
    // Whether sampling-rate control is on; when it is off, every trace is sampled
    private volatile boolean on = false;

    @Override
    public void boot() throws Throwable {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
            on = true;
            this.resetSamplingFactor();
            ScheduledExecutorService service = Executors
                .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
            // Reset the sample counter every 3 seconds
            scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
                @Override
                public void run() {
                    resetSamplingFactor();
                }
            }, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override
                public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, 3, TimeUnit.SECONDS);
            logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
        }
    }

    public boolean trySampling() {
        // Is sampling-rate control on? If not, sample everything (off by default)
        if (on) {
            int factor = samplingFactorHolder.get();
            // SAMPLE_N_PER_3_SECS: how many traces to sample per 3 seconds
            if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {
                // If the CAS loses a race with another thread, this trace is not sampled either
                boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
                return success;
            } else {
                // SAMPLE_N_PER_3_SECS traces already sampled in this 3-second window: do not sample
                return false;
            }
        }
        return true;
    }

    private void resetSamplingFactor() {
        samplingFactorHolder = new AtomicInteger(0);
    }
}
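To see the counting rule in isolation, here is a minimal self-contained sketch. WindowSampler is a hypothetical class, not SkyWalking code: it keeps the same get/compareAndSet logic and the same reset-by-replacing-the-AtomicInteger approach, but drops the ScheduledExecutorService, so reset() is called by hand in place of the 3-second timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical re-implementation of the SamplingService counting rule (no scheduler)
final class WindowSampler {
    private final int samplesPerWindow;   // stands in for SAMPLE_N_PER_3_SECS
    private final boolean on;             // control enabled only when samplesPerWindow > 0
    private volatile AtomicInteger counter = new AtomicInteger(0);

    WindowSampler(int samplesPerWindow) {
        this.samplesPerWindow = samplesPerWindow;
        this.on = samplesPerWindow > 0;
    }

    boolean trySampling() {
        if (!on) {
            return true;                  // control disabled: sample everything
        }
        int factor = counter.get();
        if (factor < samplesPerWindow) {
            // A lost CAS race also means "not sampled"; there is no retry
            return counter.compareAndSet(factor, factor + 1);
        }
        return false;                     // quota for this window used up
    }

    // Plays the role of the 3-second timer calling resetSamplingFactor()
    void reset() {
        counter = new AtomicInteger(0);
    }

    public static void main(String[] args) throws InterruptedException {
        WindowSampler sampler = new WindowSampler(10);
        AtomicInteger sampled = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        // 8 threads x 1000 attempts, all within a single window
        for (int t = 0; t < 8; t++) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    if (sampler.trySampling()) {
                        sampled.incrementAndGet();
                    }
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("sampled in one window: " + sampled.get()); // 10
        sampler.reset();
        System.out.println("after reset: " + sampler.trySampling());   // true
        System.out.println("disabled: " + new WindowSampler(0).trySampling()); // true
    }
}
```

Because every successful compareAndSet moves the counter from a value below the quota to at most the quota, the number of sampled traces per window can never exceed SAMPLE_N_PER_3_SECS, even under contention. A thread that loses the race simply does not sample that trace.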
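Source code analysis: TraceSegmentServiceClient reports traces
- DataCarrier buffers the data waiting to be reported
- TraceSegmentServiceClient consumes batches from DataCarrier, sends the Segments, and handles the callbacks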
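The producer/consumer split described in these two points can be sketched with a bounded queue. SimpleCarrier is hypothetical and much simpler than the real DataCarrier, which adds multiple channels, a configurable buffer strategy and its own consumer threads. It only mirrors the two behaviours the client relies on: produce() returns false instead of blocking when the buffer is full, and the consumer receives data in batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, heavily simplified stand-in for DataCarrier
final class SimpleCarrier<T> {
    private final BlockingQueue<T> buffer;

    SimpleCarrier(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking: returns false when the buffer is full (the item is dropped)
    boolean produce(T item) {
        return buffer.offer(item);
    }

    // Drain up to maxBatch items; a consumer thread would pass this list to consume(...)
    List<T> drainBatch(int maxBatch) {
        List<T> batch = new ArrayList<>();
        buffer.drainTo(batch, maxBatch);
        return batch;
    }

    public static void main(String[] args) {
        SimpleCarrier<String> carrier = new SimpleCarrier<>(2);
        System.out.println(carrier.produce("segment-1")); // true
        System.out.println(carrier.produce("segment-2")); // true
        System.out.println(carrier.produce("segment-3")); // false: buffer full, segment abandoned
        System.out.println(carrier.drainBatch(10));       // [segment-1, segment-2]
    }
}
```

Dropping instead of blocking keeps the application thread that finishes a trace from ever waiting on the reporting pipeline; the cost is that segments are lost when the OAP cannot keep up.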
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
    // ...... other fields and methods omitted (serviceStub, status, counters, logger, ...)
    private volatile DataCarrier<TraceSegment> carrier;

    @Override
    public void consume(List<TraceSegment> data) {
        // The gRPC channel is connected
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            // Upload with a deadline of Config.Collector.GRPC_UPSTREAM_TIMEOUT seconds (30 by default)
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
                @Override
                public void onNext(Commands commands) {
                    // Hand the Commands returned by the OAP to CommandService
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                }

                @Override
                public void onError(Throwable throwable) {
                    status.finished();
                    if (logger.isErrorEnable()) {
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                    }
                    // Failure callback: report the error to GRPCChannelManager
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });
            // Iterate over the batch drained from the DataCarrier buffer
            try {
                for (TraceSegment segment : data) {
                    // Convert the domain object into the gRPC transport object
                    UpstreamSegment upstreamSegment = segment.transform();
                    // Send it on the stream
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                }
            } catch (Throwable t) {
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
            }
            // Close the client stream, then wait until onCompleted/onError has fired
            upstreamSegmentStreamObserver.onCompleted();
            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            // Not connected: the whole batch is abandoned
            segmentAbandonedCounter += data.size();
        }
        printUplinkStatus();
    }

    @Override
    public void afterFinished(TraceSegment traceSegment) {
        // Segments marked ignore (e.g. not sampled) are never reported
        if (traceSegment.isIgnore()) {
            return;
        }
        // Store the segment in the DataCarrier buffer; produce() returns false when the buffer is full
        if (!carrier.produce(traceSegment)) {
            if (logger.isDebugEnable()) {
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");
            }
        }
    }
}
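consume() pushes every segment with onNext(), calls onCompleted(), and then blocks in status.wait4Finish() until one of the response callbacks calls status.finished(), so the consumer thread does not move on to the next batch before the current stream has ended. A minimal sketch of that handshake, assuming a CountDownLatch is enough to model it: StreamStatus is a hypothetical class, not GRPCStreamServiceStatus, and unlike the original wait4Finish() it takes an explicit timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for GRPCStreamServiceStatus
final class StreamStatus {
    private final CountDownLatch done = new CountDownLatch(1);

    // Called from the response callback (onCompleted / onError)
    void finished() {
        done.countDown();
    }

    // Blocks the sending thread until finished() is called or the timeout expires
    boolean wait4Finish(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StreamStatus status = new StreamStatus();
        // Simulates the gRPC response callback arriving on another thread
        Thread callback = new Thread(status::finished);
        callback.start();
        // At this point the sender would already have called onNext(...) and onCompleted()
        System.out.println("stream finished: " + status.wait4Finish(5, TimeUnit.SECONDS)); // stream finished: true
        callback.join();
    }
}
```

Calling finished() in both onCompleted() and onError() is what guarantees the sender is released whether the upload succeeds or fails.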
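Summary
- By default SkyWalking samples all Trace data, while Metrics data are downsampled
- Unlike other tools such as Pinpoint, SkyWalking's sampling setting is a threshold: the maximum number of traces sampled in each three-second window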
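Because the setting is a per-window cap rather than a ratio, the fraction of traces that end up sampled depends on traffic. The small calculation below makes that concrete. SampleRatio is a hypothetical helper, and it assumes a steady arrival rate where each new trace passes through trySampling() exactly once.

```java
// Hypothetical helper: converts SAMPLE_N_PER_3_SECS into an approximate sampling ratio
// for a steady rate of new traces, assuming each one calls trySampling() once
final class SampleRatio {

    static double effectiveRatio(int samplePer3Secs, double tracesPerSecond) {
        if (samplePer3Secs <= 0 || tracesPerSecond <= 0) {
            return 1.0; // control off (or no traffic): nothing is sampled out
        }
        double perWindow = tracesPerSecond * 3; // traces arriving in one 3-second window
        return Math.min(1.0, samplePer3Secs / perWindow);
    }

    public static void main(String[] args) {
        System.out.println(effectiveRatio(3, 10));   // 0.1
        System.out.println(effectiveRatio(3, 100));  // 0.01
        System.out.println(effectiveRatio(3, 0.5));  // 1.0
        System.out.println(effectiveRatio(0, 1000)); // 1.0
    }
}
```

With SAMPLE_N_PER_3_SECS = 3, a service starting 10 traces per second keeps about 10% of them and one starting 100 per second keeps about 1%, while a low-traffic service under the cap keeps everything. The reporting volume per agent stays bounded regardless of load.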