博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
golang mgo的mongo连接池设置:必须手动加上maxPoolSize
阅读量:5258 次
发布时间:2019-06-14

本文共 5561 字,大约阅读时间需要 18 分钟。

本司礼物系统使用了golang的 mongo库 mgo,中间踩了一些坑,总结下避免大家再踩坑

golang的mgo库说明里是说明了开启连接复用的,但观察实验发现,这并没有根本实现连接的控制,连接复用仅在有空闲连接时生效,高并发时无可用连接会不断创建新连接,所以最终还是需要程序员自行去限制最大连接才行。

废话不多说,开始上代码

GlobalMgoSession, err := mgo.Dial(host)
 
func (m *MongoBaseDao) Get(tablename string, id string, result 
interface
{}) 
interface
{} {
    
session := GlobalMgoSession.Clone()
    
defer session.Close()
 
    
collection := session.DB(globalMgoDbName).C(tablename)
    
err := collection.FindId(bson.ObjectIdHex(id)).One(result)
 
    
if 
err != nil {
        
logkit.Logger.Error(
"mongo_base method:Get " 
+ err.Error())
    
}
    
return 
result
}

 

golang main入口启动时,我们会创建一个全局session,然后每次使用时clone session的信息和连接,用于本次请求,使用后调用session.Close() 释放连接。

// Clone works just like Copy, but also reuses the same socket as the original
// session, in case it had already reserved one due to its consistency
// guarantees.  This behavior ensures that writes performed in the old session
// are necessarily observed when using the new session, as long as it was a
// strong or monotonic session.  That said, it also means that long operations
// may cause other goroutines using the original session to wait.
func (s *Session) Clone() *Session {
    
s.m.Lock()
    
scopy := copySession(s, true)
    
s.m.Unlock()
    
return 
scopy
}
 
 
// Close terminates the session.  It's a runtime error to use a session
// after it has been closed.
func (s *Session) Close() {
    
s.m.Lock()
    
if 
s.cluster_ != nil {
        
debugf(
"Closing session %p"
, s)
        
s.unsetSocket()  
//释放当前线程占用的socket 置为nil
        
s.cluster_.Release()
        
s.cluster_ = nil
    
}
    
s.m.Unlock()
}

 

Clone的方法注释里说明会重用原始session的socket连接,但是并发请求一大,其他协程来不及释放连接,当前协程会怎么办?

 

func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
    
// Read-only lock to check for previously reserved socket.
    
s.m.RLock()
    
// If there is a slave socket reserved and its use is acceptable, take it as long
    
// as there isn't a master socket which would be preferred by the read preference mode.
    
if 
s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
        
socket := s.slaveSocket
        
socket.Acquire()
        
s.m.RUnlock()
        
logkit.Logger.Info(
"sgp_test 1 acquireSocket slave is ok!"
)
        
return 
socket, nil
    
}
    
if 
s.masterSocket != nil {
        
socket := s.masterSocket
        
socket.Acquire()
        
s.m.RUnlock()
        
logkit.Logger.Info(
"sgp_test 1  acquireSocket master is ok!"
)
        
return 
socket, nil
    
}
 
    
s.m.RUnlock()
 
    
// No go.  We may have to request a new socket and change the session,
    
// so try again but with an exclusive lock now.
    
s.m.Lock()
    
defer s.m.Unlock()
    
if 
s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
        
s.slaveSocket.Acquire()
        
logkit.Logger.Info(
"sgp_test 2  acquireSocket slave is ok!"
)
        
return 
s.slaveSocket, nil
    
}
    
if 
s.masterSocket != nil {
        
s.masterSocket.Acquire()
        
logkit.Logger.Info(
"sgp_test 2  acquireSocket master is ok!"
)
        
return 
s.masterSocket, nil
    
}
 
    
// Still not good.  We need a new socket.
    
sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
 
......
    
logkit.Logger.Info(
"sgp_test 3   acquireSocket cluster AcquireSocket is ok!"
)
    
return 
sock, nil
 
}

