提交 c364eb42 编写于 作者: D Dmitri Smirnov 提交者: Facebook Github Bot

Windows cumulative patch

Summary:
This patch addressed several issues.
  Portability including db_test std::thread -> port::Thread Cc: @
  and %z to ROCKSDB portable macro. Cc: maysamyabandeh

  Implement Env::AreFilesSame

  Make the implementation of file unique number more robust

  Get rid of C-runtime and go directly to Windows API when dealing
  with file primitives.

  Implement GetSectorSize() and aling unbuffered read on the value if
  available.

  Adjust Windows Logger for the new interface, implement CloseImpl() Cc: anand1976

  Fix test running script issue where $status var was of incorrect scope
  so the failures were swallowed and not reported.

  DestroyDB() creates a logger and opens a LOG file in the directory
  being cleaned up. This holds a lock on the folder and the cleanup is
  prevented. This fails one of the checkpoin tests. We observe the same in production.
  We close the log file in this change.

 Fix DBTest2.ReadAmpBitmapLiveInCacheAfterDBClose failure where the test
 attempts to open a directory with NewRandomAccessFile which does not
 work on Windows.
  Fix DBTest.SoftLimit as it is dependent on thread timing. CC: yiwu-arbug
Closes https://github.com/facebook/rocksdb/pull/3552

Differential Revision: D7156304

Pulled By: siying

