1:Kafka
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
package com.atguigu.sink;
import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
public class SinkTest1_Kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String path ="D:\大数据组件API\Flink\Flink01\src\main\resources\test.txt";
DataStreamSource dataStream = env.readTextFile(path);
DataStream inputStream = dataStream.map(value -> {
String[] split = value.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2])).toString();
});
DataStreamSink second = inputStream.addSink(new FlinkKafkaProducer011(
"hadoop112:9092", "second", new SimpleStringSchema()));
env.execute();
}
}
2:redis
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
package com.atguigu.sink;
import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SinkTest1_Redis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String path ="D:\大数据组件API\Flink\Flink01\src\main\resources\test.txt";
DataStreamSource dataStream = env.readTextFile(path);
DataStream inputStream = dataStream.map(value -> {
String[] split = value.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
});
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("hadoop112")
.setPort(6379)
.build();
inputStream.addSink(new RedisSink<>(config, new RedisMapper() {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "sensor_tempe");
}
@Override
public String getKeyFromData(SensorReading data) {
return data.getId();
}
@Override
public String getValueFromData(SensorReading data) {
return data.getTemperature().toString();
}
}));
// inputStream.addSink(new RedisSink(config, new MyRedisMapper()) );
env.execute();
}
public static class MyRedisMapper implements RedisMapper {
// 保存到 redis 的命令,存成哈希表
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "sensor_tempe");
}
public String getKeyFromData(SensorReading data) {
return data.getId();
}
public String getValueFromData(SensorReading data) {
return data.getTemperature().toString();
}
}
}
3:ES
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
package com.atguigu.sink;
import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkbase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
public class SinkTest1_ES {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String path ="D:\大数据组件API\Flink\Flink01\src\main\resources\test.txt";
DataStreamSource dataStream = env.readTextFile(path);
DataStream inputStream = dataStream.map(value -> {
String[] split = value.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
});
// es 的 httpHosts 配置
ArrayList httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("hadoop112",9200));
ElasticsearchSink elasticsearchSink = new ElasticsearchSink.Builder<>(httpHosts, new MyEsSink()).build();
inputStream.addSink(elasticsearchSink);
env.execute();
}
public static class MyEsSink implements ElasticsearchSinkFunction{
@Override
public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
HashMap hashMap = new HashMap<>();
hashMap.put("id",element.getId());
hashMap.put("temp",element.getTemperature().toString());
hashMap.put("ts",element.getTimestamp().toString());
IndexRequest indexRequest = Requests.indexRequest()
.index("senor")
.type("info")
.source(hashMap);
indexer.add(indexRequest);
}
}
}
4:JDBC自定义Sink
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
package com.atguigu.sink;
import com.atguigu.bean.SensorReading;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
public class SinkTest1_JDBC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String path ="D:\大数据组件API\Flink\Flink01\src\main\resources\test.txt";
DataStreamSource dataStream = env.readTextFile(path);
DataStream inputStream = dataStream.map(value -> {
String[] split = value.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
});
inputStream.addSink(new MyJdbc());
env.execute();
}
public static class MyJdbc extends RichSinkFunction {
Connection conn = null;
String sql = "INSERT INTO `firstorder`(`phone_no`,`is_new`) VALUES(?,?)";
String sql2 = "Update `firstorder` set is_new = ? where phone_no = ?";
PreparedStatement insertStmt;
PreparedStatement updateStmt;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456");
insertStmt = conn.prepareStatement(sql);
updateStmt=conn.prepareStatement(sql2);
}
@Override
public void invoke(SensorReading value, Context context) throws Exception {
updateStmt.setString(1,value.getTemperature().toString());
updateStmt.setString(2,value.getId());
updateStmt.executeUpdate();
if(updateStmt.getUpdateCount() == 0){
insertStmt.setString(1,value.getId());
insertStmt.setString(2,value.getTemperature().toString());
insertStmt.execute();
}
}
@Override
public void close() throws Exception {
insertStmt.close();
updateStmt.close();
conn.close();
}
}
}