/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/framework/io/fs.h"
#include "paddle/fluid/string/string_helper.h"

namespace gloo {
namespace rendezvous {

constexpr int kNodeSize = 136;

HdfsStore::HdfsStore(const std::string& path) {
  path_ = path;
  wait_sleep_ms_ = 10000;
  wait_timeout_ = std::chrono::seconds(999999999);
  retry_times_ = 100;
}

void HdfsStore::set(const std::string& key, const std::vector<char>& data) {
#ifdef PADDLE_WITH_GLOO
  auto tmp = TmpPath(key);
  auto path = ObjectPath(key);
  bool is_exists = paddle::framework::fs_exists(path);
  if (is_exists) {
    LOG(WARNING) << "path exists, will be removed: " << path;
    paddle::framework::fs_remove(path);
  }
  int err_no = 0;
  for (int i = 1; i <= retry_times_; ++i) {
    err_no = 0;
    std::shared_ptr<FILE> fp =
        paddle::framework::fs_open_write(tmp, &err_no, "");
    size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get());
    if (write_count != data.size()) {
      VLOG(0) << "fwrite_unlocked failed, retry times " << i << " write_count "
              << write_count << " data.size() " << data.size();
      err_no = -1;
    }
    fp.reset();
    if (err_no != 0) {
      VLOG(0) << "fs_open_write failed, retry times " << i << " err no "
              << err_no;
      sleep(wait_sleep_ms_ / 1000);
      paddle::framework::fs_remove(tmp);
      if (i == retry_times_) {
        VLOG(0) << "fs_open_write failed, retry times reaches limit";
        PADDLE_THROW(paddle::platform::errors::PreconditionNotMet(
            "fs_open_write failed, retry times reaches %d limit.",
            retry_times_));
      }
    } else {
      break;
    }
  }
  paddle::framework::fs_mv(tmp, path);
#endif
}

#ifdef PADDLE_WITH_GLOO
int retry_do_func(std::function<int(void)> func, uint32_t max_try_time,
                  uint32_t retry_interval_ms) {
  for (uint32_t i = 0; i < max_try_time; ++i) {
    if (func() == 0) {
      return 0;
    }
#ifdef _LINUX
    usleep(retry_interval_ms * 1000);
#endif
  }
  return -1;
}
#endif

std::vector<char> HdfsStore::get(const std::string& key) {
  auto path = ObjectPath(key);
  std::vector<char> result;
#ifdef PADDLE_WITH_GLOO
  // block until key is set
  wait({key});
  int ret = retry_do_func(
      [&path]() { return paddle::framework::fs_exists(path) ? 0 : -1; },
      5, wait_sleep_ms_);
  bool is_exists = (ret == 0);
  PADDLE_ENFORCE_EQ(is_exists, true,
                    paddle::platform::errors::NotFound(
                        "HdfsStore::get, path not exists: " + path));
  int read_status = retry_do_func(
      [&path, &result]() {
        result.clear();
        int err_no = 0;
        {
          std::shared_ptr<FILE> fp =
              paddle::framework::fs_open_read(path, &err_no, "");
          char buffer = '\0';
          size_t read_count = 0;
          while (fread(&buffer, 1, 1, fp.get()) == 1) {
            ++read_count;
            result.push_back(buffer);
          }
          VLOG(3) << "HdfsStore::get read_count " << read_count;
        }
        return err_no;
      },
      5, wait_sleep_ms_);
  PADDLE_ENFORCE_EQ(read_status, 0,
                    paddle::platform::errors::Fatal(
                        "HdfsStore::get, path read failed: " + path));
#endif
  return result;
}

void HdfsStore::wait(const std::vector<std::string>& keys) {
#ifdef PADDLE_WITH_GLOO
  wait(keys, wait_timeout_);  // NOLINT
#endif
}

void HdfsStore::wait(const std::vector<std::string>& keys,
                     const std::chrono::milliseconds&) {  // NOLINT
#ifdef PADDLE_WITH_GLOO
  auto start = std::chrono::steady_clock::now();
  std::vector<bool> check_key_status(keys.size(), false);
  while (!Check(keys, &check_key_status)) {
    auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::steady_clock::now() - start);
    if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) {
      int32_t last_check_rank = -1;
      for (size_t i = 0; i < check_key_status.size(); ++i) {
        if (!check_key_status[i]) {
          last_check_rank = i;
          break;
        }
      }
      PADDLE_THROW(paddle::platform::errors::ExecutionTimeout(
          "TIMEOUT self_rank = %d pair_rank = %d", self_rank_,
          last_check_rank));
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_));
  }
#endif
}

void HdfsStore::SetTimeoutSeconds(int timeout_seconds) {
  wait_timeout_ = std::chrono::seconds(timeout_seconds);
}

std::string HdfsStore::EncodeName(const std::string& name) {
  return ::paddle::string::erase_spaces(name);
}

std::string HdfsStore::TmpPath(const std::string& name) {
  return path_ + "/" + EncodeName(name) + "_tmp";
}

std::string HdfsStore::ObjectPath(const std::string& name) {
  return path_ + "/" + EncodeName(name);
}

// Checks which keys already have object files on the file system, records the
// result in keys_check_status, and returns true only if every key is present.
bool HdfsStore::Check(const std::vector<std::string>& keys,
                      std::vector<bool>* keys_check_status) {
#ifdef PADDLE_WITH_GLOO
  bool ret = true;
  std::vector<std::string> paths;
  for (const auto& key : keys) {
    paths.push_back(ObjectPath(key));
  }
  for (size_t i = 0; i < paths.size(); ++i) {
    if ((*keys_check_status)[i]) {
      continue;
    }
    const auto& path = paths[i];
    bool is_exists = paddle::framework::fs_exists(path);
    VLOG(3) << "HdfsStore::Check " << is_exists << " path " << path;
    if (!is_exists) {
      ret = false;
    }
    (*keys_check_status)[i] = is_exists;
  }
  return ret;
#else
  VLOG(0) << "HdfsStore::Check does nothing when no gloo";
#endif
  return true;
}

