Cluster plan (hard work for my little laptop: it hosts 8 CentOS VMs):

Flink runs in on-YARN mode. Since machine resources are limited, ClickHouse is installed on a single node only.
| Hostname | IP | Installed software | Running processes |
| --- | --- | --- | --- |
| zcx1 | 192.168.220.128 | hadoop flink kafka | NameNode DFSZKFailoverController(zkfc) JobManager TaskManager |
| zcx2 | 192.168.220.129 | hadoop flink | NameNode DFSZKFailoverController(zkfc) JobManager TaskManager |
| zcx3 | 192.168.220.130 | hadoop flink | ResourceManager TaskManager |
| zcx4 | 192.168.220.131 | hadoop zookeeper kafka | DataNode NodeManager JournalNode QuorumPeerMain |
| zcx5 | 192.168.220.132 | hadoop zookeeper kafka | DataNode NodeManager JournalNode QuorumPeerMain |
| zcx6 | 192.168.220.133 | hadoop zookeeper kafka | DataNode NodeManager JournalNode QuorumPeerMain |
| zcx7 | 192.168.220.134 | hadoop | ResourceManager |
| ck3 | 192.168.220.142 | clickhouse | clickhouse-server |
Main Flink code:
```java
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Kafka_To_Flink_To_Clickhouse {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "zcx4:9092");

        // Consume the "zcx1" topic as plain strings, starting from the earliest offset
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("zcx1", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();

        DataStreamSource<String> topic = env.addSource(consumer);
        // Identity map, kept as a hook for future parsing/cleaning logic
        SingleOutputStreamOperator<String> map = topic.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return s;
            }
        });

        // Register the stream as a table with a single "name" column and query it with SQL
        tenv.registerDataStream("zcx1", map, "name");
        Table result = tenv.sqlQuery("select name from zcx1");
        DataStream<Zcx1> rowDataStream = tenv.toDataStream(result, Zcx1.class);
        rowDataStream.print();
        rowDataStream.addSink(new MyClickhouseUtil());

        env.execute();
    }
}
```
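The `Zcx1` POJO referenced above is not shown in the original post. A minimal sketch that matches the single `name` column (the field name follows the SQL query; the constructors and `toString` are assumptions):

```java
// Minimal POJO matching the single "name" column. For Flink's POJO type
// extraction the class needs a public no-arg constructor and public
// fields (or getters/setters).
public class Zcx1 {
    public String name;

    public Zcx1() {
    }

    public Zcx1(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Zcx1{name='" + name + "'}";
    }
}
```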
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyClickhouseUtil extends RichSinkFunction<Zcx1> {
    Connection connection;
    PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Assumes the target table already exists in ClickHouse,
        // e.g. something like: create table zcx1 (name String) engine = ...
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        connection = DriverManager.getConnection(
                "jdbc:clickhouse://192.168.220.142:8123/default", "default", "GsAdBi/O");
        preparedStatement = connection.prepareStatement("insert into zcx1 values(?)");
    }

    @Override
    public void invoke(Zcx1 value, Context context) throws Exception {
        preparedStatement.setString(1, value.name);
        // preparedStatement.setInt(2, value.num);
        preparedStatement.execute();
    }

    @Override
    public void close() throws Exception {
        // Close the statement before the connection that owns it
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```
Testing:

Produce data with a Kafka producer.

The rows are written to ClickHouse in real time.
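For reference, the test can be driven from the command line with Kafka's console producer and verified with `clickhouse-client` (broker, topic, host, and credentials follow the setup above; the exact script names depend on your Kafka version and PATH):

```
# Produce a few test rows into the "zcx1" topic (type one value per line)
kafka-console-producer.sh --broker-list zcx4:9092 --topic zcx1

# Verify the rows arrived in ClickHouse
clickhouse-client --host 192.168.220.142 --password 'GsAdBi/O' \
    --query "select count() from default.zcx1"
```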



