栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink开发IDEA环境搭建与测试的方法

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink开发IDEA环境搭建与测试的方法

一.IDEA开发环境

1.pom文件设置


    1.8
    1.8
    UTF-8
    2.11.12
    2.11
    2.7.6
    1.6.1
  
  
    
      org.scala-lang
      scala-library
      ${scala.version}
    
    
      org.apache.flink
      flink-java
      ${flink.version}
    
    
      org.apache.flink
      flink-streaming-java_${scala.binary.version}
      ${flink.version}
    
    
      org.apache.flink
      flink-scala_${scala.binary.version}
      ${flink.version}
    
    
      org.apache.flink
      flink-streaming-scala_${scala.binary.version}
      ${flink.version}
    
    
      org.apache.flink
      flink-table_${scala.binary.version}
      ${flink.version}
    
    
      org.apache.flink
      flink-clients_${scala.binary.version}
      ${flink.version}
    
    
      org.apache.flink
      flink-connector-kafka-0.10_${scala.binary.version}
      ${flink.version}
    
    
      org.apache.hadoop
      hadoop-client
      ${hadoop.version}
    
    
      mysql
      mysql-connector-java
      5.1.38
    
    
      com.alibaba
      fastjson
      1.2.22
    
  
  
    src/main/scala
    src/test/scala
    
      
 net.alchim31.maven
 scala-maven-plugin
 3.2.0
 
   
     
compile
testCompile
     
     

  
  -dependencyfile
  ${project.build.directory}/.scala_dependencies

     
   
 
      
      
 org.apache.maven.plugins
 maven-surefire-plugin
 2.18.1
 
   false
   true
   
     ***Suite.*
   
 
      
      
 org.apache.maven.plugins
 maven-shade-plugin
 3.0.0
 
   
     package
     
shade
     
     

  
    *:*
    
      meta-INF
object SocketWindowWordCountScala {
 def main(args: Array[String]) : Unit = {
  // 定义一个数据类型保存单词出现的次数
  case class WordWithCount(word: String, count: Long)
  // port 表示需要连接的端口
  val port: Int = try {
   ParameterTool.fromArgs(args).getInt("port")
  } catch {
   case e: Exception => {
    System.err.println("No port specified. Please run 'SocketWindowWordCount --port '")
    return
   }
  }
  // 获取运行环境
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // 连接此socket获取输入数据
  val text = env.socketTextStream("node21", port, 'n')
  //需要加上这一行隐式转换 否则在调用flatmap方法的时候会报错
  import org.apache.flink.api.scala._
  // 解析数据, 分组, 窗口化, 并且聚合求SUM
  val windowCounts = text
   .flatMap { w => w.split("\s") }
   .map { w => WordWithCount(w, 1) }
   .keyBy("word")
   .timeWindow(Time.seconds(5), Time.seconds(1))
   .sum("count")
  // 打印输出并设置使用一个并行度
  windowCounts.print().setParallelism(1)
  env.execute("Socket Window WordCount")
 }
}

2.Java代码

package com.xyg.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class StreamingWindowWordCountJava {
  public static void main(String[] args) throws Exception {
  //定义socket的端口号
  int port;
  try{
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    port = parameterTool.getInt("port");
  }catch (Exception e){
    System.err.println("没有指定port参数,使用默认值9000");
    port = 9000;
  }
  //获取运行环境
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  //连接socket获取输入的数据
  DataStreamSource text = env.socketTextStream("node21", port, "n");
  //计算数据
  DataStream windowCount = text.flatMap(new FlatMapFunction() {
    public void flatMap(String value, Collector out) throws Exception {
      String[] splits = value.split("\s");
      for (String word:splits) {
 out.collect(new WordWithCount(word,1L));
      }
    }
  })//打平操作,把每行的单词转为类型的数据
      //针对相同的word数据进行分组
      .keyBy("word")
      //指定计算数据的窗口大小和滑动窗口大小
      .timeWindow(Time.seconds(2),Time.seconds(1))
      .sum("count");
  //把数据打印到控制台,使用一个并行度
  windowCount.print().setParallelism(1);
  //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
  env.execute("streaming word count");
}

  
  public static class WordWithCount{
    public String word;
    public long count;
    public WordWithCount(){}
    public WordWithCount(String word, long count) {
      this.word = word;
      this.count = count;
    }

    @Override
    public String toString() {
      return "WordWithCount{" +
   "word='" + word + ''' +
   ", count=" + count +
   '}';
    }
  }

}

3.运行测试

首先,使用nc命令启动一个本地监听,命令是:

[admin@node21 ~]$ nc -l 9000

通过netstat命令观察9000端口。netstat -anlp | grep 9000,启动监听如果报错:-bash: nc: command not found,请先安装nc,在线安装命令:yum -y install nc。

然后,IDEA上运行flink官方案例程序

node21上输入

IDEA控制台输出如下

4.集群测试

这里单机测试官方案例

[admin@node21 flink-1.6.1]$ pwd
/opt/flink-1.6.1
[admin@node21 flink-1.6.1]$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node21.
Starting taskexecutor daemon on host node21.
[admin@node21 flink-1.6.1]$ jps
StandaloneSessionClusterEntrypoint
TaskManagerRunner
Jps
[admin@node21 flink-1.6.1]$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

程序连接到套接字并等待输入。您可以检查Web界面以验证作业是否按预期运行:

单词在5秒的时间窗口(处理时间,翻滚窗口)中计算并打印到stdout。监视TaskManager的输出文件并写入一些文本nc(输入在点击后逐行发送到Flink):

三.使用IDEA开发离线程序

Dataset是flink的常用程序,数据集通过source进行初始化,例如读取文件或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)将数据集转成,然后通过sink进行存储,既可以写入hdfs这种分布式文件系统,也可以打印控制台,flink可以有很多种运行方式,如local、flink集群、yarn等.

1. scala程序

package com.xyg.batch

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._


object WordCountScala{
 def main(args: Array[String]) {
  //初始化环境
  val env = ExecutionEnvironment.getExecutionEnvironment
  //从字符串中加载数据
  val text = env.fromElements(
   "Who's there?",
   "I think I hear them. Stand, ho! Who's there?")
  //分割字符串、汇总tuple、按照key进行分组、统计分组后word个数
  val counts = text.flatMap { _.toLowerCase.split("\W+") filter { _.nonEmpty } }
   .map { (_, 1) }
   .groupBy(0)
   .sum(1)
  //打印
  counts.print()
 }
}

2. java程序

package com.xyg.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class WordCountJava {
  public static void main(String[] args) throws Exception {
    //构建环境
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //通过字符串构建数据集
    DataSet text = env.fromElements(
 "Who's there?",
 "I think I hear them. Stand, ho! Who's there?");
    //分割字符串、按照key进行分组、统计相同的key个数
    DataSet> wordCounts = text
 .flatMap(new LineSplitter())
 .groupBy(0)
 .sum(1);
    //打印
    wordCounts.print();
  }
  //分割字符串的方法
  public static class LineSplitter implements FlatMapFunction> {
    @Override
    public void flatMap(String line, Collector> out) {
      for (String word : line.split(" ")) {
 out.collect(new Tuple2(word, 1));
      }
    }
  }
}

3.运行

到此这篇关于Flink开发IDEA环境搭建与测试的方法的文章就介绍到这了,更多相关Flink IDEA环境搭建 内容请搜索考高分网以前的文章或继续浏览下面的相关文章希望大家以后多多支持考高分网!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/134603.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号