io_posix.cc 32.5 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5 6 7 8 9 10
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifdef ROCKSDB_LIB_IO_POSIX
11
#include "env/io_posix.h"
S
sdong 已提交
12 13
#include <errno.h>
#include <fcntl.h>
K
krad 已提交
14
#include <algorithm>
S
sdong 已提交
15 16 17 18 19 20 21 22 23 24 25 26 27
#if defined(OS_LINUX)
#include <linux/fs.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#ifdef OS_LINUX
#include <sys/statfs.h>
#include <sys/syscall.h>
28
#include <sys/sysmacros.h>
S
sdong 已提交
29
#endif
30
#include "logging/posix_logger.h"
31
#include "monitoring/iostats_context_imp.h"
S
sdong 已提交
32 33
#include "port/port.h"
#include "rocksdb/slice.h"
34
#include "test_util/sync_point.h"
S
sdong 已提交
35 36
#include "util/coding.h"
#include "util/string_util.h"
37

S
Stream  
Shaohua Li 已提交
38 39
#if defined(OS_LINUX) && !defined(F_SET_RW_HINT)
#define F_LINUX_SPECIFIC_BASE 1024
40
#define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12)
S
Stream  
Shaohua Li 已提交
41 42
#endif

43 44 45
namespace rocksdb {

// Portability shim around posix_fadvise().
// On Linux the request is forwarded unchanged and the result of
// posix_fadvise() is handed back to the caller. On every other platform the
// arguments are ignored and 0 is returned.
int Fadvise(int fd, off_t offset, size_t len, int advice) {
#ifndef OS_LINUX
  // No fadvise here: consume the arguments and report success.
  static_cast<void>(fd);
  static_cast<void>(offset);
  static_cast<void>(len);
  static_cast<void>(advice);
  return 0;
#else
  return posix_fadvise(fd, offset, len, advice);
#endif
}

A
Aaron Gao 已提交
59
namespace {
60

61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
// On MacOS (and probably *BSD), the posix write and pwrite calls do not support
// buffers larger than 2^31-1 bytes. These two wrappers fix this issue by
// cutting the buffer in 1GB chunks. We use this chunk size to be sure to keep
// the writes aligned.

// Writes all `nbyte` bytes of `buf` to `fd` with write(), handing at most 1GB
// to each call. A call interrupted by a signal (EINTR) is retried. Returns
// true once every byte has been written and false on any other write()
// failure (errno is left as set by write()).
bool PosixWrite(int fd, const char* buf, size_t nbyte) {
  const size_t kMaxChunk = 1UL << 30;

  const char* cursor = buf;
  for (size_t remaining = nbyte; remaining != 0;) {
    const size_t chunk = std::min(remaining, kMaxChunk);
    const ssize_t written = write(fd, cursor, chunk);
    if (written >= 0) {
      // Possibly a short write: advance and go around again.
      remaining -= written;
      cursor += written;
    } else if (errno != EINTR) {
      return false;
    }
  }
  return true;
}

// Writes all `nbyte` bytes of `buf` to `fd` starting at file position
// `offset`, using pwrite() with at most 1GB per call. A call interrupted by a
// signal (EINTR) is retried. Returns true once every byte has been written
// and false on any other pwrite() failure.
bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) {
  const size_t kMaxChunk = 1UL << 30;

  const char* cursor = buf;
  for (size_t remaining = nbyte; remaining != 0;) {
    const size_t chunk = std::min(remaining, kMaxChunk);
    const ssize_t written = pwrite(fd, cursor, chunk, offset);
    if (written >= 0) {
      // Possibly a short write: move all three positions forward together.
      remaining -= written;
      offset += written;
      cursor += written;
    } else if (errno != EINTR) {
      return false;
    }
  }

  return true;
}

A
Aaron Gao 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
// Returns the logical block size of the block device that backs `fd`, as
// published in sysfs under <device>/queue/logical_block_size.
// kDefaultPageSize is returned instead when the platform is not Linux, when
// fstat() fails, when the device major number is 0, when the sysfs path
// cannot be resolved or split, or when the value read is zero or not a power
// of two.
size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) {
#ifdef OS_LINUX
  struct stat st;
  if (fstat(fd, &st) == -1) {
    return kDefaultPageSize;
  }
  if (major(st.st_dev) == 0) {
    // Unnamed devices (e.g. non-device mounts), reserved as null device number.
    // These don't have an entry in /sys/dev/block/. Return a sensible default.
    return kDefaultPageSize;
  }

  // Reading queue/logical_block_size does not require special permissions.
  const int kSysPathLen = 100;
  char sys_path[kSysPathLen];
  char resolved[PATH_MAX + 1];
  snprintf(sys_path, kSysPathLen, "/sys/dev/block/%u:%u", major(st.st_dev),
           minor(st.st_dev));
  if (realpath(sys_path, resolved) == nullptr) {
    return kDefaultPageSize;
  }
  std::string device_dir(resolved);
  if (!device_dir.empty() && device_dir.back() == '/') {
    device_dir.pop_back();
  }
  // NOTE: partitions such as sda3 and nvme0n1p1 do not have a `queue/`
  // subdir, only their parent devices sda and nvme0n1 have it.
  // $ ls -al '/sys/dev/block/8:3'
  // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
  // ../../block/sda/sda3
  // $ ls -al '/sys/dev/block/259:4'
  // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
  // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
  const size_t last_slash = device_dir.rfind('/', device_dir.length() - 1);
  if (last_slash == std::string::npos) {
    return kDefaultPageSize;
  }
  const size_t prev_slash = device_dir.rfind('/', last_slash - 1);
  if (prev_slash == std::string::npos) {
    return kDefaultPageSize;
  }
  // Last two path components: "<parent>/<child>".
  const std::string parent =
      device_dir.substr(prev_slash + 1, last_slash - prev_slash - 1);
  const std::string child =
      device_dir.substr(last_slash + 1, std::string::npos);
  const bool child_is_nvme = child.compare(0, 4, "nvme") == 0;
  const bool child_has_p = child.find('p') != std::string::npos;
  if (parent != "block" && (!child_is_nvme || child_has_p)) {
    // Step up one directory, to where `queue/` lives.
    device_dir.erase(last_slash);
  }

  const std::string fname = device_dir + "/queue/logical_block_size";
  size_t size = 0;
  FILE* fp = fopen(fname.c_str(), "r");
  if (fp != nullptr) {
    char* line = nullptr;
    size_t len = 0;
    if (getline(&line, &len, fp) != -1) {
      sscanf(line, "%zu", &size);
    }
    free(line);
    fclose(fp);
  }
  const bool is_power_of_two = size != 0 && (size & (size - 1)) == 0;
  if (is_power_of_two) {
    return size;
  }
#endif
  return kDefaultPageSize;
}
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218

