提交 ba0bce80 编写于 作者: K kailixu

chore: code revert

上级 30f8c9c7
...@@ -244,7 +244,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData); ...@@ -244,7 +244,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData);
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid, int64_t blkVer, int8_t level, const char* tag); tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
......
...@@ -1872,33 +1872,13 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1872,33 +1872,13 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
return dumpBuf; return dumpBuf;
} }
static SHashObj* dupCheck = NULL;
static int8_t dupInit = 0;
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema, int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
int64_t uid, int32_t vgId, tb_uid_t suid, int64_t blkVer, int8_t level, int64_t uid, int32_t vgId, tb_uid_t suid) {
const char* tag) {
SSubmitReq2* pReq = *ppReq; SSubmitReq2* pReq = *ppReq;
SArray* pVals = NULL; SArray* pVals = NULL;
int32_t numOfBlks = 0; int32_t numOfBlks = 0;
int32_t sz = 1; int32_t sz = 1;
// if (!dupCheck) {
// if (0 == atomic_val_compare_exchange_8(&dupInit, 0, 1)) {
// dupCheck = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
// if (!dupCheck) ASSERT(0);
// } else {
// int32_t cnt = 0;
// while (!dupCheck) {
// ++cnt;
// if (cnt > 1000) {
// sched_yield();
// cnt = 0;
// }
// }
// }
// }
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (NULL == pReq) { if (NULL == pReq) {
...@@ -1912,8 +1892,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ...@@ -1912,8 +1892,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
} }
} }
char dupKey[70];
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
...@@ -1958,19 +1936,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ...@@ -1958,19 +1936,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId); ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, (SValue){.val = *(TSKEY*)var}); SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, (SValue){.val = *(TSKEY*)var});
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
snprintf(dupKey, 70, "%" PRIi8 ":%d:%" PRIi64 ":%" PRIi64 ":%" PRIi64, level, vgId, uid, *(TSKEY*)var,
blkVer);
uInfo("%s:%d key:ver: %s, tags: %s", __func__, __LINE__, dupKey, tag);
int32_t dupKeyLen = strlen(dupKey);
assert(dupKeyLen < 70);
void* hashKey = NULL;
if ((hashKey = taosHashGet(dupCheck, &dupKey, dupKeyLen + 1))) {
ASSERT(0);
} else {
if(taosHashPut(dupCheck, &dupKey, dupKeyLen + 1, NULL, 0) != 0){
ASSERT(0);
}
}
} else if (colDataIsNull_s(pColInfoData, j)) { } else if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
......
...@@ -220,7 +220,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); ...@@ -220,7 +220,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); // int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
......
...@@ -176,7 +176,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { ...@@ -176,7 +176,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
if (!isCommit) goto _exit; if (!isCommit) goto _exit;
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); // code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
......
...@@ -45,7 +45,7 @@ static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); ...@@ -45,7 +45,7 @@ static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems); static void tdFreeRSmaSubmitItems(SArray *pItems);
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid, const char* tag); int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
...@@ -581,7 +581,7 @@ _end: ...@@ -581,7 +581,7 @@ _end:
} }
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid, const char* tag) { int64_t suid) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSDataBlock *output = NULL; SSDataBlock *output = NULL;
...@@ -620,8 +620,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -620,8 +620,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965) // TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid, if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) {
output->info.version, pItem->level, tag) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -776,7 +775,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, ...@@ -776,7 +775,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
} }
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid, __func__); tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -845,10 +844,6 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -845,10 +844,6 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
* @param suid * @param suid
* @return int32_t * @return int32_t
*/ */
static SHashObj* dupVerCheck = NULL;
static int8_t dupVerInit = 0;
static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
tb_uid_t suid) { tb_uid_t suid) {
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
...@@ -858,35 +853,6 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, ...@@ -858,35 +853,6 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
} }
if (inputType == STREAM_INPUT__DATA_SUBMIT) { if (inputType == STREAM_INPUT__DATA_SUBMIT) {
char dupKey[40];
if (!dupVerCheck) {
if (0 == atomic_val_compare_exchange_8(&dupVerInit, 0, 1)) {
dupVerCheck = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (!dupVerCheck) ASSERT(0);
} else {
int32_t cnt = 0;
while (!dupVerCheck) {
++cnt;
if (cnt > 1000) {
sched_yield();
cnt = 0;
}
}
}
}
snprintf(dupKey, 40, "%d:%" PRIi64 ":%" PRIi64, SMA_VID(pSma), version);
int32_t dupKeyLen = strlen(dupKey);
assert(dupKeyLen < 40);
void *hashKey = NULL;
if ((hashKey = taosHashGet(dupVerCheck, &dupKey, dupKeyLen + 1))) {
ASSERT(0);
} else {
if (taosHashPut(dupVerCheck, &dupKey, dupKeyLen + 1, NULL, 0) != 0) {
ASSERT(0);
}
}
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) { if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -1085,7 +1051,7 @@ _err: ...@@ -1085,7 +1051,7 @@ _err:
return code; return code;
} }
#if 0
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -1126,7 +1092,7 @@ _exit: ...@@ -1126,7 +1092,7 @@ _exit:
terrno = code; terrno = code;
return code; return code;
} }
#endif
/** /**
* @brief trigger to get rsma result in async mode * @brief trigger to get rsma result in async mode
* *
...@@ -1280,7 +1246,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -1280,7 +1246,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err; goto _err;
} }
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, __func__) < 0) { if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
goto _err; goto _err;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册