Storm中有个特殊的Executor叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树。当acker发现一个Tuple树已经处理完成了,它会告诉框架回调Spout的ack(),否则回调Spout的fail()。
Acker的跟踪算法是Storm的主要突破之一,对任意大的一个,它只需要恒定的20字节就可以进行跟踪。
我们期望的是,如果某个Tuple被Bolt执行失败了,则Spout端可以重新发送该Tuple。
package les19.Ack_Fail;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class AckSpout implements IRichSpout{
/**
*
*/
private static final long serialVersionUID=1L;
FileInputStream fis;
InputStreamReader isr;
BufferedReader br;
,私人ConcurrentHashMap<对象,Values>_pending;//线程安全的地图,存储发出过的元组
,,私人ConcurrentHashMap<对象,Integer>fail_pending;//存储失败的元组和其失败次数
SpoutOutputCollector收集器=零;
字符串str=零;
@Override
公共空间nextTuple () {
尝试{
, ((str=this.br.readLine ()) !=null) {
//过滤动作
UUID是否=UUID.randomUUID ();
字符串arr []=str.split (“\ t”);
字符串日期=arr [2]。substring (0, 10);
字符串orderAmt=arr [1];
值val=新值(日期、orderAmt);
,,,this._pending。put(是否,val);
,,,,
,,,收集器。发出(val,是否);
,,,,
,,,,System.out.println (“_pending.size ()=" + _pending.size ());
,,,,
}
}捕捉(异常e) {
//TODO:处理异常
}
}
@Override
公共空间关闭(){
//TODO自动生成方法存根
尝试{
br.close ();
isr.close ();
fis.close ();
}捕捉(异常e) {
//TODO:
e.printStackTrace处理异常();}
}
@Override
//初始化函数
公共空间开放(参看地图,TopologyContext上下文,
SpoutOutputCollector收集器){
尝试{
。收集器=收集器;
。fis=new FileInputStream (“order.log”);
。isr=new InputStreamReader (fis,“utf - 8”);
。br=new BufferedReader (isr);
,,,,_pending=new ConcurrentHashMap<对象,Values> ();
,,,,fail_pending=new ConcurrentHashMap<对象,Integer> ();
}捕捉(异常e) {
e.printStackTrace ();}
//TODO自动生成方法存根
}
@Override
公共空declareOutputFields (OutputFieldsDeclarer庄家){
//TODO自动生成方法存根
declarer.declare(新领域(“日期”、“orderAmt”)),
}
@Override
公共Map<字符串,Object>getComponentConfiguration () {
//TODO自动生成方法存根
返回null;
}
@Override
公共空间ack(对象是否){
//TODO自动生成方法存根
system . out。println (“_pending大小共有:“+ _pending.size ());
system . out。println(“壶嘴ack:”+ msgId.toString () +”——“+ msgId.getClass ());
this._pending.remove(是否);
system . out。println (“_pending大小剩余:“+ _pending.size ());
}
@Override
公共无效激活(){
//TODO自动生成方法存根
}
@Override
公共空禁用(){
//TODO自动生成方法存根
}
@Override
公共空失败(对象是否){
//TODO自动生成方法存根
system . out。println(“壶嘴失败:”+ msgId.toString ());
整数fail_count=fail_pending.get(是否);//获取该元组失败的次数