io_posix.cc 44.2 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5 6 7 8 9 10
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifdef ROCKSDB_LIB_IO_POSIX
11
#include "env/io_posix.h"
S
sdong 已提交
12 13
#include <errno.h>
#include <fcntl.h>
K
krad 已提交
14
#include <algorithm>
S
sdong 已提交
15 16
#if defined(OS_LINUX)
#include <linux/fs.h>
17
#ifndef FALLOC_FL_KEEP_SIZE
18
#include <linux/falloc.h>
S
sdong 已提交
19
#endif
20
#endif
S
sdong 已提交
21 22 23 24 25 26 27 28 29 30
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#ifdef OS_LINUX
#include <sys/statfs.h>
#include <sys/syscall.h>
31
#include <sys/sysmacros.h>
S
sdong 已提交
32
#endif
33
#include "monitoring/iostats_context_imp.h"
S
sdong 已提交
34 35
#include "port/port.h"
#include "rocksdb/slice.h"
36
#include "test_util/sync_point.h"
37
#include "util/autovector.h"
S
sdong 已提交
38 39
#include "util/coding.h"
#include "util/string_util.h"
40

S
Stream  
Shaohua Li 已提交
41 42
#if defined(OS_LINUX) && !defined(F_SET_RW_HINT)
#define F_LINUX_SPECIFIC_BASE 1024
43
#define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12)
S
Stream  
Shaohua Li 已提交
44 45
#endif

46
namespace ROCKSDB_NAMESPACE {
47

48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
// Builds the human-readable text for an I/O error. When `file_name` is empty
// the result is just `context`; otherwise it is "<context>: <file_name>".
std::string IOErrorMsg(const std::string& context,
                       const std::string& file_name) {
  std::string message(context);
  if (!file_name.empty()) {
    message.append(": ").append(file_name);
  }
  return message;
}

// file_name can be left empty if it is unknown.
// Converts an errno value into an IOStatus.
//   ESTALE -> IOError carrying only the kStaleFile subcode (no message).
//   ENOSPC -> NoSpace, flagged as retryable.
//   ENOENT -> PathNotFound.
//   other  -> generic IOError.
// Except for ESTALE, the status text is IOErrorMsg(context, file_name) plus
// strerror(err_number).
IOStatus IOError(const std::string& context, const std::string& file_name,
                 int err_number) {
  if (err_number == ESTALE) {
    return IOStatus::IOError(IOStatus::kStaleFile);
  }

  const std::string message = IOErrorMsg(context, file_name);
  const char* const reason = strerror(err_number);

  if (err_number == ENOSPC) {
    IOStatus no_space = IOStatus::NoSpace(message, reason);
    no_space.SetRetryable(true);
    return no_space;
  }
  if (err_number == ENOENT) {
    return IOStatus::PathNotFound(message, reason);
  }
  return IOStatus::IOError(message, reason);
}

77
// A wrapper for fadvise, if the platform doesn't support fadvise,
78
// it will simply return 0.
79 80 81 82
int Fadvise(int fd, off_t offset, size_t len, int advice) {
#if !defined(OS_LINUX)
  // Not built for Linux: consume the arguments and report success.
  static_cast<void>(fd);
  static_cast<void>(offset);
  static_cast<void>(len);
  static_cast<void>(advice);
  return 0;  // simply do nothing.
#else
  // Forward to the system call and hand its result straight back.
  return posix_fadvise(fd, offset, len, advice);
#endif
}

A
Aaron Gao 已提交
91
namespace {
92

93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
// On MacOS (and probably *BSD), the posix write and pwrite calls do not support
// buffers larger than 2^31-1 bytes. These two wrappers fix this issue by
// cutting the buffer in 1GB chunks. We use this chunk size to be sure to keep
// the writes aligned.

// Writes all `nbyte` bytes of `buf` to `fd`, issuing write() calls of at most
// 1GB each. A call interrupted by a signal (EINTR) is retried. Returns false
// on any other write() failure, true once everything has been written.
bool PosixWrite(int fd, const char* buf, size_t nbyte) {
  const size_t kLimit1Gb = 1UL << 30;

  const char* cursor = buf;
  for (size_t remaining = nbyte; remaining != 0;) {
    const size_t chunk = std::min(remaining, kLimit1Gb);
    const ssize_t written = write(fd, cursor, chunk);
    if (written >= 0) {
      // Possibly a short write: advance and go around again.
      remaining -= written;
      cursor += written;
    } else if (errno != EINTR) {
      return false;
    }
  }
  return true;
}

// Positional counterpart of PosixWrite: writes all `nbyte` bytes of `buf` to
// `fd` starting at file position `offset`, using pwrite() calls of at most
// 1GB each. EINTR is retried; any other pwrite() failure returns false.
bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) {
  const size_t kLimit1Gb = 1UL << 30;

  const char* cursor = buf;
  for (size_t remaining = nbyte; remaining != 0;) {
    const size_t chunk = std::min(remaining, kLimit1Gb);
    const ssize_t written = pwrite(fd, cursor, chunk, offset);
    if (written >= 0) {
      // Move the source pointer and the file position in lock step.
      remaining -= written;
      offset += written;
      cursor += written;
    } else if (errno != EINTR) {
      return false;
    }
  }

  return true;
}

144 145 146 147 148 149 150 151
#ifdef ROCKSDB_RANGESYNC_PRESENT

#if !defined(ZFS_SUPER_MAGIC)
// The magic number for ZFS was not exposed until recently. It should be fixed
// forever so we can just copy the magic number here.
#define ZFS_SUPER_MAGIC 0x2fc12fc1
#endif

152 153 154 155 156 157
bool IsSyncFileRangeSupported(int fd) {
  // The approach taken in this function is to build a blacklist of cases where
  // we know `sync_file_range` definitely will not work properly despite passing
  // the compile-time check (`ROCKSDB_RANGESYNC_PRESENT`). If we are unsure, or
  // if any of the checks fail in unexpected ways, we allow `sync_file_range` to
  // be used. This way should minimize risk of impacting existing use cases.
158 159 160
  struct statfs buf;
  int ret = fstatfs(fd, &buf);
  assert(ret == 0);
161
  if (ret == 0 && buf.f_type == ZFS_SUPER_MAGIC) {
162 163 164 165 166 167
    // Testing on ZFS showed the writeback did not happen asynchronously when
    // `sync_file_range` was called, even though it returned success. Avoid it
    // and use `fdatasync` instead to preserve the contract of `bytes_per_sync`,
    // even though this'll incur extra I/O for metadata.
    return false;
  }
168 169 170 171 172 173 174 175 176 177 178 179

  ret = sync_file_range(fd, 0 /* offset */, 0 /* nbytes */, 0 /* flags */);
  assert(!(ret == -1 && errno != ENOSYS));
  if (ret == -1 && errno == ENOSYS) {
    // `sync_file_range` is not implemented on all platforms even if
    // compile-time checks pass and a supported filesystem is in-use. For
    // example, using ext4 on WSL (Windows Subsystem for Linux),
    // `sync_file_range()` returns `ENOSYS`
    // ("Function not implemented").
    return false;
  }
  // None of the cases on the blacklist matched, so allow `sync_file_range` use.
180 181 182 183 184 185 186 187
  return true;
}

#undef ZFS_SUPER_MAGIC

#endif  // ROCKSDB_RANGESYNC_PRESENT

}  // anonymous namespace
A
Aaron Gao 已提交
188

K
krad 已提交
189 190 191
/*
 * DirectIOHelper
 */
192
#ifndef NDEBUG
K
krad 已提交
193 194
namespace {

A
Aaron Gao 已提交
195 196 197
// True when the numeric value `off` is a whole multiple of `sector_size`.
bool IsSectorAligned(const size_t off, size_t sector_size) {
  const size_t remainder = off % sector_size;
  return remainder == 0;
}
K
krad 已提交
198

199 200
// True when the address held in `ptr` is a whole multiple of `sector_size`.
bool IsSectorAligned(const void* ptr, size_t sector_size) {
  const uintptr_t address = reinterpret_cast<uintptr_t>(ptr);
  return (address % sector_size) == 0;
}

203
}  // namespace
204
#endif
K
krad 已提交
205

