diff --git a/fleet_rec/core/engine/cluster/cluster.py b/fleet_rec/core/engine/cluster/cluster.py index abb3cf83e89347d1ca215ceb2ba789c55e32e07f..ffe54ff7bc2e1d9bc20dde0e877f72e1844931f3 100644 --- a/fleet_rec/core/engine/cluster/cluster.py +++ b/fleet_rec/core/engine/cluster/cluster.py @@ -21,15 +21,28 @@ import os import copy from fleetrec.core.engine.engine import Engine +from fleetrec.core.utils import envs class ClusterEngine(Engine): def __init_impl__(self): abs_dir = os.path.dirname(os.path.abspath(__file__)) - self.submit_script = os.path.join(abs_dir, "submit.sh") - self.job_script = os.path.join(abs_dir, "job.sh") + self.submit_script = os.path.join(abs_dir, "master.sh") + self.job_script = os.path.join(abs_dir, "worker.sh") - def start_procs(self): + def start_worker_procs(self): + default_env = os.environ.copy() + current_env = copy.copy(default_env) + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + + cmd = ("bash {}".format(self.submit_script)).split(" ") + proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd()) + proc.wait() + + print("all workers and parameter servers already completed", file=sys.stderr) + + def start_master_procs(self): default_env = os.environ.copy() current_env = copy.copy(default_env) current_env.pop("http_proxy", None) @@ -42,4 +55,13 @@ class ClusterEngine(Engine): print("all workers and parameter servers already completed", file=sys.stderr) def run(self): - self.start_procs() + role = envs.get_runtime_environ("engine_role") + + if role == "MASTER": + self.start_master_procs() + + elif role == "WORKER": + self.start_worker_procs() + + else: + raise ValueError("role {} error, must in MASTER/WORKER".format(role)) diff --git a/fleet_rec/core/engine/cluster/submit.sh b/fleet_rec/core/engine/cluster/submit.sh index 2382911d62807ea2665ff5a124a6deae024dc59f..f34adc533a8f933b4aa7ce22df57c457d70adc3e 100644 --- a/fleet_rec/core/engine/cluster/submit.sh +++ b/fleet_rec/core/engine/cluster/submit.sh @@ -13,17 +13,6 @@ declare g_scriptName="" declare g_workPath="" declare g_run_stage="" -# ----------------------------for hpc submit -------------------------------- # -declare g_hpc_path="" -declare g_job_name="" -declare g_qsub_conf="" -declare g_hdfs_path="" -declare g_hdfs_ugi="" -declare g_hdfs_output="" -declare g_submit_package="" -declare g_job_nodes="" -declare g_job_entry="" - # ---------------------------------------------------------------------------- # # const define # # ---------------------------------------------------------------------------- # @@ -51,19 +40,6 @@ get_cur_path() cd - >/dev/null } - -#----------------------------------------------------------------------------------------------------------------- -#fun : get argument from env, set it into variables -#param : N/A -#return : 0 -- success; not 0 -- failure -#----------------------------------------------------------------------------------------------------------------- -function vars_get_from_env() { - g_run_stage="vars_get_from_env" - g_hpc_path=${engine.} - g_crontabDate=$2 -} - - #----------------------------------------------------------------------------------------------------------------- #fun : check function return code #param : N/A @@ -77,73 +53,3 @@ function check_error() exit 1 fi } - - -#----------------------------------------------------------------------------------------------------------------- -#fun : package -#param : N/A -#return : 0 -- success; not 0 -- failure -#----------------------------------------------------------------------------------------------------------------- -function package() { - g_run_stage="package" - -} - - -#----------------------------------------------------------------------------------------------------------------- -#fun : before hook submit to cluster -#param : N/A -#return : 0 -- success; not 0 -- failure -#----------------------------------------------------------------------------------------------------------------- -function before_submit() { - -} - - -#----------------------------------------------------------------------------------------------------------------- -#fun : after hook submit to cluster -#param : N/A -#return : 0 -- success; not 0 -- failure -#----------------------------------------------------------------------------------------------------------------- -function after_submit() { - -} - - -#----------------------------------------------------------------------------------------------------------------- -#fun : submit to cluster -#param : N/A -#return : 0 -- success; not 0 -- failure -#----------------------------------------------------------------------------------------------------------------- -function submit() { - g_run_stage="submit" - - before_submit - - ${g_hpc_path}/bin/qsub_f \ - -N ${g_job_name} \ - --conf ${g_qsub_conf} \ - --hdfs ${g_hdfs_path} \ - --ugi ${g_hdfs_ugi} \ - --hout ${g_hdfs_output} \ - --files ${g_submit_package} \ - -l nodes=${g_job_nodes},walltime=1000:00:00,resource=full ${g_job_entry} - - after_submit -} - -function main() { - get_cur_path - check_error - - vars_get_from_env - check_error - - package - check_error - - submit - check_error -} - -main diff --git a/fleet_rec/core/engine/cluster/job.sh b/fleet_rec/core/engine/cluster/worker.sh similarity index 100% rename from fleet_rec/core/engine/cluster/job.sh rename to fleet_rec/core/engine/cluster/worker.sh diff --git a/fleet_rec/core/utils/envs.py b/fleet_rec/core/utils/envs.py index 2d4cfadea305e2d42bd454f3ddc4048dd593945a..93c9a68af6ba31906b0a387fc52bd2e943ab010a 100755 --- a/fleet_rec/core/utils/envs.py +++ b/fleet_rec/core/utils/envs.py @@ -20,13 +20,13 @@ from contextlib import closing global_envs = {} -def flatten_environs(envs): +def flatten_environs(envs, separator="."): flatten_dict = {} assert isinstance(envs, dict) def fatten_env_namespace(namespace_nests, local_envs): if not isinstance(local_envs, dict): - global_k = ".".join(namespace_nests) + global_k = separator.join(namespace_nests) flatten_dict[global_k] = str(local_envs) else: for k, v in local_envs.items(): @@ -35,7 +35,7 @@ def flatten_environs(envs): nests.append(k) fatten_env_namespace(nests, v) else: - global_k = ".".join(namespace_nests + [k]) + global_k = separator.join(namespace_nests + [k]) flatten_dict[global_k] = str(v) for k, v in envs.items(): diff --git a/fleet_rec/run.py b/fleet_rec/run.py index e9b91a3214323c4ec902a8b780fa48b47c33b6e9..e77954ef1d5d82b9535b71787bf255a5042da2c8 100755 --- a/fleet_rec/run.py +++ b/fleet_rec/run.py @@ -1,6 +1,8 @@ import argparse import os import subprocess +import tempfile + import yaml from fleetrec.core.factory import TrainerFactory @@ -109,17 +111,55 @@ def single_engine(args): def cluster_engine(args): from fleetrec.core.engine.cluster.cluster import ClusterEngine - trainer = get_trainer_prefix(args) + "ClusterTrainer" - cluster_envs = {} - cluster_envs["train.trainer.trainer"] = trainer - cluster_envs["train.trainer.engine"] = "cluster" - cluster_envs["train.trainer.device"] = args.device - cluster_envs["train.trainer.platform"] = envs.get_platform() - print("launch {} engine with cluster to run model: {}".format(trainer, args.model)) - set_runtime_envs(cluster_envs, args.model) - launch = LocalClusterEngine(cluster_envs, args.model) - return launch + def update_workspace(cluster_envs): + workspace = cluster_envs.get("engine_workspace", None) + if not workspace: + return + + # is fleet inner models + if workspace.startswith("fleetrec."): + fleet_package = envs.get_runtime_environ("PACKAGE_BASE") + workspace_dir = workspace.split("fleetrec.")[1].replace(".", "/") + path = os.path.join(fleet_package, workspace_dir) + else: + path = workspace + + for name, value in cluster_envs.items(): + if isinstance(value, str): + value = value.replace("{workspace}", path) + cluster_envs[name] = value + + def master(): + with open(args.backend, 'r') as rb: + _envs = yaml.load(rb.read(), Loader=yaml.FullLoader) + + flattens = envs.flatten_environs(_envs, "_") + flattens["engine_role"] = args.role + flattens["engine_temp_path"] = tempfile.mkdtemp() + update_workspace(flattens) + + envs.set_runtime_environs(flattens) + print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value"))) + + launch = ClusterEngine(None, args.model) + return launch + + def worker(): + trainer = get_trainer_prefix(args) + "ClusterTrainer" + cluster_envs = {} + cluster_envs["train.trainer.trainer"] = trainer + cluster_envs["train.trainer.engine"] = "cluster" + cluster_envs["train.trainer.device"] = args.device + cluster_envs["train.trainer.platform"] = envs.get_platform() + print("launch {} engine with cluster to with model: {}".format(trainer, args.model)) + set_runtime_envs(cluster_envs, args.model) + launch = ClusterEngine(cluster_envs, args.model) + return launch + if args.role == "worker": + return worker() + else: + return master() def cluster_mpi_engine(args): diff --git a/models/rank/dnn/engine.yaml b/models/rank/dnn/backend.yaml similarity index 79% rename from models/rank/dnn/engine.yaml rename to models/rank/dnn/backend.yaml index 0b730092c1c29148768271105c8414216fcd88d2..1f448998d7a37b798d0b84eb248641f0ee84cbb2 100755 --- a/models/rank/dnn/engine.yaml +++ b/models/rank/dnn/backend.yaml @@ -13,18 +13,15 @@ # limitations under the License. engine: + workspace: "fleetrec.models.rank.dnn" backend: "MPI" package: - build_script: "/xxxx/xx/x/x/" + build_script: "{workspace}/package.sh" python: "/home/tangwei/fleet_rec_env/cpython-2.7.11-ucs4" paddlerec: "/home/tangwei/fleet_rec_env/FleetRec" - paddlepaddle: "/home/tangwei/fleet_rec_env/FleetRec" submit: - submit_script: "xx" - job: "xxx" - conf: "qsub_f.conf" hpc: "/home/tangwei/submit-tieba/smart_client/" hdfs: "xx" hout: "xxx" @@ -33,7 +30,4 @@ engine: before_hook: "" end_hook: "" - define: - user1: "user_define1" - user2: "user_define2" - define: "user_define3" \ No newline at end of file + scrpit: "{workspace}/submit.sh" \ No newline at end of file diff --git a/models/rank/dnn/submit.sh b/models/rank/dnn/submit.sh new file mode 100644 index 0000000000000000000000000000000000000000..f1b173481c3246898b4a454bf0002f2b9b40f451 --- /dev/null +++ b/models/rank/dnn/submit.sh @@ -0,0 +1,77 @@ +#!/bin/bash + +################################################### +# Usage: submit.sh +# Description: run mpi submit clinet implement +################################################### + + +#----------------------------------------------------------------------------------------------------------------- +#fun : get argument from env, set it into variables +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function vars_get_from_env() { + g_run_stage="vars_get_from_env" + g_hpc_path=${engine.} + g_crontabDate=$2 +} + + +#----------------------------------------------------------------------------------------------------------------- +#fun : package +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function package() { + g_run_stage="package" + +} + + +#----------------------------------------------------------------------------------------------------------------- +#fun : before hook submit to cluster +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function before_submit() { + +} + + +#----------------------------------------------------------------------------------------------------------------- +#fun : after hook submit to cluster +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function after_submit() { + +} + + +#----------------------------------------------------------------------------------------------------------------- +#fun : submit to cluster +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function submit() { + g_run_stage="submit" + + before_submit + + ${g_hpc_path}/bin/qsub_f \ + -N ${g_job_name} \ + --conf ${g_qsub_conf} \ + --hdfs ${g_hdfs_path} \ + --ugi ${g_hdfs_ugi} \ + --hout ${g_hdfs_output} \ + --files ${g_submit_package} \ + -l nodes=${g_job_nodes},walltime=1000:00:00,resource=full ${g_job_entry} + + after_submit +} + + +function main() { + echo "run submit done" +}