#ifdef ROCKSDB_RANGESYNC_PRESENT

#if !defined(ZFS_SUPER_MAGIC)
// The magic number for ZFS was not exposed until recently. It should be fixed
// forever so we can just copy the magic number here.
#define ZFS_SUPER_MAGIC 0x2fc12fc1
#endif

bool IsSyncFileRangeSupported(int __attribute__((__unused__)) fd) {
  // `fstatfs` is only available on Linux, but so is `sync_file_range`, so
  // `defined(ROCKSDB_RANGESYNC_PRESENT)` should imply `defined(OS_LINUX)`.
  struct statfs buf;
  int ret = fstatfs(fd, &buf);
  assert(ret == 0);
  if (ret != 0) {
    // We don't know whether the filesystem properly supports `sync_file_range`.
    // Even if it doesn't, we don't know of any safety issue with trying to call
    // it anyways. So, to preserve the same behavior as before this `fstatfs`
    // check was introduced, we assume `sync_file_range` is usable.
    return true;
  }
  if (buf.f_type == ZFS_SUPER_MAGIC) {
    // Testing on ZFS showed the writeback did not happen asynchronously when
    // `sync_file_range` was called, even though it returned success. Avoid it
    // and use `fdatasync` instead to preserve the contract of `bytes_per_sync`,
    // even though this'll incur extra I/O for metadata.
    return false;
  }
  // No known problems with other filesystems' implementations of
  // `sync_file_range`, so allow them to use it.
  return true;
}

#undef ZFS_SUPER_MAGIC

#endif  // ROCKSDB_RANGESYNC_PRESENT

}  // anonymous namespace
A
Aaron Gao 已提交
219

K
krad 已提交
220 221 222
/*
 * DirectIOHelper
 */
223
#ifndef NDEBUG
K
krad 已提交
224 225
namespace {

A
Aaron Gao 已提交
226 227 228
// True when the byte count/offset `off` is a whole multiple of `sector_size`.
bool IsSectorAligned(const size_t off, size_t sector_size) {
  const size_t remainder = off % sector_size;
  return remainder == 0;
}
K
krad 已提交
229

230 231
// True when the address held in `ptr` is a whole multiple of `sector_size`.
bool IsSectorAligned(const void* ptr, size_t sector_size) {
  const uintptr_t address = uintptr_t(ptr);
  return address % sector_size == 0;
}

234
}  // namespace
235
#endif
K
krad 已提交
236

237 238 239
/*
 * PosixSequentialFile
 */
A
Aaron Gao 已提交
240 241
// Stores the file name together with both handles (`file` for buffered reads,
// `fd` for descriptor-based calls), records whether direct reads were
// requested, and queries the logical sector size for `fd`.
PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
                                         int fd, const EnvOptions& options)
    : filename_(fname),
      file_(file),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(GetLogicalBufferSize(fd_)) {
  // Direct reads and mmap reads must not both be requested.
  assert(!options.use_direct_reads || !options.use_mmap_reads);
}
249

A
Aaron Gao 已提交
250
// Releases the handle that matches the I/O mode: the raw descriptor when
// direct I/O is in use, the FILE stream otherwise.
PosixSequentialFile::~PosixSequentialFile() {
  if (use_direct_io()) {
    assert(fd_);
    close(fd_);
    return;
  }
  assert(file_);
  fclose(file_);
}
259 260

// Reads up to `n` bytes from the current stream position into `scratch` and
// points `*result` at the bytes actually read. Only valid when direct I/O is
// not in use.
//
// Returns OK when `n` bytes were read or when end-of-file cut the read short;
// in the latter case the stream's EOF indicator is cleared so later reads can
// pick up data appended to the file. A short read that is not caused by EOF
// yields an IOError built from errno.
Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) {
  assert(result != nullptr && !use_direct_io());
  Status s;
  size_t r = 0;
  for (;;) {
    r = fread_unlocked(scratch, 1, n, file_);
    if (r == 0 && ferror(file_) && errno == EINTR) {
      // The stream error indicator is sticky. Reset it before retrying,
      // otherwise a retry that legitimately returns 0 (e.g. at EOF) would
      // still see the stale indicator and the stale errno and spin forever.
      clearerr(file_);
      continue;
    }
    break;
  }
  *result = Slice(scratch, r);
  if (r < n) {
    if (feof(file_)) {
      // We leave status as ok if we hit the end of the file
      // We also clear the error so that the reads can continue
      // if a new data is written to the file
      clearerr(file_);
    } else {
      // A partial read with an error: return a non-ok status
      s = IOError("While reading file sequentially", filename_, errno);
    }
  }
  return s;
}

