env_posix.cc 28.2 KB
Newer Older
J
jorlow@chromium.org 已提交
1 2 3 4 5
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <deque>
6
#include <set>
J
jorlow@chromium.org 已提交
7 8 9 10 11 12 13
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
14
#include <sys/ioctl.h>
J
jorlow@chromium.org 已提交
15 16
#include <sys/mman.h>
#include <sys/stat.h>
A
Abhishek Kona 已提交
17
#include <sys/statfs.h>
J
jorlow@chromium.org 已提交
18 19
#include <sys/time.h>
#include <sys/types.h>
A
Abhishek Kona 已提交
20
#include <sys/vfs.h>
J
jorlow@chromium.org 已提交
21 22
#include <time.h>
#include <unistd.h>
23 24 25
#if defined(OS_LINUX)
#include <linux/fs.h>
#endif
J
jorlow@chromium.org 已提交
26 27 28
#if defined(LEVELDB_PLATFORM_ANDROID)
#include <sys/stat.h>
#endif
29 30
#include "leveldb/env.h"
#include "leveldb/slice.h"
J
jorlow@chromium.org 已提交
31
#include "port/port.h"
32
#include "util/coding.h"
J
jorlow@chromium.org 已提交
33
#include "util/logging.h"
34
#include "util/posix_logger.h"
J
jorlow@chromium.org 已提交
35

A
Abhishek Kona 已提交
36 37 38 39 40 41 42 43 44 45
// Fallback definitions of filesystem magic numbers, used only when the system
// headers did not already define them. PosixEnv::SupportsFastAllocate()
// compares these against the f_type field returned by statfs().
#if !defined(TMPFS_MAGIC)
#define TMPFS_MAGIC 0x01021994
#endif
#if !defined(XFS_SUPER_MAGIC)
#define XFS_SUPER_MAGIC 0x58465342
#endif
#if !defined(EXT4_SUPER_MAGIC)
#define EXT4_SUPER_MAGIC 0xEF53
#endif

A
Abhishek Kona 已提交
46
// Global I/O tuning flags. They are defined outside any namespace and have
// external linkage, so other translation units can refer to them.
bool useOsBuffer = true;      // cache data in OS buffers
bool useFsReadAhead = true;   // allow filesystem to do readaheads
bool useMmapRead = false;     // do not use mmaps for reading files
bool useMmapWrite = true;     // use mmaps for appending to files
50

J
jorlow@chromium.org 已提交
51 52 53 54
namespace leveldb {

namespace {

55 56 57 58
// Pathnames currently locked through LockOrUnlock(). A name is inserted when
// a lock is requested and erased on unlock or when the fcntl() call fails.
static std::set<std::string> lockedFiles;
// Serializes access to lockedFiles; LockOrUnlock() holds it around both the
// set update and the fcntl() call.
static port::Mutex mutex_lockedFiles;

59 60 61 62
// Builds a Status::IOError for `context` whose detail text is the
// strerror() description of `err_number`.
static Status IOError(const std::string& context, int err_number) {
  const char* const description = strerror(err_number);
  return Status::IOError(context, description);
}

J
jorlow@chromium.org 已提交
63 64 65 66
class PosixSequentialFile: public SequentialFile {
 private:
  std::string filename_;
  FILE* file_;
67 68
  int fd_;
  bool use_os_buffer = true;
J
jorlow@chromium.org 已提交
69 70

 public:
71 72 73 74 75 76 77
  PosixSequentialFile(const std::string& fname, FILE* f,
      const EnvOptions& options)
      : filename_(fname), file_(f) {
    fd_ = fileno(f);
    assert(!options.UseMmapReads());
    use_os_buffer = options.UseOsBuffer();
  }
J
jorlow@chromium.org 已提交
78 79 80 81 82 83 84 85 86 87 88
  virtual ~PosixSequentialFile() { fclose(file_); }

  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    size_t r = fread_unlocked(scratch, 1, n, file_);
    *result = Slice(scratch, r);
    if (r < n) {
      if (feof(file_)) {
        // We leave status as ok if we hit the end of the file
      } else {
        // A partial read with an error: return a non-ok status
89
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
90 91
      }
    }
92 93 94 95 96
    if (!use_os_buffer) {
      // we need to fadvise away the entire range of pages because
      // we do not want readahead pages to be cached.
      posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
    }
J
jorlow@chromium.org 已提交
97 98
    return s;
  }
99 100 101

  virtual Status Skip(uint64_t n) {
    if (fseek(file_, n, SEEK_CUR)) {
102
      return IOError(filename_, errno);
103 104 105
    }
    return Status::OK();
  }
J
jorlow@chromium.org 已提交
106 107
};

108
// pread() based random-access
J
jorlow@chromium.org 已提交
109 110 111 112
class PosixRandomAccessFile: public RandomAccessFile {
 private:
  std::string filename_;
  int fd_;
113
  bool use_os_buffer = true;
J
jorlow@chromium.org 已提交
114 115

