介绍
利用复述,流怎么实现一个消息队列?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
代码清单10:1的展示了一个具有基本功能的消息队列实现:
- <李>
代码最开头的是几个转换函数,它们负责对程序的相关输入输出进行转换和格式化;
李> <李>MessageQueue类用于实现消息队列,它的添加消息,移除消息以及返回消息数量三个方法分别使用了流的,XADD命令,,XDEL命令和,XLEN命令;
李> <李>消息队列的两个获取方法,get_message()和,get_by_range()分别以两种形式调用了流的,XRANGE命令;
李> <李>最后,用于迭代消息的,迭代()方法使用了,XREAD命令对流进行迭代。
李>代码清单10:1的使用复述,流实现的消息队列:/流/message_queue。py
def reconstruct_message_list (message_list): “才能”;“ 为才能了让多条消息能够以更结构化的方式返回给调用者, 将才能,Redis 返回的多条消息从原来的格式: (才能(id1,, {k1: v1, k2: v2,…}),, (id2,, {k1: v1, k2: v2,…}),,…] 转才能换成以下格式: (才能{id1: {k1: v1, k2: v2,…}},, {id2: {k1: v1, k2: v2,…}},,…] “““才能,,result =, [] for id、才能,kvs 拷贝message_list: ,,,result.append ({id:变电站}) return 才能结果 def get_message_from_nested_list (lst): “才能”;“ ,,从嵌套列表中取出消息本体。 “才能”;“ return 才能lst [0] [1] class MessageQueue: “才能”;“ 使才能用,Redis 流实现的消息队列。 “才能”;“ def 才能;__init__(自我,,客户,,stream_key): ,,,self.client =客户 ,,,self.stream =stream_key def 才能;add_message(自我,,key_value_pairs): ,,,,,, ,,,将给定的键值对存入到消息里面,并返回相应的消息,ID 。 ,,,,,, ,,,return self.client.xadd (self.stream, key_value_pairs) def 才能;get_message(自我,,message_id): ,,,,,, ,,,根据给定的消息,ID 返回相应的消息,如果消息不存在则返回,None 。 ,,,,,, ,,,reply =, self.client.xrange (self.stream, message_id,, message_id) ,,,if len(回复),==,1: ,,,,,return get_message_from_nested_list(回复) def 才能;remove_message(自我,,message_id): ,,,,,, ,,,根据给定的消息,ID 删除相应的消息,如果消息不存在则忽略该动作。 ,,,,,, ,,,self.client.xdel (self.stream, message_id) def 才能len(自我): ,,,,,, ,,,返回消息队列的长度。 ,,,,,, ,,,return self.client.xlen (self.stream) def 才能;get_by_range (start_id,自我,还以为,end_id, max_item=10): ,,,,,, ,,,根据给定的,ID 区间范围返回队列中的消息。 ,,,,,, ,,,reply =, self.client.xrange (start_id, self.stream,还以为,end_id, max_item) ,,,return reconstruct_message_list(回复) def 才能;迭代(自我,,start_id=0,, max_item=10): ,,,,,, ,,,对消息队列进行迭代,返回最多,N 条大于给定,ID 的消息。 ,,,,,, ,,,reply =, self.client.xread ({self.stream: start_id},, max_item) ,,,if len(回复),==,0: ,,,,,return 列表() ,,,: ,,,,,messages =, get_message_from_nested_list(回复) ,,,,,return reconstruct_message_list(消息)
对于这个消息队列实现,我们可以通过执行以下代码,创建出它的实例:
在祝辞祝辞,得到redis import 复述 在祝辞祝辞,得到message_queue import MessageQueue 在祝辞祝辞,client =,复述(decode_responses=True) 在祝辞祝辞,mq =, MessageQueue(客户,,“mq")
然后通过执行以下代码,向队列里面添加十条消息:
在祝辞祝辞,for 小姐:拷贝范围(10): 时间=?,key “关键{0}“.format(我) 时间=?,value “价值{0}“.format(我) 时间=?,msg {键:值} ……,,mq.add_message(味精) … & # 39;1554113926280 - 0 & # 39; & # 39;1554113926280 - 1 & # 39; & # 39;1554113926281 - 0 & # 39; & # 39;1554113926281 - 1 & # 39; & # 39;1554113926281 - 2 & # 39; & # 39;1554113926281 - 3 & # 39; & # 39;1554113926281 - 4 & # 39; & # 39;1554113926281 - 5 - & # 39; & # 39;1554113926281 - 6 & # 39; null null null null null null null null null利用复述,流怎么实现一个消息队列