提交 aceeed3b 编写于 作者: H hjxilinx

[tbase-1007]

上级 22619e2d
......@@ -174,7 +174,7 @@ void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32
SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields,
__block_search_fn_t searchFn);
SMeterDataInfo** vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, SQueryFileInfo* pQueryFileInfo,
SMeterDataInfo** vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, int32_t fileIndex,
tSidSet* pSidSet, SMeterDataInfo* pMeterDataInfo, int32_t* numOfMeters);
int32_t vnodeGetVnodeHeaderFileIdx(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
......@@ -194,6 +194,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuer
int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo);
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, int8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex);
/**
* Create SMeterQueryInfo.
......
......@@ -112,9 +112,7 @@ typedef struct RuntimeEnvironment {
SPositionInfo nextPos; /* start position of the next scan */
SData* colDataBuffer[TSDB_MAX_COLUMNS];
SResultInfo* resultInfo;
// Indicate if data block is loaded, the block is first/last/internal block
int8_t blockStatus;
uint8_t blockStatus; // Indicate if data block is loaded, the block is first/last/internal block
int32_t unzipBufSize;
SData* primaryColBuffer;
char* unzipBuffer;
......@@ -129,19 +127,17 @@ typedef struct RuntimeEnvironment {
* header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function
*/
SQueryFileInfo* pHeaderFiles;
uint32_t numOfFiles; /* number of files of one vnode during query execution */
SQueryFileInfo* pVnodeFiles;
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t mmapedHFileIndex; // the mmaped header file, NOTE: only one header file can be mmap.
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t scanFlag; /* denotes reversed scan of data or not */
int16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult
void* hashList;
int32_t usedIndex; // assigned SOutputRes in list
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
......
......@@ -195,7 +195,7 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
// if vnodeFreeFields is called, the pQuery->pFields is NULL
if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid &&
pQuery->pFields != NULL && pQuery->fileId > 0) {
assert(pRuntimeEnv->pHeaderFiles[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0);
assert(pRuntimeEnv->pVnodeFiles[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0);
return true;
}
......@@ -207,7 +207,7 @@ static void vnodeSetCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, int32_t f
pLoadCompBlockInfo->sid = sid;
pLoadCompBlockInfo->fileListIndex = fileIndex;
pLoadCompBlockInfo->fileId = pRuntimeEnv->pHeaderFiles[fileIndex].fileID;
pLoadCompBlockInfo->fileId = pRuntimeEnv->pVnodeFiles[fileIndex].fileID;
}
static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo) {
......@@ -247,6 +247,60 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
pBlockLoadInfo->fileListIndex = -1;
}
static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
if (pRuntimeEnv->mmapedHFileIndex >= 0) {
assert(pRuntimeEnv->mmapedHFileIndex < pRuntimeEnv->numOfFiles && pRuntimeEnv->mmapedHFileIndex >= 0);
SQueryFileInfo *otherVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->mmapedHFileIndex];
munmap(otherVnodeFiles->pHeaderFileData, otherVnodeFiles->headFileSize);
otherVnodeFiles->pHeaderFileData = NULL;
pRuntimeEnv->mmapedHFileIndex = -1;
} else {
assert(pRuntimeEnv->mmapedHFileIndex == -1);
}
}
/**
* mmap the data file into memory. For each query, only one header file is allowed to mmap into memory, in order to
* avoid too many mmapped files at the save time to cause OS return the message of "Cannot allocate memory",
* during query processing.
*
* @param pRuntimeEnv
* @param fileIndex
* @return the return value may be null, so any invoker needs to check the returned value
*/
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex) {
assert(fileIndex >= 0 && fileIndex < pRuntimeEnv->numOfFiles);
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output
SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[fileIndex];
size_t size = pVnodeFiles->headFileSize;
if (pVnodeFiles->pHeaderFileData == NULL) {
assert(pRuntimeEnv->mmapedHFileIndex != fileIndex);
doUnmapHeaderFileData(pRuntimeEnv); // do close the other mmaped header file
pVnodeFiles->pHeaderFileData = mmap(NULL, size, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0);
if (pVnodeFiles->pHeaderFileData == MAP_FAILED) {
pVnodeFiles->pHeaderFileData = NULL;
dError("QInfo:%p failed to map header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, size,
strerror(errno));
} else {
pRuntimeEnv->mmapedHFileIndex = fileIndex; // set the value in case of success mmap file
if (madvise(pVnodeFiles->pHeaderFileData, size, MADV_SEQUENTIAL) == -1) {
dError("QInfo:%p failed to advise kernel the usage of header files, reason:%s", pQInfo, strerror(errno));
}
}
} else {
assert(pRuntimeEnv->mmapedHFileIndex == fileIndex);
}
return pVnodeFiles->pHeaderFileData;
}
/*
* read comp block info from header file
*
......@@ -256,8 +310,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pHeaderFiles[fileIndex];
int32_t fd = pQueryFileInfo->headerFd;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIndex];
int64_t st = taosGetTimestampUs();
......@@ -273,8 +326,11 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
pSummary->numOfSeek++;
#if 1
char *data = pRuntimeEnv->pHeaderFiles[fileIndex].pHeaderFileData;
UNUSED(fd);
char *data = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex); // failed to load the header file data into memory
if (data == NULL) {
return -1;
}
#else
char *data = calloc(1, tmsize + TSDB_FILE_HEADER_LEN);
read(fd, data, tmsize + TSDB_FILE_HEADER_LEN);
......@@ -353,7 +409,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
int64_t et = taosGetTimestampUs();
qTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, load compblock info, size:%d, elapsed:%f ms", pQInfo,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pRuntimeEnv->pHeaderFiles[fileIndex].fileID,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pRuntimeEnv->pVnodeFiles[fileIndex].fileID,
compBlockSize, (et - st) / 1000.0);
pSummary->totalCompInfoSize += compBlockSize;
......@@ -636,7 +692,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
SData ** sdata = pRuntimeEnv->colDataBuffer;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pHeaderFiles[fileIdx];
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx];
SData ** primaryTSBuf = &pRuntimeEnv->primaryColBuffer;
void * tmpBuf = pRuntimeEnv->unzipBuffer;
......@@ -850,7 +906,9 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slo
}
if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) {
dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, blockMeterObj:%p",
dWarn(
"QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, "
"blockMeterObj:%p",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId,
pQuery->blockId, pMeterObj, pBlock->pMeterObj);
return NULL;
......@@ -938,7 +996,7 @@ static bool getQualifiedDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRunti
break;
}
dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d into memory failed due to error in disk files",
dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files",
GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx);
blkIdx += step;
}
......@@ -1110,9 +1168,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
// if (!functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
// continue;
// }
// if (!functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
// continue;
// }
SField dummyField = {0};
......@@ -1135,8 +1193,8 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey;
int64_t alignedTimestamp = taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit,
pQuery->precision);
int64_t alignedTimestamp =
taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision);
setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, forwardStep, functionId,
tpField, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
}
......@@ -1405,8 +1463,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
char *dataBlock = getDataBlocks(pRuntimeEnv, data, &sasArray[k], k, *forwardStep, isDiskFileBlock);
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey;
int64_t alignedTimestamp = taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit,
pQuery->precision);
int64_t alignedTimestamp =
taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision);
setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId,
pFields, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
......@@ -1655,18 +1713,18 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
}
/* set the initial file for current query */
if (order == TSQL_SO_ASC && *fid < pRuntimeEnv->pHeaderFiles[0].fileID) {
*fid = pRuntimeEnv->pHeaderFiles[0].fileID;
if (order == TSQL_SO_ASC && *fid < pRuntimeEnv->pVnodeFiles[0].fileID) {
*fid = pRuntimeEnv->pVnodeFiles[0].fileID;
return 0;
} else if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pHeaderFiles[pRuntimeEnv->numOfFiles - 1].fileID) {
*fid = pRuntimeEnv->pHeaderFiles[pRuntimeEnv->numOfFiles - 1].fileID;
} else if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1].fileID) {
*fid = pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1].fileID;
return pRuntimeEnv->numOfFiles - 1;
}
int32_t numOfFiles = pRuntimeEnv->numOfFiles;
if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pHeaderFiles[numOfFiles - 1].fileID) {
*fid = pRuntimeEnv->pHeaderFiles[numOfFiles - 1].fileID;
if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pVnodeFiles[numOfFiles - 1].fileID) {
*fid = pRuntimeEnv->pVnodeFiles[numOfFiles - 1].fileID;
return numOfFiles - 1;
}
......@@ -1674,12 +1732,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
int32_t i = 0;
int32_t step = 1;
while (i<numOfFiles && * fid> pRuntimeEnv->pHeaderFiles[i].fileID) {
while (i<numOfFiles && * fid> pRuntimeEnv->pVnodeFiles[i].fileID) {
i += step;
}
if (i < numOfFiles && *fid <= pRuntimeEnv->pHeaderFiles[i].fileID) {
*fid = pRuntimeEnv->pHeaderFiles[i].fileID;
if (i < numOfFiles && *fid <= pRuntimeEnv->pVnodeFiles[i].fileID) {
*fid = pRuntimeEnv->pVnodeFiles[i].fileID;
return i;
} else {
return -1;
......@@ -1688,12 +1746,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
int32_t i = numOfFiles - 1;
int32_t step = -1;
while (i >= 0 && *fid < pRuntimeEnv->pHeaderFiles[i].fileID) {
while (i >= 0 && *fid < pRuntimeEnv->pVnodeFiles[i].fileID) {
i += step;
}
if (i >= 0 && *fid >= pRuntimeEnv->pHeaderFiles[i].fileID) {
*fid = pRuntimeEnv->pHeaderFiles[i].fileID;
if (i >= 0 && *fid >= pRuntimeEnv->pVnodeFiles[i].fileID) {
*fid = pRuntimeEnv->pVnodeFiles[i].fileID;
return i;
} else {
return -1;
......@@ -1723,6 +1781,7 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter
break;
}
// failed to mmap header file into memory will cause the retrieval of compblock info failed
if (vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fid) > 0) {
break;
}
......@@ -1821,13 +1880,13 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes
}
// set the output buffer for the selectivity + tag query
static void setCtxTagColumnInfo(SQuery* pQuery, SQueryRuntimeEnv* pRuntimeEnv) {
static void setCtxTagColumnInfo(SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv) {
if (isSelectivityWithTagsQuery(pQuery)) {
int32_t num = 0;
SQLFunctionCtx *pCtx = NULL;
int16_t tagLen = 0;
SQLFunctionCtx ** pTagCtx = calloc(pQuery->numOfOutputCols, POINTER_BYTES);
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutputCols, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlFuncExprMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase;
if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) {
......@@ -1973,7 +2032,7 @@ _error_clean:
tfree(pRuntimeEnv->resultInfo);
tfree(pRuntimeEnv->pCtx);
for(int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
tfree(pRuntimeEnv->colDataBuffer[i]);
}
......@@ -1993,7 +2052,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
dTrace("QInfo:%p teardown runtime env", GET_QINFO_ADDR(pRuntimeEnv->pQuery));
for(int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
tfree(pRuntimeEnv->colDataBuffer[i]);
}
......@@ -2024,7 +2083,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) {
SQueryFileInfo *pQFileInfo = &(pRuntimeEnv->pHeaderFiles[i]);
SQueryFileInfo *pQFileInfo = &(pRuntimeEnv->pVnodeFiles[i]);
if (pQFileInfo->pHeaderFileData != NULL && pQFileInfo->pHeaderFileData != MAP_FAILED) {
munmap(pQFileInfo->pHeaderFileData, pQFileInfo->headFileSize);
}
......@@ -2038,9 +2097,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tclose(pQFileInfo->lastFd);
}
if (pRuntimeEnv->pHeaderFiles != NULL) {
if (pRuntimeEnv->pVnodeFiles != NULL) {
pRuntimeEnv->numOfFiles = 0;
free(pRuntimeEnv->pHeaderFiles);
free(pRuntimeEnv->pVnodeFiles);
}
if (pRuntimeEnv->pInterpoBuf != NULL) {
......@@ -2791,11 +2850,17 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p
}
/*
* NOTE: the compblock information may not be loaded yet, here loaded it firstly
* NOTE:
* The compblock information may not be loaded yet, here loaded it firstly.
* If the compBlock info is loaded, it wont be loaded again.
*
* If failed to load comp block into memory due some how reasons, e.g., empty header file/not enough memory
*/
int32_t numOfBlocks = vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fileIdx);
assert(numOfBlocks > 0);
if (numOfBlocks <= 0) {
position->fileId = -1;
return -1;
}
nextTimestamp = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
}
......@@ -2865,71 +2930,28 @@ static int file_order_comparator(const void *p1, const void *p2) {
*/
static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles, int32_t fid, int32_t vnodeId,
char *fileName, char *prefix) {
__off_t size = 0;
// __off_t size = 0;
pVnodeFiles->fileID = fid;
pVnodeFiles->defaultMappingSize = DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE;
snprintf(pVnodeFiles->headerFilePath, 256, "%s%s", prefix, fileName);
#if 1
pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY);
#else
int32_t *val = (int32_t *)taosGetStrHashData(fileHandleHashList, pVnodeFiles->headerFilePath);
if (val == NULL) {
pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY);
taosAddStrHash(fileHandleHashList, pVnodeFiles->headerFilePath, (char *)&pVnodeFiles->headerFd);
} else {
pVnodeFiles->headerFd = *val;
}
#endif
if (!VALIDFD(pVnodeFiles->headerFd)) {
dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno));
goto _clean;
}
struct stat fstat;
struct stat fstat = {0};
if (stat(pVnodeFiles->headerFilePath, &fstat) < 0) return -1;
pVnodeFiles->headFileSize = fstat.st_size;
size = fstat.st_size;
pVnodeFiles->pHeaderFileData = mmap(NULL, size, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0);
if (pVnodeFiles->pHeaderFileData == MAP_FAILED) {
dError("QInfo:%p failed to map header file:%s, %s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno));
goto _clean;
}
/* even the advise failed, continue.. */
if (madvise(pVnodeFiles->pHeaderFileData, size, MADV_SEQUENTIAL) == -1) {
dError("QInfo:%p failed to advise kernel the usage of header files, reason:%s", pQInfo, strerror(errno));
}
snprintf(pVnodeFiles->dataFilePath, 256, "%sv%df%d.data", prefix, vnodeId, fid);
snprintf(pVnodeFiles->lastFilePath, 256, "%sv%df%d.last", prefix, vnodeId, fid);
#if 1
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY);
#else
val = (int32_t *)taosGetStrHashData(fileHandleHashList, pVnodeFiles->dataFilePath);
if (val == NULL) {
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
taosAddStrHash(fileHandleHashList, pVnodeFiles->dataFilePath, (char *)&pVnodeFiles->dataFd);
} else {
pVnodeFiles->dataFd = *val;
}
#endif
if (!VALIDFD(pVnodeFiles->dataFd)) {
dError("QInfo:%p failed to open data file:%s, reason:%s", pQInfo, pVnodeFiles->dataFilePath, strerror(errno));
goto _clean;
}
if (!VALIDFD(pVnodeFiles->lastFd)) {
dError("QInfo:%p failed to open last file:%s, reason:%s", pQInfo, pVnodeFiles->lastFilePath, strerror(errno));
goto _clean;
}
if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1;
pVnodeFiles->dataFileSize = fstat.st_size;
......@@ -2953,13 +2975,9 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles
}
#endif
return 0;
return TSDB_CODE_SUCCESS;
_clean:
if (pVnodeFiles->pHeaderFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) {
munmap(pVnodeFiles->pHeaderFileData, pVnodeFiles->headFileSize);
pVnodeFiles->pHeaderFileData = NULL;
}
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
if (pVnodeFiles->pDataFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) {
......@@ -2987,10 +3005,10 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
char suffix[] = ".head";
struct dirent *pEntry = NULL;
int32_t alloc = 4; // default allocated size
size_t alloc = 4; // default allocated size
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->pMeterQuerySupporter->runtimeEnv);
pRuntimeEnv->pHeaderFiles = calloc(1, sizeof(SQueryFileInfo) * alloc);
pRuntimeEnv->pVnodeFiles = calloc(1, sizeof(SQueryFileInfo) * alloc);
SVnodeObj *pVnode = &vnodeList[vnodeId];
while ((pEntry = readdir(pDir)) != NULL) {
......@@ -3026,11 +3044,11 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
if (++pRuntimeEnv->numOfFiles > alloc) {
alloc = alloc << 1;
pRuntimeEnv->pHeaderFiles = realloc(pRuntimeEnv->pHeaderFiles, alloc * sizeof(SQueryFileInfo));
memset(&pRuntimeEnv->pHeaderFiles[alloc >> 1], 0, (alloc >> 1) * sizeof(SQueryFileInfo));
pRuntimeEnv->pVnodeFiles = realloc(pRuntimeEnv->pVnodeFiles, alloc * sizeof(SQueryFileInfo));
memset(&pRuntimeEnv->pVnodeFiles[alloc >> 1], 0, (alloc >> 1) * sizeof(SQueryFileInfo));
}
SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pHeaderFiles[pRuntimeEnv->numOfFiles - 1];
SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1];
int32_t ret = vnodeOpenVnodeDBFiles(pQInfo, pVnodeFiles, fid, vnodeId, pEntry->d_name, dbFilePathPrefix);
if (ret < 0) {
memset(pVnodeFiles, 0, sizeof(SQueryFileInfo)); // reset information
......@@ -3043,7 +3061,7 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pRuntimeEnv->numOfFiles, dbFilePathPrefix);
/* order the files information according their names */
qsort(pRuntimeEnv->pHeaderFiles, (size_t)pRuntimeEnv->numOfFiles, sizeof(SQueryFileInfo), file_order_comparator);
qsort(pRuntimeEnv->pVnodeFiles, (size_t)pRuntimeEnv->numOfFiles, sizeof(SQueryFileInfo), file_order_comparator);
}
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, void *pBlock) {
......@@ -3122,8 +3140,8 @@ static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY ||
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAG_DUMMY) {
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG ||
functionId == TSDB_FUNC_TAG_DUMMY) {
continue;
}
......@@ -3141,7 +3159,8 @@ static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB
static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
// in case of point-interpolation query, use asc order scan
char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, "
char msg[] =
"QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, "
"new qrange:%lld-%lld";
// descending order query
......@@ -3445,7 +3464,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
pCtx->numOfParams = 4;
SInterpInfo * pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf;
SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf;
pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail));
......@@ -3624,6 +3643,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
// dataInCache requires lastKey value
pQuery->lastKey = pQuery->skey;
pSupporter->runtimeEnv.mmapedHFileIndex = -1; // set the initial value
vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo);
vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo);
......@@ -3793,6 +3813,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
changeExecuteScanOrder(pQuery, true);
pSupporter->runtimeEnv.mmapedHFileIndex = -1; // set the initial value
vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo);
vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo);
......@@ -3917,13 +3938,13 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
}
// todo merge with doRevisedResultsByLimit
void UNUSED_FUNC truncateResultByLimit(SQInfo *pQInfo, int64_t * final, int32_t *interpo) {
void UNUSED_FUNC truncateResultByLimit(SQInfo *pQInfo, int64_t *final, int32_t *interpo) {
SQuery *pQuery = &(pQInfo->query);
if (pQuery->limit.limit > 0 && ((* final) + pQInfo->pointsRead > pQuery->limit.limit)) {
int64_t num = (* final) + pQInfo->pointsRead - pQuery->limit.limit;
if (pQuery->limit.limit > 0 && ((*final) + pQInfo->pointsRead > pQuery->limit.limit)) {
int64_t num = (*final) + pQInfo->pointsRead - pQuery->limit.limit;
(*interpo) -= num;
(* final) -= num;
(*final) -= num;
setQueryStatus(pQuery, QUERY_COMPLETED); // query completed
}
......@@ -3974,7 +3995,7 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
// the fields info is not loaded, load it into memory
if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) {
loadDataBlockFieldsInfo(pRuntimeEnv, &pRuntimeEnv->pHeaderFiles[fileIndex], pBlock, &pQuery->pFields[pQuery->slot]);
loadDataBlockFieldsInfo(pRuntimeEnv, &pRuntimeEnv->pVnodeFiles[fileIndex], pBlock, &pQuery->pFields[pQuery->slot]);
}
SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus);
......@@ -4006,7 +4027,7 @@ static void getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
//TODO handle case that the cache is allocated but not assign to SMeterObj
// TODO handle case that the cache is allocated but not assign to SMeterObj
void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
......@@ -4464,17 +4485,18 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, tFilePage
pCtx[i].hasNull = true;
pCtx[i].nStartQueryTimestamp = timestamp;
pCtx[i].aInputElemBuf = ((char *) inputSrc->data) +
((int32_t) pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + pCtx[i].outputBytes * inputIdx;
pCtx[i].aInputElemBuf = ((char *)inputSrc->data) +
((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) +
pCtx[i].outputBytes * inputIdx;
//in case of tag column, the tag information should be extracted from input buffer
// in case of tag column, the tag information should be extracted from input buffer
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) {
tVariantDestroy(&pCtx[i].tag);
tVariantCreateFromBinary(&pCtx[i].tag, pCtx[i].aInputElemBuf, pCtx[i].inputBytes, pCtx[i].inputType);
}
}
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY) {
continue;
......@@ -4942,7 +4964,8 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
}
}
}
} else {
} else { // TODO ERROR!!
// need to handle for each query result, not just the single runtime ctx.
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1;
int32_t functId = pQuery->pSelectExpr[i].pBase.functionId;
......@@ -5460,15 +5483,18 @@ static int32_t offsetComparator(const void *pLeft, const void *pRight) {
* @param pMeterHeadDataInfo
* @return
*/
SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, SQueryFileInfo *pQueryFileInfo,
tSidSet *pSidSet, SMeterDataInfo *pMeterDataInfo, int32_t *numOfMeters) {
SQuery * pQuery = &pQInfo->query;
SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t fileIndex, tSidSet *pSidSet,
SMeterDataInfo *pMeterDataInfo, int32_t *numOfMeters) {
SQuery *pQuery = &pQInfo->query;
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
SMeterSidExtInfo ** pMeterSidExtInfo = pSupporter->pMeterSidExtInfo;
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SQueryFileInfo * pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIndex];
SVnodeObj *pVnode = &vnodeList[vid];
char * pHeaderData = pQueryFileInfo->pHeaderFileData;
char * pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex);
int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM);
// file is corrupted, abort query in current file
......@@ -6467,7 +6493,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, int8_t *blkS
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pHeaderFiles[fileIdx];
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx];
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
......@@ -6914,18 +6940,18 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p
return numOfRes;
}
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char* data, int32_t* size) {
SMeterObj* pObj = pQInfo->pObj;
SQuery* pQuery = &pQInfo->query;
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int32_t *size) {
SMeterObj *pObj = pQInfo->pObj;
SQuery * pQuery = &pQInfo->query;
int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock;
int32_t dataSize = pQInfo->query.rowSize * numOfRows;
if (dataSize >= tsCompressMsgSize && tsCompressMsgSize > 0) {
char* compBuf = malloc((size_t) dataSize);
char *compBuf = malloc((size_t)dataSize);
// for metric query, bufIndex always be 0.
char* d = compBuf;
char *d = compBuf;
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
int32_t bytes = pQuery->pSelectExpr[col].resBytes;
......@@ -6958,9 +6984,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char* data
* @param numOfRows the number of rows that are not returned in current retrieve
* @return
*/
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t* size) {
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t *size) {
SQInfo *pQInfo = (SQInfo *)handle;
SQuery * pQuery = &pQInfo->query;
SQuery *pQuery = &pQInfo->query;
assert(pQuery->pSelectExpr != NULL && pQuery->numOfOutputCols > 0);
......
......@@ -289,12 +289,11 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
pQuery->fileId = fid;
pSummary->numOfFiles++;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pHeaderFiles[fileIdx];
char * pHeaderData = pQueryFileInfo->pHeaderFileData;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx];
int32_t numOfQualifiedMeters = 0;
SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters(
pQInfo, vnodeId, pQueryFileInfo, pSupporter->pSidSet, pMeterDataInfo, &numOfQualifiedMeters);
SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters(pQInfo, vnodeId, fileIdx, pSupporter->pSidSet,
pMeterDataInfo, &numOfQualifiedMeters);
dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pQueryFileInfo->dataFilePath, numOfQualifiedMeters);
if (pReqMeterDataInfo == NULL) {
......@@ -312,6 +311,11 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
continue;
}
char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx);
if (pHeaderData == NULL) { // failed to mmap header file into buffer
continue;
}
uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderData, numOfQualifiedMeters, pQueryFileInfo,
pReqMeterDataInfo);
......@@ -500,7 +504,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) {
resetMMapWindow(&pRuntimeEnv->pHeaderFiles[i]);
resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]);
}
#endif
......@@ -670,7 +674,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) {
resetMMapWindow(&pRuntimeEnv->pHeaderFiles[i]);
resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]);
}
#endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册