未验证 提交 19fe9618 编写于 作者: L liuyao 提交者: GitHub

Merge pull request #15341 from taosdata/feature/TD-17357

fix(stream): push retrive datablock
......@@ -772,6 +772,41 @@ int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_f
return midPos;
}
int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SPullWindowInfo* pos = taosArrayGet(res, index);
SPullWindowInfo* pData = (SPullWindowInfo*) pKey;
if (pData->window.skey == pos->window.skey) {
if (pData->groupId > pos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
return 0;
} else if (pData->window.skey > pos->window.skey) {
return 1;
}
return -1;
}
static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
int32_t size = taosArrayGetSize(pPullWins);
int32_t index = binarySearchCom(pPullWins, size, pPullInfo, TSDB_ORDER_DESC, comparePullWinKey);
if (index == -1) {
index = 0;
} else {
if (comparePullWinKey(pPullInfo, pPullWins, index) > 0) {
index++;
} else {
return TSDB_CODE_SUCCESS;
}
}
if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
int32_t compareResKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index);
......@@ -2586,7 +2621,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
// add pull data request
taosArrayPush(pInfo->pPullWins, &pull);
savePullWindow(&pull, pInfo->pPullWins);
int32_t size = taosArrayGetSize(pInfo->pChildren);
addPullWindow(pInfo->pPullDataMap, &winRes, size);
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
......@@ -2674,14 +2709,6 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
blockDataUpdateTsWindow(pDest, 0);
}
static bool needBreak(SStreamFinalIntervalOperatorInfo* pInfo) {
int32_t size = taosArrayGetSize(pInfo->pPullWins);
if (pInfo->pullIndex < size) {
return true;
}
return false;
}
static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) {
clearSpecialDataBlock(pBlock);
int32_t size = taosArrayGetSize(array);
......@@ -2748,6 +2775,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
......@@ -2776,13 +2811,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
// process the rest of the data
return pInfo->pUpdateRes;
}
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
// doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
// if (pInfo->pPullDataRes->info.rows != 0) {
// // process the rest of the data
// ASSERT(IS_FINAL_OP(pInfo));
// printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
// return pInfo->pPullDataRes;
// }
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
......@@ -2882,10 +2917,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
if (needBreak(pInfo)) {
break;
}
}
}
......@@ -2899,6 +2930,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
......@@ -2913,14 +2953,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pUpdateRes;
}
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
if (pInfo->pPullDataRes->info.rows != 0) {
// process the rest of the data
ASSERT(IS_FINAL_OP(pInfo));
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
return pInfo->pPullDataRes;
}
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册