run.py 9.4 KB
Newer Older
T
tangwei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

T
tangwei 已提交
15
import os
T
tangwei 已提交
16
import subprocess
T
tangwei 已提交
17

T
tangwei 已提交
18 19
import argparse
import tempfile
T
tangwei 已提交
20
import yaml
T
tangwei 已提交
21

22 23 24
from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs
from paddlerec.core.utils import util
T
tangwei 已提交
25

T
tangwei 已提交
26 27
# Engine factories keyed first by the backend name returned by
# get_transpiler() ("TRANSPILER" or "PSLIB"), then by engine name.
# Filled in by engine_registry().
engines = {}

# Device names (not referenced elsewhere in the visible code).
device = ["CPU", "GPU"]

# Cluster mode names (not referenced elsewhere in the visible code).
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]

# Engine names accepted from the model config (``train.engine`` or
# ``epoch.trainer_class``), compared after upper-casing in get_engine().
engine_choices = [
    "SINGLE",
    "LOCAL_CLUSTER",
    "CLUSTER",
    "TDM_SINGLE",
    "TDM_LOCAL_CLUSTER",
    "TDM_CLUSTER",
    "SINGLE_YAMLOPT",
]

# Model names that get a dedicated trainer-class prefix in get_trainer_prefix().
custom_model = ['TDM']

# Last dotted component of the ``--model`` argument; rebound in ``__main__``.
model_name = ""
T
tangwei 已提交
35 36


T
tangwei 已提交
37
def engine_registry():
    """Fill the module-level ``engines`` table.

    Each backend name ("TRANSPILER", "PSLIB") is mapped to a fresh dict of
    engine name -> engine factory function; earlier entries for those two
    backends are replaced.
    """
    engines["TRANSPILER"] = {
        "SINGLE": single_engine,
        "LOCAL_CLUSTER": local_cluster_engine,
        "CLUSTER": cluster_engine,
        "SINGLE_YAMLOPT": single_yamlopt_engine,
    }
    # Under PSLIB both SINGLE and LOCAL_CLUSTER go through local_mpi_engine.
    engines["PSLIB"] = {
        "SINGLE": local_mpi_engine,
        "LOCAL_CLUSTER": local_mpi_engine,
        "CLUSTER": cluster_mpi_engine,
    }
T
tangwei 已提交
50

T
tangwei 已提交
51

X
fix  
xjqbest 已提交
52
def get_inters_from_yaml(file, filters):
    """Return the flattened config entries whose keys start with a prefix.

    Args:
        file: path to a YAML config file.
        filters: a single prefix string, or an iterable of prefix strings.
            A bare string is treated as one prefix; iterating it would test
            each character as its own prefix and match almost every key.

    Returns:
        dict mapping each matching flattened key to its value, in the order
        produced by ``envs.flatten_environs``.
    """
    # NOTE(review): yaml FullLoader must not be used on untrusted files; this
    # assumes the config is supplied by the local user running the script.
    with open(file, 'r') as rb:
        _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)

    flattens = envs.flatten_environs(_envs)

    if isinstance(filters, str):
        filters = [filters]
    # str.startswith accepts a tuple and matches if any prefix matches.
    prefixes = tuple(filters)

    return {k: v for k, v in flattens.items() if k.startswith(prefixes)}
T
tangwei 已提交
64 65


C
chengmo 已提交
66
def get_engine(args):
    """Pick the engine factory for the model config at ``args.model``.

    The engine name comes from ``train.engine``, falling back to
    ``epoch.trainer_class`` and then to ``"single"``. It is upper-cased and
    must be listed in ``engine_choices``. The factory is looked up in the
    ``engines`` table (filled by ``engine_registry()``) under the backend
    reported by ``get_transpiler()``.

    Returns:
        The engine factory callable (takes ``args``).

    Raises:
        ValueError: if the engine name is not in ``engine_choices``, or no
            factory is registered for it under the detected backend.
    """
    transpiler = get_transpiler()
    run_extras = get_inters_from_yaml(args.model, ["train.", "epoch."])

    engine = run_extras.get("train.engine", None)
    if engine is None:
        engine = run_extras.get("epoch.trainer_class", None)
    if engine is None:
        engine = "single"

    # str() guards against non-string YAML scalars before upper-casing.
    engine = str(engine).upper()

    if engine not in engine_choices:
        raise ValueError("train.engine can not be chosen in {}".format(
            engine_choices))

    print("engines: \n{}".format(engines))

    run_engine = engines[transpiler].get(engine, None)
    # Some accepted names (e.g. TDM_*) have no registered factory; fail here
    # rather than returning None for the caller to invoke.
    if run_engine is None:
        raise ValueError(
            "engine {} is not supported with the {} backend, supported: {}".
            format(engine, transpiler, sorted(engines[transpiler])))

    return run_engine


