python模拟事件触发机制详解

  

本文实例为大家分享了python模拟事件触发机制的具体代码,供大家参考,具体内容如下

  

EventManager.py         # - * -编码:utf - 8 - *      #系统模块   从进口队列,队列是空的   从线程进口*         类EventManager:   def __init__(自我):   ”““初始化事件管理器“”“   #事件对象列表   自我。__eventQueue=队列()   #事件管理器开关   自我。__active=False   #事件处理线程   自我。__thread=线程(目标=self.__Run)      #这里的__handlers是一个字典,用来保存对应的事件的响应函数   #其中每个键对应的值是一个列表,列表中保存了对该事件监听的响应函数,一对多   自我。__handlers={} #{事件类型:[处理事件的方法]}      def属于(自我):   ””“引擎运行”“”   而自我。__active==True:   试一试:   #获取事件的阻塞时间设为1秒=self.__eventQueue事件。(块=True,超时=1)   self.__EventProcess(事件)   除了空:   通过      def __EventProcess(自我、事件):   ”“”处理事件”“”   #检查是否存在对该事件进行监听的处理函数   如果事件。type_ self.__handlers:   #若存在,则按顺序将事件传递给处理函数执行   处理程序在self.__handlers [event.type_]:   处理程序(事件)      def开始(自我):   ”““启动”“”   #将事件管理器设为启动   自我。__active=True   #启动事件处理线程   self.__thread.start ()      def停止(自我):   ”““停止”“”   #将事件管理器设为停止   自我。__active=False   #等待事件处理线程退出   self.__thread.join ()      def AddEventListener(自我,type_,处理程序):   ”““绑定事件和监听器处理函数”“   #尝试获取该事件类型对应的处理函数列表,若无则创建   试一试:   handlerList=self.__handlers [type_]   除了KeyError:   handlerList=[]      自我。__handlers [type_]=handlerList   #若要注册的处理器不在该事件的处理器列表中,则注册该事件   如果处理程序不在handlerList:   handlerList.append(处理器)      def RemoveEventListener(自我,type_,处理程序):   ””“移除监听器的处理函数”“   #读者自己试着实现      def SendEvent(自我、事件):   ””“发送事件,向事件队列中存入事件“”“   self.__eventQueue.put(事件)      ”““事件对象”“”   类事件:   def __init__(自我,type_=None):   自我。type_=type_ #事件类型   自我。dict={} #字典用于保存具体的事件数据   之前      

test.py         # - * -编码:utf - 8 - *      从线程进口*   从EventManager进口*   导入的时间      #事件名称新文章   EVENT_ARTICAL=" EVENT_ARTICAL "         #事件源公众号   类PublicAccounts:   def __init__(自我,eventManager):   自我。__eventManager=eventManager      def WriteNewArtical(自我):   #事件对象,写了新文章   事件=事件(type_=EVENT_ARTICAL)   事件。dict(“文章”)=u '如何写出更优雅的代码\ n”   #发送事件   self.__eventManager.SendEvent(事件)   打印(u的公众号发送新文章”)         #监听器订阅者   侦听器类:   def __init__(自我、用户名):   自我。__username=用户名      #监听器的处理函数读文章   def ReadArtical(自我、事件):   print (u ' % s收到新文章“% self.__username)   打印(u”正在阅读新文章内容:% s的% event.dict["文章"])         ”““测试函数”“”   def测试():   listner1=侦听器(“thinkroom”) #订阅者1   listner2=侦听器(史蒂夫)#订阅者2      eventManager=eventManager ()      #绑定事件和监听器响应函数(新文章)   eventManager。AddEventListener (EVENT_ARTICAL listner1.ReadArtical)   eventManager。AddEventListener (EVENT_ARTICAL listner2.ReadArtical)   eventManager.Start ()      publicAcc=PublicAccounts (eventManager)   而真正的:   publicAcc.WriteNewArtical ()   time . sleep (2)      if __name__==癬_main__”:   测试()   之前      

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

python模拟事件触发机制详解