未验证 提交 b5f6272b 编写于 作者: L liuyao 提交者: GitHub

Merge pull request #13682 from taosdata/feature/TD-16285

feat(stream): distribute stream interval
......@@ -1219,6 +1219,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
pBlock->info.rowSize = pDataBlock->info.rowSize;
pBlock->info.groupId = pDataBlock->info.groupId;
pBlock->info.childId = pDataBlock->info.childId;
pBlock->info.type = pDataBlock->info.type;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
......@@ -1499,6 +1501,7 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
int32_t colNum = pDataBlock->info.numOfCols;
int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId);
for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag);
for (int32_t k = 0; k < colNum; k++) {
......
......@@ -4690,10 +4690,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 8;
qDebug("[******]create Semi");
int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
int32_t children = 0;
qDebug("[******]create Final");
int32_t children = 1;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
......
......@@ -809,11 +809,18 @@ static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSD
// return p;
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
colInfoDataEnsureCapacity(pCol, 0, size);
blockDataEnsureCapacity(pUpdateBlock, size);
for (int32_t i = 0; i < size; i++) {
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->tsArray, i);
colDataAppend(pCol, i, (char*)pTs, false);
}
for (int32_t i = 0; i < pUpdateBlock->info.numOfCols; i++) {
if (i == pInfo->primaryTsIndex) {
continue;
}
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, i);
colDataAppendNNULL(pCol, 0, size);
}
pUpdateBlock->info.rows = size;
pUpdateBlock->info.type = STREAM_REPROCESS;
blockDataUpdateTsWindow(pUpdateBlock, 0);
......@@ -841,7 +848,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
int32_t current = pInfo->validBlockIndex++;
return taosArrayGetP(pInfo->pBlockLists, current);
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
blockDataUpdateTsWindow(pBlock, 0);
return pBlock;
} else {
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
blockDataDestroy(pInfo->pUpdateRes);
......@@ -940,7 +949,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
doFilter(pInfo->pCondition, pInfo->pRes, false);
blockDataUpdateTsWindow(pInfo->pRes, 0);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
break;
}
......
......@@ -1925,6 +1925,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
if (startPos < 0) {
break;
......@@ -2003,10 +2004,7 @@ static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_
}
static int32_t getChildIndex(SSDataBlock* pBlock) {
// if (pBlock->info.type != STREAM_INVALID && pBlock->info.rows < 4) { // for test
// return pBlock->info.rows - 1;
// }
return 0;
return pBlock->info.childId;
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册