- 1. 构建TB级异步消息系统
- 1.1 引入Kafka
- 1.2 Spring 整合 Kafka
- 1.3 发送系统通知
分为。
1.1 引入Kafka- 在 Kafka.apach.org 安装 kafka 压缩包,解压缩。
- 更改 config 目录下 zookeeper.properties 和 server.properties 下的 /temp 路径,改为硬盘下路径。
- Kafka简介:其中broker是kafka的集群服务器,而zookeeper是管理集群的。topic是存放消息的空间,partition是topic的分区。
- 导入 Spring-kafka 依赖。
- 在 application.properties 中设置配置。
- 在使用 kafka 前要使用命令行打开 kafka 和 zookeeper 命令为:
1.3 发送系统通知D:workkafka_2.12-3.1.0>binwindowskafka-server-start.bat configserver.properties
D:workkafka_2.12-3.1.0>binwindowszookeeper-server-start.bat configzookeeper.properties
- 构建事件 Event 实体类。包含字段:
private String topic; // 存放消息空间
private int userId; // 触发事件用户id
private int entityType; // 事件目标类型
private int entityId; // 事件目标id
private int entityUserId ;// 事件目标所有者id
private Mapdata = new HashMap<>(); // 其他
- 在 event 文件夹下构建 EventProducer 类,调用 KafkaTemplate 将事件发布到指定的主题。
@Component
public class EventProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
// 处理事件
public void fireEvent(Event event) {
// 将事件发布到指定的主题
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
}
}
- 在 event 文件夹下构建 EventConsumer 类,监听三种类型topic,即:评论,点赞,关注。根据收到的 event 构建 message 对象,使用 messageService 插入数据库。
@Component
public class EventConsumer implements CommunityConstant {
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
@Autowired
private MessageService messageService;
@KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record) {
if (record == null || record.value() == null) {
logger.error("消息的内容为空!");
return;
}
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null) {
logger.error("消息格式错误!");
return;
}
// 发送站内通知
Message message = new Message();
// 系统发送 设为 1
message.setFromId(SYSTEM_USER_ID);
message.setToId(event.getEntityUserId());
// ConversationId 设为 topic。
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
// message 的 content
Map content = new HashMap<>();
content.put("userId", event.getUserId());
content.put("entityType", event.getEntityType());
content.put("entityId", event.getEntityId());
if (!event.getData().isEmpty()) {
for (Map.Entry entry : event.getData().entrySet()) {
content.put(entry.getKey(), entry.getValue());
}
}
// 保存为 json 字符串
message.setContent(JSONObject.toJSONString(content));
messageService.addMessage(message);
}
}
- 在评论,点赞,关注时加入触发事件代码。在 CommentController,LikeController,FollowController中加入相应代码,即构建event并且调用 eventProducer 发布事件。



