From e8cdf132fbeedc2e89ec761ee517a2aba230959f Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 6 Jul 2023 11:32:20 +0800 Subject: [PATCH] chore: rsma code refactor --- source/dnode/vnode/CMakeLists.txt | 1 - source/dnode/vnode/src/inc/sma.h | 25 +- source/dnode/vnode/src/inc/vnodeInt.h | 3 +- source/dnode/vnode/src/sma/smaCommit.c | 30 -- source/dnode/vnode/src/sma/smaEnv.c | 5 +- source/dnode/vnode/src/sma/smaFS.c | 643 ----------------------- source/dnode/vnode/src/sma/smaRollup.c | 278 +--------- source/dnode/vnode/src/sma/smaSnapshot.c | 293 +---------- source/dnode/vnode/src/sma/smaUtil.c | 65 +-- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +- 10 files changed, 14 insertions(+), 1335 deletions(-) delete mode 100644 source/dnode/vnode/src/sma/smaFS.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index a4dad8f96a..a3ccc720d9 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -32,7 +32,6 @@ target_sources( # sma "src/sma/smaEnv.c" "src/sma/smaUtil.c" - "src/sma/smaFS.c" "src/sma/smaOpen.c" "src/sma/smaCommit.c" "src/sma/smaRollup.c" diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index a5d9c0a91b..4e87589e60 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -159,11 +159,6 @@ struct SRSmaInfo { void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t STaosQueue *queue; // buffer queue of SubmitReq STaosQall *qall; // buffer qall of SubmitReq -#if 0 - void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t - STaosQueue *iQueue; // immutable buffer queue of SubmitReq - STaosQall *iQall; // immutable buffer qall of SubmitReq -#endif }; #define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items) @@ -223,27 +218,12 @@ int32_t smaPreClose(SSma *pSma); // rsma void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); -// int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); -// void tdRSmaFSClose(SRSmaFS *fs); -// int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew); -// int32_t tdRSmaFSCommit(SSma *pSma); -// int32_t tdRSmaFSFinishCommit(SSma *pSma); -// int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS); -// int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS); -// int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS); -// void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS); -// int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize); -// int32_t tdRSmaFSRollback(SSma *pSma); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); -// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); -void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, - char *outputName); -void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName); -void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); +void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { int32_t ref = T_REF_INC(pRSmaInfo); @@ -254,8 +234,7 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); } -void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, - int8_t level, int64_t version, char *outputName); + void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6597334c5e..5c08293591 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -318,7 +318,6 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData); // SRSmaSnapWriter ======================================== int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter); int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); -// int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter); int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback); typedef struct { @@ -467,7 +466,7 @@ enum { SNAP_DATA_DEL = 3, SNAP_DATA_RSMA1 = 4, SNAP_DATA_RSMA2 = 5, - SNAP_DATA_QTASK = 6, + SNAP_DATA_QTASK = 6, // obsolete SNAP_DATA_TQ_HANDLE = 7, SNAP_DATA_TQ_OFFSET = 8, SNAP_DATA_STREAM_TASK = 9, diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index c74559bc5e..16505b7dc7 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -108,9 +108,6 @@ int32_t smaFinishCommit(SSma *pSma) { int32_t lino = 0; SVnode *pVnode = pSma->pVnode; - // code = tdRSmaFSFinishCommit(pSma); - // TSDB_CHECK_CODE(code, lino, _exit); - if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) { TSDB_CHECK_CODE(code, lino, _exit); } @@ -187,30 +184,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { if (!isCommit) goto _exit; smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - // code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); - // TSDB_CHECK_CODE(code, lino, _exit); - - // smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - -#if 0 // consuming task of qTaskInfo clone - // step 4: swap queue/qall and iQueue/iQall - // lock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); - - void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); - - while (pIter) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; - TSWAP(pInfo->iQall, pInfo->qall); - TSWAP(pInfo->iQueue, pInfo->queue); - TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]); - TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]); - pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); - } - - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); -#endif // all rsma results are written completely STsdb *pTsdb = NULL; @@ -246,9 +219,6 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { goto _exit; } - // code = tdRSmaFSCommit(pSma); - // TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbCommit(VND_RSMA1(pVnode), pInfo); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index b96d04e180..c171355cb2 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -287,10 +287,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { // step 4: destroy the rsma info and associated fetch tasks taosHashCleanup(RSMA_INFO_HASH(pStat)); - // step 5: - // tdRSmaFSClose(RSMA_FS(pStat)); - - // step 6: free pStat + // step 5: free pStat tsem_destroy(&(pStat->notEmpty)); taosMemoryFreeClear(pStat); } diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c deleted file mode 100644 index d7400245f1..0000000000 --- a/source/dnode/vnode/src/sma/smaFS.c +++ /dev/null @@ -1,643 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "sma.h" - -// ================================================================================================= -#if 0 -// static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); -static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2); - -static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) { - int32_t n = 0; - - n += tPutI8(p ? p + n : p, pFile->level); - n += tPutI64v(p ? p + n : p, pFile->size); - n += tPutI64v(p ? p + n : p, pFile->suid); - n += tPutI64v(p ? p + n : p, pFile->version); - n += tPutI64v(p ? p + n : p, pFile->mtime); - - return n; -} - -static int32_t tdRSmaFSToBinary(uint8_t *p, SRSmaFS *pFS) { - int32_t n = 0; - uint32_t size = taosArrayGetSize(pFS->aQTaskInf); - - // version - n += tPutI8(p ? p + n : p, 0); - - // SArray - n += tPutU32v(p ? p + n : p, size); - for (uint32_t i = 0; i < size; ++i) { - n += tPutQTaskF(p ? p + n : p, taosArrayGet(pFS->aQTaskInf, i)); - } - - return n; -} - -int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) { - int32_t n = 0; - - n += tGetI8(p + n, &pFile->level); - n += tGetI64v(p + n, &pFile->size); - n += tGetI64v(p + n, &pFile->suid); - n += tGetI64v(p + n, &pFile->version); - n += tGetI64v(p + n, &pFile->mtime); - - return n; -} - -static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) { - int32_t code = 0; - int32_t n = 0; - int8_t version = 0; - - // version - n += tGetI8(pData + n, &version); - - // SArray - taosArrayClear(pFS->aQTaskInf); - uint32_t size = 0; - n += tGetU32v(pData + n, &size); - for (uint32_t i = 0; i < size; ++i) { - SQTaskFile qTaskF = {0}; - - int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF); - if (nt < 0) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; - } - - n += nt; - if (taosArrayPush(pFS->aQTaskInf, &qTaskF) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - } - - if (ASSERTS(n + sizeof(TSCKSUM) == nData, "n:%d + sizeof(TSCKSUM):%d != nData:%d", n, (int32_t)sizeof(TSCKSUM), - nData)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; - } - -_exit: - return code; -} - -static int32_t tdRSmaSaveFSToFile(SRSmaFS *pFS, const char *fname) { - int32_t code = 0; - int32_t lino = 0; - TdFilePtr pFD = NULL; - - // encode to binary - int32_t size = tdRSmaFSToBinary(NULL, pFS) + sizeof(TSCKSUM); - uint8_t *pData = taosMemoryMalloc(size); - if (pData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - tdRSmaFSToBinary(pData, pFS); - taosCalcChecksumAppend(0, pData, size); - - // save to file - pFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); - if (!pFD) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - int64_t n = taosWriteFile(pFD, pData, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (taosFsyncFile(pFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - taosCloseFile(&pFD); - if (pData) taosMemoryFree(pData); - if (code) { - smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); - } - return code; -} - -static int32_t tdRSmaFSCreate(SRSmaFS *pFS, int32_t size) { - int32_t code = 0; - - pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile)); - if (pFS->aQTaskInf == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - -_exit: - return code; -} - -static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) { - SVnode *pVnode = pSma->pVnode; - if (pVnode->pTfs) { - if (current) { - snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT", tfsGetPrimaryPath(pVnode->pTfs), - TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); - } - if (current_t) { - snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs), - TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); - } - } else { -#if 0 - if (current) { - snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sPRESENT", pTsdb->path, TD_DIRSEP); - } - if (current_t) { - snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sPRESENT.t", pTsdb->path, TD_DIRSEP); - } -#endif - } -} - -static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) { - int32_t code = 0; - int32_t lino = 0; - uint8_t *pData = NULL; - - // load binary - TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ); - if (pFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - int64_t size; - if (taosFStatFile(pFD, &size, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - pData = taosMemoryMalloc(size); - if (pData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (taosReadFile(pFD, pData, size) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (!taosCheckChecksumWhole(pData, size)) { - code = TSDB_CODE_FILE_CORRUPTED; - TSDB_CHECK_CODE(code, lino, _exit); - } - - // decode binary - code = tsdbBinaryToFS(pData, size, pFS); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - taosCloseFile(&pFD); - if (pData) taosMemoryFree(pData); - if (code) { - smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); - } - return code; -} - -static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { - const SQTaskFile *q1 = (const SQTaskFile *)p1; - const SQTaskFile *q2 = (const SQTaskFile *)p2; - - if (q1->suid < q2->suid) { - return -1; - } else if (q1->suid > q2->suid) { - return 1; - } - - if (q1->level < q2->level) { - return -1; - } else if (q1->level > q2->level) { - return 1; - } - - if (q1->version < q2->version) { - return -2; - } else if (q1->version > q2->version) { - return 1; - } - - return 0; -} - -static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) { - int32_t code = 0; - int32_t lino = 0; - int32_t nRef = 0; - SVnode *pVnode = pSma->pVnode; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - SRSmaFS *pFSOld = RSMA_FS(pStat); - int64_t version = pStat->commitAppliedVer; - char fname[TSDB_FILENAME_LEN] = {0}; - - // SQTaskFile - int32_t nNew = taosArrayGetSize(pFSNew->aQTaskInf); - int32_t iNew = 0; - while (iNew < nNew) { - SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFSNew->aQTaskInf, iNew++); - - int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE); - - if (idx < 0) { - idx = taosArrayGetSize(pFSOld->aQTaskInf); - pQTaskFNew->nRef = 1; - } else { - SQTaskFile *pTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx); - int32_t c1 = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF); - if (c1 == 0) { - // utilize the item in pFSOld->qQTaskInf, instead of pFSNew - continue; - } else if (c1 < 0) { - // NOTHING TODO - } else { - code = TSDB_CODE_RSMA_FS_UPDATE; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - // remove previous version - while (--idx >= 0) { - SQTaskFile *preTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx); - int32_t c2 = tdQTaskInfCmprFn1(preTaskF, pQTaskFNew); - if (c2 == 0) { - code = TSDB_CODE_RSMA_FS_UPDATE; - TSDB_CHECK_CODE(code, lino, _exit); - } else if (c2 != -2) { - break; - } - - nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1); - if (nRef <= 0) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), preTaskF->suid, preTaskF->level, preTaskF->version, - tfsGetPrimaryPath(pVnode->pTfs), fname); - (void)taosRemoveFile(fname); - taosArrayRemove(pFSOld->aQTaskInf, idx); - } - } - } - -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) { - int32_t code = 0; -#if 0 - int32_t lino = 0; - SVnode *pVnode = pSma->pVnode; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - SRSmaFS *pFS = RSMA_FS(pStat); - char fname[TSDB_FILENAME_LEN] = {0}; - char fnameVer[TSDB_FILENAME_LEN] = {0}; - - // SArray - int32_t size = taosArrayGetSize(pFS->aQTaskInf); - for (int32_t i = 0; i < size; ++i) { - SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i); - - // main.tdb ========= - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, - tfsGetPrimaryPath(pVnode->pTfs), fnameVer); - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, tfsGetPrimaryPath(pVnode->pTfs), fname); - - if (taosCheckExistFile(fnameVer)) { - if (taosRenameFile(fnameVer, fname) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - smaDebug("vgId:%d, %s:%d succeed to to rename %s to %s", TD_VID(pVnode), __func__, lino, fnameVer, fname); - } else if (taosCheckExistFile(fname)) { - if (taosRemoveFile(fname) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - smaDebug("vgId:%d, %s:%d succeed to to remove %s", TD_VID(pVnode), __func__, lino, fname); - } - } - - { - // remove those invalid files (todo) - // main.tdb-journal.5 // TDB should handle its clear for kill -9 - } - -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); - } -#endif - return code; -} - -// EXPOSED APIS ==================================================================================== - -int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) { - int32_t code = 0; - int32_t lino = 0; - SVnode *pVnode = pSma->pVnode; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - - // open handle - code = tdRSmaFSCreate(RSMA_FS(pStat), 0); - TSDB_CHECK_CODE(code, lino, _exit); - - // open impl - char current[TSDB_FILENAME_LEN] = {0}; - char current_t[TSDB_FILENAME_LEN] = {0}; - tdRSmaGetCurrentFName(pSma, current, current_t); - - if (taosCheckExistFile(current)) { - code = tdRSmaLoadFSFromFile(current, RSMA_FS(pStat)); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosCheckExistFile(current_t)) { - if (rollback) { - code = tdRSmaFSRollback(pSma); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tdRSmaFSCommit(pSma); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - } else { - // 1st time open with empty current/qTaskInfoFile - code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // scan and try fix(remove main.db/main.db.xxx and use the one with version) - code = tdRSmaFSScanAndTryFix(pSma); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -void tdRSmaFSClose(SRSmaFS *pFS) { pFS->aQTaskInf = taosArrayDestroy(pFS->aQTaskInf); } - -int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) { - int32_t code = 0; - int32_t lino = 0; - char tfname[TSDB_FILENAME_LEN]; - - tdRSmaGetCurrentFName(pSma, NULL, tfname); - - // generate PRESENT.t - code = tdRSmaSaveFSToFile(pFSNew, tfname); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -int32_t tdRSmaFSCommit(SSma *pSma) { - int32_t code = 0; - int32_t lino = 0; - SRSmaFS fs = {0}; - - char current[TSDB_FILENAME_LEN] = {0}; - char current_t[TSDB_FILENAME_LEN] = {0}; - tdRSmaGetCurrentFName(pSma, current, current_t); - - if (!taosCheckExistFile(current_t)) { - goto _exit; - } - - // rename the file - if (taosRenameFile(current_t, current) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // load the new FS - code = tdRSmaFSCreate(&fs, 1); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tdRSmaLoadFSFromFile(current, &fs); - TSDB_CHECK_CODE(code, lino, _exit); - - // apply file change - code = tdRSmaFSApplyChange(pSma, &fs); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - tdRSmaFSClose(&fs); - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); - } - return code; -} - -int32_t tdRSmaFSFinishCommit(SSma *pSma) { - int32_t code = 0; - int32_t lino = 0; - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); - - taosWLockLatch(RSMA_FS_LOCK(pStat)); - code = tdRSmaFSCommit(pSma); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); - } else { - smaInfo("vgId:%d, rsmaFS finish commit", SMA_VID(pSma)); - } - return code; -} - -int32_t tdRSmaFSRollback(SSma *pSma) { - int32_t code = 0; - int32_t lino = 0; - - char current_t[TSDB_FILENAME_LEN] = {0}; - tdRSmaGetCurrentFName(pSma, NULL, current_t); - (void)taosRemoveFile(current_t); - -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno)); - } - return code; -} - -int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize) { - int32_t code = 0; - - for (int32_t i = 0; i < nSize; ++i) { - SQTaskFile *qTaskF = qTaskFile + i; - - int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE); - - if (idx < 0) { - idx = taosArrayGetSize(pFS->aQTaskInf); - } else { - SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); - int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF); - if (c == 0) { - if (pTaskF->size != qTaskF->size) { - code = TSDB_CODE_RSMA_FS_UPDATE; - smaError("vgId:%d, %s failed at line %d since %s, level:%" PRIi8 ", suid:%" PRIi64 ", version:%" PRIi64 - ", size:%" PRIi64 " != %" PRIi64, - SMA_VID(pSma), __func__, __LINE__, tstrerror(code), pTaskF->level, pTaskF->suid, pTaskF->version, - pTaskF->size, qTaskF->size); - goto _exit; - } - continue; - } - } - - if (!taosArrayInsert(pFS->aQTaskInf, idx, qTaskF)) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - } - -_exit: - return code; -} - -int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) { - int32_t code = 0; - int32_t lino = 0; - int32_t nRef = 0; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - SRSmaFS *qFS = RSMA_FS(pStat); - int32_t size = taosArrayGetSize(qFS->aQTaskInf); - - pFS->aQTaskInf = taosArrayInit_s(sizeof(SQTaskFile), size); - if (pFS->aQTaskInf == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - for (int32_t i = 0; i < size; ++i) { - SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i); - nRef = atomic_fetch_add_32(&qTaskF->nRef, 1); - if (nRef <= 0) { - code = TSDB_CODE_RSMA_FS_REF; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile)); - -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s, nRef %d", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code), - nRef); - } - return code; -} - -void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) { - int32_t nRef = 0; - char fname[TSDB_FILENAME_LEN]; - SVnode *pVnode = pSma->pVnode; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - int32_t size = taosArrayGetSize(pFS->aQTaskInf); - - for (int32_t i = 0; i < size; ++i) { - SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i); - - nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1); - if (nRef == 0) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, - tfsGetPrimaryPath(pVnode->pTfs), fname); - if (taosRemoveFile(fname) < 0) { - smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno))); - } else { - smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), fname); - } - } else if (nRef < 0) { - smaWarn("vgId:%d, abnormal unref %s since %s", TD_VID(pVnode), fname, tstrerror(TSDB_CODE_RSMA_FS_REF)); - } - } - - taosArrayDestroy(pFS->aQTaskInf); -} - -int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS) { - int32_t code = 0; - int32_t lino = 0; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - - taosRLockLatch(RSMA_FS_LOCK(pStat)); - code = tdRSmaFSRef(pSma, pFS); - TSDB_CHECK_CODE(code, lino, _exit); -_exit: - taosRUnLockLatch(RSMA_FS_LOCK(pStat)); - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS) { - int32_t code = 0; - int32_t lino = 0; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - SRSmaFS *qFS = RSMA_FS(pStat); - int32_t size = taosArrayGetSize(qFS->aQTaskInf); - - code = tdRSmaFSCreate(pFS, size); - TSDB_CHECK_CODE(code, lino, _exit); - taosArrayAddBatch(pFS->aQTaskInf, qFS->aQTaskInf->pData, size); - -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} -#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 99a3fd1f46..53b2786321 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -47,7 +47,6 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); -// static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer); @@ -96,11 +95,6 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (isDeepFree && pInfo->taskInfo[i]) { tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } -#if 0 - if (pInfo->iTaskInfo[i]) { - tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); - } -#endif } if (isDeepFree) { taosMemoryFreeClear(pInfo->pTSchema); @@ -115,16 +109,6 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { taosFreeQall(pInfo->qall); pInfo->qall = NULL; } -#if 0 - if (pInfo->iQueue) { - taosCloseQueue(pInfo->iQueue); - pInfo->iQueue = NULL; - } - if (pInfo->iQall) { - taosFreeQall(pInfo->iQall); - pInfo->iQall = NULL; - } -#endif } taosMemoryFree(pInfo); @@ -260,7 +244,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat void *pStreamState = NULL; // set the backend of stream state - tdRSmaQTaskInfoGetFullPathEx(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir); + tdRSmaQTaskInfoGetFullPath(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir); if (!taosCheckExistFile(taskInfDir)) { char *s = taosStrdup(taskInfDir); if (taosMulMkDir(taosDirName(s)) != 0) { @@ -371,15 +355,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con goto _err; } -#if 0 - if (!(pRSmaInfo->iQueue = taosOpenQueue())) { - goto _err; - } - if (!(pRSmaInfo->iQall = taosAllocateQall())) { - goto _err; - } -#endif - if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { goto _err; } @@ -605,15 +580,6 @@ _end: return code; } -#if 0 -static void tdBlockDataDestroy(SArray *pBlockArr) { - for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) { - blockDataDestroy(taosArrayGetP(pBlockArr, i)); - } - taosArrayDestroy(pBlockArr); -} -#endif - static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid) { int32_t code = 0; @@ -780,11 +746,8 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) { int32_t idx = level - 1; -#if 0 - void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx); -#else - void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); -#endif + void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); + if (!qTaskInfo) { smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid); @@ -817,115 +780,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, return TSDB_CODE_SUCCESS; } -#if 0 -static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param, - tb_uid_t suid, int8_t idx) { - int32_t code = 0; - int32_t lino = 0; - SVnode *pVnode = pSma->pVnode; - char *pOutput = NULL; - int32_t len = 0; - - if (!srcTaskInfo) { - code = TSDB_CODE_INVALID_PTR; - smaWarn("vgId:%d, rsma clone, table %" PRIi64 ", no need since srcTaskInfo is NULL", TD_VID(pVnode), suid); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len); - TSDB_CHECK_CODE(code, lino, _exit); - - SReadHandle handle = { .vnode = pVnode, .initTqReader = 1 }; - initStorageAPI(&handle.api); - - if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) { - code = TSDB_CODE_APP_ERROR; - TSDB_CHECK_CODE(code, lino, _exit); - } - - dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode)); - if (!dstTaskInfo) { - code = TSDB_CODE_RSMA_QTASKINFO_CREATE; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = qDeserializeTaskStatus(dstTaskInfo, pOutput, len); - TSDB_CHECK_CODE(code, lino, _exit); - - smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); - -_exit: - taosMemoryFreeClear(pOutput); - if (code) { - tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1); - smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, - terrstr()); - } - return code; -} -#endif - -/** - * @brief Clone qTaskInfo of SRSmaInfo - * - * @param pSma - * @param pInfo - * @return int32_t - */ -#if 0 -static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { - int32_t code = 0; - int32_t lino = 0; - SRSmaParam *param = NULL; - SMetaReader mr = {0}; - - if (!pInfo) { - return TSDB_CODE_SUCCESS; - } - - metaReaderDoInit(&mr, SMA_META(pSma), 0); - smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid); - if (metaReaderGetTableEntryByUidCache(&mr, pInfo->suid) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (mr.me.type != TSDB_SUPER_TABLE) { - code = TSDB_CODE_RSMA_INVALID_SCHEMA; - TSDB_CHECK_CODE(code, lino, _exit); - } - if (mr.me.uid != pInfo->suid) { - code = TSDB_CODE_RSMA_INVALID_SCHEMA; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (TABLE_IS_ROLLUP(mr.me.flags)) { - param = &mr.me.stbEntry.rsmaParam; -#if 0 - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (!pInfo->iTaskInfo[i]) { - continue; - } - code = tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i); - TSDB_CHECK_CODE(code, lino, _exit); - } - smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid); -#endif - } else { - code = TSDB_CODE_RSMA_INVALID_SCHEMA; - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", flags:%" PRIi8 ",type:%" PRIi8 ", uid:%" PRIi64, - SMA_VID(pSma), __func__, lino, tstrerror(code), pInfo->suid, mr.me.flags, mr.me.type, mr.me.uid); - } - metaReaderClear(&mr); - return code; -} -#endif - /** * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied. * @@ -960,14 +814,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } -#if 0 - if (!pRSmaInfo->taskInfo[0]) { - if ((terrno = tdRSmaInfoClone(pSma, pRSmaInfo)) < 0) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - return NULL; - } - } -#endif + tdRefRSmaInfo(pSma, pRSmaInfo); taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) { @@ -1188,12 +1035,7 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, goto _err; } - // step 2: open SRSmaFS for qTaskFiles - // if ((code = tdRSmaFSOpen(pSma, qtaskFileVer, rollback)) < 0) { - // goto _err; - // } - - // step 3: iterate all stables to restore the rsma env + // step 2: iterate all stables to restore the rsma env if ((code = tdRSmaRestoreQTaskInfoInit(pSma, &nTables)) < 0) { goto _err; } @@ -1209,117 +1051,7 @@ _err: return code; } -#if 0 -int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { - int32_t code = 0; - int32_t lino = 0; - SSma *pSma = pRSmaStat->pSma; - SVnode *pVnode = pSma->pVnode; - SArray *qTaskFArray = NULL; - int64_t version = pRSmaStat->commitAppliedVer; - TdFilePtr pOutFD = NULL; - TdFilePtr pInFD = NULL; - char fname[TSDB_FILENAME_LEN]; - char fnameVer[TSDB_FILENAME_LEN]; - SRSmaFS fs = {0}; - - if (taosHashGetSize(pInfoHash) <= 0) { - return TSDB_CODE_SUCCESS; - } - - qTaskFArray = taosArrayInit(taosHashGetSize(pInfoHash) << 1, sizeof(SQTaskFile)); - if (!qTaskFArray) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - void *infoHash = NULL; - while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; - - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - continue; - } - - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); - if (pItem && pItem->pStreamState) { - if (streamStateCommit(pItem->pStreamState) < 0) { - code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; - TSDB_CHECK_CODE(code, lino, _exit); - } - smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode), - pRSmaInfo->suid, i + 1); - - // qTaskInfo file - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, -1, tfsGetPrimaryPath(pVnode->pTfs), fname); - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, version, tfsGetPrimaryPath(pVnode->pTfs), - fnameVer); - if (taosCheckExistFile(fnameVer)) { - smaWarn("vgId:%d, rsma persist, duplicate file %s exist", TD_VID(pVnode), fnameVer); - } - - pOutFD = taosCreateFile(fnameVer, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); - if (pOutFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - pInFD = taosOpenFile(fname, TD_FILE_READ); - if (pInFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - int64_t size = 0; - uint32_t mtime = 0; - if (taosFStatFile(pInFD, &size, &mtime) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - int64_t offset = 0; - if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - smaError("vgId:%d, rsma persist, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode), fname, - fnameVer, tstrerror(code)); - TSDB_CHECK_CODE(code, lino, _exit); - } - taosCloseFile(&pOutFD); - taosCloseFile(&pInFD); - - SQTaskFile qTaskF = { - .nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size, .mtime = mtime}; - taosArrayPush(qTaskFArray, &qTaskF); - } - } - } - - // prepare - code = tdRSmaFSCopy(pSma, &fs); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tdRSmaFSUpsertQTaskFile(pSma, &fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray)); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tdRSmaFSPrepareCommit(pSma, &fs); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - - taosArrayDestroy(fs.aQTaskInf); - taosArrayDestroy(qTaskFArray); - - if (code) { - if (pOutFD) taosCloseFile(&pOutFD); - if (pInFD) taosCloseFile(&pInFD); - smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); - } - - terrno = code; - return code; -} -#endif /** * @brief trigger to get rsma result in async mode * diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 0cde0de2a4..151d424cc2 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -15,9 +15,6 @@ #include "sma.h" -// static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData); -// static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); - // SRSmaSnapReader ======================================== struct SRSmaSnapReader { SSma* pSma; @@ -28,11 +25,6 @@ struct SRSmaSnapReader { // for data file int8_t rsmaDataDone[TSDB_RETENTION_L2]; STsdbSnapReader* pDataReader[TSDB_RETENTION_L2]; - - // for qtaskinfo file - int8_t qTaskDone; - int32_t fsIter; - SQTaskFReader* pQTaskFReader; }; int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) { @@ -62,22 +54,6 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead } } - // open qtaskinfo - // taosRLockLatch(RSMA_FS_LOCK(pStat)); - // code = tdRSmaFSRef(pSma, &pReader->fs); - // taosRUnLockLatch(RSMA_FS_LOCK(pStat)); - // TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) { - pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader)); - if (!pReader->pQTaskFReader) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - pReader->pQTaskFReader->pSma = pSma; - pReader->pQTaskFReader->version = pReader->ever; - } - *ppReader = pReader; _exit: if (code) { @@ -88,114 +64,6 @@ _exit: return code; } -static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) { - int32_t code = 0; - int32_t lino = 0; - SVnode* pVnode = pReader->pSma->pVnode; - SQTaskFReader* qReader = pReader->pQTaskFReader; - SRSmaFS* pFS = &pReader->fs; - int64_t n = 0; - uint8_t* pBuf = NULL; - int64_t version = pReader->ever; - char fname[TSDB_FILENAME_LEN]; - - if (!qReader) { - *ppBuf = NULL; - smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since qTaskReader is NULL", TD_VID(pVnode)); - goto _exit; - } - - if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) { - *ppBuf = NULL; - smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode)); - goto _exit; - } - - while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) { - SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter++); - if (qTaskF->version != version) { - continue; - } - - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs), - fname); - if (!taosCheckExistFile(fname)) { - smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8 - ", version %" PRIi64 " failed since %s not exist", - TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname); - code = TSDB_CODE_RSMA_FS_SYNC; - TSDB_CHECK_CODE(code, lino, _exit); - } - - TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ); - if (!fp) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - qReader->pReadH = fp; - qReader->level = qTaskF->level; - qReader->suid = qTaskF->suid; - } - - if (!qReader->pReadH) { - *ppBuf = NULL; - smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode)); - goto _exit; - } - - int64_t size = 0; - if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // seek - if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (*ppBuf) { - *ppBuf = taosMemoryRealloc(*ppBuf, sizeof(SSnapDataHdr) + size); - } else { - *ppBuf = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); - } - if (!(*ppBuf)) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - // read - n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } else if (n != size) { - code = TSDB_CODE_FILE_CORRUPTED; - TSDB_CHECK_CODE(code, lino, _exit); - } - - smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, version:%" PRIi64 ", size:%" PRIi64, TD_VID(pVnode), version, - size); - - SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf); - pHdr->type = SNAP_DATA_QTASK; - pHdr->flag = qReader->level; - pHdr->index = qReader->suid; - pHdr->size = size; - -_exit: - if (qReader) taosCloseFile(&qReader->pReadH); - - if (code) { - *ppBuf = NULL; - smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); - } else { - smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", TD_VID(pVnode)); - } - return code; -} - int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; int32_t lino = 0; @@ -223,18 +91,6 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) { } } - // read qtaskinfo file - if (!pReader->qTaskDone) { - smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma)); - code = rsmaSnapReadQTaskInfo(pReader, ppData); - TSDB_CHECK_CODE(code, lino, _exit); - if (*ppData) { - goto _exit; - } else { - pReader->qTaskDone = 1; - } - } - _exit: if (code) { smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code)); @@ -249,9 +105,6 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) { int32_t code = 0; SRSmaSnapReader* pReader = *ppReader; - // tdRSmaFSUnRef(pReader->pSma, &pReader->fs); - taosMemoryFreeClear(pReader->pQTaskFReader); - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pReader->pDataReader[i]) { tsdbSnapReaderClose(&pReader->pDataReader[i]); @@ -299,10 +152,6 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit } } - // qtaskinfo - // code = tdRSmaFSCopy(pSma, &pWriter->fs); - // TSDB_CHECK_CODE(code, lino, _exit); - // snapWriter *ppWriter = pWriter; _exit: @@ -316,22 +165,6 @@ _exit: return code; } -// int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) { -// int32_t code = 0; -// int32_t lino = 0; - -// if (pWriter) { -// code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs); -// TSDB_CHECK_CODE(code, lino, _exit); -// } - -// _exit: -// if (code) { -// smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code)); -// } -// return code; -// } - int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; int32_t lino = 0; @@ -364,62 +197,6 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { } } -#if 0 - // qtaskinfo - if (rollback) { - // tdRSmaFSRollback(pSma); - // remove qTaskFiles - } else { - // sendFile from fname.Ver to fname - SRSmaFS* pFS = &pWriter->fs; - int32_t size = taosArrayGetSize(pFS->aQTaskInf); - for (int32_t i = 0; i < size; ++i) { - SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i); - if (pTaskF->version == pWriter->ever) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, primaryPath, fnameVer); - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, primaryPath, fname); - - pInFD = taosOpenFile(fnameVer, TD_FILE_READ); - if (pInFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - pOutFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); - if (pOutFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - int64_t size = 0; - if (taosFStatFile(pInFD, &size, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - int64_t offset = 0; - if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - smaError("vgId:%d, vnode snapshot rsma writer, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode), - fnameVer, fname, tstrerror(code)); - TSDB_CHECK_CODE(code, lino, _exit); - } - taosCloseFile(&pOutFD); - taosCloseFile(&pInFD); - } - } - - // lock - taosWLockLatch(RSMA_FS_LOCK(pStat)); - code = tdRSmaFSCommit(pSma); - if (code) { - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); - goto _exit; - } - // unlock - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); - } -#endif // rsma restore code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback); TSDB_CHECK_CODE(code, lino, _exit); @@ -451,8 +228,6 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) } else if (pHdr->type == SNAP_DATA_RSMA2) { pHdr->type = SNAP_DATA_TSDB; code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr); - } else if (pHdr->type == SNAP_DATA_QTASK) { - // code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); } else { code = TSDB_CODE_RSMA_FS_SYNC; } @@ -466,70 +241,4 @@ _exit: smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); } return code; -} -#if 0 -static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - int32_t lino = 0; - SSma* pSma = pWriter->pSma; - SVnode* pVnode = pSma->pVnode; - char fname[TSDB_FILENAME_LEN]; - TdFilePtr fp = NULL; - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - - fname[0] = '\0'; - - if (pHdr->size != (nData - sizeof(SSnapDataHdr))) { - code = TSDB_CODE_RSMA_FS_SYNC; - TSDB_CHECK_CODE(code, lino, _exit); - } - - SQTaskFile qTaskFile = { - .nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size}; - - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pHdr->index, pHdr->flag, qTaskFile.version, - tfsGetPrimaryPath(pVnode->pTfs), fname); - - fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (!fp) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - int64_t contLen = taosWriteFile(fp, pHdr->data, pHdr->size); - if (contLen != pHdr->size) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - uint32_t mtime = 0; - if (taosFStatFile(fp, NULL, &mtime) != 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - qTaskFile.mtime = mtime; - } - - if (taosFsyncFile(fp) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - TSDB_CHECK_CODE(code, lino, _exit); - } - - taosCloseFile(&fp); - - // code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1); - // TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - if (fp) { - (void)taosRemoveFile(fname); - } - smaError("vgId:%d, %s failed at line %d since %s, file:%s", TD_VID(pVnode), __func__, lino, tstrerror(code), fname); - } else { - smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", TD_VID(pVnode), fname); - } - - return code; -} -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 16efdd1ec1..beb3e24c70 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -17,75 +17,12 @@ #define TD_QTASKINFO_FNAME_PREFIX "main.tdb" -#if 0 -void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) { - tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); -} -#endif - -void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, - char *outputName) { - tdRSmaGetFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); -} - -#if 0 -void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) { - tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); - int32_t rsmaLen = strlen(outputName); - snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level); -} -#endif - -void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { +void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); int32_t rsmaLen = strlen(outputName); snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid); } -void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, - int8_t level, int64_t version, char *outputName) { - if (level >= 0 && suid > 0) { - if (version >= 0) { - if (pdname) { - snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, pdname, - TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname, - version); - } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, TD_DIRSEP, - vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname, version); - } - } else { - if (pdname) { - snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", pdname, - TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname); - } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", TD_DIRSEP, vgId, - TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname); - } - } - } else { -#if 0 - if (version >= 0) { - if (pdname) { - snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP, - vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version); - } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname, - TD_DIRSEP, vgId, fname, version); - } - } else { - if (pdname) { - snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, - TD_DIRSEP, dname, TD_DIRSEP, vgId, fname); - } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, - TD_DIRSEP, vgId, fname); - } - } -#endif - } -} - void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) { if (pdname) { if (endWithSep) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 165448fb7b..45df342f77 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3705,15 +3705,15 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret if (level == TSDB_RETENTION_L0) { *pLevel = TSDB_RETENTION_L0; - tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str); + tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str); return VND_RSMA0(pVnode); } else if (level == TSDB_RETENTION_L1) { *pLevel = TSDB_RETENTION_L1; - tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str); + tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str); return VND_RSMA1(pVnode); } else { *pLevel = TSDB_RETENTION_L2; - tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str); + tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str); return VND_RSMA2(pVnode); } } -- GitLab