(1) Saving dimension data to Phoenix (DimSinkFunction)

package com.yyds.app.function;

import com.alibaba.fastjson.JSONObject;
import com.yyds.common.FlinkConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Set;

public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    private Connection connection;

    // Initialize the Phoenix connection once per parallel instance
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(FlinkConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(FlinkConfig.PHOENIX_SERVER);
    }

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // value = {"sinkTable":"", "database":"", "before":{}, "after":{}, "type":"insert", "tableName":""}
        // Phoenix SQL statement:
        // upsert into db.tn (id,name) values('...','...')
        PreparedStatement preparedStatement = null;
        try {
            // 1. Build the upsert SQL
            String upsertSql = getUpsertSql(
                    value.getString("sinkTable"),
                    value.getJSONObject("after")
            );
            System.out.println("phoenix sql ===> " + upsertSql);

            // 2. Prepare the statement
            preparedStatement = connection.prepareStatement(upsertSql);

            // 3. Execute the upsert and commit (Phoenix does not auto-commit by default)
            preparedStatement.executeUpdate();
            connection.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // upsert into db.tn (id,name) values('...','...')
    private String getUpsertSql(String sinkTable, JSONObject after) {
        Set<String> keySet = after.keySet();
        Collection<Object> values = after.values();
        // keySet -> "id,name"; values -> "'...','...'"
        return "upsert into " + FlinkConfig.Hbase_SCHEMA + "." + sinkTable
                + " (" + StringUtils.join(keySet, ",")
                + ") values ('" + StringUtils.join(values, "','") + "')";
    }
}
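To make the SQL construction concrete, here is a minimal, self-contained sketch of what getUpsertSql produces for a sample "after" payload. The schema GMALL, the table DIM_BASE_PROVINCE, and the field values are made up for illustration; the real code reads the schema from FlinkConfig.Hbase_SCHEMA:

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;

public class UpsertSqlDemo {
    public static void main(String[] args) {
        // Hypothetical change-log payload; 'true' keeps field insertion order
        JSONObject after = new JSONObject(true);
        after.put("id", "13");
        after.put("name", "Beijing");

        String sql = "upsert into GMALL.DIM_BASE_PROVINCE ("
                + StringUtils.join(after.keySet(), ",") + ") values ('"
                + StringUtils.join(after.values(), "','") + "')";

        // Prints: upsert into GMALL.DIM_BASE_PROVINCE (id,name) values ('13','Beijing')
        System.out.println(sql);
    }
}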
(2) Saving business data to Kafka topics

package com.yyds.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class MyKafkaUtils {
private static String brokers = "centos01:9092,centos02:9092,centos03:9092";
private static String defaultTopic = "DWD_DEFAULT_TOPIC";
    public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // EXACTLY_ONCE relies on Kafka transactions; the producer's transaction
        // timeout must not exceed the broker's transaction.max.timeout.ms
        return new FlinkKafkaProducer<>(
                defaultTopic,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
    }
}
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        return new FlinkKafkaConsumer<>(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }
}
}
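For reference, here is a minimal sketch of how getKafkaConsumer would typically be wired into a job. The topic name "ods_base_db", the group id "dim_app_group", and the ConsumerDemo class are placeholders for illustration, not values taken from this project:

import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Topic and consumer group below are illustrative placeholders
        DataStreamSource<String> odsStream =
                env.addSource(MyKafkaUtils.getKafkaConsumer("ods_base_db", "dim_app_group"));

        odsStream.print();
        env.execute("consumer-demo");
    }
}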
(3) Main program code

// TODO 8. Sink the Kafka stream to Kafka topics and the HBase stream to Phoenix tables
hbaseDataStream.addSink(new DimSinkFunction());
kafkaDataStream.addSink(MyKafkaUtils.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(JSONObject element, @Nullable Long timestamp) {
        // Route each record to the topic named in its "sinkTable" field,
        // using the "after" payload as the message value
        return new ProducerRecord<>(
                element.getString("sinkTable"),
                element.getString("after").getBytes()
        );
    }
}));
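For context, hbaseDataStream and kafkaDataStream come out of the dynamic-split step covered in the post linked below. A rough sketch of that split, assuming an upstream DataStream<JSONObject> named jsonStream and a simplified prefix-based routing rule (both are assumptions; the linked post drives routing from a configuration table instead):

// Side-output tag for records destined for Phoenix/HBase (tag name is illustrative)
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase") {};

SingleOutputStreamOperator<JSONObject> kafkaDataStream =
        jsonStream.process(new ProcessFunction<JSONObject, JSONObject>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) {
                // Simplified routing: dimension tables to the side output (Phoenix),
                // everything else to the main output (Kafka)
                if (value.getString("sinkTable").startsWith("dim_")) {
                    ctx.output(hbaseTag, value);
                } else {
                    out.collect(value);
                }
            }
        });

DataStream<JSONObject> hbaseDataStream = kafkaDataStream.getSideOutput(hbaseTag);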
Other references:
Real-time data warehouse (3): dynamic splitting of business data from ODS to DWD:
https://blog.csdn.net/qq_44665283/article/details/123724897