提交 4b3d9891 编写于 作者: C Cary Xu

other: naming optimization

上级 77bfd400
......@@ -42,9 +42,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems);
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr);
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid);
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
......@@ -636,8 +636,8 @@ _end:
return code;
}
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid) {
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid) {
SArray *pResList = taosArrayInit(1, POINTER_BYTES);
if (pResList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -815,7 +815,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
}
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid);
tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid);
return TSDB_CODE_SUCCESS;
}
......@@ -1596,46 +1596,15 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) {
}
}
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) {
// step 1: consume submit req
#if 0
int64_t qMemSize = 0;
if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) {
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
atomic_fetch_sub_64(&pRSmaStat->nBufItems, taosQallItemSize(pInfo->qall));
taosArrayClear(pSubmitArr);
while (1) {
void *msg = NULL;
taosGetQitem(pInfo->qall, (void **)&msg);
if (msg) {
if (taosArrayPush(pSubmitArr, &msg) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
} else {
break;
}
}
int32_t size = taosArrayGetSize(pSubmitArr);
if (size > 0) {
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, RSMA_EXEC_TIMEOUT, i) <
0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
}
tdFreeRSmaSubmitItems(pSubmitArr);
}
}
#endif
// step 2: fetch rsma result(consider the efficiency and functionality)
/**
* @brief fetch rsma result(consider the efficiency and functionality)
*
* @param pSma
* @param pInfo
* @param pSubmitArr
* @return int32_t
*/
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) {
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
......@@ -1665,7 +1634,7 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err;
}
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
tdCleanupStreamInputDataBlock(taskInfo);
goto _err;
}
......@@ -1772,7 +1741,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
}
tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr);
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
if (qallItemSize > 0) {
// subtract the item size after the task finished, commit should wait for all items be consumed
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
......@@ -1803,7 +1772,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
}
// tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr);
// tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册