// Direct-I/O read of up to `n` bytes starting at `offset` into `scratch`,
// using pread() on the descriptor. `offset`, `n` and `scratch` are asserted
// to be aligned to GetRequiredBufferAlignment().
//
// pread() is reissued after EINTR and after short reads that are a whole
// multiple of the alignment. The loop stops at end-of-file, on an error, or
// on a short read that does not fill a sector. On error an IOError is
// returned and `*result` is empty; otherwise `*result` covers the bytes read.
// A zero-length request succeeds with an empty result.
Status PosixSequentialFile::PositionedRead(uint64_t offset, size_t n,
                                           Slice* result, char* scratch) {
  assert(use_direct_io());
  assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));

  Status s;
  // Start from "no error": when n == 0 the loop never runs, and the old
  // initial value of -1 made such a read fail with whatever errno was set.
  ssize_t r = 0;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      if (r == -1 && errno == EINTR) {
        continue;
      }
      break;
    }
    ptr += r;
    offset += r;
    left -= r;
    if (r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
      // Bytes read don't fill sectors. Should only happen at the end
      // of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    s = IOError(
        "While pread " + ToString(n) + " bytes from offset " + ToString(offset),
        filename_, errno);
  }
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}

// Advances the stream position by `n` bytes relative to the current position.
// Returns an IOError built from errno when fseek() reports failure.
Status PosixSequentialFile::Skip(uint64_t n) {
  const int rc = fseek(file_, static_cast<long int>(n), SEEK_CUR);
  if (rc == 0) {
    return Status::OK();
  }
  return IOError("While fseek to skip " + ToString(n) + " bytes", filename_,
                 errno);
}

// Asks the OS to drop cached pages for [offset, offset + length) of this file.
// Does nothing (and returns OK) on non-Linux platforms and when direct I/O is
// in use. Returns an IOError when the fadvise request fails.
Status PosixSequentialFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  if (!use_direct_io()) {
    // free OS pages
    int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
    if (ret != 0) {
      // posix_fadvise() hands back the error number as its return value and
      // does not set errno, so report `ret` rather than a stale errno.
      return IOError("While fadvise NotNeeded offset " + ToString(offset) +
                         " len " + ToString(length),
                     filename_, ret);
    }
  }
  return Status::OK();
#endif
}

/*
 * PosixRandomAccessFile
 */
350
#if defined(OS_LINUX)
K
krad 已提交
351
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
352 353 354 355 356 357 358 359 360 361 362 363
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  long version = 0;
  result = ioctl(fd, FS_IOC_GETVERSION, &version);
K
krad 已提交
364
  TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
365 366 367 368 369 370 371 372 373 374 375 376
  if (result == -1) {
    return 0;
  }
  uint64_t uversion = (uint64_t)version;

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, uversion);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
}
K
krad 已提交
377 378
#endif

T
Tomas Kolda 已提交
379
#if defined(OS_MACOSX) || defined(OS_AIX)
K
krad 已提交
380
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
K
krad 已提交
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, buf.st_gen);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
397 398 399 400 401 402 403 404 405
}
#endif
/*
 * PosixRandomAccessFile
 *
 * pread() based random-access
 */
// Stores the file name and descriptor, records whether direct reads were
// requested, and queries the logical sector size for `fd`.
PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd,
                                             const EnvOptions& options)
    : filename_(fname),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(GetLogicalBufferSize(fd_)) {
  // Direct reads and mmap reads must not both be requested.
  assert(!options.use_direct_reads || !options.use_mmap_reads);
  // mmap reads are only expected here when pointers are narrower than 8 bytes.
  assert(!options.use_mmap_reads || sizeof(void*) < 8);
}

// Closes the descriptor this object was constructed with.
PosixRandomAccessFile::~PosixRandomAccessFile() {
  close(fd_);
}

// Reads up to `n` bytes starting at `offset` into `scratch` with pread() and
// points `*result` at the bytes read. With direct I/O, `offset`, `n` and
// `scratch` are asserted to be aligned to GetRequiredBufferAlignment().
//
// pread() is reissued after EINTR and after short reads. The loop stops at
// end-of-file, on an error, or (direct I/O only) on a short read that does
// not fill a sector. On error an IOError is returned and `*result` is empty.
// A zero-length request succeeds with an empty result.
Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
                                   char* scratch) const {
  if (use_direct_io()) {
    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));
  }
  Status s;
  // Start from "no error": when n == 0 the loop never runs, and the old
  // initial value of -1 made such a read fail with whatever errno was set.
  ssize_t r = 0;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      if (r == -1 && errno == EINTR) {
        continue;
      }
      break;
    }
    ptr += r;
    offset += r;
    left -= r;
    if (use_direct_io() &&
        r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
      // Bytes read don't fill sectors. Should only happen at the end
      // of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    s = IOError(
        "While pread offset " + ToString(offset) + " len " + ToString(n),
        filename_, errno);
  }
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}

455 456 457 458 459 460 461 462 463 464 465 466 467 468
// Hints to the OS that [offset, offset + n) will be read soon: readahead() on
// Linux, fcntl(F_RDADVISE) on macOS. Returns OK without doing anything when
// direct I/O is in use or on platforms with neither call; returns an IOError
// built from errno when the platform call returns -1.
Status PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n) {
  Status s;
  if (use_direct_io()) {
    return s;
  }
  ssize_t rc = 0;
#ifdef OS_LINUX
  rc = readahead(fd_, offset, n);
#endif
#ifdef OS_MACOSX
  radvisory advice;
  advice.ra_offset = static_cast<off_t>(offset);
  advice.ra_count = static_cast<int>(n);
  rc = fcntl(fd_, F_RDADVISE, &advice);
#endif
  if (rc == -1) {
    s = IOError("While prefetching offset " + ToString(offset) + " len " +
                    ToString(n),
                filename_, errno);
  }
  return s;
}

T
Tomas Kolda 已提交
477
#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
478
size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
K
krad 已提交
479
  return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
480 481 482 483
}
#endif

