// ===== SparkStreamDemo =====
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Word count over a TCP socket text stream, printed once per batch.
 *
 * Batch interval: 3 seconds. Requires a text source listening on
 * 192.168.111.131:7777 (e.g. `nc -lk 7777`).
 */
object SparkStreamDemo {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the socket receiver, one for processing.
    val conf = new SparkConf().setMaster("local[2]").setAppName("sparkdemo1")
    // Streaming context with a 3-second micro-batch interval.
    val streamingContext = new StreamingContext(conf, Seconds(3))
    // Source: lines of text received from the socket.
    val socketLineStream: ReceiverInputDStream[String] =
      streamingContext.socketTextStream("192.168.111.131", 7777)
    // Split each line on runs of whitespace.
    // FIX: "\\s+" — the original "\s+" is an invalid string escape and does not compile;
    // the regex passed to String.split needs a literal backslash.
    val wordStream: DStream[String] = socketLineStream.flatMap(line => line.split("\\s+"))
    val wordCountStream: DStream[(String, Int)] = wordStream.map(x => (x, 1)).reduceByKey(_ + _)
    // Print each batch's counts to stdout.
    wordCountStream.print()
    // Start the receiver and block until the job is terminated.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
// ===== SparkStreamKafkaSource =====
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Stateful word count over a Kafka topic ("sparkKafkaDemo").
 *
 * Running totals are maintained across batches with updateStateByKey,
 * which requires checkpointing to be enabled.
 */
object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkstreamkafkademo").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    // Checkpoint directory is mandatory for updateStateByKey.
    streamingContext.checkpoint("checkpoint")
    // Kafka consumer configuration.
    // FIX: the ConsumerConfig constants are spelled ..._CONFIG (the original
    // ..._ConFIG identifiers do not exist), and the value-deserializer class
    // name had a leading space that would break class loading.
    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams)
    )
    // Stateful count: for each word, add this batch's occurrences (seq)
    // to the running total carried over from previous batches (buffer).
    // FIX: "\\s+" — the original "\s+" is an invalid string escape.
    val value: DStream[(String, Int)] = kafkaStream
      .flatMap(x => x.value().split("\\s+"))
      .map(x => (x, 1))
      .updateStateByKey {
        case (seq, buffer) =>
          // Debug output: per-batch counts and the previous running total.
          println(seq.toList.toString())
          println(buffer.getOrElse(0).toString)
          val sum = buffer.getOrElse(0) + seq.sum
          Option(sum)
      }
    value.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
// ===== SparkStreamKafkaSourceToKafkaSink =====
import java.util
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
/**
 * Reads lines from Kafka topic "mystreamin", splits them into words, and
 * writes one "word,1" record per word to Kafka topic "mystreamout".
 *
 * One KafkaProducer is created per RDD partition (not per record) so that
 * producer construction cost is amortized across the partition's records.
 */
object SparkStreamKafkaSourceToKafkaSink {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkstreamkafkademo").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    streamingContext.checkpoint("checkpoint")
    // Kafka consumer configuration.
    // FIX: constants are ..._CONFIG (the original ..._ConFIG identifiers do
    // not exist) and the value-deserializer class name had a leading space.
    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(Set("mystreamin"), kafkaParams)
      )
    kafkaStream.foreachRDD(
      rdd => {
        // Runs on the driver once per batch.
        rdd.foreachPartition(
          records => {
            // Runs on the executor once per partition: build one producer
            // for the whole partition instead of one per record.
            val props: util.HashMap[String, Object] = new util.HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](props)
            try {
              records.foreach(
                record => {
                  // FIX: "\\s+" — the original "\s+" is an invalid string escape.
                  val words: Array[String] = record.value().split("\\s+")
                  for (word <- words) {
                    producer.send(new ProducerRecord[String, String]("mystreamout", word + ",1"))
                  }
                }
              )
            } finally {
              // FIX: close (and thereby flush) the producer — the original
              // leaked it, risking lost buffered records and resource leaks.
              producer.close()
            }
          }
        )
      }
    )
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
// ===== SparkStreamUserFriendRowToUserFriendSpark =====
import java.util
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Flattens user-friend rows from Kafka topic "user_friends_rows" into
 * one "userId,friendId" record per friend, written to "user_friends_spark".
 *
 * Input row format: "userId,friend1 friend2 friend3 ...".
 */
