Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
689cb598
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
689cb598
编写于
4月 12, 2020
作者:
S
slguan
提交者:
GitHub
4月 12, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1592 from taosdata/feature/query
[td-98] fix bug in descending order query
上级
0edce4a3
ea497f20
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
134 addition
and
113 deletion
+134
-113
src/kit/shell/src/shellEngine.c
src/kit/shell/src/shellEngine.c
+1
-1
src/kit/shell/src/shellMain.c
src/kit/shell/src/shellMain.c
+2
-0
src/vnode/tsdb/inc/tsdbMain.h
src/vnode/tsdb/inc/tsdbMain.h
+3
-2
src/vnode/tsdb/src/tsdbFile.c
src/vnode/tsdb/src/tsdbFile.c
+2
-2
src/vnode/tsdb/src/tsdbRead.c
src/vnode/tsdb/src/tsdbRead.c
+126
-108
未找到文件。
src/kit/shell/src/shellEngine.c
浏览文件 @
689cb598
...
...
@@ -284,7 +284,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
/* Function to do regular expression check */
int
regex_match
(
const
char
*
s
,
const
char
*
reg
,
int
cflags
)
{
regex_t
regex
;
char
msgbuf
[
100
];
char
msgbuf
[
100
]
=
{
0
}
;
/* Compile regular expression */
if
(
regcomp
(
&
regex
,
reg
,
cflags
)
!=
0
)
{
...
...
src/kit/shell/src/shellMain.c
浏览文件 @
689cb598
...
...
@@ -97,6 +97,8 @@ int main(int argc, char* argv[]) {
/* Interupt handler. */
struct
sigaction
act
;
memset
(
&
act
,
0
,
sizeof
(
struct
sigaction
));
act
.
sa_handler
=
interruptHandler
;
sigaction
(
SIGTERM
,
&
act
,
NULL
);
sigaction
(
SIGINT
,
&
act
,
NULL
);
...
...
src/vnode/tsdb/inc/tsdbMain.h
浏览文件 @
689cb598
...
...
@@ -222,8 +222,9 @@ int tsdbOpenFile(SFile *pFile, int oflag);
int
tsdbCloseFile
(
SFile
*
pFile
);
SFileGroup
*
tsdbOpenFilesForCommit
(
STsdbFileH
*
pFileH
,
int
fid
);
int
tsdbRemoveFileGroup
(
STsdbFileH
*
pFile
,
int
fid
);
#define TSDB_FGROUP_ITER_FORWARD 0
#define TSDB_FGROUP_ITER_BACKWARD 1
#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
typedef
struct
{
int
numOfFGroups
;
SFileGroup
*
base
;
...
...
src/vnode/tsdb/src/tsdbFile.c
浏览文件 @
689cb598
...
...
@@ -154,7 +154,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
}
int
flags
=
(
pIter
->
direction
==
TSDB_FGROUP_ITER_FORWARD
)
?
TD_GE
:
TD_LE
;
void
*
ptr
=
taosbsearch
(
&
fid
,
pIter
->
base
,
sizeof
(
SFileGroup
),
pIter
->
numOfFGroups
,
compFGroupKey
,
flags
);
void
*
ptr
=
taosbsearch
(
&
fid
,
pIter
->
base
,
pIter
->
numOfFGroups
,
sizeof
(
SFileGroup
)
,
compFGroupKey
,
flags
);
if
(
ptr
==
NULL
)
{
pIter
->
pFileGroup
=
NULL
;
}
else
{
...
...
@@ -173,7 +173,7 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
pIter
->
pFileGroup
+=
1
;
}
}
else
{
if
(
pIter
->
pFileGroup
-
1
==
pIter
->
base
)
{
if
(
pIter
->
pFileGroup
==
pIter
->
base
)
{
pIter
->
pFileGroup
=
NULL
;
}
else
{
pIter
->
pFileGroup
-=
1
;
...
...
src/vnode/tsdb/src/tsdbRead.c
浏览文件 @
689cb598
...
...
@@ -27,7 +27,7 @@
#define EXTRA_BYTES 2
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
#define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC)
#define QH_GET_NUM_OF_COLS(handle) (
taosArrayGetSize((handle)->pColumns
))
#define QH_GET_NUM_OF_COLS(handle) (
(size_t)(taosArrayGetSize((handle)->pColumns)
))
enum
{
QUERY_RANGE_LESS_EQUAL
=
0
,
...
...
@@ -87,14 +87,8 @@ typedef struct STableBlockInfo {
int32_t
groupIdx
;
/* number of group is less than the total number of tables */
}
STableBlockInfo
;
enum
{
SINGLE_TABLE_MODEL
=
1
,
MULTI_TABLE_MODEL
=
2
,
};
typedef
struct
STsdbQueryHandle
{
STsdbRepo
*
pTsdb
;
int8_t
model
;
// access model, single table model or multi-table model
SQueryFilePos
cur
;
// current position
SQueryFilePos
start
;
// the start position, used for secondary/third iteration
...
...
@@ -128,21 +122,6 @@ typedef struct STsdbQueryHandle {
SCompIdx
*
compIndex
;
}
STsdbQueryHandle
;
int32_t
doAllocateBuf
(
STsdbQueryHandle
*
pQueryHandle
,
int32_t
rowsPerFileBlock
)
{
// record the maximum column width among columns of this meter/metric
SColumnInfoData
*
pColumn
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
0
);
int32_t
maxColWidth
=
pColumn
->
info
.
bytes
;
for
(
int32_t
i
=
1
;
i
<
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
++
i
)
{
int32_t
bytes
=
pColumn
[
i
].
info
.
bytes
;
if
(
bytes
>
maxColWidth
)
{
maxColWidth
=
bytes
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
void
tsdbInitDataBlockLoadInfo
(
SDataBlockLoadInfo
*
pBlockLoadInfo
)
{
pBlockLoadInfo
->
slot
=
-
1
;
pBlockLoadInfo
->
sid
=
-
1
;
...
...
@@ -161,9 +140,9 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
// todo 2. add the reference count for each table that is involved in query
STsdbQueryHandle
*
pQueryHandle
=
calloc
(
1
,
sizeof
(
STsdbQueryHandle
));
pQueryHandle
->
order
=
pCond
->
order
;
pQueryHandle
->
order
=
pCond
->
order
;
pQueryHandle
->
window
=
pCond
->
twindow
;
pQueryHandle
->
pTsdb
=
tsdb
;
pQueryHandle
->
pTsdb
=
tsdb
;
pQueryHandle
->
compIndex
=
calloc
(
10000
,
sizeof
(
SCompIdx
)),
pQueryHandle
->
loadDataAfterSeek
=
false
;
...
...
@@ -174,25 +153,33 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
pQueryHandle
->
pTableCheckInfo
=
taosArrayInit
(
size
,
sizeof
(
STableCheckInfo
));
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableId
id
=
*
(
STableId
*
)
taosArrayGet
(
idList
,
i
);
STableId
id
=
*
(
STableId
*
)
taosArrayGet
(
idList
,
i
);
STable
*
pTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
id
.
uid
);
if
(
pTable
==
NULL
)
{
dError
(
"%p failed to get table, error uid:%"
PRIu64
,
pQueryHandle
,
id
.
uid
);
continue
;
}
STableCheckInfo
info
=
{
.
lastKey
=
pQueryHandle
->
window
.
skey
,
.
tableId
=
id
,
.
pTableObj
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
id
.
uid
),
// todo this may be failed
.
pCompInfo
=
NULL
,
.
lastKey
=
pQueryHandle
->
window
.
skey
,
.
tableId
=
id
,
.
pTableObj
=
pTable
,
};
assert
(
info
.
pTableObj
!=
NULL
);
taosArrayPush
(
pQueryHandle
->
pTableCheckInfo
,
&
info
);
}
pQueryHandle
->
model
=
(
size
>
1
)
?
MULTI_TABLE_MODEL
:
SINGLE_TABLE_MODEL
;
pQueryHandle
->
checkFiles
=
1
;
dTrace
(
"%p total numOfTable:%d in query"
,
pQueryHandle
,
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
));
/*
* For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place
* in case of descending timestamp order query.
*/
pQueryHandle
->
checkFiles
=
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
);
pQueryHandle
->
activeIndex
=
0
;
//
malloc buffer in order to load data
from file
//
allocate buffer in order to load data blocks
from file
int32_t
numOfCols
=
taosArrayGetSize
(
pColumnInfo
);
size_t
bufferCapacity
=
4096
;
...
...
@@ -206,10 +193,6 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
taosArrayPush
(
pQueryHandle
->
pColumns
,
&
pDest
);
}
if
(
doAllocateBuf
(
pQueryHandle
,
bufferCapacity
)
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
tsdbInitDataBlockLoadInfo
(
&
pQueryHandle
->
dataBlockLoadInfo
);
tsdbInitCompBlockLoadInfo
(
&
pQueryHandle
->
compBlockLoadInfo
);
...
...
@@ -239,7 +222,12 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
// todo dynamic get the daysperfile
static
int32_t
getFileIdFromKey
(
TSKEY
key
)
{
return
(
int32_t
)(
key
/
10
);
// set the starting fileId
int64_t
fid
=
(
int64_t
)(
key
/
10
);
// set the starting fileId
if
(
fid
>
INT32_MAX
)
{
fid
=
INT32_MAX
;
}
return
fid
;
}
static
int32_t
binarySearchForBlockImpl
(
SCompBlock
*
pBlock
,
int32_t
numOfBlocks
,
TSKEY
skey
,
int32_t
order
);
...
...
@@ -247,8 +235,12 @@ static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks,
static
int32_t
getFileCompInfo
(
STsdbQueryHandle
*
pQueryHandle
,
int32_t
*
numOfBlocks
,
int32_t
type
)
{
// todo check open file failed
SFileGroup
*
fileGroup
=
pQueryHandle
->
pFileGroup
;
assert
(
fileGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fname
>
0
);
if
(
fileGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fd
==
FD_INITIALIZER
)
{
fileGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fd
=
open
(
fileGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fname
,
O_RDONLY
);
}
else
{
assert
(
FD_VALID
(
fileGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fd
));
}
// load all the comp offset value for all tables in this file
...
...
@@ -262,7 +254,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
SCompIdx
*
compIndex
=
&
pQueryHandle
->
compIndex
[
pCheckInfo
->
tableId
.
tid
];
if
(
compIndex
->
len
==
0
||
compIndex
->
numOfSuperBlocks
==
0
)
{
// no data block in this file, try next file
assert
(
0
);
}
else
{
if
(
pCheckInfo
->
compSize
<
compIndex
->
len
)
{
assert
(
compIndex
->
len
>
0
);
...
...
@@ -271,46 +263,35 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
assert
(
t
!=
NULL
);
pCheckInfo
->
pCompInfo
=
(
SCompInfo
*
)
t
;
pCheckInfo
->
compSize
=
compIndex
->
len
;
}
tsdbLoadCompBlocks
(
fileGroup
,
compIndex
,
pCheckInfo
->
pCompInfo
);
int32_t
index
=
0
;
SCompInfo
*
pCompInfo
=
pCheckInfo
->
pCompInfo
;
TSKEY
s
=
MIN
(
pCheckInfo
->
lastKey
,
pQueryHandle
->
window
.
ekey
);
TSKEY
e
=
MAX
(
pCheckInfo
->
lastKey
,
pQueryHandle
->
window
.
ekey
);
// discard the unqualified data block based on the query time window
int32_t
start
=
binarySearchForBlockImpl
(
pCheckInfo
->
pCompInfo
->
blocks
,
compIndex
->
numOfSuperBlocks
,
pQueryHandle
->
order
,
pCheckInfo
->
lastKey
);
if
(
type
==
QUERY_RANGE_GREATER_EQUAL
)
{
if
(
pCheckInfo
->
lastKey
<=
pCheckInfo
->
pCompInfo
->
blocks
[
start
].
keyLast
)
{
// break;
}
else
{
index
=
-
1
;
}
}
else
{
if
(
pCheckInfo
->
lastKey
>=
pCheckInfo
->
pCompInfo
->
blocks
[
start
].
keyFirst
)
{
// break;
}
else
{
index
=
-
1
;
}
}
// not found in data blocks in current file
if
(
index
==
-
1
)
{
int32_t
start
=
binarySearchForBlockImpl
(
pCompInfo
->
blocks
,
compIndex
->
numOfSuperBlocks
,
s
,
TSDB_ORDER_ASC
);
int32_t
end
=
start
;
if
(
s
>
pCompInfo
->
blocks
[
start
].
keyLast
)
{
continue
;
}
// todo speedup the procedure of locating end block
int32_t
e
=
start
;
while
(
e
<
compIndex
->
numOfSuperBlocks
&&
(
pCheckInfo
->
pCompInfo
->
blocks
[
e
].
keyFirst
<=
pQueryHandle
->
window
.
ekey
))
{
e
+=
1
;
// todo speedup the procedure of located end block
while
(
end
<
compIndex
->
numOfSuperBlocks
&&
(
pCompInfo
->
blocks
[
end
].
keyFirst
<=
e
))
{
end
+=
1
;
}
pCheckInfo
->
numOfBlocks
=
(
end
-
start
);
if
(
start
>
0
)
{
memmove
(
pC
heckInfo
->
pCompInfo
->
blocks
,
&
pCheckInfo
->
pCompInfo
->
blocks
[
start
],
(
e
-
start
)
*
sizeof
(
SCompBlock
));
memmove
(
pC
ompInfo
->
blocks
,
&
pCompInfo
->
blocks
[
start
],
pCheckInfo
->
numOfBlocks
*
sizeof
(
SCompBlock
));
}
pCheckInfo
->
numOfBlocks
=
(
e
-
start
);
(
*
numOfBlocks
)
+=
pCheckInfo
->
numOfBlocks
;
}
}
...
...
@@ -413,7 +394,6 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
}
SDataCols
*
pDataCols
=
pCheckInfo
->
pDataCols
;
if
(
pCheckInfo
->
lastKey
>
pBlock
->
keyFirst
)
{
cur
->
pos
=
binarySearchForKey
(
pDataCols
->
cols
[
0
].
pData
,
pBlock
->
numOfPoints
,
pCheckInfo
->
lastKey
,
pQueryHandle
->
order
);
...
...
@@ -425,8 +405,24 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
}
else
{
// the whole block is loaded in to buffer
pQueryHandle
->
realNumOfRows
=
pBlock
->
numOfPoints
;
}
}
else
{
// todo desc query
}
else
{
// query ended in current block
if
(
pQueryHandle
->
window
.
ekey
>
pBlock
->
keyFirst
)
{
if
(
!
doLoadFileDataBlock
(
pQueryHandle
,
pBlock
,
pCheckInfo
))
{
return
false
;
}
SDataCols
*
pDataCols
=
pCheckInfo
->
pDataCols
;
if
(
pCheckInfo
->
lastKey
<
pBlock
->
keyLast
)
{
cur
->
pos
=
binarySearchForKey
(
pDataCols
->
cols
[
0
].
pData
,
pBlock
->
numOfPoints
,
pCheckInfo
->
lastKey
,
pQueryHandle
->
order
);
}
else
{
cur
->
pos
=
pBlock
->
numOfPoints
-
1
;
}
filterDataInDataBlock
(
pQueryHandle
,
pCheckInfo
,
pBlock
,
sa
);
}
else
{
pQueryHandle
->
realNumOfRows
=
pBlock
->
numOfPoints
;
}
}
...
...
@@ -619,7 +615,9 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
}
static
SArray
*
getColumnIdList
(
STsdbQueryHandle
*
pQueryHandle
)
{
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
size_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
assert
(
numOfCols
<=
TSDB_MAX_COLUMNS
);
SArray
*
pIdList
=
taosArrayInit
(
numOfCols
,
sizeof
(
int16_t
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
...
...
@@ -923,8 +921,8 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
pQueryHandle
->
locateStart
=
true
;
int32_t
fid
=
getFileIdFromKey
(
pQueryHandle
->
window
.
skey
);
tsdbInitFileGroupIter
(
pFileHandle
,
&
pQueryHandle
->
fileIter
,
TSDB_FGROUP_ITER_FORWARD
);
tsdbInitFileGroupIter
(
pFileHandle
,
&
pQueryHandle
->
fileIter
,
pQueryHandle
->
order
);
tsdbSeekFileGroupIter
(
&
pQueryHandle
->
fileIter
,
fid
);
int32_t
numOfBlocks
=
-
1
;
...
...
@@ -934,13 +932,14 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
int32_t
numOfTables
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
while
((
pQueryHandle
->
pFileGroup
=
tsdbGetFileGroupNext
(
&
pQueryHandle
->
fileIter
))
!=
NULL
)
{
if
(
getFileCompInfo
(
pQueryHandle
,
&
numOfBlocks
,
1
)
!=
TSDB_CODE_SUCCESS
)
{
int32_t
type
=
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)
?
QUERY_RANGE_GREATER_EQUAL
:
QUERY_RANGE_LESS_EQUAL
;
if
(
getFileCompInfo
(
pQueryHandle
,
&
numOfBlocks
,
type
)
!=
TSDB_CODE_SUCCESS
)
{
break
;
}
assert
(
numOfBlocks
>=
0
);
dTrace
(
"%p %d blocks found in file for %d table(s), fid:%d"
,
pQueryHandle
,
numOfBlocks
,
pQueryHandle
->
pFileGroup
->
fileId
,
numOfTables
);
numOfTables
,
pQueryHandle
->
pFileGroup
->
fileId
);
// todo return error code to query engine
if
(
createDataBlocksInfo
(
pQueryHandle
,
numOfBlocks
,
&
pQueryHandle
->
numOfBlocks
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -961,7 +960,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
return
false
;
}
cur
->
slot
=
0
;
cur
->
slot
=
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)
?
0
:
pQueryHandle
->
numOfBlocks
-
1
;
cur
->
fid
=
pQueryHandle
->
pFileGroup
->
fileId
;
STableBlockInfo
*
pBlockInfo
=
&
pQueryHandle
->
pDataBlockInfo
[
cur
->
slot
];
...
...
@@ -970,8 +969,10 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
return
loadFileDataBlock
(
pQueryHandle
,
pBlock
,
pCheckInfo
);
}
else
{
if
(
cur
->
slot
==
pQueryHandle
->
numOfBlocks
-
1
)
{
// all blocks
if
((
cur
->
slot
==
pQueryHandle
->
numOfBlocks
-
1
&&
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
))
||
(
cur
->
slot
==
0
&&
!
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)))
{
// all blocks
int32_t
numOfBlocks
=
-
1
;
int32_t
numOfTables
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
pQueryHandle
->
numOfBlocks
=
0
;
...
...
@@ -1001,67 +1002,84 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
cur
->
fid
=
-
1
;
return
false
;
}
cur
->
slot
=
0
;
cur
->
slot
=
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)
?
0
:
pQueryHandle
->
numOfBlocks
-
1
;
cur
->
fid
=
pQueryHandle
->
pFileGroup
->
fileId
;
STableBlockInfo
*
pBlockInfo
=
&
pQueryHandle
->
pDataBlockInfo
[
cur
->
slot
];
STableCheckInfo
*
pCheckInfo
=
pBlockInfo
->
pTableCheckInfo
;
SCompBlock
*
pBlock
=
pBlockInfo
->
pBlock
.
compBlock
;
return
loadFileDataBlock
(
pQueryHandle
,
pBlock
,
pCheckInfo
);
}
else
{
// next block of the same file
cur
->
slot
+=
1
;
cur
->
pos
=
0
;
int32_t
step
=
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)
?
1
:-
1
;
cur
->
slot
+=
step
;
STableBlockInfo
*
pBlockInfo
=
&
pQueryHandle
->
pDataBlockInfo
[
cur
->
slot
];
if
(
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
))
{
cur
->
pos
=
0
;
}
else
{
cur
->
pos
=
pBlockInfo
->
pBlock
.
compBlock
->
numOfPoints
-
1
;
}
return
loadFileDataBlock
(
pQueryHandle
,
pBlockInfo
->
pBlock
.
compBlock
,
pBlockInfo
->
pTableCheckInfo
);
}
}
}
// handle data in cache situation
bool
tsdbNextDataBlock
(
tsdb_query_handle_t
*
pQueryHandle
)
{
STsdbQueryHandle
*
pHandle
=
(
STsdbQueryHandle
*
)
pQueryHandle
;
size_t
numOfTables
=
taosArrayGetSize
(
pHandle
->
pTableCheckInfo
);
static
bool
doHasDataInBuffer
(
STsdbQueryHandle
*
pQueryHandle
)
{
size_t
numOfTables
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
// todo add assert, the value of numOfTables should be less than the maximum value for each vnode capacity
assert
(
numOfTables
>
0
);
if
(
pHandle
->
checkFi
les
)
{
if
(
getDataBlocksInFiles
(
p
Handle
))
{
while
(
pQueryHandle
->
activeIndex
<
numOfTab
les
)
{
if
(
hasMoreDataInCache
(
pQuery
Handle
))
{
return
true
;
}
pHandle
->
activeIndex
=
0
;
pHandle
->
checkFiles
=
0
;
while
(
pHandle
->
activeIndex
<
numOfTables
)
{
if
(
hasMoreDataInCache
(
pHandle
))
{
pQueryHandle
->
activeIndex
+=
1
;
}
return
false
;
}
// handle data in cache situation
bool
tsdbNextDataBlock
(
tsdb_query_handle_t
*
pqHandle
)
{
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
pqHandle
;
size_t
numOfTables
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
assert
(
numOfTables
>
0
);
if
(
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
))
{
if
(
pQueryHandle
->
checkFiles
)
{
if
(
getDataBlocksInFiles
(
pQueryHandle
))
{
return
true
;
}
pHandle
->
activeIndex
+=
1
;
pQueryHandle
->
activeIndex
=
0
;
pQueryHandle
->
checkFiles
=
false
;
}
return
false
;
}
else
{
while
(
pHandle
->
activeIndex
<
numOfTab
les
)
{
if
(
hasMoreDataInCache
(
p
Handle
))
{
return
doHasDataInBuffer
(
pQueryHandle
)
;
}
else
{
// starts from the buffer in case of descending timestamp order check data blocks
if
(
!
pQueryHandle
->
checkFi
les
)
{
if
(
doHasDataInBuffer
(
pQuery
Handle
))
{
return
true
;
}
p
Handle
->
activeIndex
+=
1
;
p
QueryHandle
->
checkFiles
=
true
;
}
return
false
;
return
getDataBlocksInFiles
(
pQueryHandle
)
;
}
}
static
int
tsdbReadRowsFromCache
(
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
TSKEY
*
skey
,
TSKEY
*
ekey
,
STsdbQueryHandle
*
pHandle
)
{
STsdbQueryHandle
*
p
Query
Handle
)
{
int
numOfRows
=
0
;
int32_t
numOfCols
=
taosArrayGetSize
(
pHandle
->
pColumns
);
int32_t
numOfCols
=
taosArrayGetSize
(
p
Query
Handle
->
pColumns
);
*
skey
=
INT64_MIN
;
while
(
tSkipListIterNext
(
pIter
))
{
...
...
@@ -1079,7 +1097,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
int32_t
offset
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pHandle
->
pColumns
,
i
);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
p
Query
Handle
->
pColumns
,
i
);
memcpy
(
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
,
dataRowTuple
(
row
)
+
offset
,
pColInfo
->
info
.
bytes
);
offset
+=
pColInfo
->
info
.
bytes
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录