未验证 提交 ff868b51 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #5195 from taosdata/enhance/TD-2955-v2

[TD-2955]<enhance>: [wal][v2] checksum whole wal record instead of just head
...@@ -79,3 +79,15 @@ tests/comparisonTest/opentsdb/opentsdbtest/.settings/ ...@@ -79,3 +79,15 @@ tests/comparisonTest/opentsdb/opentsdbtest/.settings/
tests/examples/JDBC/JDBCDemo/.classpath tests/examples/JDBC/JDBCDemo/.classpath
tests/examples/JDBC/JDBCDemo/.project tests/examples/JDBC/JDBCDemo/.project
tests/examples/JDBC/JDBCDemo/.settings/ tests/examples/JDBC/JDBCDemo/.settings/
# Emacs
# -*- mode: gitignore; -*-
*~
\#*\#
/.emacs.desktop
/.emacs.desktop.lock
*.elc
auto-save-list
tramp
.\#*
TAGS
CMAKE_MINIMUM_REQUIRED(VERSION 2.8) CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine) PROJECT(TDengine)
ADD_DEFINITIONS(-DWAL_CHECKSUM_WHOLE)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
......
...@@ -111,6 +111,28 @@ void walRemoveAllOldFiles(void *handle) { ...@@ -111,6 +111,28 @@ void walRemoveAllOldFiles(void *handle) {
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
} }
#if defined(WAL_CHECKSUM_WHOLE)
static void walUpdateChecksum(SWalHead *pHead) {
pHead->sver = 1;
pHead->cksum = 0;
pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(*pHead) + pHead->len);
}
static int walValidateChecksum(SWalHead *pHead) {
if (pHead->sver == 0) { // for compatible with wal before sver 1
return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead));
} else if (pHead->sver == 1) {
uint32_t cksum = pHead->cksum;
pHead->cksum = 0;
return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
}
return 0;
}
#endif
int32_t walWrite(void *handle, SWalHead *pHead) { int32_t walWrite(void *handle, SWalHead *pHead) {
if (handle == NULL) return -1; if (handle == NULL) return -1;
...@@ -123,7 +145,13 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -123,7 +145,13 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
if (pHead->version <= pWal->version) return 0; if (pHead->version <= pWal->version) return 0;
pHead->signature = WAL_SIGNATURE; pHead->signature = WAL_SIGNATURE;
#if defined(WAL_CHECKSUM_WHOLE)
walUpdateChecksum(pHead);
#else
pHead->sver = 0;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
#endif
int32_t contLen = pHead->len + sizeof(SWalHead); int32_t contLen = pHead->len + sizeof(SWalHead);
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
...@@ -246,16 +274,40 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, ...@@ -246,16 +274,40 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
continue; continue;
} }
#if defined(WAL_CHECKSUM_WHOLE)
if (pHead->sver == 0 && walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
if (pHead->sver == 1) {
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
}
#else
if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos; *offset = pos;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
} }
return TSDB_CODE_WAL_FILE_CORRUPTED; return TSDB_CODE_WAL_FILE_CORRUPTED;
} }
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) { static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
int32_t size = WAL_MAX_SIZE; int32_t size = WAL_MAX_SIZE;
void * buffer = tmalloc(size); void * buffer = tmalloc(size);
...@@ -293,6 +345,51 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -293,6 +345,51 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
break; break;
} }
#if defined(WAL_CHECKSUM_WHOLE)
if (pHead->sver == 0 && !walValidateChecksum(pHead)) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
offset += sizeof(SWalHead);
continue;
}
if (pHead->sver == 1 && !walValidateChecksum(pHead)) {
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
#else
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset); pHead->version, pHead->len, offset);
...@@ -326,6 +423,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -326,6 +423,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
continue; continue;
} }
#endif
offset = offset + sizeof(SWalHead) + pHead->len; offset = offset + sizeof(SWalHead) + pHead->len;
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId, wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册