提交 352db0fe 编写于 作者: 5 54liuyao

feat(stream): optimize interval plan

上级 1a08229c
......@@ -1752,7 +1752,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld\n", flag,
len += snprintf(dumpBuf + len, size - len, "%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|======\n", "dumpBlockData",
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
pDataBlock->info.uid);
if (len >= size - 1) return dumpBuf;
......
......@@ -371,6 +371,13 @@ typedef struct SessionWindowSupporter {
uint8_t parentType;
} SessionWindowSupporter;
typedef struct STimeWindowSupp {
int8_t calTrigger;
int64_t waterMark;
TSKEY maxTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct SStreamScanInfo {
uint64_t tableUid; // queried super table uid
SExprInfo* pPseudoExpr;
......@@ -407,6 +414,7 @@ typedef struct SStreamScanInfo {
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
STimeWindow updateWin;
STimeWindowAggSupp twAggSup;
// status for tmq
// SSchemaWrapper schema;
......@@ -452,13 +460,6 @@ typedef struct SAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef struct STimeWindowSupp {
int8_t calTrigger;
int64_t waterMark;
TSKEY maxTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct SIntervalAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
......@@ -952,6 +953,7 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
......
......@@ -802,7 +802,12 @@ static bool isStateWindow(SStreamScanInfo* pInfo) {
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
}
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
}
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
......@@ -1130,9 +1135,14 @@ static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlo
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId]) && out) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
if ( (update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup)) ) && out) {
taosArrayPush(pInfo->tsArray, &rowId);
}
}
......@@ -1413,6 +1423,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->tsArrayIndex = 0;
checkUpdateData(pInfo, true, pInfo->pRes, true);
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
if (pInfo->pUpdateRes->info.rows > 0) {
if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
pInfo->updateResIndex = 0;
......@@ -1584,13 +1595,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateRes = createResDataBlock(pDescNode);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0;
pInfo->pPullDataRes = createPullDataBlock();
pInfo->pStreamScanOp = pOperator;
pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createPullDataBlock();
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
pInfo->twAggSup = *pTwSup;
pOperator->name = "StreamScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
......@@ -652,8 +652,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
}
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (pBlock == NULL) {
qDebug("======printDataBlock Block is Null");
if (!pBlock || pBlock->info.rows == 0) {
qDebug("======printDataBlock: Block is Null or Empty");
return;
}
char* pBuf = NULL;
......@@ -1355,7 +1355,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
int32_t size = taosArrayGetSize(chAy);
qDebug("window %" PRId64 " wait child size:%d", win.skey, size);
for (int32_t i = 0; i < size; i++) {
qDebug("window %" PRId64 " wait chid id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
qDebug("window %" PRId64 " wait child id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
}
continue;
} else if (pPullDataMap) {
......@@ -1626,6 +1626,12 @@ SSDataBlock* createDeleteBlock() {
return pBlock;
}
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type) {
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->sessionSup.parentType = type;
}
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
......@@ -1701,6 +1707,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL);
}
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -2476,12 +2486,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
} else {
int32_t index = -1;
SArray* chArray = NULL;
int32_t chId = 0;
if (chIds) {
chArray = *(void**)chIds;
int32_t chId = getChildIndex(pSDataBlock);
chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
}
if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) {
qDebug("======delete child id %d", chId);
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
......@@ -3010,6 +3022,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
pDummy[i].functionId = pCtx[i].functionId;
}
}
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark,
uint8_t type) {
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
......
......@@ -198,8 +198,7 @@ static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
}
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) ||
(streamQuery && TSDB_SUPER_TABLE == pScan->tableType);
return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1);
}
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
......
......@@ -366,4 +366,143 @@ if $data32 != 8 then
goto loop1
endi
sql drop database IF EXISTS test2;
sql drop stream IF EXISTS streams21;
sql drop stream IF EXISTS streams22;
sql create database test2 vgroups 2;
sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);
sql insert into t1 values(1648791233002,3,3,3,2.1);
sql insert into t1 values(1648791243003,4,4,4,3.1);
sql insert into t1 values(1648791213004,4,5,5,4.1);
sql insert into t2 values(1648791213000,1,6,6,1.0);
sql insert into t2 values(1648791223001,2,7,7,1.1);
sql insert into t2 values(1648791233002,3,8,8,2.1);
sql insert into t2 values(1648791243003,4,9,9,3.1);
sql insert into t2 values(1648791213004,4,10,10,4.1);
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt;
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
if $data02 != 5 then
print =====data02=$data02
goto loop2
endi
# row 1
if $data11 != 1 then
print =====data11=$data11
goto loop2
endi
if $data12 != 2 then
print =====data12=$data12
goto loop2
endi
# row 2
if $data21 != 1 then
print =====data21=$data21
goto loop2
endi
if $data22 != 3 then
print =====data22=$data22
goto loop2
endi
# row 3
if $data31 != 1 then
print =====data31=$data31
goto loop2
endi
if $data32 != 4 then
print =====data32=$data32
goto loop2
endi
print step 6
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt2;
# row 0
if $data01 != 4 then
print =====data01=$data01
# goto loop3
endi
if $data02 != 10 then
print =====data02=$data02
goto loop3
endi
# row 1
if $data11 != 2 then
print =====data11=$data11
goto loop3
endi
if $data12 != 4 then
print =====data12=$data12
goto loop3
endi
# row 2
if $data21 != 2 then
print =====data21=$data21
goto loop3
endi
if $data22 != 6 then
print =====data22=$data22
goto loop3
endi
# row 3
if $data31 != 2 then
print =====data31=$data31
goto loop3
endi
if $data32 != 8 then
print =====data32=$data32
goto loop3
endi
system sh/stop_dnodes.sh
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册