io_posix.cc 32.9 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5 6 7 8 9 10
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifdef ROCKSDB_LIB_IO_POSIX
11
#include "env/io_posix.h"
S
sdong 已提交
12 13
#include <errno.h>
#include <fcntl.h>
K
krad 已提交
14
#include <algorithm>
S
sdong 已提交
15 16
#if defined(OS_LINUX)
#include <linux/fs.h>
17
#include <linux/falloc.h>
S
sdong 已提交
18 19 20 21 22 23 24 25 26 27 28
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#ifdef OS_LINUX
#include <sys/statfs.h>
#include <sys/syscall.h>
29
#include <sys/sysmacros.h>
S
sdong 已提交
30
#endif
31
#include "monitoring/iostats_context_imp.h"
S
sdong 已提交
32 33
#include "port/port.h"
#include "rocksdb/slice.h"
34
#include "test_util/sync_point.h"
S
sdong 已提交
35 36
#include "util/coding.h"
#include "util/string_util.h"
37

S
Stream  
Shaohua Li 已提交
38 39
#if defined(OS_LINUX) && !defined(F_SET_RW_HINT)
#define F_LINUX_SPECIFIC_BASE 1024
40
#define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12)
S
Stream  
Shaohua Li 已提交
41 42
#endif

43 44 45
namespace rocksdb {

// A wrapper for fadvise, if the platform doesn't support fadvise,
46
// it will simply return 0.
47 48 49 50
// Portability shim for posix_fadvise(). When OS_LINUX is defined the request
// is forwarded and posix_fadvise()'s return value is handed back unchanged;
// on every other platform the arguments are ignored and 0 is returned.
int Fadvise(int fd, off_t offset, size_t len, int advice) {
#ifndef OS_LINUX
  // Nothing to forward to: consume the parameters to keep the compiler quiet.
  static_cast<void>(fd);
  static_cast<void>(offset);
  static_cast<void>(len);
  static_cast<void>(advice);
  return 0;
#else
  const int rc = posix_fadvise(fd, offset, len, advice);
  return rc;
#endif
}

A
Aaron Gao 已提交
59
namespace {
60

61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
// On MacOS (and probably *BSD), the posix write and pwrite calls do not support
// buffers larger than 2^31-1 bytes. These two wrappers fix this issue by
// cutting the buffer in 1GB chunks. We use this chunk size to be sure to keep
// the writes aligned.

// Writes the whole buffer [buf, buf + nbyte) to `fd` with write(), never
// handing more than 1GB to a single call. Short writes are continued from
// where they stopped and EINTR is retried. Returns false on any other write()
// failure (errno is left as set by write()), true once everything is written.
bool PosixWrite(int fd, const char* buf, size_t nbyte) {
  const size_t kMaxChunk = 1UL << 30;

  const char* cursor = buf;
  for (size_t remaining = nbyte; remaining != 0;) {
    const size_t chunk = (remaining < kMaxChunk) ? remaining : kMaxChunk;
    const ssize_t written = write(fd, cursor, chunk);
    if (written >= 0) {
      remaining -= written;
      cursor += written;
      continue;
    }
    if (errno != EINTR) {
      return false;
    }
    // Interrupted before anything was written: try the same chunk again.
  }
  return true;
}

// Positional counterpart of PosixWrite(): stores [buf, buf + nbyte) at file
// position `offset` using pwrite(), at most 1GB per call. Short writes advance
// both the source pointer and the file offset; EINTR is retried. Returns false
// on any other pwrite() failure, true once every byte has been written.
bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) {
  const size_t kMaxChunk = 1UL << 30;

  const char* cursor = buf;
  for (size_t remaining = nbyte; remaining != 0;) {
    const size_t chunk = (remaining < kMaxChunk) ? remaining : kMaxChunk;
    const ssize_t written = pwrite(fd, cursor, chunk, offset);
    if (written >= 0) {
      remaining -= written;
      offset += written;
      cursor += written;
      continue;
    }
    if (errno != EINTR) {
      return false;
    }
    // Interrupted: retry the same chunk at the same offset.
  }

  return true;
}

A
Aaron Gao 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
// Looks up the logical block size of the device backing `fd` by reading
// <sysfs device dir>/queue/logical_block_size (Linux only).
//
// Returns the value read when it is a non-zero power of two. In every other
// case (non-Linux build, fstat/realpath failure, unnamed device, unexpected
// sysfs path, unreadable or malformed file) kDefaultPageSize is returned.
size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) {
#ifdef OS_LINUX
  struct stat st;
  if (fstat(fd, &st) == -1) {
    return kDefaultPageSize;
  }
  if (major(st.st_dev) == 0) {
    // Major number 0 is used for unnamed devices (e.g. non-device mounts);
    // they have no entry under /sys/dev/block/, so fall back to the default.
    return kDefaultPageSize;
  }

  // queue/logical_block_size is readable without special permissions.
  const int kPathBufSize = 100;
  char sys_link[kPathBufSize];
  char resolved[PATH_MAX + 1];
  snprintf(sys_link, kPathBufSize, "/sys/dev/block/%u:%u", major(st.st_dev),
           minor(st.st_dev));
  if (realpath(sys_link, resolved) == nullptr) {
    return kDefaultPageSize;
  }
  std::string device_dir(resolved);
  if (!device_dir.empty() && device_dir.back() == '/') {
    device_dir.pop_back();
  }
  // NOTE: partitions such as sda3 and nvme0n1p1 have no `queue/` subdir; only
  // the parent devices sda and nvme0n1 do.
  // $ ls -al '/sys/dev/block/8:3'
  // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
  // ../../block/sda/sda3
  // $ ls -al '/sys/dev/block/259:4'
  // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
  // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
  const size_t last_slash = device_dir.rfind('/', device_dir.length() - 1);
  if (last_slash == std::string::npos) {
    return kDefaultPageSize;
  }
  const size_t prev_slash = device_dir.rfind('/', last_slash - 1);
  if (prev_slash == std::string::npos) {
    return kDefaultPageSize;
  }
  const std::string parent =
      device_dir.substr(prev_slash + 1, last_slash - prev_slash - 1);
  const std::string child =
      device_dir.substr(last_slash + 1, std::string::npos);
  // The last path component is kept only when it sits directly under "block"
  // or is an "nvme..." name containing no 'p'. Anything else is treated as a
  // partition and the lookup moves up to the parent directory.
  const bool nvme_without_p = child.compare(0, 4, "nvme") == 0 &&
                              child.find('p') == std::string::npos;
  if (parent != "block" && !nvme_without_p) {
    device_dir = device_dir.substr(0, last_slash);
  }

  const std::string size_file = device_dir + "/queue/logical_block_size";
  size_t block_size = 0;
  FILE* fp = fopen(size_file.c_str(), "r");
  if (fp != nullptr) {
    char* line = nullptr;
    size_t line_cap = 0;
    if (getline(&line, &line_cap, fp) != -1) {
      sscanf(line, "%zu", &block_size);
    }
    free(line);
    fclose(fp);
  }
  // Only trust a non-zero power of two.
  const bool is_power_of_two =
      block_size != 0 && (block_size & (block_size - 1)) == 0;
  if (is_power_of_two) {
    return block_size;
  }
#endif
  return kDefaultPageSize;
}
180 181 182 183 184 185 186 187 188

