package Kafka_Demo;
import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Producer {
public static void main(String[] args) {
producerconfig producerconfig = new producerconfig();
Properties pc = producerconfig.pc();
KafkaProducer producer = new KafkaProducer<>(pc);
/
/
//****************************************************************************************************************************
//****************************************************************************************************************************
//异步回调(不阻塞)
//JavaProducer的send方法会返回一个JavaFuture对象供用户稍后获取发送结果。这就是回调机制。
//metadata 和 exception 不可能同时为空,消息发送成功时,Exception为null,消息发送失败时,metadata为空
StickyAssignor stickyAssignor = new StickyAssignor();
for (int i = 0; i <= 10; i++) {
String s = "test---" + i;
ProducerRecord first = new ProducerRecord<>("first", s, s);
producer.send(first, new Callback() {
@Override
public void onCompletion(Recordmetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("异步发送消息成功:" + " topic=" + metadata.topic() + "t" + "partition=" + metadata.partition() + "t" + "offset=" + metadata.offset() + "t" + s);
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
//****************************************************************************************************************************
//
}
}