storm-kafka(风暴壶嘴作为卡夫卡的消费端)

  

风暴是grovvy写的

卡夫卡是scala写的

storm-kafka 风暴连接卡夫卡消费者的插件

下载地址:

https://github.com/wurstmeister/storm卡夫卡- 0.8 +




除了需要风暴和卡夫卡相关的jar包还需要google-collections-1.0.jar

以及饲养员相关包

以前由com.netflix.curator组织开发现在归到org.apache.curator下面



package 演示;      import  java.util.ArrayList;   import 并不知道;      import  backtype.storm.Config;   import  backtype.storm.LocalCluster;   import  backtype.storm.StormSubmitter;   import  backtype.storm.generated.AlreadyAliveException;   import  backtype.storm.generated.InvalidTopologyException;   import  backtype.storm.spout.SchemeAsMultiScheme;   import  backtype.storm.topology.TopologyBuilder;   import  storm.kafka.KafkaSpout;   import  storm.kafka.SpoutConfig;   import  storm.kafka.StringScheme;   import  storm.kafka.ZkHosts;      public  class  MyKafkaSpout  {   public  static  void  main (String [], args), {   ,,,,   ,,,String  topic =案佟?   ,,,ZkHosts  ZkHosts ,=, new  ZkHosts (“192.168.1.107:2181, 192.168.1.108:2181, 192.168.1.109:2181”);   ,,,,   ,,,SpoutConfig  SpoutConfig =, new  SpoutConfig (zkhosts,话题,   ,,,,,,,,,,,"/MyKafka ",//偏移量抵消的根目录   ,,,,,,,,,,," MyTrack ");//子目录对应一个应用,,,,   ,,,List, zkServers=new  ArrayList ();   ,,,//zkServers.add (“192.168.1.107”);   ,,,//zkServers.add (“192.168.1.108”);   ,,,(String 主持人:zkhosts.brokerZkStr.split (", "))   ,,,{   ,,,,,,,zkServers.add (host.split (“:”) [0]);   ,,,}   ,,,,   ,,,spoutConfig.zkServers=zkServers;   ,,,spoutConfig.zkPort=2181;   ,,,spoutConfig.forceFromStart=true;//从头开始消费,实际上是要改成假的   ,,,spoutConfig.socketTimeoutMs=60;   ,,,spoutConfig.scheme=new  SchemeAsMultiScheme (new  StringScheme());//定义输出为字符串类型   ,,,,   ,,,TopologyBuilder  builder=new  TopologyBuilder ();   ,,,builder.setSpout(“槽”,new  KafkaSpout (spoutConfig) 1);//引用水柱,并发度设为1   ,,,builder.setBolt (“bolt1”, new  MyKafkaBolt (), 1) .shuffleGrouping(“槽”);   ,,,,   ,,,Config  Config =new 配置();   ,,,config.setDebug(真正);//上线之前都要改成假否则日志会非常多   ,,,如果(args.length> 0) {   ,,,,,,,   ,,,,,,,try  {   ,,,,,,,,,,,StormSubmitter.submitTopology (args[0],配置,,builder.createTopology ());   ,,,,,,,},catch  (AlreadyAliveException  e), {   ,,,,,,,,,,,//,TODO  Auto-generated  catch 块   ,,,,,,,,,,,e.printStackTrace ();   ,,,,,,,},catch  (InvalidTopologyException  e), {   ,,,,,,,,,,,//,TODO  Auto-generated  catch 块   ,,,,,,,,,,,e.printStackTrace ();   ,,,,,,,}   ,,,,,,,   还有,,,}{   ,,,,,,,   ,,,,,,,LocalCluster  localCluster=new  localCluster ();   ,,,,,,,localCluster.submitTopology (“mytopology”,配置,,,builder.createTopology ());   ,,,,,,,//本地模式在一个进程里面模拟一个风暴集群的所有功能   ,,,}   ,,,,   ,,,,   ,,,,   }   }


package 演示;      import  java.util.Map;      import  backtype.storm.task.TopologyContext;   import  backtype.storm.topology.BasicOutputCollector;   import  backtype.storm.topology.IBasicBolt;   import  backtype.storm.topology.OutputFieldsDeclarer;   import  backtype.storm.tuple.Tuple;      public  class  MyKafkaBolt  implements  IBasicBolt  {      ,,@Override   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

storm-kafka(风暴壶嘴作为卡夫卡的消费端)