From 567d0fe8da077e8653d27c8410673f4a75a1e59c Mon Sep 17 00:00:00 2001
From: shenglian zhou
Date: Tue, 24 Aug 2021 08:33:44 +0800
Subject: [PATCH] [TD-6264]:backport _block_dist() from 2.1 to 2.0 to fix core dump due to tbuf length limitation

---
Review notes (text between "---" and the diffstat is ignored by git-am).

What this patch does: _block_dist() no longer ships one entry per file block.
Blocks are counted into a histogram with one bucket per
TSDB_BLOCK_DIST_STEP_ROWS (16) rows, and total/min/max rows plus the number of
"small" blocks are carried as scalar fields next to it.

Changes made during review, all inside added lines:

* block_func_merge(): the array that blockDistInfoFromBinary() builds into the
  local `info` is destroyed after merging. blockDistInfoFromBinary() frees its
  decompression buffer right after taosArrayFromList(), so the array is assumed
  to own its storage and was leaked once per merged input.
* tsdbGetFileBlocksDistInfo(): the bucket index is clamped to the histogram
  size. doBlockInfoScan() sizes the histogram from tsMaxRowsInFileBlock while
  the row counts come from a repo configured by pCfg->maxRowsPerFileBlock.
* generateBlockDistResult(): a NULL histogram is reported as zero buckets (the
  finalizer already treats NULL as possible), and the compression ratio falls
  back to 1 when rowSize is 0 instead of dividing by zero.
* taosArraySetSize(): doc comment states the capacity precondition.

NOTE(review): line breaks and indentation were reconstructed from a
whitespace-collapsed copy of this patch; verify whitespace and blank context
lines against the tree before applying.

 src/inc/tsdb.h                              |   7 +-
 src/query/src/qAggMain.c                    | 152 +++++++++++++-------
 src/query/src/qExecutor.c                   |  10 +-
 src/query/src/qUtil.c                       |  16 ++-
 src/tsdb/inc/tsdbCommit.h                   |   2 +
 src/tsdb/src/tsdbRead.c                     |  18 ++-
 src/util/inc/tarray.h                       |   8 ++
 src/util/src/tarray.c                       |   5 +
 tests/script/general/compute/block_dist.sim |   6 +-
 9 files changed, 168 insertions(+), 56 deletions(-)

diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h
index 85ee9f0443..da33a2ba73 100644
--- a/src/inc/tsdb.h
+++ b/src/inc/tsdb.h
@@ -211,7 +211,7 @@ typedef struct SDataBlockInfo {
 } SDataBlockInfo;
 
 typedef struct SFileBlockInfo {
-  int32_t numOfRows;
+  int32_t numBlocksOfStep;
 } SFileBlockInfo;
 
 typedef struct {
@@ -225,13 +225,18 @@ typedef struct {
   SHashObj *map; // speedup acquire the tableQueryInfo by table uid
 } STableGroupInfo;
 
+#define TSDB_BLOCK_DIST_STEP_ROWS 16
 typedef struct {
   uint16_t rowSize;
   uint16_t numOfFiles;
   uint32_t numOfTables;
   uint64_t totalSize;
+  uint64_t totalRows;
+  int32_t maxRows;
+  int32_t minRows;
   int32_t firstSeekTimeUs;
   uint32_t numOfRowsInMemTable;
+  uint32_t numOfSmallBlocks;
   SArray *dataBlockInfos;
 } STableBlockDist;
 
diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c
index b3996fb55a..b0f7decfc6 100644
--- a/src/query/src/qAggMain.c
+++ b/src/query/src/qAggMain.c
@@ -4794,7 +4794,7 @@ void blockInfo_func(SQLFunctionCtx* pCtx) {
 
   int32_t len = *(int32_t*) pCtx->pInput;
   blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist);
-  pDist->rowSize = (int16_t) pCtx->param[0].i64;
+  pDist->rowSize = (uint16_t)pCtx->param[0].i64;
 
   memcpy(pCtx->pOutput, pCtx->pInput, sizeof(int32_t) + len);
 
@@ -4802,51 +4802,87 @@ void blockInfo_func(SQLFunctionCtx* pCtx) {
   pResInfo->hasResult = DATA_SET_FLAG;
 }
 
-static void mergeTableBlockDist(STableBlockDist* pDist, const STableBlockDist* pSrc) {
+static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockDist* pSrc) {
+  STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
   assert(pDist != NULL && pSrc != NULL);
+
   pDist->numOfTables += pSrc->numOfTables;
   pDist->numOfRowsInMemTable += pSrc->numOfRowsInMemTable;
+  pDist->numOfSmallBlocks += pSrc->numOfSmallBlocks;
   pDist->numOfFiles += pSrc->numOfFiles;
   pDist->totalSize += pSrc->totalSize;
+  pDist->totalRows += pSrc->totalRows;
+
+  if (pResInfo->hasResult == DATA_SET_FLAG) {
+    pDist->maxRows = MAX(pDist->maxRows, pSrc->maxRows);
+    pDist->minRows = MIN(pDist->minRows, pSrc->minRows);
+  } else {
+    pDist->maxRows = pSrc->maxRows;
+    pDist->minRows = pSrc->minRows;
 
-  if (pDist->dataBlockInfos == NULL) {
-    pDist->dataBlockInfos = taosArrayInit(4, sizeof(SFileBlockInfo));
+    int32_t maxSteps = TSDB_MAX_MAX_ROW_FBLOCK/TSDB_BLOCK_DIST_STEP_ROWS;
+    if (TSDB_MAX_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
+      ++maxSteps;
+    }
+    pDist->dataBlockInfos = taosArrayInit(maxSteps, sizeof(SFileBlockInfo));
+    taosArraySetSize(pDist->dataBlockInfos, maxSteps);
   }
 
-  taosArrayPushBatch(pDist->dataBlockInfos, pSrc->dataBlockInfos->pData, (int32_t) taosArrayGetSize(pSrc->dataBlockInfos));
+  size_t steps = taosArrayGetSize(pSrc->dataBlockInfos);
+  for (int32_t i = 0; i < steps; ++i) {
+    int32_t srcNumBlocks = ((SFileBlockInfo*)taosArrayGet(pSrc->dataBlockInfos, i))->numBlocksOfStep;
+    SFileBlockInfo* blockInfo = (SFileBlockInfo*)taosArrayGet(pDist->dataBlockInfos, i);
+    blockInfo->numBlocksOfStep += srcNumBlocks;
+  }
 }
 
 void block_func_merge(SQLFunctionCtx* pCtx) {
-  SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
-
-  STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
   STableBlockDist info = {0};
-
   int32_t len = *(int32_t*) pCtx->pInput;
   blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info);
 
-  mergeTableBlockDist(pDist, &info);
+  SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
+  mergeTableBlockDist(pResInfo, &info);
+  // the counts were added into the result buffer above; release the decoded copy
+  taosArrayDestroy(info.dataBlockInfos);
+
+  pResInfo->numOfRes = 1;
+  pResInfo->hasResult = DATA_SET_FLAG;
 }
 
-static int32_t doGetPercentile(const SArray* pArray, double rate) {
-  int32_t len = (int32_t)taosArrayGetSize(pArray);
-  if (len <= 0) {
-    return 0;
+void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents,
+                    double* percents, int32_t* percentiles) {
+  if (totalBlocks == 0) {
+    for (int32_t i = 0; i < numOfPercents; ++i) {
+      percentiles[i] = 0;
+    }
+    return;
   }
 
-  assert(rate >= 0 && rate <= 1.0);
-  int idx = (int32_t)((len - 1) * rate);
+  SArray *blocksInfos = pTableBlockDist->dataBlockInfos;
+  size_t numSteps = taosArrayGetSize(blocksInfos);
+  size_t cumulativeBlocks = 0;
 
-  return ((SFileBlockInfo *)(taosArrayGet(pArray, idx)))->numOfRows;
-}
+  int percentIndex = 0;
+  for (int32_t indexStep = 0; indexStep < numSteps; ++indexStep) {
+    int32_t numStepBlocks = ((SFileBlockInfo *)taosArrayGet(blocksInfos, indexStep))->numBlocksOfStep;
+    if (numStepBlocks == 0) continue;
+    cumulativeBlocks += numStepBlocks;
 
-static int compareBlockInfo(const void *pLeft, const void *pRight) {
-  int32_t left = ((SFileBlockInfo *)pLeft)->numOfRows;
-  int32_t right = ((SFileBlockInfo *)pRight)->numOfRows;
+    while (percentIndex < numOfPercents) {
+      double blockRank = totalBlocks * percents[percentIndex];
+      if (blockRank <= cumulativeBlocks) {
+        percentiles[percentIndex] = indexStep;
+        ++percentIndex;
+      } else {
+        break;
+      }
+    }
+  }
 
-  if (left > right) return 1;
-  if (left < right) return -1;
-  return 0;
+  for (int32_t i = 0; i < numOfPercents; ++i) {
+    percentiles[i] = (percentiles[i]+1) * TSDB_BLOCK_DIST_STEP_ROWS - TSDB_BLOCK_DIST_STEP_ROWS/2;
+  }
 }
 
 void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
@@ -4854,41 +4890,58 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
     return;
   }
 
-  int64_t min = INT64_MAX, max = INT64_MIN, avg = 0;
-  SArray* blockInfos= pTableBlockDist->dataBlockInfos;
-  int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos);
+  SArray* blockInfos = pTableBlockDist->dataBlockInfos;
+  uint64_t totalRows = pTableBlockDist->totalRows;
+  // a result that never merged any input has no histogram; report it as zero buckets
+  size_t numSteps = (blockInfos != NULL) ? taosArrayGetSize(blockInfos) : 0;
+  int64_t totalBlocks = 0;
+  int64_t min = -1, max = -1, avg = 0;
 
