broker
Kafka服务器进程,生产者、消费者都要连接broker一个集群由多个broker组成,功能实现Kafka集群的负载均衡、容错 producer:生产者consumer:消费者topic:主题,一个Kafka集群中,可以包含多个topic。一个topic可以包含多个分区
是一个逻辑结构,生产、消费消息都需要指定topic partition:Kafka集群的分布式就是由分区来实现的。一个topic中的消息可以分布在topic中的不同partition中replica:副本,实现Kafkaf集群的容错,实现partition的容错。一个topic至少应该包含大于1个的副本consumer group:消费者组,一个消费者组中的消费者可以共同消费topic中的分区数据。每一个消费者组都一个唯一的名字。配置group.id一样的消费者是属于同一个组中offset:偏移量。相对消费者、partition来说,可以通过offset来拉取数据 消费者组
一个消费者组中可以包含多个消费者,共同来消费topic中的数据一个topic中如果只有一个分区,那么这个分区只能被某个组中的一个消费者消费有多少个分区,那么就可以被同一个组内的多少个消费者消费 幂等性
生产者消息重复问题
Kafka生产者生产消息到partition,如果直接发送消息,kafka会将消息保存到分区中,但Kafka会返回一个ack给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果ack响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka又会保存一条一模一样的消息
在Kafka中可以开启幂等性
当Kafka的生产者生产消息时,会增加一个pid(生产者的唯一编号)和sequence number(针对消息的一个递增序列)发送消息,会连着pid和sequence number一块发送kafka接收到消息,会将消息和pid、sequence number一并保存下来如果ack响应失败,生产者重试,再次发送消息时,Kafka会根据pid、sequence number是否需要再保存一条消息判断条件:生产者发送过来的sequence number 是否小于等于 partition中消息对应的sequence 事务编程
开启事务的条件
生产者
// 开启事务必须要配置事务的ID
props.put("transactional.id", "dwd_user");
消费者
// 配置事务的隔离级别
props.put("isolation.level","read_committed");
// 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
props.setProperty("enable.auto.commit", "false");
生产者
初始化事务开启事务需要使用producer来将消费者的offset提交到事务中提交事务如果出现异常回滚事务
如果使用了事务,不要使用异步发送
public class TransactionProgram {
public static void main(String[] args) {
// 1. 调用之前实现的方法,创建消费者、生产者对象
KafkaConsumer consumer = createConsumer();
KafkaProducer producer = createProducer();
// 2. 生产者调用initTransactions初始化事务
producer.initTransactions();
// 3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
while(true) {
try {
// (1) 生产者开启事务
producer.beginTransaction();
// 这个Map保存了topic对应的partition的偏移量
Map offsetMap = new HashMap<>();
// 从topic中拉取一批的数据
// (2) 消费者拉取消息
ConsumerRecords concumserRecordArray = consumer.poll(Duration.ofSeconds(5));
// (3) 遍历拉取到的消息,并进行预处理
for (ConsumerRecord cr : concumserRecordArray) {
// 将1转换为男,0转换为女
String msg = cr.value();
String[] fieldArray = msg.split(",");
// 将消息的偏移量保存
// 消费的是ods_user中的数据
String topic = cr.topic();
int partition = cr.partition();
long offset = cr.offset();
int i = 1 / 0;
// offset + 1:offset是当前消费的记录(消息)对应在partition中的offset,而我们希望下一次能继续从下一个消息消息
// 必须要+1,从能消费下一条消息
offsetMap.put(new TopicPartition(topic, partition), new OffsetAndmetadata(offset + 1));
// 将字段进行替换
if(fieldArray != null && fieldArray.length > 2) {
String sexField = fieldArray[1];
if(sexField.equals("1")) {
fieldArray[1] = "男";
}
else if(sexField.equals("0")){
fieldArray[1] = "女";
}
}
// 重新拼接字段
msg = fieldArray[0] + "," + fieldArray[1] + "," + fieldArray[2];
// (4) 生产消息到dwd_user topic中
ProducerRecord dwdMsg = new ProducerRecord<>("dwd_user", msg);
// 发送消息
Future future = producer.send(dwdMsg);
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
producer.abortTransaction();
}
// new Callback()
// {
// @Override
// public void onCompletion(Recordmetadata metadata, Exception exception) {
// // 生产消息没有问题
// if(exception == null) {
// System.out.println("发送成功:" + dwdMsg);
// }
// else {
// System.out.println("生产消息失败:");
// System.out.println(exception.getMessage());
// System.out.println(exception.getStackTrace());
// }
// }
// });
}
producer.sendOffsetsToTransaction(offsetMap, "ods_user");
// (6) 提交事务
producer.commitTransaction();
}catch (Exception e) {
e.printStackTrace();
// (7) 捕获异常,如果出现异常,则取消事务
producer.abortTransaction();
}
}
}
// 一、创建一个消费者来消费ods_user中的数据
private static KafkaConsumer createConsumer() {
// 1. 配置消费者的属性(添加对事务的支持)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
props.setProperty("group.id", "ods_user");
// 配置事务的隔离级别
props.put("isolation.level","read_committed");
// 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
props.setProperty("enable.auto.commit", "false");
// 反序列化器
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 2. 构建消费者对象
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props);
// 3. 订阅一个topic
kafkaConsumer.subscribe(Arrays.asList("ods_user"));
return kafkaConsumer;
}
// 二、编写createProducer方法,用来创建一个带有事务配置的生产者
private static KafkaProducer createProducer() {
// 1. 配置生产者带有事务配置的属性
Properties props = new Properties();
props.put("bootstrap.servers", "node1.itcast.cn:9092");
// 开启事务必须要配置事务的ID
props.put("transactional.id", "dwd_user");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. 构建生产者
KafkaProducer kafkaProducer = new KafkaProducer<>(props);
return kafkaProducer;
}
}



