Docker 分为 CE 和 EE 两大版本。CE 即社区版(免费,支持周期 7 个月),EE 即企业版,强调安全,付费使用,支持周期 24 个月。
Docker CE 分为 stable test 和 nightly 三个更新频道。
官方网站上有各种环境下的 安装指南,这里主要介绍 Docker CE 在 CentOS上的安装。
Docker CE 支持 64 位版本 CentOS 7,并且要求内核版本不低于 3.10, CentOS 7 满足最低内核的要求,所以我们在CentOS 7安装Docker。
D.1 安装docker (1) 卸载(可选)如果之前安装过旧版本的Docker,可以使用下面命令卸载:
yum remove docker
docker-client
docker-client-latest
docker-common
docker-latest
docker-latest-logrotate
docker-logrotate
docker-selinux
docker-engine-selinux
docker-engine
docker-ce
(2) 安装docker
首先需要大家虚拟机联网,安装yum工具
yum install -y yum-utils
device-mapper-persistent-data
lvm2 --skip-broken
然后更新本地镜像源:
# 设置docker镜像源
yum-config-manager
--add-repo
https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
sed -i 's/download.docker.com/mirrors.aliyun.com/docker-ce/g' /etc/yum.repos.d/docker-ce.repo
yum makecache fast
然后输入命令:
yum install -y docker-ce
docker-ce为社区免费版本。稍等片刻,docker即可安装成功。
(3) 启动dockerDocker应用需要用到各种端口,逐一去修改防火墙设置。非常麻烦,因此建议大家直接关闭防火墙!
启动docker前,一定要关闭防火墙后!!
启动docker前,一定要关闭防火墙后!!
启动docker前,一定要关闭防火墙后!!
# 关闭 systemctl stop firewalld # 查看防火墙状态 systemctl status firewalld # 禁止开机启动防火墙 systemctl disable firewalld
通过命令启动docker:
systemctl start docker # 启动docker服务 systemctl status docker # 查看docker状态 systemctl stop docker # 停止docker服务 systemctl restart docker # 重启docker服务
然后输入命令,可以查看docker版本:
docker -v
如图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hTWNVR71-1639731721195)(assets/image-20210418154704436.png)]
(4) 配置镜像加速docker官方镜像仓库网速较差,我们需要设置国内镜像服务:
参考阿里云的镜像加速文档:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
D.2 docker常用命令docker images 查看docker镜像列表 docker save -o nginx.tar nginx:latest 在当前目录打包一个nginx.tar文件 docker save --help 查看docker save命令的使用解释 docker rmi ngingx:latest 删除镜像 docker load -i nginx.tar 加载当前目录的nginx.tar镜像D.2.1 创建并运行一个docker容器 D.2.2 进入容器内部
修改文件内容:
sed -i 's#Welcome to nginx!#小猫佩琪欢迎您!#g' index.html sed -i 's###g' index.htmlD.2.3 docker数据卷 操作数据卷 挂载数据卷
如果执行该命令时数据卷不存在将会自动创建
直接将宿主机目录挂载到容器
在centos7中使用docker来安装
(1)下载镜像 方式一:从hub.docker.com在线拉取docker pull rabbitmq:3-management方式二:本地加载
将本地镜像包 mq.tar 上传到服务器中 /tmp 目录下,进入/tmp目录使用命令加载镜像
docker load -i mq.tar
使用 docker images 查看镜像列表
(2)运行mq容器第一个-p是rabbitmq管理平台端口
第二个-p是消息通信端口
docker run -e RABBITMQ_DEFAULT_USER=peppacatt -e RABBITMQ_DEFAULT_PASS=123456 --name rabbitmq_wsw --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
使用 docker ps 查看正在运行中的容器
访问 服务器ip:15672 进入管理平台,使用上面配置的账号和密码进入
- 发送者publisher把消息发送到exchange交换机
- 交换机把消息路由到队列queue
- 消费者再从队列当中获取消息,处理消息
rabbitmq中的几个概念:
- channels:操作mq的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host(虚拟主机): 是对queue,exchange等资源的逻辑分组
参见 rabbitmq.com 官网中的文档
蓝色:交换机
红色:消息队列
p:发送者
c:消费者
前两种都是基于队列完整消息的发送和接收,没有涉及到交换机,不是完整的消息推送模式
发送者代码示例:
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.5.132");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("peppacatt");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, peppacatt!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
消费者代码示例:
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.5.132");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("peppacatt");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列(此处还要创建队列的原因:生产者和消费者启动的顺序是不确定的,为了避免消费者使用队列的时候队列还不存在的情况)
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
消息在被消费者接收之后,rabbitmq消息队列中的消息会被立即删除
总结:
什么是SpringAMQP
步骤:
org.springframework.boot
spring-boot-starter-amqp
(2)publisher发送者服务
1.在publisher发送者服务中编辑yml,添加mq连接信息
spring:
rabbitmq:
host: 192.168.217.136 #RabbitMQ的ip地址
port: 5672 #端口
username: peppacatt #用户名
password: 123456 #密码
virtual-host: / #虚拟主机
2.在发送者服务中编写代码发送消息
注意:消息队列中的消息阅后即焚,如果此时consumer服务是开启的,将会把此时发送到队列中的消息接收,RabbitMQ管理平台中的队列将无消息显示,可先停掉consumer服务
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
//队列名称
String queueName = "simple.queue";
//要发送的消息
String msg = "hello SpringAMQP!!!";
rabbitTemplate.convertAndSend(queueName, msg);
}
}
(3)consumer消费者服务
1.在consumer消费者服务中编辑yml,添加mq连接信息
spring:
rabbitmq:
host: 192.168.217.136 #rabbitmq的ip地址
port: 5672 #端口
username: peppacatt #用户名
password: 123456 #密码
virtual-host: / #虚拟主机
2.在consumer消费者服务中新建一个类,编写消费逻辑
package cn.itcast.mq.config;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
//queues 指定队列名称(可指定多个也可指定一个)
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("消费者接收到simple.queue的消息:["+msg+"]");
}
}
3.运行consumer启动类main函数,服务启动后将会持续监听上面指定的队列 simple.queue 中的消息,只要有发送者向该队列发送消息,consumer服务就会接受该消息
R.3.2 工作消息队列 (1) 引入依赖…
(2)publisher服务1.编辑yml
…
2.编写发送代码
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
@Autowired
private RabbitTemplate rabbitTemplate;
// @Test
// public void testSimpleQueue() {
// //队列名称
// String queueName = "simple.queue";
// //要发送的消息
// String msg = "hello SpringAMQP!!!";
// rabbitTemplate.convertAndSend(queueName, msg);
// }
@Test
public void testSendMsgWorkQueue() throws InterruptedException {
//队列名称
String queueName = "simple.queue";
//要发送的消息
String msg = "hello SpringAMQP!!!";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条");
//不要一下发完 1秒发送50条消息
Thread.sleep(20);
}
}
}
(3)consumer服务
package cn.itcast.mq.config;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Component
public class RabbitMQListener {
//queues 指定队列名称(可指定多个也可指定一个)
// @RabbitListener(queues = "simple.queue")
// public void listenSimpleQueue(String msg){
// System.out.println("消费者接收到simple.queue的消息:["+msg+"]");
// }
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue(String msg) throws InterruptedException {
System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//一秒接受50条消息
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//一秒接受5条消息
Thread.sleep(200);
}
}
结果和总结
- 先启动consumer服务
- 运行publisher发送代码
- 查看consumer输出端的信息,
发现消费者0号接受的消息序号都是偶数,消费者1号接受的消息序号都是奇数,而且接受消息的时间大于了一秒
上面我们定义的两个消费者接收消息的能力不同,照理来说应该是消费者0号接受的消息多一些才合理,结果是两个消费者接收的消息数量是一样的,这是由于RabbitMQ内部的消息预取机制造成的
消息预取:当大量消息消息到达队列时,两个消费者会提前将消息拿过去(不管自己处理消息的能力强弱),平均分配队列中的消息,导致消息的接受大于了一秒
设置了消息预取限制之后,谁的能力强谁处理的消息就越多,可在案例中打印的时间看出处理消息的时间差不多为1秒左右



