diff --git a/.gitignore b/.gitignore index e6e327327c2a72bc2262d9b2923314c950c78cf6..b400d719cc967fd4c1270355f876bd3c39cfc2f3 100644 --- a/.gitignore +++ b/.gitignore @@ -79,3 +79,15 @@ tests/comparisonTest/opentsdb/opentsdbtest/.settings/ tests/examples/JDBC/JDBCDemo/.classpath tests/examples/JDBC/JDBCDemo/.project tests/examples/JDBC/JDBCDemo/.settings/ + +# Emacs +# -*- mode: gitignore; -*- +*~ +\#*\# +/.emacs.desktop +/.emacs.desktop.lock +*.elc +auto-save-list +tramp +.\#* +TAGS diff --git a/src/wal/CMakeLists.txt b/src/wal/CMakeLists.txt index 42a764fce20d567fc0b127825576d9462eb8f06d..a89024dab5060b1f18174f769e0d70c00ad00faf 100644 --- a/src/wal/CMakeLists.txt +++ b/src/wal/CMakeLists.txt @@ -1,6 +1,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) +ADD_DEFINITIONS(-DWAL_CHECKSUM_WHOLE) + INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index ea1eaa4feefd01ca9db764e1b27377233b6dfd2b..aeb49830299eb0dcddfbd39a7a838fdc5d45b081 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -111,6 +111,28 @@ void walRemoveAllOldFiles(void *handle) { pthread_mutex_unlock(&pWal->mutex); } +#if defined(WAL_CHECKSUM_WHOLE) + +static void walUpdateChecksum(SWalHead *pHead) { + pHead->sver = 1; + pHead->cksum = 0; + pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(*pHead) + pHead->len); +} + +static int walValidateChecksum(SWalHead *pHead) { + if (pHead->sver == 0) { // for compatible with wal before sver 1 + return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead)); + } else if (pHead->sver == 1) { + uint32_t cksum = pHead->cksum; + pHead->cksum = 0; + return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum); + } + + return 0; +} + +#endif + int32_t walWrite(void *handle, SWalHead *pHead) { if (handle == NULL) return -1; @@ -123,7 +145,13 @@ int32_t walWrite(void *handle, SWalHead *pHead) { if (pHead->version <= pWal->version) return 0; pHead->signature = WAL_SIGNATURE; +#if defined(WAL_CHECKSUM_WHOLE) + walUpdateChecksum(pHead); +#else + pHead->sver = 0; taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); +#endif + int32_t contLen = pHead->len + sizeof(SWalHead); pthread_mutex_lock(&pWal->mutex); @@ -246,16 +274,40 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, continue; } +#if defined(WAL_CHECKSUM_WHOLE) + if (pHead->sver == 0 && walValidateChecksum(pHead)) { + wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); + *offset = pos; + return TSDB_CODE_SUCCESS; + } + + if (pHead->sver == 1) { + if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) { + wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + + if (walValidateChecksum(pHead)) { + wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos); + *offset = pos; + return TSDB_CODE_SUCCESS; + } + } + +#else if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); *offset = pos; return TSDB_CODE_SUCCESS; } + +#endif } return TSDB_CODE_WAL_FILE_CORRUPTED; } + static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) { int32_t size = WAL_MAX_SIZE; void * buffer = tmalloc(size); @@ -293,6 +345,51 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch break; } +#if defined(WAL_CHECKSUM_WHOLE) + if (pHead->sver == 0 && !walValidateChecksum(pHead)) { + wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, + pHead->version, pHead->len, offset); + code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); + if (code != TSDB_CODE_SUCCESS) { + walFtruncate(pWal, tfd, offset); + break; + } + } + + if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) { + wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, + pHead->version, pHead->len, offset); + code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); + if (code != TSDB_CODE_SUCCESS) { + walFtruncate(pWal, tfd, offset); + break; + } + } + + ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len); + if (ret < 0) { + wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); + code = TAOS_SYSTEM_ERROR(errno); + break; + } + + if (ret < pHead->len) { + wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len); + offset += sizeof(SWalHead); + continue; + } + + if (pHead->sver == 1 && !walValidateChecksum(pHead)) { + wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, + pHead->version, pHead->len, offset); + code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); + if (code != TSDB_CODE_SUCCESS) { + walFtruncate(pWal, tfd, offset); + break; + } + } + +#else if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, pHead->version, pHead->len, offset); @@ -326,6 +423,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch continue; } +#endif offset = offset + sizeof(SWalHead) + pHead->len; wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,