未验证 提交 3f8a10d9 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1587 from taosdata/feature/query

[td-98] fix bugs in interval query
......@@ -14,10 +14,10 @@ typedef struct SDataStatis {
int16_t numOfNull;
} SDataStatis;
typedef struct SColumnInfoEx {
typedef struct SColumnInfoData {
SColumnInfo info;
void* pData; // the corresponding block data in memory
} SColumnInfoEx;
} SColumnInfoData;
void extractTableName(const char *tableId, char *name);
......
......@@ -89,7 +89,7 @@ typedef struct SColumnFilterElem {
} SColumnFilterElem;
typedef struct SSingleColumnFilterInfo {
SColumnInfoEx info;
SColumnInfoData info;
int32_t numOfFilters;
SColumnFilterElem* pFilters;
void* pData;
......@@ -130,7 +130,7 @@ typedef struct SQuery {
int32_t rowSize;
SSqlGroupbyExpr* pGroupbyExpr;
SSqlFunctionExpr* pSelectExpr;
SColumnInfoEx* colList;
SColumnInfoData* colList;
int32_t numOfFilterCols;
int64_t* defaultVal;
TSKEY lastKey;
......
......@@ -866,7 +866,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
int32_t numOfCols = taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx *p = taosArrayGet(pDataBlock, i);
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
if (pCol->colId == p->info.colId) {
dataBlock = p->pData;
break;
......@@ -894,7 +894,7 @@ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStati
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
SColumnInfoEx *pColInfo = NULL;
SColumnInfoData *pColInfo = NULL;
TSKEY * primaryKeyCol = NULL;
if (pDataBlock != NULL) {
......@@ -2223,7 +2223,7 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
}
int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
......@@ -2292,11 +2292,12 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true);
STsdbQueryCond cond = {0};
cond.twindow = (STimeWindow){.skey = pQuery->window.skey, .ekey = pQuery->window.ekey};
cond.order = pQuery->order.order;
cond.colList = *pQuery->colList;
STsdbQueryCond cond = {
.twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey},
.order = pQuery->order.order,
.colList = pQuery->colList,
};
SArray *sa = taosArrayInit(1, POINTER_BYTES);
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
......@@ -2566,7 +2567,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
// check if query is killed or not set the status of query to pass the status check
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return 0;
}
......@@ -2699,7 +2700,6 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, vo
}
// set the join tag for first column
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
pRuntimeEnv->pTSBuf != NULL) {
assert(pFuncMsg->numOfParams == 1);
......@@ -3472,8 +3472,8 @@ void setQueryStatus(SQuery *pQuery, int8_t status) {
bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false;
bool toContinue = false;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
// for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
......@@ -3567,8 +3567,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
doSingleMeterSupplementScan(pRuntimeEnv);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during
// supplementary scan
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan
pQuery->lastKey = lkey;
pQuery->window.ekey = ekey;
......@@ -3577,7 +3576,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
// tsdbNextDataBlock(pRuntimeEnv->pQueryHandle);
}
void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) {
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
......@@ -3865,7 +3864,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
int32_t startIdx = 0;
int32_t step = -1;
dTrace("QInfo:%p start to copy data from windowResInfo to pQuery buf", GET_QINFO_ADDR(pQuery));
dTrace("QInfo:%p start to copy data from windowResInfo to query buf", GET_QINFO_ADDR(pQuery));
int32_t totalSubset = getNumOfSubset(pQInfo);
if (orderType == TSDB_ORDER_ASC) {
......@@ -3914,7 +3913,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
}
}
dTrace("QInfo:%p copy data to SQuery buf completed", pQInfo);
dTrace("QInfo:%p copy data to query buf completed", pQInfo);
#ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pQuery, numOfResult);
......@@ -4170,7 +4169,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
STsdbQueryCond cond = {
.twindow = pQuery->window,
.order = pQuery->order.order,
.colList = *pQuery->colList,
.colList = pQuery->colList,
};
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
......@@ -4450,7 +4449,7 @@ static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, i
scanAllDataBlocks(pRuntimeEnv);
// first/last_row query, do not invoke the finalize for super table query
doFinalizeResult(pRuntimeEnv);
finalizeQueryResult(pRuntimeEnv);
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
assert(numOfRes == 1 || numOfRes == 0);
......@@ -4690,7 +4689,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
* Only the ts-comp query requires the finalizer function to be executed here.
*/
if (isTSCompQuery(pQuery)) {
doFinalizeResult(pRuntimeEnv);
finalizeQueryResult(pRuntimeEnv);
}
if (pRuntimeEnv->pTSBuf != NULL) {
......@@ -4830,7 +4829,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
// vnodePrintQueryStatistics(pSupporter);
}
dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.rows, pQuery->rec.total);
dTrace("QInfo:%p current:%lld, total:%lld", pQInfo, pQuery->rec.rows, pQuery->rec.total);
return;
}
......@@ -4896,12 +4895,12 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column;
*/
static void tableFixedOutputProcessor(SQInfo *pQInfo) {
static void tableFixedOutputProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
scanAllDataBlocks(pRuntimeEnv);
doFinalizeResult(pRuntimeEnv);
finalizeQueryResult(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
return;
......@@ -4909,8 +4908,6 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
// since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously.
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
// assert(pQuery->size <= pQuery->pointsToRead &&
// Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED));
// must be top/bottom query if offset > 0
if (pQuery->limit.offset > 0) {
......@@ -4919,8 +4916,6 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
doSkipResults(pRuntimeEnv);
doRevisedResultsByLimit(pQInfo);
pQuery->rec.rows = pQuery->rec.rows;
}
static void tableMultiOutputProcess(SQInfo *pQInfo) {
......@@ -4934,7 +4929,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) {
while (1) {
scanAllDataBlocks(pRuntimeEnv);
doFinalizeResult(pRuntimeEnv);
finalizeQueryResult(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
return;
......@@ -4977,7 +4972,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
while (1) {
initCtxOutputBuf(pRuntimeEnv);
// initCtxOutputBuf(pRuntimeEnv);
scanAllDataBlocks(pRuntimeEnv);
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
......@@ -4985,7 +4980,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
}
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));
doFinalizeResult(pRuntimeEnv);
finalizeQueryResult(pRuntimeEnv);
// here we can ignore the records in case of no interpolation
// todo handle offset, in case of top/bottom interval query
......@@ -5006,7 +5001,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
}
// handle time interval query on table
static void tableIntervalProcessor(SQInfo *pQInfo) {
static void tableIntervalProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -5058,13 +5053,7 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
}
pQuery->rec.rows += pQuery->rec.rows;
pQInfo->pointsInterpo += numOfInterpo;
// dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d
// totalReturn:%d",
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, numOfInterpo,
// pQInfo->size - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
}
static void tableQueryImpl(SQInfo* pQInfo) {
......@@ -5084,8 +5073,6 @@ static void tableQueryImpl(SQInfo* pQInfo) {
doRevisedResultsByLimit(pQInfo);
pQInfo->pointsInterpo += numOfInterpo;
pQuery->rec.rows += pQuery->rec.rows;
dTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
sem_post(&pQInfo->dataReady);
return;
......@@ -5127,25 +5114,25 @@ static void tableQueryImpl(SQInfo* pQInfo) {
// group by normal column, sliding window query, interval query are handled by interval query processor
if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
tableIntervalProcessor(pQInfo);
tableIntervalProcess(pQInfo);
} else if (isFixedOutputQuery(pQuery)) {
assert(pQuery->checkBuffer == 0);
tableFixedOutputProcessor(pQInfo);
tableFixedOutputProcess(pQInfo);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkBuffer == 1);
tableMultiOutputProcess(pQInfo);
}
// record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
assert(taosArrayGetSize(pQInfo->pTableIdList) == 1);
/* check if query is killed or not */
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed", pQInfo);
} else {
dTrace("QInfo:%p query task completed, %" PRId64 " rows will returned, total:%" PRId64 " rows", pQInfo, pQuery->rec.rows,
pQuery->rec.total);
STableId* pTableId = taosArrayGet(pQInfo->pTableIdList, 0);
dTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
sem_post(&pQInfo->dataReady);
......@@ -5600,7 +5587,7 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
if (pQuery->colList[i].info.numOfFilters > 0) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[j];
memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoEx));
memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData));
pFilterInfo->info.info.filters = NULL;
pFilterInfo->numOfFilters = pQuery->colList[i].info.numOfFilters;
......
......@@ -188,7 +188,7 @@ typedef void* tsdb_query_handle_t; // Use void to hide implementation details
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc/asc order to iterate the data block
SColumnInfoEx colList;
SColumnInfoData* colList;
} STsdbQueryCond;
typedef struct SBlockInfo {
......
......@@ -25,7 +25,7 @@
#include "tsdbMain.h"
#define EXTRA_BYTES 2
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoEx*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
#define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC)
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
......@@ -65,7 +65,10 @@ typedef struct STableCheckInfo {
int64_t offsetInHeaderFile;
int32_t start;
bool checkFirstFileBlock;
SCompInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks
SDataCols* pDataCols;
......@@ -106,7 +109,7 @@ typedef struct STsdbQueryHandle {
SCompBlock* pBlock;
int32_t numOfBlocks;
SField** pFields;
SArray* pColumns; // column list, SColumnInfoEx array list
SArray* pColumns; // column list, SColumnInfoData array list
bool locateStart;
int32_t realNumOfRows;
bool loadDataAfterSeek; // load data after seek.
......@@ -127,7 +130,7 @@ typedef struct STsdbQueryHandle {
int32_t doAllocateBuf(STsdbQueryHandle* pQueryHandle, int32_t rowsPerFileBlock) {
// record the maximum column width among columns of this meter/metric
SColumnInfoEx* pColumn = taosArrayGet(pQueryHandle->pColumns, 0);
SColumnInfoData* pColumn = taosArrayGet(pQueryHandle->pColumns, 0);
int32_t maxColWidth = pColumn->info.bytes;
for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) {
......@@ -177,7 +180,7 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
.lastKey = pQueryHandle->window.skey,
.tableId = id,
.pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), // todo this may be failed
.pCompInfo = calloc(1, 1024),
.pCompInfo = NULL,
};
assert(info.pTableObj != NULL);
......@@ -193,10 +196,10 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
int32_t numOfCols = taosArrayGetSize(pColumnInfo);
size_t bufferCapacity = 4096;
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoEx));
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx* pCol = taosArrayGet(pColumnInfo, i);
SColumnInfoEx pDest = {{0}, 0};
SColumnInfoData* pCol = taosArrayGet(pColumnInfo, i);
SColumnInfoData pDest = {{0}, 0};
pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes);
pDest.info = pCol->info;
......@@ -261,6 +264,15 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
} else {
if (pCheckInfo->compSize < compIndex->len) {
assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
assert(t != NULL);
pCheckInfo->pCompInfo = (SCompInfo*) t;
}
tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
int32_t index = 0;
......@@ -589,7 +601,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
int16_t colId = *(int16_t*)taosArrayGet(sa, i);
for (int32_t j = 0; j < numOfCols; ++j) {
SColumnInfoEx* pCol = taosArrayGet(pQueryHandle->pColumns, j);
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j);
if (pCol->info.colId == colId) {
SDataCol* pDataCol = &pCols->cols[i];
......@@ -610,7 +622,7 @@ static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx* pCol = taosArrayGet(pQueryHandle->pColumns, i);
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
taosArrayPush(pIdList, &pCol->info.colId);
}
......@@ -1067,7 +1079,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
int32_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, i);
SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
memcpy(pColInfo->pData + numOfRows * pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
offset += pColInfo->info.bytes;
}
......@@ -1102,7 +1114,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) {
/* not a whole disk block, only the qualified rows, so this block is loaded in to buffer during the
* block next function
*/
SColumnInfoEx* pColInfoEx = taosArrayGet(pHandle->pColumns, 0);
SColumnInfoData* pColInfoEx = taosArrayGet(pHandle->pColumns, 0);
rows = pHandle->realNumOfRows;
skey = *(TSKEY*)pColInfoEx->pData;
......@@ -1141,7 +1153,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t* pQueryHandle, SData
SArray* tsdbRetrieveDataBlock(tsdb_query_handle_t* pQueryHandle, SArray* pIdList) {
/**
* In the following two cases, the data has been loaded to SColumnInfoEx.
* In the following two cases, the data has been loaded to SColumnInfoData.
* 1. data is from cache, 2. data block is not completed qualified to query time range
*/
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
......@@ -1484,7 +1496,7 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
for (int32_t i = 0; i < cols; ++i) {
SColumnInfoEx* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
tfree(pColInfo->pData);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册