Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0c63eecc
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0c63eecc
编写于
6月 14, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact some code
上级
9b15bc40
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
177 addition
and
465 deletion
+177
-465
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+3
-18
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+135
-138
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+16
-16
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+23
-293
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
0c63eecc
...
@@ -37,7 +37,6 @@ typedef struct TABLEID TABLEID;
...
@@ -37,7 +37,6 @@ typedef struct TABLEID TABLEID;
typedef
struct
SDelOp
SDelOp
;
typedef
struct
SDelOp
SDelOp
;
typedef
struct
SDelData
SDelData
;
typedef
struct
SDelData
SDelData
;
typedef
struct
SDelIdx
SDelIdx
;
typedef
struct
SDelIdx
SDelIdx
;
typedef
struct
SDelDataInfo
SDelDataInfo
;
typedef
struct
STbData
STbData
;
typedef
struct
STbData
STbData
;
typedef
struct
SMemTable
SMemTable
;
typedef
struct
SMemTable
SMemTable
;
typedef
struct
STbDataIter
STbDataIter
;
typedef
struct
STbDataIter
STbDataIter
;
...
@@ -48,7 +47,6 @@ typedef struct SMapData SMapData;
...
@@ -48,7 +47,6 @@ typedef struct SMapData SMapData;
typedef
struct
SColData
SColData
;
typedef
struct
SColData
SColData
;
typedef
struct
SColDataBlock
SColDataBlock
;
typedef
struct
SColDataBlock
SColDataBlock
;
typedef
struct
SBlockSMA
SBlockSMA
;
typedef
struct
SBlockSMA
SBlockSMA
;
typedef
struct
SBlockIdxItem
SBlockIdxItem
;
typedef
struct
SBlockIdx
SBlockIdx
;
typedef
struct
SBlockIdx
SBlockIdx
;
typedef
struct
SBlockInfo
SBlockInfo
;
typedef
struct
SBlockInfo
SBlockInfo
;
typedef
struct
SBlock
SBlock
;
typedef
struct
SBlock
SBlock
;
...
@@ -118,8 +116,7 @@ typedef struct SDelFWriter SDelFWriter;
...
@@ -118,8 +116,7 @@ typedef struct SDelFWriter SDelFWriter;
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
);
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
);
int32_t
tsdbDelFWriterClose
(
SDelFWriter
*
pWriter
,
int8_t
sync
);
int32_t
tsdbDelFWriterClose
(
SDelFWriter
*
pWriter
,
int8_t
sync
);
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
SDelDataInfo
*
pInfo
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
,
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
);
int64_t
*
rOffset
,
int64_t
*
rSize
);
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
SMapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
);
int32_t
tsdbWriteDelIdx
(
SDelFWriter
*
pWriter
,
SMapData
*
pDelIdxMap
,
uint8_t
**
ppBuf
);
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
,
uint8_t
**
ppBuf
);
int32_t
tsdbUpdateDelFileHdr
(
SDelFWriter
*
pWriter
,
uint8_t
**
ppBuf
);
...
@@ -150,29 +147,17 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData);
...
@@ -150,29 +147,17 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData);
int32_t
tGetMapData
(
uint8_t
*
p
,
SMapData
*
pMapData
);
int32_t
tGetMapData
(
uint8_t
*
p
,
SMapData
*
pMapData
);
// SBlockIdx
// SBlockIdx
int32_t
tBlockIdxClear
(
SBlockIdx
*
pBlockIdx
);
int32_t
tPutBlockIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tBlockIdxPutItem
(
SBlockIdx
*
pBlockIdx
,
SBlockIdxItem
*
pItem
);
int32_t
tGetBlockIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tBlockIdxGetItemByIdx
(
SBlockIdx
*
pBlockIdx
,
SBlockIdxItem
*
pItem
,
int32_t
idx
);
int32_t
tBlockIdxGetItem
(
SBlockIdx
*
pBlockIdx
,
SBlockIdxItem
*
pItem
,
TABLEID
id
);
int32_t
tPutBlockIdx
(
uint8_t
*
p
,
SBlockIdx
*
pBlockIdx
);
int32_t
tGetBlockIdx
(
uint8_t
*
p
,
SBlockIdx
*
pBlockIdx
);
// SBlock
// SBlock
int32_t
tBlockCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
tBlockCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
// SDelIdx
// SDelIdx
// int32_t tDelIdxClear(SDelIdx *pDelIdx);
// int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem);
// int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx);
// int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id);
int32_t
tPutDelIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tPutDelIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetDelIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetDelIdx
(
uint8_t
*
p
,
void
*
ph
);
// SDelData
// SDelData
// int32_t tDelDataClear(SDelData *pDelData);
// int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem);
// int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx);
// int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version);
int32_t
tPutDelData
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tPutDelData
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetDelData
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetDelData
(
uint8_t
*
p
,
void
*
ph
);
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
0c63eecc
...
@@ -139,35 +139,6 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
...
@@ -139,35 +139,6 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
return
code
;
return
code
;
}
}
static
int32_t
tsdbCommitData
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
// check
if
(
pMemTable
->
nRow
==
0
)
{
goto
_exit
;
}
// loop
pCommitter
->
nextKey
=
pMemTable
->
minKey
.
ts
;
while
(
pCommitter
->
nextKey
<
TSKEY_MAX
)
{
pCommitter
->
commitFid
=
tsdbKeyFid
(
pCommitter
->
nextKey
,
pCommitter
->
minutes
,
pCommitter
->
precision
);
tsdbFidKeyRange
(
pCommitter
->
commitFid
,
pCommitter
->
minutes
,
pCommitter
->
precision
,
&
pCommitter
->
minKey
,
&
pCommitter
->
maxKey
);
code
=
tsdbCommitFileData
(
pCommitter
);
if
(
code
)
goto
_err
;
}
_exit:
tsdbDebug
(
"vgId:%d commit data done, nRow:%"
PRId64
,
TD_VID
(
pTsdb
->
pVnode
),
pMemTable
->
nRow
);
return
code
;
_err:
tsdbError
(
"vgId:%d commit data failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitDelStart
(
SCommitter
*
pCommitter
)
{
static
int32_t
tsdbCommitDelStart
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
...
@@ -200,6 +171,90 @@ _err:
...
@@ -200,6 +171,90 @@ _err:
return
code
;
return
code
;
}
}
static
int32_t
tsdbCommitTableDel
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SDelIdx
*
pDelIdx
)
{
int32_t
code
=
0
;
SDelData
delData
;
SDelOp
*
pDelOp
;
tb_uid_t
suid
;
tb_uid_t
uid
;
SDelIdx
delIdx
;
// TODO
// check no del data, just return
if
(
pTbData
&&
pTbData
->
pHead
==
NULL
)
{
pTbData
=
NULL
;
}
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
goto
_exit
;
// prepare
if
(
pTbData
)
{
delIdx
.
suid
=
pTbData
->
suid
;
delIdx
.
uid
=
pTbData
->
uid
;
}
else
{
delIdx
.
suid
=
pDelIdx
->
suid
;
delIdx
.
uid
=
pDelIdx
->
uid
;
}
delIdx
.
minKey
=
TSKEY_MAX
;
delIdx
.
maxKey
=
TSKEY_MIN
;
delIdx
.
minVersion
=
INT64_MAX
;
delIdx
.
maxVersion
=
-
1
;
// start
tMapDataReset
(
&
pCommitter
->
oDelDataMap
);
tMapDataReset
(
&
pCommitter
->
nDelDataMap
);
if
(
pDelIdx
)
{
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pDelIdx
,
&
pCommitter
->
oDelDataMap
,
NULL
);
if
(
code
)
goto
_err
;
}
// disk
for
(
int32_t
iDelData
=
0
;
iDelData
<
pCommitter
->
oDelDataMap
.
nItem
;
iDelData
++
)
{
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelDataMap
,
iDelData
,
&
delData
,
tGetDelData
);
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
&
delData
,
tPutDelData
);
if
(
code
)
goto
_err
;
if
(
delIdx
.
minKey
>
delData
.
sKey
)
delIdx
.
minKey
=
delData
.
sKey
;
if
(
delIdx
.
maxKey
<
delData
.
eKey
)
delIdx
.
maxKey
=
delData
.
eKey
;
if
(
delIdx
.
minVersion
>
delData
.
version
)
delIdx
.
minVersion
=
delData
.
version
;
if
(
delIdx
.
maxVersion
<
delData
.
version
)
delIdx
.
maxVersion
=
delData
.
version
;
}
// memory
pDelOp
=
pTbData
?
pTbData
->
pHead
:
NULL
;
for
(;
pDelOp
;
pDelOp
=
pDelOp
->
pNext
)
{
delData
.
version
=
pDelOp
->
version
;
delData
.
sKey
=
pDelOp
->
sKey
;
delData
.
eKey
=
pDelOp
->
eKey
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
&
delData
,
tPutDelData
);
if
(
code
)
goto
_err
;
if
(
delIdx
.
minKey
>
delData
.
sKey
)
delIdx
.
minKey
=
delData
.
sKey
;
if
(
delIdx
.
maxKey
<
delData
.
eKey
)
delIdx
.
maxKey
=
delData
.
eKey
;
if
(
delIdx
.
minVersion
>
delData
.
version
)
delIdx
.
minVersion
=
delData
.
version
;
if
(
delIdx
.
maxVersion
<
delData
.
version
)
delIdx
.
maxVersion
=
delData
.
version
;
}
ASSERT
(
pCommitter
->
nDelDataMap
.
nItem
>
0
);
// write
code
=
tsdbWriteDelData
(
pCommitter
->
pDelFWriter
,
&
pCommitter
->
nDelDataMap
,
NULL
,
&
delIdx
);
if
(
code
)
goto
_err
;
// put delIdx
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelIdxMap
,
&
delIdx
,
tPutDelIdx
);
if
(
code
)
goto
_err
;
_exit:
return
code
;
_err:
tsdbError
(
"vgId:%d commit table del failed since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitDelImpl
(
SCommitter
*
pCommitter
)
{
static
int32_t
tsdbCommitDelImpl
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
...
@@ -316,48 +371,6 @@ _err:
...
@@ -316,48 +371,6 @@ _err:
return
code
;
return
code
;
}
}
static
int32_t
tsdbCommitDel
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
if
(
pMemTable
->
nDel
==
0
)
{
goto
_exit
;
}
// start
code
=
tsdbCommitDelStart
(
pCommitter
);
if
(
code
)
{
goto
_err
;
}
// impl
code
=
tsdbCommitDelImpl
(
pCommitter
);
if
(
code
)
{
goto
_err
;
}
// end
code
=
tsdbCommitDelEnd
(
pCommitter
);
if
(
code
)
{
goto
_err
;
}
_exit:
tsdbDebug
(
"vgId:%d commit del done, nDel:%"
PRId64
,
TD_VID
(
pTsdb
->
pVnode
),
pMemTable
->
nDel
);
return
code
;
_err:
tsdbError
(
"vgId:%d commit del failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitCache
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbEndCommit
(
SCommitter
*
pCommitter
,
int32_t
eno
)
{
static
int32_t
tsdbEndCommit
(
SCommitter
*
pCommitter
,
int32_t
eno
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
// TODO
// TODO
...
@@ -619,89 +632,73 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) {
...
@@ -619,89 +632,73 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) {
return
code
;
return
code
;
}
}
static
int32_t
tsdbCommitTableDel
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SDelIdx
*
pDelIdx
)
{
static
int32_t
tsdbCommitData
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SDelData
delData
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SDelOp
*
pDelOp
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
tb_uid_t
suid
;
tb_uid_t
uid
;
SDelIdx
delIdx
;
// TODO
SDelDataInfo
info
;
// TODO
// check no del data, just return
if
(
pTbData
&&
pTbData
->
pHead
==
NULL
)
{
pTbData
=
NULL
;
}
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
goto
_exit
;
// prepare
// check
if
(
pTbData
)
{
if
(
pMemTable
->
nRow
==
0
)
{
info
.
suid
=
pTbData
->
suid
;
goto
_exit
;
info
.
uid
=
pTbData
->
uid
;
}
else
{
info
.
suid
=
pDelIdx
->
suid
;
info
.
uid
=
pDelIdx
->
uid
;
}
}
delIdx
.
suid
=
info
.
suid
;
delIdx
.
uid
=
info
.
uid
;
delIdx
.
minKey
=
TSKEY_MAX
;
delIdx
.
maxKey
=
TSKEY_MIN
;
delIdx
.
minVersion
=
INT64_MAX
;
delIdx
.
maxVersion
=
-
1
;
// start
tMapDataReset
(
&
pCommitter
->
oDelDataMap
);
tMapDataReset
(
&
pCommitter
->
nDelDataMap
);
if
(
pDelIdx
)
{
// loop
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pDelIdx
,
&
pCommitter
->
oDelDataMap
,
NULL
);
pCommitter
->
nextKey
=
pMemTable
->
minKey
.
ts
;
while
(
pCommitter
->
nextKey
<
TSKEY_MAX
)
{
pCommitter
->
commitFid
=
tsdbKeyFid
(
pCommitter
->
nextKey
,
pCommitter
->
minutes
,
pCommitter
->
precision
);
tsdbFidKeyRange
(
pCommitter
->
commitFid
,
pCommitter
->
minutes
,
pCommitter
->
precision
,
&
pCommitter
->
minKey
,
&
pCommitter
->
maxKey
);
code
=
tsdbCommitFileData
(
pCommitter
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
}
}
// disk
_exit:
for
(
int32_t
iDelData
=
0
;
iDelData
<
pCommitter
->
oDelDataMap
.
nItem
;
iDelData
++
)
{
tsdbDebug
(
"vgId:%d commit data done, nRow:%"
PRId64
,
TD_VID
(
pTsdb
->
pVnode
),
pMemTable
->
nRow
);
code
=
tMapDataGetItemByIdx
(
&
pCommitter
->
oDelDataMap
,
iDelData
,
&
delData
,
tGetDelData
);
return
code
;
if
(
code
)
goto
_err
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
&
delData
,
tPutDelData
);
if
(
code
)
goto
_err
;
if
(
delIdx
.
minKey
>
delData
.
sKey
)
delIdx
.
minKey
=
delData
.
sKey
;
if
(
delIdx
.
maxKey
<
delData
.
eKey
)
delIdx
.
maxKey
=
delData
.
eKey
;
if
(
delIdx
.
minVersion
>
delData
.
version
)
delIdx
.
minVersion
=
delData
.
version
;
if
(
delIdx
.
maxVersion
<
delData
.
version
)
delIdx
.
maxVersion
=
delData
.
version
;
}
// memory
_err:
pDelOp
=
pTbData
?
pTbData
->
pHead
:
NULL
;
tsdbError
(
"vgId:%d commit data failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
for
(;
pDelOp
;
pDelOp
=
pDelOp
->
pNext
)
{
return
code
;
delData
.
version
=
pDelOp
->
version
;
}
delData
.
sKey
=
pDelOp
->
sKey
;
delData
.
eKey
=
pDelOp
->
eKey
;
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelDataMap
,
&
delData
,
tPutDelData
);
static
int32_t
tsdbCommitDel
(
SCommitter
*
pCommitter
)
{
if
(
code
)
goto
_err
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
if
(
delIdx
.
minKey
>
delData
.
sKey
)
delIdx
.
minKey
=
delData
.
sKey
;
if
(
pMemTable
->
nDel
==
0
)
{
if
(
delIdx
.
maxKey
<
delData
.
eKey
)
delIdx
.
maxKey
=
delData
.
eKey
;
goto
_exit
;
if
(
delIdx
.
minVersion
>
delData
.
version
)
delIdx
.
minVersion
=
delData
.
version
;
if
(
delIdx
.
maxVersion
<
delData
.
version
)
delIdx
.
maxVersion
=
delData
.
version
;
}
}
ASSERT
(
pCommitter
->
nDelDataMap
.
nItem
>
0
);
// start
code
=
tsdbCommitDelStart
(
pCommitter
);
if
(
code
)
{
goto
_err
;
}
// write
// impl
code
=
tsdbWriteDelData
(
pCommitter
->
pDelFWriter
,
&
info
,
&
pCommitter
->
nDelDataMap
,
NULL
,
&
delIdx
.
offset
,
&
delIdx
.
size
);
code
=
tsdbCommitDelImpl
(
pCommitter
);
if
(
code
)
goto
_err
;
if
(
code
)
{
goto
_err
;
}
// put delIdx
// end
code
=
tMapDataPutItem
(
&
pCommitter
->
nDelIdxMap
,
&
delIdx
,
tPutDelIdx
);
code
=
tsdbCommitDelEnd
(
pCommitter
);
if
(
code
)
goto
_err
;
if
(
code
)
{
goto
_err
;
}
_exit:
_exit:
tsdbDebug
(
"vgId:%d commit del done, nDel:%"
PRId64
,
TD_VID
(
pTsdb
->
pVnode
),
pMemTable
->
nDel
);
return
code
;
return
code
;
_err:
_err:
tsdbError
(
"vgId:%d commit
table del failed since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d commit
del failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
return
code
;
}
}
\ No newline at end of file
static
int32_t
tsdbCommitCache
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
0c63eecc
...
@@ -139,8 +139,7 @@ _err:
...
@@ -139,8 +139,7 @@ _err:
return
code
;
return
code
;
}
}
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
SDelDataInfo
*
pInfo
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
,
int32_t
tsdbWriteDelData
(
SDelFWriter
*
pWriter
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
,
SDelIdx
*
pDelIdx
)
{
int64_t
*
rOffset
,
int64_t
*
rSize
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
uint8_t
*
pBuf
=
NULL
;
uint8_t
*
pBuf
=
NULL
;
int64_t
size
=
0
;
int64_t
size
=
0
;
...
@@ -148,8 +147,8 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelDataInfo *pInfo, SMapData *pD
...
@@ -148,8 +147,8 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelDataInfo *pInfo, SMapData *pD
// prepare
// prepare
size
+=
tPutU32
(
NULL
,
TSDB_FILE_DLMT
);
size
+=
tPutU32
(
NULL
,
TSDB_FILE_DLMT
);
size
+=
tPutI64
(
NULL
,
p
Info
->
suid
);
size
+=
tPutI64
(
NULL
,
p
DelIdx
->
suid
);
size
+=
tPutI64
(
NULL
,
p
Info
->
uid
);
size
+=
tPutI64
(
NULL
,
p
DelIdx
->
uid
);
size
=
size
+
tPutMapData
(
NULL
,
pDelDataMap
)
+
sizeof
(
TSCKSUM
);
size
=
size
+
tPutMapData
(
NULL
,
pDelDataMap
)
+
sizeof
(
TSCKSUM
);
// alloc
// alloc
...
@@ -159,8 +158,8 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelDataInfo *pInfo, SMapData *pD
...
@@ -159,8 +158,8 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelDataInfo *pInfo, SMapData *pD
// build
// build
n
+=
tPutU32
(
*
ppBuf
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutU32
(
*
ppBuf
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutI64
(
*
ppBuf
+
n
,
p
Info
->
suid
);
n
+=
tPutI64
(
*
ppBuf
+
n
,
p
DelIdx
->
suid
);
n
+=
tPutI64
(
*
ppBuf
+
n
,
p
Info
->
uid
);
n
+=
tPutI64
(
*
ppBuf
+
n
,
p
DelIdx
->
uid
);
n
+=
tPutMapData
(
*
ppBuf
+
n
,
pDelDataMap
);
n
+=
tPutMapData
(
*
ppBuf
+
n
,
pDelDataMap
);
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
...
@@ -176,8 +175,8 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelDataInfo *pInfo, SMapData *pD
...
@@ -176,8 +175,8 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelDataInfo *pInfo, SMapData *pD
ASSERT
(
n
==
size
);
ASSERT
(
n
==
size
);
// update
// update
*
rO
ffset
=
pWriter
->
pFile
->
size
;
pDelIdx
->
o
ffset
=
pWriter
->
pFile
->
size
;
*
rS
ize
=
size
;
pDelIdx
->
s
ize
=
size
;
pWriter
->
pFile
->
offset
=
pWriter
->
pFile
->
size
;
pWriter
->
pFile
->
offset
=
pWriter
->
pFile
->
size
;
pWriter
->
pFile
->
size
+=
size
;
pWriter
->
pFile
->
size
+=
size
;
...
@@ -348,10 +347,11 @@ _exit:
...
@@ -348,10 +347,11 @@ _exit:
}
}
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
SMapData
*
pDelDataMap
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int64_t
n
;
int64_t
n
;
uint32_t
delimiter
;
uint32_t
delimiter
;
SDelDataInfo
info
;
tb_uid_t
suid
;
tb_uid_t
uid
;
// seek
// seek
if
(
taosLSeekFile
(
pReader
->
pReadH
,
pDelIdx
->
offset
,
SEEK_SET
)
<
0
)
{
if
(
taosLSeekFile
(
pReader
->
pReadH
,
pDelIdx
->
offset
,
SEEK_SET
)
<
0
)
{
...
@@ -381,10 +381,10 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SMapData *pDelDa
...
@@ -381,10 +381,10 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SMapData *pDelDa
n
=
0
;
n
=
0
;
n
+=
tGetU32
(
*
ppBuf
+
n
,
&
delimiter
);
n
+=
tGetU32
(
*
ppBuf
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
info
.
suid
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
suid
);
ASSERT
(
info
.
suid
==
pDelIdx
->
suid
);
ASSERT
(
suid
==
pDelIdx
->
suid
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
info
.
uid
);
n
+=
tGetI64
(
*
ppBuf
+
n
,
&
uid
);
ASSERT
(
info
.
uid
==
pDelIdx
->
uid
);
ASSERT
(
uid
==
pDelIdx
->
uid
);
n
+=
tGetMapData
(
*
ppBuf
+
n
,
pDelDataMap
);
n
+=
tGetMapData
(
*
ppBuf
+
n
,
pDelDataMap
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
pDelIdx
->
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
pDelIdx
->
size
);
...
...
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
0c63eecc
...
@@ -356,143 +356,35 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
...
@@ -356,143 +356,35 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
return
n
;
return
n
;
}
}
// // SDelIdxItem ======================================================
// static FORCE_INLINE int32_t tPutDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem) {
// int32_t n = 0;
// n += tPutI64(p ? p + n : p, pDelIdxItem->suid);
// n += tPutI64(p ? p + n : p, pDelIdxItem->uid);
// n += tPutI64(p ? p + n : p, pDelIdxItem->minKey);
// n += tPutI64(p ? p + n : p, pDelIdxItem->maxKey);
// n += tPutI64v(p ? p + n : p, pDelIdxItem->minVersion);
// n += tPutI64v(p ? p + n : p, pDelIdxItem->maxVersion);
// n += tPutI64v(p ? p + n : p, pDelIdxItem->offset);
// n += tPutI64v(p ? p + n : p, pDelIdxItem->size);
// return n;
// }
// static FORCE_INLINE int32_t tGetDelIdxItem(uint8_t *p, SDelIdxItem *pDelIdxItem) {
// int32_t n = 0;
// n += tGetI64(p + n, &pDelIdxItem->suid);
// n += tGetI64(p + n, &pDelIdxItem->uid);
// n += tGetI64(p + n, &pDelIdxItem->minKey);
// n += tGetI64(p + n, &pDelIdxItem->maxKey);
// n += tGetI64v(p + n, &pDelIdxItem->minVersion);
// n += tGetI64v(p + n, &pDelIdxItem->maxVersion);
// n += tGetI64v(p + n, &pDelIdxItem->offset);
// n += tGetI64v(p + n, &pDelIdxItem->size);
// return n;
// }
// SBlockIdxItem ======================================================
static
FORCE_INLINE
int32_t
tPutBlockIdxItem
(
uint8_t
*
p
,
SBlockIdxItem
*
pItem
)
{
int32_t
n
=
0
;
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pItem
->
suid
);
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pItem
->
uid
);
n
+=
tPutTSDBKEY
(
p
?
p
+
n
:
p
,
&
pItem
->
minKey
);
n
+=
tPutTSDBKEY
(
p
?
p
+
n
:
p
,
&
pItem
->
maxKey
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pItem
->
minVersion
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pItem
->
maxVersion
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pItem
->
offset
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pItem
->
size
);
return
n
;
}
static
FORCE_INLINE
int32_t
tGetBlockIdxItem
(
uint8_t
*
p
,
SBlockIdxItem
*
pItem
)
{
int32_t
n
=
0
;
n
+=
tGetI64
(
p
+
n
,
&
pItem
->
suid
);
n
+=
tGetI64
(
p
+
n
,
&
pItem
->
uid
);
n
+=
tGetTSDBKEY
(
p
+
n
,
&
pItem
->
minKey
);
n
+=
tGetTSDBKEY
(
p
+
n
,
&
pItem
->
maxKey
);
n
+=
tGetI64v
(
p
+
n
,
&
pItem
->
minVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pItem
->
maxVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pItem
->
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pItem
->
size
);
return
n
;
}
// SBlockIdx ======================================================
// SBlockIdx ======================================================
int32_t
tBlockIdxClear
(
SBlockIdx
*
pBlockIdx
)
{
int32_t
tPutBlockIdx
(
uint8_t
*
p
,
void
*
ph
)
{
int32_t
code
=
0
;
int32_t
n
=
0
;
tsdbFree
(
pBlockIdx
->
offset
.
pOffset
);
SBlockIdx
*
pBlockIdx
=
(
SBlockIdx
*
)
ph
;
tsdbFree
(
pBlockIdx
->
pData
);
return
code
;
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pBlockIdx
->
suid
);
}
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pBlockIdx
->
uid
);
n
+=
tPutTSDBKEY
(
p
?
p
+
n
:
p
,
&
pBlockIdx
->
minKey
);
int32_t
tBlockIdxPutItem
(
SBlockIdx
*
pBlockIdx
,
SBlockIdxItem
*
pItem
)
{
n
+=
tPutTSDBKEY
(
p
?
p
+
n
:
p
,
&
pBlockIdx
->
maxKey
);
int32_t
code
=
0
;
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockIdx
->
minVersion
);
// TODO
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockIdx
->
maxVersion
);
return
code
;
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockIdx
->
offset
);
}
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockIdx
->
size
);
int32_t
tBlockIdxGetItemByIdx
(
SBlockIdx
*
pBlockIdx
,
SBlockIdxItem
*
pItem
,
int32_t
idx
)
{
int32_t
code
=
0
;
int32_t
offset
;
offset
=
tsdbGetOffset
(
&
pBlockIdx
->
offset
,
idx
);
if
(
offset
<
0
)
{
code
=
TSDB_CODE_NOT_FOUND
;
goto
_exit
;
}
tGetBlockIdxItem
(
pBlockIdx
->
pData
+
offset
,
pItem
);
_exit:
return
code
;
}
int32_t
tBlockIdxGetItem
(
SBlockIdx
*
pBlockIdx
,
SBlockIdxItem
*
pItem
,
TABLEID
id
)
{
int32_t
code
=
0
;
int32_t
lidx
=
0
;
int32_t
ridx
=
pBlockIdx
->
offset
.
nOffset
-
1
;
int32_t
midx
;
int32_t
c
;
while
(
lidx
<=
ridx
)
{
midx
=
(
lidx
+
midx
)
/
2
;
code
=
tBlockIdxGetItemByIdx
(
pBlockIdx
,
pItem
,
midx
);
if
(
code
)
goto
_exit
;
c
=
tTABLEIDCmprFn
(
&
id
,
pItem
);
if
(
c
==
0
)
{
goto
_exit
;
}
else
if
(
c
<
0
)
{
ridx
=
midx
-
1
;
}
else
{
lidx
=
midx
+
1
;
}
}
code
=
TSDB_CODE_NOT_FOUND
;
_exit:
return
code
;
}
int32_t
tPutBlockIdx
(
uint8_t
*
p
,
SBlockIdx
*
pBlockIdx
)
{
int32_t
n
=
0
;
n
+=
tPutU32
(
p
?
p
+
n
:
p
,
pBlockIdx
->
delimiter
);
n
+=
tPutOffset
(
p
?
p
+
n
:
p
,
&
pBlockIdx
->
offset
);
n
+=
tPutBinary
(
p
?
p
+
n
:
p
,
pBlockIdx
->
pData
,
pBlockIdx
->
nData
);
return
n
;
return
n
;
}
}
int32_t
tGetBlockIdx
(
uint8_t
*
p
,
SBlockIdx
*
pBlockIdx
)
{
int32_t
tGetBlockIdx
(
uint8_t
*
p
,
void
*
ph
)
{
int32_t
n
=
0
;
int32_t
n
=
0
;
SBlockIdx
*
pBlockIdx
=
(
SBlockIdx
*
)
ph
;
n
+=
tGetU32
(
p
+
n
,
&
pBlockIdx
->
delimiter
);
n
+=
tGetI64
(
p
+
n
,
&
pBlockIdx
->
suid
);
n
+=
tGetOffset
(
p
+
n
,
&
pBlockIdx
->
offset
);
n
+=
tGetI64
(
p
+
n
,
&
pBlockIdx
->
uid
);
n
+=
tGetBinary
(
p
+
n
,
&
pBlockIdx
->
pData
,
&
pBlockIdx
->
nData
);
n
+=
tGetTSDBKEY
(
p
+
n
,
&
pBlockIdx
->
minKey
);
n
+=
tGetTSDBKEY
(
p
+
n
,
&
pBlockIdx
->
maxKey
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlockIdx
->
minVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlockIdx
->
maxVersion
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlockIdx
->
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlockIdx
->
size
);
return
n
;
return
n
;
}
}
...
@@ -513,78 +405,6 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) {
...
@@ -513,78 +405,6 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) {
}
}
// SDelIdx ======================================================
// SDelIdx ======================================================
// int32_t tDelIdxClear(SDelIdx *pDelIdx) {
// int32_t code = 0;
// tdbFree(pDelIdx->offset.pOffset);
// tdbFree(pDelIdx->pData);
// return code;
// }
// int32_t tDelIdxPutItem(SDelIdx *pDelIdx, SDelIdxItem *pItem) {
// int32_t code = 0;
// uint32_t offset = pDelIdx->nData;
// // offset
// code = tsdbAddOffset(&pDelIdx->offset, offset);
// if (code) goto _exit;
// // alloc
// pDelIdx->nData += tPutDelIdxItem(NULL, pItem);
// code = tsdbRealloc(&pDelIdx->pData, pDelIdx->nData);
// if (code) goto _exit;
// // put
// tPutDelIdxItem(pDelIdx->pData + offset, pItem);
// _exit:
// return code;
// }
// int32_t tDelIdxGetItemByIdx(SDelIdx *pDelIdx, SDelIdxItem *pItem, int32_t idx) {
// int32_t code = 0;
// int32_t offset;
// offset = tsdbGetOffset(&pDelIdx->offset, idx);
// if (offset < 0) {
// code = TSDB_CODE_NOT_FOUND;
// goto _exit;
// }
// tGetDelIdxItem(pDelIdx->pData + offset, pItem);
// _exit:
// return code;
// }
// int32_t tDelIdxGetItem(SDelIdx *pDelIdx, SDelIdxItem *pItem, TABLEID id) {
// int32_t code = 0;
// int32_t lidx = 0;
// int32_t ridx = pDelIdx->offset.nOffset - 1;
// int32_t midx;
// int32_t c;
// while (lidx <= ridx) {
// midx = (lidx + ridx) / 2;
// code = tDelIdxGetItemByIdx(pDelIdx, pItem, midx);
// if (code) goto _exit;
// c = tTABLEIDCmprFn(&id, pItem);
// if (c == 0) {
// goto _exit;
// } else if (c < 0) {
// ridx = midx - 1;
// } else {
// lidx = midx + 1;
// }
// }
// code = TSDB_CODE_NOT_FOUND;
// _exit:
// return code;
// }
int32_t
tPutDelIdx
(
uint8_t
*
p
,
void
*
ph
)
{
int32_t
tPutDelIdx
(
uint8_t
*
p
,
void
*
ph
)
{
SDelIdx
*
pDelIdx
=
(
SDelIdx
*
)
ph
;
SDelIdx
*
pDelIdx
=
(
SDelIdx
*
)
ph
;
int32_t
n
=
0
;
int32_t
n
=
0
;
...
@@ -617,97 +437,7 @@ int32_t tGetDelIdx(uint8_t *p, void *ph) {
...
@@ -617,97 +437,7 @@ int32_t tGetDelIdx(uint8_t *p, void *ph) {
return
n
;
return
n
;
}
}
// // SDelDataItem ======================================================
// static FORCE_INLINE int32_t tPutDelDataItem(uint8_t *p, SDelDataItem *pItem) {
// int32_t n = 0;
// n += tPutI64v(p ? p + n : p, pItem->version);
// n += tPutI64(p ? p + n : p, pItem->sKey);
// n += tPutI64(p ? p + n : p, pItem->eKey);
// return n;
// }
// static FORCE_INLINE int32_t tGetDelDataItem(uint8_t *p, SDelDataItem *pItem) {
// int32_t n = 0;
// n += tGetI64v(p + n, &pItem->version);
// n += tGetI64(p + n, &pItem->sKey);
// n += tGetI64(p + n, &pItem->eKey);
// return n;
// }
// SDelData ======================================================
// SDelData ======================================================
// int32_t tDelDataClear(SDelData *pDelData) {
// int32_t code = 0;
// tsdbFree(pDelData->offset.pOffset);
// tsdbFree(pDelData->pData);
// return code;
// }
// int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem) {
// int32_t code = 0;
// uint32_t offset = pDelData->nData;
// // offset
// code = tsdbAddOffset(&pDelData->offset, offset);
// if (code) goto _exit;
// // alloc
// pDelData->nData += tPutDelDataItem(NULL, pItem);
// code = tsdbRealloc(&pDelData->pData, pDelData->nData);
// if (code) goto _exit;
// // put
// tPutDelDataItem(pDelData->pData + offset, pItem);
// _exit:
// return code;
// }
// int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx) {
// int32_t code = 0;
// int32_t offset;
// offset = tsdbGetOffset(&pDelData->offset, idx);
// if (offset < 0) {
// code = TSDB_CODE_NOT_FOUND;
// goto _exit;
// }
// tGetDelDataItem(pDelData->pData + offset, pItem);
// _exit:
// return code;
// }
// int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version) {
// int32_t code = 0;
// int32_t lidx = 0;
// int32_t ridx = pDelData->offset.nOffset - 1;
// int32_t midx;
// while (lidx <= ridx) {
// midx = (lidx + ridx) / 2;
// code = tDelDataGetItemByIdx(pDelData, pItem, midx);
// if (code) goto _exit;
// if (version == pItem->version) {
// goto _exit;
// } else if (version < pItem->version) {
// ridx = midx - 1;
// } else {
// ridx = midx + 1;
// }
// }
// code = TSDB_CODE_NOT_FOUND;
// _exit:
// return code;
// }
int32_t
tPutDelData
(
uint8_t
*
p
,
void
*
ph
)
{
int32_t
tPutDelData
(
uint8_t
*
p
,
void
*
ph
)
{
SDelData
*
pDelData
=
(
SDelData
*
)
ph
;
SDelData
*
pDelData
=
(
SDelData
*
)
ph
;
int32_t
n
=
0
;
int32_t
n
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录