// Translates `pattern` into the matching POSIX_FADV_* advice and applies it
// to the whole file through Fadvise(). Does nothing when direct I/O is in
// use. An unknown pattern trips an assert and issues no advice.
void PosixRandomAccessFile::Hint(AccessPattern pattern) {
  if (use_direct_io()) {
    return;
  }
  int advice;
  switch (pattern) {
    case NORMAL:
      advice = POSIX_FADV_NORMAL;
      break;
    case RANDOM:
      advice = POSIX_FADV_RANDOM;
      break;
    case SEQUENTIAL:
      advice = POSIX_FADV_SEQUENTIAL;
      break;
    case WILLNEED:
      advice = POSIX_FADV_WILLNEED;
      break;
    case DONTNEED:
      advice = POSIX_FADV_DONTNEED;
      break;
    default:
      assert(false);
      return;
  }
  // Offset 0 with length 0 is what every pattern passes to Fadvise().
  Fadvise(fd_, 0, 0, advice);
}

// Asks the OS to drop cached pages for [offset, offset + length) of this file.
// Returns OK without doing anything when direct I/O is in use or on non-Linux
// platforms. Returns an IOError when the fadvise request fails.
Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
  if (use_direct_io()) {
    return Status::OK();
  }
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return Status::OK();
  }
  // posix_fadvise() hands back the error number as its return value and does
  // not set errno, so report `ret` rather than a stale errno.
  return IOError("While fadvise NotNeeded offset " + ToString(offset) +
                     " len " + ToString(length),
                 filename_, ret);
#endif
}

/*
 * PosixMmapReadableFile
 *
 * mmap() based random-access
 */
// Wraps an already-established mapping: base[0,length-1] holds the mmapped
// contents of the file `fname`, which is open as `fd`. `options` is only
// inspected by the assertions.
PosixMmapReadableFile::PosixMmapReadableFile(const int fd,
                                             const std::string& fname,
                                             void* base, size_t length,
                                             const EnvOptions& options)
    : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) {
#ifdef NDEBUG
  // The assertions below compile away, leaving `options` unused.
  (void)options;
#endif
  fd_ = fd_ + 0;  // suppress the warning for used variables
  assert(options.use_mmap_reads);
  assert(!options.use_direct_reads);
}

// Unmaps the region and closes the descriptor. A munmap() failure is only
// reported on stdout; the descriptor is closed either way.
PosixMmapReadableFile::~PosixMmapReadableFile() {
  if (munmap(mmapped_region_, length_) != 0) {
    fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n",
            mmapped_region_, length_);
  }
  close(fd_);
}

// Points `*result` at up to `n` bytes of the mapped region starting at
// `offset`; no data is copied and the scratch buffer is unused. A request
// that runs past the end of the mapping is shortened to the bytes available.
// An `offset` beyond the mapped length yields an empty result and an IOError
// carrying EINVAL.
Status PosixMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result,
                                   char* /*scratch*/) const {
  Status s;
  if (offset > length_) {
    *result = Slice();
    return IOError("While mmap read offset " + ToString(offset) +
                       " larger than file length " + ToString(length_),
                   filename_, EINVAL);
  } else if (n > length_ - offset) {
    // Compare against the remaining bytes instead of computing offset + n,
    // which can wrap around for a very large n and skip the clamp.
    n = static_cast<size_t>(length_ - offset);
  }
  *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
  return s;
}

// Asks the OS to drop cached pages for [offset, offset + length) of this file.
// Returns OK without doing anything on non-Linux platforms. Returns an
// IOError when the fadvise request fails.
Status PosixMmapReadableFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return Status::OK();
  }
  // posix_fadvise() hands back the error number as its return value and does
  // not set errno, so report `ret` rather than a stale errno.
  return IOError("While fadvise not needed. Offset " + ToString(offset) +
                     " len" + ToString(length),
                 filename_, ret);
#endif
}

/*
 * PosixMmapFile
 *
 * We preallocate up to an extra megabyte and use memcpy to append new
 * data to the file.  This is safe since we either properly close the
 * file before reading from it, or for log files, the reading code
 * knows enough to skip zero suffixes.
 */
// Unmaps the currently mapped window, if there is one, advances file_offset_
// past it and resets all window pointers. The next window size is doubled
// while it is still below 1MB. Returns an IOError when munmap() fails, in
// which case no state is changed.
Status PosixMmapFile::UnmapCurrentRegion() {
  TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds);
  if (base_ != nullptr) {
    int munmap_status = munmap(base_, limit_ - base_);
    if (munmap_status != 0) {
      // munmap() signals failure with -1 and stores the cause in errno; the
      // return value itself is not an error number.
      return IOError("While munmap", filename_, errno);
    }
    file_offset_ += limit_ - base_;
    base_ = nullptr;
    limit_ = nullptr;
    last_sync_ = nullptr;
    dst_ = nullptr;

    // Increase the amount we map the next time, but capped at 1MB
    if (map_size_ < (1 << 20)) {
      map_size_ *= 2;
    }
  }
  return Status::OK();
}

// Extends the file by map_size_ bytes at file_offset_ (when fallocate is
// allowed) and maps that range read/write and shared, making it the current
// window. Requires that no window is mapped. Returns an IOError when the
// allocation or the mmap() fails, and NotSupported when the build has no
// fallocate().
Status PosixMmapFile::MapNewRegion() {
#ifdef ROCKSDB_FALLOCATE_PRESENT
  assert(base_ == nullptr);
  TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds);
  // we can't fallocate with FALLOC_FL_KEEP_SIZE here
  if (allow_fallocate_) {
    IOSTATS_TIMER_GUARD(allocate_nanos);
    int rc = fallocate(fd_, 0, file_offset_, map_size_);
    if (rc != 0) {
      // fallback to posix_fallocate
      rc = posix_fallocate(fd_, file_offset_, map_size_);
      if (rc != 0) {
        return Status::IOError("Error allocating space to file : " + filename_ +
                               "Error : " + strerror(rc));
      }
    }
  }

  TEST_KILL_RANDOM("PosixMmapFile::Append:1", rocksdb_kill_odds);
  void* const mapping = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE,
                             MAP_SHARED, fd_, file_offset_);
  if (mapping == MAP_FAILED) {
    return Status::IOError("MMap failed on " + filename_);
  }
  TEST_KILL_RANDOM("PosixMmapFile::Append:2", rocksdb_kill_odds);

  // The fresh window starts empty and fully synced.
  base_ = reinterpret_cast<char*>(mapping);
  limit_ = base_ + map_size_;
  dst_ = base_;
  last_sync_ = base_;
  return Status::OK();
