提交 5ebdc7e3 编写于 作者: E Enrico Giordani

[Code cleanup] Code refactoring, formatting, comments, error logging.

上级 81891d71
......@@ -154,11 +154,11 @@ BOOL WriteToProcmon(wstring message)
#define IFFAILTHROW(a,m) if(!(a)) { throw system_error(GetLastError(), system_category(), m); }
#define MAX_GLOBAL_DATA 10000
struct QForkBeginInfo {
BYTE globalData[MAX_GLOBAL_DATA];
size_t globalDataSize;
unsigned __int32 dictHashSeed;
#define MAX_REDIS_DATA_SIZE 10000
struct QForkInfo {
BYTE redisData[MAX_REDIS_DATA_SIZE];
size_t redisDataSize;
uint32_t dictHashSeed;
char filename[MAX_PATH];
int *fds;
int numfds;
......@@ -222,7 +222,7 @@ struct QForkControl {
HANDLE operationFailed;
// Global data pointers to be passed to the forked process
QForkBeginInfo globalData;
QForkInfo globalData;
#ifdef USE_DLMALLOC
BYTE DLMallocGlobalState[1000];
size_t DLMallocGlobalStateSize;
......@@ -335,7 +335,9 @@ BOOL QForkChildInit(HANDLE QForkControlMemoryMapHandle, DWORD ParentProcessID) {
#endif
// Copy redis globals into fork process
SetupGlobals(g_pQForkControl->globalData.globalData, g_pQForkControl->globalData.globalDataSize, g_pQForkControl->globalData.dictHashSeed);
SetupRedisGlobals(g_pQForkControl->globalData.redisData,
g_pQForkControl->globalData.redisDataSize,
g_pQForkControl->globalData.dictHashSeed);
// Execute requested operation
if (g_pQForkControl->typeOfOperation == OperationType::otRDB) {
......@@ -570,14 +572,14 @@ BOOL QForkShutdown() {
return TRUE;
}
void CopyForkOperationData(OperationType type, LPVOID globalData, int sizeOfGlobalData, uint32_t dictHashSeed) {
void CopyForkOperationData(OperationType type, LPVOID redisData, int redisDataSize, uint32_t dictHashSeed) {
// Copy operation data
g_pQForkControl->typeOfOperation = type;
if (sizeOfGlobalData > MAX_GLOBAL_DATA) {
throw runtime_error("Global state too large.");
if (redisDataSize > MAX_REDIS_DATA_SIZE) {
throw runtime_error("Global redis data too large.");
}
memcpy(&(g_pQForkControl->globalData.globalData), globalData, sizeOfGlobalData);
g_pQForkControl->globalData.globalDataSize = sizeOfGlobalData;
memcpy(&(g_pQForkControl->globalData.redisData), redisData, redisDataSize);
g_pQForkControl->globalData.redisDataSize = redisDataSize;
g_pQForkControl->globalData.dictHashSeed = dictHashSeed;
#ifdef USE_DLMALLOC
......@@ -600,21 +602,24 @@ void CopyForkOperationData(OperationType type, LPVOID globalData, int sizeOfGlob
if (g_pQForkControl->heapBlockList[i].state == BlockState::bsMAPPED_IN_USE) {
oldProtect = 0;
VirtualProtect((byte*) g_pQForkControl->heapStart + i * cAllocationGranularity,
cAllocationGranularity,
PAGE_WRITECOPY,
&oldProtect);
cAllocationGranularity,
PAGE_WRITECOPY,
&oldProtect);
}
}
}
void CreateChildProcess(PROCESS_INFORMATION *pi, DWORD dwCreationFlags = 0) {
// Ensure events are in the correst state
IFFAILTHROW(ResetEvent(g_pQForkControl->operationComplete), "CreateChildProcess: ResetEvent() failed.");
IFFAILTHROW(ResetEvent(g_pQForkControl->operationFailed), "CreateChildProcess: ResetEvent() failed.");
IFFAILTHROW(ResetEvent(g_pQForkControl->operationComplete),
"CreateChildProcess: ResetEvent() failed.");
IFFAILTHROW(ResetEvent(g_pQForkControl->operationFailed),
"CreateChildProcess: ResetEvent() failed.");
// Launch the "forked" process
char fileName[MAX_PATH];
IFFAILTHROW(GetModuleFileNameA(NULL, fileName, MAX_PATH), "Failed to get module name.");
IFFAILTHROW(GetModuleFileNameA(NULL, fileName, MAX_PATH),
"Failed to get module name.");
STARTUPINFOA si;
memset(&si, 0, sizeof(STARTUPINFOA));
......@@ -622,15 +627,28 @@ void CreateChildProcess(PROCESS_INFORMATION *pi, DWORD dwCreationFlags = 0) {
char arguments[_MAX_PATH];
memset(arguments, 0, _MAX_PATH);
sprintf_s(arguments, _MAX_PATH, "\"%s\" --%s %llu %lu --%s \"%s\"", fileName, cQFork.c_str(), (uint64_t) g_hQForkControlFileMap, GetCurrentProcessId(), cLogfile.c_str(), getLogFilename());
IFFAILTHROW(CreateProcessA(fileName, arguments, NULL, NULL, TRUE, dwCreationFlags, NULL, NULL, &si, pi), "Problem creating slave process");
sprintf_s(arguments,
_MAX_PATH,
"\"%s\" --%s %llu %lu --%s \"%s\"",
fileName,
cQFork.c_str(),
(uint64_t) g_hQForkControlFileMap,
GetCurrentProcessId(),
cLogfile.c_str(),
getLogFilename());
IFFAILTHROW(CreateProcessA(fileName, arguments, NULL, NULL, TRUE, dwCreationFlags, NULL, NULL, &si, pi),
"Problem creating slave process");
g_hForkedProcess = pi->hProcess;
}
typedef void (*CHILD_PID_HOOK)(DWORD pid);
pid_t BeginForkOperation(OperationType type, LPVOID globalData, int sizeOfGlobalData, uint32_t dictHashSeed) {
pid_t BeginForkOperation(OperationType type,
LPVOID redisData,
int redisDataSize,
uint32_t dictHashSeed)
{
PROCESS_INFORMATION pi;
try {
pi.hProcess = INVALID_HANDLE_VALUE;
......@@ -639,10 +657,10 @@ pid_t BeginForkOperation(OperationType type, LPVOID globalData, int sizeOfGlobal
if (type == OperationType::otSocket) {
CreateChildProcess(&pi, CREATE_SUSPENDED);
BeginForkOperation_Socket_Duplicate(pi.dwProcessId);
CopyForkOperationData(type, globalData, sizeOfGlobalData, dictHashSeed);
CopyForkOperationData(type, redisData, redisDataSize, dictHashSeed);
ResumeThread(pi.hThread);
} else {
CopyForkOperationData(type, globalData, sizeOfGlobalData, dictHashSeed);
CopyForkOperationData(type, redisData, redisDataSize, dictHashSeed);
CreateChildProcess(&pi, 0);
}
......@@ -665,24 +683,22 @@ pid_t BeginForkOperation(OperationType type, LPVOID globalData, int sizeOfGlobal
return -1;
}
pid_t BeginForkOperation_Rdb(
char *filename,
LPVOID globalData,
int sizeOfGlobalData,
unsigned __int32 dictHashSeed)
pid_t BeginForkOperation_Rdb(char *filename,
LPVOID redisData,
int redisDataSize,
uint32_t dictHashSeed)
{
strcpy_s(g_pQForkControl->globalData.filename, filename);
return BeginForkOperation(otRDB, globalData, sizeOfGlobalData, dictHashSeed);
return BeginForkOperation(otRDB, redisData, redisDataSize, dictHashSeed);
}
pid_t BeginForkOperation_Aof(
int aof_pipe_write_ack_to_parent,
int aof_pipe_read_ack_from_parent,
int aof_pipe_read_data_from_parent,
char *filename,
LPVOID globalData,
int sizeOfGlobalData,
unsigned __int32 dictHashSeed)
pid_t BeginForkOperation_Aof(int aof_pipe_write_ack_to_parent,
int aof_pipe_read_ack_from_parent,
int aof_pipe_read_data_from_parent,
char *filename,
LPVOID redisData,
int redisDataSize,
uint32_t dictHashSeed)
{
HANDLE aof_pipe_write_ack_handle = (HANDLE) FDAPI_get_osfhandle(aof_pipe_write_ack_to_parent);
HANDLE aof_pipe_read_ack_handle = (HANDLE) FDAPI_get_osfhandle(aof_pipe_read_ack_from_parent);
......@@ -694,7 +710,7 @@ pid_t BeginForkOperation_Aof(
g_pQForkControl->globalData.aof_pipe_read_data_handle = (aof_pipe_read_data_handle);
strcpy_s(g_pQForkControl->globalData.filename, filename);
return BeginForkOperation(otAOF, globalData, sizeOfGlobalData, dictHashSeed);
return BeginForkOperation(otAOF, redisData, redisDataSize, dictHashSeed);
}
void BeginForkOperation_Socket_Duplicate(DWORD dwProcessId) {
......@@ -705,18 +721,19 @@ void BeginForkOperation_Socket_Duplicate(DWORD dwProcessId) {
#endif
g_pQForkControl->globalData.protocolInfo = protocolInfo;
for(int i = 0; i < g_pQForkControl->globalData.numfds; i++) {
FDAPI_WSADuplicateSocket(g_pQForkControl->globalData.fds[i], dwProcessId, &protocolInfo[i]);
FDAPI_WSADuplicateSocket(g_pQForkControl->globalData.fds[i],
dwProcessId,
&protocolInfo[i]);
}
}
pid_t BeginForkOperation_Socket(
int *fds,
int numfds,
uint64_t *clientids,
int pipe_write_fd,
LPVOID globalData,
int sizeOfGlobalData,
unsigned __int32 dictHashSeed)
pid_t BeginForkOperation_Socket(int *fds,
int numfds,
uint64_t *clientids,
int pipe_write_fd,
LPVOID redisData,
int redisDataSize,
uint32_t dictHashSeed)
{
g_pQForkControl->globalData.fds = fds;
g_pQForkControl->globalData.numfds = numfds;
......@@ -727,7 +744,7 @@ pid_t BeginForkOperation_Socket(
// The handle is already inheritable so there is no need to duplicate it
g_pQForkControl->globalData.pipe_write_handle = (pipe_write_handle);
return BeginForkOperation(otSocket, globalData, sizeOfGlobalData, dictHashSeed);
return BeginForkOperation(otSocket, redisData, redisDataSize, dictHashSeed);
}
OperationStatus GetForkOperationStatus() {
......@@ -762,7 +779,8 @@ BOOL AbortForkOperation() {
try {
if( g_hForkedProcess != 0 )
{
IFFAILTHROW(TerminateProcess(g_hForkedProcess, 1), "EndForkOperation: Killing forked process failed.");
IFFAILTHROW(TerminateProcess(g_hForkedProcess, 1),
"EndForkOperation: Killing forked process failed.");
CloseHandle(g_hForkedProcess);
g_hForkedProcess = 0;
}
......@@ -770,79 +788,62 @@ BOOL AbortForkOperation() {
return EndForkOperation(NULL);
}
catch(system_error syserr) {
redisLog(REDIS_WARNING, "AbortForkOperation(): 0x%08x - %s\n", syserr.code().value(), syserr.what());
redisLog(REDIS_WARNING, "AbortForkOperation: 0x%08x - %s\n", syserr.code().value(), syserr.what());
// If we can not properly restore fork state, then another fork operation is not possible.
exit(1);
}
catch(...) {
redisLog(REDIS_WARNING, "Some other exception caught in EndForkOperation().\n");
catch(exception ex) {
redisLog(REDIS_WARNING, "AbortForkOperation: %s\n", ex.what());
exit(1);
}
return FALSE;
}
void RejoinCOWPages(HANDLE mmHandle, byte* mmStart, size_t mmSize) {
SmartFileView<byte> copyView(
mmHandle,
FILE_MAP_WRITE,
0,
0,
mmSize,
string("RejoinCOWPages: Could not map COW back-copy view."));
SmartFileView<byte> copyView(mmHandle, FILE_MAP_WRITE, 0, 0, mmSize,
string("RejoinCOWPages: Could not map COW back-copy view."));
for (byte* mmAddress = mmStart; mmAddress < mmStart + mmSize; ) {
MEMORY_BASIC_INFORMATION memInfo;
if (!VirtualQuery(
mmAddress,
&memInfo,
sizeof(memInfo))) {
throw system_error(
GetLastError(),
system_category(),
"RejoinCOWPages: VirtualQuery failure");
}
IFFAILTHROW(VirtualQuery(mmAddress, &memInfo, sizeof(memInfo)),
"RejoinCOWPages: VirtualQuery failure");
byte* regionEnd = (byte*)memInfo.BaseAddress + memInfo.RegionSize;
if (memInfo.Protect != PAGE_WRITECOPY) {
// Copy back only the pages that have been copied on write
byte* srcEnd = min(regionEnd, mmStart + mmSize);
memcpy(copyView + (mmAddress - mmStart), mmAddress, srcEnd - mmAddress);
}
mmAddress = regionEnd;
}
// If the COWs are not discarded, then there is no way of propagating changes into subsequent fork operations.
// If the COWs are not discarded, then there is no way of propagating
// changes into subsequent fork operations.
#if FALSE
// This doesn't work. Disabling for now.
// This works when using a memory mapped file but it fails when using
// the system paging file.
if (WindowsVersion::getInstance().IsAtLeast_6_2()) {
// restores all page protections on the view and culls the COW pages.
// Restores all page protections on the view and culls the COW pages.
DWORD oldProtect;
if (FALSE == VirtualProtect(mmStart, mmSize, PAGE_READWRITE | PAGE_REVERT_TO_FILE_MAP, &oldProtect)) {
throw system_error(GetLastError(), system_category(), "RejoinCOWPages: COW cull failed");
}
IFFAILTHROW(VirtualProtect(mmStart, mmSize, PAGE_READWRITE | PAGE_REVERT_TO_FILE_MAP, &oldProtect),
"RejoinCOWPages: COW cull failed");
} else
#endif
{
// Prior to Win8 unmapping the view was the only way to discard the COW pages from the view. Unfortunately this forces
// the view to be completely flushed to disk, which is a bit inefficient.
// Prior to Win8 unmapping the view was the only way to discard the
// COW pages from the view. Unfortunately this forces the view to be
// completely flushed to disk, which is a bit inefficient.
IFFAILTHROW(UnmapViewOfFile(mmStart), "RejoinCOWPages: UnmapViewOfFile failed.");
// There is a race condition here. Something could map into the virtual address space used by the heap at the moment
// we are discarding local changes. There is nothing to do but report the problem and exit. This problem does not
// exist with the code above in Win8+ as the view is never unmapped.
LPVOID remapped =
MapViewOfFileEx(
mmHandle,
FILE_MAP_ALL_ACCESS,
0, 0,
0,
mmStart);
if (remapped == NULL) {
throw system_error(
GetLastError(),
system_category(),
"RejoinCOWPages: MapViewOfFileEx failed.");
}
// There is a possible race condition here. Something could map into
// the virtual address space used by the heap at the moment we are
// discarding local changes. There is nothing to do but report the
// problem and exit. This problem does not exist with the code above
// in Win8+ as the view is never unmapped.
IFFAILTHROW(MapViewOfFileEx(mmHandle, FILE_MAP_ALL_ACCESS, 0, 0, 0, mmStart),
"RejoinCOWPages: MapViewOfFileEx failed.");
}
}
......@@ -850,7 +851,8 @@ BOOL EndForkOperation(int * pExitCode) {
try {
if (g_hForkedProcess != 0) {
if (WaitForSingleObject(g_hForkedProcess, cDeadForkWait) == WAIT_TIMEOUT) {
IFFAILTHROW(TerminateProcess(g_hForkedProcess, 1), "EndForkOperation: Killing forked process failed.");
IFFAILTHROW(TerminateProcess(g_hForkedProcess, 1),
"EndForkOperation: Killing forked process failed.");
}
if (pExitCode != NULL) {
......@@ -861,10 +863,12 @@ BOOL EndForkOperation(int * pExitCode) {
g_hForkedProcess = 0;
}
IFFAILTHROW(ResetEvent(g_pQForkControl->operationComplete), "EndForkOperation: ResetEvent() failed.");
IFFAILTHROW(ResetEvent(g_pQForkControl->operationFailed), "EndForkOperation: ResetEvent() failed.");
IFFAILTHROW(ResetEvent(g_pQForkControl->operationComplete),
"EndForkOperation: ResetEvent() failed.");
IFFAILTHROW(ResetEvent(g_pQForkControl->operationFailed),
"EndForkOperation: ResetEvent() failed.");
// Move local changes back into memory mapped views for next fork operation
// Move the heap local changes back into memory mapped views for next fork operation
for (int i = 0; i < g_pQForkControl->numMappedBlocks; i++) {
if (g_pQForkControl->heapBlockList[i].state == BlockState::bsMAPPED_IN_USE) {
RejoinCOWPages(g_pQForkControl->heapBlockList[i].heapMap,
......@@ -873,10 +877,7 @@ BOOL EndForkOperation(int * pExitCode) {
}
}
RejoinCOWPages(
g_hQForkControlFileMap,
(byte*) g_pQForkControl,
sizeof(QForkControl));
RejoinCOWPages(g_hQForkControlFileMap, (byte*) g_pQForkControl, sizeof(QForkControl));
return TRUE;
}
......@@ -886,8 +887,8 @@ BOOL EndForkOperation(int * pExitCode) {
// If we can not properly restore fork state, then another fork operation is not possible.
exit(1);
}
catch (...) {
redisLog(REDIS_WARNING, "Some other exception caught in EndForkOperation().\n");
catch (exception ex) {
redisLog(REDIS_WARNING, "EndForkOperation: %s\n", ex.what());
exit(1);
}
return FALSE;
......@@ -906,13 +907,15 @@ HANDLE CreateBlockMap(int blockIndex) {
LPVOID addr = (byte*) g_pQForkControl->heapStart + blockIndex * cAllocationGranularity;
// Free the memory that was reserved in QForkParentInit() before mapping it
IFFAILTHROW(VirtualFree(addr, 0, MEM_RELEASE), "PhysicalMapMemory: VirtualFree failed");
IFFAILTHROW(VirtualFree(addr, 0, MEM_RELEASE),
"PhysicalMapMemory: VirtualFree failed");
LPVOID realAddr = MapViewOfFileEx(map, FILE_MAP_ALL_ACCESS, 0, 0, 0, addr);
IFFAILTHROW(realAddr, "PhysicalMapMemory: MapViewOfFileEx failed");
DWORD old;
IFFAILTHROW(VirtualProtect(realAddr, cAllocationGranularity, PAGE_READWRITE, &old), "PhysicalMapMemory: VirtualProtect failed");
IFFAILTHROW(VirtualProtect(realAddr, cAllocationGranularity, PAGE_READWRITE, &old),
"PhysicalMapMemory: VirtualProtect failed");
return map;
}
......
......@@ -52,27 +52,27 @@ typedef enum startupStatus {
// For parent process use only
pid_t BeginForkOperation_Rdb(
char* fileName,
LPVOID globalData,
int sizeOfGlobalData,
unsigned __int32 dictHashSeed);
LPVOID redisData,
int sizeOfRedisData,
uint32_t dictHashSeed);
pid_t BeginForkOperation_Aof(
int aof_pipe_write_ack_to_parent,
int aof_pipe_read_ack_from_parent,
int aof_pipe_read_data_from_parent,
char* fileName,
LPVOID globalData,
int sizeOfGlobalData,
unsigned __int32 dictHashSeed);
LPVOID redisData,
int sizeOfRedisData,
uint32_t dictHashSeed);
pid_t BeginForkOperation_Socket(
int *fds,
int numfds,
uint64_t *clientids,
int pipe_write_fd,
LPVOID globalData,
int sizeOfGlobalData,
unsigned __int32 dictHashSeed);
LPVOID redisData,
int sizeOfRedisData,
uint32_t dictHashSeed);
void BeginForkOperation_Socket_Duplicate(DWORD dwProcessId);
......
......@@ -22,10 +22,10 @@
#include "..\redis.h"
void SetupGlobals(LPVOID globalData, size_t globalDataSize, uint32_t dictHashSeed)
void SetupRedisGlobals(LPVOID redisData, size_t redisDataSize, uint32_t dictHashSeed)
{
#ifndef NO_QFORKIMPL
memcpy(&server, globalData, globalDataSize);
memcpy(&server, redisData, redisDataSize);
dictSetHashFunctionSeed(dictHashSeed);
#endif
}
......
......@@ -26,7 +26,7 @@
extern "C" {
#endif
void SetupGlobals(LPVOID globalData, size_t globalDataSize, unsigned __int32 dictHashKey);
void SetupRedisGlobals(LPVOID redisData, size_t redisDataSize, uint32_t dictHashKey);
int do_rdbSave(char* filename);
int do_aofSave(char* filename, int aof_pipe_read_ack, int aof_pipe_read_data, int aof_pipe_write_ack);
int do_socketSave(int *fds, int numfds, uint64_t *clientids, int pipe_write_fd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册