卡夫卡是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个演示程序供大家参考.kafka的安装请参考官方文档。
首先我们需要新建一个maven项目,然后在pom中引用卡夫卡jar包,引用依赖如下:
1 2 3. 4 5 & lt; dependency> ,,,& lt; groupId> org.apache.kafka ,,,& lt; artifactId> kafka_2.10 ,,,& lt; version> 0.8.0 & lt;/dependency>我们用的版本是0.8,下面我们看下生产消息的代码:
1 2 3. 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20. 21 22 23 24 25 26 27 28 29 30. 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 package cn.outofmemory.kafka; , import java.util.Properties; , import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; ,/* * ,* Hello world ! ,* ,*/public class  KafkaProducer { ,,,private final Producer<字符串,String>生产商; ,,,public final static 字符串主题=安馐蕴饽俊? , ,,,private KafkaProducer () { ,,,,,,,属性道具=new 属性(); ,,,,,,,//此处配置的是卡夫卡的端口 ,,,,,,,props.put (“metadata.broker。列表”、“192.168.193.148:9092”); , ,,,,,,,//配置值的序列化类 ,,,,,,,props.put(“序列化器。类”、“kafka.serializer.StringEncoder”); ,,,,,,,//配置关键的序列化类 ,,,,,,,props.put (“key.serializer。类”、“kafka.serializer.StringEncoder”); , ,,,,,,//request.required.acks ,,,,,,,//0,这意味着生产者从不等待从代理确认(0.7)一样的行为。此选项提供了最低的延迟但最弱的耐久性担保(有些数据服务器失败时将丢失)。 ,,,,,,,//1,这意味着生产者得到确认后,领袖副本已经收到数据。该选项提供了更好的耐久性作为客户机等待,直到服务器请求承认成功(只有消息被写入已遇难的领袖但尚未复制将丢失)。 ,,,,,,,//1,这意味着生产者得到确认毕竟同步副本已经收到数据。该选项提供了最好的耐用性,我们保证不会丢失任何消息只要至少有一个副本仍保持同步。 ,,,,,,,props.put (“request.required.acks”,“1”); , ,,,,,,,制片人=new Producer<字符串,String> (new ProducerConfig(道具)); ,,,} , ,,,void 生产(){ ,,,,,,,int messageNo=1000; ,,,,,,,final int 数=10000; , ,,,,,,,while (messageNo & lt;数){ ,,,,,,,,,,,字符串键=String.valueOf (messageNo); ,,,,,,,,,,,字符串数据=" https://www.yisu.com/zixun/hello卡夫卡信息”,+键; ,,,,,,,,,,,producer.send (new KeyedMessage<字符串,String>(主题、关键数据)); ,,,,,,,,,,,System.out.println(数据); ,,,,,,,,,,,messageNo + +; ,,,,,,,} ,,,} , ,,,public static void main (String [] args) ,,,{ ,,,,,,,new KafkaProducer () .produce (); ,,,} }