栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

大数据(9d)Flink流处理核心编程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

大数据(9d)Flink流处理核心编程

文章目录
  • IDE环境准备
    • 准备依赖和打包插件
    • 日志配置:创建`log4j.properties`
    • 编辑IDEA运行时配置
  • 开发步骤
    • 1·Environment
    • 2·Source
      • 从Java的集合中读取数据
      • 从Kafka获取数据
      • 自定义数据源
    • 3·Transform
    • 4·Sink
      • KafkaSink
      • 自定义Sink
  • Execution Mode(执行模式)

IDE环境准备 准备依赖和打包插件

Flink版本:1.13


    1.13.1
    2.12
    1.7.30



    
    
        org.apache.flink
        flink-java
        ${flink.version}
        provided  
    
    
        org.apache.flink
        flink-streaming-java_${scala.binary.version}
        ${flink.version}
        provided
    
    
        org.apache.flink
        flink-clients_${scala.binary.version}
        ${flink.version}
        provided
    
    
        org.apache.flink
        flink-runtime-web_${scala.binary.version}
        ${flink.version}
        provided
    
    
    
        org.slf4j
        slf4j-api
        ${slf4j.version}
        provided
    
    
        org.slf4j
        slf4j-log4j12
        ${slf4j.version}
        provided
    
    
        org.apache.logging.log4j
        log4j-to-slf4j
        2.14.0
        provided
    



    
        
            org.apache.maven.plugins
            maven-shade-plugin
            3.2.4
            
                
                    package
                    
                        shade
                    
                    
                        
                            
                                com.google.code.findbugs:jsr305
                                org.slf4j:*
                                log4j:*
                            
                        
                        
                            
                                
                                *:*
                                
                                    meta-INF/*.SF
                                    meta-INF/*.DSA
                                    meta-INF/*.RSA
                                
                            
                        
                        
                            
                            
                        
                    
                
            
        
    

日志配置:创建log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
编辑IDEA运行时配置

Include dependencies with “Provided” scope

开发步骤

1·Environment
  • Job提交前,需要先建立与Flink框架之间的联系,也就是当前的Flink运行环境
// 批处理环境
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2·Source
  • 获取数据
从Java的集合中读取数据
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class Hello {
    public static void main(String[] args) throws Exception {
        //创建集合
        ArrayList al = new ArrayList();
        al.add(1);
        al.add(2);
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //从集合中获取数据并打印
        env.fromCollection(al).print();
        //执行
        env.execute();
    }
}
从Kafka获取数据

创建Kafka主题

kafka-topics.sh 
--replication-factor 1 
--partitions 1 
--zookeeper hadoop102:2181/kafka 
--create 
--topic flink

Flink的Kafka依赖


    org.apache.flink
    flink-connector-kafka_2.12
    1.13.1

运行Java代码

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Hello {
    public static void main(String[] args) throws Exception {
        //Kafka配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop100:9092,hadoop101:9092,hadoop102:9092");
        properties.setProperty("group.id", "g1");
        properties.setProperty("auto.offset.reset", "latest");
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
                .addSource(new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties))
                .print("Kafka");
        //执行
        env.execute();
    }
}

生产数据

kafka-producer-perf-test.sh 
--throughput -1 
--print-metrics 
--num-records 10000 
--record-size 1024 
--producer-props bootstrap.servers=hadoop102:9092 
--topic flink
自定义数据源
  1. 实现SourceFunction接口
  2. 覆写run和cancel方法
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;

public class Hello {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //创建自定义数据源
        ArrayList al = new ArrayList<>();
        al.add(333);
        al.add(4444);
        al.add(55555);
        MySource mySource = new MySource(al);
        //加入数据源
        env.addSource(mySource).print();
        //执行
        env.execute();
    }

    public static class MySource implements SourceFunction {
        private final ArrayList al;

        public MySource(ArrayList al) {
            this.al = al;
        }

        @Override
        public void run(SourceContext sc) {
            for (Integer integer : al) {
                sc.collect(integer);
            }
        }

        @Override
        public void cancel() {
        }
    }
}
3·Transform
  • 数据转换
    有map、flatMap、filter、keyBy、shuffle、reduce……
    下面以map为例
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5).map(new MapFunction() {
            @Override
            public Integer map(Integer i) {
                return i * i;
            }
        }).print();
        env.execute();
    }
}

使用lambda表达式

.map((MapFunction) i -> i * i)
4·Sink KafkaSink

Flink的Kafka生产者类:FlinkKafkaProducer

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class Hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env
                .fromElements("a", "bb", "ccc")
                .addSink(
                        new FlinkKafkaProducer<>("hadoop102:9092", "flink",
                        new SimpleStringSchema())
                );
        env.execute();
    }
}

查看有没有写进去

kafka-console-consumer.sh 
--bootstrap-server hadoop102:9092 
--from-beginning 
--topic flink
自定义Sink

实现RichSinkFunction

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class Hello {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //自定义Sink
        env.fromElements(1, 2, 3, 4).addSink(new RichSinkFunction() {
            @Override
            public void invoke(Integer value, Context context) {
                System.out.println("value: " + value);
                System.out.println("context: " + context);
            }
        });
        //执行
        env.execute();
    }
}

打印结果

Execution Mode(执行模式)
  • 流式API特性:可根据使用情况和Job特点,来选择不同的运行时执行模式
  • 流式API默认是使用的STREAMING执行模式,持续在线,常用于无界流,也可用于有界数据
    还有一个BATCH执行模式,用于有界数据

配置方式

命令行方式配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

代码方式配置

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

建议
代码方式配置 不灵活,建议用 命令行方式配置 来随时切换有界数据和无界数据

有界数据用STREAMING和BATCH的区别

STREAMING模式:来一条数据输出一次结果
BATCH模式:数据处理完之后,一次性输出结果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/458469.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号