我们有一个用去做的项目,其中用到了zmq4进行通信,一个简单的rpc过程,早期远端是使用一个地图去做ip和具体套接字的映射。
大概是这样
struct SocketMap { sync.Mutex 套接字map [string] * zmq4.Socket }
然后调用的时候的代码大概就是这样的:
func (pushList * SocketMap)推动(ip字符串,[]的数据字节){ pushList.Lock () 推迟pushList.UnLock () 插座:=pushList.sockets(字符串) 如果套接字==nil { 插座:=zmq4.NewSocket ()//做一些初始操作连接 pushList。套接字(ip)=套接字 } socket.Send(数据) }
相信大家都能看出问题:当被推并发访问的时候(事实上推会经常被并发访问),由于这把大锁的存在,同时只能有一个协程在临界区工作,效率是会被大大降低的。
所以我们决定使用sync.Map来替代这个设计,然后出了第一版代码,写的非常简单,只做了简单的替换:
struct SocketMap { 套接字sync.Map } func (pushList * SocketMap)推动(ip字符串,[]的数据字节){ var插座* zmq4.Socket socketInter吧=pushList.sockets.Load (ip) 如果!好{ 套接字=zmq4.NewSocket ()//做一些初始操作连接 pushList.sockets。存储(ip套接字) 其他}{ 套接字=socketInter。(* zmq4.Socket) } socket.Send(数据) }
乍一看似乎没什么问题?但是跑起来总是爆炸,然后一看日志,提示有个非法地址。后来在github上才看的到,zmq4.Socket不是线程安全的。上面的代码恰恰会造成多个线程同时拿到套接字实例,然后就崩溃了。
然后怎么办呢?看来也只能加锁了,不过这次加锁不能加到整个地图上,否则还会有性能问题,那就考虑减小锁的粒度吧,使用锁包装插座。这个时候我们的代码也就呼之欲出了:
struct SocketMutex { sync.Mutex 套接字* zmq4.Socket } struct SocketMap { 套接字sync.Map } func (pushList * SocketMap)推动(ip字符串,[]的数据字节){ var插座* SocketMutex socketInter吧=pushList.sockets.Load (ip) 如果!好{ 套接字=,{ 插座:zmq4.NewSocket () }//做一些初始操作连接 pushList.sockets。存储(ip、newSocket) 其他}{ 套接字=socketInter。(* SocketMutex) } socket.Lock () 推迟socket.Unlock () socket.socket.Send(数据) }
但是这样还是有问题,相信经验比较丰富的老哥一眼就能看出来,问题处在socketInter, ok=pushList.sockets.Load (ip)这行代码上,如果地图中没有这个值,且有多个协程同时访问到这行代码,显然这几个协程的好都会置为false,然后都进入第一个如果代码块,创建多个插座实例,并且争相覆盖原有值。
单纯解决这个问题也很简单,就是使用<代码> sync.Map。LoadOrStore(键界面{},值接口{})(v接口{},加载bool) 代码>这个api,来原子地去做读写。
然而这还没完,我们的写入新值的操作不光是调用一个api创建插座就完的了,还要有一系列的初始化操作,我们必须保证在初始化完成之前,其他通过负载拿到这个实例的协程无法真正访问插座实例。
这时候显然sync.Map自带的机制已经无法解决这个问题了,那么我们必须寻求其他的手段,要么锁,要么就sync.WaitGroup或者任何的其他什么东西。
后来经大佬指点,我在encoder.go中看到了这么一段代码:
func typeEncoder (t reflect.Type) encoderFunc { 如果fi,好的:=encoderCache.Load (t);好{ 返回fi。(encoderFunc) }//处理递归类型、填充和一个地图//间接函数之前我们建立它。这种等待> struct SocketMutex { sync.Mutex 套接字* zmq4.Socket } struct SocketMap { 套接字sync.Map } func (pushList * SocketMap)推动(ip字符串,[]的数据字节){ * SocketMutex类型SocketFunc func () var ( 套接字* SocketMutex w sync.WaitGroup ) 套接字=,SocketMutex { 插座:zmq4.NewSocket () } w.Add (1) socketf吧=pushList.sockets。LoadOrStore (ip, SocketFunc (func () * SocketMutex) { w.Wait () 返回套接字 }) 如果!好{ 套接字=,{ 插座:zmq4.NewSocket () }//做一些初始操作连接 w.Done () 其他}{ 套接字=socketInter。(* SockeFunc) () } socket.Lock () 推迟socket.Unlock () socket.socket.Send(数据) }golang中sync.Map并发创建,读取问题实战记录