ClickHouse Series 2 - Pushing Data from Flink to ClickHouse

Preface

None of Flink's built-in sinks can write to ClickHouse directly. This article uses the flink-clickhouse-sink plugin to access ClickHouse.

I. Connecting via a DataStream connector

1. Plugin repository: https://github.com/ivi-ru/flink-clickhouse-sink. After downloading the plugin, write a main program module to act as the test driver.


2. If you do not download the source code, add the Maven dependency instead:

		
<dependency>
    <groupId>ru.ivi.opensource</groupId>
    <artifactId>flink-clickhouse-sink</artifactId>
    <version>1.2.0</version>
</dependency>
II. Write the entity class
public class J_User {
    public int id;
    public String name;
    public int age;
 
    public J_User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
 
    public static J_User of(int id, String name, int age) {
        return new J_User(id, name, age);
    }
 
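    // The sink consumes String records, so each user is serialized into a
    // ClickHouse VALUES tuple such as (1, 'Tom', 20) by concatenating its fields.
    public static String convertToCsv(J_User user) {
        StringBuilder builder = new StringBuilder();
        builder.append("(");
 
        // add user.id
        builder.append(user.id);
        builder.append(", ");
 
        // add user.name, escaping backslashes and single quotes so the
        // string literal stays valid SQL
        builder.append("'");
        builder.append(String.valueOf(user.name).replace("\\", "\\\\").replace("'", "\\'"));
        builder.append("', ");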
 
        // add user.age
        builder.append(user.age);
 
        builder.append(" )");
        return builder.toString();
    }
}
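Names containing a single quote or a backslash must be escaped, otherwise the generated tuple is no longer valid SQL. The following minimal sketch isolates the tuple formatting so it can be tried without Flink. The class name ValuesTuple is hypothetical, and the escaping assumes ClickHouse's backslash escape rules for string literals.

```java
import java.util.Objects;

// Hypothetical helper, not part of flink-clickhouse-sink: formats one row
// as a ClickHouse VALUES tuple and escapes the string field.
public final class ValuesTuple {

    private ValuesTuple() {}

    // Escape backslashes first, then single quotes, so the value can sit
    // safely between '...' in ClickHouse SQL.
    public static String quote(String s) {
        String v = Objects.requireNonNull(s, "name must not be null");
        return "'" + v.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    // Builds "(id, 'name', age)" for the columns used in this article.
    public static String toValuesTuple(int id, String name, int age) {
        return "(" + id + ", " + quote(name) + ", " + age + ")";
    }

    public static void main(String[] args) {
        System.out.println(toValuesTuple(1, "Tom", 20));     // (1, 'Tom', 20)
        System.out.println(toValuesTuple(2, "O'Brien", 31)); // (2, 'O\'Brien', 31)
    }
}
```

Unlike String.valueOf, the sketch rejects a null name outright rather than writing the literal text "null" into the table.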

III. Write the Flink test main program
import com.lei.domain.J_User;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
 
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
 

public class J05_ClickHouseSinkTestByLib {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        Map<String, String> globalParameters = new HashMap<>();
 
        // ClickHouse cluster properties
        globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://localhost:8123/");
        //globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, ...);
        //globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, ...);
 
        // sink common
        globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
        globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");
        globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2");
        globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2");
        globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2");
        globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
 
        // set global parameters
        ParameterTool parameters = ParameterTool.fromMap(globalParameters);
        env.getConfig().setGlobalJobParameters(parameters);
 
        env.setParallelism(1);
 
        // source
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
 
        // Transform: turn each "id,name,age" line into a VALUES tuple string
        SingleOutputStreamOperator<String> dataStream = inputStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String data) throws Exception {
                String[] split = data.split(",");
                J_User user = J_User.of(Integer.parseInt(split[0]),
                        split[1],
                        Integer.parseInt(split[2]));
                return J_User.convertToCsv(user);
            }
        });
 
        // create props for sink
        Properties props = new Properties();
        props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "default.user_table");
        props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
        ClickHouseSink sink = new ClickHouseSink(props);
        dataStream.addSink(sink);
        dataStream.print();
 
        env.execute("clickhouse sink test");
    }
}
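The TARGET_TABLE_NAME and MAX_BUFFER_SIZE properties suggest that the sink collects tuple strings for a target table and writes them in batches rather than one row per request. The sketch below only illustrates that batching idea in plain Java; the class InsertBatcher, its flush rule and the exact statement text are assumptions, not flink-clickhouse-sink's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of buffered batch inserts; not the plugin's code.
public final class InsertBatcher {
    private final String targetTable;
    private final int maxBufferSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>(); // statements "sent" so far

    public InsertBatcher(String targetTable, int maxBufferSize) {
        if (maxBufferSize <= 0) {
            throw new IllegalArgumentException("maxBufferSize must be > 0");
        }
        this.targetTable = targetTable;
        this.maxBufferSize = maxBufferSize;
    }

    // Adds one "(...)" tuple and flushes once the buffer is full.
    public void add(String valuesTuple) {
        buffer.add(valuesTuple);
        if (buffer.size() >= maxBufferSize) {
            flush();
        }
    }

    // Joins all buffered tuples into a single INSERT statement, then clears the buffer.
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushed.add("INSERT INTO " + targetTable + " VALUES " + String.join(", ", buffer));
        buffer.clear();
    }

    public List<String> flushedStatements() {
        return List.copyOf(flushed);
    }

    public static void main(String[] args) {
        InsertBatcher batcher = new InsertBatcher("default.user_table", 2);
        batcher.add("(1, 'Tom', 20)");
        batcher.add("(2, 'Amy', 30)"); // buffer full: first INSERT is built
        batcher.add("(3, 'Bob', 25)");
        batcher.flush();               // e.g. on a timer or at shutdown (assumption)
        batcher.flushedStatements().forEach(System.out::println);
    }
}
```

Batching matters for ClickHouse because many tiny inserts are far more expensive than a few large ones.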
IV. Verify the connection

Download netcat (nc) from its official website.

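Start listening on socket port 7777 first (for example nc -l -p 7777 with the Windows/traditional netcat, or nc -lk 7777 with the BSD variant), because Flink's socket source connects to that port as a client. Then start the Flink test program.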
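If netcat is not available, a few lines of Java can play its role. This is a minimal sketch: the class name LineServer and the sample lines are hypothetical. It waits for one client (the Flink socket source), sends each line terminated by '\n', and then closes the connection, so unlike nc -lk it does not stay open for interactive input.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for "nc -l -p 7777": serves a fixed list of lines
// to the first client that connects.
public final class LineServer {

    private LineServer() {}

    public static void serve(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line + "\n"); // socketTextStream(host, port) splits records on '\n'
                out.flush();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(7777)) {
            System.out.println("Waiting for the Flink socket source on port 7777...");
            serve(server, List.of("1,Tom,20", "2,Amy,30"));
        }
    }
}
```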

Enter data, one record per line in the id,name,age format expected by the map function, e.g. 1,Tom,20.
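The map function in the main program calls Integer.parseInt on split[0] and split[2], so a mistyped line (too few fields, or a non-numeric id or age) throws an exception and fails the job. A minimal defensive parser is sketched below; the class UserLineParser is hypothetical, and calling it from a flatMap that drops empty results is a suggested design, not what the article does.

```java
import java.util.Optional;

// Hypothetical defensive parser for socket input lines of the form "id,name,age".
public final class UserLineParser {

    // Minimal value holder mirroring J_User's fields.
    public static final class User {
        public final int id;
        public final String name;
        public final int age;

        User(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    private UserLineParser() {}

    // Returns Optional.empty() instead of throwing when the line is malformed.
    public static Optional<User> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.split(",", -1); // -1 keeps trailing empty fields
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            int id = Integer.parseInt(parts[0].trim());
            String name = parts[1].trim();
            int age = Integer.parseInt(parts[2].trim());
            return name.isEmpty() ? Optional.empty() : Optional.of(new User(id, name, age));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (String line : new String[] {"1,Tom,20", " 2 , Amy , 30 ", "3,Bob", "x,Eve,40"}) {
            String result = parse(line).map(u -> u.id + "/" + u.name + "/" + u.age).orElse("skipped");
            System.out.println(line + " -> " + result);
        }
    }
}
```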

Query the ClickHouse database (for example SELECT * FROM default.user_table) to verify that the data was written to ClickHouse.
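Besides clickhouse-client, the check can also go through ClickHouse's HTTP interface on port 8123, the same endpoint configured as CLICKHOUSE_HOSTS in the main program, which accepts the SQL text as the body of a POST request. A minimal sketch using the JDK 11+ HttpClient; the class name VerifyInsert is hypothetical and the query assumes the default.user_table columns used in this article.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical verification client for ClickHouse's HTTP interface.
public final class VerifyInsert {

    private VerifyInsert() {}

    // Builds a POST request whose body is the SQL query text.
    public static HttpRequest queryRequest(String baseUrl, String sql) {
        return HttpRequest.newBuilder(URI.create(baseUrl))
                .POST(HttpRequest.BodyPublishers.ofString(sql))
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = queryRequest("http://localhost:8123/",
                "SELECT id, name, age FROM default.user_table ORDER BY id FORMAT TSV");
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode());
        System.out.println(response.body()); // one tab-separated row per user
    }
}
```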

V. Postscript

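In production, connecting to ClickHouse requires an account and password. You can create them by configuring users.xml in ClickHouse, as described in the previous part of this series, and then pass them to the sink through the CLICKHOUSE_USER and CLICKHOUSE_PASSWORD parameters that are commented out in the main program.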
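For ad-hoc queries over the HTTP interface, ClickHouse also accepts HTTP basic authentication. The sketch below builds such a request; the class name AuthenticatedQuery and the user name and password in the usage note are placeholders, not values from this article.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: sends credentials defined in users.xml via HTTP basic auth.
public final class AuthenticatedQuery {

    private AuthenticatedQuery() {}

    // "Basic " + base64("user:password"), as defined by HTTP basic authentication.
    public static String basicAuth(String user, String password) {
        String token = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(token.getBytes(StandardCharsets.UTF_8));
    }

    // POST request carrying the SQL text in the body and the credentials in a header.
    public static HttpRequest queryRequest(String baseUrl, String user, String password, String sql) {
        return HttpRequest.newBuilder(URI.create(baseUrl))
                .header("Authorization", basicAuth(user, password))
                .POST(HttpRequest.BodyPublishers.ofString(sql))
                .build();
    }
}
```

For example, queryRequest("http://localhost:8123/", "flink_writer", "secret", "SELECT count() FROM default.user_table") can be sent with HttpClient.send; over plain HTTP the password is only base64-encoded, so use HTTPS outside a trusted network.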

Please credit the source when reposting: this article was reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/423105.html