本篇文章为大家展示了Django异步任务线程池实现原理,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
<强>请求任务异步处理的原理强>
使用python管理。py runserver模式启动的Django应用只有一个进程,对于每个请求,主线程会开启一个子线程来处理请求。请求子线程向主线程申请一个新线程,然后把耗时的任务交给新线程,自身立即响应,这就是请求任务异步处理的原理。
<强>可视化线程池强>
如果想要管理这批异步线程,知道他们是否在运行中,可以使用线程池(ThreadPoolExecutor)。
线程池会先启动若干数量的线程,并让这些线程都处于睡眠状态,当向线程池提交一个任务后,会唤醒线程池中的某一个睡眠线程,让它来处理这个任务,当处理完这个任务,线程又处于睡眠状态。
提交任务后会返回一个期程(未来的),这个对象可以查看线程池中执行此任务的线程是否仍在处理中
因此可以构建一个全局可视化线程池:
得到concurrent.futures.thread import ThreadPoolExecutor class ThreadPool(对象): def 才能__init__(自我): ,,,#,线程池 ,,,self.executor =, ThreadPoolExecutor (20) ,,,#,用于存储每个项目批量任务的期程 ,,,self.future_dict =, {} #,才能检查某个项目是否有正在运行的批量任务 def 才能;is_project_thread_running(自我,,project_id): ,,,future =, self.future_dict.get (project_id,,没有) ,,,if future 以及future.running (): ,,,,,#,存在正在运行的批量任务 ,,,,,return 真实的 ,,,return 错误的 #,才能展示所有的异步任务 def 才能check_future(自我): ,,,data =, {} ,,,for project_id,, future 拷贝self.future_dict.items (): ,,,,,数据[project_id],=, future.running () ,,return 数据 def 才能__del__(自我): ,,,self.executor.shutdown () #,主线程中的全局线程池 #,global_thread_pool的生命周期是Django主线程运行的生命周期 时间=global_thread_pool ThreadPool ()
使用:
#,检查异步任务 if global_thread_pool.is_project_thread_running (project_id): raise 才能exceptions.ValidationError(细节=& # 39;存在正在处理的批量任务,请稍后重试& # 39;) #,提交一个异步任务 时间=future global_thread_pool.executor.submit (self.batch_thread, project_id) global_thread_pool.future_dict [project_id],=,未来 #,查看所有异步任务 @login_required def check_future(请求): 时间=data 才能;global_thread_pool.check_future () return 才能;HttpResponse(状态=status.HTTP_200_OK,内容=json.dumps(数据)
<强>串行执行
强>
使用线程锁
在全局线程池中初始化线程锁
class ThreadPool(对象): def 才能__init__(自我): ,,,self.executor =, ThreadPoolExecutor (20) ,,,self.future_dict =, {} ,,,self.lock =, threading.Lock ()
然后执行线程前需要获取锁并再执行结束后释放锁
def batch_thread(自我): global_thread_pool.lock.acquire才能() 尝试才能: ,,,… ,,,global_thread_pool.lock.release () except 才能;例外: ,,,trace_log =, traceback.format_exc () ,,,logger.error(& # 39;异步任务执行失败:\ n % & # 39;, %, trace_log) ,,,global_thread_pool.lock.release ()
需要捕捉异常预防子线程出错而无法释放锁的情况
<强>异步线程任务执行前先检查数据库连接是否可用,然后关掉不可用连接强>
由于django的数据库连接是保存到线程本地变量中的,通过ThreadPoolExecutor创建的线程会保存各自的数据库连接。
当连接被保存的时间超过mysql连接的最大超时时间,连接失效,但不会被线程释放。
之后再调起线程执行涉及到数据库操作的异步任务时,会用到失效的数据库连接,导致报错“mysql服务器已经消失”。
解决方案是在线程池的所有异步任务执行前先检查数据库连接是否可用,然后关掉不可用连接
def batch_thread(自我): for 才能;conn 拷贝connections.all (): ,,,conn.close_if_unusable_or_obsolete () ,,…
上述内容就是Django异步任务线程池实现原理,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。