env_posix.cc 27.8 KB
Newer Older
J
jorlow@chromium.org 已提交
1 2 3 4 5
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <deque>
6
#include <set>
J
jorlow@chromium.org 已提交
7 8 9 10 11 12 13
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
14
#include <sys/ioctl.h>
J
jorlow@chromium.org 已提交
15 16
#include <sys/mman.h>
#include <sys/stat.h>
A
Abhishek Kona 已提交
17
#include <sys/statfs.h>
J
jorlow@chromium.org 已提交
18 19
#include <sys/time.h>
#include <sys/types.h>
A
Abhishek Kona 已提交
20
#include <sys/vfs.h>
J
jorlow@chromium.org 已提交
21 22
#include <time.h>
#include <unistd.h>
23 24 25
#if defined(OS_LINUX)
#include <linux/fs.h>
#endif
J
jorlow@chromium.org 已提交
26 27 28
#if defined(LEVELDB_PLATFORM_ANDROID)
#include <sys/stat.h>
#endif
29 30
#include "leveldb/env.h"
#include "leveldb/slice.h"
J
jorlow@chromium.org 已提交
31
#include "port/port.h"
32
#include "util/coding.h"
J
jorlow@chromium.org 已提交
33
#include "util/logging.h"
34
#include "util/posix_logger.h"
J
jorlow@chromium.org 已提交
35

A
Abhishek Kona 已提交
36 37 38 39 40 41 42 43 44 45
// Fallback definitions for the filesystem magic numbers that
// SupportsFastAllocate() compares against statfs::f_type, used only when
// the included system headers have not already defined them.
#if !defined(TMPFS_MAGIC)
#define TMPFS_MAGIC 0x01021994
#endif
#if !defined(XFS_SUPER_MAGIC)
#define XFS_SUPER_MAGIC 0x58465342
#endif
#if !defined(EXT4_SUPER_MAGIC)
#define EXT4_SUPER_MAGIC 0xEF53
#endif

A
Abhishek Kona 已提交
46
// Global I/O tuning flags (external linkage).
bool useOsBuffer = true;      // cache data in OS buffers
bool useFsReadAhead = true;   // allow filesystem to do readaheads
bool useMmapRead = false;     // do not use mmaps for reading files
bool useMmapWrite = true;     // use mmaps for appending to files
50

J
jorlow@chromium.org 已提交
51 52 53 54
namespace leveldb {

namespace {

55 56 57 58
// Set of pathnames currently locked through LockOrUnlock() in this
// process. LockOrUnlock() only touches it while holding mutex_lockedFiles.
static std::set<std::string> lockedFiles;
static port::Mutex mutex_lockedFiles;

59 60 61 62
// Builds an IOError status whose message pairs `context` (typically a file
// name) with the strerror() text for `err_number`.
static Status IOError(const std::string& context, int err_number) {
  const char* const reason = strerror(err_number);
  return Status::IOError(context, reason);
}

J
jorlow@chromium.org 已提交
63 64 65 66
class PosixSequentialFile: public SequentialFile {
 private:
  std::string filename_;
  FILE* file_;
67 68
  int fd_;
  bool use_os_buffer = true;
J
jorlow@chromium.org 已提交
69 70

 public:
71 72 73 74 75 76 77
  PosixSequentialFile(const std::string& fname, FILE* f,
      const EnvOptions& options)
      : filename_(fname), file_(f) {
    fd_ = fileno(f);
    assert(!options.UseMmapReads());
    use_os_buffer = options.UseOsBuffer();
  }
J
jorlow@chromium.org 已提交
78 79 80 81 82 83 84 85 86 87 88
  virtual ~PosixSequentialFile() { fclose(file_); }

  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    size_t r = fread_unlocked(scratch, 1, n, file_);
    *result = Slice(scratch, r);
    if (r < n) {
      if (feof(file_)) {
        // We leave status as ok if we hit the end of the file
      } else {
        // A partial read with an error: return a non-ok status
89
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
90 91
      }
    }
92 93 94 95 96
    if (!use_os_buffer) {
      // we need to fadvise away the entire range of pages because
      // we do not want readahead pages to be cached.
      posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
    }
J
jorlow@chromium.org 已提交
97 98
    return s;
  }
99 100 101

  virtual Status Skip(uint64_t n) {
    if (fseek(file_, n, SEEK_CUR)) {
102
      return IOError(filename_, errno);
103 104 105
    }
    return Status::OK();
  }
J
jorlow@chromium.org 已提交
106 107
};

108
// pread() based random-access
J
jorlow@chromium.org 已提交
109 110 111 112
class PosixRandomAccessFile: public RandomAccessFile {
 private:
  std::string filename_;
  int fd_;
113
  bool use_os_buffer = true;
J
jorlow@chromium.org 已提交
114 115

