提交 1c8486fc 编写于 作者: T tangwei

Merge branch 'rec_mpi' into rec_develop

# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import unicode_literals
import subprocess
import sys
import os
import copy
from fleetrec.core.engine.engine import Engine
from fleetrec.core.factory import TrainerFactory
from fleetrec.core.utils import envs
class ClusterEngine(Engine):
def __init_impl__(self):
abs_dir = os.path.dirname(os.path.abspath(__file__))
self.submit_script = os.path.join(abs_dir, "master.sh")
def start_worker_procs(self):
trainer = TrainerFactory.create(self.trainer)
trainer.run()
def start_master_procs(self):
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
cmd = ("bash {}".format(self.submit_script)).split(" ")
proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
proc.wait()
def run(self):
role = envs.get_runtime_environ("engine_role")
if role == "MASTER":
self.start_master_procs()
elif role == "WORKER":
self.start_worker_procs()
else:
raise ValueError("role {} error, must in MASTER/WORKER".format(role))
#!/bin/bash
###################################################
# Usage: submit.sh
# Description: run mpi submit clinet
###################################################
# ---------------------------------------------------------------------------- #
# variable define #
# ---------------------------------------------------------------------------- #
declare g_curPath=""
declare g_scriptName=""
declare g_workPath=""
declare g_run_stage=""
# ---------------------------------------------------------------------------- #
# const define #
# ---------------------------------------------------------------------------- #
declare -r CALL="x"
################################################################################
#-----------------------------------------------------------------------------------------------------------------
# Function: get_cur_path
# Description: get churrent path
# Parameter:
# input:
# N/A
# output:
# N/A
# Return: 0 -- success; not 0 -- failure
# Others: N/A
#-----------------------------------------------------------------------------------------------------------------
get_cur_path() {
g_run_stage="get_cur_path"
cd "$(dirname "${BASH_SOURCE-$0}")"
g_curPath="${PWD}"
g_scriptName="$(basename "${BASH_SOURCE-$0}")"
cd - >/dev/null
}
#-----------------------------------------------------------------------------------------------------------------
#fun : check function return code
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function check_error() {
if [ ${?} -ne 0 ]; then
echo "execute " + $g_run_stage + " raise exception! please check ..."
exit 1
fi
}
source ${engine_submit_scrpit}
main
...@@ -7,6 +7,10 @@ class Engine: ...@@ -7,6 +7,10 @@ class Engine:
def __init__(self, envs, trainer): def __init__(self, envs, trainer):
self.envs = envs self.envs = envs
self.trainer = trainer self.trainer = trainer
self.__init_impl__()
def __init_impl__(self):
pass
@abc.abstractmethod @abc.abstractmethod
def run(self): def run(self):
......
...@@ -23,6 +23,7 @@ import paddle.fluid as fluid ...@@ -23,6 +23,7 @@ import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from fleetrec.core.utils import envs from fleetrec.core.utils import envs
from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
...@@ -30,7 +31,8 @@ from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer ...@@ -30,7 +31,8 @@ from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
class ClusterTrainer(TranspileTrainer): class ClusterTrainer(TranspileTrainer):
def processor_register(self): def processor_register(self):
role = PaddleCloudRoleMaker() #role = PaddleCloudRoleMaker()
role = MPISymetricRoleMaker()
fleet.init(role) fleet.init(role)
if fleet.is_server(): if fleet.is_server():
......
...@@ -72,7 +72,8 @@ class TranspileTrainer(Trainer): ...@@ -72,7 +72,8 @@ class TranspileTrainer(Trainer):
train_data_path = envs.get_global_env( train_data_path = envs.get_global_env(
"test_data_path", None, namespace) "test_data_path", None, namespace)
threads = int(envs.get_runtime_environ("train.trainer.threads")) #threads = int(envs.get_runtime_environ("train.trainer.threads"))
threads = 2
batch_size = envs.get_global_env("batch_size", None, namespace) batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace) reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__)) abs_dir = os.path.dirname(os.path.abspath(__file__))
......
...@@ -20,13 +20,13 @@ from contextlib import closing ...@@ -20,13 +20,13 @@ from contextlib import closing
global_envs = {} global_envs = {}
def flatten_environs(envs): def flatten_environs(envs, separator="."):
flatten_dict = {} flatten_dict = {}
assert isinstance(envs, dict) assert isinstance(envs, dict)
def fatten_env_namespace(namespace_nests, local_envs): def fatten_env_namespace(namespace_nests, local_envs):
if not isinstance(local_envs, dict): if not isinstance(local_envs, dict):
global_k = ".".join(namespace_nests) global_k = separator.join(namespace_nests)
flatten_dict[global_k] = str(local_envs) flatten_dict[global_k] = str(local_envs)
else: else:
for k, v in local_envs.items(): for k, v in local_envs.items():
...@@ -35,7 +35,7 @@ def flatten_environs(envs): ...@@ -35,7 +35,7 @@ def flatten_environs(envs):
nests.append(k) nests.append(k)
fatten_env_namespace(nests, v) fatten_env_namespace(nests, v)
else: else:
global_k = ".".join(namespace_nests + [k]) global_k = separator.join(namespace_nests + [k])
flatten_dict[global_k] = str(v) flatten_dict[global_k] = str(v)
for k, v in envs.items(): for k, v in envs.items():
......
import argparse import argparse
import os import os
import subprocess import subprocess
import tempfile
import yaml import yaml
from fleetrec.core.factory import TrainerFactory from fleetrec.core.factory import TrainerFactory
...@@ -108,18 +110,59 @@ def single_engine(args): ...@@ -108,18 +110,59 @@ def single_engine(args):
def cluster_engine(args): def cluster_engine(args):
def update_workspace(cluster_envs):
workspace = cluster_envs.get("engine_workspace", None)
if not workspace:
return
# is fleet inner models
if workspace.startswith("fleetrec."):
fleet_package = envs.get_runtime_environ("PACKAGE_BASE")
workspace_dir = workspace.split("fleetrec.")[1].replace(".", "/")
path = os.path.join(fleet_package, workspace_dir)
else:
path = workspace
for name, value in cluster_envs.items():
if isinstance(value, str):
value = value.replace("{workspace}", path)
cluster_envs[name] = value
def master():
from fleetrec.core.engine.cluster.cluster import ClusterEngine
with open(args.backend, 'r') as rb:
_envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
flattens = envs.flatten_environs(_envs, "_")
flattens["engine_role"] = args.role
flattens["engine_temp_path"] = tempfile.mkdtemp()
update_workspace(flattens)
envs.set_runtime_environs(flattens)
print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value")))
launch = ClusterEngine(None, args.model)
return launch
def worker():
trainer = get_trainer_prefix(args) + "ClusterTrainer" trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {} cluster_envs = {}
cluster_envs["train.trainer.trainer"] = trainer cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.engine"] = "cluster" cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.device"] = args.device cluster_envs["train.trainer.device"] = args.device
cluster_envs["train.trainer.platform"] = envs.get_platform() cluster_envs["train.trainer.platform"] = envs.get_platform()
print("launch {} engine with cluster to run model: {}".format(trainer, args.model)) print("launch {} engine with cluster to with model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model) set_runtime_envs(cluster_envs, args.model)
trainer = TrainerFactory.create(args.model) trainer = TrainerFactory.create(args.model)
return trainer return trainer
if args.role == "WORKER":
return worker()
else:
return master()
def cluster_mpi_engine(args): def cluster_mpi_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model)) print("launch cluster engine with cluster to run model: {}".format(args.model))
...@@ -136,7 +179,7 @@ def cluster_mpi_engine(args): ...@@ -136,7 +179,7 @@ def cluster_mpi_engine(args):
def local_cluster_engine(args): def local_cluster_engine(args):
from fleetrec.core.engine.local_cluster_engine import LocalClusterEngine from fleetrec.core.engine.local_cluster import LocalClusterEngine
trainer = get_trainer_prefix(args) + "ClusterTrainer" trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {} cluster_envs = {}
...@@ -162,7 +205,7 @@ def local_cluster_engine(args): ...@@ -162,7 +205,7 @@ def local_cluster_engine(args):
def local_mpi_engine(args): def local_mpi_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model)) print("launch cluster engine with cluster to run model: {}".format(args.model))
from fleetrec.core.engine.local_mpi_engine import LocalMPIEngine from fleetrec.core.engine.local_mpi import LocalMPIEngine
print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model)) print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model))
...@@ -201,8 +244,10 @@ if __name__ == "__main__": ...@@ -201,8 +244,10 @@ if __name__ == "__main__":
parser.add_argument("-e", "--engine", type=str, parser.add_argument("-e", "--engine", type=str,
choices=["single", "local_cluster", "cluster", choices=["single", "local_cluster", "cluster",
"tdm_single", "tdm_local_cluster", "tdm_cluster"]) "tdm_single", "tdm_local_cluster", "tdm_cluster"])
parser.add_argument("-d", "--device", type=str,
choices=["cpu", "gpu"], default="cpu") parser.add_argument("-d", "--device", type=str, choices=["cpu", "gpu"], default="cpu")
parser.add_argument("-b", "--backend", type=str, default=None)
parser.add_argument("-r", "--role", type=str, choices=["master", "worker"], default="master")
abs_dir = os.path.dirname(os.path.abspath(__file__)) abs_dir = os.path.dirname(os.path.abspath(__file__))
envs.set_runtime_environs({"PACKAGE_BASE": abs_dir}) envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})
...@@ -210,6 +255,8 @@ if __name__ == "__main__": ...@@ -210,6 +255,8 @@ if __name__ == "__main__":
args = parser.parse_args() args = parser.parse_args()
args.engine = args.engine.upper() args.engine = args.engine.upper()
args.device = args.device.upper() args.device = args.device.upper()
args.role = args.role.upper()
model_name = args.model.split('.')[-1] model_name = args.model.split('.')[-1]
args.model = get_abs_model(args.model) args.model = get_abs_model(args.model)
engine_registry() engine_registry()
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
engine:
workspace: "fleetrec.models.rank.dnn"
backend: "MPI"
hdfs:
name: "hdfs://nmg01-taihang-hdfs.dmop.baidu.com:54310"
ugi: "fcr,SaK2VqfEDeXzKPor"
output: "/app/ecom/fcr/fanyabo/wadstyleimageq/tangwei12/output_1/"
package:
build_script: "{workspace}/package.sh"
python: "/home/tangwei/fleet_rec_env/cpython-2.7.11-ucs4"
paddlerec: "/home/tangwei/fleet_rec_env/FleetRec"
submit:
hpc: "/home/tangwei/Plines/client/smart_client_khan/"
qconf: "/home/tangwei/Plines/imageq/qsub_f.conf"
nodes: 10
submit_scrpit: "{workspace}/submit.sh"
job_scrpit: "{workspace}/worker.sh"
#!/bin/bash
###################################################
# Usage: submit.sh
# Description: run mpi submit clinet implement
###################################################
#-----------------------------------------------------------------------------------------------------------------
#fun : get argument from env, set it into variables
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function vars_get_from_env() {
echo "xx"
}
#-----------------------------------------------------------------------------------------------------------------
#fun : package
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function package() {
g_run_stage="package"
temp=${engine_temp_path}
echo "package temp dir: " ${temp}
cp ${engine_job_scrpit} ${temp}
cp ${engine_submit_qconf} ${temp}
echo "copy job.sh from " ${engine_worker} " to " ${temp}
mkdir -p ${temp}/package
cp -r ${engine_package_python} ${temp}/package/
echo "copy python from " ${engine_package_python} " to " ${temp}
mkdir ${temp}/package/whl
cp ${engine_package_paddlerec} ${temp}/package/whl/
echo "copy " ${engine_package_paddlerec} " to " ${temp}"/whl/"
}
#-----------------------------------------------------------------------------------------------------------------
#fun : before hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function before_submit() {
echo "before_submit"
}
#-----------------------------------------------------------------------------------------------------------------
#fun : after hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function after_submit() {
echo "after_submit"
}
#-----------------------------------------------------------------------------------------------------------------
#fun : submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function submit() {
g_run_stage="submit"
g_job_name="paddle_rec_mpi"
g_hdfs_path=$g_hdfs_path
g_job_entry="worker.sh"
engine_hdfs_output=${engine_hdfs_output}/$(date +%Y%m%d%H%M%S)
cd ${engine_temp_path}
${engine_submit_hpc}/bin/qsub_f \
-N ${g_job_name} \
--conf ${engine_submit_qconf} \
--hdfs ${engine_hdfs_name} \
--ugi ${engine_hdfs_ugi} \
--hout ${engine_hdfs_output} \
--files ./package \
-l nodes=${engine_submit_nodes},walltime=1000:00:00,resource=full ${g_job_entry}
}
function main() {
package
before_submit
submit
after_submit
}
#!/bin/bash
###################################################
# Usage: job.sh
# Description: run job on mpi per node
###################################################
# ---------------------------------------------------------------------------- #
# variable define #
# ---------------------------------------------------------------------------- #
declare g_curPath=""
declare g_scriptName=""
declare g_workPath=""
declare g_run_stage=""
# ---------------------------------------------------------------------------- #
# const define #
# ---------------------------------------------------------------------------- #
export FLAGS_communicator_thread_pool_size=5
export FLAGS_communicator_send_queue_size=18
export FLAGS_communicator_thread_pool_size=20
export FLAGS_communicator_max_merge_var_num=18
################################################################################
#-----------------------------------------------------------------------------------------------------------------
#fun : check function return code
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function check_error() {
if [ ${?} -ne 0 ]; then
echo "execute " + $g_run_stage + " raise exception! please check ..."
exit 1
fi
}
#-----------------------------------------------------------------------------------------------------------------
#fun : check function return code
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function env_prepare() {
g_run_stage="env_prepare"
WORKDIR=$(pwd)
mpirun -npernode 1 mv package/* ./
echo "current:"$WORKDIR
mpirun -npernode 1 tar -zxvf python.tar.gz > /dev/null
export PYTHONPATH=$WORKDIR/python/
export PYTHONROOT=$WORKDIR/python/
export LIBRARY_PATH=$PYTHONPATH/lib:$LIBRARY_PATH
export LD_LIBRARY_PATH=$PYTHONPATH/lib:$LD_LIBRARY_PATH
export PATH=$PYTHONPATH/bin:$PATH
export LIBRARY_PATH=$PYTHONROOT/lib:$LIBRARY_PATH
python -c "print('heheda')"
mpirun -npernode 1 python/bin/python -m pip uninstall -y fleet-rec
mpirun -npernode 1 python/bin/python -m pip install whl/fleet_rec-0.0.2-py2-none-any.whl --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com
check_error
}
function run() {
echo "run"
g_run_stage="run"
mpirun -npernode 2 -timestamp-output -tag-output -machinefile ${PBS_NODEFILE} python/bin/python -u -m fleetrec.run -m fleetrec.models.rank.dnn --engine cluster --role worker
}
function main() {
env_prepare
run
}
main
...@@ -39,10 +39,14 @@ def build(dirname): ...@@ -39,10 +39,14 @@ def build(dirname):
packages = find_packages(dirname, include=('fleetrec.*')) packages = find_packages(dirname, include=('fleetrec.*'))
package_dir = {'': dirname} package_dir = {'': dirname}
package_data = {} package_data = {}
need_copy = ['data/*.txt', 'data/*/*.txt', '*.yaml', 'tree/*.npy','tree/*.txt']
models_copy = ['data/*.txt', 'data/*/*.txt', '*.yaml', '*.sh', 'tree/*.npy', 'tree/*.txt']
engine_copy = ['*/*.sh']
for package in packages: for package in packages:
if package.startswith("fleetrec.models."): if package.startswith("fleetrec.models."):
package_data[package] = need_copy package_data[package] = models_copy
if package.startswith("fleetrec.core.engine"):
package_data[package] = engine_copy
setup( setup(
name=about["__title__"], name=about["__title__"],
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册