206 207 208
/*
 * PosixSequentialFile
 */
A
Aaron Gao 已提交
209
// Stores the file name, the stdio stream, the raw descriptor and the logical
// block size, and latches whether direct I/O is in use from
// `options.use_direct_reads`.
PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
                                         int fd, size_t logical_block_size,
                                         const EnvOptions& options)
    : filename_(fname),
      file_(file),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(logical_block_size) {
  // Direct reads and mmap reads must not both be requested.
  assert(!(options.use_direct_reads && options.use_mmap_reads));
}
219

A
Aaron Gao 已提交
220
// Releases the underlying handle: the raw descriptor in direct-I/O mode,
// otherwise the stdio stream.
PosixSequentialFile::~PosixSequentialFile() {
  if (use_direct_io()) {
    assert(fd_);
    close(fd_);
    return;
  }
  assert(file_);
  fclose(file_);
}
229

230 231 232
// Buffered sequential read of up to `n` bytes into `scratch`; must not be
// called in direct-I/O mode. `*result` is set to the bytes actually read.
// Hitting end-of-file is not an error: the status stays OK and the stream's
// EOF flag is cleared so that later reads can pick up newly appended data.
// A short read that is not EOF yields an IOError built from errno.
IOStatus PosixSequentialFile::Read(size_t n, const IOOptions& /*opts*/,
                                   Slice* result, char* scratch,
                                   IODebugContext* /*dbg*/) {
  assert(result != nullptr && !use_direct_io());

  size_t bytes_read = 0;
  for (;;) {
    clearerr(file_);
    bytes_read = fread_unlocked(scratch, 1, n, file_);
    // Retry only when nothing was read because a signal interrupted the call.
    const bool interrupted =
        bytes_read == 0 && ferror(file_) && errno == EINTR;
    if (!interrupted) {
      break;
    }
  }
  *result = Slice(scratch, bytes_read);

  IOStatus status;
  if (bytes_read < n) {
    if (!feof(file_)) {
      // A partial read with an error: return a non-ok status
      status = IOError("While reading file sequentially", filename_, errno);
    } else {
      // End of file: leave the status OK and reset the stream state.
      clearerr(file_);
    }
  }
  return status;
}

255 256 257 258
// Direct-I/O read of `n` bytes at `offset` into `scratch` using pread().
// `offset`, `n` and `scratch` must all be aligned to
// GetRequiredBufferAlignment() (checked by assertions).
//
// The loop keeps reading until `n` bytes have arrived, pread() returns 0,
// a non-EINTR error occurs, or a read returns a byte count that is not a
// multiple of the alignment (expected only at end of file).
//
// On success `*result` covers the bytes read, which may be fewer than `n`.
// On a pread() error `*result` is empty and an IOError is returned.
IOStatus PosixSequentialFile::PositionedRead(uint64_t offset, size_t n,
                                             const IOOptions& /*opts*/,
                                             Slice* result, char* scratch,
                                             IODebugContext* /*dbg*/) {
  assert(use_direct_io());
  assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
  assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));

  IOStatus s;
  // Starts at 0 (not -1) so that a zero-length request, which never enters
  // the loop, is reported as an empty success instead of an error built
  // from whatever errno happened to hold.
  ssize_t r = 0;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      if (r == -1 && errno == EINTR) {
        continue;
      }
      break;
    }
    ptr += r;
    offset += r;
    left -= r;
    if (r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
      // Bytes reads don't fill sectors. Should only happen at the end
      // of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    s = IOError(
        "While pread " + ToString(n) + " bytes from offset " + ToString(offset),
        filename_, errno);
  }
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}

295
// Advances the stream position by `n` bytes relative to the current position
// using fseek(). Returns an IOError built from errno if fseek() fails.
IOStatus PosixSequentialFile::Skip(uint64_t n) {
  const int seek_ret = fseek(file_, static_cast<long int>(n), SEEK_CUR);
  if (seek_ret == 0) {
    return IOStatus::OK();
  }
  return IOError("While fseek to skip " + ToString(n) + " bytes", filename_,
                 errno);
}

303
// Advises the OS that the cached pages for [offset, offset + length) are no
// longer needed. Does nothing (returns OK) when not built for Linux or when
// the file is in direct-I/O mode.
IOStatus PosixSequentialFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return IOStatus::OK();
#else
  if (!use_direct_io()) {
    // free OS pages
    int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
    if (ret != 0) {
      // posix_fadvise() returns the error number directly and does not set
      // errno, so the status must be built from `ret`.
      return IOError("While fadvise NotNeeded offset " + ToString(offset) +
                         " len " + ToString(length),
                     filename_, ret);
    }
  }
  return IOStatus::OK();
#endif
}

/*
 * PosixRandomAccessFile
 */
325
#if defined(OS_LINUX)
K
krad 已提交
326
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
327 328 329 330 331 332 333 334 335 336 337 338
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  long version = 0;
  result = ioctl(fd, FS_IOC_GETVERSION, &version);
K
krad 已提交
339
  TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
340 341 342 343 344 345 346 347 348 349 350 351
  if (result == -1) {
    return 0;
  }
  uint64_t uversion = (uint64_t)version;

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, uversion);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
}
K
krad 已提交
352 353
#endif

T
Tomas Kolda 已提交
354
#if defined(OS_MACOSX) || defined(OS_AIX)
K
krad 已提交
355
size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
K
krad 已提交
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
  if (max_size < kMaxVarint64Length * 3) {
    return 0;
  }

  struct stat buf;
  int result = fstat(fd, &buf);
  if (result == -1) {
    return 0;
  }

  char* rid = id;
  rid = EncodeVarint64(rid, buf.st_dev);
  rid = EncodeVarint64(rid, buf.st_ino);
  rid = EncodeVarint64(rid, buf.st_gen);
  assert(rid >= id);
  return static_cast<size_t>(rid - id);
372 373
}
#endif
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537

#ifdef OS_LINUX
// Returns `path` with one trailing '/' removed. Paths of length 0 or 1
// (including "/") are returned unchanged; at most one slash is stripped.
std::string RemoveTrailingSlash(const std::string& path) {
  const bool has_trailing_slash = path.size() > 1 && path.back() == '/';
  return has_trailing_slash ? path.substr(0, path.size() - 1) : path;
}

// Takes a reference on every directory in `directories` (after stripping a
// trailing slash) and records a logical block size for those not yet cached.
//
// Works in three phases:
//   1. under the read lock, collect the directories missing from the cache;
//   2. with no lock held, query the block size of each missing directory;
//      the first failure is returned and the cache is left untouched;
//   3. under the write lock, bump each directory's ref count and store the
//      sizes obtained in phase 2.
Status LogicalBlockSizeCache::RefAndCacheLogicalBlockSize(
    const std::vector<std::string>& directories) {
  std::vector<std::string> normalized;
  normalized.reserve(directories.size());
  for (const auto& directory : directories) {
    normalized.emplace_back(RemoveTrailingSlash(directory));
  }

  std::map<std::string, size_t> uncached_sizes;
  {
    ReadLock lock(&cache_mutex_);
    for (const auto& dir : normalized) {
      const bool already_cached = cache_.find(dir) != cache_.end();
      if (!already_cached) {
        uncached_sizes.emplace(dir, 0);
      }
    }
  }

  for (auto& entry : uncached_sizes) {
    Status s = get_logical_block_size_of_directory_(entry.first, &entry.second);
    if (!s.ok()) {
      return s;
    }
  }

  WriteLock lock(&cache_mutex_);
  for (const auto& dir : normalized) {
    auto& cached = cache_[dir];
    cached.ref++;
    const auto measured = uncached_sizes.find(dir);
    if (measured != uncached_sizes.end()) {
      cached.size = measured->second;
    }
  }
  return Status::OK();
}