 public:
116 117
  PosixRandomAccessFile(const std::string& fname, int fd,
                        const EnvOptions& options)
A
Abhishek Kona 已提交
118
      : filename_(fname), fd_(fd) {
119 120
    assert(!options.UseMmapReads());
    if (!options.UseReadahead()) { // disable read-aheads
121 122
      posix_fadvise(fd, 0, 0, POSIX_FADV_RANDOM);
    }
123
    use_os_buffer = options.UseOsBuffer();
124
  }
J
jorlow@chromium.org 已提交
125 126 127 128 129 130 131 132 133
  virtual ~PosixRandomAccessFile() { close(fd_); }

  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
    *result = Slice(scratch, (r < 0) ? 0 : r);
    if (r < 0) {
      // An error: return a non-ok status
134
      s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
135
    }
136
    if (!use_os_buffer) {
137 138 139
      // we need to fadvise away the entire range of pages because
      // we do not want readahead pages to be cached.
      posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
140
    }
J
jorlow@chromium.org 已提交
141 142
    return s;
  }
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171

#if defined(OS_LINUX)
  virtual size_t GetUniqueId(char* id, size_t max_size) const {
    // TODO: possibly allow this function to handle tighter bounds.
    if (max_size < kMaxVarint64Length*3) {
      return 0;
    }

    struct stat buf;
    int result = fstat(fd_, &buf);
    if (result == -1) {
      return 0;
    }

    long version = 0;
    result = ioctl(fd_, FS_IOC_GETVERSION, &version);
    if (result == -1) {
      return 0;
    }
    uint64_t uversion = (uint64_t)version;

    char* rid = id;
    rid = EncodeVarint64(rid, buf.st_dev);
    rid = EncodeVarint64(rid, buf.st_ino);
    rid = EncodeVarint64(rid, uversion);
    assert(rid >= id);
    return static_cast<size_t>(rid-id);
  }
#endif
J
jorlow@chromium.org 已提交
172 173
};

174 175 176 177 178 179 180 181 182
// mmap() based random-access
class PosixMmapReadableFile: public RandomAccessFile {
 private:
  std::string filename_;
  void* mmapped_region_;
  size_t length_;

 public:
  // base[0,length-1] contains the mmapped contents of the file.
183 184 185 186 187 188 189
  PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
                        const EnvOptions& options)
      : filename_(fname), mmapped_region_(base), length_(length) {
    assert(options.UseMmapReads());
    assert(options.UseOsBuffer());
    assert(options.UseReadahead());
  }
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
  virtual ~PosixMmapReadableFile() { munmap(mmapped_region_, length_); }

  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    if (offset + n > length_) {
      *result = Slice();
      s = IOError(filename_, EINVAL);
    } else {
      *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
    }
    return s;
  }
};

J
jorlow@chromium.org 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
// We preallocate up to an extra megabyte and use memcpy to append new
// data to the file.  This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class PosixMmapFile : public WritableFile {
 private:
  std::string filename_;
  int fd_;
  size_t page_size_;
  size_t map_size_;       // How much extra memory to map at a time
  char* base_;            // The mapped region
  char* limit_;           // Limit of the mapped region
  char* dst_;             // Where to write next  (in range [base_,limit_])
  char* last_sync_;       // Where have we synced up to
  uint64_t file_offset_;  // Offset of base_ in file

  // Have we done an munmap of unsynced data?
  bool pending_sync_;

  // Roundup x to a multiple of y
  static size_t Roundup(size_t x, size_t y) {
    return ((x + y - 1) / y) * y;
  }

  size_t TruncateToPageBoundary(size_t s) {
    s -= (s & (page_size_ - 1));
    assert((s % page_size_) == 0);
    return s;
  }

235 236
  bool UnmapCurrentRegion() {
    bool result = true;
A
Abhishek Kona 已提交
237
    if (base_ != nullptr) {
J
jorlow@chromium.org 已提交
238 239 240 241
      if (last_sync_ < limit_) {
        // Defer syncing this data until next Sync() call, if any
        pending_sync_ = true;
      }
242 243 244
      if (munmap(base_, limit_ - base_) != 0) {
        result = false;
      }
J
jorlow@chromium.org 已提交
245
      file_offset_ += limit_ - base_;
A
Abhishek Kona 已提交
246 247 248 249
      base_ = nullptr;
      limit_ = nullptr;
      last_sync_ = nullptr;
      dst_ = nullptr;
J
jorlow@chromium.org 已提交
250 251 252 253 254 255

      // Increase the amount we map the next time, but capped at 1MB
      if (map_size_ < (1<<20)) {
        map_size_ *= 2;
      }
    }
256
    return result;
J
jorlow@chromium.org 已提交
257 258
  }

