run.py 5.6 KB
Newer Older
T
tangwei 已提交
1 2
import argparse
import os
T
tangwei 已提交
3
import subprocess
T
tangwei 已提交
4
import yaml
T
tangwei 已提交
5 6 7

from fleetrec.core.factory import TrainerFactory
from fleetrec.core.utils import envs
T
tangwei 已提交
8
from fleetrec.core.utils import util
T
tangwei 已提交
9

T
tangwei 已提交
10 11
# Launcher lookup table; starts empty and is filled in by engine_registry().
engines = dict()
# Device names in the upper-cased form used as keys of ``engines``.
device = [
    "CPU",
    "GPU",
]
T
tangwei 已提交
12 13 14
# Engine mode names in the upper-cased form used as the innermost keys of ``engines``.
clusters = [
    "SINGLE",
    "LOCAL_CLUSTER",
    "CLUSTER",
]


T
tangwei 已提交
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
def engine_registry():
    """Fill the module-level ``engines`` table with the supported launchers.

    The table is keyed by device ("CPU" / "GPU"), then by backend
    ("TRANSPILER" / "PSLIB"), then by engine mode ("SINGLE",
    "LOCAL_CLUSTER", "CLUSTER"). Each leaf is the function that builds the
    object to run for that combination. On GPU only the single-process
    transpiler launcher is registered; the GPU PSLIB table is left empty.
    """
    engines["CPU"] = {
        "TRANSPILER": {
            "SINGLE": single_engine,
            "LOCAL_CLUSTER": local_cluster_engine,
            "CLUSTER": cluster_engine,
        },
        "PSLIB": {
            # PSLIB has no dedicated single-process launcher: SINGLE and
            # LOCAL_CLUSTER both map to the local MPI launcher.
            "SINGLE": local_mpi_engine,
            "LOCAL_CLUSTER": local_mpi_engine,
            "CLUSTER": cluster_mpi_engine,
        },
    }
    engines["GPU"] = {
        "TRANSPILER": {"SINGLE": single_engine},
        "PSLIB": {},
    }


def get_engine(engine, device):
    """Look up the launcher function for an engine mode on a device.

    Args:
        engine: engine mode key, e.g. "SINGLE", "LOCAL_CLUSTER" or "CLUSTER".
        device: device key of the ``engines`` table, e.g. "CPU" or "GPU".

    Returns:
        The launcher function registered for (device, detected backend, engine).

    Raises:
        KeyError: if ``device`` is not a key of ``engines``.
        ValueError: if no launcher is registered for this combination.
    """
    by_backend = engines[device]
    backend = get_transpiler()
    launcher = by_backend[backend].get(engine)

    if launcher is not None:
        return launcher
    raise ValueError("engine {} can not be supported on device: {}".format(engine, device))


def get_transpiler():
    """Detect which backend the installed paddle build provides.

    A short probe script is run in a child interpreter with its output
    discarded. The probe imports ``paddle.fluid``, creates a
    ``fluid.core.Fleet()`` and calls ``copy_table_by_feasign`` on it. Only
    the child's exit status is inspected.

    Returns:
        "PSLIB" if the child exited with status -11 (on POSIX a negative
        status -N means the child was terminated by signal N), otherwise
        "TRANSPILER".
    """
    # Local import so this block does not depend on the file-level imports.
    import sys

    probe = ("import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); "
             "[fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];")
    # Probe with the interpreter that is running this script, so the paddle
    # build being inspected is the one training will actually use. Fall back
    # to "python" on PATH if the interpreter path is unknown.
    cmd = [sys.executable or "python", "-c", probe]

    # The context manager closes the devnull handle once the child is done.
    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen(cmd, stdout=devnull, stderr=devnull, cwd=os.getcwd())
        ret = proc.wait()

    # NOTE(review): -11 is presumably the probe segfaulting on PSLIB builds;
    # confirm this detection against the paddle versions in use.
    if ret == -11:
        return "PSLIB"
    return "TRANSPILER"
T
tangwei 已提交
51 52


T
tangwei 已提交
53
def set_runtime_envs(cluster_envs, engine_yaml):
    """Register the runtime settings for a run and print the trainer ones.

    Two sets of values are handed to ``envs.set_runtime_environs``, in this
    order: the engine-provided ``cluster_envs``, then every flattened
    ``train.trainer.*`` entry read from the model YAML. Afterwards all
    ``os.environ`` entries whose name starts with ``train.trainer.`` are
    printed as a table.

    Args:
        cluster_envs: dict of settings chosen by the engine, or None for none.
        engine_yaml: path of the model YAML file to read trainer settings from.
    """
    trainer_prefix = "train.trainer."

    def get_engine_extras():
        # NOTE(review): yaml.FullLoader is not a hardened loader on every
        # PyYAML release; only run model files from trusted sources.
        with open(engine_yaml, 'r') as stream:
            loaded = yaml.load(stream.read(), Loader=yaml.FullLoader)

        flat = envs.flatten_environs(loaded)
        return {key: value for key, value in flat.items()
                if key.startswith(trainer_prefix)}

    envs.set_runtime_environs({} if cluster_envs is None else cluster_envs)
    envs.set_runtime_environs(get_engine_extras())

    # Presumably set_runtime_environs exports into os.environ, which is why
    # the summary is collected from there -- confirm against fleetrec envs.
    need_print = {name: value for name, value in os.environ.items()
                  if name.startswith(trainer_prefix)}

    print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))
T
tangwei 已提交
78 79


