栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

仿牛客社区项目笔记-构建TB级异步消息系统(引入Kafka)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

仿牛客社区项目笔记-构建TB级异步消息系统(引入Kafka)

仿牛客社区项目笔记-构建TB级异步消息系统(引入Kafka)
  • 1. 构建TB级异步消息系统
    • 1.1 引入Kafka
    • 1.2 Spring 整合 Kafka
    • 1.3 发送系统通知

1. 构建TB级异步消息系统

分为。

1.1 引入Kafka
  1. 在 Kafka.apach.org 安装 kafka 压缩包,解压缩。
  2. 更改 config 目录下 zookeeper.properties 和 server.properties 下的 /temp 路径,改为硬盘下路径。
  3. Kafka简介:其中broker是kafka的集群服务器,而zookeeper是管理集群的。topic是存放消息的空间,partition是topic的分区。

1.2 Spring 整合 Kafka
  1. 导入 Spring-kafka 依赖。
  2. 在 application.properties 中设置配置。
  3. 在使用 kafka 前要使用命令行打开 kafka 和 zookeeper 命令为:

D:workkafka_2.12-3.1.0>binwindowskafka-server-start.bat configserver.properties
D:workkafka_2.12-3.1.0>binwindowszookeeper-server-start.bat configzookeeper.properties

1.3 发送系统通知
  1. 构建事件 Event 实体类。包含字段:

private String topic; // 存放消息空间
private int userId; // 触发事件用户id
private int entityType; // 事件目标类型
private int entityId; // 事件目标id
private int entityUserId ;// 事件目标所有者id
private Map data = new HashMap<>(); // 其他

  1. 在 event 文件夹下构建 EventProducer 类,调用 KafkaTemplate 将事件发布到指定的主题。
@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    // 处理事件
    public void fireEvent(Event event) {
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }
}
  1. 在 event 文件夹下构建 EventConsumer 类,监听三种类型topic,即:评论,点赞,关注。根据收到的 event 构建 message 对象,使用 messageService 插入数据库。
@Component
public class EventConsumer implements CommunityConstant {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private MessageService messageService;

    @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }

        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }

        // 发送站内通知
        Message message = new Message();
        // 系统发送 设为 1
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        // ConversationId 设为 topic。
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());
        
        // message 的 content 
        Map content = new HashMap<>();
        content.put("userId", event.getUserId());
        content.put("entityType", event.getEntityType());
        content.put("entityId", event.getEntityId());

        if (!event.getData().isEmpty()) {
            for (Map.Entry entry : event.getData().entrySet()) {
                content.put(entry.getKey(), entry.getValue());
            }
        }
        
        // 保存为 json 字符串
        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);
    }
}
  1. 在评论,点赞,关注时加入触发事件代码。在 CommentController,LikeController,FollowController中加入相应代码,即构建event并且调用 eventProducer 发布事件。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/850582.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号