服务之间最常见的通信方式是直接调用彼此来通信 , 消息从一端发出后立即就可以达到另一端 , 称为即时消息通讯 ( 同步通信 ) 消息从某一端发出后 , 首先进入一个容器进行临时存储 , 当达到某种条件后 , 再由这个容器发送给另一端 , 称为延迟消息通讯 ( 异步通信 )消息队列相关
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议 , 是一个通用的应用层协议 消息发送与接受的双方遵守这个协议可以实现异步通讯 . 这个协议约定了消息的格式和工作方式 .
RabbitMq
RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言.
Server(Broker): 接收客户端连接 , 实现 AMQP 协议的消息队列和路由功能的进程 . Virtual Host :虚拟主机的概念 , 类似权限控制组 , 一个 Virtual Host 里可以有多个 Exchange 和 Queue. Exchange: 交换机 , 接收生产者发送的消息 , 并根据 Routing Key 将消息路由到服务器中的队列 Queue. ExchangeType: 交换机类型决定了路由消息行为 ,RabbitMQ 中有三种类型 Exchange, 分别是 fanout 、 direct 、 topic. Message Queue :消息队列 , 用于存储还未被消费者消费的消息 . Message :由 Header 和 body 组成 ,Header 是由生产者添加的各种属性的集合 , 包括 Message 是否被持久化、优先级是多少、由哪个 Message Queue 接收等 .body 是真正需要发送的数据内 容 . BindingKey :绑定关键字 , 将一个特定的 Exchange 和一个特定的 Queue 绑定起来
一、Docker安装部署RabbitMQ
docker pull rabbitmq:management
注意获取镜像的时候要获取 management 版本的 , 不要获取 last 版本的 ,management 版本的才带有管理界面
没有raabitmq文件夹的话新建一个
mkdir rabittmq
docker run -d --name my-rabbitmq -p 5672:5672 -p 15672:15672 -v /home/rabbitmq:/var/lib/rabbitmq --hostname my-rabbitmq-host -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --restart=always rabbitmq:management
--hostname :主机名 (RabbitMQ 的一个重要注意事项是它根据所谓的 “ 节点名称 ” 存储数据 , 默认为主机名 ) -e :指定环境变量 : RABBITMQ_DEFAULT_VHOST :默认虚拟机名 RABBITMQ_DEFAULT_USER :默认的用户名 RABBITMQ_DEFAULT_PASS :默认用户名的密码
springboot连接配置
添加一个新账户并添加权限
springboot项目搭建
父项目
pom.xml
provider comsuner 4.0.0 org.example rabbitMQ1.0-SNAPSHOT pom 1.8 UTF-8 UTF-8 2.4.1 org.projectlombok lomboktrue org.springframework.boot spring-boot-starter-testtest org.springframework.boot spring-boot-dependencies${spring-boot.version} pom import
provider
pom.xml
rabbitMQ org.example 1.0-SNAPSHOT 4.0.0 com.smy provider0.0.1-SNAPSHOT provider Demo project for Spring Boot org.apache.maven.plugins maven-compiler-plugin3.8.1 1.8 1.8 UTF-8 org.springframework.boot spring-boot-maven-plugin2.4.1 com.smy.provider.ProviderApplication repackage repackage
comsuner
pom.xml
rabbitMQ org.example 1.0-SNAPSHOT 4.0.0 com.smy comsuner0.0.1-SNAPSHOT comsuner Demo project for Spring Boot org.springframework.boot spring-boot-starter-weborg.apache.maven.plugins maven-compiler-plugin3.8.1 1.8 1.8 UTF-8 org.springframework.boot spring-boot-maven-plugin2.4.1 com.smy.comsuner.ComsunerApplication repackage repackage
还要导一个依赖 连接消息列的依赖
org.springframework.boot spring-boot-starter-amqp
生产者 Provider
新建俩个类
RabbitConfig
package com.smy.provider;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
@Bean
public Queue firstQueue() {
return new Queue("firstQueue");
}
}
Sender
package com.smy.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendFirst() {
rabbitTemplate.convertAndSend("firstQueue", "Hello World");
}
//
// public void sendFirst(User user) {
// rabbitTemplate.convertAndSend("firstQueue", user);
// }
// public void sendFirst(String json) {
// rabbitTemplate.convertAndSend("firstQueue", json);
// }
}
编写测试类
package com.smy.provider;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
sender.sendFirst();
// sender.sendFirst(new User("aa","bb"));
// User user = new User("aa", "bb");
//转json
// ObjectMapper mapper = new ObjectMapper();
//
// try {
// sender.sendFirst(mapper.writevalueAsString(user));
// } catch (JsonProcessingException e) {
// e.printStackTrace();
// }
}
}
消费者 Consumer
创建一个类 (接收者)
package com.smy.comsuner;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "firstQueue")
public class Receiver {
// @RabbitHandler
// public void process(String json) throws JsonProcessingException {
// log.warn("接收到:" + json);
// //json转对象
// ObjectMapper mapper = new ObjectMapper();
// User user = mapper.readValue(json, User.class);
// log.warn("接收到:" + user);
// }
@RabbitHandler public void process(String msg) { log.warn("接收到:" + msg); }
}
开启启动类
传入user数据
public void sendFirst(User user) {
rabbitTemplate.convertAndSend("firstQueue", user);
}
provider
user类
package com.smy.provider;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@SuppressWarnings("all")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
private String username;
private String userpwd;
}
生产者会报错 实现序列化接口就好了(implements)
SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.smy.provider.User
SimpleMessageConverter仅支持字符串、字节[]和可序列化的有效载荷,接收:com。smy。供应商。使用者
再次测试生产者,测试通过,但是接收者还是报错
解决办法:
1.写一个公共类dto把user放里面
2.把user转成json发送,接收者接收json在转成对象
provider
ProviderApplicationTests
运行测试类发送数据
package com.smy.provider;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class ProviderApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
// sender.sendFirst();
// sender.sendFirst(new User("aa","bb"));
User user = new User("aa", "bb");
// 转json
ObjectMapper mapper = new ObjectMapper();
try {
sender.sendFirst(mapper.writevalueAsString(user));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
成功接收



