栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

11.4.4、kafka

11.4.4、kafka

1、Java代码实现生产kafka数据

(1)导包


    org.apache.kafka
    kafka_2.11
    1.0.0



    org.scala-lang
    scala-library
    2.11.12



    org.scala-lang
    scala-compiler
    2.11.12



    org.scala-lang
    scala-reflect
    2.11.12

(2)代码实现

—注意对key进行序列化
—kafka的数据以key-value形式存储类似于文件中
—生产者通过send发送数据
—flush,将数据刷到kafka,不刷的话就是批次,我们的数据一条数据不够

object Demo01KafkaProduct {
  def main(args: Array[String]): Unit = {

    
    //指定kafka的broker的地址
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "test1")
    //对key进行序列化
    //kafka的数据以key-value形式存储类似于文件中
    properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

    //生产者
    val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](properties)
    //topic的名称,value数据
    val producerRecord = new ProducerRecord[String,String]("test1","java")
    //生产数据
    producer.send(producerRecord)
    //将数据刷到kafka,不刷的话就是批次,我们的数据一条数据不够
    producer.flush()

    //关闭连接
    producer.close()

  }

}

2、Java代码生产kafka数据(分区)

使用hash将数据打到不同的区
object Demo02ToKafka {

def main(args: Array[String]): Unit = {

val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "ghtf")

properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

//生产者
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](properties)

//读取数据
Source.fromFile("kafkaproject/data/students.txt")
  .getLines()
  .foreach(line => {
    //将性别不同数据打到不同的分区
    val gender: String = line.split(",")(4)
    val parti: Int = Math.abs(gender.hashCode) % 2

    val redode = new ProducerRecord[String, String]("students5", parti, null, line)
    producer.send(redode)
    producer.flush()
  })
producer.close()

}

}

3、Java代码消费kafka数据(分区)

—使用hash将数据打到不同的区
—需要订阅topic

object Demo03Coutomer {

  def main(args: Array[String]): Unit = {

    /**
      * 创建kafka的连接
      */
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "f")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /**
      * earliest
      * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      * latest
      * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      * none
      * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      *
      */

    //从最早读取数据
    properties.put("auto.offset.reset", "earliest")
    //消费者
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](properties)
    //订阅一个消费者,订阅一个topic
    val list = new util.ArrayList[String]()
    list.add("students5")
    consumer.subscribe(list)

    while (true) {
      //拉取数据,设置超时时间
      val records: ConsumerRecords[String, String] = consumer.poll(1000)
      //读取所有数据,迭代器
      val iterators: util.Iterator[ConsumerRecord[String, String]] = records.iterator()
      while (iterators.hasNext) {
        //获取一行数据
        val line: ConsumerRecord[String, String] = iterators.next()
        val topic = line.topic()
        val partition = line.partition()
        val offset = line.offset()
        val key = line.key() //默认空
        val value = line.value()
        val time = line.timestamp() //默认系统时间
        println(s"$topict$partitiont$offsett$keyt$valuet$time")
      }
    }
    consumer.close()

  }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773987.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号