#else
  return Status::NotSupported("This platform doesn't support fallocate()");
#endif
}

// Synchronously flushes the pages of the current window written since the
// last sync (the range [last_sync_, dst_)). Returns OK immediately when
// nothing new has been written, and an IOError built from errno when msync()
// fails.
Status PosixMmapFile::Msync() {
  if (dst_ == last_sync_) {
    return Status::OK();
  }
  // Page-aligned offsets (relative to base_) of the pages holding the first
  // and the last byte that need syncing.
  const size_t first_page = TruncateToPageBoundary(last_sync_ - base_);
  const size_t last_page = TruncateToPageBoundary(dst_ - base_ - 1);
  last_sync_ = dst_;
  TEST_KILL_RANDOM("PosixMmapFile::Msync:0", rocksdb_kill_odds);
  const int rc =
      msync(base_ + first_page, last_page - first_page + page_size_, MS_SYNC);
  if (rc < 0) {
    return IOError("While msync", filename_, errno);
  }
  return Status::OK();
}

// Starts with no mapped window, a file offset of 0 and an initial window size
// of 65536 rounded up to a multiple of `page_size`. The fallocate settings
// are copied from `options` when the build supports fallocate.
PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size,
                             const EnvOptions& options)
    : filename_(fname),
      fd_(fd),
      page_size_(page_size),
      map_size_(Roundup(65536, page_size)),
      base_(nullptr),
      limit_(nullptr),
      dst_(nullptr),
      last_sync_(nullptr),
      file_offset_(0) {
#ifndef ROCKSDB_FALLOCATE_PRESENT
  (void)options;
#else
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif
  // page_size must be a power of two.
  assert((page_size & (page_size - 1)) == 0);
  assert(options.use_mmap_writes);
  assert(!options.use_direct_writes);
}

// Closes the file if it is still open (Close() sets fd_ to -1 when done).
PosixMmapFile::~PosixMmapFile() {
  const bool still_open = fd_ >= 0;
  if (still_open) {
    PosixMmapFile::Close();
  }
}

// Copies `data` into the mapped window at dst_, replacing the window with a
// freshly mapped one whenever the current one is full. Returns the first
// error from unmapping or mapping; bytes copied before the failure stay in
// place.
Status PosixMmapFile::Append(const Slice& data) {
  const char* src = data.data();
  for (size_t left = data.size(); left > 0;) {
    assert(base_ <= dst_);
    assert(dst_ <= limit_);
    const size_t avail = limit_ - dst_;
    if (avail == 0) {
      Status s = UnmapCurrentRegion();
      if (s.ok()) {
        s = MapNewRegion();
      }
      if (!s.ok()) {
        return s;
      }
      TEST_KILL_RANDOM("PosixMmapFile::Append:0", rocksdb_kill_odds);
    }

    // `avail` is not refreshed after a remap, so that pass copies 0 bytes and
    // the next pass sees the new window's capacity.
    const size_t n = std::min(left, avail);
    assert(dst_);
    memcpy(dst_, src, n);
    dst_ += n;
    src += n;
    left -= n;
  }
  return Status::OK();
}

// Unmaps the current window, truncates away the part of it that was never
// written, and closes the descriptor. The first error encountered is the one
// returned; the descriptor is closed and the object marked closed regardless.
Status PosixMmapFile::Close() {
  // Must be captured before unmapping resets the window pointers.
  const size_t unused = limit_ - dst_;

  Status s = UnmapCurrentRegion();
  if (!s.ok()) {
    s = IOError("While closing mmapped file", filename_, errno);
  } else if (unused > 0) {
    // Trim the extra space at the end of the file
    if (ftruncate(fd_, file_offset_ - unused) < 0) {
      s = IOError("While ftruncating mmaped file", filename_, errno);
    }
  }

  const bool close_failed = close(fd_) < 0;
  if (close_failed && s.ok()) {
    s = IOError("While closing mmapped file", filename_, errno);
  }

  fd_ = -1;
  base_ = nullptr;
  limit_ = nullptr;
  return s;
}

// Nothing to do here; always reports success.
Status PosixMmapFile::Flush() {
  return Status::OK();
}

// Calls fdatasync() on the descriptor and then Msync() on the mapped window.
// A failing fdatasync() is reported as an IOError and skips the msync step.
Status PosixMmapFile::Sync() {
  const int rc = fdatasync(fd_);
  if (rc < 0) {
    return IOError("While fdatasync mmapped file", filename_, errno);
  }

  return Msync();
}

/**
 * Flush data as well as metadata to stable storage: fsync() on the
 * descriptor followed by Msync() on the mapped window. A failing fsync() is
 * reported as an IOError and skips the msync step.
 */
Status PosixMmapFile::Fsync() {
  const int rc = fsync(fd_);
  if (rc < 0) {
    return IOError("While fsync mmaped file", filename_, errno);
  }

  return Msync();
}

/**
 * Get the size of valid data in the file: the bytes in already-unmapped
 * windows plus the bytes written into the current one. This will not match
 * the size that is returned from the filesystem because we use mmap to
 * extend file by map_size every time.
 */
uint64_t PosixMmapFile::GetFileSize() {
  const size_t written_in_window = dst_ - base_;
  return file_offset_ + written_in_window;
}

