BlockingQueue时一个接口,又许多实现类
Kafka入门高吞吐量:Kafka是硬盘顺序读取:硬盘顺序读取高于内存的随机读取。
高可靠性:分布式的集群
高扩展性:加集群很方便
Broker:Kafka的服务器
Zookeeper:独立的软件,用来管理集群(Kafka里面内置有)
Topic:主题(消息队列实现的方式:点对点(每个数据只被一个消费者消费),发布订阅模式(消息可以被多个消费者读取))
Partition:对主题分区,调高效率,每一个分区顺序追加数据
offset:消息再分区内存放的索引,序列
Replica:副本,做备份,分布式的消息引擎,为了数据更可靠,提高容错率。分为Leader(可以处理请求)和Follower(只是备份不能响应)。主副本挂掉后,从众多的从副本里选一个新的主副本
进入Kafka解压后的文件目录
cd C:usersoftkafka_2.12-2.3.0
启动zokeeper
binwindowszookeeper-server-start.bat configzookeeper.properties
启动Kafka
binwindowskafka-server-start.bat configserver.properties
创建主题(代表位置,代表消息类别)
指定那个服务器,一个副本一个分区,主题的名字
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看所有主题
kafka-topics.bat --list --bootstrap-server localhost:9092
调用生产者发送消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test
在另起一个窗口消费者读取消息
消费者消费消息
从哪个服务器,读哪个主题
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
# 存放路径(看自己情况)
E:javawebworkkafka_2.12-2.8.0
# windows运行路径(看自己的情况)
E:javawebworkkafka_2.12-2.8.0binwindows
# 启动服务器 (先启动zookeeper服务器,再启动kafka) !!!千万不要手动暴力关闭,用下面的命令关闭
binwindowszookeeper-server-start.bat configzookeeper.properties``binwindowskafka-server-start.bat configserver.properties
# 创建主题
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# 查看当前服务器的主题
kafka-topics.bat --list --bootstrap-server localhost:9092
# 创建生产者,往指定主题上发消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test
# 消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
# 关闭zookeeper服务器
zookeeper-server-stop.bat
# 关闭kafka服务器
kafka-server-stop.batSpring整合Kafka
org.springframework.kafka spring-kafka
配置
#kafka spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=test-consumer-group #是否自动提交消费者的偏移量(读取消息按偏移量读取) spring.kafka.consumer.enable-auto-commit=true #自动提交的频率 spring.kafka.consumer.auto-commit-interval=3000
测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka(){
kafkaProducer.sendMessage("learn","学习");
kafkaProducer.sendMessage("learn","加油");
try {
Thread.sleep(2000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//生产者,交给spring管理,发送消息需要我们手动去调用方法
@Component
class KafkaProducer{
@Autowired
private KafkaTemplate kafkaTemplate;
//发消息方法,主题和内容
public void sendMessage(String topic,String content){
kafkaTemplate.send(topic,content);
}
}
//消费者,消费者处理消息是被动的,当队列中有消息就去处理
@Component
class KafkaConsumer{
//要监听的主题
@KafkaListener(topics = {"learn"})
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
}
发送系统通知
为了提高性能,需要用到消息队列,为什么需要消息队列呢?
评论、点赞、关注不同的事情,三个主题,包装扔到队列里,当前线程(消息的生产者)就可以继续去做别的事情。
生产者和消费者处理消息是可以并发的,称为异步。
业务角度时间为目标
系统通知,后台给用户发,假设后台id(from_id)为1,这个时候conversation_id改存为主题(评论、点赞、关注),不再是两个对话的id,内容存json字符串,里面包含页面上要显示的信息
在相应的事件(评论、点赞、关注)后调用生产者,发送消息,消费者不需要调
为了可以看到帖子详细页面还需要帖子id,因为没有帖子对应的属性,所以将帖子id存到content里面
判断评论的是实体是帖子还是评论,将其存到实体的作者里面
不管是什么根据实体id找到目标
event事件封装好之后,发送消息。之后线程继续向下执行,后序消息的发布有消息队列发布,并行异步,我在处理的同时,另外一个线程也在处理执行
点赞才通知,取消点赞不通知,所以要先判断一下
Kafka出现问题就把日志文件删除
消费者
@Component //消费者
public class EventConsumer implements CommunityConstant {
//记日志
private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);
//往消息表里插数据
@Autowired
private MessageService messageService;
//一个方法,包含三个主题(定义常量引用主题)
@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_FOLLOW,TOPIC_LIKE})
public void handleCommentMessage(ConsumerRecord record){
//发了一个空消息
if(record==null || record.value()==null){
logger.error("发送消息为空!");
return;
}
//将json消息转为对象,指定字符串对应的具体类型
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
//转为对象之后再判断
if(event==null){
logger.error("消息格式错误!");
return;
}
//内容和格式都对之后,发送站内通知
Message message=new Message();
//设置消息的发送者,from_id系统用户规定为1(定义为常量)
message.setFromId(SYSTEM_USER_ID);
//设置消息的接收者(帖子的作者)
message.setToId(event.getEntityUserId());
//存主题
message.setConversationId(event.getTopic());
//当前时间
message.setCreateTime(new Date());
//内容是json字符串,先存map,再转为json字符串存进入
Map content=new HashMap<>();
//事件触发者
content.put("userId",event.getUserId());
//实体的类型(帖子、点赞、关注)
content.put("entityType",event.getEntityType());
//实体的id
content.put("entityId",event.getEntityId());
//不方便村的字段存到content
//判断事件对象有没有值
if(!event.getData().isEmpty()){
//遍历事件对象的map,将其存到content里
for (Map.Entry entry : event.getData().entrySet()) {
content.put(entry.getKey(),entry.getValue());
}
}
//将content转为json字符串
message.setContent(JSONObject.toJSONString(content));
//存消息
messageService.addLetter(message);
}
}
生产者
@Component
public class EventProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
//生产者发送消息(处理事件)
public void fireEvent(Event event){
//将事件发布到指定的主题(主题,字符串(事件当中所有的数据)json)
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
}
}
别人评论之后触发事件
//封装触发评论事件,链接评论帖子/评论的详情
Event event=new Event()
.setUserId(hostHolder.getUser().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId",discussPostId)
.setTopic(TOPIC_COMMENT);
if(comment.getEntityType()==ENTITY_TYPE_POST){
DiscussPost target = discussPostService.findDiscussDetail(comment.getEntityId());
event.setEntityUserId(target.getId());
}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){
Comment target = commentService.findComment(comment.getEntityId());
event.setEntityUserId(target.getId());
}
eventProducer.fireEvent(event);
显示系统通知
通知存在message里面
每个请求都有消息链接,用拦截器处理,实现接口,在controller之后,模板之前post
判断有没有登录,mv有没有携带,查私信和通知,相加传给页面
拦截器写完之后要进行配置
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
//判断有没有登录,mv有没有携带数据
User user = hostHolder.getUser();
if (user!=null && modelAndView!=null){
int unreadCount = messageService.findUnreadCount(user.getId(), null);
int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
modelAndView.addObject("allUnreadCount",unreadCount+noticeUnreadCount);
}
}
modelAndView) throws Exception {
//判断有没有登录,mv有没有携带数据
User user = hostHolder.getUser();
if (user!=null && modelAndView!=null){
int unreadCount = messageService.findUnreadCount(user.getId(), null);
int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
modelAndView.addObject("allUnreadCount",unreadCount+noticeUnreadCount);
}
}



