Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e88b47b9
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e88b47b9
编写于
3月 24, 2022
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
上级
c1f25f3a
8431ff34
变更
21
展开全部
隐藏空白更改
内联
并排
Showing
21 changed file
with
1323 addition
and
37 deletion
+1323
-37
include/common/tmsg.h
include/common/tmsg.h
+41
-4
include/common/tmsgdef.h
include/common/tmsgdef.h
+2
-0
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+5
-4
include/util/taoserror.h
include/util/taoserror.h
+18
-12
source/common/src/tmsg.c
source/common/src/tmsg.c
+126
-1
source/dnode/mgmt/mnode/src/mmMsg.c
source/dnode/mgmt/mnode/src/mmMsg.c
+4
-0
source/dnode/mgmt/vnode/src/vmMsg.c
source/dnode/mgmt/vnode/src/vmMsg.c
+3
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+29
-1
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+3
-3
source/dnode/mnode/impl/inc/mndSma.h
source/dnode/mnode/impl/inc/mndSma.h
+34
-0
source/dnode/mnode/impl/src/mndShow.c
source/dnode/mnode/impl/src/mndShow.c
+2
-0
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+762
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+25
-7
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+1
-1
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+4
-0
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+6
-4
source/dnode/mnode/impl/test/CMakeLists.txt
source/dnode/mnode/impl/test/CMakeLists.txt
+1
-0
source/dnode/mnode/impl/test/sma/CMakeLists.txt
source/dnode/mnode/impl/test/sma/CMakeLists.txt
+11
-0
source/dnode/mnode/impl/test/sma/sma.cpp
source/dnode/mnode/impl/test/sma/sma.cpp
+236
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+4
-0
source/util/src/terror.c
source/util/src/terror.c
+6
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
e88b47b9
...
...
@@ -109,6 +109,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_STREAMTABLES
,
TSDB_MGMT_TABLE_TP
,
TSDB_MGMT_TABLE_FUNC
,
TSDB_MGMT_TABLE_INDEX
,
TSDB_MGMT_TABLE_MAX
,
}
EShowType
;
...
...
@@ -270,9 +271,10 @@ typedef struct {
int8_t
igExists
;
int32_t
numOfColumns
;
int32_t
numOfTags
;
int32_t
commentLen
;
SArray
*
pColumns
;
SArray
*
pTags
;
char
comment
[
TSDB_STB_COMMENT_LEN
]
;
char
*
comment
;
}
SMCreateStbReq
;
int32_t
tSerializeSMCreateStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateStbReq
*
pReq
);
...
...
@@ -1938,12 +1940,47 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
}
return
buf
;
}
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
char
stb
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igExists
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int8_t
timezone
;
int32_t
dstVgId
;
// for stream
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int32_t
exprLen
;
// strlen + 1
int32_t
tagsFilterLen
;
// strlen + 1
int32_t
sqlLen
;
// strlen + 1
int32_t
astLen
;
// strlen + 1
char
*
expr
;
char
*
tagsFilter
;
char
*
sql
;
char
*
ast
;
}
SMCreateSmaReq
;
int32_t
tSerializeSMCreateSmaReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateSmaReq
*
pReq
);
int32_t
tDeserializeSMCreateSmaReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateSmaReq
*
pReq
);
void
tFreeSMCreateSmaReq
(
SMCreateSmaReq
*
pReq
);
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igNotExists
;
}
SMDropSmaReq
;
int32_t
tSerializeSMDropSmaReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropSmaReq
*
pReq
);
int32_t
tDeserializeSMDropSmaReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropSmaReq
*
pReq
);
typedef
struct
{
int8_t
version
;
// for compatibility(default 0)
int8_t
intervalUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
slidingUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
timezoneInt
;
// sma data expired if timezone changes.
char
indexName
[
TSDB_INDEX_NAME_LEN
];
char
timezone
[
TD_TIMEZONE_LEN
];
// sma data expired if timezone changes.
char
timezone
[
TD_TIMEZONE_LEN
];
int32_t
exprLen
;
int32_t
tagsFilterLen
;
int64_t
indexUid
;
...
...
@@ -2050,8 +2087,8 @@ static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
tlen
+=
taosEncodeFixedI8
(
buf
,
pSma
->
version
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSma
->
intervalUnit
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSma
->
slidingUnit
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSma
->
timezoneInt
);
tlen
+=
taosEncodeString
(
buf
,
pSma
->
indexName
);
tlen
+=
taosEncodeString
(
buf
,
pSma
->
timezone
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSma
->
exprLen
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSma
->
tagsFilterLen
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pSma
->
indexUid
);
...
...
@@ -2085,8 +2122,8 @@ static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf
=
taosDecodeFixedI8
(
buf
,
&
pSma
->
version
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSma
->
intervalUnit
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSma
->
slidingUnit
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSma
->
timezoneInt
);
buf
=
taosDecodeStringTo
(
buf
,
pSma
->
indexName
);
buf
=
taosDecodeStringTo
(
buf
,
pSma
->
timezone
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSma
->
exprLen
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSma
->
tagsFilterLen
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pSma
->
indexUid
);
...
...
include/common/tmsgdef.h
浏览文件 @
e88b47b9
...
...
@@ -127,6 +127,8 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_STB
,
"mnode-create-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_STB
,
"mnode-alter-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_STB
,
"mnode-drop-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_SMA
,
"mnode-create-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_SMA
,
"mnode-drop-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TABLE_META
,
"mnode-table-meta"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_VGROUP_LIST
,
"mnode-vgroup-list"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_QNODE_LIST
,
"mnode-qnode-list"
,
NULL
,
NULL
)
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
e88b47b9
...
...
@@ -119,10 +119,11 @@ typedef enum {
SDB_CONSUMER
=
13
,
SDB_TOPIC
=
14
,
SDB_VGROUP
=
15
,
SDB_STB
=
16
,
SDB_DB
=
17
,
SDB_FUNC
=
18
,
SDB_MAX
=
19
SDB_SMA
=
16
,
SDB_STB
=
17
,
SDB_DB
=
18
,
SDB_FUNC
=
19
,
SDB_MAX
=
20
}
ESdbType
;
typedef
struct
SSdb
SSdb
;
...
...
include/util/taoserror.h
浏览文件 @
e88b47b9
...
...
@@ -274,18 +274,23 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0401)
#define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0402)
// dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x04
0
0)
#define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x04
0
1)
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x04
0
2)
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04
0
3)
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04
0
4)
#define TSDB_CODE_NODE_PARSE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x04
0
5)
#define TSDB_CODE_NODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x04
0
6)
#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04
10
)
#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04
11
)
#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x04
12
)
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x04
13
)
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x04
A
0)
#define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x04
A
1)
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x04
A
2)
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04
A
3)
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04
A
4)
#define TSDB_CODE_NODE_PARSE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x04
A
5)
#define TSDB_CODE_NODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x04
A
6)
#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04
A7
)
#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04
A8
)
#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x04
A9
)
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x04
AA
)
// vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500)
...
...
@@ -309,7 +314,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513)
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514)
#define TSDB_CODE_VND_TB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0515)
#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0516)
#define TSDB_CODE_VND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0516)
#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0517)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
...
...
source/common/src/tmsg.c
浏览文件 @
e88b47b9
...
...
@@ -510,6 +510,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfColumns
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
commentLen
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfColumns
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pColumns
,
i
);
...
...
@@ -525,7 +526,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tEncodeCStr
(
&
encoder
,
pField
->
name
)
<
0
)
return
-
1
;
}
if
(
tEncode
CStr
(
&
encoder
,
pReq
->
comment
)
<
0
)
return
-
1
;
if
(
tEncode
Binary
(
&
encoder
,
pReq
->
comment
,
pReq
->
commentLen
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -542,6 +543,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfColumns
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
commentLen
)
<
0
)
return
-
1
;
pReq
->
pColumns
=
taosArrayInit
(
pReq
->
numOfColumns
,
sizeof
(
SField
));
pReq
->
pTags
=
taosArrayInit
(
pReq
->
numOfTags
,
sizeof
(
SField
));
...
...
@@ -572,6 +574,12 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
}
}
if
(
pReq
->
commentLen
>
0
)
{
pReq
->
comment
=
malloc
(
pReq
->
commentLen
);
if
(
pReq
->
comment
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
comment
)
<
0
)
return
-
1
;
}
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
comment
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
...
...
@@ -669,6 +677,123 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) {
pReq
->
pFields
=
NULL
;
}
int32_t
tSerializeSMCreateSmaReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateSmaReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
stb
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
slidingUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
timezone
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
interval
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
offset
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
sliding
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
exprLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
tagsFilterLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
sqlLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
astLen
)
<
0
)
return
-
1
;
if
(
pReq
->
exprLen
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
expr
,
pReq
->
exprLen
)
<
0
)
return
-
1
;
}
if
(
pReq
->
tagsFilterLen
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
tagsFilter
,
pReq
->
tagsFilterLen
)
<
0
)
return
-
1
;
}
if
(
pReq
->
sqlLen
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
sql
,
pReq
->
sqlLen
)
<
0
)
return
-
1
;
}
if
(
pReq
->
astLen
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
ast
,
pReq
->
astLen
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSMCreateSmaReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateSmaReq
*
pReq
)
{
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
stb
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
slidingUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
timezone
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
interval
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
offset
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
sliding
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
exprLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
tagsFilterLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
sqlLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
astLen
)
<
0
)
return
-
1
;
if
(
pReq
->
exprLen
>
0
)
{
pReq
->
expr
=
malloc
(
pReq
->
exprLen
);
if
(
pReq
->
expr
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
expr
)
<
0
)
return
-
1
;
}
if
(
pReq
->
tagsFilterLen
>
0
)
{
pReq
->
tagsFilter
=
malloc
(
pReq
->
tagsFilterLen
);
if
(
pReq
->
tagsFilter
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
tagsFilter
)
<
0
)
return
-
1
;
}
if
(
pReq
->
sqlLen
>
0
)
{
pReq
->
sql
=
malloc
(
pReq
->
sqlLen
);
if
(
pReq
->
sql
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
sql
)
<
0
)
return
-
1
;
}
if
(
pReq
->
astLen
>
0
)
{
pReq
->
ast
=
malloc
(
pReq
->
astLen
);
if
(
pReq
->
ast
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
ast
)
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
void
tFreeSMCreateSmaReq
(
SMCreateSmaReq
*
pReq
)
{
tfree
(
pReq
->
expr
);
tfree
(
pReq
->
tagsFilter
);
tfree
(
pReq
->
sql
);
tfree
(
pReq
->
ast
);
}
int32_t
tSerializeSMDropSmaReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropSmaReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSMDropSmaReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropSmaReq
*
pReq
)
{
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSStatusReq
(
void
*
buf
,
int32_t
bufLen
,
SStatusReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
...
...
source/dnode/mgmt/mnode/src/mmMsg.c
浏览文件 @
e88b47b9
...
...
@@ -123,6 +123,8 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_CREATE_STB
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_ALTER_STB
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_DROP_STB
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_CREATE_SMA
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_DROP_SMA
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_TABLE_META
,
(
NodeMsgFp
)
mmProcessReadMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_VGROUP_LIST
,
(
NodeMsgFp
)
mmProcessReadMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_KILL_QUERY
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
...
...
@@ -151,4 +153,6 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CREATE_STB_RSP
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_ALTER_STB_RSP
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_STB_RSP
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CREATE_SMA_RSP
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_SMA_RSP
,
(
NodeMsgFp
)
mmProcessWriteMsg
,
0
);
}
source/dnode/mgmt/vnode/src/vmMsg.c
浏览文件 @
e88b47b9
...
...
@@ -268,6 +268,9 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CREATE_TABLE
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_ALTER_TABLE
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_TABLE
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CREATE_SMA
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CANCEL_SMA
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_SMA
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_SHOW_TABLES
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_SHOW_TABLES_FETCH
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CONN
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
0
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
e88b47b9
...
...
@@ -103,6 +103,8 @@ typedef enum {
TRN_TYPE_CREATE_STB
=
4001
,
TRN_TYPE_ALTER_STB
=
4002
,
TRN_TYPE_DROP_STB
=
4003
,
TRN_TYPE_CREATE_SMA
=
4004
,
TRN_TYPE_DROP_SMA
=
4005
,
TRN_TYPE_STB_SCOPE_END
,
}
ETrnType
;
...
...
@@ -305,6 +307,31 @@ typedef struct {
SVnodeGid
vnodeGid
[
TSDB_MAX_REPLICA
];
}
SVgObj
;
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
char
stb
[
TSDB_TABLE_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createdTime
;
int64_t
uid
;
int64_t
stbUid
;
int64_t
dbUid
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int8_t
timezone
;
int32_t
dstVgId
;
// for stream
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int32_t
exprLen
;
// strlen + 1
int32_t
tagsFilterLen
;
int32_t
sqlLen
;
int32_t
astLen
;
char
*
expr
;
char
*
tagsFilter
;
char
*
sql
;
char
*
ast
;
}
SSmaObj
;
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
...
...
@@ -316,10 +343,11 @@ typedef struct {
int32_t
nextColId
;
int32_t
numOfColumns
;
int32_t
numOfTags
;
int32_t
commentLen
;
SSchema
*
pColumns
;
SSchema
*
pTags
;
char
*
comment
;
SRWLatch
lock
;
char
comment
[
TSDB_STB_COMMENT_LEN
];
}
SStbObj
;
typedef
struct
{
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
e88b47b9
...
...
@@ -122,9 +122,9 @@ typedef struct SMnode {
SMsgCb
msgCb
;
}
SMnode
;
void
mndSetMsgHandle
(
SMnode
*
pMnode
,
tmsg_t
msgType
,
MndMsgFp
fp
);
u
int64_t
mndGenerateUid
(
char
*
name
,
int32_t
len
);
void
mndGetLoad
(
SMnode
*
pMnode
,
SMnodeLoad
*
pLoad
);
void
mndSetMsgHandle
(
SMnode
*
pMnode
,
tmsg_t
msgType
,
MndMsgFp
fp
);
int64_t
mndGenerateUid
(
char
*
name
,
int32_t
len
);
void
mndGetLoad
(
SMnode
*
pMnode
,
SMnodeLoad
*
pLoad
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/inc/mndSma.h
0 → 100644
浏览文件 @
e88b47b9
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_SMA_H_
#define _TD_MND_SMA_H_
#include "mndInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
mndInitSma
(
SMnode
*
pMnode
);
void
mndCleanupSma
(
SMnode
*
pMnode
);
SSmaObj
*
mndAcquireSma
(
SMnode
*
pMnode
,
char
*
smaName
);
void
mndReleaseSma
(
SMnode
*
pMnode
,
SSmaObj
*
pSma
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_MND_SMA_H_*/
source/dnode/mnode/impl/src/mndShow.c
浏览文件 @
e88b47b9
...
...
@@ -427,6 +427,8 @@ char *mndShowStr(int32_t showType) {
return
"show topics"
;
case
TSDB_MGMT_TABLE_FUNC
:
return
"show functions"
;
case
TSDB_MGMT_TABLE_INDEX
:
return
"show indexes"
;
default:
return
"undefined"
;
}
...
...
source/dnode/mnode/impl/src/mndSma.c
0 → 100644
浏览文件 @
e88b47b9
此差异已折叠。
点击以展开。
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
e88b47b9
...
...
@@ -13,20 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndStb.h"
#include "mndAuth.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndInfoSchema.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndInfoSchema.h"
#include "tname.h"
#define TSDB_STB_VER_NUMBER 1
#define TSDB_STB_VER_NUMBER
1
#define TSDB_STB_RESERVE_SIZE 64
static
SSdbRow
*
mndStbActionDecode
(
SSdbRaw
*
pRaw
);
...
...
@@ -88,6 +87,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
nextColId
,
STB_ENCODE_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfColumns
,
STB_ENCODE_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfTags
,
STB_ENCODE_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
commentLen
,
STB_ENCODE_OVER
)
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfColumns
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
pColumns
[
i
];
...
...
@@ -105,7 +105,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
,
STB_ENCODE_OVER
)
}
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pStb
->
comment
,
TSDB_STB_COMMENT_LEN
,
STB_ENCODE_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pStb
->
comment
,
pStb
->
commentLen
,
STB_ENCODE_OVER
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_STB_RESERVE_SIZE
,
STB_ENCODE_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
STB_ENCODE_OVER
)
...
...
@@ -150,6 +150,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
nextColId
,
STB_DECODE_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
numOfColumns
,
STB_DECODE_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
numOfTags
,
STB_DECODE_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
commentLen
,
STB_DECODE_OVER
)
pStb
->
pColumns
=
calloc
(
pStb
->
numOfColumns
,
sizeof
(
SSchema
));
pStb
->
pTags
=
calloc
(
pStb
->
numOfTags
,
sizeof
(
SSchema
));
...
...
@@ -173,7 +174,11 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
,
STB_DECODE_OVER
)
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pStb
->
comment
,
TSDB_STB_COMMENT_LEN
,
STB_DECODE_OVER
)
if
(
pStb
->
commentLen
>
0
)
{
pStb
->
comment
=
calloc
(
pStb
->
commentLen
,
1
);
if
(
pStb
->
comment
==
NULL
)
goto
STB_DECODE_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pStb
->
comment
,
pStb
->
commentLen
,
STB_DECODE_OVER
)
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
TSDB_STB_RESERVE_SIZE
,
STB_DECODE_OVER
)
terrno
=
0
;
...
...
@@ -183,6 +188,7 @@ STB_DECODE_OVER:
mError
(
"stb:%s, failed to decode from raw:%p since %s"
,
pStb
->
name
,
pRaw
,
terrstr
());
tfree
(
pStb
->
pColumns
);
tfree
(
pStb
->
pTags
);
tfree
(
pStb
->
comment
);
tfree
(
pRow
);
return
NULL
;
}
...
...
@@ -200,6 +206,7 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
mTrace
(
"stb:%s, perform delete action, row:%p"
,
pStb
->
name
,
pStb
);
tfree
(
pStb
->
pColumns
);
tfree
(
pStb
->
pTags
);
tfree
(
pStb
->
comment
);
return
0
;
}
...
...
@@ -502,6 +509,13 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
stbObj
.
nextColId
=
1
;
stbObj
.
numOfColumns
=
pCreate
->
numOfColumns
;
stbObj
.
numOfTags
=
pCreate
->
numOfTags
;
stbObj
.
commentLen
=
pCreate
->
commentLen
;
stbObj
.
comment
=
calloc
(
stbObj
.
commentLen
,
1
);
if
(
stbObj
.
comment
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
(
stbObj
.
comment
,
pCreate
->
comment
,
stbObj
.
commentLen
);
stbObj
.
pColumns
=
malloc
(
stbObj
.
numOfColumns
*
sizeof
(
SSchema
));
stbObj
.
pTags
=
malloc
(
stbObj
.
numOfTags
*
sizeof
(
SSchema
));
...
...
@@ -1162,7 +1176,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static
int32_t
mndDropStb
(
SMnode
*
pMnode
,
SNodeMsg
*
pReq
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_DROP_STB
,
&
pReq
->
rpcMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_DROP_STB
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
goto
DROP_STB_OVER
;
mDebug
(
"trans:%d, used to drop stb:%s"
,
pTrans
->
id
,
pStb
->
name
);
...
...
@@ -1568,7 +1582,11 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_TO_VARSTR
(
pWrite
,
pStb
->
comment
);
if
(
pStb
->
commentLen
!=
0
)
{
STR_TO_VARSTR
(
pWrite
,
pStb
->
comment
);
}
else
{
STR_TO_VARSTR
(
pWrite
,
""
);
}
cols
++
;
numOfRows
++
;
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
e88b47b9
...
...
@@ -299,7 +299,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
return
-
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_R
OLLBACK
,
TRN_TYPE_CREATE_STREAM
,
&
pReq
->
rpcMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_R
ETRY
,
TRN_TYPE_CREATE_STREAM
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"stream:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
e88b47b9
...
...
@@ -406,6 +406,10 @@ static const char *mndTransType(ETrnType type) {
return
"alter-stb"
;
case
TRN_TYPE_DROP_STB
:
return
"drop-stb"
;
case
TRN_TYPE_CREATE_SMA
:
return
"create-sma"
;
case
TRN_TYPE_DROP_SMA
:
return
"drop-sma"
;
default:
return
"invalid"
;
}
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
e88b47b9
...
...
@@ -28,6 +28,7 @@
#include "mndProfile.h"
#include "mndQnode.h"
#include "mndShow.h"
#include "mndSma.h"
#include "mndSnode.h"
#include "mndStb.h"
#include "mndStream.h"
...
...
@@ -204,6 +205,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if
(
mndAllocStep
(
pMnode
,
"mnode-offset"
,
mndInitOffset
,
mndCleanupOffset
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-vgroup"
,
mndInitVgroup
,
mndCleanupVgroup
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-stb"
,
mndInitStb
,
mndCleanupStb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-stb"
,
mndInitSma
,
mndCleanupSma
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-infos"
,
mndInitInfos
,
mndCleanupInfos
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-db"
,
mndInitDb
,
mndCleanupDb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-func"
,
mndInitFunc
,
mndCleanupFunc
)
!=
0
)
return
-
1
;
...
...
@@ -409,15 +411,15 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
}
// Note: uid 0 is reserved
u
int64_t
mndGenerateUid
(
char
*
name
,
int32_t
len
)
{
int64_t
mndGenerateUid
(
char
*
name
,
int32_t
len
)
{
int32_t
hashval
=
MurmurHash3_32
(
name
,
len
);
do
{
int64_t
us
=
taosGetTimestampUs
();
u
int64_t
x
=
(
us
&
0x000000FFFFFFFFFF
)
<<
24
;
u
int64_t
uuid
=
x
+
((
hashval
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
int64_t
x
=
(
us
&
0x000000FFFFFFFFFF
)
<<
24
;
int64_t
uuid
=
x
+
((
hashval
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
if
(
uuid
)
{
return
uuid
;
return
abs
(
uuid
)
;
}
}
while
(
true
);
}
...
...
source/dnode/mnode/impl/test/CMakeLists.txt
浏览文件 @
e88b47b9
...
...
@@ -12,5 +12,6 @@ add_subdirectory(dnode)
add_subdirectory
(
mnode
)
add_subdirectory
(
db
)
add_subdirectory
(
stb
)
add_subdirectory
(
sma
)
add_subdirectory
(
func
)
add_subdirectory
(
topic
)
source/dnode/mnode/impl/test/sma/CMakeLists.txt
0 → 100644
浏览文件 @
e88b47b9
aux_source_directory
(
. SMA_SRC
)
add_executable
(
mnode_test_sma
${
SMA_SRC
}
)
target_link_libraries
(
mnode_test_sma
PUBLIC sut
)
add_test
(
NAME mnode_test_sma
COMMAND mnode_test_sma
)
source/dnode/mnode/impl/test/sma/sma.cpp
0 → 100644
浏览文件 @
e88b47b9
/**
* @file sma.cpp
* @author slguan (slguan@taosdata.com)
* @brief MNODE module sma tests
* @version 1.0
* @date 2022-03-23
*
* @copyright Copyright (c) 2022
*
*/
#include "sut.h"
class
MndTestSma
:
public
::
testing
::
Test
{
protected:
static
void
SetUpTestSuite
()
{
test
.
Init
(
"/tmp/mnode_test_sma"
,
9035
);
}
static
void
TearDownTestSuite
()
{
test
.
Cleanup
();
}
static
Testbase
test
;
public:
void
SetUp
()
override
{}
void
TearDown
()
override
{}
void
*
BuildCreateDbReq
(
const
char
*
dbname
,
int32_t
*
pContLen
);
void
*
BuildDropDbReq
(
const
char
*
dbname
,
int32_t
*
pContLen
);
void
*
BuildCreateStbReq
(
const
char
*
stbname
,
int32_t
*
pContLen
);
void
*
BuildDropStbReq
(
const
char
*
stbname
,
int32_t
*
pContLen
);
void
*
BuildCreateSmaReq
(
const
char
*
smaname
,
const
char
*
stbname
,
int8_t
igExists
,
const
char
*
expr
,
const
char
*
tagsFilter
,
const
char
*
sql
,
const
char
*
ast
,
int32_t
*
pContLen
);
void
*
BuildDropSmaReq
(
const
char
*
smaname
,
int8_t
igNotExists
,
int32_t
*
pContLen
);
};
Testbase
MndTestSma
::
test
;
void
*
MndTestSma
::
BuildCreateDbReq
(
const
char
*
dbname
,
int32_t
*
pContLen
)
{
SCreateDbReq
createReq
=
{
0
};
strcpy
(
createReq
.
db
,
dbname
);
createReq
.
numOfVgroups
=
2
;
createReq
.
cacheBlockSize
=
16
;
createReq
.
totalBlocks
=
10
;
createReq
.
daysPerFile
=
10
;
createReq
.
daysToKeep0
=
3650
;
createReq
.
daysToKeep1
=
3650
;
createReq
.
daysToKeep2
=
3650
;
createReq
.
minRows
=
100
;
createReq
.
maxRows
=
4096
;
createReq
.
commitTime
=
3600
;
createReq
.
fsyncPeriod
=
3000
;
createReq
.
walLevel
=
1
;
createReq
.
precision
=
0
;
createReq
.
compression
=
2
;
createReq
.
replications
=
1
;
createReq
.
quorum
=
1
;
createReq
.
update
=
0
;
createReq
.
cacheLastRow
=
0
;
createReq
.
ignoreExist
=
1
;
int32_t
contLen
=
tSerializeSCreateDbReq
(
NULL
,
0
,
&
createReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSCreateDbReq
(
pReq
,
contLen
,
&
createReq
);
*
pContLen
=
contLen
;
return
pReq
;
}
void
*
MndTestSma
::
BuildDropDbReq
(
const
char
*
dbname
,
int32_t
*
pContLen
)
{
SDropDbReq
dropdbReq
=
{
0
};
strcpy
(
dropdbReq
.
db
,
dbname
);
int32_t
contLen
=
tSerializeSDropDbReq
(
NULL
,
0
,
&
dropdbReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSDropDbReq
(
pReq
,
contLen
,
&
dropdbReq
);
*
pContLen
=
contLen
;
return
pReq
;
}
void
*
MndTestSma
::
BuildCreateStbReq
(
const
char
*
stbname
,
int32_t
*
pContLen
)
{
SMCreateStbReq
createReq
=
{
0
};
createReq
.
numOfColumns
=
3
;
createReq
.
numOfTags
=
1
;
createReq
.
igExists
=
0
;
createReq
.
pColumns
=
taosArrayInit
(
createReq
.
numOfColumns
,
sizeof
(
SField
));
createReq
.
pTags
=
taosArrayInit
(
createReq
.
numOfTags
,
sizeof
(
SField
));
strcpy
(
createReq
.
name
,
stbname
);
{
SField
field
=
{
0
};
field
.
bytes
=
8
;
field
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
field
.
name
,
"ts"
);
taosArrayPush
(
createReq
.
pColumns
,
&
field
);
}
{
SField
field
=
{
0
};
field
.
bytes
=
2
;
field
.
type
=
TSDB_DATA_TYPE_TINYINT
;
strcpy
(
field
.
name
,
"col1"
);
taosArrayPush
(
createReq
.
pColumns
,
&
field
);
}
{
SField
field
=
{
0
};
field
.
bytes
=
8
;
field
.
type
=
TSDB_DATA_TYPE_BIGINT
;
strcpy
(
field
.
name
,
"col2"
);
taosArrayPush
(
createReq
.
pColumns
,
&
field
);
}
{
SField
field
=
{
0
};
field
.
bytes
=
2
;
field
.
type
=
TSDB_DATA_TYPE_TINYINT
;
strcpy
(
field
.
name
,
"tag1"
);
taosArrayPush
(
createReq
.
pTags
,
&
field
);
}
int32_t
tlen
=
tSerializeSMCreateStbReq
(
NULL
,
0
,
&
createReq
);
void
*
pHead
=
rpcMallocCont
(
tlen
);
tSerializeSMCreateStbReq
(
pHead
,
tlen
,
&
createReq
);
tFreeSMCreateStbReq
(
&
createReq
);
*
pContLen
=
tlen
;
return
pHead
;
}
void
*
MndTestSma
::
BuildDropStbReq
(
const
char
*
stbname
,
int32_t
*
pContLen
)
{
SMDropStbReq
dropstbReq
=
{
0
};
strcpy
(
dropstbReq
.
name
,
stbname
);
int32_t
contLen
=
tSerializeSMDropStbReq
(
NULL
,
0
,
&
dropstbReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSMDropStbReq
(
pReq
,
contLen
,
&
dropstbReq
);
*
pContLen
=
contLen
;
return
pReq
;
}
void
*
MndTestSma
::
BuildCreateSmaReq
(
const
char
*
smaname
,
const
char
*
stbname
,
int8_t
igExists
,
const
char
*
expr
,
const
char
*
tagsFilter
,
const
char
*
sql
,
const
char
*
ast
,
int32_t
*
pContLen
)
{
SMCreateSmaReq
createReq
=
{
0
};
strcpy
(
createReq
.
name
,
smaname
);
strcpy
(
createReq
.
stb
,
stbname
);
createReq
.
igExists
=
igExists
;
createReq
.
intervalUnit
=
1
;
createReq
.
slidingUnit
=
2
;
createReq
.
timezone
=
3
;
createReq
.
dstVgId
=
4
;
createReq
.
interval
=
10
;
createReq
.
offset
=
5
;
createReq
.
sliding
=
6
;
createReq
.
expr
=
(
char
*
)
expr
;
createReq
.
exprLen
=
strlen
(
createReq
.
expr
)
+
1
;
createReq
.
tagsFilter
=
(
char
*
)
tagsFilter
;
createReq
.
tagsFilterLen
=
strlen
(
createReq
.
tagsFilter
)
+
1
;
createReq
.
sql
=
(
char
*
)
sql
;
createReq
.
sqlLen
=
strlen
(
createReq
.
sql
)
+
1
;
createReq
.
ast
=
(
char
*
)
expr
;
createReq
.
astLen
=
strlen
(
createReq
.
ast
)
+
1
;
int32_t
tlen
=
tSerializeSMCreateSmaReq
(
NULL
,
0
,
&
createReq
);
void
*
pHead
=
rpcMallocCont
(
tlen
);
tSerializeSMCreateSmaReq
(
pHead
,
tlen
,
&
createReq
);
*
pContLen
=
tlen
;
return
pHead
;
}
void
*
MndTestSma
::
BuildDropSmaReq
(
const
char
*
smaname
,
int8_t
igNotExists
,
int32_t
*
pContLen
)
{
SMDropSmaReq
dropsmaReq
=
{
0
};
dropsmaReq
.
igNotExists
=
igNotExists
;
strcpy
(
dropsmaReq
.
name
,
smaname
);
int32_t
contLen
=
tSerializeSMDropSmaReq
(
NULL
,
0
,
&
dropsmaReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSMDropSmaReq
(
pReq
,
contLen
,
&
dropsmaReq
);
*
pContLen
=
contLen
;
return
pReq
;
}
TEST_F
(
MndTestSma
,
01
_Create_Show_Meta_Drop_Restart_Stb
)
{
const
char
*
dbname
=
"1.d1"
;
const
char
*
stbname
=
"1.d1.stb"
;
const
char
*
smaname
=
"1.d1.sma"
;
int32_t
contLen
=
0
;
void
*
pReq
;
SRpcMsg
*
pRsp
;
{
pReq
=
BuildCreateDbReq
(
dbname
,
&
contLen
);
pRsp
=
test
.
SendReq
(
TDMT_MND_CREATE_DB
,
pReq
,
contLen
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
}
{
pReq
=
BuildCreateStbReq
(
stbname
,
&
contLen
);
pRsp
=
test
.
SendReq
(
TDMT_MND_CREATE_STB
,
pReq
,
contLen
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
test
.
SendShowMetaReq
(
TSDB_MGMT_TABLE_STB
,
dbname
);
test
.
SendShowRetrieveReq
();
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
}
{
pReq
=
BuildCreateSmaReq
(
smaname
,
stbname
,
0
,
"expr"
,
"tagsFilter"
,
"sql"
,
"ast"
,
&
contLen
);
pRsp
=
test
.
SendReq
(
TDMT_MND_CREATE_SMA
,
pReq
,
contLen
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
test
.
SendShowMetaReq
(
TSDB_MGMT_TABLE_INDEX
,
dbname
);
test
.
SendShowRetrieveReq
();
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
}
// restart
test
.
Restart
();
{
test
.
SendShowMetaReq
(
TSDB_MGMT_TABLE_INDEX
,
dbname
);
CHECK_META
(
"show indexes"
,
3
);
test
.
SendShowRetrieveReq
();
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
CheckBinary
(
"sma"
,
TSDB_INDEX_NAME_LEN
);
CheckTimestamp
();
CheckBinary
(
"stb"
,
TSDB_TABLE_NAME_LEN
);
}
{
pReq
=
BuildDropSmaReq
(
smaname
,
0
,
&
contLen
);
pRsp
=
test
.
SendReq
(
TDMT_MND_DROP_SMA
,
pReq
,
contLen
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
test
.
SendShowMetaReq
(
TSDB_MGMT_TABLE_INDEX
,
dbname
);
test
.
SendShowRetrieveReq
();
EXPECT_EQ
(
test
.
GetShowRows
(),
0
);
}
}
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
e88b47b9
...
...
@@ -168,6 +168,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
}
break
;
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
#if 0
SSmaCfg vCreateSmaReq = {0};
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
...
...
@@ -189,10 +190,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// }
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
#endif
}
break
;
case
TDMT_VND_CANCEL_SMA
:
{
// timeRangeSMA
}
break
;
case
TDMT_VND_DROP_SMA
:
{
// timeRangeSMA
#if 0
SVDropTSmaReq vDropSmaReq = {0};
if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
...
...
@@ -209,6 +212,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// return -1;
// }
// TODO: return directly or go on follow steps?
#endif
}
break
;
default:
ASSERT
(
0
);
...
...
source/util/src/terror.c
浏览文件 @
e88b47b9
...
...
@@ -270,6 +270,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill
// mnode-topic
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_UNSUPPORTED_TOPIC
,
"Topic with aggregation is unsupported"
)
// mnode-sma
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SMA_ALREADY_EXIST
,
"SMA already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SMA_NOT_EXIST
,
"SMA does not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_SMA_OPTION
,
"Invalid sma option"
)
// dnode
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_ACTION_IN_PROGRESS
,
"Action in progress"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_OFFLINE
,
"Dnode is offline"
)
...
...
@@ -305,6 +310,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operat
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_SYNCING
,
"Database is syncing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_TSDB_STATE
,
"Invalid tsdb state"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_TB_NOT_EXIST
,
"Table not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_SMA_NOT_EXIST
,
"SMA not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_HASH_MISMATCH
,
"Hash value mismatch"
)
// tsdb
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录