LearnerZooKeeperServer是所有追随者和观察者的父类,在LearnerZooKeeperServer里有2个重要的属性:
//提交请求处理器
保护CommitProcessor CommitProcessor;
//同步处理器
保护SyncRequestProcessor syncProcessor;
FollowerZooKeeperServer和ObserverZooKeeperServer都继承了LearnerZooKeeperServer服务器。
1, FollowerZooKeeperServer
1.1,类属性
<代码>//待同步的请求 ConcurrentLinkedQueuependingSyncs;//待处理的事务请求 LinkedBlockingQueue pendingTxns=new LinkedBlockingQueue (); 代码>
1.2,核心函数
1.2.1, setupRequestProcessors
构建请求处理链,FollowerZooKeeperServer的请求处理链是:
FollowerRequestProcessor→CommitProcessor→FinalRequestProcessor
<代码> @Override 保护无效setupRequestProcessors () {//最后的处理器 都finalProcessor=new FinalRequestProcessor(这个);//第二个处理器 commitProcessor=new commitProcessor (finalProcessor Long.toString (getServerId()),真的,getZooKeeperServerListener ()); commitProcessor.start ();//第一个请求处理器FollowerRequestProcessor firstProcessor=new FollowerRequestProcessor(这个,commitProcessor); ((FollowerRequestProcessor) firstProcessor) .start (); syncProcessor=new SyncRequestProcessor(这一点, 新SendAckRequestProcessor((学习者)getFollower ())); syncProcessor.start (); }代码>
1.2.2 logRequest
该函数将请求进行记录(放入到对应的队列中),等待处理。
<代码>公共空logRequest (TxnHeader hdr,记录公司{ 请求请求=new请求(hdr.getClientId (), hdr.getCxid (), hdr.getType (), hdr,公司hdr.getZxid ());//zxid不等于0,说明此服务器已经处理过请求 如果(请求。zxid,0 xffffffffl) !=0) {//将该请求放入pendingTxns中,等待事务处理 pendingTxns.add(请求); }//使用SyncRequestProcessor处理请求(其会将请求放在队列中,异步进行处理) syncProcessor.proce * * *装备(请求); }代码>
1.2.3,提交
函数会提交zxid对应的请求(pendingTxns的队首元素),其首先会判断队首请求对应的zxid是否为传入的zxid,然后再进行移除和提交(放在committedRequests队列中)。
<代码>公共空提交(长zxid) {//没有还在等待处理的事务 如果(pendingTxns.size ()==0) { 日志。警告(“提交”+ Long.toHexString (zxid) +“没有看到公司”); 返回; }//队首元素的zxid 长firstElementZxid=pendingTxns.element () .zxid;//如果队首元素的zxid不等于需要提交的zxid,则退出程序 如果(firstElementZxid !=zxid) { 日志。错误(“犯zxid 0 x”+ Long.toHexString (zxid) +“但接下来等待时候0 x” + Long.toHexString (firstElementZxid)); system . exit (12); }//从待处理事务请求队列中移除队首请求 请求请求=pendingTxns.remove ();//提交该请求 commitProcessor.commit(请求); }代码>
2, ObserverZooKeeperServer
2.1,类属性
<代码>//同步处理器是否可用,系统参数控制 私人布尔syncRequestProcessorEnabled=this.self.getSyncEnabled ();//待同步请求队列 ConcurrentLinkedQueuependingSyncs=新的ConcurrentLinkedQueue (); 代码>
2.2,核心方法
2.2.1, setupRequestProcessors
构建请求处理链,ObserverZooKeeperServer的请求处理链是:ObserverRequestProcessor→CommitProcessor→FinalRequestProcessor,可能会存在SyncRequestProcessor。
<代码> @Override 保护无效setupRequestProcessors () {//我们可以考虑改变处理器的行为//观察人士,例如,删除磁盘的同步需求。//当前,他们的行为几乎一模一样的追随者。 都finalProcessor=new FinalRequestProcessor(这个); commitProcessor=new commitProcessor (finalProcessor Long.toString (getServerId()),真的,getZooKeeperServerListener ()); commitProcessor.start (); firstProcessor=new ObserverRequestProcessor(这个,commitProcessor); ((ObserverRequestProcessor) firstProcessor) .start ();/* *观察者应该写入磁盘,所以它不会请求 *太老公司的领导人可能会导致整个 *快照。 * *不过,这可能会降低性能,因为它必须写入磁盘 *和做定期快照可能内存需求的两倍 *///是否使用同步处理器,看系统参数配置,会影响性能 如果(syncRequestProcessorEnabled) { syncProcessor=new SyncRequestProcessor(空); syncProcessor.start (); } }动物园管理员(15)源码分析——服务器(2)