因为是java开发,相关zookeeper环境搭建 与kafka搭建就不详细做说明了,网上有很多的搭建方法,这里只叙述JAVA中的使用方法跟相关问题,纯手工啊兄弟萌
kafkaApi
1.2 yml配置org.apache.kafka kafka_2.121.0.0 provided org.apache.kafka kafka-clients1.0.0 org.apache.kafka kafka-streams1.0.0 org.springframework.kafka spring-kafka
kafka:
bootstrap-servers: 172.16.7.56:1996
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
#listner负责ack,每调用一次,就立即commit,如果是手动消费此项必须配置不然会报错
ack-mode: manual_immediate
missing-topics-fatal: false
注意配置好了消费业务 手动消费时 ack-mode: manual_immediate必须配置 不然会报错spring boot集成配合kafka的话只需要在yml中做好参数配置就可以了,简单配置一下,详细的可以在官方文档中查看-官方文档
1.3 确保数据安全配置- 给 topic 设置
replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2
个副本。这个是可以在yml中配置的在 Kafka 服务端设置
min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader
至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。在 producer 端设置
acks=all:这个是要求每条数据,必须是写入所有 replica
之后,才能认为是写成功了。这个是可以在yml中配置的在 producer 端设置
retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。这个是可以在yml中配置的
JAVA是可以通过配置文件直接增加主题的,当然生产者send数据的时候如果没有指定的主题kafka是会默认创建这个主题的,我们也可以通过配置类进行主题的增加。
具体是通过kafka.clients.admin包实现的以下是代码:
@Configuration
@Slf4j
@EnableAsync
public class kafkaConfiguration {
@Bean
NewTopic initialTopic(){
log.info("【kafka 已创建主题partitions 分区数量为5 副本数量为2】");
//创建主题名为partitionsOne,分区数量 5 副本数量 2
return new NewTopic("partitionsOne",5, (short) 2);
}
}
2.1 JAVA手动增加主题分区
因为不知道怎么把yml中的kafka相关配置直接取出来,我这里只能手动创建properties类加入map中使用AdminClient去增加主题的分区以下有几点注意
1.分区只能增大不能减小
2.确定好客户端跟服务器的kafka版本是否支持并确定好版本是否一致
public void createPartition(){
Properties properties = new Properties();
//不知道怎么直接把yml中kafka相关配置放过来只能手动加入
//加入kafka地址的key跟地址值 bootstrap.servers 是写死的
properties.put("bootstrap.servers", "192.168.0.0:9092");
try (AdminClient client = AdminClient.create(properties)) {
Map newPartitions = new HashMap<>();
newPartitions.put("topic1", NewPartitions.increaseTo(2));
CreatePartitionsResult rs = client.createPartitions(newPartitions);
try {
rs.all().get();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
}
未完待续…后续会慢慢补充



