python-redis-消息队列
生产消费者模式
class redisConsumer(object):
    """Producer/consumer demo backed by a single Redis list.

    ``push`` serialises payloads to JSON and LPUSHes them onto the list;
    ``get`` blocks on BLPOP and prints every item it receives.

    NOTE(review): LPUSH and BLPOP both operate on the head of the list, so a
    backlog is drained newest-first. If FIFO order is intended, pair LPUSH
    with BRPOP (or RPUSH with BLPOP) -- confirm with the consumers of this
    queue before changing it.
    """

    def __init__(self, host='192.168.234.241', port=63792, db=8, queue="job"):
        """Create the Redis client and remember which list key to use.

        Args:
            host: Redis server host.
            port: Redis server port.
            db: Redis database index.
            queue: Key of the Redis list used as the job queue.
        """
        self.con = redis.StrictRedis(host=host, port=port, db=db)
        self.queue = queue

    def push(self, **kwargs):
        """Push every payload found in ``kwargs['datas']`` onto the list.

        Args:
            **kwargs: Must contain ``datas``, an iterable of dicts. Each dict
                must have a JSON-serialisable ``data`` entry; any other keys
                (such as ``id``) are ignored.

        Raises:
            KeyError: If ``datas`` is missing, or an item has no ``data`` key.
        """
        for item in kwargs['datas']:
            payload = json.dumps(item['data']).encode('utf-8')
            # lpush adds the element at the head of the list stored at key;
            # rpush would add it at the tail instead.
            self.con.lpush(self.queue, payload)

    def get(self):
        """Block on the list forever, printing each popped item.

        Every line printed is the current thread's name immediately followed
        by the value returned from ``blpop``. This method never returns.
        """
        while True:
            msg = self.con.blpop(self.queue)
            print(f"{threading.current_thread().name}{msg}")
if __name__ == '__main__':
    # Producer/consumer demo: one consumer thread blocks on the Redis list
    # while a producer thread pushes four messages onto it.
    obj = redisConsumer()

    # Consumers. Both are created as daemon threads (passed to the
    # constructor; Thread.setDaemon() is deprecated) because get() loops
    # forever and must not keep the process alive.
    a = threading.Thread(target=obj.get, name="get-1", daemon=True)
    c = threading.Thread(target=obj.get, name="get-2", daemon=True)

    # Producer.
    b = threading.Thread(
        target=obj.push,
        name='push',
        kwargs={"datas": [
            {"id": 1, "data": {'name': 'hello 5'}},
            {"id": 2, "data": {'name': 'hello 6'}},
            {"id": 3, "data": {'name': 'hello 7'}},
            {"id": 4, "data": {'name': 'hello 8'}},
        ]},
    )

    # Only the second consumer is started; uncomment the next line to run
    # two competing consumers on the same list.
    # a.start()
    c.start()
    time.sleep(5)

    # Run the producer to completion.
    b.start()
    b.join()

    # Keep the main thread alive so the daemon consumer can drain the list
    # before the interpreter exits.
    time.sleep(60)
发布订阅模式
class redisSubscribe(object):
    """Publish/subscribe demo on a single Redis channel.

    ``push`` publishes JSON-encoded payloads to the channel; ``get``
    subscribes to the same channel and prints every reply it reads.
    """

    def __init__(self, host='192.168.234.241', port=63792, db=8, channel="job"):
        """Create the Redis client and remember which channel to use.

        Args:
            host: Redis server host.
            port: Redis server port.
            db: Redis database index.
            channel: Name of the pub/sub channel.
        """
        self.con = redis.StrictRedis(host=host, port=port, db=db)
        # The attribute keeps its original spelling ("chanel") so existing
        # code that reads it continues to work.
        self.chanel = channel

    def push(self, **kwargs):
        """Publish every payload found in ``kwargs['datas']`` to the channel.

        Args:
            **kwargs: Must contain ``datas``, an iterable of dicts. Each dict
                must have a JSON-serialisable ``data`` entry; any other keys
                (such as ``id``) are ignored.

        Raises:
            KeyError: If ``datas`` is missing, or an item has no ``data`` key.
        """
        for item in kwargs['datas']:
            payload = json.dumps(item['data']).encode('utf-8')
            self.con.publish(self.chanel, payload)

    def get(self):
        """Subscribe to the channel and print each reply forever.

        Every line printed is the current thread's name immediately followed
        by the raw value returned from ``parse_response``. This method never
        returns.
        """
        pub = self.con.pubsub()
        pub.subscribe(self.chanel)
        # Read and discard the first reply -- presumably the subscribe
        # confirmation sent by the server -- so it is not printed.
        pub.parse_response()
        # pub.listen() is the iterator-based alternative to this manual loop.
        while True:
            msg = pub.parse_response()
            print(f"{threading.current_thread().name}{msg}")
if __name__ == '__main__':
    # Publish/subscribe demo: one subscriber thread listens on the channel
    # while a publisher thread sends four messages to it.
    # This section must drive redisSubscribe; instantiating redisConsumer
    # here would just repeat the list-based producer/consumer demo.
    obj = redisSubscribe()

    # Subscribers. Both are created as daemon threads (passed to the
    # constructor; Thread.setDaemon() is deprecated) because get() loops
    # forever and must not keep the process alive.
    a = threading.Thread(target=obj.get, name="get-1", daemon=True)
    c = threading.Thread(target=obj.get, name="get-2", daemon=True)

    # Publisher.
    b = threading.Thread(
        target=obj.push,
        name='push',
        kwargs={"datas": [
            {"id": 1, "data": {'name': 'hello 5'}},
            {"id": 2, "data": {'name': 'hello 6'}},
            {"id": 3, "data": {'name': 'hello 7'}},
            {"id": 4, "data": {'name': 'hello 8'}},
        ]},
    )

    # Only the second subscriber is started; uncomment the next line to run
    # two subscribers on the same channel.
    # a.start()
    c.start()
    # Presumably gives the subscriber time to finish subscribing before
    # anything is published -- confirm the delay is sufficient.
    time.sleep(5)

    # Run the publisher to completion.
    b.start()
    b.join()

    # Keep the main thread alive so the daemon subscriber can print what it
    # receives before the interpreter exits.
    time.sleep(60)