,,,,这两天公司要学习卡夫卡,结合之前的风暴,做了一个简单的集成,之前也参考了网上的例子一些例子,发现或多或少都有一些问题,所以自己做了一个。
,,,,这个是网上其他人遇到的问题,给摘录一下,防止以后自己和大家出现:
基本场景是应用出现错误,发送日志到卡夫卡的某个话题,风暴订阅该话题,然后进行后续处理。场景非常简单,但是在学习过程中,遇到一个奇怪的异常情况:使用KafkaSpout读取主题数据时,没有向ZK写抵消数据,致使每次都从头开始读取。纠结了两天,终于碰巧找到原因:应该使用<代码> BaseBasicBolt> 代码作为螺栓的父类,而不是<代码> BaseRichBolt> 代码。
,,,,,,,,
基本场景:订阅卡夫卡的某个话题,然后在读取的消息前加上自定义的字符串,然后写回到卡夫卡另外一个主题,从卡夫卡读取数据的壶嘴使用storm.kafka.KafkaSpout,向卡夫卡写数据的螺栓使用storm.kafka.bolt。KafkaBolt。中间进行进行数据处理的螺栓定义为TopicMsgBolt。
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; import storm.kafka.bolt.KafkaBolt; import java.util.Properties; public class  TopicMsgTopology { ,,,public static void main (String [], args), throws Exception { ,,,,,,,//,配置饲养员地址 ,,,,,,,BrokerHosts BrokerHosts =, new ZkHosts (localhost: 2181); ,,,,,,,//,配置卡夫卡订阅的话题,以及管理员中数据节点目录和名字 ,,,,,,,SpoutConfig SpoutConfig =, new SpoutConfig (brokerHosts,“msgTopic1”,“拓扑/root1”,,“topicMsgTopology”); ,,,,,,,//,配置KafkaBolt中的kafka.broker.properties ,,,,,,,Config conf =, new 配置(); ,,,,,,,Properties props =, new 属性(); ,,,,,,,//,配置Kafka 代理地址 ,,,,,,,props.put (“metadata.broker.list”,“localhost: 9092”); ,,,,,,,//,serializer.class为消息的序列化类 ,,,,,,,props.put (“serializer.class”,“kafka.serializer.StringEncoder”); ,,,,,,,conf.put (“kafka.broker.properties”,道具); ,,,,,,,//,配置KafkaBolt生成的话题 ,,,,,,,conf.put(“主题”,“msgTopic2”); ,,,,,,,spoutConfig.scheme =, new SchemeAsMultiScheme (new MessageScheme ()); ,,,,,,,TopologyBuilder builder =, new TopologyBuilder (); ,,,,,,,builder.setSpout (“msgKafkaSpout”, new KafkaSpout (spoutConfig)); ,,,,,,,builder.setBolt (“msgSentenceBolt”, (IBasicBolt), new TopicMsgBolt ()) .shuffleGrouping (“msgKafkaSpout”); ,,,,,,,builder.setBolt (“msgKafkaBolt”, new KafkaBolt<字符串,Integer> ()) .shuffleGrouping (“msgSentenceBolt”); ,,,,,,,if (args.length ==, 0), { ,,,,,,,,,,,String topologyName =,“kafkaTopicTopology”; ,,,,,,,,,,,LocalCluster cluster =, new LocalCluster (); ,,,,,,,,,,,cluster.submitTopology (topologyName,相依,,builder.createTopology ()); ,,,,,,,,,,,Utils.sleep (100000); ,,,,,,,,,,,cluster.killTopology (topologyName); ,,,,,,,,,,,cluster.shutdown (); ,,,,,,,},{else ,,,,,,,,,,,conf.setNumWorkers (1); ,,,,,,,,,,,StormSubmitter.submitTopology (args[0],相依,,builder.createTopology ()); ,,,,,,,} ,,,} }
storm.kafka.ZkHosts构造方法的参数是动物园管理员标准配置地址的形式
storm.kafka.SpoutConfig构造方法第一个参数为上述的storm.kafka。ZkHosts对象,第二个为待订阅的主题名称,第三个参数zkRoot为写读取主题时的偏移量抵消数据的节点(zk节点),第四个参数为该节点上的次级节点名(有个地方说这个是壶嘴的id), backtype.storm。配置对象是配置风暴的拓扑(拓扑)所需要的基础配置,backtype.storm.spout。null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null