栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

这是Go中的惯用工作线程池吗?

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

这是Go中的惯用工作线程池吗?

从任何意义上讲,您的解决方案都不是工作程序goroutine池:您的代码不会限制并发的goroutine,也不会“重用”
goroutines(它总是在收到新作业时启动新的goroutine)。

生产者-消费者模式

如在Bruteforce MD5 Password
Cracker上
所发布的,您可以使用生产者-消费者模式。您可能有一个指定的
生产者 goroutine,它将生成作业(要执行/计算的事情)并将其发送到 作业 频道。您可能有一个固定的 使用者 goroutine
池(例如其中的5个),它们会在传送作业的通道上循环,并且每个执行/完成接收到的作业。

制片人 够程可以简单地关闭

jobs
时生成频道的所有作业,并发送正确的信令 消费者 没有更多的就业机会将到来。
for ...range
通道上的构造处理“关闭”事件并正确终止。请注意,关闭通道之前发送的所有作业仍将被发送。

这将导致干净的设计,将导致固定数量(但任意)的goroutine,并且它将始终使用100%CPU(如果goroutine的数量大于CPU内核的数量)。它还具有的优点是它可以被“节流”与信道容量(缓冲信道)的数量和适当选择
消费者 够程。

请注意,这种具有指定生产者goroutine的模型不是强制性的。您可能也有多个goroutine来产生作业,但是您也必须同步它们,以便仅

jobs
在所有生产者goroutine完成产生作业后才关闭通道-
否则在
jobs
已关闭的通道上尝试发送另一个作业会导致运行时恐慌。通常,生产作业的价格便宜,并且可以比执行的速度快得多,因此,这种模型可以在1个goroutine中生产它们,而同时又消耗/执行许多工作,因此在实践中是很好的。

处理结果:

如果作业有结果,则可以选择在其上可以传送结果(“发送回”)的指定 结果
渠道,也可以选择在作业完成/完成时在使用者中处理结果。后者甚至可以通过具有处理结果的“回调”功能来实现。重要的是结果是否可以独立处理,还是需要合并(例如map-
reduce框架)或汇总。

如果使用

results
通道,则还需要一个goroutine来接收来自该通道的值,以防止使用者被阻塞(如果的缓冲区
results
将被填充,则会发生)。

results
通道

string
我将创建一个包装器类型来容纳任何其他信息,而不是将简单的值作为作业和结果发送,因此它更加灵活:

type Job struct {    Id     int    Work   string    Result string}

请注意,该

Job
结构还会包装结果,因此当我们发回结果时,它也包含原始内容
Job
作为上下文- 通常非常有用
。还要注意,仅
*Job
在通道上发送指针()而不是
Job
值是有利可图的,因此无需制作
Job
s的“无数”副本,并且
Job
struct值的大小也变得无关紧要。

这是生产者-消费者的样子:

我将使用2个

sync.WaitGroup
值,它们的作用如下:

var wg, wg2 sync.WaitGroup

生产者负责生成要执行的作业:

func produce(jobs chan<- *Job) {    // Generate jobs:    id := 0    for c := 'a'; c <= 'z'; c++ {        id++        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}    }    close(jobs)}

完成后(没有更多的工作),该

jobs
通道将关闭,这表明消费者将没有更多的工作到达。

请注意,

produce()
jobs
通道视为“ 仅发送” ,因为这是生产者仅需要执行的操作:在该通道上 发送 作业( 关闭
该通道之后,但在“ 仅发送” 通道上也允许这样做)。生产者的意外接收将是编译时错误(在编译时及早发现)。

消费者的责任是在可以接收工作的同时接收工作并执行它们:

func consume(id int, jobs <-chan *Job, results chan<- *Job) {    defer wg.Done()    for job := range jobs {        sleepMs := rand.Intn(1000)        fmt.Printf("worker #%d received: '%s', sleep %dmsn", id, job.Work, sleepMs)        time.Sleep(time.Duration(sleepMs) * time.Millisecond)        job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)        results <- job    }}

注意

consume()
jobs
通道视为 只接收 ; 消费者只需要从中 接收 。类似地, 仅为 消费者 发送
results
频道。
__

还要注意,由于存在多个使用者goroutine, 因此无法*
在此处关闭