 public:
116 117
  PosixRandomAccessFile(const std::string& fname, int fd,
                        const EnvOptions& options)
A
Abhishek Kona 已提交
118
      : filename_(fname), fd_(fd) {
119 120
    assert(!options.UseMmapReads());
    if (!options.UseReadahead()) { // disable read-aheads
121 122
      posix_fadvise(fd, 0, 0, POSIX_FADV_RANDOM);
    }
123
    use_os_buffer = options.UseOsBuffer();
124
  }
J
jorlow@chromium.org 已提交
125 126 127 128 129 130 131 132 133
  virtual ~PosixRandomAccessFile() { close(fd_); }

  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
    *result = Slice(scratch, (r < 0) ? 0 : r);
    if (r < 0) {
      // An error: return a non-ok status
134
      s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
135
    }
136
    if (!use_os_buffer) {
137 138 139
      // we need to fadvise away the entire range of pages because
      // we do not want readahead pages to be cached.
      posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
140
    }
J
jorlow@chromium.org 已提交
141 142
    return s;
  }
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171

#if defined(OS_LINUX)
  virtual size_t GetUniqueId(char* id, size_t max_size) const {
    // TODO: possibly allow this function to handle tighter bounds.
    if (max_size < kMaxVarint64Length*3) {
      return 0;
    }

    struct stat buf;
    int result = fstat(fd_, &buf);
    if (result == -1) {
      return 0;
    }

    long version = 0;
    result = ioctl(fd_, FS_IOC_GETVERSION, &version);
    if (result == -1) {
      return 0;
    }
    uint64_t uversion = (uint64_t)version;

    char* rid = id;
    rid = EncodeVarint64(rid, buf.st_dev);
    rid = EncodeVarint64(rid, buf.st_ino);
    rid = EncodeVarint64(rid, uversion);
    assert(rid >= id);
    return static_cast<size_t>(rid-id);
  }
#endif
J
jorlow@chromium.org 已提交
172 173
};

174 175 176 177 178 179 180 181 182
// mmap() based random-access
class PosixMmapReadableFile: public RandomAccessFile {
 private:
  std::string filename_;
  void* mmapped_region_;
  size_t length_;

 public:
  // base[0,length-1] contains the mmapped contents of the file.
183 184 185 186 187 188 189
  PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
                        const EnvOptions& options)
      : filename_(fname), mmapped_region_(base), length_(length) {
    assert(options.UseMmapReads());
    assert(options.UseOsBuffer());
    assert(options.UseReadahead());
  }
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
  virtual ~PosixMmapReadableFile() { munmap(mmapped_region_, length_); }

  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    if (offset + n > length_) {
      *result = Slice();
      s = IOError(filename_, EINVAL);
    } else {
      *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
    }
    return s;
  }
};

J
jorlow@chromium.org 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
// We preallocate up to an extra megabyte and use memcpy to append new
// data to the file.  This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class PosixMmapFile : public WritableFile {
 private:
  std::string filename_;
  int fd_;
  size_t page_size_;
  size_t map_size_;       // How much extra memory to map at a time
  char* base_;            // The mapped region
  char* limit_;           // Limit of the mapped region
  char* dst_;             // Where to write next  (in range [base_,limit_])
  char* last_sync_;       // Where have we synced up to
  uint64_t file_offset_;  // Offset of base_ in file

  // Have we done an munmap of unsynced data?
  bool pending_sync_;

  // Roundup x to a multiple of y
  static size_t Roundup(size_t x, size_t y) {
    return ((x + y - 1) / y) * y;
  }

  size_t TruncateToPageBoundary(size_t s) {
    s -= (s & (page_size_ - 1));
    assert((s % page_size_) == 0);
    return s;
  }

235 236
  bool UnmapCurrentRegion() {
    bool result = true;
A
Abhishek Kona 已提交
237
    if (base_ != nullptr) {
J
jorlow@chromium.org 已提交
238 239 240 241
      if (last_sync_ < limit_) {
        // Defer syncing this data until next Sync() call, if any
        pending_sync_ = true;
      }
242 243 244
      if (munmap(base_, limit_ - base_) != 0) {
        result = false;
      }
J
jorlow@chromium.org 已提交
245
      file_offset_ += limit_ - base_;
A
Abhishek Kona 已提交
246 247 248 249
      base_ = nullptr;
      limit_ = nullptr;
      last_sync_ = nullptr;
      dst_ = nullptr;
J
jorlow@chromium.org 已提交
250 251 252 253 254 255

      // Increase the amount we map the next time, but capped at 1MB
      if (map_size_ < (1<<20)) {
        map_size_ *= 2;
      }
    }
256
    return result;
J
jorlow@chromium.org 已提交
257 258
  }