A
Abhishek Kona 已提交
259
  Status MapNewRegion() {
A
Abhishek Kona 已提交
260
    assert(base_ == nullptr);
A
Abhishek Kona 已提交
261 262 263 264 265

    int alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
    if (alloc_status != 0) {
      return Status::IOError("Error allocating space to file : " + filename_ +
        "Error : " + strerror(alloc_status));
J
jorlow@chromium.org 已提交
266
    }
A
Abhishek Kona 已提交
267 268


A
Abhishek Kona 已提交
269
    void* ptr = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
J
jorlow@chromium.org 已提交
270 271
                     fd_, file_offset_);
    if (ptr == MAP_FAILED) {
A
Abhishek Kona 已提交
272
      return Status::IOError("MMap failed on " + filename_);
J
jorlow@chromium.org 已提交
273 274 275 276 277
    }
    base_ = reinterpret_cast<char*>(ptr);
    limit_ = base_ + map_size_;
    dst_ = base_;
    last_sync_ = base_;
A
Abhishek Kona 已提交
278
    return Status::OK();
J
jorlow@chromium.org 已提交
279 280 281
  }

 public:
282 283
  PosixMmapFile(const std::string& fname, int fd, size_t page_size,
                const EnvOptions& options)
J
jorlow@chromium.org 已提交
284 285 286 287
      : filename_(fname),
        fd_(fd),
        page_size_(page_size),
        map_size_(Roundup(65536, page_size)),
A
Abhishek Kona 已提交
288 289 290 291
        base_(nullptr),
        limit_(nullptr),
        dst_(nullptr),
        last_sync_(nullptr),
J
jorlow@chromium.org 已提交
292 293 294
        file_offset_(0),
        pending_sync_(false) {
    assert((page_size & (page_size - 1)) == 0);
295
    assert(options.UseMmapWrites());
J
jorlow@chromium.org 已提交
296 297 298 299 300 301 302 303 304 305 306 307
  }


  ~PosixMmapFile() {
    if (fd_ >= 0) {
      PosixMmapFile::Close();
    }
  }

  virtual Status Append(const Slice& data) {
    const char* src = data.data();
    size_t left = data.size();
308
    PrepareWrite(GetFileSize(), left);
J
jorlow@chromium.org 已提交
309 310 311 312 313
    while (left > 0) {
      assert(base_ <= dst_);
      assert(dst_ <= limit_);
      size_t avail = limit_ - dst_;
      if (avail == 0) {
A
Abhishek Kona 已提交
314 315 316 317 318
        if (UnmapCurrentRegion()) {
          Status s = MapNewRegion();
          if (!s.ok()) {
            return s;
          }
319
        }
J
jorlow@chromium.org 已提交
320 321 322 323 324 325 326 327 328 329 330 331 332 333
      }

      size_t n = (left <= avail) ? left : avail;
      memcpy(dst_, src, n);
      dst_ += n;
      src += n;
      left -= n;
    }
    return Status::OK();
  }

  virtual Status Close() {
    Status s;
    size_t unused = limit_ - dst_;
334 335 336
    if (!UnmapCurrentRegion()) {
      s = IOError(filename_, errno);
    } else if (unused > 0) {
J
jorlow@chromium.org 已提交
337 338
      // Trim the extra space at the end of the file
      if (ftruncate(fd_, file_offset_ - unused) < 0) {
339
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
340 341 342 343 344
      }
    }

    if (close(fd_) < 0) {
      if (s.ok()) {
345
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
346 347 348 349
      }
    }

    fd_ = -1;
A
Abhishek Kona 已提交
350 351
    base_ = nullptr;
    limit_ = nullptr;
J
jorlow@chromium.org 已提交
352 353 354 355 356 357 358 359 360 361 362 363 364 365
    return s;
  }

  virtual Status Flush() {
    return Status::OK();
  }

  virtual Status Sync() {
    Status s;

    if (pending_sync_) {
      // Some unmapped data was not synced
      pending_sync_ = false;
      if (fdatasync(fd_) < 0) {
366
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
367 368 369 370 371 372 373 374 375 376
      }
    }

    if (dst_ > last_sync_) {
      // Find the beginnings of the pages that contain the first and last
      // bytes to be synced.
      size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
      size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
      last_sync_ = dst_;
      if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
377
        s = IOError(filename_, errno);
J
jorlow@chromium.org 已提交
378 379 380 381 382
      }
    }

    return s;
  }
