未验证 提交 ae382d1f 编写于 作者: X xiaoxiao-luomu 提交者: GitHub

gloo hdfs set check & gloo connect retry (#35750)

* gloo hdfs set check & gloo connect retry

* add vlog

* print gloo connect addr & add vlog

* .

* modify vlof

* modify vlog

* modify vlog
上级 efd35384
......@@ -71,6 +71,18 @@ void HdfsStore::set(const std::string& key, const std::vector<char>& data) {
}
}
paddle::framework::fs_mv(tmp, path);
auto start = std::chrono::steady_clock::now();
while (paddle::framework::fs_exists(path) == false) {
VLOG(0) << "HdfsStore::set fs_mv retrying...";
paddle::framework::fs_mv(tmp, path);
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start);
if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) {
PADDLE_THROW(paddle::platform::errors::ExecutionTimeout(
"fs_mv failed, tmp: %s, path: %s", tmp, path));
}
std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_));
}
#endif
}
......@@ -140,6 +152,7 @@ void HdfsStore::wait(const std::vector<std::string>& keys,
auto start = std::chrono::steady_clock::now();
std::vector<bool> check_key_status(keys.size(), false);
while (!Check(keys, &check_key_status)) {
VLOG(0) << "HdfsStore::wait checking repeatedly...";
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start);
if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) {
......@@ -209,6 +222,8 @@ void ParallelConnectContext::connectFullMesh(
// Create pairs
auto transportContext = dev->createContext(rank, size);
transportContext->setTimeout(getTimeout());
VLOG(0) << "transportContext timeout: " << getTimeout().count()
<< ", curr rank: " << rank;
for (int i = 0; i < size; i++) {
if (i == rank) {
continue;
......@@ -225,6 +240,7 @@ void ParallelConnectContext::connectFullMesh(
std::vector<std::shared_ptr<std::thread>> connect_threads(thread_num_);
// Connect every pair
VLOG(0) << "connect_thread_num: " << thread_num_ << ", size: " << size;
for (uint32_t i = 0; i < connect_threads.size(); ++i) {
connect_threads[i].reset(new std::thread(
[&store, &transportContext, total_add_size, this](
......@@ -252,10 +268,36 @@ void ParallelConnectContext::connectFullMesh(
sleep(5);
--max_retry_times;
}
auto addr = extractAddress(allAddrs, i);
if (addr.empty()) {
VLOG(0) << "peer address is null";
}
Impl impl_;
memcpy(&impl_, addr.data(), sizeof(impl_));
struct sockaddr_in* sa = (struct sockaddr_in*)&(impl_.ss);
std::string ip = getCharIpAddr(sa->sin_addr.s_addr);
VLOG(0) << "peer " << i << " ip addr: " << ip
<< ", port: " << sa->sin_port;
auto start = std::chrono::steady_clock::now();
std::chrono::seconds connect_wait_timeout_ =
std::chrono::seconds(600);
while (true) {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start);
if (elapsed > connect_wait_timeout_) {
break;
}
try {
transportContext->getPair(i)->connect(addr);
break;
} catch (...) {
VLOG(0) << "gloo connect failed, retrying...";
}
}
transportContext->getPair(i)->connect(addr);
}
VLOG(0) << "peer connected success";
},
i, connect_threads.size()));
}
......@@ -264,6 +306,7 @@ void ParallelConnectContext::connectFullMesh(
}
device_ = dev;
transportContext_ = std::move(transportContext);
VLOG(0) << "ParallelConnectContext::connectFullMesh() is over";
}
#endif
} // namespace rendezvous
......
......@@ -97,6 +97,26 @@ class ParallelConnectContext : public gloo::rendezvous::Context {
// slowly in case big size, especialy in HdfsStore
void connectFullMesh(Store& store, // NOLINT
std::shared_ptr<transport::Device>& dev); // NOLINT
struct Impl {
// IP address of the listening socket.
struct sockaddr_storage ss;
// Sequence number of this address.
// If this is equal to -1, the address is assumed to
// represent the listening socket of a device. The sequence number
// must be set before it can be used by a pair.
ssize_t seq{-1};
};
std::string getCharIpAddr(uint32_t ipAddress) {
const int NBYTES = 4;
uint8_t octet[NBYTES];
char ipAddressFinal[16];
for (int i = 0; i < NBYTES; i++) {
octet[i] = ipAddress >> (i * 8);
}
snprintf(ipAddressFinal, sizeof(ipAddressFinal), "%d.%d.%d.%d", octet[0],
octet[1], octet[2], octet[3]);
return std::string(ipAddressFinal);
}
protected:
int thread_num_ = 6;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册