diff --git a/fleet_rec/core/engine/engine.py b/fleet_rec/core/engine/engine.py index 66c43bca3f2121531e8477814d87adc36e6c6096..afe00dd931752399c35215ae7a306124e22630b1 100755 --- a/fleet_rec/core/engine/engine.py +++ b/fleet_rec/core/engine/engine.py @@ -7,6 +7,10 @@ class Engine: def __init__(self, envs, trainer): self.envs = envs self.trainer = trainer + self.__init_impl__() + + def __init_impl__(self): + pass @abc.abstractmethod def run(self): diff --git a/fleet_rec/core/engine/local_cluster_engine.py b/fleet_rec/core/engine/local_cluster.py similarity index 100% rename from fleet_rec/core/engine/local_cluster_engine.py rename to fleet_rec/core/engine/local_cluster.py diff --git a/fleet_rec/core/engine/local_mpi_engine.py b/fleet_rec/core/engine/local_mpi.py similarity index 100% rename from fleet_rec/core/engine/local_mpi_engine.py rename to fleet_rec/core/engine/local_mpi.py diff --git a/fleet_rec/core/engine/mpi_cluster/__init__.py b/fleet_rec/core/engine/mpi_cluster/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/fleet_rec/core/engine/mpi_cluster/cluster.py b/fleet_rec/core/engine/mpi_cluster/cluster.py new file mode 100644 index 0000000000000000000000000000000000000000..b8756aebd5ef536c05fd0124016c9b177fb25848 --- /dev/null +++ b/fleet_rec/core/engine/mpi_cluster/cluster.py @@ -0,0 +1,45 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
class QSubClusterEngine(Engine):
    """Engine that submits a job to an MPI cluster by running ``submit.sh``.

    Both shell scripts are looked up in the directory that contains this
    module. ``job.sh`` is only recorded on the instance (``job_script``);
    this class never executes it directly.
    """

    def __init_impl__(self):
        """Resolve the absolute paths of the bundled submit/job scripts.

        Invoked by ``Engine.__init__`` after ``envs`` and ``trainer`` are set.
        """
        abs_dir = os.path.dirname(os.path.abspath(__file__))
        self.submit_script = os.path.join(abs_dir, "submit.sh")
        self.job_script = os.path.join(abs_dir, "job.sh")

    def start_procs(self):
        """Run ``submit.sh`` with bash and wait for it to exit.

        The child process inherits the current environment without
        ``http_proxy`` / ``https_proxy`` and runs in the current working
        directory.

        Returns:
            int: exit status of the submit script (0 on success).
        """
        # os.environ.copy() already returns an independent dict, so removing
        # the proxy settings below does not touch the parent's environment.
        current_env = os.environ.copy()
        current_env.pop("http_proxy", None)
        current_env.pop("https_proxy", None)

        # Pass an argument list instead of splitting a formatted string on
        # spaces: a package installed under a path that contains a space
        # would otherwise be torn into several bogus arguments.
        cmd = ["bash", self.submit_script]
        proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
        returncode = proc.wait()

        # Only claim completion when the script actually succeeded.
        if returncode != 0:
            print("submit script {} exited with status {}".format(
                self.submit_script, returncode), file=sys.stderr)
        else:
            print("all workers and parameter servers already completed",
                  file=sys.stderr)
        return returncode

    def run(self):
        """Engine entry point: submit the job and block until the script exits."""
        self.start_procs()
# NOTE: the second `declare -r FLAGS_communicator_thread_pool_size=20` that
# used to precede this line is dropped. The variable is already declared
# read-only (=5) earlier in this script, and bash refuses to re-declare a
# read-only variable, so the duplicate only raised an error and never took
# effect.
declare -r FLAGS_communicator_max_merge_var_num=18
################################################################################

#-----------------------------------------------------------------------------------------------------------------
#fun : check the return code of the previous command; abort the script on failure
#param : N/A (reads $? and g_run_stage)
#return : returns normally on success; exits with status 1 on failure
#-----------------------------------------------------------------------------------------------------------------
function check_error() {
    # Must be called immediately after the command being checked: $? only
    # holds the status of the most recent command.
    if [ ${?} -ne 0 ]; then
        # Bash has no "+" string operator; interpolate the stage name instead
        # of printing literal plus signs.
        echo "execute ${g_run_stage} raise exception! please check ..."
        exit 1
    fi
}

#-----------------------------------------------------------------------------------------------------------------
#fun : environment preparation stage (currently only records the stage name)
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function env_prepare() {
    g_run_stage="env_prepare"
}

#-----------------------------------------------------------------------------------------------------------------
#fun : user-defined variables stage (currently only logs and records the stage name)
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function user_define_variables() {
    echo "user_define_variables"
    g_run_stage="user_define_variables"
}

#-----------------------------------------------------------------------------------------------------------------
#fun : launch the python entry with mpirun, two processes per node
#param : N/A (reads PBS_NODEFILE and g_job_entry)
#return : exit status of mpirun
#-----------------------------------------------------------------------------------------------------------------
function job() {
    g_run_stage="job"
    # PBS_NODEFILE is quoted so that an empty or space-containing value cannot
    # shift the remaining mpirun arguments.
    # NOTE(review): g_job_entry is not assigned anywhere in this script; it is
    # presumably provided by the submitting environment -- confirm. It is left
    # unquoted on purpose so an entry of the form "script.py arg ..." still
    # splits into separate arguments, as before.
    mpirun -npernode 2 -timestamp-output -tag-output -machinefile "${PBS_NODEFILE}" python -u ${g_job_entry}
}

