// env_posix.cc
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <assert.h>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/statfs.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/vfs.h>
#include <time.h>
#include <unistd.h>
#if defined(OS_LINUX)
#include <linux/fs.h>
#endif
#if defined(LEVELDB_PLATFORM_ANDROID)
#include <sys/stat.h>
#endif

#include <algorithm>
#include <deque>
#include <memory>
#include <set>
#include <string>
#include <vector>

#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "port/port.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/posix_logger.h"

// statfs() f_type values compared in SupportsFastAllocate(). These are
// fallback definitions, used only when the system headers have not
// already provided the macros.
#ifndef TMPFS_MAGIC
#define TMPFS_MAGIC 0x01021994
#endif

#ifndef XFS_SUPER_MAGIC
#define XFS_SUPER_MAGIC 0x58465342
#endif

#ifndef EXT4_SUPER_MAGIC
#define EXT4_SUPER_MAGIC 0xEF53
#endif

// Global I/O tuning flags with external linkage. They are not referenced in
// this file; presumably other translation units read them (e.g. when
// building EnvOptions) -- TODO confirm against callers.
bool useOsBuffer = true;     // cache data in OS buffers
bool useFsReadAhead = true;  // allow filesystem to do readaheads
bool useMmapRead = false;    // do not use mmaps for reading files
bool useMmapWrite = true;    // use mmaps for appending to files
50

J
jorlow@chromium.org 已提交
51 52 53 54
namespace leveldb {

namespace {

55 56 57 58
// list of pathnames that are locked
static std::set<std::string> lockedFiles;
static port::Mutex mutex_lockedFiles;

59 60 61 62
static Status IOError(const std::string& context, int err_number) {
  return Status::IOError(context, strerror(err_number));
}

J
jorlow@chromium.org 已提交
63 64 65 66
class PosixSequentialFile: public SequentialFile {
 private:
  std::string filename_;
  FILE* file_;
67 68
  int fd_;
  bool use_os_buffer = true;
J
jorlow@chromium.org 已提交
69 70

 public:
71 72 73 74 75 76 77
  PosixSequentialFile(const std::string& fname, FILE* f,
      const EnvOptions& options)
      : filename_(fname), file_(f) {
    fd_ = fileno(f);
    assert(!options.UseMmapReads());
    use_os_buffer = options.UseOsBuffer();
  }
J
jorlow@chromium.org 已提交
78 79 80 81 82 83 84 85 86 87 88
  virtual ~PosixSequentialFile() { fclose(file_); }

  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    size_t r = fread_unlocked(scratch, 1, n, file_);
    *result = Slice(scratch, r);
    if (r < n) {
      if (feof(file_)) {
        // We leave status as ok if we hit the end of the file
      } else {
        // A partial read with an error: return a non-ok status
89
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
90 91
      }
    }
92 93 94 95 96
    if (!use_os_buffer) {
      // we need to fadvise away the entire range of pages because
      // we do not want readahead pages to be cached.
      posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
    }
J
jorlow@chromium.org 已提交
97 98
    return s;
  }
99 100 101

  virtual Status Skip(uint64_t n) {
    if (fseek(file_, n, SEEK_CUR)) {
102
      return IOError(filename_, errno);
103 104 105
    }
    return Status::OK();
  }
J
jorlow@chromium.org 已提交
106 107
};

108
// pread() based random-access
J
jorlow@chromium.org 已提交
109 110 111 112
class PosixRandomAccessFile: public RandomAccessFile {
 private:
  std::string filename_;
  int fd_;
113
  bool use_os_buffer = true;
J
jorlow@chromium.org 已提交
114 115

