栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ATeam社区(牛客网项目第五章)

ATeam社区(牛客网项目第五章)

文章目录
  • 1. 阻塞队列
  • 2. Kafka入门
    • 2.1 Kafka术语解释
    • 2.2 Windows下修改配置
    • 2.3 Windows下使用Kafka
  • 3. Spring整合Kafka
  • 4. 发送系统通知
    • 4.1 封装事件对象
    • 4.2 开发事件的生产者
    • 4.3 开发事件的消费者
    • 4.4 处理评论事件-CommentController
    • 4.5 处理点赞事件-LikeController
    • 4.6 处理关注事件-FollowController
    • 4.7 测试
  • 5. 显示系统通知
    • 5.1 通知列表
      • 5.1.1 数据层
      • 5.1.2 服务层
      • 5.1.3 视图层
    • 5.2 通知详情
      • 5.2.1 数据层
      • 5.2.2 服务层
      • 5.2.3 视图层
    • 5.3 页面头部显示所有的未读消息数量

1. 阻塞队列
  • BlockingQueue
    • 解决线程通信问题
    • 阻塞方法:put、take
  • 生产者消费者模式
    • 生产者:产生数据的线程
    • 消费者:使用数据的线程
  • 实现类
    • ArrayBlockingQueue
    • linkedBlocikingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等

      面试常问:写一个生产者消费者实现模式
package com.ateam.community;

import sun.net.www.protocol.http.HttpURLConnection;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class BlockingQueueTests {

    public static void main(String[] args){
      BlockingQueue queue = new ArrayBlockingQueue(10);
      new Thread(new Producer(queue)).start();
      new Thread(new Customer(queue)).start();
      new Thread(new Customer(queue)).start();
      new Thread(new Customer(queue)).start();
      new Thread(new Customer(queue)).start();
      new Thread(new Customer(queue)).start();
    }
    


}


class Producer implements Runnable{

    private BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}



class Customer implements Runnable{

    private BlockingQueue queue;

    public Customer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2. Kafka入门
  • Kafka简介
    • Kafka是一个分布式的流媒体平台
    • 应用:消息系统、日志收集、用户行为追踪、流式处理
  • Kafkat特点
    • 高吞吐量、消息持久化、高可靠性、高扩展性
  • Kafka术语
    • Broker、Zookeeper
    • Topic、Partion、Offset
    • Leader Replica、Follower Replica
2.1 Kafka术语解释
  • Broker:Kafka的服务器
  • Zookeeper:管理集群
  • Topic:点对点模式中每个消费者拿到的消息都不同,发布订阅模式中消费者可能拿到同一份消息。Kafka采用发布订阅模式,生产者把消息发布到的空间(位置)叫做Topic
  • Partition:是对topic位置的分区
  • Offset:就是消息存放在分区中的索引
  • Leader Replica:主副本,可以处理请求
  • Follower Replica:从副本,只是用来做备份
    Kafka官网:官网
2.2 Windows下修改配置
  1. Kafka下的目录结构
  2. 修改config目录下的zookeeper的配置文件zookeepe.properties
  3. 修改config目录下的Kafka的配置文件server.properties
2.3 Windows下使用Kafka
  1. 启动Zookeeper
    D:kafka_2.12-2.8.1>binwindowszookeeper-server-start.bat configzookeeper.properties
  2. 启动Kafka
    D:kafka_2.12-2.8.1>binwindowskafka-server-start.bat configserver.properties
    Kafka启动以后,在刚才设置的目录,就会发现对应的文件
  3. 创建主题

    查看所有主题,判断是否主题创建是否成功
  4. 生产者往主题上发送消息

    发送两条消息:hello world
  5. 消费者接受消息

    说明Kafka已正常工作,可以进一步开发使用了。
3. Spring整合Kafka
  • 引入依赖
    • spring-kafka
  • 配置Kafka
    • 配置server、consumer
  • 访问Kafka
    • 生产者
      kafkaTemplate.send(topic, data);
    • 消费者
      @KafkaListener(topics = {“test”})
      public void handleMessage(ConsumerRecord record) {}

  1. 引入依赖

    org.springframework.kafka
    spring-kafka

  1. Kafka相关配置
    spring整合Kafka,在application.properties中配置即可
# kafka
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
  1. 测试
    在test包下,新建KafkaTests类,进行测试
package com.ateam.community;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;


@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)//配置类
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;


    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("test","hello");
        kafkaProducer.sendMessage("test","world");
        try {
            Thread.sleep(1000 * 10);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


@Component
class KafkaProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }
}

@Component
class KafkaConsumer {
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }
}

生产者主动发送消息,消费者被动接收消息。

4. 发送系统通知
  • 触发事件
    • 评论后,发布通知
    • 点赞后,发布通知
    • 关注后,发布通知
  • 处理事件
    • 封装事件对象
    • 开发事件的生产者
    • 开发事件的消费者
