栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

3.2 Trino二次开发-动态数据源管理-源码解读2

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

3.2 Trino二次开发-动态数据源管理-源码解读2

​​​​​目录

前言

源码阅读

1. Airlift中添加API接口

2. Trino加载Catalog

2.1 加载Catalog

2.1.1 loadCatalogs

2.1.2 createCatalog

2.2 updateConnectorIds

总结



前言

上文中提到,对Trino动态数据源管理的二次开发需要明确两个问题:

1.Trino的RestAPI框架采用的airlift,如何在airlift体系中添加API接口。

2.Trino加载数据源(Catalog)都做了哪些工作。

带着这两个问题,我们来深入的调查下Trino源码。


源码阅读


1. Airlift中添加API接口

网上关于Airlift的资料比较少,因此直接从Trino的源码中找线索。在有限的资料里,了解到Airlift暴露url是通过@Path的方式,idea里面全局搜索一下,符合条件的还是很多。这里找到一个相对全面的类MemoryResource,该类主要是用于获取worker节点上的内存池信息的,为了节省空间,已将暂时无关的代码删除掉。

@Path("/v1/memory")
public class MemoryResource
{
    private final LocalMemoryManager memoryManager;
    private final TaskManager taskManager;

    @Inject
    public MemoryResource(LocalMemoryManager memoryManager, TaskManager taskManager)
    {
        this.memoryManager = requireNonNull(memoryManager, "memoryManager is null");
        this.taskManager = requireNonNull(taskManager, "taskManager is null");
    }

    @ResourceSecurity(INTERNAL_ONLY)
    @POST
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    public MemoryInfo getMemoryInfo(MemoryPoolAssignmentsRequest request)
    {
        taskManager.updateMemoryPoolAssignments(request);
        return memoryManager.getInfo();
    }

    @ResourceSecurity(MANAGEMENT_READ)
    @GET
    @Path("{poolId}")
    public Response getMemoryInfo(@PathParam("poolId") String poolId)
    {
        if (GENERAL_POOL.getId().equals(poolId)) {
            return toSuccessfulResponse(memoryManager.getGeneralPool().getInfo());
        }

        if (RESERVED_POOL.getId().equals(poolId) && memoryManager.getReservedPool().isPresent()) {
            return toSuccessfulResponse(memoryManager.getReservedPool().get().getInfo());
        }

        return Response.status(NOT_FOUND).build();
    }
    ......
}

汇总一下知识点:

前文中提到Airlift是一个整合了Java常用库的轻量级框架,那么在RestAPI部分引入了JAX-RS标准,JAX-RS全名是Java API for RESTful Web Services.它的核心理念是resource,可以理解为面向资源。

@Inject:用于对依赖的服务进行注入

@Path:请求的url路径,可以是固定如"/v1/memory",也支持动态的参数如"{poolId}"

@POST 和 @GET:分别对应不同的Http请求Post和Get

@Produces 和 @Consumes:分别对应返回实体的数据格式,以及请求实体的数据格式,其中数据格式包括,APPLICATION_XML、APPLICATION_JSON、MULTIPART_FORM_DATA、APPLICATION_OCTET_STREAM...

当然Trino本身也定义了自己的注解,如@ResourceSecurity,用于对资源的访问权限进行控制。

@documented
@Retention(RUNTIME)
@Target({TYPE, METHOD})
public @interface ResourceSecurity
{
    enum AccessType
    {
        PUBLIC, WEB_UI, AUTHENTICATED_USER, MANAGEMENT_READ, MANAGEMENT_WRITE, INTERNAL_onLY
    }

    AccessType value();
}

可以看到定义了很多不同类型的资源访问权限: PUBLIC(公共权限), WEB_UI(Web控制台权限), AUTHENTICATED_USER(授权用户权限), MANAGEMENT_READ(集群管理读权限), MANAGEMENT_WRITE(集群管理写权限), INTERNAL_ONLY(内部权限)

至此,我们对Trino的Restful接口有了一定的了解。


2. Trino加载Catalog

Trino在启动的时候,Server类会进行初始化工作,其中有两部分代码与Catalog相关

//从配置文件中加载Catalog信息
injector.getInstance(StaticCatalogStore.class).loadCatalogs();


// TODO: remove this huge hack
updateConnectorIds(injector.getInstance(Announcer.class), injector.getInstance(CatalogManager.class));

接下来就针对上面的两点,进行深入的研究。

2.1 加载Catalog


2.1.1 loadCatalogs

看一下StaticCatalogStore的loadCatalogs()都做了哪些的工作

