- 1.pom依赖
- 2.application.properties 配置文件
- 3. 消费者 配置自动读取
- 4. 生产者 配置自动读取
- 5. 消费者工具
- 6.生产者工具
- 7.测试 生产消费
1.pom依赖
如果是springboot项目可以不指定版本,自动匹配
2.application.properties 配置文件org.apache.kafka kafka-clients 2.6.0 org.apache.kafka kafka-streams 2.6.0
server.port=2333 #—————————————————————————————————生产者——————————————————————————————————————————— #xxx服务器ip jmw.kafka.producer.servers=xxxx:xxx jmw.kafka.producer.topic=xxx #所有follower都响应了才认为消息提交成功,即"committed" jmw.kafka.ack=all #retries = MAX 无限重试,直到你意识到出现了问题: jmw.kafka.retries=0 #producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数 jmw.kafka.batch.size=16384 #batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms #延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理 jmw.kafka.batch.linger.ms=1 #producer可以用来缓存数据的内存大小。 jmw.kafka.buffer.memory=33554432 producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer #—————————————————————————————————消费者——————————————————————————————————————————— jmw.kafka.consumer.servers=xxxx:xxx jmw.kafka.consumer.topic=xxx jmw.kafka.consumer.group.id=xxx jmw.kafka.enable.auto.commit=true jmw.kafka.auto.commit.interval.ms=1000 jmw.kafka.auto.offset.reset=latest jmw.kafka.session.timeout.ms=30000 consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer3. 消费者 配置自动读取
package cn.com.kaf.configuration;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class ConsumerApplicaitonProperties implements InitializingBean {
@Value("${jmw.kafka.consumer.servers}")
private String serverHostPort;
@Value("${jmw.kafka.enable.auto.commit}")
private String enableAutoCommit;
@Value("${jmw.kafka.auto.commit.interval.ms}")
private String autoCommitInterval;
@Value("${jmw.kafka.auto.offset.reset}")
private String autoOffsetReset;
@Value("${jmw.kafka.session.timeout.ms}")
private String sessionTimeout;
@Value("${jmw.kafka.consumer.topic}")
private String consumerTopic;
@Value("${jmw.kafka.consumer.group.id}")
private String consumerGroupId;
@Value("${consumer.key.deserializer}")
private String key;
@Value("${consumer.value.deserializer}")
private String value;
public static String KAFKA_CONSUMER_SERVER_HOST_PORT;
public static String KAFKA_CONSUMER_ENABLE_AUTO_COMMIT;
public static String KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL;
public static String KAFKA_CONSUMER_AUTO_OFFSET_RESET;
public static String KAFKA_CONSUMER_SESSION_TIMEOUT;
public static String KAFKA_CONSUMER_TOPIC;
public static String KAFKA_CONSUMER_GROUP_ID;
public static String KAFKA_KEY_SERIALIZER ;
public static String KAFKA_VALUE_SERIALIZER ;
@Override
public void afterPropertiesSet() throws Exception {
KAFKA_CONSUMER_ENABLE_AUTO_COMMIT = enableAutoCommit;
KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL = autoCommitInterval;
KAFKA_CONSUMER_AUTO_OFFSET_RESET = autoOffsetReset;
KAFKA_CONSUMER_SESSION_TIMEOUT = sessionTimeout;
KAFKA_CONSUMER_TOPIC = consumerTopic;
KAFKA_CONSUMER_GROUP_ID = consumerGroupId;
KAFKA_KEY_SERIALIZER = key;
KAFKA_VALUE_SERIALIZER = value;
KAFKA_CONSUMER_SERVER_HOST_PORT = serverHostPort;
}
}
4. 生产者 配置自动读取
package cn.com.kaf.configuration;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class ProducergApplicationProperties implements InitializingBean {
@Value("${jmw.kafka.producer.servers}")
private String serverHostPort;
@Value("${jmw.kafka.producer.topic}")
private String producerTopic;
@Value("${jmw.kafka.ack}")
private String acks;
@Value("${jmw.kafka.retries}")
private String retries;
@Value("${jmw.kafka.batch.size}")
private String batchSize;
@Value("${jmw.kafka.batch.linger.ms}")
private String batchLingerMs;
@Value("${jmw.kafka.buffer.memory}")
private String bufferMemory;
@Value("${producer.key.serializer}")
private String key;
@Value("${producer.value.serializer}")
private String value;
public static String KAFKA_SERVER_HOST_PORT;
public static String KAFKA_ACKS;
public static String KAFKA_RETRIES;
public static String KAFKA_BATCH_SIZE;
public static String KAFKA_BATCH_LINGER;
public static String KAFKA_CACHE_MEMORY;
public static String KAFKA_PRODUCER_TOPIC;
public static String KAFKA_KEY_SERIALIZER ;
public static String KAFKA_VALUE_SERIALIZER ;
@Override
public void afterPropertiesSet() throws Exception {
KAFKA_SERVER_HOST_PORT = serverHostPort;
KAFKA_ACKS = acks;
KAFKA_RETRIES = retries;
KAFKA_BATCH_SIZE = batchSize;
KAFKA_BATCH_LINGER = batchLingerMs;
KAFKA_CACHE_MEMORY = bufferMemory;
KAFKA_PRODUCER_TOPIC = producerTopic;
KAFKA_KEY_SERIALIZER = key;
KAFKA_VALUE_SERIALIZER = value;
}
}
5. 消费者工具
package cn.com.kaf.consumer;
import cn.com.kaf.configuration.ConsumerApplicaitonProperties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Properties;
@Component
public class ConsumerFactoryTool extends ConsumerApplicaitonProperties{
private static KafkaConsumer consumer = null;
public static KafkaConsumer SingleCase(){
KafkaConsumer kafkaConsumer = consumer;
kafkaConsumer.subscribe(Arrays.asList(KAFKA_CONSUMER_TOPIC));
return kafkaConsumer;
}
private ConsumerFactoryTool() {
Properties properties = new Properties();
properties.put("bootstrap.servers", KAFKA_CONSUMER_SERVER_HOST_PORT);
properties.put("key.deserializer", KAFKA_KEY_SERIALIZER);
properties.put("value.deserializer", KAFKA_VALUE_SERIALIZER);
properties.put("group.id", KAFKA_CONSUMER_GROUP_ID);
properties.put("enable.auto.commit", KAFKA_CONSUMER_ENABLE_AUTO_COMMIT);
properties.put("auto.commit.interval.ms", KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL);
properties.put("auto.offset.reset", KAFKA_CONSUMER_AUTO_OFFSET_RESET);
properties.put("session.timeout.ms", KAFKA_CONSUMER_SESSION_TIMEOUT);
consumer = new KafkaConsumer<>(properties);
}
}
6.生产者工具
package cn.com.kaf.producer;
import cn.com.kaf.configuration.ProducergApplicationProperties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
public class ProducerFactoryTool extends ProducergApplicationProperties {
private static KafkaProducer producer = null;
public static KafkaProducer SingleCase(String data){
KafkaProducer kafkaProducer = producer;
kafkaProducer.send(new ProducerRecord(KAFKA_PRODUCER_TOPIC, data));
return kafkaProducer;
}
private ProducerFactoryTool() {
Properties properties = new Properties();
properties.put("bootstrap.servers",KAFKA_SERVER_HOST_PORT);//xxx服务器ip
properties.put("acks",KAFKA_ACKS);//所有follower都响应了才认为消息提交成功,即"committed"
properties.put("retries",KAFKA_RETRIES);//retries = MAX 无限重试,直到你意识到出现了问题:)
properties.put("batch.size", KAFKA_BATCH_SIZE);//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数
//batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
properties.put("linger.ms", KAFKA_BATCH_LINGER);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理
properties.put("buffer.memory", KAFKA_CACHE_MEMORY);//producer可以用来缓存数据的内存大小。
properties.put("key.serializer", KAFKA_KEY_SERIALIZER);
properties.put("value.serializer", KAFKA_KEY_SERIALIZER);
producer = new KafkaProducer<>(properties);
}
}
7.测试 生产消费
import cn.com.kaf.DemoApplication;
import cn.com.kaf.consumer.ConsumerFactoryTool;
import cn.com.kaf.producer.ProducerFactoryTool;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
public class Te {
@Before
public void StBefore() {
String[] args = new String[0];
SpringApplication.run(DemoApplication.class, args);
}
@Test
public void pullinkafka() throws InterruptedException {
KafkaConsumer kafkaConsumer = ConsumerFactoryTool.SingleCase();
while (true) {
ConsumerRecords records = kafkaConsumer.poll(100);
for (ConsumerRecord record : records) {
System.out.println("-----------------");
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
@Test
public void pushInKafka() throws InterruptedException {
String data = "";
KafkaProducer kafkaProducer = ProducerFactoryTool.SingleCase(data);
}
}