#ifdef ROCKSDB_RANGESYNC_PRESENT

#if !defined(ZFS_SUPER_MAGIC)
// The magic number for ZFS was not exposed until recently. It should be fixed
// forever so we can just copy the magic number here.
#define ZFS_SUPER_MAGIC 0x2fc12fc1
#endif

189 190 191 192 193 194
bool IsSyncFileRangeSupported(int fd) {
  // The approach taken in this function is to build a blacklist of cases where
  // we know `sync_file_range` definitely will not work properly despite passing
  // the compile-time check (`ROCKSDB_RANGESYNC_PRESENT`). If we are unsure, or
  // if any of the checks fail in unexpected ways, we allow `sync_file_range` to
  // be used. This way should minimize risk of impacting existing use cases.
195 196 197
  struct statfs buf;
  int ret = fstatfs(fd, &buf);
  assert(ret == 0);
198
  if (ret == 0 && buf.f_type == ZFS_SUPER_MAGIC) {
199 200 201 202 203 204
    // Testing on ZFS showed the writeback did not happen asynchronously when
    // `sync_file_range` was called, even though it returned success. Avoid it
    // and use `fdatasync` instead to preserve the contract of `bytes_per_sync`,
    // even though this'll incur extra I/O for metadata.
    return false;
  }
205 206 207 208 209 210 211 212 213 214 215 216

  ret = sync_file_range(fd, 0 /* offset */, 0 /* nbytes */, 0 /* flags */);
  assert(!(ret == -1 && errno != ENOSYS));
  if (ret == -1 && errno == ENOSYS) {
    // `sync_file_range` is not implemented on all platforms even if
    // compile-time checks pass and a supported filesystem is in-use. For
    // example, using ext4 on WSL (Windows Subsystem for Linux),
    // `sync_file_range()` returns `ENOSYS`
    // ("Function not implemented").
    return false;
  }
  // None of the cases on the blacklist matched, so allow `sync_file_range` use.
217 218 219 220 221 222 223 224
  return true;
}

#undef ZFS_SUPER_MAGIC

#endif  // ROCKSDB_RANGESYNC_PRESENT

}  // anonymous namespace
A
Aaron Gao 已提交
225

K
krad 已提交
226 227 228
/*
 * DirectIOHelper
 */
229
#ifndef NDEBUG
K
krad 已提交
230 231
namespace {

A
Aaron Gao 已提交
232 233 234
// True when the offset/length `off` is an exact multiple of `sector_size`.
// `sector_size` must be non-zero.
bool IsSectorAligned(const size_t off, size_t sector_size) {
  const size_t remainder = off % sector_size;
  return remainder == 0;
}
K
krad 已提交
235

236 237
// True when the address held by `ptr` is an exact multiple of `sector_size`.
// `sector_size` must be non-zero.
bool IsSectorAligned(const void* ptr, size_t sector_size) {
  const uintptr_t address = uintptr_t(ptr);
  return (address % sector_size) == 0;
}

240
}  // namespace
241
#endif
K
krad 已提交
242

243 244 245
/*
 * PosixSequentialFile
 */
A
Aaron Gao 已提交
246 247
// Stores the file name, the stdio stream and the raw descriptor, records
// whether direct reads were requested in `options`, and caches the logical
// sector size reported by GetLogicalBufferSize() for `fd`.
// Direct reads and mmap reads must not be requested together.
PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
                                         int fd, const EnvOptions& options)
    : filename_(fname),
      file_(file),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(GetLogicalBufferSize(fd_)) {
  const bool direct_and_mmap =
      options.use_direct_reads && options.use_mmap_reads;
  assert(!direct_and_mmap);
  (void)direct_and_mmap;  // only read by the assert
}
255

A
Aaron Gao 已提交
256
// Releases the handle that matches the I/O mode: the raw descriptor when
// direct I/O is in use, the stdio stream otherwise.
PosixSequentialFile::~PosixSequentialFile() {
  if (use_direct_io()) {
    assert(fd_);
    close(fd_);
    return;
  }
  assert(file_);
  fclose(file_);
}
265 266

// Buffered sequential read of up to `n` bytes into `scratch` (not valid in
// direct-I/O mode). `*result` is set to the bytes actually read.
//
// Returns OK when `n` bytes were read or the read stopped at end-of-file (the
// EOF indicator is cleared so later reads can pick up newly appended data).
// Returns an IOError built from errno when the read came up short for any
// other reason.
Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) {
  assert(result != nullptr && !use_direct_io());
  Status s;
  size_t r = 0;
  do {
    // The stdio error indicator is sticky. Reset it before every attempt so
    // that ferror() below reflects this fread only; otherwise a past error
    // combined with a stale EINTR in errno could keep this loop spinning once
    // fread starts returning 0 at end-of-file.
    clearerr(file_);
    r = fread_unlocked(scratch, 1, n, file_);
  } while (r == 0 && ferror(file_) && errno == EINTR);
  *result = Slice(scratch, r);
  if (r < n) {
    if (feof(file_)) {
      // Hitting end-of-file is not an error. Clear the indicator so reads can
      // continue if more data is written to the file.
      clearerr(file_);
    } else {
      // A short read that is not EOF: report it.
      s = IOError("While reading file sequentially", filename_, errno);
    }
  }
  return s;
}

