run.py 8.7 KB
Newer Older
T
tangwei 已提交
1 2
import argparse
import os
T
tangwei 已提交
3
import subprocess
T
tangwei 已提交
4 5
import tempfile

T
tangwei 已提交
6
import yaml
T
tangwei 已提交
7

T
rename  
tangwei 已提交
8 9 10
from fleetrec.core.factory import TrainerFactory
from fleetrec.core.utils import envs
from fleetrec.core.utils import util
T
tangwei 已提交
11

T
tangwei 已提交
12 13
# Launcher registry, keyed device -> backend -> cluster mode.
# It starts empty and is filled in by engine_registry().
engines = {}
# Upper-case device names.
device = ["CPU", "GPU"]
# Upper-case cluster modes; they match the keys used by engine_registry().
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
# Model names for which get_trainer_prefix() returns an upper-cased prefix.
custom_model = ['tdm']
# Overwritten by the __main__ block with the last dotted part of --model.
model_name = ""
T
tangwei 已提交
17 18


T
tangwei 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
def engine_registry():
    """Fill the module-level ``engines`` table with the launcher functions.

    The table is keyed by device ("CPU"/"GPU"), then by backend
    ("TRANSPILER"/"PSLIB"), then by cluster mode. On GPU only the
    TRANSPILER/SINGLE combination is registered; the GPU PSLIB table is
    left empty.
    """
    cpu_transpiler = {
        "SINGLE": single_engine,
        "LOCAL_CLUSTER": local_cluster_engine,
        "CLUSTER": cluster_engine,
    }
    cpu_pslib = {
        "SINGLE": local_mpi_engine,
        "LOCAL_CLUSTER": local_mpi_engine,
        "CLUSTER": cluster_mpi_engine,
    }
    gpu_transpiler = {"SINGLE": single_engine}

    engines["CPU"] = {"TRANSPILER": cpu_transpiler, "PSLIB": cpu_pslib}
    engines["GPU"] = {"TRANSPILER": gpu_transpiler, "PSLIB": {}}


C
chengmo 已提交
35 36
def get_engine(args):
    """Look up the launcher function for the requested device and engine.

    Args:
        args: parsed command-line namespace. ``args.device`` and
            ``args.engine`` are used as keys into the ``engines`` registry.

    Returns:
        The launcher registered under ``engines[device][backend][engine]``,
        where ``backend`` is the value returned by ``get_transpiler()``.

    Raises:
        KeyError: if ``args.device`` is not a key of ``engines``.
        ValueError: if no launcher is registered for the engine on that
            device and backend.
    """
    device = args.device
    d_engine = engines[device]
    transpiler = get_transpiler()

    engine = args.engine
    run_engine = d_engine[transpiler].get(engine, None)

    if run_engine is None:
        raise ValueError(
            "engine {} can not be supported on device: {}".format(engine, device))
    return run_engine


def get_transpiler():
    """Probe the installed Paddle build and name the backend to use.

    A child interpreter is started that imports ``paddle.fluid`` and calls
    ``fluid.core.Fleet().copy_table_by_feasign``. Only the child's exit
    status is inspected; its output is discarded.

    Returns:
        "PSLIB" if the child exits with return code -11, "TRANSPILER" for
        any other return code.
    """
    import sys

    # Probe with the interpreter that is running this script, so the check
    # sees the same Paddle installation the training run will use.
    interpreter = sys.executable or "python"
    cmd = [interpreter, "-c",
           "import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); [fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"]

    # The context manager closes the devnull handle once the child is done.
    with open(os.devnull, 'w') as fnull:
        proc = subprocess.Popen(cmd, stdout=fnull, stderr=fnull, cwd=os.getcwd())
        ret = proc.wait()

    # On POSIX, Popen reports a child killed by signal N as return code -N.
    # NOTE(review): -11 presumably means the probe call segfaults on PSLIB
    # builds -- confirm against the Paddle build in use.
    if ret == -11:
        return "PSLIB"
    return "TRANSPILER"
T
tangwei 已提交
59 60


