栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

基于channel的通信模型实践

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于channel的通信模型实践

前言

channel 作为 Go 核心的数据结构和 Goroutine 之间的通信方式,Channel 是支撑 Go 语言高性能并发编程模型的重要结构本节会介绍管道 Channel 的设计原理、数据结构和常见操作,例如 Channel 的创建、发送、接收和关闭。 ​

在进入主题内容之前,读者需要先掌握下表中的不同状态下的channel执行Read、Write、close操作所会产生的结果。

图来自 曹大 https://github.com/cch123 ​

Go 语言中最常见的、也是经常被人提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。 本文将会介绍基于channel实现的多种通信模型 ​

MPSC:多生产者单消费者模型

对于MPSC的应用场景中,多个生产者负责生产数据,只有一个消费者来消费数据, 而这个模型又可分为两种实现方式:

1,多个生产者是公用一个channel 来和消费者通信
2,使用自己独有的channel来和消费者通信

变种一:生产者共用channel

如图所示,左边有多个生产goroutine往公共的channel中写入数据,右边只有一个消费goroutine从这个channel中读取数据进行处理。

基础版

我们首先定义传递的消息结构体定义:

type Msg struct {
    in int
}

然后实现生产者如下,其中参数 sendChan就是生产者和消费者进行通信的channel

// 生产者
func producer(sendChan chan Msg) {
    for i := 0; i < 10; i++ {
        sendChan <- Msg{in: i}
    }
}

消费者以及消息处理函数定义如下,其中参数 sendChan就是生产者和消费者进行通信的channel,当前消息处理函数process目前只是把消息内容打印出来。

// 消费者
func consumer(sendChan chan Msg) {
    for v := range sendChan {
        process(v)
    }
}

// 消息处理函数
func process(msg Msg){
    fmt.Println(msg)
}

mpsc的模型代码如下,首先创建通信用的channel,然后开启三个生产者goroutine,一个消费者goroutine。

func mpsc() {

    sendChan := make(chan Msg, 10)

    for p := 0; p < 3; p++ {
        go producer(sendChan)
    }

    go consumer(sendChan)
}

main函数如下, 里面 select{} 是为了保持main函数所在的goroutine一直阻塞,不然main函数立刻退出后,生产者和消费者goroutine说不定还没有执行或只执行了一部分就退出了。

func main() {
    mpsc()
    select{}
}

完整代码 在线演示 https://www.online-ide.com/IBPZGOdesk 结果如下

{0}
{1}
{2}
{3}
{4}
{5}
{6}
{7}
{8}
{9}
{0}
{1}
{2}
{3}
{4}
{5}
{6}
{7}
{8}
{9}
{0}
{1}
{2}
{3}
{4}
{5}
{6}
{7}
{8}
{9}
fatal error: 
all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
main.main()
    /home/kingeasternsun/1ba7ba19-5e66-4f4c-beb3-cb5e3e6d881e/main.go:9 +0x25
goroutine 9 [chan receive]:
main.consumer(0xc000072000)
    /home/kingeasternsun/1ba7ba19-5e66-4f4c-beb3-cb5e3e6d881e/main.go:25 +0xa9
created by main.mpsc
    /home/kingeasternsun/1ba7ba19-5e66-4f4c-beb3-cb5e3e6d881e/main.go:43 +0x9b
exit status 2


** Process exited - Return Code: 1 **

可以看到打印的数字是由交叉的,说明多个生产者并发的执行了写入。 不过最后生产者发送完后程序有两个报错:

1,第一条 goroutine 1 [select (no cases)]: 是指 select{} 一直阻塞,
2,第二条 goroutine 9 [chan receive]: 是指在于生产者发送完数据库后没有新的数据写入channel,而消费者消费完channel中的数据库,再从空的channel中读数据就会一致阻塞,所以上面报fatal error: all goroutines are asleep - deadlock! 这个错误。

因为我们是demo,为了保持main活着使用了 select{} ,实际项目中mspc往往会在一个持续运行的程序中调用,所以就没有上面的问题了。 ​

当然我们也可以直接来修复这个错误,让生产者都发送完后给消费者发消息让其退出即可。

