env_hdfs.cc 17.5 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
6
#ifdef USE_HDFS
7 8
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C
9 10 11 12 13 14 15

#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>

#include <algorithm>
#include <iostream>
#include <sstream>
16 17
#include "rocksdb/env.h"
#include "rocksdb/status.h"
18 19
#include "hdfs/env_hdfs.h"

I
Igor Canadi 已提交
20
#define HDFS_EXISTS 0
21 22
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0
I
Igor Canadi 已提交
23

24
//
25 26
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// api to access HDFS. All HDFS files created by one instance of rocksdb
A
Abhishek Kona 已提交
27
// will reside on the same HDFS cluster.
28 29
//

30
namespace rocksdb {
31 32 33 34 35 36 37 38 39 40

namespace {

// Build a Status::IOError carrying the failing path/context together
// with the strerror() description of err_number.
static Status IOError(const std::string& context, int err_number) {
  return Status::IOError(context, strerror(err_number));
}

// assume that there is one global logger for now. It is not thread-safe,
// but need not be because the logger is initialized at db-open time.
// nullptr means "no detailed hdfs logging"; NewLogger() may point this at
// an HdfsLogger, and ~HdfsLogger resets it.
static Logger* mylog = nullptr;
42 43 44

// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
I
Igor Canadi 已提交
45 46
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
47 48 49 50 51 52 53
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
A
Abhishek Kona 已提交
54
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
55 56 57
    Log(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
        filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
A
Abhishek Kona 已提交
58
    Log(mylog, "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
59 60 61 62
            filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
A
Abhishek Kona 已提交
63
    Log(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
64 65
       filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
A
Abhishek Kona 已提交
66
    Log(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
67
        filename_.c_str());
A
Abhishek Kona 已提交
68
    hfile_ = nullptr;
69 70 71
  }

  bool isValid() {
A
Abhishek Kona 已提交
72
    return hfile_ != nullptr;
73 74 75 76 77
  }

  // sequential access, read data at current offset in file
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
A
Abhishek Kona 已提交
78
    Log(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
79
        filename_.c_str(), n);
I
Igor Canadi 已提交
80 81 82 83 84 85 86 87 88 89 90

    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit error or eof
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
91
      }
I
Igor Canadi 已提交
92 93 94 95 96 97 98 99 100 101 102 103 104 105
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    Log(mylog, "[hdfs] HdfsReadableFile read %s\n", filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
106
    }
I
Igor Canadi 已提交
107

108 109 110 111 112 113 114 115
    return s;
  }

