conn.go 2.4 KB
Newer Older
D
dongzhihong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15 16 17 18 19 20
package connection

import (
	"errors"
	"net/rpc"
	"sync"
H
Helin Wang 已提交
21 22

	log "github.com/sirupsen/logrus"
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
)

// TODO(helin): add TCP re-connect logic

// Conn is a connection to a parameter server
type Conn struct {
	mu       sync.Mutex
	client   *rpc.Client
	waitConn chan struct{}
}

// New creates a new connection.
func New() *Conn {
	c := &Conn{}
	return c
}

40 41 42 43 44 45 46 47 48 49 50 51
// Close closes the connection.
func (c *Conn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.client == nil {
		return nil
	}

	return c.client.Close()
}

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
// Connect connects the connection to a address.
func (c *Conn) Connect(addr string) error {
	c.mu.Lock()
	if c.client != nil {
		err := c.client.Close()
		if err != nil {
			c.mu.Unlock()
			return err
		}

		c.client = nil
	}
	c.mu.Unlock()

	client, err := rpc.DialHTTP("tcp", addr)
	if err != nil {
		return err
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.client == nil {
		c.client = client
		if c.waitConn != nil {
			close(c.waitConn)
			c.waitConn = nil
		}
	} else {
H
Helin Wang 已提交
81 82
		err := client.Close()
		if err != nil {
H
Helin Wang 已提交
83
			log.Errorln(err)
H
Helin Wang 已提交
84 85
		}

86 87 88 89 90 91
		return errors.New("client already set from a concurrent goroutine")
	}

	return nil
}

92 93 94
// TODO(helin): refactor Call to be able to perform given retry
// policy.

95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
// Call make a RPC call.
//
// Call will be blocked until the connection to remote RPC service
// being established.
func (c *Conn) Call(serviceMethod string, args interface{}, reply interface{}) error {
	c.mu.Lock()
	client := c.client
	var waitCh chan struct{}
	if client == nil {
		if c.waitConn != nil {
			waitCh = c.waitConn
		} else {
			waitCh = make(chan struct{})
			c.waitConn = waitCh
		}
	}
	c.mu.Unlock()

	if waitCh != nil {
		// wait until new connection being established
		<-waitCh
		return c.Call(serviceMethod, args, reply)
	}

	return client.Call(serviceMethod, args, reply)
}