A
Abhishek Kona 已提交
259
  Status MapNewRegion() {
A
Abhishek Kona 已提交
260
    assert(base_ == nullptr);
A
Abhishek Kona 已提交
261 262 263 264 265

    int alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
    if (alloc_status != 0) {
      return Status::IOError("Error allocating space to file : " + filename_ +
        "Error : " + strerror(alloc_status));
J
jorlow@chromium.org 已提交
266
    }
A
Abhishek Kona 已提交
267 268


A
Abhishek Kona 已提交
269
    void* ptr = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
J
jorlow@chromium.org 已提交
270 271
                     fd_, file_offset_);
    if (ptr == MAP_FAILED) {
A
Abhishek Kona 已提交
272
      return Status::IOError("MMap failed on " + filename_);
J
jorlow@chromium.org 已提交
273 274 275 276 277
    }
    base_ = reinterpret_cast<char*>(ptr);
    limit_ = base_ + map_size_;
    dst_ = base_;
    last_sync_ = base_;
A
Abhishek Kona 已提交
278
    return Status::OK();
J
jorlow@chromium.org 已提交
279 280 281
  }

 public:
282 283
  PosixMmapFile(const std::string& fname, int fd, size_t page_size,
                const EnvOptions& options)
J
jorlow@chromium.org 已提交
284 285 286 287
      : filename_(fname),
        fd_(fd),
        page_size_(page_size),
        map_size_(Roundup(65536, page_size)),
A
Abhishek Kona 已提交
288 289 290 291
        base_(nullptr),
        limit_(nullptr),
        dst_(nullptr),
        last_sync_(nullptr),
J
jorlow@chromium.org 已提交
292 293 294
        file_offset_(0),
        pending_sync_(false) {
    assert((page_size & (page_size - 1)) == 0);
295
    assert(options.UseMmapWrites());
J
jorlow@chromium.org 已提交
296 297 298 299 300 301 302 303 304 305 306 307
  }


  ~PosixMmapFile() {
    if (fd_ >= 0) {
      PosixMmapFile::Close();
    }
  }

  virtual Status Append(const Slice& data) {
    const char* src = data.data();
    size_t left = data.size();
308
    PrepareWrite(GetFileSize(), left);
J
jorlow@chromium.org 已提交
309 310 311 312 313
    while (left > 0) {
      assert(base_ <= dst_);
      assert(dst_ <= limit_);
      size_t avail = limit_ - dst_;
      if (avail == 0) {
A
Abhishek Kona 已提交
314 315 316 317 318
        if (UnmapCurrentRegion()) {
          Status s = MapNewRegion();
          if (!s.ok()) {
            return s;
          }
319
        }
J
jorlow@chromium.org 已提交
320 321 322 323 324 325 326 327 328 329 330 331 332 333
      }

      size_t n = (left <= avail) ? left : avail;
      memcpy(dst_, src, n);
      dst_ += n;
      src += n;
      left -= n;
    }
    return Status::OK();
  }

  virtual Status Close() {
    Status s;
    size_t unused = limit_ - dst_;
334 335 336
    if (!UnmapCurrentRegion()) {
      s = IOError(filename_, errno);
    } else if (unused > 0) {
J
jorlow@chromium.org 已提交
337 338
      // Trim the extra space at the end of the file
      if (ftruncate(fd_, file_offset_ - unused) < 0) {
339
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
340 341 342 343 344
      }
    }

    if (close(fd_) < 0) {
      if (s.ok()) {
345
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
346 347 348 349
      }
    }

    fd_ = -1;
A
Abhishek Kona 已提交
350 351
    base_ = nullptr;
    limit_ = nullptr;
J
jorlow@chromium.org 已提交
352 353 354 355 356 357 358 359 360 361 362 363 364 365
    return s;
  }

  virtual Status Flush() {
    return Status::OK();
  }

  virtual Status Sync() {
    Status s;

    if (pending_sync_) {
      // Some unmapped data was not synced
      pending_sync_ = false;
      if (fdatasync(fd_) < 0) {
366
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
367 368 369 370 371 372 373 374 375 376
      }
    }

    if (dst_ > last_sync_) {
      // Find the beginnings of the pages that contain the first and last
      // bytes to be synced.
      size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
      size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
      last_sync_ = dst_;
      if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
377
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
378 379 380 381 382
      }
    }

    return s;
  }
383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398

  /**
   * Flush data as well as metadata to stable storage.
   */
  virtual Status Fsync() {
    if (pending_sync_) {
      // Some unmapped data was not synced
      pending_sync_ = false;
      if (fsync(fd_) < 0) {
        return IOError(filename_, errno);
      }
    }
    // This invocation to Sync will not issue the call to
    // fdatasync because pending_sync_ has already been cleared.
    return Sync();
  }
