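Depending on business needs, you can use the Java Producer API provided by Kafka to produce data and send it to the appropriate partition of a Kafka topic. The entry class is Producer.
The Kafka Producer API mainly provides the following three methods: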
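- public void send(KeyedMessage<K, V> message): sends a single message to the Kafka cluster
- public void send(List<KeyedMessage<K, V>> messages): sends a batch of messages to the Kafka cluster
- public void close(): closes the Kafka connection and releases its resources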
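1. JavaKafkaProducerPartitioner: a custom partitioner. It decides which partition of the topic a key/value message is sent to and returns the partition id, which lies in the range [0, number of partitions). The implementation here is simple: the partition is derived from the number contained in the key. The code is as follows: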
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JavaKafkaProducerPartitioner implements Partitioner {

    public JavaKafkaProducerPartitioner() {
        this(new VerifiableProperties());
    }

    public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
        // no configuration needed
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // Keys look like "key_42": strip the prefix and use the remaining number.
        int num = Integer.parseInt(((String) key).replace("key_", "").trim());
        return num % numPartitions;
    }
}
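The mapping performed by partition() can be tried out without a Kafka dependency. The following is a minimal sketch, not the Kafka Partitioner interface itself: the class name PartitionSketch and the method partitionFor are hypothetical, and the fallback to the key's hash code for keys that do not look like key_<number> is an assumption added for illustration (the partitioner above would throw a NumberFormatException on such keys).

```java
// Stand-alone sketch of the key-to-partition mapping; no Kafka classes are used.
// PartitionSketch and partitionFor are hypothetical names for illustration.
final class PartitionSketch {

    private PartitionSketch() {
    }

    // Maps keys such as "key_42" to a partition id in [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        String text = String.valueOf(key).replace("key_", "").trim();
        int num;
        try {
            num = Integer.parseInt(text);
        } catch (NumberFormatException e) {
            // Assumption: fall back to the hash code for non-numeric keys.
            num = text.hashCode();
        }
        // floorMod keeps the result non-negative even when num is negative.
        return Math.floorMod(num, numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("key_42", 5)); // prints 2
        System.out.println(partitionFor("key_10", 4)); // prints 2
    }
}
```

Using Math.floorMod instead of % is a design choice of this sketch: it guarantees a valid partition id even if the parsed or hashed number is negative.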
2. JavaKafkaProducer: a test class that produces data through the API provided by Kafka. The code is as follows:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ThreadLocalRandom;
public class JavaKafkaProducer {
    private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
    public static final String TOPIC_NAME = "test";
    public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
    public static final int chartsLength = charts.length;

    public static void main(String[] args) {
        // For a cluster, list several brokers separated by commas, e.g.
        // "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095"
        String brokerList = "192.168.187.146:9092";

        // 1. Set the producer properties
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // 0: do not wait for an acknowledgement from the broker
        props.put("request.required.acks", "0");
        // send asynchronously, in batches
        props.put("producer.type", "async");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // custom partitioner (use the fully qualified name if the class is in a package)
        props.put("partitioner.class", "JavaKafkaProducerPartitioner");
        // number of retries when a send fails
        props.put("message.send.max.retries", "3");
        // in async mode, the number of messages sent in one batch
        props.put("batch.num.messages", "200");
        // socket send buffer size: 102400 bytes (100 KB)
        props.put("send.buffer.bytes", "102400");

        // 2. Build the Kafka producer configuration
        ProducerConfig config = new ProducerConfig(props);

        // 3. Build the Producer object
        final Producer<String, String> producer = new Producer<String, String>(config);

        // 4. Send data to the server from several concurrent threads
        final AtomicBoolean flag = new AtomicBoolean(true);
        int numThreads = 5;
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            // Submit the Runnable directly; the pool supplies the threads.
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    while (flag.get()) {
                        // send one message
                        KeyedMessage<String, String> message = generateKeyedMessage();
                        producer.send(message);
                        System.out.println("Sent: " + message);

                        // sleep for a short random interval
                        try {
                            int least = 10;
                            int bound = 100;
                            Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));
                        } catch (InterruptedException e) {
                            // keep the interrupt status and stop this worker
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                    System.out.println(Thread.currentThread().getName() + " shutdown....");
                }
            });
        }

        // 5. Wait while the workers run (600000 ms = 10 minutes)
        long sleepMillis = 600000;
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        flag.set(false);

        // 6. Release resources
        pool.shutdown();
        try {
            pool.awaitTermination(6, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producer.close(); // close the producer last
        }
    }

    // Builds a message with a key such as "key_42" and 1 to 4 random words as the value.
    private static KeyedMessage<String, String> generateKeyedMessage() {
        String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
        StringBuilder sb = new StringBuilder();
        int num = ThreadLocalRandom.current().nextInt(1, 5);
        for (int i = 0; i < num; i++) {
            sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");
        }
        String message = sb.toString().trim();
        return new KeyedMessage<String, String>(TOPIC_NAME, key, message);
    }

    // Builds a random word of numItems characters drawn from charts.
    private static String generateStringMessage(int numItems) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < numItems; i++) {
            sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);
        }
        return sb.toString();
    }
}
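The stop-flag and shutdown sequence used in main (steps 4 to 6) can be exercised on its own. Below is a minimal sketch under stated assumptions: the Kafka producer is replaced by a hypothetical in-memory sink (a ConcurrentLinkedQueue), and the class name ShutdownSketch, the method runWorkers, and the short run time are illustrative only.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the worker / stop-flag / shutdown pattern.
// The queue is a stand-in for the Kafka producer; all names here are hypothetical.
final class ShutdownSketch {

    private ShutdownSketch() {
    }

    // Runs numThreads workers for runMillis, then stops them.
    // Returns true if every worker finished before the timeout.
    static boolean runWorkers(int numThreads, long runMillis, final Queue<String> sink) {
        final AtomicBoolean running = new AtomicBoolean(true);
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            final int id = i;
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    while (running.get()) {
                        sink.add("worker-" + id); // stand-in for producer.send(message)
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // keep the interrupt status
                            return;
                        }
                    }
                }
            });
        }
        try {
            Thread.sleep(runMillis); // let the workers run for a while
            running.set(false);      // ask the workers to leave their loops
            pool.shutdown();         // accept no new tasks
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running.set(false);
            pool.shutdownNow();      // interrupt workers that are still sleeping
            return false;
        }
    }

    public static void main(String[] args) {
        Queue<String> sink = new ConcurrentLinkedQueue<String>();
        boolean finished = runWorkers(3, 100, sink);
        System.out.println("finished=" + finished + ", messages=" + sink.size());
    }
}
```

In the real producer class, the resource that must be closed after the pool has terminated is the producer itself, which is why producer.close() sits in the finally block there.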
3. The pom.xml dependency configuration is as follows:
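<properties>
    <kafka.version>0.8.2.1</kafka.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>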



