cluster_trainer.py 6.7 KB
Newer Older
T
tangwei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training with fluid on a cluster, using the fleet parameter-server API.
"""

from __future__ import print_function

C
chengmo 已提交
20
import os
T
tangwei 已提交
21 22
import time

T
tangwei 已提交
23
import paddle.fluid as fluid
T
tangwei 已提交
24 25 26 27
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker

28 29
from paddlerec.core.utils import envs
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
T
tangwei 已提交
30 31


T
tangwei 已提交
32
class ClusterTrainer(TranspileTrainer):
T
tangwei 已提交
33
    def processor_register(self):
        """Initialise fleet and register the context processors for this role.

        A server registers the 'uninit', 'init_pass' and 'server_pass'
        processors.  Any other role registers 'uninit', 'init_pass',
        'startup_pass', 'train_pass', 'infer_pass' and 'terminal_pass'.

        The 'train_pass' processor is ``dataset_train`` when the platform is
        "LINUX" and the reader's ``dataset_class`` is not "DataLoader";
        otherwise it is ``dataloader_train``.
        """
        fleet.init(PaddleCloudRoleMaker())

        # Resolve the role first so the call order matches a plain
        # if/else on fleet.is_server().
        running_as_server = fleet.is_server()
        register = self.regist_context_processor

        # Stages shared by both roles.
        register('uninit', self.instance)
        register('init_pass', self.init)

        if running_as_server:
            register('server_pass', self.server)
            return

        register('startup_pass', self.startup)

        # The reader setting is only consulted on LINUX (short-circuit).
        use_dataset = envs.get_platform() == "LINUX" and envs.get_global_env(
            "dataset_class", None, "train.reader") != "DataLoader"
        train_processor = (self.dataset_train
                           if use_dataset else self.dataloader_train)
        register('train_pass', train_processor)

        register('infer_pass', self.infer)
        register('terminal_pass', self.terminal)

    def build_strategy(self):
        """Create the distributed strategy selected by the runtime settings.

        The mode is read from the ``train.trainer.strategy`` runtime
        environment entry and must be one of "async", "geo", "sync" or
        "half_async".  For "geo", the push number is read from
        ``train.strategy.mode.push_num`` (default 100).

        Returns:
            The strategy object produced by ``StrategyFactory``; it is also
            stored on ``self.strategy``.

        Raises:
            AssertionError: if the mode is not one of the four supported
                values, or if the factory returned ``None``.
        """
        supported_modes = ["async", "geo", "sync", "half_async"]
        mode = envs.get_runtime_environ("train.trainer.strategy")
        assert mode in supported_modes

        def _make_geo():
            push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
            return StrategyFactory.create_geo_strategy(push_num)

        # Lambdas keep every factory lookup lazy: only the selected mode's
        # factory is touched.
        factories = {
            "async": lambda: StrategyFactory.create_async_strategy(),
            "geo": _make_geo,
            "sync": lambda: StrategyFactory.create_sync_strategy(),
            "half_async": lambda: StrategyFactory.create_half_async_strategy(),
        }
        strategy = factories[mode]()

        assert strategy is not None

        self.strategy = strategy
        return strategy

    def init(self, context):
        """Build the training net and wrap its optimizer with fleet.

        Steps, in order:
          1. Build the model's training network and fetch its optimizer.
          2. If the configured optimizer name is not an SGD spelling (or the
             empty string), set ``FLAGS_communicator_is_sgd_optimizer`` to
             '0' in the process environment.
          3. Build the distributed strategy, wrap the optimizer with
             ``fleet.distributed_optimizer`` and minimise the average cost.
          4. On a server, move the context to 'server_pass'.  Otherwise
             record the fetch variables / aliases / period used by the train
             passes and move the context to 'startup_pass'.

        Args:
            context: mutable mapping; its 'status' entry is set to the next
                pass name.
        """
        self.model.train_net()
        optimizer = self.model.optimizer()
        optimizer_name = envs.get_global_env("hyper_parameters.optimizer",
                                             None, "train.model")
        if optimizer_name not in ["", "sgd", "SGD", "Sgd"]:
            os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'

        strategy = self.build_strategy()
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
        optimizer.minimize(self.model.get_avg_cost())

        if fleet.is_server():
            context['status'] = 'server_pass'
        else:
            self.fetch_vars = []
            self.fetch_alias = []
            self.fetch_period = self.model.get_fetch_period()

            metrics = self.model.get_metrics()
            if metrics:
                # Materialise as lists: on Python 3, dict.values()/keys()
                # return views that cannot be indexed, and these attributes
                # are later passed as fetch_list / fetch_info.  Lists also
                # match the type of the empty defaults above.
                self.fetch_vars = list(metrics.values())
                self.fetch_alias = list(metrics.keys())
            context['status'] = 'startup_pass'
T
tangwei 已提交
101 102 103 104 105 106

    def server(self, context):
        """Initialise and run the fleet server, then flag the context for exit.

        Args:
            context: mutable mapping; its 'is_exit' entry is set to True once
                ``fleet.run_server()`` returns.
        """
        fleet.init_server()
        fleet.run_server()
        context['is_exit'] = True

C
chengmo 已提交
107
    def startup(self, context):
        """Run fleet's startup program, then hand over to the train pass.

        Args:
            context: mutable mapping; its 'status' entry is set to
                'train_pass'.
        """
        executor = self._exe
        executor.run(fleet.startup_program)

        context['status'] = 'train_pass'
T
tangwei 已提交
110

C
chengmo 已提交
111
    def dataloader_train(self, context):
        """Train for the configured number of epochs using the dataloader.

        Initialises the fleet worker, compiles ``fleet.main_program`` for
        data-parallel execution with the build/execute strategies taken from
        ``self.strategy``, then for every epoch runs batches until the reader
        raises ``fluid.core.EOFException``.  Metrics are printed whenever
        ``batch_id`` is a non-zero multiple of ``self.fetch_period``, and the
        model is saved after each epoch.  Finally the worker is stopped and
        the context is moved to 'infer_pass'.

        Args:
            context: mutable mapping; its 'status' entry is set to
                'infer_pass' when training finishes.
        """
        fleet.init_worker()

        reader = self._get_dataloader()
        epochs = envs.get_global_env("train.epochs")

        program = fluid.compiler.CompiledProgram(
            fleet.main_program).with_data_parallel(
                loss_name=self.model.get_avg_cost().name,
                build_strategy=self.strategy.get_build_strategy(),
                exec_strategy=self.strategy.get_execute_strategy())

        # init() treats get_metrics() as possibly empty/None, so guard here
        # too instead of calling .items() on it unconditionally.
        model_metrics = self.model.get_metrics() or {}

        metrics_varnames = []
        labels = ["epoch", "batch"]
        for name, var in model_metrics.items():
            metrics_varnames.append(var.name)
            labels.append(name)

        # One "<label>: {}" placeholder per value, e.g. "epoch: {}, batch: {}".
        metrics_format = ", ".join(
            "{}: {{}}".format(label) for label in labels)

        for epoch in range(epochs):
            reader.start()
            batch_id = 0
            try:
                while True:
                    metrics_rets = self._exe.run(program=program,
                                                 fetch_list=metrics_varnames)

                    # Only assemble the printable values on batches that are
                    # actually reported.
                    if batch_id % self.fetch_period == 0 and batch_id != 0:
                        print(
                            metrics_format.format(epoch, batch_id,
                                                  *metrics_rets))
                    batch_id += 1
            except fluid.core.EOFException:
                # The reader is exhausted for this epoch; reset it so the
                # next epoch can start it again.
                reader.reset()
            self.save(epoch, "train", is_fleet=True)

        fleet.stop_worker()
        context['status'] = 'infer_pass'
T
tangwei 已提交
155

T
tangwei 已提交
156 157 158 159
    def dataset_train(self, context):
        """Train for the configured number of epochs using a dataset.

        Initialises the fleet worker, then for every epoch calls
        ``train_from_dataset`` on the default main program with the fetch
        variables, aliases and period stored on ``self``.  After each epoch
        it prints the elapsed time and throughput (the value returned by
        ``_get_dataset_ins`` divided by the elapsed seconds) and saves the
        model.  Finally the worker is stopped and the context is moved to
        'infer_pass'.

        Args:
            context: mutable mapping; its 'status' entry is set to
                'infer_pass' when training finishes.
        """
        fleet.init_worker()

        train_dataset = self._get_dataset()
        instance_count = self._get_dataset_ins()
        epoch_total = envs.get_global_env("train.epochs")

        report = "epoch {} using time {}, speed {:.2f} lines/s"
        for epoch_id in range(epoch_total):
            started_at = time.time()
            self._exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=train_dataset,
                fetch_list=self.fetch_vars,
                fetch_info=self.fetch_alias,
                print_period=self.fetch_period)
            elapsed = time.time() - started_at

            print(report.format(epoch_id, elapsed, instance_count / elapsed))

            self.save(epoch_id, "train", is_fleet=True)

        fleet.stop_worker()
        context['status'] = 'infer_pass'
T
tangwei 已提交
180 181 182 183 184

    def terminal(self, context):
        """Print every entry of ``self.increment_models`` and request exit.

        Each entry is indexed as ``entry[0]`` (printed as the epoch) and
        ``entry[1]`` (printed as the directory).

        Args:
            context: mutable mapping; its 'is_exit' entry is set to True.
        """
        line_template = "epoch :{}, dir: {}"
        for entry in self.increment_models:
            print(line_template.format(entry[0], entry[1]))

        context['is_exit'] = True