栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka Java Producer代码实例详解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka Java Producer代码实例详解

根据业务需要可以使用Kafka提供的Java Producer API进行产生数据,并将产生的数据发送到Kafka对应Topic的对应分区中,入口类为:Producer

Kafka的Producer API主要提供下列三个方法:

  •   public void send(KeyedMessage message) 发送单条数据到Kafka集群
  •   public void send(List> messages) 发送多条数据(数据集)到Kafka集群
  •   public void close() 关闭Kafka连接资源

一、JavaKafkaProducerPartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到Topic的那个分区中,返回分区id,范围:[0,分区数量); 这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;


public class JavaKafkaProducerPartitioner implements Partitioner {

  
  public JavaKafkaProducerPartitioner() {
    this(new VerifiableProperties());
  }

  
  public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
    // nothings
  }

  @Override
  public int partition(Object key, int numPartitions) {
    int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());
    return num % numPartitions;
  }
}

二、 JavaKafkaProducer:通过Kafka提供的API进行数据产生操作的测试类;具体代码如下:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ThreadLocalRandom;


public class JavaKafkaProducer {
  private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
  public static final String TOPIC_NAME = "test";
  public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
  public static final int chartsLength = charts.length;


  public static void main(String[] args) {
    String brokerList = "192.168.187.149:9092";
    brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";
    brokerList = "192.168.187.146:9092";
    Properties props = new Properties();
    props.put("metadata.broker.list", brokerList);
    
    props.put("request.required.acks", "0");
    
    props.put("producer.type", "async");
    
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    
    props.put("partitioner.class", "JavaKafkaProducerPartitioner");

    // 重试次数
    props.put("message.send.max.retries", "3");

    // 异步提交的时候(async),并发提交的记录数
    props.put("batch.num.messages", "200");

    // 设置缓冲区大小,默认10KB
    props.put("send.buffer.bytes", "102400");

    // 2. 构建Kafka Producer Configuration上下文
    ProducerConfig config = new ProducerConfig(props);

    // 3. 构建Producer对象
    final Producer producer = new Producer(config);

    // 4. 发送数据到服务器,并发线程发送
    final AtomicBoolean flag = new AtomicBoolean(true);
    int numThreads = 50;
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    for (int i = 0; i < 5; i++) {
      pool.submit(new Thread(new Runnable() {
 @Override
 public void run() {
   while (flag.get()) {
     // 发送数据
     KeyedMessage message = generateKeyedMessage();
     producer.send(message);
     System.out.println("发送数据:" + message);

     // 休眠一下
     try {
int least = 10;
int bound = 100;
Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));
     } catch (InterruptedException e) {
e.printStackTrace();
     }
   }

   System.out.println(Thread.currentThread().getName() + " shutdown....");
 }
      }, "Thread-" + i));

    }

    // 5. 等待执行完成
    long sleepMillis = 600000;
    try {
      Thread.sleep(sleepMillis);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    flag.set(false);

    // 6. 关闭资源

    pool.shutdown();
    try {
      pool.awaitTermination(6, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
    } finally {
      producer.close(); // 最后之后调用
    }
  }

  
  private static KeyedMessage generateKeyedMessage() {
    String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
    StringBuilder sb = new StringBuilder();
    int num = ThreadLocalRandom.current().nextInt(1, 5);
    for (int i = 0; i < num; i++) {
      sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");
    }
    String message = sb.toString().trim();
    return new KeyedMessage(TOPIC_NAME, key, message);
  }

  
  private static String generateStringMessage(int numItems) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < numItems; i++) {
      sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);
    }
    return sb.toString();
  }
}

三、Pom.xml依赖配置如下


  0.8.2.1



  
    org.apache.kafka
    kafka_2.10
    ${kafka.version}
  

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/133875.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号