399 400 401 402 403 404 405 406 407 408

  /**
   * Get the size of valid data in the file. This will not match the
   * size that is returned from the filesystem because we use mmap
   * to extend file by map_size every time.
   */
  virtual uint64_t GetFileSize() {
    size_t used = dst_ - base_;
    return file_offset_ + used;
  }
409

410
#ifdef OS_LINUX
411 412 413 414 415 416 417
  virtual Status Allocate(off_t offset, off_t len) {
    if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
      return Status::OK();
    } else {
      return IOError(filename_, errno);
    }
  }
418
#endif
J
jorlow@chromium.org 已提交
419 420
};

421 422 423 424 425 426 427
// Use posix write to write data to a file.
class PosixWritableFile : public WritableFile {
 private:
  const std::string filename_;
  int fd_;
  size_t cursize_;      // current size of cached data in buf_
  size_t capacity_;     // max size of buf_
M
Mayank Agarwal 已提交
428
  unique_ptr<char[]> buf_;           // a buffer to cache writes
429 430 431 432 433
  uint64_t filesize_;
  bool pending_sync_;
  bool pending_fsync_;

 public:
434 435
  PosixWritableFile(const std::string& fname, int fd, size_t capacity,
                    const EnvOptions& options) :
436 437 438 439 440 441 442 443
    filename_(fname),
    fd_(fd),
    cursize_(0),
    capacity_(capacity),
    buf_(new char[capacity]),
    filesize_(0),
    pending_sync_(false),
    pending_fsync_(false) {
444
    assert(!options.UseMmapWrites());
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
  }

  ~PosixWritableFile() {
    if (fd_ >= 0) {
      PosixWritableFile::Close();
    }
  }

  virtual Status Append(const Slice& data) {
    char* src = (char *)data.data();
    size_t left = data.size();
    Status s;
    pending_sync_ = true;
    pending_fsync_ = true;

460
    PrepareWrite(GetFileSize(), left);
461 462 463 464 465 466 467 468 469
    // if there is no space in the cache, then flush
    if (cursize_ + left > capacity_) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
      // Increase the buffer size, but capped at 1MB
      if (capacity_ < (1<<20)) {
        capacity_ *= 2;
M
Mayank Agarwal 已提交
470
        buf_.reset(new char[capacity_]);
471 472 473 474 475 476 477
      }
      assert(cursize_ == 0);
    }

    // if the write fits into the cache, then write to cache
    // otherwise do a write() syscall to write to OS buffers.
    if (cursize_ + left <= capacity_) {
M
Mayank Agarwal 已提交
478
      memcpy(buf_.get()+cursize_, src, left);
479 480 481
      cursize_ += left;
    } else {
      while (left != 0) {
C
Chip Turner 已提交
482
        ssize_t done = write(fd_, src, left);
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
        if (done < 0) {
          return IOError(filename_, errno);
        }
        left -= done;
        src += done;
      }
    }
    filesize_ += data.size();
    return Status::OK();
  }

  virtual Status Close() {
    Status s;
    s = Flush(); // flush cache to OS
    if (!s.ok()) {
    }
    if (close(fd_) < 0) {
      if (s.ok()) {
        s = IOError(filename_, errno);
      }
    }
    fd_ = -1;
    return s;
  }

  // write out the cached data to the OS cache
  virtual Status Flush() {
    size_t left = cursize_;
M
Mayank Agarwal 已提交
511
    char* src = buf_.get();
512
    while (left != 0) {
C
Chip Turner 已提交
513
      ssize_t done = write(fd_, src, left);
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
      if (done < 0) {
        return IOError(filename_, errno);
      }
      left -= done;
      src += done;
    }
    cursize_ = 0;
    return Status::OK();
  }

  virtual Status Sync() {
    if (pending_sync_ && fdatasync(fd_) < 0) {
      return IOError(filename_, errno);
    }
    pending_sync_ = false;
    return Status::OK();
  }

  virtual Status Fsync() {
    if (pending_fsync_ && fsync(fd_) < 0) {
      return IOError(filename_, errno);
    }
    pending_fsync_ = false;
    pending_sync_ = false;
    return Status::OK();
  }

  virtual uint64_t GetFileSize() {
    return filesize_;
  }
544

545
#ifdef OS_LINUX
546 547 548 549 550 551 552
  virtual Status Allocate(off_t offset, off_t len) {
    if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
      return Status::OK();
    } else {
      return IOError(filename_, errno);
    }
  }
553
#endif
554 555
};