修复deadlock问题

要等生产者都发送完后才给消费者发消息,那么我们就需要使用sync.WaitGroup来进行同步。 生产者如何给消费者发消息,同时要保证消费者把之前的消息处理完后才退出。我们可以又两种方案:

1,发送一个特殊标记的Msg,标记这个消息是终止消息
2,close channel

第1种方案会造成通信额外的内存占用消耗,推荐第2种方案。 ​

首先修改生产者代码如下,入参多了一个 wg *sync.WaitGroup , 生产者发送完数据后调用wg.Done()

// 生产者
func producer(sendChan chan Msg, wg *sync.WaitGroup) {
    for i := 0; i < 10; i++ {
        sendChan <- Msg{in: i}
    }

    wg.Done()
}

然后mpsc模型改写为如下:

func mpsc() {

    // 生产者个数
    pNum := 3
    sendChan := make(chan Msg, 10)

    wg := sync.WaitGroup{}
    wg.Add(pNum)
    for p := 0; p < pNum; p++ {
        go producer(sendChan, &wg)
    }

    // 等待生产者都完成后关闭 sendChan 通知到 消费者
    go func() {
        wg.Wait()
        close(sendChan)
    }()

    consumer(sendChan)
}

可以看到在mpsc中有以下几个变化

1,新起了一个goroutine,利用wg.Wait()等待生产者都完成,然后去关闭channel;
2,consumer(sendChan) 不再新起一个goroutine来执行,这样mspc就变成阻塞的,等待消费者正常结束。

因为mpsc本身是阻塞的了,所以我们在main中只需要调用mpsc即可

func main() {
    mpsc()
}

完整代码:

package main

import (
	"fmt"
	"sync"
)

func main() {
	mpsc()
}

type Msg struct {
	in int
}

// 生产者
func producer(sendChan chan Msg, wg *sync.WaitGroup) {
	for i := 0; i < 10; i++ {
		sendChan <- Msg{in: i}
	}

	wg.Done()
}

// 消费者
func consumer(sendChan chan Msg) {
	for v := range sendChan {
		process(v)
	}
}

// 消息处理函数
func process(msg Msg) {
	fmt.Println(msg)
}

func mpsc() {

	// 生产者个数
	pNum := 3
	sendChan := make(chan Msg, 10)

	wg := sync.WaitGroup{}
	wg.Add(pNum)
	for p := 0; p < pNum; p++ {
		go producer(sendChan, &wg)
	}

	// 等待生产者都完成后关闭 sendChan 通知到 消费者
	go func() {
		wg.Wait()
		close(sendChan)
	}()

	consumer(sendChan)
}

生产者消费者双向通信

当前模型中,生产者把消息发送给消费者后,并不知道消息处理的结果,如果生产者想要知道消息的处理结果,该如何改动呢? 其中一个比较常见的方法就是每个生产者维护一个自己私有的channel,然后在发送消息的时候,把自己私有的channel连同消息一起发送给消费者,消费者处理消息后,再将处理结果通过消息中的channel发送回生产者。 ​

首先消息类型定义中新增一个channel成员,用于存储生产者的私有channel

type Msg struct {
    in int
    ch chan int
}

生产者开始的时候会新建一个独有的channel,然后在发送消息的时候把这个通道放入到Msg中,同时生产者会新启一个goroutine从这个独有的channel中接受消费者返回的效应。

// 生产者
func producer(sendChan chan Msg, wg *sync.WaitGroup) {
    recvCh := make(chan int)
    go func() {
        for v := range recvCh {
            fmt.Println("recv ", v)
        }
    }()

    for i := 0; i < 10; i++ {
        sendChan <- Msg{in: i, ch: recvCh}
    }

    wg.Done()
}

最后修改消息处理函数如下,将接受的消息中的value加倍后通过消息中的channel传递回去。

// 消息处理函数
func process(msg Msg) {
    msg.ch <- 2 * msg.in
}

完整代码:

package main

import (
	"fmt"
	"sync"
)

func main() {
	mpsc()
}

type Msg struct {
	in int
	ch chan int
}

