目录
一.手动模拟传感器数据,并输出获取到的数据
注意:获取流式环境和调用env.execute()是每一种方法都必需的,但由于我直接写在同一个作业里,就不重复敲了0.0;并行度(核数)倒是可以不设定
二.用流式消费者读取本地测试文件
三.读取外部随机数据
注意:有关随机类Random的nextGaussian()方法(返回均值为0、标准差为1的正态分布随机数),可参照下列同道中人的博客:搁这,搁这0.0(点前面那个蓝蓝的链接)
一.手动模拟传感器数据,并输出获取到的数据
//下列导包均可在敲完需要的代码后Alt+回车补齐
注意:获取流式环境和调用env.execute()是每一种方法都必需的,但由于我直接写在同一个作业里,就不重复敲了0.0;并行度(核数)倒是可以不设定
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import scala.util.Random
/**
 * One sensor sample.
 *
 * @param id          sensor identifier, e.g. "sensor_1"
 * @param timeStamp   timestamp of the sample
 * @param temperayure measured temperature
 *                    (NOTE(review): the field name is a misspelling of "temperature";
 *                    it is kept unchanged for compatibility with existing callers)
 */
case class SensorReading(
  id: String,
  timeStamp: Long,
  temperayure: Double
)
object 流式消费者2 {

  /**
   * Builds and runs three small Flink streaming demos over sensor data:
   *
   *  1. a bounded stream built from an in-memory `List` of [[SensorReading]]s,
   *  2. a stream of text lines read from a local file,
   *  3. an unbounded stream of random readings produced by [[MySensorSource]].
   *
   * Every demo needs the streaming environment and an `env.execute()` call; here the demos
   * share a single environment instead of repeating that boilerplate.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment // obtain the streaming environment
    env.setParallelism(1) // run every operator with parallelism 1 (optional for these demos)

    // ---- Section 1: manually simulated sensor readings, printed as they are emitted ----
    // NOTE(review): the ids here are spelled "sesor_N", while MySensorSource emits "sensor_N".
    val dataList = List(
      SensorReading("sesor_1", 154771889, 35.8),
      SensorReading("sesor_2", 154771889, 35.8),
      SensorReading("sesor_3", 154771889, 35.8),
      SensorReading("sesor_1", 154771889, 35.8),
      SensorReading("sesor_10", 154771889, 16.8)
    )
    val stream1 = env.fromCollection(dataList) // turn the in-memory list into a DataStream
    stream1.print()
    // Nothing runs until execute() is called: it submits the operators declared so far as one
    // job, either locally or to a cluster (in short, it starts the stream computation).
    env.execute()
    // Output:
    //   SensorReading(sesor_1,154771889,35.8)
    //   SensorReading(sesor_2,154771889,35.8)
    //   SensorReading(sesor_3,154771889,35.8)
    //   SensorReading(sesor_1,154771889,35.8)
    //   SensorReading(sesor_10,154771889,16.8)

    // ---- Section 2: read a local test file with a streaming source ----
    // A triple-quoted string keeps the Windows back-slashes literally. In a plain "..." literal,
    // `\r` would turn into a carriage return and `\S` / `\m` are invalid escapes (compile error).
    val inputPath = """F:\Scala课堂\src\main\resources\温度测试文件""" // local file path
    val stream2 = env.readTextFile(inputPath) // reads the file line by line, one String per line
    stream2.print()
    // Output: whatever the file contains, one element per line.

    // ---- Section 3: random data from a custom source ----
    // MySensorSource perturbs temperatures with Random.nextGaussian(), which returns normally
    // distributed values with mean 0 and standard deviation 1 (see the blog posts linked in
    // the original article for details).
    val stream3 = env.addSource(new MySensorSource) // custom simulated sensor source
    stream3.print()
    // Submits the pipelines declared after the first execute(). MySensorSource never finishes
    // on its own, so this job keeps printing until it is cancelled.
    // NOTE(review): whether section 1 is submitted again here depends on whether this Flink
    // version clears already-executed transformations on execute() — confirm.
    env.execute()
  }
}
/**
 * A never-ending Flink source that simulates temperature sensors.
 *
 * On start, each sensor gets a random initial temperature in [0, 100). Then, every
 * `intervalMs` milliseconds, Gaussian noise (`Random.nextGaussian()`: mean 0, standard
 * deviation 1) is added to every sensor. One [[SensorReading]] per sensor is emitted each
 * round, all stamped with the same `System.currentTimeMillis()` value.
 *
 * @param sensorCount number of simulated sensors, named `sensor_1` .. `sensor_<sensorCount>` (default 10)
 * @param intervalMs  pause between two emission rounds, in milliseconds (default 500)
 */
class MySensorSource(sensorCount: Int = 10, intervalMs: Long = 500L) extends SourceFunction[SensorReading] {
  require(sensorCount >= 0, s"sensorCount must be >= 0, got $sensorCount")
  require(intervalMs >= 0, s"intervalMs must be >= 0, got $intervalMs") // Thread.sleep rejects negatives

