From f192397c0f2f833d504bd631aa3daa402c9b88b7 Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 29 Aug 2019 17:56:57 +0800 Subject: [PATCH] fix some possible query errors --- src/system/inc/vnode.h | 6 +- src/system/src/mgmtDb.c | 2 + src/system/src/mgmtMeter.c | 3 +- src/system/src/vnodeQueryImpl.c | 100 +++++++++++++++++++++++------ src/system/src/vnodeQueryProcess.c | 5 +- 5 files changed, 90 insertions(+), 26 deletions(-) diff --git a/src/system/inc/vnode.h b/src/system/inc/vnode.h index fa118cd1ec..139ec8d373 100644 --- a/src/system/inc/vnode.h +++ b/src/system/inc/vnode.h @@ -382,9 +382,9 @@ int vnodeCreateMeterObj(SMeterObj *pNew, SConnSec *pSec); int vnodeRemoveMeterObj(int vnode, int sid); -int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints); +int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints, TSKEY now); -int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints); +int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints, TSKEY now); int vnodeInsertBufferedPoints(int vnode); @@ -537,7 +537,7 @@ void vnodeRemoveCommitLog(int vnode); int vnodeWriteToCommitLog(SMeterObj *pObj, char action, char *cont, int contLen, int sversion); -extern int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *); +extern int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *, TSKEY); extern int (*pCompFunc[])(const char *const input, int inputSize, const int elements, char *const output, int outputSize, char algorithm, char *const buffer, int bufferSize); diff --git a/src/system/src/mgmtDb.c b/src/system/src/mgmtDb.c index 63d3759a66..e2e46f95a8 100644 --- a/src/system/src/mgmtDb.c +++ b/src/system/src/mgmtDb.c @@ -366,6 +366,8 @@ int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { if (pAlter->daysToKeep > 0) { mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep); pDb->cfg.daysToKeep = pAlter->daysToKeep; + } else { + return TSDB_CODE_INVALID_OPTION; } if (sdbUpdateRow(dbSdb, pDb, tsDbUpdateSize, 1) < 0) { diff --git a/src/system/src/mgmtMeter.c b/src/system/src/mgmtMeter.c index bf09c90c7d..dc1fa004ea 100644 --- a/src/system/src/mgmtMeter.c +++ b/src/system/src/mgmtMeter.c @@ -1052,7 +1052,8 @@ static void mgmtRetrieveMetersFromIDs(tQueryResultset *pRes, char *queryStr, cha } /* queried meter not belongs to this metric, ignore */ - if (mgmtGetMeter(pMeterObj->pTagData)->uid != pMetric->uid) { + if (mgmtGetMeter(pMeterObj->pTagData)->uid != pMetric->uid || + strncmp(pMetric->meterId, pMeterObj->pTagData, TSDB_METER_ID_LEN) != 0) { continue; } diff --git a/src/system/src/vnodeQueryImpl.c b/src/system/src/vnodeQueryImpl.c index 29e1d4c3c4..82d74bb059 100644 --- a/src/system/src/vnodeQueryImpl.c +++ b/src/system/src/vnodeQueryImpl.c @@ -642,14 +642,19 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR int32_t ret = 0; - /* the first round always be 1, the secondary round is determined by queried - * function */ + // the first round always be 1, the secondary round is determined by queried function int32_t round = pRuntimeEnv->scanFlag; while (j < pBlock->numOfCols && i < pQuery->numOfCols) { if ((*pField)[j].colId < pQuery->colList[i].data.colId) { ++j; } else if ((*pField)[j].colId == pQuery->colList[i].data.colId) { + // add additional check for data type + if ((*pField)[j].type != pQuery->colList[i].data.type) { + ret = TSDB_CODE_INVALID_QUERY_MSG; + break; + } + /* * during supplementary scan: * 1. primary ts column (always loaded) @@ -1919,13 +1924,12 @@ static bool cacheBoundaryCheck(SQuery *pQuery, SMeterObj *pMeterObj) { TSKEY min, max; getQueryRange(pQuery, &min, &max); - // the query time range is earlier than the first element in cache. abort - if (max < keyFirst) { - setQueryStatus(pQuery, QUERY_COMPLETED); - return false; - } - - if (min > keyLast) { + /* + * The query time range is earlier than the first element or later than the last elements in cache. + * If the query window happens to overlap with the time range of disk files but not data in cache, + * the flag needs to be cleared. Otherwise, this flag will cause error in following processing. + */ + if (max < keyFirst || min > keyLast) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return false; } @@ -2072,6 +2076,8 @@ void vnodeCheckIfDataExists(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, *dataInCache = hasDataInCache(pRuntimeEnv, pMeterObj); *dataInDisk = hasDataInDisk(pQuery, pMeterObj); + + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); } static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t qualifiedKey, int64_t keyFirst, int64_t keyLast, @@ -2685,23 +2691,75 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) { qsort(pRuntimeEnv->pHeaderFiles, (size_t)pRuntimeEnv->numOfFiles, sizeof(SQueryFileInfo), file_order_comparator); } -static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock) { +static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, void *pBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; + int32_t newPos = pQuery->pos; if (QUERY_IS_ASC_QUERY(pQuery)) { - pQuery->pos += pQuery->limit.offset; + if (newPos + pQuery->limit.offset > pBlockInfo->size) { + newPos = pBlockInfo->size - 1; + } else { + newPos += pQuery->limit.offset; + } } else { - pQuery->pos -= pQuery->limit.offset; + if (newPos < pQuery->limit.offset) { + newPos = 0; + } else { + newPos -= pQuery->limit.offset; + } } + TSKEY newKey = 0; if (IS_DISK_DATA_BLOCK(pQuery)) { - pQuery->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); + newKey = getTimestampInDiskBlock(pRuntimeEnv, newPos); } else { - pQuery->skey = getTimestampInCacheBlock(pBlock, pQuery->pos); + newKey = getTimestampInCacheBlock(pBlock, newPos); } - pQuery->lastKey = pQuery->skey; - pQuery->limit.offset = 0; + /* + * The actually qualified points that can be skipped needs to be calculated if query is + * done in current data block + */ + if ((newKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (newKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + setQueryStatus(pQuery, QUERY_COMPLETED); + + // update the pQuery->limit.offset value, and pQuery->pos value + TSKEY* keys = NULL; + if (IS_DISK_DATA_BLOCK(pQuery)) { + keys = (TSKEY *) pRuntimeEnv->primaryColBuffer->data; + } else { + keys = (TSKEY *) (((SCacheBlock *)pBlock)->offset[0]); + } + + int32_t i = 0; + if (QUERY_IS_ASC_QUERY(pQuery)) { + for(i = pQuery->pos; i < pBlockInfo->size && pQuery->limit.offset > 0; ++i) { + if (keys[i] <= pQuery->ekey) { + pQuery->limit.offset -= 1; + } else { + break; + } + } + } else { + for(i = pQuery->pos; i >= 0 && pQuery->limit.offset > 0; --i) { + if (keys[i] >= pQuery->ekey) { + pQuery->limit.offset -= 1; + } else { + break; + } + } + } + + pQuery->pos = i; + } else { + pQuery->skey = newKey; + pQuery->lastKey = pQuery->skey; + + // update the offset value + pQuery->limit.offset -= abs(newPos - pQuery->pos); + pQuery->pos = newPos; + } } // todo ignore the avg/sum/min/max/count/stddev/top/bottom functions, of which @@ -2816,8 +2874,9 @@ static int32_t doSkipDataBlock(SQueryRuntimeEnv *pRuntimeEnv) { int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1; - if (pQuery->limit.offset < maxReads) { // start position in current block - updateOffsetVal(pRuntimeEnv, pBlock); + if (pQuery->limit.offset < maxReads || (pQuery->ekey <= blockInfo.keyLast && QUERY_IS_ASC_QUERY(pQuery)) || + (pQuery->ekey >= blockInfo.keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) { // start position in current block + updateOffsetVal(pRuntimeEnv, &blockInfo, pBlock); break; } else { pQuery->limit.offset -= maxReads; @@ -2843,8 +2902,9 @@ void forwardQueryStartPosition(SQueryRuntimeEnv *pRuntimeEnv) { SBlockInfo blockInfo = getBlockBasicInfo(pBlock, blockType); int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1; - if (pQuery->limit.offset < maxReads) { // start position in current block - updateOffsetVal(pRuntimeEnv, pBlock); + if (pQuery->limit.offset < maxReads || (pQuery->ekey <= blockInfo.keyLast && QUERY_IS_ASC_QUERY(pQuery)) || + (pQuery->ekey >= blockInfo.keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) { // start position in current block + updateOffsetVal(pRuntimeEnv, &blockInfo, pBlock); } else { pQuery->limit.offset -= maxReads; doSkipDataBlock(pRuntimeEnv); diff --git a/src/system/src/vnodeQueryProcess.c b/src/system/src/vnodeQueryProcess.c index 6367e78944..fef9e5d666 100644 --- a/src/system/src/vnodeQueryProcess.c +++ b/src/system/src/vnodeQueryProcess.c @@ -404,7 +404,8 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool * vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, dataInDisk, dataInCache); - if (pQuery->lastKey > pMeterObj->lastKey && QUERY_IS_ASC_QUERY(pQuery)) { + // data in file or cache is not qualified for the query. abort + if (!(dataInCache || dataInDisk)) { dTrace("QInfo:%p vid:%d sid:%d meterId:%s, qrange:%lld-%lld, nores, %p", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery); return false; @@ -578,7 +579,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { forwardQueryStartPosition(pRuntimeEnv); - if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { + if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; continue; -- GitLab