// Drops one reference for every directory in `directories` (after stripping
// a trailing slash) and erases a cache entry once its ref count reaches
// zero. Directories with no cache entry are ignored.
void LogicalBlockSizeCache::UnrefAndTryRemoveCachedLogicalBlockSize(
    const std::vector<std::string>& directories) {
  std::vector<std::string> normalized;
  normalized.reserve(directories.size());
  for (const auto& directory : directories) {
    normalized.emplace_back(RemoveTrailingSlash(directory));
  }

  WriteLock lock(&cache_mutex_);
  for (const auto& dir : normalized) {
    auto entry = cache_.find(dir);
    if (entry == cache_.end()) {
      continue;
    }
    --(entry->second.ref);
    if (entry->second.ref == 0) {
      cache_.erase(entry);
    }
  }
}

size_t LogicalBlockSizeCache::GetLogicalBlockSize(const std::string& fname,
                                                  int fd) {
  std::string dir = fname.substr(0, fname.find_last_of("/"));
  if (dir.empty()) {
    dir = "/";
  }
  {
    ReadLock lock(&cache_mutex_);
    auto it = cache_.find(dir);
    if (it != cache_.end()) {
      return it->second.size;
    }
  }
  return get_logical_block_size_of_fd_(fd);
}
#endif

// Opens `directory` read-only, stores the logical block size reported by
// GetLogicalBlockSizeOfFd() for it in `*size`, and closes the descriptor.
// Returns an IOError (leaving `*size` untouched) if the directory cannot be
// opened.
Status PosixHelper::GetLogicalBlockSizeOfDirectory(const std::string& directory,
                                                   size_t* size) {
  int fd = open(directory.c_str(), O_DIRECTORY | O_RDONLY);
  if (fd == -1) {
    // open() failed, so there is no descriptor to close.
    return Status::IOError("Cannot open directory " + directory);
  }
  *size = PosixHelper::GetLogicalBlockSizeOfFd(fd);
  close(fd);
  return Status::OK();
}

// Returns the logical block size of the block device backing `fd`.
//
// On Linux the device is located through /sys/dev/block/<major>:<minor> and
// the size is read from its queue/logical_block_size file. kDefaultPageSize
// is returned whenever a step fails, the device major number is 0, or the
// value read is not a non-zero power of two. Outside Linux the function
// always returns kDefaultPageSize.
size_t PosixHelper::GetLogicalBlockSizeOfFd(int fd) {
#ifdef OS_LINUX
  struct stat fd_stat;
  if (fstat(fd, &fd_stat) == -1) {
    return kDefaultPageSize;
  }
  const unsigned int dev_major = major(fd_stat.st_dev);
  const unsigned int dev_minor = minor(fd_stat.st_dev);
  if (dev_major == 0) {
    // Unnamed devices (e.g. non-device mounts), reserved as null device number.
    // These don't have an entry in /sys/dev/block/. Return a sensible default.
    return kDefaultPageSize;
  }

  // Reading queue/logical_block_size does not require special permissions.
  const int kBufferSize = 100;
  char sys_link[kBufferSize];
  char resolved[PATH_MAX + 1];
  snprintf(sys_link, kBufferSize, "/sys/dev/block/%u:%u", dev_major, dev_minor);
  if (realpath(sys_link, resolved) == nullptr) {
    return kDefaultPageSize;
  }
  std::string device_dir(resolved);
  if (!device_dir.empty() && device_dir.back() == '/') {
    device_dir.pop_back();
  }

  // Partitions such as sda3 and nvme0n1p1 have no `queue/` subdirectory; only
  // their parent devices (sda, nvme0n1) do. Example link targets:
  //   /sys/dev/block/8:3   -> ../../block/sda/sda3
  //   /sys/dev/block/259:4 -> ../../devices/pci0000:17/0000:17:00.0/
  //                           0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
  // So split off the last two path components and, when the last one looks
  // like a partition, step up to its parent directory.
  const size_t last_sep = device_dir.rfind('/', device_dir.length() - 1);
  if (last_sep == std::string::npos) {
    return kDefaultPageSize;
  }
  const size_t prev_sep = device_dir.rfind('/', last_sep - 1);
  if (prev_sep == std::string::npos) {
    return kDefaultPageSize;
  }
  const std::string parent =
      device_dir.substr(prev_sep + 1, last_sep - prev_sep - 1);
  const std::string child = device_dir.substr(last_sep + 1, std::string::npos);
  const bool child_is_not_nvme = child.compare(0, 4, "nvme") != 0;
  const bool child_has_p = child.find('p') != std::string::npos;
  if (parent != "block" && (child_is_not_nvme || child_has_p)) {
    device_dir = device_dir.substr(0, last_sep);
  }

  const std::string size_file = device_dir + "/queue/logical_block_size";
  size_t size = 0;
  FILE* fp = fopen(size_file.c_str(), "r");
  if (fp != nullptr) {
    char* line = nullptr;
    size_t line_capacity = 0;
    if (getline(&line, &line_capacity, fp) != -1) {
      sscanf(line, "%zu", &size);
    }
    free(line);
    fclose(fp);
  }
  // Accept the value only if it is a non-zero power of two.
  const bool is_power_of_two = size != 0 && (size & (size - 1)) == 0;
  if (is_power_of_two) {
    return size;
  }
#endif
  (void)fd;
  return kDefaultPageSize;
}

538 539 540 541 542
/*
 * PosixRandomAccessFile
 *
 * pread() based random-access
 */
543
// Stores the file name, descriptor and logical block size, and latches
// whether direct I/O is in use from `options.use_direct_reads`. When
// ROCKSDB_IOURING_PRESENT is defined, the thread-local io_uring holder is
// stored as well.
PosixRandomAccessFile::PosixRandomAccessFile(
    const std::string& fname, int fd, size_t logical_block_size,
    const EnvOptions& options
#ifdef ROCKSDB_IOURING_PRESENT
    ,
    ThreadLocalPtr* thread_local_io_urings
#endif
    )
    : filename_(fname),
      fd_(fd),
      use_direct_io_(options.use_direct_reads),
      logical_sector_size_(logical_block_size)
#ifdef ROCKSDB_IOURING_PRESENT
      ,
      thread_local_io_urings_(thread_local_io_urings)
#endif
{
  // Direct reads and mmap reads must not both be requested.
  assert(!(options.use_direct_reads && options.use_mmap_reads));
  // mmap reads are only accepted here when pointers are narrower than 8 bytes.
  assert(!(options.use_mmap_reads && sizeof(void*) >= 8));
}

// Closes the file descriptor held by this object.
PosixRandomAccessFile::~PosixRandomAccessFile() {
  close(fd_);
}

566 567 568 569
// Reads up to `n` bytes at `offset` into `scratch` using pread().
// In direct-I/O mode `offset`, `n` and `scratch` must be aligned to
// GetRequiredBufferAlignment() (checked by assertions).
//
// The loop keeps reading until `n` bytes have arrived, pread() returns 0,
// or a non-EINTR error occurs. In direct-I/O mode it also stops after a read
// whose byte count is not a multiple of the alignment (expected only at end
// of file).
//
// On success `*result` covers the bytes read, which may be fewer than `n`.
// On a pread() error `*result` is empty and an IOError is returned.
IOStatus PosixRandomAccessFile::Read(uint64_t offset, size_t n,
                                     const IOOptions& /*opts*/, Slice* result,
                                     char* scratch,
                                     IODebugContext* /*dbg*/) const {
  if (use_direct_io()) {
    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));
  }
  IOStatus s;
  // Starts at 0 (not -1) so that a zero-length request, which never enters
  // the loop, is reported as an empty success instead of an error built
  // from whatever errno happened to hold.
  ssize_t r = 0;
  size_t left = n;
  char* ptr = scratch;
  while (left > 0) {
    r = pread(fd_, ptr, left, static_cast<off_t>(offset));
    if (r <= 0) {
      if (r == -1 && errno == EINTR) {
        continue;
      }
      break;
    }
    ptr += r;
    offset += r;
    left -= r;
    if (use_direct_io() &&
        r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
      // Bytes reads don't fill sectors. Should only happen at the end
      // of the file.
      break;
    }
  }
  if (r < 0) {
    // An error: return a non-ok status
    s = IOError(
        "While pread offset " + ToString(offset) + " len " + ToString(n),
        filename_, errno);
  }
  *result = Slice(scratch, (r < 0) ? 0 : n - left);
  return s;
}

