和其他所有的计算框架一样,Flink 也有一些基础的开发步骤以及核心的 API。从开发步骤的角度来讲,主要分为四大部分。
转换算子可以把一个或多个 DataStream 转成一个新的 DataStream.程序可以把多个复杂 的转换组合成复杂的数据流拓扑。
Sink 有下沉的意思,在 Flink 中所谓的 Sink 其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作。前面的 print 就是一个 sink。
一、数据写入 Kafka代码如下:
package day04;
import bean.WaterSensor;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
/**
 * Demo: write socket text data to Kafka via {@link FlinkKafkaProducer}.
 *
 * <p>Reads lines of the form {@code "id ts vc"} from a local socket, converts each
 * line into a {@code WaterSensor} bean serialized as JSON, and sinks the JSON
 * strings to the Kafka topic {@code test}.
 */
public class Flink01_Sink_Kafka {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read raw text lines from a local socket (start e.g. `nc -lk 1111` first).
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);
        // Parse "id ts vc" into a WaterSensor and serialize it to a JSON string.
        // The explicit MapFunction cast plus returns() pins the output type for
        // Flink's type extraction, which cannot infer it from a plain lambda.
        SingleOutputStreamOperator<String> mapDS = socketDS.map((MapFunction<String, String>) value -> {
            String[] els = value.split(" ");
            return JSON.toJSONString(new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2])));
        }).returns(Types.STRING);
        // Configure and attach the Kafka sink.
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.225:9092");
        // acks=-1 (all): wait for the full ISR to acknowledge each record.
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
        mapDS.addSink(new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties));
        env.execute();
    }
}
与 Kafka Source 几乎一样。
二、数据写入 Redis。Redis 在写入 hash 结构的数据时需要注意:作为 key-value 结构的数据,string、set、list 这些只需要一个 key,hash 结构的数据需要额外提供一个 field,即:key-(field-value)。
代码如下:
package day04;
import bean.WaterSensor;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
 * Demo: write socket text data to Redis via {@link RedisSink}.
 *
 * <p>Each input line {@code "id ts vc"} becomes a {@code WaterSensor} bean and is
 * stored with {@code HSET Sensor <id> <vc>} — i.e. for the HASH command the key
 * ("Sensor") comes from the command description, getKeyFromData supplies the
 * field, and getValueFromData supplies the value.
 */
public class Flink02_Sink_Redis {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read raw text lines from a local socket.
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);
        // Parse "id ts vc" into a WaterSensor POJO; returns() pins the POJO type info.
        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map((MapFunction<String, WaterSensor>) value -> {
            String[] els = value.split(" ");
            return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
        }).returns(Types.POJO(WaterSensor.class));
        // Jedis connection pool configuration for the sink.
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();
        // The mapper MUST be parameterized as RedisMapper<WaterSensor>; with the raw
        // type the @Override methods below would not match the erased signatures.
        mapDS.addSink(new RedisSink<>(jedisPoolConfig, new RedisMapper<WaterSensor>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                // HSET: "Sensor" is the hash key; field/value come from the two getters.
                return new RedisCommandDescription(RedisCommand.HSET, "Sensor");
            }

            @Override
            public String getKeyFromData(WaterSensor data) {
                // For HSET this is the hash field, not the Redis key.
                return data.getId();
            }

            @Override
            public String getValueFromData(WaterSensor data) {
                return Double.toString(data.getVc());
            }
        }));
        // Run the job.
        env.execute();
    }
}
对于非 hash 结构的数据 getKeyFromData、getValueFromData 即指定 key 和 value,getCommandDescription 则指定插入数据的命令,对于 hash 结构的数据则是 getCommandDescription 里指定 key,getKeyFromData 指定 field。
三、数据写入 ES代码如下:
package day04;
import bean.WaterSensor;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.Collections;
import java.util.List;
/**
 * Demo: write socket text data to Elasticsearch via {@link ElasticsearchSink}.
 *
 * <p>Each input line {@code "id ts vc"} becomes a {@code WaterSensor} bean and is
 * indexed as a JSON document into the {@code sensor} index, using the sensor id
 * as the document id (so repeated ids upsert the same document).
 */
public class Flink03_Sink_ES {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read raw text lines from a local socket.
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);
        // Parse "id ts vc" into a WaterSensor POJO; returns() pins the POJO type info.
        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map((MapFunction<String, WaterSensor>) value -> {
            String[] els = value.split(" ");
            return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
        }).returns(Types.POJO(WaterSensor.class));
        // Elasticsearch cluster endpoints.
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("localhost", 9200));
        // The sink function MUST be parameterized as ElasticsearchSinkFunction<WaterSensor>;
        // with the raw type the @Override on process() would not match the erased signature.
        ElasticsearchSink.Builder<WaterSensor> elasticsearchSink =
                new ElasticsearchSink.Builder<>(hosts, new ElasticsearchSinkFunction<WaterSensor>() {
                    @Override
                    public void process(WaterSensor element, RuntimeContext ctx, RequestIndexer indexer) {
                        IndexRequest request = Requests
                                .indexRequest("sensor")
                                .id(element.getId())
                                .source(JSON.toJSONString(element), XContentType.JSON);
                        indexer.add(request);
                    }
                });
        // Flush after every single action — required for low-volume streaming demos,
        // otherwise records sit in the bulk buffer and never appear in ES.
        elasticsearchSink.setBulkFlushMaxActions(1);
        mapDS.addSink(elasticsearchSink.build());
        env.execute();
    }
}
对于流式数据需要设置批量提交的周期
// 按操作次数 elasticsearchSink.setBulkFlushMaxActions(1); // 按时间,单位:毫秒 elasticsearchSink.setBulkFlushInterval(1); // 按数据量,单位:MB elasticsearchSink.setBulkFlushMaxSizeMb(100);四、数据写入 RDB
flink 最新版本提供 JDBC 形式的 sink,之前数据若要写入 MySQL 这类关系型数据库还是需要自定义 sink 的。代码如下:
package day04;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Demo: write socket text data to MySQL via the official {@link JdbcSink}.
 *
 * <p>Each input line {@code "id ts vc"} becomes a {@code WaterSensor} bean and is
 * inserted into {@code encryption.sensor} with a parameterized statement
 * (PreparedStatement placeholders — never string-concatenated SQL).
 */
