栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink简介和入门API

Flink简介和入门API

什么是Flink

        Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014年4月Stratosphere的代码被复制并捐赠给了Apache软件基金会,参加这个孵化项目的初始成员是Stratosphere系统的核心开发人员,2014年12月,Flink一跃成为Apache软件基金会的顶级项目。

        在德语中,Flink一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为logo,这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,而Flink的松鼠logo拥有可爱的尾巴,尾巴的颜色与Apache软件基金会的logo颜色相呼应,也就是说,这是一只Apache风格的松鼠。

        Flink项目的理念是:“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源的有状态的流处理框架”。 

        Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

Flink和Spark的区别
        Flink:事件驱动型:被动拉取数据,(如果没数据的时候什么也不干,节省资源)
        Spark:时间驱动型:主动拉取数据,(即使没有数据,到达一定时间,也会去计算,浪费资源)

        Flink:一切都是由流构成的
        Spark:一切都是由批构成的

        批处理的特点是有界、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。

        流处理的特点是无界、实时,  无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。

        在spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。

        而在flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。

        Flink:在流的基础上做批处理  Flink实时处理毫秒级
        Spark:在批的基础上做流处理  Spark实时处理秒级

        Spark吞吐量要比Flink大

Flink简单的API

文件设置

hello flink
hello world
hello China
nihao flink

依赖设置

    
        1.13.0
        1.8
        2.12
        1.7.30
    

    
        
            org.apache.flink
            flink-java
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-clients_${scala.binary.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-runtime-web_${scala.binary.version}
            ${flink.version}
        

        
            org.slf4j
            slf4j-api
            ${slf4j.version}
        
        
            org.slf4j
            slf4j-log4j12
            ${slf4j.version}
        
        
            org.apache.logging.log4j
            log4j-to-slf4j
            2.14.0
        
    

    
        
            
                org.apache.maven.plugins
                maven-assembly-plugin
                3.3.0
                
                    
                        jar-with-dependencies
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
        
    

1.批处理WordCount

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Flink01_Batch_WordCount {
    public static void main(String[] args) throws Exception {
        //1.创建批处理环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2.读取文件中的数据
        DataSource dataSource = env.readTextFile("input/words.txt");

        
        //3.将数据按照空格切分,组成Tuple2元组
        FlatMapOperator> wordToOne = dataSource.flatMap(new MyFlatMap());

        //4.将相同的单词聚合到一块
        UnsortedGrouping> groupBy = wordToOne.groupBy(0);

        //5.将单词的个数做累加
        AggregateOperator> result = groupBy.sum(1);

        result.print();


    }
    public static class MyFlatMap implements FlatMapFunction>{

        @Override
        public void flatMap(String value, Collector> out) throws Exception {
            //将数据按照空格切分
            String[] words = value.split(" ");
            //遍历出每一个单词
            for (String word : words) {
                out.collect(Tuple2.of(word,1));
            }

        }
    }
}

2.有界流处理WordCount

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class stream_test01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource streamSource = env.readTextFile("input/hello.txt");

        SingleOutputStreamOperator flatWord = streamSource.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String s, Collector collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });

        SingleOutputStreamOperator> mapWord = flatWord.map(new MapFunction>() {
            @Override
            public Tuple2 map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        KeyedStream, Tuple> tuple2TupleKeyedStream = mapWord.keyBy(0);

        SingleOutputStreamOperator> result = tuple2TupleKeyedStream.sum(1);

        result.print();

        env.execute();


    }
}

3.无界流处理WordCount

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class stream_test02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource socketTextStream = env.socketTextStream("hadoop102", 9999);

        SingleOutputStreamOperator stringSingleOutputStreamOperator = socketTextStream.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String s, Collector collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });

        SingleOutputStreamOperator> map = stringSingleOutputStreamOperator.map(new MapFunction>() {
            @Override
            public Tuple2 map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });

        KeyedStream, Tuple> tuple2TupleKeyedStream = map.keyBy(0);

        SingleOutputStreamOperator> result = tuple2TupleKeyedStream.sum(1);

        result.print();

        env.execute();

    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780477.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号