jstorm中螺栓是如何处理异常的

  介绍

jstorm中螺栓是如何处理异常的?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

BasicBoltExecutor源码:

,, public  void 执行(Tuple 输入),{   ,,,_collector.setContext(输入);   ,,,try  {   ,,,,,_bolt.execute(输入,,_collector);   ,,,,,_collector.getOutputter () .ack(输入);   ,,,},catch  (FailedException  e), {   ,,,,,if  (e  instanceof  ReportedFailedException), {   ,,,,,,,_collector.reportError (e);   ,,,,,}   ,,,,,_collector.getOutputter () . fail(输入);   ,,,}   ,,}

_bolt。execute(输入、_collector)就是执行我们自己编写的螺栓里的excute方法。可以看到,在这里,只会抓风暴自己定义的FailedException,并且发送失败消息,标记元组处理失败,其余异常则会被放过。

再外层是BoltExecutors的processTupleEvent方法:

try  {   ,,,,,if  (! isSystemBolt ,,, tuple.getSourceStreamId () .equals (Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)), {   ,,,,,,,backpressureTrigger.handle(元组);   ,,,,,},{else    ,,,,,,,bolt.execute(元组);   ,,,,,}   ,,,},catch  (Throwable  e), {   ,,,,,error =, e;   ,,,,,LOG.error (“bolt  execute  error “,, e);   ,,,,,report_error.report (e);   ,,,}

在这里,所有异常都会被吸引住,但是只会进行report_error,并不会发失败消息,相关元组只能等超时才能被标记为失败。

再来看report_error.report (e)的具体实现,通过看构造函数,可以看到report_error是一个TaskReportErrorAndDie类,

, @Override   public 才能;void 报告(Throwable 错误),{   ,,,this.reporterror.report(错误);   ,,,this.haltfn.run ();   以前,,}

在这里,reporterror是一个AsyncLoopDefaultKill类

, @Override   public 才能;void 运行(),{   ,,,JStormUtils.halt_process (1,“Async  loop 死了!“);   以前,,}

这里就是整个过程的最终步骤了,JStormUtils.halt_process()方法会打印一条“异步循环死了!“的日志后将工人进程杀死。

<强>思考

通过代码可以出来,对于jstorm,“异常后工人退出”是一个故意设计出的特性,并非程序不健壮。猜测这一块的设计理念就是对于已知异常,开发人员自己捕获并重新抛出FailedException,使相应消息失败;未知异常则强制使进程直接失败退出,避免过度的捕获导致问题被掩盖。

不过虽然话是这么说,对这个设计还是持保留意见,毕竟风暴和普通的java程序不一样,风暴的工人进程在退出后是会自动被重启的,所以这种异常处理方式并不能起到failfast的效果。

相反,工人的持续重启,还会带来一些其他问题。再一个,不主动将消息标为失败,而是等超时,如果设置的超时时间过长(当然超时时间太长也不合理),也会引入一些问题。比如说kafkaSpout,一条消息没被证实之前是不会继续取后边的数据的,这样如果有一条数据需要等超时,同分区下的数据在这一个超时周期内,就都无法被处理了。

从另一方面来说,如果像FailedException一样处理其他所有异,常由于异常之后可以看到有数据失败,也并不会掩盖问题。

jstorm中螺栓是如何处理异常的