
How the SkyWalking agent collects and sends logs
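1. What is a log event
A log event wraps a single log record (the logging thread, the log level, the log message, and so on). It must implement the ILoggingEvent interface, and it is the object that gets passed between the different log appenders.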
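To make the idea concrete, here is a minimal sketch in plain Java that does not depend on logback. SimpleLogEvent and SimpleAppender are hypothetical stand-ins for ILoggingEvent and a logback appender; they only illustrate that the record is captured once and the same event object is handed to every appender.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LogEventSketch {

    /** Hypothetical stand-in for ILoggingEvent: one log record, captured once. */
    static final class SimpleLogEvent {
        final long timestamp;
        final String threadName;
        final String level;
        final String loggerName;
        final String message;

        SimpleLogEvent(long timestamp, String threadName, String level,
                       String loggerName, String message) {
            this.timestamp = timestamp;
            this.threadName = threadName;
            this.level = level;
            this.loggerName = loggerName;
            this.message = message;
        }
    }

    /** Hypothetical stand-in for an appender: it only ever sees the event object. */
    interface SimpleAppender {
        void append(SimpleLogEvent event);
    }

    /** Builds the event once and passes the same instance to two appenders. */
    static List<String> dispatch(String level, String loggerName, String message) {
        List<String> sink = new ArrayList<>();
        SimpleAppender console = e -> sink.add("console:" + e.level + ":" + e.message);
        SimpleAppender remote = e -> sink.add("remote:" + e.loggerName + ":" + e.message);

        SimpleLogEvent event = new SimpleLogEvent(System.currentTimeMillis(),
                Thread.currentThread().getName(), level, loggerName, message);
        for (SimpleAppender appender : Arrays.asList(console, remote)) {
            appender.append(event);
        }
        return sink;
    }

    public static void main(String[] args) {
        dispatch("INFO", "demo.IndexController", "kkkkk").forEach(System.out::println);
    }
}
```

Each appender decides what to do with the event; the logging call site does not need to know how many appenders are attached.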
2. Using log reporting in a project that runs the agent
See the official documentation:
https://skywalking.apache.org/docs/skywalking-java/v9.0.0/en/setup/service-agent/java-agent/application-toolkit-logback-1.x/
Step 1: add the dependency
<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-logback-1.x</artifactId>
    <version>9.0.0</version>
</dependency>
Step 2: add a logback.xml file under resources with the following content
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="10 seconds">
    <property name="devel" value="[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} %c %M %L %thread ------------------ %m%n"></property>

    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <!-- Console output stream: changed from the default System.out to System.err -->
        <target>System.err</target>
        <!-- This makes the printed text show up in red -->
        <!-- Log message format -->
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>${devel}</pattern> <!-- ${name of the property defined above} -->
        </encoder>
    </appender>

    <!-- This appender sends the logs to skywalking-oap -->
    <appender name="sw-grpc-log" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
                <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{tid}] [%thread] %-5level %logger{36} -%msg%n</Pattern>
            </layout>
        </encoder>
    </appender>

    <root level="INFO">
        <!-- What to do for events at this level -->
        <appender-ref ref="stdout"></appender-ref>
        <appender-ref ref="sw-grpc-log"></appender-ref>
        <!-- <appender-ref ref="name of the console appender configured above"></appender-ref> -->
    </root>
</configuration>
Step 3: print a distinctive log line in your own project
@Transactional
@GetMapping("/index")
public void index() {
    System.out.println("xxxxxxxxxxxxxxxxxx");
    log.info("kkkkk"); // later, check whether the kkkkk record shows up in the logs
    GoodsEntity goodsEntity = new GoodsEntity();
    goodsEntity.setGoodsName("goods");
    goodsService.save(goodsEntity);
}
Step 4: view the logs in the SkyWalking web UI
The logs are already associated with the traceId.
3. Source code analysis of log reporting
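Start from the log reporting protocol file, Logging.proto. Following the usages of LogReportServiceGrpc leads to the LogReportServiceClient class. Following LogReportServiceClient leads to the GRPCLogAppenderInterceptor class (a class with this name exists in several places, so make sure you open the right one), and from GRPCLogAppenderInterceptor you can find the GRPCLogAppenderActivation class.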
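GRPCLogAppenderActivation declares that GRPCLogAppenderInterceptor is the interceptor for GRPCLogClientAppender#subAppend. subAppend is not declared on the Appender interface itself; it is a protected method of logback's OutputStreamAppender, which append calls to run the output logic for one log event. Subclasses of OutputStreamAppender can override subAppend to change the output behavior; logback's console and file appenders are built on the same base class.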
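The following sketch models why subAppend works as an interception point. It is a simplified illustration, not logback or agent code: BaseAppender, HookAppender, and EnhancedHookAppender are hypothetical names, events are plain strings, and the agent's bytecode enhancement is imitated with an ordinary subclass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SubAppendHookSketch {

    /** Simplified model of the call chain: doAppend -> append -> subAppend. */
    abstract static class BaseAppender {
        private boolean started;

        void start() {
            started = true;
        }

        final void doAppend(String event) {
            if (started) {
                append(event);
            }
        }

        void append(String event) {
            subAppend(event);
        }

        protected abstract void subAppend(String event);
    }

    /** Plays the role of GRPCLogClientAppender in this model: subAppend does nothing. */
    static class HookAppender extends BaseAppender {
        @Override
        protected void subAppend(String event) {
            // Intentionally empty: the method only exists so that it can be intercepted.
        }
    }

    /** Imitates the enhancement: run the "beforeMethod" callback, then the original body. */
    static class EnhancedHookAppender extends HookAppender {
        private final Consumer<String> beforeMethod;

        EnhancedHookAppender(Consumer<String> beforeMethod) {
            this.beforeMethod = beforeMethod;
        }

        @Override
        protected void subAppend(String event) {
            beforeMethod.accept(event);
            super.subAppend(event);
        }
    }

    /** Returns the events that reached the interceptor callback. */
    static List<String> run(boolean enhanced) {
        List<String> reported = new ArrayList<>();
        HookAppender appender = enhanced
                ? new EnhancedHookAppender(reported::add)
                : new HookAppender();
        appender.start();
        appender.doAppend("kkkkk");
        return reported;
    }

    public static void main(String[] args) {
        System.out.println("without enhancement: " + run(false));
        System.out.println("with enhancement:    " + run(true));
    }
}
```

In this model, nothing is reported when the appender is not enhanced, which matches the idea that the toolkit appender alone does not send logs and relies on the agent plugin to do the work.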
public class GRPCLogAppenderInterceptor implements InstanceMethodsAroundInterceptor {

    private LogReportServiceClient client;

    @SuppressWarnings("unchecked")
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                             Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        // Look up the already-initialized log reporter from ServiceManager; see "Analysis 1" below.
        if (Objects.isNull(client)) {
            client = ServiceManager.INSTANCE.findService(LogReportServiceClient.class);
            if (Objects.isNull(client)) {
                return;
            }
        }
        ILoggingEvent event = (ILoggingEvent) allArguments[0];
        if (Objects.nonNull(event)) {
            // objInst is the intercepted GRPCLogClientAppender, which extends OutputStreamAppender<E>,
            // hence the cast to OutputStreamAppender<ILoggingEvent>.
            // transform() reads the log event and builds the gRPC payload, i.e. the LogData message
            // defined in Logging.proto.
            // client.produce() puts that payload into the DataCarrier maintained by LogReportServiceClient.
            client.produce(transform((OutputStreamAppender<ILoggingEvent>) objInst, event));
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
    }

    /**
     * transforms {@link ILoggingEvent} to {@link LogData}
     *
     * @param appender the real {@link OutputStreamAppender appender}
     * @param event    {@link ILoggingEvent}
     * @return {@link LogData} with filtered trace context in order to reduce the cost on the network
     */
    private LogData.Builder transform(final OutputStreamAppender<ILoggingEvent> appender, ILoggingEvent event) {
        // Build the LogTags part of LogData; LogTags is a list of key-value pairs, similar to a map.
        LogTags.Builder logTags = LogTags.newBuilder()
            .addData(KeyStringValuePair.newBuilder()
                .setKey("level").setValue(event.getLevel().toString()).build())
            .addData(KeyStringValuePair.newBuilder()
                .setKey("logger").setValue(event.getLoggerName()).build())
            .addData(KeyStringValuePair.newBuilder()
                .setKey("thread").setValue(event.getThreadName()).build());

        // When the formatted text is not transmitted, add the arguments and the exception as tags.
        if (!ToolkitConfig.Plugin.Toolkit.Log.TRANSMIT_FORMATTED) {
            if (event.getArgumentArray() != null) {
                for (int i = 0; i < event.getArgumentArray().length; i++) {
                    String value = Optional.ofNullable(event.getArgumentArray()[i]).orElse("null").toString();
                    logTags.addData(KeyStringValuePair.newBuilder()
                        .setKey("argument." + i).setValue(value).build());
                }
            }
            final IThrowableProxy throwableProxy = event.getThrowableProxy();
            if (throwableProxy instanceof ThrowableProxy) {
                Throwable throwable = ((ThrowableProxy) throwableProxy).getThrowable();
                logTags.addData(KeyStringValuePair.newBuilder()
                    .setKey("exception")
                    .setValue(ThrowableTransformer.INSTANCE.convert2String(throwable, 2048)).build());
            }
        }

        // Start building the LogData message.
        LogData.Builder builder = LogData.newBuilder()
            .setTimestamp(event.getTimeStamp())
            .setService(Config.Agent.SERVICE_NAME)
            .setServiceInstance(Config.Agent.INSTANCE_NAME)
            .setTags(logTags.build())
            .setBody(LogDataBody.newBuilder()
                .setType(LogDataBody.ContentCase.TEXT.name())
                // The text is formatted according to the configured log pattern (see transformLogText).
                .setText(TextLog.newBuilder().setText(transformLogText(appender, event)).build())
                .build());

        // Set the endpoint of LogData.
        String primaryEndpointName = ContextManager.getPrimaryEndpointName();
        if (primaryEndpointName != null) {
            builder.setEndpoint(primaryEndpointName);
        }

        // Set the trace context of LogData; it is skipped when there is no active span.
        return -1 == ContextManager.getSpanId() ? builder
            : builder.setTraceContext(TraceContext.newBuilder()
                .setTraceId(ContextManager.getGlobalTraceId())
                .setSpanId(ContextManager.getSpanId())
                .setTraceSegmentId(ContextManager.getSegmentId())
                .build());
    }

    // Format the log text with the encoder configured by the user, or fall back to the raw message.
    private String transformLogText(final OutputStreamAppender<ILoggingEvent> appender, final ILoggingEvent event) {
        if (ToolkitConfig.Plugin.Toolkit.Log.TRANSMIT_FORMATTED) {
            return new String(appender.getEncoder().encode(event));
        } else {
            return event.getMessage();
        }
    }
}
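The effect of the TRANSMIT_FORMATTED switch is easier to see in isolation. The sketch below mirrors only the branching of transform and transformLogText. It is an illustration under assumptions: a LinkedHashMap replaces the protobuf LogTags builder, buildTags and buildBody are hypothetical helper names, and the exception tag is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LogTagsSketch {

    /** Always adds level, logger and thread; adds argument.N only when the raw message is sent. */
    static Map<String, String> buildTags(String level, String logger, String thread,
                                         Object[] arguments, boolean transmitFormatted) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("level", level);
        tags.put("logger", logger);
        tags.put("thread", thread);
        if (!transmitFormatted && arguments != null) {
            for (int i = 0; i < arguments.length; i++) {
                // A null argument is stored as the string "null", as in the original code.
                tags.put("argument." + i, String.valueOf(arguments[i]));
            }
        }
        return tags;
    }

    /** Chooses between the encoder output and the raw message template. */
    static String buildBody(String rawMessage, String formattedMessage, boolean transmitFormatted) {
        return transmitFormatted ? formattedMessage : rawMessage;
    }

    public static void main(String[] args) {
        Object[] arguments = {"goods", null};
        System.out.println(buildTags("INFO", "demo.GoodsService", "main", arguments, false));
        System.out.println(buildBody("save {} {}", "INFO save goods null", false));
        System.out.println(buildTags("INFO", "demo.GoodsService", "main", arguments, true));
        System.out.println(buildBody("save {} {}", "INFO save goods null", true));
    }
}
```

When the formatted text is sent, the arguments are already embedded in the body, so sending them again as tags would be redundant; when the raw template is sent, the tags are the only place the arguments survive.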
After client.produce has put a log into the DataCarrier maintained by LogReportServiceClient, how does it get sent?
The consumer of the internal DataCarrier is already specified while LogReportServiceClient is being initialized:
@Override
public void boot() throws Throwable {
    carrier = new DataCarrier<>("gRPC-log", "gRPC-log",
                                Config.Buffer.CHANNEL_SIZE,
                                Config.Buffer.BUFFER_SIZE,
                                BufferStrategy.IF_POSSIBLE
    );
    carrier.consume(this, 1);
}
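The buffering idea behind this setup can be sketched with a bounded queue from the JDK. This is not the DataCarrier implementation; DroppingBuffer is a hypothetical class that only shows the two properties the logging path relies on: producing never blocks and reports failure when the buffer is full, and the consumer receives the buffered items as a batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BufferSketch {

    /** Hypothetical bounded buffer; it only imitates the behavior described in the text. */
    static class DroppingBuffer<T> {
        private final BlockingQueue<T> queue;

        DroppingBuffer(int capacity) {
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        /** Never blocks the logging thread: returns false when the buffer is full. */
        boolean produce(T item) {
            return queue.offer(item);
        }

        /** Removes everything currently buffered and returns it as one batch. */
        List<T> drain() {
            List<T> batch = new ArrayList<>();
            queue.drainTo(batch);
            return batch;
        }
    }

    public static void main(String[] args) {
        DroppingBuffer<String> buffer = new DroppingBuffer<>(2);
        for (String log : new String[] {"log-1", "log-2", "log-3"}) {
            if (!buffer.produce(log)) {
                System.out.println("abandoned because the buffer is full: " + log);
            }
        }
        // In the agent a consumer thread receives the batch; here it is drained by hand.
        System.out.println("batch handed to the consumer: " + buffer.drain());
    }
}
```

Dropping a log when the buffer is full trades completeness for safety: the application thread that wrote the log is never held up by a slow or unreachable collector.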
Then the consume method of LogReportServiceClient is called to read the data from the DataCarrier:
// consume is called automatically with the data that was read
@Override
public void consume(final List<LogData.Builder> dataList) {
    // Nothing was read
    if (CollectionUtil.isEmpty(dataList)) {
        return;
    }

    // Only send when the gRPC channel status is CONNECTED
    if (GRPCChannelStatus.CONNECTED.equals(status)) {
        GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);

        StreamObserver<LogData> logDataStreamObserver = logReportServiceStub
            .withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
            .collect(
                new StreamObserver<Commands>() {
                    // Receives the response returned by the server
                    @Override
                    public void onNext(final Commands commands) {
                    }

                    @Override
                    public void onError(final Throwable throwable) {
                        status.finished();
                        LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
                                     dataList.size()
                        );
                        ServiceManager.INSTANCE
                            .findService(GRPCChannelManager.class)
                            .reportError(throwable);
                    }

                    @Override
                    public void onCompleted() {
                        status.finished();
                    }
                });

        // Iterate over the log data and send each entry
        boolean isFirst = true;
        for (final LogData.Builder logData : dataList) {
            if (isFirst) {
                // Only set service name of the first element in one stream
                // https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
                // Log collecting protocol defines LogData#service is required in the first element only.
                logData.setService(Config.Agent.SERVICE_NAME);
                isFirst = false;
            }
            logDataStreamObserver.onNext(logData.build());
        }
        // Tell the server that all requests have been sent
        logDataStreamObserver.onCompleted();
        // Wait until the stream is finished
        status.wait4Finish();
    }
}
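The shape of this method (push every item into a request stream, signal completion, then block until the server side answers) can be reproduced without gRPC. The sketch below is an assumption-laden model: Observer is a locally defined interface shaped like StreamObserver, FakeCollector is a hypothetical in-process server, a CountDownLatch plays the role of GRPCStreamServiceStatus, and logs are plain strings with the service name prefixed to the first one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StreamingSendSketch {

    /** Hypothetical observer, shaped like gRPC's StreamObserver but defined locally. */
    interface Observer<T> {
        void onNext(T value);

        void onCompleted();
    }

    /** Fake collector: records requests and acknowledges once the client completes. */
    static class FakeCollector {
        final List<String> received = new ArrayList<>();

        Observer<String> collect(Observer<String> responseObserver) {
            return new Observer<String>() {
                @Override
                public void onNext(String value) {
                    received.add(value);
                }

                @Override
                public void onCompleted() {
                    // Answer from another thread, the way a network callback would arrive.
                    new Thread(responseObserver::onCompleted).start();
                }
            };
        }
    }

    /** Streams the batch, then waits for the collector's acknowledgement. */
    static boolean send(FakeCollector collector, String service, List<String> logs) {
        CountDownLatch finished = new CountDownLatch(1);
        Observer<String> requests = collector.collect(new Observer<String>() {
            @Override
            public void onNext(String command) {
                // Responses carry no payload in this model.
            }

            @Override
            public void onCompleted() {
                finished.countDown();
            }
        });

        boolean isFirst = true;
        for (String log : logs) {
            // Only the first element of a stream carries the service name in this model.
            requests.onNext(isFirst ? service + "|" + log : log);
            isFirst = false;
        }
        requests.onCompleted();

        try {
            return finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FakeCollector collector = new FakeCollector();
        boolean ok = send(collector, "demo-service", Arrays.asList("log-1", "log-2"));
        System.out.println("completed=" + ok + ", received=" + collector.received);
    }
}
```

Waiting for the acknowledgement before returning keeps one batch in flight per consumer thread, so a slow collector shows up as a growing buffer rather than as an unbounded number of open streams.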
Overall, the log data that is sent looks roughly like the following JSON
{
    "timestamp": "2024-04-10 15:10:14",
    "service": "service name",
    "serviceInstance": "service instance name",
    "endpoint": "name of the specific method",
    "body": {
        "type": "TEXT",
        "content": "can be json, text, or yaml"
    },
    "traceContext": {
        "traceId": "trace ID",
        "traceSegmentId": "segmentId",
        "spanId": "spanId"
    },
    "tags": {
        // many key-value pairs go here
    },
    "layer": ""
}
Analysis 1
@DefaultImplementor
public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData.Builder> {
    private static final ILog LOGGER = LogManager.getLogger(LogReportServiceClient.class);

    private volatile DataCarrier<LogData.Builder> carrier;
    private volatile GRPCChannelStatus status;

    private volatile LogReportServiceGrpc.LogReportServiceStub logReportServiceStub;

    @Override
    public void prepare() throws Throwable {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
    }

    // The log reporter maintains its own DataCarrier
    @Override
    public void boot() throws Throwable {
        carrier = new DataCarrier<>("gRPC-log", "gRPC-log",
                                    Config.Buffer.CHANNEL_SIZE,
                                    Config.Buffer.BUFFER_SIZE,
                                    BufferStrategy.IF_POSSIBLE
        );
        // The consumer of the DataCarrier is this class, LogReportServiceClient
        carrier.consume(this, 1);
    }

    @Override
    public void onComplete() throws Throwable {
    }

    // Write log data into the DataCarrier
    public void produce(LogData.Builder logData) {
        if (Objects.nonNull(logData) && !carrier.produce(logData)) {
            if (LOGGER.isDebugEnable()) {
                LOGGER.debug("One log has been abandoned, cause by buffer is full.");
            }
        }
    }

    @Override
    public void init(final Properties properties) {
    }

    // Start consuming; the argument is the pending data read from the DataCarrier
    @Override
    public void consume(final List<LogData.Builder> dataList) {
        if (CollectionUtil.isEmpty(dataList)) {
            return;
        }

        // Report all the log data through a gRPC client stream
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);

            StreamObserver<LogData> logDataStreamObserver = logReportServiceStub
                .withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                .collect(
                    new StreamObserver<Commands>() {
                        @Override
                        public void onNext(final Commands commands) {
                        }

                        @Override
                        public void onError(final Throwable throwable) {
                            status.finished();
                            LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
                                         dataList.size()
                            );
                            ServiceManager.INSTANCE
                                .findService(GRPCChannelManager.class)
                                .reportError(throwable);
                        }

                        @Override
                        public void onCompleted() {
                            status.finished();
                        }
                    });

            boolean isFirst = true;
            for (final LogData.Builder logData : dataList) {
                if (isFirst) {
                    // Only set service name of the first element in one stream
                    // https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
                    // Log collecting protocol defines LogData#service is required in the first element only.
                    logData.setService(Config.Agent.SERVICE_NAME);
                    isFirst = false;
                }
                logDataStreamObserver.onNext(logData.build());
            }
            logDataStreamObserver.onCompleted();
            status.wait4Finish();
        }
    }

    @Override
    public void onError(final List<LogData.Builder> data, final Throwable t) {
        LOGGER.error(t, "Try to consume {} log data to sender, with unexpected exception.", data.size());
    }

    @Override
    public void onExit() {
    }

    @Override
    public void statusChanged(GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            logReportServiceStub = LogReportServiceGrpc.newStub(channel)
                                                       .withMaxOutboundMessageSize(Log.MAX_MESSAGE_SIZE);
        }
        this.status = status;
    }

    @Override
    public void shutdown() {
        carrier.shutdownConsumers();
    }
}