single_trainer.py 4.4 KB
Newer Older
T
tangwei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Training with fluid on a single node only.
"""

from __future__ import print_function
import logging
import paddle.fluid as fluid

T
rename  
tangwei 已提交
23 24
from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
from fleetrec.core.utils import envs
T
tangwei 已提交
25
import numpy as np
T
tangwei 已提交
26 27 28 29 30 31

# Configure the logging format (timestamp - level - message) and expose a
# module-level logger named "fluid" that reports INFO and above.
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)


T
tangwei 已提交
32
class SingleTrainer(TranspileTrainer):
    """Trainer for the one-node case.

    The run is organised as a chain of named passes. Every pass handler
    receives the shared ``context`` dict and writes ``context['status']`` to
    select the pass that runs next; ``terminal`` sets ``context['is_exit']``
    to end the run.
    """

    def processor_register(self):
        """Register the handler of every pass.

        The dataset-based training handler is used only when the platform is
        ``"LINUX"`` and the ``dataset_class`` setting under ``train.reader``
        is not ``"DataLoader"``; in every other case the DataLoader-based
        handler is registered for ``train_pass``.
        """
        self.regist_context_processor('uninit', self.instance)
        self.regist_context_processor('init_pass', self.init)
        self.regist_context_processor('startup_pass', self.startup)

        if (envs.get_platform() == "LINUX" and
                envs.get_global_env("dataset_class", None,
                                    "train.reader") != "DataLoader"):
            self.regist_context_processor('train_pass', self.dataset_train)
        else:
            self.regist_context_processor('train_pass', self.dataloader_train)

        self.regist_context_processor('infer_pass', self.infer)
        self.regist_context_processor('terminal_pass', self.terminal)

    def init(self, context):
        """Build the training network and record what to fetch while training.

        Sets ``self.fetch_vars``, ``self.fetch_alias`` and
        ``self.fetch_period`` and then moves on to ``startup_pass``.
        """
        self.model.train_net()
        optimizer = self.model.optimizer()
        optimizer.minimize(self.model.get_cost_op())

        self.fetch_vars = []
        self.fetch_alias = []
        self.fetch_period = self.model.get_fetch_period()

        metrics = self.model.get_metrics()
        if metrics:
            # Materialize as lists: on Python 3 ``dict.values()`` and
            # ``dict.keys()`` are views that cannot be indexed, while the
            # fetch list and its aliases are consumed position by position.
            self.fetch_vars = list(metrics.values())
            self.fetch_alias = list(metrics.keys())
        context['status'] = 'startup_pass'

    def startup(self, context):
        """Run the default startup program, then move on to ``train_pass``."""
        self._exe.run(fluid.default_startup_program())
        context['status'] = 'train_pass'

    def dataloader_train(self, context):
        """Train for ``train.epochs`` epochs, feeding data through a DataLoader.

        Metric values are printed every 10th batch (batch 0 excluded). After
        each epoch the model is saved via ``self.save`` with the same
        arguments ``dataset_train`` uses. Moves on to ``infer_pass`` when all
        epochs are done.
        """
        reader = self._get_dataloader()
        epochs = envs.get_global_env("train.epochs")

        program = fluid.compiler.CompiledProgram(
            fluid.default_main_program()).with_data_parallel(
                loss_name=self.model.get_cost_op().name)

        metrics_varnames = []
        metrics_format = [
            "{}: {{}}".format("epoch"),
            "{}: {{}}".format("batch"),
        ]

        # ``init`` accepts a model without metrics, so do the same here
        # instead of failing on ``None.items()``.
        model_metrics = self.model.get_metrics() or {}
        for name, var in model_metrics.items():
            metrics_varnames.append(var.name)
            metrics_format.append("{}: {{}}".format(name))

        metrics_format = ", ".join(metrics_format)

        for epoch in range(epochs):
            reader.start()
            batch_id = 0
            try:
                while True:
                    metrics_rets = self._exe.run(
                        program=program,
                        fetch_list=metrics_varnames)

                    metrics = [epoch, batch_id]
                    metrics.extend(metrics_rets)

                    if batch_id % 10 == 0 and batch_id != 0:
                        print(metrics_format.format(*metrics))
                    batch_id += 1
            except fluid.core.EOFException:
                # The reader signals the end of an epoch by raising.
                reader.reset()

            # Keep this path consistent with ``dataset_train``, which saves
            # after every epoch.
            self.save(epoch, "train", is_fleet=False)

        context['status'] = 'infer_pass'

    def dataset_train(self, context):
        """Train for ``train.epochs`` epochs from a dataset.

        Each epoch is one ``train_from_dataset`` call that reports the
        metrics collected in ``init``, followed by a ``self.save`` call.
        Moves on to ``infer_pass`` when all epochs are done.
        """
        dataset = self._get_dataset()
        epochs = envs.get_global_env("train.epochs")

        for i in range(epochs):
            self._exe.train_from_dataset(program=fluid.default_main_program(),
                                         dataset=dataset,
                                         fetch_list=self.fetch_vars,
                                         fetch_info=self.fetch_alias,
                                         print_period=self.fetch_period)
            self.save(i, "train", is_fleet=False)
        context['status'] = 'infer_pass'

    def infer(self, context):
        """Do no work in this pass; hand over directly to ``terminal_pass``."""
        context['status'] = 'terminal_pass'

    def terminal(self, context):
        """Print each entry of ``self.increment_models`` and request exit."""
        for model in self.increment_models:
            print("epoch :{}, dir: {}".format(model[0], model[1]))
        context['is_exit'] = True