package com.wjy.tableapi;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
/**
 * Demo pipeline that reads CSV sensor records from the Kafka topic {@code sensor}, keeps the
 * {@code id} and {@code temp} columns of the rows whose id is {@code sensor_6}, and writes them
 * as CSV to the Kafka topic {@code sinktest}.
 */
public class TableTest4KafkaPipeLine {

    /**
     * Value passed to {@code Kafka#version}. The Kafka table factory only matches
     * {@code connector.version=universal}; a concrete broker version such as {@code "2.4.1"}
     * fails with {@code NoMatchingTableFactoryException}.
     */
    private static final String KAFKA_CONNECTOR_VERSION = "universal";

    private static final String ZOOKEEPER_CONNECT = "hadoop102:2181";
    private static final String BOOTSTRAP_SERVERS = "hadoop102:9092";

    /**
     * Builds the pipeline and submits the insert job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the tables cannot be registered or the job cannot be submitted
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source: register the Kafka topic "sensor" as the temporary table "inputTable".
        tableEnv.connect(kafkaTopic("sensor"))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");

        Table sensorTable = tableEnv.from("inputTable");

        // Simple Table API transformation: project two columns and filter by sensor id.
        Table resultTable = sensorTable.select("id,temp")
                .filter("id === 'sensor_6'");

        // Aggregation example: built here but not written to any sink in this program.
        Table aggTable = sensorTable.groupBy("id")
                .select("id,id.count as count,temp.avg as avgTemp");

        // Sink: register the Kafka topic "sinktest" as the temporary table "outputTable".
        // Its schema has only id and temp, matching the columns selected into resultTable.
        tableEnv.connect(kafkaTopic("sinktest"))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("temp", DataTypes.DOUBLE()))
                .inAppendMode()
                .createTemporaryTable("outputTable");

        resultTable.executeInsert("outputTable");
    }

    /**
     * Creates the Kafka connector descriptor shared by the source and the sink.
     *
     * @param topic name of the Kafka topic to connect to
     * @return a descriptor for {@code topic} using the universal connector version
     */
    private static Kafka kafkaTopic(String topic) {
        return new Kafka()
                .version(KAFKA_CONNECTOR_VERSION)
                .topic(topic)
                .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                .property("bootstrap.servers", BOOTSTRAP_SERVERS);
    }
}
报错信息如下：
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:45)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:76)
at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:187)
at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:96)
at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:351)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:150)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:150)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:76)
at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:150)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:150)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:72)
at org.apache.flink.table.operations.FilterQueryOperation.accept(FilterQueryOperation.java:67)
at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:198)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:554)
at com.wjy.tableapi.TableTest4KafkaPipeLine.main(TableTest4KafkaPipeLine.java:59)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Mismatched properties:
'connector.version' expects 'universal', but is '2.4.1'
The following properties are requested:
connector.properties.bootstrap.servers=hadoop102:9092
connector.properties.zookeeper.connect=hadoop102:2181
connector.property-version=1
connector.topic=sensor
connector.type=kafka
connector.version=2.4.1
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=id
schema.1.data-type=BIGINT
schema.1.name=timestamp
schema.2.data-type=DOUBLE
schema.2.name=temp
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:300)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:178)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:139)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:93)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:41)
... 44 more
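解决方式：
从上面的报错可以看到，classpath 中的 KafkaTableSourceSinkFactory 要求 connector.version 为 'universal'，而代码中传入的是 '2.4.1'，因此找不到匹配的 table factory。
将 source 和 sink 两处的 .version("2.4.1") 都调整为 .version("universal")，问题即可解决。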



