gloo_wrapper.cc 10.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/framework/io/fs.h"
14
#include "paddle/fluid/string/string_helper.h"
15 16 17 18

namespace gloo {
namespace rendezvous {

19 20
constexpr int kNodeSize = 136;

21 22
HdfsStore::HdfsStore(const std::string& path) {
  path_ = path;
23
  wait_sleep_ms_ = 10000;
24
  wait_timeout_ = std::chrono::seconds(999999999);
X
xujiaqi01 已提交
25
  retry_times_ = 100;
26 27 28 29 30 31 32 33 34 35 36 37
}

void HdfsStore::set(const std::string& key, const std::vector<char>& data) {
#ifdef PADDLE_WITH_GLOO
  auto tmp = TmpPath(key);
  auto path = ObjectPath(key);
  bool is_exists = paddle::framework::fs_exists(path);
  if (is_exists) {
    LOG(WARNING) << "path exists, will be removed: " << path;
    paddle::framework::fs_remove(path);
  }
  int err_no = 0;
X
xujiaqi01 已提交
38
  for (int i = 1; i <= retry_times_; ++i) {
39
    err_no = 0;
X
xujiaqi01 已提交
40 41 42 43 44 45
    std::shared_ptr<FILE> fp =
        paddle::framework::fs_open_write(tmp, &err_no, "");
    size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get());
    if (write_count != data.size()) {
      VLOG(0) << "fwrite_unlocked failed, retry times " << i << " write_count "
              << write_count << " data.size() " << data.size();
46
      err_no = -1;
X
xujiaqi01 已提交
47 48
    }
    fp.reset();
49 50 51 52 53 54 55
    if (err_no != 0) {
      VLOG(0) << "fs_open_write failed, retry times " << i << " err no "
              << err_no;
      sleep(wait_sleep_ms_ / 1000);
      paddle::framework::fs_remove(tmp);
      if (i == retry_times_) {
        VLOG(0) << "fs_open_write failed, retry times reaches limit";
56 57 58
        PADDLE_THROW(paddle::platform::errors::PreconditionNotMet(
            "fs_open_write failed, retry times reaches %d limit.",
            retry_times_));
59 60 61 62
      }
    } else {
      break;
    }
X
xujiaqi01 已提交
63
  }
64 65 66 67
  paddle::framework::fs_mv(tmp, path);
#endif
}

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
#ifdef PADDLE_WITH_GLOO
int retry_do_func(std::function<int(void)> func, uint32_t max_try_time,
                  uint32_t retry_interval_ms) {
  for (uint32_t i = 0; i < max_try_time; ++i) {
    if (func() == 0) {
      return 0;
    }
#ifdef _LINUX
    usleep(retry_interval_ms * 1000);
#endif
  }
  return -1;
}
#endif

83 84 85 86 87 88
std::vector<char> HdfsStore::get(const std::string& key) {
  auto path = ObjectPath(key);
  std::vector<char> result;
#ifdef PADDLE_WITH_GLOO
  // block until key is set
  wait({key});
89 90 91 92
  int ret = retry_do_func(
      [&path]() { return paddle::framework::fs_exists(path) ? 0 : -1; }, 5,
      wait_sleep_ms_);
  bool is_exists = (ret == 0);
93 94 95
  PADDLE_ENFORCE_EQ(is_exists, true,
                    paddle::platform::errors::NotFound(
                        "HdfsStore::get, path not exists: " + path));
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117

  int read_status = retry_do_func(
      [&path, &result]() {
        result.clear();
        int err_no = 0;
        {
          std::shared_ptr<FILE> fp =
              paddle::framework::fs_open_read(path, &err_no, "");
          char buffer = '\0';
          size_t read_count = 0;
          while (fread(&buffer, 1, 1, fp.get()) == 1) {
            ++read_count;
            result.push_back(buffer);
          }
          VLOG(3) << "HdfsStore::get read_count " << read_count;
        }
        return err_no;
      },
      5, wait_sleep_ms_);
  PADDLE_ENFORCE_EQ(read_status, 0,
                    paddle::platform::errors::Fatal(
                        "HdfsStore::get, path read faied: " + path));
118 119 120 121 122 123 124 125 126 127 128 129 130 131
#endif
  return result;
}

void HdfsStore::wait(const std::vector<std::string>& keys) {
#ifdef PADDLE_WITH_GLOO
  wait(keys, wait_timeout_);  // NOLINT
#endif
}

void HdfsStore::wait(const std::vector<std::string>& keys,
                     const std::chrono::milliseconds&) {  // NOLINT
#ifdef PADDLE_WITH_GLOO
  auto start = std::chrono::steady_clock::now();
132 133
  std::vector<bool> check_key_status(keys.size(), false);
  while (!Check(keys, &check_key_status)) {
134 135 136
    auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::steady_clock::now() - start);
    if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) {
137 138 139 140 141 142 143
      int32_t last_check_rank = -1;
      for (size_t i = 0; i < check_key_status.size(); ++i) {
        if (!check_key_status[i]) {
          last_check_rank = i;
          break;
        }
      }
144 145 146
      PADDLE_THROW(paddle::platform::errors::ExecutionTimeout(
          "TIMEOUT self_rank = %d pair_rank = %d", self_rank_,
          last_check_rank));
147 148 149 150 151 152
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_));
  }
