栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

skywalking源码分析第十二篇一server-receiver-plugin之Register服务注册发现模块启动

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

skywalking源码分析第十二篇一server-receiver-plugin之Register服务注册发现模块启动

文章目录

原理图源码分析一RegisterModuleProvider源码分析一服务注册

注册方法概览什么是端点?什么是字典?源码分析一doServiceRegister服务注册源码分析一ServiceInventoryRegister.getOrCreate服务注册信息缓存获取或者异步创建

源码分析一ServiceInventoryCache服务注册缓存获取源码分析一InventoryStreamProcessor服务注册注册异步创建

源码分析一RegisterDistinctWorker源码分析一RegisterPersistentWorker 总结备注: registerLockDAO原理

register_lock索引概览IRegisterLockDAO乐观锁实现

原理图

不同信息的服务注册都是同一套流程先基于缓存和存储获取,获取成功则返回agent交由agent构建字典信息[基于字典传输降低带宽]信息获取失败则异步创建agent获取失败则轮询发起服务注册请求
源码分析一RegisterModuleProvider

为grpc服务器和jetty服务器注册各类handler其中RegisterServiceHandler用于处理服务注册,服务实例注册以及[endpoint 和networkaddress]同步请求

高版本agent交互基本走grpc,skywalking-ui交互走jetty

public class RegisterModuleProvider extends ModuleProvider {
    @Override public void prepare() {
    }
    高版本agent交互基本走grpc,skywalking-ui交互走jetty
    @Override public void start() {
        GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
        grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
        grpcHandlerRegister.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
        grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
        grpcHandlerRegister.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));

        服务注册 服务实例注册 以及[endpoint 和networkaddress]同步请求
        grpcHandlerRegister.addHandler(new RegisterServiceHandler(getManager()));
        心跳请求 更新heatbeat
        grpcHandlerRegister.addHandler(new ServiceInstancePingServiceHandler(getManager()));
        其他handler主要处理低版本协议
        JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(JettyHandlerRegister.class);
        jettyHandlerRegister.addHandler(new ApplicationRegisterServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new InstanceDiscoveryServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new InstanceHeartBeatServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new NetworkAddressRegisterServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
    }

    @Override public void notifyAfterCompleted() {
    }
}
源码分析一服务注册 注册方法概览

其他注册流程类似,不在重复叙述

RegisterServiceHandler方法作用
doServiceRegister服务注册
doServiceInstanceRegister服务实例注册
doEndpointRegister端点注册,服务端生成id,返回agent端,agent维护id和name字典
doNetworkAddressRegisterip注册,同endpoint流量
doServiceAndNetworkAddressMappingRegister维护服务和ip映射关系
什么是端点?

端点名一般由agent插件定义,并且在字典中维护[id和名称的关系],传输时通过id传输避免网络带宽

dubbo的端点为接口名
下图为mysql端点名称
下图为elasticsearch端点名称

什么是字典?

字典维护在agent端,agent注册的EndPoint和ip等信息会在服务端生成唯一id并返回agent,agent依据id和名称的关系构建字典映射

将来agent上报record和metrics等数据则通过字典表将endpoint,ip等信息转成对应的唯一id

作用: 减少网络传输的带宽

源码分析一doServiceRegister服务注册

通过IServiceInventoryRegister获取serviceid返回agent相关信息

@Override public void doServiceRegister(Services request, StreamObserver responseObserver) {
    ServiceRegisterMapping.Builder builder = ServiceRegisterMapping.newBuilder();
    request.getServicesList().forEach(service -> {
        String serviceName = service.getServiceName();
        服务注册的service_id 如果不存在则直接返回agentagent会轮询调用直到获取成功
        service_id不存在会异步创建
        int serviceId = serviceInventoryRegister.getOrCreate(serviceName, null);
        if (serviceId != Const.NONE) {
            KeyIntValuePair value = KeyIntValuePair.newBuilder().setKey(serviceName).setValue(serviceId).build();
            builder.addServices(value);
        }
    });
    responseObserver.onNext(builder.build());
    responseObserver.onCompleted();
}
源码分析一ServiceInventoryRegister.getOrCreate服务注册信息缓存获取或者异步创建

