未验证 提交 e32b8e44 编写于 作者: 陈键冬 提交者: GitHub

Reuse the pool module (#97)

* Reuse the pool module

* Remove useless init function

* Rename variable
上级 c0675e9a
......@@ -2,10 +2,11 @@ package query
import (
"github.com/didi/nightingale/src/toolkits/address"
"github.com/didi/nightingale/src/toolkits/pools"
)
var (
TransferConnPools *ConnPools = &ConnPools{}
TransferConnPools *pools.ConnPools
connTimeout int32
callTimeout int32
......@@ -24,6 +25,7 @@ type SeriesQuerySection struct {
func Init(cfg SeriesQuerySection) {
Config = cfg
TransferConnPools = CreateConnPools(Config.MaxConn, Config.MaxIdle,
Config.ConnTimeout, Config.CallTimeout, address.GetRPCAddresses("transfer"))
TransferConnPools = pools.NewConnPools(
Config.MaxConn, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, address.GetRPCAddresses("transfer"),
)
}
package query
import (
"bufio"
"fmt"
"io"
"math/rand"
"net"
"net/rpc"
"reflect"
"sync"
"time"
"github.com/toolkits/pkg/pool"
"github.com/ugorji/go/codec"
)
// 每个后端backend对应一个ConnPool
type ConnPools struct {
sync.RWMutex
Pools []*pool.ConnPool
MaxConns int
MaxIdle int
ConnTimeout int
CallTimeout int
}
func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools {
cp := &ConnPools{Pools: []*pool.ConnPool{}, MaxConns: maxConns, MaxIdle: maxIdle,
ConnTimeout: connTimeout, CallTimeout: callTimeout}
ct := time.Duration(cp.ConnTimeout) * time.Millisecond
for _, address := range cluster {
cp.Pools = append(cp.Pools, createOnePool(address, address, ct, maxConns, maxIdle))
}
return cp
}
func createOnePool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *pool.ConnPool {
p := pool.NewConnPool(name, address, maxConns, maxIdle)
p.New = func(connName string) (pool.NConn, error) {
//校验地址是否正确
_, err := net.ResolveTCPAddr("tcp", p.Address)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", p.Address, connTimeout)
if err != nil {
return nil, err
}
var mh codec.MsgpackHandle
mh.MapType = reflect.TypeOf(map[string]interface{}(nil))
var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
io.Closer
*bufio.Reader
*bufio.Writer
}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufconn, &mh)
return RpcClient{cli: rpc.NewClientWithCodec(rpcCodec), name: connName}, nil
}
return p
}
// 同步发送, 完成发送或超时后 才能返回
func (cp *ConnPools) Call(method string, args interface{}, resp interface{}) error {
connPool := cp.Get()
conn, err := connPool.Fetch()
if err != nil {
return fmt.Errorf("get connection fail: conn %v, err %v. proc: %s", conn, err, connPool.Proc())
}
rpcClient := conn.(RpcClient)
callTimeout := time.Duration(cp.CallTimeout) * time.Millisecond
done := make(chan error, 1)
go func() {
done <- rpcClient.Call(method, args, resp)
}()
select {
case <-time.After(callTimeout):
connPool.ForceClose(conn)
return fmt.Errorf("%v, call timeout", connPool.Proc())
case err = <-done:
if err != nil {
connPool.ForceClose(conn)
err = fmt.Errorf(" call failed, err %v. proc: %s", err, connPool.Proc())
} else {
connPool.Release(conn)
}
return err
}
}
func (cp *ConnPools) Get() *pool.ConnPool {
cp.RLock()
defer cp.RUnlock()
i := rand.Intn(len(cp.Pools))
return cp.Pools[i]
}
// RpcCient, 要实现io.Closer接口
type RpcClient struct {
cli *rpc.Client
name string
}
func (r RpcClient) Name() string {
return r.name
}
func (r RpcClient) Closed() bool {
return r.cli == nil
}
func (r RpcClient) Close() error {
if r.cli != nil {
err := r.cli.Close()
r.cli = nil
return err
}
return nil
}
func (r RpcClient) Call(method string, args interface{}, reply interface{}) error {
return r.cli.Call(method, args, reply)
}
......@@ -41,7 +41,7 @@ func Query(reqs []*dataobj.QueryData) ([]*dataobj.TsdbQueryResponse, error) {
var resp *dataobj.QueryDataResp
var err error
for i := 0; i < 3; i++ {
err = TransferConnPools.Call("Transfer.Query", reqs, &resp)
err = TransferConnPools.Call("", "Transfer.Query", reqs, &resp)
if err == nil {
break
}
......
......@@ -3,10 +3,10 @@ package backend
import (
"github.com/toolkits/pkg/container/list"
"github.com/toolkits/pkg/container/set"
"github.com/toolkits/pkg/pool"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/src/modules/transfer/cache"
"github.com/didi/nightingale/src/toolkits/pools"
"github.com/didi/nightingale/src/toolkits/report"
"github.com/didi/nightingale/src/toolkits/stats"
)
......@@ -44,8 +44,8 @@ var (
JudgeQueues = cache.SafeJudgeQueue{}
// 连接池 node_address -> connection_pool
TsdbConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
JudgeConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
TsdbConnPools *pools.ConnPools
JudgeConnPools *pools.ConnPools
connTimeout int32
callTimeout int32
......@@ -75,11 +75,13 @@ func initConnPools() {
tsdbInstances.Add(addr)
}
}
TsdbConnPools = CreateConnPools(Config.MaxConns, Config.MaxIdle,
Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice())
TsdbConnPools = pools.NewConnPools(
Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice(),
)
JudgeConnPools = CreateConnPools(Config.MaxConns, Config.MaxIdle,
Config.ConnTimeout, Config.CallTimeout, GetJudges())
JudgeConnPools = pools.NewConnPools(
Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, GetJudges(),
)
}
func initSendQueues() {
......
......@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"strings"
......@@ -12,6 +13,7 @@ import (
"github.com/didi/nightingale/src/dataobj"
"github.com/didi/nightingale/src/modules/transfer/calc"
"github.com/didi/nightingale/src/toolkits/address"
"github.com/didi/nightingale/src/toolkits/pools"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/toolkits/pkg/logger"
......@@ -235,15 +237,15 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err
resp = &dataobj.TsdbQueryResponse{}
pk := dataobj.PKWithCounter(para.Endpoint, para.Counter)
pools, err := SelectPoolByPK(pk)
ps, err := SelectPoolByPK(pk)
if err != nil {
return resp, err
}
count := len(pools)
count := len(ps)
for _, i := range rand.Perm(count) {
onePool := pools[i].Pool
addr := pools[i].Addr
onePool := ps[i].Pool
addr := ps[i].Addr
conn, err := onePool.Fetch()
if err != nil {
......@@ -251,7 +253,7 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err
continue
}
rpcConn := conn.(RpcClient)
rpcConn := conn.(pools.RpcClient)
if rpcConn.Closed() {
onePool.ForceClose(conn)
......@@ -322,21 +324,21 @@ func SelectPoolByPK(pk string) ([]Pool, error) {
return []Pool{}, errors.New("node not found")
}
var pools []Pool
var ps []Pool
for _, addr := range nodeAddrs.Addrs {
onePool, found := TsdbConnPools.Get(addr)
if !found {
logger.Errorf("addr %s not found", addr)
continue
}
pools = append(pools, Pool{Pool: onePool, Addr: addr})
ps = append(ps, Pool{Pool: onePool, Addr: addr})
}
if len(pools) < 1 {
return pools, errors.New("addr not found")
if len(ps) < 1 {
return ps, errors.New("addr not found")
}
return pools, nil
return ps, nil
}
func getTags(counter string) (tags string) {
......
......@@ -16,6 +16,6 @@ func RebuildJudgePool() {
continue
}
backend.JudgeConnPools.Update(judges)
backend.JudgeConnPools.UpdatePools(judges)
}
}
package rpc
import (
"github.com/toolkits/pkg/pool"
"github.com/didi/nightingale/src/toolkits/pools"
)
var (
// 连接池 node_address -> connection_pool
IndexConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
IndexConnPools *pools.ConnPools
Config RpcClientSection
)
......@@ -17,12 +17,11 @@ type RpcClientSection struct {
CallTimeout int `yaml:"callTimeout"`
}
func Init(cfg RpcClientSection, indexs []string) {
func Init(cfg RpcClientSection, indexes []string) {
Config = cfg
IndexConnPools = CreateConnPools(cfg.MaxConns, cfg.MaxIdle,
cfg.ConnTimeout, cfg.CallTimeout, indexs)
IndexConnPools = pools.NewConnPools(cfg.MaxConns, cfg.MaxIdle, cfg.ConnTimeout, cfg.CallTimeout, indexes)
}
func ReNewPools(indexs []string) []string {
return IndexConnPools.UpdatePools(indexs)
func ReNewPools(indexes []string) []string {
return IndexConnPools.UpdatePools(indexes)
}
package rpc
import (
"bufio"
"fmt"
"io"
"net"
"net/rpc"
"reflect"
"sync"
"time"
"github.com/toolkits/pkg/pool"
"github.com/ugorji/go/codec"
)
// 每个后端backend对应一个ConnPool
type ConnPools struct {
sync.RWMutex
M map[string]*pool.ConnPool
MaxConns int
MaxIdle int
ConnTimeout int
CallTimeout int
}
func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools {
cp := &ConnPools{M: make(map[string]*pool.ConnPool), MaxConns: maxConns, MaxIdle: maxIdle,
ConnTimeout: connTimeout, CallTimeout: callTimeout}
ct := time.Duration(cp.ConnTimeout) * time.Millisecond
for _, address := range cluster {
if _, exist := cp.M[address]; exist {
continue
}
cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle)
}
return cp
}
func createOnePool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *pool.ConnPool {
p := pool.NewConnPool(name, address, maxConns, maxIdle)
p.New = func(connName string) (pool.NConn, error) {
//校验地址是否正确
_, err := net.ResolveTCPAddr("tcp", p.Address)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", p.Address, connTimeout)
if err != nil {
return nil, err
}
var mh codec.MsgpackHandle
mh.MapType = reflect.TypeOf(map[string]interface{}(nil))
var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
io.Closer
*bufio.Reader
*bufio.Writer
}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufconn, &mh)
return RpcClient{cli: rpc.NewClientWithCodec(rpcCodec), name: connName}, nil
}
return p
}
// 同步发送, 完成发送或超时后 才能返回
func (this *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error {
connPool, exists := this.Get(addr)
if !exists {
return fmt.Errorf("%s has no connection pool", addr)
}
conn, err := connPool.Fetch()
if err != nil {
return fmt.Errorf("%s get connection fail: conn %v, err %v. proc: %s", addr, conn, err, connPool.Proc())
}
rpcClient := conn.(RpcClient)
callTimeout := time.Duration(this.CallTimeout) * time.Millisecond
done := make(chan error, 1)
go func() {
done <- rpcClient.Call(method, args, resp)
}()
select {
case <-time.After(callTimeout):
connPool.ForceClose(conn)
return fmt.Errorf("%s, call timeout", addr)
case err = <-done:
if err != nil {
connPool.ForceClose(conn)
err = fmt.Errorf("%s, call failed, err %v. proc: %s", addr, err, connPool.Proc())
} else {
connPool.Release(conn)
}
return err
}
}
func (this *ConnPools) Get(address string) (*pool.ConnPool, bool) {
this.RLock()
defer this.RUnlock()
p, exists := this.M[address]
return p, exists
}
func (c *ConnPools) UpdatePools(addrs []string) []string {
c.Lock()
defer c.Unlock()
newAddrs := []string{}
if len(addrs) == 0 {
c.M = make(map[string]*pool.ConnPool)
return newAddrs
}
addrMap := make(map[string]struct{})
ct := time.Duration(c.ConnTimeout) * time.Millisecond
for _, addr := range addrs {
addrMap[addr] = struct{}{}
_, exists := c.M[addr]
if exists {
continue
}
newAddrs = append(newAddrs, addr)
c.M[addr] = createOnePool(addr, addr, ct, c.MaxConns, c.MaxIdle)
}
for addr := range c.M { //删除旧的地址
if _, exists := addrMap[addr]; !exists {
delete(c.M, addr)
}
}
return newAddrs
}
// RpcCient, 要实现io.Closer接口
type RpcClient struct {
cli *rpc.Client
name string
}
func (this RpcClient) Name() string {
return this.name
}
func (this RpcClient) Closed() bool {
return this.cli == nil
}
func (this RpcClient) Close() error {
if this.cli != nil {
err := this.cli.Close()
this.cli = nil
return err
}
return nil
}
func (this RpcClient) Call(method string, args interface{}, reply interface{}) error {
return this.cli.Call(method, args, reply)
}
......@@ -6,8 +6,9 @@ import (
"github.com/toolkits/pkg/container/list"
"github.com/toolkits/pkg/container/set"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/pool"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/src/toolkits/pools"
)
type MigrateSection struct {
......@@ -39,8 +40,8 @@ var (
NewTsdbNodeRing *ConsistentHashRing
// 连接池 node_address -> connection_pool
TsdbConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
NewTsdbConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
TsdbConnPools *pools.ConnPools
NewTsdbConnPools *pools.ConnPools
)
type QueueFilter struct {
......@@ -87,16 +88,18 @@ func initConnPools() {
for _, addr := range Config.OldCluster {
tsdbInstances.Add(addr)
}
TsdbConnPools = CreateConnPools(Config.MaxConns, Config.MaxIdle,
Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice())
TsdbConnPools = pools.NewConnPools(
Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, tsdbInstances.ToSlice(),
)
// tsdb
newTsdbInstances := set.NewSafeSet()
for _, addr := range Config.NewCluster {
newTsdbInstances.Add(addr)
}
NewTsdbConnPools = CreateConnPools(Config.MaxConns, Config.MaxIdle,
Config.ConnTimeout, Config.CallTimeout, newTsdbInstances.ToSlice())
NewTsdbConnPools = pools.NewConnPools(
Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, newTsdbInstances.ToSlice(),
)
}
func initQueues() {
......
package migrate
import (
"bufio"
"fmt"
"io"
"net"
"net/rpc"
"reflect"
"sync"
"time"
"github.com/toolkits/pkg/pool"
"github.com/ugorji/go/codec"
)
// 每个后端backend对应一个ConnPool
type ConnPools struct {
sync.RWMutex
M map[string]*pool.ConnPool
MaxConns int
MaxIdle int
ConnTimeout int
CallTimeout int
}
func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools {
cp := &ConnPools{M: make(map[string]*pool.ConnPool), MaxConns: maxConns, MaxIdle: maxIdle,
ConnTimeout: connTimeout, CallTimeout: callTimeout}
ct := time.Duration(cp.ConnTimeout) * time.Millisecond
for _, address := range cluster {
if _, exist := cp.M[address]; exist {
continue
}
cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle)
}
return cp
}
func createOnePool(name string, address string, connTimeout time.Duration, maxConns int, maxIdle int) *pool.ConnPool {
p := pool.NewConnPool(name, address, maxConns, maxIdle)
p.New = func(connName string) (pool.NConn, error) {
//校验地址是否正确
_, err := net.ResolveTCPAddr("tcp", p.Address)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout("tcp", p.Address, connTimeout)
if err != nil {
return nil, err
}
var mh codec.MsgpackHandle
mh.MapType = reflect.TypeOf(map[string]interface{}(nil))
var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
io.Closer
*bufio.Reader
*bufio.Writer
}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufconn, &mh)
return RpcClient{cli: rpc.NewClientWithCodec(rpcCodec), name: connName}, nil
}
return p
}
// 同步发送, 完成发送或超时后 才能返回
func (this *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error {
connPool, exists := this.Get(addr)
if !exists {
return fmt.Errorf("%s has no connection pool", addr)
}
conn, err := connPool.Fetch()
if err != nil {
return fmt.Errorf("%s get connection fail: conn %v, err %v. proc: %s", addr, conn, err, connPool.Proc())
}
rpcClient := conn.(RpcClient)
callTimeout := time.Duration(this.CallTimeout) * time.Millisecond
done := make(chan error, 1)
go func() {
done <- rpcClient.Call(method, args, resp)
}()
select {
case <-time.After(callTimeout):
connPool.ForceClose(conn)
return fmt.Errorf("%s, call timeout", addr)
case err = <-done:
if err != nil {
connPool.ForceClose(conn)
err = fmt.Errorf("%s, call failed, err %v. proc: %s", addr, err, connPool.Proc())
} else {
connPool.Release(conn)
}
return err
}
}
func (this *ConnPools) Get(address string) (*pool.ConnPool, bool) {
this.RLock()
defer this.RUnlock()
p, exists := this.M[address]
return p, exists
}
// RpcCient, 要实现io.Closer接口
type RpcClient struct {
cli *rpc.Client
name string
}
func (this RpcClient) Name() string {
return this.name
}
func (this RpcClient) Closed() bool {
return this.cli == nil
}
func (this RpcClient) Close() error {
if this.cli != nil {
err := this.cli.Close()
this.cli = nil
return err
}
return nil
}
func (this RpcClient) Call(method string, args interface{}, reply interface{}) error {
return this.cli.Call(method, args, reply)
}
......@@ -7,6 +7,7 @@ import (
"time"
"github.com/didi/nightingale/src/dataobj"
"github.com/didi/nightingale/src/toolkits/pools"
"github.com/didi/nightingale/src/toolkits/str"
"github.com/toolkits/pkg/pool"
......@@ -68,19 +69,19 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err
resp = &dataobj.TsdbQueryResponse{}
pk := str.PK(para.Endpoint, para.Counter)
pool, addr, err := selectPoolByPK(pk)
onePool, addr, err := selectPoolByPK(pk)
if err != nil {
return resp, err
}
conn, err := pool.Fetch()
conn, err := onePool.Fetch()
if err != nil {
return resp, err
}
rpcConn := conn.(RpcClient)
rpcConn := conn.(pools.RpcClient)
if rpcConn.Closed() {
pool.ForceClose(conn)
onePool.ForceClose(conn)
return resp, errors.New("conn closed")
}
......@@ -98,20 +99,20 @@ func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err
select {
case <-time.After(time.Duration(Config.CallTimeout) * time.Millisecond):
pool.ForceClose(conn)
return nil, fmt.Errorf("%s, call timeout. proc: %s", addr, pool.Proc())
onePool.ForceClose(conn)
return nil, fmt.Errorf("%s, call timeout. proc: %s", addr, onePool.Proc())
case r := <-ch:
if r.Err != nil {
pool.ForceClose(conn)
return r.Resp, fmt.Errorf("%s, call failed, err %v. proc: %s", addr, r.Err, pool.Proc())
onePool.ForceClose(conn)
return r.Resp, fmt.Errorf("%s, call failed, err %v. proc: %s", addr, r.Err, onePool.Proc())
} else {
pool.Release(conn)
onePool.Release(conn)
if len(r.Resp.Values) < 1 {
r.Resp.Values = []*dataobj.RRDData{}
return r.Resp, nil
}
fixed := []*dataobj.RRDData{}
fixed := make([]*dataobj.RRDData, 0)
for _, v := range r.Resp.Values {
if v == nil || !(v.Timestamp >= start && v.Timestamp <= end) {
continue
......@@ -136,11 +137,10 @@ func selectPoolByPK(pk string) (*pool.ConnPool, string, error) {
return nil, "", errors.New("node not found")
}
pool, found := TsdbConnPools.Get(addr)
onePool, found := TsdbConnPools.Get(addr)
if !found {
return nil, "", errors.New("addr not found")
}
return pool, addr, nil
return onePool, addr, nil
}
package backend
package pools
import (
"bufio"
......@@ -11,38 +11,43 @@ import (
"time"
"github.com/toolkits/pkg/pool"
"github.com/ugorji/go/codec"
)
// backend -> ConnPool
// ConnPools is responsible for the Connection Pool lifecycle management.
type ConnPools struct {
sync.RWMutex
M map[string]*pool.ConnPool
P map[string]*pool.ConnPool
MaxConns int
MaxIdle int
ConnTimeout int
CallTimeout int
}
func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools {
cp := &ConnPools{M: make(map[string]*pool.ConnPool), MaxConns: maxConns, MaxIdle: maxIdle,
ConnTimeout: connTimeout, CallTimeout: callTimeout}
func NewConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools {
cp := &ConnPools{
P: make(map[string]*pool.ConnPool),
MaxConns: maxConns,
MaxIdle: maxIdle,
ConnTimeout: connTimeout,
CallTimeout: callTimeout,
}
ct := time.Duration(cp.ConnTimeout) * time.Millisecond
for _, address := range cluster {
if _, exist := cp.M[address]; exist {
if _, exist := cp.P[address]; exist {
continue
}
cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle)
cp.P[address] = createOnePool(address, address, ct, maxConns, maxIdle)
}
return cp
}
func createOnePool(name, address string, connTimeout time.Duration, maxConns, maxIdle int) *pool.ConnPool {
p := pool.NewConnPool(name, address, maxConns, maxIdle)
p.New = func(connName string) (pool.NConn, error) {
// check address
// valid address
_, err := net.ResolveTCPAddr("tcp", p.Address)
if err != nil {
return nil, err
......@@ -55,11 +60,12 @@ func createOnePool(name, address string, connTimeout time.Duration, maxConns, ma
var mh codec.MsgpackHandle
mh.MapType = reflect.TypeOf(map[string]interface{}(nil))
var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
// bufconn here is a buffered io.ReadWriteCloser
var bufconn = struct {
io.Closer
*bufio.Reader
*bufio.Writer
}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
}{Closer: conn, Reader: bufio.NewReader(conn), Writer: bufio.NewWriter(conn)}
rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufconn, &mh)
return RpcClient{cli: rpc.NewClientWithCodec(rpcCodec), name: connName}, nil
......@@ -67,37 +73,33 @@ func createOnePool(name, address string, connTimeout time.Duration, maxConns, ma
return p
}
func (cp *ConnPools) Update(cluster []string) {
cp.Lock()
defer cp.Unlock()
maxConns := Config.MaxConns
maxIdle := Config.MaxIdle
ct := time.Duration(cp.ConnTimeout) * time.Millisecond
newCluster := make(map[string]struct{})
for _, address := range cluster {
newCluster[address] = struct{}{}
if _, exist := cp.M[address]; exist {
continue
// Call will block until request failed or timeout.
func (cp *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error {
var selectedPool *pool.ConnPool
var exists bool
// if address is empty, we will select a available pool from cp.P randomly.
// map-range function gets random keys order every time.
if addr == "" {
for _, p := range cp.P {
if p != nil {
selectedPool = p
break
}
}
cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle)
}
// delete invalid address from cp.M
for address := range cp.M {
if _, exists := newCluster[address]; !exists {
delete(cp.M, address)
} else {
selectedPool, exists = cp.Get(addr)
if !exists {
return fmt.Errorf("%s has no connection pool", addr)
}
}
}
// Call will block until the request failed or timeout
func (cp *ConnPools) Call(addr, method string, args, resp interface{}) error {
connPool, exists := cp.Get(addr)
if !exists {
return fmt.Errorf("%s has no connection pool", addr)
// make sure the selected pool alive.
if selectedPool == nil {
return fmt.Errorf("no connection pool available")
}
connPool := selectedPool
conn, err := connPool.Fetch()
if err != nil {
return fmt.Errorf("%s get connection fail: conn %v, err %v. proc: %s", addr, conn, err, connPool.Proc())
......@@ -129,11 +131,44 @@ func (cp *ConnPools) Call(addr, method string, args, resp interface{}) error {
func (cp *ConnPools) Get(address string) (*pool.ConnPool, bool) {
cp.RLock()
defer cp.RUnlock()
p, exists := cp.M[address]
p, exists := cp.P[address]
return p, exists
}
// RpcClient implements the io.Closer interface
func (cp *ConnPools) UpdatePools(addrs []string) []string {
cp.Lock()
defer cp.Unlock()
newAddrs := make([]string, 0)
if len(addrs) == 0 {
cp.P = make(map[string]*pool.ConnPool)
return newAddrs
}
addrMap := make(map[string]struct{})
ct := time.Duration(cp.ConnTimeout) * time.Millisecond
for _, addr := range addrs {
addrMap[addr] = struct{}{}
_, exists := cp.P[addr]
if exists {
continue
}
newAddrs = append(newAddrs, addr)
cp.P[addr] = createOnePool(addr, addr, ct, cp.MaxConns, cp.MaxIdle)
}
// remove a pool from cp.P
for addr := range cp.P {
if _, exists := addrMap[addr]; !exists {
delete(cp.P, addr)
}
}
return newAddrs
}
// RpcCient implements the io.Closer interface
type RpcClient struct {
cli *rpc.Client
name string
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册