607 608 609 610
// Serves `num_reqs` read requests. When io_uring support is compiled in and
// a ring is available for this thread, the requests are submitted through
// io_uring in batches of at most kIoUringDepth; otherwise the call is
// forwarded to FSRandomAccessFile::MultiRead.
//
// Each request's outcome is stored in its own `status` / `result` fields;
// the io_uring path itself returns OK once every request has been processed.
IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
                                          size_t num_reqs,
                                          const IOOptions& options,
                                          IODebugContext* dbg) {
#if defined(ROCKSDB_IOURING_PRESENT)
  // Fetch this thread's ring, creating and caching it on first use.
  struct io_uring* iu = nullptr;
  if (thread_local_io_urings_) {
    iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
    if (iu == nullptr) {
      iu = CreateIOUring();
      if (iu != nullptr) {
        thread_local_io_urings_->Reset(iu);
      }
    }
  }

  // Init failed, platform doesn't support io_uring. Fall back to
  // serialized reads
  if (iu == nullptr) {
    return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
  }

  // Pairs a caller request with the iovec handed to the kernel and the
  // number of bytes already received for it across earlier submissions.
  struct WrappedReadRequest {
    FSReadRequest* req;
    struct iovec iov;
    size_t finished_len;
    explicit WrappedReadRequest(FSReadRequest* r) : req(r), finished_len(0) {}
  };

  autovector<WrappedReadRequest, 32> req_wraps;
  // Requests that came back short and must be resubmitted for the remainder.
  autovector<WrappedReadRequest*, 4> incomplete_rq_list;

  for (size_t i = 0; i < num_reqs; i++) {
    req_wraps.emplace_back(&reqs[i]);
  }

  size_t reqs_off = 0;
  while (num_reqs > reqs_off || !incomplete_rq_list.empty()) {
    size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size();

    // If requests exceed depth, split it into batches
    if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth;

    assert(incomplete_rq_list.size() <= this_reqs);
    for (size_t i = 0; i < this_reqs; i++) {
      // Resubmit the incomplete requests first, then take new ones.
      WrappedReadRequest* rep_to_submit;
      if (i < incomplete_rq_list.size()) {
        rep_to_submit = incomplete_rq_list[i];
      } else {
        rep_to_submit = &req_wraps[reqs_off++];
      }
      assert(rep_to_submit->req->len > rep_to_submit->finished_len);
      // Read only the part that has not arrived yet.
      rep_to_submit->iov.iov_base =
          rep_to_submit->req->scratch + rep_to_submit->finished_len;
      rep_to_submit->iov.iov_len =
          rep_to_submit->req->len - rep_to_submit->finished_len;

      struct io_uring_sqe* sqe;
      sqe = io_uring_get_sqe(iu);
      io_uring_prep_readv(
          sqe, fd_, &rep_to_submit->iov, 1,
          rep_to_submit->req->offset + rep_to_submit->finished_len);
      io_uring_sqe_set_data(sqe, rep_to_submit);
    }
    incomplete_rq_list.clear();

    ssize_t ret =
        io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
    if (static_cast<size_t>(ret) != this_reqs) {
      fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
    }
    assert(static_cast<size_t>(ret) == this_reqs);

    for (size_t i = 0; i < this_reqs; i++) {
      struct io_uring_cqe* cqe;
      WrappedReadRequest* req_wrap;

      // We could use the peek variant here, but this seems safer in terms
      // of our initial wait not reaping all completions
      ret = io_uring_wait_cqe(iu, &cqe);
      assert(!ret);

      req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
      FSReadRequest* req = req_wrap->req;
      if (cqe->res < 0) {
        req->result = Slice(req->scratch, 0);
        // io_uring reports failures as a negated errno in cqe->res; negate
        // it back so IOError() sees a real errno value.
        req->status = IOError("Req failed", filename_, -cqe->res);
      } else {
        size_t bytes_read = static_cast<size_t>(cqe->res);
        TEST_SYNC_POINT_CALLBACK(
            "PosixRandomAccessFile::MultiRead:io_uring_result", &bytes_read);
        if (bytes_read == req_wrap->iov.iov_len) {
          req->result = Slice(req->scratch, req->len);
          req->status = IOStatus::OK();
        } else if (bytes_read == 0) {
          // cqe->res == 0 can means EOF, or can mean partial results. See
          // comment
          // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
          // Fall back to pread in this case.
          Slice tmp_slice;
          req->status =
              Read(req->offset + req_wrap->finished_len,
                   req->len - req_wrap->finished_len, options, &tmp_slice,
                   req->scratch + req_wrap->finished_len, dbg);
          req->result =
              Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
        } else if (bytes_read < req_wrap->iov.iov_len) {
          // Short read: remember the progress and resubmit the remainder in
          // the next batch.
          assert(bytes_read > 0);
          assert(bytes_read + req_wrap->finished_len < req->len);
          req_wrap->finished_len += bytes_read;
          incomplete_rq_list.push_back(req_wrap);
        } else {
          // NOTE(review): cqe->res is a non-negative byte count in this
          // branch, not an errno, so the strerror() text attached to this
          // status is not meaningful. Left unchanged pending a decision on
          // which error code to report.
          req->result = Slice(req->scratch, 0);
          req->status = IOError("Req returned more bytes than requested",
                                filename_, cqe->res);
        }
      }
      io_uring_cqe_seen(iu, cqe);
    }
  }
  return IOStatus::OK();
#else
  return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
#endif
}

733 734 735 736
// Asks the OS to read ahead `n` bytes starting at `offset`. Does nothing in
// direct-I/O mode. Uses readahead() when OS_LINUX is defined and
// fcntl(F_RDADVISE) when OS_MACOSX is defined; with neither defined the call
// is a no-op. Returns an IOError built from errno if the system call
// returns -1.
IOStatus PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n,
                                         const IOOptions& /*opts*/,
                                         IODebugContext* /*dbg*/) {
  if (use_direct_io()) {
    return IOStatus::OK();
  }

  ssize_t rc = 0;
#if defined(OS_LINUX)
  rc = readahead(fd_, offset, n);
#endif
#if defined(OS_MACOSX)
  radvisory hint;
  hint.ra_offset = static_cast<off_t>(offset);
  hint.ra_count = static_cast<int>(n);
  rc = fcntl(fd_, F_RDADVISE, &hint);
#endif
  if (rc != -1) {
    return IOStatus::OK();
  }
  return IOError("While prefetching offset " + ToString(offset) + " len " +
                     ToString(n),
                 filename_, errno);
}

T
Tomas Kolda 已提交
757
#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
758
size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
K
krad 已提交
759
  return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
760 761 762 763
}
#endif

// Passes the expected access pattern for the whole file to the OS through
// Fadvise(). Ignored in direct-I/O mode. The Fadvise() result is not
// checked. An unrecognized pattern trips an assertion and issues no advice.
void PosixRandomAccessFile::Hint(AccessPattern pattern) {
  if (use_direct_io()) {
    return;
  }

  // Translate the pattern into the matching POSIX_FADV_* constant.
  int advice = 0;
  switch (pattern) {
    case kNormal:
      advice = POSIX_FADV_NORMAL;
      break;
    case kRandom:
      advice = POSIX_FADV_RANDOM;
      break;
    case kSequential:
      advice = POSIX_FADV_SEQUENTIAL;
      break;
    case kWillNeed:
      advice = POSIX_FADV_WILLNEED;
      break;
    case kWontNeed:
      advice = POSIX_FADV_DONTNEED;
      break;
    default:
      assert(false);
      return;
  }
  Fadvise(fd_, 0, 0, advice);
}