// Asks the OS to drop cached pages for [offset, offset + length) of this file.
// Returns OK without doing anything on non-Linux platforms. Returns an
// IOError when the fadvise request fails.
Status PosixMmapFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return Status::OK();
  }
  // posix_fadvise() hands back the error number as its return value and does
  // not set errno, so report `ret` rather than a stale errno.
  return IOError("While fadvise NotNeeded mmapped file", filename_, ret);
#endif
}

799
#ifdef ROCKSDB_FALLOCATE_PRESENT
800 801 802
// Preallocates `len` bytes at `offset` with fallocate(), keeping the file
// size unchanged when fallocate_with_keep_size_ is set. Returns OK without
// calling fallocate() when allocation is disabled, and an IOError built from
// errno when fallocate() fails.
Status PosixMmapFile::Allocate(uint64_t offset, uint64_t len) {
  assert(offset <= std::numeric_limits<off_t>::max());
  assert(len <= std::numeric_limits<off_t>::max());
  TEST_KILL_RANDOM("PosixMmapFile::Allocate:0", rocksdb_kill_odds);
  if (allow_fallocate_) {
    const int mode = fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0;
    const int rc = fallocate(fd_, mode, static_cast<off_t>(offset),
                             static_cast<off_t>(len));
    if (rc != 0) {
      return IOError(
          "While fallocate offset " + ToString(offset) + " len " + ToString(len),
          filename_, errno);
    }
  }
  return Status::OK();
}
818
#endif
819 820 821 822 823 824 825 826

/*
 * PosixWritableFile
 *
 * Use posix write to write data to a file.
 */
// Stores the file name and descriptor, starts the tracked file size at 0,
// records whether direct writes were requested and queries the logical
// sector size for `fd`. Fallocate settings and sync_file_range support are
// initialised only in builds that provide those features.
PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
                                     const EnvOptions& options)
    : WritableFile(options),
      filename_(fname),
      use_direct_io_(options.use_direct_writes),
      fd_(fd),
      filesize_(0),
      logical_sector_size_(GetLogicalBufferSize(fd_)) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif  // ROCKSDB_FALLOCATE_PRESENT
#ifdef ROCKSDB_RANGESYNC_PRESENT
  sync_file_range_supported_ = IsSyncFileRangeSupported(fd_);
#endif  // ROCKSDB_RANGESYNC_PRESENT
  // mmap writes are not handled by this class.
  assert(!options.use_mmap_writes);
}

// Closes the file if it is still open (Close() sets fd_ to -1 when done).
PosixWritableFile::~PosixWritableFile() {
  const bool still_open = fd_ >= 0;
  if (still_open) {
    PosixWritableFile::Close();
  }
}

// Writes `data` at the descriptor's current position via PosixWrite() and
// adds its size to the tracked file size. With direct I/O the size and
// address of `data` are asserted to be aligned to
// GetRequiredBufferAlignment(). Returns an IOError built from errno when the
// write fails; the tracked size is then left unchanged.
Status PosixWritableFile::Append(const Slice& data) {
  if (use_direct_io()) {
    assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
  }
  const size_t nbytes = data.size();
  const bool written = PosixWrite(fd_, data.data(), nbytes);
  if (!written) {
    return IOError("While appending to file", filename_, errno);
  }

  filesize_ += nbytes;
  return Status::OK();
}

865
// Writes `data` at file position `offset` via PosixPositionedWrite() and sets
// the tracked file size to the end of the written range. With direct I/O,
// `offset` and the size and address of `data` are asserted to be aligned to
// GetRequiredBufferAlignment(). Returns an IOError built from errno when the
// write fails; the tracked size is then left unchanged.
Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
  if (use_direct_io()) {
    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
  }
  assert(offset <= std::numeric_limits<off_t>::max());
  const size_t nbytes = data.size();
  const bool written = PosixPositionedWrite(fd_, data.data(), nbytes,
                                            static_cast<off_t>(offset));
  if (!written) {
    return IOError("While pwrite to file at offset " + ToString(offset),
                   filename_, errno);
  }
  filesize_ = offset + nbytes;
  return Status::OK();
}

A
Aaron Gao 已提交
882 883 884 885
// Sets the file length to `size` with ftruncate() and, on success, records
// `size` as the tracked file size. Returns an IOError built from errno on
// failure, leaving the tracked size unchanged.
Status PosixWritableFile::Truncate(uint64_t size) {
  if (ftruncate(fd_, size) < 0) {
    return IOError("While ftruncate file to size " + ToString(size), filename_,
                   errno);
  }
  filesize_ = size;
  return Status::OK();
}

894 895 896 897 898 899 900 901 902 903
// Trims preallocated-but-unwritten space (best effort, errors ignored) and
// closes the descriptor. Only a failing close() produces a non-OK status.
// fd_ is set to -1 afterwards in every case.
Status PosixWritableFile::Close() {
  Status s;

  size_t prealloc_block_size;
  size_t prealloc_last_block;
  GetPreallocationStatus(&prealloc_block_size, &prealloc_last_block);
  if (prealloc_last_block > 0) {
    // trim the extra space preallocated at the end of the file
    // NOTE(ljin): we probably don't want to surface failure as an IOError,
    // but it will be nice to log these errors.
    int ignored __attribute__((__unused__));
    ignored = ftruncate(fd_, filesize_);
#if defined(ROCKSDB_FALLOCATE_PRESENT) && defined(FALLOC_FL_PUNCH_HOLE) && \
    !defined(TRAVIS)
    // in some file systems, ftruncate only trims trailing space if the
    // new file size is smaller than the current size. Calling fallocate
    // with FALLOC_FL_PUNCH_HOLE flag to explicitly release these unused
    // blocks. FALLOC_FL_PUNCH_HOLE is supported on at least the following
    // filesystems:
    //   XFS (since Linux 2.6.38)
    //   ext4 (since Linux 3.0)
    //   Btrfs (since Linux 3.7)
    //   tmpfs (since Linux 3.5)
    // We ignore error since failure of this operation does not affect
    // correctness.
    // TRAVIS - this code does not work on TRAVIS filesystems.
    // the FALLOC_FL_KEEP_SIZE option is expected to not change the size
    // of the file, but it does. Simple strace report will show that.
    // While we work with Travis-CI team to figure out if this is a
    // quirk of Docker/AUFS, we will comment this out.
    struct stat st;
    const int stat_rc = fstat(fd_, &st);
    // After ftruncate, we check whether ftruncate has the correct behavior:
    // the block count implied by the file size is compared with the block
    // count reported by fstat (st_blocks is in 512-byte units).
    // If they disagree, we should hack it with FALLOC_FL_PUNCH_HOLE
    if (stat_rc == 0 &&
        (st.st_size + st.st_blksize - 1) / st.st_blksize !=
            st.st_blocks / (st.st_blksize / 512)) {
      IOSTATS_TIMER_GUARD(allocate_nanos);
      if (allow_fallocate_) {
        fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_,
                  prealloc_block_size * prealloc_last_block - filesize_);
      }
    }
#endif
  }

  if (close(fd_) < 0) {
    s = IOError("While closing file after writing", filename_, errno);
  }
  fd_ = -1;
  return s;
}