383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398

  /**
   * Flush data as well as metadata to stable storage.
   */
  virtual Status Fsync() {
    if (pending_sync_) {
      // Some unmapped data was not synced
      pending_sync_ = false;
      if (fsync(fd_) < 0) {
        return IOError(filename_, errno);
      }
    }
    // This invocation to Sync will not issue the call to
    // fdatasync because pending_sync_ has already been cleared.
    return Sync();
  }
399 400 401 402 403 404 405 406 407 408

  /**
   * Get the size of valid data in the file. This will not match the
   * size that is returned from the filesystem because we use mmap
   * to extend file by map_size every time.
   */
  virtual uint64_t GetFileSize() {
    size_t used = dst_ - base_;
    return file_offset_ + used;
  }
409

410
#ifdef OS_LINUX
411 412 413 414 415 416 417
  virtual Status Allocate(off_t offset, off_t len) {
    if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
      return Status::OK();
    } else {
      return IOError(filename_, errno);
    }
  }
418
#endif
J
jorlow@chromium.org 已提交
419 420
};

421 422 423 424 425 426 427
// Use posix write to write data to a file.
class PosixWritableFile : public WritableFile {
 private:
  const std::string filename_;
  int fd_;
  size_t cursize_;      // current size of cached data in buf_
  size_t capacity_;     // max size of buf_
M
Mayank Agarwal 已提交
428
  unique_ptr<char[]> buf_;           // a buffer to cache writes
429 430 431 432 433
  uint64_t filesize_;
  bool pending_sync_;
  bool pending_fsync_;

 public:
434 435
  PosixWritableFile(const std::string& fname, int fd, size_t capacity,
                    const EnvOptions& options) :
436 437 438 439 440 441 442 443
    filename_(fname),
    fd_(fd),
    cursize_(0),
    capacity_(capacity),
    buf_(new char[capacity]),
    filesize_(0),
    pending_sync_(false),
    pending_fsync_(false) {
444
    assert(!options.UseMmapWrites());
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
  }

  ~PosixWritableFile() {
    if (fd_ >= 0) {
      PosixWritableFile::Close();
    }
  }

  virtual Status Append(const Slice& data) {
    char* src = (char *)data.data();
    size_t left = data.size();
    Status s;
    pending_sync_ = true;
    pending_fsync_ = true;

460
    PrepareWrite(GetFileSize(), left);
461 462 463 464 465 466 467 468 469
    // if there is no space in the cache, then flush
    if (cursize_ + left > capacity_) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
      // Increase the buffer size, but capped at 1MB
      if (capacity_ < (1<<20)) {
        capacity_ *= 2;
M
Mayank Agarwal 已提交
470
        buf_.reset(new char[capacity_]);
471 472 473 474 475 476 477
      }
      assert(cursize_ == 0);
    }

    // if the write fits into the cache, then write to cache
    // otherwise do a write() syscall to write to OS buffers.
    if (cursize_ + left <= capacity_) {
M
Mayank Agarwal 已提交
478
      memcpy(buf_.get()+cursize_, src, left);
479 480 481
      cursize_ += left;
    } else {
      while (left != 0) {
C
Chip Turner 已提交
482
        ssize_t done = write(fd_, src, left);
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
        if (done < 0) {
          return IOError(filename_, errno);
        }
        left -= done;
        src += done;
      }
    }
    filesize_ += data.size();
    return Status::OK();
  }

  virtual Status Close() {
    Status s;
    s = Flush(); // flush cache to OS
    if (!s.ok()) {
    }
    if (close(fd_) < 0) {
      if (s.ok()) {
        s = IOError(filename_, errno);
      }
    }
    fd_ = -1;
    return s;
  }

  // write out the cached data to the OS cache
  virtual Status Flush() {
    size_t left = cursize_;
M
Mayank Agarwal 已提交
511
    char* src = buf_.get();
512
    while (left != 0) {
C
Chip Turner 已提交
513
      ssize_t done = write(fd_, src, left);
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
      if (done < 0) {
        return IOError(filename_, errno);
      }
      left -= done;
      src += done;
    }
    cursize_ = 0;
    return Status::OK();
  }

  virtual Status Sync() {
    if (pending_sync_ && fdatasync(fd_) < 0) {
      return IOError(filename_, errno);
    }
    pending_sync_ = false;
    return Status::OK();
  }

  virtual Status Fsync() {
    if (pending_fsync_ && fsync(fd_) < 0) {
      return IOError(filename_, errno);
    }
    pending_fsync_ = false;
    pending_sync_ = false;
    return Status::OK();
  }

  virtual uint64_t GetFileSize() {
    return filesize_;
  }
544

545
#ifdef OS_LINUX
546 547 548 549 550 551 552
  virtual Status Allocate(off_t offset, off_t len) {
    if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
      return Status::OK();
    } else {
      return IOError(filename_, errno);
    }
  }
