栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

消息服务器学习笔记

消息服务器学习笔记

消息服务、消息队列、消息中间件Broker
常见的消息服务器:

Rabbitma 绝大多数公司都够用了ActivemqRoketmq 再不就用这个KafkaTubemq 达到阿里类似的量的话可以用这个

使用场景:
- 实现消息生产者和消费者之间的解耦合
- 流量的消峰
- 导步的调用,上游服务需要下游服务执行

聊天 :如果毕业后去的公司里面的项目是一些保险啊、银行号这些的项目的话,一般可能都是追求稳定性多一些,所以使用的技术可能还是之前的,建议如果遇到这种的项目尽快跳槽

搭建Rabbitma服务器 Rabbitmq API 测试

说明:Rabbitmq的端口有

5672:收发消息15672:控制台 新建EmptyProject :rabbitmq新建maven module:rabbitmq-api导入依赖



    4.0.0

    cn.tedu
    rabbitmq-api
    1.0-SNAPSHOT
    
        
            com.rabbitmq
            amqp-client
            5.4.3
        
    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.8.1
                
                    1.8
                    1.8
                
            
        
    

简单模式

创建m1.Producer,即使用默认交换机和一个队列,一个发,一个收的简单应用场景

package m1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接服务器
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        // 得到连接
        Connection connection = f.newConnection();
        // 创建channel
        Channel channel = connection.createChannel();

		
        channel.queueDeclare("hello", false, false, true, null);
        //在服务器上创建一个队列,helloworld
        for (int i = 0; i < 1000; i++) {
			
            channel.basicPublish("", "hello", null, ("helloworld" + i).getBytes());
        }
    }
}

交换机可以从这里看到

创建消费者m1.Consumer

package m1;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.64.140");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 得到连接
		Connection connection = factory.newConnection();
		// 创建channel
        Channel channel = connection.createChannel();
        // 创建收到消息时的回调函数
        DeliverCallback deliverCallback = (String s, Delivery delivery) -> {
            System.out.println("accept context:"+new String(delivery.getBody()));
        };

        CancelCallback cancelCallback = s -> {
            System.out.println("cancel:"+s);
        };
       
        channel.queueDeclare("hello", false, false, true, null);

        
        channel.basicConsume("hello",true,deliverCallback,cancelCallback);
    }
}

可以开始测试了,我自己的机器一个只生产空消息,另一个只接收,不打印,使用虚拟机大概能处理2.6万条/秒 工作模式

多个消费者可以订阅同一个队列,这时消息会平均分摊,轮询给多个消费者,这样一个消费者不会得到全部消息。RabbitMQ不支持队列层面的广播消费

创建m2.Producer

package m2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
       //连接服务器
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        // 得到连接
        Connection connection = f.newConnection();
        // 创建channel
        Channel channel = connection.createChannel();
        // 定义队列
        channel.queueDeclare("hello", false, false, false, null);
        Scanner sc = new Scanner(System.in);
        while (true) {
            System.out.println("请输入消息:");
            String line = sc.nextLine();
            // 仍然使用默认交换机,发送routingKey为hello的消息
            channel.basicPublish("", "hello", null, line.getBytes(StandardCharsets.UTF_8));
        }
    }
}

创建m2.Consumer