通过缓存获取,缓存不存在则通过es获取存储层也获取不到则异步构建,agent获取不到注册结果会轮询调用InventoryStreamProcessor处理创建serviceId流程

@Override public int getOrCreate(String serviceName, JsonObject properties) {
    通过缓存获取,缓存不存在则通过es获取
    int serviceId = getServiceInventoryCache().getServiceId(serviceName);
    存储层也获取不到则异步构建,返回agent NONE,agent注册后字典填充失败会轮询调用
    if (serviceId == Const.NONE) {
        对应service_inventory索引
        ServiceInventory serviceInventory = new ServiceInventory();
        serviceInventory.setName(serviceName);
        serviceInventory.setAddressId(Const.NONE);
        serviceInventory.setIsAddress(BooleanUtils.FALSE);

        long now = System.currentTimeMillis();
        serviceInventory.setRegisterTime(now);
        serviceInventory.setHeartbeatTime(now);
        serviceInventory.setMappingServiceId(Const.NONE);
        serviceInventory.setLastUpdateTime(now);
        serviceInventory.setProperties(properties);
        不存在异步添加
        InventoryStreamProcessor.getInstance().in(serviceInventory);
    }
    return serviceId;
}
源码分析一ServiceInventoryCache服务注册缓存获取

缓存获取缓存获取失败则通过elasticSearch获取

public int getServiceId(String serviceName) {
    缓存获取
    Integer serviceId = serviceNameCache.getIfPresent(ServiceInventory.buildId(serviceName));
    if (Objects.isNull(serviceId) || serviceId == Const.NONE) {
        缓存不存在则通过存储层查找
        serviceId = getCacheDAO().getServiceId(serviceName);
        if (serviceId != Const.NONE) {
            serviceNameCache.put(ServiceInventory.buildId(serviceName), serviceId);
        }
    }
    return serviceId;
}
源码分析一InventoryStreamProcessor服务注册注册异步创建

RegisterDistinctWorker------RegisterRemoteWorker------RegisterPersistentWorker

流式处理节点名称作用
RegisterDistinctWorker缓冲注册请求,去重,批量发送下一个worker
RegisterRemoteWorker远程通信,发往其他集群节点处理
RegisterPersistentWorker缓冲注册请求,去重,加锁创建注册信息

为什么注册需要去重?

serviceId会有一个集群多个agent同时发送该serviceid的创建
agent 第一次注册获取serviceId失败会轮询请求

源码分析一RegisterDistinctWorker

流式处理L1聚合阶段主要处理去重,缓冲

private void onWork(RegisterSource source) {
    messageNum++;

    if (!sources.containsKey(source)) {
        第一次出现则加入缓冲区
        sources.put(source, source);
    } else {
        已经存在则合并
        sources.get(source).combine(source);
    }
    消息数量超过1000或者一个批次的数据  【isEndOfBatch不在分析,其原理为,生产者生产时 endOfBatch  =false  消费者消费时,获取缓冲区所有的消息则发送nextWorker】
    也就是生产者在持续生产 则缓存区满1000则发送下一个worker, 生产者不在生产,则消费者将所有的消息的最后一条设置成end,触发nextWorker
    if (messageNum >= 1000 || source.isEndOfBatch()) {
        sources.values().forEach(nextWorker::in);
        sources.clear();
        messageNum = 0;
    }
}
源码分析一RegisterPersistentWorker

流式处理L2聚合阶段主要处理注册请求的创建通过registerLockDao进行加锁保障线程安全

