run.py 18.0 KB
Newer Older
T
tangwei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

T
tangwei 已提交
15
import os
T
tangwei 已提交
16
import subprocess
X
test  
xjqbest 已提交
17
import sys
T
tangwei 已提交
18 19
import argparse
import tempfile
C
Chengmo 已提交
20

X
fix  
xjqbest 已提交
21
import copy
22 23 24
from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs
from paddlerec.core.utils import util
X
test  
xjqbest 已提交
25
from paddlerec.core.utils import validation
T
tangwei 已提交
26

T
tangwei 已提交
27 28
# Global dispatch table: engines[transpiler_kind][engine_name] -> launcher
# function. Populated once by engine_registry() before get_engine() is used.
engines = {}
# Known device kinds. NOTE(review): several functions below rebind a local
# `device`, shadowing this module-level list; it appears to be informational.
device = ["CPU", "GPU"]
J
Jinhua Liang 已提交
29 30

engine_choices = ["TRAIN", "INFER", "LOCAL_CLUSTER_TRAIN", "CLUSTER_TRAIN"]
T
tangwei 已提交
31 32


T
tangwei 已提交
33
def engine_registry():
    """Populate the global ``engines`` dispatch table.

    Maps (transpiler kind, engine name) -> launcher function. Must be called
    once at startup, before get_engine() performs any lookup.
    """
    engines["TRANSPILER"] = {}
    engines["PSLIB"] = {}

    engines["TRANSPILER"]["TRAIN"] = single_train_engine
    engines["TRANSPILER"]["INFER"] = single_infer_engine
    engines["TRANSPILER"]["LOCAL_CLUSTER_TRAIN"] = local_cluster_engine
    # Fix: "CLUSTER_TRAIN" is listed in engine_choices but was never
    # registered for TRANSPILER, so get_engine() returned None for it and
    # the caller crashed on "NoneType is not callable". Register it next to
    # the legacy "CLUSTER" key (kept for backward compatibility).
    engines["TRANSPILER"]["CLUSTER_TRAIN"] = cluster_engine
    engines["TRANSPILER"]["CLUSTER"] = cluster_engine

    engines["PSLIB"]["TRAIN"] = local_mpi_engine
    engines["PSLIB"]["LOCAL_CLUSTER_TRAIN"] = local_mpi_engine
    engines["PSLIB"]["CLUSTER_TRAIN"] = cluster_mpi_engine
    engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine
    # NOTE(review): PSLIB has no "INFER" entry — presumably unsupported on
    # that build; confirm before relying on infer with PSLIB.
T
tangwei 已提交
45

T
tangwei 已提交
46

X
fix  
xjqbest 已提交
47
def get_inters_from_yaml(file, filters):
    """Load a yaml config and keep only flattened keys matching ``filters``.

    Args:
        file: path to the yaml config.
        filters: iterable of key prefixes to keep.

    Returns:
        dict of flattened key -> value for keys starting with any prefix.
    """
    flattened = envs.flatten_environs(envs.load_yaml(file))
    prefixes = tuple(filters)
    matched = {}
    for key, value in flattened.items():
        if key.startswith(prefixes):
            matched[key] = value
    return matched
T
tangwei 已提交
56 57


X
fix  
xjqbest 已提交
58
def get_all_inters_from_yaml(file, filters):
    """Flatten every namespace of a yaml config and filter keys by prefix.

    ``dataset``/``phase``/``runner`` lists are flattened per entry, keyed by
    each entry's mandatory "name" field.

    Args:
        file: path to the yaml config.
        filters: iterable of key prefixes to keep.

    Returns:
        dict of dotted flattened key -> value.

    Raises:
        ValueError: if an entry in a dataset/phase/runner list has no "name".
    """
    _envs = envs.load_yaml(file)
    all_flattens = {}

    def _walk(prefix, node):
        # prefix: list of namespace components accumulated so far
        for key, value in node.items():
            if isinstance(value, dict):
                _walk(prefix + [key], value)
            elif key in ("dataset", "phase", "runner") and isinstance(value,
                                                                      list):
                for entry in value:
                    if entry.get("name") is None:
                        raise ValueError("name must be in dataset list. ",
                                         value)
                    _walk(prefix + [key, entry["name"]], entry)
            else:
                all_flattens[".".join(prefix + [key])] = value

    _walk([], _envs)
    prefixes = tuple(filters)
    return {k: v for k, v in all_flattens.items() if k.startswith(prefixes)}


