提交 14be1790 编写于 作者: H hjxilinx

fix bug #929.[TBASE-1349]

上级 e03f3364
...@@ -267,9 +267,7 @@ typedef struct SQuery { ...@@ -267,9 +267,7 @@ typedef struct SQuery {
int16_t checkBufferInLoop; // check if the buffer is full during scan each block int16_t checkBufferInLoop; // check if the buffer is full during scan each block
SLimitVal limit; SLimitVal limit;
int32_t rowSize; int32_t rowSize;
int32_t dataRowSize; // row size of each loaded data from disk, the value is
// used for prepare buffer
SSqlGroupbyExpr * pGroupbyExpr; SSqlGroupbyExpr * pGroupbyExpr;
SSqlFunctionExpr * pSelectExpr; SSqlFunctionExpr * pSelectExpr;
SColumnInfoEx * colList; SColumnInfoEx * colList;
......
...@@ -40,6 +40,7 @@ typedef struct SQueryLoadBlockInfo { ...@@ -40,6 +40,7 @@ typedef struct SQueryLoadBlockInfo {
int32_t fileId; int32_t fileId;
int32_t slotIdx; int32_t slotIdx;
int32_t sid; int32_t sid;
bool tsLoaded; // if timestamp column of current block is loaded or not
} SQueryLoadBlockInfo; } SQueryLoadBlockInfo;
typedef struct SQueryLoadCompBlockInfo { typedef struct SQueryLoadCompBlockInfo {
......
...@@ -38,9 +38,16 @@ enum { ...@@ -38,9 +38,16 @@ enum {
TS_JOIN_TAG_NOT_EQUALS = 2, TS_JOIN_TAG_NOT_EQUALS = 2,
}; };
enum {
DISK_BLOCK_NO_NEED_TO_LOAD = 0,
DISK_BLOCK_LOAD_TS = 1,
DISK_BLOCK_LOAD_BLOCK = 2,
};
#define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0) #define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0)
//static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, // static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t
// offset,
// int32_t size); // int32_t size);
static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset,
int32_t size); int32_t size);
...@@ -68,17 +75,17 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete ...@@ -68,17 +75,17 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
__block_search_fn_t searchFn); __block_search_fn_t searchFn);
static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult);
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data,
int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus,
SField *pFields, __block_search_fn_t searchFn); SField *pFields, __block_search_fn_t searchFn);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx);
static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv); const SQueryRuntimeEnv *pRuntimeEnv);
static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
// check the offset value integrity // check the offset value integrity
static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data, static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data,
...@@ -121,8 +128,8 @@ static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, const c ...@@ -121,8 +128,8 @@ static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, const c
return 0; return 0;
} }
static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, const char *filePath, SCompInfo *compInfo, char *pBlock, static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, const char *filePath, SCompInfo *compInfo,
int32_t vid, TSCKSUM checksum) { char *pBlock, int32_t vid, TSCKSUM checksum) {
uint32_t size = compInfo->numOfBlocks * sizeof(SCompBlock); uint32_t size = compInfo->numOfBlocks * sizeof(SCompBlock);
if (checksum != taosCalcChecksum(0, (uint8_t *)pBlock, size)) { if (checksum != taosCalcChecksum(0, (uint8_t *)pBlock, size)) {
...@@ -195,7 +202,8 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj ...@@ -195,7 +202,8 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
// if vnodeFreeFields is called, the pQuery->pFields is NULL // if vnodeFreeFields is called, the pQuery->pFields is NULL
if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid && if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid &&
pQuery->pFields != NULL && pQuery->fileId > 0) { pQuery->pFields != NULL && pQuery->fileId > 0) {
assert(pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0); assert(pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID == pLoadCompBlockInfo->fileId &&
pQuery->numOfBlocks > 0);
return true; return true;
} }
...@@ -216,7 +224,8 @@ static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadIn ...@@ -216,7 +224,8 @@ static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadIn
pCompBlockLoadInfo->fileListIndex = -1; pCompBlockLoadInfo->fileListIndex = -1;
} }
static bool vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex) { static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex,
bool loadPrimaryTS) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
...@@ -224,13 +233,20 @@ static bool vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMe ...@@ -224,13 +233,20 @@ static bool vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMe
if (pLoadInfo->fileId == pQuery->fileId && pLoadInfo->slotIdx == pQuery->slot && pQuery->slot != -1 && if (pLoadInfo->fileId == pQuery->fileId && pLoadInfo->slotIdx == pQuery->slot && pQuery->slot != -1 &&
pLoadInfo->sid == pMeterObj->sid) { pLoadInfo->sid == pMeterObj->sid) {
assert(fileIndex == pLoadInfo->fileListIndex); assert(fileIndex == pLoadInfo->fileListIndex);
return true;
// previous load operation does not load the primary timestamp column, we only need to load the timestamp column
if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadPrimaryTS) {
return DISK_BLOCK_LOAD_TS;
} else {
return DISK_BLOCK_NO_NEED_TO_LOAD;
}
} }
return false; return DISK_BLOCK_LOAD_BLOCK;
} }
static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex) { static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex,
bool tsLoaded) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
...@@ -238,6 +254,7 @@ static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj ...@@ -238,6 +254,7 @@ static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
pLoadInfo->slotIdx = pQuery->slot; pLoadInfo->slotIdx = pQuery->slot;
pLoadInfo->fileListIndex = fileIndex; pLoadInfo->fileListIndex = fileIndex;
pLoadInfo->sid = pMeterObj->sid; pLoadInfo->sid = pMeterObj->sid;
pLoadInfo->tsLoaded = tsLoaded;
} }
static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
...@@ -247,35 +264,35 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { ...@@ -247,35 +264,35 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
pBlockLoadInfo->fileListIndex = -1; pBlockLoadInfo->fileListIndex = -1;
} }
static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) { static void vnodeSetOpenedFileNames(SQueryFilesInfo *pVnodeFilesInfo) {
assert(pVnodeFilesInfo->current >= 0 && pVnodeFilesInfo->current < pVnodeFilesInfo->numOfFiles); assert(pVnodeFilesInfo->current >= 0 && pVnodeFilesInfo->current < pVnodeFilesInfo->numOfFiles);
SHeaderFileInfo* pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current]; SHeaderFileInfo *pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current];
/* /*
* set the full file path for current opened files * set the full file path for current opened files
* the maximum allowed path string length is PATH_MAX in Linux, 100 bytes is used to * the maximum allowed path string length is PATH_MAX in Linux, 100 bytes is used to
* suppress the compiler warnings * suppress the compiler warnings
*/ */
char str[PATH_MAX + 100] = {0}; char str[PATH_MAX + 100] = {0};
int32_t PATH_WITH_EXTRA = PATH_MAX + 100; int32_t PATH_WITH_EXTRA = PATH_MAX + 100;
int32_t vnodeId = pVnodeFilesInfo->vnodeId; int32_t vnodeId = pVnodeFilesInfo->vnodeId;
int32_t fileId = pCurrentFileInfo->fileID; int32_t fileId = pCurrentFileInfo->fileID;
int32_t len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.head", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId); int32_t len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.head", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId);
assert(len <= PATH_MAX); assert(len <= PATH_MAX);
strncpy(pVnodeFilesInfo->headerFilePath, str, PATH_MAX); strncpy(pVnodeFilesInfo->headerFilePath, str, PATH_MAX);
len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.data", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId); len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.data", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId);
assert(len <= PATH_MAX); assert(len <= PATH_MAX);
strncpy(pVnodeFilesInfo->dataFilePath, str, PATH_MAX); strncpy(pVnodeFilesInfo->dataFilePath, str, PATH_MAX);
len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.last", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId); len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.last", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId);
assert(len <= PATH_MAX); assert(len <= PATH_MAX);
strncpy(pVnodeFilesInfo->lastFilePath, str, PATH_MAX); strncpy(pVnodeFilesInfo->lastFilePath, str, PATH_MAX);
} }
...@@ -287,31 +304,31 @@ static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) { ...@@ -287,31 +304,31 @@ static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) {
* @return * @return
*/ */
static FORCE_INLINE bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) { static FORCE_INLINE bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) {
SVnodeCfg* pVnodeCfg = &vnodeList[vnodeId].cfg; SVnodeCfg *pVnodeCfg = &vnodeList[vnodeId].cfg;
return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg); return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg);
} }
static bool checkIsHeaderFileEmpty(SQueryFilesInfo* pVnodeFilesInfo, int32_t vnodeId) { static bool checkIsHeaderFileEmpty(SQueryFilesInfo *pVnodeFilesInfo, int32_t vnodeId) {
struct stat fstat = {0}; struct stat fstat = {0};
if (stat(pVnodeFilesInfo->headerFilePath, &fstat) < 0) { if (stat(pVnodeFilesInfo->headerFilePath, &fstat) < 0) {
return true; return true;
} }
pVnodeFilesInfo->headFileSize = fstat.st_size; pVnodeFilesInfo->headFileSize = fstat.st_size;
return isHeaderFileEmpty(vnodeId, pVnodeFilesInfo->headFileSize); return isHeaderFileEmpty(vnodeId, pVnodeFilesInfo->headFileSize);
} }
static void doCloseQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { static void doCloseQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
tclose(pVnodeFilesInfo->headerFd); tclose(pVnodeFilesInfo->headerFd);
tclose(pVnodeFilesInfo->dataFd); tclose(pVnodeFilesInfo->dataFd);
tclose(pVnodeFilesInfo->lastFd); tclose(pVnodeFilesInfo->lastFd);
} }
static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { static void doInitQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
pVnodeFilesInfo->current = -1; pVnodeFilesInfo->current = -1;
pVnodeFilesInfo->headFileSize = -1; pVnodeFilesInfo->headFileSize = -1;
pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value
pVnodeFilesInfo->dataFd = FD_INITIALIZER; pVnodeFilesInfo->dataFd = FD_INITIALIZER;
pVnodeFilesInfo->lastFd = FD_INITIALIZER; pVnodeFilesInfo->lastFd = FD_INITIALIZER;
...@@ -320,15 +337,15 @@ static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { ...@@ -320,15 +337,15 @@ static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) {
/* /*
* clean memory and other corresponding resources are delegated to invoker * clean memory and other corresponding resources are delegated to invoker
*/ */
static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileInfo, int32_t vnodeId) { static int32_t doOpenQueryFileData(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo, int32_t vnodeId) {
SHeaderFileInfo* pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current]; SHeaderFileInfo *pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current];
pVnodeFileInfo->headerFd = open(pVnodeFileInfo->headerFilePath, O_RDONLY); pVnodeFileInfo->headerFd = open(pVnodeFileInfo->headerFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->headerFd)) { if (!FD_VALID(pVnodeFileInfo->headerFd)) {
dError("QInfo:%p failed open head file:%s reason:%s", pQInfo, pVnodeFileInfo->headerFilePath, strerror(errno)); dError("QInfo:%p failed open head file:%s reason:%s", pQInfo, pVnodeFileInfo->headerFilePath, strerror(errno));
return -1; return -1;
} }
/* /*
* current header file is empty or broken, return directly. * current header file is empty or broken, return directly.
* *
...@@ -339,55 +356,54 @@ static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileIn ...@@ -339,55 +356,54 @@ static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileIn
if (checkIsHeaderFileEmpty(pVnodeFileInfo, vnodeId)) { if (checkIsHeaderFileEmpty(pVnodeFileInfo, vnodeId)) {
qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo, qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo,
pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headFileSize); pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headFileSize);
return -1; return -1;
} }
pVnodeFileInfo->dataFd = open(pVnodeFileInfo->dataFilePath, O_RDONLY); pVnodeFileInfo->dataFd = open(pVnodeFileInfo->dataFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->dataFd)) { if (!FD_VALID(pVnodeFileInfo->dataFd)) {
dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFileInfo->dataFilePath, strerror(errno)); dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFileInfo->dataFilePath, strerror(errno));
return -1; return -1;
} }
pVnodeFileInfo->lastFd = open(pVnodeFileInfo->lastFilePath, O_RDONLY); pVnodeFileInfo->lastFd = open(pVnodeFileInfo->lastFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->lastFd)) { if (!FD_VALID(pVnodeFileInfo->lastFd)) {
dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFileInfo->lastFilePath, strerror(errno)); dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFileInfo->lastFilePath, strerror(errno));
return -1; return -1;
} }
pVnodeFileInfo->pHeaderFileData = mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, pVnodeFileInfo->pHeaderFileData =
pVnodeFileInfo->headerFd, 0); mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, pVnodeFileInfo->headerFd, 0);
if (pVnodeFileInfo->pHeaderFileData == MAP_FAILED) { if (pVnodeFileInfo->pHeaderFileData == MAP_FAILED) {
pVnodeFileInfo->pHeaderFileData = NULL; pVnodeFileInfo->pHeaderFileData = NULL;
doCloseQueryFileInfoFD(pVnodeFileInfo); doCloseQueryFileInfoFD(pVnodeFileInfo);
doInitQueryFileInfoFD(pVnodeFileInfo); doInitQueryFileInfoFD(pVnodeFileInfo);
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFileInfo->headerFilePath, dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFileInfo->headerFilePath,
pVnodeFileInfo->headFileSize, strerror(errno)); pVnodeFileInfo->headFileSize, strerror(errno));
return -1; return -1;
} else { } else {
if (madvise(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) { if (madvise(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) {
dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno)); dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno));
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doUnmapHeaderFile(SQueryFilesInfo* pVnodeFileInfo) { static void doUnmapHeaderFile(SQueryFilesInfo *pVnodeFileInfo) {
munmap(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize); munmap(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize);
pVnodeFileInfo->pHeaderFileData = NULL; pVnodeFileInfo->pHeaderFileData = NULL;
pVnodeFileInfo->headFileSize = -1; pVnodeFileInfo->headFileSize = -1;
} }
static void doCloseOpenedFileData(SQueryFilesInfo* pVnodeFileInfo) { static void doCloseOpenedFileData(SQueryFilesInfo *pVnodeFileInfo) {
if (pVnodeFileInfo->current >= 0) { if (pVnodeFileInfo->current >= 0) {
assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0); assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0);
doUnmapHeaderFile(pVnodeFileInfo); doUnmapHeaderFile(pVnodeFileInfo);
doCloseQueryFileInfoFD(pVnodeFileInfo); doCloseQueryFileInfoFD(pVnodeFileInfo);
doInitQueryFileInfoFD(pVnodeFileInfo); doInitQueryFileInfoFD(pVnodeFileInfo);
...@@ -412,22 +428,22 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int ...@@ -412,22 +428,22 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output
SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo;
if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) { if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) {
if (pVnodeFileInfo->current >= 0) { if (pVnodeFileInfo->current >= 0) {
assert(pVnodeFileInfo->pHeaderFileData != NULL); assert(pVnodeFileInfo->pHeaderFileData != NULL);
} }
// do close the current memory mapped header file and corresponding fd // do close the current memory mapped header file and corresponding fd
doCloseOpenedFileData(pVnodeFileInfo); doCloseOpenedFileData(pVnodeFileInfo);
assert(pVnodeFileInfo->pHeaderFileData == NULL); assert(pVnodeFileInfo->pHeaderFileData == NULL);
// set current opened file Index // set current opened file Index
pVnodeFileInfo->current = fileIndex; pVnodeFileInfo->current = fileIndex;
// set the current opened files(header, data, last) path // set the current opened files(header, data, last) path
vnodeSetOpenedFileNames(pVnodeFileInfo); vnodeSetOpenedFileNames(pVnodeFileInfo);
if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) { if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) {
doCloseOpenedFileData(pVnodeFileInfo); // all the fds may be partially opened, close them anyway. doCloseOpenedFileData(pVnodeFileInfo); // all the fds may be partially opened, close them anyway.
return pVnodeFileInfo->pHeaderFileData; return pVnodeFileInfo->pHeaderFileData;
...@@ -445,7 +461,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim ...@@ -445,7 +461,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg; SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg;
SHeaderFileInfo *pHeadeFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex]; SHeaderFileInfo *pHeadeFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex];
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
...@@ -466,7 +482,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim ...@@ -466,7 +482,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
if (data == NULL) { if (data == NULL) {
return -1; // failed to load the header file data into memory return -1; // failed to load the header file data into memory
} }
#else #else
char *data = calloc(1, tmsize + TSDB_FILE_HEADER_LEN); char *data = calloc(1, tmsize + TSDB_FILE_HEADER_LEN);
read(fd, data, tmsize + TSDB_FILE_HEADER_LEN); read(fd, data, tmsize + TSDB_FILE_HEADER_LEN);
...@@ -487,7 +503,8 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim ...@@ -487,7 +503,8 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
} }
// corrupted file may cause the invalid compInfoOffset, check needs // corrupted file may cause the invalid compInfoOffset, check needs
if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, getCompHeaderStartPosition(pCfg)) < 0) { if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo,
getCompHeaderStartPosition(pCfg)) < 0) {
return -1; return -1;
} }
...@@ -751,8 +768,7 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo ...@@ -751,8 +768,7 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo
// load checksum // load checksum
TSCKSUM checksum = 0; TSCKSUM checksum = 0;
ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len, ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len, sizeof(TSCKSUM));
sizeof(TSCKSUM));
if (ret != 0) { if (ret != 0) {
return ret; return ret;
} }
...@@ -774,11 +790,11 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo ...@@ -774,11 +790,11 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo
} }
static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField) { static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
SQueryFilesInfo *pVnodeFilesInfo = &pRuntimeEnv->vnodeFileInfo; SQueryFilesInfo *pVnodeFilesInfo = &pRuntimeEnv->vnodeFileInfo;
size_t size = sizeof(SField) * (pBlock->numOfCols) + sizeof(TSCKSUM); size_t size = sizeof(SField) * (pBlock->numOfCols) + sizeof(TSCKSUM);
// if *pField != NULL, this block is loaded once, in current query do nothing // if *pField != NULL, this block is loaded once, in current query do nothing
...@@ -822,6 +838,21 @@ static void fillWithNull(SQuery *pQuery, char *dst, int32_t col, int32_t numOfPo ...@@ -822,6 +838,21 @@ static void fillWithNull(SQuery *pQuery, char *dst, int32_t col, int32_t numOfPo
setNullN(dst, type, bytes, numOfPoints); setNullN(dst, type, bytes, numOfPoints);
} }
static int32_t loadPrimaryTSColumn(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField,
int32_t *columnBytes) {
SQuery *pQuery = pRuntimeEnv->pQuery;
assert(PRIMARY_TSCOL_LOADED(pQuery) == false);
if (columnBytes != NULL) {
(*columnBytes) += (*pField)[PRIMARYKEY_TIMESTAMP_COL_INDEX].len + sizeof(TSCKSUM);
}
int32_t ret = loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX,
pRuntimeEnv->primaryColBuffer, pRuntimeEnv->unzipBuffer,
pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize);
return ret;
}
static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIdx, static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIdx,
bool loadPrimaryCol, bool loadSField) { bool loadPrimaryCol, bool loadSField) {
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
...@@ -831,16 +862,40 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR ...@@ -831,16 +862,40 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
SData ** sdata = pRuntimeEnv->colDataBuffer; SData ** sdata = pRuntimeEnv->colDataBuffer;
assert(fileIdx == pRuntimeEnv->vnodeFileInfo.current); assert(fileIdx == pRuntimeEnv->vnodeFileInfo.current);
SData ** primaryTSBuf = &pRuntimeEnv->primaryColBuffer;
void * tmpBuf = pRuntimeEnv->unzipBuffer;
if (vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx)) { SData **primaryTSBuf = &pRuntimeEnv->primaryColBuffer;
dTrace("QInfo:%p vid:%d sid:%d id:%s, data block has been loaded, ts:%d, slot:%d, brange:%lld-%lld, rows:%d", void * tmpBuf = pRuntimeEnv->unzipBuffer;
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, loadPrimaryCol, pQuery->slot, int32_t columnBytes = 0;
pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints);
return 0; SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
int32_t status = vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol);
if (status == DISK_BLOCK_NO_NEED_TO_LOAD) {
dTrace(
"QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, no need to load again, ts:%d, slot:%d, "
"brange:%lld-%lld, rows:%d",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, loadPrimaryCol,
pQuery->slot, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints);
if (loadSField && (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL)) {
loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]);
}
return TSDB_CODE_SUCCESS;
} else if (status == DISK_BLOCK_LOAD_TS) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, incrementally load ts",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId);
assert(PRIMARY_TSCOL_LOADED(pQuery) == false && loadSField == true);
if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) {
loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]);
}
// load primary timestamp
int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes);
vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol);
return ret;
} }
/* failed to load fields info, return with error info */ /* failed to load fields info, return with error info */
...@@ -848,21 +903,15 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR ...@@ -848,21 +903,15 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
return -1; return -1;
} }
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
int32_t columnBytes = 0;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
if (loadPrimaryCol) { if (loadPrimaryCol) {
if (PRIMARY_TSCOL_LOADED(pQuery)) { if (PRIMARY_TSCOL_LOADED(pQuery)) {
*primaryTSBuf = sdata[0]; *primaryTSBuf = sdata[0];
} else { } else {
columnBytes += (*pField)[PRIMARYKEY_TIMESTAMP_COL_INDEX].len + sizeof(TSCKSUM); int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes);
int32_t ret = if (ret != TSDB_CODE_SUCCESS) {
loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX, *primaryTSBuf, return ret;
tmpBuf, pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize);
if (ret != 0) {
return -1;
} }
pSummary->numOfSeek++; pSummary->numOfSeek++;
...@@ -936,7 +985,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR ...@@ -936,7 +985,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
pSummary->loadBlocksUs += (et - st); pSummary->loadBlocksUs += (et - st);
pSummary->readDiskBlocks++; pSummary->readDiskBlocks++;
vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx); vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol);
return ret; return ret;
} }
...@@ -1848,8 +1897,8 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, ...@@ -1848,8 +1897,8 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
return -1; return -1;
} }
SQueryFilesInfo* pVnodeFiles = &pRuntimeEnv->vnodeFileInfo; SQueryFilesInfo *pVnodeFiles = &pRuntimeEnv->vnodeFileInfo;
/* set the initial file for current query */ /* set the initial file for current query */
if (order == TSQL_SO_ASC && *fid < pVnodeFiles->pFileInfo[0].fileID) { if (order == TSQL_SO_ASC && *fid < pVnodeFiles->pFileInfo[0].fileID) {
*fid = pVnodeFiles->pFileInfo[0].fileID; *fid = pVnodeFiles->pFileInfo[0].fileID;
...@@ -2967,7 +3016,7 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p ...@@ -2967,7 +3016,7 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p
* currently opened file is not the start file, reset to the start file * currently opened file is not the start file, reset to the start file
*/ */
int32_t fileIdx = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order); int32_t fileIdx = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order);
if (fileIdx < 0) { // ignore the files on disk if (fileIdx < 0) { // ignore the files on disk
dError("QInfo:%p failed to get data file:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId); dError("QInfo:%p failed to get data file:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId);
position->fileId = -1; position->fileId = -1;
return -1; return -1;
...@@ -3061,7 +3110,7 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) { ...@@ -3061,7 +3110,7 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
SQueryFilesInfo *pVnodeFilesInfo = &(pQInfo->pMeterQuerySupporter->runtimeEnv.vnodeFileInfo); SQueryFilesInfo *pVnodeFilesInfo = &(pQInfo->pMeterQuerySupporter->runtimeEnv.vnodeFileInfo);
pVnodeFilesInfo->vnodeId = vnodeId; pVnodeFilesInfo->vnodeId = vnodeId;
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix); DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
if (pDir == NULL) { if (pDir == NULL) {
...@@ -3116,10 +3165,11 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) { ...@@ -3116,10 +3165,11 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
closedir(pDir); closedir(pDir);
dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles, dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles,
pVnodeFilesInfo->dbFilePathPrefix); pVnodeFilesInfo->dbFilePathPrefix);
/* order the files information according their names */ /* order the files information according their names */
qsort(pVnodeFilesInfo->pFileInfo, (size_t)pVnodeFilesInfo->numOfFiles, sizeof(SHeaderFileInfo), file_order_comparator); qsort(pVnodeFilesInfo->pFileInfo, (size_t)pVnodeFilesInfo->numOfFiles, sizeof(SHeaderFileInfo),
file_order_comparator);
} }
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, void *pBlock) { static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, void *pBlock) {
...@@ -3602,9 +3652,9 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu ...@@ -3602,9 +3652,9 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu
int32_t offset = 0; int32_t offset = 0;
for (int32_t i = 0, j = 0; i < pQuery->numOfCols; ++i, ++j) { for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
pInterpoSupport->pPrevPoint[j] = prev + offset; pInterpoSupport->pPrevPoint[i] = prev + offset;
pInterpoSupport->pNextPoint[j] = next + offset; pInterpoSupport->pNextPoint[i] = next + offset;
offset += pQuery->colList[i].data.bytes; offset += pQuery->colList[i].data.bytes;
} }
...@@ -3702,7 +3752,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete ...@@ -3702,7 +3752,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
pQuery->lastKey = pQuery->skey; pQuery->lastKey = pQuery->skey;
doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo); doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo);
vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo);
vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo);
...@@ -3871,7 +3921,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) ...@@ -3871,7 +3921,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
pQuery->pointsRead = 0; pQuery->pointsRead = 0;
changeExecuteScanOrder(pQuery, true); changeExecuteScanOrder(pQuery, true);
doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo); doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo);
vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo);
vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo);
...@@ -3933,16 +3983,16 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) ...@@ -3933,16 +3983,16 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
pSupporter->numOfPages = pSupporter->numOfMeters; pSupporter->numOfPages = pSupporter->numOfMeters;
ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
strerror(errno)); strerror(errno));
return TSDB_CODE_SERV_NO_DISKSPACE; return TSDB_CODE_SERV_NO_DISKSPACE;
} }
pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
pSupporter->lastPageId = -1; pSupporter->lastPageId = -1;
pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
...@@ -4026,7 +4076,7 @@ TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index) { ...@@ -4026,7 +4076,7 @@ TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index) {
/* /*
* NOTE: pQuery->pos will not change, the corresponding data block will be loaded into buffer * NOTE: pQuery->pos will not change, the corresponding data block will be loaded into buffer
* loadDataBlockOnDemand will change the value of pQuery->pos, according to the pQuery->lastKey * loadDataBlockOnDemand will change the value of pQuery->pos, according to the pQuery->lastKey
* */ */
TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
...@@ -4045,23 +4095,16 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { ...@@ -4045,23 +4095,16 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
bool loadTimestamp = true; bool loadTimestamp = true;
int32_t fileId = pQuery->fileId; int32_t fileId = pQuery->fileId;
int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&fileId, pRuntimeEnv, pQuery->order.order); int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&fileId, pRuntimeEnv, pQuery->order.order);
if (!vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIndex)) { dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required",
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot);
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot);
int32_t ret =
// todo handle failed to load data, file corrupted
// todo refactor the return value
int32_t ret =
loadDataBlockIntoMem(pBlock, &pQuery->pFields[pQuery->slot], pRuntimeEnv, fileIndex, loadTimestamp, true); loadDataBlockIntoMem(pBlock, &pQuery->pFields[pQuery->slot], pRuntimeEnv, fileIndex, loadTimestamp, true);
UNUSED(ret); if (ret != TSDB_CODE_SUCCESS) {
} return -1;
// the fields info is not loaded, load it into memory
if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) {
loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]);
} }
SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus); SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus);
SET_FILE_BLOCK_FLAG(pRuntimeEnv->blockStatus); SET_FILE_BLOCK_FLAG(pRuntimeEnv->blockStatus);
...@@ -4757,17 +4800,17 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) { ...@@ -4757,17 +4800,17 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) {
int32_t end = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx + 1]; int32_t end = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx + 1];
ret = doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end); ret = doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end);
if (ret < 0) { // not enough disk space to save the data into disk if (ret < 0) { // not enough disk space to save the data into disk
return -1; return -1;
} }
pSupporter->subgroupIdx += 1; pSupporter->subgroupIdx += 1;
// this group generates at least one result, return results // this group generates at least one result, return results
if (ret > 0) { if (ret > 0) {
break; break;
} }
assert(pSupporter->numOfGroupResultPages == 0); assert(pSupporter->numOfGroupResultPages == 0);
dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1); dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1);
} }
...@@ -4784,7 +4827,7 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) ...@@ -4784,7 +4827,7 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery)
// current results of group has been sent to client, try next group // current results of group has been sent to client, try next group
if (mergeMetersResultToOneGroups(pSupporter) != TSDB_CODE_SUCCESS) { if (mergeMetersResultToOneGroups(pSupporter) != TSDB_CODE_SUCCESS) {
return; // failed to save data in the disk return; // failed to save data in the disk
} }
// set current query completed // set current query completed
...@@ -4866,7 +4909,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery ...@@ -4866,7 +4909,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
return -1; return -1;
} }
resetMergeResultBuf(pQuery, pCtx); resetMergeResultBuf(pQuery, pCtx);
} }
...@@ -4914,11 +4957,12 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery ...@@ -4914,11 +4957,12 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
if (buffer[0]->numOfElems != 0) { // there are data in buffer if (buffer[0]->numOfElems != 0) { // there are data in buffer
if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), pSupporter->extBufFile); dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery),
pSupporter->extBufFile);
tfree(pTree); tfree(pTree);
tfree(pValidMeter); tfree(pValidMeter);
tfree(posArray); tfree(posArray);
return -1; return -1;
} }
} }
...@@ -4939,11 +4983,11 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery ...@@ -4939,11 +4983,11 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
return pSupporter->numOfGroupResultPages; return pSupporter->numOfGroupResultPages;
} }
static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) {
assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize); assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize);
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pQuery); SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
pSupporter->numOfPages = numOfPages; pSupporter->numOfPages = numOfPages;
...@@ -4957,26 +5001,27 @@ static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSuppo ...@@ -4957,26 +5001,27 @@ static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSuppo
strerror(errno)); strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE; pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE;
pQInfo->killed = 1; pQInfo->killed = 1;
return pQInfo->code; return pQInfo->code;
} }
pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
pSupporter->meterOutputMMapBuf = pSupporter->meterOutputMMapBuf =
mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
pQInfo->killed = 1; pQInfo->killed = 1;
return pQInfo->code; return pQInfo->code;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv) {
int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1; int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1;
int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE + int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE +
pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1); pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1);
...@@ -5039,7 +5084,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { ...@@ -5039,7 +5084,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
ret = saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows); ret = saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
...@@ -5047,7 +5092,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { ...@@ -5047,7 +5092,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
} }
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5355,6 +5400,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -5355,6 +5400,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) {
// usually this load operation will incure load disk block operation // usually this load operation will incure load disk block operation
TSKEY endKey = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->endPos); TSKEY endKey = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->endPos);
assert((QUERY_IS_ASC_QUERY(pQuery) && endKey <= pQuery->ekey) || assert((QUERY_IS_ASC_QUERY(pQuery) && endKey <= pQuery->ekey) ||
(!QUERY_IS_ASC_QUERY(pQuery) && endKey >= pQuery->ekey)); (!QUERY_IS_ASC_QUERY(pQuery) && endKey >= pQuery->ekey));
...@@ -5600,18 +5646,19 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t ...@@ -5600,18 +5646,19 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
SMeterSidExtInfo ** pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; SMeterSidExtInfo ** pMeterSidExtInfo = pSupporter->pMeterSidExtInfo;
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SVnodeObj *pVnode = &vnodeList[vid]; SVnodeObj *pVnode = &vnodeList[vid];
char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex); char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex);
if (pHeaderFileData == NULL) { // failed to load header file into buffer if (pHeaderFileData == NULL) { // failed to load header file into buffer
return 0; return 0;
} }
int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM); int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM);
// file is corrupted, abort query in current file // file is corrupted, abort query in current file
if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) < 0) { if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) <
0) {
*numOfMeters = 0; *numOfMeters = 0;
return 0; return 0;
} }
...@@ -5754,7 +5801,7 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY ...@@ -5754,7 +5801,7 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY
} }
} }
static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) {
if (pSupporter->lastPageId == pSupporter->numOfPages - 1) { if (pSupporter->lastPageId == pSupporter->numOfPages - 1) {
if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) { if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
...@@ -5765,9 +5812,10 @@ static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter ...@@ -5765,9 +5812,10 @@ static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter
return getFilePage(pSupporter, *pageId); return getFilePage(pSupporter, *pageId);
} }
tFilePage *addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) { tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo,
uint32_t pageId = 0; SMeterQuerySupportObj *pSupporter) {
uint32_t pageId = 0;
tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId); tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId);
if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results
return NULL; return NULL;
...@@ -5907,7 +5955,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery, ...@@ -5907,7 +5955,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery,
* @return * @return
*/ */
uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, char *pHeaderData, uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, char *pHeaderData,
int32_t numOfMeters, const char* filePath, SMeterDataInfo **pMeterDataInfo) { int32_t numOfMeters, const char *filePath, SMeterDataInfo **pMeterDataInfo) {
uint32_t numOfBlocks = 0; uint32_t numOfBlocks = 0;
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SQueryCostSummary *pSummary = &pSupporter->runtimeEnv.summary; SQueryCostSummary *pSummary = &pSupporter->runtimeEnv.summary;
...@@ -6273,8 +6321,8 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete ...@@ -6273,8 +6321,8 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
tFilePage * pData = NULL; tFilePage * pData = NULL;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
// in the first scan, new space needed for results // in the first scan, new space needed for results
if (pMeterQueryInfo->numOfPages == 0) { if (pMeterQueryInfo->numOfPages == 0) {
pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter); pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter);
...@@ -6289,7 +6337,7 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete ...@@ -6289,7 +6337,7 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete
} }
} }
} }
if (pData == NULL) { if (pData == NULL) {
return -1; return -1;
} }
...@@ -6298,12 +6346,12 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete ...@@ -6298,12 +6346,12 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete
pRuntimeEnv->pCtx[i].aOutputBuf = getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, i); pRuntimeEnv->pCtx[i].aOutputBuf = getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, i);
pRuntimeEnv->pCtx[i].resultInfo = &pMeterQueryInfo->resultInfo[i]; pRuntimeEnv->pCtx[i].resultInfo = &pMeterQueryInfo->resultInfo[i];
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx, int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx,
SMeterQueryInfo *pMeterQueryInfo) { SMeterQueryInfo *pMeterQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
if (IS_MASTER_SCAN(pRuntimeEnv)) { if (IS_MASTER_SCAN(pRuntimeEnv)) {
...@@ -6620,8 +6668,8 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SBlockInfo *pBlockInfo) { ...@@ -6620,8 +6668,8 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SBlockInfo *pBlockInfo) {
int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv, int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) { int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
...@@ -6826,7 +6874,7 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue ...@@ -6826,7 +6874,7 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue
tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage); tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage);
#endif #endif
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -7077,7 +7125,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -7077,7 +7125,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
SQuery * pQuery = &pQInfo->query; SQuery * pQuery = &pQInfo->query;
int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock; int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock;
// for metric query, bufIndex always be 0. // for metric query, bufIndex always be 0.
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0 for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
int32_t bytes = pQuery->pSelectExpr[col].resBytes; int32_t bytes = pQuery->pSelectExpr[col].resBytes;
......
...@@ -193,8 +193,6 @@ static SQInfo *vnodeAllocateQInfoCommon(SQueryMeterMsg *pQueryMsg, SMeterObj *pM ...@@ -193,8 +193,6 @@ static SQInfo *vnodeAllocateQInfoCommon(SQueryMeterMsg *pQueryMsg, SMeterObj *pM
} else { } else {
pQuery->colList[i].data.filters = NULL; pQuery->colList[i].data.filters = NULL;
} }
pQuery->dataRowSize += colList[i].bytes;
} }
vnodeUpdateQueryColumnIndex(pQuery, pMeterObj); vnodeUpdateQueryColumnIndex(pQuery, pMeterObj);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册