556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
// Acquires (lock == true) or releases (lock == false) a whole-file write
// lock on `fd` via fcntl(F_SETLK), tracking `fname` in lockedFiles.
//
// Returns the fcntl() result, or -1 with errno set to ENOLCK when the
// bookkeeping rejects the request: locking a name that is already recorded,
// or unlocking one that is not.
static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
  mutex_lockedFiles.Lock();

  // The set is consulted first because fcntl() does not report a conflict
  // when the same thread that already holds the lock asks for it again.
  const bool bookkeeping_ok = lock ? lockedFiles.insert(fname).second
                                   : (lockedFiles.erase(fname) == 1);
  if (!bookkeeping_ok) {
    mutex_lockedFiles.Unlock();
    errno = ENOLCK;
    return -1;
  }

  errno = 0;
  struct flock request;
  memset(&request, 0, sizeof(request));
  request.l_type = (lock ? F_WRLCK : F_UNLCK);
  request.l_whence = SEEK_SET;
  request.l_start = 0;
  request.l_len = 0;        // zero length covers the entire file

  const int rc = fcntl(fd, F_SETLK, &request);
  if (lock && rc == -1) {
    // The lock was not obtained, so take the pathname back out of the set.
    lockedFiles.erase(fname);
  }
  mutex_lockedFiles.Unlock();
  return rc;
}

// Handle for a lock taken by PosixEnv::LockFile(); UnlockFile() uses both
// fields to release the lock and then deletes the object.
class PosixFileLock : public FileLock {
 public:
  int fd_ = -1;          // descriptor holding the fcntl() lock; -1 until set
  std::string filename;  // pathname recorded in lockedFiles
};

class PosixEnv : public Env {
 public:
  PosixEnv();
  virtual ~PosixEnv() {
    fprintf(stderr, "Destroying Env::Default()\n");
    exit(1);
  }

608 609 610 611 612 613
  void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
    if ((options == nullptr || options->IsFDCloseOnExec()) && fd > 0) {
      fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
    }
  }

J
jorlow@chromium.org 已提交
614
  virtual Status NewSequentialFile(const std::string& fname,
615 616
                                   unique_ptr<SequentialFile>* result,
                                   const EnvOptions& options) {
617
    result->reset();
J
jorlow@chromium.org 已提交
618
    FILE* f = fopen(fname.c_str(), "r");
A
Abhishek Kona 已提交
619 620
    if (f == nullptr) {
      *result = nullptr;
621
      return IOError(fname, errno);
J
jorlow@chromium.org 已提交
622
    } else {
623 624
      int fd = fileno(f);
      SetFD_CLOEXEC(fd, &options);
625
      result->reset(new PosixSequentialFile(fname, f, options));
J
jorlow@chromium.org 已提交
626 627 628 629 630
      return Status::OK();
    }
  }

  virtual Status NewRandomAccessFile(const std::string& fname,
631 632
                                     unique_ptr<RandomAccessFile>* result,
                                     const EnvOptions& options) {
633
    result->reset();
634
    Status s;
J
jorlow@chromium.org 已提交
635
    int fd = open(fname.c_str(), O_RDONLY);
636
    SetFD_CLOEXEC(fd, &options);
J
jorlow@chromium.org 已提交
637
    if (fd < 0) {
638
      s = IOError(fname, errno);
639
    } else if (options.UseMmapReads() && sizeof(void*) >= 8) {
640 641 642 643 644 645
      // Use of mmap for random reads has been removed because it
      // kills performance when storage is fast.
      // Use mmap when virtual address-space is plentiful.
      uint64_t size;
      s = GetFileSize(fname, &size);
      if (s.ok()) {
A
Abhishek Kona 已提交
646
        void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
647
        if (base != MAP_FAILED) {
648
          result->reset(new PosixMmapReadableFile(fname, base, size, options));
649 650 651 652 653
        } else {
          s = IOError(fname, errno);
        }
      }
      close(fd);
654
    } else {
655
      result->reset(new PosixRandomAccessFile(fname, fd, options));
J
jorlow@chromium.org 已提交
656
    }
657
    return s;
J
jorlow@chromium.org 已提交
658 659 660
  }

  virtual Status NewWritableFile(const std::string& fname,
661 662
                                 unique_ptr<WritableFile>* result,
                                 const EnvOptions& options) {
663
    result->reset();
J
jorlow@chromium.org 已提交
664 665 666
    Status s;
    const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
    if (fd < 0) {
667
      s = IOError(fname, errno);
J
jorlow@chromium.org 已提交
668
    } else {
669
      SetFD_CLOEXEC(fd, &options);
670 671 672
      if (options.UseMmapWrites()) {
        if (!checkedDiskForMmap_) {
          // this will be executed once in the program's lifetime.
A
Abhishek Kona 已提交
673
          // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
674 675 676 677
          if (!SupportsFastAllocate(fname)) {
            forceMmapOff = true;
          }
          checkedDiskForMmap_ = true;
A
Abhishek Kona 已提交
678 679
        }
      }
680 681
      if (options.UseMmapWrites() && !forceMmapOff) {
        result->reset(new PosixMmapFile(fname, fd, page_size_, options));
682
      } else {
683
        result->reset(new PosixWritableFile(fname, fd, 65536, options));
684
      }
J
jorlow@chromium.org 已提交
685 686 687 688 689 690 691 692 693 694 695 696
    }
    return s;
  }

  virtual bool FileExists(const std::string& fname) {
    return access(fname.c_str(), F_OK) == 0;
  }

  virtual Status GetChildren(const std::string& dir,
                             std::vector<std::string>* result) {
    result->clear();
    DIR* d = opendir(dir.c_str());
A
Abhishek Kona 已提交
697
    if (d == nullptr) {
698
      return IOError(dir, errno);
J
jorlow@chromium.org 已提交
699 700
    }
    struct dirent* entry;
A
Abhishek Kona 已提交
701
    while ((entry = readdir(d)) != nullptr) {
J
jorlow@chromium.org 已提交
702 703 704 705 706 707 708 709 710
      result->push_back(entry->d_name);
    }
    closedir(d);
    return Status::OK();
  }

  virtual Status DeleteFile(const std::string& fname) {
    Status result;
    if (unlink(fname.c_str()) != 0) {
711
      result = IOError(fname, errno);
J
jorlow@chromium.org 已提交
712 713 714 715 716 717 718
    }
    return result;
  };

  virtual Status CreateDir(const std::string& name) {
    Status result;
    if (mkdir(name.c_str(), 0755) != 0) {
719
      result = IOError(name, errno);
J
jorlow@chromium.org 已提交
720 721 722 723
    }
    return result;
  };

