cluster_trainer.py 4.4 KB
Newer Older
T
tangwei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Training use fluid with one node only.
"""

from __future__ import print_function

T
tangwei 已提交
21
import paddle.fluid as fluid
T
tangwei 已提交
22 23 24 25
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker

T
tangwei 已提交
26 27
from fleetrec.core.utils import envs
from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
T
tangwei 已提交
28 29


T
tangwei 已提交
30
class ClusterTrainer(TranspileTrainer):
T
tangwei 已提交
31 32 33 34
    def processor_register(self):
        role = PaddleCloudRoleMaker()
        fleet.init(role)

T
tangwei12 已提交
35
        if fleet.is_server():
T
tangwei 已提交
36 37 38 39 40 41
            self.regist_context_processor('uninit', self.instance)
            self.regist_context_processor('init_pass', self.init)
            self.regist_context_processor('server_pass', self.server)
        else:
            self.regist_context_processor('uninit', self.instance)
            self.regist_context_processor('init_pass', self.init)
T
tangwei 已提交
42 43 44 45 46

            if envs.get_platform() == "LINUX":
                self.regist_context_processor('train_pass', self.dataset_train)
            else:
                self.regist_context_processor('train_pass', self.dataloader_train)
T
tangwei 已提交
47 48 49
            self.regist_context_processor('terminal_pass', self.terminal)

    def build_strategy(self):
T
tangwei 已提交
50
        mode = envs.get_runtime_environ("train.trainer.strategy")
T
fix bug  
tangwei 已提交
51
        assert mode in ["async", "geo", "sync", "half_async"]
T
tangwei 已提交
52

T
tangwei 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
        strategy = None

        if mode == "async":
            strategy = StrategyFactory.create_async_strategy()
        elif mode == "geo":
            push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
            strategy = StrategyFactory.create_geo_strategy(push_num)
        elif mode == "sync":
            strategy = StrategyFactory.create_sync_strategy()
        elif mode == "half_async":
            strategy = StrategyFactory.create_half_async_strategy()

        assert strategy is not None

        return strategy

    def init(self, context):
T
tangwei 已提交
70
        self.model.train_net()
T
tangwei 已提交
71 72 73
        optimizer = self.model.optimizer()
        strategy = self.build_strategy()
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
T
tangwei 已提交
74
        optimizer.minimize(self.model.get_cost_op())
T
tangwei 已提交
75 76 77 78

        if fleet.is_server():
            context['status'] = 'server_pass'
        else:
T
tangwei 已提交
79 80 81 82 83 84 85 86
            self.fetch_vars = []
            self.fetch_alias = []
            self.fetch_period = self.model.get_fetch_period()

            metrics = self.model.get_metrics()
            if metrics:
                self.fetch_vars = metrics.values()
                self.fetch_alias = metrics.keys()
T
tangwei 已提交
87 88 89 90 91 92 93
            context['status'] = 'train_pass'

    def server(self, context):
        fleet.init_server()
        fleet.run_server()
        context['is_exit'] = True

T
tangwei 已提交
94 95 96 97
    def dataloader_train(self, context):
        pass

    def dataset_train(self, context):
T
tangwei 已提交
98
        self._exe.run(fleet.startup_program)
T
tangwei 已提交
99 100 101 102 103 104
        fleet.init_worker()

        dataset = self._get_dataset()
        epochs = envs.get_global_env("train.epochs")

        for i in range(epochs):
T
tangwei 已提交
105 106 107 108 109
            self._exe.train_from_dataset(program=fluid.default_main_program(),
                                         dataset=dataset,
                                         fetch_list=self.fetch_vars,
                                         fetch_info=self.fetch_alias,
                                         print_period=self.fetch_period)
T
tangwei 已提交
110
            self.save(i, "train", is_fleet=True)
T
tangwei 已提交
111
        context['status'] = 'terminal_pass'
T
tangwei 已提交
112 113 114 115 116 117 118 119 120
        fleet.stop_worker()

    def infer(self, context):
        context['status'] = 'terminal_pass'

    def terminal(self, context):
        for model in self.increment_models:
            print("epoch :{}, dir: {}".format(model[0], model[1]))
        context['is_exit'] = True