Step 1: import the Maven dependencies. Below is my pom file; just add whatever you are missing, there is not much else to say about it.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>mydatahandler</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>mydatahandler</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>2.0.0</version>
      <exclusions>
        <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>*</artifactId></exclusion>
      </exclusions>
    </dependency>
    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.3.4</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.4</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.3.4</version></dependency>
    <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.6.6</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.3.4</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.0</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase</artifactId><version>1.2.0</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>1.2.0</version></dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin>
        <plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin>
        <plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin>
        <plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin>
        <plugin><artifactId>maven-jar-plugin</artifactId><version>3.0.2</version></plugin>
        <plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin>
        <plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin>
        <plugin><artifactId>maven-site-plugin</artifactId><version>3.7.1</version></plugin>
        <plugin><artifactId>maven-project-info-reports-plugin</artifactId><version>3.0.0</version></plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
First, implement the parent trait.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
//Parent trait: every concrete handler supplies one transformation from the Kafka input stream to a DStream of output strings
trait DataHandler {
  def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String]
}
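Any concrete handler just mixes in this trait and fills in the transformation. As a minimal illustrative sketch (not part of the project; the name PassThrough is made up), an identity handler that simply forwards each raw record value would look like this:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
//Illustrative only: a trivial handler that emits each Kafka record value unchanged
trait PassThrough extends DataHandler {
  def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] =
    stream.map(record => record.value())
}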
Next, write the configuration class Params.
import java.util
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
//To make later changes easy, nothing in this configuration class is hard-coded
class Params() extends Serializable {
  // val kafkaIp: String = ip
  //Broker address (host:port)
  var IP: String = ""
  //Serializers for the producer's keys and values
  var KEY_IN_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"
  var VALUE_IN_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"
  //ack settings. 0: the producer does not wait for the broker's ack; lowest latency, but the broker returns before the message is written to disk, so data may be lost if the broker fails;
  //1: the producer waits for the ack sent after the partition leader has written to disk; if the leader fails before the followers finish syncing, data is lost;
  //-1 (all): the producer waits until the leader and the followers (the followers in the ISR, not all followers) have all written to disk before the ack is returned.
  //But if the leader fails after the followers have synced and before the broker sends the ack, the data is duplicated.
  var ACKS = "all"
  //Number of retries for a failed send
  var RETRIES = "3"
  //Consumer group id
  var GROUPID = ""
  //earliest: when a partition has a committed offset, consume from that offset; when there is none, consume from the beginning
  //latest: when a partition has a committed offset, consume from that offset; when there is none, consume only the data produced after the consumer starts
  var AUTO_OFFSET = "earliest"
  //max.poll.interval.ms: the first batch has to load a pile of base data and takes roughly 8 minutes, but after the default 5 minutes Kafka decides the consumer has stopped
  //consuming and redelivers the records, so I end up with a lot of duplicates. Raise this above the processing time (10 minutes here) to avoid the duplicates.
  var MAX_POLL = "600000"
  //Deserializers for the consumer's keys and values
  var KEY_OUT_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"
  var VALUE_OUT_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"

  //Producer settings, used when writing the transformed stream back to Kafka
  def getWriteKafkaParam() = {
    val hm = new util.HashMap[String, Object]()
    hm.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IP)
    hm.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_IN_SERIALIZER)
    hm.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_IN_SERIALIZER)
    hm.put(ProducerConfig.ACKS_CONFIG, ACKS)
    hm.put(ProducerConfig.RETRIES_CONFIG, RETRIES)
    hm
  }

  //Consumer settings, used when reading the source topic
  def getReadKafkaParam = {
    val hm = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> IP,
      ConsumerConfig.GROUP_ID_CONFIG -> GROUPID,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> AUTO_OFFSET,
      ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG -> MAX_POLL,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> KEY_OUT_SERIALIZER,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> VALUE_OUT_SERIALIZER
    )
    hm
  }
}
//Companion object so instances can be built with Params()
object Params {
  def apply(): Params = new Params()
}
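Before wiring Params into the stream, you can sanity-check the parameter maps it builds. A minimal sketch (the broker address and group id below are just placeholders, and ParamsCheck is a made-up name):
object ParamsCheck {
  def main(args: Array[String]): Unit = {
    val p = Params()
    p.IP = "192.168.80.181:9092" //placeholder broker address
    p.GROUPID = "uf05"           //placeholder consumer group
    println(p.getReadKafkaParam)    //consumer settings as a Scala Map
    println(p.getWriteKafkaParam()) //producer settings as a java.util.HashMap
  }
}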
Then implement the read and write logic for the stream.
import com.fenbi.mydh.streamhandler.Params
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
class StreamController {
  //self-type: a StreamController can only be instantiated together with a DataHandler implementation
  self: DataHandler =>

  def job(ip: String, group: String, inTopic: String, outTopic: String): Unit = {
    //1.1 Start Spark Streaming and obtain the StreamingContext
    val conf = new SparkConf().setMaster("local[*]").setAppName("stream_job")
    val ssc = new StreamingContext(conf, Seconds(5))
    //Prepare the Kafka parameters
    val param = new Params()
    param.IP = ip
    param.GROUPID = group
    //1.2 Read the source topic (e.g. user_friends_raw) from Kafka with createDirectStream
    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq(inTopic), param.getReadKafkaParam))
    //2. Transform the raw data (e.g. user_friends_raw into user_id,friend_id pairs) with the mixed-in handler
    //3. Send the transformed data to the new Kafka topic:
    //3.1 open a KafkaProducer per partition
    //3.2 use send to push the data into Kafka
    streamTranform(ds).foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        val producer = new KafkaProducer[String, String](param.getWriteKafkaParam())
        part.foreach {
          case x: String =>
            val record = new ProducerRecord[String, String](outTopic, null, x)
            producer.send(record)
        }
        producer.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
Now build the concrete subclass that does the reading. Here I read my user_friends table; write yours according to your own business logic.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
//Each record value looks like "user_id,friend1 friend2 friend3 ...";
//emit one "user_id,friend_id" line per friend
trait UserFriend extends DataHandler {
  def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap(record => {
      val info = record.value().split(",", -1)
      info(1).split(" ", -1).filter(x => x.trim() != "").map(fid => info(0) + "," + fid)
    })
  }
}
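To see what this transform does to a single record, here is a quick check of the same split logic outside Spark (the sample line and the object name are made up):
object UserFriendSplitCheck {
  def main(args: Array[String]): Unit = {
    //Made-up record value in the user_friends format: "user_id,friend1 friend2 friend3"
    val value = "1000001,200 201 202"
    val info = value.split(",", -1)
    val out = info(1).split(" ", -1).filter(x => x.trim() != "").map(fid => info(0) + "," + fid)
    out.foreach(println) //prints 1000001,200 then 1000001,201 then 1000001,202
  }
}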
The second file:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
//Each record value is assumed to look like "event_id,yes,maybe,invited,no", where every attendance column
//is a space-separated list of user ids; emit one "event_id,user_id,status" line per user
trait EventAttendees extends DataHandler {
  def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap(record => {
      val info = record.value().split(",", -1)
      val yes = info(1).split(" ").filter(x => x.trim() != "").map(x => info(0) + "," + x + ",yes")
      val maybe = info(2).split(" ").filter(x => x.trim() != "").map(x => info(0) + "," + x + ",maybe")
      val invited = info(3).split(" ").filter(x => x.trim() != "").map(x => info(0) + "," + x + ",invited")
      val no = info(4).split(" ").filter(x => x.trim() != "").map(x => info(0) + "," + x + ",no")
      yes ++ maybe ++ invited ++ no
    })
  }
}
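Again, a quick check of the split logic on one made-up line, assuming the event_id,yes,maybe,invited,no column order described above (EventAttendeesSplitCheck is a made-up name):
object EventAttendeesSplitCheck {
  def main(args: Array[String]): Unit = {
    //Made-up record value: event_id, then space-separated user ids for yes, maybe, invited, no
    val value = "684921758,1 2,3,4 5,6"
    val info = value.split(",", -1)
    val yes = info(1).split(" ").filter(_.trim != "").map(x => info(0) + "," + x + ",yes")
    val maybe = info(2).split(" ").filter(_.trim != "").map(x => info(0) + "," + x + ",maybe")
    val invited = info(3).split(" ").filter(_.trim != "").map(x => info(0) + "," + x + ",invited")
    val no = info(4).split(" ").filter(_.trim != "").map(x => info(0) + "," + x + ",no")
    (yes ++ maybe ++ invited ++ no).foreach(println) //e.g. 684921758,1,yes ... 684921758,6,no
  }
}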
Finally, write a small test entry point to try it out.
object Test {
  def main(args: Array[String]): Unit = {
    // val ctrl = new StreamController() with UserFriend
    // ctrl.job("192.168.80.181:9092", "uf05", "user_friends", "friends")
    val event = new StreamController() with EventAttendees
    event.job("192.168.80.181:9092", "ev01", "event_attendeess", "event_attendee")
  }
}
Open the virtual machine and start Kafka. The two source files used here have already been loaded into their topics, so at this point we are done.
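If you want to confirm that the transformed data actually lands in the output topic, a small stand-alone consumer like the sketch below will print it. This is only a verification sketch: the broker address and topic mirror the Test object above, and the VerifyOutput name is made up.
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConverters._

object VerifyOutput {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.181:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify01")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("event_attendee"))
    //keep polling and print whatever arrives on the output topic
    while (true) {
      val records = consumer.poll(Duration.ofSeconds(1))
      records.asScala.foreach(r => println(r.value()))
    }
  }
}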