def get_transpiler():
    """Detect which Fleet backend the installed Paddle provides.

    A child interpreter constructs ``fluid.core.Fleet`` and calls
    ``copy_table_by_feasign`` on it. If the child is killed by signal 11
    (return code -11), the backend is reported as "PSLIB". Any other outcome,
    including the import failing, yields "TRANSPILER".

    Returns:
        "PSLIB" or "TRANSPILER".
    """
    import sys

    cmd = [
        # Probe the interpreter running this script so the same Paddle
        # installation is checked; a bare "python" on PATH may differ.
        sys.executable or "python", "-c",
        "import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); [fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"
    ]
    # The devnull handle is closed once the probe finishes (previously leaked).
    with open(os.devnull, 'w') as fnull:
        proc = subprocess.Popen(
            cmd, stdout=fnull, stderr=fnull, cwd=os.getcwd())
        ret = proc.wait()

    # Popen reports death by signal N as returncode -N.
    if ret == -11:
        return "PSLIB"
    return "TRANSPILER"
T
tangwei 已提交
101 102


T
tangwei 已提交
103 104 105
def set_runtime_envs(cluster_envs, engine_yaml):
    """Export engine settings plus the config's ``train.trainer.*`` entries.

    ``cluster_envs`` is exported first and the config's ``train.trainer.*``
    entries second, presumably so config values win for shared keys. If the
    config sets ``train.trainer.threads`` and ``cluster_envs`` already has
    ``CPU_NUM``, ``CPU_NUM`` is overwritten with that thread count. The
    ``train.trainer.*`` variables then found in ``os.environ`` are printed
    as a table.

    Args:
        cluster_envs: dict of engine settings, mutated in place; ``None`` is
            treated as an empty dict.
        engine_yaml: path to the model's YAML config.
    """
    if cluster_envs is None:
        cluster_envs = {}

    # Pass a list: a bare string would be iterated character by character,
    # selecting keys far outside the "train.trainer." section.
    engine_extras = get_inters_from_yaml(engine_yaml, ["train.trainer."])
    if "train.trainer.threads" in engine_extras and "CPU_NUM" in cluster_envs:
        cluster_envs["CPU_NUM"] = engine_extras["train.trainer.threads"]

    envs.set_runtime_environs(cluster_envs)
    envs.set_runtime_environs(engine_extras)

    need_print = {
        k: v
        for k, v in os.environ.items() if k.startswith("train.trainer.")
    }
    print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))
T
tangwei 已提交
120 121


C
chengmo 已提交
122 123 124 125
def get_trainer_prefix(args):
    """Return the trainer-class prefix for the current model.

    ``model_name`` holds the last dotted component of ``--model`` (for
    ``paddlerec.models.recall.tdm`` that is ``tdm``). If its upper-cased form
    is listed in ``custom_model``, that upper-cased name is the prefix (e.g.
    ``"TDM"`` -> ``"TDMSingleTrainer"``); otherwise the prefix is ``""``.

    Args:
        args: parsed CLI arguments; unused, kept for interface compatibility.
    """
    # custom_model stores upper-case names while model names from package
    # paths are presumably lower case, so compare case-insensitively; the
    # original exact match made the .upper() on return a no-op.
    name = model_name.upper()
    if name in custom_model:
        return name
    return ""
T
tangwei 已提交
126

C
chengmo 已提交
127

C
chengmo 已提交
128 129
def single_engine(args):
    """Create the single-process trainer for ``args.model``.

    Exports the single-engine settings (trainer class, 2 threads, engine
    "single", platform) and then builds the trainer via TrainerFactory.
    """
    trainer_name = "{}SingleTrainer".format(get_trainer_prefix(args))
    runtime = {
        "train.trainer.trainer": trainer_name,
        "train.trainer.threads": "2",
        "train.trainer.engine": "single",
        "train.trainer.platform": envs.get_platform(),
    }
    print("use {} engine to run model: {}".format(trainer_name, args.model))

    set_runtime_envs(runtime, args.model)
    return TrainerFactory.create(args.model)

X
fix  
xjqbest 已提交
141 142 143 144 145 146 147 148 149 150 151 152
def single_yamlopt_engine(args):
    """Create the single-process YAML-option trainer for ``args.model``.

    Same as ``single_engine`` but uses the ``SingleTrainerYamlOpt`` class and
    engine name "single_yamlopt".
    """
    trainer_name = "{}SingleTrainerYamlOpt".format(get_trainer_prefix(args))
    runtime = {
        "train.trainer.trainer": trainer_name,
        "train.trainer.threads": "2",
        "train.trainer.engine": "single_yamlopt",
        "train.trainer.platform": envs.get_platform(),
    }
    print("use {} engine to run model: {}".format(trainer_name, args.model))

    set_runtime_envs(runtime, args.model)
    return TrainerFactory.create(args.model)
C
chengmo 已提交
153

T
tangwei 已提交
154
def cluster_engine(args):
    """Return the cluster engine object for this process's role.

    The role is read from ``PADDLE_PADDLEREC_ROLE`` (default ``"MASTER"``).
    A ``"WORKER"`` builds a ClusterTrainer from ``args.model``. Any other
    role returns a ClusterEngine after exporting the settings from the
    backend YAML in ``args.backend``.

    Raises:
        ValueError: on the master path when no backend config was given.
    """

    def update_workspace(cluster_envs):
        # Replace "{workspace}" in every string value with the adapted
        # engine_workspace path; no-op when engine_workspace is unset/empty.
        workspace = cluster_envs.get("engine_workspace", None)

        if not workspace:
            return
        path = envs.path_adapter(workspace)
        for name, value in cluster_envs.items():
            if isinstance(value, str):
                value = value.replace("{workspace}", path)
                value = envs.windows_path_converter(value)
                # Overwriting an existing key does not resize the dict, so
                # this is safe while iterating.
                cluster_envs[name] = value

    def master():
        role = "MASTER"
        # args.backend defaults to None; fail with a usable message instead
        # of a TypeError from open(None).
        if not args.backend:
            raise ValueError(
                "cluster engine requires a backend config, please pass -b/--backend"
            )
        from paddlerec.core.engine.cluster.cluster import ClusterEngine
        with open(args.backend, 'r') as rb:
            _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)

        flattens = envs.flatten_environs(_envs, "_")
        flattens["engine_role"] = role
        flattens["engine_run_config"] = args.model
        flattens["engine_temp_path"] = tempfile.mkdtemp()
        update_workspace(flattens)

        envs.set_runtime_environs(flattens)
        print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value"
                                                )))

        launch = ClusterEngine(None, args.model)
        return launch

    def worker():
        trainer = get_trainer_prefix(args) + "ClusterTrainer"
        cluster_envs = {}
        cluster_envs["train.trainer.trainer"] = trainer
        cluster_envs["train.trainer.engine"] = "cluster"
        cluster_envs["train.trainer.threads"] = envs.get_runtime_environ(
            "CPU_NUM")
        cluster_envs["train.trainer.platform"] = envs.get_platform()
        print("launch {} engine with cluster to with model: {}".format(
            trainer, args.model))
        set_runtime_envs(cluster_envs, args.model)

        trainer = TrainerFactory.create(args.model)
        return trainer

    role = os.getenv("PADDLE_PADDLEREC_ROLE", "MASTER")

    if role == "WORKER":
        return worker()
    else:
        return master()