T
tangwei 已提交
61
def set_runtime_envs(cluster_envs, engine_yaml):
    """Publish engine settings and the model's trainer settings as runtime envs.

    Args:
        cluster_envs: dict of settings chosen by the launcher, or None for
            no launcher settings. If it contains "CPU_NUM" and the model
            config defines "train.trainer.threads", "CPU_NUM" is overwritten
            in place with that value.
        engine_yaml: path of the model's YAML config file.

    The launcher settings are handed to ``envs.set_runtime_environs`` first
    and the "train.trainer.*" entries of the flattened model config second.
    Finally every ``os.environ`` entry whose name starts with
    "train.trainer." is printed as a table.
    """
    def get_engine_extras():
        # Collect the "train.trainer.*" entries of the flattened model config.
        # NOTE(review): yaml.FullLoader is used on a user-supplied file;
        # consider yaml.safe_load if configs can come from untrusted sources.
        with open(engine_yaml, 'r') as config_file:
            _envs = yaml.load(config_file.read(), Loader=yaml.FullLoader)

        flattens = envs.flatten_environs(_envs)
        return {k: v for k, v in flattens.items()
                if k.startswith("train.trainer.")}

    if cluster_envs is None:
        cluster_envs = {}

    engine_extras = get_engine_extras()
    # The thread count from the model config wins over the launcher's CPU_NUM.
    if "train.trainer.threads" in engine_extras and "CPU_NUM" in cluster_envs:
        cluster_envs["CPU_NUM"] = engine_extras["train.trainer.threads"]

    envs.set_runtime_environs(cluster_envs)
    envs.set_runtime_environs(engine_extras)

    need_print = {k: v for k, v in os.environ.items()
                  if k.startswith("train.trainer.")}

    print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))
T
tangwei 已提交
89 90


C
chengmo 已提交
91 92 93 94
def get_trainer_prefix(args):
    """Return the trainer-name prefix for the currently selected model.

    The module-level ``model_name`` is upper-cased and returned when it is
    listed in ``custom_model``; otherwise the prefix is the empty string.
    ``args`` is not used.
    """
    return model_name.upper() if model_name in custom_model else ""
T
tangwei 已提交
95

C
chengmo 已提交
96

C
chengmo 已提交
97 98
def single_engine(args):
    """Prepare runtime envs for a single-process run and create the trainer.

    Args:
        args: parsed command-line namespace. ``args.device`` is recorded in
            the envs and ``args.model`` is the model config path.

    Returns:
        The object returned by ``TrainerFactory.create(args.model)``.
    """
    trainer = get_trainer_prefix(args) + "SingleTrainer"
    single_envs = {
        "train.trainer.trainer": trainer,
        "train.trainer.threads": "2",
        "train.trainer.engine": "single",
        "train.trainer.device": args.device,
        "train.trainer.platform": envs.get_platform(),
    }
    print("use {} engine to run model: {}".format(trainer, args.model))

    set_runtime_envs(single_envs, args.model)
    trainer = TrainerFactory.create(args.model)
    return trainer


