栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

NSQ源码分析之nsqd

NSQ源码分析之nsqd

概述

最近想通过观察分析NSQ的源码,来更进一步学习golang,编程嘛,就是多看、多写、多思考,没有捷径。
系列文章采用的nsq版本为1.2.1

什么是NSQ

引用官方文档的话:NSQ is a realtime distributed messaging platform designed to operate at scale, handling billions of messages per day.

NSQ特性和使用

网上很多介绍NSQ的文章,这里就不再赘述。
文章链接

NSQ的架构

文本源码分析的思路就是根据NSQ的不同功能模块,逐一击破。这样我就需要首先了解其架构。

从整体架构上来看,NSQ可以分为五个部分:

  • producer:服务端生产者。
  • comsumer:服务端消费者。
  • lookupd:lookupd是守护进程负责管理拓扑信息。客户端通过查询 lookupd 来发现指定 topic的生产者,并且nsqd 节点广播 topic 和 channel 信息。
  • nsqd:nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。
  • admin: 是一套web UI,用来汇集集群的实时统计,并执行不同的管理任务。

在这篇文章中,我们首先对nsqd的源码进行分析。

Topic和Channel

每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。

topic和channel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channel。channel是通过订阅指定的channel在第一次使用时创建的。

topic和channel都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别)。

channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。

参考博客

源码分析 主要的一些结构体 nsqd

首先我们从最开始的nsqd来一步步看看对应的数据,可以窥探出一些模块的主要功能和依赖关系。

type NSQD struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	clientIDSequence int64

	// 读写锁
	sync.RWMutex
	// 线程参数
	ctx context.Context
	// ctxCancel cancels a context that main() is waiting on
	ctxCancel context.CancelFunc
	// 启动时的配置参数
	opts atomic.Value
	// 锁,里面就是存一个字符串,存储着持久化diskqueue的系统文件路径
	dl        *dirlock.DirLock
	isLoading int32
	isExiting int32
	errValue  atomic.Value
	startTime time.Time
	// 存储这个nsqd的所有Topic
	topicMap map[string]*Topic

	lookupPeers atomic.Value
	// TCP服务器控制块,用于存储所有TCP链接的conn
	tcpServer     *tcpServer
	// TCP、http、https监听器
	tcpListener   net.Listener
	httpListener  net.Listener
	httpsListener net.Listener
	// 安全传输层协议配置
	tlsConfig     *tls.Config
	// 缓存池大小
	poolSize int
	// 不同功能的通道
	notifyChan           chan interface{}
	optsNotificationChan chan struct{}
	exitChan             chan int
	// 多线程时的同步等待组
	waitGroup            util.WaitGroupWrapper

	ci *clusterinfo.ClusterInfo
}

topic

从上面的接口题可以看到,一个nsqd存储了所有的Topic,我们再看看Topic的数据结构。

type Topic struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	messageCount uint64
	messageBytes uint64
	// 读写锁
	sync.RWMutex
	// topic的名称
	name              string
	// 这个topic对应的所有channel
	channelMap        map[string]*Channel
	// 负责向磁盘文件中写入消息、从磁盘文件中读取消息,是NSQ实现数据持久化的最重要结构
	backend           BackendQueue
	// 内存消息通道
	memoryMsgChan     chan *Message
	startChan         chan int
	exitChan          chan int
	channelUpdateChan chan int
	// 多线程时的同步等待组
	waitGroup         util.WaitGroupWrapper
	// 退出标记
	exitFlag          int32
	// 生产消息ID的工厂对象
	idFactory         *guidFactory
	// 是否为临时topic
	ephemeral      bool
	// 删除topic的方法指针
	deleteCallback func(*Topic)
	// 用于调用上面的删除回调函数,保证并发请客下,只调用一次
	deleter        sync.once
	// 暂停标记
	paused    int32
	pauseChan chan int
	// 对应的nsqd指针
	nsqd *NSQD
}
channel

Topic中有一个哈希表,存储其对应的所有的channel,我们再看channel的数据结构。

