Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8329284a
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8329284a
编写于
9月 02, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact for further dev
上级
7fe743d0
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
804 addition
and
829 deletion
+804
-829
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+6
-6
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+798
-823
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
8329284a
...
...
@@ -586,6 +586,12 @@ struct SDelFWriter {
uint8_t
*
aBuf
[
1
];
};
struct
STsdbReadSnap
{
SMemTable
*
pMem
;
SMemTable
*
pIMem
;
STsdbFS
fs
;
};
struct
SDataFWriter
{
STsdb
*
pTsdb
;
SDFileSet
wSet
;
...
...
@@ -603,12 +609,6 @@ struct SDataFWriter {
uint8_t
*
aBuf
[
4
];
};
struct
STsdbReadSnap
{
SMemTable
*
pMem
;
SMemTable
*
pIMem
;
STsdbFS
fs
;
};
struct
SDataFReader
{
STsdb
*
pTsdb
;
SDFileSet
*
pSet
;
...
...
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
8329284a
...
...
@@ -23,7 +23,7 @@ typedef struct {
int64_t
pgno
;
}
STsdbFD
;
int32_t
tsdbOpenFile
(
const
char
*
path
,
int32_t
opt
,
STsdbFD
*
pFD
)
{
static
int32_t
tsdbOpenFile
(
const
char
*
path
,
int32_t
opt
,
STsdbFD
*
pFD
)
{
int32_t
code
=
0
;
pFD
->
pFD
=
taosOpenFile
(
path
,
opt
);
...
...
@@ -45,12 +45,12 @@ _exit:
return
code
;
}
void
tsdbCloseFile
(
STsdbFD
*
pFD
)
{
static
void
tsdbCloseFile
(
STsdbFD
*
pFD
)
{
taosMemoryFree
(
pFD
->
pBuf
);
taosCloseFile
(
&
pFD
->
pFD
);
}
int32_t
tsdbSyncFile
(
STsdbFD
*
pFD
)
{
static
int32_t
tsdbSyncFile
(
STsdbFD
*
pFD
)
{
int32_t
code
=
0
;
if
(
taosFsyncFile
(
pFD
->
pFD
)
<
0
)
{
...
...
@@ -62,7 +62,7 @@ _exit:
return
code
;
}
int32_t
tsdbWriteFile
(
STsdbFD
*
pFD
,
uint8_t
*
pBuf
,
int32_t
nBuf
,
int64_t
*
offset
)
{
static
int32_t
tsdbWriteFile
(
STsdbFD
*
pFD
,
uint8_t
*
pBuf
,
int32_t
nBuf
,
int64_t
*
offset
)
{
int32_t
code
=
0
;
int32_t
n
=
0
;
...
...
@@ -120,7 +120,7 @@ _exit:
return
code
;
}
int64_t
tsdbReadFile
(
STsdbFD
*
pFD
,
int64_t
offset
,
uint8_t
*
pBuf
,
int64_t
count
)
{
static
int64_t
tsdbReadFile
(
STsdbFD
*
pFD
,
int64_t
offset
,
uint8_t
*
pBuf
,
int64_t
count
)
{
int32_t
code
=
0
;
int64_t
pgno
=
offset
/
pFD
->
szPage
;
...
...
@@ -147,1219 +147,1195 @@ _exit:
return
code
;
}
// SDataF
Read
er ====================================================
int32_t
tsdbDataF
ReaderOpen
(
SDataFReader
**
ppRead
er
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
)
{
// SDataF
Writ
er ====================================================
int32_t
tsdbDataF
WriterOpen
(
SDataFWriter
**
ppWrit
er
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
)
{
int32_t
code
=
0
;
SDataFReader
*
pReader
;
int32_t
flag
;
int64_t
n
;
SDataFWriter
*
pWriter
=
NULL
;
char
fname
[
TSDB_FILENAME_LEN
];
char
hdr
[
TSDB_FHDR_SIZE
]
=
{
0
};
// alloc
p
Reader
=
(
SDataFReader
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pRead
er
));
if
(
p
Read
er
==
NULL
)
{
p
Writer
=
taosMemoryCalloc
(
1
,
sizeof
(
*
pWrit
er
));
if
(
p
Writ
er
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pReader
->
pTsdb
=
pTsdb
;
pReader
->
pSet
=
pSet
;
pWriter
->
pTsdb
=
pTsdb
;
pWriter
->
wSet
=
(
SDFileSet
){
.
diskId
=
pSet
->
diskId
,
.
fid
=
pSet
->
fid
,
.
pHeadF
=
&
pWriter
->
fHead
,
.
pDataF
=
&
pWriter
->
fData
,
.
pSmaF
=
&
pWriter
->
fSma
,
.
nSstF
=
pSet
->
nSstF
//
};
pWriter
->
fHead
=
*
pSet
->
pHeadF
;
pWriter
->
fData
=
*
pSet
->
pDataF
;
pWriter
->
fSma
=
*
pSet
->
pSmaF
;
for
(
int8_t
iSst
=
0
;
iSst
<
pSet
->
nSstF
;
iSst
++
)
{
pWriter
->
wSet
.
aSstF
[
iSst
]
=
&
pWriter
->
fSst
[
iSst
];
pWriter
->
fSst
[
iSst
]
=
*
pSet
->
aSstF
[
iSst
];
}
// open impl
// head
tsdbHeadFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pHeadF
,
fname
);
pReader
->
pHeadFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pHeadFD
==
NULL
)
{
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
tsdbHeadFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fHead
,
fname
);
pWriter
->
pHeadFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pHeadFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
ASSERT
(
n
==
TSDB_FHDR_SIZE
);
pWriter
->
fHead
.
size
+=
TSDB_FHDR_SIZE
;
// data
tsdbDataFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pDataF
,
fname
);
pReader
->
pDataFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pDataFD
==
NULL
)
{
if
(
pWriter
->
fData
.
size
==
0
)
{
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
}
else
{
flag
=
TD_FILE_WRITE
;
}
tsdbDataFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fData
,
fname
);
pWriter
->
pDataFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pDataFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
pWriter
->
fData
.
size
==
0
)
{
n
=
taosWriteFile
(
pWriter
->
pDataFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
pWriter
->
fData
.
size
+=
TSDB_FHDR_SIZE
;
}
else
{
n
=
taosLSeekFile
(
pWriter
->
pDataFD
,
0
,
SEEK_END
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
ASSERT
(
n
==
pWriter
->
fData
.
size
);
}
// sma
tsdbSmaFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pSmaF
,
fname
);
pReader
->
pSmaFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pSmaFD
==
NULL
)
{
if
(
pWriter
->
fSma
.
size
==
0
)
{
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
}
else
{
flag
=
TD_FILE_WRITE
;
}
tsdbSmaFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fSma
,
fname
);
pWriter
->
pSmaFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pSmaFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
pWriter
->
fSma
.
size
==
0
)
{
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// sst
for
(
int32_t
iSst
=
0
;
iSst
<
pSet
->
nSstF
;
iSst
++
)
{
tsdbSstFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
aSstF
[
iSst
],
fname
);
pReader
->
aLastFD
[
iSst
]
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
aLastFD
[
iSst
]
==
NULL
)
{
pWriter
->
fSma
.
size
+=
TSDB_FHDR_SIZE
;
}
else
{
n
=
taosLSeekFile
(
pWriter
->
pSmaFD
,
0
,
SEEK_END
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
ASSERT
(
n
==
pWriter
->
fSma
.
size
);
}
*
ppReader
=
pReader
;
// sst
ASSERT
(
pWriter
->
fSst
[
pSet
->
nSstF
-
1
].
size
==
0
);
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
tsdbSstFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fSst
[
pSet
->
nSstF
-
1
],
fname
);
pWriter
->
pLastFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pLastFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
].
size
+=
TSDB_FHDR_SIZE
;
*
ppWriter
=
pWriter
;
return
code
;
_err:
tsdbError
(
"vgId:%d, tsdb data file
read
er open failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
*
pp
Read
er
=
NULL
;
tsdbError
(
"vgId:%d, tsdb data file
writ
er open failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
*
pp
Writ
er
=
NULL
;
return
code
;
}
int32_t
tsdbDataF
ReaderClose
(
SDataFReader
**
ppReader
)
{
int32_t
tsdbDataF
WriterClose
(
SDataFWriter
**
ppWriter
,
int8_t
sync
)
{
int32_t
code
=
0
;
if
(
*
ppReader
==
NULL
)
goto
_exit
;
STsdb
*
pTsdb
=
NULL
;
// head
if
(
taosCloseFile
(
&
(
*
ppReader
)
->
pHeadFD
)
<
0
)
{
if
(
*
ppWriter
==
NULL
)
goto
_exit
;
pTsdb
=
(
*
ppWriter
)
->
pTsdb
;
if
(
sync
)
{
if
(
taosFsyncFile
((
*
ppWriter
)
->
pHeadFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
taosFsyncFile
((
*
ppWriter
)
->
pDataFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
taosFsyncFile
((
*
ppWriter
)
->
pSmaFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
taosFsyncFile
((
*
ppWriter
)
->
pLastFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
}
if
(
taosCloseFile
(
&
(
*
ppWriter
)
->
pHeadFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// data
if
(
taosCloseFile
(
&
(
*
ppReader
)
->
pDataFD
)
<
0
)
{
if
(
taosCloseFile
(
&
(
*
ppWriter
)
->
pDataFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// sma
if
(
taosCloseFile
(
&
(
*
ppReader
)
->
pSmaFD
)
<
0
)
{
if
(
taosCloseFile
(
&
(
*
ppWriter
)
->
pSmaFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// sst
for
(
int32_t
iSst
=
0
;
iSst
<
(
*
ppReader
)
->
pSet
->
nSstF
;
iSst
++
)
{
if
(
taosCloseFile
(
&
(
*
ppReader
)
->
aLastFD
[
iSst
])
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
taosCloseFile
(
&
(
*
ppWriter
)
->
pLastFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
((
*
pp
Read
er
)
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
((
*
pp
Read
er
)
->
aBuf
[
iBuf
]);
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
((
*
pp
Writ
er
)
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
((
*
pp
Writ
er
)
->
aBuf
[
iBuf
]);
}
taosMemoryFree
(
*
ppReader
);
taosMemoryFree
(
*
ppWriter
);
_exit:
*
pp
Read
er
=
NULL
;
*
pp
Writ
er
=
NULL
;
return
code
;
_err:
tsdbError
(
"vgId:%d, data file
reader close failed since %s"
,
TD_VID
((
*
ppReader
)
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d, data file
writer close failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbReadBlockIdx
(
SDataFReader
*
pReader
,
SArray
*
aBlockIdx
)
{
int32_t
code
=
0
;
int64_t
offset
=
pReader
->
pSet
->
pHeadF
->
offset
;
int64_t
size
=
pReader
->
pSet
->
pHeadF
->
size
-
offset
;
int64_t
n
;
uint32_t
delimiter
;
taosArrayClear
(
aBlockIdx
);
if
(
size
==
0
)
{
goto
_exit
;
}
int32_t
tsdbUpdateDFileSetHeader
(
SDataFWriter
*
pWriter
)
{
int32_t
code
=
0
;
int64_t
n
;
char
hdr
[
TSDB_FHDR_SIZE
];
// alloc
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
// head ==============
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutHeadFile
(
hdr
,
&
pWriter
->
fHead
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
// seek
if
(
taosLSeekFile
(
pReader
->
pHeadFD
,
offset
,
SEEK_SET
)
<
0
)
{
n
=
taosLSeekFile
(
pWriter
->
pHeadFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// read
n
=
taosReadFile
(
pReader
->
pHeadFD
,
pReader
->
aBuf
[
0
],
size
);
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
}
// data ==============
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutDataFile
(
hdr
,
&
pWriter
->
fData
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pWriter
->
pDataFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
],
size
)
)
{
code
=
T
SDB_CODE_FILE_CORRUPTED
;
n
=
taosWriteFile
(
pWriter
->
pDataFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
T
AOS_SYSTEM_ERROR
(
errno
)
;
goto
_err
;
}
//
decode
n
=
0
;
n
=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
//
sma ==============
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
)
;
tPutSmaFile
(
hdr
,
&
pWriter
->
fSma
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
SBlockIdx
blockIdx
;
n
+=
tGetBlockIdx
(
pReader
->
aBuf
[
0
]
+
n
,
&
blockIdx
);
n
=
taosLSeekFile
(
pWriter
->
pSmaFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
taosArrayPush
(
aBlockIdx
,
&
blockIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
)
;
goto
_err
;
}
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// sst ==============
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutSstFile
(
hdr
,
&
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
]);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pWriter
->
pLastFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
_exit:
return
code
;
_err:
tsdbError
(
"vgId:%d,
read block idx failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d,
update DFileSet header failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbReadSstBlk
(
SDataFReader
*
pReader
,
int32_t
iSst
,
SArray
*
aSstBlk
)
{
int32_t
code
=
0
;
int64_t
offset
=
pReader
->
pSet
->
aSstF
[
iSst
]
->
offset
;
int64_t
size
=
pReader
->
pSet
->
aSstF
[
iSst
]
->
size
-
offset
;
int64_t
n
;
uint32_t
delimiter
;
int32_t
tsdbWriteBlockIdx
(
SDataFWriter
*
pWriter
,
SArray
*
aBlockIdx
)
{
int32_t
code
=
0
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
fHead
;
int64_t
size
=
0
;
int64_t
n
;
taosArrayClear
(
aSstBlk
);
if
(
size
==
0
)
{
// check
if
(
taosArrayGetSize
(
aBlockIdx
)
==
0
)
{
pHeadFile
->
offset
=
pHeadFile
->
size
;
goto
_exit
;
}
// prepare
size
=
sizeof
(
uint32_t
);
for
(
int32_t
iBlockIdx
=
0
;
iBlockIdx
<
taosArrayGetSize
(
aBlockIdx
);
iBlockIdx
++
)
{
size
+=
tPutBlockIdx
(
NULL
,
taosArrayGet
(
aBlockIdx
,
iBlockIdx
));
}
size
+=
sizeof
(
TSCKSUM
);
// alloc
code
=
tRealloc
(
&
p
Read
er
->
aBuf
[
0
],
size
);
code
=
tRealloc
(
&
p
Writ
er
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
// seek
if
(
taosLSeekFile
(
pReader
->
aLastFD
[
iSst
],
offset
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
// build
n
=
0
;
n
=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
for
(
int32_t
iBlockIdx
=
0
;
iBlockIdx
<
taosArrayGetSize
(
aBlockIdx
);
iBlockIdx
++
)
{
n
+=
tPutBlockIdx
(
pWriter
->
aBuf
[
0
]
+
n
,
taosArrayGet
(
aBlockIdx
,
iBlockIdx
));
}
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
],
size
);
// read
n
=
taosReadFile
(
pReader
->
aLastFD
[
iSst
],
pReader
->
aBuf
[
0
],
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// write
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
pWriter
->
aBuf
[
0
],
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
],
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// decode
n
=
0
;
n
=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
SSstBlk
blockl
;
n
+=
tGetSstBlk
(
pReader
->
aBuf
[
0
]
+
n
,
&
blockl
);
if
(
taosArrayPush
(
aSstBlk
,
&
blockl
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// update
pHeadFile
->
offset
=
pHeadFile
->
size
;
pHeadFile
->
size
+=
size
;
_exit:
tsdbTrace
(
"vgId:%d write block idx, offset:%"
PRId64
" size:%"
PRId64
" nBlockIdx:%d"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pHeadFile
->
offset
,
size
,
taosArrayGetSize
(
aBlockIdx
));
return
code
;
_err:
tsdbError
(
"vgId:%d
read blockl failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d
, write block idx failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbReadBlock
(
SDataFReader
*
pReader
,
SBlockIdx
*
pBlockIdx
,
SMapData
*
mBlock
)
{
int32_t
code
=
0
;
int64_t
offset
=
pBlockIdx
->
offset
;
int64_t
size
=
pBlockIdx
->
size
;
int64_t
n
;
int64_t
tn
;
int32_t
tsdbWriteBlock
(
SDataFWriter
*
pWriter
,
SMapData
*
mBlock
,
SBlockIdx
*
pBlockIdx
)
{
int32_t
code
=
0
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
fHead
;
int64_t
size
;
int64_t
n
;
ASSERT
(
mBlock
->
nItem
>
0
);
// alloc
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
],
size
);
size
=
sizeof
(
uint32_t
)
+
tPutMapData
(
NULL
,
mBlock
)
+
sizeof
(
TSCKSUM
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
//
seek
if
(
taosLSeekFile
(
pReader
->
pHeadFD
,
offset
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
//
build
n
=
0
;
n
+=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutMapData
(
pWriter
->
aBuf
[
0
]
+
n
,
mBlock
)
;
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
],
size
);
// read
n
=
taosReadFile
(
pReader
->
pHeadFD
,
pReader
->
aBuf
[
0
],
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// write
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
pWriter
->
aBuf
[
0
],
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
],
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// decode
n
=
0
;
uint32_t
delimiter
;
n
+=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
tn
=
tGetMapData
(
pReader
->
aBuf
[
0
]
+
n
,
mBlock
);
if
(
tn
<
0
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
n
+=
tn
;
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// update
pBlockIdx
->
offset
=
pHeadFile
->
size
;
pBlockIdx
->
size
=
size
;
pHeadFile
->
size
+=
size
;
tsdbTrace
(
"vgId:%d, write block, file ID:%d commit ID:%d suid:%"
PRId64
" uid:%"
PRId64
" offset:%"
PRId64
" size:%"
PRId64
" nItem:%d"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pWriter
->
wSet
.
fid
,
pHeadFile
->
commitID
,
pBlockIdx
->
suid
,
pBlockIdx
->
uid
,
pBlockIdx
->
offset
,
pBlockIdx
->
size
,
mBlock
->
nItem
);
return
code
;
_err:
tsdbError
(
"vgId:%d,
read block failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d,
write block failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdb
ReadBlockSma
(
SDataFReader
*
pReader
,
SDataBlk
*
pDataBlk
,
SArray
*
aColumnDataAgg
)
{
int32_t
tsdb
WriteSstBlk
(
SDataFWriter
*
pWriter
,
SArray
*
aSstBlk
)
{
int32_t
code
=
0
;
SSmaInfo
*
pSmaInfo
=
&
pDataBlk
->
smaInfo
;
SSstFile
*
pSstFile
=
&
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
];
int64_t
size
;
int64_t
n
;
ASSERT
(
pSmaInfo
->
size
>
0
);
// check
if
(
taosArrayGetSize
(
aSstBlk
)
==
0
)
{
pSstFile
->
offset
=
pSstFile
->
size
;
goto
_exit
;
}
taosArrayClear
(
aColumnDataAgg
);
// size
size
=
sizeof
(
uint32_t
);
// TSDB_FILE_DLMT
for
(
int32_t
iBlockL
=
0
;
iBlockL
<
taosArrayGetSize
(
aSstBlk
);
iBlockL
++
)
{
size
+=
tPutSstBlk
(
NULL
,
taosArrayGet
(
aSstBlk
,
iBlockL
));
}
size
+=
sizeof
(
TSCKSUM
);
// alloc
int32_t
size
=
pSmaInfo
->
size
+
sizeof
(
TSCKSUM
);
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
],
size
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
// seek
int64_t
n
=
taosLSeekFile
(
pReader
->
pSmaFD
,
pSmaInfo
->
offset
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
pSmaInfo
->
offset
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
// encode
n
=
0
;
n
+=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
for
(
int32_t
iBlockL
=
0
;
iBlockL
<
taosArrayGetSize
(
aSstBlk
);
iBlockL
++
)
{
n
+=
tPutSstBlk
(
pWriter
->
aBuf
[
0
]
+
n
,
taosArrayGet
(
aSstBlk
,
iBlockL
));
}
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
],
size
);
// read
n
=
taosReadFile
(
pReader
->
pSmaFD
,
pReader
->
aBuf
[
0
],
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// write
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
pWriter
->
aBuf
[
0
],
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
],
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// decode
n
=
0
;
while
(
n
<
pSmaInfo
->
size
)
{
SColumnDataAgg
sma
;
n
+=
tGetColumnDataAgg
(
pReader
->
aBuf
[
0
]
+
n
,
&
sma
);
if
(
taosArrayPush
(
aColumnDataAgg
,
&
sma
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
// update
pSstFile
->
offset
=
pSstFile
->
size
;
pSstFile
->
size
+=
size
;
_exit:
tsdbTrace
(
"vgId:%d tsdb write blockl, loffset:%"
PRId64
" size:%"
PRId64
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pSstFile
->
offset
,
size
);
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb
read block sma failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d tsdb
write blockl failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbReadBlockDataImpl
(
SDataFReader
*
pReader
,
SBlockInfo
*
pBlkInfo
,
int8_t
fromLast
,
SBlockData
*
pBlockData
)
{
static
int32_t
tsdbWriteBlockSma
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
SSmaInfo
*
pSmaInfo
)
{
int32_t
code
=
0
;
tBlockDataClear
(
pBlockData
);
pSmaInfo
->
offset
=
0
;
pSmaInfo
->
size
=
0
;
TdFilePtr
pFD
=
fromLast
?
pReader
->
aLastFD
[
0
]
:
pReader
->
pDataFD
;
// (todo)
// encode
for
(
int32_t
iColData
=
0
;
iColData
<
taosArrayGetSize
(
pBlockData
->
aIdx
);
iColData
++
)
{
SColData
*
pColData
=
tBlockDataGetColDataByIdx
(
pBlockData
,
iColData
);
// uid + version + tskey
code
=
tsdbReadAndCheck
(
pFD
,
pBlkInfo
->
offset
,
&
pReader
->
aBuf
[
0
],
pBlkInfo
->
szKey
,
1
);
if
(
code
)
goto
_err
;
SDiskDataHdr
hdr
;
uint8_t
*
p
=
pReader
->
aBuf
[
0
]
+
tGetDiskDataHdr
(
pReader
->
aBuf
[
0
],
&
hdr
);
if
((
!
pColData
->
smaOn
)
||
IS_VAR_DATA_TYPE
(
pColData
->
type
))
continue
;
ASSERT
(
hdr
.
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
pBlockData
->
suid
==
hdr
.
suid
);
ASSERT
(
pBlockData
->
uid
==
hdr
.
uid
);
SColumnDataAgg
sma
;
tsdbCalcColDataSMA
(
pColData
,
&
sma
);
pBlockData
->
nRow
=
hdr
.
nRow
;
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
],
pSmaInfo
->
size
+
tPutColumnDataAgg
(
NULL
,
&
sma
));
if
(
code
)
goto
_err
;
pSmaInfo
->
size
+=
tPutColumnDataAgg
(
pWriter
->
aBuf
[
0
]
+
pSmaInfo
->
size
,
&
sma
);
}
// uid
if
(
hdr
.
uid
==
0
)
{
ASSERT
(
hdr
.
szUid
);
code
=
tsdbDecmprData
(
p
,
hdr
.
szUid
,
TSDB_DATA_TYPE_BIGINT
,
hdr
.
cmprAlg
,
(
uint8_t
**
)
&
pBlockData
->
aUid
,
sizeof
(
int64_t
)
*
hdr
.
nRow
,
&
pReader
->
aBuf
[
1
]);
if
(
code
)
goto
_err
;
}
else
{
ASSERT
(
!
hdr
.
szUid
);
}
p
+=
hdr
.
szUid
;
// version
code
=
tsdbDecmprData
(
p
,
hdr
.
szVer
,
TSDB_DATA_TYPE_BIGINT
,
hdr
.
cmprAlg
,
(
uint8_t
**
)
&
pBlockData
->
aVersion
,
sizeof
(
int64_t
)
*
hdr
.
nRow
,
&
pReader
->
aBuf
[
1
]);
if
(
code
)
goto
_err
;
p
+=
hdr
.
szVer
;
// TSKEY
code
=
tsdbDecmprData
(
p
,
hdr
.
szKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
hdr
.
cmprAlg
,
(
uint8_t
**
)
&
pBlockData
->
aTSKEY
,
sizeof
(
TSKEY
)
*
hdr
.
nRow
,
&
pReader
->
aBuf
[
1
]);
if
(
code
)
goto
_err
;
p
+=
hdr
.
szKey
;
ASSERT
(
p
-
pReader
->
aBuf
[
0
]
==
pBlkInfo
->
szKey
-
sizeof
(
TSCKSUM
));
// read and decode columns
if
(
taosArrayGetSize
(
pBlockData
->
aIdx
)
==
0
)
goto
_exit
;
// write
if
(
pSmaInfo
->
size
)
{
int32_t
size
=
pSmaInfo
->
size
+
sizeof
(
TSCKSUM
);
if
(
hdr
.
szBlkCol
>
0
)
{
int64_t
offset
=
pBlkInfo
->
offset
+
pBlkInfo
->
szKey
;
code
=
tsdbReadAndCheck
(
pFD
,
offset
,
&
pReader
->
aBuf
[
0
],
hdr
.
szBlkCol
+
sizeof
(
TSCKSUM
),
1
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
}
SBlockCol
blockCol
=
{.
cid
=
0
};
SBlockCol
*
pBlockCol
=
&
blockCol
;
int32_t
n
=
0
;
for
(
int32_t
iColData
=
0
;
iColData
<
taosArrayGetSize
(
pBlockData
->
aIdx
);
iColData
++
)
{
SColData
*
pColData
=
tBlockDataGetColDataByIdx
(
pBlockData
,
iColData
);
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
],
size
);
while
(
pBlockCol
&&
pBlockCol
->
cid
<
pColData
->
cid
)
{
if
(
n
<
hdr
.
szBlkCol
)
{
n
+=
tGetBlockCol
(
pReader
->
aBuf
[
0
]
+
n
,
pBlockCol
);
}
else
{
ASSERT
(
n
==
hdr
.
szBlkCol
);
pBlockCol
=
NULL
;
}
int64_t
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
pWriter
->
aBuf
[
0
],
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
pBlockCol
==
NULL
||
pBlockCol
->
cid
>
pColData
->
cid
)
{
// add a lot of NONE
for
(
int32_t
iRow
=
0
;
iRow
<
hdr
.
nRow
;
iRow
++
)
{
code
=
tColDataAppendValue
(
pColData
,
&
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
));
if
(
code
)
goto
_err
;
}
}
else
{
ASSERT
(
pBlockCol
->
type
==
pColData
->
type
);
ASSERT
(
pBlockCol
->
flag
&&
pBlockCol
->
flag
!=
HAS_NONE
);
if
(
pBlockCol
->
flag
==
HAS_NULL
)
{
// add a lot of NULL
for
(
int32_t
iRow
=
0
;
iRow
<
hdr
.
nRow
;
iRow
++
)
{
code
=
tColDataAppendValue
(
pColData
,
&
COL_VAL_NULL
(
pBlockCol
->
cid
,
pBlockCol
->
type
));
if
(
code
)
goto
_err
;
}
}
else
{
// decode from binary
int64_t
offset
=
pBlkInfo
->
offset
+
pBlkInfo
->
szKey
+
hdr
.
szBlkCol
+
sizeof
(
TSCKSUM
)
+
pBlockCol
->
offset
;
int32_t
size
=
pBlockCol
->
szBitmap
+
pBlockCol
->
szOffset
+
pBlockCol
->
szValue
+
sizeof
(
TSCKSUM
);
code
=
tsdbReadAndCheck
(
pFD
,
offset
,
&
pReader
->
aBuf
[
1
],
size
,
0
);
if
(
code
)
goto
_err
;
code
=
tsdbDecmprColData
(
pReader
->
aBuf
[
1
],
pBlockCol
,
hdr
.
cmprAlg
,
hdr
.
nRow
,
pColData
,
&
pReader
->
aBuf
[
2
]);
if
(
code
)
goto
_err
;
}
}
pSmaInfo
->
offset
=
pWriter
->
fSma
.
size
;
pWriter
->
fSma
.
size
+=
size
;
}
_exit:
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb
read block data impl failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d tsdb
write block sma failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbReadDataBlock
(
SDataFReader
*
pReader
,
SDataBlk
*
pDataBlk
,
SBlockData
*
pBlockData
)
{
int32_t
tsdbWriteBlockData
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
SBlockInfo
*
pBlkInfo
,
SSmaInfo
*
pSmaInfo
,
int8_t
cmprAlg
,
int8_t
toLast
)
{
int32_t
code
=
0
;
code
=
tsdbReadBlockDataImpl
(
pReader
,
&
pDataBlk
->
aSubBlock
[
0
],
0
,
pBlockData
);
if
(
code
)
goto
_err
;
if
(
pDataBlk
->
nSubBlock
>
1
)
{
SBlockData
bData1
;
SBlockData
bData2
;
ASSERT
(
pBlockData
->
nRow
>
0
);
// create
code
=
tBlockDataCreate
(
&
bData1
);
if
(
code
)
goto
_err
;
code
=
tBlockDataCreate
(
&
bData2
);
if
(
code
)
goto
_err
;
pBlkInfo
->
offset
=
toLast
?
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
].
size
:
pWriter
->
fData
.
size
;
pBlkInfo
->
szBlock
=
0
;
pBlkInfo
->
szKey
=
0
;
// init
tBlockDataInitEx
(
&
bData1
,
pBlockData
);
tBlockDataInitEx
(
&
bData2
,
pBlockData
)
;
int32_t
aBufN
[
4
]
=
{
0
};
code
=
tCmprBlockData
(
pBlockData
,
cmprAlg
,
NULL
,
NULL
,
pWriter
->
aBuf
,
aBufN
);
if
(
code
)
goto
_err
;
for
(
int32_t
iSubBlock
=
1
;
iSubBlock
<
pDataBlk
->
nSubBlock
;
iSubBlock
++
)
{
code
=
tsdbReadBlockDataImpl
(
pReader
,
&
pDataBlk
->
aSubBlock
[
iSubBlock
],
0
,
&
bData1
);
if
(
code
)
{
tBlockDataDestroy
(
&
bData1
,
1
);
tBlockDataDestroy
(
&
bData2
,
1
);
goto
_err
;
}
// write =================
TdFilePtr
pFD
=
toLast
?
pWriter
->
pLastFD
:
pWriter
->
pDataFD
;
code
=
tBlockDataCopy
(
pBlockData
,
&
bData2
);
if
(
code
)
{
tBlockDataDestroy
(
&
bData1
,
1
);
tBlockDataDestroy
(
&
bData2
,
1
);
goto
_err
;
}
pBlkInfo
->
szKey
=
aBufN
[
3
]
+
aBufN
[
2
];
pBlkInfo
->
szBlock
=
aBufN
[
0
]
+
aBufN
[
1
]
+
aBufN
[
2
]
+
aBufN
[
3
];
code
=
tBlockDataMerge
(
&
bData1
,
&
bData2
,
pBlockData
);
if
(
code
)
{
tBlockDataDestroy
(
&
bData1
,
1
);
tBlockDataDestroy
(
&
bData2
,
1
);
goto
_err
;
}
}
int64_t
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
3
],
aBufN
[
3
]);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
tBlockDataDestroy
(
&
bData1
,
1
);
tBlockDataDestroy
(
&
bData2
,
1
);
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
2
],
aBufN
[
2
]);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
return
code
;
if
(
aBufN
[
1
])
{
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
1
],
aBufN
[
1
]);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
}
_err:
tsdbError
(
"vgId:%d tsdb read data block failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
if
(
aBufN
[
0
])
{
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
0
],
aBufN
[
0
]);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
}
int32_t
tsdbReadSstBlock
(
SDataFReader
*
pReader
,
int32_t
iSst
,
SSstBlk
*
pSstBlk
,
SBlockData
*
pBlockData
)
{
int32_t
code
=
0
;
// update info
if
(
toLast
)
{
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
].
size
+=
pBlkInfo
->
szBlock
;
}
else
{
pWriter
->
fData
.
size
+=
pBlkInfo
->
szBlock
;
}
code
=
tsdbReadBlockDataImpl
(
pReader
,
&
pSstBlk
->
bInfo
,
1
,
pBlockData
);
if
(
code
)
goto
_err
;
// ================= SMA ====================
if
(
pSmaInfo
)
{
code
=
tsdbWriteBlockSma
(
pWriter
,
pBlockData
,
pSmaInfo
);
if
(
code
)
goto
_err
;
}
_exit:
tsdbTrace
(
"vgId:%d tsdb write block data, suid:%"
PRId64
" uid:%"
PRId64
" nRow:%d, offset:%"
PRId64
" size:%d"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pBlockData
->
suid
,
pBlockData
->
uid
,
pBlockData
->
nRow
,
pBlkInfo
->
offset
,
pBlkInfo
->
szBlock
);
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb
read last block failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d tsdb
write block data failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbReadSstBlockEx
(
SDataFReader
*
pReader
,
int32_t
iSst
,
SSstBlk
*
pSstBlk
,
SBlockData
*
pBlockData
)
{
int32_t
code
=
0
;
// read
code
=
tsdbReadAndCheck
(
pReader
->
aLastFD
[
iSst
],
pSstBlk
->
bInfo
.
offset
,
&
pReader
->
aBuf
[
0
],
pSstBlk
->
bInfo
.
szBlock
,
0
);
if
(
code
)
goto
_exit
;
// decmpr
code
=
tDecmprBlockData
(
pReader
->
aBuf
[
0
],
pSstBlk
->
bInfo
.
szBlock
,
pBlockData
,
&
pReader
->
aBuf
[
1
]);
if
(
code
)
goto
_exit
;
_exit:
return
code
;
}
int32_t
tsdbDFileSetCopy
(
STsdb
*
pTsdb
,
SDFileSet
*
pSetFrom
,
SDFileSet
*
pSetTo
)
{
int32_t
code
=
0
;
int64_t
n
;
int64_t
size
;
TdFilePtr
pOutFD
=
NULL
;
// TODO
TdFilePtr
PInFD
=
NULL
;
// TODO
char
fNameFrom
[
TSDB_FILENAME_LEN
];
char
fNameTo
[
TSDB_FILENAME_LEN
];
// SDataFWriter ====================================================
int32_t
tsdbDataFWriterOpen
(
SDataFWriter
**
ppWriter
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
)
{
int32_t
code
=
0
;
int32_t
flag
;
int64_t
n
;
SDataFWriter
*
pWriter
=
NULL
;
char
fname
[
TSDB_FILENAME_LEN
];
char
hdr
[
TSDB_FHDR_SIZE
]
=
{
0
};
// head
tsdbHeadFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pHeadF
,
fNameFrom
);
tsdbHeadFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pHeadF
,
fNameTo
);
// alloc
pWriter
=
taosMemoryCalloc
(
1
,
sizeof
(
*
pWriter
));
if
(
pWriter
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
pWriter
->
pTsdb
=
pTsdb
;
pWriter
->
wSet
=
(
SDFileSet
){
.
diskId
=
pSet
->
diskId
,
.
fid
=
pSet
->
fid
,
.
pHeadF
=
&
pWriter
->
fHead
,
.
pDataF
=
&
pWriter
->
fData
,
.
pSmaF
=
&
pWriter
->
fSma
,
.
nSstF
=
pSet
->
nSstF
//
};
pWriter
->
fHead
=
*
pSet
->
pHeadF
;
pWriter
->
fData
=
*
pSet
->
pDataF
;
pWriter
->
fSma
=
*
pSet
->
pSmaF
;
for
(
int8_t
iSst
=
0
;
iSst
<
pSet
->
nSstF
;
iSst
++
)
{
pWriter
->
wSet
.
aSstF
[
iSst
]
=
&
pWriter
->
fSst
[
iSst
];
pWriter
->
fSst
[
iSst
]
=
*
pSet
->
aSstF
[
iSst
];
}
// head
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
tsdbHeadFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fHead
,
fname
);
pWriter
->
pHeadFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pHeadFD
==
NULL
)
{
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taos
WriteFile
(
pWriter
->
pHeadFD
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taos
FSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pHeadF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
ASSERT
(
n
==
TSDB_FHDR_SIZE
);
pWriter
->
fHead
.
size
+=
TSDB_FHDR_SIZE
;
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
// data
if
(
pWriter
->
fData
.
size
==
0
)
{
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
}
else
{
flag
=
TD_FILE_WRITE
;
}
tsdbDataFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fData
,
fname
);
pWriter
->
pDataFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pDataFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
pWriter
->
fData
.
size
==
0
)
{
n
=
taosWriteFile
(
pWriter
->
pDataFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
pWriter
->
fData
.
size
+=
TSDB_FHDR_SIZE
;
}
else
{
n
=
taosLSeekFile
(
pWriter
->
pDataFD
,
0
,
SEEK_END
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
ASSERT
(
n
==
pWriter
->
fData
.
size
);
}
tsdbDataFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pDataF
,
fNameFrom
);
tsdbDataFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pDataF
,
fNameTo
);
// sma
if
(
pWriter
->
fSma
.
size
==
0
)
{
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
}
else
{
flag
=
TD_FILE_WRITE
;
}
tsdbSmaFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fSma
,
fname
);
pWriter
->
pSmaFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pSmaFD
==
NULL
)
{
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
pWriter
->
fSma
.
size
==
0
)
{
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
pWriter
->
fSma
.
size
+=
TSDB_FHDR_SIZE
;
}
else
{
n
=
taosLSeekFile
(
pWriter
->
pSmaFD
,
0
,
SEEK_END
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
ASSERT
(
n
==
pWriter
->
fSma
.
size
);
}
// sst
ASSERT
(
pWriter
->
fSst
[
pSet
->
nSstF
-
1
].
size
==
0
);
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
tsdbSstFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fSst
[
pSet
->
nSstF
-
1
],
fname
);
pWriter
->
pLastFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pLastFD
==
NULL
)
{
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pDataF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
].
size
+=
TSDB_FHDR_SIZE
;
*
ppWriter
=
pWriter
;
return
code
;
_err:
tsdbError
(
"vgId:%d, tsdb data file writer open failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
*
ppWriter
=
NULL
;
return
code
;
}
int32_t
tsdbDataFWriterClose
(
SDataFWriter
**
ppWriter
,
int8_t
sync
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
NULL
;
if
(
*
ppWriter
==
NULL
)
goto
_exit
;
pTsdb
=
(
*
ppWriter
)
->
pTsdb
;
if
(
sync
)
{
if
(
taosFsyncFile
((
*
ppWriter
)
->
pHeadFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
taosFsyncFile
((
*
ppWriter
)
->
pDataFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
if
(
taosFsyncFile
((
*
ppWriter
)
->
pSmaFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// sst
tsdbSstFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
aSstF
[
0
],
fNameFrom
);
tsdbSstFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
aSstF
[
0
],
fNameTo
);
if
(
taosFsyncFile
((
*
ppWriter
)
->
pLastFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
)
;
goto
_err
;
}
if
(
taosCloseFile
(
&
(
*
ppWriter
)
->
pHeadFD
)
<
0
)
{
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
taosCloseFile
(
&
(
*
ppWriter
)
->
pDataFD
)
<
0
)
{
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
aSstF
[
0
]
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
if
(
taosCloseFile
(
&
(
*
ppWriter
)
->
pSmaFD
)
<
0
)
{
// sma
tsdbSmaFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pSmaF
,
fNameFrom
);
tsdbSmaFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pSmaF
,
fNameTo
);
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
taosCloseFile
(
&
(
*
ppWriter
)
->
pLastFD
)
<
0
)
{
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
((
*
ppWriter
)
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
((
*
ppWriter
)
->
aBuf
[
iBuf
]);
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pSmaF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taos
MemoryFree
(
*
ppWriter
);
_exit:
*
ppWriter
=
NULL
;
taos
CloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
return
code
;
_err:
tsdbError
(
"vgId:%d,
data file writer close
failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d,
tsdb DFileSet copy
failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbUpdateDFileSetHeader
(
SDataFWriter
*
pWriter
)
{
int32_t
code
=
0
;
int64_t
n
;
char
hdr
[
TSDB_FHDR_SIZE
];
// head ==============
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutHeadFile
(
hdr
,
&
pWriter
->
fHead
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
// SDataFReader ====================================================
int32_t
tsdbDataFReaderOpen
(
SDataFReader
**
ppReader
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
)
{
int32_t
code
=
0
;
SDataFReader
*
pReader
;
char
fname
[
TSDB_FILENAME_LEN
];
n
=
taosLSeekFile
(
pWriter
->
pHeadFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
// alloc
pReader
=
(
SDataFReader
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pReader
));
if
(
pReader
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pReader
->
pTsdb
=
pTsdb
;
pReader
->
pSet
=
pSet
;
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
// open impl
// head
tsdbHeadFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pHeadF
,
fname
);
pReader
->
pHeadFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pHeadFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// data ==============
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutDataFile
(
hdr
,
&
pWriter
->
fData
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pWriter
->
pDataFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
// data
tsdbDataFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pDataF
,
fname
);
pReader
->
pDataFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pDataFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pDataFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
// sma
tsdbSmaFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pSmaF
,
fname
);
pReader
->
pSmaFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pSmaFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// sma ==============
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutSmaFile
(
hdr
,
&
pWriter
->
fSma
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
// sst
for
(
int32_t
iSst
=
0
;
iSst
<
pSet
->
nSstF
;
iSst
++
)
{
tsdbSstFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
aSstF
[
iSst
],
fname
);
pReader
->
aLastFD
[
iSst
]
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
aLastFD
[
iSst
]
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
}
n
=
taosLSeekFile
(
pWriter
->
pSmaFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
*
ppReader
=
pReader
;
return
code
;
_err:
tsdbError
(
"vgId:%d, tsdb data file reader open failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
*
ppReader
=
NULL
;
return
code
;
}
int32_t
tsdbDataFReaderClose
(
SDataFReader
**
ppReader
)
{
int32_t
code
=
0
;
if
(
*
ppReader
==
NULL
)
goto
_exit
;
// head
if
(
taosCloseFile
(
&
(
*
ppReader
)
->
pHeadFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
// data
if
(
taosCloseFile
(
&
(
*
ppReader
)
->
pDataFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// sst ==============
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutSstFile
(
hdr
,
&
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
]);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pWriter
->
pLastFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
// sma
if
(
taosCloseFile
(
&
(
*
ppReader
)
->
pSmaFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
// sst
for
(
int32_t
iSst
=
0
;
iSst
<
(
*
ppReader
)
->
pSet
->
nSstF
;
iSst
++
)
{
if
(
taosCloseFile
(
&
(
*
ppReader
)
->
aLastFD
[
iSst
])
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
}
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
((
*
ppReader
)
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
((
*
ppReader
)
->
aBuf
[
iBuf
]);
}
taosMemoryFree
(
*
ppReader
);
_exit:
*
ppReader
=
NULL
;
return
code
;
_err:
tsdbError
(
"vgId:%d,
update DFileSet header failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d,
data file reader close failed since %s"
,
TD_VID
((
*
ppReader
)
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbWriteBlockIdx
(
SDataFWriter
*
pWriter
,
SArray
*
aBlockIdx
)
{
int32_t
code
=
0
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
fHead
;
int64_t
size
=
0
;
int64_t
n
;
int32_t
tsdbReadBlockIdx
(
SDataFReader
*
pReader
,
SArray
*
aBlockIdx
)
{
int32_t
code
=
0
;
int64_t
offset
=
pReader
->
pSet
->
pHeadF
->
offset
;
int64_t
size
=
pReader
->
pSet
->
pHeadF
->
size
-
offset
;
int64_t
n
;
uint32_t
delimiter
;
// check
if
(
taosArrayGetSize
(
aBlockIdx
)
==
0
)
{
pHeadFile
->
offset
=
pHeadFile
->
size
;
taosArrayClear
(
aBlockIdx
);
if
(
size
==
0
)
{
goto
_exit
;
}
// prepare
size
=
sizeof
(
uint32_t
);
for
(
int32_t
iBlockIdx
=
0
;
iBlockIdx
<
taosArrayGetSize
(
aBlockIdx
);
iBlockIdx
++
)
{
size
+=
tPutBlockIdx
(
NULL
,
taosArrayGet
(
aBlockIdx
,
iBlockIdx
));
}
size
+=
sizeof
(
TSCKSUM
);
// alloc
code
=
tRealloc
(
&
p
Writ
er
->
aBuf
[
0
],
size
);
code
=
tRealloc
(
&
p
Read
er
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
// build
n
=
0
;
n
=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
for
(
int32_t
iBlockIdx
=
0
;
iBlockIdx
<
taosArrayGetSize
(
aBlockIdx
);
iBlockIdx
++
)
{
n
+=
tPutBlockIdx
(
pWriter
->
aBuf
[
0
]
+
n
,
taosArrayGet
(
aBlockIdx
,
iBlockIdx
));
// seek
if
(
taosLSeekFile
(
pReader
->
pHeadFD
,
offset
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
],
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
//
write
n
=
taos
WriteFile
(
pWriter
->
pHeadFD
,
pWrit
er
->
aBuf
[
0
],
size
);
//
read
n
=
taos
ReadFile
(
pReader
->
pHeadFD
,
pRead
er
->
aBuf
[
0
],
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
],
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// decode
n
=
0
;
n
=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
SBlockIdx
blockIdx
;
n
+=
tGetBlockIdx
(
pReader
->
aBuf
[
0
]
+
n
,
&
blockIdx
);
if
(
taosArrayPush
(
aBlockIdx
,
&
blockIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
// update
pHeadFile
->
offset
=
pHeadFile
->
size
;
pHeadFile
->
size
+=
size
;
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
_exit:
tsdbTrace
(
"vgId:%d write block idx, offset:%"
PRId64
" size:%"
PRId64
" nBlockIdx:%d"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pHeadFile
->
offset
,
size
,
taosArrayGetSize
(
aBlockIdx
));
return
code
;
_err:
tsdbError
(
"vgId:%d,
write block idx failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d,
read block idx failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbWriteBlock
(
SDataFWriter
*
pWriter
,
SMapData
*
mBlock
,
SBlockIdx
*
pBlockIdx
)
{
int32_t
code
=
0
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
fHead
;
int64_t
size
;
int64_t
n
;
int32_t
tsdbReadSstBlk
(
SDataFReader
*
pReader
,
int32_t
iSst
,
SArray
*
aSstBlk
)
{
int32_t
code
=
0
;
int64_t
offset
=
pReader
->
pSet
->
aSstF
[
iSst
]
->
offset
;
int64_t
size
=
pReader
->
pSet
->
aSstF
[
iSst
]
->
size
-
offset
;
int64_t
n
;
uint32_t
delimiter
;
ASSERT
(
mBlock
->
nItem
>
0
);
taosArrayClear
(
aSstBlk
);
if
(
size
==
0
)
{
goto
_exit
;
}
// alloc
size
=
sizeof
(
uint32_t
)
+
tPutMapData
(
NULL
,
mBlock
)
+
sizeof
(
TSCKSUM
);
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
],
size
);
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
// build
n
=
0
;
n
+=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
n
+=
tPutMapData
(
pWriter
->
aBuf
[
0
]
+
n
,
mBlock
);
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
],
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
// seek
if
(
taosLSeekFile
(
pReader
->
aLastFD
[
iSst
],
offset
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
//
write
n
=
taos
WriteFile
(
pWriter
->
pHeadFD
,
pWrit
er
->
aBuf
[
0
],
size
);
//
read
n
=
taos
ReadFile
(
pReader
->
aLastFD
[
iSst
],
pRead
er
->
aBuf
[
0
],
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// update
pBlockIdx
->
offset
=
pHeadFile
->
size
;
pBlockIdx
->
size
=
size
;
pHeadFile
->
size
+=
size
;
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
],
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
tsdbTrace
(
"vgId:%d, write block, file ID:%d commit ID:%d suid:%"
PRId64
" uid:%"
PRId64
" offset:%"
PRId64
" size:%"
PRId64
" nItem:%d"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pWriter
->
wSet
.
fid
,
pHeadFile
->
commitID
,
pBlockIdx
->
suid
,
pBlockIdx
->
uid
,
pBlockIdx
->
offset
,
pBlockIdx
->
size
,
mBlock
->
nItem
);
// decode
n
=
0
;
n
=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
);
while
(
n
<
size
-
sizeof
(
TSCKSUM
))
{
SSstBlk
blockl
;
n
+=
tGetSstBlk
(
pReader
->
aBuf
[
0
]
+
n
,
&
blockl
);
if
(
taosArrayPush
(
aSstBlk
,
&
blockl
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
_exit:
return
code
;
_err:
tsdbError
(
"vgId:%d
, write block failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d
read blockl failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbWriteSstBlk
(
SDataFWriter
*
pWriter
,
SArray
*
aSstBlk
)
{
int32_t
code
=
0
;
SSstFile
*
pSstFile
=
&
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
];
int64_t
size
;
int64_t
n
;
// check
if
(
taosArrayGetSize
(
aSstBlk
)
==
0
)
{
pSstFile
->
offset
=
pSstFile
->
size
;
goto
_exit
;
}
// size
size
=
sizeof
(
uint32_t
);
// TSDB_FILE_DLMT
for
(
int32_t
iBlockL
=
0
;
iBlockL
<
taosArrayGetSize
(
aSstBlk
);
iBlockL
++
)
{
size
+=
tPutSstBlk
(
NULL
,
taosArrayGet
(
aSstBlk
,
iBlockL
));
}
size
+=
sizeof
(
TSCKSUM
);
int32_t
tsdbReadBlock
(
SDataFReader
*
pReader
,
SBlockIdx
*
pBlockIdx
,
SMapData
*
mBlock
)
{
int32_t
code
=
0
;
int64_t
offset
=
pBlockIdx
->
offset
;
int64_t
size
=
pBlockIdx
->
size
;
int64_t
n
;
int64_t
tn
;
// alloc
code
=
tRealloc
(
&
p
Writ
er
->
aBuf
[
0
],
size
);
code
=
tRealloc
(
&
p
Read
er
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
// encode
n
=
0
;
n
+=
tPutU32
(
pWriter
->
aBuf
[
0
]
+
n
,
TSDB_FILE_DLMT
);
for
(
int32_t
iBlockL
=
0
;
iBlockL
<
taosArrayGetSize
(
aSstBlk
);
iBlockL
++
)
{
n
+=
tPutSstBlk
(
pWriter
->
aBuf
[
0
]
+
n
,
taosArrayGet
(
aSstBlk
,
iBlockL
));
// seek
if
(
taosLSeekFile
(
pReader
->
pHeadFD
,
offset
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
],
size
);
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
//
write
n
=
taos
WriteFile
(
pWriter
->
pLastFD
,
pWrit
er
->
aBuf
[
0
],
size
);
//
read
n
=
taos
ReadFile
(
pReader
->
pHeadFD
,
pRead
er
->
aBuf
[
0
],
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// update
pSstFile
->
offset
=
pSstFile
->
size
;
pSstFile
->
size
+=
size
;
_exit:
tsdbTrace
(
"vgId:%d tsdb write blockl, loffset:%"
PRId64
" size:%"
PRId64
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pSstFile
->
offset
,
size
);
return
code
;
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
],
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
_err:
tsdbError
(
"vgId:%d tsdb write blockl failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
// decode
n
=
0
;
static
void
tsdbUpdateBlockInfo
(
SBlockData
*
pBlockData
,
SDataBlk
*
pDataBlk
)
{
for
(
int32_t
iRow
=
0
;
iRow
<
pBlockData
->
nRow
;
iRow
++
)
{
TSDBKEY
key
=
{.
ts
=
pBlockData
->
aTSKEY
[
iRow
],
.
version
=
pBlockData
->
aVersion
[
iRow
]}
;
uint32_t
delimiter
;
n
+=
tGetU32
(
pReader
->
aBuf
[
0
]
+
n
,
&
delimiter
);
ASSERT
(
delimiter
==
TSDB_FILE_DLMT
)
;
if
(
iRow
==
0
)
{
if
(
tsdbKeyCmprFn
(
&
pDataBlk
->
minKey
,
&
key
)
>
0
)
{
pDataBlk
->
minKey
=
key
;
}
}
else
{
if
(
pBlockData
->
aTSKEY
[
iRow
]
==
pBlockData
->
aTSKEY
[
iRow
-
1
])
{
pDataBlk
->
hasDup
=
1
;
}
}
tn
=
tGetMapData
(
pReader
->
aBuf
[
0
]
+
n
,
mBlock
);
if
(
tn
<
0
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
n
+=
tn
;
ASSERT
(
n
+
sizeof
(
TSCKSUM
)
==
size
);
if
(
iRow
==
pBlockData
->
nRow
-
1
&&
tsdbKeyCmprFn
(
&
pDataBlk
->
maxKey
,
&
key
)
<
0
)
{
pDataBlk
->
maxKey
=
key
;
}
return
code
;
pDataBlk
->
minVer
=
TMIN
(
pDataBlk
->
minVer
,
key
.
version
);
pDataBlk
->
maxVer
=
TMAX
(
pDataBlk
->
maxVer
,
key
.
version
);
}
pDataBlk
->
nRow
+=
pBlockData
->
nRow
;
_err:
tsdbError
(
"vgId:%d, read block failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbWriteBlockSma
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
SSmaInfo
*
pSmaInfo
)
{
int32_t
code
=
0
;
pSmaInfo
->
offset
=
0
;
pSmaInfo
->
size
=
0
;
int32_t
tsdbReadBlockSma
(
SDataFReader
*
pReader
,
SDataBlk
*
pDataBlk
,
SArray
*
aColumnDataAgg
)
{
int32_t
code
=
0
;
SSmaInfo
*
pSmaInfo
=
&
pDataBlk
->
smaInfo
;
// encode
for
(
int32_t
iColData
=
0
;
iColData
<
taosArrayGetSize
(
pBlockData
->
aIdx
);
iColData
++
)
{
SColData
*
pColData
=
tBlockDataGetColDataByIdx
(
pBlockData
,
iColData
);
ASSERT
(
pSmaInfo
->
size
>
0
);
if
((
!
pColData
->
smaOn
)
||
IS_VAR_DATA_TYPE
(
pColData
->
type
))
continue
;
taosArrayClear
(
aColumnDataAgg
)
;
SColumnDataAgg
sma
;
tsdbCalcColDataSMA
(
pColData
,
&
sma
);
// alloc
int32_t
size
=
pSmaInfo
->
size
+
sizeof
(
TSCKSUM
);
code
=
tRealloc
(
&
pReader
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
],
pSmaInfo
->
size
+
tPutColumnDataAgg
(
NULL
,
&
sma
));
if
(
code
)
goto
_err
;
pSmaInfo
->
size
+=
tPutColumnDataAgg
(
pWriter
->
aBuf
[
0
]
+
pSmaInfo
->
size
,
&
sma
);
// seek
int64_t
n
=
taosLSeekFile
(
pReader
->
pSmaFD
,
pSmaInfo
->
offset
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
pSmaInfo
->
offset
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// write
if
(
pSmaInfo
->
size
)
{
int32_t
size
=
pSmaInfo
->
size
+
sizeof
(
TSCKSUM
);
// read
n
=
taosReadFile
(
pReader
->
pSmaFD
,
pReader
->
aBuf
[
0
],
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
size
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
code
=
tRealloc
(
&
pWriter
->
aBuf
[
0
],
size
);
if
(
code
)
goto
_err
;
// check
if
(
!
taosCheckChecksumWhole
(
pReader
->
aBuf
[
0
],
size
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
taosCalcChecksumAppend
(
0
,
pWriter
->
aBuf
[
0
],
size
);
// decode
n
=
0
;
while
(
n
<
pSmaInfo
->
size
)
{
SColumnDataAgg
sma
;
int64_t
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
pWriter
->
aBuf
[
0
],
size
);
if
(
n
<
0
)
{
code
=
T
AOS_SYSTEM_ERROR
(
errno
)
;
n
+=
tGetColumnDataAgg
(
pReader
->
aBuf
[
0
]
+
n
,
&
sma
);
if
(
taosArrayPush
(
aColumnDataAgg
,
&
sma
)
==
NULL
)
{
code
=
T
SDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pSmaInfo
->
offset
=
pWriter
->
fSma
.
size
;
pWriter
->
fSma
.
size
+=
size
;
}
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb
write block sma failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d tsdb
read block sma failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbWriteBlockData
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
SBlockInfo
*
pBlkInfo
,
SSmaInfo
*
pSmaInfo
,
int8_t
cmprAlg
,
int8_t
toLast
)
{
static
int32_t
tsdbReadBlockDataImpl
(
SDataFReader
*
pReader
,
SBlockInfo
*
pBlkInfo
,
int8_t
fromLast
,
SBlockData
*
pBlockData
)
{
int32_t
code
=
0
;
ASSERT
(
pBlockData
->
nRow
>
0
);
tBlockDataClear
(
pBlockData
);
TdFilePtr
pFD
=
fromLast
?
pReader
->
aLastFD
[
0
]
:
pReader
->
pDataFD
;
// (todo)
// uid + version + tskey
code
=
tsdbReadAndCheck
(
pFD
,
pBlkInfo
->
offset
,
&
pReader
->
aBuf
[
0
],
pBlkInfo
->
szKey
,
1
);
if
(
code
)
goto
_err
;
SDiskDataHdr
hdr
;
uint8_t
*
p
=
pReader
->
aBuf
[
0
]
+
tGetDiskDataHdr
(
pReader
->
aBuf
[
0
],
&
hdr
);
ASSERT
(
hdr
.
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
pBlockData
->
suid
==
hdr
.
suid
);
ASSERT
(
pBlockData
->
uid
==
hdr
.
uid
);
pBlockData
->
nRow
=
hdr
.
nRow
;
// uid
if
(
hdr
.
uid
==
0
)
{
ASSERT
(
hdr
.
szUid
);
code
=
tsdbDecmprData
(
p
,
hdr
.
szUid
,
TSDB_DATA_TYPE_BIGINT
,
hdr
.
cmprAlg
,
(
uint8_t
**
)
&
pBlockData
->
aUid
,
sizeof
(
int64_t
)
*
hdr
.
nRow
,
&
pReader
->
aBuf
[
1
]);
if
(
code
)
goto
_err
;
}
else
{
ASSERT
(
!
hdr
.
szUid
);
}
p
+=
hdr
.
szUid
;
pBlkInfo
->
offset
=
toLast
?
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
].
size
:
pWriter
->
fData
.
size
;
pBlkInfo
->
szBlock
=
0
;
pBlkInfo
->
szKey
=
0
;
// version
code
=
tsdbDecmprData
(
p
,
hdr
.
szVer
,
TSDB_DATA_TYPE_BIGINT
,
hdr
.
cmprAlg
,
(
uint8_t
**
)
&
pBlockData
->
aVersion
,
sizeof
(
int64_t
)
*
hdr
.
nRow
,
&
pReader
->
aBuf
[
1
]);
if
(
code
)
goto
_err
;
p
+=
hdr
.
szVer
;
int32_t
aBufN
[
4
]
=
{
0
};
code
=
tCmprBlockData
(
pBlockData
,
cmprAlg
,
NULL
,
NULL
,
pWriter
->
aBuf
,
aBufN
);
// TSKEY
code
=
tsdbDecmprData
(
p
,
hdr
.
szKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
hdr
.
cmprAlg
,
(
uint8_t
**
)
&
pBlockData
->
aTSKEY
,
sizeof
(
TSKEY
)
*
hdr
.
nRow
,
&
pReader
->
aBuf
[
1
]);
if
(
code
)
goto
_err
;
p
+=
hdr
.
szKey
;
// write =================
TdFilePtr
pFD
=
toLast
?
pWriter
->
pLastFD
:
pWriter
->
pDataFD
;
ASSERT
(
p
-
pReader
->
aBuf
[
0
]
==
pBlkInfo
->
szKey
-
sizeof
(
TSCKSUM
));
pBlkInfo
->
szKey
=
aBufN
[
3
]
+
aBufN
[
2
];
pBlkInfo
->
szBlock
=
aBufN
[
0
]
+
aBufN
[
1
]
+
aBufN
[
2
]
+
aBufN
[
3
]
;
// read and decode columns
if
(
taosArrayGetSize
(
pBlockData
->
aIdx
)
==
0
)
goto
_exit
;
i
nt64_t
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
3
],
aBufN
[
3
]);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
i
f
(
hdr
.
szBlkCol
>
0
)
{
int64_t
offset
=
pBlkInfo
->
offset
+
pBlkInfo
->
szKey
;
code
=
tsdbReadAndCheck
(
pFD
,
offset
,
&
pReader
->
aBuf
[
0
],
hdr
.
szBlkCol
+
sizeof
(
TSCKSUM
),
1
);
if
(
code
)
goto
_err
;
}
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
2
],
aBufN
[
2
]);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
SBlockCol
blockCol
=
{.
cid
=
0
};
SBlockCol
*
pBlockCol
=
&
blockCol
;
int32_t
n
=
0
;
if
(
aBufN
[
1
])
{
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
1
],
aBufN
[
1
]);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
}
for
(
int32_t
iColData
=
0
;
iColData
<
taosArrayGetSize
(
pBlockData
->
aIdx
);
iColData
++
)
{
SColData
*
pColData
=
tBlockDataGetColDataByIdx
(
pBlockData
,
iColData
);
if
(
aBufN
[
0
])
{
n
=
taosWriteFile
(
pFD
,
pWriter
->
aBuf
[
0
],
aBufN
[
0
]);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
while
(
pBlockCol
&&
pBlockCol
->
cid
<
pColData
->
cid
)
{
if
(
n
<
hdr
.
szBlkCol
)
{
n
+=
tGetBlockCol
(
pReader
->
aBuf
[
0
]
+
n
,
pBlockCol
);
}
else
{
ASSERT
(
n
==
hdr
.
szBlkCol
);
pBlockCol
=
NULL
;
}
}
}
// update info
if
(
toLast
)
{
pWriter
->
fSst
[
pWriter
->
wSet
.
nSstF
-
1
].
size
+=
pBlkInfo
->
szBlock
;
}
else
{
pWriter
->
fData
.
size
+=
pBlkInfo
->
szBlock
;
}
if
(
pBlockCol
==
NULL
||
pBlockCol
->
cid
>
pColData
->
cid
)
{
// add a lot of NONE
for
(
int32_t
iRow
=
0
;
iRow
<
hdr
.
nRow
;
iRow
++
)
{
code
=
tColDataAppendValue
(
pColData
,
&
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
));
if
(
code
)
goto
_err
;
}
}
else
{
ASSERT
(
pBlockCol
->
type
==
pColData
->
type
);
ASSERT
(
pBlockCol
->
flag
&&
pBlockCol
->
flag
!=
HAS_NONE
);
// ================= SMA ====================
if
(
pSmaInfo
)
{
code
=
tsdbWriteBlockSma
(
pWriter
,
pBlockData
,
pSmaInfo
);
if
(
code
)
goto
_err
;
if
(
pBlockCol
->
flag
==
HAS_NULL
)
{
// add a lot of NULL
for
(
int32_t
iRow
=
0
;
iRow
<
hdr
.
nRow
;
iRow
++
)
{
code
=
tColDataAppendValue
(
pColData
,
&
COL_VAL_NULL
(
pBlockCol
->
cid
,
pBlockCol
->
type
));
if
(
code
)
goto
_err
;
}
}
else
{
// decode from binary
int64_t
offset
=
pBlkInfo
->
offset
+
pBlkInfo
->
szKey
+
hdr
.
szBlkCol
+
sizeof
(
TSCKSUM
)
+
pBlockCol
->
offset
;
int32_t
size
=
pBlockCol
->
szBitmap
+
pBlockCol
->
szOffset
+
pBlockCol
->
szValue
+
sizeof
(
TSCKSUM
);
code
=
tsdbReadAndCheck
(
pFD
,
offset
,
&
pReader
->
aBuf
[
1
],
size
,
0
);
if
(
code
)
goto
_err
;
code
=
tsdbDecmprColData
(
pReader
->
aBuf
[
1
],
pBlockCol
,
hdr
.
cmprAlg
,
hdr
.
nRow
,
pColData
,
&
pReader
->
aBuf
[
2
]);
if
(
code
)
goto
_err
;
}
}
}
_exit:
tsdbTrace
(
"vgId:%d tsdb write block data, suid:%"
PRId64
" uid:%"
PRId64
" nRow:%d, offset:%"
PRId64
" size:%d"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pBlockData
->
suid
,
pBlockData
->
uid
,
pBlockData
->
nRow
,
pBlkInfo
->
offset
,
pBlkInfo
->
szBlock
);
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb
write block data failed since %s"
,
TD_VID
(
pWrit
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d tsdb
read block data impl failed since %s"
,
TD_VID
(
pRead
er
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbDFileSetCopy
(
STsdb
*
pTsdb
,
SDFileSet
*
pSetFrom
,
SDFileSet
*
pSetTo
)
{
int32_t
code
=
0
;
int64_t
n
;
int64_t
size
;
TdFilePtr
pOutFD
=
NULL
;
// TODO
TdFilePtr
PInFD
=
NULL
;
// TODO
char
fNameFrom
[
TSDB_FILENAME_LEN
];
char
fNameTo
[
TSDB_FILENAME_LEN
];
int32_t
tsdbReadDataBlock
(
SDataFReader
*
pReader
,
SDataBlk
*
pDataBlk
,
SBlockData
*
pBlockData
)
{
int32_t
code
=
0
;
// head
tsdbHeadFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pHeadF
,
fNameFrom
);
tsdbHeadFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pHeadF
,
fNameTo
);
code
=
tsdbReadBlockDataImpl
(
pReader
,
&
pDataBlk
->
aSubBlock
[
0
],
0
,
pBlockData
);
if
(
code
)
goto
_err
;
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
pDataBlk
->
nSubBlock
>
1
)
{
SBlockData
bData1
;
SBlockData
bData2
;
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
)
;
goto
_err
;
}
// create
code
=
tBlockDataCreate
(
&
bData1
);
if
(
code
)
goto
_err
;
code
=
tBlockDataCreate
(
&
bData2
)
;
if
(
code
)
goto
_err
;
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pHeadF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
// init
tBlockDataInitEx
(
&
bData1
,
pBlockData
);
tBlockDataInitEx
(
&
bData2
,
pBlockData
);
// data
tsdbDataFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pDataF
,
fNameFrom
);
tsdbDataFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pDataF
,
fNameTo
);
for
(
int32_t
iSubBlock
=
1
;
iSubBlock
<
pDataBlk
->
nSubBlock
;
iSubBlock
++
)
{
code
=
tsdbReadBlockDataImpl
(
pReader
,
&
pDataBlk
->
aSubBlock
[
iSubBlock
],
0
,
&
bData1
);
if
(
code
)
{
tBlockDataDestroy
(
&
bData1
,
1
);
tBlockDataDestroy
(
&
bData2
,
1
);
goto
_err
;
}
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
code
=
tBlockDataCopy
(
pBlockData
,
&
bData2
);
if
(
code
)
{
tBlockDataDestroy
(
&
bData1
,
1
);
tBlockDataDestroy
(
&
bData2
,
1
);
goto
_err
;
}
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
code
=
tBlockDataMerge
(
&
bData1
,
&
bData2
,
pBlockData
);
if
(
code
)
{
tBlockDataDestroy
(
&
bData1
,
1
);
tBlockDataDestroy
(
&
bData2
,
1
);
goto
_err
;
}
}
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pDataF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
tBlockDataDestroy
(
&
bData1
,
1
);
tBlockDataDestroy
(
&
bData2
,
1
);
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
// sst
tsdbSstFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
aSstF
[
0
],
fNameFrom
);
tsdbSstFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
aSstF
[
0
],
fNameTo
);
return
code
;
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
_err:
tsdbError
(
"vgId:%d tsdb read data block failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
int32_t
tsdbReadSstBlock
(
SDataFReader
*
pReader
,
int32_t
iSst
,
SSstBlk
*
pSstBlk
,
SBlockData
*
pBlockData
)
{
int32_t
code
=
0
;
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
aSstF
[
0
]
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
code
=
tsdbReadBlockDataImpl
(
pReader
,
&
pSstBlk
->
bInfo
,
1
,
pBlockData
);
if
(
code
)
goto
_err
;
// sma
tsdbSmaFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pSmaF
,
fNameFrom
);
tsdbSmaFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pSmaF
,
fNameTo
);
return
code
;
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
_err:
tsdbError
(
"vgId:%d tsdb read last block failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
int32_t
tsdbReadSstBlockEx
(
SDataFReader
*
pReader
,
int32_t
iSst
,
SSstBlk
*
pSstBlk
,
SBlockData
*
pBlockData
)
{
int32_t
code
=
0
;
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pSmaF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
// read
code
=
tsdbReadAndCheck
(
pReader
->
aLastFD
[
iSst
],
pSstBlk
->
bInfo
.
offset
,
&
pReader
->
aBuf
[
0
],
pSstBlk
->
bInfo
.
szBlock
,
0
);
if
(
code
)
goto
_exit
;
return
code
;
// decmpr
code
=
tDecmprBlockData
(
pReader
->
aBuf
[
0
],
pSstBlk
->
bInfo
.
szBlock
,
pBlockData
,
&
pReader
->
aBuf
[
1
]);
if
(
code
)
goto
_exit
;
_err:
tsdbError
(
"vgId:%d, tsdb DFileSet copy failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
_exit:
return
code
;
}
...
...
@@ -1558,7 +1534,6 @@ _err:
tsdbError
(
"vgId:%d, update del file hdr failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
// SDelFReader ====================================================
struct
SDelFReader
{
STsdb
*
pTsdb
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录