From 7171b20e23f321fbb9f011bcbd74359d98d75829 Mon Sep 17 00:00:00 2001 From: xujiaqi01 <173596896@qq.com> Date: Wed, 5 Feb 2020 15:38:21 +0800 Subject: [PATCH] add GeneralRoleMaker (#22295) (#22446) * add GeneralRoleMaker which is for general usage * test=develop --- Dockerfile | 7 + paddle/fluid/framework/CMakeLists.txt | 1 + paddle/fluid/framework/data_set.cc | 2 + paddle/fluid/framework/dist_multi_trainer.cc | 4 +- .../framework/dist_multi_trainer_test.cc | 56 ++ paddle/fluid/framework/fleet/gloo_wrapper.cc | 39 +- paddle/fluid/framework/fleet/gloo_wrapper.h | 7 +- paddle/fluid/framework/fleet/test_fleet.cc | 3 +- paddle/fluid/pybind/gloo_wrapper_py.cc | 6 +- python/paddle/fluid/dataset.py | 16 +- .../fluid/incubate/fleet/base/fleet_base.py | 16 + .../fluid/incubate/fleet/base/role_maker.py | 498 +++++++++++++++++- .../fleet/parameter_server/pslib/__init__.py | 22 +- .../fluid/incubate/fleet/utils/fleet_util.py | 10 +- .../fluid/tests/unittests/test_dataset.py | 8 +- .../tests/unittests/test_fleet_rolemaker.py | 76 ++- .../tests/unittests/test_fleet_rolemaker_2.py | 285 ++++++++++ 17 files changed, 993 insertions(+), 63 deletions(-) create mode 100644 paddle/fluid/framework/dist_multi_trainer_test.cc create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py diff --git a/Dockerfile b/Dockerfile index e431356a379..b4f8c9dcebb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -219,6 +219,13 @@ RUN wget -q https://launchpad.net/ubuntu/+archive/primary/+sourcefiles/binutils/ cd binutils-2.27 && \ ./configure && make -j && make install && cd .. && rm -rf binutils-2.27 binutils_2.27.orig.tar.gz +RUN wget --no-check-certificate https://pslib.bj.bcebos.com/openmpi-1.4.5.tar.gz && tar -xzf openmpi-1.4.5.tar.gz && \ + cd openmpi-1.4.5 && ./configure --prefix=/usr/local && make all -j8 && make install -j8 && \ + export LD_LIBRARY_PATH=/usr/local/lib/:$LD_LIBRARY_PATH && export PATH=/usr/local/bin:$PATH && cd .. && \ + rm -rf openmpi-1.4.5.tar.gz && pip --no-cache-dir install mpi4py && ln -fs /bin/bash /bin/sh && \ + apt-get install libprotobuf-dev -y +RUN pip --no-cache-dir install -U netifaces==0.10.9 + # Older versions of patchelf limited the size of the files being processed and were fixed in this pr. # https://github.com/NixOS/patchelf/commit/ba2695a8110abbc8cc6baf0eea819922ee5007fa # So install a newer version here. 
diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index bee02c556a1..bbe2e34650a 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -214,6 +214,7 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS graph build_strategy fast_threaded_ssa_graph_executor variable_helper) +cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS executor) cc_library(prune SRCS prune.cc DEPS framework_proto boost) cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context) cc_test(var_type_inference_test SRCS var_type_inference_test.cc DEPS op_registry diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 514bc8ba9eb..69f13d9f7d7 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -287,6 +287,7 @@ void DatasetImpl::LocalShuffle() { template void DatasetImpl::GlobalShuffle(int thread_num) { +#ifdef PADDLE_WITH_PSLIB VLOG(3) << "DatasetImpl::GlobalShuffle() begin"; platform::Timer timeline; timeline.Start(); @@ -379,6 +380,7 @@ void DatasetImpl::GlobalShuffle(int thread_num) { timeline.Pause(); VLOG(3) << "DatasetImpl::GlobalShuffle() end, cost time=" << timeline.ElapsedSec() << " seconds"; +#endif } template diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc index 879382e3f18..cb93e8bbcb1 100644 --- a/paddle/fluid/framework/dist_multi_trainer.cc +++ b/paddle/fluid/framework/dist_multi_trainer.cc @@ -41,8 +41,8 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc, need_dump_field_ = false; } } - mpi_rank_ = trainer_desc.mpi_rank() / 2; - mpi_size_ = trainer_desc.mpi_size() / 2; + mpi_rank_ = trainer_desc.mpi_rank(); + mpi_size_ = trainer_desc.mpi_size(); dump_file_num_ = trainer_desc.dump_file_num(); const std::vector readers = dataset->GetReaders(); diff --git a/paddle/fluid/framework/dist_multi_trainer_test.cc b/paddle/fluid/framework/dist_multi_trainer_test.cc new file mode 100644 index 00000000000..f54029fd17f --- /dev/null +++ b/paddle/fluid/framework/dist_multi_trainer_test.cc @@ -0,0 +1,56 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include "google/protobuf/io/zero_copy_stream_impl.h" +#include "google/protobuf/message.h" +#include "google/protobuf/text_format.h" +#include "gtest/gtest.h" +#include "paddle/fluid/framework/trainer.h" + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +namespace paddle { +namespace framework { +TEST(DisMultiTrainerTest, test1) { +#ifdef _LINUX + std::shared_ptr tmp1 = std::make_shared(); + TrainerDesc t; + t.set_class_name("DistMultiTrainer"); + t.set_device_worker_name("DownpourWorker"); + t.set_thread_num(1); + auto* m = t.mutable_downpour_param()->add_program_config(); + m->set_program_id("123"); + std::string str; + str += "name: \"MultiSlotDataFeed\"\nbatch_size: 2\nmulti_slot_desc {\n"; + str += "slots {\nname: \"words\"\ntype: \"uint64\"\nis_dense: false\n"; + str += "is_used: true\n}\nslots {\nname: \"label\"\ntype: \"uint64\"\n"; + str += "is_dense: false\nis_used: true\n}\n}\n"; + std::shared_ptr dataset = + std::make_shared(); + dataset->SetFileList(std::vector()); + dataset->SetThreadNum(1); + dataset->SetTrainerNum(1); + dataset->SetDataFeedDesc(str); + dataset->CreateReaders(); + tmp1->Initialize(t, dataset.get()); +#endif +} +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.cc b/paddle/fluid/framework/fleet/gloo_wrapper.cc index c839bd1d38b..c599432ff19 100644 --- a/paddle/fluid/framework/fleet/gloo_wrapper.cc +++ b/paddle/fluid/framework/fleet/gloo_wrapper.cc @@ -21,6 +21,7 @@ HdfsStore::HdfsStore(const std::string& path) { path_ = path; wait_sleep_ms_ = 3000; wait_timeout_ = std::chrono::seconds(999999999); + retry_times_ = 100; } void HdfsStore::set(const std::string& key, const std::vector& data) { @@ -33,10 +34,27 @@ void HdfsStore::set(const std::string& key, const std::vector& data) { paddle::framework::fs_remove(path); } int err_no = 0; - std::shared_ptr fp = paddle::framework::fs_open_write(tmp, &err_no, ""); - size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get()); - VLOG(3) << "HdfsStore::set write_count=" << write_count << " key " << key; - fp.reset(); + for (int i = 1; i <= retry_times_; ++i) { + std::shared_ptr fp = + paddle::framework::fs_open_write(tmp, &err_no, ""); + if (err_no != 0) { + VLOG(0) << "fs_open_write failed, retry times " << i << " err no " + << err_no; + fp.reset(); + sleep(wait_sleep_ms_ / 1000); + continue; + } + size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get()); + if (write_count != data.size()) { + VLOG(0) << "fwrite_unlocked failed, retry times " << i << " write_count " + << write_count << " data.size() " << data.size(); + fp.reset(); + sleep(2); + continue; + } + fp.reset(); + break; + } paddle::framework::fs_mv(tmp, path); #endif } @@ -131,7 +149,7 @@ void GlooWrapper::Init(int rank, int size, const std::string& path, } rank_ = rank; size_ = size; - std::string cmd = std::string("hadoop fs"); + std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs"); cmd += " -D fs.default.name=" + fs_name; cmd += " -D hadoop.job.ugi=" + fs_ugi; paddle::framework::hdfs_set_command(cmd); @@ -149,16 +167,19 @@ void GlooWrapper::Init(int rank, int size, const std::string& path, is_initialized_ = true; } -template void GlooWrapper::AllReduce( +template std::vector GlooWrapper::AllReduce( std::vector& sendbuf, // NOLINT - std::vector& recvbuf, // NOLINT const std::string& mode); -template void GlooWrapper::AllReduce( +template std::vector GlooWrapper::AllReduce( std::vector& sendbuf, // 
NOLINT - std::vector& recvbuf, // NOLINT + const std::string& mode); +template std::vector GlooWrapper::AllReduce( + std::vector& sendbuf, // NOLINT const std::string& mode); template std::vector GlooWrapper::AllGather( int64_t& input); // NOLINT +template std::vector GlooWrapper::AllGather( + uint64_t& input); // NOLINT template std::vector GlooWrapper::AllGather( double& input); // NOLINT diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.h b/paddle/fluid/framework/fleet/gloo_wrapper.h index a7d0526dcde..528c91be6d4 100644 --- a/paddle/fluid/framework/fleet/gloo_wrapper.h +++ b/paddle/fluid/framework/fleet/gloo_wrapper.h @@ -70,6 +70,7 @@ class HdfsStore { std::string path_; int wait_sleep_ms_; std::chrono::seconds wait_timeout_; + int retry_times_; }; } // namespace rendezvous @@ -107,9 +108,10 @@ class GlooWrapper { } template - void AllReduce(std::vector& sendbuf, std::vector& recvbuf, // NOLINT - const std::string& mode = "sum") { + std::vector AllReduce(std::vector& sendbuf, // NOLINT + const std::string& mode = "sum") { // NOLINT CHECK_EQ(is_initialized_, true); + std::vector recvbuf(sendbuf.size(), T()); CHECK_EQ(sendbuf.size() == recvbuf.size(), true); #ifdef PADDLE_WITH_GLOO gloo::AllreduceOptions opts(context_); @@ -133,6 +135,7 @@ class GlooWrapper { } gloo::allreduce(opts); #endif + return recvbuf; } template diff --git a/paddle/fluid/framework/fleet/test_fleet.cc b/paddle/fluid/framework/fleet/test_fleet.cc index 42343beb246..5a3fd132d7e 100644 --- a/paddle/fluid/framework/fleet/test_fleet.cc +++ b/paddle/fluid/framework/fleet/test_fleet.cc @@ -49,8 +49,7 @@ TEST(TEST_GLOO, store_1) { gw.Size(); gw.Barrier(); std::vector input; - std::vector output; - gw.AllReduce(input, output); + gw.AllReduce(input); int64_t t; gw.AllGather(t); #endif diff --git a/paddle/fluid/pybind/gloo_wrapper_py.cc b/paddle/fluid/pybind/gloo_wrapper_py.cc index 5d0b720fcaf..80260ae2f29 100644 --- a/paddle/fluid/pybind/gloo_wrapper_py.cc +++ b/paddle/fluid/pybind/gloo_wrapper_py.cc @@ -37,12 +37,12 @@ void BindGlooWrapper(py::module* m) { .def("rank", &framework::GlooWrapper::Rank) .def("size", &framework::GlooWrapper::Size) .def("barrier", &framework::GlooWrapper::Barrier) + .def("all_reduce", &framework::GlooWrapper::AllReduce) .def("all_reduce", &framework::GlooWrapper::AllReduce) .def("all_reduce", &framework::GlooWrapper::AllReduce) + .def("all_gather", &framework::GlooWrapper::AllGather) .def("all_gather", &framework::GlooWrapper::AllGather) - .def("all_gather", &framework::GlooWrapper::AllGather) - .def("Allreduce", &framework::GlooWrapper::AllReduce) - .def("Allreduce", &framework::GlooWrapper::AllReduce); + .def("all_gather", &framework::GlooWrapper::AllGather); } // end BindGlooWrapper } // end namespace pybind } // end namespace paddle diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index 0915ac49872..92c8b6f06b8 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -526,7 +526,7 @@ class InMemoryDataset(DatasetBase): """ trainer_num = 1 if fleet is not None: - fleet._role_maker._barrier_worker() + fleet._role_maker.barrier_worker() trainer_num = fleet.worker_num() if self.fleet_send_batch_size is None: self.fleet_send_batch_size = 1024 @@ -537,14 +537,14 @@ class InMemoryDataset(DatasetBase): self.dataset.set_fleet_send_batch_size(self.fleet_send_batch_size) self.dataset.set_fleet_send_sleep_seconds(self.fleet_send_sleep_seconds) if fleet is not None: - fleet._role_maker._barrier_worker() + fleet._role_maker.barrier_worker() 
self.dataset.global_shuffle(thread_num) if fleet is not None: - fleet._role_maker._barrier_worker() + fleet._role_maker.barrier_worker() if self.merge_by_lineid: self.dataset.merge_by_lineid() if fleet is not None: - fleet._role_maker._barrier_worker() + fleet._role_maker.barrier_worker() def release_memory(self): """ @@ -599,8 +599,8 @@ class InMemoryDataset(DatasetBase): local_data_size = np.array([local_data_size]) if fleet is not None: global_data_size = local_data_size * 0 - fleet._role_maker._node_type_comm.Allreduce(local_data_size, - global_data_size) + fleet._role_maker.all_reduce_worker(local_data_size, + global_data_size) return global_data_size[0] return local_data_size[0] @@ -637,8 +637,8 @@ class InMemoryDataset(DatasetBase): local_data_size = np.array([local_data_size]) if fleet is not None: global_data_size = local_data_size * 0 - fleet._role_maker._node_type_comm.Allreduce(local_data_size, - global_data_size) + fleet._role_maker.all_reduce_worker(local_data_size, + global_data_size) return global_data_size[0] return local_data_size[0] diff --git a/python/paddle/fluid/incubate/fleet/base/fleet_base.py b/python/paddle/fluid/incubate/fleet/base/fleet_base.py index 93d30d3d74d..09a1bac85f0 100644 --- a/python/paddle/fluid/incubate/fleet/base/fleet_base.py +++ b/python/paddle/fluid/incubate/fleet/base/fleet_base.py @@ -202,6 +202,22 @@ class Fleet(object): self._role_maker.generate_role() self._is_initialized = True + def all_reduce_worker(self, input, output): + """ + all reduce between workers, only support array of one dim. + + Args: + input(list|numpy.array): array of one dim + output(list|numpy.array): array of one dim + """ + self._role_maker.all_reduce_worker(input, output) + + def barrier_worker(self): + """ + barrier between workers + """ + self._role_maker.barrier_worker() + @abc.abstractmethod def init_worker(self): pass diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index 2471748b4e4..6600ed9aa4e 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -11,16 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Defination of Role Makers.""" from __future__ import print_function +import paddle.fluid as fluid +import os +import time __all__ = [ 'Role', 'RoleMakerBase', 'MPISymetricRoleMaker', 'UserDefinedRoleMaker', - 'UserDefinedCollectiveRoleMaker', 'PaddleCloudRoleMaker' + 'UserDefinedCollectiveRoleMaker', 'PaddleCloudRoleMaker', 'GeneralRoleMaker' ] -import os - class Role: WORKER = 1 @@ -107,6 +109,43 @@ class RoleMakerBase(object): self._role, self._current_id, self._worker_endpoints, self._server_endpoints) + def all_gather(self, input): + """ + all gather between trainers and pservers + + Args: + input(int|float): input value + + Returns: + return a list of values + """ + print("warning: RoleMakerBase does not have all gather.") + return None + + def all_reduce_worker(self, input, output, mode="sum"): + """ + all reduce between trainers if current role is TRAINER, + only support array of one dim. 
+ + Args: + input(list/numpy.array): array of one dim + output(list/numpy.array): array of one dim + mode(str): "sum" or "min" or "max" + """ + print("warning: RoleMakerBase does not have all reduce worker.") + + def barrier_worker(self): + """ + barrier between trainers if current role is TRAINER + """ + print("warning: RoleMakerBase does not have barrier worker.") + + def barrier_all(self): + """ + barrier between trainers if current role is PSERVER + """ + print("warning: RoleMakerBase does not have barrier all.") + class MPIRoleMaker(RoleMakerBase): """ @@ -115,6 +154,7 @@ class MPIRoleMaker(RoleMakerBase): """ def __init__(self): + """Init.""" super(MPIRoleMaker, self).__init__() from mpi4py import MPI self.MPI = MPI @@ -124,16 +164,12 @@ class MPIRoleMaker(RoleMakerBase): self._ip = None def _get_rank(self): - """ - return rank - """ + """Return rank.""" self._rank = self._comm.Get_rank() return self._rank def _get_size(self): - """ - return size - """ + """Return size.""" self._size = self._comm.Get_size() return self._size @@ -174,9 +210,7 @@ class MPIRoleMaker(RoleMakerBase): return self._ips def get_local_ip(self): - """ - return get local ip - """ + """Return get local ip.""" import socket self._ip = socket.gethostbyname(socket.gethostname()) return self._ip @@ -196,16 +230,68 @@ class MPISymetricRoleMaker(MPIRoleMaker): """ def __init__(self): + """Init.""" super(MPISymetricRoleMaker, self).__init__() self._node_type = None self._proc_per_node = 2 self._pserver_rand_port = 0 def _check_role_generation(self): + """Check whether role has been generated.""" if not self._role_is_generated: raise NameError("generate_role() should be called first") return True + def all_gather(self, input): + """ + all gather between trainers and pservers + + Args: + input(int|float): input value + + Returns: + return a list of values + """ + if not self._role_is_generated: + self.generate_role() + return self._all_gather(input) + + def all_reduce_worker(self, input, output, mode="sum"): + """ + all reduce between trainers if current role is TRAINER, + only support array of one dim. 
+ + Args: + input(list/numpy.array): array of one dim + output(list/numpy.array): array of one dim + mode(str): "sum" or "min" or "max" + """ + if not self._role_is_generated: + self.generate_role() + if not self.is_worker(): + print("warning: current role is not worker in all_reduce_worker") + return + self._all_reduce(input, output, mode) + + def barrier_worker(self): + """ + barrier between trainers if current role is TRAINER + """ + if not self._role_is_generated: + self.generate_role() + if self.is_worker(): + self._node_type_comm.barrier() + else: + print("warning: current role is not worker in barrier_worker") + + def barrier_all(self): + """ + barrier between trainers if current role is PSERVER + """ + if not self._role_is_generated: + self.generate_role() + self._comm.barrier() + def is_first_worker(self): """ return whether current process is the first worker assigned by role maker @@ -215,6 +301,12 @@ class MPISymetricRoleMaker(MPIRoleMaker): return False def get_pserver_endpoints(self): + """ + get pserver endpoints + + Returns: + endpoints(list): pserver endpoints + """ if self._pserver_rand_port <= 0: import random random.seed(self._server_num()) @@ -285,6 +377,28 @@ class MPISymetricRoleMaker(MPIRoleMaker): self.generate_role() return self._get_size() / self._proc_per_node + def _all_reduce(self, input, output, mode="sum"): + """ + all reduce between trainers if current role is TRAINER, + only support array of one dim. + + Args: + input(list/numpy.array): array of one dim + output(list/numpy.array): array of one dim + mode(str): "sum" or "min" or "max" + """ + if not self._role_is_generated: + self.generate_role() + if mode == "sum": + mode = self.MPI.SUM + elif mode == "max": + mode = self.MPI.MAX + elif mode == "min": + mode = self.MPI.MIN + else: + raise ValueError("unknown mode: %s" % mode) + self._node_type_comm.Allreduce(input, output, op=mode) + def _barrier_worker(self): """ barrier all workers in current distributed job @@ -325,12 +439,18 @@ class MPISymetricRoleMaker(MPIRoleMaker): class PaddleCloudRoleMaker(RoleMakerBase): + """ + role maker for paddle cloud, + base class is RoleMakerBase + """ + def __init__(self, is_collective=False): super(PaddleCloudRoleMaker, self).__init__() self._role_is_generated = False self._is_collective = is_collective def generate_role(self): + """Generate role.""" if not self._role_is_generated: if not self._is_collective: try: @@ -419,17 +539,352 @@ class PaddleCloudRoleMaker(RoleMakerBase): return self._trainers_num +class GeneralRoleMaker(RoleMakerBase): + """ + This role maker is for general use. You can set os.environ to customize: + PADDLE_PSERVERS_IP_PORT_LIST : all pservers' ip:port, separated by ',' + PADDLE_TRAINER_ENDPOINTS : all trainers' ip:port, separated by ',' + TRAINING_ROLE : TRAINER or PSERVER + PADDLE_TRAINER_ID : current trainer id (only for trainer), + it is the index in PADDLE_TRAINER_ENDPOINTS + PADDLE_PSERVER_ID : current pserver id (only for pserver), + it is the index in PADDLE_PSERVERS_IP_PORT_LIST + """ + + def __init__(self, **kwargs): + super(RoleMakerBase, self).__init__() + self._role_is_generated = False + self._hdfs_name = kwargs.get("hdfs_name", "") + self._hdfs_ugi = kwargs.get("hdfs_ugi", "") + self._hdfs_path = kwargs.get("path", "") + self._iface = self.__get_default_iface() + # this environment variable can be empty + self._prefix = os.getenv("SYS_JOB_ID", "") + + def generate_role(self): + """ + generate role for general role maker + """ + if not self._role_is_generated: + eplist =
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"].split(",") + training_role = os.environ["TRAINING_ROLE"] + worker_endpoints = os.environ["PADDLE_TRAINER_ENDPOINTS"].split(",") + trainers_num = len(worker_endpoints) + if training_role not in ["TRAINER", "PSERVER"]: + raise ValueError("TRAINING_ROLE must be PSERVER or TRAINER") + if training_role == "TRAINER": + role = Role.WORKER + current_id = int(os.environ["PADDLE_TRAINER_ID"]) + self._node_type = 1 + self._cur_endpoint = worker_endpoints[current_id] + gloo = fluid.core.Gloo() + gloo.init(current_id, + len(worker_endpoints), + self._hdfs_path.rstrip("/") + "/trainer", + self._hdfs_name, self._hdfs_ugi, self._iface, + self._prefix) + self._node_type_comm = gloo + elif training_role == "PSERVER": + role = Role.SERVER + if os.environ.get("PADDLE_PSERVER_ID") is not None: + current_id = int(os.environ["PADDLE_PSERVER_ID"]) + cur_endpoint = eplist[current_id] + else: + # this is for compatible with paddlecloud + cur_ip = os.environ["POD_IP"] + cur_port = os.environ["PADDLE_PORT"] + cur_endpoint = ":".join([cur_ip, cur_port]) + current_id = eplist.index(cur_endpoint) + self._node_type = 0 + self._cur_endpoint = cur_endpoint + gloo = fluid.core.Gloo() + gloo.init(current_id, + len(eplist), + self._hdfs_path.rstrip("/") + "/pserver", + self._hdfs_name, self._hdfs_ugi, self._iface, + self._prefix) + self._node_type_comm = gloo + + gloo = fluid.core.Gloo() + all_list = worker_endpoints + eplist + gloo.init( + all_list.index(self._cur_endpoint), + len(all_list), + self._hdfs_path.rstrip("/") + "/all", self._hdfs_name, + self._hdfs_ugi, self._iface, self._prefix) + self._all_comm = gloo + self._trainers_num = trainers_num + self._server_endpoints = eplist + self._role = role + self._current_id = current_id + self._rank = all_list.index(self._cur_endpoint) + self._size = len(all_list) + self._worker_endpoints = worker_endpoints + self._role_is_generated = True + + def all_gather(self, input): + """ + all gather between trainers and pservers + + Args: + input(int|float): input value + + Returns: + return a list of values + """ + return self._all_gather(input) + + def all_reduce_worker(self, input, output, mode="sum"): + """ + all reduce between trainers if current role is TRAINER, + only support array of one dim. 
+ + Args: + input(list/numpy.array): array of one dim + output(list/numpy.array): array of one dim + mode(str): "sum" or "min" or "max" + """ + if not self.is_worker(): + return + self._all_reduce(input, output, mode) + + def barrier_worker(self): + """ + barrier between trainers if current role is TRAINER + """ + self._barrier_worker() + + def barrier_all(self): + """ + barrier between trainers if current role is PSERVER + """ + self._barrier_all() + + def get_local_endpoint(self): + """ + get local endpoint of current process + """ + if not self._role_is_generated: + self.generate_role() + return self._cur_endpoint + + def get_trainer_endpoints(self): + """ + get endpoint of all trainers + """ + if not self._role_is_generated: + self.generate_role() + return self._worker_endpoints + + def get_pserver_endpoints(self): + """ + get endpoint of all pservers + """ + if not self._role_is_generated: + self.generate_role() + return self._server_endpoints + + def is_worker(self): + """ + whether current process is worker + """ + if not self._role_is_generated: + self.generate_role() + return self._role == Role.WORKER + + def is_server(self): + """ + whether current process is server + """ + if not self._role_is_generated: + self.generate_role() + return self._role == Role.SERVER + + def is_first_worker(self): + """ + whether current process is worker of rank 0 + """ + if not self._role_is_generated: + self.generate_role() + return self._role == Role.WORKER and self._current_id == 0 + + def worker_index(self): + """ + get index of current worker + """ + if not self._role_is_generated: + self.generate_role() + return self._current_id + + def server_index(self): + """ + get index of current server + """ + if not self._role_is_generated: + self.generate_role() + return self._current_id + + def worker_num(self): + """ + retrun the current number of worker + """ + if not self._role_is_generated: + self.generate_role() + return self._worker_num() + + def server_num(self): + """ + return the current number of server + """ + if not self._role_is_generated: + self.generate_role() + return self._server_num() + + def _barrier_worker(self): + """ + barrier all workers in current distributed job + """ + if not self._role_is_generated: + self.generate_role() + if self.is_worker(): + self._node_type_comm.barrier() + + def _barrier_all(self): + """ + barrier all workers and servers in current distributed job + """ + if not self._role_is_generated: + self.generate_role() + self._all_comm.barrier() + + def _barrier_server(self): + """ + barrier all servers in current distributed job + """ + if not self._role_is_generated: + self.generate_role() + if self.is_server(): + self._node_type_comm.barrier() + + def _worker_num(self): + """ + return the current number of worker + """ + if not self._role_is_generated: + self.generate_role() + return self._trainers_num + + def _server_num(self): + """ + return the current number of server + """ + if not self._role_is_generated: + self.generate_role() + return len(self._server_endpoints) + + def _finalize(self): + """Default do nothing.""" + pass + + def _all_reduce(self, input, output, mode="sum"): + """ + all reduce between all workers + + Args: + input(list|numpy.array): array of one dim + output(list|numpy.array): array of one dim + mode(str): "sum" or "min" or "max" + """ + if not self._role_is_generated: + self.generate_role() + input_list = [i for i in input] + ans = self._node_type_comm.all_reduce(input_list, mode) + for i in range(len(ans)): + output[i] = ans[i] + + 
def _all_gather(self, obj): + """ + gather between all workers and pservers + """ + if not self._role_is_generated: + self.generate_role() + self._barrier_all() + return self._all_comm.all_gather(obj) + + def _worker_gather(self, obj): + """ + gather between all workers + """ + if not self._role_is_generated: + self.generate_role() + if not self.is_worker(): + return None + self._barrier_worker() + return self._node_type_comm.all_gather(obj) + + def _get_rank(self): + """ + get current rank in all workers and pservers + """ + if not self._role_is_generated: + self.generate_role() + return self._rank + + def _get_size(self): + """ + get total num of all workers and pservers + """ + if not self._role_is_generated: + self.generate_role() + return self._size + + def __get_default_iface(self): + """ + get default physical interface + """ + default1 = self.__get_default_iface_from_gateway() + default2 = self.__get_default_iface_from_interfaces() + return default2 if default1 == "lo" else default1 + + def __get_default_iface_from_gateway(self): + """ + get default physical interface + """ + import netifaces + gateways = netifaces.gateways() + if gateways.get(netifaces.AF_INET) != None: + gateway = gateways[netifaces.AF_INET] + if len(gateway) > 0 and len(gateway[0]) > 1: + return gateway[0][1] + return "lo" + + def __get_default_iface_from_interfaces(self): + """ + get default physical interface + """ + import netifaces + for intf_name in netifaces.interfaces(): + addresses = netifaces.ifaddresses(intf_name) + if netifaces.AF_INET in addresses: + ipv4_addresses = addresses[netifaces.AF_INET] + for ipv4_address in ipv4_addresses: + if 'broadcast' in ipv4_address: + return intf_name + return "lo" + + class UserDefinedRoleMaker(RoleMakerBase): + """ + UserDefinedRoleMaker is designed for worker and server assignment + under manual. Typically, a worker and a server node will be appointed + on each physical node, It can be assign by user. + """ + def __init__(self, current_id=0, role=Role.WORKER, worker_num=0, server_endpoints=None): - """ - UserDefinedRoleMaker is designed for worker and server assignment - under manual. Typically, a worker and a server node will be appointed - on each physical node, It can be assign by user. - """ super(UserDefinedRoleMaker, self).__init__() if not isinstance(server_endpoints, list): @@ -495,11 +950,12 @@ class UserDefinedRoleMaker(RoleMakerBase): class UserDefinedCollectiveRoleMaker(RoleMakerBase): + """ + UserDefinedCollectiveRoleMaker is designed for worker assignment + under manual for collective mode. + """ + def __init__(self, current_id=0, worker_endpoints=None): - """ - UserDefinedCollectiveRoleMaker is designed for worker assignment - under manual for collective mode. 
- """ super(UserDefinedCollectiveRoleMaker, self).__init__() if not isinstance(worker_endpoints, list): diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 8d6be81fb11..362bb2c586f 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -40,7 +40,9 @@ class PSLib(Fleet): self._client2client_max_retry = 3 def init(self, role_maker=None): - super(PSLib, self).init(MPISymetricRoleMaker()) + if role_maker is None: + role_maker = MPISymetricRoleMaker() + super(PSLib, self).init(role_maker) self._fleet_ptr = fluid.core.Fleet() def _set_client_communication_config(self, request_timeout_ms, @@ -75,9 +77,10 @@ class PSLib(Fleet): # barrier_all for init_server, wait for server starts self._role_maker._barrier_all() self.all_ips_ = self._role_maker._all_gather(self._local_ip) + # worker_index * 2 is for compatible with older versions of pslib self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_, self._role_maker._get_size(), - self._role_maker._get_rank()) + self._role_maker.worker_index() * 2) # barrier_all for init_worker self._role_maker._barrier_all() # prepare for client to client communication @@ -160,9 +163,16 @@ class PSLib(Fleet): else: raise Exception( "You should run DistributedOptimizer.minimize() first") + # server_index * 2 is for compatible with older versions of pslib self._fleet_ptr.init_server(self._dist_desc_str, - self._role_maker._get_rank()) - self._local_ip = self._fleet_ptr.run_server() + self._role_maker.server_index() * 2) + if isinstance(self._role_maker, MPISymetricRoleMaker): + self._local_ip = self._fleet_ptr.run_server() + else: + local_endpoint = self._role_maker.get_local_endpoint() + local_endpoint = local_endpoint.split(":") + self._local_ip = self._fleet_ptr.run_server( + str(local_endpoint[0]), int(local_endpoint[1])) # barrier_all for init_server self._role_maker._barrier_all() @@ -632,8 +642,8 @@ class DownpourOptimizer(DistributedOptimizer): parameter_list, no_grad_set, self._strategy) - opt_info["mpi_rank"] = fleet._role_maker._get_rank() - opt_info["mpi_size"] = fleet._role_maker._get_size() + opt_info["mpi_rank"] = fleet.worker_index() + opt_info["mpi_size"] = fleet.worker_num() fleet._set_opt_info(opt_info) programs = [loss.block.program for loss in losses] diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py index 247828de5af..50fde2c47bf 100644 --- a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py +++ b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py @@ -206,7 +206,7 @@ class FleetUtil(object): pos = pos.reshape(-1) global_pos = np.copy(pos) * 0 # mpi allreduce - fleet._role_maker._node_type_comm.Allreduce(pos, global_pos) + fleet._role_maker._all_reduce(pos, global_pos) # reshape to its original shape global_pos = global_pos.reshape(old_pos_shape) @@ -215,7 +215,7 @@ class FleetUtil(object): old_neg_shape = np.array(neg.shape) neg = neg.reshape(-1) global_neg = np.copy(neg) * 0 - fleet._role_maker._node_type_comm.Allreduce(neg, global_neg) + fleet._role_maker._all_reduce(neg, global_neg) global_neg = global_neg.reshape(old_neg_shape) # calculate auc @@ -1350,7 +1350,7 @@ class FleetUtil(object): pos = pos.reshape(-1) global_pos = np.copy(pos) * 0 # mpi allreduce - fleet._role_maker._node_type_comm.Allreduce(pos, global_pos) + 
fleet._role_maker._all_reduce(pos, global_pos) # reshape to its original shape global_pos = global_pos.reshape(old_pos_shape) # auc neg bucket @@ -1358,7 +1358,7 @@ class FleetUtil(object): old_neg_shape = np.array(neg.shape) neg = neg.reshape(-1) global_neg = np.copy(neg) * 0 - fleet._role_maker._node_type_comm.Allreduce(neg, global_neg) + fleet._role_maker._all_reduce(neg, global_neg) global_neg = global_neg.reshape(old_neg_shape) num_bucket = len(global_pos[0]) @@ -1368,7 +1368,7 @@ class FleetUtil(object): old_metric_shape = np.array(metric.shape) metric = metric.reshape(-1) global_metric = np.copy(metric) * 0 - fleet._role_maker._node_type_comm.Allreduce(metric, global_metric) + fleet._role_maker._all_reduce(metric, global_metric) global_metric = global_metric.reshape(old_metric_shape) return global_metric[0] diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index d2b7e508a58..ff115963216 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -733,7 +733,7 @@ class TestDataset2(unittest.TestCase): place = fluid.CPUPlace() exe = fluid.Executor(place) try: - fleet.init(exe) + fleet.init() except ImportError as e: print("warning: no mpi4py") adam = fluid.optimizer.Adam(learning_rate=0.000005) @@ -795,7 +795,7 @@ class TestDataset2(unittest.TestCase): place = fluid.CPUPlace() exe = fluid.Executor(place) try: - fleet.init(exe) + fleet.init() except ImportError as e: print("warning: no mpi4py") adam = fluid.optimizer.Adam(learning_rate=0.000005) @@ -824,6 +824,10 @@ class TestDataset2(unittest.TestCase): dataset.set_pipe_command("cat") dataset.set_use_var(slots_vars) dataset.load_into_memory() + try: + dataset.global_shuffle(fleet) + except: + print("warning: catch expected error") fleet._opt_info = None fleet._fleet_ptr = None diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py index 298f7687093..7322891338f 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py @@ -11,36 +11,41 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Test cloud role maker.""" from __future__ import print_function import os import unittest - import paddle.fluid.incubate.fleet.base.role_maker as role_maker class TestCloudRoleMaker(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMaker. 
+ """ + def setUp(self): + """Set up, set envs.""" os.environ["PADDLE_TRAINERS_NUM"] = "2" os.environ[ "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001" def test_tr_rolemaker(self): + """Test tr rolenamer.""" os.environ["TRAINING_ROLE"] = "TRAINER" os.environ["PADDLE_TRAINER_ID"] = "0" ro = role_maker.PaddleCloudRoleMaker(is_collective=False) ro.generate_role() - self.assertTrue(ro.is_worker()) self.assertFalse(ro.is_server()) self.assertEqual(ro.worker_num(), 2) def test_ps_rolemaker(self): + """Test ps rolemaker.""" os.environ["TRAINING_ROLE"] = "PSERVER" os.environ["POD_IP"] = "127.0.0.1" os.environ["PADDLE_PORT"] = "36001" - ro = role_maker.PaddleCloudRoleMaker(is_collective=False) ro.generate_role() self.assertFalse(ro.is_worker()) @@ -48,10 +53,75 @@ class TestCloudRoleMaker(unittest.TestCase): self.assertEqual(ro.worker_num(), 2) def test_traing_role(self): + """Test training role.""" os.environ["TRAINING_ROLE"] = "TEST" ro = role_maker.PaddleCloudRoleMaker(is_collective=False) self.assertRaises(ValueError, ro.generate_role) + def test_pslib_1(self): + """Test cases for pslib.""" + import paddle.fluid as fluid + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib + from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + try: + import netifaces + except: + print("warning: no netifaces, skip test_pslib_1") + return + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002" + os.environ["PADDLE_TRAINER_ID"] = "0" + role_maker = GeneralRoleMaker() + role_maker.generate_role() + place = fluid.CPUPlace() + exe = fluid.Executor(place) + fleet.init(role_maker) + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + with fluid.program_guard(train_program, startup_program): + show = fluid.layers.data(name="show", shape=[-1, 1], \ + dtype="float32", lod_level=1, append_batch_size=False) + fc = fluid.layers.fc(input=show, size=1, act=None) + label = fluid.layers.data(name="click", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + label_cast = fluid.layers.cast(label, dtype='float32') + cost = fluid.layers.log_loss(fc, label_cast) + try: + adam = fluid.optimizer.Adam(learning_rate=0.000005) + adam = fleet.distributed_optimizer(adam) + adam.minimize([cost], [scope]) + fleet.run_server() + except: + print("do not support pslib test, skip") + return + from paddle.fluid.incubate.fleet.base.role_maker import \ + MPISymetricRoleMaker + try: + role = MPISymetricRoleMaker() + role._all_reduce([1], [2]) + except: + print("catch expected error of not inited") + try: + role = MPISymetricRoleMaker() + role._all_reduce([1], [2], "min") + except: + print("catch expected error of not inited") + try: + role = MPISymetricRoleMaker() + role._all_reduce([1], [2], "max") + except: + print("catch expected error of not inited") + try: + role = MPISymetricRoleMaker() + role._all_reduce([1], [2], "unknown") + except: + print("catch expected error of unknown type") + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py new file mode 100644 index 00000000000..5fee4458a71 --- /dev/null +++ 
b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py @@ -0,0 +1,285 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test cases for role makers.""" + +from __future__ import print_function +import os +import unittest + +import paddle.fluid.incubate.fleet.base.role_maker as role_maker + + +class TestCloudRoleMaker2(unittest.TestCase): + """ + Test cases for paddle cloud role makers. + """ + + def setUp(self): + """Set up, set envs.""" + pass + + def test_pslib_2(self): + """Test cases for pslib.""" + import paddle.fluid as fluid + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib + from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + from paddle.fluid.incubate.fleet.base.role_maker import RoleMakerBase + try: + import netifaces + except: + print("warning: no netifaces, skip test_pslib_2") + return + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "1" + place = fluid.CPUPlace() + exe = fluid.Executor(place) + try: + fleet.init(None) + except: + print("no mpi4py, skip test_pslib_2") + return + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + with fluid.program_guard(train_program, startup_program): + show = fluid.layers.data(name="show", shape=[-1, 1], \ + dtype="float32", lod_level=1, append_batch_size=False) + fc = fluid.layers.fc(input=show, size=1, act=None) + label = fluid.layers.data(name="click", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + label_cast = fluid.layers.cast(label, dtype='float32') + cost = fluid.layers.log_loss(fc, label_cast) + try: + adam = fluid.optimizer.Adam(learning_rate=0.000005) + adam = fleet.distributed_optimizer(adam) + adam.minimize([cost], [scope]) + fleet.run_server() + except: + print("do not support pslib test, skip") + return + os.environ["TRAINING_ROLE"] = "wrong" + try: + role1 = GeneralRoleMaker(path="./test_gloo_1") + role1.generate_role() + except: + print("catch expected error of wrong TRAINING_ROLE") + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + role2 = GeneralRoleMaker(path="./test_gloo_2") + role2._finalize() + role2._all_gather(1) + role2._all_gather(1) + role2._barrier_server() + role2.all_gather(1) + role3 = GeneralRoleMaker(path="./test_gloo_3") + role3._worker_gather(1) + role3._worker_gather(1) + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002" + role4 = GeneralRoleMaker(path="./test_gloo_4") + role4._worker_gather(1) + role4._get_rank() + role4._get_size() + role4._all_comm.init(0, 0, "", "", "", 
"", "") + role5 = GeneralRoleMaker(path="./test_gloo_5") + role5.get_local_endpoint() + role5.get_local_endpoint() + role6 = GeneralRoleMaker(path="./test_gloo_6") + role6.get_trainer_endpoints() + role6.get_trainer_endpoints() + role7 = GeneralRoleMaker(path="./test_gloo_7") + role7.get_pserver_endpoints() + role7.get_pserver_endpoints() + role8 = GeneralRoleMaker(path="./test_gloo_8") + role8.is_worker() + role8.is_worker() + role9 = GeneralRoleMaker(path="./test_gloo_9") + role9.is_server() + role9.is_server() + role10 = GeneralRoleMaker(path="./test_gloo_10") + role10.is_first_worker() + role10.is_first_worker() + role11 = GeneralRoleMaker(path="./test_gloo_11") + role11.worker_index() + role11.worker_index() + role12 = GeneralRoleMaker(path="./test_gloo_12") + role12.server_index() + role12.server_index() + role13 = GeneralRoleMaker(path="./test_gloo_13") + role13.worker_num() + role13.worker_num() + role14 = GeneralRoleMaker(path="./test_gloo_14") + role14.server_num() + role14.server_num() + role15 = GeneralRoleMaker(path="./test_gloo_15") + role15._barrier_worker() + role15._barrier_worker() + role16 = GeneralRoleMaker(path="./test_gloo_16") + role16._barrier_all() + role16._barrier_all() + role17 = GeneralRoleMaker(path="./test_gloo_17") + role17._barrier_server() + role17._barrier_server() + role18 = GeneralRoleMaker(path="./test_gloo_18") + role18._worker_num() + role18._worker_num() + role19 = GeneralRoleMaker(path="./test_gloo_19") + role19._server_num() + role19._server_num() + role20 = GeneralRoleMaker(path="./test_gloo_20") + a = [1] + b = [0] + role20._all_reduce(a, b) + role21 = GeneralRoleMaker(path="./test_gloo_21") + role21.all_reduce_worker([], []) + role21.all_reduce_worker([], []) + role21.barrier_worker() + role21.barrier_all() + role22 = GeneralRoleMaker(path="./test_gloo_22") + role22._get_rank() + role22._get_rank() + os.environ["PADDLE_PSERVER_ID"] = "0" + role23 = GeneralRoleMaker(path="./test_gloo_23") + role23._get_size() + role23._get_size() + with open("test_fleet_gloo_role_maker_1.txt", "w") as f: + data = "1 1 1 1\n" + f.write(data) + + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_filelist(["test_fleet_gloo_role_maker_1.txt"]) + dataset.set_use_var([show, label]) + dataset.load_into_memory() + dataset.get_memory_data_size(fleet) + dataset.get_shuffle_data_size(fleet) + os.remove("./test_fleet_gloo_role_maker_1.txt") + + class TmpClass(): + """ + dummy tmp class + """ + + def __init__(self): + pass + + def all_reduce_worker(self, input, output): + """ + dummy all reduce worker + + Args: + input(None): fake input + output(None): fale output + """ + pass + + def barrier_worker(self): + """ + dummy barrier worker + """ + pass + + from paddle.fluid.incubate.fleet.base.fleet_base import Fleet + + class TmpFleet(Fleet): + """ + dummy tmp fleet + """ + + def __init__(self): + super(Fleet, self).__init__() + self._role_maker = None + + def init_worker(self): + """ + dummy init worker + """ + pass + + def init_server(self, model_dir=None): + """ + dummy init server + + Args: + model_dir(None): fake model_dir + """ + pass + + def run_server(self): + """ + dummy run server + """ + pass + + def stop_worker(self): + """ + dummy stop worker + """ + pass + + def distributed_optimizer(self, optimizer, strategy=None): + """ + dummy distributed optimizer + + Args: + optimizer(None): fake optimizer + strategy(None): fake strategy + """ + pass + + def save_inference_model(self): + """ + dummy save inference model + """ + pass + + def 
save_persistables(self): + """ + dummy save persistables + """ + pass + + os.environ["TRAINING_ROLE"] = "TRAINER" + tmp = TmpFleet() + tmp._role_maker = TmpClass() + tmp.all_reduce_worker([], []) + tmp.barrier_worker() + from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + tmp = RoleMakerBase() + tmp.all_gather(1) + tmp.all_reduce_worker([], []) + tmp.barrier_worker() + tmp.barrier_all() + from paddle.fluid.incubate.fleet.base.role_maker import \ + MPISymetricRoleMaker + tmp1 = MPISymetricRoleMaker() + tmp1.all_gather(1) + tmp1.all_gather(1) + tmp2 = MPISymetricRoleMaker() + tmp2.all_reduce_worker([], []) + tmp3 = MPISymetricRoleMaker() + tmp3.barrier_worker() + tmp3.barrier_worker() + tmp4 = MPISymetricRoleMaker() + tmp4.barrier_all() + tmp4.barrier_all() + + +if __name__ == "__main__": + unittest.main() -- GitLab
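Usage note (not part of the patch): the sketch below shows how the new GeneralRoleMaker and the fleet.all_reduce_worker / fleet.barrier_worker helpers introduced above are meant to be wired together. The endpoint addresses, rendezvous path and single-trainer topology are placeholder assumptions for illustration only.

import os
import numpy as np
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

# GeneralRoleMaker reads the job topology from environment variables
# (placeholder values for a one-trainer, one-pserver job).
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002"

# hdfs_name/hdfs_ugi may stay empty; "path" is where the gloo rendezvous
# store keeps its files (placeholder path).
role = role_maker.GeneralRoleMaker(path="./gloo_rendezvous")
fleet.init(role)

# New collective helpers exposed on fleet (workers only):
local_size = np.array([10], dtype="int64")
global_size = np.array([0], dtype="int64")
fleet.all_reduce_worker(local_size, global_size)  # element-wise sum across trainers
fleet.barrier_worker()                            # barrier among trainers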
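A second sketch mirrors the dataset.py and unit-test changes above: when a fleet instance is passed in, global_shuffle() now brackets the shuffle with fleet._role_maker.barrier_worker(), and get_memory_data_size() / get_shuffle_data_size() aggregate per-trainer sizes through all_reduce_worker(). The file name is a placeholder and the show/label variables are assumed to be defined elsewhere, as in the tests.

import paddle.fluid as fluid

dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_batch_size(1)
dataset.set_filelist(["demo_data.txt"])     # placeholder file
dataset.set_use_var([show, label])          # variables assumed defined elsewhere
dataset.load_into_memory()

dataset.global_shuffle(fleet)               # barrier_worker() before and after the shuffle
print(dataset.get_memory_data_size(fleet))  # summed over all trainers
print(dataset.get_shuffle_data_size(fleet))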
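For completeness, a sketch of the low-level fluid.core.Gloo binding that GeneralRoleMaker builds on; note that all_reduce now returns the reduced list instead of filling an output argument. The rank/size/path/interface values are placeholders, and whether empty fs_name/fs_ugi settings are acceptable outside the unit tests is an assumption here.

import paddle.fluid as fluid

gloo = fluid.core.Gloo()
# init(rank, size, store_path, fs_name, fs_ugi, iface, prefix) -- placeholder values
gloo.init(0, 1, "./gloo_all", "", "", "lo", "")
print(gloo.rank(), gloo.size())
gloo.barrier()
print(gloo.all_reduce([1, 2, 3], "sum"))  # returns the reduced list
print(gloo.all_gather(7))                 # returns a list gathered from all ranks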