Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
47a75227
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
47a75227
编写于
6月 28, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/feat/tsdb_refact' into feat/tsdb_refact
# Conflicts: # source/dnode/vnode/src/vnd/vnodeSvr.c
上级
4a59e130
3ba5d8e9
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
230 addition
and
533 deletion
+230
-533
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+11
-13
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+102
-416
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+2
-8
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+114
-87
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+0
-8
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-1
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
47a75227
...
@@ -236,14 +236,14 @@ int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
...
@@ -236,14 +236,14 @@ int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
// SDelFWriter
// SDelFWriter
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
);
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
);
int32_t
tsdbDelFWriterClose
(
SDelFWriter
*
pWriter
,
int8_t
sync
);
int32_t
tsdbDelFWriterClose
(
SDelFWriter
*
pWriter
,
int8_t
sync
);
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
S
MapData
*
pDelDataMap
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
);
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
S
Array
*
aDelData
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
);
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
S
MapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
);
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
S
Array
*
aDelIdx
,
uint8_t
**
ppBuf
);
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
,
uint8_t
**
ppBuf
);
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
);
// SDelFReader
// SDelFReader
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
,
uint8_t
**
ppBuf
);
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
,
uint8_t
**
ppBuf
);
int32_t
tsdbDelFReaderClose
(
SDelFReader
*
pReader
);
int32_t
tsdbDelFReaderClose
(
SDelFReader
*
pReader
);
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
S
MapData
*
pDelDataMap
,
uint8_t
**
ppBuf
);
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
S
Array
*
aDelData
,
uint8_t
**
ppBuf
);
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
S
MapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
);
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
S
Array
*
aDelIdx
,
uint8_t
**
ppBuf
);
// tsdbCache
// tsdbCache
int32_t
tsdbOpenCache
(
STsdb
*
pTsdb
);
int32_t
tsdbOpenCache
(
STsdb
*
pTsdb
);
...
@@ -464,20 +464,12 @@ struct SDelData {
...
@@ -464,20 +464,12 @@ struct SDelData {
struct
SDelIdx
{
struct
SDelIdx
{
tb_uid_t
suid
;
tb_uid_t
suid
;
tb_uid_t
uid
;
tb_uid_t
uid
;
TSKEY
minKey
;
TSKEY
maxKey
;
int64_t
minVersion
;
int64_t
maxVersion
;
int64_t
offset
;
int64_t
offset
;
int64_t
size
;
int64_t
size
;
};
};
struct
SDelFile
{
struct
SDelFile
{
int64_t
commitID
;
int64_t
commitID
;
TSKEY
minKey
;
TSKEY
maxKey
;
int64_t
minVersion
;
int64_t
maxVersion
;
int64_t
size
;
int64_t
size
;
int64_t
offset
;
int64_t
offset
;
};
};
...
@@ -561,6 +553,12 @@ struct STsdbFS {
...
@@ -561,6 +553,12 @@ struct STsdbFS {
STsdbFSState
*
nState
;
STsdbFSState
*
nState
;
};
};
struct
SDelFWriter
{
STsdb
*
pTsdb
;
SDelFile
fDel
;
TdFilePtr
pWriteH
;
};
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
47a75227
...
@@ -45,11 +45,10 @@ typedef struct {
...
@@ -45,11 +45,10 @@ typedef struct {
STSchema
*
pTSchema
;
STSchema
*
pTSchema
;
/* commit del */
/* commit del */
SDelFReader
*
pDelFReader
;
SDelFReader
*
pDelFReader
;
SMapData
oDelIdxMap
;
// SMapData<SDelIdx>, old
SMapData
oDelDataMap
;
// SMapData<SDelData>, old
SDelFWriter
*
pDelFWriter
;
SDelFWriter
*
pDelFWriter
;
SMapData
nDelIdxMap
;
// SMapData<SDelIdx>, new
SArray
*
aDelIdx
;
// SArray<SDelIdx>
SMapData
nDelDataMap
;
// SMapData<SDelData>, new
SArray
*
aDelIdxN
;
// SArray<SDelIdx>
SArray
*
aDelData
;
// SArray<SDelData>
}
SCommitter
;
}
SCommitter
;
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitter
*
pCommitter
);
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitter
*
pCommitter
);
...
@@ -119,23 +118,37 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
...
@@ -119,23 +118,37 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
int32_t
code
=
0
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
SDelFile
*
pDelFileR
=
NULL
;
// TODO
SDelFile
*
pDelFileW
=
NULL
;
// TODO
tMapDataReset
(
&
pCommitter
->
oDelIdxMap
);
pCommitter
->
aDelIdx
=
taosArrayInit
(
0
,
sizeof
(
SDelIdx
));
tMapDataReset
(
&
pCommitter
->
nDelIdxMap
);
if
(
pCommitter
->
aDelIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pCommitter
->
aDelData
=
taosArrayInit
(
0
,
sizeof
(
SDelData
));
if
(
pCommitter
->
aDelData
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pCommitter
->
aDelIdxN
=
taosArrayInit
(
0
,
sizeof
(
SDelIdx
));
if
(
pCommitter
->
aDelIdxN
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
// load old
SDelFile
*
pDelFileR
=
pTsdb
->
fs
->
nState
->
pDelFile
;
if
(
pDelFileR
)
{
if
(
pDelFileR
)
{
code
=
tsdbDelFReaderOpen
(
&
pCommitter
->
pDelFReader
,
pDelFileR
,
pTsdb
,
NULL
);
code
=
tsdbDelFReaderOpen
(
&
pCommitter
->
pDelFReader
,
pDelFileR
,
pTsdb
,
NULL
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
code
=
tsdbReadDelIdx
(
pCommitter
->
pDelFReader
,
&
pCommitter
->
oDelIdxMap
,
NULL
);
code
=
tsdbReadDelIdx
(
pCommitter
->
pDelFReader
,
pCommitter
->
aDelIdx
,
NULL
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
}
}
// prepare new
// prepare new
code
=
tsdbDelFWriterOpen
(
&
pCommitter
->
pDelFWriter
,
pDelFileW
,
pTsdb
);
SDelFile
wDelFile
=
{.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
,
.
offset
=
0
};
code
=
tsdbDelFWriterOpen
(
&
pCommitter
->
pDelFWriter
,
&
wDelFile
,
pTsdb
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
_exit:
_exit:
...
@@ -149,74 +162,51 @@ _err:
...
@@ -149,74 +162,51 @@ _err:
static
int32_t
tsdbCommitTableDel
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SDelIdx
*
pDelIdx
)
{
static
int32_t
tsdbCommitTableDel
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SDelIdx
*
pDelIdx
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SDelData
*
pDelData
=
&
(
SDelData
){}
;
SDelData
*
pDelData
;
tb_uid_t
suid
;
tb_uid_t
suid
;
tb_uid_t
uid
;
tb_uid_t
uid
;
SDelIdx
delIdx
;
// TODO
// check no del data, just return
taosArrayClear
(
pCommitter
->
aDelData
);
if
(
pTbData
&&
pTbData
->
pHead
==
NULL
)
{
pTbData
=
NULL
;
}
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
goto
_exit
;
// prepare
if
(
pTbData
)
{
if
(
pTbData
)
{
delIdx
.
suid
=
pTbData
->
suid
;
suid
=
pTbData
->
suid
;
delIdx
.
uid
=
pTbData
->
uid
;
uid
=
pTbData
->
uid
;
}
else
{
delIdx
.
suid
=
pDelIdx
->
suid
;
delIdx
.
uid
=
pDelIdx
->
uid
;
}
delIdx
.
minKey
=
TSKEY_MAX
;
delIdx
.
maxKey
=
TSKEY_MIN
;
delIdx
.
minVersion
=
INT64_MAX
;
delIdx
.
maxVersion
=
INT64_MIN
;
// start
if
(
pTbData
->
pHead
==
NULL
)
{
tMapDataReset
(
&
pCommitter
->
oDelDataMap
);
pTbData
=
NULL
;
tMapDataReset
(
&
pCommitter
->
nDelDataMap
);
}
}
if
(
pDelIdx
)
{
if
(
pDelIdx
)
{
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pDelIdx
,
&
pCommitter
->
oDelDataMap
,
NULL
);
suid
=
pDelIdx
->
suid
;
if
(
code
)
goto
_err
;
uid
=
pDelIdx
->
uid
;
}
// disk
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pDelIdx
,
pCommitter
->
aDelData
,
NULL
);
for
(
int32_t
iDelData
=
0
;
iDelData
<
pCommitter
->
oDelDataMap
.
nItem
;
iDelData
++
)
{
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelDataMap
,
iDelData
,
pDelData
,
tGetDelData
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
}
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
pDelData
,
tPutDelData
);
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
goto
_exit
;
if
(
code
)
goto
_err
;
if
(
delIdx
.
minKey
>
pDelData
->
sKey
)
delIdx
.
minKey
=
pDelData
->
sKey
;
SDelIdx
delIdx
=
{.
suid
=
suid
,
.
uid
=
uid
};
if
(
delIdx
.
maxKey
<
pDelData
->
eKey
)
delIdx
.
maxKey
=
pDelData
->
eKey
;
if
(
delIdx
.
minVersion
>
pDelData
->
version
)
delIdx
.
minVersion
=
pDelData
->
version
;
if
(
delIdx
.
maxVersion
<
pDelData
->
version
)
delIdx
.
maxVersion
=
pDelData
->
version
;
}
// memory
// memory
pDelData
=
pTbData
?
pTbData
->
pHead
:
NULL
;
pDelData
=
pTbData
?
pTbData
->
pHead
:
NULL
;
for
(;
pDelData
;
pDelData
=
pDelData
->
pNext
)
{
for
(;
pDelData
;
pDelData
=
pDelData
->
pNext
)
{
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
pDelData
,
tPutDelData
);
if
(
taosArrayPush
(
pCommitter
->
aDelData
,
pDelData
)
==
NULL
)
{
if
(
code
)
goto
_err
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
if
(
delIdx
.
minKey
>
pDelData
->
sKey
)
delIdx
.
minKey
=
pDelData
->
sKey
;
}
if
(
delIdx
.
maxKey
<
pDelData
->
eKey
)
delIdx
.
maxKey
=
pDelData
->
eKey
;
if
(
delIdx
.
minVersion
>
pDelData
->
version
)
delIdx
.
minVersion
=
pDelData
->
version
;
if
(
delIdx
.
maxVersion
<
pDelData
->
version
)
delIdx
.
maxVersion
=
pDelData
->
version
;
}
}
ASSERT
(
pCommitter
->
nDelDataMap
.
nItem
>
0
);
// write
// write
code
=
tsdbWriteDelData
(
pCommitter
->
pDelFWriter
,
&
pCommitter
->
nDelDataMap
,
NULL
,
&
delIdx
);
code
=
tsdbWriteDelData
(
pCommitter
->
pDelFWriter
,
pCommitter
->
aDelData
,
NULL
,
&
delIdx
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// put delIdx
// put delIdx
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelIdxMap
,
&
delIdx
,
tPutDelIdx
);
if
(
taosArrayPush
(
pCommitter
->
aDelIdx
,
&
delIdx
)
==
NULL
)
{
if
(
code
)
goto
_err
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
_exit:
_exit:
return
code
;
return
code
;
...
@@ -231,30 +221,23 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
...
@@ -231,30 +221,23 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
int32_t
iDelIdx
=
0
;
int32_t
iDelIdx
=
0
;
int32_t
nDelIdx
=
pCommitter
->
oDelIdxMap
.
nItem
;
int32_t
nDelIdx
=
taosArrayGetSize
(
pCommitter
->
aDelIdx
)
;
int32_t
iTbData
=
0
;
int32_t
iTbData
=
0
;
int32_t
nTbData
=
taosArrayGetSize
(
pMemTable
->
aTbData
);
int32_t
nTbData
=
taosArrayGetSize
(
pMemTable
->
aTbData
);
STbData
*
pTbData
;
STbData
*
pTbData
;
SDelIdx
*
pDelIdx
;
SDelIdx
*
pDelIdx
;
SDelIdx
delIdx
;
int32_t
c
;
ASSERT
(
nTbData
>
0
);
ASSERT
(
nTbData
>
0
);
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
if
(
iDelIdx
<
nDelIdx
)
{
pDelIdx
=
(
iDelIdx
<
nDelIdx
)
?
(
SDelIdx
*
)
taosArrayGet
(
pCommitter
->
aDelIdx
,
iDelIdx
)
:
NULL
;
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelIdxMap
,
iDelIdx
,
&
delIdx
,
tGetDelIdx
);
if
(
code
)
goto
_err
;
pDelIdx
=
&
delIdx
;
}
else
{
pDelIdx
=
NULL
;
}
while
(
true
)
{
while
(
true
)
{
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
break
;
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
break
;
if
(
pTbData
&&
pDelIdx
)
{
if
(
pTbData
&&
pDelIdx
)
{
c
=
tTABLEIDCmprFn
(
pTbData
,
pDelIdx
);
int32_t
c
=
tTABLEIDCmprFn
(
pTbData
,
pDelIdx
);
if
(
c
==
0
)
{
if
(
c
==
0
)
{
goto
_commit_mem_and_disk_del
;
goto
_commit_mem_and_disk_del
;
}
else
if
(
c
<
0
)
{
}
else
if
(
c
<
0
)
{
...
@@ -270,44 +253,27 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
...
@@ -270,44 +253,27 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
_commit_mem_del:
_commit_mem_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
NULL
);
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
NULL
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
iTbData
++
;
iTbData
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
iTbData
<
nTbData
)
?
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
)
:
NULL
;
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
continue
;
continue
;
_commit_disk_del:
_commit_disk_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
NULL
,
pDelIdx
);
code
=
tsdbCommitTableDel
(
pCommitter
,
NULL
,
pDelIdx
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
iDelIdx
++
;
iDelIdx
++
;
if
(
iDelIdx
<
nDelIdx
)
{
pDelIdx
=
(
iDelIdx
<
nDelIdx
)
?
(
SDelIdx
*
)
taosArrayGet
(
pCommitter
->
aDelIdx
,
iDelIdx
)
:
NULL
;
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelIdxMap
,
iDelIdx
,
&
delIdx
,
tGetDelIdx
);
if
(
code
)
goto
_err
;
pDelIdx
=
&
delIdx
;
}
else
{
pDelIdx
=
NULL
;
}
continue
;
continue
;
_commit_mem_and_disk_del:
_commit_mem_and_disk_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
pDelIdx
);
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
pDelIdx
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
iTbData
++
;
iTbData
++
;
pTbData
=
(
iTbData
<
nTbData
)
?
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
)
:
NULL
;
iDelIdx
++
;
iDelIdx
++
;
if
(
iTbData
<
nTbData
)
{
pDelIdx
=
(
iDelIdx
<
nDelIdx
)
?
(
SDelIdx
*
)
taosArrayGet
(
pCommitter
->
aDelIdx
,
iDelIdx
)
:
NULL
;
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
if
(
iDelIdx
<
nDelIdx
)
{
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelIdxMap
,
iDelIdx
,
&
delIdx
,
tGetDelIdx
);
if
(
code
)
goto
_err
;
pDelIdx
=
&
delIdx
;
}
else
{
pDelIdx
=
NULL
;
}
continue
;
continue
;
}
}
...
@@ -320,11 +286,15 @@ _err:
...
@@ -320,11 +286,15 @@ _err:
static
int32_t
tsdbCommitDelEnd
(
SCommitter
*
pCommitter
)
{
static
int32_t
tsdbCommitDelEnd
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
code
=
tsdbWriteDelIdx
(
pCommitter
->
pDelFWriter
,
&
pCommitter
->
nDelIdxMap
,
NULL
);
code
=
tsdbWriteDelIdx
(
pCommitter
->
pDelFWriter
,
pCommitter
->
aDelIdxN
,
NULL
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
code
=
tsdbUpdateDelFileHdr
(
pCommitter
->
pDelFWriter
,
NULL
);
code
=
tsdbUpdateDelFileHdr
(
pCommitter
->
pDelFWriter
);
if
(
code
)
goto
_err
;
code
=
tsdbFSStateUpsertDelFile
(
pTsdb
->
fs
->
nState
,
&
pCommitter
->
pDelFWriter
->
fDel
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
code
=
tsdbDelFWriterClose
(
pCommitter
->
pDelFWriter
,
1
);
code
=
tsdbDelFWriterClose
(
pCommitter
->
pDelFWriter
,
1
);
...
@@ -335,6 +305,10 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
...
@@ -335,6 +305,10 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
}
}
taosArrayDestroy
(
pCommitter
->
aDelIdx
);
taosArrayDestroy
(
pCommitter
->
aDelData
);
taosArrayDestroy
(
pCommitter
->
aDelIdxN
);
return
code
;
return
code
;
_err:
_err:
...
@@ -431,162 +405,6 @@ _exit:
...
@@ -431,162 +405,6 @@ _exit:
return
code
;
return
code
;
}
}
static
int32_t
tsdbCommitMemoryData
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STbDataIter
*
pIter
=
&
(
STbDataIter
){
0
};
TSDBKEY
key
=
{.
ts
=
pCommitter
->
minKey
,
.
version
=
VERSION_MIN
};
TSDBROW
row
;
TSDBROW
*
pRow
;
// create iter
tsdbTbDataIterOpen
(
pTbData
,
&
key
,
0
,
pIter
);
pRow
=
tsdbTbDataIterGet
(
pIter
);
if
(
pRow
==
NULL
||
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
goto
_exit
;
// main loop
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
pTbData
->
suid
,
.
uid
=
pTbData
->
uid
};
SMapData
*
mBlock
=
&
pCommitter
->
nBlockMap
;
SBlock
*
pBlock
=
&
pCommitter
->
nBlock
;
SBlockData
*
pBlockData
=
&
pCommitter
->
nBlockData
;
TSKEY
lastTS
;
tBlockIdxReset
(
pBlockIdx
);
tMapDataReset
(
mBlock
);
tBlockReset
(
pBlock
);
tBlockDataReset
(
pBlockData
);
lastTS
=
TSKEY_MIN
;
while
(
1
)
{
if
(
pRow
==
NULL
||
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
{
if
(
pBlockData
->
nRow
>
0
)
{
goto
_write_block
;
}
else
{
break
;
}
}
// update schema
code
=
tsdbCommitterUpdateSchema
(
pCommitter
,
pTbData
->
suid
,
pTbData
->
uid
,
TSDBROW_SVERSION
(
pRow
));
if
(
code
)
goto
_err
;
// append
code
=
tBlockDataAppendRow
(
pBlockData
,
pRow
,
pCommitter
->
pTSchema
);
if
(
code
)
goto
_err
;
// update
pBlock
->
minVersion
=
TMIN
(
pBlock
->
minVersion
,
TSDBROW_VERSION
(
pRow
));
pBlock
->
maxVersion
=
TMAX
(
pBlock
->
maxVersion
,
TSDBROW_VERSION
(
pRow
));
pBlock
->
nRow
++
;
if
(
TSDBROW_TS
(
pRow
)
==
lastTS
)
pBlock
->
hasDup
=
1
;
lastTS
=
TSDBROW_TS
(
pRow
);
// next
tsdbTbDataIterNext
(
pIter
);
pRow
=
tsdbTbDataIterGet
(
pIter
);
// check
if
(
pBlockData
->
nRow
>=
pCommitter
->
maxRow
*
4
/
5
)
goto
_write_block
;
continue
;
_write_block:
row
=
tBlockDataFirstRow
(
pBlockData
);
if
(
tsdbKeyCmprFn
(
&
pBlock
->
minKey
,
&
TSDBROW_KEY
(
&
row
))
>
0
)
pBlock
->
minKey
=
TSDBROW_KEY
(
&
row
);
row
=
tBlockDataLastRow
(
pBlockData
);
if
(
tsdbKeyCmprFn
(
&
pBlock
->
maxKey
,
&
TSDBROW_KEY
(
&
row
))
<
0
)
pBlock
->
maxKey
=
TSDBROW_KEY
(
&
row
);
pBlock
->
last
=
pBlockData
->
nRow
<
pCommitter
->
minRow
?
1
:
0
;
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockData
,
NULL
,
NULL
,
pBlockIdx
,
pBlock
,
pCommitter
->
cmprAlg
);
if
(
code
)
goto
_err
;
// Design SMA and write SMA to file
// SBlockIdx
code
=
tMapDataPutItem
(
mBlock
,
pBlock
,
tPutBlock
);
if
(
code
)
goto
_err
;
pBlockIdx
->
minKey
=
TMIN
(
pBlockIdx
->
minKey
,
pBlock
->
minKey
.
ts
);
pBlockIdx
->
maxKey
=
TMAX
(
pBlockIdx
->
maxKey
,
pBlock
->
maxKey
.
ts
);
pBlockIdx
->
minVersion
=
TMIN
(
pBlockIdx
->
minVersion
,
pBlock
->
minVersion
);
pBlockIdx
->
maxVersion
=
TMAX
(
pBlockIdx
->
maxVersion
,
pBlock
->
maxVersion
);
tBlockReset
(
pBlock
);
tBlockDataReset
(
pBlockData
);
lastTS
=
TSKEY_MIN
;
}
// write block
code
=
tsdbWriteBlock
(
pCommitter
->
pWriter
,
mBlock
,
NULL
,
pBlockIdx
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nBlockIdxMap
,
pBlockIdx
,
tPutBlockIdx
);
if
(
code
)
goto
_err
;
_exit:
if
(
pRow
)
pCommitter
->
nextKey
=
TMIN
(
pCommitter
->
nextKey
,
TSDBROW_TS
(
pRow
));
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb commit memory data failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitDiskData
(
SCommitter
*
pCommitter
,
SBlockIdx
*
oBlockIdx
)
{
int32_t
code
=
0
;
SMapData
*
mBlockO
=
&
pCommitter
->
oBlockMap
;
SBlock
*
pBlockO
=
&
pCommitter
->
oBlock
;
SMapData
*
mBlockN
=
&
pCommitter
->
nBlockMap
;
SBlock
*
pBlockN
=
&
pCommitter
->
nBlock
;
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){
0
};
SBlockData
*
pBlockDataO
=
&
pCommitter
->
oBlockData
;
// read
code
=
tsdbReadBlock
(
pCommitter
->
pReader
,
oBlockIdx
,
mBlockO
,
NULL
);
if
(
code
)
goto
_err
;
// loop to add to new
tMapDataReset
(
mBlockN
);
for
(
int32_t
iBlock
=
0
;
iBlock
<
mBlockO
->
nItem
;
iBlock
++
)
{
tMapDataGetItemByIdx
(
mBlockO
,
iBlock
,
pBlockO
,
tGetBlock
);
if
(
pBlockO
->
last
)
{
ASSERT
(
iBlock
==
mBlockO
->
nItem
-
1
);
code
=
tsdbReadBlockData
(
pCommitter
->
pReader
,
oBlockIdx
,
pBlockO
,
pBlockDataO
,
NULL
,
NULL
);
if
(
code
)
goto
_err
;
tBlockReset
(
pBlockN
);
pBlockN
->
minKey
=
pBlockO
->
minKey
;
pBlockN
->
maxKey
=
pBlockO
->
maxKey
;
pBlockN
->
minVersion
=
pBlockO
->
minVersion
;
pBlockN
->
maxVersion
=
pBlockO
->
maxVersion
;
pBlockN
->
nRow
=
pBlockO
->
nRow
;
pBlockN
->
last
=
pBlockO
->
last
;
pBlockN
->
hasDup
=
pBlockO
->
hasDup
;
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockDataO
,
NULL
,
NULL
,
pBlockIdx
,
pBlockN
,
pCommitter
->
cmprAlg
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
mBlockN
,
pBlockN
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
else
{
code
=
tMapDataPutItem
(
mBlockN
,
pBlockO
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
}
// SBlock
*
pBlockIdx
=
*
oBlockIdx
;
code
=
tsdbWriteBlock
(
pCommitter
->
pWriter
,
mBlockN
,
NULL
,
pBlockIdx
);
if
(
code
)
goto
_err
;
// SBlockIdx
code
=
tMapDataPutItem
(
&
pCommitter
->
nBlockIdxMap
,
pBlockIdx
,
tPutBlockIdx
);
if
(
code
)
goto
_err
;
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb commit disk data failed since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbMergeTableData
(
SCommitter
*
pCommitter
,
STbDataIter
*
pIter
,
SBlock
*
pBlockMerge
,
TSDBKEY
toKey
,
static
int32_t
tsdbMergeTableData
(
SCommitter
*
pCommitter
,
STbDataIter
*
pIter
,
SBlock
*
pBlockMerge
,
TSDBKEY
toKey
,
int8_t
toDataOnly
)
{
int8_t
toDataOnly
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
...
@@ -820,123 +638,6 @@ _err:
...
@@ -820,123 +638,6 @@ _err:
return
code
;
return
code
;
}
}
static
int32_t
tsdbMergeMemDisk
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SBlockIdx
*
oBlockIdx
)
{
int32_t
code
=
0
;
// STbDataIter *pIter = &(STbDataIter){0};
// TSDBROW *pRow;
// // create iter
// tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
// pRow == tsdbTbDataIterGet(pIter);
// if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) {
// code = tsdbCommitDiskData(pCommitter, oBlockIdx);
// if (code) {
// goto _err;
// } else {
// goto _exit;
// }
// }
// // start ==================
// // read
// code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL);
// if (code) goto _err;
// // loop to merge
// // SBlockData *pBlockData = &pCommitter->nBlockData;
// int32_t iBlock = 0;
// int32_t nBlock = pCommitter->oBlockMap.nItem;
// // SBlock *pBlockO = &pCommitter->oBlock;
// SBlock *pBlock;
// int32_t c;
// // merge ===================
// while (true) {
// if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break;
// if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) {
// if (pBlock->last) {
// // merge memory data and disk data to write to .data/.last (todo)
// code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
// (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
// if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// iBlock++;
// } else {
// c = tBlockCmprFn(&(SBlock){}, pBlock);
// if (c < 0) {
// // commit memory data until pBlock->minKey (not included) only to .data file (todo)
// code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
// if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// } else if (c > 0) {
// // just move the block (todo)
// // code = tsdbCommitTableDiskData(pCommitter, pBlock);
// if (code) goto _err;
// iBlock++;
// // TODO
// } else {
// int64_t nOvlp = 0; // = tsdbOvlpRows();
// if (nOvlp + pBlock->nRow <= pCommitter->maxRow) {
// // add as a subblock
// } else {
// if (iBlock == nBlock - 1) {
// // merge memory data and disk data to .data/.last file
// code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock,
// (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
// if (code) goto _err;
// } else {
// // merge memory data and disk data to .data file only until pBlock[1].
// code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1);
// }
// }
// pRow = tsdbTbDataIterGet(pIter);
// iBlock++;
// }
// }
// } else if (pBlock) {
// // code = tsdbCommitTableDiskData(pCommitter, pBlock);
// if (code) goto _err;
// iBlock++;
// // next block
// } else {
// // commit only memory data until (pCommitter->maxKey, VERSION_MAX)
// code =
// tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version =
// VERSION_MIN}, 0);
// if (code) goto _err;
// pRow = tsdbTbDataIterGet(pIter);
// }
// }
// // end =====================
// // SBlock
// // code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
// // if (code) goto _err;
// // // SBlockIdx
// // code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx);
// // if (code) goto _err;
// _exit:
// pRow = tsdbTbDataIterGet(pIter);
// if (pRow) {
// pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
// }
return
code
;
// _err:
// tsdbError("vgId:%d tsdb merge mem disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode),
// tstrerror(code)); return code;
}
static
int32_t
tsdbCommitTableDataEnd
(
SCommitter
*
pCommitter
,
int64_t
suid
,
int64_t
uid
)
{
static
int32_t
tsdbCommitTableDataEnd
(
SCommitter
*
pCommitter
,
int64_t
suid
,
int64_t
uid
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
suid
,
.
uid
=
uid
};
SBlockIdx
*
pBlockIdx
=
&
(
SBlockIdx
){.
suid
=
suid
,
.
uid
=
uid
};
...
@@ -1190,7 +891,6 @@ _err:
...
@@ -1190,7 +891,6 @@ _err:
static
int32_t
tsdbCommitFileDataImpl
(
SCommitter
*
pCommitter
)
{
static
int32_t
tsdbCommitFileDataImpl
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
c
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
int32_t
iTbData
=
0
;
int32_t
iTbData
=
0
;
...
@@ -1209,56 +909,37 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
...
@@ -1209,56 +909,37 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
pBlockIdx
=
NULL
;
pBlockIdx
=
NULL
;
}
}
// merge
while
(
pTbData
||
pBlockIdx
)
{
while
(
pTbData
&&
pBlockIdx
)
{
if
(
pTbData
&&
pBlockIdx
)
{
c
=
tTABLEIDCmprFn
(
pTbData
,
pBlockIdx
);
int32_t
c
=
tTABLEIDCmprFn
(
pTbData
,
pBlockIdx
);
if
(
c
==
0
)
{
if
(
c
==
0
)
{
// merge commit
goto
_commit_table_mem_and_disk
;
code
=
tsdbMergeMemDisk
(
pCommitter
,
pTbData
,
pBlockIdx
);
}
else
if
(
c
<
0
)
{
if
(
code
)
goto
_err
;
goto
_commit_table_mem_data
;
iTbData
++
;
iBlockIdx
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
if
(
iBlockIdx
<
nBlockIdx
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockIdxMap
,
iBlockIdx
,
pBlockIdx
,
tGetBlockIdx
);
}
else
{
pBlockIdx
=
NULL
;
}
}
else
if
(
c
<
0
)
{
// commit memory data
code
=
tsdbCommitMemoryData
(
pCommitter
,
pTbData
);
if
(
code
)
goto
_err
;
iTbData
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
}
else
{
pTbData
=
NULL
;
goto
_commit_table_disk_data
;
}
}
}
else
if
(
pBlockIdx
)
{
goto
_commit_table_disk_data
;
}
else
{
}
else
{
// commit disk data
goto
_commit_table_mem_data
;
code
=
tsdbCommitDiskData
(
pCommitter
,
pBlockIdx
);
}
if
(
code
)
goto
_err
;
iBlockIdx
++
;
_commit_table_mem_data:
if
(
iBlockIdx
<
nBlockIdx
)
{
code
=
tsdbCommitTableData
(
pCommitter
,
pTbData
,
NULL
);
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockIdxMap
,
iBlockIdx
,
pBlockIdx
,
tGetBlockIdx
);
if
(
code
)
goto
_err
;
}
else
{
pBlockIdx
=
NULL
;
iTbData
++
;
}
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
}
}
continue
;
// disk
_commit_table_disk_data:
while
(
pBlockIdx
)
{
code
=
tsdbCommitTableData
(
pCommitter
,
NULL
,
pBlockIdx
);
// commit disk data
code
=
tsdbCommitDiskData
(
pCommitter
,
pBlockIdx
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
iBlockIdx
++
;
iBlockIdx
++
;
...
@@ -1267,20 +948,25 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
...
@@ -1267,20 +948,25 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
}
else
{
}
else
{
pBlockIdx
=
NULL
;
pBlockIdx
=
NULL
;
}
}
}
continue
;
// memory
_commit_table_mem_and_disk:
while
(
pTbData
)
{
code
=
tsdbCommitTableData
(
pCommitter
,
pTbData
,
pBlockIdx
);
// commit memory data
code
=
tsdbCommitMemoryData
(
pCommitter
,
pTbData
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
iBlockIdx
++
;
if
(
iBlockIdx
<
nBlockIdx
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
oBlockIdxMap
,
iBlockIdx
,
pBlockIdx
,
tGetBlockIdx
);
}
else
{
pBlockIdx
=
NULL
;
}
iTbData
++
;
iTbData
++
;
if
(
iTbData
<
nTbData
)
{
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
}
else
{
pTbData
=
NULL
;
pTbData
=
NULL
;
}
}
continue
;
}
}
return
code
;
return
code
;
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
47a75227
...
@@ -283,10 +283,7 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
...
@@ -283,10 +283,7 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
int32_t
tPutDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
)
{
int32_t
tPutDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
)
{
int32_t
n
=
0
;
int32_t
n
=
0
;
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelFile
->
minKey
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
commitID
);
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelFile
->
maxKey
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
minVersion
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
maxVersion
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
size
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
size
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
offset
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelFile
->
offset
);
...
@@ -296,10 +293,7 @@ int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) {
...
@@ -296,10 +293,7 @@ int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile) {
int32_t
tGetDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
)
{
int32_t
tGetDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
)
{
int32_t
n
=
0
;
int32_t
n
=
0
;
n
+=
tGetI64
(
p
+
n
,
&
pDelFile
->
minKey
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
commitID
);
n
+=
tGetI64
(
p
+
n
,
&
pDelFile
->
maxKey
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
minVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
maxVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
size
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
size
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelFile
->
offset
);
...
...
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
47a75227
...
@@ -18,43 +18,45 @@
...
@@ -18,43 +18,45 @@
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
// SDelFWriter ====================================================
// SDelFWriter ====================================================
struct
SDelFWriter
{
STsdb
*
pTsdb
;
SDelFile
*
pFile
;
TdFilePtr
pWriteH
;
};
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
)
{
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
char
*
fname
=
NULL
;
// TODO
char
fname
[
TSDB_FILENAME_LEN
];
char
hdr
[
TSDB_FHDR_SIZE
]
=
{
0
};
SDelFWriter
*
pDelFWriter
;
SDelFWriter
*
pDelFWriter
;
int64_t
n
;
// alloc
pDelFWriter
=
(
SDelFWriter
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pDelFWriter
));
pDelFWriter
=
(
SDelFWriter
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pDelFWriter
));
if
(
pDelFWriter
==
NULL
)
{
if
(
pDelFWriter
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
goto
_err
;
}
}
pDelFWriter
->
pTsdb
=
pTsdb
;
pDelFWriter
->
pTsdb
=
pTsdb
;
pDelFWriter
->
pFile
=
pFile
;
pDelFWriter
->
fDel
=
*
pFile
;
tsdbDelFileName
(
pTsdb
,
pFile
,
fname
);
pDelFWriter
->
pWriteH
=
taosOpenFile
(
fname
,
TD_FILE_WRITE
|
TD_FILE_CREATE
);
pDelFWriter
->
pWriteH
=
taosOpenFile
(
fname
,
TD_FILE_WRITE
|
TD_FILE_CREATE
);
if
(
pDelFWriter
->
pWriteH
==
NULL
)
{
if
(
pDelFWriter
->
pWriteH
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
if
(
taosLSeekFile
(
pDelFWriter
->
pWriteH
,
TSDB_FHDR_SIZE
,
SEEK_SET
)
<
0
)
{
// update header
n
=
taosWriteFile
(
pDelFWriter
->
pWriteH
,
&
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
pDelFWriter
->
pFile
->
size
=
TSDB_FHDR_SIZE
;
pDelFWriter
->
fDel
.
size
=
TSDB_FHDR_SIZE
;
pDelFWriter
->
pFile
->
offset
=
0
;
pDelFWriter
->
fDel
.
size
=
0
;
*
ppWriter
=
pDelFWriter
;
return
code
;
return
code
;
_err:
_err:
tsdbError
(
"vgId:%d failed to open del file writer since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d failed to open del file writer since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
*
ppWriter
=
NULL
;
return
code
;
return
code
;
}
}
...
@@ -80,28 +82,33 @@ _err:
...
@@ -80,28 +82,33 @@ _err:
return
code
;
return
code
;
}
}
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
)
{
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
SArray
*
aDelData
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
uint8_t
*
pBuf
=
NULL
;
uint8_t
*
pBuf
=
NULL
;
int64_t
size
=
0
;
int64_t
size
;
int64_t
n
=
0
;
int64_t
n
;
SBlockDataHdr
hdr
=
{.
delimiter
=
TSDB_FILE_DLMT
,
.
suid
=
pDelIdx
->
suid
,
.
uid
=
pDelIdx
->
uid
};
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
// prepare
// prepare
size
+=
tPutU32
(
NULL
,
TSDB_FILE_DLMT
);
size
=
sizeof
(
hdr
);
size
+=
tPutI64
(
NULL
,
pDelIdx
->
suid
);
for
(
int32_t
iDelData
=
0
;
iDelData
<
taosArrayGetSize
(
aDelData
);
iDelData
++
)
{
size
+=
tPutI64
(
NULL
,
pDelIdx
->
uid
);
size
+=
tPutDelData
(
NULL
,
taosArrayGet
(
aDelData
,
iDelData
));
size
=
size
+
tPutMapData
(
NULL
,
pDelDataMap
)
+
sizeof
(
TSCKSUM
);
}
size
+=
sizeof
(
TSCKSUM
);
// alloc
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// build
// build
n
+=
tPutU32
(
*
ppBuf
+
n
,
TSDB_FILE_DLMT
);
n
=
0
;
n
+=
tPutI64
(
*
ppBuf
+
n
,
pDelIdx
->
suid
);
*
(
SBlockDataHdr
*
)(
*
ppBuf
)
=
hdr
;
n
+=
tPutI64
(
*
ppBuf
+
n
,
pDelIdx
->
uid
);
n
+=
sizeof
(
hdr
);
n
+=
tPutMapData
(
*
ppBuf
+
n
,
pDelDataMap
);
for
(
int32_t
iDelData
=
0
;
iDelData
<
taosArrayGetSize
(
aDelData
);
iDelData
++
)
{
size
+=
tPutDelData
(
*
ppBuf
+
n
,
taosArrayGet
(
aDelData
,
iDelData
));
}
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
...
@@ -116,10 +123,9 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SMapData *pDelDataMap, uint8_t **
...
@@ -116,10 +123,9 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SMapData *pDelDataMap, uint8_t **
ASSERT
(
n
==
size
);
ASSERT
(
n
==
size
);
// update
// update
pDelIdx
->
offset
=
pWriter
->
pFile
->
size
;
pDelIdx
->
offset
=
pWriter
->
fDel
.
size
;
pDelIdx
->
size
=
size
;
pDelIdx
->
size
=
size
;
pWriter
->
pFile
->
offset
=
pWriter
->
pFile
->
size
;
pWriter
->
fDel
.
size
+=
size
;
pWriter
->
pFile
->
size
+=
size
;
tsdbFree
(
pBuf
);
tsdbFree
(
pBuf
);
return
code
;
return
code
;
...
@@ -130,24 +136,33 @@ _err:
...
@@ -130,24 +136,33 @@ _err:
return
code
;
return
code
;
}
}
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
S
MapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
S
Array
*
aDelIdx
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int64_t
size
=
0
;
int64_t
size
;
int64_t
n
=
0
;
int64_t
n
;
uint8_t
*
pBuf
=
NULL
;
uint8_t
*
pBuf
=
NULL
;
SDelIdx
*
pDelIdx
;
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
// prepare
// prepare
size
=
0
;
size
+=
tPutU32
(
NULL
,
TSDB_FILE_DLMT
);
size
+=
tPutU32
(
NULL
,
TSDB_FILE_DLMT
);
size
=
size
+
tPutMapData
(
NULL
,
pDelIdxMap
)
+
sizeof
(
TSCKSUM
);
for
(
int32_t
iDelIdx
=
0
;
iDelIdx
<
taosArrayGetSize
(
aDelIdx
);
iDelIdx
++
)
{
size
+=
tPutDelIdx
(
NULL
,
taosArrayGet
(
aDelIdx
,
iDelIdx
));
}
size
+=
sizeof
(
TSCKSUM
);
// alloc
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// build
// build
n
=
0
;
n
+=
tPutU32
(
*
ppBuf
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutU32
(
*
ppBuf
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutMapData
(
*
ppBuf
+
n
,
pDelIdxMap
);
for
(
int32_t
iDelIdx
=
0
;
iDelIdx
<
taosArrayGetSize
(
aDelIdx
);
iDelIdx
++
)
{
n
+=
tPutDelIdx
(
*
ppBuf
+
n
,
taosArrayGet
(
aDelIdx
,
iDelIdx
));
}
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
...
@@ -159,11 +174,9 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SMapData *pDelIdxMap, uint8_t **pp
...
@@ -159,11 +174,9 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SMapData *pDelIdxMap, uint8_t **pp
goto
_err
;
goto
_err
;
}
}
ASSERT
(
n
==
size
);
// update
// update
pWriter
->
pFile
->
offset
=
pWriter
->
pFile
->
size
;
pWriter
->
fDel
.
offset
=
pWriter
->
fDel
.
size
;
pWriter
->
pFile
->
size
+=
size
;
pWriter
->
fDel
.
size
+=
size
;
tsdbFree
(
pBuf
);
tsdbFree
(
pBuf
);
return
code
;
return
code
;
...
@@ -174,23 +187,16 @@ _err:
...
@@ -174,23 +187,16 @@ _err:
return
code
;
return
code
;
}
}
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
uint8_t
*
pBuf
=
NULL
;
char
hdr
[
TSDB_FHDR_SIZE
];
int64_t
size
=
TSDB_FHDR_SIZE
;
int64_t
size
=
TSDB_FHDR_SIZE
;
int64_t
n
;
int64_t
n
;
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
// build
// build
memset
(
*
ppBuf
,
0
,
size
);
memset
(
hdr
,
0
,
size
);
n
=
tPutDelFile
(
*
ppBuf
,
pWriter
->
pFile
);
tPutDelFile
(
hdr
,
&
pWriter
->
fDel
);
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
taosCalcChecksumAppend
(
0
,
hdr
,
size
);
ASSERT
(
n
<=
size
-
sizeof
(
TSCKSUM
));
// seek
// seek
if
(
taosLSeekFile
(
pWriter
->
pWriteH
,
0
,
SEEK_SET
)
<
0
)
{
if
(
taosLSeekFile
(
pWriter
->
pWriteH
,
0
,
SEEK_SET
)
<
0
)
{
...
@@ -199,30 +205,29 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf) {
...
@@ -199,30 +205,29 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter, uint8_t **ppBuf) {
}
}
// write
// write
if
(
taosWriteFile
(
pWriter
->
pWriteH
,
*
ppBuf
,
size
)
<
size
)
{
n
=
taosWriteFile
(
pWriter
->
pWriteH
,
hdr
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
tsdbFree
(
pBuf
);
return
code
;
return
code
;
_err:
_err:
tsdbError
(
"vgId:%d update del file hdr failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d update del file hdr failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbFree
(
pBuf
);
return
code
;
return
code
;
}
}
// SDelFReader ====================================================
// SDelFReader ====================================================
struct
SDelFReader
{
struct
SDelFReader
{
STsdb
*
pTsdb
;
STsdb
*
pTsdb
;
SDelFile
*
pFile
;
SDelFile
fDel
;
TdFilePtr
pReadH
;
TdFilePtr
pReadH
;
};
};
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
char
*
fname
=
NULL
;
// todo
char
fname
[
TSDB_FILENAME_LEN
];
SDelFReader
*
pDelFReader
;
SDelFReader
*
pDelFReader
;
int64_t
n
;
int64_t
n
;
...
@@ -235,7 +240,9 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
...
@@ -235,7 +240,9 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
// open impl
// open impl
pDelFReader
->
pTsdb
=
pTsdb
;
pDelFReader
->
pTsdb
=
pTsdb
;
pDelFReader
->
pFile
=
pFile
;
pDelFReader
->
fDel
=
*
pFile
;
tsdbDelFileName
(
pTsdb
,
pFile
,
fname
);
pDelFReader
->
pReadH
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
pDelFReader
->
pReadH
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pDelFReader
==
NULL
)
{
if
(
pDelFReader
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
@@ -243,6 +250,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
...
@@ -243,6 +250,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
goto
_err
;
goto
_err
;
}
}
#if 0
// load and check hdr if buffer is given
// load and check hdr if buffer is given
if (ppBuf) {
if (ppBuf) {
code = tsdbRealloc(ppBuf, TSDB_FHDR_SIZE);
code = tsdbRealloc(ppBuf, TSDB_FHDR_SIZE);
...
@@ -266,6 +274,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
...
@@ -266,6 +274,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
// TODO: check the content
// TODO: check the content
}
}
#endif
_exit:
_exit:
*
ppReader
=
pDelFReader
;
*
ppReader
=
pDelFReader
;
...
@@ -292,66 +301,75 @@ _exit:
...
@@ -292,66 +301,75 @@ _exit:
return
code
;
return
code
;
}
}
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
SArray
*
aDelData
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int64_t
n
;
int64_t
offset
=
pDelIdx
->
offset
;
uint32_t
delimiter
;
int64_t
size
=
pDelIdx
->
size
;
tb_uid_t
suid
;
int64_t
n
;
tb_uid_t
uid
;
uint8_t
*
pBuf
=
NULL
;
SBlockDataHdr
*
pHdr
;
SDelData
*
pDelData
=
&
(
SDelData
){
0
};
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
// seek
// seek
if
(
taosLSeekFile
(
pReader
->
pReadH
,
pDelIdx
->
offset
,
SEEK_SET
)
<
0
)
{
if
(
taosLSeekFile
(
pReader
->
pReadH
,
offset
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
// alloc
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pDelDataMap
->
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
code
=
tsdbRealloc
(
ppBuf
,
pDelIdx
->
size
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// read
// read
n
=
taosReadFile
(
pReader
->
pReadH
,
*
ppBuf
,
pDelIdx
->
size
);
n
=
taosReadFile
(
pReader
->
pReadH
,
*
ppBuf
,
size
);
if
(
n
<
0
)
{
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
else
if
(
n
<
pDelIdx
->
size
)
{
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
goto
_err
;
}
}
// check
// check
if
(
!
taosCheckChecksumWhole
(
*
ppBuf
,
pDelIdx
->
size
))
{
if
(
!
taosCheckChecksumWhole
(
*
ppBuf
,
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
goto
_err
;
}
}
// // decode
// // decode
n
=
0
;
n
=
0
;
n
+=
tGetU32
(
*
ppBuf
+
n
,
&
delimiter
);
pHdr
=
(
SBlockDataHdr
*
)(
*
ppBuf
+
n
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
pHdr
->
delimiter
==
TSDB_FILE_DLMT
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
suid
);
ASSERT
(
pHdr
->
suid
==
pDelIdx
->
suid
);
ASSERT
(
suid
==
pDelIdx
->
suid
);
ASSERT
(
pHdr
->
uid
==
pDelIdx
->
uid
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
uid
);
n
+=
sizeof
(
*
pHdr
);
ASSERT
(
uid
==
pDelIdx
->
uid
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
n
+=
tGetMapData
(
*
ppBuf
+
n
,
pDelDataMap
);
n
+=
tGetDelData
(
*
ppBuf
+
n
,
pDelData
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
pDelIdx
->
size
);
}
ASSERT
(
n
==
size
-
sizeof
(
TSCKSUM
));
tsdbFree
(
pBuf
);
return
code
;
return
code
;
_err:
_err:
tsdbError
(
"vgId:%d read del data failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d read del data failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbFree
(
pBuf
);
return
code
;
return
code
;
}
}
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
S
MapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
S
Array
*
aDelIdx
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
n
;
int32_t
n
;
int64_t
offset
=
pReader
->
pFile
->
offset
;
int64_t
offset
=
pReader
->
fDel
.
offset
;
int64_t
size
=
pReader
->
pFile
->
size
-
offset
;
int64_t
size
=
pReader
->
fDel
.
size
-
offset
;
uint32_t
delimiter
;
uint32_t
delimiter
;
uint8_t
*
pBuf
=
NULL
;
SDelIdx
*
pDelIdx
=
&
(
SDelIdx
){};
ASSERT
(
ppBuf
&&
*
ppBuf
)
;
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
// seek
// seek
if
(
taosLSeekFile
(
pReader
->
pReadH
,
offset
,
SEEK_SET
)
<
0
)
{
if
(
taosLSeekFile
(
pReader
->
pReadH
,
offset
,
SEEK_SET
)
<
0
)
{
...
@@ -360,7 +378,6 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
...
@@ -360,7 +378,6 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
}
}
// alloc
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pDelIdxMap
->
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
...
@@ -384,8 +401,18 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
...
@@ -384,8 +401,18 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
n
=
0
;
n
=
0
;
n
+=
tGetU32
(
*
ppBuf
+
n
,
&
delimiter
);
n
+=
tGetU32
(
*
ppBuf
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
n
+=
tGetMapData
(
*
ppBuf
+
n
,
pDelIdxMap
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
taosArrayClear
(
aDelIdx
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
n
+=
tGetDelIdx
(
*
ppBuf
+
n
,
pDelIdx
);
if
(
taosArrayPush
(
aDelIdx
,
pDelIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
ASSERT
(
n
==
size
-
sizeof
(
TSCKSUM
));
return
code
;
return
code
;
...
...
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
47a75227
...
@@ -571,10 +571,6 @@ int32_t tPutDelIdx(uint8_t *p, void *ph) {
...
@@ -571,10 +571,6 @@ int32_t tPutDelIdx(uint8_t *p, void *ph) {
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelIdx
->
suid
);
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelIdx
->
suid
);
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelIdx
->
uid
);
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelIdx
->
uid
);
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelIdx
->
minKey
);
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pDelIdx
->
maxKey
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelIdx
->
minVersion
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelIdx
->
maxVersion
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelIdx
->
offset
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelIdx
->
offset
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelIdx
->
size
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDelIdx
->
size
);
...
@@ -587,10 +583,6 @@ int32_t tGetDelIdx(uint8_t *p, void *ph) {
...
@@ -587,10 +583,6 @@ int32_t tGetDelIdx(uint8_t *p, void *ph) {
n
+=
tGetI64
(
p
+
n
,
&
pDelIdx
->
suid
);
n
+=
tGetI64
(
p
+
n
,
&
pDelIdx
->
suid
);
n
+=
tGetI64
(
p
+
n
,
&
pDelIdx
->
uid
);
n
+=
tGetI64
(
p
+
n
,
&
pDelIdx
->
uid
);
n
+=
tGetI64
(
p
+
n
,
&
pDelIdx
->
minKey
);
n
+=
tGetI64
(
p
+
n
,
&
pDelIdx
->
maxKey
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelIdx
->
minVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelIdx
->
maxVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelIdx
->
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelIdx
->
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelIdx
->
size
);
n
+=
tGetI64v
(
p
+
n
,
&
pDelIdx
->
size
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
47a75227
...
@@ -315,7 +315,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
...
@@ -315,7 +315,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
if
(
tbUids
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
if
(
tbUids
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
t
=
ntohl
(
*
(
int32_t
*
)
pReq
);
int32_t
t
=
ntohl
(
*
(
int32_t
*
)
pReq
);
v
Debug
(
"vgId:%d, recv ttl msg, time:%d"
,
pVnode
->
config
.
vgId
,
t
);
v
Error
(
"rec ttl time:%d"
,
t
);
int32_t
ret
=
metaTtlDropTable
(
pVnode
->
pMeta
,
t
,
tbUids
);
int32_t
ret
=
metaTtlDropTable
(
pVnode
->
pMeta
,
t
,
tbUids
);
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
goto
end
;
goto
end
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录