怎么在春天引导中使用KafkaAdminClient集群管理工具

  介绍

怎么在春天引导中使用KafkaAdminClient集群管理工具?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

<强>原理介绍

在卡夫卡官网中这么描述AdminClient: AdminClient API支持管理和检查主题、经纪人、acl,卡夫卡和其他对象。具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准):

<李>

创建主题:createTopics (Collection李newTopics)

<李>

删除主题:deleteTopics (Collection李主题)

<李>

罗列所有主题:listTopics ()

<李>

查询主题:describeTopics (Collection李topicNames)

<李>

查询集群信息:describeCluster ()

<李>

查询ACL信息:describeAcls (AclBindingFilter过滤器)

<李>

创建ACL信息:createAcls (Collection李ACL)

<李>

删除ACL信息:deleteAcls (Collection李过滤器)

<李>

查询配置信息:describeConfigs (Collection李资源)

<李>

修改配置信息:alterConfigs (Map李配置)

<李>

修改副本的日志目录:alterReplicaLogDirs (Map李replicaAssignment)

<李>

查询节点的日志目录信息:describeLogDirs (Collection李代理)

<李>

查询副本的日志目录信息:describeReplicaLogDirs (Collection李副本)

<李>

增加分区:createPartitions (Map<字符串,NewPartitions>李newPartitions)

其内部原理是使卡夫卡用自定义的一套二进制协议来实现,详细可以参见卡夫卡协议。主要实现步骤:

客户端根据方法的调用创建相应的协议请求,比如创建主题的createTopics方法,其内部就是发送CreateTopicRequest请求。
客户端发送请求至卡夫卡代理。

卡夫卡经纪人处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
客户端接收相应的回执并进行解析处理。

和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和回执类的两个基本父类。

代码如下

@ component   public  class  KafkaConfig {//,,,配置卡夫卡   public 才能;Properties  getProps () {   ,,,Properties  props =, new 属性();   ,,,props.put (“bootstrap.servers",,“localhost: 9092“);/*,,,,props.put (“retries",, 2),,//,重试次数   ,,,props.put (“batch.size",, 16384),,//,批量发送大小   ,,,props.put (“buffer.memory",, 33554432),,//,缓存大小,根据本机内存大小配置   ,,,props.put (“linger.ms",, 1000),,//,发送频率,满足任务一个条件发送*/,,,props.put (“key.serializer",,“org.apache.kafka.common.serialization.StringSerializer");   ,,,props.put (“value.serializer",,“org.apache.kafka.common.serialization.StringSerializer");   ,,,return 道具;   ,,}      } @RestController   public  class  KafkaTopicManager  {      @ autowired才能   private 才能;KafkaConfig  kafkaConfig;      @GetMapping才能(“createTopic")   public 才能;void  createTopic () {   ,,,AdminClient  AdminClient =, KafkaAdminClient.create (kafkaConfig.getProps ());      ,,,NewTopic  NewTopic =, new  NewTopic (“test1" 4,(短),1);   ,,,Collection, newTopicList =, new  ArrayList<在();   ,,,newTopicList.add (newTopic);   ,,,adminClient.createTopics (newTopicList);      ,,,adminClient.close ();   ,,}   @GetMapping才能(“deleteTopic")   public 才能;void  deleteTopic () {   ,,,AdminClient  AdminClient =, KafkaAdminClient.create (kafkaConfig.getProps ());   ,,,adminClient.deleteTopics (arrays . aslist (“test1"));   ,,,adminClient.close ();   ,,}   @GetMapping才能(“listAllTopic")   public 才能;void  listAllTopic () {   ,,,AdminClient  AdminClient =, KafkaAdminClient.create (kafkaConfig.getProps ());   ,,,ListTopicsResult  result =, adminClient.listTopics ();   ,,,KafkaFuture祝辞,names =, result.names ();   ,,,try  {   ,,,,,names.get () .forEach ((k)→{   ,,,,,,,System.out.println (k);   ,,,,,});   ,,,},catch  (InterruptedException  |, ExecutionException  e), {   ,,,,,e.printStackTrace ();   ,,,}   ,,,adminClient.close ();   ,,}   @GetMapping才能(“getTopic")   public 才能;void  getTopic () {   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

怎么在春天引导中使用KafkaAdminClient集群管理工具