-  for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) {
+  for (int32_t i = 0; i < numSteps; i++) {
     SFileBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
-    int64_t rows = blockInfo->numOfRows;
-
-    min = MIN(min, rows);
-    max = MAX(max, rows);
-    totalRows += rows;
+    int64_t blocks = blockInfo->numBlocksOfStep;
+    totalBlocks += blocks;
   }
 
   avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0;
-  taosArraySort(blockInfos, compareBlockInfo);
+  min = totalBlocks > 0 ? pTableBlockDist->minRows : 0;
+  max = totalBlocks > 0 ? pTableBlockDist->maxRows : 0;
+
+  double stdDev = 0;
+  if (totalBlocks > 0) {
+    double variance = 0;
+    for (int32_t i = 0; i < numSteps; i++) {
+      SFileBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
+      int64_t blocks = blockInfo->numBlocksOfStep;
+      int32_t rows = (i * TSDB_BLOCK_DIST_STEP_ROWS + TSDB_BLOCK_DIST_STEP_ROWS / 2);
+      variance += blocks * (rows - avg) * (rows - avg);
+    }
+    variance = variance / totalBlocks;
+    stdDev = sqrt(variance);
+  }
+
+  double percents[] = {0.05, 0.10, 0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90, 0.95, 0.99};
+  int32_t percentiles[] = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
+  assert(sizeof(percents)/sizeof(double) == sizeof(percentiles)/sizeof(int32_t));
+  getPercentiles(pTableBlockDist, totalBlocks, sizeof(percents)/sizeof(double), percents, percentiles);
 
   uint64_t totalLen = pTableBlockDist->totalSize;
   int32_t rowSize = pTableBlockDist->rowSize;
-  
+  int32_t smallBlocks = pTableBlockDist->numOfSmallBlocks;
+  // both factors of the divisor must be non-zero, otherwise report a neutral ratio of 1
+  double compRatio = (totalRows > 0 && rowSize > 0) ? ((double)(totalLen)/(rowSize*totalRows)) : 1;
   int sz = sprintf(result + VARSTR_HEADER_SIZE,
                    "summary: \n\t "
                    "5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t "
                    "60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t "
                    "Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t "
-                   "Rows=[%"PRId64"], Blocks=[%"PRId64"], Size=[%.3f(Kb)] Comp=[%.2f%%]\n\t "
-                   "RowsInMem=[%d] \n\t SeekHeaderTime=[%d(us)]",
-                   doGetPercentile(blockInfos, 0.05), doGetPercentile(blockInfos, 0.10),
-                   doGetPercentile(blockInfos, 0.20), doGetPercentile(blockInfos, 0.30),
-                   doGetPercentile(blockInfos, 0.40), doGetPercentile(blockInfos, 0.50),
-                   doGetPercentile(blockInfos, 0.60), doGetPercentile(blockInfos, 0.70),
-                   doGetPercentile(blockInfos, 0.80), doGetPercentile(blockInfos, 0.90),
-                   doGetPercentile(blockInfos, 0.95), doGetPercentile(blockInfos, 0.99),
-                   min, max, avg, 0.0,
-                   totalRows, totalBlocks, totalLen/1024.0, (double)(totalLen*100.0)/(rowSize*totalRows),
-                   pTableBlockDist->numOfRowsInMemTable, pTableBlockDist->firstSeekTimeUs);
+                   "Rows=[%"PRIu64"], Blocks=[%"PRId64"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.2f]\n\t "
+                   "RowsInMem=[%d] \n\t",
+                   percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5],
+                   percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
+                   min, max, avg, stdDev,
+                   totalRows, totalBlocks, smallBlocks, totalLen/1024.0, compRatio,
+                   pTableBlockDist->numOfRowsInMemTable);
   varDataSetLen(result, sz);
   UNUSED(sz);
 }
