diff --git a/examples/ctr-dnn_train.yaml b/examples/ctr-dnn_train.yaml index 302fc110b65130784b12a3b946ed4b7fec194a59..2baf63f2ac9a609a1ac84ea9b366dc184674f2b0 100644 --- a/examples/ctr-dnn_train.yaml +++ b/examples/ctr-dnn_train.yaml @@ -29,6 +29,9 @@ train: threads: 12 epochs: 10 trainer: "SingleTraining" + role_maler: "PaddleCloudRoleMaker" + strategy: + mode: "async" reader: mode: "dataset" diff --git a/models/ctr_dnn/model.py b/models/ctr_dnn/model.py index cc3242d9b7ba86178f5d7aa498764bbd27f39693..44e0087d666118c075a03219fdc7e2931e1ba1cd 100644 --- a/models/ctr_dnn/model.py +++ b/models/ctr_dnn/model.py @@ -125,10 +125,6 @@ class Train(object): optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True) return optimizer - def optimize(self): - optimizer = self.optimizer() - optimizer.minimize(self.loss) - class Evaluate(object): def input(self): diff --git a/trainer/cluster_train.py b/trainer/cluster_train.py new file mode 100644 index 0000000000000000000000000000000000000000..8096ba3583a5d9b1fb2a5decacd4e56defdcd553 --- /dev/null +++ b/trainer/cluster_train.py @@ -0,0 +1,213 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Training use fluid with one node only. +""" + +from __future__ import print_function +import os +import time +import numpy as np +import logging +import paddle.fluid as fluid + +from .trainer import Trainer +from ..utils import envs + +from ..reader import dataset + +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory +from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker + +logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger("fluid") +logger.setLevel(logging.INFO) + + +def need_save(epoch_id, epoch_interval, is_last=False): + if is_last: + return True + + return epoch_id % epoch_interval == 0 + + +class ClusterTrainer(Trainer): + + def __init__(self, config=None, yaml_file=None): + Trainer.__init__(self, config, yaml_file) + + self.exe = fluid.Executor(fluid.CPUPlace()) + + self.regist_context_processor('uninit', self.instance) + self.regist_context_processor('init_pass', self.init) + self.regist_context_processor('server_pass', self.server) + self.regist_context_processor('train_pass', self.train) + self.regist_context_processor('terminal_pass', self.terminal) + + def build_role_maker(self): + role_maker = envs.get_global_env("train.role_maker") + + if role_maker == "PaddleCloudRoleMaker": + role = PaddleCloudRoleMaker() + return role + else: + raise ValueError("only support PaddleCloudRoleMaker now") + + def build_strategy(self): + mode = envs.get_global_env("train.strategy.mode") + strategy = None + + if mode == "async": + strategy = StrategyFactory.create_async_strategy() + elif mode == "geo": + push_num = envs.get_global_env("train.strategy.mode.push_num", 100) + strategy = StrategyFactory.create_geo_strategy(push_num) + elif mode == "sync": + strategy = StrategyFactory.create_sync_strategy() + elif mode == "half_async": + strategy = StrategyFactory.create_half_async_strategy() + + return strategy + + def instance(self, context): + model_package = __import__(envs.get_global_env("train.model.models")) + train_model = getattr(model_package, 'Train') + + self.model = train_model() + + context['status'] = 'init_pass' + + def init(self, context): + fleet.init(self.build_role_maker()) + + self.model.input() + self.model.net() + self.model.loss() + self.metrics = self.model.metrics() + self.loss = self.model.avg_loss() + + optimizer = self.model.get_optimizer() + strategy = self.build_strategy() + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(self.loss) + + if fleet.is_server(): + context['status'] = 'server_pass' + else: + context['status'] = 'train_pass' + + def server(self, context): + fleet.init_server() + fleet.run_server() + + context['status'] = 'wait' + + def terminal(self, context): + fleet.stop_worker() + context['is_exit'] = True + + def train(self, context): + print("Need to be implement") + context['is_exit'] = True + + +class ClusterTrainerWithDataloader(ClusterTrainer): + pass + + +class ClusterTrainerWithDataset(ClusterTrainer): + def _get_dataset(self, inputs, threads, batch_size, pipe_command, train_files_path): + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_use_var(inputs) + dataset.set_pipe_command(pipe_command) + dataset.set_batch_size(batch_size) + dataset.set_thread(threads) + file_list = [ + os.path.join(train_files_path, x) + for x in os.listdir(train_files_path) + ] + + dataset.set_filelist(file_list) + return dataset + + def save(self, epoch_id): + def save_inference_model(): + is_save_inference = envs.get_global_env("save.inference", False) + if not is_save_inference: + return + + save_interval = envs.get_global_env("save.inference.epoch_interval", 1) + if not need_save(epoch_id, save_interval, False): + return + + feed_varnames = envs.get_global_env("save.inference.feed_varnames", None) + fetch_varnames = envs.get_global_env("save.inference.fetch_varnames", None) + fetch_vars = [fluid.global_scope().vars[varname] for varname in fetch_varnames] + dirname = envs.get_global_env("save.inference.dirname", None) + + assert dirname is not None + dirname = os.path.join(dirname, str(epoch_id)) + fluid.io.save_inference_model(dirname, feed_varnames, fetch_vars, self.exe) + + def save_persistables(): + is_save_increment = envs.get_global_env("save.increment", False) + if not is_save_increment: + return + + save_interval = envs.get_global_env("save.increment.epoch_interval", 1) + if not need_save(epoch_id, save_interval, False): + return + + dirname = envs.get_global_env("save.inference.dirname", None) + + assert dirname is not None + dirname = os.path.join(dirname, str(epoch_id)) + fluid.io.save_persistables(self.exe, dirname) + + is_save = envs.get_global_env("save", False) + + if not is_save: + return + + save_persistables() + save_inference_model() + + def train(self, context): + inputs = self.model.input_vars() + threads = envs.get_global_env("threads") + batch_size = envs.get_global_env("batch_size") + pipe_command = envs.get_global_env("pipe_command") + train_data_path = envs.get_global_env("train_data_path") + + dataset = self._get_dataset(inputs, threads, batch_size, pipe_command, train_data_path) + + fleet.init_worker() + self.exe.run(fleet.startup_program) + + epochs = envs.get_global_env("epochs") + + for i in range(epochs): + self.exe.train_from_dataset(program=fluid.default_main_program(), + dataset=dataset, + fetch_list=[self.metrics], + fetch_info=["epoch {} auc ".format(i)], + print_period=100) + self.save(i) + + context['status'] = 'infer_pass' + + def infer(self, context): + context['status'] = 'terminal_pass' diff --git a/trainer/cluster_train_local.py b/trainer/cluster_train_local.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/trainer/single_train.py b/trainer/single_train.py index 20804543ebc7f042f853f744492ee3a2b11b6c48..9422dd5ab855bd880fff906ef413e748c52d52ad 100644 --- a/trainer/single_train.py +++ b/trainer/single_train.py @@ -62,9 +62,11 @@ class SingleTrainer(Trainer): def init(self, context): self.model.input() self.model.net() - self.model.loss() self.metrics = self.model.metrics() - self.model.optimize() + loss = self.model.avg_loss() + + optimizer = self.model.get_optimizer() + optimizer.minimize(loss) # run startup program at once self.exe.run(fluid.default_startup_program())