卡夫卡生产国如何使用

  

卡夫卡生产国如何使用?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

<强>一、生产工作流程

生产商使用用户启动生产商的线程,将待发送的消息封装到一个ProducerRecord类实例,然后将其序列化之后发送给分割者,再由后者确定目标分区后一同发送到位于生产者程序中的一块内存缓冲区中,而生产者的另外一个线程(发送方线程)则负责实时从该缓冲区中提取出准备就绪的消息封装进一个批次(批),统一发送给对应的经纪人,具体流程如下图:

卡夫卡生产国如何使用“> </p> <p> <强>二,制片人示例程序开发</强> </p> <p>首先引入卡夫卡相关依赖,在砰的一声。xml文件中加入如下依赖:</p> <pre类= & lt; !——卡夫卡——比;   & lt; dependency>   & lt; groupId> org.apache.kafka   & lt; artifactId> kafka_2.12   & lt; version> 2.2.0   & lt;/dependency>

在资源下面创建kafka-producer。属性配置文件,用于设置卡夫卡参数,内容如下:

bootstrap.servers=192.168.184.128:9092 192.168.184.128:9093 192.168.184.128:9094   key.serializer=org.apache.kafka.common.serialization.StringSerializer   value.serializer=org.apache.kafka.common.serialization.StringSerializer   ack=1   重试=3   batch.size=323840   linger.ms=10   buffer.memory=33554432   max.block。女士=3000

其中,前三个参数必须明确指定,因为这三个参数没有默认值(注:卡夫卡的制作人参数配置可以参考http://kafka.apache.org/documentation/),然后编写生产商发送消息的代码:

/* *   *卡夫卡发送消息测试   * @throws IOException   */公共空间sendMsg()抛出IOException {//1 .构造属性对象   属性=new属性();   FileInputStream FileInputStream=new FileInputStream (“F: \ \ javaCode \ \ jvmdemo主要\ \ src \ \ \ \ \ \ kafka-producer.properties"资源;);   properties.load (fileInputStream);   fileInputStream.close ();//2 .构造kafkaProducer对象   KafkaProducer生产商=new KafkaProducer(属性);   for (int i=0;我& lt;100;我+ +){//3 .构造待发送消息的producerRecord对象,并指定消息要发送到哪个话题,消息的键和值   ProducerRecord testTopic=new ProducerRecord (“testTopic", Integer.toString(我),Integer.toString(我));//4 .调用kafkaProducer对象的发送方法发送消息   producer.send (testTopic);   }//5 .关闭kafkaProducer   producer.close ();   }

然后登陆卡夫卡所在服务器,执行以下命令监听消息:

cd/usr/local/kafka/bin
。/kafka-console-consumer。sh——bootstrap-server 192.168.184.128:9092、192.168.184.128:9093 192.168.184.128:9094——主题testTopic从头

运行sendMsg方法,注意观察消费端,

卡夫卡生产商如何使用“> </p> <p>可以看到有0 - 99之间的数字依次被消费到,说明消息发送成功。</p> <p> <强>三、异步和同步发送消息</强> </p> <p>上面发送消息的示例程序中,没有对发送结果进行处理,如果消息发送失败我们也是无法得知的,这种方法在实际应用中是不推荐的。在实际使用场景中,一般使用异步和同步两种常见发送方式. java版本生产商的发送方法会返回一个未来对象,如果调用Future.get()方法就会无限等待返回结果,实现同步发送的效果,否则就是异步发送。</p> <p> 1。异步发送消息</p> <p> Java版本生产商的发送()方法提供了回调类参数来实现异步发送以及对发送结果进行的响应,具体代码如下:</p> <pre类=/* *   *异步发送消息   *   * @throws IOException   */公共空间sendMsg()抛出IOException {//1 .构造属性对象   属性=new属性();   FileInputStream FileInputStream=new FileInputStream (“F: \ \ javaCode \ \ jvmdemo主要\ \ src \ \ \ \ \ \ kafka-producer.properties"资源;);   properties.load (fileInputStream);   fileInputStream.close ();//2 .构造kafkaProducer对象   KafkaProducer生产商=new KafkaProducer(属性);   for (int i=0;我& lt;100;我+ +){//3 .构造待发送消息的producerRecord对象,并指定消息要发送到哪个话题,消息的键和值   ProducerRecord testTopic=new ProducerRecord (“testTopic", Integer.toString(我),Integer.toString(我));//4 .调用kafkaProducer对象的发送方法发送消息,传入回调函数回调参数   生产商。发送(testTopic,新的回调(){   @Override   公共空间>//4。调用kafkaProducer对象的发送方法发送消息   生产商。发送(testTopic,新的回调(){   @Override   公共空间>//发送消息   未来的未来=producer.send (testTopic);   尝试{//调用得到方法等待结果返回,发送失败则会抛出异常   future.get ();   }捕捉(异常e) {   System.out.println(“消息发送失败“);   }

卡夫卡生产国如何使用