// Direct-I/O read of `n` bytes starting at `offset` into `scratch`.
// `offset`, `n` and `scratch` must all be aligned to
// GetRequiredBufferAlignment() (checked by asserts).
//
// pread() is repeated until `n` bytes are read, EOF is reached, a read returns
// a byte count that is not a multiple of the alignment (expected only at the
// end of the file), or an error other than EINTR occurs. On error an IOError
// is returned and `*result` is empty; otherwise `*result` covers the bytes
// read. A zero-length request returns OK with an empty result.
Status PosixSequentialFile::PositionedRead(uint64_t offset, size_t n,
                                           Slice* result, char* scratch) {
  assert(use_direct_io());
  assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));

  Status s;
  // Starts at 0 (not -1) so that a request with n == 0, which never enters
  // the loop, is not mistaken for a failed pread and reported with a stale
  // errno.
  ssize_t r = 0;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      if (r == -1 && errno == EINTR) {
        continue;  // interrupted before any data was read: retry
      }
      break;  // EOF (0) or a real error (-1)
    }
    ptr += r;
    offset += r;
    left -= r;
    if (r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
      // The bytes read do not fill whole sectors. This should only happen at
      // the end of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    s = IOError(
        "While pread " + ToString(n) + " bytes from offset " + ToString(offset),
        filename_, errno);
  }
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}

// Moves the stdio stream position forward by `n` bytes relative to the
// current position. Returns an IOError built from errno if fseek() fails.
Status PosixSequentialFile::Skip(uint64_t n) {
  const int seek_rc = fseek(file_, static_cast<long int>(n), SEEK_CUR);
  if (seek_rc == 0) {
    return Status::OK();
  }
  return IOError("While fseek to skip " + ToString(n) + " bytes", filename_,
                 errno);
}

// Asks the OS to drop cached pages for [offset, offset + length) of this file
// via Fadvise(POSIX_FADV_DONTNEED). Does nothing on non-Linux builds or when
// the file is in direct-I/O mode. Returns an IOError if the advice call fails.
Status PosixSequentialFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  if (!use_direct_io()) {
    // free OS pages
    int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
    if (ret != 0) {
      // posix_fadvise() reports failure through its return value and does not
      // set errno, so the returned code is the one to surface.
      return IOError("While fadvise NotNeeded offset " + ToString(offset) +
                         " len " + ToString(length),
                     filename_, ret);
    }
  }
  return Status::OK();
#endif
}

/*
 * PosixRandomAccessFile
 */
356
#if defined(OS_LINUX)
K
krad 已提交
357
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
358 359 360 361 362 363 364 365 366 367 368 369
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  long version = 0;
  result = ioctl(fd, FS_IOC_GETVERSION, &version);
K
krad 已提交
370
  TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
371 372 373 374 375 376 377 378 379 380 381 382
  if (result == -1) {
    return 0;
  }
  uint64_t uversion = (uint64_t)version;

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, uversion);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
}
K
krad 已提交
383 384
#endif

T
Tomas Kolda 已提交
385
#if defined(OS_MACOSX) || defined(OS_AIX)
K
krad 已提交
386
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
K
krad 已提交
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, buf.st_gen);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
403 404 405 406 407 408 409 410 411
}
#endif
/*
 * PosixRandomAccessFile
 *
 * pread() based random-access
 */
// Stores the file name and descriptor, records whether direct reads were
// requested, and caches the logical sector size reported by
// GetLogicalBufferSize() for `fd`.
// Asserted preconditions: direct reads and mmap reads are not both requested,
// and mmap reads are only requested when pointers are narrower than 8 bytes.
PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd,
                                             const EnvOptions& options)
    : filename_(fname),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(GetLogicalBufferSize(fd_)) {
  assert(!(options.use_direct_reads && options.use_mmap_reads));
  assert(!options.use_mmap_reads || sizeof(void*) < 8);
}

// Closes the descriptor handed to the constructor.
PosixRandomAccessFile::~PosixRandomAccessFile() {
  close(fd_);
}

// Reads up to `n` bytes starting at `offset` into `scratch` using pread().
// In direct-I/O mode `offset`, `n` and `scratch` must be aligned to
// GetRequiredBufferAlignment() (checked by asserts).
//
// pread() is repeated until `n` bytes are read, EOF is reached, an error other
// than EINTR occurs, or (direct I/O only) a read returns a byte count that is
// not a multiple of the alignment. On error an IOError is returned and
// `*result` is empty; otherwise `*result` covers the bytes read. A zero-length
// request returns OK with an empty result.
Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
                                   char* scratch) const {
  if (use_direct_io()) {
    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));
  }
  Status s;
  // Starts at 0 (not -1) so that a request with n == 0, which never enters
  // the loop, is not mistaken for a failed pread and reported with a stale
  // errno.
  ssize_t r = 0;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      if (r == -1 && errno == EINTR) {
        continue;  // interrupted before any data was read: retry
      }
      break;  // EOF (0) or a real error (-1)
    }
    ptr += r;
    offset += r;
    left -= r;
    if (use_direct_io() &&
        r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
      // The bytes read do not fill whole sectors. This should only happen at
      // the end of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    s = IOError(
        "While pread offset " + ToString(offset) + " len " + ToString(n),
        filename_, errno);
  }
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}

461 462 463 464 465 466 467 468 469 470 471 472 473 474
// Hints the OS to start reading [offset, offset + n) ahead of time:
// readahead() when OS_LINUX is defined, fcntl(F_RDADVISE) when OS_MACOSX is
// defined. Does nothing in direct-I/O mode or on other platforms. Returns an
// IOError built from errno if the platform call returns -1.
Status PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n) {
  if (use_direct_io()) {
    return Status();
  }

  ssize_t rc = 0;
#ifdef OS_LINUX
  rc = readahead(fd_, offset, n);
#endif
#ifdef OS_MACOSX
  radvisory advice;
  advice.ra_offset = static_cast<off_t>(offset);
  advice.ra_count = static_cast<int>(n);
  rc = fcntl(fd_, F_RDADVISE, &advice);
#endif
  if (rc != -1) {
    return Status();
  }
  return IOError("While prefetching offset " + ToString(offset) + " len " +
                     ToString(n),
                 filename_, errno);
}

