run.py 8.9 KB
Newer Older
T
tangwei 已提交
1 2
import argparse
import os
T
tangwei 已提交
3
import subprocess
T
tangwei 已提交
4 5
import tempfile

T
tangwei 已提交
6
import yaml
T
tangwei 已提交
7

8 9 10
from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs
from paddlerec.core.utils import util
T
tangwei 已提交
11

T
tangwei 已提交
12 13
# Engine lookup table, populated by engine_registry():
#   engines[<device>][<transpiler>][<cluster mode>] -> engine factory function.
engines = {}
# Device names and cluster modes this launcher is written around.
device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
# Models that have their own trainer family; get_trainer_prefix() turns the
# model name into a trainer-class prefix for these (e.g. "TDM").
custom_model = ["tdm"]
# Short name of the model being launched; assigned in the __main__ block.
model_name = ""
T
tangwei 已提交
17 18


T
tangwei 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
def engine_registry():
    """Fill the module-level ``engines`` table.

    Layout is ``engines[device][transpiler][cluster_mode]`` -> factory
    function.

    CPU has entries for both the TRANSPILER and PSLIB backends. On PSLIB,
    SINGLE and LOCAL_CLUSTER both map to the local MPI engine. GPU only
    registers single-machine training on the TRANSPILER backend; its PSLIB
    table is empty.
    """
    engines["CPU"] = {
        "TRANSPILER": {
            "SINGLE": single_engine,
            "LOCAL_CLUSTER": local_cluster_engine,
            "CLUSTER": cluster_engine,
        },
        "PSLIB": {
            "SINGLE": local_mpi_engine,
            "LOCAL_CLUSTER": local_mpi_engine,
            "CLUSTER": cluster_mpi_engine,
        },
    }
    engines["GPU"] = {
        "TRANSPILER": {"SINGLE": single_engine},
        "PSLIB": {},
    }


C
chengmo 已提交
35 36
def get_engine(args):
    """Return the engine factory registered for ``args.device``/``args.engine``.

    The backend table (TRANSPILER or PSLIB) is chosen by probing the local
    Paddle build with get_transpiler().

    Args:
        args: parsed CLI namespace. ``device`` and ``engine`` are upper-cased
            by the __main__ block before this is called.

    Returns:
        The callable stored in ``engines`` for that combination.

    Raises:
        KeyError: if ``args.device`` has no entry in ``engines``.
        ValueError: if the engine is not registered for the detected backend
            on that device.
    """
    requested_device = args.device
    by_backend = engines[requested_device]
    backend = get_transpiler()

    requested_engine = args.engine
    factory = by_backend[backend].get(requested_engine, None)
    if factory is None:
        raise ValueError(
            "engine {} can not be supported on device: {}".format(
                requested_engine, requested_device))
    return factory


def get_transpiler():
    """Detect which distributed-training backend the installed Paddle provides.

    A child interpreter imports ``paddle.fluid`` and calls
    ``fluid.core.Fleet().copy_table_by_feasign(10, 10, [2020, 1010])``. The
    launcher treats the child being killed by signal 11 (return code -11)
    as the PSLIB signature. Any other outcome, including a failed import or
    a clean exit, selects TRANSPILER. The child's stdout and stderr are
    discarded.

    Returns:
        "PSLIB" or "TRANSPILER".
    """
    import sys

    # Probe the interpreter that is running this launcher, so the detected
    # backend matches the Paddle install that will actually run the trainer.
    # A bare "python" may resolve to a different interpreter on PATH.
    # Fall back to "python" if sys.executable is unavailable.
    interpreter = sys.executable or "python"
    cmd = [interpreter, "-c",
           "import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); "
           "[fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"]
    # The context manager closes the devnull handle; it used to be leaked.
    with open(os.devnull, 'w') as fnull:
        proc = subprocess.Popen(cmd, stdout=fnull, stderr=fnull, cwd=os.getcwd())
        ret = proc.wait()
    # A negative return code means the child was terminated by that signal
    # number; -11 is SIGSEGV on POSIX systems.
    if ret == -11:
        return "PSLIB"
    return "TRANSPILER"