724 725 726 727 728
  virtual Status CreateDirIfMissing(const std::string& name) {
    Status result;
    if (mkdir(name.c_str(), 0755) != 0) {
      if (errno != EEXIST) {
        result = IOError(name, errno);
729 730 731 732
      } else if (!DirExists(name)) { // Check that name is actually a
                                     // directory.
        // Message is taken from mkdir
        result = Status::IOError("`"+name+"' exists but is not a directory");
733 734 735 736 737
      }
    }
    return result;
  };

J
jorlow@chromium.org 已提交
738 739 740
  virtual Status DeleteDir(const std::string& name) {
    Status result;
    if (rmdir(name.c_str()) != 0) {
741
      result = IOError(name, errno);
J
jorlow@chromium.org 已提交
742 743 744 745 746 747 748 749 750
    }
    return result;
  };

  virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
    Status s;
    struct stat sbuf;
    if (stat(fname.c_str(), &sbuf) != 0) {
      *size = 0;
751
      s = IOError(fname, errno);
J
jorlow@chromium.org 已提交
752 753 754 755 756 757
    } else {
      *size = sbuf.st_size;
    }
    return s;
  }

758 759 760 761 762 763 764 765 766
  virtual Status GetFileModificationTime(const std::string& fname,
                                         uint64_t* file_mtime) {
    struct stat s;
    if (stat(fname.c_str(), &s) !=0) {
      return IOError(fname, errno);
    }
    *file_mtime = static_cast<uint64_t>(s.st_mtime);
    return Status::OK();
  }
J
jorlow@chromium.org 已提交
767 768 769
  virtual Status RenameFile(const std::string& src, const std::string& target) {
    Status result;
    if (rename(src.c_str(), target.c_str()) != 0) {
770
      result = IOError(src, errno);
J
jorlow@chromium.org 已提交
771 772 773 774 775
    }
    return result;
  }

  virtual Status LockFile(const std::string& fname, FileLock** lock) {
A
Abhishek Kona 已提交
776
    *lock = nullptr;
J
jorlow@chromium.org 已提交
777 778 779
    Status result;
    int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
    if (fd < 0) {
780
      result = IOError(fname, errno);
781
    } else if (LockOrUnlock(fname, fd, true) == -1) {
782
      result = IOError("lock " + fname, errno);
J
jorlow@chromium.org 已提交
783 784
      close(fd);
    } else {
785
      SetFD_CLOEXEC(fd, nullptr);
J
jorlow@chromium.org 已提交
786 787
      PosixFileLock* my_lock = new PosixFileLock;
      my_lock->fd_ = fd;
788
      my_lock->filename = fname;
J
jorlow@chromium.org 已提交
789 790 791 792 793 794 795 796
      *lock = my_lock;
    }
    return result;
  }

  virtual Status UnlockFile(FileLock* lock) {
    PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
    Status result;
797
    if (LockOrUnlock(my_lock->filename, my_lock->fd_, false) == -1) {
798
      result = IOError("unlock", errno);
J
jorlow@chromium.org 已提交
799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822
    }
    close(my_lock->fd_);
    delete my_lock;
    return result;
  }

  virtual void Schedule(void (*function)(void*), void* arg);

  virtual void StartThread(void (*function)(void* arg), void* arg);

  virtual Status GetTestDirectory(std::string* result) {
    const char* env = getenv("TEST_TMPDIR");
    if (env && env[0] != '\0') {
      *result = env;
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
      *result = buf;
    }
    // Directory may already exist
    CreateDir(*result);
    return Status::OK();
  }