T
tangwei 已提交
90 91 92 93 94 95 96 97
def get_modes(running_config):
    """Return the list of runner mode names declared in the config.

    Args:
        running_config: flattened yaml config dict; must contain "mode".

    Returns:
        list of mode names; a single string value is wrapped in a list.

    Raises:
        ValueError: if running_config is not a dict, or "mode" is
            missing/empty.
    """
    if not isinstance(running_config, dict):
        raise ValueError("get_modes arguments must be [dict]")

    modes = running_config.get("mode")
    if not modes:
        # Fix: corrected typo in the original message ("mast" -> "must").
        raise ValueError("yaml must have config: mode")

    if isinstance(modes, str):
        modes = [modes]

    return modes


def get_engine(args, running_config, mode):
    """Select the launcher function for one runner mode.

    Reads ``runner.<mode>.class`` / ``.device`` / ``.selected_gpus`` from the
    flattened config, normalizes them, and looks up the launcher in the
    global ``engines`` table for the detected transpiler kind.

    Args:
        args: parsed CLI arguments (unused here; passed through by caller).
        running_config: flattened config dict (see get_all_inters_from_yaml).
        mode: runner mode name.

    Returns:
        The launcher callable, or None if the engine name is unregistered.

    Raises:
        ValueError: if the engine class is missing or not a valid choice.
    """
    transpiler = get_transpiler()

    engine_class = ".".join(["runner", mode, "class"])
    engine_device = ".".join(["runner", mode, "device"])
    device_gpu_choices = ".".join(["runner", mode, "selected_gpus"])

    engine = running_config.get(engine_class, None)
    if engine is None:
        # Fix: original passed (mode, engine_class) to a single-placeholder
        # format string; the message is about the missing engine_class key.
        raise ValueError("not find {} in yaml, please check".format(
            engine_class))

    device = running_config.get(engine_device, None)
    # Fix: the None check must run BEFORE .upper() — originally
    # device.upper() crashed with AttributeError when the yaml omitted
    # "device", making the fallback below dead code.
    if device is None:
        print("not find device be specified in yaml, set CPU as default")
        device = "CPU"

    engine = engine.upper()
    device = device.upper()

    if device == "GPU":
        selected_gpus = running_config.get(device_gpu_choices, None)

        if selected_gpus is None:
            print(
                "not find selected_gpus be specified in yaml, set `0` as default"
            )
            selected_gpus = "0"
        else:
            print("selected_gpus {} will be specified for running".format(
                selected_gpus))

        # More than one GPU forces the local-cluster (multi-process) engine.
        selected_gpus_num = len(selected_gpus.split(","))
        if selected_gpus_num > 1:
            engine = "LOCAL_CLUSTER_TRAIN"

    if engine not in engine_choices:
        raise ValueError("{} can not be chosen in {}".format(engine_class,
                                                             engine_choices))

    run_engine = engines[transpiler].get(engine, None)
    return run_engine


def get_transpiler():
    """Detect whether the installed paddle build is PSLIB or TRANSPILER.

    Runs a throwaway python subprocess that touches a Fleet API only present
    in PSLIB builds; a segfault (returncode -11, i.e. SIGSEGV) is taken to
    mean PSLIB, anything else means TRANSPILER.

    Returns:
        "PSLIB" or "TRANSPILER".
    """
    cmd = [
        "python", "-c",
        "import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); [fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"
    ]
    # Fix: the original opened os.devnull into FNULL and never closed it
    # (file-handle leak); a with-block guarantees cleanup.
    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen(
            cmd, stdout=devnull, stderr=devnull, cwd=os.getcwd())
        ret = proc.wait()
    return "PSLIB" if ret == -11 else "TRANSPILER"
T
tangwei 已提交
160 161


T
tangwei 已提交
162 163 164
def set_runtime_envs(cluster_envs, engine_yaml):
    """Publish ``cluster_envs`` to the runtime environment and echo them.

    Args:
        cluster_envs: dict of env-style settings to register (None -> {}).
        engine_yaml: path to the engine yaml (currently unused here).
    """
    runtime = {} if cluster_envs is None else cluster_envs
    envs.set_runtime_environs(runtime)

    # Only the trainer-related variables are worth printing.
    visible = {
        key: val
        for key, val in os.environ.items()
        if key.startswith("train.trainer.")
    }
    print(envs.pretty_print_envs(visible, ("Runtime Envs", "Value")))
T
tangwei 已提交
174 175


