From ba0bce8092eeea6cbebb029626c9a7b592ab5e8c Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 11 Jul 2023 18:52:51 +0800 Subject: [PATCH] chore: code revert --- include/common/tdatablock.h | 2 +- source/common/src/tdatablock.c | 37 +------------------- source/dnode/vnode/src/inc/sma.h | 2 +- source/dnode/vnode/src/sma/smaCommit.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 48 ++++---------------------- 5 files changed, 11 insertions(+), 80 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 129078ee23..c0412d2617 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -244,7 +244,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData); char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, - tb_uid_t suid, int64_t blkVer, int8_t level, const char* tag); + tb_uid_t suid); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c631b14816..b2f03fa7ba 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1872,33 +1872,13 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) return dumpBuf; } -static SHashObj* dupCheck = NULL; -static int8_t dupInit = 0; - int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema, - int64_t uid, int32_t vgId, tb_uid_t suid, int64_t blkVer, int8_t level, - const char* tag) { + int64_t uid, int32_t vgId, tb_uid_t suid) { SSubmitReq2* pReq = *ppReq; SArray* pVals = NULL; int32_t numOfBlks = 0; int32_t sz = 1; - // if (!dupCheck) { - // if (0 == atomic_val_compare_exchange_8(&dupInit, 0, 1)) { - // dupCheck = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - // if (!dupCheck) ASSERT(0); - // } else { - // int32_t cnt = 0; - // while (!dupCheck) { - // ++cnt; - // if (cnt > 1000) { - // sched_yield(); - // cnt = 0; - // } - // } - // } - // } - terrno = TSDB_CODE_SUCCESS; if (NULL == pReq) { @@ -1912,8 +1892,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat } } - char dupKey[70]; - for (int32_t i = 0; i < sz; ++i) { int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; @@ -1958,19 +1936,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId); SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, (SValue){.val = *(TSKEY*)var}); taosArrayPush(pVals, &cv); - snprintf(dupKey, 70, "%" PRIi8 ":%d:%" PRIi64 ":%" PRIi64 ":%" PRIi64, level, vgId, uid, *(TSKEY*)var, - blkVer); - uInfo("%s:%d key:ver: %s, tags: %s", __func__, __LINE__, dupKey, tag); - int32_t dupKeyLen = strlen(dupKey); - assert(dupKeyLen < 70); - void* hashKey = NULL; - if ((hashKey = taosHashGet(dupCheck, &dupKey, dupKeyLen + 1))) { - ASSERT(0); - } else { - if(taosHashPut(dupCheck, &dupKey, dupKeyLen + 1, NULL, 0) != 0){ - ASSERT(0); - } - } } else if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 36a7615af1..29d3e752b2 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -220,7 +220,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); -int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); +// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index e770e8c22a..2aa898e59e 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -176,7 +176,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { if (!isCommit) goto _exit; - code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); + // code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); TSDB_CHECK_CODE(code, lino, _exit); smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index a94ddf7111..72d7895f2a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -45,7 +45,7 @@ static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid, const char* tag); + int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); @@ -581,7 +581,7 @@ _end: } static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid, const char* tag) { + int64_t suid) { int32_t code = 0; int32_t lino = 0; SSDataBlock *output = NULL; @@ -620,8 +620,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma SSubmitReq2 *pReq = NULL; // TODO: the schema update should be handled later(TD-17965) - if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid, - output->info.version, pItem->level, tag) < 0) { + if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; TSDB_CHECK_CODE(code, lino, _exit); } @@ -776,7 +775,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid, __func__); + tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); return TSDB_CODE_SUCCESS; } @@ -845,10 +844,6 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { * @param suid * @return int32_t */ - -static SHashObj* dupVerCheck = NULL; -static int8_t dupVerInit = 0; - static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, tb_uid_t suid) { SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); @@ -858,35 +853,6 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, } if (inputType == STREAM_INPUT__DATA_SUBMIT) { - char dupKey[40]; - if (!dupVerCheck) { - if (0 == atomic_val_compare_exchange_8(&dupVerInit, 0, 1)) { - dupVerCheck = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (!dupVerCheck) ASSERT(0); - } else { - int32_t cnt = 0; - while (!dupVerCheck) { - ++cnt; - if (cnt > 1000) { - sched_yield(); - cnt = 0; - } - } - } - } - - snprintf(dupKey, 40, "%d:%" PRIi64 ":%" PRIi64, SMA_VID(pSma), version); - int32_t dupKeyLen = strlen(dupKey); - assert(dupKeyLen < 40); - void *hashKey = NULL; - if ((hashKey = taosHashGet(dupVerCheck, &dupKey, dupKeyLen + 1))) { - ASSERT(0); - } else { - if (taosHashPut(dupVerCheck, &dupKey, dupKeyLen + 1, NULL, 0) != 0) { - ASSERT(0); - } - } - if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_FAILED; @@ -1085,7 +1051,7 @@ _err: return code; } - +#if 0 int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t code = 0; int32_t lino = 0; @@ -1126,7 +1092,7 @@ _exit: terrno = code; return code; } - +#endif /** * @brief trigger to get rsma result in async mode * @@ -1280,7 +1246,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, __func__) < 0) { + if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { goto _err; } -- GitLab