一、flink分析结果写入redis
1、下载flink-hadoop整合包(flink-shaded-hadoop),放入所有节点Flink安装目录的lib目录下(checkpoint要写入HDFS时需要)
2、KafkaToRedisWordCount
package cn._51doit.flink.day08;
import cn._51doit.flink.day02.RedisSinkDemo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class KafkaToRedisWordCount {
//--topic doit2021 --groupId g02 --redisHost node-3.51doit.cn
//--redisPwd 123456 --fsBackend hdfs://node-1.51doit.cn:9000/flinkck2021
public static void main(String[] args) throws Exception{
//System.setProperty("HADOOP_USER_NAME", "root");
ParameterTool parameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//可以间内存中的状态持久化到StateBackend
env.enableCheckpointing(parameterTool.getLong("chkInterval", 30000));
//设置状态存储的后端
env.setStateBackend(new FsStateBackend(parameterTool.getRequired("fsBackend")));
//如果你手动cancel job后,不删除job的checkpoint数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//添加KafkaSource
//设置Kafka相关参数
Properties properties = new Properties();//设置Kafka的地址和端口
properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");
//读取偏移量策略:如果没有记录偏移量,就从头读,如果记录过偏移量,就接着读
properties.setProperty("auto.offset.reset", "earliest");
//设置消费者组ID
properties.setProperty("group.id", parameterTool.get("groupId"));
//开启checkpoint,不然让flink的消费(source对他的subtask)自动提交偏移量
properties.setProperty("enable.auto.commit", "false");
//创建FlinkKafkaConsumer并传入相关参数
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(
parameterTool.getRequired("topic"), //要读取数据的Topic名称
new SimpleStringSchema(), //读取文件的反序列化Schema
properties //传入Kafka的参数
);
//kafkaConsumer.setCommitOffsetsonCheckpoints(false); //设置在checkpoint是不将偏移量保存到kafka特殊的topic中,可设可不设
//使用addSource添加kafkaConsumer
DataStreamSource lines = env.addSource(kafkaConsumer);
SingleOutputStreamOperator> wordAndOne = lines.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String line, Collector> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
//new Tuple2(word, 1)
collector.collect(Tuple2.of(word, 1));
}
}
});
//分组
KeyedStream, String> keyed = wordAndOne.keyBy(t -> t.f0);
//聚合
SingleOutputStreamOperator> summed = keyed.sum(1);
//将聚合后的结果写入到Redis中
//调用Sink
//summed.addSink()
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setHost(parameterTool.getRequired("redisHost"))
.setPassword(parameterTool.getRequired("redisPwd"))
.setDatabase(9).build();
summed.addSink(new RedisSink>(conf, new RedisSinkDemo.RedisWordCountMapper()));
env.execute();
}
private static class RedisWordCountMapper implements RedisMapper> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
}
@Override
public String getKeyFromData(Tuple2 data) {
return data.f0;
}
@Override
public String getValueFromData(Tuple2 data) {
return data.f1.toString();
}
}
}
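备注:若Redis挂了,Flink写入Redis失败,作业会失败并按重启策略自动重启(开启checkpoint且未配置重启策略时,默认会无限次重启);Redis恢复后,之前没写进去的数据依旧会写进来。原因是:作业重启时会从最近一次成功的checkpoint恢复状态(单词计数和Kafka偏移量),并从该偏移量重新消费数据;而HSET是覆盖写,同一个单词重复写入只会保留最新的计数,所以结果不会出错。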
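手动cancel Flink作业时,可以不删除checkpoint数据(其中包含Kafka偏移量和计算状态,即设置RETAIN_ON_CANCELLATION)。重启时指定上次的checkpoint路径(flink run -s <checkpoint路径>),就能接着之前的状态继续计算,上面的案例使用的就是这种方式。另一种方式是在cancel时手动触发savepoint,重启时从该savepoint恢复。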
二、从kafka读取数据,处理后写回kafka
package cn._51doit.flink.day09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class KafkaToKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启checkpointing
env.enableCheckpointing(30000);
env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));
//设置Kafka相关参数
Properties properties = new Properties();//设置Kafka的地址和端口
properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-1.51doit.cn:9092,node-1.51doit.cn:9092");
//读取偏移量策略:如果没有记录偏移量,就从头读,如果记录过偏移量,就接着读
properties.setProperty("auto.offset.reset", "earliest");
//设置消费者组ID
properties.setProperty("group.id", "g1");
//没有开启checkpoint,让flink提交偏移量的消费者定期自动提交偏移量
properties.setProperty("enable.auto.commit", "false");
//创建FlinkKafkaConsumer并传入相关参数
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(
"doit2021", //要读取数据的Topic名称
new SimpleStringSchema(), //读取文件的反序列化Schema
properties //传入Kafka的参数
);
//使用addSource添加kafkaConsumer
kafkaConsumer.setCommitOffsetsOnCheckpoints(false); //在checkpoint时,不将偏移量写入到kafka特殊的topic中
DataStreamSource lines = env.addSource(kafkaConsumer);
SingleOutputStreamOperator filtered = lines.filter(e -> !e.startsWith("error"));
//使用的是AtLeastOnce
// FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(
// "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", "out2021", new SimpleStringSchema()
// );
//写入Kafka的topic
String topic = "out2021";
//设置Kafka相关参数
properties.setProperty("transaction.timeout.ms",1000 * 60 * 5 + "");
//创建FlinkKafkaProducer
FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
topic, //指定topic
new KafkaStringSerializationSchema(topic), //指定写入Kafka的序列化Schema
properties, //指定Kafka的相关参数
FlinkKafkaProducer.Semantic.EXACTLY_onCE //指定写入Kafka为EXACTLY_ONCE语义
);
filtered.addSink(kafkaProducer);
env.execute();
}
}
2、定义KafkaStringSerializationSchema
package cn._51doit.flink.day09; import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.kafka.clients.producer.ProducerRecord; import javax.annotation.Nullable; import java.nio.charset.Charset; public class KafkaStringSerializationSchema implements KafkaSerializationSchema{ private String topic; private String charset; //构造方法传入要写入的topic和字符集,默认使用UTF-8 public KafkaStringSerializationSchema(String topic) { this(topic, "UTF-8"); } public KafkaStringSerializationSchema(String topic, String charset) { this.topic = topic; this.charset = charset; } //调用该方法将数据进行序列化 @Override public ProducerRecord serialize(String element, @Nullable Long timestamp) { //将数据转成bytes数组 byte[] bytes = element.getBytes(Charset.forName(charset)); //返回ProducerRecord return new ProducerRecord<>(topic, bytes); } }



