提交 e45701a6 编写于 作者: H hjxilinx

[TD-32] fix bugs in super table query

上级 ff144921
......@@ -514,7 +514,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTansformSQLFunctionForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream);
} else {
tscTrace("%p get tableMeta/metricMeta successfully", pSql);
tscTrace("%p get tableMeta successfully", pSql);
}
tscDoQuery(pSql);
......
......@@ -605,7 +605,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pTableMetaInfo->pMetricMeta->numOfVnodes);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * 1);
if (*pMemBuffer == NULL) {
tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......
......@@ -1011,8 +1011,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfSubQueries = 0;
int32_t numOfSubQueries = 1;
// int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes;
assert(numOfSubQueries > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
......
......@@ -48,10 +48,10 @@ typedef struct STSElem {
} STSElem;
typedef struct STSCursor {
int32_t vnodeIndex;
int32_t blockIndex;
int32_t tsIndex;
int32_t order;
int32_t vnodeIndex;
int32_t blockIndex;
int32_t tsIndex;
uint32_t order;
} STSCursor;
typedef struct STSBlock {
......
......@@ -32,12 +32,6 @@ typedef struct SData {
char data[];
} SData;
enum {
// ST_QUERY_KILLED = 0, // query killed
ST_QUERY_PAUSED = 1, // query paused, due to full of the response buffer
ST_QUERY_COMPLETED = 2, // query completed
};
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
......@@ -60,18 +54,20 @@ typedef struct SWindowStatus {
} SWindowStatus;
typedef struct SWindowResult {
uint16_t numOfRows;
uint16_t numOfRows; // number of rows of current time window
SPosInfo pos; // Position of current result in disk-based output buffer
SResultInfo* resultInfo; // For each result column, there is a resultInfo
STimeWindow window; // The time window that current result covers.
SWindowStatus status;
SWindowStatus status; // this result status: closed or opened
} SWindowResult;
typedef struct SResultRec {
int64_t total;
int64_t size;
int64_t capacity;
int32_t threshold; // the threshold size, when the number of rows in result buffer, return to client
int64_t total; // total generated result size in rows
int64_t size; // current result set size in rows
int64_t capacity; // capacity of current result output buffer
// result size threshold in rows. If the result buffer is larger than this, pause query and return to client
int32_t threshold;
} SResultRec;
typedef struct SWindowResInfo {
......@@ -99,7 +95,6 @@ typedef struct SSingleColumnFilterInfo {
void* pData;
} SSingleColumnFilterInfo;
/* intermediate pos during multimeter query involves interval */
typedef struct STableQueryInfo {
int64_t lastKey;
STimeWindow win;
......@@ -107,7 +102,7 @@ typedef struct STableQueryInfo {
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag;
STSCursor cur;
int32_t sid; // for retrieve the page id list
int32_t tid; // for retrieve the page id list
SWindowResInfo windowResInfo;
} STableQueryInfo;
......@@ -116,7 +111,6 @@ typedef struct STableDataInfo {
int32_t numOfBlocks;
int32_t start; // start block index
int32_t tableIndex;
void* pMeterObj;
int32_t groupIdx; // group id in table list
STableQueryInfo* pTableQInfo;
} STableDataInfo;
......
......@@ -375,29 +375,6 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) {
return false;
}
/**
*
* @param pQuery
* @param pDataBlockInfo
* @param forwardStep
* @return TRUE means query not completed, FALSE means query is completed
*/
static bool queryPaused(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, int32_t forwardStep) {
// output buffer is full, pause current query
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
// assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pDataBlockInfo->size) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0));
//
return true;
}
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
return true;
}
return false;
}
static bool isTopBottomQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
......@@ -1690,7 +1667,7 @@ bool notHasQueryTimeRange(SQuery *pQuery) {
(pQuery->window.skey == INT64_MAX && pQuery->window.ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery)));
}
bool needSupplementaryScan(SQuery *pQuery) {
static bool needReverseScan(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) {
......@@ -2664,7 +2641,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t numOfRes = 0;
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes,
&pRuntimeEnv->windowResInfo, pDataBlock);
......@@ -2950,7 +2926,7 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
return leftTimestamp > rightTimestamp ? 1 : -1;
}
int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) {
int32_t mergeResultsToGroup(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -2990,7 +2966,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
pQInfo->numOfGroupResultPages = 0;
// current results of group has been sent to client, try next group
if (mergeMetersResultToOneGroups(pQInfo) != TSDB_CODE_SUCCESS) {
if (mergeResultsToGroup(pQInfo) != TSDB_CODE_SUCCESS) {
return; // failed to save data in the disk
}
......@@ -3071,9 +3047,9 @@ int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDat
// todo opt for the case of one table per group
int32_t numOfMeters = 0;
for (int32_t i = start; i < end; ++i) {
int32_t sid = pTableDataInfo[i].pTableQInfo->sid;
int32_t tid = pTableDataInfo[i].pTableQInfo->tid;
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, sid);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tid);
if (list.size > 0 && pTableDataInfo[i].pTableQInfo->windowResInfo.size > 0) {
pTableList[numOfMeters] = &pTableDataInfo[i];
numOfMeters += 1;
......@@ -3240,10 +3216,9 @@ void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pRes
}
}
void setMeterDataInfo(STableDataInfo *pTableDataInfo, void *pMeterObj, int32_t meterIdx, int32_t groupId) {
pTableDataInfo->pMeterObj = pMeterObj;
void setTableDataInfo(STableDataInfo *pTableDataInfo, int32_t tableIndex, int32_t groupId) {
pTableDataInfo->groupIdx = groupId;
pTableDataInfo->tableIndex = meterIdx;
pTableDataInfo->tableIndex = tableIndex;
}
static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *pWindowResInfo, int32_t order) {
......@@ -3297,7 +3272,7 @@ void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order
pQuery->order.order = pQuery->order.order ^ 1u;
}
void disableFunctForSuppleScan(SQInfo *pQInfo, int32_t order) {
void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -3322,7 +3297,7 @@ void disableFunctForSuppleScan(SQInfo *pQInfo, int32_t order) {
pQuery->order.order = (pQuery->order.order) ^ 1u;
}
void enableFunctForMasterScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
void enableFuncForForwardScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
......@@ -3483,7 +3458,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryStatus qStatus = {0};
if (!needSupplementaryScan(pQuery)) {
if (!needReverseScan(pQuery)) {
return;
}
......@@ -3503,7 +3478,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) {
doScanAllDataBlocks(pRuntimeEnv);
queryStatusRestore(pRuntimeEnv, &qStatus);
enableFunctForMasterScan(pRuntimeEnv, pQuery->order.order);
enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
}
......@@ -3562,7 +3537,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
return toContinue;
}
void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
......@@ -3588,10 +3563,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
break;
}
/*
* set the correct start position, and load the corresponding block in buffer for next
* round scan all data blocks.
*/
// set the correct start position, and load the corresponding block in buffer for next round scan all data blocks.
int32_t ret = tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos);
status = pQuery->status;
......@@ -3675,18 +3647,13 @@ static bool hasMainOutput(SQuery *pQuery) {
return false;
}
STableQueryInfo *createMeterQueryInfo(SQInfo *pQInfo, int32_t sid, TSKEY skey, TSKEY ekey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid, STimeWindow win) {
STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
pTableQueryInfo->win = (STimeWindow){
.skey = skey,
.ekey = ekey,
};
pTableQueryInfo->lastKey = skey;
pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey;
pTableQueryInfo->sid = sid;
pTableQueryInfo->tid = tid;
pTableQueryInfo->cur.vnodeIndex = -1;
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT);
......@@ -3702,7 +3669,7 @@ void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
free(pTableQueryInfo);
}
void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQueryInfo, TSKEY skey, TSKEY ekey) {
void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
}
......@@ -4014,9 +3981,8 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInf
}
}
void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, SDataBlockInfo *pDataBlockInfo,
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv* pRuntimeEnv, STableDataInfo *pTableDataInfo, SDataBlockInfo *pDataBlockInfo,
SDataStatis *pStatis, SArray *pDataBlock, __block_search_fn_t searchFn) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo * pTableQueryInfo = pTableDataInfo->pTableQInfo;
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
......@@ -4363,40 +4329,32 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) {
static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
// dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles);
int64_t st = taosGetTimestampMs();
size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
if (isQueryKilled(pQInfo)) {
break;
}
// prepare the STableDataInfo struct for each table
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
// SMeterObj * pMeterObj = getMeterObj(pSupporter->pMetersHashTable, blockInfo.sid);
// pQInfo->pObj = pMeterObj;
// pRuntimeEnv->pMeterObj = pMeterObj;
STableDataInfo *pTableDataInfo = NULL;
// for (int32_t i = 0; i < pSupporter->pSidSet->numOfTables; ++i) {
// if (pMeterDataInfo[i].pMeterObj == pMeterObj) {
// pTableDataInfo = &pMeterDataInfo[i];
// break;
// }
// }
STableDataInfo* pTableDataInfo = NULL;
// todo opt performance
for(int32_t i = 0; i < numOfTables; ++i) {
if (pQInfo->pTableDataInfo[i].pTableQInfo->tid == blockInfo.sid) {
pTableDataInfo = &pQInfo->pTableDataInfo[i];
}
}
assert(pTableDataInfo != NULL);
assert(pTableDataInfo != NULL && pTableDataInfo->pTableQInfo != NULL);
STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo;
if (pTableDataInfo->pTableQInfo == NULL) {
// pTableDataInfo->pTableQInfo = createMeterQueryInfo(pQInfo, pMeterObj->sid, pQuery->skey, pQuery->ekey);
}
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
SDataStatis *pStatis = NULL;
......@@ -4408,14 +4366,18 @@ static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) {
} else { // interval query
setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey);
int32_t ret = setAdditionalInfo(pQInfo, pTableDataInfo->tableIndex, pTableQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
// pQInfo->killed = 1;
return;
pQInfo->code = ret;
return taosGetTimestampMs() - st;
}
}
// stableApplyFunctionsOnBlock_(pSupporter, pTableDataInfo, &blockInfo, pStatis, pDataBlock, searchFn);
stableApplyFunctionsOnBlock(pRuntimeEnv, pTableDataInfo, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
}
int64_t et = taosGetTimestampMs();
return et - st;
}
static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *dataInCache, int32_t index,
......@@ -4499,7 +4461,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
pointInterpSupporterSetData(pQInfo, &pointInterpSupporter);
pointInterpSupporterDestroy(&pointInterpSupporter);
vnodeScanAllData(pRuntimeEnv);
scanAllDataBlocks(pRuntimeEnv);
// first/last_row query, do not invoke the finalize for super table query
doFinalizeResult(pRuntimeEnv);
......@@ -4679,7 +4641,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
}
}
vnodeScanAllData(pRuntimeEnv);
scanAllDataBlocks(pRuntimeEnv);
pQuery->size = getNumOfResult(pRuntimeEnv);
doSkipResults(pRuntimeEnv);
......@@ -4778,95 +4740,85 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
#endif
}
static void doOrderedScan(SQInfo *pQInfo) {
SQuery *pQuery = &pQInfo->runtimeEnv.pQuery;
#if 0
// if (pQInfo->runtimeEnv. == NULL) {
// pSupporter->pMeterDataInfo = calloc(pSupporter->pSidSet->numOfTables, sizeof(STableDataInfo));
// }
STableIdInfo **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo;
static void createTableDataInfo(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
tSidSet* pSidset = pSupporter->pSidSet;
int32_t groupId = 0;
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
for (int32_t i = 0; i < pSidset->numOfTables; ++i) { // load all meter meta info
SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[i]->sid);
if (pMeterObj == NULL) {
dError("QInfo:%p failed to find required sid:%d", pQInfo, pMeterSidExtInfo[i]->sid);
continue;
if (pQInfo->pTableDataInfo == NULL) {
pQInfo->pTableDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * numOfTables);
if (pQInfo->pTableDataInfo == NULL) {
dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
return;
}
if (i >= pSidset->starterPos[groupId + 1]) {
groupId += 1;
int32_t groupId = 0;
for (int32_t i = 0; i < numOfTables; ++i) { // load all meter meta info
STableId *id = taosArrayGet(pQInfo->pTableIdList, i);
STableDataInfo *pInfo = &pQInfo->pTableDataInfo[i];
setTableDataInfo(pInfo, i, groupId);
pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, id->tid, pQuery->window);
}
STableDataInfo *pOneMeterDataInfo = &pSupporter->pMeterDataInfo[i];
assert(pOneMeterDataInfo->pMeterObj == NULL);
setMeterDataInfo(pOneMeterDataInfo, pMeterObj, i, groupId);
pOneMeterDataInfo->pTableQInfo = createMeterQueryInfo(pSupporter, pMeterObj->sid, pQuery->skey, pQuery->ekey);
}
queryOnDataBlocks(pQInfo, pSupporter->pMeterDataInfo);
if (pQInfo->code != TSDB_CODE_SUCCESS) {
return;
}
#endif
}
static void setupMeterQueryInfoForSupplementQuery(SQInfo *pQInfo) {
static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t i = 0; i < num; ++i) {
// STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
}
}
static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
if (!needSupplementaryScan(pQuery)) {
dTrace("QInfo:%p no need to do supplementary scan, query completed", pQInfo);
return;
}
static void doSaveContext(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
// disableFunctForSuppleScan(pSupporter, pQuery->order.order);
disableFuncForReverseScan(pQInfo, pQuery->order.order);
if (pRuntimeEnv->pTSBuf != NULL) {
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1u;
}
#if 0
SWAP(pSupporter->rawSKey, pSupporter->rawEKey, TSKEY);
setupMeterQueryInfoForSupplementQuery(pSupporter);
int64_t st = taosGetTimestampMs();
doOrderedScan(pQInfo);
int64_t et = taosGetTimestampMs();
dTrace("QInfo:%p supplementary scan completed, elapsed time: %lldms", pQInfo, et - st);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
prepareQueryInfoForReverseScan(pQInfo);
}
static void doRestoreContext(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
/*
* restore the env
* the meter query info is not reset to the original state
*/
SWAP(pSupporter->rawSKey, pSupporter->rawEKey, TSKEY);
enableFunctForMasterScan(pRuntimeEnv, pQuery->order.order);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
if (pRuntimeEnv->pTSBuf != NULL) {
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1;
}
#endif
enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
}
static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
static void doCloseAllTimeWindowAfterScan(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
if (isIntervalQuery(pQuery)) {
for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
closeAllTimeWindow(&pTableQueryInfo->windowResInfo);
}
} else { // close results for group result
closeAllTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
}
}
static void multiTableQueryProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -4875,7 +4827,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
* if the subgroupIdx > 0, the query process must be completed yet, we only need to
* copy the data into output buffer
*/
if (pQuery->intervalTime > 0) {
if (isIntervalQuery(pQuery)) {
copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW
......@@ -4894,46 +4846,49 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.size, pQuery->rec.total);
return;
}
#if 0
pSupporter->pMeterDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * pSupporter->numOfMeters);
if (pSupporter->pMeterDataInfo == NULL) {
dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
return;
}
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, group:%d", pQInfo, pSupporter->rawSKey,
pSupporter->rawEKey, pQuery->order.order, pSupporter->pSidSet->numOfSubSet);
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo, pQuery->window.skey,
pQuery->window.ekey, pQuery->order.order);
dTrace("QInfo:%p main query scan start", pQInfo);
int64_t st = taosGetTimestampMs();
doOrderedScan(pQInfo);
// create the query support structures
createTableDataInfo(pQInfo);
int64_t et = taosGetTimestampMs();
dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st,
// do check all qualified data blocks
int64_t el = queryOnDataBlocks(pQInfo);
dTrace("QInfo:%p forward scan completed, elapsed time: %lldms, reversed scan start, order:%d", pQInfo, el,
pQuery->order.order ^ 1u);
if (pQuery->intervalTime > 0) {
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo;
closeAllTimeWindow(&pTableQueryInfo->windowResInfo);
}
} else { // close results for group result
closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
// query error occurred or query is killed, abort current execution
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code);
return;
}
doMultiMeterSupplementaryScan(pQInfo);
// close all time window results
doCloseAllTimeWindowAfterScan(pQInfo);
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query killed, abort", pQInfo);
if (needReverseScan(pQuery)) {
doSaveContext(pQInfo);
el = queryOnDataBlocks(pQInfo);
dTrace("QInfo:%p reversed scan completed, elapsed time: %lldms", pQInfo, el);
doRestoreContext(pQInfo);
} else {
dTrace("QInfo:%p no need to do reversed scan, query completed", pQInfo);
return;
}
if (pQuery->intervalTime > 0 || isSumAvgRateQuery(pQuery)) {
assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0);
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code);
return;
}
if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) {
// assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0);
if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pSupporter, pQuery);
if (mergeResultsToGroup(pQInfo) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len);
......@@ -4944,10 +4899,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
}
// handle the limitation of output buffer
pQInfo->size += pQuery->size;
dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->size, pQInfo->size,
pQInfo->pointsReturned);
#endif
dTrace("QInfo:%p points returned:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total);
}
/*
......@@ -4960,7 +4912,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
vnodeScanAllData(pRuntimeEnv);
scanAllDataBlocks(pRuntimeEnv);
doFinalizeResult(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
......@@ -4993,7 +4945,7 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) {
}
while (1) {
vnodeScanAllData(pRuntimeEnv);
scanAllDataBlocks(pRuntimeEnv);
doFinalizeResult(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
......@@ -5039,7 +4991,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
while (1) {
initCtxOutputBuf(pRuntimeEnv);
vnodeScanAllData(pRuntimeEnv);
scanAllDataBlocks(pRuntimeEnv);
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return;
......@@ -5076,7 +5028,7 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
while (1) {
tableIntervalProcessImpl(pRuntimeEnv);
if (pQuery->intervalTime > 0) {
if (isIntervalQuery(pQuery)) {
pQInfo->subgroupIdx = 0; // always start from 0
pQuery->rec.size = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
......@@ -5213,15 +5165,15 @@ static void singleTableQueryImpl(SQInfo* pQInfo) {
sem_post(&pQInfo->dataReady);
}
void multiTableQueryImpl(SQInfo* pQInfo) {
static void multiTableQueryImpl(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
pQuery->rec.size = 0;
int64_t st = taosGetTimestampUs();
if (pQuery->intervalTime > 0 ||
if (isIntervalQuery(pQuery) ||
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr))) {
vnodeMultiMeterQueryProcessor(pQInfo);
multiTableQueryProcess(pQInfo);
} else {
assert((pQuery->checkBufferInLoop == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
isGroupbyNormalCol(pQuery->pGroupbyExpr));
......@@ -5242,29 +5194,6 @@ void multiTableQueryImpl(SQInfo* pQInfo) {
sem_post(&pQInfo->dataReady);
}
void qTableQuery(SQInfo *pQInfo) {
if (pQInfo == NULL || pQInfo->signature != pQInfo) {
dTrace("%p freed abort query", pQInfo);
return;
}
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p it is already killed, abort", pQInfo);
return;
}
dTrace("QInfo:%p query task is launched", pQInfo);
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
if (numOfTables == 1) {
singleTableQueryImpl(pQInfo);
} else {
multiTableQueryImpl(pQInfo);
}
// vnodeDecRefCount(pQInfo);
}
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) {
int32_t j = 0;
......@@ -5284,7 +5213,7 @@ bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pEx
return j < pQueryMsg->numOfCols;
}
static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryMsg) {
static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) {
if (pQueryMsg->intervalTime < 0) {
dError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryMsg, pQueryMsg->intervalTime);
return -1;
......@@ -5349,7 +5278,7 @@ static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** p
* @return
*/
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr,
wchar_t** tagCond, char** nameCond) {
wchar_t** tagCond) {
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
......@@ -5374,7 +5303,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder);
// query msg safety check
if (validateQueryMeterMsg(pQueryMsg) != 0) {
if (validateQueryMsg(pQueryMsg) != 0) {
return TSDB_CODE_INVALID_QUERY_MSG;
}
......@@ -5503,10 +5432,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
*tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
}
if (pQueryMsg->nameCondLen > 0) {
*nameCond = strndup(pMsg, pQueryMsg->nameCondLen);
}
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, "
"timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64
......@@ -5919,7 +5844,7 @@ _clean_memory:
return NULL;
}
bool isQInfoValid(void *param) {
static bool isValidQInfo(void *param) {
SQInfo *pQInfo = (SQInfo *)param;
if (pQInfo == NULL) {
return false;
......@@ -5933,11 +5858,66 @@ bool isQInfoValid(void *param) {
return (sig == (uint64_t)pQInfo);
}
void vnodeFreeQInfo(SQInfo *pQInfo) {
if (!isQInfoValid(pQInfo)) {
return;
static void freeQInfo(SQInfo *pQInfo);
static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) {
int32_t code = TSDB_CODE_SUCCESS;
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error;
}
SQuery *pQuery = (*pQInfo)->runtimeEnv.pQuery;
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
pQuery->window.skey = pQueryMsg->window.skey;
pQuery->window.ekey = pQueryMsg->window.ekey;
pQuery->lastKey = pQuery->window.skey;
if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) {
dError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno));
code = TSDB_CODE_APP_ERROR;
goto _error;
}
vnodeParametersSafetyCheck(pQuery);
STSBuf *pTSBuf = NULL;
if (pQueryMsg->tsLen > 0) { // open new file to save the result
char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset;
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder);
tsBufResetPos(pTSBuf);
tsBufNextPos(pTSBuf);
}
// filter the qualified
if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) {
goto _error;
}
// if (pQInfo->over == 1) {
// vnodeAddRefCount(pQInfo); // for retrieve procedure
// return pQInfo;
// }
// dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
// pQInfo->refCount);
return code;
_error:
// table query ref will be decrease during error handling
freeQInfo(*pQInfo);
return code;
}
static void freeQInfo(SQInfo *pQInfo) {
if (!isValidQInfo(pQInfo)) {
return;
}
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
setQueryKilled(pQInfo);
......@@ -5945,7 +5925,7 @@ void vnodeFreeQInfo(SQInfo *pQInfo) {
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
tfree(pQuery->sdata[col]);
}
// for (int col = 0; col < pQuery->numOfCols; ++col) {
// vnodeFreeColumnInfo(&pQuery->colList[col].data);
// }
......@@ -5953,100 +5933,102 @@ void vnodeFreeQInfo(SQInfo *pQInfo) {
// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// tfree(pQuery->tsData);
// }
sem_destroy(&(pQInfo->dataReady));
vnodeQueryFreeQInfoEx(pQInfo);
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
if (pColFilter->numOfFilters > 0) {
tfree(pColFilter->pFilters);
}
}
tfree(pQuery->pFilterInfo);
tfree(pQuery->colList);
tfree(pQuery->sdata);
if (pQuery->pSelectExpr != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
if (pBinExprInfo->numOfCols > 0) {
tfree(pBinExprInfo->pReqColumns);
tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL);
}
}
tfree(pQuery->pSelectExpr);
}
if (pQuery->defaultVal != NULL) {
tfree(pQuery->defaultVal);
}
tfree(pQuery->pGroupbyExpr);
tfree(pQuery);
dTrace("QInfo:%p QInfo is freed", pQInfo);
// destroy signature, in order to avoid the query process pass the object safety check
memset(pQInfo, 0, sizeof(SQInfo));
tfree(pQInfo);
}
static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) {
int32_t code = TSDB_CODE_SUCCESS;
static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error;
}
SQuery *pQuery = (*pQInfo)->runtimeEnv.pQuery;
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
pQuery->window.skey = pQueryMsg->window.skey;
pQuery->window.ekey = pQueryMsg->window.ekey;
pQuery->lastKey = pQuery->window.skey;
if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) {
dError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno));
code = TSDB_CODE_APP_ERROR;
goto _error;
}
vnodeParametersSafetyCheck(pQuery);
STSBuf *pTSBuf = NULL;
if (pQueryMsg->tsLen > 0) { // open new file to save the result
char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset;
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder);
tsBufResetPos(pTSBuf);
tsBufNextPos(pTSBuf);
/*
* get the file size and set the numOfRows to be the file size, since for tsComp query,
* the returned row size is equalled to 1
* TODO handle the case that the file is too large to send back one time
*/
if (isTSCompQuery(pQuery) && (*numOfRows) > 0) {
struct stat fstat;
if (stat(pQuery->sdata[0]->data, &fstat) == 0) {
*numOfRows = fstat.st_size;
return fstat.st_size;
} else {
dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno));
return 0;
}
} else {
return pQuery->rowSize * (*numOfRows);
}
}
// filter the qualified
if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) {
goto _error;
static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
// the remained number of retrieved rows, not the interpolated result
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// load data from file to msg buffer
if (isTSCompQuery(pQuery)) {
int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666);
// make sure file exist
if (FD_VALID(fd)) {
size_t s = lseek(fd, 0, SEEK_END);
dTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s);
lseek(fd, 0, SEEK_SET);
read(fd, data, s);
close(fd);
unlink(pQuery->sdata[0]->data);
} else {
dError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo,
pQuery->sdata[0]->data, strerror(errno));
}
} else {
doCopyQueryResultToMsg(pQInfo, pQuery->rec.size, data);
}
// if (pQInfo->over == 1) {
// vnodeAddRefCount(pQInfo); // for retrieve procedure
// return pQInfo;
// }
// dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
// pQInfo->refCount);
return code;
_error:
// table query ref will be decrease during error handling
vnodeFreeQInfo(*pQInfo);
return code;
pQuery->rec.total += pQuery->rec.size;
dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total);
return TSDB_CODE_SUCCESS;
// todo if interpolation exists, the result may be dump to client by several rounds
}
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) {
......@@ -6057,9 +6039,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
SArray *pTableIdList = NULL;
SSqlFuncExprMsg** pExprMsg = NULL;
wchar_t* tagCond = NULL;
char* nameCond = NULL;
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &nameCond)) != TSDB_CODE_SUCCESS) {
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -6126,8 +6106,31 @@ _query_over:
return TSDB_CODE_SUCCESS;
}
void qTableQuery(SQInfo *pQInfo) {
if (pQInfo == NULL || pQInfo->signature != pQInfo) {
dTrace("%p freed abort query", pQInfo);
return;
}
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p it is already killed, abort", pQInfo);
return;
}
dTrace("QInfo:%p query task is launched", pQInfo);
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
if (numOfTables == 1) {
singleTableQueryImpl(pQInfo);
} else {
multiTableQueryImpl(pQInfo);
}
// vnodeDecRefCount(pQInfo);
}
int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE;
}
......@@ -6148,62 +6151,6 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
}
static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
/*
* get the file size and set the numOfRows to be the file size, since for tsComp query,
* the returned row size is equalled to 1
* TODO handle the case that the file is too large to send back one time
*/
if (isTSCompQuery(pQuery) && (*numOfRows) > 0) {
struct stat fstat;
if (stat(pQuery->sdata[0]->data, &fstat) == 0) {
*numOfRows = fstat.st_size;
return fstat.st_size;
} else {
dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno));
return 0;
}
} else {
return pQuery->rowSize * (*numOfRows);
}
}
static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
// the remained number of retrieved rows, not the interpolated result
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// load data from file to msg buffer
if (isTSCompQuery(pQuery)) {
int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666);
// make sure file exist
if (FD_VALID(fd)) {
size_t s = lseek(fd, 0, SEEK_END);
dTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s);
lseek(fd, 0, SEEK_SET);
read(fd, data, s);
close(fd);
unlink(pQuery->sdata[0]->data);
} else {
dError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo,
pQuery->sdata[0]->data, strerror(errno));
}
} else {
doCopyQueryResultToMsg(pQInfo, pQuery->rec.size, data);
}
pQuery->rec.total += pQuery->rec.size;
dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total);
return TSDB_CODE_SUCCESS;
// todo if interpolation exists, the result may be dump to client by several rounds
}
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) {
return false;
......@@ -6222,7 +6169,7 @@ bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
}
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen) {
if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE;
}
......@@ -6251,7 +6198,7 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
(*pRsp)->completed = 1; // notify no more result to client
vnodeFreeQInfo(pQInfo);
freeQInfo(pQInfo);
}
return code;
......
......@@ -349,10 +349,10 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
} else {
// add non-super table to the array
pMeta->tables[pTable->tableId.tid] = pTable;
if (pTable->type == TSDB_CHILD_TABLE) {
// add STABLE to the index
if (pTable->type == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
tsdbAddTableIntoIndex(pMeta, pTable);
}
pMeta->nTables++;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册