  // Flink calls cancel() from a different thread than run(). @volatile guarantees that the loop
  // in run() observes the update; without it, the source may never stop.
  @volatile var running: Boolean = true

  /** Asks the emission loop in `run` to stop after its current round. */
  override def cancel(): Unit = running = false

  /**
   * Emits readings until [[cancel]] is called.
   *
   * @param sourceContext Flink context used to emit each reading downstream
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // (sensorId, currentTemperature) for every simulated sensor
    var curTemp = (1 to sensorCount).map(i => ("sensor_" + i, rand.nextDouble() * 100))
    while (running) {
      // random walk: perturb each temperature by standard-normal noise
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }
      val curTime = System.currentTimeMillis()
      // foreach rather than map: only the side effect of emitting is needed, not a new collection
      curTemp.foreach { case (id, temp) => sourceContext.collect(SensorReading(id, curTime, temp)) }
      Thread.sleep(intervalMs)
    }
  }
}
输出结果:SensorReading(sensor_1,1648388279347,93.75563732606835)
SensorReading(sensor_2,1648388279347,12.783324672535295)
SensorReading(sensor_3,1648388279347,97.7441929445475)
SensorReading(sensor_4,1648388279347,68.96699184741615)
SensorReading(sensor_5,1648388279347,61.47358147904229)
SensorReading(sensor_6,1648388279347,24.80700220290054)
SensorReading(sensor_7,1648388279347,50.61305009703232)
SensorReading(sensor_8,1648388279347,86.61766537245579)
SensorReading(sensor_9,1648388279347,11.357894020799383)
SensorReading(sensor_10,1648388279347,68.83351832543248)//你不按停止键就会一直输出
输出结果:SensorReading(sesor_1,154771889,35.8)
SensorReading(sesor_2,154771889,35.8)
SensorReading(sesor_3,154771889,35.8)
SensorReading(sesor_1,154771889,35.8)
SensorReading(sesor_10,154771889,16.8)
// Windows path in a triple-quoted string so the back-slashes are kept literally. In a plain
// "..." literal, `\r` becomes a carriage return and `\S` / `\m` are invalid escapes (compile error).
val inputPath = """F:\Scala课堂\src\main\resources\温度测试文件""" // local file path
val stream2 = env.readTextFile(inputPath) // reads the file line by line, one String per line
stream2.print()
输出结果:你文件里有啥就输出啥呗
三.读取外部随机数据
注意:有关随机类Random的nextGaussian()方法(返回均值为0、标准差为1的正态分布随机数),可参照下列同道中人的博客:搁这,搁这0.0(点前面那个蓝蓝的链接)
// NOTE(review): this fragment repeats the end of section 三 shown earlier in the document.
val stream3=env.addSource(new MySensorSource)// simulated sensor source (custom SourceFunction)
stream3.print()
env.execute()// Starts the Flink job: the operators declared above only run once execute() is called, locally or on a cluster.
} // NOTE(review): this brace and the next close a main/object whose opening lines are not part of this fragment
}
/**
 * A never-ending Flink source that simulates temperature sensors.
 *
 * On start, each sensor gets a random initial temperature in [0, 100). Then, every
 * `intervalMs` milliseconds, Gaussian noise (`Random.nextGaussian()`: mean 0, standard
 * deviation 1) is added to every sensor. One [[SensorReading]] per sensor is emitted each
 * round, all stamped with the same `System.currentTimeMillis()` value.
 *
 * @param sensorCount number of simulated sensors, named `sensor_1` .. `sensor_<sensorCount>` (default 10)
 * @param intervalMs  pause between two emission rounds, in milliseconds (default 500)
 */
class MySensorSource(sensorCount: Int = 10, intervalMs: Long = 500L) extends SourceFunction[SensorReading] {
  require(sensorCount >= 0, s"sensorCount must be >= 0, got $sensorCount")
  require(intervalMs >= 0, s"intervalMs must be >= 0, got $intervalMs") // Thread.sleep rejects negatives

  // Flink calls cancel() from a different thread than run(). @volatile guarantees that the loop
  // in run() observes the update; without it, the source may never stop.
  @volatile var running: Boolean = true

  /** Asks the emission loop in `run` to stop after its current round. */
  override def cancel(): Unit = running = false

