栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink数据格式转换,DataStream转换成指定对象

flink数据格式转换,DataStream转换成指定对象

flink实时流学习项目介绍:

       目前在个某市商业银行做实时数据展示、数据处理;项目中使用到flink框架,进行数据加工处理。针对使用到的几个业务场景,和目前学习的flink阶段自己搭建了一个实时数据加工的处理项目。
目前处在学习、整理、分享的阶段,并不具备成型、系统的理念;本次介绍的是,在flink进行数据格式的转换,为数据的统计、对比、计算奠定基础。

       使用maven构建进行项目管理,idea开发工具、JDK1.8的java环境、数据包采用的某培训机构使用的温度传感器模拟数据sensor.txt数据文件包。该项目已上传到码云,项目地址是:flink项目地址, https://gitee.com/wuzheyi520/flink_java_api_test.git


问题描述:

       flink通过source加载的数据,一般是通过DataStream进行的数据接收;较早版本的flink使用的scala,java语言的示例比较少,所以就写一篇记录,供自己以后进行查阅;也分享出来,给需要的人使用。


一、项目目录结构:

项目目录结构如下图所示,目前只是对单个API进行介绍,所以写在不同main函数中,便于进行数据的调试、加工处理;划线的地方分别是:java类,数据文件。


二、TransformDataTest_Map 类中的代码:

这里直接给出类中全部的代码,可以直接运行main函数,查看数据格式的转换。

1.其中readTextFile方法是加载数据源中的数据,该种方式能够减少搭建kafka的工作量,方便演示。

 DataStreamSource stringDataStreamSource = environment.readTextFile(filePath);

2.通过DataStream.map()方法进行数据格式的转换,在该方法中,通过MapFunction进行处理,重写类中的方法。其中MapFunction>该方法中的第1个类型String,表示数据的输入类型,也就是DataStream中的类型T;第2个类型Map表示的是:目标类型,也可以是自定义类。

 SingleOutputStreamOperator> mapStreamOperator = stringDataStreamSource.map(new MapFunction>() {
            @Override
            public Map map(String s) throws Exception {
                String[] splitData = s.split(",");
                System.out.println("---id is:"+splitData[0]+"---- timeStamp is:"+splitData[1]+"---data is:"+splitData[2]);
                Map map = new HashMap<>();
                map.put("id",splitData[0]);
                map.put("timeStamp",splitData[1]);
                map.put("data",splitData[2]);
                return map;
            }
        });

3.该实例类全部代码如下所示:

package com.wuu.flink_api_test.transformData;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;

public class TransformDataTest_Map {

    public static void main(String[] args) {
        System.out.println("----------------------------  start transform data ----------------");

        String filePath = "E:\B_prooject\flink_java\src\main\resources\sensor.txt";

        
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        
        DataStreamSource stringDataStreamSource = environment.readTextFile(filePath);

        
        SingleOutputStreamOperator> mapStreamOperator = stringDataStreamSource.map(new MapFunction>() {
            @Override
            public Map map(String s) throws Exception {
                String[] splitData = s.split(",");
                System.out.println("---id is:"+splitData[0]+"---- timeStamp is:"+splitData[1]+"---data is:"+splitData[2]);
                Map map = new HashMap<>();
                map.put("id",splitData[0]);
                map.put("timeStamp",splitData[1]);
                map.put("data",splitData[2]);
                return map;
            }
        });

        mapStreamOperator.print();


        try {
            environment.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}


控制数据打印效果如下图所示:


三、结束语:
if("喜欢"||"有用" || "其他"){
	"欢迎打赏支持,您的支持是我前进的动力"
}else{
	"欢迎批评指导"
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/434065.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号