Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
1bf6d523
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1bf6d523
编写于
3月 29, 2020
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-32] fix bugs in check data in file block
上级
93e38711
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
369 addition
and
13 deletion
+369
-13
src/vnode/tsdb/inc/tsdbFile.h
src/vnode/tsdb/inc/tsdbFile.h
+3
-0
src/vnode/tsdb/src/tsdbMain.c
src/vnode/tsdb/src/tsdbMain.c
+5
-0
src/vnode/tsdb/src/tsdbRead.c
src/vnode/tsdb/src/tsdbRead.c
+361
-13
未找到文件。
src/vnode/tsdb/inc/tsdbFile.h
浏览文件 @
1bf6d523
...
...
@@ -20,6 +20,7 @@
#include "dataformat.h"
#include "taosdef.h"
#include "tglobalcfg.h"
#include "tsdb.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -148,6 +149,8 @@ typedef struct {
SCompCol
cols
[];
}
SCompData
;
STsdbFileH
*
tsdbGetFile
(
tsdb_repo_t
*
pRepo
);
int
tsdbCopyBlockDataInFile
(
SFile
*
pOutFile
,
SFile
*
pInFile
,
SCompInfo
*
pCompInfo
,
int
idx
,
int
isLast
,
SDataCols
*
pCols
);
int
tsdbLoadCompIdx
(
SFileGroup
*
pGroup
,
void
*
buf
,
int
maxTables
);
...
...
src/vnode/tsdb/src/tsdbMain.c
浏览文件 @
1bf6d523
...
...
@@ -551,6 +551,11 @@ STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo) {
return
tsdb
->
tsdbMeta
;
}
STsdbFileH
*
tsdbGetFile
(
tsdb_repo_t
*
pRepo
)
{
STsdbRepo
*
tsdb
=
(
STsdbRepo
*
)
pRepo
;
return
tsdb
->
tsdbFileH
;
}
// Check the configuration and set default options
static
int32_t
tsdbCheckAndSetDefaultCfg
(
STsdbCfg
*
pCfg
)
{
// Check precision
...
...
src/vnode/tsdb/src/tsdbRead.c
浏览文件 @
1bf6d523
...
...
@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tlog.h>
#include "os.h"
#include "tlog.h"
#include "tutil.h"
#include "../../../query/inc/qast.h"
...
...
@@ -28,6 +29,11 @@
#define QUERY_IS_ASC_QUERY(o) (o == TSQL_SO_ASC)
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
enum
{
QUERY_RANGE_LESS_EQUAL
=
0
,
QUERY_RANGE_GREATER_EQUAL
=
1
,
};
typedef
struct
SField
{
// todo need the definition
}
SField
;
...
...
@@ -36,12 +42,12 @@ typedef struct SHeaderFileInfo {
int32_t
fileId
;
}
SHeaderFileInfo
;
typedef
struct
SQuery
Hand
lePos
{
int32_t
fi
leI
d
;
typedef
struct
SQuery
Fi
lePos
{
int32_t
fid
;
int32_t
slot
;
int32_t
pos
;
int
32_t
fileIndex
;
}
SQuery
Hand
lePos
;
int
64_t
lastKey
;
}
SQuery
Fi
lePos
;
typedef
struct
SDataBlockLoadInfo
{
int32_t
fileListIndex
;
...
...
@@ -78,8 +84,12 @@ typedef struct STableCheckInfo {
TSKEY
lastKey
;
STable
*
pTableObj
;
int64_t
offsetInHeaderFile
;
int32_t
numOfBlocks
;
//
int32_t numOfBlocks;
int32_t
start
;
bool
checkFirstFileBlock
;
SCompIdx
*
compIndex
;
SCompBlock
*
pBlock
;
SSkipListIterator
*
iter
;
}
STableCheckInfo
;
...
...
@@ -104,8 +114,8 @@ enum {
typedef
struct
STsdbQueryHandle
{
struct
STsdbRepo
*
pTsdb
;
int8_t
model
;
// access model, single table model or multi-table model
SQuery
Hand
lePos
cur
;
// current position
SQuery
Hand
lePos
start
;
// the start position, used for secondary/third iteration
SQuery
Fi
lePos
cur
;
// current position
SQuery
Fi
lePos
start
;
// the start position, used for secondary/third iteration
int32_t
unzipBufSize
;
char
*
unzipBuffer
;
char
*
secondaryUnzipBuffer
;
...
...
@@ -342,6 +352,344 @@ static bool hasMoreDataInCacheForSingleModel(STsdbQueryHandle* pHandle) {
return
true
;
}
// todo dynamic get the daysperfile
static
int32_t
getFileIdFromKey
(
TSKEY
key
)
{
return
(
int32_t
)(
key
/
10
);
// set the starting fileId
}
static
int32_t
getFileCompInfo
(
STableCheckInfo
*
pCheckInfo
,
SFileGroup
*
fileGroup
)
{
tsdbLoadCompIdx
(
fileGroup
,
pCheckInfo
->
compIndex
,
10000
);
// todo set dynamic max tables
SCompIdx
*
compIndex
=
&
pCheckInfo
->
compIndex
[
pCheckInfo
->
tableId
.
tid
];
if
(
compIndex
->
len
==
0
||
compIndex
->
numOfSuperBlocks
==
0
)
{
// no data block in this file, try next file
}
else
{
tsdbLoadCompBlocks
(
fileGroup
,
compIndex
,
pCheckInfo
->
pBlock
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
binarySearchForBlockImpl
(
SCompBlock
*
pBlock
,
int32_t
numOfBlocks
,
TSKEY
skey
,
int32_t
order
)
{
int32_t
firstSlot
=
0
;
int32_t
lastSlot
=
numOfBlocks
-
1
;
int32_t
midSlot
=
firstSlot
;
while
(
1
)
{
numOfBlocks
=
lastSlot
-
firstSlot
+
1
;
midSlot
=
(
firstSlot
+
(
numOfBlocks
>>
1
));
if
(
numOfBlocks
==
1
)
break
;
if
(
skey
>
pBlock
[
midSlot
].
keyLast
)
{
if
(
numOfBlocks
==
2
)
break
;
if
((
order
==
TSQL_SO_DESC
)
&&
(
skey
<
pBlock
[
midSlot
+
1
].
keyFirst
))
break
;
firstSlot
=
midSlot
+
1
;
}
else
if
(
skey
<
pBlock
[
midSlot
].
keyFirst
)
{
if
((
order
==
TSQL_SO_ASC
)
&&
(
skey
>
pBlock
[
midSlot
-
1
].
keyLast
))
break
;
lastSlot
=
midSlot
-
1
;
}
else
{
break
;
// got the slot
}
}
return
midSlot
;
}
static
SDataBlockInfo
getTrueBlockInfo
(
STsdbQueryHandle
*
pHandle
,
STableCheckInfo
*
pCheckInfo
)
{
SDataBlockInfo
info
=
{{
0
},
0
};
SCompBlock
*
pDiskBlock
=
&
pCheckInfo
->
pBlock
[
pHandle
->
cur
.
slot
];
info
.
window
.
skey
=
pDiskBlock
->
keyFirst
;
info
.
window
.
ekey
=
pDiskBlock
->
keyLast
;
info
.
size
=
pDiskBlock
->
numOfPoints
;
info
.
numOfCols
=
pDiskBlock
->
numOfCols
;
return
info
;
}
bool
moveToNextBlock
(
STsdbQueryHandle
*
pQueryHandle
,
int32_t
step
)
{
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
if
(
pQueryHandle
->
cur
.
fid
>=
0
)
{
int32_t
fileIndex
=
-
1
;
/*
* 1. ascending order. The last data block of data file
* 2. descending order. The first block of file
*/
if
((
step
==
QUERY_ASC_FORWARD_STEP
&&
(
pQueryHandle
->
cur
.
slot
==
pQueryHandle
->
numOfBlocks
-
1
))
||
(
step
==
QUERY_DESC_FORWARD_STEP
&&
(
pQueryHandle
->
cur
.
slot
==
0
)))
{
// temporarily keep the position value, in case of no data qualified when move forwards(backwards)
SQueryFilePos
save
=
pQueryHandle
->
cur
;
// fileIndex = getNextDataFileCompInfo_(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step);
// first data block in the next file
if
(
fileIndex
>=
0
)
{
cur
->
slot
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pQueryHandle
->
numOfBlocks
-
1
;
cur
->
pos
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pQueryHandle
->
pBlock
[
cur
->
slot
].
numOfPoints
-
1
;
// return loadQaulifiedData(pQueryHandle);
}
else
{
// try data in cache
assert
(
cur
->
fid
==
-
1
);
if
(
step
==
QUERY_ASC_FORWARD_STEP
)
{
// TSKEY nextTimestamp =
// getQueryStartPositionInCache_rv(pQueryHandle, &pQueryHandle->cur.slot, &pQueryHandle->cur.pos, true);
// if (nextTimestamp < 0) {
// pQueryHandle->cur = save;
// }
// return (nextTimestamp > 0);
}
// no data to check for desc order query, restore the saved position value
pQueryHandle
->
cur
=
save
;
return
false
;
}
}
// next block in the same file
int32_t
fid
=
cur
->
fid
;
// fileIndex = vnodeGetVnodeHeaderFileIndex(&fid, pQueryHandle->order, &pQueryHandle->vnodeFileInfo);
cur
->
slot
+=
step
;
SCompBlock
*
pBlock
=
&
pQueryHandle
->
pBlock
[
cur
->
slot
];
cur
->
pos
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pBlock
->
numOfPoints
-
1
;
// return loadQaulifiedData(pQueryHandle);
}
else
{
// data in cache
// todo continue;
}
}
int
vnodeBinarySearchKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
)
{
int
firstPos
,
lastPos
,
midPos
=
-
1
;
int
numOfPoints
;
TSKEY
*
keyList
;
if
(
num
<=
0
)
return
-
1
;
keyList
=
(
TSKEY
*
)
pValue
;
firstPos
=
0
;
lastPos
=
num
-
1
;
if
(
order
==
0
)
{
// find the first position which is smaller than the key
while
(
1
)
{
if
(
key
>=
keyList
[
lastPos
])
return
lastPos
;
if
(
key
==
keyList
[
firstPos
])
return
firstPos
;
if
(
key
<
keyList
[
firstPos
])
return
firstPos
-
1
;
numOfPoints
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfPoints
>>
1
)
+
firstPos
;
if
(
key
<
keyList
[
midPos
])
{
lastPos
=
midPos
-
1
;
}
else
if
(
key
>
keyList
[
midPos
])
{
firstPos
=
midPos
+
1
;
}
else
{
break
;
}
}
}
else
{
// find the first position which is bigger than the key
while
(
1
)
{
if
(
key
<=
keyList
[
firstPos
])
return
firstPos
;
if
(
key
==
keyList
[
lastPos
])
return
lastPos
;
if
(
key
>
keyList
[
lastPos
])
{
lastPos
=
lastPos
+
1
;
if
(
lastPos
>=
num
)
return
-
1
;
else
return
lastPos
;
}
numOfPoints
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfPoints
>>
1
)
+
firstPos
;
if
(
key
<
keyList
[
midPos
])
{
lastPos
=
midPos
-
1
;
}
else
if
(
key
>
keyList
[
midPos
])
{
firstPos
=
midPos
+
1
;
}
else
{
break
;
}
}
}
return
midPos
;
}
static
void
filterDataInDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
SArray
*
sa
)
{
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
pQueryHandle
->
activeIndex
);
SDataBlockInfo
blockInfo
=
getTrueBlockInfo
(
pQueryHandle
,
pCheckInfo
);
int32_t
endPos
=
cur
->
pos
;
if
(
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)
&&
pQueryHandle
->
window
.
ekey
>
blockInfo
.
window
.
ekey
)
{
endPos
=
blockInfo
.
size
-
1
;
pQueryHandle
->
realNumOfRows
=
endPos
-
cur
->
pos
+
1
;
}
else
if
(
!
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)
&&
pQueryHandle
->
window
.
ekey
<
blockInfo
.
window
.
skey
)
{
endPos
=
0
;
pQueryHandle
->
realNumOfRows
=
cur
->
pos
+
1
;
}
else
{
// endPos = vnodeBinarySearchKey(pQueryHandle->tsBuf->data, blockInfo.size, pQueryHandle->window.ekey, pQueryHandle->order);
if
(
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
))
{
if
(
endPos
<
cur
->
pos
)
{
pQueryHandle
->
realNumOfRows
=
0
;
return
;
}
else
{
pQueryHandle
->
realNumOfRows
=
endPos
-
cur
->
pos
;
}
}
else
{
if
(
endPos
>
cur
->
pos
)
{
pQueryHandle
->
realNumOfRows
=
0
;
return
;
}
else
{
pQueryHandle
->
realNumOfRows
=
cur
->
pos
-
endPos
;
}
}
}
int32_t
start
=
MIN
(
cur
->
pos
,
endPos
);
// move the data block in the front to data block if needed
if
(
start
!=
0
)
{
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
sa
);
++
i
)
{
int16_t
colId
=
*
(
int16_t
*
)
taosArrayGet
(
sa
,
i
);
for
(
int32_t
j
=
0
;
j
<
numOfCols
;
++
j
)
{
SColumnInfoEx
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
j
);
if
(
pCol
->
info
.
colId
==
colId
)
{
memmove
(
pCol
->
pData
,
((
char
*
)
pCol
->
pData
)
+
pCol
->
info
.
bytes
*
start
,
pQueryHandle
->
realNumOfRows
*
pCol
->
info
.
bytes
);
break
;
}
}
}
}
assert
(
pQueryHandle
->
realNumOfRows
<=
blockInfo
.
size
);
// forward(backward) the position for cursor
cur
->
pos
=
endPos
;
}
static
bool
getQualifiedDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
int32_t
type
)
{
STsdbFileH
*
pFileHandle
=
tsdbGetFile
(
pQueryHandle
->
pTsdb
);
int32_t
fid
=
getFileIdFromKey
(
pCheckInfo
->
lastKey
);
SFileGroup
*
fileGroup
=
tsdbSearchFGroup
(
pFileHandle
,
fid
);
pCheckInfo
->
checkFirstFileBlock
=
true
;
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
TSKEY
key
=
pCheckInfo
->
lastKey
;
int32_t
index
=
-
1
;
// todo add iterator for filegroup
while
(
1
)
{
if
((
fid
=
getFileCompInfo
(
pCheckInfo
,
fileGroup
))
<
0
)
{
break
;
}
int32_t
tid
=
pCheckInfo
->
tableId
.
tid
;
index
=
binarySearchForBlockImpl
(
pCheckInfo
->
pBlock
,
pCheckInfo
->
compIndex
[
tid
].
numOfSuperBlocks
,
pQueryHandle
->
order
,
key
);
if
(
type
==
QUERY_RANGE_GREATER_EQUAL
)
{
if
(
key
<=
pCheckInfo
->
pBlock
[
index
].
keyLast
)
{
break
;
}
else
{
index
=
-
1
;
}
}
else
{
if
(
key
>=
pCheckInfo
->
pBlock
[
index
].
keyFirst
)
{
break
;
}
else
{
index
=
-
1
;
}
}
}
// failed to find qualified point in file, abort
if
(
index
==
-
1
)
{
return
false
;
}
assert
(
index
>=
0
&&
index
<
pQueryHandle
->
numOfBlocks
);
// load first data block into memory failed, caused by disk block error
bool
blockLoaded
=
false
;
SArray
*
sa
=
NULL
;
// todo no need to loaded at all
cur
->
slot
=
index
;
// sa = getDefaultLoadColumns(pQueryHandle, true);
if
(
tsdbLoadDataBlock
(
&
fileGroup
->
files
[
2
],
&
pCheckInfo
->
pBlock
[
cur
->
slot
],
1
,
fid
,
sa
)
==
0
)
{
blockLoaded
=
true
;
}
// dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files",
// GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx);
// failed to load data from disk, abort current query
if
(
blockLoaded
==
false
)
{
return
false
;
}
// todo search qualified points in blk, according to primary key (timestamp) column
// cur->pos = binarySearchForBlockImpl(ptsBuf->data, pBlocks->numOfPoints, key, pQueryHandle->order);
assert
(
cur
->
pos
>=
0
&&
cur
->
fid
>=
0
&&
cur
->
slot
>=
0
);
filterDataInDataBlock
(
pQueryHandle
,
sa
);
return
pQueryHandle
->
realNumOfRows
>
0
;
}
static
bool
hasMoreDataInFileForSingleTableModel
(
STsdbQueryHandle
*
pHandle
)
{
assert
(
pHandle
->
activeIndex
==
0
&&
taosArrayGetSize
(
pHandle
->
pTableCheckInfo
)
==
1
);
STsdbFileH
*
pFileHandle
=
tsdbGetFile
(
pHandle
->
pTsdb
);
SQueryFilePos
*
cur
=
&
pHandle
->
cur
;
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pHandle
->
pTableCheckInfo
,
pHandle
->
activeIndex
);
if
(
!
pCheckInfo
->
checkFirstFileBlock
&&
pFileHandle
!=
NULL
)
{
int32_t
fid
=
getFileIdFromKey
(
pCheckInfo
->
lastKey
);
SFileGroup
*
fileGroup
=
tsdbSearchFGroup
(
pFileHandle
,
fid
);
pCheckInfo
->
checkFirstFileBlock
=
true
;
if
(
fileGroup
!=
NULL
)
{
return
getQualifiedDataBlock
(
pHandle
,
pCheckInfo
,
1
);
}
else
{
// no data in file, try cache
return
hasMoreDataInCacheForSingleModel
(
pHandle
);
}
}
else
{
pCheckInfo
->
checkFirstFileBlock
=
true
;
if
(
pFileHandle
==
NULL
)
{
cur
->
fid
=
-
1
;
}
if
(
cur
->
fid
==
-
1
||
pFileHandle
!=
NULL
)
{
// try data in cache
return
hasMoreDataInCacheForSingleModel
(
pHandle
);
}
else
{
return
true
;
}
}
}
static
bool
hasMoreDataInCacheForMultiModel
(
STsdbQueryHandle
*
pHandle
)
{
size_t
numOfTables
=
taosArrayGetSize
(
pHandle
->
pTableCheckInfo
);
assert
(
numOfTables
>
0
);
...
...
@@ -372,7 +720,7 @@ static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) {
bool
tsdbNextDataBlock
(
tsdb_query_handle_t
*
pQueryHandle
)
{
STsdbQueryHandle
*
pHandle
=
(
STsdbQueryHandle
*
)
pQueryHandle
;
if
(
pHandle
->
model
==
SINGLE_TABLE_MODEL
)
{
return
hasMoreDataIn
CacheForSing
leModel
(
pHandle
);
return
hasMoreDataIn
FileForSingleTab
leModel
(
pHandle
);
}
else
{
return
hasMoreDataInCacheForMultiModel
(
pHandle
);
}
...
...
@@ -704,8 +1052,6 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
STable
*
pTable
=
(
STable
*
)(
SL_GET_NODE_DATA
((
SSkipListNode
*
)
pNode
));
char
buf
[
TSDB_MAX_TAGS_LEN
]
=
{
0
};
char
*
val
=
dataRowTuple
(
pTable
->
tagVal
);
// todo not only the first column
int8_t
type
=
pInfo
->
sch
.
type
;
...
...
@@ -765,9 +1111,11 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond
// query according to the binary expression
SSyntaxTreeFilterSupporter
s
=
{.
pTagSchema
=
stcol
,
.
numOfTags
=
schemaNCols
(
pSTable
->
tagSchema
)};
SBinaryFilterSupp
supp
=
{.
fp
=
(
__result_filter_fn_t
)
tSkipListNodeFilterCallback
,
SBinaryFilterSupp
supp
=
{
.
fp
=
(
__result_filter_fn_t
)
tSkipListNodeFilterCallback
,
.
setupInfoFn
=
(
__do_filter_suppl_fn_t
)
filterPrepare
,
.
pExtInfo
=
&
s
};
.
pExtInfo
=
&
s
};
tSQLBinaryExprTraverse
(
pExpr
,
pSTable
->
pIndex
,
pRes
,
&
supp
);
tSQLBinaryExprDestroy
(
&
pExpr
,
tSQLListTraverseDestroyInfo
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录