553
#endif
554 555
};

556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
// Acquires (lock == true) or releases (lock == false) a whole-file fcntl()
// write lock on fd, keeping lockedFiles in step under mutex_lockedFiles.
// Returns the fcntl() result, or -1 with errno set to ENOLCK when fname is
// already recorded as locked (on lock) or is not recorded (on unlock).
static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
  mutex_lockedFiles.Lock();
  if (lock) {
    // If it already exists in the lockedFiles set, then it is already locked,
    // and fail this lock attempt. Otherwise, insert it into lockedFiles.
    // This check is needed because fcntl() does not detect lock conflict
    // if the fcntl is issued by the same thread that earlier acquired
    // this lock.
    const bool inserted = lockedFiles.insert(fname).second;
    if (!inserted) {
      mutex_lockedFiles.Unlock();
      errno = ENOLCK;
      return -1;
    }
  } else {
    // If we are unlocking, then verify that we had locked it earlier,
    // it should already exist in lockedFiles. Remove it from lockedFiles.
    if (lockedFiles.erase(fname) != 1) {
      mutex_lockedFiles.Unlock();
      errno = ENOLCK;
      return -1;
    }
  }
  errno = 0;
  struct flock f;
  memset(&f, 0, sizeof(f));
  f.l_type = (lock ? F_WRLCK : F_UNLCK);
  f.l_whence = SEEK_SET;
  f.l_start = 0;
  f.l_len = 0;        // Lock/unlock entire file
  const int value = fcntl(fd, F_SETLK, &f);
  if (value == -1 && lock) {
    // if there is an error in locking, then remove the pathname from lockedfiles
    lockedFiles.erase(fname);
  }
  mutex_lockedFiles.Unlock();
  return value;
}

// Lock handle produced by PosixEnv::LockFile(): the descriptor holding the
// lock plus the name under which it is recorded in lockedFiles.
class PosixFileLock : public FileLock {
 public:
  int fd_;
  std::string filename;
};

class PosixEnv : public Env {
 public:
  PosixEnv();
  virtual ~PosixEnv() {
    fprintf(stderr, "Destroying Env::Default()\n");
    exit(1);
  }

  virtual Status NewSequentialFile(const std::string& fname,
609 610
                                   unique_ptr<SequentialFile>* result,
                                   const EnvOptions& options) {
611
    result->reset();
J
jorlow@chromium.org 已提交
612
    FILE* f = fopen(fname.c_str(), "r");
A
Abhishek Kona 已提交
613 614
    if (f == nullptr) {
      *result = nullptr;
615
      return IOError(fname, errno);
J
jorlow@chromium.org 已提交
616
    } else {
617
      result->reset(new PosixSequentialFile(fname, f, options));
J
jorlow@chromium.org 已提交
618 619 620 621 622
      return Status::OK();
    }
  }

  virtual Status NewRandomAccessFile(const std::string& fname,
623 624
                                     unique_ptr<RandomAccessFile>* result,
                                     const EnvOptions& options) {
625
    result->reset();
626
    Status s;
J
jorlow@chromium.org 已提交
627 628
    int fd = open(fname.c_str(), O_RDONLY);
    if (fd < 0) {
629
      s = IOError(fname, errno);
630
    } else if (options.UseMmapReads() && sizeof(void*) >= 8) {
631 632 633 634 635 636
      // Use of mmap for random reads has been removed because it
      // kills performance when storage is fast.
      // Use mmap when virtual address-space is plentiful.
      uint64_t size;
      s = GetFileSize(fname, &size);
      if (s.ok()) {
A
Abhishek Kona 已提交
637
        void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
638
        if (base != MAP_FAILED) {
639
          result->reset(new PosixMmapReadableFile(fname, base, size, options));
640 641 642 643 644
        } else {
          s = IOError(fname, errno);
        }
      }
      close(fd);
645
    } else {
646
      result->reset(new PosixRandomAccessFile(fname, fd, options));
J
jorlow@chromium.org 已提交
647
    }
648
    return s;
J
jorlow@chromium.org 已提交
649 650 651
  }

