栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

解锁快速上手NSQ的新姿势

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

解锁快速上手NSQ的新姿势

解锁快速上手NSQ的新姿势 1. 介绍

​ NSQ是Go语言编写的,开源的分布式消息队列中间件,具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,同时支持横向扩展,操作友好,是一个成熟的、已在大规模生成环境下应用的产品。

2. 消息中间件应用场景 2.1 异步处理

参照下图利用消息队列把业务流程中的非关键流程异步化,从而显著降低业务请求的响应时间。
]

2.2 应用解耦

​ 通过使用消息队列将不同的业务逻辑解耦,降低系统间的耦合,提高系统的健壮性。后续有其他业务要使用订单数据可直接订阅消息队列,提高系统的灵活性。

2.3 流量削峰

​ 类似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性。

3. 组件 3.1 nsqd

​ nsqd是一个守护进程,它接收、排队并向客户端发送消息。默认监听了两个tcp端口,一个(4150)用来服务客户端,一个(4151)用来提供http端口,此外还可以选择性地监听一个可选的https端口。

3.2 nsqlookupd

​ nsqlookupd是维护所有nsqd状态、提供服务发现的守护进程。它能为消费者查找特定topic下的nsqd提供了运行时的自动发现服务。 **它不维持持久状态,也不需要与任何其他nsqlookupd实例协调以满足查询。**因此根据你系统的冗余要求尽可能多地部nsqlookupd节点。它们消耗的资源很少,可以与其他服务共存。我们的建议是为每个数据中心运行至少3个集群。

3.3 nsqadmin

一个实时监控集群状态、执行各种管理任务的Web管理平台。

4. NSQ架构设计 4.1 流程图

4.2 Topic和Channel
  • 每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。

  • topic和channel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channel。channel是通过订阅指定的channel在第一次使用时创建的。

  • topic和channel都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别)。

  • channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:

    总而言之,消息是从topic -> channel(每个channel接收该topic的所有消息的副本)多播的,但是从channel -> consumers均匀分布(每个消费者接收该channel的一部分消息)。

4.3 NSQ接收和发送消息流程

4.4 特性
  • 消息默认不持久化,可以配置成持久化模式。nsq采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘。
    • 如果将--mem-queue-size设置为0,所有的消息将会存储到磁盘。
    • 服务器重启时也会将当时在内存中的消息持久化。
  • 每条消息至少传递一次。
  • 消息不保证有序。
5. 安装和使用教程 5.1 安装

下载地址: https://nsq.io/deployment/installing.html,选择对应的环境去下载NSQ

5.2 使用教程

双击nsqlookupd.exe启动

监听了4160的tcp端口和4161的http端口

打开bin的cmd界面 输入命令指定nsqlookupd地址

nsqd --lookupd-tcp-address=127.0.0.1:4160

指定nsqadmin地址 打开nsqadmin

nsqadmin -lookupd-http-address=127.0.0.1:4161

发现监听的4171端口的http服务我们去打开web管理页面,如下:

上述如果遇见admin管理页面打开有红色报错,说明上述服务中,有其中的进程没有启动成功!请按照上面的顺序重新再启动一遍!

6. Demo演示 6.1 生产者
// Package nsq


// nsq_producer/main.go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"

	"github.com/nsqio/go-nsq"
)

// NSQ Producer Demo
var producer *nsq.Producer

// 初始化生产者
func initProducer(str string) (err error) {
	config := nsq.NewConfig()
	producer, err = nsq.NewProducer(str, config)
	if err != nil {
		fmt.Printf("create producer failed, err:%vn", err)
		return err
	}
	return nil
}

func main() {
	nsqAddress := "127.0.0.1:4150"
	err := initProducer(nsqAddress)
	if err != nil {
		fmt.Printf("init producer failed, err:%vn", err)
		return
	}

	reader := bufio.NewReader(os.Stdin) // 从标准输入读取
	for {
		data, err := reader.ReadString('n')
		if err != nil {
			fmt.Printf("read string from stdin failed, err:%vn", err)
			continue
		}
		data = strings.TrimSpace(data)
		if strings.ToUpper(data) == "Q" { // 输入Q退出
			break
		}
		// 向 'topic_demo' publish 数据
		err = producer.Publish("topic_demo", []byte(data))
		if err != nil {
			fmt.Printf("publish msg to nsq failed, err:%vn", err)
			continue
		}
	}
}
  1. 运行,且在控制台随便输入一串数据

  1. 发现admin页面有Demo代码中生产者所发布的Topic 【topic_demo】

  1. 点击Topic进来发现确实有一条发布的数据信息

  1. 去控制台再发一段信息,admin确实会加入一条Messages

这个代表信息的消费数量,0则为未消费

下面也可以在admin为集群地址的nsqlookupd手动配置nsqd 的topic和对应的channel

下面可以看到对应主机连接的nsqd节点以及和生产者的关系

6.2 消费者
// Package main

// nsq_consumer/main.go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nsqio/go-nsq"
)

// NSQ Consumer Demo
// MyHandler 是一个消费者类型
type MyHandler struct {
	Title string
}

// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
	fmt.Printf("%s recv from %v, msg:%vn", m.Title, msg.NSQDAddress, string(msg.Body))
	return
}

// 初始化消费者
func initConsumer(topic string, channel string, address string) (err error) {
	config := nsq.NewConfig()
	config.LookupdPollInterval = 15 * time.Second
	c, err := nsq.NewConsumer(topic, channel, config)
	if err != nil {
		fmt.Printf("create consumer failed, err:%vn", err)
		return
	}
	consumer := &MyHandler{
		Title: "沙河1号",
	}
	c.AddHandler(consumer)

	// if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
	if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
		return err
	}
	return nil

}

func main() {
	err := initConsumer("topic_demo", "first", "127.0.0.1:4161")
	if err != nil {
		fmt.Printf("init consumer failed, err:%vn", err)
		return
	}
	c := make(chan os.Signal)        // 定义一个信号的通道
	signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
	<-c                              // 阻塞
}

  1. 运行,拿到之前生产者的两条数据

  1. 下面这个是消费者的策略,代码中定义每15秒去查询一遍nsq中对应topic的数据

  1. 现在生产者再输入数据,消费依然能拿到

发现消费者的channel First消费了3条数据,是对的,且生产者有一个消费者的channel

6.3 第二个消费者

将消费者代码copy一份,并将channel换为two,再次启动,查看生产者在发布一条数据后,两名消费者会表现如何?

后来启动的消费者无法拿到已经被第一个消费者消费的数据了,这时候发现生产者的topic多了一个two的channel绑定上了

生产者发送消息"消费者1号",发现两名消费者都收到了,说明nsqd是广播模式,会发送副本给到topic下的每个channel

文末

​ NSQ的基本使用和功能介绍完毕,进阶功能则是利用好NSQ作为消息中间件的功能去做一个demo使用一下,这将会对掌握有极大的帮助!以及对NSQ的策略可以研究一下,和Demo示例如何更好的写出通用的消息中间件去发布订阅!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/301118.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号