From da960ada49694b09e2f74cfeec4774c394ce066f Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Wed, 2 May 2018 16:14:25 +0800
Subject: [PATCH] redefine distribute transpiler api

---
 benchmark/cluster/vgg16/vgg16_fluid.py        |  2 --
 python/paddle/fluid/distribute_transpiler.py  | 31 ++++++++++++++-----
 .../fluid/tests/book/test_fit_a_line.py       |  7 +----
 .../tests/book/test_image_classification.py   |  7 +----
 .../tests/book/test_label_semantic_roles.py   |  7 +----
 .../tests/book/test_machine_translation.py    |  7 +----
 .../fluid/tests/book/test_recognize_digits.py |  7 +----
 .../tests/book/test_recommender_system.py     |  7 +----
 .../tests/book/test_understand_sentiment.py   |  7 +----
 .../paddle/fluid/tests/book/test_word2vec.py  |  7 +----
 10 files changed, 31 insertions(+), 58 deletions(-)

diff --git a/benchmark/cluster/vgg16/vgg16_fluid.py b/benchmark/cluster/vgg16/vgg16_fluid.py
index 8b29227cfab..7e0b8875444 100644
--- a/benchmark/cluster/vgg16/vgg16_fluid.py
+++ b/benchmark/cluster/vgg16/vgg16_fluid.py
@@ -239,8 +239,6 @@ def main():
 
         t = fluid.DistributeTranspiler()
         t.transpile(
-            optimize_ops,
-            params_grads,
             trainer_id=args.task_index,
             pservers=args.ps_hosts,
             trainers=trainers)
diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py
index e63411782a3..079d90f585a 100644
--- a/python/paddle/fluid/distribute_transpiler.py
+++ b/python/paddle/fluid/distribute_transpiler.py
@@ -137,8 +137,6 @@ def split_dense_variable(var_list,
 
 class DistributeTranspiler:
     def transpile(self,
-                  optimize_ops,
-                  params_grads,
                   trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
@@ -169,11 +167,6 @@ class DistributeTranspiler:
         4. append ops that should run on current server instance.
         5. add listen_and_serv op
 
-        :param optimize_ops: op list of optimization, should be the
-            return value of Optimizer.minimize
-        :type optimize_ops: list
-        :param params_grads: list of tuple(weight, gradient)
-        :type params_grads: list
         :param trainer_id: one unique id for each trainer in a job.
         :type trainer_id: int
         :param program: program to transpile, default is default_main_program
@@ -194,7 +187,6 @@ class DistributeTranspiler:
             program = default_main_program()
         self.origin_program = program
         self.trainer_num = trainers
-        self.optimize_ops = optimize_ops
         self.sync_mode = sync_mode
         # TODO(typhoonzero): currently trainer_id is fetched from cluster system
         # like Kubernetes, we should port this to use etcd later when developing
@@ -202,6 +194,7 @@ class DistributeTranspiler:
         self.trainer_id = trainer_id
         pserver_endpoints = pservers.split(",")
         self.pserver_endpoints = pserver_endpoints
+        self.optimize_ops, params_grads = self._get_optimize_pass()
 
         # process lookup_table_op
         # 1. check all lookup_table_op is distributed
@@ -1147,3 +1140,25 @@ class DistributeTranspiler:
                     # we only need to append op for once
                     break
         return lr_ops
+
+    def _get_optimize_pass(self):
+        block = self.origin_program.global_block()
+        opt_ops = []
+        params_grads = []
+        for op in block.ops:
+            if self._is_opt_op(op):
+                opt_ops.append(op)
+                params_grads.append((self.origin_program.global_block().var(
+                    op.input("Param")[0]),
+                                     self.origin_program.global_block().var(
+                                         op.input("Grad")[0])))
+            elif op.type == "scale":
+                # for adam optimize op
+                for in_name in op.input_arg_names:
+                    if in_name.startswith("beta1_pow_acc") or \
+                            in_name.startswith("beta2_pow_acc"):
+                        opt_ops.append(op)
+                        break
+            else:
+                pass
+        return opt_ops, params_grads
diff --git a/python/paddle/fluid/tests/book/test_fit_a_line.py b/python/paddle/fluid/tests/book/test_fit_a_line.py
index 6dfc2997ae0..ecb34699af0 100644
--- a/python/paddle/fluid/tests/book/test_fit_a_line.py
+++ b/python/paddle/fluid/tests/book/test_fit_a_line.py
@@ -80,12 +80,7 @@ def train(use_cuda, save_dirname, is_local):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_image_classification.py b/python/paddle/fluid/tests/book/test_image_classification.py
index 09f994c3702..8ff4f6d47a9 100644
--- a/python/paddle/fluid/tests/book/test_image_classification.py
+++ b/python/paddle/fluid/tests/book/test_image_classification.py
@@ -189,12 +189,7 @@ def train(net_type, use_cuda, save_dirname, is_local):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_label_semantic_roles.py b/python/paddle/fluid/tests/book/test_label_semantic_roles.py
index d9cd76952e3..50ef29c4572 100644
--- a/python/paddle/fluid/tests/book/test_label_semantic_roles.py
+++ b/python/paddle/fluid/tests/book/test_label_semantic_roles.py
@@ -259,12 +259,7 @@ def train(use_cuda, save_dirname=None, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_machine_translation.py b/python/paddle/fluid/tests/book/test_machine_translation.py
index 830d78df8b9..46c6b9c29a2 100644
--- a/python/paddle/fluid/tests/book/test_machine_translation.py
+++ b/python/paddle/fluid/tests/book/test_machine_translation.py
@@ -231,12 +231,7 @@ def train_main(use_cuda, is_sparse, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_recognize_digits.py b/python/paddle/fluid/tests/book/test_recognize_digits.py
index 5ec6890c1b0..c115aa4d7d6 100644
--- a/python/paddle/fluid/tests/book/test_recognize_digits.py
+++ b/python/paddle/fluid/tests/book/test_recognize_digits.py
@@ -162,12 +162,7 @@ def train(nn_type,
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_recommender_system.py b/python/paddle/fluid/tests/book/test_recommender_system.py
index 2172c275b80..d022dedbff8 100644
--- a/python/paddle/fluid/tests/book/test_recommender_system.py
+++ b/python/paddle/fluid/tests/book/test_recommender_system.py
@@ -261,12 +261,7 @@ def train(use_cuda, save_dirname, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_understand_sentiment.py b/python/paddle/fluid/tests/book/test_understand_sentiment.py
index dedd153778d..241778e3030 100644
--- a/python/paddle/fluid/tests/book/test_understand_sentiment.py
+++ b/python/paddle/fluid/tests/book/test_understand_sentiment.py
@@ -213,12 +213,7 @@ def train(word_dict,
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_word2vec.py b/python/paddle/fluid/tests/book/test_word2vec.py
index 8929779de94..6dec0f6857e 100644
--- a/python/paddle/fluid/tests/book/test_word2vec.py
+++ b/python/paddle/fluid/tests/book/test_word2vec.py
@@ -145,12 +145,7 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
-- 
GitLab
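
Note for reviewers: below is a minimal usage sketch of the new API, mirroring the book test scripts changed above. The endpoint strings and trainer count are illustrative assumptions, and the second get_startup_program argument plus the get_trainer_program() call come from the full scripts rather than this diff, so treat those as assumptions too. The point is that optimize_ops and params_grads no longer cross the public API; transpile() recovers them internally via _get_optimize_pass().

    import os

    import paddle.fluid as fluid

    # Illustrative cluster settings; a real job reads these from its scheduler.
    pserver_endpoints = "127.0.0.1:6174"  # comma-separated pserver list
    current_endpoint = "127.0.0.1:6174"   # this process's endpoint (pserver role)
    trainers = 2
    trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID", "0"))
    training_role = os.getenv("TRAINING_ROLE", "TRAINER")

    t = fluid.DistributeTranspiler()
    # New signature: cluster topology only; the optimize ops and the
    # (param, grad) pairs are discovered by scanning the main program.
    t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)

    if training_role == "PSERVER":
        pserver_prog = t.get_pserver_program(current_endpoint)
        pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
    else:
        trainer_prog = t.get_trainer_program()

One design note: deriving the optimizer ops inside the transpiler, rather than threading the results of Optimizer.minimize through every caller, keeps all call sites to a one-line transpile(). The cost is the heuristic scan in _get_optimize_pass, which must special-case "scale" ops feeding beta1_pow_acc / beta2_pow_acc inputs so that Adam's accumulator updates travel with the optimize pass.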