Source Code Analysis: Sampling Entry Point

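  • Once all spans of the current segment have finished, the sampling mechanism decides whether the segment is reported
  • A segment that will be reported keeps ignore = false; a segment that will not be reported has ignore set to true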
private void finish() {
    // ... other code omitted
    try {
        // All spans of the current segment have finished
        if (activeSpanStack.isEmpty() && running && (!isRunningInAsyncMode || asyncSpanCounter.get() == 0)) {
            TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
            if (!segment.hasRef() && segment.isSingleSpanSegment()) {
                // trySampling() implements the sampling algorithm
                if (!samplingService.trySampling()) {
                    // Not sampled: mark the segment as ignored
                    finishedSegment.setIgnore(true);
                }
            }
            if (segment.createTime() < RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME) {
                // Segment was created before the instance registered: do not report it
                finishedSegment.setIgnore(true);
            }
            // The ignore flag determines whether the segment is reported to the OAP
            TracingContext.ListenerManager.notifyFinish(finishedSegment);
            running = false;
        }
    } finally {
        if (isRunningInAsyncMode) {
            asyncFinishLock.unlock();
        }
    }
}
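
The decision made in finish() can be condensed into a single predicate. The following is a minimal sketch, not SkyWalking code: the class IgnoreDecision and its parameters are hypothetical stand-ins for segment.hasRef(), segment.isSingleSpanSegment(), samplingService.trySampling(), and the two timestamps compared above.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper that mirrors the ignore decision in finish(); not part of SkyWalking.
final class IgnoreDecision {
    private IgnoreDecision() {
    }

    /** Returns true when the finished segment should be marked ignore = true (not reported). */
    static boolean shouldIgnore(boolean hasRef, boolean singleSpanSegment, BooleanSupplier trySampling,
                                long segmentCreateTime, long instanceRegisteredTime) {
        // Sampling is consulted only for a segment with no reference and a single span.
        // The supplier is evaluated lazily, so other segments do not consume the sampling quota.
        if (!hasRef && singleSpanSegment && !trySampling.getAsBoolean()) {
            return true;
        }
        // A segment created before the instance registered is not reported.
        return segmentCreateTime < instanceRegisteredTime;
    }
}

class IgnoreDecisionDemo {
    public static void main(String[] args) {
        // Single-span segment without a reference that the sampler rejects.
        System.out.println(IgnoreDecision.shouldIgnore(false, true, () -> false, 200L, 100L));
        // Segment with a reference: the sampler is never asked.
        System.out.println(IgnoreDecision.shouldIgnore(true, false, () -> false, 200L, 100L));
    }
}
```

Passing the sampler as a BooleanSupplier keeps the short-circuit behavior of the original condition, where trySampling() is only called when the first two checks pass.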

Source Code Analysis: Sampling Mechanism

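  • SamplingService applies its sampling logic based on the setting Config.Agent.SAMPLE_N_PER_3_SECS
  • The switch is off by default, in which case every trace is sampled
  • The sampling logic runs only when SAMPLE_N_PER_3_SECS is greater than 0
  • Whether to sample is decided by comparing the current sample count held in samplingFactorHolder with SAMPLE_N_PER_3_SECS
  • An internal scheduled task in SamplingService resets samplingFactorHolder every three seconds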
public class SamplingService implements BootService {
    // Whether sampling control is enabled; when it is off, every trace is sampled
    private volatile boolean on = false;
   

    @Override
    public void boot() throws Throwable {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
            on = true;
            this.resetSamplingFactor();
            ScheduledExecutorService service = Executors
                .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
            // Reset the sampling counter every 3 seconds
            scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
                @Override
                public void run() {
                    resetSamplingFactor();
                }
            }, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, 3, TimeUnit.SECONDS);
            logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
        }
    }
   
    public boolean trySampling() {
        // Whether sampling control is enabled; when it is off (the default), every trace is sampled
        if (on) {
            int factor = samplingFactorHolder.get();
            // Compare against the configured number of samples per 3 seconds
            if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {
                // If the CAS loses a race with another thread, this segment is not sampled either
                boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
                return success;
            } else {
                // The limit Config.Agent.SAMPLE_N_PER_3_SECS has been reached within this 3-second window: do not sample
                return false;
            }
        }
        return true;
    }

    private void resetSamplingFactor() {
        samplingFactorHolder = new AtomicInteger(0);
    }
}
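
The counting logic is easier to follow without the scheduler. Below is a minimal sketch under stated assumptions: WindowedSampler and resetWindow() are hypothetical names, the limit is passed to the constructor instead of being read from Config.Agent.SAMPLE_N_PER_3_SECS, and the reset is triggered by hand rather than by a task that runs every three seconds.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical counterpart of SamplingService (not SkyWalking code).
final class WindowedSampler {
    private final int maxPerWindow;
    private final AtomicInteger sampledInWindow = new AtomicInteger(0);

    WindowedSampler(int maxPerWindow) {
        this.maxPerWindow = maxPerWindow;
    }

    boolean trySampling() {
        // A non-positive limit means the switch is off: sample everything.
        if (maxPerWindow <= 0) {
            return true;
        }
        int current = sampledInWindow.get();
        if (current < maxPerWindow) {
            // Single CAS attempt, as in the listing above: losing the race means "not sampled".
            return sampledInWindow.compareAndSet(current, current + 1);
        }
        // The quota for this window is used up.
        return false;
    }

    // In SamplingService a scheduled task performs the reset every 3 seconds.
    void resetWindow() {
        sampledInWindow.set(0);
    }
}

class WindowedSamplerDemo {
    public static void main(String[] args) {
        WindowedSampler sampler = new WindowedSampler(2);
        for (int i = 0; i < 4; i++) {
            System.out.println("segment " + i + " sampled: " + sampler.trySampling());
        }
        sampler.resetWindow();
        System.out.println("after reset: " + sampler.trySampling());
    }
}
```

Because the CAS is attempted only once, heavy contention can cause fewer than the configured number of segments to be sampled in a window; the limit is an upper bound, not a guarantee.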

Source Code Analysis: Reporting Traces with TraceSegmentServiceClient

  • DataCarrier buffers the data waiting to be reported
  • TraceSegmentServiceClient consumes the DataCarrier, sends the segments, and handles the callbacks

public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
    
    private volatile DataCarrier<TraceSegment> carrier;
    

    @Override
    public void consume(List<TraceSegment> data) {
        // Send only while the connection is healthy
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            // Set the upload deadline to GRPC_UPSTREAM_TIMEOUT (30 seconds)
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
                @Override
                public void onNext(Commands commands) {
                    // Handle the Commands returned on success
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                }

                @Override
                public void onError(Throwable throwable) {
                    status.finished();
                    if (logger.isErrorEnable()) {
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                    }
                    // Handle the failure callback
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });
            // Iterate over the batch taken from the DataCarrier buffer
            try {
                for (TraceSegment segment : data) {
                    // Convert the domain object into a gRPC transfer object
                    UpstreamSegment upstreamSegment = segment.transform();
                    // Upload the segment
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                }
            } catch (Throwable t) {
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
            }
            // Complete the stream
            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }
        printUplinkStatus();
    }

    

    @Override
    public void afterFinished(TraceSegment traceSegment) {
        if (traceSegment.isIgnore()) {
            return;
        }   
        // Store the segment in the DataCarrier buffer
        if (!carrier.produce(traceSegment)) {
            if (logger.isDebugEnable()) {
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");
            }
        }
    }
}
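
The produce and consume contract visible in afterFinished() and consume() can be illustrated with a plain bounded queue. This is only a sketch, not how DataCarrier is implemented: DemoSegment and SegmentBuffer are hypothetical classes, and java.util.concurrent.ArrayBlockingQueue stands in for the real buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for TraceSegment that carries only a name and the ignore flag.
final class DemoSegment {
    final String name;
    final boolean ignore;

    DemoSegment(String name, boolean ignore) {
        this.name = name;
        this.ignore = ignore;
    }
}

// Hypothetical bounded buffer showing the contract only; DataCarrier itself is not reproduced here.
final class SegmentBuffer {
    private final BlockingQueue<DemoSegment> queue;

    SegmentBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Mirrors afterFinished(): ignored segments are skipped, and a full buffer drops the segment.
    boolean afterFinished(DemoSegment segment) {
        if (segment.ignore) {
            return false;
        }
        // offer() never blocks; it returns false when the queue is full.
        return queue.offer(segment);
    }

    // Mirrors the input of consume(): everything buffered so far is handed over as one batch.
    List<DemoSegment> drainBatch() {
        List<DemoSegment> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }
}

class SegmentBufferDemo {
    public static void main(String[] args) {
        SegmentBuffer buffer = new SegmentBuffer(2);
        System.out.println("a buffered: " + buffer.afterFinished(new DemoSegment("a", false)));
        System.out.println("ignored buffered: " + buffer.afterFinished(new DemoSegment("x", true)));
        System.out.println("b buffered: " + buffer.afterFinished(new DemoSegment("b", false)));
        System.out.println("c buffered: " + buffer.afterFinished(new DemoSegment("c", false)));
        System.out.println("batch size: " + buffer.drainBatch().size());
    }
}
```

The point of the non-blocking offer() is the same as in the listing above: when the buffer is full, the segment is abandoned rather than stalling the application thread that finished it.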

Summary

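  • By default, SkyWalking samples all trace data and downsamples metrics data
  • Unlike Pinpoint and similar tools, SkyWalking configures sampling as a threshold: the number of traces sampled every three seconds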