From 23b8346072f4bc88fd88cfac82933de501f9f739 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AD=A6=E6=AF=85?= Date: Mon, 17 Jul 2017 10:15:42 +0800 Subject: [PATCH] Fault tolerant distributed training, just work version, with etcd (#2849) * using etcd as fault tolerant training * update * workable version, ft not tested * small fix * update * remove TODO --- go/cmd/pserver/pserver.go | 2 +- go/master/client.go | 5 ++-- go/master/service.go | 1 + go/pserver/client/c/test/test_train.py | 28 ++++++++++++++++---- go/pserver/client/etcd_client.go | 5 ++-- go/pserver/etcd_client.go | 11 ++++---- paddle/api/PaddleAPI.h | 3 ++- paddle/api/ParameterUpdater.cpp | 5 ++-- paddle/scripts/docker/build.sh | 3 ++- paddle/trainer/NewRemoteParameterUpdater.cpp | 20 ++++++++++++-- paddle/trainer/NewRemoteParameterUpdater.h | 5 ++++ python/paddle/v2/dataset/common.py | 6 ++--- python/paddle/v2/master/client.py | 5 ++-- python/paddle/v2/optimizer.py | 8 +++--- python/paddle/v2/trainer.py | 6 +++-- 15 files changed, 81 insertions(+), 32 deletions(-) diff --git a/go/cmd/pserver/pserver.go b/go/cmd/pserver/pserver.go index b331b8126ca..652d7ba315d 100644 --- a/go/cmd/pserver/pserver.go +++ b/go/cmd/pserver/pserver.go @@ -40,7 +40,7 @@ func main() { idx = *index } else { e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *etcdTimeout) - idx, err = e.Register() + idx, err = e.Register(*port) candy.Must(err) cp, err = pserver.NewCheckpointFromFile(*checkpointPath, idx, e) diff --git a/go/master/client.go b/go/master/client.go index a2ca3f3ef8c..de883bf4b9a 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -2,6 +2,7 @@ package master import ( "os" + "time" "github.com/PaddlePaddle/Paddle/go/connection" "github.com/PaddlePaddle/recordio" @@ -36,9 +37,9 @@ func (c *Client) getRecords() { for { t, err := c.getTask() if err != nil { - // TODO(helin): wait before move on with next // getTask call. - log.Errorln(err) + log.Errorf("Get task failed, sleep 3 seconds and continue, %s", err) + time.Sleep(3 * time.Second) continue } diff --git a/go/master/service.go b/go/master/service.go index a6050ab9943..9cef2270ce6 100644 --- a/go/master/service.go +++ b/go/master/service.go @@ -215,6 +215,7 @@ func readChunks(globPaths []string) ([]Chunk, error) { } count := index.NumChunks() + log.Infof("readChunks: file %s has %d chunks", path, count) for i := 0; i < count; i++ { chunk := Chunk{ Path: path, diff --git a/go/pserver/client/c/test/test_train.py b/go/pserver/client/c/test/test_train.py index d6922672f4c..e9264592b4f 100644 --- a/go/pserver/client/c/test/test_train.py +++ b/go/pserver/client/c/test/test_train.py @@ -1,5 +1,23 @@ import paddle.v2 as paddle import paddle.v2.dataset.uci_housing as uci_housing +import paddle.v2.master as master +import os +import cPickle as pickle + +etcd_ip = os.getenv("MASTER_IP", "127.0.0.1") +etcd_endpoint = "http://" + etcd_ip + ":2379" + + +def cloud_reader(): + print "connecting to master, etcd endpoints: ", etcd_endpoint + master_client = master.client(etcd_endpoint, 5, 64) + master_client.set_dataset( + ["/pfs/dlnel/public/dataset/uci_housing/uci_housing-*-of-*"]) + while 1: + r, e = master_client.next_record() + if not r: + break + yield pickle.loads(r) def main(): @@ -22,13 +40,13 @@ def main(): # create optimizer of new remote updater to pserver optimizer = paddle.optimizer.Momentum(momentum=0) - #TODO(zhihong) : replace optimizer with new OptimizerConfig - + print "etcd endoint: ", etcd_endpoint trainer = paddle.trainer.SGD(cost=cost, parameters=parameters, update_equation=optimizer, is_local=False, - pserver_spec="localhost:3000") + pserver_spec=etcd_endpoint, + use_etcd=True) # event_handler to print training and testing info def event_handler(event): @@ -47,11 +65,11 @@ def main(): print "Test %d, %.2f" % (event.pass_id, result.cost) # training + # NOTE: use uci_housing.train() as reader for non-paddlecloud training trainer.train( reader=paddle.batch( paddle.reader.shuffle( - uci_housing.train(), buf_size=500), - batch_size=2), + cloud_reader, buf_size=500), batch_size=2), feeding={'x': 0, 'y': 1}, event_handler=event_handler, diff --git a/go/pserver/client/etcd_client.go b/go/pserver/client/etcd_client.go index 1fd3479aa88..8eb2a4f4511 100644 --- a/go/pserver/client/etcd_client.go +++ b/go/pserver/client/etcd_client.go @@ -12,6 +12,7 @@ import ( ) const ( + // DefaultEtcdTimeout is the default etcd timeout DefaultEtcdTimeout time.Duration = 5 * time.Second ) @@ -66,12 +67,12 @@ func (p *EtcdClient) List() []Server { for { for i := 0; i < psDesired; i++ { ctx, cancel := context.WithTimeout(context.Background(), p.timeout) - cancel() psKey := pserver.PsPath + strconv.Itoa(i) log.Debugf("checking %s", psKey) resp, err := p.client.Get(ctx, psKey) + cancel() if err != nil { - log.Infof("Get psKey= %s error, %v", psKey, err) + log.Infof("Get psKey=%s error, %v", psKey, err) time.Sleep(p.timeout) continue } diff --git a/go/pserver/etcd_client.go b/go/pserver/etcd_client.go index 4a694b97f47..66af4fa0b48 100644 --- a/go/pserver/etcd_client.go +++ b/go/pserver/etcd_client.go @@ -49,7 +49,7 @@ func NewEtcdClient(endpoints string, numPservers int, timeout time.Duration) *Et // Register registers the pserver on etcd // // Register returns the index of the current pserver. -func (e *EtcdClient) Register() (int, error) { +func (e *EtcdClient) Register(port int) (int, error) { var err error e.externalIP, err = networkhelper.GetExternalIP() @@ -116,7 +116,7 @@ func (e *EtcdClient) Register() (int, error) { for { ctx, cancel := context.WithTimeout(context.Background(), time.Second) var err error - pserverIdx, err = e.registerPserverEtcd(ctx) + pserverIdx, err = e.registerPserverEtcd(ctx, port) cancel() if err != nil { log.Warn(err) @@ -140,7 +140,7 @@ func (e *EtcdClient) initDesiredPservers(ctx context.Context, numPservers int) ( } // registerPserverEtcd registers pserver node on etcd using transaction. -func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) { +func (e *EtcdClient) registerPserverEtcd(ctx context.Context, port int) (int, error) { var idx int _, err := concurrency.NewSTM(e.etcdClient, func(c concurrency.STM) error { registered := false @@ -156,8 +156,9 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) { log.Fatal(err) } // find the first id and write info - c.Put(psKey, e.externalIP, clientv3.WithLease(resp.ID)) - log.Debugf("set pserver node %s with value %s", psKey, e.externalIP) + pserverAddr := e.externalIP + ":" + strconv.Itoa(port) + c.Put(psKey, pserverAddr, clientv3.WithLease(resp.ID)) + log.Debugf("set pserver node %s with value %s", psKey, pserverAddr) ch, kaerr := e.etcdClient.KeepAlive(context.TODO(), resp.ID) if kaerr != nil { log.Errorf("keepalive etcd node error: %v", kaerr) diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h index 5fb3d1c73bc..0b9b83d4297 100644 --- a/paddle/api/PaddleAPI.h +++ b/paddle/api/PaddleAPI.h @@ -843,7 +843,8 @@ public: bool useSparseUpdater); static ParameterUpdater* createNewRemoteUpdater( OptimizationConfig* config, - const std::string pserverSpec) throw(UnsupportError); + const std::string pserverSpec, + const bool useEtcd) throw(UnsupportError); ~ParameterUpdater(); /** diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp index 1aaefdfb810..5934cb898b5 100644 --- a/paddle/api/ParameterUpdater.cpp +++ b/paddle/api/ParameterUpdater.cpp @@ -33,11 +33,12 @@ ParameterUpdater *ParameterUpdater::createLocalUpdater( ParameterUpdater *ParameterUpdater::createNewRemoteUpdater( OptimizationConfig *config, - const std::string pserverSpec) throw(UnsupportError) { + const std::string pserverSpec, + const bool useEtcd) throw(UnsupportError) { #ifndef PADDLE_WITHOUT_GOLANG auto updater = new ParameterUpdater(); updater->m->updater.reset(new paddle::NewRemoteParameterUpdater( - config->m->getConfig(), pserverSpec)); + config->m->getConfig(), pserverSpec, useEtcd)); return updater; #else throw UnsupportError(); diff --git a/paddle/scripts/docker/build.sh b/paddle/scripts/docker/build.sh index ab60f1a38dd..3860facb099 100644 --- a/paddle/scripts/docker/build.sh +++ b/paddle/scripts/docker/build.sh @@ -155,7 +155,8 @@ RUN apt-get update &&\ paddle version ${DOCKERFILE_CUDNN_DSO} ${DOCKERFILE_GPU_ENV} - +ADD go/cmd/pserver/pserver /usr/bin/ +ADD go/cmd/master/master /usr/bin/ # default command shows the paddle version and exit CMD ["paddle", "version"] EOF diff --git a/paddle/trainer/NewRemoteParameterUpdater.cpp b/paddle/trainer/NewRemoteParameterUpdater.cpp index b359d9da216..a830ceba577 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.cpp +++ b/paddle/trainer/NewRemoteParameterUpdater.cpp @@ -28,6 +28,17 @@ NewRemoteParameterUpdater::NewRemoteParameterUpdater( newGradients_(nullptr), pserverSpec_(pserverSpec) {} +NewRemoteParameterUpdater::NewRemoteParameterUpdater( + const OptimizationConfig &config, + const std::string pserverSpec, + const bool useEtcd) + : trainerConfig_(config), + parameterClient_(-1), + newParameters_(nullptr), + newGradients_(nullptr), + pserverSpec_(pserverSpec), + useEtcd_(useEtcd) {} + void NewRemoteParameterUpdater::init( const std::vector ¶meters) { ParameterUpdater::init(parameters); @@ -38,8 +49,13 @@ void NewRemoteParameterUpdater::init( } // create parameter server client. - parameterClient_ = paddle_new_pserver_client((char *)pserverSpec_.c_str(), - FLAGS_trainer_id == 0); + if (useEtcd_) { + parameterClient_ = paddle_new_etcd_pserver_client( + (char *)pserverSpec_.c_str(), FLAGS_trainer_id == 0); + } else { + parameterClient_ = paddle_new_pserver_client((char *)pserverSpec_.c_str(), + FLAGS_trainer_id == 0); + } // init new parameter and gradient. newParameters_ = initNewParameter(PARAMETER_VALUE); diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h index dfed00bc216..6223ba427c9 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.h +++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -32,6 +32,9 @@ class NewRemoteParameterUpdater : public ParameterUpdater { public: NewRemoteParameterUpdater(const OptimizationConfig& config, const std::string pserverSpec); + NewRemoteParameterUpdater(const OptimizationConfig& config, + const std::string pserverSpec, + const bool useEtcd); ~NewRemoteParameterUpdater() { releaseNewParameter(newParameters_); releaseNewParameter(newGradients_); @@ -111,6 +114,8 @@ protected: paddle_parameter** newGradients_; /// the specification of parameter server "host1:port,host1:port" std::string pserverSpec_; + /// true if pserverSpec_ is etcd endpoint, else pserverSpec_ is pserver addr + bool useEtcd_; }; } // namespace paddle diff --git a/python/paddle/v2/dataset/common.py b/python/paddle/v2/dataset/common.py index 4a2eb59c340..a7990222749 100644 --- a/python/paddle/v2/dataset/common.py +++ b/python/paddle/v2/dataset/common.py @@ -22,6 +22,8 @@ import importlib import paddle.v2.dataset import cPickle import glob +import cPickle as pickle +import random __all__ = [ 'DATA_HOME', 'download', 'md5file', 'split', 'cluster_files_reader', @@ -170,8 +172,6 @@ def convert(output_path, name_prefix, max_lines_to_shuffle=1000): import recordio - import cPickle as pickle - import random """ Convert data from reader to recordio format files. @@ -201,7 +201,7 @@ def convert(output_path, def write_data(w, lines): random.shuffle(lines) for i, d in enumerate(lines): - d = pickle.dumps(d, pickle.HIGHEST_PROTOCOL) + d = cPickle.dumps(d) w[i % num_shards].write(d) w = open_writers() diff --git a/python/paddle/v2/master/client.py b/python/paddle/v2/master/client.py index 70f9e43c968..4c041fb5099 100644 --- a/python/paddle/v2/master/client.py +++ b/python/paddle/v2/master/client.py @@ -10,8 +10,9 @@ class client(object): client is a client to the master server. """ - def __init__(self, addr, buf_size): - self.c = lib.paddle_new_master_client(addr, buf_size) + def __init__(self, etcd_endpoints, timeout, buf_size): + self.c = lib.paddle_new_etcd_master_client(etcd_endpoints, timeout, + buf_size) def close(self): lib.paddle_release_master_client(self.c) diff --git a/python/paddle/v2/optimizer.py b/python/paddle/v2/optimizer.py index b6ee51cfe89..755b1e09d7f 100644 --- a/python/paddle/v2/optimizer.py +++ b/python/paddle/v2/optimizer.py @@ -46,12 +46,12 @@ class Optimizer(object): return swig_api.ParameterUpdater.createRemoteUpdater( self.__opt_conf__, pass_num, use_sparse_updater) - def __create_new_remote_updater__(self, pserver_spec): + def __create_new_remote_updater__(self, pserver_spec, use_etcd): return swig_api.ParameterUpdater.createNewRemoteUpdater( - self.__opt_conf__, pserver_spec) + self.__opt_conf__, pserver_spec, use_etcd) def create_updater(self, is_local, num_passes, use_sparse_updater, - pserver_spec): + pserver_spec, use_etcd): """ create proper parameter_updater by configuration. :param is_local: create local or remote parameter updater @@ -77,7 +77,7 @@ class Optimizer(object): num_passes, use_sparse_updater) else: parameter_updater = self.__create_new_remote_updater__( - pserver_spec) + pserver_spec, use_etcd) return parameter_updater diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 92fdf98e903..76bae0bb12b 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -45,7 +45,8 @@ class SGD(object): update_equation, extra_layers=None, is_local=True, - pserver_spec=None): + pserver_spec=None, + use_etcd=True): if not isinstance(parameters, v2_parameters.Parameters): raise TypeError('parameters should be parameters') @@ -61,6 +62,7 @@ class SGD(object): self.__topology_in_proto__ = topology.proto() self.__is_local__ = is_local self.__pserver_spec__ = pserver_spec + self.__use_etcd__ = use_etcd self.__use_sparse_updater__ = self.__topology__.use_sparse_updater() # # In local mode, disable sparse_remote_update. @@ -127,7 +129,7 @@ class SGD(object): self.__parameter_updater__ = self.__optimizer__.create_updater( self.__is_local__, num_passes, self.__use_sparse_updater__, - self.__pserver_spec__) + self.__pserver_spec__, self.__use_etcd__) self.__parameter_updater__.init(self.__gradient_machine__) self.__gradient_machine__.start() -- GitLab