
Reading from and writing to Kafka with Spark Streaming

Step 1: import the Maven dependencies. This is my pom file; just pull in whatever you are missing, nothing special here.




<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>mydatahandler</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>mydatahandler</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.6.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>1.2.0</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

First, implement the parent trait that every handler will extend:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

//The contract every concrete handler implements: take the raw Kafka input stream
//and turn it into a DStream of strings ready to be written back to Kafka
trait DataHandler {
  def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String]
}

Next, write the configuration class Params:

import java.util

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
//To make later changes easier, do not hard-code the configuration here
class Params() extends Serializable {
  //  val kafkaIp: String = ip
//Kafka broker address (host:port)
  var IP:String = ""
//key/value serializers used when writing to Kafka
  var KEY_IN_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"
  var VALUE_IN_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"
//acks setting. 0: the producer does not wait for an ack from the broker; lowest latency, but the broker returns before the record is written to disk, so data can be lost if the broker fails;
//1: the producer waits for an ack from the partition leader; the ack is returned once the leader has written to disk, so data can be lost if the leader fails before the followers have synced;
//-1 (all): the producer waits until the leader and the followers in the ISR (not every follower) have all written to disk before the ack is returned.
//However, if the leader fails after the followers have synced but before the broker sends the ack, the data will be duplicated
  var ACKS = "all"
//number of retries when a send fails
  var RETRIES = "3"
//consumer group id
  var GROUPID = ""
//earliest: if a partition has a committed offset, consume from that offset; otherwise consume from the beginning
//latest: if a partition has a committed offset, consume from that offset; otherwise consume only the data newly produced to that partition
  var AUTO_OFFSET = "earliest"
//The first batch has to load a lot of base data and takes about 8 minutes; after the default 5 minutes Kafka assumes the consumer
//is dead and redelivers, so I received many duplicates. Raise max.poll.interval.ms (in milliseconds) above the processing time to avoid that
  var MAX_POLL = "600000"
//key/value deserializers used when reading from Kafka
  var KEY_OUT_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"
  var VALUE_OUT_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"

  def getWriteKafkaParam()={
    val hm = new util.HashMap[String,Object]()
    hm.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,IP)
    hm.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,KEY_IN_SERIALIZER)
    hm.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,VALUE_IN_SERIALIZER)
    hm.put( ProducerConfig.ACKS_CONFIG,ACKS)
    hm.put(ProducerConfig.RETRIES_CONFIG,RETRIES)
    hm
  }

  def getReadKafkaParam={
    val hm = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->IP,
      ConsumerConfig.GROUP_ID_CONFIG->GROUPID,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG->AUTO_OFFSET,
      ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG->MAX_POLL,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->KEY_OUT_SERIALIZER,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->VALUE_OUT_SERIALIZER
    )
    hm
  }
}
//companion object that provides a factory method
object Params {
  def apply(): Params = new Params()
}
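For reference, a minimal sketch of how this class is meant to be used. The broker address and group id below are placeholders, not values from this article:

object ParamsDemo {
  def main(args: Array[String]): Unit = {
    val p = Params()
    p.IP = "localhost:9092"    //hypothetical broker address
    p.GROUPID = "demo-group"   //hypothetical consumer group id
    println(p.getReadKafkaParam)     //Scala Map of consumer properties
    println(p.getWriteKafkaParam())  //java.util.HashMap of producer properties
  }
}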

Now implement the streaming job that reads from Kafka, transforms the data, and writes it back to Kafka:

import com.fenbi.mydh.streamhandler.Params
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

class StreamController{
       //self-type: a DataHandler implementation must be mixed in when the controller is instantiated
       self:DataHandler=>
      def job(ip:String,group:String,inTopic:String,outTopic:String) {
        //1. read the raw data (e.g. user_friends_raw) from Kafka
        val conf = new SparkConf().setMaster("local[*]").setAppName("stream_job")
        val ssc = new StreamingContext(conf, Seconds(5))
        //prepare the parameters
        val param = new Params()
        param.IP = ip
        param.GROUPID = group
        //1.1 start Spark Streaming and obtain the StreamingContext
        //1.2 open a direct stream on the input topic
        val ds:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String, String](
          ssc,LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Seq(inTopic),param.getReadKafkaParam))

        //2. transform the data with the mixed-in handler (e.g. user_friends_raw -> user_id,friend_id)
        //   and send the transformed data to the new Kafka topic (e.g. user_friends)
        //3.1 open a KafkaProducer per partition
        //3.2 use send to push the data into Kafka, then close the producer so buffered records are flushed
        streamTranform(ds).foreachRDD(rdd=>{
          rdd.foreachPartition(part=>{
            val produce=new KafkaProducer[String,String](param.getWriteKafkaParam())
            part.foreach{
              case x:String=>{
                val recode=new ProducerRecord[String,String](outTopic,null,x)
                produce.send(recode)
              }
            }
            produce.close()
          })
        })
        ssc.start()
        ssc.awaitTermination()
      }
}
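A side note on the design: opening a new KafkaProducer for every partition of every batch works, but it is relatively expensive. A common alternative (not part of the original code) is to wrap a lazily created producer in a serializable holder so each executor reuses one producer. A rough sketch, assuming the same producer properties returned by Params.getWriteKafkaParam():

import java.util
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

//serializable wrapper: the producer is created lazily, once per executor, on first use
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()
  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, null, value))
}

object KafkaSink {
  def apply(config: util.HashMap[String, Object]): KafkaSink =
    new KafkaSink(() => new KafkaProducer[String, String](config))
}

Such a sink would typically be broadcast once, e.g. val sink = ssc.sparkContext.broadcast(KafkaSink(param.getWriteKafkaParam())), and then used inside foreachPartition via sink.value.send(outTopic, x).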

Next, build the concrete handler traits. This one parses my user_friends table; write yours according to your own data.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

trait UserFriend extends DataHandler {
  //each record looks like "user_id,friend_id friend_id ..."; flatten it into one "user_id,friend_id" line per friend
  def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap(record => {
      val info = record.value().split(",", -1)
      info(1).split(" ",-1).filter(x=>x.trim()!="").map(fid => info(0) + "," + fid)
    })
  }
}
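To make the transformation concrete, here is a tiny standalone check of the splitting logic. The sample record is made up; it only mimics the "user_id,space-separated friend ids" shape of a user_friends_raw line:

object UserFriendSplitDemo {
  def main(args: Array[String]): Unit = {
    val line = "1001,2002 2003 2004"   //hypothetical sample record
    val info = line.split(",", -1)
    val out = info(1).split(" ", -1).filter(x => x.trim() != "").map(fid => info(0) + "," + fid)
    out.foreach(println)
    //prints: 1001,2002  1001,2003  1001,2004
  }
}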

The second handler, for the event_attendees data:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

trait EventAttendees extends DataHandler {
  //each record looks like "event_id,yes list,maybe list,invited list,no list";
  //emit one "event_id,user_id,status" line per user in each of the four lists
  def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap(record=>{
      val info =record.value().split(",",-1)
      val yes = info(1).split(" ").filter(x=>x.trim()!="").map(x=>info(0)+","+x+",yes")
      val maybe = info(2).split(" ").filter(x=>x.trim()!="").map(x=>info(0)+","+x+",maybe")
      val invited = info(3).split(" ").filter(x=>x.trim()!="").map(x=>info(0)+","+x+",invited")
      val no = info(4).split(" ").filter(x=>x.trim()!="").map(x=>info(0)+","+x+",no")
      yes ++ maybe ++ invited ++ no
    })
  }
}
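Again, a small standalone check of the parsing. The record below is invented (one or two users per column) just to show the fan-out; if your event_attendees columns are ordered differently, adjust the indices above accordingly:

object EventAttendeesSplitDemo {
  def main(args: Array[String]): Unit = {
    val line = "e1,u1 u2,u3,u4,u5"   //hypothetical record: event,yes,maybe,invited,no
    val info = line.split(",", -1)
    val yes = info(1).split(" ").filter(_.trim != "").map(x => info(0) + "," + x + ",yes")
    val maybe = info(2).split(" ").filter(_.trim != "").map(x => info(0) + "," + x + ",maybe")
    val invited = info(3).split(" ").filter(_.trim != "").map(x => info(0) + "," + x + ",invited")
    val no = info(4).split(" ").filter(_.trim != "").map(x => info(0) + "," + x + ",no")
    (yes ++ maybe ++ invited ++ no).foreach(println)
    //prints: e1,u1,yes  e1,u2,yes  e1,u3,maybe  e1,u4,invited  e1,u5,no
  }
}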

Finally, write a small entry point to test it:


object Test {
  def main(args: Array[String]): Unit = {
//   val ctrl =(new StreamController() with UserFriend )
//     ctrl.job("192.168.80.181:9092","uf05","user_friends","friends")
    val event=(new StreamController() with EventAttendees)
    event.job("192.168.80.181:9092","ev01","event_attendeess","event_attendee")
  }
}

Start the virtual machine.

Start Kafka and make sure the input and output topics exist; a sketch of creating them programmatically follows below.
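If the topics have not been created yet, you can create them with the kafka-topics command line tool, or programmatically with the AdminClient that ships with the kafka-clients dependency already in the pom. A rough sketch, assuming a single-node test broker at the address used in the Test object (adjust partitions and replication factor to your cluster):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.181:9092")
    val admin = AdminClient.create(props)
    //one partition, replication factor 1 -- assumed values for a single-node test setup
    admin.createTopics(Collections.singletonList(new NewTopic("event_attendeess", 1, 1.toShort))).all().get()
    admin.createTopics(Collections.singletonList(new NewTopic("event_attendee", 1, 1.toShort))).all().get()
    admin.close()
  }
}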

The two source data files used here have already been loaded into Kafka, so at this point everything is done.
