如何在SpringBoot项目中使用卡夫卡

  介绍

本篇文章给大家分享的是有关如何在SpringBoot项目中使用卡夫卡,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。


搭建卡夫卡和饲养员环境并且启动它们

2。创建演示项目引入spring-kafka

2.1 pom文件

& lt; dependency>   & lt;才能groupId> org.springframework.boot</groupId>   & lt;才能artifactId> spring-boot-starter-web</artifactId>   & lt;/dependency>   & lt; dependency>   & lt;才能groupId> org.springframework.kafka</groupId>   & lt;才能artifactId> spring-kafka</artifactId>   & lt;/dependency>      & lt; dependency>   & lt;才能groupId> com.google.code.gson</groupId>   & lt;才能artifactId> gson</artifactId>   & lt;/dependency>

2.2配置应用程序。yml

春:   ,卡夫卡:   ,,bootstrap-servers: 192.168.25.6:9092  # bootstrap-servers:连接卡夫卡的地址,多个地址用逗号分隔   消费者才能:   ,才能组id:组,myGroup   ,,enable-auto-commit:真实   ,,auto-commit-interval: 100 ms   ,,属性:   ,,,session.timeout.ms: 15000   ,,key-deserializer: org.apache.kafka.common.serialization.StringDeserializer   ,,value-deserializer: org.apache.kafka.common.serialization.StringDeserializer   ,,auto-offset-reset:最早   制作人:才能   ,,,重试:0,#若设置大于0的值,客户端会将发送失败的记录重新发送   ,,,批量大小:16384,#当将多个记录被发送到同一个分区时,,Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)0是缺省的配置   ,,,缓冲区内存:33554432,# Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置   ,,,key-serializer: org.apache.kafka.common.serialization.StringSerializer  #关键字的序列化类   ,,value-serializer: org.apache.kafka.common.serialization.StringSerializer  #值的序列化类

2.3定义消息体消息

/* *   ,* @author ,约翰尼   ,* @create  2020 - 09 - 23,上午21   * */@ data   public  class  Message  {         private 才能Long  id;      private 才能String 味精;      private 才能;Date  sendTime;   }

2.4定义KafkaSender

主要利用KafkaTemplate来发送消息,将消息封装成消息并且进行转化成Json串发送到卡夫卡中

@ component   @Slf4j   public  class  KafkaSender  {      private 才能;final  KafkaTemplate<字符串,,String>, kafkaTemplate;//才能构造器方式注入,kafkaTemplate   public 才能;KafkaSender(字符串,KafkaTemplate, kafkaTemplate), {   ,,,this.kafkaTemplate =, kafkaTemplate;   ,,}      private 才能Gson  Gson =, new  GsonBuilder()共创();      public 才能;void 发送(String 味精),{   ,,,Message  Message =, new 消息();      ,,,message.setId (System.currentTimeMillis ());   ,,,message.setMsg(味精);   ,,,message.setSendTime (new 日期());   ,,,log.info (“【+ + + + + + + + + + + + + + + + + +, message : {}】,,, gson.toJson(消息);   ,,,//对,topic =, hello2 的发送消息   ,,,kafkaTemplate.send (“hello2", gson.toJson(消息);   ,,}      }

2.5定义KafkaConsumer

在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的话题
卡夫卡的消息再接收端会被封装成ConsumerRecord对象返回,它内部的价值属性就是实际的消息。

@ component   @Slf4j   public  class  KafkaConsumer  {         @KafkaListener才能(topics =, {“hello2"})   public 才能;void 听(ConsumerRecord<?, ?在,记录),{      ,,,Optional.ofNullable (record.value ())   ,,,,,,,.ifPresent (message →, {   ,,,,,,,,,log.info (“【+ + + + + + + + + + + + + + + + +, record =,{},】,,,记录);   ,,,,,,,,,log.info (“【+ + + + + + + + + + + + + + + + +, message =,{}】,,,消息);   ,,,,,,,});   ,,}      }

3。测试效果

提供一个Http接口调用KafkaSender去发送消息

3.1提供Http测试接口

@RestController   @Slf4j   public  class  TestController  {         @ autowired才能   private 才能;KafkaSender  kafkaSender;         @GetMapping才能(“sendMessage/{味精}“)   public 才能;void  sendMessage (@PathVariable (“msg"), String 味精){   ,,,kafkaSender.send(味精);   ,,}   null

如何在SpringBoot项目中使用卡夫卡