service.go 7.6 KB
Newer Older
1 2 3
package pserver

import (
W
wuyi05 已提交
4
	"context"
5 6
	"errors"
	"fmt"
W
wuyi05 已提交
7 8
	"strconv"
	"strings"
9
	"sync"
W
wuyi05 已提交
10 11
	"time"

W
wuyi05 已提交
12
	"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
W
wuyi05 已提交
13 14 15
	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
	log "github.com/sirupsen/logrus"
16 17 18 19 20
)

// ElementType is the type of elements of a Parameter. It is an integer
// enumeration; the supported values are declared as constants in this file.
type ElementType int

21 22 23 24
// Error message strings used by the Service RPC methods (wrapped with
// errors.New at the call site).
//
// AlreadyInitialized is reported by InitParam and FinishInitParams once the
// initialization phase has been finished. Uninitialized is reported by
// SendGrad when it is called before initialization has been finished.
const (
	AlreadyInitialized = "pserver already initialized"
	Uninitialized      = "pserver not fully initialized"
)
25 26 27 28 29 30 31 32 33 34 35

// Supported element types. The values are assigned by iota in declaration
// order, starting at 0 for Int32, so reordering or inserting entries changes
// the numeric value of every constant that follows.
const (
	Int32 ElementType = iota
	UInt32
	Int64
	UInt64
	Float32
	Float64
)

W
wuyi05 已提交
36 37 38
// PsDesired is the etcd key under which the desired number of pservers is
// stored.
const PsDesired = "/ps_desired"

39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
// Parameter is a piece of data to sync with the parameter server.
//
// Name identifies the parameter; the Service stores and looks up parameters
// by this name. ElementType records the type of the elements held in
// Content, which carries the parameter data as a byte slice.
type Parameter struct {
	Name        string
	ElementType ElementType
	Content     []byte
}

// ParameterWithConfig contains the parameter and the configuration. It is
// the argument type of Service.InitParam, which currently stores only Param
// (parsing Config is still a TODO there).
type ParameterWithConfig struct {
	Param  Parameter
	Config []byte // parameter configuration in Proto Buffer format
}

// Gradient is the gradient of the parameter. It is a distinct type with the
// same fields as Parameter; Service.SendGrad uses its Name to find the
// parameter the gradient applies to.
type Gradient Parameter

H
Helin Wang 已提交
55
// Service is the RPC service for pserver.
56 57 58 59 60 61
type Service struct {
	initialized chan struct{}

	mu       sync.Mutex
	opt      *optimizer
	paramMap map[string]Parameter
W
wuyi05 已提交
62 63 64 65 66 67 68 69 70 71

	etcdEndpoints string
	etcdClient    *clientv3.Client
	// etcdTimeout is also used as retry intervals.
	etcdTimeout time.Duration
	// desired number of pservers in the job.
	// assume desired will not change during one training job.
	desired int
	// FIXME: ensure GetExternalIP gets the correct ip for trainers to connect.
	externalIP string
72 73
}

