1.2、日志文件配置

- pom.xml 依赖配置(原文 XML 标签丢失,以下依据残留内容重建):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>learn_one</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12</scala.version>
        <flink.version>1.13.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
    </dependencies>
</project>
```
- resource目录下新建log4j.properties
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AfrbvOGz-1636364173489)(C:UsersPROTHAppDataRoamingTyporatypora-user-imagesimage-20211108115717213.png)]
- 配置内容
### Root logger: the FIRST token is the log level, then the appender names.
### (fix: the original "debugFile, errorFile" omitted the level, so log4j would
### try to parse "debugFile" as a level and attach only "errorFile")
log4j.rootLogger = debug, debugFile, errorFile
### info-and-above messages go to src/logs/flink.log (Threshold below is info,
### and the File property points at flink.log — the original comment said
### "debug ... debug.log", which matched neither setting)
log4j.appender.debugFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.debugFile.File = src/logs/flink.log
log4j.appender.debugFile.Append = true
# Threshold sets the minimum level THIS appender accepts, independent of the root level
log4j.appender.debugFile.Threshold = info
log4j.appender.debugFile.layout = org.apache.log4j.PatternLayout
log4j.appender.debugFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n
### error-and-above messages go to src/logs/error.log
log4j.appender.errorFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.errorFile.File = src/logs/error.log
log4j.appender.errorFile.Append = true
log4j.appender.errorFile.Threshold = error
log4j.appender.errorFile.layout = org.apache.log4j.PatternLayout
log4j.appender.errorFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n
2、代码实战
2.1、基本类型&操作
(1)元组数据类型
- Tuple类型:元组类型, 多个语⾔都有的特性, flink的java版 tuple最多⽀持 25个;
- 用途:集合⾥⾯是单个类型 ,列表只能存储相同的数据类型,⽽元组Tuple可以存储不同 的数据类型;
- 实战案例:tuple+数组,数字与数据个数有关
package com.lihaiwei.text1;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import scala.collection.generic.BitOperations;
public class Tupleclass {
    /**
     * Demonstrates Flink's {@code Tuple2}: a fixed-size container holding two
     * values of (possibly) different types, accessed via the public fields
     * {@code f0} and {@code f1}.
     */
    public static void main(String[] args) {
        // Parameterize the tuple so f0/f1 are typed
        // (the original used a raw Tuple2 and the misleading name "tuple3").
        Tuple2<Integer, String> tuple = Tuple2.of(1, "x");
        System.out.println(tuple.f0); // first element: 1
        System.out.println(tuple.f1); // second element: "x"
    }
}
- 运行结果
(2)map操作
- 作用:一对一转换对象
package com.lihaiwei.text1;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import scala.collection.generic.BitOperations;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class Tupleclass {
    /**
     * Demonstrates a one-to-one transformation with {@code Stream.map}:
     * every input string is mapped to the same string prefixed with "测试".
     */
    public static void main(String[] args) {
        // Typed list (the original used raw List, losing compile-time checks)
        List<String> list1 = new ArrayList<>();
        list1.add("springboot,springcloud");
        list1.add("redis6,docker");
        list1.add("kafka,rabbitmq");
        // One-to-one transform: one input element -> exactly one output element
        List<String> list2 = list1.stream()
                .map(obj -> "测试" + obj)
                .collect(Collectors.toList());
        System.out.println(list2);
    }
}
- 运行结果
(3)flatmap操作
- 作用:一对多转换对象
package com.lihaiwei.text1;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import scala.collection.generic.BitOperations;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Tupleclass {
    /**
     * Demonstrates a one-to-many transformation with {@code Stream.flatMap}:
     * each comma-separated input string is split and flattened into several
     * output elements.
     */
    public static void main(String[] args) {
        // Typed list (the original used raw List/Stream, losing compile-time checks)
        List<String> list1 = new ArrayList<>();
        list1.add("springboot,springcloud");
        list1.add("redis6,docker");
        list1.add("kafka,rabbitmq");
        // One-to-many transform: one string -> one element per comma-separated part
        // (the original comment said "一对一转换" — one-to-one — which is wrong for flatMap)
        List<String> list2 = list1.stream()
                .flatMap(obj -> Arrays.stream(obj.split(",")))
                .collect(Collectors.toList());
        System.out.println(list2);
    }
}
- 运行结果
(1)需求
- 需求内容:根据字符串的逗号进行切割,输出
(2)代码编写
package com.hxjy.app;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class flink02 {
    /**
     * Streaming flatMap demo: splits each comma-separated source string into
     * individual elements and prints the stream before and after the transform.
     */
    public static void main(String[] args) throws Exception {
        // 1. Build the streaming execution environment (job entry point, holds global config)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. In-memory source
        DataStream<String> stringDS = env.fromElements("flink,spark,hadoop", "hbase,hive", "kafka,redis");
        stringDS.print("执行前");
        // 3. flatMap: one input string -> many output strings.
        //    The function MUST be parameterized as FlatMapFunction<String, String>:
        //    with the raw type, @Override flatMap(String, Collector) does not match
        //    the erased flatMap(Object, Collector) signature and fails to compile.
        DataStream<String> flatmapStream = stringDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> collector) throws Exception {
                // 3.1 split on commas
                String[] arr = value.split(",");
                // 3.2 emit each piece downstream
                for (String str : arr) {
                    collector.collect(str);
                }
            }
        });
        // 4. Sink
        flatmapStream.print("执行后");
        // 5. DataStream jobs are lazy: execute() actually starts the job (name is optional)
        env.execute("flink");
    }
}
- 运行结果
(3)相关快捷键
- alt+insert:快速实现方法

2.3、批处理代码实战
(1)需求
- 根据字符串的逗号进⾏分割,输出
(2)代码编写
package com.hxjy.app;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class flink02 {
    /**
     * Batch flatMap demo (DataSet API): splits each comma-separated source
     * string into individual elements and prints the data set before and
     * after the transformation.
     */
    public static void main(String[] args) throws Exception {
        // 1. Build the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. In-memory source
        DataSet<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,⼩滴课堂");
        // DataSet#print() takes no argument in the 1.13 API and eagerly
        // triggers execution of the job graph built so far.
        stringDS.print();
        // 3. flatMap with an explicitly typed function: with the raw type the
        //    @Override flatMap(String, Collector) would not match the erased
        //    flatMap(Object, Collector) signature and fail to compile.
        DataSet<String> flatmapStream = stringDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> collector) throws Exception {
                // 3.1 split on commas
                String[] arr = value.split(",");
                // 3.2 emit each piece
                for (String str : arr) {
                    collector.collect(str);
                }
            }
        });
        // 4. print() is itself a sink that triggers execution in the DataSet API,
        //    so the original trailing env.execute("flink") would throw
        //    "No new data sinks have been defined since the last execution" — removed.
        flatmapStream.print();
    }
}
- 运行结果