T
Tomas Kolda 已提交
483
#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
484
size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
K
krad 已提交
485
  return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
486 487 488 489
}
#endif

// Translates `pattern` into the matching POSIX_FADV_* advice and applies it
// through Fadvise() with offset 0 and length 0. Does nothing in direct-I/O
// mode. An unknown pattern trips an assert and issues no advice. The result
// of Fadvise() is ignored.
void PosixRandomAccessFile::Hint(AccessPattern pattern) {
  if (use_direct_io()) {
    return;
  }

  int advice = 0;
  switch (pattern) {
    case NORMAL:
      advice = POSIX_FADV_NORMAL;
      break;
    case RANDOM:
      advice = POSIX_FADV_RANDOM;
      break;
    case SEQUENTIAL:
      advice = POSIX_FADV_SEQUENTIAL;
      break;
    case WILLNEED:
      advice = POSIX_FADV_WILLNEED;
      break;
    case DONTNEED:
      advice = POSIX_FADV_DONTNEED;
      break;
    default:
      assert(false);
      return;
  }
  Fadvise(fd_, 0, 0, advice);
}

// Asks the OS to drop cached pages for [offset, offset + length) of this file
// via Fadvise(POSIX_FADV_DONTNEED). Does nothing in direct-I/O mode or on
// non-Linux builds. Returns an IOError if the advice call fails.
Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
  if (use_direct_io()) {
    return Status::OK();
  }
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return Status::OK();
  }
  // posix_fadvise() reports failure through its return value and does not set
  // errno, so the returned code is the one to surface.
  return IOError("While fadvise NotNeeded offset " + ToString(offset) +
                     " len " + ToString(length),
                 filename_, ret);
#endif
}

/*
 * PosixMmapReadableFile
 *
 * mmap() based random-access
 */
// base[0,length-1] contains the mmapped contents of the file.
// Records the descriptor, file name and the already-established mapping
// [base, base + length). `options` is only consulted by asserts: mmap reads
// must be enabled and direct reads must not be.
PosixMmapReadableFile::PosixMmapReadableFile(const int fd,
                                             const std::string& fname,
                                             void* base, size_t length,
                                             const EnvOptions& options)
    : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) {
#ifdef NDEBUG
  // With asserts compiled out `options` would otherwise be unused.
  (void)options;
#endif
  // No-op self-assignment kept to silence a compiler warning about fd_.
  fd_ = fd_ + 0;
  assert(options.use_mmap_reads);
  assert(!options.use_direct_reads);
}

// Unmaps the region and closes the descriptor. A munmap() failure is only
// reported on stdout; the descriptor is closed regardless.
PosixMmapReadableFile::~PosixMmapReadableFile() {
  const bool unmapped = (munmap(mmapped_region_, length_) == 0);
  if (!unmapped) {
    fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n",
            mmapped_region_, length_);
  }
  close(fd_);
}

// Serves a read straight from the mapped region; the scratch buffer is unused.
// `*result` points into the mapping at `offset`, clamped so it never extends
// past `length_`. An `offset` beyond the mapped length yields an empty result
// and an IOError carrying EINVAL.
Status PosixMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result,
                                   char* /*scratch*/) const {
  Status s;
  if (offset > length_) {
    *result = Slice();
    return IOError("While mmap read offset " + ToString(offset) +
                       " larger than file length " + ToString(length_),
                   filename_, EINVAL);
  } else if (n > length_ - offset) {
    // Compared as `n > length_ - offset` rather than `offset + n > length_`:
    // the sum can wrap around for a very large `n` and skip the clamp, while
    // the difference cannot underflow because offset <= length_ here.
    n = static_cast<size_t>(length_ - offset);
  }
  *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
  return s;
}

// Asks the OS to drop cached pages for [offset, offset + length) of the
// underlying file via Fadvise(POSIX_FADV_DONTNEED). Does nothing on non-Linux
// builds. Returns an IOError if the advice call fails.
Status PosixMmapReadableFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return Status::OK();
  }
  // posix_fadvise() reports failure through its return value and does not set
  // errno, so the returned code is the one to surface.
  return IOError("While fadvise not needed. Offset " + ToString(offset) +
                     " len " + ToString(length),
                 filename_, ret);
#endif
}

/*
 * PosixMmapFile
 *
 * We preallocate up to an extra megabyte and use memcpy to append new
 * data to the file.  This is safe since we either properly close the
 * file before reading from it, or for log files, the reading code
 * knows enough to skip zero suffixes.
 */
// Unmaps the currently mapped window, if any. On success the file offset is
// advanced past the window, all window pointers are reset to null, and the
// size used for the next mapping is doubled until it reaches 1MB.
// Returns an IOError built from errno if munmap() fails; in that case no state
// is changed.
Status PosixMmapFile::UnmapCurrentRegion() {
  TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds);
  if (base_ != nullptr) {
    int munmap_status = munmap(base_, limit_ - base_);
    if (munmap_status != 0) {
      // munmap() returns -1 and stores the reason in errno; the return value
      // itself is not an error code.
      return IOError("While munmap", filename_, errno);
    }
    file_offset_ += limit_ - base_;
    base_ = nullptr;
    limit_ = nullptr;
    last_sync_ = nullptr;
    dst_ = nullptr;

    // Increase the amount we map the next time, but capped at 1MB
    if (map_size_ < (1 << 20)) {
      map_size_ *= 2;
    }
  }
  return Status::OK();
}

