【启动kafka前,需要调整这个配置文件(/opt/kafka-2.1.0/config/server.properties)的一处ip地址】
因为每次虚拟机的ip地址可能会有所不同,这和所在网络有关,也可将ip固定,这样不需要调整配置文件
安装并使用centos7安装kafka
1.解压 2.修改配置文件 3.创建启动文件 4.启动文件赋予权限 5.直接启动kafka的启动文件即可 6.命令行简单测试 安装kafka-tool客户端 java程序调用
生产者
pom.xml文件引入依赖 application.yml文件配置kafka信息 开发Bean对象,初始化kafka地址,以方便调用 controller调用 postman测试 消费者
centos7安装kafka前辈的数据不丢失思想
1.解压 2.修改配置文件
tar -xvf kafka_2.11-2.1.0.tgz
mv kafka_2.11-2.1.0 kafka-2.1.0
cd /opt/kafka-2.1.0/config
【修改server.properties文件】
vim server.properties
修改内容如下:
broker.id=0
#端口号
port=9092
#单机可直接用localhost
host.name=localhost
listeners=PLAINTEXT://192.168.43.196:9092
#日志存放路径
log.dirs=/DATA/kafka/kafka_2.12-2.0.0/log
#zookeeper地址和端口,单机配置部署,localhost:2181
zookeeper.connect=localhost:2181
3.创建启动文件
【修改zookeeper.properties文件】
vim zookeeper.properties
修改内容如下:
#zookeeper数据目录
dataDir=/opt/zookeeper-3.4.8/data
#zookeeper日志目录
dataLogDir=/opt/zookeeper-3.4.8/log
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
vim kafkaStart.sh
4.启动文件赋予权限
内容如下:
#!/bin/bash
# Launches ZooKeeper and then the Kafka broker from the same Kafka install,
# both as background jobs of this shell.
KAFKA_HOME=/opt/kafka-2.1.0

# Start ZooKeeper first (server.properties points the broker at it).
"${KAFKA_HOME}/bin/zookeeper-server-start.sh" "${KAFKA_HOME}/config/zookeeper.properties" &

# Wait 3 seconds before starting the broker.
sleep 3

# Start the Kafka broker.
"${KAFKA_HOME}/bin/kafka-server-start.sh" "${KAFKA_HOME}/config/server.properties" &
5.直接启动kafka的启动文件即可
chmod +x kafkaStart.sh
6.命令行简单测试
cd bin
./kafkaStart.sh
#创建名字为test的topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查看topic列表
./kafka-topics.sh --list --zookeeper localhost:2181
#删除名字为test的topic
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
#开启两个终端 (生产者)
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
安装kafka-tool客户端
#消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
傻瓜式安装,通过ip进行连接
application.yml文件配置kafka信息
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
#kafka推送消息信息配置
kafka:
  #url
  url: 192.168.124.22:9092
  #topic名称
  topic: testKafka
  #发生错误后,消息重发的次数
  retries: 0
  #Broker对producer即将发送来的数据采用何种确认方式
  acks: 1
