Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
374c8b54
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
374c8b54
编写于
1月 07, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/qnode' of github.com:taosdata/TDengine into feature/qnode
上级
99d602b8
1c7b2a5d
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
675 addition
and
463 deletion
+675
-463
include/common/tmsg.h
include/common/tmsg.h
+75
-7
source/client/inc/clientHb.h
source/client/inc/clientHb.h
+11
-106
source/client/src/clientHb.c
source/client/src/clientHb.c
+32
-15
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+2
-2
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+213
-216
source/common/src/tglobal.c
source/common/src/tglobal.c
+1
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+35
-1
source/dnode/mgmt/impl/src/dndMnode.c
source/dnode/mgmt/impl/src/dndMnode.c
+25
-25
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+37
-37
source/dnode/mgmt/impl/test/mnode/qmnode.cpp
source/dnode/mgmt/impl/test/mnode/qmnode.cpp
+157
-0
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+2
-6
source/libs/parser/inc/astGenerator.h
source/libs/parser/inc/astGenerator.h
+1
-1
source/libs/parser/src/astGenerator.c
source/libs/parser/src/astGenerator.c
+1
-1
source/libs/parser/src/dCDAstProcess.c
source/libs/parser/src/dCDAstProcess.c
+0
-1
source/libs/parser/src/parser.c
source/libs/parser/src/parser.c
+0
-8
source/util/src/thash.c
source/util/src/thash.c
+21
-5
tests/test/c/create_table.c
tests/test/c/create_table.c
+62
-31
未找到文件。
include/common/tmsg.h
浏览文件 @
374c8b54
...
...
@@ -26,6 +26,7 @@ extern "C" {
#include "tarray.h"
#include "tcoding.h"
#include "tdataformat.h"
#include "thash.h"
#include "tlist.h"
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */
...
...
@@ -132,6 +133,73 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
typedef
struct
SKlv
{
int32_t
keyLen
;
int32_t
valueLen
;
void
*
key
;
void
*
value
;
}
SKlv
;
static
FORCE_INLINE
int
taosEncodeSKlv
(
void
**
buf
,
const
SKlv
*
pKlv
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pKlv
->
keyLen
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pKlv
->
valueLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pKlv
->
key
,
pKlv
->
keyLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pKlv
->
value
,
pKlv
->
valueLen
);
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSKlv
(
void
*
buf
,
SKlv
*
pKlv
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pKlv
->
keyLen
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pKlv
->
valueLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pKlv
->
key
,
pKlv
->
keyLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pKlv
->
value
,
pKlv
->
valueLen
);
return
buf
;
}
typedef
struct
SClientHbKey
{
int32_t
connId
;
int32_t
hbType
;
}
SClientHbKey
;
static
FORCE_INLINE
int
taosEncodeSClientHbKey
(
void
**
buf
,
const
SClientHbKey
*
pKey
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pKey
->
connId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pKey
->
hbType
);
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSClientHbKey
(
void
*
buf
,
SClientHbKey
*
pKey
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pKey
->
connId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pKey
->
hbType
);
return
buf
;
}
typedef
struct
SClientHbReq
{
SClientHbKey
connKey
;
SHashObj
*
info
;
// hash<Slv.key, Sklv>
}
SClientHbReq
;
typedef
struct
SClientHbBatchReq
{
int64_t
reqId
;
SArray
*
reqs
;
// SArray<SClientHbReq>
}
SClientHbBatchReq
;
int
tSerializeSClientHbReq
(
void
**
buf
,
const
SClientHbReq
*
pReq
);
void
*
tDeserializeClientHbReq
(
void
*
buf
,
SClientHbReq
*
pReq
);
typedef
struct
SClientHbRsp
{
SClientHbKey
connKey
;
int32_t
status
;
int32_t
bodyLen
;
void
*
body
;
}
SClientHbRsp
;
typedef
struct
SClientHbBatchRsp
{
int64_t
reqId
;
int64_t
rspId
;
SArray
*
rsps
;
// SArray<SClientHbRsp>
}
SClientHbBatchRsp
;
typedef
struct
SBuildTableMetaInput
{
int32_t
vgId
;
char
*
dbName
;
...
...
@@ -929,7 +997,7 @@ typedef struct {
char
encrypt
;
char
secret
[
TSDB_PASSWORD_LEN
];
char
ckey
[
TSDB_PASSWORD_LEN
];
}
SAuth
Msg
,
SAuthRsp
;
}
SAuth
Req
,
SAuthRsp
;
typedef
struct
{
int8_t
finished
;
...
...
@@ -1124,7 +1192,7 @@ typedef struct {
int32_t
topicNum
;
int64_t
consumerId
;
char
*
consumerGroup
;
SArray
*
topicNames
;
// SArray<char*>
SArray
*
topicNames
;
// SArray<char*>
}
SCMSubscribeReq
;
static
FORCE_INLINE
int
tSerializeSCMSubscribeReq
(
void
**
buf
,
const
SCMSubscribeReq
*
pReq
)
{
...
...
@@ -1133,7 +1201,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
consumerGroup
);
for
(
int
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
tlen
+=
taosEncodeString
(
buf
,
(
char
*
)
taosArrayGetP
(
pReq
->
topicNames
,
i
));
}
return
tlen
;
...
...
@@ -1144,7 +1212,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
consumerId
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
consumerGroup
);
pReq
->
topicNames
=
taosArrayInit
(
pReq
->
topicNum
,
sizeof
(
void
*
));
for
(
int
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
char
*
name
=
NULL
;
buf
=
taosDecodeString
(
buf
,
&
name
);
taosArrayPush
(
pReq
->
topicNames
,
&
name
);
...
...
@@ -1159,14 +1227,14 @@ typedef struct SMqSubTopic {
}
SMqSubTopic
;
typedef
struct
{
int32_t
topicNum
;
int32_t
topicNum
;
SMqSubTopic
topics
[];
}
SCMSubscribeRsp
;
static
FORCE_INLINE
int
tSerializeSCMSubscribeRsp
(
void
**
buf
,
const
SCMSubscribeRsp
*
pRsp
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
topicNum
);
for
(
int
i
=
0
;
i
<
pRsp
->
topicNum
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pRsp
->
topicNum
;
i
++
)
{
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
topics
[
i
].
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
topics
[
i
].
topicId
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pRsp
->
topics
[
i
].
epSet
);
...
...
@@ -1176,7 +1244,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribe
static
FORCE_INLINE
void
*
tDeserializeSCMSubscribeRsp
(
void
*
buf
,
SCMSubscribeRsp
*
pRsp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
topicNum
);
for
(
int
i
=
0
;
i
<
pRsp
->
topicNum
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pRsp
->
topicNum
;
i
++
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
topics
[
i
].
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
topics
[
i
].
topicId
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pRsp
->
topics
[
i
].
epSet
);
...
...
source/client/inc/clientHb.h
浏览文件 @
374c8b54
...
...
@@ -20,118 +20,21 @@
typedef
enum
{
mq
=
0
,
// type can be added here
//
HEARTBEAT_TYPE_MAX
}
EHbType
;
typedef
struct
SKlv
{
int32_t
keyLen
;
int32_t
valueLen
;
void
*
key
;
void
*
value
;
}
SKlv
;
static
FORCE_INLINE
int
taosEncodeSKlv
(
void
**
buf
,
const
SKlv
*
pKlv
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pKlv
->
keyLen
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pKlv
->
valueLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pKlv
->
key
,
pKlv
->
keyLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pKlv
->
value
,
pKlv
->
valueLen
);
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSKlv
(
void
*
buf
,
SKlv
*
pKlv
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pKlv
->
keyLen
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pKlv
->
valueLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pKlv
->
key
,
pKlv
->
keyLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pKlv
->
value
,
pKlv
->
valueLen
);
return
buf
;
}
typedef
struct
SClientHbKey
{
int32_t
connId
;
int32_t
hbType
;
}
SClientHbKey
;
static
FORCE_INLINE
int
taosEncodeSClientHbKey
(
void
**
buf
,
const
SClientHbKey
*
pKey
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pKey
->
connId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pKey
->
hbType
);
return
tlen
;
}
static
FORCE_INLINE
void
*
taosDecodeSClientHbKey
(
void
*
buf
,
SClientHbKey
*
pKey
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pKey
->
connId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pKey
->
hbType
);
return
buf
;
}
typedef
struct
SClientHbReq
{
SClientHbKey
hbKey
;
SHashObj
*
info
;
// hash<Sklv>
}
SClientHbReq
;
static
FORCE_INLINE
int
tSerializeSClientHbReq
(
void
**
buf
,
const
SClientHbReq
*
pReq
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeSClientHbKey
(
buf
,
&
pReq
->
hbKey
);
void
*
pIter
=
NULL
;
void
*
data
;
SKlv
klv
;
data
=
taosHashIterate
(
pReq
->
info
,
pIter
);
while
(
data
!=
NULL
)
{
taosHashGetKey
(
data
,
&
klv
.
key
,
(
size_t
*
)
&
klv
.
keyLen
);
klv
.
valueLen
=
taosHashGetDataLen
(
data
);
klv
.
value
=
data
;
taosEncodeSKlv
(
buf
,
&
klv
);
data
=
taosHashIterate
(
pReq
->
info
,
pIter
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDeserializeClientHbReq
(
void
*
buf
,
SClientHbReq
*
pReq
)
{
ASSERT
(
pReq
->
info
!=
NULL
);
buf
=
taosDecodeSClientHbKey
(
buf
,
&
pReq
->
hbKey
);
//TODO: error handling
if
(
pReq
->
info
==
NULL
)
{
pReq
->
info
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
}
SKlv
klv
;
buf
=
taosDecodeSKlv
(
buf
,
&
klv
);
taosHashPut
(
pReq
->
info
,
klv
.
key
,
klv
.
keyLen
,
klv
.
value
,
klv
.
valueLen
);
return
buf
;
}
typedef
struct
SClientHbBatchReq
{
int64_t
reqId
;
SArray
*
reqs
;
// SArray<SClientHbReq>
}
SClientHbBatchReq
;
typedef
struct
SClientHbHandleResult
{
}
SClientHbHandleResult
;
typedef
struct
SClientHbRsp
{
int32_t
connId
;
int32_t
hbType
;
}
SClientHbRsp
;
typedef
struct
SClientHbBatchRsp
{
int64_t
reqId
;
int64_t
rspId
;
SArray
*
rsps
;
// SArray<SClientHbRsp>
}
SClientHbBatchRsp
;
typedef
int32_t
(
*
FHbRspHandle
)(
SClientHbReq
*
pReq
);
typedef
int32_t
(
*
FGetConnInfo
)(
int32_t
conn
,
void
*
self
);
typedef
int32_t
(
*
FHbRspHandle
)(
SClientHbRsp
*
pReq
);
typedef
int32_t
(
*
FGetConnInfo
)(
SClientHbKey
connKey
,
void
*
param
);
typedef
struct
SClientHbMgr
{
int8_t
inited
;
int32_t
reportInterval
;
// unit ms
int32_t
stats
;
SRWLatch
lock
;
SHashObj
*
info
;
//hash<SClientHbKey, SClientHbReq>
SHashObj
*
activeInfo
;
// hash<SClientHbKey, SClientHbReq>
SHashObj
*
getInfoFuncs
;
// hash<SClientHbKey, FGetConnInfo>
FHbRspHandle
handle
[
HEARTBEAT_TYPE_MAX
];
// input queue
}
SClientHbMgr
;
...
...
@@ -140,9 +43,11 @@ static SClientHbMgr clientHbMgr = {0};
int
hbMgrInit
();
void
hbMgrCleanUp
();
int
hbHandleRsp
(
void
*
hbMsg
);
int
hbRegisterConn
(
SClientHbKey
connKey
,
FGetConnInfo
func
);
int
registerConn
(
int32_t
connId
,
FGetConnInfo
func
,
FHbRspHandle
rspHandle
);
int
registerHbRspHandle
(
int32_t
connId
,
int32_t
hbType
,
FHbRspHandle
rspHandle
);
int
hbAddConnInfo
(
SClientHbKey
connKey
,
void
*
key
,
void
*
value
,
int32_t
keyLen
,
int32_t
valueLen
);
int
HbAddConnInfo
(
int32_t
connId
,
void
*
key
,
void
*
value
,
int32_t
keyLen
,
int32_t
valueLen
);
source/client/src/clientHb.c
浏览文件 @
374c8b54
...
...
@@ -15,45 +15,62 @@
#include "clientHb.h"
static
int32_t
mqHbRspHandle
(
SClientHbR
eq
*
pReq
)
{
static
int32_t
mqHbRspHandle
(
SClientHbR
sp
*
pReq
)
{
return
0
;
}
uint32_t
hbKeyHashFunc
(
const
char
*
key
,
uint32_t
keyLen
)
{
return
0
;
}
static
void
hbMgrInitMqHbFunc
()
{
clientHbMgr
.
handle
[
mq
]
=
mqHbRspHandle
;
}
int
hbMgrInit
()
{
//init once
//
//init lock
//
//init
handle funcs
clientHbMgr
.
handle
[
mq
]
=
mqHbRspHandle
;
int8_t
old
=
atomic_val_compare_exchange_8
(
&
clientHbMgr
.
inited
,
0
,
1
);
if
(
old
==
1
)
return
0
;
//init
config
clientHbMgr
.
reportInterval
=
1500
;
//init stat
clientHbMgr
.
stats
=
0
;
//init config
clientHbMgr
.
reportInterval
=
1500
;
//init lock
taosInitRWLatch
(
&
clientHbMgr
.
lock
);
//init handle funcs
hbMgrInitMqHbFunc
();
//init hash info
//
clientHbMgr
.
activeInfo
=
taosHashInit
(
64
,
hbKeyHashFunc
,
1
,
HASH_ENTRY_LOCK
);
//init getInfoFunc
clientHbMgr
.
getInfoFuncs
=
taosHashInit
(
64
,
hbKeyHashFunc
,
1
,
HASH_ENTRY_LOCK
);
return
0
;
}
void
hbMgrCleanUp
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
clientHbMgr
.
inited
,
1
,
0
);
if
(
old
==
0
)
return
;
taosHashCleanup
(
clientHbMgr
.
activeInfo
);
taosHashCleanup
(
clientHbMgr
.
getInfoFuncs
);
}
int
registerConn
(
int32_t
connId
,
FGetConnInfo
func
,
FHbRspHandle
rspHandle
)
{
return
0
;
}
int
registerHbRspHandle
(
int32_t
connId
,
int32_t
hbType
,
FHbRspHandle
rspHandle
)
{
int
hbRegisterConn
(
SClientHbKey
connKey
,
FGetConnInfo
func
)
{
return
0
;
}
int
HbAddConnInfo
(
int32_t
connId
,
void
*
key
,
void
*
value
,
int32_t
keyLen
,
int32_t
valueLen
)
{
int
hbAddConnInfo
(
SClientHbKey
connKey
,
void
*
key
,
void
*
value
,
int32_t
keyLen
,
int32_t
valueLen
)
{
//lock
//find req by connection id
SClientHbReq
*
data
=
taosHashGet
(
clientHbMgr
.
activeInfo
,
&
connKey
,
sizeof
(
SClientHbKey
));
ASSERT
(
data
!=
NULL
);
taosHashPut
(
data
->
info
,
key
,
keyLen
,
value
,
valueLen
);
//unlock
return
0
;
...
...
source/client/src/clientImpl.c
浏览文件 @
374c8b54
...
...
@@ -140,7 +140,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
(
*
pRequest
)
->
sqlstr
[
sqlLen
]
=
0
;
(
*
pRequest
)
->
sqlLen
=
sqlLen
;
tscDebugL
(
"0x%"
PRIx64
" SQL: %s, reqId:0x"
PRIx64
,
(
*
pRequest
)
->
self
,
(
*
pRequest
)
->
sqlstr
,
(
*
pRequest
)
->
requestId
);
tscDebugL
(
"0x%"
PRIx64
" SQL: %s, reqId:0x
%
"
PRIx64
,
(
*
pRequest
)
->
self
,
(
*
pRequest
)
->
sqlstr
,
(
*
pRequest
)
->
requestId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -181,7 +181,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
if
(
pDcl
->
msgType
==
TDMT_VND_SHOW_TABLES
)
{
SShowReqInfo
*
pShowReqInfo
=
&
pRequest
->
body
.
showInfo
;
if
(
pShowReqInfo
->
pArray
==
NULL
)
{
pShowReqInfo
->
currentIndex
=
0
;
pShowReqInfo
->
currentIndex
=
0
;
// set the first vnode/ then iterate the next vnode
pShowReqInfo
->
pArray
=
pDcl
->
pExtension
;
}
}
...
...
source/client/test/clientTests.cpp
浏览文件 @
374c8b54
...
...
@@ -101,13 +101,13 @@ TEST(testCase, show_user_Test) {
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"show users"
);
TAOS_ROW
pRow
=
NULL
;
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
...
...
@@ -134,13 +134,13 @@ TEST(testCase, show_db_Test) {
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"show databases"
);
TAOS_ROW
pRow
=
NULL
;
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
...
...
@@ -228,29 +228,29 @@ TEST(testCase, use_db_test) {
taos_close
(
pConn
);
}
//TEST(testCase, drop_db_test) {
//
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//
//
assert(pConn != NULL);
//
//
//
//
showDB(pConn);
//
//
//
//
TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
//
//
if (taos_errno(pRes) != 0) {
//
//
printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
//
//
}
//
//
taos_free_result(pRes);
//
//
//
//
showDB(pConn);
//
//
//
//
pRes = taos_query(pConn, "create database abc1");
//
//
if (taos_errno(pRes) != 0) {
//
//
printf("create to drop db, reason:%s\n", taos_errstr(pRes));
//
//
}
//
//
taos_free_result(pRes);
//
//
taos_close(pConn);
//
TEST(testCase, drop_db_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// showDB(pConn);
//
// TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// showDB(pConn);
//
// pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("create to drop db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST
(
testCase
,
create_stable_Test
)
{
TEST
(
testCase
,
create_stable_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
...
...
@@ -281,188 +281,100 @@ TEST(testCase, use_db_test) {
taos_close
(
pConn
);
}
//TEST(testCase, create_table_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)");
// taos_free_result(pRes);
//
// taos_close(pConn);
//}
TEST
(
testCase
,
create_table_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
//TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, show_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "show stables");
// if (taos_errno(pRes) != 0) {
// printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
taos_free_result
(
pRes
);
//TEST(testCase, show_vgroup_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "show vgroups");
// if (taos_errno(pRes) != 0) {
// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
//
// taos_close(pConn);
//}
pRes
=
taos_query
(
pConn
,
"create table tm0(ts timestamp, k int)"
);
taos_free_result
(
pRes
);
//TEST(testCase, drop_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in creating db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in using db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop stable st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop stable, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
taos_close
(
pConn
);
}
//TEST(testCase, create_topic_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("error in create stable, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
//
// char* sql = "select * from st1";
// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_close(pConn);
//}
TEST
(
testCase
,
create_ctable_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
//TEST(testCase, show_table_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "show tables");
// if (taos_errno(pRes) != 0) {
// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table tm0 using st1 tags(1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tm0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
show_stable_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"show stables"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to show stables, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
show_vgroup_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"show vgroups"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to show vgroups, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
create_multiple_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -493,39 +405,125 @@ TEST(testCase, create_multiple_tables) {
ASSERT_TRUE
(
false
);
}
TAOS_ROW
pRow
=
NULL
;
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_free_result
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
20000
0
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
2
0
;
++
i
)
{
char
sql
[
512
]
=
{
0
};
snprintf
(
sql
,
tListLen
(
sql
),
"create table t_x_%d using st1 tags(2)"
,
i
);
snprintf
(
sql
,
tListLen
(
sql
),
"create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)"
,
i
,
(
i
+
1
)
*
30
,
(
i
+
2
)
*
40
);
TAOS_RES
*
pres
=
taos_query
(
pConn
,
sql
);
if
(
taos_errno
(
pres
)
!=
0
)
{
printf
(
"failed to create table %d
\n
, reason:%s"
,
i
,
taos_errstr
(
pres
));
}
printf
(
"%d
\n
"
,
i
);
taos_free_result
(
pres
);
}
taos_close
(
pConn
);
}
TEST
(
testCase
,
show_table_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"show tables"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to show vgroups, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
drop_stable_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in creating db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in using db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop stable st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop stable, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
// TEST(testCase, create_topic_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("error in create stable, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
//
// char* sql = "select * from st1";
// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_close(pConn);
//}
TEST
(
testCase
,
generated_request_id_test
)
{
SHashObj
*
phash
=
taosHashInit
(
10000
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_ENTRY_LOCK
);
SHashObj
*
phash
=
taosHashInit
(
10000
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_ENTRY_LOCK
);
for
(
int32_t
i
=
0
;
i
<
50000
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
50000
;
++
i
)
{
uint64_t
v
=
generateRequestId
();
void
*
result
=
taosHashGet
(
phash
,
&
v
,
sizeof
(
v
));
void
*
result
=
taosHashGet
(
phash
,
&
v
,
sizeof
(
v
));
if
(
result
!=
nullptr
)
{
printf
(
"0x%lx, index:%d
\n
"
,
v
,
i
);
}
...
...
@@ -536,7 +534,7 @@ TEST(testCase, generated_request_id_test) {
taosHashCleanup
(
phash
);
}
//TEST(testCase, projection_query_tables) {
//
TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_EQ(pConn, nullptr);
//
...
...
@@ -563,4 +561,3 @@ TEST(testCase, generated_request_id_test) {
// taos_free_result(pRes);
// taos_close(pConn);
//}
source/common/src/tglobal.c
浏览文件 @
374c8b54
...
...
@@ -910,7 +910,7 @@ static void doInitGlobalConfig(void) {
cfg
.
option
=
"tsdbDebugFlag"
;
cfg
.
ptr
=
&
tsdbDebugFlag
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_LOG
|
TSDB_CFG_CTYPE_B_CLIENT
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_LOG
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
255
;
cfg
.
ptrLength
=
0
;
...
...
source/common/src/tmsg.c
浏览文件 @
374c8b54
...
...
@@ -27,6 +27,40 @@
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
int
tSerializeSClientHbReq
(
void
**
buf
,
const
SClientHbReq
*
pReq
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeSClientHbKey
(
buf
,
&
pReq
->
connKey
);
void
*
pIter
=
NULL
;
void
*
data
;
SKlv
klv
;
data
=
taosHashIterate
(
pReq
->
info
,
pIter
);
while
(
data
!=
NULL
)
{
taosHashGetKey
(
data
,
&
klv
.
key
,
(
size_t
*
)
&
klv
.
keyLen
);
klv
.
valueLen
=
taosHashGetDataLen
(
data
);
klv
.
value
=
data
;
taosEncodeSKlv
(
buf
,
&
klv
);
data
=
taosHashIterate
(
pReq
->
info
,
pIter
);
}
return
tlen
;
}
void
*
tDeserializeClientHbReq
(
void
*
buf
,
SClientHbReq
*
pReq
)
{
ASSERT
(
pReq
->
info
!=
NULL
);
buf
=
taosDecodeSClientHbKey
(
buf
,
&
pReq
->
connKey
);
// TODO: error handling
if
(
pReq
->
info
==
NULL
)
{
pReq
->
info
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
}
SKlv
klv
;
buf
=
taosDecodeSKlv
(
buf
,
&
klv
);
taosHashPut
(
pReq
->
info
,
klv
.
key
,
klv
.
keyLen
,
klv
.
value
,
klv
.
valueLen
);
return
buf
;
}
int
tSerializeSVCreateTbReq
(
void
**
buf
,
SVCreateTbReq
*
pReq
)
{
int
tlen
=
0
;
...
...
@@ -148,4 +182,4 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
}
return
buf
;
}
\ No newline at end of file
}
source/dnode/mgmt/impl/src/dndMnode.c
浏览文件 @
374c8b54
...
...
@@ -305,18 +305,18 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
memcpy
(
&
pOption
->
replicas
,
pMgmt
->
replicas
,
sizeof
(
SReplica
)
*
TSDB_MAX_REPLICA
);
}
static
int32_t
dndBuildMnodeOptionFrom
Msg
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
,
SDCreateMnodeMsg
*
pMsg
)
{
static
int32_t
dndBuildMnodeOptionFrom
Req
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
,
SDCreateMnodeMsg
*
pReq
)
{
dndInitMnodeOption
(
pDnode
,
pOption
);
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
pOption
->
replica
=
p
Msg
->
replica
;
pOption
->
replica
=
p
Req
->
replica
;
pOption
->
selfIndex
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
p
Msg
->
replica
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
Req
->
replica
;
++
i
)
{
SReplica
*
pReplica
=
&
pOption
->
replicas
[
i
];
pReplica
->
id
=
p
Msg
->
replicas
[
i
].
id
;
pReplica
->
port
=
p
Msg
->
replicas
[
i
].
port
;
memcpy
(
pReplica
->
fqdn
,
p
Msg
->
replicas
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
pReplica
->
id
=
p
Req
->
replicas
[
i
].
id
;
pReplica
->
port
=
p
Req
->
replicas
[
i
].
port
;
memcpy
(
pReplica
->
fqdn
,
p
Req
->
replicas
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
if
(
pReplica
->
id
==
pOption
->
dnodeId
)
{
pOption
->
selfIndex
=
i
;
}
...
...
@@ -423,26 +423,26 @@ static int32_t dndDropMnode(SDnode *pDnode) {
return
0
;
}
static
SDCreateMnodeMsg
*
dndParseCreateMnode
Msg
(
SRpcMsg
*
pRpcMsg
)
{
SDCreateMnodeMsg
*
p
Msg
=
pRpcMsg
->
pCont
;
p
Msg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
for
(
int32_t
i
=
0
;
i
<
p
Msg
->
replica
;
++
i
)
{
p
Msg
->
replicas
[
i
].
id
=
htonl
(
pMsg
->
replicas
[
i
].
id
);
p
Msg
->
replicas
[
i
].
port
=
htons
(
pMsg
->
replicas
[
i
].
port
);
static
SDCreateMnodeMsg
*
dndParseCreateMnode
Req
(
SRpcMsg
*
pReq
)
{
SDCreateMnodeMsg
*
p
Create
=
pReq
->
pCont
;
p
Create
->
dnodeId
=
htonl
(
pCreate
->
dnodeId
);
for
(
int32_t
i
=
0
;
i
<
p
Create
->
replica
;
++
i
)
{
p
Create
->
replicas
[
i
].
id
=
htonl
(
pCreate
->
replicas
[
i
].
id
);
p
Create
->
replicas
[
i
].
port
=
htons
(
pCreate
->
replicas
[
i
].
port
);
}
return
p
Msg
;
return
p
Create
;
}
int32_t
dndProcessCreateMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pR
pcMsg
)
{
SDCreateMnodeMsg
*
p
Msg
=
dndParseCreateMnodeMsg
(
pRpcMsg
);
int32_t
dndProcessCreateMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pR
eq
)
{
SDCreateMnodeMsg
*
p
Create
=
dndParseCreateMnodeReq
(
pReq
);
if
(
p
Msg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
p
Create
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_MNODE_ID_INVALID
;
return
-
1
;
}
else
{
SMnodeOpt
option
=
{
0
};
if
(
dndBuildMnodeOptionFrom
Msg
(
pDnode
,
&
option
,
pMsg
)
!=
0
)
{
if
(
dndBuildMnodeOptionFrom
Req
(
pDnode
,
&
option
,
pCreate
)
!=
0
)
{
return
-
1
;
}
...
...
@@ -450,16 +450,16 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
}
int32_t
dndProcessAlterMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pR
pcMsg
)
{
SDAlterMnodeMsg
*
p
Msg
=
dndParseCreateMnodeMsg
(
pRpcMsg
);
int32_t
dndProcessAlterMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pR
eq
)
{
SDAlterMnodeMsg
*
p
Alter
=
dndParseCreateMnodeReq
(
pReq
);
if
(
p
Msg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
p
Alter
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_MNODE_ID_INVALID
;
return
-
1
;
}
SMnodeOpt
option
=
{
0
};
if
(
dndBuildMnodeOptionFrom
Msg
(
pDnode
,
&
option
,
pMsg
)
!=
0
)
{
if
(
dndBuildMnodeOptionFrom
Req
(
pDnode
,
&
option
,
pAlter
)
!=
0
)
{
return
-
1
;
}
...
...
@@ -470,11 +470,11 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
return
dndWriteMnodeFile
(
pDnode
);
}
int32_t
dndProcessDropMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pR
pcMsg
)
{
SDDropMnodeMsg
*
p
Msg
=
pRpcMsg
->
pCont
;
p
Msg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
int32_t
dndProcessDropMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pR
eq
)
{
SDDropMnodeMsg
*
p
Drop
=
pReq
->
pCont
;
p
Drop
->
dnodeId
=
htonl
(
pDrop
->
dnodeId
);
if
(
p
Msg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
p
Drop
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
terrno
=
TSDB_CODE_DND_MNODE_ID_INVALID
;
return
-
1
;
}
else
{
...
...
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
374c8b54
...
...
@@ -143,26 +143,26 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_SHOW_TABLES_FETCH
)]
=
dndProcessVnodeFetchMsg
;
}
static
void
dndProcessResponse
(
void
*
parent
,
SRpcMsg
*
p
Msg
,
SEpSet
*
pEpSet
)
{
static
void
dndProcessResponse
(
void
*
parent
,
SRpcMsg
*
p
Rsp
,
SEpSet
*
pEpSet
)
{
SDnode
*
pDnode
=
parent
;
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
tmsg_t
msgType
=
p
Msg
->
msgType
;
tmsg_t
msgType
=
p
Rsp
->
msgType
;
if
(
dndGetStat
(
pDnode
)
==
DND_STAT_STOPPED
)
{
if
(
p
Msg
==
NULL
||
pMsg
->
pCont
==
NULL
)
return
;
dTrace
(
"RPC %p, rsp:%s is ignored since dnode is stopping"
,
p
Msg
->
handle
,
TMSG_INFO
(
msgType
));
rpcFreeCont
(
p
Msg
->
pCont
);
if
(
p
Rsp
==
NULL
||
pRsp
->
pCont
==
NULL
)
return
;
dTrace
(
"RPC %p, rsp:%s is ignored since dnode is stopping"
,
p
Rsp
->
handle
,
TMSG_INFO
(
msgType
));
rpcFreeCont
(
p
Rsp
->
pCont
);
return
;
}
DndMsgFp
fp
=
pMgmt
->
msgFp
[
TMSG_INDEX
(
msgType
)];
if
(
fp
!=
NULL
)
{
dTrace
(
"RPC %p, rsp:%s will be processed, code:0x%x"
,
p
Msg
->
handle
,
TMSG_INFO
(
msgType
),
pMsg
->
code
&
0XFFFF
);
(
*
fp
)(
pDnode
,
p
Msg
,
pEpSet
);
dTrace
(
"RPC %p, rsp:%s will be processed, code:0x%x"
,
p
Rsp
->
handle
,
TMSG_INFO
(
msgType
),
pRsp
->
code
&
0XFFFF
);
(
*
fp
)(
pDnode
,
p
Rsp
,
pEpSet
);
}
else
{
dError
(
"RPC %p, rsp:%s not processed"
,
p
Msg
->
handle
,
TMSG_INFO
(
msgType
));
rpcFreeCont
(
p
Msg
->
pCont
);
dError
(
"RPC %p, rsp:%s not processed"
,
p
Rsp
->
handle
,
TMSG_INFO
(
msgType
));
rpcFreeCont
(
p
Rsp
->
pCont
);
}
}
...
...
@@ -201,48 +201,48 @@ static void dndCleanupClient(SDnode *pDnode) {
}
}
static
void
dndProcessRequest
(
void
*
param
,
SRpcMsg
*
p
Msg
,
SEpSet
*
pEpSet
)
{
static
void
dndProcessRequest
(
void
*
param
,
SRpcMsg
*
p
Req
,
SEpSet
*
pEpSet
)
{
SDnode
*
pDnode
=
param
;
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
tmsg_t
msgType
=
p
Msg
->
msgType
;
tmsg_t
msgType
=
p
Req
->
msgType
;
if
(
msgType
==
TDMT_DND_NETWORK_TEST
)
{
dTrace
(
"RPC %p, network test req, app:%p will be processed, code:0x%x"
,
p
Msg
->
handle
,
pMsg
->
ahandle
,
pMsg
->
code
);
dndProcessStartupReq
(
pDnode
,
p
Msg
);
dTrace
(
"RPC %p, network test req, app:%p will be processed, code:0x%x"
,
p
Req
->
handle
,
pReq
->
ahandle
,
pReq
->
code
);
dndProcessStartupReq
(
pDnode
,
p
Req
);
return
;
}
if
(
dndGetStat
(
pDnode
)
==
DND_STAT_STOPPED
)
{
dError
(
"RPC %p, req:%s app:%p is ignored since dnode exiting"
,
p
Msg
->
handle
,
TMSG_INFO
(
msgType
),
pMsg
->
ahandle
);
SRpcMsg
rspMsg
=
{.
handle
=
p
Msg
->
handle
,
.
code
=
TSDB_CODE_DND_OFFLINE
};
dError
(
"RPC %p, req:%s app:%p is ignored since dnode exiting"
,
p
Req
->
handle
,
TMSG_INFO
(
msgType
),
pReq
->
ahandle
);
SRpcMsg
rspMsg
=
{.
handle
=
p
Req
->
handle
,
.
code
=
TSDB_CODE_DND_OFFLINE
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
p
Msg
->
pCont
);
rpcFreeCont
(
p
Req
->
pCont
);
return
;
}
else
if
(
dndGetStat
(
pDnode
)
!=
DND_STAT_RUNNING
)
{
dError
(
"RPC %p, req:%s app:%p is ignored since dnode not running"
,
p
Msg
->
handle
,
TMSG_INFO
(
msgType
),
pMsg
->
ahandle
);
SRpcMsg
rspMsg
=
{.
handle
=
p
Msg
->
handle
,
.
code
=
TSDB_CODE_APP_NOT_READY
};
dError
(
"RPC %p, req:%s app:%p is ignored since dnode not running"
,
p
Req
->
handle
,
TMSG_INFO
(
msgType
),
pReq
->
ahandle
);
SRpcMsg
rspMsg
=
{.
handle
=
p
Req
->
handle
,
.
code
=
TSDB_CODE_APP_NOT_READY
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
p
Msg
->
pCont
);
rpcFreeCont
(
p
Req
->
pCont
);
return
;
}
if
(
p
Msg
->
pCont
==
NULL
)
{
dTrace
(
"RPC %p, req:%s app:%p not processed since content is null"
,
p
Msg
->
handle
,
TMSG_INFO
(
msgType
),
p
Msg
->
ahandle
);
SRpcMsg
rspMsg
=
{.
handle
=
p
Msg
->
handle
,
.
code
=
TSDB_CODE_DND_INVALID_MSG_LEN
};
if
(
p
Req
->
pCont
==
NULL
)
{
dTrace
(
"RPC %p, req:%s app:%p not processed since content is null"
,
p
Req
->
handle
,
TMSG_INFO
(
msgType
),
p
Req
->
ahandle
);
SRpcMsg
rspMsg
=
{.
handle
=
p
Req
->
handle
,
.
code
=
TSDB_CODE_DND_INVALID_MSG_LEN
};
rpcSendResponse
(
&
rspMsg
);
return
;
}
DndMsgFp
fp
=
pMgmt
->
msgFp
[
TMSG_INDEX
(
msgType
)];
if
(
fp
!=
NULL
)
{
dTrace
(
"RPC %p, req:%s app:%p will be processed"
,
p
Msg
->
handle
,
TMSG_INFO
(
msgType
),
pMsg
->
ahandle
);
(
*
fp
)(
pDnode
,
p
Msg
,
pEpSet
);
dTrace
(
"RPC %p, req:%s app:%p will be processed"
,
p
Req
->
handle
,
TMSG_INFO
(
msgType
),
pReq
->
ahandle
);
(
*
fp
)(
pDnode
,
p
Req
,
pEpSet
);
}
else
{
dError
(
"RPC %p, req:%s app:%p is not processed since no handle"
,
p
Msg
->
handle
,
TMSG_INFO
(
msgType
),
pMsg
->
ahandle
);
SRpcMsg
rspMsg
=
{.
handle
=
p
Msg
->
handle
,
.
code
=
TSDB_CODE_MSG_NOT_PROCESSED
};
dError
(
"RPC %p, req:%s app:%p is not processed since no handle"
,
p
Req
->
handle
,
TMSG_INFO
(
msgType
),
pReq
->
ahandle
);
SRpcMsg
rspMsg
=
{.
handle
=
p
Req
->
handle
,
.
code
=
TSDB_CODE_MSG_NOT_PROCESSED
};
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
p
Msg
->
pCont
);
rpcFreeCont
(
p
Req
->
pCont
);
}
}
...
...
@@ -254,7 +254,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp
rpcSendRecv
(
pMgmt
->
clientRpc
,
&
epSet
,
pRpcMsg
,
pRpcRsp
);
}
static
int32_t
dndAuthInternal
Msg
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
static
int32_t
dndAuthInternal
Req
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
if
(
strcmp
(
user
,
INTERNAL_USER
)
==
0
)
{
// A simple temporary implementation
char
pass
[
TSDB_PASSWORD_LEN
]
=
{
0
};
...
...
@@ -281,7 +281,7 @@ static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *e
static
int32_t
dndRetrieveUserAuthInfo
(
void
*
parent
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
SDnode
*
pDnode
=
parent
;
if
(
dndAuthInternal
Msg
(
parent
,
user
,
spi
,
encrypt
,
secret
,
ckey
)
==
0
)
{
if
(
dndAuthInternal
Req
(
parent
,
user
,
spi
,
encrypt
,
secret
,
ckey
)
==
0
)
{
// dTrace("get internal auth success");
return
0
;
}
...
...
@@ -298,10 +298,10 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
// dDebug("user:%s, send auth msg to other mnodes", user);
SAuth
Msg
*
pMsg
=
rpcMallocCont
(
sizeof
(
SAuthMsg
));
tstrncpy
(
p
Msg
->
user
,
user
,
TSDB_USER_LEN
);
SAuth
Req
*
pReq
=
rpcMallocCont
(
sizeof
(
SAuthReq
));
tstrncpy
(
p
Req
->
user
,
user
,
TSDB_USER_LEN
);
SRpcMsg
rpcMsg
=
{.
pCont
=
p
Msg
,
.
contLen
=
sizeof
(
SAuthMsg
),
.
msgType
=
TDMT_MND_AUTH
};
SRpcMsg
rpcMsg
=
{.
pCont
=
p
Req
,
.
contLen
=
sizeof
(
SAuthReq
),
.
msgType
=
TDMT_MND_AUTH
};
SRpcMsg
rpcRsp
=
{
0
};
dndSendMsgToMnodeRecv
(
pDnode
,
&
rpcMsg
,
&
rpcRsp
);
...
...
@@ -381,19 +381,19 @@ void dndCleanupTrans(SDnode *pDnode) {
dInfo
(
"dnode-transport is cleaned up"
);
}
int32_t
dndSendReqToDnode
(
SDnode
*
pDnode
,
SEpSet
*
pEpSet
,
SRpcMsg
*
p
Msg
)
{
int32_t
dndSendReqToDnode
(
SDnode
*
pDnode
,
SEpSet
*
pEpSet
,
SRpcMsg
*
p
Req
)
{
STransMgmt
*
pMgmt
=
&
pDnode
->
tmgmt
;
if
(
pMgmt
->
clientRpc
==
NULL
)
{
terrno
=
TSDB_CODE_DND_OFFLINE
;
return
-
1
;
}
rpcSendRequest
(
pMgmt
->
clientRpc
,
pEpSet
,
p
Msg
,
NULL
);
rpcSendRequest
(
pMgmt
->
clientRpc
,
pEpSet
,
p
Req
,
NULL
);
return
0
;
}
int32_t
dndSendReqToMnode
(
SDnode
*
pDnode
,
SRpcMsg
*
p
Msg
)
{
int32_t
dndSendReqToMnode
(
SDnode
*
pDnode
,
SRpcMsg
*
p
Req
)
{
SEpSet
epSet
=
{
0
};
dndGetMnodeEpSet
(
pDnode
,
&
epSet
);
return
dndSendReqToDnode
(
pDnode
,
&
epSet
,
p
Msg
);
return
dndSendReqToDnode
(
pDnode
,
&
epSet
,
p
Req
);
}
source/dnode/mgmt/impl/test/mnode/qmnode.cpp
浏览文件 @
374c8b54
...
...
@@ -24,3 +24,160 @@ class DndTestMnode : public ::testing::Test {
};
Testbase
DndTestMnode
::
test
;
#if 0
TEST_F(DndTestMnode, 01_Create_Mnode) {
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
test.Restart();
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
}
TEST_F(DndTestMnode, 02_Alter_Mnode) {
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
test.Restart();
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
}
TEST_F(DndTestMnode, 03_Drop_Mnode) {
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID);
}
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED);
}
test.Restart();
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
}
#endif
\ No newline at end of file
source/libs/catalog/src/catalog.c
浏览文件 @
374c8b54
...
...
@@ -419,10 +419,8 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn
vgInfo
=
NULL
;
}
ctgInfo
(
"numOfVgroup:%d"
,
taosHashGetSize
(
dbInfo
->
vgInfo
));
if
(
NULL
==
vgInfo
)
{
ctgError
(
"no hash range found for hash
value[%u], db:%s"
,
hashValue
,
db
);
ctgError
(
"no hash range found for hash
value [%u], db:%s, numOfVgId:%d"
,
hashValue
,
db
,
taosHashGetSize
(
dbInfo
->
vgInfo
)
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
...
...
@@ -1078,12 +1076,11 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
SDBVgroupInfo
*
db
=
NULL
;
SDBVgroupInfo
*
db
=
NULL
;
SVgroupInfo
*
vgInfo
=
NULL
;
int32_t
code
=
0
;
SArray
*
vgList
=
NULL
;
CTG_ERR_JRET
(
ctgGetDBVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
dbName
,
forceUpdate
,
&
db
));
vgList
=
taosArrayInit
(
taosHashGetSize
(
db
->
vgInfo
),
sizeof
(
SVgroupInfo
));
...
...
@@ -1109,7 +1106,6 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
vgList
=
NULL
;
_return:
if
(
db
)
{
CTG_UNLOCK
(
CTG_READ
,
&
db
->
lock
);
taosHashRelease
(
pCatalog
->
dbCache
.
cache
,
db
);
...
...
source/libs/parser/inc/astGenerator.h
浏览文件 @
374c8b54
...
...
@@ -123,7 +123,7 @@ typedef struct SCreatedTableInfo {
SToken
name
;
// table name token
SToken
stbName
;
// super table name token , for using clause
SArray
*
pTagNames
;
// create by using super table, tag name
SArray
*
pTagVals
;
// create by using super table, tag value
SArray
*
pTagVals
;
// create by using super table, tag value
. SArray<SToken>
char
*
fullname
;
// table full name
int8_t
igExist
;
// ignore if exists
}
SCreatedTableInfo
;
...
...
source/libs/parser/src/astGenerator.c
浏览文件 @
374c8b54
...
...
@@ -686,7 +686,7 @@ void destroySqlNode(SSqlNode *pSqlNode) {
void
freeCreateTableInfo
(
void
*
p
)
{
SCreatedTableInfo
*
pInfo
=
(
SCreatedTableInfo
*
)
p
;
taosArrayDestroy
(
pInfo
->
pTagNames
);
taosArrayDestroy
Ex
(
pInfo
->
pTagVals
,
freeItem
);
taosArrayDestroy
(
pInfo
->
pTagVals
);
tfree
(
pInfo
->
fullname
);
}
...
...
source/libs/parser/src/dCDAstProcess.c
浏览文件 @
374c8b54
...
...
@@ -332,7 +332,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT
char
*
endPtr
=
NULL
;
char
tmpTokenBuf
[
TSDB_MAX_TAGS_LEN
]
=
{
0
};
SKvParam
param
=
{.
builder
=
pKvRowBuilder
,
.
schema
=
pSchema
};
SToken
*
pItem
=
taosArrayGet
(
pTagValList
,
i
);
...
...
source/libs/parser/src/parser.c
浏览文件 @
374c8b54
...
...
@@ -44,15 +44,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
}
if
(
!
isDqlSqlStatement
(
&
info
))
{
// bool toVnode = false;
if
(
info
.
type
==
TSDB_SQL_CREATE_TABLE
)
{
// SCreateTableSql* pCreateSql = info.pCreateTableInfo;
// if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
// toVnode = true;
// }
// }
// if (toVnode) {
SVnodeModifOpStmtInfo
*
pModifStmtInfo
=
qParserValidateCreateTbSqlNode
(
&
info
,
&
pCxt
->
ctx
,
pCxt
->
pMsg
,
pCxt
->
msgLen
);
if
(
pModifStmtInfo
==
NULL
)
{
return
terrno
;
...
...
source/util/src/thash.c
浏览文件 @
374c8b54
...
...
@@ -19,8 +19,9 @@
#include "taos.h"
#include "tdef.h"
#define EXT_SIZE 1024
// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT
#define MAX_WARNING_REF_COUNT 10000
#define EXT_SIZE 1024
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
#define DO_FREE_HASH_NODE(_n) \
...
...
@@ -924,8 +925,24 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
if
(
pNode
)
{
SHashEntry
*
pe
=
pHashObj
->
hashList
[
slot
];
pNode
->
count
++
;
data
=
GET_HASH_NODE_DATA
(
pNode
);
uint16_t
prevRef
=
atomic_load_16
(
&
pNode
->
count
);
uint16_t
afterRef
=
atomic_add_fetch_16
(
&
pNode
->
count
,
1
);
// the reference count value is overflow, which will cause the delete node operation immediately.
if
(
prevRef
>
afterRef
)
{
uError
(
"hash entry ref count overflow, prev ref:%d, current ref:%d"
,
prevRef
,
afterRef
);
// restore the value
atomic_sub_fetch_16
(
&
pNode
->
count
,
1
);
data
=
NULL
;
}
else
{
data
=
GET_HASH_NODE_DATA
(
pNode
);
}
if
(
afterRef
>=
MAX_WARNING_REF_COUNT
)
{
uWarn
(
"hash entry ref count is abnormally high: %d"
,
afterRef
);
}
if
(
pHashObj
->
type
==
HASH_ENTRY_LOCK
)
{
taosWUnLockLatch
(
&
pe
->
latch
);
}
...
...
@@ -933,7 +950,6 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
__rd_unlock
((
void
*
)
&
pHashObj
->
lock
,
pHashObj
->
type
);
return
data
;
}
void
taosHashCancelIterate
(
SHashObj
*
pHashObj
,
void
*
p
)
{
...
...
tests/test/c/create_table.c
浏览文件 @
374c8b54
...
...
@@ -25,20 +25,21 @@
char
dbName
[
32
]
=
"db"
;
char
stbName
[
64
]
=
"st"
;
int32_t
numOfThreads
=
1
;
int
32_t
numOfTables
=
1
0000
;
int
64_t
numOfTables
=
20
0000
;
int32_t
createTable
=
1
;
int32_t
insertData
=
0
;
int32_t
batchNum
=
10
;
int32_t
batchNum
=
10
0
;
int32_t
numOfVgroups
=
2
;
typedef
struct
{
int
32
_t
tableBeginIndex
;
int
32
_t
tableEndIndex
;
int
64
_t
tableBeginIndex
;
int
64
_t
tableEndIndex
;
int32_t
threadIndex
;
char
dbName
[
32
];
char
stbName
[
64
];
float
createTableSpeed
;
float
insertDataSpeed
;
int64_t
startMs
;
pthread_t
thread
;
}
SThreadInfo
;
...
...
@@ -57,7 +58,7 @@ int32_t main(int32_t argc, char *argv[]) {
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
calloc
(
numOfThreads
,
sizeof
(
SThreadInfo
));
int
32
_t
numOfTablesPerThread
=
numOfTables
/
numOfThreads
;
int
64
_t
numOfTablesPerThread
=
numOfTables
/
numOfThreads
;
numOfTables
=
numOfTablesPerThread
*
numOfThreads
;
for
(
int32_t
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pInfo
[
i
].
tableBeginIndex
=
i
*
numOfTablesPerThread
;
...
...
@@ -83,8 +84,10 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed
+=
pInfo
[
i
].
insertDataSpeed
;
}
pPrint
(
"%s total %.1f tables/second, threads:%d %s"
,
GREEN
,
createTableSpeed
,
numOfThreads
,
NC
);
pPrint
(
"%s total %.1f rows/second, threads:%d %s"
,
GREEN
,
insertDataSpeed
,
numOfThreads
,
NC
);
pPrint
(
"%s total %"
PRId64
" tables, %.1f tables/second, threads:%d %s"
,
GREEN
,
numOfTables
,
createTableSpeed
,
numOfThreads
,
NC
);
pPrint
(
"%s total %"
PRId64
" tables, %.1f rows/second, threads:%d %s"
,
GREEN
,
numOfTables
,
insertDataSpeed
,
numOfThreads
,
NC
);
pthread_attr_destroy
(
&
thattr
);
free
(
pInfo
);
...
...
@@ -130,6 +133,26 @@ void createDbAndStb() {
taos_close
(
con
);
}
void
printCreateProgress
(
SThreadInfo
*
pInfo
,
int64_t
t
)
{
int64_t
endMs
=
taosGetTimestampMs
();
int64_t
totalTables
=
t
-
pInfo
->
tableBeginIndex
;
float
seconds
=
(
endMs
-
pInfo
->
startMs
)
/
1000
.
0
;
float
speed
=
totalTables
/
seconds
;
pInfo
->
createTableSpeed
=
speed
;
pPrint
(
"thread:%d, %"
PRId64
" tables created, time:%.2f sec, speed:%.1f tables/second, "
,
pInfo
->
threadIndex
,
totalTables
,
seconds
,
speed
);
}
void
printInsertProgress
(
SThreadInfo
*
pInfo
,
int64_t
t
)
{
int64_t
endMs
=
taosGetTimestampMs
();
int64_t
totalTables
=
t
-
pInfo
->
tableBeginIndex
;
float
seconds
=
(
endMs
-
pInfo
->
startMs
)
/
1000
.
0
;
float
speed
=
totalTables
/
seconds
;
pInfo
->
insertDataSpeed
=
speed
;
pPrint
(
"thread:%d, %"
PRId64
" rows inserted, time:%.2f sec, speed:%.1f rows/second, "
,
pInfo
->
threadIndex
,
totalTables
,
seconds
,
speed
);
}
void
*
threadFunc
(
void
*
param
)
{
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
char
*
qstr
=
malloc
(
2000
*
1000
);
...
...
@@ -146,47 +169,55 @@ void *threadFunc(void *param) {
taos_free_result
(
pSql
);
if
(
createTable
)
{
int64_t
startMs
=
taosGetTimestampMs
();
for
(
int
32
_t
t
=
pInfo
->
tableBeginIndex
;
t
<
pInfo
->
tableEndIndex
;
++
t
)
{
int
32
_t
batch
=
(
pInfo
->
tableEndIndex
-
t
);
pInfo
->
startMs
=
taosGetTimestampMs
();
for
(
int
64
_t
t
=
pInfo
->
tableBeginIndex
;
t
<
pInfo
->
tableEndIndex
;
++
t
)
{
int
64
_t
batch
=
(
pInfo
->
tableEndIndex
-
t
);
batch
=
MIN
(
batch
,
batchNum
);
int32_t
len
=
sprintf
(
qstr
,
"create table"
);
for
(
int32_t
i
=
0
;
i
<
batch
;
++
i
)
{
len
+=
sprintf
(
qstr
+
len
,
" t%
d using %s tags(%d
)"
,
t
+
i
,
stbName
,
t
+
i
);
len
+=
sprintf
(
qstr
+
len
,
" t%
"
PRId64
" using %s tags(%"
PRId64
"
)"
,
t
+
i
,
stbName
,
t
+
i
);
}
TAOS_RES
*
pSql
=
taos_query
(
con
,
qstr
);
code
=
taos_errno
(
pSql
);
if
(
code
!=
0
)
{
pError
(
"failed to create table t%
d
, reason:%s"
,
t
,
tstrerror
(
code
));
pError
(
"failed to create table t%
"
PRId64
"
, reason:%s"
,
t
,
tstrerror
(
code
));
}
taos_free_result
(
pSql
);
if
(
t
%
100000
==
0
)
{
printCreateProgress
(
pInfo
,
t
);
}
t
+=
(
batch
-
1
);
}
int64_t
endMs
=
taosGetTimestampMs
();
int32_t
totalTables
=
pInfo
->
tableEndIndex
-
pInfo
->
tableBeginIndex
;
float
seconds
=
(
endMs
-
startMs
)
/
1000
.
0
;
float
speed
=
totalTables
/
seconds
;
pInfo
->
createTableSpeed
=
speed
;
pPrint
(
"thread:%d, time:%.2f sec, speed:%.1f tables/second, "
,
pInfo
->
threadIndex
,
seconds
,
speed
);
printCreateProgress
(
pInfo
,
pInfo
->
tableEndIndex
);
}
if
(
insertData
)
{
int64_t
startMs
=
taosGetTimestampMs
();
for
(
int32_t
t
=
pInfo
->
tableBeginIndex
;
t
<
pInfo
->
tableEndIndex
;
++
t
)
{
sprintf
(
qstr
,
"insert into %s%d values(now, 1)"
,
stbName
,
t
);
pInfo
->
startMs
=
taosGetTimestampMs
();
for
(
int64_t
t
=
pInfo
->
tableBeginIndex
;
t
<
pInfo
->
tableEndIndex
;
++
t
)
{
int64_t
batch
=
(
pInfo
->
tableEndIndex
-
t
);
batch
=
MIN
(
batch
,
batchNum
);
int32_t
len
=
sprintf
(
qstr
,
"insert into"
);
for
(
int32_t
i
=
0
;
i
<
batch
;
++
i
)
{
len
+=
sprintf
(
qstr
+
len
,
" t%"
PRId64
" values(now, %"
PRId64
")"
,
t
+
i
,
t
+
i
);
}
TAOS_RES
*
pSql
=
taos_query
(
con
,
qstr
);
code
=
taos_errno
(
pSql
);
if
(
code
!=
0
)
{
pError
(
"failed to
create table %s%d, reason:%s"
,
stbName
,
t
,
tstrerror
(
code
));
pError
(
"failed to
insert table t%"
PRId64
", reason:%s"
,
t
,
tstrerror
(
code
));
}
taos_free_result
(
pSql
);
if
(
t
%
100000
==
0
)
{
printInsertProgress
(
pInfo
,
t
);
}
t
+=
(
batch
-
1
);
}
int64_t
endMs
=
taosGetTimestampMs
();
int32_t
totalTables
=
pInfo
->
tableEndIndex
-
pInfo
->
tableBeginIndex
;
float
seconds
=
(
endMs
-
startMs
)
/
1000
.
0
;
float
speed
=
totalTables
/
seconds
;
pInfo
->
insertDataSpeed
=
speed
;
pPrint
(
"thread:%d, time:%.2f sec, speed:%.1f rows/second, "
,
pInfo
->
threadIndex
,
seconds
,
speed
);
printInsertProgress
(
pInfo
,
pInfo
->
tableEndIndex
);
}
taos_close
(
con
);
...
...
@@ -207,7 +238,7 @@ void printHelp() {
printf
(
"%s%s
\n
"
,
indent
,
"-t"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"numOfThreads, default is "
,
numOfThreads
);
printf
(
"%s%s
\n
"
,
indent
,
"-n"
);
printf
(
"%s%s%s%
d
\n
"
,
indent
,
indent
,
"numOfTables, default is "
,
numOfTables
);
printf
(
"%s%s%s%
"
PRId64
"
\n
"
,
indent
,
indent
,
"numOfTables, default is "
,
numOfTables
);
printf
(
"%s%s
\n
"
,
indent
,
"-v"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"numOfVgroups, default is "
,
numOfVgroups
);
printf
(
"%s%s
\n
"
,
indent
,
"-a"
);
...
...
@@ -234,7 +265,7 @@ void parseArgument(int32_t argc, char *argv[]) {
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
numOfThreads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
numOfTables
=
ato
i
(
argv
[
++
i
]);
numOfTables
=
ato
ll
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
numOfVgroups
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-a"
)
==
0
)
{
...
...
@@ -250,7 +281,7 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint
(
"%s dbName:%s %s"
,
GREEN
,
dbName
,
NC
);
pPrint
(
"%s stbName:%s %s"
,
GREEN
,
stbName
,
NC
);
pPrint
(
"%s configDir:%s %s"
,
GREEN
,
configDir
,
NC
);
pPrint
(
"%s numOfTables:%
d
%s"
,
GREEN
,
numOfTables
,
NC
);
pPrint
(
"%s numOfTables:%
"
PRId64
"
%s"
,
GREEN
,
numOfTables
,
NC
);
pPrint
(
"%s numOfThreads:%d %s"
,
GREEN
,
numOfThreads
,
NC
);
pPrint
(
"%s numOfVgroups:%d %s"
,
GREEN
,
numOfVgroups
,
NC
);
pPrint
(
"%s createTable:%d %s"
,
GREEN
,
createTable
,
NC
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录