Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4f0833e1
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4f0833e1
编写于
3月 22, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code
上级
275f195c
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
72 addition
and
1320 deletion
+72
-1320
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+3
-5
source/dnode/vnode/src/tsdb/tsdbCommit2.c
source/dnode/vnode/src/tsdb/tsdbCommit2.c
+69
-1315
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
4f0833e1
...
...
@@ -406,11 +406,9 @@ struct SMemTable {
TSKEY
maxKey
;
int64_t
nRow
;
int64_t
nDel
;
struct
{
int32_t
nTbData
;
int32_t
nBucket
;
STbData
**
aBucket
;
};
};
struct
TSDBROW
{
...
...
source/dnode/vnode/src/tsdb/tsdbCommit2.c
浏览文件 @
4f0833e1
...
...
@@ -15,32 +15,9 @@
#include "tsdb.h"
typedef
enum
{
MEMORY_DATA_ITER
=
0
,
STT_DATA_ITER
}
EDataIterT
;
#define USE_STREAM_COMPRESSION 0
typedef
struct
{
SRBTreeNode
n
;
SRowInfo
r
;
EDataIterT
type
;
union
{
struct
{
int32_t
iTbDataP
;
STbDataIter
iter
;
};
// memory data iter
struct
{
int32_t
iStt
;
SArray
*
aSttBlk
;
int32_t
iSttBlk
;
SBlockData
bData
;
int32_t
iRow
;
};
// stt file data iter
};
}
SDataIter
;
typedef
struct
{
STsdb
*
pTsdb
;
/
* commit data */
/
/ config
int64_t
commitID
;
int32_t
minutes
;
int8_t
precision
;
...
...
@@ -48,1368 +25,144 @@ typedef struct {
int32_t
maxRow
;
int8_t
cmprAlg
;
int8_t
sttTrigger
;
SArray
*
aTbDataP
;
// memory
STsdbFS
fs
;
// disk
// --------------
SArray
*
aTbDataP
;
// context
TSKEY
nextKey
;
// reset by each table commit
int32_t
commitF
id
;
int32_t
f
id
;
int32_t
expLevel
;
TSKEY
minKey
;
TSKEY
maxKey
;
// commit file data
struct
{
SDataFReader
*
pReader
;
SArray
*
aBlockIdx
;
// SArray<SBlockIdx>
int32_t
iBlockIdx
;
SBlockIdx
*
pBlockIdx
;
SMapData
mBlock
;
// SMapData<SDataBlk>
SBlockData
bData
;
}
dReader
;
struct
{
SDataIter
*
pIter
;
SRBTree
rbt
;
SDataIter
dataIter
;
SDataIter
aDataIter
[
TSDB_MAX_STT_TRIGGER
];
int8_t
toLastOnly
;
};
struct
{
SDataFWriter
*
pWriter
;
SArray
*
aBlockIdx
;
// SArray<SBlockIdx>
SArray
*
aSttBlk
;
// SArray<SSttBlk>
SMapData
mBlock
;
// SMapData<SDataBlk>
SBlockData
bData
;
SBlockData
bDatal
;
}
dWriter
;
SSkmInfo
skmTable
;
SSkmInfo
skmRow
;
/* commit del */
SDelFReader
*
pDelFReader
;
SDelFWriter
*
pDelFWriter
;
SArray
*
aDelIdx
;
// SArray<SDelIdx>
SArray
*
aDelIdxN
;
// SArray<SDelIdx>
SArray
*
aDelData
;
// SArray<SDelData>
SBlockData
bData
;
SColData
aColData
[
4
];
// <suid, uid, ts, version>
SArray
*
aSttBlk
;
// SArray<SSttBlk>
SArray
*
aDelBlk
;
// SArray<SDelBlk>
}
SCommitter
;
extern
int32_t
tsdbWriteDataBlock
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
SMapData
*
mDataBlk
,
int8_t
cmprAlg
);
extern
int32_t
tsdbWriteSttBlock
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
SArray
*
aSttBlk
,
int8_t
cmprAlg
);
extern
int32_t
tsdbUpdateTableSchema
(
SMeta
*
pMeta
,
int64_t
suid
,
int64_t
uid
,
SSkmInfo
*
pSkmInfo
);
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitter
*
pCommitter
,
SCommitInfo
*
pInfo
);
static
int32_t
tsdbCommitData
(
SCommitter
*
pCommitter
);
static
int32_t
tsdbCommitDel
(
SCommitter
*
pCommitter
);
static
int32_t
tsdbCommitCache
(
SCommitter
*
pCommitter
);
static
int32_t
tsdbEndCommit
(
SCommitter
*
pCommitter
,
int32_t
eno
);
static
int32_t
tsdbNextCommitRow
(
SCommitter
*
pCommitter
);
static
int32_t
tsdbCommitDelStart
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
if
((
pCommitter
->
aDelIdx
=
taosArrayInit
(
0
,
sizeof
(
SDelIdx
)))
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
((
pCommitter
->
aDelData
=
taosArrayInit
(
0
,
sizeof
(
SDelData
)))
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
((
pCommitter
->
aDelIdxN
=
taosArrayInit
(
0
,
sizeof
(
SDelIdx
)))
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
SDelFile
*
pDelFileR
=
pCommitter
->
fs
.
pDelFile
;
if
(
pDelFileR
)
{
code
=
tsdbDelFReaderOpen
(
&
pCommitter
->
pDelFReader
,
pDelFileR
,
pTsdb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbReadDelIdx
(
pCommitter
->
pDelFReader
,
pCommitter
->
aDelIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// prepare new
SDelFile
wDelFile
=
{.
commitID
=
pCommitter
->
commitID
,
.
size
=
0
,
.
offset
=
0
};
code
=
tsdbDelFWriterOpen
(
&
pCommitter
->
pDelFWriter
,
&
wDelFile
,
pTsdb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbDebug
(
"vgId:%d, commit del start"
,
TD_VID
(
pTsdb
->
pVnode
));
}
return
code
;
static
int32_t
tsdbRowIsDeleted
(
SCommitter
*
pCommitter
,
TSDBROW
*
pRow
)
{
// TODO
ASSERT
(
0
);
return
0
;
}
static
int32_t
tsdbCommitT
ableDel
(
SCommitter
*
pCommitter
,
STbData
*
pTbData
,
SDelIdx
*
pDelIdx
)
{
static
int32_t
tsdbCommitT
imeSeriesData
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SDelData
*
pDelData
;
tb_uid_t
suid
;
tb_uid_t
uid
;
if
(
pTbData
)
{
suid
=
pTbData
->
suid
;
uid
=
pTbData
->
uid
;
if
(
pTbData
->
pHead
==
NULL
)
{
pTbData
=
NULL
;
}
}
if
(
pDelIdx
)
{
suid
=
pDelIdx
->
suid
;
uid
=
pDelIdx
->
uid
;
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pDelIdx
,
pCommitter
->
aDelData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
taosArrayClear
(
pCommitter
->
aDelData
);
}
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
goto
_exit
;
SDelIdx
delIdx
=
{.
suid
=
suid
,
.
uid
=
uid
};
// memory
pDelData
=
pTbData
?
pTbData
->
pHead
:
NULL
;
for
(;
pDelData
;
pDelData
=
pDelData
->
pNext
)
{
if
(
taosArrayPush
(
pCommitter
->
aDelData
,
pDelData
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
// write
code
=
tsdbWriteDelData
(
pCommitter
->
pDelFWriter
,
pCommitter
->
aDelData
,
&
delIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// put delIdx
if
(
taosArrayPush
(
pCommitter
->
aDelIdxN
,
&
delIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// TODO
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
tsdbError
(
"vgId:%d failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommitDel
End
(
SCommitter
*
pCommitter
)
{
static
int32_t
tsdbCommitDel
Data
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
code
=
tsdbWriteDelIdx
(
pCommitter
->
pDelFWriter
,
pCommitter
->
aDelIdxN
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbUpdateDelFileHdr
(
pCommitter
->
pDelFWriter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbFSUpsertDelFile
(
&
pCommitter
->
fs
,
&
pCommitter
->
pDelFWriter
->
fDel
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbDelFWriterClose
(
&
pCommitter
->
pDelFWriter
,
1
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
pCommitter
->
pDelFReader
)
{
code
=
tsdbDelFReaderClose
(
&
pCommitter
->
pDelFReader
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
taosArrayDestroy
(
pCommitter
->
aDelIdx
);
taosArrayDestroy
(
pCommitter
->
aDelData
);
taosArrayDestroy
(
pCommitter
->
aDelIdxN
);
// TODO
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommitterUpdateRowSchema
(
SCommitter
*
pCommitter
,
int64_t
suid
,
int64_t
uid
,
int32_t
sver
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
pCommitter
->
skmRow
.
pTSchema
)
{
if
(
pCommitter
->
skmRow
.
suid
==
suid
)
{
if
(
suid
==
0
)
{
if
(
pCommitter
->
skmRow
.
uid
==
uid
&&
sver
==
pCommitter
->
skmRow
.
pTSchema
->
version
)
goto
_exit
;
}
else
{
if
(
sver
==
pCommitter
->
skmRow
.
pTSchema
->
version
)
goto
_exit
;
}
}
}
pCommitter
->
skmRow
.
suid
=
suid
;
pCommitter
->
skmRow
.
uid
=
uid
;
tDestroyTSchema
(
pCommitter
->
skmRow
.
pTSchema
);
code
=
metaGetTbTSchemaEx
(
pCommitter
->
pTsdb
->
pVnode
->
pMeta
,
suid
,
uid
,
sver
,
&
pCommitter
->
skmRow
.
pTSchema
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
return
code
;
}
static
int32_t
tsdbCommitterNextTableData
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
ASSERT
(
pCommitter
->
dReader
.
pBlockIdx
);
pCommitter
->
dReader
.
iBlockIdx
++
;
if
(
pCommitter
->
dReader
.
iBlockIdx
<
taosArrayGetSize
(
pCommitter
->
dReader
.
aBlockIdx
))
{
pCommitter
->
dReader
.
pBlockIdx
=
(
SBlockIdx
*
)
taosArrayGet
(
pCommitter
->
dReader
.
aBlockIdx
,
pCommitter
->
dReader
.
iBlockIdx
);
code
=
tsdbReadDataBlk
(
pCommitter
->
dReader
.
pReader
,
pCommitter
->
dReader
.
pBlockIdx
,
&
pCommitter
->
dReader
.
mBlock
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
ASSERT
(
pCommitter
->
dReader
.
mBlock
.
nItem
>
0
);
}
else
{
pCommitter
->
dReader
.
pBlockIdx
=
NULL
;
tsdbError
(
"vgId:%d failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
lino
,
tstrerror
(
code
));
}
_exit:
return
code
;
}
static
int32_t
tDataIterCmprFn
(
const
SRBTreeNode
*
n1
,
const
SRBTreeNode
*
n2
)
{
SDataIter
*
pIter1
=
(
SDataIter
*
)((
uint8_t
*
)
n1
-
offsetof
(
SDataIter
,
n
));
SDataIter
*
pIter2
=
(
SDataIter
*
)((
uint8_t
*
)
n2
-
offsetof
(
SDataIter
,
n
));
return
tRowInfoCmprFn
(
&
pIter1
->
r
,
&
pIter2
->
r
);
}
static
int32_t
tsdbOpenCommitIter
(
SCommitter
*
pCommitter
)
{
static
int32_t
tsdbCommitNextFSet
(
SCommitter
*
pCommitter
,
int8_t
*
done
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
pCommitter
->
pIter
=
NULL
;
tRBTreeCreate
(
&
pCommitter
->
rbt
,
tDataIterCmprFn
);
// memory
TSDBKEY
tKey
=
{.
ts
=
pCommitter
->
minKey
,
.
version
=
VERSION_MIN
};
SDataIter
*
pIter
=
&
pCommitter
->
dataIter
;
pIter
->
type
=
MEMORY_DATA_ITER
;
pIter
->
iTbDataP
=
0
;
for
(;
pIter
->
iTbDataP
<
taosArrayGetSize
(
pCommitter
->
aTbDataP
);
pIter
->
iTbDataP
++
)
{
STbData
*
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pCommitter
->
aTbDataP
,
pIter
->
iTbDataP
);
tsdbTbDataIterOpen
(
pTbData
,
&
tKey
,
0
,
&
pIter
->
iter
);
TSDBROW
*
pRow
=
tsdbTbDataIterGet
(
&
pIter
->
iter
);
if
(
pRow
&&
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
{
pCommitter
->
nextKey
=
TMIN
(
pCommitter
->
nextKey
,
TSDBROW_TS
(
pRow
));
pRow
=
NULL
;
}
if
(
pRow
==
NULL
)
continue
;
pIter
->
r
.
suid
=
pTbData
->
suid
;
pIter
->
r
.
uid
=
pTbData
->
uid
;
pIter
->
r
.
row
=
*
pRow
;
break
;
}
ASSERT
(
pIter
->
iTbDataP
<
taosArrayGetSize
(
pCommitter
->
aTbDataP
));
tRBTreePut
(
&
pCommitter
->
rbt
,
(
SRBTreeNode
*
)
pIter
);
// disk
pCommitter
->
toLastOnly
=
0
;
SDataFReader
*
pReader
=
pCommitter
->
dReader
.
pReader
;
if
(
pReader
)
{
if
(
pReader
->
pSet
->
nSttF
>=
pCommitter
->
sttTrigger
)
{
int8_t
iIter
=
0
;
for
(
int32_t
iStt
=
0
;
iStt
<
pReader
->
pSet
->
nSttF
;
iStt
++
)
{
pIter
=
&
pCommitter
->
aDataIter
[
iIter
];
pIter
->
type
=
STT_DATA_ITER
;
pIter
->
iStt
=
iStt
;
code
=
tsdbReadSttBlk
(
pCommitter
->
dReader
.
pReader
,
iStt
,
pIter
->
aSttBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
taosArrayGetSize
(
pIter
->
aSttBlk
)
==
0
)
continue
;
pIter
->
iSttBlk
=
0
;
SSttBlk
*
pSttBlk
=
(
SSttBlk
*
)
taosArrayGet
(
pIter
->
aSttBlk
,
0
);
code
=
tsdbReadSttBlockEx
(
pCommitter
->
dReader
.
pReader
,
iStt
,
pSttBlk
,
&
pIter
->
bData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pIter
->
iRow
=
0
;
pIter
->
r
.
suid
=
pIter
->
bData
.
suid
;
pIter
->
r
.
uid
=
pIter
->
bData
.
uid
?
pIter
->
bData
.
uid
:
pIter
->
bData
.
aUid
[
0
];
pIter
->
r
.
row
=
tsdbRowFromBlockData
(
&
pIter
->
bData
,
0
);
tRBTreePut
(
&
pCommitter
->
rbt
,
(
SRBTreeNode
*
)
pIter
);
iIter
++
;
}
}
else
{
for
(
int32_t
iStt
=
0
;
iStt
<
pReader
->
pSet
->
nSttF
;
iStt
++
)
{
SSttFile
*
pSttFile
=
pReader
->
pSet
->
aSttF
[
iStt
];
if
(
pSttFile
->
size
>
pSttFile
->
offset
)
{
pCommitter
->
toLastOnly
=
1
;
break
;
}
}
}
}
code
=
tsdbNextCommitRow
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommitFileDataStart
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SDFileSet
*
pRSet
=
NULL
;
// memory
pCommitter
->
commitFid
=
tsdbKeyFid
(
pCommitter
->
nextKey
,
pCommitter
->
minutes
,
pCommitter
->
precision
);
pCommitter
->
expLevel
=
tsdbFidLevel
(
pCommitter
->
commitFid
,
&
pCommitter
->
pTsdb
->
keepCfg
,
taosGetTimestampSec
());
tsdbFidKeyRange
(
pCommitter
->
commitFid
,
pCommitter
->
minutes
,
pCommitter
->
precision
,
&
pCommitter
->
minKey
,
&
pCommitter
->
maxKey
);
pCommitter
->
nextKey
=
TSKEY_MAX
;
// Reader
SDFileSet
tDFileSet
=
{.
fid
=
pCommitter
->
commitFid
};
pRSet
=
(
SDFileSet
*
)
taosArraySearch
(
pCommitter
->
fs
.
aDFileSet
,
&
tDFileSet
,
tDFileSetCmprFn
,
TD_EQ
);
if
(
pRSet
)
{
code
=
tsdbDataFReaderOpen
(
&
pCommitter
->
dReader
.
pReader
,
pTsdb
,
pRSet
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// data
code
=
tsdbReadBlockIdx
(
pCommitter
->
dReader
.
pReader
,
pCommitter
->
dReader
.
aBlockIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pCommitter
->
dReader
.
iBlockIdx
=
0
;
if
(
taosArrayGetSize
(
pCommitter
->
dReader
.
aBlockIdx
)
>
0
)
{
pCommitter
->
dReader
.
pBlockIdx
=
(
SBlockIdx
*
)
taosArrayGet
(
pCommitter
->
dReader
.
aBlockIdx
,
0
);
code
=
tsdbReadDataBlk
(
pCommitter
->
dReader
.
pReader
,
pCommitter
->
dReader
.
pBlockIdx
,
&
pCommitter
->
dReader
.
mBlock
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
pCommitter
->
dReader
.
pBlockIdx
=
NULL
;
}
tBlockDataReset
(
&
pCommitter
->
dReader
.
bData
);
}
else
{
pCommitter
->
dReader
.
pBlockIdx
=
NULL
;
}
// Writer
SHeadFile
fHead
=
{.
commitID
=
pCommitter
->
commitID
};
SDataFile
fData
=
{.
commitID
=
pCommitter
->
commitID
};
SSmaFile
fSma
=
{.
commitID
=
pCommitter
->
commitID
};
SSttFile
fStt
=
{.
commitID
=
pCommitter
->
commitID
};
SDFileSet
wSet
=
{.
fid
=
pCommitter
->
commitFid
,
.
pHeadF
=
&
fHead
,
.
pDataF
=
&
fData
,
.
pSmaF
=
&
fSma
};
if
(
pRSet
)
{
ASSERT
(
pRSet
->
nSttF
<=
pCommitter
->
sttTrigger
);
fData
=
*
pRSet
->
pDataF
;
fSma
=
*
pRSet
->
pSmaF
;
wSet
.
diskId
=
pRSet
->
diskId
;
if
(
pRSet
->
nSttF
<
pCommitter
->
sttTrigger
)
{
for
(
int32_t
iStt
=
0
;
iStt
<
pRSet
->
nSttF
;
iStt
++
)
{
wSet
.
aSttF
[
iStt
]
=
pRSet
->
aSttF
[
iStt
];
}
wSet
.
nSttF
=
pRSet
->
nSttF
+
1
;
}
else
{
wSet
.
nSttF
=
1
;
}
}
else
{
SDiskID
did
=
{
0
};
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
pCommitter
->
expLevel
,
&
did
)
<
0
)
{
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
tfsMkdirRecurAt
(
pTsdb
->
pVnode
->
pTfs
,
pTsdb
->
path
,
did
);
wSet
.
diskId
=
did
;
wSet
.
nSttF
=
1
;
}
wSet
.
aSttF
[
wSet
.
nSttF
-
1
]
=
&
fStt
;
code
=
tsdbDataFWriterOpen
(
&
pCommitter
->
dWriter
.
pWriter
,
pTsdb
,
&
wSet
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
taosArrayClear
(
pCommitter
->
dWriter
.
aBlockIdx
);
taosArrayClear
(
pCommitter
->
dWriter
.
aSttBlk
);
tMapDataReset
(
&
pCommitter
->
dWriter
.
mBlock
);
tBlockDataReset
(
&
pCommitter
->
dWriter
.
bData
);
tBlockDataReset
(
&
pCommitter
->
dWriter
.
bDatal
);
// open iter
code
=
tsdbOpenCommitIter
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommitSttBlk
(
SDataFWriter
*
pWriter
,
SDiskDataBuilder
*
pBuilder
,
SArray
*
aSttBlk
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
pBuilder
->
nRow
==
0
)
return
code
;
// gnrt
const
SDiskData
*
pDiskData
;
const
SBlkInfo
*
pBlkInfo
;
code
=
tGnrtDiskData
(
pBuilder
,
&
pDiskData
,
&
pBlkInfo
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
SSttBlk
sttBlk
=
{.
suid
=
pBuilder
->
suid
,
.
minUid
=
pBlkInfo
->
minUid
,
.
maxUid
=
pBlkInfo
->
maxUid
,
.
minKey
=
pBlkInfo
->
minKey
,
.
maxKey
=
pBlkInfo
->
maxKey
,
.
minVer
=
pBlkInfo
->
minVer
,
.
maxVer
=
pBlkInfo
->
maxVer
,
.
nRow
=
pBuilder
->
nRow
};
// write
code
=
tsdbWriteDiskData
(
pWriter
,
pDiskData
,
&
sttBlk
.
bInfo
,
NULL
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// fset commit start (TODO)
// push
if
(
taosArrayPush
(
aSttBlk
,
&
sttBlk
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
// commit fset
code
=
tsdbCommitTimeSeriesData
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// clear
tDiskDataBuilderClear
(
pBuilder
);
// fset commit end (TODO)
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommit
FileDataEnd
(
SCommitter
*
pCommitter
)
{
static
int32_t
tsdbCommit
terOpen
(
STsdb
*
pTsdb
,
SCommitInfo
*
pInfo
,
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
// write aBlockIdx
code
=
tsdbWriteBlockIdx
(
pCommitter
->
dWriter
.
pWriter
,
pCommitter
->
dWriter
.
aBlockIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// write aSttBlk
code
=
tsdbWriteSttBlk
(
pCommitter
->
dWriter
.
pWriter
,
pCommitter
->
dWriter
.
aSttBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// update file header
code
=
tsdbUpdateDFileSetHeader
(
pCommitter
->
dWriter
.
pWriter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// upsert SDFileSet
code
=
tsdbFSUpsertFSet
(
&
pCommitter
->
fs
,
&
pCommitter
->
dWriter
.
pWriter
->
wSet
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// close and sync
code
=
tsdbDataFWriterClose
(
&
pCommitter
->
dWriter
.
pWriter
,
1
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
memset
(
pCommitter
,
0
,
sizeof
(
SCommitter
));
pCommitter
->
pTsdb
=
pTsdb
;
if
(
pCommitter
->
dReader
.
pReader
)
{
code
=
tsdbDataFReaderClose
(
&
pCommitter
->
dReader
.
pReader
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// TODO
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbDebug
(
"vgId:%d %s done"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
);
}
return
code
;
}
static
int32_t
tsdb
MoveCommitData
(
SCommitter
*
pCommitter
,
TABLEID
toTable
)
{
static
int32_t
tsdb
CommitterClose
(
SCommitter
*
pCommiter
,
int32_t
eno
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
while
(
pCommitter
->
dReader
.
pBlockIdx
&&
tTABLEIDCmprFn
(
pCommitter
->
dReader
.
pBlockIdx
,
&
toTable
)
<
0
)
{
SBlockIdx
blockIdx
=
*
pCommitter
->
dReader
.
pBlockIdx
;
code
=
tsdbWriteDataBlk
(
pCommitter
->
dWriter
.
pWriter
,
&
pCommitter
->
dReader
.
mBlock
,
&
blockIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
taosArrayPush
(
pCommitter
->
dWriter
.
aBlockIdx
,
&
blockIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbCommitterNextTableData
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
// TODO
return
code
;
}
static
int32_t
tsdbCommitFileDataImpl
(
SCommitter
*
pCommitter
);
static
int32_t
tsdbCommitFileData
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
// commit file data start
code
=
tsdbCommitFileDataStart
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// impl
code
=
tsdbCommitFileDataImpl
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// commit file data end
code
=
tsdbCommitFileDataEnd
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
tsdbDataFReaderClose
(
&
pCommitter
->
dReader
.
pReader
);
tsdbDataFWriterClose
(
&
pCommitter
->
dWriter
.
pWriter
,
0
);
}
return
code
;
int32_t
tsdbPreCommit
(
STsdb
*
pTsdb
)
{
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
ASSERT
(
pTsdb
->
imem
==
NULL
);
pTsdb
->
imem
=
pTsdb
->
mem
;
pTsdb
->
mem
=
NULL
;
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
return
0
;
}
// ----------------------------------------------------------------------------
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitter
*
pCommitter
,
SCommitInfo
*
pInfo
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
memset
(
pCommitter
,
0
,
sizeof
(
*
pCommitter
));
ASSERT
(
pTsdb
->
imem
&&
"last tsdb commit incomplete"
);
pCommitter
->
pTsdb
=
pTsdb
;
pCommitter
->
commitID
=
pInfo
->
info
.
state
.
commitID
;
pCommitter
->
minutes
=
pTsdb
->
keepCfg
.
days
;
pCommitter
->
precision
=
pTsdb
->
keepCfg
.
precision
;
pCommitter
->
minRow
=
pInfo
->
info
.
config
.
tsdbCfg
.
minRows
;
pCommitter
->
maxRow
=
pInfo
->
info
.
config
.
tsdbCfg
.
maxRows
;
pCommitter
->
cmprAlg
=
pInfo
->
info
.
config
.
tsdbCfg
.
compression
;
pCommitter
->
sttTrigger
=
pInfo
->
info
.
config
.
sttTrigger
;
pCommitter
->
aTbDataP
=
tsdbMemTableGetTbDataArray
(
pTsdb
->
imem
);
if
(
pCommitter
->
aTbDataP
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbFSCopy
(
pTsdb
,
&
pCommitter
->
fs
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
int32_t
tsdbCommitBegin
(
STsdb
*
pTsdb
,
SCommitInfo
*
pInfo
)
{
if
(
!
pTsdb
)
return
0
;
static
int32_t
tsdbCommitDataStart
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SMemTable
*
pMem
=
pTsdb
->
imem
;
// reader
pCommitter
->
dReader
.
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pCommitter
->
dReader
.
aBlockIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tBlockDataCreate
(
&
pCommitter
->
dReader
.
bData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// merger
for
(
int32_t
iStt
=
0
;
iStt
<
TSDB_MAX_STT_TRIGGER
;
iStt
++
)
{
SDataIter
*
pIter
=
&
pCommitter
->
aDataIter
[
iStt
];
pIter
->
aSttBlk
=
taosArrayInit
(
0
,
sizeof
(
SSttBlk
));
if
(
pIter
->
aSttBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
pMem
->
nRow
==
0
&&
pMem
->
nDel
==
0
)
{
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
pTsdb
->
imem
=
NULL
;
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbUnrefMemTable
(
pMem
,
NULL
,
true
);
}
else
{
SCommitter
committer
;
int8_t
done
=
0
;
code
=
t
BlockDataCreate
(
&
pIter
->
bData
);
code
=
t
sdbCommitterOpen
(
pTsdb
,
pInfo
,
&
committer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// writer
pCommitter
->
dWriter
.
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pCommitter
->
dWriter
.
aBlockIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
while
(
!
done
&&
(
code
=
tsdbCommitNextFSet
(
&
committer
,
&
done
)))
{
}
pCommitter
->
dWriter
.
aSttBlk
=
taosArrayInit
(
0
,
sizeof
(
SSttBlk
));
if
(
pCommitter
->
dWriter
.
aSttBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
tsdbCommitterClose
(
&
committer
,
code
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tBlockDataCreate
(
&
pCommitter
->
dWriter
.
bData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tBlockDataCreate
(
&
pCommitter
->
dWriter
.
bDatal
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
void
tsdbCommitDataEnd
(
SCommitter
*
pCommitter
)
{
// reader
taosArrayDestroy
(
pCommitter
->
dReader
.
aBlockIdx
);
tMapDataClear
(
&
pCommitter
->
dReader
.
mBlock
);
tBlockDataDestroy
(
&
pCommitter
->
dReader
.
bData
);
// merger
for
(
int32_t
iStt
=
0
;
iStt
<
TSDB_MAX_STT_TRIGGER
;
iStt
++
)
{
SDataIter
*
pIter
=
&
pCommitter
->
aDataIter
[
iStt
];
taosArrayDestroy
(
pIter
->
aSttBlk
);
tBlockDataDestroy
(
&
pIter
->
bData
);
}
// writer
taosArrayDestroy
(
pCommitter
->
dWriter
.
aBlockIdx
);
taosArrayDestroy
(
pCommitter
->
dWriter
.
aSttBlk
);
tMapDataClear
(
&
pCommitter
->
dWriter
.
mBlock
);
tBlockDataDestroy
(
&
pCommitter
->
dWriter
.
bData
);
tBlockDataDestroy
(
&
pCommitter
->
dWriter
.
bDatal
);
tDestroyTSchema
(
pCommitter
->
skmTable
.
pTSchema
);
tDestroyTSchema
(
pCommitter
->
skmRow
.
pTSchema
);
}
static
int32_t
tsdbCommitData
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
// check
if
(
pMemTable
->
nRow
==
0
)
goto
_exit
;
// start ====================
code
=
tsdbCommitDataStart
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// impl ====================
pCommitter
->
nextKey
=
pMemTable
->
minKey
;
while
(
pCommitter
->
nextKey
<
TSKEY_MAX
)
{
code
=
tsdbCommitFileData
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// end ====================
tsdbCommitDataEnd
(
pCommitter
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommitDel
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
if
(
pMemTable
->
nDel
==
0
)
{
goto
_exit
;
}
// start
code
=
tsdbCommitDelStart
(
pCommitter
);
if
(
code
)
{
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// impl
int32_t
iDelIdx
=
0
;
int32_t
nDelIdx
=
taosArrayGetSize
(
pCommitter
->
aDelIdx
);
int32_t
iTbData
=
0
;
int32_t
nTbData
=
taosArrayGetSize
(
pCommitter
->
aTbDataP
);
STbData
*
pTbData
;
SDelIdx
*
pDelIdx
;
ASSERT
(
nTbData
>
0
);
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pCommitter
->
aTbDataP
,
iTbData
);
pDelIdx
=
(
iDelIdx
<
nDelIdx
)
?
(
SDelIdx
*
)
taosArrayGet
(
pCommitter
->
aDelIdx
,
iDelIdx
)
:
NULL
;
while
(
true
)
{
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
break
;
if
(
pTbData
&&
pDelIdx
)
{
int32_t
c
=
tTABLEIDCmprFn
(
pTbData
,
pDelIdx
);
if
(
c
==
0
)
{
goto
_commit_mem_and_disk_del
;
}
else
if
(
c
<
0
)
{
goto
_commit_mem_del
;
}
else
{
goto
_commit_disk_del
;
}
}
else
if
(
pTbData
)
{
goto
_commit_mem_del
;
}
else
{
goto
_commit_disk_del
;
}
_commit_mem_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
NULL
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
iTbData
++
;
pTbData
=
(
iTbData
<
nTbData
)
?
(
STbData
*
)
taosArrayGetP
(
pCommitter
->
aTbDataP
,
iTbData
)
:
NULL
;
continue
;
_commit_disk_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
NULL
,
pDelIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
iDelIdx
++
;
pDelIdx
=
(
iDelIdx
<
nDelIdx
)
?
(
SDelIdx
*
)
taosArrayGet
(
pCommitter
->
aDelIdx
,
iDelIdx
)
:
NULL
;
continue
;
_commit_mem_and_disk_del:
code
=
tsdbCommitTableDel
(
pCommitter
,
pTbData
,
pDelIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
iTbData
++
;
pTbData
=
(
iTbData
<
nTbData
)
?
(
STbData
*
)
taosArrayGetP
(
pCommitter
->
aTbDataP
,
iTbData
)
:
NULL
;
iDelIdx
++
;
pDelIdx
=
(
iDelIdx
<
nDelIdx
)
?
(
SDelIdx
*
)
taosArrayGet
(
pCommitter
->
aDelIdx
,
iDelIdx
)
:
NULL
;
continue
;
}
// end
code
=
tsdbCommitDelEnd
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbDebug
(
"vgId:%d, commit del done, nDel:%"
PRId64
,
TD_VID
(
pTsdb
->
pVnode
),
pMemTable
->
nDel
);
}
return
code
;
}
static
int32_t
tsdbEndCommit
(
SCommitter
*
pCommitter
,
int32_t
eno
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdb
*
pTsdb
=
pCommitter
->
pTsdb
;
if
(
eno
)
{
code
=
eno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbFSPrepareCommit
(
pCommitter
->
pTsdb
,
&
pCommitter
->
fs
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
tsdbFSDestroy
(
&
pCommitter
->
fs
);
taosArrayDestroy
(
pCommitter
->
aTbDataP
);
pCommitter
->
aTbDataP
=
NULL
;
if
(
code
||
eno
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbInfo
(
"vgId:%d, tsdb end commit"
,
TD_VID
(
pTsdb
->
pVnode
));
}
return
code
;
}
// ================================================================================
static
FORCE_INLINE
SRowInfo
*
tsdbGetCommitRow
(
SCommitter
*
pCommitter
)
{
return
(
pCommitter
->
pIter
)
?
&
pCommitter
->
pIter
->
r
:
NULL
;
}
static
int32_t
tsdbNextCommitRow
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
pCommitter
->
pIter
)
{
SDataIter
*
pIter
=
pCommitter
->
pIter
;
if
(
pCommitter
->
pIter
->
type
==
MEMORY_DATA_ITER
)
{
// memory
tsdbTbDataIterNext
(
&
pIter
->
iter
);
TSDBROW
*
pRow
=
tsdbTbDataIterGet
(
&
pIter
->
iter
);
while
(
true
)
{
if
(
pRow
&&
TSDBROW_TS
(
pRow
)
>
pCommitter
->
maxKey
)
{
pCommitter
->
nextKey
=
TMIN
(
pCommitter
->
nextKey
,
TSDBROW_TS
(
pRow
));
pRow
=
NULL
;
}
if
(
pRow
)
{
pIter
->
r
.
suid
=
pIter
->
iter
.
pTbData
->
suid
;
pIter
->
r
.
uid
=
pIter
->
iter
.
pTbData
->
uid
;
pIter
->
r
.
row
=
*
pRow
;
break
;
}
pIter
->
iTbDataP
++
;
if
(
pIter
->
iTbDataP
<
taosArrayGetSize
(
pCommitter
->
aTbDataP
))
{
STbData
*
pTbData
=
(
STbData
*
)
taosArrayGetP
(
pCommitter
->
aTbDataP
,
pIter
->
iTbDataP
);
TSDBKEY
keyFrom
=
{.
ts
=
pCommitter
->
minKey
,
.
version
=
VERSION_MIN
};
tsdbTbDataIterOpen
(
pTbData
,
&
keyFrom
,
0
,
&
pIter
->
iter
);
pRow
=
tsdbTbDataIterGet
(
&
pIter
->
iter
);
continue
;
}
else
{
pCommitter
->
pIter
=
NULL
;
break
;
}
}
}
else
if
(
pCommitter
->
pIter
->
type
==
STT_DATA_ITER
)
{
// last file
pIter
->
iRow
++
;
if
(
pIter
->
iRow
<
pIter
->
bData
.
nRow
)
{
pIter
->
r
.
uid
=
pIter
->
bData
.
uid
?
pIter
->
bData
.
uid
:
pIter
->
bData
.
aUid
[
pIter
->
iRow
];
pIter
->
r
.
row
=
tsdbRowFromBlockData
(
&
pIter
->
bData
,
pIter
->
iRow
);
}
else
{
pIter
->
iSttBlk
++
;
if
(
pIter
->
iSttBlk
<
taosArrayGetSize
(
pIter
->
aSttBlk
))
{
SSttBlk
*
pSttBlk
=
(
SSttBlk
*
)
taosArrayGet
(
pIter
->
aSttBlk
,
pIter
->
iSttBlk
);
code
=
tsdbReadSttBlockEx
(
pCommitter
->
dReader
.
pReader
,
pIter
->
iStt
,
pSttBlk
,
&
pIter
->
bData
);
if
(
code
)
goto
_exit
;
pIter
->
iRow
=
0
;
pIter
->
r
.
suid
=
pIter
->
bData
.
suid
;
pIter
->
r
.
uid
=
pIter
->
bData
.
uid
?
pIter
->
bData
.
uid
:
pIter
->
bData
.
aUid
[
0
];
pIter
->
r
.
row
=
tsdbRowFromBlockData
(
&
pIter
->
bData
,
0
);
}
else
{
pCommitter
->
pIter
=
NULL
;
}
}
}
else
{
ASSERT
(
0
);
}
// compare with min in RB Tree
pIter
=
(
SDataIter
*
)
tRBTreeMin
(
&
pCommitter
->
rbt
);
if
(
pCommitter
->
pIter
&&
pIter
)
{
int32_t
c
=
tRowInfoCmprFn
(
&
pCommitter
->
pIter
->
r
,
&
pIter
->
r
);
if
(
c
>
0
)
{
tRBTreePut
(
&
pCommitter
->
rbt
,
(
SRBTreeNode
*
)
pCommitter
->
pIter
);
pCommitter
->
pIter
=
NULL
;
}
else
{
ASSERT
(
c
);
}
}
}
if
(
pCommitter
->
pIter
==
NULL
)
{
pCommitter
->
pIter
=
(
SDataIter
*
)
tRBTreeMin
(
&
pCommitter
->
rbt
);
if
(
pCommitter
->
pIter
)
{
tRBTreeDrop
(
&
pCommitter
->
rbt
,
(
SRBTreeNode
*
)
pCommitter
->
pIter
);
}
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommitAheadBlock
(
SCommitter
*
pCommitter
,
SDataBlk
*
pDataBlk
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SBlockData
*
pBlockData
=
&
pCommitter
->
dWriter
.
bData
;
SRowInfo
*
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
TABLEID
id
=
{.
suid
=
pRowInfo
->
suid
,
.
uid
=
pRowInfo
->
uid
};
tBlockDataClear
(
pBlockData
);
while
(
pRowInfo
)
{
code
=
tsdbCommitterUpdateRowSchema
(
pCommitter
,
id
.
suid
,
id
.
uid
,
TSDBROW_SVERSION
(
&
pRowInfo
->
row
));
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tBlockDataAppendRow
(
pBlockData
,
&
pRowInfo
->
row
,
pCommitter
->
skmRow
.
pTSchema
,
id
.
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbNextCommitRow
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
if
(
pRowInfo
)
{
if
(
pRowInfo
->
suid
!=
id
.
suid
||
pRowInfo
->
uid
!=
id
.
uid
)
{
pRowInfo
=
NULL
;
}
else
{
TSDBKEY
tKey
=
TSDBROW_KEY
(
&
pRowInfo
->
row
);
if
(
tsdbKeyCmprFn
(
&
tKey
,
&
pDataBlk
->
minKey
)
>=
0
)
pRowInfo
=
NULL
;
}
}
if
(
pBlockData
->
nRow
>=
pCommitter
->
maxRow
)
{
code
=
tsdbWriteDataBlock
(
pCommitter
->
dWriter
.
pWriter
,
pBlockData
,
&
pCommitter
->
dWriter
.
mBlock
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
code
=
tsdbWriteDataBlock
(
pCommitter
->
dWriter
.
pWriter
,
pBlockData
,
&
pCommitter
->
dWriter
.
mBlock
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommitMergeBlock
(
SCommitter
*
pCommitter
,
SDataBlk
*
pDataBlk
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SRowInfo
*
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
TABLEID
id
=
{.
suid
=
pRowInfo
->
suid
,
.
uid
=
pRowInfo
->
uid
};
SBlockData
*
pBDataR
=
&
pCommitter
->
dReader
.
bData
;
SBlockData
*
pBDataW
=
&
pCommitter
->
dWriter
.
bData
;
code
=
tsdbReadDataBlock
(
pCommitter
->
dReader
.
pReader
,
pDataBlk
,
pBDataR
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tBlockDataClear
(
pBDataW
);
int32_t
iRow
=
0
;
TSDBROW
row
=
tsdbRowFromBlockData
(
pBDataR
,
0
);
TSDBROW
*
pRow
=
&
row
;
while
(
pRow
&&
pRowInfo
)
{
int32_t
c
=
tsdbRowCmprFn
(
pRow
,
&
pRowInfo
->
row
);
if
(
c
<
0
)
{
code
=
tBlockDataAppendRow
(
pBDataW
,
pRow
,
NULL
,
id
.
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
iRow
++
;
if
(
iRow
<
pBDataR
->
nRow
)
{
row
=
tsdbRowFromBlockData
(
pBDataR
,
iRow
);
}
else
{
pRow
=
NULL
;
}
}
else
if
(
c
>
0
)
{
code
=
tsdbCommitterUpdateRowSchema
(
pCommitter
,
id
.
suid
,
id
.
uid
,
TSDBROW_SVERSION
(
&
pRowInfo
->
row
));
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tBlockDataAppendRow
(
pBDataW
,
&
pRowInfo
->
row
,
pCommitter
->
skmRow
.
pTSchema
,
id
.
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbNextCommitRow
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
if
(
pRowInfo
)
{
if
(
pRowInfo
->
suid
!=
id
.
suid
||
pRowInfo
->
uid
!=
id
.
uid
)
{
pRowInfo
=
NULL
;
}
else
{
TSDBKEY
tKey
=
TSDBROW_KEY
(
&
pRowInfo
->
row
);
if
(
tsdbKeyCmprFn
(
&
tKey
,
&
pDataBlk
->
maxKey
)
>
0
)
pRowInfo
=
NULL
;
}
}
}
else
{
ASSERT
(
0
&&
"dup rows not allowed"
);
}
if
(
pBDataW
->
nRow
>=
pCommitter
->
maxRow
)
{
code
=
tsdbWriteDataBlock
(
pCommitter
->
dWriter
.
pWriter
,
pBDataW
,
&
pCommitter
->
dWriter
.
mBlock
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
while
(
pRow
)
{
code
=
tBlockDataAppendRow
(
pBDataW
,
pRow
,
NULL
,
id
.
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
iRow
++
;
if
(
iRow
<
pBDataR
->
nRow
)
{
row
=
tsdbRowFromBlockData
(
pBDataR
,
iRow
);
}
else
{
pRow
=
NULL
;
}
if
(
pBDataW
->
nRow
>=
pCommitter
->
maxRow
)
{
code
=
tsdbWriteDataBlock
(
pCommitter
->
dWriter
.
pWriter
,
pBDataW
,
&
pCommitter
->
dWriter
.
mBlock
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
code
=
tsdbWriteDataBlock
(
pCommitter
->
dWriter
.
pWriter
,
pBDataW
,
&
pCommitter
->
dWriter
.
mBlock
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbMergeTableData
(
SCommitter
*
pCommitter
,
TABLEID
id
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SBlockIdx
*
pBlockIdx
=
pCommitter
->
dReader
.
pBlockIdx
;
ASSERT
(
pBlockIdx
==
NULL
||
tTABLEIDCmprFn
(
pBlockIdx
,
&
id
)
>=
0
);
if
(
pBlockIdx
&&
pBlockIdx
->
suid
==
id
.
suid
&&
pBlockIdx
->
uid
==
id
.
uid
)
{
int32_t
iBlock
=
0
;
SDataBlk
block
;
SDataBlk
*
pDataBlk
=
&
block
;
SRowInfo
*
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
ASSERT
(
pRowInfo
->
suid
==
id
.
suid
&&
pRowInfo
->
uid
==
id
.
uid
);
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
,
pDataBlk
,
tGetDataBlk
);
while
(
pDataBlk
&&
pRowInfo
)
{
SDataBlk
tBlock
=
{.
minKey
=
TSDBROW_KEY
(
&
pRowInfo
->
row
),
.
maxKey
=
TSDBROW_KEY
(
&
pRowInfo
->
row
)};
int32_t
c
=
tDataBlkCmprFn
(
pDataBlk
,
&
tBlock
);
if
(
c
<
0
)
{
code
=
tMapDataPutItem
(
&
pCommitter
->
dWriter
.
mBlock
,
pDataBlk
,
tPutDataBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
iBlock
++
;
if
(
iBlock
<
pCommitter
->
dReader
.
mBlock
.
nItem
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
,
pDataBlk
,
tGetDataBlk
);
}
else
{
pDataBlk
=
NULL
;
}
}
else
if
(
c
>
0
)
{
code
=
tsdbCommitAheadBlock
(
pCommitter
,
pDataBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
if
(
pRowInfo
&&
(
pRowInfo
->
suid
!=
id
.
suid
||
pRowInfo
->
uid
!=
id
.
uid
))
pRowInfo
=
NULL
;
}
else
{
code
=
tsdbCommitMergeBlock
(
pCommitter
,
pDataBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
iBlock
++
;
if
(
iBlock
<
pCommitter
->
dReader
.
mBlock
.
nItem
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
,
pDataBlk
,
tGetDataBlk
);
}
else
{
pDataBlk
=
NULL
;
}
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
if
(
pRowInfo
&&
(
pRowInfo
->
suid
!=
id
.
suid
||
pRowInfo
->
uid
!=
id
.
uid
))
pRowInfo
=
NULL
;
}
}
while
(
pDataBlk
)
{
code
=
tMapDataPutItem
(
&
pCommitter
->
dWriter
.
mBlock
,
pDataBlk
,
tPutDataBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
iBlock
++
;
if
(
iBlock
<
pCommitter
->
dReader
.
mBlock
.
nItem
)
{
tMapDataGetItemByIdx
(
&
pCommitter
->
dReader
.
mBlock
,
iBlock
,
pDataBlk
,
tGetDataBlk
);
}
else
{
pDataBlk
=
NULL
;
}
}
code
=
tsdbCommitterNextTableData
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbInitSttBlockBuilderIfNeed
(
SCommitter
*
pCommitter
,
TABLEID
id
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SBlockData
*
pBData
=
&
pCommitter
->
dWriter
.
bDatal
;
if
(
pBData
->
suid
||
pBData
->
uid
)
{
if
(
!
TABLE_SAME_SCHEMA
(
pBData
->
suid
,
pBData
->
uid
,
id
.
suid
,
id
.
uid
))
{
code
=
tsdbWriteSttBlock
(
pCommitter
->
dWriter
.
pWriter
,
pBData
,
pCommitter
->
dWriter
.
aSttBlk
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tBlockDataReset
(
pBData
);
}
}
if
(
!
pBData
->
suid
&&
!
pBData
->
uid
)
{
ASSERT
(
pCommitter
->
skmTable
.
suid
==
id
.
suid
);
ASSERT
(
pCommitter
->
skmTable
.
uid
==
id
.
uid
);
TABLEID
tid
=
{.
suid
=
id
.
suid
,
.
uid
=
id
.
suid
?
0
:
id
.
uid
};
code
=
tBlockDataInit
(
pBData
,
&
tid
,
pCommitter
->
skmTable
.
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbAppendLastBlock
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SBlockData
*
pBData
=
&
pCommitter
->
dWriter
.
bData
;
TABLEID
id
=
{.
suid
=
pBData
->
suid
,
.
uid
=
pBData
->
uid
};
code
=
tsdbInitSttBlockBuilderIfNeed
(
pCommitter
,
id
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
for
(
int32_t
iRow
=
0
;
iRow
<
pBData
->
nRow
;
iRow
++
)
{
TSDBROW
row
=
tsdbRowFromBlockData
(
pBData
,
iRow
);
code
=
tBlockDataAppendRow
(
&
pCommitter
->
dWriter
.
bDatal
,
&
row
,
NULL
,
id
.
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
pCommitter
->
dWriter
.
bDatal
.
nRow
>=
pCommitter
->
maxRow
)
{
code
=
tsdbWriteSttBlock
(
pCommitter
->
dWriter
.
pWriter
,
&
pCommitter
->
dWriter
.
bDatal
,
pCommitter
->
dWriter
.
aSttBlk
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommitTableData
(
SCommitter
*
pCommitter
,
TABLEID
id
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SRowInfo
*
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
if
(
pRowInfo
&&
(
pRowInfo
->
suid
!=
id
.
suid
||
pRowInfo
->
uid
!=
id
.
uid
))
{
pRowInfo
=
NULL
;
}
if
(
pRowInfo
==
NULL
)
goto
_exit
;
if
(
pCommitter
->
toLastOnly
)
{
code
=
tsdbInitSttBlockBuilderIfNeed
(
pCommitter
,
id
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
while
(
pRowInfo
)
{
STSchema
*
pTSchema
=
NULL
;
if
(
pRowInfo
->
row
.
type
==
TSDBROW_ROW_FMT
)
{
code
=
tsdbCommitterUpdateRowSchema
(
pCommitter
,
id
.
suid
,
id
.
uid
,
TSDBROW_SVERSION
(
&
pRowInfo
->
row
));
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pTSchema
=
pCommitter
->
skmRow
.
pTSchema
;
}
code
=
tBlockDataAppendRow
(
&
pCommitter
->
dWriter
.
bDatal
,
&
pRowInfo
->
row
,
pTSchema
,
id
.
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbNextCommitRow
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
if
(
pRowInfo
&&
(
pRowInfo
->
suid
!=
id
.
suid
||
pRowInfo
->
uid
!=
id
.
uid
))
{
pRowInfo
=
NULL
;
}
if
(
pCommitter
->
dWriter
.
bDatal
.
nRow
>=
pCommitter
->
maxRow
)
{
code
=
tsdbWriteSttBlock
(
pCommitter
->
dWriter
.
pWriter
,
&
pCommitter
->
dWriter
.
bDatal
,
pCommitter
->
dWriter
.
aSttBlk
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
SBlockData
*
pBData
=
&
pCommitter
->
dWriter
.
bData
;
ASSERT
(
pBData
->
nRow
==
0
);
while
(
pRowInfo
)
{
STSchema
*
pTSchema
=
NULL
;
if
(
pRowInfo
->
row
.
type
==
TSDBROW_ROW_FMT
)
{
code
=
tsdbCommitterUpdateRowSchema
(
pCommitter
,
id
.
suid
,
id
.
uid
,
TSDBROW_SVERSION
(
&
pRowInfo
->
row
));
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pTSchema
=
pCommitter
->
skmRow
.
pTSchema
;
}
code
=
tBlockDataAppendRow
(
pBData
,
&
pRowInfo
->
row
,
pTSchema
,
id
.
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbNextCommitRow
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
);
if
(
pRowInfo
&&
(
pRowInfo
->
suid
!=
id
.
suid
||
pRowInfo
->
uid
!=
id
.
uid
))
{
pRowInfo
=
NULL
;
}
if
(
pBData
->
nRow
>=
pCommitter
->
maxRow
)
{
code
=
tsdbWriteDataBlock
(
pCommitter
->
dWriter
.
pWriter
,
pBData
,
&
pCommitter
->
dWriter
.
mBlock
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
if
(
pBData
->
nRow
)
{
if
(
pBData
->
nRow
>
pCommitter
->
minRow
)
{
code
=
tsdbWriteDataBlock
(
pCommitter
->
dWriter
.
pWriter
,
pBData
,
&
pCommitter
->
dWriter
.
mBlock
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbAppendLastBlock
(
pCommitter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCommitFileDataImpl
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SRowInfo
*
pRowInfo
;
TABLEID
id
=
{
0
};
while
((
pRowInfo
=
tsdbGetCommitRow
(
pCommitter
))
!=
NULL
)
{
ASSERT
(
pRowInfo
->
suid
!=
id
.
suid
||
pRowInfo
->
uid
!=
id
.
uid
);
id
.
suid
=
pRowInfo
->
suid
;
id
.
uid
=
pRowInfo
->
uid
;
code
=
tsdbMoveCommitData
(
pCommitter
,
id
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// start
tMapDataReset
(
&
pCommitter
->
dWriter
.
mBlock
);
// impl
code
=
tsdbUpdateTableSchema
(
pCommitter
->
pTsdb
->
pVnode
->
pMeta
,
id
.
suid
,
id
.
uid
,
&
pCommitter
->
skmTable
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tBlockDataInit
(
&
pCommitter
->
dReader
.
bData
,
&
id
,
pCommitter
->
skmTable
.
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tBlockDataInit
(
&
pCommitter
->
dWriter
.
bData
,
&
id
,
pCommitter
->
skmTable
.
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
/* merge with data in .data file */
code
=
tsdbMergeTableData
(
pCommitter
,
id
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
/* handle remain table data */
code
=
tsdbCommitTableData
(
pCommitter
,
id
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// end
if
(
pCommitter
->
dWriter
.
mBlock
.
nItem
>
0
)
{
SBlockIdx
blockIdx
=
{.
suid
=
id
.
suid
,
.
uid
=
id
.
uid
};
code
=
tsdbWriteDataBlk
(
pCommitter
->
dWriter
.
pWriter
,
&
pCommitter
->
dWriter
.
mBlock
,
&
blockIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
taosArrayPush
(
pCommitter
->
dWriter
.
aBlockIdx
,
&
blockIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
}
id
.
suid
=
INT64_MAX
;
id
.
uid
=
INT64_MAX
;
code
=
tsdbMoveCommitData
(
pCommitter
,
id
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbWriteSttBlock
(
pCommitter
->
dWriter
.
pWriter
,
&
pCommitter
->
dWriter
.
bDatal
,
pCommitter
->
dWriter
.
aSttBlk
,
pCommitter
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pCommitter
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
int32_t
tsdbPreCommit
(
STsdb
*
pTsdb
)
{
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
ASSERT
(
pTsdb
->
imem
==
NULL
);
pTsdb
->
imem
=
pTsdb
->
mem
;
pTsdb
->
mem
=
NULL
;
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
return
0
;
}
int32_t
tsdbCommitBegin
(
STsdb
*
pTsdb
,
SCommitInfo
*
pInfo
)
{
if
(
!
pTsdb
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
SCommitter
commith
;
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
// check
if
(
pMemTable
->
nRow
==
0
&&
pMemTable
->
nDel
==
0
)
{
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
pTsdb
->
imem
=
NULL
;
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbUnrefMemTable
(
pMemTable
,
NULL
,
true
);
goto
_exit
;
}
// start commit
code
=
tsdbStartCommit
(
pTsdb
,
&
commith
,
pInfo
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// commit impl
code
=
tsdbCommitData
(
&
commith
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbCommitDel
(
&
commith
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// end commit
code
=
tsdbEndCommit
(
&
commith
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbEndCommit
(
&
commith
,
code
);
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
tsdbInfo
(
"vgId:%d %s done, nRow:%"
PRId64
" nDel:%"
PRId64
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
pMem
->
nRow
,
pMem
->
nDel
);
}
return
code
;
}
#if 0
int32_t tsdbCommitCommit(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
...
...
@@ -1456,3 +209,4 @@ _exit:
}
return code;
}
#endif
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录