private void onWork(RegisterSource registerSource) {
    if (!sources.containsKey(registerSource)) {
        sources.put(registerSource, registerSource);
    } else {
        sources.get(registerSource).combine(registerSource);
    }

    try (HistogramMetrics.Timer timer = workerLatencyHistogram.createTimer()) {
        if (sources.size() > 1000 || registerSource.isEndOfBatch()) {
            sources.values().forEach(source -> {
                try {
                    存在则更新心跳时间等
                    RegisterSource dbSource = registerDAO.get(modelName, source.id());
                    if (Objects.nonNull(dbSource)) {
                        if (dbSource.combine(source)) {
                            registerDAO.forceUpdate(modelName, dbSource);
                        }
                    } else {
                        不存在加锁
                        int sequence;
                        返回一个自增id 为一个全局锁功能 确保创建流程的独占性 [乐观锁更新失败返回Const.NONE]
                        if ((sequence = registerLockDAO.getId(scopeId, source)) != Const.NONE) {
                            // --------------类似单例模式的双重锁机制,保障线程安全--------------
                            加锁成功获取
                            try {
                                dbSource = registerDAO.get(modelName, source.id());
                                获取存在则更新
                                if (Objects.nonNull(dbSource)) {
                                    if (dbSource.combine(source)) {
                                        registerDAO.forceUpdate(modelName, dbSource);
                                    }
                                } else {
                                    获取不存在则注册
                                    source.setSequence(sequence);
                                    registerDAO.forceInsert(modelName, source);
                                }
                            } catch (Throwable t) {
                                logger.error(t.getMessage(), t);
                            }
                        } else {
                            加锁失败说明有其他线程在处理注册请求,该线程无需创建
                            logger.info("{} inventory register try lock and increment sequence failure.", DefaultScopeDefine.nameOf(scopeId));
                        }
                    }
                } catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                }
            });
            sources.clear();
        }
    }
}
总结

服务注册将读写分离,读取失败由agent轮询注册异步写时通过registerLockDAO加锁保障线程安全其中不同信息的注册流程大同小异 备注: registerLockDAO原理

存储模块启动时会通过RegisterLockEs7Installer完成registerLockDAO对应的es索引初始化registerLockDAO通过对注册场景对应的scopeId进行es乐观锁自增更新更新成功加锁成功,更新失败加锁失败

 public void install() throws StorageException {
    删除其他代码
   	创建registerLockDAO的底层索引register_lock
    createIndex();
    遍历每个注册信息的scopeId,在register_lock插入一行数据
    主键_document_id = scopeId,乐观锁字段sequence值初始化为1
    for (Class registerSource : InventoryStreamProcessor.getInstance().getAllRegisterSources()) {
        int scopeId = ((Stream)registerSource.getAnnotation(Stream.class)).scopeId();
        putIfAbsent(scopeId);
    }
} 
register_lock索引概览
__document_idsequence注册类说明
141ServiceInventory

DefaultScopeDefine.SERVICE_INVENTORY值为14,代表ServiceInventory服务注册的场景编号为14,则在register_lock索引插入一行主键为14的数据sequence为乐观锁机制,初始化为1以上信息都是基于ServiceInventory类的注解@Stream指定

@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, builder = ServiceInventory.Builder.class, processor = InventoryStreamProcessor.class)

IRegisterLockDAO乐观锁实现

获取register_lock索引sequence字段对sequence字段自增并force强刷es更新成功则加锁成功

 @Override public int getId(int scopeId, RegisterSource registerSource) {
    String id = scopeId + "";
    int sequence = Const.NONE;
    try {
        GetResponse response = getClient().get(RegisterLockIndex.NAME, id);
        if (response.isExists()) {
            Map source = response.getSource();
            sequence = ((Number)source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
            long version = response.getVersion();
            增加版本号
            sequence++;
            使用es的乐观锁
            lock(id, sequence, version);
        }
    } catch (Throwable t) {
        return Const.NONE;
    }
    return sequence;
}  

private void lock(String id, int sequence, long version) throws IOException {
    XContentBuilder source = XContentFactory.jsonBuilder().startObject();
    es sequence字段
    source.field(RegisterLockIndex.COLUMN_SEQUENCE, sequence);
    source.endObject();

    getClient().forceUpdate(RegisterLockIndex.NAME, id, source, version);
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/756224.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号