介绍
本篇内容介绍了”如何使用消息队列”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1概述
1.1基本概念
<编辑> 1.1.1经纪人代理编辑>已发布的消息保存在一组服务器中,称为卡夫卡集群。集群中的每个服务器都是一个经纪人。
<编辑> 1.1.2主题主题编辑>通过主题机制对消息进行分类,可以认为每个话题就是一个队列。
<编辑> 1.1.3分区分区编辑>每个主题可以有多个分区,主要为了提高并发而设计。相同主题下不同分区可以并发接收消息,同时也能供消费者并发拉取消息。有多少分区就有多少并发量。
在卡夫卡服务器上,分区是以文件目录的形式存在的。每个分区目录中,卡夫卡会按配置大小及配置周期将分区拆分成多个段文件(LogSegment),每个段由三部分组成:
安康;日志文件:* . log 安康;位移索引文件:* .index 安康;时间索引文件:* .timeindex
其中<代码> * . log> 代码用于存储消息本身的数据内容,<代码> * .index> 代码存储消息在文件中的位置(包括消息的逻辑抵消和物理存储抵消),<代码> * .timeindex> 代码存储消息创建时间和对应逻辑地址的映射关系。
将分区拆分成多个段是为了控制存储文件大小。可以很方便的通过操作系统<代码> mmap 代码>机制映射到内存中,提高写入和读取效率,同时还有一个好处就是,当系统要清除过期数据时,可以直接将过期的段文件删除。
如果每个消息都要在<代码>指数> 代码中保存位置信息,<代码>指数代码>文件自身大小也很容易变的很大。所以卡夫卡将指数<代码> 代码>设计为稀疏索引来减小指数<代码> 代码>文件的大小。
<编辑> 1.1.4复制副本编辑>消息冗余数量。不能超过集群中经纪人的数量。
1.2基本操作
<编辑> 1.2.1主题相关一直编辑> <>之前#,创建Topic #,——topic 主题名称,避免使用[_]及[]号 #,——replication-factor 副本数量(不能超过经纪人节点数) #,——partitions 分区数量(并发)/bin/kafka-topics.sh ——create \ ——topic UserDataQueue \ ——replication-factor 3 \ ——partitions 5 \ ——bootstrap-server localhost: 9092, localhost: 9093, localhost: 9094 #,查看主题/bin/kafka-topics.sh ——list \ ——bootstrap-server localhost: 9092, localhost: 9093, localhost: 9094 #,修改话题 #,删除主题 <编辑> 1.2.2消息相关一直编辑> <>之前#,发送消息 #,——topic 指定目标话题/bin/kafka-console-producer.sh \ ——topic UserDataQueue \ ——bootstrap-server localhost: 9092, localhost: 9093, localhost: 9094 #,拉取消息 #,——from-beginning 从头开始(获取现有的全量数据)/bin/kafka-console-consumer.sh \ ——topic UserDataQueue \ ——bootstrap-server localhost: 9092, localhost: 9093, localhost: 9094 \ ——从头2集群配置
卡夫卡集群依赖于管理员。
2.1管理员配置及启动
#,需要修改的参数 #,从而directory where 从而snapshot is 存储。 dataDir=/卡夫卡/zkdata #,从而port at  which 从而clients will 连接 clientPort=2182
#,启动/bin/zookeeper-server-start.sh -daemon /卡夫卡zookeeper.properties
2.2卡夫卡配置及启动
#,需修改参数 #,从而id of ,代理只却;能够must be set 用a unique integer for each 代理。 broker.id=1,, #,同一集群内ID必须唯一 #,从而address 从而socket server listens 又是;It will get 从而value returned 得到; #,java.net.InetAddress.getCanonicalHostName (), if not 配置。 #,,,格式: #,,,,,listeners =, listener_name://host_name:端口 #,,,例如: #,,,,,listeners =,明文://your.host.name: 9092 听众=明文://localhost: 9092,, #,同一主机的话,端口号不能相同 #,A comma  separated list of directories under which 用store log 文件 log.dirs=/卡夫卡/data01 , #,日志存储目录,需做隔离 #,Zookeeper connection  string 阅读;Zookeeper  docs for 细节)。 #,却;能够is a  comma separated 主持人:port 对,,each corresponding 用a zk #,服务器只如只“127.0.0.1:3000,127.0.0.1:3001 127.0.0.1:3002"。 #,你还要also append an optional chroot string 用,urls 用specify #,root directory  for all kafka znodes。 zookeeper.connect=localhost: 2182,, #,饲养员连接地址,参见2.1,zk配置如何使用消息队列