 public:
116 117
  PosixRandomAccessFile(const std::string& fname, int fd,
                        const EnvOptions& options)
A
Abhishek Kona 已提交
118
      : filename_(fname), fd_(fd) {
119 120
    assert(!options.UseMmapReads());
    if (!options.UseReadahead()) { // disable read-aheads
121 122
      posix_fadvise(fd, 0, 0, POSIX_FADV_RANDOM);
    }
123
    use_os_buffer = options.UseOsBuffer();
124
  }
J
jorlow@chromium.org 已提交
125 126 127 128 129 130 131 132 133
  virtual ~PosixRandomAccessFile() { close(fd_); }

  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
    *result = Slice(scratch, (r < 0) ? 0 : r);
    if (r < 0) {
      // An error: return a non-ok status
134
      s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
135
    }
136
    if (!use_os_buffer) {
137 138 139
      // we need to fadvise away the entire range of pages because
      // we do not want readahead pages to be cached.
      posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
140
    }
J
jorlow@chromium.org 已提交
141 142
    return s;
  }
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171

#if defined(OS_LINUX)
  virtual size_t GetUniqueId(char* id, size_t max_size) const {
    // TODO: possibly allow this function to handle tighter bounds.
    if (max_size < kMaxVarint64Length*3) {
      return 0;
    }

    struct stat buf;
    int result = fstat(fd_, &buf);
    if (result == -1) {
      return 0;
    }

    long version = 0;
    result = ioctl(fd_, FS_IOC_GETVERSION, &version);
    if (result == -1) {
      return 0;
    }
    uint64_t uversion = (uint64_t)version;

    char* rid = id;
    rid = EncodeVarint64(rid, buf.st_dev);
    rid = EncodeVarint64(rid, buf.st_ino);
    rid = EncodeVarint64(rid, uversion);
    assert(rid >= id);
    return static_cast<size_t>(rid-id);
  }
#endif
J
jorlow@chromium.org 已提交
172 173
};

174 175 176 177 178 179 180 181 182
// mmap() based random-access
class PosixMmapReadableFile: public RandomAccessFile {
 private:
  std::string filename_;
  void* mmapped_region_;
  size_t length_;

 public:
  // base[0,length-1] contains the mmapped contents of the file.
183 184 185 186 187 188 189
  PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
                        const EnvOptions& options)
      : filename_(fname), mmapped_region_(base), length_(length) {
    assert(options.UseMmapReads());
    assert(options.UseOsBuffer());
    assert(options.UseReadahead());
  }
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
  virtual ~PosixMmapReadableFile() { munmap(mmapped_region_, length_); }

  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    if (offset + n > length_) {
      *result = Slice();
      s = IOError(filename_, EINVAL);
    } else {
      *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
    }
    return s;
  }
};

J
jorlow@chromium.org 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
// We preallocate up to an extra megabyte and use memcpy to append new
// data to the file.  This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class PosixMmapFile : public WritableFile {
 private:
  std::string filename_;
  int fd_;
  size_t page_size_;
  size_t map_size_;       // How much extra memory to map at a time
  char* base_;            // The mapped region
  char* limit_;           // Limit of the mapped region
  char* dst_;             // Where to write next  (in range [base_,limit_])
  char* last_sync_;       // Where have we synced up to
  uint64_t file_offset_;  // Offset of base_ in file

  // Have we done an munmap of unsynced data?
  bool pending_sync_;

  // Roundup x to a multiple of y
  static size_t Roundup(size_t x, size_t y) {
    return ((x + y - 1) / y) * y;
  }

  size_t TruncateToPageBoundary(size_t s) {
    s -= (s & (page_size_ - 1));
    assert((s % page_size_) == 0);
    return s;
  }

235 236
  bool UnmapCurrentRegion() {
    bool result = true;
A
Abhishek Kona 已提交
237
    if (base_ != nullptr) {
J
jorlow@chromium.org 已提交
238 239 240 241
      if (last_sync_ < limit_) {
        // Defer syncing this data until next Sync() call, if any
        pending_sync_ = true;
      }
242 243 244
      if (munmap(base_, limit_ - base_) != 0) {
        result = false;
      }
J
jorlow@chromium.org 已提交
245
      file_offset_ += limit_ - base_;
A
Abhishek Kona 已提交
246 247 248 249
      base_ = nullptr;
      limit_ = nullptr;
      last_sync_ = nullptr;
      dst_ = nullptr;
J
jorlow@chromium.org 已提交
250 251 252 253 254 255

      // Increase the amount we map the next time, but capped at 1MB
      if (map_size_ < (1<<20)) {
        map_size_ *= 2;
      }
    }
256
    return result;
J
jorlow@chromium.org 已提交
257 258
  }

A
Abhishek Kona 已提交
259
  Status MapNewRegion() {
A
Abhishek Kona 已提交
260
    assert(base_ == nullptr);
A
Abhishek Kona 已提交
261 262 263 264 265

    int alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
    if (alloc_status != 0) {
      return Status::IOError("Error allocating space to file : " + filename_ +
        "Error : " + strerror(alloc_status));
J
jorlow@chromium.org 已提交
266
    }
A
Abhishek Kona 已提交
267 268


A
Abhishek Kona 已提交
269
    void* ptr = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
J
jorlow@chromium.org 已提交
270 271
                     fd_, file_offset_);
    if (ptr == MAP_FAILED) {
A
Abhishek Kona 已提交
272
      return Status::IOError("MMap failed on " + filename_);
J
jorlow@chromium.org 已提交
273 274 275 276 277
    }
    base_ = reinterpret_cast<char*>(ptr);
    limit_ = base_ + map_size_;
    dst_ = base_;
    last_sync_ = base_;