789
// Advises the OS that the cached pages for [offset, offset + length) are no
// longer needed. Does nothing (returns OK) in direct-I/O mode or when not
// built for Linux.
IOStatus PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
  if (use_direct_io()) {
    return IOStatus::OK();
  }
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return IOStatus::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return IOStatus::OK();
  }
  // posix_fadvise() returns the error number directly and does not set
  // errno, so the status must be built from `ret`.
  return IOError("While fadvise NotNeeded offset " + ToString(offset) +
                     " len " + ToString(length),
                 filename_, ret);
#endif
}

/*
 * PosixMmapReadableFile
 *
 * mmap() based random-access
 */
// base[0,length-1] contains the mmapped contents of the file.
// Stores the descriptor, file name, and the mapped region [base, base+length).
// `options` is consulted only by the assertions, which require mmap reads to
// be enabled and direct reads to be disabled.
PosixMmapReadableFile::PosixMmapReadableFile(const int fd,
                                             const std::string& fname,
                                             void* base, size_t length,
                                             const EnvOptions& options)
    : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) {
#if defined(NDEBUG)
  // The assertions below compile away, leaving `options` unreferenced.
  static_cast<void>(options);
#endif
  fd_ += 0;  // suppress the warning for used variables
  assert(options.use_mmap_reads && !options.use_direct_reads);
}

// Unmaps the region and closes the descriptor. A failed munmap() is reported
// on stdout; the descriptor is closed either way.
PosixMmapReadableFile::~PosixMmapReadableFile() {
  const bool unmapped = (munmap(mmapped_region_, length_) == 0);
  if (!unmapped) {
    fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n",
            mmapped_region_, length_);
  }
  close(fd_);
}

837 838 839 840 841
// Serves a read directly out of the mapped region; `scratch` is unused.
// An `offset` past the mapped length yields an empty `*result` and an
// EINVAL IOError. A request running past the end is clipped to the bytes
// available. `*result` points into the mapping.
IOStatus PosixMmapReadableFile::Read(uint64_t offset, size_t n,
                                     const IOOptions& /*opts*/, Slice* result,
                                     char* /*scratch*/,
                                     IODebugContext* /*dbg*/) const {
  if (offset > length_) {
    *result = Slice();
    return IOError("While mmap read offset " + ToString(offset) +
                       " larger than file length " + ToString(length_),
                   filename_, EINVAL);
  }

  if (offset + n > length_) {
    // Clip to what remains in the mapping.
    n = static_cast<size_t>(length_ - offset);
  }
  char* const region_start = reinterpret_cast<char*>(mmapped_region_);
  *result = Slice(region_start + offset, n);
  return IOStatus();
}

854
// Advises the OS that the cached pages for [offset, offset + length) are no
// longer needed. Does nothing (returns OK) when not built for Linux.
IOStatus PosixMmapReadableFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX
  (void)offset;
  (void)length;
  return IOStatus::OK();
#else
  // free OS pages
  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
  if (ret == 0) {
    return IOStatus::OK();
  }
  // posix_fadvise() returns the error number directly and does not set
  // errno, so the status must be built from `ret`. The message also gains
  // the space that was missing between "len" and the length value.
  return IOError("While fadvise not needed. Offset " + ToString(offset) +
                     " len " + ToString(length),
                 filename_, ret);
#endif
}

/*
 * PosixMmapFile
 *
 * We preallocate up to an extra megabyte and use memcpy to append new
 * data to the file.  This is safe since we either properly close the
 * file before reading from it, or for log files, the reading code
 * knows enough to skip zero suffixes.
 */
879
// Unmaps the currently mapped region, if there is one. On success the file
// offset is advanced by the region's size, the region pointers are reset,
// and the size used for the next mapping is doubled while it is below 1MB.
// Returns an IOError if munmap() fails; in that case no state is changed.
IOStatus PosixMmapFile::UnmapCurrentRegion() {
  TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds);
  if (base_ != nullptr) {
    int munmap_status = munmap(base_, limit_ - base_);
    if (munmap_status != 0) {
      // munmap() signals failure by returning -1 and setting errno, so the
      // error code comes from errno rather than from the return value.
      return IOError("While munmap", filename_, errno);
    }
    file_offset_ += limit_ - base_;
    base_ = nullptr;
    limit_ = nullptr;
    last_sync_ = nullptr;
    dst_ = nullptr;

    // Increase the amount we map the next time, but capped at 1MB
    if (map_size_ < (1 << 20)) {
      map_size_ *= 2;
    }
  }
  return IOStatus::OK();
}

900
// Maps the next `map_size_` bytes of the file, starting at `file_offset_`,
// read/write and shared. When `allow_fallocate_` is set the range is first
// allocated with fallocate(), falling back to posix_fallocate() if that
// fails. On success base_/limit_/dst_/last_sync_ describe the new, empty
// region. Returns NotSupported when ROCKSDB_FALLOCATE_PRESENT is undefined.
IOStatus PosixMmapFile::MapNewRegion() {
#ifdef ROCKSDB_FALLOCATE_PRESENT
  assert(base_ == nullptr);
  TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds);
  // we can't fallocate with FALLOC_FL_KEEP_SIZE here
  if (allow_fallocate_) {
    IOSTATS_TIMER_GUARD(allocate_nanos);
    if (fallocate(fd_, 0, file_offset_, map_size_) != 0) {
      // fallback to posix_fallocate
      const int alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
      if (alloc_status != 0) {
        return IOStatus::IOError("Error allocating space to file : " +
                                 filename_ +
                                 "Error : " + strerror(alloc_status));
      }
    }
  }

  TEST_KILL_RANDOM("PosixMmapFile::Append:1", rocksdb_kill_odds);
  void* const mapping = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE,
                             MAP_SHARED, fd_, file_offset_);
  if (mapping == MAP_FAILED) {
    return IOStatus::IOError("MMap failed on " + filename_);
  }
  TEST_KILL_RANDOM("PosixMmapFile::Append:2", rocksdb_kill_odds);

  // Nothing has been written or synced in the new region yet.
  base_ = reinterpret_cast<char*>(mapping);
  limit_ = base_ + map_size_;
  dst_ = base_;
  last_sync_ = base_;
  return IOStatus::OK();
#else
  return IOStatus::NotSupported("This platform doesn't support fallocate()");
#endif
}

936
// Synchronously flushes (MS_SYNC) the part of the current window written
// since the last sync, i.e. the pages covering [last_sync_, dst_).
// Returns OK immediately when nothing new was written; returns an IOError
// built from errno if msync() fails.
IOStatus PosixMmapFile::Msync() {
  if (dst_ != last_sync_) {
    // Page-aligned offsets (relative to base_) of the pages holding the
    // first and the last byte that still need syncing.
    const size_t first_page = TruncateToPageBoundary(last_sync_ - base_);
    const size_t last_page = TruncateToPageBoundary(dst_ - base_ - 1);
    // The sync mark is advanced before msync() is attempted.
    last_sync_ = dst_;
    TEST_KILL_RANDOM("PosixMmapFile::Msync:0", rocksdb_kill_odds);
    const size_t sync_bytes = last_page - first_page + page_size_;
    if (msync(base_ + first_page, sync_bytes, MS_SYNC) < 0) {
      return IOError("While msync", filename_, errno);
    }
  }
  return IOStatus::OK();
}

