Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
737820ee
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
737820ee
编写于
6月 01, 2022
作者:
C
Cary Xu
提交者:
GitHub
6月 01, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13371 from taosdata/feature/TD-14481-3.0
feat: tsma/rsma refactor
上级
d948a43f
648e0ac1
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
63 addition
and
24 deletion
+63
-24
include/common/tmsg.h
include/common/tmsg.h
+1
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+21
-2
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-0
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-5
source/dnode/vnode/src/sma/smaOpen.c
source/dnode/vnode/src/sma/smaOpen.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+1
-1
source/dnode/vnode/src/vnd/vnodeCfg.c
source/dnode/vnode/src/vnd/vnodeCfg.c
+3
-0
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+1
-2
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+26
-12
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
737820ee
...
...
@@ -2305,6 +2305,7 @@ typedef struct {
int8_t
intervalUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
slidingUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
timezoneInt
;
// sma data expired if timezone changes.
int32_t
dstVgId
;
char
indexName
[
TSDB_INDEX_NAME_LEN
];
int32_t
exprLen
;
int32_t
tagsFilterLen
;
...
...
include/util/taoserror.h
浏览文件 @
737820ee
...
...
@@ -70,6 +70,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
...
...
source/common/src/tmsg.c
浏览文件 @
737820ee
...
...
@@ -3654,6 +3654,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if
(
tEncodeI8
(
pCoder
,
pSma
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
pSma
->
slidingUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
pSma
->
timezoneInt
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
pSma
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pSma
->
indexName
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
pSma
->
exprLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
pSma
->
tagsFilterLen
)
<
0
)
return
-
1
;
...
...
@@ -3676,6 +3677,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
version
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
slidingUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pSma
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
timezoneInt
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pCoder
,
pSma
->
indexName
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pSma
->
exprLen
)
<
0
)
return
-
1
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
737820ee
...
...
@@ -150,8 +150,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg
->
tsdbCfg
.
minRows
=
pCreate
->
minRows
;
pCfg
->
tsdbCfg
.
maxRows
=
pCreate
->
maxRows
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pCreate
->
pRetensions
);
++
i
)
{
memcpy
(
&
pCfg
->
tsdbCfg
.
retentions
[
i
],
taosArrayGet
(
pCreate
->
pRetensions
,
i
),
sizeof
(
SRetention
));
SRetention
*
pRetention
=
&
pCfg
->
tsdbCfg
.
retentions
[
i
];
memcpy
(
pRetention
,
taosArrayGet
(
pCreate
->
pRetensions
,
i
),
sizeof
(
SRetention
));
if
(
i
==
0
)
{
if
((
pRetention
->
freq
>
0
&&
pRetention
->
keep
>
0
))
pCfg
->
isRsma
=
1
;
}
}
pCfg
->
walCfg
.
vgId
=
pCreate
->
vgId
;
pCfg
->
hashBegin
=
pCreate
->
hashBegin
;
pCfg
->
hashEnd
=
pCreate
->
hashEnd
;
...
...
@@ -218,9 +223,20 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
code
=
vmOpenVnode
(
pMgmt
,
&
wrapperCfg
,
pImpl
);
if
(
code
!=
0
)
{
dError
(
"vgId:%d, failed to open vnode since %s"
,
createReq
.
vgId
,
terrstr
());
code
=
terrno
;
goto
_OVER
;
}
if
(
createReq
.
isTsma
)
{
SMsgHead
*
smaMsg
=
createReq
.
pTsma
;
uint32_t
contLen
=
(
uint32_t
)(
htonl
(
smaMsg
->
contLen
)
-
sizeof
(
SMsgHead
));
if
(
vnodeProcessCreateTSma
(
pImpl
,
POINTER_SHIFT
(
smaMsg
,
sizeof
(
SMsgHead
)),
contLen
)
<
0
)
{
dError
(
"vgId:%d, failed to create tsma since %s"
,
createReq
.
vgId
,
terrstr
());
code
=
terrno
;
goto
_OVER
;
};
}
code
=
vnodeStart
(
pImpl
);
if
(
code
!=
0
)
{
dError
(
"vgId:%d, failed to start sync since %s"
,
createReq
.
vgId
,
terrstr
());
...
...
@@ -228,7 +244,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
code
=
vmWriteVnodeListToFile
(
pMgmt
);
if
(
code
!=
0
)
goto
_OVER
;
if
(
code
!=
0
)
{
code
=
terrno
;
goto
_OVER
;
}
_OVER:
if
(
code
!=
0
)
{
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
737820ee
...
...
@@ -68,6 +68,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t
vnodeSnapshotReaderOpen
(
SVnode
*
pVnode
,
SVSnapshotReader
**
ppReader
,
int64_t
sver
,
int64_t
ever
);
int32_t
vnodeSnapshotReaderClose
(
SVSnapshotReader
*
pReader
);
int32_t
vnodeSnapshotRead
(
SVSnapshotReader
*
pReader
,
const
void
**
ppData
,
uint32_t
*
nData
);
int32_t
vnodeProcessCreateTSma
(
SVnode
*
pVnode
,
void
*
pCont
,
uint32_t
contLen
);
// meta
typedef
struct
SMeta
SMeta
;
// todo: remove
...
...
@@ -172,6 +173,7 @@ struct SVnodeCfg {
bool
isHeap
;
bool
isWeak
;
int8_t
isTsma
;
int8_t
isRsma
;
int8_t
hashMethod
;
STsdbCfg
tsdbCfg
;
SWalCfg
walCfg
;
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
737820ee
...
...
@@ -239,6 +239,8 @@ struct SVnode {
#define VND_RSMA1(vnd) ((vnd)->pSma->pRSmaTsdb1)
#define VND_RSMA2(vnd) ((vnd)->pSma->pRSmaTsdb2)
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
#define VND_IS_RSMA(v) ((v)->config.isRsma == 1)
#define VND_IS_TSMA(v) ((v)->config.isTsma == 1)
struct
STbUidStore
{
tb_uid_t
suid
;
...
...
@@ -271,11 +273,6 @@ struct SSma {
#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb1)
#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb2)
static
FORCE_INLINE
bool
vnodeIsRollup
(
SVnode
*
pVnode
)
{
SRetention
*
pRetention
=
&
(
pVnode
->
config
.
tsdbCfg
.
retentions
[
0
]);
return
(
pRetention
->
freq
>
0
&&
pRetention
->
keep
>
0
);
}
// sma
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
...
...
source/dnode/vnode/src/sma/smaOpen.c
浏览文件 @
737820ee
...
...
@@ -104,7 +104,7 @@ int32_t smaOpen(SVnode *pVnode) {
taosThreadMutexInit
(
&
pSma
->
mutex
,
NULL
);
pSma
->
locked
=
false
;
if
(
vnodeIsRollup
(
pVnode
))
{
if
(
VND_IS_RSMA
(
pVnode
))
{
STsdbKeepCfg
keepCfg
=
{
0
};
for
(
int
i
=
0
;
i
<
TSDB_RETENTION_MAX
;
++
i
)
{
if
(
i
==
TSDB_RETENTION_L0
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
737820ee
...
...
@@ -333,7 +333,7 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
}
static
STsdb
*
getTsdbByRetentions
(
SVnode
*
pVnode
,
STsdbReadHandle
*
pReadHandle
,
TSKEY
winSKey
,
SRetention
*
retentions
)
{
if
(
vnodeIsRollup
(
pVnode
))
{
if
(
VND_IS_RSMA
(
pVnode
))
{
int
level
=
0
;
int64_t
now
=
taosGetTimestamp
(
pVnode
->
config
.
tsdbCfg
.
precision
);
...
...
source/dnode/vnode/src/vnd/vnodeCfg.c
浏览文件 @
737820ee
...
...
@@ -57,6 +57,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if
(
tjsonAddIntegerToObject
(
pJson
,
"isHeap"
,
pCfg
->
isHeap
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"isWeak"
,
pCfg
->
isWeak
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"isTsma"
,
pCfg
->
isTsma
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"isRsma"
,
pCfg
->
isRsma
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"precision"
,
pCfg
->
tsdbCfg
.
precision
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"update"
,
pCfg
->
tsdbCfg
.
update
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"compression"
,
pCfg
->
tsdbCfg
.
compression
)
<
0
)
return
-
1
;
...
...
@@ -133,6 +134,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"isTsma"
,
pCfg
->
isTsma
,
code
);
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"isRsma"
,
pCfg
->
isRsma
,
code
);
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"precision"
,
pCfg
->
tsdbCfg
.
precision
,
code
);
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"update"
,
pCfg
->
tsdbCfg
.
update
,
code
);
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
737820ee
...
...
@@ -230,7 +230,7 @@ int vnodeCommit(SVnode *pVnode) {
return
-
1
;
}
if
(
vnodeIsRollup
(
pVnode
))
{
if
(
VND_IS_RSMA
(
pVnode
))
{
if
(
tsdbCommit
(
VND_RSMA0
(
pVnode
))
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
@@ -250,7 +250,6 @@ int vnodeCommit(SVnode *pVnode) {
}
}
if
(
tqCommit
(
pVnode
->
pTq
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
737820ee
...
...
@@ -97,7 +97,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
}
// open tsdb
if
(
!
vnodeIsRollup
(
pVnode
)
&&
tsdbOpen
(
pVnode
,
&
VND_TSDB
(
pVnode
),
VNODE_TSDB_DIR
,
NULL
)
<
0
)
{
if
(
!
VND_IS_RSMA
(
pVnode
)
&&
tsdbOpen
(
pVnode
,
&
VND_TSDB
(
pVnode
),
VNODE_TSDB_DIR
,
NULL
)
<
0
)
{
vError
(
"vgId:%d failed to open vnode tsdb since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
737820ee
...
...
@@ -776,8 +776,7 @@ _exit:
// TODO: the partial success scenario and the error case
// TODO: refactor
if
((
terrno
==
TSDB_CODE_SUCCESS
||
terrno
==
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
)
&&
(
pRsp
->
code
==
TSDB_CODE_SUCCESS
))
{
if
((
terrno
==
TSDB_CODE_SUCCESS
)
&&
(
pRsp
->
code
==
TSDB_CODE_SUCCESS
))
{
tdProcessRSmaSubmit
(
pVnode
->
pSma
,
pReq
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
}
...
...
@@ -788,16 +787,19 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq
SVCreateTSmaReq
req
=
{
0
};
SDecoder
coder
;
pRsp
->
msgType
=
TDMT_VND_CREATE_SMA_RSP
;
pRsp
->
code
=
TSDB_CODE_SUCCESS
;
pRsp
->
pCont
=
NULL
;
pRsp
->
contLen
=
0
;
if
(
pRsp
)
{
pRsp
->
msgType
=
TDMT_VND_CREATE_SMA_RSP
;
pRsp
->
code
=
TSDB_CODE_SUCCESS
;
pRsp
->
pCont
=
NULL
;
pRsp
->
contLen
=
0
;
}
// decode and process req
tDecoderInit
(
&
coder
,
pReq
,
len
);
if
(
tDecodeSVCreateTSmaReq
(
&
coder
,
&
req
)
<
0
)
{
pRsp
->
code
=
terrno
;
terrno
=
TSDB_CODE_MSG_DECODE_ERROR
;
if
(
pRsp
)
pRsp
->
code
=
terrno
;
goto
_err
;
}
...
...
@@ -805,18 +807,30 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq
req
.
timezoneInt
=
tsTimezone
;
if
(
tdProcessTSmaCreate
(
pVnode
->
pSma
,
version
,
(
const
char
*
)
&
req
)
<
0
)
{
pRsp
->
code
=
terrno
;
if
(
pRsp
)
pRsp
->
code
=
terrno
;
goto
_err
;
}
tDecoderClear
(
&
coder
);
vDebug
(
"vgId:%d success to create tsma %s:%"
PRIi64
"
for table %"
PRIi64
,
TD_VID
(
pVnode
),
req
.
indexName
,
req
.
index
Uid
,
req
.
tableUid
);
vDebug
(
"vgId:%d success to create tsma %s:%"
PRIi64
"
version %"
PRIi64
" for table %"
PRIi64
,
TD_VID
(
pVnode
)
,
req
.
index
Name
,
req
.
indexUid
,
version
,
req
.
tableUid
);
return
0
;
_err:
tDecoderClear
(
&
coder
);
vError
(
"vgId:%d failed to create tsma %s:%"
PRIi64
"
for table %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
req
.
indexName
,
req
.
indexUid
,
req
.
tableUid
,
terrstr
(
terrno
));
vError
(
"vgId:%d failed to create tsma %s:%"
PRIi64
"
version %"
PRIi64
"for table %"
PRIi64
" since %s"
,
TD_VID
(
pVnode
),
req
.
indexName
,
req
.
indexUid
,
version
,
req
.
tableUid
,
terrstr
(
terrno
));
return
-
1
;
}
/**
* @brief specific for smaDstVnode
*
* @param pVnode
* @param pCont
* @param contLen
* @return int32_t
*/
int32_t
vnodeProcessCreateTSma
(
SVnode
*
pVnode
,
void
*
pCont
,
uint32_t
contLen
)
{
return
vnodeProcessCreateTSmaReq
(
pVnode
,
1
,
pCont
,
contLen
,
NULL
);
}
source/util/src/terror.c
浏览文件 @
737820ee
...
...
@@ -75,6 +75,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate
TAOS_DEFINE_ERROR
(
TSDB_CODE_NEED_RETRY
,
"Retry needed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE
,
"Out of memory in rpc queue"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_TIMESTAMP
,
"Invalid timestamp format"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MSG_DECODE_ERROR
,
"Msg decode error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_NO_MEMORY
,
"Ref out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_FULL
,
"too many Ref Objs"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录