风暴是grovvy写的
卡夫卡是scala写的
storm-kafka 风暴连接卡夫卡消费者的插件
下载地址:
https://github.com/wurstmeister/storm卡夫卡- 0.8 +
除了需要风暴和卡夫卡相关的jar包还需要google-collections-1.0.jar
以及饲养员相关包
以前由com.netflix.curator组织开发现在归到org.apache.curator下面
强>
package 演示; import java.util.ArrayList; import 并不知道; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; public class  MyKafkaSpout { public static  void main (String [], args), { ,,,, ,,,String topic =案佟? ,,,ZkHosts ZkHosts ,=, new ZkHosts (“192.168.1.107:2181, 192.168.1.108:2181, 192.168.1.109:2181”); ,,,, ,,,SpoutConfig SpoutConfig =, new SpoutConfig (zkhosts,话题, ,,,,,,,,,,,"/MyKafka ",//偏移量抵消的根目录 ,,,,,,,,,,," MyTrack ");//子目录对应一个应用,,,, ,,,List, zkServers=new ArrayList (); ,,,//zkServers.add (“192.168.1.107”); ,,,//zkServers.add (“192.168.1.108”); ,,,(String 主持人:zkhosts.brokerZkStr.split (", ")) ,,,{ ,,,,,,,zkServers.add (host.split (“:”) [0]); ,,,} ,,,, ,,,spoutConfig.zkServers=zkServers; ,,,spoutConfig.zkPort=2181; ,,,spoutConfig.forceFromStart=true;//从头开始消费,实际上是要改成假的 ,,,spoutConfig.socketTimeoutMs=60; ,,,spoutConfig.scheme=new SchemeAsMultiScheme (new StringScheme());//定义输出为字符串类型 ,,,, ,,,TopologyBuilder builder=new TopologyBuilder (); ,,,builder.setSpout(“槽”,new KafkaSpout (spoutConfig) 1);//引用水柱,并发度设为1 ,,,builder.setBolt (“bolt1”, new MyKafkaBolt (), 1) .shuffleGrouping(“槽”); ,,,, ,,,Config Config =new 配置(); ,,,config.setDebug(真正);//上线之前都要改成假否则日志会非常多 ,,,如果(args.length> 0) { ,,,,,,, ,,,,,,,try { ,,,,,,,,,,,StormSubmitter.submitTopology (args[0],配置,,builder.createTopology ()); ,,,,,,,},catch (AlreadyAliveException e), { ,,,,,,,,,,,//,TODO Auto-generated catch 块 ,,,,,,,,,,,e.printStackTrace (); ,,,,,,,},catch (InvalidTopologyException e), { ,,,,,,,,,,,//,TODO Auto-generated catch 块 ,,,,,,,,,,,e.printStackTrace (); ,,,,,,,} ,,,,,,, 还有,,,}{ ,,,,,,, ,,,,,,,LocalCluster localCluster=new localCluster (); ,,,,,,,localCluster.submitTopology (“mytopology”,配置,,,builder.createTopology ()); ,,,,,,,//本地模式在一个进程里面模拟一个风暴集群的所有功能 ,,,} ,,,, ,,,, ,,,, } }
package 演示; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class  MyKafkaBolt implements IBasicBolt  { ,,@Override null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null nullstorm-kafka(风暴壶嘴作为卡夫卡的消费端)