0
点赞
收藏
分享

微信扫一扫

基于源码skywalking使用elasticsearch流程分析

在这里插入图片描述

环境准备

elasticesearch准备

version: '2'
services:
    elasticsearch:
        image: elasticsearch:6.8.0
        container_name: skywalking-es
        restart: always
        ports:
            - 9200:9200
            - 9300:9300
        environment:
            discovery.type: single-node
            TZ: Asia/Shanghai
  • 启动 ,docker-compose up -d

在这里插入图片描述

修改skywalking存储配置

  • 修改启动配置,根据截图箭头修改即可,这里需要注意的是,namespace配置为es的clulster_name的值。
    在这里插入图片描述

看下效果

  • 可以看到已经上传到es数据了
    在这里插入图片描述
  • 对应的其实也就是segment的这个索引数据
    在这里插入图片描述

写入流程

既然是基于es的写入,就可以猜测下看他们底层是怎么操作es的。

  • 找到了这个类org.apache.skywalking.oap.server.core.analysis.worker.RecordPersistentWorker,我们打个断点看下,断点进来了,还需要注意的是TraceSegmentReportServiceHandler这个类,就是上次grpc client发送过来接收数据的地方。
    在这里插入图片描述

  • 我们再来看下,存到es的结构体内容,首先看下org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.SegmentAnalysisListener#parseFirst,这里需要解释下的是SegmentObject这个类是grpc传输日志的契约类,也就是protobuf文件生成的java文件,还有就是dataBinary这个字段的赋值是SegmentObject的字节流。

在这里插入图片描述
org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher#dispatch,里面SegmentRecord,是最终写入es的实体类,同样dataBinary这个字段也是上面传过来的字节流。这里是一种优化手段减少存储空间在这里插入图片描述

  • org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor#internalAdd 这里面也是批量写入,虽然每次都是一条一条的上传日志,使用semaphore控制同一个时刻只有一个线程写入es,数据都放到阻塞队列上,只有数据满5000字节,才会进行一次真正的写入

在这里插入图片描述

举报

相关推荐

0 条评论