diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc
index 4d632d737611e9c04d8f60d34bbb9518266e2810..135fc407943c6c03c2b0d32ad5a9650daa5ea3e4 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.cc
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc
@@ -1166,6 +1166,28 @@ int32_t FleetWrapper::CopyTable(const uint64_t src_table_id,
 #endif
 }
 
+void FleetWrapper::Confirm() {
+#ifdef PADDLE_WITH_PSLIB
+  // FIXME(xujiaqi01): will later support confirm
+  // auto ret = pslib_ptr_->_worker_ptr->confirm();
+  // ret.wait();
+  VLOG(0) << "disable FleetWrapper::Confirm temporarily";
+#else
+  VLOG(0) << "FleetWrapper::Confirm does nothing when no pslib";
+#endif
+}
+
+void FleetWrapper::Revert() {
+#ifdef PADDLE_WITH_PSLIB
+  // FIXME(xujiaqi01): will later support revert
+  // auto ret = pslib_ptr_->_worker_ptr->revert();
+  // ret.wait();
+  VLOG(0) << "disable FleetWrapper::Revert temporarily";
+#else
+  VLOG(0) << "FleetWrapper::Revert does nothing when no pslib";
+#endif
+}
+
 int32_t FleetWrapper::CopyTableByFeasign(
     const uint64_t src_table_id, const uint64_t dest_table_id,
     const std::vector<uint64_t>& feasign_list) {
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h
index 933b0a8bd852cfa2b1fb1b485cfd3ef2d012dd11..4c0564f87d48b4ba06bdb75e94fbb80b3d4f448a 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.h
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.h
@@ -268,6 +268,10 @@ class FleetWrapper {
   // send client to client message
   std::future<int32_t> SendClientToClientMsg(int msg_type, int to_client_id,
                                              const std::string& msg);
+  // confirm all the updated params in the current pass
+  void Confirm();
+  // revert all the updated params in the current pass
+  void Revert();
   // FleetWrapper singleton
   static std::shared_ptr<FleetWrapper> GetInstance() {
     if (NULL == s_instance_) {
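Note on the new Confirm/Revert hooks: they are pass-level commit/rollback primitives, surfaced to Python as fleet.confirm()/fleet.revert() in pslib/__init__.py further down. With this patch they only log (the pslib worker calls are still commented out behind the FIXME). A minimal usage sketch -- the pass loop and train_one_pass are illustrative placeholders, not part of this patch:

    # illustrative pass loop for the new hooks; train_one_pass is hypothetical
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
    for pass_id in range(num_passes):
        try:
            train_one_pass(pass_id)  # user training code (placeholder)
            fleet.confirm()          # commit this pass's parameter updates
        except Exception:
            fleet.revert()           # roll back to the last confirmed pass
            raise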
*/ #include "paddle/fluid/framework/fleet/gloo_wrapper.h" +#include // NOLINT #include #include "paddle/fluid/framework/io/fs.h" #include "paddle/fluid/platform/errors.h" +#include "paddle/fluid/string/string_helper.h" namespace gloo { namespace rendezvous { HdfsStore::HdfsStore(const std::string& path) { path_ = path; - wait_sleep_ms_ = 3000; + wait_sleep_ms_ = 10000; wait_timeout_ = std::chrono::seconds(999999999); retry_times_ = 100; } @@ -35,49 +37,86 @@ void HdfsStore::set(const std::string& key, const std::vector& data) { } int err_no = 0; for (int i = 1; i <= retry_times_; ++i) { + err_no = 0; std::shared_ptr fp = paddle::framework::fs_open_write(tmp, &err_no, ""); - if (err_no != 0) { - VLOG(0) << "fs_open_write failed, retry times " << i << " err no " - << err_no; - fp.reset(); - sleep(wait_sleep_ms_ / 1000); - continue; - } size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get()); if (write_count != data.size()) { VLOG(0) << "fwrite_unlocked failed, retry times " << i << " write_count " << write_count << " data.size() " << data.size(); - fp.reset(); - sleep(2); - continue; + err_no = -1; } fp.reset(); - break; + if (err_no != 0) { + VLOG(0) << "fs_open_write failed, retry times " << i << " err no " + << err_no; + sleep(wait_sleep_ms_ / 1000); + paddle::framework::fs_remove(tmp); + if (i == retry_times_) { + VLOG(0) << "fs_open_write failed, retry times reaches limit"; + PADDLE_THROW(platform::errors::PreconditionNotMet( + "fs_open_write failed, retry times reaches" + " limit ", + retry_times_)); + } + } else { + break; + } } paddle::framework::fs_mv(tmp, path); #endif } +#ifdef PADDLE_WITH_GLOO +int retry_do_func(std::function func, uint32_t max_try_time, + uint32_t retry_interval_ms) { + for (uint32_t i = 0; i < max_try_time; ++i) { + if (func() == 0) { + return 0; + } +#ifdef _LINUX + usleep(retry_interval_ms * 1000); +#endif + } + return -1; +} +#endif + std::vector HdfsStore::get(const std::string& key) { auto path = ObjectPath(key); std::vector result; #ifdef PADDLE_WITH_GLOO // block until key is set wait({key}); - bool is_exists = paddle::framework::fs_exists(path); + int ret = retry_do_func( + [&path]() { return paddle::framework::fs_exists(path) ? 
+  bool is_exists = (ret == 0);
   PADDLE_ENFORCE_EQ(is_exists, true,
                     paddle::platform::errors::NotFound(
                         "HdfsStore::get, path not exists: " + path));
-  int err_no = 0;
-  std::shared_ptr<FILE> fp = paddle::framework::fs_open_read(path, &err_no, "");
-  char buffer = '\0';
-  size_t read_count = 0;
-  while (fread(&buffer, 1, 1, fp.get()) == 1) {
-    ++read_count;
-    result.push_back(buffer);
-  }
-  VLOG(3) << "HdfsStore::get read_count " << read_count;
+
+  int read_status = retry_do_func(
+      [&path, &result]() {
+        result.clear();
+        int err_no = 0;
+        {
+          std::shared_ptr<FILE> fp =
+              paddle::framework::fs_open_read(path, &err_no, "");
+          char buffer = '\0';
+          size_t read_count = 0;
+          while (fread(&buffer, 1, 1, fp.get()) == 1) {
+            ++read_count;
+            result.push_back(buffer);
+          }
+          VLOG(3) << "HdfsStore::get read_count " << read_count;
+        }
+        return err_no;
+      },
+      5, wait_sleep_ms_);
+  PADDLE_ENFORCE_EQ(read_status, 0,
+                    paddle::platform::errors::Fatal(
+                        "HdfsStore::get, path read failed: " + path));
 #endif
   return result;
 }
 
@@ -92,22 +131,33 @@ void HdfsStore::wait(const std::vector<std::string>& keys,
                      const std::chrono::milliseconds&) {  // NOLINT
 #ifdef PADDLE_WITH_GLOO
   auto start = std::chrono::steady_clock::now();
-  while (!Check(keys)) {
+  std::vector<bool> check_key_status(keys.size(), false);
+  while (!Check(keys, &check_key_status)) {
     auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
         std::chrono::steady_clock::now() - start);
     if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) {
-      PADDLE_ENFORCE_EQ(0, 1, paddle::platform::errors::ExecutionTimeout(
-                                  "HdfsStore::wait, Wait timeout for key(s): " +
-                                  ::gloo::MakeString(keys)));
+      int32_t last_check_rank = -1;
+      for (size_t i = 0; i < check_key_status.size(); ++i) {
+        if (!check_key_status[i]) {
+          last_check_rank = i;
+          break;
+        }
+      }
+      PADDLE_THROW(platform::errors::ExecutionTimeout(
+          "TIMEOUT self_rank = %d pair_rank = %d", self_rank_,
+          last_check_rank));
     }
     std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_));
   }
 #endif
 }
 
+void HdfsStore::SetTimeoutSeconds(int timeout_seconds) {
+  wait_timeout_ = std::chrono::seconds(timeout_seconds);
+}
+
 std::string HdfsStore::EncodeName(const std::string& name) {
-  thread_local std::hash<std::string> hash_func;
-  return std::to_string(hash_func(name));
+  return ::paddle::string::erase_spaces(name);
 }
 
 std::string HdfsStore::TmpPath(const std::string& name) {
@@ -118,50 +168,124 @@ std::string HdfsStore::ObjectPath(const std::string& name) {
   return path_ + "/" + EncodeName(name);
 }
 
-bool HdfsStore::Check(const std::vector<std::string>& keys) {
+bool HdfsStore::Check(const std::vector<std::string>& keys,
+                      std::vector<bool>* keys_check_status) {
 #ifdef PADDLE_WITH_GLOO
+  bool ret = true;
   std::vector<std::string> paths;
   for (const auto& key : keys) {
     paths.push_back(ObjectPath(key));
   }
-  for (const auto& path : paths) {
+  for (size_t i = 0; i < paths.size(); ++i) {
+    if ((*keys_check_status)[i]) {
+      continue;
+    }
+    const auto& path = paths[i];
     bool is_exists = paddle::framework::fs_exists(path);
     VLOG(3) << "HdfsStore::Check " << is_exists << " path " << path;
     if (!is_exists) {
-      return false;
+      ret = false;
     }
+    (*keys_check_status)[i] = is_exists;
   }
+  return ret;
+#else
+  VLOG(0) << "HdfsStore::Check does nothing when no gloo";
 #endif
   return true;
 }
 
+#ifdef PADDLE_WITH_GLOO
+void ParallelConnectContext::connectFullMesh(
+    Store& store, std::shared_ptr<transport::Device>& dev) {
+  std::vector<char> allBytes;
+  // Create pairs
+  auto transportContext = dev->createContext(rank, size);
+  transportContext->setTimeout(getTimeout());
+  for (int i = 0; i < size; i++) {
+    if (i == rank) {
+      continue;
+    }
+    auto& pair = transportContext->createPair(i);
+    auto addrBytes = pair->address().bytes();
+    allBytes.insert(allBytes.end(), addrBytes.begin(), addrBytes.end());
+  }
+  std::ostringstream storeKey;
+  storeKey << rank;
+  store.set(storeKey.str(), allBytes);
+
+  std::vector<std::shared_ptr<std::thread>> connect_threads(thread_num_);
+  // Connect every pair
+  for (uint32_t i = 0; i < connect_threads.size(); ++i) {
+    connect_threads[i].reset(new std::thread(
+        [&store, &transportContext, this](size_t thread_idx,
+                                          size_t thread_num) -> void {
+          for (int i = thread_idx; i < size; i += thread_num) {
+            if (i == rank) {
+              continue;
+            }
+            // Wait for address of other side of this pair to become available
+            std::string key = std::to_string(i);
+            store.wait({key}, getTimeout());
+            // Connect to other side of this pair
+            auto allAddrs = store.get(key);
+            auto addr = extractAddress(allAddrs, i);
+            transportContext->getPair(i)->connect(addr);
+          }
+        },
+        i, connect_threads.size()));
+  }
+  for (uint32_t i = 0; i < connect_threads.size(); ++i) {
+    connect_threads[i]->join();
+  }
+  device_ = dev;
+  transportContext_ = std::move(transportContext);
+}
+#endif
 }  // namespace rendezvous
 }  // namespace gloo
 
 namespace paddle {
 namespace framework {
 
-void GlooWrapper::Init(int rank, int size, const std::string& path,
-                       const std::string& fs_name, const std::string& fs_ugi,
-                       const std::string& iface, const std::string& prefix) {
+void GlooWrapper::Init() {
   if (is_initialized_) {
     return;
   }
-  rank_ = rank;
-  size_ = size;
-  std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs");
-  cmd += " -D fs.default.name=" + fs_name;
-  cmd += " -D hadoop.job.ugi=" + fs_ugi;
-  paddle::framework::hdfs_set_command(cmd);
 #ifdef PADDLE_WITH_GLOO
   gloo::transport::tcp::attr attr;
-  attr.iface = iface;
-  auto file_store = gloo::rendezvous::HdfsStore(path);
-  auto prefix_store = gloo::rendezvous::PrefixStore(prefix, file_store);
+  attr.iface = iface_;
+  std::shared_ptr<gloo::rendezvous::HdfsStore> file_store = nullptr;
+  std::shared_ptr<gloo::rendezvous::HTTPStore> http_store = nullptr;
+  auto context =
+      std::make_shared<gloo::rendezvous::ParallelConnectContext>(rank_, size_);
+  context->setTimeout(run_timeout_);
   auto dev = gloo::transport::tcp::CreateDevice(attr);
-  auto context = std::make_shared<gloo::rendezvous::Context>(rank, size);
-  context->setTimeout(file_store.wait_timeout_);
-  context->connectFullMesh(prefix_store, dev);
+  switch (store_type_) {
+    case GlooStoreType::HDFS: {
+      std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs");
+      cmd += " -D fs.default.name=" + hdfs_name_;
+      cmd += " -D hadoop.job.ugi=" + hdfs_ugi_;
+      paddle::framework::hdfs_set_command(cmd);
+      file_store = std::make_shared<gloo::rendezvous::HdfsStore>(hdfs_path_);
+      file_store->SetTimeoutSeconds(init_timeout_.count());
+      auto prefix_store =
+          std::make_shared<gloo::rendezvous::PrefixStore>(prefix_, *file_store);
+      context->connectFullMesh(*prefix_store, dev);
+      break;
+    }
+    case GlooStoreType::HTTP: {
+      http_store = std::make_shared<gloo::rendezvous::HTTPStore>(
+          http_ip_, http_port_, prefix_ + "_" + http_scope_, rank_);
+      http_store->SetTimeoutSeconds(init_timeout_.count());
+      context->connectFullMesh(*http_store, dev);
+      http_store->Finalize();
+      break;
+    }
+    default:
+      LOG(ERROR) << "unknown store type " << store_type_;
+      exit(-1);
+  }
   context_ = std::move(context);
 #endif
   is_initialized_ = true;
@@ -170,6 +294,9 @@ void GlooWrapper::Init(int rank, int size, const std::string& path,
 template std::vector<int64_t> GlooWrapper::AllReduce<int64_t>(
     std::vector<int64_t>& sendbuf,  // NOLINT
     const std::string& mode);
+template std::vector<float> GlooWrapper::AllReduce<float>(
+    std::vector<float>& sendbuf,  // NOLINT
+    const std::string& mode);
 template std::vector<double> GlooWrapper::AllReduce<double>(
     std::vector<double>& sendbuf,  // NOLINT
    const std::string& mode);
@@ -180,6 +307,8 @@ template std::vector<int64_t> GlooWrapper::AllGather<int64_t>(
     int64_t& input);  // NOLINT
 template std::vector<uint64_t> GlooWrapper::AllGather<uint64_t>(
     uint64_t& input);  // NOLINT
+template std::vector<float> GlooWrapper::AllGather<float>(
+    float& input);  // NOLINT
 template std::vector<double> GlooWrapper::AllGather<double>(
     double& input);  // NOLINT
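Note on ParallelConnectContext (declared in gloo_wrapper.h below): instead of gloo's serial wait/get per peer, peers are sharded round-robin across thread_num_ (6) connect threads. A standalone sketch of the assignment, with toy values:

    # illustrative only: the peer->thread assignment used by connectFullMesh
    size, rank, thread_num = 8, 0, 6  # toy values
    for t in range(thread_num):
        for peer in range(t, size, thread_num):  # same stride as the C++ lambda
            if peer != rank:
                print("thread %d connects pair %d" % (t, peer))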
diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.h b/paddle/fluid/framework/fleet/gloo_wrapper.h
index 528c91be6d4b9af7ce8bdfefdc98d4bad9896db3..3f932ee226ca85b86c92fef1b30420d782d9bc62 100644
--- a/paddle/fluid/framework/fleet/gloo_wrapper.h
+++ b/paddle/fluid/framework/fleet/gloo_wrapper.h
@@ -31,6 +31,7 @@ limitations under the License. */
 #include <gloo/barrier.h>
 #include <gloo/rendezvous/context.h>
 #include <gloo/rendezvous/file_store.h>
+#include <gloo/rendezvous/http_store.h>
 #include <gloo/rendezvous/prefix_store.h>
 #include <gloo/rendezvous/store.h>
 #include <gloo/transport/tcp/device.h>
@@ -59,44 +60,87 @@ class HdfsStore {
 
   virtual void wait(const std::vector<std::string>& keys,
                     const std::chrono::milliseconds& timeout);
 
+  virtual void SetTimeoutSeconds(int timeout_seconds);
+
   std::string EncodeName(const std::string& name);
 
   std::string TmpPath(const std::string& name);
 
   std::string ObjectPath(const std::string& name);
 
-  bool Check(const std::vector<std::string>& keys);
+  bool Check(const std::vector<std::string>& keys,
+             std::vector<bool>* keys_check_status);
+
+  void SetRank(int rank) { self_rank_ = rank; }
 
   std::string path_;
   int wait_sleep_ms_;
   std::chrono::seconds wait_timeout_;
   int retry_times_;
+  int self_rank_;
 };
 
+#ifdef PADDLE_WITH_GLOO
+class ParallelConnectContext : public gloo::rendezvous::Context {
+ public:
+  ParallelConnectContext(int rank, int size, int base = 2)
+      : gloo::rendezvous::Context(rank, size, base) {}
+  virtual ~ParallelConnectContext() {}
+  // gloo::rendezvous::Context waits & gets keys one by one, which is
+  // slow for a big world size, especially with HdfsStore
+  void connectFullMesh(Store& store,                              // NOLINT
+                       std::shared_ptr<transport::Device>& dev);  // NOLINT
+
+ protected:
+  int thread_num_ = 6;
+};
+#endif
 }  // namespace rendezvous
 }  // namespace gloo
 
 namespace paddle {
 namespace framework {
 
+enum GlooStoreType { HDFS, HTTP };
+
 class GlooWrapper {
  public:
   GlooWrapper() {}
 
   virtual ~GlooWrapper() {}
 
-  void Init(int rank, int size, const std::string& path,
-            const std::string& fs_name, const std::string& fs_ugi,
-            const std::string& iface, const std::string& prefix);
+  void Init();
 
-  int Rank() {
-    CHECK_EQ(is_initialized_, true);
-    return rank_;
+  void SetTimeoutSeconds(int init_seconds, int run_seconds) {
+    init_timeout_ = std::chrono::seconds(init_seconds);
+    run_timeout_ = std::chrono::seconds(run_seconds);
   }
 
-  int Size() {
-    CHECK_EQ(is_initialized_, true);
-    return size_;
+  int Rank() { return rank_; }
+
+  int Size() { return size_; }
+
+  void SetRank(int rank) { rank_ = rank; }
+
+  void SetSize(int size) { size_ = size; }
+
+  void SetIface(const std::string& iface) { iface_ = iface; }
+
+  void SetPrefix(const std::string& prefix) { prefix_ = prefix; }
+
+  void SetHdfsStore(const std::string& path, const std::string& fs_name,
+                    const std::string& fs_ugi) {
+    store_type_ = GlooStoreType::HDFS;
+    hdfs_path_ = path;
+    hdfs_name_ = fs_name;
+    hdfs_ugi_ = fs_ugi;
+  }
+
+  void SetHttpStore(const std::string& ip, int port,
+                    const std::string& scope) {
+    store_type_ = GlooStoreType::HTTP;
+    http_ip_ = ip;
+    http_port_ = port;
+    http_scope_ = scope;
   }
 
   void Barrier() {
@@ -104,6 +148,8 @@ class GlooWrapper {
 #ifdef PADDLE_WITH_GLOO
     gloo::BarrierOptions opts(context_);
     gloo::barrier(opts);
+#else
+    LOG(WARNING) << "Barrier does nothing when WITH_GLOO=OFF";
 #endif
   }
 
@@ -134,6 +180,8 @@ class GlooWrapper {
           "AllReduce mode not known: " + mode));
     }
     gloo::allreduce(opts);
+#else
+    LOG(WARNING) << "AllReduce does nothing when WITH_GLOO=OFF";
 #endif
     return recvbuf;
   }
 
@@ -147,6 +195,8 @@ class GlooWrapper {
     opts.setInput(&input, 1);
     opts.setOutput(ret.data(), size_);
     gloo::allgather(opts);
+#else
+    LOG(WARNING) << "AllGather does nothing when WITH_GLOO=OFF";
 #endif
     return std::move(ret);
   }
 
@@ -158,6 +208,19 @@ class GlooWrapper {
 #endif
   int rank_ = 0;
   int size_ = 0;
+  std::chrono::seconds init_timeout_ = std::chrono::seconds(9999999);
+  std::chrono::seconds run_timeout_ = std::chrono::seconds(9999999);
+  std::string iface_ = "lo";
+  std::string prefix_;
+  GlooStoreType store_type_ = GlooStoreType::HDFS;
+  // configs for hdfs store
+  std::string hdfs_path_;
+  std::string hdfs_name_;
+  std::string hdfs_ugi_;
+  // configs for http store
+  std::string http_ip_;
+  int http_port_;
+  std::string http_scope_;
 };
 
 }  // namespace framework
diff --git a/paddle/fluid/framework/fleet/test_fleet.cc b/paddle/fluid/framework/fleet/test_fleet.cc
index bf9928789cafeb1455bb241b76482bf78a737871..dac95dd268e4547b2abc551d9c8181904d246147 100644
--- a/paddle/fluid/framework/fleet/test_fleet.cc
+++ b/paddle/fluid/framework/fleet/test_fleet.cc
@@ -17,6 +17,7 @@
 #include "paddle/fluid/framework/fleet/fleet_wrapper.h"
 #include "paddle/fluid/framework/fleet/gloo_wrapper.h"
 #include "paddle/fluid/framework/io/fs.h"
+#include "paddle/fluid/string/string_helper.h"
 
 #if defined _WIN32 || defined __APPLE__
 #else
@@ -37,14 +38,23 @@ TEST(TEST_GLOO, store_1) {
   }
   store.wait(std::vector<std::string>{"test"});
   store.wait(std::vector<std::string>{"test"}, std::chrono::milliseconds(0));
+  store.SetTimeoutSeconds(100000);
   store.EncodeName("1");
   store.TmpPath("1");
   store.ObjectPath("1");
-  store.Check(std::vector<std::string>{"test"});
+  std::vector<bool> status(1, false);
+  store.Check(std::vector<std::string>{"test"}, &status);
 
   auto gw = paddle::framework::GlooWrapper();
-  gw.Init(0, 1, "", "", "", "", "");
-  gw.Init(0, 1, "", "", "", "", "");
+  gw.SetTimeoutSeconds(1000, 1000);
+  gw.SetRank(0);
+  gw.SetSize(1);
+  gw.SetPrefix("");
+  gw.SetIface("lo");
+  gw.SetHdfsStore("", "", "");
+  gw.Init();
+  gw.SetHttpStore("", 8099, "");
+  gw.Init();
   gw.Rank();
   gw.Size();
   gw.Barrier();
@@ -63,5 +73,8 @@ TEST(TEST_FLEET, fleet_1) {
   fleet->RunServer("", 0);
   fleet->SaveModelOneTable(0, "", 0);
   fleet->SaveModelOneTablePrefix(0, "", 0, "");
+  fleet->Confirm();
+  fleet->Revert();
+  paddle::string::erase_spaces("1 2");
 #endif
 }
diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc
index 3ae4eef44910031dd9f93c3286d404c51c11c1fe..4b72b09adddf24f63814e8e4872af289b38bcb44 100644
--- a/paddle/fluid/pybind/fleet_wrapper_py.cc
+++ b/paddle/fluid/pybind/fleet_wrapper_py.cc
@@ -78,6 +78,8 @@ void BindFleetWrapper(py::module* m) {
      .def("set_client2client_config",
           &framework::FleetWrapper::SetClient2ClientConfig)
      .def("set_pull_local_thread_num",
           &framework::FleetWrapper::SetPullLocalThreadNum)
+      .def("confirm", &framework::FleetWrapper::Confirm)
+      .def("revert", &framework::FleetWrapper::Revert)
      .def("save_model_one_table", &framework::FleetWrapper::SaveModelOneTable)
      .def("save_model_one_table_with_prefix",
           &framework::FleetWrapper::SaveModelOneTablePrefix)
.def("barrier", &framework::GlooWrapper::Barrier) + .def("set_timeout_seconds", &framework::GlooWrapper::SetTimeoutSeconds) + .def("set_rank", &framework::GlooWrapper::SetRank) + .def("set_size", &framework::GlooWrapper::SetSize) + .def("set_iface", &framework::GlooWrapper::SetIface) + .def("set_prefix", &framework::GlooWrapper::SetPrefix) + .def("set_hdfs_store", &framework::GlooWrapper::SetHdfsStore) + .def("set_http_store", &framework::GlooWrapper::SetHttpStore) .def("all_reduce", &framework::GlooWrapper::AllReduce) .def("all_reduce", &framework::GlooWrapper::AllReduce) + .def("all_reduce", &framework::GlooWrapper::AllReduce) .def("all_reduce", &framework::GlooWrapper::AllReduce) .def("all_gather", &framework::GlooWrapper::AllGather) .def("all_gather", &framework::GlooWrapper::AllGather) + .def("all_gather", &framework::GlooWrapper::AllGather) .def("all_gather", &framework::GlooWrapper::AllGather); } // end BindGlooWrapper } // end namespace pybind diff --git a/paddle/fluid/string/string_helper.cc b/paddle/fluid/string/string_helper.cc index 27708b8eebd2131ebadcc310fd3521ad5ab824f3..712db90d2f4ba62abdb0747827d78266503efcc5 100644 --- a/paddle/fluid/string/string_helper.cc +++ b/paddle/fluid/string/string_helper.cc @@ -61,6 +61,19 @@ std::string trim_spaces(const std::string& str) { return std::string(p, len); } +std::string erase_spaces(const std::string& str) { + std::string result; + result.reserve(str.size()); + const char* p = str.c_str(); + while (*p != 0) { + if (!isspace(*p)) { + result.append(p, 1); + } + ++p; + } + return result; +} + inline int str_to_float(const char* str, float* v) { const char* head = str; char* cursor = NULL; diff --git a/paddle/fluid/string/string_helper.h b/paddle/fluid/string/string_helper.h index cc09088c7ee8a38e3b3a7a673c9da2e21cf42485..8bf379a6b3423fed07d3ab670e0492a9316e8e19 100644 --- a/paddle/fluid/string/string_helper.h +++ b/paddle/fluid/string/string_helper.h @@ -62,6 +62,9 @@ std::string format_string(const std::string& fmt, ARGS&&... 
diff --git a/paddle/fluid/string/string_helper.h b/paddle/fluid/string/string_helper.h
index cc09088c7ee8a38e3b3a7a673c9da2e21cf42485..8bf379a6b3423fed07d3ab670e0492a9316e8e19 100644
--- a/paddle/fluid/string/string_helper.h
+++ b/paddle/fluid/string/string_helper.h
@@ -62,6 +62,9 @@ std::string format_string(const std::string& fmt, ARGS&&... args) {
 // remove leading and tailing spaces
 std::string trim_spaces(const std::string& str);
 
+// erase all spaces in str
+std::string erase_spaces(const std::string& str);
+
 int str_to_float(const char* str, float* v);
 
 // split string by delim
diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index bada19abcc32d2bc91d99203a7b66389aca912d3..c8005dce44f779ba264f4ccaa73584c5008efe2f 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -14,6 +14,7 @@
 """Definition of Role Makers."""
 
 from __future__ import print_function
+from multiprocessing import Process, Manager
 import paddle.fluid as fluid
 import os
 import time
@@ -556,7 +557,21 @@ class GeneralRoleMaker(RoleMakerBase):
         self._role_is_generated = False
         self._hdfs_name = kwargs.get("hdfs_name", "")
         self._hdfs_ugi = kwargs.get("hdfs_ugi", "")
-        self._hdfs_path = kwargs.get("path", "")
+        self._hdfs_path = kwargs.get("path", "").rstrip("/")
+        self._init_timeout_seconds = kwargs.get("init_timeout_seconds", 3600)
+        self._run_timeout_seconds = kwargs.get("run_timeout_seconds", 9999999)
+        ip_port = kwargs.get("http_ip_port", "")
+        self._http_ip_port = []
+        self._http_server = None
+        # if ip_port is not empty, it will use http instead of hdfs
+        if ip_port != "":
+            self._http_ip_port = ip_port.split(":")
+            # it's for communication between processes
+            self._manager = Manager()
+            # global dict to store status
+            self._http_server_d = self._manager.dict()
+            # set running status of http server
+            self._http_server_d["running"] = False
         self._iface = self.__get_default_iface()
         # this environment variable can be empty
         self._prefix = os.getenv("SYS_JOB_ID", "")
@@ -572,17 +587,41 @@ class GeneralRoleMaker(RoleMakerBase):
         trainers_num = len(worker_endpoints)
         if training_role not in ["TRAINER", "PSERVER"]:
             raise ValueError("TRAINING_ROLE must be PSERVER or TRAINER")
+
         if training_role == "TRAINER":
             role = Role.WORKER
             current_id = int(os.environ["PADDLE_TRAINER_ID"])
+            if current_id == 0 and len(self._http_ip_port) != 0:
+                size_d = {
+                    "trainer": len(worker_endpoints),
+                    "pserver": len(eplist),
+                    "all": len(worker_endpoints) + len(eplist)
+                }
+                # child process for http server
+                self._http_server = Process(
+                    target=self.__start_kv_server,
+                    args=(self._http_server_d, size_d))
+                self._http_server.daemon = True
+                # set running status to True
+                self._http_server_d["running"] = True
+                # start child process
+                self._http_server.start()
             self._node_type = 1
             self._cur_endpoint = worker_endpoints[current_id]
             gloo = fluid.core.Gloo()
-            gloo.init(current_id,
-                      len(worker_endpoints),
-                      self._hdfs_path.rstrip("/") + "/trainer",
-                      self._hdfs_name, self._hdfs_ugi, self._iface,
-                      self._prefix)
+            gloo.set_rank(current_id)
+            gloo.set_size(len(worker_endpoints))
+            gloo.set_prefix(self._prefix)
+            gloo.set_iface(self._iface)
+            gloo.set_timeout_seconds(self._init_timeout_seconds,
+                                     self._run_timeout_seconds)
+            if len(self._http_ip_port) != 0:
+                gloo.set_http_store(self._http_ip_port[0],
+                                    int(self._http_ip_port[1]), "trainer")
+            else:
+                gloo.set_hdfs_store(self._hdfs_path + "/trainer",
+                                    self._hdfs_name, self._hdfs_ugi)
+            gloo.init()
             self._node_type_comm = gloo
         elif training_role == "PSERVER":
             role = Role.SERVER
@@ -598,20 +637,36 @@ class GeneralRoleMaker(RoleMakerBase):
             self._node_type = 0
             self._cur_endpoint = cur_endpoint
             gloo = fluid.core.Gloo()
-            gloo.init(current_id,
-                      len(eplist),
-                      self._hdfs_path.rstrip("/") + "/pserver",
-                      self._hdfs_name, self._hdfs_ugi, self._iface,
-                      self._prefix)
+            gloo.set_rank(current_id)
+            gloo.set_size(len(eplist))
+            gloo.set_prefix(self._prefix)
+            gloo.set_iface(self._iface)
+            gloo.set_timeout_seconds(self._init_timeout_seconds,
+                                     self._run_timeout_seconds)
+            if len(self._http_ip_port) != 0:
+                gloo.set_http_store(self._http_ip_port[0],
+                                    int(self._http_ip_port[1]), "pserver")
+            else:
+                gloo.set_hdfs_store(self._hdfs_path + "/pserver",
+                                    self._hdfs_name, self._hdfs_ugi)
+            gloo.init()
             self._node_type_comm = gloo
 
         gloo = fluid.core.Gloo()
         all_list = worker_endpoints + eplist
-        gloo.init(
-            all_list.index(self._cur_endpoint),
-            len(all_list),
-            self._hdfs_path.rstrip("/") + "/all", self._hdfs_name,
-            self._hdfs_ugi, self._iface, self._prefix)
+        gloo.set_rank(all_list.index(self._cur_endpoint))
+        gloo.set_size(len(all_list))
+        gloo.set_prefix(self._prefix)
+        gloo.set_iface(self._iface)
+        gloo.set_timeout_seconds(self._init_timeout_seconds,
+                                 self._run_timeout_seconds)
+        if len(self._http_ip_port) != 0:
+            gloo.set_http_store(self._http_ip_port[0],
+                                int(self._http_ip_port[1]), "all")
+        else:
+            gloo.set_hdfs_store(self._hdfs_path + "/all", self._hdfs_name,
+                                self._hdfs_ugi)
+        gloo.init()
         self._all_comm = gloo
         self._trainers_num = trainers_num
         self._server_endpoints = eplist
@@ -620,6 +675,11 @@ class GeneralRoleMaker(RoleMakerBase):
         self._rank = all_list.index(self._cur_endpoint)
         self._size = len(all_list)
         self._worker_endpoints = worker_endpoints
+        if self._http_server is not None:
+            # set running status to False
+            self._http_server_d["running"] = False
+            # wait until child process exits
+            self._http_server.join()
         self._role_is_generated = True
 
     def all_gather(self, input):
@@ -872,6 +932,16 @@ class GeneralRoleMaker(RoleMakerBase):
                 return intf_name
         return "lo"
 
+    def __start_kv_server(self, http_server_d, size_d):
+        from paddle.fluid.incubate.fleet.utils.http_server import KVServer
+        http_server = KVServer(int(self._http_ip_port[1]), size_d)
+        http_server.start()
+        wait_seconds = 5
+        while http_server_d.get("running",
+                                False) and not http_server.should_stop():
+            time.sleep(wait_seconds)
+        http_server.stop()
+
 
 class UserDefinedRoleMaker(RoleMakerBase):
     """
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
index 210640f64c12c220304d31a5083de4e64756fd0d..c1ec749ac1fac35b9295689afa282adc765f49e8 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
@@ -567,6 +567,24 @@ class PSLib(Fleet):
                                   model_proto_file, table_var_names,
                                   load_combine)
         self._role_maker._barrier_worker()
 
+    def confirm(self):
+        """
+        confirm all the updated params in the current pass
+        """
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
+            self._fleet_ptr.confirm()
+        self._role_maker._barrier_worker()
+
+    def revert(self):
+        """
+        revert all the updated params in the current pass
+        """
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
+            self._fleet_ptr.revert()
+        self._role_maker._barrier_worker()
+
     def load_model(self, model_dir=None, **kwargs):
         """
         load pslib model, there are at least 4 modes, these modes are the same
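Note on the KV server added below: it speaks plain HTTP with paths of the form /<scope>/<key>. PUT stores a value, GET returns it (404 while absent), and DELETE records that a key was consumed so should_stop() can tell when every rank has fetched its data. A minimal Python 2 client sketch (the endpoint is a placeholder; httplib matches the server's BaseHTTPServer vintage):

    # illustrative client for the KV server below (Python 2, like the server)
    import httplib
    conn = httplib.HTTPConnection("127.0.0.1", 8099)  # placeholder endpoint
    conn.request("PUT", "/trainer/0", "address-bytes")
    assert conn.getresponse().status == 200
    conn.request("GET", "/trainer/0")
    print(conn.getresponse().read())      # -> "address-bytes"
    conn.request("DELETE", "/trainer/0")  # mark the key as consumed
    conn.getresponse().read()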
diff --git a/python/paddle/fluid/incubate/fleet/utils/http_server.py b/python/paddle/fluid/incubate/fleet/utils/http_server.py
new file mode 100644
index 0000000000000000000000000000000000000000..3573f417f3597d3feb1fbc4c1694f75b8e2086ce
--- /dev/null
+++ b/python/paddle/fluid/incubate/fleet/utils/http_server.py
@@ -0,0 +1,187 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Http Server."""
+
+import logging
+import BaseHTTPServer
+import SimpleHTTPServer
+import time
+import threading
+import socket
+
+
+def get_logger(name, level, fmt):
+    logger = logging.getLogger(name)
+    logger.setLevel(level)
+    handler = logging.FileHandler('http.log', mode='w')
+    formatter = logging.Formatter(fmt=fmt)
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    return logger
+
+
+_http_server_logger = get_logger(
+    __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
+
+
+class KVHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
+    """
+    kv handler class for kv http server,
+    it defines the way to get/set kv in server.
+    """
+
+    def do_GET(self):
+        """
+        get method for kv handler, get value according to key.
+        """
+        log_str = "GET " + self.address_string() + self.path
+        paths = self.path.split('/')
+        if len(paths) < 3:
+            print('len of request path must be 3: ' + self.path)
+            self.send_status_code(400)
+            return
+        _, scope, key = paths
+        with self.server.kv_lock:
+            value = self.server.kv.get(scope, {}).get(key)
+        if value is None:
+            log_str += ' , key not found: ' + key
+            self.send_status_code(404)
+        else:
+            log_str += ' , key found: ' + key
+            self.send_response(200)
+            self.send_header("Content-Length", str(len(value)))
+            self.end_headers()
+            self.wfile.write(value)
+        _http_server_logger.info(log_str)
+
+    def do_PUT(self):
+        """
+        put method for kv handler, set value according to key.
+        """
+        log_str = "PUT " + self.address_string() + self.path
+        paths = self.path.split('/')
+        if len(paths) < 3:
+            print('len of request path must be 3: ' + self.path)
+            self.send_status_code(400)
+            return
+        _, scope, key = paths
+        content_length = int(self.headers['Content-Length'])
+        try:
+            value = self.rfile.read(content_length)
+        except:
+            print("receive error invalid request")
+            self.send_status_code(404)
+            return
+        with self.server.kv_lock:
+            if self.server.kv.get(scope) is None:
+                self.server.kv[scope] = {}
+            self.server.kv[scope][key] = value
+        self.send_status_code(200)
+        _http_server_logger.info(log_str)
+
+    def do_DELETE(self):
+        """
+        delete method for kv handler, delete value according to key.
+        """
+        log_str = "DELETE " + self.address_string() + self.path
+        paths = self.path.split('/')
+        if len(paths) < 3:
+            print('len of request path must be 3: ' + self.path)
+            self.send_status_code(400)
+            return
+        _, scope, key = paths
+        with self.server.delete_kv_lock:
+            if self.server.delete_kv.get(scope) is None:
+                self.server.delete_kv[scope] = []
+            self.server.delete_kv[scope].append(key)
+        self.send_status_code(200)
+        _http_server_logger.info(log_str)
+
+    def log_message(self, format, *args):
+        """
+        ignore all logging messages in kv handler.
+        """
+        pass
+ """ + self.send_response(code) + self.send_header("Content-Length", 0) + self.end_headers() + + +class KVHTTPServer(BaseHTTPServer.HTTPServer, object): + """ + it is a http server storing kv pairs. + """ + + def __init__(self, port, handler): + """Init.""" + super(KVHTTPServer, self).__init__(('', port), handler) + self.delete_kv_lock = threading.Lock() + self.delete_kv = {} + self.kv_lock = threading.Lock() + self.kv = {} + + def get_deleted_size(self, key): + """ + get deleted size in key. + """ + ret = 0 + with self.delete_kv_lock: + ret = self.delete_kv.get(key, 0) + return ret + + +class KVServer: + """ + it is a server storing kv pairs, has a http server inside. + """ + + def __init__(self, port, size={}): + """Init.""" + self.http_server = KVHTTPServer(port, KVHandler) + self.listen_thread = None + self.size = {} + + def start(self): + """ + start server until user calls stop to let it quit. + """ + self.listen_thread = threading.Thread( + target=lambda: self.http_server.serve_forever()) + self.listen_thread.start() + + def stop(self): + """ + stop server and clear its resources. + """ + self.http_server.shutdown() + self.listen_thread.join() + self.http_server.server_close() + + def shoud_stop(self): + """ + return whether the server should stop. + + Returns: + ret(bool): whether the server should stop + """ + for key in self.size: + s = self.http_server.get_deleted_size(key) + if s != self.size.get(key, 0): + return False + return True diff --git a/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py b/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py index 5aa28673437b06de9376e1944a06d72f2668c385..a3038d1fb889d9fb40e30b4cebfad6db43de19b2 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py @@ -96,6 +96,8 @@ class TestFleet1(unittest.TestCase): fleet.save_one_table(0, "./model_002", prefix="hahaha") fleet.load_model("./model_0003") fleet.load_one_table(0, "./model_004") + fleet.confirm() + fleet.revert() except: print("do not support pslib test, skip") return diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py index 5fee4458a715e9079d683cd59281280112ca563c..4e7de7c6ba0e861bccc550688c8bcaaf690cf34d 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py @@ -97,7 +97,7 @@ class TestCloudRoleMaker2(unittest.TestCase): role4._worker_gather(1) role4._get_rank() role4._get_size() - role4._all_comm.init(0, 0, "", "", "", "", "") + role4._all_comm.init() role5 = GeneralRoleMaker(path="./test_gloo_5") role5.get_local_endpoint() role5.get_local_endpoint() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py new file mode 100644 index 0000000000000000000000000000000000000000..be1e228f4364f365ecb9f8d20918f66585b8af59 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py @@ -0,0 +1,84 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py
new file mode 100644
index 0000000000000000000000000000000000000000..be1e228f4364f365ecb9f8d20918f66585b8af59
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py
@@ -0,0 +1,84 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test cloud role maker."""
+
+from __future__ import print_function
+import os
+import unittest
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+
+
+class TestCloudRoleMaker(unittest.TestCase):
+    """
+    Test cases for PaddleCloudRoleMaker.
+    """
+
+    def setUp(self):
+        """Set up, set envs."""
+        os.environ["PADDLE_TRAINERS_NUM"] = "2"
+        os.environ[
+            "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
+
+    def test_pslib_1(self):
+        """Test cases for pslib."""
+        import paddle.fluid as fluid
+        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+        from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib
+        from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
+        try:
+            import netifaces
+        except:
+            print("warning: no netifaces, skip test_pslib_1")
+            return
+        os.environ["POD_IP"] = "127.0.0.1"
+        os.environ["PADDLE_PORT"] = "36001"
+        os.environ["TRAINING_ROLE"] = "TRAINER"
+        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
+        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002"
+        os.environ["PADDLE_TRAINER_ID"] = "0"
+        role_maker = GeneralRoleMaker(
+            init_timeout_seconds=100,
+            run_timeout_seconds=100,
+            http_ip_port="127.0.0.1:36003")
+        role_maker.generate_role()
+        place = fluid.CPUPlace()
+        exe = fluid.Executor(place)
+        fleet.init(role_maker)
+        train_program = fluid.Program()
+        startup_program = fluid.Program()
+        scope = fluid.Scope()
+        with fluid.program_guard(train_program, startup_program):
+            show = fluid.layers.data(name="show", shape=[-1, 1], \
+                dtype="float32", lod_level=1, append_batch_size=False)
+            fc = fluid.layers.fc(input=show, size=1, act=None)
+            label = fluid.layers.data(name="click", shape=[-1, 1], \
+                dtype="int64", lod_level=1, append_batch_size=False)
+            label_cast = fluid.layers.cast(label, dtype='float32')
+            cost = fluid.layers.log_loss(fc, label_cast)
+        try:
+            adam = fluid.optimizer.Adam(learning_rate=0.000005)
+            adam = fleet.distributed_optimizer(adam)
+            adam.minimize([cost], [scope])
+            fleet.run_server()
+            http_server_d = {}
+            http_server_d["running"] = False
+            size_d = {}
+            role_maker._GeneralRoleMaker__start_kv_server(http_server_d,
+                                                          size_d)
+        except:
+            print("do not support pslib test, skip")
+            return
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py
new file mode 100644
index 0000000000000000000000000000000000000000..dd5cd715ecd1ed9ebc30a22cb924255d278643ed
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py
@@ -0,0 +1,196 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test cloud role maker."""
+
+from __future__ import print_function
+import os
+import unittest
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+
+
+class TestCloudRoleMaker(unittest.TestCase):
+    """
+    Test cases for PaddleCloudRoleMaker.
+    """
+
+    def setUp(self):
+        """Set up, set envs."""
+        os.environ["PADDLE_TRAINERS_NUM"] = "2"
+        os.environ[
+            "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
+
+    def test_pslib_1(self):
+        """Test cases for pslib."""
+        import sys
+        import threading
+        import paddle.fluid as fluid
+        try:
+            from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+            from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib
+            from paddle.fluid.incubate.fleet.base.role_maker import \
+                GeneralRoleMaker
+            from paddle.fluid.incubate.fleet.utils.http_server import KVHandler
+            from paddle.fluid.incubate.fleet.utils.http_server import KVServer
+            from paddle.fluid.incubate.fleet.utils.http_server import \
+                KVHTTPServer
+        except:
+            print("warning: no fleet, skip test_pslib_4")
+            return
+
+        try:
+            import netifaces
+        except:
+            print("warning: no netifaces, skip test_pslib_4")
+            return
+
+        class FakeStream():
+            """
+            it is a fake stream only for test.
+            """
+
+            def write(self, a):
+                """
+                write a to stream, do nothing
+
+                Args:
+                    a(str): the string to write
+                """
+                pass
+
+            def read(self, b):
+                """
+                read data of len b from stream, do nothing
+
+                Args:
+                    b(int): the length to read
+
+                Returns:
+                    c(str): the result
+                """
+                if b == 0:
+                    raise ValueError("this is only for test")
+                return "fake"
+
+        import os
+
+        try:
+
+            class TmpKVHandler(KVHandler):
+                """
+                it is a fake handler only for this test case.
+                """
+
+                def __init__(self, server):
+                    """Init."""
+                    self.path = "a/b/c"
+                    self.server = server
+                    self.wfile = FakeStream()
+                    self.rfile = FakeStream()
+                    self.headers = {}
+                    self.headers['Content-Length'] = 0
+
+                def address_string(self):
+                    """
+                    fake address string, it will do nothing.
+                    """
+                    return "123"
+
+                def send_response(self, code):
+                    """
+                    fake send response, it will do nothing.
+
+                    Args:
+                        code(int): error code
+                    """
+                    pass
+
+                def send_header(self, a, b):
+                    """
+                    fake send header, it will do nothing.
+
+                    Args:
+                        a(str): some header
+                        b(str): some header
+                    """
+                    pass
+
+                def end_headers(self):
+                    """
+                    fake end header, it will do nothing.
+                    """
+                    pass
+        except:
+            print("warning: no KVHandler, skip test_pslib_4")
+            return
+
+        import sys
+
+        try:
+
+            class TmpServer(KVHTTPServer):
+                """
+                it is a fake server only for this test case.
+                """
+
+                def __init__(self):
+                    """Init."""
+                    self.delete_kv_lock = threading.Lock()
+                    self.delete_kv = {}
+                    self.kv_lock = threading.Lock()
+                    self.kv = {}
+        except:
+            print("warning: no KVHTTPServer, skip test_pslib_4")
+            return
+ """ + + def __init__(self): + """Init.""" + self.http_server = TmpServer() + self.listen_thread = None + self.size = {} + self.size["a"] = 999 + except: + print("warning: no KVServer, skip test_pslib_4") + return + + s = TmpServer() + h = TmpKVHander(s) + h.do_GET() + h.path = "a/b" + h.do_GET() + h.do_PUT() + h.do_DELETE() + h.path = "a/b/c" + s.kv["b"] = {} + s.kv["b"]["c"] = "456" + h.do_GET() + h.path = "a/d/e" + h.do_PUT() + h.headers['Content-Length'] = 1 + h.do_PUT() + h.do_DELETE() + h.log_message("666") + s.get_deleted_size("haha") + s1 = TmpS() + s1.shoud_stop() + + +if __name__ == "__main__": + unittest.main()