T
tangwei 已提交
59 60


T
tangwei 已提交
61
def set_runtime_envs(cluster_envs, engine_yaml):
    """Publish launcher defaults and the model YAML's trainer settings.

    Args:
        cluster_envs: dict of settings chosen by the calling engine, or None.
            It is mutated in place (see CPU_NUM below).
        engine_yaml: path to the model's YAML config.

    The YAML is loaded and flattened with envs.flatten_environs. Every
    flattened key starting with ``train.trainer.`` is published through
    envs.set_runtime_environs after ``cluster_envs``. Presumably the later
    call wins for keys both define; confirm in envs.

    If ``cluster_envs`` already has a ``CPU_NUM`` entry and the YAML sets
    ``train.trainer.threads``, ``CPU_NUM`` is replaced with that value.

    Finally, every ``train.trainer.*`` entry of ``os.environ`` is printed as
    a table.
    """
    if cluster_envs is None:
        cluster_envs = {}

    trainer_prefix = "train.trainer."

    # NOTE(review): yaml.FullLoader is not a safe loader; fine for local
    # configs, but do not point this at untrusted YAML.
    with open(engine_yaml, 'r') as yaml_file:
        parsed = yaml.load(yaml_file.read(), Loader=yaml.FullLoader)
    flat = envs.flatten_environs(parsed)
    engine_extras = {key: value for key, value in flat.items()
                     if key.startswith(trainer_prefix)}

    threads_key = "train.trainer.threads"
    if threads_key in engine_extras and "CPU_NUM" in cluster_envs:
        cluster_envs["CPU_NUM"] = engine_extras[threads_key]

    envs.set_runtime_environs(cluster_envs)
    envs.set_runtime_environs(engine_extras)

    need_print = {key: value for key, value in os.environ.items()
                  if key.startswith(trainer_prefix)}
    print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))
T
tangwei 已提交
89 90


C
chengmo 已提交
91 92 93 94
def get_trainer_prefix(args):
    """Return the trainer-class prefix for the model being launched.

    The decision uses the module-level ``model_name``; ``args`` is accepted
    for interface compatibility but not read. Models listed in
    ``custom_model`` get their upper-cased name, e.g. "TDM", which callers
    prepend to "SingleTrainer" or "ClusterTrainer". Every other model gets
    "".
    """
    return model_name.upper() if model_name in custom_model else ""
T
tangwei 已提交
95

C
chengmo 已提交
96

C
chengmo 已提交
97 98
def single_engine(args):
    """Create a single-machine trainer for ``args.model``.

    Publishes the single-engine defaults: trainer ``<prefix>SingleTrainer``,
    2 threads, engine "single", and the current device and platform. This
    goes through set_runtime_envs, which also merges in the YAML's
    ``train.trainer.*`` keys. The trainer is then built by TrainerFactory.
    """
    trainer_name = "{}SingleTrainer".format(get_trainer_prefix(args))
    single_envs = {
        "train.trainer.trainer": trainer_name,
        "train.trainer.threads": "2",
        "train.trainer.engine": "single",
        "train.trainer.device": args.device,
        "train.trainer.platform": envs.get_platform(),
    }
    print("use {} engine to run model: {}".format(trainer_name, args.model))

    set_runtime_envs(single_envs, args.model)
    return TrainerFactory.create(args.model)