public class StaticCatalogStore
{
    private static final Logger log = Logger.get(StaticCatalogStore.class);
    //注释1:ConnectorManager用于管理管理Catalog
    private final ConnectorManager connectorManager;
    private final File catalogConfigurationDir;
    private final Set disabledCatalogs;
    private final AtomicBoolean catalogsLoading = new AtomicBoolean();

    @Inject
    public StaticCatalogStore(ConnectorManager connectorManager, StaticCatalogStoreConfig config)
    {
        this(connectorManager,
                config.getCatalogConfigurationDir(),
                firstNonNull(config.getDisabledCatalogs(), ImmutableList.of()));
    }

    public StaticCatalogStore(ConnectorManager connectorManager, File catalogConfigurationDir, List disabledCatalogs)
    {
        this.connectorManager = connectorManager;
        this.catalogConfigurationDir = catalogConfigurationDir;
        this.disabledCatalogs = ImmutableSet.copyOf(disabledCatalogs);
    }

    public void loadCatalogs()
            throws Exception
    {
        if (!catalogsLoading.compareAndSet(false, true)) {
            return;
        }
        
        for (File file : listFiles(catalogConfigurationDir)) {
            if (file.isFile() && file.getName().endsWith(".properties")) {
                //注释3:具体加载工作,下推到loadCatalog()进行
                loadCatalog(file);
            }
        }
    }

    private void loadCatalog(File file)
            throws Exception
    {
        String catalogName = Files.getNameWithoutExtension(file.getName());
        if (disabledCatalogs.contains(catalogName)) {
            log.info("Skipping disabled catalog %s", catalogName);
            return;
        }

        log.info("-- Loading catalog %s --", file);
        Map properties = new HashMap<>(loadPropertiesFrom(file.getPath()));

        String connectorName = properties.remove("connector.name");
        checkState(connectorName != null, "Catalog configuration %s does not contain connector.name", file.getAbsoluteFile());
        //注释4:加载后的Catalog信息,由connectorManager的createCatalog来进行Catalog的创建
        connectorManager.createCatalog(catalogName, connectorName, ImmutableMap.copyOf(properties));
        log.info("-- Added catalog %s using connector %s --", catalogName, connectorName);
    }

    ......
}

上面代码址行流程:

1.StaticCatalogStore在实例化阶段会进行依赖的注入,其中ConnectorManager在该阶段被注入,它用于管理Catalog。

2.loadCatalogs()被调用后,会到配置文件"etc/catalog/"目录加载properties文件,然后循坏调用loadCatalog()方法。

3.loadCatalog方法做了信息提取的工作。

主要由三个属性:

        catalogName:取值通过Files.getNameWithoutExtension(file.getName())获取,相当于文件的名称做为catalogName。

        connectorName:配置文件中connector.name的值

        properties:配置文件中除connector.name以外的值

完成这些变量值的提取后,调用ConnectorManager类的createCatalog方法。

4.createCatalog(): 由ConnectorManager的createCatalog方法完成Catalog的创建。

至此,我们已经了解到Catalog的创建工作是由ConnectorManager的createCatalog来完成,看了下ConnectorManager的代码其中也包含对Catalog的修改和删除的方法,对于二次开发来讲已经够用。

不过,既然源码已经读到这里了,我们就继续深入探索一下。


2.1.2 createCatalog

在ConnectorManager中重载了3个名为createCatalog的方法,按照调用顺序,依次来了解。

1) CatalogName createCatalog(String catalogName, String connectorName, Map properties)

public synchronized CatalogName createCatalog(String catalogName, String connectorName, Map properties)
    {
        requireNonNull(connectorName, "connectorName is null");
        //根据connectorName获取对应的工厂,为实例化Catalog做准备
        InternalConnectorFactory connectorFactory = connectorFactories.get(connectorName);
        checkArgument(connectorFactory != null, "No factory for connector '%s'.  Available factories: %s", connectorName, connectorFactories.keySet());
        //下推
        return createCatalog(catalogName, connectorFactory, properties);
    }

在这个createCatalog方法中,主要是获取connectorName的工厂类,然后调用第二createCatalog。

2)CatalogName createCatalog(String catalogName, InternalConnectorFactory connectorFactory, Map properties)

当前的createCatalog方法,主要做校验的工作(引入了CatalogManager),此外还有CatalogName的创建。这里关注一下,CatalogManager和CatalogName。

CatalogManager类: 管理当前节点中Catalog信息的容器