// Sets up an mmap-backed writer over the already-open descriptor `fd`.
// No region is mapped yet: every window pointer starts out null and the
// file offset starts at 0. The initial map size is Roundup(65536, page_size).
// `page_size` must be a power of two, and `options` must request mmap writes
// without direct writes (checked by asserts).
PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size,
                             const EnvOptions& options)
    : filename_(fname),
      fd_(fd),
      page_size_(page_size),
      map_size_(Roundup(65536, page_size)),
      base_(nullptr),
      limit_(nullptr),
      dst_(nullptr),
      last_sync_(nullptr),
      file_offset_(0) {
#ifndef ROCKSDB_FALLOCATE_PRESENT
  (void)options;
#else
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif
  // A power of two has exactly one bit set.
  assert((page_size & (page_size - 1)) == 0);
  assert(options.use_mmap_writes);
  assert(!options.use_direct_writes);
}

// Closes the file if it is still open; the status returned by Close() is
// dropped.
PosixMmapFile::~PosixMmapFile() {
  if (fd_ < 0) {
    // Already closed.
    return;
  }
  PosixMmapFile::Close(IOOptions(), nullptr);
}

980 981
// Copies `data` into the mapped window, remapping a new window each time the
// current one is full. Returns the first error from UnmapCurrentRegion() or
// MapNewRegion(), or OK once every byte has been copied.
IOStatus PosixMmapFile::Append(const Slice& data, const IOOptions& /*opts*/,
                               IODebugContext* /*dbg*/) {
  const char* cursor = data.data();
  for (size_t remaining = data.size(); remaining > 0;) {
    assert(base_ <= dst_);
    assert(dst_ <= limit_);
    const size_t room = limit_ - dst_;
    if (room == 0) {
      // Current window is exhausted (or none is mapped yet): move on to a
      // new one.
      IOStatus remap_status = UnmapCurrentRegion();
      if (!remap_status.ok()) {
        return remap_status;
      }
      remap_status = MapNewRegion();
      if (!remap_status.ok()) {
        return remap_status;
      }
      TEST_KILL_RANDOM("PosixMmapFile::Append:0", rocksdb_kill_odds);
    }

    // `room` was measured before any remap, so right after a remap this
    // copies zero bytes and the next iteration sees the fresh window.
    const size_t chunk = std::min(remaining, room);
    assert(dst_);
    memcpy(dst_, cursor, chunk);
    dst_ += chunk;
    cursor += chunk;
    remaining -= chunk;
  }
  return IOStatus::OK();
}

1010 1011 1012
// Unmaps the current window, trims the unused tail of the file, and closes
// the descriptor. fd_, base_ and limit_ are reset regardless of the outcome.
// The first failure encountered (unmap, ftruncate, then close) is the one
// reported.
IOStatus PosixMmapFile::Close(const IOOptions& /*opts*/,
                              IODebugContext* /*dbg*/) {
  // Bytes mapped past the last written byte; must be measured before the
  // window pointers are cleared by UnmapCurrentRegion().
  const size_t tail_slack = limit_ - dst_;

  IOStatus status = UnmapCurrentRegion();
  if (!status.ok()) {
    status = IOError("While closing mmapped file", filename_, errno);
  } else if (tail_slack > 0) {
    // Trim the extra space at the end of the file
    const int truncate_rc = ftruncate(fd_, file_offset_ - tail_slack);
    if (truncate_rc < 0) {
      status = IOError("While ftruncating mmaped file", filename_, errno);
    }
  }

  // A close() failure is only reported when nothing failed earlier.
  if (close(fd_) < 0 && status.ok()) {
    status = IOError("While closing mmapped file", filename_, errno);
  }

  fd_ = -1;
  base_ = nullptr;
  limit_ = nullptr;
  return status;
}

1037 1038 1039 1040
// Performs no work and always reports success.
IOStatus PosixMmapFile::Flush(const IOOptions& /*opts*/,
                              IODebugContext* /*dbg*/) {
  // Intentionally empty.
  return IOStatus::OK();
}
1041

1042 1043
// Calls fdatasync() on the descriptor and then Msync() on the mapped window.
// Returns an IOError if fdatasync() fails, otherwise whatever Msync() returns.
IOStatus PosixMmapFile::Sync(const IOOptions& /*opts*/,
                             IODebugContext* /*dbg*/) {
  const int rc = fdatasync(fd_);
  if (rc < 0) {
    return IOError("While fdatasync mmapped file", filename_, errno);
  }
  return Msync();
}

/**
 * Flush data as well as metadata to stable storage.
 */
1054 1055
IOStatus PosixMmapFile::Fsync(const IOOptions& /*opts*/,
                              IODebugContext* /*dbg*/) {
1056
  if (fsync(fd_) < 0) {
1057
    return IOError("While fsync mmaped file", filename_, errno);
1058 1059 1060 1061 1062 1063 1064 1065 1066 1067
  }

  return Msync();
}

/**
 * Get the size of valid data in the file. This will not match the
 * size that is returned from the filesystem because we use mmap
 * to extend file by map_size every time.
 */
1068 1069
uint64_t PosixMmapFile::GetFileSize(const IOOptions& /*opts*/,
                                    IODebugContext* /*dbg*/) {
1070 1071 1072 1073
  size_t used = dst_ - base_;
  return file_offset_ + used;
}

1074
// Hints the OS that cached pages of this file are no longer needed by calling
// Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED).
// On builds without OS_LINUX this does nothing and returns OK.
IOStatus PosixMmapFile::InvalidateCache(size_t offset, size_t length) {
#ifdef OS_LINUX
  // free OS pages
  if (Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED) != 0) {
    return IOError("While fadvise NotNeeded mmapped file", filename_, errno);
  }
  return IOStatus::OK();
#else
  (void)offset;
  (void)length;
  return IOStatus::OK();
#endif
}

1089
#ifdef ROCKSDB_FALLOCATE_PRESENT
// Reserves `len` bytes at `offset` via fallocate(), passing
// FALLOC_FL_KEEP_SIZE when fallocate_with_keep_size_ is set.
// Returns OK without touching the file when allow_fallocate_ is false.
// Both arguments must fit in off_t (checked by asserts).
IOStatus PosixMmapFile::Allocate(uint64_t offset, uint64_t len,
                                 const IOOptions& /*opts*/,
                                 IODebugContext* /*dbg*/) {
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  TEST_KILL_RANDOM("PosixMmapFile::Allocate:0", rocksdb_kill_odds);
  if (!allow_fallocate_) {
    // Preallocation is disabled: nothing to do.
    return IOStatus::OK();
  }

  const int mode = fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0;
  const int rc = fallocate(fd_, mode, static_cast<off_t>(offset),
                           static_cast<off_t>(len));
  if (rc != 0) {
    return IOError(
        "While fallocate offset " + ToString(offset) + " len " + ToString(len),
        filename_, errno);
  }
  return IOStatus::OK();
}
#endif
1111 1112 1113 1114 1115 1116 1117

/*
 * PosixWritableFile
 *
 * Use posix write to write data to a file.
 */
// Wraps the already-open descriptor `fd` for write()-based output.
// The tracked file size starts at 0 and `logical_block_size` is stored as the
// logical sector size. `options` must not request mmap writes (asserted).
PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
                                     size_t logical_block_size,
                                     const EnvOptions& options)
    : FSWritableFile(options),
      filename_(fname),
      use_direct_io_(options.use_direct_writes),
      fd_(fd),
      filesize_(0),
      logical_sector_size_(logical_block_size) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
  // Remember the preallocation settings for Allocate() and Close().
  allow_fallocate_ = options.allow_fallocate;
  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
#endif
#ifdef ROCKSDB_RANGESYNC_PRESENT
  // Probe once whether RangeSync() can use sync_file_range on this fd.
  sync_file_range_supported_ = IsSyncFileRangeSupported(fd_);
#endif  // ROCKSDB_RANGESYNC_PRESENT
  assert(!options.use_mmap_writes);
}

