栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

trino 373 windows下编译及运行DevelopmentServer

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

trino 373 windows下编译及运行DevelopmentServer

编译

Trino 本身要求在类 Linux 环境(Linux、macOS 等)下构建和运行,在 Windows 下无法编译全部模块,例如 trino-server-rpm 这个 module 就无法编译成功。
因此需要在根目录(root)pom 的 modules 列表中去掉无法编译的 module,以下是修改后可以编译成功的 modules:

 
        client/trino-cli
        client/trino-client
        client/trino-jdbc
        core/trino-main
        core/trino-parser
        core/trino-server
        core/trino-server-main
    
        core/trino-spi
        
          lib/trino-array
          lib/trino-collect
          lib/trino-geospatial-toolkit
          lib/trino-matching
          lib/trino-memory-context
          lib/trino-orc
          lib/trino-parquet
          lib/trino-plugin-toolkit
          lib/trino-rcfile
          lib/trino-record-decoder
          plugin/trino-accumulo
          plugin/trino-accumulo-iterators
          plugin/trino-atop
          plugin/trino-base-jdbc
          plugin/trino-bigquery
          plugin/trino-blackhole
          plugin/trino-cassandra
          plugin/trino-clickhouse
          plugin/trino-delta-lake
          plugin/trino-druid
          plugin/trino-elasticsearch
          plugin/trino-example-http
          plugin/trino-exchange
          plugin/trino-geospatial
          plugin/trino-google-sheets
          plugin/trino-hive
          plugin/trino-hive-hadoop2
          plugin/trino-http-event-listener
          plugin/trino-iceberg
          plugin/trino-jmx
          plugin/trino-kafka
          plugin/trino-kinesis
          plugin/trino-kudu
          plugin/trino-local-file
          plugin/trino-memory
          plugin/trino-memsql
          plugin/trino-ml
          plugin/trino-mongodb
          plugin/trino-mysql
          plugin/trino-oracle
          plugin/trino-password-authenticators
          plugin/trino-phoenix
          plugin/trino-phoenix5
          plugin/trino-pinot
          plugin/trino-postgresql
          plugin/trino-prometheus
          plugin/trino-raptor-legacy
          plugin/trino-redis
          plugin/trino-redshift
          plugin/trino-resource-group-managers
          plugin/trino-session-property-managers
          plugin/trino-sqlserver
          plugin/trino-teradata-functions
          plugin/trino-thrift
          plugin/trino-thrift-api
          plugin/trino-thrift-testing-server
          plugin/trino-tpcds
          plugin/trino-tpch
          service/trino-proxy
      
          testing/trino-benchmark
          testing/trino-benchto-benchmarks
          testing/trino-product-tests
        
          testing/trino-server-dev
          testing/trino-test-jdbc-compatibility-old-driver
          testing/trino-test-jdbc-compatibility-old-server
          testing/trino-testing
        
        
DevelopmentServer 类修改

如果不加载FileSystemExchangePlugin ,可能出现如下错误:

java.lang.IllegalArgumentException: Exchange manager factory 'filesystem' is not registered. Available factories: []
	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:435)
	at io.trino.exchange.ExchangeManagerRegistry.loadExchangeManager(ExchangeManagerRegistry.java:88)
	at io.trino.exchange.ExchangeManagerRegistry.loadExchangeManager(ExchangeManagerRegistry.java:78)
	at io.trino.server.DevelopmentServer.doStart(DevelopmentServer.java:162)
	at io.trino.server.DevelopmentServer.lambda$devstart$1(DevelopmentServer.java:100)
	at io.trino.$gen.Trino_dev____20220323_001820_1.run(Unknown Source)
	at io.trino.server.DevelopmentServer.devstart(DevelopmentServer.java:100)
	at io.trino.server.DevelopmentServer.main(DevelopmentServer.java:95)
injector.getInstance(ExchangeManagerRegistry.class).loadExchangeManager();

同时,Server 类(public class Server)在启动时会调用 verifyJvmRequirements(),这个校验函数会执行以下代码:

/**
 * Checks the operating system name and CPU architecture reported by the JVM.
 *
 * <p>Linux is accepted on amd64, aarch64 and ppc64le (the latter two with an
 * "experimental" warning); Mac OS X is accepted on x86_64 and aarch64. Every other
 * operating system name is reported through {@code failRequirement}.
 */
