使用golang怎么实现一个分布式延时队列服务?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
<强>名词解释强>
topic_list队列:每一个来的延时请求都应该又一个延时主题参考卡夫卡,在逻辑上划分出一个队列出来每个业务分开处理;
topic_info队列:每一个队列主题都存在一个新的队列里,每次扫描主题信息检测新主题的建立与销毁管理服务协程数量;
抵消:当前消费的进度;
new_offset:新消费的进度,预备更迭抵消;
topic_offset_lock:分布式锁。
<强>二、设计目标强>
<强>,功能清单强>
1,延时信息添加接口基于http调用
2,拥有存储队列特性,可保存近3天内的队列消费数据
3,提供消费功能
4,延时通知
<>强性能指标强>
预计接口的调用量:单秒单类任务数3500多秒单类任务数1300
压测结果:
<强>简单压测强>
wrk写入每秒:259.3秒写入9000条记录单线程无并发
触发性能/准确率:单秒1000,在测试机无延长。单3000秒时,偶尔出现1 - 2秒延迟。受内存和cpu的影响。
<强>三,系统设计强>
<强>交互流程强>
时序图
本设计基于http接口调用,当向主题存在的队列中添加消息的时候,消息会被添加到相应的主题队列的末尾储存,当添加到不存在的相应的主题队列时,首先建立队新主题列,当定时器触发的时候或者分布式锁,抢到锁的实例先获得相应队列的抵消,设置新偏移量,就可以释放锁了让给其他实例争抢,弹出队列头一定数量元素,然后拿到抵消段的实例去存储中拿详细信息,在协程中处理,主要协程等待下次触发。然后添加协程去监控触发。
<强>模块划分强>
1,队列存储模块
1·延迟下的delay.base模块,主要负责接收写请求,将队列信息写入存储,不负责后端逻辑,调用存储模块
2,后端模块.delay下的延迟。后端模块,负责时间触发扫描对应的主题队列,调用存储模块,主要负责访问读取存储模块,调用回调模块
1·扫描主题添加groutine
2·扫描topic_list消费信息
3·扫描topic_list如果一定时间没有消费到则关闭groutine
3,调模块,主要负责发送已经到时间的数据,向相应服务通知
3,存储模块
1·分布式锁模块,系统多机部署,保证每次消费的唯一性,对每次主题消费的抵消段进行上锁抵消到new_offset段单机独享
2·主题管理列表、管理主题数量控制协程数
3·topic_list消息队列
4·topic_info消息实体,可能需要回调中会携带一些信息统一处理
4,唯一号生成模块。
<强>五、缓存设计强>
目前使用全缓存模式
键设计:
主题管理列出关键:XX: DELAY_TOPIC_LIST类型:列表
topic_list关键:XX: DELAY_SIMPLE_TOPIC_TASK - % s(根据主题分键)类型:zset
topic_info关键:XX: DELAY_REALL_TOPIC_TASK - % s(根据主题分键)类型:哈希
topic_offset关键:XX: DELAY_TOPIC_OFFSET - % s(根据主题分键)类型:字符串
topic_lock关键:XX: DELAY_TOPIC_RELOAD_LOCK - % s(根据主题分键)类型:字符串
<强>六,接口设计强>
<强> delay.task。addv1(延时队列添加v1) 强>
请求示例
curl -d & # 39;{ “topic"才能:,“xxx",,//,业务的话题 “timing_moment"才能:,,,,,,//,单位秒,要定时时刻 “content"才能:,“{}”;//,消息体,json串 }& # 39; & # 39;http://127.0.0.1: xxxx/延期/任务/添加# 39;
返回示例
{ “dm_error"才能:,0, “error_msg"才能:,“操作成功”, “task_id"才能:112345465765 }
<>强拉回调方式返回(v2不再支持)强>
请求示例
curl -d & # 39;{ “topic"才能:,“xxxx",,//,业务的话题 “task_id"才能:1324568798765//,taskid,选填,有则返回特定消息 }& # 39; & # 39;http://127.0.0.1: xxxx/延期/任务/拉# 39;
返回示例
{ “dm_error"才能:,0, “error_msg"才能:,“操作成功”; “content"才能:“{“\ xxx" \}“; }
<强> delay.task。addv2(延时队列添加v2) 强>
请求示例