// Closes the file if it is still open; the status returned by Close() is
// dropped.
PosixWritableFile::~PosixWritableFile() {
  if (fd_ < 0) {
    // Already closed.
    return;
  }
  PosixWritableFile::Close(IOOptions(), nullptr);
}

1142 1143
// Writes `data` through PosixWrite() and, on success, grows the tracked file
// size by the number of bytes written. With direct I/O the buffer address and
// length must be sector aligned (asserted).
IOStatus PosixWritableFile::Append(const Slice& data, const IOOptions& /*opts*/,
                                   IODebugContext* /*dbg*/) {
  if (use_direct_io()) {
    assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
  }

  const char* payload = data.data();
  const size_t payload_len = data.size();
  if (PosixWrite(fd_, payload, payload_len)) {
    filesize_ += payload_len;
    return IOStatus::OK();
  }
  return IOError("While appending to file", filename_, errno);
}

1159 1160 1161
// Writes `data` at absolute position `offset` through PosixPositionedWrite().
// On success the tracked file size becomes offset + data.size(). With direct
// I/O the offset, buffer address and length must be sector aligned, and
// `offset` must fit in off_t (all asserted).
IOStatus PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset,
                                             const IOOptions& /*opts*/,
                                             IODebugContext* /*dbg*/) {
  if (use_direct_io()) {
    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
    assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
  }
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));

  const char* payload = data.data();
  const size_t payload_len = data.size();
  const bool written = PosixPositionedWrite(fd_, payload, payload_len,
                                            static_cast<off_t>(offset));
  if (!written) {
    return IOError("While pwrite to file at offset " + ToString(offset),
                   filename_, errno);
  }
  filesize_ = offset + payload_len;
  return IOStatus::OK();
}

1178 1179 1180
// Resizes the file to `size` bytes with ftruncate(). The tracked file size is
// updated only when the call succeeds; on failure an IOError built from errno
// is returned.
IOStatus PosixWritableFile::Truncate(uint64_t size, const IOOptions& /*opts*/,
                                     IODebugContext* /*dbg*/) {
  IOStatus status;
  if (ftruncate(fd_, size) < 0) {
    status = IOError("While ftruncate file to size " + ToString(size),
                     filename_, errno);
    return status;
  }
  filesize_ = size;
  return status;
}

1191 1192 1193
// Releases preallocated-but-unused space at the end of the file (best
// effort) and closes the descriptor. Only a close() failure is reported;
// fd_ is reset to -1 either way.
IOStatus PosixWritableFile::Close(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  IOStatus status;

  size_t prealloc_block_size;
  size_t prealloc_last_block;
  GetPreallocationStatus(&prealloc_block_size, &prealloc_last_block);
  if (prealloc_last_block > 0) {
    // trim the extra space preallocated at the end of the file
    // NOTE(ljin): we probably don't want to surface failure as an IOError,
    // but it will be nice to log these errors.
    int ignored_rc __attribute__((__unused__));
    ignored_rc = ftruncate(fd_, filesize_);
#if defined(ROCKSDB_FALLOCATE_PRESENT) && defined(FALLOC_FL_PUNCH_HOLE) && \
    !defined(TRAVIS)
    // On some file systems ftruncate only trims trailing space when the new
    // size is smaller than the current size, so the unused blocks are
    // released explicitly with fallocate(FALLOC_FL_PUNCH_HOLE).
    // FALLOC_FL_PUNCH_HOLE is supported on at least:
    //   XFS (since Linux 2.6.38)
    //   ext4 (since Linux 3.0)
    //   Btrfs (since Linux 3.7)
    //   tmpfs (since Linux 3.5)
    // Errors are ignored because a failure here does not affect correctness.
    // TRAVIS - disabled on TRAVIS filesystems: FALLOC_FL_KEEP_SIZE is expected
    // to leave the file size unchanged but does not there (visible in a
    // simple strace). Kept out until it is known whether this is a
    // Docker/AUFS quirk.
    struct stat st;
    const bool have_stats = (fstat(fd_, &st) == 0);
    // After ftruncate, check whether the allocated block count matches the
    // file size. If not, fall back to punching a hole.
    if (have_stats &&
        (st.st_size + st.st_blksize - 1) / st.st_blksize !=
            st.st_blocks / (st.st_blksize / 512)) {
      IOSTATS_TIMER_GUARD(allocate_nanos);
      if (allow_fallocate_) {
        fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_,
                  prealloc_block_size * prealloc_last_block - filesize_);
      }
    }
#endif
  }

  const int close_rc = close(fd_);
  if (close_rc < 0) {
    status = IOError("While closing file after writing", filename_, errno);
  }
  fd_ = -1;
  return status;
}

// write out the cached data to the OS cache
1247 1248 1249 1250
IOStatus PosixWritableFile::Flush(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  return IOStatus::OK();
}
1251

1252 1253
// Calls fdatasync() on the descriptor; returns an IOError built from errno on
// failure.
IOStatus PosixWritableFile::Sync(const IOOptions& /*opts*/,
                                 IODebugContext* /*dbg*/) {
  const int rc = fdatasync(fd_);
  if (rc >= 0) {
    return IOStatus::OK();
  }
  return IOError("While fdatasync", filename_, errno);
}

1260 1261
// Calls fsync() on the descriptor; returns an IOError built from errno on
// failure.
IOStatus PosixWritableFile::Fsync(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  const int rc = fsync(fd_);
  if (rc >= 0) {
    return IOStatus::OK();
  }
  return IOError("While fsync", filename_, errno);
}

// Always reports true for this file type.
bool PosixWritableFile::IsSyncThreadSafe() const {
  return true;
}

1270 1271 1272 1273
// Returns the size tracked in filesize_, which Append(), PositionedAppend()
// and Truncate() maintain; the filesystem is not queried.
uint64_t PosixWritableFile::GetFileSize(const IOOptions& /*opts*/,
                                        IODebugContext* /*dbg*/) {
  const uint64_t tracked_size = filesize_;
  return tracked_size;
}
1274

S
Stream  
Shaohua Li 已提交
1275 1276
// Passes the write-lifetime hint to the kernel with fcntl(F_SET_RW_HINT) and
// remembers it in write_hint_ when the call succeeds. Does nothing when the
// hint equals the one already recorded. The hint is ignored on non-Linux
// builds and under ROCKSDB_VALGRIND_RUN. A failing fcntl() is ignored.
void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
#ifdef OS_LINUX
// Suppress Valgrind "Unimplemented functionality" error.
#ifndef ROCKSDB_VALGRIND_RUN
  if (hint == write_hint_) {
    return;
  }
  // F_SET_RW_HINT takes a pointer to a uint64_t. Copy the hint into a real
  // 64-bit variable so the kernel never reads past a narrower enum object.
  uint64_t hint_value = static_cast<uint64_t>(hint);
  if (fcntl(fd_, F_SET_RW_HINT, &hint_value) == 0) {
    write_hint_ = hint;
  }
#else
  (void)hint;
#endif  // ROCKSDB_VALGRIND_RUN
#else
  (void)hint;
#endif  // OS_LINUX
}

1293
// Hints the OS that cached pages of this file are no longer needed by calling
// Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED).
// Returns OK without doing anything when direct I/O is in use or on builds
// without OS_LINUX.
IOStatus PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
  if (use_direct_io()) {
    return IOStatus::OK();
  }
#ifdef OS_LINUX
  // free OS pages
  if (Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED) != 0) {
    return IOError("While fadvise NotNeeded", filename_, errno);
  }
  return IOStatus::OK();
#else
  (void)offset;
  (void)length;
  return IOStatus::OK();
#endif
}

