UpdateStateBykey会统计全局的key的状态,不管有没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateBykey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数操作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会删除(state可以是任意类型的数据结构)。
适用场景:
UpdataStateBykey可以用来统计历史数据,每次输出所有的key值。列如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的返回量等指标。
适用实例条件:
- 首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加。
- updataStateByKey要求必须设置checkpoint点(设置中间结果文件夹)
- updataStateByKey方法中updataFunc就要传入的参数,Seq[V]表示当前key对应的所有值,Option[S]是当前key的历史状态,返回的是新的封装的数据。
def main(args: Array[String]): Unit = {
// Logger.getLogger("org").setLevel(Level.WARN)
val conf = new SparkConf()
.setAppName("Test")
.setMaster("local[*]")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("D:\mydata\checkpoint\test")
//必须转换成可迭代类型
val topics = Array(Test_topic)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> bootstrapServers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
// val pattern = new Regex("[0-9]+")
val pattern = "\d+".r
// val str = "44: this is a test_topic 44 kafka data"
// println((pattern findAllIn str).mkString(","))
val wordCountDS: DStream[(String, Int)] = kafkaDStream
.map(record => (pattern findAllIn record.value).next())
.map(w => (w, 1))
.reduceByKey((a, b) => a + b)
.updateStateByKey(update)
.transform(rdd => {
rdd.sortBy(t => t._2, ascending = false)
})
wordCountDS.print()
ssc.start()
ssc.awaitTermination()
}
//new_state=("hello",1),old_state=("hello",3)
//newValue=(1),oldstate=(3)
def update(newValue: Seq[Int], oldstate: Option[Int]): Option[Int] = {
val oldCount = oldstate.getOrElse(0) //oldCount(3)
println(s"updateStateByKey: ${System.currentTimeMillis()}")
Some(oldCount + newValue.sum) //newstage(4)
}
mapWithState(基于磁盘存储+缓存)
mapWithState也是用于对于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,类型于增量的感觉。
使用场景
mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里余额信息。
适用实例条件:
- 如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值
- 还可以指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在fun中实现,必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,则会报错。3. checkpoint不会必须的
def main(args: Array[String]): Unit = {
//1、创建sparkConf
val sparkConf = new SparkConf().setAppName("scala wordcount").setMaster("local[*]")
//2、创建SparkContext
val sc = new SparkContext(sparkConf)
//3、创建StreamingContext
val ssc = new StreamingContext(sc, Seconds(5))
//4、设置checkpoint地址
ssc.checkpoint("D:\mydata\checkpoint\test")
//5、必须转换成可迭代类型
val topics = Array(Test_topic)
//6、消费者相关参数配置
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_ConFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_ConFIG -> groupId,
ConsumerConfig.AUTO_OFFSET_RESET_ConFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_ConFIG -> (false: java.lang.Boolean),
ConsumerConfig.KEY_DESERIALIZER_CLASS_ConFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_ConFIG -> classOf[StringDeserializer]
)
//7、通过KafkaUtils.createDirectStream对接kafka
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
//8、获取topic中的数据
val topicData: DStream[String] = kafkaDStream.map(_.value())
//9、切分每一行,每个单词计为1
val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_, 1))
//10、统计相同单词
val wordCount: DStream[(String, Int)] = wordAndOne.reduceByKey((a, b) => a + b)
//11、统计变更数据
val mapWithStateDStream: MapWithStateDStream[String, Int, Int, (String, Int)] = wordCount.mapWithState(StateSpec.function(mappingFunc))
//12、打印预览(MappedType)结果
//mapWithStateDStream.print()
//
mapWithStateDStream.foreachRDD(rdd => {
rdd.foreachPartition(p => {
//
p.foreach(line => {
println(s"结果: ${line}")
})
//
})
})
// //当前状态的键值快照
// val stateSnapshots: DStream[(String, Int)] = wordCount.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots()
//
// stateSnapshots.print()
//13、开启计算
ssc.start()
ssc.awaitTermination()
}
// word="hello",value=1,state=("hello",3)
val mappingFunc = (word: String, value: Option[Int], state: State[Int]) => {
val sum = value.getOrElse(0) + state.getOption.getOrElse(0) //sum = 4
// val output: String = s"${word} -> ${sum}" //("hello-> 4")
val output: (String, Int) = (word, sum) //("hello-> 4")
state.update(sum) //更新key="hello"的状态为4
println(s"mapWithState: ${System.currentTimeMillis()}")
output //返回("hello",4)
}
区别:
updataeStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。
mapWithState只返回变化后的key的值,这样做的好处是,我们可以只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key 的数据。这样的话,即使数据量很大,checkpint也不会updateBykey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。
详细使用:https://www.jianshu.com/p/a54b142067e5
map和mapPartition的区别:map是对RDD的每一个元素使用一个方法操作,mapPartitions是对每个partition的迭代器使用一个方法操作。
MapPartitions的优点:使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。通常体现在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。SparkSql或Dataframe默认会对程序进行mapPartition的优化。
MapPartitions的缺点:如果是普通的map操作,一次function的执行就处理一条数据,可以将已经处理完的数据从内存里面释放掉。
所以说普通的map操作通常不会导致内存的OOM异常。 但是MapPartitions操作,对于大量数据来说,如果直接将迭代器中数据取出来放内存,可能就OOM(内存溢出)。
foreachPartition是spark-core的action算子,foreachPartition是对每个partition中的iterator分别处理,通过将iterator传入function对进行数据的处理,也就是说在foreachPartition中函数处理的是分区迭代器,而非具体的数据,源码中的注释是:Applies a function func to each parition of this RDD.(将函数func应用于此RDD的每个分区)
foreach:foreach也是spark-core的action算子,与foreachPartition类似的是,foreach也是对每个partition中的iterator分别处理,通过对每个iterator迭代获取数据传给function进行数据的处理,也就是说在foreach中函数处理的是具体的数据,源码中的注释是:Applies a function fun to all elements of this RDD.(将函数func用于此RDD的所有元素).
foreachRDD与上面两个的区别:foreachRDD是sparkStreaming的OutputOperation算子。但是foreachRDD并不会触发立即处理,必须在碰到sparkcore的foreach或者foreachPartition算子后,才会触发action动作。同时要注意,function的应用在的driver端进行,而不是Executor端进行。
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
//隐式转换(spark自适应)
import spark.implicits._
val sc = spark.sparkContext
val sourceRDD: RDD[String] = sc.parallelize(List[String](
"hello", "hello", "hello", "spark", "spark"),
2)
//groupBy指定键来标示一个数据,便于之后的聚合
val groupRDD: RDD[(String, Iterable[String])] = sourceRDD.groupBy(x => x)
val mapPartRDD = groupRDD.mapPartitions(iterable1 => {
iterable1.map {
case (word, words) => {
(word, words.size)
}
}
})
mapPartRDD.foreach(println)
val mapRDD = groupRDD.map {
case (word, iterable) => {
(word, iterable.size)
}
}
mapRDD.foreachPartition(iterable => {
iterable.foreach(println)
})
spark.stop()
}
GroupByKey和ReduceByKey的区别:
GroupByKey(numPartitions=None)
官方解释:将 RDD 中每个键的值分组为一个序列。 使用 numPartitions 分区对生成的 RDD 进行哈希分区。 注意:如果您分组是为了对每个键执行聚合(例如求和或平均值),则使用 reduceByKey 或 aggregateByKey 将提供更好的性能。
– GroupByKey 只是将相同键的给归纳到一个序列,没有其它函数操作 ReduceByKey(func, numPartitions=None)官方解释:使用关联归约函数合并每个键的值。 这也将在结果发送到reduce端之前在每个映射器上本地执行合并, 输出将使numPartitions 分区进行哈希分区,如果未指定 numPartitions,则使用默认并行级别。
– ReduceByKey shuffle前将键相同的值局部函数聚合,减轻shuffle时的io压力,shuffle后在各自分区继续函数聚合。 aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])官方解释:zeroValue是给每个分区的初始值(相当于对应键的一个初始元素),map端每个分区数据都会使用第一个函数seqOp进行聚合操作,
shuffle后分区使用第二个函数combOp进行操作,并且最终返回值类型与初始值类型一样(和原有数据类型可以不一致)。
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
//隐式转换(spark自适应)
import spark.implicits._
val sc = spark.sparkContext
val sourceRDD: RDD[String] = sc.parallelize(List[String](
"hello", "hello", "hello", "spark", "spark"),
2)
val wordPairRDD: RDD[(String, Int)] = sourceRDD.map((_, 1))
println("=" * 50)
//相同键放在同一分区,值合并为一个序列
val groupByRDD: RDD[(String, Iterable[Int])] = wordPairRDD.groupByKey()
groupByRDD.foreach(println)
println("=" * 50)
//对各键的序列进行聚合操作
groupByRDD.map(pair =>{
val iterablevalue: Iterable[Int] = pair._2
var sum = 0
iterablevalue.foreach(num => sum += num)
(pair._1,sum)
}).foreach(println)
println("=" * 50)
//直接使用函数在shuffle前聚合一遍,减轻IO压力,shuffle后各分区在聚合一遍
val reduceByKeyRDD = wordPairRDD.reduceByKey(_ + _)
reduceByKeyRDD.foreach(println)
println("=" * 50)
//查看数据所在分区
wordPairRDD.mapPartitionsWithIndex((index, ite) => {
ite.map((index, _))
}).foreach(println)
println("=" * 50)
//分区数量
println("partitions: " + wordPairRDD.partitions.length)
println("=" * 50)
//shuffle前分区的聚合函数
val sepOP = (a: String, b: Int) => {
a + "|" + b
}
//shuffle后分区的聚合函数
val combOP = (a: String, b: String) => {
a + "-Shufix-" + b
}
//柯里化func(x)(y)
//zeroValue给每个分区不同键的初始值
//可以根据初始值类型来返回最终聚合类型
val aggregateByKeyRDD: RDD[(String, String)] = wordPairRDD.aggregateByKey("Prefix")(sepOP, combOP)
aggregateByKeyRDD.foreach(println)
spark.stop()
}