@@ -4897,9 +4950,14 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) {
   SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
   STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
 
-  pDist->rowSize = (int16_t)pCtx->param[0].i64;
+  pDist->rowSize = (uint16_t)pCtx->param[0].i64;
   generateBlockDistResult(pDist, pCtx->pOutput);
 
+  if (pDist->dataBlockInfos != NULL) {
+    taosArrayDestroy(pDist->dataBlockInfos);
+    pDist->dataBlockInfos = NULL;
+  }
+
   // cannot set the numOfIteratedElems again since it is set during previous iteration
   pResInfo->numOfRes = 1;
   pResInfo->hasResult = DATA_SET_FLAG;
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index f96d33f060..b2e63707cb 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -4546,7 +4546,15 @@ static SSDataBlock* doBlockInfoScan(void* param) {
 
   STableBlockDist tableBlockDist = {0};
   tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
-  tableBlockDist.dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo));
+
+  int32_t numRowSteps = tsMaxRowsInFileBlock / TSDB_BLOCK_DIST_STEP_ROWS;
+  if (tsMaxRowsInFileBlock % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
+    ++numRowSteps;
+  }
+  tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
+  taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps);
+  tableBlockDist.maxRows = INT_MIN;
+  tableBlockDist.minRows = INT_MAX;
 
   tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, &tableBlockDist);
   tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pQueryHandle);
diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c
index aa793add84..ed069c6a91 100644
--- a/src/query/src/qUtil.c
+++ b/src/query/src/qUtil.c
@@ -581,7 +581,11 @@ void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
   tbufWriteUint32(bw, pDist->numOfTables);
   tbufWriteUint16(bw, pDist->numOfFiles);
   tbufWriteUint64(bw, pDist->totalSize);
+  tbufWriteUint64(bw, pDist->totalRows);
+  tbufWriteInt32(bw, pDist->maxRows);
+  tbufWriteInt32(bw, pDist->minRows);
   tbufWriteUint32(bw, pDist->numOfRowsInMemTable);
+  tbufWriteUint32(bw, pDist->numOfSmallBlocks);
   tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
 
   // compress the binary string
@@ -616,13 +620,17 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
   pDist->numOfTables = tbufReadUint32(&br);
   pDist->numOfFiles = tbufReadUint16(&br);
   pDist->totalSize = tbufReadUint64(&br);
+  pDist->totalRows = tbufReadUint64(&br);
+  pDist->maxRows = tbufReadInt32(&br);
+  pDist->minRows = tbufReadInt32(&br);
   pDist->numOfRowsInMemTable = tbufReadUint32(&br);
-  int64_t numOfBlocks = tbufReadUint64(&br);
+  pDist->numOfSmallBlocks = tbufReadUint32(&br);
+  int64_t numSteps = tbufReadUint64(&br);
 
   bool comp = tbufReadUint8(&br);
   uint32_t compLen = tbufReadUint32(&br);
 
-  size_t originalLen = (size_t) (numOfBlocks*sizeof(SFileBlockInfo));
+  size_t originalLen = (size_t) (numSteps *sizeof(SFileBlockInfo));
 
   char* outputBuf = NULL;
   if (comp) {
@@ -633,12 +641,12 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
 
     int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf,
                                             (int32_t)originalLen , ONE_STAGE_COMP, NULL, 0);
-    assert(orignalLen == numOfBlocks*sizeof(SFileBlockInfo));
+    assert(orignalLen == numSteps *sizeof(SFileBlockInfo));
   } else {
     outputBuf = (char*) tbufReadBinary(&br, &originalLen);
   }
 
-  pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t) numOfBlocks, sizeof(SFileBlockInfo));
+  pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t)numSteps, sizeof(SFileBlockInfo));
   if (comp) {
     tfree(outputBuf);
   }
