Python怎么使用Beanstalkd做异步任务处理的方法

  介绍

这篇文章将为大家详细讲解有关Python怎么使用Beanstalkd做异步任务处理的方法,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

使用Beanstalkd作为消息队列服务,然后结合Python的装饰器语法实现一个简单的异步任务处理工具。

<>强最终效果

定义任务:

得到xxxxx.job_queue  import  JobQueue      时间=queue  JobQueue ()      @queue.task (& # 39; task_tube_one& # 39;)   def  task_one (__arg1、时间最长,大敌;;长度):   ,# do 任务

提交任务:

task_one.put (__arg1=癮",,最长=癰",,长度=癱")

然后就可以由后台的工作线程去执行这些任务了。

<强>实现过程

1,了解Beanstalk服务器

豆茎是一个简单,快速的工作队列。https://github.com/kr/beanstalkd

豆茎是一个C语言实现的消息队列服务。它提供了通用的接口,最初设计的目的是通过异步运行耗时的任务来减少大量网络应用程序中的页面延迟。针对不同的语言,有不同的Beanstalkd客户实现。Python里就有beanstalkc等。我就是利用beanstalkc来作为与Beanstalkd服务器通信的工具。

2,任务异步执行实现原理

 Python怎么使用Beanstalkd做异步任务处理的方法

Beanstalkd只能进行字符串的任务调度。为了让程序支持提交函数和参数,然后由锅执行函数并携带参数。需要一个中间层来将函数与传递的参数注册。

实现主要包括3个部分:

用户:负责将函数注册到Beanstalk的一个管上,实现很简单,注册函数名和函数本身的对应关系。(也就意味着同一个分组(管)下不能有相同函数名存在)。数据存储在类变量里。

class 用户(对象):=,,FUN_MAP  defaultdict(东西)      ,def  __init__(自我,时间函数,大敌;;管):   logger.info才能(& # 39;register  func:{},用管:{}# 39;公司.format (func.__name__,管))   Subscriber.FUN_MAP才能(管)[func.__name__],=, func

JobQueue:方便将一个普通函数转换为具有推杆能力的装饰器

class  JobQueue(对象):   ,@classmethod   ,def 任务(cls,管):   def 才能包装器(函数):   ,,用户(func,管)   ,,return 推杆(函数,,管)      return 才能包装

推杆:将函数名,函数参数,指定的分组组合为一个对象,然后json序列化为字符串,最后通过beanstalkc推送到beanstalkd队列。

class 推杆(对象):   ,def  __init__(自我,时间函数,大敌;;管):   self.func 才能=函数   self.tube 才能=管      ,#直接调用返回   ,def  __call__(自我,,* args,, * * kwargs):   return 才能self.func (* args,, * * kwargs)      ,#推给离线队列   ,def (自我,,* * kwargs):   args 才能=,{   ,,& # 39;func_name& # 39;:, self.func.__name__,   ,,& # 39;管# 39;:,self.tube,   ,,& # 39;kwargs& # 39;: kwargs   ,,}   logger.info才能(& # 39;put 工作:{},用队列# 39;.format (args))   beanstalk 才能=,beanstalkc.Connection(主机=BEANSTALK_CONFIG[& # 39;主机# 39;],,端口=BEANSTALK_CONFIG[& # 39;港口# 39;])   尝试才能:   ,,beanstalk.use (self.tube)   ,,job_id =, beanstalk.put (json.dumps (args))   ,,return  job_id   最后才能:   ,,beanstalk.close ()

工作人员:从beanstalkd队列中取出字符串,然后通过json。加载反序列化为对象,获得函数名,参数和管。最后从用户中获得函数名对应的函数代码,然后传递参数执行函数。

class 工人(对象):   worker_id =, 0      ,def  __init__(自我,,管):   self.beanstalk 才能=,beanstalkc.Connection(主机=BEANSTALK_CONFIG[& # 39;主机# 39;],,端口=BEANSTALK_CONFIG[& # 39;港口# 39;])   self.tubes =,才能管   self.reserve_timeout 才能=20   self.timeout_limit 才能=1000   self.kick_period 才能=600   self.signal_shutdown 才能=False   self.release_delay 才能=0   self.age 才能=0   self.signal_shutdown 才能=False   signal.signal才能(signal.SIGTERM, lambda 符号,,框架:,self.graceful_shutdown ())   Worker.worker_id 才能+=1   import_module_by_str才能(& # 39;pear.web.controllers.controller_crawler& # 39;)      ,def 订阅(自我):   if 才能isinstance (self.tubes,列表):   ,,for  tube  self.tubes:拷贝   ,,,if  tube  not 拷贝Subscriber.FUN_MAP.keys ():   ,,,,logger.error(& # 39;管:{},not 注册! & # 39;.format(管))   ,,,,继续   ,,,self.beanstalk.watch(管)   其他的才能:   ,,if  self.tubes  not 拷贝Subscriber.FUN_MAP.keys ():   ,,,logger.error(& # 39;管:{},not 注册! & # 39;.format (self.tubes))   ,才能返回   ,,self.beanstalk.watch (self.tubes)      ,def 运行(自我):   self.subscribe才能()   while 才能正确的:   ,,if  self.signal_shutdown:   ,才能打破   ,,if  self.signal_shutdown:   ,,,logger.info (“graceful  shutdown")   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

Python怎么使用Beanstalkd做异步任务处理的方法