T
tangwei 已提交
112
def cluster_engine(args):
    """Build the engine for multi-machine (TRANSPILER) training.

    For ``args.role == "WORKER"``, this publishes ``<prefix>ClusterTrainer``
    settings and returns a trainer from TrainerFactory.

    Any other role acts as the submitting master. It reads the backend YAML
    named by ``args.backend``, flattens it with "_" separators, and adds the
    role, run config, and a fresh temp dir. It expands ``{workspace}``
    placeholders, publishes everything as runtime envs, and returns a
    ClusterEngine.

    Raises:
        ValueError: in master role when no backend config was given.
    """
    package_prefix = "paddlerec."

    def update_workspace(cluster_envs):
        # Replace "{workspace}" in every string value with the workspace
        # path. A workspace "paddlerec.a.b" is resolved to
        # <PACKAGE_BASE>/a/b; anything else is used verbatim.
        workspace = cluster_envs.get("engine_workspace", None)
        if not workspace:
            return

        # is fleet inner models
        if workspace.startswith(package_prefix):
            fleet_package = envs.get_runtime_environ("PACKAGE_BASE")
            # Strip only the leading prefix. split("paddlerec.")[1] would
            # also cut at any later "paddlerec." inside the name.
            workspace_dir = workspace[len(package_prefix):].replace(".", "/")
            path = os.path.join(fleet_package, workspace_dir)
        else:
            path = workspace

        # Only existing keys are reassigned, so mutating while iterating
        # items() is safe here.
        for name, value in cluster_envs.items():
            if isinstance(value, str):
                cluster_envs[name] = value.replace("{workspace}", path)

    def master():
        from paddlerec.core.engine.cluster.cluster import ClusterEngine

        # --backend defaults to None. Fail with an actionable message
        # instead of a TypeError from open(None).
        if not args.backend:
            raise ValueError(
                "cluster engine in master role requires a backend config, "
                "please pass it with -b/--backend")

        with open(args.backend, 'r') as rb:
            _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)

        flattens = envs.flatten_environs(_envs, "_")
        flattens["engine_role"] = args.role
        flattens["engine_run_config"] = args.model
        flattens["engine_temp_path"] = tempfile.mkdtemp()
        update_workspace(flattens)

        envs.set_runtime_environs(flattens)
        print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value")))

        launch = ClusterEngine(None, args.model)
        return launch

    def worker():
        trainer = get_trainer_prefix(args) + "ClusterTrainer"
        cluster_envs = {}
        cluster_envs["train.trainer.trainer"] = trainer
        cluster_envs["train.trainer.engine"] = "cluster"
        # Thread count comes from the CPU_NUM runtime env, which the
        # caller is expected to have set.
        cluster_envs["train.trainer.threads"] = envs.get_runtime_environ("CPU_NUM")
        cluster_envs["train.trainer.device"] = args.device
        cluster_envs["train.trainer.platform"] = envs.get_platform()
        print("launch {} engine with cluster to with model: {}".format(
            trainer, args.model))
        set_runtime_envs(cluster_envs, args.model)

        trainer = TrainerFactory.create(args.model)
        return trainer

    if args.role == "WORKER":
        return worker()
    else:
        return master()
C
chengmo 已提交
168 169


T
tangwei 已提交
170
def cluster_mpi_engine(args):
    """Create the PSLIB multi-machine trainer (CtrCodingTrainer).

    Publishes the trainer name, device and platform through
    set_runtime_envs, which also merges the model YAML's
    ``train.trainer.*`` keys. Returns the trainer built by TrainerFactory.
    """
    print("launch cluster engine with cluster to run model: {}".format(args.model))

    mpi_cluster_envs = {
        "train.trainer.trainer": "CtrCodingTrainer",
        "train.trainer.device": args.device,
        "train.trainer.platform": envs.get_platform(),
    }
    set_runtime_envs(mpi_cluster_envs, args.model)

    return TrainerFactory.create(args.model)


def local_cluster_engine(args):
    """Launch a 1-server / 1-worker parameter-server job on this machine.

    Settings: async strategy, 2 threads, CPU_NUM "2", logs under "logs",
    and a free port from envs.find_free_port(). The dict is passed through
    set_runtime_envs, which may overwrite CPU_NUM with the YAML's
    ``train.trainer.threads``. The same dict is then handed to
    LocalClusterEngine.
    """
    from paddlerec.core.engine.local_cluster import LocalClusterEngine

    trainer_name = get_trainer_prefix(args) + "ClusterTrainer"
    local_envs = {
        "server_num": 1,
        "worker_num": 1,
        "start_port": envs.find_free_port(),
        "log_dir": "logs",
        "train.trainer.trainer": trainer_name,
        "train.trainer.strategy": "async",
        "train.trainer.threads": "2",
        "train.trainer.engine": "local_cluster",
        "train.trainer.device": args.device,
        "train.trainer.platform": envs.get_platform(),
        "CPU_NUM": "2",
    }
    print("launch {} engine with cluster to run model: {}".format(trainer_name, args.model))

    set_runtime_envs(local_envs, args.model)
    return LocalClusterEngine(local_envs, args.model)


