Go Channel
Go CSP并发模型
CSP介绍ChannelGoroutineGoroutine 调度器总结 Channel类型
创建Channel通过 Channel 发送和接收消息使用Channel时发生死锁 Range接收Channel的消息SELECt选择一组可能的send与receive操作
selecttimeout Timer 和 Ticker
timerticker 关闭Channel -> Close同步
Go ChannelChannel是Go中的一个核心类型,可以理解为一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯。
操作符:<- (右写左读)
传统的并发模型主要分为Actor模型和CSP模型,CSP全程为communicating sequential processes,CSP模型由并发执行实体(进程,线程或协程),和消息通道组成,实体之间通过消息通道发送消息进行通信。和Actor模型不同,CSP模型关注的是消息发送的载体,即通道,而不是发送消息的执行实体。其中执行实体对应的是goroutine,消息通道对应的就是channel。
Channelchannel在go中是被单独创建并且可以在进程之间传递,他的通信模式类似于 boss-worker 模式,一个实体通过将消息发送到channel中,然后又监听这个channel的实体处理,两个实体之间是匿名的,这个就是实现实体中间的解耦,其中channel是同步的一个消息被发送到channel中,最终一定是要被另外的实体消费掉的,在实现原理上其实就是一个阻塞的消息队列。
GoroutineGoroutine是实际并发执行的实体,它底层使用协程(coroutine)实现并发,coroutine是一种运行在用户态的用户线程,go底层选择使用coroutime的出发点是因为以下三大特点:
- 用户空间 避免了内核态和用户态的切换导致的成本可以由语言和框架层进行调度更小的栈空间允许创建大量的实例
由2可知用户空间线程的调度不是由操作系统来完成的,go提供了调度器并对网络IO库进行封装,屏蔽了复杂的细节,对外提供统一的语法关键字支持,简化了并发程序编写的成本。
go使用goroutine作为最小的执行单位,但是这个执行单位还在用户空间,实际上最后被处理器执行的还是内核中的线程,用户线程和内核线程的调度方法有:
多个用户线程对应一个内核线程
一个用户线程对应一个内核线程
用户线程和内核线程多对多
go通过为goroutine提供语言层面的调度器,来实现高效率的多对对线程对应关系
M:内核线程P:调度协调,用于协调M与G的执行,内核线程只有拿到了P才能对goroutine继续调度执行,一般都是通过限定P的个数来控制go的并发度G:待执行的goroutine,包含这个goroutine的栈空间Gn:灰色背景的Gn是已经挂起的goroutine,他们被添加到了执行队列中,然后需要等待网络IO的goroutine,当P通过epoll查询到特定的fd时,会重新调度对应的正在挂起的goroutine
go为了调度的公平性,在调度器中加入了steal working算法,在一个P自己的执行队列中,处理完之后,它会先到全局的执行队列中偷G进行处理,如果没有的话,再去其他P的执行队列中偷G来进行执行。
总结go实现了CSP并发模型作为并发基础,底层使用goroutine作为并发实体,goroutine非常轻量级可以创建几十万个实体!实体间通过channel进行匿名消息传递进行解耦,在语言层面实现了自动调度,这样屏蔽了很多内部细节,对外提供简单的语法关键字,大大简化了并发编程的思维转换和管理线程的复杂度。
Channel类型 创建Channelvar c1 chan [value type] c1 = make([channel type] [value type], [capacity])
[value type] 定义的是 Channel 中所传输数据的类型[channel type] 定义的是 Channel 的类型,有三种类型:
- chan 可读可写chan<- 仅可写<-chan 仅可读
示例代码:
package main
import "fmt"
func main(){
//定义变量
var c1 chan int
var i1 int
//初始化 channel
c1 = make(chan int, 100)
//向 channel c1 发送(写入)一个 int 20
c1 <- 20
//从 channel c1 接收(读取)一个 int 并赋值给 i1
i1 = <- c1
//将 i1 打印输出
fmt.Println("received: ", i1, " from c1")
}
运行结果:
received: 20 from c1使用Channel时发生死锁
当 channel 没有设置缓存区且的发送和接收动作不在同时发送时,会导致阻塞进而死锁!
package main
import "fmt"
import "time"
func main(){
var c1 chan string
c1 = make(chan string)
func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
fmt.Println("received: '", <- c1,"' from c1")
}
避免死锁方案一:使用go语句并行执行
package main
import "fmt"
import "time"
func main(){
var c1 chan string
c1 = make(chan string)
go func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
fmt.Println("received: '", <- c1,"' from c1")
}
通过go语句加在func前,使得该方法在另一个线程中执行,这样发送与接收操作就可以同时发生,从而解决死锁。(单核CPU不可以哦!)
避免死锁方法二:使用buffer
package main
import "fmt"
import "time"
func main(){
var c1 chan string
c1 = make(chan string,1) //这里我们设置了一个长度为 1 的 buffer
func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
fmt.Println("received: '", <- c1,"' from c1")
}
通过给 channel 添加缓存区,当缓存区没有用尽之前,阻塞不会发生,死锁也就不会发生。
Range接收Channel的消息注意:即使有buffer,如果一个channel没有多余的数据发送进来时,接收消息的动作会造成 channel 的阻塞与死锁!合理使用 select 语句可以规避这个问题。
for ... range 语法可以用于处理 Channel。接收 Channel 中发送的值,作为迭代值执行循环体,直到 channel 被关闭!注意使用不善可能会导致程序一直阻塞!
func main() {
go func() {
time.Sleep(1 * time.Hour)
}()
c := make(chan int)
go func() {
for i := 0; i < 10; i = i + 1 {
c <- i
}
close(c) // 如果不关闭会一直阻塞!!!
}()
for i := range c {
fmt.Println(i)
}
fmt.Println("Finished")
}
SELECT选择一组可能的send与receive操作
select
select语句选择一组可能的send操作和receive操作去处理。类似于switch,但是只是用来处理通讯操作。
它的case语句可以是send语句,也可以是receive语句,亦或者是default。
receive语句可以将值赋值给一个或者两个变量。它必须是一个receive操作!
最多运行有一个default语句,它可以放在case列表的任何位置。(推荐放在最后)
如果同时有多个channel可以接受数据,那么go会伪随机地选择一个case处理。
如果没有case需要处理,则会选择default去处理,如果default不存在则select语句将会阻塞!直到某个case需要处理。
注意:nil channel 上的操作会一直被阻塞,如果没有default只有nil channel的select会一直阻塞
select和switch一样,不是循环,只会选择一个case来处理,如果想要一直处理 channel ,则可以在外面加一层无限的for循环。
timeoutselect的超时处理机制。time.After(tme)
通过添加一个time.After(time)方法的case,利用它返回一个类型为<-chan Time的单向channel,在指定的时间发送一个当前时间给返回的channel中。
import "time"
import "fmt"
func main() {
c1 := make(chan string, 1)
go func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
}
Timer 和 Ticker
Timer 和 Ticker 是关于时间的两个 Channel
timertimer:定时器,代表未来的一个单一事件,可以设置一个等待时间,它提供一个Channel,到时间后该Channel提供一个时间值。
可以通过timer.Stop()停止计时器
可以用timer.Sleep()直接让线程休眠指定时间
timer := time.NewTimer(time.Second) // 设置两秒的定时器
go func() {
<-timer.C // 等待定时器提供channel输入时间值,阻塞
fmt.Println("Timer expired")
}()
//time.Sleep(time.Second * 2) // 如果设置休眠,到了timer过期后,Stop()将返回false
stop := timer.Stop() // 如果定时器timer还没到时间则直接停止计时返回true,否则返回false
if stop {
fmt.Println("Timer stopped")
}
ticker
ticker是一个定时触发的计时器,它会以一个间隔(interval)向Channel中发送一个事件(当前时间),而Channel的接受者可以以固定的时间间隔从Channel中读取事件。
// 通过 for range 一直等待ticker发送时间数据作为迭代值,定时间隔执行循环体
// 设置 每隔 500ms 执行一次函数
ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
}
}()
ticker也可以通过Stop来停止,一旦停止,接受者不再会从channel中接收数据了。
关闭Channel -> Close可以使用内建函数close来关闭channel。
- 如果向已关闭的channel发送数据会导致panic : send on closed channel可以向已关闭的channel中读取已发出的数据,且读取完毕后,再读取将读取零值如果通过range来读取,channel关闭后for循环就会跳出结束可以通过value : status := channel来查看Channel状态,判断是零值还是正常读取的值!
// true 正常读取的值 false 零值
channel := make(chan int, 10)
close(channel)
value, status := <-channel
fmt.Printf("%d, %t", value, status) //0, false
channel可以用在goroutine之间的同步。
示例:
main goroutine通过done channel等待worker完成任务。 worker做完任务后只需往channel发送一个数据就可以通知main goroutine任务完成。
import (
"fmt"
"time"
)
func worker(done chan bool) {
time.Sleep(time.Second)
// 通知任务已完成
done <- true
}
func main() {
done := make(chan bool, 1)
go worker(done)
// 等待任务完成
<-done
}