A
Abhishek Kona 已提交
278
    return Status::OK();
J
jorlow@chromium.org 已提交
279 280 281
  }

 public:
282 283
  PosixMmapFile(const std::string& fname, int fd, size_t page_size,
                const EnvOptions& options)
J
jorlow@chromium.org 已提交
284 285 286 287
      : filename_(fname),
        fd_(fd),
        page_size_(page_size),
        map_size_(Roundup(65536, page_size)),
A
Abhishek Kona 已提交
288 289 290 291
        base_(nullptr),
        limit_(nullptr),
        dst_(nullptr),
        last_sync_(nullptr),
J
jorlow@chromium.org 已提交
292 293 294
        file_offset_(0),
        pending_sync_(false) {
    assert((page_size & (page_size - 1)) == 0);
295
    assert(options.UseMmapWrites());
J
jorlow@chromium.org 已提交
296 297 298 299 300 301 302 303 304 305 306 307
  }


  ~PosixMmapFile() {
    if (fd_ >= 0) {
      PosixMmapFile::Close();
    }
  }

  virtual Status Append(const Slice& data) {
    const char* src = data.data();
    size_t left = data.size();
308
    PrepareWrite(GetFileSize(), left);
J
jorlow@chromium.org 已提交
309 310 311 312 313
    while (left > 0) {
      assert(base_ <= dst_);
      assert(dst_ <= limit_);
      size_t avail = limit_ - dst_;
      if (avail == 0) {
A
Abhishek Kona 已提交
314 315 316 317 318
        if (UnmapCurrentRegion()) {
          Status s = MapNewRegion();
          if (!s.ok()) {
            return s;
          }
319
        }
J
jorlow@chromium.org 已提交
320 321 322 323 324 325 326 327 328 329 330 331 332 333
      }

      size_t n = (left <= avail) ? left : avail;
      memcpy(dst_, src, n);
      dst_ += n;
      src += n;
      left -= n;
    }
    return Status::OK();
  }

  virtual Status Close() {
    Status s;
    size_t unused = limit_ - dst_;
334 335 336
    if (!UnmapCurrentRegion()) {
      s = IOError(filename_, errno);
    } else if (unused > 0) {
J
jorlow@chromium.org 已提交
337 338
      // Trim the extra space at the end of the file
      if (ftruncate(fd_, file_offset_ - unused) < 0) {
339
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
340 341 342 343 344
      }
    }

    if (close(fd_) < 0) {
      if (s.ok()) {
345
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
346 347 348 349
      }
    }

    fd_ = -1;
A
Abhishek Kona 已提交
350 351
    base_ = nullptr;
    limit_ = nullptr;
J
jorlow@chromium.org 已提交
352 353 354 355 356 357 358 359 360 361 362 363 364 365
    return s;
  }

  virtual Status Flush() {
    return Status::OK();
  }

  virtual Status Sync() {
    Status s;

    if (pending_sync_) {
      // Some unmapped data was not synced
      pending_sync_ = false;
      if (fdatasync(fd_) < 0) {
366
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
367 368 369 370 371 372 373 374 375 376
      }
    }

    if (dst_ > last_sync_) {
      // Find the beginnings of the pages that contain the first and last
      // bytes to be synced.
      size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
      size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
      last_sync_ = dst_;
      if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
377
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
378 379 380 381 382
      }
    }

    return s;
  }
383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398

  /**
   * Flush data as well as metadata to stable storage.
   */
  virtual Status Fsync() {
    if (pending_sync_) {
      // Some unmapped data was not synced
      pending_sync_ = false;
      if (fsync(fd_) < 0) {
        return IOError(filename_, errno);
      }
    }
    // This invocation to Sync will not issue the call to
    // fdatasync because pending_sync_ has already been cleared.
    return Sync();
  }
399 400 401 402 403 404 405 406 407 408

  /**
   * Get the size of valid data in the file. This will not match the
   * size that is returned from the filesystem because we use mmap
   * to extend file by map_size every time.
   */
  virtual uint64_t GetFileSize() {
    size_t used = dst_ - base_;
    return file_offset_ + used;
  }
409

