diff --git a/benchmark/cluster/vgg16/vgg16_fluid.py b/benchmark/cluster/vgg16/vgg16_fluid.py
index 6c47f6535c3cfc31ba165ccec07ac95a9bfd0a7f..6c7d2c10363d3e311dfae455f3dd8fcfc51077a0 100644
--- a/benchmark/cluster/vgg16/vgg16_fluid.py
+++ b/benchmark/cluster/vgg16/vgg16_fluid.py
@@ -240,8 +240,6 @@ def main():
             t = fluid.DistributeTranspiler()
             t.transpile(
-                optimize_ops,
-                params_grads,
                 trainer_id=args.task_index,
                 pservers=args.ps_hosts,
                 trainers=trainers)
diff --git a/doc/v2/howto/cluster/multi_cluster/index_en.rst b/doc/v2/howto/cluster/multi_cluster/index_en.rst
index b69bd5b2dbf1967d65558da06812d76f431c1d5a..9bc1eb2e3796d95dd69b165e916e263ea34b87f6 100644
--- a/doc/v2/howto/cluster/multi_cluster/index_en.rst
+++ b/doc/v2/howto/cluster/multi_cluster/index_en.rst
@@ -8,28 +8,28 @@ The user's cluster environment is not the same. To facilitate everyone's deploym
 .. toctree::
    :maxdepth: 1
 
-  k8s_cn.md
-  k8s_distributed_cn.md
+  k8s_en.md
+  k8s_distributed_en.md
 
 `OpenMPI `_ is a mature high-performance parallel computing framework, which is widely used in the field of HPC. The following guide describes how to use OpenMPI to build PaddlePaddle's cluster training task:
 
 .. toctree::
    :maxdepth: 1
 
-  openmpi_cn.md
+  openmpi_en.md
 
 `Fabric `_ is a convenient tool for program deployment and management. We provide a way to deploy and manage with Fabric. If you want to know more about it, please read the following guidelines:
 
 .. toctree::
    :maxdepth: 1
 
-  fabric_cn.md
+  fabric_en.md
 
 We also support the deployment of PaddlePaddle on AWS. Learn more about:
 
 .. toctree::
    :maxdepth: 1
 
-  k8s_aws_cn.md
+  k8s_aws_en.md
 
-The examples can be found under `cluster_train_v2 `_ .
\ No newline at end of file
+The examples can be found under `cluster_train_v2 `_ .
diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index bd325bd2574afa9c652a9613bdb3bf0b6a93b4a3..dcf4e2a8e013f8e4e70ac1335890e7df0a050b5f 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -21,14 +21,15 @@ import executor
 from executor import *
 
 import trainer
-from trainer import *
+from trainer import Trainer
+from trainer import BeginEpochEvent
+from trainer import EndEpochEvent
+from trainer import BeginStepEvent
+from trainer import EndStepEvent
 
 import inferencer
 from inferencer import Inferencer
 
-import params
-from params import Params
-
 import io
 import evaluator
 import initializer
@@ -57,7 +58,7 @@ from parallel_executor import ParallelExecutor
 Tensor = LoDTensor
 
 __all__ = framework.__all__ + executor.__all__ + concurrency.__all__ +\
-    trainer.__all__ + inferencer.__all__ + params.__all__ + [
+    trainer.__all__ + inferencer.__all__ + [
     'io',
     'initializer',
     'layers',
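
Note on the API change: `DistributeTranspiler.transpile` no longer takes `optimize_ops` and `params_grads`; the transpiler now recovers both from the program itself (see `_get_optimize_pass` in `distribute_transpiler.py` below). A minimal sketch of the new call pattern, assuming a main program that already contains optimize ops (the endpoint string is the signature's own default):

```python
import paddle.fluid as fluid

t = fluid.DistributeTranspiler()
# Only the cluster topology is passed now; the optimize ops are
# discovered by scanning the default main program.
t.transpile(trainer_id=0, pservers="127.0.0.1:6174", trainers=1)
```
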
diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py
index e63411782a31368d636bea5dbf4bb0e4cb480e1f..c180e7b21042a1ceb2651d8f7a48a02c1abfd02c 100644
--- a/python/paddle/fluid/distribute_transpiler.py
+++ b/python/paddle/fluid/distribute_transpiler.py
@@ -137,8 +137,6 @@ def split_dense_variable(var_list,
 
 class DistributeTranspiler:
     def transpile(self,
-                  optimize_ops,
-                  params_grads,
                   trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
@@ -169,11 +167,6 @@ class DistributeTranspiler:
         4. append ops that should run on current server instance.
         5. add listen_and_serv op
 
-        :param optimize_ops: op list of optimization, should be the
-            return value of Optimizer.minimize
-        :type optimize_ops: list
-        :param params_grads: list of tuple(weight, gradient)
-        :type params_grads: list
         :param trainer_id: one unique id for each trainer in a job.
         :type trainer_id: int
         :param program: program to transpile, default is default_main_program
@@ -194,7 +187,6 @@ class DistributeTranspiler:
             program = default_main_program()
         self.origin_program = program
         self.trainer_num = trainers
-        self.optimize_ops = optimize_ops
         self.sync_mode = sync_mode
         # TODO(typhoonzero): currently trainer_id is fetched from cluster system
         # like Kubernetes, we should port this to use etcd later when developing
@@ -202,6 +194,7 @@ class DistributeTranspiler:
         self.trainer_id = trainer_id
         pserver_endpoints = pservers.split(",")
         self.pserver_endpoints = pserver_endpoints
+        self.optimize_ops, params_grads = self._get_optimize_pass()
 
         # process lookup_table_op
         # 1. check all lookup_table_op is distributed
@@ -408,11 +401,8 @@ class DistributeTranspiler:
         # HACK: optimization global ops only used to scale beta1 and beta2
         # replace it with dependency engine.
         for op in self.optimize_ops:
-            if op.type == "scale":
-                for in_name in op.input_arg_names:
-                    if in_name.startswith("beta1_pow_acc") or \
-                            in_name.startswith("beta2_pow_acc"):
-                        global_ops.append(op)
+            if self._is_adam_connected_op(op):
+                global_ops.append(op)
 
         def __append_optimize_op__(op, block, grad_to_block_id):
             if self._is_opt_op(op):
@@ -1147,3 +1137,32 @@ class DistributeTranspiler:
                 # we only need to append op for once
                 break
         return lr_ops
+
+    def _get_optimize_pass(self):
+        block = self.origin_program.global_block()
+        opt_ops = []
+        params_grads = []
+        for op in block.ops:
+            if self._is_opt_op(op):
+                opt_ops.append(op)
+                params_grads.append((self.origin_program.global_block().var(
+                    op.input("Param")[0]),
+                                     self.origin_program.global_block().var(
+                                         op.input("Grad")[0])))
+            elif self._is_adam_connected_op(op):
+                opt_ops.append(op)
+            else:
+                pass
+        return opt_ops, params_grads
+
+    def _is_adam_connected_op(self, op):
+        """
+        A hack function to determine whether the input operator
+        is connected to an optimize operator.
+        """
+        if op.type == "scale":
+            for in_name in op.input_arg_names:
+                if in_name.startswith("beta1_pow_acc") or \
+                        in_name.startswith("beta2_pow_acc"):
+                    return True
+        return False
diff --git a/python/paddle/fluid/inferencer.py b/python/paddle/fluid/inferencer.py
index 3ea50bf196d00152e6579623c981ecbfb57b8e3b..58e027695a7100245dd424583e2cedeed3d165e6 100644
--- a/python/paddle/fluid/inferencer.py
+++ b/python/paddle/fluid/inferencer.py
@@ -12,18 +12,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import core
+
 __all__ = ['Inferencer', ]
 
 
 class Inferencer(object):
-    def __init__(self, network_func, params, place=None):
+    def __init__(self, network_func, param_path=None, place=None):
         # 1. we need to generate a framework.Program by calling
         # network_func. Reference: fluid.program_guard in test_word2vec.py
+
         # 2. move the default_main_program to self.program.
         # 3. run the default_startup program.
-        self.params = params
+
+        # 4. load params from param_path into scope
+        self.scope = core.Scope()
         self.place = place
 
     def infer(self, inputs):
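
With the transpiler hiding the optimize-op bookkeeping and `Inferencer` growing a `param_path` argument, the standalone `Params` object loses its job; the next hunk deletes it. A hypothetical `Inferencer` construction under the new signature, assuming an `inference_program` function like the one in the word2vec test further down:

```python
import paddle.fluid as fluid

inferencer = fluid.Inferencer(
    inference_program,                      # callable that builds the predict var
    param_path="word2vec.inference.model",  # directory written by Trainer.save_params
    place=fluid.CPUPlace())
```
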
diff --git a/python/paddle/fluid/params.py b/python/paddle/fluid/params.py
deleted file mode 100644
index a5d257e53a2958acd1b8f6ef29d0f9f531b36678..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/params.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from . import core
-
-__all__ = ['Params', ]
-
-
-class Params(object):
-    def __init__(self, path=None):
-        self.scope = core.Scope()
-
-        if path:
-            self._load(path)
-
-    def _load(self, path):
-        # reference: load_persistables in io.py
-        pass
-
-    def save(self, path):
-        # reference: save_persistables in io.py
-        pass
-
-    def add_params(self, scope):
-        # take the keys from the scope,
-        # if not already exists in self.scope,
-        # add the key and value into self.scope.
-        pass
diff --git a/python/paddle/fluid/tests/book/test_fit_a_line.py b/python/paddle/fluid/tests/book/test_fit_a_line.py
index 6dfc2997ae0328a41fe22d13dfa8fc51d4d021a6..ecb34699af0dc14782601702ab8afedbca7e1bfd 100644
--- a/python/paddle/fluid/tests/book/test_fit_a_line.py
+++ b/python/paddle/fluid/tests/book/test_fit_a_line.py
@@ -80,12 +80,7 @@ def train(use_cuda, save_dirname, is_local):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_image_classification.py b/python/paddle/fluid/tests/book/test_image_classification.py
index 09f994c37020c4612223e4168be4cf535157f60b..8ff4f6d47a9a8cf166d6e111a2d89e99dee74f4c 100644
--- a/python/paddle/fluid/tests/book/test_image_classification.py
+++ b/python/paddle/fluid/tests/book/test_image_classification.py
@@ -189,12 +189,7 @@ def train(net_type, use_cuda, save_dirname, is_local):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_label_semantic_roles.py b/python/paddle/fluid/tests/book/test_label_semantic_roles.py
index d9cd76952e31f8185512ab45f9f3ab2ce7d9da48..50ef29c4572f1b12fe9793bbf037cd7fe71a9e53 100644
--- a/python/paddle/fluid/tests/book/test_label_semantic_roles.py
+++ b/python/paddle/fluid/tests/book/test_label_semantic_roles.py
@@ -259,12 +259,7 @@ def train(use_cuda, save_dirname=None, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
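
The same mechanical rewrite lands in each book test: the six-line `transpile(...)` call collapses to one line, and the surrounding parameter-server scaffolding is untouched. Condensed from the hunks above, the trainer-side pattern looks like this (`pserver_endpoints` and `trainers` are read from the environment earlier in each test; the `get_trainer_program` call is what the tests do just past the visible context):

```python
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
t = fluid.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
if training_role != "PSERVER":
    # Train against the transpiled trainer-side program.
    trainer_prog = t.get_trainer_program()
```
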
diff --git a/python/paddle/fluid/tests/book/test_machine_translation.py b/python/paddle/fluid/tests/book/test_machine_translation.py
index 830d78df8b9e56b45f7e928562ef4b89e88f696d..46c6b9c29a265741a99655d5ac29244798f6fec2 100644
--- a/python/paddle/fluid/tests/book/test_machine_translation.py
+++ b/python/paddle/fluid/tests/book/test_machine_translation.py
@@ -231,12 +231,7 @@ def train_main(use_cuda, is_sparse, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_recognize_digits.py b/python/paddle/fluid/tests/book/test_recognize_digits.py
index 5ec6890c1b0dabd2804a92071b63c9610299e67c..c115aa4d7d6b514f9207543730e5e76cb0d2040c 100644
--- a/python/paddle/fluid/tests/book/test_recognize_digits.py
+++ b/python/paddle/fluid/tests/book/test_recognize_digits.py
@@ -162,12 +162,7 @@ def train(nn_type,
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_recommender_system.py b/python/paddle/fluid/tests/book/test_recommender_system.py
index 2172c275b8082689a6ff5f2c3c27a2ff4e92275a..d022dedbff805d597b68b5a47f7931f2dd946615 100644
--- a/python/paddle/fluid/tests/book/test_recommender_system.py
+++ b/python/paddle/fluid/tests/book/test_recommender_system.py
@@ -261,12 +261,7 @@ def train(use_cuda, save_dirname, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/test_understand_sentiment.py b/python/paddle/fluid/tests/book/test_understand_sentiment.py
index dedd153778d7ad9caeb5fa7090a980bc7f177dea..241778e303036d068dc0a40e4574a02eb97ad134 100644
--- a/python/paddle/fluid/tests/book/test_understand_sentiment.py
+++ b/python/paddle/fluid/tests/book/test_understand_sentiment.py
@@ -213,12 +213,7 @@ def train(word_dict,
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
        if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
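
For completeness, the parameter-server branch those tests share, which this patch leaves as-is (`exe` is a CPU `fluid.Executor` and `current_endpoint` comes from the environment, as elsewhere in the tests):

```python
if training_role == "PSERVER":
    pserver_prog = t.get_pserver_program(current_endpoint)
    pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
    exe.run(pserver_startup)
    exe.run(pserver_prog)  # blocks in listen_and_serv, serving the trainers
```
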
diff --git a/python/paddle/fluid/tests/book/test_word2vec.py b/python/paddle/fluid/tests/book/test_word2vec.py
index 8929779de9448d036e1528b64330b37463ab3988..6dec0f6857e86b4b9c1c67af934aa9bfdb1c3df7 100644
--- a/python/paddle/fluid/tests/book/test_word2vec.py
+++ b/python/paddle/fluid/tests/book/test_word2vec.py
@@ -145,12 +145,7 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname, is_local=True):
         trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
         training_role = os.getenv("TRAINING_ROLE", "TRAINER")
         t = fluid.DistributeTranspiler()
-        t.transpile(
-            optimize_ops,
-            params_grads,
-            trainer_id,
-            pservers=pserver_endpoints,
-            trainers=trainers)
+        t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
         if training_role == "PSERVER":
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
diff --git a/python/paddle/fluid/tests/book/word2vec/no_test_word2vec_new_api.py b/python/paddle/fluid/tests/book/word2vec/no_test_word2vec_new_api.py
index 30939cae29ddf7070f3e5d39f33dc88e86c65450..35e163dc9df5a35ee5774b6b157366c4eabcb0f7 100644
--- a/python/paddle/fluid/tests/book/word2vec/no_test_word2vec_new_api.py
+++ b/python/paddle/fluid/tests/book/word2vec/no_test_word2vec_new_api.py
@@ -39,7 +39,7 @@ word_dict = paddle.dataset.imikolov.build_dict()
 dict_size = len(word_dict)
 
 
-def inference_network(is_sparse):
+def inference_program(is_sparse):
     first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64')
     second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64')
     third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64')
@@ -79,9 +79,9 @@ def inference_network(is_sparse):
     return predict_word
 
 
-def train_network(is_sparse):
+def train_program(is_sparse):
     next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64')
-    predict_word = inference_network(is_sparse)
+    predict_word = inference_program(is_sparse)
     cost = fluid.layers.cross_entropy(input=predict_word, label=next_word)
     avg_cost = fluid.layers.mean(cost)
     return avg_cost
@@ -100,23 +100,25 @@ def train(use_cuda, is_sparse, save_path):
                     word_dict, N))
             if avg_cost < 5.0:
-                trainer.params.save(save_path)
+                trainer.save_params(save_path)
                 return
             if math.isnan(avg_cost):
                 sys.exit("got NaN loss, training failed.")
 
     trainer = fluid.Trainer(
-        partial(train_network, is_sparse),
+        partial(train_program, is_sparse),
         fluid.optimizer.SGD(learning_rate=0.001),
         place=place)
 
     trainer.train(
         reader=train_reader, num_epochs=100, event_handler=event_handler)
 
 
-def infer(use_cuda, save_path):
-    params = fluid.Params(save_path)
+def infer(use_cuda, is_sparse, save_path):
     place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-    inferencer = fluid.Inferencer(inference_network, params, place=place)
+    inferencer = fluid.Inferencer(
+        partial(inference_program, is_sparse),
+        param_path=save_path,
+        place=place)
 
     lod = [0, 1]
     first_word = create_random_lodtensor(lod, place, low=0, high=dict_size - 1)
@@ -138,7 +140,7 @@ def main(use_cuda, is_sparse):
     save_path = "word2vec.inference.model"
 
     train(use_cuda, is_sparse, save_path)
-    infer(use_cuda, save_path)
+    infer(use_cuda, is_sparse, save_path)
 
 
 if __name__ == '__main__':
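
The word2vec new-API test above shows the whole renamed surface in one place: `inference_network`/`train_network` become `inference_program`/`train_program`, `trainer.params.save(path)` becomes `trainer.save_params(path)`, and inference loads from `param_path`. A sketch of the checkpointing flow inside an event handler, with `trainer` and `save_path` in scope as in the test:

```python
import paddle.fluid as fluid

def event_handler(event):
    if isinstance(event, fluid.EndEpochEvent):
        # Checkpoint through the trainer itself; there is no Params
        # object holding the scope anymore.
        trainer.save_params(save_path)
```
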
diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py
index 2362da370a39b6e59f486e2affbb02e21840784f..0aada3deb0f047d21701b64af022ebad372d505b 100644
--- a/python/paddle/fluid/trainer.py
+++ b/python/paddle/fluid/trainer.py
@@ -56,23 +56,22 @@ class Trainer(object):
     """
 
     Args:
-        network_func(callable): A function which will return loss. The loss must be a scaler.
+        program_func(callable): A function which will return loss. The loss must be a scalar.
         optimizer(optimizer.Optimizer): The optimizer should be an instance
             of Optimizer
-        params:
         place: The device place of this trainer.
     """
 
-    def __init__(self, network_func, optimizer, params=None, place=None):
+    def __init__(self, program_func, optimizer, param_path=None, place=None):
         # 1. we need to generate a framework.Program by calling
-        # network_func. Reference: fluid.program_guard in
+        # program_func. Reference: fluid.program_guard in
         # test_word2vec.py
-        self.scope = self._get_scope_from_params(params)
+        self.scope = core.Scope()
 
         self.startup_program = framework.Program()
         self.train_program = framework.Program()
 
         with framework.program_guard(self.train_program, self.startup_program):
-            loss = network_func()
+            loss = program_func()
             if not isinstance(optimizer, opt_module.Optimizer):
                 raise TypeError(
                     "The optimizer should be an instance of Optimizer")
@@ -84,14 +83,13 @@ class Trainer(object):
         # 2. move the default_main_program to self.program and run the
         # default_startup program on an empty core.Scope()
         # Run startup program
-        if params is None:
-            exe = executor.Executor(place)
-            exe.run(self.startup_program, scope=self.scope)
+        exe = executor.Executor(place)
+        exe.run(self.startup_program, scope=self.scope)
 
-        # 3. call self.params.add_vars with the initialized scope, it
-        # will add the new vars of the initialized scope into
-        # self.params.
-        # TODO(yuyang): This depends on parameters implementation.
+        if param_path:
+            # load params from param_path into scope
+            # TODO(yuyang): This depends on parameters implementation.
+            pass
 
         # TODO(helin): support distributed training
 
@@ -124,19 +122,9 @@ class Trainer(object):
     def test(self, reader):
         pass
 
-    def _get_scope_from_params(self, params):
-        """
-        Get Scope from parameter object.
-        Args:
-            params(Parameter|None): The parameter object instance. Could be None.
-
-        Returns: New scope if params is None. Or params.scope()
-        NOTE: This method is WIP. Not fully implemented.
-        """
-        if params is None:
-            return core.Scope()  # new scope when params is None
-        else:
-            raise NotImplementedError("Not implemented right now.")
+    def save_params(self, param_path):
+        # reference: save_persistables in io.py
+        pass
 
     @staticmethod
     def _check_and_get_place(place):
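
`Trainer.save_params` is still a stub, but its "reference: save_persistables in io.py" comment points at the intended implementation. A speculative sketch, assuming the trainer records `self.place` and that `fluid.scope_guard` is used to point `save_persistables` at the trainer's private scope (this is an assumption, not committed code):

```python
import paddle.fluid as fluid
from paddle.fluid import executor, io

def save_params(self, param_path):
    # Persist every persistable variable of train_program from
    # self.scope into the param_path directory.
    exe = executor.Executor(self.place)
    with fluid.scope_guard(self.scope):
        io.save_persistables(exe, dirname=param_path,
                             main_program=self.train_program)
```
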