From cded09bf796b7a1b27a336d34876bbc14743653f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 May 2022 23:47:27 +0800 Subject: [PATCH] enh(query): limit the rsp ssdatablock size. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 103 +++++++++++++++++------- source/libs/executor/src/executorimpl.c | 30 ++++--- tests/script/tsim/testsuit.sim | 79 ++++++++++++++++++ 3 files changed, 171 insertions(+), 41 deletions(-) create mode 100644 tests/script/tsim/testsuit.sim diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 06c3b29132..41e591c5b2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -425,6 +425,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* rowLen += pCond->colList[i].bytes; } + // make sure the output SSDataBlock size be less than 2MB. + int32_t TWOMB = 2 * 1024 * 1024; + if (pReadHandle->outputCapacity * rowLen > TWOMB) { + pReadHandle->outputCapacity = TWOMB / rowLen; + } + // allocate buffer in order to load data blocks from file pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); if (pReadHandle->suppInfo.pstatis == NULL) { @@ -1302,20 +1308,22 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || (!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { - if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || - (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { + + bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || + (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey)); + if (cacheDataInFileBlockHole) { // do not load file block into buffer int32_t step = ascScan ? 1 : -1; - TSKEY maxKey = - ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step); + TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step); cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle); pTsdbReadHandle->realNumOfRows = cur->rows; // update the last key value pCheckInfo->lastKey = cur->win.ekey + step; - if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { + + if (!ascScan) { TSWAP(cur->win.skey, cur->win.ekey); } @@ -1334,18 +1342,16 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* /* * no data in cache, only load data from file * during the query processing, data in cache will not be checked anymore. - * * Here the buffer is not enough, so only part of file block can be loaded into memory buffer */ - assert(pTsdbReadHandle->outputCapacity >= binfo.rows); int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo); - if ((cur->pos == 0 && endPos == binfo.rows - 1 && ascScan) || - (cur->pos == (binfo.rows - 1) && endPos == 0 && (!ascScan))) { + bool wholeBlockReturned = ((abs(cur->pos - endPos) + 1) == binfo.rows); + if (wholeBlockReturned) { pTsdbReadHandle->realNumOfRows = binfo.rows; cur->rows = binfo.rows; - cur->win = binfo.window; + cur->win = binfo.window; cur->mixBlock = false; cur->blockCompleted = true; @@ -1356,12 +1362,24 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* cur->lastKey = binfo.window.skey - 1; cur->pos = -1; } - } else { // partially copy to dest buffer + } else { // partially copy to dest buffer + // make sure to only load once + bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows -1 && (!ascScan))); + if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) { + code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos); cur->mixBlock = true; } - assert(cur->blockCompleted); + if (pTsdbReadHandle->outputCapacity >= binfo.rows) { + ASSERT(cur->blockCompleted); + } + if (cur->rows == binfo.rows) { tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s", pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr); @@ -1858,15 +1876,14 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; TSKEY* tsArray = pCols->cols[0].pData; - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; - int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); + bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); - int32_t pos = cur->pos; + int32_t step = ascScan? 1 : -1; int32_t start = cur->pos; int32_t end = endPos; - if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { + if (!ascScan) { TSWAP(start, end); } @@ -1876,11 +1893,11 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa // the time window should always be ascending order: skey <= ekey cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]}; cur->mixBlock = (numOfRows != pBlockInfo->rows); - cur->lastKey = tsArray[endPos] + step; - cur->blockCompleted = true; + cur->lastKey = tsArray[endPos] + step; + cur->blockCompleted = (ascScan? (endPos == pBlockInfo->rows - 1):(endPos == 0)); // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases. - pos = endPos + step; + int32_t pos = endPos + step; updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); doCheckGeneratedBlockRange(pTsdbReadHandle); @@ -1892,20 +1909,44 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) { // NOTE: reverse the order to find the end position in data block int32_t endPos = -1; - int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; + bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); + int32_t order = ascScan? TSDB_ORDER_DESC : TSDB_ORDER_ASC; SQueryFilePos* cur = &pTsdbReadHandle->cur; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; - if (ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) { - endPos = pBlockInfo->rows - 1; - cur->mixBlock = (cur->pos != 0); - } else if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) { - endPos = 0; - cur->mixBlock = (cur->pos != pBlockInfo->rows - 1); + if (pTsdbReadHandle->outputCapacity >= pBlockInfo->rows) { + if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) { + endPos = pBlockInfo->rows - 1; + cur->mixBlock = (cur->pos != 0); + } else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) { + endPos = 0; + cur->mixBlock = (cur->pos != pBlockInfo->rows - 1); + } else { + assert(pCols->numOfRows > 0); + endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order); + cur->mixBlock = true; + } } else { - assert(pCols->numOfRows > 0); - endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order); + if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) { + endPos = MIN(cur->pos + pTsdbReadHandle->outputCapacity - 1, pBlockInfo->rows - 1); + } else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) { + endPos = MAX(cur->pos - pTsdbReadHandle->outputCapacity + 1, 0); + } else { + ASSERT(pCols->numOfRows > 0); + endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order); + + // current data is more than the capacity + int32_t size = abs(cur->pos - endPos) + 1; + if (size > pTsdbReadHandle->outputCapacity) { + int32_t delta = size - pTsdbReadHandle->outputCapacity; + if (ascScan) { + endPos -= delta; + } else { + endPos += delta; + } + } + } cur->mixBlock = true; } @@ -2369,7 +2410,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists); -static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) { +static int32_t getDataBlock(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) { int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -2478,7 +2519,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi cur->fid = pTsdbReadHandle->pFileGroup->fid; STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot]; - return getDataBlockRv(pTsdbReadHandle, pBlockInfo, exists); + return getDataBlock(pTsdbReadHandle, pBlockInfo, exists); } static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) { @@ -2643,7 +2684,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis } else { moveToNextDataBlockInCurrentFile(pTsdbReadHandle); STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot]; - return getDataBlockRv(pTsdbReadHandle, pNext, exists); + return getDataBlock(pTsdbReadHandle, pNext, exists); } } } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 168589148e..8573b5ec10 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3546,11 +3546,12 @@ _error: int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) { // todo add more information about exchange operation - if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { + int32_t type = pOperator->operatorType; + if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN) { *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; - } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = pOperator->info; *order = pTableScanInfo->cond.order; *scanFlag = pTableScanInfo->scanFlag; @@ -3910,6 +3911,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); @@ -4311,23 +4315,29 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + // Make sure the size of SSDataBlock will never exceed the size of 2MB. + int32_t TWOMB = 2 * 1024 * 1024; + if (numOfRows * pResBlock->info.rowSize > TWOMB) { + numOfRows = TWOMB / pResBlock->info.rowSize; + } initResultSizeInfo(pOperator, numOfRows); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo); - pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); - pOperator->name = "ProjectOperator"; + pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); + pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExprInfo; - pOperator->numOfExprs = num; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = num; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, NULL, NULL, NULL); - pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim new file mode 100644 index 0000000000..e32abe4b7f --- /dev/null +++ b/tests/script/tsim/testsuit.sim @@ -0,0 +1,79 @@ +#run user/pass_alter.sim +#run user/basic1.sim +#run user/privilege2.sim +#run user/user_len.sim +#run user/privilege1.sim +#run user/pass_len.sim +#run tstream/basic1.sim +#run tstream/basic0.sim +#run table/basic1.sim +#run trans/create_db.sim +#run stable/alter1.sim +#run stable/vnode3.sim +#run stable/metrics.sim +#run stable/show.sim +#run stable/values.sim +#run stable/dnode3.sim +#run stable/refcount.sim +#run stable/disk.sim +#run db/basic1.sim +#run db/basic3.sim +#run db/basic7.sim +#run db/basic6.sim +#run db/create_all_options.sim +#run db/basic2.sim +#run db/error1.sim +#run db/taosdlog.sim +#run db/alter_option.sim +#run mnode/basic1.sim +#run parser/fourArithmetic-basic.sim +#run parser/groupby-basic.sim +#run snode/basic1.sim +#run query/time_process.sim +#run query/stddev.sim +#run query/interval-offset.sim +#run query/charScalarFunction.sim +#run query/complex_select.sim +#run query/explain.sim +#run query/crash_sql.sim +#run query/diff.sim +#run query/complex_limit.sim +#run query/complex_having.sim +#run query/udf.sim +#run query/complex_group.sim +#run query/interval.sim +#run query/session.sim + +print ========> dead lock failed when 2 rows in outputCapacity +run query/scalarFunction.sim +run query/scalarNull.sim +run query/complex_where.sim +run tmq/basic1.sim +run tmq/basic4.sim +run tmq/basic1Of2Cons.sim +run tmq/prepareBasicEnv-1vgrp.sim +run tmq/topic.sim +run tmq/basic4Of2Cons.sim +run tmq/prepareBasicEnv-4vgrp.sim +run tmq/basic3.sim +run tmq/basic2Of2Cons.sim +run tmq/basic2.sim +run tmq/basic3Of2Cons.sim +run tmq/basic2Of2ConsOverlap.sim +run tmq/clearConsume.sim +run qnode/basic1.sim +run dnode/basic1.sim +run show/basic.sim +run insert/basic1.sim +run insert/basic0.sim +run insert/backquote.sim +run insert/null.sim +run sync/oneReplica1VgElectWithInsert.sim +run sync/threeReplica1VgElect.sim +run sync/oneReplica1VgElect.sim +run sync/insertDataByRunBack.sim +run sync/threeReplica1VgElectWihtInsert.sim +run sma/tsmaCreateInsertData.sim +run sma/rsmaCreateInsertQuery.sim +run valgrind/checkError.sim +run bnode/basic1.sim -- GitLab