diff --git a/fleet_rec/core/engine/cluster/cluster.py b/fleet_rec/core/engine/cluster/cluster.py index 7136d60ff50575a89d8dced633d74e8abdde1c75..7386d36e6d2a26376e115e3ab5f1fb3058c542ec 100644 --- a/fleet_rec/core/engine/cluster/cluster.py +++ b/fleet_rec/core/engine/cluster/cluster.py @@ -21,6 +21,7 @@ import os import copy from fleetrec.core.engine.engine import Engine +from fleetrec.core.factory import TrainerFactory from fleetrec.core.utils import envs @@ -30,16 +31,8 @@ class ClusterEngine(Engine): self.submit_script = os.path.join(abs_dir, "master.sh") def start_worker_procs(self): - default_env = os.environ.copy() - current_env = copy.copy(default_env) - current_env.pop("http_proxy", None) - current_env.pop("https_proxy", None) - - cmd = ("bash {}".format(self.submit_script)).split(" ") - proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd()) - proc.wait() - - print("all workers and parameter servers already completed", file=sys.stderr) + trainer = TrainerFactory.create(self.trainer) + trainer.run() def start_master_procs(self): default_env = os.environ.copy() @@ -51,8 +44,6 @@ class ClusterEngine(Engine): proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd()) proc.wait() - print("all workers and parameter servers already completed", file=sys.stderr) - def run(self): role = envs.get_runtime_environ("engine_role") diff --git a/fleet_rec/core/engine/cluster/master.sh b/fleet_rec/core/engine/cluster/master.sh index fea51c4df414a0648d708f4c604e0972f8cd2b4b..47db58520ad4c0e03518a3e4d4ab24846632d290 100644 --- a/fleet_rec/core/engine/cluster/master.sh +++ b/fleet_rec/core/engine/cluster/master.sh @@ -19,7 +19,6 @@ declare g_run_stage="" declare -r CALL="x" ################################################################################ - #----------------------------------------------------------------------------------------------------------------- # Function: get_cur_path # Description: get churrent path @@ -31,13 +30,12 @@ declare -r CALL="x" # Return: 0 -- success; not 0 -- failure # Others: N/A #----------------------------------------------------------------------------------------------------------------- -get_cur_path() -{ +get_cur_path() { g_run_stage="get_cur_path" - cd "$(dirname "${BASH_SOURCE-$0}")" - g_curPath="${PWD}" - g_scriptName="$(basename "${BASH_SOURCE-$0}")" - cd - >/dev/null + cd "$(dirname "${BASH_SOURCE-$0}")" + g_curPath="${PWD}" + g_scriptName="$(basename "${BASH_SOURCE-$0}")" + cd - >/dev/null } #----------------------------------------------------------------------------------------------------------------- @@ -45,15 +43,13 @@ get_cur_path() #param : N/A #return : 0 -- success; not 0 -- failure #----------------------------------------------------------------------------------------------------------------- -function check_error() -{ - if [ ${?} -ne 0 ] - then - echo "execute " + $g_run_stage + " raise exception! please check ..." - exit 1 - fi +function check_error() { + if [ ${?} -ne 0 ]; then + echo "execute " + $g_run_stage + " raise exception! please check ..." + exit 1 + fi } -source ${engine_scrpit} +source ${engine_submit_scrpit} -main \ No newline at end of file +main diff --git a/fleet_rec/run.py b/fleet_rec/run.py index e77954ef1d5d82b9535b71787bf255a5042da2c8..34adbcd76cb47b8dca80893d8e61698c09b1aeaf 100755 --- a/fleet_rec/run.py +++ b/fleet_rec/run.py @@ -154,8 +154,10 @@ def cluster_engine(args): cluster_envs["train.trainer.platform"] = envs.get_platform() print("launch {} engine with cluster to with model: {}".format(trainer, args.model)) set_runtime_envs(cluster_envs, args.model) - launch = ClusterEngine(cluster_envs, args.model) + + launch = LocalClusterEngine(cluster_envs, args.model) return launch + if args.role == "worker": return worker() else: diff --git a/models/rank/dnn/submit.sh b/models/rank/dnn/submit.sh index c5c7ce82cd77bdfd244efe868abf1c3d6eb1f230..1a11360cccb3bfb8147300db4f35e1b66207098a 100644 --- a/models/rank/dnn/submit.sh +++ b/models/rank/dnn/submit.sh @@ -34,7 +34,7 @@ function package() { echo "copy python from " ${engine_package_python} " to " ${temp} mkdir ${temp}/package/whl - cp ${engine_package_paddlerec} ${temp}/package/whl/ + cp ${engine_package_paddlerec} ${temp}/package/whl/ echo "copy " ${engine_package_paddlerec} " to " ${temp}"/whl/" } @@ -63,13 +63,11 @@ function after_submit() { #----------------------------------------------------------------------------------------------------------------- function submit() { g_run_stage="submit" - g_job_name="paddle_rec_mpi" g_hdfs_path=$g_hdfs_path - g_job_entry="worker.sh" - engine_hdfs_output=${engine_hdfs_output}/`date +%Y%m%d%H%M%S` + engine_hdfs_output=${engine_hdfs_output}/$(date +%Y%m%d%H%M%S) cd ${engine_temp_path} @@ -79,9 +77,8 @@ function submit() { --hdfs ${engine_hdfs_name} \ --ugi ${engine_hdfs_ugi} \ --hout ${engine_hdfs_output} \ - --files ${engine_temp_path} \ + --files ./package \ -l nodes=${engine_submit_nodes},walltime=1000:00:00,resource=full ${g_job_entry} - } function main() { @@ -90,4 +87,4 @@ function main() { before_submit submit after_submit -} \ No newline at end of file +} diff --git a/models/rank/dnn/worker.sh b/models/rank/dnn/worker.sh index 2211a04a21871f4ecf07a89b3904d72a3ff08708..57cf12c9e81f93d27972bfe4254cea1e9a72f599 100644 --- a/models/rank/dnn/worker.sh +++ b/models/rank/dnn/worker.sh @@ -41,24 +41,24 @@ function check_error() { #----------------------------------------------------------------------------------------------------------------- function env_prepare() { g_run_stage="env_prepare" -} + WORKDIR=$(pwd) + mpirun -npernode 1 mv package/* ./ + echo "current:"$WORKDIR + export LIBRARY_PATH=$WORKDIR/python/lib:$LIBRARY_PATH -function user_define_variables() { - echo "user_define_variables" - g_run_stage="user_define_variables" + mpirun -npernode 1 python/bin/python -m pip install whl/fleet_rec-0.0.2-py2-none-any.whl --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com >/dev/null + check_error } -function job() { - echo "job" - g_run_stage="job" - - # mpirun -npernode 2 -timestamp-output -tag-output -machinefile ${PBS_NODEFILE} python -u ${g_job_entry} +function run() { + echo "run" + g_run_stage="run" + mpirun -npernode 2 -timestamp-output -tag-output -machinefile ${PBS_NODEFILE} python/bin/python -u -m fleetrec.run -m fleetrec.models.rank.dnn --engine cluster --role worker } function main() { - user_define_variables env_prepare - job + run } main