#-----------------------------------------------------------------------------------------------------------------
#fun : run all stages in order
#param : N/A
#return : exit status of the last stage (job)
#-----------------------------------------------------------------------------------------------------------------
function main() {
    user_define_variables
    env_prepare
    job
}

main
declare g_curPath=""
declare g_scriptName=""
declare g_workPath=""
declare g_run_stage=""

# ----------------------------for hpc submit -------------------------------- #
declare g_hpc_path=""
declare g_job_name=""
declare g_qsub_conf=""
declare g_hdfs_path=""
declare g_hdfs_ugi=""
declare g_hdfs_output=""
declare g_submit_package=""
declare g_job_nodes=""
declare g_job_entry=""

# ---------------------------------------------------------------------------- #
#                                const define                                  #
# ---------------------------------------------------------------------------- #
declare -r CALL="x"
################################################################################


#-----------------------------------------------------------------------------------------------------------------
# Function: get_cur_path
# Description: get current path of this script
# Parameter:
#   input:
#       N/A
#   output:
#       g_curPath    -- directory that contains this script
#       g_scriptName -- file name of this script
# Return: 0 -- success; not 0 -- failure
# Others: changes into the script directory and back again
#-----------------------------------------------------------------------------------------------------------------
get_cur_path()
{
    g_run_stage="get_cur_path"
    cd "$(dirname "${BASH_SOURCE-$0}")"
    g_curPath="${PWD}"
    g_scriptName="$(basename "${BASH_SOURCE-$0}")"
    # Return to the caller's directory without echoing it.
    cd - >/dev/null
}


#-----------------------------------------------------------------------------------------------------------------
#fun : get argument from env, set it into variables
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function vars_get_from_env() {
    g_run_stage="vars_get_from_env"
    # The original expansion `${engine.}` is not a valid parameter name and
    # makes bash fail with "bad substitution" when this function runs.
    # NOTE(review): the variable name below is an assumption -- it flattens the
    # `engine.submit.hpc` key of engine.yaml into a legal identifier. Confirm
    # the name the launcher actually exports. Falls back to "" when unset.
    g_hpc_path="${engine_submit_hpc:-}"
    # NOTE(review): $2 is this function's second positional argument; main
    # calls the function without arguments, so this is empty -- confirm intent.
    g_crontabDate=$2
}
#-----------------------------------------------------------------------------------------------------------------
#fun : check the return code of the previous command; abort the script on failure
#param : N/A (reads $? and g_run_stage)
#return : returns normally on success; exits with status 1 on failure
#-----------------------------------------------------------------------------------------------------------------
function check_error()
{
    # Must be called immediately after the command being checked: $? only
    # holds the status of the most recent command.
    if [ ${?} -ne 0 ]
    then
        # Bash has no "+" string operator; interpolate the stage name instead
        # of printing literal plus signs.
        echo "execute ${g_run_stage} raise exception! please check ..."
        exit 1
    fi
}


#-----------------------------------------------------------------------------------------------------------------
#fun : package (currently only records the stage name)
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function package() {
    g_run_stage="package"
}


#-----------------------------------------------------------------------------------------------------------------
#fun : before hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function before_submit() {
    # Placeholder: bash rejects a function with an empty body as a syntax
    # error, so keep an explicit no-op until the hook is implemented.
    :
}


#-----------------------------------------------------------------------------------------------------------------
#fun : after hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function after_submit() {
    # Placeholder: see before_submit.
    :
}


#-----------------------------------------------------------------------------------------------------------------
#fun : submit to cluster
#param : N/A (reads the g_* submit variables)
#return : exit status of qsub_f
#-----------------------------------------------------------------------------------------------------------------
function submit() {
    g_run_stage="submit"

    before_submit

    # Expansions are quoted so an empty or space-containing value cannot shift
    # the option/argument pairing. g_job_entry stays unquoted so an entry of
    # the form "script arg ..." still splits into separate arguments.
    "${g_hpc_path}/bin/qsub_f" \
        -N "${g_job_name}" \
        --conf "${g_qsub_conf}" \
        --hdfs "${g_hdfs_path}" \
        --ugi "${g_hdfs_ugi}" \
        --hout "${g_hdfs_output}" \
        --files "${g_submit_package}" \
        -l "nodes=${g_job_nodes},walltime=1000:00:00,resource=full" ${g_job_entry}
    # Remember the submission status: the hook below would otherwise overwrite
    # $? and the caller's check_error would never see a failed qsub_f.
    local submit_status=$?

    after_submit

    return ${submit_status}
}

#-----------------------------------------------------------------------------------------------------------------
#fun : run every stage in order, aborting on the first failure
#param : N/A
#return : exits with status 1 if any stage fails
#-----------------------------------------------------------------------------------------------------------------
function main() {
    get_cur_path
    check_error

    vars_get_from_env
    check_error

    package
    check_error

    submit
    check_error
}

main
+ +engine: + backend: "MPI" + + package: + build_script: "/xxxx/xx/x/x/" + python: "/home/tangwei/fleet_rec_env/cpython-2.7.11-ucs4" + paddlerec: "/home/tangwei/fleet_rec_env/FleetRec" + paddlepaddle: "/home/tangwei/fleet_rec_env/FleetRec" + + submit: + submit_script: "xx" + job: "xxx" + conf: "qsub_f.conf" + hpc: "/home/tangwei/submit-tieba/smart_client/" + hdfs: "xx" + hout: "xxx" + ugi: "xxxx" + nodes: 10 + before_hook: "" + end_hook: "" + + define: + user1: "user_define1" + user2: "user_define2" + define: "user_define3" \ No newline at end of file