栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink消费kafka数据实时写入Clickhouse(java版本)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink消费kafka数据实时写入Clickhouse(java版本)

集群规划(辛苦我的小本本了,拖8台centos):

flink采用on yarn模式,机器资源有限,ck只装了单节点

域名IP安装的软件运行的进程
zcx1192.168.220.128

hadoop

flink

kafka

NameNode

DFSZKFailoverController(zkfc)

JobManager

TaskManager

zcx2192.168.220.129

hadoop

flink

NameNode

DFSZKFailoverController(zkfc)

JobManager

TaskManager

zcx3192.168.220.130

hadoop

flink

ResourceManager

TaskManager

zcx4192.168.220.131

hadoop

zookeeper

kafka

DataNode

NodeManager

JournalNode

QuorumPeerMain

zcx5192.168.220.132

hadoop

zookeeper

kafka

DataNode

NodeManager

JournalNode

QuorumPeerMain

zcx6192.168.220.133    

hadoop

zookeeper

kafka

DataNode

NodeManager

JournalNode

QuorumPeerMain

zcx7192.168.220.134hadoopResourceManager
ck3192.168.220.142clickhoueclickhouse-server

flink主要代码

public class Kafka_To_Flink_To_Clickhouse {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        Properties properties=new Properties();
        properties.setProperty("bootstrap.servers","zcx4:9092");
        FlinkKafkaConsumer stringFlinkKafkaConsumer = new FlinkKafkaConsumer("zcx1",new SimpleStringSchema(),properties);
        stringFlinkKafkaConsumer.setStartFromEarliest();
        DataStreamSource topic = env.addSource(stringFlinkKafkaConsumer);
        SingleOutputStreamOperator map = topic.map(new MapFunction() {
            @Override
            public String map(String s) throws Exception {
                return s;
            }
        });
        tenv.registerDataStream("zcx1",map,"name");
        Table result = tenv.sqlQuery("select name from zcx1");
        DataStream rowDataStream = tenv.toDataStream(result,Zcx1.class);
        rowDataStream.print();
        rowDataStream.addSink(new MyClickhouseUtil());
        env.execute();
    }
}
public class MyClickhouseUtil extends RichSinkFunction {
    Connection connection;
    PreparedStatement preparedStatement;

    @Override
    public void invoke(Zcx1 value, Context context) throws Exception {
        preparedStatement.setString(1,value.name);
//        preparedStatement.setInt(2,value.num);
        preparedStatement.execute();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        connection = DriverManager.getConnection("jdbc:clickhouse://192.168.220.142:8123/default","default","GsAdBi/O");
        preparedStatement = connection.prepareStatement("insert into zcx1 values(?)");
    }

    @Override
    public void close() throws Exception {
        if(null!=connection){
            connection.close();
        }
        if(null!=preparedStatement){
            preparedStatement.close();
        }
    }
}

测试

kafka producer生产数据

实时写入clickhouse

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/719284.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号