// 生产者
func producer(sendChan chan Msg, wg *sync.WaitGroup) {
	recvCh := make(chan int)
	go func() {
		for v := range recvCh {
			fmt.Println("recv ", v)
		}
	}()

	for i := 0; i < 10; i++ {
		sendChan <- Msg{in: i, ch: recvCh}
	}

	wg.Done()
}

// 消费者
func consumer(sendChan chan Msg) {
	for v := range sendChan {
		process(v)
	}
}

// 消息处理函数
func process(msg Msg) {
	msg.ch <- 2 * msg.in
}

func mpsc() {

	// 生产者个数
	pNum := 3
	sendChan := make(chan Msg, 10)

	wg := sync.WaitGroup{}
	wg.Add(pNum)
	for p := 0; p < pNum; p++ {
		go producer(sendChan, &wg)
	}

	// 等待生产者都完成后关闭 sendChan 通知到 消费者
	go func() {
		wg.Wait()
		close(sendChan)
	}()

	consumer(sendChan)
}

目前为止生产者给消费者发送消息是公用一个channel,还有一种方案时生产者使用自己独有的channel给消费者发送消息。 ​

变种二:生产者使用独有的channel和消费者通信


在这种方案中,每一个生产者维护一个独有的channel和消费者通信,消费者监听这些channel来获取消息进行处理。

基础版

对于生产者,会创建一个独有的channel,返回出去给消费者读取,同时内部新起一个goroutine来往这个channel中发送数据,代码如下:

func producer(in []int) chan Msg {
    ch := make(chan Msg)
    go func() {
        for _, v := range in {
            ch <- Msg{in: v}
        }
        close(ch)
    }()
    return ch
}

消费者会同时监听多个生产者的channel读取消息,对应的消费者如下:

func consumer(ch1, ch2 chan Msg) {
    for {
        select {
        case v1 := <-ch1:
            fmt.Println(v1)
        case v2 := <-ch2:
            fmt.Println(v2)
        }
    }
}

对应的mpsc模型如下:

func mpsc() {
    ch1 := producer([]int{1, 2, 3})
    ch2 := producer([]int{4, 5, 6})

    consumer(ch1, ch2)
}

完整代码:

package main

import "fmt"

type Msg struct {
	in int
}

func producer(in []int) chan Msg {
	ch := make(chan Msg)
	go func() {
		for _, v := range in {
			ch <- Msg{in: v}
		}
		close(ch)
	}()
	return ch
}

func consumer(ch1, ch2 chan Msg) {
	for {
		select {
		case v1 := <-ch1:
			fmt.Println(v1)
		case v2 := <-ch2:
			fmt.Println(v2)
		}
	}
}

func mpsc() {
	ch1 := producer([]int{1, 2, 3})
	ch2 := producer([]int{4, 5, 6})

	consumer(ch1, ch2)
}
func main() {
	mpsc()
}

实际执行的时候会发现问题,当所有生产者发送完数据close自己的channel后,消费者还在不停的从channel里面接受数据,不过接收的数据值都是0. 其中原因在于close的通道仍然时可读的,读取的时候实际会返回两个值,第一个值是个零值,第二个值是一个bool值,标识当前通道是否close了,所以我们在代码中要读取这个bool值来判断当前的channel是否关闭。 另外select中的多个channel可能其中一个关闭了,但是其它的channel并没有close,仍然有数据可读,那如何让消费者跳过已经close的channel呢?我们可以把已经关闭的channel设置为nil,读写nil的channel都是阻塞的,所以在select中就会跳过这些channel了。 ​

修复版

根据上文所说,我们要做以下几点的修改

1,读取channel的关闭标志,来判断当前channel是否close了
2,如果当前channel已经close了,那么就将当前channel设置为nil
3,当所有的channel都关闭了,消费者就退出

代码如下:

func consumer(ch1, ch2 chan Msg) {
    var v1 Msg
    var v2 Msg
    ok1 := true
    ok2 := true

    for ok1 || ok2 {
        select {
        case v1, ok1 = <-ch1:
            fmt.Println(v1)
            if !ok1 { //通道关闭了
                ch1 = nil
            }
        case v2, ok2 = <-ch2:
            fmt.Println(v2)
            if !ok2 { //通道关闭了
                ch2 = nil
            }
        }
    }
}

