提交 77a2da6d 编写于 作者: T tangwei

add qsub submit

上级 7c10b488
...@@ -21,6 +21,7 @@ import os ...@@ -21,6 +21,7 @@ import os
import copy import copy
from fleetrec.core.engine.engine import Engine from fleetrec.core.engine.engine import Engine
from fleetrec.core.factory import TrainerFactory
from fleetrec.core.utils import envs from fleetrec.core.utils import envs
...@@ -30,16 +31,8 @@ class ClusterEngine(Engine): ...@@ -30,16 +31,8 @@ class ClusterEngine(Engine):
self.submit_script = os.path.join(abs_dir, "master.sh") self.submit_script = os.path.join(abs_dir, "master.sh")
def start_worker_procs(self): def start_worker_procs(self):
default_env = os.environ.copy() trainer = TrainerFactory.create(self.trainer)
current_env = copy.copy(default_env) trainer.run()
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
cmd = ("bash {}".format(self.submit_script)).split(" ")
proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
proc.wait()
print("all workers and parameter servers already completed", file=sys.stderr)
def start_master_procs(self): def start_master_procs(self):
default_env = os.environ.copy() default_env = os.environ.copy()
...@@ -51,8 +44,6 @@ class ClusterEngine(Engine): ...@@ -51,8 +44,6 @@ class ClusterEngine(Engine):
proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd()) proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
proc.wait() proc.wait()
print("all workers and parameter servers already completed", file=sys.stderr)
def run(self): def run(self):
role = envs.get_runtime_environ("engine_role") role = envs.get_runtime_environ("engine_role")
......
...@@ -19,7 +19,6 @@ declare g_run_stage="" ...@@ -19,7 +19,6 @@ declare g_run_stage=""
declare -r CALL="x" declare -r CALL="x"
################################################################################ ################################################################################
#----------------------------------------------------------------------------------------------------------------- #-----------------------------------------------------------------------------------------------------------------
# Function: get_cur_path # Function: get_cur_path
# Description: get current path # Description: get current path
...@@ -31,8 +30,7 @@ declare -r CALL="x" ...@@ -31,8 +30,7 @@ declare -r CALL="x"
# Return: 0 -- success; not 0 -- failure # Return: 0 -- success; not 0 -- failure
# Others: N/A # Others: N/A
#----------------------------------------------------------------------------------------------------------------- #-----------------------------------------------------------------------------------------------------------------
get_cur_path() get_cur_path() {
{
g_run_stage="get_cur_path" g_run_stage="get_cur_path"
cd "$(dirname "${BASH_SOURCE-$0}")" cd "$(dirname "${BASH_SOURCE-$0}")"
g_curPath="${PWD}" g_curPath="${PWD}"
...@@ -45,15 +43,13 @@ get_cur_path() ...@@ -45,15 +43,13 @@ get_cur_path()
#param : N/A #param : N/A
#return : 0 -- success; not 0 -- failure #return : 0 -- success; not 0 -- failure
#----------------------------------------------------------------------------------------------------------------- #-----------------------------------------------------------------------------------------------------------------
function check_error() function check_error() {
{ if [ ${?} -ne 0 ]; then
if [ ${?} -ne 0 ]
then
echo "execute " + $g_run_stage + " raise exception! please check ..." echo "execute " + $g_run_stage + " raise exception! please check ..."
exit 1 exit 1
fi fi
} }
source ${engine_scrpit} source ${engine_submit_scrpit}
main main
...@@ -154,8 +154,10 @@ def cluster_engine(args): ...@@ -154,8 +154,10 @@ def cluster_engine(args):
cluster_envs["train.trainer.platform"] = envs.get_platform() cluster_envs["train.trainer.platform"] = envs.get_platform()
print("launch {} engine with cluster to with model: {}".format(trainer, args.model)) print("launch {} engine with cluster to with model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model) set_runtime_envs(cluster_envs, args.model)
launch = ClusterEngine(cluster_envs, args.model)
launch = LocalClusterEngine(cluster_envs, args.model)
return launch return launch
if args.role == "worker": if args.role == "worker":
return worker() return worker()
else: else:
......
...@@ -63,13 +63,11 @@ function after_submit() { ...@@ -63,13 +63,11 @@ function after_submit() {
#----------------------------------------------------------------------------------------------------------------- #-----------------------------------------------------------------------------------------------------------------
function submit() { function submit() {
g_run_stage="submit" g_run_stage="submit"
g_job_name="paddle_rec_mpi" g_job_name="paddle_rec_mpi"
g_hdfs_path=$g_hdfs_path g_hdfs_path=$g_hdfs_path
g_job_entry="worker.sh" g_job_entry="worker.sh"
engine_hdfs_output=${engine_hdfs_output}/`date +%Y%m%d%H%M%S` engine_hdfs_output=${engine_hdfs_output}/$(date +%Y%m%d%H%M%S)
cd ${engine_temp_path} cd ${engine_temp_path}
...@@ -79,9 +77,8 @@ function submit() { ...@@ -79,9 +77,8 @@ function submit() {
--hdfs ${engine_hdfs_name} \ --hdfs ${engine_hdfs_name} \
--ugi ${engine_hdfs_ugi} \ --ugi ${engine_hdfs_ugi} \
--hout ${engine_hdfs_output} \ --hout ${engine_hdfs_output} \
--files ${engine_temp_path} \ --files ./package \
-l nodes=${engine_submit_nodes},walltime=1000:00:00,resource=full ${g_job_entry} -l nodes=${engine_submit_nodes},walltime=1000:00:00,resource=full ${g_job_entry}
} }
function main() { function main() {
......
...@@ -41,24 +41,24 @@ function check_error() { ...@@ -41,24 +41,24 @@ function check_error() {
#----------------------------------------------------------------------------------------------------------------- #-----------------------------------------------------------------------------------------------------------------
function env_prepare() { function env_prepare() {
g_run_stage="env_prepare" g_run_stage="env_prepare"
} WORKDIR=$(pwd)
mpirun -npernode 1 mv package/* ./
echo "current:"$WORKDIR
export LIBRARY_PATH=$WORKDIR/python/lib:$LIBRARY_PATH
function user_define_variables() { mpirun -npernode 1 python/bin/python -m pip install whl/fleet_rec-0.0.2-py2-none-any.whl --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com >/dev/null
echo "user_define_variables" check_error
g_run_stage="user_define_variables"
} }
function job() { function run() {
echo "job" echo "run"
g_run_stage="job" g_run_stage="run"
mpirun -npernode 2 -timestamp-output -tag-output -machinefile ${PBS_NODEFILE} python/bin/python -u -m fleetrec.run -m fleetrec.models.rank.dnn --engine cluster --role worker
# mpirun -npernode 2 -timestamp-output -tag-output -machinefile ${PBS_NODEFILE} python -u ${g_job_entry}
} }
function main() { function main() {
user_define_variables
env_prepare env_prepare
job run
} }
main main
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册