private static void verifyOsArchitecture()
{
    String operatingSystem = StandardSystemProperty.OS_NAME.value();
    String architecture = StandardSystemProperty.OS_ARCH.value();

    boolean onLinux = "Linux".equals(operatingSystem);
    boolean onMac = !onLinux && "Mac OS X".equals(operatingSystem);

    // Neither supported operating system: this is the branch taken on Windows.
    if (!onLinux && !onMac) {
        failRequirement("Trino requires Linux or Mac OS X (found %s)", operatingSystem);
        return;
    }

    if (onMac) {
        boolean supportedOnMac = "x86_64".equals(architecture) || "aarch64".equals(architecture);
        if (!supportedOnMac) {
            failRequirement("Trino requires x86_64 or aarch64 on Mac OS X (found %s)", architecture);
        }
        return;
    }

    // Linux: validate the architecture first, then warn about the experimental ones.
    boolean supportedOnLinux = ImmutableSet.of("amd64", "aarch64", "ppc64le").contains(architecture);
    if (!supportedOnLinux) {
        failRequirement("Trino requires amd64, aarch64, or ppc64le on Linux (found %s)", architecture);
    }
    if ("aarch64".equals(architecture)) {
        warnRequirement("Support for the ARM architecture is experimental");
    }
    else if ("ppc64le".equals(architecture)) {
        warnRequirement("Support for the POWER architecture is experimental");
    }
}

在 Windows 下,操作系统名称既不是 "Linux" 也不是 "Mac OS X",上述检查会进入最后的 else 分支并调用 failRequirement,导致无法运行,因此需要修改DevelopmentServer 代码,修改后的完整代码如下:

package io.trino.server;

import com.google.common.base.Joiner;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.util.Types;
import io.airlift.bootstrap.ApplicationConfigurationException;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.discovery.client.Announcer;
import io.airlift.discovery.client.DiscoveryModule;
import io.airlift.discovery.client.ServiceAnnouncement;
import io.airlift.event.client.EventModule;
import io.airlift.event.client.JsonEventModule;
import io.airlift.http.server.HttpServerModule;
import io.airlift.jaxrs.JaxrsModule;
import io.airlift.jmx.JmxHttpModule;
import io.airlift.jmx.JmxModule;
import io.airlift.json.JsonModule;
import io.airlift.log.LogJmxModule;
import io.airlift.log.Logger;
import io.airlift.node.NodeModule;
import io.airlift.tracetoken.TraceTokenModule;
import io.trino.client.NodeVersion;
import io.trino.eventlistener.EventListenerManager;
import io.trino.eventlistener.EventListenerModule;
import io.trino.exchange.ExchangeManagerModule;
import io.trino.execution.resourcegroups.ResourceGroupManager;
import io.trino.execution.warnings.WarningCollectorModule;
import io.trino.metadata.Catalog;
import io.trino.metadata.CatalogManager;
import io.trino.metadata.StaticCatalogStore;
import io.trino.security.AccessControlManager;
import io.trino.security.AccessControlModule;
import io.trino.security.GroupProviderManager;
import io.trino.server.PluginManager.PluginsProvider;
import io.trino.server.security.CertificateAuthenticatorManager;
import io.trino.server.security.HeaderAuthenticatorManager;
import io.trino.server.security.PasswordAuthenticatorManager;
import io.trino.server.security.ServerSecurityModule;
import io.trino.version.EmbedVersion;
import org.weakref.jmx.guice.MBeanModule;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.discovery.client.ServiceAnnouncement.serviceAnnouncement;
import static io.trino.server.TrinoSystemRequirements.verifySystemTimeIsReasonable;
import static java.lang.String.format;
import static java.nio.file.LinkOption.NOFOLLOW_LINKS;