4.1 封装事件对象

在entity包下,新建一个类Event,用来封装事件

package com.ateam.community.entity;

import java.util.HashMap;
import java.util.Map;


// 事件
public class Event {

	
	
    private String topic;// 主题,即事件的类型
    private int userId;// 触发事件的用户id
    private int entityType;// 实体的类型
    private int entityId;// 实体的id
    private int entityUserId;// 实体的拥有者id
    private Map data = new HashMap<>();// 其他的一些数据、

    public String getTopic() {
        return topic;
    }

    // 改造set方法
    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    public Map getData() {
        return data;
    }

    public Event setData(String key, Object value){
        this.data.put(key,value);
        return this;
    }
}

  • 注意set方法的修改,是为了方便后续的操作,即可以类似sb.append("").append("wsh");
  • 注意Map data的set方法,又与其他属性不同
4.2 开发事件的生产者

新建event包,在包内创建EventProducer类

@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 处理事件,本质上就是发送消息
    public void fireEvent(Event event) {
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }

}
4.3 开发事件的消费者

先在CommunityConstant类中,增加一些常量:

    
    String TOPIC_COMMENT = "comment";

    
    String TOPIC_LIKE = "like";

    
    String TOPIC_FOLLOW = "follow";

    
    String TOPIC_PUBLISH = "publish";

    
    String TOPIC_DELETe = "delete";

    
    String TOPIC_SHARE = "share";

    
    int SYSTEM_USER_ID = 1;

在event包下, 新建EventConsumer类

package com.ateam.community.event;

@Component
public class EventConsumer implements CommunityConstant {
	// 记录日志
    private static final Logger logger = LoggerFactory.getLogger(Event.class);

    @Autowired
    private MessageService messageService;


    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_FOLLOW,TOPIC_LIKE})
    public void handleEventMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }
        // 利用fastjson将json字符串转化为Event对象
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }

        // 发送站内通知,主要是构造Message对象
        Message message = new Message();
        // User表中的id为1的代表系统用户
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        HashMap content = new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId", event.getEntityId());

        if (!event.getData().isEmpty()) {
            for (Map.Entry entry : event.getData().entrySet()) {
                content.put(entry.getKey(),entry.getValue());
            }
        }

        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);

    }

}

4.4 处理评论事件-CommentController

在addComment方法中,怎讲如下代码:

修改后,该方法完整代码如下:

    @RequestMapping(value = "/add/{discussPostId}", method = RequestMethod.POST)
    @LoginRequired
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment){
        comment.setUserId(hostHolder.getUser().getId());
        comment.setStatus(0);
        comment.setCreateTime(new Date());
        commentService.addComment(comment);

        // 触发评论事件
        Event event = new Event()
                .setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityId(comment.getEntityId())
                .setEntityType(comment.getEntityType())
                .setData("postId",discussPostId);



        if (comment.getEntityType() == ENTITY_TYPE_POST) {
            DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }

        eventProducer.fireEvent(event);


        return "redirect:/discuss/detail/" + discussPostId;
    }
4.5 处理点赞事件-LikeController

在LikeController类中like方法,增加如下代码:

修改后,该方法完整代码如下:

   @RequestMapping(value = "/like", method = RequestMethod.POST)
    @ResponseBody
    public String like(int entityType, int entityId, int entityUserId, int postId) {
        User user = hostHolder.getUser();

        // 点赞
        likeService.like(user.getId(),entityType,entityId,entityUserId);
        // 获得点赞的数量
        long likeCount = likeService.findEntityLikeCount(entityType, entityId);
        // 获得点赞的状态
        int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId);

        // 封装返回结果
        HashMap map = new HashMap<>();
        map.put("likeCount",likeCount);
        map.put("likeStatus",likeStatus);

        // 触发点赞事件,取消点赞就不发消息了
       if (likeStatus == 1) {
           Event event = new Event()
                   .setTopic(TOPIC_LIKE)
                   .setUserId(hostHolder.getUser().getId())
                   .setEntityType(entityType)
                   .setEntityId(entityId)
                   .setEntityUserId(entityUserId)
                   .setData("postId",postId); // 重构了like方法,添加了postId参数
            eventProducer.fireEvent(event);
       }
        return CommunityUtil.getJSONString(0,null,map);
    }

为什么需要重构,因为我们需要postId这个参数

当系统给用户发通知时,比如:xxx用户赞了你的帖子,xxx用户赞了你的评论,你需要帖子的id,才能完成“点击查看”时,跳转到帖子页面。
因为重构了like方法,所以我们需要处理一些discuss.js中like方法传参问题:
修改discuss-detail页面


修改discusss.js

4.6 处理关注事件-FollowController