results
通道,只有第一次尝试关闭该通道才会成功,而其他尝试会导致运行时出现恐慌!在所有使用者goroutine结束之后,可以(必须)关闭通道,因为这样我们可以确保不会在该通道上发送任何进一步的值(结果)。
*
results``results

我们有需要分析的结果:

func analyze(results <-chan *Job) {    defer wg2.Done()    for job := range results {        fmt.Printf("result: %sn", job.Result)    }}

如您所见,只要结果可能出现(直到

results
关闭通道),它也会收到结果。
results
分析仪的通道 仅接收

请注意通道类型的使用:只要足够,就在编译时仅使用 单向 通道类型来及早发现并防止错误。如果确实需要 双向, 则仅使用 双向 通道类型。

这就是将所有这些粘合在一起的方式:

func main() {    jobs := make(chan *Job, 100)    // Buffered channel    results := make(chan *Job, 100) // Buffered channel    // Start consumers:    for i := 0; i < 5; i++ { // 5 consumers        wg.Add(1)        go consume(i, jobs, results)    }    // Start producing    go produce(jobs)    // Start analyzing:    wg2.Add(1)    go analyze(results)    wg.Wait() // Wait all consumers to finish processing jobs    // All jobs are processed, no more values will be sent on results:    close(results)    wg2.Wait() // Wait analyzer to analyze all results}

输出示例:

这是示例输出:

如您所见,在将所有作业排入队列之前,结果即将到来并得到分析:

worker #4 received: 'e', sleep 81msworker #0 received: 'a', sleep 887msworker #1 received: 'b', sleep 847msworker #2 received: 'c', sleep 59msworker #3 received: 'd', sleep 81msworker #2 received: 'f', sleep 318msresult: c-59msworker #4 received: 'g', sleep 425msresult: e-81msworker #3 received: 'h', sleep 540msresult: d-81msworker #2 received: 'i', sleep 456msresult: f-318msworker #4 received: 'j', sleep 300msresult: g-425msworker #3 received: 'k', sleep 694msresult: h-540msworker #4 received: 'l', sleep 511msresult: j-300msworker #2 received: 'm', sleep 162msresult: i-456msworker #1 received: 'n', sleep 89msresult: b-847msworker #0 received: 'o', sleep 728msresult: a-887msworker #1 received: 'p', sleep 274msresult: n-89msworker #2 received: 'q', sleep 211msresult: m-162msworker #2 received: 'r', sleep 445msresult: q-211msworker #1 received: 's', sleep 237msresult: p-274msworker #3 received: 't', sleep 106msresult: k-694msworker #4 received: 'u', sleep 495msresult: l-511msworker #3 received: 'v', sleep 466msresult: t-106msworker #1 received: 'w', sleep 528msresult: s-237msworker #0 received: 'x', sleep 258msresult: o-728msworker #2 received: 'y', sleep 47msresult: r-445msworker #2 received: 'z', sleep 947msresult: y-47msresult: u-495msresult: x-258msresult: v-466msresult: w-528msresult: z-947ms

在Go Playground上尝试完整的应用程序。

没有
results
频道

如果我们不使用

results
通道,但是消费者goroutines立即处理结果(在我们的情况下将其打印),代码将大大简化。在这种情况下,我们不需要2个
sync.WaitGroup
值(只需要第二个就可以等待分析器完成)。

没有

results
渠道,完整的解决方案是这样的:

var wg sync.WaitGrouptype Job struct {    Id   int    Work string}func produce(jobs chan<- *Job) {    // Generate jobs:    id := 0    for c := 'a'; c <= 'z'; c++ {        id++        jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}    }    close(jobs)}func consume(id int, jobs <-chan *Job) {    defer wg.Done()    for job := range jobs {        sleepMs := rand.Intn(1000)        fmt.Printf("worker #%d received: '%s', sleep %dmsn", id, job.Work, sleepMs)        time.Sleep(time.Duration(sleepMs) * time.Millisecond)        fmt.Printf("result: %sn", job.Work+fmt.Sprintf("-%dms", sleepMs))    }}func main() {    jobs := make(chan *Job, 100) // Buffered channel    // Start consumers:    for i := 0; i < 5; i++ { // 5 consumers        wg.Add(1)        go consume(i, jobs)    }    // Start producing    go produce(jobs)    wg.Wait() // Wait all consumers to finish processing jobs}

输出与

results
通道的输出“类似” (但执行/完成顺序当然是随机的)。

在Go Playground上尝试使用此变体。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/371032.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号