T
tangwei 已提交
80
def single_engine(args):
    """Prepare a single-process run and return its trainer.

    Args:
        args: parsed command-line namespace; ``args.model`` (model YAML path)
            and ``args.device`` are read.

    Returns:
        The object created by ``TrainerFactory.create(args.model)``.
    """
    print("use single engine to run model: {}".format(args.model))

    single_envs = {
        "train.trainer.trainer": "SingleTrainer",
        "train.trainer.threads": "2",
        "train.trainer.engine": "single",
        "train.trainer.device": args.device,
    }

    set_runtime_envs(single_envs, args.model)
    return TrainerFactory.create(args.model)


def cluster_engine(args):
    """Prepare a cluster run with ``ClusterTrainer`` and return its trainer.

    Args:
        args: parsed command-line namespace; ``args.model`` (model YAML path)
            and ``args.device`` are read.

    Returns:
        The object created by ``TrainerFactory.create(args.model)``.
    """
    print("launch cluster engine with cluster to run model: {}".format(args.model))

    cluster_envs = {
        "train.trainer.trainer": "ClusterTrainer",
        "train.trainer.engine": "cluster",
        "train.trainer.device": args.device,
    }

    set_runtime_envs(cluster_envs, args.model)

    return TrainerFactory.create(args.model)


def cluster_mpi_engine(args):
    """Prepare a cluster run with ``CtrCodingTrainer`` and return its trainer.

    Unlike ``cluster_engine``, no ``train.trainer.engine`` value is set here.

    Args:
        args: parsed command-line namespace; ``args.model`` (model YAML path)
            and ``args.device`` are read.

    Returns:
        The object created by ``TrainerFactory.create(args.model)``.
    """
    print("launch cluster engine with cluster to run model: {}".format(args.model))

    cluster_envs = {
        "train.trainer.trainer": "CtrCodingTrainer",
        "train.trainer.device": args.device,
    }

    set_runtime_envs(cluster_envs, args.model)

    return TrainerFactory.create(args.model)


def local_cluster_engine(args):
    """Prepare a local cluster run and return its launcher.

    The settings use one server, one worker, start port 36001, log directory
    "logs", ``ClusterTrainer`` with the "async" strategy and two threads.

    Args:
        args: parsed command-line namespace; ``args.model`` (model YAML path)
            and ``args.device`` are read.

    Returns:
        A ``LocalClusterEngine`` built from the settings and the model path.
    """
    print("launch cluster engine with cluster to run model: {}".format(args.model))
    # Imported here rather than at file level, as in the original.
    from fleetrec.core.engine.local_cluster_engine import LocalClusterEngine

    cluster_envs = {
        "server_num": 1,
        "worker_num": 1,
        "start_port": 36001,
        "log_dir": "logs",
        "train.trainer.trainer": "ClusterTrainer",
        "train.trainer.strategy": "async",
        "train.trainer.threads": "2",
        "train.trainer.engine": "local_cluster",
        "train.trainer.device": args.device,
        "CPU_NUM": "2",
    }

    set_runtime_envs(cluster_envs, args.model)

    return LocalClusterEngine(cluster_envs, args.model)
T
tangwei 已提交
141

T
tangwei 已提交
142

T
tangwei 已提交
143
def local_mpi_engine(args):
    """Prepare a local MPI run and return its launcher.

    Args:
        args: parsed command-line namespace; ``args.model`` (model YAML path)
            and ``args.device`` are read.

    Returns:
        A ``LocalMPIEngine`` built from the settings and the model path.

    Raises:
        RuntimeError: if ``util.run_which("mpirun")`` finds no ``mpirun``.
    """
    print("launch cluster engine with cluster to run model: {}".format(args.model))
    # Imported here rather than at file level, as in the original.
    from fleetrec.core.engine.local_mpi_engine import LocalMPIEngine

    print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model))

    mpi = util.run_which("mpirun")
    if not mpi:
        raise RuntimeError("can not find mpirun, please check environment")

    cluster_envs = {
        "mpirun": mpi,
        "train.trainer.trainer": "CtrCodingTrainer",
        "log_dir": "logs",
        "train.trainer.engine": "local_cluster",
        "train.trainer.device": args.device,
    }

    set_runtime_envs(cluster_envs, args.model)
    return LocalMPIEngine(cluster_envs, args.model)


T
tangwei 已提交
164 165
if __name__ == "__main__":
    # Entry point: parse the command line, pick the launcher that matches the
    # requested engine mode / device and the detected backend, then run it.
    parser = argparse.ArgumentParser(description='fleet-rec run')
    # --model and --engine are mandatory. Without ``required=True`` a missing
    # value arrives as None and fails further down (``None.upper()`` or
    # ``os.path.isfile(None)``) instead of producing a usage error.
    parser.add_argument("-m", "--model", type=str, required=True)
    parser.add_argument("-e", "--engine", type=str, required=True,
                        choices=["single", "local_cluster", "cluster"])
    parser.add_argument("-d", "--device", type=str, choices=["cpu", "gpu"], default="cpu")

    args = parser.parse_args()
    # The registry tables are keyed by upper-case names.
    args.engine = args.engine.upper()
    args.device = args.device.upper()

    if not os.path.isfile(args.model):
        raise IOError("argument model: {} do not exist".format(args.model))

    # Populate the launcher table before looking anything up in it.
    engine_registry()

    which_engine = get_engine(args.engine, args.device)

    engine = which_engine(args)
    engine.run()