object SparkStreamUserFriendRowToUserFriendSpark {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkdemo1")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(3))
    sc.checkpoint("checkpoint")
    // Kafka consumer configuration.
    // FIX: constants are ..._CONFIG — the original ..._ConFIG identifiers do not exist.
    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "userRowtouserSpark1",
      // Re-read the topic from the beginning when no committed offset exists.
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      sc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("user_friends_rows"), kafkaParams)
    )
    kafkaStream.foreachRDD(
      rdd => {
        // One producer per partition, not per record.
        rdd.foreachPartition(
          partition => {
            val props = new util.HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            // Example row: "1111,2222 3333 4444 5555"
            val producer = new KafkaProducer[String, String](props)
            try {
              partition.foreach(record => {
                val split: Array[String] = record.value().split(",")
                // Skip malformed rows (missing the friends column).
                if (split.length == 2) {
                  val userId = split(0)
                  // FIX: "\\s+" — the original "\s+" is an invalid string escape.
                  val friends: Array[String] = split(1).split("\\s+")
                  for (friend <- friends) {
                    val content = userId + "," + friend
                    println(content)
                    producer.send(new ProducerRecord[String, String]("user_friends_spark", content))
                  }
                }
              })
            } finally {
              // FIX: close/flush the producer — previously leaked.
              producer.close()
            }
          }
        )
      }
    )
    sc.start()
    sc.awaitTermination()
  }
}
// ===== SparkStreamEventAttendRowToEventAttendSpark =====
import java.util
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Normalizes event-attendance rows from Kafka topic "event_attendess_row"
 * into one "userId,value,status" record per attendee, written to
 * "event_attend_spark".
 *
 * Input row format: "user,yesList,maybeList,invitedList,noList" where each
 * list is whitespace-separated; column position determines the status tag.
 */
object SparkStreamEventAttendRowToEventAttendSpark {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkdemo3")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(3))
    sc.checkpoint("checkpoint")
    // Kafka consumer configuration.
    // FIX: constants are ..._CONFIG — the original ..._ConFIG identifiers do not exist.
    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "EventAttendRowToEventAttendSpark1",
      // Re-read the topic from the beginning when no committed offset exists.
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      sc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("event_attendess_row"), kafkaParams)
    )
    kafkaStream.foreachRDD(
      rdd => {
        // One producer per partition, not per record.
        rdd.foreachPartition(
          partition => {
            val props = new util.HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            // Columns: user, yes, maybe, invited, no
            val producer = new KafkaProducer[String, String](props)
            try {
              partition.foreach(record => {
                val fields: Array[String] = record.value().split(",")
                val userId: String = fields(0)
                for (i <- 1 until fields.length)
                  if (fields(i).trim.length > 0) {
                    // FIX: "\\s+" — the original "\s+" is an invalid string escape.
                    val status: Array[String] = fields(i).trim.split("\\s+")
                    // Column index -> status tag (replaces the original if/else chain).
                    val tag: String = i match {
                      case 1 => "yes"
                      case 2 => "maybe"
                      case 3 => "invited"
                      case 4 => "no"
                      case _ => ""
                    }
                    for (value <- status) {
                      val content = userId + "," + value + "," + tag
                      println(content)
                      producer.send(new ProducerRecord[String, String]("event_attend_spark", content))
                    }
                  }
              })
            } finally {
              // FIX: close/flush the producer — previously leaked.
              producer.close()
            }
          }
        )
      }
    )
    sc.start()
    sc.awaitTermination()
  }
}