diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 2214078f5587799ff4daea4f708e920a95e97fcf..d7e15929119214a8674dde109dd3e52cfbbedf1a 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -306,6 +306,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal") #define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted") #define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002) //"WAL size exceeds limit") +#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1003) //"WAL out of memory") // http #define TSDB_CODE_HTTP_SERVER_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x1100) //"http server is not onlin") diff --git a/src/inc/twal.h b/src/inc/twal.h index bce398d6f95518379c1cd4c0d95a4f6324aab4d7..868a1fbd780232303b42e58185ffc00730c17546 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -32,7 +32,7 @@ typedef enum { typedef struct { int8_t msgType; - int8_t sver; + int8_t sver; // sver 2 for WAL SDataRow/SMemRow compatibility int8_t reserved[2]; int32_t len; uint64_t version; diff --git a/src/util/src/terror.c b/src/util/src/terror.c index 42fc76e6c94227b6d8b5fb886239c42e19fc064d..8d2ef29c8c6b3099ec695fb9b5b3da56f668200d 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -314,6 +314,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_OUT_OF_MEMORY, "WAL out of memory") // http TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, "http server is not onlin") diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 47749987994a3af3234b0149d00f5b89c75700ff..c1b3c1ac3fd6488a2881c18a634ce993ef1d22d1 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -17,6 +17,7 @@ #define TAOS_RANDOM_FILE_FAIL_TEST #include "os.h" #include "taoserror.h" +#include "taosmsg.h" #include "tchecksum.h" #include "tfile.h" #include "twal.h" @@ -114,7 +115,7 @@ void walRemoveAllOldFiles(void *handle) { #if defined(WAL_CHECKSUM_WHOLE) static void walUpdateChecksum(SWalHead *pHead) { - pHead->sver = 1; + pHead->sver = 2; pHead->cksum = 0; pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(*pHead) + pHead->len); } @@ -122,7 +123,7 @@ static void walUpdateChecksum(SWalHead *pHead) { static int walValidateChecksum(SWalHead *pHead) { if (pHead->sver == 0) { // for compatible with wal before sver 1 return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead)); - } else if (pHead->sver == 1) { + } else if (pHead->sver == 2 || pHead->sver == 1) { uint32_t cksum = pHead->cksum; pHead->cksum = 0; return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum); @@ -281,7 +282,7 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, return TSDB_CODE_SUCCESS; } - if (pHead->sver == 1) { + if (pHead->sver == 2 || pHead->sver == 1) { if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) { wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); return TSDB_CODE_WAL_FILE_CORRUPTED; @@ -306,7 +307,85 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, return TSDB_CODE_WAL_FILE_CORRUPTED; } +// Add SMemRowType ahead of SDataRow +static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) { + memcpy(pDest, pSrc, sizeof(SSubmitBlk)); + int nRows = htons(pSrc->numOfRows); + if (nRows <= 0) { + return; + } + char *pDestData = pDest->data; + char *pSrcData = pSrc->data; + for (int i = 0; i < nRows; ++i) { + memRowSetType(pDestData, SMEM_ROW_DATA); + memcpy(memRowDataBody(pDestData), pSrcData, dataRowLen(pSrcData)); + pDestData = POINTER_SHIFT(pDestData, memRowTLen(pDestData)); + pSrcData = POINTER_SHIFT(pSrcData, dataRowLen(pSrcData)); + ++(*lenExpand); + } + int32_t dataLen = htonl(pDest->dataLen); + pDest->dataLen = htonl(dataLen + nRows * sizeof(uint8_t)); +} + +static bool walIsSDataRow(void *pBlkData, int nRows, int32_t dataLen) { + int32_t len = 0; + for (int i = 0; i < nRows; ++i) { + len += dataRowLen(pBlkData); + if (len > dataLen) { + return false; + } + pBlkData = POINTER_SHIFT(pBlkData, dataRowLen(pBlkData)); + } + if (len != dataLen) { + return false; + } + return true; +} +// for WAL SMemRow/SDataRow compatibility +static int walSMemRowCheck(SWalHead *pHead) { + if ((pHead->sver < 2) && (pHead->msgType == TSDB_MSG_TYPE_SUBMIT)) { + SSubmitMsg *pMsg = (SSubmitMsg *)pHead->cont; + int32_t numOfBlocks = htonl(pMsg->numOfBlocks); + if (numOfBlocks <= 0) { + return 0; + } + + int32_t nTotalRows = 0; + SSubmitBlk *pBlk = (SSubmitBlk *)pMsg->blocks; + for (int32_t i = 0; i < numOfBlocks; ++i) { + int32_t dataLen = htonl(pBlk->dataLen); + int32_t nRows = htons(pBlk->numOfRows); + nTotalRows += nRows; + if (!walIsSDataRow(pBlk->data, nRows, dataLen)) { + return 0; + } + pBlk = (SSubmitBlk *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + dataLen); + } + SWalHead *pWalHead = (SWalHead *)calloc(sizeof(SWalHead) + pHead->len + nTotalRows * sizeof(uint8_t), 1); + if (pWalHead == NULL) { + return TSDB_CODE_WAL_OUT_OF_MEMORY; + } + // len should be updated + memcpy(pWalHead, pHead, sizeof(SWalHead) + sizeof(SSubmitMsg)); + + SSubmitMsg *pDestMsg = (SSubmitMsg *)pWalHead->cont; + SSubmitBlk *pDestBlks = (SSubmitBlk *)pDestMsg->blocks; + SSubmitBlk *pSrcBlks = (SSubmitBlk *)pMsg->blocks; + int32_t lenExpand = 0; + for (int32_t i = 0; i < numOfBlocks; ++i) { + expandSubmitBlk(pDestBlks, pSrcBlks, &lenExpand); + } + if (lenExpand > 0) { + pDestMsg->length = htonl(htonl(pDestMsg->length) + lenExpand); + pWalHead->len = pWalHead->len + lenExpand; + } + + memcpy(pHead, pWalHead, sizeof(SWalHead) + pWalHead->len); + tfree(pWalHead); + } + return 0; +} static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) { int32_t size = WAL_MAX_SIZE; @@ -346,7 +425,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch } #if defined(WAL_CHECKSUM_WHOLE) - if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 1) { + if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) { wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, pHead->version, pHead->len, offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); @@ -379,7 +458,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch continue; } - if (pHead->sver == 1 && !walValidateChecksum(pHead)) { + if ((pHead->sver == 2 || pHead->sver == 1) && !walValidateChecksum(pHead)) { wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, pHead->version, pHead->len, offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); @@ -431,7 +510,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch pWal->version = pHead->version; - //wInfo("writeFp: %ld", offset); + // wInfo("writeFp: %ld", offset); + + if (0 != walSMemRowCheck(pHead)) { + wError("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64, + pWal->vgId, fileId, pHead->version, pWal->version, pHead->len, offset); + } + (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); }