From d8c2a68f27df8ec9f3c108c10682ca7fb6d0e13b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 15 Jun 2022 10:44:36 +0800 Subject: [PATCH] feat(stream): update data fo partition by --- source/libs/executor/inc/executorimpl.h | 4 + source/libs/executor/src/scanoperator.c | 154 +++++++++++++----- source/libs/executor/src/timewindowoperator.c | 4 +- source/libs/function/src/builtinsimpl.c | 9 +- source/libs/planner/src/planOptimizer.c | 34 ++-- 5 files changed, 145 insertions(+), 60 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ab60acab53..c4d16c89e1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -391,7 +391,9 @@ typedef struct SStreamBlockScanInfo { void* streamBlockReader;// stream block reader handle SArray* pColMatchInfo; // SNode* pCondition; + int32_t tsArrayIndex; SArray* tsArray; + uint64_t groupId; SUpdateInfo* pUpdateInfo; SExprInfo* pPseudoExpr; @@ -582,6 +584,7 @@ typedef struct SPartitionOperatorInfo { int32_t* columnOffset; // start position for each column data void* pGroupIter; // group iterator int32_t pageIndex; // page index of current group + SSDataBlock* pUpdateRes; } SPartitionOperatorInfo; typedef struct SWindowRowsSup { @@ -907,6 +910,7 @@ int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); +void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); #ifdef __cplusplus } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d30e4ef6db..e44e05224f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -750,60 +750,127 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { return true; } -static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { - SSDataBlock* pResult = NULL; - pResult = doTableScan(pInfo->pOperatorDumy); - if (pResult == NULL) { - if (prepareDataScan(pInfo)) { - // scan next window data - pResult = doTableScan(pInfo->pOperatorDumy); +static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRowId) { + for (int32_t j = 0; j < source->info.numOfCols; j++) { + SColumnInfoData* pDestCol = (SColumnInfoData*)taosArrayGet(dest->pDataBlock, j); + SColumnInfoData* pSourceCol = (SColumnInfoData*)taosArrayGet(source->pDataBlock, j); + if (colDataIsNull_s(pSourceCol, sourceRowId)) { + colDataAppendNULL(pDestCol, dest->info.rows); + } else { + colDataAppend(pDestCol, dest->info.rows, colDataGetData(pSourceCol, sourceRowId), false); } } - return pResult; + dest->info.rows++; } -static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, - SSDataBlock* pUpdateBlock) { - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); - ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); - TSKEY* ts = (TSKEY*)pColDataInfo->pData; - for (int32_t i = 0; i < pBlock->info.rows; i++) { - if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[i])) { - taosArrayPush(pInfo->tsArray, ts + i); +static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t rowId) { + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + return *groupId; + } + return 0; + /* Todo(liuyao) for partition by column + recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId); + int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); + uint64_t resId = 0; + uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); + if (groupId) { + return *groupId; + } else if (len != 0) { + resId = calcGroupId(pTableScanInfo->keyBuf, len); + taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t)); + } + return resId; + */ +} + +static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { + while (1) { + SSDataBlock* pResult = NULL; + pResult = doTableScan(pInfo->pOperatorDumy); + if (pResult == NULL) { + if (prepareDataScan(pInfo)) { + // scan next window data + pResult = doTableScan(pInfo->pOperatorDumy); + } + } + if (!pResult) { + return NULL; + } + + if (pResult->info.groupId == pInfo->groupId) { + return pResult; } } - if (!pUpdateBlock) { - taosArrayClear(pInfo->tsArray); - return; + +/* Todo(liuyao) for partition by column + SSDataBlock* pBlock = createOneDataBlock(pResult, true); + blockDataCleanup(pResult); + for (int32_t i = 0; i < pBlock->info.rows; i++) { + uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i); + if (id == pInfo->groupId) { + copyOneRow(pResult, pBlock, i); + } } + return pResult; +*/ +} + +static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { + blockDataCleanup(pUpdateBlock); int32_t size = taosArrayGetSize(pInfo->tsArray); - if (size > 0 && invertible) { - // Todo(liuyao) get from tsdb - // SSDataBlock* p = createOneDataBlock(pBlock, true); - // p->info.type = STREAM_INVERT; - // taosArrayClear(pInfo->tsArray); - // return p; + if (pInfo->tsArrayIndex < size) { SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); blockDataEnsureCapacity(pUpdateBlock, size); - for (int32_t i = 0; i < size; i++) { - TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->tsArray, i); - colDataAppend(pCol, i, (char*)pTs, false); - } - for (int32_t i = 0; i < pUpdateBlock->info.numOfCols; i++) { - if (i == pInfo->primaryTsIndex) { - continue; + ASSERT(pBlock->info.numOfCols == pUpdateBlock->info.numOfCols); + + int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); + pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); + int32_t i = 0; + for ( ; i < size; i++) { + rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); + uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId); + if (pInfo->groupId != id) { + break; } - SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, i); - colDataAppendNNULL(pCol, 0, size); + copyOneRow(pUpdateBlock, pBlock, rowId); } - pUpdateBlock->info.rows = size; + pUpdateBlock->info.rows = i; + pInfo->tsArrayIndex += i; + pUpdateBlock->info.groupId = pInfo->groupId; pUpdateBlock->info.type = STREAM_REPROCESS; blockDataUpdateTsWindow(pUpdateBlock, 0); + } + // all rows have same group id + ASSERT(pInfo->tsArrayIndex >= size); + if (size > 0 && pInfo->tsArrayIndex == size) { taosArrayClear(pInfo->tsArray); } } +static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, + SSDataBlock* pUpdateBlock) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); + TSKEY* ts = (TSKEY*)pColDataInfo->pData; + for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { + if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId])) { + taosArrayPush(pInfo->tsArray, &rowId); + } + } + if (!pUpdateBlock) { + taosArrayClear(pInfo->tsArray); + return; + } + setUpdateData(pInfo, pBlock, pUpdateBlock); + // Todo(liuyao) get from tsdb + // SSDataBlock* p = createOneDataBlock(pBlock, true); + // p->info.type = STREAM_INVERT; + // taosArrayClear(pInfo->tsArray); + // return p; +} + static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -833,7 +900,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; return pInfo->pRes; } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { - blockDataCleanup(pInfo->pRes); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; if (!isStateWindow(pInfo)) { prepareDataScan(pInfo); @@ -848,7 +914,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) { SSDataBlock* pSDB = doDataScan(pInfo); if (pSDB == NULL) { - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes); + if (pInfo->pUpdateRes->info.rows > 0) { + if (!isStateWindow(pInfo)) { + prepareDataScan(pInfo); + } + return pInfo->pUpdateRes; + } else { + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } } else { getUpdateDataBlock(pInfo, true, pSDB, NULL); return pSDB; @@ -941,7 +1015,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { if (rows == 0) { pOperator->status = OP_EXEC_DONE; } else if (pInfo->pUpdateInfo) { - blockDataCleanup(pInfo->pUpdateRes); + pInfo->tsArrayIndex = 0; getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes); if (pInfo->pUpdateRes->info.rows > 0) { if (pInfo->pUpdateRes->info.type == STREAM_REPROCESS) { @@ -1020,7 +1094,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan goto _error; } - pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY)); + pInfo->tsArray = taosArrayInit(4, sizeof(int32_t)); if (pInfo->tsArray == NULL) { goto _error; } @@ -1047,6 +1121,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan pInfo->pOperatorDumy = pTableScanDummy; pInfo->interval = pSTInfo->interval; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; + pInfo->groupId = 0; + pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blocking = false; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0151474008..5a18649cab 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1985,7 +1985,7 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) { blockDataCleanup(pBlock); } -static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { +void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { ASSERT(pDest->info.capacity >= pSource->info.rows); clearUpdateDataBlock(pDest); SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0); @@ -1997,6 +1997,8 @@ static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_ colDataAppendNNULL(pCol, 0, pSource->info.rows); } pDest->info.rows = pSource->info.rows; + pDest->info.groupId = pSource->info.groupId; + pDest->info.type = pSource->info.type; blockDataUpdateTsWindow(pDest, 0); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c6ab9f859d..94208043e7 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2345,14 +2345,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SAPercentileInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); ASSERT(pDBuf->algo == pSBuf->algo); - if (pDBuf->algo == APERCT_ALGO_TDIGEST) { - tdigestMerge(pDBuf->pTDigest, pSBuf->pTDigest); - } else { - SHistogramInfo* pTmp = tHistogramMerge(pDBuf->pHisto, pSBuf->pHisto, MAX_HISTOGRAM_BIN); - memcpy(pDBuf->pHisto, pTmp, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); - pDBuf->pHisto->elems = (SHistBin*)((char*)pDBuf->pHisto + sizeof(SHistogramInfo)); - tHistogramDestroy(&pTmp); - } + apercentileTransferInfo(pSBuf, pDBuf); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 65d73b4cdb..41b80eaaa8 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -104,10 +104,14 @@ static bool osdMayBeOptimized(SLogicNode* pNode) { return false; } if (NULL == pNode->pParent || (QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && - QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent))) { + QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent) && + QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode->pParent))) { return false; } - if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) { + if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) || + (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && + pNode->pParent->pParent && + QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent)) ) { return true; } return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); @@ -217,16 +221,22 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) { } static void setScanWindowInfo(SScanLogicNode* pScan) { - if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pScan->node.pParent)) { - pScan->interval = ((SWindowLogicNode*)pScan->node.pParent)->interval; - pScan->offset = ((SWindowLogicNode*)pScan->node.pParent)->offset; - pScan->sliding = ((SWindowLogicNode*)pScan->node.pParent)->sliding; - pScan->intervalUnit = ((SWindowLogicNode*)pScan->node.pParent)->intervalUnit; - pScan->slidingUnit = ((SWindowLogicNode*)pScan->node.pParent)->slidingUnit; - pScan->triggerType = ((SWindowLogicNode*)pScan->node.pParent)->triggerType; - pScan->watermark = ((SWindowLogicNode*)pScan->node.pParent)->watermark; - pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pScan->node.pParent)->pTspk)->colId; - pScan->filesFactor = ((SWindowLogicNode*)pScan->node.pParent)->filesFactor; + SLogicNode* pParent = pScan->node.pParent; + if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && + pParent->pParent && + QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) { + pParent = pParent->pParent; + } + if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent)) { + pScan->interval = ((SWindowLogicNode*)pParent)->interval; + pScan->offset = ((SWindowLogicNode*)pParent)->offset; + pScan->sliding = ((SWindowLogicNode*)pParent)->sliding; + pScan->intervalUnit = ((SWindowLogicNode*)pParent)->intervalUnit; + pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit; + pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType; + pScan->watermark = ((SWindowLogicNode*)pParent)->watermark; + pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pParent)->pTspk)->colId; + pScan->filesFactor = ((SWindowLogicNode*)pParent)->filesFactor; } } -- GitLab