diff --git a/src/tsdb/inc/tsdbCommit.h b/src/tsdb/inc/tsdbCommit.h
index 5e740081d1..f1c3a91746 100644
--- a/src/tsdb/inc/tsdbCommit.h
+++ b/src/tsdb/inc/tsdbCommit.h
@@ -29,6 +29,8 @@ typedef struct {
   int64_t size;
 } SKVRecord;
 
+#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
+
 void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
 int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
 void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c
index be686fcffd..1babb01530 100644
--- a/src/tsdb/src/tsdbRead.c
+++ b/src/tsdb/src/tsdbRead.c
@@ -2104,6 +2104,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
   STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
 
   pTableBlockInfo->totalSize = 0;
+  pTableBlockInfo->totalRows = 0;
 
   STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb);
   // find the start data block in file
@@ -2121,6 +2122,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
   int32_t code = TSDB_CODE_SUCCESS;
   int32_t numOfBlocks = 0;
   int32_t numOfTables = (int32_t)taosArrayGetSize(pQueryHandle->pTableCheckInfo);
+  int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
   STimeWindow win = TSWINDOW_INITIALIZER;
 
   while (true) {
@@ -2136,7 +2138,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
 
     // current file are not overlapped with query time window, ignore remain files
     if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
-      (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
+        (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
       tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
       tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, 0x%"PRIx64, pQueryHandle,
                 pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qId);
@@ -2177,7 +2179,19 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
         pTableBlockInfo->totalSize += pBlock[j].len;
 
         int32_t numOfRows = pBlock[j].numOfRows;
-        taosArrayPush(pTableBlockInfo->dataBlockInfos, &numOfRows);
+        pTableBlockInfo->totalRows += numOfRows;
+        if (numOfRows > pTableBlockInfo->maxRows) pTableBlockInfo->maxRows = numOfRows;
+        if (numOfRows < pTableBlockInfo->minRows) pTableBlockInfo->minRows = numOfRows;
+        if (numOfRows < defaultRows) pTableBlockInfo->numOfSmallBlocks+=1;
+        // the histogram is sized by the caller and may hold fewer buckets than this block needs:
+        // count an oversized block in the last bucket rather than indexing past the array
+        int32_t numSteps = (int32_t)taosArrayGetSize(pTableBlockInfo->dataBlockInfos);
+        int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
+        if (stepIndex >= numSteps) stepIndex = numSteps - 1;
+        if (stepIndex >= 0) {
+          SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
+          blockInfo->numBlocksOfStep++;
+        }
       }
     }
   }
diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h
index f2e268c2d4..677be0051e 100644
--- a/src/util/inc/tarray.h
+++ b/src/util/inc/tarray.h
@@ -98,6 +98,14 @@ void* taosArrayGetLast(const SArray* pArray);
  */
 size_t taosArrayGetSize(const SArray* pArray);
 
+/**
+ * set the element count of the array; only the size field changes, size must not exceed the capacity (asserted)
+ * @param pArray
+ * @param size size of the array
+ * @return
+ */
+void taosArraySetSize(SArray* pArray, size_t size);
+
 /**
  * insert data into array
  * @param pArray
diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c
index 4dde5dbba2..6fb4029a84 100644
--- a/src/util/src/tarray.c
+++ b/src/util/src/tarray.c
@@ -110,6 +110,11 @@ void* taosArrayGetLast(const SArray* pArray) {
 
 size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; }
 
+void taosArraySetSize(SArray* pArray, size_t size) {
+  assert(size <= pArray->capacity);
+  pArray->size = size;
+}
+
 void* taosArrayInsert(SArray* pArray, size_t index, void* pData) {
   if (pArray == NULL || pData == NULL) {
     return NULL;
diff --git a/tests/script/general/compute/block_dist.sim b/tests/script/general/compute/block_dist.sim
index 51cf903654..5343c1db28 100644
--- a/tests/script/general/compute/block_dist.sim
+++ b/tests/script/general/compute/block_dist.sim
@@ -84,6 +84,10 @@ if $rows != 1 then
   return -1
 endi
 
+print ============== TD-5998
+sql_error select _block_dist() from (select * from $nt)
+sql_error select _block_dist() from (select * from $mt)
+
 print =============== clear
 sql drop database $db
 sql show databases
@@ -91,4 +95,4 @@ if $rows != 0 then
   return -1
 endi
 
-system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
+system sh/exec.sh -n dnode1 -s stop -x SIGINT
-- 
GitLab