T
tangwei 已提交
112
def cluster_engine(args):
    """Build the engine for a cluster run, as master (submitter) or worker.

    Args:
        args: parsed command-line namespace. ``args.role`` selects the
            branch, ``args.backend`` is the backend YAML path (master only),
            and ``args.model`` / ``args.device`` describe the job.

    Returns:
        For the worker role, a ``LocalClusterEngine``; for any other role,
        a ``ClusterEngine``.

    Raises:
        ValueError: if the master role is selected without ``args.backend``.
    """
    from fleetrec.core.engine.cluster.cluster import ClusterEngine

    def update_workspace(cluster_envs):
        # Replace the "{workspace}" placeholder in every string value with
        # the resolved workspace path; no-op if "engine_workspace" is unset.
        workspace = cluster_envs.get("engine_workspace", None)
        if not workspace:
            return

        # is fleet inner models
        prefix = "fleetrec."
        if workspace.startswith(prefix):
            fleet_package = envs.get_runtime_environ("PACKAGE_BASE")
            # Strip only the leading prefix, so a later "fleetrec." inside
            # the dotted name does not truncate the path.
            workspace_dir = workspace[len(prefix):].replace(".", "/")
            path = os.path.join(fleet_package, workspace_dir)
        else:
            path = workspace

        for name, value in cluster_envs.items():
            if isinstance(value, str):
                cluster_envs[name] = value.replace("{workspace}", path)

    def master():
        # Fail with a clear message instead of a TypeError from open(None).
        if not args.backend:
            raise ValueError(
                "cluster engine needs a backend config, please set -b/--backend")

        with open(args.backend, 'r') as backend_file:
            _envs = yaml.load(backend_file.read(), Loader=yaml.FullLoader)

        flattens = envs.flatten_environs(_envs, "_")
        flattens["engine_role"] = args.role
        flattens["engine_temp_path"] = tempfile.mkdtemp()
        update_workspace(flattens)

        envs.set_runtime_environs(flattens)
        print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value")))

        launch = ClusterEngine(None, args.model)
        return launch

    def worker():
        # NOTE(review): the original used LocalClusterEngine here without
        # importing it (NameError). It is imported from the module that
        # local_cluster_engine() uses; confirm this is the intended engine
        # for a cluster worker.
        from fleetrec.core.engine.local_cluster import LocalClusterEngine

        trainer = get_trainer_prefix(args) + "ClusterTrainer"
        cluster_envs = {}
        cluster_envs["train.trainer.trainer"] = trainer
        cluster_envs["train.trainer.engine"] = "cluster"
        cluster_envs["train.trainer.device"] = args.device
        cluster_envs["train.trainer.platform"] = envs.get_platform()
        print("launch {} engine with cluster to with model: {}".format(trainer, args.model))
        set_runtime_envs(cluster_envs, args.model)

        launch = LocalClusterEngine(cluster_envs, args.model)
        return launch

    # The __main__ block upper-cases args.role, so a comparison with the
    # lowercase literal never matched; compare case-insensitively.
    if args.role.upper() == "WORKER":
        return worker()
    return master()
C
chengmo 已提交
165 166


T
tangwei 已提交
167
def cluster_mpi_engine(args):
    """Prepare runtime envs for the PSLIB cluster mode and create the trainer.

    Args:
        args: parsed command-line namespace. ``args.device`` is recorded in
            the envs and ``args.model`` is the model config path.

    Returns:
        The object returned by ``TrainerFactory.create(args.model)``.
    """
    print("launch cluster engine with cluster to run model: {}".format(args.model))

    cluster_envs = {
        "train.trainer.trainer": "CtrCodingTrainer",
        "train.trainer.device": args.device,
        "train.trainer.platform": envs.get_platform(),
    }

    set_runtime_envs(cluster_envs, args.model)

    trainer = TrainerFactory.create(args.model)
    return trainer


def local_cluster_engine(args):
    """Build a ``LocalClusterEngine`` with one server and one worker.

    Args:
        args: parsed command-line namespace. ``args.device`` is recorded in
            the envs and ``args.model`` is the model config path.

    Returns:
        A ``LocalClusterEngine`` constructed from the prepared envs and the
        model config path.
    """
    from fleetrec.core.engine.local_cluster import LocalClusterEngine

    trainer = get_trainer_prefix(args) + "ClusterTrainer"
    cluster_envs = {}
    cluster_envs["server_num"] = 1
    cluster_envs["worker_num"] = 1
    cluster_envs["start_port"] = envs.find_free_port()
    cluster_envs["log_dir"] = "logs"
    cluster_envs["train.trainer.trainer"] = trainer
    cluster_envs["train.trainer.strategy"] = "async"
    cluster_envs["train.trainer.threads"] = "2"
    cluster_envs["train.trainer.engine"] = "local_cluster"

    cluster_envs["train.trainer.device"] = args.device
    cluster_envs["train.trainer.platform"] = envs.get_platform()

    # set_runtime_envs() replaces this default with the model config's
    # "train.trainer.threads" when that key is present.
    cluster_envs["CPU_NUM"] = "2"
    print("launch {} engine with cluster to run model: {}".format(trainer, args.model))

    set_runtime_envs(cluster_envs, args.model)
    launch = LocalClusterEngine(cluster_envs, args.model)
    return launch