// Maps the next `map_size_` bytes of the file, starting at `file_offset_`,
// and points base_/limit_/dst_/last_sync_ at the new window. When fallocate
// is allowed the range is allocated first (fallocate(), then posix_fallocate()
// as a fallback). Must only be called while no window is mapped.
// Returns NotSupported when ROCKSDB_FALLOCATE_PRESENT is not defined, and an
// IOError when allocation or mmap() fails.
Status PosixMmapFile::MapNewRegion() {
#ifndef ROCKSDB_FALLOCATE_PRESENT
  return Status::NotSupported("This platform doesn't support fallocate()");
#else
  assert(base_ == nullptr);
  TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds);
  // we can't fallocate with FALLOC_FL_KEEP_SIZE here
  if (allow_fallocate_) {
    IOSTATS_TIMER_GUARD(allocate_nanos);
    int rc = fallocate(fd_, 0, file_offset_, map_size_);
    if (rc != 0) {
      // fallback to posix_fallocate, which returns the error number directly
      rc = posix_fallocate(fd_, file_offset_, map_size_);
    }
    if (rc != 0) {
      return Status::IOError("Error allocating space to file : " + filename_ +
                             "Error : " + strerror(rc));
    }
  }

  TEST_KILL_RANDOM("PosixMmapFile::Append:1", rocksdb_kill_odds);
  void* const mapping = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE,
                             MAP_SHARED, fd_, file_offset_);
  if (mapping == MAP_FAILED) {
    return Status::IOError("MMap failed on " + filename_);
  }
  TEST_KILL_RANDOM("PosixMmapFile::Append:2", rocksdb_kill_odds);

  char* const window_start = reinterpret_cast<char*>(mapping);
  base_ = window_start;
  limit_ = window_start + map_size_;
  dst_ = window_start;
  last_sync_ = window_start;
  return Status::OK();
#endif
}

// Synchronously flushes (MS_SYNC) the pages of the current window that hold
// bytes written since the previous sync, i.e. from the page containing
// last_sync_ through the page containing the last written byte. No-op when
// nothing was written since the last sync. last_sync_ is advanced before the
// msync() call, so it moves forward even if msync() fails.
Status PosixMmapFile::Msync() {
  if (dst_ == last_sync_) {
    return Status::OK();
  }
  // Page-aligned offsets (relative to base_) of the first and last pages that
  // contain unsynced bytes.
  const size_t first_page = TruncateToPageBoundary(last_sync_ - base_);
  const size_t last_page = TruncateToPageBoundary(dst_ - base_ - 1);
  last_sync_ = dst_;
  TEST_KILL_RANDOM("PosixMmapFile::Msync:0", rocksdb_kill_odds);
  const size_t sync_len = last_page - first_page + page_size_;
  const int rc = msync(base_ + first_page, sync_len, MS_SYNC);
  if (rc < 0) {
    return IOError("While msync", filename_, errno);
  }
  return Status::OK();
}

// Sets up an mmap-backed writable file with no window mapped yet. The initial
// mapping size is 65536 rounded up with Roundup() against `page_size`.
// `page_size` must be a power of two, and `options` must request mmap writes
// without direct writes (all checked by asserts). The fallocate settings are
// copied from `options` only when ROCKSDB_FALLOCATE_PRESENT is defined.
PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size,
                             const EnvOptions& options)
    : filename_(fname),
      fd_(fd),
      page_size_(page_size),
      map_size_(Roundup(65536, page_size)),
      base_(nullptr),
      limit_(nullptr),
      dst_(nullptr),
      last_sync_(nullptr),
      file_offset_(0) {
#ifndef ROCKSDB_FALLOCATE_PRESENT
  (void)options;
#else
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif
  assert((page_size & (page_size - 1)) == 0);
  assert(options.use_mmap_writes);
  assert(!options.use_direct_writes);
}

// Closes the file if that has not happened yet (Close() sets fd_ to -1).
// The status returned by Close() is discarded.
PosixMmapFile::~PosixMmapFile() {
  const bool still_open = (fd_ >= 0);
  if (still_open) {
    PosixMmapFile::Close();
  }
}

// Copies `data` into the mapped window, rolling over to a freshly mapped
// window whenever the current one is full. Returns the first non-OK status
// from unmapping or mapping; OK once all bytes are copied.
Status PosixMmapFile::Append(const Slice& data) {
  const char* src = data.data();
  for (size_t remaining = data.size(); remaining > 0;) {
    assert(base_ <= dst_);
    assert(dst_ <= limit_);
    const size_t room = limit_ - dst_;
    if (room == 0) {
      Status s = UnmapCurrentRegion();
      if (s.ok()) {
        s = MapNewRegion();
      }
      if (!s.ok()) {
        return s;
      }
      TEST_KILL_RANDOM("PosixMmapFile::Append:0", rocksdb_kill_odds);
    }

    // `room` is not refreshed after a remap, so this pass copies zero bytes
    // and the next pass picks up the new window's capacity.
    const size_t chunk = std::min(remaining, room);
    assert(dst_);
    memcpy(dst_, src, chunk);
    dst_ += chunk;
    src += chunk;
    remaining -= chunk;
  }
  return Status::OK();
}

// Unmaps the current window, truncates away the unused tail of that window,
// and closes the descriptor. The descriptor is closed and the object marked
// closed (fd_ = -1) even if an earlier step failed; the first error
// encountered is the one returned.
Status PosixMmapFile::Close() {
  // Must be captured before unmapping, which resets limit_ and dst_.
  const size_t unused_tail = limit_ - dst_;

  Status s = UnmapCurrentRegion();
  if (!s.ok()) {
    s = IOError("While closing mmapped file", filename_, errno);
  } else if (unused_tail > 0) {
    // Trim the extra space at the end of the file
    const int truncate_rc = ftruncate(fd_, file_offset_ - unused_tail);
    if (truncate_rc < 0) {
      s = IOError("While ftruncating mmaped file", filename_, errno);
    }
  }

  const int close_rc = close(fd_);
  if (close_rc < 0 && s.ok()) {
    s = IOError("While closing mmapped file", filename_, errno);
  }

  fd_ = -1;
  base_ = nullptr;
  limit_ = nullptr;
  return s;
}

// Nothing to do for the mmap-backed file; always returns OK.
Status PosixMmapFile::Flush() {
  return Status::OK();
}

// Calls fdatasync() on the descriptor and then Msync() on the mapped window.
// Returns an IOError built from errno if fdatasync() fails, otherwise the
// status of Msync().
Status PosixMmapFile::Sync() {
  const int rc = fdatasync(fd_);
  if (rc < 0) {
    return IOError("While fdatasync mmapped file", filename_, errno);
  }

  return Msync();
}

/**
 * Flush data as well as metadata to stable storage.
 */
// Calls fsync() on the descriptor and then Msync() on the mapped window.
// Returns an IOError built from errno if fsync() fails, otherwise the status
// of Msync().
Status PosixMmapFile::Fsync() {
  const int rc = fsync(fd_);
  if (rc < 0) {
    return IOError("While fsync mmaped file", filename_, errno);
  }

  return Msync();
}