// Flush performs no work for this file type and unconditionally reports
// success.
Status PosixWritableFile::Flush() {
  return Status::OK();
}

// Calls fdatasync(2) on the underlying descriptor.
// Returns OK on success, or an IOError built from errno when the call fails.
Status PosixWritableFile::Sync() {
  const int rc = fdatasync(fd_);
  if (rc >= 0) {
    return Status::OK();
  }
  return IOError("While fdatasync", filename_, errno);
}

// Calls fsync(2) on the underlying descriptor.
// Returns OK on success, or an IOError built from errno when the call fails.
Status PosixWritableFile::Fsync() {
  const int rc = fsync(fd_);
  if (rc >= 0) {
    return Status::OK();
  }
  return IOError("While fsync", filename_, errno);
}

// Always answers true for this file type.
bool PosixWritableFile::IsSyncThreadSafe() const {
  return true;
}

// Returns the current value of the filesize_ member; no system call is made.
uint64_t PosixWritableFile::GetFileSize() {
  return filesize_;
}

S
Stream  
Shaohua Li 已提交
969 970
// Hands a write-lifetime hint for this file to the kernel through
// fcntl(F_SET_RW_HINT) and remembers it in write_hint_ when the call succeeds.
//
// The call is skipped when the hint equals the one already recorded. A failing
// fcntl() is ignored and leaves write_hint_ untouched. On non-Linux builds, and
// under ROCKSDB_VALGRIND_RUN, the function does nothing.
void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
#ifdef OS_LINUX
// Suppress Valgrind "Unimplemented functionality" error.
#ifndef ROCKSDB_VALGRIND_RUN
  if (hint == write_hint_) {
    return;
  }
  // F_SET_RW_HINT expects a pointer to a 64-bit value. Passing the address of
  // the enum parameter directly would let the kernel read past the end of an
  // object that may be narrower than 8 bytes, so widen it into a local first.
  uint64_t fcntl_hint = static_cast<uint64_t>(hint);
  if (fcntl(fd_, F_SET_RW_HINT, &fcntl_hint) == 0) {
    write_hint_ = hint;
  }
#else
  (void)hint;
#endif  // ROCKSDB_VALGRIND_RUN
#else
  (void)hint;
#endif  // OS_LINUX
}

987
// Advises the OS (POSIX_FADV_DONTNEED) that the given byte range of this file
// is no longer needed in its cache.
//
// Returns OK immediately when use_direct_io() is true, and on non-Linux
// builds, without issuing any call. On Linux, returns OK when Fadvise()
// succeeds and an IOError carrying the reported error number otherwise.
Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
  if (use_direct_io()) {
    return Status::OK();
  }
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return Status::OK();
#else
  // free OS pages
  const int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return Status::OK();
  }
  // Fadvise() forwards the result of posix_fadvise(), which reports failure
  // through its return value and does not set errno. Use that value so the
  // error reflects this call rather than some earlier, unrelated failure.
  return IOError("While fadvise NotNeeded", filename_, ret);
#endif
}

1005
#ifdef ROCKSDB_FALLOCATE_PRESENT
// Issues fallocate(2) for `len` bytes starting at `offset`.
//
// FALLOC_FL_KEEP_SIZE is passed when fallocate_with_keep_size_ is set. When
// allow_fallocate_ is false no system call is made and OK is returned.
// Returns an IOError built from errno if fallocate() fails. Both arguments
// are asserted to fit in off_t.
Status PosixWritableFile::Allocate(uint64_t offset, uint64_t len) {
  assert(offset <= std::numeric_limits<off_t>::max());
  assert(len <= std::numeric_limits<off_t>::max());
  TEST_KILL_RANDOM("PosixWritableFile::Allocate:0", rocksdb_kill_odds);
  IOSTATS_TIMER_GUARD(allocate_nanos);
  if (!allow_fallocate_) {
    return Status::OK();
  }

  const int mode = fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0;
  const int rc = fallocate(fd_, mode, static_cast<off_t>(offset),
                           static_cast<off_t>(len));
  if (rc != 0) {
    return IOError(
        "While fallocate offset " + ToString(offset) + " len " + ToString(len),
        filename_, errno);
  }
  return Status::OK();
}
#endif
1026