fbshipit-source-id: 43db0a757f1dfceffeb2b7988043156639173f5b
上级 b864bc9b
......@@ -336,7 +336,7 @@ $InvokeTestAsync = {
# Test limiting factor here
[int]$count = 0
# Overall status
[bool]$success = $true;
[bool]$script:success = $true;
function RunJobs($Suites, $TestCmds, [int]$ConcurrencyVal)
{
......@@ -425,7 +425,7 @@ function RunJobs($Suites, $TestCmds, [int]$ConcurrencyVal)
$log_content = @(Get-Content $log)
if($completed.State -ne "Completed") {
$success = $false
$script:success = $false
Write-Warning $message
$log_content | Write-Warning
} else {
......@@ -449,7 +449,7 @@ function RunJobs($Suites, $TestCmds, [int]$ConcurrencyVal)
}
if(!$pass_found) {
$success = $false;
$script:success = $false;
Write-Warning $message
$log_content | Write-Warning
} else {
......@@ -473,7 +473,7 @@ New-TimeSpan -Start $StartDate -End $EndDate |
}
if(!$success) {
if(!$script:success) {
# This does not succeed killing off jobs quick
# So we simply exit
# Remove-Job -Job $jobs -Force
......
......@@ -2389,10 +2389,13 @@ Snapshot::~Snapshot() {
}
Status DestroyDB(const std::string& dbname, const Options& options) {
const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
Env* env = soptions.env;
std::vector<std::string> filenames;
// Reset the logger because it holds a handle to the
// log file and prevents cleanup and directory removal
soptions.info_log.reset();
// Ignore error in case directory does not exist
env->GetChildren(dbname, &filenames);
......
......@@ -5153,7 +5153,7 @@ TEST_F(DBTest, AutomaticConflictsWithManualCompaction) {
}
ASSERT_OK(Flush());
}
std::thread manual_compaction_thread([this]() {
port::Thread manual_compaction_thread([this]() {
CompactRangeOptions croptions;
croptions.exclusive_manual_compaction = true;
ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
......@@ -5393,18 +5393,42 @@ TEST_F(DBTest, HardLimit) {
#ifndef ROCKSDB_LITE
class WriteStallListener : public EventListener {
public:
WriteStallListener() : condition_(WriteStallCondition::kNormal) {}
WriteStallListener() : cond_(&mutex_),
condition_(WriteStallCondition::kNormal),
expected_(WriteStallCondition::kNormal),
expected_set_(false)
{}
void OnStallConditionsChanged(const WriteStallInfo& info) override {
MutexLock l(&mutex_);
condition_ = info.condition.cur;
if (expected_set_ &&
condition_ == expected_) {
cond_.Signal();
expected_set_ = false;
}
}
bool CheckCondition(WriteStallCondition expected) {
MutexLock l(&mutex_);
return expected == condition_;
if (expected != condition_) {
expected_ = expected;
expected_set_ = true;
while (expected != condition_) {
// We bail out on timeout 500 milliseconds
const uint64_t timeout_us = 500000;
if (cond_.TimedWait(timeout_us)) {
expected_set_ = false;
return false;
}
}
}
return true;
}
private:
port::Mutex mutex_;
port::Mutex mutex_;
port::CondVar cond_;
WriteStallCondition condition_;
WriteStallCondition expected_;
bool expected_set_;
};
TEST_F(DBTest, SoftLimit) {
......@@ -5743,7 +5767,7 @@ TEST_F(DBTest, ThreadLocalPtrDeadlock) {
return flushes_done.load() > 10;
};
std::thread flushing_thread([&] {
port::Thread flushing_thread([&] {
for (int i = 0; !done(); ++i) {
ASSERT_OK(db_->Put(WriteOptions(), Slice("hi"),
Slice(std::to_string(i).c_str())));
......@@ -5753,12 +5777,12 @@ TEST_F(DBTest, ThreadLocalPtrDeadlock) {
}
});
std::vector<std::thread> thread_spawning_threads(10);
std::vector<port::Thread> thread_spawning_threads(10);
for (auto& t: thread_spawning_threads) {
t = std::thread([&] {
t = port::Thread([&] {
while (!done()) {
{
std::thread tmp_thread([&] {
port::Thread tmp_thread([&] {
auto it = db_->NewIterator(ReadOptions());
delete it;
});
......
......@@ -1822,14 +1822,26 @@ TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
{
const int kIdBufLen = 100;
char id_buf[kIdBufLen];
#ifndef OS_WIN
// You can't open a directory on windows using random access file
std::unique_ptr<RandomAccessFile> file;
env_->NewRandomAccessFile(dbname_, &file, EnvOptions());
ASSERT_OK(env_->NewRandomAccessFile(dbname_, &file, EnvOptions()));
if (file->GetUniqueId(id_buf, kIdBufLen) == 0) {
// fs holding db directory doesn't support getting a unique file id,
// this means that running this test will fail because lru_cache will load
// the blocks again regardless of them being already in the cache
return;
}
#else
std::unique_ptr<Directory> dir;
ASSERT_OK(env_->NewDirectory(dbname_, &dir));
if (dir->GetUniqueId(id_buf, kIdBufLen) == 0) {
// fs holding db directory doesn't support getting a unique file id,
// this means that running this test will fail because lru_cache will load
// the blocks again regardless of them being already in the cache
return;
}
#endif
}
uint32_t bytes_per_bit[2] = {1, 16};
for (size_t k = 0; k < 2; k++) {
......
......@@ -135,6 +135,37 @@ TEST_F(EnvPosixTest, RunImmediately) {
}
}
#ifdef OS_WIN
TEST_F(EnvPosixTest, AreFilesSame) {
{
bool tmp;
if (env_->AreFilesSame("", "", &tmp).IsNotSupported()) {
fprintf(stderr,
"skipping EnvBasicTestWithParam.AreFilesSame due to "
"unsupported Env::AreFilesSame\n");
return;
}
}
const EnvOptions soptions;
auto* env = Env::Default();
std::string same_file_name = test::TmpDir(env) + "/same_file";
std::string same_file_link_name = same_file_name + "_link";
std::unique_ptr<WritableFile> same_file;
ASSERT_OK(env->NewWritableFile(same_file_name,
&same_file, soptions));
same_file->Append("random_data");
ASSERT_OK(same_file->Flush());
same_file.reset();
ASSERT_OK(env->LinkFile(same_file_name, same_file_link_name));
bool result = false;
ASSERT_OK(env->AreFilesSame(same_file_name, same_file_link_name, &result));
ASSERT_TRUE(result);
}
#endif
TEST_P(EnvPosixTestWithParam, UnSchedule) {
std::atomic<bool> called(false);
env_->SetBackgroundThreads(1, Env::LOW);
......
......@@ -802,6 +802,10 @@ class Directory {
virtual ~Directory() {}
// Fsync directory. Can be called concurrently from multiple threads.
virtual Status Fsync() = 0;
virtual size_t GetUniqueId(char* id, size_t max_size) const {
return 0;
}
};
enum InfoLogLevel : unsigned char {
......
......@@ -35,6 +35,10 @@
#include <rpc.h> // for uuid generation
#include <windows.h>
#include <shlwapi.h>
#include "strsafe.h"
#include <algorithm>
namespace rocksdb {
......@@ -44,10 +48,15 @@ ThreadStatusUpdater* CreateThreadStatusUpdater() {
namespace {
static const size_t kSectorSize = 512; // Sector size used when physical sector size could not be obtained from device.
// RAII helpers for HANDLEs
const auto CloseHandleFunc = [](HANDLE h) { ::CloseHandle(h); };
typedef std::unique_ptr<void, decltype(CloseHandleFunc)> UniqueCloseHandlePtr;
const auto FindCloseFunc = [](HANDLE h) { ::FindClose(h); };
typedef std::unique_ptr<void, decltype(FindCloseFunc)> UniqueFindClosePtr;
void WinthreadCall(const char* label, std::error_code result) {
if (0 != result.value()) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result.value()));
......@@ -61,7 +70,7 @@ namespace port {
WinEnvIO::WinEnvIO(Env* hosted_env)
: hosted_env_(hosted_env),
page_size_(4 * 1012),
page_size_(4 * 1024),
allocation_granularity_(page_size_),
perf_counter_frequency_(0),
GetSystemTimePreciseAsFileTime_(NULL) {
......@@ -93,8 +102,11 @@ WinEnvIO::~WinEnvIO() {
Status WinEnvIO::DeleteFile(const std::string& fname) {
Status result;
if (_unlink(fname.c_str())) {
result = IOError("Failed to delete: " + fname, errno);
BOOL ret = DeleteFileA(fname.c_str());
if(!ret) {
auto lastError = GetLastError();
result = IOErrorFromWindowsError("Failed to delete: " + fname,
lastError);
}
return result;
......@@ -231,7 +243,8 @@ Status WinEnvIO::NewRandomAccessFile(const std::string& fname,
fileGuard.release();
}
} else {
result->reset(new WinRandomAccessFile(fname, hFile, page_size_, options));
result->reset(new WinRandomAccessFile(fname, hFile,
std::max(GetSectorSize(fname), page_size_), options));
fileGuard.release();
}
return s;
......@@ -265,8 +278,7 @@ Status WinEnvIO::OpenWritableFile(const std::string& fname,
if (local_options.use_mmap_writes) {
desired_access |= GENERIC_READ;
}
else {
} else {
// Adding this solely for tests to pass (fault_injection_test,
// wal_manager_test).
shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE);
......@@ -317,7 +329,7 @@ Status WinEnvIO::OpenWritableFile(const std::string& fname,
} else {
// Here we want the buffer allocation to be aligned by the SSD page size
// and to be a multiple of it
result->reset(new WinWritableFile(fname, hFile, page_size_,
result->reset(new WinWritableFile(fname, hFile, std::max(GetSectorSize(fname), GetPageSize()),
c_BufferCapacity, local_options));
}
return s;
......@@ -361,7 +373,8 @@ Status WinEnvIO::NewRandomRWFile(const std::string & fname,
}
UniqueCloseHandlePtr fileGuard(hFile, CloseHandleFunc);
result->reset(new WinRandomRWFile(fname, hFile, page_size_, options));
result->reset(new WinRandomRWFile(fname, hFile, std::max(GetSectorSize(fname), GetPageSize()),
options));
fileGuard.release();
return s;
......@@ -372,67 +385,128 @@ Status WinEnvIO::NewDirectory(const std::string& name,
Status s;
// Must be nullptr on failure
result->reset();
// Must fail if directory does not exist
if (!DirExists(name)) {
s = IOError("Directory does not exist: " + name, EEXIST);
} else {
s = IOErrorFromWindowsError(
"open folder: " + name, ERROR_DIRECTORY);
return s;
}
HANDLE handle = INVALID_HANDLE_VALUE;
// 0 - for access means read metadata
{
IOSTATS_TIMER_GUARD(open_nanos);
result->reset(new WinDirectory);
handle = ::CreateFileA(name.c_str(), 0,
FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS, // make opening folders possible
NULL);
}
if (INVALID_HANDLE_VALUE == handle) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"open folder: " + name, lastError);
return s;
}
result->reset(new WinDirectory(handle));
return s;
}
Status WinEnvIO::FileExists(const std::string& fname) {
// F_OK == 0
const int F_OK_ = 0;
return _access(fname.c_str(), F_OK_) == 0 ? Status::OK()
: Status::NotFound();
Status s;
// TODO: This does not follow symbolic links at this point
// which is consistent with _access() impl on windows
// but can be added
WIN32_FILE_ATTRIBUTE_DATA attrs;
if (FALSE == GetFileAttributesExA(fname.c_str(), GetFileExInfoStandard,
&attrs)) {
auto lastError = GetLastError();
switch (lastError) {
case ERROR_ACCESS_DENIED:
case ERROR_NOT_FOUND:
case ERROR_FILE_NOT_FOUND:
case ERROR_PATH_NOT_FOUND:
s = Status::NotFound();
break;
default:
s = IOErrorFromWindowsError("Unexpected error for: " + fname,
lastError);
break;
}
}
return s;
}
Status WinEnvIO::GetChildren(const std::string& dir,
std::vector<std::string>* result) {
Status status;
result->clear();
std::vector<std::string> output;
Status status;
WIN32_FIND_DATA data;
std::string pattern(dir);
pattern.append("\\").append("*");
auto CloseDir = [](DIR* p) { closedir(p); };
std::unique_ptr<DIR, decltype(CloseDir)> dirp(opendir(dir.c_str()),
CloseDir);
if (!dirp) {
switch (errno) {
case EACCES:
case ENOENT:
case ENOTDIR:
return Status::NotFound();
default:
return IOError(dir, errno);
}
} else {
if (result->capacity() > 0) {
output.reserve(result->capacity());
}
HANDLE handle = ::FindFirstFileExA(pattern.c_str(),
FindExInfoBasic, // Do not want alternative name
&data,
FindExSearchNameMatch,
NULL, // lpSearchFilter
0);
struct dirent* ent = readdir(dirp.get());
while (ent) {
output.push_back(ent->d_name);
ent = readdir(dirp.get());
if (handle == INVALID_HANDLE_VALUE) {
auto lastError = GetLastError();
switch (lastError) {
case ERROR_NOT_FOUND:
case ERROR_ACCESS_DENIED:
case ERROR_FILE_NOT_FOUND:
case ERROR_PATH_NOT_FOUND:
status = Status::NotFound();
break;
default:
status = IOErrorFromWindowsError(
"Failed to GetChhildren for: " + dir, lastError);
}
return status;
}
output.swap(*result);
UniqueFindClosePtr fc(handle, FindCloseFunc);
if (result->capacity() > 0) {
output.reserve(result->capacity());
}
// For safety
data.cFileName[MAX_PATH - 1] = 0;
while (true) {
output.emplace_back(data.cFileName);
BOOL ret =- ::FindNextFileA(handle, &data);
// If the function fails the return value is zero
// and non-zero otherwise. Not TRUE or FALSE.
if (ret == FALSE) {
// Posix does not care why we stopped
break;
}
data.cFileName[MAX_PATH - 1] = 0;
}
output.swap(*result);
return status;
}
Status WinEnvIO::CreateDir(const std::string& name) {
Status result;
if (_mkdir(name.c_str()) != 0) {
auto code = errno;
result = IOError("Failed to create dir: " + name, code);
BOOL ret = CreateDirectoryA(name.c_str(), NULL);
if (!ret) {
auto lastError = GetLastError();
result = IOErrorFromWindowsError(
"Failed to create a directory: " + name, lastError);
}
return result;
......@@ -441,28 +515,26 @@ Status WinEnvIO::CreateDir(const std::string& name) {
Status WinEnvIO::CreateDirIfMissing(const std::string& name) {
Status result;
if (DirExists(name)) {
return result;
}
if (_mkdir(name.c_str()) != 0) {
if (errno == EEXIST) {
BOOL ret = CreateDirectoryA(name.c_str(), NULL);
if (!ret) {
auto lastError = GetLastError();
if (lastError != ERROR_ALREADY_EXISTS) {
result = IOErrorFromWindowsError(
"Failed to create a directory: " + name, lastError);
} else if (!DirExists(name)) {
result =
Status::IOError("`" + name + "' exists but is not a directory");
} else {
auto code = errno;
result = IOError("Failed to create dir: " + name, code);
}
}
return result;
}
Status WinEnvIO::DeleteDir(const std::string& name) {
Status result;
if (_rmdir(name.c_str()) != 0) {
auto code = errno;
result = IOError("Failed to remove dir: " + name, code);
BOOL ret = RemoveDirectoryA(name.c_str());
if (!ret) {
auto lastError = GetLastError();
result = IOErrorFromWindowsError("Failed to remove dir: " + name, lastError);
}
return result;
}
......@@ -553,6 +625,81 @@ Status WinEnvIO::LinkFile(const std::string& src,
return result;
}
Status WinEnvIO::AreFilesSame(const std::string& first,
const std::string& second, bool* res) {
// For MinGW builds
#if (_WIN32_WINNT == _WIN32_WINNT_VISTA)
Status s = Status::NotSupported();
#else
assert(res != nullptr);
Status s;
if (res == nullptr) {
s = Status::InvalidArgument("res");
return s;
}
// 0 - for access means read metadata
HANDLE file_1 = ::CreateFileA(first.c_str(), 0,
FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS, // make opening folders possible
NULL);
if (INVALID_HANDLE_VALUE == file_1) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"open file: " + first, lastError);
return s;
}
UniqueCloseHandlePtr g_1(file_1, CloseHandleFunc);
HANDLE file_2 = ::CreateFileA(second.c_str(), 0,
FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL, OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS, // make opening folders possible
NULL);
if (INVALID_HANDLE_VALUE == file_2) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"open file: " + second, lastError);
return s;
}
UniqueCloseHandlePtr g_2(file_2, CloseHandleFunc);
FILE_ID_INFO FileInfo_1;
BOOL result = GetFileInformationByHandleEx(file_1, FileIdInfo, &FileInfo_1,
sizeof(FileInfo_1));
if (!result) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"stat file: " + first, lastError);
return s;
}
FILE_ID_INFO FileInfo_2;
result = GetFileInformationByHandleEx(file_2, FileIdInfo, &FileInfo_2,
sizeof(FileInfo_2));
if (!result) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"stat file: " + second, lastError);
return s;
}
if (FileInfo_1.VolumeSerialNumber == FileInfo_2.VolumeSerialNumber) {
*res = (0 == memcmp(FileInfo_1.FileId.Identifier, FileInfo_2.FileId.Identifier,
sizeof(FileInfo_1.FileId.Identifier)));
} else {
*res = false;
}
#endif
return s;
}
Status WinEnvIO::LockFile(const std::string& lockFname,
FileLock** lock) {
assert(lock != nullptr);
......@@ -596,12 +743,12 @@ Status WinEnvIO::UnlockFile(FileLock* lock) {
}
Status WinEnvIO::GetTestDirectory(std::string* result) {
std::string output;
const char* env = getenv("TEST_TMPDIR");
if (env && env[0] != '\0') {
output = env;
CreateDir(output);
} else {
env = getenv("TMP");
......@@ -610,9 +757,8 @@ Status WinEnvIO::GetTestDirectory(std::string* result) {
} else {
output = "c:\\tmp";
}
CreateDir(output);
}
CreateDir(output);
output.append("\\testrocksdb-");
output.append(std::to_string(_getpid()));
......@@ -722,26 +868,29 @@ Status WinEnvIO::GetHostName(char* name, uint64_t len) {
Status WinEnvIO::GetAbsolutePath(const std::string& db_path,
std::string* output_path) {
// Check if we already have an absolute path
// that starts with non dot and has a semicolon in it
if ((!db_path.empty() && (db_path[0] == '/' || db_path[0] == '\\')) ||
(db_path.size() > 2 && db_path[0] != '.' &&
((db_path[1] == ':' && db_path[2] == '\\') ||
(db_path[1] == ':' && db_path[2] == '/')))) {
// For test compatibility we will consider starting slash as an
// absolute path
if ((!db_path.empty() && (db_path[0] == '\\' || db_path[0] == '/')) ||
!PathIsRelativeA(db_path.c_str())) {
*output_path = db_path;
return Status::OK();
}
std::string result;
result.resize(_MAX_PATH);
result.resize(MAX_PATH);
char* ret = _getcwd(&result[0], _MAX_PATH);
if (ret == nullptr) {
return Status::IOError("Failed to get current working directory",
strerror(errno));
// Hopefully no changes the current directory while we do this
// however _getcwd also suffers from the same limitation
DWORD len = GetCurrentDirectoryA(MAX_PATH, &result[0]);
if (len == 0) {
auto lastError = GetLastError();
return IOErrorFromWindowsError("Failed to get current working directory",
lastError);
}
result.resize(strlen(result.data()));
result.resize(len);
result.swap(*output_path);
return Status::OK();
......@@ -808,6 +957,62 @@ bool WinEnvIO::DirExists(const std::string& dname) {
return false;
}
size_t WinEnvIO::GetSectorSize(const std::string& fname) {
size_t sector_size = kSectorSize;
if (PathIsRelativeA(fname.c_str())) {
return sector_size;
}
// obtain device handle
char devicename[7] = "\\\\.\\";
int erresult = strncat_s(devicename, sizeof(devicename), fname.c_str(), 2);
if (erresult) {
assert(false);
return sector_size;
}
HANDLE hDevice = CreateFile(devicename, 0, 0,
nullptr, OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL, nullptr);
if (hDevice == INVALID_HANDLE_VALUE) {
return sector_size;
}
STORAGE_PROPERTY_QUERY spropertyquery;
spropertyquery.PropertyId = StorageAccessAlignmentProperty;
spropertyquery.QueryType = PropertyStandardQuery;
BYTE output_buffer[sizeof(STORAGE_ACCESS_ALIGNMENT_DESCRIPTOR)];
DWORD output_bytes = 0;
BOOL ret = DeviceIoControl(hDevice, IOCTL_STORAGE_QUERY_PROPERTY,
&spropertyquery, sizeof(spropertyquery), output_buffer,
sizeof(STORAGE_ACCESS_ALIGNMENT_DESCRIPTOR), &output_bytes, nullptr);
if (ret) {
sector_size = ((STORAGE_ACCESS_ALIGNMENT_DESCRIPTOR *)output_buffer)->BytesPerLogicalSector;
} else {
// many devices do not support StorageProcessAlignmentProperty. Any failure here and we
// fall back to logical alignment
DISK_GEOMETRY_EX geometry = { 0 };
ret = DeviceIoControl(hDevice, IOCTL_DISK_GET_DRIVE_GEOMETRY,
nullptr, 0, &geometry, sizeof(geometry), nullptr, nullptr);
if (ret) {
sector_size = geometry.Geometry.BytesPerSector;
}
}
if (hDevice != INVALID_HANDLE_VALUE) {
CloseHandle(hDevice);
}
return sector_size;
}
////////////////////////////////////////////////////////////////////////
// WinEnvThreads
......@@ -1014,6 +1219,11 @@ Status WinEnv::LinkFile(const std::string& src,
return winenv_io_.LinkFile(src, target);
}
Status WinEnv::AreFilesSame(const std::string& first,
const std::string& second, bool* res) {
return winenv_io_.AreFilesSame(first, second, res);
}
Status WinEnv::LockFile(const std::string& lockFname,
FileLock** lock) {
return winenv_io_.LockFile(lockFname, lock);
......
......@@ -138,6 +138,9 @@ public:
virtual Status LinkFile(const std::string& src,
const std::string& target);
virtual Status AreFilesSame(const std::string& first,
const std::string& second, bool* res);
virtual Status LockFile(const std::string& lockFname,
FileLock** lock);
......@@ -171,6 +174,8 @@ public:
uint64_t GetPerfCounterFrequency() const { return perf_counter_frequency_; }
static size_t GetSectorSize(const std::string& fname);
private:
// Returns true iff the named directory exists and is a directory.
virtual bool DirExists(const std::string& dname);
......@@ -248,6 +253,9 @@ public:
Status LinkFile(const std::string& src,
const std::string& target) override;
Status AreFilesSame(const std::string& first,
const std::string& second, bool* res) override;
Status LockFile(const std::string& lockFname,
FileLock** lock) override;
......
......@@ -157,9 +157,11 @@ size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size) {
if (max_size < kMaxVarint64Length * 3) {
return 0;
}
// This function has to be re-worked for cases when
// ReFS file system introduced on Windows Server 2012 is used
#if (_WIN32_WINNT == _WIN32_WINNT_VISTA)
// MINGGW as defined by CMake file.
// yuslepukhin: I hate the guts of the above macros.
// This impl does not guarantee uniqueness everywhere
// is reasonably good
BY_HANDLE_FILE_INFORMATION FileInfo;
BOOL result = GetFileInformationByHandle(hFile, &FileInfo);
......@@ -177,6 +179,33 @@ size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size) {
assert(rid >= id);
return static_cast<size_t>(rid - id);
#else
FILE_ID_INFO FileInfo;
BOOL result = GetFileInformationByHandleEx(hFile, FileIdInfo, &FileInfo,
sizeof(FileInfo));
TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
if (!result) {
return 0;
}
static_assert(sizeof(uint64_t) == sizeof(FileInfo.VolumeSerialNumber),
"Wrong sizeof expectations");
// FileId.Identifier is an array of 16 BYTEs, we encode them as two uint64_t
static_assert(sizeof(uint64_t) * 2 == sizeof(FileInfo.FileId.Identifier),
"Wrong sizeof expectations");
char* rid = id;
rid = EncodeVarint64(rid, uint64_t(FileInfo.VolumeSerialNumber));
uint64_t* file_id = reinterpret_cast<uint64_t*>(&FileInfo.FileId.Identifier[0]);
rid = EncodeVarint64(rid, *file_id);
++file_id;
rid = EncodeVarint64(rid, *file_id);
assert(rid >= id);
return static_cast<size_t>(rid - id);
#endif
}
////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -782,8 +811,7 @@ Status WinWritableImpl::AppendImpl(const Slice& data) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to pwrite for: " + file_data_->GetName(), lastError);
}
else {
} else {
written = ret;
}
......@@ -828,8 +856,7 @@ Status WinWritableImpl::PositionedAppendImpl(const Slice& data, uint64_t offset)
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to pwrite for: " + file_data_->GetName(), lastError);
}
else {
} else {
assert(size_t(ret) == data.size());
// For sequential write this would be simple
// size extension by data.size()
......@@ -1019,6 +1046,9 @@ Status WinRandomRWFile::Close() {
Status WinDirectory::Fsync() { return Status::OK(); }
size_t WinDirectory::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(handle_, id, max_size);
}
//////////////////////////////////////////////////////////////////////////
/// WinFileLock
......
......@@ -421,10 +421,19 @@ class WinRandomRWFile : private WinFileData,
};
class WinDirectory : public Directory {
HANDLE handle_;
public:
WinDirectory() {}
explicit
WinDirectory(HANDLE h) noexcept :
handle_(h) {
assert(handle_ != INVALID_HANDLE_VALUE);
}
~WinDirectory() {
::CloseHandle(handle_);
}
virtual Status Fsync() override;
size_t GetUniqueId(char* id, size_t max_size) const override;
};
class WinFileLock : public FileLock {
......
......@@ -108,19 +108,20 @@ void InitOnce(OnceType* once, void (*initializer)()) {
// Private structure, exposed only by pointer
struct DIR {
intptr_t handle_;
bool firstread_;
struct __finddata64_t data_;
HANDLE handle_;
bool firstread_;
WIN32_FIND_DATA data_;
dirent entry_;
DIR() : handle_(-1), firstread_(true) {}
DIR() : handle_(INVALID_HANDLE_VALUE),
firstread_(true) {}
DIR(const DIR&) = delete;
DIR& operator=(const DIR&) = delete;
~DIR() {
if (-1 != handle_) {
_findclose(handle_);
if (INVALID_HANDLE_VALUE != handle_) {
::FindClose(handle_);
}
}
};
......@@ -136,19 +137,25 @@ DIR* opendir(const char* name) {
std::unique_ptr<DIR> dir(new DIR);
dir->handle_ = _findfirst64(pattern.c_str(), &dir->data_);
dir->handle_ = ::FindFirstFileExA(pattern.c_str(),
FindExInfoBasic, // Do not want alternative name
&dir->data_,
FindExSearchNameMatch,
NULL, // lpSearchFilter
0);
if (dir->handle_ == -1) {
if (dir->handle_ == INVALID_HANDLE_VALUE) {
return nullptr;
}
strcpy_s(dir->entry_.d_name, sizeof(dir->entry_.d_name), dir->data_.name);
strcpy_s(dir->entry_.d_name, sizeof(dir->entry_.d_name),
dir->data_.cFileName);
return dir.release();
}
struct dirent* readdir(DIR* dirp) {
if (!dirp || dirp->handle_ == -1) {
if (!dirp || dirp->handle_ == INVALID_HANDLE_VALUE) {
errno = EBADF;
return nullptr;
}
......@@ -158,13 +165,14 @@ struct dirent* readdir(DIR* dirp) {
return &dirp->entry_;
}
auto ret = _findnext64(dirp->handle_, &dirp->data_);
auto ret = ::FindNextFileA(dirp->handle_, &dirp->data_);
if (ret != 0) {
if (ret == 0) {
return nullptr;
}
strcpy_s(dirp->entry_.d_name, sizeof(dirp->entry_.d_name), dirp->data_.name);
strcpy_s(dirp->entry_.d_name, sizeof(dirp->entry_.d_name),
dirp->data_.cFileName);
return &dirp->entry_;
}
......
......@@ -36,9 +36,13 @@ WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file,
log_size_(0),
last_flush_micros_(0),
env_(env),
flush_pending_(false) {}
flush_pending_(false) {
assert(file_ != NULL);
assert(file_ != INVALID_HANDLE_VALUE);
}
void WinLogger::DebugWriter(const char* str, int len) {
assert(file_ != INVALID_HANDLE_VALUE);
DWORD bytesWritten = 0;
BOOL ret = WriteFile(file_, str, len, &bytesWritten, NULL);
if (ret == FALSE) {
......@@ -47,11 +51,38 @@ void WinLogger::DebugWriter(const char* str, int len) {
}
}
WinLogger::~WinLogger() { close(); }
WinLogger::~WinLogger() {
CloseInternal();
}
Status WinLogger::CloseImpl() {
return CloseInternal();
}
void WinLogger::close() { CloseHandle(file_); }
Status WinLogger::CloseInternal() {
Status s;
if (INVALID_HANDLE_VALUE != file_) {
BOOL ret = FlushFileBuffers(file_);
if (ret == 0) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("Failed to flush LOG on Close() ",
lastError);
}
ret = CloseHandle(file_);
// On error the return value is zero
if (ret == 0 && s.ok()) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("Failed to flush LOG on Close() ",
lastError);
}
file_ = INVALID_HANDLE_VALUE;
closed_ = true;
}
return s;
}
void WinLogger::Flush() {
assert(file_ != INVALID_HANDLE_VALUE);
if (flush_pending_) {
flush_pending_ = false;
// With Windows API writes go to OS buffers directly so no fflush needed
......@@ -64,6 +95,7 @@ void WinLogger::Flush() {
void WinLogger::Logv(const char* format, va_list ap) {
IOSTATS_TIMER_GUARD(logger_nanos);
assert(file_ != INVALID_HANDLE_VALUE);
const uint64_t thread_id = (*gettid_)();
......
......@@ -36,8 +36,6 @@ class WinLogger : public rocksdb::Logger {
WinLogger& operator=(const WinLogger&) = delete;
void close();
void Flush() override;
using rocksdb::Logger::Logv;
......@@ -47,6 +45,10 @@ class WinLogger : public rocksdb::Logger {
void DebugWriter(const char* str, int len);
protected:
Status CloseImpl() override;
private:
HANDLE file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread
......@@ -55,6 +57,8 @@ class WinLogger : public rocksdb::Logger {
Env* env_;
bool flush_pending_;
Status CloseInternal();
const static uint64_t flush_every_seconds_ = 5;
};
......
......@@ -175,7 +175,7 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
if (txn != nullptr) {
std::hash<std::thread::id> hasher;
char name[64];
snprintf(name, 64, "txn%zu-%d", hasher(std::this_thread::get_id()),
snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%d", hasher(std::this_thread::get_id()),
txn_id_++);
assert(strlen(name) < 64 - 1);
if (!is_optimistic && !rand_->OneIn(10)) {
......
......@@ -279,7 +279,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
if (heap_.top() == erased_heap_.top()) {
heap_.pop();
}
auto erased __attribute__((__unused__)) = erased_heap_.top();
uint64_t erased __attribute__((__unused__));
erased = erased_heap_.top();
erased_heap_.pop();
// No duplicate prepare sequence numbers
assert(erased_heap_.empty() || erased_heap_.top() != erased);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册