栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

11. flink createRemoteEnvironment

11. flink createRemoteEnvironment

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
意思是将你本地的代码打的jar包,远程提交到已经存在的flink集群上.注意此程序再idea运行的时候,idea上不会有任何输出的.在这种模式下idea就是相当于一个传输所需jar文件的客户端,程序一旦执行之后,就和idea无关了.

1.代码如下:

package com.flink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingJob {
  def main(args: Array[String]) {
//最后一个参数指的是当前代码打的包的路径
    val env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.68.137",8081,"D:\IT\Project\FlinkDemo\target\FlinkDemo-1.0-SNAPSHOT.jar")
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
//下面的输出不会再idea上输出
    counts.print()

    env.execute("Window 333 WordCount")
  }
}

2.在idea运行程序之前需要先maven打包 3.linux 运行: nc -l 9999

自己输入需要统计的单词.

4.在flink集群, ip:8081 查看运行的结果

注:在上面的解说中:我没有提linux flink环境的安装,以及项目的构建,详情参考:安装以及demo

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/751395.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号