1. What is a logging event?

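A logging event wraps a single log record (the logging thread, the log level, the message, and so on). In logback it must implement the ILoggingEvent interface, and it is the object that gets handed to each appender.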
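To make the idea concrete, here is a minimal sketch of one event object being handed to several appenders. It does not use logback: SimpleLogEvent, SimpleAppender, MemoryAppender and LogEventDemo are hypothetical names invented for this illustration, and the real ILoggingEvent carries many more fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a logging event: one log record as an object.
final class SimpleLogEvent {
    final String threadName;
    final String level;
    final String message;
    final long timestamp;

    SimpleLogEvent(String threadName, String level, String message, long timestamp) {
        this.threadName = threadName;
        this.level = level;
        this.message = message;
        this.timestamp = timestamp;
    }
}

// Hypothetical appender contract: every appender receives the same event object.
interface SimpleAppender {
    void append(SimpleLogEvent event);
}

// Keeps formatted lines in memory so the result is easy to inspect.
final class MemoryAppender implements SimpleAppender {
    final List<String> lines = new ArrayList<>();
    private final String prefix;

    MemoryAppender(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public void append(SimpleLogEvent event) {
        lines.add(prefix + " [" + event.level + "] [" + event.threadName + "] " + event.message);
    }
}

final class LogEventDemo {
    // Hands one event to every appender, the way a logger fans out a single record.
    static void dispatch(SimpleLogEvent event, List<SimpleAppender> appenders) {
        for (SimpleAppender appender : appenders) {
            appender.append(event);
        }
    }
}
```

Each appender decides on its own what to do with the event, which is why a console appender and a remote reporter can be attached to the same logger side by side.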

2. Using log reporting in a project that runs with the agent

See the official documentation: https://skywalking.apache.org/docs/skywalking-java/v9.0.0/en/setup/service-agent/java-agent/application-toolkit-logback-1.x/

  • Step 1: add the dependency

    <dependency>
      <groupId>org.apache.skywalking</groupId>
      <artifactId>apm-toolkit-logback-1.x</artifactId>
      <version>9.0.0</version>
    </dependency>

  • Step 2: add a logback.xml file under resources with the following content

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration scan="true" scanPeriod="10 seconds">

        <property name="devel" value="[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} %c %M %L %thread ------------------ %m%n"></property>

        <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
            <!-- Console output stream: changed from the default System.out to System.err -->
            <target>System.err</target> <!-- IDE consoles typically print System.err in red -->

            <!-- Log message format -->
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${devel}</pattern> <!-- ${name of the property defined above} -->
            </encoder>
        </appender>

        <!-- This appender sends the logs to skywalking-oap -->
        <appender name="sw-grpc-log" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
            <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
                <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
                    <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{tid}] [%thread] %-5level %logger{36} -%msg%n</Pattern>
                </layout>
            </encoder>
        </appender>

        <root level="INFO">
            <!-- Appenders that receive events at this level and above -->
            <appender-ref ref="stdout"></appender-ref>
            <appender-ref ref="sw-grpc-log"></appender-ref>
            <!-- <appender-ref ref="name of an appender defined above"></appender-ref> -->

        </root>

    </configuration>

  • Step 3: log a distinctive message in your own project

    @Transactional
    @GetMapping("/index")
    public void index(){

      System.out.println("xxxxxxxxxxxxxxxxxx");

      log.info("kkkkk"); // later, check whether the reported logs contain the "kkkkk" record

      GoodsEntity goodsEntity = new GoodsEntity();
      goodsEntity.setGoodsName("goods");

      goodsService.save(goodsEntity);

    }

  • Step 4: view the logs in the SkyWalking web UI

    The log entries are already associated with the traceId.

3. Source code analysis of log reporting

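Start from the protocol file for log reporting, Logging.proto. From the usages of LogReportServiceGrpc you can find the LogReportServiceClient class. From LogReportServiceClient you can find the GRPCLogAppenderInterceptor class (classes with this name exist in several places, so make sure you pick the right one; here it is the logback one), and from GRPCLogAppenderInterceptor you can find the GRPCLogAppenderActivation class.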

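GRPCLogAppenderActivation declares that the interceptor GRPCLogAppenderInterceptor applies to GRPCLogClientAppender#subAppend. subAppend is not declared on the Appender interface itself; it is a protected method of logback's OutputStreamAppender, which GRPCLogClientAppender extends. It is called for each logging event and performs the actual output, and subclasses can override it to change the output behaviour. Intercepting it therefore gives the agent access to every event that reaches this appender.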
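The effect of attaching a "before" interceptor to subAppend can be sketched without the agent. The example below imitates enhancement by subclassing, whereas the real agent rewrites bytecode; BeforeMethodHook, QuietAppender, EnhancedQuietAppender and CapturingHook are hypothetical names, and the empty subAppend is an assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hook, loosely modelled on the beforeMethod callback of an around-interceptor.
interface BeforeMethodHook {
    void beforeMethod(Object target, String methodName, Object[] allArguments);
}

// Stand-in for an appender whose subAppend writes nothing locally (assumption for this sketch).
class QuietAppender {
    protected void subAppend(Object event) {
        // intentionally empty
    }
}

// Imitates "enhancement" by subclassing; the real agent instruments the class instead.
final class EnhancedQuietAppender extends QuietAppender {
    private final BeforeMethodHook hook;

    EnhancedQuietAppender(BeforeMethodHook hook) {
        this.hook = hook;
    }

    @Override
    protected void subAppend(Object event) {
        // The hook runs first and sees the original arguments, then the original method runs.
        hook.beforeMethod(this, "subAppend", new Object[] {event});
        super.subAppend(event);
    }
}

// Records the first argument, the way the interceptor picks up allArguments[0].
final class CapturingHook implements BeforeMethodHook {
    final List<Object> captured = new ArrayList<>();

    @Override
    public void beforeMethod(Object target, String methodName, Object[] allArguments) {
        if (allArguments.length > 0 && allArguments[0] != null) {
            captured.add(allArguments[0]);
        }
    }
}
```

The appender itself stays unaware of the reporting logic; everything interesting happens in the hook, which mirrors how the interceptor below forwards the event to the reporter.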

public class GRPCLogAppenderInterceptor implements InstanceMethodsAroundInterceptor {

    private LogReportServiceClient client;

    @SuppressWarnings("unchecked")
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        // Look up the initialized log reporter in ServiceManager; the reporter itself is covered in Analysis 1 below
        if (Objects.isNull(client)) {

            client = ServiceManager.INSTANCE.findService(LogReportServiceClient.class);
            if (Objects.isNull(client)) {
                return;
            }
        }
        ILoggingEvent event = (ILoggingEvent) allArguments[0];
        // Put the log data into the DataCarrier maintained by LogReportServiceClient
        if (Objects.nonNull(event)) {
            // objInst is cast to OutputStreamAppender<ILoggingEvent> because it is the intercepted
            // GRPCLogClientAppender, which extends OutputStreamAppender<E>.
            // transform reads the data from the logging event and builds the gRPC payload,
            // i.e. the LogData message defined in Logging.proto.
            // client.produce puts the prepared data into the DataCarrier maintained by LogReportServiceClient.
            client.produce(transform((OutputStreamAppender<ILoggingEvent>) objInst, event));
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {

    }

    /**
     * transforms {@link ILoggingEvent}  to {@link LogData}
     *
     * @param appender the real {@link OutputStreamAppender appender}
     * @param event {@link ILoggingEvent}
     * @return {@link LogData} with filtered trace context in order to reduce the cost on the network
     */
    private LogData.Builder transform(final OutputStreamAppender<ILoggingEvent> appender, ILoggingEvent event) {
        // Build the LogTags field of LogData; LogTags is a list of key/value pairs, similar to a map
        LogTags.Builder logTags = LogTags.newBuilder()
                .addData(KeyStringValuePair.newBuilder()
                        .setKey("level").setValue(event.getLevel().toString()).build())
                .addData(KeyStringValuePair.newBuilder()
                        .setKey("logger").setValue(event.getLoggerName()).build())
                .addData(KeyStringValuePair.newBuilder()
                        .setKey("thread").setValue(event.getThreadName()).build());
        // When the formatted text is not transmitted, also add the raw arguments and the exception as tags
        if (!ToolkitConfig.Plugin.Toolkit.Log.TRANSMIT_FORMATTED) {
            if (event.getArgumentArray() != null) {
                for (int i = 0; i < event.getArgumentArray().length; i++) {
                    String value = Optional.ofNullable(event.getArgumentArray()[i]).orElse("null").toString();
                    logTags.addData(KeyStringValuePair.newBuilder()
                            .setKey("argument." + i).setValue(value).build());
                }
            }

            final IThrowableProxy throwableProxy = event.getThrowableProxy();
            if (throwableProxy instanceof ThrowableProxy) {
                Throwable throwable = ((ThrowableProxy) throwableProxy).getThrowable();
                logTags.addData(KeyStringValuePair.newBuilder()
                        .setKey("exception").setValue(ThrowableTransformer.INSTANCE.convert2String(throwable, 2048)).build());
            }
        }
        // Build the LogData message
        LogData.Builder builder = LogData.newBuilder()
                .setTimestamp(event.getTimeStamp())
                .setService(Config.Agent.SERVICE_NAME)
                .setServiceInstance(Config.Agent.INSTANCE_NAME)
                .setTags(logTags.build())
                .setBody(LogDataBody.newBuilder()
                        .setType(LogDataBody.ContentCase.TEXT.name())
                        // The log text is formatted according to the configured log format
                        .setText(TextLog.newBuilder().setText(transformLogText(appender, event)).build())
                        .build());

        // Set the endpoint on LogData
        String primaryEndpointName = ContextManager.getPrimaryEndpointName();
        if (primaryEndpointName != null) {
            builder.setEndpoint(primaryEndpointName);
        }

        // Set the TraceContext on LogData, but only when there is an active span (span id is not -1)
        return -1 == ContextManager.getSpanId() ? builder
                : builder.setTraceContext(TraceContext.newBuilder()
                        .setTraceId(ContextManager.getGlobalTraceId())
                        .setSpanId(ContextManager.getSpanId())
                        .setTraceSegmentId(ContextManager.getSegmentId())
                        .build());
    }
    // Format the log text with the encoder configured by the user, or return the raw message when TRANSMIT_FORMATTED is off
    private String transformLogText(final OutputStreamAppender<ILoggingEvent> appender, final ILoggingEvent event) {
        if (ToolkitConfig.Plugin.Toolkit.Log.TRANSMIT_FORMATTED) {
            return new String(appender.getEncoder().encode(event));
        } else {
            return event.getMessage();
        }
    }
}
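The tag-building part of transform is easy to try in isolation. The following is a minimal sketch that uses a plain map instead of the protobuf builders; LogTagSketch and buildTags are hypothetical names, and the transmitFormatted flag stands in for ToolkitConfig.Plugin.Toolkit.Log.TRANSMIT_FORMATTED.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class LogTagSketch {
    // Mirrors the tag logic of transform: three fixed tags, plus one "argument.N" tag
    // per log argument when the formatted text is not transmitted.
    static Map<String, String> buildTags(String level, String logger, String thread,
                                         Object[] arguments, boolean transmitFormatted) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("level", level);
        tags.put("logger", logger);
        tags.put("thread", thread);
        if (!transmitFormatted && arguments != null) {
            for (int i = 0; i < arguments.length; i++) {
                // A null argument becomes the literal string "null", as in the original code.
                String value = Optional.ofNullable(arguments[i]).orElse("null").toString();
                tags.put("argument." + i, value);
            }
        }
        return tags;
    }
}
```

For a call such as log.info("user {} id {}", name, id) with formatting turned off, this yields the tags argument.0 and argument.1 next to level, logger and thread.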

Once client.produce has put a log into the DataCarrier maintained by LogReportServiceClient, how does it get sent?

LogReportServiceClient already registers the consumer of its internal DataCarrier during initialization:

@Override
public void boot() throws Throwable {
  carrier = new DataCarrier<>("gRPC-log", "gRPC-log",
                              Config.Buffer.CHANNEL_SIZE,
                              Config.Buffer.BUFFER_SIZE,
                              BufferStrategy.IF_POSSIBLE
                             );
  carrier.consume(this, 1);
}
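The producer side of this buffer can be approximated with a bounded queue. This is only a sketch of the "write if possible, otherwise drop" behaviour suggested by BufferStrategy.IF_POSSIBLE and by the "buffer is full" debug message in produce; the real DataCarrier is partitioned into channels and is not built on ArrayBlockingQueue. DropWhenFullBuffer is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified stand-in for a buffer that never blocks the logging thread.
final class DropWhenFullBuffer<T> {
    private final BlockingQueue<T> queue;

    DropWhenFullBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false instead of blocking when the buffer is full; the caller drops the item.
    boolean produce(T item) {
        return queue.offer(item);
    }

    // Removes everything currently buffered and returns it as one batch for the consumer.
    List<T> drain() {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }
}
```

Dropping instead of blocking is a reasonable trade-off for log reporting: losing a few log lines under pressure is cheaper than slowing down the application thread that writes them.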

Then the DataCarrier's consumer thread calls LogReportServiceClient#consume with the data read from the buffer:

// consume is invoked automatically with the buffered messages
@Override
public void consume(final List<LogData.Builder> dataList) {
  // Nothing was read
  if (CollectionUtil.isEmpty(dataList)) {
    return;
  }
  // Only send when the gRPC channel status is CONNECTED
  if (GRPCChannelStatus.CONNECTED.equals(status)) {
    GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);

    StreamObserver<LogData> logDataStreamObserver = logReportServiceStub
      .withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
      .collect(
      new StreamObserver<Commands>() {

        // Receives the response sent back by the server
        @Override
        public void onNext(final Commands commands) {

        }

        @Override
        public void onError(final Throwable throwable) {
          status.finished();
          LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
                       dataList.size()
                      );
          ServiceManager.INSTANCE
            .findService(GRPCChannelManager.class)
            .reportError(throwable);
        }

        @Override
        public void onCompleted() {
          status.finished();
        }
      });

    // Iterate over the log data and send each entry
    boolean isFirst = true;
    for (final LogData.Builder logData : dataList) {
      if (isFirst) {
        // Only set service name of the first element in one stream
        // https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
        // Log collecting protocol defines LogData#service is required in the first element only.
        logData.setService(Config.Agent.SERVICE_NAME);
        isFirst = false;
      }
      logDataStreamObserver.onNext(logData.build());
    }
    // Tell the server that all requests have been sent
    logDataStreamObserver.onCompleted();
    // Wait for the stream to finish
    status.wait4Finish();
  }
}
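The comment inside the loop says the protocol only requires the service name on the first element of a stream. A minimal sketch of that rule, using plain maps in place of LogData builders, might look like this; BatchStreamSketch and toStream are hypothetical names and nothing here touches gRPC.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchStreamSketch {
    // Returns the messages in the order they would be written to the stream.
    // Only the first message of the batch gets the "service" field.
    static List<Map<String, String>> toStream(List<Map<String, String>> batch, String serviceName) {
        List<Map<String, String>> out = new ArrayList<>();
        boolean isFirst = true;
        for (Map<String, String> item : batch) {
            Map<String, String> message = new LinkedHashMap<>(item); // copy, leave the input untouched
            if (isFirst) {
                message.put("service", serviceName);
                isFirst = false;
            }
            out.add(message);
        }
        return out;
    }
}
```

In the agent code shown above, transform already sets the service on every LogData, so the extra setService on the first element acts as a guarantee rather than the only place the name is written.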

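Overall, the log data that gets sent looks roughly like the JSON below. It is a simplified view: on the wire, timestamp is the long value returned by event.getTimeStamp() (shown here as a readable date), and tags is a list of key/value pairs.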

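{
  "timestamp": "2024-04-10 15:10:14",
  "service": "service name",
  "serviceInstance": "service instance name",
  "endpoint": "name of the endpoint (the concrete method)",
  "body": {
    "type": "TEXT",
    "content": "can be json, text, or yaml"
  },
  "traceContext": {
    "traceId": "trace ID",
    "traceSegmentId": "segmentId",
    "spanId": "spanId"
  },
  "tags": {
    "data": [
      { "key": "level", "value": "INFO" },
      { "key": "logger", "value": "logger name" },
      { "key": "thread", "value": "thread name" }
    ]
  },
  "layer": ""
}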

Analysis 1

@DefaultImplementor
public class LogReportServiceClient implements BootService, GRPCChannelListener, IConsumer<LogData.Builder> {
    private static final ILog LOGGER = LogManager.getLogger(LogReportServiceClient.class);

    private volatile DataCarrier<LogData.Builder> carrier;
    private volatile GRPCChannelStatus status;

    private volatile LogReportServiceGrpc.LogReportServiceStub logReportServiceStub;

    @Override
    public void prepare() throws Throwable {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
    }

    // The log reporter maintains its own DataCarrier
    @Override
    public void boot() throws Throwable {
        carrier = new DataCarrier<>("gRPC-log", "gRPC-log",
                                    Config.Buffer.CHANNEL_SIZE,
                                    Config.Buffer.BUFFER_SIZE,
                                    BufferStrategy.IF_POSSIBLE
        );
        // The consumer of this DataCarrier is the current class, LogReportServiceClient
        carrier.consume(this, 1);
    }

    @Override
    public void onComplete() throws Throwable {

    }

    // Write log data into the DataCarrier
    public void produce(LogData.Builder logData) {
        if (Objects.nonNull(logData) && !carrier.produce(logData)) {
            if (LOGGER.isDebugEnable()) {
                LOGGER.debug("One log has been abandoned, cause by buffer is full.");
            }
        }
    }

    @Override
    public void init(final Properties properties) {

    }

    // Start consuming; the parameter is the pending data read from the DataCarrier
    @Override
    public void consume(final List<LogData.Builder> dataList) {
        if (CollectionUtil.isEmpty(dataList)) {
            return;
        }

        // Report all log data over a gRPC client stream
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);

            StreamObserver<LogData> logDataStreamObserver = logReportServiceStub
                .withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                .collect(
                    new StreamObserver<Commands>() {
                        @Override
                        public void onNext(final Commands commands) {

                        }

                        @Override
                        public void onError(final Throwable throwable) {
                            status.finished();
                            LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
                                         dataList.size()
                            );
                            ServiceManager.INSTANCE
                                .findService(GRPCChannelManager.class)
                                .reportError(throwable);
                        }

                        @Override
                        public void onCompleted() {
                            status.finished();
                        }
                    });

            boolean isFirst = true;
            for (final LogData.Builder logData : dataList) {
                if (isFirst) {
                    // Only set service name of the first element in one stream
                    // https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
                    // Log collecting protocol defines LogData#service is required in the first element only.
                    logData.setService(Config.Agent.SERVICE_NAME);
                    isFirst = false;
                }
                logDataStreamObserver.onNext(logData.build());
            }
            logDataStreamObserver.onCompleted();
            status.wait4Finish();
        }
    }

    @Override
    public void onError(final List<LogData.Builder> data, final Throwable t) {
        LOGGER.error(t, "Try to consume {} log data to sender, with unexpected exception.", data.size());
    }

    @Override
    public void onExit() {

    }

    @Override
    public void statusChanged(GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            logReportServiceStub = LogReportServiceGrpc.newStub(channel)
                                                       .withMaxOutboundMessageSize(Log.MAX_MESSAGE_SIZE);
        }
        this.status = status;
    }

    @Override
    public void shutdown() {
        carrier.shutdownConsumers();
    }
}