T
tangwei 已提交
206
def local_mpi_engine(args):
    """Build a ``LocalMPIEngine`` for a local run driven by ``mpirun``.

    Args:
        args: parsed command-line namespace. ``args.device`` is recorded in
            the envs and ``args.model`` is the model config path.

    Returns:
        A ``LocalMPIEngine`` constructed from the prepared envs and the
        model config path.

    Raises:
        RuntimeError: if ``util.run_which("mpirun")`` returns a falsy value.
    """
    print("launch cluster engine with cluster to run model: {}".format(args.model))
    from fleetrec.core.engine.local_mpi import LocalMPIEngine

    print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model))

    mpi = util.run_which("mpirun")
    if not mpi:
        raise RuntimeError("can not find mpirun, please check environment")

    cluster_envs = {}
    cluster_envs["mpirun"] = mpi
    cluster_envs["train.trainer.trainer"] = "CtrCodingTrainer"
    cluster_envs["log_dir"] = "logs"
    cluster_envs["train.trainer.engine"] = "local_cluster"

    cluster_envs["train.trainer.device"] = args.device
    cluster_envs["train.trainer.platform"] = envs.get_platform()

    set_runtime_envs(cluster_envs, args.model)
    launch = LocalMPIEngine(cluster_envs, args.model)
    return launch


T
tangwei 已提交
229 230 231 232 233 234 235 236 237 238 239 240
def get_abs_model(model):
    """Resolve the ``--model`` argument to the path of a YAML config file.

    Args:
        model: either a dotted name starting with "fleetrec." or a path to
            an existing config file.

    Returns:
        For a dotted name, ``<PACKAGE_BASE>/<name as path>/config.yaml``,
        where PACKAGE_BASE is read through ``envs.get_runtime_environ`` and
        the file's existence is not checked. For a path, ``model`` unchanged.

    Raises:
        IOError: if ``model`` is not a dotted "fleetrec." name and is not an
            existing file.
    """
    prefix = "fleetrec."
    if model.startswith(prefix):
        fleet_base = envs.get_runtime_environ("PACKAGE_BASE")
        # Strip only the leading prefix: splitting on every "fleetrec."
        # would drop the rest of a name that contains it a second time.
        workspace_dir = model[len(prefix):].replace(".", "/")
        return os.path.join(fleet_base, workspace_dir, "config.yaml")

    if not os.path.isfile(model):
        raise IOError("model config: {} invalid".format(model))
    return model


T
tangwei 已提交
241 242
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, pick the launcher for
    # the requested device/engine, build the engine and run it.
    parser = argparse.ArgumentParser(description='fleet-rec run')
    # --model and --engine are dereferenced unconditionally below, so let
    # argparse reject a missing value with a usage message instead of
    # failing later with an AttributeError on None.
    parser.add_argument("-m", "--model", type=str, required=True)
    # NOTE(review): engine_registry() registers no "TDM_*" keys, so the
    # tdm_* choices look like they always end in get_engine()'s ValueError.
    parser.add_argument("-e", "--engine", type=str, required=True,
                        choices=["single", "local_cluster", "cluster",
                                 "tdm_single", "tdm_local_cluster", "tdm_cluster"])

    parser.add_argument("-d", "--device", type=str, choices=["cpu", "gpu"], default="cpu")
    parser.add_argument("-b", "--backend", type=str, default=None)
    parser.add_argument("-r", "--role", type=str, choices=["master", "worker"], default="master")

    # Publish this file's directory as PACKAGE_BASE; get_abs_model() reads
    # it to resolve dotted "fleetrec." model names.
    abs_dir = os.path.dirname(os.path.abspath(__file__))
    envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})

    args = parser.parse_args()
    # The engine registry is keyed by upper-case names.
    args.engine = args.engine.upper()
    args.device = args.device.upper()
    args.role = args.role.upper()

    # Module-level name read by get_trainer_prefix(); it must be taken from
    # the raw argument before args.model is replaced by the resolved path.
    model_name = args.model.split('.')[-1]
    args.model = get_abs_model(args.model)
    engine_registry()

    which_engine = get_engine(args)

    engine = which_engine(args)
    engine.run()