以官网的例子为起点,选用 Kafka 作为 source 和 sink,了解一下批流统一的使用方式。(cuiyaonan2000@163.com)
参考资料:
- Kafka | Apache Flink —— 表连接器
- JSON | Apache Flink —— 表格式
如下是官网创建 Kafka 表的 SQL,后面的内容都是围绕该 SQL 展开的。据说会了 Kafka,我们就可以掌握 Hive、Hadoop,所以这个“药引子”的作用非常大。(cuiyaonan2000@163.com)
package cui.yao.nan.flink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.nio.file.FileSystem;
/**
 * Minimal example bridging the Flink Table API and the DataStream API.
 *
 * <p>Registers a Kafka-backed table named {@code jjjk} through SQL DDL, selects its
 * payload and metadata columns, converts the resulting {@link Table} into a
 * {@link DataStream} of {@link Row}, and prints every row.
 */
public class Test2 {

    /**
     * DDL that registers the table {@code jjjk} on top of the Kafka topic
     * {@code topic-name-cui}.
     *
     * <p>{@code myoffset} and {@code mypartition} are declared as VIRTUAL metadata
     * columns mapped to the connector's {@code offset} and {@code partition} metadata
     * keys; {@code id}, {@code name} and {@code age} come from the JSON record. The
     * JSON options are set so that missing fields do not fail the job and parse
     * errors are ignored.
     */
    private static final String CREATE_KAFKA_TABLE_DDL =
            "CREATE TABLE jjjk (" +
            " myoffset BIGINT metaDATA FROM 'offset' VIRTUAL," +
            " mypartition BIGINT metaDATA FROM 'partition' VIRTUAL," +
            " id BIGINT," +
            " name STRING," +
            " age BIGINT " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'topic-name-cui'," +
            " 'properties.bootstrap.servers' = '172.17.15.2:9092'," +
            " 'properties.group.id' = 'testGroup'," +
            " 'scan.startup.mode' = 'earliest-offset'," +
            " 'format' = 'json'," +
            " 'json.fail-on-missing-field' = 'false'," +
            " 'json.ignore-parse-errors' = 'true'" +
            ")";

    /**
     * Builds and runs the job: register the Kafka table, query it, print the rows.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the table cannot be registered or the job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create the table environment on top of the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the Kafka source table; the returned TableResult is not needed here.
        tableEnv.executeSql(CREATE_KAFKA_TABLE_DDL);

        Table table = tableEnv.sqlQuery("select id,name,age,mypartition,myoffset From jjjk");

        // Convert the query result back into a stream; typed as Row instead of a raw DataStream.
        DataStream<Row> resultStream = tableEnv.toDataStream(table);

        // Add a printing sink and execute in the DataStream API.
        resultStream.print();
        env.execute();
    }
}