type Channel struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	requeueCount uint64
	messageCount uint64
	timeoutCount uint64
	// 读写锁
	sync.RWMutex
	// chanel对应的topic名字
	topicName string
	// channel名字
	name      string
	// 对应的nsqd指针
	nsqd      *NSQD
	// 负责向磁盘文件中写入消息、从磁盘文件中读取消息,是NSQ实现数据持久化的最重要结构
	backend BackendQueue
	// 内存消息通道
	memoryMsgChan chan *Message
	// 退出标志
	exitFlag      int32
	// 退出读写锁
	exitMutex     sync.RWMutex

	// state tracking
	// 存储监听这个channel的所有的消费者
	clients        map[int64]Consumer
	// 暂停标志
	paused         int32
	// 是否为临时channel
	ephemeral      bool
	// 删除channel函数指针
	deleteCallback func(*Channel)
	// 用于调用上面的删除回调函数,保证并发请客下,只调用一次
	deleter        sync.once

	// Stats tracking
	e2eProcessingLatencyStream *quantile.Quantile

	// TODO: these can be DRYd up
	// 对应的消息队列
	deferredMessages map[MessageID]*pqueue.Item
	deferredPQ       pqueue.PriorityQueue
	deferredMutex    sync.Mutex
	inFlightMessages map[MessageID]*Message
	inFlightPQ       inFlightPqueue
	inFlightMutex    sync.Mutex
}
主要的一些流程 nsqd创建

观察nsqd的创建流程,也可以帮助我们理解nsqd结构体中一些成员的含义。最后,可以看到nsqd守护进程开启的所有功能协程,比较清晰。

// 传入的参数是一个巨大的配置类,包括nsqd的各种参数信息
func New(opts *Options) (*NSQD, error) {
	var err error

	// 获取持久化的文件夹路径信息
	dataPath := opts.DataPath
	if opts.DataPath == "" {
		cwd, _ := os.Getwd()
		dataPath = cwd
	}
	// 配置日志信息
	if opts.Logger == nil {
		opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
	}
	// 实例化一个nsqd,分配内存
	n := &NSQD{
		startTime:            time.Now(),
		topicMap:             make(map[string]*Topic),
		exitChan:             make(chan int),
		notifyChan:           make(chan interface{}),
		optsNotificationChan: make(chan struct{}, 1),
		// 这里可以看到,将持久化的系统路径传入了这个锁中,应该是用于持久化时的并发安全
		dl:                   dirlock.New(dataPath),
	}
	n.ctx, n.ctxCancel = context.WithCancel(context.Background())
	// 获取http的客户端
	httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)
	n.ci = clusterinfo.New(n.logf, httpcli)
	// 保存所有nsqlookup的连接和读写
	n.lookupPeers.Store([]*lookupPeer{})
	// 存入opts
	n.swapOpts(opts)
	n.errValue.Store(errStore{})
	// 给路径加锁,不过当前函数内部还是空逻辑
	err = n.dl.Lock()
	if err != nil {
		return nil, fmt.Errorf("failed to lock data-path: %v", err)
	}
	// 都是一些参数的合法性校验
	if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 {
		return nil, errors.New("--max-deflate-level must be [1,9]")
	}

	if opts.ID < 0 || opts.ID >= 1024 {
		return nil, errors.New("--node-id must be [0,1024)")
	}

	if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired {
		opts.TLSRequired = TLSRequired
	}
	// 传输层密钥配置
	tlsConfig, err := buildTLSConfig(opts)
	if err != nil {
		return nil, fmt.Errorf("failed to build TLS config - %s", err)
	}
	if tlsConfig == nil && opts.TLSRequired != TLSNotRequired {
		return nil, errors.New("cannot require TLS client connections without TLS key and cert")
	}
	n.tlsConfig = tlsConfig
	// 端到端网络延迟百分数
	for _, v := range opts.E2EProcessingLatencyPercentiles {
		if v <= 0 || v > 1 {
			return nil, fmt.Errorf("invalid E2E processing latency percentile: %v", v)
		}
	}
	// 日志打印
	n.logf(LOG_INFO, version.String("nsqd"))
	n.logf(LOG_INFO, "ID: %d", opts.ID)
	// 注册TCP服务端
	n.tcpServer = &tcpServer{nsqd: n}
	// 注册TCP监听器
	n.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
	if err != nil {
		return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err)
	}
	// 注册HTTP监听器
	n.httpListener, err = net.Listen("tcp", opts.HTTPAddress)
	if err != nil {
		return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddress, err)
	}
	// 注册HTTPS监听器,上面配置的传输层加密协议就用到了这里
	if n.tlsConfig != nil && opts.HTTPSAddress != "" {
		n.httpsListener, err = tls.Listen("tcp", opts.HTTPSAddress, n.tlsConfig)
		if err != nil {
			return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPSAddress, err)
		}
	}
	// 配置默认HTTP广播端口号
	if opts.BroadcastHTTPPort == 0 {
		opts.BroadcastHTTPPort = n.RealHTTPAddr().Port
	}
	// 配置默认TCP广播端口号
	if opts.BroadcastTCPPort == 0 {
		opts.BroadcastTCPPort = n.RealTCPAddr().Port
	}
	// 配置默认统计前缀
	if opts.StatsdPrefix != "" {
		var port string = fmt.Sprint(opts.BroadcastHTTPPort)
		statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port))
		prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1)
		if prefixWithHost[len(prefixWithHost)-1] != '.' {
			prefixWithHost += "."
		}
		opts.StatsdPrefix = prefixWithHost
	}

	return n, nil
}
nsqd启动