@ThreadSafe
public class CatalogManager
{
    private final ConcurrentMap catalogs = new ConcurrentHashMap<>();

    public synchronized void registerCatalog(Catalog catalog)
    {
        requireNonNull(catalog, "catalog is null");

        checkState(catalogs.put(catalog.getCatalogName(), catalog) == null, "Catalog '%s' is already registered", catalog.getCatalogName());
    }

    public Optional removeCatalog(String catalogName)
    {
        return Optional.ofNullable(catalogs.remove(catalogName))
                .map(Catalog::getConnectorCatalogName);
    }

    public List getCatalogs()
    {
        return ImmutableList.copyOf(catalogs.values());
    }

    public Optional getCatalog(String catalogName)
    {
        return Optional.ofNullable(catalogs.get(catalogName));
    }
}

CatalogName类:数据源名称的包装类,包含内部和外部两类

,其中isInternalSystemConnector()用于区分此数据源是否系统内部还是外部的。

public final class CatalogName
{
    private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
    private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";

    private final String catalogName;

    @JsonCreator
    public CatalogName(String catalogName)
    {
        this.catalogName = requireNonNull(catalogName, "catalogName is null");
        checkArgument(!catalogName.isEmpty(), "catalogName is empty");
    }

    public String getCatalogName()
    {
        return catalogName;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        CatalogName that = (CatalogName) o;
        return Objects.equals(catalogName, that.catalogName);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(catalogName);
    }

    @JsonValue
    @Override
    public String toString()
    {
        return catalogName;
    }

    public static boolean isInternalSystemConnector(CatalogName catalogName)
    {
        return catalogName.getCatalogName().startsWith(SYSTEM_TABLES_CONNECTOR_PREFIX) ||
                catalogName.getCatalogName().startsWith(INFORMATION_SCHEMA_CONNECTOR_PREFIX);
    }

    public static CatalogName createInformationSchemaCatalogName(CatalogName catalogName)
    {
        return new CatalogName(INFORMATION_SCHEMA_CONNECTOR_PREFIX + catalogName.getCatalogName());
    }

    public static CatalogName createSystemTablesCatalogName(CatalogName catalogName)
    {
        return new CatalogName(SYSTEM_TABLES_CONNECTOR_PREFIX + catalogName.getCatalogName());
    }
}

接下来调用第三个createCatalog

3)void createCatalog(CatalogName catalogName, InternalConnectorFactory factory, Map properties)

第三个createCatalog是最重要的,它完成了Catalog的具体创建工作。

private synchronized void createCatalog(CatalogName catalogName, InternalConnectorFactory factory, Map properties)
    {
        // create all connectors before adding, so a broken connector does not leave the system half updated
        //注释1: MaterializedConnector是一个内部类,也是Connector的包装类。此处调用createConnector创建Connector实例
        MaterializedConnector connector = new MaterializedConnector(catalogName, createConnector(catalogName, factory, properties));

        ConnectorHandleResolver connectorHandleResolver = connector.getConnector().getHandleResolver()
                .orElseGet(factory.getConnectorFactory()::getHandleResolver);
        checkArgument(connectorHandleResolver != null, "Connector %s does not have a handle resolver", factory);
        //注释2:创建Connector对应的informationSchemaConnector,informationSchemaConnector的catalogName为:$info_schema@+catalogName,相当于元数据信息的Connector
        MaterializedConnector informationSchemaConnector = new MaterializedConnector(
                createInformationSchemaCatalogName(catalogName),
                new InformationSchemaConnector(catalogName.getCatalogName(), nodeManager, metadataManager, accessControlManager));
        //注释3:创建系统表的CatalogName对象,systemId的catalogName为:$system@+catalogName
        CatalogName systemId = createSystemTablesCatalogName(catalogName);
        
        SystemTablesProvider systemTablesProvider;

        if (nodeManager.getCurrentNode().isCoordinator()) {
            systemTablesProvider = new CoordinatorSystemTablesProvider(
                    transactionManager,
                    metadataManager,
                    catalogName.getCatalogName(),
                    new StaticSystemTablesProvider(connector.getSystemTables()));
        }
        else {
            systemTablesProvider = new StaticSystemTablesProvider(connector.getSystemTables());
        }
        //注释5:创建Connector对应的systemConnector
        MaterializedConnector systemConnector = new MaterializedConnector(systemId, new SystemConnector(
                nodeManager,
                systemTablesProvider,
                transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogName)));

        //注释6:创建catalog对象
        Catalog catalog = new Catalog(
                catalogName.getCatalogName(),
                connector.getCatalogName(),
                connector.getConnector(),
                informationSchemaConnector.getCatalogName(),
                informationSchemaConnector.getConnector(),
                systemConnector.getCatalogName(),
                systemConnector.getConnector());

        try {
            
            addConnectorInternal(connector);
            addConnectorInternal(informationSchemaConnector);
            addConnectorInternal(systemConnector);
            catalogManager.registerCatalog(catalog);
            handleResolver.addCatalogHandleResolver(catalogName.getCatalogName(), connectorHandleResolver);
        }
        catch (Throwable e) {
            handleResolver.removeCatalogHandleResolver(catalogName.getCatalogName());
            catalogManager.removeCatalog(catalog.getCatalogName());
            removeConnectorInternal(systemConnector.getCatalogName());
            removeConnectorInternal(informationSchemaConnector.getCatalogName());
            removeConnectorInternal(connector.getCatalogName());
            throw e;
        }
        //注释8:将Connector添加到事件监听器中
        connector.getEventListeners()
                .forEach(eventListenerManager::addEventListener);
    }

