开启kafka
kafka-server-start.sh /opt/soft/kafka200/config/server.properties
查看主题
kafka-topics.sh --zookeeper 192.168.100.155:2181 --list
创建主题
① Event_Attendees
kafka-topics.sh --create --topic event_attendees_raw --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
flume命令 导入数据
(event_attendees_raw)conf文件
event_attendees_raw.channels = c1 event_attendees_raw.sources = s1 event_attendees_raw.sinks = k1 event_attendees_raw.sources.s1.type = spooldir event_attendees_raw.sources.s1.spoolDir = /opt/mydata/event_attendees/ event_attendees_raw.sources.s1.deserializer.maxLineLength=120000 event_attendees_raw.sources.s1.interceptors= i1 event_attendees_raw.sources.s1.interceptors.i1.type=regex_filter event_attendees_raw.sources.s1.interceptors.i1.regex=event.* event_attendees_raw.sources.s1.interceptors.i1.excludeEvents=true event_attendees_raw.channels.c1.type = memory event_attendees_raw.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink event_attendees_raw.sinks.k1.kafka.topic = event_attendees_raw event_attendees_raw.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092 event_attendees_raw.sinks.k1.channel = c1 event_attendees_raw.sources.s1.channels =c1
导入数据
flume-ng agent -n event_attendees_raw -f /opt/fconf/event_attendees_raw.conf
查看数据条数(event_attendees_raw:0:24144)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic event_attendees_raw --time -1
删除kafka topic
kafka-topics.sh --delete --zookeeper 192.168.100.155:2181 --topic event_attendees_raw
② Event
创建主题
kafka-topics.sh --create --topic events --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
event.conf
event.channels = c1 event.sources = s1 event.sinks = k1 event.sources.s1.type = spooldir event.sources.s1.spoolDir = /opt/mydata/events/ event.sources.s1.interceptors= i1 event.sources.s1.interceptors.i1.type=regex_filter event.sources.s1.interceptors.i1.regex=event_id.* event.sources.s1.interceptors.i1.excludeEvents=true event.channels.c1.type = memory event.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink event.sinks.k1.kafka.topic = events event.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092 event.sinks.k1.channel = c1 event.sources.s1.channels =c1
flume导入数据
flume-ng agent -n event -f /opt/fconf/event.conf
查看数据量(events:0:3137972)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic events --time -1
③ users
kafka-topics.sh --create --topic users --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
flume-ng agent --name user -f /opt/fconf/users.conf
users.conf
user.channels = c1 user.sources = s1 user.sinks = k1 user.sources.s1.type = spooldir user.sources.s1.spoolDir = /opt/mydata/users/ user.sources.s1.interceptors=i1 user.sources.s1.interceptors.i1.type=regex_filter user.sources.s1.interceptors.i1.regex=user.* user.sources.s1.interceptors.i1.excludeEvents=true user.channels.c1.type = memory user.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink user.sinks.k1.kafka.topic = users user.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092 user.sinks.k1.channel = c1 user.sources.s1.channels =c1
查看数据量(users:0:38209)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic users --time -1
④ user_friends
kafka-topics.sh --create --topic user_friends_raw --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
flume-ng agent -n uf -f /opt/fconf/uf.conf
uf.conf
uf.channels = c1 uf.sources = s1 uf.sinks = k1 uf.sources.s1.type = spooldir uf.sources.s1.spoolDir = /opt/mydata/user_friends/ uf.sources.s1.deserializer.maxLineLength=60000 uf.sources.s1.interceptors= i1 uf.sources.s1.interceptors.i1.type=regex_filter uf.sources.s1.interceptors.i1.regex=user.* uf.sources.s1.interceptors.i1.excludeEvents=true uf.channels.c1.type = memory uf.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink uf.sinks.k1.kafka.topic = user_friends_raw uf.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092 uf.sinks.k1.channel = c1 uf.sources.s1.channels =c1
查看数据量(user_friends_raw:0:38202)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic user_friends_raw --time -1
⑤ train
kafka-topics.sh --create --topic train --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
train.conf
train.channels = c1 train.sources = s1 train.sinks = k1 train.sources.s1.type = spooldir train.sources.s1.spoolDir = /opt/mydata/train/ train.sources.s1.deserializer.maxLineLength=60000 train.sources.s1.interceptors= i1 train.sources.s1.interceptors.i1.type=regex_filter train.sources.s1.interceptors.i1.regex=user.* train.sources.s1.interceptors.i1.excludeEvents=true train.channels.c1.type = memory train.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink train.sinks.k1.kafka.topic = train train.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092 train.sinks.k1.channel = c1 train.sources.s1.channels =c1
flume-ng agent -n train -f /opt/fconf/train.conf
查看数据量 (train:0:15398)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic train --time -1
⑥ test
kafka-topics.sh --create --topic test --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
test.conf
test.channels = c1 test.sources = s1 test.sinks = k1 test.sources.s1.type = spooldir test.sources.s1.spoolDir = /opt/mydata/test/ test.sources.s1.interceptors=i1 test.sources.s1.interceptors.i1.type=regex_filter test.sources.s1.interceptors.i1.regex=user,.* test.sources.s1.interceptors.i1.excludeEvents=true test.channels.c1.type = memory test.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink test.sinks.k1.kafka.topic = test test.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092 test.sinks.k1.channel = c1 test.sources.s1.channels =c1
flume-ng agent -n test -f /opt/fconf/test.conf
查看数据量(test:0:10237)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic test --time -1

