卡夫卡java生产消费程序演示示例

  

  卡夫卡是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个演示程序供大家参考.kafka的安装请参考官方文档。   

  

  首先我们需要新建一个maven项目,然后在pom中引用卡夫卡jar包,引用依赖如下:   

                       1         2         3.         4         5                  & lt; dependency>         ,,,& lt; groupId> org.apache.kafka         ,,,& lt; artifactId> kafka_2.10         ,,,& lt; version> 0.8.0         & lt;/dependency>                           

  我们用的版本是0.8,下面我们看下生产消息的代码:   

                       1         2         3.         4         5         6         7         8         9         10         11         12         13         14         15         16         17         18         19         20.         21         22         23         24         25         26         27         28         29         30.         31         32         33         34         35         36         37         38         39         40         41         42         43         44         45         46         47         48         49         50         51         52         53         54                  package  cn.outofmemory.kafka;         ,         import  java.util.Properties;         ,         import  kafka.javaapi.producer.Producer;         import  kafka.producer.KeyedMessage;         import  kafka.producer.ProducerConfig;         ,/* *         ,* Hello world !         ,*         ,*/public  class  KafkaProducer         {         ,,,private  final  Producer<字符串,String>生产商;         ,,,public  final  static 字符串主题=安馐蕴饽俊?         ,         ,,,private  KafkaProducer () {         ,,,,,,,属性道具=new 属性();         ,,,,,,,//此处配置的是卡夫卡的端口         ,,,,,,,props.put (“metadata.broker。列表”、“192.168.193.148:9092”);         ,         ,,,,,,,//配置值的序列化类         ,,,,,,,props.put(“序列化器。类”、“kafka.serializer.StringEncoder”);         ,,,,,,,//配置关键的序列化类         ,,,,,,,props.put (“key.serializer。类”、“kafka.serializer.StringEncoder”);         ,         ,,,,,,//request.required.acks         ,,,,,,,//0,这意味着生产者从不等待从代理确认(0.7)一样的行为。此选项提供了最低的延迟但最弱的耐久性担保(有些数据服务器失败时将丢失)。         ,,,,,,,//1,这意味着生产者得到确认后,领袖副本已经收到数据。该选项提供了更好的耐久性作为客户机等待,直到服务器请求承认成功(只有消息被写入已遇难的领袖但尚未复制将丢失)。         ,,,,,,,//1,这意味着生产者得到确认毕竟同步副本已经收到数据。该选项提供了最好的耐用性,我们保证不会丢失任何消息只要至少有一个副本仍保持同步。         ,,,,,,,props.put (“request.required.acks”,“1”);         ,         ,,,,,,,制片人=new  Producer<字符串,String> (new  ProducerConfig(道具));         ,,,}         ,         ,,,void 生产(){         ,,,,,,,int  messageNo=1000;         ,,,,,,,final  int 数=10000;         ,         ,,,,,,,while  (messageNo & lt;数){         ,,,,,,,,,,,字符串键=String.valueOf (messageNo);         ,,,,,,,,,,,字符串数据=" https://www.yisu.com/zixun/hello卡夫卡信息”,+键;         ,,,,,,,,,,,producer.send (new  KeyedMessage<字符串,String>(主题、关键数据));         ,,,,,,,,,,,System.out.println(数据);         ,,,,,,,,,,,messageNo + +;         ,,,,,,,}         ,,,}         ,         ,,,public  static  void  main (String [] args)         ,,,{         ,,,,,,,new  KafkaProducer () .produce ();         ,,,}         }                           

卡夫卡java生产消费程序演示示例