diff --git a/benchmark/cluster/vgg16/vgg16_fluid.py b/benchmark/cluster/vgg16/vgg16_fluid.py
index 8b29227cfab2a36d5b9f6d17b837b33da8d2a92e..7e0b8875444df11d9a55c5f35c7614c5f407df3f 100644
--- a/benchmark/cluster/vgg16/vgg16_fluid.py
+++ b/benchmark/cluster/vgg16/vgg16_fluid.py
@@ -239,8 +239,6 @@ def main():
 
         t = fluid.DistributeTranspiler()
         t.transpile(
-            optimize_ops,
-            params_grads,
             trainer_id=args.task_index,
             pservers=args.ps_hosts,
             trainers=trainers)
diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py
index e63411782a31368d636bea5dbf4bb0e4cb480e1f..079d90f585abd5d3094dfb281f955c754fd33474 100644
--- a/python/paddle/fluid/distribute_transpiler.py
+++ b/python/paddle/fluid/distribute_transpiler.py
@@ -137,8 +137,6 @@ def split_dense_variable(var_list,
 
 class DistributeTranspiler:
     def transpile(self,
-                  optimize_ops,
-                  params_grads,
                   trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
@@ -169,11 +167,6 @@
         4. append ops that should run on current server instance.
         5. add listen_and_serv op
 
-        :param optimize_ops: op list of optimization, should be the
-            return value of Optimizer.minimize
-        :type optimize_ops: list
-        :param params_grads: list of tuple(weight, gradient)
-        :type params_grads: list
         :param trainer_id: one unique id for each trainer in a job.
         :type trainer_id: int
         :param program: program to transpile, default is default_main_program
@@ -194,7 +187,6 @@ class DistributeTranspiler:
             program = default_main_program()
         self.origin_program = program
         self.trainer_num = trainers
-        self.optimize_ops = optimize_ops
         self.sync_mode = sync_mode
         # TODO(typhoonzero): currently trainer_id is fetched from cluster system
         # like Kubernetes, we should port this to use etcd later when developing
@@ -202,6 +194,7 @@
         self.trainer_id = trainer_id
         pserver_endpoints = pservers.split(",")
         self.pserver_endpoints = pserver_endpoints
+        self.optimize_ops, params_grads = self._get_optimize_pass()
 
         # process lookup_table_op
         # 1. check all lookup_table_op is distributed
@@ -1147,3 +1140,25 @@ class DistributeTranspiler:
                     # we only need to append op for once
                     break
         return lr_ops
+
+    def _get_optimize_pass(self):
+        block = self.origin_program.global_block()
+        opt_ops = []
+        params_grads = []
+        for op in block.ops:
+            if self._is_opt_op(op):
+                opt_ops.append(op)
+                params_grads.append((self.origin_program.global_block().var(
+                    op.input("Param")[0]),
+                                     self.origin_program.global_block().var(
+                                         op.input("Grad")[0])))
+            elif op.type == "scale":
+                # for adam optimize op
+                for in_name in op.input_arg_names:
+                    if in_name.startswith("beta1_pow_acc") or \
+                            in_name.startswith("beta2_pow_acc"):
+                        opt_ops.append(op)
+                        break
+            else:
+                pass
+        return opt_ops, params_grads
diff --git a/python/paddle/fluid/tests/book/test_fit_a_line.py b/python/paddle/fluid/tests/book/test_fit_a_line.py
index 6dfc2997ae0328a41fe22d13dfa8fc51d4d021a6..ecb34699af0dc14782601702ab8afedbca7e1bfd 100644
--- a/python/paddle/fluid/tests/book/test_fit_a_line.py
+++ b/python/paddle/fluid/tests/book/test_fit_a_line.py
@@ -80,12 +80,7 @@ def train(use_cuda, save_dirname, is_local):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_image_classification.py b/python/paddle/fluid/tests/book/test_image_classification.py
index 09f994c37020c4612223e4168be4cf535157f60b..8ff4f6d47a9a8cf166d6e111a2d89e99dee74f4c 100644
--- a/python/paddle/fluid/tests/book/test_image_classification.py
+++ b/python/paddle/fluid/tests/book/test_image_classification.py
@@ -189,12 +189,7 @@ def train(net_type, use_cuda, save_dirname, is_local):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_label_semantic_roles.py b/python/paddle/fluid/tests/book/test_label_semantic_roles.py
index d9cd76952e31f8185512ab45f9f3ab2ce7d9da48..50ef29c4572f1b12fe9793bbf037cd7fe71a9e53 100644
--- a/python/paddle/fluid/tests/book/test_label_semantic_roles.py
+++ b/python/paddle/fluid/tests/book/test_label_semantic_roles.py
@@ -259,12 +259,7 @@ def train(use_cuda, save_dirname=None, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_machine_translation.py b/python/paddle/fluid/tests/book/test_machine_translation.py
index 830d78df8b9e56b45f7e928562ef4b89e88f696d..46c6b9c29a265741a99655d5ac29244798f6fec2 100644
--- a/python/paddle/fluid/tests/book/test_machine_translation.py
+++ b/python/paddle/fluid/tests/book/test_machine_translation.py
@@ -231,12 +231,7 @@ def train_main(use_cuda, is_sparse, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_recognize_digits.py b/python/paddle/fluid/tests/book/test_recognize_digits.py
index 5ec6890c1b0dabd2804a92071b63c9610299e67c..c115aa4d7d6b514f9207543730e5e76cb0d2040c 100644
--- a/python/paddle/fluid/tests/book/test_recognize_digits.py
+++ b/python/paddle/fluid/tests/book/test_recognize_digits.py
@@ -162,12 +162,7 @@ def train(nn_type,
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_recommender_system.py b/python/paddle/fluid/tests/book/test_recommender_system.py
index 2172c275b8082689a6ff5f2c3c27a2ff4e92275a..d022dedbff805d597b68b5a47f7931f2dd946615 100644
--- a/python/paddle/fluid/tests/book/test_recommender_system.py
+++ b/python/paddle/fluid/tests/book/test_recommender_system.py
@@ -261,12 +261,7 @@ def train(use_cuda, save_dirname, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_understand_sentiment.py b/python/paddle/fluid/tests/book/test_understand_sentiment.py
index dedd153778d7ad9caeb5fa7090a980bc7f177dea..241778e303036d068dc0a40e4574a02eb97ad134 100644
--- a/python/paddle/fluid/tests/book/test_understand_sentiment.py
+++ b/python/paddle/fluid/tests/book/test_understand_sentiment.py
@@ -213,12 +213,7 @@ def train(word_dict,
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_word2vec.py b/python/paddle/fluid/tests/book/test_word2vec.py
index 8929779de9448d036e1528b64330b37463ab3988..6dec0f6857e86b4b9c1c67af934aa9bfdb1c3df7 100644
--- a/python/paddle/fluid/tests/book/test_word2vec.py
+++ b/python/paddle/fluid/tests/book/test_word2vec.py
@@ -145,12 +145,7 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
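
After this change, caller scripts no longer thread the return values of Optimizer.minimize() into the transpiler; transpile() recovers the optimize ops and (param, grad) pairs from the program itself via _get_optimize_pass(). Below is a minimal sketch of the updated trainer-side usage, modeled on the book tests touched above; the toy network, endpoint strings, and environment-variable defaults are illustrative and not part of the patch.

    import os
    import paddle.fluid as fluid

    # Toy fit_a_line-style network; minimize() still appends the optimize ops
    # to the default main program, but its return value is no longer passed
    # to transpile().
    x = fluid.layers.data(name='x', shape=[13], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    y_predict = fluid.layers.fc(input=x, size=1)
    cost = fluid.layers.square_error_cost(input=y_predict, label=y)
    avg_cost = fluid.layers.mean(cost)
    fluid.optimizer.SGD(learning_rate=0.001).minimize(avg_cost)

    trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID", "0"))
    training_role = os.getenv("TRAINING_ROLE", "TRAINER")
    pserver_endpoints = "127.0.0.1:6174"  # illustrative single parameter server
    current_endpoint = "127.0.0.1:6174"
    trainers = 1

    t = fluid.DistributeTranspiler()
    # New signature: optimize ops and (param, grad) pairs are discovered inside
    # transpile() by scanning the program's global block (_get_optimize_pass).
    t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)

    if training_role == "PSERVER":
        pserver_prog = t.get_pserver_program(current_endpoint)
        pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
    else:
        trainer_prog = t.get_trainer_program()

One caveat implied by the new code path: transpile() must run against a program that already contains the optimizer ops (i.e. after Optimizer.minimize()), otherwise _get_optimize_pass() has nothing to collect.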