目前在个某市商业银行做实时数据展示、数据处理;项目中使用到flink框架,进行数据加工处理。针对使用到的几个业务场景,和目前学习的flink阶段自己搭建了一个实时数据加工的处理项目。
目前处在学习、整理、分享的阶段,并不具备成型、系统的理念;本次介绍的是,在flink进行数据格式的转换,为数据的统计、对比、计算奠定基础。
使用maven构建进行项目管理,idea开发工具、JDK1.8的java环境、数据包采用的某培训机构使用的温度传感器模拟数据sensor.txt数据文件包。该项目已上传到码云,项目地址是:flink项目地址, https://gitee.com/wuzheyi520/flink_java_api_test.git
问题描述:
flink通过source加载的数据,一般是通过DataStream进行的数据接收;较早版本的flink使用的scala,java语言的示例比较少,所以就写一篇记录,供自己以后进行查阅;也分享出来,给需要的人使用。
一、项目目录结构:
项目目录结构如下图所示,目前只是对单个API进行介绍,所以写在不同main函数中,便于进行数据的调试、加工处理;划线的地方分别是:java类,数据文件。
二、TransformDataTest_Map 类中的代码:
这里直接给出类中全部的代码,可以直接运行main函数,查看数据格式的转换。
1.其中readTextFile方法是加载数据源中的数据,该种方式能够减少搭建kafka的工作量,方便演示。
DataStreamSourcestringDataStreamSource = environment.readTextFile(filePath);
2.通过DataStream.map()方法进行数据格式的转换,在该方法中,通过MapFunction进行处理,重写类中的方法。其中MapFunction
SingleOutputStreamOperator
3.该实例类全部代码如下所示:
package com.wuu.flink_api_test.transformData;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.Map;
public class TransformDataTest_Map {
public static void main(String[] args) {
System.out.println("---------------------------- start transform data ----------------");
String filePath = "E:\B_prooject\flink_java\src\main\resources\sensor.txt";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
DataStreamSource stringDataStreamSource = environment.readTextFile(filePath);
SingleOutputStreamOperator
控制数据打印效果如下图所示:
三、结束语:
if("喜欢"||"有用" || "其他"){
"欢迎打赏支持,您的支持是我前进的动力"
}else{
"欢迎批评指导"
}



