Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
605b0d5e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
605b0d5e
编写于
7月 22, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
7月 22, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15286 from taosdata/fix/hzcheng_3.0
fix: r/w concurrency
上级
ab2ad2cc
c4743997
变更
13
展开全部
隐藏空白更改
内联
并排
Showing
13 changed file
with
1848 addition
and
1373 deletion
+1848
-1373
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+76
-59
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+485
-483
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+41
-32
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+831
-449
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+59
-127
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+11
-47
source/dnode/vnode/src/tsdb/tsdbOpen.c
source/dnode/vnode/src/tsdb/tsdbOpen.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+80
-20
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+116
-64
source/dnode/vnode/src/tsdb/tsdbRetention.c
source/dnode/vnode/src/tsdb/tsdbRetention.c
+66
-57
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+56
-25
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+13
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+12
-6
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
605b0d5e
...
...
@@ -40,12 +40,9 @@ typedef struct SDelIdx SDelIdx;
typedef
struct
STbData
STbData
;
typedef
struct
SMemTable
SMemTable
;
typedef
struct
STbDataIter
STbDataIter
;
typedef
struct
STable
STable
;
typedef
struct
SMapData
SMapData
;
typedef
struct
SBlockIdx
SBlockIdx
;
typedef
struct
SBlock
SBlock
;
typedef
struct
SBlockStatis
SBlockStatis
;
typedef
struct
SAggrBlkCol
SAggrBlkCol
;
typedef
struct
SColData
SColData
;
typedef
struct
SBlockDataHdr
SBlockDataHdr
;
typedef
struct
SBlockData
SBlockData
;
...
...
@@ -62,8 +59,7 @@ typedef struct SDelFReader SDelFReader;
typedef
struct
SRowIter
SRowIter
;
typedef
struct
STsdbFS
STsdbFS
;
typedef
struct
SRowMerger
SRowMerger
;
typedef
struct
STsdbFSState
STsdbFSState
;
typedef
struct
STsdbSnapHdr
STsdbSnapHdr
;
typedef
struct
STsdbReadSnap
STsdbReadSnap
;
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
...
...
@@ -176,8 +172,6 @@ void tsdbMemTableDestroy(SMemTable *pMemTable);
void
tsdbGetTbDataFromMemTable
(
SMemTable
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
STbData
**
ppTbData
);
void
tsdbRefMemTable
(
SMemTable
*
pMemTable
);
void
tsdbUnrefMemTable
(
SMemTable
*
pMemTable
);
int32_t
tsdbTakeMemSnapshot
(
STsdb
*
pTsdb
,
SMemTable
**
ppMem
,
SMemTable
**
ppIMem
);
void
tsdbUntakeMemSnapshot
(
STsdb
*
pTsdb
,
SMemTable
*
pMem
,
SMemTable
*
pIMem
);
// STbDataIter
int32_t
tsdbTbDataIterCreate
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
**
ppIter
);
void
*
tsdbTbDataIterDestroy
(
STbDataIter
*
pIter
);
...
...
@@ -188,30 +182,39 @@ bool tsdbTbDataIterNext(STbDataIter *pIter);
int32_t
tsdbGetNRowsInTbData
(
STbData
*
pTbData
);
// tsdbFile.c ==============================================================================================
typedef
enum
{
TSDB_HEAD_FILE
=
0
,
TSDB_DATA_FILE
,
TSDB_LAST_FILE
,
TSDB_SMA_FILE
}
EDataFileT
;
void
tsdbDataFileName
(
STsdb
*
pTsdb
,
SDFileSet
*
pDFileSet
,
EDataFileT
ftype
,
char
fname
[]);
bool
tsdbFileIsSame
(
SDFileSet
*
pDFileSet1
,
SDFileSet
*
pDFileSet2
,
EDataFileT
ftype
);
bool
tsdbDelFileIsSame
(
SDelFile
*
pDelFile1
,
SDelFile
*
pDelFile2
);
int32_t
tsdbUpdateDFileHdr
(
TdFilePtr
pFD
,
SDFileSet
*
pSet
,
EDataFileT
ftype
);
int32_t
tsdbDFileRollback
(
STsdb
*
pTsdb
,
SDFileSet
*
pSet
,
EDataFileT
ftype
);
int32_t
tPutDataFileHdr
(
uint8_t
*
p
,
SDFileSet
*
pSet
,
EDataFileT
ftype
);
int32_t
tPutHeadFile
(
uint8_t
*
p
,
SHeadFile
*
pHeadFile
);
int32_t
tPutDataFile
(
uint8_t
*
p
,
SDataFile
*
pDataFile
);
int32_t
tPutLastFile
(
uint8_t
*
p
,
SLastFile
*
pLastFile
);
int32_t
tPutSmaFile
(
uint8_t
*
p
,
SSmaFile
*
pSmaFile
);
int32_t
tPutDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
);
int32_t
tGetDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
);
int32_t
tPutDFileSet
(
uint8_t
*
p
,
SDFileSet
*
pSet
);
int32_t
tGetDFileSet
(
uint8_t
*
p
,
SDFileSet
*
pSet
);
void
tsdbHeadFileName
(
STsdb
*
pTsdb
,
SDiskID
did
,
int32_t
fid
,
SHeadFile
*
pHeadF
,
char
fname
[]);
void
tsdbDataFileName
(
STsdb
*
pTsdb
,
SDiskID
did
,
int32_t
fid
,
SDataFile
*
pDataF
,
char
fname
[]);
void
tsdbLastFileName
(
STsdb
*
pTsdb
,
SDiskID
did
,
int32_t
fid
,
SLastFile
*
pLastF
,
char
fname
[]);
void
tsdbSmaFileName
(
STsdb
*
pTsdb
,
SDiskID
did
,
int32_t
fid
,
SSmaFile
*
pSmaF
,
char
fname
[]);
// SDelFile
void
tsdbDelFileName
(
STsdb
*
pTsdb
,
SDelFile
*
pFile
,
char
fname
[]);
// tsdbFS.c ==============================================================================================
int32_t
tsdbFSOpen
(
STsdb
*
pTsdb
,
STsdbFS
**
ppFS
);
int32_t
tsdbFSClose
(
STsdbFS
*
pFS
);
int32_t
tsdbFSBegin
(
STsdbFS
*
pFS
);
int32_t
tsdbFSCommit
(
STsdbFS
*
pFS
);
int32_t
tsdbFSOpen
(
STsdb
*
pTsdb
);
int32_t
tsdbFSClose
(
STsdb
*
pTsdb
);
int32_t
tsdbFSCopy
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
void
tsdbFSDestroy
(
STsdbFS
*
pFS
);
int32_t
tDFileSetCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
tsdbFSCommit1
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
int32_t
tsdbFSCommit2
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
int32_t
tsdbFSRef
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
void
tsdbFSUnref
(
STsdb
*
pTsdb
,
STsdbFS
*
pFS
);
int32_t
tsdbFSRollback
(
STsdbFS
*
pFS
);
int32_t
tsdbFSStateUpsertDelFile
(
STsdbFSState
*
pState
,
SDelFile
*
pDelFile
);
int32_t
tsdbFSStateUpsertDFileSet
(
STsdbFSState
*
pState
,
SDFileSet
*
pSet
);
void
tsdbFSStateDeleteDFileSet
(
STsdbFSState
*
pState
,
int32_t
fid
);
SDelFile
*
tsdbFSStateGetDelFile
(
STsdbFSState
*
pState
);
SDFileSet
*
tsdbFSStateGetDFileSet
(
STsdbFSState
*
pState
,
int32_t
fid
,
int32_t
flag
);
int32_t
tsdbFSUpsertFSet
(
STsdbFS
*
pFS
,
SDFileSet
*
pSet
);
int32_t
tsdbFSUpsertDelFile
(
STsdbFS
*
pFS
,
SDelFile
*
pDelFile
);
// tsdbReaderWriter.c ==============================================================================================
// SDataFWriter
int32_t
tsdbDataFWriterOpen
(
SDataFWriter
**
ppWriter
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
);
...
...
@@ -222,8 +225,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBu
int32_t
tsdbWriteBlockData
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
,
SBlockIdx
*
pBlockIdx
,
SBlock
*
pBlock
,
int8_t
cmprAlg
);
SDFileSet
*
tsdbDataFWriterGetWSet
(
SDataFWriter
*
pWriter
);
int32_t
tsdbDFileSetCopy
(
STsdb
*
pTsdb
,
SDFileSet
*
pSetFrom
,
SDFileSet
*
pSetTo
);
int32_t
tsdbDFileSetCopy
(
STsdb
*
pTsdb
,
SDFileSet
*
pSetFrom
,
SDFileSet
*
pSetTo
);
// SDataFReader
int32_t
tsdbDataFReaderOpen
(
SDataFReader
**
ppReader
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
);
int32_t
tsdbDataFReaderClose
(
SDataFReader
**
ppReader
);
...
...
@@ -245,6 +247,9 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
int32_t
tsdbDelFReaderClose
(
SDelFReader
**
ppReader
);
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
SArray
*
aDelData
,
uint8_t
**
ppBuf
);
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
SArray
*
aDelIdx
,
uint8_t
**
ppBuf
);
// tsdbRead.c ==============================================================================================
int32_t
tsdbTakeReadSnap
(
STsdb
*
pTsdb
,
STsdbReadSnap
**
ppSnap
);
void
tsdbUntakeReadSnap
(
STsdb
*
pTsdb
,
STsdbReadSnap
*
pSnap
);
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
...
...
@@ -276,6 +281,11 @@ typedef struct {
TSKEY
minKey
;
}
SRtn
;
struct
STsdbFS
{
SDelFile
*
pDelFile
;
SArray
*
aDFileSet
;
// SArray<SDFileSet>
};
struct
STsdb
{
char
*
path
;
SVnode
*
pVnode
;
...
...
@@ -283,7 +293,7 @@ struct STsdb {
TdThreadRwlock
rwLock
;
SMemTable
*
mem
;
SMemTable
*
imem
;
STsdbFS
*
pFS
;
STsdbFS
fs
;
SLRUCache
*
lruCache
;
};
...
...
@@ -402,16 +412,6 @@ struct SBlock {
SSubBlock
aSubBlock
[
TSDB_MAX_SUBBLOCKS
];
};
struct
SAggrBlkCol
{
int16_t
colId
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
};
struct
SColData
{
int16_t
cid
;
int8_t
type
;
...
...
@@ -465,12 +465,6 @@ struct SDelIdx {
int64_t
size
;
};
struct
SDelFile
{
int64_t
commitID
;
int64_t
size
;
int64_t
offset
;
};
#pragma pack(push, 1)
struct
SBlockDataHdr
{
uint32_t
delimiter
;
...
...
@@ -479,34 +473,50 @@ struct SBlockDataHdr {
};
#pragma pack(pop)
struct
SDelFile
{
volatile
int32_t
nRef
;
int64_t
commitID
;
int64_t
size
;
int64_t
offset
;
};
struct
SHeadFile
{
volatile
int32_t
nRef
;
int64_t
commitID
;
int64_t
size
;
int64_t
offset
;
};
struct
SDataFile
{
volatile
int32_t
nRef
;
int64_t
commitID
;
int64_t
size
;
};
struct
SLastFile
{
volatile
int32_t
nRef
;
int64_t
commitID
;
int64_t
size
;
};
struct
SSmaFile
{
volatile
int32_t
nRef
;
int64_t
commitID
;
int64_t
size
;
};
struct
SDFileSet
{
SDiskID
diskId
;
int32_t
fid
;
SHeadFile
fHead
;
SDataFile
fData
;
SLastFile
fLast
;
SSmaFile
fSma
;
SDiskID
diskId
;
int32_t
fid
;
SHeadFile
*
pHeadF
;
SDataFile
*
pDataF
;
SLastFile
*
pLastF
;
SSmaFile
*
pSmaF
;
};
struct
SRowIter
{
...
...
@@ -521,26 +531,33 @@ struct SRowMerger {
SArray
*
pArray
;
// SArray<SColVal>
};
struct
STsdbFSState
{
SDelFile
*
pDelFile
;
SArray
*
aDFileSet
;
// SArray<aDFileSet>
SDelFile
delFile
;
};
struct
STsdbFS
{
STsdb
*
pTsdb
;
TdThreadRwlock
lock
;
int8_t
inTxn
;
STsdbFSState
*
cState
;
STsdbFSState
*
nState
;
};
struct
SDelFWriter
{
STsdb
*
pTsdb
;
SDelFile
fDel
;
TdFilePtr
pWriteH
;
};
struct
SDataFWriter
{
STsdb
*
pTsdb
;
SDFileSet
wSet
;
TdFilePtr
pHeadFD
;
TdFilePtr
pDataFD
;
TdFilePtr
pLastFD
;
TdFilePtr
pSmaFD
;
SHeadFile
fHead
;
SDataFile
fData
;
SLastFile
fLast
;
SSmaFile
fSma
;
};
struct
STsdbReadSnap
{
SMemTable
*
pMem
;
SMemTable
*
pIMem
;
STsdbFS
fs
;
};
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
605b0d5e
此差异已折叠。
点击以展开。
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
605b0d5e
...
...
@@ -29,6 +29,7 @@ typedef struct {
int32_t
minRow
;
int32_t
maxRow
;
int8_t
cmprAlg
;
STsdbFS
fs
;
// --------------
TSKEY
nextKey
;
// reset by each table commit
int32_t
commitFid
;
...
...
@@ -119,9 +120,6 @@ int32_t tsdbCommit(STsdb *pTsdb) {
code
=
tsdbCommitDel
(
&
commith
);
if
(
code
)
goto
_err
;
code
=
tsdbCommitCache
(
&
commith
);
if
(
code
)
goto
_err
;
// end commit
code
=
tsdbEndCommit
(
&
commith
,
0
);
if
(
code
)
goto
_err
;
...
...
@@ -158,7 +156,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
goto
_err
;
}
SDelFile
*
pDelFileR
=
p
Tsdb
->
pFS
->
nState
->
pDelFile
;
SDelFile
*
pDelFileR
=
p
Committer
->
fs
.
pDelFile
;
if
(
pDelFileR
)
{
code
=
tsdbDelFReaderOpen
(
&
pCommitter
->
pDelFReader
,
pDelFileR
,
pTsdb
,
NULL
);
if
(
code
)
goto
_err
;
...
...
@@ -247,7 +245,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
code
=
tsdbUpdateDelFileHdr
(
pCommitter
->
pDelFWriter
);
if
(
code
)
goto
_err
;
code
=
tsdbFS
StateUpsertDelFile
(
pTsdb
->
pFS
->
nState
,
&
pCommitter
->
pDelFWriter
->
fDel
);
code
=
tsdbFS
UpsertDelFile
(
&
pCommitter
->
fs
,
&
pCommitter
->
pDelFWriter
->
fDel
);
if
(
code
)
goto
_err
;
code
=
tsdbDelFWriterClose
(
&
pCommitter
->
pDelFWriter
,
1
);
...
...
@@ -273,7 +271,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SDFileSet
*
pRSet
=
NULL
;
SDFileSet
wSet
;
// memory
pCommitter
->
nextKey
=
TSKEY_MAX
;
...
...
@@ -282,7 +279,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
taosArrayClear
(
pCommitter
->
aBlockIdx
);
tMapDataReset
(
&
pCommitter
->
oBlockMap
);
tBlockDataReset
(
&
pCommitter
->
oBlockData
);
pRSet
=
tsdbFSStateGetDFileSet
(
pTsdb
->
pFS
->
nState
,
pCommitter
->
commitFid
,
TD_EQ
);
pRSet
=
(
SDFileSet
*
)
taosArraySearch
(
pCommitter
->
fs
.
aDFileSet
,
&
(
SDFileSet
){.
fid
=
pCommitter
->
commitFid
},
tDFileSetCmprFn
,
TD_EQ
);
if
(
pRSet
)
{
code
=
tsdbDataFReaderOpen
(
&
pCommitter
->
pReader
,
pTsdb
,
pRSet
);
if
(
code
)
goto
_err
;
...
...
@@ -292,23 +290,29 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
}
// new
SHeadFile
fHead
;
SDataFile
fData
;
SLastFile
fLast
;
SSmaFile
fSma
;
SDFileSet
wSet
=
{.
pHeadF
=
&
fHead
,
.
pDataF
=
&
fData
,
.
pLastF
=
&
fLast
,
.
pSmaF
=
&
fSma
};
taosArrayClear
(
pCommitter
->
aBlockIdxN
);
tMapDataReset
(
&
pCommitter
->
nBlockMap
);
tBlockDataReset
(
&
pCommitter
->
nBlockData
);
if
(
pRSet
)
{
wSet
=
(
SDFileSet
){.
diskId
=
pRSet
->
diskId
,
.
fid
=
pCommitter
->
commitFid
,
.
fHead
=
{.
commitID
=
pCommitter
->
commitID
,
.
offset
=
0
,
.
size
=
0
},
.
fData
=
pRSet
->
fData
,
.
fLast
=
{.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
},
.
fSma
=
pRSet
->
fSma
}
;
wSet
.
diskId
=
pRSet
->
diskId
;
wSet
.
fid
=
pCommitter
->
commitFid
;
fHead
=
(
SHeadFile
){.
commitID
=
pCommitter
->
commitID
,
.
offset
=
0
,
.
size
=
0
};
fData
=
*
pRSet
->
pDataF
;
fLast
=
(
SLastFile
){.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
};
fSma
=
*
pRSet
->
pSmaF
;
}
else
{
wSet
=
(
SDFileSet
){.
diskId
=
(
SDiskID
){.
level
=
0
,
.
id
=
0
},
.
fid
=
pCommitter
->
commitFid
,
.
fHead
=
{.
commitID
=
pCommitter
->
commitID
,
.
offset
=
0
,
.
size
=
0
},
.
fData
=
{.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
},
.
fLast
=
{.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
},
.
fSma
=
{.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
}
};
wSet
.
diskId
=
(
SDiskID
){.
level
=
0
,
.
id
=
0
};
wSet
.
fid
=
pCommitter
->
commitFid
;
fHead
=
(
SHeadFile
){.
commitID
=
pCommitter
->
commitID
,
.
offset
=
0
,
.
size
=
0
};
fData
=
(
SDataFile
){.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
};
fLast
=
(
SLastFile
){.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
};
fSma
=
(
SSmaFile
){.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
};
}
code
=
tsdbDataFWriterOpen
(
&
pCommitter
->
pWriter
,
pTsdb
,
&
wSet
);
if
(
code
)
goto
_err
;
...
...
@@ -855,7 +859,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
if
(
code
)
goto
_err
;
// upsert SDFileSet
code
=
tsdbFS
StateUpsertDFileSet
(
pCommitter
->
pTsdb
->
pFS
->
nState
,
tsdbDataFWriterGetWSet
(
pCommitter
->
pWriter
)
);
code
=
tsdbFS
UpsertFSet
(
&
pCommitter
->
fs
,
&
pCommitter
->
pWriter
->
wSet
);
if
(
code
)
goto
_err
;
// close and sync
...
...
@@ -973,7 +977,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
pCommitter
->
maxRow
=
pTsdb
->
pVnode
->
config
.
tsdbCfg
.
maxRows
;
pCommitter
->
cmprAlg
=
pTsdb
->
pVnode
->
config
.
tsdbCfg
.
compression
;
code
=
tsdbFS
Begin
(
pTsdb
->
pFS
);
code
=
tsdbFS
Copy
(
pTsdb
,
&
pCommitter
->
fs
);
if
(
code
)
goto
_err
;
return
code
;
...
...
@@ -1142,28 +1146,33 @@ _err:
return
code
;
}
static
int32_t
tsdbCommitCache
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbEndCommit
(
SCommitter
*
pCommitter
,
int32_t
eno
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
if
(
eno
==
0
)
{
code
=
tsdbFSCommit
(
pTsdb
->
pFS
);
}
else
{
code
=
tsdbFSRollback
(
pTsdb
->
pFS
);
}
ASSERT
(
eno
==
0
);
code
=
tsdbFSCommit1
(
pTsdb
,
&
pCommitter
->
fs
);
if
(
code
)
goto
_err
;
// lock
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
// commit or rollback
code
=
tsdbFSCommit2
(
pTsdb
,
&
pCommitter
->
fs
);
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_err
;
}
pTsdb
->
imem
=
NULL
;
// unlock
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbUnrefMemTable
(
pMemTable
);
tsdbFSDestroy
(
&
pCommitter
->
fs
);
tsdbInfo
(
"vgId:%d tsdb end commit"
,
TD_VID
(
pTsdb
->
pVnode
));
return
code
;
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
605b0d5e
此差异已折叠。
点击以展开。
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
605b0d5e
...
...
@@ -15,7 +15,7 @@
#include "tsdb.h"
static
int32_t
tPutHeadFile
(
uint8_t
*
p
,
SHeadFile
*
pHeadFile
)
{
int32_t
tPutHeadFile
(
uint8_t
*
p
,
SHeadFile
*
pHeadFile
)
{
int32_t
n
=
0
;
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pHeadFile
->
commitID
);
...
...
@@ -35,7 +35,7 @@ static int32_t tGetHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
return
n
;
}
static
int32_t
tPutDataFile
(
uint8_t
*
p
,
SDataFile
*
pDataFile
)
{
int32_t
tPutDataFile
(
uint8_t
*
p
,
SDataFile
*
pDataFile
)
{
int32_t
n
=
0
;
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pDataFile
->
commitID
);
...
...
@@ -53,7 +53,7 @@ static int32_t tGetDataFile(uint8_t *p, SDataFile *pDataFile) {
return
n
;
}
static
int32_t
tPutLastFile
(
uint8_t
*
p
,
SLastFile
*
pLastFile
)
{
int32_t
tPutLastFile
(
uint8_t
*
p
,
SLastFile
*
pLastFile
)
{
int32_t
n
=
0
;
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pLastFile
->
commitID
);
...
...
@@ -71,7 +71,7 @@ static int32_t tGetLastFile(uint8_t *p, SLastFile *pLastFile) {
return
n
;
}
static
int32_t
tPutSmaFile
(
uint8_t
*
p
,
SSmaFile
*
pSmaFile
)
{
int32_t
tPutSmaFile
(
uint8_t
*
p
,
SSmaFile
*
pSmaFile
)
{
int32_t
n
=
0
;
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pSmaFile
->
commitID
);
...
...
@@ -90,123 +90,79 @@ static int32_t tGetSmaFile(uint8_t *p, SSmaFile *pSmaFile) {
}
// EXPOSED APIS ==================================================
void
tsdbDataFileName
(
STsdb
*
pTsdb
,
SDFileSet
*
pDFileSet
,
EDataFileT
ftype
,
char
fname
[])
{
STfs
*
pTfs
=
pTsdb
->
pVnode
->
pTfs
;
void
tsdbHeadFileName
(
STsdb
*
pTsdb
,
SDiskID
did
,
int32_t
fid
,
SHeadFile
*
pHeadF
,
char
fname
[])
{
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%df%dver%"
PRId64
"%s"
,
tfsGetDiskPath
(
pTsdb
->
pVnode
->
pTfs
,
did
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
fid
,
pHeadF
->
commitID
,
".head"
);
}
switch
(
ftype
)
{
case
TSDB_HEAD_FILE
:
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%df%dver%"
PRId64
"%s"
,
tfsGetDiskPath
(
pTfs
,
pDFileSet
->
diskId
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
pDFileSet
->
fid
,
pDFileSet
->
fHead
.
commitID
,
".head"
);
break
;
case
TSDB_DATA_FILE
:
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%df%dver%"
PRId64
"%s"
,
tfsGetDiskPath
(
pTfs
,
pDFileSet
->
diskId
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
pDFileSet
->
fid
,
pDFileSet
->
fData
.
commitID
,
".data"
);
break
;
case
TSDB_LAST_FILE
:
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%df%dver%"
PRId64
"%s"
,
tfsGetDiskPath
(
pTfs
,
pDFileSet
->
diskId
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
pDFileSet
->
fid
,
pDFileSet
->
fLast
.
commitID
,
".last"
);
break
;
case
TSDB_SMA_FILE
:
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%df%dver%"
PRId64
"%s"
,
tfsGetDiskPath
(
pTfs
,
pDFileSet
->
diskId
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
pDFileSet
->
fid
,
pDFileSet
->
fSma
.
commitID
,
".sma"
);
break
;
default:
ASSERT
(
0
);
break
;
}
void
tsdbDataFileName
(
STsdb
*
pTsdb
,
SDiskID
did
,
int32_t
fid
,
SDataFile
*
pDataF
,
char
fname
[])
{
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%df%dver%"
PRId64
"%s"
,
tfsGetDiskPath
(
pTsdb
->
pVnode
->
pTfs
,
did
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
fid
,
pDataF
->
commitID
,
".data"
);
}
bool
tsdbFileIsSame
(
SDFileSet
*
pDFileSet1
,
SDFileSet
*
pDFileSet2
,
EDataFileT
ftype
)
{
if
(
pDFileSet1
->
diskId
.
level
!=
pDFileSet2
->
diskId
.
level
||
pDFileSet1
->
diskId
.
id
!=
pDFileSet2
->
diskId
.
id
)
{
return
false
;
}
void
tsdbLastFileName
(
STsdb
*
pTsdb
,
SDiskID
did
,
int32_t
fid
,
SLastFile
*
pLastF
,
char
fname
[]
)
{
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%df%dver%"
PRId64
"%s"
,
tfsGetDiskPath
(
pTsdb
->
pVnode
->
pTfs
,
did
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
fid
,
pLastF
->
commitID
,
".last"
)
;
}
switch
(
ftype
)
{
case
TSDB_HEAD_FILE
:
return
pDFileSet1
->
fHead
.
commitID
==
pDFileSet2
->
fHead
.
commitID
;
case
TSDB_DATA_FILE
:
return
pDFileSet1
->
fData
.
commitID
==
pDFileSet2
->
fData
.
commitID
;
case
TSDB_LAST_FILE
:
return
pDFileSet1
->
fLast
.
commitID
==
pDFileSet2
->
fLast
.
commitID
;
case
TSDB_SMA_FILE
:
return
pDFileSet1
->
fSma
.
commitID
==
pDFileSet2
->
fSma
.
commitID
;
default:
ASSERT
(
0
);
break
;
}
void
tsdbSmaFileName
(
STsdb
*
pTsdb
,
SDiskID
did
,
int32_t
fid
,
SSmaFile
*
pSmaF
,
char
fname
[])
{
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%df%dver%"
PRId64
"%s"
,
tfsGetDiskPath
(
pTsdb
->
pVnode
->
pTfs
,
did
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
fid
,
pSmaF
->
commitID
,
".sma"
);
}
bool
tsdbDelFileIsSame
(
SDelFile
*
pDelFile1
,
SDelFile
*
pDelFile2
)
{
return
pDelFile1
->
commitID
==
pDelFile2
->
commitID
;
}
int32_t
tsdbUpdateDFileHdr
(
TdFilePtr
pFD
,
SDFileSet
*
pSet
,
EDataFileT
ftype
)
{
int32_t
code
=
0
;
int64_t
n
;
char
hdr
[
TSDB_FHDR_SIZE
];
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutDataFileHdr
(
hdr
,
pSet
,
ftype
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_exit
;
}
n
=
taosWriteFile
(
pFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_exit
;
}
_exit:
return
code
;
}
int32_t
tsdbDFileRollback
(
STsdb
*
pTsdb
,
SDFileSet
*
pSet
,
EDataFileT
ftype
)
{
int32_t
code
=
0
;
int64_t
size
;
int64_t
n
;
TdFilePtr
pFD
;
char
fname
[
TSDB_FILENAME_LEN
];
tsdbDataFileName
(
pTsdb
,
pSet
,
ftype
,
fname
);
// open
pFD
=
taosOpenFile
(
fname
,
TD_FILE_WRITE
);
if
(
pFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
char
hdr
[
TSDB_FHDR_SIZE
]
=
{
0
};
// truncate
switch
(
ftype
)
{
case
TSDB_HEAD_FILE
:
size
=
pSet
->
fHead
.
size
;
break
;
case
TSDB_DATA_FILE
:
size
=
pSet
->
fData
.
size
;
break
;
case
TSDB_LAST_FILE
:
size
=
pSet
->
fLast
.
size
;
size
=
pSet
->
pDataF
->
size
;
tsdbDataFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pDataF
,
fname
);
tPutDataFile
(
hdr
,
pSet
->
pDataF
);
break
;
case
TSDB_SMA_FILE
:
size
=
pSet
->
fSma
.
size
;
size
=
pSet
->
pSmaF
->
size
;
tsdbSmaFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pSmaF
,
fname
);
tPutSmaFile
(
hdr
,
pSet
->
pSmaF
);
break
;
default:
ASSERT
(
0
);
}
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
// open
pFD
=
taosOpenFile
(
fname
,
TD_FILE_WRITE
);
if
(
pFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// ftruncate
if
(
taosFtruncateFile
(
pFD
,
size
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// update header
code
=
tsdbUpdateDFileHdr
(
pFD
,
pSet
,
ftype
);
if
(
code
)
goto
_err
;
n
=
taosLSeekFile
(
pFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// sync
if
(
taosFsyncFile
(
pFD
)
<
0
)
{
...
...
@@ -220,42 +176,20 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb rollback file failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tPutDataFileHdr
(
uint8_t
*
p
,
SDFileSet
*
pSet
,
EDataFileT
ftype
)
{
int32_t
n
=
0
;
switch
(
ftype
)
{
case
TSDB_HEAD_FILE
:
n
+=
tPutHeadFile
(
p
?
p
+
n
:
p
,
&
pSet
->
fHead
);
break
;
case
TSDB_DATA_FILE
:
n
+=
tPutDataFile
(
p
?
p
+
n
:
p
,
&
pSet
->
fData
);
break
;
case
TSDB_LAST_FILE
:
n
+=
tPutLastFile
(
p
?
p
+
n
:
p
,
&
pSet
->
fLast
);
break
;
case
TSDB_SMA_FILE
:
n
+=
tPutSmaFile
(
p
?
p
+
n
:
p
,
&
pSet
->
fSma
);
break
;
default:
ASSERT
(
0
);
}
return
n
;
}
int32_t
tPutDFileSet
(
uint8_t
*
p
,
SDFileSet
*
pSet
)
{
int32_t
n
=
0
;
n
+=
tPutI32v
(
p
?
p
+
n
:
p
,
pSet
->
diskId
.
level
);
n
+=
tPutI32v
(
p
?
p
+
n
:
p
,
pSet
->
diskId
.
id
);
n
+=
tPutI32v
(
p
?
p
+
n
:
p
,
pSet
->
fid
);
n
+=
tPutHeadFile
(
p
?
p
+
n
:
p
,
&
pSet
->
fHead
);
n
+=
tPutDataFile
(
p
?
p
+
n
:
p
,
&
pSet
->
fData
);
n
+=
tPutLastFile
(
p
?
p
+
n
:
p
,
&
pSet
->
fLast
);
n
+=
tPutSmaFile
(
p
?
p
+
n
:
p
,
&
pSet
->
fSma
);
n
+=
tPutHeadFile
(
p
?
p
+
n
:
p
,
pSet
->
pHeadF
);
n
+=
tPutDataFile
(
p
?
p
+
n
:
p
,
pSet
->
pDataF
);
n
+=
tPutLastFile
(
p
?
p
+
n
:
p
,
pSet
->
pLastF
);
n
+=
tPutSmaFile
(
p
?
p
+
n
:
p
,
pSet
->
pSmaF
);
return
n
;
}
...
...
@@ -266,20 +200,18 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
n
+=
tGetI32v
(
p
+
n
,
&
pSet
->
diskId
.
level
);
n
+=
tGetI32v
(
p
+
n
,
&
pSet
->
diskId
.
id
);
n
+=
tGetI32v
(
p
+
n
,
&
pSet
->
fid
);
n
+=
tGetHeadFile
(
p
+
n
,
&
pSet
->
fHead
);
n
+=
tGetDataFile
(
p
+
n
,
&
pSet
->
fData
);
n
+=
tGetLastFile
(
p
+
n
,
&
pSet
->
fLast
);
n
+=
tGetSmaFile
(
p
+
n
,
&
pSet
->
fSma
);
n
+=
tGetHeadFile
(
p
+
n
,
pSet
->
pHeadF
);
n
+=
tGetDataFile
(
p
+
n
,
pSet
->
pDataF
);
n
+=
tGetLastFile
(
p
+
n
,
pSet
->
pLastF
);
n
+=
tGetSmaFile
(
p
+
n
,
pSet
->
pSmaF
);
return
n
;
}
// SDelFile ===============================================
void
tsdbDelFileName
(
STsdb
*
pTsdb
,
SDelFile
*
pFile
,
char
fname
[])
{
STfs
*
pTfs
=
pTsdb
->
pVnode
->
pTfs
;
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%dver%"
PRId64
"%s"
,
tfsGetPrimaryPath
(
pTfs
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
pFile
->
commitID
,
".del"
);
snprintf
(
fname
,
TSDB_FILENAME_LEN
-
1
,
"%s%s%s%sv%dver%"
PRId64
"%s"
,
tfsGetPrimaryPath
(
pTsdb
->
pVnode
->
pTfs
),
TD_DIRSEP
,
pTsdb
->
path
,
TD_DIRSEP
,
TD_VID
(
pTsdb
->
pVnode
),
pFile
->
commitID
,
".del"
);
}
int32_t
tPutDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
605b0d5e
...
...
@@ -93,7 +93,11 @@ static int32_t tbDataPCmprFn(const void *p1, const void *p2) {
}
void
tsdbGetTbDataFromMemTable
(
SMemTable
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
STbData
**
ppTbData
)
{
STbData
*
pTbData
=
&
(
STbData
){.
suid
=
suid
,
.
uid
=
uid
};
void
*
p
=
taosArraySearch
(
pMemTable
->
aTbData
,
&
pTbData
,
tbDataPCmprFn
,
TD_EQ
);
taosRLockLatch
(
&
pMemTable
->
latch
);
void
*
p
=
taosArraySearch
(
pMemTable
->
aTbData
,
&
pTbData
,
tbDataPCmprFn
,
TD_EQ
);
taosRUnLockLatch
(
&
pMemTable
->
latch
);
*
ppTbData
=
p
?
*
(
STbData
**
)
p
:
NULL
;
}
...
...
@@ -363,10 +367,13 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
void
*
p
;
if
(
idx
<
0
)
{
p
=
taosArrayPush
(
pMemTable
->
aTbData
,
&
pTbData
);
}
else
{
p
=
taosArrayInsert
(
pMemTable
->
aTbData
,
idx
,
&
pTbData
);
idx
=
taosArrayGetSize
(
pMemTable
->
aTbData
);
}
taosWLockLatch
(
&
pMemTable
->
latch
);
p
=
taosArrayInsert
(
pMemTable
->
aTbData
,
idx
,
&
pTbData
);
taosWUnLockLatch
(
&
pMemTable
->
latch
);
if
(
p
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -605,46 +612,3 @@ void tsdbUnrefMemTable(SMemTable *pMemTable) {
tsdbMemTableDestroy
(
pMemTable
);
}
}
int32_t
tsdbTakeMemSnapshot
(
STsdb
*
pTsdb
,
SMemTable
**
ppMem
,
SMemTable
**
ppIMem
)
{
int32_t
code
=
0
;
// lock
code
=
taosThreadRwlockRdlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_exit
;
}
// take snapshot
*
ppMem
=
pTsdb
->
mem
;
*
ppIMem
=
pTsdb
->
imem
;
if
(
*
ppMem
)
{
tsdbRefMemTable
(
*
ppMem
);
}
if
(
*
ppIMem
)
{
tsdbRefMemTable
(
*
ppIMem
);
}
// unlock
code
=
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_exit
;
}
_exit:
return
code
;
}
void
tsdbUntakeMemSnapshot
(
STsdb
*
pTsdb
,
SMemTable
*
pMem
,
SMemTable
*
pIMem
)
{
if
(
pMem
)
{
tsdbUnrefMemTable
(
pMem
);
}
if
(
pIMem
)
{
tsdbUnrefMemTable
(
pIMem
);
}
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbOpen.c
浏览文件 @
605b0d5e
...
...
@@ -66,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
tfsMkdir
(
pVnode
->
pTfs
,
pTsdb
->
path
);
// open tsdb
if
(
tsdbFSOpen
(
pTsdb
,
&
pTsdb
->
pFS
)
<
0
)
{
if
(
tsdbFSOpen
(
pTsdb
)
<
0
)
{
goto
_err
;
}
...
...
@@ -88,7 +88,7 @@ _err:
int
tsdbClose
(
STsdb
**
pTsdb
)
{
if
(
*
pTsdb
)
{
taosThreadRwlockDestroy
(
&
(
*
pTsdb
)
->
rwLock
);
tsdbFSClose
(
(
*
pTsdb
)
->
pFS
);
tsdbFSClose
(
*
pTsdb
);
tsdbCloseCache
((
*
pTsdb
)
->
lruCache
);
taosMemoryFreeClear
(
*
pTsdb
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
605b0d5e
...
...
@@ -118,8 +118,7 @@ struct STsdbReader {
char
*
idStr
;
// query info handle, for debug purpose
int32_t
type
;
// query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo
suppInfo
;
SMemTable
*
pMem
;
SMemTable
*
pIMem
;
STsdbReadSnap
*
pReadSnap
;
SIOCostSummary
cost
;
STSchema
*
pSchema
;
...
...
@@ -275,20 +274,18 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap
}
// init file iterator
static
int32_t
initFilesetIterator
(
SFilesetIter
*
pIter
,
const
STsdbFSState
*
pFState
,
int32_t
order
,
const
char
*
idstr
)
{
size_t
numOfFileset
=
taosArrayGetSize
(
pFState
->
aDFileSet
);
static
int32_t
initFilesetIterator
(
SFilesetIter
*
pIter
,
SArray
*
aDFileSet
,
int32_t
order
,
const
char
*
idstr
)
{
size_t
numOfFileset
=
taosArrayGetSize
(
aDFileSet
);
pIter
->
index
=
ASCENDING_TRAVERSE
(
order
)
?
-
1
:
numOfFileset
;
pIter
->
order
=
order
;
pIter
->
pFileList
=
taosArrayDup
(
pFState
->
aDFileSet
)
;
pIter
->
pFileList
=
aDFileSet
;
pIter
->
numOfFiles
=
numOfFileset
;
tsdbDebug
(
"init fileset iterator, total files:%d %s"
,
pIter
->
numOfFiles
,
idstr
);
return
TSDB_CODE_SUCCESS
;
}
static
void
cleanupFilesetIterator
(
SFilesetIter
*
pIter
)
{
taosArrayDestroy
(
pIter
->
pFileList
);
}
static
bool
filesetIteratorNext
(
SFilesetIter
*
pIter
,
STsdbReader
*
pReader
)
{
bool
asc
=
ASCENDING_TRAVERSE
(
pIter
->
order
);
int32_t
step
=
asc
?
1
:
-
1
;
...
...
@@ -1881,8 +1878,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
int32_t
backward
=
(
!
ASCENDING_TRAVERSE
(
pReader
->
order
));
STbData
*
d
=
NULL
;
if
(
pReader
->
pMem
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
pMem
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
d
);
if
(
pReader
->
p
ReadSnap
->
p
Mem
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
p
ReadSnap
->
p
Mem
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
d
);
if
(
d
!=
NULL
)
{
code
=
tsdbTbDataIterCreate
(
d
,
&
startKey
,
backward
,
&
pBlockScanInfo
->
iter
.
iter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1902,8 +1899,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
}
STbData
*
di
=
NULL
;
if
(
pReader
->
pIMem
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
pIMem
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
di
);
if
(
pReader
->
p
ReadSnap
->
p
IMem
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
p
ReadSnap
->
p
IMem
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
di
);
if
(
di
!=
NULL
)
{
code
=
tsdbTbDataIterCreate
(
di
,
&
startKey
,
backward
,
&
pBlockScanInfo
->
iiter
.
iter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1939,7 +1936,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
SArray
*
pDelData
=
taosArrayInit
(
4
,
sizeof
(
SDelData
));
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
pFS
->
cState
)
;
SDelFile
*
pDelFile
=
pReader
->
pReadSnap
->
fs
.
pDelFile
;
if
(
pDelFile
)
{
SDelFReader
*
pDelFReader
=
NULL
;
code
=
tsdbDelFReaderOpen
(
&
pDelFReader
,
pDelFile
,
pTsdb
,
NULL
);
...
...
@@ -2836,8 +2833,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
STsdbFSState
*
pFState
=
pReader
->
pTsdb
->
pFS
->
cState
;
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pFState
,
pReader
->
order
,
pReader
->
idStr
);
code
=
tsdbTakeReadSnap
(
pReader
->
pTsdb
,
&
pReader
->
pReadSnap
);
if
(
code
)
goto
_err
;
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
(
*
ppReader
)
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
->
order
,
pReader
->
idStr
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
// no data in files, let's try buffer in memory
...
...
@@ -2850,8 +2849,6 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
}
}
tsdbTakeMemSnapshot
(
pReader
->
pTsdb
,
&
pReader
->
pMem
,
&
pReader
->
pIMem
);
tsdbDebug
(
"%p total numOfTable:%d in this query %s"
,
pReader
,
numOfTables
,
pReader
->
idStr
);
return
code
;
...
...
@@ -2867,7 +2864,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
SBlockLoadSuppInfo
*
pSupInfo
=
&
pReader
->
suppInfo
;
tsdbUntake
MemSnapshot
(
pReader
->
pTsdb
,
pReader
->
pMem
,
pReader
->
pIMem
);
tsdbUntake
ReadSnap
(
pReader
->
pTsdb
,
pReader
->
pReadSnap
);
taosMemoryFreeClear
(
pSupInfo
->
plist
);
taosMemoryFree
(
pSupInfo
->
colIds
);
...
...
@@ -2880,7 +2877,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
}
taosMemoryFree
(
pSupInfo
->
buildBuf
);
cleanupFilesetIterator
(
&
pReader
->
status
.
fileIter
);
cleanupDataBlockIterator
(
&
pReader
->
status
.
blockIter
);
destroyBlockScanInfo
(
pReader
->
status
.
pTableMap
);
blockDataDestroy
(
pReader
->
pResBlock
);
...
...
@@ -3087,8 +3083,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
STsdbFSState
*
pFState
=
pReader
->
pTsdb
->
pFS
->
cState
;
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pFState
,
pReader
->
order
,
pReader
->
idStr
);
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pReader
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
->
order
,
pReader
->
idStr
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
resetDataBlockScanInfo
(
pReader
->
status
.
pTableMap
);
...
...
@@ -3250,3 +3245,68 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbTakeReadSnap
(
STsdb
*
pTsdb
,
STsdbReadSnap
**
ppSnap
)
{
int32_t
code
=
0
;
// alloc
*
ppSnap
=
(
STsdbReadSnap
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STsdbReadSnap
));
if
(
*
ppSnap
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
// lock
code
=
taosThreadRwlockRdlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_exit
;
}
// take snapshot
(
*
ppSnap
)
->
pMem
=
pTsdb
->
mem
;
(
*
ppSnap
)
->
pIMem
=
pTsdb
->
imem
;
if
((
*
ppSnap
)
->
pMem
)
{
tsdbRefMemTable
((
*
ppSnap
)
->
pMem
);
}
if
((
*
ppSnap
)
->
pIMem
)
{
tsdbRefMemTable
((
*
ppSnap
)
->
pIMem
);
}
// fs
code
=
tsdbFSRef
(
pTsdb
,
&
(
*
ppSnap
)
->
fs
);
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// unlock
code
=
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_exit
;
}
tsdbTrace
(
"vgId:%d take read snapshot"
,
TD_VID
(
pTsdb
->
pVnode
));
_exit:
return
code
;
}
void
tsdbUntakeReadSnap
(
STsdb
*
pTsdb
,
STsdbReadSnap
*
pSnap
)
{
if
(
pSnap
)
{
if
(
pSnap
->
pMem
)
{
tsdbUnrefMemTable
(
pSnap
->
pMem
);
}
if
(
pSnap
->
pIMem
)
{
tsdbUnrefMemTable
(
pSnap
->
pIMem
);
}
tsdbFSUnref
(
pTsdb
,
&
pSnap
->
fs
);
taosMemoryFree
(
pSnap
);
}
tsdbTrace
(
"vgId:%d untake read snapshot"
,
TD_VID
(
pTsdb
->
pVnode
));
}
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
605b0d5e
...
...
@@ -459,7 +459,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
// open impl
// head
tsdb
DataFileName
(
pTsdb
,
pSet
,
TSDB_HEAD_FILE
,
fname
);
tsdb
HeadFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pHeadF
,
fname
);
pReader
->
pHeadFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pHeadFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -467,7 +467,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
}
// data
tsdbDataFileName
(
pTsdb
,
pSet
,
TSDB_DATA_FILE
,
fname
);
tsdbDataFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pDataF
,
fname
);
pReader
->
pDataFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pDataFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -475,7 +475,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
}
// last
tsdb
DataFileName
(
pTsdb
,
pSet
,
TSDB_LAST_FILE
,
fname
);
tsdb
LastFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pLastF
,
fname
);
pReader
->
pLastFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pLastFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -483,7 +483,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
}
// sma
tsdb
DataFileName
(
pTsdb
,
pSet
,
TSDB_SMA_FILE
,
fname
);
tsdb
SmaFileName
(
pTsdb
,
pSet
->
diskId
,
pSet
->
fid
,
pSet
->
pSmaF
,
fname
);
pReader
->
pSmaFD
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pReader
->
pSmaFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -536,8 +536,8 @@ _err:
int32_t
tsdbReadBlockIdx
(
SDataFReader
*
pReader
,
SArray
*
aBlockIdx
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
int64_t
offset
=
pReader
->
pSet
->
fHead
.
offset
;
int64_t
size
=
pReader
->
pSet
->
fHead
.
size
-
offset
;
int64_t
offset
=
pReader
->
pSet
->
pHeadF
->
offset
;
int64_t
size
=
pReader
->
pSet
->
pHeadF
->
size
-
offset
;
uint8_t
*
pBuf
=
NULL
;
int64_t
n
;
uint32_t
delimiter
;
...
...
@@ -1211,17 +1211,6 @@ _err:
}
// SDataFWriter ====================================================
struct
SDataFWriter
{
STsdb
*
pTsdb
;
SDFileSet
wSet
;
TdFilePtr
pHeadFD
;
TdFilePtr
pDataFD
;
TdFilePtr
pLastFD
;
TdFilePtr
pSmaFD
;
};
SDFileSet
*
tsdbDataFWriterGetWSet
(
SDataFWriter
*
pWriter
)
{
return
&
pWriter
->
wSet
;
}
int32_t
tsdbDataFWriterOpen
(
SDataFWriter
**
ppWriter
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
)
{
int32_t
code
=
0
;
int32_t
flag
;
...
...
@@ -1237,12 +1226,20 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
goto
_err
;
}
pWriter
->
pTsdb
=
pTsdb
;
pWriter
->
wSet
=
*
pSet
;
pSet
=
&
pWriter
->
wSet
;
pWriter
->
wSet
=
(
SDFileSet
){.
diskId
=
pSet
->
diskId
,
.
fid
=
pSet
->
fid
,
.
pHeadF
=
&
pWriter
->
fHead
,
.
pDataF
=
&
pWriter
->
fData
,
.
pLastF
=
&
pWriter
->
fLast
,
.
pSmaF
=
&
pWriter
->
fSma
};
pWriter
->
fHead
=
*
pSet
->
pHeadF
;
pWriter
->
fData
=
*
pSet
->
pDataF
;
pWriter
->
fLast
=
*
pSet
->
pLastF
;
pWriter
->
fSma
=
*
pSet
->
pSmaF
;
// head
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
tsdb
DataFileName
(
pTsdb
,
pSet
,
TSDB_HEAD_FILE
,
fname
);
tsdb
HeadFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fHead
,
fname
);
pWriter
->
pHeadFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pHeadFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -1257,28 +1254,28 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
ASSERT
(
n
==
TSDB_FHDR_SIZE
);
p
Set
->
fHead
.
size
+=
TSDB_FHDR_SIZE
;
p
Writer
->
fHead
.
size
+=
TSDB_FHDR_SIZE
;
// data
if
(
p
Set
->
fData
.
size
==
0
)
{
if
(
p
Writer
->
fData
.
size
==
0
)
{
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
}
else
{
flag
=
TD_FILE_WRITE
;
}
tsdbDataFileName
(
pTsdb
,
p
Set
,
TSDB_DATA_FILE
,
fname
);
tsdbDataFileName
(
pTsdb
,
p
Writer
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fData
,
fname
);
pWriter
->
pDataFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pDataFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
p
Set
->
fData
.
size
==
0
)
{
if
(
p
Writer
->
fData
.
size
==
0
)
{
n
=
taosWriteFile
(
pWriter
->
pDataFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
p
Set
->
fData
.
size
+=
TSDB_FHDR_SIZE
;
p
Writer
->
fData
.
size
+=
TSDB_FHDR_SIZE
;
}
else
{
n
=
taosLSeekFile
(
pWriter
->
pDataFD
,
0
,
SEEK_END
);
if
(
n
<
0
)
{
...
...
@@ -1286,29 +1283,29 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
goto
_err
;
}
ASSERT
(
n
==
p
Set
->
fData
.
size
);
ASSERT
(
n
==
p
Writer
->
fData
.
size
);
}
// last
if
(
p
Set
->
fLast
.
size
==
0
)
{
if
(
p
Writer
->
fLast
.
size
==
0
)
{
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
}
else
{
flag
=
TD_FILE_WRITE
;
}
tsdb
DataFileName
(
pTsdb
,
pSet
,
TSDB_LAST_FILE
,
fname
);
tsdb
LastFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fLast
,
fname
);
pWriter
->
pLastFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pLastFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
p
Set
->
fLast
.
size
==
0
)
{
if
(
p
Writer
->
fLast
.
size
==
0
)
{
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
p
Set
->
fLast
.
size
+=
TSDB_FHDR_SIZE
;
p
Writer
->
fLast
.
size
+=
TSDB_FHDR_SIZE
;
}
else
{
n
=
taosLSeekFile
(
pWriter
->
pLastFD
,
0
,
SEEK_END
);
if
(
n
<
0
)
{
...
...
@@ -1316,29 +1313,29 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
goto
_err
;
}
ASSERT
(
n
==
p
Set
->
fLast
.
size
);
ASSERT
(
n
==
p
Writer
->
fLast
.
size
);
}
// sma
if
(
p
Set
->
fSma
.
size
==
0
)
{
if
(
p
Writer
->
fSma
.
size
==
0
)
{
flag
=
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
}
else
{
flag
=
TD_FILE_WRITE
;
}
tsdb
DataFileName
(
pTsdb
,
pSet
,
TSDB_SMA_FILE
,
fname
);
tsdb
SmaFileName
(
pTsdb
,
pWriter
->
wSet
.
diskId
,
pWriter
->
wSet
.
fid
,
&
pWriter
->
fSma
,
fname
);
pWriter
->
pSmaFD
=
taosOpenFile
(
fname
,
flag
);
if
(
pWriter
->
pSmaFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
p
Set
->
fSma
.
size
==
0
)
{
if
(
p
Writer
->
fSma
.
size
==
0
)
{
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
p
Set
->
fSma
.
size
+=
TSDB_FHDR_SIZE
;
p
Writer
->
fSma
.
size
+=
TSDB_FHDR_SIZE
;
}
else
{
n
=
taosLSeekFile
(
pWriter
->
pSmaFD
,
0
,
SEEK_END
);
if
(
n
<
0
)
{
...
...
@@ -1346,7 +1343,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
goto
_err
;
}
ASSERT
(
n
==
p
Set
->
fSma
.
size
);
ASSERT
(
n
==
p
Writer
->
fSma
.
size
);
}
*
ppWriter
=
pWriter
;
...
...
@@ -1418,22 +1415,76 @@ _err:
int32_t
tsdbUpdateDFileSetHeader
(
SDataFWriter
*
pWriter
)
{
int32_t
code
=
0
;
int64_t
n
;
char
hdr
[
TSDB_FHDR_SIZE
];
// head ==============
code
=
tsdbUpdateDFileHdr
(
pWriter
->
pHeadFD
,
&
pWriter
->
wSet
,
TSDB_HEAD_FILE
);
if
(
code
)
goto
_err
;
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutHeadFile
(
hdr
,
&
pWriter
->
fHead
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pWriter
->
pHeadFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// data ==============
code
=
tsdbUpdateDFileHdr
(
pWriter
->
pHeadFD
,
&
pWriter
->
wSet
,
TSDB_DATA_FILE
);
if
(
code
)
goto
_err
;
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutDataFile
(
hdr
,
&
pWriter
->
fData
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pWriter
->
pDataFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pDataFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// last ==============
code
=
tsdbUpdateDFileHdr
(
pWriter
->
pHeadFD
,
&
pWriter
->
wSet
,
TSDB_LAST_FILE
);
if
(
code
)
goto
_err
;
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutLastFile
(
hdr
,
&
pWriter
->
fLast
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pWriter
->
pLastFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// sma ==============
code
=
tsdbUpdateDFileHdr
(
pWriter
->
pHeadFD
,
&
pWriter
->
wSet
,
TSDB_SMA_FILE
);
if
(
code
)
goto
_err
;
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutSmaFile
(
hdr
,
&
pWriter
->
fSma
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pWriter
->
pSmaFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
return
code
;
...
...
@@ -1444,7 +1495,7 @@ _err:
int32_t
tsdbWriteBlockIdx
(
SDataFWriter
*
pWriter
,
SArray
*
aBlockIdx
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
wSet
.
fHead
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
fHead
;
uint8_t
*
pBuf
=
NULL
;
int64_t
size
;
int64_t
n
;
...
...
@@ -1494,7 +1545,7 @@ _err:
int32_t
tsdbWriteBlock
(
SDataFWriter
*
pWriter
,
SMapData
*
mBlock
,
uint8_t
**
ppBuf
,
SBlockIdx
*
pBlockIdx
)
{
int32_t
code
=
0
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
wSet
.
fHead
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
fHead
;
SBlockDataHdr
hdr
=
{.
delimiter
=
TSDB_FILE_DLMT
,
.
suid
=
pBlockIdx
->
suid
,
.
uid
=
pBlockIdx
->
uid
};
uint8_t
*
pBuf
=
NULL
;
int64_t
size
;
...
...
@@ -1831,9 +1882,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
pSubBlock
->
nRow
=
pBlockData
->
nRow
;
pSubBlock
->
cmprAlg
=
cmprAlg
;
if
(
pBlock
->
last
)
{
pSubBlock
->
offset
=
pWriter
->
wSet
.
fLast
.
size
;
pSubBlock
->
offset
=
pWriter
->
fLast
.
size
;
}
else
{
pSubBlock
->
offset
=
pWriter
->
wSet
.
fData
.
size
;
pSubBlock
->
offset
=
pWriter
->
fData
.
size
;
}
// ======================= BLOCK DATA =======================
...
...
@@ -1881,9 +1932,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
pSubBlock
->
szBlock
=
pSubBlock
->
szBlockCol
+
sizeof
(
TSCKSUM
)
+
nData
;
if
(
pBlock
->
last
)
{
pWriter
->
wSet
.
fLast
.
size
+=
pSubBlock
->
szBlock
;
pWriter
->
fLast
.
size
+=
pSubBlock
->
szBlock
;
}
else
{
pWriter
->
wSet
.
fData
.
size
+=
pSubBlock
->
szBlock
;
pWriter
->
fData
.
size
+=
pSubBlock
->
szBlock
;
}
// ======================= BLOCK SMA =======================
...
...
@@ -1896,8 +1947,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if
(
code
)
goto
_err
;
if
(
pSubBlock
->
nSma
>
0
)
{
pSubBlock
->
sOffset
=
pWriter
->
wSet
.
fSma
.
size
;
pWriter
->
wSet
.
fSma
.
size
+=
(
sizeof
(
SColumnDataAgg
)
*
pSubBlock
->
nSma
+
sizeof
(
TSCKSUM
));
pSubBlock
->
sOffset
=
pWriter
->
fSma
.
size
;
pWriter
->
fSma
.
size
+=
(
sizeof
(
SColumnDataAgg
)
*
pSubBlock
->
nSma
+
sizeof
(
TSCKSUM
));
}
_exit:
...
...
@@ -1924,8 +1975,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
char
fNameTo
[
TSDB_FILENAME_LEN
];
// head
tsdb
DataFileName
(
pTsdb
,
pSetFrom
,
TSDB_HEAD_FILE
,
fNameFrom
);
tsdb
DataFileName
(
pTsdb
,
pSetTo
,
TSDB_HEAD_FILE
,
fNameTo
);
tsdb
HeadFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pHeadF
,
fNameFrom
);
tsdb
HeadFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pHeadF
,
fNameTo
);
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
...
...
@@ -1939,7 +1990,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
goto
_err
;
}
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
fHead
.
size
);
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pHeadF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -1948,8 +1999,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
taosCloseFile
(
&
PInFD
);
// data
tsdbDataFileName
(
pTsdb
,
pSetFrom
,
TSDB_DATA_FILE
,
fNameFrom
);
tsdbDataFileName
(
pTsdb
,
pSetTo
,
TSDB_DATA_FILE
,
fNameTo
);
tsdbDataFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pDataF
,
fNameFrom
);
tsdbDataFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pDataF
,
fNameTo
);
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
...
...
@@ -1963,7 +2014,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
goto
_err
;
}
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
fData
.
size
);
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pDataF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -1972,8 +2023,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
taosCloseFile
(
&
PInFD
);
// last
tsdbDataFileName
(
pTsdb
,
pSetFrom
,
TSDB_LAST_FILE
,
fNameFrom
);
tsdbDataFileName
(
pTsdb
,
pSetTo
,
TSDB_LAST_FILE
,
fNameTo
);
tsdbLastFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pLastF
,
fNameFrom
);
tsdbLastFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pLastF
,
fNameTo
);
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -1986,7 +2038,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
goto
_err
;
}
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
fLast
.
size
);
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pLastF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -1995,8 +2047,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
taosCloseFile
(
&
PInFD
);
// sma
tsdb
DataFileName
(
pTsdb
,
pSetFrom
,
TSDB_SMA_FILE
,
fNameFrom
);
tsdb
DataFileName
(
pTsdb
,
pSetTo
,
TSDB_SMA_FILE
,
fNameTo
);
tsdb
SmaFileName
(
pTsdb
,
pSetFrom
->
diskId
,
pSetFrom
->
fid
,
pSetFrom
->
pSmaF
,
fNameFrom
);
tsdb
SmaFileName
(
pTsdb
,
pSetTo
->
diskId
,
pSetTo
->
fid
,
pSetTo
->
pSmaF
,
fNameTo
);
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
...
...
@@ -2010,7 +2062,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
goto
_err
;
}
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
fSma
.
size
);
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
pSmaF
->
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
source/dnode/vnode/src/tsdb/tsdbRetention.c
浏览文件 @
605b0d5e
...
...
@@ -15,90 +15,99 @@
#include "tsdb.h"
static
int32_t
tsdbDoRetentionImpl
(
STsdb
*
pTsdb
,
int64_t
now
,
int8_t
try
,
int8_t
*
canDo
)
{
int32_t
code
=
0
;
STsdbFSState
*
pState
;
if
(
try
)
{
pState
=
pTsdb
->
pFS
->
cState
;
*
canDo
=
0
;
}
else
{
pState
=
pTsdb
->
pFS
->
nState
;
}
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pState
->
aDFileSet
);
iSet
++
)
{
SDFileSet
*
pDFileSet
=
(
SDFileSet
*
)
taosArrayGet
(
pState
->
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pDFileSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
static
bool
tsdbShouldDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pTsdb
->
fs
.
aDFileSet
);
iSet
++
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
// check
if
(
expLevel
==
pDFileSet
->
diskId
.
id
)
continue
;
if
(
expLevel
==
pSet
->
diskId
.
level
)
continue
;
// delete or move
if
(
expLevel
<
0
)
{
if
(
try
)
{
*
canDo
=
1
;
}
else
{
tsdbFSStateDeleteDFileSet
(
pState
,
pDFileSet
->
fid
);
iSet
--
;
}
return
true
;
}
else
{
// alloc
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
expLevel
,
&
did
)
<
0
)
{
code
=
terrno
;
goto
_exit
;
return
false
;
}
if
(
did
.
level
==
p
DFile
Set
->
diskId
.
level
)
continue
;
if
(
did
.
level
==
pSet
->
diskId
.
level
)
continue
;
if
(
try
)
{
*
canDo
=
1
;
}
else
{
// copy the file to new disk
SDFileSet
nDFileSet
=
*
pDFileSet
;
nDFileSet
.
diskId
=
did
;
tfsMkdirRecurAt
(
pTsdb
->
pVnode
->
pTfs
,
pTsdb
->
path
,
did
);
code
=
tsdbDFileSetCopy
(
pTsdb
,
pDFileSet
,
&
nDFileSet
);
if
(
code
)
goto
_exit
;
code
=
tsdbFSStateUpsertDFileSet
(
pState
,
&
nDFileSet
);
if
(
code
)
goto
_exit
;
}
return
true
;
}
}
_exit:
return
code
;
return
false
;
}
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
int32_t
code
=
0
;
int8_t
canDo
;
// try
tsdbDoRetentionImpl
(
pTsdb
,
now
,
1
,
&
canDo
);
if
(
!
canDo
)
goto
_exit
;
// begin
code
=
tsdbFSBegin
(
pTsdb
->
pFS
)
;
if
(
code
)
goto
_err
;
if
(
!
tsdbShouldDoRetention
(
pTsdb
,
now
))
{
return
code
;
}
// do retention
code
=
tsdbDoRetentionImpl
(
pTsdb
,
now
,
0
,
NULL
);
STsdbFS
fs
;
code
=
tsdbFSCopy
(
pTsdb
,
&
fs
);
if
(
code
)
goto
_err
;
// commit
code
=
tsdbFSCommit
(
pTsdb
->
pFS
);
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
fs
.
aDFileSet
);
iSet
++
)
{
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
.
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
if
(
expLevel
<
0
)
{
taosMemoryFree
(
pSet
->
pHeadF
);
taosMemoryFree
(
pSet
->
pDataF
);
taosMemoryFree
(
pSet
->
pLastF
);
taosMemoryFree
(
pSet
->
pSmaF
);
taosArrayRemove
(
fs
.
aDFileSet
,
iSet
);
iSet
--
;
}
else
{
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
expLevel
,
&
did
)
<
0
)
{
code
=
terrno
;
goto
_exit
;
}
if
(
did
.
level
==
pSet
->
diskId
.
level
)
continue
;
// copy file to new disk (todo)
SDFileSet
fSet
=
*
pSet
;
fSet
.
diskId
=
did
;
code
=
tsdbDFileSetCopy
(
pTsdb
,
pSet
,
&
fSet
);
if
(
code
)
goto
_err
;
code
=
tsdbFSUpsertFSet
(
&
fs
,
&
fSet
);
if
(
code
)
goto
_err
;
}
/* code */
}
// do change fs
code
=
tsdbFSCommit1
(
pTsdb
,
&
fs
);
if
(
code
)
goto
_err
;
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
code
=
tsdbFSCommit2
(
pTsdb
,
&
fs
);
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_err
;
}
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbFSDestroy
(
&
fs
);
_exit:
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb do retention failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbFSRollback
(
pTsdb
->
pFS
);
ASSERT
(
0
);
// tsdbFSRollback(pTsdb->pFS);
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
浏览文件 @
605b0d5e
...
...
@@ -20,6 +20,7 @@ struct STsdbSnapReader {
STsdb
*
pTsdb
;
int64_t
sver
;
int64_t
ever
;
STsdbFS
fs
;
// for data file
int8_t
dataDone
;
int32_t
fid
;
...
...
@@ -45,7 +46,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
while
(
true
)
{
if
(
pReader
->
pDataFReader
==
NULL
)
{
SDFileSet
*
pSet
=
tsdbFSStateGetDFileSet
(
pTsdb
->
pFS
->
cState
,
pReader
->
fid
,
TD_GT
);
SDFileSet
*
pSet
=
taosArraySearch
(
pReader
->
fs
.
aDFileSet
,
&
(
SDFileSet
){.
fid
=
pReader
->
fid
},
tDFileSetCmprFn
,
TD_GT
);
if
(
pSet
==
NULL
)
goto
_exit
;
...
...
@@ -159,7 +161,7 @@ _err:
static
int32_t
tsdbSnapReadDel
(
STsdbSnapReader
*
pReader
,
uint8_t
**
ppData
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pReader
->
pTsdb
;
SDelFile
*
pDelFile
=
p
Tsdb
->
pFS
->
cState
->
pDelFile
;
SDelFile
*
pDelFile
=
p
Reader
->
fs
.
pDelFile
;
if
(
pReader
->
pDelFReader
==
NULL
)
{
if
(
pDelFile
==
NULL
)
{
...
...
@@ -254,6 +256,24 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe
pReader
->
sver
=
sver
;
pReader
->
ever
=
ever
;
code
=
taosThreadRwlockRdlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_err
;
}
code
=
tsdbFSRef
(
pTsdb
,
&
pReader
->
fs
);
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_err
;
}
code
=
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_err
;
}
pReader
->
fid
=
INT32_MIN
;
pReader
->
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pReader
->
aBlockIdx
==
NULL
)
{
...
...
@@ -305,6 +325,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
taosArrayDestroy
(
pReader
->
aDelIdx
);
taosArrayDestroy
(
pReader
->
aDelData
);
tsdbFSUnref
(
pReader
->
pTsdb
,
&
pReader
->
fs
);
tsdbInfo
(
"vgId:%d vnode snapshot tsdb reader closed"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
));
taosMemoryFree
(
pReader
);
...
...
@@ -358,6 +380,7 @@ struct STsdbSnapWriter {
STsdb
*
pTsdb
;
int64_t
sver
;
int64_t
ever
;
STsdbFS
fs
;
// config
int32_t
minutes
;
...
...
@@ -798,7 +821,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
code
=
tsdbWriteBlockIdx
(
pWriter
->
pDataFWriter
,
pWriter
->
aBlockIdxW
,
NULL
);
if
(
code
)
goto
_err
;
code
=
tsdbFS
StateUpsertDFileSet
(
pTsdb
->
pFS
->
nState
,
tsdbDataFWriterGetWSet
(
pWriter
->
pDataFWriter
)
);
code
=
tsdbFS
UpsertFSet
(
&
pWriter
->
fs
,
&
pWriter
->
pDataFWriter
->
wSet
);
if
(
code
)
goto
_err
;
code
=
tsdbDataFWriterClose
(
&
pWriter
->
pDataFWriter
,
1
);
...
...
@@ -843,7 +866,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
pWriter
->
fid
=
fid
;
// read
SDFileSet
*
pSet
=
t
sdbFSStateGetDFileSet
(
pTsdb
->
pFS
->
nState
,
fid
,
TD_EQ
);
SDFileSet
*
pSet
=
t
aosArraySearch
(
pWriter
->
fs
.
aDFileSet
,
&
(
SDFileSet
){.
fid
=
fid
},
tDFileSetCmprFn
,
TD_EQ
);
if
(
pSet
)
{
code
=
tsdbDataFReaderOpen
(
&
pWriter
->
pDataFReader
,
pTsdb
,
pSet
);
if
(
code
)
goto
_err
;
...
...
@@ -863,22 +886,26 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
tBlockDataReset
(
&
pWriter
->
bDataR
);
// write
SDFileSet
wSet
;
SHeadFile
fHead
;
SDataFile
fData
;
SLastFile
fLast
;
SSmaFile
fSma
;
SDFileSet
wSet
=
{.
pHeadF
=
&
fHead
,
.
pDataF
=
&
fData
,
.
pLastF
=
&
fLast
,
.
pSmaF
=
&
fSma
};
if
(
pSet
)
{
wSet
=
(
SDFileSet
){.
diskId
=
pSet
->
diskId
,
.
fid
=
fid
,
.
fHead
=
{.
commitID
=
pWriter
->
commitID
,
.
offset
=
0
,
.
size
=
0
},
.
fData
=
pSet
->
fData
,
.
fLast
=
{.
commitID
=
pWriter
->
commitID
,
.
size
=
0
},
.
fSma
=
pSet
->
fSma
}
;
wSet
.
diskId
=
pSet
->
diskId
;
wSet
.
fid
=
fid
;
fHead
=
(
SHeadFile
){.
commitID
=
pWriter
->
commitID
,
.
offset
=
0
,
.
size
=
0
};
fData
=
*
pSet
->
pDataF
;
fLast
=
(
SLastFile
){.
commitID
=
pWriter
->
commitID
,
.
size
=
0
};
fSma
=
*
pSet
->
pSmaF
;
}
else
{
wSet
=
(
SDFileSet
){.
diskId
=
(
SDiskID
){.
level
=
0
,
.
id
=
0
},
.
fid
=
fid
,
.
fHead
=
{.
commitID
=
pWriter
->
commitID
,
.
offset
=
0
,
.
size
=
0
},
.
fData
=
{.
commitID
=
pWriter
->
commitID
,
.
size
=
0
},
.
fLast
=
{.
commitID
=
pWriter
->
commitID
,
.
size
=
0
},
.
fSma
=
{.
commitID
=
pWriter
->
commitID
,
.
size
=
0
}
};
wSet
.
diskId
=
(
SDiskID
){.
level
=
0
,
.
id
=
0
};
wSet
.
fid
=
fid
;
fHead
=
(
SHeadFile
){.
commitID
=
pWriter
->
commitID
,
.
offset
=
0
,
.
size
=
0
};
fData
=
(
SDataFile
){.
commitID
=
pWriter
->
commitID
,
.
size
=
0
};
fLast
=
(
SLastFile
){.
commitID
=
pWriter
->
commitID
,
.
size
=
0
};
fSma
=
(
SSmaFile
){.
commitID
=
pWriter
->
commitID
,
.
size
=
0
};
}
code
=
tsdbDataFWriterOpen
(
&
pWriter
->
pDataFWriter
,
pTsdb
,
&
wSet
);
...
...
@@ -907,7 +934,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
STsdb
*
pTsdb
=
pWriter
->
pTsdb
;
if
(
pWriter
->
pDelFWriter
==
NULL
)
{
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
pFS
->
nState
)
;
SDelFile
*
pDelFile
=
pWriter
->
fs
.
pDelFile
;
// reader
if
(
pDelFile
)
{
...
...
@@ -1017,7 +1044,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
code
=
tsdbUpdateDelFileHdr
(
pWriter
->
pDelFWriter
);
if
(
code
)
goto
_err
;
code
=
tsdbFS
StateUpsertDelFile
(
pTsdb
->
pFS
->
nState
,
&
pWriter
->
pDelFWriter
->
fDel
);
code
=
tsdbFS
UpsertDelFile
(
&
pWriter
->
fs
,
&
pWriter
->
pDelFWriter
->
fDel
);
if
(
code
)
goto
_err
;
code
=
tsdbDelFWriterClose
(
&
pWriter
->
pDelFWriter
,
1
);
...
...
@@ -1051,6 +1078,9 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter
->
sver
=
sver
;
pWriter
->
ever
=
ever
;
code
=
tsdbFSCopy
(
pTsdb
,
&
pWriter
->
fs
);
if
(
code
)
goto
_err
;
// config
pWriter
->
minutes
=
pTsdb
->
keepCfg
.
days
;
pWriter
->
precision
=
pTsdb
->
keepCfg
.
precision
;
...
...
@@ -1096,9 +1126,6 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
goto
_err
;
}
code
=
tsdbFSBegin
(
pTsdb
->
pFS
);
if
(
code
)
goto
_err
;
*
ppWriter
=
pWriter
;
return
code
;
...
...
@@ -1113,8 +1140,9 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
STsdbSnapWriter
*
pWriter
=
*
ppWriter
;
if
(
rollback
)
{
code
=
tsdbFSRollback
(
pWriter
->
pTsdb
->
pFS
);
if
(
code
)
goto
_err
;
ASSERT
(
0
);
// code = tsdbFSRollback(pWriter->pTsdb->pFS);
// if (code) goto _err;
}
else
{
code
=
tsdbSnapWriteDataEnd
(
pWriter
);
if
(
code
)
goto
_err
;
...
...
@@ -1122,7 +1150,10 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
code
=
tsdbSnapWriteDelEnd
(
pWriter
);
if
(
code
)
goto
_err
;
code
=
tsdbFSCommit
(
pWriter
->
pTsdb
->
pFS
);
code
=
tsdbFSCommit1
(
pWriter
->
pTsdb
,
&
pWriter
->
fs
);
if
(
code
)
goto
_err
;
code
=
tsdbFSCommit2
(
pWriter
->
pTsdb
,
&
pWriter
->
fs
);
if
(
code
)
goto
_err
;
}
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
605b0d5e
...
...
@@ -315,6 +315,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTSInfo
->
dataReader
);
pTSInfo
->
dataReader
=
NULL
;
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
...
...
@@ -349,8 +352,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
#ifndef NDEBUG
qDebug
(
"switch to next table %ld (cursor %d), %ld rows returned"
,
uid
,
p
TableScanInfo
->
currentTable
,
p
Info
->
pTableScanOp
->
resultInfo
.
totalRows
);
qDebug
(
"switch to next table %ld (cursor %d), %ld rows returned"
,
uid
,
pTableScanInfo
->
currentTable
,
pInfo
->
pTableScanOp
->
resultInfo
.
totalRows
);
pInfo
->
pTableScanOp
->
resultInfo
.
totalRows
=
0
;
#endif
...
...
@@ -367,6 +370,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
// TODO after dropping table, table may be not found
ASSERT
(
found
);
if
(
pTableScanInfo
->
dataReader
==
NULL
)
{
if
(
tsdbReaderOpen
(
pTableScanInfo
->
readHandle
.
vnode
,
&
pTableScanInfo
->
cond
,
pTaskInfo
->
tableqinfoList
.
pTableList
,
&
pTableScanInfo
->
dataReader
,
NULL
)
<
0
||
pTableScanInfo
->
dataReader
==
NULL
)
{
ASSERT
(
0
);
}
}
tsdbSetTableId
(
pTableScanInfo
->
dataReader
,
uid
);
int64_t
oldSkey
=
pTableScanInfo
->
cond
.
twindows
.
skey
;
pTableScanInfo
->
cond
.
twindows
.
skey
=
ts
+
1
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
605b0d5e
...
...
@@ -740,7 +740,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
static
void
destroyBlockDistScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
blockDataDestroy
(
pDistInfo
->
pResBlock
);
tsdbReaderClose
(
pDistInfo
->
pHandle
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -982,6 +982,9 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
if
(
!
pResult
)
{
blockDataCleanup
(
pSDB
);
*
pRowIndex
=
0
;
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTableScanInfo
->
dataReader
);
pTableScanInfo
->
dataReader
=
NULL
;
return
NULL
;
}
...
...
@@ -1003,6 +1006,9 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_
}
if
(
!
pResult
)
{
pInfo
->
updateWin
=
(
STimeWindow
){.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTableScanInfo
->
dataReader
);
pTableScanInfo
->
dataReader
=
NULL
;
return
NULL
;
}
...
...
@@ -2047,8 +2053,8 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
uint64_t
suid
=
pInfo
->
pCur
->
mr
.
me
.
ctbEntry
.
suid
;
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
suid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get super table meta, cname:%s, suid:0x%"
PRIx64
", code:%s, %s"
,
pInfo
->
pCur
->
mr
.
me
.
name
,
suid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
qError
(
"failed to get super table meta, cname:%s, suid:0x%"
PRIx64
", code:%s, %s"
,
pInfo
->
pCur
->
mr
.
me
.
name
,
suid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
metaReaderClear
(
&
mr
);
metaCloseTbCursor
(
pInfo
->
pCur
);
pInfo
->
pCur
=
NULL
;
...
...
@@ -2154,7 +2160,6 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
}
}
static
SSDataBlock
*
sysTableScanUserSTables
(
SOperatorInfo
*
pOperator
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSysTableScanInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -2180,12 +2185,13 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
getDBNameFromCondition
(
pInfo
->
pCondition
,
dbName
);
sprintf
(
pInfo
->
req
.
db
,
"%d.%s"
,
pInfo
->
accountId
,
dbName
);
}
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_USER_TABLES
,
TSDB_TABLE_FNAME_LEN
)
==
0
)
{
return
sysTableScanUserTables
(
pOperator
);
}
else
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_USER_TAGS
,
TSDB_TABLE_FNAME_LEN
)
==
0
)
{
return
sysTableScanUserTags
(
pOperator
);
}
else
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_USER_STABLES
,
TSDB_TABLE_FNAME_LEN
)
==
0
&&
IS_SYS_DBNAME
(
pInfo
->
req
.
db
))
{
}
else
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_USER_STABLES
,
TSDB_TABLE_FNAME_LEN
)
==
0
&&
IS_SYS_DBNAME
(
pInfo
->
req
.
db
))
{
return
sysTableScanUserSTables
(
pOperator
);
}
else
{
// load the meta from mnode of the given epset
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录