public class Flink04_Sink_JDBC {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read raw text lines from a local socket.
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);
        // Parse "id ts vc" into a WaterSensor POJO; returns() pins the POJO type info.
        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map((MapFunction<String, WaterSensor>) value -> {
            String[] els = value.split(" ");
            return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
        }).returns(Types.POJO(WaterSensor.class));
        // Parameterized insert; the JdbcStatementBuilder below binds one record per row.
        String sql = "insert into encryption.sensor(id,ts,vc) values(?,?,?)";
        mapDS.addSink(JdbcSink.sink(sql,
                // Explicit JdbcStatementBuilder<WaterSensor> type so JdbcSink.sink
                // infers <WaterSensor> instead of falling back to a raw lambda.
                (JdbcStatementBuilder<WaterSensor>) (ps, waterSensor) -> {
                    ps.setString(1, waterSensor.getId());
                    ps.setLong(2, waterSensor.getTs());
                    ps.setDouble(3, waterSensor.getVc());
                },
                // Batch size 1: flush every record — needed for low-volume demos.
                JdbcExecutionOptions.builder().withBatchSize(1).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://这是一个不愿意透露域名的云数据库地址:3306")
                        .withUsername("这是一个隐藏的用户")
                        .withPassword("这是一个隐藏的密码")
                        .build()));
        env.execute();
    }
}
同理也是需要指定批次提交的周期
// 按数据量 JdbcExecutionOptions.builder().withBatchSize(int size); // 按时间 JdbcExecutionOptions.builder().withBatchIntervalMs(long intervalMs); // 另外 withMaxRetries(int maxRetries) 设置的是写入失败时的重试次数,并不是批次提交条件。五、自定义 Sink
代码如下:
package day04;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Demo: a hand-written JDBC sink implemented as a {@link RichSinkFunction}.
 *
 * <p>Shows the custom-sink lifecycle: the connection and PreparedStatement are
 * created once per parallel instance in {@code open()}, reused for every record
 * in {@code invoke()}, and released in {@code close()}.
 */
public class Flink05_Sink_Custom {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read raw text lines from a local socket.
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 1111);
        // Parse "id ts vc" into a WaterSensor POJO; returns() pins the POJO type info.
        SingleOutputStreamOperator<WaterSensor> mapDS = socketDS.map((MapFunction<String, WaterSensor>) value -> {
            String[] els = value.split(" ");
            return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2]));
        }).returns(Types.POJO(WaterSensor.class));
        // Attach the custom MySQL sink.
        mapDS.addSink(new MySQLRichSinkFunction("jdbc:mysql://这是一个不愿意透露域名的云数据库地址:3306", "这是一个隐藏的用户", "这是一个隐藏的密码"));
        env.execute();
    }

    /**
     * Writes each {@code WaterSensor} into {@code encryption.sensor} through a
     * single pooled-free JDBC connection.
     *
     * <p>Must extend {@code RichSinkFunction<WaterSensor>} (not the raw type);
     * otherwise {@code invoke(WaterSensor, Context)} would not override the
     * erased {@code invoke(Object, Context)} and the @Override would not compile.
     */
    private static class MySQLRichSinkFunction extends RichSinkFunction<WaterSensor> {
        private final String url;
        private final String userName;
        private final String password;
        private Connection con;
        private PreparedStatement ps;

        public MySQLRichSinkFunction(String url, String userName, String password) {
            this.url = url;
            this.userName = userName;
            this.password = password;
        }

        /** Called once per parallel subtask: open the connection and prepare the statement. */
        @Override
        public void open(Configuration parameters) throws Exception {
            con = DriverManager.getConnection(url, userName, password);
            // Parameterized SQL — values are bound per record in invoke().
            ps = con.prepareStatement("insert into encryption.sensor(id,ts,vc) values(?,?,?)");
        }

        /** Release JDBC resources; null-checks guard against a failed open(). */
        @Override
        public void close() throws Exception {
            if (ps != null) {
                ps.close();
            }
            if (con != null) {
                con.close();
            }
        }

        /** Called once per record: bind the bean's fields and execute the insert. */
        @Override
        public void invoke(WaterSensor value, Context context) throws Exception {
            ps.setString(1, value.getId());
            ps.setLong(2, value.getTs());
            ps.setDouble(3, value.getVc());
            ps.execute();
        }
    }
}



