最近在用golang做一些复述,相关的操作,选用了redigo这个第三方库,然后在使用Pub/Sub的时候,却发现了一个小坑……
首先,我们来初始化一个带连接池的复述,客户:
导入( "github.com/gomodule/redigo/redis” ) RedisClient struct类型{ 池* redis.Pool } func NewRedisClient (addr字符串,db int, passwd字符串)* RedisClient { 池:=,redis.Pool { MaxIdle: 10, IdleTimeout: 300 * time.Second, 表盘:func()(复述。康涅狄格州,错误){ c,犯错:=复述。刻度盘(tcp、addr redis.DialPassword (passwd), redis.DialDatabase (db)) 如果犯错!=nil { 返回nil,犯错 } c,返回零 }, TestOnBorrow: func (c复述。康涅狄格州,t time.Time)错误{ 如果time.Since (t) & lt;时间。分钟{ 返回nil } _,犯错:=c.Do(“平”) 返回错 }, } 日志。Printf(“新复述,在% s池”,addr) 客户:=,RedisClient { 池:池, } 返回客户端 }
然后我们可以简单的实现一个发布方法:
func (r * RedisClient)发布(通道、消息字符串)(int,错误){ c:=r.pool.Get () 推迟c.Close () n,犯错:=redis.Int (c。(“发布”,频道消息)) 如果犯错!=nil { 返回0,fmt。Errorf(“复述,发布% s % s,犯错:% v”,频道信息,犯错) } 返回n,零 }
接下来就是一个稍微复杂点的带有心跳的订阅方法:
func (r * RedisClient)订阅(ctx上下文。背景下,消费ConsumeFunc、通道……错误字符串){ psc:=复述。PubSubConn{康涅狄格州:r.pool.Get ()} 推迟psc.Close () 日志。Printf("复述,pubsub订阅频道:% v”,频道) 如果犯错:=psc.Subscribe (redis.Args {} .AddFlat(频道)…);犯错!=nil { 返回错 } 完成:=(陈错误,1)//启动一个新的goroutine接收消息 去func () { 为{ 开关味精:=psc.Receive()。(类型){ 案例的错误: & lt;——fmt。Errorf(“复述,pubsub收到错误:% v”,味精) 返回 案例redis.Message: 如果犯错:=消费(味精);犯错!=nil { & lt; -犯错 返回 } 案例redis.Subscription: 如果味精。数==0 {//所有的渠道都没订阅 & lt; - nil 返回 } } } }()//健康检查 蜱虫:=time.NewTicker (time.Minute) 推迟tick.Stop () 为{ 选择{ & lt; -ctx.Done (): 如果犯错:=psc.Unsubscribe ();犯错!=nil { fmt返回。Errorf(“复述,pubsub退订犯错:% v”,呃) } 返回nil 案例犯错:=& lt;完成: 返回错 案例& lt; -tick.C: 如果犯错:=psc.Ping (" ");犯错!=nil { 返回错 } } } 返回nil }
最后,我们写一个简单地主要函数来调用发布,订阅:
func (r * RedisClient)订阅(ctx上下文。背景下,消费ConsumeFunc、通道……错误字符串){ psc:=复述。PubSubConn{康涅狄格州:r.pool.Get ()} 推迟psc.Close () 日志。Printf("复述,pubsub订阅频道:% v”,频道) 如果犯错:=psc.Subscribe (redis.Args {} .AddFlat(频道)…);犯错!=nil { 返回错 } 完成:=(陈错误,1)//启动一个新的goroutine接收消息 去func () { 为{ 开关味精:=psc.Receive()。(类型){ 案例的错误: & lt;——fmt。Errorf(“复述,pubsub收到错误:% v”,味精) 返回 案例redis.Message: 如果犯错:=消费(味精);犯错!=nil { & lt; -犯错 返回 } 案例redis.Subscription: 如果味精。数==0 {//所有的渠道都没订阅 & lt; - nil 返回 } } } }()//健康检查 蜱虫:=time.NewTicker (time.Minute) 推迟tick.Stop () 为{ 选择{ & lt; -ctx.Done (): 如果犯错:=psc.Unsubscribe ();犯错!=nil { fmt返回。Errorf(“复述,pubsub退订犯错:% v”,呃) } 返回nil 案例犯错:=& lt;完成: 返回错 案例& lt; -tick.C: 如果犯错:=psc.Ping (" ");犯错!=nil { 返回错 } } } 返回nil }
咋一看之下,好像并没有什么异常?然而,如果我们这时候去看复述的tcp连接,就可以发现一些猫腻: