run.py 10.7 KB
Newer Older
T
tangwei 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

T
tangwei 已提交
15
import os
T
tangwei 已提交
16
import subprocess
X
test  
xjqbest 已提交
17
import sys
T
tangwei 已提交
18 19
import argparse
import tempfile
T
tangwei 已提交
20
import yaml
X
fix  
xjqbest 已提交
21
import copy
22 23 24
from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs
from paddlerec.core.utils import util
X
test  
xjqbest 已提交
25
from paddlerec.core.utils import validation
T
tangwei 已提交
26

T
tangwei 已提交
27 28
# Registry filled by engine_registry():
#   transpiler name ("TRANSPILER" / "PSLIB") -> {engine name: entry function}
engines = {}
device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
# Engine names get_engine() accepts (the configured name is upper-cased
# before being checked against this list).
engine_choices = [
    "SINGLE_TRAIN",
    "LOCAL_CLUSTER",
    "CLUSTER",
    "TDM_SINGLE",
    "TDM_LOCAL_CLUSTER",
    "TDM_CLUSTER",
    "SINGLE_INFER",
]
# Model names that get_trainer_prefix() turns into a trainer-class prefix.
custom_model = ['TDM']
# Last dot-separated component of the -m argument; assigned in __main__.
model_name = ""
T
tangwei 已提交
36 37


T
tangwei 已提交
38
def engine_registry():
    """Fill the global ``engines`` table.

    The outer key is the transpiler reported by get_transpiler()
    ("TRANSPILER" or "PSLIB"). The inner key is an upper-cased engine name,
    and its value is the function that builds the trainer or launcher.
    """
    engines["TRANSPILER"] = {
        "SINGLE_TRAIN": single_train_engine,
        "SINGLE_INFER": single_infer_engine,
        "LOCAL_CLUSTER": local_cluster_engine,
        "CLUSTER": cluster_engine,
    }
    # NOTE(review): "SINGLE" is not in engine_choices, so get_engine() can
    # never select this entry; PSLIB + "SINGLE_TRAIN" finds nothing here.
    engines["PSLIB"] = {
        "SINGLE": local_mpi_engine,
        "LOCAL_CLUSTER": local_mpi_engine,
        "CLUSTER": cluster_mpi_engine,
    }
T
tangwei 已提交
49

T
tangwei 已提交
50

X
fix  
xjqbest 已提交
51
def get_inters_from_yaml(file, filters):
    """Load a yaml config and keep the flattened entries matching a prefix.

    Args:
        file: path to the yaml config.
        filters: one prefix string, or an iterable of prefix strings. A
            single string is treated as ONE prefix. Iterating it would
            yield one-character prefixes and match almost every key.

    Returns:
        dict mapping each flattened key that starts with any prefix to its
        value, in the order produced by ``envs.flatten_environs``.
    """
    if isinstance(filters, str):
        filters = [filters]
    # str.startswith accepts a tuple; an empty tuple matches nothing.
    prefixes = tuple(filters)

    _envs = envs.load_yaml(file)
    flattens = envs.flatten_environs(_envs)
    return {k: v for k, v in flattens.items() if k.startswith(prefixes)}
T
tangwei 已提交
60 61


X
fix  
xjqbest 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
def get_all_inters_from_yaml(file, filters):
    """Flatten a yaml config, expanding named lists, and filter by prefix.

    Nested dicts become dot-joined keys. Lists stored under a ``dataset``,
    ``phase`` or ``runner`` key are expanded by each item's ``name``, e.g.
    ``runner: [{name: r1, class: x}]`` yields ``runner.r1.class``.

    Args:
        file: path to the yaml config.
        filters: one prefix string, or an iterable of prefix strings.

    Returns:
        dict of flattened key -> value for keys starting with any prefix.

    Raises:
        ValueError: if an item of a dataset/phase/runner list has no
            ``name``.
    """
    with open(file, 'r') as rb:
        _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
    all_flattens = {}

    def flatten_env_namespace(namespace_nests, local_envs):
        for k, v in local_envs.items():
            if isinstance(v, dict):
                flatten_env_namespace(namespace_nests + [k], v)
            elif k in ("dataset", "phase", "runner") and isinstance(v, list):
                for i in v:
                    if i.get("name") is None:
                        # Name the list actually being expanded, not always
                        # "dataset".
                        raise ValueError(
                            "name must be in {} list ".format(k), v)
                    flatten_env_namespace(namespace_nests + [k, i["name"]], i)
            else:
                all_flattens[".".join(namespace_nests + [k])] = v

    # An empty yaml document loads as None; treat it as an empty mapping.
    flatten_env_namespace([], _envs or {})

    if isinstance(filters, str):
        filters = [filters]
    prefixes = tuple(filters)
    return {k: v for k, v in all_flattens.items() if k.startswith(prefixes)}


C
chengmo 已提交
95
def get_engine(args):
    """Resolve the engine entry function for the model config ``args.model``.

    The engine name is looked up in this order:
      1. ``train.engine``;
      2. ``runner.<mode>.class``, where ``mode`` is the top-level key;
      3. the default ``single_train``.
    The name is upper-cased, validated against ``engine_choices`` and looked
    up in ``engines`` under the transpiler reported by get_transpiler().

    Args:
        args: parsed command-line arguments; ``args.model`` is the path of
            the model yaml.

    Returns:
        The registered engine function (called later with ``args``).

    Raises:
        ValueError: if the engine name is not in ``engine_choices``, or no
            engine is registered for it under the detected transpiler.
    """
    transpiler = get_transpiler()
    # Named model_envs so the `envs` utility module is not shadowed.
    with open(args.model, 'r') as rb:
        model_envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
    run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])

    engine = run_extras.get("train.engine", None)
    if engine is None:
        # Configs without a top-level "mode" must reach the default below
        # instead of failing with KeyError.
        mode = (model_envs or {}).get("mode")
        if mode is not None:
            engine = run_extras.get("runner.{}.class".format(mode), None)
    if engine is None:
        engine = "single_train"
    engine = engine.upper()
    if engine not in engine_choices:
        raise ValueError("train.engine: {} can not be chosen in {}".format(
            engine, engine_choices))

    print("engines: \n{}".format(engines))

    run_engine = engines[transpiler].get(engine, None)
    if run_engine is None:
        # Fail here with a clear message instead of returning None, which
        # the caller would try to call.
        raise ValueError(
            "engine {} is not available with transpiler {}, choose from {}".
            format(engine, transpiler, sorted(engines[transpiler])))
    return run_engine


def get_transpiler():
    """Detect which distributed backend the installed Paddle provides.

    Runs a probe in a child interpreter that calls
    ``fluid.core.Fleet().copy_table_by_feasign``. A return code of -11 (the
    child was terminated by signal 11) is reported as "PSLIB". Any other
    outcome, including success, a Python error or Paddle not being
    importable, is reported as "TRANSPILER".

    Returns:
        "PSLIB" or "TRANSPILER".
    """
    cmd = [
        # Probe with the running interpreter so the child sees the same
        # Paddle install; a bare "python" may be another version or absent.
        sys.executable or "python",
        "-c",
        "import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); [fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"
    ]
    # Close the devnull handle once the probe has finished.
    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen(
            cmd, stdout=devnull, stderr=devnull, cwd=os.getcwd())
        ret = proc.wait()
    if ret == -11:
        return "PSLIB"
    return "TRANSPILER"
T
tangwei 已提交
130 131


T
tangwei 已提交
132 133 134
def set_runtime_envs(cluster_envs, engine_yaml):
    """Export engine defaults plus the model's train.trainer.* settings.

    Args:
        cluster_envs: engine-specific defaults, or None. Modified in place:
            when the yaml provides ``train.trainer.threads`` and
            ``cluster_envs`` already has ``CPU_NUM``, ``CPU_NUM`` is
            replaced by that thread count.
        engine_yaml: path to the model yaml.

    The yaml entries are set after ``cluster_envs``, so they presumably win
    for shared keys. The resulting ``train.trainer.*`` entries of
    ``os.environ`` are printed as a table.
    """
    if cluster_envs is None:
        cluster_envs = {}

    # get_inters_from_yaml iterates `filters`, so pass a list of prefixes.
    engine_extras = get_inters_from_yaml(engine_yaml, ["train.trainer."])
    if "train.trainer.threads" in engine_extras and "CPU_NUM" in cluster_envs:
        cluster_envs["CPU_NUM"] = engine_extras["train.trainer.threads"]

    envs.set_runtime_environs(cluster_envs)
    envs.set_runtime_environs(engine_extras)

    need_print = {
        k: v
        for k, v in os.environ.items() if k.startswith("train.trainer.")
    }
    print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))
T
tangwei 已提交
149 150


C
chengmo 已提交
151 152 153 154
def get_trainer_prefix(args):
    """Return the trainer-class prefix for custom models, or "".

    Uses the module-level ``model_name`` (set in ``__main__``). ``args`` is
    not read; it is kept for interface compatibility.

    NOTE(review): the membership test uses the raw ``model_name``, but the
    return value is upper-cased. A lowercase name such as "tdm" therefore
    never matches ``custom_model = ['TDM']``. Confirm whether a
    case-insensitive check was intended, and that e.g. "TDMSingleInfer"
    exists before changing it.
    """
    return model_name.upper() if model_name in custom_model else ""
T
tangwei 已提交
155

C
chengmo 已提交
156

X
fix  
xjqbest 已提交
157
def single_train_engine(args):
    """Create the single-process training trainer for ``args.model``.

    Seeds the runtime envs with the trainer class ``<prefix>SingleTrainer``,
    2 threads, engine tag "single_train" and the current platform. It then
    lets set_runtime_envs merge in values from the model yaml.
    """
    trainer_name = get_trainer_prefix(args) + "SingleTrainer"
    runtime_defaults = {
        "train.trainer.trainer": trainer_name,
        "train.trainer.threads": "2",
        "train.trainer.engine": "single_train",
        "train.trainer.platform": envs.get_platform(),
    }
    print("use {} engine to run model: {}".format(trainer_name, args.model))
    set_runtime_envs(runtime_defaults, args.model)
    return TrainerFactory.create(args.model)
X
fix  
xjqbest 已提交
168

X
fix  
xjqbest 已提交
169 170 171 172 173 174 175 176 177

def single_infer_engine(args):
    """Create the single-process inference trainer for ``args.model``.

    Seeds the runtime envs with the trainer class ``<prefix>SingleInfer``,
    2 threads, engine tag "single_infer" and the current platform. It then
    lets set_runtime_envs merge in values from the model yaml.
    """
    infer_name = get_trainer_prefix(args) + "SingleInfer"
    runtime_defaults = {
        "train.trainer.trainer": infer_name,
        "train.trainer.threads": "2",
        "train.trainer.engine": "single_infer",
        "train.trainer.platform": envs.get_platform(),
    }
    print("use {} engine to run model: {}".format(infer_name, args.model))
    set_runtime_envs(runtime_defaults, args.model)
    return TrainerFactory.create(args.model)
C
chengmo 已提交
181

X
fix  
xjqbest 已提交
182

T
tangwei 已提交
183
def cluster_engine(args):
    """Return the cluster submitter (MASTER) or cluster trainer (WORKER).

    The role comes from the ``PADDLE_PADDLEREC_ROLE`` environment variable
    and defaults to "MASTER".

    * MASTER: flattens the backend yaml ``args.backend`` with "_"
      separators and adds the role, the model path and a fresh temporary
      directory. It substitutes ``{workspace}`` in string values, exports
      everything as runtime envs and returns a ``ClusterEngine``.
    * WORKER: seeds ``train.trainer.*`` envs for ``<prefix>ClusterTrainer``
      and returns the trainer built by ``TrainerFactory``.

    Raises:
        ValueError: in the MASTER role when no backend config was given.
    """

    def update_workspace(cluster_envs):
        # Replace "{workspace}" in every string value with the resolved
        # workspace path, then pass it through windows_path_converter.
        workspace = cluster_envs.get("engine_workspace", None)

        if not workspace:
            return
        path = envs.path_adapter(workspace)
        # Only existing keys are reassigned, so mutating while iterating
        # items() is safe.
        for name, value in cluster_envs.items():
            if isinstance(value, str):
                value = value.replace("{workspace}", path)
                value = envs.windows_path_converter(value)
                cluster_envs[name] = value

    def master():
        # Without -b/--backend, load_yaml(None) would fail obscurely.
        if not args.backend:
            raise ValueError(
                "cluster engine needs a backend config, please set -b/--backend"
            )
        from paddlerec.core.engine.cluster.cluster import ClusterEngine
        _envs = envs.load_yaml(args.backend)
        flattens = envs.flatten_environs(_envs, "_")
        flattens["engine_role"] = "MASTER"
        flattens["engine_run_config"] = args.model
        # Scratch directory handed to the submitter; not removed here.
        flattens["engine_temp_path"] = tempfile.mkdtemp()
        update_workspace(flattens)

        envs.set_runtime_environs(flattens)
        print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value"
                                                )))

        launch = ClusterEngine(None, args.model)
        return launch

    def worker():
        trainer = get_trainer_prefix(args) + "ClusterTrainer"
        cluster_envs = {}
        cluster_envs["train.trainer.trainer"] = trainer
        cluster_envs["train.trainer.engine"] = "cluster"
        cluster_envs["train.trainer.threads"] = envs.get_runtime_environ(
            "CPU_NUM")
        cluster_envs["train.trainer.platform"] = envs.get_platform()
        print("launch {} engine with cluster to with model: {}".format(
            trainer, args.model))
        set_runtime_envs(cluster_envs, args.model)

        trainer = TrainerFactory.create(args.model)
        return trainer

    role = os.getenv("PADDLE_PADDLEREC_ROLE", "MASTER")

    if role == "WORKER":
        return worker()
    else:
        return master()
C
chengmo 已提交
235 236


T
tangwei 已提交
237
def cluster_mpi_engine(args):
    """Build the trainer registered as the PSLIB "CLUSTER" engine.

    Seeds the runtime envs with trainer class "CtrCodingTrainer" and the
    current platform, merges in the model yaml via set_runtime_envs, and
    returns the trainer created by ``TrainerFactory``.
    """
    print("launch cluster engine with cluster to run model: {}".format(
        args.model))

    pslib_envs = {
        "train.trainer.trainer": "CtrCodingTrainer",
        "train.trainer.platform": envs.get_platform(),
    }
    set_runtime_envs(pslib_envs, args.model)
    return TrainerFactory.create(args.model)


def local_cluster_engine(args):
    """Launch a local cluster (1 server, 1 worker) for ``args.model``.

    Uses an async ``<prefix>ClusterTrainer`` with 2 threads, a free start
    port and "logs" as the log directory, and returns the
    ``LocalClusterEngine`` launcher.
    """
    from paddlerec.core.engine.local_cluster import LocalClusterEngine

    trainer_name = get_trainer_prefix(args) + "ClusterTrainer"
    local_envs = {
        "server_num": 1,
        "worker_num": 1,
        "start_port": envs.find_free_port(),
        "log_dir": "logs",
        "train.trainer.trainer": trainer_name,
        "train.trainer.strategy": "async",
        "train.trainer.threads": "2",
        "train.trainer.engine": "local_cluster",
        "train.trainer.platform": envs.get_platform(),
        # set_runtime_envs replaces this with train.trainer.threads from
        # the yaml when that key is present.
        "CPU_NUM": "2",
    }
    print("launch {} engine with cluster to run model: {}".format(
        trainer_name, args.model))

    set_runtime_envs(local_envs, args.model)
    return LocalClusterEngine(local_envs, args.model)


T
tangwei 已提交
275
def local_mpi_engine(args):
    """Launch a 1x1 MPI cluster on localhost for ``args.model``.

    Returns the ``LocalMPIEngine`` launcher configured with the located
    ``mpirun`` binary and the "CtrCodingTrainer" trainer.

    Raises:
        RuntimeError: if no ``mpirun`` executable can be found.
    """
    print("launch cluster engine with cluster to run model: {}".format(
        args.model))
    from paddlerec.core.engine.local_mpi import LocalMPIEngine

    print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(
        args.model))

    mpirun_path = util.run_which("mpirun")
    if not mpirun_path:
        raise RuntimeError("can not find mpirun, please check environment")

    mpi_envs = {
        "mpirun": mpirun_path,
        "train.trainer.trainer": "CtrCodingTrainer",
        "log_dir": "logs",
        "train.trainer.engine": "local_cluster",
        "train.trainer.platform": envs.get_platform(),
    }
    set_runtime_envs(mpi_envs, args.model)
    return LocalMPIEngine(mpi_envs, args.model)


T
tangwei 已提交
299
def get_abs_model(model):
    """Resolve the ``-m`` argument to the path of a model config yaml.

    Args:
        model: either a dotted package name starting with "paddlerec."
            (mapped by ``envs.path_adapter`` to a directory whose
            ``config.yaml`` is used) or a path to a config file.

    Returns:
        The path of the model config yaml.

    Raises:
        IOError: if the resolved config file does not exist. This applies
            to both forms, so a bad package name fails here rather than
            later.
    """
    if model.startswith("paddlerec."):
        # `model_dir` rather than `dir`, which would shadow the builtin.
        model_dir = envs.path_adapter(model)
        path = os.path.join(model_dir, "config.yaml")
    else:
        path = model
    if not os.path.isfile(path):
        raise IOError("model config: {} invalid".format(model))
    return path


T
tangwei 已提交
310
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='paddle-rec run')
    # --model is mandatory: everything below dereferences args.model.
    parser.add_argument(
        "-m",
        "--model",
        type=str,
        required=True,
        help="model config yaml path, or a paddlerec.* package name")
    parser.add_argument(
        "-b",
        "--backend",
        type=str,
        default=None,
        help="backend config yaml, read by the cluster engine (MASTER role)")

    abs_dir = os.path.dirname(os.path.abspath(__file__))
    envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})

    args = parser.parse_args()

    # Module-level global read by get_trainer_prefix(): the last
    # dot-separated component of the -m argument.
    model_name = args.model.split('.')[-1]
    args.model = get_abs_model(args.model)
    if not validation.yaml_validation(args.model):
        sys.exit(-1)
    engine_registry()
    which_engine = get_engine(args)
    engine = which_engine(args)
    engine.run()