/**
 * Get the size of valid data in the file. This will not match the
 * size that is returned from the filesystem because we use mmap
 * to extend file by map_size every time.
 */
// Logical size of the data written so far: the file offset of the current
// window plus the number of bytes used inside that window.
uint64_t PosixMmapFile::GetFileSize() {
  const size_t used_in_window = dst_ - base_;
  const uint64_t total = file_offset_ + used_in_window;
  return total;
}

// Asks the OS to drop cached pages for [offset, offset + length) of the
// underlying file via Fadvise(POSIX_FADV_DONTNEED). Does nothing on non-Linux
// builds. Returns an IOError if the advice call fails.
Status PosixMmapFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return Status::OK();
  }
  // posix_fadvise() reports failure through its return value and does not set
  // errno, so the returned code is the one to surface.
  return IOError("While fadvise NotNeeded mmapped file", filename_, ret);
#endif
}

805
#ifdef ROCKSDB_FALLOCATE_PRESENT
806
// Preallocates `len` bytes at `offset` with fallocate(), passing
// FALLOC_FL_KEEP_SIZE when fallocate_with_keep_size_ is set. Returns OK
// without doing anything when fallocate is not allowed. Both arguments must
// fit in off_t (checked by asserts). Returns an IOError built from errno if
// fallocate() fails.
Status PosixMmapFile::Allocate(uint64_t offset, uint64_t len) {
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  TEST_KILL_RANDOM("PosixMmapFile::Allocate:0", rocksdb_kill_odds);
  if (!allow_fallocate_) {
    return Status::OK();
  }

  const int mode = fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0;
  const int rc = fallocate(fd_, mode, static_cast<off_t>(offset),
                           static_cast<off_t>(len));
  if (rc == 0) {
    return Status::OK();
  }
  return IOError(
      "While fallocate offset " + ToString(offset) + " len " + ToString(len),
      filename_, errno);
}
824
#endif
825 826 827 828 829 830 831 832

/*
 * PosixWritableFile
 *
 * Use posix write to write data to a file.
 */
// Initializes a write()-based writable file over `fd`: records the name,
// whether direct writes were requested, a starting size of 0, and the logical
// sector size reported by GetLogicalBufferSize(). The fallocate settings and
// the sync_file_range capability probe are compiled in only when the
// corresponding ROCKSDB_*_PRESENT macro is defined.
// mmap writes must not be requested (checked by assert).
PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
                                     const EnvOptions& options)
    : WritableFile(options),
      filename_(fname),
      use_direct_io_(options.use_direct_writes),
      fd_(fd),
      filesize_(0),
      logical_sector_size_(GetLogicalBufferSize(fd_)) {
#if defined(ROCKSDB_FALLOCATE_PRESENT)
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif  // ROCKSDB_FALLOCATE_PRESENT
#if defined(ROCKSDB_RANGESYNC_PRESENT)
  sync_file_range_supported_ = IsSyncFileRangeSupported(fd_);
#endif  // ROCKSDB_RANGESYNC_PRESENT
  assert(!options.use_mmap_writes);
}

// Closes the file if that has not happened yet (Close() sets fd_ to -1).
// The status returned by Close() is discarded.
PosixWritableFile::~PosixWritableFile() {
  const bool still_open = (fd_ >= 0);
  if (still_open) {
    PosixWritableFile::Close();
  }
}

// Appends `data` at the descriptor's current position via PosixWrite() and
// grows the tracked file size by the number of bytes written. In direct-I/O
// mode the buffer address and size must be aligned to
// GetRequiredBufferAlignment() (checked by asserts). Returns an IOError built
// from errno when the write fails; the tracked size is then left untouched.
Status PosixWritableFile::Append(const Slice& data) {
  if (use_direct_io()) {
    assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
  }
  const char* payload = data.data();
  const size_t payload_len = data.size();

  const bool written = PosixWrite(fd_, payload, payload_len);
  if (!written) {
    return IOError("While appending to file", filename_, errno);
  }

  filesize_ += payload_len;
  return Status::OK();
}

871
// Writes `data` at absolute file position `offset` via PosixPositionedWrite()
// and sets the tracked file size to `offset + data.size()`. In direct-I/O mode
// the offset, buffer address and size must be aligned to
// GetRequiredBufferAlignment(); `offset` must fit in off_t (all checked by
// asserts). Returns an IOError built from errno when the write fails.
Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
  if (use_direct_io()) {
    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
  }
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  const char* payload = data.data();
  const size_t payload_len = data.size();
  const bool written = PosixPositionedWrite(fd_, payload, payload_len,
                                            static_cast<off_t>(offset));
  if (!written) {
    return IOError("While pwrite to file at offset " + ToString(offset),
                   filename_, errno);
  }
  filesize_ = offset + payload_len;
  return Status::OK();
}

A
Aaron Gao 已提交
888 889 890 891
// Resizes the file to `size` bytes with ftruncate(). On success the tracked
// file size is updated to `size`; on failure it is left unchanged and an
// IOError built from errno is returned.
Status PosixWritableFile::Truncate(uint64_t size) {
  const int rc = ftruncate(fd_, size);
  if (rc < 0) {
    return IOError("While ftruncate file to size " + ToString(size), filename_,
                   errno);
  }
  filesize_ = size;
  return Status();
}