使用Spark Streaming读取kafka的topic，然后通过Spark Streaming的RDD算子对数据进行处理
pom文件
依赖列表 (groupId:artifactId:version)：
- org.apache.kafka:kafka_2.11:2.0.0 （排除 com.fasterxml.jackson.core:*）
- org.apache.kafka:kafka-clients:2.0.0
- org.apache.spark:spark-streaming_2.11:2.3.4
- org.apache.spark:spark-core_2.11:2.3.4
- org.apache.spark:spark-sql_2.11:2.3.4
- com.fasterxml.jackson.core:jackson-core:2.6.6
- org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.4
- org.apache.hbase:hbase-client:1.2.0
- org.apache.hbase:hbase-common:1.2.0
① userFriend (user_friends:0:30386403)
package com.nj.mydh.streamhandler.impl
import com.njbdqn.mydh.streamhandler.DataHandler
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
trait UserFriend extends DataHandler {
  // Expands each raw user_friends record ("user,f1 f2 f3 ...") into one
  // "user,friend" output line per non-blank friend id.
  override def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap { record =>
      val cols = record.value().split(",", -1)
      val userId = cols(0)
      for {
        fid <- cols(1).split(" ")
        if fid.trim != ""
      } yield userId + "," + fid
    }
  }
}
②event_attendees(event_attendees:0:11245010)
package com.nj.mydh.streamhandler.impl
import com.njbdqn.mydh.streamhandler.DataHandler
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
trait EventAttendees extends DataHandler {
  // Flattens each raw event_attendees record into one "event,user,type" line.
  // Columns 1..4 of the input hold space-separated user-id lists, in the fixed
  // order yes, maybe, invited, no.
  override def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap { record =>
      val fields = record.value().split(",", -1)
      val eventId = fields(0)
      Seq("yes", "maybe", "invited", "no").zipWithIndex.flatMap { case (label, idx) =>
        fields(idx + 1).split(" ")
          .filter(uid => uid.trim != "")
          .map(uid => eventId + "," + uid + "," + label)
      }
    }
  }
}
package com.nj.mydh.streamhandler
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream._
// Contract for one streaming transformation step: implementations turn the raw
// Kafka record stream into a DStream of output lines (one String per record to
// be written back to Kafka by StreamController).
trait DataHandler {
// Transforms the consumed Kafka stream into output lines.
// NOTE(review): "Tranform" is a typo for "Transform", kept because overriders
// elsewhere in this file depend on the exact name.
def streamTranform(stream:InputDStream[ConsumerRecord[String,String]]):DStream[String]
}
package com.nj.mydh.streamhandler
import java.util
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
// Holds Kafka producer/consumer settings and builds the parameter maps
// expected by KafkaProducer / KafkaConsumer / KafkaUtils.
// Serializable because instances are captured inside Spark closures.
class Params extends Serializable {
  // Broker address; the caller must set this ("host:port") before use.
  var IP: String = ""
  // Producer ("write into Kafka") serializer classes.
  // FIX: derive the class names from the imported classes instead of fragile
  // hand-typed fully-qualified string literals (same resulting values).
  var KEY_IN_SERIALIZER = classOf[StringSerializer].getName
  var VALUE_IN_SERIALIZER = classOf[StringSerializer].getName
  // Wait for full ISR acknowledgement on every send.
  var ACKS = "all"
  var RETRIES = "3"
  // Consumer group id; the caller must set this before use.
  var GROUPID = ""
  // Start from the earliest offset when no committed offset exists.
  var AUTO_OFFSET = "earliest"
  var MAX_POLL = "500"
  // Consumer ("read out of Kafka") classes.
  // NOTE(review): these are DESERIALIZER classes despite the *_SERIALIZER
  // names; the names are public vars so they are kept for compatibility.
  var KEY_OUT_SERIALIZER = classOf[StringDeserializer].getName
  var VALUE_OUT_SERIALIZER = classOf[StringDeserializer].getName

