From bd2caded2f1a5e3060561726aa66c3bf779a18cf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 5 Mar 2021 11:02:58 +0800 Subject: [PATCH] [td-2895] refactor. --- src/inc/tsdb.h | 7 +++ src/query/inc/qUtil.h | 13 +++- src/query/src/qExecutor.c | 90 ++++++++++++++-------------- src/query/src/qUtil.c | 28 +++++++-- src/tsdb/src/tsdbRead.c | 122 ++++++++++++++++++++++++++------------ 5 files changed, 169 insertions(+), 91 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index d82d9a6045..a660b9e13f 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -181,6 +181,11 @@ typedef struct SDataBlockInfo { int32_t tid; } SDataBlockInfo; +typedef struct SFileBlockInfo { + int32_t len; + int32_t numOfRows; +} SFileBlockInfo; + typedef struct { void *pTable; TSKEY lastKey; @@ -341,6 +346,8 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond); void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList); +int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, SArray* pBlockInfo); + /** * get the statistics of repo usage * @param repo. point to the tsdbrepo diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index c5bdf28817..c4a2d900d0 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -55,7 +55,6 @@ static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, in assert(rowOffset >= 0 && pQuery != NULL); int32_t numOfRows = GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery); -// return ((char *)page->data) + offset * numOfRowsPerPage + bytes * realRowId; return ((char *)page->data) + rowOffset + offset * numOfRows; } @@ -71,8 +70,6 @@ void* destroyResultRowPool(SResultRowPool* p); int32_t getNumOfAllocatedResultRows(SResultRowPool* p); int32_t getNumOfUsedResultRows(SResultRowPool* p); -bool isPointInterpoQuery(SQuery *pQuery); - typedef struct { SArray* pResult; // SArray int32_t colId; @@ -82,6 +79,16 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen); SArray* interResFromBinary(const char* data, int32_t len); void freeInterResult(void* param); +typedef struct { + int64_t numOfTables; + SArray *dataBlockInfos; + int64_t firstSeekTimeUs; + int64_t numOfRowsInMemTable; +} STableBlockDist; + +void blockDistInfoToBinary(SBufferWriter* bw, STableBlockDist* pDist); +void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist); + void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 4c9edb5008..3a76df5fa1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -61,13 +61,6 @@ typedef struct { STSCursor cur; } SQueryStatusInfo; -typedef struct { - SArray *dataBlockInfos; - int64_t firstSeekTimeUs; - int64_t numOfRowsInMemTable; - char *result; -} STableBlockDist; - #if 0 static UNUSED_FUNC void *u_malloc (size_t __size) { uint32_t v = rand(); @@ -199,6 +192,7 @@ static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResult static void destroyOperatorInfo(SOperatorInfo* pOperator); void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size); void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); +static bool isPointInterpoQuery(SQuery *pQuery); // setup the output buffer for each operator static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { @@ -3738,7 +3732,6 @@ void queryCostStatis(SQInfo *pQInfo) { static void freeTableBlockDist(STableBlockDist *pTableBlockDist) { if (pTableBlockDist != NULL) { taosArrayDestroy(pTableBlockDist->dataBlockInfos); - free(pTableBlockDist->result); free(pTableBlockDist); } } @@ -3765,12 +3758,13 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) { if (pTableBlockDist == NULL) { return; } +#if 0 int64_t min = INT64_MAX, max = INT64_MIN, avg = 0; SArray* blockInfos= pTableBlockDist->dataBlockInfos; int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos); for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) { - SDataBlockInfo *blockInfo = taosArrayGet(blockInfos, i); - int64_t rows = blockInfo->rows; + SFileBlockInfo *blockInfo = taosArrayGet(blockInfos, i); + int64_t rows = blockInfo->numOfRows; min = MIN(min, rows); max = MAX(max, rows); totalRows += rows; @@ -3778,18 +3772,16 @@ static void generateBlockDistResult(STableBlockDist *pTableBlockDist) { avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0; taosArraySort(blockInfos, compareBlockInfo); - - int sz = sprintf(pTableBlockDist->result, - "summery: \n\t 5th=[%d], 25th=[%d], 50th=[%d],75th=[%d], 95th=[%d], 99th=[%d] \n\t min=[%"PRId64"], max=[%"PRId64"], avg = [%"PRId64"] \n\t totalRows=[%"PRId64"], totalBlocks=[%"PRId64"] \n\t seekHeaderTimeCost=[%"PRId64"(us)] \n\t rowsInMem=[%"PRId64"]", - getPercentileFromSortedArray(blockInfos, 0.05), getPercentileFromSortedArray(blockInfos, 0.25), getPercentileFromSortedArray(blockInfos, 0.50), - getPercentileFromSortedArray(blockInfos, 0.75), getPercentileFromSortedArray(blockInfos, 0.95), getPercentileFromSortedArray(blockInfos, 0.99), - min, max, avg, - totalRows, totalBlocks, - pTableBlockDist->firstSeekTimeUs, - pTableBlockDist->numOfRowsInMemTable); - UNUSED(sz); - return; -} +#endif +// int sz = sprintf(pTableBlockDist->result, +// "summary: \n\t 5th=[%d], 25th=[%d], 50th=[%d],75th=[%d], 95th=[%d], 99th=[%d] \n\t min=[%"PRId64"], max=[%"PRId64"], avg = [%"PRId64"] \n\t totalRows=[%"PRId64"], totalBlocks=[%"PRId64"] \n\t seekHeaderTimeCost=[%"PRId64"(us)] \n\t rowsInMem=[%"PRId64"]", +// getPercentileFromSortedArray(blockInfos, 0.05), getPercentileFromSortedArray(blockInfos, 0.25), getPercentileFromSortedArray(blockInfos, 0.50), +// getPercentileFromSortedArray(blockInfos, 0.75), getPercentileFromSortedArray(blockInfos, 0.95), getPercentileFromSortedArray(blockInfos, 0.99), +// min, max, avg, +// totalRows, totalBlocks, +// pTableBlockDist->firstSeekTimeUs, +// pTableBlockDist->numOfRowsInMemTable); +} //void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { // SQuery *pQuery = pRuntimeEnv->pQuery; // @@ -4412,6 +4404,26 @@ static SSDataBlock* doSeqTableBlocksScan(void* param) { pTableScanInfo->pQueryHandle = pRuntimeEnv->pQueryHandle; pTableScanInfo->externalLoaded = false; } +} + +static SSDataBlock* doBlockInfoScan(void* param) { + SOperatorInfo *pOperator = (SOperatorInfo*)param; + + STableScanInfo *pTableScanInfo = pOperator->info; + + STableBlockDist tableBlockDist = {0}; + tableBlockDist.dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo)); + + tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, tableBlockDist.dataBlockInfos); + tableBlockDist.numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pTableScanInfo->pQueryHandle); + + SSDataBlock* pBlock = &pTableScanInfo->block; + pBlock->info.rows = 1; + pBlock->info.numOfCols = 1; + + + + } SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) { @@ -5390,40 +5402,23 @@ void tableQueryImpl(SQInfo *pQInfo) { pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); } + + void buildTableBlockDistResult(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; -// pQuery->pos = 0; - STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist)); - pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SDataBlockInfo)); - pTableBlockDist->result = (char *)malloc(512); + STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist)); + pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo)); TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; - SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SSchema blockDistSchema = tGetBlockDistColumnSchema(); - int64_t startTime = taosGetTimestampUs(); - while (tsdbNextDataBlockWithoutMerge(pQueryHandle)) { - if (isQueryKilled(pRuntimeEnv->qinfo)) { - freeTableBlockDist(pTableBlockDist); - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - if (pTableBlockDist->firstSeekTimeUs == 0) { - pTableBlockDist->firstSeekTimeUs = taosGetTimestampUs() - startTime; - } - - tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); - taosArrayPush(pTableBlockDist->dataBlockInfos, &blockInfo); - } - if (terrno != TSDB_CODE_SUCCESS) { - freeTableBlockDist(pTableBlockDist); - longjmp(pRuntimeEnv->env, terrno); - } - - pTableBlockDist->numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pQueryHandle); +// int64_t startTime = taosGetTimestampUs(); + tsdbGetFileBlocksDistInfo(pQueryHandle, pTableBlockDist->dataBlockInfos); + pTableBlockDist->numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pQueryHandle); - generateBlockDistResult(pTableBlockDist); +// generateBlockDistResult(pTableBlockDist); int type = -1; assert(pQuery->numOfOutput == 1); @@ -5432,6 +5427,7 @@ void buildTableBlockDistResult(SQInfo *pQInfo) { if (pExprInfo[j].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { type = blockDistSchema.type; } + assert(type == TSDB_DATA_TYPE_BINARY); // STR_WITH_SIZE_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result, (VarDataLenT)strlen(pTableBlockDist->result)); } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 67ecc51892..81f797899b 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -243,6 +243,7 @@ void* destroyResultRowPool(SResultRowPool* p) { return NULL; } +// TODO refactor void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen) { uint32_t numOfGroup = (uint32_t) taosArrayGetSize(pRes); tbufWriteUint32(bw, numOfGroup); @@ -572,10 +573,6 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu incNextGroup(pGroupResInfo); } -// if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) { -// SET_STABLE_QUERY_OVER(pRuntimeEnv); -// } - int64_t elapsedTime = taosGetTimestampUs() - st; qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pRuntimeEnv->qinfo, pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); @@ -583,3 +580,26 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu // pQInfo->summary.firstStageMergeTime += elapsedTime; return TSDB_CODE_SUCCESS; } + +/* + * typedef struct { + int64_t numOfTables; + SArray *dataBlockInfos; + int64_t firstSeekTimeUs; + int64_t numOfRowsInMemTable; +} STableBlockDist; + * + */ +SBufferWriter* blockDistInfoToBinary(STableBlockDist* pDist) { + SBufferWriter bw = tbufInitWriter(NULL, false); + + tbufWriteUint64(&bw, pDist->numOfTables); + tbufWriteUint64(&bw, pDist->numOfRowsInMemTable); + tbufWriteUint64(&bw, taosArrayGetSize(pDist->dataBlockInfos)); + + pDist->dataBlockInfos->pData +} + +void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) { + +} diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c68fb7bce0..38129fac38 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -78,8 +78,8 @@ typedef struct STableCheckInfo { } STableCheckInfo; typedef struct STableBlockInfo { - SBlock* compBlock; - STableCheckInfo* pTableCheckInfo; + SBlock *compBlock; + STableCheckInfo *pTableCheckInfo; } STableBlockInfo; typedef struct SBlockOrderSupporter { @@ -2006,6 +2006,89 @@ static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) { cur->blockCompleted = false; } +int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, SArray* pBlockInfo) { + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle; + + STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb); + + // find the start data block in file + pQueryHandle->locateStart = true; + STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; + int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision); + + tsdbRLockFS(pFileHandle); + tsdbFSIterInit(&pQueryHandle->fileIter, pFileHandle, pQueryHandle->order); + tsdbFSIterSeek(&pQueryHandle->fileIter, fid); + tsdbUnLockFS(pFileHandle); + + int32_t code = TSDB_CODE_SUCCESS; + + int32_t numOfBlocks = 0; + int32_t numOfTables = (int32_t)taosArrayGetSize(pQueryHandle->pTableCheckInfo); + STimeWindow win = TSWINDOW_INITIALIZER; + + while (true) { + numOfBlocks = 0; + tsdbRLockFS(REPO_FS(pQueryHandle->pTsdb)); + + if ((pQueryHandle->pFileGroup = tsdbFSIterNext(&pQueryHandle->fileIter)) == NULL) { + tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); + break; + } + + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fid, &win.skey, &win.ekey); + + // current file are not overlapped with query time window, ignore remain files + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) || + (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) { + tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); + tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle, + pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo); + pQueryHandle->pFileGroup = NULL; + break; + } + + if (tsdbSetAndOpenReadFSet(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) { + tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); + code = terrno; + break; + } + + tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); + + if (tsdbLoadBlockIdx(&pQueryHandle->rhelper) < 0) { + code = terrno; + break; + } + + if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { + break; + } + + tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks, numOfTables, + pQueryHandle->pFileGroup->fid, pQueryHandle->qinfo); + + if (numOfBlocks == 0) { + continue; + } + + SFileBlockInfo info = {0}; + for (int32_t i = 0; i < numOfTables; ++i) { + STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); + + SBlock* pBlock = pCheckInfo->pCompInfo->blocks; + for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) { + info.numOfRows = pBlock[j].numOfRows; + info.len = pBlock[j].len; + + taosArrayPush(pBlockInfo, &info); + } + } + } + + return code; +} + static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) { STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb); SQueryFilePos* cur = &pQueryHandle->cur; @@ -2284,41 +2367,6 @@ bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) { size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); assert(numOfTables > 0); - if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) { - // the last row is cached in buffer, return it directly. - // here note that the pQueryHandle->window must be the TS_INITIALIZER - int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); - SQueryFilePos* cur = &pQueryHandle->cur; - - SDataRow pRow = NULL; - TSKEY key = TSKEY_INITIAL_VAL; - int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; - - if (++pQueryHandle->activeIndex < numOfTables) { - STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); - int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key); - if (ret != TSDB_CODE_SUCCESS) { - return false; - } - - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL); - tfree(pRow); - - // update the last key value - pCheckInfo->lastKey = key + step; - - cur->rows = 1; // only one row - cur->lastKey = key + step; - cur->mixBlock = true; - cur->win.skey = key; - cur->win.ekey = key; - - return true; - } - - return false; - } - if (pQueryHandle->checkFiles) { // check if the query range overlaps with the file data block bool exists = true; -- GitLab