[root@CentOSA kafka-eagle]# ll 总用量 0 drwxr-xr-x. 2 root root 33 1月 2 22:26 bin drwxr-xr-x. 2 root root 62 1月 2 22:26 conf drwxr-xr-x. 2 root root 6 9月 13 01:12 db drwxr-xr-x. 2 root root 23 1月 2 22:26 font drwxr-xr-x. 9 root root 91 9月 13 01:12 kms drwxr-xr-x. 2 root root 6 9月 13 01:12 logs3.配置文件
[root@CentOSA conf]# cat system-config.properties ###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead ###################################### efak.zk.cluster.alias=cluster1,cluster2 # 1.修改zookeeper 集群 cluster1.zk.list=CentOSA:2181,CentOSB:2181,CentOSC:2181 # cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest cluster1.zk.acl.username=test cluster1.zk.acl.password=test123 ###################################### # broker size online list ###################################### cluster1.efak.broker.size=20 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=32 ###################################### # EFAK webui port 2.服务端口 ###################################### efak.webui.port=8048 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.efak.jmx.acl=false cluster1.efak.jmx.user=keadmin cluster1.efak.jmx.password=keadmin123 cluster1.efak.jmx.ssl=false cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore cluster1.efak.jmx.truststore.password=ke123456 ###################################### # kafka offset storage ###################################### cluster1.efak.offset.storage=kafka # cluster2.efak.offset.storage=zk ###################################### # kafka jmx uri ###################################### cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi ###################################### # kafka metrics, 15 days by default 3.报表图 这个要开必须开启kafka jmx ###################################### efak.metrics.charts=true efak.metrics.retain=15 ###################################### # kafka sql topic records max ###################################### efak.sql.topic.records.max=5000 efak.sql.topic.preview.records.max=10 ###################################### # delete kafka topic token 4.管理的密码 ###################################### efak.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### cluster1.efak.sasl.enable=false cluster1.efak.sasl.protocol=SASL_PLAINTEXT cluster1.efak.sasl.mechanism=SCRAM-SHA-256 cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle"; cluster1.efak.sasl.client.id= cluster1.efak.blacklist.topics= cluster1.efak.sasl.cgroup.enable=false cluster1.efak.sasl.cgroup.topics= #cluster2.efak.sasl.enable=false #cluster2.efak.sasl.protocol=SASL_PLAINTEXT #cluster2.efak.sasl.mechanism=PLAIN #cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle"; #cluster2.efak.sasl.client.id= #cluster2.efak.blacklist.topics= #cluster2.efak.sasl.cgroup.enable=false #cluster2.efak.sasl.cgroup.topics= ###################################### # kafka ssl authenticate ###################################### cluster3.efak.ssl.enable=false cluster3.efak.ssl.protocol=SSL cluster3.efak.ssl.truststore.location= cluster3.efak.ssl.truststore.password= cluster3.efak.ssl.keystore.location= cluster3.efak.ssl.keystore.password= cluster3.efak.ssl.key.password= cluster3.efak.ssl.endpoint.identification.algorithm=https cluster3.efak.blacklist.topics= cluster3.efak.ssl.cgroup.enable=false cluster3.efak.ssl.cgroup.topics= ###################################### # kafka sqlite jdbc driver address ###################################### efak.driver=org.sqlite.JDBC efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db efak.username=root efak.password=www.kafka-eagle.org ###################################### # kafka mysql jdbc driver address 5.插件依赖的数据库 ###################################### efak.driver=com.mysql.cj.jdbc.Driver efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=root4.配置KE_HOME
因为启动的时候需要这个变量;
二. Springboot 项目集成 1.pom.xml2.配置文件4.0.0 com.sff springbootkafka 1.0-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.2.6.RELEASE org.springframework.boot spring-boot-starter org.springframework.kafka spring-kafka org.springframework.boot spring-boot-starter-test test 8 8
spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092 #producer spring.kafka.producer.retries=5 spring.kafka.producer.acks=all spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 ## 开启事务 如果 1.代码里面使用事务的api 2.使用transaction 若无发送数据会报错 spring.kafka.producer.transaction-id-prefix=transaction-id- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.properties.enable.idempotence=true # consumer spring.kafka.consumer.group-id=springboot-kafka spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.properties.isolation.level=read_committed spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer ##批量消费 # 若没有配置 则每次消费一个 # 若配置 KafkaListeners 注解的方法 就不是一个对象了 而是一个List 使用一个对象会报错 spring.kafka.listener.type=batch spring.kafka.listener.poll-timeout=1000 spring.kafka.listener.concurrency=1 spring.kafka.consumer.max-poll-records=203.监听kafka代码 3.1 则每次消费一个
@KafkaListeners(
value = {
@KafkaListener(topics = {"topic06"})
}
)
public void recevice3(ConsumerRecord record) {
System.out.println("topic06: "+record.value());
}
3.2 每次消费多个
@KafkaListeners(
value = {
@KafkaListener(topics = {"topic06"})
}
)
public void recevice3(List> records) {
System.out.println("consumer size:"+records.size());
for (ConsumerRecord record : records) {
System.out.println("topic06: " + record.value());
}
}
4.发送kafka代码
4.1 发送没有事务
public void sendMessage(String topic ,String value) throws InterruptedException {
ProducerRecord record = new ProducerRecord<>(topic, value);
kafkaTemplate.send(record);
}
4.2 发送携带事务一
public void testSend(){
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
@Override
public Object doInOperations(KafkaOperations operations) {
for (int i = 0; i < 50; i++) {
ProducerRecord record = new ProducerRecord<>("topic06", "testSend"+i);
kafkaTemplate.send(record);
}
return null;
}
});
}
4.3 发送携带事务二
@Transactional
public void sendMessage(String topic, String value) {
ProducerRecord record = new ProducerRecord<>(topic, value);
kafkaTemplate.send(record);
}



