如何使用消息队列

  介绍

本篇内容介绍了”如何使用消息队列”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1概述

1.1基本概念
<编辑> 1.1.1经纪人代理

已发布的消息保存在一组服务器中,称为卡夫卡集群。集群中的每个服务器都是一个经纪人。

<编辑> 1.1.2主题主题

通过主题机制对消息进行分类,可以认为每个话题就是一个队列。

<编辑> 1.1.3分区分区

每个主题可以有多个分区,主要为了提高并发而设计。相同主题下不同分区可以并发接收消息,同时也能供消费者并发拉取消息。有多少分区就有多少并发量。

在卡夫卡服务器上,分区是以文件目录的形式存在的。每个分区目录中,卡夫卡会按配置大小及配置周期将分区拆分成多个段文件(LogSegment),每个段由三部分组成:

安康;日志文件:* . log
  安康;位移索引文件:* .index
  安康;时间索引文件:* .timeindex 

其中<代码> * . log> * .index> * .timeindex>

将分区拆分成多个段是为了控制存储文件大小。可以很方便的通过操作系统<代码> mmap 机制映射到内存中,提高写入和读取效率,同时还有一个好处就是,当系统要清除过期数据时,可以直接将过期的段文件删除。

如果每个消息都要在<代码>指数> 指数文件自身大小也很容易变的很大。所以卡夫卡将指数<代码> 设计为稀疏索引来减小指数<代码> 文件的大小。

<编辑> 1.1.4复制副本

消息冗余数量。不能超过集群中经纪人的数量。

1.2基本操作
<编辑> 1.2.1主题相关一直编辑 <>之前#,创建Topic    #,——topic 主题名称,避免使用[_]及[]号   #,——replication-factor 副本数量(不能超过经纪人节点数)   #,——partitions 分区数量(并发)/bin/kafka-topics.sh ——create  \   ——topic  UserDataQueue  \   ——replication-factor  3 \   ——partitions  5 \   ——bootstrap-server  localhost: 9092, localhost: 9093, localhost: 9094      #,查看主题/bin/kafka-topics.sh ——list  \   ——bootstrap-server  localhost: 9092, localhost: 9093, localhost: 9094      #,修改话题   #,删除主题 <编辑> 1.2.2消息相关一直编辑 <>之前#,发送消息   #,——topic 指定目标话题/bin/kafka-console-producer.sh  \   ——topic  UserDataQueue  \   ——bootstrap-server  localhost: 9092, localhost: 9093, localhost: 9094      #,拉取消息   #,——from-beginning 从头开始(获取现有的全量数据)/bin/kafka-console-consumer.sh  \   ——topic  UserDataQueue  \   ——bootstrap-server  localhost: 9092, localhost: 9093, localhost: 9094 \   ——从头

2集群配置

卡夫卡集群依赖于管理员。

2.1管理员配置及启动
 #,需要修改的参数
  
  #,从而directory  where 从而snapshot  is 存储。
  dataDir=/卡夫卡/zkdata
  #,从而port  at  which 从而clients  will 连接
  clientPort=2182 
 #,启动/bin/zookeeper-server-start.sh  -daemon /卡夫卡zookeeper.properties 
2.2卡夫卡配置及启动
 #,需修改参数
  
  #,从而id  of ,代理只却;能够must  be  set 用a  unique  integer  for  each 代理。
  broker.id=1,, #,同一集群内ID必须唯一
  
  #,从而address 从而socket  server  listens 又是;It  will  get 从而value  returned 得到;
  #,java.net.InetAddress.getCanonicalHostName (), if  not 配置。
  #,,,格式:
  #,,,,,listeners =, listener_name://host_name:端口
  #,,,例如:
  #,,,,,listeners =,明文://your.host.name: 9092
  听众=明文://localhost: 9092,, #,同一主机的话,端口号不能相同
  
  #,A  comma  separated  list  of  directories  under  which 用store  log 文件
  log.dirs=/卡夫卡/data01 , #,日志存储目录,需做隔离
  
  #,Zookeeper  connection  string 阅读;Zookeeper  docs  for 细节)。
  #,却;能够is  a  comma  separated 主持人:port 对,,each  corresponding 用a  zk
  #,服务器只如只“127.0.0.1:3000,127.0.0.1:3001 127.0.0.1:3002"。
  #,你还要also  append  an  optional  chroot  string 用,urls 用specify 
  #,root  directory  for  all  kafka  znodes。
  zookeeper.connect=localhost: 2182,, #,饲养员连接地址,参见2.1,zk配置

如何使用消息队列