提交 15544a22 编写于 作者: C Cary Xu

enh: rsma fetch

上级 1785f5bb
...@@ -246,9 +246,10 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); ...@@ -246,9 +246,10 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug // for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
......
...@@ -1870,20 +1870,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1870,20 +1870,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
* @brief TODO: Assume that the final generated result it less than 3M * @brief TODO: Assume that the final generated result it less than 3M
* *
* @param pReq * @param pReq
* @param pDataBlock * @param pDataBlocks
* @param vgId * @param vgId
* @param suid * @param suid
* *
*/ */
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid) { tb_uid_t suid) {
int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq); int32_t bufSize = sizeof(SSubmitReq);
int32_t sz = 1;
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
const SDataBlockInfo* pBlkInfo = &pDataBlock->info; SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info;
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pDataBlocks);
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum)); bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols));
bufSize += sizeof(SSubmitBlk); bufSize += sizeof(SSubmitBlk);
} }
...@@ -1900,6 +1900,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB ...@@ -1900,6 +1900,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
tdSRowInit(&rb, pTSchema->version); tdSRowInit(&rb, pTSchema->version);
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
// int32_t rowSize = pDataBlock->info.rowSize; // int32_t rowSize = pDataBlock->info.rowSize;
...@@ -1997,7 +1998,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB ...@@ -1997,7 +1998,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
} }
offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation
} }
tdSRowEnd(&rb);
dataLen += TD_ROW_LEN(rb.pBuf); dataLen += TD_ROW_LEN(rb.pBuf);
#ifdef TD_DEBUG_PRINT_ROW #ifdef TD_DEBUG_PRINT_ROW
tdSRowPrint(rb.pBuf, pTSchema, __func__); tdSRowPrint(rb.pBuf, pTSchema, __func__);
......
...@@ -337,6 +337,7 @@ SArray *vmGetMsgHandles() { ...@@ -337,6 +337,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
...@@ -347,7 +348,6 @@ SArray *vmGetMsgHandles() { ...@@ -347,7 +348,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -54,7 +54,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -54,7 +54,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (IsReq(pMsg)) { if (IsReq(pMsg)) {
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("msg:%p, failed to process since %s", pMsg, terrstr()); dGError("msg:%p, failed to process since %s", pMsg, terrstr(code));
} }
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
...@@ -72,7 +72,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -72,7 +72,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr()); dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr(code));
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
...@@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr()); dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr(code));
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
...@@ -110,7 +110,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -110,7 +110,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr()); dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code));
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
...@@ -156,7 +156,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp ...@@ -156,7 +156,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) { if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
terrno = TSDB_CODE_GRANT_EXPIRED; terrno = TSDB_CODE_GRANT_EXPIRED;
code = terrno; code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr()); dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else { } else {
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
...@@ -179,11 +179,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp ...@@ -179,11 +179,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
if (!osDataSpaceAvailable()) { if (!osDataSpaceAvailable()) {
terrno = TSDB_CODE_VND_NO_DISKSPACE; terrno = TSDB_CODE_VND_NO_DISKSPACE;
code = terrno; code = terrno;
dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { } else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
terrno = TSDB_CODE_VND_NO_WRITE_AUTH; terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
code = terrno; code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr()); dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else { } else {
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg); taosWriteQitem(pVnode->pWriteQ, pMsg);
......
...@@ -293,7 +293,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -293,7 +293,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
} }
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
", finally maxdelay:%" PRIi32, ", finally maxdelay:%" PRIi32,
TD_VID(pVnode), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); TD_VID(pVnode), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
...@@ -614,9 +616,13 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm ...@@ -614,9 +616,13 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
uint64_t ts; uint64_t ts;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); int32_t code = qExecTaskOpt(taskInfo, pResList, &ts);
if (code < 0) { if (code < 0) {
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, if (code == TSDB_CODE_QRY_IN_EXEC) {
pItem->level, terrstr(code)); break;
goto _err; } else {
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
pItem->level, terrstr(code));
goto _err;
}
} }
if (taosArrayGetSize(pResList) == 0) { if (taosArrayGetSize(pResList) == 0) {
...@@ -630,37 +636,34 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm ...@@ -630,37 +636,34 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
break; break;
} }
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
SSDataBlock *output = taosArrayGetP(pResList, i);
#if 1 #if 1
char flag[10] = {0}; char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level); snprintf(flag, 10, "level %" PRIi8, pItem->level);
// blockDebugShowDataBlocks(output, flag); blockDebugShowDataBlocks(pResList, flag);
// taosArrayDestroy(pResult);
#endif #endif
STsdb * sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
}
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, // TODO: the schema update should be handled later(TD-17965)
SMA_VID(pSma), suid, pItem->level, output->info.version); if (buildSubmitReqFromDataBlock(&pReq, pResList, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
suid, pItem->level, terrstr());
goto _err;
}
SSDataBlock *pBlk = (SSDataBlock *)taosArrayGet(pResList, 0);
if (pReq && tdProcessSubmitReq(sinkTsdb, pBlk->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " version %" PRIi64
" failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
} }
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version %" PRIi64, SMA_VID(pSma),
suid, pItem->level);
taosMemoryFreeClear(pReq);
} }
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
...@@ -692,15 +695,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType ...@@ -692,15 +695,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
} }
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid,
STREAM_INPUT__DATA_SUBMIT); STREAM_INPUT__DATA_SUBMIT);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
if (smaMgmt.tmrHandle) { if (smaMgmt.tmrHandle) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} else {
ASSERT(0);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -746,7 +746,6 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { ...@@ -746,7 +746,6 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return NULL; return NULL;
} }
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
SRSmaInfo *pCowRSmaInfo = NULL; SRSmaInfo *pCowRSmaInfo = NULL;
// lock // lock
...@@ -793,13 +792,7 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -793,13 +792,7 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
if (!pRSmaInfo) { if (!pRSmaInfo) {
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); smaError("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS;
}
if (!RSMA_INFO_QTASK(pRSmaInfo, 0)) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1331,14 +1324,16 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1331,14 +1324,16 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
if (RSMA_INFO_IS_DEL(pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pRSmaInfo->refId);
return; return;
} }
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
if (!pStat) { if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)",
pRSmaInfo->refId); smaMgmt.rsetId, pRSmaInfo->refId);
return; return;
} }
...@@ -1350,8 +1345,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1350,8 +1345,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED: { case TASK_TRIGGER_STAT_CANCELLED: {
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
" refId:%d", ", rsetId rsetId:%" PRIi64 " refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
...@@ -1366,30 +1361,31 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1366,30 +1361,31 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (fetchTriggerStat) { switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: { case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
// async process // async process
tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level); tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level);
} break; } break;
case TASK_TRIGGER_STAT_PAUSED: { case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break; } break;
case TASK_TRIGGER_STAT_INACTIVE: { case TASK_TRIGGER_STAT_INACTIVE: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive", smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break; } break;
case TASK_TRIGGER_STAT_INIT: { case TASK_TRIGGER_STAT_INIT: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma), smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid::%" PRIi64 " since stat is init",
pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break; } break;
default: { default: {
smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown", smaWarn("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break; } break;
} }
_end: _end:
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
} }
...@@ -1402,7 +1398,7 @@ _end: ...@@ -1402,7 +1398,7 @@ _end:
* @return int32_t * @return int32_t
*/ */
int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
SRSmaFetchMsg fetchMsg = { .suid = pInfo->suid, .level = level}; SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level};
int32_t ret = 0; int32_t ret = 0;
int32_t contLen = 0; int32_t contLen = 0;
SEncoder encoder = {0}; SEncoder encoder = {0};
...@@ -1431,7 +1427,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { ...@@ -1431,7 +1427,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
.contLen = contLen, .contLen = contLen,
}; };
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, FETCH_QUEUE, &rpcMsg)) != 0) { if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) {
smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s", smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s",
SMA_VID(pSma), pInfo->suid, level, terrstr()); SMA_VID(pSma), pInfo->suid, level, terrstr());
goto _err; goto _err;
...@@ -1462,7 +1458,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { ...@@ -1462,7 +1458,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
return -1; goto _err;
} }
pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)); pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead));
...@@ -1479,7 +1475,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { ...@@ -1479,7 +1475,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
terrno = TSDB_CODE_RSMA_EMPTY_INFO; terrno = TSDB_CODE_RSMA_EMPTY_INFO;
} }
smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma),
req.suid, req.level, terrstr()); req.suid, req.level, terrstr());
goto _err; goto _err;
} }
......
...@@ -293,6 +293,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -293,6 +293,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_QUERY_CONTINUE: case TDMT_SCH_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSMA:
return smaProcessFetch(pVnode->pSma, pMsg);
default: default:
vError("unknown msg type:%d in query queue", pMsg->msgType); vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
...@@ -329,8 +331,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -329,8 +331,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetTableCfg(pVnode, pMsg, true); return vnodeGetTableCfg(pVnode, pMsg, true);
case TDMT_VND_BATCH_META: case TDMT_VND_BATCH_META:
return vnodeGetBatchMeta(pVnode, pMsg); return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_FETCH_RSMA:
return smaProcessFetch(pVnode->pSma, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
......
...@@ -507,7 +507,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { ...@@ -507,7 +507,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
*pRes = NULL; *pRes = NULL;
int64_t curOwner = 0; int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); qDebug("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
return pTaskInfo->code; return pTaskInfo->code;
} }
...@@ -529,7 +529,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { ...@@ -529,7 +529,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
cleanUpUdfs(); cleanUpUdfs();
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
atomic_store_64(&pTaskInfo->owner, 0); atomic_store_64(&pTaskInfo->owner, 0);
return pTaskInfo->code; return pTaskInfo->code;
} }
......
...@@ -119,7 +119,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, "Disconnected from ser ...@@ -119,7 +119,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, "Disconnected from ser
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, "No write permission")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specified or available") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "TSC:Database not specified or available")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long, check maxSQLLength config") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long, check maxSQLLength config")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty")
...@@ -220,7 +220,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment" ...@@ -220,7 +220,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg")
// mnode-db // mnode-db
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "Database not specified or available") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "MND:Database not specified or available")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册