C
Chengmo 已提交
176
def single_train_engine(args):
    """Launch a single-process training run for the current mode.

    Reads ``runner.<mode>.*`` options from the model yaml, publishes the
    runtime environment and returns a trainer built by TrainerFactory.
    """
    run_extras = get_all_inters_from_yaml(args.model, ["runner."])
    mode = envs.get_runtime_environ("mode")

    def _key(field):
        # fully-qualified runner option name for the current mode
        return ".".join(["runner", mode, field])

    trainer = run_extras.get(_key("trainer_class"), "GeneralTrainer")
    fleet_mode = run_extras.get(_key("fleet_mode"), "ps")
    device = run_extras.get(_key("device"), "cpu")
    selected_gpus = run_extras.get(_key("selected_gpus"), "0")
    executor_mode = "train"

    single_envs = {}

    if device.upper() == "GPU":
        if len(selected_gpus.split(",")) != 1:
            raise ValueError(
                "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
            )

        # NOTE(review): "selsected_gpus" is a pre-existing typo kept as-is
        # because downstream code may read this exact key — confirm before
        # renaming.
        single_envs["selsected_gpus"] = selected_gpus
        single_envs["FLAGS_selected_gpus"] = selected_gpus

    single_envs["train.trainer.trainer"] = trainer
    single_envs["fleet_mode"] = fleet_mode
    single_envs["train.trainer.executor_mode"] = executor_mode
    single_envs["train.trainer.threads"] = "2"
    single_envs["train.trainer.platform"] = envs.get_platform()
    single_envs["train.trainer.engine"] = "single"

    set_runtime_envs(single_envs, args.model)
    return TrainerFactory.create(args.model)
X
fix  
xjqbest 已提交
212

X
fix  
xjqbest 已提交
213 214

def single_infer_engine(args):
    """Launch a single-process inference run for the current mode.

    Mirrors single_train_engine() but sets executor_mode to "infer".
    """
    run_extras = get_all_inters_from_yaml(args.model, ["runner."])
    mode = envs.get_runtime_environ("mode")

    def _key(field):
        # fully-qualified runner option name for the current mode
        return ".".join(["runner", mode, field])

    trainer = run_extras.get(_key("trainer_class"), "GeneralTrainer")
    fleet_mode = run_extras.get(_key("fleet_mode"), "ps")
    device = run_extras.get(_key("device"), "cpu")
    selected_gpus = run_extras.get(_key("selected_gpus"), "0")
    executor_mode = "infer"

    single_envs = {}

    if device.upper() == "GPU":
        if len(selected_gpus.split(",")) != 1:
            raise ValueError(
                "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
            )

        # NOTE(review): "selsected_gpus" is a pre-existing typo kept as-is
        # because downstream code may read this exact key — confirm before
        # renaming.
        single_envs["selsected_gpus"] = selected_gpus
        single_envs["FLAGS_selected_gpus"] = selected_gpus

    single_envs["train.trainer.trainer"] = trainer
    single_envs["train.trainer.executor_mode"] = executor_mode
    single_envs["fleet_mode"] = fleet_mode
    single_envs["train.trainer.threads"] = "2"
    single_envs["train.trainer.platform"] = envs.get_platform()
    single_envs["train.trainer.engine"] = "single"

    set_runtime_envs(single_envs, args.model)
    return TrainerFactory.create(args.model)
C
chengmo 已提交
251

X
fix  
xjqbest 已提交
252

