动物园管理员(15)源码分析——服务器(2)

  

LearnerZooKeeperServer是所有追随者和观察者的父类,在LearnerZooKeeperServer里有2个重要的属性:
//提交请求处理器
保护CommitProcessor CommitProcessor;
//同步处理器
保护SyncRequestProcessor syncProcessor;

  

FollowerZooKeeperServer和ObserverZooKeeperServer都继承了LearnerZooKeeperServer服务器。

  

1, FollowerZooKeeperServer

  

1.1,类属性

  
 <代码>//待同步的请求
  ConcurrentLinkedQueuependingSyncs;//待处理的事务请求
  LinkedBlockingQueuependingTxns=new LinkedBlockingQueue();  
  

1.2,核心函数

  

1.2.1, setupRequestProcessors

  

构建请求处理链,FollowerZooKeeperServer的请求处理链是:
FollowerRequestProcessor→CommitProcessor→FinalRequestProcessor

  
 <代码> @Override
  保护无效setupRequestProcessors () {//最后的处理器
  都finalProcessor=new FinalRequestProcessor(这个);//第二个处理器
  commitProcessor=new commitProcessor (finalProcessor
  Long.toString (getServerId()),真的,getZooKeeperServerListener ());
  commitProcessor.start ();//第一个请求处理器FollowerRequestProcessor
  firstProcessor=new FollowerRequestProcessor(这个,commitProcessor);
  ((FollowerRequestProcessor) firstProcessor) .start ();
  syncProcessor=new SyncRequestProcessor(这一点,
  新SendAckRequestProcessor((学习者)getFollower ()));
  syncProcessor.start ();
  } 
  

1.2.2 logRequest

  

该函数将请求进行记录(放入到对应的队列中),等待处理。

  
 <代码>公共空logRequest (TxnHeader hdr,记录公司{
  请求请求=new请求(hdr.getClientId (), hdr.getCxid (), hdr.getType (), hdr,公司hdr.getZxid ());//zxid不等于0,说明此服务器已经处理过请求
  如果(请求。zxid,0 xffffffffl) !=0) {//将该请求放入pendingTxns中,等待事务处理
  pendingTxns.add(请求);
  }//使用SyncRequestProcessor处理请求(其会将请求放在队列中,异步进行处理)
  syncProcessor.proce * * *装备(请求);
  } 
  

1.2.3,提交

  

函数会提交zxid对应的请求(pendingTxns的队首元素),其首先会判断队首请求对应的zxid是否为传入的zxid,然后再进行移除和提交(放在committedRequests队列中)。

  
 <代码>公共空提交(长zxid) {//没有还在等待处理的事务
  如果(pendingTxns.size ()==0) {
  日志。警告(“提交”+ Long.toHexString (zxid)
  +“没有看到公司”);
  返回;
  }//队首元素的zxid
  长firstElementZxid=pendingTxns.element () .zxid;//如果队首元素的zxid不等于需要提交的zxid,则退出程序
  如果(firstElementZxid !=zxid) {
  日志。错误(“犯zxid 0 x”+ Long.toHexString (zxid)
  +“但接下来等待时候0 x”
  + Long.toHexString (firstElementZxid));
  system . exit (12);
  }//从待处理事务请求队列中移除队首请求
  请求请求=pendingTxns.remove ();//提交该请求
  commitProcessor.commit(请求);
  } 
  

2, ObserverZooKeeperServer

  

2.1,类属性

  
 <代码>//同步处理器是否可用,系统参数控制
  私人布尔syncRequestProcessorEnabled=this.self.getSyncEnabled ();//待同步请求队列
  ConcurrentLinkedQueuependingSyncs=新的ConcurrentLinkedQueue ();
   
  

2.2,核心方法

  

2.2.1, setupRequestProcessors

  

构建请求处理链,ObserverZooKeeperServer的请求处理链是:ObserverRequestProcessor→CommitProcessor→FinalRequestProcessor,可能会存在SyncRequestProcessor。

  
 <代码> @Override
  保护无效setupRequestProcessors () {//我们可以考虑改变处理器的行为//观察人士,例如,删除磁盘的同步需求。//当前,他们的行为几乎一模一样的追随者。
  都finalProcessor=new FinalRequestProcessor(这个);
  commitProcessor=new commitProcessor (finalProcessor Long.toString (getServerId()),真的,getZooKeeperServerListener ());
  commitProcessor.start ();
  firstProcessor=new ObserverRequestProcessor(这个,commitProcessor);
  ((ObserverRequestProcessor) firstProcessor) .start ();/*
  *观察者应该写入磁盘,所以它不会请求
  *太老公司的领导人可能会导致整个
  *快照。
  *
  *不过,这可能会降低性能,因为它必须写入磁盘
  *和做定期快照可能内存需求的两倍
  *///是否使用同步处理器,看系统参数配置,会影响性能
  如果(syncRequestProcessorEnabled) {
  syncProcessor=new SyncRequestProcessor(空);
  syncProcessor.start ();
  }
  }

动物园管理员(15)源码分析——服务器(2)