卡夫卡生产国如何使用?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
<强>一、生产工作流程强>
生产商使用用户启动生产商的线程,将待发送的消息封装到一个ProducerRecord类实例,然后将其序列化之后发送给分割者,再由后者确定目标分区后一同发送到位于生产者程序中的一块内存缓冲区中,而生产者的另外一个线程(发送方线程)则负责实时从该缓冲区中提取出准备就绪的消息封装进一个批次(批),统一发送给对应的经纪人,具体流程如下图:
& lt; !——卡夫卡——比; & lt; dependency> & lt; groupId> org.apache.kafka & lt; artifactId> kafka_2.12 & lt; version> 2.2.0 & lt;/dependency>
在资源下面创建kafka-producer。属性配置文件,用于设置卡夫卡参数,内容如下:
bootstrap.servers=192.168.184.128:9092 192.168.184.128:9093 192.168.184.128:9094 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer ack=1 重试=3 batch.size=323840 linger.ms=10 buffer.memory=33554432 max.block。女士=3000
其中,前三个参数必须明确指定,因为这三个参数没有默认值(注:卡夫卡的制作人参数配置可以参考http://kafka.apache.org/documentation/),然后编写生产商发送消息的代码:
/* * *卡夫卡发送消息测试 * @throws IOException */公共空间sendMsg()抛出IOException {//1 .构造属性对象 属性=new属性(); FileInputStream FileInputStream=new FileInputStream (“F: \ \ javaCode \ \ jvmdemo主要\ \ src \ \ \ \ \ \ kafka-producer.properties"资源;); properties.load (fileInputStream); fileInputStream.close ();//2 .构造kafkaProducer对象 KafkaProducer生产商=new KafkaProducer(属性); for (int i=0;我& lt;100;我+ +){//3 .构造待发送消息的producerRecord对象,并指定消息要发送到哪个话题,消息的键和值 ProducerRecord testTopic=new ProducerRecord (“testTopic", Integer.toString(我),Integer.toString(我));//4 .调用kafkaProducer对象的发送方法发送消息 producer.send (testTopic); }//5 .关闭kafkaProducer producer.close (); }
然后登陆卡夫卡所在服务器,执行以下命令监听消息:
cd/usr/local/kafka/bin
引用>
。/kafka-console-consumer。sh——bootstrap-server 192.168.184.128:9092、192.168.184.128:9093 192.168.184.128:9094——主题testTopic从头运行sendMsg方法,注意观察消费端,
/* * *异步发送消息 * * @throws IOException */公共空间sendMsg()抛出IOException {//1 .构造属性对象 属性=new属性(); FileInputStream FileInputStream=new FileInputStream (“F: \ \ javaCode \ \ jvmdemo主要\ \ src \ \ \ \ \ \ kafka-producer.properties"资源;); properties.load (fileInputStream); fileInputStream.close ();//2 .构造kafkaProducer对象 KafkaProducer生产商=new KafkaProducer(属性); for (int i=0;我& lt;100;我+ +){//3 .构造待发送消息的producerRecord对象,并指定消息要发送到哪个话题,消息的键和值 ProducerRecord testTopic=new ProducerRecord (“testTopic", Integer.toString(我),Integer.toString(我));//4 .调用kafkaProducer对象的发送方法发送消息,传入回调函数回调参数 生产商。发送(testTopic,新的回调(){ @Override 公共空间>//4。调用kafkaProducer对象的发送方法发送消息 生产商。发送(testTopic,新的回调(){ @Override 公共空间>//发送消息 未来的未来=producer.send (testTopic); 尝试{//调用得到方法等待结果返回,发送失败则会抛出异常 future.get (); }捕捉(异常e) { System.out.println(“消息发送失败“); }
卡夫卡生产国如何使用