盐分发后,主动将已完成的任务数据推送到复述中,使用复述的生产者模式,进行消息传送
#=utf - 8编码 进口:json,日志记录 进口salt.config 进口salt.utils.event 从salt.utils。复述,进口RedisPool 导入系统,操作系统,datetime,随机的 进口多处理、线程 从joi.utils。gobsAPI进口PostWeb 记录器=logging.getLogger (__name__) 选择=salt.config.client_config('/数据/盐/saltstack/etc/盐/主’) r_conn=RedisPool (opts.get (redis_db)) .getConn () 锁=threading.Lock () 类RedisQueueDaemon(对象): “‘ 复述,队列监听器 “‘ def __init__(自我,r_conn): 自我。r_conn=r_conn #复述,连接实例 自我。task_queue='任务:刺激:队列#任务消息队列 def listen_task(自我): “‘ 监听主函数 “‘ 而真正的: queue_item=self.r_conn.blpop (self.task_queue,0) [1] 打印“队列”,queue_item # self.run_task (queue_item) t=threading.Thread(目标=self.run_task args=(queue_item,)) t.start () def run_task(自我,信息): “‘ 执行操作函数 “‘ lock.acquire () 信息=json.loads(信息) 如果信息“类型”==皃ushTaskData”: task_data=https://www.yisu.com/zixun/self.getTaskData(信息[' jid ']) task_data=https://www.yisu.com/zixun/json.loads (task_data)如果其他task_data [] logger.info(“获取缓存数据:% s的% task_data) 如果task_data: 如果self.sendTaskData2bs (task_data): task_data=https://www.yisu.com/zixun/[] 自我。setTaskData(信息(“jid”), task_data) elif信息(“类型”)==皊etTaskState”: self.setTaskState(信息(“jid”),信息(的状态),信息(“信息”)) elif信息(“类型”)==皊etTaskData”: 自我。setTaskData(信息(“jid”),信息(“数据”)) lock.release () def getTaskData(自我,jid): 返回self.r_conn.hget(“任务:”+ jid,“数据”) def setTaskData(自我、jid、数据): self.r_conn.hset('任务:' + jid,‘数据’,json.dumps(数据) def sendTaskData2bs(自我,task_data): logger.info(“发送任务数据到后端……”) logger.info (task_data) 如果task_data: p=PostWeb (task_data jgapi/验证”,“pushFlowTaskData”) 结果=p.postRes () 打印结果 如果结果(“代码”): logger.info(“发送成功!”) 还真 其他: logger.error(“发送失败!”) 返回假 其他: 还真 def setTaskState(自我,jid、州消息="): logger.info('到后端设置任务【% s】状态的% str (jid)) p=PostWeb (/jgapi/验证,{“代码”:jid,“状态”:“成功”,“消息”:消息},“setTaskState”) 结果=p.postRes () 如果结果(“代码”): logger.info(“设置任务【% s】状态成功!' % str (jid)) 返回True,结果 其他: logger.error(“设置任务【% s】状态失败!' % str (jid)) 返回结果 def salt_job_listener (): “‘ 盐工作监听器 “‘ sevent=salt.utils.event.get_event ( “大师”, sock_dir=选择(“sock_dir”), 运输=选择(“运输”), 选择=选择) 而真正的: ret=sevent.get_event(全=True) 如果后悔是没有: 继续 如果:。:随著“标签”,“盐/工作/*/ret/*”): task_key='任务:' + ret(“数据”)(“jid”) task_state=r_conn.hget (task_key,“状态”) task_data=https://www.yisu.com/zixun/r_conn.hget (task_key,“数据”) 如果task_state: jid_data=https://www.yisu.com/zixun/{“代码”:ret(“数据”)(“jid”), “project_id”: settings.SALT_MASTER_OPTS (“project_id”), “serverip”: ret(“数据”)(“id”), “返回”:ret(“数据”)(“返回”), “名称”:ret(“数据”)(“id”), “状态”:“成功”如果ret(“数据”)(“成功”)“失败”, } task_data=https://www.yisu.com/zixun/json.loads (task_data)如果其他task_data [] task_data.append (jid_data) logger.info(“新增数据:% s“% json.dumps (task_data)) r_conn.lpush(任务:刺激:队列,json.dumps({“类型”:“setTaskData”,“jid”:受潮湿腐烂(“数据”)(“jid”),“数据”:task_data})) # r_conn.hset (task_key‘数据’,json.dumps (task_data)) 如果task_state==氨寂堋? 如果len (task_data)祝辞=1: logger.info(“新增消息到队列:pushTaskData”) r_conn.lpush(任务:刺激:队列,json.dumps ({jid: ret(“数据”)(“jid”),“类型”:“pushTaskData”})) 其他: logger.info(“任务{0}完成,发送剩下的数据到后端…”.format (task_key)) logger.info(“新增消息到队列:pushTaskData”) r_conn.lpush(任务:刺激:队列,json.dumps ({jid: ret(“数据”)(“jid”),“类型”:“pushTaskData”})) 打印datetime.datetime.now () def run (): 打印“开始复述,产品队列listerner…” logger.info(“开始复述,产品队列listerner…”) multiprocessing.Process(目标=RedisQueueDaemon r_conn .listen_task, arg游戏=()).start () 打印“开始盐工作listerner…” logger.info(“开始盐工作listerner…”) multiprocessing.Process(目标=salt_job_listener args=()) .start () “‘ p=multiprocessing.Pool (2) 打印“开始复述,产品队列listerner…” p.apply_async (redis_queue_listenr ()) 打印“开始盐工作listerner…” p.apply_async (salt_job_listener ()) p.close () p.join () "python监听盐工作状态,并任务数据推送到复述中的方法