diff --git a/fleet_rec/core/trainers/cluster_trainer.py b/fleet_rec/core/trainers/cluster_trainer.py
index 4635d91020888666a3b3449e6dfd8994ff45a6cf..bc5a91732febcc470554ecfa07cf5fb11518546f 100755
--- a/fleet_rec/core/trainers/cluster_trainer.py
+++ b/fleet_rec/core/trainers/cluster_trainer.py
@@ -23,6 +23,7 @@ import paddle.fluid as fluid
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
 from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
+from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
 
 from fleetrec.core.utils import envs
 from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
@@ -30,7 +31,8 @@ from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
 
 class ClusterTrainer(TranspileTrainer):
     def processor_register(self):
-        role = PaddleCloudRoleMaker()
+        #role = PaddleCloudRoleMaker()
+        role = MPISymetricRoleMaker()
         fleet.init(role)
 
         if fleet.is_server():
diff --git a/fleet_rec/core/trainers/transpiler_trainer.py b/fleet_rec/core/trainers/transpiler_trainer.py
index 96afab2164f9f1e815ae0c1c48249ea0fa109dbf..683b921e33de481e26cc1ff93a8300ddf1475d51 100755
--- a/fleet_rec/core/trainers/transpiler_trainer.py
+++ b/fleet_rec/core/trainers/transpiler_trainer.py
@@ -72,7 +72,8 @@ class TranspileTrainer(Trainer):
         train_data_path = envs.get_global_env(
             "test_data_path", None, namespace)
 
-        threads = int(envs.get_runtime_environ("train.trainer.threads"))
+        #threads = int(envs.get_runtime_environ("train.trainer.threads"))
+        threads = 2
         batch_size = envs.get_global_env("batch_size", None, namespace)
         reader_class = envs.get_global_env("class", None, namespace)
         abs_dir = os.path.dirname(os.path.abspath(__file__))
diff --git a/fleet_rec/run.py b/fleet_rec/run.py
index 34adbcd76cb47b8dca80893d8e61698c09b1aeaf..aa8d59655971851238df32d89c2484afc5c3fd1b 100755
--- a/fleet_rec/run.py
+++ b/fleet_rec/run.py
@@ -110,7 +110,6 @@ def single_engine(args):
 
 
 def cluster_engine(args):
-    from fleetrec.core.engine.cluster.cluster import ClusterEngine
 
     def update_workspace(cluster_envs):
         workspace = cluster_envs.get("engine_workspace", None)
@@ -131,6 +130,7 @@ def cluster_engine(args):
             cluster_envs[name] = value
 
     def master():
+        from fleetrec.core.engine.cluster.cluster import ClusterEngine
         with open(args.backend, 'r') as rb:
             _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
 
@@ -155,10 +155,10 @@ def cluster_engine(args):
 
         print("launch {} engine with cluster to with model: {}".format(trainer, args.model))
         set_runtime_envs(cluster_envs, args.model)
-        launch = LocalClusterEngine(cluster_envs, args.model)
-        return launch
+        trainer = TrainerFactory.create(args.model)
+        return trainer
 
-    if args.role == "worker":
+    if args.role == "WORKER":
         return worker()
     else:
         return master()
diff --git a/models/rank/dnn/submit.sh b/models/rank/dnn/submit.sh
index 1a11360cccb3bfb8147300db4f35e1b66207098a..56b5f8798f0e4181dfd54d9e831078e4b1533d39 100644
--- a/models/rank/dnn/submit.sh
+++ b/models/rank/dnn/submit.sh
@@ -29,8 +29,8 @@ function package() {
     cp ${engine_submit_qconf} ${temp}
     echo "copy job.sh from " ${engine_worker} " to " ${temp}
 
-    mkdir -p ${temp}/package/python
-    cp -r ${engine_package_python}/* ${temp}/package/python/
+    mkdir -p ${temp}/package
+    cp -r ${engine_package_python} ${temp}/package/
     echo "copy python from " ${engine_package_python} " to " ${temp}
 
     mkdir ${temp}/package/whl
diff --git a/models/rank/dnn/worker.sh b/models/rank/dnn/worker.sh
index 57cf12c9e81f93d27972bfe4254cea1e9a72f599..3ca2a1f07dd07bd45599f23a60dd4c47f2c72700 100644
--- a/models/rank/dnn/worker.sh
+++ b/models/rank/dnn/worker.sh
@@ -16,10 +16,9 @@ declare g_run_stage=""
 # ---------------------------------------------------------------------------- #
 #                                const define                                  #
 # ---------------------------------------------------------------------------- #
-declare -r FLAGS_communicator_thread_pool_size=5
-declare -r FLAGS_communicator_send_queue_size=18
-declare -r FLAGS_communicator_thread_pool_size=20
-declare -r FLAGS_communicator_max_merge_var_num=18
+export FLAGS_communicator_send_queue_size=18
+export FLAGS_communicator_thread_pool_size=20
+export FLAGS_communicator_max_merge_var_num=18
 ################################################################################
 
 #-----------------------------------------------------------------------------------------------------------------
@@ -44,9 +44,20 @@ function env_prepare() {
     WORKDIR=$(pwd)
     mpirun -npernode 1 mv package/* ./
     echo "current:"$WORKDIR
-    export LIBRARY_PATH=$WORKDIR/python/lib:$LIBRARY_PATH
-    mpirun -npernode 1 python/bin/python -m pip install whl/fleet_rec-0.0.2-py2-none-any.whl --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com >/dev/null
+    mpirun -npernode 1 tar -zxvf python.tar.gz > /dev/null
+
+    export PYTHONPATH=$WORKDIR/python/
+    export PYTHONROOT=$WORKDIR/python/
+    export LIBRARY_PATH=$PYTHONPATH/lib:$LIBRARY_PATH
+    export LD_LIBRARY_PATH=$PYTHONPATH/lib:$LD_LIBRARY_PATH
+    export PATH=$PYTHONPATH/bin:$PATH
+    export LIBRARY_PATH=$PYTHONROOT/lib:$LIBRARY_PATH
+
+    python -c "print('heheda')"
+
+    mpirun -npernode 1 python/bin/python -m pip uninstall -y fleet-rec
+    mpirun -npernode 1 python/bin/python -m pip install whl/fleet_rec-0.0.2-py2-none-any.whl --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com
 
     check_error
 }
 
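
Note on the FLAGS block in worker.sh: `declare -r` creates a read-only shell variable but does not place it in the environment, so child processes never see it. PaddlePaddle reads its `FLAGS_*` settings from the process environment, which is why the communicator flags must be `export`ed before `python/bin/python` is launched. A minimal sketch of the difference, using the hypothetical variable names FOO and BAR rather than the real flags:

    $ declare -r FOO=1                                      # shell-local, read-only, not exported
    $ python -c 'import os; print(os.environ.get("FOO"))'
    None
    $ export BAR=2                                          # placed in the environment
    $ python -c 'import os; print(os.environ.get("BAR"))'
    2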
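
The submit.sh and worker.sh changes are two halves of one packaging scheme: submit.sh now copies `${engine_package_python}` into `${temp}/package/` as-is instead of expanding it into `package/python/`, and `env_prepare()` on the worker side expects that artifact to be a `python.tar.gz` that unpacks to a self-contained interpreter under `./python/`. A sketch of how such a tarball could be produced (the `/opt/compiled` source path is an assumption, not something the patch specifies):

    # pack a relocatable CPython so that extraction yields python/bin/python
    # and python/lib/..., matching the paths worker.sh exports
    cd /opt/compiled && tar -zcf python.tar.gz python/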