提交 e1f9013a 编写于 作者: T tangwei

add mpi cluster

上级 f18eea21
......@@ -21,15 +21,28 @@ import os
import copy
from fleetrec.core.engine.engine import Engine
from fleetrec.core.utils import envs
class ClusterEngine(Engine):
def __init_impl__(self):
abs_dir = os.path.dirname(os.path.abspath(__file__))
self.submit_script = os.path.join(abs_dir, "submit.sh")
self.job_script = os.path.join(abs_dir, "job.sh")
self.submit_script = os.path.join(abs_dir, "master.sh")
self.job_script = os.path.join(abs_dir, "worker.sh")
def start_procs(self):
def start_worker_procs(self):
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
cmd = ("bash {}".format(self.submit_script)).split(" ")
proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
proc.wait()
print("all workers and parameter servers already completed", file=sys.stderr)
def start_master_procs(self):
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
......@@ -42,4 +55,13 @@ class ClusterEngine(Engine):
print("all workers and parameter servers already completed", file=sys.stderr)
def run(self):
self.start_procs()
role = envs.get_runtime_environ("engine_role")
if role == "MASTER":
self.start_master_procs()
elif role == "WORKER":
self.start_worker_procs()
else:
raise ValueError("role {} error, must in MASTER/WORKER".format(role))
......@@ -13,17 +13,6 @@ declare g_scriptName=""
declare g_workPath=""
declare g_run_stage=""
# ----------------------------for hpc submit -------------------------------- #
declare g_hpc_path=""
declare g_job_name=""
declare g_qsub_conf=""
declare g_hdfs_path=""
declare g_hdfs_ugi=""
declare g_hdfs_output=""
declare g_submit_package=""
declare g_job_nodes=""
declare g_job_entry=""
# ---------------------------------------------------------------------------- #
# const define #
# ---------------------------------------------------------------------------- #
......@@ -51,19 +40,6 @@ get_cur_path()
cd - >/dev/null
}
#-----------------------------------------------------------------------------------------------------------------
#fun : get argument from env, set it into variables
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function vars_get_from_env() {
g_run_stage="vars_get_from_env"
g_hpc_path=${engine.}
g_crontabDate=$2
}
#-----------------------------------------------------------------------------------------------------------------
#fun : check function return code
#param : N/A
......@@ -77,73 +53,3 @@ function check_error()
exit 1
fi
}
#-----------------------------------------------------------------------------------------------------------------
#fun : package
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function package() {
g_run_stage="package"
}
#-----------------------------------------------------------------------------------------------------------------
#fun : before hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function before_submit() {
}
#-----------------------------------------------------------------------------------------------------------------
#fun : after hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function after_submit() {
}
#-----------------------------------------------------------------------------------------------------------------
#fun : submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function submit() {
g_run_stage="submit"
before_submit
${g_hpc_path}/bin/qsub_f \
-N ${g_job_name} \
--conf ${g_qsub_conf} \
--hdfs ${g_hdfs_path} \
--ugi ${g_hdfs_ugi} \
--hout ${g_hdfs_output} \
--files ${g_submit_package} \
-l nodes=${g_job_nodes},walltime=1000:00:00,resource=full ${g_job_entry}
after_submit
}
function main() {
get_cur_path
check_error
vars_get_from_env
check_error
package
check_error
submit
check_error
}
main
......@@ -20,13 +20,13 @@ from contextlib import closing
global_envs = {}
def flatten_environs(envs):
def flatten_environs(envs, separator="."):
flatten_dict = {}
assert isinstance(envs, dict)
def fatten_env_namespace(namespace_nests, local_envs):
if not isinstance(local_envs, dict):
global_k = ".".join(namespace_nests)
global_k = separator.join(namespace_nests)
flatten_dict[global_k] = str(local_envs)
else:
for k, v in local_envs.items():
......@@ -35,7 +35,7 @@ def flatten_environs(envs):
nests.append(k)
fatten_env_namespace(nests, v)
else:
global_k = ".".join(namespace_nests + [k])
global_k = separator.join(namespace_nests + [k])
flatten_dict[global_k] = str(v)
for k, v in envs.items():
......
import argparse
import os
import subprocess
import tempfile
import yaml
from fleetrec.core.factory import TrainerFactory
......@@ -109,17 +111,55 @@ def single_engine(args):
def cluster_engine(args):
from fleetrec.core.engine.cluster.cluster import ClusterEngine
trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {}
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.device"] = args.device
cluster_envs["train.trainer.platform"] = envs.get_platform()
print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
launch = LocalClusterEngine(cluster_envs, args.model)
return launch
def update_workspace(cluster_envs):
workspace = cluster_envs.get("engine_workspace", None)
if not workspace:
return
# is fleet inner models
if workspace.startswith("fleetrec."):
fleet_package = envs.get_runtime_environ("PACKAGE_BASE")
workspace_dir = workspace.split("fleetrec.")[1].replace(".", "/")
path = os.path.join(fleet_package, workspace_dir)
else:
path = workspace
for name, value in cluster_envs.items():
if isinstance(value, str):
value = value.replace("{workspace}", path)
cluster_envs[name] = value
def master():
with open(args.backend, 'r') as rb:
_envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
flattens = envs.flatten_environs(_envs, "_")
flattens["engine_role"] = args.role
flattens["engine_temp_path"] = tempfile.mkdtemp()
update_workspace(flattens)
envs.set_runtime_environs(flattens)
print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value")))
launch = ClusterEngine(None, args.model)
return launch
def worker():
trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {}
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.device"] = args.device
cluster_envs["train.trainer.platform"] = envs.get_platform()
print("launch {} engine with cluster to with model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
launch = ClusterEngine(cluster_envs, args.model)
return launch
if args.role == "worker":
return worker()
else:
return master()
def cluster_mpi_engine(args):
......
......@@ -13,18 +13,15 @@
# limitations under the License.
engine:
workspace: "fleetrec.models.rank.dnn"
backend: "MPI"
package:
build_script: "/xxxx/xx/x/x/"
build_script: "{workspace}/package.sh"
python: "/home/tangwei/fleet_rec_env/cpython-2.7.11-ucs4"
paddlerec: "/home/tangwei/fleet_rec_env/FleetRec"
paddlepaddle: "/home/tangwei/fleet_rec_env/FleetRec"
submit:
submit_script: "xx"
job: "xxx"
conf: "qsub_f.conf"
hpc: "/home/tangwei/submit-tieba/smart_client/"
hdfs: "xx"
hout: "xxx"
......@@ -33,7 +30,4 @@ engine:
before_hook: ""
end_hook: ""
define:
user1: "user_define1"
user2: "user_define2"
define: "user_define3"
\ No newline at end of file
scrpit: "{workspace}/submit.sh"
\ No newline at end of file
#!/bin/bash
###################################################
# Usage: submit.sh
# Description: run mpi submit clinet implement
###################################################
#-----------------------------------------------------------------------------------------------------------------
#fun : get argument from env, set it into variables
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function vars_get_from_env() {
g_run_stage="vars_get_from_env"
g_hpc_path=${engine.}
g_crontabDate=$2
}
#-----------------------------------------------------------------------------------------------------------------
#fun : package
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function package() {
g_run_stage="package"
}
#-----------------------------------------------------------------------------------------------------------------
#fun : before hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function before_submit() {
}
#-----------------------------------------------------------------------------------------------------------------
#fun : after hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function after_submit() {
}
#-----------------------------------------------------------------------------------------------------------------
#fun : submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function submit() {
g_run_stage="submit"
before_submit
${g_hpc_path}/bin/qsub_f \
-N ${g_job_name} \
--conf ${g_qsub_conf} \
--hdfs ${g_hdfs_path} \
--ugi ${g_hdfs_ugi} \
--hout ${g_hdfs_output} \
--files ${g_submit_package} \
-l nodes=${g_job_nodes},walltime=1000:00:00,resource=full ${g_job_entry}
after_submit
}
function main() {
echo "run submit done"
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册