
Flink学习笔记之流处理核心编程--数据源读取(Source)



This post covers how Flink reads data from different kinds of sources: a Java collection, a local file, a socket, Kafka, and a custom source.


Contents
  • 1. Reading data from a Java collection
  • 2. Reading data from a local file
  • 3. Reading data from a socket
    • Variant 1
    • Variant 2
  • 4. Reading data from Kafka
    • New API
    • Legacy API
  • 5. Reading data from a custom source


1. Reading data from a Java collection

In many cases you can stage data in memory in a suitable data structure and use it as a source. A collection is the most common choice for that data structure.
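The listings below import com.lqs.bean.WaterSensor, which the post never shows. Here is a minimal sketch of such a bean; the field names (id, ts, vc) are assumptions inferred from constructor calls like new WaterSensor("1", 134L, 34), and the actual class in the author's project may differ:

```java
// Hypothetical reconstruction of the WaterSensor POJO used throughout this post.
// Field names are assumptions; a real project would declare this public in its own file.
// Flink treats a class as a POJO (and serializes it efficiently) when it has a
// no-arg constructor and public getters/setters for every field.
class WaterSensor {
    private String id;   // sensor id
    private Long ts;     // event timestamp in milliseconds
    private Integer vc;  // water level reading

    // no-arg constructor required for Flink POJO serialization
    public WaterSensor() {}

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}
```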

package com.lqs.five.part1_source;

import com.lqs.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;


public class Test01_SourceList {

    public static void main(String[] args) throws Exception {
        
        //TODO 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //TODO 2. Read data from a collection

        List<WaterSensor> waterSensors = Arrays.asList(
                new WaterSensor("1", 134L, 34),
                new WaterSensor("2", 1434L, 55),
                new WaterSensor("4", 1367L, 354)
        );

        env.fromCollection(waterSensors).print();



        //Read data from individual elements
        //The parallelism of a non-parallel operator must be 1
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5).setParallelism(1);

        streamSource.print();

        env.execute("SourceList");

    }
    
}

2. Reading data from a local file

Note:

  1. The argument can be a directory or a single file.
  2. The path can be relative or absolute.
  3. A relative path is resolved against the system property user.dir: under IDEA this is the project root; in standalone mode it is the root directory on the cluster node.
  4. You can also read from HDFS using a path such as hdfs://nwh120:8020/…; since Flink does not ship the Hadoop dependencies, you need to add them to your pom:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
package com.lqs.five.part1_source;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Test02_SourceFile {

    public static void main(String[] args) throws Exception {

        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2. Read the data in the file
        env.readTextFile("input/hcc").print();

        env.execute("SourceFile");

        

    }

}
3. Reading data from a socket

Variant 1
package com.lqs.five.part1_source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class Test03_SourceUnboundStream {

    public static void main(String[] args) throws Exception {

        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //TODO 2. Read the unbounded data stream
        //netcat (nc) is used to simulate the incoming data
        DataStreamSource<String> streamSource = env.socketTextStream("nwh120", 8888);

        //TODO 3. Split each line on spaces to get the individual words
        SingleOutputStreamOperator<String> wordDataStream = streamSource.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String s, Collector<String> out) throws Exception {
                        for (String s1 : s.split(" ")) {
                            out.collect(s1);
                        }
                    }
                }
        )
                //set the slot sharing group
//                .slotSharingGroup("group1")
                //break the operator chain both before and after this operator
//                .disableChaining()
                //start a new chain, breaking only the chain with the preceding operators
//                .startNewChain()
                //set the parallelism of this operator individually
//                .setParallelism(5)
                ;

        //TODO 4. Wrap each word in a Tuple2
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordToOneDataStream = wordDataStream.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                }
        );

        //TODO 5. Group identical words together
        KeyedStream<Tuple2<String, Integer>, String> keyedDataStream = wordToOneDataStream.keyBy(
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }
        );

        //TODO 6. Sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDataStream.sum(1);

        result.print();

        env.execute("SourceUnboundStream");

    }

}
Variant 2
package com.lqs.five.part1_source;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class Test03_SourceSocket {

    public static void main(String[] args) throws Exception {

        //TODO 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //TODO 2. Read the unbounded data stream
        DataStreamSource<String> streamSource = env.socketTextStream("nwh120", 8888);

        //TODO 3. Transform the data
        //TODO Variant 1
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = streamSource.flatMap(
                new RichFlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                        //split on spaces
                        String[] split = value.split(" ");
                        //emit each word downstream (to the next operator)
                        for (String word : split) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }
        );

        //TODO Variant 2


        //TODO Variant 3


        //TODO 4. Group by key
        KeyedStream<Tuple2<String, Long>, String> keyedStream = wordAndOne.keyBy(
                new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                }
        );
        //shorthand
//        KeyedStream<Tuple2<String, Long>, String> keyedStream = wordAndOne.keyBy(value -> value.f0);

        //TODO 5. Sum
        SingleOutputStreamOperator<Tuple2<String, Long>> result = keyedStream.sum(1);

        //TODO 6. Print
        result.print();

        //TODO 7. Execute
        env.execute("WordCount");

    }

}
4. Reading data from Kafka

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>

New API
package com.lqs.five.part1_source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Test04_SourceKafka {

    public static void main(String[] args) throws Exception {

        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //TODO 2. Read the data stream from Kafka
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("nwh120:9092")
                .setTopics("test_kafka_source")
                .setGroupId("lqs_test")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        streamSource.print();

        env.execute("SourceKafka");

    }

}
Legacy API
package com.lqs.five.part1_source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;


public class Test04_SourceKafkaOld {

    public static void main(String[] args) throws Exception {

        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //TODO 2. Read the data stream from Kafka
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "nwh120:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "lqs_test");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        env.addSource(new FlinkKafkaConsumer<>("test_kafka_source", new SimpleStringSchema(), properties)).print();

        env.execute("SourceKafkaOld");

    }

}
5. Reading data from a custom source
package com.lqs.five.part1_source;

import com.lqs.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import java.util.Random;


public class Test05_CustomizeSource {

    public static void main(String[] args) throws Exception {

        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //TODO 2. Read data from the custom source
        env.addSource(new MySource()).print();

        env.execute("CustomizeSource");

    }

    public static class MySource implements ParallelSourceFunction<WaterSensor> {

        //volatile so the flag written by cancel() is visible to the running thread
        private volatile boolean isRunning = true;


        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(
                        new WaterSensor(
                                "test_kafka_source" + new Random().nextInt(100),
                                System.currentTimeMillis(),
                                new Random().nextInt(1000)
                        )
                );
                Thread.sleep(1000);
            }
        }


        @Override
        public void cancel() {
            isRunning = false;
        }
    }

}
Reprinted from www.mshxw.com
Original article: https://www.mshxw.com/it/871860.html