410
#ifdef OS_LINUX
411 412 413 414 415 416 417
  virtual Status Allocate(off_t offset, off_t len) {
    if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
      return Status::OK();
    } else {
      return IOError(filename_, errno);
    }
  }
418
#endif
J
jorlow@chromium.org 已提交
419 420
};

421 422 423 424 425 426 427 428 429 430 431 432 433
// Use posix write to write data to a file.
class PosixWritableFile : public WritableFile {
 private:
  const std::string filename_;
  int fd_;
  size_t cursize_;      // current size of cached data in buf_
  size_t capacity_;     // max size of buf_
  char* buf_;           // a buffer to cache writes
  uint64_t filesize_;
  bool pending_sync_;
  bool pending_fsync_;

 public:
434 435
  PosixWritableFile(const std::string& fname, int fd, size_t capacity,
                    const EnvOptions& options) :
436 437 438 439 440 441 442 443
    filename_(fname),
    fd_(fd),
    cursize_(0),
    capacity_(capacity),
    buf_(new char[capacity]),
    filesize_(0),
    pending_sync_(false),
    pending_fsync_(false) {
444
    assert(!options.UseMmapWrites());
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
  }

  ~PosixWritableFile() {
    if (fd_ >= 0) {
      PosixWritableFile::Close();
    }
    delete buf_;
    buf_ = 0;
  }

  virtual Status Append(const Slice& data) {
    char* src = (char *)data.data();
    size_t left = data.size();
    Status s;
    pending_sync_ = true;
    pending_fsync_ = true;

462
    PrepareWrite(GetFileSize(), left);
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484
    // if there is no space in the cache, then flush
    if (cursize_ + left > capacity_) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
      // Increase the buffer size, but capped at 1MB
      if (capacity_ < (1<<20)) {
        delete buf_;
        capacity_ *= 2;
        buf_ = new char[capacity_];
      }
      assert(cursize_ == 0);
    }

    // if the write fits into the cache, then write to cache
    // otherwise do a write() syscall to write to OS buffers.
    if (cursize_ + left <= capacity_) {
      memcpy(buf_+cursize_, src, left);
      cursize_ += left;
    } else {
      while (left != 0) {
C
Chip Turner 已提交
485
        ssize_t done = write(fd_, src, left);
486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515
        if (done < 0) {
          return IOError(filename_, errno);
        }
        left -= done;
        src += done;
      }
    }
    filesize_ += data.size();
    return Status::OK();
  }

  virtual Status Close() {
    Status s;
    s = Flush(); // flush cache to OS
    if (!s.ok()) {
    }
    if (close(fd_) < 0) {
      if (s.ok()) {
        s = IOError(filename_, errno);
      }
    }
    fd_ = -1;
    return s;
  }

  // write out the cached data to the OS cache
  virtual Status Flush() {
    size_t left = cursize_;
    char* src = buf_;
    while (left != 0) {
C
Chip Turner 已提交
516
      ssize_t done = write(fd_, src, left);
517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546
      if (done < 0) {
        return IOError(filename_, errno);
      }
      left -= done;
      src += done;
    }
    cursize_ = 0;
    return Status::OK();
  }

  virtual Status Sync() {
    if (pending_sync_ && fdatasync(fd_) < 0) {
      return IOError(filename_, errno);
    }
    pending_sync_ = false;
    return Status::OK();
  }

  virtual Status Fsync() {
    if (pending_fsync_ && fsync(fd_) < 0) {
      return IOError(filename_, errno);
    }
    pending_fsync_ = false;
    pending_sync_ = false;
    return Status::OK();
  }

  virtual uint64_t GetFileSize() {
    return filesize_;
  }
547

548
#ifdef OS_LINUX
549 550 551 552 553 554 555
  virtual Status Allocate(off_t offset, off_t len) {
    if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
      return Status::OK();
    } else {
      return IOError(filename_, errno);
    }
  }
556
#endif
557 558
};

559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580
static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
  mutex_lockedFiles.Lock();
  if (lock) {
    // If it already exists in the lockedFiles set, then it is already locked,
    // and fail this lock attempt. Otherwise, insert it into lockedFiles.
    // This check is needed because fcntl() does not detect lock conflict
    // if the fcntl is issued by the same thread that earlier acquired
    // this lock.
    if (lockedFiles.insert(fname).second == false) {
      mutex_lockedFiles.Unlock();
      errno = ENOLCK;
      return -1;
    }
  } else {
    // If we are unlocking, then verify that we had locked it earlier,
    // it should already exist in lockedFiles. Remove it from lockedFiles.
    if (lockedFiles.erase(fname) != 1) {
      mutex_lockedFiles.Unlock();
      errno = ENOLCK;
      return -1;
    }
  }
