如果我正确地理解了问题,我想到的最简单的解决方案是在任务发送者(放入
queue)和工作人员(从取出
queue)之间添加中间层。这(可能是例行程序)将负责存储当前任务(按ID)并将结果广播到每个匹配的任务。
伪代码:
go func() { active := make(map[TaskID][]Task) for { select { case task := <-queue: tasks := active[task.ID] // No tasks with such ID, start heavy work if len(tasks) == 0 { worker <- task } // Save task for the result active[task.ID] = append(active[task.ID], task) case r := <-response: // Broadcast to all tasks for _, task := range active[r.ID] { task.Result <- r.Result } } }}()不需要互斥体,也可能不需要携带任何东西,工作人员将只需要将所有结果放入中间层,然后可以正确地路由响应。如果冲突ID可能在一段时间内到达,您甚至可以在此处轻松添加缓存。
编辑:
我有一个梦想,上面的代码导致了死锁。如果您一次发送大量请求并阻塞了
worker通道,那么这将是一个严重的问题–这个中间层例程正
worker <-task等待工作人员完成,但是所有工作人员都可能无法发送到响应通道(因为我们的例程可以收集)。可播放的证明。
可以考虑在通道中添加一些缓冲区,但这不是一个合适的解决方案(除非您可以通过这种方式设计系统,使缓冲区永远不会填满)。有几种方法可以解决此问题;例如,您可以运行一个单独的例程来收集响应,但是随后您需要
active使用互斥量保护地图。可行
您还可以
worker <-task选择一个选项,该选项将尝试将任务发送给工作人员,接收新任务(如果没有要发送的内容)或收集响应。可以利用以下事实:零通道从未准备好进行通信(被选择忽略),因此您可以在一个选择中选择接收和发送任务。例:
go func() { var next Task // received task which needs to be passed to a worker in := queue // incoming channel (new tasks) -- active var out chan Task // outgoing channel (to workers) -- inactive for { select { case t := <-in: next = t // store task, so we can pass to worker in, out = nil, worker // deactivate incoming channel, activate outgoing case out <- next: in, out = queue, nil // deactivate outgoing channel, activate incoming case r := <-response: collect <- r } }}()玩



