提交 e9011c05 编写于 作者: T tangwei

add mpi cluster

上级 1125b796
......@@ -7,6 +7,10 @@ class Engine:
def __init__(self, envs, trainer):
    """Keep the launch configuration on the instance, then run the setup hook.

    Args:
        envs: stored unchanged as ``self.envs``.
        trainer: stored unchanged as ``self.trainer``.
    """
    self.envs, self.trainer = envs, trainer
    # The hook runs last so it can already read both attributes.
    self.__init_impl__()
def __init_impl__(self):
    """Setup hook invoked at the end of ``__init__``; this version does nothing."""
    return None
@abc.abstractmethod
def run(self):
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import unicode_literals
import subprocess
import sys
import os
import copy
from fleetrec.core.engine.engine import Engine
class QSubClusterEngine(Engine):
    """Engine that submits a job by running the ``submit.sh`` script with bash.

    ``submit.sh`` and ``job.sh`` are looked up in the directory that contains
    this module.
    """

    def __init_impl__(self):
        """Resolve the absolute paths of the helper shell scripts."""
        abs_dir = os.path.dirname(os.path.abspath(__file__))
        self.submit_script = os.path.join(abs_dir, "submit.sh")
        self.job_script = os.path.join(abs_dir, "job.sh")

    def start_procs(self):
        """Run the submit script and block until it exits.

        The child process inherits the current working directory and a copy
        of the current environment without ``http_proxy`` / ``https_proxy``.

        Returns:
            int: exit status of the submit script (0 means success).
        """
        # os.environ.copy() already yields an independent dict, so a second
        # copy is unnecessary before removing the proxy settings.
        current_env = os.environ.copy()
        current_env.pop("http_proxy", None)
        current_env.pop("https_proxy", None)

        # Build argv as a list: splitting a formatted string on spaces would
        # tear apart a script path that itself contains spaces.
        cmd = ["bash", self.submit_script]
        proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
        returncode = proc.wait()

        if returncode == 0:
            print("all workers and parameter servers already completed", file=sys.stderr)
        else:
            # Do not claim completion when the submit script reported failure.
            print(
                "submit script {} exited with code {}".format(
                    self.submit_script, returncode),
                file=sys.stderr)
        return returncode

    def run(self):
        """Submit the job and wait for the submit script to finish."""
        self.start_procs()
#!/bin/bash
###################################################
# Usage: job.sh
# Description: run job on mpi per node
###################################################
# ---------------------------------------------------------------------------- #
# variable define #
# ---------------------------------------------------------------------------- #
# Script-wide state. g_run_stage carries the name of the stage in progress and
# is what check_error prints; the other three are not referenced by the
# functions below.
declare g_curPath="" g_scriptName="" g_workPath=""
declare g_run_stage=""
# ---------------------------------------------------------------------------- #
# const define #
# ---------------------------------------------------------------------------- #
# Communicator settings, read-only for the rest of the script.
# FLAGS_communicator_thread_pool_size used to be declared twice (5, then 20).
# bash refuses to re-declare a readonly variable, so the second line only
# produced an error and the effective value was always 5. That value is kept
# and the failing redeclaration is removed.
# NOTE(review): if 20 was the intended pool size, change the value here.
# NOTE(review): these are declared without -x, so they are not exported to
# child processes -- confirm whether the job is expected to see them.
declare -r FLAGS_communicator_thread_pool_size=5
declare -r FLAGS_communicator_send_queue_size=18
declare -r FLAGS_communicator_max_merge_var_num=18
################################################################################
#-----------------------------------------------------------------------------------------------------------------
#fun : abort the script when the command executed right before the call failed
#param : N/A (reads ${?} of the preceding command and the global g_run_stage)
#return : returns normally when ${?} was 0; otherwise prints the failed stage and exits with 1
#-----------------------------------------------------------------------------------------------------------------
function check_error() {
  # ${?} has to be tested first: any command placed before this line would overwrite it.
  if [ ${?} -ne 0 ]; then
    # The shell has no "+" string operator; the previous message printed literal plus signs.
    echo "execute ${g_run_stage} raise exception! please check ..."
    exit 1
  fi
}
#-----------------------------------------------------------------------------------------------------------------
#fun : environment preparation stage; for now it only records its name in g_run_stage
#param : N/A
#return : 0 -- success
#-----------------------------------------------------------------------------------------------------------------
env_prepare() {
  g_run_stage='env_prepare'
}
# Stage for user-defined variables: prints its own name on stdout, then
# records that name in g_run_stage.
user_define_variables() {
  local -r stage_name='user_define_variables'
  echo "${stage_name}"
  g_run_stage="${stage_name}"
}
# Start the job entry under mpirun, passing ${PBS_NODEFILE} as the machine file.
# g_job_entry is not assigned anywhere in this script -- presumably it is
# supplied by the calling environment; confirm against the submitter.
job() {
  mpirun \
    -npernode 2 \
    -timestamp-output \
    -tag-output \
    -machinefile ${PBS_NODEFILE} \
    python -u ${g_job_entry}
}
# Entry point: run the stages in order -- user_define_variables, env_prepare,
# job. The function's status is the status of the last stage (job).
main() {
  local step
  for step in user_define_variables env_prepare job; do
    "${step}"
  done
}