完整代码:

package main

import "fmt"

type Msg struct {
	in int
}

func producer(in []int) chan Msg {
	ch := make(chan Msg)
	go func() {
		for _, v := range in {
			ch <- Msg{in: v}
		}
		close(ch)
	}()
	return ch
}

func consumer(ch1, ch2 chan Msg) {
	var v1 Msg
	var v2 Msg
	ok1 := true
	ok2 := true

	for ok1 || ok2 {
		select {
		case v1, ok1 = <-ch1:
			fmt.Println(v1)
			if !ok1 { //通道关闭了
				ch1 = nil
			}
		case v2, ok2 = <-ch2:
			fmt.Println(v2)
			if !ok2 { //通道关闭了
				ch2 = nil
			}
		}
	}
}

func mpsc() {
	ch1 := producer([]int{1, 2, 3})
	ch2 := producer([]int{4, 5, 6})

	consumer(ch1, ch2)
}
func main() {
	mpsc()
}

SPMC:单生产者多消费者模型


如图,单个生产者和多个消费者之间通过共有的一个channel进行通信,生产者往channel里面写消息,多个消费者争抢着从channel中读取消息进行处理,这个模型非常像一个消息队列中的FanOut模型。 ​

生产者负责往channel写消息,写完后关闭channel

func producer(ch chan Msg) {
    in := []int{1, 2, 3, 4, 5, 6}
    for _, v := range in {
        ch <- Msg{in: v}
    }
    close(ch)

}

消费者负责从channel中读取消息

func consumer(ch chan Msg) {
    for v := range ch {
        fmt.Println(v)
    }
}

spmc模型代码如下:

func spmc() {

    ch := make(chan Msg)
    go producer(ch)
    go consumer(ch)
    go consumer(ch)
    go consumer(ch)

}

spmc模型处理关闭特别方便,因为只有一个生产者,所以生产者发送完消息后就可以close这个channel,然后下游的消费者都可以在channel中数据被读完后自动退出。for range 操作channel的时候,如果读取到channel返回的bool值是false就会退出循环。 ​

因为spmc里面生产者和消费者都是异步的,所以main只有保持一直阻塞才能让逻辑正常执行。

func main() {
    spmc()
    select {}
}

完整代码:

package main

import "fmt"

type Msg struct {
	in int
}

func producer(ch chan Msg) {
	in := []int{1, 2, 3, 4, 5, 6}
	for _, v := range in {
		ch <- Msg{in: v}
	}
	close(ch)

}

func consumer(ch chan Msg) {
	for v := range ch {
		fmt.Println(v)
	}
}

func spmc() {

	ch := make(chan Msg)
	go producer(ch)
	go consumer(ch)
	go consumer(ch)
	go consumer(ch)

}

func main() {
	spmc()
	select {}
}

另外一种常见的写法如下,producer在内部创建channel然后返回出去 ​

func producer() chan Msg {
    in := []int{1, 2, 3, 4, 5, 6}
    ch := make(chan Msg)
    go func() {
        for _, v := range in {
            ch <- Msg{in: v}
        }
        close(ch)
    }()
    return ch
}

然后消费者从producer返回的channel中读取消息。 spmc模型如下

func spmc() {

    ch := producer()
    go consumer(ch)
    go consumer(ch)
    go consumer(ch)

}

完整代码:

package main

import "fmt"

type Msg struct {
	in int
}

func producer() chan Msg {
	in := []int{1, 2, 3, 4, 5, 6}
	ch := make(chan Msg)
	go func() {
		for _, v := range in {
			ch <- Msg{in: v}
		}
		close(ch)
	}()
	return ch
}

func consumer(ch chan Msg) {
	for v := range ch {
		fmt.Println(v)
	}

}

func spmc() {

	ch := producer()
	go consumer(ch)
	go consumer(ch)
	go consumer(ch)

}

func main() {
	spmc()
	select {}
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/349218.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号