  // Builds the config map for a KafkaProducer.
  def getWriteKafkaParam() = {
    val hm = new util.HashMap[String, Object]()
    hm.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IP)
    hm.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_IN_SERIALIZER)
    hm.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_IN_SERIALIZER)
    hm.put(ProducerConfig.ACKS_CONFIG, ACKS)
    hm.put(ProducerConfig.RETRIES_CONFIG, RETRIES)
    hm
  }

  // Builds the config map for a KafkaConsumer / createDirectStream.
  def getReadKafkaParam() = {
    val hm = new util.HashMap[String, Object]()
    hm.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IP)
    hm.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID)
    hm.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET)
    hm.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL)
    hm.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_OUT_SERIALIZER)
    hm.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_OUT_SERIALIZER)
    hm
  }
}
package com.nj.mydh.streamhandler
import java.util
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
// Drives one Kafka -> Spark Streaming -> Kafka pipeline. The concrete
// transformation is supplied by the DataHandler mixed in at construction time
// (self-type), e.g. `new StreamController() with UserFriend`.
class StreamController {
  self: DataHandler =>

  /**
   * Reads `inTopic` as a direct stream, applies the mixed-in streamTranform,
   * and writes each resulting line to `outTopic`. Blocks forever
   * (awaitTermination).
   *
   * @param ip       Kafka bootstrap servers, "host:port"
   * @param group    consumer group id
   * @param inTopic  topic to consume
   * @param outTopic topic to produce to
   */
  def job(ip: String, group: String, inTopic: String, outTopic: String) = {
    // 1. Set up Spark Streaming with a 5-second micro-batch interval.
    val conf = new SparkConf().setMaster("local[*]").setAppName("stream_job")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Prepare the Kafka connection parameters.
    val param = new Params()
    param.IP = ip
    param.GROUPID = group
    val set = new util.HashSet[String]()
    set.add(inTopic)
    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](set, param.getReadKafkaParam()))
    // 2. Transform, then 3. write each line back to the output topic.
    streamTranform(ds).foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        // One producer per partition, created on the executor.
        val producer = new KafkaProducer[String, String](param.getWriteKafkaParam())
        try {
          part.foreach {
            case line: String =>
              producer.send(new ProducerRecord[String, String](outTopic, null, line))
          }
        } finally {
          // BUG FIX: the producer was never closed, so records still buffered
          // in its send queue could be silently dropped and a socket was
          // leaked on every micro-batch. close() flushes pending sends.
          producer.close()
        }
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
package com.njbdqn.mydh.streamhandler
import com.nj.mydh.streamhandler.impl.{EventAttendees, UserFriend}
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.spark.{SparkConf, SparkContext}
// Entry point: wires a StreamController to one concrete DataHandler and runs
// that pipeline. Exactly one job runs per launch; the others stay commented out.
// NOTE(review): this object is declared in package com.njbdqn.mydh.streamhandler
// but StreamController lives in com.nj.mydh.streamhandler and is not imported
// here — confirm the intended package layout before compiling.
object Test {
def main(args: Array[String]): Unit = {
// Alternative pipeline: user_friends_raw -> user_friends (group "uf03").
// val ctrl = (new StreamController() with UserFriend)
// ctrl.job("192.168.100.155:9092","uf03"
// ,"user_friends_raw","user_friends")
// Active pipeline: event_attendees_raw -> event_attendees (group "ev01").
val event=(new StreamController() with EventAttendees)
event.job("192.168.100.155:9092"
,"ev01","event_attendees_raw",
"event_attendees")
}
}
读取Kafka通过Hbase的JavaAPI写入到Hbase中
创建hbase表
hbase(main):001:0> create_namespace 'prac' hbase(main):001:0> create 'prac:hb_eventAttendees','base' hbase(main):001:0> create 'prac:hb_users','base' hbase(main):001:0> create 'prac:hb_train','base' hbase(main):001:0> create 'prac:hb_userFriends','base' hbase(main):001:0> create 'prac:hb_events','base','other' # 进入hbase的bin目录 计数: cd /opt/software/hbase/bin ./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_eventAttendees' ./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_users' ./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_train' ./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_userFriends' ./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_events' # 得到结果如下:该数据和kafka里的行数相匹配,只有event_attendees_hb表因为hbase自动去重掉两条,结果全部正确 intes:event_attendees_hb:ROWS=11245008 intes:users_hb:ROWS=38209 intes:train_hb:ROWS=15220 intes:user_friends_hb:ROWS=30386387 intes:events_hb:ROWS=3137972
package com.nj.mydh.kafkatohbase.impl
import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition
trait EventAttendeesImpl extends DataHandler {
  // Builds one HBase Put per "event,user,type" line read from Kafka.
  // Rowkey is the three fields concatenated with no separator.
  // NOTE(review): the column qualifiers "user_id"/"friend_id" are kept exactly
  // as the original wrote them, even though the data is event/user — renaming
  // would change the stored schema.
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val partition = new TopicPartition(topic, 0)
    records.records(partition).toArray.map { raw =>
      val record = raw.asInstanceOf[ConsumerRecord[String, String]]
      val fields = record.value().split(",") // e.g. 123123,234234,yes
      val rowKey = fields(0) + "" + fields(1) + fields(2)
      val put = new Put(rowKey.getBytes)
      val family = "base".getBytes()
      put.addColumn(family, "user_id".getBytes(), fields(0).getBytes)
      put.addColumn(family, "friend_id".getBytes(), fields(1).getBytes)
      put.addColumn(family, "attend_type".getBytes(), fields(2).getBytes)
      put
    }
  }
}
package com.nj.mydh.kafkatohbase.impl
import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition
trait EventsImpl extends DataHandler {
  // Builds one HBase Put per events row, keyed by event_id. The first nine
  // comma-separated fields become named columns in family "base"; everything
  // after column 8 is joined with single spaces into one "other" column.
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val partition = new TopicPartition(topic, 0)
    val namedCols = Seq("event_id", "user_id", "start_time", "city", "state", "zip", "country", "lat", "lng")
    records.records(partition).toArray.map { raw =>
      val record = raw.asInstanceOf[ConsumerRecord[String, String]]
      val fields = record.value().split(",")
      val put = new Put(fields(0).getBytes)
      val family = "base".getBytes()
      for ((col, i) <- namedCols.zipWithIndex)
        put.addColumn(family, col.getBytes(), fields(i).getBytes)
      // Remaining columns (index 9 onward) collapsed into one space-joined value.
      val other = fields.drop(9).mkString(" ")
      put.addColumn(family, "other".getBytes(), other.trim.getBytes)
      put
    }
  }
}
package com.nj.mydh.kafkatohbase.impl
import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition
trait TrainImpl extends DataHandler {
  // Builds one HBase Put per train row, rowkey = user + event (no separator).
  // Rows are first sorted by the timestamp string (field 4) before the Puts are
  // built — presumably so later-timestamped duplicates of the same (user,event)
  // key come last in the batch; TODO confirm that intent with the author.
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val partition = new TopicPartition(topic, 0)
    val rows = records.records(partition).toArray.map { raw =>
      val record = raw.asInstanceOf[ConsumerRecord[String, String]]
      val f = record.value().split(",")
      // user,event,invited,timestamp,interested,not_interested
      (f(0), f(1), f(2), f(3), f(4), f(5))
    }
    rows.sortBy(_._4).map { row =>
      val put = new Put((row._1 + "" + row._2).getBytes)
      val family = "base".getBytes()
      put.addColumn(family, "user_id".getBytes(), row._1.getBytes())
      put.addColumn(family, "event_id".getBytes(), row._2.getBytes())
      put.addColumn(family, "invited".getBytes(), row._3.getBytes())
      put.addColumn(family, "timestamp".getBytes(), row._4.getBytes())
      put.addColumn(family, "interested".getBytes(), row._5.getBytes())
      put.addColumn(family, "not_interested".getBytes(), row._6.getBytes())
      put
    }
  }
}
package com.nj.mydh.kafkatohbase.impl
import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition
trait UserFriendImpl extends DataHandler {
  // Builds one HBase Put per "user,friend" line, rowkey = user + friend
  // concatenated with no separator.
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val partition = new TopicPartition(topic, 0)
    records.records(partition).toArray.map { raw =>
      val record = raw.asInstanceOf[ConsumerRecord[String, String]]
      val fields = record.value().split(",") // e.g. 123123,234234
      val put = new Put((fields(0) + "" + fields(1)).getBytes)
      val family = "base".getBytes()
      put.addColumn(family, "user_id".getBytes(), fields(0).getBytes)
      put.addColumn(family, "friend_id".getBytes(), fields(1).getBytes)
      put
    }
  }
}
package com.nj.mydh.kafkatohbase.impl
import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition
trait UsersImpl extends DataHandler {
  // Builds one HBase Put per users row, keyed by user_id, mapping the seven
  // comma-separated fields to identically named columns in family "base".
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val partition = new TopicPartition(topic, 0)
    val columns = Seq("user_id", "locale", "birthyear", "gender", "joinedAt", "location", "timezone")
    records.records(partition).toArray.map { raw =>
      val record = raw.asInstanceOf[ConsumerRecord[String, String]]
      // split with limit -1 keeps trailing empty fields, so short rows still
      // index all seven columns.
      val fields = record.value().split(",", -1)
      val put = new Put(fields(0).getBytes)
      for ((col, i) <- columns.zipWithIndex)
        put.addColumn("base".getBytes(), col.getBytes(), fields(i).getBytes)
      put
    }
  }
}
package com.nj.mydh.kafkatohbase
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.ConsumerRecords
// Contract for one Kafka -> HBase conversion step: implementations turn one
// poll's worth of consumer records into the Array of HBase Puts to write.
trait DataHandler {
// Converts the records polled from `topic` (partition 0) into HBase Puts.
def transform(topic:String,records:ConsumerRecords[String,String]):Array[Put]
}
package com.nj.mydh.kafkatohbase
import java.time.Duration
import java.util
import com.nj.mydh.streamhandler.Params
import org.apache.hadoop.hbase.{HbaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
// Continuously polls one Kafka topic and writes the converted rows into an
// HBase table. The concrete record-to-Put conversion is supplied by the
// DataHandler mixed in at construction time (self-type).
class ReadKafkaToHbase {
  self: DataHandler =>

  /**
   * Polls `topic` forever (3-second poll timeout) and batch-puts the
   * transformed rows into `hbaseTable`. Never returns.
   *
   * @param ip         host/IP of both ZooKeeper (:2181) and Kafka (:9092)
   * @param group      Kafka consumer group id
   * @param topic      Kafka topic to consume (partition 0 only, per transform)
   * @param hbaseTable fully qualified HBase table name, e.g. "prac:hb_users"
   */
  def job(ip: String, group: String, topic: String, hbaseTable: String) = {
    // NOTE(review): the HBase API class is spelled HBaseConfiguration; this
    // matches the file's import as written — verify it compiles.
    val hbaseconf = HbaseConfiguration.create()
    hbaseconf.set("hbase.zookeeper.quorum", ip + ":2181")
    val conn = ConnectionFactory.createConnection(hbaseconf)
    // FIX: fetch the Table handle once, instead of on every poll iteration.
    val table = conn.getTable(TableName.valueOf(hbaseTable))
    // Set up the Kafka consumer.
    val param = new Params()
    param.IP = ip + ":9092"
    param.GROUPID = group
    val consumer = new KafkaConsumer[String, String](param.getReadKafkaParam())
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val rds: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))
      // Convert this batch into HBase Puts via the mixed-in handler.
      val puts: Array[Put] = transform(topic, rds)
      // FIX: skip empty polls instead of issuing an empty batch put.
      if (puts.nonEmpty) {
        val lst = new util.ArrayList[Put]()
        puts.foreach(lst.add(_))
        table.put(lst)
      }
      println("循环。。。。。。。。。。")
    }
  }
}
package com.nj.mydh.kafkatohbase
import java.util
import com.nj.mydh.kafkatohbase.impl.{EventAttendeesImpl, EventsImpl, TrainImpl, UserFriendImpl, UsersImpl}
// Entry point: wires ReadKafkaToHbase to one concrete DataHandler impl and
// runs that Kafka -> HBase load. Exactly one job runs per launch; swap the
// comments to load a different table. Expected row counts are noted inline.
object Test1 {
def main(args: Array[String]): Unit = {
// event_attendees -> prac:hb_eventAttendees (group "ea05")
// (new ReadKafkaToHbase() with EventAttendeesImpl)
// .job("192.168.100.155","ea05",
// "event_attendees","prac:hb_eventAttendees")
// events -> exp:hbase_events (group "et01")
// (new ReadKafkaToHbase() with EventsImpl)
// .job("192.168.100.155","et01",
// "events","exp:hbase_events") //3137972
// Active: users -> prac:hb_users (group "us01")
(new ReadKafkaToHbase() with UsersImpl)
.job("192.168.100.155","us01",
"users","prac:hb_users") //38209
// user_friends -> prac:hb_userFriends (group "uff01")
// (new ReadKafkaToHbase() with UserFriendImpl)
// .job("192.168.100.155","uff01",
// "user_friends","prac:hb_userFriends")
// train -> exp:hbase_train (group "tr01")
// (new ReadKafkaToHbase() with TrainImpl)
// .job("192.168.100.155","tr01",
// "train","exp:hbase_train")
}
}



