Python自定义线程池实现方法分析

  

本文实例讲述了Python自定义线程池实现方法。分享给大家供大家参考,具体如下:

  

关于python的多线程,由与吉尔的存在被广大群主所诟病,说python的多线程不是真正的多线程。但多线程处理IO密集的任务效率还是可以杠杠的。

  

我实现的这个线程池其实是根据银角的思路来实现的。

  

  

<>强任务获取和执行:

  

1,任务加入队列,等待线程来获取并执行。
  2、按需生成线程,每个线程循环取任务。

  

<强>线程销毁:

  

1,获取任务是终止符时,线程停止。
  2,线程池close()时,向任务队列加入和已生成线程等量的终止符。
  3,线程池终止()时,设置线程下次任务取到为终止符。

  

  

 Python自定义线程池实现方法分析

  

        进口线程   进口contextlib   从队列进口队列   导入的时间   类ThreadPool(对象):   def __init__(自我,max_num):   自我。StopEvent=0 #线程任务终止符,当线程从队列获取到StopEvent时,代表此线程可以销毁。可设置为任意与任务有区别的值。   自我。q=队列()   自我。max_num=max_num #最大线程数   自我。终端=False #是否设置线程池强制终止   自我。created_list=[] #已创建线程的线程列表   self.free_list=[] #空闲线程的线程列表   自我。虚拟光驱=False #线程是否是后台线程   def运行(自我、函数参数,调=None):   """   线程池执行一个任务   :param func:任务函数   :参数arg游戏:任务函数所需参数   :param回调:   返回:如果线程池已经终止,则返回真否则没有   """   如果len (self.free_list)==0和len (self.created_list) & lt;self.max_num:   self.create_thread ()   任务=(函数、参数、回调)   self.q.put(任务)   def create_thread(自我):   """   创建一个线程   """   t=threading.Thread(目标=self.call)   t.setDaemon (self.Deamon)   t.start ()   self.created_list.append (t) #将当前线程加入已创建线程列表created_list   def调用(自我):   """   循环去获取任务函数并执行任务函数   """   current_thread=threading.current_thread() #获取当前线程对象·   事件=self.q.get() #从任务队列获取任务   虽然事件!=自我。StopEvent: #判断获取到的任务是否是终止符   函数,参数,调=事件#从任务中获取函数名,参数,和回调函数名   试一试:   结果=func(*参数)   func_excute_status=True # func执行成功状态   除了例外e:   func_excute_status=False   结果=没有   打印的函数执行产生错误的,e #打印错误信息   如果func_excute_status: # func执行成功后才能执行回调函数   如果回调不是没有:#判断回调函数是否是空的   试一试:   回调(结果)   除了例外e:   打印的回调函数执行产生错误的,e #打印错误信息   self.worker_state (self.free_list current_thread):   #执行完一次任务后,将线程加入空闲列表。然后继续去取任务,如果取到任务就将线程从空闲列表移除   如果self.terminal: #判断线程池终止命令,如果需要终止,则使下次取到的任务为StopEvent。   事件=self.StopEvent   其他:#否则继续获取任务   事件=self.q.get() #当线程等待任务时,q.get()方法阻塞住线程,使其持续等待   其他:#若线程取到的任务是终止符,就销毁线程   #将当前线程从已创建线程列表created_list移除   self.created_list.remove (current_thread)   def关闭(自我):   """   执行完所有的任务后,所有线程停止   """   full_size=len (self.created_list) #按已创建的线程数量往线程队列加入终止符。   而full_size:   self.q.put (self.StopEvent)   full_size -=1   def终止(自我):   """   无论是否还有任务,终止线程   """   自我。终端=True   而self.created_list:   self.q.put (self.StopEvent)   self.q.queue.clear() #清空任务队列   def加入(自我):   """   阻塞线程池上下文,使所有线程执行完后才能继续   """   t self.created_list:   t.join ()   @contextlib.contextmanager #上下文处理器,使其可以使与语用句修饰   def worker_state(自我、state_list worker_thread):   """   用于记录线程中正在等待的线程数   """   state_list.append (worker_thread)   试一试:   收益率   最后:   state_list.remove (worker_thread)   if __name__==癬_main__”:   def Foo (arg):   返回参数   # time . sleep (0.1)   def酒吧(res):   打印res   池=ThreadPool (5)   # pool.Deamon=True #需在pool.run之前设置   因为我在范围(1000):   pool.run (func=Foo, arg游戏=(我),回调=Bar)   pool.close ()   pool.join ()   # pool.terminate ()   打印”任务队列里任务数% s % pool.q.qsize ()   打印”当前存活子线程数量:% d % threading.activeCount ()   打印”当前线程创建列表:% s“% pool.created_list   打印”当前线程创建列表:% s“% pool.free_list      

Python自定义线程池实现方法分析