怎么在春天引导中使用KafkaAdminClient集群管理工具?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
<强>原理介绍强>
在卡夫卡官网中这么描述AdminClient: AdminClient API支持管理和检查主题、经纪人、acl,卡夫卡和其他对象。具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准):
- <李>
创建主题:createTopics (Collection
删除主题:deleteTopics (Collection
罗列所有主题:listTopics ()
李> <李>查询主题:describeTopics (Collection
查询集群信息:describeCluster ()
李> <李>查询ACL信息:describeAcls (AclBindingFilter过滤器)
李> <李>创建ACL信息:createAcls (Collection
删除ACL信息:deleteAcls (Collection
查询配置信息:describeConfigs (Collection
修改配置信息:alterConfigs (Map
修改副本的日志目录:alterReplicaLogDirs (Map
查询节点的日志目录信息:describeLogDirs (Collection
查询副本的日志目录信息:describeReplicaLogDirs (Collection
增加分区:createPartitions (Map<字符串,NewPartitions>李newPartitions)
>其内部原理是使卡夫卡用自定义的一套二进制协议来实现,详细可以参见卡夫卡协议。主要实现步骤:
客户端根据方法的调用创建相应的协议请求,比如创建主题的createTopics方法,其内部就是发送CreateTopicRequest请求。
客户端发送请求至卡夫卡代理。
卡夫卡经纪人处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
客户端接收相应的回执并进行解析处理。
和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和回执类的两个基本父类。
代码如下
@ component public class  KafkaConfig {//,,,配置卡夫卡 public 才能;Properties  getProps () { ,,,Properties props =, new 属性(); ,,,props.put (“bootstrap.servers",,“localhost: 9092“);/*,,,,props.put (“retries",, 2),,//,重试次数 ,,,props.put (“batch.size",, 16384),,//,批量发送大小 ,,,props.put (“buffer.memory",, 33554432),,//,缓存大小,根据本机内存大小配置 ,,,props.put (“linger.ms",, 1000),,//,发送频率,满足任务一个条件发送*/,,,props.put (“key.serializer",,“org.apache.kafka.common.serialization.StringSerializer"); ,,,props.put (“value.serializer",,“org.apache.kafka.common.serialization.StringSerializer"); ,,,return 道具; ,,} }
@RestController public class  KafkaTopicManager { @ autowired才能 private 才能;KafkaConfig  kafkaConfig; @GetMapping才能(“createTopic") public 才能;void  createTopic () { ,,,AdminClient AdminClient =, KafkaAdminClient.create (kafkaConfig.getProps ()); ,,,NewTopic NewTopic =, new NewTopic (“test1" 4,(短),1); ,,,Collection, newTopicList =, new ArrayList<在(); ,,,newTopicList.add (newTopic); ,,,adminClient.createTopics (newTopicList); ,,,adminClient.close (); ,,} @GetMapping才能(“deleteTopic") public 才能;void  deleteTopic () { ,,,AdminClient AdminClient =, KafkaAdminClient.create (kafkaConfig.getProps ()); ,,,adminClient.deleteTopics (arrays . aslist (“test1")); ,,,adminClient.close (); ,,} @GetMapping才能(“listAllTopic") public 才能;void  listAllTopic () { ,,,AdminClient AdminClient =, KafkaAdminClient.create (kafkaConfig.getProps ()); ,,,ListTopicsResult result =, adminClient.listTopics (); ,,,KafkaFuture 祝辞,names =, result.names (); ,,,try { ,,,,,names.get () .forEach ((k)→{ ,,,,,,,System.out.println (k); ,,,,,}); ,,,},catch (InterruptedException |, ExecutionException e), { ,,,,,e.printStackTrace (); ,,,} ,,,adminClient.close (); ,,} @GetMapping才能(“getTopic") public 才能;void  getTopic () { null null null null null null null null null null null null null null null null 怎么在春天引导中使用KafkaAdminClient集群管理工具