J
jorlow@chromium.org 已提交
581 582 583 584 585 586 587
  errno = 0;
  struct flock f;
  memset(&f, 0, sizeof(f));
  f.l_type = (lock ? F_WRLCK : F_UNLCK);
  f.l_whence = SEEK_SET;
  f.l_start = 0;
  f.l_len = 0;        // Lock/unlock entire file
588 589 590 591 592 593 594
  int value = fcntl(fd, F_SETLK, &f);
  if (value == -1 && lock) {
    // if there is an error in locking, then remove the pathname from lockedfiles
    lockedFiles.erase(fname);
  }
  mutex_lockedFiles.Unlock();
  return value;
J
jorlow@chromium.org 已提交
595 596 597 598 599
}

// Handle returned by PosixEnv::LockFile() and consumed by UnlockFile().
// It carries what unlocking needs: the descriptor holding the fcntl() lock
// and the pathname that was recorded in lockedFiles.
class PosixFileLock : public FileLock {
 public:
  int fd_;               // open descriptor that owns the fcntl() lock
  std::string filename;  // key previously inserted into lockedFiles
};

class PosixEnv : public Env {
 public:
  PosixEnv();
  virtual ~PosixEnv() {
    fprintf(stderr, "Destroying Env::Default()\n");
    exit(1);
  }

  virtual Status NewSequentialFile(const std::string& fname,
612 613
                                   unique_ptr<SequentialFile>* result,
                                   const EnvOptions& options) {
614
    result->reset();
J
jorlow@chromium.org 已提交
615
    FILE* f = fopen(fname.c_str(), "r");
A
Abhishek Kona 已提交
616 617
    if (f == nullptr) {
      *result = nullptr;
618
      return IOError(fname, errno);
J
jorlow@chromium.org 已提交
619
    } else {
620
      result->reset(new PosixSequentialFile(fname, f, options));
J
jorlow@chromium.org 已提交
621 622 623 624 625
      return Status::OK();
    }
  }

  virtual Status NewRandomAccessFile(const std::string& fname,
626 627
                                     unique_ptr<RandomAccessFile>* result,
                                     const EnvOptions& options) {
628
    result->reset();
629
    Status s;
J
jorlow@chromium.org 已提交
630 631
    int fd = open(fname.c_str(), O_RDONLY);
    if (fd < 0) {
632
      s = IOError(fname, errno);
633
    } else if (options.UseMmapReads() && sizeof(void*) >= 8) {
634 635 636 637 638 639
      // Use of mmap for random reads has been removed because it
      // kills performance when storage is fast.
      // Use mmap when virtual address-space is plentiful.
      uint64_t size;
      s = GetFileSize(fname, &size);
      if (s.ok()) {
A
Abhishek Kona 已提交
640
        void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
641
        if (base != MAP_FAILED) {
642
          result->reset(new PosixMmapReadableFile(fname, base, size, options));
643 644 645 646 647
        } else {
          s = IOError(fname, errno);
        }
      }
      close(fd);
648
    } else {
649
      result->reset(new PosixRandomAccessFile(fname, fd, options));
J
jorlow@chromium.org 已提交
650
    }
651
    return s;
J
jorlow@chromium.org 已提交
652 653 654
  }

  virtual Status NewWritableFile(const std::string& fname,
655 656
                                 unique_ptr<WritableFile>* result,
                                 const EnvOptions& options) {
657
    result->reset();
J
jorlow@chromium.org 已提交
658 659 660
    Status s;
    const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
    if (fd < 0) {
661
      s = IOError(fname, errno);
J
jorlow@chromium.org 已提交
662
    } else {
663 664 665
      if (options.UseMmapWrites()) {
        if (!checkedDiskForMmap_) {
          // this will be executed once in the program's lifetime.
A
Abhishek Kona 已提交
666
          // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
667 668 669 670
          if (!SupportsFastAllocate(fname)) {
            forceMmapOff = true;
          }
          checkedDiskForMmap_ = true;
A
Abhishek Kona 已提交
671 672
        }
      }
673 674
      if (options.UseMmapWrites() && !forceMmapOff) {
        result->reset(new PosixMmapFile(fname, fd, page_size_, options));
675
      } else {
676
        result->reset(new PosixWritableFile(fname, fd, 65536, options));
677
      }
J
jorlow@chromium.org 已提交
678 679 680 681 682 683 684 685 686 687 688 689
    }
    return s;
  }