nsqd也是一个单独的守护进程,看看它在运行的时候到底做了哪些事情。

func (n *NSQD) Main() error {
    // 创建退出通道,当进程需要关闭的时候,通知各个携程
	exitCh := make(chan error)
	// 保证并发下只执行一次的锁
	var once sync.once
	// 定义协程退出函数,输出退出错误原因
	exitFunc := func(err error) {
		once.Do(func() {
			if err != nil {
				n.logf(LOG_FATAL, "%s", err)
			}
			exitCh <- err
		})
	}
	// 开启TCP监听
	n.waitGroup.Wrap(func() {
		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
	})
	// 开启HTTP服务监听
	httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
	n.waitGroup.Wrap(func() {
		exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
	})
	// 开启HTTPS监听
	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
		httpsServer := newHTTPServer(n, true, true)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
		})
	}
  // 开启队列扫描,维护在传输和存储的优先队列
	n.waitGroup.Wrap(n.queueScanLoop)
	// 开启lookup协程
	n.waitGroup.Wrap(n.lookupLoop)
	// 开启统计协程
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}
	// 接收错误信息,返回
	err := <-exitCh
	return err
}

可以看到,nsqd进程中主要包括了:

  • TCP监听
  • HTTP监听
  • HTTPS监听
  • queueScanLoop
  • lookupLoop
  • statsdLoop

其实主要可以包括两大部分,连接建立和逻辑循环,然后我们下一步分析每个开启协程中做了什么。

TCP监听
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	logf(lg.INFO, "TCP: listening on %s", listener.Addr())

	var wg sync.WaitGroup

	for {
		// 死循环,不停监听TCP端口
		clientConn, err := listener.Accept()
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
				logf(lg.WARN, "temporary Accept() failure - %s", err)
				runtime.Gosched()
				continue
			}
			// theres no direct way to detect this error because it is not exposed
			if !strings.Contains(err.Error(), "use of closed network connection") {
				return fmt.Errorf("listener.Accept() error - %s", err)
			}
			break
		}
		// 如果成功建立连接,那就单独开启一个协程来处理连接
		wg.Add(1)
		go func() {
			handler.Handle(clientConn)
			wg.Done()
		}()
	}

	// wait to return until all handler goroutines complete
	wg.Wait()

	logf(lg.INFO, "TCP: closing %s", listener.Addr())

	return nil
}

当建立连接之后,就会将连接交给handle处理。

func (p *tcpServer) Handle(conn net.Conn) {
	p.nsqd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr())

	// 获取客户端发送的协议名称来初始化
	buf := make([]byte, 4)
	_, err := io.ReadFull(conn, buf)
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
		conn.Close()
		return
	}
	protocolMagic := string(buf)

	p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
		conn.RemoteAddr(), protocolMagic)
  // 创建对应的协议管理器
	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{nsqd: p.nsqd}
	default:
		protocol.SendframedResponse(conn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		conn.Close()
		p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
			conn.RemoteAddr(), protocolMagic)
		return
	}
	// 将连接存储到TCPmap当中
	client := prot.NewClient(conn)
	p.conns.Store(conn.RemoteAddr(), client)
  // 开始运行这个客户端的连接,进行通信
	err = prot.IOLoop(client)
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
	}
	// 退出连接
	p.conns.Delete(conn.RemoteAddr())
	client.Close()
}
结尾
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278578.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号