diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 1ec84f023ad74cc8c29654bf14f55781485b691d..8e81462257fcd2b43dbbb32419a617ae2e8fa46f 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -1481,7 +1481,7 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx) { // todo opt for null block static void first_function(SQLFunctionCtx *pCtx) { - if (pCtx->order == TSDB_ORDER_DESC) { + if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) { return; } @@ -1550,28 +1550,17 @@ static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t in * to decide if the value is earlier than current intermediate result */ static void first_dist_function(SQLFunctionCtx *pCtx) { - assert(pCtx->size > 0); - - if (pCtx->size == 0) { - return; - } - /* * do not to check data in the following cases: * 1. data block that are not loaded * 2. scan data files in desc order */ - if (pCtx->order == TSDB_ORDER_DESC) { + if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) { return; } int32_t notNullElems = 0; - // data block is discard, not loaded, do not need to check it - if (!pCtx->preAggVals.dataBlockLoaded) { - return; - } - // find the first not null value for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_CHAR_INDEX(pCtx, i); @@ -1655,7 +1644,7 @@ static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) { * least one data in this block that is not null.(TODO opt for this case) */ static void last_function(SQLFunctionCtx *pCtx) { - if (pCtx->order != pCtx->param[0].i64Key) { + if (pCtx->order != pCtx->param[0].i64Key || pCtx->preAggVals.dataBlockLoaded == false) { return; } diff --git a/src/query/inc/qresultBuf.h b/src/query/inc/qresultBuf.h index ad01555c2867b02b8365b50335930b81debd7130..a323d530a27ada5eabf9c4316281ee01d782c090 100644 --- a/src/query/inc/qresultBuf.h +++ b/src/query/inc/qresultBuf.h @@ -22,26 +22,22 @@ extern "C" { #include "os.h" #include "qextbuffer.h" +#include "hash.h" -typedef struct SIDList { - uint32_t alloc; - int32_t size; - int32_t* pData; -} SIDList; +typedef struct SArray* SIDList; typedef struct SDiskbasedResultBuf { - int32_t numOfRowsPerPage; - int32_t numOfPages; - int64_t totalBufSize; - int32_t fd; // data file fd - int32_t allocateId; // allocated page id - int32_t incStep; // minimum allocated pages - char* pBuf; // mmap buffer pointer - char* path; // file path + int32_t numOfRowsPerPage; + int32_t numOfPages; + int64_t totalBufSize; + int32_t fd; // data file fd + int32_t allocateId; // allocated page id + int32_t incStep; // minimum allocated pages + char* pBuf; // mmap buffer pointer + char* path; // file path - uint32_t numOfAllocGroupIds; // number of allocated id list - void* idsTable; // id hash table - SIDList* list; // for each id, there is a page id list + SHashObj* idsTable; // id hash table + SIDList list; // for each id, there is a page id list } SDiskbasedResultBuf; #define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5) @@ -112,7 +108,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle); * @param pList * @return */ -int32_t getLastPageId(SIDList *pList); +int32_t getLastPageId(SIDList pList); #ifdef __cplusplus } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 82cd4f4286bb445964bb15c5ad6ee01f92370732..d1349bd221ba18684097b25ad48b30f57d879c15 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -361,7 +361,7 @@ static bool hasTagValOutput(SQuery* pQuery) { * @return */ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis **pColStatis) { - if (pStatis != NULL) { + if (pStatis != NULL && !TSDB_COL_IS_TAG(pColIndex->flag)) { *pColStatis = &pStatis[pColIndex->colIndex]; assert((*pColStatis)->colId == pColIndex->colId); } else { @@ -472,10 +472,10 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult int32_t pageId = -1; SIDList list = getDataBufPagesIdList(pResultBuf, sid); - if (list.size == 0) { + if (taosArrayGetSize(list) == 0) { pData = getNewDataBuf(pResultBuf, sid, &pageId); } else { - pageId = getLastPageId(&list); + pageId = getLastPageId(list); pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, pageId); if (pData->num >= numOfRowsPerPage) { @@ -2069,7 +2069,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, int32_t colId = pSqlFunc->colInfo.colId; status |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); - if ((status & BLK_DATA_ALL_NEEDED) != 0) { + if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { break; } } @@ -2670,16 +2670,19 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id); int32_t total = 0; - for (int32_t i = 0; i < list.size; ++i) { - tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, list.pData[i]); + int32_t size = taosArrayGetSize(list); + for (int32_t i = 0; i < size; ++i) { + int32_t* pgId = taosArrayGet(list, i); + tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId); total += pData->num; } int32_t rows = total; int32_t offset = 0; - for (int32_t num = 0; num < list.size; ++num) { - tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, list.pData[num]); + for (int32_t j = 0; j < size; ++j) { + int32_t* pgId = taosArrayGet(list, j); + tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId); for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; @@ -2745,7 +2748,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { STableQueryInfo *item = taosArrayGetP(pGroup, i); SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid); - if (list.size > 0 && item->windowResInfo.size > 0) { + if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) { pTableList[numOfTables] = item; numOfTables += 1; } @@ -4208,14 +4211,14 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, pRuntimeEnv->topBotQuery, isSTableQuery); - if (isSTableQuery) { + if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) { int32_t rows = getInitialPageNum(pQInfo); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } - if (pQuery->intervalTime == 0) { + if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { int16_t type = TSDB_DATA_TYPE_NULL; if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags; diff --git a/src/query/src/qresultBuf.c b/src/query/src/qresultBuf.c index fe079694f7edd5e631b736c3150140436c9bca00..6175008ef663123bc631b06ce45bb4094084c27a 100644 --- a/src/query/src/qresultBuf.c +++ b/src/query/src/qresultBuf.c @@ -2,7 +2,6 @@ #include "hash.h" #include "qextbuffer.h" #include "taoserror.h" -#include "tsqlfunction.h" #include "queryLog.h" int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) { @@ -20,11 +19,10 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si // init id hash table pResBuf->idsTable = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); - pResBuf->list = calloc(size, sizeof(SIDList)); - pResBuf->numOfAllocGroupIds = size; + pResBuf->list = taosArrayInit(size, POINTER_BYTES); char path[4096] = {0}; - getTmpfilePath("tsdb_q_buf", path); + getTmpfilePath("tsdb_qbuf", path); pResBuf->path = strdup(path); pResBuf->fd = open(pResBuf->path, O_CREAT | O_RDWR, 0666); @@ -48,7 +46,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si return TSDB_CODE_QRY_OUT_OF_MEMORY; // todo change error code } - qDebug("QInfo:%p create tmp file for output result, %s, %" PRId64 "bytes", handle, pResBuf->path, + qDebug("QInfo:%p create tmp file for output result: %s, %" PRId64 "bytes", handle, pResBuf->path, pResBuf->totalBufSize); return TSDB_CODE_SUCCESS; @@ -90,7 +88,7 @@ static FORCE_INLINE bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) { return (pResultBuf->allocateId == pResultBuf->numOfPages - 1); } -static int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { +static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { assert(pResultBuf != NULL); char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t)); @@ -99,51 +97,20 @@ static int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { } int32_t slot = GET_INT32_VAL(p); - assert(slot >= 0 && slot < pResultBuf->numOfAllocGroupIds); + assert(slot >= 0 && slot < taosHashGetSize(pResultBuf->idsTable)); return slot; } static int32_t addNewGroupId(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot - - if (pResultBuf->numOfAllocGroupIds <= num) { - size_t n = pResultBuf->numOfAllocGroupIds << 1u; - - SIDList* p = (SIDList*)realloc(pResultBuf->list, sizeof(SIDList) * n); - assert(p != NULL); - - memset(&p[pResultBuf->numOfAllocGroupIds], 0, sizeof(SIDList) * pResultBuf->numOfAllocGroupIds); - - pResultBuf->list = p; - pResultBuf->numOfAllocGroupIds = n; - } - taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t)); - return num; -} -static int32_t doRegisterId(SIDList* pList, int32_t id) { - if (pList->size >= pList->alloc) { - int32_t s = 0; - if (pList->alloc == 0) { - s = 4; - assert(pList->pData == NULL); - } else { - s = pList->alloc << 1u; - } - - int32_t* c = realloc(pList->pData, s * sizeof(int32_t)); - assert(c); - - memset(&c[pList->alloc], 0, sizeof(int32_t) * pList->alloc); - - pList->pData = c; - pList->alloc = s; - } + SArray* pa = taosArrayInit(1, sizeof(int32_t)); + taosArrayPush(pResultBuf->list, &pa); - pList->pData[pList->size++] = id; - return 0; + assert(taosArrayGetSize(pResultBuf->list) == taosHashGetSize(pResultBuf->idsTable)); + return num; } static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) { @@ -152,8 +119,8 @@ static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int slot = addNewGroupId(pResultBuf, groupId); } - SIDList* pList = &pResultBuf->list[slot]; - doRegisterId(pList, pageId); + SIDList pList = taosArrayGetP(pResultBuf->list, slot); + taosArrayPush(pList, &pageId); } tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { @@ -178,12 +145,11 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; } SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { - SIDList list = {0}; int32_t slot = getGroupIndex(pResultBuf, groupId); if (slot < 0) { - return list; + return taosArrayInit(1, sizeof(int32_t)); } else { - return pResultBuf->list[slot]; + return taosArrayGetP(pResultBuf->list, slot); } } @@ -202,22 +168,20 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { tfree(pResultBuf->path); - for (int32_t i = 0; i < pResultBuf->numOfAllocGroupIds; ++i) { - SIDList* pList = &pResultBuf->list[i]; - tfree(pList->pData); + size_t size = taosArrayGetSize(pResultBuf->list); + for (int32_t i = 0; i < size; ++i) { + SArray* pa = taosArrayGetP(pResultBuf->list, i); + taosArrayDestroy(pa); } - tfree(pResultBuf->list); + taosArrayDestroy(pResultBuf->list); taosHashCleanup(pResultBuf->idsTable); tfree(pResultBuf); } -int32_t getLastPageId(SIDList *pList) { - if (pList == NULL || pList->size <= 0) { - return -1; - } - - return pList->pData[pList->size - 1]; +int32_t getLastPageId(SIDList pList) { + size_t size = taosArrayGetSize(pList); + return *(int32_t*) taosArrayGet(pList, size - 1); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 2fca34cd0acc23745b3bf93b4b6d8a8c4c5b3aff..fee8784b88183b86c2a3a417e697f802f63dcad8 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -90,6 +90,12 @@ typedef struct SBlockOrderSupporter { int32_t* numOfBlocksPerTable; } SBlockOrderSupporter; +typedef struct SIOCostSummary { + int64_t blockLoadTime; + int64_t statisInfoLoadTime; + int64_t blockMergeTime; +} SIOCostSummary; + typedef struct STsdbQueryHandle { STsdbRepo* pTsdb; SQueryFilePos cur; // current position @@ -116,6 +122,8 @@ typedef struct STsdbQueryHandle { SArray* defaultLoadColumn;// default load column SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ + + SIOCostSummary cost; } STsdbQueryHandle; static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); @@ -622,6 +630,8 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo tfree(data); int64_t et = taosGetTimestampUs() - st; + + pQueryHandle->cost.blockLoadTime += et; tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et); return blockLoaded; @@ -1784,23 +1794,22 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; - SQueryFilePos* cur = &pHandle->cur; - if (cur->mixBlock) { + SQueryFilePos* c = &pHandle->cur; + if (c->mixBlock) { *pBlockStatis = NULL; return TSDB_CODE_SUCCESS; } - assert((cur->slot >= 0 && cur->slot < pHandle->numOfBlocks) || - ((cur->slot == pHandle->numOfBlocks) && (cur->slot == 0))); - - STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot]; - - // file block with subblocks has no statistics data + STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot]; + assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0))); + + // file block with sub-blocks has no statistics data if (pBlockInfo->compBlock->numOfSubBlocks > 1) { *pBlockStatis = NULL; return TSDB_CODE_SUCCESS; } - + + int64_t stime = taosGetTimestampUs(); tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); // todo opt perf @@ -1830,7 +1839,10 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta pHandle->statis[i].max = pBlockInfo->compBlock->keyLast; } } - + + int64_t elapsed = taosGetTimestampUs() - stime; + pHandle->cost.statisInfoLoadTime += elapsed; + return TSDB_CODE_SUCCESS; } @@ -2351,6 +2363,10 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem); tsdbDestroyHelper(&pQueryHandle->rhelper); + + tsdbDebug(":io-cost summary: statis-info time:%"PRId64"us, datablock time:%" PRId64"us ,%p", pQueryHandle->cost.statisInfoLoadTime, + pQueryHandle->cost.blockLoadTime, pQueryHandle->qinfo); + tfree(pQueryHandle); }