# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import subprocess
import sys
import argparse
import tempfile
import copy

from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs
from paddlerec.core.utils import util
from paddlerec.core.utils import validation

engines = {}
device = ["CPU", "GPU"]
engine_choices = [
    "TRAIN", "SINGLE_TRAIN", "INFER", "SINGLE_INFER", "LOCAL_CLUSTER",
    "LOCAL_CLUSTER_TRAIN", "CLUSTER_TRAIN"
]


def engine_registry():
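    # Wire each (transpiler type, engine name) pair to the launcher function
    # that knows how to start it.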
    engines["TRANSPILER"] = {}
    engines["PSLIB"] = {}

    engines["TRANSPILER"]["TRAIN"] = single_train_engine
    engines["TRANSPILER"]["SINGLE_TRAIN"] = single_train_engine
    engines["TRANSPILER"]["INFER"] = single_infer_engine
    engines["TRANSPILER"]["SINGLE_INFER"] = single_infer_engine
    engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine
    engines["TRANSPILER"]["LOCAL_CLUSTER_TRAIN"] = local_cluster_engine
    engines["TRANSPILER"]["CLUSTER"] = cluster_engine
    engines["PSLIB"]["SINGLE_TRAIN"] = local_mpi_engine
    engines["PSLIB"]["TRAIN"] = local_mpi_engine
    engines["PSLIB"]["LOCAL_CLUSTER_TRAIN"] = local_mpi_engine
    engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine
    engines["PSLIB"]["CLUSTER_TRAIN"] = cluster_mpi_engine
    engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine


def get_inters_from_yaml(file, filters):
    _envs = envs.load_yaml(file)
    flattens = envs.flatten_environs(_envs)
    inters = {}
    for k, v in flattens.items():
        for f in filters:
            if k.startswith(f):
                inters[k] = v
    return inters


def get_all_inters_from_yaml(file, filters):
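    # Recursively flatten the nested yaml config into dotted keys; entries of
    # the dataset/phase/runner lists are keyed by their "name". Only keys that
    # start with one of the given filter prefixes are returned.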
    _envs = envs.load_yaml(file)
    all_flattens = {}

    def fatten_env_namespace(namespace_nests, local_envs):
        for k, v in local_envs.items():
            if isinstance(v, dict):
                nests = copy.deepcopy(namespace_nests)
                nests.append(k)
                fatten_env_namespace(nests, v)
            elif (k == "dataset" or k == "phase" or
                  k == "runner") and isinstance(v, list):
                for i in v:
                    if i.get("name") is None:
                        raise ValueError(
                            "name must be set in each {} list entry: {}".
                            format(k, v))
                    nests = copy.deepcopy(namespace_nests)
                    nests.append(k)
                    nests.append(i["name"])
                    fatten_env_namespace(nests, i)
            else:
                global_k = ".".join(namespace_nests + [k])
                all_flattens[global_k] = v

    fatten_env_namespace([], _envs)
    ret = {}
    for k, v in all_flattens.items():
        for f in filters:
            if k.startswith(f):
                ret[k] = v
    return ret


def get_modes(running_config):
    if not isinstance(running_config, dict):
        raise ValueError("get_modes argument must be a dict")

    modes = running_config.get("mode")
    if not modes:
        raise ValueError("yaml must have config: mode")

    return modes


def get_engine(args, running_config, mode):
    transpiler = get_transpiler()
    _envs = envs.load_yaml(args.model)

    engine_class = ".".join(["runner", mode, "class"])
    engine_device = ".".join(["runner", mode, "device"])
    device_gpu_choices = ".".join(["runner", mode, "selected_gpus"])

    engine = running_config.get(engine_class, None)
    if engine is None:
        raise ValueError("can not find {} in yaml for mode {}, please check".
                         format(engine_class, mode))
    device = running_config.get(engine_device, None)

    if device is None:
        print("device is not specified in yaml, set CPU as default")
        device = "CPU"

    if device.upper() == "GPU":
        selected_gpus = running_config.get(device_gpu_choices, None)

        if selected_gpus is None:
            print(
                "selected_gpus is not specified in yaml, set `0` as default")
            selected_gpus = "0"
        else:
            print("selected_gpus {} will be used for running".format(
                selected_gpus))

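        # Single mode drives only one GPU; with multiple GPUs selected, switch
        # to the LOCAL_CLUSTER engine instead.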
        selected_gpus_num = len(selected_gpus.split(","))
        if selected_gpus_num > 1:
            engine = "LOCAL_CLUSTER"

    engine = engine.upper()
    if engine not in engine_choices:
        raise ValueError("{} can not be chosen in {}".format(engine,
                                                             engine_choices))

    run_engine = engines[transpiler].get(engine, None)
    return run_engine


def get_transpiler():
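    # Probe the installed Paddle build in a throwaway subprocess: a return code
    # of -11 (SIGSEGV) from the Fleet() snippet below is treated as a PSLIB
    # build, anything else as a TRANSPILER build.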
    FNULL = open(os.devnull, 'w')
    cmd = [
        "python", "-c",
        "import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); [fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"
    ]
    proc = subprocess.Popen(cmd, stdout=FNULL, stderr=FNULL, cwd=os.getcwd())
    ret = proc.wait()
    if ret == -11:
        return "PSLIB"
    else:
        return "TRANSPILER"


def set_runtime_envs(cluster_envs, engine_yaml):
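    # Merge the launcher-specific settings into the runtime environment and
    # echo the resulting train.trainer.* values.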
    if cluster_envs is None:
        cluster_envs = {}

    envs.set_runtime_environs(cluster_envs)

    need_print = {}
    for k, v in os.environ.items():
        if k.startswith("train.trainer."):
            need_print[k] = v

    print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))


def single_train_engine(args):
    _envs = envs.load_yaml(args.model)
    run_extras = get_all_inters_from_yaml(args.model, ["runner."])

    mode = envs.get_runtime_environ("mode")
    trainer_class = ".".join(["runner", mode, "trainer_class"])
    fleet_class = ".".join(["runner", mode, "fleet_mode"])
    device_class = ".".join(["runner", mode, "device"])
    selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])

    trainer = run_extras.get(trainer_class, "GeneralTrainer")
    fleet_mode = run_extras.get(fleet_class, "ps")
    device = run_extras.get(device_class, "cpu")
    selected_gpus = run_extras.get(selected_gpus_class, "0")
    executor_mode = "train"

    single_envs = {}

    if device.upper() == "GPU":
        selected_gpus_num = len(selected_gpus.split(","))
        if selected_gpus_num != 1:
            raise ValueError(
                "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
            )

        single_envs["selected_gpus"] = selected_gpus
        single_envs["FLAGS_selected_gpus"] = selected_gpus

    single_envs["train.trainer.trainer"] = trainer
    single_envs["fleet_mode"] = fleet_mode
    single_envs["train.trainer.executor_mode"] = executor_mode
    single_envs["train.trainer.threads"] = "2"
    single_envs["train.trainer.platform"] = envs.get_platform()
    single_envs["train.trainer.engine"] = "single"

    set_runtime_envs(single_envs, args.model)
    trainer = TrainerFactory.create(args.model)
    return trainer


def single_infer_engine(args):
    _envs = envs.load_yaml(args.model)
    run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
    trainer_class = run_extras.get(
        "runner." + _envs["mode"] + ".trainer_class", None)

    if trainer_class:
        trainer = trainer_class
    else:
        trainer = "GeneralTrainer"

    executor_mode = "infer"
    fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
                                "ps")

    device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu")
    selected_gpus = run_extras.get(
        "runner." + _envs["mode"] + ".selected_gpus", "0")
    selected_gpus_num = len(selected_gpus.split(","))
    if device.upper() == "GPU":
        assert selected_gpus_num == 1, "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"

    single_envs = {}
    single_envs["selected_gpus"] = selected_gpus
    single_envs["FLAGS_selected_gpus"] = selected_gpus
    single_envs["train.trainer.trainer"] = trainer
    single_envs["train.trainer.executor_mode"] = executor_mode
    single_envs["fleet_mode"] = fleet_mode
    single_envs["train.trainer.threads"] = "2"
    single_envs["train.trainer.platform"] = envs.get_platform()
    single_envs["train.trainer.engine"] = "single"

    set_runtime_envs(single_envs, args.model)
    trainer = TrainerFactory.create(args.model)
    return trainer


def cluster_engine(args):
    def update_workspace(cluster_envs):
        workspace = cluster_envs.get("engine_workspace", None)

        if not workspace:
            return
        path = envs.path_adapter(workspace)
        for name, value in cluster_envs.items():
            if isinstance(value, str):
                value = value.replace("{workspace}", path)
                value = envs.windows_path_converter(value)
                cluster_envs[name] = value

    def master():
        role = "MASTER"
        role = "MASTER"
        from paddlerec.core.engine.cluster.cluster import ClusterEngine
        _envs = envs.load_yaml(args.backend)
        flattens = envs.flatten_environs(_envs, "_")
        flattens["engine_role"] = role
        flattens["engine_run_config"] = args.model
        flattens["engine_temp_path"] = tempfile.mkdtemp()
        update_workspace(flattens)

        envs.set_runtime_environs(flattens)
        print(envs.pretty_print_envs(flattens, ("Submit Envs", "Value")))

        launch = ClusterEngine(None, args.model)
        return launch

    def worker():
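        # A worker rebuilds its trainer configuration from the model yaml and
        # runs the trainer in-process; master() above only submits the job.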
        role = "WORKER"

        _envs = envs.load_yaml(args.model)
        run_extras = get_all_inters_from_yaml(args.model,
                                              ["train.", "runner."])
        trainer_class = run_extras.get(
            "runner." + _envs["mode"] + ".trainer_class", None)

        if trainer_class:
            trainer = trainer_class
        else:
            trainer = "GeneralTrainer"

        executor_mode = "train"

        distributed_strategy = run_extras.get(
            "runner." + _envs["mode"] + ".distribute_strategy", "async")
        selected_gpus = run_extras.get(
            "runner." + _envs["mode"] + ".selected_gpus", "0")
        fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
                                    "ps")

        cluster_envs = {}
        cluster_envs["selected_gpus"] = selected_gpus
        cluster_envs["fleet_mode"] = fleet_mode
        cluster_envs["train.trainer.trainer"] = trainer
        cluster_envs["train.trainer.executor_mode"] = executor_mode
        cluster_envs["train.trainer.engine"] = "cluster"
        cluster_envs["train.trainer.strategy"] = distributed_strategy
        cluster_envs["train.trainer.threads"] = envs.get_runtime_environ(
            "CPU_NUM")
        cluster_envs["train.trainer.platform"] = envs.get_platform()
        print("launch {} engine with cluster to run model: {}".format(
            trainer, args.model))
        set_runtime_envs(cluster_envs, args.model)

        trainer = TrainerFactory.create(args.model)
        return trainer

    role = os.getenv("PADDLE_PADDLEREC_ROLE", "MASTER")

    if role == "WORKER":
        return worker()
    else:
        return master()


def cluster_mpi_engine(args):
    print("launch cluster mpi engine to run model: {}".format(args.model))

    cluster_envs = {}
    cluster_envs["train.trainer.trainer"] = "CtrCodingTrainer"
    cluster_envs["train.trainer.platform"] = envs.get_platform()

    set_runtime_envs(cluster_envs, args.model)

    trainer = TrainerFactory.create(args.model)
    return trainer


def local_cluster_engine(args):
    from paddlerec.core.engine.local_cluster import LocalClusterEngine

    _envs = envs.load_yaml(args.model)
    run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
    trainer_class = run_extras.get("runner." + _envs["mode"] + ".runner_class",
                                   None)

    if trainer_class:
        trainer = trainer_class
    else:
        trainer = "GeneralTrainer"

    executor_mode = "train"
    distributed_strategy = run_extras.get(
        "runner." + _envs["mode"] + ".distribute_strategy", "async")

    worker_num = run_extras.get("runner." + _envs["mode"] + ".worker_num", 1)
    server_num = run_extras.get("runner." + _envs["mode"] + ".server_num", 1)
    selected_gpus = run_extras.get(
        "runner." + _envs["mode"] + ".selected_gpus", "0")

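    # If fleet_mode is not set, infer it: multiple selected GPUs imply
    # COLLECTIVE training, otherwise fall back to parameter-server (PS) mode.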
    fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode", "")
    if fleet_mode == "":
        device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu")
        if len(selected_gpus.split(",")) > 1 and device.upper() == "GPU":
            fleet_mode = "COLLECTIVE"
        else:
            fleet_mode = "PS"

    cluster_envs = {}
    cluster_envs["server_num"] = server_num
    cluster_envs["worker_num"] = worker_num
    cluster_envs["selected_gpus"] = selected_gpus
    cluster_envs["start_port"] = envs.find_free_port()
    cluster_envs["fleet_mode"] = fleet_mode
    cluster_envs["log_dir"] = "logs"
    cluster_envs["train.trainer.trainer"] = trainer
    cluster_envs["train.trainer.executor_mode"] = executor_mode
    cluster_envs["train.trainer.strategy"] = distributed_strategy
    cluster_envs["train.trainer.threads"] = "2"
    cluster_envs["train.trainer.engine"] = "local_cluster"
    cluster_envs["train.trainer.platform"] = envs.get_platform()

    cluster_envs["CPU_NUM"] = "2"
    print("launch {} engine with cluster to run model: {}".format(trainer,
                                                                  args.model))

    set_runtime_envs(cluster_envs, args.model)
    launch = LocalClusterEngine(cluster_envs, args.model)
    return launch


def local_mpi_engine(args):
    print("launch local mpi engine to run model: {}".format(args.model))
    from paddlerec.core.engine.local_mpi import LocalMPIEngine

    print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(
        args.model))

    mpi = util.run_which("mpirun")
    if not mpi:
        raise RuntimeError("can not find mpirun, please check environment")

    _envs = envs.load_yaml(args.model)
    run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
    trainer_class = run_extras.get("runner." + _envs["mode"] + ".runner_class",
                                   None)
    executor_mode = "train"
    distributed_strategy = run_extras.get(
        "runner." + _envs["mode"] + ".distribute_strategy", "async")
    fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
                                "ps")

    if trainer_class:
        trainer = trainer_class
    else:
        trainer = "GeneralTrainer"

    cluster_envs = {}
    cluster_envs["mpirun"] = mpi
    cluster_envs["train.trainer.trainer"] = trainer
    cluster_envs["log_dir"] = "logs"
    cluster_envs["train.trainer.engine"] = "local_cluster"
    cluster_envs["train.trainer.executor_mode"] = executor_mode
    cluster_envs["fleet_mode"] = fleet_mode
    cluster_envs["train.trainer.strategy"] = distributed_strategy
    cluster_envs["train.trainer.threads"] = "2"
    cluster_envs["train.trainer.platform"] = envs.get_platform()

    set_runtime_envs(cluster_envs, args.model)
    launch = LocalMPIEngine(cluster_envs, args.model)
    return launch


def get_abs_model(model):
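    # Built-in models referenced as "paddlerec.*" resolve to the config.yaml
    # shipped with the package; anything else must be an existing config file.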
    if model.startswith("paddlerec."):
        dir = envs.path_adapter(model)
        path = os.path.join(dir, "config.yaml")
    else:
        if not os.path.isfile(model):
            raise IOError("model config: {} invalid".format(model))
        path = model
    return path


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='paddle-rec run')
    parser.add_argument("-m", "--model", type=str)
    parser.add_argument("-b", "--backend", type=str, default=None)

    abs_dir = os.path.dirname(os.path.abspath(__file__))
    envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})

    args = parser.parse_args()
    model_name = args.model.split('.')[-1]
    args.model = get_abs_model(args.model)

    if not validation.yaml_validation(args.model):
        sys.exit(-1)
    engine_registry()

    running_config = get_all_inters_from_yaml(args.model, ["mode", "runner."])
    modes = get_modes(running_config)

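    # Launch each configured mode in turn through the engine selected for it.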
    for mode in modes:
        envs.set_runtime_environs({"mode": mode})
        which_engine = get_engine(args, running_config, mode)
        engine = which_engine(args)
        engine.run()