  virtual bool FileExists(const std::string& fname) {
    return access(fname.c_str(), F_OK) == 0;
  }

  virtual Status GetChildren(const std::string& dir,
                             std::vector<std::string>* result) {
    result->clear();
    DIR* d = opendir(dir.c_str());
A
Abhishek Kona 已提交
690
    if (d == nullptr) {
691
      return IOError(dir, errno);
J
jorlow@chromium.org 已提交
692 693
    }
    struct dirent* entry;
A
Abhishek Kona 已提交
694
    while ((entry = readdir(d)) != nullptr) {
J
jorlow@chromium.org 已提交
695 696 697 698 699 700 701 702 703
      result->push_back(entry->d_name);
    }
    closedir(d);
    return Status::OK();
  }

  virtual Status DeleteFile(const std::string& fname) {
    Status result;
    if (unlink(fname.c_str()) != 0) {
704
      result = IOError(fname, errno);
J
jorlow@chromium.org 已提交
705 706 707 708 709 710 711
    }
    return result;
  };

  virtual Status CreateDir(const std::string& name) {
    Status result;
    if (mkdir(name.c_str(), 0755) != 0) {
712
      result = IOError(name, errno);
J
jorlow@chromium.org 已提交
713 714 715 716
    }
    return result;
  };

717 718 719 720 721
  virtual Status CreateDirIfMissing(const std::string& name) {
    Status result;
    if (mkdir(name.c_str(), 0755) != 0) {
      if (errno != EEXIST) {
        result = IOError(name, errno);
722 723 724 725
      } else if (!DirExists(name)) { // Check that name is actually a
                                     // directory.
        // Message is taken from mkdir
        result = Status::IOError("`"+name+"' exists but is not a directory");
726 727 728 729 730
      }
    }
    return result;
  };

J
jorlow@chromium.org 已提交
731 732 733
  virtual Status DeleteDir(const std::string& name) {
    Status result;
    if (rmdir(name.c_str()) != 0) {
734
      result = IOError(name, errno);
J
jorlow@chromium.org 已提交
735 736 737 738 739 740 741 742 743
    }
    return result;
  };

  virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
    Status s;
    struct stat sbuf;
    if (stat(fname.c_str(), &sbuf) != 0) {
      *size = 0;
744
      s = IOError(fname, errno);
J
jorlow@chromium.org 已提交
745 746 747 748 749 750
    } else {
      *size = sbuf.st_size;
    }
    return s;
  }

751 752 753 754 755 756 757 758 759
  virtual Status GetFileModificationTime(const std::string& fname,
                                         uint64_t* file_mtime) {
    struct stat s;
    if (stat(fname.c_str(), &s) !=0) {
      return IOError(fname, errno);
    }
    *file_mtime = static_cast<uint64_t>(s.st_mtime);
    return Status::OK();
  }
J
jorlow@chromium.org 已提交
760 761 762
  virtual Status RenameFile(const std::string& src, const std::string& target) {
    Status result;
    if (rename(src.c_str(), target.c_str()) != 0) {
763
      result = IOError(src, errno);
J
jorlow@chromium.org 已提交
764 765 766 767 768
    }
    return result;
  }

  virtual Status LockFile(const std::string& fname, FileLock** lock) {
A
Abhishek Kona 已提交
769
    *lock = nullptr;
J
jorlow@chromium.org 已提交
770 771 772
    Status result;
    int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
    if (fd < 0) {
773
      result = IOError(fname, errno);
774
    } else if (LockOrUnlock(fname, fd, true) == -1) {
775
      result = IOError("lock " + fname, errno);
J
jorlow@chromium.org 已提交
776 777 778 779
      close(fd);
    } else {
      PosixFileLock* my_lock = new PosixFileLock;
      my_lock->fd_ = fd;
780
      my_lock->filename = fname;
J
jorlow@chromium.org 已提交
781 782 783 784 785 786 787 788
      *lock = my_lock;
    }
    return result;
  }

  virtual Status UnlockFile(FileLock* lock) {
    PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
    Status result;
789
    if (LockOrUnlock(my_lock->filename, my_lock->fd_, false) == -1) {
790
      result = IOError("unlock", errno);
J
jorlow@chromium.org 已提交
791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814
    }
    close(my_lock->fd_);
    delete my_lock;
    return result;
  }

  virtual void Schedule(void (*function)(void*), void* arg);

  virtual void StartThread(void (*function)(void* arg), void* arg);

  virtual Status GetTestDirectory(std::string* result) {
    const char* env = getenv("TEST_TMPDIR");
    if (env && env[0] != '\0') {
      *result = env;
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
      *result = buf;
    }
    // Directory may already exist
    CreateDir(*result);
    return Status::OK();
  }