public final class DevelopmentServer
        extends Server
{
    private DevelopmentServer() {}

    @Override
    protected Iterable getAdditionalModules()
    {
        return ImmutableList.of(binder -> {
            newOptionalBinder(binder, PluginsProvider.class).setBinding()
                    .to(DevelopmentPluginsProvider.class).in(Scopes.SINGLETON);
            configBinder(binder).bindConfig(DevelopmentLoaderConfig.class);
        });
    }

    public static void main(String[] args)
    {
        new DevelopmentServer().devstart("dev");
    }

    public void devstart(String trinoVersion)
    {
        new EmbedVersion(trinoVersion).embedVersion(() -> doStart(trinoVersion)).run();
    }

    private void doStart(String trinoVersion)
    {
//        verifyJvmRequirements();
        verifySystemTimeIsReasonable();

        Logger log = Logger.get(Server.class);
        log.info("Java version: %s", StandardSystemProperty.JAVA_VERSION.value());

        ImmutableList.Builder modules = ImmutableList.builder();
        modules.add(
                new NodeModule(),
                new DiscoveryModule(),
                new HttpServerModule(),
                new JsonModule(),
                new JaxrsModule(),
                new MBeanModule(),
                new PrefixObjectNameGeneratorModule("io.trino"),
                new JmxModule(),
                new JmxHttpModule(),
                new LogJmxModule(),
                new TraceTokenModule(),
                new EventModule(),
                new JsonEventModule(),
                new ServerSecurityModule(),
                new AccessControlModule(),
                new EventListenerModule(),
                new ExchangeManagerModule(),
                new CoordinatorDiscoveryModule(),
                new ServerMainModule(trinoVersion),
                new GracefulShutdownModule(),
                new WarningCollectorModule());

        modules.addAll(getAdditionalModules());

        Bootstrap app = new Bootstrap(modules.build());

        try {
            Injector injector = app.initialize();

            log.info("Trino version: %s", injector.getInstance(NodeVersion.class).getVersion());
            logLocation(log, "Working directory", Paths.get("."));
            logLocation(log, "Etc directory", Paths.get("etc"));

            PluginManager pm = injector.getInstance(PluginManager.class);

            pm.loadPlugins();

            injector.getInstance(StaticCatalogStore.class).loadCatalogs();

            // TODO: remove this huge hack
            updateConnectorIds(injector.getInstance(Announcer.class), injector.getInstance(CatalogManager.class));

            injector.getInstance(SessionPropertyDefaults.class).loadConfigurationManager();
            injector.getInstance(ResourceGroupManager.class).loadConfigurationManager();
            injector.getInstance(AccessControlManager.class).loadSystemAccessControl();
            injector.getInstance(optionalKey(PasswordAuthenticatorManager.class))
                    .ifPresent(PasswordAuthenticatorManager::loadPasswordAuthenticator);
            injector.getInstance(EventListenerManager.class).loadEventListeners();
            injector.getInstance(GroupProviderManager.class).loadConfiguredGroupProvider();
            // 没有加载FileSystemExchangePlugin类的plugin,不能启动loadExchangeManager
            //injector.getInstance(ExchangeManagerRegistry.class).loadExchangeManager();
            injector.getInstance(CertificateAuthenticatorManager.class).loadCertificateAuthenticator();
            injector.getInstance(optionalKey(HeaderAuthenticatorManager.class))
                    .ifPresent(HeaderAuthenticatorManager::loadHeaderAuthenticator);

            injector.getInstance(Announcer.class).start();

            injector.getInstance(StartupStatus.class).startupComplete();

            log.info("======== SERVER STARTED ========");
        }
        catch (ApplicationConfigurationException e) {
            StringBuilder message = new StringBuilder();
            message.append("Configuration is invalidn");
            message.append("==========n");
            addMessages(message, "Errors", ImmutableList.copyOf(e.getErrors()));
            addMessages(message, "Warnings", ImmutableList.copyOf(e.getWarnings()));
            message.append("n");
            message.append("==========");
            log.error("%s", message);
            System.exit(1);
        }
        catch (Throwable e) {
            log.error(e);
            System.exit(1);
        }
    }

    @SuppressWarnings("unchecked")
    private static  Key> optionalKey(Class type)
    {
        return Key.get((TypeLiteral>) TypeLiteral.get(Types.newParameterizedType(Optional.class, type)));
    }

    private static void logLocation(Logger log, String name, Path path)
    {
        if (!Files.exists(path, NOFOLLOW_linkS)) {
            log.info("%s: [does not exist]", name);
            return;
        }
        try {
            path = path.toAbsolutePath().toRealPath();
        }
        catch (IOException e) {
            log.info("%s: [not accessible]", name);
            return;
        }
        log.info("%s: %s", name, path);
    }

    private static void addMessages(StringBuilder output, String type, List messages)
    {
        if (messages.isEmpty()) {
            return;
        }
        output.append("n").append(type).append(":nn");
        for (int index = 0; index < messages.size(); index++) {
            output.append(format("%s) %sn", index + 1, messages.get(index)));
        }
    }

    private static void updateConnectorIds(Announcer announcer, CatalogManager metadata)
    {
        // get existing announcement
        ServiceAnnouncement announcement = getTrinoAnnouncement(announcer.getServiceAnnouncements());

        // automatically build connectorIds if not configured
        Set connectorIds = metadata.getCatalogs().stream()
                .map(Catalog::getConnectorCatalogName)
                .map(Object::toString)
                .collect(toImmutableSet());

        // build announcement with updated sources
        ServiceAnnouncement.ServiceAnnouncementBuilder builder = serviceAnnouncement(announcement.getType());
        builder.addProperties(announcement.getProperties());
        builder.addProperty("connectorIds", Joiner.on(',').join(connectorIds));

        // update announcement
        announcer.removeServiceAnnouncement(announcement.getId());
        announcer.addServiceAnnouncement(builder.build());
    }

    private static ServiceAnnouncement getTrinoAnnouncement(Set announcements)
    {
        for (ServiceAnnouncement announcement : announcements) {
            if (announcement.getType().equals("trino")) {
                return announcement;
            }
        }
        throw new IllegalArgumentException("Trino announcement not found: " + announcements);
    }
}
 
