利用复述,流怎么实现一个消息队列

  介绍

利用复述,流怎么实现一个消息队列?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

代码清单10:1的展示了一个具有基本功能的消息队列实现:

<李>

代码最开头的是几个转换函数,它们负责对程序的相关输入输出进行转换和格式化;

<李>

MessageQueue类用于实现消息队列,它的添加消息,移除消息以及返回消息数量三个方法分别使用了流的,XADD命令,,XDEL命令和,XLEN命令;

<李>

消息队列的两个获取方法,get_message()和,get_by_range()分别以两种形式调用了流的,XRANGE命令;

<李>

最后,用于迭代消息的,迭代()方法使用了,XREAD命令对流进行迭代。

代码清单10:1的使用复述,流实现的消息队列:/流/message_queue。py

def  reconstruct_message_list (message_list):   “才能”;“   为才能了让多条消息能够以更结构化的方式返回给调用者,   将才能,Redis 返回的多条消息从原来的格式:   (才能(id1,, {k1: v1, k2: v2,…}),, (id2,, {k1: v1, k2: v2,…}),,…]   转才能换成以下格式:   (才能{id1: {k1: v1, k2: v2,…}},, {id2: {k1: v1, k2: v2,…}},,…]   “““才能,,result =, []   for  id、才能,kvs 拷贝message_list:   ,,,result.append ({id:变电站})   return 才能结果   def  get_message_from_nested_list (lst):   “才能”;“   ,,从嵌套列表中取出消息本体。   “才能”;“   return 才能lst [0] [1]   class  MessageQueue:   “才能”;“   使才能用,Redis 流实现的消息队列。   “才能”;“   def 才能;__init__(自我,,客户,,stream_key):   ,,,self.client =客户   ,,,self.stream =stream_key   def 才能;add_message(自我,,key_value_pairs):   ,,,,,,   ,,,将给定的键值对存入到消息里面,并返回相应的消息,ID 。   ,,,,,,   ,,,return  self.client.xadd (self.stream, key_value_pairs)   def 才能;get_message(自我,,message_id):   ,,,,,,   ,,,根据给定的消息,ID 返回相应的消息,如果消息不存在则返回,None 。   ,,,,,,   ,,,reply =, self.client.xrange (self.stream, message_id,, message_id)   ,,,if  len(回复),==,1:   ,,,,,return  get_message_from_nested_list(回复)      def 才能;remove_message(自我,,message_id):   ,,,,,,   ,,,根据给定的消息,ID 删除相应的消息,如果消息不存在则忽略该动作。   ,,,,,,   ,,,self.client.xdel (self.stream, message_id)      def 才能len(自我):   ,,,,,,   ,,,返回消息队列的长度。   ,,,,,,   ,,,return  self.client.xlen (self.stream)      def 才能;get_by_range (start_id,自我,还以为,end_id, max_item=10):   ,,,,,,   ,,,根据给定的,ID 区间范围返回队列中的消息。   ,,,,,,   ,,,reply =, self.client.xrange (start_id, self.stream,还以为,end_id, max_item)   ,,,return  reconstruct_message_list(回复)      def 才能;迭代(自我,,start_id=0,, max_item=10):   ,,,,,,   ,,,对消息队列进行迭代,返回最多,N 条大于给定,ID 的消息。   ,,,,,,   ,,,reply =, self.client.xread ({self.stream: start_id},, max_item)   ,,,if  len(回复),==,0:   ,,,,,return 列表()   ,,,:   ,,,,,messages =, get_message_from_nested_list(回复)   ,,,,,return  reconstruct_message_list(消息)

对于这个消息队列实现,我们可以通过执行以下代码,创建出它的实例:

在祝辞祝辞,得到redis  import 复述   在祝辞祝辞,得到message_queue  import  MessageQueue   在祝辞祝辞,client =,复述(decode_responses=True)   在祝辞祝辞,mq =, MessageQueue(客户,,“mq")

然后通过执行以下代码,向队列里面添加十条消息:

在祝辞祝辞,for 小姐:拷贝范围(10):   时间=?,key “关键{0}“.format(我)   时间=?,value “价值{0}“.format(我)   时间=?,msg {键:值}   ……,,mq.add_message(味精)   …   & # 39;1554113926280 - 0 & # 39;   & # 39;1554113926280 - 1 & # 39;   & # 39;1554113926281 - 0 & # 39;   & # 39;1554113926281 - 1 & # 39;   & # 39;1554113926281 - 2 & # 39;   & # 39;1554113926281 - 3 & # 39;   & # 39;1554113926281 - 4 & # 39;   & # 39;1554113926281 - 5 - & # 39;   & # 39;1554113926281 - 6 & # 39;   null   null   null   null   null   null   null   null   null

利用复述,流怎么实现一个消息队列