
Problem connecting Flink Table API to Kafka (solved)

package com.wjy.tableapi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class TableTest4KafkaPipeLine {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Connect to Kafka (source table)
        tableEnv.connect(new Kafka()
                .version("2.4.1") // Problem line: must be .version("universal")
                .topic("sensor")
                .property("zookeeper.connect", "hadoop102:2181")
                .property("bootstrap.servers", "hadoop102:9092")
        )
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                ).createTemporaryTable("inputTable");

        // Get a Table from the registered source
        Table sensorTable = tableEnv.from("inputTable");

        // Simple Table API transformation: select and filter
        Table resultTable = sensorTable.select("id,temp")
                .filter("id === 'sensor_6'");

        // Aggregation: count rows and average temperature per id
        Table aggTable = sensorTable.groupBy("id")
                .select("id,id.count as count,temp.avg as avgTemp");


        // Sink: register a Kafka table and write the result to it
        tableEnv.connect(new Kafka()
                .version("2.4.1") // Problem line: must be .version("universal")
                .topic("sinktest")
                .property("zookeeper.connect", "hadoop102:2181")
                .property("bootstrap.servers", "hadoop102:9092")
        )
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
//                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                ).inAppendMode().createTemporaryTable("outputTable");

        resultTable.executeInsert("outputTable");



    }

}

The error reported is:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:45)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:76)
    at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:187)
    at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:96)
    at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:351)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:150)
    at java.util.Collections$SingletonList.forEach(Collections.java:4824)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:150)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
    at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:76)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:150)
    at java.util.Collections$SingletonList.forEach(Collections.java:4824)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:150)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:72)
    at org.apache.flink.table.operations.FilterQueryOperation.accept(FilterQueryOperation.java:67)
    at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:198)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
    at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
    at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:554)
    at com.wjy.tableapi.TableTest4KafkaPipeLine.main(TableTest4KafkaPipeLine.java:59)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Mismatched properties:
'connector.version' expects 'universal', but is '2.4.1'

The following properties are requested:
connector.properties.bootstrap.servers=hadoop102:9092
connector.properties.zookeeper.connect=hadoop102:2181
connector.property-version=1
connector.topic=sensor
connector.type=kafka
connector.version=2.4.1
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=id
schema.1.data-type=BIGINT
schema.1.name=timestamp
schema.2.data-type=DOUBLE
schema.2.name=temp

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:300)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:178)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:139)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:93)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:41)
    ... 44 more

Solution:

Change `.version("2.4.1")` to `.version("universal")` in both `connect()` calls. The legacy Kafka descriptor matches only the exact version strings its factory supports, not arbitrary broker versions; as the error message shows, `connector.version` expects `universal`, which is the connector intended for modern Kafka brokers (including 2.4.1).

Reprinted from www.mshxw.com. Original article: https://www.mshxw.com/it/745635.html