提交 3948b97e 编写于 作者: C Cary Xu

[TD-6044]<hotfix>: WAL compatibility since v2.1.5.0

上级 39280a5e
......@@ -306,6 +306,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal")
#define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted")
#define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002) //"WAL size exceeds limit")
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1003) //"WAL out of memory")
// http
#define TSDB_CODE_HTTP_SERVER_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x1100) //"http server is not onlin")
......
......@@ -32,7 +32,7 @@ typedef enum {
typedef struct {
int8_t msgType;
int8_t sver;
int8_t sver; // sver 2 for WAL SDataRow/SMemRow compatibility
int8_t reserved[2];
int32_t len;
uint64_t version;
......
......@@ -314,6 +314,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_OUT_OF_MEMORY, "WAL out of memory")
// http
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, "http server is not onlin")
......
......@@ -17,6 +17,7 @@
#define TAOS_RANDOM_FILE_FAIL_TEST
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tchecksum.h"
#include "tfile.h"
#include "twal.h"
......@@ -114,7 +115,7 @@ void walRemoveAllOldFiles(void *handle) {
#if defined(WAL_CHECKSUM_WHOLE)
static void walUpdateChecksum(SWalHead *pHead) {
pHead->sver = 1;
pHead->sver = 2;
pHead->cksum = 0;
pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(*pHead) + pHead->len);
}
......@@ -122,7 +123,7 @@ static void walUpdateChecksum(SWalHead *pHead) {
static int walValidateChecksum(SWalHead *pHead) {
if (pHead->sver == 0) { // for compatible with wal before sver 1
return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead));
} else if (pHead->sver == 1) {
} else if (pHead->sver == 2 || pHead->sver == 1) {
uint32_t cksum = pHead->cksum;
pHead->cksum = 0;
return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
......@@ -281,7 +282,7 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
return TSDB_CODE_SUCCESS;
}
if (pHead->sver == 1) {
if (pHead->sver == 2 || pHead->sver == 1) {
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
......@@ -306,7 +307,85 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
// Add SMemRowType ahead of SDataRow
static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) {
memcpy(pDest, pSrc, sizeof(SSubmitBlk));
int nRows = htons(pSrc->numOfRows);
if (nRows <= 0) {
return;
}
char *pDestData = pDest->data;
char *pSrcData = pSrc->data;
for (int i = 0; i < nRows; ++i) {
memRowSetType(pDestData, SMEM_ROW_DATA);
memcpy(memRowDataBody(pDestData), pSrcData, dataRowLen(pSrcData));
pDestData = POINTER_SHIFT(pDestData, memRowTLen(pDestData));
pSrcData = POINTER_SHIFT(pSrcData, dataRowLen(pSrcData));
++(*lenExpand);
}
int32_t dataLen = htonl(pDest->dataLen);
pDest->dataLen = htonl(dataLen + nRows * sizeof(uint8_t));
}
static bool walIsSDataRow(void *pBlkData, int nRows, int32_t dataLen) {
int32_t len = 0;
for (int i = 0; i < nRows; ++i) {
len += dataRowLen(pBlkData);
if (len > dataLen) {
return false;
}
pBlkData = POINTER_SHIFT(pBlkData, dataRowLen(pBlkData));
}
if (len != dataLen) {
return false;
}
return true;
}
// for WAL SMemRow/SDataRow compatibility
static int walSMemRowCheck(SWalHead *pHead) {
if ((pHead->sver < 2) && (pHead->msgType == TSDB_MSG_TYPE_SUBMIT)) {
SSubmitMsg *pMsg = (SSubmitMsg *)pHead->cont;
int32_t numOfBlocks = htonl(pMsg->numOfBlocks);
if (numOfBlocks <= 0) {
return 0;
}
int32_t nTotalRows = 0;
SSubmitBlk *pBlk = (SSubmitBlk *)pMsg->blocks;
for (int32_t i = 0; i < numOfBlocks; ++i) {
int32_t dataLen = htonl(pBlk->dataLen);
int32_t nRows = htons(pBlk->numOfRows);
nTotalRows += nRows;
if (!walIsSDataRow(pBlk->data, nRows, dataLen)) {
return 0;
}
pBlk = (SSubmitBlk *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + dataLen);
}
SWalHead *pWalHead = (SWalHead *)calloc(sizeof(SWalHead) + pHead->len + nTotalRows * sizeof(uint8_t), 1);
if (pWalHead == NULL) {
return TSDB_CODE_WAL_OUT_OF_MEMORY;
}
// len should be updated
memcpy(pWalHead, pHead, sizeof(SWalHead) + sizeof(SSubmitMsg));
SSubmitMsg *pDestMsg = (SSubmitMsg *)pWalHead->cont;
SSubmitBlk *pDestBlks = (SSubmitBlk *)pDestMsg->blocks;
SSubmitBlk *pSrcBlks = (SSubmitBlk *)pMsg->blocks;
int32_t lenExpand = 0;
for (int32_t i = 0; i < numOfBlocks; ++i) {
expandSubmitBlk(pDestBlks, pSrcBlks, &lenExpand);
}
if (lenExpand > 0) {
pDestMsg->length = htonl(htonl(pDestMsg->length) + lenExpand);
pWalHead->len = pWalHead->len + lenExpand;
}
memcpy(pHead, pWalHead, sizeof(SWalHead) + pWalHead->len);
tfree(pWalHead);
}
return 0;
}
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
int32_t size = WAL_MAX_SIZE;
......@@ -346,7 +425,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
}
#if defined(WAL_CHECKSUM_WHOLE)
if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 1) {
if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
......@@ -379,7 +458,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
continue;
}
if (pHead->sver == 1 && !walValidateChecksum(pHead)) {
if ((pHead->sver == 2 || pHead->sver == 1) && !walValidateChecksum(pHead)) {
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
......@@ -431,7 +510,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
pWal->version = pHead->version;
//wInfo("writeFp: %ld", offset);
// wInfo("writeFp: %ld", offset);
if (0 != walSMemRowCheck(pHead)) {
wError("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
pWal->vgId, fileId, pHead->version, pWal->version, pHead->len, offset);
}
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册