package m2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接服务器
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        // 得到连接
        Connection connection = f.newConnection();
        // 创建channel
        Channel channel = connection.createChannel();
    	// 定义队列
        channel.queueDeclare("hello", false, false, false, null);
        DeliverCallback deliverCallback = (consumerTag, message) -> {
        	//处理消息
            printMsg(message.getBody());
        };
        CancelCallback cancelCallback = consumerTag -> {};
		// 消费消息
		// 第二个参数:自动确认
		// 即服务端发出消息后直接确认发出成功
        channel.basicConsume("hello", true, deliverCallback, cancelCallback);
    }

    public static void printMsg(byte[] bytes){
        long t1 = System.currentTimeMillis();
        String s = new String(bytes);
        System.out.println(s);
        // 遍历字符串中每一个字符
        for (int i = 0; i < s.length(); i++) {
        	// 如果遇到'.'这个字符就暂停一秒
        	// 用来模拟那些比较耗时的请求的处理
            if (s.charAt(i) == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.printf("-------------消息处理完成(耗时:%s)----------------n", (System.currentTimeMillis() - t1));
    }
}

运行两个consumer ,然后运行一个producer,多次生产数据,观察消费者,可以发现消息是轮流发往消费者的。而且无论是否有一方的上一个消息还没有消费完,仍然会按一人一个消息的方式来分发。合理分发的实现:由于上面都是使用自动确认消息的,即服务器发出消息后直接就确认消息发送成功了,所以服务器方面并不知道消费者是否已经消费完毕,所以如果希望分发时只分发给那些空闲的消费者,则可以使用非自动确认消息的方式。即消费者收到消息后,需要手动调用

 // 第一个参数在message.getEnvelope()对象中
// 第二个参数的意思是:是否一同确认之前接收到的消息
// 否就是只确认当前消息消费完成
channel.basicAck(deliverTag,false);

来告诉服务器自己的消息已经消费完成,这样服务器也就知道了哪些消费者空闲、哪些繁忙了,也就会自动实现优先向空闲消费者分发消息了。如果消费者在消费完毕前down掉了(信道关闭,连接关闭或者TCP链接丢失),就是没有发送确认消息给服务器,则服务器会自动回滚此条消息,以确保消息不会丢失。
当处理消息时异常中断, 可以选择让消息重回队列重新发送.
nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:

// requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
c.basicNack(tag, multiple, requeue)

这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以
手动消息确认默认是开启的

qos预抓取的消息数量,消费者还可以手动指定接收一次消息的容量、条数。如果指定为1,意思是一次只接收一条消息,在消费完毕前不会接收下一条消息

DeliverCallback deliverCallback = (consumerTag, message) -> {
    printMsg(message.getBody());
   	//手动回复服务器一条确认消息
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = consumerTag -> {};
// 设置预收取一条,处理完之前不收下一条,手动ack模式下才有效
channel.basicQos(1);
// 这里第二个参数是false,意思是不自动确认消息消费完成
channel.basicConsume("hello-m2",false, deliverCallback, cancelCallback);
消息的持久化

当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)

队列设置为可持久化, 可以在定义队列时指定参数durable为true

队列持久化
已经创建好的队列的参数是不能修改的,即如果创建的时候参数是不持久化,则后面也不能修改成持久化的,只能重新创建。

// 创建时就指定是持久化队列,第二个参数指定为true
channel.queueDeclare("hello", true, false, false, null);

消息持久化

channel.basicPublish("", 
				 "hello",
				 MessageProperties.PERSISTENT_BASIC, //常量类里面的常量
				 line.getBytes(StandardCharsets.UTF_8));
群发模式

生产者发出消息后,需要所有消费者都消费全部的消息的话 Rabbit交换机

交换机是不保存消息的,如果交换机上没有绑定队列,那么发给此交换机的消息都会被丢弃

Direct 默认交换机,当使用""作为交换机参数时,调用的就是此类型的交换机(AMQP default)Fanout 此交换机会将接收到的所有消息广播给它所知道的所有队列TopicHeaders 不太常用

创建交换机时需要提供交换机的名称和类型

channel.exchangeDeclare("logs",BuiltinExchangeType.FANOUT);

测试 Fanout 群发模式中使用
先创建一个交换机,然后再绑定对应的队列。
创建队列的时候需要注意,应该创建独占的队列,因为此时队列如果共享的话,又变成了轮流发消息了,自动删除也设置为true,命名时建议使用一个随机的值,附止重复

在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有生成名称的、非持久的、独占的、自动删除队列

//自动生成队列名
//非持久,独占,自动删除
String queueName = ch.queueDeclare().getQueue();
package m3;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.140");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 声明一个新的交换机,指定为fanout类型
        channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
        Scanner sc = new Scanner(System.in);
        while(true){
            System.out.println("输入消息:");
            String line = sc.nextLine();
            channel.basicPublish("logs",
                    "",// 在当前的交换机下,是无法选择队列的,所以写不写都不影响
                    null,//props
                    line.getBytes(StandardCharsets.UTF_8));
        }
    }
}
package m3;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.140");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "logs", "");
        
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("accept:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = consumerTag -> {};
        
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

订阅模式

实现此模式,使用的是direct交换机,此交换机只会向bindingKey与要发送的消息中包含的routingKey一致的队列中转发消息。
例:
如果有两个消费者,ConsumerA希望接收error、warning和info的消息,ConsumerB希望接收error的消息,则可以在ConsumerA中绑定多件bindingKey

ch.queueBind(queueName, "logs", "info");
ch.queueBind(queueName, "logs", "warning");
ch.queueBind(queueName, "logs", "error");

ConsumerB中只绑定一个bindingKey

ch.queueBind(queueName, "logs", "error");
主题模式 主题交换机 Topic exchange

发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。
bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

* 可以通配单个单词。# 可以通配零个或多个单词

*.*.cc.dd :可以匹配 aa.bb.cc.dd、eeeee.fda.cc.dd,但是后面这些是不可以的cc.dd、aa.cc.dd、fff.ddd.df.cc.dd
a.#:可以匹配a.开头的所有
如果一个队列可以匹配上的键有多个,消息也只会发送一次,不会发送多次

创建交换机时

ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);

绑定交换机时

ch.queueBind(queueName, "topic_logs", bindingKey);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711629.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号