栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Java使用Producer向Kafka集群发送消息

Java使用Producer向Kafka集群发送消息

Java使用Producer向Kafka集群发送消息
  • 一、Pom文件配置
  • 二、向Kafka集群发送消息
    • 2.1 直接发送
    • 2.2 同步发送
    • 2.3 异步发送
    • 2.4 Flume使用的发送方式

Java使用Kafka Producer API向Kafka集群发送消息的时候,大概有3中形式,分别为直接、同步、异步发送,本篇文章会介绍这3中发布方式的区别

一、Pom文件配置

	org.apache.kafka
	kafka-clients
	0.9.0.1

二、向Kafka集群发送消息 2.1 直接发送
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer1 {

	public static void main(String[] args) {

		Properties kafkaProps = new Properties();
		kafkaProps.put("bootstrap.servers", "10.202.62.66:9092,10.202.62.67:9092,10.202.62.68:9092");
		kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer producer = new KafkaProducer(kafkaProps);
		// 直接发送
		for (int i = 0; i < 100; i++) {
			ProducerRecord record = new ProducerRecord<>("testtopic", "key" + i, "message " + i);
			producer.send(record);

		}
		producer.flush();
		producer.close();
	}
}

这里有几个注意事项:

(1) Producer接收是ProducerRecord对象,因此最开始的时候需要创建ProducerRecord的对象。ProducerRecord有好几种构造方法,稍后我们会讲到。上面例子中需要填写接收消息的topic(以啊不能都是字符串类型),想要发送的key和value。key和value的类型必须要和序列化保持一致。

(2) 使用send()方法发送ProducerRecord对象,就像最开始Kafka的架构一样,消息会先缓存在buffer,然后开启独立的线程发送给broker。send()方法返回一个Java Future对象,对象中包含Recordmetadata,在上面的例子中并没有关心返回值,因此也就不知道消息是否发送成功,这种一般适用于允许丢失消息的情况。比如记录一些日志信息或者是不太重要的应用信息。

(3) 虽然我们忽略了向broker发送数据时出的错或者是Broker自己出的错,在producer发送数据前如果有错误仍然会抛出异常。有可能是在序列化消息的时候,产生了异常。比如说Buffer已经满了或者是发送线程中断产生的中断异常。

2.2 同步发送
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;

public class Producer2 {

	public static void main(String[] args) {

		Properties kafkaProps = new Properties();
		kafkaProps.put("bootstrap.servers", "10.202.62.66:9092,10.202.62.67:9092,10.202.62.68:9092");
		kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer producer = new KafkaProducer(kafkaProps);

		// 同步发送
		for (int i = 0; i < 100; i++) {
			ProducerRecord record = new ProducerRecord<>("testtopic", "key-"+i, "message " + i);
			Future result = producer.send(record);
			try {
				Recordmetadata m = result.get();
				System.out.println("partition:" + m.partition() + ",offset:" + m.offset());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		producer.flush();
		producer.close();

	}
}

(1)这里用了Future.get()方法,会等待kafka的确认回复。当broker遇到错误或者应用出现问题时,future接口都会抛出异常,然后我们可以捕获到这个异常进行处理。如果没有错误。将会获得Recordmetadata对象,这个对象包含了消息写入的偏移值。

(2)Producer有两种错误类型。一种是可以通过再次发送消息解决的错误,比如连接出现问题,需要重新连接;或者是"no leader"错误,通过等待一会Leader重新选举完就可以继续。producer可以配置自动重试。另一种是通过重试无法处理的错误,比如消息过大,这种情况下,Producer就不会重试,而是直接抛出异常。

2.3 异步发送
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;

public class Producer3 {

	public static void main(String[] args) {
		Properties kafkaProps = new Properties();

		kafkaProps.put("bootstrap.servers", "10.202.62.66:9092,10.202.62.67:9092,10.202.62.68:9092");
		kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer producer = new KafkaProducer(kafkaProps);

		for (int i = 0; i < 100; i++) {
			ProducerRecord record = new ProducerRecord<>("testtopic", "key" + i, "async message " + i);
			Future result = producer.send(record, new Callback() {
				@Override
				public void onCompletion(Recordmetadata metadata, Exception exception) {
					if (exception != null) {
						exception.printStackTrace();
					}
					if (metadata != null) {
						System.out.println("partition:" + metadata.partition() + ",offset:" + metadata.offset());
					}
				}
			});

			try {
				Recordmetadata m = result.get();
				System.out.println(m.partition() + "," + m.topic());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		producer.flush();
		producer.close();
	}
}

(1) 为了使用回调方法,需要实现org.apache.kafka.clients.producer.Callback接口,实现它的onCompletion方法。

(2) 当Kafka返回错误的时候,onCompletion方法会收到一个非null的异常。上面的例子直接打印异常消息,但是如果是生产环境,需要做一些处理错误的操作。

(3) 记录的创建和之前是一样的

(4)需要再发送消息的时候,传入回调的对象

2.4 Flume使用的发送方式
package com.master.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;

public class FlumeProducer {

	public static void main(String[] args) {
		Properties kafkaProps = new Properties();
		List> list = new ArrayList>();
		kafkaProps.put("bootstrap.servers", "10.202.62.66:9092,10.202.62.67:9092,10.202.62.68:9092");
		kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer producer = new KafkaProducer(kafkaProps);
		for (int i = 0; i < 100; i++) {
			ProducerRecord record = new ProducerRecord<>("testtopic", "key--" + i,
					"flume message " + i);
			Future result = producer.send(record, new Callback() {
				@Override
				public void onCompletion(Recordmetadata metadata, Exception exception) {
					if (exception != null) {
						exception.printStackTrace();
					}
					if (metadata != null) {
						System.out.println("partition:" + metadata.partition() + ",offset:" + metadata.offset());
					}
				}
			});
			list.add(result);
		}
		for (Future future : list) {
			try {
				future.get();
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
		producer.flush();
		producer.close();
	}
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/654469.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号