From 044b02bb60d5e3853ecf36f4c25f13eee61c4dda Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 23 Nov 2021 15:46:23 +0800 Subject: [PATCH] add wal implementation --- include/libs/wal/wal.h | 48 ++++++- include/os/os.h | 1 + include/util/tlog.h | 2 + include/util/tnote.h | 2 +- include/util/ulog.h | 15 ++- source/libs/wal/CMakeLists.txt | 1 + source/libs/wal/inc/walInt.h | 4 +- source/libs/wal/src/wal.c | 45 +++---- {src => source/libs}/wal/src/walMgmt.c | 25 ++-- {src => source/libs}/wal/src/walUtil.c | 2 +- {src => source/libs}/wal/src/walWrite.c | 170 ++++-------------------- source/libs/wal/test/walTests.cpp | 137 +++++++++++++++++++ source/util/src/tconfig.c | 6 +- source/util/src/tlog.c | 2 + src/wal/inc/walInt.h | 68 ---------- src/wal/test/waltest.c | 137 ------------------- 16 files changed, 260 insertions(+), 405 deletions(-) rename {src => source/libs}/wal/src/walMgmt.c (91%) rename {src => source/libs}/wal/src/walUtil.c (99%) rename {src => source/libs}/wal/src/walWrite.c (73%) delete mode 100644 src/wal/inc/walInt.h delete mode 100644 src/wal/test/waltest.c diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index ba37e6880b..94346d705e 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -16,11 +16,21 @@ #define _TD_WAL_H_ #include "os.h" - +#include "tdef.h" +#include "tlog.h" #ifdef __cplusplus extern "C" { #endif +extern int32_t wDebugFlag; + +#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }} +#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }} +#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }} +#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }} +#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} +#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} + typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, @@ -28,9 +38,8 @@ typedef enum { } EWalType; typedef struct { - int8_t msgType; - int8_t sver; // sver 2 for WAL SDataRow/SMemRow compatibility - int8_t reserved[2]; + int8_t sver; + int8_t reserved[3]; int32_t len; int64_t version; uint32_t signature; @@ -44,11 +53,33 @@ typedef struct { EWalType walLevel; // wal level } SWalCfg; +#define WAL_PREFIX "wal" +#define WAL_PREFIX_LEN 3 +#define WAL_REFRESH_MS 1000 +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) +#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (WAL_PATH_LEN + 32) +#define WAL_FILE_NUM 1 // 3 + typedef struct SWal { - int8_t unused; + int64_t version; + int64_t fileId; + int64_t rId; + int64_t tfd; + int32_t vgId; + int32_t keep; + int32_t level; + int32_t fsyncPeriod; + int32_t fsyncSeq; + int8_t stop; + int8_t reseved[3]; + char path[WAL_PATH_LEN]; + char name[WAL_FILE_LEN]; + pthread_mutex_t mutex; } SWal; // WAL HANDLE -typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); +typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, void *pMsg); // module initialization int32_t walInit(); @@ -82,6 +113,11 @@ int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); // int32_t walDataCorrupted(SWal*); +//internal +int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); +int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); +int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); + #ifdef __cplusplus } #endif diff --git a/include/os/os.h b/include/os/os.h index 44ce6f81ec..53a6cef96a 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -46,6 +46,7 @@ extern "C" { #include #include #include +#include #include #include "osAtomic.h" diff --git a/include/util/tlog.h b/include/util/tlog.h index 5c6e59b103..2ee60e4324 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -16,6 +16,8 @@ #ifndef _TD_UTIL_LOG_H #define _TD_UTIL_LOG_H +#include "os.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/include/util/tnote.h b/include/util/tnote.h index e4f28d8cff..f17857a32b 100644 --- a/include/util/tnote.h +++ b/include/util/tnote.h @@ -53,7 +53,7 @@ void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len); } #define nInfo(buffer, len) \ - if (tscEmbedded == 1) { \ + if (tscEmbeddedInUtil == 1) { \ taosNotePrintBuffer(&tsInfoNote, buffer, len); \ } diff --git a/include/util/ulog.h b/include/util/ulog.h index ba59f4eb79..89d9f89476 100644 --- a/include/util/ulog.h +++ b/include/util/ulog.h @@ -20,20 +20,21 @@ extern "C" { #endif +#include "os.h" #include "tlog.h" extern int32_t uDebugFlag; -extern int8_t tscEmbedded; +extern int8_t tscEmbeddedInUtil; -#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} -#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} -#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} -#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }} #define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }} #define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }} -#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); } -#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); } +#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); } +#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); } #ifdef __cplusplus } diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index 4af8bac7f9..e5697415f1 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -9,4 +9,5 @@ target_include_directories( target_link_libraries( wal PUBLIC os + PUBLIC util ) diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 3cf38a5ffc..f5f944b12b 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -16,6 +16,8 @@ #ifndef _TD_WAL_INT_H_ #define _TD_WAL_INT_H_ +#include "wal.h" + #ifdef __cplusplus extern "C" { #endif @@ -24,4 +26,4 @@ extern "C" { } #endif -#endif /*_TD_WAL_INT_H_*/ \ No newline at end of file +#endif /*_TD_WAL_INT_H_*/ diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/wal.c index c107a94f3f..05d81e0867 100644 --- a/source/libs/wal/src/wal.c +++ b/source/libs/wal/src/wal.c @@ -15,40 +15,35 @@ #include "wal.h" -int32_t walInit() { return 0; } - -void walCleanUp() {} - -SWal *walOpen(char *path, SWalCfg *pCfg) { - SWal* pWal = malloc(sizeof(SWal)); - if(pWal == NULL) { - return NULL; - } - return pWal; +int32_t walCommit(SWal *pWal, int64_t ver) { + return 0; } -int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; } - -void walClose(SWal *pWal) { - if(pWal) free(pWal); +int32_t walRollback(SWal *pWal, int64_t ver) { + return 0; } -void walFsync(SWal *pWal, bool force) {} - -int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) { +int32_t walPrune(SWal *pWal, int64_t ver) { return 0; } -int32_t walCommit(SWal *pWal, int64_t ver) { return 0; } -int32_t walRollback(SWal *pWal, int64_t ver) { return 0; } +int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { + return 0; +} -int32_t walPrune(SWal *pWal, int64_t ver) { return 0; } +int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) { + return 0; +} +int64_t walGetFirstVer(SWal *pWal) { + return 0; +} -int32_t walRead(SWal *, SWalHead **, int64_t ver); -int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); +int64_t walGetSnapshotVer(SWal *pWal) { + return 0; +} -int64_t walGetFirstVer(SWal *); -int64_t walGetSnapshotVer(SWal *); -int64_t walGetLastVer(SWal *); +int64_t walGetLastVer(SWal *pWal) { + return 0; +} diff --git a/src/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c similarity index 91% rename from src/wal/src/walMgmt.c rename to source/libs/wal/src/walMgmt.c index dbff08d730..2bc12b374c 100644 --- a/src/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -18,7 +18,6 @@ #include "taoserror.h" #include "tref.h" #include "tfile.h" -#include "twal.h" #include "walInt.h" typedef struct { @@ -62,8 +61,8 @@ void walCleanUp() { wInfo("wal module is cleaned up"); } -void *walOpen(char *path, SWalCfg *pCfg) { - SWal *pWal = tcalloc(1, sizeof(SWal)); +SWal *walOpen(char *path, SWalCfg *pCfg) { + SWal *pWal = malloc(sizeof(SWal)); if (pWal == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; @@ -73,7 +72,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { pWal->tfd = -1; pWal->fileId = -1; pWal->level = pCfg->walLevel; - pWal->keep = pCfg->keep; + /*pWal->keep = pCfg->keep;*/ pWal->fsyncPeriod = pCfg->fsyncPeriod; tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); @@ -86,8 +85,8 @@ void *walOpen(char *path, SWalCfg *pCfg) { return NULL; } - pWal->rid = taosAddRef(tsWal.refId, pWal); - if (pWal->rid < 0) { + pWal->rId = taosAddRef(tsWal.refId, pWal); + if (pWal->rId < 0) { walFreeObj(pWal); return NULL; } @@ -97,9 +96,8 @@ void *walOpen(char *path, SWalCfg *pCfg) { return pWal; } -int32_t walAlter(void *handle, SWalCfg *pCfg) { - if (handle == NULL) return TSDB_CODE_WAL_APP_ERROR; - SWal *pWal = handle; +int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { + if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR; if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level, @@ -128,14 +126,13 @@ void walStop(void *handle) { wDebug("vgId:%d, stop write wal", pWal->vgId); } -void walClose(void *handle) { - if (handle == NULL) return; +void walClose(SWal *pWal) { + if (pWal == NULL) return; - SWal *pWal = handle; pthread_mutex_lock(&pWal->mutex); tfClose(pWal->tfd); pthread_mutex_unlock(&pWal->mutex); - taosRemoveRef(tsWal.refId, pWal->rid); + taosRemoveRef(tsWal.refId, pWal->rId); } static int32_t walInitObj(SWal *pWal) { @@ -186,7 +183,7 @@ static void walFsyncAll() { wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); } } - pWal = taosIterateRef(tsWal.refId, pWal->rid); + pWal = taosIterateRef(tsWal.refId, pWal->rId); } } diff --git a/src/wal/src/walUtil.c b/source/libs/wal/src/walUtil.c similarity index 99% rename from src/wal/src/walUtil.c rename to source/libs/wal/src/walUtil.c index e4d9a555b3..c88cc918fe 100644 --- a/src/wal/src/walUtil.c +++ b/source/libs/wal/src/walUtil.c @@ -115,4 +115,4 @@ int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) { wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId); return 0; -} \ No newline at end of file +} diff --git a/src/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c similarity index 73% rename from src/wal/src/walWrite.c rename to source/libs/wal/src/walWrite.c index cae4291eb8..023b1c4a48 100644 --- a/src/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -14,13 +14,11 @@ */ #define _DEFAULT_SOURCE -#define TAOS_RANDOM_FILE_FAIL_TEST + #include "os.h" #include "taoserror.h" -#include "taosmsg.h" #include "tchecksum.h" #include "tfile.h" -#include "twal.h" #include "walInt.h" static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId); @@ -43,12 +41,12 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->name); } - if (pWal->keep == TAOS_WAL_KEEP) { - pWal->fileId = 0; - } else { - if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0; - pWal->fileId++; - } + /*if (pWal->keep == TAOS_WAL_KEEP) {*/ + /*pWal->fileId = 0;*/ + /*} else {*/ + /*if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;*/ + /*pWal->fileId++;*/ + /*}*/ snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->tfd = tfOpenCreateWrite(pWal->name); @@ -68,7 +66,7 @@ int32_t walRenew(void *handle) { void walRemoveOneOldFile(void *handle) { SWal *pWal = handle; if (pWal == NULL) return; - if (pWal->keep == TAOS_WAL_KEEP) return; + /*if (pWal->keep == TAOS_WAL_KEEP) return;*/ if (!tfValid(pWal->tfd)) return; pthread_mutex_lock(&pWal->mutex); @@ -117,7 +115,7 @@ void walRemoveAllOldFiles(void *handle) { static void walUpdateChecksum(SWalHead *pHead) { pHead->sver = 2; pHead->cksum = 0; - pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(*pHead) + pHead->len); + pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(SWalHead) + pHead->len); } static int walValidateChecksum(SWalHead *pHead) { @@ -134,10 +132,14 @@ static int walValidateChecksum(SWalHead *pHead) { #endif -int32_t walWrite(void *handle, SWalHead *pHead) { - if (handle == NULL) return -1; +int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) { + if (pWal == NULL) return -1; - SWal * pWal = handle; + SWalHead *pHead = malloc(sizeof(SWalHead) + bodyLen); + if(pHead == NULL) { + return -1; + } + pHead->version = index; int32_t code = 0; // no wal @@ -146,6 +148,9 @@ int32_t walWrite(void *handle, SWalHead *pHead) { if (pHead->version <= pWal->version) return 0; pHead->signature = WAL_SIGNATURE; + pHead->len = bodyLen; + memcpy(pHead->cont, body, bodyLen); + #if defined(WAL_CHECKSUM_WHOLE) walUpdateChecksum(pHead); #else @@ -173,8 +178,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) { return code; } -void walFsync(void *handle, bool forceFsync) { - SWal *pWal = handle; +void walFsync(SWal *pWal, bool forceFsync) { if (pWal == NULL || !tfValid(pWal->tfd)) return; if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { @@ -211,7 +215,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { count++; } - if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS; + /*if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS;*/ if (count == 0) { wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId); @@ -307,119 +311,10 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, return TSDB_CODE_WAL_FILE_CORRUPTED; } -// Add SMemRowType ahead of SDataRow -static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) { - // copy the header firstly - memcpy(pDest, pSrc, sizeof(SSubmitBlk)); - - int32_t nRows = htons(pDest->numOfRows); - int32_t dataLen = htonl(pDest->dataLen); - - if ((nRows <= 0) || (dataLen <= 0)) { - return; - } - - char *pDestData = pDest->data; - char *pSrcData = pSrc->data; - for (int32_t i = 0; i < nRows; ++i) { - memRowSetType(pDestData, SMEM_ROW_DATA); - memcpy(memRowDataBody(pDestData), pSrcData, dataRowLen(pSrcData)); - pDestData = POINTER_SHIFT(pDestData, memRowTLen(pDestData)); - pSrcData = POINTER_SHIFT(pSrcData, dataRowLen(pSrcData)); - ++(*lenExpand); - } - pDest->dataLen = htonl(dataLen + nRows * sizeof(uint8_t)); -} - -// Check SDataRow by comparing the SDataRow len and SSubmitBlk dataLen -static bool walIsSDataRow(void *pBlkData, int nRows, int32_t dataLen) { - if ((nRows <= 0) || (dataLen <= 0)) { - return true; - } - int32_t len = 0, kvLen = 0; - for (int i = 0; i < nRows; ++i) { - len += dataRowLen(pBlkData); - if (len > dataLen) { - return false; - } - - /** - * For SDataRow between version [2.1.5.0 and 2.1.6.X], it would never conflict. - * For SKVRow between version [2.1.5.0 and 2.1.6.X], it may conflict in below scenario - * - with 1st type byte 0x01 and sversion 0x0101(257), thus do further check - */ - if (dataRowLen(pBlkData) == 257) { - SMemRow memRow = pBlkData; - SKVRow kvRow = memRowKvBody(memRow); - int nCols = kvRowNCols(kvRow); - uint16_t calcTsOffset = (uint16_t)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nCols); - uint16_t realTsOffset = (kvRowColIdx(kvRow))->offset; - if (calcTsOffset == realTsOffset) { - kvLen += memRowKvTLen(memRow); - } - } - pBlkData = POINTER_SHIFT(pBlkData, dataRowLen(pBlkData)); - } - if (len != dataLen) { - return false; - } - if (kvLen == dataLen) { - return false; - } - return true; -} -// for WAL SMemRow/SDataRow compatibility -static int walSMemRowCheck(SWalHead *pHead) { - if ((pHead->sver < 2) && (pHead->msgType == TSDB_MSG_TYPE_SUBMIT)) { - SSubmitMsg *pMsg = (SSubmitMsg *)pHead->cont; - int32_t numOfBlocks = htonl(pMsg->numOfBlocks); - if (numOfBlocks <= 0) { - return 0; - } - - int32_t nTotalRows = 0; - SSubmitBlk *pBlk = (SSubmitBlk *)pMsg->blocks; - for (int32_t i = 0; i < numOfBlocks; ++i) { - int32_t dataLen = htonl(pBlk->dataLen); - int32_t nRows = htons(pBlk->numOfRows); - nTotalRows += nRows; - if (!walIsSDataRow(pBlk->data, nRows, dataLen)) { - return 0; - } - pBlk = (SSubmitBlk *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + dataLen); - } - ASSERT(nTotalRows >= 0); - SWalHead *pWalHead = (SWalHead *)calloc(sizeof(SWalHead) + pHead->len + nTotalRows * sizeof(uint8_t), 1); - if (pWalHead == NULL) { - return -1; - } - - memcpy(pWalHead, pHead, sizeof(SWalHead) + sizeof(SSubmitMsg)); - - SSubmitMsg *pDestMsg = (SSubmitMsg *)pWalHead->cont; - SSubmitBlk *pDestBlks = (SSubmitBlk *)pDestMsg->blocks; - SSubmitBlk *pSrcBlks = (SSubmitBlk *)pMsg->blocks; - int32_t lenExpand = 0; - for (int32_t i = 0; i < numOfBlocks; ++i) { - expandSubmitBlk(pDestBlks, pSrcBlks, &lenExpand); - pDestBlks = POINTER_SHIFT(pDestBlks, htonl(pDestBlks->dataLen) + sizeof(SSubmitBlk)); - pSrcBlks = POINTER_SHIFT(pSrcBlks, htonl(pSrcBlks->dataLen) + sizeof(SSubmitBlk)); - } - if (lenExpand > 0) { - pDestMsg->header.contLen = htonl(pDestMsg->length) + lenExpand; - pDestMsg->length = htonl(pDestMsg->header.contLen); - pWalHead->len = pWalHead->len + lenExpand; - } - - memcpy(pHead, pWalHead, sizeof(SWalHead) + pWalHead->len); - tfree(pWalHead); - } - return 0; -} static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) { int32_t size = WAL_MAX_SIZE; - void * buffer = tmalloc(size); + void * buffer = malloc(size); if (buffer == NULL) { wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); @@ -541,14 +436,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch pWal->version = pHead->version; // wInfo("writeFp: %ld", offset); - if (0 != walSMemRowCheck(pHead)) { - wError("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64, - pWal->vgId, fileId, pHead->version, pWal->version, pHead->len, offset); - tfClose(tfd); - tfree(buffer); - return TAOS_SYSTEM_ERROR(errno); - } - (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); + (*writeFp)(pVnode, pHead, NULL); } tfClose(tfd); @@ -558,9 +446,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch return code; } -uint64_t walGetVersion(twalh param) { - SWal *pWal = param; - if (pWal == 0) return 0; +uint64_t walGetVersion(SWal *pWal) { + if (pWal == NULL) return 0; return pWal->version; } @@ -570,10 +457,9 @@ uint64_t walGetVersion(twalh param) { // Some new wal record cannot be written to the wal file in dnode1 for wal version not reset, then fversion and the record in wal file may inconsistent, // At this time, if dnode2 down, dnode1 switched to master. After dnode2 start and restore data from dnode1, data loss will occur -void walResetVersion(twalh param, uint64_t newVer) { - SWal *pWal = param; - if (pWal == 0) return; +void walResetVersion(SWal *pWal, uint64_t newVer) { + if (pWal == NULL) return; wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer); pWal->version = newVer; -} \ No newline at end of file +} diff --git a/source/libs/wal/test/walTests.cpp b/source/libs/wal/test/walTests.cpp index e69de29bb2..505728fbe4 100644 --- a/source/libs/wal/test/walTests.cpp +++ b/source/libs/wal/test/walTests.cpp @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "tutil.h" +#include "tglobal.h" +#include "tlog.h" +#include "twal.h" +#include "tfile.h" + +int64_t ver = 0; +void *pWal = NULL; + +int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { + // do nothing + SWalHead *pHead = data; + + if (pHead->version > ver) + ver = pHead->version; + + walWrite(pWal, pHead); + + return 0; +} + +int main(int argc, char *argv[]) { + char path[128] = "/tmp/wal"; + int level = 2; + int total = 5; + int rows = 10000; + int size = 128; + int keep = 0; + + for (int i=1; iversion = ++ver; + pHead->len = size; + walWrite(pWal, pHead); + } + + printf("renew a wal, i:%d\n", i); + walRenew(pWal); + } + + printf("%d wal files are written\n", total); + + int64_t index = 0; + char name[256]; + + while (1) { + int code = walGetWalFile(pWal, name, &index); + if (code == -1) { + printf("failed to get wal file, index:%" PRId64 "\n", index); + break; + } + + printf("index:%" PRId64 " wal:%s\n", index, name); + if (code == 0) break; + } + + getchar(); + + walClose(pWal); + walCleanUp(); + tfCleanup(); + + return 0; +} diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 6b3f08a446..1ca29f798a 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -402,7 +402,7 @@ void taosPrintGlobalCfg() { for (int i = 0; i < tsGlobalConfigNum; ++i) { SGlobalCfg *cfg = tsGlobalConfig + i; - if (tscEmbedded == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue; + if (tscEmbeddedInUtil == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue; if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue; int optionLen = (int)strlen(cfg->option); @@ -487,7 +487,7 @@ void taosDumpGlobalCfg() { printf("==================================\n"); for (int i = 0; i < tsGlobalConfigNum; ++i) { SGlobalCfg *cfg = tsGlobalConfig + i; - if (tscEmbedded == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue; + if (tscEmbeddedInUtil == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue; if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue; if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW)) continue; @@ -499,7 +499,7 @@ void taosDumpGlobalCfg() { for (int i = 0; i < tsGlobalConfigNum; ++i) { SGlobalCfg *cfg = tsGlobalConfig + i; - if (tscEmbedded == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue; + if (tscEmbeddedInUtil == 0 && !(cfg->cfgType & TSDB_CFG_CTYPE_B_CLIENT)) continue; if (cfg->cfgType & TSDB_CFG_CTYPE_B_NOT_PRINT) continue; if (cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW) continue; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 2fb84656b6..e6cc3a53af 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -68,6 +68,8 @@ typedef struct { pthread_mutex_t logMutex; } SLogObj; +int8_t tscEmbeddedInUtil = 0; + int32_t tsLogKeepDays = 0; int8_t tsAsyncLog = 1; float tsTotalLogDirGB = 0; diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h deleted file mode 100644 index 890b404ce9..0000000000 --- a/src/wal/inc/walInt.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_WAL_INT_H -#define TDENGINE_WAL_INT_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "tlog.h" - -extern int32_t wDebugFlag; - -#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }} -#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }} -#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }} -#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }} -#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} -#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} - -#define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 3 -#define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) -#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) -#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) -#define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_FILE_NUM 1 // 3 - -typedef struct { - uint64_t version; - int64_t fileId; - int64_t rid; - int64_t tfd; - int32_t vgId; - int32_t keep; - int32_t level; - int32_t fsyncPeriod; - int32_t fsyncSeq; - int8_t stop; - int8_t reserved[3]; - char path[WAL_PATH_LEN]; - char name[WAL_FILE_LEN]; - pthread_mutex_t mutex; -} SWal; - -int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); -int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); -int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c deleted file mode 100644 index 505728fbe4..0000000000 --- a/src/wal/test/waltest.c +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -//#define _DEFAULT_SOURCE -#include "os.h" -#include "tutil.h" -#include "tglobal.h" -#include "tlog.h" -#include "twal.h" -#include "tfile.h" - -int64_t ver = 0; -void *pWal = NULL; - -int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { - // do nothing - SWalHead *pHead = data; - - if (pHead->version > ver) - ver = pHead->version; - - walWrite(pWal, pHead); - - return 0; -} - -int main(int argc, char *argv[]) { - char path[128] = "/tmp/wal"; - int level = 2; - int total = 5; - int rows = 10000; - int size = 128; - int keep = 0; - - for (int i=1; iversion = ++ver; - pHead->len = size; - walWrite(pWal, pHead); - } - - printf("renew a wal, i:%d\n", i); - walRenew(pWal); - } - - printf("%d wal files are written\n", total); - - int64_t index = 0; - char name[256]; - - while (1) { - int code = walGetWalFile(pWal, name, &index); - if (code == -1) { - printf("failed to get wal file, index:%" PRId64 "\n", index); - break; - } - - printf("index:%" PRId64 " wal:%s\n", index, name); - if (code == 0) break; - } - - getchar(); - - walClose(pWal); - walCleanUp(); - tfCleanup(); - - return 0; -} -- GitLab