conn.go 1.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
package connection

import (
	"errors"
	"net/rpc"
	"sync"
)

// TODO(helin): add TCP re-connect logic

// Conn is a connection to a parameter server
type Conn struct {
	mu       sync.Mutex
	client   *rpc.Client
	waitConn chan struct{}
}

// New creates a new connection.
func New() *Conn {
	c := &Conn{}
	return c
}

24 25 26 27 28 29 30 31 32 33 34 35
// Close closes the connection.
func (c *Conn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.client == nil {
		return nil
	}

	return c.client.Close()
}

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// Connect connects the connection to a address.
func (c *Conn) Connect(addr string) error {
	c.mu.Lock()
	if c.client != nil {
		err := c.client.Close()
		if err != nil {
			c.mu.Unlock()
			return err
		}

		c.client = nil
	}
	c.mu.Unlock()

	client, err := rpc.DialHTTP("tcp", addr)
	if err != nil {
		return err
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.client == nil {
		c.client = client
		if c.waitConn != nil {
			close(c.waitConn)
			c.waitConn = nil
		}
	} else {
		return errors.New("client already set from a concurrent goroutine")
	}

	return nil
}

71 72 73
// TODO(helin): refactor Call to be able to perform given retry
// policy.

74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
// Call make a RPC call.
//
// Call will be blocked until the connection to remote RPC service
// being established.
func (c *Conn) Call(serviceMethod string, args interface{}, reply interface{}) error {
	c.mu.Lock()
	client := c.client
	var waitCh chan struct{}
	if client == nil {
		if c.waitConn != nil {
			waitCh = c.waitConn
		} else {
			waitCh = make(chan struct{})
			c.waitConn = waitCh
		}
	}
	c.mu.Unlock()

	if waitCh != nil {
		// wait until new connection being established
		<-waitCh
		return c.Call(serviceMethod, args, reply)
	}

	return client.Call(serviceMethod, args, reply)
}