栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink流式消费者

flink流式消费者

目录

一.手动模拟传感器数据,并获取数据输出输出

注意:获取流式环境和env.execute()是每一种方法必须的,但是由于我直接在一个作业里写就不重复敲了0.0,核数倒是可不设定

 二.用流式消费者读取本地测试文件

三.读取外部随机数据

注意:有关随机类rand.nextGaussian()方法可参照下列同道中人的博客:搁这,搁这0.0(点前面那个蓝蓝的玩意)

 


一.手动模拟传感器数据,并获取数据输出输出

//下列导包均可在敲完需要的代码后Alt+回车补齐

注意:获取流式环境和env.execute()是每一种方法必须的,但是由于我直接在一个作业里写就不重复敲了0.0,核数倒是可不设定
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import scala.util.Random
case class SensorReading(id: String,timeStamp: Long, temperayure: Double)//创建一个类叫SensorReading,属性有id,时间,以及温度
object 流式消费者2 {
  def main(args: Array[String]): Unit = {
    val env =StreamExecutionEnvironment.getExecutionEnvironment//获取流式环境
    env.setParallelism(1)//只有一核操作
    val dataList=List(
     SensorReading("sesor_1",154771889,35.8),//手动模拟传感器获取数据
      SensorReading("sesor_2",154771889,35.8),
      SensorReading("sesor_3",154771889,35.8),
      SensorReading("sesor_1",154771889,35.8),
      SensorReading("sesor_10",154771889,16.8),


    )
    val stream1=env.fromCollection(dataList)//获取传感器数据
    stream1.print()
    env.execute() //env.execute 是启动Flink作业所必需的,只有在 execute () 被调用时,之前调用的各个操作才会在提交到集群上或本地计算机上执行(嗨呀,说白点就是启动流计算0.0)

输出结果:SensorReading(sesor_1,154771889,35.8)
SensorReading(sesor_2,154771889,35.8)
SensorReading(sesor_3,154771889,35.8)
SensorReading(sesor_1,154771889,35.8)
SensorReading(sesor_10,154771889,16.8)

 二.用流式消费者读取本地测试文件
val inputPath="F:\Scala课堂\src\main\resources\温度测试文件" //本地文件路径
    val stream2=env.readTextFile(inputPath)//获取文件路径
    stream2.print()

输出结果:你文件里有啥就输出啥呗

三.读取外部随机数据

注意:有关随机类rand.nextGaussian()方法可参照下列同道中人的博客:搁这,搁这0.0(点前面那个蓝蓝的玩意)

 
 val stream3=env.addSource(new MySensorSource)//传感器
    stream3.print()
    env.execute()//env.execute 是启动Flink作业所必需的,只有在 execute () 被调用时,之前调用的各个操作才会在提交到集群上或本地计算机上执行(嗨呀,说白点就是启动流计算0.0)
  }

}
class MySensorSource() extends SourceFunction[SensorReading]{ //创建一个类叫MySensorSource,并继承SourceFunction类的所有属性
  var running: Boolean = true
  override def cancel():Unit=running=false //重载cancel方法
  override def run(sourceContext:SourceFunction.SourceContext[SensorReading]):Unit={ //重载run方法
    val rand=new Random()//产生随机数
    var curTemp = 1.to(10).map(i => ("sensor_" + i, rand.nextDouble() * 100))//取1到10遍历输出sensor_+“i”,rand.nextDouble取随机数
    while (running){ //从这里开始死循环
       curTemp=curTemp.map(
        data=>(data._1,data._2+rand.nextGaussian()))//data._1元组
         val curTime=System.currentTimeMillis()//获取时间戳
      curTemp.foreach(//foreach 用于列出集合里的所有元素(也就是遍历,那为啥不用map?. さな 知るか)
        data=>sourceContext.collect(SensorReading(data._1,curTime,data._2))
      )
      Thread.sleep(500)//0.5秒停一次

    }
  }
}

输出结果:SensorReading(sensor_1,1648388279347,93.75563732606835)
SensorReading(sensor_2,1648388279347,12.783324672535295)
SensorReading(sensor_3,1648388279347,97.7441929445475)
SensorReading(sensor_4,1648388279347,68.96699184741615)
SensorReading(sensor_5,1648388279347,61.47358147904229)
SensorReading(sensor_6,1648388279347,24.80700220290054)
SensorReading(sensor_7,1648388279347,50.61305009703232)
SensorReading(sensor_8,1648388279347,86.61766537245579)
SensorReading(sensor_9,1648388279347,11.357894020799383)
SensorReading(sensor_10,1648388279347,68.83351832543248)//你不按挺会一直输出

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/779412.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号