diff --git a/fleet_rec/core/engine/cluster/__init__.py b/fleet_rec/core/engine/cluster/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/fleet_rec/core/engine/cluster/cluster.py b/fleet_rec/core/engine/cluster/cluster.py
new file mode 100644
index 0000000000000000000000000000000000000000..7386d36e6d2a26376e115e3ab5f1fb3058c542ec
--- /dev/null
+++ b/fleet_rec/core/engine/cluster/cluster.py
@@ -0,0 +1,57 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import subprocess
+import sys
+import os
+import copy
+
+from fleetrec.core.engine.engine import Engine
+from fleetrec.core.factory import TrainerFactory
+from fleetrec.core.utils import envs
+
+
+class ClusterEngine(Engine):
+    def __init_impl__(self):
+        abs_dir = os.path.dirname(os.path.abspath(__file__))
+        self.submit_script = os.path.join(abs_dir, "master.sh")
+
+    def start_worker_procs(self):
+        trainer = TrainerFactory.create(self.trainer)
+        trainer.run()
+
+    def start_master_procs(self):
+        default_env = os.environ.copy()
+        current_env = copy.copy(default_env)
+        current_env.pop("http_proxy", None)
+        current_env.pop("https_proxy", None)
+
+        cmd = ("bash {}".format(self.submit_script)).split(" ")
+        proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
+        proc.wait()
+
+    def run(self):
+        role = envs.get_runtime_environ("engine_role")
+
+        if role == "MASTER":
+            self.start_master_procs()
+
+        elif role == "WORKER":
+            self.start_worker_procs()
+
+        else:
+            raise ValueError("role {} error, must be MASTER or WORKER".format(role))
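`start_master_procs` launches the submit script in a copy of the parent environment with the HTTP proxy variables stripped, so the submit client talks to the scheduler directly, then blocks until submission finishes. The same pattern in isolation, as a minimal runnable sketch (the `echo` command stands in for `bash master.sh`):

    import os
    import subprocess

    def run_without_proxy(cmd):
        env = os.environ.copy()       # copy; never mutate the parent environment
        env.pop("http_proxy", None)   # drop proxies that would intercept
        env.pop("https_proxy", None)  # traffic from the submit client
        proc = subprocess.Popen(cmd.split(" "), env=env, cwd=os.getcwd())
        return proc.wait()            # block until the script exits

    print(run_without_proxy("echo submitted"))  # prints "submitted", then 0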
diff --git a/fleet_rec/core/engine/cluster/master.sh b/fleet_rec/core/engine/cluster/master.sh
new file mode 100644
index 0000000000000000000000000000000000000000..47db58520ad4c0e03518a3e4d4ab24846632d290
--- /dev/null
+++ b/fleet_rec/core/engine/cluster/master.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+
+###################################################
+# Usage: master.sh
+# Description: run mpi submit client
+###################################################
+
+# ---------------------------------------------------------------------------- #
+#                                variable define                                #
+# ---------------------------------------------------------------------------- #
+declare g_curPath=""
+declare g_scriptName=""
+declare g_workPath=""
+declare g_run_stage=""
+
+# ---------------------------------------------------------------------------- #
+#                                 const define                                  #
+# ---------------------------------------------------------------------------- #
+declare -r CALL="x"
+################################################################################
+
+#-----------------------------------------------------------------------------------------------------------------
+# Function: get_cur_path
+# Description: get current path
+# Parameter:
+#   input:  N/A
+#   output: N/A
+# Return: 0 -- success; not 0 -- failure
+# Others: N/A
+#-----------------------------------------------------------------------------------------------------------------
+get_cur_path() {
+    g_run_stage="get_cur_path"
+    cd "$(dirname "${BASH_SOURCE-$0}")"
+    g_curPath="${PWD}"
+    g_scriptName="$(basename "${BASH_SOURCE-$0}")"
+    cd - >/dev/null
+}
+
+#-----------------------------------------------------------------------------------------------------------------
+#fun : check function return code
+#param : N/A
+#return : 0 -- success; not 0 -- failure
+#-----------------------------------------------------------------------------------------------------------------
+function check_error() {
+    if [ ${?} -ne 0 ]; then
+        echo "execute ${g_run_stage} raised an exception! please check ..."
+        exit 1
+    fi
+}
+
+source ${engine_submit_script}
+
+main
diff --git a/fleet_rec/core/engine/engine.py b/fleet_rec/core/engine/engine.py
index 66c43bca3f2121531e8477814d87adc36e6c6096..afe00dd931752399c35215ae7a306124e22630b1 100755
--- a/fleet_rec/core/engine/engine.py
+++ b/fleet_rec/core/engine/engine.py
@@ -7,6 +7,10 @@ class Engine:
     def __init__(self, envs, trainer):
         self.envs = envs
         self.trainer = trainer
+        self.__init_impl__()
+
+    def __init_impl__(self):
+        pass
 
     @abc.abstractmethod
     def run(self):
diff --git a/fleet_rec/core/engine/local_cluster_engine.py b/fleet_rec/core/engine/local_cluster.py
similarity index 100%
rename from fleet_rec/core/engine/local_cluster_engine.py
rename to fleet_rec/core/engine/local_cluster.py
diff --git a/fleet_rec/core/engine/local_mpi_engine.py b/fleet_rec/core/engine/local_mpi.py
similarity index 100%
rename from fleet_rec/core/engine/local_mpi_engine.py
rename to fleet_rec/core/engine/local_mpi.py
diff --git a/fleet_rec/core/trainers/cluster_trainer.py b/fleet_rec/core/trainers/cluster_trainer.py
index 4635d91020888666a3b3449e6dfd8994ff45a6cf..bc5a91732febcc470554ecfa07cf5fb11518546f 100755
--- a/fleet_rec/core/trainers/cluster_trainer.py
+++ b/fleet_rec/core/trainers/cluster_trainer.py
@@ -23,6 +23,7 @@ import paddle.fluid as fluid
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
 from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
+from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
 
 from fleetrec.core.utils import envs
 from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
@@ -30,7 +31,8 @@ from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
 
 class ClusterTrainer(TranspileTrainer):
     def processor_register(self):
-        role = PaddleCloudRoleMaker()
+        # MPI cluster training uses MPISymetricRoleMaker instead of PaddleCloudRoleMaker
+        role = MPISymetricRoleMaker()
         fleet.init(role)
 
         if fleet.is_server():
diff --git a/fleet_rec/core/trainers/transpiler_trainer.py b/fleet_rec/core/trainers/transpiler_trainer.py
index 96afab2164f9f1e815ae0c1c48249ea0fa109dbf..683b921e33de481e26cc1ff93a8300ddf1475d51 100755
--- a/fleet_rec/core/trainers/transpiler_trainer.py
+++ b/fleet_rec/core/trainers/transpiler_trainer.py
@@ -72,7 +72,8 @@ class TranspileTrainer(Trainer):
         train_data_path = envs.get_global_env(
             "test_data_path", None, namespace)
 
-        threads = int(envs.get_runtime_environ("train.trainer.threads"))
+        # NOTE: bypass the train.trainer.threads runtime env for now and use a fixed default
+        threads = 2
         batch_size = envs.get_global_env("batch_size", None, namespace)
         reader_class = envs.get_global_env("class", None, namespace)
         abs_dir = os.path.dirname(os.path.abspath(__file__))
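The `__init_impl__` hook added to `Engine` above gives subclasses a setup point that runs at the end of the base constructor, without each subclass having to override `__init__`. A minimal sketch of the pattern; `DemoEngine` is a hypothetical stand-in for `ClusterEngine`:

    import abc
    import os

    class Engine:
        def __init__(self, envs, trainer):
            self.envs = envs
            self.trainer = trainer
            self.__init_impl__()  # subclass hook, runs after base setup

        def __init_impl__(self):
            pass                  # default: no extra setup

        @abc.abstractmethod
        def run(self):
            pass

    class DemoEngine(Engine):
        def __init_impl__(self):
            # like ClusterEngine: resolve a script path relative to this file
            abs_dir = os.path.dirname(os.path.abspath(__file__))
            self.submit_script = os.path.join(abs_dir, "master.sh")

        def run(self):
            print("would run", self.submit_script)

    # note the second argument: for ClusterEngine the "trainer" is the model path
    DemoEngine(envs=None, trainer="config.yaml").run()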
diff --git a/fleet_rec/core/utils/envs.py b/fleet_rec/core/utils/envs.py
index 2d4cfadea305e2d42bd454f3ddc4048dd593945a..93c9a68af6ba31906b0a387fc52bd2e943ab010a 100755
--- a/fleet_rec/core/utils/envs.py
+++ b/fleet_rec/core/utils/envs.py
@@ -20,13 +20,13 @@ from contextlib import closing
 
 global_envs = {}
 
-def flatten_environs(envs):
+def flatten_environs(envs, separator="."):
     flatten_dict = {}
     assert isinstance(envs, dict)
 
     def fatten_env_namespace(namespace_nests, local_envs):
         if not isinstance(local_envs, dict):
-            global_k = ".".join(namespace_nests)
+            global_k = separator.join(namespace_nests)
             flatten_dict[global_k] = str(local_envs)
         else:
             for k, v in local_envs.items():
@@ -35,7 +35,7 @@ def flatten_environs(envs):
                 nests.append(k)
                 fatten_env_namespace(nests, v)
             else:
-                global_k = ".".join(namespace_nests + [k])
+                global_k = separator.join(namespace_nests + [k])
                 flatten_dict[global_k] = str(v)
 
     for k, v in envs.items():
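The new `separator` argument is what ties backend.yaml to the shell scripts: flattened with "_", every nested key becomes an environment-style name, so `engine.submit.nodes` surfaces as `${engine_submit_nodes}` inside the scripts. A condensed, self-contained equivalent of `fatten_env_namespace`, shown only to illustrate the key naming (names here are illustrative):

    def flatten(envs, separator="."):
        flat = {}

        def walk(prefix, node):
            for k, v in node.items():
                if isinstance(v, dict):
                    walk(prefix + [k], v)  # descend into nested namespaces
                else:
                    flat[separator.join(prefix + [k])] = str(v)

        walk([], envs)
        return flat

    backend = {"engine": {"submit": {"nodes": 10}, "backend": "MPI"}}
    assert flatten(backend, "_") == {
        "engine_submit_nodes": "10",  # values are stringified
        "engine_backend": "MPI",
    }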
diff --git a/fleet_rec/run.py b/fleet_rec/run.py
index d40cb99420cde906c39cbe34eaa64bdf8e74dbd2..aa8d59655971851238df32d89c2484afc5c3fd1b 100755
--- a/fleet_rec/run.py
+++ b/fleet_rec/run.py
@@ -1,6 +1,8 @@
 import argparse
 import os
 import subprocess
+import tempfile
+
 import yaml
 
 from fleetrec.core.factory import TrainerFactory
@@ -108,17 +110,58 @@ def single_engine(args):
 
 
 def cluster_engine(args):
-    trainer = get_trainer_prefix(args) + "ClusterTrainer"
-    cluster_envs = {}
-    cluster_envs["train.trainer.trainer"] = trainer
-    cluster_envs["train.trainer.engine"] = "cluster"
-    cluster_envs["train.trainer.device"] = args.device
-    cluster_envs["train.trainer.platform"] = envs.get_platform()
-    print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
-    set_runtime_envs(cluster_envs, args.model)
-    trainer = TrainerFactory.create(args.model)
-    return trainer
+    def update_workspace(cluster_envs):
+        workspace = cluster_envs.get("engine_workspace", None)
+        if not workspace:
+            return
+
+        # models shipped with fleetrec resolve against the installed package
+        if workspace.startswith("fleetrec."):
+            fleet_package = envs.get_runtime_environ("PACKAGE_BASE")
+            workspace_dir = workspace.split("fleetrec.")[1].replace(".", "/")
+            path = os.path.join(fleet_package, workspace_dir)
+        else:
+            path = workspace
+
+        for name, value in cluster_envs.items():
+            if isinstance(value, str):
+                value = value.replace("{workspace}", path)
+                cluster_envs[name] = value
+
+    def master():
+        from fleetrec.core.engine.cluster.cluster import ClusterEngine
+        with open(args.backend, 'r') as rb:
+            _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
+
+        flattens = envs.flatten_environs(_envs, "_")
+        flattens["engine_role"] = args.role
+        flattens["engine_temp_path"] = tempfile.mkdtemp()
+        update_workspace(flattens)
+
+        envs.set_runtime_environs(flattens)
+        print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value")))
+
+        launch = ClusterEngine(None, args.model)
+        return launch
+
+    def worker():
+        trainer = get_trainer_prefix(args) + "ClusterTrainer"
+        cluster_envs = {}
+        cluster_envs["train.trainer.trainer"] = trainer
+        cluster_envs["train.trainer.engine"] = "cluster"
+        cluster_envs["train.trainer.device"] = args.device
+        cluster_envs["train.trainer.platform"] = envs.get_platform()
+        print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
+        set_runtime_envs(cluster_envs, args.model)
+
+        trainer = TrainerFactory.create(args.model)
+        return trainer
+
+    if args.role == "WORKER":
+        return worker()
+    else:
+        return master()
 
 
 def cluster_mpi_engine(args):
@@ -136,7 +179,7 @@ def cluster_mpi_engine(args):
 
 
 def local_cluster_engine(args):
-    from fleetrec.core.engine.local_cluster_engine import LocalClusterEngine
+    from fleetrec.core.engine.local_cluster import LocalClusterEngine
 
     trainer = get_trainer_prefix(args) + "ClusterTrainer"
     cluster_envs = {}
@@ -162,7 +205,7 @@ def local_cluster_engine(args):
 
 def local_mpi_engine(args):
     print("launch cluster engine with cluster to run model: {}".format(args.model))
-    from fleetrec.core.engine.local_mpi_engine import LocalMPIEngine
+    from fleetrec.core.engine.local_mpi import LocalMPIEngine
 
     print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model))
 
@@ -201,8 +244,10 @@ if __name__ == "__main__":
     parser.add_argument("-e", "--engine", type=str,
                         choices=["single", "local_cluster", "cluster",
                                  "tdm_single", "tdm_local_cluster", "tdm_cluster"])
-    parser.add_argument("-d", "--device", type=str,
-                        choices=["cpu", "gpu"], default="cpu")
+
+    parser.add_argument("-d", "--device", type=str, choices=["cpu", "gpu"], default="cpu")
+    parser.add_argument("-b", "--backend", type=str, default=None)
+    parser.add_argument("-r", "--role", type=str, choices=["master", "worker"], default="master")
 
     abs_dir = os.path.dirname(os.path.abspath(__file__))
     envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})
@@ -210,6 +255,8 @@ if __name__ == "__main__":
     args = parser.parse_args()
     args.engine = args.engine.upper()
     args.device = args.device.upper()
+    args.role = args.role.upper()
+
     model_name = args.model.split('.')[-1]
     args.model = get_abs_model(args.model)
     engine_registry()
diff --git a/models/rank/dnn/backend.yaml b/models/rank/dnn/backend.yaml
new file mode 100755
index 0000000000000000000000000000000000000000..60529faf9fc02bd7cb7a182dc0e2ff2a12ad8af2
--- /dev/null
+++ b/models/rank/dnn/backend.yaml
@@ -0,0 +1,35 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+engine:
+  workspace: "fleetrec.models.rank.dnn"
+  backend: "MPI"
+
+  hdfs:
+    name: "hdfs://nmg01-taihang-hdfs.dmop.baidu.com:54310"
+    ugi: "fcr,SaK2VqfEDeXzKPor"
+    output: "/app/ecom/fcr/fanyabo/wadstyleimageq/tangwei12/output_1/"
+
+  package:
+    build_script: "{workspace}/package.sh"
+    python: "/home/tangwei/fleet_rec_env/cpython-2.7.11-ucs4"
+    paddlerec: "/home/tangwei/fleet_rec_env/FleetRec"
+
+  submit:
+    hpc: "/home/tangwei/Plines/client/smart_client_khan/"
+    qconf: "/home/tangwei/Plines/imageq/qsub_f.conf"
+    nodes: 10
+
+  submit_script: "{workspace}/submit.sh"
+  job_script: "{workspace}/worker.sh"
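Every `{workspace}` placeholder in backend.yaml is expanded by `update_workspace` in run.py: a `fleetrec.`-prefixed workspace resolves against the installed package (via the `PACKAGE_BASE` runtime env), anything else is taken as a literal path. A standalone sketch of that substitution, with `package_base` standing in for `envs.get_runtime_environ("PACKAGE_BASE")` and an illustrative install path:

    import os

    def expand_workspace(cluster_envs, package_base):
        workspace = cluster_envs.get("engine_workspace")
        if not workspace:
            return
        if workspace.startswith("fleetrec."):
            # fleetrec.models.rank.dnn -> <package_base>/models/rank/dnn
            sub_dir = workspace.split("fleetrec.")[1].replace(".", "/")
            path = os.path.join(package_base, sub_dir)
        else:
            path = workspace
        for name, value in cluster_envs.items():
            if isinstance(value, str):
                cluster_envs[name] = value.replace("{workspace}", path)

    flattens = {"engine_workspace": "fleetrec.models.rank.dnn",
                "engine_submit_script": "{workspace}/submit.sh"}
    expand_workspace(flattens, "/opt/fleetrec")
    assert flattens["engine_submit_script"] == "/opt/fleetrec/models/rank/dnn/submit.sh"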
diff --git a/models/rank/dnn/submit.sh b/models/rank/dnn/submit.sh
new file mode 100644
index 0000000000000000000000000000000000000000..56b5f8798f0e4181dfd54d9e831078e4b1533d39
--- /dev/null
+++ b/models/rank/dnn/submit.sh
@@ -0,0 +1,90 @@
+#!/bin/bash
+
+###################################################
+# Usage: submit.sh
+# Description: run mpi submit client implement
+###################################################
+
+#-----------------------------------------------------------------------------------------------------------------
+#fun : get argument from env, set it into variables
+#param : N/A
+#return : 0 -- success; not 0 -- failure
+#-----------------------------------------------------------------------------------------------------------------
+function vars_get_from_env() {
+    echo "xx"
+}
+
+#-----------------------------------------------------------------------------------------------------------------
+#fun : package
+#param : N/A
+#return : 0 -- success; not 0 -- failure
+#-----------------------------------------------------------------------------------------------------------------
+function package() {
+    g_run_stage="package"
+
+    temp=${engine_temp_path}
+    echo "package temp dir: " ${temp}
+
+    cp ${engine_job_script} ${temp}
+    cp ${engine_submit_qconf} ${temp}
+    echo "copy worker.sh from " ${engine_job_script} " to " ${temp}
+
+    mkdir -p ${temp}/package
+    cp -r ${engine_package_python} ${temp}/package/
+    echo "copy python from " ${engine_package_python} " to " ${temp}
+
+    mkdir ${temp}/package/whl
+    cp ${engine_package_paddlerec} ${temp}/package/whl/
+    echo "copy " ${engine_package_paddlerec} " to " ${temp}"/whl/"
+}
+
+#-----------------------------------------------------------------------------------------------------------------
+#fun : before hook submit to cluster
+#param : N/A
+#return : 0 -- success; not 0 -- failure
+#-----------------------------------------------------------------------------------------------------------------
+function before_submit() {
+    echo "before_submit"
+}
+
+#-----------------------------------------------------------------------------------------------------------------
+#fun : after hook submit to cluster
+#param : N/A
+#return : 0 -- success; not 0 -- failure
+#-----------------------------------------------------------------------------------------------------------------
+function after_submit() {
+    echo "after_submit"
+}
+
+#-----------------------------------------------------------------------------------------------------------------
+#fun : submit to cluster
+#param : N/A
+#return : 0 -- success; not 0 -- failure
+#-----------------------------------------------------------------------------------------------------------------
+function submit() {
+    g_run_stage="submit"
+    g_job_name="paddle_rec_mpi"
+    g_hdfs_path=$g_hdfs_path
+    g_job_entry="worker.sh"
+
+    engine_hdfs_output=${engine_hdfs_output}/$(date +%Y%m%d%H%M%S)
+
+    cd ${engine_temp_path}
+
+    ${engine_submit_hpc}/bin/qsub_f \
+        -N ${g_job_name} \
+        --conf ${engine_submit_qconf} \
+        --hdfs ${engine_hdfs_name} \
+        --ugi ${engine_hdfs_ugi} \
+        --hout ${engine_hdfs_output} \
+        --files ./package \
+        -l nodes=${engine_submit_nodes},walltime=1000:00:00,resource=full ${g_job_entry}
+}
+
+function main() {
+    package
+
+    before_submit
+    submit
+    after_submit
+}
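End to end, the flow submit.sh participates in looks like this: the user launches the master locally, e.g. `python -m fleetrec.run -m models/rank/dnn/config.yaml -e cluster -b models/rank/dnn/backend.yaml -r master` (the config path here is illustrative); run.py flattens backend.yaml into `engine_*` runtime envs, and ClusterEngine executes master.sh, which sources the resolved submit script and calls `main`: `package()` stages the python runtime, qconf, and wheel into `${engine_temp_path}`, and `submit()` ships that package to the cluster through `qsub_f`. Each MPI node then runs worker.sh (below), whose `run()` re-enters `fleetrec.run` with `--role worker`, so `cluster_engine()` takes the worker branch and builds the MPI ClusterTrainer.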
diff --git a/models/rank/dnn/worker.sh b/models/rank/dnn/worker.sh
new file mode 100644
index 0000000000000000000000000000000000000000..3ca2a1f07dd07bd45599f23a60dd4c47f2c72700
--- /dev/null
+++ b/models/rank/dnn/worker.sh
@@ -0,0 +1,74 @@
+#!/bin/bash
+
+###################################################
+# Usage: worker.sh
+# Description: run job on mpi per node
+###################################################
+
+# ---------------------------------------------------------------------------- #
+#                                variable define                                #
+# ---------------------------------------------------------------------------- #
+declare g_curPath=""
+declare g_scriptName=""
+declare g_workPath=""
+declare g_run_stage=""
+
+# ---------------------------------------------------------------------------- #
+#                                 const define                                  #
+# ---------------------------------------------------------------------------- #
+export FLAGS_communicator_send_queue_size=18
+export FLAGS_communicator_thread_pool_size=20
+export FLAGS_communicator_max_merge_var_num=18
+################################################################################
+
+#-----------------------------------------------------------------------------------------------------------------
+#fun : check function return code
+#param : N/A
+#return : 0 -- success; not 0 -- failure
+#-----------------------------------------------------------------------------------------------------------------
+function check_error() {
+    if [ ${?} -ne 0 ]; then
+        echo "execute ${g_run_stage} raised an exception! please check ..."
+        exit 1
+    fi
+}
+
+#-----------------------------------------------------------------------------------------------------------------
+#fun : prepare the python environment on each mpi node
+#param : N/A
+#return : 0 -- success; not 0 -- failure
+#-----------------------------------------------------------------------------------------------------------------
+function env_prepare() {
+    g_run_stage="env_prepare"
+    WORKDIR=$(pwd)
+    mpirun -npernode 1 mv package/* ./
+    echo "current:"$WORKDIR
+
+    mpirun -npernode 1 tar -zxvf python.tar.gz > /dev/null
+
+    export PYTHONPATH=$WORKDIR/python/
+    export PYTHONROOT=$WORKDIR/python/
+    export LIBRARY_PATH=$PYTHONPATH/lib:$LIBRARY_PATH
+    export LD_LIBRARY_PATH=$PYTHONPATH/lib:$LD_LIBRARY_PATH
+    export PATH=$PYTHONPATH/bin:$PATH
+    export LIBRARY_PATH=$PYTHONROOT/lib:$LIBRARY_PATH
+
+    python -c "print('python env ready')"
+
+    mpirun -npernode 1 python/bin/python -m pip uninstall -y fleet-rec
+    mpirun -npernode 1 python/bin/python -m pip install whl/fleet_rec-0.0.2-py2-none-any.whl --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com
+    check_error
+}
+
+function run() {
+    echo "run"
+    g_run_stage="run"
+    mpirun -npernode 2 -timestamp-output -tag-output -machinefile ${PBS_NODEFILE} python/bin/python -u -m fleetrec.run -m fleetrec.models.rank.dnn --engine cluster --role worker
+}
+
+function main() {
+    env_prepare
+    run
+}
+
+main
diff --git a/setup.py b/setup.py
index 80e39b5533e050e2307a436865ce14e44d797773..3dd1a1e9686fd447d91b3099ea14506b505aa7b5 100644
--- a/setup.py
+++ b/setup.py
@@ -39,10 +39,14 @@ def build(dirname):
     packages = find_packages(dirname, include=('fleetrec.*'))
     package_dir = {'': dirname}
     package_data = {}
-    need_copy = ['data/*.txt', 'data/*/*.txt', '*.yaml', 'tree/*.npy','tree/*.txt']
+
+    models_copy = ['data/*.txt', 'data/*/*.txt', '*.yaml', '*.sh', 'tree/*.npy', 'tree/*.txt']
+    engine_copy = ['*/*.sh']
     for package in packages:
         if package.startswith("fleetrec.models."):
-            package_data[package] = need_copy
+            package_data[package] = models_copy
+        if package.startswith("fleetrec.core.engine"):
+            package_data[package] = engine_copy
 
     setup(
         name=about["__title__"],
@@ -67,10 +71,10 @@ shutil.rmtree(dirname)
 print('''
 \033[32m
-     _ _ _ _ _ _ _ _ _
-    / \ / \ / \ / \ / \ / \ / \ / \ / \
+  _   _   _   _   _   _   _   _   _
+ / \ / \ / \ / \ / \ / \ / \ / \ / \
 ( F | L | E | E | T | - | R | E | C )
- \_/ \_/ \_/ \_/ \_/ \_/ \_/ \_/ \_/
+ \_/ \_/ \_/ \_/ \_/ \_/ \_/ \_/ \_/
 \033[0m
 \033[34m
     Installation Complete. Congratulations!