提交 3f24ff44 编写于 作者: H hjxilinx

[td-169] fix limit offset error for table projection/diff query

上级 e591a3eb
......@@ -16,17 +16,17 @@
#include "hash.h"
#include "hashfunc.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "tscompression.h"
#include "ttime.h"
#include "qast.h"
#include "qresultBuf.h"
#include "query.h"
#include "queryExecutor.h"
#include "queryLog.h"
#include "queryUtil.h"
#include "query.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "tscompression.h"
#include "tsdbMain.h" //todo use TableId instead of STable object
#include "queryLog.h"
#include "ttime.h"
#define DEFAULT_INTERN_BUF_SIZE 16384L
......@@ -45,16 +45,15 @@
#define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define GET_QINFO_ADDR(x) ((void*)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
#define GET_QINFO_ADDR(x) ((void *)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.bytes)
#define GET_COLUMN_TYPE(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.type)
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.type)
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
......@@ -93,7 +92,7 @@ typedef struct {
int32_t status; // query status
TSKEY lastKey; // the lastKey value before query executed
STimeWindow w; // whole query time window
STimeWindow current; // current query window
STimeWindow curWindow; // current query window
int32_t windowIndex; // index of active time window result for interval query
STSCursor cur;
} SQueryStatusInfo;
......@@ -101,7 +100,7 @@ typedef struct {
static void setQueryStatus(SQuery *pQuery, int8_t status);
bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group);
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group);
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
......@@ -114,9 +113,9 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery);
static void createTableDataInfo(SQInfo* pQInfo);
static void createTableDataInfo(SQInfo *pQInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo);
static int32_t flushFromResultBuf(SQInfo *pQInfo);
bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) {
......@@ -427,7 +426,7 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo
*/
static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis,
SDataStatis **pColStatis) {
SColIndex* pColIndex = &pQuery->pSelectExpr[col].pBase.colInfo;
SColIndex *pColIndex = &pQuery->pSelectExpr[col].pBase.colInfo;
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
return false;
}
......@@ -865,9 +864,9 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
dataBlock = NULL;
} else {
/*
* the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare stage,
* the remain meter may not have the required column in cache actually.
* So, the validation of required column in cache with the corresponding meter schema is reinforced.
* the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare
* stage, the remain meter may not have the required column in cache actually. So, the validation of required
* column in cache with the corresponding meter schema is reinforced.
*/
if (pDataBlock == NULL) {
return NULL;
......@@ -911,7 +910,6 @@ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStati
primaryKeyCol = (TSKEY *)(pColInfo->pData);
}
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pDataBlockInfo->rows - 1;
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport));
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
......@@ -922,8 +920,8 @@ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStati
bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &tpField);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, tpField,
hasNull, &sasArray[k], pRuntimeEnv->scanFlag);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, tpField, hasNull,
&sasArray[k], pRuntimeEnv->scanFlag);
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
......@@ -1131,13 +1129,13 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &pColStatis);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, pColStatis,
hasNull, &sasArray[k], pRuntimeEnv->scanFlag);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, pColStatis, hasNull,
&sasArray[k], pRuntimeEnv->scanFlag);
}
// set the input column data
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
// SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
// SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
assert(0);
/*
* NOTE: here the tbname/tags column cannot reach here, since it will never be a filter column,
......@@ -1308,17 +1306,17 @@ static UNUSED_FUNC int32_t reviseForwardSteps(SQueryRuntimeEnv *pRuntimeEnv, int
}
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo,
SDataStatis *pStatis, __block_search_fn_t searchFn, SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
SDataStatis *pStatis, __block_search_fn_t searchFn,
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
/*numOfRes = */rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
/*numOfRes = */ rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
} else {
blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
}
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pQuery->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
......@@ -1360,15 +1358,19 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY
pCtx->preAggVals.isSet = false;
}
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0 && (tsCol != NULL)) {
pCtx->ptsList = tsCol;
pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery)? pQuery->pos : 0;
pCtx->size = QUERY_IS_ASC_QUERY(pQuery)? size - pQuery->pos : pQuery->pos + 1;
uint32_t status = aAggs[functionId].nStatus;
if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
pCtx->ptsList = &tsCol[pCtx->startOffset];
}
if (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) {
// last_dist or first_dist function
// store the first&last timestamp into the intermediate buffer [1], the true
// value may be null but timestamp will never be null
pCtx->ptsList = tsCol;
// pCtx->ptsList = tsCol;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA ||
functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
/*
......@@ -1384,15 +1386,12 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY
pTWAInfo->EKey = pQuery->window.ekey;
}
pCtx->ptsList = tsCol;
// pCtx->ptsList = tsCol;
} else if (functionId == TSDB_FUNC_ARITHM) {
pCtx->param[1].pz = param;
}
pCtx->startOffset = 0;
pCtx->size = size;
#if defined(_DEBUG_VIEW)
// int64_t *tsList = (int64_t *)primaryColumnData;
// int64_t s = tsList[0];
......@@ -1446,7 +1445,7 @@ static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool i
}
}
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order) {
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order) {
qTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -1583,9 +1582,7 @@ static bool isQueryKilled(SQInfo *pQInfo) {
#endif
}
static void setQueryKilled(SQInfo* pQInfo) {
pQInfo->code = TSDB_CODE_QUERY_CANCELLED;
}
static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_QUERY_CANCELLED; }
bool isFixedOutputQuery(SQuery *pQuery) {
if (pQuery->intervalTime != 0) {
......@@ -1679,8 +1676,8 @@ static bool needReverseScan(SQuery *pQuery) {
}
/////////////////////////////////////////////////////////////////////////////////////////////
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast,
int64_t *realSkey, int64_t *realEkey, STimeWindow *win) {
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, int64_t *realSkey,
int64_t *realEkey, STimeWindow *win) {
assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime);
win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision);
......@@ -2100,7 +2097,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
continue;
}
int32_t colInBuf = 0;//pQuery->pSelectExpr[i].pBase.colInfo.colIdxInBuf;
int32_t colInBuf = 0; // pQuery->pSelectExpr[i].pBase.colInfo.colIdxInBuf;
SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf;
pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail));
......@@ -2199,7 +2196,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
size_t s = pQInfo->groupInfo.numOfTables;
num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset
num = 1;//pQInfo->pSidSet->numOfSubSet;
num = 1; // pQInfo->pSidSet->numOfSubSet;
}
assert(num > 0);
......@@ -2374,8 +2371,8 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
}
if (r == BLK_DATA_NO_NEEDED) {
qTrace("QInfo:%p slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d",
GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
qTrace("QInfo:%p slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
} else if (r == BLK_DATA_FILEDS_NEEDED) {
if (tsdbRetrieveDataBlockStatisInfo(pRuntimeEnv->pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
// return DISK_DATA_LOAD_FAILED;
......@@ -2419,7 +2416,7 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
TSKEY* keyList = (TSKEY *)pValue;
TSKEY * keyList = (TSKEY *)pValue;
int32_t firstPos = 0;
int32_t lastPos = num - 1;
......@@ -2477,9 +2474,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->scanFlag == MASTER_SCAN? pRuntimeEnv->pQueryHandle:pRuntimeEnv->pSecQueryHandle;
TsdbQueryHandleT pQueryHandle =
pRuntimeEnv->scanFlag == MASTER_SCAN ? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return 0;
}
......@@ -2493,8 +2490,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) {
getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey,
&skey1, &ekey1, &w);
getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1,
&ekey1, &w);
pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey;
} else {
......@@ -2509,23 +2506,23 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
// in case of prj/diff query, ensure the output buffer is sufficient to accomodate the results of current block
if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) {
SResultRec* pRec = &pQuery->rec;
SResultRec *pRec = &pQuery->rec;
if (pQuery->rec.capacity - pQuery->rec.rows < blockInfo.rows) {
int32_t remain = pRec->capacity - pRec->rows;
int32_t newSize = pRec->capacity + (blockInfo.rows - remain);
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t bytes = pQuery->pSelectExpr[i].resBytes;
char* tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(SData));
char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(SData));
if (tmp == NULL) { // todo handle the oom
} else {
pQuery->sdata[i] = (SData*) tmp;
pQuery->sdata[i] = (SData *)tmp;
}
// set the pCtx output buffer position
pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows*bytes;
pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes;
}
pRec->capacity = newSize;
......@@ -2533,12 +2530,14 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey,
&pRuntimeEnv->windowResInfo, pDataBlock);
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d",
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv),
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
// save last access position
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
......@@ -2572,10 +2571,10 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
*/
static void doSetTagValueInParam(void* tsdb, STableId id, int32_t tagColId, tVariant *param) {
static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVariant *param) {
tVariantDestroy(param);
char* val = NULL;
char * val = NULL;
int16_t bytes = 0;
int16_t type = 0;
......@@ -2583,8 +2582,8 @@ static void doSetTagValueInParam(void* tsdb, STableId id, int32_t tagColId, tVar
tVariantCreateFromBinary(param, val, bytes, type);
}
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) {
SQuery * pQuery = pRuntimeEnv->pQuery;
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
......@@ -2600,7 +2599,6 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) {
continue;
}
// todo use tag column index to optimize performance
doSetTagValueInParam(tsdb, id, pCol->colId, &pRuntimeEnv->pCtx[idx].tag);
}
......@@ -2610,7 +2608,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) {
pRuntimeEnv->pTSBuf != NULL) {
assert(pFuncMsg->numOfParams == 1);
assert(0); // to do fix me
// doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag);
// doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag);
}
}
}
......@@ -2818,7 +2816,7 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex);
SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex);
ret = mergeIntoGroupResultImpl(pQInfo, group);
if (ret < 0) { // not enough disk space to save the data into disk
return -1;
......@@ -2835,8 +2833,8 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
qTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1);
}
qTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
pQInfo, pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st);
qTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", pQInfo,
pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st);
return TSDB_CODE_SUCCESS;
}
......@@ -2916,22 +2914,22 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
return maxOutput;
}
int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* pGroup) {
int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
size_t size = taosArrayGetSize(pGroup);
tFilePage **buffer = (tFilePage **)pQuery->sdata;
int32_t *posList = calloc(size, sizeof(int32_t));
int32_t * posList = calloc(size, sizeof(int32_t));
STableDataInfo **pTableList = malloc(POINTER_BYTES * size);
// todo opt for the case of one table per group
int32_t numOfTables = 0;
for (int32_t i = 0; i < size; ++i) {
SPair* p = taosArrayGet(pGroup, i);
STableDataInfo* pInfo = p->sec;
SPair * p = taosArrayGet(pGroup, i);
STableDataInfo *pInfo = p->sec;
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->pTableQInfo->tid);
if (list.size > 0 && pInfo->pTableQInfo->windowResInfo.size > 0) {
......@@ -3158,12 +3156,12 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
}
if (isIntervalQuery(pQuery)) {
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
// SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
//
// doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
// }
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
// SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
//
// doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
// }
} else {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
......@@ -3175,7 +3173,8 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);// = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
SWITCH_ORDER(pRuntimeEnv->pCtx[i]
.order); // = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
}
}
......@@ -3209,7 +3208,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
memset(pQuery->sdata[i]->data, 0, (size_t) pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity);
memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity);
}
initCtxOutputBuf(pRuntimeEnv);
......@@ -3345,25 +3344,23 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
return toContinue;
}
static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery* pQuery = pRuntimeEnv->pQuery;
static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQueryStatusInfo info = {
.status = pQuery->status,
.windowIndex = pRuntimeEnv->windowResInfo.curIndex,
.lastKey = pQuery->lastKey,
.w = pQuery->window,
.curWindow = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey},
};
return info;
}
static void setEnvBeforeReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SQueryStatusInfo* pStatus) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery* pQuery = pRuntimeEnv->pQuery;
// the step should be placed before order changed
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor
if (pRuntimeEnv->pTSBuf) {
......@@ -3372,8 +3369,8 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SQueryStatusI
}
// reverse order time range
pQuery->window.skey = pQuery->lastKey - step;
pQuery->window.ekey = pStatus->lastKey; // the start timestamp of current query
pQuery->window = pStatus->curWindow;
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWITCH_ORDER(pQuery->order.order);
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
......@@ -3397,8 +3394,8 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SQueryStatusI
disableFuncInReverseScan(pRuntimeEnv);
}
static void clearEnvAfterReverseScan(SQueryRuntimeEnv* pRuntimeEnv, TSKEY lastKey, SQueryStatusInfo* pStatus) {
SQuery* pQuery = pRuntimeEnv->pQuery;
static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SWITCH_ORDER(pQuery->order.order);
switchCtxOrder(pRuntimeEnv);
......@@ -3412,7 +3409,7 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv* pRuntimeEnv, TSKEY lastKe
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query
// during reverse scan
pQuery->lastKey = lastKey;
pQuery->lastKey = pStatus->lastKey;
pQuery->status = pStatus->status;
pQuery->window = pStatus->w;
}
......@@ -3422,7 +3419,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
// store the start query position
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pRuntimeEnv);
SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
......@@ -3433,6 +3430,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if (pRuntimeEnv->scanFlag == MASTER_SCAN) {
qstatus.status = pQuery->status;
qstatus.curWindow.ekey = pQuery->lastKey - step;
}
if (!needScanDataBlocksAgain(pRuntimeEnv)) {
......@@ -3445,7 +3443,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
STsdbQueryCond cond = {
.twindow = {.skey = qstatus.lastKey, .ekey = pQuery->lastKey - step},
.twindow = qstatus.curWindow,
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
......@@ -3471,14 +3469,13 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
return;
}
TSKEY lastKey = pQuery->lastKey;
setEnvBeforeReverseScan(pRuntimeEnv, &qstatus);
// reverse scan from current position
qTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv));
qTrace("QInfo:%p start to reverse scan", pQInfo);
doScanAllDataBlocks(pRuntimeEnv);
clearEnvAfterReverseScan(pRuntimeEnv, lastKey, &qstatus);
clearEnvAfterReverseScan(pRuntimeEnv, &qstatus);
}
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
......@@ -3588,7 +3585,7 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p
* @param pRuntimeEnv
* @param pDataBlockInfo
*/
void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STable* pTable, int32_t groupIdx,
void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STable *pTable, int32_t groupIdx,
TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
......@@ -3642,7 +3639,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
}
}
int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo) {
int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
assert(pTableQueryInfo->lastKey > 0);
......@@ -3860,10 +3857,11 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInf
}
}
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv* pRuntimeEnv, STableDataInfo *pTableDataInfo, SDataBlockInfo *pDataBlockInfo,
SDataStatis *pStatis, SArray *pDataBlock, __block_search_fn_t searchFn) {
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo,
SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SArray *pDataBlock,
__block_search_fn_t searchFn) {
SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo * pTableQueryInfo = pTableDataInfo->pTableQInfo;
STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo;
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
......@@ -3901,7 +3899,7 @@ bool vnodeHasRemainResults(void *handle) {
// query has completed
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
/*TSKEY ekey =*/ taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime,
/*TSKEY ekey =*/taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime,
pQuery->slidingTimeUnit, pQuery->precision);
// int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY
// *)pRuntimeEnv->pInterpoBuf[0]->data,
......@@ -4058,7 +4056,209 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) {
#endif
}
int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) {
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pQuery->limit.offset == pBlockInfo->rows) { // current block will ignore completed
pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step;
pQuery->limit.offset = 0;
return;
}
if (QUERY_IS_ASC_QUERY(pQuery)) {
pQuery->pos = pQuery->limit.offset;
} else {
pQuery->pos = pBlockInfo->rows - pQuery->limit.offset - 1;
}
assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->rows - 1);
SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL);
SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
// update the pQuery->limit.offset value, and pQuery->pos value
TSKEY *keys = (TSKEY *)pColInfoData->pData;
// update the offset value
pQuery->lastKey = keys[pQuery->pos];
pQuery->limit.offset = 0;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey,
&pRuntimeEnv->windowResInfo, pDataBlock);
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes);
}
void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0) {
return;
}
pQuery->pos = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return;
}
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
if (pQuery->limit.offset > blockInfo.rows) {
pQuery->limit.offset -= blockInfo.rows;
pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey;
pQuery->lastKey += step;
qTrace("QInfo:%p skip rows:%d, offset:%" PRId64 "", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows, pQuery->limit.offset);
} else { // find the appropriated start position in current block
updateOffsetVal(pRuntimeEnv, &blockInfo);
break;
}
}
}
static UNUSED_FUNC bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
// if queried with value filter, do NOT forward query start position
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
return true;
}
if (pQuery->limit.offset > 0 && (!isTopBottomQuery(pQuery)) && pQuery->interpoType == TSDB_INTERPO_NONE) {
/*
* 1. for top/bottom query, the offset applies to the final result, not here
* 2. for interval without interpolation query we forward pQuery->intervalTime at a time for
* pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is
* not valid. otherwise, we only forward pQuery->limit.offset number of points
*/
if (isIntervalQuery(pQuery)) {
assert(pRuntimeEnv->windowResInfo.prevSKey == 0);
TSKEY skey1, ekey1;
STimeWindow w = {0};
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
// todo handle no data situation
}
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle);
if (QUERY_IS_ASC_QUERY(pQuery)) {
getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1,
&ekey1, &w);
pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey;
} else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
TSKEY start = blockInfo.window.ekey - pQuery->intervalTime;
getAlignQueryTimeWindow(pQuery, start, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w);
pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey;
}
// the first time window
STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQuery);
while (pQuery->limit.offset > 0) {
STimeWindow tw = win;
getNextTimeWindow(pQuery, &tw);
// next time window starts from current data block
if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(tw.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
// query completed
if ((tw.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(tw.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
}
tw = win;
SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL);
SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &tw, pWindowResInfo, &blockInfo, pColInfoData->pData,
binarySearchForKey);
assert(startPos >= 0);
pQuery->limit.offset -= 1;
// set the abort info
pQuery->pos = startPos;
pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
pWindowResInfo->prevSKey = tw.skey;
win = tw;
continue;
} else {
if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
}
blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle);
if ((blockInfo.window.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(blockInfo.window.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
}
// set the window that start from the next data block
TSKEY key = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.skey : blockInfo.window.ekey;
STimeWindow n = getActiveTimeWindow(pWindowResInfo, key, pQuery);
// next data block are still covered by current time window
if (n.skey == win.skey && n.ekey == win.ekey) {
// do nothing
} else {
pQuery->limit.offset -= 1;
// query completed
if ((n.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(n.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
}
// set the abort info
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.skey : blockInfo.window.ekey;
pWindowResInfo->prevSKey = n.skey;
win = n;
}
}
}
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) || pQuery->limit.offset > 0) {
setQueryStatus(pQuery, QUERY_COMPLETED);
return false;
} else {
assert(0);
// if (IS_DISK_DATA_BLOCK(pQuery)) {
// getTimestampInDiskBlock(pRuntimeEnv, 0);
}
}
} else { // forward the start position for projection query
skipBlocks(&pQInfo->runtimeEnv);
if (pQuery->limit.offset > 0) {
setQueryStatus(pQuery, QUERY_COMPLETED);
return false;
}
}
return true;
}
int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -4091,7 +4291,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
}
// create runtime environment
code = setupQueryRuntimeEnv(pRuntimeEnv, NULL, pQuery->order.order);
code = setupQueryRuntimeEnv(pRuntimeEnv, pQuery->order.order);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4158,11 +4358,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
pointInterpSupporterSetData(pQInfo, &interpInfo);
pointInterpSupporterDestroy(&interpInfo);
// todo move to other location
// if (!forwardQueryStartPosIfNeeded(pQInfo, pQInfo, dataInDisk, dataInCache)) {
// return TSDB_CODE_SUCCESS;
// }
int64_t rs = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit,
pQuery->precision);
taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0);
......@@ -4226,18 +4421,18 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
}
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
STableDataInfo* pTableDataInfo = NULL;
STable* pTable = NULL;
STableDataInfo *pTableDataInfo = NULL;
STable * pTable = NULL;
// todo opt performance using hash table
size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
for(int32_t i = 0; i < numOfGroup; ++i) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
size_t num = taosArrayGetSize(group);
for(int32_t j = 0; j < num; ++j) {
SPair* p = taosArrayGet(group, j);
STableDataInfo* pInfo = p->sec;
for (int32_t j = 0; j < num; ++j) {
SPair * p = taosArrayGet(group, j);
STableDataInfo *pInfo = p->sec;
if (pInfo->pTableQInfo->tid == blockInfo.sid) {
pTableDataInfo = p->sec;
......@@ -4277,14 +4472,14 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
SPair* p = taosArrayGet(group, index);
SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
SPair * p = taosArrayGet(group, index);
STable* pTable = p->first;
STableDataInfo* pInfo = p->sec;
STable * pTable = p->first;
STableDataInfo *pInfo = p->sec;
setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb);
......@@ -4298,9 +4493,8 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
.numOfCols = pQuery->numOfCols,
};
SArray* g1 = taosArrayInit(1, POINTER_BYTES);
SArray* tx = taosArrayInit(1, sizeof(SPair));
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayInit(1, sizeof(SPair));
taosArrayPush(tx, p);
taosArrayPush(g1, &tx);
......@@ -4377,7 +4571,7 @@ static UNUSED_FUNC int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, i
*/
static void sequentialTableProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_COMPLETED);
size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
......@@ -4467,16 +4661,17 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
resetCtxOutputBuf(pRuntimeEnv);
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList));
SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables &&
1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList));
while (pQInfo->tableIndex < pQInfo->groupInfo.numOfTables) {
if (isQueryKilled(pQInfo)) {
return;
}
SPair *p = taosArrayGet(group, pQInfo->tableIndex);
STableDataInfo* pInfo = p->sec;
SPair * p = taosArrayGet(group, pQInfo->tableIndex);
STableDataInfo *pInfo = p->sec;
TSKEY skey = pInfo->pTableQInfo->lastKey;
if (skey > 0) {
......@@ -4488,11 +4683,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
continue;
}
// SPointInterpoSupporter pointInterpSupporter = {0};
// SPointInterpoSupporter pointInterpSupporter = {0};
// TODO handle the limit problem
if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
// forwardQueryStartPosition(pRuntimeEnv);
// skipBlocks(pRuntimeEnv);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
pQInfo->tableIndex++;
......@@ -4538,10 +4733,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
continue;
} else {
// pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
// // buffer is full, wait for the next round to retrieve data from current meter
// assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
// break;
// pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
// // buffer is full, wait for the next round to retrieve data from current meter
// assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
// break;
}
}
}
......@@ -4587,34 +4782,37 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery->rec.total += pQuery->rec.rows;
qTrace( "QInfo %p, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%d totalReturn:%d,"
" offset:%" PRId64, pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups,
pQuery->rec.rows, pQuery->rec.total, pQuery->limit.offset);
qTrace(
"QInfo %p, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%d totalReturn:%d,"
" offset:%" PRId64,
pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total,
pQuery->limit.offset);
}
static void createTableDataInfo(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
static void createTableDataInfo(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
int32_t index = 0;
for (int32_t i = 0; i < numOfGroups; ++i) { // load all meter meta info
SArray *group = *(SArray**) taosArrayGet(pQInfo->groupInfo.pGroupList, i);
SArray *group = *(SArray **)taosArrayGet(pQInfo->groupInfo.pGroupList, i);
size_t s = taosArrayGetSize(group);
for(int32_t j = 0; j < s; ++j) {
SPair* p = (SPair*) taosArrayGet(group, j);
for (int32_t j = 0; j < s; ++j) {
SPair *p = (SPair *)taosArrayGet(group, j);
// STableDataInfo has been created for each table
if (p->sec != NULL) { // todo refactor
return;
}
STableDataInfo* pInfo = calloc(1, sizeof(STableDataInfo));
STableDataInfo *pInfo = calloc(1, sizeof(STableDataInfo));
setTableDataInfo(pInfo, index, i);
pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, ((STable*)(p->first))->tableId.tid, pQuery->window);
pInfo->pTableQInfo =
createTableQueryInfo(&pQInfo->runtimeEnv, ((STable *)(p->first))->tableId.tid, pQuery->window);
p->sec = pInfo;
......@@ -4624,17 +4822,17 @@ static void createTableDataInfo(SQInfo* pQInfo) {
}
static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) {
// SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
// }
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
// }
}
static void doSaveContext(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
static void doSaveContext(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
disableFuncForReverseScan(pQInfo, pQuery->order.order);
......@@ -4647,9 +4845,9 @@ static void doSaveContext(SQInfo* pQInfo) {
prepareQueryInfoForReverseScan(pQInfo);
}
static void doRestoreContext(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
static void doRestoreContext(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
......@@ -4661,22 +4859,22 @@ static void doRestoreContext(SQInfo* pQInfo) {
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
}
static void doCloseAllTimeWindowAfterScan(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (isIntervalQuery(pQuery)) {
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
// closeAllTimeWindow(&pTableQueryInfo->windowResInfo);
// }
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
// closeAllTimeWindow(&pTableQueryInfo->windowResInfo);
// }
size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
for(int32_t i = 0; i < numOfGroup; ++i) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
size_t num = taosArrayGetSize(group);
for(int32_t j = 0; j < num; ++j) {
SPair* p = taosArrayGet(group, j);
STableDataInfo* pInfo = p->sec;
for (int32_t j = 0; j < num; ++j) {
SPair * p = taosArrayGet(group, j);
STableDataInfo *pInfo = p->sec;
closeAllTimeWindow(&pInfo->pTableQInfo->windowResInfo);
}
......@@ -4715,8 +4913,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
return;
}
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo, pQuery->window.skey,
pQuery->window.ekey, pQuery->order.order);
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo,
pQuery->window.skey, pQuery->window.ekey, pQuery->order.order);
// create the query support structures
createTableDataInfo(pQInfo);
......@@ -4754,7 +4952,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
}
if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) {
// assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0);
// assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0);
if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pQInfo, pQuery);
......@@ -4802,13 +5000,20 @@ static void tableFixedOutputProcess(SQInfo *pQInfo) {
static void tableMultiOutputProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
// for ts_comp query, re-initialized is not allowed
if (!isTSCompQuery(pQuery)) {
resetCtxOutputBuf(pRuntimeEnv);
}
// skip blocks without load the actual data block from file if no filter condition present
skipBlocks(&pQInfo->runtimeEnv);
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) {
setQueryStatus(pQuery, QUERY_COMPLETED);
return;
}
while (1) {
scanAllDataBlocks(pRuntimeEnv);
finalizeQueryResult(pRuntimeEnv);
......@@ -4838,13 +5043,10 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) {
doRevisedResultsByLimit(pQInfo);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64,
pQInfo, pQuery->lastKey, pQuery->window.ekey);
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->lastKey,
pQuery->window.ekey);
}
// qTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode,
// pMeterObj->sid, pMeterObj->meterId, pQuery->size, pQInfo->size, pQInfo->pointsReturned);
if (!isTSCompQuery(pQuery)) {
assert(pQuery->rec.rows <= pQuery->rec.capacity);
}
......@@ -4875,7 +5077,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->limit.offset -= c;
}
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED|QUERY_RESBUF_FULL)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_RESBUF_FULL)) {
break;
}
}
......@@ -4912,8 +5114,8 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
}
numOfInterpo = 0;
pQuery->rec.rows = vnodeQueryResultInterpolate(
pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.rows, &numOfInterpo);
pQuery->rec.rows = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf,
pQuery->rec.rows, &numOfInterpo);
qTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.rows);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
......@@ -4937,9 +5139,9 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
pQInfo->pointsInterpo += numOfInterpo;
}
static void tableQueryImpl(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
static void tableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
if (vnodeHasRemainResults(pQInfo)) {
/*
......@@ -4964,7 +5166,6 @@ static void tableQueryImpl(SQInfo* pQInfo) {
// continue to get push data from the group result
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
((isIntervalQuery(pQuery) && pQuery->rec.total < pQuery->limit.limit))) {
// todo limit the output for interval query?
pQuery->rec.rows = 0;
pQInfo->groupIndex = 0; // always start from 0
......@@ -5011,16 +5212,17 @@ static void tableQueryImpl(SQInfo* pQInfo) {
if (isQueryKilled(pQInfo)) {
qTrace("QInfo:%p query is killed", pQInfo);
} else {
// STableId* pTableId = taosArrayGet(pQInfo->groupInfo, 0);
// qTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
// pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
// STableId* pTableId = taosArrayGet(pQInfo->groupInfo, 0);
// qTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 "
// rows",
// pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
sem_post(&pQInfo->dataReady);
}
static void stableQueryImpl(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
static void stableQueryImpl(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pQuery->rec.rows = 0;
int64_t st = taosGetTimestampUs();
......@@ -5037,11 +5239,12 @@ static void stableQueryImpl(SQInfo* pQInfo) {
// record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType);
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType);
if (pQuery->rec.rows == 0) {
qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables, pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter);
qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables,
pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter);
}
sem_post(&pQInfo->dataReady);
......@@ -5095,7 +5298,7 @@ static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) {
return 0;
}
static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** pTableIdList) {
static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) {
assert(pQueryMsg->numOfTables > 0);
*pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableId));
......@@ -5133,7 +5336,7 @@ static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** p
* @return
*/
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr,
char** tagCond, SColIndex** groupbyCols) {
char **tagCond, SColIndex **groupbyCols) {
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
......@@ -5164,7 +5367,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols;
for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) {
SColumnInfo* pColInfo = &pQueryMsg->colList[col];
SColumnInfo *pColInfo = &pQueryMsg->colList[col];
pColInfo->colId = htons(pColInfo->colId);
pColInfo->type = htons(pColInfo->type);
......@@ -5256,16 +5459,16 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList);
if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns
*groupbyCols = malloc(pQueryMsg->numOfGroupCols*sizeof(SColIndex));
*groupbyCols = malloc(pQueryMsg->numOfGroupCols * sizeof(SColIndex));
for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
(*groupbyCols)[i].colId = *(int16_t*) pMsg;
for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
(*groupbyCols)[i].colId = *(int16_t *)pMsg;
pMsg += sizeof((*groupbyCols)[i].colId);
(*groupbyCols)[i].colIndex = *(int16_t*) pMsg;
(*groupbyCols)[i].colIndex = *(int16_t *)pMsg;
pMsg += sizeof((*groupbyCols)[i].colIndex);
(*groupbyCols)[i].flag = *(int16_t*) pMsg;
(*groupbyCols)[i].flag = *(int16_t *)pMsg;
pMsg += sizeof((*groupbyCols)[i].flag);
memcpy((*groupbyCols)[i].name, pMsg, tListLen(groupbyCols[i]->name));
......@@ -5294,12 +5497,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
}
qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, "
"outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey,
pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutputCols,
pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen,
pQueryMsg->limit, pQueryMsg->offset);
qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64
", numOfGroupbyTagCols:%d, ts order:%d, "
"outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64
", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->order, pQueryMsg->numOfOutputCols, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset);
return 0;
}
......@@ -5355,7 +5559,8 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs
return TSDB_CODE_SUCCESS;
}
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr, SSqlFuncExprMsg** pExprMsg) {
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr,
SSqlFuncExprMsg **pExprMsg) {
*pSqlFuncExpr = NULL;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -5434,7 +5639,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct
return TSDB_CODE_SUCCESS;
}
static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex* pColIndex, int32_t *code) {
static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) {
if (pQueryMsg->numOfGroupCols == 0) {
return NULL;
}
......@@ -5538,31 +5743,31 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
return TSDB_CODE_SUCCESS;
}
static void doUpdateExprColumnIndex(SQuery* pQuery) {
static void doUpdateExprColumnIndex(SQuery *pQuery) {
assert(pQuery->pSelectExpr != NULL && pQuery != NULL);
// int32_t i = 0, j = 0;
// while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) {
// if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) {
// pQuery->colList[i++].colIndex = (int16_t)j++;
// } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) {
// pQuery->colList[i++].colIndex = -1;
// } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) {
// j++;
// }
// }
// int32_t i = 0, j = 0;
// while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) {
// if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) {
// pQuery->colList[i++].colIndex = (int16_t)j++;
// } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) {
// pQuery->colList[i++].colIndex = -1;
// } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) {
// j++;
// }
// }
// while (i < pQuery->numOfCols) {
// pQuery->colList[i++].colIndex = -1; // not such column in current meter
// }
// while (i < pQuery->numOfCols) {
// pQuery->colList[i++].colIndex = -1; // not such column in current meter
// }
for(int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
SSqlFuncExprMsg* pSqlExprMsg = &pQuery->pSelectExpr[k].pBase;
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
SSqlFuncExprMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].pBase;
if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) {
continue;
}
SColIndex* pColIndexEx = &pSqlExprMsg->colInfo;
for(int32_t f = 0; f < pQuery->numOfCols; ++f) {
SColIndex *pColIndexEx = &pSqlExprMsg->colInfo;
for (int32_t f = 0; f < pQuery->numOfCols; ++f) {
if (pColIndexEx->colId == pQuery->colList[f].info.colId) {
pColIndexEx->colIndex = f;
break;
......@@ -5719,12 +5924,12 @@ static bool isValidQInfo(void *param) {
* pQInfo->signature may be changed by another thread, so we assign value of signature
* into local variable, then compare by using local variable
*/
uint64_t sig = (uint64_t) pQInfo->signature;
uint64_t sig = (uint64_t)pQInfo->signature;
return (sig == (uint64_t)pQInfo);
}
static void freeQInfo(SQInfo *pQInfo);
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void* tsdb, SQInfo *pQInfo, bool isSTable) {
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, SQInfo *pQInfo, bool isSTable) {
int32_t code = TSDB_CODE_SUCCESS;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -5768,7 +5973,7 @@ static void freeQInfo(SQInfo *pQInfo) {
return;
}
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
setQueryKilled(pQInfo);
qTrace("QInfo:%p start to free QInfo", pQInfo);
......@@ -5811,8 +6016,8 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery);
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
taosArrayDestroy(p);
}
......@@ -5826,7 +6031,7 @@ static void freeQInfo(SQInfo *pQInfo) {
}
static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
/*
* get the file size and set the numOfRows to be the file size, since for tsComp query,
......@@ -5881,15 +6086,15 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
// todo if interpolation exists, the result may be dump to client by several rounds
}
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) {
int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) {
assert(pQueryMsg != NULL);
int32_t code = TSDB_CODE_SUCCESS;
char* tagCond = NULL;
SArray *pTableIdList = NULL;
SSqlFuncExprMsg** pExprMsg = NULL;
SColIndex* pGroupColIndex = NULL;
char * tagCond = NULL;
SArray * pTableIdList = NULL;
SSqlFuncExprMsg **pExprMsg = NULL;
SColIndex * pGroupColIndex = NULL;
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex)) != TSDB_CODE_SUCCESS) {
return code;
......@@ -5924,10 +6129,11 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
isSTableQuery = true;
STableId* id = taosArrayGet(pTableIdList, 0);
id->uid = -1; //todo fix me
STableId *id = taosArrayGet(pTableIdList, 0);
id->uid = -1; // todo fix me
/*int32_t ret =*/ tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols);
/*int32_t ret =*/tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex,
pQueryMsg->numOfGroupCols);
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS;
goto _query_over;
......@@ -5935,7 +6141,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
} else {
assert(taosArrayGetSize(pTableIdList) == 1);
STableId* id = taosArrayGet(pTableIdList, 0);
STableId *id = taosArrayGet(pTableIdList, 0);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
......@@ -5962,7 +6168,7 @@ void qDestroyQueryInfo(qinfo_t pQInfo) {
}
void qTableQuery(qinfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo) {
qTrace("%p freed abort query", pQInfo);
......@@ -5986,7 +6192,7 @@ void qTableQuery(qinfo_t qinfo) {
}
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE;
......@@ -6006,13 +6212,13 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
}
bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) {
return false;
}
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
return false;
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
......@@ -6024,14 +6230,14 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
}
}
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen) {
SQInfo* pQInfo = (SQInfo*) qinfo;
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE;
}
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
*contLen = size + sizeof(SRetrieveTableRsp);
......@@ -6061,9 +6267,9 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* co
return code;
// if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) {
// qTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
// vnodeDecRefCount(pObj->qhandle);
// pObj->qhandle = NULL;
// }
// if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) {
// qTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
// vnodeDecRefCount(pObj->qhandle);
// pObj->qhandle = NULL;
// }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册