Contents
Preface
Source Code Walkthrough
1. Adding API Endpoints in Airlift
2. How Trino Loads Catalogs
2.1 Loading Catalogs
2.1.1 loadCatalogs
2.1.2 createCatalog
2.2 updateConnectorIds
Summary
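Preface
As mentioned in the previous article, building dynamic catalog management into Trino requires answering two questions:
1. Trino's REST API is built on Airlift. How do we add an API endpoint within the Airlift framework?
2. What does Trino actually do when it loads a data source (catalog)?
With these two questions in mind, let's dig into the Trino source code.
Source Code Walkthrough
1. Adding API Endpoints in Airlift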
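There is very little documentation on Airlift online, so the best place to look for clues is the Trino source code itself. From the limited material available, Airlift exposes URLs through the @Path annotation. A global search in IntelliJ IDEA turns up plenty of matches; a fairly representative one is the MemoryResource class, which returns the memory pool information of a worker node. To save space, code unrelated to this discussion has been removed.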
@Path("/v1/memory")
public class MemoryResource
{
private final LocalMemoryManager memoryManager;
private final TaskManager taskManager;
@Inject
public MemoryResource(LocalMemoryManager memoryManager, TaskManager taskManager)
{
this.memoryManager = requireNonNull(memoryManager, "memoryManager is null");
this.taskManager = requireNonNull(taskManager, "taskManager is null");
}
@ResourceSecurity(INTERNAL_ONLY)
@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public MemoryInfo getMemoryInfo(MemoryPoolAssignmentsRequest request)
{
taskManager.updateMemoryPoolAssignments(request);
return memoryManager.getInfo();
}
@ResourceSecurity(MANAGEMENT_READ)
@GET
@Path("{poolId}")
public Response getMemoryInfo(@PathParam("poolId") String poolId)
{
if (GENERAL_POOL.getId().equals(poolId)) {
return toSuccessfulResponse(memoryManager.getGeneralPool().getInfo());
}
if (RESERVED_POOL.getId().equals(poolId) && memoryManager.getReservedPool().isPresent()) {
return toSuccessfulResponse(memoryManager.getReservedPool().get().getInfo());
}
return Response.status(NOT_FOUND).build();
}
......
}
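To summarize the key points:
As mentioned in the previous article, Airlift is a lightweight framework that bundles commonly used Java libraries. For REST APIs it adopts the JAX-RS standard (Java API for RESTful Web Services), whose core concept is the resource: endpoints are modeled as resources.
@Inject: injects the services the class depends on.
@Path: the request URL path. It can be fixed, such as "/v1/memory", or contain dynamic parameters, such as "{poolId}". A method-level @Path is appended to the class-level one, so the GET method above answers /v1/memory/{poolId}.
@POST and @GET: map the method to HTTP POST and HTTP GET requests respectively.
@Produces and @Consumes: declare the media type of the response entity and of the request entity respectively, for example APPLICATION_XML, APPLICATION_JSON, MULTIPART_FORM_DATA, APPLICATION_OCTET_STREAM, ...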
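To make the `{poolId}` idea concrete, here is a stdlib-only sketch of how a path template such as `/v1/memory/{poolId}` can be matched against a request path and its parameter extracted. It is a simplified illustration, not the JAX-RS/Jersey matching algorithm (which also handles regex-constrained parameters, encoded segments and match precedence); `PathTemplate` and `match` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only: NOT part of JAX-RS, Jersey or Airlift
final class PathTemplate
{
    private static final Pattern PARAM = Pattern.compile("\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

    private final Pattern regex;
    private final List<String> names = new ArrayList<>();

    PathTemplate(String template)
    {
        StringBuilder sb = new StringBuilder();
        Matcher m = PARAM.matcher(template);
        int last = 0;
        while (m.find()) {
            // Literal text between parameters is quoted so it matches verbatim
            sb.append(Pattern.quote(template.substring(last, m.start())));
            // Each {name} matches exactly one path segment (no '/')
            sb.append("([^/]+)");
            names.add(m.group(1));
            last = m.end();
        }
        sb.append(Pattern.quote(template.substring(last)));
        this.regex = Pattern.compile(sb.toString());
    }

    /** Returns the extracted parameters, or empty if the path does not match the template. */
    Optional<Map<String, String>> match(String path)
    {
        Matcher m = regex.matcher(path);
        if (!m.matches()) {
            return Optional.empty();
        }
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            params.put(names.get(i), m.group(i + 1));
        }
        return Optional.of(params);
    }
}
```

For example, `new PathTemplate("/v1/memory/{poolId}").match("/v1/memory/general")` yields a map containing `poolId=general`, which corresponds to the value JAX-RS would inject through `@PathParam("poolId")`.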
Trino also defines its own annotations, such as @ResourceSecurity, which controls access permissions for a resource.
@Documented
@Retention(RUNTIME)
@Target({TYPE, METHOD})
public @interface ResourceSecurity
{
enum AccessType
{
PUBLIC, WEB_UI, AUTHENTICATED_USER, MANAGEMENT_READ, MANAGEMENT_WRITE, INTERNAL_ONLY
}
AccessType value();
}
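It defines several access types: PUBLIC (public access), WEB_UI (Web UI access), AUTHENTICATED_USER (authenticated users), MANAGEMENT_READ (cluster management, read), MANAGEMENT_WRITE (cluster management, write), and INTERNAL_ONLY (internal access only).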
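`@Retention(RUNTIME)` is what makes `@ResourceSecurity` visible to the server at run time. The stdlib-only sketch below re-declares the same annotation shape under the hypothetical name `ResourceSecuritySketch` (to avoid implying it is Trino's class) and resolves a method's access type by reflection. Trino's actual enforcement happens in its own HTTP/security layer and is not reproduced here; `DemoResource`, `AccessTypes` and the rule that a method-level annotation overrides a class-level one are assumptions of this sketch.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Optional;

// Illustrative copy of the annotation shape shown above (not Trino's class)
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface ResourceSecuritySketch
{
    enum AccessType
    {
        PUBLIC, WEB_UI, AUTHENTICATED_USER, MANAGEMENT_READ, MANAGEMENT_WRITE, INTERNAL_ONLY
    }

    AccessType value();
}

// Hypothetical resource used only for the demonstration
@ResourceSecuritySketch(ResourceSecuritySketch.AccessType.AUTHENTICATED_USER)
class DemoResource
{
    @ResourceSecuritySketch(ResourceSecuritySketch.AccessType.MANAGEMENT_READ)
    public String getPoolInfo(String poolId)
    {
        return poolId;
    }

    public String ping()
    {
        return "pong";
    }
}

final class AccessTypes
{
    // Assumption of this sketch: a method-level annotation overrides the class-level one
    static Optional<ResourceSecuritySketch.AccessType> accessTypeOf(Method method)
    {
        ResourceSecuritySketch onMethod = method.getAnnotation(ResourceSecuritySketch.class);
        if (onMethod != null) {
            return Optional.of(onMethod.value());
        }
        ResourceSecuritySketch onClass = method.getDeclaringClass().getAnnotation(ResourceSecuritySketch.class);
        return Optional.ofNullable(onClass).map(ResourceSecuritySketch::value);
    }
}
```

With `RetentionPolicy.CLASS` or `SOURCE` instead, `getAnnotation` would return `null` and no run-time check could see the access type, which is why the annotation is declared with `RUNTIME` retention.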
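With that, we have a basic understanding of Trino's RESTful endpoints.
2. How Trino Loads Catalogs
When Trino starts, the Server class performs its initialization. Two pieces of that code are related to catalogs: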
// Load catalog definitions from the configuration files
injector.getInstance(StaticCatalogStore.class).loadCatalogs();

// TODO: remove this huge hack
updateConnectorIds(injector.getInstance(Announcer.class), injector.getInstance(CatalogManager.class));
Next, we examine each of these two parts in depth.
2.1 Loading Catalogs
2.1.1 loadCatalogs
Let's see what StaticCatalogStore.loadCatalogs() does:
public class StaticCatalogStore
{
private static final Logger log = Logger.get(StaticCatalogStore.class);
//Note 1: ConnectorManager manages catalogs
private final ConnectorManager connectorManager;
private final File catalogConfigurationDir;
private final Set<String> disabledCatalogs;
private final AtomicBoolean catalogsLoading = new AtomicBoolean();
@Inject
public StaticCatalogStore(ConnectorManager connectorManager, StaticCatalogStoreConfig config)
{
this(connectorManager,
config.getCatalogConfigurationDir(),
firstNonNull(config.getDisabledCatalogs(), ImmutableList.of()));
}
public StaticCatalogStore(ConnectorManager connectorManager, File catalogConfigurationDir, List<String> disabledCatalogs)
{
this.connectorManager = connectorManager;
this.catalogConfigurationDir = catalogConfigurationDir;
this.disabledCatalogs = ImmutableSet.copyOf(disabledCatalogs);
}
public void loadCatalogs()
throws Exception
{
if (!catalogsLoading.compareAndSet(false, true)) {
return;
}
//Note 2: iterate over the files in the catalog configuration directory (etc/catalog by default)
for (File file : listFiles(catalogConfigurationDir)) {
if (file.isFile() && file.getName().endsWith(".properties")) {
//Note 3: the actual loading work is delegated to loadCatalog()
loadCatalog(file);
}
}
}
private void loadCatalog(File file)
throws Exception
{
String catalogName = Files.getNameWithoutExtension(file.getName());
if (disabledCatalogs.contains(catalogName)) {
log.info("Skipping disabled catalog %s", catalogName);
return;
}
log.info("-- Loading catalog %s --", file);
Map<String, String> properties = new HashMap<>(loadPropertiesFrom(file.getPath()));
String connectorName = properties.remove("connector.name");
checkState(connectorName != null, "Catalog configuration %s does not contain connector.name", file.getAbsoluteFile());
//Note 4: the extracted catalog information is passed to ConnectorManager.createCatalog(), which creates the catalog
connectorManager.createCatalog(catalogName, connectorName, ImmutableMap.copyOf(properties));
log.info("-- Added catalog %s using connector %s --", catalogName, connectorName);
}
......
}
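The execution flow of the code above:
1. When StaticCatalogStore is instantiated, its dependencies are injected, including ConnectorManager, which manages catalogs.
2. When loadCatalogs() is called, it lists the .properties files in the catalog configuration directory ("etc/catalog/") and calls loadCatalog() on each of them in a loop. The compareAndSet on catalogsLoading ensures the loading runs only once.
3. loadCatalog() extracts the catalog information (catalogs listed in disabledCatalogs are skipped). It produces three values:
catalogName: obtained via Files.getNameWithoutExtension(file.getName()), i.e. the file name without its extension becomes the catalog name (for example, hive.properties becomes hive).
connectorName: the value of connector.name in the configuration file (checkState fails if it is missing).
properties: all other entries in the configuration file except connector.name.
Once these values have been extracted, loadCatalog() calls ConnectorManager.createCatalog().
4. createCatalog(): ConnectorManager.createCatalog() creates the catalog.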
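The extraction logic in steps 2 and 3 can be reproduced with the JDK alone. The sketch below is a simplified re-implementation under these assumptions: it uses `java.util.Properties` instead of the `loadPropertiesFrom` helper used above, it returns plain records (the hypothetical `CatalogSpec` type) instead of calling `ConnectorManager.createCatalog`, and it sorts the files by name for deterministic output.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical value type standing in for the arguments of createCatalog(...)
record CatalogSpec(String catalogName, String connectorName, Map<String, String> properties) {}

final class CatalogFileScanner
{
    private static final String SUFFIX = ".properties";

    static List<CatalogSpec> scan(Path catalogDir, Set<String> disabledCatalogs)
            throws IOException
    {
        List<Path> files;
        try (Stream<Path> stream = Files.list(catalogDir)) {
            files = stream
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(SUFFIX))
                    .sorted()
                    .toList();
        }
        List<CatalogSpec> result = new ArrayList<>();
        for (Path file : files) {
            String fileName = file.getFileName().toString();
            // Catalog name = file name without the ".properties" extension
            String catalogName = fileName.substring(0, fileName.length() - SUFFIX.length());
            if (disabledCatalogs.contains(catalogName)) {
                continue; // mirrors "Skipping disabled catalog"
            }
            Properties props = new Properties();
            try (Reader reader = Files.newBufferedReader(file)) {
                props.load(reader);
            }
            Map<String, String> properties = new HashMap<>();
            for (String key : props.stringPropertyNames()) {
                properties.put(key, props.getProperty(key));
            }
            // connector.name selects the factory and is not passed on as a connector property
            String connectorName = properties.remove("connector.name");
            if (connectorName == null) {
                throw new IllegalStateException("Catalog configuration " + file + " does not contain connector.name");
            }
            result.add(new CatalogSpec(catalogName, connectorName, Map.copyOf(properties)));
        }
        return result;
    }
}
```

A directory containing `hive.properties` with `connector.name=hive` would yield one `CatalogSpec` named `hive` whose properties no longer include `connector.name`, matching the three values described in step 3.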
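So the catalog is created by ConnectorManager.createCatalog(). Looking through ConnectorManager, it also contains methods for modifying and dropping catalogs, which is enough for our secondary development.
Still, since we have read this far, let's dig a little deeper.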
2.1.2 createCatalog
ConnectorManager has three overloaded createCatalog methods; let's go through them in call order.
1) CatalogName createCatalog(String catalogName, String connectorName, Map<String, String> properties)
public synchronized CatalogName createCatalog(String catalogName, String connectorName, Map<String, String> properties)
{
    requireNonNull(connectorName, "connectorName is null");
    // Look up the factory registered for connectorName, in preparation for instantiating the catalog
    InternalConnectorFactory connectorFactory = connectorFactories.get(connectorName);
    checkArgument(connectorFactory != null, "No factory for connector '%s'. Available factories: %s", connectorName, connectorFactories.keySet());
    // Delegate to the next overload
    return createCatalog(catalogName, connectorFactory, properties);
}
This createCatalog looks up the factory for connectorName and then calls the second createCatalog.
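The lookup-by-name step is essentially a registry of factories keyed by `connector.name`, followed by a polymorphic `create` call. The sketch below mimics it with simplified, hypothetical interfaces (`SimpleConnector`, `SimpleConnectorFactory`, `ConnectorFactoryRegistry`); the real SPI interface is `ConnectorFactory`, wrapped in `InternalConnectorFactory`, and its `create` method also receives a `ConnectorContext`, none of which is reproduced here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for the Trino SPI types
interface SimpleConnector
{
    String describe();
}

interface SimpleConnectorFactory
{
    String getName();

    SimpleConnector create(String catalogName, Map<String, String> config);
}

final class ConnectorFactoryRegistry
{
    private final Map<String, SimpleConnectorFactory> factories = new ConcurrentHashMap<>();

    void addFactory(SimpleConnectorFactory factory)
    {
        // Each connector name may be bound to at most one factory
        if (factories.putIfAbsent(factory.getName(), factory) != null) {
            throw new IllegalArgumentException("Connector '" + factory.getName() + "' is already registered");
        }
    }

    SimpleConnector createConnector(String catalogName, String connectorName, Map<String, String> properties)
    {
        SimpleConnectorFactory factory = factories.get(connectorName);
        if (factory == null) {
            // Same idea as the checkArgument above: report the available factory names
            throw new IllegalArgumentException(String.format(
                    "No factory for connector '%s'. Available factories: %s", connectorName, factories.keySet()));
        }
        // Polymorphic dispatch: each data source supplies its own factory implementation
        return factory.create(catalogName, properties);
    }
}
```

Adding a new data source then only means registering one more factory; the lookup code never changes, which is the extensibility point the article highlights for ConnectorFactory.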
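2) CatalogName createCatalog(String catalogName, InternalConnectorFactory connectorFactory, Map<String, String> properties)
This createCatalog mainly performs validation (using CatalogManager) and creates the CatalogName object. Let's take a closer look at CatalogManager and CatalogName.
CatalogManager: a container that holds the catalog information of the current node.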
@ThreadSafe
public class CatalogManager
{
private final ConcurrentMap<String, Catalog> catalogs = new ConcurrentHashMap<>();
public synchronized void registerCatalog(Catalog catalog)
{
requireNonNull(catalog, "catalog is null");
checkState(catalogs.put(catalog.getCatalogName(), catalog) == null, "Catalog '%s' is already registered", catalog.getCatalogName());
}
public Optional<CatalogName> removeCatalog(String catalogName)
{
return Optional.ofNullable(catalogs.remove(catalogName))
.map(Catalog::getConnectorCatalogName);
}
public List<Catalog> getCatalogs()
{
return ImmutableList.copyOf(catalogs.values());
}
public Optional<Catalog> getCatalog(String catalogName)
{
return Optional.ofNullable(catalogs.get(catalogName));
}
}
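For the dynamic catalog management this series is heading toward, a REST endpoint would ultimately drive a container like this one. The sketch below is a hypothetical stdlib-only registry (`CatalogRegistry`, with a plain `String` standing in for `Catalog`). One design note: in `registerCatalog` above, `catalogs.put(...) == null` inserts first and checks afterwards, so a duplicate name would already have overwritten the existing entry when `checkState` throws (the second createCatalog validates against CatalogManager beforehand, so this is mostly theoretical). `putIfAbsent` leaves the existing entry untouched instead.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry; String values stand in for Trino's Catalog objects
final class CatalogRegistry
{
    private final ConcurrentMap<String, String> catalogs = new ConcurrentHashMap<>();

    void register(String catalogName, String catalog)
    {
        // putIfAbsent is atomic and never overwrites an existing entry
        String previous = catalogs.putIfAbsent(catalogName, catalog);
        if (previous != null) {
            throw new IllegalStateException("Catalog '" + catalogName + "' is already registered");
        }
    }

    Optional<String> remove(String catalogName)
    {
        return Optional.ofNullable(catalogs.remove(catalogName));
    }

    Optional<String> get(String catalogName)
    {
        return Optional.ofNullable(catalogs.get(catalogName));
    }
}
```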
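CatalogName: a wrapper class for a catalog name. Names fall into two groups, internal and external; isInternalSystemConnector() tells whether a name belongs to an internal system connector or to an external data source.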
public final class CatalogName
{
private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";
private final String catalogName;
@JsonCreator
public CatalogName(String catalogName)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
checkArgument(!catalogName.isEmpty(), "catalogName is empty");
}
public String getCatalogName()
{
return catalogName;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CatalogName that = (CatalogName) o;
return Objects.equals(catalogName, that.catalogName);
}
@Override
public int hashCode()
{
return Objects.hash(catalogName);
}
@JsonValue
@Override
public String toString()
{
return catalogName;
}
public static boolean isInternalSystemConnector(CatalogName catalogName)
{
return catalogName.getCatalogName().startsWith(SYSTEM_TABLES_CONNECTOR_PREFIX) ||
catalogName.getCatalogName().startsWith(INFORMATION_SCHEMA_CONNECTOR_PREFIX);
}
public static CatalogName createInformationSchemaCatalogName(CatalogName catalogName)
{
return new CatalogName(INFORMATION_SCHEMA_CONNECTOR_PREFIX + catalogName.getCatalogName());
}
public static CatalogName createSystemTablesCatalogName(CatalogName catalogName)
{
return new CatalogName(SYSTEM_TABLES_CONNECTOR_PREFIX + catalogName.getCatalogName());
}
}
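To see the naming scheme in action: for a catalog named `hive`, the two factory methods produce `$info_schema@hive` and `$system@hive`, and only such derived names are classified as internal. The snippet below is a trimmed, stdlib-only copy of the prefix logic (as a hypothetical `CatalogNames` utility over plain strings) so that it runs without Trino on the classpath.

```java
// Trimmed copy of the prefix logic in CatalogName, operating on plain strings
final class CatalogNames
{
    private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
    private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";

    private CatalogNames() {}

    static String informationSchemaName(String catalogName)
    {
        return INFORMATION_SCHEMA_CONNECTOR_PREFIX + catalogName;
    }

    static String systemTablesName(String catalogName)
    {
        return SYSTEM_TABLES_CONNECTOR_PREFIX + catalogName;
    }

    static boolean isInternalSystemConnector(String catalogName)
    {
        return catalogName.startsWith(SYSTEM_TABLES_CONNECTOR_PREFIX)
                || catalogName.startsWith(INFORMATION_SCHEMA_CONNECTOR_PREFIX);
    }

    public static void main(String[] args)
    {
        System.out.println(informationSchemaName("hive"));                       // $info_schema@hive
        System.out.println(systemTablesName("hive"));                            // $system@hive
        System.out.println(isInternalSystemConnector("hive"));                   // false
        System.out.println(isInternalSystemConnector(systemTablesName("hive"))); // true
    }
}
```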
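Next, the third createCatalog is called.
3) void createCatalog(CatalogName catalogName, InternalConnectorFactory factory, Map<String, String> properties)
The third createCatalog is the most important one: it does the actual work of creating the catalog.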
private synchronized void createCatalog(CatalogName catalogName, InternalConnectorFactory factory, Map<String, String> properties)
{
    // create all connectors before adding, so a broken connector does not leave the system half updated
    //Note 1: MaterializedConnector is an inner class that wraps a Connector; createConnector() creates the Connector instance here
    MaterializedConnector connector = new MaterializedConnector(catalogName, createConnector(catalogName, factory, properties));

    ConnectorHandleResolver connectorHandleResolver = connector.getConnector().getHandleResolver()
            .orElseGet(factory.getConnectorFactory()::getHandleResolver);
    checkArgument(connectorHandleResolver != null, "Connector %s does not have a handle resolver", factory);

    //Note 2: create the informationSchemaConnector for this Connector; its catalog name is $info_schema@ + catalogName (a connector for metadata)
    MaterializedConnector informationSchemaConnector = new MaterializedConnector(
            createInformationSchemaCatalogName(catalogName),
            new InformationSchemaConnector(catalogName.getCatalogName(), nodeManager, metadataManager, accessControlManager));

    //Note 3: create the CatalogName for system tables; the catalog name of systemId is $system@ + catalogName
    CatalogName systemId = createSystemTablesCatalogName(catalogName);
    //Note 4: choose the SystemTablesProvider according to the node type
    SystemTablesProvider systemTablesProvider;
    if (nodeManager.getCurrentNode().isCoordinator()) {
        systemTablesProvider = new CoordinatorSystemTablesProvider(
                transactionManager,
                metadataManager,
                catalogName.getCatalogName(),
                new StaticSystemTablesProvider(connector.getSystemTables()));
    }
    else {
        systemTablesProvider = new StaticSystemTablesProvider(connector.getSystemTables());
    }

    //Note 5: create the systemConnector for this Connector
    MaterializedConnector systemConnector = new MaterializedConnector(systemId, new SystemConnector(
            nodeManager,
            systemTablesProvider,
            transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogName)));

    //Note 6: create the Catalog object
    Catalog catalog = new Catalog(
            catalogName.getCatalogName(),
            connector.getCatalogName(),
            connector.getConnector(),
            informationSchemaConnector.getCatalogName(),
            informationSchemaConnector.getConnector(),
            systemConnector.getCatalogName(),
            systemConnector.getConnector());

    //Note 7: register the connectors, the catalog and its handle resolver; on failure, roll everything back
    try {
        addConnectorInternal(connector);
        addConnectorInternal(informationSchemaConnector);
        addConnectorInternal(systemConnector);
        catalogManager.registerCatalog(catalog);
        handleResolver.addCatalogHandleResolver(catalogName.getCatalogName(), connectorHandleResolver);
    }
    catch (Throwable e) {
        handleResolver.removeCatalogHandleResolver(catalogName.getCatalogName());
        catalogManager.removeCatalog(catalog.getCatalogName());
        removeConnectorInternal(systemConnector.getCatalogName());
        removeConnectorInternal(informationSchemaConnector.getCatalogName());
        removeConnectorInternal(connector.getCatalogName());
        throw e;
    }

    //Note 8: register the connector's event listeners with the EventListenerManager
    connector.getEventListeners()
            .forEach(eventListenerManager::addEventListener);
}
Creating a catalog breaks down into 8 steps:
1. At Note 1, createConnector() is called. It calls the create method of the factory that corresponds to the connector name, which instantiates the Connector.
private Connector createConnector(CatalogName catalogName, InternalConnectorFactory factory, Map<String, String> properties)
{
    ConnectorContext context = new ConnectorContextInstance(
            new ConnectorAwareNodeManager(nodeManager, nodeInfo.getEnvironment(), catalogName, schedulerIncludeCoordinator),
            versionEmbedder,
            new InternalTypeManager(metadataManager, typeOperators),
            pageSorter,
            pageIndexerFactory,
            factory.getDuplicatePluginClassLoaderFactory());

    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factory.getConnectorFactory().getClass().getClassLoader())) {
        // Call create() on the factory for this connector name to instantiate the Connector
        return factory.getConnectorFactory().create(catalogName.getCatalogName(), properties, context);
    }
}
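For example, if the current ConnectorFactory is HiveConnectorFactory, the create call above invokes HiveConnectorFactory.create() to instantiate the Hive connector. Trino abstracts a ConnectorFactory interface at the source level; each data source only needs to implement this interface to create its own Connector instance. This is one of the details that make Trino so extensible, and it is worth learning from. Note also that create() runs inside a try-with-resources ThreadContextClassLoader block, which temporarily switches the thread's context class loader to the class loader of the connector factory (that is, of its plugin) and restores it afterwards.
2. At Note 2, the informationSchemaConnector is created. Its catalog name is $info_schema@ + catalogName, and it serves as the connector for metadata.
3. At Note 3, the CatalogName object for system tables is created; the catalog name of systemId is $system@ + catalogName.
4. At Note 4, a SystemTablesProvider is instantiated according to the node type: a CoordinatorSystemTablesProvider on the coordinator, a StaticSystemTablesProvider on other nodes.
5. At Note 5, the systemConnector corresponding to the Connector is created.
6. At Note 6, the Catalog object is created. The Catalog constructor below shows that a Catalog consists of three parts: its own CatalogName and Connector, the CatalogName and Connector of its information schema, and the CatalogName and Connector of its system tables.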
public Catalog(
String catalogName,
CatalogName connectorCatalogName,
Connector connector,
CatalogName informationSchemaId,
Connector informationSchema,
CatalogName systemTablesId,
Connector systemTables)
{
this.catalogName = checkCatalogName(catalogName);
this.connectorCatalogName = requireNonNull(connectorCatalogName, "connectorCatalogName is null");
this.connector = requireNonNull(connector, "connector is null");
this.informationSchemaId = requireNonNull(informationSchemaId, "informationSchemaId is null");
this.informationSchema = requireNonNull(informationSchema, "informationSchema is null");
this.systemTablesId = requireNonNull(systemTablesId, "systemTablesId is null");
this.systemTables = requireNonNull(systemTables, "systemTables is null");
}
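7. At Note 7, the three connectors are registered (addConnectorInternal wires up each connector's services, such as the SplitManager that manages how data is split for reading and the IndexProvider used for indexes, ...), the catalog is registered with CatalogManager, and the catalog's handle resolver is registered. If any of these steps throws, everything registered so far is removed and the exception is rethrown.
8. At Note 8, the connector's event listeners are registered with the EventListenerManager, after which the catalog is ready for use.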
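Step 7 follows an all-or-nothing pattern: register the connectors, the catalog and the handle resolver, and if anything throws, undo the registrations before rethrowing. Below is a generic stdlib sketch of that pattern using a hypothetical `AtomicRegistration` helper. It is not Trino's code: Trino calls every remove method explicitly in its catch block, whereas this sketch records an undo action only for each step that succeeded and runs those in reverse order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: "register everything or roll back everything"
final class AtomicRegistration
{
    private final Deque<Runnable> undoActions = new ArrayDeque<>();

    /** Runs one registration step; its undo action is recorded only if the step succeeds. */
    void step(Runnable action, Runnable undo)
    {
        action.run();
        undoActions.push(undo);
    }

    /** Undoes all completed steps in reverse (LIFO) order. */
    void rollback()
    {
        while (!undoActions.isEmpty()) {
            undoActions.pop().run();
        }
    }

    public static void main(String[] args)
    {
        Map<String, String> connectors = new ConcurrentHashMap<>();
        AtomicRegistration tx = new AtomicRegistration();
        try {
            tx.step(() -> connectors.put("hive", "connector"), () -> connectors.remove("hive"));
            tx.step(() -> connectors.put("$info_schema@hive", "information schema"), () -> connectors.remove("$info_schema@hive"));
            // Simulated failure while registering the system connector
            tx.step(() -> { throw new IllegalStateException("system connector failed"); }, () -> connectors.remove("$system@hive"));
        }
        catch (IllegalStateException e) {
            tx.rollback();
            System.out.println("rolled back: " + e.getMessage() + ", remaining = " + connectors);
        }
    }
}
```

Either way, the goal is the one stated in the comment at the top of createCatalog: a broken connector must not leave the system half updated.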
At this point, the entire process of loading and instantiating a data source is complete.
2.2 updateConnectorIds
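Because the previous sections are already long, this part is covered in the next article.
Summary
This article first introduced the style of RESTful development in Trino, which gives us a useful reference for defining our own REST endpoints later. It then examined in depth how data sources are loaded, and established the boundary for secondary development: calling the methods of the ConnectorManager class is sufficient. Since the article is already long, the discussion of updateConnectorIds has been moved to the next article for better readability.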



