基于python的Paxos算法实现

  

理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,我觉着还是有必要自己亲自实践一下。

  

这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼。这里环境用的是python2.7。

  

基于python的Paxos算法实现

        类信息:   #命令   MSG_ACCEPTOR_AGREE=0 #追随者约定   MSG_ACCEPTOR_ACCEPT=1 #追随者接受   MSG_ACCEPTOR_REJECT=2 #追随者拒绝——网络不通   MSG_ACCEPTOR_UNACCEPT=3 #追随者网络通——不同意   MSG_ACCEPT=4 #接受   MSG_PROPOSE=5 #提议   MSG_EXT_PROPOSE=6 #额外提议   MSG_HEARTBEAT=7 #心跳,每隔一段时间同步消息   def __init__(自我,命令=None):   self.command=命令   #把收到的消息原原路返回,作为应答消息   def copyAsReply(自我、消息):   # # #提议ID当前的ID发给谁#谁发的   自我。proposalID,自我。instanceID,自我。,自我。=消息来源。proposalID、消息。instanceID、消息。源,message.to   自我。值=https://www.yisu.com/zixun/message.value发的信息      

<>强然后是利用插座,线程和队列实现的消息处理器:

        #基于套接字传递消息,封装网络传递消息   进口线程   进口泡菜   进口套接字   进口队列   类MessagePump (threading.Thread):   #收取消息线程   类MPHelper (threading.Thread):   #   def __init__(自我,所有者):   自我。所有者=所有者   threading.Thread.__init__(自我)   def运行(自我):   虽然不是self.owner。中止:#只要所有者线程没有结束,一直接受消息   试一试:   (字节,addr)=self.owner.socket.recvfrom(2048) #收取消息   味精=pickle.loads(字节)#读取二进制数据转化为消息   味精。源=addr [1]   self.owner.queue.put (msg) #队列存入消息   除了例外e:   通过      def __init__(自我、所有者、端口、超时=2):   threading.Thread.__init__(自我)   自我。所有者=所有者   自我。中止=False   自我。超时=2   自我。端口=端口   自我。套接字=socket.socket(插座。AF_INET socket.SOCK_DGRAM) # UDP通信   self.socket.setsockopt(套接字。SOL_SOCKET,插座。SO_RCVBUF, 200000) #通信参数   self.socket。绑定((“localhost”、港口))#通信地址,ip,端口   self.socket.settimeout(超时)#超时设置   自我。队列=queue.Queue() #队列   自我。辅助=MessagePump.MPHelper(自我)#接收消息      #运行主线程   def运行(自我):   self.helper.start() #开启收消息的线程   虽然不是self.abort:   消息=self.waitForMessage() #阻塞等待   self.owner.recvMessage(消息)#收取消息      #等待消息   def waitForMessage(自我):   试一试:   味精=self.queue。(真的,3)#抓取数据,最多等待3 s   返回味精   除了:   回来没有      #发送消息   def sendMessage(自我、消息):   字节=pickle.dumps(消息)#转化为二进制   地址=(“localhost”, message.to) #地址ip、端口(ip、端口)   self.socket。sendto(字节,地址)   还真   #是否停止收取消息   def doAbort(自我):   自我。中止=True      

<强>再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的

        从MessagePump进口MessagePump   进口随机   类AdversarialMessagePump (MessagePump): #类的继承   #对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序   def __init__(自我、所有者、端口、超时=2):   MessagePump。__init__(自我、所有者、港口、超时)#初始化父类   自我。消息组=()#集合避免重复      def waitForMessage(自我):   试一试:   味精=self.queue。(真的,0.1)#从队列抓取数据   self.messages.add (msg) #添加消息   除了例外e: #处理异常   通过   #打印(e)   如果len (self.messages)比;0和random.random () & lt;0.95:#武断!   味精=random.choice(列表(self.messages) #随机抓取消息发送   self.messages.remove (msg) #删除消息   其他:   味精=没有   返回味精      

<强>再来一个是记录类

基于python的Paxos算法实现