This program calls the Kafka producer client to write message data into Kafka. It simulates a continuous message stream by repeatedly generating batches of random records and writing them to a Kafka topic.
1. Add dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.1.0</version>
</dependency>
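There is no explicit kafka-clients entry because the producer API used below (org.apache.kafka.clients.producer) is resolved transitively through spark-streaming-kafka-0-10. If the build needs it pinned explicitly, an entry along the following lines can be added; the version shown is an assumption and should be checked against mvn dependency:tree:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- assumed version; verify what spark-streaming-kafka-0-10 actually pulls in -->
    <version>2.6.0</version>
</dependency>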
2. Test code

package com.demo
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object MockerRealTime {

  def generateMockData(): Array[String] = {
    val array: ArrayBuffer[String] = ArrayBuffer[String]()
    val CityRandomOpt = RandomOptions(
      RanOpt(CityInfo(1, "北京", "华北"), 30),
      RanOpt(CityInfo(2, "上海", "华东"), 30),
      RanOpt(CityInfo(3, "广州", "华南"), 10),
      RanOpt(CityInfo(4, "深圳", "华南"), 20),
      RanOpt(CityInfo(5, "天津", "华北"), 10))
    val random = new Random()
    // Simulated real-time record layout:
    // timestamp province city userid adid
    for (i <- 0 to 50) { // 51 records per batch (0 to 50 inclusive)
      val timestamp: Long = System.currentTimeMillis()
      val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
      val city: String = cityInfo.city_name
      val area: String = cityInfo.area
      val adid: Int = 1 + random.nextInt(6)
      val userid: Int = 1 + random.nextInt(6)
      // Assemble one space-separated record
      array += timestamp + " " + area + " " + city + " " + userid + " " + adid
    }
    array.toArray
  }

  def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
    // Create the configuration object
    val prop = new Properties()
    // Broker address plus key/value serializer classes
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // Build the Kafka producer from the configuration
    new KafkaProducer[String, String](prop)
  }

  def main(args: Array[String]): Unit = {
    // Read the Kafka settings from config.properties
    val config: Properties = PropertiesUtil.load("config.properties")
    val broker: String = config.getProperty("kafka.broker.list")
    val topic = "test"
    // Create the Kafka producer
    val kafkaProducer: KafkaProducer[String, String] = createKafkaProducer(broker)
    while (true) {
      // Generate a batch of random records and send them to the Kafka cluster
      for (line <- generateMockData()) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, line))
        println(line)
      }
      Thread.sleep(2000)
    }
  }
}
The Kafka producer configuration is handled by createKafkaProducer. The essential settings are the broker address (IP and port) and the serializer classes for the message key and value; note that the topic is not part of the producer configuration but is specified per record when send is called.
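As a usage sketch of createKafkaProducer outside the infinite loop, the snippet below sends one record with an error-reporting callback and then shuts the producer down cleanly. The topic name demo-topic is illustrative; the callback, flush, and close calls are standard kafka-clients producer API:

package com.demo

import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

object ProducerSketch {
  def main(args: Array[String]): Unit = {
    val producer = MockerRealTime.createKafkaProducer("192.168.22.56:9092")
    // "demo-topic" is an illustrative topic name, not one used by the mock program
    val record = new ProducerRecord[String, String]("demo-topic", "hello")
    // send is asynchronous; the callback reports per-record success or failure
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) exception.printStackTrace()
        else println(s"written to ${metadata.topic}-${metadata.partition}@${metadata.offset}")
    })
    producer.flush() // push out anything still buffered
    producer.close() // release sockets and threads; the mock's while(true) loop never reaches this
  }
}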
3. Kafka configuration (config.properties)

# Kafka configuration
kafka.broker.list=192.168.22.56:9092

4. Helper code (PropertiesUtil.scala)
package com.demo

import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {
  // Load a properties file from the classpath, reading it as UTF-8
  def load(propertiesName: String): Properties = {
    val prop = new Properties()
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName),
      "UTF-8"))
    prop
  }
}
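To confirm that config.properties is visible on the classpath (src/main/resources in a standard Maven layout), it can be loaded and printed directly. A minimal sanity-check sketch; the object name ConfigCheck is illustrative:

package com.demo

object ConfigCheck {
  def main(args: Array[String]): Unit = {
    val cfg = PropertiesUtil.load("config.properties")
    // Prints the configured broker list, or null if the key is missing;
    // a NullPointerException here usually means the file is not on the classpath
    println(cfg.getProperty("kafka.broker.list"))
  }
}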
5. Helper code (RandomOptions.scala)
package com.demo

import scala.collection.mutable.ListBuffer
import scala.util.Random

case class RanOpt[T](value: T, weight: Int)

object RandomOptions {
  def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
    val randomOptions = new RandomOptions[T]()
    for (opt <- opts) {
      randomOptions.totalWeight += opt.weight
      // Expand each option into the buffer once per unit of weight,
      // so a uniform index into the buffer yields a weighted draw
      for (i <- 1 to opt.weight) {
        randomOptions.optsBuffer += opt.value
      }
    }
    randomOptions
  }
}

class RandomOptions[T](opts: RanOpt[T]*) {
  var totalWeight = 0
  var optsBuffer = new ListBuffer[T]

  def getRandomOpt: T = {
    // A uniform index in [0, totalWeight) maps to a weight-proportional choice
    val randomNum: Int = new Random().nextInt(totalWeight)
    optsBuffer(randomNum)
  }
}
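Because each option is replicated weight times in optsBuffer, drawing many samples should reproduce the configured 30/30/10/20/10 split. A small verification sketch; the 10,000-sample count and the object name WeightCheck are illustrative only:

package com.demo

object WeightCheck {
  def main(args: Array[String]): Unit = {
    val opts = RandomOptions(
      RanOpt("北京", 30), RanOpt("上海", 30), RanOpt("广州", 10),
      RanOpt("深圳", 20), RanOpt("天津", 10))
    // Draw 10,000 samples and count how often each value appears;
    // the counts should approximate the configured weights
    val counts = (1 to 10000)
      .map(_ => opts.getRandomOpt)
      .groupBy(identity)
      .map { case (city, hits) => city -> hits.size }
    counts.foreach { case (city, n) => println(s"$city: $n") }
  }
}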
6. Helper code (CityInfo.scala)
package com.demo
case class CityInfo (city_id:Long,
city_name:String,
area:String)
7. Run the program and test

The output can be observed simultaneously in the IDEA console and in a Kafka command-line consumer.
1645151518980 华南 深圳 6 6
1645151518980 华南 深圳 2 3
1645151518980 华南 深圳 4 6
1645151518980 华东 上海 3 6
1645151518980 华北 北京 2 4
1645151518980 华东 上海 6 2
1645151518980 华北 北京 2 1
Kafka message output:
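The same stream can be watched with the console consumer bundled with Kafka. A typical invocation, assuming the broker address from config.properties:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.22.56:9092 --topic test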