#endif
}

153 154 155 156
void HdfsStore::SetTimeoutSeconds(int timeout_seconds) {
  wait_timeout_ = std::chrono::seconds(timeout_seconds);
}

157
std::string HdfsStore::EncodeName(const std::string& name) {
158
  return ::paddle::string::erase_spaces(name);
159 160 161 162 163 164 165 166 167 168
}

std::string HdfsStore::TmpPath(const std::string& name) {
  return path_ + "/" + EncodeName(name) + "_tmp";
}

std::string HdfsStore::ObjectPath(const std::string& name) {
  return path_ + "/" + EncodeName(name);
}

169 170
bool HdfsStore::Check(const std::vector<std::string>& keys,
                      std::vector<bool>* keys_check_status) {
171
#ifdef PADDLE_WITH_GLOO
172
  bool ret = true;
173 174 175 176
  std::vector<std::string> paths;
  for (const auto& key : keys) {
    paths.push_back(ObjectPath(key));
  }
177 178 179 180 181
  for (size_t i = 0; i < paths.size(); ++i) {
    if ((*keys_check_status)[i]) {
      continue;
    }
    const auto& path = paths[i];
182 183 184
    bool is_exists = paddle::framework::fs_exists(path);
    VLOG(3) << "HdfsStore::Check " << is_exists << " path " << path;
    if (!is_exists) {
185
      ret = false;
186
    }
187
    (*keys_check_status)[i] = is_exists;
188
  }
189 190 191
  return ret;
#else
  VLOG(0) << "HdfsStore::Check does nothing when no gloo";
192 193 194 195
#endif
  return true;
}

196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
#ifdef PADDLE_WITH_GLOO
void ParallelConnectContext::connectFullMesh(
    Store& store, std::shared_ptr<transport::Device>& dev) {
  std::vector<char> allBytes;
  // Create pairs
  auto transportContext = dev->createContext(rank, size);
  transportContext->setTimeout(getTimeout());
  for (int i = 0; i < size; i++) {
    if (i == rank) {
      continue;
    }
    auto& pair = transportContext->createPair(i);
    auto addrBytes = pair->address().bytes();
    allBytes.insert(allBytes.end(), addrBytes.begin(), addrBytes.end());
  }
  std::ostringstream storeKey;
  storeKey << rank;
  store.set(storeKey.str(), allBytes);

215 216
  auto total_add_size = kNodeSize * (size - 1);

217 218 219 220
  std::vector<std::shared_ptr<std::thread>> connect_threads(thread_num_);
  // Connect every pair
  for (uint32_t i = 0; i < connect_threads.size(); ++i) {
    connect_threads[i].reset(new std::thread(
221 222
        [&store, &transportContext, total_add_size, this](
            size_t thread_idx, size_t thread_num) -> void {
223 224 225 226 227 228 229
          for (int i = thread_idx; i < size; i += thread_num) {
            if (i == rank) {
              continue;
            }
            // Wait for address of other side of this pair to become available
            std::string key = std::to_string(i);
            store.wait({key}, getTimeout());
230 231 232

            std::vector<char> allAddrs;
            auto max_retry_times = 5;
233
            // Connect to other side of this pair
234 235 236 237 238 239 240 241 242 243 244 245 246

            while (max_retry_times > 0) {
              allAddrs = store.get(key);

              VLOG(3) << "store get all address size: " << allAddrs.size()
                      << " except: " << total_add_size;
              if (allAddrs.size() == static_cast<size_t>(total_add_size)) {
                break;
              }

              --max_retry_times;
            }

247 248 249 250 251 252 253 254 255 256 257 258 259
            auto addr = extractAddress(allAddrs, i);
            transportContext->getPair(i)->connect(addr);
          }
        },
        i, connect_threads.size()));
  }
  for (uint32_t i = 0; i < connect_threads.size(); ++i) {
    connect_threads[i]->join();
  }
  device_ = dev;
  transportContext_ = std::move(transportContext);
}
#endif
260 261 262 263 264 265
}  // namespace rendezvous
}  // namespace gloo

