Spring
Kafka允许您通过
@Bean在应用程序上下文中声明s来创建新主题。这将需要
KafkaAdmin在应用程序上下文中具有类型的Bean,如果使用Spring
Boot,则会自动创建该类型的Bean 。您可以如下定义主题:
@Beanpublic NewTopic myTopic() { return TopicBuilder.name("my-topic") .partitions(4) .replicas(3) .config(TopicConfig.RETENTION_MS_CONFIG, "1680000") .build();}如果您不使用Spring Boot,则还必须定义
KafkaAdminbean:
@Beanpublic KafkaAdmin admin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); return new KafkaAdmin(configs);}如果您要编辑现有主题的配置,则必须使用
AdminClient,这是
retention.ms在主题级别更改的代码段:
Map<String, Object> config = new HashMap<>(); config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");AdminClient client = AdminClient.create(config);ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");// Update the retention.ms valueConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);alterConfigsResult.all();