DevelopmentPluginsProvider 修改 

在https://blog.csdn.net/weixin_40455124/article/details/123676315?spm=1001.2014.3001.5501提到pom模式加载plugins会失败;在DevelopmentPluginsProvider 中以jar模式加载时,默认也会由于找不到service io.trino.spi.Plugin 而失败,导致这个问题的根本原因是缺少文件:META-INF/services/io.trino.spi.Plugin

这个文件在源代码是没有的,由maven 打包生成,以tpch为例,源代码目录如下

生成的zip文件如下

需要修改 createClassLoader 函数(PS:每个 plugin 都有独立的 classloader,这种模式允许不同 plugin 之间依赖的 jar 版本不同),修改后如下

 private static PluginClassLoader createClassLoader(List artifacts, Function, PluginClassLoader> classLoaderFactory)
            throws IOException
    {
        List urls = new ArrayList<>();
        for (Artifact artifact : sortedArtifacts(artifacts)) {
            System.out.println("====artifact=====" + artifact.toString());
            if (artifact.getFile() == null) {
                throw new RuntimeException("Could not resolve artifact: " + artifact);
            }
            File file = artifact.getFile().getCanonicalFile();
            urls.add(file.toURI().toURL());
        }

     //   String userDir = System.getProperty("user.dir");
        Path currentRelativePath = Paths.get("");
        String userDir = currentRelativePath.toAbsolutePath().toString();
        File file = new File(userDir + "/src/main/resources/" + artifacts.get(0).getArtifactId() + "/");
        urls.add(file.toURI().toURL());
//        urls.add(new URL("file:/D:/mydocuments/as4_code/perstodb/trino-mysql-373-services.jar"));
        System.out.println("resouces dir==" + file.getPath());
        return classLoaderFactory.apply(urls);
    }

根据需要装载的plugins,创建对应文件,示意如下:

etc/catalog文件

如果出现类似如下错误

2022-03-22T21:22:33.663+0800	ERROR	main	io.trino.server.Server	No factory for connector 'jmx'.  Available factories: [system, mysql]
java.lang.IllegalArgumentException: No factory for connector 'jmx'.  Available factories: [system, mysql]
	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:435)
	at io.trino.connector.ConnectorManager.createCatalog(ConnectorManager.java:226)
	at io.trino.metadata.StaticCatalogStore.loadCatalog(StaticCatalogStore.java:88)

可以根据提示去检查etc/catalog 目录下文件

文件应该和config.properties里面plugin.bundles的值对应上,不能多。

config.properties 文件修改