T
tangwei 已提交
209
def local_mpi_engine(args):
    """Launch a 1x1 MPI (PSLIB) training job on localhost.

    Locates ``mpirun`` with util.run_which and publishes CtrCodingTrainer
    settings through set_runtime_envs. Returns a LocalMPIEngine configured
    with those settings.

    Raises:
        RuntimeError: if no ``mpirun`` executable is found.
    """
    print("launch cluster engine with cluster to run model: {}".format(args.model))
    from paddlerec.core.engine.local_mpi import LocalMPIEngine

    print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model))

    mpirun_path = util.run_which("mpirun")
    if not mpirun_path:
        raise RuntimeError("can not find mpirun, please check environment")

    # NOTE(review): engine is tagged "local_cluster" even though this is the
    # MPI path. This is kept as-is; confirm whether that is intentional.
    mpi_envs = {
        "mpirun": mpirun_path,
        "train.trainer.trainer": "CtrCodingTrainer",
        "log_dir": "logs",
        "train.trainer.engine": "local_cluster",
        "train.trainer.device": args.device,
        "train.trainer.platform": envs.get_platform(),
    }
    set_runtime_envs(mpi_envs, args.model)

    return LocalMPIEngine(mpi_envs, args.model)


T
tangwei 已提交
232
def get_abs_model(model):
    """Resolve the ``-m/--model`` argument to a config file path.

    Args:
        model: either a dotted built-in name ``"paddlerec.a.b"``, resolved
            to ``<PACKAGE_BASE>/a/b/config.yaml``, or a path to a config
            file.

    Returns:
        The path of the model's config file.

    Raises:
        IOError: if the resolved config file does not exist. Both branches
            are now checked; previously only explicit paths were validated.
    """
    package_prefix = "paddlerec."
    if model.startswith(package_prefix):
        fleet_base = envs.get_runtime_environ("PACKAGE_BASE")
        # Strip only the leading prefix. model.split("paddlerec.")[1] would
        # truncate names that contain "paddlerec." a second time.
        workspace_dir = model[len(package_prefix):].replace(".", "/")
        path = os.path.join(fleet_base, workspace_dir, "config.yaml")
    else:
        path = model
    if not os.path.isfile(path):
        raise IOError("model config: {} invalid".format(model))
    return path


T
tangwei 已提交
244
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='paddle-rec run')
    # -m and -e are dereferenced unconditionally below. Without required=True,
    # omitting either crashed with AttributeError on None instead of
    # producing a usage error.
    parser.add_argument("-m", "--model", type=str, required=True)
    # NOTE(review): the tdm_* choices have no entry in engine_registry(), so
    # get_engine() rejects them with ValueError. TDM is selected through
    # model_name/custom_model instead.
    parser.add_argument("-e", "--engine", type=str, required=True,
                        choices=["single", "local_cluster", "cluster",
                                 "tdm_single", "tdm_local_cluster", "tdm_cluster"])

    parser.add_argument("-d", "--device", type=str,
                        choices=["cpu", "gpu"], default="cpu")
    parser.add_argument("-b", "--backend", type=str, default=None)
    parser.add_argument("-r", "--role", type=str,
                        choices=["master", "worker"], default="master")

    # Built-in "paddlerec.*" models are resolved relative to this file.
    abs_dir = os.path.dirname(os.path.abspath(__file__))
    envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})

    args = parser.parse_args()
    args.engine = args.engine.upper()
    args.device = args.device.upper()
    args.role = args.role.upper()

    # Module-level global read by get_trainer_prefix(): the last dotted
    # component of the raw -m value.
    # NOTE(review): for a file path such as ".../config.yaml" this yields
    # "yaml", so custom models are only detected via the dotted form.
    model_name = args.model.split('.')[-1]
    args.model = get_abs_model(args.model)
    engine_registry()

    which_engine = get_engine(args)

    engine = which_engine(args)
    engine.run()