#ifdef PADDLE_WITH_GLOO
void ParallelConnectContext::connectFullMesh(
    Store& store, std::shared_ptr<transport::Device>& dev) {
  std::vector<char> allBytes;
  // Create pairs
  auto transportContext = dev->createContext(rank, size);
  transportContext->setTimeout(getTimeout());
  for (int i = 0; i < size; i++) {
    if (i == rank) {
      continue;
    }
    auto& pair = transportContext->createPair(i);
    auto addrBytes = pair->address().bytes();
    allBytes.insert(allBytes.end(), addrBytes.begin(), addrBytes.end());
  }
  std::ostringstream storeKey;
  storeKey << rank;
  store.set(storeKey.str(), allBytes);
  auto total_add_size = kNodeSize * (size - 1);
  std::vector<std::unique_ptr<std::thread>> connect_threads(thread_num_);
  // Connect every pair
  for (uint32_t i = 0; i < connect_threads.size(); ++i) {
    connect_threads[i].reset(new std::thread(
        [&store, &transportContext, total_add_size, this](
            size_t thread_idx, size_t thread_num) -> void {
          for (int i = thread_idx; i < size; i += thread_num) {
            if (i == rank) {
              continue;
            }
            // Wait for address of other side of this pair to become available
            std::string key = std::to_string(i);
            store.wait({key}, getTimeout());
            std::vector<char> allAddrs;
            auto max_retry_times = 5;
            // Connect to other side of this pair
            while (max_retry_times > 0) {
              allAddrs = store.get(key);
              VLOG(3) << "store get all address size: " << allAddrs.size()
                      << " expect: " << total_add_size;
              if (allAddrs.size() == static_cast<size_t>(total_add_size)) {
                break;
              }
              --max_retry_times;
            }
            auto addr = extractAddress(allAddrs, i);
            transportContext->getPair(i)->connect(addr);
          }
        },
        i, connect_threads.size()));
  }
  for (uint32_t i = 0; i < connect_threads.size(); ++i) {
    connect_threads[i]->join();
  }
  device_ = dev;
  transportContext_ = std::move(transportContext);
}
#endif
}  // namespace rendezvous
}  // namespace gloo

namespace paddle {
namespace framework {

void GlooWrapper::Init() {
  if (is_initialized_) {
    return;
  }
#ifdef PADDLE_WITH_GLOO
  gloo::transport::tcp::attr attr;
  attr.iface = iface_;
  std::shared_ptr<gloo::rendezvous::HdfsStore> file_store = nullptr;
  std::shared_ptr<gloo::rendezvous::HTTPStore> http_store = nullptr;
  auto context =
      std::make_shared<gloo::rendezvous::ParallelConnectContext>(rank_, size_);
  context->setTimeout(run_timeout_);
  auto dev = gloo::transport::tcp::CreateDevice(attr);
  switch (store_type_) {
    case GlooStoreType::HDFS: {
      std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs");
      cmd += " -D fs.default.name=" + hdfs_name_;
      cmd += " -D hadoop.job.ugi=" + hdfs_ugi_;
      paddle::framework::hdfs_set_command(cmd);
      file_store = std::make_shared<gloo::rendezvous::HdfsStore>(hdfs_path_);
      file_store->SetTimeoutSeconds(init_timeout_.count());
      auto prefix_store =
          std::make_shared<gloo::rendezvous::PrefixStore>(prefix_, *file_store);
      context->connectFullMesh(*prefix_store, dev);
      break;
    }
    case GlooStoreType::HTTP: {
      http_store = std::make_shared<gloo::rendezvous::HTTPStore>(
          http_ip_, http_port_, prefix_ + "_" + http_scope_, rank_);
      http_store->SetTimeoutSeconds(init_timeout_.count());
      context->connectFullMesh(*http_store, dev);
      http_store->Finalize();
      VLOG(3) << "after calling http_store->Finalize.";
      break;
    }
    default:
      LOG(ERROR) << "unknown store type " << store_type_;
      exit(-1);
  }
  context_ = std::move(context);
#endif
  is_initialized_ = true;
  VLOG(3) << "gloo initialized done.";
}

template std::vector<int64_t> GlooWrapper::AllReduce<int64_t>(
    std::vector<int64_t>& sendbuf,  // NOLINT
    const std::string& mode);
template std::vector<float> GlooWrapper::AllReduce<float>(
    std::vector<float>& sendbuf,  // NOLINT
    const std::string& mode);
template std::vector<double> GlooWrapper::AllReduce<double>(
    std::vector<double>& sendbuf,  // NOLINT
    const std::string& mode);
template std::vector<uint64_t> GlooWrapper::AllReduce<uint64_t>(
    std::vector<uint64_t>& sendbuf,  // NOLINT
    const std::string& mode);
template std::vector<int64_t> GlooWrapper::AllGather<int64_t>(
    int64_t& input);  // NOLINT
template std::vector<uint64_t> GlooWrapper::AllGather<uint64_t>(
    uint64_t& input);  // NOLINT
template std::vector<float> GlooWrapper::AllGather<float>(
    float& input);  // NOLINT
template std::vector<double> GlooWrapper::AllGather<double>(
    double& input);  // NOLINT

}  // namespace framework
}  // namespace paddle