diff --git a/src/client/inc/tscSecondaryMerge.h b/src/client/inc/tscSecondaryMerge.h index d423b32356bb90247dc69424f6638d4dfca34f99..08d995c9f3d789a82f5b8fa1331d8653a017181b 100644 --- a/src/client/inc/tscSecondaryMerge.h +++ b/src/client/inc/tscSecondaryMerge.h @@ -90,14 +90,14 @@ typedef struct SSubqueryState { } SSubqueryState; typedef struct SRetrieveSupport { - tExtMemBuffer ** pExtMemBuffer; // for build loser tree + tExtMemBuffer ** pExtMemBuffer; // for build loser tree tOrderDescriptor *pOrderDescriptor; - SColumnModel * pFinalColModel; // colModel for final result + SColumnModel * pFinalColModel; // colModel for final result SSubqueryState * pState; int32_t subqueryIndex; // index of current vnode in vnode list SSqlObj * pParentSqlObj; - tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to - uint32_t numOfRetry; // record the number of retry times + tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to + uint32_t numOfRetry; // record the number of retry times pthread_mutex_t queryMutex; } SRetrieveSupport; diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 73d7f289511a1dc3a068dbe0a31b524bf0bc9901..ca57030539a451d4967ace21fc688a1e44ffea76 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -387,18 +387,19 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data, int32_t numOfRows, int32_t orderType) { - if (pPage->numOfElems + numOfRows <= pDesc->pColumnModel->capacity) { - tColModelAppend(pDesc->pColumnModel, pPage, data, 0, numOfRows, numOfRows); + SColumnModel *pModel = pDesc->pColumnModel; + + if (pPage->numOfElems + numOfRows <= pModel->capacity) { + tColModelAppend(pModel, pPage, data, 0, numOfRows, numOfRows); return 0; } - SColumnModel *pModel = pDesc->pColumnModel; - - int32_t numOfRemainEntries = pDesc->pColumnModel->capacity - pPage->numOfElems; + // current buffer is overflow, flush data to extensive buffer + int32_t numOfRemainEntries = pModel->capacity - pPage->numOfElems; tColModelAppend(pModel, pPage, data, 0, numOfRemainEntries, numOfRows); - /* current buffer is full, need to flushed to disk */ - assert(pPage->numOfElems == pDesc->pColumnModel->capacity); + // current buffer is full, need to flushed to disk + assert(pPage->numOfElems == pModel->capacity); int32_t ret = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType); if (ret != 0) { return -1; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 41c3a8729530e3bf9f99993a045435d3cc817b61..35a0358f2f3de0ef2d4bd532fde0b3b2d07a901c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1202,6 +1202,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); return; } + int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data, pRes->numOfRows, pQueryInfo->groupbyExpr.orderType); if (ret < 0) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e9395d7dde46e478ae5058e6842579eb33079b57..22027ab54a28bee78bba67ffa753136b7d99d0fe 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -676,7 +676,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, STableDataBlocks** dataBlocks) { *dataBlocks = NULL; - STableDataBlocks** t1 = (STableDataBlocks**)taosGetDataFromHash(pHashList, (const char*)&id, sizeof(id)); + STableDataBlocks** t1 = (STableDataBlocks**)taosGetDataFromHashTable(pHashList, (const char*)&id, sizeof(id)); if (t1 != NULL) { *dataBlocks = *t1; } diff --git a/src/inc/hash.h b/src/inc/hash.h index 54a43fb6ebc3f692c642e1270a948016b4244194..14c73fb37015042f2be0dd31be89ba59374ce098 100644 --- a/src/inc/hash.h +++ b/src/inc/hash.h @@ -43,10 +43,10 @@ typedef struct SHashEntry { typedef struct HashObj { SHashEntry **hashList; - uint32_t capacity; - int size; - _hash_fn_t hashFp; - bool multithreadSafe; // enable lock + uint32_t capacity; // number of slots + int size; // number of elements in hash table + _hash_fn_t hashFp; // hash function + bool multithreadSafe; // enable lock or not #if defined LINUX pthread_rwlock_t lock; @@ -57,11 +57,13 @@ typedef struct HashObj { } HashObj; void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe); +void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen); int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void *data, uint32_t size); -void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen); +int32_t taosNumElemsInHashTable(HashObj *pObj); + +char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen); -char *taosGetDataFromHash(HashObj *pObj, const char *key, uint32_t keyLen); void taosCleanUpHashTable(void *handle); diff --git a/src/inc/tresultBuf.h b/src/inc/tresultBuf.h new file mode 100644 index 0000000000000000000000000000000000000000..571417439b2220248df7fa8a3beb933d4e08c7db --- /dev/null +++ b/src/inc/tresultBuf.h @@ -0,0 +1,112 @@ +#ifndef TDENGINE_VNODEQUERYUTIL_H +#define TDENGINE_VNODEQUERYUTIL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" +#include "textbuffer.h" + +typedef struct SIDList { + uint32_t alloc; + int32_t size; + int32_t* pData; +} SIDList; + +typedef struct SQueryResultBuf { + int32_t numOfRowsPerPage; + int32_t numOfPages; + int64_t totalBufSize; + int32_t fd; // data file fd + int32_t ifd; // index file fd + int32_t allocateId; // allocated page id + int32_t incStep; // minimum allocated pages + char* pBuf; // mmap buffer pointer + char* path; // file path + + char* ipath; // index file path + int32_t* pIndexData; // index file data + + char* internBuf; // intern buf + int32_t internfd; // intern fd + char* internpath; + + uint32_t numOfAllocGroupIds; // number of allocated id list + void* idsTable; // id hash table + SIDList* list; // for each id, there is a page id list +} SQueryResultBuf; + +/** + * create disk-based result buffer + * @param pResultBuf + * @param size + * @param rowSize + * @return + */ +int32_t createResultBuf(SQueryResultBuf** pResultBuf, int32_t size, int32_t rowSize); + +/** + * + * @param pResultBuf + * @param groupId + * @param pageId + * @return + */ +tFilePage* getNewDataBuf(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t* pageId); + +/** + * + * @param pResultBuf + * @return + */ +int32_t getNumOfRowsPerPage(SQueryResultBuf* pResultBuf); + +/** + * + * @param pResultBuf + * @param groupId + * @return + */ +SIDList getDataBufPagesIdList(SQueryResultBuf* pResultBuf, int32_t groupId); + +/** + * get the specified buffer page by id + * @param pResultBuf + * @param id + * @return + */ +tFilePage* getResultBufferPageById(SQueryResultBuf* pResultBuf, int32_t id); + +/** + * get the total buffer size in the format of disk file + * @param pResultBuf + * @return + */ +int32_t getResBufSize(SQueryResultBuf* pResultBuf); + +/** + * get the number of groups in the result buffer + * @param pResultBuf + * @return + */ +int32_t getNumOfResultBufGroupId(SQueryResultBuf* pResultBuf); + +/** + * destroy result buffer + * @param pResultBuf + */ +void destroyResultBuf(SQueryResultBuf* pResultBuf); + +/** + * + * @param pList + * @return + */ +int32_t getLastPageId(SIDList *pList); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_VNODEQUERYUTIL_H diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index dc86f924aa1655291cb9ee97c2b23d8386ae46a7..2b89cae317d82540c2ff23d5b1b45d46509a27e5 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_VNODEQUERYUTIL_H -#define TDENGINE_VNODEQUERYUTIL_H +#ifndef TDENGINE_VNODEQUERYIMPL_H +#define TDENGINE_VNODEQUERYIMPL_H #ifdef __cplusplus extern "C" { @@ -120,7 +120,7 @@ typedef enum { typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order); static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) { - return *(SMeterObj**)taosGetDataFromHash(hashHandle, (const char*) &sid, sizeof(sid)); + return *(SMeterObj**)taosGetDataFromHashTable(hashHandle, (const char*) &sid, sizeof(sid)); } bool isQueryKilled(SQuery* pQuery); @@ -209,7 +209,7 @@ int32_t vnodeGetHeaderFile(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex); * @param ekey * @return */ -SMeterQueryInfo* createMeterQueryInfo(SQuery* pQuery, TSKEY skey, TSKEY ekey); +SMeterQueryInfo* createMeterQueryInfo(SQuery* pQuery, int32_t sid, TSKEY skey, TSKEY ekey); /** * Destroy meter query info @@ -224,7 +224,7 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols); * @param skey * @param ekey */ -void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey); +void changeMeterQueryInfoForSuppleQuery(SQueryResultBuf* pResultBuf, SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey); /** * add the new allocated disk page to meter query info @@ -289,4 +289,4 @@ void closeAllSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo); } #endif -#endif // TDENGINE_VNODEQUERYUTIL_H +#endif // TDENGINE_VNODEQUERYIMPL_H diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index ee88e5e36681a2a9d22faa5f757fa3c4f1f57a59..321b837c771fc4e2aa815ef102a2a76346e3d845 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "os.h" +#include "tresultBuf.h" #include "tinterpolation.h" #include "vnodeTagMgmt.h" @@ -182,8 +183,8 @@ typedef struct SMeterQueryInfo { int64_t skey; int64_t ekey; int32_t numOfRes; - uint32_t numOfPages; - uint32_t numOfAlloc; +// uint32_t numOfPages; +// uint32_t numOfAlloc; int32_t reverseIndex; // reversed output indicator, start from (numOfRes-1) int16_t reverseFillRes; // denote if reverse fill the results in supplementary scan required or not int16_t queryRangeSet; // denote if the query range is set, only available for interval query @@ -191,7 +192,9 @@ typedef struct SMeterQueryInfo { int64_t tag; STSCursor cur; SResultInfo* resultInfo; - uint32_t* pageList; +// uint32_t* pageList; +// SIDList pageIdList; + int32_t sid; // for retrieve the page id list } SMeterQueryInfo; typedef struct SMeterDataInfo { @@ -235,15 +238,15 @@ typedef struct SMeterQuerySupportObj { */ int32_t meterIdx; - int32_t meterOutputFd; - int32_t lastPageId; - int32_t numOfPages; +// int32_t meterOutputFd; +// int32_t lastPageId; +// int32_t numOfPages; int32_t numOfGroupResultPages; int32_t groupResultSize; - - char* meterOutputMMapBuf; - int64_t bufSize; - char extBufFile[256]; // external file name + SQueryResultBuf* pResultBuf; +// char* meterOutputMMapBuf; +// int64_t bufSize; +// char extBufFile[256]; // external file name SMeterDataInfo* pMeterDataInfo; diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index bf055a8b63d7bbb9362b513b2423540a558e7cfb..cec3949eb891002256ffd38e97e2542158fcbffd 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -56,7 +56,7 @@ static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn, bool loadData); static int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, - SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterHeadDataInfo, + SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterDataInfo, int32_t start, int32_t end); static TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv *pRuntimeEnv, SCacheBlock *pBlock, int32_t index); @@ -1576,7 +1576,7 @@ static SOutputRes *doSetSlidingWindowFromKey(SSlidingWindowInfo *pSlidingWindowI SWindowStatus **pStatus) { int32_t p = -1; - int32_t *p1 = (int32_t *)taosGetDataFromHash(pSlidingWindowInfo->hashList, pData, bytes); + int32_t *p1 = (int32_t *)taosGetDataFromHashTable(pSlidingWindowInfo->hashList, pData, bytes); if (p1 != NULL) { p = *p1; @@ -1706,7 +1706,7 @@ void clearCompletedSlidingWindows(SSlidingWindowInfo *pSlidingWindowInfo, int32_ for(int32_t k = 0; k < pSlidingWindowInfo->size; ++k) { SWindowStatus* pStatus = &pSlidingWindowInfo->pStatus[k]; - int32_t *p = (int32_t*) taosGetDataFromHash(pSlidingWindowInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE); + int32_t *p = (int32_t*) taosGetDataFromHashTable(pSlidingWindowInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE); int32_t v = *p; v = (v - i); @@ -2653,6 +2653,7 @@ static int64_t getOldestKey(int32_t numOfFiles, int64_t fileId, SVnodeCfg *pCfg) } bool isQueryKilled(SQuery *pQuery) { + return false; SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); /* @@ -4439,14 +4440,14 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { } } - if (FD_VALID(pSupporter->meterOutputFd)) { - assert(pSupporter->meterOutputMMapBuf != NULL); - dTrace("QInfo:%p disk-based output buffer during query:%" PRId64 " bytes", pQInfo, pSupporter->bufSize); - munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); - tclose(pSupporter->meterOutputFd); +// if (FD_VALID(pSupporter->meterOutputFd)) { +// assert(pSupporter->meterOutputMMapBuf != NULL); +// dTrace("QInfo:%p disk-based output buffer during query:%" PRId64 " bytes", pQInfo, pSupporter->bufSize); +// munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); +// tclose(pSupporter->meterOutputFd); - unlink(pSupporter->extBufFile); - } +// unlink(pSupporter->extBufFile); +// } tSidSetDestroy(&pSupporter->pSidSet); @@ -4542,33 +4543,39 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) } if (pQuery->nAggTimeInterval != 0) { - getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile); - pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666); +// getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile); +// pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666); - if (!FD_VALID(pSupporter->meterOutputFd)) { - dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); - return TSDB_CODE_SERV_OUT_OF_MEMORY; - } +// if (!FD_VALID(pSupporter->meterOutputFd)) { +// dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); +// return TSDB_CODE_SERV_OUT_OF_MEMORY; +// } - pSupporter->numOfPages = pSupporter->numOfMeters; +// pSupporter->numOfPages = pSupporter->numOfMeters; - ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); +// ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); +// if (ret != TSDB_CODE_SUCCESS) { +// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, +// strerror(errno)); +// return TSDB_CODE_SERV_NO_DISKSPACE; +// } +// + // one page for each table at least + ret = createResultBuf(&pSupporter->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize); if (ret != TSDB_CODE_SUCCESS) { - dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, - strerror(errno)); - return TSDB_CODE_SERV_NO_DISKSPACE; + return ret; } - + pRuntimeEnv->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; - pSupporter->lastPageId = -1; - pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; - - pSupporter->meterOutputMMapBuf = - mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); - if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { - dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); - return TSDB_CODE_SERV_OUT_OF_MEMORY; - } +// pSupporter->lastPageId = -1; +// pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; + +// pSupporter->meterOutputMMapBuf = +// mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); +// if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { +// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); +// return TSDB_CODE_SERV_OUT_OF_MEMORY; +// } } // metric query do not invoke interpolation, it will be done at the second-stage merge @@ -5261,19 +5268,9 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf } } -static tFilePage *getFilePage(SMeterQuerySupportObj *pSupporter, int32_t pageId) { - assert(pageId <= pSupporter->lastPageId && pageId >= 0); - return (tFilePage *)(pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * pageId); -} - -static tFilePage *getMeterDataPage(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo, int32_t pageId) { - SMeterQueryInfo *pMeterQueryInfo = pMeterDataInfo->pMeterQInfo; - if (pageId >= pMeterQueryInfo->numOfPages) { - return NULL; - } - - int32_t realId = pMeterQueryInfo->pageList[pageId]; - return getFilePage(pSupporter, realId); +static tFilePage *getMeterDataPage(SQueryResultBuf* pResultBuf, SMeterQueryInfo *pMeterQueryInfo, int32_t index) { + SIDList pList = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); + return getResultBufferPageById(pResultBuf, pList.pData[index]); } typedef struct Position { @@ -5289,7 +5286,9 @@ typedef struct SCompSupporter { int64_t getCurrentTimestamp(SCompSupporter *pSupportor, int32_t meterIdx) { Position * pPos = &pSupportor->pPosition[meterIdx]; - tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter, pSupportor->pMeterDataInfo[meterIdx], pPos->pageIdx); + tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter->pResultBuf, + pSupportor->pMeterDataInfo[meterIdx]->pMeterQInfo, pPos->pageIdx); + return *(int64_t *)(pPage->data + TSDB_KEYSIZE * pPos->rowIdx); } @@ -5297,10 +5296,11 @@ int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param int32_t left = *(int32_t *)pLeft; int32_t right = *(int32_t *)pRight; - SCompSupporter *supportor = (SCompSupporter *)param; - - Position leftPos = supportor->pPosition[left]; - Position rightPos = supportor->pPosition[right]; + SCompSupporter *supporter = (SCompSupporter *)param; + SQueryResultBuf* pResultBuf = supporter->pSupporter->pResultBuf; + + Position leftPos = supporter->pPosition[left]; + Position rightPos = supporter->pPosition[right]; /* left source is exhausted */ if (leftPos.pageIdx == -1 && leftPos.rowIdx == -1) { @@ -5312,10 +5312,10 @@ int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param return -1; } - tFilePage *pPageLeft = getMeterDataPage(supportor->pSupporter, supportor->pMeterDataInfo[left], leftPos.pageIdx); + tFilePage *pPageLeft = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[left]->pMeterQInfo, leftPos.pageIdx); int64_t leftTimestamp = *(int64_t *)(pPageLeft->data + TSDB_KEYSIZE * leftPos.rowIdx); - tFilePage *pPageRight = getMeterDataPage(supportor->pSupporter, supportor->pMeterDataInfo[right], rightPos.pageIdx); + tFilePage *pPageRight = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[right]->pMeterQInfo, rightPos.pageIdx); int64_t rightTimestamp = *(int64_t *)(pPageRight->data + TSDB_KEYSIZE * rightPos.rowIdx); if (leftTimestamp == rightTimestamp) { @@ -5375,23 +5375,47 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) } SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; - char * pStart = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * (pSupporter->lastPageId + 1) + - pSupporter->groupResultSize * pSupporter->offset; - - uint64_t numOfElem = ((tFilePage *)pStart)->numOfElems; - assert(numOfElem <= pQuery->pointsToRead); - - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memcpy(pQuery->sdata[i], pStart, pRuntimeEnv->pCtx[i].outputBytes * numOfElem + sizeof(tFilePage)); - pStart += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage); + SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; + + SIDList list = getDataBufPagesIdList(pResultBuf, 200000 + pSupporter->offset); + +// char * pStart = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * (pSupporter->lastPageId + 1) + +// pSupporter->groupResultSize * pSupporter->offset; +// uint64_t numOfElem = ((tFilePage *)pStart)->numOfElems; +// assert(numOfElem <= pQuery->pointsToRead); + int32_t total = 0; + for(int32_t i = 0; i < list.size; ++i) { + tFilePage* pData = getResultBufferPageById(pResultBuf, list.pData[i]); + total += pData->numOfElems; + } + + pQuery->sdata[0]->len = total; + + int32_t offset = 0; + for(int32_t num = 0; num < list.size; ++num) { + tFilePage* pData = getResultBufferPageById(pResultBuf, list.pData[num]); + + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; + + memcpy(pQuery->sdata[i]->data + pRuntimeEnv->offset[i] * total + offset * bytes, + pData->data + pRuntimeEnv->offset[i] * pData->numOfElems, + bytes * pData->numOfElems); +// pStart += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage); + } + + offset += pData->numOfElems; +// pQuery->sdata[0]->len += pData->numOfElems; } - pQuery->pointsRead += numOfElem; + assert(pQuery->pointsRead == 0); + + pQuery->pointsRead += pQuery->sdata[0]->len; pSupporter->offset += 1; } int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv, - SMeterDataInfo *pMeterHeadDataInfo, int32_t start, int32_t end) { + SMeterDataInfo *pMeterDataInfo, int32_t start, int32_t end) { // calculate the maximum required space if (pSupporter->groupResultSize == 0) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { @@ -5405,8 +5429,11 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery int32_t numOfMeters = 0; for (int32_t i = start; i < end; ++i) { - if (pMeterHeadDataInfo[i].pMeterQInfo->numOfPages > 0 && pMeterHeadDataInfo[i].pMeterQInfo->numOfRes > 0) { - pValidMeter[numOfMeters] = &pMeterHeadDataInfo[i]; + int32_t sid = pMeterDataInfo[i].pMeterQInfo->sid; + SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, sid); + + if (list.size > 0 && pMeterDataInfo[i].pMeterQInfo->numOfRes > 0) { + pValidMeter[numOfMeters] = &pMeterDataInfo[i]; // set the merge start position: page:0, index:0 posArray[numOfMeters].pageIdx = 0; posArray[numOfMeters++].rowIdx = 0; @@ -5435,10 +5462,12 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery while (1) { int32_t pos = pTree->pNode[0].index; Position * position = &cs.pPosition[pos]; - tFilePage *pPage = getMeterDataPage(cs.pSupporter, pValidMeter[pos], position->pageIdx); + SQueryResultBuf* pResultBuf = cs.pSupporter->pResultBuf; + tFilePage *pPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx); int64_t ts = getCurrentTimestamp(&cs, pos); - if (ts == lastTimestamp) { // merge with the last one + printf("++++++++++++++++++++++%d, %d, %lld\n", position->pageIdx, pos, ts); + if (ts == lastTimestamp) {// merge with the last one doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, true); } else { // copy data to disk buffer @@ -5450,7 +5479,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery resetMergeResultBuf(pQuery, pCtx); } - pPage = getMeterDataPage(cs.pSupporter, pValidMeter[pos], position->pageIdx); + pPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx); if (pPage->numOfElems <= 0) { // current source data page is empty // do nothing } else { @@ -5466,17 +5495,19 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery cs.pPosition[pos].pageIdx += 1; // try next page // check if current page is empty or not. if it is empty, ignore it and try next - if (cs.pPosition[pos].pageIdx <= cs.pMeterDataInfo[pos]->pMeterQInfo->numOfPages - 1) { - tFilePage *newPage = getMeterDataPage(cs.pSupporter, pValidMeter[pos], position->pageIdx); + SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid); + if (cs.pPosition[pos].pageIdx <= list.size - 1) { + tFilePage *newPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx); + + // if current source data page is null, it must be the last page of source output page if (newPage->numOfElems <= 0) { - // if current source data page is null, it must be the last page of source output page cs.pPosition[pos].pageIdx += 1; - assert(cs.pPosition[pos].pageIdx >= cs.pMeterDataInfo[pos]->pMeterQInfo->numOfPages - 1); + assert(cs.pPosition[pos].pageIdx >= list.size - 1); } } // the following code must be executed if current source pages are exhausted - if (cs.pPosition[pos].pageIdx >= cs.pMeterDataInfo[pos]->pMeterQInfo->numOfPages) { + if (cs.pPosition[pos].pageIdx >= list.size) { cs.pPosition[pos].pageIdx = -1; cs.pPosition[pos].rowIdx = -1; @@ -5494,8 +5525,8 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery if (buffer[0]->numOfElems != 0) { // there are data in buffer if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { - dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), - pSupporter->extBufFile); +// dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), +// pSupporter->extBufFile); tfree(pTree); tfree(pValidMeter); tfree(posArray); @@ -5520,69 +5551,100 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery return pSupporter->numOfGroupResultPages; } -static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { - assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize); - - SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); - - int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); - pSupporter->numOfPages = numOfPages; - - /* - * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may - * be insufficient - */ - ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); - if (ret != 0) { - dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, - strerror(errno)); - pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE; - pQInfo->killed = 1; - - return pQInfo->code; - } - - pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; - pSupporter->meterOutputMMapBuf = - mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); - - if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { - dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); - pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; - pQInfo->killed = 1; - - return pQInfo->code; - } - - return TSDB_CODE_SUCCESS; -} +//static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { +// assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize); +// +// SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); +// +// int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); +// pSupporter->numOfPages = numOfPages; +// +// /* +// * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may +// * be insufficient +// */ +// ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); +// if (ret != 0) { +// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, +// strerror(errno)); +// pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE; +// pQInfo->killed = 1; +// +// return pQInfo->code; +// } +// +// pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; +// pSupporter->meterOutputMMapBuf = +// mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); +// +// if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { +// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); +// pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; +// pQInfo->killed = 1; +// +// return pQInfo->code; +// } +// +// return TSDB_CODE_SUCCESS; +//} int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { - int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1; - int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE + - pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1); + printf("group===============%d\n", pSupporter->numOfGroupResultPages); +// int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1; +// int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE + +// pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1); +// +// int32_t requiredPages = pSupporter->numOfPages; +// if (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) { +// while (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) { +// requiredPages += pSupporter->numOfMeters; +// } +// +// if (extendDiskBuf(pQuery, pSupporter, requiredPages) != TSDB_CODE_SUCCESS) { +// return -1; +// } +// } - int32_t requiredPages = pSupporter->numOfPages; - if (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) { - while (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) { - requiredPages += pSupporter->numOfMeters; - } +// char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages + +// pSupporter->groupResultSize * pSupporter->numOfGroupResultPages; - if (extendDiskBuf(pQuery, pSupporter, requiredPages) != TSDB_CODE_SUCCESS) { - return -1; + SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; + int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage))/ pQuery->rowSize; + + // the base value for group result, since the maximum number of table for each vnode will not exceed 100,000. + int32_t base = 200000; + int32_t pageId = -1; + + int32_t remain = pQuery->sdata[0]->len; + int32_t offset = 0; + + while(remain > 0) { + int32_t r = remain; + if (r > capacity) { + r = capacity; } + + tFilePage* buf = getNewDataBuf(pResultBuf, base + pSupporter->numOfGroupResultPages, &pageId); + + //pagewise copy to dest buffer + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; + buf->numOfElems = r; + + memcpy(buf->data + pRuntimeEnv->offset[i] * buf->numOfElems, ((char*)pQuery->sdata[i]->data) + offset * bytes, buf->numOfElems * bytes); + } + + offset += r; + remain -= r; } - - char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages + - pSupporter->groupResultSize * pSupporter->numOfGroupResultPages; - - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - int32_t size = pRuntimeEnv->pCtx[i].outputBytes * pQuery->sdata[0]->len + sizeof(tFilePage); - memcpy(lastPosition, pQuery->sdata[i], size); - - lastPosition += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage); - } + +// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { +// int32_t size = pRuntimeEnv->pCtx[i].outputBytes * pQuery->sdata[0]->len + sizeof(tFilePage); +// memcpy(lastPosition, pQuery->sdata[i], size); +// +// lastPosition += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage); +// } pSupporter->numOfGroupResultPages += 1; return TSDB_CODE_SUCCESS; @@ -6308,7 +6370,7 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet pOneMeterDataInfo->offsetInHeaderFile = (uint64_t)compHeader->compInfoOffset; if (pOneMeterDataInfo->pMeterQInfo == NULL) { - pOneMeterDataInfo->pMeterQInfo = createMeterQueryInfo(pQuery, pSupporter->rawSKey, pSupporter->rawEKey); + pOneMeterDataInfo->pMeterQInfo = createMeterQueryInfo(pQuery, pMeterObj->sid, pSupporter->rawSKey, pSupporter->rawEKey); } (*pReqMeterDataInfo)[*numOfMeters] = pOneMeterDataInfo; @@ -6327,18 +6389,18 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet return TSDB_CODE_SUCCESS; } -SMeterQueryInfo *createMeterQueryInfo(SQuery *pQuery, TSKEY skey, TSKEY ekey) { +SMeterQueryInfo *createMeterQueryInfo(SQuery *pQuery, int32_t sid, TSKEY skey, TSKEY ekey) { SMeterQueryInfo *pMeterQueryInfo = calloc(1, sizeof(SMeterQueryInfo)); pMeterQueryInfo->skey = skey; pMeterQueryInfo->ekey = ekey; pMeterQueryInfo->lastKey = skey; - pMeterQueryInfo->numOfPages = 0; - pMeterQueryInfo->numOfAlloc = INIT_ALLOCATE_DISK_PAGES; - pMeterQueryInfo->pageList = calloc(pMeterQueryInfo->numOfAlloc, sizeof(uint32_t)); +// pMeterQueryInfo->numOfPages = 0; +// pMeterQueryInfo->numOfAlloc = INIT_ALLOCATE_DISK_PAGES; +// pMeterQueryInfo->pageList = calloc(pMeterQueryInfo->numOfAlloc, sizeof(uint32_t)); pMeterQueryInfo->lastResRows = 0; - + pMeterQueryInfo->sid = sid; pMeterQueryInfo->cur.vnodeIndex = -1; pMeterQueryInfo->resultInfo = calloc((size_t)pQuery->numOfOutputCols, sizeof(SResultInfo)); @@ -6355,7 +6417,7 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols) return; } - free(pMeterQueryInfo->pageList); +// free(pMeterQueryInfo->pageList); for (int32_t i = 0; i < numOfCols; ++i) { tfree(pMeterQueryInfo->resultInfo[i].interResultBuf); } @@ -6364,7 +6426,8 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols) free(pMeterQueryInfo); } -void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey) { +void changeMeterQueryInfoForSuppleQuery(SQueryResultBuf* pResultBuf, SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, + TSKEY ekey) { if (pMeterQueryInfo == NULL) { return; } @@ -6378,7 +6441,9 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY pMeterQueryInfo->cur.vnodeIndex = -1; // previous does not generate any results - if (pMeterQueryInfo->numOfPages == 0) { + SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); + + if (list.size == 0) { pMeterQueryInfo->reverseFillRes = 0; } else { pMeterQueryInfo->reverseIndex = pMeterQueryInfo->numOfRes; @@ -6386,34 +6451,34 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY } } -static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { - if (pSupporter->lastPageId == pSupporter->numOfPages - 1) { - if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) { - return NULL; - } - } - - *pageId = (++pSupporter->lastPageId); - return getFilePage(pSupporter, *pageId); -} - -tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo, - SMeterQuerySupportObj *pSupporter) { - uint32_t pageId = 0; - - tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId); - if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results - return NULL; - } - - if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) { - pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1; - pMeterQueryInfo->pageList = realloc(pMeterQueryInfo->pageList, sizeof(uint32_t) * pMeterQueryInfo->numOfAlloc); - } - - pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages++] = pageId; - return pPage; -} +//static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { +// if (pSupporter->lastPageId == pSupporter->numOfPages - 1) { +// if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) { +// return NULL; +// } +// } +// +// *pageId = (++pSupporter->lastPageId); +// return getFilePage(pSupporter, *pageId); +//} + +//tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo, +// SMeterQuerySupportObj *pSupporter) { +// uint32_t pageId = 0; +// +// tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId); +// if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results +// return NULL; +// } +// +// if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) { +// pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1; +// pMeterQueryInfo->pageList = realloc(pMeterQueryInfo->pageList, sizeof(uint32_t) * pMeterQueryInfo->numOfAlloc); +// } +// +// pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages++] = pageId; +// return pPage; +//} void saveIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -6869,12 +6934,16 @@ void setCtxOutputPointerForSupplementScan(SMeterQuerySupportObj *pSupporter, SMe tFilePage *pData = NULL; int32_t i = 0; + SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; + // find the position for this output result - for (; i < pMeterQueryInfo->numOfPages; ++i) { - pData = getFilePage(pSupporter, pMeterQueryInfo->pageList[i]); + SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); + for (; i < list.size; ++i) { + pData = getResultBufferPageById(pResultBuf, list.pData[i]); if (index <= pData->numOfElems) { break; } + index -= pData->numOfElems; } @@ -6936,17 +7005,23 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; tFilePage * pData = NULL; - SQuery *pQuery = pRuntimeEnv->pQuery; + SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; +// SQuery *pQuery = pRuntimeEnv->pQuery; // in the first scan, new space needed for results - if (pMeterQueryInfo->numOfPages == 0) { - pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter); + SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); + int32_t pageId = -1; + if (list.size == 0) { +// pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter); + pData = getNewDataBuf(pResultBuf, pMeterQueryInfo->sid, &pageId); } else { - int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1]; - pData = getFilePage(pSupporter, lastPageId); +// int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1]; + pData = getResultBufferPageById(pResultBuf, getLastPageId(&list)); +// pData = getFilePage(pSupporter, lastPageId); + printf("==============%d\n", pData->numOfElems); if (pData->numOfElems >= pRuntimeEnv->numOfRowsPerPage) { - pData = addDataPageForMeterQueryInfo(pRuntimeEnv->pQuery, pMeterQueryInfo, pSupporter); + pData = getNewDataBuf(pResultBuf, pMeterQueryInfo->sid, &pageId); if (pData != NULL) { assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer } @@ -7246,7 +7321,9 @@ static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SM assert(completed); - if (pQuery->ekey == pSupporter->rawEKey) { + // while the interval time window is less than the time range gap between two points, nextKey may be greater than + // pSupporter->rawEKey + if (pQuery->ekey == pSupporter->rawEKey || nextKey > pSupporter->rawEKey) { /* whole query completed, save result and abort */ saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows); @@ -7495,8 +7572,12 @@ bool onDemandLoadDatablock(SQuery *pQuery, int16_t queryRangeSet) { static void validateResultBuf(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pSupporter->runtimeEnv.pQuery; + SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; - tFilePage *newOutput = getFilePage(pSupporter, pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1]); + SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); + int32_t id = getLastPageId(&list); + + tFilePage* newOutput = getResultBufferPageById(pResultBuf, id); for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { assert(pRuntimeEnv->pCtx[i].aOutputBuf - newOutput->data < DEFAULT_INTERN_BUF_SIZE); } @@ -7549,12 +7630,15 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue pMeterQueryInfo->reverseIndex -= 1; setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo); } else { - int32_t pageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1]; - tFilePage *pData = getFilePage(pSupporter, pageId); - + SIDList list = getDataBufPagesIdList(pSupporter->pResultBuf, pMeterQueryInfo->sid); + + int32_t pageId = getLastPageId(&list); + tFilePage* pData = getResultBufferPageById(pSupporter->pResultBuf, pageId); + // in handling records occuring around '1970-01-01', the aligned start timestamp may be 0. TSKEY ts = *(TSKEY *)getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, 0); - + printf("-----------------------%d\n", pData->numOfElems); + SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; qTrace("QInfo:%p vid:%d sid:%d id:%s, save results, ts:%" PRId64 ", total:%d", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, ts, pMeterQueryInfo->numOfRes + 1); @@ -7592,7 +7676,7 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue return TSDB_CODE_SUCCESS; } -static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) { +static int32_t getNumOfSubset(SMeterQuerySupportObj *pSupporter) { SQuery *pQuery = pSupporter->runtimeEnv.pQuery; int32_t totalSubset = 0; @@ -7615,7 +7699,7 @@ static int32_t doCopyFromGroupBuf(SMeterQuerySupportObj *pSupporter, SOutputRes dTrace("QInfo:%p start to copy data to dest buf", GET_QINFO_ADDR(pSupporter->runtimeEnv.pQuery)); - int32_t totalSubset = getSubsetNumber(pSupporter); + int32_t totalSubset = getNumOfSubset(pSupporter); if (orderType == TSQL_SO_ASC) { startIdx = pSupporter->subgroupIdx; @@ -7908,18 +7992,10 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage ret -= pQuery->limit.offset; // todo !!!!there exactly number of interpo is not valid. // todo refactor move to the beginning of buffer - // if (QUERY_IS_ASC_QUERY(pQuery)) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].resBytes * pQuery->limit.offset, ret * pQuery->pSelectExpr[i].resBytes); } - // } else { - // for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - // memmove(pDst[i]->data + (pQuery->pointsToRead - ret) * pQuery->pSelectExpr[i].resBytes, - // pDst[i]->data + (pQuery->pointsToRead - ret - pQuery->limit.offset) * - // pQuery->pSelectExpr[i].resBytes, ret * pQuery->pSelectExpr[i].resBytes); - // } - // } pQuery->limit.offset = 0; return ret; } else { @@ -7940,7 +8016,11 @@ void vnodePrintQueryStatistics(SMeterQuerySupportObj *pSupporter) { SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQueryCostSummary *pSummary = &pRuntimeEnv->summary; - pSummary->tmpBufferInDisk = pSupporter->bufSize; + if (pSupporter->pResultBuf == NULL) { + pSummary->tmpBufferInDisk = 0; + } else { + pSummary->tmpBufferInDisk = getResBufSize(pSupporter->pResultBuf); + } dTrace("QInfo:%p statis: comp blocks:%d, size:%d Bytes, elapsed time:%.2f ms", pQInfo, pSummary->readCompInfo, pSummary->totalCompInfoSize, pSummary->loadCompInfoUs / 1000.0); diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 6fe8b2fa775b57799b150f57ba3f0eb71e7ab3f6..9bcbc772699f4482ef2ad4cde1329db594346a3d 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -132,7 +132,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { pRuntimeEnv->pMeterObj = pMeterObj; if (pMeterInfo[k].pMeterQInfo == NULL) { - pMeterInfo[k].pMeterQInfo = createMeterQueryInfo(pQuery, pSupporter->rawSKey, pSupporter->rawEKey); + pMeterInfo[k].pMeterQInfo = createMeterQueryInfo(pQuery, pMeterObj->sid, pSupporter->rawSKey, pSupporter->rawEKey); } if (pMeterInfo[k].pMeterObj == NULL) { // no data in disk for this meter, set its pointer @@ -858,7 +858,9 @@ static void doOrderedScan(SQInfo *pQInfo) { static void setupMeterQueryInfoForSupplementQuery(SMeterQuerySupportObj *pSupporter) { for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo; - changeMeterQueryInfoForSuppleQuery(pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey); + SQueryResultBuf* pResultBuf = pSupporter->pResultBuf; + + changeMeterQueryInfoForSuppleQuery(pResultBuf, pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey); } } diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 506829368812325d4c77492ea9411d9952944034..99643c92cc68d2964db90fdbf259b37c174bb5f9 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -346,6 +346,14 @@ static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) { // pTrace("key:%s %p add to hash table", key, pNode); } +int32_t taosNumElemsInHashTable(HashObj *pObj) { + if (pObj == NULL) { + return 0; + } + + return pObj->size; +} + /** * add data node into hash table * @param pObj hash object @@ -392,7 +400,7 @@ int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void return 0; } -char *taosGetDataFromHash(HashObj *pObj, const char *key, uint32_t keyLen) { +char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) { if (pObj->multithreadSafe) { __rd_lock(&pObj->lock); } diff --git a/src/util/src/textbuffer.c b/src/util/src/textbuffer.c index 53c273676177904470769afbd814fad3f7fe4735..860de6782be97ce83032cf60d3d2f303af18c795 100644 --- a/src/util/src/textbuffer.c +++ b/src/util/src/textbuffer.c @@ -135,11 +135,11 @@ static bool allocFlushoutInfoEntries(SFileInfo *pFileMeta) { } static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) { + /* + * the in-mem buffer is full. + * To flush data to disk to accommodate more data + */ if (pMemBuffer->numOfInMemPages > 0 && pMemBuffer->numOfInMemPages == pMemBuffer->inMemCapacity) { - /* - * the in-mem buffer is full. - * To flush data to disk to accommodate more data - */ if (!tExtMemBufferFlush(pMemBuffer)) { return false; } @@ -147,7 +147,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) { /* * We do not recycle the file page structure. And in flush data operations, all - * filepage that are full of data are destroyed after data being flushed to disk. + * file page that are full of data are destroyed after data being flushed to disk. * * The memory buffer pages may be recycle in order to avoid unnecessary memory * allocation later. @@ -189,9 +189,9 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow pLast = pMemBuffer->pTail; } - if (pLast->item.numOfElems + numOfRows <= pMemBuffer->numOfElemsPerPage) { - // enough space for records + if (pLast->item.numOfElems + numOfRows <= pMemBuffer->numOfElemsPerPage) { // enough space for records tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRows, numOfRows); + pMemBuffer->numOfElemsInBuffer += numOfRows; pMemBuffer->numOfTotalElems += numOfRows; } else { @@ -205,8 +205,7 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow int32_t remain = numOfRows - numOfRemainEntries; while (remain > 0) { - if (!tExtMemBufferAlloc(pMemBuffer)) { - // failed to allocate memory buffer + if (!tExtMemBufferAlloc(pMemBuffer)) { // failed to allocate memory buffer return -1; } @@ -252,7 +251,7 @@ static bool tExtMemBufferUpdateFlushoutInfo(tExtMemBuffer *pMemBuffer) { pFlushoutInfo->numOfPages = pMemBuffer->numOfInMemPages; pFileMeta->flushoutData.nLength += 1; } else { - // always update the first flushout array in single_flush_model + // always update the first flush out array in single_flush_model pFileMeta->flushoutData.nLength = 1; tFlushoutInfo *pFlushoutInfo = &pFileMeta->flushoutData.pFlushoutInfo[0]; pFlushoutInfo->numOfPages += pMemBuffer->numOfInMemPages; @@ -320,9 +319,7 @@ void tExtMemBufferClear(tExtMemBuffer *pMemBuffer) { return; } - /* - * release all data in memory buffer - */ + //release all data in memory buffer tFilePagesItem *first = pMemBuffer->pHead; while (first != NULL) { tFilePagesItem *ptmp = first; @@ -335,6 +332,7 @@ void tExtMemBufferClear(tExtMemBuffer *pMemBuffer) { pMemBuffer->numOfElemsInBuffer = 0; pMemBuffer->numOfInMemPages = 0; + pMemBuffer->pHead = NULL; pMemBuffer->pTail = NULL; @@ -586,7 +584,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f); int32_t colIdx = pDescriptor->orderIdx.pData[0]; - tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].type, "before", startx, midx, endx); + tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx); #endif if (compareFn(pDescriptor, numOfRows, midIdx, start, data) == 1) { @@ -607,7 +605,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f); startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f); endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f); - tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].type, "after", startx, midx, endx); + tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "after", startx, midx, endx); #endif } @@ -661,15 +659,15 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta } #ifdef _DEBUG_VIEW - printf("before sort:\n"); - tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); +// printf("before sort:\n"); +// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); #endif int32_t s = start, e = end; median(pDescriptor, numOfRows, start, end, data, compareFn); #ifdef _DEBUG_VIEW - printf("%s called: %d\n", __FUNCTION__, qsort_call++); +// printf("%s called: %d\n", __FUNCTION__, qsort_call++); #endif UNUSED(qsort_call); @@ -695,7 +693,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta } #ifdef _DEBUG_VIEW - tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); +// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); #endif while (s < e) { @@ -714,7 +712,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta swap(pDescriptor->pColumnModel, numOfRows, s, data, e); } #ifdef _DEBUG_VIEW - tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); +// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); #endif } @@ -731,7 +729,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta rightx += (end - end_same); #ifdef _DEBUG_VIEW - tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); +// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); #endif } @@ -748,7 +746,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta leftx -= (start_same - start); #ifdef _DEBUG_VIEW - tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); +// tRowModelDisplay(pDescriptor, numOfRows, data, end - start + 1); #endif } diff --git a/src/util/src/tresultBuf.c b/src/util/src/tresultBuf.c new file mode 100644 index 0000000000000000000000000000000000000000..57aca682fc1d13faf665b98cb76d338876edce0c --- /dev/null +++ b/src/util/src/tresultBuf.c @@ -0,0 +1,224 @@ +#include "hash.h" +#include "taoserror.h" +#include "textbuffer.h" +#include "tlog.h" +#include "tsqlfunction.h" +#include "tresultBuf.h" + +#define DEFAULT_INTERN_BUF_SIZE 16384L + +int32_t createResultBuf(SQueryResultBuf** pResultBuf, int32_t size, int32_t rowSize) { + SQueryResultBuf* pResBuf = calloc(1, sizeof(SQueryResultBuf)); + pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize; + pResBuf->numOfPages = size; + + pResBuf->totalBufSize = pResBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE; + pResBuf->incStep = 4; + + // init id hash table + pResBuf->idsTable = taosInitHashTable(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); + pResBuf->list = calloc(size, sizeof(SIDList)); + pResBuf->numOfAllocGroupIds = size; + + char path[4096] = {0}; + getTmpfilePath("tsdb_q_buf", path); + pResBuf->path = strdup(path); + + pResBuf->fd = open(pResBuf->path, O_CREAT | O_RDWR, 0666); + + memset(path, 0, tListLen(path)); + getTmpfilePath("tsdb_q_i", path); + pResBuf->internpath = strdup(path); + + pResBuf->internfd = open(pResBuf->internpath, O_CREAT|O_RDWR, 0666); + + if (!FD_VALID(pResBuf->fd)) { + pError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno)); + return TSDB_CODE_CLI_NO_DISKSPACE; + } + + int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE); + if (ret != TSDB_CODE_SUCCESS) { + pError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno)); + return TSDB_CODE_CLI_NO_DISKSPACE; + } + + pResBuf->pBuf = mmap(NULL, pResBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResBuf->fd, 0); + if (pResBuf->pBuf == MAP_FAILED) { + pError("QInfo:%p failed to map temp file: %s. %s", pResBuf->path, strerror(errno)); + return TSDB_CODE_CLI_OUT_OF_MEMORY; // todo change error code + } + + pTrace("create tmp file for output result, %s, " PRId64 "bytes", pResBuf->path, pResBuf->totalBufSize); + *pResultBuf = pResBuf; + return TSDB_CODE_SUCCESS; +} + +tFilePage* getResultBufferPageById(SQueryResultBuf* pResultBuf, int32_t id) { + assert(id < pResultBuf->numOfPages && id >= 0); + + return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_SIZE * id); +} + +int32_t getNumOfResultBufGroupId(SQueryResultBuf* pResultBuf) { return taosNumElemsInHashTable(pResultBuf->idsTable); } + +int32_t getResBufSize(SQueryResultBuf* pResultBuf) { return pResultBuf->totalBufSize; } + +static int32_t extendDiskFileSize(SQueryResultBuf* pResultBuf, int32_t numOfPages) { + assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE == pResultBuf->totalBufSize); + + int32_t ret = munmap(pResultBuf->pBuf, pResultBuf->totalBufSize); + pResultBuf->numOfPages += numOfPages; + + /* + * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may + * be insufficient + */ + ret = ftruncate(pResultBuf->fd, pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE); + if (ret != 0) { + // dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, + // strerror(errno)); + return -TSDB_CODE_SERV_NO_DISKSPACE; + } + + pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE; + pResultBuf->pBuf = mmap(NULL, pResultBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0); + + if (pResultBuf->pBuf == MAP_FAILED) { + // dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); + return -TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +static bool noMoreAvailablePages(SQueryResultBuf* pResultBuf) { + return (pResultBuf->allocateId == pResultBuf->numOfPages - 1); +} + +static int32_t getGroupIndex(SQueryResultBuf* pResultBuf, int32_t groupId) { + assert(pResultBuf != NULL); + + char* p = taosGetDataFromHashTable(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t)); + if (p == NULL) { // it is a new group id + return -1; + } + + int32_t slot = GET_INT32_VAL(p); + assert(slot >= 0 && slot < pResultBuf->numOfAllocGroupIds); + + return slot; +} + +static int32_t addNewGroupId(SQueryResultBuf* pResultBuf, int32_t groupId) { + int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot + + if (pResultBuf->numOfAllocGroupIds <= num) { + size_t n = pResultBuf->numOfAllocGroupIds << 1u; + + SIDList* p = (SIDList*)realloc(pResultBuf->list, sizeof(SIDList) * n); + assert(p != NULL); + + memset(&p[pResultBuf->numOfAllocGroupIds], 0, sizeof(SIDList) * pResultBuf->numOfAllocGroupIds); + + pResultBuf->list = p; + pResultBuf->numOfAllocGroupIds = n; + } + + taosAddToHashTable(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t)); + return num; +} + +static int32_t doRegisterId(SIDList* pList, int32_t id) { + if (pList->size >= pList->alloc) { + int32_t s = 0; + if (pList->alloc == 0) { + s = 4; + assert(pList->pData == NULL); + } else { + s = pList->alloc << 1u; + } + + int32_t* c = realloc(pList->pData, s * sizeof(int32_t)); + assert(c); + + memset(&c[pList->alloc], 0, sizeof(int32_t) * pList->alloc); + + pList->pData = c; + pList->alloc = s; + } + + pList->pData[pList->size++] = id; + return 0; +} + +static void registerPageId(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t pageId) { + int32_t slot = getGroupIndex(pResultBuf, groupId); + if (slot < 0) { + slot = addNewGroupId(pResultBuf, groupId); + } + + SIDList* pList = &pResultBuf->list[slot]; + doRegisterId(pList, pageId); +} + +tFilePage* getNewDataBuf(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { + if (noMoreAvailablePages(pResultBuf)) { + if (extendDiskFileSize(pResultBuf, pResultBuf->incStep) != TSDB_CODE_SUCCESS) { + return NULL; + } + } + + // register new id in this group + *pageId = (pResultBuf->allocateId++); + registerPageId(pResultBuf, groupId, *pageId); + + return getResultBufferPageById(pResultBuf, *pageId); +} + +int32_t getNumOfRowsPerPage(SQueryResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; } + +SIDList getDataBufPagesIdList(SQueryResultBuf* pResultBuf, int32_t groupId) { + SIDList list = {0}; + int32_t slot = getGroupIndex(pResultBuf, groupId); + if (slot < 0) { + return list; + } else { + return pResultBuf->list[slot]; + } +} + +void destroyResultBuf(SQueryResultBuf* pResultBuf) { + if (pResultBuf == NULL) { + return; + } + + if (FD_VALID(pResultBuf->fd)) { + close(pResultBuf->fd); + } + + pTrace("disk-based output buffer closed, %" PRId64 " bytes, file:%s", pResultBuf->totalBufSize, pResultBuf->path); + munmap(pResultBuf->pBuf, pResultBuf->totalBufSize); + unlink(pResultBuf->path); + + tfree(pResultBuf->path); + + for (int32_t i = 0; i < pResultBuf->numOfAllocGroupIds; ++i) { + SIDList* pList = &pResultBuf->list[i]; + tfree(pList->pData); + } + + tfree(pResultBuf->list); + taosCleanUpHashTable(pResultBuf->idsTable); + + tfree(pResultBuf); +} + +int32_t getLastPageId(SIDList *pList) { + if (pList == NULL && pList->size <= 0) { + return -1; + } + + return pList->pData[pList->size - 1]; +} + diff --git a/src/util/src/ttokenizer.c b/src/util/src/ttokenizer.c index a1e7a6828c87e846fd1600a33a06d8b171b4523e..d4f3bd6879dae6b9e8573a9230f39fe3405b5927 100644 --- a/src/util/src/ttokenizer.c +++ b/src/util/src/ttokenizer.c @@ -274,7 +274,7 @@ int tSQLKeywordCode(const char* z, int n) { } } - SKeyword** pKey = (SKeyword**)taosGetDataFromHash(KeywordHashTable, key, n); + SKeyword** pKey = (SKeyword**)taosGetDataFromHashTable(KeywordHashTable, key, n); if (pKey != NULL) { return (*pKey)->type; } else {