  virtual Status NewWritableFile(const std::string& fname,
652 653
                                 unique_ptr<WritableFile>* result,
                                 const EnvOptions& options) {
654
    result->reset();
J
jorlow@chromium.org 已提交
655 656 657
    Status s;
    const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
    if (fd < 0) {
658
      s = IOError(fname, errno);
J
jorlow@chromium.org 已提交
659
    } else {
660 661 662
      if (options.UseMmapWrites()) {
        if (!checkedDiskForMmap_) {
          // this will be executed once in the program's lifetime.
A
Abhishek Kona 已提交
663
          // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
664 665 666 667
          if (!SupportsFastAllocate(fname)) {
            forceMmapOff = true;
          }
          checkedDiskForMmap_ = true;
A
Abhishek Kona 已提交
668 669
        }
      }
670 671
      if (options.UseMmapWrites() && !forceMmapOff) {
        result->reset(new PosixMmapFile(fname, fd, page_size_, options));
672
      } else {
673
        result->reset(new PosixWritableFile(fname, fd, 65536, options));
674
      }
J
jorlow@chromium.org 已提交
675 676 677 678 679 680 681 682 683 684 685 686
    }
    return s;
  }

  virtual bool FileExists(const std::string& fname) {
    return access(fname.c_str(), F_OK) == 0;
  }

  virtual Status GetChildren(const std::string& dir,
                             std::vector<std::string>* result) {
    result->clear();
    DIR* d = opendir(dir.c_str());
A
Abhishek Kona 已提交
687
    if (d == nullptr) {
688
      return IOError(dir, errno);
J
jorlow@chromium.org 已提交
689 690
    }
    struct dirent* entry;
A
Abhishek Kona 已提交
691
    while ((entry = readdir(d)) != nullptr) {
J
jorlow@chromium.org 已提交
692 693 694 695 696 697 698 699 700
      result->push_back(entry->d_name);
    }
    closedir(d);
    return Status::OK();
  }

  virtual Status DeleteFile(const std::string& fname) {
    Status result;
    if (unlink(fname.c_str()) != 0) {
701
      result = IOError(fname, errno);
J
jorlow@chromium.org 已提交
702 703 704 705 706 707 708
    }
    return result;
  };

  virtual Status CreateDir(const std::string& name) {
    Status result;
    if (mkdir(name.c_str(), 0755) != 0) {
709
      result = IOError(name, errno);
J
jorlow@chromium.org 已提交
710 711 712 713
    }
    return result;
  };

714 715 716 717 718
  virtual Status CreateDirIfMissing(const std::string& name) {
    Status result;
    if (mkdir(name.c_str(), 0755) != 0) {
      if (errno != EEXIST) {
        result = IOError(name, errno);
719 720 721 722
      } else if (!DirExists(name)) { // Check that name is actually a
                                     // directory.
        // Message is taken from mkdir
        result = Status::IOError("`"+name+"' exists but is not a directory");
723 724 725 726 727
      }
    }
    return result;
  };

J
jorlow@chromium.org 已提交
728 729 730
  virtual Status DeleteDir(const std::string& name) {
    Status result;
    if (rmdir(name.c_str()) != 0) {
731
      result = IOError(name, errno);
J
jorlow@chromium.org 已提交
732 733 734 735 736 737 738 739 740
    }
    return result;
  };

  virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
    Status s;
    struct stat sbuf;
    if (stat(fname.c_str(), &sbuf) != 0) {
      *size = 0;
741
      s = IOError(fname, errno);
J
jorlow@chromium.org 已提交
742 743 744 745 746 747
    } else {
      *size = sbuf.st_size;
    }
    return s;
  }

748 749 750 751 752 753 754 755 756
  virtual Status GetFileModificationTime(const std::string& fname,
                                         uint64_t* file_mtime) {
    struct stat s;
    if (stat(fname.c_str(), &s) !=0) {
      return IOError(fname, errno);
    }
    *file_mtime = static_cast<uint64_t>(s.st_mtime);
    return Status::OK();
  }
J
jorlow@chromium.org 已提交
757 758 759
  virtual Status RenameFile(const std::string& src, const std::string& target) {
    Status result;
    if (rename(src.c_str(), target.c_str()) != 0) {
760
      result = IOError(src, errno);
J
jorlow@chromium.org 已提交
761 762 763 764 765
    }
    return result;
  }

  virtual Status LockFile(const std::string& fname, FileLock** lock) {
A
Abhishek Kona 已提交
766
    *lock = nullptr;
J
jorlow@chromium.org 已提交
767 768 769
    Status result;
    int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
    if (fd < 0) {
770
      result = IOError(fname, errno);
771
    } else if (LockOrUnlock(fname, fd, true) == -1) {
772
      result = IOError("lock " + fname, errno);
J
jorlow@chromium.org 已提交
773 774 775 776
      close(fd);
    } else {
      PosixFileLock* my_lock = new PosixFileLock;
      my_lock->fd_ = fd;
777
      my_lock->filename = fname;
J
jorlow@chromium.org 已提交
778 779 780 781 782 783 784 785
      *lock = my_lock;
    }
    return result;
  }

  virtual Status UnlockFile(FileLock* lock) {
    PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
    Status result;
786
    if (LockOrUnlock(my_lock->filename, my_lock->fd_, false) == -1) {
787
      result = IOError("unlock", errno);
J
jorlow@chromium.org 已提交
788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811
    }
    close(my_lock->fd_);
    delete my_lock;
    return result;
  }

  virtual void Schedule(void (*function)(void*), void* arg);

  virtual void StartThread(void (*function)(void* arg), void* arg);

  virtual Status GetTestDirectory(std::string* result) {
    const char* env = getenv("TEST_TMPDIR");
    if (env && env[0] != '\0') {
      *result = env;
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
      *result = buf;
    }
    // Directory may already exist
    CreateDir(*result);
    return Status::OK();
  }

