Flink 代码层面的数据处理流程
addSource(读取数据) -> 得到 DataStream，在其上做业务处理(核心部分在这里，类似 Java Stream 的一些 API 操作) -> addSink(输出数据)
1. 四种读取方式：从集合中读取、从 Kafka 中读取、从文件中读取、自定义数据源
先上 pom.xml 代码：
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.6</version>
    </parent>
    <groupId>me</groupId>
    <artifactId>flink</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>flink</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
package me.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import java.util.Properties;
/**
 * Demonstrates a Kafka-to-Kafka Flink streaming job: consumes comma-separated
 * string records from topic {@code topic2}, splits each record into its
 * individual fields, prints them to stdout, and writes each field back to
 * topic {@code topic1}.
 */
public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment via the public factory.
        // (The original used the internal StreamContextEnvironment helper,
        // which is not part of Flink's user-facing API.)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel subtask so printed output stays in one ordered stream.
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.5.42:9092");
        // Secondary consumer settings.
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Source: consume records from Kafka topic "topic2" as plain strings.
        // Generics are declared explicitly; the original raw FlatMapFunction
        // below would not compile, because a raw interface exposes
        // flatMap(Object, Collector) and the @Override of
        // flatMap(String, Collector) is then an overload, not an override.
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer<>("topic2", new SimpleStringSchema(), properties));

        // Transformation: split each comma-separated record into its fields.
        DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String field : value.split(",")) {
                    // Emit each field as its own downstream record.
                    out.collect(field);
                }
            }
        });
        flatMapStream.print();

        // Sink: write the split fields to Kafka topic "topic1".
        flatMapStream.addSink(
                new FlinkKafkaProducer<>("192.168.5.42:9092", "topic1", new SimpleStringSchema()));

        // Trigger job execution; nothing runs until execute() is called.
        env.execute();
    }
}



