栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

day11&12 ---- Flink

day11&12 ---- Flink

学习链接
##关闭flink集群
[root@hadoop01 flink-1.10.1]# bin/stop-cluster.sh

### zookeeper
bin/zkServer.sh start

##关闭所有服务
[root@hadoop01 flink-1.10.1]# stop-all.sh
    数据下沉后为什么会生成多个文件?作业完成后生成的文件数与什么有关?并行度如何设置?优先级?
1 Flink 1.1 简介

批,流一体化的框架
批:离线处理
流:实时处理

tar-xvzf
-x 从档案文件中释放文件。
-v 详细报告tar处理的文件信息。如无此选项,tar不报告文件信息。
-z 用gzip来压缩/解压缩文件,加上该选项后可以将档案文件进行压缩,
但还原时也一定要使用该选项进行解压缩。
-f 使用档案文件或设备,这个选项通常是必选的。

1.2 集群搭建 上传解压

上传Flink 压缩包到指定目录
解压缩flink 到/opt/servers 目录

cd /opt/servers
tar -xvzf flink-1.10.1-bin-scala_2.12.tgz -C ../servers/
修改配置文件

    修改安装目录下conf 文件夹内的flink-conf.yaml 配置文件,指定JobManager

    cd /opt/servers/flink-1.10.1/conf/
    ##创建目录
    mkdir -p /opt/servers/flink-1.10.1/tmp
    

    修改配置文件:flink-conf.yaml

    #配置Master 的机器名(IP 地址)
    
    jobmanager.rpc.address: hadoop01
    
    #配置每个taskmanager 生成的临时文件夹
    
    taskmanager.tmp.dirs: /opt/servers/flink-1.10.1/tmp
    

    修改安装目录下conf 文件夹内的slave 配置文件,指定TaskManager

    hadoop01
    hadoop02
    hadoop03
    

    使用vi 修改/etc/profile 系统环境变量配置文件,添加HADOOP_CONF_DIR 目录

    export HADOOP_CONF_DIR=/opt/servers/hadoop-2.7.7/etc/hadoop
    

    YARN_CONF_DIR 或者HADOOP_CONF_DIR 必须将环境变量设置为读取YARN 和HDFS 配置

    新版本需要增加hadoop的附加组件,下载一个jar包放在Flink的lib目录下

    下载地址:https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/

    分发/etc/profile 到其他两个节点

    scp -r /etc/profile hadoop02:/etc
    scp -r /etc/profile hadoop03:/etc
    

    每个节点重新加载环境变量

    source /etc/profile
    

    将配置好的Flink 目录分发给其他的两台节点

    cd /opt/servers
    scp -r flink-1.10.1/ hadoop02:$PWD
    scp -r flink-1.10.1/ hadoop03:$PWD
    
1.3 启动

启动Flink 集群

 cd /opt/servers/flink-1.10.1
 bin/start-cluster.sh


启动HDFS 集群

start-all.sh
1.4 使用 Flink 测试词频统计

在HDFS 中创建/test/input 目录

 hadoop fs -mkdir -p /test/input

上传wordcount.txt 文件到HDFS /test/input 目录

 hadoop fs -put /root/wordcount.txt /test/input

并运行测试任务

bin/flink run examples/batch/WordCount.jar --input hdfs://hadoop01:8020/test/input/wordcount.txt --output hdfs://hadoop01:8020/test/output/001

不output直接打印控制台

浏览Flink Web UI 界面

http://hadoop01:8081

启动flink集群

 cd /opt/servers/flink-1.10.1
 bin/start-cluster.sh
高可用HA

修改slots
taskmanager.numberOfTaskSlots:
2 2 4

cd /opt/servers/flink-1.10.1/conf/
[root@hadoop03 conf]# vim flink-conf.yaml


2 词频统计 2.1 java
    
        
            org.apache.flink
            flink-core
            1.10.1
        
        
            org.apache.flink
            flink-java
            1.10.1
        
        
            org.apache.flink
            flink-streaming-java_2.12
            1.10.1
        
    
