提交 8f40b87b 编写于 作者: U Ulric Qin

collector call transfer rpc use long connection

上级 a74016f7
......@@ -73,6 +73,7 @@ func main() {
funcs.BuildMappers()
funcs.Collect()
funcs.InitRpcClients()
//插件采集
plugins.Detect()
......
package funcs
import (
"net/rpc"
"sync"
)
type RpcClientContainer struct {
M map[string]*rpc.Client
sync.RWMutex
}
var rpcClients *RpcClientContainer
func InitRpcClients() {
rpcClients = &RpcClientContainer{
M: make(map[string]*rpc.Client),
}
}
func (rcc *RpcClientContainer) Get(addr string) *rpc.Client {
rcc.RLock()
defer rcc.RUnlock()
client, has := rcc.M[addr]
if !has {
return nil
}
return client
}
// Put 返回的bool表示affected,确实把自己塞进去了
func (rcc *RpcClientContainer) Put(addr string, client *rpc.Client) bool {
rcc.Lock()
defer rcc.Unlock()
oc, has := rcc.M[addr]
if has && oc != nil {
return false
}
rcc.M[addr] = client
return true
}
......@@ -44,45 +44,25 @@ func Push(metricItems []*dataobj.MetricValue) error {
items = append(items, item)
}
var mh codec.MsgpackHandle
mh.MapType = reflect.TypeOf(map[string]interface{}(nil))
addrs := address.GetRPCAddresses("transfer")
count := len(addrs)
retry := 0
for {
for _, i := range rand.Perm(count) {
addr := addrs[i]
var conn net.Conn
conn, err = net.DialTimeout("tcp", addr, time.Millisecond*3000)
if err != nil {
logger.Error("dial transfer err:", err)
continue
}
var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
io.Closer
*bufio.Reader
*bufio.Writer
}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufconn, &mh)
client := rpc.NewClientWithCodec(rpcCodec)
var reply dataobj.TransferResp
err = client.Call("Transfer.Push", items, &reply)
client.Close()
reply, err := rpcCall(addr, items)
if err != nil {
logger.Error(err)
continue
} else {
if reply.Msg != "ok" {
err = fmt.Errorf("some item push err", reply)
err = fmt.Errorf("some item push err: %s", reply.Msg)
logger.Error(err)
}
return err
}
}
time.Sleep(time.Millisecond * 500)
retry += 1
......@@ -94,6 +74,73 @@ func Push(metricItems []*dataobj.MetricValue) error {
return err
}
func rpcCall(addr string, items []*dataobj.MetricValue) (dataobj.TransferResp, error) {
var reply dataobj.TransferResp
var err error
affected := false
client := rpcClients.Get(addr)
if client == nil {
client, err = rpcClient(addr)
if err != nil {
return reply, err
} else {
affected = rpcClients.Put(addr, client)
}
}
timeout := time.Duration(8) * time.Second
done := make(chan error, 1)
go func() {
err := client.Call("Transfer.Push", items, &reply)
done <- err
}()
select {
case <-time.After(timeout):
logger.Warningf("rpc call timeout, transfer addr: %s", addr)
rpcClients.Put(addr, nil)
client.Close()
return reply, fmt.Errorf("%s rpc call timeout", addr)
case err := <-done:
if err != nil {
rpcClients.Put(addr, nil)
client.Close()
return reply, err
}
}
if !affected {
// 我尝试把自己这个client塞进map失败,说明已经有一个client塞进去了,那我自己用完了就关闭
client.Close()
}
return reply, nil
}
func rpcClient(addr string) (*rpc.Client, error) {
conn, err := net.DialTimeout("tcp", addr, time.Second*3)
if err != nil {
err = fmt.Errorf("dial transfer %s fail: %v", addr, err)
logger.Error(err)
return nil, err
}
var bufConn = struct {
io.Closer
*bufio.Reader
*bufio.Writer
}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
var mh codec.MsgpackHandle
mh.MapType = reflect.TypeOf(map[string]interface{}(nil))
rpcCodec := codec.MsgpackSpecRpc.ClientCodec(bufConn, &mh)
client := rpc.NewClientWithCodec(rpcCodec)
return client, nil
}
func CounterToGauge(item *dataobj.MetricValue) error {
key := item.PK()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册