C
chengmo 已提交
208 209


T
tangwei 已提交
210
def cluster_mpi_engine(args):
    """Create the CtrCodingTrainer used for the PSLIB CLUSTER engine."""
    print("launch cluster engine with cluster to run model: {}".format(
        args.model))

    runtime = {
        "train.trainer.trainer": "CtrCodingTrainer",
        "train.trainer.platform": envs.get_platform(),
    }
    set_runtime_envs(runtime, args.model)

    return TrainerFactory.create(args.model)


def local_cluster_engine(args):
    """Build a LocalClusterEngine with 1 server and 1 worker.

    Exports the local-cluster settings (async strategy, 2 threads,
    CPU_NUM "2", log_dir "logs", start port from ``envs.find_free_port``) and
    hands the same dict to LocalClusterEngine.
    """
    from paddlerec.core.engine.local_cluster import LocalClusterEngine

    trainer_name = get_trainer_prefix(args) + "ClusterTrainer"
    settings = {
        "server_num": 1,
        "worker_num": 1,
        "start_port": envs.find_free_port(),
        "log_dir": "logs",
        "train.trainer.trainer": trainer_name,
        "train.trainer.strategy": "async",
        "train.trainer.threads": "2",
        "train.trainer.engine": "local_cluster",
        "train.trainer.platform": envs.get_platform(),
        "CPU_NUM": "2",
    }
    print("launch {} engine with cluster to run model: {}".format(
        trainer_name, args.model))

    # set_runtime_envs may update CPU_NUM in `settings` before it is passed on.
    set_runtime_envs(settings, args.model)
    return LocalClusterEngine(settings, args.model)


T
tangwei 已提交
248
def local_mpi_engine(args):
    """Build a LocalMPIEngine that runs a 1x1 MPI job on localhost.

    Registered for the PSLIB backend's SINGLE and LOCAL_CLUSTER engines.

    Raises:
        RuntimeError: if ``mpirun`` cannot be located.
    """
    print("launch cluster engine with cluster to run model: {}".format(
        args.model))
    from paddlerec.core.engine.local_mpi import LocalMPIEngine

    print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(
        args.model))

    mpirun_path = util.run_which("mpirun")
    if not mpirun_path:
        raise RuntimeError("can not find mpirun, please check environment")

    settings = {
        "mpirun": mpirun_path,
        "train.trainer.trainer": "CtrCodingTrainer",
        "log_dir": "logs",
        "train.trainer.engine": "local_cluster",
        "train.trainer.platform": envs.get_platform(),
    }
    set_runtime_envs(settings, args.model)
    return LocalMPIEngine(settings, args.model)


T
tangwei 已提交
272
def get_abs_model(model):
    """Resolve the ``--model`` argument to a config file path.

    A value starting with ``"paddlerec."`` is a dotted package name: it is
    mapped to a directory with ``envs.path_adapter`` and that directory's
    ``config.yaml`` is used. Any other value is taken as a config file path
    and returned unchanged.

    Raises:
        IOError: if the resolved config file does not exist.
    """
    if model.startswith("paddlerec."):
        model_dir = envs.path_adapter(model)
        path = os.path.join(model_dir, "config.yaml")
    else:
        path = model
    # Check both branches so a mistyped package name fails here with a clear
    # message instead of later when the YAML is opened.
    if not os.path.isfile(path):
        raise IOError("model config: {} invalid".format(model))
    return path


T
tangwei 已提交
283
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='paddle-rec run')
    # --model is dereferenced immediately below, so make it mandatory and let
    # argparse report a usage error instead of an AttributeError on None.
    parser.add_argument("-m", "--model", type=str, required=True)
    parser.add_argument("-b", "--backend", type=str, default=None)

    abs_dir = os.path.dirname(os.path.abspath(__file__))
    envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})

    args = parser.parse_args()

    # Module-level assignment: rebinds the global read by get_trainer_prefix().
    model_name = args.model.split('.')[-1]
    args.model = get_abs_model(args.model)
    engine_registry()

    which_engine = get_engine(args)
    engine = which_engine(args)
    engine.run()