815
  static uint64_t gettid() {
J
jorlow@chromium.org 已提交
816 817
    pthread_t tid = pthread_self();
    uint64_t thread_id = 0;
J
jorlow@chromium.org 已提交
818
    memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
819 820
    return thread_id;
  }
J
jorlow@chromium.org 已提交
821

822 823
  virtual Status NewLogger(const std::string& fname,
                           shared_ptr<Logger>* result) {
824
    FILE* f = fopen(fname.c_str(), "w");
A
Abhishek Kona 已提交
825
    if (f == nullptr) {
826
      result->reset();
827 828
      return IOError(fname, errno);
    } else {
829
      result->reset(new PosixLogger(f, &PosixEnv::gettid));
830
      return Status::OK();
J
jorlow@chromium.org 已提交
831 832 833 834 835
    }
  }

  virtual uint64_t NowMicros() {
    struct timeval tv;
A
Abhishek Kona 已提交
836
    gettimeofday(&tv, nullptr);
J
jorlow@chromium.org 已提交
837 838 839 840 841 842 843
    return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  }

  virtual void SleepForMicroseconds(int micros) {
    usleep(micros);
  }

844
  virtual Status GetHostName(char* name, uint64_t len) {
845 846 847 848 849 850 851 852 853 854 855
    int ret = gethostname(name, len);
    if (ret < 0) {
      if (errno == EFAULT || errno == EINVAL)
        return Status::InvalidArgument(strerror(errno));
      else
        return IOError("GetHostName", errno);
    }
    return Status::OK();
  }

  virtual Status GetCurrentTime(int64_t* unix_time) {
A
Abhishek Kona 已提交
856
    time_t ret = time(nullptr);
857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872
    if (ret == (time_t) -1) {
      return IOError("GetCurrentTime", errno);
    }
    *unix_time = (int64_t) ret;
    return Status::OK();
  }

  virtual Status GetAbsolutePath(const std::string& db_path,
      std::string* output_path) {
    if (db_path.find('/') == 0) {
      *output_path = db_path;
      return Status::OK();
    }

    char the_path[256];
    char* ret = getcwd(the_path, 256);
A
Abhishek Kona 已提交
873
    if (ret == nullptr) {
874 875 876 877 878 879 880
      return Status::IOError(strerror(errno));
    }

    *output_path = ret;
    return Status::OK();
  }

A
Abhishek Kona 已提交
881
  // Allow increasing the number of worker threads.
882
  virtual void SetBackgroundThreads(int num) {
H
Haobo Xu 已提交
883
    PthreadCall("lock", pthread_mutex_lock(&mu_));
884 885 886 887
    if (num > num_threads_) {
      num_threads_ = num;
      bgthread_.resize(num_threads_);
    }
H
Haobo Xu 已提交
888
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
889 890
  }

891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910
  virtual std::string TimeToString(uint64_t secondsSince1970) {
    const time_t seconds = (time_t)secondsSince1970;
    struct tm t;
    int maxsize = 64;
    std::string dummy;
    dummy.reserve(maxsize);
    dummy.resize(maxsize);
    char* p = &dummy[0];
    localtime_r(&seconds, &t);
    snprintf(p, maxsize,
             "%04d/%02d/%02d-%02d:%02d:%02d ",
             t.tm_year + 1900,
             t.tm_mon + 1,
             t.tm_mday,
             t.tm_hour,
             t.tm_min,
             t.tm_sec);
    return dummy;
  }

J
jorlow@chromium.org 已提交
911
 private:
912 913
  bool checkedDiskForMmap_;
  bool forceMmapOff; // do we override Env options?
A
Abhishek Kona 已提交
914

J
jorlow@chromium.org 已提交
915 916 917 918 919 920 921
  void PthreadCall(const char* label, int result) {
    if (result != 0) {
      fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
      exit(1);
    }
  }

922 923 924 925 926 927 928 929 930 931
  // Returns true iff the named directory exists and is a directory.
  virtual bool DirExists(const std::string& dname) {
    struct stat statbuf;
    if (stat(dname.c_str(), &statbuf) == 0) {
      return S_ISDIR(statbuf.st_mode);
    }
    return false; // stat() failed return false
  }


