Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cbd5da5d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cbd5da5d
编写于
6月 09, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code
上级
1ee9f80e
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
138 addition
and
132 deletion
+138
-132
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
+138
-132
未找到文件。
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
浏览文件 @
cbd5da5d
...
...
@@ -491,13 +491,6 @@ struct SDataFileWriter {
int32_t
tombBlkArrayIdx
;
STombBlock
tombBlock
[
1
];
int32_t
tombBlockIdx
;
#if 0
const TBlockIdxArray *blockIdxArray;
int32_t blockIdxArrayIdx;
const TDataBlkArray *dataBlkArray;
int32_t dataBlkArrayIdx;
#endif
}
ctx
[
1
];
STFile
files
[
TSDB_FTYPE_MAX
];
...
...
@@ -512,36 +505,8 @@ struct SDataFileWriter {
TTombBlkArray
tombBlkArray
[
1
];
STombBlock
tombBlock
[
1
];
#if 0
TBlockIdxArray blockIdxArray[1];
TDataBlkArray dataBlkArray[1];
#endif
};
#if 0
static int32_t tsdbDataFileWriteBlockIdx(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
writer->headFooter->blockIdxPtr->offset = writer->files[TSDB_FTYPE_HEAD].size;
writer->headFooter->blockIdxPtr->size = TARRAY2_DATA_LEN(writer->blockIdxArray);
if (writer->headFooter->blockIdxPtr->size) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->headFooter->blockIdxPtr->offset,
(void *)TARRAY2_DATA(writer->blockIdxArray), writer->headFooter->blockIdxPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_HEAD].size += writer->headFooter->blockIdxPtr->size;
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
}
return code;
}
#endif
static
int32_t
tsdbDataFileWriterCloseAbort
(
SDataFileWriter
*
writer
)
{
ASSERT
(
0
);
return
0
;
...
...
@@ -553,14 +518,14 @@ static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
}
tTombBlockDestroy
(
writer
->
tombBlock
);
// tStatisBlockDestroy(writer->statisBlock);
tBlockDataDestroy
(
writer
->
blockData
);
TARRAY2_DESTROY
(
writer
->
tombBlkArray
,
NULL
);
#if 0
TARRAY2_DESTROY(writer->dataBlkArray, NULL
);
TARRAY2_DESTROY(writer->b
lockIdx
Array, NULL);
#endif
tBlockDataDestroy
(
writer
->
blockData
);
tBrinBlockDestroy
(
writer
->
brinBlock
);
TARRAY2_DESTROY
(
writer
->
b
rinBlk
Array
,
NULL
);
tTombBlockDestroy
(
writer
->
ctx
->
tombBlock
);
tBlockDataDestroy
(
writer
->
ctx
->
blockData
);
tBrinBlockDestroy
(
writer
->
ctx
->
brinBlock
);
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
writer
->
bufArr
);
++
i
)
{
tFree
(
writer
->
bufArr
[
i
]);
...
...
@@ -894,11 +859,6 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
files
[
ftype
].
size
+=
blockIdx
->
size
;
#if 0
code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
#endif
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
...
...
@@ -947,101 +907,116 @@ _exit:
return
code
;
}
static
int32_t
tsdbDataFileDoWriteTSData
(
SDataFileWriter
*
writer
,
TSDBROW
*
row
)
{
static
int32_t
tsdbDataFileDoWriteTableOldData
(
SDataFileWriter
*
writer
,
const
TSDBKEY
*
key
)
{
if
(
writer
->
ctx
->
tbHasOldData
==
false
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
writer
->
ctx
->
tbHasOldData
)
{
TSDBKEY
key
[
1
];
if
(
row
->
type
==
TSDBROW_ROW_FMT
)
{
key
->
ts
=
row
->
pTSRow
->
ts
;
key
->
version
=
row
->
version
;
}
else
{
key
->
ts
=
row
->
pBlockData
->
aTSKEY
[
row
->
iRow
];
key
->
version
=
row
->
pBlockData
->
aVersion
[
row
->
iRow
];
}
for
(;;)
{
for
(;;)
{
for
(;;)
{
// SBlockData
for
(;
writer
->
ctx
->
blockDataIdx
<
writer
->
ctx
->
blockData
->
nRow
;
writer
->
ctx
->
blockDataIdx
++
)
{
if
(
key
->
ts
<
writer
->
ctx
->
blockData
->
aTSKEY
[
writer
->
ctx
->
blockDataIdx
]
//
||
(
key
->
ts
==
writer
->
ctx
->
blockData
->
aTSKEY
[
writer
->
ctx
->
blockDataIdx
]
&&
key
->
version
<
writer
->
ctx
->
blockData
->
aVersion
[
writer
->
ctx
->
blockDataIdx
]))
{
goto
_do_write
;
}
else
{
TSDBROW
row1
=
tsdbRowFromBlockData
(
writer
->
ctx
->
blockData
,
writer
->
ctx
->
blockDataIdx
);
code
=
tsdbDataFileDoWriteTSRow
(
writer
,
&
row1
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// SBlockData
for
(;
writer
->
ctx
->
blockDataIdx
<
writer
->
ctx
->
blockData
->
nRow
;
writer
->
ctx
->
blockDataIdx
++
)
{
if
(
key
->
ts
<
writer
->
ctx
->
blockData
->
aTSKEY
[
writer
->
ctx
->
blockDataIdx
]
//
||
(
key
->
ts
==
writer
->
ctx
->
blockData
->
aTSKEY
[
writer
->
ctx
->
blockDataIdx
]
&&
key
->
version
<
writer
->
ctx
->
blockData
->
aVersion
[
writer
->
ctx
->
blockDataIdx
]))
{
goto
_exit
;
}
else
{
TSDBROW
row1
=
tsdbRowFromBlockData
(
writer
->
ctx
->
blockData
,
writer
->
ctx
->
blockDataIdx
);
code
=
tsdbDataFileDoWriteTSRow
(
writer
,
&
row1
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
// SBrinBlock
if
(
writer
->
ctx
->
brinBlockIdx
>=
BRIN_BLOCK_SIZE
(
writer
->
ctx
->
brinBlock
))
{
break
;
}
// SBrinBlock
if
(
writer
->
ctx
->
brinBlockIdx
>=
BRIN_BLOCK_SIZE
(
writer
->
ctx
->
brinBlock
))
{
break
;
for
(;
writer
->
ctx
->
brinBlockIdx
<
BRIN_BLOCK_SIZE
(
writer
->
ctx
->
brinBlock
);
writer
->
ctx
->
brinBlockIdx
++
)
{
if
(
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
uid
,
writer
->
ctx
->
brinBlockIdx
)
!=
writer
->
ctx
->
tbid
->
uid
)
{
writer
->
ctx
->
tbHasOldData
=
false
;
goto
_exit
;
}
for
(;
writer
->
ctx
->
brinBlockIdx
<
BRIN_BLOCK_SIZE
(
writer
->
ctx
->
brinBlock
);
writer
->
ctx
->
brinBlockIdx
++
)
{
if
(
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
uid
,
writer
->
ctx
->
brinBlockIdx
)
!=
writer
->
ctx
->
tbid
->
uid
)
{
writer
->
ctx
->
tbHasOldData
=
false
;
goto
_do_write
;
}
if
(
key
->
ts
<
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
firstKey
,
writer
->
ctx
->
brinBlockIdx
)
//
||
(
key
->
ts
==
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
firstKey
,
writer
->
ctx
->
brinBlockIdx
)
&&
key
->
version
<
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
firstKeyVer
,
writer
->
ctx
->
brinBlockIdx
)))
{
goto
_exit
;
}
else
{
SBrinRecord
record
[
1
];
tBrinBlockGet
(
writer
->
ctx
->
brinBlock
,
writer
->
ctx
->
brinBlockIdx
,
record
);
if
(
key
->
ts
>
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
lastKey
,
writer
->
ctx
->
brinBlockIdx
)
//
||
(
key
->
ts
==
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
lastKey
,
writer
->
ctx
->
brinBlockIdx
)
&&
key
->
version
>
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
lastKeyVer
,
writer
->
ctx
->
brinBlockIdx
)))
{
if
(
writer
->
blockData
->
nRow
>
0
)
{
code
=
tsdbDataFileDoWriteBlockData
(
writer
,
writer
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
key
->
ts
<
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
firstKey
,
writer
->
ctx
->
brinBlockIdx
)
//
||
(
key
->
ts
==
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
firstKey
,
writer
->
ctx
->
brinBlockIdx
)
&&
key
->
version
<
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
firstKeyVer
,
writer
->
ctx
->
brinBlockIdx
)))
{
goto
_do_write
;
code
=
tsdbDataFileWriteBrinRecord
(
writer
,
record
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
SBrinRecord
record
[
1
];
tBrinBlockGet
(
writer
->
ctx
->
brinBlock
,
writer
->
ctx
->
brinBlockIdx
,
record
);
if
(
key
->
ts
>
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
lastKey
,
writer
->
ctx
->
brinBlockIdx
)
//
||
(
key
->
ts
==
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
lastKey
,
writer
->
ctx
->
brinBlockIdx
)
&&
key
->
version
>
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
lastKeyVer
,
writer
->
ctx
->
brinBlockIdx
)))
{
if
(
writer
->
blockData
->
nRow
>
0
)
{
code
=
tsdbDataFileDoWriteBlockData
(
writer
,
writer
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbDataFileWriteBrinRecord
(
writer
,
record
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbDataFileReadBlockData
(
writer
->
ctx
->
reader
,
record
,
writer
->
ctx
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbDataFileReadBlockData
(
writer
->
ctx
->
reader
,
record
,
writer
->
ctx
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
ctx
->
blockDataIdx
=
0
;
writer
->
ctx
->
brinBlockIdx
++
;
break
;
}
writer
->
ctx
->
blockDataIdx
=
0
;
writer
->
ctx
->
brinBlockIdx
++
;
break
;
}
}
}
}
// SBrinBlk
if
(
writer
->
ctx
->
brinBlkArrayIdx
>=
TARRAY2_SIZE
(
writer
->
ctx
->
brinBlkArray
))
{
writer
->
ctx
->
brinBlkArray
=
NULL
;
writer
->
ctx
->
tbHasOldData
=
false
;
goto
_exit
;
}
for
(;
writer
->
ctx
->
brinBlkArrayIdx
<
TARRAY2_SIZE
(
writer
->
ctx
->
brinBlkArray
);
writer
->
ctx
->
brinBlkArrayIdx
++
)
{
const
SBrinBlk
*
brinBlk
=
TARRAY2_GET_PTR
(
writer
->
ctx
->
brinBlkArray
,
writer
->
ctx
->
brinBlkArrayIdx
);
// SBrinBlk
if
(
writer
->
ctx
->
brinBlkArrayIdx
>=
TARRAY2_SIZE
(
writer
->
ctx
->
brinBlkArray
))
{
writer
->
ctx
->
brinBlkArray
=
NULL
;
if
(
brinBlk
->
minTbid
.
uid
!=
writer
->
ctx
->
tbid
->
uid
)
{
writer
->
ctx
->
tbHasOldData
=
false
;
goto
_
do_write
;
goto
_
exit
;
}
for
(;
writer
->
ctx
->
brinBlkArrayIdx
<
TARRAY2_SIZE
(
writer
->
ctx
->
brinBlkArray
);
writer
->
ctx
->
brinBlkArrayIdx
++
)
{
const
SBrinBlk
*
brinBlk
=
TARRAY2_GET_PTR
(
writer
->
ctx
->
brinBlkArray
,
writer
->
ctx
->
brinBlkArrayIdx
);
code
=
tsdbDataFileReadBrinBlock
(
writer
->
ctx
->
reader
,
brinBlk
,
writer
->
ctx
->
brinBlock
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
brinBlk
->
minTbid
.
uid
!=
writer
->
ctx
->
tbid
->
uid
)
{
writer
->
ctx
->
tbHasOldData
=
false
;
goto
_do_write
;
}
writer
->
ctx
->
brinBlockIdx
=
0
;
writer
->
ctx
->
brinBlkArrayIdx
++
;
break
;
}
}
code
=
tsdbDataFileReadBrinBlock
(
writer
->
ctx
->
reader
,
brinBlk
,
writer
->
ctx
->
brinBlock
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
writer
->
ctx
->
brinBlockIdx
=
0
;
writer
->
ctx
->
brinBlkArrayIdx
++
;
break
;
}
static
int32_t
tsdbDataFileDoWriteTSData
(
SDataFileWriter
*
writer
,
TSDBROW
*
row
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
writer
->
ctx
->
tbHasOldData
)
{
TSDBKEY
key
[
1
];
if
(
row
->
type
==
TSDBROW_ROW_FMT
)
{
key
->
ts
=
row
->
pTSRow
->
ts
;
key
->
version
=
row
->
version
;
}
else
{
key
->
ts
=
row
->
pBlockData
->
aTSKEY
[
row
->
iRow
];
key
->
version
=
row
->
pBlockData
->
aVersion
[
row
->
iRow
];
}
code
=
tsdbDataFileDoWriteTableOldData
(
writer
,
key
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_do_write:
code
=
tsdbDataFileDoWriteTSRow
(
writer
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -1059,11 +1034,15 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
int32_t
lino
=
0
;
if
(
writer
->
ctx
->
tbHasOldData
)
{
for
(;
writer
->
ctx
->
blockDataIdx
<
writer
->
ctx
->
blockData
->
nRow
;
writer
->
ctx
->
blockDataIdx
++
)
{
TSDBROW
row
=
tsdbRowFromBlockData
(
writer
->
ctx
->
blockData
,
writer
->
ctx
->
blockDataIdx
);
code
=
tsdbDataFileDoWriteTSRow
(
writer
,
&
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
TSDBKEY
key
=
{
.
ts
=
TSKEY_MAX
,
.
version
=
VERSION_MAX
,
};
code
=
tsdbDataFileDoWriteTableOldData
(
writer
,
&
key
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
ASSERT
(
writer
->
ctx
->
tbHasOldData
==
false
);
}
code
=
tsdbDataFileDoWriteBlockData
(
writer
,
writer
->
blockData
);
...
...
@@ -1152,10 +1131,11 @@ static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
int32_t
code
=
0
;
int32_t
lino
=
0
;
code
=
tsdbWriteFile
(
writer
->
fd
[
TSDB_FTYPE_HEAD
],
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
(
const
uint8_t
*
)
writer
->
headFooter
,
sizeof
(
SHeadFooter
));
int32_t
ftype
=
TSDB_FTYPE_HEAD
;
code
=
tsdbWriteFile
(
writer
->
fd
[
ftype
],
writer
->
files
[
ftype
].
size
,
(
const
uint8_t
*
)
writer
->
headFooter
,
sizeof
(
SHeadFooter
));
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
+=
sizeof
(
SHeadFooter
);
writer
->
files
[
ftype
].
size
+=
sizeof
(
SHeadFooter
);
_exit:
if
(
code
)
{
...
...
@@ -1327,6 +1307,28 @@ _exit:
return
code
;
}
static
int32_t
tsdbDataFileWriteBrinBlk
(
SDataFileWriter
*
writer
)
{
ASSERT
(
TARRAY2_SIZE
(
writer
->
brinBlkArray
)
>
0
);
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
ftype
=
TSDB_FTYPE_HEAD
;
writer
->
headFooter
->
brinBlkPtr
->
offset
=
writer
->
files
[
ftype
].
size
;
writer
->
headFooter
->
brinBlkPtr
->
size
=
TARRAY2_DATA_LEN
(
writer
->
brinBlkArray
);
code
=
tsdbWriteFile
(
writer
->
fd
[
ftype
],
writer
->
headFooter
->
brinBlkPtr
->
offset
,
(
uint8_t
*
)
TARRAY2_DATA
(
writer
->
brinBlkArray
),
writer
->
headFooter
->
brinBlkPtr
->
size
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
files
[
ftype
].
size
+=
writer
->
headFooter
->
brinBlkPtr
->
size
;
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbDataFileWriterCloseCommit
(
SDataFileWriter
*
writer
,
TFileOpArray
*
opArr
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
...
...
@@ -1346,10 +1348,8 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
code
=
tsdbDataFileWriteTableDataBegin
(
writer
,
tbid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
#if 0
code = tsdbDataFileWriteBlockIdx(writer);
code
=
tsdbDataFileWriteBrinBlk
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
#endif
code
=
tsdbDataFileWriteHeadFooter
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -1502,10 +1502,6 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
if
(
writer
->
ctx
->
reader
)
{
code
=
tsdbDataFileReadBrinBlk
(
writer
->
ctx
->
reader
,
&
writer
->
ctx
->
brinBlkArray
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
#if 0
code = tsdbDataFileReadBlockIdx(writer->ctx->reader, &writer->ctx->blockIdxArray);
TSDB_CHECK_CODE(code, lino, _exit);
#endif
}
_exit:
...
...
@@ -1605,6 +1601,16 @@ int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
writer
->
ctx
->
tbHasOldData
)
{
TSDBKEY
key
=
{
.
ts
=
bData
->
aTSKEY
[
0
],
.
version
=
bData
->
aVersion
[
0
],
};
code
=
tsdbDataFileDoWriteTableOldData
(
writer
,
&
key
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
!
writer
->
ctx
->
tbHasOldData
//
&&
writer
->
blockData
->
nRow
==
0
//
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录