T
tangwei 已提交
253
def cluster_engine(args):
    """Dispatch a distributed-cluster run as either MASTER or WORKER.

    The role comes from the PADDLE_PADDLEREC_ROLE environment variable
    (default "MASTER"): the master submits the job through ClusterEngine
    using the backend yaml, while a worker builds a trainer for the mode
    given by PADDLE_PADDLEREC_MODE.
    """

    def master():
        # Submit the job: flatten the backend config, publish it, and hand
        # off to ClusterEngine.
        from paddlerec.core.engine.cluster.cluster import ClusterEngine
        _envs = envs.load_yaml(args.backend)
        flattens = envs.flatten_environs(_envs, "_")
        flattens["engine_role"] = "MASTER"
        flattens["engine_mode"] = envs.get_runtime_environ("mode")
        flattens["engine_run_config"] = args.model
        flattens["engine_temp_path"] = tempfile.mkdtemp()
        envs.set_runtime_environs(flattens)
        ClusterEngine.workspace_replace()
        print(envs.pretty_print_envs(flattens, ("Submit Envs", "Value")))

        launch = ClusterEngine(None, args.model)
        return launch

    def worker(mode):
        if not mode:
            # Fix: the original raised "mode: {} can not be recognized"
            # without ever filling the placeholder.
            raise ValueError("mode: {} can not be recognized".format(mode))

        run_extras = get_all_inters_from_yaml(args.model, ["runner."])

        trainer_class = ".".join(["runner", mode, "trainer_class"])
        fleet_class = ".".join(["runner", mode, "fleet_mode"])
        device_class = ".".join(["runner", mode, "device"])
        selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])
        strategy_class = ".".join(["runner", mode, "distribute_strategy"])
        worker_class = ".".join(["runner", mode, "worker_num"])
        server_class = ".".join(["runner", mode, "server_num"])

        trainer = run_extras.get(trainer_class, "GeneralTrainer")
        fleet_mode = run_extras.get(fleet_class, "ps")
        device = run_extras.get(device_class, "cpu")
        selected_gpus = run_extras.get(selected_gpus_class, "0")
        distributed_strategy = run_extras.get(strategy_class, "async")
        worker_num = run_extras.get(worker_class, 1)
        server_num = run_extras.get(server_class, 1)
        executor_mode = "train"

        device = device.upper()
        fleet_mode = fleet_mode.upper()

        if fleet_mode == "COLLECTIVE" and device != "GPU":
            raise ValueError("COLLECTIVE can not be used with GPU")

        cluster_envs = {}

        if device == "GPU":
            cluster_envs["selected_gpus"] = selected_gpus

        cluster_envs["server_num"] = server_num
        cluster_envs["worker_num"] = worker_num
        cluster_envs["fleet_mode"] = fleet_mode
        cluster_envs["train.trainer.trainer"] = trainer
        cluster_envs["train.trainer.engine"] = "cluster"
        cluster_envs["train.trainer.executor_mode"] = executor_mode
        cluster_envs["train.trainer.strategy"] = distributed_strategy
        cluster_envs["train.trainer.threads"] = envs.get_runtime_environ(
            "CPU_NUM")
        cluster_envs["train.trainer.platform"] = envs.get_platform()
        print("launch {} engine with cluster to with model: {}".format(
            trainer, args.model))
        set_runtime_envs(cluster_envs, args.model)

        trainer = TrainerFactory.create(args.model)
        return trainer

    role = os.getenv("PADDLE_PADDLEREC_ROLE", "MASTER")

    if role == "WORKER":
        mode = os.getenv("PADDLE_PADDLEREC_MODE", None)
        return worker(mode)
    else:
        return master()
C
chengmo 已提交
327 328


T
tangwei 已提交
329
def cluster_mpi_engine(args):
    """Build a trainer for MPI cluster execution (PSLIB path)."""
    print("launch cluster engine with cluster to run model: {}".format(
        args.model))

    mpi_envs = {
        "train.trainer.trainer": "CtrCodingTrainer",
        "train.trainer.platform": envs.get_platform(),
    }

    set_runtime_envs(mpi_envs, args.model)
    return TrainerFactory.create(args.model)


def local_cluster_engine(args):
    """Launch pseudo-distributed (multi-process) training on one host.

    Worker count is capped by the smallest number of data files among the
    datasets used by the phases selected for the current mode.
    """

    def get_worker_num(run_extras, workers):
        # Return min(workers, file count of every dataset used by the
        # selected phases) so each worker has at least one file.
        _envs = envs.load_yaml(args.model)
        mode = envs.get_runtime_environ("mode")
        workspace = envs.get_runtime_environ("workspace")
        phases_class = ".".join(["runner", mode, "phases"])
        phase_names = run_extras.get(phases_class)
        all_phases = _envs.get("phase")
        if phase_names is None:
            phases = all_phases
        else:
            phases = [p for p in all_phases if p["name"] in phase_names]

        dataset_names = [p["dataset_name"] for p in phases]

        datapaths = [
            d["data_path"] for d in _envs.get("dataset")
            if d["name"] in dataset_names
        ]

        if not datapaths:
            raise ValueError("data path must exist for training/inference")

        datapaths = [
            envs.workspace_adapter_by_specific(path, workspace)
            for path in datapaths
        ]
        candidates = [len(os.listdir(path)) for path in datapaths]
        candidates.append(workers)
        return min(candidates)

    from paddlerec.core.engine.local_cluster import LocalClusterEngine

    run_extras = get_all_inters_from_yaml(args.model, ["runner."])
    mode = envs.get_runtime_environ("mode")

    def _key(field):
        # fully-qualified runner option name for the current mode
        return ".".join(["runner", mode, field])

    trainer = run_extras.get(_key("trainer_class"), "GeneralTrainer")
    fleet_mode = run_extras.get(_key("fleet_mode"), "ps")
    device = run_extras.get(_key("device"), "cpu")
    selected_gpus = run_extras.get(_key("selected_gpus"), "0")
    distributed_strategy = run_extras.get(_key("distribute_strategy"),
                                          "async")
    executor_mode = "train"

    worker_num = run_extras.get(_key("worker_num"), 1)
    server_num = run_extras.get(_key("server_num"), 1)
    max_worker_num = get_worker_num(run_extras, worker_num)

    if max_worker_num < worker_num:
        print(
            "has phase do not have enough datas for training, set worker num from {} to {}".
            format(worker_num, max_worker_num))
        worker_num = max_worker_num

    device = device.upper()
    fleet_mode = fleet_mode.upper()

    if fleet_mode == "COLLECTIVE" and device != "GPU":
        raise ValueError("COLLECTIVE can not be used with GPU")

    cluster_envs = {}

    if device == "GPU":
        cluster_envs["selected_gpus"] = selected_gpus

    cluster_envs["server_num"] = server_num
    cluster_envs["worker_num"] = worker_num
    cluster_envs["start_port"] = envs.find_free_port()
    cluster_envs["fleet_mode"] = fleet_mode
    cluster_envs["log_dir"] = "logs"
    cluster_envs["train.trainer.trainer"] = trainer
    cluster_envs["train.trainer.executor_mode"] = executor_mode
    cluster_envs["train.trainer.strategy"] = distributed_strategy
    cluster_envs["train.trainer.threads"] = "2"
    cluster_envs["CPU_NUM"] = cluster_envs["train.trainer.threads"]
    cluster_envs["train.trainer.engine"] = "local_cluster"
    cluster_envs["train.trainer.platform"] = envs.get_platform()

    print("launch {} engine with cluster to run model: {}".format(trainer,
                                                                  args.model))

    set_runtime_envs(cluster_envs, args.model)
    return LocalClusterEngine(cluster_envs, args.model)


