- IDE environment setup
- Prepare dependencies and the packaging plugin
- Logging configuration: create `log4j.properties`
- Edit the IDEA run configuration
- Development steps
- 1·Environment
- 2·Source
- Reading data from a Java collection
- Reading data from Kafka
- Custom source
- 3·Transform
- 4·Sink
- KafkaSink
- Custom Sink
- Execution Mode
IDE environment setup

Flink version: 1.13
Prepare dependencies and the packaging plugin

```xml
<properties>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

Logging configuration: create `log4j.properties`
```properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
```

Edit the IDEA run configuration
Because the Flink dependencies use the provided scope, check "Include dependencies with 'Provided' scope" in the IDEA run configuration so they are on the classpath when running inside the IDE.

Development steps

1·Environment
- Before a job is submitted, it first has to establish a connection to the Flink framework, i.e., obtain the current Flink execution environment.
```java
// Batch environment
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// Streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```
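Since flink-runtime-web is among the dependencies above, a local environment with the Flink web UI can also be created for debugging. A minimal sketch (the class name is illustrative; the UI listens on port 8081 by default):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUI {
    public static void main(String[] args) throws Exception {
        // Local mini-cluster with the web UI (requires flink-runtime-web on the classpath)
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.fromElements(1, 2, 3).print();
        env.execute();
    }
}
```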
2·Source

- Acquire the input data

Reading data from a Java collection
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class Hello {
    public static void main(String[] args) throws Exception {
        // Create the collection
        ArrayList<Integer> al = new ArrayList<>();
        al.add(1);
        al.add(2);
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the collection as a source and print it
        env.fromCollection(al).print();
        // Execute the job
        env.execute();
    }
}
```
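For quick tests there is no need to build a collection first: fromElements (used again later in this document) produces the same kind of bounded stream. A minimal sketch (class name illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HelloElements {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The element type is inferred from the arguments
        env.fromElements(1, 2).print();
        env.execute();
    }
}
```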
Reading data from Kafka

Create a Kafka topic:

```bash
kafka-topics.sh --replication-factor 1 --partitions 1 --zookeeper hadoop102:2181/kafka --create --topic flink
```
Flink's Kafka connector dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
```

Run the Java code:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Hello {
    public static void main(String[] args) throws Exception {
        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop100:9092,hadoop101:9092,hadoop102:9092");
        properties.setProperty("group.id", "g1");
        properties.setProperty("auto.offset.reset", "latest");
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
                .addSource(new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties))
                .print("Kafka");
        // Execute the job
        env.execute();
    }
}
```
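The starting position can also be set on the consumer itself, which takes precedence over auto.offset.reset. A brief sketch that would replace the addSource line above:

```java
// Replaces the addSource(...) line in the example above
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties);
consumer.setStartFromEarliest(); // read the topic from the beginning
env.addSource(consumer).print("Kafka");
```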
Produce data:

```bash
kafka-producer-perf-test.sh --throughput -1 --print-metrics --num-records 10000 --record-size 1024 --producer-props bootstrap.servers=hadoop102:9092 --topic flink
```

Custom source
- Implement the SourceFunction interface
- Override the run and cancel methods
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;

public class Hello {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the data for the custom source
        ArrayList<Integer> al = new ArrayList<>();
        al.add(333);
        al.add(4444);
        al.add(55555);
        MySource mySource = new MySource(al);
        // Add the custom source and print
        env.addSource(mySource).print();
        // Execute the job
        env.execute();
    }

    public static class MySource implements SourceFunction<Integer> {
        private final ArrayList<Integer> al;

        public MySource(ArrayList<Integer> al) {
            this.al = al;
        }

        @Override
        public void run(SourceContext<Integer> sc) {
            for (Integer integer : al) {
                sc.collect(integer);
            }
        }

        @Override
        public void cancel() {
            // Nothing to do: run() finishes on its own for this bounded source
        }
    }
}
```
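Note that a plain SourceFunction always runs with parallelism 1 (ParallelSourceFunction is the parallel variant). For an unbounded source, cancel() matters: the usual idiom is a volatile flag checked by the emit loop. A sketch (names illustrative):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative unbounded source that stops cleanly when the job is cancelled
public class CounterSource implements SourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> sc) throws Exception {
        long n = 0L;
        while (running) {
            sc.collect(n++);
            Thread.sleep(1000); // one element per second
        }
    }

    @Override
    public void cancel() {
        running = false; // breaks the loop in run()
    }
}
```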
3·Transform
- Data transformation

Operators include map, flatMap, filter, keyBy, shuffle, reduce, and more.

The following uses map as an example:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Square every element
        env.fromElements(1, 2, 3, 4, 5).map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        env.execute();
    }
}
```
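map is only one of the operators listed above; the others chain in the same way. A hedged sketch combining filter, keyBy, and reduce (keying by parity, purely illustrative):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformChain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5)
           .filter(i -> i > 1)    // drop 1
           .keyBy(i -> i % 2)     // partition into even/odd
           .reduce(Integer::sum)  // running sum per key
           .print();
        env.execute();
    }
}
```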
Using a lambda expression:

```java
.map((MapFunction<Integer, Integer>) i -> i * i)
```

4·Sink

KafkaSink

Flink's Kafka producer class: FlinkKafkaProducer
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class Hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env
                .fromElements("a", "bb", "ccc")
                .addSink(
                        new FlinkKafkaProducer<>("hadoop102:9092", "flink",
                                new SimpleStringSchema())
                );
        env.execute();
    }
}
```
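The broker-list constructor used above is the simplest overload; FlinkKafkaProducer also accepts a Properties object when more producer settings are needed. A sketch that would replace the sink construction above (assuming the same broker):

```java
// Replaces the new FlinkKafkaProducer<>(...) call in the example above
Properties producerProps = new Properties(); // java.util.Properties
producerProps.setProperty("bootstrap.servers", "hadoop102:9092");
env.fromElements("a", "bb", "ccc")
   .addSink(new FlinkKafkaProducer<>("flink", new SimpleStringSchema(), producerProps));
```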
Check that the data was written:

```bash
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic flink
```

Custom Sink

Implement RichSinkFunction:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class Hello {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Custom sink: print each element together with its context
        env.fromElements(1, 2, 3, 4).addSink(new RichSinkFunction<Integer>() {
            @Override
            public void invoke(Integer value, Context context) {
                System.out.println("value: " + value);
                System.out.println("context: " + context);
            }
        });
        // Execute the job
        env.execute();
    }
}
```
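The "Rich" part of RichSinkFunction also provides open/close lifecycle hooks, which is where per-subtask resources (connections, clients) are usually created and released. A hedged sketch with the resource left abstract (class name illustrative):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Illustrative only: shows where external resources belong in the lifecycle
public class MySink extends RichSinkFunction<Integer> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel subtask before any invoke(); open connections here
        System.out.println("open, subtask " + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void invoke(Integer value, Context context) {
        System.out.println("value: " + value);
    }

    @Override
    public void close() throws Exception {
        // Release the resource here
        System.out.println("close");
    }
}
```

Attach it with addSink, just like the anonymous version above.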
Running the example prints each value and its context to the console.

Execution Mode

- A feature of the streaming API: the runtime execution mode can be chosen to match the use case and the characteristics of the job.
- The streaming API defaults to the STREAMING execution mode, which runs continuously; it is the usual choice for unbounded streams but can also process bounded data.

There is also a BATCH execution mode, intended for bounded data.
Configuration

Via the command line:

```bash
bin/flink run -Dexecution.runtime-mode=BATCH ...
```

In code:

```java
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
```
Recommendation

Setting the mode in code is inflexible; prefer the command-line flag so you can switch between bounded and unbounded processing at submission time.
Difference between STREAMING and BATCH on bounded data

STREAMING mode: emits an updated result for every incoming record.
BATCH mode: emits the results once, after all input has been processed.
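A small experiment makes the difference visible: a keyed sum over a bounded source prints one update per record in STREAMING mode, but only the final sums in BATCH mode. A sketch (class name illustrative):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switch to STREAMING (the default) to see one update per input record
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromElements("a", "a", "b")
           .map(w -> Tuple2.of(w, 1))
           .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambdas need an explicit tuple type
           .keyBy(t -> t.f0)
           .sum(1)
           .print(); // BATCH: (a,2), (b,1); STREAMING: (a,1), (a,2), (b,1)
        env.execute();
    }
}
```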



