Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
acda5114
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
acda5114
编写于
12月 01, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact
上级
4a78a061
变更
6
展开全部
隐藏空白更改
内联
并排
Showing
6 changed file
with
186 addition
and
186 deletion
+186
-186
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+24
-24
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+2
-2
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+1
-1
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+143
-143
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+14
-14
src/tsdb/src/tsdbScan.c
src/tsdb/src/tsdbScan.c
+2
-2
未找到文件。
src/tsdb/inc/tsdbMain.h
浏览文件 @
acda5114
...
...
@@ -257,7 +257,7 @@ typedef struct {
uint32_t
numOfBlocks
:
30
;
uint64_t
uid
;
TSKEY
maxKey
;
}
S
Comp
Idx
;
}
S
Block
Idx
;
typedef
struct
{
int64_t
last
:
1
;
...
...
@@ -265,19 +265,19 @@ typedef struct {
int32_t
algorithm
:
8
;
int32_t
numOfRows
:
24
;
int32_t
len
;
int32_t
keyLen
;
// key column length, keyOffset = offset+sizeof(S
CompData)+sizeof(SComp
Col)*numOfCols
int32_t
keyLen
;
// key column length, keyOffset = offset+sizeof(S
BlockData)+sizeof(SBlock
Col)*numOfCols
int16_t
numOfSubBlocks
;
int16_t
numOfCols
;
// not including timestamp column
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
S
Comp
Block
;
}
SBlock
;
typedef
struct
{
int32_t
delimiter
;
// For recovery usage
int32_t
tid
;
uint64_t
uid
;
S
Comp
Block
blocks
[];
}
S
Comp
Info
;
SBlock
blocks
[];
}
S
Block
Info
;
typedef
struct
{
int16_t
colId
;
...
...
@@ -291,14 +291,14 @@ typedef struct {
int16_t
minIndex
;
int16_t
numOfNull
;
char
padding
[
2
];
}
S
Comp
Col
;
}
S
Block
Col
;
typedef
struct
{
int32_t
delimiter
;
// For recovery usage
int32_t
numOfCols
;
// For recovery usage
uint64_t
uid
;
// For recovery usage
S
Comp
Col
cols
[];
}
S
Comp
Data
;
S
Block
Col
cols
[];
}
S
Block
Data
;
typedef
enum
{
TSDB_WRITE_HELPER
,
TSDB_READ_HELPER
}
tsdb_rw_helper_t
;
...
...
@@ -316,7 +316,7 @@ typedef struct {
}
SHelperTable
;
typedef
struct
{
S
Comp
Idx
*
pIdxArray
;
S
Block
Idx
*
pIdxArray
;
int
numOfIdx
;
int
curIdx
;
}
SIdxH
;
...
...
@@ -329,14 +329,14 @@ typedef struct {
// For file set usage
SHelperFile
files
;
SIdxH
idxH
;
S
Comp
Idx
curCompIdx
;
S
Block
Idx
curCompIdx
;
void
*
pWIdx
;
// For table set usage
SHelperTable
tableInfo
;
S
Comp
Info
*
pCompInfo
;
S
Block
Info
*
pCompInfo
;
bool
hasOldLastBlock
;
// For block set usage
S
Comp
Data
*
pCompData
;
S
Block
Data
*
pCompData
;
SDataCols
*
pDataCols
[
2
];
void
*
pBuffer
;
// Buffer to hold the whole data block
void
*
compBuffer
;
// Buffer for temperary compress/decompress purpose
...
...
@@ -355,8 +355,8 @@ typedef struct {
typedef
struct
{
SFileGroup
fGroup
;
int
numOfIdx
;
S
Comp
Idx
*
pCompIdx
;
S
Comp
Info
*
pCompInfo
;
S
Block
Idx
*
pCompIdx
;
S
Block
Info
*
pCompInfo
;
void
*
pBuf
;
FILE
*
tLogStream
;
}
STsdbScanHandle
;
...
...
@@ -535,10 +535,10 @@ int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup);
// ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
#define TSDB_HELPER_IDX_LOAD 0x2 // S
Comp
Idx part is loaded
#define TSDB_HELPER_IDX_LOAD 0x2 // S
Block
Idx part is loaded
#define TSDB_HELPER_TABLE_SET 0x4 // Table is set
#define TSDB_HELPER_INFO_LOAD 0x8 // S
Comp
Info part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // S
Comp
Data part is loaded
#define TSDB_HELPER_INFO_LOAD 0x8 // S
Block
Info part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // S
Block
Data part is loaded
#define helperSetState(h, s) (((h)->state) |= (s))
#define helperClearState(h, s) ((h)->state &= (~(s)))
#define helperHasState(h, s) ((((h)->state) & (s)) == (s))
...
...
@@ -568,15 +568,15 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
int
tsdbWriteCompInfo
(
SRWHelper
*
pHelper
);
int
tsdbWriteCompIdx
(
SRWHelper
*
pHelper
);
int
tsdbLoadCompIdxImpl
(
SFile
*
pFile
,
uint32_t
offset
,
uint32_t
len
,
void
*
buffer
);
int
tsdbDecodeS
CompIdxImpl
(
void
*
buffer
,
uint32_t
len
,
SComp
Idx
**
ppCompIdx
,
int
*
numOfIdx
);
int
tsdbDecodeS
BlockIdxImpl
(
void
*
buffer
,
uint32_t
len
,
SBlock
Idx
**
ppCompIdx
,
int
*
numOfIdx
);
int
tsdbLoadCompIdx
(
SRWHelper
*
pHelper
,
void
*
target
);
int
tsdbLoadCompInfoImpl
(
SFile
*
pFile
,
S
CompIdx
*
pIdx
,
SComp
Info
**
ppCompInfo
);
int
tsdbLoadCompInfoImpl
(
SFile
*
pFile
,
S
BlockIdx
*
pIdx
,
SBlock
Info
**
ppCompInfo
);
int
tsdbLoadCompInfo
(
SRWHelper
*
pHelper
,
void
*
target
);
int
tsdbLoadCompData
(
SRWHelper
*
phelper
,
S
Comp
Block
*
pcompblock
,
void
*
target
);
int
tsdbLoadCompData
(
SRWHelper
*
phelper
,
SBlock
*
pcompblock
,
void
*
target
);
void
tsdbGetDataStatis
(
SRWHelper
*
pHelper
,
SDataStatis
*
pStatis
,
int
numOfCols
);
int
tsdbLoadBlockDataCols
(
SRWHelper
*
pHelper
,
S
CompBlock
*
pCompBlock
,
SComp
Info
*
pCompInfo
,
int16_t
*
colIds
,
int
tsdbLoadBlockDataCols
(
SRWHelper
*
pHelper
,
S
Block
*
pCompBlock
,
SBlock
Info
*
pCompInfo
,
int16_t
*
colIds
,
int
numOfColIds
);
int
tsdbLoadBlockData
(
SRWHelper
*
pHelper
,
S
CompBlock
*
pCompBlock
,
SComp
Info
*
pCompInfo
);
int
tsdbLoadBlockData
(
SRWHelper
*
pHelper
,
S
Block
*
pCompBlock
,
SBlock
Info
*
pCompInfo
);
static
FORCE_INLINE
int
compTSKEY
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(
*
(
TSKEY
*
)
key1
>
*
(
TSKEY
*
)
key2
)
{
...
...
@@ -608,8 +608,8 @@ int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int
STsdbScanHandle
*
tsdbNewScanHandle
();
void
tsdbSetScanLogStream
(
STsdbScanHandle
*
pScanHandle
,
FILE
*
fLogStream
);
int
tsdbSetAndOpenScanFile
(
STsdbScanHandle
*
pScanHandle
,
char
*
rootDir
,
int
fid
);
int
tsdbScanS
Comp
Idx
(
STsdbScanHandle
*
pScanHandle
);
int
tsdbScanS
Comp
Block
(
STsdbScanHandle
*
pScanHandle
,
int
idx
);
int
tsdbScanS
Block
Idx
(
STsdbScanHandle
*
pScanHandle
);
int
tsdbScanSBlock
(
STsdbScanHandle
*
pScanHandle
,
int
idx
);
int
tsdbCloseScanFile
(
STsdbScanHandle
*
pScanHandle
);
void
tsdbFreeScanHandle
(
STsdbScanHandle
*
pScanHandle
);
...
...
src/tsdb/src/tsdbCommit.c
浏览文件 @
acda5114
...
...
@@ -207,7 +207,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
newLast
=
TSDB_NLAST_FILE_OPENED
(
pHelper
);
if
(
tsdbLoadCompIdx
(
pHelper
,
NULL
)
<
0
)
{
tsdbError
(
"vgId:%d failed to load S
Comp
Idx part since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d failed to load S
Block
Idx part since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
...
...
@@ -243,7 +243,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
goto
_err
;
}
// Write the S
Comp
Block part
// Write the SBlock part
if
(
tsdbWriteCompInfo
(
pHelper
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to write compInfo part since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
acda5114
...
...
@@ -715,7 +715,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
==
NULL
)
continue
;
if
(
tsdbSetHelperTable
(
&
rhelper
,
pTable
,
pRepo
)
<
0
)
goto
_err
;
S
Comp
Idx
*
pIdx
=
&
(
rhelper
.
curCompIdx
);
S
Block
Idx
*
pIdx
=
&
(
rhelper
.
curCompIdx
);
if
(
pIdx
->
offset
>
0
&&
pTable
->
lastKey
<
pIdx
->
maxKey
)
pTable
->
lastKey
=
pIdx
->
maxKey
;
}
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
acda5114
此差异已折叠。
点击以展开。
src/tsdb/src/tsdbRead.c
浏览文件 @
acda5114
...
...
@@ -69,7 +69,7 @@ typedef struct STableCheckInfo {
STableId
tableId
;
TSKEY
lastKey
;
STable
*
pTableObj
;
S
Comp
Info
*
pCompInfo
;
S
Block
Info
*
pCompInfo
;
int32_t
compSize
;
int32_t
numOfBlocks
:
29
;
// number of qualified data blocks not the original blocks
int8_t
chosen
:
2
;
// indicate which iterator should move forward
...
...
@@ -79,7 +79,7 @@ typedef struct STableCheckInfo {
}
STableCheckInfo
;
typedef
struct
STableBlockInfo
{
S
Comp
Block
*
compBlock
;
SBlock
*
compBlock
;
STableCheckInfo
*
pTableCheckInfo
;
}
STableBlockInfo
;
...
...
@@ -136,7 +136,7 @@ typedef struct STableGroupSupporter {
static
STimeWindow
changeTableGroupByLastrow
(
STableGroupInfo
*
groupList
);
static
void
changeQueryHandleForInterpQuery
(
TsdbQueryHandleT
pHandle
);
static
void
doMergeTwoLevelData
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
S
Comp
Block
*
pBlock
);
static
void
doMergeTwoLevelData
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
SBlock
*
pBlock
);
static
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
static
int
tsdbReadRowsFromCache
(
STableCheckInfo
*
pCheckInfo
,
TSKEY
maxKey
,
int
maxRowsToRead
,
STimeWindow
*
win
,
STsdbQueryHandle
*
pQueryHandle
);
...
...
@@ -669,7 +669,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
return
(
int32_t
)
fid
;
}
static
int32_t
binarySearchForBlock
(
S
Comp
Block
*
pBlock
,
int32_t
numOfBlocks
,
TSKEY
skey
,
int32_t
order
)
{
static
int32_t
binarySearchForBlock
(
SBlock
*
pBlock
,
int32_t
numOfBlocks
,
TSKEY
skey
,
int32_t
order
)
{
int32_t
firstSlot
=
0
;
int32_t
lastSlot
=
numOfBlocks
-
1
;
...
...
@@ -712,7 +712,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
break
;
}
S
Comp
Idx
*
compIndex
=
&
pQueryHandle
->
rhelper
.
curCompIdx
;
S
Block
Idx
*
compIndex
=
&
pQueryHandle
->
rhelper
.
curCompIdx
;
// no data block in this file, try next file
if
(
compIndex
->
len
==
0
||
compIndex
->
numOfBlocks
==
0
||
compIndex
->
uid
!=
pCheckInfo
->
tableId
.
uid
)
{
...
...
@@ -729,12 +729,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
break
;
}
pCheckInfo
->
pCompInfo
=
(
S
Comp
Info
*
)
t
;
pCheckInfo
->
pCompInfo
=
(
S
Block
Info
*
)
t
;
pCheckInfo
->
compSize
=
compIndex
->
len
;
}
tsdbLoadCompInfo
(
&
(
pQueryHandle
->
rhelper
),
(
void
*
)(
pCheckInfo
->
pCompInfo
));
S
Comp
Info
*
pCompInfo
=
pCheckInfo
->
pCompInfo
;
S
Block
Info
*
pCompInfo
=
pCheckInfo
->
pCompInfo
;
TSKEY
s
=
TSKEY_INITIAL_VAL
,
e
=
TSKEY_INITIAL_VAL
;
...
...
@@ -763,7 +763,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
pCheckInfo
->
numOfBlocks
=
(
end
-
start
);
if
(
start
>
0
)
{
memmove
(
pCompInfo
->
blocks
,
&
pCompInfo
->
blocks
[
start
],
pCheckInfo
->
numOfBlocks
*
sizeof
(
S
Comp
Block
));
memmove
(
pCompInfo
->
blocks
,
&
pCompInfo
->
blocks
[
start
],
pCheckInfo
->
numOfBlocks
*
sizeof
(
SBlock
));
}
(
*
numOfBlocks
)
+=
pCheckInfo
->
numOfBlocks
;
...
...
@@ -772,7 +772,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
return
code
;
}
static
int32_t
doLoadFileDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
S
Comp
Block
*
pBlock
,
STableCheckInfo
*
pCheckInfo
,
int32_t
slotIndex
)
{
static
int32_t
doLoadFileDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
SBlock
*
pBlock
,
STableCheckInfo
*
pCheckInfo
,
int32_t
slotIndex
)
{
int64_t
st
=
taosGetTimestampUs
();
STSchema
*
pSchema
=
tsdbGetTableSchema
(
pCheckInfo
->
pTableObj
);
...
...
@@ -838,7 +838,7 @@ static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, i
static
void
doCheckGeneratedBlockRange
(
STsdbQueryHandle
*
pQueryHandle
);
static
void
copyAllRemainRowsFromFileBlock
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
SDataBlockInfo
*
pBlockInfo
,
int32_t
endPos
);
static
int32_t
handleDataMergeIfNeeded
(
STsdbQueryHandle
*
pQueryHandle
,
S
Comp
Block
*
pBlock
,
STableCheckInfo
*
pCheckInfo
){
static
int32_t
handleDataMergeIfNeeded
(
STsdbQueryHandle
*
pQueryHandle
,
SBlock
*
pBlock
,
STableCheckInfo
*
pCheckInfo
){
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
STsdbCfg
*
pCfg
=
&
pQueryHandle
->
pTsdb
->
config
;
SDataBlockInfo
binfo
=
GET_FILE_DATA_BLOCK_INFO
(
pCheckInfo
,
pBlock
);
...
...
@@ -921,7 +921,7 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
return
code
;
}
static
int32_t
loadFileDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
S
Comp
Block
*
pBlock
,
STableCheckInfo
*
pCheckInfo
,
bool
*
exists
)
{
static
int32_t
loadFileDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
SBlock
*
pBlock
,
STableCheckInfo
*
pCheckInfo
,
bool
*
exists
)
{
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -1327,7 +1327,7 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
static
void
doMergeTwoLevelData
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
S
Comp
Block
*
pBlock
)
{
static
void
doMergeTwoLevelData
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
SBlock
*
pBlock
)
{
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
SDataBlockInfo
blockInfo
=
GET_FILE_DATA_BLOCK_INFO
(
pCheckInfo
,
pBlock
);
STsdbCfg
*
pCfg
=
&
pQueryHandle
->
pTsdb
->
config
;
...
...
@@ -1626,7 +1626,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
continue
;
}
S
Comp
Block
*
pBlock
=
pTableCheck
->
pCompInfo
->
blocks
;
SBlock
*
pBlock
=
pTableCheck
->
pCompInfo
->
blocks
;
sup
.
numOfBlocksPerTable
[
numOfQualTables
]
=
pTableCheck
->
numOfBlocks
;
char
*
buf
=
calloc
(
1
,
sizeof
(
STableBlockInfo
)
*
pTableCheck
->
numOfBlocks
);
...
...
@@ -2316,7 +2316,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
pBlockLoadInfo
->
tid
==
pCheckInfo
->
pTableObj
->
tableId
.
tid
)
{
return
pHandle
->
pColumns
;
}
else
{
// only load the file block
S
Comp
Block
*
pBlock
=
pBlockInfo
->
compBlock
;
SBlock
*
pBlock
=
pBlockInfo
->
compBlock
;
if
(
doLoadFileDataBlock
(
pHandle
,
pBlock
,
pCheckInfo
,
pHandle
->
cur
.
slot
)
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
...
...
src/tsdb/src/tsdbScan.c
浏览文件 @
acda5114
...
...
@@ -25,9 +25,9 @@ void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream) {}
int
tsdbSetAndOpenScanFile
(
STsdbScanHandle
*
pScanHandle
,
char
*
rootDir
,
int
fid
)
{
return
0
;
}
int
tsdbScanS
Comp
Idx
(
STsdbScanHandle
*
pScanHandle
)
{
return
0
;
}
int
tsdbScanS
Block
Idx
(
STsdbScanHandle
*
pScanHandle
)
{
return
0
;
}
int
tsdbScanS
Comp
Block
(
STsdbScanHandle
*
pScanHandle
,
int
idx
)
{
return
0
;
}
int
tsdbScanSBlock
(
STsdbScanHandle
*
pScanHandle
,
int
idx
)
{
return
0
;
}
int
tsdbCloseScanFile
(
STsdbScanHandle
*
pScanHandle
)
{
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录