Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
96ae0ca2
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
96ae0ca2
编写于
11月 28, 2019
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[jira none]only open and memory map one group of files
上级
07a67002
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
224 addition
and
200 deletion
+224
-200
src/system/detail/inc/vnodeQueryImpl.h
src/system/detail/inc/vnodeQueryImpl.h
+2
-2
src/system/detail/inc/vnodeRead.h
src/system/detail/inc/vnodeRead.h
+28
-29
src/system/detail/src/vnodeQueryImpl.c
src/system/detail/src/vnodeQueryImpl.c
+181
-158
src/system/detail/src/vnodeQueryProcess.c
src/system/detail/src/vnodeQueryProcess.c
+13
-11
未找到文件。
src/system/detail/inc/vnodeQueryImpl.h
浏览文件 @
96ae0ca2
...
@@ -111,7 +111,7 @@ typedef enum {
...
@@ -111,7 +111,7 @@ typedef enum {
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
typedef
int
(
*
__block_search_fn_t
)(
char
*
data
,
int
num
,
int64_t
key
,
int
order
);
typedef
int
(
*
__block_search_fn_t
)(
char
*
data
,
int
num
,
int64_t
key
,
int
order
);
typedef
int32_t
(
*
__read_data_fn_t
)(
int
fd
,
SQInfo
*
pQInfo
,
SQueryFileInfo
*
pQueryFile
,
char
*
buf
,
uint64_t
offset
,
typedef
int32_t
(
*
__read_data_fn_t
)(
int
fd
,
SQInfo
*
pQInfo
,
SQueryFile
s
Info
*
pQueryFile
,
char
*
buf
,
uint64_t
offset
,
int32_t
size
);
int32_t
size
);
static
FORCE_INLINE
SMeterObj
*
getMeterObj
(
void
*
hashHandle
,
int32_t
sid
)
{
static
FORCE_INLINE
SMeterObj
*
getMeterObj
(
void
*
hashHandle
,
int32_t
sid
)
{
...
@@ -191,7 +191,7 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slo
...
@@ -191,7 +191,7 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slo
int64_t
getNextAccessedKeyInData
(
SQuery
*
pQuery
,
int64_t
*
pPrimaryCol
,
SBlockInfo
*
pBlockInfo
,
int32_t
blockStatus
);
int64_t
getNextAccessedKeyInData
(
SQuery
*
pQuery
,
int64_t
*
pPrimaryCol
,
SBlockInfo
*
pBlockInfo
,
int32_t
blockStatus
);
uint32_t
getDataBlocksForMeters
(
SMeterQuerySupportObj
*
pSupporter
,
SQuery
*
pQuery
,
char
*
pHeaderData
,
uint32_t
getDataBlocksForMeters
(
SMeterQuerySupportObj
*
pSupporter
,
SQuery
*
pQuery
,
char
*
pHeaderData
,
int32_t
numOfMeters
,
SQueryFileInfo
*
pQueryFileInfo
,
SMeterDataInfo
**
pMeterDataInfo
);
int32_t
numOfMeters
,
const
char
*
filePath
,
SMeterDataInfo
**
pMeterDataInfo
);
int32_t
LoadDatablockOnDemand
(
SCompBlock
*
pBlock
,
SField
**
pFields
,
uint8_t
*
blkStatus
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
LoadDatablockOnDemand
(
SCompBlock
*
pBlock
,
SField
**
pFields
,
uint8_t
*
blkStatus
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
fileIdx
,
int32_t
slotIdx
,
__block_search_fn_t
searchFn
,
bool
onDemand
);
int32_t
fileIdx
,
int32_t
slotIdx
,
__block_search_fn_t
searchFn
,
bool
onDemand
);
char
*
vnodeGetHeaderFileData
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
vnodeId
,
int32_t
fileIndex
);
char
*
vnodeGetHeaderFileData
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
vnodeId
,
int32_t
fileIndex
);
...
...
src/system/detail/inc/vnodeRead.h
浏览文件 @
96ae0ca2
...
@@ -47,29 +47,14 @@ typedef struct SQueryLoadCompBlockInfo {
...
@@ -47,29 +47,14 @@ typedef struct SQueryLoadCompBlockInfo {
int32_t
fileId
;
int32_t
fileId
;
int32_t
fileListIndex
;
int32_t
fileListIndex
;
}
SQueryLoadCompBlockInfo
;
}
SQueryLoadCompBlockInfo
;
/*
/*
* the header file info for one vnode
* the header file info for one vnode
*/
*/
typedef
struct
SQueryFileInfo
{
typedef
struct
SHeaderFileInfo
{
int32_t
fileID
;
/* file id */
int32_t
fileID
;
// file id
char
headerFilePath
[
PATH_MAX
];
/* full file name */
size_t
headFileSize
;
// header file size
char
dataFilePath
[
PATH_MAX
];
}
SHeaderFileInfo
;
char
lastFilePath
[
PATH_MAX
];
int32_t
defaultMappingSize
;
/* default mapping size */
int32_t
headerFd
;
/* file handler */
char
*
pHeaderFileData
;
/* mmap header files */
size_t
headFileSize
;
int32_t
dataFd
;
char
*
pDataFileData
;
size_t
dataFileSize
;
uint64_t
dtFileMappingOffset
;
int32_t
lastFd
;
size_t
lastFileSize
;
uint64_t
lastFileMappingOffset
;
}
SQueryFileInfo
;
typedef
struct
SQueryCostSummary
{
typedef
struct
SQueryCostSummary
{
double
cacheTimeUs
;
double
cacheTimeUs
;
...
@@ -106,6 +91,27 @@ typedef struct SOutputRes {
...
@@ -106,6 +91,27 @@ typedef struct SOutputRes {
SResultInfo
*
resultInfo
;
SResultInfo
*
resultInfo
;
}
SOutputRes
;
}
SOutputRes
;
/*
* header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function
*/
typedef
struct
SQueryFilesInfo
{
SHeaderFileInfo
*
pFileInfo
;
uint32_t
numOfFiles
;
// the total available number of files for this virtual node during query execution
int32_t
current
;
// the memory mapped header file, NOTE: only one header file can be mmap.
int32_t
vnodeId
;
int32_t
headerFd
;
// header file fd
char
*
pHeaderFileData
;
// mmap header files
int32_t
dataFd
;
int32_t
lastFd
;
char
headerFilePath
[
PATH_MAX
];
// current opened header file name
char
dataFilePath
[
PATH_MAX
];
// current opened data file name
char
lastFilePath
[
PATH_MAX
];
// current opened last file path
char
dbFilePathPrefix
[
PATH_MAX
];
}
SQueryFilesInfo
;
typedef
struct
RuntimeEnvironment
{
typedef
struct
RuntimeEnvironment
{
SPositionInfo
startPos
;
/* the start position, used for secondary/third iteration */
SPositionInfo
startPos
;
/* the start position, used for secondary/third iteration */
SPositionInfo
endPos
;
/* the last access position in query, served as the start pos of reversed order query */
SPositionInfo
endPos
;
/* the last access position in query, served as the start pos of reversed order query */
...
@@ -122,14 +128,7 @@ typedef struct RuntimeEnvironment {
...
@@ -122,14 +128,7 @@ typedef struct RuntimeEnvironment {
SQLFunctionCtx
*
pCtx
;
SQLFunctionCtx
*
pCtx
;
SQueryLoadBlockInfo
loadBlockInfo
;
/* record current block load information */
SQueryLoadBlockInfo
loadBlockInfo
;
/* record current block load information */
SQueryLoadCompBlockInfo
loadCompBlockInfo
;
/* record current compblock information in SQuery */
SQueryLoadCompBlockInfo
loadCompBlockInfo
;
/* record current compblock information in SQuery */
SQueryFilesInfo
vnodeFileInfo
;
/*
* header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function
*/
SQueryFileInfo
*
pVnodeFiles
;
uint32_t
numOfFiles
;
// the total available number of files for this virtual node during query execution
int32_t
mmapedHFileIndex
;
// the mmaped header file, NOTE: only one header file can be mmap.
int16_t
numOfRowsPerPage
;
int16_t
numOfRowsPerPage
;
int16_t
offset
[
TSDB_MAX_COLUMNS
];
int16_t
offset
[
TSDB_MAX_COLUMNS
];
int16_t
scanFlag
;
// denotes reversed scan of data or not
int16_t
scanFlag
;
// denotes reversed scan of data or not
...
...
src/system/detail/src/vnodeQueryImpl.c
浏览文件 @
96ae0ca2
...
@@ -40,12 +40,12 @@ enum {
...
@@ -40,12 +40,12 @@ enum {
#define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0)
#define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0)
static
int32_t
copyDataFromMMapBuffer
(
int
fd
,
SQInfo
*
pQInfo
,
SQueryFile
Info
*
pQueryFile
,
char
*
buf
,
uint64_t
offset
,
//static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFiles
Info *pQueryFile, char *buf, uint64_t offset,
int32_t
size
);
//
int32_t size);
static
int32_t
readDataFromDiskFile
(
int
fd
,
SQInfo
*
pQInfo
,
SQueryFileInfo
*
pQueryFile
,
char
*
buf
,
uint64_t
offset
,
static
int32_t
readDataFromDiskFile
(
int
fd
,
SQInfo
*
pQInfo
,
SQueryFile
s
Info
*
pQueryFile
,
char
*
buf
,
uint64_t
offset
,
int32_t
size
);
int32_t
size
);
__read_data_fn_t
readDataFunctor
[
2
]
=
{
copyDataFromMMapBuffer
,
readDataFromDiskFile
};
//
__read_data_fn_t readDataFunctor[2] = {copyDataFromMMapBuffer, readDataFromDiskFile};
static
void
vnodeInitLoadCompBlockInfo
(
SQueryLoadCompBlockInfo
*
pCompBlockLoadInfo
);
static
void
vnodeInitLoadCompBlockInfo
(
SQueryLoadCompBlockInfo
*
pCompBlockLoadInfo
);
static
int32_t
moveToNextBlock
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
step
,
__block_search_fn_t
searchFn
,
static
int32_t
moveToNextBlock
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
step
,
__block_search_fn_t
searchFn
,
...
@@ -99,7 +99,7 @@ static FORCE_INLINE int32_t getCompHeaderStartPosition(SVnodeCfg *pCfg) {
...
@@ -99,7 +99,7 @@ static FORCE_INLINE int32_t getCompHeaderStartPosition(SVnodeCfg *pCfg) {
}
}
static
FORCE_INLINE
int32_t
validateCompBlockOffset
(
SQInfo
*
pQInfo
,
SMeterObj
*
pMeterObj
,
SCompHeader
*
pCompHeader
,
static
FORCE_INLINE
int32_t
validateCompBlockOffset
(
SQInfo
*
pQInfo
,
SMeterObj
*
pMeterObj
,
SCompHeader
*
pCompHeader
,
S
Query
FileInfo
*
pQueryFileInfo
,
int32_t
headerSize
)
{
S
Header
FileInfo
*
pQueryFileInfo
,
int32_t
headerSize
)
{
if
(
pCompHeader
->
compInfoOffset
<
headerSize
||
pCompHeader
->
compInfoOffset
>
pQueryFileInfo
->
headFileSize
)
{
if
(
pCompHeader
->
compInfoOffset
<
headerSize
||
pCompHeader
->
compInfoOffset
>
pQueryFileInfo
->
headFileSize
)
{
dError
(
"QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%d is not valid, size:%ld"
,
pQInfo
,
pMeterObj
->
vnode
,
dError
(
"QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%d is not valid, size:%ld"
,
pQInfo
,
pMeterObj
->
vnode
,
pMeterObj
->
sid
,
pMeterObj
->
meterId
,
pCompHeader
->
compInfoOffset
,
pQueryFileInfo
->
headFileSize
);
pMeterObj
->
sid
,
pMeterObj
->
meterId
,
pCompHeader
->
compInfoOffset
,
pQueryFileInfo
->
headFileSize
);
...
@@ -195,7 +195,7 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
...
@@ -195,7 +195,7 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
// if vnodeFreeFields is called, the pQuery->pFields is NULL
// if vnodeFreeFields is called, the pQuery->pFields is NULL
if
(
pLoadCompBlockInfo
->
fileListIndex
==
fileIndex
&&
pLoadCompBlockInfo
->
sid
==
pMeterObj
->
sid
&&
if
(
pLoadCompBlockInfo
->
fileListIndex
==
fileIndex
&&
pLoadCompBlockInfo
->
sid
==
pMeterObj
->
sid
&&
pQuery
->
pFields
!=
NULL
&&
pQuery
->
fileId
>
0
)
{
pQuery
->
pFields
!=
NULL
&&
pQuery
->
fileId
>
0
)
{
assert
(
pRuntimeEnv
->
pVnodeFiles
[
fileIndex
].
fileID
==
pLoadCompBlockInfo
->
fileId
&&
pQuery
->
numOfBlocks
>
0
);
assert
(
pRuntimeEnv
->
vnodeFileInfo
.
pFileInfo
[
fileIndex
].
fileID
==
pLoadCompBlockInfo
->
fileId
&&
pQuery
->
numOfBlocks
>
0
);
return
true
;
return
true
;
}
}
...
@@ -207,7 +207,7 @@ static void vnodeSetCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, int32_t f
...
@@ -207,7 +207,7 @@ static void vnodeSetCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, int32_t f
pLoadCompBlockInfo
->
sid
=
sid
;
pLoadCompBlockInfo
->
sid
=
sid
;
pLoadCompBlockInfo
->
fileListIndex
=
fileIndex
;
pLoadCompBlockInfo
->
fileListIndex
=
fileIndex
;
pLoadCompBlockInfo
->
fileId
=
pRuntimeEnv
->
pVnodeFiles
[
fileIndex
].
fileID
;
pLoadCompBlockInfo
->
fileId
=
pRuntimeEnv
->
vnodeFileInfo
.
pFileInfo
[
fileIndex
].
fileID
;
}
}
static
void
vnodeInitLoadCompBlockInfo
(
SQueryLoadCompBlockInfo
*
pCompBlockLoadInfo
)
{
static
void
vnodeInitLoadCompBlockInfo
(
SQueryLoadCompBlockInfo
*
pCompBlockLoadInfo
)
{
...
@@ -259,49 +259,90 @@ static bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) {
...
@@ -259,49 +259,90 @@ static bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) {
return
headerFileSize
<=
getCompHeaderStartPosition
(
pVnodeCfg
);
return
headerFileSize
<=
getCompHeaderStartPosition
(
pVnodeCfg
);
}
}
static
void
doCloseQueryFileInfoFD
(
SQueryFile
Info
*
pVnodeFiles
)
{
static
void
doCloseQueryFileInfoFD
(
SQueryFile
sInfo
*
pVnodeFilesInfo
)
{
tclose
(
pVnodeFiles
->
headerFd
);
tclose
(
pVnodeFiles
Info
->
headerFd
);
tclose
(
pVnodeFiles
->
dataFd
);
tclose
(
pVnodeFiles
Info
->
dataFd
);
tclose
(
pVnodeFiles
->
lastFd
);
tclose
(
pVnodeFiles
Info
->
lastFd
);
}
}
static
int32_t
doOpenQueryFileInfoDF
(
SQInfo
*
pQInfo
,
SQueryFileInfo
*
pVnodeFiles
)
{
static
void
doInitQueryFileInfoFD
(
SQueryFilesInfo
*
pVnodeFilesInfo
)
{
pVnodeFilesInfo
->
current
=
-
1
;
pVnodeFilesInfo
->
headerFd
=
FD_INITIALIZER
;
// set the initial value
pVnodeFilesInfo
->
dataFd
=
FD_INITIALIZER
;
pVnodeFilesInfo
->
lastFd
=
FD_INITIALIZER
;
}
static
int32_t
doOpenQueryFileData
(
SQInfo
*
pQInfo
,
SQueryFilesInfo
*
pVnodeFiles
)
{
// if the header is smaller than a threshold value, this file is empty, no need to
// if the header is smaller than a threshold value, this file is empty, no need to
SHeaderFileInfo
*
pCurrentFileInfo
=
&
pVnodeFiles
->
pFileInfo
[
pVnodeFiles
->
current
];
// set the full file path for current opened files
snprintf
(
pVnodeFiles
->
headerFilePath
,
PATH_MAX
,
"%sv%df%d.head"
,
pVnodeFiles
->
dbFilePathPrefix
,
pVnodeFiles
->
vnodeId
,
pCurrentFileInfo
->
fileID
);
pVnodeFiles
->
headerFd
=
open
(
pVnodeFiles
->
headerFilePath
,
O_RDONLY
);
pVnodeFiles
->
headerFd
=
open
(
pVnodeFiles
->
headerFilePath
,
O_RDONLY
);
if
(
!
FD_VALID
(
pVnodeFiles
->
headerFd
))
{
if
(
!
FD_VALID
(
pVnodeFiles
->
headerFd
))
{
dError
(
"QInfo:%p failed open header file:%s reason:%s"
,
pQInfo
,
pVnodeFiles
->
headerFilePath
,
strerror
(
errno
));
dError
(
"QInfo:%p failed open header file:%s reason:%s"
,
pQInfo
,
pVnodeFiles
->
headerFilePath
,
strerror
(
errno
));
return
-
1
;
return
-
1
;
}
}
snprintf
(
pVnodeFiles
->
dataFilePath
,
PATH_MAX
,
"%sv%df%d.data"
,
pVnodeFiles
->
dbFilePathPrefix
,
pVnodeFiles
->
vnodeId
,
pCurrentFileInfo
->
fileID
);
pVnodeFiles
->
dataFd
=
open
(
pVnodeFiles
->
dataFilePath
,
O_RDONLY
);
pVnodeFiles
->
dataFd
=
open
(
pVnodeFiles
->
dataFilePath
,
O_RDONLY
);
if
(
!
FD_VALID
(
pVnodeFiles
->
header
Fd
))
{
if
(
!
FD_VALID
(
pVnodeFiles
->
data
Fd
))
{
dError
(
"QInfo:%p failed open data file:%s reason:%s"
,
pQInfo
,
pVnodeFiles
->
dataFilePath
,
strerror
(
errno
));
dError
(
"QInfo:%p failed open data file:%s reason:%s"
,
pQInfo
,
pVnodeFiles
->
dataFilePath
,
strerror
(
errno
));
return
-
1
;
return
-
1
;
}
}
snprintf
(
pVnodeFiles
->
lastFilePath
,
PATH_MAX
,
"%sv%df%d.last"
,
pVnodeFiles
->
dbFilePathPrefix
,
pVnodeFiles
->
vnodeId
,
pCurrentFileInfo
->
fileID
);
pVnodeFiles
->
lastFd
=
open
(
pVnodeFiles
->
lastFilePath
,
O_RDONLY
);
pVnodeFiles
->
lastFd
=
open
(
pVnodeFiles
->
lastFilePath
,
O_RDONLY
);
if
(
!
FD_VALID
(
pVnodeFiles
->
header
Fd
))
{
if
(
!
FD_VALID
(
pVnodeFiles
->
last
Fd
))
{
dError
(
"QInfo:%p failed open last file:%s reason:%s"
,
pQInfo
,
pVnodeFiles
->
lastFilePath
,
strerror
(
errno
));
dError
(
"QInfo:%p failed open last file:%s reason:%s"
,
pQInfo
,
pVnodeFiles
->
lastFilePath
,
strerror
(
errno
));
return
-
1
;
return
-
1
;
}
}
pVnodeFiles
->
pHeaderFileData
=
mmap
(
NULL
,
pCurrentFileInfo
->
headFileSize
,
PROT_READ
,
MAP_SHARED
,
pVnodeFiles
->
headerFd
,
0
);
if
(
pVnodeFiles
->
pHeaderFileData
==
MAP_FAILED
)
{
pVnodeFiles
->
pHeaderFileData
=
NULL
;
doCloseQueryFileInfoFD
(
pVnodeFiles
);
doInitQueryFileInfoFD
(
pVnodeFiles
);
dError
(
"QInfo:%p failed to mmap header file:%s, size:%lld, %s"
,
pQInfo
,
pVnodeFiles
->
headerFilePath
,
pCurrentFileInfo
->
headFileSize
,
strerror
(
errno
));
return
-
1
;
}
else
{
if
(
madvise
(
pVnodeFiles
->
pHeaderFileData
,
pCurrentFileInfo
->
headFileSize
,
MADV_SEQUENTIAL
)
==
-
1
)
{
dError
(
"QInfo:%p failed to advise kernel the usage of header file, reason:%s"
,
pQInfo
,
strerror
(
errno
));
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
void
doUnmapHeaderFileData
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
static
void
doCloseOpenedFileData
(
SQueryFilesInfo
*
pVnodeFileInfo
)
{
if
(
pRuntimeEnv
->
mmapedHFileIndex
>=
0
)
{
if
(
pVnodeFileInfo
->
current
>=
0
)
{
assert
(
pRuntimeEnv
->
mmapedHFileIndex
<
pRuntimeEnv
->
numOfFiles
&&
pRuntimeEnv
->
mmapedHFileIndex
>=
0
);
SQueryFileInfo
*
otherVnodeFiles
=
&
pRuntimeEnv
->
pVnodeFiles
[
pRuntimeEnv
->
mmapedHFileIndex
];
assert
(
pVnodeFileInfo
->
current
<
pVnodeFileInfo
->
numOfFiles
&&
pVnodeFileInfo
->
current
>=
0
&&
munmap
(
otherVnodeFiles
->
pHeaderFileData
,
otherVnodeFiles
->
headFileSize
);
pVnodeFileInfo
->
pHeaderFileData
!=
NULL
);
otherVnodeFiles
->
pHeaderFileData
=
NULL
;
SHeaderFileInfo
*
pCurrentFile
=
&
pVnodeFileInfo
->
pFileInfo
[
pVnodeFileInfo
->
current
];
doCloseQueryFileInfoFD
(
otherVnodeFiles
);
pRuntimeEnv
->
mmapedHFileIndex
=
-
1
;
munmap
(
pVnodeFileInfo
->
pHeaderFileData
,
pCurrentFile
->
headFileSize
);
pVnodeFileInfo
->
pHeaderFileData
=
NULL
;
doCloseQueryFileInfoFD
(
pVnodeFileInfo
);
doInitQueryFileInfoFD
(
pVnodeFileInfo
);
}
}
assert
(
p
RuntimeEnv
->
mmapedHFileIndex
==
-
1
);
assert
(
p
VnodeFileInfo
->
current
==
-
1
);
}
}
/**
/**
...
@@ -314,46 +355,40 @@ static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
...
@@ -314,46 +355,40 @@ static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
* @return the return value may be null, so any invoker needs to check the returned value
* @return the return value may be null, so any invoker needs to check the returned value
*/
*/
char
*
vnodeGetHeaderFileData
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
vnodeId
,
int32_t
fileIndex
)
{
char
*
vnodeGetHeaderFileData
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
vnodeId
,
int32_t
fileIndex
)
{
assert
(
fileIndex
>=
0
&&
fileIndex
<
pRuntimeEnv
->
numOfFiles
);
assert
(
fileIndex
>=
0
&&
fileIndex
<
pRuntimeEnv
->
vnodeFileInfo
.
numOfFiles
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
// only for log output
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
// only for log output
SQueryFileInfo
*
pVnodeFiles
=
&
pRuntimeEnv
->
pVnodeFiles
[
fileIndex
];
SQueryFilesInfo
*
pVnodeFileInfo
=
&
pRuntimeEnv
->
vnodeFileInfo
;
SHeaderFileInfo
*
pHeaderFileInfo
=
&
pVnodeFileInfo
->
pFileInfo
[
fileIndex
];
if
(
pVnodeFiles
->
pHeaderFileData
==
NULL
)
{
if
(
pVnodeFileInfo
->
current
!=
fileIndex
||
pVnodeFileInfo
->
pHeaderFileData
==
NULL
)
{
assert
(
pRuntimeEnv
->
mmapedHFileIndex
!=
fileIndex
);
if
(
pVnodeFileInfo
->
current
>=
0
)
{
doUnmapHeaderFileData
(
pRuntimeEnv
);
// do close the other memory mapped header file
assert
(
pVnodeFileInfo
->
pHeaderFileData
!=
NULL
);
}
assert
(
pVnodeFiles
->
pHeaderFileData
==
NULL
);
doCloseOpenedFileData
(
pVnodeFileInfo
);
// do close the other memory mapped header file
assert
(
pVnodeFileInfo
->
pHeaderFileData
==
NULL
);
// current header file is empty or broken, return directly
// current header file is empty or broken, return directly
if
(
isHeaderFileEmpty
(
vnodeId
,
p
VnodeFiles
->
headFileSize
))
{
if
(
isHeaderFileEmpty
(
vnodeId
,
p
HeaderFileInfo
->
headFileSize
))
{
return
pVnodeFiles
->
pHeaderFileData
;
qTrace
(
"QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken"
,
pQInfo
,
}
pVnodeFileInfo
->
vnodeId
,
pHeaderFileInfo
->
fileID
,
fileIndex
,
pHeaderFileInfo
->
headFileSize
);
if
(
doOpenQueryFileInfoDF
(
pQInfo
,
pVnodeFiles
)
!=
TSDB_CODE_SUCCESS
)
{
return
pVnodeFileInfo
->
pHeaderFileData
;
return
pVnodeFiles
->
pHeaderFileData
;
}
}
pVnodeFiles
->
pHeaderFileData
=
mmap
(
NULL
,
pVnodeFiles
->
headFileSize
,
PROT_READ
,
MAP_SHARED
,
pVnodeFiles
->
headerFd
,
0
);
// set current opened file Index
if
(
pVnodeFiles
->
pHeaderFileData
==
MAP_FAILED
)
{
pVnodeFileInfo
->
current
=
fileIndex
;
pVnodeFiles
->
pHeaderFileData
=
NULL
;
doCloseQueryFileInfoFD
(
pVnodeFiles
);
dError
(
"QInfo:%p failed to mmap header file:%s, size:%lld, %s"
,
pQInfo
,
pVnodeFiles
->
headerFilePath
,
if
(
doOpenQueryFileData
(
pQInfo
,
pVnodeFileInfo
)
!=
TSDB_CODE_SUCCESS
)
{
pVnodeFiles
->
headFileSize
,
strerror
(
errno
));
doCloseOpenedFileData
(
pVnodeFileInfo
);
// there may be partially open fd, close it anyway.
}
else
{
return
pVnodeFileInfo
->
pHeaderFileData
;
pRuntimeEnv
->
mmapedHFileIndex
=
fileIndex
;
// set the value in case of success mmap file
if
(
madvise
(
pVnodeFiles
->
pHeaderFileData
,
pVnodeFiles
->
headFileSize
,
MADV_SEQUENTIAL
)
==
-
1
)
{
dError
(
"QInfo:%p failed to advise kernel the usage of header file, reason:%s"
,
pQInfo
,
strerror
(
errno
));
}
}
}
}
else
{
assert
(
pRuntimeEnv
->
mmapedHFileIndex
==
fileIndex
);
}
}
return
pVnodeFile
s
->
pHeaderFileData
;
return
pVnodeFile
Info
->
pHeaderFileData
;
}
}
/*
/*
...
@@ -365,7 +400,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
...
@@ -365,7 +400,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
SVnodeCfg
*
pCfg
=
&
vnodeList
[
pMeterObj
->
vnode
].
cfg
;
SVnodeCfg
*
pCfg
=
&
vnodeList
[
pMeterObj
->
vnode
].
cfg
;
S
QueryFileInfo
*
pQueryFileInfo
=
&
pRuntimeEnv
->
pVnodeFiles
[
fileIndex
];
S
HeaderFileInfo
*
pQueryFileInfo
=
&
pRuntimeEnv
->
vnodeFileInfo
.
pFileInfo
[
fileIndex
];
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
...
@@ -392,7 +427,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
...
@@ -392,7 +427,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
#endif
#endif
// check the offset value integrity
// check the offset value integrity
if
(
validateHeaderOffsetSegment
(
pQInfo
,
p
QueryFileInfo
->
headerFilePath
,
pMeterObj
->
vnode
,
data
,
if
(
validateHeaderOffsetSegment
(
pQInfo
,
p
RuntimeEnv
->
vnodeFileInfo
.
headerFilePath
,
pMeterObj
->
vnode
,
data
,
getCompHeaderSegSize
(
pCfg
))
<
0
)
{
getCompHeaderSegSize
(
pCfg
))
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -420,7 +455,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
...
@@ -420,7 +455,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
#endif
#endif
// check compblock info integrity
// check compblock info integrity
if
(
validateCompBlockInfoSegment
(
pQInfo
,
p
QueryFileInfo
->
headerFilePath
,
pMeterObj
->
vnode
,
compInfo
,
if
(
validateCompBlockInfoSegment
(
pQInfo
,
p
RuntimeEnv
->
vnodeFileInfo
.
headerFilePath
,
pMeterObj
->
vnode
,
compInfo
,
compHeader
->
compInfoOffset
)
<
0
)
{
compHeader
->
compInfoOffset
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -454,7 +489,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
...
@@ -454,7 +489,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
#endif
#endif
// check comp block integrity
// check comp block integrity
if
(
validateCompBlockSegment
(
pQInfo
,
p
QueryFileInfo
->
headerFilePath
,
compInfo
,
(
char
*
)
pQuery
->
pBlock
,
if
(
validateCompBlockSegment
(
pQInfo
,
p
RuntimeEnv
->
vnodeFileInfo
.
headerFilePath
,
compInfo
,
(
char
*
)
pQuery
->
pBlock
,
pMeterObj
->
vnode
,
checksum
)
<
0
)
{
pMeterObj
->
vnode
,
checksum
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -464,7 +499,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
...
@@ -464,7 +499,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
int64_t
et
=
taosGetTimestampUs
();
int64_t
et
=
taosGetTimestampUs
();
qTrace
(
"QInfo:%p vid:%d sid:%d id:%s, fileId:%d, load compblock info, size:%d, elapsed:%f ms"
,
pQInfo
,
qTrace
(
"QInfo:%p vid:%d sid:%d id:%s, fileId:%d, load compblock info, size:%d, elapsed:%f ms"
,
pQInfo
,
pMeterObj
->
vnode
,
pMeterObj
->
sid
,
pMeterObj
->
meterId
,
pRuntimeEnv
->
pVnodeFiles
[
fileIndex
].
fileID
,
pMeterObj
->
vnode
,
pMeterObj
->
sid
,
pMeterObj
->
meterId
,
pRuntimeEnv
->
vnodeFileInfo
.
pFileInfo
[
fileIndex
].
fileID
,
compBlockSize
,
(
et
-
st
)
/
1000
.
0
);
compBlockSize
,
(
et
-
st
)
/
1000
.
0
);
pSummary
->
totalCompInfoSize
+=
compBlockSize
;
pSummary
->
totalCompInfoSize
+=
compBlockSize
;
...
@@ -524,8 +559,9 @@ static int32_t binarySearchForBlock(SQuery *pQuery, int64_t key) {
...
@@ -524,8 +559,9 @@ static int32_t binarySearchForBlock(SQuery *pQuery, int64_t key) {
return
binarySearchForBlockImpl
(
pQuery
->
pBlock
,
pQuery
->
numOfBlocks
,
key
,
pQuery
->
order
.
order
);
return
binarySearchForBlockImpl
(
pQuery
->
pBlock
,
pQuery
->
numOfBlocks
,
key
,
pQuery
->
order
.
order
);
}
}
#if 0
/* unmap previous buffer */
/* unmap previous buffer */
static
UNUSED_FUNC
int32_t
resetMMapWindow
(
S
Query
FileInfo
*
pQueryFileInfo
)
{
static UNUSED_FUNC int32_t resetMMapWindow(S
Header
FileInfo *pQueryFileInfo) {
munmap(pQueryFileInfo->pDataFileData, pQueryFileInfo->defaultMappingSize);
munmap(pQueryFileInfo->pDataFileData, pQueryFileInfo->defaultMappingSize);
pQueryFileInfo->dtFileMappingOffset = 0;
pQueryFileInfo->dtFileMappingOffset = 0;
...
@@ -539,7 +575,7 @@ static UNUSED_FUNC int32_t resetMMapWindow(SQueryFileInfo *pQueryFileInfo) {
...
@@ -539,7 +575,7 @@ static UNUSED_FUNC int32_t resetMMapWindow(SQueryFileInfo *pQueryFileInfo) {
return 0;
return 0;
}
}
static
int32_t
moveMMapWindow
(
S
Query
FileInfo
*
pQueryFileInfo
,
uint64_t
offset
)
{
static int32_t moveMMapWindow(S
Header
FileInfo *pQueryFileInfo, uint64_t offset) {
uint64_t upperBnd = (pQueryFileInfo->dtFileMappingOffset + pQueryFileInfo->defaultMappingSize - 1);
uint64_t upperBnd = (pQueryFileInfo->dtFileMappingOffset + pQueryFileInfo->defaultMappingSize - 1);
/* data that are located in current mmapping window */
/* data that are located in current mmapping window */
...
@@ -586,7 +622,7 @@ static int32_t moveMMapWindow(SQueryFileInfo *pQueryFileInfo, uint64_t offset) {
...
@@ -586,7 +622,7 @@ static int32_t moveMMapWindow(SQueryFileInfo *pQueryFileInfo, uint64_t offset) {
return 0;
return 0;
}
}
static
int32_t
copyDataFromMMapBuffer
(
int
fd
,
SQInfo
*
pQInfo
,
S
Query
FileInfo
*
pQueryFile
,
char
*
buf
,
uint64_t
offset
,
static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, S
Header
FileInfo *pQueryFile, char *buf, uint64_t offset,
int32_t size) {
int32_t size) {
assert(size >= 0);
assert(size >= 0);
...
@@ -637,7 +673,9 @@ static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFileInfo *pQ
...
@@ -637,7 +673,9 @@ static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFileInfo *pQ
return 0;
return 0;
}
}
static
int32_t
readDataFromDiskFile
(
int
fd
,
SQInfo
*
pQInfo
,
SQueryFileInfo
*
pQueryFile
,
char
*
buf
,
uint64_t
offset
,
#endif
static
int32_t
readDataFromDiskFile
(
int
fd
,
SQInfo
*
pQInfo
,
SQueryFilesInfo
*
pQueryFile
,
char
*
buf
,
uint64_t
offset
,
int32_t
size
)
{
int32_t
size
)
{
assert
(
size
>=
0
);
assert
(
size
>=
0
);
...
@@ -652,7 +690,7 @@ static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFileInfo *pQue
...
@@ -652,7 +690,7 @@ static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFileInfo *pQue
return
0
;
return
0
;
}
}
static
int32_t
loadColumnIntoMem
(
SQuery
*
pQuery
,
SQueryFileInfo
*
pQueryFileInfo
,
SCompBlock
*
pBlock
,
SField
*
pFields
,
static
int32_t
loadColumnIntoMem
(
SQuery
*
pQuery
,
SQueryFile
s
Info
*
pQueryFileInfo
,
SCompBlock
*
pBlock
,
SField
*
pFields
,
int32_t
col
,
SData
*
sdata
,
void
*
tmpBuf
,
char
*
buffer
,
int32_t
buffersize
)
{
int32_t
col
,
SData
*
sdata
,
void
*
tmpBuf
,
char
*
buffer
,
int32_t
buffersize
)
{
char
*
dst
=
(
pBlock
->
algorithm
)
?
tmpBuf
:
sdata
->
data
;
char
*
dst
=
(
pBlock
->
algorithm
)
?
tmpBuf
:
sdata
->
data
;
...
@@ -660,14 +698,14 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFileInfo *pQueryFileInfo,
...
@@ -660,14 +698,14 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFileInfo *pQueryFileInfo,
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
int
fd
=
pBlock
->
last
?
pQueryFileInfo
->
lastFd
:
pQueryFileInfo
->
dataFd
;
int
fd
=
pBlock
->
last
?
pQueryFileInfo
->
lastFd
:
pQueryFileInfo
->
dataFd
;
int32_t
ret
=
(
*
readDataFunctor
[
DEFAULT_IO_ENGINE
])
(
fd
,
pQInfo
,
pQueryFileInfo
,
dst
,
offset
,
pFields
[
col
].
len
);
int32_t
ret
=
readDataFromDiskFile
(
fd
,
pQInfo
,
pQueryFileInfo
,
dst
,
offset
,
pFields
[
col
].
len
);
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
return
ret
;
return
ret
;
}
}
// load checksum
// load checksum
TSCKSUM
checksum
=
0
;
TSCKSUM
checksum
=
0
;
ret
=
(
*
readDataFunctor
[
DEFAULT_IO_ENGINE
])
(
fd
,
pQInfo
,
pQueryFileInfo
,
(
char
*
)
&
checksum
,
offset
+
pFields
[
col
].
len
,
ret
=
readDataFromDiskFile
(
fd
,
pQInfo
,
pQueryFileInfo
,
(
char
*
)
&
checksum
,
offset
+
pFields
[
col
].
len
,
sizeof
(
TSCKSUM
));
sizeof
(
TSCKSUM
));
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
return
ret
;
return
ret
;
...
@@ -689,11 +727,11 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFileInfo *pQueryFileInfo,
...
@@ -689,11 +727,11 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFileInfo *pQueryFileInfo,
return
0
;
return
0
;
}
}
static
int32_t
loadDataBlockFieldsInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQueryFileInfo
*
pQueryFileInfo
,
static
int32_t
loadDataBlockFieldsInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SCompBlock
*
pBlock
,
SField
**
pField
)
{
SCompBlock
*
pBlock
,
SField
**
pField
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
SMeterObj
*
pMeterObj
=
pRuntimeEnv
->
pMeterObj
;
SMeterObj
*
pMeterObj
=
pRuntimeEnv
->
pMeterObj
;
SQueryFilesInfo
*
pVnodeFilesInfo
=
&
pRuntimeEnv
->
vnodeFileInfo
;
size_t
size
=
sizeof
(
SField
)
*
(
pBlock
->
numOfCols
)
+
sizeof
(
TSCKSUM
);
size_t
size
=
sizeof
(
SField
)
*
(
pBlock
->
numOfCols
)
+
sizeof
(
TSCKSUM
);
...
@@ -709,9 +747,8 @@ static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SQueryFile
...
@@ -709,9 +747,8 @@ static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SQueryFile
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
int
fd
=
pBlock
->
last
?
pQueryFileInfo
->
lastFd
:
pQueryFileInfo
->
dataFd
;
int
fd
=
pBlock
->
last
?
pVnodeFilesInfo
->
lastFd
:
pVnodeFilesInfo
->
dataFd
;
int32_t
ret
=
int32_t
ret
=
readDataFromDiskFile
(
fd
,
pQInfo
,
pVnodeFilesInfo
,
(
char
*
)(
*
pField
),
pBlock
->
offset
,
size
);
(
*
readDataFunctor
[
DEFAULT_IO_ENGINE
])(
fd
,
pQInfo
,
pQueryFileInfo
,
(
char
*
)(
*
pField
),
pBlock
->
offset
,
size
);
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
return
ret
;
return
ret
;
}
}
...
@@ -719,7 +756,7 @@ static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SQueryFile
...
@@ -719,7 +756,7 @@ static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SQueryFile
// check fields integrity
// check fields integrity
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
*
pField
),
size
))
{
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
*
pField
),
size
))
{
dLError
(
"QInfo:%p vid:%d sid:%d id:%s, slot:%d, failed to read sfields, file:%s, sfields area broken:%lld"
,
pQInfo
,
dLError
(
"QInfo:%p vid:%d sid:%d id:%s, slot:%d, failed to read sfields, file:%s, sfields area broken:%lld"
,
pQInfo
,
pMeterObj
->
vnode
,
pMeterObj
->
sid
,
pMeterObj
->
meterId
,
pQuery
->
slot
,
p
QueryFile
Info
->
dataFilePath
,
pMeterObj
->
vnode
,
pMeterObj
->
sid
,
pMeterObj
->
meterId
,
pQuery
->
slot
,
p
VnodeFiles
Info
->
dataFilePath
,
pBlock
->
offset
);
pBlock
->
offset
);
return
-
1
;
return
-
1
;
}
}
...
@@ -747,7 +784,8 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
...
@@ -747,7 +784,8 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
SMeterObj
*
pMeterObj
=
pRuntimeEnv
->
pMeterObj
;
SMeterObj
*
pMeterObj
=
pRuntimeEnv
->
pMeterObj
;
SData
**
sdata
=
pRuntimeEnv
->
colDataBuffer
;
SData
**
sdata
=
pRuntimeEnv
->
colDataBuffer
;
SQueryFileInfo
*
pQueryFileInfo
=
&
pRuntimeEnv
->
pVnodeFiles
[
fileIdx
];
assert
(
fileIdx
==
pRuntimeEnv
->
vnodeFileInfo
.
current
);
SData
**
primaryTSBuf
=
&
pRuntimeEnv
->
primaryColBuffer
;
SData
**
primaryTSBuf
=
&
pRuntimeEnv
->
primaryColBuffer
;
void
*
tmpBuf
=
pRuntimeEnv
->
unzipBuffer
;
void
*
tmpBuf
=
pRuntimeEnv
->
unzipBuffer
;
...
@@ -760,7 +798,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
...
@@ -760,7 +798,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
}
}
/* failed to load fields info, return with error info */
/* failed to load fields info, return with error info */
if
(
loadSField
&&
(
loadDataBlockFieldsInfo
(
pRuntimeEnv
,
p
QueryFileInfo
,
p
Block
,
pField
)
!=
0
))
{
if
(
loadSField
&&
(
loadDataBlockFieldsInfo
(
pRuntimeEnv
,
pBlock
,
pField
)
!=
0
))
{
return
-
1
;
return
-
1
;
}
}
...
@@ -775,7 +813,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
...
@@ -775,7 +813,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
}
else
{
}
else
{
columnBytes
+=
(
*
pField
)[
PRIMARYKEY_TIMESTAMP_COL_INDEX
].
len
+
sizeof
(
TSCKSUM
);
columnBytes
+=
(
*
pField
)[
PRIMARYKEY_TIMESTAMP_COL_INDEX
].
len
+
sizeof
(
TSCKSUM
);
int32_t
ret
=
int32_t
ret
=
loadColumnIntoMem
(
pQuery
,
pQuery
FileInfo
,
pBlock
,
*
pField
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
*
primaryTSBuf
,
loadColumnIntoMem
(
pQuery
,
&
pRuntimeEnv
->
vnode
FileInfo
,
pBlock
,
*
pField
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
*
primaryTSBuf
,
tmpBuf
,
pRuntimeEnv
->
secondaryUnzipBuffer
,
pRuntimeEnv
->
unzipBufSize
);
tmpBuf
,
pRuntimeEnv
->
secondaryUnzipBuffer
,
pRuntimeEnv
->
unzipBufSize
);
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
return
-
1
;
return
-
1
;
...
@@ -813,7 +851,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
...
@@ -813,7 +851,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
fillWithNull
(
pQuery
,
sdata
[
i
]
->
data
,
i
,
pBlock
->
numOfPoints
);
fillWithNull
(
pQuery
,
sdata
[
i
]
->
data
,
i
,
pBlock
->
numOfPoints
);
}
else
{
}
else
{
columnBytes
+=
(
*
pField
)[
j
].
len
+
sizeof
(
TSCKSUM
);
columnBytes
+=
(
*
pField
)[
j
].
len
+
sizeof
(
TSCKSUM
);
ret
=
loadColumnIntoMem
(
pQuery
,
pQuery
FileInfo
,
pBlock
,
*
pField
,
j
,
sdata
[
i
],
tmpBuf
,
ret
=
loadColumnIntoMem
(
pQuery
,
&
pRuntimeEnv
->
vnode
FileInfo
,
pBlock
,
*
pField
,
j
,
sdata
[
i
],
tmpBuf
,
pRuntimeEnv
->
secondaryUnzipBuffer
,
pRuntimeEnv
->
unzipBufSize
);
pRuntimeEnv
->
secondaryUnzipBuffer
,
pRuntimeEnv
->
unzipBufSize
);
pSummary
->
numOfSeek
++
;
pSummary
->
numOfSeek
++
;
...
@@ -1763,23 +1801,25 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *
...
@@ -1763,23 +1801,25 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *
}
}
int32_t
vnodeGetVnodeHeaderFileIdx
(
int32_t
*
fid
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
order
)
{
int32_t
vnodeGetVnodeHeaderFileIdx
(
int32_t
*
fid
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
order
)
{
if
(
pRuntimeEnv
->
numOfFiles
==
0
)
{
if
(
pRuntimeEnv
->
vnodeFileInfo
.
numOfFiles
==
0
)
{
return
-
1
;
return
-
1
;
}
}
SQueryFilesInfo
*
pVnodeFiles
=
&
pRuntimeEnv
->
vnodeFileInfo
;
/* set the initial file for current query */
/* set the initial file for current query */
if
(
order
==
TSQL_SO_ASC
&&
*
fid
<
p
RuntimeEnv
->
pVnodeFiles
[
0
].
fileID
)
{
if
(
order
==
TSQL_SO_ASC
&&
*
fid
<
p
VnodeFiles
->
pFileInfo
[
0
].
fileID
)
{
*
fid
=
p
RuntimeEnv
->
pVnodeFiles
[
0
].
fileID
;
*
fid
=
p
VnodeFiles
->
pFileInfo
[
0
].
fileID
;
return
0
;
return
0
;
}
else
if
(
order
==
TSQL_SO_DESC
&&
*
fid
>
p
RuntimeEnv
->
pVnodeFiles
[
pRuntimeEnv
->
numOfFiles
-
1
].
fileID
)
{
}
else
if
(
order
==
TSQL_SO_DESC
&&
*
fid
>
p
VnodeFiles
->
pFileInfo
[
pVnodeFiles
->
numOfFiles
-
1
].
fileID
)
{
*
fid
=
p
RuntimeEnv
->
pVnodeFiles
[
pRuntimeEnv
->
numOfFiles
-
1
].
fileID
;
*
fid
=
p
VnodeFiles
->
pFileInfo
[
pVnodeFiles
->
numOfFiles
-
1
].
fileID
;
return
p
RuntimeEnv
->
numOfFiles
-
1
;
return
p
VnodeFiles
->
numOfFiles
-
1
;
}
}
int32_t
numOfFiles
=
p
RuntimeEnv
->
numOfFiles
;
int32_t
numOfFiles
=
p
VnodeFiles
->
numOfFiles
;
if
(
order
==
TSQL_SO_DESC
&&
*
fid
>
p
RuntimeEnv
->
pVnodeFiles
[
numOfFiles
-
1
].
fileID
)
{
if
(
order
==
TSQL_SO_DESC
&&
*
fid
>
p
VnodeFiles
->
pFileInfo
[
numOfFiles
-
1
].
fileID
)
{
*
fid
=
p
RuntimeEnv
->
pVnodeFiles
[
numOfFiles
-
1
].
fileID
;
*
fid
=
p
VnodeFiles
->
pFileInfo
[
numOfFiles
-
1
].
fileID
;
return
numOfFiles
-
1
;
return
numOfFiles
-
1
;
}
}
...
@@ -1787,12 +1827,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
...
@@ -1787,12 +1827,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
int32_t
i
=
0
;
int32_t
i
=
0
;
int32_t
step
=
1
;
int32_t
step
=
1
;
while
(
i
<
numOfFiles
&&
*
fid
>
p
RuntimeEnv
->
pVnodeFiles
[
i
].
fileID
)
{
while
(
i
<
numOfFiles
&&
*
fid
>
p
VnodeFiles
->
pFileInfo
[
i
].
fileID
)
{
i
+=
step
;
i
+=
step
;
}
}
if
(
i
<
numOfFiles
&&
*
fid
<=
p
RuntimeEnv
->
pVnodeFiles
[
i
].
fileID
)
{
if
(
i
<
numOfFiles
&&
*
fid
<=
p
VnodeFiles
->
pFileInfo
[
i
].
fileID
)
{
*
fid
=
p
RuntimeEnv
->
pVnodeFiles
[
i
].
fileID
;
*
fid
=
p
VnodeFiles
->
pFileInfo
[
i
].
fileID
;
return
i
;
return
i
;
}
else
{
}
else
{
return
-
1
;
return
-
1
;
...
@@ -1801,12 +1841,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
...
@@ -1801,12 +1841,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv,
int32_t
i
=
numOfFiles
-
1
;
int32_t
i
=
numOfFiles
-
1
;
int32_t
step
=
-
1
;
int32_t
step
=
-
1
;
while
(
i
>=
0
&&
*
fid
<
p
RuntimeEnv
->
pVnodeFiles
[
i
].
fileID
)
{
while
(
i
>=
0
&&
*
fid
<
p
VnodeFiles
->
pFileInfo
[
i
].
fileID
)
{
i
+=
step
;
i
+=
step
;
}
}
if
(
i
>=
0
&&
*
fid
>=
p
RuntimeEnv
->
pVnodeFiles
[
i
].
fileID
)
{
if
(
i
>=
0
&&
*
fid
>=
p
VnodeFiles
->
pFileInfo
[
i
].
fileID
)
{
*
fid
=
p
RuntimeEnv
->
pVnodeFiles
[
i
].
fileID
;
*
fid
=
p
VnodeFiles
->
pFileInfo
[
i
].
fileID
;
return
i
;
return
i
;
}
else
{
}
else
{
return
-
1
;
return
-
1
;
...
@@ -2137,24 +2177,11 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -2137,24 +2177,11 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree
(
pRuntimeEnv
->
primaryColBuffer
);
tfree
(
pRuntimeEnv
->
primaryColBuffer
);
}
}
for
(
int32_t
i
=
0
;
i
<
pRuntimeEnv
->
numOfFiles
;
++
i
)
{
doCloseOpenedFileData
(
&
pRuntimeEnv
->
vnodeFileInfo
);
SQueryFileInfo
*
pQFileInfo
=
&
(
pRuntimeEnv
->
pVnodeFiles
[
i
]);
if
(
pQFileInfo
->
pHeaderFileData
!=
NULL
&&
pQFileInfo
->
pHeaderFileData
!=
MAP_FAILED
)
{
munmap
(
pQFileInfo
->
pHeaderFileData
,
pQFileInfo
->
headFileSize
);
}
tclose
(
pQFileInfo
->
headerFd
);
if
(
pQFileInfo
->
pDataFileData
!=
NULL
&&
pQFileInfo
->
pDataFileData
!=
MAP_FAILED
)
{
munmap
(
pQFileInfo
->
pDataFileData
,
pQFileInfo
->
defaultMappingSize
);
}
tclose
(
pQFileInfo
->
dataFd
);
tclose
(
pQFileInfo
->
lastFd
);
}
if
(
pRuntimeEnv
->
pVnodeFiles
!=
NULL
)
{
if
(
pRuntimeEnv
->
vnodeFileInfo
.
pFileInfo
!=
NULL
)
{
pRuntimeEnv
->
numOfFiles
=
0
;
pRuntimeEnv
->
vnodeFileInfo
.
numOfFiles
=
0
;
free
(
pRuntimeEnv
->
pVnodeFiles
);
free
(
pRuntimeEnv
->
vnodeFileInfo
.
pFileInfo
);
}
}
if
(
pRuntimeEnv
->
pInterpoBuf
!=
NULL
)
{
if
(
pRuntimeEnv
->
pInterpoBuf
!=
NULL
)
{
...
@@ -2962,8 +2989,8 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) {
...
@@ -2962,8 +2989,8 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) {
}
}
static
int
file_order_comparator
(
const
void
*
p1
,
const
void
*
p2
)
{
static
int
file_order_comparator
(
const
void
*
p1
,
const
void
*
p2
)
{
S
QueryFileInfo
*
pInfo1
=
(
SQuery
FileInfo
*
)
p1
;
S
HeaderFileInfo
*
pInfo1
=
(
SHeader
FileInfo
*
)
p1
;
S
QueryFileInfo
*
pInfo2
=
(
SQuery
FileInfo
*
)
p2
;
S
HeaderFileInfo
*
pInfo2
=
(
SHeader
FileInfo
*
)
p2
;
if
(
pInfo1
->
fileID
==
pInfo2
->
fileID
)
{
if
(
pInfo1
->
fileID
==
pInfo2
->
fileID
)
{
return
0
;
return
0
;
...
@@ -2983,24 +3010,17 @@ static int file_order_comparator(const void *p1, const void *p2) {
...
@@ -2983,24 +3010,17 @@ static int file_order_comparator(const void *p1, const void *p2) {
* @param prefix
* @param prefix
* @return
* @return
*/
*/
static
int32_t
vnodeOpenVnodeDBFiles
(
SQueryFile
Info
*
pVnodeFiles
,
int32_t
fid
,
int32_t
vnodeId
,
char
*
fileName
,
static
int32_t
vnodeOpenVnodeDBFiles
(
SQueryFile
sInfo
*
pVnodeFiles
,
int32_t
fid
,
int32_t
index
,
char
*
fileName
)
{
char
*
prefix
)
{
SHeaderFileInfo
*
pFileInfo
=
&
pVnodeFiles
->
pFileInfo
[
index
];
pVnodeFiles
->
fileID
=
fid
;
pFileInfo
->
fileID
=
fid
;
pVnodeFiles
->
defaultMappingSize
=
DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE
;
snprintf
(
pVnodeFiles
->
headerFilePath
,
PATH_MAX
,
"%s%s"
,
prefix
,
fileName
);
char
buf
[
PATH_MAX
]
=
{
0
};
snprintf
(
buf
,
PATH_MAX
,
"%s%s"
,
pVnodeFiles
->
dbFilePathPrefix
,
fileName
);
struct
stat
fstat
=
{
0
};
struct
stat
fstat
=
{
0
};
if
(
stat
(
pVnodeFiles
->
headerFilePath
,
&
fstat
)
<
0
)
return
-
1
;
if
(
stat
(
buf
,
&
fstat
)
<
0
)
return
-
1
;
pVnodeFiles
->
headFileSize
=
fstat
.
st_size
;
pFileInfo
->
headFileSize
=
fstat
.
st_size
;
snprintf
(
pVnodeFiles
->
dataFilePath
,
PATH_MAX
,
"%sv%df%d.data"
,
prefix
,
vnodeId
,
fid
);
snprintf
(
pVnodeFiles
->
lastFilePath
,
PATH_MAX
,
"%sv%df%d.last"
,
prefix
,
vnodeId
,
fid
);
pVnodeFiles
->
headerFd
=
FD_INITIALIZER
;
pVnodeFiles
->
dataFd
=
FD_INITIALIZER
;
pVnodeFiles
->
lastFd
=
FD_INITIALIZER
;
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
/* enforce kernel to preload data when the file is mapping */
/* enforce kernel to preload data when the file is mapping */
...
@@ -3035,23 +3055,23 @@ _clean:
...
@@ -3035,23 +3055,23 @@ _clean:
#endif
#endif
}
}
static
void
vnodeOpenAllFiles
(
SQInfo
*
pQInfo
,
int32_t
vnodeId
)
{
static
void
vnodeRecordAllFiles
(
SQInfo
*
pQInfo
,
int32_t
vnodeId
)
{
char
dbFilePathPrefix
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
dbFilePathPrefix
,
"%s/vnode%d/db/"
,
tsDirectory
,
vnodeId
);
DIR
*
pDir
=
opendir
(
dbFilePathPrefix
);
if
(
pDir
==
NULL
)
{
dError
(
"QInfo:%p failed to open directory:%s"
,
pQInfo
,
dbFilePathPrefix
);
return
;
}
char
suffix
[]
=
".head"
;
char
suffix
[]
=
".head"
;
struct
dirent
*
pEntry
=
NULL
;
struct
dirent
*
pEntry
=
NULL
;
size_t
alloc
=
4
;
// default allocated size
size_t
alloc
=
4
;
// default allocated size
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
(
pQInfo
->
pMeterQuerySupporter
->
runtimeEnv
);
SQueryFilesInfo
*
pVnodeFilesInfo
=
&
(
pQInfo
->
pMeterQuerySupporter
->
runtimeEnv
.
vnodeFileInfo
);
pRuntimeEnv
->
pVnodeFiles
=
calloc
(
1
,
sizeof
(
SQueryFileInfo
)
*
alloc
);
pVnodeFilesInfo
->
vnodeId
=
vnodeId
;
sprintf
(
pVnodeFilesInfo
->
dbFilePathPrefix
,
"%s/vnode%d/db/"
,
tsDirectory
,
vnodeId
);
DIR
*
pDir
=
opendir
(
pVnodeFilesInfo
->
dbFilePathPrefix
);
if
(
pDir
==
NULL
)
{
dError
(
"QInfo:%p failed to open directory:%s"
,
pQInfo
,
pVnodeFilesInfo
->
dbFilePathPrefix
);
return
;
}
pVnodeFilesInfo
->
pFileInfo
=
calloc
(
1
,
sizeof
(
SHeaderFileInfo
)
*
alloc
);
SVnodeObj
*
pVnode
=
&
vnodeList
[
vnodeId
];
SVnodeObj
*
pVnode
=
&
vnodeList
[
vnodeId
];
while
((
pEntry
=
readdir
(
pDir
))
!=
NULL
)
{
while
((
pEntry
=
readdir
(
pDir
))
!=
NULL
)
{
...
@@ -3085,26 +3105,28 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
...
@@ -3085,26 +3105,28 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
assert
(
fid
>=
0
&&
vid
>=
0
);
assert
(
fid
>=
0
&&
vid
>=
0
);
if
(
++
p
RuntimeEnv
->
numOfFiles
>
alloc
)
{
if
(
++
p
VnodeFilesInfo
->
numOfFiles
>
alloc
)
{
alloc
=
alloc
<<
1
;
alloc
=
alloc
<<
1
U
;
p
RuntimeEnv
->
pVnodeFiles
=
realloc
(
pRuntimeEnv
->
pVnodeFiles
,
alloc
*
sizeof
(
SQuery
FileInfo
));
p
VnodeFilesInfo
->
pFileInfo
=
realloc
(
pVnodeFilesInfo
->
pFileInfo
,
alloc
*
sizeof
(
SHeader
FileInfo
));
memset
(
&
p
RuntimeEnv
->
pVnodeFiles
[
alloc
>>
1
],
0
,
(
alloc
>>
1
)
*
sizeof
(
SQuery
FileInfo
));
memset
(
&
p
VnodeFilesInfo
->
pFileInfo
[
alloc
>>
1U
],
0
,
(
alloc
>>
1U
)
*
sizeof
(
SHeader
FileInfo
));
}
}
SQueryFileInfo
*
pVnodeFiles
=
&
pRuntimeEnv
->
pVnodeFiles
[
pRuntimeEnv
->
numOfFiles
-
1
];
SHeaderFileInfo
*
pVnodeFiles
=
&
pVnodeFilesInfo
->
pFileInfo
[
pVnodeFilesInfo
->
numOfFiles
-
1
];
int32_t
ret
=
vnodeOpenVnodeDBFiles
(
pVnodeFiles
,
fid
,
vnodeId
,
pEntry
->
d_name
,
dbFilePathPrefix
);
int32_t
ret
=
vnodeOpenVnodeDBFiles
(
pVnodeFilesInfo
,
fid
,
pVnodeFilesInfo
->
numOfFiles
-
1
,
pEntry
->
d_name
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
memset
(
pVnodeFiles
,
0
,
sizeof
(
S
Query
FileInfo
));
// reset information
memset
(
pVnodeFiles
,
0
,
sizeof
(
S
Header
FileInfo
));
// reset information
p
RuntimeEnv
->
numOfFiles
-=
1
;
p
VnodeFilesInfo
->
numOfFiles
-=
1
;
}
}
}
}
closedir
(
pDir
);
closedir
(
pDir
);
dTrace
(
"QInfo:%p find %d data files in %s to be checked"
,
pQInfo
,
pRuntimeEnv
->
numOfFiles
,
dbFilePathPrefix
);
dTrace
(
"QInfo:%p find %d data files in %s to be checked"
,
pQInfo
,
pVnodeFilesInfo
->
numOfFiles
,
pVnodeFilesInfo
->
dbFilePathPrefix
);
/* order the files information according their names */
/* order the files information according their names */
qsort
(
p
RuntimeEnv
->
pVnodeFiles
,
(
size_t
)
pRuntimeEnv
->
numOfFiles
,
sizeof
(
SQuery
FileInfo
),
file_order_comparator
);
qsort
(
p
VnodeFilesInfo
->
pFileInfo
,
(
size_t
)
pVnodeFilesInfo
->
numOfFiles
,
sizeof
(
SHeader
FileInfo
),
file_order_comparator
);
}
}
static
void
updateOffsetVal
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SBlockInfo
*
pBlockInfo
,
void
*
pBlock
)
{
static
void
updateOffsetVal
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SBlockInfo
*
pBlockInfo
,
void
*
pBlock
)
{
...
@@ -3686,7 +3708,8 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
...
@@ -3686,7 +3708,8 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
// dataInCache requires lastKey value
// dataInCache requires lastKey value
pQuery
->
lastKey
=
pQuery
->
skey
;
pQuery
->
lastKey
=
pQuery
->
skey
;
pSupporter
->
runtimeEnv
.
mmapedHFileIndex
=
-
1
;
// set the initial value
doInitQueryFileInfoFD
(
&
pSupporter
->
runtimeEnv
.
vnodeFileInfo
);
vnodeInitDataBlockInfo
(
&
pSupporter
->
runtimeEnv
.
loadBlockInfo
);
vnodeInitDataBlockInfo
(
&
pSupporter
->
runtimeEnv
.
loadBlockInfo
);
vnodeInitLoadCompBlockInfo
(
&
pSupporter
->
runtimeEnv
.
loadCompBlockInfo
);
vnodeInitLoadCompBlockInfo
(
&
pSupporter
->
runtimeEnv
.
loadCompBlockInfo
);
...
@@ -3719,7 +3742,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
...
@@ -3719,7 +3742,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
return
ret
;
return
ret
;
}
}
vnode
Open
AllFiles
(
pQInfo
,
pMeterObj
->
vnode
);
vnode
Record
AllFiles
(
pQInfo
,
pMeterObj
->
vnode
);
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
if
((
ret
=
allocateOutputBufForGroup
(
pSupporter
,
pQuery
,
false
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
ret
=
allocateOutputBufForGroup
(
pSupporter
,
pQuery
,
false
))
!=
TSDB_CODE_SUCCESS
)
{
...
@@ -3856,7 +3879,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
...
@@ -3856,7 +3879,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
changeExecuteScanOrder
(
pQuery
,
true
);
changeExecuteScanOrder
(
pQuery
,
true
);
pSupporter
->
runtimeEnv
.
mmapedHFileIndex
=
-
1
;
// set the initial value
doInitQueryFileInfoFD
(
&
pSupporter
->
runtimeEnv
.
vnodeFileInfo
);
vnodeInitDataBlockInfo
(
&
pSupporter
->
runtimeEnv
.
loadBlockInfo
);
vnodeInitDataBlockInfo
(
&
pSupporter
->
runtimeEnv
.
loadBlockInfo
);
vnodeInitLoadCompBlockInfo
(
&
pSupporter
->
runtimeEnv
.
loadCompBlockInfo
);
vnodeInitLoadCompBlockInfo
(
&
pSupporter
->
runtimeEnv
.
loadCompBlockInfo
);
...
@@ -3897,7 +3920,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
...
@@ -3897,7 +3920,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
}
}
tSidSetSort
(
pSupporter
->
pSidSet
);
tSidSetSort
(
pSupporter
->
pSidSet
);
vnode
Open
AllFiles
(
pQInfo
,
pMeter
->
vnode
);
vnode
Record
AllFiles
(
pQInfo
,
pMeter
->
vnode
);
if
((
ret
=
allocateOutputBufForGroup
(
pSupporter
,
pQuery
,
true
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
ret
=
allocateOutputBufForGroup
(
pSupporter
,
pQuery
,
true
))
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
return
ret
;
...
@@ -4038,7 +4061,7 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
...
@@ -4038,7 +4061,7 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
// the fields info is not loaded, load it into memory
// the fields info is not loaded, load it into memory
if
(
pQuery
->
pFields
==
NULL
||
pQuery
->
pFields
[
pQuery
->
slot
]
==
NULL
)
{
if
(
pQuery
->
pFields
==
NULL
||
pQuery
->
pFields
[
pQuery
->
slot
]
==
NULL
)
{
loadDataBlockFieldsInfo
(
pRuntimeEnv
,
&
pRuntimeEnv
->
pVnodeFiles
[
fileIndex
],
pBlock
,
&
pQuery
->
pFields
[
pQuery
->
slot
]);
loadDataBlockFieldsInfo
(
pRuntimeEnv
,
pBlock
,
&
pQuery
->
pFields
[
pQuery
->
slot
]);
}
}
SET_DATA_BLOCK_LOADED
(
pRuntimeEnv
->
blockStatus
);
SET_DATA_BLOCK_LOADED
(
pRuntimeEnv
->
blockStatus
);
...
@@ -5533,7 +5556,8 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
...
@@ -5533,7 +5556,8 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
SMeterQuerySupportObj
*
pSupporter
=
pQInfo
->
pMeterQuerySupporter
;
SMeterQuerySupportObj
*
pSupporter
=
pQInfo
->
pMeterQuerySupporter
;
SMeterSidExtInfo
**
pMeterSidExtInfo
=
pSupporter
->
pMeterSidExtInfo
;
SMeterSidExtInfo
**
pMeterSidExtInfo
=
pSupporter
->
pMeterSidExtInfo
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pSupporter
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pSupporter
->
runtimeEnv
;
SQueryFileInfo
*
pQueryFileInfo
=
&
pRuntimeEnv
->
pVnodeFiles
[
fileIndex
];
SHeaderFileInfo
*
pQueryFileInfo
=
&
pRuntimeEnv
->
vnodeFileInfo
.
pFileInfo
[
fileIndex
];
SVnodeObj
*
pVnode
=
&
vnodeList
[
vid
];
SVnodeObj
*
pVnode
=
&
vnodeList
[
vid
];
...
@@ -5545,7 +5569,7 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
...
@@ -5545,7 +5569,7 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
int32_t
tmsize
=
sizeof
(
SCompHeader
)
*
(
pVnode
->
cfg
.
maxSessions
)
+
sizeof
(
TSCKSUM
);
int32_t
tmsize
=
sizeof
(
SCompHeader
)
*
(
pVnode
->
cfg
.
maxSessions
)
+
sizeof
(
TSCKSUM
);
// file is corrupted, abort query in current file
// file is corrupted, abort query in current file
if
(
validateHeaderOffsetSegment
(
pQInfo
,
p
QueryFileInfo
->
headerFilePath
,
vid
,
pHeaderFileData
,
tmsize
)
<
0
)
{
if
(
validateHeaderOffsetSegment
(
pQInfo
,
p
RuntimeEnv
->
vnodeFileInfo
.
headerFilePath
,
vid
,
pHeaderFileData
,
tmsize
)
<
0
)
{
*
numOfMeters
=
0
;
*
numOfMeters
=
0
;
return
0
;
return
0
;
}
}
...
@@ -5835,7 +5859,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery,
...
@@ -5835,7 +5859,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery,
* @return
* @return
*/
*/
uint32_t
getDataBlocksForMeters
(
SMeterQuerySupportObj
*
pSupporter
,
SQuery
*
pQuery
,
char
*
pHeaderData
,
uint32_t
getDataBlocksForMeters
(
SMeterQuerySupportObj
*
pSupporter
,
SQuery
*
pQuery
,
char
*
pHeaderData
,
int32_t
numOfMeters
,
SQueryFileInfo
*
pQueryFileInfo
,
SMeterDataInfo
**
pMeterDataInfo
)
{
int32_t
numOfMeters
,
const
char
*
filePath
,
SMeterDataInfo
**
pMeterDataInfo
)
{
uint32_t
numOfBlocks
=
0
;
uint32_t
numOfBlocks
=
0
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
SQueryCostSummary
*
pSummary
=
&
pSupporter
->
runtimeEnv
.
summary
;
SQueryCostSummary
*
pSummary
=
&
pSupporter
->
runtimeEnv
.
summary
;
...
@@ -5847,7 +5871,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuer
...
@@ -5847,7 +5871,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuer
SMeterObj
*
pMeterObj
=
pMeterDataInfo
[
j
]
->
pMeterObj
;
SMeterObj
*
pMeterObj
=
pMeterDataInfo
[
j
]
->
pMeterObj
;
SCompInfo
*
compInfo
=
(
SCompInfo
*
)(
pHeaderData
+
pMeterDataInfo
[
j
]
->
offsetInHeaderFile
);
SCompInfo
*
compInfo
=
(
SCompInfo
*
)(
pHeaderData
+
pMeterDataInfo
[
j
]
->
offsetInHeaderFile
);
int32_t
ret
=
validateCompBlockInfoSegment
(
pQInfo
,
pQueryFileInfo
->
headerF
ilePath
,
pMeterObj
->
vnode
,
compInfo
,
int32_t
ret
=
validateCompBlockInfoSegment
(
pQInfo
,
f
ilePath
,
pMeterObj
->
vnode
,
compInfo
,
pMeterDataInfo
[
j
]
->
offsetInHeaderFile
);
pMeterDataInfo
[
j
]
->
offsetInHeaderFile
);
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
clearMeterDataBlockInfo
(
pMeterDataInfo
[
j
]);
clearMeterDataBlockInfo
(
pMeterDataInfo
[
j
]);
...
@@ -5866,8 +5890,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuer
...
@@ -5866,8 +5890,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuer
// check compblock integrity
// check compblock integrity
TSCKSUM
checksum
=
*
(
TSCKSUM
*
)((
char
*
)
compInfo
+
sizeof
(
SCompInfo
)
+
size
);
TSCKSUM
checksum
=
*
(
TSCKSUM
*
)((
char
*
)
compInfo
+
sizeof
(
SCompInfo
)
+
size
);
ret
=
validateCompBlockSegment
(
pQInfo
,
pQueryFileInfo
->
headerFilePath
,
compInfo
,
(
char
*
)
pCompBlock
,
ret
=
validateCompBlockSegment
(
pQInfo
,
filePath
,
compInfo
,
(
char
*
)
pCompBlock
,
pMeterObj
->
vnode
,
checksum
);
pMeterObj
->
vnode
,
checksum
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
clearMeterDataBlockInfo
(
pMeterDataInfo
[
j
]);
clearMeterDataBlockInfo
(
pMeterDataInfo
[
j
]);
continue
;
continue
;
...
@@ -6540,7 +6563,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
...
@@ -6540,7 +6563,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
int32_t
fileIdx
,
int32_t
slotIdx
,
__block_search_fn_t
searchFn
,
bool
onDemand
)
{
int32_t
fileIdx
,
int32_t
slotIdx
,
__block_search_fn_t
searchFn
,
bool
onDemand
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SMeterObj
*
pMeterObj
=
pRuntimeEnv
->
pMeterObj
;
SMeterObj
*
pMeterObj
=
pRuntimeEnv
->
pMeterObj
;
S
QueryFileInfo
*
pQueryFileInfo
=
&
pRuntimeEnv
->
pVnodeFiles
[
fileIdx
];
S
HeaderFileInfo
*
pQueryFileInfo
=
&
pRuntimeEnv
->
vnodeFileInfo
.
pFileInfo
[
fileIdx
];
TSKEY
*
primaryKeys
=
(
TSKEY
*
)
pRuntimeEnv
->
primaryColBuffer
->
data
;
TSKEY
*
primaryKeys
=
(
TSKEY
*
)
pRuntimeEnv
->
primaryColBuffer
->
data
;
...
@@ -6575,7 +6598,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
...
@@ -6575,7 +6598,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
setTimestampRange
(
pRuntimeEnv
,
pBlock
->
keyFirst
,
pBlock
->
keyLast
);
setTimestampRange
(
pRuntimeEnv
,
pBlock
->
keyFirst
,
pBlock
->
keyLast
);
}
else
if
(
req
==
BLK_DATA_FILEDS_NEEDED
)
{
}
else
if
(
req
==
BLK_DATA_FILEDS_NEEDED
)
{
if
(
loadDataBlockFieldsInfo
(
pRuntimeEnv
,
p
QueryFileInfo
,
p
Block
,
pFields
)
<
0
)
{
if
(
loadDataBlockFieldsInfo
(
pRuntimeEnv
,
pBlock
,
pFields
)
<
0
)
{
return
DISK_DATA_LOAD_FAILED
;
return
DISK_DATA_LOAD_FAILED
;
}
}
}
else
{
}
else
{
...
@@ -6584,7 +6607,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
...
@@ -6584,7 +6607,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
}
}
}
else
{
}
else
{
_load_all:
_load_all:
if
(
loadDataBlockFieldsInfo
(
pRuntimeEnv
,
p
QueryFileInfo
,
p
Block
,
pFields
)
<
0
)
{
if
(
loadDataBlockFieldsInfo
(
pRuntimeEnv
,
pBlock
,
pFields
)
<
0
)
{
return
DISK_DATA_LOAD_FAILED
;
return
DISK_DATA_LOAD_FAILED
;
}
}
...
...
src/system/detail/src/vnodeQueryProcess.c
浏览文件 @
96ae0ca2
...
@@ -266,7 +266,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
...
@@ -266,7 +266,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
__block_search_fn_t
searchFn
=
vnodeSearchKeyFunc
[
pTempMeter
->
searchAlgorithm
];
__block_search_fn_t
searchFn
=
vnodeSearchKeyFunc
[
pTempMeter
->
searchAlgorithm
];
int32_t
vnodeId
=
pTempMeter
->
vnode
;
int32_t
vnodeId
=
pTempMeter
->
vnode
;
dTrace
(
"QInfo:%p start to check data blocks in %d files"
,
pQInfo
,
pRuntimeEnv
->
numOfFiles
);
SQueryFilesInfo
*
pVnodeFileInfo
=
&
pRuntimeEnv
->
vnodeFileInfo
;
dTrace
(
"QInfo:%p start to check data blocks in %d files"
,
pQInfo
,
pVnodeFileInfo
->
numOfFiles
);
int32_t
fid
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
-
1
:
INT32_MAX
;
int32_t
fid
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
-
1
:
INT32_MAX
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
...
@@ -289,9 +291,8 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
...
@@ -289,9 +291,8 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
pQuery
->
fileId
=
fid
;
pQuery
->
fileId
=
fid
;
pSummary
->
numOfFiles
++
;
pSummary
->
numOfFiles
++
;
SQueryFileInfo
*
pQueryFileInfo
=
&
pRuntimeEnv
->
pVnodeFiles
[
fileIdx
];
char
*
pHeaderFileData
=
vnodeGetHeaderFileData
(
pRuntimeEnv
,
vnodeId
,
fileIdx
);
char
*
pHeaderData
=
vnodeGetHeaderFileData
(
pRuntimeEnv
,
vnodeId
,
fileIdx
);
if
(
pHeaderFileData
==
NULL
)
{
// failed to mmap header file into buffer, ignore current file, try next
if
(
pHeaderData
==
NULL
)
{
// failed to mmap header file into buffer, ignore current file, try next
fid
+=
step
;
fid
+=
step
;
continue
;
continue
;
}
}
...
@@ -308,20 +309,21 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
...
@@ -308,20 +309,21 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
return
NULL
;
return
NULL
;
}
}
dTrace
(
"QInfo:%p file:%s, %d meters qualified"
,
pQInfo
,
p
Query
FileInfo
->
dataFilePath
,
numOfQualifiedMeters
);
dTrace
(
"QInfo:%p file:%s, %d meters qualified"
,
pQInfo
,
p
Vnode
FileInfo
->
dataFilePath
,
numOfQualifiedMeters
);
// none of meters in query set have pHeaderData in this file, try next file
// none of meters in query set have pHeader
File
Data in this file, try next file
if
(
numOfQualifiedMeters
==
0
)
{
if
(
numOfQualifiedMeters
==
0
)
{
fid
+=
step
;
fid
+=
step
;
tfree
(
pReqMeterDataInfo
);
tfree
(
pReqMeterDataInfo
);
continue
;
continue
;
}
}
uint32_t
numOfBlocks
=
getDataBlocksForMeters
(
pSupporter
,
pQuery
,
pHeader
Data
,
numOfQualifiedMeters
,
pQueryFileInfo
,
uint32_t
numOfBlocks
=
getDataBlocksForMeters
(
pSupporter
,
pQuery
,
pHeader
FileData
,
numOfQualifiedMeters
,
pReqMeterDataInfo
);
pVnodeFileInfo
->
headerFilePath
,
pReqMeterDataInfo
);
dTrace
(
"QInfo:%p file:%s, %d meters contains %d blocks to be checked"
,
pQInfo
,
p
Query
FileInfo
->
dataFilePath
,
dTrace
(
"QInfo:%p file:%s, %d meters contains %d blocks to be checked"
,
pQInfo
,
p
Vnode
FileInfo
->
dataFilePath
,
numOfQualifiedMeters
,
numOfBlocks
);
numOfQualifiedMeters
,
numOfBlocks
);
if
(
numOfBlocks
==
0
)
{
if
(
numOfBlocks
==
0
)
{
fid
+=
step
;
fid
+=
step
;
tfree
(
pReqMeterDataInfo
);
tfree
(
pReqMeterDataInfo
);
...
@@ -346,7 +348,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
...
@@ -346,7 +348,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
totalBlocks
+=
numOfBlocks
;
totalBlocks
+=
numOfBlocks
;
// sequentially scan the pHeaderData file
// sequentially scan the pHeader
File
Data file
int32_t
j
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
numOfBlocks
-
1
;
int32_t
j
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
numOfBlocks
-
1
;
for
(;
j
<
numOfBlocks
&&
j
>=
0
;
j
+=
step
)
{
for
(;
j
<
numOfBlocks
&&
j
>=
0
;
j
+=
step
)
{
...
@@ -428,7 +430,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
...
@@ -428,7 +430,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
}
}
int64_t
time
=
taosGetTimestampUs
()
-
st
;
int64_t
time
=
taosGetTimestampUs
()
-
st
;
dTrace
(
"QInfo:%p complete check %d files, %d blocks, elapsed time:%.3fms"
,
pQInfo
,
p
RuntimeEnv
->
numOfFiles
,
dTrace
(
"QInfo:%p complete check %d files, %d blocks, elapsed time:%.3fms"
,
pQInfo
,
p
VnodeFileInfo
->
numOfFiles
,
totalBlocks
,
time
/
1000
.
0
);
totalBlocks
,
time
/
1000
.
0
);
pSummary
->
fileTimeUs
+=
time
;
pSummary
->
fileTimeUs
+=
time
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录