同步方式
public class Myproducer1 {
public static void main(String[] args) {
Map configs = new HashMap<>();
configs.put("bootstrap.servers", "Linux122:9092");
configs.put("key.serializer", IntegerSerializer.class);
configs.put("value.serializer", StringSerializer.class);
KafkaProducer producer = new KafkaProducer<>(configs);
ArrayList headers = new ArrayList<>();
headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));
ProducerRecord producerRecord = new ProducerRecord("topic_1", 0, 0, "hello laogu 0 ", headers);
// Future future = producer.send(producerRecord);
// try {
// Recordmetadata recordmetadata = future.get();
// System.out.println("消息的主题:" + recordmetadata.topic());
// System.out.println("消息的分区号:" + recordmetadata.partition());
// System.out.println("消息的偏移量:" + recordmetadata.offset());
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }finally {
// producer.close();
// }
异步方式
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(Recordmetadata recordmetadata, Exception e) {
if (e != null) {
System.out.println("异常消息" + e.getMessage());
} else {
System.out.println("消息的主题:" + recordmetadata.topic());
System.out.println("消息的分区号:" + recordmetadata.partition());
System.out.println("消息的偏移量:" + recordmetadata.offset());
}
}
});
producer.close();
}
}
2.消费者消费消息的方式
/**
 * Minimal Kafka consumer demo: subscribes to {@code topic_1} as group
 * {@code consumer.demo} and prints every record it receives, forever.
 */
public class MyConsumer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // The loop below never ends normally; try-with-resources still closes the
        // consumer if poll() or record handling throws.
        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs)) {
            consumer.subscribe(Arrays.asList("topic_1"));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long); wait up to 3 seconds per poll.
                ConsumerRecords<Integer, String> consumerRecords =
                        consumer.poll(java.time.Duration.ofMillis(3000));
                for (ConsumerRecord<Integer, String> consumerRecord : consumerRecords) {
                    System.out.println("topic = " + consumerRecord.topic()
                            + " partition = " + consumerRecord.partition()
                            + " offset = " + consumerRecord.offset()
                            + " key = " + consumerRecord.key()
                            + " value = " + consumerRecord.value());
                }
            }
        }
    }
}
3.Kafka和Spring boot整合
首先需要配置application.properties文件
spring.application.name=springboot-kafka-02
server.port=8080
#kafka配置
spring.kafka.bootstrap-servers=Linux122:9092
#producer配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
#consumer配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer02
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
(1)同步生产者
/**
 * REST endpoint that sends the path variable as a Kafka message and waits for
 * the send to complete before answering.
 */
@RestController
public class KafkaSyncProducerController {
    @Autowired
    private KafkaTemplate<Integer, String> template;

    /**
     * Sends {@code message} to partition 0 of {@code topic-spring-01} with key 0
     * and blocks until the send has finished.
     *
     * @param message the message payload taken from the request path
     * @return {@code "success"} when the send completed, {@code "failure"} otherwise
     */
    @RequestMapping("send/sync/{message}")
    public String send(@PathVariable String message) {
        ListenableFuture<SendResult<Integer, String>> future =
                template.send("topic-spring-01", 0, 0, message);
        try {
            // get() blocks until the send completes; this is what makes the endpoint synchronous.
            SendResult<Integer, String> sendResult = future.get();
            // Metadata of the written record (topic, partition, offset) is available here.
            RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return "failure";
        } catch (ExecutionException e) {
            e.printStackTrace();
            return "failure";
        }
        return "success";
    }
}
(2)异步生产者
@RestController
public class KafkaASyncProducerController {
@Autowired
private KafkaTemplate template;
@RequestMapping("send/async/{message}")
public String send(@PathVariable String message) {
ListenableFuture> future = template.send("topic-spring-01", 0, 0, message);
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败 : " + throwable.getMessage());
}
@Override
public void onSuccess(SendResult result) {
Recordmetadata recordmetadata = result.getRecordmetadata();
}
});
return "success";
}
}
我们在和 Spring Boot 整合的时候并没有主动创建主题,但运行后发现主题已经存在,这是因为 Spring 的配置自动替我们完成了这些工作。
我们也可以对其进行自定义:自己定义同类型的对象即可覆盖默认的定义。
例如:在 Spring Boot 中自定义 Kafka 的配置类 KafkaConfig
/**
 * Declares the Kafka topics used by the application as {@link NewTopic} beans.
 */
@Configuration
public class KafkaConfig {

    /** Replication factor shared by every topic declared in this class. */
    private static final short SINGLE_REPLICA = 1;

    /** Topic {@code nptc-01} with a single partition. */
    @Bean
    public NewTopic topic1() {
        return singleReplicaTopic("nptc-01", 1);
    }

    /** Topic {@code nptc-02} with five partitions. */
    @Bean
    public NewTopic topic2() {
        return singleReplicaTopic("nptc-02", 5);
    }

