使用springboot实现
一、选择MongoDB和RabbitMQ理由- 对于公告消息,本设计是设计为 为每个用户创建一条公告信息(原因是:方便记录用户对于消息的已读和未读状态,这样设计会更符合用户需求),因此设计两个表,message(存储消息及其发送者消息) 和 message_ref(存储接收者,及已读和新接收状态)数据库选择为MongoDB,原因是MongoDB对于海量数据以及高并发情况下的读写数据很有优势,同时他的数据存储是以文档结构存储,简单来说就是用JSON格式,他会更加类似关系型数据库的存储,同时MongoDB从3.X开始支持集合的连接查询,就可以实现message和message_ref连接查询,查询出某个用户拥有的信息。由于存在海量数据,高并发情况下,MongoDB也支持不了瞬时写入百万数据,因此引入消息队列MQ来进行削峰填谷。选择RabbitMQ的原因是,它的可靠性和稳定性比较好,而且不仅支持消息的异步收发,还支持消息的同步收发
1.发送系统通知时,先把消息数据插入message表中,然后把消息推送到相关的消息队列(以用户id作为routing-key)中去
3.消费者消费消息时,取消自动应答,每次从消息队列中取数据的时候,把数据插入message_ref(lastFlag为true【新信息】,readFlag为false【未读】)中,插入成功后再发送消息的Ack应答,让消息队列删除消息
3.前端设计定时器轮训:每次读取消息的时候,先从消息队列中接收消息(步骤3;然后把message_ref中的lastFlag改为false,更新的数量即为新消息的数量;查询readFlag为false的数据(查询未读数据),然后把这些数据传给前端
三、实现有些地方引用了hutool工具,可自行引入依赖
cn.hutool
hutool-all
5.4.0
3.1、MongoDB数据库表的设计
3.1.1.message集合
集合相当于MySQL中的数据表,但是没有固定的表结构。集合有什么字段,取决于保存在其中的数据。下面这张表格是Message集合中JSON数据的结构要求。
| 字段 | 类型 | 备注 |
|---|---|---|
| _id | UUID | 自动生成的主键值 |
| uuid | UUID | UUID值,并且设置有唯一性索引,防止消息被重复消费 |
| senderId | Integer | 发送者ID,就是用户ID。如果是系统自动发出,这个ID值是0 |
| senderPhoto | String | 发送者的头像URL。在消息页面要显示发送人的头像 |
| senderName | String | 发送者名称,也就是用户姓名。在消息页面要显示发送人的名字 |
| msg | String | 消息正文 |
| sendTime | Date | 发送时间 |
uuid : 防止重复消费
消息积压过多的情况下,如果第一次轮询还没结束,第二次轮询就开始了,那就可能出现把重复的数据写入数据库中,因此如果每条MQ消息都有唯一的UUID,第一个消费者把消息保存到数据库,那么第二个消费者就无法再把这条消息保存到数据库,解决了消息的重复消费问题。
3.1.2 message_ref集合虽然message集合记录的是消息内容及其发送者,message_ref集合来记录接收人和已读状态。
| 字段 | 类型 | 备注 |
|---|---|---|
| _id | UUID | 主键 |
| messageId | UUID | message记录的_id |
| receiverId | String | 接收人ID |
| readFlag | Boolean | 是否已读 |
| lastFlag | Boolean | 是否为新接收的消息 |
执行两个集合的联合查询,根据接收人来查询消息,并且按照消息发送时间降序排列,查询前50条记录
db.message.aggregate([
{
$set: {
"id": { $toString: "$_id" }
}
},
{
$lookup:{
from:"message_ref",
localField:"id",
foreignField:"messageId",
as:"ref"
},
},
{ $match:{"ref.receiverId": 1} },
{ $sort: {sendTime : -1} },
{ $skip: 0 },
{ $limit: 50 }
])
解析1.$set: { "id": { $toString: "$_id" } }添加一个字段,字段名为id,值为"_id"的值,格式为string类型3.$lookup:{ from:"message_ref", localField:"id", foreignField:"messageId", as:"ref" },连接另一个表from:连接哪个表localField :以自己的那个字段与另一个表连接foreignField:另一个表的连接字段as:给连接的那个表起别名3.$match:{"ref.receiverId": 1}查询条件(相当于where),该例子表示where ref.receiverId = 14.$sort: {sendTime : -1}排序,该例子为根据sendTime按降序排5.$skip: 0 从那条数据开始(数据是从0开始算的,即0为第一条数据),相当于分页查询的start ,即sql语句limit start,length中的start6.$limit: 50查询几条数据,相当于分页查询的length,即sql语句limit start,length中的length
3.2 程序中实现数据表实体设计
依赖导入
org.springframework.boot
spring-boot-starter-data-mongodb
mongoDB配置
spring: #MongoDB data: mongodb: #ip host: localhost #端口 port: 37017 #数据库 database: pbms authentication-database: admin username: admin password: abc1334563.2.1 pojo实体类
@Data
@document(collection = "message")
public class MessageEntity implements Serializable {
@Id
private String _id;
@Indexed(unique = true)
private String uuid;
@Indexed
private Integer senderId;
private String senderPhoto=""; //默认系统图片
private String senderName;
@Indexed
private Date sendTime;
private String msg;
}
@document(collection = "message_ref")
@Data
public class MessageRefEntity implements Serializable {
@Id
private String _id;
@Indexed
private String messageId;
@Indexed
private Integer receiverId;
@Indexed
private Boolean readFlag;
@Indexed
private Boolean lastFlag;
}
解析1.@document(collection = "message") 表示映射到Mongodb文档上的领域对象3.@Id 表示某个域为ID域3.@Indexed 表示某个字段为Mongodb的索引字段3.2.2 DAO
messageDao
@Repository
public class MessageDao {
@Autowired
private MongoTemplate mongoTemplate;
//插入消息数据
public String insert(MessageEntity entity) {
//把北京时间转换成格林尼治时间
Date sendTime = entity.getSendTime();
sendTime = DateUtil.offset(sendTime, DateField.HOUR, -8);
entity.setSendTime(sendTime);
entity = mongoTemplate.save(entity);
return entity.get_id();
}
//分页查询某个人的信息
public List searchMessageByPage(int userId,long start,long length){
JSonObject json = new JSonObject();
json.set("$toString","$_id");
//类比上面所说的连接查询,以那个为逻辑
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.addFields().addField("id").withValue(json).build(),
Aggregation.lookup("message_ref","id","messageId","ref"),
Aggregation.match(Criteria.where("ref.receiverId").is(userId)),
Aggregation.sort(Sort.by(Sort.Direction.DESC,"sendTime")),
Aggregation.skip(start),
Aggregation.limit(length)
);
AggregationResults message = mongoTemplate.aggregate(aggregation, "message", HashMap.class);
List list = message.getMappedResults();
list.forEach(one -> {
List refList = (List) one.get("ref");
MessageRefEntity entity = refList.get(0);
Boolean readFlag = entity.getReadFlag();
String id = entity.get_id();
one.remove("ref");
one.put("readFlag",readFlag);
one.put("refId",id);
one.remove("_id");
//把格林尼治时间转换成北京时间
Date sendTime = (Date) one.get("sendTime");
sendTime = DateUtil.offset(sendTime, DateField.HOUR, 8);
String today = DateUtil.today();
//如果是今天的消息,只显示发送时间,不需要显示日期
if (today.equals(DateUtil.date(sendTime).toDateStr())) {
one.put("sendTime", DateUtil.format(sendTime, "HH:mm"));
}
//如果是以往的消息,只显示日期,不显示发送时间
else {
one.put("sendTime", DateUtil.format(sendTime, "yyyy/MM/dd"));
}
});
return list;
}
//根据id查询某条消息详细内容
public HashMap searchMessageById(String id){
HashMap message = mongoTemplate.findById(id, HashMap.class, "message");
Date sendTime = (Date) message.get("sendTime");
//把格林尼治时间转换成北京时间
sendTime = DateUtil.date(sendTime).offset(DateField.HOUR, 8);
message.replace("sendTime", DateUtil.format(sendTime, "yyyy-MM-dd HH:mm"));
return message;
}
}
MessageRefDao
@Repository
public class MessageRefDao {
@Autowired
private MongoTemplate mongoTemplate;
public String insert(MessageRefEntity entity){
entity = mongoTemplate.save(entity);
return entity.get_id();
}
public long searchUnreadCount(int userId){
Query query = new Query();
query.addCriteria(Criteria.where("readFlag").is(false).and("receiverId").is(userId));
long count = mongoTemplate.count(query, MessageRefEntity.class);
return count;
}
public long searchLastCount(int userId){
Query query = new Query();
query.addCriteria(Criteria.where("lastFlag").is(true).and("receiverId").is(userId));
Update update = new Update();
update.set("lastFlag",false);
UpdateResult result = mongoTemplate.updateMulti(query, update, "message_ref");
long count = result.getModifiedCount();
return count;
}
public long updateUnreadMessage(String id){
Query query = new Query();
query.addCriteria(Criteria.where("_id").is(id));
Update update = new Update();
update.set("readFlag",true);
UpdateResult result = mongoTemplate.updateFirst(query, update, "message_ref");
long count = result.getModifiedCount();
return count;
}
public long deleteMessageRefById(String id){
Query query = new Query();
query.addCriteria(Criteria.where("_id").is(id));
DeleteResult messageRef = mongoTemplate.remove(query, "message_ref");
long deletedCount = messageRef.getDeletedCount();
return deletedCount;
}
public long deleteUserMessageRef(int userId){
Query query = new Query();
query.addCriteria(Criteria.where("receiverId").is(userId));
DeleteResult ref = mongoTemplate.remove(query, "message_ref");
long deletedCount = ref.getDeletedCount();
return deletedCount;
}
}
3.3 业务层设计
基本上是引用DAO层
public interface MessageService {
public String insertMessage(MessageEntity entity);
public String insertRef(MessageRefEntity entity);
public long searchUnreadCount(int userId);
public long searchLastCount(int userId);
public List searchMessageByPage(int userId, long start, int length) ;
public HashMap searchMessageById(String id);
public long updateUnreadMessage(String id) ;
public long deleteMessageRefById(String id);
public long deleteUserMessageRef(int userId);
}
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
private MessageDao messageDao;
@Autowired
private MessageRefDao messageRefDao;
@Override
public String insertMessage(MessageEntity entity) {
String id = messageDao.insert(entity);
return id;
}
@Override
public String insertRef(MessageRefEntity entity) {
String id = messageRefDao.insert(entity);
return id;
}
@Override
public long searchUnreadCount(int userId) {
long count = messageRefDao.searchUnreadCount(userId);
return count;
}
@Override
public long searchLastCount(int userId) {
long count = messageRefDao.searchLastCount(userId);
return count;
}
@Override
public List searchMessageByPage(int userId, long start, int length) {
List list = messageDao.searchMessageByPage(userId, start, length);
return list;
}
@Override
public HashMap searchMessageById(String id) {
HashMap map = messageDao.searchMessageById(id);
return map;
}
@Override
public long updateUnreadMessage(String id) {
long count = messageRefDao.updateUnreadMessage(id);
return count;
}
@Override
public long deleteMessageRefById(String id) {
long count = messageRefDao.deleteMessageRefById(id);
return count;
}
@Override
public long deleteUserMessageRef(int userId) {
long count = messageRefDao.deleteUserMessageRef(userId);
return count;
}
}
3.4 web层
1.获取分页消息列表
@ApiModel
@Data
public class SearchMessageByPageForm {
@NotNull
@Min(1)
private Integer page;
@NotNull
@Range(min = 1,max = 40)
private Integer length;
}
@RestController
@RequestMapping("/message")
@Api("消息模块网络接口")
public class MessageController {
@Autowired
private JwtUtil jwtUtil;
@Autowired
private MessageService messageService;
@PostMapping("/searchMessageByPage")
@ApiOperation("获取分页消息列表")
public R searchMessageByPage(@Valid @RequestBody SearchMessageByPageForm form, @RequestHeader("token") String token) {
int userId = jwtUtil.getUserId(token);
int page = form.getPage();
int length = form.getLength();
long start = (page - 1) * length;
List list = messageService.searchMessageByPage(userId, start, length);
return R.ok().put("result", list);
}
}
3.根据ID查询消息
@ApiModel
@Data
public class SearchMessageByIdForm {
@NotBlank
private String id;
}
public class MessageController {
……
@PostMapping("/searchMessageById")
@ApiOperation("根据ID查询消息")
public R searchMessageById(@Valid @RequestBody SearchMessageByIdForm form) {
HashMap map = messageService.searchMessageById(form.getId());
return R.ok().put("result", map);
}
}
3.把未读消息更新成已读消息
@ApiModel
@Data
public class UpdateUnreadMessageForm {
@NotBlank
private String id;
}
public class MessageController {
……
@PostMapping("/updateUnreadMessage")
@ApiOperation("未读消息更新成已读消息")
public R updateUnreadMessage(@Valid @RequestBody UpdateUnreadMessageForm form) {
long rows = messageService.updateUnreadMessage(form.getId());
return R.ok().put("result", rows == 1 ? true : false);
}
}
4.删除消息
@Data
@ApiModel
public class DeleteMessageRefByIdForm {
@NotBlank
private String id;
}
public class MessageController {
……
@PostMapping("/deleteMessageRefById")
@ApiOperation("删除消息")
public R deleteMessageRefById(@Valid @RequestBody DeleteMessageRefByIdForm form){
long rows=messageService.deleteMessageRefById(form.getId());
return R.ok().put("result", rows == 1 ? true : false);
}
}
5.轮询接收系统消息
public class MessageController {
……
@GetMapping("/refreshMessage")
@ApiOperation("刷新用户的消息")
public R refreshMessage(@RequestHeader("token") String token) {
int userId = jwtUtil.getUserId(token);
//异步接收消息
messageTask.receiveAysnc(userId + "");
//查询接收了多少条消息
long lastRows=messageService.searchLastCount(userId);
//查询未读数据
long unreadRows = messageService.searchUnreadCount(userId);
return R.ok().put("lastRows", lastRows).put("unreadRows", unreadRows);
}
}
3.5 使用RabbitMQ实现削峰填谷
3.5.1 导入依赖
3.5.2 配置类com.rabbitmq amqp-client 5.9.0 org.springframework.boot spring-boot-starter-amqp
注意这里可能存在rabbitMQ连接不上的问题:ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
原因是不支持使用默认用户进行非本地连接,应新建用户
如何新建用户
@Configuration
public class RabbitMQConfig {
@Bean
public ConnectionFactory getFactory(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip"); //rabbitMQ所处服务器ip
factory.setPort(5672); //端口
factory.setUsername("admin"); //用户名
factory.setPassword("admin"); //密码
return factory;
}
}
3.5.3 收发消息
这里只是使用的rabbitMQ的简单模式,即一个消费者对应一个队列
如有其他需求可自行学习rabbitMQ的其他模式
rabbitMQ相关笔记
@Slf4j
@Component
public class MessageTask {
@Autowired
private ConnectionFactory factory;
@Autowired
private MessageService messageService;
public void send(String topic, MessageEntity entity) {
String id = messageService.insertMessage(entity); //向MongoDB保存消息数据,返回消息ID
//向RabbitMQ发送消息
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//连接到某个Topic
channel.queueDeclare(topic, true, false, false, null);
HashMap header = new HashMap(); //存放属性数据
header.put("messageId", id);
//创建AMQP协议参数对象,添加附加属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();
channel.basicPublish("", topic, properties, entity.getMsg().getBytes());
log.debug("消息发送成功");
} catch (Exception e) {
log.error("执行异常", e);
throw new PBMSException("向MQ发送消息失败");
}
}
@Async
public void sendAsync(String topic, MessageEntity entity) {
send(topic, entity);
}
public int receive(String topic) {
int i = 0;
try (//接收消息数据
Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 从队列中获取消息,不自动确认
channel.queueDeclare(topic, true, false, false, null);
//Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环
while (true) {
//创建响应接收数据,禁止自动发送Ack应答
GetResponse response = channel.basicGet(topic, false);
if (response != null) {
AMQP.BasicProperties properties = response.getProps();
Map header = properties.getHeaders(); //获取附加属性对象
String messageId = header.get("messageId").toString();
byte[] body = response.getBody();//获取消息正文
String message = new String(body);
log.debug("从RabbitMQ接收的消息:" + message);
MessageRefEntity entity = new MessageRefEntity();
entity.setMessageId(messageId);
entity.setReceiverId(Integer.parseInt(topic));
entity.setReadFlag(false);
entity.setLastFlag(true);
messageService.insertRef(entity); //把消息存储在MongoDB中
//数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息
long deliveryTag = response.getEnvelope().getDeliveryTag();
channel.basicAck(deliveryTag, false);
i++;
} else {
break; //接收不到消息,则退出死循环
}
}
} catch (Exception e) {
log.error("执行异常", e);
}
return i;
}
@Async
public int receiveAysnc(String topic) {
return receive(topic);
}
public void deleteQueue(String topic) {
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDelete(topic);
log.debug("消息队列成功删除");
} catch (Exception e) {
log.error("删除队列失败", e);
throw new PBMSException("删除队列失败");
}
}
@Async
public void deleteQueueAsync(String topic) {
deleteQueue(topic);
}
}
3.6 使用
1.发送消息
在要发送信息的地方:创建MessageEntity,调用messageService.insertMessage,插入message表,然后调用messageTask.send发送消息到消息队列
2.接收消息
调用messageController.refreshMessage()接收消息,同时可获取新消息数量和未读消息数量
测试数据
@Test
void contextLoads() {
for (int i = 1; i <= 100; i++) {
MessageEntity message = new MessageEntity();
message.setUuid(IdUtil.simpleUUID());
message.setSenderId(0);
message.setSenderName("系统消息");
message.setMsg("这是第" + i + "条测试消息");
message.setSendTime(new Date());
String id=messageService.insertMessage(message);
MessageRefEntity ref=new MessageRefEntity();
ref.setMessageId(id);
ref.setReceiverId(11); //注意:这是接收人ID
ref.setLastFlag(true);
ref.setReadFlag(false);
messageService.insertRef(ref);
}
}