main
#!/bin/bash
###################################################
# Usage: submit.sh
# Description: run mpi submit clinet
###################################################
# ---------------------------------------------------------------------------- #
# variable define #
# ---------------------------------------------------------------------------- #
# Generic script state; g_run_stage names the stage in progress and is what
# check_error prints.
declare g_curPath="" g_scriptName="" g_workPath=""
declare g_run_stage=""
# ----------------------------for hpc submit -------------------------------- #
# Values that submit() places on the qsub_f command line.
declare g_hpc_path="" g_job_name="" g_qsub_conf=""
declare g_hdfs_path="" g_hdfs_ugi="" g_hdfs_output=""
declare g_submit_package="" g_job_nodes="" g_job_entry=""
# ---------------------------------------------------------------------------- #
# const define #
# ---------------------------------------------------------------------------- #
declare -r CALL='x'
################################################################################
#-----------------------------------------------------------------------------------------------------------------
# Function: get_cur_path
# Description: get current path (the directory holding this script) and the script file name
# Parameter:
# input:
# N/A
# output:
# g_curPath, g_scriptName
# Return: 0 -- success; not 0 -- failure
# Others: the caller's working directory is never changed
#-----------------------------------------------------------------------------------------------------------------
get_cur_path()
{
  g_run_stage="get_cur_path"
  local script_path="${BASH_SOURCE-$0}"
  # Resolve the directory inside a command substitution (a subshell). The
  # previous cd / "cd -" pair moved the shell to an unrelated OLDPWD and
  # recorded the wrong directory whenever the first cd failed.
  # cd output is discarded so that a CDPATH echo cannot leak into g_curPath.
  g_curPath="$(cd -- "$(dirname -- "${script_path}")" >/dev/null && pwd)" || return 1
  g_scriptName="$(basename -- "${script_path}")"
}
#-----------------------------------------------------------------------------------------------------------------
#fun : get argument from env, set it into variables
#param : $2 -- optional, stored in g_crontabDate (empty when not passed)
#return : 0 -- success; not 0 -- g_hpc_path is still empty
#-----------------------------------------------------------------------------------------------------------------
function vars_get_from_env() {
  g_run_stage="vars_get_from_env"
  # The previous line expanded `${engine.}`. Variable names cannot contain ".",
  # so bash stopped here with "bad substitution" and the script never submitted.
  # NOTE(review): the name engine_submit_hpc is an assumption -- confirm it
  # against whatever exports the engine configuration to this script.
  g_hpc_path="${engine_submit_hpc:-${g_hpc_path}}"
  # main() passes no arguments, so make the empty default explicit.
  g_crontabDate="${2:-}"
  # Fail here rather than let submit() run "/bin/qsub_f" with an empty prefix.
  if [ -z "${g_hpc_path}" ]; then
    echo "hpc client path is empty, cannot locate bin/qsub_f" >&2
    return 1
  fi
}
#-----------------------------------------------------------------------------------------------------------------
#fun : abort the script when the command executed right before the call failed
#param : N/A (reads ${?} of the preceding command and the global g_run_stage)
#return : returns normally when ${?} was 0; otherwise prints the failed stage and exits with 1
#-----------------------------------------------------------------------------------------------------------------
function check_error()
{
  # ${?} has to be tested first: any command placed before this line would overwrite it.
  if [ ${?} -ne 0 ]
  then
    # The shell has no "+" string operator; the previous message printed literal plus signs.
    echo "execute ${g_run_stage} raise exception! please check ..."
    exit 1
  fi
}
#-----------------------------------------------------------------------------------------------------------------
#fun : packaging stage; for now it only records its name in g_run_stage
#param : N/A
#return : 0 -- success
#-----------------------------------------------------------------------------------------------------------------
package() {
  g_run_stage='package'
}
#-----------------------------------------------------------------------------------------------------------------
#fun : before hook submit to cluster (currently a no-op)
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function before_submit() {
  # bash rejects an empty function body with a syntax error, which stopped the
  # whole script from loading. ":" is the no-op builtin and keeps the hook
  # valid until real work is added.
  :
}
#-----------------------------------------------------------------------------------------------------------------
#fun : after hook submit to cluster (currently a no-op)
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function after_submit() {
  # bash rejects an empty function body with a syntax error, which stopped the
  # whole script from loading. ":" is the no-op builtin and keeps the hook
  # valid until real work is added.
  :
}
#-----------------------------------------------------------------------------------------------------------------
#fun : submit to cluster through ${g_hpc_path}/bin/qsub_f, wrapped by the before/after hooks
#param : N/A (reads the g_* globals)
#return : 0 -- success; not 0 -- failure (status of qsub_f, or of after_submit when qsub_f succeeded)
#-----------------------------------------------------------------------------------------------------------------
function submit() {
  g_run_stage="submit"
  before_submit
  # Expansions are quoted so that values containing spaces or glob characters
  # reach qsub_f as single arguments.
  # NOTE(review): g_job_entry is left unquoted because it may hold an entry
  # command plus arguments -- confirm, and quote it if it is always one word.
  "${g_hpc_path}/bin/qsub_f" \
    -N "${g_job_name}" \
    --conf "${g_qsub_conf}" \
    --hdfs "${g_hdfs_path}" \
    --ugi "${g_hdfs_ugi}" \
    --hout "${g_hdfs_output}" \
    --files "${g_submit_package}" \
    -l "nodes=${g_job_nodes},walltime=1000:00:00,resource=full" ${g_job_entry}
  local submit_rc=$?
  # The hook still runs after a failed submission, as before, but its status
  # must not replace the qsub_f status that check_error inspects afterwards.
  after_submit
  local hook_rc=$?
  if [ "${submit_rc}" -ne 0 ]; then
    return "${submit_rc}"
  fi
  return "${hook_rc}"
}
# Entry point: run get_cur_path, vars_get_from_env, package and submit in that
# order, calling check_error right after each one so the first failing stage
# ends the script.
main() {
  local step
  for step in get_cur_path vars_get_from_env package submit; do
    "${step}"
    check_error
  done
}

main
......@@ -136,7 +136,7 @@ def cluster_mpi_engine(args):
def local_cluster_engine(args):
from fleetrec.core.engine.local_cluster_engine import LocalClusterEngine
from fleetrec.core.engine.local_cluster import LocalClusterEngine
trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {}
......@@ -162,7 +162,7 @@ def local_cluster_engine(args):
def local_mpi_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model))
from fleetrec.core.engine.local_mpi_engine import LocalMPIEngine
from fleetrec.core.engine.local_mpi import LocalMPIEngine
print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model))
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
engine:
backend: "MPI"
package:
build_script: "/xxxx/xx/x/x/"
python: "/home/tangwei/fleet_rec_env/cpython-2.7.11-ucs4"
paddlerec: "/home/tangwei/fleet_rec_env/FleetRec"
paddlepaddle: "/home/tangwei/fleet_rec_env/FleetRec"
submit:
submit_script: "xx"
job: "xxx"
conf: "qsub_f.conf"
hpc: "/home/tangwei/submit-tieba/smart_client/"
hdfs: "xx"
hout: "xxx"
ugi: "xxxx"
nodes: 10
before_hook: ""
end_hook: ""
define:
user1: "user_define1"
user2: "user_define2"
define: "user_define3"
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册