栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink的批流统一:Ⅴ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink的批流统一:Ⅴ

序言

以官网的例子为起点,选用Kafka为source和sink ,了解下批流统一的使用cuiyaonan2000@163.com

参考资料:

    Kafka | Apache Flink       ----表连接器JSON | Apache Flink       ----表格式器

DEMO

如下是官网创建Kafka的SQL,后面都是针对该SQL开始的.据说会了Kafka,我们就可以掌握Hive,Haddop所以一个药引子的作用非常大cuiyaonan2000@163.com

package cui.yao.nan.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.nio.file.FileSystem;


public class Test2 {

    public static void main(String[] args) throws Exception {


        //创建流式环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        //创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        TableResult tableResult = tableEnv.executeSql("CREATE TABLE jjjk (" +
                "  myoffset  BIGINT metaDATA FROM 'offset' VIRTUAL," +
                "  mypartition  BIGINT metaDATA FROM 'partition' VIRTUAL," +
                "  id BIGINT," +
                "  name STRING," +
                "  age BIGINT " +
                ") WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'topic-name-cui'," +
                " 'properties.bootstrap.servers' = '172.17.15.2:9092'," +
                " 'properties.group.id' = 'testGroup'," +
                " 'scan.startup.mode' = 'earliest-offset'," +
                " 'format' = 'json'," +
                " 'json.fail-on-missing-field' = 'false'," +
                " 'json.ignore-parse-errors' = 'true'" +
                ")");

        Table table = tableEnv.sqlQuery("select id,name,age,mypartition,myoffset From jjjk");

        // 将该视图结果在转成一个流
        DataStream resultStream = tableEnv.toDataStream(table);

        // add a printing sink and execute in DataStream API
        resultStream.print();

        env.execute();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/785504.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号