1311
#ifdef ROCKSDB_FALLOCATE_PRESENT
// Reserves `len` bytes at `offset` via fallocate(), passing
// FALLOC_FL_KEEP_SIZE when fallocate_with_keep_size_ is set.
// Returns OK without touching the file when allow_fallocate_ is false.
// Both arguments must fit in off_t (checked by asserts).
IOStatus PosixWritableFile::Allocate(uint64_t offset, uint64_t len,
                                     const IOOptions& /*opts*/,
                                     IODebugContext* /*dbg*/) {
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  assert(len <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  TEST_KILL_RANDOM("PosixWritableFile::Allocate:0", rocksdb_kill_odds);
  // The timer guard is created before the enabled check, as in every call.
  IOSTATS_TIMER_GUARD(allocate_nanos);
  if (!allow_fallocate_) {
    // Preallocation is disabled: nothing to do.
    return IOStatus::OK();
  }

  const int mode = fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0;
  const int rc = fallocate(fd_, mode, static_cast<off_t>(offset),
                           static_cast<off_t>(len));
  if (rc != 0) {
    return IOError(
        "While fallocate offset " + ToString(offset) + " len " + ToString(len),
        filename_, errno);
  }
  return IOStatus::OK();
}
#endif
1334

1335 1336 1337
// Starts writeback for a byte range using sync_file_range() when the build
// has ROCKSDB_RANGESYNC_PRESENT and the descriptor supports it; otherwise
// defers to FSWritableFile::RangeSync(). `offset` and `nbytes` must fit in
// off_t (asserted).
IOStatus PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes,
                                      const IOOptions& opts,
                                      IODebugContext* dbg) {
#ifdef ROCKSDB_RANGESYNC_PRESENT
  assert(offset <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  assert(nbytes <= static_cast<uint64_t>(std::numeric_limits<off_t>::max()));
  if (sync_file_range_supported_) {
    // Strict mode: specifying `SYNC_FILE_RANGE_WAIT_BEFORE` together with an
    // offset/length that spans all bytes written so far tells
    // `sync_file_range` to wait for any outstanding writeback requests to
    // finish before issuing a new one. Otherwise only the requested range is
    // submitted.
    const int ret =
        strict_bytes_per_sync_
            ? sync_file_range(
                  fd_, 0, static_cast<off_t>(offset + nbytes),
                  SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE)
            : sync_file_range(fd_, static_cast<off_t>(offset),
                              static_cast<off_t>(nbytes),
                              SYNC_FILE_RANGE_WRITE);
    if (ret == 0) {
      return IOStatus::OK();
    }
    return IOError("While sync_file_range returned " + ToString(ret),
                   filename_, errno);
  }
#endif  // ROCKSDB_RANGESYNC_PRESENT
  return FSWritableFile::RangeSync(offset, nbytes, opts, dbg);
}

T
Tomas Kolda 已提交
1364
#ifdef OS_LINUX
// Delegates to PosixHelper::GetUniqueIdFromFile() for this file's descriptor,
// forwarding the caller's buffer and capacity, and returns its result.
size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const {
  const size_t id_len = PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
  return id_len;
}
#endif
1369

I
Islam AbdelRahman 已提交
1370 1371 1372 1373 1374
/*
 * PosixRandomRWFile
 */

// Stores the file name and the already-open descriptor; the options argument
// is not used.
PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd,
                                     const EnvOptions& /*options*/)
    : filename_(fname),
      fd_(fd) {
}

// Closes the file if it is still open; the status returned by Close() is
// dropped.
PosixRandomRWFile::~PosixRandomRWFile() {
  if (fd_ < 0) {
    // Already closed.
    return;
  }
  Close(IOOptions(), nullptr);
}

1384 1385 1386
// Writes `data` at absolute position `offset` through PosixPositionedWrite().
// Returns an IOError built from errno when the helper reports failure.
IOStatus PosixRandomRWFile::Write(uint64_t offset, const Slice& data,
                                  const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  const char* payload = data.data();
  const size_t payload_len = data.size();
  const bool written = PosixPositionedWrite(fd_, payload, payload_len,
                                            static_cast<off_t>(offset));
  if (written) {
    return IOStatus::OK();
  }
  return IOError(
      "While write random read/write file at offset " + ToString(offset),
      filename_, errno);
}

1398 1399 1400
// Reads up to `n` bytes starting at `offset` into `scratch` with repeated
// pread() calls. Calls interrupted by EINTR are retried, and reading stops
// early when pread() returns 0. On success `*result` refers to the bytes
// actually read, which may be fewer than `n`.
IOStatus PosixRandomRWFile::Read(uint64_t offset, size_t n,
                                 const IOOptions& /*opts*/, Slice* result,
                                 char* scratch, IODebugContext* /*dbg*/) const {
  char* out = scratch;
  size_t remaining = n;
  while (remaining > 0) {
    const ssize_t got = pread(fd_, out, remaining, offset);
    if (got == 0) {
      // Nothing more to read
      break;
    }
    if (got < 0) {
      if (errno == EINTR) {
        // read was interrupted, try again.
        continue;
      }
      // error while reading from file; `offset` here is the position of the
      // failing pread(), which has advanced past any bytes already read.
      return IOError("While reading random read/write file offset " +
                         ToString(offset) + " len " + ToString(n),
                     filename_, errno);
    }

    // Consumed `got` bytes: advance the buffer and the file position.
    out += got;
    offset += got;
    remaining -= got;
  }

  *result = Slice(scratch, n - remaining);
  return IOStatus::OK();
}

1429 1430 1431 1432
// Performs no work and always reports success.
IOStatus PosixRandomRWFile::Flush(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  // Intentionally empty.
  return IOStatus::OK();
}
I
Islam AbdelRahman 已提交
1433

1434 1435
// Calls fdatasync() on the descriptor; returns an IOError built from errno on
// failure.
IOStatus PosixRandomRWFile::Sync(const IOOptions& /*opts*/,
                                 IODebugContext* /*dbg*/) {
  const int rc = fdatasync(fd_);
  if (rc >= 0) {
    return IOStatus::OK();
  }
  return IOError("While fdatasync random read/write file", filename_, errno);
}

1442 1443
// Calls fsync() on the descriptor; returns an IOError built from errno on
// failure.
IOStatus PosixRandomRWFile::Fsync(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  const int rc = fsync(fd_);
  if (rc >= 0) {
    return IOStatus::OK();
  }
  return IOError("While fsync random read/write file", filename_, errno);
}

1450 1451
// Closes the descriptor and marks the file closed (fd_ = -1) whether or not
// close() succeeded. Returns an IOError built from errno when close() fails.
//
// fd_ is reset even on failure so that the destructor, which closes any file
// whose fd_ is still >= 0, does not call close() a second time on a
// descriptor number that may already have been released or reused. This
// matches PosixWritableFile::Close().
IOStatus PosixRandomRWFile::Close(const IOOptions& /*opts*/,
                                  IODebugContext* /*dbg*/) {
  const int rc = close(fd_);
  // Capture errno right away; it is only meaningful when rc < 0.
  const int close_errno = errno;
  fd_ = -1;
  if (rc < 0) {
    return IOError("While close random read/write file", filename_,
                   close_errno);
  }
  return IOStatus::OK();
}

1459 1460
// Unmaps the buffer's region of length_ bytes starting at base_.
PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
  // TODO should have error handling though not much we can do...
  // The munmap() result is deliberately discarded.
  (void)munmap(this->base_, length_);
}

K
krad 已提交
1464 1465 1466 1467
/*
 * PosixDirectory
 */

1468 1469
// Closes the directory descriptor; the close() result is not checked.
PosixDirectory::~PosixDirectory() {
  close(fd_);
}

1470 1471
// Calls fsync() on the directory descriptor, except on OS_AIX builds where
// the call is compiled out and OK is returned unconditionally.
IOStatus PosixDirectory::Fsync(const IOOptions& /*opts*/,
                               IODebugContext* /*dbg*/) {
#ifndef OS_AIX
  const int rc = fsync(fd_);
  if (rc == -1) {
    return IOError("While fsync", "a directory", errno);
  }
#endif
  return IOStatus::OK();
}
1479
}  // namespace ROCKSDB_NAMESPACE
1480
#endif