(1)导包
依赖坐标(groupId : artifactId : version):
org.apache.kafka : kafka_2.11 : 1.0.0
org.scala-lang : scala-library : 2.11.12
org.scala-lang : scala-compiler : 2.11.12
org.scala-lang : scala-reflect : 2.11.12
(2)代码实现
—注意对key进行序列化
—kafka的数据以key-value形式存储类似于文件中
—生产者通过send发送数据
—flush:将缓冲区中的数据立即刷到kafka。不调用flush时数据按批次发送,只有一条数据可能凑不满一个批次而滞留在缓冲区中
object Demo01KafkaProduct {
  /**
   * Minimal Kafka producer example: sends a single record to topic "test1".
   */
  def main(args: Array[String]): Unit = {
    // Broker address used for the initial cluster connection.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    // NOTE: "group.id" is a consumer-only property and is ignored by producers,
    // so it is intentionally not set here (the original code set it by mistake).
    // Kafka stores records as key-value pairs, so both sides need a serializer.
    properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
    // Create the producer.
    val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](properties)
    try {
      // Topic name "test1", value "java", no key.
      val producerRecord = new ProducerRecord[String,String]("test1","java")
      // send() is asynchronous: the record is appended to an in-memory batch.
      producer.send(producerRecord)
      // flush() forces the buffered batch out immediately; a single record on its
      // own may otherwise sit in the buffer until the batch fills up.
      producer.flush()
    } finally {
      // Release network resources even if send/flush throws.
      producer.close()
    }
  }
}
2、Scala代码生产kafka数据(分区):使用hash将数据打到不同的分区
object Demo02ToKafka {
  /**
   * Reads students.txt line by line and routes each row to one of two
   * partitions of topic "students5", keyed off the gender column.
   */
  def main(args: Array[String]): Unit = {
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    // "group.id" is a consumer-only setting, ignored by producers — omitted.
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Create the producer.
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](properties)
    // Keep a handle to the source so the file can be closed afterwards
    // (the original leaked this file handle).
    val source = Source.fromFile("kafkaproject/data/students.txt")
    try {
      source
        .getLines()
        .foreach(line => {
          // Partition by the gender column (index 4) so each gender value
          // consistently lands in one of the 2 partitions.
          val gender: String = line.split(",")(4)
          val partition: Int = Math.abs(gender.hashCode) % 2
          // Explicit partition, null key, the whole CSV line as the value.
          val record = new ProducerRecord[String, String]("students5", partition, null, line)
          producer.send(record)
        })
      // Flush ONCE after all sends. Flushing inside the loop (as the original
      // did) forces a network round-trip per record and defeats batching.
      producer.flush()
    } finally {
      source.close()   // release the file handle
      producer.close() // release producer network resources
    }
  }
}
3、Scala代码消费kafka数据(消费上一步按性别分区写入的students5主题)
—需要先订阅topic才能拉取数据
object Demo03Coutomer {
  /**
   * Kafka consumer example: subscribes to "students5" and prints every record's
   * metadata and payload, tab-separated, in an endless poll loop.
   */
  def main(args: Array[String]): Unit = {
    /**
     * Build the consumer connection configuration.
     */
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "f")
    // Records are key-value pairs, so both sides need a deserializer.
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    /**
     * auto.offset.reset semantics:
     * earliest - resume from the committed offset; if none exists, read from the beginning
     * latest   - resume from the committed offset; if none exists, read only new records
     * none     - resume from committed offsets; throw if any partition lacks one
     */
    // Read from the earliest available data when no offset is committed.
    properties.put("auto.offset.reset", "earliest")
    // Create the consumer.
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](properties)
    try {
      // Subscribe to the topic before polling.
      val list = new util.ArrayList[String]()
      list.add("students5")
      consumer.subscribe(list)
      // Poll forever. NOTE(review): in a real application, install a shutdown
      // hook that calls consumer.wakeup() so this loop can exit cleanly.
      while (true) {
        // Pull a batch of records; the argument is the poll timeout in ms.
        val records: ConsumerRecords[String, String] = consumer.poll(1000)
        // Iterate over every record in the batch.
        val iterators: util.Iterator[ConsumerRecord[String, String]] = records.iterator()
        while (iterators.hasNext) {
          val line: ConsumerRecord[String, String] = iterators.next()
          val topic = line.topic()
          val partition = line.partition()
          val offset = line.offset()
          val key = line.key()          // null when the producer sent no key
          val value = line.value()
          val time = line.timestamp()   // record timestamp assigned at produce time
          // BUG FIX: original wrote s"$topict$partitiont..." which interpolates
          // non-existent identifiers ("topict" etc.); the intent was tab separators.
          println(s"$topic\t$partition\t$offset\t$key\t$value\t$time")
        }
      }
    } finally {
      // Only reached if the poll loop throws — the loop never exits normally.
      consumer.close()
    }
  }
}