在源码中加debug,结果日志说明一切:

Mar 25 09:46:40 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1  acquireSocket master is ok!
Mar 25 09:46:40 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1  acquireSocket master is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 1 acquireSocket slave is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!
Mar 25 09:46:41 dev02.pandatv.com bikini[12607]:  [info] sgp_test 3   acquireSocket cluster AcquireSocket is ok!

不断的创建连接  AcquireSocket

 $  netstat -nat|grep -i 27017|wc -l

400

如果每个session 不调用close,会达到恐怖的4096,并堵死其他请求,所以clone或copy session时一定要defer close掉

启用maxPoolLimit 参数则会限制总连接大小,连接到限制则当前协程会sleep等待  直到可以创建连接,高并发时锁有问题,会导致多创建几个连接

src/gopkg.in/mgo.v2/cluster.go 
    
s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
        
if 
err == errPoolLimit {
            
if 
!warnedLimit {
                
warnedLimit = true
                
logkit.Logger.Error(
"sgp_test WARNING: Per-server connection limit reached. " 
+ err.Error())
                
log(
"WARNING: Per-server connection limit reached."
)
            
}
            
time.Sleep(100 * time.Millisecond)
            
continue
        
}
 
session.go:
// SetPoolLimit sets the maximum number of sockets in use in a single server
  
// before this session will block waiting for a socket to be available.
  
// The default limit is 4096.
  
//
  
// This limit must be set to cover more than any expected workload of the
  
// application. It is a bad practice and an unsupported use case to use the
  
// database driver to define the concurrency limit of an application. Prevent
  
// such concurrency "at the door" instead, by properly restricting the amount
  
// of used resources and number of goroutines before they are created.
  
func (s *Session) SetPoolLimit(limit int) {
      
s.m.Lock()
      
s.poolLimit = limit
      
s.m.Unlock()
  
}

连接池设置方法:

1、配置中 增加 

[host]:[port]?maxPoolSize=10

2、代码中 :

dao.GlobalMgoSession.SetPoolLimit(10)

再做压测:

 $  netstat -nat|grep -i 27017|wc -l

15

 

 

结论:

每次clone session之后,操作结束时如果调用 session.Close 则会unset Socket  ,socket refer数减少,如果不设置上限,每个协程请求到来发现无空闲连接就会创建socket连接,直到达到最大值4096,而mongo的连接数上限一般也就是1万,也就是一个端口你只能启动一两个进程保证连接不被撑爆,过多的连接数客户端效率不高,server端更会耗费内存和CPU,所以需要启用自定义连接池 , 启用连接池也需要注意如果有pooMaxLimit个协程执行过长或者死循环不释放socket连接,也会悲剧。

mgo底层socket连接池只在maxPooMaxLimit 范围内实现复用,需要自行优化。

 

 

转载于:https://www.cnblogs.com/shenguanpu/p/5318727.html

你可能感兴趣的文章
寄Android开发Gradle你需要知道的知识
查看>>
简述spring中常有的几种advice?
查看>>
整理推荐的CSS属性书写顺序
查看>>
ServerSocket和Socket通信
查看>>
css & input type & search icon
查看>>
源代码的下载和编译读后感
查看>>
Kafka学习笔记
查看>>
Octotree Chrome安装与使用方法
查看>>
Windows 环境下基于 Redis 的 Celery 任务调度模块的实现
查看>>
趣谈Java变量的可见性问题
查看>>
C# 强制关闭当前程序进程(完全Kill掉不留痕迹)
查看>>
ssm框架之将数据库的数据导入导出为excel文件
查看>>
语音识别中的MFCC的提取原理和MATLAB实现
查看>>
验证组件FluentValidation的使用示例
查看>>
0320-学习进度条
查看>>
解决windows系统的oracle数据库不能启动ora-00119和ora-00130的问题
查看>>
ip相关问题解答
查看>>
MetaWeblog API Test
查看>>
反弹SHELL
查看>>
关闭Chrome浏览器的自动更新和升级提示
查看>>