为了在redis中实现一个简单的队列,该队列可用于重新提交崩溃的作业,我会尝试如下操作:
- 1个清单“ up_for_grabs”
- 1个清单“ being_worked_on”
- 自动过期锁
试图找工作的工人会做这样的事情:
timeout = 3600#wrap this in a transaction so our cleanup wont kill the task#Move the job away from the queue so nobody else tries to claim itjob = RPOPLPUSH(up_for_grabs, being_worked_on)#Set a lock and expire it, the value tells us when that job will time out. This can be arbitrary thoughSETEX('lock:' + job, Time.now + timeout, timeout)#our application logicdo_work(job)#Remove the finished item from the queue.LREM being_worked_on -1 job#Delete the item's lock. If it crashes here, the expire will take care of itDEL('lock:' + job)而且不时地,我们只需获取清单并检查其中的所有作业实际上是否都已锁定。如果我们发现没有锁的任何作业,则意味着该作业已过期,我们的工作人员很可能崩溃了。在这种情况下,我们将重新提交。
这将是伪代码:
loop do items = LRANGE(being_worked_on, 0, -1) items.each do |job| if !(EXISTS("lock:" + job)) puts "We found a job that didn't have a lock, resubmitting" LREM being_worked_on -1 job LPUSH(up_for_grabs, job) end end sleep 60end


