通过自定义输入和输出,可以极大程度地适应业务需求。不管是输入数据,还是输出数据,都可以和文件,数据库,消息队列,redis,网络服务等进行交互,实现数据传输。
1. 添加依赖(pom.xml):属性 flink.version=1.10.1、scala.version=2.12,引入 org.apache.flink:flink-java:${flink.version} 与 org.apache.flink:flink-streaming-java_${scala.version}:${flink.version} 两个依赖。
2. 测试代码
package com.demo.customsource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.HashMap;
import java.util.Random;
/**
 * Demonstrates a custom Flink source and a custom sink.
 *
 * <p>The source emits simulated temperature readings for 10 sensors every
 * 2 seconds; the sink simply prints every record it receives. Replace the
 * println calls with database/Redis/HDFS/network writes as needed.
 */
public class CustomSourceAndSink {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Attach the custom source producing simulated sensor readings.
        DataStream<DataEntity> dataStream = env.addSource(new MyCustomSource());
        dataStream.print();
        // Attach the custom sink.
        dataStream.addSink(new MyCustomSink());
        env.execute();
    }

    /** Custom source: emits a reading per sensor every 2 seconds until cancelled. */
    public static class MyCustomSource implements SourceFunction<DataEntity> {
        // Loop flag; volatile because cancel() is invoked from a different thread.
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<DataEntity> sourceContext) throws Exception {
            // Random generator driving the simulated temperature walk.
            Random random = new Random();
            // Seed 10 sensors with an initial temperature around 60 (Gaussian spread of 20).
            HashMap<String, Double> dataTempMap = new HashMap<>();
            for (int i = 0; i < 10; ++i) {
                dataTempMap.put("id_" + (i + 1), 60 + random.nextGaussian() * 20);
            }
            while (running) {
                for (String sensorId : dataTempMap.keySet()) {
                    // Random-walk each sensor's current temperature.
                    Double newTemp = dataTempMap.get(sensorId) + random.nextGaussian();
                    dataTempMap.put(sensorId, newTemp);
                    sourceContext.collect(new DataEntity(sensorId, System.currentTimeMillis(), newTemp.longValue()));
                }
                // Throttle the output rate to one batch every 2 seconds.
                Thread.sleep(2000L);
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }
    }

    /** Custom sink: open() runs once per parallel subtask, invoke() once per record. */
    public static class MyCustomSink extends RichSinkFunction<DataEntity> {
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("自定义输出的初始化");
        }

        @Override
        public void invoke(DataEntity value, Context context) throws Exception {
            super.invoke(value, context);
            System.out.println("自定义输出:" + value);
        }
    }
}
3. 辅助类
package com.demo.customsource;
/**
 * Plain data holder for a single sensor reading: sensor id, event timestamp
 * (epoch millis), and the measured value.
 *
 * <p>A no-arg constructor and getters/setters are kept so the class remains a
 * valid POJO for Flink serialization.
 */
public class DataEntity {
    private String id;
    private Long timestamp;
    private Long value;

    /** No-arg constructor required for POJO serialization. */
    public DataEntity() {
    }

    public DataEntity(String id, Long timestamp, Long value) {
        this.id = id;
        this.timestamp = timestamp;
        this.value = value;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }

    @Override
    public String toString() {
        // BUGFIX: the original used an empty char literal (''') here,
        // which does not compile; the closing quote must be escaped: '\''.
        return "DataEntity{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", value=" + value +
                '}';
    }
}
4. 执行测试
自定义输出的初始化
自定义输出的初始化
自定义输出的初始化
自定义输出的初始化
自定义输出的初始化
自定义输出的初始化
自定义输出的初始化
自定义输出的初始化
自定义输出:DataEntity{id='id_10', timestamp=1646041659065, value=25}
3> DataEntity{id='id_4', timestamp=1646041659063, value=35}
自定义输出:DataEntity{id='id_1', timestamp=1646041659066, value=97}
4> DataEntity{id='id_7', timestamp=1646041659064, value=76}
2> DataEntity{id='id_5', timestamp=1646041659043, value=52}
自定义输出:DataEntity{id='id_9', timestamp=1646041659065, value=56}
2> DataEntity{id='id_3', timestamp=1646041659066, value=62}
7> DataEntity{id='id_8', timestamp=1646041659065, value=42}
6> DataEntity{id='id_9', timestamp=1646041659065, value=56}
1> DataEntity{id='id_1', timestamp=1646041659066, value=97}
自定义输出:DataEntity{id='id_8', timestamp=1646041659065, value=42}
3> DataEntity{id='id_2', timestamp=1646041659066, value=45}
5> DataEntity{id='id_6', timestamp=1646041659064, value=80}
自定义输出:DataEntity{id='id_5', timestamp=1646041659043, value=52}
8> DataEntity{id='id_10', timestamp=1646041659065, value=25}
自定义输出:DataEntity{id='id_6', timestamp=1646041659064, value=80}
自定义输出:DataEntity{id='id_4', timestamp=1646041659063, value=35}
从输出可以看出,每个并行子任务(subtask)都执行了一次初始化(open)。每一条数据都执行了一次 invoke,此时可以根据业务需要,写入数据库、redis、hdfs 文件等处理,也可以通过网络发送到指定的网络服务。



