Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3753c7d6
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3753c7d6
编写于
6月 09, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact more
上级
cb52b7a7
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
241 addition
and
115 deletion
+241
-115
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
+241
-115
未找到文件。
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
浏览文件 @
3753c7d6
...
...
@@ -474,21 +474,30 @@ struct SDataFileWriter {
struct
{
bool
opened
;
SDataFileReader
*
reader
;
// for ts data
const
TBlockIdxArray
*
blockIdxArray
;
int32_t
blockIdxArrayIdx
;
bool
tbHasOldData
;
const
TDataBlkArray
*
dataBlkArray
;
int32_t
dataBlkArrayIdx
;
SBlockData
bData
[
1
];
int32_t
iRow
;
TABLEID
tbid
[
1
];
bool
tbHasOldData
;
const
TBrinBlkArray
*
brinBlkArray
;
int32_t
brinBlkArrayIdx
;
SBrinBlock
brinBlock
[
1
];
int32_t
brinBlockIdx
;
SBlockData
blockData
[
1
];
int32_t
blockDataIdx
;
// for tomb data
bool
hasOldTomb
;
const
TTombBlkArray
*
tombBlkArray
;
int32_t
tombBlkArrayIdx
;
STombBlock
tData
[
1
];
int32_t
iRowTomb
;
STombBlock
tombBlock
[
1
];
int32_t
tombBlockIdx
;
#if 0
const TBlockIdxArray *blockIdxArray;
int32_t blockIdxArrayIdx;
const TDataBlkArray *dataBlkArray;
int32_t dataBlkArrayIdx;
#endif
}
ctx
[
1
];
STFile
files
[
TSDB_FTYPE_MAX
];
...
...
@@ -496,14 +505,21 @@ struct SDataFileWriter {
SHeadFooter
headFooter
[
1
];
STombFooter
tombFooter
[
1
];
TBrinBlkArray
brinBlkArray
[
1
];
SBrinBlock
brinBlock
[
1
];
SBlockData
blockData
[
1
];
TTombBlkArray
tombBlkArray
[
1
];
STombBlock
tombBlock
[
1
];
#if 0
TBlockIdxArray blockIdxArray[1];
TDataBlkArray dataBlkArray[1];
TTombBlkArray
tombBlkArray
[
1
];
SBlockData
bData
[
1
];
STbStatisBlock
sData
[
1
];
STombBlock
tData
[
1
];
#endif
};
#if 0
static int32_t tsdbDataFileWriteBlockIdx(SDataFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
...
...
@@ -524,6 +540,7 @@ _exit:
}
return code;
}
#endif
static
int32_t
tsdbDataFileWriterCloseAbort
(
SDataFileWriter
*
writer
)
{
ASSERT
(
0
);
...
...
@@ -535,13 +552,15 @@ static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) {
tsdbDataFileReaderClose
(
&
writer
->
ctx
->
reader
);
}
tTombBlockDestroy
(
writer
->
t
Data
);
tStatisBlockDestroy
(
writer
->
sData
);
tBlockDataDestroy
(
writer
->
bData
);
tTombBlockDestroy
(
writer
->
t
ombBlock
);
// tStatisBlockDestroy(writer->statisBlock
);
tBlockDataDestroy
(
writer
->
b
lock
Data
);
TARRAY2_DESTROY
(
writer
->
tombBlkArray
,
NULL
);
#if 0
TARRAY2_DESTROY(writer->dataBlkArray, NULL);
TARRAY2_DESTROY(writer->blockIdxArray, NULL);
tTombBlockDestroy
(
writer
->
ctx
->
tData
);
#endif
tTombBlockDestroy
(
writer
->
ctx
->
tombBlock
);
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
writer
->
bufArr
);
++
i
)
{
tFree
(
writer
->
bufArr
[
i
]);
...
...
@@ -565,8 +584,10 @@ static int32_t tsdbDataFileWriterDoOpenReader(SDataFileWriter *writer) {
for
(
int32_t
i
=
0
;
i
<
TSDB_FTYPE_MAX
;
++
i
)
{
config
->
files
[
i
].
exist
=
writer
->
config
->
files
[
i
].
exist
;
if
(
config
->
files
[
i
].
exist
)
{
config
->
files
[
i
].
file
=
writer
->
config
->
files
[
i
].
file
;
}
}
code
=
tsdbDataFileReaderOpen
(
NULL
,
config
,
&
writer
->
ctx
->
reader
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -584,6 +605,7 @@ _exit:
static
int32_t
tsdbDataFileWriterDoOpen
(
SDataFileWriter
*
writer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
ftype
;
if
(
!
writer
->
config
->
skmTb
)
writer
->
config
->
skmTb
=
writer
->
skmTb
;
if
(
!
writer
->
config
->
skmRow
)
writer
->
config
->
skmRow
=
writer
->
skmRow
;
...
...
@@ -593,7 +615,6 @@ static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) {
code
=
tsdbDataFileWriterDoOpenReader
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
int32_t
ftype
;
// .head
ftype
=
TSDB_FTYPE_HEAD
;
writer
->
files
[
ftype
]
=
(
STFile
){
...
...
@@ -735,9 +756,11 @@ static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *b
TARRAY2_DESTROY
(
smaArr
,
NULL
);
#if 0
// to dataBlkArray
code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
#endif
tBlockDataClear
(
bData
);
...
...
@@ -767,8 +790,10 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
files
[
ftype
].
size
+=
blockIdx
->
size
;
#if 0
code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
#endif
_exit:
if
(
code
)
{
...
...
@@ -789,18 +814,18 @@ static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
TSDBKEY
key
[
1
]
=
{
TSDBROW_KEY
(
row
)};
if
(
key
->
version
<=
writer
->
config
->
compactVersion
//
&&
writer
->
b
Data
->
nRow
>
0
//
&&
writer
->
b
Data
->
aTSKEY
[
writer
->
b
Data
->
nRow
-
1
]
==
key
->
ts
//
&&
writer
->
b
lockData
->
nRow
>
0
//
&&
writer
->
b
lockData
->
aTSKEY
[
writer
->
block
Data
->
nRow
-
1
]
==
key
->
ts
//
)
{
code
=
tBlockDataUpdateRow
(
writer
->
bData
,
row
,
writer
->
config
->
skmRow
->
pTSchema
);
code
=
tBlockDataUpdateRow
(
writer
->
b
lock
Data
,
row
,
writer
->
config
->
skmRow
->
pTSchema
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
if
(
writer
->
bData
->
nRow
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbDataFileWriteDataBlock
(
writer
,
writer
->
bData
);
if
(
writer
->
b
lock
Data
->
nRow
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbDataFileWriteDataBlock
(
writer
,
writer
->
b
lock
Data
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tBlockDataAppendRow
(
writer
->
bData
,
row
,
writer
->
config
->
skmRow
->
pTSchema
,
writer
->
ctx
->
tbid
->
uid
);
code
=
tBlockDataAppendRow
(
writer
->
b
lock
Data
,
row
,
writer
->
config
->
skmRow
->
pTSchema
,
writer
->
ctx
->
tbid
->
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
...
...
@@ -816,8 +841,8 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row)
int32_t
lino
=
0
;
while
(
writer
->
ctx
->
tbHasOldData
)
{
for
(;
writer
->
ctx
->
iRow
<
writer
->
ctx
->
bData
->
nRow
;
writer
->
ctx
->
iRow
++
)
{
TSDBROW
row1
[
1
]
=
{
tsdbRowFromBlockData
(
writer
->
ctx
->
b
Data
,
writer
->
ctx
->
iRow
)};
for
(;
writer
->
ctx
->
blockDataIdx
<
writer
->
ctx
->
blockData
->
nRow
;
writer
->
ctx
->
blockDataIdx
++
)
{
TSDBROW
row1
[
1
]
=
{
tsdbRowFromBlockData
(
writer
->
ctx
->
b
lockData
,
writer
->
ctx
->
blockDataIdx
)};
int32_t
c
=
tsdbRowCmprFn
(
row
,
row1
);
ASSERT
(
c
);
...
...
@@ -829,6 +854,7 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row)
}
}
#if 0
if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray)) {
writer->ctx->tbHasOldData = false;
break;
...
...
@@ -845,7 +871,7 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row)
int32_t c = tDataBlkCmprFn(dataBlk, dataBlk1);
if (c < 0) {
code
=
tsdbDataFileWriteDataBlock
(
writer
,
writer
->
bData
);
code = tsdbDataFileWriteDataBlock(writer, writer->b
lock
Data);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk);
...
...
@@ -853,14 +879,15 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row)
} else if (c > 0) {
goto _do_write;
} else {
code
=
tsdbDataFileReadDataBlock
(
writer
->
ctx
->
reader
,
dataBlk
,
writer
->
ctx
->
bData
);
code = tsdbDataFileReadDataBlock(writer->ctx->reader, dataBlk, writer->ctx->b
lock
Data);
TSDB_CHECK_CODE(code, lino, _exit);
writer
->
ctx
->
iRow
=
0
;
writer->ctx->
blockDataIdx
= 0;
writer->ctx->dataBlkArrayIdx++;
break;
}
}
#endif
}
_do_write:
...
...
@@ -875,37 +902,123 @@ _exit:
}
static
int32_t
tsdbDataFileWriteTableDataEnd
(
SDataFileWriter
*
writer
)
{
if
(
!
writer
->
ctx
->
tbid
->
uid
)
return
0
;
if
(
writer
->
ctx
->
tbid
->
uid
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
// handle table remain data
if
(
writer
->
ctx
->
tbHasOldData
)
{
for
(;
writer
->
ctx
->
iRow
<
writer
->
ctx
->
bData
->
nRow
;
writer
->
ctx
->
iRow
++
)
{
TSDBROW
row
[
1
]
=
{
tsdbRowFromBlockData
(
writer
->
ctx
->
bData
,
writer
->
ctx
->
iRow
)};
for
(;
writer
->
ctx
->
blockDataIdx
<
writer
->
ctx
->
blockData
->
nRow
;
writer
->
ctx
->
blockDataIdx
++
)
{
TSDBROW
row
=
tsdbRowFromBlockData
(
writer
->
ctx
->
blockData
,
writer
->
ctx
->
blockDataIdx
);
code
=
tsdbDataFileDoWriteTSRow
(
writer
,
&
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
code
=
tsdbDataFileDoWriteTSRow
(
writer
,
row
);
code
=
tsdbDataFileWriteDataBlock
(
writer
,
writer
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbDataFileWriteBrinBlock
(
SDataFileWriter
*
writer
)
{
if
(
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
)
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
// get SBrinBlk
SBrinBlk
brinBlk
[
1
]
=
{
{
.
dp
[
0
]
=
{
.
offset
=
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
.
size
=
0
,
},
.
minTbid
=
{
.
suid
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
suid
),
.
uid
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
uid
),
},
.
maxTbid
=
{
.
suid
=
TARRAY2_LAST
(
writer
->
brinBlock
->
suid
),
.
uid
=
TARRAY2_LAST
(
writer
->
brinBlock
->
uid
),
},
.
minVer
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
minVer
),
.
maxVer
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
minVer
),
.
numRec
=
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
),
.
cmprAlg
=
writer
->
config
->
cmprAlg
,
},
};
for
(
int32_t
i
=
1
;
i
<
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
);
i
++
)
{
if
(
brinBlk
->
minVer
>
TARRAY2_GET
(
writer
->
brinBlock
->
minVer
,
i
))
{
brinBlk
->
minVer
=
TARRAY2_GET
(
writer
->
brinBlock
->
minVer
,
i
);
}
if
(
brinBlk
->
maxVer
<
TARRAY2_GET
(
writer
->
brinBlock
->
maxVer
,
i
))
{
brinBlk
->
maxVer
=
TARRAY2_GET
(
writer
->
brinBlock
->
maxVer
,
i
);
}
}
// write to file
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
writer
->
brinBlock
->
dataArr1
);
i
++
)
{
code
=
tsdbCmprData
((
uint8_t
*
)
TARRAY2_DATA
(
writer
->
brinBlock
->
dataArr1
+
i
),
TARRAY2_DATA_LEN
(
writer
->
brinBlock
->
dataArr1
+
i
),
TSDB_DATA_TYPE_BIGINT
,
brinBlk
->
cmprAlg
,
&
writer
->
config
->
bufArr
[
0
],
0
,
&
brinBlk
->
size
[
i
],
&
writer
->
config
->
bufArr
[
1
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbWriteFile
(
writer
->
fd
[
TSDB_FTYPE_HEAD
],
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
writer
->
config
->
bufArr
[
0
],
brinBlk
->
size
[
i
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
brinBlk
->
dp
[
i
].
size
+=
brinBlk
->
size
[
i
];
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
+=
brinBlk
->
size
[
i
];
}
code
=
tsdbDataFileWriteDataBlock
(
writer
,
writer
->
bData
);
for
(
int32_t
i
=
0
,
j
=
ARRAY_SIZE
(
writer
->
brinBlock
->
dataArr1
);
i
<
ARRAY_SIZE
(
writer
->
brinBlock
->
dataArr2
);
i
++
,
j
++
)
{
code
=
tsdbCmprData
((
uint8_t
*
)
TARRAY2_DATA
(
writer
->
brinBlock
->
dataArr2
+
i
),
TARRAY2_DATA_LEN
(
writer
->
brinBlock
->
dataArr2
+
i
),
TSDB_DATA_TYPE_INT
,
brinBlk
->
cmprAlg
,
&
writer
->
config
->
bufArr
[
0
],
0
,
&
brinBlk
->
size
[
j
],
&
writer
->
config
->
bufArr
[
1
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
for
(;
writer
->
ctx
->
dataBlkArrayIdx
<
TARRAY2_SIZE
(
writer
->
ctx
->
dataBlkArray
);
writer
->
ctx
->
dataBlkArrayIdx
++
)
{
code
=
TARRAY2_APPEND_PTR
(
writer
->
dataBlkArray
,
TARRAY2_GET_PTR
(
writer
->
ctx
->
dataBlkArray
,
writer
->
ctx
->
dataBlkArrayIdx
));
code
=
tsdbWriteFile
(
writer
->
fd
[
TSDB_FTYPE_HEAD
],
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
writer
->
config
->
bufArr
[
0
],
brinBlk
->
size
[
j
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
brinBlk
->
dp
[
i
].
size
+=
brinBlk
->
size
[
j
];
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
+=
brinBlk
->
size
[
j
];
}
writer
->
ctx
->
tbHasOldData
=
false
;
// append to brinBlkArray
code
=
TARRAY2_APPEND_PTR
(
writer
->
brinBlkArray
,
brinBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tBrinBlockClear
(
writer
->
brinBlock
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
code
=
tsdbDataFileWriteDataBlock
(
writer
,
writer
->
bData
);
static
int32_t
tsdbDataFileWriteBrinRecord
(
SDataFileWriter
*
writer
,
const
SBrinRecord
*
record
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
code
=
tBrinBlockPut
(
writer
->
brinBlock
,
record
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbDataFileWriteDataBlk
(
writer
,
writer
->
dataBlkArray
);
if
(
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
)
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbDataFileWriteBrinBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
...
...
@@ -919,57 +1032,64 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA
int32_t
lino
=
0
;
SMetaInfo
info
;
ASSERT
(
writer
->
ctx
->
blockDataIdx
==
writer
->
ctx
->
blockData
->
nRow
);
ASSERT
(
writer
->
blockData
->
nRow
==
0
);
writer
->
ctx
->
tbHasOldData
=
false
;
while
(
writer
->
ctx
->
brinBlkArray
)
{
// skip data of previous table
for
(;
writer
->
ctx
->
brinBlockIdx
<
BRIN_BLOCK_SIZE
(
writer
->
ctx
->
brinBlock
);
writer
->
ctx
->
brinBlockIdx
++
)
{
// skip removed table
int64_t
uid
=
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
uid
,
writer
->
ctx
->
brinBlockIdx
);
if
(
metaGetInfo
(
writer
->
config
->
tsdb
->
pVnode
->
pMeta
,
uid
,
&
info
,
NULL
)
==
TSDB_CODE_NOT_FOUND
)
{
for
(
int32_t
idx
=
writer
->
ctx
->
brinBlockIdx
+
1
;
//
idx
<
BRIN_BLOCK_SIZE
(
writer
->
ctx
->
brinBlock
)
//
&&
uid
==
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
uid
,
idx
);
idx
++
,
writer
->
ctx
->
brinBlockIdx
++
)
{
}
continue
;
}
// skip data of previous table
if
(
writer
->
ctx
->
blockIdxArray
)
{
for
(;
writer
->
ctx
->
blockIdxArrayIdx
<
TARRAY2_SIZE
(
writer
->
ctx
->
blockIdxArray
);
writer
->
ctx
->
blockIdxArrayIdx
++
)
{
const
SBlockIdx
*
blockIdx
=
TARRAY2_GET_PTR
(
writer
->
ctx
->
blockIdxArray
,
writer
->
ctx
->
blockIdxArrayIdx
);
SBrinRecord
record
[
1
];
tBrinBlockGet
(
writer
->
ctx
->
brinBlock
,
writer
->
ctx
->
brinBlockIdx
,
record
);
int32_t
c
=
tTABLEIDCmprFn
(
blockIdx
,
tbid
);
int32_t
c
=
tTABLEIDCmprFn
(
record
,
tbid
);
if
(
c
<
0
)
{
if
(
metaGetInfo
(
writer
->
config
->
tsdb
->
pVnode
->
pMeta
,
blockIdx
->
uid
,
&
info
,
NULL
)
==
0
)
{
code
=
tsdbDataFileReadDataBlk
(
writer
->
ctx
->
reader
,
blockIdx
,
&
writer
->
ctx
->
dataBlkArray
);
code
=
tsdbDataFileWriteBrinRecord
(
writer
,
record
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
ctx
->
tbid
->
suid
=
blockIdx
->
suid
;
writer
->
ctx
->
tbid
->
uid
=
blockIdx
->
uid
;
code
=
tsdbDataFileWriteDataBlk
(
writer
,
writer
->
ctx
->
dataBlkArray
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
continue
;
}
}
else
{
if
(
c
==
0
)
{
writer
->
ctx
->
tbHasOldData
=
true
;
}
goto
_begin
;
}
}
code
=
tsdbDataFileReadDataBlk
(
writer
->
ctx
->
reader
,
blockIdx
,
&
writer
->
ctx
->
dataBlkArray
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
writer
->
ctx
->
brinBlkArrayIdx
>=
TARRAY2_SIZE
(
writer
->
ctx
->
brinBlkArray
))
{
writer
->
ctx
->
brinBlkArray
=
NULL
;
break
;
}
writer
->
ctx
->
dataBlkArrayIdx
=
0
;
for
(;
writer
->
ctx
->
brinBlkArrayIdx
<
TARRAY2_SIZE
(
writer
->
ctx
->
brinBlkArray
);
writer
->
ctx
->
brinBlkArrayIdx
++
)
{
const
SBrinBlk
*
brinBlk
=
TARRAY2_GET_PTR
(
writer
->
ctx
->
brinBlkArray
,
writer
->
ctx
->
brinBlkArrayIdx
);
tBlockDataReset
(
writer
->
ctx
->
bData
);
writer
->
ctx
->
iRow
=
0
;
code
=
tsdbDataFileReadBrinBlock
(
writer
->
ctx
->
reader
,
brinBlk
,
writer
->
ctx
->
brinBlock
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
writer
->
ctx
->
blockIdxArrayIdx
++
;
}
writer
->
ctx
->
brinBlockIdx
=
0
;
writer
->
ctx
->
brinBlkArrayIdx
++
;
break
;
}
}
}
// make sure state is correct
writer
->
ctx
->
tbid
[
0
]
=
tbid
[
0
];
if
(
tbid
->
suid
==
INT64_MAX
&&
tbid
->
uid
==
INT64_MAX
)
goto
_exit
;
_begin:
writer
->
ctx
->
tbid
[
0
]
=
*
tbid
;
TARRAY2_CLEAR
(
writer
->
dataBlkArray
,
NULL
)
;
if
(
tbid
->
uid
==
INT64_MAX
)
goto
_exit
;
code
=
tsdbUpdateSkmTb
(
writer
->
config
->
tsdb
,
tbid
,
writer
->
config
->
skmTb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tBlockDataInit
(
writer
->
bData
,
writer
->
ctx
->
tbid
,
writer
->
config
->
skmTb
->
pTSchema
,
NULL
,
0
);
code
=
tBlockDataInit
(
writer
->
b
lock
Data
,
writer
->
ctx
->
tbid
,
writer
->
config
->
skmTb
->
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
...
...
@@ -996,25 +1116,25 @@ _exit:
}
static
int32_t
tsdbDataFileDoWriteTombBlock
(
SDataFileWriter
*
writer
)
{
if
(
TOMB_BLOCK_SIZE
(
writer
->
t
Data
)
==
0
)
return
0
;
if
(
TOMB_BLOCK_SIZE
(
writer
->
t
ombBlock
)
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
STombBlk
tombBlk
[
1
]
=
{{
.
numRec
=
TOMB_BLOCK_SIZE
(
writer
->
t
Data
),
.
numRec
=
TOMB_BLOCK_SIZE
(
writer
->
t
ombBlock
),
.
minTbid
=
{
.
suid
=
TARRAY2_FIRST
(
writer
->
t
Data
->
suid
),
.
uid
=
TARRAY2_FIRST
(
writer
->
t
Data
->
uid
),
.
suid
=
TARRAY2_FIRST
(
writer
->
t
ombBlock
->
suid
),
.
uid
=
TARRAY2_FIRST
(
writer
->
t
ombBlock
->
uid
),
},
.
maxTbid
=
{
.
suid
=
TARRAY2_LAST
(
writer
->
t
Data
->
suid
),
.
uid
=
TARRAY2_LAST
(
writer
->
t
Data
->
uid
),
.
suid
=
TARRAY2_LAST
(
writer
->
t
ombBlock
->
suid
),
.
uid
=
TARRAY2_LAST
(
writer
->
t
ombBlock
->
uid
),
},
.
minVer
=
TARRAY2_FIRST
(
writer
->
t
Data
->
version
),
.
maxVer
=
TARRAY2_FIRST
(
writer
->
t
Data
->
version
),
.
minVer
=
TARRAY2_FIRST
(
writer
->
t
ombBlock
->
version
),
.
maxVer
=
TARRAY2_FIRST
(
writer
->
t
ombBlock
->
version
),
.
dp
[
0
]
=
{
.
offset
=
writer
->
files
[
TSDB_FTYPE_TOMB
].
size
,
...
...
@@ -1022,15 +1142,15 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
},
}};
for
(
int32_t
i
=
1
;
i
<
TOMB_BLOCK_SIZE
(
writer
->
t
Data
);
i
++
)
{
tombBlk
->
minVer
=
TMIN
(
tombBlk
->
minVer
,
TARRAY2_GET
(
writer
->
t
Data
->
version
,
i
));
tombBlk
->
maxVer
=
TMAX
(
tombBlk
->
maxVer
,
TARRAY2_GET
(
writer
->
t
Data
->
version
,
i
));
for
(
int32_t
i
=
1
;
i
<
TOMB_BLOCK_SIZE
(
writer
->
t
ombBlock
);
i
++
)
{
tombBlk
->
minVer
=
TMIN
(
tombBlk
->
minVer
,
TARRAY2_GET
(
writer
->
t
ombBlock
->
version
,
i
));
tombBlk
->
maxVer
=
TMAX
(
tombBlk
->
maxVer
,
TARRAY2_GET
(
writer
->
t
ombBlock
->
version
,
i
));
}
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
writer
->
t
Data
->
dataArr
);
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
writer
->
t
ombBlock
->
dataArr
);
i
++
)
{
int32_t
size
;
code
=
tsdbCmprData
((
uint8_t
*
)
TARRAY2_DATA
(
&
writer
->
t
Data
->
dataArr
[
i
]),
TARRAY2_DATA_LEN
(
&
writer
->
t
Data
->
dataArr
[
i
]),
TSDB_DATA_TYPE_BIGINT
,
TWO_STAGE_COMP
,
code
=
tsdbCmprData
((
uint8_t
*
)
TARRAY2_DATA
(
&
writer
->
t
ombBlock
->
dataArr
[
i
]),
TARRAY2_DATA_LEN
(
&
writer
->
t
ombBlock
->
dataArr
[
i
]),
TSDB_DATA_TYPE_BIGINT
,
TWO_STAGE_COMP
,
&
writer
->
config
->
bufArr
[
0
],
0
,
&
size
,
&
writer
->
config
->
bufArr
[
1
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -1046,7 +1166,7 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
code
=
TARRAY2_APPEND_PTR
(
writer
->
tombBlkArray
,
tombBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tTombBlockClear
(
writer
->
t
Data
);
tTombBlockClear
(
writer
->
t
ombBlock
);
_exit:
if
(
code
)
{
...
...
@@ -1098,23 +1218,23 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom
int32_t
lino
=
0
;
while
(
writer
->
ctx
->
hasOldTomb
)
{
for
(;
writer
->
ctx
->
iRowTomb
<
TOMB_BLOCK_SIZE
(
writer
->
ctx
->
tData
);
writer
->
ctx
->
iRowTomb
++
)
{
for
(;
writer
->
ctx
->
tombBlockIdx
<
TOMB_BLOCK_SIZE
(
writer
->
ctx
->
tombBlock
);
writer
->
ctx
->
tombBlockIdx
++
)
{
STombRecord
record1
[
1
]
=
{{
.
suid
=
TARRAY2_GET
(
writer
->
ctx
->
t
Data
->
suid
,
writer
->
ctx
->
iRowTomb
),
.
uid
=
TARRAY2_GET
(
writer
->
ctx
->
t
Data
->
uid
,
writer
->
ctx
->
iRowTomb
),
.
version
=
TARRAY2_GET
(
writer
->
ctx
->
t
Data
->
version
,
writer
->
ctx
->
iRowTomb
),
.
skey
=
TARRAY2_GET
(
writer
->
ctx
->
t
Data
->
skey
,
writer
->
ctx
->
iRowTomb
),
.
ekey
=
TARRAY2_GET
(
writer
->
ctx
->
t
Data
->
ekey
,
writer
->
ctx
->
iRowTomb
),
.
suid
=
TARRAY2_GET
(
writer
->
ctx
->
t
ombBlock
->
suid
,
writer
->
ctx
->
tombBlockIdx
),
.
uid
=
TARRAY2_GET
(
writer
->
ctx
->
t
ombBlock
->
uid
,
writer
->
ctx
->
tombBlockIdx
),
.
version
=
TARRAY2_GET
(
writer
->
ctx
->
t
ombBlock
->
version
,
writer
->
ctx
->
tombBlockIdx
),
.
skey
=
TARRAY2_GET
(
writer
->
ctx
->
t
ombBlock
->
skey
,
writer
->
ctx
->
tombBlockIdx
),
.
ekey
=
TARRAY2_GET
(
writer
->
ctx
->
t
ombBlock
->
ekey
,
writer
->
ctx
->
tombBlockIdx
),
}};
int32_t
c
=
tTombRecordCompare
(
record
,
record1
);
if
(
c
<
0
)
{
break
;
}
else
if
(
c
>
0
)
{
code
=
tTombBlockPut
(
writer
->
t
Data
,
record1
);
code
=
tTombBlockPut
(
writer
->
t
ombBlock
,
record1
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
TOMB_BLOCK_SIZE
(
writer
->
t
Data
)
>=
writer
->
config
->
maxRow
)
{
if
(
TOMB_BLOCK_SIZE
(
writer
->
t
ombBlock
)
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbDataFileDoWriteTombBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
...
...
@@ -1131,10 +1251,10 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom
for
(;
writer
->
ctx
->
tombBlkArrayIdx
<
TARRAY2_SIZE
(
writer
->
ctx
->
tombBlkArray
);
++
writer
->
ctx
->
tombBlkArrayIdx
)
{
const
STombBlk
*
tombBlk
=
TARRAY2_GET_PTR
(
writer
->
ctx
->
tombBlkArray
,
writer
->
ctx
->
tombBlkArrayIdx
);
code
=
tsdbDataFileReadTombBlock
(
writer
->
ctx
->
reader
,
tombBlk
,
writer
->
ctx
->
t
Data
);
code
=
tsdbDataFileReadTombBlock
(
writer
->
ctx
->
reader
,
tombBlk
,
writer
->
ctx
->
t
ombBlock
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
ctx
->
iRowTomb
=
0
;
writer
->
ctx
->
tombBlockIdx
=
0
;
writer
->
ctx
->
tombBlkArrayIdx
++
;
break
;
}
...
...
@@ -1143,10 +1263,10 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom
_write:
if
(
record
->
suid
==
INT64_MAX
)
goto
_exit
;
code
=
tTombBlockPut
(
writer
->
t
Data
,
record
);
code
=
tTombBlockPut
(
writer
->
t
ombBlock
,
record
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
TOMB_BLOCK_SIZE
(
writer
->
t
Data
)
>=
writer
->
config
->
maxRow
)
{
if
(
TOMB_BLOCK_SIZE
(
writer
->
t
ombBlock
)
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbDataFileDoWriteTombBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
...
...
@@ -1177,8 +1297,10 @@ static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer, TFileOpArr
code
=
tsdbDataFileWriteTableDataBegin
(
writer
,
tbid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
#if 0
code = tsdbDataFileWriteBlockIdx(writer);
TSDB_CHECK_CODE(code, lino, _exit);
#endif
code
=
tsdbDataFileWriteHeadFooter
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -1329,8 +1451,12 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
}
if
(
writer
->
ctx
->
reader
)
{
code
=
tsdbDataFileReadBrinBlk
(
writer
->
ctx
->
reader
,
&
writer
->
ctx
->
brinBlkArray
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
#if 0
code = tsdbDataFileReadBlockIdx(writer->ctx->reader, &writer->ctx->blockIdxArray);
TSDB_CHECK_CODE(code, lino, _exit);
#endif
}
_exit:
...
...
@@ -1341,7 +1467,7 @@ _exit:
}
int32_t
tsdbDataFileWriterOpen
(
const
SDataFileWriterConfig
*
config
,
SDataFileWriter
**
writer
)
{
writer
[
0
]
=
taosMemoryCalloc
(
1
,
sizeof
(
SDataFileWriter
));
writer
[
0
]
=
taosMemoryCalloc
(
1
,
sizeof
(
*
writer
[
0
]
));
if
(
!
writer
[
0
])
return
TSDB_CODE_OUT_OF_MEMORY
;
writer
[
0
]
->
config
[
0
]
=
config
[
0
];
...
...
@@ -1431,7 +1557,7 @@ int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData)
}
if
(
!
writer
->
ctx
->
tbHasOldData
//
&&
writer
->
bData
->
nRow
==
0
//
&&
writer
->
b
lock
Data
->
nRow
==
0
//
)
{
code
=
tsdbDataFileWriteDataBlock
(
writer
,
bData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -1453,10 +1579,10 @@ _exit:
int32_t
tsdbDataFileFlushTSDataBlock
(
SDataFileWriter
*
writer
)
{
ASSERT
(
writer
->
ctx
->
opened
);
if
(
writer
->
bData
->
nRow
==
0
)
return
0
;
if
(
writer
->
b
lock
Data
->
nRow
==
0
)
return
0
;
if
(
writer
->
ctx
->
tbHasOldData
)
return
0
;
return
tsdbDataFileWriteDataBlock
(
writer
,
writer
->
bData
);
return
tsdbDataFileWriteDataBlock
(
writer
,
writer
->
b
lock
Data
);
}
static
int32_t
tsdbDataFileWriterOpenTombFD
(
SDataFileWriter
*
writer
)
{
...
...
@@ -1488,8 +1614,8 @@ static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
}
writer
->
ctx
->
tombBlkArrayIdx
=
0
;
tTombBlockClear
(
writer
->
ctx
->
t
Data
);
writer
->
ctx
->
iRowTomb
=
0
;
tTombBlockClear
(
writer
->
ctx
->
t
ombBlock
);
writer
->
ctx
->
tombBlockIdx
=
0
;
}
_exit:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录