823
  static uint64_t gettid() {
J
jorlow@chromium.org 已提交
824 825
    pthread_t tid = pthread_self();
    uint64_t thread_id = 0;
J
jorlow@chromium.org 已提交
826
    memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
827 828
    return thread_id;
  }
J
jorlow@chromium.org 已提交
829

830 831
  virtual Status NewLogger(const std::string& fname,
                           shared_ptr<Logger>* result) {
832
    FILE* f = fopen(fname.c_str(), "w");
A
Abhishek Kona 已提交
833
    if (f == nullptr) {
834
      result->reset();
835 836
      return IOError(fname, errno);
    } else {
837 838
      int fd = fileno(f);
      SetFD_CLOEXEC(fd, nullptr);
839
      result->reset(new PosixLogger(f, &PosixEnv::gettid));
840
      return Status::OK();
J
jorlow@chromium.org 已提交
841 842 843 844 845
    }
  }

  virtual uint64_t NowMicros() {
    struct timeval tv;
A
Abhishek Kona 已提交
846
    gettimeofday(&tv, nullptr);
J
jorlow@chromium.org 已提交
847 848 849 850 851 852 853
    return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  }

  virtual void SleepForMicroseconds(int micros) {
    usleep(micros);
  }

854
  virtual Status GetHostName(char* name, uint64_t len) {
855 856 857 858 859 860 861 862 863 864 865
    int ret = gethostname(name, len);
    if (ret < 0) {
      if (errno == EFAULT || errno == EINVAL)
        return Status::InvalidArgument(strerror(errno));
      else
        return IOError("GetHostName", errno);
    }
    return Status::OK();
  }

  virtual Status GetCurrentTime(int64_t* unix_time) {
A
Abhishek Kona 已提交
866
    time_t ret = time(nullptr);
867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882
    if (ret == (time_t) -1) {
      return IOError("GetCurrentTime", errno);
    }
    *unix_time = (int64_t) ret;
    return Status::OK();
  }

  virtual Status GetAbsolutePath(const std::string& db_path,
      std::string* output_path) {
    if (db_path.find('/') == 0) {
      *output_path = db_path;
      return Status::OK();
    }

    char the_path[256];
    char* ret = getcwd(the_path, 256);
A
Abhishek Kona 已提交
883
    if (ret == nullptr) {
884 885 886 887 888 889 890
      return Status::IOError(strerror(errno));
    }

    *output_path = ret;
    return Status::OK();
  }

A
Abhishek Kona 已提交
891
  // Allow increasing the number of worker threads.
892
  virtual void SetBackgroundThreads(int num) {
H
Haobo Xu 已提交
893
    PthreadCall("lock", pthread_mutex_lock(&mu_));
894 895 896 897
    if (num > num_threads_) {
      num_threads_ = num;
      bgthread_.resize(num_threads_);
    }
H
Haobo Xu 已提交
898
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
899 900
  }

901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920
  virtual std::string TimeToString(uint64_t secondsSince1970) {
    const time_t seconds = (time_t)secondsSince1970;
    struct tm t;
    int maxsize = 64;
    std::string dummy;
    dummy.reserve(maxsize);
    dummy.resize(maxsize);
    char* p = &dummy[0];
    localtime_r(&seconds, &t);
    snprintf(p, maxsize,
             "%04d/%02d/%02d-%02d:%02d:%02d ",
             t.tm_year + 1900,
             t.tm_mon + 1,
             t.tm_mday,
             t.tm_hour,
             t.tm_min,
             t.tm_sec);
    return dummy;
  }

J
jorlow@chromium.org 已提交
921
 private:
922 923
  bool checkedDiskForMmap_;
  bool forceMmapOff; // do we override Env options?
A
Abhishek Kona 已提交
924

J
jorlow@chromium.org 已提交
925 926 927 928 929 930 931
  void PthreadCall(const char* label, int result) {
    if (result != 0) {
      fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
      exit(1);
    }
  }

932 933 934 935 936 937 938 939 940 941
  // Returns true iff the named directory exists and is a directory.
  virtual bool DirExists(const std::string& dname) {
    struct stat statbuf;
    if (stat(dname.c_str(), &statbuf) == 0) {
      return S_ISDIR(statbuf.st_mode);
    }
    return false; // stat() failed return false
  }