1027
// Starts writeback for a byte range of the file.
//
// When ROCKSDB_RANGESYNC_PRESENT is defined and sync_file_range_supported_ is
// set, sync_file_range(2) is used; a non-zero result becomes an IOError built
// from errno. In every other case the request is forwarded to
// WritableFile::RangeSync(). Both arguments are asserted to fit in off_t.
Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) {
#ifdef ROCKSDB_RANGESYNC_PRESENT
  assert(offset <= std::numeric_limits<off_t>::max());
  assert(nbytes <= std::numeric_limits<off_t>::max());
  if (sync_file_range_supported_) {
    off_t range_start = static_cast<off_t>(offset);
    off_t range_len = static_cast<off_t>(nbytes);
    unsigned int flags = SYNC_FILE_RANGE_WRITE;
    if (strict_bytes_per_sync_) {
      // Covering every byte written so far together with
      // `SYNC_FILE_RANGE_WAIT_BEFORE` makes `sync_file_range` wait for any
      // outstanding writeback requests to finish before issuing a new one.
      range_start = 0;
      range_len = static_cast<off_t>(offset + nbytes);
      flags = SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE;
    }
    const int ret = sync_file_range(fd_, range_start, range_len, flags);
    if (ret == 0) {
      return Status::OK();
    }
    return IOError("While sync_file_range returned " + ToString(ret),
                   filename_, errno);
  }
#endif  // ROCKSDB_RANGESYNC_PRESENT
  return WritableFile::RangeSync(offset, nbytes);
}

T
Tomas Kolda 已提交
1054
#ifdef OS_LINUX
// Forwards to PosixHelper::GetUniqueIdFromFile() for this file's descriptor,
// passing through the caller's buffer `id` and its capacity `max_size`, and
// returns whatever that helper returns.
size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const {
  const size_t id_len = PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
  return id_len;
}
#endif
1059

I
Islam AbdelRahman 已提交
1060 1061 1062 1063 1064
/*
 * PosixRandomRWFile
 */

// Stores the file name and the descriptor `fd` in the new object.
// The EnvOptions argument is accepted but not used.
PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd,
                                     const EnvOptions& /*options*/)
    : filename_(fname),
      fd_(fd) {
}

// Closes the descriptor via Close() if one is still held (fd_ >= 0).
// The status returned by Close() is discarded.
PosixRandomRWFile::~PosixRandomRWFile() {
  if (fd_ < 0) {
    return;
  }
  Close();
}

// Writes the contents of `data` at byte position `offset` using
// PosixPositionedWrite(). Returns OK when the helper reports success and an
// IOError built from errno otherwise.
Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) {
  const char* payload = data.data();
  const size_t payload_len = data.size();
  const bool written = PosixPositionedWrite(fd_, payload, payload_len,
                                            static_cast<off_t>(offset));
  if (written) {
    return Status::OK();
  }
  return IOError(
      "While write random read/write file at offset " + ToString(offset),
      filename_, errno);
}

// Reads up to `n` bytes starting at `offset` into `scratch` with pread(2).
//
// pread() is called repeatedly until `n` bytes have arrived or it returns 0.
// Calls interrupted by a signal (EINTR) are retried. On success `*result`
// refers to the bytes actually read, which may be fewer than `n`. Any other
// pread() failure yields an IOError built from errno; its message reports the
// offset of the failing call.
Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result,
                               char* scratch) const {
  size_t filled = 0;
  while (filled < n) {
    const ssize_t got = pread(fd_, scratch + filled, n - filled, offset);
    if (got == 0) {
      // Nothing more to read
      break;
    }
    if (got < 0) {
      if (errno == EINTR) {
        // read was interrupted, try again.
        continue;
      }
      // error while reading from file
      return IOError("While reading random read/write file offset " +
                         ToString(offset) + " len " + ToString(n),
                     filename_, errno);
    }

    // Account for the `got` bytes just read.
    filled += static_cast<size_t>(got);
    offset += static_cast<uint64_t>(got);
  }

  *result = Slice(scratch, filled);
  return Status::OK();
}

// Flush performs no work for this file type and unconditionally reports
// success.
Status PosixRandomRWFile::Flush() {
  return Status::OK();
}

// Calls fdatasync(2) on the underlying descriptor.
// Returns OK on success, or an IOError built from errno when the call fails.
Status PosixRandomRWFile::Sync() {
  const int rc = fdatasync(fd_);
  if (rc >= 0) {
    return Status::OK();
  }
  return IOError("While fdatasync random read/write file", filename_, errno);
}

// Calls fsync(2) on the underlying descriptor.
// Returns OK on success, or an IOError built from errno when the call fails.
Status PosixRandomRWFile::Fsync() {
  const int rc = fsync(fd_);
  if (rc >= 0) {
    return Status::OK();
  }
  return IOError("While fsync random read/write file", filename_, errno);
}

// Closes the underlying descriptor with close(2).
//
// Returns OK on success, or an IOError built from errno when close() fails.
// fd_ is reset to -1 in both cases, so the destructor, which calls Close()
// whenever fd_ >= 0, never issues a second close() on the same descriptor
// number after a failure. This matches how PosixWritableFile::Close() treats
// its descriptor.
Status PosixRandomRWFile::Close() {
  Status s = Status::OK();
  if (close(fd_) < 0) {
    s = IOError("While close random read/write file", filename_, errno);
  }
  // Do not retry close() after an error: the descriptor number may already
  // have been released and handed out to another open file.
  fd_ = -1;
  return s;
}

1140 1141
// Unmaps the region [base_, base_ + length_) with munmap(2).
PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
  // TODO: the munmap() result is ignored; there is little a destructor could
  // do about a failure, but it should at least be handled somehow.
  munmap(base_, length_);
}

K
krad 已提交
1145 1146 1147 1148
/*
 * PosixDirectory
 */

1149 1150 1151
// Closes the directory's descriptor; the result of close(2) is ignored.
PosixDirectory::~PosixDirectory() {
  close(fd_);
}

// Calls fsync(2) on the directory's descriptor.
// Returns an IOError built from errno if the call fails, OK otherwise.
// When OS_AIX is defined the call is compiled out and OK is always returned.
Status PosixDirectory::Fsync() {
#ifndef OS_AIX
  const int rc = fsync(fd_);
  if (rc == -1) {
    return IOError("While fsync", "a directory", errno);
  }
#endif
  return Status::OK();
}
1159 1160
}  // namespace rocksdb
#endif