1. application.yml
test:
  kafka:
    bootstrapServers: 192.168.11.1:9092,192.168.11.2:9092
    consumer:
      group.id: group-id
      auto.offset.reset: latest
      # NOTE(review): auto-commit=true while forwarding to another topic can lose
      # messages on crash (offset committed before the send succeeds) — consider
      # manual commits if at-least-once forwarding is required.
      enable.auto.commit: true
    producer:
      max.in.flight.requests.per.connection: 1
      acks: -1
      buffer.memory: 33554432
      batch.size: 16384
      linger.ms: 100
      retries: 1
2. KafkaConfig.java
/**
 * Binds {@code test.kafka.*} from application.yml and assembles the full
 * consumer/producer {@link Properties} (hard-coded defaults + YAML overrides).
 *
 * <p>YAML entries under {@code test.kafka.consumer.*} / {@code test.kafka.producer.*}
 * are applied last, so they override the defaults set in code.
 */
@Component
@ConfigurationProperties(prefix = "test.kafka")
@Data
@Slf4j
public class KafkaConfig implements InitializingBean {

    /** Comma-separated broker list ({@code test.kafka.bootstrapServers}). */
    private String bootstrapServers;
    /** Consumer overrides from YAML; may be null when the section is absent. */
    private Properties consumer;
    /** Producer overrides from YAML; may be null when the section is absent. */
    private Properties producer;

    @Override
    public void afterPropertiesSet() {
        // Build both property sets once at startup so a bad configuration fails
        // fast, and log them for operability (the original discarded the result).
        log.info("kafka consumer properties: {}", getConsumerProperties());
        log.info("kafka producer properties: {}", getProducerProperties());
    }

    /**
     * Consumer defaults merged with the {@code test.kafka.consumer.*} overrides.
     *
     * @return a fresh {@link Properties} instance per call
     */
    public Properties getConsumerProperties() {
        Properties p = new Properties();
        p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 500);
        p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000);
        p.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        p.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        p.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 70000);
        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Fix: the original used ProducerConfig.BOOTSTRAP_SERVERS_CONFIG here.
        // Same string value, but the consumer constant is the correct one.
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        if (consumer != null) {
            // Null guard: putAll(null) threw NPE when the YAML section was missing.
            p.putAll(consumer);
        }
        return p;
    }

    /**
     * Producer defaults merged with the {@code test.kafka.producer.*} overrides.
     *
     * @return a fresh {@link Properties} instance per call
     */
    public Properties getProducerProperties() {
        Properties p = new Properties();
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        if (producer != null) {
            p.putAll(producer);
        }
        return p;
    }
}
3. KafkaReceiver.java
@Slf4j
@Component("kafkaReceiver")
public class KafkaReceiver extends Thread {
private KafkaConfig kafkaConfig;
@Autowired
public KafkaReceiver (KafkaConfig kafkaConfig) {
this.kafkaConfig= kafkaConfig;
}
@Override
public void run() {
log.info("启动kafka consumer ...");
KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getConsumerProperties());
Set topics ; //TODO 从数据库中查询的topic
consumer.subscribe(topics );
KafkaProducer producer = new KafkaProducer(kafkaConfig.getProducerProperties());
while (true) {
try {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
String topic = record.topic();
String msg = record.value();
log.info("topic={},offset={},value={}", topic, record.offset(), msg);
//TODO 在这里可以进行kafka消息转换。
String targetTopic ;//转发的目的topic
ProducerRecord producerRecord = new ProducerRecord(targetTopic, key, msg);
producer.send(producerRecord, (Recordmetadata metadata, Exception exception) -> {
if (exception == null) {
log.info("send success topic={},offset={},key={}", metadata.topic(), metadata.offset(), key);
} else {
log.error("send failure key={}", key);
}
});
}
} catch (Exception e) {
log.error("", e);
}
}
}
}



