提交 14e69034 编写于 作者: H Haojun Liao

[td-13039] fix bug in session window agg.

上级 0ae4bab0
...@@ -573,6 +573,7 @@ typedef struct SGroupbyOperatorInfo { ...@@ -573,6 +573,7 @@ typedef struct SGroupbyOperatorInfo {
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo;
STimeWindow curWindow; // current time window STimeWindow curWindow; // current time window
TSKEY prevTs; // previous timestamp TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows int32_t numOfRows; // number of rows
......
...@@ -1798,14 +1798,17 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) { ...@@ -1798,14 +1798,17 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
} }
} }
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { // todo handle multiple tables cases.
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// primary timestamp column // primary timestamp column
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
// bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); bool masterScan = true;
SOptrBasicInfo* pBInfo = &pInfo->binfo; STimeWindow window = {0};
int32_t numOfOutput = pOperator->numOfOutput;
int64_t gid = pBlock->info.groupId;
int64_t gap = pInfo->gap; int64_t gap = pInfo->gap;
pInfo->numOfRows = 0; pInfo->numOfRows = 0;
...@@ -1815,7 +1818,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1815,7 +1818,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
} }
TSKEY* tsList = (TSKEY*)pColInfoData->pData; TSKEY* tsList = (TSKEY*)pColInfoData->pData;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (pInfo->prevTs == INT64_MIN) { if (pInfo->prevTs == INT64_MIN) {
pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j]; pInfo->curWindow.ekey = tsList[j];
...@@ -1832,16 +1835,15 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1832,16 +1835,15 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
} }
} else { // start a new session window } else { // start a new session window
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int64_t gid = pSDataBlock->info.groupId;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pInfo->curWindow.ekey = pInfo->curWindow.skey;
// int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, gid, pInfo->binfo.pCtx, int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx,
// numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
// if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
// longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
// } }
// doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // pInfo->numOfRows data belong to the current session window
doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j]; pInfo->curWindow.ekey = tsList[j];
...@@ -1854,14 +1856,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1854,14 +1856,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pInfo->curWindow.ekey = pInfo->curWindow.skey;
// int32_t ret = setResultOutputBufByKey_rv(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan, int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx,
// &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
// pBInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
// if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
// longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); }
// }
// doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
...@@ -6703,7 +6704,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { ...@@ -6703,7 +6704,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
STableIntervalOperatorInfo* pInfo = pOperator->info; STableIntervalOperatorInfo* pInfo = pOperator->info;
// int32_t order = pQueryAttr->order.order; int32_t order = TSDB_ORDER_ASC;
// STimeWindow win = pQueryAttr->window; // STimeWindow win = pQueryAttr->window;
bool newgroup = false; bool newgroup = false;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
...@@ -6720,7 +6721,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { ...@@ -6720,7 +6721,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
} }
...@@ -6732,7 +6733,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { ...@@ -6732,7 +6733,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { static SSDataBlock* doBuildIntervalResult(SOperatorInfo *pOperator, bool* newgroup) {
STableIntervalOperatorInfo* pInfo = pOperator->info; STableIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -7064,13 +7065,14 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) ...@@ -7064,13 +7065,14 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup)
return NULL; return NULL;
} }
SSessionAggOperatorInfo* pWindowInfo = pOperator->info; SSessionAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
return NULL;
} }
return pBInfo->pRes; return pBInfo->pRes;
...@@ -7089,19 +7091,20 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) ...@@ -7089,19 +7091,20 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup)
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order);
doSessionWindowAggImpl(pOperator, pWindowInfo, pBlock); doSessionWindowAggImpl(pOperator, pInfo, pBlock);
} }
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput);
// initGroupResInfo(&pBInfo->groupResInfo, &pBInfo->resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
pOperator->status = OP_EXEC_DONE; toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
} }
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes; return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
...@@ -7678,7 +7681,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -7678,7 +7681,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->_openFn = doOpenIntervalAgg; pOperator->_openFn = doOpenIntervalAgg;
pOperator->getNextFn = doIntervalAgg; pOperator->getNextFn = doBuildIntervalResult;
pOperator->closeFn = destroyIntervalOperatorInfo; pOperator->closeFn = destroyIntervalOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -8775,11 +8778,11 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod ...@@ -8775,11 +8778,11 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
for(int32_t i = 0; i < num; ++i) { for(int32_t i = 0; i < num; ++i) {
SSlotDescNode* pNode = (SSlotDescNode*) nodesListGetNode(pOutputNodeList->pSlots, i); SSlotDescNode* pNode = (SSlotDescNode*) nodesListGetNode(pOutputNodeList->pSlots, i);
SColMatchInfo* info = taosArrayGet(pList, pNode->slotId); SColMatchInfo* info = taosArrayGet(pList, pNode->slotId);
if (pNode->output) { // if (pNode->output) {
(*numOfOutputCols) += 1; (*numOfOutputCols) += 1;
} else { // } else {
info->output = false; // info->output = false;
} // }
} }
return pList; return pList;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册