开发Bean对象,初始化kafka地址,以方便调用
MetricsCallBack 回调方法(获取发送消息的结果)
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
@Slf4j
public abstract class MetricsCallBack implements Callback {
private final long startTime;
private final String key;
private final String message;
public MetricsCallBack(long startTime,String key,String message){
this.startTime=startTime;
this.key=key;
this.message=message;
}
@Override
public void onCompletion(Recordmetadata metadata, Exception exception) {
long cost=System.currentTimeMillis()-startTime;
if(metadata!=null){
onSendFinish(this.key,this.message,true,cost,metadata.topic(),metadata.partition(),metadata.offset());
}else{
log.error("send message to kafka failed!",exception);
onSendFinish(this.key,this.message,false,cost,null,-1,-1);
}
}
public abstract void onSendFinish(String key,String message,boolean success,long useTime,String topic,int partition,long offset);
}
KafkaProducerHelper 工具类(发送消息)
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@Slf4j
public class KafkaProducerHelper {
private final KafkaProducer producer;
public KafkaProducerHelper(String servers,String acks,int retries){
log.info("有参构造方法....");
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"clientId");
properties.put(ProducerConfig.ACKS_CONFIG,acks);
properties.put(ProducerConfig.RETRIES_CONFIG,retries);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,100*1024);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer=new KafkaProducer(properties);
}
public void send(String topic,String message,boolean printLog){
if(printLog){
log.info("topic:{},message:{}",topic,message);
}
try {
this.producer.send(new ProducerRecord<>(topic,message)).get();
} catch (InterruptedException |ExecutionException e) {
log.error("get result error!",e);
}
}
public void send(String topic,String key,String message,boolean printLog){
if(printLog){
log.info("topic:{},key:{},message:{}",topic,key,message);
}
try {
this.producer.send(new ProducerRecord<>(topic,key,message)).get();
} catch (InterruptedException |ExecutionException e) {
log.error("get result error!",e);
}
}
public void send(String topic,String message,MetricsCallBack metricsCallBack,boolean printLog){
if(printLog){
log.info("topic:{},message:{}",topic,message);
}
this.producer.send(new ProducerRecord<>(topic,message),metricsCallBack);
}
public void send(String topic,String key,String message,MetricsCallBack metricsCallBack,boolean printLog){
if(printLog){
log.info("topic:{},key:{},message:{}",topic,key,message);
}
this.producer.send(new ProducerRecord<>(topic,key,message),metricsCallBack);
}
}
KafkaConfig 将kafka配置信息作为bean加载到容器中
import com.bigdata.bigdata.util.kafka.KafkaProducerHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that creates the {@link KafkaProducerHelper} bean from the
 * {@code kafka.*} properties.
 */
@Configuration
public class KafkaConfig {
    /** Value of {@code kafka.url}; used as the producer's server list. */
    @Value("${kafka.url}")
    private String kafkaUrl;

    /** Value of {@code kafka.retries}. */
    @Value("${kafka.retries}")
    private int retryCount;

    /** Value of {@code kafka.acks}. */
    @Value("${kafka.acks}")
    private String ackMode;

    /**
     * @return a helper constructed from the injected url, acks and retries values
     */
    @Bean
    public KafkaProducerHelper getKafkaProducerHelper() {
        final KafkaProducerHelper helper = new KafkaProducerHelper(kafkaUrl, ackMode, retryCount);
        return helper;
    }
}
KakfaProducerComponent 封装发送消息供他人调用
import com.alibaba.fastjson.JSON;
import com.bigdata.bigdata.util.kafka.KafkaProducerHelper;
import com.bigdata.bigdata.util.kafka.MetricsCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
@Slf4j
public class KakfaProducerComponent {
@Value("${kafka.topic}")
private String topicKey;
@Autowired
private KafkaProducerHelper kafkaProducerHelper;
@Async
public void sendMessage(String orderId,int status){
MetricsCallBack metricsCallBack=null;
try{
Map map=new HashMap();
map.put("orderId",orderId);
map.put("status",status);
String message= JSON.toJSONString(map);
metricsCallBack = new MetricsCallBack(System.currentTimeMillis(),"",message){
@Override
public void onSendFinish(String key, String message, boolean success, long useTime, String topic, int partition, long offset) {
log.info("推送消息:{},分区:{},偏移量:{}",message,partition,offset);
}
};
kafkaProducerHelper.send(topicKey,message,metricsCallBack,true);
}catch (Exception e){
log.error("kafka发生异常",e);
}finally {
metricsCallBack=null;
}
}
}
controller调用
import com.bigdata.bigdata.component.KakfaProducerComponent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint under {@code /kafka} that hands order-status updates to
 * {@link KakfaProducerComponent} for publishing.
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KakfaProducerComponent producerComponent;

    /**
     * Handles {@code GET /kafka/sendMessage} by delegating to the producer component.
     * The parameters carry no binding annotations, so they are presumably resolved from
     * request parameters of the same names.
     *
     * @param orderId order identifier forwarded to the producer component
     * @param status  status value forwarded to the producer component
     */
    @GetMapping("/sendMessage")
    public void sendMessage(String orderId, int status) {
        producerComponent.sendMessage(orderId, status);
    }
}
postman测试
接下来就是在kafkaTool客户端查看数据
待完成