  /**
   * Emits readings until [[cancel]] is called.
   *
   * @param sourceContext Flink context used to emit each reading downstream
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // (sensorId, currentTemperature) for every simulated sensor
    var curTemp = (1 to sensorCount).map(i => ("sensor_" + i, rand.nextDouble() * 100))
    while (running) {
      // random walk: perturb each temperature by standard-normal noise
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }
      val curTime = System.currentTimeMillis()
      // foreach rather than map: only the side effect of emitting is needed, not a new collection
      curTemp.foreach { case (id, temp) => sourceContext.collect(SensorReading(id, curTime, temp)) }
      Thread.sleep(intervalMs)
    }
  }
}
输出结果:SensorReading(sensor_1,1648388279347,93.75563732606835)
SensorReading(sensor_2,1648388279347,12.783324672535295)
SensorReading(sensor_3,1648388279347,97.7441929445475)
SensorReading(sensor_4,1648388279347,68.96699184741615)
SensorReading(sensor_5,1648388279347,61.47358147904229)
SensorReading(sensor_6,1648388279347,24.80700220290054)
SensorReading(sensor_7,1648388279347,50.61305009703232)
SensorReading(sensor_8,1648388279347,86.61766537245579)
SensorReading(sensor_9,1648388279347,11.357894020799383)
SensorReading(sensor_10,1648388279347,68.83351832543248)//你不按停止键就会一直输出
// NOTE(review): this fragment repeats the end of section 三 shown earlier in the document.
val stream3=env.addSource(new MySensorSource)// simulated sensor source (custom SourceFunction)
stream3.print()
env.execute()// Starts the Flink job: the operators declared above only run once execute() is called, locally or on a cluster.
} // NOTE(review): this brace and the next close a main/object whose opening lines are not part of this fragment
}
/**
 * A never-ending Flink source that simulates temperature sensors.
 *
 * On start, each sensor gets a random initial temperature in [0, 100). Then, every
 * `intervalMs` milliseconds, Gaussian noise (`Random.nextGaussian()`: mean 0, standard
 * deviation 1) is added to every sensor. One [[SensorReading]] per sensor is emitted each
 * round, all stamped with the same `System.currentTimeMillis()` value.
 *
 * @param sensorCount number of simulated sensors, named `sensor_1` .. `sensor_<sensorCount>` (default 10)
 * @param intervalMs  pause between two emission rounds, in milliseconds (default 500)
 */
class MySensorSource(sensorCount: Int = 10, intervalMs: Long = 500L) extends SourceFunction[SensorReading] {
  require(sensorCount >= 0, s"sensorCount must be >= 0, got $sensorCount")
  require(intervalMs >= 0, s"intervalMs must be >= 0, got $intervalMs") // Thread.sleep rejects negatives

  // Flink calls cancel() from a different thread than run(). @volatile guarantees that the loop
  // in run() observes the update; without it, the source may never stop.
  @volatile var running: Boolean = true

  /** Asks the emission loop in `run` to stop after its current round. */
  override def cancel(): Unit = running = false

  /**
   * Emits readings until [[cancel]] is called.
   *
   * @param sourceContext Flink context used to emit each reading downstream
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // (sensorId, currentTemperature) for every simulated sensor
    var curTemp = (1 to sensorCount).map(i => ("sensor_" + i, rand.nextDouble() * 100))
    while (running) {
      // random walk: perturb each temperature by standard-normal noise
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }
      val curTime = System.currentTimeMillis()
      // foreach rather than map: only the side effect of emitting is needed, not a new collection
      curTemp.foreach { case (id, temp) => sourceContext.collect(SensorReading(id, curTime, temp)) }
      Thread.sleep(intervalMs)
    }
  }
}
输出结果:SensorReading(sensor_1,1648388279347,93.75563732606835)
SensorReading(sensor_2,1648388279347,12.783324672535295)
SensorReading(sensor_3,1648388279347,97.7441929445475)
SensorReading(sensor_4,1648388279347,68.96699184741615)
SensorReading(sensor_5,1648388279347,61.47358147904229)
SensorReading(sensor_6,1648388279347,24.80700220290054)
SensorReading(sensor_7,1648388279347,50.61305009703232)
SensorReading(sensor_8,1648388279347,86.61766537245579)
SensorReading(sensor_9,1648388279347,11.357894020799383)
SensorReading(sensor_10,1648388279347,68.83351832543248)//你不按停止键就会一直输出
输出结果:SensorReading(sensor_1,1648388279347,93.75563732606835)
SensorReading(sensor_2,1648388279347,12.783324672535295)
SensorReading(sensor_3,1648388279347,97.7441929445475)
SensorReading(sensor_4,1648388279347,68.96699184741615)
SensorReading(sensor_5,1648388279347,61.47358147904229)
SensorReading(sensor_6,1648388279347,24.80700220290054)
SensorReading(sensor_7,1648388279347,50.61305009703232)
SensorReading(sensor_8,1648388279347,86.61766537245579)
SensorReading(sensor_9,1648388279347,11.357894020799383)
SensorReading(sensor_10,1648388279347,68.83351832543248)//你不按停止键就会一直输出



