本文实例为大家分享了python模拟事件触发机制的具体代码,供大家参考,具体内容如下
EventManager.py
# - * -编码:utf - 8 - * #系统模块 从进口队列,队列是空的 从线程进口* 类EventManager: def __init__(自我): ”““初始化事件管理器“”“ #事件对象列表 自我。__eventQueue=队列() #事件管理器开关 自我。__active=False #事件处理线程 自我。__thread=线程(目标=self.__Run) #这里的__handlers是一个字典,用来保存对应的事件的响应函数 #其中每个键对应的值是一个列表,列表中保存了对该事件监听的响应函数,一对多 自我。__handlers={} #{事件类型:[处理事件的方法]} def属于(自我): ””“引擎运行”“” 而自我。__active==True: 试一试: #获取事件的阻塞时间设为1秒=self.__eventQueue事件。(块=True,超时=1) self.__EventProcess(事件) 除了空: 通过 def __EventProcess(自我、事件): ”“”处理事件”“” #检查是否存在对该事件进行监听的处理函数 如果事件。type_ self.__handlers: #若存在,则按顺序将事件传递给处理函数执行 处理程序在self.__handlers [event.type_]: 处理程序(事件) def开始(自我): ”““启动”“” #将事件管理器设为启动 自我。__active=True #启动事件处理线程 self.__thread.start () def停止(自我): ”““停止”“” #将事件管理器设为停止 自我。__active=False #等待事件处理线程退出 self.__thread.join () def AddEventListener(自我,type_,处理程序): ”““绑定事件和监听器处理函数”“ #尝试获取该事件类型对应的处理函数列表,若无则创建 试一试: handlerList=self.__handlers [type_] 除了KeyError: handlerList=[] 自我。__handlers [type_]=handlerList #若要注册的处理器不在该事件的处理器列表中,则注册该事件 如果处理程序不在handlerList: handlerList.append(处理器) def RemoveEventListener(自我,type_,处理程序): ””“移除监听器的处理函数”“ #读者自己试着实现 def SendEvent(自我、事件): ””“发送事件,向事件队列中存入事件“”“ self.__eventQueue.put(事件) ”““事件对象”“” 类事件: def __init__(自我,type_=None): 自我。type_=type_ #事件类型 自我。dict={} #字典用于保存具体的事件数据 >之前test.py
# - * -编码:utf - 8 - * 从线程进口* 从EventManager进口* 导入的时间 #事件名称新文章 EVENT_ARTICAL=" EVENT_ARTICAL " #事件源公众号 类PublicAccounts: def __init__(自我,eventManager): 自我。__eventManager=eventManager def WriteNewArtical(自我): #事件对象,写了新文章 事件=事件(type_=EVENT_ARTICAL) 事件。dict(“文章”)=u '如何写出更优雅的代码\ n” #发送事件 self.__eventManager.SendEvent(事件) 打印(u的公众号发送新文章”) #监听器订阅者 侦听器类: def __init__(自我、用户名): 自我。__username=用户名 #监听器的处理函数读文章 def ReadArtical(自我、事件): print (u ' % s收到新文章“% self.__username) 打印(u”正在阅读新文章内容:% s的% event.dict["文章"]) ”““测试函数”“” def测试(): listner1=侦听器(“thinkroom”) #订阅者1 listner2=侦听器(史蒂夫)#订阅者2 eventManager=eventManager () #绑定事件和监听器响应函数(新文章) eventManager。AddEventListener (EVENT_ARTICAL listner1.ReadArtical) eventManager。AddEventListener (EVENT_ARTICAL listner2.ReadArtical) eventManager.Start () publicAcc=PublicAccounts (eventManager) 而真正的: publicAcc.WriteNewArtical () time . sleep (2) if __name__==癬_main__”: 测试() >之前以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
python模拟事件触发机制详解