提交 c6b0d6b0 编写于 作者: H hjxilinx

support the sliding query [tbase-266]

上级 f7261780
......@@ -62,7 +62,7 @@ static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQ
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
int8_t type, char* fieldName);
static int32_t changeFunctionID(int32_t optr, int16_t* functionId);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isMetric);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable);
static bool validateIpAddress(const char* ip, size_t size);
static bool hasUnsupportFunctionsForSTableQuery(SQueryInfo* pQueryInfo);
......@@ -93,6 +93,8 @@ static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
static bool hasDefaultQueryTimeRange(SQueryInfo *pQueryInfo);
static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex);
static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql);
......@@ -4432,8 +4434,6 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
const char* msg1 = "slimit/soffset only available for STable query";
const char* msg2 = "function not supported on table";
const char* msg3 = "slimit/soffset can not apply to projection query";
const char* msg4 = "projection on super table requires order by clause along with limitation";
const char* msg5 = "ordered projection result too large";
// handle the limit offset value, validate the limit
pQueryInfo->limit = pQuerySql->limit;
......@@ -5542,28 +5542,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
}
}
// set sliding value
SSQLToken* pSliding = &pQuerySql->sliding;
if (pSliding->n != 0) {
if (!tscEmbedded && pCmd->inStream == 0) { // sliding only allowed in stream
const char* msg = "not support sliding in query";
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->nSlidingTime);
if (pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI) {
pQueryInfo->nSlidingTime /= 1000;
}
if (pQueryInfo->nSlidingTime < tsMinSlidingTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
if (pQueryInfo->nSlidingTime > pQueryInfo->nAggTimeInterval) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4);
}
}
// set order by info
if (parseOrderbyClause(pQueryInfo, pQuerySql, tsGetSchema(pMeterMetaInfo->pMeterMeta)) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
......@@ -5602,6 +5580,30 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
if (!hasTimestampForPointInterpQuery(pQueryInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
// set sliding value, the query time range needs to be decide in the first place
SSQLToken* pSliding = &pQuerySql->sliding;
if (pSliding->n != 0) {
if (!tscEmbedded && pCmd->inStream == 0 && hasDefaultQueryTimeRange(pQueryInfo)) { // sliding only allowed in stream
const char* msg = "time range expected for sliding window query";
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->nSlidingTime);
if (pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI) {
pQueryInfo->nSlidingTime /= 1000;
}
if (pQueryInfo->nSlidingTime < tsMinSlidingTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
if (pQueryInfo->nSlidingTime > pQueryInfo->nAggTimeInterval) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4);
}
} else {
pQueryInfo->nSlidingTime = -1;
}
// in case of join query, time range is required.
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
......@@ -5651,3 +5653,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return TSDB_CODE_SUCCESS; // Does not build query message here
}
bool hasDefaultQueryTimeRange(SQueryInfo *pQueryInfo) {
return (pQueryInfo->stime == 0 && pQueryInfo->etime == INT64_MAX) ||
(pQueryInfo->stime == INT64_MAX && pQueryInfo->etime == 0);
}
\ No newline at end of file
......@@ -260,6 +260,7 @@ typedef struct SQuery {
TSKEY skey;
TSKEY ekey;
int64_t nAggTimeInterval;
int64_t slidingTime; // sliding time for sliding window query
char intervalTimeUnit; // interval data type, used for daytime revise
int8_t precision;
int16_t numOfOutputCols;
......
......@@ -64,7 +64,7 @@ typedef enum {
* the next query.
*
* this status is only exist in group-by clause and
* diff/add/division/mulitply/ query.
* diff/add/division/multiply/ query.
*/
QUERY_RESBUF_FULL = 0x2,
......@@ -149,7 +149,6 @@ void vnodeScanAllData(SQueryRuntimeEnv* pRuntimeEnv);
int32_t vnodeQueryResultInterpolate(SQInfo* pQInfo, tFilePage** pDst, tFilePage** pDataSrc, int32_t numOfRows,
int32_t* numOfInterpo);
void copyResToQueryResultBuf(SMeterQuerySupportObj* pSupporter, SQuery* pQuery);
void moveDescOrderResultsToFront(SQueryRuntimeEnv* pRuntimeEnv);
void doSkipResults(SQueryRuntimeEnv* pRuntimeEnv);
void doFinalizeResult(SQueryRuntimeEnv* pRuntimeEnv);
......@@ -159,7 +158,7 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj* pSupporter, SQueryRuntimeE
void forwardQueryStartPosition(SQueryRuntimeEnv* pRuntimeEnv);
bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySupportObj* pSupporter,
SPointInterpoSupporter* pPointInterpSupporter);
SPointInterpoSupporter* pPointInterpSupporter, int64_t* key);
void pointInterpSupporterInit(SQuery* pQuery, SPointInterpoSupporter* pInterpoSupport);
void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport);
......@@ -278,6 +277,11 @@ void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows);
void vnodePrintQueryStatistics(SMeterQuerySupportObj* pSupporter);
void clearGroupResultBuf(SOutputRes* pOneOutputRes, int32_t nOutputCols);
void copyGroupResultBuf(SOutputRes* dst, const SOutputRes* src, int32_t nOutputCols);
void resetResWindowInfo(SSlidingWindowResInfo* pWindowResInfo, int32_t numOfCols);
void clearCompletedResWindows(SSlidingWindowResInfo* pWindowResInfo, int32_t numOfCols);
int32_t numOfResFromResWindowInfo(SSlidingWindowResInfo* pWindowResInfo);
#ifdef __cplusplus
}
......
......@@ -112,7 +112,31 @@ typedef struct SQueryFilesInfo {
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct RuntimeEnvironment {
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
typedef struct SWindowStatus {
STimeWindow window;
bool closed;
} SWindowStatus;
typedef struct SSlidingWindowResInfo {
SOutputRes* pResult; // reference to SQuerySupporter->pResult
SWindowStatus* pStatus; // current query window closed or not?
void* hashList; // hash list for quick access
int16_t type; // data type for hash key
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
int32_t size;
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
int64_t threshold; // threshold for return completed results.
} SSlidingWindowResInfo;
typedef struct SQueryRuntimeEnv {
SPositionInfo startPos; /* the start position, used for secondary/third iteration */
SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */
SPositionInfo nextPos; /* start position of the next scan */
......@@ -134,13 +158,16 @@ typedef struct RuntimeEnvironment {
int16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult
void* hashList;
int32_t usedIndex; // assigned SOutputRes in list
SSlidingWindowResInfo swindowResInfo;
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
TSKEY intervalSKey; // skey of the complete time window, not affected by the actual data distribution
TSKEY intervalEKey; // ekey of the complete time window
/*
* Temporarily hold the in-memory cache block info during scan cache blocks
* Here we do not use the cacheblock info from pMeterObj, simple because it may change anytime
......
......@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "hash.h"
#include "hashutil.h"
#include "os.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "ttime.h"
#include "hash.h"
#include "hashutil.h"
#include "tinterpolation.h"
#include "tscJoinProcess.h"
......@@ -69,15 +69,15 @@ static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResult);
static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY keyInData, TSKEY skey, TSKEY ekey);
static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY keyInData, TSKEY skey, TSKEY ekey);
static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo,
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
__block_search_fn_t searchFn);
static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult);
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx,
int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus,
SField *pFields, __block_search_fn_t searchFn);
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int64_t *pPrimaryData,
SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields,
__block_search_fn_t searchFn);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx);
static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
......@@ -86,6 +86,9 @@ static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEn
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey);
static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t *skey, int64_t *ekey);
// check the offset value integrity
static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data,
......@@ -108,8 +111,9 @@ static FORCE_INLINE int32_t getCompHeaderStartPosition(SVnodeCfg *pCfg) {
static FORCE_INLINE int32_t validateCompBlockOffset(SQInfo *pQInfo, SMeterObj *pMeterObj, SCompHeader *pCompHeader,
SQueryFilesInfo *pQueryFileInfo, int32_t headerSize) {
if (pCompHeader->compInfoOffset < headerSize || pCompHeader->compInfoOffset > pQueryFileInfo->headerFileSize) {
dError("QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%" PRId64 " is not valid, size:%" PRId64, pQInfo, pMeterObj->vnode,
pMeterObj->sid, pMeterObj->meterId, pCompHeader->compInfoOffset, pQueryFileInfo->headerFileSize);
dError("QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%" PRId64 " is not valid, size:%" PRId64, pQInfo,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pCompHeader->compInfoOffset,
pQueryFileInfo->headerFileSize);
return -1;
}
......@@ -121,8 +125,8 @@ static FORCE_INLINE int32_t validateCompBlockOffset(SQInfo *pQInfo, SMeterObj *p
static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, const char *filePath, int32_t vid,
SCompInfo *compInfo, int64_t offset) {
if (!taosCheckChecksumWhole((uint8_t *)compInfo, sizeof(SCompInfo))) {
dLError("QInfo:%p vid:%d, failed to read header file:%s, file compInfo broken, offset:%" PRId64, pQInfo, vid, filePath,
offset);
dLError("QInfo:%p vid:%d, failed to read header file:%s, file compInfo broken, offset:%" PRId64, pQInfo, vid,
filePath, offset);
return -1;
}
return 0;
......@@ -163,27 +167,27 @@ bool isGroupbyNormalCol(SSqlGroupbyExpr *pGroupbyExpr) {
return false;
}
int16_t getGroupbyColumnType(SQuery* pQuery, SSqlGroupbyExpr *pGroupbyExpr) {
int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) {
assert(pGroupbyExpr != NULL);
int32_t colId = -2;
int16_t type = TSDB_DATA_TYPE_NULL;
for(int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndexEx *pColIndex = &pGroupbyExpr->columnInfo[i];
if (pColIndex->flag == TSDB_COL_NORMAL) {
colId = pColIndex->colId;
break;
}
}
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (colId == pQuery->colList[i].data.colId) {
type = pQuery->colList[i].data.type;
break;
}
}
return type;
}
......@@ -236,7 +240,7 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
static void vnodeSetCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex, int32_t sid) {
SLoadCompBlockInfo *pCompBlockLoadInfo = &pRuntimeEnv->loadCompBlockInfo;
pCompBlockLoadInfo->sid = sid;
pCompBlockLoadInfo->fileListIndex = fileIndex;
pCompBlockLoadInfo->fileId = pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID;
......@@ -250,7 +254,7 @@ static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex,
bool loadPrimaryTS) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
/* this block has been loaded into memory, return directly */
......@@ -269,7 +273,7 @@ static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *
static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex,
bool tsLoaded) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
pLoadInfo->fileId = pQuery->fileId;
......@@ -344,7 +348,7 @@ static void doCloseQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
tclose(pVnodeFilesInfo->headerFd);
tclose(pVnodeFilesInfo->dataFd);
tclose(pVnodeFilesInfo->lastFd);
pVnodeFilesInfo->current = -1;
pVnodeFilesInfo->headerFileSize = -1;
}
......@@ -363,7 +367,7 @@ static void doInitQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
*/
static int32_t doOpenQueryFile(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo) {
SHeaderFileInfo *pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current];
/*
* current header file is empty or broken, return directly.
*
......@@ -373,10 +377,10 @@ static int32_t doOpenQueryFile(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo)
if (checkIsHeaderFileEmpty(pVnodeFileInfo)) {
qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo,
pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headerFileSize);
return -1;
}
pVnodeFileInfo->headerFd = open(pVnodeFileInfo->headerFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->headerFd)) {
dError("QInfo:%p failed open head file:%s reason:%s", pQInfo, pVnodeFileInfo->headerFilePath, strerror(errno));
......@@ -403,7 +407,7 @@ static void doCloseQueryFiles(SQueryFilesInfo *pVnodeFileInfo) {
assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0);
pVnodeFileInfo->headerFileSize = -1;
doCloseQueryFileInfoFD(pVnodeFileInfo);
}
......@@ -478,38 +482,38 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
if (ret != TSDB_CODE_SUCCESS) {
return -1; // failed to load the header file data into memory
}
char* buf = calloc(1, getCompHeaderSegSize(pCfg));
char * buf = calloc(1, getCompHeaderSegSize(pCfg));
SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo;
lseek(pVnodeFileInfo->headerFd, TSDB_FILE_HEADER_LEN, SEEK_SET);
read(pVnodeFileInfo->headerFd, buf, getCompHeaderSegSize(pCfg));
// check the offset value integrity
if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, buf - TSDB_FILE_HEADER_LEN,
getCompHeaderSegSize(pCfg)) < 0) {
if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode,
buf - TSDB_FILE_HEADER_LEN, getCompHeaderSegSize(pCfg)) < 0) {
free(buf);
return -1;
}
SCompHeader *compHeader = (SCompHeader *)(buf + sizeof(SCompHeader) * pMeterObj->sid);
// no data in this file for specified meter, abort
if (compHeader->compInfoOffset == 0) {
free(buf);
return 0;
}
// corrupted file may cause the invalid compInfoOffset, check needs
if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo,
getCompHeaderStartPosition(pCfg)) < 0) {
free(buf);
return -1;
}
lseek(pVnodeFileInfo->headerFd, compHeader->compInfoOffset, SEEK_SET);
SCompInfo compInfo = {0};
SCompInfo compInfo = {0};
read(pVnodeFileInfo->headerFd, &compInfo, sizeof(SCompInfo));
// check compblock info integrity
......@@ -539,14 +543,14 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
// prepare buffer to hold compblock data
if (pQuery->blockBufferSize != bufferSize) {
pQuery->pBlock = realloc(pQuery->pBlock, bufferSize);
pQuery->blockBufferSize = (int32_t) bufferSize;
pQuery->blockBufferSize = (int32_t)bufferSize;
}
memset(pQuery->pBlock, 0, bufferSize);
// read data: comp block + checksum
read(pVnodeFileInfo->headerFd, pQuery->pBlock, compBlockSize + sizeof(TSCKSUM));
TSCKSUM checksum = *(TSCKSUM*)((char*)pQuery->pBlock + compBlockSize);
TSCKSUM checksum = *(TSCKSUM *)((char *)pQuery->pBlock + compBlockSize);
// check comp block integrity
if (validateCompBlockSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, &compInfo, (char *)pQuery->pBlock,
......@@ -565,7 +569,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
pSummary->totalCompInfoSize += compBlockSize;
pSummary->loadCompInfoUs += (et - st);
free(buf);
return pQuery->numOfBlocks;
}
......@@ -816,8 +820,8 @@ static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock
// check fields integrity
if (!taosCheckChecksumWhole((uint8_t *)(*pField), size)) {
dLError("QInfo:%p vid:%d sid:%d id:%s, slot:%d, failed to read sfields, file:%s, sfields area broken:%" PRId64, pQInfo,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pVnodeFilesInfo->dataFilePath,
dLError("QInfo:%p vid:%d sid:%d id:%s, slot:%d, failed to read sfields, file:%s, sfields area broken:%" PRId64,
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pVnodeFilesInfo->dataFilePath,
pBlock->offset);
return -1;
}
......@@ -870,20 +874,21 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
int32_t status = vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol);
if (status == DISK_BLOCK_NO_NEED_TO_LOAD) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, no need to load again, ts:%d, slot:%d,"
dTrace(
"QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, no need to load again, ts:%d, slot:%d,"
" brange:%lld-%lld, rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, loadPrimaryCol,
pQuery->slot, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints);
if (loadSField && (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL)) {
loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]);
}
return TSDB_CODE_SUCCESS;
} else if (status == DISK_BLOCK_LOAD_TS) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, incrementally load ts",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId);
assert(PRIMARY_TSCOL_LOADED(pQuery) == false && loadSField == true);
if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) {
loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]);
......@@ -891,7 +896,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
// load primary timestamp
int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes);
vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol);
return ret;
}
......@@ -988,7 +993,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
}
// todo ignore the blockType, pass the pQuery into this function
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void *pBlock, int32_t blockType) {
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock, int32_t blockType) {
SBlockInfo blockInfo = {0};
if (IS_FILE_BLOCK(blockType)) {
SCompBlock *pDiskBlock = (SCompBlock *)pBlock;
......@@ -1034,7 +1039,7 @@ static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQueryRuntim
*/
static bool queryCompleteInBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, int32_t forwardStep) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
assert(pQuery->checkBufferInLoop == 1 && pQuery->over == QUERY_RESBUF_FULL && pQuery->pointsOffset == 0);
// assert(pQuery->checkBufferInLoop == 1 && pQuery->over == QUERY_RESBUF_FULL && pQuery->pointsOffset == 0);
assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pBlockInfo->size) ||
(!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0));
......@@ -1071,20 +1076,20 @@ void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, in
position->pos = pos;
}
bool isCacheBlockValid(SQuery* pQuery, SCacheBlock* pBlock, SMeterObj* pMeterObj) {
bool isCacheBlockValid(SQuery *pQuery, SCacheBlock *pBlock, SMeterObj *pMeterObj) {
if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) {
SMeterObj* pNewMeterObj = pBlock->pMeterObj;
char* id = (pNewMeterObj != NULL)? pNewMeterObj->meterId:NULL;
dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, "
"blockMeterObj:%p, blockMeter id:%s, first:%d, last:%d, numOfBlocks:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId,
pQuery->blockId, pMeterObj, pNewMeterObj, id, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks);
SMeterObj *pNewMeterObj = pBlock->pMeterObj;
char * id = (pNewMeterObj != NULL) ? pNewMeterObj->meterId : NULL;
dWarn(
"QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, "
"blockMeterObj:%p, blockMeter id:%s, first:%d, last:%d, numOfBlocks:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId,
pQuery->blockId, pMeterObj, pNewMeterObj, id, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks);
return false;
}
/*
* The check for empty block:
* pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache
......@@ -1092,36 +1097,40 @@ bool isCacheBlockValid(SQuery* pQuery, SCacheBlock* pBlock, SMeterObj* pMeterObj
* block(newly allocated block), abort query. Otherwise, skip it and go on.
*/
if (pBlock->numOfPoints == 0) {
dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is empty. slot:%d first:%d, last:%d, numOfBlocks:%d,"
"allocated but not write data yet.", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pQuery->slot, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks);
dWarn(
"QInfo:%p vid:%d sid:%d id:%s, cache block is empty. slot:%d first:%d, last:%d, numOfBlocks:%d,"
"allocated but not write data yet.",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pQuery->firstSlot,
pQuery->currentSlot, pQuery->numOfBlocks);
return false;
}
return true;
}
// todo all functions that call this function should check the returned data blocks status
SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeEnv, int32_t slot) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
if (pCacheInfo == NULL || pCacheInfo->cacheBlocks == NULL || slot < 0 || slot >= pCacheInfo->maxBlocks) {
return NULL;
}
getBasicCacheInfoSnapshot(pQuery, pCacheInfo, pMeterObj->vnode);
SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot];
if (pBlock == NULL) { // the cache info snapshot must be existed.
if (pBlock == NULL) { // the cache info snapshot must be existed.
int32_t curNumOfBlocks = pCacheInfo->numOfBlocks;
int32_t curSlot = pCacheInfo->currentSlot;
dError("QInfo:%p NULL Block In Cache, snapshot (available blocks:%d, last block:%d), current (available blocks:%d, "
"last block:%d), accessed null block:%d, pBlockId:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks,
pQuery->currentSlot, curNumOfBlocks, curSlot, slot, pQuery->blockId);
dError(
"QInfo:%p NULL Block In Cache, snapshot (available blocks:%d, last block:%d), current (available blocks:%d, "
"last block:%d), accessed null block:%d, pBlockId:%d",
GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->currentSlot, curNumOfBlocks, curSlot, slot,
pQuery->blockId);
return NULL;
}
......@@ -1129,20 +1138,21 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE
if (!isCacheBlockValid(pQuery, pBlock, pMeterObj)) {
return NULL;
}
//the accessed cache block has been loaded already, return directly
// the accessed cache block has been loaded already, return directly
if (vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD) {
TSKEY skey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, 0);
TSKEY ekey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pBlock->numOfPoints - 1);
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, cache block has been loaded, no need to load again, ts:%d, "
"slot:%d, brange:%lld-%lld, rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1,
pQuery->slot, skey, ekey, pBlock->numOfPoints);
dTrace(
"QInfo:%p vid:%d sid:%d id:%s, fileId:%d, cache block has been loaded, no need to load again, ts:%d, "
"slot:%d, brange:%lld-%lld, rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1, pQuery->slot,
skey, ekey, pBlock->numOfPoints);
return &pRuntimeEnv->cacheBlock;
}
// keep the structure as well as the block data into local buffer
memcpy(&pRuntimeEnv->cacheBlock, pBlock, sizeof(SCacheBlock));
......@@ -1153,78 +1163,81 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE
int32_t numOfPoints = pNewBlock->numOfPoints;
if (pQuery->firstSlot == pQuery->commitSlot) {
assert(pQuery->commitPoint >= 0 && pQuery->commitPoint <= pNewBlock->numOfPoints);
offset = pQuery->commitPoint;
numOfPoints = pNewBlock->numOfPoints - offset;
if (offset != 0) {
dTrace("%p ignore the data in cache block that are commit already, numOfblock:%d slot:%d ignore points:%d. "
"first:%d last:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint,
pQuery->firstSlot, pQuery->currentSlot);
dTrace(
"%p ignore the data in cache block that are commit already, numOfblock:%d slot:%d ignore points:%d. "
"first:%d last:%d",
GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint, pQuery->firstSlot,
pQuery->currentSlot);
}
pNewBlock->numOfPoints = numOfPoints;
// current block are all commit already, ignore it
if (pNewBlock->numOfPoints == 0) {
dTrace("%p ignore current in cache block that are all commit already, numOfblock:%d slot:%d"
"first:%d last:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot,
pQuery->firstSlot, pQuery->currentSlot);
dTrace(
"%p ignore current in cache block that are all commit already, numOfblock:%d slot:%d"
"first:%d last:%d",
GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->firstSlot, pQuery->currentSlot);
return NULL;
}
}
// keep the data from in cache into the temporarily allocated buffer
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfoEx *pColumnInfoEx = &pQuery->colList[i];
int16_t columnIndex = pColumnInfoEx->colIdx;
int16_t columnIndexInBuf = pColumnInfoEx->colIdxInBuf;
SColumn* pCol = &pMeterObj->schema[columnIndex];
SColumn *pCol = &pMeterObj->schema[columnIndex];
int16_t bytes = pCol->bytes;
int16_t type = pCol->type;
char* dst = pRuntimeEnv->colDataBuffer[columnIndexInBuf]->data;
char *dst = pRuntimeEnv->colDataBuffer[columnIndexInBuf]->data;
if (pQuery->colList[i].colIdx != -1) {
assert(pCol->colId == pQuery->colList[i].data.colId && bytes == pColumnInfoEx->data.bytes &&
type == pColumnInfoEx->data.type);
type == pColumnInfoEx->data.type);
memcpy(dst, pBlock->offset[columnIndex] + offset * bytes, numOfPoints * bytes);
} else {
setNullN(dst, type, bytes, numOfPoints);
}
}
assert(numOfPoints == pNewBlock->numOfPoints);
assert(numOfPoints == pNewBlock->numOfPoints);
// if the primary timestamp are not loaded by default, always load it here into buffer
if(!PRIMARY_TSCOL_LOADED(pQuery)) {
memcpy(pRuntimeEnv->primaryColBuffer->data, pBlock->offset[0] + offset * TSDB_KEYSIZE, TSDB_KEYSIZE*numOfPoints);
if (!PRIMARY_TSCOL_LOADED(pQuery)) {
memcpy(pRuntimeEnv->primaryColBuffer->data, pBlock->offset[0] + offset * TSDB_KEYSIZE, TSDB_KEYSIZE * numOfPoints);
}
pQuery->fileId = -1;
pQuery->slot = slot;
if (!isCacheBlockValid(pQuery, pNewBlock, pMeterObj)) {
return NULL;
}
/*
* the accessed cache block still belongs to current meterObj, go on
* update the load data block info
*/
vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, -1, true);
TSKEY skey = getTimestampInCacheBlock(pRuntimeEnv, pNewBlock, 0);
TSKEY ekey = getTimestampInCacheBlock(pRuntimeEnv, pNewBlock, numOfPoints - 1);
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, load cache block, ts:%d, slot:%d, brange:%lld-%lld, rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1,
pQuery->slot, skey, ekey, numOfPoints);
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1, pQuery->slot,
skey, ekey, numOfPoints);
return pNewBlock;
}
......@@ -1234,8 +1247,8 @@ static SCompBlock *getDiskDataBlock(SQuery *pQuery, int32_t slot) {
}
static void *getGenericDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeEnv, int32_t slot) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (IS_DISK_DATA_BLOCK(pQuery)) {
return getDiskDataBlock(pQuery, slot);
} else {
......@@ -1378,9 +1391,9 @@ static bool hasNullVal(SQuery *pQuery, int32_t col, SBlockInfo *pBlockInfo, SFie
return ret;
}
static char *doGetDataBlocks(SQuery* pQuery, SData** data, int32_t colIdx) {
static char *doGetDataBlocks(SQuery *pQuery, SData **data, int32_t colIdx) {
assert(colIdx >= 0 && colIdx < pQuery->numOfCols);
char* pData = data[colIdx]->data;
char *pData = data[colIdx]->data;
return pData;
}
......@@ -1409,7 +1422,7 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa
sas->elemSize[i] = pColMsg->bytes;
sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset
}
sas->numOfCols = pQuery->numOfCols;
sas->offset = 0;
} else { // other type of query function
......@@ -1443,8 +1456,8 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
SField *pFields, SBlockInfo *pBlockInfo) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus);
bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus);
int64_t prevNumOfRes = getNumOfResult(pRuntimeEnv);
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport));
......@@ -1456,7 +1469,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep);
SField *tpField = NULL;
if (pFields != NULL) {
......@@ -1471,12 +1484,12 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
}
}
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey;
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalSKey : pRuntimeEnv->intervalEKey;
int64_t alignedTimestamp =
taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision);
setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, forwardStep, functionId,
tpField, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
// int64_t alignedTimestamp =
// taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision);
setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull,
pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
}
/*
......@@ -1529,7 +1542,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *
if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) {
continue;
}
// all points in current column are NULL, no need to check its boundary value
if (pField[colIndex].numOfNullPoints == numOfTotalPoints) {
continue;
......@@ -1564,35 +1577,201 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *
return true;
}
static int32_t setGroupResultForKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) {
SOutputRes *pOutputRes = NULL;
static SOutputRes *getResWindow(SSlidingWindowResInfo *pWindowResInfo, char *pData, int16_t bytes,
SWindowStatus **pStatus) {
int32_t p = -1;
int32_t *p1 = (int32_t *)taosGetDataFromHash(pWindowResInfo->hashList, pData, bytes);
if (p1 != NULL) {
p = *p1;
pWindowResInfo->curIndex = p;
if (pStatus != NULL) {
*pStatus = &pWindowResInfo->pStatus[p];
}
} else { // more than the capacity, reallocate the resources
if (pWindowResInfo->size >= pWindowResInfo->capacity) {
int64_t newCap = pWindowResInfo->capacity * 2;
char *t = realloc(pWindowResInfo->pStatus, newCap * sizeof(SWindowStatus));
if (t != NULL) {
pWindowResInfo->pStatus = (SWindowStatus *)t;
memset(&pWindowResInfo->pStatus[pWindowResInfo->capacity], 0, sizeof(SWindowStatus) * pWindowResInfo->capacity);
} else {
// todo
}
pWindowResInfo->capacity = newCap;
}
// add a new result set for a new group
if (pStatus != NULL) {
*pStatus = &pWindowResInfo->pStatus[pWindowResInfo->size];
}
p = pWindowResInfo->size;
pWindowResInfo->curIndex = pWindowResInfo->size;
pWindowResInfo->size += 1;
taosAddToHashTable(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
}
return &pWindowResInfo->pResult[p];
}
static int32_t initResWindowInfo(SSlidingWindowResInfo *pWindowResInfo, int32_t threshold, int16_t type,
SOutputRes *pRes) {
pWindowResInfo->capacity = threshold;
pWindowResInfo->threshold = threshold;
pWindowResInfo->type = type;
// ignore the null value
if (isNull(pData, type)) {
_hash_fn_t fn = taosGetDefaultHashFunction(type);
pWindowResInfo->hashList = taosInitHashTable(threshold, fn, false);
pWindowResInfo->curIndex = -1;
pWindowResInfo->size = 0;
pWindowResInfo->pResult = pRes;
pWindowResInfo->pStatus = calloc(threshold, sizeof(SWindowStatus));
if (pWindowResInfo->pStatus == NULL || pWindowResInfo->hashList == NULL) {
return -1;
}
SOutputRes **p1 = (SOutputRes **)taosGetDataFromHash(pRuntimeEnv->hashList, pData, bytes);
if (p1 != NULL) {
pOutputRes = *p1;
} else { // more than the threshold number, discard data that are not belong to current groups
if (pRuntimeEnv->usedIndex >= 10000) {
return -1;
return TSDB_CODE_SUCCESS;
}
static void destroyResWindowInfo(SSlidingWindowResInfo *pWindowResInfo) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL);
return;
}
taosCleanUpHashTable(pWindowResInfo->hashList);
tfree(pWindowResInfo->pStatus);
}
void resetResWindowInfo(SSlidingWindowResInfo *pWindowResInfo, int32_t numOfCols) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
return;
}
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SOutputRes *pOneRes = &pWindowResInfo->pResult[i];
clearGroupResultBuf(pOneRes, numOfCols);
}
memset(pWindowResInfo->pStatus, 0, sizeof(SWindowStatus) * pWindowResInfo->capacity);
pWindowResInfo->curIndex = -1;
taosCleanUpHashTable(pWindowResInfo->hashList);
pWindowResInfo->size = 0;
_hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
pWindowResInfo->hashList = taosInitHashTable(pWindowResInfo->capacity, fn, false);
pWindowResInfo->startTime = 0;
pWindowResInfo->prevSKey = 0;
}
void clearCompletedResWindows(SSlidingWindowResInfo *pWindowResInfo, int32_t numOfCols) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) {
return;
}
int32_t i = 0;
for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus *pStatus = &pWindowResInfo->pStatus[i];
if (pStatus->closed) { // remove the window slot from hash table
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pStatus->window.skey, TSDB_KEYSIZE);
} else {
break;
}
}
// add a new result set for a new group
pOutputRes = &pRuntimeEnv->pResult[pRuntimeEnv->usedIndex++];
taosAddToHashTable(pRuntimeEnv->hashList, pData, bytes, (char *)&pOutputRes, POINTER_BYTES);
if (i == 0) {
return;
}
int32_t remain = pWindowResInfo->size - i;
//clear remain list
memmove(pWindowResInfo->pStatus, &pWindowResInfo->pStatus[i], remain * sizeof(SWindowStatus));
memset(&pWindowResInfo->pStatus[remain], 0, (pWindowResInfo->capacity - remain) * sizeof(SWindowStatus));
for(int32_t k = 0; k < remain; ++k) {
copyGroupResultBuf(&pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k], numOfCols);
}
for(int32_t k = remain; k < pWindowResInfo->size; ++k) {
SOutputRes *pOneRes = &pWindowResInfo->pResult[k];
clearGroupResultBuf(pOneRes, numOfCols);
}
pWindowResInfo->size = remain;
for(int32_t k = 0; k < pWindowResInfo->size; ++k) {
SWindowStatus* pStatus = &pWindowResInfo->pStatus[k];
int32_t *p = (int32_t*) taosGetDataFromHash(pWindowResInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE);
int32_t v = *p;
v = (v - i);
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pStatus->window.skey, TSDB_KEYSIZE);
taosAddToHashTable(pWindowResInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE,
(char *)&v, sizeof(int32_t));
}
pWindowResInfo->curIndex = -1;
}
int32_t numOfResFromResWindowInfo(SSlidingWindowResInfo *pWindowResInfo) {
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus *pStatus = &pWindowResInfo->pStatus[i];
if (pStatus->closed == false) {
return i;
}
}
}
static SWindowStatus* getCurrentSWindow(SSlidingWindowResInfo *pWindowResInfo) {
return &pWindowResInfo->pStatus[pWindowResInfo->curIndex];
}
static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) {
if (isNull(pData, type)) { // ignore the null value
return -1;
}
SOutputRes *pOutputRes = getResWindow(&pRuntimeEnv->swindowResInfo, pData, bytes, NULL);
if (pOutputRes == NULL) {
return -1;
}
setGroupOutputBuffer(pRuntimeEnv, pOutputRes);
initCtxOutputBuf(pRuntimeEnv);
return TSDB_CODE_SUCCESS;
}
static int32_t setSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, int64_t skey, int64_t ekey) {
int64_t st = skey;
SWindowStatus *pStatus = NULL;
SOutputRes * pOutputRes = getResWindow(&pRuntimeEnv->swindowResInfo, (char *)&st, TSDB_KEYSIZE, &pStatus);
if (pOutputRes == NULL) {
return -1;
}
pStatus->window = (STimeWindow){.skey = skey, .ekey = ekey};
setGroupOutputBuffer(pRuntimeEnv, pOutputRes);
initCtxOutputBuf(pRuntimeEnv);
return TSDB_CODE_SUCCESS;
}
static char *getGroupbyColumnData(SQuery* pQuery, SData** data, int16_t* type, int16_t* bytes) {
char * groupbyColumnData = NULL;
static char *getGroupbyColumnData(SQuery *pQuery, SData **data, int16_t *type, int16_t *bytes) {
char *groupbyColumnData = NULL;
SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr;
......@@ -1600,22 +1779,22 @@ static char *getGroupbyColumnData(SQuery* pQuery, SData** data, int16_t* type, i
if (pGroupbyExpr->columnInfo[k].flag == TSDB_COL_TAG) {
continue;
}
int16_t colIndex = -1;
int32_t colId = pGroupbyExpr->columnInfo[k].colId;
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].data.colId == colId) {
colIndex = i;
break;
}
}
assert(colIndex >= 0 && colIndex < pQuery->numOfCols);
*type = pQuery->colList[colIndex].data.type;
*bytes = pQuery->colList[colIndex].data.bytes;
groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].colIdxInBuf);
break;
}
......@@ -1637,7 +1816,8 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
TSKEY key = *(TSKEY *)(pCtx[0].aInputElemBuf + TSDB_KEYSIZE * offset);
#if defined(_DEBUG_VIEW)
printf("elem in comp ts file:%" PRId64 ", key:%" PRId64 ", tag:%d, id:%s, query order:%d, ts order:%d, traverse:%d, index:%d\n",
printf("elem in comp ts file:%" PRId64 ", key:%" PRId64
", tag:%d, id:%s, query order:%d, ts order:%d, traverse:%d, index:%d\n",
elem.ts, key, elem.tag, pRuntimeEnv->pMeterObj->meterId, pQuery->order.order, pRuntimeEnv->pTSBuf->tsOrder,
pRuntimeEnv->pTSBuf->cur.order, pRuntimeEnv->pTSBuf->cur.tsIndex);
#endif
......@@ -1666,6 +1846,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return false;
}
// in the supplementary scan, only the following functions need to be executed
if (!IS_MASTER_SCAN(pRuntimeEnv) &&
!(functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST ||
functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) {
......@@ -1679,8 +1860,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
SField *pFields, SBlockInfo *pBlockInfo) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus);
bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus);
SData **data = pRuntimeEnv->colDataBuffer;
int64_t prevNumOfRes = 0;
......@@ -1705,13 +1886,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep);
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey;
int64_t alignedTimestamp =
taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision);
setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId,
pFields, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalSKey : pRuntimeEnv->intervalEKey;
setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull,
pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
}
// set the input column data
......@@ -1735,7 +1913,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
pQuery->order.order, pRuntimeEnv->pTSBuf->cur.order);
}
for (int32_t j = 0; j < (*forwardStep); ++j) {
int32_t j = 0;
int64_t lastKey = 0;
for (j = 0; j < (*forwardStep); ++j) {
int32_t offset = GET_COL_DATA_POS(pQuery, j, step);
if (pRuntimeEnv->pTSBuf != NULL) {
......@@ -1754,23 +1935,115 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
continue;
}
// decide which group this rows belongs to according to current state value
if (groupbyStateValue) {
char *stateVal = groupbyColumnData + bytes * offset;
// sliding window query
if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) {
// decide the time window according to the primary timestamp
int64_t ts = primaryKeyCol[offset];
int64_t wskey = 0;
int64_t wekey = 0;
if (pRuntimeEnv->swindowResInfo.curIndex == -1) {
wskey = pRuntimeEnv->swindowResInfo.prevSKey;
wekey = wskey + pQuery->nAggTimeInterval - 1;
} else {
SWindowStatus *pStatus = &pRuntimeEnv->swindowResInfo.pStatus[pRuntimeEnv->swindowResInfo.curIndex];
if (pStatus->window.skey <= ts && pStatus->window.ekey >= ts) {
wskey = pStatus->window.skey;
wekey = pStatus->window.ekey;
} else {
int64_t st = pStatus->window.skey;
while (st > ts) {
st -= pQuery->slidingTime;
}
while ((st + pQuery->nAggTimeInterval - 1) < ts) {
st += pQuery->slidingTime;
}
wskey = st;
wekey = wskey + pQuery->nAggTimeInterval - 1;
}
}
int32_t ret = setGroupResultForKey(pRuntimeEnv, stateVal, type, bytes);
int32_t ret = setSlidingWindowFromKey(pRuntimeEnv, wskey, wekey);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
}
// all startOffset are identical
offset -= pCtx[0].startOffset;
// all startOffset are identical
offset -= pCtx[0].startOffset;
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
pCtx[k].nStartQueryTimestamp = wskey;
SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo);
if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) {
qTrace("not completed in supplementary scan, ignore\n");
continue;
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
lastKey = ts;
int32_t index = pRuntimeEnv->swindowResInfo.curIndex;
while (1) {
getNextLogicalQueryRange(pRuntimeEnv, &wskey, &wekey);
if (pRuntimeEnv->swindowResInfo.startTime > wskey || (wskey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(wskey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
pRuntimeEnv->swindowResInfo.curIndex = index;
break;
}
if (ts >= wskey && ts <= wekey) {
// null data, failed to allocate more memory buffer
if (setSlidingWindowFromKey(pRuntimeEnv, wskey, wekey) != TSDB_CODE_SUCCESS) {
pRuntimeEnv->swindowResInfo.curIndex = index;
break;
}
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
pCtx[k].nStartQueryTimestamp = wskey;
SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo);
if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) {
qTrace("not completed in supplementary scan, ignore");
continue;
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
} else {
pRuntimeEnv->swindowResInfo.curIndex = index;
break;
}
}
} else { // other queries
// decide which group this rows belongs to according to current state value
if (groupbyStateValue) {
char *stateVal = groupbyColumnData + bytes * offset;
int32_t ret = setGroupResultFromKey(pRuntimeEnv, stateVal, type, bytes);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
}
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
// all startOffset are identical
offset -= pCtx[0].startOffset;
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
}
......@@ -1795,6 +2068,43 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
free(sasArray);
if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) {
SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo;
// query completed
if (lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery) ||
(lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus *pStatus = &pWindowResInfo->pStatus[i];
pStatus->closed = true;
}
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else {
int32_t i = 0;
int64_t skey = 0;
for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus *pStatus = &pWindowResInfo->pStatus[i];
if ((pStatus->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pStatus->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
pStatus->closed = true;
} else {
skey = pStatus->window.skey;
break;
}
}
pWindowResInfo->prevSKey = skey;
// the number of completed slots are larger than the threshold, dump to client immediately.
if (numOfResFromResWindowInfo(pWindowResInfo) > pWindowResInfo->threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
}
}
/*
* No need to calculate the number of output results for groupby normal columns
* because the results of group by normal column is put into intermediate buffer.
......@@ -1923,9 +2233,9 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *
pQuery->lastKey = pPrimaryColumn[pQuery->pos + (newForwardStep - 1) * step] + step;
}
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
*numOfRes =
rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, pFields, pBlockInfo);
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
(pQuery->slidingTime != -1 && pQuery->nAggTimeInterval > 0)) {
*numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, pFields, pBlockInfo);
} else {
*numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pPrimaryColumn, pFields, pBlockInfo);
}
......@@ -2192,7 +2502,7 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(&pCtx->param[j], pSqlFuncMsg->arg->argValue.pz, bytes, type);
} else {
tVariantCreateFromBinary(&pCtx->param[j], (char*) &pSqlFuncMsg->arg[j].argValue.i64, bytes, type);
tVariantCreateFromBinary(&pCtx->param[j], (char *)&pSqlFuncMsg->arg[j].argValue.i64, bytes, type);
}
}
......@@ -2249,8 +2559,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
tfree(pRuntimeEnv->secondaryUnzipBuffer);
taosCleanUpHashTable(pRuntimeEnv->hashList);
destroyResWindowInfo(&pRuntimeEnv->swindowResInfo);
if (pRuntimeEnv->pCtx != NULL) {
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
......@@ -2281,9 +2590,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->vnodeFileInfo.numOfFiles = 0;
free(pRuntimeEnv->vnodeFileInfo.pFileInfo);
}
taosDestoryInterpoInfo(&pRuntimeEnv->interpoInfo);
if (pRuntimeEnv->pInterpoBuf != NULL) {
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
tfree(pRuntimeEnv->pInterpoBuf[i]);
......@@ -2393,7 +2702,7 @@ bool isFirstLastRowQuery(SQuery *pQuery) {
bool notHasQueryTimeRange(SQuery *pQuery) {
return (pQuery->skey == 0 && pQuery->ekey == INT64_MAX && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->skey == INT64_MAX && pQuery->ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery)));
(pQuery->skey == INT64_MAX && pQuery->ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery)));
}
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; }
......@@ -2470,13 +2779,13 @@ static int32_t getFirstCacheSlot(int32_t numOfBlocks, int32_t lastSlot, SCacheIn
return (lastSlot - numOfBlocks + 1 + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks;
}
static bool cacheBoundaryCheck(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj *pMeterObj) {
static bool cacheBoundaryCheck(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj) {
/*
* here we get the first slot from the meter cache, not from the cache snapshot from pQuery, since the
* snapshot value in pQuery may have been expired now.
*/
SQuery* pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
SCacheInfo * pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
SCacheBlock *pBlock = NULL;
......@@ -2550,7 +2859,7 @@ void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t v
pQuery->firstSlot = getFirstCacheSlot(numOfBlocks, lastSlot, pCacheInfo);
pQuery->commitSlot = commitSlot;
pQuery->commitPoint = commitPoint;
/*
* Note: the block id is continuous increasing, never becomes smaller.
*
......@@ -2599,14 +2908,14 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
pQuery->slot = *slot;
// cache block has been flushed to disk, no required data block in cache.
SCacheBlock* pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
if (pBlock == NULL) {
pQuery->skey = rawskey; // restore the skey
pQuery->skey = rawskey; // restore the skey
return -1;
}
(*pos) = searchFn(pRuntimeEnv->primaryColBuffer->data, pBlock->numOfPoints, pQuery->skey, pQuery->order.order);
// restore skey before return
......@@ -2685,57 +2994,73 @@ void vnodeCheckIfDataExists(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj,
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
}
static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t qualifiedKey, int64_t keyFirst, int64_t keyLast,
int64_t *skey, int64_t *ekey) {
assert(qualifiedKey >= keyFirst && qualifiedKey <= keyLast);
static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey) {
assert(pKey >= keyFirst && pKey <= keyLast);
*skey = taosGetIntervalStartTimestamp(pKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision);
if (keyFirst > (INT64_MAX - pQuery->nAggTimeInterval)) {
/*
* if the skey > INT64_MAX - pQuery->nAggTimeInterval, the query duration between
* skey and ekey must be less than one interval.Therefore, no need to adjust the query ranges.
* if the actualSkey > INT64_MAX - pQuery->nAggTimeInterval, the query duration between
* actualSkey and actualEkey must be less than one interval.Therefore, no need to adjust the query ranges.
*/
assert(keyLast - keyFirst < pQuery->nAggTimeInterval);
*skey = keyFirst;
*ekey = keyLast;
*actualSkey = keyFirst;
*actualEkey = keyLast;
*ekey = INT64_MAX;
return;
}
*skey = taosGetIntervalStartTimestamp(qualifiedKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit,
pQuery->precision);
int64_t endKey = *skey + pQuery->nAggTimeInterval - 1;
*ekey = *skey + pQuery->nAggTimeInterval - 1;
if (*skey < keyFirst) {
*skey = keyFirst;
*actualSkey = keyFirst;
} else {
*actualSkey = *skey;
}
if (endKey < keyLast) {
*ekey = endKey;
if (*ekey < keyLast) {
*actualEkey = *ekey;
} else {
*ekey = keyLast;
*actualEkey = keyLast;
}
}
static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY key, TSKEY skey, TSKEY ekey) {
if (pQuery->nAggTimeInterval == 0) {
static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, TSKEY skey, TSKEY ekey) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->nAggTimeInterval == 0 || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
return;
}
TSKEY skey1, ekey1;
TSKEY skey2 = (skey < ekey) ? skey : ekey;
TSKEY ekey2 = (skey < ekey) ? ekey : skey;
TSKEY skey2 = MIN(skey, ekey);
TSKEY ekey2 = MAX(skey, ekey);
// the actual first query range in skey1 and ekey1
TSKEY skey1, ekey1;
doGetAlignedIntervalQueryRangeImpl(pQuery, key, skey2, ekey2, &skey1, &ekey1);
TSKEY windowSKey = 0, windowEKey = 0;
doGetAlignedIntervalQueryRangeImpl(pQuery, key, skey2, ekey2, &skey1, &ekey1, &windowSKey, &windowEKey);
if (QUERY_IS_ASC_QUERY(pQuery)) {
pQuery->skey = skey1;
pQuery->ekey = ekey1;
assert(pQuery->skey <= pQuery->ekey);
pRuntimeEnv->intervalSKey = windowSKey;
pRuntimeEnv->intervalEKey = windowEKey;
assert(pQuery->skey <= pQuery->ekey &&
pRuntimeEnv->intervalSKey + (pQuery->nAggTimeInterval - 1) == pRuntimeEnv->intervalEKey);
} else {
pQuery->skey = ekey1;
pQuery->ekey = skey1;
assert(pQuery->skey >= pQuery->ekey);
pRuntimeEnv->intervalSKey = windowEKey;
pRuntimeEnv->intervalEKey = windowSKey;
assert(pQuery->skey >= pQuery->ekey &&
pRuntimeEnv->intervalSKey - (pQuery->nAggTimeInterval - 1) == pRuntimeEnv->intervalEKey);
}
pQuery->lastKey = pQuery->skey;
......@@ -2819,7 +3144,8 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet
* reset the status and load the data block that contains the qualified point
*/
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%" PRId64 "-%" PRId64 ", out of range",
dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%" PRId64 "-%" PRId64
", out of range",
GET_QINFO_ADDR(pQuery), pRuntimeEnv->startPos.fileId, pRuntimeEnv->startPos.slot,
pRuntimeEnv->startPos.pos, pQuery->skey, pQuery->ekey);
......@@ -2837,7 +3163,7 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet
// moveToNextBlock make sure there is a available cache block, if exists
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
pBlock = &pRuntimeEnv->cacheBlock;
pQuery->pos = pBlock->numOfPoints - 1;
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
......@@ -2864,7 +3190,7 @@ static bool doGetQueryPos(TSKEY key, SMeterQuerySupportObj *pSupporter, SPointIn
if (isPointInterpoQuery(pQuery)) { /* no qualified data in this query range */
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
} else {
getAlignedIntervalQueryRange(pQuery, key, pQuery->skey, pQuery->ekey);
getAlignedIntervalQueryRange(pRuntimeEnv, key, pQuery->skey, pQuery->ekey);
return true;
}
} else { // key > pQuery->ekey, abort for normal query, continue for interp query
......@@ -2877,10 +3203,10 @@ static bool doGetQueryPos(TSKEY key, SMeterQuerySupportObj *pSupporter, SPointIn
}
static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter, SPointInterpoSupporter *pPointInterpSupporter,
SMeterObj *pMeterObj,TSKEY nextKey) {
SMeterObj *pMeterObj, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
if (isFirstLastRowQuery(pQuery)) {
/*
* if the pQuery->skey != pQuery->ekey for last_row query,
......@@ -2888,31 +3214,31 @@ static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter, SPointInterpoSuppor
*/
if (pQuery->skey != pQuery->ekey) {
assert(pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery) && nextKey >= pQuery->ekey &&
nextKey <= pQuery->skey);
nextKey <= pQuery->skey);
pQuery->skey = nextKey;
pQuery->ekey = nextKey;
}
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
} else {
getAlignedIntervalQueryRange(pQuery, nextKey, pQuery->skey, pQuery->ekey);
getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pQuery->skey, pQuery->ekey);
return true;
}
}
//TODO refactor code, the best way to implement the last_row is utilizing the iterator
// TODO refactor code, the best way to implement the last_row is utilizing the iterator
bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInterpoSupporter *pPointInterpSupporter) {
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
assert(!QUERY_IS_ASC_QUERY(pQuery) && notHasQueryTimeRange(pQuery));
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
TSKEY lastKey = -1;
pQuery->fileId = -1;
vnodeFreeFieldsEx(pRuntimeEnv);
......@@ -2923,44 +3249,44 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter
if (pCacheInfo != NULL && pCacheInfo->cacheBlocks != NULL && pQuery->numOfBlocks > 0) {
pQuery->fileId = -1;
TSKEY key = pMeterObj->lastKey;
pQuery->skey = key;
pQuery->ekey = key;
pQuery->lastKey = pQuery->skey;
/*
* cache block may have been flushed to disk, and no data in cache anymore.
* So, copy cache block to local buffer is required.
*/
lastKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
if (lastKey < 0) { // data has been flushed to disk, try again search in file
if (lastKey < 0) { // data has been flushed to disk, try again search in file
lastKey = getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn);
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
return false;
}
}
} else { // no data in cache, try file
} else { // no data in cache, try file
TSKEY key = pMeterObj->lastKeyOnFile;
pQuery->skey = key;
pQuery->ekey = key;
pQuery->lastKey = pQuery->skey;
bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn);
if (!ret) { // no data in file, return false;
return false;
}
lastKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
}
assert(lastKey <= pQuery->skey);
pQuery->skey = lastKey;
pQuery->ekey = lastKey;
pQuery->lastKey = pQuery->skey;
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
}
......@@ -2970,7 +3296,7 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter
* 10ms, which is guaranteed by parser at client-side
*/
bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySupportObj *pSupporter,
SPointInterpoSupporter *pPointInterpSupporter) {
SPointInterpoSupporter *pPointInterpSupporter, int64_t *key) {
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
......@@ -2979,10 +3305,14 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup
if (QUERY_IS_ASC_QUERY(pQuery)) {
// todo: the action return as the getQueryStartPositionInCache function
if (dataInDisk && getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) {
TSKEY key = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
assert(key >= pQuery->skey);
return doGetQueryPos(key, pSupporter, pPointInterpSupporter);
TSKEY nextKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
assert(nextKey >= pQuery->skey);
if (key != NULL) {
*key = nextKey;
}
return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter);
}
// set no data in file
......@@ -2995,14 +3325,23 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup
}
TSKEY nextKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
if (key != NULL) {
*key = nextKey;
}
return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter);
} else { // descending order
} else { // descending order
if (dataInCache) { // todo handle error
TSKEY nextKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
assert(nextKey == -1 || nextKey <= pQuery->skey);
if (nextKey != -1) { // find qualified data in cache
if (key != NULL) {
*key = nextKey;
}
if (nextKey != -1) { // find qualified data in cache
if (nextKey >= pQuery->ekey) {
return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, nextKey);
} else {
......@@ -3020,13 +3359,17 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup
}
if (dataInDisk && getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) {
TSKEY key = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
assert(key <= pQuery->skey);
TSKEY nextKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
assert(nextKey <= pQuery->skey);
if (key != NULL) {
*key = nextKey;
}
// key in query range. If not, no qualified in disk file
if (key >= pQuery->ekey) {
return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, key);
} else { //In case of all queries, the value of false will be returned if key < pQuery->ekey
if (nextKey >= pQuery->ekey) {
return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, nextKey);
} else { // In case of all queries, the value of false will be returned if key < pQuery->ekey
return false;
}
}
......@@ -3313,7 +3656,8 @@ static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB
static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
// in case of point-interpolation query, use asc order scan
char msg[] =
"QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 "-%" PRId64 ", "
"QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 "-%" PRId64
", "
"new qrange:%" PRId64 "-%" PRId64;
// descending order query for last_row query
......@@ -3322,13 +3666,13 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
pQuery->order.order, TSQL_SO_DESC);
pQuery->order.order = TSQL_SO_DESC;
int64_t skey = MIN(pQuery->skey, pQuery->ekey);
int64_t ekey = MAX(pQuery->skey, pQuery->ekey);
pQuery->skey = ekey;
pQuery->ekey = skey;
return;
}
......@@ -3448,7 +3792,7 @@ void forwardQueryStartPosition(SQueryRuntimeEnv *pRuntimeEnv) {
} else {
pQuery->limit.offset -= maxReads;
// update the lastkey, since the following skip operation may traverse to another media. update the lastkey first.
pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyLast+1:blockInfo.keyFirst-1;
pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.keyLast + 1 : blockInfo.keyFirst - 1;
doSkipDataBlock(pRuntimeEnv);
}
}
......@@ -3501,7 +3845,7 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, SMeterQuerySupportObj *
pQuery->lastKey = pQuery->skey;
// todo opt performance
if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, NULL) == false) {
if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, NULL, NULL) == false) {
sem_post(&pQInfo->dataReady); // hack for next read for empty return
pQInfo->over = 1;
return false;
......@@ -3746,7 +4090,7 @@ static void allocMemForInterpo(SMeterQuerySupportObj *pSupporter, SQuery *pQuery
static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, bool isMetricQuery) {
int32_t slot = 0;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
slot = 10000;
} else {
slot = pSupporter->pSidSet->numOfSubSet;
......@@ -3760,27 +4104,27 @@ static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQue
for (int32_t k = 0; k < slot; ++k) {
SOutputRes *pOneRes = &pSupporter->pResult[k];
pOneRes->nAlloc = 1;
/*
* for single table top/bottom query, the output for group by normal column, the output rows is
* equals to the maximum rows, instead of 1.
*/
if (!isMetricQuery && isTopBottomQuery(pQuery)) {
assert(pQuery->numOfOutputCols > 1);
SSqlFunctionExpr *pExpr = &pQuery->pSelectExpr[1];
pOneRes->nAlloc = pExpr->pBase.arg[0].argValue.i64;
}
createGroupResultBuf(pQuery, pOneRes, isMetricQuery);
}
return TSDB_CODE_SUCCESS;
}
static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* pMeterObj) {
SQuery* pQuery = pRuntimeEnv->pQuery;
static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system.
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
int32_t bytes = pQuery->colList[i].data.bytes;
......@@ -3789,7 +4133,7 @@ static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* p
goto _error_clean;
}
}
// record the maximum column width among columns of this meter/metric
int32_t maxColWidth = pQuery->colList[0].data.bytes;
for (int32_t i = 1; i < pQuery->numOfCols; ++i) {
......@@ -3798,39 +4142,39 @@ static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* p
maxColWidth = bytes;
}
}
pRuntimeEnv->primaryColBuffer = NULL;
if (PRIMARY_TSCOL_LOADED(pQuery)) {
pRuntimeEnv->primaryColBuffer = pRuntimeEnv->colDataBuffer[0];
} else {
pRuntimeEnv->primaryColBuffer =
(SData *) malloc(pMeterObj->pointsPerFileBlock * TSDB_KEYSIZE + sizeof(SData) + EXTRA_BYTES);
(SData *)malloc(pMeterObj->pointsPerFileBlock * TSDB_KEYSIZE + sizeof(SData) + EXTRA_BYTES);
}
pRuntimeEnv->unzipBufSize = (size_t)(maxColWidth * pMeterObj->pointsPerFileBlock + EXTRA_BYTES); // plus extra_bytes
pRuntimeEnv->unzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize);
pRuntimeEnv->secondaryUnzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize);
if (pRuntimeEnv->unzipBuffer == NULL || pRuntimeEnv->secondaryUnzipBuffer == NULL ||
pRuntimeEnv->primaryColBuffer == NULL) {
goto _error_clean;
}
return TSDB_CODE_SUCCESS;
_error_clean:
_error_clean:
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
tfree(pRuntimeEnv->colDataBuffer[i]);
}
tfree(pRuntimeEnv->unzipBuffer);
tfree(pRuntimeEnv->secondaryUnzipBuffer);
if (!PRIMARY_TSCOL_LOADED(pQuery)) {
tfree(pRuntimeEnv->primaryColBuffer);
}
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -3838,7 +4182,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
void *param) {
SQuery *pQuery = &pQInfo->query;
int32_t code = TSDB_CODE_SUCCESS;
/*
* only the successful complete requries the sem_post/over = 1 operations.
*/
......@@ -3870,11 +4214,11 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
// check data in file or cache
bool dataInCache = true;
bool dataInDisk = true;
SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pMeterObj = pMeterObj;
if ((code = allocateRuntimeEnvBuf(pRuntimeEnv, pMeterObj)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -3904,55 +4248,106 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
vnodeRecordAllFiles(pQInfo, pMeterObj->vnode);
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
if ((code = allocateOutputBufForGroup(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) {
return code;
}
int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
_hash_fn_t fn = taosGetDefaultHashFunction(type);
pRuntimeEnv->hashList = taosInitHashTable(10039, fn, false);
pRuntimeEnv->usedIndex = 0;
pRuntimeEnv->pResult = pSupporter->pResult;
int16_t type = TSDB_DATA_TYPE_NULL;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
} else {
type = TSDB_DATA_TYPE_TIMESTAMP;
}
// todo bug!
initResWindowInfo(&pRuntimeEnv->swindowResInfo, 3, type, pSupporter->pResult);
}
pSupporter->rawSKey = pQuery->skey;
pSupporter->rawEKey = pQuery->ekey;
/* query on single table */
pSupporter->numOfMeters = 1;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SPointInterpoSupporter interpInfo = {0};
pointInterpSupporterInit(pQuery, &interpInfo);
/*
* in case of last_row query without query range, we set the query timestamp to
* pMeterObj->lastKey. Otherwise, keep the initial query time range unchanged.
*/
if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) {
if (!normalizeUnBoundLastRowQuery(pSupporter, &interpInfo)) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
}
} else {
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
(isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
// find the skey and ekey in case of sliding query
if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) {
int64_t skey = 0;
SWAP(pQuery->skey, pQuery->ekey, int64_t);
pQuery->order.order ^= 1;
pQuery->lastKey = pQuery->skey;
if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
}
pQuery->skey = skey;
pQuery->order.order ^= 1;
SWAP(pQuery->skey, pQuery->ekey, int64_t);
int64_t ekey = 0;
pQuery->lastKey = pQuery->skey;
if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) {
//
}
pQuery->skey = ekey;
TSKEY skey1, ekey1;
TSKEY windowSKey = 0, windowEKey = 0;
TSKEY minKey = MIN(pQuery->skey, pQuery->ekey);
TSKEY maxKey = MAX(pQuery->skey, pQuery->ekey);
doGetAlignedIntervalQueryRangeImpl(pQuery, minKey, minKey, maxKey, &skey1, &ekey1, &windowSKey, &windowEKey);
pRuntimeEnv->swindowResInfo.startTime = windowSKey;
pSupporter->rawSKey = pQuery->skey;
pSupporter->rawEKey = pQuery->ekey;
if (QUERY_IS_ASC_QUERY(pQuery)) {
pRuntimeEnv->swindowResInfo.prevSKey = windowSKey;
} else {
pRuntimeEnv->swindowResInfo.prevSKey = windowSKey + ((pQuery->skey - windowSKey) / pQuery->slidingTime) * pQuery->slidingTime;
}
} else {
int64_t ekey = 0;
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
(isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
}
}
}
/*
* here we set the value for before and after the specified time into the
* parameter for interpolation query
......@@ -4050,7 +4445,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
pQuery->pointsRead = 0;
changeExecuteScanOrder(pQuery, true);
SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
doInitQueryFileInfoFD(&pRuntimeEnv->vnodeFileInfo);
vnodeInitDataBlockInfo(&pRuntimeEnv->loadBlockInfo);
......@@ -4086,17 +4481,17 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSQL_SO_ASC : TSQL_SO_DESC;
tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
}
int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pSupporter->runtimeEnv, pTagSchema, TSQL_SO_ASC, true);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
ret = allocateRuntimeEnvBuf(pRuntimeEnv, pMeter);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
tSidSetSort(pSupporter->pSidSet);
vnodeRecordAllFiles(pQInfo, pMeter->vnode);
......@@ -4105,9 +4500,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
}
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags;
pRuntimeEnv->hashList = taosInitHashTable(10039, taosIntHash_64, false);
pRuntimeEnv->usedIndex = 0;
pRuntimeEnv->pResult = pSupporter->pResult;
initResWindowInfo(&pRuntimeEnv->swindowResInfo, 10039, TSDB_DATA_TYPE_BIGINT, pSupporter->pResult);
}
if (pQuery->nAggTimeInterval != 0) {
......@@ -4186,12 +4579,12 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
}
}
TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv* pRuntimeEnv, SCacheBlock *pBlock, int32_t index) {
TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv *pRuntimeEnv, SCacheBlock *pBlock, int32_t index) {
if (pBlock == NULL || index >= pBlock->numOfPoints || index < 0) {
return -1;
}
return ((TSKEY*)(pRuntimeEnv->primaryColBuffer->data))[index];
return ((TSKEY *)(pRuntimeEnv->primaryColBuffer->data))[index];
}
/*
......@@ -4214,19 +4607,19 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order);
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot);
bool loadTS = true;
bool loadFields = true;
bool loadTS = true;
bool loadFields = true;
int32_t slot = pQuery->slot;
int32_t ret = loadDataBlockIntoMem(pBlock, &pQuery->pFields[slot], pRuntimeEnv, fileIndex, loadTS, loadFields);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus);
SET_FILE_BLOCK_FLAG(pRuntimeEnv->blockStatus);
......@@ -4254,7 +4647,7 @@ static TSKEY getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) {
} else if (nextTimestamp > pQuery->ekey) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
return nextTimestamp;
}
......@@ -4286,13 +4679,13 @@ TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sea
if (key < pQuery->ekey) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
return key;
} else {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return -1; // no data to check
return -1; // no data to check
}
} else {//asc query
} else { // asc query
bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn);
if (ret) {
dTrace("QInfo:%p vid:%d sid:%d id:%s find the possible position, fileId:%d, slot:%d, pos:%d", pQInfo,
......@@ -4304,7 +4697,7 @@ TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sea
if (key > pQuery->ekey) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
return key;
} else {
/*
......@@ -4321,9 +4714,9 @@ TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sea
}
static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
assert(pQuery->fileId < 0);
......@@ -4344,7 +4737,7 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste
int32_t currentSlot = pCacheInfo->currentSlot;
int32_t firstSlot = getFirstCacheSlot(numOfBlocks, currentSlot, pCacheInfo);
if (step == QUERY_DESC_FORWARD_STEP && pQuery->slot == firstSlot) {
bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn);
if (ret) {
......@@ -4413,16 +4806,16 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl
(step == QUERY_DESC_FORWARD_STEP && (pQuery->slot == 0))) {
fileIndex = getNextDataFileCompInfo(pRuntimeEnv, pMeterObj, step);
/* data maybe in cache */
if (fileIndex >= 0) { // next file
if (fileIndex >= 0) { // next file
pQuery->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->numOfBlocks - 1;
pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1;
} else { // try data in cache
} else { // try data in cache
assert(pQuery->fileId == -1);
if (step == QUERY_ASC_FORWARD_STEP) {
getFirstDataBlockInCache(pRuntimeEnv);
} else { // no data to check for desc order query
} else { // no data to check for desc order query
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
}
......@@ -4431,7 +4824,7 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl
} else { // next block in the same file
int32_t fid = pQuery->fileId;
fileIndex = vnodeGetVnodeHeaderFileIdx(&fid, pRuntimeEnv, pQuery->order.order);
pQuery->slot += step;
pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1;
}
......@@ -4462,16 +4855,16 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
TSKEY * primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
int64_t start = taosGetTimestampUs();
if (IS_DISK_DATA_BLOCK(pQuery)) {
SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot);
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK);
if (blockLoadStatus == DISK_DATA_LOADED) {
*forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot], searchFn,
numOfRes);
*forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot],
searchFn, numOfRes);
} else {
*forwardStep = pblockInfo->size;
}
......@@ -4479,17 +4872,25 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
pSummary->fileTimeUs += (taosGetTimestampUs() - start);
} else {
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot);
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK);
*forwardStep =
applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes);
*forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes);
pSummary->cacheTimeUs += (taosGetTimestampUs() - start);
}
}
static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t *skey, int64_t *ekey) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
*skey += (pQuery->slidingTime * factor);
*ekey += (pQuery->slidingTime * factor);
}
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool LOAD_DATA = true;
......@@ -4509,22 +4910,24 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SPositionInfo *pStartPos = &pRuntimeEnv->startPos;
assert(pQuery->slot == pStartPos->slot);
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d, start fileId:%d, slot:%d, pos:%d, bstatus:%d",
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64
", order:%d, start fileId:%d, slot:%d, pos:%d, bstatus:%d",
GET_QINFO_ADDR(pQuery), pQuery->skey, pQuery->ekey, pQuery->lastKey, pQuery->order.order, pStartPos->fileId,
pStartPos->slot, pStartPos->pos, pRuntimeEnv->blockStatus);
while (1) {
// check if query is killed or not set the status of query to pass the status check
if (isQueryKilled(pQuery)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return cnt;
}
// if (isQueryKilled(pQuery)) {
// setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
// return cnt;
// }
int32_t numOfRes = 0;
SBlockInfo blockInfo = {0};
doHandleDataBlockImpl(pRuntimeEnv, &blockInfo, searchFn, &numOfRes, blockLoadStatus, &forwardStep);
dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, bstatus:%d, rows:%d, checked:%d",
dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64
", fileId:%d, slot:%d, pos:%d, bstatus:%d, rows:%d, checked:%d",
GET_QINFO_ADDR(pQuery), blockInfo.keyFirst, blockInfo.keyLast, pQuery->fileId, pQuery->slot, pQuery->pos,
pRuntimeEnv->blockStatus, blockInfo.size, forwardStep);
......@@ -4742,8 +5145,8 @@ static void printBinaryData(int32_t functionId, char *data, int32_t srcDataType)
printf("%lf,%lf\t", *(double *)data, *(double *)(data + sizeof(double)));
} else if (functionId == TSDB_FUNC_TWA) {
data += 1;
printf("%lf,%" PRId64 ",%" PRId64 ",%" PRId64 "\t", *(double *)data, *(int64_t *)(data + 8), *(int64_t *)(data + 16),
*(int64_t *)(data + 24));
printf("%lf,%" PRId64 ",%" PRId64 ",%" PRId64 "\t", *(double *)data, *(int64_t *)(data + 8),
*(int64_t *)(data + 16), *(int64_t *)(data + 24));
} else if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
switch (srcDataType) {
case TSDB_DATA_TYPE_TINYINT:
......@@ -5195,13 +5598,20 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1;
}
for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) {
SOutputRes *buf = &pRuntimeEnv->pResult[i];
SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus *pStatus = &pWindowResInfo->pStatus[i];
if (!pStatus->closed) {
continue;
}
SOutputRes *buf = &pWindowResInfo->pResult[i];
// open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
......@@ -5276,6 +5686,22 @@ void clearGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols) {
}
}
void copyGroupResultBuf(SOutputRes* dst, const SOutputRes* src, int32_t nOutputCols) {
for(int32_t i = 0; i < nOutputCols; ++i) {
SResultInfo *pDst = &dst->resultInfo[i];
SResultInfo *pSrc = &src->resultInfo[i];
char* buf = pDst->interResultBuf;
memcpy(pDst, pSrc, sizeof(SResultInfo));
pDst->interResultBuf = buf;
memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen);
int32_t size = sizeof(tFilePage) + pSrc->bufLen * src->nAlloc;
memcpy(dst->result[i], src->result[i], size);
}
}
void destroyGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols) {
if (pOneOutputRes == NULL) {
return;
......@@ -5296,13 +5722,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
// ts_comp query does not required reversed output
// if (QUERY_IS_ASC_QUERY(pQuery) || isTSCompQuery(pQuery)) {
pCtx->aOutputBuf = pQuery->sdata[i]->data;
// } else { // point to the last position of output buffer for desc query
// pCtx->aOutputBuf = pQuery->sdata[i]->data + (rows - 1) * pCtx->outputBytes;
// }
pCtx->aOutputBuf = pQuery->sdata[i]->data;
/*
* set the output buffer information and intermediate buffer
......@@ -5380,30 +5800,17 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->over &= (~QUERY_RESBUF_FULL);
} else {
int32_t numOfSkip = (int32_t)pQuery->limit.offset;
int32_t size = pQuery->pointsRead;
pQuery->pointsRead -= numOfSkip;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
// if (QUERY_IS_ASC_QUERY(pQuery)) {
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes);
// } else { // DESC query
// int32_t maxrows = pQuery->pointsToRead;
//
// memmove(pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes,
// pQuery->sdata[i]->data + (maxrows - size) * bytes, pQuery->pointsRead * bytes);
// }
pRuntimeEnv->pCtx[i].aOutputBuf -= bytes * numOfSkip * step;
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes);
pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip;
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
pRuntimeEnv->pCtx[i].ptsOutputBuf -= TSDB_KEYSIZE * numOfSkip * step;
pRuntimeEnv->pCtx[i].ptsOutputBuf += TSDB_KEYSIZE * numOfSkip;
}
}
......@@ -5411,27 +5818,6 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
/**
* move remain data to the start position of output buffer
* @param pRuntimeEnv
*/
void moveDescOrderResultsToFront(SQueryRuntimeEnv *pRuntimeEnv) {
// SQuery *pQuery = pRuntimeEnv->pQuery;
// int32_t maxrows = pQuery->pointsToRead;
//
// if (QUERY_IS_ASC_QUERY(pQuery) || isTSCompQuery(pQuery)) {
// return;
// }
//
// if (pQuery->pointsRead > 0 && pQuery->pointsRead < maxrows) {
// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
// int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes,
// pQuery->pointsRead * bytes);
// }
// }
}
typedef struct SQueryStatus {
SPositionInfo start;
SPositionInfo next;
......@@ -5450,7 +5836,7 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus
pStatus->overStatus = pQuery->over;
pStatus->lastKey = pQuery->lastKey;
pStatus->skey = pQuery->skey;
pStatus->ekey = pQuery->ekey;
......@@ -5479,7 +5865,7 @@ static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pSta
pQuery->lastKey = pStatus->lastKey;
pQuery->skey = pStatus->skey;
pQuery->ekey = pStatus->ekey;
pQuery->over = pStatus->overStatus;
pRuntimeEnv->startPos = pStatus->start;
......@@ -5541,13 +5927,20 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
while (1) {
doScanAllDataBlocks(pRuntimeEnv);
// applied to agg functions (e.g., stddev)
bool toContinue = true;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
// for each group result, call the finalize function for each column
for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) {
SOutputRes *buf = &pRuntimeEnv->pResult[i];
SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SOutputRes *buf = &pWindowResInfo->pResult[i];
SWindowStatus *pStatus = &pWindowResInfo->pStatus[i];
if (!pStatus->closed) {
continue;
}
setGroupOutputBuffer(pRuntimeEnv, buf);
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
......@@ -5588,9 +5981,11 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
doSingleMeterSupplementScan(pRuntimeEnv);
// reset status code
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) {
SOutputRes *buf = &pRuntimeEnv->pResult[i];
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SOutputRes *buf = &pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
buf->resultInfo[j].complete = false;
}
......@@ -5608,10 +6003,17 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
// for each group result, call the finalize function for each column
for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) {
SOutputRes *buf = &pRuntimeEnv->pResult[i];
SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SOutputRes *buf = &pWindowResInfo->pResult[i];
SWindowStatus* pStatus = &pWindowResInfo->pStatus[i];
if (!pStatus->closed) {
continue;
}
setGroupOutputBuffer(pRuntimeEnv, buf);
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
......@@ -5670,37 +6072,77 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
return maxOutput;
}
/*
* forward the query range for next interval query
*/
void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv) {
static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv,
int64_t *skey, int64_t *ekey) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
pQuery->ekey += (pQuery->nAggTimeInterval * factor);
pQuery->skey = pQuery->ekey - (pQuery->nAggTimeInterval - 1) * factor;
printf("------------------------%lld, %lld\n", pQuery->skey, pQuery->ekey);
// boundary check
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (pQuery->skey > pSupporter->rawEKey) {
setQueryStatus(pQuery, QUERY_COMPLETED);
return;
}
*skey = pRuntimeEnv->intervalSKey + (pQuery->slidingTime * factor);
*ekey = pRuntimeEnv->intervalEKey + (pQuery->slidingTime * factor);
printf("new window:%lld, %lld\n", *skey, *ekey);
if (pQuery->slidingTime > 0) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
// the next sliding window is not contained in the query time range
if (*skey < pSupporter->rawSKey) {
*skey = pSupporter->rawSKey;
}
if (pQuery->ekey > pSupporter->rawEKey) {
pQuery->ekey = pSupporter->rawEKey;
if (*skey > pSupporter->rawEKey) {
return QUERY_COMPLETED;
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
}
if (*ekey > pSupporter->rawEKey) {
*ekey = pSupporter->rawEKey;
}
} else {
if (*skey > pSupporter->rawSKey) {
*skey = pSupporter->rawSKey;
}
if (*skey < pSupporter->rawEKey) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
return QUERY_COMPLETED;
}
if (*ekey < pSupporter->rawEKey) {
*ekey = pSupporter->rawEKey;
}
}
} else {
if (pQuery->skey < pSupporter->rawEKey) {
}
return QUERY_NOT_COMPLETED;
}
/*
* forward the query range for next interval query
*/
void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) {
if ((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->ekey) ||
(!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->ekey)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
return;
} else {
TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
}
if (pQuery->ekey < pSupporter->rawEKey) {
pQuery->ekey = pSupporter->rawEKey;
}
return;
}
int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey);
if (r == QUERY_COMPLETED) {
setQueryStatus(pQuery, QUERY_COMPLETED);
return;
}
getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalSKey, &pRuntimeEnv->intervalEKey);
/* ensure the search in cache will return right position */
pQuery->lastKey = pQuery->skey;
......@@ -5715,7 +6157,7 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE
// bridge the gap in group by time function
if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
getAlignedIntervalQueryRange(pQuery, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey);
getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey);
}
}
......@@ -5750,33 +6192,33 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet
SVnodeObj *pVnode = &vnodeList[vid];
char* buf = calloc(1, getCompHeaderSegSize(&pVnode->cfg));
char *buf = calloc(1, getCompHeaderSegSize(&pVnode->cfg));
if (buf == NULL) {
*numOfMeters = 0;
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo;
int32_t headerSize = getCompHeaderSegSize(&pVnode->cfg);
lseek(pVnodeFileInfo->headerFd, TSDB_FILE_HEADER_LEN, SEEK_SET);
read(pVnodeFileInfo->headerFd, buf, headerSize);
// check the offset value integrity
if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, buf - TSDB_FILE_HEADER_LEN,
headerSize) < 0) {
free(buf);
*numOfMeters = 0;
return TSDB_CODE_FILE_CORRUPTED;
}
int64_t oldestKey = getOldestKey(pVnode->numOfFiles, pVnode->fileId, &pVnode->cfg);
int64_t oldestKey = getOldestKey(pVnode->numOfFiles, pVnode->fileId, &pVnode->cfg);
(*pReqMeterDataInfo) = malloc(POINTER_BYTES * pSidSet->numOfSids);
if (*pReqMeterDataInfo == NULL) {
free(buf);
*numOfMeters = 0;
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -5821,19 +6263,19 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet
}
}
int64_t headerOffset = sizeof(SCompHeader) * pMeterObj->sid;
int64_t headerOffset = sizeof(SCompHeader) * pMeterObj->sid;
SCompHeader *compHeader = (SCompHeader *)(buf + headerOffset);
if (compHeader->compInfoOffset == 0) { // current table is empty
continue;
}
// corrupted file may cause the invalid compInfoOffset, check needs
int32_t compHeaderOffset = getCompHeaderStartPosition(&pVnode->cfg);
if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, compHeaderOffset) !=
TSDB_CODE_SUCCESS) {
free(buf);
*numOfMeters = 0;
return TSDB_CODE_FILE_CORRUPTED;
}
......@@ -5855,7 +6297,7 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet
}
free(buf);
return TSDB_CODE_SUCCESS;
}
......@@ -5973,8 +6415,8 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *p
((pQuery->lastKey <= pQuery->skey) && !QUERY_IS_ASC_QUERY(pQuery)));
}
static void clearAllMeterDataBlockInfo(SMeterDataInfo** pMeterDataInfo, int32_t start, int32_t end) {
for(int32_t i = start; i < end; ++i) {
static void clearAllMeterDataBlockInfo(SMeterDataInfo **pMeterDataInfo, int32_t start, int32_t end) {
for (int32_t i = start; i < end; ++i) {
tfree(pMeterDataInfo[i]->pBlock);
pMeterDataInfo[i]->numOfBlocks = 0;
pMeterDataInfo[i]->start = -1;
......@@ -6025,7 +6467,7 @@ static bool setValidDataBlocks(SMeterDataInfo *pMeterDataInfo, int32_t end) {
if (size != pMeterDataInfo->numOfBlocks) {
memmove(pMeterDataInfo->pBlock, &pMeterDataInfo->pBlock[pMeterDataInfo->start], size * sizeof(SCompBlock));
char *tmp = realloc(pMeterDataInfo->pBlock, size * sizeof(SCompBlock));
if (tmp == NULL) {
return false;
......@@ -6043,7 +6485,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery,
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SMeterObj * pMeterObj = pMeterDataInfo->pMeterObj;
SMeterQueryInfo *pMeterQInfo = pMeterDataInfo->pMeterQInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) {
*minval = pMeterQInfo->lastKey;
*maxval = endKey;
......@@ -6053,12 +6495,14 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery,
}
if (*minval > *maxval) {
qTrace("QInfo:%p vid:%d sid:%d id:%s, no result in files, qrange:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64, pQInfo, pMeterObj->vnode,
pMeterObj->sid, pMeterObj->meterId, pMeterQInfo->skey, pMeterQInfo->ekey, pMeterQInfo->lastKey);
qTrace("QInfo:%p vid:%d sid:%d id:%s, no result in files, qrange:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64, pQInfo,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterQInfo->skey, pMeterQInfo->ekey,
pMeterQInfo->lastKey);
return false;
} else {
qTrace("QInfo:%p vid:%d sid:%d id:%s, query in files, qrange:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64, pQInfo, pMeterObj->vnode,
pMeterObj->sid, pMeterObj->meterId, pMeterQInfo->skey, pMeterQInfo->ekey, pMeterQInfo->lastKey);
qTrace("QInfo:%p vid:%d sid:%d id:%s, query in files, qrange:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64, pQInfo,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterQInfo->skey, pMeterQInfo->ekey,
pMeterQInfo->lastKey);
return true;
}
}
......@@ -6077,19 +6521,19 @@ int32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery
SQueryCostSummary *pSummary = &pSupporter->runtimeEnv.summary;
TSKEY minval, maxval;
*numOfBlocks = 0;
SQueryFilesInfo *pVnodeFileInfo = &pSupporter->runtimeEnv.vnodeFileInfo;
// sequentially scan this header file to extract the compHeader info
for (int32_t j = 0; j < numOfMeters; ++j) {
SMeterObj *pMeterObj = pMeterDataInfo[j]->pMeterObj;
lseek(pVnodeFileInfo->headerFd, pMeterDataInfo[j]->offsetInHeaderFile, SEEK_SET);
SCompInfo compInfo = {0};
read(pVnodeFileInfo->headerFd, &compInfo, sizeof(SCompInfo));
int32_t ret = validateCompBlockInfoSegment(pQInfo, filePath, pMeterObj->vnode, &compInfo,
pMeterDataInfo[j]->offsetInHeaderFile);
if (ret != TSDB_CODE_SUCCESS) { // file corrupted
......@@ -6101,24 +6545,25 @@ int32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery
clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j);
continue;
}
int32_t size = compInfo.numOfBlocks * sizeof(SCompBlock);
size_t bufferSize = size + sizeof(TSCKSUM);
pMeterDataInfo[j]->numOfBlocks = compInfo.numOfBlocks;
pMeterDataInfo[j]->pBlock = calloc(1, bufferSize);
if (pMeterDataInfo[j]->pBlock == NULL) {
clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
read(pVnodeFileInfo->headerFd, pMeterDataInfo[j]->pBlock, bufferSize);
TSCKSUM checksum = *(TSCKSUM*)((char*)pMeterDataInfo[j]->pBlock + size);
TSCKSUM checksum = *(TSCKSUM *)((char *)pMeterDataInfo[j]->pBlock + size);
int64_t st = taosGetTimestampUs();
// check compblock integrity
ret = validateCompBlockSegment(pQInfo, filePath, &compInfo, (char*) pMeterDataInfo[j]->pBlock, pMeterObj->vnode, checksum);
ret = validateCompBlockSegment(pQInfo, filePath, &compInfo, (char *)pMeterDataInfo[j]->pBlock, pMeterObj->vnode,
checksum);
if (ret != TSDB_CODE_SUCCESS) {
clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j);
return TSDB_CODE_FILE_CORRUPTED;
......@@ -6145,7 +6590,7 @@ int32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery
if (!setValidDataBlocks(pMeterDataInfo[j], end)) {
clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j);
pQInfo->killed = 1; // set query kill, abort current query since no memory available
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -6207,14 +6652,14 @@ static int32_t blockAccessOrderComparator(const void *pLeft, const void *pRight,
return pLeftBlockInfoEx->pBlock.compBlock->offset > pRightBlockInfoEx->pBlock.compBlock->offset ? 1 : -1;
}
void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
void cleanBlockOrderSupporter(SBlockOrderSupporter *pSupporter, int32_t numOfTables) {
tfree(pSupporter->numOfBlocksPerMeter);
tfree(pSupporter->blockIndexArray);
for (int32_t i = 0; i < numOfTables; ++i) {
tfree(pSupporter->pDataBlockInfoEx[i]);
}
tfree(pSupporter->pDataBlockInfoEx);
}
......@@ -6258,13 +6703,13 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet
SCompBlock *pBlock = pMeterDataInfo[j]->pBlock;
supporter.numOfBlocksPerMeter[numOfQualMeters] = pMeterDataInfo[j]->numOfBlocks;
char* buf = calloc(1, sizeof(SMeterDataBlockInfoEx) * pMeterDataInfo[j]->numOfBlocks);
char *buf = calloc(1, sizeof(SMeterDataBlockInfoEx) * pMeterDataInfo[j]->numOfBlocks);
if (buf == NULL) {
cleanBlockOrderSupporter(&supporter, numOfQualMeters);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
supporter.pDataBlockInfoEx[numOfQualMeters] = (SMeterDataBlockInfoEx*) buf;
supporter.pDataBlockInfoEx[numOfQualMeters] = (SMeterDataBlockInfoEx *)buf;
for (int32_t k = 0; k < pMeterDataInfo[j]->numOfBlocks; ++k) {
SMeterDataBlockInfoEx *pInfoEx = &supporter.pDataBlockInfoEx[numOfQualMeters][k];
......@@ -6283,7 +6728,7 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet
dTrace("QInfo %p create data blocks info struct completed", addr);
assert(cnt == numOfCompBlocks && numOfQualMeters <= numOfMeters); // the pMeterDataInfo[j]->numOfBlocks may be 0
assert(cnt == numOfCompBlocks && numOfQualMeters <= numOfMeters); // the pMeterDataInfo[j]->numOfBlocks may be 0
supporter.numOfMeters = numOfQualMeters;
SLoserTreeInfo *pTree = NULL;
......@@ -6306,7 +6751,7 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet
if (supporter.blockIndexArray[pos] >= supporter.numOfBlocksPerMeter[pos]) {
supporter.blockIndexArray[pos] = supporter.numOfBlocksPerMeter[pos] + 1;
}
tLoserTreeAdjust(pTree, pos + supporter.numOfMeters);
}
......@@ -6606,7 +7051,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
/* current interval query is completed, set the next query range on other data blocks if exist */
int64_t prevEKey = pQuery->ekey;
getAlignedIntervalQueryRange(pQuery, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
saveIntervalQueryRange(pRuntimeEnv, pInfo);
assert(queryCompleted && prevEKey < pQuery->skey);
......@@ -6657,7 +7102,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
// current interval query is completed, set the next query range on other data blocks if exist
int64_t prevEKey = pQuery->ekey;
getAlignedIntervalQueryRange(pQuery, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
saveIntervalQueryRange(pRuntimeEnv, pInfo);
assert(queryCompleted && prevEKey > pQuery->skey);
......@@ -6696,7 +7141,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
(nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
/* still in the same block to query */
getAlignedIntervalQueryRange(pQuery, nextKey, pSupporter->rawSKey, pSupporter->rawEKey);
getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey);
saveIntervalQueryRange(pRuntimeEnv, pInfo);
int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order);
......@@ -6743,7 +7188,7 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportO
* last query on this block of the meter is done, start next interval on this block
* otherwise, keep the previous query range and proceed
*/
getAlignedIntervalQueryRange(pQuery, key, pSupporter->rawSKey, pSupporter->rawEKey);
getAlignedIntervalQueryRange(pRuntimeEnv, key, pSupporter->rawSKey, pSupporter->rawEKey);
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
// previous query does not be closed, save the results and close it
......@@ -6763,7 +7208,7 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportO
return;
}
getAlignedIntervalQueryRange(pQuery, pQuery->skey, pSupporter->rawSKey, pSupporter->rawEKey);
getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->skey, pSupporter->rawSKey, pSupporter->rawEKey);
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
pMeterQueryInfo->queryRangeSet = 1;
}
......@@ -6833,7 +7278,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
pQuery->pSelectExpr[i].pBase.colInfo.colId, *blkStatus);
}
if (pRuntimeEnv->pTSBuf > 0) {
if (pRuntimeEnv->pTSBuf > 0 || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
req |= BLK_DATA_ALL_NEEDED;
}
}
......@@ -6870,7 +7315,8 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
dTrace("QInfo:%p fileId:%d, slot:%d, block discarded by per-filter, ", GET_QINFO_ADDR(pQuery), pQuery->fileId,
pQuery->slot);
#endif
qTrace("QInfo:%p id:%s slot:%d, data block ignored by pre-filter, fields loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d",
qTrace("QInfo:%p id:%s slot:%d, data block ignored by pre-filter, fields loaded, brange:%" PRId64 "-%" PRId64
", rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->meterId, pQuery->slot, pBlock->keyFirst, pBlock->keyLast,
pBlock->numOfPoints);
return DISK_DATA_DISCARDED;
......@@ -6983,8 +7429,8 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue
TSKEY ts = *(TSKEY *)getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, 0);
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
qTrace("QInfo:%p vid:%d sid:%d id:%s, save results, ts:%" PRId64 ", total:%d", GET_QINFO_ADDR(pQuery), pMeterObj->vnode,
pMeterObj->sid, pMeterObj->meterId, ts, pMeterQueryInfo->numOfRes + 1);
qTrace("QInfo:%p vid:%d sid:%d id:%s, save results, ts:%" PRId64 ", total:%d", GET_QINFO_ADDR(pQuery),
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, ts, pMeterQueryInfo->numOfRes + 1);
pData->numOfElems += numOfResult;
pMeterQueryInfo->numOfRes += numOfResult;
......@@ -7023,8 +7469,8 @@ static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) {
SQuery *pQuery = pSupporter->runtimeEnv.pQuery;
int32_t totalSubset = 0;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
totalSubset = pSupporter->runtimeEnv.usedIndex;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
totalSubset = numOfResFromResWindowInfo(&pSupporter->runtimeEnv.swindowResInfo);
} else {
totalSubset = pSupporter->pSidSet->numOfSubSet;
}
......@@ -7104,8 +7550,7 @@ void copyFromGroupBuf(SQInfo *pQInfo, SOutputRes *result) {
SQuery * pQuery = &pQInfo->query;
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_DESC;
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_ASC;
int32_t numOfResult = doCopyFromGroupBuf(pSupporter, result, orderType);
pQuery->pointsRead += numOfResult;
......@@ -7326,8 +7771,8 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
int32_t ret = resultInterpolate(pQInfo, pDst, pDataSrc, numOfRows, numOfFinalRows);
assert(ret == numOfFinalRows);
/* reached the start position of according to offset value, return immediately */
if (pQuery->limit.offset == 0) {
/* reached the start position of according to offset value, return immediately */
return ret;
}
......@@ -7335,18 +7780,18 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
ret -= pQuery->limit.offset;
// todo !!!!there exactly number of interpo is not valid.
// todo refactor move to the beginning of buffer
if (QUERY_IS_ASC_QUERY(pQuery)) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].resBytes * pQuery->limit.offset,
ret * pQuery->pSelectExpr[i].resBytes);
}
} else {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memmove(pDst[i]->data + (pQuery->pointsToRead - ret) * pQuery->pSelectExpr[i].resBytes,
pDst[i]->data + (pQuery->pointsToRead - ret - pQuery->limit.offset) * pQuery->pSelectExpr[i].resBytes,
ret * pQuery->pSelectExpr[i].resBytes);
}
// if (QUERY_IS_ASC_QUERY(pQuery)) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].resBytes * pQuery->limit.offset,
ret * pQuery->pSelectExpr[i].resBytes);
}
// } else {
// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
// memmove(pDst[i]->data + (pQuery->pointsToRead - ret) * pQuery->pSelectExpr[i].resBytes,
// pDst[i]->data + (pQuery->pointsToRead - ret - pQuery->limit.offset) *
// pQuery->pSelectExpr[i].resBytes, ret * pQuery->pSelectExpr[i].resBytes);
// }
// }
pQuery->limit.offset = 0;
return ret;
} else {
......
......@@ -535,7 +535,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
SPointInterpoSupporter pointInterpSupporter = {0};
pointInterpSupporterInit(pQuery, &pointInterpSupporter);
if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter)) {
if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL)) {
pointInterpSupporterDestroy(&pointInterpSupporter);
return 0;
}
......@@ -666,19 +666,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
if (pSupporter->meterIdx >= pSids->numOfSids) {
return;
}
for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) {
SOutputRes *pOneRes = &pRuntimeEnv->pResult[i];
clearGroupResultBuf(pOneRes, pQuery->numOfOutputCols);
}
pRuntimeEnv->usedIndex = 0;
taosCleanUpHashTable(pRuntimeEnv->hashList);
int32_t primeHashSlot = 10039;
pRuntimeEnv->hashList = taosInitHashTable(primeHashSlot, taosIntHash_32, false);
while (pSupporter->meterIdx < pSupporter->numOfMeters) {
resetResWindowInfo(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols);
while (pSupporter->meterIdx < pSupporter->numOfMeters) {
int32_t k = pSupporter->meterIdx;
if (isQueryKilled(pQuery)) {
......@@ -703,7 +694,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
#endif
SPointInterpoSupporter pointInterpSupporter = {0};
if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter) == false) {
if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL) == false) {
pQuery->skey = pSupporter->rawSKey;
pQuery->ekey = pSupporter->rawEKey;
......@@ -778,8 +769,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
}
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) {
SOutputRes *buf = &pRuntimeEnv->pResult[i];
SSlidingWindowResInfo* pWindowResInfo = &pRuntimeEnv->swindowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SOutputRes *buf = &pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
buf->numOfRows = MAX(buf->numOfRows, buf->resultInfo[j].numOfRes);
}
......@@ -787,14 +780,12 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pQInfo->pMeterQuerySupporter->subgroupIdx = 0;
pQuery->pointsRead = 0;
copyFromGroupBuf(pQInfo, pRuntimeEnv->pResult);
copyFromGroupBuf(pQInfo, pWindowResInfo->pResult);
}
pQInfo->pointsRead += pQuery->pointsRead;
pQuery->pointsOffset = pQuery->pointsToRead;
moveDescOrderResultsToFront(pRuntimeEnv);
dTrace(
"QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
"next skey:%" PRId64 ", offset:%" PRId64,
......@@ -982,12 +973,11 @@ static void vnodeSingleMeterFixedOutputProcessor(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
pQInfo->pMeterQuerySupporter->subgroupIdx = 0;
pQuery->pointsRead = 0;
copyFromGroupBuf(pQInfo, pRuntimeEnv->pResult);
copyFromGroupBuf(pQInfo, pRuntimeEnv->swindowResInfo.pResult);
}
doSkipResults(pRuntimeEnv);
doRevisedResultsByLimit(pQInfo);
moveDescOrderResultsToFront(pRuntimeEnv);
pQInfo->pointsRead = pQuery->pointsRead;
}
......@@ -1034,8 +1024,6 @@ static void vnodeSingleMeterMultiOutputProcessor(SQInfo *pQInfo) {
}
doRevisedResultsByLimit(pQInfo);
moveDescOrderResultsToFront(pRuntimeEnv);
pQInfo->pointsRead += pQuery->pointsRead;
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
......@@ -1063,7 +1051,8 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
(pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)));
initCtxOutputBuf(pRuntimeEnv);
clearCompletedResWindows(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols);
vnodeScanAllData(pRuntimeEnv);
if (isQueryKilled(pQuery)) {
return;
......@@ -1094,7 +1083,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
}
forwardIntervalQueryRange(pSupporter, pRuntimeEnv);
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED|QUERY_RESBUF_FULL)) {
break;
}
......@@ -1133,17 +1122,8 @@ static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) {
taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->pointsRead, pQuery->interpoType);
SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf;
if (QUERY_IS_ASC_QUERY(pQuery)) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->pointsRead * pQuery->pSelectExpr[i].resBytes);
}
} else {
int32_t size = pMeterObj->pointsPerFileBlock;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memcpy(pInterpoBuf[i]->data,
pQuery->sdata[i]->data + (size - pQuery->pointsRead) * pQuery->pSelectExpr[i].resBytes,
pQuery->pointsRead * pQuery->pSelectExpr[i].resBytes);
}
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->pointsRead * pQuery->pSelectExpr[i].resBytes);
}
numOfInterpo = 0;
......@@ -1160,12 +1140,16 @@ static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) {
pQuery->pointsRead = 0;
}
}
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0)) {
pQInfo->pMeterQuerySupporter->subgroupIdx = 0;
pQuery->pointsRead = 0;
copyFromGroupBuf(pQInfo, pRuntimeEnv->swindowResInfo.pResult);
}
pQInfo->pointsRead += pQuery->pointsRead;
pQInfo->pointsInterpo += numOfInterpo;
// moveDescOrderResultsToFront(pRuntimeEnv);
dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
......@@ -1209,7 +1193,6 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
(tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo);
doRevisedResultsByLimit(pQInfo);
moveDescOrderResultsToFront(pRuntimeEnv);
pQInfo->pointsInterpo += numOfInterpo;
pQInfo->pointsRead += pQuery->pointsRead;
......
......@@ -267,6 +267,7 @@ static SQInfo *vnodeAllocateQInfoEx(SQueryMeterMsg *pQueryMsg, SSqlGroupbyExpr *
pQuery->pGroupbyExpr = pGroupbyExpr;
pQuery->nAggTimeInterval = pQueryMsg->nAggTimeInterval;
pQuery->slidingTime = pQueryMsg->slidingTime;
pQuery->interpoType = pQueryMsg->interpoType;
pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit;
......@@ -966,6 +967,8 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
pQueryMsg->queryType = htons(pQueryMsg->queryType);
pQueryMsg->nAggTimeInterval = htobe64(pQueryMsg->nAggTimeInterval);
pQueryMsg->slidingTime = htobe64(pQueryMsg->slidingTime);
pQueryMsg->numOfTagsCols = htons(pQueryMsg->numOfTagsCols);
pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols);
pQueryMsg->numOfOutputCols = htons(pQueryMsg->numOfOutputCols);
......
......@@ -423,7 +423,8 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) {
__wr_lock(&pObj->lock);
}
SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, NULL);
uint32_t val = 0;
SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &val);
if (pNode == NULL) {
if (pObj->multithreadSafe) {
__unlock(&pObj->lock);
......@@ -434,7 +435,12 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) {
SHashNode *pNext = pNode->next;
if (pNode->prev != NULL) {
pNode->prev->next = pNext;
int32_t slot = HASH_INDEX(val, pObj->capacity);
if (pObj->hashList[slot]->next == pNode) {
pObj->hashList[slot]->next = pNext;
} else {
pNode->prev->next = pNext;
}
}
if (pNext != NULL) {
......
......@@ -93,6 +93,7 @@ uint32_t taosIntHash_64(const char *key, uint32_t UNUSED_PARAM(len)) {
_hash_fn_t taosGetDefaultHashFunction(int32_t type) {
_hash_fn_t fn = NULL;
switch(type) {
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: fn = taosIntHash_64;break;
case TSDB_DATA_TYPE_BINARY: fn = MurmurHash3_32;break;
case TSDB_DATA_TYPE_INT: fn = taosIntHash_32; break;
......
......@@ -37,7 +37,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*
* TODO dynmaically decide the start time of a day
* TODO dynamically decide the start time of a day
*/
#if defined(WINDOWS) && _MSC_VER >= 1900
......@@ -94,7 +94,7 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD
return;
}
pInterpoInfo->rowIdx = INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? 0 : numOfRawDataInRows - 1;
pInterpoInfo->rowIdx = 0;//INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? 0 : numOfRawDataInRows - 1;
pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows;
}
......@@ -118,14 +118,14 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* p
if (numOfAvailRawData > 0) {
int32_t finalNumOfResult = 0;
if (pInterpoInfo->order == TSQL_SO_ASC) {
// if (pInterpoInfo->order == TSQL_SO_ASC) {
// get last timestamp, calculate the result size
int64_t lastKey = pPrimaryKeyArray[pInterpoInfo->numOfRawDataInRows - 1];
finalNumOfResult = (int32_t)((lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1;
} else { // todo error less than one!!!
TSKEY lastKey = pPrimaryKeyArray[0];
finalNumOfResult = (int32_t)((pInterpoInfo->startTimestamp - lastKey) / nInterval) + 1;
}
finalNumOfResult = (int32_t)(labs(lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1;
// } else { // todo error less than one!!!
// TSKEY lastKey = pPrimaryKeyArray[0];
// finalNumOfResult = (int32_t)((pInterpoInfo->startTimestamp - lastKey) / nInterval) + 1;
// }
assert(finalNumOfResult >= numOfAvailRawData);
return finalNumOfResult;
......@@ -198,11 +198,11 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
}
static char* getPos(char* data, int32_t bytes, int32_t order, int32_t capacity, int32_t index) {
if (order == TSQL_SO_ASC) {
// if (order == TSQL_SO_ASC) {
return data + index * bytes;
} else {
return data + (capacity - index - 1) * bytes;
}
// } else {
// return data + (capacity - index - 1) * bytes;
// }
}
static void setTagsValueInInterpolation(tFilePage** data, char** pTags, tColModel* pModel, int32_t order, int32_t start,
......@@ -397,7 +397,7 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
}
pInterpoInfo->startTimestamp += (nInterval * step);
pInterpoInfo->rowIdx += step;
pInterpoInfo->rowIdx += 1;
num += 1;
if ((pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册