栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

skywalking源码分析第七篇一StorageModule模块启动

skywalking源码分析第七篇一StorageModule模块启动

文章目录

数据模型存储层架构设计注册数据锁原理图源码分析一StorageModuleElasticsearchProvider

索引创建ModelInstaller.install 源码分析一@stream注解处理

举例一InventoryStreamProcessor.create 总结

存储实现本文默认elasticsearch

数据模型

StorageData代表一条数据每一条数据都会被存储到elasticsearch数据类上如果标注@Stream注解,则会被构建成Model对象[第五篇annotationScan已经介绍]StorageModuleProvider启动时会基于Model向es创建索引

存储层架构设计

StorageDAO 类似工厂用于获取真正的dao实现不同dao实现用于读写StorageDataStorageBuilder用于完成与es交互的防腐层转换,es层全部基于map传递数据,StorageBuilder支持map和StorageData的相互转换stream注解的扫描会完成Model的创建,model对应es的索引ModelInstaller会根据model对象向es存储发送请求索引创建请求
注册数据锁原理图

有一种特殊的Dao 叫做IRegisterLockDAOIRegisterLockDAO用于对注册数据进行加锁实现原理基于es的版本号[乐观锁机制]每一个@stream内有一个scopeId,比如endpoint注册信息,其写入es时会获取锁[启动阶段会为EndpointInventory创建endpoint_inventory索引,存储相关数据,同时会为register_lock创建一个id为16的数据,表示EndpointInventory当前的版本号]将来在对endpoint_inventory写数据需要依据register_lock索引id为16的数据版本号++是否成功来判断是否加锁成功,从而决定写endpoint_inventory索引
源码分析一StorageModuleElasticsearchProvider

prepare完成esclient的创建和服务注册start完成es索引的创建【init模式】

public class StorageModuleElasticsearch7Provider extends ModuleProvider {

    @Override
    public void prepare() throws ServiceNotProvidedException {
     
        if (!StringUtil.isEmpty(config.getNameSpace())) {
            config.setNameSpace(config.getNameSpace().toLowerCase());
        }
        elasticSearch7Client = new ElasticSearch7Client(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config.getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
        构建esClient
        注册serviceImpl 必须要注册所有StorageModule.services方法的接口
        this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
        this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockEs77DAOImpl(elasticSearch7Client));
        this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client, new ElasticsearchStorageTTL()));

        this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
        this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
        this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));

        this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client));
        this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
        this.registerServiceImplementation(ImetadataQueryDAO.class, new metadataQueryEs7DAO(elasticSearch7Client, config.getmetadataQueryMaxSize()));
        this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client));
        this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
    }

    @Override
    public void start() throws ModuleStartException {
        overrideCoreModuleTTLConfig();

        try {
            elasticSearch7Client.connect();
            根据mode模式决定是否初始化es索引
            StorageEs7Installer installer = new StorageEs7Installer(getManager(), config);
            installer.install(elasticSearch7Client);
            初始化锁索引
            RegisterLockEs7Installer lockInstaller = new RegisterLockEs7Installer(elasticSearch7Client);
            lockInstaller.install();
        } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
    }

    @Override
    public void notifyAfterCompleted() {
        无需处理
    }

    @Override
    public String[] requiredModules() {
        return new String[]{CoreModule.NAME};
    }

}
索引创建ModelInstaller.install

StorageModels获取es索引映射对象init模式调用esclient创建索引

 public final void install(Client client) throws StorageException {
        IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
        StorageModels获取es索引映射对象
        List models = modelGetter.getModels();

        no-init 不会创建表 es索引等
        if (RunningMode.isNoInitMode()) {
            for (Model model : models) {
                while (!isExists(client, model)) {
                    try {
                        logger.info("table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", model.getName());
                        Thread.sleep(3000L);
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage());
                    }
                }
            }
        } else {
            init 索引不存在在创建索引
            for (Model model : models) {
                if (!isExists(client, model)) {
                    logger.info("table: {} does not exist", model.getName());
                    createTable(client, model);
                }
            }
        }
    }
源码分析一@stream注解处理

分为四类StreamProcessor处理数据[agent上报分为4类]StreamProcessor将来处理agent上报的数据

public class StreamAnnotationListener implements AnnotationListener {

    @SuppressWarnings("unchecked")
    @Override public void notify(Class aClass) {
        if (aClass.isAnnotationPresent(Stream.class)) {
            Stream stream = (Stream)aClass.getAnnotation(Stream.class);
            根据不同元组的Stream标记 创建Storage元信息,并构建Model 将来ModelInstaller 会根据Model信息执行es索引初始化
             构建entryWorkers数据的用于流式处理
            if (stream.processor().equals(InventoryStreamProcessor.class)) {
                InventoryStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(RecordStreamProcessor.class)) {
                RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(MetricsStreamProcessor.class)) {
                MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(TopNStreamProcessor.class)) {
                TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else {
                throw new UnexpectedException("Unknown stream processor.");
            }
        } else {
            throw new UnexpectedException("Stream annotation listener could only parse the class present stream annotation.");
        }
    }
}
举例一InventoryStreamProcessor.create

StorageModels根据stream注解添加Model[installer根据此model合集在es创建索引]构建worker链路[RegisterDistinctWorker[l1聚合]-> RegisterRemoteWorker -> RegisterPersistentWorker[l2聚合]]

public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
    Class inventoryClass) {
    StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
    IRegisterDAO registerDAO;
    try {
        registerDAO = storageDAO.newRegisterDao(stream.builder().newInstance());
    } catch (InstantiationException | IllegalAccessException e) {
        throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " register DAO failure.", e);
    }

    IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
    添加到StorageModels 之后installer会根据其包含的model以及init模式进行存储层的table创建[es,db等]
    Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
    最后一个worker 对请求去重 例如多个服务实例一起进行应用名注册  此外服务注册失败会进行去重
    RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());

    String remoteReceiverWorkerName = stream.name() + "_rec";
    IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
    workerInstanceSetter.put(remoteReceiverWorkerName, persistentWorker, inventoryClass);
    发往其他节点处理
    
    RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
    第一个worker
    RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleDefineHolder, remoteWorker);

    entryWorkers.put(inventoryClass, distinctWorker);
}
总结

Storage完成es索引的创建概述Storage上报数据的分类以及Processor数据处理的分类概述Storage的架构设计讲解一个特殊的索引register_lock提供的加锁功能

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/751442.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号