J
jorlow@chromium.org 已提交
932 933 934 935
  // BGThread() is the body of the background thread
  void BGThread();
  static void* BGThreadWrapper(void* arg) {
    reinterpret_cast<PosixEnv*>(arg)->BGThread();
A
Abhishek Kona 已提交
936
    return nullptr;
J
jorlow@chromium.org 已提交
937 938
  }

A
Abhishek Kona 已提交
939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955
  bool SupportsFastAllocate(const std::string& path) {
    struct statfs s;
    if (statfs(path.c_str(), &s)){
      return false;
    }
    switch (s.f_type) {
      case EXT4_SUPER_MAGIC:
        return true;
      case XFS_SUPER_MAGIC:
        return true;
      case TMPFS_MAGIC:
        return true;
      default:
        return false;
    }
  }

J
jorlow@chromium.org 已提交
956 957 958
  size_t page_size_;
  pthread_mutex_t mu_;
  pthread_cond_t bgsignal_;
959 960 961
  std::vector<pthread_t> bgthread_;
  int started_bgthread_;
  int num_threads_;
J
jorlow@chromium.org 已提交
962 963 964 965 966 967 968

  // Entry per Schedule() call
  struct BGItem { void* arg; void (*function)(void*); };
  typedef std::deque<BGItem> BGQueue;
  BGQueue queue_;
};

969 970 971
PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
                       forceMmapOff(false),
                       page_size_(getpagesize()),
972
                       started_bgthread_(0),
H
Haobo Xu 已提交
973
                       num_threads_(1) {
A
Abhishek Kona 已提交
974 975
  PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
  PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
976
  bgthread_.resize(num_threads_);
J
jorlow@chromium.org 已提交
977 978 979 980 981 982
}

void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  PthreadCall("lock", pthread_mutex_lock(&mu_));

  // Start background thread if necessary
983
  for (; started_bgthread_ < num_threads_; started_bgthread_++) {
J
jorlow@chromium.org 已提交
984 985
    PthreadCall(
        "create thread",
A
Abhishek Kona 已提交
986 987 988 989
        pthread_create(&bgthread_[started_bgthread_],
                       nullptr,
                       &PosixEnv::BGThreadWrapper,
                       this));
990
    fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]);
J
jorlow@chromium.org 已提交
991 992 993 994 995 996
  }

  // Add to priority queue
  queue_.push_back(BGItem());
  queue_.back().function = function;
  queue_.back().arg = arg;
H
Haobo Xu 已提交
997 998 999

  // always wake up at least one waiting thread.
  PthreadCall("signal", pthread_cond_signal(&bgsignal_));
J
jorlow@chromium.org 已提交
1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030

  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}

void PosixEnv::BGThread() {
  while (true) {
    // Wait until there is an item that is ready to run
    PthreadCall("lock", pthread_mutex_lock(&mu_));
    while (queue_.empty()) {
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
    }

    void (*function)(void*) = queue_.front().function;
    void* arg = queue_.front().arg;
    queue_.pop_front();

    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    (*function)(arg);
  }
}

namespace {
// Heap-allocated payload passed from StartThread() to the new thread.
struct StartThreadState {
  void (*user_function)(void*);
  void* arg;
};
}

// pthread entry point used by StartThread(). It runs the user's function,
// then deletes the state that StartThread() allocated.
static void* StartThreadWrapper(void* arg) {
  StartThreadState* const state = static_cast<StartThreadState*>(arg);
  void (*const entry)(void*) = state->user_function;
  entry(state->arg);
  delete state;
  return nullptr;
}

void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  pthread_t t;
  StartThreadState* state = new StartThreadState;
  state->user_function = function;
  state->arg = arg;
  PthreadCall("start thread",
A
Abhishek Kona 已提交
1040
              pthread_create(&t, nullptr,  &StartThreadWrapper, state));
J
jorlow@chromium.org 已提交
1041 1042
}

}  // namespace

// Env::Default() returns one process-wide PosixEnv. It is created on first
// use via pthread_once() and deliberately never deleted, since
// ~PosixEnv() terminates the process.
static pthread_once_t once = PTHREAD_ONCE_INIT;
static Env* default_env;

static void InitDefaultEnv() {
  default_env = new PosixEnv;
}

Env* Env::Default() {
  pthread_once(&once, InitDefaultEnv);
  return default_env;
}

}  // namespace leveldb