在实时流数据处理中,我们通常可以采用Flink+Clickhouse的方式做实时的OLAP处理。关于两者的优点就不再赘述,本文采用一个案例来简要介绍一下整体的流程。
整体流程:- 向kafka特定主题下导入json格式数据
- 编写Flink Kafka Comsumer消费主题下的数据
- 利用Flink算子对数据进行处理(ETL)
- 将处理后的数据下沉到Clickhouse数据库中
在创建好主题后,利用kafka-console-producer.sh命令将预先的JSON格式数据发送到创建好的主题下,比如JSON格式数据:
{"appKey":"mailandroid","deviceId":"1807516f-1cb3-4a6e-8ac1-454d401a5716","version":"1.0","uid":"","dashiUid":"1388f4059f87578418ba2906c5425af5","ua":"","carrier":"中国移动", ...}
{"appKey":"mailios","deviceId":"0B4D45A9-3212-4C38-B58E-1A96792AF297","version":"1.0","uid":"","dashiUid":"c53f631b1d33273f28953893b7383e0a","ua":"Mozilla/5.0 (iPhone; CPU iPhone OS 15_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148","carrier":"中国移动", ...}
...
写入完成后,可以利用kafka-console-consumer.sh来进行查看对应主题下的数据是否有被写入。
编写Flink Kafka Comsumer消费主题下的数据在Idea中创建项目来编写代码连接Kafka进行消费。
package com.demo.flink;
import com.alibaba.fastjson.JSON;
import com.demo.flink.pojo.Mail;
import com.demo.flink.utils.MyClickHouseUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.HashMap;
import java.util.Properties;
public class FlinkSinkClickhouse {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// source
String topic = "test_process";
Properties props = new Properties();
// 设置连接kafka集群的参数
props.setProperty("bootstrap.servers", "10.224.192.133:9092, 10.224.192.134:9092");
// 定义Flink Kafka Consumer
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(topic, new SimpleStringSchema(), props);
consumer.setStartFromGroupOffsets();
consumer.setStartFromEarliest(); // 设置每次都从头消费
// 添加source数据流
DataStreamSource source = env.addSource(consumer);
System.out.println(source);
SingleOutputStreamOperator dataStream = source.map(new MapFunction() {
@Override
public Mail map(String value) throws Exception {
HashMap hashMap = JSON.parseObject(value, HashMap.class);
// System.out.println(hashMap);
String appKey = hashMap.get("appKey");
String appVersion = hashMap.get("appVersion");
String deviceId = hashMap.get("deviceId");
String phone_no = hashMap.get("phone_no");
Mail mail = new Mail(appKey, appVersion, deviceId, phone_no);
// System.out.println(mail);
return mail;
}
});
dataStream.print();
// sink
String sql = "INSERT INTO test.ods_countlyV2 (appKey, appVersion, deviceId, phone_no) " +
"VALUES (?, ?, ?, ?)";
MyClickHouseUtil ckSink = new MyClickHouseUtil(sql);
dataStream.addSink(ckSink);
env.execute();
上面利用了Java Flink连接Kafka的方式进行连接,设置了一些初始化和连接必要的参数。最后addSource添加数据流
利用Flink算子对数据进行处理(ETL)一个简单的ETL过程,使用了Flink的Map算子,在Map算子中编写自己的数据处理逻辑。这里的Mail类是我自己定义的Pojo类,用来封装处理后需要保存的json结果。由于Kafka读取出来的数据是String格式的value,因此利用了fastjson的JSON.parseObject(value, HashMap.class)来转换为HashMap的格式,便于取出我需要的键值对。最后将所需要的键值对封装为MailPojo类进行返回。以此来对数据流做一个简单的ETL过程。
将处理后的数据下沉到Clickhouse数据库中处理好的数据最后需要下沉到Clickhouse中进行保存和使用。下面给出sink clickhouse的代码
package com.demo.flink.utils; import com.demo.flink.pojo.Mail; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import ru.yandex.clickhouse.ClickHouseConnection; import ru.yandex.clickhouse.ClickHouseDataSource; import ru.yandex.clickhouse.ClickHouseStatement; import ru.yandex.clickhouse.settings.ClickHouseProperties; import ru.yandex.clickhouse.settings.ClickHouseQueryParam; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.HashMap; import java.util.Map; public class MyClickHouseUtil extends RichSinkFunction{ private ClickHouseConnection conn = null; String sql; public MyClickHouseUtil(String sql) { this.sql = sql; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); return ; } @Override public void close() throws Exception { super.close(); if (conn != null) { conn.close(); } } @Override public void invoke(Mail mail, Context context) throws Exception { String url = "jdbc:clickhouse://10.224.192.133:8123/test"; ClickHouseProperties properties = new ClickHouseProperties(); properties.setUser("default"); properties.setPassword("ch20482048"); properties.setSessionId("default-session-id"); ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties); Map additionalDBParams = new HashMap<>(); additionalDBParams.put(ClickHouseQueryParam.SESSION_ID, "new-session-id"); try { conn = dataSource.getConnection(); PreparedStatement preparedStatement = conn.prepareStatement(sql); preparedStatement.setString(1,mail.getAppKey()); preparedStatement.setString(2, mail.getAppVersion()); preparedStatement.setString(3, mail.getDeviceId()); preparedStatement.setString(4, mail.getPhone_no()); preparedStatement.execute(); } catch (Exception e){ e.printStackTrace(); } } }
MyClickHouseUtil类继承了RichSinkFunction类。由于前面的Flink算子处理后的数据流类型是Mail类型的,因此RichSinkFunction类的泛型为Mail类型。
接下来就是重写open、close和invode方法。其中最关键的是invoke方法,每sink一条数据,都会调用一次invoke方法。因此invoke方法的第一个参数类型为Mail,也就是Flink算子处理后,需要sink的数据流类型。因此,我们主要的sink逻辑都可以写在这里。
首先定义了一个ClickHouseProperties的对象,用来保存连接Clickhouse所需的参数,比如用户名,密码。接下来,用该properties和url来构造一个连接Clickhouse的DataSource,并从该连接池获取连接conn。最后利用JDBC的prepareStatement,来对写好的SQL中的占位符进行赋值。调用execute方法执行SQL,将处理后的数据流插入到Clickhouse中。
运行后查看Clickhouse中的数据可以发现数据已经被写入到Clickhouse中对应的表中。
最后给出pom.xml和Mail类的源码:
pom.xml:
4.0.0 org.example kafka_flink 1.0-SNAPSHOT 8 8 org.apache.flink flink-java 1.11.1 org.apache.flink flink-clients_2.11 1.11.1 org.apache.flink flink-connector-kafka_2.11 1.11.1 org.apache.flink flink-table-api-java-bridge_2.11 1.11.1 org.apache.flink flink-streaming-java_2.10 1.3.2 com.alibaba fastjson 1.2.59 org.apache.kafka kafka_2.11 1.0.2 org.apache.kafka kafka-clients 1.0.2 ru.yandex.clickhouse clickhouse-jdbc 0.1.54
其中最重要的是引入ru.yandex.clickhouse,用来sink clickhouse.
Mail类:
package com.demo.flink.pojo;
public class Mail {
private String appKey;
private String appVersion;
private String deviceId;
private String phone_no;
public Mail(String appKey, String appVersion, String deviceId, String phone_no) {
this.appKey = appKey;
this.appVersion = appVersion;
this.deviceId = deviceId;
this.phone_no = phone_no;
}
public String getAppKey() {
return appKey;
}
public void setAppKey(String appKey) {
this.appKey = appKey;
}
public String getAppVersion() {
return appVersion;
}
public void setAppVersion(String appVersion) {
this.appVersion = appVersion;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getPhone_no() {
return phone_no;
}
public void setPhone_no(String phone_no) {
this.phone_no = phone_no;
}
@Override
public String toString() {
return "Mail{" +
"appKey='" + appKey + ''' +
", appVersion='" + appVersion + ''' +
", deviceId='" + deviceId + ''' +
", phone_no='" + phone_no + ''' +
'}';
}
public Mail of(String appKey, String appVersion, String deviceId, String phone_no)
{
return new Mail(appKey, appVersion, deviceId, phone_no);
}
}
对于数据处理过程中需要提取的一些数据,我们可以做成一个Pojo类来进行存储,这样更方便明了,代码层次也更简洁。