config.properties文件中最重要的是plugin.bundles的值,trino目前使用airlift的resolveArtifacts进行内容解析,初步看必须把插件先使用maven install后才能被解析,调用;核心逻辑代码如下

核心逻辑从下面这个方法开始:
private PluginClassLoader buildClassLoaderFromCoordinates(String coordinates, Function, PluginClassLoader> classLoaderFactory)
        throws IOException
{
    Artifact rootArtifact = new DefaultArtifact(coordinates);
    List artifacts = resolver.resolveArtifacts(rootArtifact);
    return createClassLoader(artifacts, classLoaderFactory);
}


该方法进一步调用 resolveArtifacts,从远程仓库 "https://repo1.maven.org/maven2/" 获取信息:
/**
 * Resolves the given artifacts together with their runtime-scope dependencies.
 *
 * <p>Every source artifact is added to the collect request as a {@code "runtime"}
 * dependency. Every configured repository is added as well; a repository whose URL is one
 * of {@code DEPRECATED_MAVEN_CENTRAL_URIS} is replaced by an equivalent repository pointing
 * at {@code https://repo1.maven.org/maven2/}.
 *
 * @param sourceArtifacts the root artifacts to resolve
 * @return the artifacts returned by the dependency-request overload of this method
 */
public List<Artifact> resolveArtifacts(Iterable<? extends Artifact> sourceArtifacts) {
    CollectRequest collectRequest = new CollectRequest();
    for (Artifact sourceArtifact : sourceArtifacts) {
        collectRequest.addDependency(new Dependency(sourceArtifact, "runtime"));
    }

    // NOTE(review): assumes this.repositories is declared as a collection of RemoteRepository - confirm.
    for (RemoteRepository configured : this.repositories) {
        RemoteRepository effective = configured;
        if (DEPRECATED_MAVEN_CENTRAL_URIS.contains(configured.getUrl())) {
            effective = new RemoteRepository(configured.getId(), configured.getContentType(), "https://repo1.maven.org/maven2/");
        }
        collectRequest.addRepository(effective);
    }

    DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, DependencyFilterUtils.classpathFilter("runtime"));
    return this.resolveArtifacts(dependencyRequest);
}

测试用的config.properties文件如下,最重要的是plugin.bundles

#
# WARNING
# ^^^^^^^
# This configuration file is for development only and should NOT be used
# in production. For example configuration, see the Trino documentation.
#

# sample nodeId to provide consistency across test runs
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.environment=test
node.internal-address=localhost
experimental.concurrent-startup=true
# HTTP port of this server; discovery.uri below points at the same host and port
http-server.http.port=8080

discovery.uri=http://localhost:8080

# HTTP client settings for the "exchange" client
exchange.http-client.max-connections=1000
exchange.http-client.max-connections-per-server=1000
exchange.http-client.connect-timeout=1m
exchange.http-client.idle-timeout=1m

# HTTP client settings for the "scheduler" client (same values as above)
scheduler.http-client.max-connections=1000
scheduler.http-client.max-connections-per-server=1000
scheduler.http-client.connect-timeout=1m
scheduler.http-client.idle-timeout=1m

query.client.timeout=5m
query.min-expire-age=30m

# Most important setting: comma-separated list of plugins to load, given as
# artifact coordinates (groupId:artifactId:type:version).
# The files under etc/catalog must match this list - do not keep catalog files for
# connectors that are not listed here.
# NOTE(review): the plugins apparently have to be installed with "mvn install"
# before they can be resolved - confirm.
plugin.bundles=io.trino:trino-local-file:jar:373,io.trino:trino-tpch:jar:373

# Further bundle that can be appended to plugin.bundles (currently disabled):
  # ,io.trino:trino-jmx:jar:373


# Disabled bundle entries in pom.xml path form:
# 
 # ../../plugin/trino-resource-group-managers/pom.xml,
 # ../../plugin/trino-password-authenticators/pom.xml, 
 # ../../plugin/trino-tpch/pom.xml

node-scheduler.include-coordinator=true
# Remote Maven repository (here: an Aliyun mirror), presumably used when resolving plugin.bundles.
# NOTE(review): this is a plain-http URL - check whether an https endpoint can be used instead.
maven.repo.remote=http://maven.aliyun.com/nexus/content/repositories/central
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号