812
  static uint64_t gettid() {
J
jorlow@chromium.org 已提交
813 814
    pthread_t tid = pthread_self();
    uint64_t thread_id = 0;
J
jorlow@chromium.org 已提交
815
    memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
816 817
    return thread_id;
  }
J
jorlow@chromium.org 已提交
818

819 820
  virtual Status NewLogger(const std::string& fname,
                           shared_ptr<Logger>* result) {
821
    FILE* f = fopen(fname.c_str(), "w");
A
Abhishek Kona 已提交
822
    if (f == nullptr) {
823
      result->reset();
824 825
      return IOError(fname, errno);
    } else {
826
      result->reset(new PosixLogger(f, &PosixEnv::gettid));
827
      return Status::OK();
J
jorlow@chromium.org 已提交
828 829 830 831 832
    }
  }

  virtual uint64_t NowMicros() {
    struct timeval tv;
A
Abhishek Kona 已提交
833
    gettimeofday(&tv, nullptr);
J
jorlow@chromium.org 已提交
834 835 836 837 838 839 840
    return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  }

  virtual void SleepForMicroseconds(int micros) {
    usleep(micros);
  }

841
  virtual Status GetHostName(char* name, uint64_t len) {
842 843 844 845 846 847 848 849 850 851 852
    int ret = gethostname(name, len);
    if (ret < 0) {
      if (errno == EFAULT || errno == EINVAL)
        return Status::InvalidArgument(strerror(errno));
      else
        return IOError("GetHostName", errno);
    }
    return Status::OK();
  }

  virtual Status GetCurrentTime(int64_t* unix_time) {
A
Abhishek Kona 已提交
853
    time_t ret = time(nullptr);
854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869
    if (ret == (time_t) -1) {
      return IOError("GetCurrentTime", errno);
    }
    *unix_time = (int64_t) ret;
    return Status::OK();
  }

  virtual Status GetAbsolutePath(const std::string& db_path,
      std::string* output_path) {
    if (db_path.find('/') == 0) {
      *output_path = db_path;
      return Status::OK();
    }

    char the_path[256];
    char* ret = getcwd(the_path, 256);
A
Abhishek Kona 已提交
870
    if (ret == nullptr) {
871 872 873 874 875 876 877
      return Status::IOError(strerror(errno));
    }

    *output_path = ret;
    return Status::OK();
  }

A
Abhishek Kona 已提交
878
  // Allow increasing the number of worker threads.
879
  virtual void SetBackgroundThreads(int num) {
H
Haobo Xu 已提交
880
    PthreadCall("lock", pthread_mutex_lock(&mu_));
881 882 883 884
    if (num > num_threads_) {
      num_threads_ = num;
      bgthread_.resize(num_threads_);
    }
H
Haobo Xu 已提交
885
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
886 887
  }

888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907
  virtual std::string TimeToString(uint64_t secondsSince1970) {
    const time_t seconds = (time_t)secondsSince1970;
    struct tm t;
    int maxsize = 64;
    std::string dummy;
    dummy.reserve(maxsize);
    dummy.resize(maxsize);
    char* p = &dummy[0];
    localtime_r(&seconds, &t);
    snprintf(p, maxsize,
             "%04d/%02d/%02d-%02d:%02d:%02d ",
             t.tm_year + 1900,
             t.tm_mon + 1,
             t.tm_mday,
             t.tm_hour,
             t.tm_min,
             t.tm_sec);
    return dummy;
  }

J
jorlow@chromium.org 已提交
908
 private:
909 910
  bool checkedDiskForMmap_;
  bool forceMmapOff; // do we override Env options?
A
Abhishek Kona 已提交
911

J
jorlow@chromium.org 已提交
912 913 914 915 916 917 918
  void PthreadCall(const char* label, int result) {
    if (result != 0) {
      fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
      exit(1);
    }
  }

919 920 921 922 923 924 925 926 927 928
  // Returns true iff the named directory exists and is a directory.
  virtual bool DirExists(const std::string& dname) {
    struct stat statbuf;
    if (stat(dname.c_str(), &statbuf) == 0) {
      return S_ISDIR(statbuf.st_mode);
    }
    return false; // stat() failed return false
  }


