消息服务、消息队列、消息中间件Broker
常见的消息服务器:
Rabbitma 绝大多数公司都够用了ActivemqRoketmq 再不就用这个KafkaTubemq 达到阿里类似的量的话可以用这个
使用场景:
- 实现消息生产者和消费者之间的解耦合
- 流量的消峰
- 导步的调用,上游服务需要下游服务执行
聊天 :如果毕业后去的公司里面的项目是一些保险啊、银行号这些的项目的话,一般可能都是追求稳定性多一些,所以使用的技术可能还是之前的,建议如果遇到这种的项目尽快跳槽
搭建Rabbitma服务器 Rabbitmq API 测试说明:Rabbitmq的端口有
5672:收发消息15672:控制台 新建EmptyProject :rabbitmq新建maven module:rabbitmq-api导入依赖
简单模式4.0.0 cn.tedu rabbitmq-api 1.0-SNAPSHOT com.rabbitmq amqp-client 5.4.3 org.apache.maven.plugins maven-compiler-plugin 3.8.1 1.8 1.8
创建m1.Producer,即使用默认交换机和一个队列,一个发,一个收的简单应用场景
package m1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接服务器
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
// 得到连接
Connection connection = f.newConnection();
// 创建channel
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, true, null);
//在服务器上创建一个队列,helloworld
for (int i = 0; i < 1000; i++) {
channel.basicPublish("", "hello", null, ("helloworld" + i).getBytes());
}
}
}
交换机可以从这里看到
创建消费者m1.Consumer
package m1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.64.140");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
// 得到连接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建收到消息时的回调函数
DeliverCallback deliverCallback = (String s, Delivery delivery) -> {
System.out.println("accept context:"+new String(delivery.getBody()));
};
CancelCallback cancelCallback = s -> {
System.out.println("cancel:"+s);
};
channel.queueDeclare("hello", false, false, true, null);
channel.basicConsume("hello",true,deliverCallback,cancelCallback);
}
}
可以开始测试了,我自己的机器一个只生产空消息,另一个只接收,不打印,使用虚拟机大概能处理2.6万条/秒 工作模式
多个消费者可以订阅同一个队列,这时消息会平均分摊,轮询给多个消费者,这样一个消费者不会得到全部消息。RabbitMQ不支持队列层面的广播消费
创建m2.Producer
package m2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接服务器
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
// 得到连接
Connection connection = f.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 定义队列
channel.queueDeclare("hello", false, false, false, null);
Scanner sc = new Scanner(System.in);
while (true) {
System.out.println("请输入消息:");
String line = sc.nextLine();
// 仍然使用默认交换机,发送routingKey为hello的消息
channel.basicPublish("", "hello", null, line.getBytes(StandardCharsets.UTF_8));
}
}
}
创建m2.Consumer
package m2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接服务器
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
// 得到连接
Connection connection = f.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 定义队列
channel.queueDeclare("hello", false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, message) -> {
//处理消息
printMsg(message.getBody());
};
CancelCallback cancelCallback = consumerTag -> {};
// 消费消息
// 第二个参数:自动确认
// 即服务端发出消息后直接确认发出成功
channel.basicConsume("hello", true, deliverCallback, cancelCallback);
}
public static void printMsg(byte[] bytes){
long t1 = System.currentTimeMillis();
String s = new String(bytes);
System.out.println(s);
// 遍历字符串中每一个字符
for (int i = 0; i < s.length(); i++) {
// 如果遇到'.'这个字符就暂停一秒
// 用来模拟那些比较耗时的请求的处理
if (s.charAt(i) == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.printf("-------------消息处理完成(耗时:%s)----------------n", (System.currentTimeMillis() - t1));
}
}
运行两个consumer ,然后运行一个producer,多次生产数据,观察消费者,可以发现消息是轮流发往消费者的。而且无论是否有一方的上一个消息还没有消费完,仍然会按一人一个消息的方式来分发。合理分发的实现:由于上面都是使用自动确认消息的,即服务器发出消息后直接就确认消息发送成功了,所以服务器方面并不知道消费者是否已经消费完毕,所以如果希望分发时只分发给那些空闲的消费者,则可以使用非自动确认消息的方式。即消费者收到消息后,需要手动调用
// 第一个参数在message.getEnvelope()对象中 // 第二个参数的意思是:是否一同确认之前接收到的消息 // 否就是只确认当前消息消费完成 channel.basicAck(deliverTag,false);
来告诉服务器自己的消息已经消费完成,这样服务器也就知道了哪些消费者空闲、哪些繁忙了,也就会自动实现优先向空闲消费者分发消息了。如果消费者在消费完毕前down掉了(信道关闭,连接关闭或者TCP链接丢失),就是没有发送确认消息给服务器,则服务器会自动回滚此条消息,以确保消息不会丢失。
当处理消息时异常中断, 可以选择让消息重回队列重新发送.
nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:
// requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列 c.basicNack(tag, multiple, requeue)
这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以
手动消息确认默认是开启的
qos预抓取的消息数量,消费者还可以手动指定接收一次消息的容量、条数。如果指定为1,意思是一次只接收一条消息,在消费完毕前不会接收下一条消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
printMsg(message.getBody());
//手动回复服务器一条确认消息
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = consumerTag -> {};
// 设置预收取一条,处理完之前不收下一条,手动ack模式下才有效
channel.basicQos(1);
// 这里第二个参数是false,意思是不自动确认消息消费完成
channel.basicConsume("hello-m2",false, deliverCallback, cancelCallback);
消息的持久化
当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据
要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)
队列设置为可持久化, 可以在定义队列时指定参数durable为true
队列持久化
已经创建好的队列的参数是不能修改的,即如果创建的时候参数是不持久化,则后面也不能修改成持久化的,只能重新创建。
// 创建时就指定是持久化队列,第二个参数指定为true
channel.queueDeclare("hello", true, false, false, null);
消息持久化
channel.basicPublish("",
"hello",
MessageProperties.PERSISTENT_BASIC, //常量类里面的常量
line.getBytes(StandardCharsets.UTF_8));
群发模式
生产者发出消息后,需要所有消费者都消费全部的消息的话 Rabbit交换机
交换机是不保存消息的,如果交换机上没有绑定队列,那么发给此交换机的消息都会被丢弃
Direct 默认交换机,当使用""作为交换机参数时,调用的就是此类型的交换机(AMQP default)Fanout 此交换机会将接收到的所有消息广播给它所知道的所有队列TopicHeaders 不太常用
创建交换机时需要提供交换机的名称和类型
channel.exchangeDeclare("logs",BuiltinExchangeType.FANOUT);
测试 Fanout 群发模式中使用
先创建一个交换机,然后再绑定对应的队列。
创建队列的时候需要注意,应该创建独占的队列,因为此时队列如果共享的话,又变成了轮流发消息了,自动删除也设置为true,命名时建议使用一个随机的值,附止重复
在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有生成名称的、非持久的、独占的、自动删除队列
//自动生成队列名 //非持久,独占,自动删除 String queueName = ch.queueDeclare().getQueue();
package m3;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.140");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 声明一个新的交换机,指定为fanout类型
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
Scanner sc = new Scanner(System.in);
while(true){
System.out.println("输入消息:");
String line = sc.nextLine();
channel.basicPublish("logs",
"",// 在当前的交换机下,是无法选择队列的,所以写不写都不影响
null,//props
line.getBytes(StandardCharsets.UTF_8));
}
}
}
package m3;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.140");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("accept:" + new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTag -> {};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
订阅模式
实现此模式,使用的是direct交换机,此交换机只会向bindingKey与要发送的消息中包含的routingKey一致的队列中转发消息。
例:
如果有两个消费者,ConsumerA希望接收error、warning和info的消息,ConsumerB希望接收error的消息,则可以在ConsumerA中绑定多件bindingKey
ch.queueBind(queueName, "logs", "info"); ch.queueBind(queueName, "logs", "warning"); ch.queueBind(queueName, "logs", "error");
ConsumerB中只绑定一个bindingKey
ch.queueBind(queueName, "logs", "error");主题模式 主题交换机 Topic exchange
发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。
bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:
* 可以通配单个单词。# 可以通配零个或多个单词
*.*.cc.dd :可以匹配 aa.bb.cc.dd、eeeee.fda.cc.dd,但是后面这些是不可以的cc.dd、aa.cc.dd、fff.ddd.df.cc.dd
a.#:可以匹配a.开头的所有
如果一个队列可以匹配上的键有多个,消息也只会发送一次,不会发送多次
创建交换机时
ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
绑定交换机时
ch.queueBind(queueName, "topic_logs", bindingKey);