900 901 902 903 904 905 906 907 908 909
// Closes the file. If GetPreallocationStatus() reports preallocated blocks,
// the file is first truncated to the tracked size and, where supported, the
// leftover preallocated blocks are released with a hole punch. Failures of
// those trimming steps are ignored; only a close() failure is returned.
// fd_ is set to -1 in all cases.
Status PosixWritableFile::Close() {
  Status s;

  size_t block_size;
  size_t last_allocated_block;
  GetPreallocationStatus(&block_size, &last_allocated_block);
  if (last_allocated_block > 0) {
    // trim the extra space preallocated at the end of the file
    // NOTE(ljin): we probably don't want to surface failure as an IOError,
    // but it will be nice to log these errors.
    const int truncate_rc __attribute__((__unused__)) =
        ftruncate(fd_, filesize_);
#if defined(ROCKSDB_FALLOCATE_PRESENT) && defined(FALLOC_FL_PUNCH_HOLE) && \
    !defined(TRAVIS)
    // in some file systems, ftruncate only trims trailing space if the
    // new file size is smaller than the current size. Calling fallocate
    // with FALLOC_FL_PUNCH_HOLE flag to explicitly release these unused
    // blocks. FALLOC_FL_PUNCH_HOLE is supported on at least the following
    // filesystems:
    //   XFS (since Linux 2.6.38)
    //   ext4 (since Linux 3.0)
    //   Btrfs (since Linux 3.7)
    //   tmpfs (since Linux 3.5)
    // We ignore error since failure of this operation does not affect
    // correctness.
    // TRAVIS - this code does not work on TRAVIS filesystems.
    // the FALLOC_FL_KEEP_SIZE option is expected to not change the size
    // of the file, but it does. Simple strace report will show that.
    // While we work with Travis-CI team to figure out if this is a
    // quirk of Docker/AUFS, we will comment this out.
    struct stat file_stats;
    const int stat_rc = fstat(fd_, &file_stats);
    // After ftruncate, we check whether ftruncate has the correct behavior.
    // If not, we should hack it with FALLOC_FL_PUNCH_HOLE
    if (stat_rc == 0) {
      // Blocks needed for the logical size vs. blocks actually allocated
      // (st_blocks is scaled by st_blksize / 512), both in st_blksize units.
      const auto blocks_needed =
          (file_stats.st_size + file_stats.st_blksize - 1) /
          file_stats.st_blksize;
      const auto blocks_allocated =
          file_stats.st_blocks / (file_stats.st_blksize / 512);
      if (blocks_needed != blocks_allocated) {
        IOSTATS_TIMER_GUARD(allocate_nanos);
        if (allow_fallocate_) {
          fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_,
                    block_size * last_allocated_block - filesize_);
        }
      }
    }
#endif
  }

  const int close_rc = close(fd_);
  if (close_rc < 0) {
    s = IOError("While closing file after writing", filename_, errno);
  }
  fd_ = -1;
  return s;
}

// Flush() performs no work in this implementation and always reports success.
Status PosixWritableFile::Flush() {
  return Status::OK();
}

// Issues fdatasync() on fd_. A negative result is turned into an IOError
// carrying filename_ and errno; anything else yields OK.
Status PosixWritableFile::Sync() {
  const bool synced = (fdatasync(fd_) >= 0);
  if (synced) {
    return Status::OK();
  }
  return IOError("While fdatasync", filename_, errno);
}

// Issues fsync() on fd_. Returns OK unless fsync() reports a negative
// result, in which case an IOError with filename_ and errno is returned.
Status PosixWritableFile::Fsync() {
  const int rc = fsync(fd_);
  return (rc < 0) ? IOError("While fsync", filename_, errno) : Status::OK();
}

// Always answers true for this file type.
bool PosixWritableFile::IsSyncThreadSafe() const {
  return true;
}

// Returns the size tracked in filesize_; the file system is not consulted.
uint64_t PosixWritableFile::GetFileSize() {
  return filesize_;
}

S
Stream  
Shaohua Li 已提交
975 976
// Hands a write-lifetime hint for this file to the kernel through
// fcntl(F_SET_RW_HINT) and caches it in write_hint_ when the call succeeds.
//
// Nothing happens when `hint` already equals write_hint_, on non-Linux
// builds, or when ROCKSDB_VALGRIND_RUN is defined. A failing fcntl() is
// ignored; write_hint_ then simply keeps its previous value.
void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
#ifdef OS_LINUX
// Suppress Valgrind "Unimplemented functionality" error.
#ifndef ROCKSDB_VALGRIND_RUN
  if (hint == write_hint_) {
    return;
  }
  // F_SET_RW_HINT reads a 64-bit value through the pointer it is given.
  // Widen the enum into a uint64_t first so the kernel never reads beyond the
  // end of a (possibly narrower) enum object.
  uint64_t fcntl_hint = static_cast<uint64_t>(hint);
  if (fcntl(fd_, F_SET_RW_HINT, &fcntl_hint) == 0) {
    write_hint_ = hint;
  }
#else
  (void)hint;
#endif  // ROCKSDB_VALGRIND_RUN
#else
  (void)hint;
#endif  // OS_LINUX
}

993
// Asks the OS to drop cached pages for the byte range [offset, offset+length)
// of this file via Fadvise(POSIX_FADV_DONTNEED).
//
// Returns OK immediately when use_direct_io() is true and on non-Linux
// builds, where no advice is issued. Otherwise returns OK when Fadvise()
// reports 0 and an IOError built from its return value when it does not.
Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
  if (use_direct_io()) {
    return Status::OK();
  }
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return Status::OK();
  }
  // posix_fadvise() hands back the error number as its return value and does
  // not set errno, so report `ret` rather than a possibly stale errno.
  return IOError("While fadvise NotNeeded", filename_, ret);
#endif
}

1011
#ifdef ROCKSDB_FALLOCATE_PRESENT
1012
// Preallocates `len` bytes starting at `offset` with fallocate().
//
// FALLOC_FL_KEEP_SIZE is passed when fallocate_with_keep_size_ is set,
// otherwise mode 0 is used. When allow_fallocate_ is false no system call is
// made and OK is returned. A non-zero fallocate() result becomes an IOError
// that names the offset and length and carries the errno of that call.
// Both arguments must fit in off_t (checked with assert).
Status PosixWritableFile::Allocate(uint64_t offset, uint64_t len) {
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  TEST_KILL_RANDOM("PosixWritableFile::Allocate:0", rocksdb_kill_odds);
  IOSTATS_TIMER_GUARD(allocate_nanos);
  if (!allow_fallocate_) {
    return Status::OK();
  }
  const int alloc_status =
      fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0,
                static_cast<off_t>(offset), static_cast<off_t>(len));
  if (alloc_status == 0) {
    return Status::OK();
  }
  // Snapshot errno before building the message: the order in which call
  // arguments are evaluated is unspecified, and the string formatting below
  // is allowed to modify errno.
  const int err = errno;
  return IOError(
      "While fallocate offset " + ToString(offset) + " len " + ToString(len),
      filename_, err);
}
1031
#endif
1032