W
wuyi05 已提交
74 75
// NewService creates a new service, will bypass etcd registration if no
// endpoints specified.
Y
yi.wu 已提交
76
func NewService(endpoints string, numPservers int, timeout time.Duration) (*Service, error) {
Q
qiaolongfei 已提交
77
	s := &Service{opt: newOptimizer(sgd, 0.005)}
78 79
	s.paramMap = make(map[string]Parameter)
	s.initialized = make(chan struct{})
W
wuyi05 已提交
80 81 82 83
	s.etcdEndpoints = endpoints
	s.etcdTimeout = timeout

	var err error
W
wuyi05 已提交
84
	s.externalIP, err = networkhelper.GetExternalIP()
W
wuyi05 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
	if err != nil {
		return nil, err
	}

	if endpoints != "" {
		// initialize connection to etcd, try
		ep := strings.Split(s.etcdEndpoints, ",")
		for {
			cli, err := clientv3.New(clientv3.Config{
				Endpoints:   ep,
				DialTimeout: s.etcdTimeout,
			})
			if err != nil {
				log.Errorf("connect to etcd error: %v", err)
				time.Sleep(s.etcdTimeout)
				continue
			}
			s.etcdClient = cli
			log.Debugf("inited client to %s", s.etcdEndpoints)
			break
		}
Y
yi.wu 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
		// init /ps_desired using transaction, for multiple pservers may want to write
		// it at the same time.
		for {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			_, err := s.initDesiredPsercers(ctx, numPservers)
			cancel()
			if err != nil {
				log.Warn(err)
				time.Sleep(s.etcdTimeout)
				continue
			}
			break
		}
		// TODO: when implementing extending or reducing pservers, /ps_desired is
		// changed, then we need to watch /ps_desired node for events. For now, just
		// write once when init and read from it.
W
wuyi05 已提交
122 123 124
		// wait and set s.desired init value
		for {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
W
wuyi05 已提交
125
			resp, err := s.etcdClient.Get(ctx, PsDesired)
W
wuyi05 已提交
126 127
			cancel()
			if err != nil {
W
wuyi05 已提交
128
				log.Errorf("getting %s error: %v", PsDesired, err)
W
wuyi05 已提交
129 130 131
				time.Sleep(s.etcdTimeout)
				continue
			}
W
wuyi05 已提交
132 133 134 135 136 137 138
			if len(resp.Kvs) != 0 {
				s.desired, err = strconv.Atoi(string(resp.Kvs[0].Value))
				if err != nil {
					log.Errorf("value of %s invalid %v\n", PsDesired, err)
					time.Sleep(s.etcdTimeout)
					// NOTE: wait util ps_desired value change
					continue
W
wuyi05 已提交
139
				}
W
wuyi05 已提交
140 141 142 143 144 145 146 147 148 149 150 151
				break
			}
		}
		// try register pserver node on etcd
		for {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			_, err := s.registerPserverEtcd(ctx)
			cancel()
			if err != nil {
				log.Warn(err)
				time.Sleep(s.etcdTimeout)
				continue
W
wuyi05 已提交
152 153 154 155 156 157 158 159
			}
			break
		}
	} // if endpoints != ""
	// Bypass etcd registration if no endpoints specified
	return s, nil
}

Y
yi.wu 已提交
160 161 162 163 164 165 166 167 168 169
// initDesiredPsercers stores numPservers under the PsDesired key inside an
// etcd STM transaction, but only when the transaction reads an empty value
// for that key; a non-empty value is left untouched. ctx is passed to the
// transaction as its abort context.
func (s *Service) initDesiredPsercers(ctx context.Context, numPservers int) (*clientv3.TxnResponse, error) {
	// writeIfUnset is the transaction body.
	writeIfUnset := func(stm concurrency.STM) error {
		if stm.Get(PsDesired) != "" {
			return nil
		}
		stm.Put(PsDesired, strconv.Itoa(numPservers))
		return nil
	}

	return concurrency.NewSTM(
		s.etcdClient,
		writeIfUnset,
		concurrency.WithAbortContext(ctx),
		concurrency.WithIsolation(concurrency.RepeatableReads),
	)
}

