聊聊Go的goroutine和Channel
相关文章推荐:《聊聊Go的并发编程 (一)》
一、使用channel等待任务结束
package mainimport ("fmt""time")func createWorker(id int) chan<- int {c := make(chan int)go worker(id, c)return c}func worker(id int, c chan int) {for n := range c {fmt.Printf("Worker %d receive %cn", id, n)}}func channelDemo() {var channels [10]chan<- intfor i := 0; i < 10; i++ {channels[i] = createWorker(i)}for i := 0; i < 10; i++ {channels[i] <- 'a' + i}for i := 0; i < 10; i++ {channels[i] <- 'A' + i}time.Sleep(time.Millisecond)}func main() {channelDemo()}package mainimport ("fmt")type worker struct {in chan intdone chan bool}func createWorker(id int) worker {w := worker{in: make(chan int),done: make(chan bool),}go doWorker(id, w.in, w.done)return w}func doWorker(id int, c chan int, done chan bool) {for n := range c {fmt.Printf("Worker %d receive %cn", id, n)done <- true}}func channelDemo() {var workers [10]workerfor i := 0; i < 10; i++ {workers[i] = createWorker(i)}for i := 0; i < 10; i++ {workers[i].in <- 'a' + i<-workers[i].done}for i := 0; i < 10; i++ {workers[i].in <- 'A' + i<-workers[i].done}}func main() {channelDemo()}package mainimport ("fmt")type worker struct {in chan intdone chan bool}func createWorker(id int) worker {w := worker{in: make(chan int),done: make(chan bool),}go doWorker(id, w.in, w.done)return w}func doWorker(id int, c chan int, done chan bool) {for n := range c {fmt.Printf("Worker %d receive %cn", id, n)done <- true}}func channelDemo() {var workers [10]workerfor i := 0; i < 10; i++ {workers[i] = createWorker(i)}for i, worker := range workers {worker.in <- 'a' + i}for i, worker := range workers {worker.in <- 'A' + i}for _, worker := range workers {<-worker.done<-worker.done}}func main() {channelDemo()}sync.WaitGroup的用法
package mainimport ("fmt""sync")type worker struct {in chan intwg *sync.WaitGroup}func createWorker(id int, wg *sync.WaitGroup) worker {w := worker{in: make(chan int),wg: wg,}go doWorker(id, w.in, wg)return w}func doWorker(id int, c chan int, wg *sync.WaitGroup) {for n := range c {fmt.Printf("Worker %d receive %cn", id, n)wg.Done()}}func channelDemo() {var wg sync.WaitGroupvar workers [10]workerfor i := 0; i < 10; i++ {workers[i] = createWorker(i, &wg)}// 添加20个任务wg.Add(20)for i, worker := range workers {worker.in <- 'a' + i}for i, worker := range workers {worker.in <- 'A' + i}wg.Wait()}func main() {channelDemo()}channelDemo这个方法中关于done的channel。使用了sync.WaitGroup,并且给createWorker方法传递sync.WaitGroupcreateWorker方法使用了 worker的结构体。 所以要先修改worker结构体,将之前的done改为wg *sync.WaitGroup即可 这样就可以直接用结构体的数据。 接着在doWorker方法中把最后一个参数done改为wg *sync.WaitGroup 将方法中的done改为wg.Done() 最后一步就是回到函数channelDemo中把任务数添加进去,然后在代码最后添加一个等待即可。抽象代码
package mainimport ("fmt""sync")type worker struct {in chan intdone func()}func createWorker(id int, wg *sync.WaitGroup) worker {w := worker{in: make(chan int),done: func() {wg.Done()},}go doWorker(id, w)return w}func doWorker(id int, w worker) {for n := range w.in {fmt.Printf("Worker %d receive %cn", id, n)w.done()}}func channelDemo() {var wg sync.WaitGroupvar workers [10]workerfor i := 0; i < 10; i++ {workers[i] = createWorker(i, &wg)}// 添加20个任务wg.Add(20)for i, worker := range workers {worker.in <- 'a' + i}for i, worker := range workers {worker.in <- 'A' + i}wg.Wait()}func main() {channelDemo()}二、使用select进行调度
package mainimport ("fmt""math/rand""time")func generator() chan int {out := make(chan int)go func() {i := 0for {// 随机睡眠1500毫秒以内time.Sleep(time.Duration(rand.Intn(1500)) *time.Millisecond)// 往out这个channel发送i值out <- ii++}}()return out}func main() {// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil// 在 select里面也是可以使用的,只不过是堵塞状态!var c1, c2 = generator(), generator()for {select {case n := <-c1:fmt.Printf("receive from c1 %dn", n)case n := <-c2:fmt.Printf("receive from c2 %dn", n)}}}package mainimport ("fmt""math/rand""time")func worker(id int, c chan int) {for n := range c {fmt.Printf("Worker %d receive %dn", id, n)}}func createWorker(id int) chan<- int {c := make(chan int)go worker(id, c)return c}func generator() chan int {out := make(chan int)go func() {i := 0for {// 随机睡眠1500毫秒以内time.Sleep(time.Duration(rand.Intn(1500)) *time.Millisecond)// 往out这个channel发送i值out <- ii++}}()return out}func main() {// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil// 在 select里面也是可以使用的,只不过是堵塞状态!var c1, c2 = generator(), generator()// 直接调用createWorker方法,返回的就是一个channelw := createWorker(0)for {select {case n := <-c1:w <- ncase n := <-c2:w <- n}}}package mainimport ("fmt""math/rand""time")func worker(id int, c chan int) {for n := range c {fmt.Printf("Worker %d receive %dn", id, n)}}func createWorker(id int) chan<- int {c := make(chan int)go worker(id, c)return c}func generator() chan int {out := make(chan int)go func() {i := 0for {// 随机睡眠1500毫秒以内time.Sleep(time.Duration(rand.Intn(1500)) *time.Millisecond)// 往out这个channel发送i值out <- ii++}}()return out}func main() {// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil// 在 select里面也是可以使用的,只不过是堵塞状态!var c1, c2 = generator(), generator()// 直接调用createWorker方法,返回的就是一个channelvar worker = createWorker(0)// 这个n如果放在for循环里边,就会一直打印0,因为从c1和c2收数据需要时间,所以会把0直接传给workern := 0// 使用这个标识告诉有没有值hasValue := falsefor {// 利用nil channel的特性var activeWorker chan<- intif hasValue {activeWorker = worker}select {case n = <-c1:// 收到值的话就标记为truehasValue = truecase n = <-c2:// 收到值的话就标记为truehasValue = truecase activeWorker <- n:hasValue = false}}}package mainimport ("fmt""math/rand""time")func worker(id int, c chan int) {for n := range c {// 手动让消耗速度变慢time.Sleep(5 * time.Second)fmt.Printf("Worker %d receive %dn", id, n)}}func createWorker(id int) chan<- int {c := make(chan int)go worker(id, c)return c}func generator() chan int {out := make(chan int)go func() {i := 0for {// 随机睡眠1500毫秒以内time.Sleep(time.Duration(rand.Intn(1500)) *time.Millisecond)// 往out这个channel发送i值out <- ii++}}()return out}func main() {// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil// 在 select里面也是可以使用的,只不过是堵塞状态!var c1, c2 = generator(), generator()// 直接调用createWorker方法,返回的就是一个channelvar worker = createWorker(0)// 用来收n的值var values []intfor {// 利用nil channel的特性var activeWorker chan<- intvar activevalue int// 判断当values中有值时if len(values) > 0 {activeWorker = worker// 取出索引为0的值activevalue = values[0]}select {case n := <-c1:// 将收到的数据存到values中values = append(values, n)case n := <-c2:// 将收到的数据存到values中values = append(values, n)case activeWorker <- activevalue:// 送出去后就需要把values中的第一个值拿掉values = values[1:]}}}计时器的使用
package mainimport ("fmt""math/rand""time")func worker(id int, c chan int) {for n := range c {// 手动让消耗速度变慢time.Sleep(time.Second)fmt.Printf("Worker %d receive %dn", id, n)}}func createWorker(id int) chan<- int {c := make(chan int)go worker(id, c)return c}func generator() chan int {out := make(chan int)go func() {i := 0for {// 随机睡眠1500毫秒以内time.Sleep(time.Duration(rand.Intn(1500)) *time.Millisecond)// 往out这个channel发送i值out <- ii++}}()return out}func main() {// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil// 在 select里面也是可以使用的,只不过是堵塞状态!var c1, c2 = generator(), generator()// 直接调用createWorker方法,返回的就是一个channelvar worker = createWorker(0)// 用来收n的值var values []int// 返回的是一个channeltm := time.After(10 * time.Second)for {// 利用nil channel的特性var activeWorker chan<- intvar activevalue int// 判断当values中有值时if len(values) > 0 {activeWorker = worker// 取出索引为0的值activevalue = values[0]}select {case n := <-c1:// 将收到的数据存到values中values = append(values, n)case n := <-c2:// 将收到的数据存到values中values = append(values, n)case activeWorker <- activevalue:// 送出去后就需要把values中的第一个值拿掉values = values[1:]case <-tm:fmt.Println("Bye")return}}}package mainimport ("fmt""math/rand""time")func worker(id int, c chan int) {for n := range c {// 手动让消耗速度变慢time.Sleep(time.Second)fmt.Printf("Worker %d receive %dn", id, n)}}func createWorker(id int) chan<- int {c := make(chan int)go worker(id, c)return c}func generator() chan int {out := make(chan int)go func() {i := 0for {// 随机睡眠1500毫秒以内time.Sleep(time.Duration(rand.Intn(1500)) *time.Millisecond)// 往out这个channel发送i值out <- ii++}}()return out}func main() {// 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil// 在 select里面也是可以使用的,只不过是堵塞状态!var c1, c2 = generator(), generator()// 直接调用createWorker方法,返回的就是一个channelvar worker = createWorker(0)// 用来收n的值var values []int// 返回的是一个channeltm := time.After(10 * time.Second)tick := time.Tick(time.Second)for {// 利用nil channel的特性var activeWorker chan<- intvar activevalue int// 判断当values中有值时if len(values) > 0 {activeWorker = worker// 取出索引为0的值activevalue = values[0]}select {case n := <-c1:// 将收到的数据存到values中values = append(values, n)case n := <-c2:// 将收到的数据存到values中values = append(values, n)case activeWorker <- activevalue:// 送出去后就需要把values中的第一个值拿掉values = values[1:]case <-time.After(800 * time.Millisecond):// 如果在800毫秒没有收到数据则提示超时fmt.Println("timeout")case <-tick:// 每秒获取一下values中队列的长度fmt.Println("queue len = ", len(values))case <-tm:fmt.Println("Bye")return}}}三、总结
以上就是聊聊Go的并发编程 (二)的详细内容,更多请关注考高分网其它相关文章!