J
jorlow@chromium.org 已提交
942 943 944 945
  // BGThread() is the body of the background thread
  void BGThread();
  static void* BGThreadWrapper(void* arg) {
    reinterpret_cast<PosixEnv*>(arg)->BGThread();
A
Abhishek Kona 已提交
946
    return nullptr;
J
jorlow@chromium.org 已提交
947 948
  }

A
Abhishek Kona 已提交
949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965
  bool SupportsFastAllocate(const std::string& path) {
    struct statfs s;
    if (statfs(path.c_str(), &s)){
      return false;
    }
    switch (s.f_type) {
      case EXT4_SUPER_MAGIC:
        return true;
      case XFS_SUPER_MAGIC:
        return true;
      case TMPFS_MAGIC:
        return true;
      default:
        return false;
    }
  }

J
jorlow@chromium.org 已提交
966 967 968
  size_t page_size_;
  pthread_mutex_t mu_;
  pthread_cond_t bgsignal_;
969 970 971
  std::vector<pthread_t> bgthread_;
  int started_bgthread_;
  int num_threads_;
J
jorlow@chromium.org 已提交
972 973 974 975 976 977 978

  // Entry per Schedule() call
  struct BGItem { void* arg; void (*function)(void*); };
  typedef std::deque<BGItem> BGQueue;
  BGQueue queue_;
};

979 980 981
// Sets up an environment with a single-slot background thread pool; no
// thread is created here. Initialization of the mutex and condition
// variable goes through PthreadCall(), which exits the process on failure.
PosixEnv::PosixEnv()
    : checkedDiskForMmap_(false),
      forceMmapOff(false),
      page_size_(getpagesize()),
      started_bgthread_(0),
      num_threads_(1) {
  const int mutex_rc = pthread_mutex_init(&mu_, nullptr);
  PthreadCall("mutex_init", mutex_rc);
  const int cond_rc = pthread_cond_init(&bgsignal_, nullptr);
  PthreadCall("cvar_init", cond_rc);
  bgthread_.resize(num_threads_);
}

void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  PthreadCall("lock", pthread_mutex_lock(&mu_));

  // Start background thread if necessary
993
  for (; started_bgthread_ < num_threads_; started_bgthread_++) {
J
jorlow@chromium.org 已提交
994 995
    PthreadCall(
        "create thread",
A
Abhishek Kona 已提交
996 997 998 999
        pthread_create(&bgthread_[started_bgthread_],
                       nullptr,
                       &PosixEnv::BGThreadWrapper,
                       this));
1000
    fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]);
J
jorlow@chromium.org 已提交
1001 1002 1003 1004 1005 1006
  }

  // Add to priority queue
  queue_.push_back(BGItem());
  queue_.back().function = function;
  queue_.back().arg = arg;
H
Haobo Xu 已提交
1007 1008 1009

  // always wake up at least one waiting thread.
  PthreadCall("signal", pthread_cond_signal(&bgsignal_));
J
jorlow@chromium.org 已提交
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040

  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}

void PosixEnv::BGThread() {
  while (true) {
    // Wait until there is an item that is ready to run
    PthreadCall("lock", pthread_mutex_lock(&mu_));
    while (queue_.empty()) {
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
    }

    void (*function)(void*) = queue_.front().function;
    void* arg = queue_.front().arg;
    queue_.pop_front();

    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    (*function)(arg);
  }
}

namespace {
// Heap-allocated hand-off record for a newly started thread: the user's
// callback and the argument it should receive.
struct StartThreadState {
  void (*user_function)(void*);
  void* arg;
};
}  // namespace

// pthread-compatible entry point. `arg` must point to a StartThreadState
// allocated with new; the callback is run and the state is then deleted.
static void* StartThreadWrapper(void* arg) {
  StartThreadState* const ctx = static_cast<StartThreadState*>(arg);
  (*ctx->user_function)(ctx->arg);
  delete ctx;
  return nullptr;
}

void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  pthread_t t;
  StartThreadState* state = new StartThreadState;
  state->user_function = function;
  state->arg = arg;
  PthreadCall("start thread",
A
Abhishek Kona 已提交
1050
              pthread_create(&t, nullptr,  &StartThreadWrapper, state));
J
jorlow@chromium.org 已提交
1051 1052
}

H
Hans Wennborg 已提交
1053
}  // namespace
J
jorlow@chromium.org 已提交
1054 1055 1056 1057 1058 1059 1060 1061 1062 1063

static pthread_once_t once = PTHREAD_ONCE_INIT;
static Env* default_env;
static void InitDefaultEnv() { default_env = new PosixEnv; }

Env* Env::Default() {
  pthread_once(&once, InitDefaultEnv);
  return default_env;
}

H
Hans Wennborg 已提交
1064
}  // namespace leveldb