J
jorlow@chromium.org 已提交
929 930 931 932
  // BGThread() is the body of the background thread
  void BGThread();
  static void* BGThreadWrapper(void* arg) {
    reinterpret_cast<PosixEnv*>(arg)->BGThread();
A
Abhishek Kona 已提交
933
    return nullptr;
J
jorlow@chromium.org 已提交
934 935
  }

A
Abhishek Kona 已提交
936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952
  bool SupportsFastAllocate(const std::string& path) {
    struct statfs s;
    if (statfs(path.c_str(), &s)){
      return false;
    }
    switch (s.f_type) {
      case EXT4_SUPER_MAGIC:
        return true;
      case XFS_SUPER_MAGIC:
        return true;
      case TMPFS_MAGIC:
        return true;
      default:
        return false;
    }
  }

J
jorlow@chromium.org 已提交
953 954 955
  size_t page_size_;
  pthread_mutex_t mu_;
  pthread_cond_t bgsignal_;
956 957 958
  std::vector<pthread_t> bgthread_;
  int started_bgthread_;
  int num_threads_;
J
jorlow@chromium.org 已提交
959 960 961 962 963 964 965

  // Entry per Schedule() call
  struct BGItem { void* arg; void (*function)(void*); };
  typedef std::deque<BGItem> BGQueue;
  BGQueue queue_;
};

966 967 968
// Starts with one background-thread slot and no thread created yet; the
// mutex and condition variable guarding the work queue are set up here.
PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
                       forceMmapOff(false),
                       page_size_(getpagesize()),
                       started_bgthread_(0),
                       num_threads_(1) {
  PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
  PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
  bgthread_.resize(num_threads_);
}

void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  PthreadCall("lock", pthread_mutex_lock(&mu_));

  // Start background thread if necessary
980
  for (; started_bgthread_ < num_threads_; started_bgthread_++) {
J
jorlow@chromium.org 已提交
981 982
    PthreadCall(
        "create thread",
A
Abhishek Kona 已提交
983 984 985 986
        pthread_create(&bgthread_[started_bgthread_],
                       nullptr,
                       &PosixEnv::BGThreadWrapper,
                       this));
987
    fprintf(stdout, "Created bg thread 0x%lx\n", bgthread_[started_bgthread_]);
J
jorlow@chromium.org 已提交
988 989 990 991 992 993
  }

  // Add to priority queue
  queue_.push_back(BGItem());
  queue_.back().function = function;
  queue_.back().arg = arg;
H
Haobo Xu 已提交
994 995 996

  // always wake up at least one waiting thread.
  PthreadCall("signal", pthread_cond_signal(&bgsignal_));
J
jorlow@chromium.org 已提交
997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027

  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}

void PosixEnv::BGThread() {
  while (true) {
    // Wait until there is an item that is ready to run
    PthreadCall("lock", pthread_mutex_lock(&mu_));
    while (queue_.empty()) {
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
    }

    void (*function)(void*) = queue_.front().function;
    void* arg = queue_.front().arg;
    queue_.pop_front();

    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    (*function)(arg);
  }
}

namespace {
// Bundle passed to StartThreadWrapper(): the function to run on the new
// thread and the argument to hand to it.
struct StartThreadState {
  void (*user_function)(void*);
  void* arg;
};
}
// pthread entry point: runs the user function described by the
// StartThreadState passed as arg, then deletes that state.
static void* StartThreadWrapper(void* arg) {
  StartThreadState* state = static_cast<StartThreadState*>(arg);
  state->user_function(state->arg);
  delete state;
  return nullptr;
}

void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  pthread_t t;
  StartThreadState* state = new StartThreadState;
  state->user_function = function;
  state->arg = arg;
  PthreadCall("start thread",
A
Abhishek Kona 已提交
1037
              pthread_create(&t, nullptr,  &StartThreadWrapper, state));
J
jorlow@chromium.org 已提交
1038 1039
}

H
Hans Wennborg 已提交
1040
}  // namespace
J
jorlow@chromium.org 已提交
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050

// The process-wide default Env, created on first use through pthread_once().
static pthread_once_t once = PTHREAD_ONCE_INIT;
static Env* default_env;

// pthread_once() callback that allocates the PosixEnv.
static void InitDefaultEnv() {
  default_env = new PosixEnv;
}

// Returns the shared default Env, constructing it on the first call.
Env* Env::Default() {
  pthread_once(&once, InitDefaultEnv);
  return default_env;
}

H
Hans Wennborg 已提交
1051
}  // namespace leveldb