风暴+卡夫卡集成简单应用

  

,,,,这两天公司要学习卡夫卡,结合之前的风暴,做了一个简单的集成,之前也参考了网上的例子一些例子,发现或多或少都有一些问题,所以自己做了一个。


,,,,这个是网上其他人遇到的问题,给摘录一下,防止以后自己和大家出现:


基本场景是应用出现错误,发送日志到卡夫卡的某个话题,风暴订阅该话题,然后进行后续处理。场景非常简单,但是在学习过程中,遇到一个奇怪的异常情况:使用KafkaSpout读取主题数据时,没有向ZK写抵消数据,致使每次都从头开始读取。纠结了两天,终于碰巧找到原因:应该使用<代码> BaseBasicBolt> BaseRichBolt>

,,,,,,,,

基本场景:订阅卡夫卡的某个话题,然后在读取的消息前加上自定义的字符串,然后写回到卡夫卡另外一个主题,从卡夫卡读取数据的壶嘴使用storm.kafka.KafkaSpout,向卡夫卡写数据的螺栓使用storm.kafka.bolt。KafkaBolt。中间进行进行数据处理的螺栓定义为TopicMsgBolt。

import  backtype.storm.Config;   import  backtype.storm.LocalCluster;   import  backtype.storm.StormSubmitter;   import  backtype.storm.spout.SchemeAsMultiScheme;   import  backtype.storm.topology.IBasicBolt;   import  backtype.storm.topology.TopologyBuilder;   import  backtype.storm.utils.Utils;   import  storm.kafka.BrokerHosts;   import  storm.kafka.KafkaSpout;   import  storm.kafka.SpoutConfig;   import  storm.kafka.ZkHosts;   import  storm.kafka.bolt.KafkaBolt;      import  java.util.Properties;      public  class  TopicMsgTopology  {   ,,,public  static  void  main (String [], args), throws  Exception  {   ,,,,,,,//,配置饲养员地址   ,,,,,,,BrokerHosts  BrokerHosts =, new  ZkHosts (localhost: 2181);   ,,,,,,,//,配置卡夫卡订阅的话题,以及管理员中数据节点目录和名字   ,,,,,,,SpoutConfig  SpoutConfig =, new  SpoutConfig (brokerHosts,“msgTopic1”,“拓扑/root1”,,“topicMsgTopology”);   ,,,,,,,//,配置KafkaBolt中的kafka.broker.properties   ,,,,,,,Config  conf =, new 配置();   ,,,,,,,Properties  props =, new 属性();   ,,,,,,,//,配置Kafka 代理地址   ,,,,,,,props.put (“metadata.broker.list”,“localhost: 9092”);   ,,,,,,,//,serializer.class为消息的序列化类   ,,,,,,,props.put (“serializer.class”,“kafka.serializer.StringEncoder”);   ,,,,,,,conf.put (“kafka.broker.properties”,道具);   ,,,,,,,//,配置KafkaBolt生成的话题   ,,,,,,,conf.put(“主题”,“msgTopic2”);   ,,,,,,,spoutConfig.scheme =, new  SchemeAsMultiScheme (new  MessageScheme ());   ,,,,,,,TopologyBuilder  builder =, new  TopologyBuilder ();   ,,,,,,,builder.setSpout (“msgKafkaSpout”, new  KafkaSpout (spoutConfig));   ,,,,,,,builder.setBolt (“msgSentenceBolt”, (IBasicBolt), new  TopicMsgBolt ()) .shuffleGrouping (“msgKafkaSpout”);   ,,,,,,,builder.setBolt (“msgKafkaBolt”, new  KafkaBolt<字符串,Integer> ()) .shuffleGrouping (“msgSentenceBolt”);   ,,,,,,,if  (args.length ==, 0), {   ,,,,,,,,,,,String  topologyName =,“kafkaTopicTopology”;   ,,,,,,,,,,,LocalCluster  cluster =, new  LocalCluster ();   ,,,,,,,,,,,cluster.submitTopology (topologyName,相依,,builder.createTopology ());   ,,,,,,,,,,,Utils.sleep (100000);   ,,,,,,,,,,,cluster.killTopology (topologyName);   ,,,,,,,,,,,cluster.shutdown ();   ,,,,,,,},{else    ,,,,,,,,,,,conf.setNumWorkers (1);   ,,,,,,,,,,,StormSubmitter.submitTopology (args[0],相依,,builder.createTopology ());   ,,,,,,,}   ,,,}   }


storm.kafka.ZkHosts构造方法的参数是动物园管理员标准配置地址的形式

storm.kafka.SpoutConfig构造方法第一个参数为上述的storm.kafka。ZkHosts对象,第二个为待订阅的主题名称,第三个参数zkRoot为写读取主题时的偏移量抵消数据的节点(zk节点),第四个参数为该节点上的次级节点名(有个地方说这个是壶嘴的id), backtype.storm。配置对象是配置风暴的拓扑(拓扑)所需要的基础配置,backtype.storm.spout。null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

风暴+卡夫卡集成简单应用