提交 0aac37a4 编写于 作者: C Cary Xu

enh: rsma fetch logic optimization

上级 cb7be9a2
...@@ -246,7 +246,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); ...@@ -246,7 +246,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug // for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
......
...@@ -200,6 +200,7 @@ enum { ...@@ -200,6 +200,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
......
...@@ -1874,21 +1874,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1874,21 +1874,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
* @brief TODO: Assume that the final generated result it less than 3M * @brief TODO: Assume that the final generated result it less than 3M
* *
* @param pReq * @param pReq
* @param pDataBlocks * @param pDataBlock
* @param vgId * @param vgId
* @param suid // TODO: check with Liao whether suid response is reasonable * @param suid
* *
* TODO: colId should be set
*/ */
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid) { tb_uid_t suid) {
int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq); int32_t bufSize = sizeof(SSubmitReq);
int32_t sz = 1;
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info; const SDataBlockInfo* pBlkInfo = &pDataBlock->info;
int32_t numOfCols = taosArrayGetSize(pDataBlocks); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols)); bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum));
bufSize += sizeof(SSubmitBlk); bufSize += sizeof(SSubmitBlk);
} }
...@@ -1905,7 +1904,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1905,7 +1904,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
tdSRowInit(&rb, pTSchema->version); tdSRowInit(&rb, pTSchema->version);
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
// int32_t rowSize = pDataBlock->info.rowSize; // int32_t rowSize = pDataBlock->info.rowSize;
......
...@@ -401,7 +401,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -401,7 +401,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeSyncThreads = tsNumOfCores; // tsNumOfVnodeSyncThreads = tsNumOfCores;
tsNumOfVnodeSyncThreads = 32;
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1); tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
......
...@@ -347,6 +347,7 @@ SArray *vmGetMsgHandles() { ...@@ -347,6 +347,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -187,6 +187,7 @@ int32_t smaAsyncPreCommit(SSma* pSma); ...@@ -187,6 +187,7 @@ int32_t smaAsyncPreCommit(SSma* pSma);
int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma);
int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t smaDoRetention(SSma* pSma, int64_t now);
int32_t smaProcessFetch(SSma *pSma, void* pMsg);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
......
...@@ -36,19 +36,17 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputT ...@@ -36,19 +36,17 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputT
int8_t level); int8_t level);
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, int64_t suid, int8_t blkType);
SRSmaStat *pStat, int8_t blkType); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) { static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
// adapt accordingly if definition of SRSmaInfo update // adapt accordingly if definition of SRSmaInfo update
...@@ -604,11 +602,8 @@ _end: ...@@ -604,11 +602,8 @@ _end:
return code; return code;
} }
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
SRSmaStat *pStat, int8_t blkType) { int64_t suid, int8_t blkType) {
SArray *pResult = NULL;
SSma *pSma = pStat->pSma;
while (1) { while (1) {
SSDataBlock *output = NULL; SSDataBlock *output = NULL;
uint64_t ts; uint64_t ts;
...@@ -619,30 +614,20 @@ static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *p ...@@ -619,30 +614,20 @@ static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *p
pItem->level, terrstr(code)); pItem->level, terrstr(code));
goto _err; goto _err;
} }
if (!output) {
break;
}
if (!pResult) { if (output) {
pResult = taosArrayInit(1, sizeof(SSDataBlock)); #if 0
if (!pResult) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
taosArrayPush(pResult, output);
if (taosArrayGetSize(pResult) > 0) {
#if 1
char flag[10] = {0}; char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level); snprintf(flag, 10, "level %" PRIi8, pItem->level);
SArray *pResult = taosArrayInit(1, sizeof(SSDataBlock));
taosArrayPush(pResult, output);
blockDebugShowDataBlocks(pResult, flag); blockDebugShowDataBlocks(pResult, flag);
taosArrayDestroy(pResult);
#endif #endif
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965) // TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr()); SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err; goto _err;
...@@ -659,18 +644,17 @@ static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *p ...@@ -659,18 +644,17 @@ static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *p
SMA_VID(pSma), suid, pItem->level, output->info.version); SMA_VID(pSma), suid, pItem->level, output->info.version);
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
taosArrayClear(pResult);
} else if (terrno == 0) { } else if (terrno == 0) {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
break;
} else { } else {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr()); smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
goto _err;
} }
} }
tdDestroySDataBlockArray(pResult);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
tdDestroySDataBlockArray(pResult);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -694,11 +678,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType ...@@ -694,11 +678,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
tdRSmaFetchAndSubmitResult(RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, pStat, tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid,
STREAM_INPUT__DATA_SUBMIT); STREAM_INPUT__DATA_SUBMIT);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
...@@ -724,11 +706,13 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { ...@@ -724,11 +706,13 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
if (!pEnv) { if (!pEnv) {
terrno = TSDB_CODE_RSMA_INVALID_ENV;
return NULL; return NULL;
} }
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
if (!pStat || !RSMA_INFO_HASH(pStat)) { if (!pStat || !RSMA_INFO_HASH(pStat)) {
terrno = TSDB_CODE_RSMA_INVALID_STAT;
return NULL; return NULL;
} }
...@@ -743,12 +727,12 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { ...@@ -743,12 +727,12 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return pRSmaInfo; return pRSmaInfo;
} }
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL; return NULL;
} }
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
SRSmaInfo *pCowRSmaInfo = NULL; SRSmaInfo *pCowRSmaInfo = NULL;
...@@ -779,7 +763,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { ...@@ -779,7 +763,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
ASSERT(!pCowRSmaInfo); ASSERT(!pCowRSmaInfo);
} }
if(pCowRSmaInfo) { if (pCowRSmaInfo) {
tdRefRSmaInfo(pSma, pCowRSmaInfo); tdRefRSmaInfo(pSma, pCowRSmaInfo);
} }
// unlock // unlock
...@@ -1323,7 +1307,7 @@ _err: ...@@ -1323,7 +1307,7 @@ _err:
} }
/** /**
* @brief trigger to get rsma result * @brief trigger to get rsma result in async mode
* *
* @param param * @param param
* @param tmrId * @param tmrId
...@@ -1357,8 +1341,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1357,8 +1341,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
" refId:%d", " refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle, taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
&pItem->tmrId);
} }
return; return;
} }
...@@ -1372,16 +1355,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1372,16 +1355,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
case TASK_TRIGGER_STAT_ACTIVE: { case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
pItem->level, pRSmaInfo->suid); pItem->level, pRSmaInfo->suid);
// async process
// sync procedure => async process tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qTaskInfo_t taskInfo = pRSmaInfo->taskInfo[pItem->level - 1];
qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
tdRSmaFetchAndSubmitResult(taskInfo, pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat,
STREAM_INPUT__DATA_BLOCK);
tdCleanupStreamInputDataBlock(taskInfo);
} break; } break;
case TASK_TRIGGER_STAT_PAUSED: { case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
...@@ -1404,3 +1379,90 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1404,3 +1379,90 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
_end: _end:
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
} }
/**
* @brief put rsma fetch msg to fetch queue
*
* @param pSma
* @param pInfo
* @param level
* @return int32_t
*/
int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
SRSmaFetchMsg fetchMsg = {.refId = pInfo->refId, .suid = pInfo->suid, .level = level};
int32_t ret = 0;
int32_t contLen = 0;
SEncoder encoder = {0};
tEncodeSize(tEncodeSRSmaFetchMsg, &fetchMsg, contLen, ret);
if (ret < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tEncoderClear(&encoder);
goto _err;
}
void *pBuf = rpcMallocCont(contLen);
tEncoderInit(&encoder, pBuf, contLen);
if (tEncodeSRSmaFetchMsg(&encoder, &fetchMsg) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tEncoderClear(&encoder);
}
tEncoderClear(&encoder);
SRpcMsg rpcMsg = {
.code = 0,
.msgType = TDMT_VND_FETCH_RSMA,
.pCont = pBuf,
.contLen = contLen,
};
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, FETCH_QUEUE, &rpcMsg)) != 0) {
smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%d level:%" PRIi8 " since %s",
SMA_VID(pSma), pInfo->suid, level, terrstr());
goto _err;
}
return TSDB_CODE_SUCCESS;
_err:
return TSDB_CODE_FAILED;
}
int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
SRSmaFetchMsg req = {0};
SDecoder decoder = {0};
SRSmaInfo *pInfo = NULL;
SRSmaInfoItem *pItem = NULL;
tDecoderInit(&decoder, pRpcMsg->pCont, pRpcMsg->contLen);
if (tDecodeSRSmaFetchMsg(&decoder, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _err;
}
pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid);
if (!pInfo) {
smaDebug("vgId:%d, failed to process rsma fetch msg since Empty rsma info", SMA_VID(pSma));
goto _err;
}
pItem = RSMA_INFO_ITEM(pInfo, req.level - 1);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, req.level - 1);
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err;
}
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, STREAM_INPUT__DATA_BLOCK) < 0) {
goto _err;
}
tdCleanupStreamInputDataBlock(taskInfo);
tdReleaseRSmaInfo(pSma, pInfo);
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;
_err:
tdReleaseRSmaInfo(pSma, pInfo);
tDecoderClear(&decoder);
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
}
...@@ -325,6 +325,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -325,6 +325,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetTableCfg(pVnode, pMsg, true); return vnodeGetTableCfg(pVnode, pMsg, true);
case TDMT_VND_BATCH_META: case TDMT_VND_BATCH_META:
return vnodeGetBatchMeta(pVnode, pMsg); return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_FETCH_RSMA:
return smaProcessFetch(pVnode->pSma, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
......
...@@ -47,6 +47,8 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore); ...@@ -47,6 +47,8 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore);
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore); SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore);
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore);
// for debug // for debug
void logStorePrint(SSyncLogStore* pLogStore); void logStorePrint(SSyncLogStore* pLogStore);
void logStorePrint2(char* s, SSyncLogStore* pLogStore); void logStorePrint2(char* s, SSyncLogStore* pLogStore);
......
...@@ -357,16 +357,14 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -357,16 +357,14 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd);
syncNodeEventLog(ths, eventLog);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code; return code;
} }
// if FromIndex > walCommitVer, return 0
// else return num of pass entries
static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
int32_t code; int32_t code = 0;
int32_t pass = 0;
SyncIndex delBegin = FromIndex; SyncIndex delBegin = FromIndex;
SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore); SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
...@@ -398,16 +396,31 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { ...@@ -398,16 +396,31 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
} }
} }
// update delete begin
SyncIndex walCommitVer = logStoreWalCommitVer(ths->pLogStore);
if (delBegin <= walCommitVer) {
delBegin = walCommitVer + 1;
pass = walCommitVer - delBegin + 1;
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "update delete begin to %ld", delBegin);
syncNodeEventLog(ths, logBuf);
} while (0);
}
// delete confict entries // delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
char eventLog[128]; do {
snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd); char logBuf[128];
syncNodeEventLog(ths, eventLog); snprintf(logBuf, sizeof(logBuf), "make log same from:%ld, delbegin:%ld, pass:%d", FromIndex, delBegin, pass);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); syncNodeEventLog(ths, logBuf);
} while (0);
return code; return pass;
} }
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) { int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) {
...@@ -543,31 +556,34 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -543,31 +556,34 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg); SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
// make log same int32_t pass = 0;
do { SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex;
bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex;
if (hasExtraEntries) { // make log same
// make log same, rollback deleted entries if (hasExtraEntries) {
code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); // make log same, rollback deleted entries
ASSERT(code == 0); pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
} ASSERT(pass >= 0);
}
} while (0);
// append entry batch // append entry batch
for (int32_t i = 0; i < pMsg->dataCount; ++i) { if (pass == 0) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); // assert! no batch
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); ASSERT(pMsg->dataCount <= 1);
if (code != 0) {
return -1; for (int32_t i = 0; i < pMsg->dataCount; ++i) {
} SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry, 0); code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0); ASSERT(code == 0);
// syncEntryDestory(pAppendEntry); // syncEntryDestory(pAppendEntry);
}
} }
// fsync once // fsync once
...@@ -670,25 +686,33 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -670,25 +686,33 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncLogRecvAppendEntriesBatch(ths, pMsg, "really match"); syncLogRecvAppendEntriesBatch(ths, pMsg, "really match");
int32_t pass = 0;
if (hasExtraEntries) { if (hasExtraEntries) {
// make log same, rollback deleted entries // make log same, rollback deleted entries
code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(code == 0); ASSERT(pass >= 0);
} }
if (hasAppendEntries) { if (hasAppendEntries) {
// append entry batch // append entry batch
for (int32_t i = 0; i < pMsg->dataCount; ++i) { if (pass == 0) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); // assert! no batch
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); ASSERT(pMsg->dataCount <= 1);
if (code != 0) {
return -1; // append entry batch
} for (int32_t i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry, 0); code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0); ASSERT(code == 0);
// syncEntryDestory(pAppendEntry); // syncEntryDestory(pAppendEntry);
}
} }
// fsync once // fsync once
......
...@@ -92,6 +92,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -92,6 +92,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
} }
} }
// advance commit index as large as possible
SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore);
if (walCommitVer > newCommitIndex) {
newCommitIndex = walCommitVer;
}
// maybe execute fsm // maybe execute fsm
if (newCommitIndex > pSyncNode->commitIndex) { if (newCommitIndex > pSyncNode->commitIndex) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1; SyncIndex beginIndex = pSyncNode->commitIndex + 1;
......
...@@ -2409,6 +2409,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -2409,6 +2409,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param; SSyncNode* pSyncNode = (SSyncNode*)param;
syncNodeEventLog(pSyncNode, "eq hb timer");
if (pSyncNode->replicaNum > 1) { if (pSyncNode->replicaNum > 1) {
if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <= if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) { atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
......
...@@ -305,10 +305,18 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, ...@@ -305,10 +305,18 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
return code; return code;
} }
// truncate semantic
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) { static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
int32_t code = walRollback(pWal, fromIndex);
// need not truncate
SyncIndex wallastVer = walGetLastVer(pWal);
if (fromIndex > wallastVer) {
return 0;
}
int32_t code = walRollback(pWal, fromIndex);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
...@@ -323,7 +331,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn ...@@ -323,7 +331,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
// event log // event log
do { do {
char logBuf[128]; char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal truncate, from-index:%" PRId64, fromIndex); snprintf(logBuf, sizeof(logBuf), "log truncate, from-index:%" PRId64, fromIndex);
syncNodeEventLog(pData->pSyncNode, logBuf); syncNodeEventLog(pData->pSyncNode, logBuf);
} while (0); } while (0);
...@@ -637,6 +645,12 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) { ...@@ -637,6 +645,12 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
return walGetFirstVer(pWal); return walGetFirstVer(pWal);
} }
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
return walGetCommittedVer(pWal);
}
// for debug ----------------- // for debug -----------------
void logStorePrint(SSyncLogStore* pLogStore) { void logStorePrint(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore); char* serialized = logStore2Str(pLogStore);
......
...@@ -29,8 +29,8 @@ sql insert into ct1 values(now, 10); ...@@ -29,8 +29,8 @@ sql insert into ct1 values(now, 10);
sql insert into ct1 values(now+1s, 1); sql insert into ct1 values(now+1s, 1);
sql insert into ct1 values(now+2s, 100); sql insert into ct1 values(now+2s, 100);
print =============== wait maxdelay 15+1 seconds for results print =============== wait maxdelay 15+2 seconds for results
sleep 16000 sleep 17000
print =============== select * from retention level 2 from memory print =============== select * from retention level 2 from memory
sql select * from ct1; sql select * from ct1;
......
...@@ -29,8 +29,8 @@ sql insert into ct1 values(now, 10, 10.0); ...@@ -29,8 +29,8 @@ sql insert into ct1 values(now, 10, 10.0);
sql insert into ct1 values(now+1s, 1, 1.0); sql insert into ct1 values(now+1s, 1, 1.0);
sql insert into ct1 values(now+2s, 100, 100.0); sql insert into ct1 values(now+2s, 100, 100.0);
print =============== wait maxdelay 5+1 seconds for results print =============== wait maxdelay 5+2 seconds for results
sleep 6000 sleep 7000
print =============== select * from retention level 2 from memory print =============== select * from retention level 2 from memory
sql select * from ct1; sql select * from ct1;
...@@ -135,8 +135,8 @@ print =============== insert after rsma qtaskinfo recovery ...@@ -135,8 +135,8 @@ print =============== insert after rsma qtaskinfo recovery
sql insert into ct1 values(now, 50, 500.0); sql insert into ct1 values(now, 50, 500.0);
sql insert into ct1 values(now+1s, 40, 40.0); sql insert into ct1 values(now+1s, 40, 40.0);
print =============== wait maxdelay 5+1 seconds for results print =============== wait maxdelay 5+2 seconds for results
sleep 6000 sleep 7000
print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery
sql select * from ct1; sql select * from ct1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册