W
wuyi05 已提交
170
// registerPserverEtcd registers pserver node on etcd using transaction.
W
wuyi05 已提交
171 172 173
func (s *Service) registerPserverEtcd(ctx context.Context) (*clientv3.TxnResponse, error) {
	return concurrency.NewSTM(s.etcdClient, func(c concurrency.STM) error {
		registered := false
W
wuyi05 已提交
174 175 176 177 178 179 180
		for i := 0; i < s.desired; i++ {
			psKey := "/ps/" + strconv.Itoa(i)
			log.Debugf("checking %s", psKey)
			ps := c.Get(psKey)
			log.Debugf("got value (%s) for key: %s", ps, psKey)

			if ps == "" {
W
wuyi05 已提交
181 182 183 184
				resp, err := s.etcdClient.Grant(context.TODO(), 5)
				if err != nil {
					log.Fatal(err)
				}
W
wuyi05 已提交
185 186 187
				// find the first id and write info
				c.Put(psKey, s.externalIP, clientv3.WithLease(resp.ID))
				log.Debugf("set pserver node %s with value %s", psKey, s.externalIP)
H
Helin Wang 已提交
188
				ch, kaerr := s.etcdClient.KeepAlive(context.TODO(), resp.ID)
W
wuyi05 已提交
189 190 191 192
				if kaerr != nil {
					log.Errorf("keepalive etcd node error: %v", kaerr)
					return kaerr
				}
H
Helin Wang 已提交
193 194 195 196 197 198 199

				// Eat the keep alive message so etcd
				// will not expire the lease.
				go func(ch <-chan *clientv3.LeaseKeepAliveResponse) {
					ka := <-ch
					log.Debugf("keepalive: %d\n", ka.TTL)
				}(ch)
W
wuyi05 已提交
200 201
				log.Debug("register finished")
				registered = true
W
wuyi05 已提交
202 203 204
				break
			}
		}
W
wuyi05 已提交
205 206 207 208 209
		if registered == true {
			return nil
		}
		return errors.New("not registerd, may due to already have enough pservers")
	}, concurrency.WithAbortContext(ctx), concurrency.WithIsolation(concurrency.RepeatableReads))
210 211
}

H
Helin Wang 已提交
212
// InitParam initializes a parameter.
213 214 215
func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) error {
	select {
	case <-s.initialized:
216
		return errors.New(AlreadyInitialized)
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
	default:
	}

	// TODO(helin): parse parameter config

	s.mu.Lock()
	defer s.mu.Unlock()

	// TODO(helin): check if paramWithConfigs.Param.Content is
	// properly memory aligned, if not, make copy to a memory
	// aligned region.
	s.paramMap[paramWithConfigs.Param.Name] = paramWithConfigs.Param
	return nil
}

H
Helin Wang 已提交
232 233
// FinishInitParams tells the parameter server that the parameter
// initialization has finished.
234 235 236
func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error {
	select {
	case <-s.initialized:
237
		return errors.New(AlreadyInitialized)
238 239 240 241 242 243 244
	default:
	}

	close(s.initialized)
	return nil
}

245
// SendGrad sends gradient to parameter servers for parameter
H
Helin Wang 已提交
246
// optimization.
247
func (s *Service) SendGrad(g Gradient, dummy *int) error {
248 249 250
	select {
	case <-s.initialized:
	default:
251
		return errors.New(Uninitialized)
252
	}
253 254

	s.mu.Lock()
H
Helin Wang 已提交
255
	defer s.mu.Unlock()
256

257 258 259
	p, ok := s.paramMap[g.Name]
	if !ok {
		return fmt.Errorf("parameter: %s does not exist", g.Name)
260 261
	}

262
	return s.opt.UpdateParameter(p, g)
263 264
}

265 266
// GetParam gets parameters from the parameter server.
func (s *Service) GetParam(name string, parameter *Parameter) error {
267 268
	<-s.initialized
	s.mu.Lock()
H
Helin Wang 已提交
269
	defer s.mu.Unlock()
270

271 272 273
	p, ok := s.paramMap[name]
	if !ok {
		return fmt.Errorf("parameter: %s does not exist", name)
274 275
	}

276 277 278 279 280 281 282 283
	// The parameter content (a byte slice) may change
	// during RPC serialization due to write from other
	// goroutine, we allow it since mini-batch based deep
	// learning optimization methods are stochastic in
	// nature. This race condition is allowed deliberately
	// to save the program from making a copy of the
	// paramter content.
	*parameter = p
284 285 286
	return nil
}

H
Helin Wang 已提交
287 288
// Save tells the parameter server to save parameters.
func (s *Service) Save(path string, dummy *int) error {
289 290 291 292 293
	<-s.initialized

	// TODO
	return nil
}