package pool |
|
|
| import ( |
| "errors" |
| "fmt" |
| log "github.com/sirupsen/logrus" |
| "sync" |
| "time" |
| //"reflect" |
| ) |
|
|
| var ( |
| //ErrMaxActiveConnReached 连接池超限 |
| ErrMaxActiveConnReached = errors.New("MaxActiveConnReached") |
| ) |
|
|
| // Config 连接池相关配置 |
| type Config struct { |
| //连接池中拥有的最小连接数 |
| InitialCap int |
| //最大并发存活连接数 |
| MaxCap int |
| //最大空闲连接 |
| MaxIdle int |
| //生成连接的方法 |
| Factory func() (interface{}, error) |
| //关闭连接的方法 |
| Close func(interface{}) error |
| //检查连接是否有效的方法 |
| Ping func(interface{}) error |
| //连接最大空闲时间,超过该事件则将失效 |
| IdleTimeout time.Duration |
| } |
|
|
| type connReq struct { |
| idleConn *idleConn |
| } |
|
|
| // channelPool 存放连接信息 |
| type channelPool struct { |
| mu sync.RWMutex |
| conns chan *idleConn |
| factory func() (interface{}, error) |
| close func(interface{}) error |
| ping func(interface{}) error |
| idleTimeout, waitTimeOut time.Duration |
| maxActive int |
| openingConns int |
| connReqs []chan connReq |
| } |
|
|
| type idleConn struct { |
| conn interface{} |
| t time.Time |
| } |
|
|
| // NewChannelPool 初始化连接 |
| func NewChannelPool(poolConfig *Config) (Pool, error) { |
| if !(poolConfig.InitialCap <= poolConfig.MaxIdle && poolConfig.MaxCap >= poolConfig.MaxIdle && poolConfig.InitialCap >= 0) { |
| return nil, errors.New("invalid capacity settings") |
| } |
| if poolConfig.Factory == nil { |
| return nil, errors.New("invalid factory func settings") |
| } |
| if poolConfig.Close == nil { |
| return nil, errors.New("invalid close func settings") |
| } |
|
|
| c := &channelPool{ |
| conns: make(chan *idleConn, poolConfig.MaxIdle), |
| factory: poolConfig.Factory, |
| close: poolConfig.Close, |
| idleTimeout: poolConfig.IdleTimeout, |
| maxActive: poolConfig.MaxCap, |
| openingConns: poolConfig.InitialCap, |
| } |
|
|
| if poolConfig.Ping != nil { |
| c.ping = poolConfig.Ping |
| } |
|
|
| for i := 0; i < poolConfig.InitialCap; i++ { |
| conn, err := c.factory() |
| if err != nil { |
| c.Release() |
| return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) |
| } |
| c.conns <- &idleConn{conn: conn, t: time.Now()} |
| } |
|
|
| return c, nil |
| } |
|
|
| // getConns 获取所有连接 |
| func (c *channelPool) getConns() chan *idleConn { |
| c.mu.Lock() |
| conns := c.conns |
| c.mu.Unlock() |
| return conns |
| } |
|
|
| // Get 从pool中取一个连接 |
| func (c *channelPool) Get() (interface{}, error) { |
| conns := c.getConns() |
| if conns == nil { |
| return nil, ErrClosed |
| } |
| for { |
| select { |
| case wrapConn := <-conns: |
| if wrapConn == nil { |
| return nil, ErrClosed |
| } |
| //判断是否超时,超时则丢弃 |
| if timeout := c.idleTimeout; timeout > 0 { |
| if wrapConn.t.Add(timeout).Before(time.Now()) { |
| //丢弃并关闭该连接 |
| c.Close(wrapConn.conn) |
| continue |
| } |
| } |
| //判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查 |
| if c.ping != nil { |
| if err := c.Ping(wrapConn.conn); err != nil { |
| c.Close(wrapConn.conn) |
| continue |
| } |
| } |
| return wrapConn.conn, nil |
| default: |
| c.mu.Lock() |
| log.Debugf("openConn %v %v", c.openingConns, c.maxActive) |
| if c.openingConns >= c.maxActive { |
| req := make(chan connReq, 1) |
| c.connReqs = append(c.connReqs, req) |
| c.mu.Unlock() |
| ret, ok := <-req |
| if !ok { |
| return nil, ErrMaxActiveConnReached |
| } |
| if timeout := c.idleTimeout; timeout > 0 { |
| if ret.idleConn.t.Add(timeout).Before(time.Now()) { |
| //丢弃并关闭该连接 |
| c.Close(ret.idleConn.conn) |
| continue |
| } |
| } |
| return ret.idleConn.conn, nil |
| } |
| if c.factory == nil { |
| c.mu.Unlock() |
| return nil, ErrClosed |
| } |
| conn, err := c.factory() |
| if err != nil { |
| c.mu.Unlock() |
| return nil, err |
| } |
| c.openingConns++ |
| c.mu.Unlock() |
| return conn, nil |
| } |
| } |
| } |
|