- Flink-clickhouse JDBC sink:
1.若是没有接触过clickhouse的同学可以先看下java连接clickhouse的方法。
@Test
public void clickhouseTest(){
//使用clickhouse官网推荐的JDBC连接方式,使用了官网推荐的连接驱动
String url = "jdbc:clickhouse://xxxxxx:8123/secisland";
ClickHouseProperties properties = new ClickHouseProperties();
properties.setUser("default");
properties.setPassword("xxxxx");
ClickHouseDataSource dataSource = new ClickHouseDataSource(url,properties);
String sql = "INSERT INTO secislog (id,content) VALUES (3,'demos')";
Map additionalDBParams = new HashMap<>();
additionalDBParams.put(ClickHouseQueryParam.SESSION_ID, "new-session-id");
ClickHouseConnection conn =null;
try {
conn = dataSource.getConnection();
ClickHouseStatement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql, additionalDBParams);
if (rs!=null){
while (rs.next()){
System.out.println(rs);
}
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
pom.xml
ru.yandex.clickhouse clickhouse-jdbc 0.3.1-patch
2.接下来可以参考Flink JDBC Sink的代码生成关于Clickhouse的代码
多思考,虽然可以直接使用。
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class FlinkToClickhouseSink{ static String url ="jdbc:clickhouse://xxxx"; static String driverName="ru.yandex.clickhouse.ClickHouseDriver"; static String userName="default"; static String passWord="xxxxxx"; FlinkToClickhouseSink(){ } public void operationClickhouse(StreamExecutionEnvironment env , String sql ,T t, JdbcStatementBuilder statementBuilder){ env.fromElements(t).addSink(JdbcSink.sink( sql,statementBuilder, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(url) .withDriverName(driverName) .withUsername(userName) .withPassword(passWord) .build() )); } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Tuple2 bjTp = Tuple2.of(18, "北京adsd"); JdbcStatementBuilder > statementBuilder = (ps,t2 ) -> { ps.setInt(1,t2.f0); ps.setString(2,t2.f1); }; String sql = "INSERT INTO secislog (id,content) VALUES (?,?)"; FlinkToClickhouseSink fc =new FlinkToClickhouseSink >(); fc.operationClickhouse(env,sql,bjTp,statementBuilder); env.execute("demo"); } }



