<>强起步强>
芹菜是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
运行模式是生产者消费者模式:
任务队列:任务队列是一种在线程或机器间分发任务的机制。
消息队列:消息队列的输入是工作的一个单元,称为任务,独立的职程(工人)进程持续监视队列中是否有需要处理的新任务。
芹菜用消息通信,通常使用中间人(代理)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程,职程对消息进行处理。
芹菜的架构由三部分组成,消息中间件(message broker),任务执行单元(工人)和任务执行结果存储(任务结果存储)组成。
消息中间件:芹菜本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,RabbitMQ,复述,MongoDB等,本文使用复述。
任务执行单元:工人是芹菜提供的任务执行的单元,工人并发的运行在分布式的系统节点中
任务结果存储:任务结果存储用来存储工人执行的任务的结果,芹菜支持以不同方式存储任务的结果,包括复述,MongoDB, Django ORM, AMQP等,这里我先不去看它是如何存储的,就先选用复述来存储任务执行结果。
<>强安装强>
通过pip命令即可安装:
pip安装芹菜
本文使用复述做消息中间件,所以需要在安装:
pip安装复述, >之前复述,软件也要安装,官网只提供了linux版本的下载:https://redis。io/下载,windows的可以到https://github.com/MicrosoftArchive/redis下载exe安装包。
<强>简单的演示
强>为了运行一个简单的任务,从中说明芹菜的使用方式。在项目文件夹内创建app.py和任务。py .tasks。py用来定义任务:
# tasks.py 导入的时间 从芹菜进口芹菜 代理='复述://127.0.0.1:6379/1 ' 后端='复述://127.0.0.1:6379/2 ' 应用=芹菜(my_tasks,代理=代理后台=后台) @app.task def添加(x, y): print(输入任务) time . sleep (3) 返回x + y >之前这些代码做了什么事。代理指定任务队列的消息中间件,后端指定了任务执行结果的存储.app就是我们创建的芹菜对象。通过app.task修饰器将添加函数变成一个一部的任务。
# app.py 从任务导入添加 if __name__==癬_main__”: print(“开始工作”) 结果=add.delay (18) 打印(“任务结束”) 打印(结果) >之前add.delay函数将任务序列化发送到消息中间件。终端执行python app.py可以看到输出一个任务的唯一识别:
引用>
开始任务 结束任务
79年ef4736-1ecb-4afd-aa5e-b532657acd43
这个只是将任务推送到复述,任务还没被消费,任务会在芹菜队列中。
开启芹菜锅可以将任务进行消费:
芹菜工人——任务- l信息#——后是模块名参数指定了芹菜对象的位置,l参数指定醒来的日志级别。
如果此命令在终端报错:
,文件”e: \ workspace \ .env \ lib \网站\芹菜\ app \痕迹。py”, 537行,在_fast_trace_task
引用>
,,任务、接受主机名=_loc
ValueError:不够值解包(预计3,0)
这是赢得10在使用芹菜4。x的时候会有这个问题,解决方式可以是改用芹菜3。x版本,或者按照无法在Windows上运行任务提供的方式,该问题提供了两种方式解决,一种是安装eventlet扩展:
pip安装eventlet 芹菜——& lt; mymodule>工人- l信息- p eventlet另一种方式是添加个FORKED_BY_MULTIPROCESSING=1的环境变量(推荐这种方式):
进口操作系统 os.environ。setdefault (' FORKED_BY_MULTIPROCESSING ', ' 1 ')详解分布式任务队列芹菜使用说明