diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 366faa15dca19392386683dcb499b8dc5b97334a..985a6741f95ded4e3f430e5ee656434d2f4cef42 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -514,7 +514,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { tscTansformSQLFunctionForSTableQuery(pQueryInfo); tscIncStreamExecutionCount(pSql->pStream); } else { - tscTrace("%p get tableMeta/metricMeta successfully", pSql); + tscTrace("%p get tableMeta successfully", pSql); } tscDoQuery(pSql); diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index d69f6d295f13bb9515b3c5800f4c4b4bb0764daa..0ca079bc6e0b6a460c140e154e36be886b2eecad 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -605,7 +605,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); -// (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pTableMetaInfo->pMetricMeta->numOfVnodes); + (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * 1); if (*pMemBuffer == NULL) { tscError("%p failed to allocate memory", pSql); pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 21ce270466f19396f095124eb1818d4b48a866ba..ccf187ac649578f026eb952e87ad45715279edd3 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1011,8 +1011,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int32_t numOfSubQueries = 0; + int32_t numOfSubQueries = 1; // int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes; + assert(numOfSubQueries > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); diff --git a/src/query/inc/qtsbuf.h b/src/query/inc/qtsbuf.h index 1afdb0cd6cb9175b14675d06446afbe3a891df27..8e014e5feb781b2c87209118d1c4c8a7c2d13813 100644 --- a/src/query/inc/qtsbuf.h +++ b/src/query/inc/qtsbuf.h @@ -48,10 +48,10 @@ typedef struct STSElem { } STSElem; typedef struct STSCursor { - int32_t vnodeIndex; - int32_t blockIndex; - int32_t tsIndex; - int32_t order; + int32_t vnodeIndex; + int32_t blockIndex; + int32_t tsIndex; + uint32_t order; } STSCursor; typedef struct STSBlock { diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 2e1d86a94261cfe692411fcd5ce0b566423aa991..5adce04efa44d7e85bc9ccf482dd21e0deff20b5 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -32,12 +32,6 @@ typedef struct SData { char data[]; } SData; -enum { -// ST_QUERY_KILLED = 0, // query killed - ST_QUERY_PAUSED = 1, // query paused, due to full of the response buffer - ST_QUERY_COMPLETED = 2, // query completed -}; - struct SColumnFilterElem; typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); @@ -60,18 +54,20 @@ typedef struct SWindowStatus { } SWindowStatus; typedef struct SWindowResult { - uint16_t numOfRows; + uint16_t numOfRows; // number of rows of current time window SPosInfo pos; // Position of current result in disk-based output buffer SResultInfo* resultInfo; // For each result column, there is a resultInfo STimeWindow window; // The time window that current result covers. - SWindowStatus status; + SWindowStatus status; // this result status: closed or opened } SWindowResult; typedef struct SResultRec { - int64_t total; - int64_t size; - int64_t capacity; - int32_t threshold; // the threshold size, when the number of rows in result buffer, return to client + int64_t total; // total generated result size in rows + int64_t size; // current result set size in rows + int64_t capacity; // capacity of current result output buffer + + // result size threshold in rows. If the result buffer is larger than this, pause query and return to client + int32_t threshold; } SResultRec; typedef struct SWindowResInfo { @@ -99,7 +95,6 @@ typedef struct SSingleColumnFilterInfo { void* pData; } SSingleColumnFilterInfo; -/* intermediate pos during multimeter query involves interval */ typedef struct STableQueryInfo { int64_t lastKey; STimeWindow win; @@ -107,7 +102,7 @@ typedef struct STableQueryInfo { int16_t queryRangeSet; // denote if the query range is set, only available for interval query int64_t tag; STSCursor cur; - int32_t sid; // for retrieve the page id list + int32_t tid; // for retrieve the page id list SWindowResInfo windowResInfo; } STableQueryInfo; @@ -116,7 +111,6 @@ typedef struct STableDataInfo { int32_t numOfBlocks; int32_t start; // start block index int32_t tableIndex; - void* pMeterObj; int32_t groupIdx; // group id in table list STableQueryInfo* pTableQInfo; } STableDataInfo; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 2f219368080b135cb380293f565dfb332320b538..224512f0ae8f409bd6dae69dcfa07d8b0eebc28a 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -375,29 +375,6 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) { return false; } -/** - * - * @param pQuery - * @param pDataBlockInfo - * @param forwardStep - * @return TRUE means query not completed, FALSE means query is completed - */ -static bool queryPaused(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, int32_t forwardStep) { - // output buffer is full, pause current query - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { - // assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pDataBlockInfo->size) || - // (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0)); - // - return true; - } - - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - return true; - } - - return false; -} - static bool isTopBottomQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; @@ -1690,7 +1667,7 @@ bool notHasQueryTimeRange(SQuery *pQuery) { (pQuery->window.skey == INT64_MAX && pQuery->window.ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery))); } -bool needSupplementaryScan(SQuery *pQuery) { +static bool needReverseScan(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) { @@ -2664,7 +2641,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t numOfRes = 0; SDataStatis *pStatis = NULL; - SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes, &pRuntimeEnv->windowResInfo, pDataBlock); @@ -2950,7 +2926,7 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) return leftTimestamp > rightTimestamp ? 1 : -1; } -int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) { +int32_t mergeResultsToGroup(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -2990,7 +2966,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { pQInfo->numOfGroupResultPages = 0; // current results of group has been sent to client, try next group - if (mergeMetersResultToOneGroups(pQInfo) != TSDB_CODE_SUCCESS) { + if (mergeResultsToGroup(pQInfo) != TSDB_CODE_SUCCESS) { return; // failed to save data in the disk } @@ -3071,9 +3047,9 @@ int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDat // todo opt for the case of one table per group int32_t numOfMeters = 0; for (int32_t i = start; i < end; ++i) { - int32_t sid = pTableDataInfo[i].pTableQInfo->sid; + int32_t tid = pTableDataInfo[i].pTableQInfo->tid; - SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, sid); + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tid); if (list.size > 0 && pTableDataInfo[i].pTableQInfo->windowResInfo.size > 0) { pTableList[numOfMeters] = &pTableDataInfo[i]; numOfMeters += 1; @@ -3240,10 +3216,9 @@ void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pRes } } -void setMeterDataInfo(STableDataInfo *pTableDataInfo, void *pMeterObj, int32_t meterIdx, int32_t groupId) { - pTableDataInfo->pMeterObj = pMeterObj; +void setTableDataInfo(STableDataInfo *pTableDataInfo, int32_t tableIndex, int32_t groupId) { pTableDataInfo->groupIdx = groupId; - pTableDataInfo->tableIndex = meterIdx; + pTableDataInfo->tableIndex = tableIndex; } static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *pWindowResInfo, int32_t order) { @@ -3297,7 +3272,7 @@ void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order pQuery->order.order = pQuery->order.order ^ 1u; } -void disableFunctForSuppleScan(SQInfo *pQInfo, int32_t order) { +void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -3322,7 +3297,7 @@ void disableFunctForSuppleScan(SQInfo *pQInfo, int32_t order) { pQuery->order.order = (pQuery->order.order) ^ 1u; } -void enableFunctForMasterScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { +void enableFuncForForwardScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { @@ -3483,7 +3458,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery * pQuery = pRuntimeEnv->pQuery; SQueryStatus qStatus = {0}; - if (!needSupplementaryScan(pQuery)) { + if (!needReverseScan(pQuery)) { return; } @@ -3503,7 +3478,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { doScanAllDataBlocks(pRuntimeEnv); queryStatusRestore(pRuntimeEnv, &qStatus); - enableFunctForMasterScan(pRuntimeEnv, pQuery->order.order); + enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); SET_MASTER_SCAN_FLAG(pRuntimeEnv); } @@ -3562,7 +3537,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { return toContinue; } -void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { +void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); @@ -3588,10 +3563,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { break; } - /* - * set the correct start position, and load the corresponding block in buffer for next - * round scan all data blocks. - */ + // set the correct start position, and load the corresponding block in buffer for next round scan all data blocks. int32_t ret = tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos); status = pQuery->status; @@ -3675,18 +3647,13 @@ static bool hasMainOutput(SQuery *pQuery) { return false; } -STableQueryInfo *createMeterQueryInfo(SQInfo *pQInfo, int32_t sid, TSKEY skey, TSKEY ekey) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - +STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid, STimeWindow win) { STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); - pTableQueryInfo->win = (STimeWindow){ - .skey = skey, - .ekey = ekey, - }; - pTableQueryInfo->lastKey = skey; + pTableQueryInfo->win = win; + pTableQueryInfo->lastKey = win.skey; - pTableQueryInfo->sid = sid; + pTableQueryInfo->tid = tid; pTableQueryInfo->cur.vnodeIndex = -1; initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT); @@ -3702,7 +3669,7 @@ void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) free(pTableQueryInfo); } -void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQueryInfo, TSKEY skey, TSKEY ekey) { +void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; } @@ -4014,9 +3981,8 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInf } } -void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, SDataBlockInfo *pDataBlockInfo, +void stableApplyFunctionsOnBlock(SQueryRuntimeEnv* pRuntimeEnv, STableDataInfo *pTableDataInfo, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SArray *pDataBlock, __block_search_fn_t searchFn) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; STableQueryInfo * pTableQueryInfo = pTableDataInfo->pTableQInfo; SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; @@ -4363,40 +4329,32 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { } } -static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) { +static int64_t queryOnDataBlocks(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - - // dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles); + + int64_t st = taosGetTimestampMs(); + size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { if (isQueryKilled(pQInfo)) { break; } - // prepare the STableDataInfo struct for each table - SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); - // SMeterObj * pMeterObj = getMeterObj(pSupporter->pMetersHashTable, blockInfo.sid); - - // pQInfo->pObj = pMeterObj; - // pRuntimeEnv->pMeterObj = pMeterObj; - - STableDataInfo *pTableDataInfo = NULL; - // for (int32_t i = 0; i < pSupporter->pSidSet->numOfTables; ++i) { - // if (pMeterDataInfo[i].pMeterObj == pMeterObj) { - // pTableDataInfo = &pMeterDataInfo[i]; - // break; - // } - // } + STableDataInfo* pTableDataInfo = NULL; + + // todo opt performance + for(int32_t i = 0; i < numOfTables; ++i) { + if (pQInfo->pTableDataInfo[i].pTableQInfo->tid == blockInfo.sid) { + pTableDataInfo = &pQInfo->pTableDataInfo[i]; + } + } - assert(pTableDataInfo != NULL); + assert(pTableDataInfo != NULL && pTableDataInfo->pTableQInfo != NULL); STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo; - if (pTableDataInfo->pTableQInfo == NULL) { - // pTableDataInfo->pTableQInfo = createMeterQueryInfo(pQInfo, pMeterObj->sid, pQuery->skey, pQuery->ekey); - } - restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo); SDataStatis *pStatis = NULL; @@ -4408,14 +4366,18 @@ static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) { } else { // interval query setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey); int32_t ret = setAdditionalInfo(pQInfo, pTableDataInfo->tableIndex, pTableQueryInfo); + if (ret != TSDB_CODE_SUCCESS) { - // pQInfo->killed = 1; - return; + pQInfo->code = ret; + return taosGetTimestampMs() - st; } } - // stableApplyFunctionsOnBlock_(pSupporter, pTableDataInfo, &blockInfo, pStatis, pDataBlock, searchFn); + stableApplyFunctionsOnBlock(pRuntimeEnv, pTableDataInfo, &blockInfo, pStatis, pDataBlock, binarySearchForKey); } + + int64_t et = taosGetTimestampMs(); + return et - st; } static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *dataInCache, int32_t index, @@ -4499,7 +4461,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start pointInterpSupporterSetData(pQInfo, &pointInterpSupporter); pointInterpSupporterDestroy(&pointInterpSupporter); - vnodeScanAllData(pRuntimeEnv); + scanAllDataBlocks(pRuntimeEnv); // first/last_row query, do not invoke the finalize for super table query doFinalizeResult(pRuntimeEnv); @@ -4679,7 +4641,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { } } - vnodeScanAllData(pRuntimeEnv); + scanAllDataBlocks(pRuntimeEnv); pQuery->size = getNumOfResult(pRuntimeEnv); doSkipResults(pRuntimeEnv); @@ -4778,95 +4740,85 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { #endif } -static void doOrderedScan(SQInfo *pQInfo) { - SQuery *pQuery = &pQInfo->runtimeEnv.pQuery; -#if 0 -// if (pQInfo->runtimeEnv. == NULL) { -// pSupporter->pMeterDataInfo = calloc(pSupporter->pSidSet->numOfTables, sizeof(STableDataInfo)); -// } - - STableIdInfo **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; +static void createTableDataInfo(SQInfo* pQInfo) { + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - tSidSet* pSidset = pSupporter->pSidSet; - int32_t groupId = 0; + // todo make sure the table are added the reference count to gauranteed that all involved tables are valid + int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); - for (int32_t i = 0; i < pSidset->numOfTables; ++i) { // load all meter meta info - SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[i]->sid); - if (pMeterObj == NULL) { - dError("QInfo:%p failed to find required sid:%d", pQInfo, pMeterSidExtInfo[i]->sid); - continue; + if (pQInfo->pTableDataInfo == NULL) { + pQInfo->pTableDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * numOfTables); + if (pQInfo->pTableDataInfo == NULL) { + dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno)); + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; + return; } - if (i >= pSidset->starterPos[groupId + 1]) { - groupId += 1; + int32_t groupId = 0; + for (int32_t i = 0; i < numOfTables; ++i) { // load all meter meta info + STableId *id = taosArrayGet(pQInfo->pTableIdList, i); + STableDataInfo *pInfo = &pQInfo->pTableDataInfo[i]; + + setTableDataInfo(pInfo, i, groupId); + pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, id->tid, pQuery->window); } - - STableDataInfo *pOneMeterDataInfo = &pSupporter->pMeterDataInfo[i]; - assert(pOneMeterDataInfo->pMeterObj == NULL); - - setMeterDataInfo(pOneMeterDataInfo, pMeterObj, i, groupId); - pOneMeterDataInfo->pTableQInfo = createMeterQueryInfo(pSupporter, pMeterObj->sid, pQuery->skey, pQuery->ekey); } - - queryOnDataBlocks(pQInfo, pSupporter->pMeterDataInfo); - if (pQInfo->code != TSDB_CODE_SUCCESS) { - return; - } -#endif } -static void setupMeterQueryInfoForSupplementQuery(SQInfo *pQInfo) { +static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - - int32_t num = taosHashGetSize(pQInfo->pTableIdList); - for (int32_t i = 0; i < num; ++i) { - // STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo; - // changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey); + size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + + for (int32_t i = 0; i < numOfTables; ++i) { + STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; + changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo); } } -static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - - if (!needSupplementaryScan(pQuery)) { - dTrace("QInfo:%p no need to do supplementary scan, query completed", pQInfo); - return; - } - +static void doSaveContext(SQInfo* pQInfo) { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; + SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); - // disableFunctForSuppleScan(pSupporter, pQuery->order.order); - + disableFuncForReverseScan(pQInfo, pQuery->order.order); + if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1u; } - -#if 0 - SWAP(pSupporter->rawSKey, pSupporter->rawEKey, TSKEY); - setupMeterQueryInfoForSupplementQuery(pSupporter); - - int64_t st = taosGetTimestampMs(); - - doOrderedScan(pQInfo); - int64_t et = taosGetTimestampMs(); - dTrace("QInfo:%p supplementary scan completed, elapsed time: %lldms", pQInfo, et - st); + SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); + prepareQueryInfoForReverseScan(pQInfo); +} + +static void doRestoreContext(SQInfo* pQInfo) { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; - /* - * restore the env - * the meter query info is not reset to the original state - */ - SWAP(pSupporter->rawSKey, pSupporter->rawEKey, TSKEY); - enableFunctForMasterScan(pRuntimeEnv, pQuery->order.order); + SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; } -#endif + + enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); SET_MASTER_SCAN_FLAG(pRuntimeEnv); } -static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { +static void doCloseAllTimeWindowAfterScan(SQInfo* pQInfo) { + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + + if (isIntervalQuery(pQuery)) { + for (int32_t i = 0; i < numOfTables; ++i) { + STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; + closeAllTimeWindow(&pTableQueryInfo->windowResInfo); + } + } else { // close results for group result + closeAllTimeWindow(&pQInfo->runtimeEnv.windowResInfo); + } +} + +static void multiTableQueryProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -4875,7 +4827,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { * if the subgroupIdx > 0, the query process must be completed yet, we only need to * copy the data into output buffer */ - if (pQuery->intervalTime > 0) { + if (isIntervalQuery(pQuery)) { copyResToQueryResultBuf(pQInfo, pQuery); #ifdef _DEBUG_VIEW @@ -4894,46 +4846,49 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.size, pQuery->rec.total); return; } -#if 0 - pSupporter->pMeterDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * pSupporter->numOfMeters); - if (pSupporter->pMeterDataInfo == NULL) { - dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno)); - pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; - return; - } - dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, group:%d", pQInfo, pSupporter->rawSKey, - pSupporter->rawEKey, pQuery->order.order, pSupporter->pSidSet->numOfSubSet); + dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo, pQuery->window.skey, + pQuery->window.ekey, pQuery->order.order); - dTrace("QInfo:%p main query scan start", pQInfo); - int64_t st = taosGetTimestampMs(); - doOrderedScan(pQInfo); + // create the query support structures + createTableDataInfo(pQInfo); - int64_t et = taosGetTimestampMs(); - dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st, + // do check all qualified data blocks + int64_t el = queryOnDataBlocks(pQInfo); + dTrace("QInfo:%p forward scan completed, elapsed time: %lldms, reversed scan start, order:%d", pQInfo, el, pQuery->order.order ^ 1u); - if (pQuery->intervalTime > 0) { - for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { - STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo; - closeAllTimeWindow(&pTableQueryInfo->windowResInfo); - } - } else { // close results for group result - closeAllTimeWindow(&pRuntimeEnv->windowResInfo); + // query error occurred or query is killed, abort current execution + if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { + dTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code); + return; } - doMultiMeterSupplementaryScan(pQInfo); + // close all time window results + doCloseAllTimeWindowAfterScan(pQInfo); - if (isQueryKilled(pQInfo)) { - dTrace("QInfo:%p query killed, abort", pQInfo); + if (needReverseScan(pQuery)) { + doSaveContext(pQInfo); + + el = queryOnDataBlocks(pQInfo); + dTrace("QInfo:%p reversed scan completed, elapsed time: %lldms", pQInfo, el); + + doRestoreContext(pQInfo); + } else { + dTrace("QInfo:%p no need to do reversed scan, query completed", pQInfo); return; } - if (pQuery->intervalTime > 0 || isSumAvgRateQuery(pQuery)) { - assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); + if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { + dTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code); + return; + } + + if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { +// assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); - if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) { - copyResToQueryResultBuf(pSupporter, pQuery); + if (mergeResultsToGroup(pQInfo) == TSDB_CODE_SUCCESS) { + copyResToQueryResultBuf(pQInfo, pQuery); #ifdef _DEBUG_VIEW displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); @@ -4944,10 +4899,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { } // handle the limitation of output buffer - pQInfo->size += pQuery->size; - dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->size, pQInfo->size, - pQInfo->pointsReturned); -#endif + dTrace("QInfo:%p points returned:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total); } /* @@ -4960,7 +4912,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - vnodeScanAllData(pRuntimeEnv); + scanAllDataBlocks(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); if (isQueryKilled(pQInfo)) { @@ -4993,7 +4945,7 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { } while (1) { - vnodeScanAllData(pRuntimeEnv); + scanAllDataBlocks(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); if (isQueryKilled(pQInfo)) { @@ -5039,7 +4991,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { while (1) { initCtxOutputBuf(pRuntimeEnv); - vnodeScanAllData(pRuntimeEnv); + scanAllDataBlocks(pRuntimeEnv); if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { return; @@ -5076,7 +5028,7 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { while (1) { tableIntervalProcessImpl(pRuntimeEnv); - if (pQuery->intervalTime > 0) { + if (isIntervalQuery(pQuery)) { pQInfo->subgroupIdx = 0; // always start from 0 pQuery->rec.size = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); @@ -5213,15 +5165,15 @@ static void singleTableQueryImpl(SQInfo* pQInfo) { sem_post(&pQInfo->dataReady); } -void multiTableQueryImpl(SQInfo* pQInfo) { +static void multiTableQueryImpl(SQInfo* pQInfo) { SQuery* pQuery = pQInfo->runtimeEnv.pQuery; pQuery->rec.size = 0; int64_t st = taosGetTimestampUs(); - if (pQuery->intervalTime > 0 || + if (isIntervalQuery(pQuery) || (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr))) { - vnodeMultiMeterQueryProcessor(pQInfo); + multiTableQueryProcess(pQInfo); } else { assert((pQuery->checkBufferInLoop == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)); @@ -5242,29 +5194,6 @@ void multiTableQueryImpl(SQInfo* pQInfo) { sem_post(&pQInfo->dataReady); } -void qTableQuery(SQInfo *pQInfo) { - if (pQInfo == NULL || pQInfo->signature != pQInfo) { - dTrace("%p freed abort query", pQInfo); - return; - } - - if (isQueryKilled(pQInfo)) { - dTrace("QInfo:%p it is already killed, abort", pQInfo); - return; - } - - dTrace("QInfo:%p query task is launched", pQInfo); - - int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); - if (numOfTables == 1) { - singleTableQueryImpl(pQInfo); - } else { - multiTableQueryImpl(pQInfo); - } - - // vnodeDecRefCount(pQInfo); -} - static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) { int32_t j = 0; @@ -5284,7 +5213,7 @@ bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pEx return j < pQueryMsg->numOfCols; } -static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryMsg) { +static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) { if (pQueryMsg->intervalTime < 0) { dError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryMsg, pQueryMsg->intervalTime); return -1; @@ -5349,7 +5278,7 @@ static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** p * @return */ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr, - wchar_t** tagCond, char** nameCond) { + wchar_t** tagCond) { pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); @@ -5374,7 +5303,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder); // query msg safety check - if (validateQueryMeterMsg(pQueryMsg) != 0) { + if (validateQueryMsg(pQueryMsg) != 0) { return TSDB_CODE_INVALID_QUERY_MSG; } @@ -5503,10 +5432,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, *tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); } - - if (pQueryMsg->nameCondLen > 0) { - *nameCond = strndup(pMsg, pQueryMsg->nameCondLen); - } dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, " "timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64 @@ -5919,7 +5844,7 @@ _clean_memory: return NULL; } -bool isQInfoValid(void *param) { +static bool isValidQInfo(void *param) { SQInfo *pQInfo = (SQInfo *)param; if (pQInfo == NULL) { return false; @@ -5933,11 +5858,66 @@ bool isQInfoValid(void *param) { return (sig == (uint64_t)pQInfo); } -void vnodeFreeQInfo(SQInfo *pQInfo) { - if (!isQInfoValid(pQInfo)) { - return; +static void freeQInfo(SQInfo *pQInfo); +static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, + SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) { + int32_t code = TSDB_CODE_SUCCESS; + + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList); + if ((*pQInfo) == NULL) { + code = TSDB_CODE_SERV_OUT_OF_MEMORY; + goto _error; } + SQuery *pQuery = (*pQInfo)->runtimeEnv.pQuery; + dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); + + pQuery->window.skey = pQueryMsg->window.skey; + pQuery->window.ekey = pQueryMsg->window.ekey; + pQuery->lastKey = pQuery->window.skey; + + if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) { + dError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); + code = TSDB_CODE_APP_ERROR; + goto _error; + } + + vnodeParametersSafetyCheck(pQuery); + + STSBuf *pTSBuf = NULL; + if (pQueryMsg->tsLen > 0) { // open new file to save the result + char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset; + pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder); + + tsBufResetPos(pTSBuf); + tsBufNextPos(pTSBuf); + } + + // filter the qualified + if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) { + goto _error; + } + + // if (pQInfo->over == 1) { + // vnodeAddRefCount(pQInfo); // for retrieve procedure + // return pQInfo; + // } + + // dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, + // pQInfo->refCount); + return code; + +_error: + // table query ref will be decrease during error handling + freeQInfo(*pQInfo); + return code; +} + +static void freeQInfo(SQInfo *pQInfo) { + if (!isValidQInfo(pQInfo)) { + return; + } + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; setQueryKilled(pQInfo); @@ -5945,7 +5925,7 @@ void vnodeFreeQInfo(SQInfo *pQInfo) { for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { tfree(pQuery->sdata[col]); } - + // for (int col = 0; col < pQuery->numOfCols; ++col) { // vnodeFreeColumnInfo(&pQuery->colList[col].data); // } @@ -5953,100 +5933,102 @@ void vnodeFreeQInfo(SQInfo *pQInfo) { // if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) { // tfree(pQuery->tsData); // } - + sem_destroy(&(pQInfo->dataReady)); vnodeQueryFreeQInfoEx(pQInfo); - + for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; if (pColFilter->numOfFilters > 0) { tfree(pColFilter->pFilters); } } - + tfree(pQuery->pFilterInfo); tfree(pQuery->colList); tfree(pQuery->sdata); - + if (pQuery->pSelectExpr != NULL) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo; - + if (pBinExprInfo->numOfCols > 0) { tfree(pBinExprInfo->pReqColumns); tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL); } } - + tfree(pQuery->pSelectExpr); } - + if (pQuery->defaultVal != NULL) { tfree(pQuery->defaultVal); } - + tfree(pQuery->pGroupbyExpr); tfree(pQuery); - + dTrace("QInfo:%p QInfo is freed", pQInfo); - + // destroy signature, in order to avoid the query process pass the object safety check memset(pQInfo, 0, sizeof(SQInfo)); tfree(pQInfo); } -static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, - SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) { - int32_t code = TSDB_CODE_SUCCESS; +static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList); - if ((*pQInfo) == NULL) { - code = TSDB_CODE_SERV_OUT_OF_MEMORY; - goto _error; - } - - SQuery *pQuery = (*pQInfo)->runtimeEnv.pQuery; - dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); - - pQuery->window.skey = pQueryMsg->window.skey; - pQuery->window.ekey = pQueryMsg->window.ekey; - pQuery->lastKey = pQuery->window.skey; - - if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) { - dError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); - code = TSDB_CODE_APP_ERROR; - goto _error; - } - - vnodeParametersSafetyCheck(pQuery); - - STSBuf *pTSBuf = NULL; - if (pQueryMsg->tsLen > 0) { // open new file to save the result - char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset; - pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder); - - tsBufResetPos(pTSBuf); - tsBufNextPos(pTSBuf); + /* + * get the file size and set the numOfRows to be the file size, since for tsComp query, + * the returned row size is equalled to 1 + * TODO handle the case that the file is too large to send back one time + */ + if (isTSCompQuery(pQuery) && (*numOfRows) > 0) { + struct stat fstat; + if (stat(pQuery->sdata[0]->data, &fstat) == 0) { + *numOfRows = fstat.st_size; + return fstat.st_size; + } else { + dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); + return 0; + } + } else { + return pQuery->rowSize * (*numOfRows); } +} - // filter the qualified - if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) { - goto _error; +static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { + // the remained number of retrieved rows, not the interpolated result + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + + // load data from file to msg buffer + if (isTSCompQuery(pQuery)) { + int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666); + + // make sure file exist + if (FD_VALID(fd)) { + size_t s = lseek(fd, 0, SEEK_END); + dTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s); + + lseek(fd, 0, SEEK_SET); + read(fd, data, s); + close(fd); + + unlink(pQuery->sdata[0]->data); + } else { + dError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, + pQuery->sdata[0]->data, strerror(errno)); + } + } else { + doCopyQueryResultToMsg(pQInfo, pQuery->rec.size, data); } - - // if (pQInfo->over == 1) { - // vnodeAddRefCount(pQInfo); // for retrieve procedure - // return pQInfo; - // } - - // dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, - // pQInfo->refCount); - return code; - -_error: - // table query ref will be decrease during error handling - vnodeFreeQInfo(*pQInfo); - return code; + + pQuery->rec.total += pQuery->rec.size; + dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total); + + return TSDB_CODE_SUCCESS; + + // todo if interpolation exists, the result may be dump to client by several rounds } int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) { @@ -6057,9 +6039,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) SArray *pTableIdList = NULL; SSqlFuncExprMsg** pExprMsg = NULL; wchar_t* tagCond = NULL; - char* nameCond = NULL; - - if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &nameCond)) != TSDB_CODE_SUCCESS) { + if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond)) != TSDB_CODE_SUCCESS) { return code; } @@ -6126,8 +6106,31 @@ _query_over: return TSDB_CODE_SUCCESS; } +void qTableQuery(SQInfo *pQInfo) { + if (pQInfo == NULL || pQInfo->signature != pQInfo) { + dTrace("%p freed abort query", pQInfo); + return; + } + + if (isQueryKilled(pQInfo)) { + dTrace("QInfo:%p it is already killed, abort", pQInfo); + return; + } + + dTrace("QInfo:%p query task is launched", pQInfo); + + int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + if (numOfTables == 1) { + singleTableQueryImpl(pQInfo); + } else { + multiTableQueryImpl(pQInfo); + } + + // vnodeDecRefCount(pQInfo); +} + int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) { - if (pQInfo == NULL || !isQInfoValid(pQInfo)) { + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } @@ -6148,62 +6151,6 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) { return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); } -static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - - /* - * get the file size and set the numOfRows to be the file size, since for tsComp query, - * the returned row size is equalled to 1 - * TODO handle the case that the file is too large to send back one time - */ - if (isTSCompQuery(pQuery) && (*numOfRows) > 0) { - struct stat fstat; - if (stat(pQuery->sdata[0]->data, &fstat) == 0) { - *numOfRows = fstat.st_size; - return fstat.st_size; - } else { - dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); - return 0; - } - } else { - return pQuery->rowSize * (*numOfRows); - } -} - -static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { - // the remained number of retrieved rows, not the interpolated result - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - - // load data from file to msg buffer - if (isTSCompQuery(pQuery)) { - int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666); - - // make sure file exist - if (FD_VALID(fd)) { - size_t s = lseek(fd, 0, SEEK_END); - dTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s); - - lseek(fd, 0, SEEK_SET); - read(fd, data, s); - close(fd); - - unlink(pQuery->sdata[0]->data); - } else { - dError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, - pQuery->sdata[0]->data, strerror(errno)); - } - } else { - doCopyQueryResultToMsg(pQInfo, pQuery->rec.size, data); - } - - pQuery->rec.total += pQuery->rec.size; - dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total); - - return TSDB_CODE_SUCCESS; - - // todo if interpolation exists, the result may be dump to client by several rounds -} - bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { return false; @@ -6222,7 +6169,7 @@ bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) { } int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen) { - if (pQInfo == NULL || !isQInfoValid(pQInfo)) { + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } @@ -6251,7 +6198,7 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { (*pRsp)->completed = 1; // notify no more result to client - vnodeFreeQInfo(pQInfo); + freeQInfo(pQInfo); } return code; diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 761006a0095db206eb7be93fef456a7de1951df3..1dd1c3b29d1dd1d805169e85c39c04d235ad1016 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -349,10 +349,10 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { } else { // add non-super table to the array pMeta->tables[pTable->tableId.tid] = pTable; - if (pTable->type == TSDB_CHILD_TABLE) { - // add STABLE to the index + if (pTable->type == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index tsdbAddTableIntoIndex(pMeta, pTable); } + pMeta->nTables++; }