version: '2'
services:
elasticsearch:
image: elasticsearch:6.8.0
container_name: skywalking-es
restart: always
ports:
- 9200:9200
- 9300:9300
environment:
discovery.type: single-node
TZ: Asia/Shanghai
启动 ,docker-compose up -d
修改skywalking存储配置修改启动配置,根据截图箭头修改即可,这里需要注意的是,namespace配置为es的clulster_name的值。
看下效果
可以看到已经上传到es数据了
对应的其实也就是segment的这个索引数据
写入流程
既然是基于es的写入,就可以猜测下看他们底层是怎么操作es的。
找到了这个类org.apache.skywalking.oap.server.core.analysis.worker.RecordPersistentWorker,我们打个断点看下,断点进来了,还需要注意的是TraceSegmentReportServiceHandler这个类,就是上次grpc client发送过来接收数据的地方。
我们再来看下,存到es的结构体内容,首先看下org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.SegmentAnalysisListener#parseFirst,这里需要解释下的是SegmentObject这个类是grpc传输日志的契约类,也就是protobuf文件生成的java文件,还有就是dataBinary这个字段的赋值是SegmentObject的字节流。
org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher#dispatch,里面SegmentRecord,是最终写入es的实体类,同样dataBinary这个字段也是上面传过来的字节流,这里面最后一步将字段进行了base64编码,具体可以看org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.RecordEsDAO#prepareBatchInsert中调用org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder#entity2Storage方法,这里是一种优化手段减少存储空间。
org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor#internalAdd 这里面也是批量写入,虽然每次都是一条一条的上传日志,使用semaphore控制同一个时刻只有一个线程写入es,数据都放到阻塞队列上,只有数据满5000字节,才会进行一次真正的写入。
查询流程首先是通过graphql的,这个就是类似restful的调用最终找到的有两个有用的方法。
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO#queryBasicTraces 查询所有的链路org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO#queryByTraceId 根据traceId查询单个链路的,这里debug可以看到,span最多支持200个,同时这里也做了base64的解码,通过调用parseFrom方法进行转换为java实体类。



