介绍
今天就跟大家聊聊有关Java中的卡夫卡怎么利用API进行调用,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
1。客户端创建对应协议的请求
2。客户端发送请求给对应的经纪人
3。经纪人处理请求,并发送响应给客户端
虽然卡夫卡提供的大量的脚本工具用于各种功能的实现,但很多时候我们还是希望可以把某些功能以编程的方式嵌入到另一个系统中。这时使用Java API的方式就显得异常地灵活了。本文我将尝试给出Java API底层框架的一个范例,同时也会针对“创建主题”和“查看位移”这两个主要功能给出对应的例子。需要提前说明的是,本文给出的范例并没有考虑卡夫卡集群开启安全的情况。另外卡夫卡的KIP4应该一直在优化命令行工具以及各种管理操作,有兴趣的读者可以关注这个客栈。
本文中用到API的依赖于kafka-clients,所以如果你使用Maven构建的话,请加上:
& lt; dependency> & lt; groupId> org.apache.kafka & lt; artifactId> kafka-clients & lt; version> 0.10.2.0 & lt;/dependency>
如果是gradle,请加上:
编译组:& # 39;org.apache.kafka& # 39;,名字:& # 39;kafka-clients& # 39;,版本:& # 39;0.10.2.0& # 39;
<强>底层框架强>
/* * *发送请求主方法 * @param主机目标经纪人的主机名 * @param港口目标经纪人的端口 * @param请求请求对象 * @param apiKey请求类型 * @return序列化后的反应 * @throws IOException */公共ByteBuffer发送(String主机,int港口,AbstractRequest请求,apiKey apiKey)抛出IOException { Socket套接字=(主机、端口)连接; 尝试{ 返回发送(请求、apiKey插座); 最后}{ socket.close (); } }/* * *发送序列化请求并等待响应返回 * @param插座连向目标代理的套接字 * @param请求序列化后的请求 * @return序列化后的反应 * @throws IOException */私人byte [] issueRequestAndWaitForResponse (Socket套接字,byte[]请求)抛出IOException { sendRequest(插座、请求); 返回getResponse(插座); }/* * *发送序列化请求给插座 * @param插座连向目标代理的套接字 * @param请求序列化后的请求 * @throws IOException */私人空间sendRequest (Socket套接字,byte[]请求)抛出IOException { DataOutputStream dos=new DataOutputStream (socket.getOutputStream ()); dos.writeInt (request.length); dos.write(请求); dos.flush (); }/* * *从给定插座处获取响应 * @param插座连向目标代理的套接字 * @return获取到的序列化后的反应 * @throws IOException */私人byte [] getResponse (Socket套接字)抛出IOException { DataInputStream说=零; 尝试{ 说=new DataInputStream (socket.getInputStream ()); byte[]反应=new byte [dis.readInt ()); dis.readFully(响应); 返回响应; 最后}{ 如果(dis !=null) { dis.close (); } } }/* * *创建插座连接 * @param主机名目标代理主机名 * @param港口目标代理服务端口,比如9092 * @return创建的套接字连接 * @throws IOException */私人的套接字连接(主机名的字符串,int端口)抛出IOException { 返回新的套接字(主机名、端口); }/* * *向给定套接字发送请求 * @param请求请求对象 * @param apiKey请求类型,即属于哪种请求 * @param插座连向目标代理的套接字 * @return序列化后的反应 * @throws IOException */私人ByteBuffer发送(AbstractRequest请求、apiKey apiKey Socket套接字)抛出IOException { RequestHeader头=new RequestHeader (apiKey。id, request.version (),“client-id", 0); ByteBuffer缓冲=ByteBuffer.allocate (header.sizeOf () + request.sizeOf ()); header.writeTo(缓冲); request.writeTo(缓冲); byte [] serializedRequest=buffer.array (); byte []=issueRequestAndWaitForResponse反应(套接字,serializedRequest); ByteBuffer responseBuffer=ByteBuffer.wrap(响应); ResponseHeader.parse (responseBuffer); 返回responseBuffer; }
有了这些方法的铺垫,我们就可以创建具体的请求了。
<强>创建主题强>
/* * *创建主题 *由于只是样例代码,有些东西就硬编码写到程序里面了(比如主机名和端口),各位看官自行修改即可 * @param topicName主题名 * @param分区分区数 * @param replicationFactor副本数 * @throws IOException */topicName公共空createTopics(字符串,整数分区,短replicationFactor)抛出IOException { CreateTopicsRequest.TopicDetails> MapJava中的卡夫卡怎么利用API进行调用