T
tangwei 已提交
440
def local_mpi_engine(args):
    """Launch 1x1 MPI training on localhost via mpirun (PSLIB path).

    Raises:
        RuntimeError: if no mpirun binary is found on PATH.
    """
    print("launch cluster engine with cluster to run model: {}".format(
        args.model))
    from paddlerec.core.engine.local_mpi import LocalMPIEngine

    print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(
        args.model))

    mpi = util.run_which("mpirun")
    if not mpi:
        raise RuntimeError("can not find mpirun, please check environment")

    run_extras = get_all_inters_from_yaml(args.model, ["runner."])
    mode = envs.get_runtime_environ("mode")

    trainer = run_extras.get(
        ".".join(["runner", mode, "trainer_class"]), "GeneralTrainer")
    fleet_mode = run_extras.get(".".join(["runner", mode, "fleet_mode"]),
                                "ps")

    cluster_envs = {
        "mpirun": mpi,
        "train.trainer.trainer": trainer,
        "log_dir": "logs",
        "train.trainer.engine": "local_cluster",
        "train.trainer.executor_mode": "train",
        "fleet_mode": fleet_mode,
        "train.trainer.strategy": "async",
        "train.trainer.threads": "2",
        "train.trainer.platform": envs.get_platform(),
    }

    set_runtime_envs(cluster_envs, args.model)
    return LocalMPIEngine(cluster_envs, args.model)


T
tangwei 已提交
479
def get_abs_model(model):
    """Resolve a model argument to a yaml config path.

    Args:
        model: either a "paddlerec.<builtin-model>" package name or a path
            to an existing yaml file.

    Returns:
        Path to the model's config yaml.

    Raises:
        IOError: if a plain path is given but no such file exists.
    """
    if model.startswith("paddlerec."):
        # Built-in model: resolve its package directory and use the bundled
        # config. Fix: local renamed from `dir`, which shadowed the builtin.
        model_dir = envs.paddlerec_adapter(model)
        path = os.path.join(model_dir, "config.yaml")
    else:
        if not os.path.isfile(model):
            raise IOError("model config: {} invalid".format(model))
        path = model
    return path


T
tangwei 已提交
490
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='paddle-rec run')
    parser.add_argument("-m", "--model", type=str)
    parser.add_argument("-b", "--backend", type=str, default=None)

    # Expose the package root so built-in model paths can be resolved.
    abs_dir = os.path.dirname(os.path.abspath(__file__))
    envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})

    args = parser.parse_args()
    args.model = get_abs_model(args.model)

    if not validation.yaml_validation(args.model):
        sys.exit(-1)

    engine_registry()
    running_config = get_all_inters_from_yaml(
        args.model, ["workspace", "mode", "runner."])
    modes = get_modes(running_config)

    # Run every configured mode in sequence.
    for mode in modes:
        envs.set_runtime_environs({
            "mode": mode,
            "workspace": running_config["workspace"]
        })
        launcher = get_engine(args, running_config, mode)
        engine = launcher(args)
        engine.run()