在FollowController类中follow方法,增加如下代码:

取消关注,不发消息,不触发事件。

4.7 测试

以前写的AOP记录日志报错

空指针异常:

以前访问都是:前端页面 -> controller层 -> service层
现在,出现了Kafka的 消费者,也调用了service层,就出现了空指针异常,此时,没有请求到service层
解决方法:

点赞、关注、评论后,message表中有数据了

5. 显示系统通知
  • 通知列表
    • 显示评论、点赞、关注三种类型的通知
  • 通知详情
    • 分页显示某一类主题所包含的通知
  • 未读消息
    • 在页面头部显示所有的未读消息数量
5.1 通知列表 5.1.1 数据层

消息最终都存在message表中。
在dao下MessageMaper类中,新增几个方法

   

    // 查询某个主题下 最新 的通知
    Message selectLatestNotice(int userId, String topic);

    // 查询某个主题所包含的通知数量
    int selectNoticeCount(int userId, String topic);

    // 查询未读的通知的数量
    int selectNoticeUnreadCount(int userId, String topic);

在mapper包下的message-mapper.xml中,编写对应的SQL语句

    
        select count(id) from message
        where status != 2
        and from_id = 1
        and to_id = #{userId}
        and conversation_id = #{topic}
    

    
        select 
        from message
        where status != 2
        and from_id = 1
        and to_id = #{userId}
        and conversation_id = #{topic}
        order by create_time desc
        limit #{offset}, #{limit}
    

5.2.2 服务层

在service包MessageService类中,添加对应的方法

    public List findNotices(int userId, String topic, int offset, int limit) {
        return messageMapper.selectNotices(userId,topic,offset,limit);
    }
5.2.3 视图层
  1. 在controller包下MessageController类中,添加新方法
    @RequestMapping(value = "/notice/detail/{topic}", method = RequestMethod.GET)
    public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model) {

        User user = hostHolder.getUser();

        // 分页信息
        page.setLimit(5);
        page.setPath("/notice/detail/" + topic);
        page.setRows(messageService.findNoticeCount(user.getId(),topic));

        // 私信列表
        List noticeList = messageService.findNotices(user.getId(),topic,page.getOffset(),page.getLimit());
        ArrayList> noticeVoList = new ArrayList<>();
        if (noticeList != null) {
            for (Message notice : noticeList) {
                HashMap map = new HashMap<>();
                // 通知
                map.put("notice",notice);
                // 内容
                String content = HtmlUtils.htmlUnescape(notice.getContent());
                HashMap data = JSONObject.parseObject(content, HashMap.class);
                map.put("user",userService.findUserById((Integer) data.get("userId")));
                map.put("entityId",data.get("entityId"));
                map.put("entityType",data.get("entityType"));
                map.put("postId",data.get("postId"));

                // 通知的作者
                map.put("fromUser", userService.findUserById(notice.getFromId()));

                noticeVoList.add(map);
            }
        }
        model.addAttribute("notices", noticeVoList);


        // 设置已读
        List ids = getLetterIds(noticeList);
        if (!ids.isEmpty()) {
            messageService.readMessage(ids);
        }

        return "/site/notice-detail";
    }
  1. 修改notice-detail.html页面
5.3 页面头部显示所有的未读消息数量


每个请求处理完以后,都要查询一下未读消息的数量,用拦截器来处理

  1. 定义一个拦截器,在
package com.ateam.community.controller.interceptor;

@Component
public class MessageInterceptor  implements HandlerInterceptor {

   @Autowired
   private HostHolder hostHolder;

   @Autowired
   private MessageService messageService;

   @Override
   public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
       User user = hostHolder.getUser();
       if (user != null && modelAndView != null) {
           int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
           int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
           modelAndView.addObject("allUnreadCount", letterUnreadCount + noticeUnreadCount);
       }

   }
}

  1. 配置这个拦截器
    在config包下WebMvcConfig类中,配置拦截器
    @Autowired
    private MessageInterceptor messageInterceptor;

    @Autowired
    private DataInterceptor dataInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(alphaInterceptor)
                .excludePathPatterns("*.css","*.js","*.png","*.jpg","*.jpeg")//通配符
                .addPathPatterns("/register","/login");

        registry.addInterceptor(loginTicketInterceptor)
                .excludePathPatterns("*.css","*.js","*.png","*.jpg","*.jpeg");//通配符

        // 这是自己写的登录认证,现在由springSecurity来管理,这个废弃
//        registry.addInterceptor(loginRequiredInterceptor)
//                .excludePathPatterns("*.css","*.js","*.png","*.jpg","*.jpeg");//通配符

        registry.addInterceptor(messageInterceptor)
                .excludePathPatterns("*.css","*.js","*.png","*.jpg","*.jpeg");//通配符
}
  1. 修改index.html页面
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/696367.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号