1033
// Starts writeback for a byte range of the file using sync_file_range().
//
// When ROCKSDB_RANGESYNC_PRESENT is not defined, or
// sync_file_range_supported_ is false, the call is forwarded to
// WritableFile::RangeSync(). A non-zero sync_file_range() result is returned
// as an IOError carrying the errno of that call. Both arguments must fit in
// off_t (checked with assert).
Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) {
#ifdef ROCKSDB_RANGESYNC_PRESENT
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  assert(nbytes <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  if (sync_file_range_supported_) {
    int ret;
    if (strict_bytes_per_sync_) {
      // Specifying `SYNC_FILE_RANGE_WAIT_BEFORE` together with an offset/length
      // that spans all bytes written so far tells `sync_file_range` to wait for
      // any outstanding writeback requests to finish before issuing a new one.
      ret =
          sync_file_range(fd_, 0, static_cast<off_t>(offset + nbytes),
                          SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE);
    } else {
      ret = sync_file_range(fd_, static_cast<off_t>(offset),
                            static_cast<off_t>(nbytes), SYNC_FILE_RANGE_WRITE);
    }
    if (ret != 0) {
      // Snapshot errno before building the message: argument evaluation order
      // is unspecified and the string formatting is allowed to modify errno.
      const int err = errno;
      return IOError("While sync_file_range returned " + ToString(ret),
                     filename_, err);
    }
    return Status::OK();
  }
#endif  // ROCKSDB_RANGESYNC_PRESENT
  return WritableFile::RangeSync(offset, nbytes);
}

T
Tomas Kolda 已提交
1060
#ifdef OS_LINUX
1061
size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const {
K
krad 已提交
1062
  return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
1063
}
1064
#endif
1065

I
Islam AbdelRahman 已提交
1066 1067 1068 1069 1070
/*
 * PosixRandomRWFile
 */

// Stores the file name and the file descriptor `fd` in the new object.
// The EnvOptions argument is accepted but not used.
PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd,
                                     const EnvOptions& /*options*/)
    : filename_(fname),
      fd_(fd) {
}

// Calls Close() when fd_ is still non-negative. The Status returned by
// Close() is not inspected.
PosixRandomRWFile::~PosixRandomRWFile() {
  if (fd_ < 0) {
    return;
  }
  Close();
}

// Writes the bytes of `data` to the file at absolute position `offset` using
// PosixPositionedWrite(). Returns OK when that helper reports success and an
// IOError naming the offset, with the errno observed right after the helper
// returned, when it does not.
Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) {
  const char* src = data.data();
  size_t nbytes = data.size();
  if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) {
    // Snapshot errno before building the message: argument evaluation order
    // is unspecified and the string formatting is allowed to modify errno.
    const int err = errno;
    return IOError(
        "While write random read/write file at offset " + ToString(offset),
        filename_, err);
  }

  return Status::OK();
}

// Reads up to `n` bytes starting at `offset` into `scratch` with repeated
// pread() calls and points `*result` at the bytes actually read.
//
// The loop stops once `n` bytes have been read or pread() returns 0, so
// `*result` may be shorter than `n`. A pread() interrupted by a signal
// (EINTR) is retried; any other failure is returned as an IOError.
Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result,
                               char* scratch) const {
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    ssize_t done = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (done < 0) {
      // Snapshot errno once: the message formatting below is allowed to
      // modify it, and argument evaluation order is unspecified.
      const int err = errno;
      if (err == EINTR) {
        // read was interrupted, try again.
        continue;
      }
      return IOError("While reading random read/write file offset " +
                         ToString(offset) + " len " + ToString(n),
                     filename_, err);
    } else if (done == 0) {
      // Nothing more to read
      break;
    }

    // Read `done` bytes
    ptr += done;
    offset += done;
    left -= done;
  }

  *result = Slice(scratch, n - left);
  return Status::OK();
}

// Flush() performs no work in this implementation and always reports success.
Status PosixRandomRWFile::Flush() {
  return Status::OK();
}

// Issues fdatasync() on fd_. A negative result is turned into an IOError
// carrying filename_ and errno; anything else yields OK.
Status PosixRandomRWFile::Sync() {
  const bool synced = (fdatasync(fd_) >= 0);
  if (synced) {
    return Status::OK();
  }
  return IOError("While fdatasync random read/write file", filename_, errno);
}

// Issues fsync() on fd_. Returns OK unless fsync() reports a negative
// result, in which case an IOError with filename_ and errno is returned.
Status PosixRandomRWFile::Fsync() {
  const int rc = fsync(fd_);
  return (rc < 0)
             ? IOError("While fsync random read/write file", filename_, errno)
             : Status::OK();
}

// Closes the file descriptor and marks this object as closed.
//
// fd_ is reset to -1 whether or not close() succeeds, so that the destructor
// (which calls Close() while fd_ >= 0) never issues a second close() on the
// same descriptor number. This matches PosixWritableFile::Close(), which also
// clears fd_ unconditionally. Returns an IOError with the errno of the failed
// close(), or OK.
Status PosixRandomRWFile::Close() {
  const int rc = close(fd_);
  // Capture errno right away, before anything else can overwrite it.
  const int err = errno;
  fd_ = -1;
  if (rc < 0) {
    return IOError("While close random read/write file", filename_, err);
  }
  return Status::OK();
}

1146 1147
// Unmaps the region described by base_ and length_. The result of munmap()
// is not checked.
PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
  // TODO should have error handling though not much we can do...
  munmap(this->base_, this->length_);
}

K
krad 已提交
1151 1152 1153 1154
/*
 * PosixDirectory
 */

1155 1156 1157
// Closes the directory's file descriptor. The return value of close() is not
// checked.
PosixDirectory::~PosixDirectory() {
  close(fd_);
}

// Calls fsync() on the directory's file descriptor and returns an IOError
// with errno when it reports -1. When OS_AIX is defined the fsync() call is
// compiled out and the function just returns OK.
Status PosixDirectory::Fsync() {
#ifndef OS_AIX
  const int rc = fsync(fd_);
  if (rc == -1) {
    return IOError("While fsync", "a directory", errno);
  }
#endif
  return Status::OK();
}
1165 1166
}  // namespace rocksdb
#endif