风暴容错机制Acker详解和实战案例

Storm中有个特殊的Executor叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树。当acker发现一个Tuple树已经处理完成了,它会告诉框架回调Spout的ack(),否则回调Spout的fail()。


Acker的跟踪算法是Storm的主要突破之一,对任意大的一个,它只需要恒定的20字节就可以进行跟踪。


我们期望的是,如果某个Tuple被Bolt执行失败了,则Spout端可以重新发送该Tuple。


Storm容错机制Acker详解和实战案例


Storm容错机制Acker详解和实战案例

Storm容错机制Acker详解和实战案例


package les19.Ack_Fail;


import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.InputStreamReader;

import java.util.Map;

import java.util.UUID;

import java.util.concurrent.ConcurrentHashMap;


import org.apache.storm.spout.SpoutOutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.IRichSpout;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;


public class AckSpout implements IRichSpout{


/**

*/

private static final long serialVersionUID=1L;


FileInputStream fis;

InputStreamReader isr;

BufferedReader br;

  ,私人ConcurrentHashMap<对象,Values>_pending;//线程安全的地图,存储发出过的元组

,,私人ConcurrentHashMap<对象,Integer>fail_pending;//存储失败的元组和其失败次数

SpoutOutputCollector收集器=零;

字符串str=零;


@Override

公共空间nextTuple () {

尝试{

, ((str=this.br.readLine ()) !=null) {

//过滤动作

UUID是否=UUID.randomUUID ();

字符串arr []=str.split (“\ t”);

字符串日期=arr [2]。substring (0, 10);

字符串orderAmt=arr [1];

值val=新值(日期、orderAmt);

,,,this._pending。put(是否,val);

,,,,

,,,收集器。发出(val,是否);

,,,,

,,,,System.out.println (“_pending.size ()=" + _pending.size ());

,,,,

}

}捕捉(异常e) {

//TODO:处理异常

}

}

@Override

公共空间关闭(){

//TODO自动生成方法存根

尝试{

br.close ();

isr.close ();

fis.close ();

}捕捉(异常e) {

//TODO:

e.printStackTrace处理异常();}

}

@Override

//初始化函数

公共空间开放(参看地图,TopologyContext上下文,

SpoutOutputCollector收集器){

尝试{

。收集器=收集器;

。fis=new FileInputStream (“order.log”);

。isr=new InputStreamReader (fis,“utf - 8”);

。br=new BufferedReader (isr);

,,,,_pending=new ConcurrentHashMap<对象,Values> ();

,,,,fail_pending=new ConcurrentHashMap<对象,Integer> ();

}捕捉(异常e) {

e.printStackTrace ();}

//TODO自动生成方法存根

}


@Override

公共空declareOutputFields (OutputFieldsDeclarer庄家){

//TODO自动生成方法存根

declarer.declare(新领域(“日期”、“orderAmt”)),

}


@Override

公共Map<字符串,Object>getComponentConfiguration () {

//TODO自动生成方法存根

返回null;

}

@Override

公共空间ack(对象是否){

//TODO自动生成方法存根

system . out。println (“_pending大小共有:“+ _pending.size ());

system . out。println(“壶嘴ack:”+ msgId.toString () +”——“+ msgId.getClass ());

this._pending.remove(是否);

system . out。println (“_pending大小剩余:“+ _pending.size ());

}


@Override

公共无效激活(){

//TODO自动生成方法存根

}




@Override

公共空禁用(){

//TODO自动生成方法存根

}


@Override

公共空失败(对象是否){

//TODO自动生成方法存根

system . out。println(“壶嘴失败:”+ msgId.toString ());

整数fail_count=fail_pending.get(是否);//获取该元组失败的次数

风暴容错机制Acker详解和实战案例