diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 35279ca011f1a8aeec012e1d2311862dde2f95ca..2002483f03136fcc65d0bb1727ac169d12473db7 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -27,7 +27,13 @@ extern "C" { #define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query)) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0) +/* + * set the output buffer page size is 16k + * The page size should be sufficient for at least one output result or intermediate result. + * Some intermediate results may be extremely large, such as top/bottom(100) query. + */ #define DEFAULT_INTERN_BUF_SIZE 16384L + #define INIT_ALLOCATE_DISK_PAGES 60L #define DEFAULT_DATA_FILE_MAPPING_PAGES 2L #define DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE (DEFAULT_DATA_FILE_MAPPING_PAGES * DEFAULT_INTERN_BUF_SIZE) @@ -160,7 +166,7 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport); void pointInterpSupporterSetData(SQInfo* pQInfo, SPointInterpoSupporter* pPointInterpSupport); int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv* pRuntimeEnv, SPositionInfo* position); -void doCloseAllOpenedResults(SMeterQuerySupportObj* pSupporter); +int32_t doCloseAllOpenedResults(SMeterQuerySupportObj* pSupporter); void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); @@ -185,7 +191,7 @@ void freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len); void setExecutionContext(SMeterQuerySupportObj* pSupporter, SOutputRes* outputRes, int32_t meterIdx, int32_t groupIdx, SMeterQueryInfo* sqinfo); -void setIntervalQueryExecutionContext(SMeterQuerySupportObj* pSupporter, int32_t meterIdx, SMeterQueryInfo* sqinfo); +int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj* pSupporter, int32_t meterIdx, SMeterQueryInfo* sqinfo); int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange); int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus); @@ -224,11 +230,11 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY /** * add the new allocated disk page to meter query info * the new allocated disk page is used to keep the intermediate (interval) results - * + * @param pQuery * @param pMeterQueryInfo * @param pSupporter */ -tFilePage* addDataPageForMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter); +tFilePage* addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter); /** * save the query range data into SMeterQueryInfo diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 9620ab38dcbc357f1649a9b909b1abecb4c73233..5174753d8224a15b20af798844e50057cc13b2ce 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -67,13 +67,13 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, char *sdata, SField *pFields, __block_search_fn_t searchFn); -static void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); +static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields, __block_search_fn_t searchFn); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); -static void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, +static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv); static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); @@ -413,7 +413,7 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int vnodeSetOpenedFileNames(pVnodeFileInfo); if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) { - doCloseOpenedFileData(pVnodeFileInfo); // there may be partially open fd, close it anyway. + doCloseOpenedFileData(pVnodeFileInfo); // all the fds may be partially opened, close them anyway. return pVnodeFileInfo->pHeaderFileData; } } @@ -1291,9 +1291,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - // if (!functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - // continue; - // } SField dummyField = {0}; @@ -3052,7 +3049,7 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) { sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix); if (pDir == NULL) { - dError("QInfo:%p failed to open directory:%s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix); + dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix, strerror(errno)); return; } @@ -3920,11 +3917,16 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - - // set 4k page for each meter + pSupporter->numOfPages = pSupporter->numOfMeters; - ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); + ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); + if (ret != TSDB_CODE_SUCCESS) { + dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, + strerror(errno)); + return TSDB_CODE_SERV_NO_DISKSPACE; + } + pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; pSupporter->lastPageId = -1; pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; @@ -3932,7 +3934,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pSupporter->meterOutputMMapBuf = mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { - dError("QInfo:%p failed to map data file: %s to disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); + dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); return TSDB_CODE_SERV_OUT_OF_MEMORY; } } @@ -4733,20 +4735,24 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) { SQuery * pQuery = pRuntimeEnv->pQuery; int64_t st = taosGetTimestampMs(); + int32_t ret = TSDB_CODE_SUCCESS; while (pSupporter->subgroupIdx < pSupporter->pSidSet->numOfSubSet) { int32_t start = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx]; int32_t end = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx + 1]; - int32_t ret = - doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end); + ret = doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end); + if (ret < 0) { // not enough disk space to save the data into disk + return -1; + } + pSupporter->subgroupIdx += 1; - /* this group generates at least one result, return results */ + // this group generates at least one result, return results if (ret > 0) { break; } - + assert(pSupporter->numOfGroupResultPages == 0); dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1); } @@ -4754,7 +4760,7 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) { dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1, pSupporter->pSidSet->numOfSubSet, taosGetTimestampMs() - st); - return pSupporter->numOfGroupResultPages; + return TSDB_CODE_SUCCESS; } void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) { @@ -4762,7 +4768,9 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) pSupporter->numOfGroupResultPages = 0; // current results of group has been sent to client, try next group - mergeMetersResultToOneGroups(pSupporter); + if (mergeMetersResultToOneGroups(pSupporter) != TSDB_CODE_SUCCESS) { + return; // failed to save data in the disk + } // set current query completed if (pSupporter->numOfGroupResultPages == 0 && pSupporter->subgroupIdx == pSupporter->pSidSet->numOfSubSet) { @@ -4840,7 +4848,10 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery } else { // copy data to disk buffer if (buffer[0]->numOfElems == pQuery->pointsToRead) { - flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv); + if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { + return -1; + } + resetMergeResultBuf(pQuery, pCtx); } @@ -4887,7 +4898,14 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery } if (buffer[0]->numOfElems != 0) { // there are data in buffer - flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv); + if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { + dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), pSupporter->extBufFile); + tfree(pTree); + tfree(pValidMeter); + tfree(posArray); + + return -1; + } } int64_t endt = taosGetTimestampMs(); @@ -4906,25 +4924,44 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery return pSupporter->numOfGroupResultPages; } -static void extendDiskBuf(SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { +static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize); - + + SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pQuery); + int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); pSupporter->numOfPages = numOfPages; - // disk-based output buffer is exhausted, try to extend the disk-based buffer + /* + * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may + * be insufficient + */ ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); if (ret != 0) { - perror("error in allocate the disk-based buffer"); - return; + dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, + strerror(errno)); + pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE; + pQInfo->killed = 1; + + return pQInfo->code; } pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; pSupporter->meterOutputMMapBuf = mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); + + if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { + dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; + pQInfo->killed = 1; + + return pQInfo->code; + } + + return TSDB_CODE_SUCCESS; } -void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { +int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1; int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE + pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1); @@ -4935,7 +4972,9 @@ void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, requiredPages += pSupporter->numOfMeters; } - extendDiskBuf(pSupporter, requiredPages); + if (extendDiskBuf(pQuery, pSupporter, requiredPages) != TSDB_CODE_SUCCESS) { + return -1; + } } char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages + @@ -4949,6 +4988,7 @@ void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, } pSupporter->numOfGroupResultPages += 1; + return TSDB_CODE_SUCCESS; } void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx) { @@ -4966,7 +5006,7 @@ void setMeterDataInfo(SMeterDataInfo *pMeterDataInfo, SMeterObj *pMeterObj, int3 pMeterDataInfo->meterOrderIdx = meterIdx; } -void doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { +int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -4980,11 +5020,20 @@ void doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { pRuntimeEnv->pMeterObj = getMeterObj(pSupporter->pMeterObj, pSupporter->pSidSet->pSids[index]->sid); assert(pRuntimeEnv->pMeterObj == pMeterInfo[i].pMeterObj); - setIntervalQueryExecutionContext(pSupporter, i, pMeterInfo[i].pMeterQInfo); - saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows); + int32_t ret = setIntervalQueryExecutionContext(pSupporter, i, pMeterInfo[i].pMeterQInfo); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + + ret = saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } } } } + + return TSDB_CODE_SUCCESS; } void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { @@ -5690,18 +5739,24 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY } } -static tFilePage *allocNewPage(SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { +static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { if (pSupporter->lastPageId == pSupporter->numOfPages - 1) { - extendDiskBuf(pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters); + if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) { + return NULL; + } } *pageId = (++pSupporter->lastPageId); return getFilePage(pSupporter, *pageId); } -tFilePage *addDataPageForMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) { +tFilePage *addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) { uint32_t pageId = 0; - tFilePage *pPage = allocNewPage(pSupporter, &pageId); + + tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId); + if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results + return NULL; + } if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) { pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1; @@ -6199,46 +6254,53 @@ void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t } } -void setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) { +int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; tFilePage * pData = NULL; + SQuery* pQuery = pRuntimeEnv->pQuery; + // in the first scan, new space needed for results if (pMeterQueryInfo->numOfPages == 0) { - pData = addDataPageForMeterQueryInfo(pMeterQueryInfo, pSupporter); + pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter); } else { int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1]; pData = getFilePage(pSupporter, lastPageId); if (pData->numOfElems >= pRuntimeEnv->numOfRowsPerPage) { - pData = addDataPageForMeterQueryInfo(pMeterQueryInfo, pSupporter); - assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer + pData = addDataPageForMeterQueryInfo(pRuntimeEnv->pQuery, pMeterQueryInfo, pSupporter); + if (pData != NULL) { + assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer + } } } + + if (pData == NULL) { + return -1; + } for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { pRuntimeEnv->pCtx[i].aOutputBuf = getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, i); pRuntimeEnv->pCtx[i].resultInfo = &pMeterQueryInfo->resultInfo[i]; } + + return TSDB_CODE_SUCCESS; } -void setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx, +int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx, SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; if (IS_MASTER_SCAN(pRuntimeEnv)) { - setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo); + if (setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo) != TSDB_CODE_SUCCESS) { + // not enough disk space or memory buffer for intermediate results + return -1; + } if (pMeterQueryInfo->lastResRows == 0) { initCtxOutputBuf(pRuntimeEnv); } - // reset the number of iterated elements, once this function is called. since the pCtx for different - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - // pRuntimeEnv->pCtx[j].numOfIteratedElems = 0; - } - } else { if (pMeterQueryInfo->reverseFillRes) { setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo); @@ -6249,7 +6311,9 @@ void setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t * * If the master scan does not produce any results, new spaces needed to be allocated during supplement scan */ - setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo); + if (setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo) != TSDB_CODE_SUCCESS) { + return -1; + } } } @@ -6659,14 +6723,14 @@ static void validateResultBuf(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo } } -void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult) { +int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; // no results generated, do nothing for master scan if (numOfResult <= 0) { if (IS_MASTER_SCAN(pRuntimeEnv)) { - return; + return TSDB_CODE_SUCCESS; } else { /* * There is a case that no result generated during the the supplement scan, and during the main @@ -6691,7 +6755,7 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI setCtxOutputPointerForSupplementScan(pSupporter, pMeterQueryInfo); } - return; + return TSDB_CODE_SUCCESS; } } @@ -6720,7 +6784,9 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI pMeterQueryInfo->numOfRes += numOfResult; assert(pData->numOfElems <= pRuntimeEnv->numOfRowsPerPage); - setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo); + if (setOutputBufferForIntervalQuery(pSupporter, pMeterQueryInfo) != TSDB_CODE_SUCCESS) { + return -1; + } for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { resetResultInfo(&pMeterQueryInfo->resultInfo[i]); @@ -6743,6 +6809,8 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage); #endif } + + return TSDB_CODE_SUCCESS; } static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index dea865e5cdf2ba833b5f49c96fb98033e752e550..cec76d1cba836d3161d32240b1ccf3af91230fa5 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -157,7 +157,11 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe setExecutionContext(pSupporter, pSupporter->pResult, k, pMeterInfo[k].groupIdx, pMeterQueryInfo); } else { - setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo); + int32_t ret = setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo); + if (ret != TSDB_CODE_SUCCESS) { + pQInfo->killed = 1; + return NULL; + } } qTrace("QInfo:%p vid:%d sid:%d id:%s, query in cache, qrange:%lld-%lld, lastKey:%lld", pQInfo, pMeterObj->vnode, @@ -306,7 +310,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe if (pReqMeterDataInfo == NULL) { dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); - pQInfo->code = TSDB_CODE_SERV_OUT_OF_MEMORY; + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->killed = 1; return NULL; } @@ -338,7 +342,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); tfree(pReqMeterDataInfo); - pQInfo->code = TSDB_CODE_SERV_OUT_OF_MEMORY; + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->killed = 1; return NULL; } @@ -393,7 +397,12 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe setExecutionContext(pSupporter, pSupporter->pResult, pOneMeterDataInfo->meterOrderIdx, pOneMeterDataInfo->groupIdx, pMeterQueryInfo); } else { // interval query - setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); + int32_t ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); + if (ret != TSDB_CODE_SUCCESS) { + tfree(pReqMeterDataInfo); // error code has been set + pQInfo->killed = 1; + return NULL; + } } SCompBlock *pBlock = pInfoEx->pBlock.compBlock; @@ -900,7 +909,12 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st, pQuery->order.order ^ 1); - doCloseAllOpenedResults(pSupporter); + // failed to save all intermediate results into disk, abort further query processing + if (doCloseAllOpenedResults(pSupporter) != TSDB_CODE_SUCCESS) { + dError("QInfo:%p failed to save intermediate results, abort further query processing", pQInfo); + return; + } + doMultiMeterSupplementaryScan(pQInfo); if (isQueryKilled(pQuery)) { @@ -911,12 +925,13 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { if (pQuery->nAggTimeInterval > 0) { assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); - mergeMetersResultToOneGroups(pSupporter); - copyResToQueryResultBuf(pSupporter, pQuery); - + if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) { + copyResToQueryResultBuf(pSupporter, pQuery); + #ifdef _DEBUG_VIEW - displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); + displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); #endif + } } else { // not a interval query copyFromGroupBuf(pQInfo, pSupporter->pResult); } diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 81e4f6e370ae0a8591ec7c63fcdabc53732df2ec..d6f0796121ddc180325ad4cdf8e9012bd35cebfa 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -824,11 +824,11 @@ int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t * } if (pQInfo->killed) { - dTrace("QInfo:%p it is already killed, %p, code:%d", pQInfo, pQuery, pQInfo->code); + dTrace("QInfo:%p query is killed, %p, code:%d", pQInfo, pQuery, pQInfo->code); if (pQInfo->code == TSDB_CODE_SUCCESS) { return TSDB_CODE_QUERY_CANCELLED; } else { // in case of not TSDB_CODE_SUCCESS, return the code to client - return pQInfo->code; + return abs(pQInfo->code); } } @@ -837,8 +837,13 @@ int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t * *rowSize = pQuery->rowSize; *timePrec = vnodeList[pQInfo->pObj->vnode].cfg.precision; - - if (pQInfo->code < 0) return -pQInfo->code; + + dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec, + *rowSize, *numOfRows, pQInfo->code); + + if (pQInfo->code < 0) { // less than 0 means there are error existed. + return -pQInfo->code; + } return TSDB_CODE_SUCCESS; } diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index f4b3cdbbe72a7adc2321987335f861ce9b2034c2..ba9e682f8b775e082542ebe834ccbd2f8ab1a52b 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -606,7 +606,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { if (tsAvailDataDirGB < tsMinimalDataDirGB) { dError("server disk space remain %.3f GB, need at least %.3f GB, stop writing", tsAvailDataDirGB, tsMinimalDataDirGB); - code = TSDB_CODE_SERVER_NO_SPACE; + code = TSDB_CODE_SERV_NO_DISKSPACE; goto _submit_over; }