Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d89f396d
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d89f396d
编写于
2月 15, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
serialize stb msg
上级
2f4148e8
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
140 addition
and
106 deletion
+140
-106
include/common/tmsg.h
include/common/tmsg.h
+7
-6
source/common/src/tmsg.c
source/common/src/tmsg.c
+96
-57
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+8
-6
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+6
-3
source/dnode/mnode/impl/test/stb/stb.cpp
source/dnode/mnode/impl/test/stb/stb.cpp
+19
-28
source/libs/parser/src/astToMsg.c
source/libs/parser/src/astToMsg.c
+4
-6
未找到文件。
include/common/tmsg.h
浏览文件 @
d89f396d
...
@@ -272,8 +272,8 @@ typedef struct {
...
@@ -272,8 +272,8 @@ typedef struct {
char
comment
[
TSDB_STB_COMMENT_LEN
];
char
comment
[
TSDB_STB_COMMENT_LEN
];
}
SMCreateStbReq
;
}
SMCreateStbReq
;
int32_t
tSerializeSMCreateStbReq
(
void
*
*
buf
,
SMCreateStbReq
*
pReq
);
int32_t
tSerializeSMCreateStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateStbReq
*
pReq
);
void
*
tDeserializeSMCreateStbReq
(
void
*
buf
,
SMCreateStbReq
*
pReq
);
int32_t
tDeserializeSMCreateStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateStbReq
*
pReq
);
void
tFreeSMCreateStbReq
(
SMCreateStbReq
*
pReq
);
void
tFreeSMCreateStbReq
(
SMCreateStbReq
*
pReq
);
typedef
struct
{
typedef
struct
{
...
@@ -281,8 +281,8 @@ typedef struct {
...
@@ -281,8 +281,8 @@ typedef struct {
int8_t
igNotExists
;
int8_t
igNotExists
;
}
SMDropStbReq
;
}
SMDropStbReq
;
int32_t
tSerializeSMDropStbReq
(
void
*
*
buf
,
SMDropStbReq
*
pReq
);
int32_t
tSerializeSMDropStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropStbReq
*
pReq
);
void
*
tDeserializeSMDropStbReq
(
void
*
buf
,
SMDropStbReq
*
pReq
);
int32_t
tDeserializeSMDropStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropStbReq
*
pReq
);
typedef
struct
{
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
char
name
[
TSDB_TABLE_FNAME_LEN
];
...
@@ -291,8 +291,9 @@ typedef struct {
...
@@ -291,8 +291,9 @@ typedef struct {
SArray
*
pFields
;
SArray
*
pFields
;
}
SMAltertbReq
;
}
SMAltertbReq
;
int32_t
tSerializeSMAlterStbReq
(
void
**
buf
,
SMAltertbReq
*
pReq
);
int32_t
tSerializeSMAlterStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMAltertbReq
*
pReq
);
void
*
tDeserializeSMAlterStbReq
(
void
*
buf
,
SMAltertbReq
*
pReq
);
int32_t
tDeserializeSMAlterStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMAltertbReq
*
pReq
);
void
tFreeSMAltertbReq
(
SMAltertbReq
*
pReq
);
typedef
struct
{
typedef
struct
{
int32_t
pid
;
int32_t
pid
;
...
...
source/common/src/tmsg.c
浏览文件 @
d89f396d
...
@@ -375,132 +375,171 @@ void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
...
@@ -375,132 +375,171 @@ void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
return
buf
;
return
buf
;
}
}
int32_t
tSerializeSMCreateStbReq
(
void
**
buf
,
SMCreateStbReq
*
pReq
)
{
int32_t
tSerializeSMCreateStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateStbReq
*
pReq
)
{
int32_t
tlen
=
0
;
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
name
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
igExists
);
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
numOfColumns
);
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igExists
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
numOfTags
);
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfColumns
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfTags
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfColumns
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfColumns
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pColumns
,
i
);
SField
*
pField
=
taosArrayGet
(
pReq
->
pColumns
,
i
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pField
->
type
)
;
if
(
tEncodeI8
(
&
encoder
,
pField
->
type
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pField
->
bytes
)
;
if
(
tEncodeI32
(
&
encoder
,
pField
->
bytes
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeString
(
buf
,
pField
->
name
)
;
if
(
tEncodeCStr
(
&
encoder
,
pField
->
name
)
<
0
)
return
-
1
;
}
}
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfTags
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfTags
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pTags
,
i
);
SField
*
pField
=
taosArrayGet
(
pReq
->
pTags
,
i
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pField
->
type
)
;
if
(
tEncodeI8
(
&
encoder
,
pField
->
type
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pField
->
bytes
)
;
if
(
tEncodeI32
(
&
encoder
,
pField
->
bytes
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeString
(
buf
,
pField
->
name
)
;
if
(
tEncodeCStr
(
&
encoder
,
pField
->
name
)
<
0
)
return
-
1
;
}
}
tlen
+=
taosEncodeString
(
buf
,
pReq
->
comment
);
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
comment
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
return
tlen
;
}
}
void
*
tDeserializeSMCreateStbReq
(
void
*
buf
,
SMCreateStbReq
*
pReq
)
{
int32_t
tDeserializeSMCreateStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateStbReq
*
pReq
)
{
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
name
);
SCoder
decoder
=
{
0
};
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
igExists
);
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
numOfColumns
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
numOfTags
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfColumns
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfTags
)
<
0
)
return
-
1
;
pReq
->
pColumns
=
taosArrayInit
(
pReq
->
numOfColumns
,
sizeof
(
SField
));
pReq
->
pColumns
=
taosArrayInit
(
pReq
->
numOfColumns
,
sizeof
(
SField
));
pReq
->
pTags
=
taosArrayInit
(
pReq
->
numOfTags
,
sizeof
(
SField
));
pReq
->
pTags
=
taosArrayInit
(
pReq
->
numOfTags
,
sizeof
(
SField
));
if
(
pReq
->
pColumns
==
NULL
||
pReq
->
pTags
==
NULL
)
{
if
(
pReq
->
pColumns
==
NULL
||
pReq
->
pTags
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
-
1
;
}
}
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfColumns
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfColumns
;
++
i
)
{
SField
field
=
{
0
};
SField
field
=
{
0
};
buf
=
taosDecodeFixedI8
(
buf
,
&
field
.
type
)
;
if
(
tDecodeI8
(
&
decoder
,
&
field
.
type
)
<
0
)
return
-
1
;
buf
=
taosDecodeFixedI32
(
buf
,
&
field
.
bytes
)
;
if
(
tDecodeI32
(
&
decoder
,
&
field
.
bytes
)
<
0
)
return
-
1
;
buf
=
taosDecodeStringTo
(
buf
,
field
.
name
)
;
if
(
tDecodeCStrTo
(
&
decoder
,
field
.
name
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pReq
->
pColumns
,
&
field
)
==
NULL
)
{
if
(
taosArrayPush
(
pReq
->
pColumns
,
&
field
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
-
1
;
}
}
}
}
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfTags
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfTags
;
++
i
)
{
SField
field
=
{
0
};
SField
field
=
{
0
};
buf
=
taosDecodeFixedI8
(
buf
,
&
field
.
type
)
;
if
(
tDecodeI8
(
&
decoder
,
&
field
.
type
)
<
0
)
return
-
1
;
buf
=
taosDecodeFixedI32
(
buf
,
&
field
.
bytes
)
;
if
(
tDecodeI32
(
&
decoder
,
&
field
.
bytes
)
<
0
)
return
-
1
;
buf
=
taosDecodeStringTo
(
buf
,
field
.
name
)
;
if
(
tDecodeCStrTo
(
&
decoder
,
field
.
name
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pReq
->
pTags
,
&
field
)
==
NULL
)
{
if
(
taosArrayPush
(
pReq
->
pTags
,
&
field
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
-
1
;
}
}
}
}
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
comment
);
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
comment
)
<
0
)
return
-
1
;
return
buf
;
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
}
void
tFreeSMCreateStbReq
(
SMCreateStbReq
*
pReq
)
{
void
tFreeSMCreateStbReq
(
SMCreateStbReq
*
pReq
)
{
taosArrayDestroy
(
pReq
->
pColumns
);
taosArrayDestroy
(
pReq
->
pColumns
);
taosArrayDestroy
(
pReq
->
pTags
);
taosArrayDestroy
(
pReq
->
pTags
);
pReq
->
pColumns
=
NULL
;
pReq
->
pTags
=
NULL
;
}
}
int32_t
tSerializeSMDropStbReq
(
void
**
buf
,
SMDropStbReq
*
pReq
)
{
int32_t
tSerializeSMDropStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropStbReq
*
pReq
)
{
int32_t
tlen
=
0
;
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
name
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
igNotExists
);
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
return
tlen
;
}
}
void
*
tDeserializeSMDropStbReq
(
void
*
buf
,
SMDropStbReq
*
pReq
)
{
int32_t
tDeserializeSMDropStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropStbReq
*
pReq
)
{
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
name
)
;
SCoder
decoder
=
{
0
}
;
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
igNotExists
);
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
return
buf
;
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
}
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
int32_t
tSerializeSMAlterStbReq
(
void
**
buf
,
SMAltertbReq
*
pReq
)
{
tCoderClear
(
&
decoder
);
int32_t
tlen
=
0
;
return
0
;
}
tlen
+=
taosEncodeString
(
buf
,
pReq
->
name
);
int32_t
tSerializeSMAlterStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMAltertbReq
*
pReq
)
{
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
alterType
)
;
SCoder
encoder
=
{
0
}
;
t
len
+=
taosEncodeFixedI32
(
buf
,
pReq
->
numOfFields
);
t
CoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
alterType
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfFields
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfFields
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfFields
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pFields
,
i
);
SField
*
pField
=
taosArrayGet
(
pReq
->
pFields
,
i
);
tlen
+=
taosEncodeFixedU8
(
buf
,
pField
->
type
)
;
if
(
tEncodeI8
(
&
encoder
,
pField
->
type
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pField
->
bytes
)
;
if
(
tEncodeI32
(
&
encoder
,
pField
->
bytes
)
<
0
)
return
-
1
;
tlen
+=
taosEncodeString
(
buf
,
pField
->
name
)
;
if
(
tEncodeCStr
(
&
encoder
,
pField
->
name
)
<
0
)
return
-
1
;
}
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
return
tlen
;
}
}
void
*
tDeserializeSMAlterStbReq
(
void
*
buf
,
SMAltertbReq
*
pReq
)
{
int32_t
tDeserializeSMAlterStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMAltertbReq
*
pReq
)
{
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
name
);
SCoder
decoder
=
{
0
};
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
alterType
);
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
numOfFields
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
alterType
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfFields
)
<
0
)
return
-
1
;
pReq
->
pFields
=
taosArrayInit
(
pReq
->
numOfFields
,
sizeof
(
SField
));
pReq
->
pFields
=
taosArrayInit
(
pReq
->
numOfFields
,
sizeof
(
SField
));
if
(
pReq
->
pFields
==
NULL
)
{
if
(
pReq
->
pFields
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
-
1
;
}
}
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfFields
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfFields
;
++
i
)
{
SField
field
=
{
0
};
SField
field
=
{
0
};
buf
=
taosDecodeFixedU8
(
buf
,
&
field
.
type
)
;
if
(
tDecodeI8
(
&
decoder
,
&
field
.
type
)
<
0
)
return
-
1
;
buf
=
taosDecodeFixedI32
(
buf
,
&
field
.
bytes
)
;
if
(
tDecodeI32
(
&
decoder
,
&
field
.
bytes
)
<
0
)
return
-
1
;
buf
=
taosDecodeStringTo
(
buf
,
field
.
name
)
;
if
(
tDecodeCStrTo
(
&
decoder
,
field
.
name
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pReq
->
pFields
,
&
field
)
==
NULL
)
{
if
(
taosArrayPush
(
pReq
->
pFields
,
&
field
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
-
1
;
}
}
}
}
return
buf
;
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
void
tFreeSMAltertbReq
(
SMAltertbReq
*
pReq
)
{
taosArrayDestroy
(
pReq
->
pFields
);
pReq
->
pFields
=
NULL
;
}
}
int32_t
tSerializeSStatusReq
(
void
**
buf
,
SStatusReq
*
pReq
)
{
int32_t
tSerializeSStatusReq
(
void
**
buf
,
SStatusReq
*
pReq
)
{
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
d89f396d
...
@@ -14,8 +14,8 @@
...
@@ -14,8 +14,8 @@
*/
*/
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndConsumer.h"
#include "mndAuth.h"
#include "mndDb.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndMnode.h"
...
@@ -53,13 +53,14 @@ int32_t mndInitConsumer(SMnode *pMnode) {
...
@@ -53,13 +53,14 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void
mndCleanupConsumer
(
SMnode
*
pMnode
)
{}
void
mndCleanupConsumer
(
SMnode
*
pMnode
)
{}
SMqConsumerObj
*
mndCreateConsumer
(
int64_t
consumerId
,
const
char
*
cgroup
)
{
SMqConsumerObj
*
mndCreateConsumer
(
int64_t
consumerId
,
const
char
*
cgroup
)
{
SMqConsumerObj
*
pConsumer
=
malloc
(
sizeof
(
SMqConsumerObj
));
SMqConsumerObj
*
pConsumer
=
calloc
(
1
,
sizeof
(
SMqConsumerObj
));
if
(
pConsumer
==
NULL
)
{
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
pConsumer
->
recentRemovedTopics
=
taosArrayInit
(
0
,
sizeof
(
char
*
));
pConsumer
->
recentRemovedTopics
=
taosArrayInit
(
1
,
sizeof
(
char
*
));
pConsumer
->
epoch
=
1
;
pConsumer
->
epoch
=
1
;
pConsumer
->
consumerId
=
consumerId
;
pConsumer
->
consumerId
=
consumerId
;
atomic_store_32
(
&
pConsumer
->
status
,
MQ_CONSUMER_STATUS__INIT
);
atomic_store_32
(
&
pConsumer
->
status
,
MQ_CONSUMER_STATUS__INIT
);
...
@@ -70,7 +71,8 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
...
@@ -70,7 +71,8 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
)
{
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
void
*
buf
=
NULL
;
void
*
buf
=
NULL
;
int32_t
tlen
=
tEncodeSMqConsumerObj
(
NULL
,
pConsumer
);
int32_t
tlen
=
tEncodeSMqConsumerObj
(
NULL
,
pConsumer
);
int32_t
size
=
sizeof
(
int32_t
)
+
tlen
+
MND_CONSUMER_RESERVE_SIZE
;
int32_t
size
=
sizeof
(
int32_t
)
+
tlen
+
MND_CONSUMER_RESERVE_SIZE
;
...
@@ -105,7 +107,7 @@ CM_ENCODE_OVER:
...
@@ -105,7 +107,7 @@ CM_ENCODE_OVER:
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
)
{
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
void
*
buf
=
NULL
;
void
*
buf
=
NULL
;
int8_t
sver
=
0
;
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
CM_DECODE_OVER
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
CM_DECODE_OVER
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
d89f396d
...
@@ -552,7 +552,10 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
...
@@ -552,7 +552,10 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
SUserObj
*
pUser
=
NULL
;
SUserObj
*
pUser
=
NULL
;
SMCreateStbReq
createReq
=
{
0
};
SMCreateStbReq
createReq
=
{
0
};
if
(
tDeserializeSMCreateStbReq
(
pReq
->
rpcMsg
.
pCont
,
&
createReq
)
==
NULL
)
goto
CREATE_STB_OVER
;
if
(
tDeserializeSMCreateStbReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
CREATE_STB_OVER
;
}
mDebug
(
"stb:%s, start to create"
,
createReq
.
name
);
mDebug
(
"stb:%s, start to create"
,
createReq
.
name
);
if
(
mndCheckCreateStbReq
(
&
createReq
)
!=
0
)
{
if
(
mndCheckCreateStbReq
(
&
createReq
)
!=
0
)
{
...
@@ -1036,7 +1039,7 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
...
@@ -1036,7 +1039,7 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
SUserObj
*
pUser
=
NULL
;
SUserObj
*
pUser
=
NULL
;
SMAltertbReq
alterReq
=
{
0
};
SMAltertbReq
alterReq
=
{
0
};
if
(
tDeserializeSMAlterStbReq
(
pReq
->
rpcMsg
.
pCont
,
&
alterReq
)
==
NULL
)
{
if
(
tDeserializeSMAlterStbReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
ALTER_STB_OVER
;
goto
ALTER_STB_OVER
;
}
}
...
@@ -1169,7 +1172,7 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
...
@@ -1169,7 +1172,7 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
SStbObj
*
pStb
=
NULL
;
SStbObj
*
pStb
=
NULL
;
SMDropStbReq
dropReq
=
{
0
};
SMDropStbReq
dropReq
=
{
0
};
if
(
tDeserializeSMDropStbReq
(
pReq
->
rpcMsg
.
pCont
,
&
dropReq
)
!=
0
)
{
if
(
tDeserializeSMDropStbReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
DROP_STB_OVER
;
goto
DROP_STB_OVER
;
}
}
...
...
source/dnode/mnode/impl/test/stb/stb.cpp
浏览文件 @
d89f396d
...
@@ -129,11 +129,10 @@ void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
...
@@ -129,11 +129,10 @@ void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
taosArrayPush
(
createReq
.
pTags
,
&
field
);
taosArrayPush
(
createReq
.
pTags
,
&
field
);
}
}
int32_t
tlen
=
tSerializeSMCreateStbReq
(
NULL
,
&
createReq
);
int32_t
tlen
=
tSerializeSMCreateStbReq
(
NULL
,
0
,
&
createReq
);
void
*
pHead
=
rpcMallocCont
(
tlen
);
void
*
pHead
=
rpcMallocCont
(
tlen
);
tSerializeSMCreateStbReq
(
pHead
,
tlen
,
&
createReq
);
void
*
pBuf
=
pHead
;
tFreeSMCreateStbReq
(
&
createReq
);
tSerializeSMCreateStbReq
(
&
pBuf
,
&
createReq
);
*
pContLen
=
tlen
;
*
pContLen
=
tlen
;
return
pHead
;
return
pHead
;
}
}
...
@@ -151,10 +150,9 @@ void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagnam
...
@@ -151,10 +150,9 @@ void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagnam
strcpy
(
field
.
name
,
tagname
);
strcpy
(
field
.
name
,
tagname
);
taosArrayPush
(
req
.
pFields
,
&
field
);
taosArrayPush
(
req
.
pFields
,
&
field
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
&
req
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pBuf
=
pHead
;
tSerializeSMAlterStbReq
(
pHead
,
contLen
,
&
req
);
tSerializeSMAlterStbReq
(
&
pBuf
,
&
req
);
*
pContLen
=
contLen
;
*
pContLen
=
contLen
;
return
pHead
;
return
pHead
;
...
@@ -173,10 +171,9 @@ void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagna
...
@@ -173,10 +171,9 @@ void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagna
strcpy
(
field
.
name
,
tagname
);
strcpy
(
field
.
name
,
tagname
);
taosArrayPush
(
req
.
pFields
,
&
field
);
taosArrayPush
(
req
.
pFields
,
&
field
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
&
req
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pBuf
=
pHead
;
tSerializeSMAlterStbReq
(
pHead
,
contLen
,
&
req
);
tSerializeSMAlterStbReq
(
&
pBuf
,
&
req
);
*
pContLen
=
contLen
;
*
pContLen
=
contLen
;
return
pHead
;
return
pHead
;
...
@@ -202,10 +199,9 @@ void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char*
...
@@ -202,10 +199,9 @@ void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char*
strcpy
(
field2
.
name
,
newtagname
);
strcpy
(
field2
.
name
,
newtagname
);
taosArrayPush
(
req
.
pFields
,
&
field2
);
taosArrayPush
(
req
.
pFields
,
&
field2
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
&
req
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pBuf
=
pHead
;
tSerializeSMAlterStbReq
(
pHead
,
contLen
,
&
req
);
tSerializeSMAlterStbReq
(
&
pBuf
,
&
req
);
*
pContLen
=
contLen
;
*
pContLen
=
contLen
;
return
pHead
;
return
pHead
;
...
@@ -225,10 +221,9 @@ void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char
...
@@ -225,10 +221,9 @@ void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char
strcpy
(
field
.
name
,
tagname
);
strcpy
(
field
.
name
,
tagname
);
taosArrayPush
(
req
.
pFields
,
&
field
);
taosArrayPush
(
req
.
pFields
,
&
field
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
&
req
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pBuf
=
pHead
;
tSerializeSMAlterStbReq
(
pHead
,
contLen
,
&
req
);
tSerializeSMAlterStbReq
(
&
pBuf
,
&
req
);
*
pContLen
=
contLen
;
*
pContLen
=
contLen
;
return
pHead
;
return
pHead
;
...
@@ -247,10 +242,9 @@ void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* col
...
@@ -247,10 +242,9 @@ void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* col
strcpy
(
field
.
name
,
colname
);
strcpy
(
field
.
name
,
colname
);
taosArrayPush
(
req
.
pFields
,
&
field
);
taosArrayPush
(
req
.
pFields
,
&
field
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
&
req
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pBuf
=
pHead
;
tSerializeSMAlterStbReq
(
pHead
,
contLen
,
&
req
);
tSerializeSMAlterStbReq
(
&
pBuf
,
&
req
);
*
pContLen
=
contLen
;
*
pContLen
=
contLen
;
return
pHead
;
return
pHead
;
...
@@ -269,10 +263,9 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co
...
@@ -269,10 +263,9 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co
strcpy
(
field
.
name
,
colname
);
strcpy
(
field
.
name
,
colname
);
taosArrayPush
(
req
.
pFields
,
&
field
);
taosArrayPush
(
req
.
pFields
,
&
field
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
&
req
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pBuf
=
pHead
;
tSerializeSMAlterStbReq
(
pHead
,
contLen
,
&
req
);
tSerializeSMAlterStbReq
(
&
pBuf
,
&
req
);
*
pContLen
=
contLen
;
*
pContLen
=
contLen
;
return
pHead
;
return
pHead
;
...
@@ -292,10 +285,9 @@ void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const c
...
@@ -292,10 +285,9 @@ void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const c
strcpy
(
field
.
name
,
colname
);
strcpy
(
field
.
name
,
colname
);
taosArrayPush
(
req
.
pFields
,
&
field
);
taosArrayPush
(
req
.
pFields
,
&
field
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
&
req
);
int32_t
contLen
=
tSerializeSMAlterStbReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pBuf
=
pHead
;
tSerializeSMAlterStbReq
(
pHead
,
contLen
,
&
req
);
tSerializeSMAlterStbReq
(
&
pBuf
,
&
req
);
*
pContLen
=
contLen
;
*
pContLen
=
contLen
;
return
pHead
;
return
pHead
;
...
@@ -430,10 +422,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
...
@@ -430,10 +422,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
SMDropStbReq
dropReq
=
{
0
};
SMDropStbReq
dropReq
=
{
0
};
strcpy
(
dropReq
.
name
,
stbname
);
strcpy
(
dropReq
.
name
,
stbname
);
int32_t
contLen
=
tSerializeSMDropStbReq
(
NULL
,
&
dropReq
);
int32_t
contLen
=
tSerializeSMDropStbReq
(
NULL
,
0
,
&
dropReq
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
void
*
pBuf
=
pHead
;
tSerializeSMDropStbReq
(
pHead
,
contLen
,
&
dropReq
);
tSerializeSMDropStbReq
(
&
pBuf
,
&
dropReq
);
SRpcMsg
*
pRsp
=
test
.
SendReq
(
TDMT_MND_DROP_STB
,
pHead
,
contLen
);
SRpcMsg
*
pRsp
=
test
.
SendReq
(
TDMT_MND_DROP_STB
,
pHead
,
contLen
);
ASSERT_NE
(
pRsp
,
nullptr
);
ASSERT_NE
(
pRsp
,
nullptr
);
...
...
source/libs/parser/src/astToMsg.c
浏览文件 @
d89f396d
...
@@ -404,15 +404,14 @@ char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* outputLen, SP
...
@@ -404,15 +404,14 @@ char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* outputLen, SP
return
NULL
;
return
NULL
;
}
}
int32_t
tlen
=
tSerializeSMCreateStbReq
(
NULL
,
&
createReq
);
int32_t
tlen
=
tSerializeSMCreateStbReq
(
NULL
,
0
,
&
createReq
);
void
*
pReq
=
malloc
(
tlen
);
void
*
pReq
=
malloc
(
tlen
);
if
(
pReq
==
NULL
)
{
if
(
pReq
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
void
*
pBuf
=
pReq
;
tSerializeSMCreateStbReq
(
pReq
,
tlen
,
&
createReq
);
tSerializeSMCreateStbReq
(
&
pBuf
,
&
createReq
);
*
outputLen
=
tlen
;
*
outputLen
=
tlen
;
return
pReq
;
return
pReq
;
}
}
...
@@ -433,15 +432,14 @@ char* buildDropStableReq(SSqlInfo* pInfo, int32_t* outputLen, SParseContext* pPa
...
@@ -433,15 +432,14 @@ char* buildDropStableReq(SSqlInfo* pInfo, int32_t* outputLen, SParseContext* pPa
assert
(
code
==
TSDB_CODE_SUCCESS
&&
name
.
type
==
TSDB_TABLE_NAME_T
);
assert
(
code
==
TSDB_CODE_SUCCESS
&&
name
.
type
==
TSDB_TABLE_NAME_T
);
dropReq
.
igNotExists
=
pInfo
->
pMiscInfo
->
existsCheck
?
1
:
0
;
dropReq
.
igNotExists
=
pInfo
->
pMiscInfo
->
existsCheck
?
1
:
0
;
int32_t
tlen
=
tSerializeSMDropStbReq
(
NULL
,
&
dropReq
);
int32_t
tlen
=
tSerializeSMDropStbReq
(
NULL
,
0
,
&
dropReq
);
void
*
pReq
=
malloc
(
tlen
);
void
*
pReq
=
malloc
(
tlen
);
if
(
pReq
==
NULL
)
{
if
(
pReq
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
void
*
pBuf
=
pReq
;
tSerializeSMDropStbReq
(
pReq
,
tlen
,
&
dropReq
);
tSerializeSMDropStbReq
(
&
pBuf
,
&
dropReq
);
*
outputLen
=
tlen
;
*
outputLen
=
tlen
;
return
pReq
;
return
pReq
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录