Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
83edf4d6
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
83edf4d6
编写于
6月 14, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more work
上级
9f398eca
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
139 addition
and
169 deletion
+139
-169
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+139
-169
未找到文件。
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
83edf4d6
...
...
@@ -181,21 +181,15 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
// load old
if
(
pDelFileR
)
{
code
=
tsdbDelFReaderOpen
(
&
pCommitter
->
pDelFReader
,
pDelFileR
,
pTsdb
,
NULL
);
if
(
code
)
{
goto
_err
;
}
if
(
code
)
goto
_err
;
code
=
tsdbReadDelIdx
(
pCommitter
->
pDelFReader
,
&
pCommitter
->
oDelIdxMap
,
&
pCommitter
->
pBuf1
);
if
(
code
)
{
goto
_err
;
}
code
=
tsdbReadDelIdx
(
pCommitter
->
pDelFReader
,
&
pCommitter
->
oDelIdxMap
,
NULL
);
if
(
code
)
goto
_err
;
}
// prepare new
code
=
tsdbDelFWriterOpen
(
&
pCommitter
->
pDelFWriter
,
pDelFileW
,
pTsdb
);
if
(
code
)
{
goto
_err
;
}
if
(
code
)
goto
_err
;
_exit:
tsdbDebug
(
"vgId:%d commit del start"
,
TD_VID
(
pTsdb
->
pVnode
));
...
...
@@ -206,64 +200,89 @@ _err:
return
code
;
}
static
int32_t
tsdbCommitTableDel
(
SCommitter
*
pCommitter
);
static
int32_t
tsdbCommitDelImpl
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
int32_t
c
;
int32_t
iTbData
=
0
;
int32_t
nTbData
=
taosArrayGetSize
(
pMemTable
->
aTbData
);
int32_t
iDelIdxItem
=
0
;
int32_t
nDelIdxItem
=
pCommitter
->
delIdxOld
.
offset
.
nOffset
;
STbData
*
pTbData
=
NULL
;
SDelIdxItem
*
pDelIdxItem
=
NULL
;
SDelIdxItem
item
;
while
(
iTbData
<
nTbData
||
iDelIdxItem
<
nDelIdxItem
)
{
pTbData
=
NULL
;
pDelIdxItem
=
NULL
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
if
(
iDelIdxItem
<
nDelIdxItem
)
{
tDelIdxGetItemByIdx
(
&
pCommitter
->
delIdxOld
,
&
item
,
iDelIdxItem
);
pDelIdxItem
=
&
item
;
}
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
int32_t
iDelIdx
=
0
;
int32_t
nDelIdx
=
pCommitter
->
oDelIdxMap
.
nItem
;
int32_t
iTbData
=
0
;
int32_t
nTbData
=
taosArrayGetSize
(
pMemTable
->
aTbData
);
STbData
*
pTbData
;
SDelIdx
*
pDelIdx
;
SDelIdx
delIdx
;
int32_t
c
;
ASSERT
(
nTbData
>
0
);
if
(
pTbData
&&
pDelIdxItem
)
{
c
=
tTABLEIDCmprFn
(
pTbData
,
pDelIdxItem
);
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
if
(
iDelIdx
<
nDelIdx
)
{
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelIdxMap
,
iDelIdx
,
&
delIdx
,
tGetDelIdx
);
if
(
code
)
goto
_err
;
pDelIdx
=
&
delIdx
;
}
else
{
pDelIdx
=
NULL
;
}
while
(
true
)
{
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
break
;
if
(
pTbData
&&
pDelIdx
)
{
c
=
tTABLEIDCmprFn
(
pTbData
,
pDelIdx
);
if
(
c
==
0
)
{
iTbData
++
;
iDelIdxItem
++
;
goto
_commit_mem_and_disk_del
;
}
else
if
(
c
<
0
)
{
iTbData
++
;
pDelIdxItem
=
NULL
;
goto
_commit_mem_del
;
}
else
{
iDelIdxItem
++
;
pTbData
=
NULL
;
goto
_commit_disk_del
;
}
}
else
{
if
(
pTbData
)
{
iTbData
++
;
}
if
(
pDelIdxItem
)
{
iDelIdxItem
++
;
}
if
(
pTbData
)
goto
_commit_mem_del
;
if
(
pDelIdx
)
goto
_commit_disk_del
;
}
if
(
pTbData
&&
pTbData
->
pHead
==
NULL
)
{
_commit_mem_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
NULL
);
if
(
code
)
goto
_err
;
iTbData
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
continue
;
if
(
pTbData
==
NULL
&&
pDelIdxItem
==
NULL
)
continue
;
_commit_disk_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
NULL
,
pDelIdx
);
if
(
code
)
goto
_err
;
iDelIdx
++
;
if
(
iDelIdx
<
nDelIdx
)
{
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelIdxMap
,
iDelIdx
,
&
delIdx
,
tGetDelIdx
);
if
(
code
)
goto
_err
;
pDelIdx
=
&
delIdx
;
}
else
{
pDelIdx
=
NULL
;
}
continue
;
// do merge
pCommitter
->
pTbData
=
pTbData
;
pCommitter
->
pDelIdxItem
=
pDelIdxItem
;
code
=
tsdbCommitTableDel
(
pCommitter
);
_commit_mem_and_disk_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
pDelIdx
);
if
(
code
)
goto
_err
;
iTbData
++
;
iDelIdx
++
;
if
(
iTbData
<
nTbData
)
{
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pMemTable
->
aTbData
,
iTbData
);
}
else
{
pTbData
=
NULL
;
}
if
(
iDelIdx
<
nDelIdx
)
{
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelIdxMap
,
iDelIdx
,
&
delIdx
,
tGetDelIdx
);
if
(
code
)
goto
_err
;
pDelIdx
=
&
delIdx
;
}
else
{
pDelIdx
=
NULL
;
}
continue
;
}
return
code
;
...
...
@@ -276,29 +295,20 @@ _err:
static
int32_t
tsdbCommitDelEnd
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
code
=
tsdbWriteDelIdx
(
pCommitter
->
pDelFWriter
,
&
pCommitter
->
delIdxNew
,
NULL
);
if
(
code
)
{
goto
_err
;
}
code
=
tsdbWriteDelIdx
(
pCommitter
->
pDelFWriter
,
&
pCommitter
->
nDelIdxMap
,
NULL
);
if
(
code
)
goto
_err
;
code
=
tsdbUpdateDelFileHdr
(
pCommitter
->
pDelFWriter
,
NULL
);
if
(
code
)
{
goto
_err
;
}
if
(
code
)
goto
_err
;
code
=
tsdbDelFWriterClose
(
pCommitter
->
pDelFWriter
,
1
);
if
(
code
)
{
goto
_err
;
}
if
(
code
)
goto
_err
;
if
(
pCommitter
->
pDelFReader
)
{
code
=
tsdbDelFReaderClose
(
pCommitter
->
pDelFReader
);
if
(
code
)
goto
_err
;
}
tDelDataClear
(
&
pCommitter
->
delDataNew
);
tDelIdxClear
(
&
pCommitter
->
delIdxNew
);
return
code
;
_err:
...
...
@@ -609,129 +619,89 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) {
return
code
;
}
static
int32_t
tsdbCommitTableDelStart
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
tb_uid_t
suid
;
tb_uid_t
uid
;
if
(
pCommitter
->
pTbData
)
{
suid
=
pCommitter
->
pTbData
->
suid
;
uid
=
pCommitter
->
pTbData
->
uid
;
static
int32_t
tsdbCommitTableDel
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SDelIdx
*
pDelIdx
)
{
int32_t
code
=
0
;
SDelData
delData
;
SDelOp
*
pDelOp
;
tb_uid_t
suid
;
tb_uid_t
uid
;
SDelIdx
delIdx
;
// TODO
SDelDataInfo
info
;
// TODO
// check no del data, just return
if
(
pTbData
&&
pTbData
->
pHead
==
NULL
)
{
pTbData
=
NULL
;
}
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
goto
_exit
;
// load old
pCommitter
->
delDataOld
=
(
SDelData
){
0
};
if
(
pCommitter
->
pDelIdxItem
)
{
suid
=
pCommitter
->
pDelIdxItem
->
suid
;
uid
=
pCommitter
->
pDelIdxItem
->
uid
;
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pCommitter
->
pDelIdxItem
,
&
pCommitter
->
delDataOld
,
&
pCommitter
->
pBuf5
);
if
(
code
)
goto
_err
;
// prepare
if
(
pTbData
)
{
info
.
suid
=
pTbData
->
suid
;
info
.
uid
=
pTbData
->
uid
;
}
else
{
info
.
suid
=
pDelIdx
->
suid
;
info
.
uid
=
pDelIdx
->
uid
;
}
delIdx
.
suid
=
info
.
suid
;
delIdx
.
uid
=
info
.
uid
;
delIdx
.
minKey
=
TSKEY_MAX
;
delIdx
.
maxKey
=
TSKEY_MIN
;
delIdx
.
minVersion
=
INT64_MAX
;
delIdx
.
maxVersion
=
-
1
;
// prepare new
pCommitter
->
delDataNew
.
suid
=
suid
;
pCommitter
->
delDataNew
.
uid
=
uid
;
pCommitter
->
delDataNew
.
offset
.
flag
=
0
;
pCommitter
->
delDataNew
.
offset
.
nOffset
=
0
;
pCommitter
->
delDataNew
.
nData
=
0
;
pCommitter
->
delIdxItem
=
(
SDelIdxItem
){
.
suid
=
suid
,
.
uid
=
uid
,
.
minKey
=
TSKEY_MAX
,
.
maxKey
=
TSKEY_MIN
,
.
minVersion
=
INT64_MAX
,
.
maxVersion
=
INT64_MIN
,
.
offset
=
-
1
,
.
size
=
-
1
,
};
return
code
;
_err:
tsdbError
(
"vgId:%d commit table del start failed since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitTableDelImpl
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
SDelDataItem
item
;
// old
if
(
pCommitter
->
pDelIdxItem
)
{
for
(
int32_t
iDelIdxItem
=
0
;
iDelIdxItem
<
pCommitter
->
delDataOld
.
offset
.
nOffset
;
iDelIdxItem
++
)
{
code
=
tDelDataGetItemByIdx
(
&
pCommitter
->
delDataOld
,
&
item
,
iDelIdxItem
);
if
(
code
)
goto
_err
;
code
=
tDelDataPutItem
(
&
pCommitter
->
delDataNew
,
&
item
);
if
(
code
)
goto
_err
;
// start
tMapDataReset
(
&
pCommitter
->
oDelDataMap
);
tMapDataReset
(
&
pCommitter
->
nDelDataMap
);
// update index
if
(
item
.
version
<
pCommitter
->
delIdxItem
.
minVersion
)
pCommitter
->
delIdxItem
.
minVersion
=
item
.
version
;
if
(
item
.
version
>
pCommitter
->
delIdxItem
.
maxVersion
)
pCommitter
->
delIdxItem
.
maxVersion
=
item
.
version
;
if
(
item
.
sKey
<
pCommitter
->
delIdxItem
.
minKey
)
pCommitter
->
delIdxItem
.
minKey
=
item
.
sKey
;
if
(
item
.
eKey
>
pCommitter
->
delIdxItem
.
maxKey
)
pCommitter
->
delIdxItem
.
maxKey
=
item
.
eKey
;
}
if
(
pDelIdx
)
{
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pDelIdx
,
&
pCommitter
->
oDelDataMap
,
NULL
);
if
(
code
)
goto
_err
;
}
//
new
if
(
pCommitter
->
pTbData
)
{
for
(
SDelOp
*
pDelOp
=
pCommitter
->
pTbData
->
pHead
;
pDelOp
;
pDelOp
=
pDelOp
->
pNext
)
{
item
=
(
SDelDataItem
){.
version
=
pDelOp
->
version
,
.
sKey
=
pDelOp
->
sKey
,
.
eKey
=
pDelOp
->
eKey
}
;
//
disk
for
(
int32_t
iDelData
=
0
;
iDelData
<
pCommitter
->
oDelDataMap
.
nItem
;
iDelData
++
)
{
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelDataMap
,
iDelData
,
&
delData
,
tGetDelData
);
if
(
code
)
goto
_err
;
code
=
tDelDataPutItem
(
&
pCommitter
->
delDataNew
,
&
item
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
&
delData
,
tPutDelData
);
if
(
code
)
goto
_err
;
// update index
if
(
item
.
version
<
pCommitter
->
delIdxItem
.
minVersion
)
pCommitter
->
delIdxItem
.
minVersion
=
item
.
version
;
if
(
item
.
version
>
pCommitter
->
delIdxItem
.
maxVersion
)
pCommitter
->
delIdxItem
.
maxVersion
=
item
.
version
;
if
(
item
.
sKey
<
pCommitter
->
delIdxItem
.
minKey
)
pCommitter
->
delIdxItem
.
minKey
=
item
.
sKey
;
if
(
item
.
eKey
>
pCommitter
->
delIdxItem
.
maxKey
)
pCommitter
->
delIdxItem
.
maxKey
=
item
.
eKey
;
}
if
(
delIdx
.
minKey
>
delData
.
sKey
)
delIdx
.
minKey
=
delData
.
sKey
;
if
(
delIdx
.
maxKey
<
delData
.
eKey
)
delIdx
.
maxKey
=
delData
.
eKey
;
if
(
delIdx
.
minVersion
>
delData
.
version
)
delIdx
.
minVersion
=
delData
.
version
;
if
(
delIdx
.
maxVersion
<
delData
.
version
)
delIdx
.
maxVersion
=
delData
.
version
;
}
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbCommitTableDelEnd
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
// write table del data
code
=
tsdbWriteDelData
(
pCommitter
->
pDelFWriter
,
&
pCommitter
->
delDataNew
,
NULL
,
&
pCommitter
->
delIdxItem
.
offset
,
&
pCommitter
->
delIdxItem
.
size
);
if
(
code
)
goto
_err
;
// add SDelIdxItem
code
=
tDelIdxPutItem
(
&
pCommitter
->
delIdxNew
,
&
pCommitter
->
delIdxItem
);
if
(
code
)
goto
_err
;
return
code
;
// memory
pDelOp
=
pTbData
?
pTbData
->
pHead
:
NULL
;
for
(;
pDelOp
;
pDelOp
=
pDelOp
->
pNext
)
{
delData
.
version
=
pDelOp
->
version
;
delData
.
sKey
=
pDelOp
->
sKey
;
delData
.
eKey
=
pDelOp
->
eKey
;
_err:
tsdbError
(
"vgId:%d commit table del end failed since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
&
delData
,
tPutDelData
);
if
(
code
)
goto
_err
;
static
int32_t
tsdbCommitTableDel
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
if
(
delIdx
.
minKey
>
delData
.
sKey
)
delIdx
.
minKey
=
delData
.
sKey
;
if
(
delIdx
.
maxKey
<
delData
.
eKey
)
delIdx
.
maxKey
=
delData
.
eKey
;
if
(
delIdx
.
minVersion
>
delData
.
version
)
delIdx
.
minVersion
=
delData
.
version
;
if
(
delIdx
.
maxVersion
<
delData
.
version
)
delIdx
.
maxVersion
=
delData
.
version
;
}
// start
code
=
tsdbCommitTableDelStart
(
pCommitter
);
if
(
code
)
goto
_err
;
ASSERT
(
pCommitter
->
nDelDataMap
.
nItem
>
0
);
//
impl
code
=
tsdb
CommitTableDelImpl
(
pCommitter
);
//
write
code
=
tsdb
WriteDelData
(
pCommitter
->
pDelFWriter
,
&
info
,
&
pCommitter
->
nDelDataMap
,
NULL
,
&
delIdx
.
offset
,
&
delIdx
.
size
);
if
(
code
)
goto
_err
;
//
end
code
=
t
sdbCommitTableDelEnd
(
pCommitter
);
//
put delIdx
code
=
t
MapDataPutItem
(
&
pCommitter
->
nDelIdxMap
,
&
delIdx
,
tPutDelIdx
);
if
(
code
)
goto
_err
;
_exit:
return
code
;
_err:
tsdbError
(
"vgId:%d commit table del failed since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录