Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1ee9f80e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1ee9f80e
编写于
6月 09, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code
上级
3753c7d6
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
242 addition
and
193 deletion
+242
-193
source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h
source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h
+3
-3
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
+232
-183
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
+7
-7
未找到文件。
source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h
浏览文件 @
1ee9f80e
...
...
@@ -86,9 +86,9 @@ typedef struct SDataFileWriterConfig {
int32_t
tsdbDataFileWriterOpen
(
const
SDataFileWriterConfig
*
config
,
SDataFileWriter
**
writer
);
int32_t
tsdbDataFileWriterClose
(
SDataFileWriter
**
writer
,
bool
abort
,
TFileOpArray
*
opArr
);
int32_t
tsdbDataFileWrite
TSData
(
SDataFileWriter
*
writer
,
SRowInfo
*
row
);
int32_t
tsdbDataFileWrite
TSDataBlock
(
SDataFileWriter
*
writer
,
SBlockData
*
bData
);
int32_t
tsdbDataFileFlush
TSDataBlock
(
SDataFileWriter
*
writer
);
int32_t
tsdbDataFileWrite
Row
(
SDataFileWriter
*
writer
,
SRowInfo
*
row
);
int32_t
tsdbDataFileWrite
BlockData
(
SDataFileWriter
*
writer
,
SBlockData
*
bData
);
int32_t
tsdbDataFileFlush
(
SDataFileWriter
*
writer
);
int32_t
tsdbDataFileWriteTombRecord
(
SDataFileWriter
*
writer
,
const
STombRecord
*
record
);
...
...
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
浏览文件 @
1ee9f80e
...
...
@@ -672,7 +672,109 @@ _exit:
return
code
;
}
static
int32_t
tsdbDataFileWriteDataBlock
(
SDataFileWriter
*
writer
,
SBlockData
*
bData
)
{
static
int32_t
tsdbDataFileWriteBrinBlock
(
SDataFileWriter
*
writer
)
{
if
(
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
)
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
// get SBrinBlk
SBrinBlk
brinBlk
[
1
]
=
{
{
.
dp
[
0
]
=
{
.
offset
=
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
.
size
=
0
,
},
.
minTbid
=
{
.
suid
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
suid
),
.
uid
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
uid
),
},
.
maxTbid
=
{
.
suid
=
TARRAY2_LAST
(
writer
->
brinBlock
->
suid
),
.
uid
=
TARRAY2_LAST
(
writer
->
brinBlock
->
uid
),
},
.
minVer
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
minVer
),
.
maxVer
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
minVer
),
.
numRec
=
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
),
.
cmprAlg
=
writer
->
config
->
cmprAlg
,
},
};
for
(
int32_t
i
=
1
;
i
<
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
);
i
++
)
{
if
(
brinBlk
->
minVer
>
TARRAY2_GET
(
writer
->
brinBlock
->
minVer
,
i
))
{
brinBlk
->
minVer
=
TARRAY2_GET
(
writer
->
brinBlock
->
minVer
,
i
);
}
if
(
brinBlk
->
maxVer
<
TARRAY2_GET
(
writer
->
brinBlock
->
maxVer
,
i
))
{
brinBlk
->
maxVer
=
TARRAY2_GET
(
writer
->
brinBlock
->
maxVer
,
i
);
}
}
// write to file
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
writer
->
brinBlock
->
dataArr1
);
i
++
)
{
code
=
tsdbCmprData
((
uint8_t
*
)
TARRAY2_DATA
(
writer
->
brinBlock
->
dataArr1
+
i
),
TARRAY2_DATA_LEN
(
writer
->
brinBlock
->
dataArr1
+
i
),
TSDB_DATA_TYPE_BIGINT
,
brinBlk
->
cmprAlg
,
&
writer
->
config
->
bufArr
[
0
],
0
,
&
brinBlk
->
size
[
i
],
&
writer
->
config
->
bufArr
[
1
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbWriteFile
(
writer
->
fd
[
TSDB_FTYPE_HEAD
],
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
writer
->
config
->
bufArr
[
0
],
brinBlk
->
size
[
i
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
brinBlk
->
dp
[
i
].
size
+=
brinBlk
->
size
[
i
];
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
+=
brinBlk
->
size
[
i
];
}
for
(
int32_t
i
=
0
,
j
=
ARRAY_SIZE
(
writer
->
brinBlock
->
dataArr1
);
i
<
ARRAY_SIZE
(
writer
->
brinBlock
->
dataArr2
);
i
++
,
j
++
)
{
code
=
tsdbCmprData
((
uint8_t
*
)
TARRAY2_DATA
(
writer
->
brinBlock
->
dataArr2
+
i
),
TARRAY2_DATA_LEN
(
writer
->
brinBlock
->
dataArr2
+
i
),
TSDB_DATA_TYPE_INT
,
brinBlk
->
cmprAlg
,
&
writer
->
config
->
bufArr
[
0
],
0
,
&
brinBlk
->
size
[
j
],
&
writer
->
config
->
bufArr
[
1
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbWriteFile
(
writer
->
fd
[
TSDB_FTYPE_HEAD
],
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
writer
->
config
->
bufArr
[
0
],
brinBlk
->
size
[
j
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
brinBlk
->
dp
[
i
].
size
+=
brinBlk
->
size
[
j
];
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
+=
brinBlk
->
size
[
j
];
}
// append to brinBlkArray
code
=
TARRAY2_APPEND_PTR
(
writer
->
brinBlkArray
,
brinBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tBrinBlockClear
(
writer
->
brinBlock
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbDataFileWriteBrinRecord
(
SDataFileWriter
*
writer
,
const
SBrinRecord
*
record
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
code
=
tBrinBlockPut
(
writer
->
brinBlock
,
record
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
)
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbDataFileWriteBrinBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbDataFileDoWriteBlockData
(
SDataFileWriter
*
writer
,
SBlockData
*
bData
)
{
if
(
bData
->
nRow
==
0
)
return
0
;
ASSERT
(
bData
->
uid
);
...
...
@@ -680,41 +782,44 @@ static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *b
int32_t
code
=
0
;
int32_t
lino
=
0
;
SDataBlk
dataBlk
[
1
]
=
{{
.
minKey
=
{
.
ts
=
bData
->
aTSKEY
[
0
],
.
version
=
bData
->
aVersion
[
0
],
},
.
maxKey
=
{
.
ts
=
bData
->
aTSKEY
[
bData
->
nRow
-
1
],
.
version
=
bData
->
aVersion
[
bData
->
nRow
-
1
],
},
SBrinRecord
record
[
1
]
=
{{
.
suid
=
bData
->
suid
,
.
uid
=
bData
->
uid
,
.
firstKey
=
bData
->
aTSKEY
[
0
],
.
firstKeyVer
=
bData
->
aVersion
[
0
],
.
lastKey
=
bData
->
aTSKEY
[
bData
->
nRow
-
1
],
.
lastKeyVer
=
bData
->
aVersion
[
bData
->
nRow
-
1
],
.
minVer
=
bData
->
aVersion
[
0
],
.
maxVer
=
bData
->
aVersion
[
0
],
.
nRow
=
bData
->
nRow
,
.
hasDup
=
0
,
.
nSubBlock
=
1
,
.
blockOffset
=
writer
->
files
[
TSDB_FTYPE_DATA
].
size
,
.
smaOffset
=
writer
->
files
[
TSDB_FTYPE_SMA
].
size
,
.
blockSize
=
0
,
.
blockKeySize
=
0
,
.
smaSize
=
0
,
.
numRow
=
bData
->
nRow
,
.
count
=
1
,
}};
for
(
int32_t
i
=
1
;
i
<
bData
->
nRow
;
++
i
)
{
if
(
bData
->
aTSKEY
[
i
]
==
bData
->
aTSKEY
[
i
-
1
])
{
dataBlk
->
hasDup
=
1
;
if
(
bData
->
aTSKEY
[
i
]
!=
bData
->
aTSKEY
[
i
-
1
])
{
record
->
count
++
;
}
if
(
bData
->
aVersion
[
i
]
<
record
->
minVer
)
{
record
->
minVer
=
bData
->
aVersion
[
i
];
}
if
(
bData
->
aVersion
[
i
]
>
record
->
maxVer
)
{
record
->
maxVer
=
bData
->
aVersion
[
i
];
}
dataBlk
->
minVer
=
TMIN
(
dataBlk
->
minVer
,
bData
->
aVersion
[
i
]);
dataBlk
->
maxVer
=
TMAX
(
dataBlk
->
maxVer
,
bData
->
aVersion
[
i
]);
}
// to .data file
int32_t
sizeArr
[
5
]
=
{
0
};
// to .data
code
=
tCmprBlockData
(
bData
,
writer
->
config
->
cmprAlg
,
NULL
,
NULL
,
writer
->
config
->
bufArr
,
sizeArr
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
dataBlk
->
aSubBlock
->
offset
=
writer
->
files
[
TSDB_FTYPE_DATA
].
size
;
dataBlk
->
aSubBlock
->
szKey
=
sizeArr
[
3
]
+
sizeArr
[
2
];
dataBlk
->
aSubBlock
->
szBlock
=
dataBlk
->
aSubBlock
->
szKey
+
sizeArr
[
1
]
+
sizeArr
[
0
];
record
->
blockKeySize
=
sizeArr
[
3
]
+
sizeArr
[
2
];
record
->
blockSize
=
sizeArr
[
0
]
+
sizeArr
[
1
]
+
record
->
blockKeySize
;
for
(
int32_t
i
=
3
;
i
>=
0
;
--
i
)
{
if
(
sizeArr
[
i
])
{
...
...
@@ -725,8 +830,10 @@ static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *b
}
}
// to .sma
TColumnDataAggArray
smaArr
[
1
]
=
{
0
};
// to .sma file
TColumnDataAggArray
smaArr
[
1
];
TARRAY2_INIT
(
smaArr
);
for
(
int32_t
i
=
0
;
i
<
bData
->
nColData
;
++
i
)
{
SColData
*
colData
=
bData
->
aColData
+
i
;
...
...
@@ -744,23 +851,20 @@ static int32_t tsdbDataFileWriteDataBlock(SDataFileWriter *writer, SBlockData *b
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
dataBlk
->
smaInfo
.
offset
=
writer
->
files
[
TSDB_FTYPE_SMA
].
size
;
dataBlk
->
smaInfo
.
size
=
TARRAY2_DATA_LEN
(
smaArr
);
record
->
smaSize
=
TARRAY2_DATA_LEN
(
smaArr
);
if
(
dataBlk
->
smaInfo
.
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
[
TSDB_FTYPE_SMA
],
dataBlk
->
smaInfo
.
o
ffset
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
smaArr
),
dataBlk
->
smaInfo
.
s
ize
);
if
(
record
->
smaSize
>
0
)
{
code
=
tsdbWriteFile
(
writer
->
fd
[
TSDB_FTYPE_SMA
],
record
->
smaO
ffset
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
smaArr
),
record
->
smaS
ize
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
files
[
TSDB_FTYPE_SMA
].
size
+=
dataBlk
->
smaInfo
.
s
ize
;
writer
->
files
[
TSDB_FTYPE_SMA
].
size
+=
record
->
smaS
ize
;
}
TARRAY2_DESTROY
(
smaArr
,
NULL
);
#if 0
// to dataBlkArray
code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk);
// append SBrinRecord
code
=
tsdbDataFileWriteBrinRecord
(
writer
,
record
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
#endif
tBlockDataClear
(
bData
);
...
...
@@ -812,7 +916,14 @@ static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
TSDBKEY
key
[
1
]
=
{
TSDBROW_KEY
(
row
)};
TSDBKEY
key
[
1
];
if
(
row
->
type
==
TSDBROW_ROW_FMT
)
{
key
->
ts
=
row
->
pTSRow
->
ts
;
key
->
version
=
row
->
version
;
}
else
{
key
->
ts
=
row
->
pBlockData
->
aTSKEY
[
row
->
iRow
];
key
->
version
=
row
->
pBlockData
->
aVersion
[
row
->
iRow
];
}
if
(
key
->
version
<=
writer
->
config
->
compactVersion
//
&&
writer
->
blockData
->
nRow
>
0
//
&&
writer
->
blockData
->
aTSKEY
[
writer
->
blockData
->
nRow
-
1
]
==
key
->
ts
//
...
...
@@ -821,7 +932,7 @@ static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
if
(
writer
->
blockData
->
nRow
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbDataFile
WriteDataBlock
(
writer
,
writer
->
blockData
);
code
=
tsdbDataFile
DoWriteBlockData
(
writer
,
writer
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
...
...
@@ -840,54 +951,94 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row)
int32_t
code
=
0
;
int32_t
lino
=
0
;
while
(
writer
->
ctx
->
tbHasOldData
)
{
for
(;
writer
->
ctx
->
blockDataIdx
<
writer
->
ctx
->
blockData
->
nRow
;
writer
->
ctx
->
blockDataIdx
++
)
{
TSDBROW
row1
[
1
]
=
{
tsdbRowFromBlockData
(
writer
->
ctx
->
blockData
,
writer
->
ctx
->
blockDataIdx
)};
int32_t
c
=
tsdbRowCmprFn
(
row
,
row1
);
ASSERT
(
c
);
if
(
c
>
0
)
{
code
=
tsdbDataFileDoWriteTSRow
(
writer
,
row1
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
goto
_do_write
;
}
}
#if 0
if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray)) {
writer->ctx->tbHasOldData = false;
break;
if
(
writer
->
ctx
->
tbHasOldData
)
{
TSDBKEY
key
[
1
];
if
(
row
->
type
==
TSDBROW_ROW_FMT
)
{
key
->
ts
=
row
->
pTSRow
->
ts
;
key
->
version
=
row
->
version
;
}
else
{
key
->
ts
=
row
->
pBlockData
->
aTSKEY
[
row
->
iRow
];
key
->
version
=
row
->
pBlockData
->
aVersion
[
row
->
iRow
];
}
for (; writer->ctx->dataBlkArrayIdx < TARRAY2_SIZE(writer->ctx->dataBlkArray); writer->ctx->dataBlkArrayIdx++) {
const SDataBlk *dataBlk = TARRAY2_GET_PTR(writer->ctx->dataBlkArray, writer->ctx->dataBlkArrayIdx);
for
(;;)
{
for
(;;)
{
// SBlockData
for
(;
writer
->
ctx
->
blockDataIdx
<
writer
->
ctx
->
blockData
->
nRow
;
writer
->
ctx
->
blockDataIdx
++
)
{
if
(
key
->
ts
<
writer
->
ctx
->
blockData
->
aTSKEY
[
writer
->
ctx
->
blockDataIdx
]
//
||
(
key
->
ts
==
writer
->
ctx
->
blockData
->
aTSKEY
[
writer
->
ctx
->
blockDataIdx
]
&&
key
->
version
<
writer
->
ctx
->
blockData
->
aVersion
[
writer
->
ctx
->
blockDataIdx
]))
{
goto
_do_write
;
}
else
{
TSDBROW
row1
=
tsdbRowFromBlockData
(
writer
->
ctx
->
blockData
,
writer
->
ctx
->
blockDataIdx
);
code
=
tsdbDataFileDoWriteTSRow
(
writer
,
&
row1
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
TSDBKEY key = TSDBROW_KEY(row);
SDataBlk dataBlk1[1] = {{
.minKey = key,
.maxKey = key,
}};
// SBrinBlock
if
(
writer
->
ctx
->
brinBlockIdx
>=
BRIN_BLOCK_SIZE
(
writer
->
ctx
->
brinBlock
))
{
break
;
}
int32_t c = tDataBlkCmprFn(dataBlk, dataBlk1);
if (c < 0) {
code = tsdbDataFileWriteDataBlock(writer, writer->blockData);
TSDB_CHECK_CODE(code, lino, _exit);
for
(;
writer
->
ctx
->
brinBlockIdx
<
BRIN_BLOCK_SIZE
(
writer
->
ctx
->
brinBlock
);
writer
->
ctx
->
brinBlockIdx
++
)
{
if
(
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
uid
,
writer
->
ctx
->
brinBlockIdx
)
!=
writer
->
ctx
->
tbid
->
uid
)
{
writer
->
ctx
->
tbHasOldData
=
false
;
goto
_do_write
;
}
if
(
key
->
ts
<
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
firstKey
,
writer
->
ctx
->
brinBlockIdx
)
//
||
(
key
->
ts
==
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
firstKey
,
writer
->
ctx
->
brinBlockIdx
)
&&
key
->
version
<
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
firstKeyVer
,
writer
->
ctx
->
brinBlockIdx
)))
{
goto
_do_write
;
}
else
{
SBrinRecord
record
[
1
];
tBrinBlockGet
(
writer
->
ctx
->
brinBlock
,
writer
->
ctx
->
brinBlockIdx
,
record
);
if
(
key
->
ts
>
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
lastKey
,
writer
->
ctx
->
brinBlockIdx
)
//
||
(
key
->
ts
==
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
lastKey
,
writer
->
ctx
->
brinBlockIdx
)
&&
key
->
version
>
TARRAY2_GET
(
writer
->
ctx
->
brinBlock
->
lastKeyVer
,
writer
->
ctx
->
brinBlockIdx
)))
{
if
(
writer
->
blockData
->
nRow
>
0
)
{
code
=
tsdbDataFileDoWriteBlockData
(
writer
,
writer
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbDataFileWriteBrinRecord
(
writer
,
record
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbDataFileReadBlockData
(
writer
->
ctx
->
reader
,
record
,
writer
->
ctx
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
ctx
->
blockDataIdx
=
0
;
writer
->
ctx
->
brinBlockIdx
++
;
break
;
}
}
}
}
code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (c > 0) {
// SBrinBlk
if
(
writer
->
ctx
->
brinBlkArrayIdx
>=
TARRAY2_SIZE
(
writer
->
ctx
->
brinBlkArray
))
{
writer
->
ctx
->
brinBlkArray
=
NULL
;
writer
->
ctx
->
tbHasOldData
=
false
;
goto
_do_write
;
} else {
code = tsdbDataFileReadDataBlock(writer->ctx->reader, dataBlk, writer->ctx->blockData);
}
for
(;
writer
->
ctx
->
brinBlkArrayIdx
<
TARRAY2_SIZE
(
writer
->
ctx
->
brinBlkArray
);
writer
->
ctx
->
brinBlkArrayIdx
++
)
{
const
SBrinBlk
*
brinBlk
=
TARRAY2_GET_PTR
(
writer
->
ctx
->
brinBlkArray
,
writer
->
ctx
->
brinBlkArrayIdx
);
if
(
brinBlk
->
minTbid
.
uid
!=
writer
->
ctx
->
tbid
->
uid
)
{
writer
->
ctx
->
tbHasOldData
=
false
;
goto
_do_write
;
}
code
=
tsdbDataFileReadBrinBlock
(
writer
->
ctx
->
reader
,
brinBlk
,
writer
->
ctx
->
brinBlock
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer->ctx->b
lockData
Idx = 0;
writer->ctx->
data
BlkArrayIdx++;
writer
->
ctx
->
b
rinBlock
Idx
=
0
;
writer
->
ctx
->
brin
BlkArrayIdx
++
;
break
;
}
}
#endif
}
_do_write:
...
...
@@ -915,111 +1066,9 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
}
}
code
=
tsdbDataFileWriteDataBlock
(
writer
,
writer
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbDataFileWriteBrinBlock
(
SDataFileWriter
*
writer
)
{
if
(
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
)
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
// get SBrinBlk
SBrinBlk
brinBlk
[
1
]
=
{
{
.
dp
[
0
]
=
{
.
offset
=
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
.
size
=
0
,
},
.
minTbid
=
{
.
suid
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
suid
),
.
uid
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
uid
),
},
.
maxTbid
=
{
.
suid
=
TARRAY2_LAST
(
writer
->
brinBlock
->
suid
),
.
uid
=
TARRAY2_LAST
(
writer
->
brinBlock
->
uid
),
},
.
minVer
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
minVer
),
.
maxVer
=
TARRAY2_FIRST
(
writer
->
brinBlock
->
minVer
),
.
numRec
=
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
),
.
cmprAlg
=
writer
->
config
->
cmprAlg
,
},
};
for
(
int32_t
i
=
1
;
i
<
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
);
i
++
)
{
if
(
brinBlk
->
minVer
>
TARRAY2_GET
(
writer
->
brinBlock
->
minVer
,
i
))
{
brinBlk
->
minVer
=
TARRAY2_GET
(
writer
->
brinBlock
->
minVer
,
i
);
}
if
(
brinBlk
->
maxVer
<
TARRAY2_GET
(
writer
->
brinBlock
->
maxVer
,
i
))
{
brinBlk
->
maxVer
=
TARRAY2_GET
(
writer
->
brinBlock
->
maxVer
,
i
);
}
}
// write to file
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
writer
->
brinBlock
->
dataArr1
);
i
++
)
{
code
=
tsdbCmprData
((
uint8_t
*
)
TARRAY2_DATA
(
writer
->
brinBlock
->
dataArr1
+
i
),
TARRAY2_DATA_LEN
(
writer
->
brinBlock
->
dataArr1
+
i
),
TSDB_DATA_TYPE_BIGINT
,
brinBlk
->
cmprAlg
,
&
writer
->
config
->
bufArr
[
0
],
0
,
&
brinBlk
->
size
[
i
],
&
writer
->
config
->
bufArr
[
1
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbWriteFile
(
writer
->
fd
[
TSDB_FTYPE_HEAD
],
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
writer
->
config
->
bufArr
[
0
],
brinBlk
->
size
[
i
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
brinBlk
->
dp
[
i
].
size
+=
brinBlk
->
size
[
i
];
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
+=
brinBlk
->
size
[
i
];
}
for
(
int32_t
i
=
0
,
j
=
ARRAY_SIZE
(
writer
->
brinBlock
->
dataArr1
);
i
<
ARRAY_SIZE
(
writer
->
brinBlock
->
dataArr2
);
i
++
,
j
++
)
{
code
=
tsdbCmprData
((
uint8_t
*
)
TARRAY2_DATA
(
writer
->
brinBlock
->
dataArr2
+
i
),
TARRAY2_DATA_LEN
(
writer
->
brinBlock
->
dataArr2
+
i
),
TSDB_DATA_TYPE_INT
,
brinBlk
->
cmprAlg
,
&
writer
->
config
->
bufArr
[
0
],
0
,
&
brinBlk
->
size
[
j
],
&
writer
->
config
->
bufArr
[
1
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbWriteFile
(
writer
->
fd
[
TSDB_FTYPE_HEAD
],
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
,
writer
->
config
->
bufArr
[
0
],
brinBlk
->
size
[
j
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
brinBlk
->
dp
[
i
].
size
+=
brinBlk
->
size
[
j
];
writer
->
files
[
TSDB_FTYPE_HEAD
].
size
+=
brinBlk
->
size
[
j
];
}
// append to brinBlkArray
code
=
TARRAY2_APPEND_PTR
(
writer
->
brinBlkArray
,
brinBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tBrinBlockClear
(
writer
->
brinBlock
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbDataFileWriteBrinRecord
(
SDataFileWriter
*
writer
,
const
SBrinRecord
*
record
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
code
=
tBrinBlockPut
(
writer
->
brinBlock
,
record
);
code
=
tsdbDataFileDoWriteBlockData
(
writer
,
writer
->
blockData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
BRIN_BLOCK_SIZE
(
writer
->
brinBlock
)
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbDataFileWriteBrinBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
lino
,
code
);
...
...
@@ -1498,7 +1547,7 @@ _exit:
return
code
;
}
int32_t
tsdbDataFileWrite
TSData
(
SDataFileWriter
*
writer
,
SRowInfo
*
row
)
{
int32_t
tsdbDataFileWrite
Row
(
SDataFileWriter
*
writer
,
SRowInfo
*
row
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
...
...
@@ -1530,7 +1579,7 @@ _exit:
return
code
;
}
int32_t
tsdbDataFileWrite
TSDataBlock
(
SDataFileWriter
*
writer
,
SBlockData
*
bData
)
{
int32_t
tsdbDataFileWrite
BlockData
(
SDataFileWriter
*
writer
,
SBlockData
*
bData
)
{
if
(
bData
->
nRow
==
0
)
return
0
;
int32_t
code
=
0
;
...
...
@@ -1543,7 +1592,7 @@ int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData)
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
!
writer
->
fd
[
TSDB_FTYPE_DATA
]
)
{
if
(
writer
->
fd
[
TSDB_FTYPE_DATA
]
==
NULL
)
{
code
=
tsdbDataFileWriterOpenDataFD
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
...
...
@@ -1559,7 +1608,7 @@ int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData)
if
(
!
writer
->
ctx
->
tbHasOldData
//
&&
writer
->
blockData
->
nRow
==
0
//
)
{
code
=
tsdbDataFile
WriteDataBlock
(
writer
,
bData
);
code
=
tsdbDataFile
DoWriteBlockData
(
writer
,
bData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
for
(
int32_t
i
=
0
;
i
<
bData
->
nRow
;
++
i
)
{
...
...
@@ -1576,13 +1625,13 @@ _exit:
return
code
;
}
int32_t
tsdbDataFileFlush
TSDataBlock
(
SDataFileWriter
*
writer
)
{
int32_t
tsdbDataFileFlush
(
SDataFileWriter
*
writer
)
{
ASSERT
(
writer
->
ctx
->
opened
);
if
(
writer
->
blockData
->
nRow
==
0
)
return
0
;
if
(
writer
->
ctx
->
tbHasOldData
)
return
0
;
return
tsdbDataFile
WriteDataBlock
(
writer
,
writer
->
blockData
);
return
tsdbDataFile
DoWriteBlockData
(
writer
,
writer
->
blockData
);
}
static
int32_t
tsdbDataFileWriterOpenTombFD
(
SDataFileWriter
*
writer
)
{
...
...
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
浏览文件 @
1ee9f80e
...
...
@@ -130,35 +130,35 @@ static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
for
(
int32_t
i
=
0
;
i
<
numRow
;
i
++
)
{
row
->
row
.
iRow
=
i
;
code
=
tsdbDataFileWrite
TSData
(
merger
->
dataWriter
,
row
);
code
=
tsdbDataFileWrite
Row
(
merger
->
dataWriter
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbDataFileFlush
TSDataBlock
(
merger
->
dataWriter
);
code
=
tsdbDataFileFlush
(
merger
->
dataWriter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
for
(
int32_t
i
=
numRow
;
i
<
merger
->
ctx
->
bData
[
pidx
].
nRow
;
i
++
)
{
row
->
row
.
iRow
=
i
;
code
=
tsdbDataFileWrite
TSData
(
merger
->
dataWriter
,
row
);
code
=
tsdbDataFileWrite
Row
(
merger
->
dataWriter
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
row
->
row
=
tsdbRowFromBlockData
(
merger
->
ctx
->
bData
+
cidx
,
0
);
for
(
int32_t
i
=
0
;
i
<
merger
->
ctx
->
bData
[
cidx
].
nRow
;
i
++
)
{
row
->
row
.
iRow
=
i
;
code
=
tsdbDataFileWrite
TSData
(
merger
->
dataWriter
,
row
);
code
=
tsdbDataFileWrite
Row
(
merger
->
dataWriter
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
else
{
if
(
merger
->
ctx
->
bData
[
pidx
].
nRow
>
0
)
{
code
=
tsdbDataFileWrite
TSDataBlock
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
+
cidx
);
code
=
tsdbDataFileWrite
BlockData
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
+
cidx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
merger
->
ctx
->
bData
[
cidx
].
nRow
<
merger
->
minRow
)
{
code
=
tsdbSttFileWriteTSDataBlock
(
merger
->
sttWriter
,
merger
->
ctx
->
bData
+
cidx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbDataFileWrite
TSDataBlock
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
+
cidx
);
code
=
tsdbDataFileWrite
BlockData
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
+
cidx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
...
...
@@ -224,7 +224,7 @@ static int32_t tsdbMergeToDataLevel(SMerger *merger) {
if
(
merger
->
ctx
->
bData
[
merger
->
ctx
->
bDataIdx
].
nRow
>=
merger
->
maxRow
)
{
int32_t
idx
=
(
merger
->
ctx
->
bDataIdx
+
1
)
%
2
;
code
=
tsdbDataFileWrite
TSDataBlock
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
+
idx
);
code
=
tsdbDataFileWrite
BlockData
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
+
idx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tBlockDataClear
(
merger
->
ctx
->
bData
+
idx
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录