    /** Builds a topic definition with the given name and partition count and one replica. */
    private static NewTopic singleReplicaTopic(String topicName, int partitionCount) {
        return new NewTopic(topicName, partitionCount, SINGLE_REPLICA);
    }
}
(3)消费者
/**
 * Spring-managed Kafka listener that prints every record received from
 * {@code topic-spring-01}.
 */
@Component
public class Myconsumer {

    /**
     * Handles one record delivered by the listener container.
     *
     * @param record the consumed record; the Integer key and String value types
     *               match the deserializers configured in application.properties
     */
    @KafkaListener(topics = "topic-spring-01")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println("消费者收到消息 消息的topic = " + record.topic()
                + " 消息的value = " + record.value()
                + " 消息的partition = " + record.partition()
                + " 消息的offset = " + record.offset());
    }
}
二.自定义序列化和反序列化
(1)自定义Object
/**
 * Simple mutable value object used as the Kafka message payload in the
 * custom serializer / deserializer examples.
 */
public class User {
    private Integer userId;
    private String userName;

    /** Creates an empty user; populate it through the setters. */
    public User() {
    }

    /**
     * Creates a fully populated user.
     *
     * @param userId   the user's identifier
     * @param userName the user's name
     */
    public User(Integer userId, String userName) {
        this.userId = userId;
        this.userName = userName;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** Returns a debug representation of the form {@code User{userId=1, userName='x'}}. */
    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", userName='" + userName + '\'' +
                '}';
    }
}
(2)自定义序列化器
public class UserSerializer implements Serializer{ @Override public void configure(Map configs, boolean isKey) { Serializer.super.configure(configs, isKey); //用于接收对序列化器的配置参数,并对当前的序列化器进行配置和初始化 } @Override public byte[] serialize(String s, User user) { if (user == null) { return null; } Integer userId = user.getUserId(); String userName = user.getUserName(); if (userId != null && userName != null) { try { byte[] bytes = userName.getBytes("UTF-8"); int length = bytes.length; ByteBuffer byteBuffer = ByteBuffer.allocate(4+4+length); byteBuffer.putInt(userId); byteBuffer.putInt(length); byteBuffer.put(bytes); return byteBuffer.array(); } catch (Exception e) { throw new SerializationException("User Serialization Exception!"); } } return null; } @Override public byte[] serialize(String topic, Headers headers, User data) { return Serializer.super.serialize(topic, headers, data); } @Override public void close() { Serializer.super.close(); //幂等调用即调用多次效果和一次一样 } }
(3).使用自定义序列化器进行发送消息
/**
 * Producer demo that sends one {@link User} to topic {@code tp_user_01} using
 * the custom {@code UserSerializer} for the value and the user name as the key.
 */
public class MyProducer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);

        User user = new User();
        user.setUserId(111);
        user.setUserName("good go");

        ProducerRecord<String, User> userProducerRecord =
                new ProducerRecord<>("tp_user_01", user.getUserName(), user);

        // try-with-resources closes the producer even when send() throws.
        try (KafkaProducer<String, User> userKafkaProducer = new KafkaProducer<>(configs)) {
            userKafkaProducer.send(userProducerRecord, (recordMetadata, e) -> {
                if (e != null) {
                    // Report failures instead of dropping them silently.
                    System.out.println("异常消息" + e.getMessage());
                    return;
                }
                System.out.println("userProducerRecord 发送成功,返回的信息是:topic = " + recordMetadata.topic()
                        + " offset = " + recordMetadata.offset()
                        + " partition = " + recordMetadata.partition());
            });
        }
    }
}
(4).自定义反序列化器
public class UserDeserializer implements Deserializer{ @Override public void configure(Map configs, boolean isKey) { Deserializer.super.configure(configs, isKey); } @Override public User deserialize(String topic, byte[] data) { ByteBuffer byteBuffer = ByteBuffer.allocate(data.length); byteBuffer.put(data); byteBuffer.flip();//类似重置操作 int userId = byteBuffer.getInt(); int length = byteBuffer.getInt(); String userName = new String(data,8,length); return new User(userId,userName); } @Override public User deserialize(String topic, Headers headers, byte[] data) { return Deserializer.super.deserialize(topic, headers, data); } @Override public void close() { Deserializer.super.close(); } }
(5)使用自定义反序列化器进行消费消息
public class MyConsumer {
public static void main(String[] args) {
Map configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");
KafkaConsumer kafkaConsumer = new KafkaConsumer(configs);
kafkaConsumer.subscribe(Collections.singleton("tp_user_01"));
int i = 0;
while (true) {
ConsumerRecords consumerRecords = kafkaConsumer.poll(5000);
final int index = i++;
consumerRecords.forEach(new Consumer>() {
@Override
public void accept(ConsumerRecord consumerRecord) {
System.out.println("第" + index + "批次收到消息key = " + consumerRecord.key() + " value = " + consumerRecord.value());
}
});
}
//kafkaConsumer.close();
}
}



