From ae382d1fea6c55ff44f1439f1ca8df08048aa3d0 Mon Sep 17 00:00:00 2001 From: xiaoxiao-luomu <73728031+xiaoxiao-luomu@users.noreply.github.com> Date: Mon, 27 Sep 2021 22:45:55 +0800 Subject: [PATCH] gloo hdfs set check & gloo connect retry (#35750) * gloo hdfs set check & gloo connect retry * add vlog * print gloo connect addr & add vlog * . * modify vlof * modify vlog * modify vlog --- paddle/fluid/framework/fleet/gloo_wrapper.cc | 45 +++++++++++++++++++- paddle/fluid/framework/fleet/gloo_wrapper.h | 20 +++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.cc b/paddle/fluid/framework/fleet/gloo_wrapper.cc index 489cef9f046..14e5f2f5192 100644 --- a/paddle/fluid/framework/fleet/gloo_wrapper.cc +++ b/paddle/fluid/framework/fleet/gloo_wrapper.cc @@ -71,6 +71,18 @@ void HdfsStore::set(const std::string& key, const std::vector& data) { } } paddle::framework::fs_mv(tmp, path); + auto start = std::chrono::steady_clock::now(); + while (paddle::framework::fs_exists(path) == false) { + VLOG(0) << "HdfsStore::set fs_mv retrying..."; + paddle::framework::fs_mv(tmp, path); + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) { + PADDLE_THROW(paddle::platform::errors::ExecutionTimeout( + "fs_mv failed, tmp: %s, path: %s", tmp, path)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_)); + } #endif } @@ -140,6 +152,7 @@ void HdfsStore::wait(const std::vector& keys, auto start = std::chrono::steady_clock::now(); std::vector check_key_status(keys.size(), false); while (!Check(keys, &check_key_status)) { + VLOG(0) << "HdfsStore::wait checking repeatedly..."; auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start); if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) { @@ -209,6 +222,8 @@ void ParallelConnectContext::connectFullMesh( // Create pairs auto transportContext = dev->createContext(rank, size); transportContext->setTimeout(getTimeout()); + VLOG(0) << "transportContext timeout: " << getTimeout().count() + << ", curr rank: " << rank; for (int i = 0; i < size; i++) { if (i == rank) { continue; @@ -225,6 +240,7 @@ void ParallelConnectContext::connectFullMesh( std::vector> connect_threads(thread_num_); // Connect every pair + VLOG(0) << "connect_thread_num: " << thread_num_ << ", size: " << size; for (uint32_t i = 0; i < connect_threads.size(); ++i) { connect_threads[i].reset(new std::thread( [&store, &transportContext, total_add_size, this]( @@ -252,10 +268,36 @@ void ParallelConnectContext::connectFullMesh( sleep(5); --max_retry_times; } - auto addr = extractAddress(allAddrs, i); + if (addr.empty()) { + VLOG(0) << "peer address is null"; + } + Impl impl_; + memcpy(&impl_, addr.data(), sizeof(impl_)); + struct sockaddr_in* sa = (struct sockaddr_in*)&(impl_.ss); + std::string ip = getCharIpAddr(sa->sin_addr.s_addr); + VLOG(0) << "peer " << i << " ip addr: " << ip + << ", port: " << sa->sin_port; + + auto start = std::chrono::steady_clock::now(); + std::chrono::seconds connect_wait_timeout_ = + std::chrono::seconds(600); + while (true) { + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + if (elapsed > connect_wait_timeout_) { + break; + } + try { + transportContext->getPair(i)->connect(addr); + break; + } catch (...) { + VLOG(0) << "gloo connect failed, retrying..."; + } + } transportContext->getPair(i)->connect(addr); } + VLOG(0) << "peer connected success"; }, i, connect_threads.size())); } @@ -264,6 +306,7 @@ void ParallelConnectContext::connectFullMesh( } device_ = dev; transportContext_ = std::move(transportContext); + VLOG(0) << "ParallelConnectContext::connectFullMesh() is over"; } #endif } // namespace rendezvous diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.h b/paddle/fluid/framework/fleet/gloo_wrapper.h index 4eb40da1bfd..eafc991fbca 100644 --- a/paddle/fluid/framework/fleet/gloo_wrapper.h +++ b/paddle/fluid/framework/fleet/gloo_wrapper.h @@ -97,6 +97,26 @@ class ParallelConnectContext : public gloo::rendezvous::Context { // slowly in case big size, especialy in HdfsStore void connectFullMesh(Store& store, // NOLINT std::shared_ptr& dev); // NOLINT + struct Impl { + // IP address of the listening socket. + struct sockaddr_storage ss; + // Sequence number of this address. + // If this is equal to -1, the address is assumed to + // represent the listening socket of a device. The sequence number + // must be set before it can be used by a pair. + ssize_t seq{-1}; + }; + std::string getCharIpAddr(uint32_t ipAddress) { + const int NBYTES = 4; + uint8_t octet[NBYTES]; + char ipAddressFinal[16]; + for (int i = 0; i < NBYTES; i++) { + octet[i] = ipAddress >> (i * 8); + } + snprintf(ipAddressFinal, sizeof(ipAddressFinal), "%d.%d.%d.%d", octet[0], + octet[1], octet[2], octet[3]); + return std::string(ipAddressFinal); + } protected: int thread_num_ = 6; -- GitLab