未验证 提交 03280133 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #19948 from taosdata/feature/event

fix(query): identify the data blocks from different group.
...@@ -26,23 +26,19 @@ typedef struct SEventWindowOperatorInfo { ...@@ -26,23 +26,19 @@ typedef struct SEventWindowOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SExprSupp scalarSup; SExprSupp scalarSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup; SWindowRowsSup winSup;
bool hasKey;
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
uint64_t groupId; // current group id, used to identify the data block from different groups
SFilterInfo* pStartCondInfo; SFilterInfo* pStartCondInfo;
SFilterInfo* pEndCondInfo; SFilterInfo* pEndCondInfo;
bool inWindow; bool inWindow;
SResultRow* pRow; SResultRow* pRow;
} SEventWindowOperatorInfo; } SEventWindowOperatorInfo;
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator); static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator);
static void destroyEWindowOperatorInfo(void* param); static void destroyEWindowOperatorInfo(void* param);
static void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator);
// todo : move to util // todo : move to util
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
...@@ -176,7 +172,6 @@ void destroyEWindowOperatorInfo(void* param) { ...@@ -176,7 +172,6 @@ void destroyEWindowOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -254,36 +249,45 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu ...@@ -254,36 +249,45 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu
pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
} }
void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->binfo.pRes;
SSDataBlock* pRes = pInfo->binfo.pRes; int64_t gid = pBlock->info.id.groupId;
int64_t gid = pBlock->info.id.groupId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
TSKEY* tsList = (TSKEY*)pColInfoData->pData; TSKEY* tsList = (TSKEY*)pColInfoData->pData;
SWindowRowsSup* pRowSup = &pInfo->winSup;
SColumnInfoData *ps = NULL, *pe = NULL; SColumnInfoData *ps = NULL, *pe = NULL;
int32_t rowIndex = 0;
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0; pRowSup->numOfRows = 0;
if (pInfo->groupId == 0) {
pInfo->groupId = gid;
} else if (pInfo->groupId != gid) {
// this is a new group, reset the info
pInfo->inWindow = false;
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, &param1);
int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, &param1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t status1 = 0; int32_t status1 = 0;
bool keep1 = filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1); filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1);
SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(pInfo->pEndCondInfo, &param2); code = filterSetDataFromSlotId(pInfo->pEndCondInfo, &param2);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t status2 = 0; int32_t status2 = 0;
bool keep2 = filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2); filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2);
int32_t rowIndex = 0;
int32_t startIndex = pInfo->inWindow ? 0 : -1; int32_t startIndex = pInfo->inWindow ? 0 : -1;
while (rowIndex < pBlock->info.rows) { while (rowIndex < pBlock->info.rows) {
if (pInfo->inWindow) { // let's find the first end value if (pInfo->inWindow) { // let's find the first end value
for (rowIndex = startIndex; rowIndex < pBlock->info.rows; ++rowIndex) { for (rowIndex = startIndex; rowIndex < pBlock->info.rows; ++rowIndex) {
...@@ -294,7 +298,6 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf ...@@ -294,7 +298,6 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf
if (rowIndex < pBlock->info.rows) { if (rowIndex < pBlock->info.rows) {
doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo); doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo);
doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset); doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset);
// check buffer size // check buffer size
...@@ -324,9 +327,9 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf ...@@ -324,9 +327,9 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf
} }
if (pInfo->inWindow) { if (pInfo->inWindow) {
continue; continue; // try to find the end position
} else { } else {
break; break; // no valid start position, quit
} }
} }
} }
...@@ -335,4 +338,6 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf ...@@ -335,4 +338,6 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf
taosMemoryFree(ps); taosMemoryFree(ps);
colDataDestroy(pe); colDataDestroy(pe);
taosMemoryFree(pe); taosMemoryFree(pe);
return TSDB_CODE_SUCCESS;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册