python监听盐工作状态,并任务数据推送到复述中的方法

  

盐分发后,主动将已完成的任务数据推送到复述中,使用复述的生产者模式,进行消息传送

        #=utf - 8编码   进口:json,日志记录   进口salt.config   进口salt.utils.event   从salt.utils。复述,进口RedisPool   导入系统,操作系统,datetime,随机的   进口多处理、线程   从joi.utils。gobsAPI进口PostWeb   记录器=logging.getLogger (__name__)   选择=salt.config.client_config('/数据/盐/saltstack/etc/盐/主’)   r_conn=RedisPool (opts.get (redis_db)) .getConn ()   锁=threading.Lock ()   类RedisQueueDaemon(对象):   “‘   复述,队列监听器   “‘   def __init__(自我,r_conn):   自我。r_conn=r_conn #复述,连接实例   自我。task_queue='任务:刺激:队列#任务消息队列   def listen_task(自我):   “‘   监听主函数   “‘   而真正的:   queue_item=self.r_conn.blpop (self.task_queue,0) [1]   打印“队列”,queue_item   # self.run_task (queue_item)   t=threading.Thread(目标=self.run_task args=(queue_item,))   t.start ()   def run_task(自我,信息):   “‘   执行操作函数   “‘   lock.acquire ()   信息=json.loads(信息)   如果信息“类型”==皃ushTaskData”:   task_data=https://www.yisu.com/zixun/self.getTaskData(信息[' jid '])   task_data=https://www.yisu.com/zixun/json.loads (task_data)如果其他task_data []   logger.info(“获取缓存数据:% s的% task_data)   如果task_data:   如果self.sendTaskData2bs (task_data):   task_data=https://www.yisu.com/zixun/[]   自我。setTaskData(信息(“jid”), task_data)   elif信息(“类型”)==皊etTaskState”:   self.setTaskState(信息(“jid”),信息(的状态),信息(“信息”))   elif信息(“类型”)==皊etTaskData”:   自我。setTaskData(信息(“jid”),信息(“数据”))   lock.release ()   def getTaskData(自我,jid):   返回self.r_conn.hget(“任务:”+ jid,“数据”)   def setTaskData(自我、jid、数据):   self.r_conn.hset('任务:' + jid,‘数据’,json.dumps(数据)   def sendTaskData2bs(自我,task_data):   logger.info(“发送任务数据到后端……”)   logger.info (task_data)   如果task_data:   p=PostWeb (task_data jgapi/验证”,“pushFlowTaskData”)   结果=p.postRes ()   打印结果   如果结果(“代码”):   logger.info(“发送成功!”)   还真   其他:   logger.error(“发送失败!”)   返回假   其他:   还真   def setTaskState(自我,jid、州消息="):   logger.info('到后端设置任务【% s】状态的% str (jid))   p=PostWeb (/jgapi/验证,{“代码”:jid,“状态”:“成功”,“消息”:消息},“setTaskState”)   结果=p.postRes ()   如果结果(“代码”):   logger.info(“设置任务【% s】状态成功!' % str (jid))   返回True,结果   其他:   logger.error(“设置任务【% s】状态失败!' % str (jid))   返回结果   def salt_job_listener ():   “‘   盐工作监听器   “‘   sevent=salt.utils.event.get_event (   “大师”,   sock_dir=选择(“sock_dir”),   运输=选择(“运输”),   选择=选择)   而真正的:   ret=sevent.get_event(全=True)   如果后悔是没有:   继续   如果:。:随著“标签”,“盐/工作/*/ret/*”):   task_key='任务:' + ret(“数据”)(“jid”)   task_state=r_conn.hget (task_key,“状态”)   task_data=https://www.yisu.com/zixun/r_conn.hget (task_key,“数据”)   如果task_state:   jid_data=https://www.yisu.com/zixun/{“代码”:ret(“数据”)(“jid”),   “project_id”: settings.SALT_MASTER_OPTS (“project_id”),   “serverip”: ret(“数据”)(“id”),   “返回”:ret(“数据”)(“返回”),   “名称”:ret(“数据”)(“id”),   “状态”:“成功”如果ret(“数据”)(“成功”)“失败”,   }   task_data=https://www.yisu.com/zixun/json.loads (task_data)如果其他task_data []   task_data.append (jid_data)   logger.info(“新增数据:% s“% json.dumps (task_data))   r_conn.lpush(任务:刺激:队列,json.dumps({“类型”:“setTaskData”,“jid”:受潮湿腐烂(“数据”)(“jid”),“数据”:task_data}))   # r_conn.hset (task_key‘数据’,json.dumps (task_data))   如果task_state==氨寂堋?   如果len (task_data)祝辞=1:   logger.info(“新增消息到队列:pushTaskData”)   r_conn.lpush(任务:刺激:队列,json.dumps ({jid: ret(“数据”)(“jid”),“类型”:“pushTaskData”}))   其他:   logger.info(“任务{0}完成,发送剩下的数据到后端…”.format (task_key))   logger.info(“新增消息到队列:pushTaskData”)   r_conn.lpush(任务:刺激:队列,json.dumps ({jid: ret(“数据”)(“jid”),“类型”:“pushTaskData”}))      打印datetime.datetime.now ()      def run ():   打印“开始复述,产品队列listerner…”   logger.info(“开始复述,产品队列listerner…”)   multiprocessing.Process(目标=RedisQueueDaemon r_conn .listen_task, arg游戏=()).start ()   打印“开始盐工作listerner…”   logger.info(“开始盐工作listerner…”)   multiprocessing.Process(目标=salt_job_listener args=()) .start ()      “‘   p=multiprocessing.Pool (2)   打印“开始复述,产品队列listerner…”   p.apply_async (redis_queue_listenr ())   打印“开始盐工作listerner…”   p.apply_async (salt_job_listener ())   p.close ()   p.join ()   "

python监听盐工作状态,并任务数据推送到复述中的方法