  // random access, read data from specified offset in file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    Log(mylog, "[hdfs] HdfsReadableFile preading %s\n", filename_.c_str());
A
Abhishek Kona 已提交
116
    ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
                                   (void*)scratch, (tSize)n);
    Log(mylog, "[hdfs] HdfsReadableFile pread %s\n", filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    Log(mylog, "[hdfs] HdfsReadableFile skip %s\n", filename_.c_str());
    // get current offset from file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to new offset in file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:

  // returns true if we are at the end of file, false otherwise
  bool feof() {
    Log(mylog, "[hdfs] HdfsReadableFile feof %s\n", filename_.c_str());
    if (hdfsTell(fileSys_, hfile_) == fileSize()) {
      return true;
    }
    return false;
  }

  // the current size of the file
  tOffset fileSize() {
    Log(mylog, "[hdfs] HdfsReadableFile fileSize %s\n", filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
A
Abhishek Kona 已提交
159
    if (pFileInfo != nullptr) {
160 161 162
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
I
Igor Canadi 已提交
163
      throw HdfsFatalException("fileSize on unknown file " + filename_);
164 165 166 167 168 169 170 171 172 173 174 175 176 177
    }
    return size;
  }
};

// Appends to an existing file in HDFS.
class HdfsWritableFile: public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
A
Abhishek Kona 已提交
178
      : fileSys_(fileSys), filename_(fname) , hfile_(nullptr) {
179 180 181
    Log(mylog, "[hdfs] HdfsWritableFile opening %s\n", filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    Log(mylog, "[hdfs] HdfsWritableFile opened %s\n", filename_.c_str());
A
Abhishek Kona 已提交
182
    assert(hfile_ != nullptr);
183 184
  }
  virtual ~HdfsWritableFile() {
A
Abhishek Kona 已提交
185
    if (hfile_ != nullptr) {
186 187 188
      Log(mylog, "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      Log(mylog, "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str());
A
Abhishek Kona 已提交
189
      hfile_ = nullptr;
190 191 192 193 194 195
    }
  }

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() {
A
Abhishek Kona 已提交
196
    return hfile_ != nullptr;
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
  }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() {
    return filename_;
  }

  virtual Status Append(const Slice& data) {
    Log(mylog, "[hdfs] HdfsWritableFile Append %s\n", filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    size_t ret = hdfsWrite(fileSys_, hfile_, src, left);
    Log(mylog, "[hdfs] HdfsWritableFile Appended %s\n", filename_.c_str());
    if (ret != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Flush() {
    return Status::OK();
  }

  virtual Status Sync() {
    Status s;
    Log(mylog, "[hdfs] HdfsWritableFile Sync %s\n", filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
226
    if (hdfsHSync(fileSys_, hfile_) == -1) {
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
      return IOError(filename_, errno);
    }
    Log(mylog, "[hdfs] HdfsWritableFile Synced %s\n", filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    Log(mylog, "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    Log(mylog, "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str());
A
Abhishek Kona 已提交
247
    hfile_ = nullptr;
248 249 250 251 252 253 254 255 256 257 258
    return Status::OK();
  }
};

// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

 public:
I
Igor Canadi 已提交
259 260
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
A
Abhishek Kona 已提交
261
    Log(mylog, "[hdfs] HdfsLogger opened %s\n",
262 263 264 265
            file_->getName().c_str());
  }

  virtual ~HdfsLogger() {
A
Abhishek Kona 已提交
266
    Log(mylog, "[hdfs] HdfsLogger closed %s\n",
267 268
            file_->getName().c_str());
    delete file_;
A
Abhishek Kona 已提交
269 270
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
    }
  }

  virtual void Logv(const char* format, va_list ap) {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
A
Abhishek Kona 已提交
294
      gettimeofday(&now_tv, nullptr);
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p,
                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900,
                    t.tm_mon + 1,
                    t.tm_mday,
                    t.tm_hour,
                    t.tm_min,
                    t.tm_sec,
                    static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;       // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p-base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};

}  // namespace

// Finally, the hdfs environment

I
Igor Canadi 已提交
346 347 348
// URI scheme prefix and path separator used when interpreting
// hdfs://... file names.
const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

349 350
// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  unique_ptr<SequentialFile>* result,
                                  const EnvOptions& options) {
  result->reset();
  // operator new throws on failure, so f cannot be nullptr; only the
  // hdfsOpenFile inside the constructor can fail, which isValid() reports.
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (!f->isValid()) {
    delete f;
    return IOError(fname, errno);
  }
  // Plain upcast; the previous dynamic_cast was unnecessary.
  result->reset(f);
  return Status::OK();
}

// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& options) {
  result->reset();
  // operator new throws on failure, so f cannot be nullptr; only the
  // hdfsOpenFile inside the constructor can fail, which isValid() reports.
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (!f->isValid()) {
    delete f;
    return IOError(fname, errno);
  }
  // Plain upcast; the previous dynamic_cast was unnecessary.
  result->reset(f);
  return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  // operator new throws on failure, so f cannot be nullptr; only the
  // hdfsOpenFile inside the constructor can fail, which isValid() reports.
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (!f->isValid()) {
    delete f;
    return IOError(fname, errno);
  }
  // Plain upcast; the previous dynamic_cast was unnecessary.
  result->reset(f);
  return Status::OK();
}

I
Igor Canadi 已提交
395 396 397 398 399 400
// libhdfs offers no random read/write file primitive, so this Env
// cannot implement RandomRWFile.
Status HdfsEnv::NewRandomRWFile(const std::string& fname,
                                unique_ptr<RandomRWFile>* result,
                                const EnvOptions& options) {
  return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv");
}

401 402 403
class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
404
  ~HdfsDirectory() {}
405

406
  virtual Status Fsync() { return Status::OK(); }
407 408 409 410 411

 private:
  int fd_;
};

I
Igor Canadi 已提交
412 413
// Returns a Directory handle for an existing HDFS directory; any
// hdfsExists outcome other than "exists" is treated as fatal.
Status HdfsEnv::NewDirectory(const std::string& name,
                             unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  if (value == HDFS_EXISTS) {
    result->reset(new HdfsDirectory(0));
    return Status::OK();
  }
  // fail if the directory doesn't exist
  Log(mylog, "NewDirectory hdfsExists call failed");
  throw HdfsFatalException("hdfsExists call failed with error " +
                           std::to_string(value) + " on path " + name +
                           ".\n");
}

427
bool HdfsEnv::FileExists(const std::string& fname) {
428

429
  int value = hdfsExists(fileSys_, fname.c_str());
I
Igor Canadi 已提交
430 431
  switch (value) {
    case HDFS_EXISTS:
432
    return true;
I
Igor Canadi 已提交
433 434 435 436
    case HDFS_DOESNT_EXIST:
      return false;
    default:  // anything else should be an error
      Log(mylog, "FileExists hdfsExists call failed");
437 438 439
      throw HdfsFatalException("hdfsExists call failed with error " +
                               std::to_string(value) + " on path " + fname +
                               ".\n");
440 441 442 443 444 445 446
  }
}

// Lists the basenames of the entries under path.  A missing directory
// yields an empty result; probe/list failures are fatal.
Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo =
          hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          // mName is a full path; keep only the final component.
          // strrchr replaces the legacy rindex (removed in POSIX.1-2008).
          char* pathname = pHdfsFileInfo[i].mName;
          char* filename = strrchr(pathname, '/');
          if (filename != nullptr) {
            result->push_back(filename + 1);
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates error
        Log(mylog, "hdfsListDirectory call failed with error ");
        throw HdfsFatalException(
            "hdfsListDirectory call failed negative error.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      break;
    default:  // anything else should be an error
      Log(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               std::to_string(value) + ".\n");
  }
  return Status::OK();
}

// Removes fname; recursive=1 so directories are removed as well.
Status HdfsEnv::DeleteFile(const std::string& fname) {
  int ret = hdfsDelete(fileSys_, fname.c_str(), 1);
  return (ret == 0) ? Status::OK() : IOError(fname, errno);
}

// Creates the directory (and any missing parents) on HDFS.
Status HdfsEnv::CreateDir(const std::string& name) {
  int ret = hdfsCreateDirectory(fileSys_, name.c_str());
  return (ret == 0) ? Status::OK() : IOError(name, errno);
}

494 495 496
// Create the directory only when it does not yet exist.
Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  //  Not atomic. state might change b/w hdfsExists and CreateDir.
  if (value == HDFS_EXISTS) {
    return Status::OK();
  }
  if (value == HDFS_DOESNT_EXIST) {
    return CreateDir(name);
  }
  // anything else should be an error
  Log(mylog, "CreateDirIfMissing hdfsExists call failed");
  throw HdfsFatalException("hdfsExists call failed with error " +
                           std::to_string(value) + ".\n");
}

509 510 511 512 513 514 515
// Deleting a directory delegates to DeleteFile, whose hdfsDelete call
// is already recursive.
Status HdfsEnv::DeleteDir(const std::string& name) {
  return DeleteFile(name);
};

// Reports the size of fname in bytes; *size is 0 when the lookup fails.
Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo == nullptr) {
    return IOError(fname, errno);
  }
  *size = pFileInfo->mSize;
  hdfsFreeFileInfo(pFileInfo, 1);
  return Status::OK();
}

