import com.alibaba.ververica.connectors.datahub.sink.DatahubSinkFunction;
import com.aliyun.datahub.client.model.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DemoTest {
public static void main(String[] args) throws Exception {
//创建Flink流处理执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
environment.setParallelism(1);
//调用Flink自定义Source
DataStreamSource source = environment.addSource(new DataSource());
//打印数据
source.print();
//DataHub连接配置。
DatahubSinkFunction sinkFunction = new DatahubSinkFunction<>(
"***************************************",//EndPoint
"*************",//DataHub项目名称
"*************",//主题topic的名称
"**********",//accessID
"************"//accessKey
);
//序列化信息
DataStream operator = source.map(new MapFunction() {
@Override
public RecordEntry map(Sick sick) throws Exception {
RecordEntry entry = getRecordEntry(sick);
return entry;
}
});
operator.addSink(sinkFunction);
//启动程序
environment.execute();
}
//序列化方法
public static RecordEntry getRecordEntry(Sick sick) {
//注册Schema信息
RecordSchema recordSchema = new RecordSchema();
recordSchema.addField(new Field("name", FieldType.STRING));
recordSchema.addField(new Field("sex", FieldType.STRING));
recordSchema.addField(new Field("age", FieldType.BIGINT));
recordSchema.addField(new Field("area", FieldType.BIGINT));
recordSchema.addField(new Field("disease_status", FieldType.BIGINT));
recordSchema.addField(new Field("date", FieldType.STRING));
RecordEntry recordEntry = new RecordEntry();
TupleRecordData recordData = new TupleRecordData(recordSchema);
//将数据流里面的对象根据下标赋值
recordData.setField(0, sick.getName());
recordData.setField(1, sick.getSex());
recordData.setField(2, sick.getAge());
recordData.setField(3, sick.getArea());
recordData.setField(4, sick.getDisease_status());
recordData.setField(5, sick.getDate());
recordEntry.setRecordData(recordData);
return recordEntry;
}
}