Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c85f9f89
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
c85f9f89
编写于
6月 01, 2022
作者:
C
Cary Xu
提交者:
GitHub
6月 01, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13327 from taosdata/feature/TD-14481-3.0
feat: tsma refactor
上级
5422db24
1e719603
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
45 addition
and
15 deletion
+45
-15
include/common/tmsg.h
include/common/tmsg.h
+1
-0
include/util/tdef.h
include/util/tdef.h
+1
-1
include/util/tencode.h
include/util/tencode.h
+12
-8
source/common/src/tmsg.c
source/common/src/tmsg.c
+11
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+2
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+8
-2
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+2
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-1
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+1
-1
source/dnode/vnode/src/vnd/vnodeCfg.c
source/dnode/vnode/src/vnd/vnodeCfg.c
+3
-0
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
c85f9f89
...
...
@@ -1033,6 +1033,7 @@ typedef struct {
// for tsma
int8_t
isTsma
;
void
*
pTsma
;
}
SCreateVnodeReq
;
...
...
include/util/tdef.h
浏览文件 @
c85f9f89
...
...
@@ -343,7 +343,7 @@ typedef enum ELogicConditionType {
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
#define TSDB_MIN_ROLLUP_FILE_FACTOR 0
#define TSDB_MAX_ROLLUP_FILE_FACTOR 1
#define TSDB_MAX_ROLLUP_FILE_FACTOR 1
0
#define TSDB_DEFAULT_ROLLUP_FILE_FACTOR 0.1
#define TSDB_MIN_TABLE_TTL 0
#define TSDB_DEFAULT_TABLE_TTL 0
...
...
include/util/tencode.h
浏览文件 @
c85f9f89
...
...
@@ -378,14 +378,16 @@ static FORCE_INLINE int32_t tDecodeDouble(SDecoder* pCoder, double* val) {
}
static
FORCE_INLINE
int32_t
tDecodeBinary
(
SDecoder
*
pCoder
,
uint8_t
**
val
,
uint32_t
*
len
)
{
if
(
tDecodeU32v
(
pCoder
,
len
)
<
0
)
return
-
1
;
uint32_t
length
=
0
;
if
(
tDecodeU32v
(
pCoder
,
&
length
)
<
0
)
return
-
1
;
if
(
len
)
*
len
=
length
;
if
(
TD_CODER_CHECK_CAPACITY_FAILED
(
pCoder
,
*
len
))
return
-
1
;
if
(
TD_CODER_CHECK_CAPACITY_FAILED
(
pCoder
,
length
))
return
-
1
;
if
(
val
)
{
*
val
=
(
uint8_t
*
)
TD_CODER_CURRENT
(
pCoder
);
}
TD_CODER_MOVE_POS
(
pCoder
,
*
len
);
TD_CODER_MOVE_POS
(
pCoder
,
length
);
return
0
;
}
...
...
@@ -410,14 +412,16 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) {
}
static
FORCE_INLINE
int32_t
tDecodeBinaryAlloc
(
SDecoder
*
pCoder
,
void
**
val
,
uint64_t
*
len
)
{
if
(
tDecodeU64v
(
pCoder
,
len
)
<
0
)
return
-
1
;
uint64_t
length
=
0
;
if
(
tDecodeU64v
(
pCoder
,
&
length
)
<
0
)
return
-
1
;
if
(
len
)
*
len
=
length
;
if
(
TD_CODER_CHECK_CAPACITY_FAILED
(
pCoder
,
*
len
))
return
-
1
;
*
val
=
taosMemoryMalloc
(
*
len
);
if
(
TD_CODER_CHECK_CAPACITY_FAILED
(
pCoder
,
length
))
return
-
1
;
*
val
=
taosMemoryMalloc
(
length
);
if
(
*
val
==
NULL
)
return
-
1
;
memcpy
(
*
val
,
TD_CODER_CURRENT
(
pCoder
),
*
len
);
memcpy
(
*
val
,
TD_CODER_CURRENT
(
pCoder
),
length
);
TD_CODER_MOVE_POS
(
pCoder
,
*
len
);
TD_CODER_MOVE_POS
(
pCoder
,
length
);
return
0
;
}
...
...
source/common/src/tmsg.c
浏览文件 @
c85f9f89
...
...
@@ -2973,6 +2973,11 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
}
if
(
tEncodeI8
(
&
encoder
,
pReq
->
isTsma
)
<
0
)
return
-
1
;
if
(
pReq
->
isTsma
)
{
uint32_t
tsmaLen
=
(
uint32_t
)(
htonl
(((
SMsgHead
*
)
pReq
->
pTsma
)
->
contLen
));
if
(
tEncodeBinary
(
&
encoder
,
(
const
uint8_t
*
)
pReq
->
pTsma
,
tsmaLen
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -3036,6 +3041,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
}
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
isTsma
)
<
0
)
return
-
1
;
if
(
pReq
->
isTsma
)
{
if
(
tDecodeBinaryAlloc
(
&
decoder
,
&
pReq
->
pTsma
,
NULL
)
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
...
...
@@ -3045,6 +3053,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
int32_t
tFreeSCreateVnodeReq
(
SCreateVnodeReq
*
pReq
)
{
taosArrayDestroy
(
pReq
->
pRetensions
);
pReq
->
pRetensions
=
NULL
;
if
(
pReq
->
isTsma
)
{
taosMemoryFreeClear
(
pReq
->
pTsma
);
}
return
0
;
}
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
c85f9f89
...
...
@@ -140,6 +140,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg
->
szCache
=
pCreate
->
pages
;
pCfg
->
szBuf
=
(
uint64_t
)
pCreate
->
buffer
*
1024
*
1024
;
pCfg
->
isWeak
=
true
;
pCfg
->
isTsma
=
pCreate
->
isTsma
;
pCfg
->
tsdbCfg
.
compression
=
pCreate
->
compression
;
pCfg
->
tsdbCfg
.
precision
=
pCreate
->
precision
;
pCfg
->
tsdbCfg
.
days
=
pCreate
->
daysPerFile
;
...
...
@@ -209,7 +210,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to
create
vnode since %s"
,
createReq
.
vgId
,
terrstr
());
dError
(
"vgId:%d, failed to
open
vnode since %s"
,
createReq
.
vgId
,
terrstr
());
code
=
terrno
;
goto
_OVER
;
}
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
c85f9f89
...
...
@@ -344,6 +344,7 @@ typedef struct {
int8_t
isTsma
;
int8_t
replica
;
SVnodeGid
vnodeGid
[
TSDB_MAX_REPLICA
];
void
*
pTsma
;
}
SVgObj
;
typedef
struct
{
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
c85f9f89
...
...
@@ -409,7 +409,8 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return
0
;
}
static
int32_t
mndSetCreateSmaVgroupRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
static
int32_t
mndSetCreateSmaVgroupRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SSmaObj
*
pSma
)
{
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
0
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
return
-
1
;
...
...
@@ -419,9 +420,14 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
mndReleaseDnode
(
pMnode
,
pDnode
);
// todo add sma info here
int32_t
smaContLen
=
0
;
void
*
pSmaReq
=
mndBuildVCreateSmaReq
(
pMnode
,
pVgroup
,
pSma
,
&
smaContLen
);
if
(
pSmaReq
==
NULL
)
return
-
1
;
pVgroup
->
pTsma
=
pSmaReq
;
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildCreateVnodeReq
(
pMnode
,
pDnode
,
pDb
,
pVgroup
,
&
contLen
);
taosMemoryFreeClear
(
pSmaReq
);
if
(
pReq
==
NULL
)
return
-
1
;
action
.
pCont
=
pReq
;
...
...
@@ -514,7 +520,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if
(
mndSetCreateSmaCommitLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupCommitLogs
(
pMnode
,
pTrans
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
streamObj
.
fixedSinkVg
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
STREAM_TRIGGER_AT_ONCE
,
0
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
c85f9f89
...
...
@@ -218,6 +218,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq
.
hashMethod
=
pDb
->
cfg
.
hashMethod
;
createReq
.
numOfRetensions
=
pDb
->
cfg
.
numOfRetensions
;
createReq
.
pRetensions
=
pDb
->
cfg
.
pRetensions
;
createReq
.
isTsma
=
pVgroup
->
isTsma
;
createReq
.
pTsma
=
pVgroup
->
pTsma
;
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
SReplica
*
pReplica
=
&
createReq
.
replicas
[
v
];
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
c85f9f89
...
...
@@ -171,12 +171,13 @@ struct SVnodeCfg {
uint64_t
szBuf
;
bool
isHeap
;
bool
isWeak
;
int8_t
isTsma
;
int8_t
hashMethod
;
STsdbCfg
tsdbCfg
;
SWalCfg
walCfg
;
SSyncCfg
syncCfg
;
uint32_t
hashBegin
;
uint32_t
hashEnd
;
int8_t
hashMethod
;
};
typedef
struct
{
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
c85f9f89
...
...
@@ -414,7 +414,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
}
taosMemoryFreeClear
(
pReq
);
}
else
{
sma
Warn
(
"vgId:%d no rsma % "
PRIi8
" data generated since %s"
,
SMA_VID
(
pSma
),
level
,
tstrerror
(
terrno
));
sma
Debug
(
"vgId:%d no rsma % "
PRIi8
" data generated since %s"
,
SMA_VID
(
pSma
),
level
,
tstrerror
(
terrno
));
}
taosArrayDestroy
(
pResult
);
...
...
source/dnode/vnode/src/vnd/vnodeCfg.c
浏览文件 @
c85f9f89
...
...
@@ -56,6 +56,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if
(
tjsonAddIntegerToObject
(
pJson
,
"szBuf"
,
pCfg
->
szBuf
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"isHeap"
,
pCfg
->
isHeap
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"isWeak"
,
pCfg
->
isWeak
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"isTsma"
,
pCfg
->
isTsma
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"precision"
,
pCfg
->
tsdbCfg
.
precision
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"update"
,
pCfg
->
tsdbCfg
.
update
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"compression"
,
pCfg
->
tsdbCfg
.
compression
)
<
0
)
return
-
1
;
...
...
@@ -130,6 +131,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"isWeak"
,
pCfg
->
isWeak
,
code
);
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"isTsma"
,
pCfg
->
isTsma
,
code
);
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"precision"
,
pCfg
->
tsdbCfg
.
precision
,
code
);
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"update"
,
pCfg
->
tsdbCfg
.
update
,
code
);
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
c85f9f89
...
...
@@ -97,7 +97,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
}
// open tsdb
if
(
!
vnodeIsRollup
(
pVnode
)
&&
tsdbOpen
(
pVnode
,
&
VND_TSDB
(
pVnode
),
VNODE_TSDB_DIR
,
TSDB_TYPE_TSDB
)
<
0
)
{
if
(
!
vnodeIsRollup
(
pVnode
)
&&
tsdbOpen
(
pVnode
,
&
VND_TSDB
(
pVnode
),
VNODE_TSDB_DIR
,
NULL
)
<
0
)
{
vError
(
"vgId:%d failed to open vnode tsdb since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录