524 525 526
// Reports the last-modification time recorded by HDFS for fname.
Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo == nullptr) {
    return IOError(fname, errno);
  }
  *time = static_cast<uint64_t>(pFileInfo->mLastMod);
  hdfsFreeFileInfo(pFileInfo, 1);
  return Status::OK();
}

536 537 538 539
// The rename is not atomic. HDFS does not allow a renaming if the
// target already exists. So, we delete the target before attempting the
// rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  int ret = hdfsRename(fileSys_, src.c_str(), target.c_str());
  return (ret == 0) ? Status::OK() : IOError(src, errno);
}

Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
  // there isn't a very good way to atomically check and create
  // a file via libhdfs, so no real lock is taken.
  *lock = nullptr;
  return Status::OK();
}

// No lock is ever actually taken (see LockFile), so there is nothing
// to release here.
Status HdfsEnv::UnlockFile(FileLock* lock) {
  return Status::OK();
}

558 559
// Creates a Logger that writes to an HDFS file.  The returned
// shared_ptr owns the HdfsLogger, which in turn owns the file.
Status HdfsEnv::NewLogger(const std::string& fname,
                          shared_ptr<Logger>* result) {
  // operator new throws on failure, so f cannot be nullptr; only the
  // open inside the constructor can fail, which isValid() reports.
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (!f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h; // uncomment this for detailed logging
  }
  return Status::OK();
}

574
}  // namespace rocksdb
575

576
#endif // ROCKSDB_HDFS_FILE_C
577 578 579 580

#else // USE_HDFS

// dummy placeholders used when HDFS is not available
581
#include "rocksdb/env.h"
582
#include "hdfs/env_hdfs.h"
583
namespace rocksdb {
584
 // Stub used when rocksdb is built without USE_HDFS: every attempt to
 // open a file through HdfsEnv fails with NotSupported.
 Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                   unique_ptr<SequentialFile>* result,
                                   const EnvOptions& options) {
   return Status::NotSupported("Not compiled with hdfs support");
 }
589 590 591
}

#endif