使用golang怎么实现一个分布式延时队列服务

  介绍

使用golang怎么实现一个分布式延时队列服务?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

<强>名词解释

topic_list队列:每一个来的延时请求都应该又一个延时主题参考卡夫卡,在逻辑上划分出一个队列出来每个业务分开处理;

topic_info队列:每一个队列主题都存在一个新的队列里,每次扫描主题信息检测新主题的建立与销毁管理服务协程数量;

抵消:当前消费的进度;

new_offset:新消费的进度,预备更迭抵消;

topic_offset_lock:分布式锁。

<强>二、设计目标

<强>,功能清单

1,延时信息添加接口基于http调用

2,拥有存储队列特性,可保存近3天内的队列消费数据

3,提供消费功能

4,延时通知

<>强性能指标

预计接口的调用量:单秒单类任务数3500多秒单类任务数1300

压测结果:

<强>简单压测

wrk写入每秒:259.3秒写入9000条记录单线程无并发

触发性能/准确率:单秒1000,在测试机无延长。单3000秒时,偶尔出现1 - 2秒延迟。受内存和cpu的影响。

<强>三,系统设计

<强>交互流程

时序图

使用golang怎么实现一个分布式延时队列服务

本设计基于http接口调用,当向主题存在的队列中添加消息的时候,消息会被添加到相应的主题队列的末尾储存,当添加到不存在的相应的主题队列时,首先建立队新主题列,当定时器触发的时候或者分布式锁,抢到锁的实例先获得相应队列的抵消,设置新偏移量,就可以释放锁了让给其他实例争抢,弹出队列头一定数量元素,然后拿到抵消段的实例去存储中拿详细信息,在协程中处理,主要协程等待下次触发。然后添加协程去监控触发。

<强>模块划分

1,队列存储模块

1·延迟下的delay.base模块,主要负责接收写请求,将队列信息写入存储,不负责后端逻辑,调用存储模块

2,后端模块.delay下的延迟。后端模块,负责时间触发扫描对应的主题队列,调用存储模块,主要负责访问读取存储模块,调用回调模块

1·扫描主题添加groutine

2·扫描topic_list消费信息

3·扫描topic_list如果一定时间没有消费到则关闭groutine

3,调模块,主要负责发送已经到时间的数据,向相应服务通知

3,存储模块

1·分布式锁模块,系统多机部署,保证每次消费的唯一性,对每次主题消费的抵消段进行上锁抵消到new_offset段单机独享

2·主题管理列表、管理主题数量控制协程数

3·topic_list消息队列

4·topic_info消息实体,可能需要回调中会携带一些信息统一处理

4,唯一号生成模块。

<强>五、缓存设计

目前使用全缓存模式

键设计:

主题管理列出关键:XX: DELAY_TOPIC_LIST类型:列表

topic_list关键:XX: DELAY_SIMPLE_TOPIC_TASK - % s(根据主题分键)类型:zset

topic_info关键:XX: DELAY_REALL_TOPIC_TASK - % s(根据主题分键)类型:哈希

topic_offset关键:XX: DELAY_TOPIC_OFFSET - % s(根据主题分键)类型:字符串

topic_lock关键:XX: DELAY_TOPIC_RELOAD_LOCK - % s(根据主题分键)类型:字符串

<强>六,接口设计

<强> delay.task。addv1(延时队列添加v1)

请求示例

curl  -d    & # 39;{   “topic"才能:,“xxx",,//,业务的话题   “timing_moment"才能:,,,,,,//,单位秒,要定时时刻   “content"才能:,“{}”;//,消息体,json串   }& # 39;   & # 39;http://127.0.0.1: xxxx/延期/任务/添加# 39;

返回示例

{   “dm_error"才能:,0,   “error_msg"才能:,“操作成功”,   “task_id"才能:112345465765   }

<>强拉回调方式返回(v2不再支持)

请求示例

curl  -d    & # 39;{   “topic"才能:,“xxxx",,//,业务的话题   “task_id"才能:1324568798765//,taskid,选填,有则返回特定消息   }& # 39;   & # 39;http://127.0.0.1: xxxx/延期/任务/拉# 39;

返回示例

{   “dm_error"才能:,0,   “error_msg"才能:,“操作成功”;   “content"才能:“{“\ xxx" \}“;   }

<强> delay.task。addv2(延时队列添加v2)

请求示例

使用golang怎么实现一个分布式延时队列服务