#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defination of TrainerFactory."""

import threading
import time
import logging
import numpy as np
from paddle.fluid.log_helper import get_logger

from .trainer_desc import (
    MultiTrainer,
    DistMultiTrainer,
    PipelineTrainer,
    HeterXpuTrainer,
    PSGPUTrainer,
    HeterPipelineTrainer,
)
from .device_worker import (
    Hogwild,
    DownpourSGD,
    DownpourLite,
    Section,
    DownpourSGDOPT,
    HeterSection,
)
from .framework import Variable
from multiprocessing import Process, Manager

local_logger = get_logger(
    __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s'
)

__all__ = ["TrainerFactory", "FetchHandlerMonitor"]


class TrainerFactory:
    """
    Create trainer and device worker.
    If opt_info is not None, it will get configs from opt_info,
    otherwise create MultiTrainer and Hogwild.
    """

    def __init__(self):
        pass

    def _create_trainer(self, opt_info=None):
        trainer = None
        device_worker = None
        if not opt_info:
            # default is MultiTrainer + Hogwild
            trainer = MultiTrainer()
            device_worker = Hogwild()
            trainer._set_device_worker(device_worker)
        else:
            trainer_class = opt_info.get("trainer", "MultiTrainer")
            device_worker_class = opt_info.get("device_worker", "Hogwild")
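            # The class names from opt_info are looked up in this module's
            # globals, so any trainer or device worker imported above (e.g.
            # PSGPUTrainer, DownpourSGDOPT) can be selected by name.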
            trainer = globals()[trainer_class]()
            device_worker = globals()[device_worker_class]()

            # for debug tools
            if opt_info is not None:
                if opt_info.get("trainers") is not None:
                    trainer._set_trainers(opt_info["trainers"])
                if opt_info.get("trainer_id") is not None:
                    trainer._set_trainer_id(opt_info["trainer_id"])
                if opt_info.get("dump_slot") is not None:
                    trainer._set_dump_slot(opt_info["dump_slot"])
                if opt_info.get("mpi_rank") is not None:
                    trainer._set_mpi_rank(opt_info["mpi_rank"])
                if opt_info.get("mpi_size") is not None:
                    trainer._set_mpi_size(opt_info["mpi_size"])
                if (
                    opt_info.get("dump_fields") is not None
                    and len(opt_info.get("dump_fields")) != 0
                ):
                    trainer._set_dump_fields(opt_info["dump_fields"])
                if (
                    opt_info.get("dump_fields_path") is not None
                    and len(opt_info.get("dump_fields_path")) != 0
                ):
                    trainer._set_dump_fields_path(opt_info["dump_fields_path"])
                if opt_info.get("dump_file_num") is not None:
                    trainer._set_dump_file_num(opt_info["dump_file_num"])
                if opt_info.get("dump_converter") is not None:
                    trainer._set_dump_converter(opt_info["dump_converter"])
                if (
                    opt_info.get("dump_param") is not None
                    and len(opt_info.get("dump_param")) != 0
                ):
                    trainer._set_dump_param(opt_info["dump_param"])
                if opt_info.get("worker_places") is not None:
                    trainer._set_worker_places(opt_info["worker_places"])
                if opt_info.get("use_ps_gpu") is not None:
                    trainer._set_use_ps_gpu(opt_info["use_ps_gpu"])
                if opt_info.get("is_dump_in_simple_mode") is not None:
                    trainer._set_is_dump_in_simple_mode(
                        opt_info["is_dump_in_simple_mode"]
                    )
                if opt_info.get("enable_random_dump") is not None:
                    trainer._set_enable_random_dump(
                        opt_info["enable_random_dump"]
                    )
                if opt_info.get("dump_interval") is not None:
                    trainer._set_dump_interval(opt_info["dump_interval"])
                if opt_info.get("random_with_lineid") is not None:
                    trainer._set_random_with_lineid(
                        opt_info["random_with_lineid"]
                    )

            if "fleet_desc" in opt_info:
                device_worker._set_fleet_desc(opt_info["fleet_desc"])
                trainer._set_fleet_desc(opt_info["fleet_desc"])
                if opt_info.get("use_cvm") is not None:
                    trainer._set_use_cvm(opt_info["use_cvm"])
                if opt_info.get("no_cvm") is not None:
                    trainer._set_no_cvm(opt_info["no_cvm"])
                if (
                    opt_info.get("scale_sparse_gradient_with_batch_size")
                    is not None
                ):
                    trainer._set_scale_sparse_grad_with_batch_size(
                        opt_info["scale_sparse_gradient_with_batch_size"]
                    )
                if opt_info.get("scale_datanorm") is not None:
                    trainer._set_scale_datanorm(opt_info["scale_datanorm"])
                if opt_info.get("adjust_ins_weight") is not None:
                    trainer._set_adjust_ins_weight(
                        opt_info["adjust_ins_weight"]
                    )
                if opt_info.get("copy_table") is not None:
                    trainer._set_copy_table_config(opt_info["copy_table"])
                if opt_info.get("check_nan_var_names") is not None:
                    trainer._set_check_nan_var_names(
                        opt_info["check_nan_var_names"]
                    )
                if opt_info.get("loss_names") is not None:
                    trainer._set_loss_names(opt_info["loss_names"])
            trainer._set_device_worker(device_worker)
        return trainer


class FetchHandlerMonitor:
    """
    Defination of FetchHandlerMonitor class,
    it's for fetch handler.
    """

    def __init__(self, scope, handler):
        self.fetch_instance = handler
        self.fetch_thread = threading.Thread(
            target=self.handler_launch_func, args=(scope, self.fetch_instance)
        )
        self.running_lock = threading.Lock()
        self.running = False

    def handler_launch_func(self, scope, handler):
        fetch_instance = handler
        period_secs = fetch_instance.period_secs
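        # Build a reverse map from underlying Variable names to the
        # user-provided keys so fetched results are reported under the
        # original names.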
        var_name_to_key = {}
        for key in fetch_instance.var_dict:
            if isinstance(fetch_instance.var_dict[key], Variable):
                var_name_to_key[fetch_instance.var_dict[key].name] = key
            else:
                local_logger.warning(
                    "the value of {} is not a Variable".format(key)
                )
                var_name_to_key["None.var"] = key
        elapsed_secs = 0
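        # Poll once per second; every `period_secs` seconds, fetch the
        # tracked variables from the scope and hand the results to the
        # user's handler.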
        while True:
            self.running_lock.acquire()
            if not self.running:
                # release the lock before exiting the polling thread
                self.running_lock.release()
                break
            if elapsed_secs < period_secs:
                # TODO(guru4elephant): needs customized condition
                time.sleep(1)
                elapsed_secs += 1
            else:
                elapsed_secs = 0
                fetch_dict = {}
                for key in var_name_to_key:
                    var = scope.find_var(key)
                    fetch_dict[key] = var
                    if var is None:
                        local_logger.warning(
                            "{} value currently not available".format(
                                var_name_to_key[key]
                            )
                        )
                res_dict = {}
                for key in fetch_dict:
                    user_name = var_name_to_key[key]
                    if fetch_dict[key] is None:
                        res_dict[user_name] = None
                        continue
                    else:
                        res_dict[user_name] = fetch_dict[key].get_tensor()

                    lod = res_dict[user_name].lod()
                    if len(lod) > 0:
                        # LoD tensors carry variable-length structure and
                        # cannot be converted to a dense ndarray.
                        raise RuntimeError(
                            "Some of your fetched tensors hold LoD "
                            "information. They can not be completely cast "
                            "to a Python ndarray. We cannot return the "
                            "LoDTensor itself directly, please choose "
                            "other targets."
                        )
                    if res_dict[user_name]._is_initialized():
                        res_dict[user_name] = np.array(res_dict[user_name])
                    else:
                        res_dict[user_name] = None
                fetch_instance.handler(res_dict)
            self.running_lock.release()

    def start(self):
        """
        start monitor,
        it will start a monitor thread.
        """
        self.running_lock.acquire()
        self.running = True
        self.running_lock.release()
        self.fetch_thread.daemon = True
        self.fetch_thread.start()

    def stop(self):
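        """
        Stop the monitor; the polling thread exits after its current
        iteration.
        """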
        self.running_lock.acquire()
        self.running = False
        self.running_lock.release()