package cn.tedu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //构建批处理的运行环境
        ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        //读取文件
        DataSource source = env.readTextFile("flink_java/data/wordcount.txt");
//        source.print();
        //词频统计                          <输入类型,输出类型>
        FlatMapOperator> wordAndOne = source.flatMap(new FlatMapFunction>() {
            //每来一条数据,调用一次flatmap处理数据
            @Override
            public void flatMap(String value, Collector> out) throws Exception {
                //获取每个词的数组
                String[] words = value.split(" ");
                //循环输出
                for (String word : words) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        });
//        wordAndOne.print();                   (hello,1)(hadoop,1)
        UnsortedGrouping> groupeWords = wordAndOne.groupBy(0);
        AggregateOperator> result = groupeWords.sum(1);
        //数据sink保存
        result.print();
    }
}

2.2 scala


数据下沉后为什么会生成多个文件?

package cn.tedu.batch

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object BatchWordCount {

  def main(args: Array[String]): Unit = {
    //构建Flink批处理的运行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //读取数据
    //val source: DataSet[String] = env.readTextFile("FlinkSCALA/data/wordcount.txt")
    val source: DataSet[String]
    = env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
    //source.print()
    //hello flink kafka flink kafka flink --> hello ,1
    //数据转化
    //需要导入隐式转换
    import org.apache.flink.api.scala._
    val words: DataSet[String] = source.flatMap(_.split(" "))
    val wordAndOne = words.map((_, 1))
    //分组
    val groupedDataSet: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)
    val result = groupedDataSet.sum(1)
    //val result = source.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    //数据下沉
    //result.print()
    result.writeAsText("hdfs://hadoop01:8020/test/output/102")
    //执行批处理,flink中是惰性加载
    env.execute()
  }

}


文件数 根据计算机处理器数生成

打包到服务器


作业完成后生成的文件数与什么有关?

package cn.tedu.batch

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object BatchWordCount {

  def main(args: Array[String]): Unit = {
    //构建Flink批处理的运行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //读取数据
    //val source: DataSet[String] = env.readTextFile("FlinkSCALA/data/wordcount.txt")
    val source: DataSet[String]
    = env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
    //source.print()
    //hello flink kafka flink kafka flink --> hello ,1
    //数据转化
    //需要导入隐式转换
    import org.apache.flink.api.scala._
    val words: DataSet[String] = source.flatMap(_.split(" "))
    val wordAndOne = words.map((_, 1))
    //分组
    val groupedDataSet: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)
    val result = groupedDataSet.sum(1)
    //val result = source.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    //数据下沉
    //result.print()
    result.writeAsText("hdfs://hadoop01:8020/test/output/101",WriteMode.OVERWRITE)
    //执行批处理,flink中是惰性加载
    env.execute()

  }

}

打包
上传服务器

[root@hadoop01 flink-1.10.1]# bin/flink run -m yarn-cluster /opt/data/flink_scala-1.0-SNAPSHOT.jar


在flink界面提交作业


并行度设置为2,submit

并行度



并行度优先级

算子的并行度 > env环境的并行度 > 运行参数的并行度 > 配置文件的默认并行度

2.3 源的读取(scala)
package cn.tedu.batch.source

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}


object BatchFromFile {
  def main(args: Array[String]): Unit = {
    //构建数据源
    val env=ExecutionEnvironment.getExecutionEnvironment

    //1.读取本地文件
    val source1: DataSet[String]= env.readTextFile("flink_scala/data/wordcount.txt")
//    source1.print()

    //2.读取hdfs文件
    val source2: DataSet[String] = env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
//    source2.print()

    //3.读取csv文件
    import org.apache.flink.api.scala._
    val source3: DataSet[Subject] = env.readCsvFile[Subject]("flink_scala/data/subject.csv")
//    source3.print()

    //4.读取压缩文件
    val source4: DataSet[String]= env.readTextFile("flink_scala/data/wordcount.txt.gz")
    source4.print()


  }
  case class Subject(id:Int,subjectName:String)

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761327.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号