创建Catalog的具体工作流程分为8个部分:

1. 在注释1处,调用了createConnector(), 它完成的任务是调用connector name对应工厂的create方法,完成Connector的实例化。

private Connector createConnector(CatalogName catalogName, InternalConnectorFactory factory, Map properties)
    {
        ConnectorContext context = new ConnectorContextInstance(
                new ConnectorAwareNodeManager(nodeManager, nodeInfo.getEnvironment(), catalogName, schedulerIncludeCoordinator),
                versionEmbedder,
                new InternalTypeManager(metadataManager, typeOperators),
                pageSorter,
                pageIndexerFactory,
                factory.getDuplicatePluginClassLoaderFactory());

        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factory.getConnectorFactory().getClass().getClassLoader())) {
            //调用connector name对应工厂的create方法,完成Connector的实例化
            return factory.getConnectorFactory().create(catalogName.getCatalogName(), properties, context);
        }
    }

举个栗子:假如当前的ConnectorFactory是HiveConnectorFactory,在执行上面的create方法时,调用的是HiveConnectorFactory中的create方法,来完成Hive Connector的实例化。Trino在源码层抽象出来ConnectorFactory接口,不同数据源只需要实现这个接口,就可以创建对应的Connector实例,这也是Trino扩展性做的好的细节之处,值得学习。

2. 在注释2处,创建informationSchemaConnector

3. 在注释3处,创建系统表的CatalogName对象,systemId的catalogName为:$system@+catalogName

4. 在注释4处,根据节点的类型实例化SystemTablesProvider
5. 在注释5处,创建Connector对应的systemConnector

6. 在注释6处,创建创建Catalog对象。以下是Catalog的构造函数,可以看出一个Catalog包含3部分:自身的CatalogName和Connector对象,对应的informationSchema的CatalogName和Connector对象,systemTables的CatalogName和Connector对象。

public Catalog(
            String catalogName,
            CatalogName connectorCatalogName,
            Connector connector,
            CatalogName informationSchemaId,
            Connector informationSchema,
            CatalogName systemTablesId,
            Connector systemTables)
    {
        this.catalogName = checkCatalogName(catalogName);
        this.connectorCatalogName = requireNonNull(connectorCatalogName, "connectorCatalogName is null");
        this.connector = requireNonNull(connector, "connector is null");
        this.informationSchemaId = requireNonNull(informationSchemaId, "informationSchemaId is null");
        this.informationSchema = requireNonNull(informationSchema, "informationSchema is null");
        this.systemTablesId = requireNonNull(systemTablesId, "systemTablesId is null");
        this.systemTables = requireNonNull(systemTables, "systemTables is null");
    }

7. 在注释7处,connector的填充(如数据分切读取管理器SplitManager,用于索引的IndexProvider...),catalog注册到catalogManager,catalog处理器的注册

8. 在注释8处,将Connector添加到事件监听器中,就可以被使用了。

至此数据源加载和实例化的整个过程就完成了。

2.2 updateConnectorIds

由于上面的章节篇幅较长,此部分内容放到下一篇中进行介绍。


总结

本文首先介绍了Trino Restful开发的风格,为我们后来自定义Restful接口提供了有效的参考。然后对数据源的加载进行了深入的调研,明确了二次开发调用底层代码的边界ConnectorManager类中的方法即可。另外由于篇幅过长,鉴于阅读体验的原因,将updateConnectorIds的内容挪到下一篇文章中。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/681440.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号