怎么在golang中实现一个复述,延时消息队列功能

  介绍

怎么在golang中实现一个复述,延时消息队列功能吗?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

<强>提前准备安装复述,redis-go

因为用的是macOS,直接

,美元brew  install 复述   之前,美元go  get github.com/garyburd/redigo/redis

又因为比较懒,生成任务的唯一id时,直接采用了bson中的objectId,所以:

,美元go  get  gopkg.in/分别。v2/bson

唯一id不是必须有,但如果之后有实际应用需要携带,便于查找相应任务。

<强>生产者

通过一个为循环生成10 w个任务,每一个任务有不同的时间

func 生产者(),{   ,count :=0   ,//生成100000个任务   ,for  count  & lt; 100000, {   数+ +   ,dealTime :=, int64 (rand.Intn (5)), +, time.Now () .Unix ()   ,uuid :=, bson.NewObjectId () .Hex ()   ,redis.Client.AddJob(及job.JobMessage {   ,Id: uuid、   ,DealTime: DealTime,   ,},+,int64 (dealTime))   ,}   }

其中AddJob函数在另一个包中,将上一个函数中随机生成的时间作为需要处理的时间戳。

//,添加任务
  func  (client  * RedisClient), AddJob (msg  * job.JobMessage, dealTime  int64), {
  ,conn :=, client.Get ()
  ,defer  conn.Close ()
  
  ,key :=,“JOB_MESSAGE_QUEUE"
  ,conn.Do (“zadd",钥匙,,dealTime,, util.JsonEncode(味精))
  }

<>强消费者

消费者处理流程分为两个步骤:

<李>

获取小于等于当前时间戳的任务

<李>

通过删除当前任务来判断谁获得了当前任务

因为在获取小于等于当前时间戳的任务时,可能有多个走常规同时读到了当前任务,而只有一个任务可以来处理当前任务。因此我们需要通过一个方案来判断究竟由谁来处理这个任务(当然如果只有一个消费者可以读到就直接处理):这个时候可以通过复述的删除操作来获取,因为删除指定值时只有成功的操作才会返回不为0,所以我们可以认为删除当前队列成功的那个常规拿去到了当前的任务。

下面是代码:

//,消费者   func 消费者(),{   ,//启动10个go 常规一起去拿   ,count :=0   ,for  count  & lt; 10, {   ,go  func (), {   ,for  {   ,jobs :=, redis.Client.GetJob ()   ,if  len(工作),& lt;=, 0, {   time . sleep才能(time.Second  *, 1)   ,继续   ,}   ,currentJob :[0]=,工作   ,//如果当前抢复述队列成功,   ,if  redis.Client.DelJob (currentJob),祝辞,0,{   var 才能;jobMessage  job.JobMessage   util.JsonDecode才能(currentJob,,, jobMessage),//自定义的json解析函数   handleMessage才能(及jobMessage)   ,}      ,}      ,}()   数+ +   ,}   }//,处理任务用函数   func  handleMessage (msg  * job.JobMessage), {   ,fmt.Printf (“deal 工作:% s,, require 时间:,% d  \ n",, msg.Id,, msg.DealTime)   ,go  func (), {   ,countChan  & lt;作用;真实的   ,}()   }

复述,部分的代码,获取任务和删除任务

//,获取任务   func  (client  * RedisClient), GetJob (), [] string  {   ,conn :=, client.Get ()   ,defer  conn.Close ()      ,key :=,“JOB_MESSAGE_QUEUE"   ,timeNow :=, time.Now () .Unix ()   ,受潮湿腐烂,err :=, redis.Strings (conn.Do (“zrangebyscore",,钥匙,,0,,timeNow,,“limit",, 0, 1))   ,if  err  !=, nil  {   ,恐慌(err)   ,}   return 受潮湿腐烂   }//,删除当前任务,,用来判断是否抢到了当前任务   func  (client  * RedisClient), DelJob (value 字符串),int  {   ,conn :=, client.Get ()   ,defer  conn.Close ()      ,key :=,“JOB_MESSAGE_QUEUE"   ,受潮湿腐烂,err :=, redis.Int (conn.Do (“zrem",,钥匙,,值))   ,if  err  !=, nil  {   ,恐慌(err)   ,}   return 受潮湿腐烂   }

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

怎么在golang中实现一个复述,延时消息队列功能