namespace paddle {
namespace framework {

266
void GlooWrapper::Init() {
267 268 269 270 271
  if (is_initialized_) {
    return;
  }
#ifdef PADDLE_WITH_GLOO
  gloo::transport::tcp::attr attr;
272 273 274
  attr.iface = iface_;
  std::shared_ptr<gloo::rendezvous::HdfsStore> file_store = nullptr;
  std::shared_ptr<gloo::rendezvous::HTTPStore> http_store = nullptr;
L
lilong12 已提交
275
  auto context = std::make_shared<gloo::rendezvous::Context>(rank_, size_);
276
  context->setTimeout(run_timeout_);
277
  auto dev = gloo::transport::tcp::CreateDevice(attr);
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
  switch (store_type_) {
    case GlooStoreType::HDFS: {
      std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs");
      cmd += " -D fs.default.name=" + hdfs_name_;
      cmd += " -D hadoop.job.ugi=" + hdfs_ugi_;
      paddle::framework::hdfs_set_command(cmd);
      file_store = std::make_shared<gloo::rendezvous::HdfsStore>(hdfs_path_);
      file_store->SetTimeoutSeconds(init_timeout_.count());
      auto prefix_store =
          std::make_shared<gloo::rendezvous::PrefixStore>(prefix_, *file_store);
      context->connectFullMesh(*prefix_store, dev);
      break;
    }
    case GlooStoreType::HTTP: {
      http_store = std::make_shared<gloo::rendezvous::HTTPStore>(
          http_ip_, http_port_, prefix_ + "_" + http_scope_, rank_);
      http_store->SetTimeoutSeconds(init_timeout_.count());
      context->connectFullMesh(*http_store, dev);
      http_store->Finalize();
L
lilong12 已提交
297
      VLOG(3) << "after calling http_store->Finalize.";
298 299 300 301 302 303
      break;
    }
    default:
      LOG(ERROR) << "unknown store type " << store_type_;
      exit(-1);
  }
304 305 306
  context_ = std::move(context);
#endif
  is_initialized_ = true;
L
lilong12 已提交
307
  VLOG(3) << "gloo initialized done.";
308 309
}

X
xujiaqi01 已提交
310
template std::vector<int64_t> GlooWrapper::AllReduce<int64_t>(
311 312
    std::vector<int64_t>& sendbuf,  // NOLINT
    const std::string& mode);
313 314 315
template std::vector<float> GlooWrapper::AllReduce<float>(
    std::vector<float>& sendbuf,  // NOLINT
    const std::string& mode);
X
xujiaqi01 已提交
316
template std::vector<double> GlooWrapper::AllReduce<double>(
317
    std::vector<double>& sendbuf,  // NOLINT
X
xujiaqi01 已提交
318 319 320
    const std::string& mode);
template std::vector<uint64_t> GlooWrapper::AllReduce<uint64_t>(
    std::vector<uint64_t>& sendbuf,  // NOLINT
321 322 323
    const std::string& mode);
template std::vector<int64_t> GlooWrapper::AllGather<int64_t>(
    int64_t& input);  // NOLINT
X
xujiaqi01 已提交
324 325
template std::vector<uint64_t> GlooWrapper::AllGather<uint64_t>(
    uint64_t& input);  // NOLINT
326 327
template std::vector<float> GlooWrapper::AllGather<float>(
    float& input);  // NOLINT
328 329 330 331 332
template std::vector<double> GlooWrapper::AllGather<double>(
    double& input);  // NOLINT

}  // namespace framework
}  // namespace paddle