未验证 提交 b7bfe978 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #19583 from taosdata/feature/event

feat: add event window
......@@ -64,6 +64,9 @@ extern "C" {
#define EXPLAIN_IGNORE_GROUPID_FORMAT "Ignore Group Id: %s"
#define EXPLAIN_PARTITION_KETS_FORMAT "Partition Key: "
#define EXPLAIN_INTERP_FORMAT "Interp"
#define EXPLAIN_EVENT_FORMAT "Event"
#define EXPLAIN_EVENT_START_FORMAT "Start Cond: "
#define EXPLAIN_EVENT_END_FORMAT "End Cond: "
#define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms"
#define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms"
......
......@@ -114,129 +114,7 @@ _return:
int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNodeList **pChildren) {
int32_t tlen = 0;
SNodeList *pPhysiChildren = NULL;
switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: {
STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode;
pPhysiChildren = pTagScanNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode;
pPhysiChildren = pTblScanNode->scan.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: {
SSystemTableScanPhysiNode *pSTblScanNode = (SSystemTableScanPhysiNode *)pNode;
pPhysiChildren = pSTblScanNode->scan.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
SProjectPhysiNode *pPrjNode = (SProjectPhysiNode *)pNode;
pPhysiChildren = pPrjNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode;
pPhysiChildren = pJoinNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
pPhysiChildren = pAggNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
pPhysiChildren = pExchNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SORT: {
SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode;
pPhysiChildren = pSortNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: {
SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode;
pPhysiChildren = pIntNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: {
SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode;
pPhysiChildren = pSessNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: {
SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode;
pPhysiChildren = pStateNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
SPartitionPhysiNode *partitionPhysiNode = (SPartitionPhysiNode *)pNode;
pPhysiChildren = partitionPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE: {
SMergePhysiNode *mergePhysiNode = (SMergePhysiNode *)pNode;
pPhysiChildren = mergePhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
SIndefRowsFuncPhysiNode *indefPhysiNode = (SIndefRowsFuncPhysiNode *)pNode;
pPhysiChildren = indefPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: {
SMergeAlignedIntervalPhysiNode *intPhysiNode = (SMergeAlignedIntervalPhysiNode *)pNode;
pPhysiChildren = intPhysiNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_FILL: {
SFillPhysiNode *fillPhysiNode = (SFillPhysiNode *)pNode;
pPhysiChildren = fillPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
STableMergeScanPhysiNode *mergePhysiNode = (STableMergeScanPhysiNode *)pNode;
pPhysiChildren = mergePhysiNode->scan.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: {
SBlockDistScanPhysiNode *distPhysiNode = (SBlockDistScanPhysiNode *)pNode;
pPhysiChildren = distPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: {
SLastRowScanPhysiNode *lastRowPhysiNode = (SLastRowScanPhysiNode *)pNode;
pPhysiChildren = lastRowPhysiNode->scan.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: {
STableCountScanPhysiNode *tableCountPhysiNode = (STableCountScanPhysiNode *)pNode;
pPhysiChildren = tableCountPhysiNode->scan.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SGroupSortPhysiNode *groupSortPhysiNode = (SGroupSortPhysiNode *)pNode;
pPhysiChildren = groupSortPhysiNode->node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: {
SMergeIntervalPhysiNode *mergeIntPhysiNode = (SMergeIntervalPhysiNode *)pNode;
pPhysiChildren = mergeIntPhysiNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: {
SInterpFuncPhysiNode *interpPhysiNode = (SInterpFuncPhysiNode *)pNode;
pPhysiChildren = interpPhysiNode->node.pChildren;
break;
}
default:
qError("not supported physical node type %d", pNode->type);
QRY_ERR_RET(TSDB_CODE_APP_ERROR);
}
SNodeList *pPhysiChildren = pNode->pChildren;
if (pPhysiChildren) {
*pChildren = nodesMakeList();
......@@ -1583,6 +1461,36 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: {
SEventWinodwPhysiNode *pEventNode = (SEventWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_EVENT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pEventNode->window.pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pEventNode->window.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_EVENT_START_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pEventNode->pStartCond, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_EVENT_END_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pEventNode->pEndCond, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
break;
}
default:
qError("not supported physical node type %d", pNode->type);
return TSDB_CODE_APP_ERROR;
......
......@@ -742,6 +742,7 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
......@@ -813,6 +814,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo);
// clang-format on
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
......@@ -872,6 +875,10 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSu
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) ;
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "ttime.h"
typedef struct SEventWindowOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SExprSupp scalarSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
bool hasKey;
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
SFilterInfo* pStartCondInfo;
SFilterInfo* pEndCondInfo;
bool inWindow;
SResultRow* pRow;
} SEventWindowOperatorInfo;
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator);
static void destroyEWindowOperatorInfo(void* param);
static void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator);
// todo : move to util
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
uint64_t groupId) {
pRowSup->startRowIndex = rowIndex;
pRowSup->numOfRows = 0;
pRowSup->win.skey = tsList[rowIndex];
pRowSup->groupId = groupId;
}
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->win.ekey = ts;
pRowSup->prevTs = ts;
pRowSup->numOfRows += 1;
pRowSup->groupId = groupId;
}
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
int64_t* ts = (int64_t*)pColData->pData;
int32_t delta = includeEndpoint ? 1 : 0;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo) {
SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
SEventWinodwPhysiNode* pEventWindowNode = (SEventWinodwPhysiNode*)physiNode;
int32_t tsSlotId = ((SColumnNode*)pEventWindowNode->window.pTspk)->slotId;
int32_t code = filterInitFromNode((SNode*)pEventWindowNode->pStartCond, &pInfo->pStartCondInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = filterInitFromNode((SNode*)pEventWindowNode->pEndCond, &pInfo->pEndCondInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (pEventWindowNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
code = filterInitFromNode((SNode*)pEventWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num);
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pEventWindowNode->window.watermark,
.calTrigger = pEventWindowNode->window.triggerType};
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId;
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo,
optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
if (pInfo != NULL) {
destroyEWindowOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
void destroyEWindowOperatorInfo(void* param) {
SEventWindowOperatorInfo* pInfo = (SEventWindowOperatorInfo*)param;
if (pInfo == NULL) {
return;
}
if (pInfo->pRow != NULL) {
taosMemoryFree(pInfo->pRow);
}
if (pInfo->pStartCondInfo != NULL) {
filterFreeInfo(pInfo->pStartCondInfo);
pInfo->pStartCondInfo = NULL;
}
if (pInfo->pEndCondInfo != NULL) {
filterFreeInfo(pInfo->pEndCondInfo);
pInfo->pEndCondInfo = NULL;
}
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param);
}
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
SEventWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t order = TSDB_ORDER_ASC;
SSDataBlock* pRes = pInfo->binfo.pRes;
blockDataCleanup(pRes);
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
}
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
}
eventWindowAggImpl(pOperator, pInfo, pBlock);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return pRes;
}
}
return pRes->info.rows == 0 ? NULL : pRes;
}
static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
if (*pResult == NULL) {
SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize);
pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset};
*pResult = p;
}
(*pResult)->win = *win;
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
}
static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex,
const SSDataBlock* pBlock, int64_t* tsList, SExecTaskInfo* pTaskInfo) {
SWindowRowsSup* pRowSup = &pInfo->winSup;
int32_t numOfOutput = pSup->numOfExprs;
int32_t numOfRows = endIndex - startIndex + 1;
doKeepTuple(pRowSup, tsList[endIndex], pBlock->info.id.groupId);
int32_t ret =
setSingleOutputTupleBufv1(&pInfo->binfo.resultRowInfo, &pRowSup->win, &pInfo->pRow, pSup, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows,
pBlock->info.rows, numOfOutput);
}
void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->binfo.pRes;
int64_t gid = pBlock->info.id.groupId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
SColumnInfoData *ps = NULL, *pe = NULL;
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, &param1);
int32_t status1 = 0;
bool keep1 = filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1);
SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(pInfo->pEndCondInfo, &param2);
int32_t status2 = 0;
bool keep2 = filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2);
int32_t rowIndex = 0;
int32_t startIndex = pInfo->inWindow ? 0 : -1;
while (rowIndex < pBlock->info.rows) {
if (pInfo->inWindow) { // let's find the first end value
for (rowIndex = startIndex; rowIndex < pBlock->info.rows; ++rowIndex) {
if (((bool*)pe->pData)[rowIndex]) {
break;
}
}
if (rowIndex < pBlock->info.rows) {
doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo);
doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset);
// check buffer size
if (pRes->info.rows + pInfo->pRow->numOfRows >= pRes->info.capacity) {
int32_t newSize = pRes->info.rows + pInfo->pRow->numOfRows;
blockDataEnsureCapacity(pRes, newSize);
}
copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes,
pSup->rowEntryInfoOffset, pTaskInfo);
pRes->info.rows += pInfo->pRow->numOfRows;
pInfo->inWindow = false;
rowIndex += 1;
} else {
doEventWindowAggImpl(pInfo, pSup, startIndex, pBlock->info.rows - 1, pBlock, tsList, pTaskInfo);
}
} else { // find the first start value that is fulfill for the start condition
for (; rowIndex < pBlock->info.rows; ++rowIndex) {
if (((bool*)ps->pData)[rowIndex]) {
doKeepNewWindowStartInfo(pRowSup, tsList, rowIndex, gid);
pInfo->inWindow = true;
startIndex = rowIndex;
break;
}
}
if (pInfo->inWindow) {
continue;
} else {
break;
}
}
}
colDataDestroy(ps);
taosMemoryFree(ps);
colDataDestroy(pe);
taosMemoryFree(pe);
}
......@@ -833,6 +833,20 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
}
}
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) {
SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (pResInfo == NULL) {
continue;
}
pResInfo->initialized = false;
pResInfo->numOfRes = 0;
pResInfo->isNullRes = 0;
pResInfo->complete = false;
}
}
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return;
......@@ -892,12 +906,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD
}
int32_t numOfRows = 0;
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
int32_t j = 0;
pDst->varmeta.length = 0;
while(j < totalRows) {
while (j < totalRows) {
if (pIndicator[j] == 0) {
j += 1;
continue;
......@@ -1050,8 +1063,7 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
pAggInfo->groupId = groupId;
}
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
const int32_t* rowEntryOffset) {
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
bool returnNotNull = false;
for (int32_t j = 0; j < numOfExprs; ++j) {
SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
......@@ -1074,8 +1086,8 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu
}
}
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
......@@ -1111,7 +1123,7 @@ static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SR
// todo refactor. SResultRow has direct pointer in miainfo
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
......@@ -1141,7 +1153,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
T_LONG_JMP(pTaskInfo->env, code);
}
doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
......@@ -1193,7 +1205,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
}
pGroupResInfo->index += 1;
doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
......@@ -2222,8 +2234,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else {
ASSERT(0);
}
if (pOperator != NULL) {
......@@ -2312,8 +2322,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else {
ASSERT(0);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo);
}
taosMemoryFree(ops);
......
......@@ -1973,6 +1973,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
}
// todo make this as an non-blocking operator
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
SExecTaskInfo* pTaskInfo) {
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== prepare data
sql drop database if exists db1;
sql create database db1 vgroups 5;
sql use db1;
sql create stable sta (ts timestamp, f1 int, f2 binary(10), f3 bool) tags(t1 int, t2 bool, t3 binary(10));
sql create table tba1 using sta tags(0, false, '0');
sql create table tba2 using sta tags(1, true, '1');
sql create table tba3 using sta tags(null, null, '');
sql create table tba4 using sta tags(1, false, null);
sql create table tba5 using sta tags(3, true, 'aa');
sql insert into tba1 values ('2022-09-26 15:15:01', 0, "a", false);
sql insert into tba1 values ('2022-09-26 15:15:02', 1, "0", true);
sql insert into tba1 values ('2022-09-26 15:15:03', 5, "5", false);
sql insert into tba1 values ('2022-09-26 15:15:04', 3, 'b', false);
sql insert into tba1 values ('2022-09-26 15:15:05', 0, '1', false);
sql insert into tba1 values ('2022-09-26 15:15:06', 2, 'd', true);
sql insert into tba2 values ('2022-09-27 15:15:01', 0, "a", false);
sql insert into tba2 values ('2022-09-27 15:15:02', 1, "0", true);
sql insert into tba2 values ('2022-09-27 15:15:03', 5, "5", false);
sql insert into tba2 values ('2022-09-27 15:15:04', null, null, null);
# child table: no window
print ====> select count(*) from tba1 event_window start with f1 = 0 end with f2 = 'c';
sql select count(*) from tba1 event_window start with f1 = 0 end with f2 = 'c';
if $rows != 0 then
return -1
endi
# child table: single row window
print ====> select count(*) from tba1 event_window start with f1 = 0 end with f3 = false;
sql select count(*) from tba1 event_window start with f1 = 0 end with f3 = false
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
# child table: multi rows window
print ====> select count(*) from tba1 event_window start with f1 = 0 end with f2 = 'b';
sql select count(*) from tba1 event_window start with f1 = 0 end with f2 = 'b';
if $rows != 1 then
return -1
endi
if $data00 != 4 then
return -1
endi
# child table: multi windows
print ====> select count(*) from tba1 event_window start with f1 >= 0 end with f3 = true;
sql select count(*) from tba1 event_window start with f1 >= 0 end with f3 = true;
if $rows != 2 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data10 != 4 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册