
Flink Study Notes (5): Flink Development Environment Setup and a Running Example (Word Count)



I. IntelliJ IDEA Environment Configuration

1. Create a Maven Project

1.1 Development Environment

Maven and the JDK


1.2 pom.xml Configuration

Compiler Configuration

Add to pom.xml:

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

Flink Dependency

Add to pom.xml:

<dependencies>
    <!-- Flink streaming API (Scala) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Flink client, needed to run jobs from the IDE -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Logging backend -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
IntelliJ IDEA will then resolve and download the dependencies automatically.


1.3 Log Configuration

log4j.properties

Create a log4j.properties file under resources and add the following (the content of log4j-console.properties from the conf directory of a Flink installation on Linux):

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

1.4 Add Scala Support

Create a scala directory under main and mark it as a Source Root.

Then add Scala framework support to the module.


II. Example: Flink Word Count Running Locally

Code:

package demo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val host = "node110"
    val port = 9999
    val windowSeconds = 5

    //get env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //create data source
    val source = env.socketTextStream(host, port)
    val counts = source
      .flatMap { line => line.toLowerCase.split("\\W+").filter(word => word.nonEmpty) }
      .map { word => (word, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(windowSeconds))
      .sum(1)
    //add sink
    counts.print()
    //execute
    env.execute("Window Stream Word Count with parameters")
  }
}
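The flatMap step does the heavy lifting; its tokenization can be sanity-checked outside Flink. A minimal plain-Scala sketch (TokenizeDemo is a hypothetical helper, not part of the job):

```scala
object TokenizeDemo {
  // Same logic as the flatMap above: lowercase, split on runs of
  // non-word characters, and drop the empty tokens that split produces.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").toSeq.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(tokenize("Hello, Flink! Hello world").mkString(" "))
  }
}
```

Punctuation and case differences collapse into the same word, which is why "Hello," and "Hello" count together in the windowed sums.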

Test:

On Linux, run nc -lk 9999, then type words into that session; the windowed counts are printed in the IDEA console every 5 seconds.

III. Running on a Flink Cluster

3.1 Code Changes

Replace the hardcoded host, port, and window size with command-line arguments:
if(args.length != 3){
  println("Usage: WindowWordCount <host> <port> <windowSeconds>")
  System.exit(1)
}
val host = args(0)
val port = args(1).toInt
val windowSeconds = args(2).toInt
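The argument handling above can be factored into a small helper that is easy to test; a plain-Scala sketch (parseArgs is hypothetical, not in the original job):

```scala
object ArgsDemo {
  // Mirrors section 3.1: expect exactly <host> <port> <windowSeconds>.
  // Returns None on wrong arity or non-numeric values instead of exiting.
  def parseArgs(args: Array[String]): Option[(String, Int, Int)] =
    if (args.length != 3) None
    else
      try Some((args(0), args(1).toInt, args(2).toInt))
      catch { case _: NumberFormatException => None }

  def main(args: Array[String]): Unit =
    println(parseArgs(Array("node110", "9999", "5")))
}
```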
3.2 Package and Upload the Program

3.3 Run

First open a session and start the data source with nc -lk 9999 (start it before submitting the job; otherwise the job cannot connect to the socket and aborts with a long stack trace).

Then open a second session and submit the job.
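Submitting from the second session looks roughly like this (the jar name and path are assumptions about what Maven produced; the class name and arguments come from the code above):

```
# Submit the packaged job to the cluster; args: <host> <port> <windowSeconds>
flink run -c demo.WindowWordCount target/flink_test-1.0-SNAPSHOT.jar node110 9999 5
```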


Enter data in the nc session:

Check the job and its output in the Flink web UI:

IV. DataSet API Implementation (Word Count)

Data file input.txt:

I am a student
I love the world

Code:

package demo

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    //get env
    val env = ExecutionEnvironment.getExecutionEnvironment
    //create data source
    val source = env.readTextFile("D:\\java test\\flink_test\\src\\main\\resources\\input.txt")
    val counts = source
      .flatMap { line => line.toLowerCase.split("\\W+").filter(word => word.nonEmpty) }
      .map { word => (word, 1) }
      .groupBy(0)
      .sum(1)
    //add sink
    counts.writeAsText("D:\\java test\\flink_test\\target\\output00")
    //execute
    env.execute("Batch Word Count")
  }
}
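The grouping-and-sum logic can be reproduced in plain Scala to check the expected output for input.txt (LocalWordCount is a hypothetical stand-in, not the Flink job):

```scala
object LocalWordCount {
  // Same pipeline as the DataSet job, run on in-memory data:
  // tokenize each line, group by word, count occurrences.
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .groupBy(identity)
      .map { case (word, occs) => (word, occs.size) }

  def main(args: Array[String]): Unit = {
    val result = count(Seq("I am a student", "I love the world"))
    // Print sorted (word, count) pairs, like Flink's tuple output
    result.toSeq.sortBy(_._1).foreach { case (w, c) => println(s"($w,$c)") }
  }
}
```

For the two input lines, "i" should appear with count 2 and every other word with count 1.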

Result: (word, count) pairs are written to the output directory; "i" appears twice and every other word once.

Reproduced from www.mshxw.com: https://www.mshxw.com/it/336894.html