介绍
2。创建演示项目引入spring-kafka
2.4定义KafkaSender
2.5定义KafkaConsumer
3。测试效果
本篇文章给大家分享的是有关如何在SpringBoot项目中使用卡夫卡,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
搭建卡夫卡和饲养员环境并且启动它们
2。创建演示项目引入spring-kafka
2.1 pom文件
& lt; dependency> & lt;才能groupId> org.springframework.boot</groupId> & lt;才能artifactId> spring-boot-starter-web</artifactId> & lt;/dependency> & lt; dependency> & lt;才能groupId> org.springframework.kafka</groupId> & lt;才能artifactId> spring-kafka</artifactId> & lt;/dependency> & lt; dependency> & lt;才能groupId> com.google.code.gson</groupId> & lt;才能artifactId> gson</artifactId> & lt;/dependency>
2.2配置应用程序。yml
春: ,卡夫卡: ,,bootstrap-servers: 192.168.25.6:9092 # bootstrap-servers:连接卡夫卡的地址,多个地址用逗号分隔 消费者才能: ,才能组id:组,myGroup ,,enable-auto-commit:真实 ,,auto-commit-interval: 100 ms ,,属性: ,,,session.timeout.ms: 15000 ,,key-deserializer: org.apache.kafka.common.serialization.StringDeserializer ,,value-deserializer: org.apache.kafka.common.serialization.StringDeserializer ,,auto-offset-reset:最早 制作人:才能 ,,,重试:0,#若设置大于0的值,客户端会将发送失败的记录重新发送 ,,,批量大小:16384,#当将多个记录被发送到同一个分区时,,Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)0是缺省的配置 ,,,缓冲区内存:33554432,# Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置 ,,,key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类 ,,value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类
2.3定义消息体消息
/* * ,* @author ,约翰尼 ,* @create 2020 - 09 - 23,上午21 * */@ data public class  Message { private 才能Long id; private 才能String 味精; private 才能;Date  sendTime; }
2.4定义KafkaSender
主要利用KafkaTemplate来发送消息,将消息封装成消息并且进行转化成Json串发送到卡夫卡中
@ component @Slf4j public class  KafkaSender { private 才能;final  KafkaTemplate<字符串,,String>, kafkaTemplate;//才能构造器方式注入,kafkaTemplate public 才能;KafkaSender(字符串,KafkaTemplate, kafkaTemplate), { ,,,this.kafkaTemplate =, kafkaTemplate; ,,} private 才能Gson Gson =, new GsonBuilder()共创(); public 才能;void 发送(String 味精),{ ,,,Message Message =, new 消息(); ,,,message.setId (System.currentTimeMillis ()); ,,,message.setMsg(味精); ,,,message.setSendTime (new 日期()); ,,,log.info (“【+ + + + + + + + + + + + + + + + + +, message : {}】,,, gson.toJson(消息); ,,,//对,topic =, hello2 的发送消息 ,,,kafkaTemplate.send (“hello2", gson.toJson(消息); ,,} }
2.5定义KafkaConsumer
在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的话题
卡夫卡的消息再接收端会被封装成ConsumerRecord对象返回,它内部的价值属性就是实际的消息。
@ component @Slf4j public class  KafkaConsumer { @KafkaListener才能(topics =, {“hello2"}) public 才能;void 听(ConsumerRecord<?, ?在,记录),{ ,,,Optional.ofNullable (record.value ()) ,,,,,,,.ifPresent (message →, { ,,,,,,,,,log.info (“【+ + + + + + + + + + + + + + + + +, record =,{},】,,,记录); ,,,,,,,,,log.info (“【+ + + + + + + + + + + + + + + + +, message =,{}】,,,消息); ,,,,,,,}); ,,} }
3。测试效果
提供一个Http接口调用KafkaSender去发送消息
3.1提供Http测试接口
@RestController @Slf4j public class  TestController { @ autowired才能 private 才能;KafkaSender  kafkaSender; @GetMapping才能(“sendMessage/{味精}“) public 才能;void  sendMessage (@PathVariable (“msg"), String 味精){ ,,,kafkaSender.send(味精); ,,} null如何在SpringBoot项目中使用卡夫卡