
Flink

Flink Sink
  • Flink-clickhouse JDBC sink:

1. If you have not worked with ClickHouse before, first look at how to connect to it from Java.

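import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;

@Test
public void clickhouseTest() {
    // Use the JDBC connection method and driver recommended on the official ClickHouse site
    String url = "jdbc:clickhouse://xxxxxx:8123/secisland";
    ClickHouseProperties properties = new ClickHouseProperties();
    properties.setUser("default");
    properties.setPassword("xxxxx");

    ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
    String sql = "INSERT INTO secislog (id,content) VALUES (3,'demos')";
    // Extra per-query parameters sent along with the statement
    Map<ClickHouseQueryParam, String> additionalDBParams = new HashMap<>();
    additionalDBParams.put(ClickHouseQueryParam.SESSION_ID, "new-session-id");
    // try-with-resources closes the statement and the connection even if the query fails
    try (ClickHouseConnection conn = dataSource.getConnection();
         ClickHouseStatement stmt = conn.createStatement()) {
        ResultSet rs = stmt.executeQuery(sql, additionalDBParams);
        // An INSERT may not produce a result set, so guard against null
        if (rs != null) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
}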

pom.xml

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1-patch</version>
</dependency>

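2. Next, write the ClickHouse sink by following the Flink JDBC Sink example code.

The code below can be used as-is, but it is worth thinking through how it works.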

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class FlinkToClickhouseSink<T> {

    static String url = "jdbc:clickhouse://xxxx";
    static String driverName = "ru.yandex.clickhouse.ClickHouseDriver";
    static String userName = "default";
    static String passWord = "xxxxxx";

    FlinkToClickhouseSink() {
    }

    // Builds a one-element stream from t and writes it to ClickHouse through the JDBC sink
    public void operationClickhouse(StreamExecutionEnvironment env, String sql, T t, JdbcStatementBuilder<T> statementBuilder) {
        env.fromElements(t).addSink(JdbcSink.sink(
                sql, statementBuilder,
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(driverName)
                        .withUsername(userName)
                        .withPassword(passWord)
                        .build()
        ));
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Tuple2<Integer, String> bjTp = Tuple2.of(18, "北京adsd");
        // Bind the tuple fields, in order, to the two placeholders of the INSERT statement
        JdbcStatementBuilder<Tuple2<Integer, String>> statementBuilder = (ps, t2) -> {
            ps.setInt(1, t2.f0);
            ps.setString(2, t2.f1);
        };
        String sql = "INSERT INTO secislog (id,content) VALUES (?,?)";
        FlinkToClickhouseSink<Tuple2<Integer, String>> fc = new FlinkToClickhouseSink<>();
        fc.operationClickhouse(env, sql, bjTp, statementBuilder);
        env.execute("demo");
    }

}
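
The statement builder binds values by position, so the number and order of `?` placeholders in the INSERT statement have to match the `setInt`/`setString` calls. As a minimal sketch of one way to keep the two in sync, the helper below derives the parameterized statement from a table name and a column list. `InsertSqlBuilder` and `buildInsertSql` are hypothetical names made up for illustration. They are not part of Flink or clickhouse-jdbc, and the helper does no escaping of identifiers.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink or clickhouse-jdbc.
public class InsertSqlBuilder {

    // Builds "INSERT INTO <table> (c1,c2,...) VALUES (?,?,...)" with one placeholder per column.
    // Assumes table and column names are trusted identifiers; nothing is quoted or escaped.
    public static String buildInsertSql(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columnList = String.join(",", columns);
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // Same table and columns as the sink example above
        System.out.println(buildInsertSql("secislog", Arrays.asList("id", "content")));
    }
}
```

The resulting string could be passed as the `sql` argument of `operationClickhouse`, with the statement builder setting parameters 1..n in the same column order.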