Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
af9178c5
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
af9178c5
编写于
1月 19, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/tfs
上级
5c5d593e
bc1d8f6a
变更
26
展开全部
隐藏空白更改
内联
并排
Showing
26 changed file
with
1697 addition
and
711 deletion
+1697
-711
include/common/tmsg.h
include/common/tmsg.h
+362
-342
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+2
-2
include/util/tdef.h
include/util/tdef.h
+1
-0
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+19
-19
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+1
-1
source/dnode/mnode/impl/CMakeLists.txt
source/dnode/mnode/impl/CMakeLists.txt
+2
-1
source/dnode/mnode/impl/inc/mndConsumer.h
source/dnode/mnode/impl/inc/mndConsumer.h
+3
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+228
-12
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+1
-0
source/dnode/mnode/impl/inc/mndSubscribe.h
source/dnode/mnode/impl/inc/mndSubscribe.h
+38
-0
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+35
-232
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+3
-0
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+690
-0
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+3
-11
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+18
-7
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+2
-2
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+9
-0
source/libs/scheduler/CMakeLists.txt
source/libs/scheduler/CMakeLists.txt
+2
-2
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+19
-0
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+1
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+119
-34
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+14
-16
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+3
-3
tests/script/sh/massiveTable/deployCluster.sh
tests/script/sh/massiveTable/deployCluster.sh
+56
-5
tests/test/c/create_table.c
tests/test/c/create_table.c
+65
-22
未找到文件。
include/common/tmsg.h
浏览文件 @
af9178c5
此差异已折叠。
点击以展开。
include/common/tmsgdef.h
浏览文件 @
af9178c5
...
...
@@ -140,6 +140,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_TOPIC
,
"mnode-alter-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_TOPIC
,
"mnode-drop-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_SUBSCRIBE
,
"mnode-subscribe"
,
SCMSubscribeReq
,
SCMSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_TIMER
,
"mnode-timer"
,
SMqTmrMsg
,
SMqTmrMsg
)
// Requests handled by VNODE
TD_NEW_MSG_SEG
(
TDMT_VND_MSG
)
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
af9178c5
...
...
@@ -113,8 +113,8 @@ typedef enum {
SDB_USER
=
7
,
SDB_AUTH
=
8
,
SDB_ACCT
=
9
,
SDB_
CONSUMER
=
10
,
SDB_C
GROUP
=
11
,
SDB_
SUBSCRIBE
=
10
,
SDB_C
ONSUMER
=
11
,
SDB_TOPIC
=
12
,
SDB_VGROUP
=
13
,
SDB_STB
=
14
,
...
...
include/util/tdef.h
浏览文件 @
af9178c5
...
...
@@ -178,6 +178,7 @@ do { \
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
...
...
source/client/test/clientTests.cpp
浏览文件 @
af9178c5
...
...
@@ -148,30 +148,30 @@ TEST(testCase, connect_Test) {
// taos_close(pConn);
//}
//
//
TEST(testCase, create_db_Test) {
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//
assert(pConn != NULL);
TEST
(
testCase
,
create_db_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
//
TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
//
if (taos_errno(pRes) != 0) {
//
printf("error in create db, reason:%s\n", taos_errstr(pRes));
//
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database abc1 vgroups 2"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
//
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
//
ASSERT_TRUE(pFields == NULL);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
ASSERT_TRUE
(
pFields
==
NULL
);
//
int32_t numOfFields = taos_num_fields(pRes);
//
ASSERT_EQ(numOfFields, 0);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
ASSERT_EQ
(
numOfFields
,
0
);
//taos_free_result(pRes);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database abc1 vgroups 4"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_close
(
pConn
);
}
//pRes = taos_query(pConn, "create database abc1 vgroups 4");
//if (taos_errno(pRes) != 0) {
//printf("error in create db, reason:%s\n", taos_errstr(pRes));
//}
//taos_close(pConn);
//}
//
//TEST(testCase, create_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
...
...
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
af9178c5
...
...
@@ -100,7 +100,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_VGROUP_LIST
)]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_QUERY
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_CONN
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_HEARTBEAT
)]
=
dndProcessMnode
Read
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_HEARTBEAT
)]
=
dndProcessMnode
Write
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW
)]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW_RETRIEVE
)]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_STATUS
)]
=
dndProcessMnodeReadMsg
;
...
...
source/dnode/mnode/impl/CMakeLists.txt
浏览文件 @
af9178c5
...
...
@@ -7,6 +7,7 @@ target_include_directories(
)
target_link_libraries
(
mnode
PRIVATE scheduler
PRIVATE sdb
PRIVATE wal
PRIVATE transport
...
...
@@ -16,4 +17,4 @@ target_link_libraries(
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
\ No newline at end of file
endif
(
${
BUILD_TEST
}
)
source/dnode/mnode/impl/inc/mndConsumer.h
浏览文件 @
af9178c5
...
...
@@ -28,6 +28,9 @@ void mndCleanupConsumer(SMnode *pMnode);
SMqConsumerObj
*
mndAcquireConsumer
(
SMnode
*
pMnode
,
int32_t
consumerId
);
void
mndReleaseConsumer
(
SMnode
*
pMnode
,
SMqConsumerObj
*
pConsumer
);
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
);
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
af9178c5
...
...
@@ -26,6 +26,7 @@
#include "tlog.h"
#include "trpc.h"
#include "ttimer.h"
#include "scheduler.h"
#include "mnode.h"
...
...
@@ -326,17 +327,157 @@ typedef struct SMqTopicConsumer {
#endif
typedef
struct
SMqConsumerEp
{
int32_t
vgId
;
int32_t
vgId
;
// -1 for unassigned
SEpSet
epset
;
int64_t
consumerId
;
int64_t
consumerId
;
// -1 for unassigned
int64_t
lastConsumerHbTs
;
int64_t
lastVgHbTs
;
}
SMqConsumerEp
;
typedef
struct
SMqCgroupTopicPair
{
char
key
[
TSDB_CONSUMER_GROUP_LEN
+
TSDB_TOPIC_FNAME_LEN
];
SArray
*
assigned
;
// SArray<SMqConsumerEp>
SArray
*
unassignedConsumer
;
SArray
*
unassignedVg
;
}
SMqCgroupTopicPair
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
vgId
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumerEp
->
epset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
vgId
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumerEp
->
epset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
return
buf
;
}
//unit for rebalance
typedef
struct
SMqSubscribeObj
{
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
int32_t
epoch
;
//TODO: replace with priority queue
int32_t
nextConsumerIdx
;
SArray
*
availConsumer
;
// SArray<int64_t> (consumerId)
SArray
*
assigned
;
// SArray<SMqConsumerEp>
SArray
*
unassignedConsumer
;
// SArray<SMqConsumerEp>
SArray
*
unassignedVg
;
// SArray<SMqConsumerEp>
}
SMqSubscribeObj
;
static
FORCE_INLINE
SMqSubscribeObj
*
tNewSubscribeObj
()
{
SMqSubscribeObj
*
pSub
=
malloc
(
sizeof
(
SMqSubscribeObj
));
pSub
->
key
[
0
]
=
0
;
pSub
->
epoch
=
0
;
if
(
pSub
==
NULL
)
{
return
NULL
;
}
pSub
->
availConsumer
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
if
(
pSub
->
availConsumer
==
NULL
)
{
free
(
pSub
);
return
NULL
;
}
pSub
->
assigned
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
assigned
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
free
(
pSub
);
return
NULL
;
}
pSub
->
unassignedConsumer
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
assigned
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
unassignedConsumer
);
free
(
pSub
);
return
NULL
;
}
pSub
->
unassignedVg
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
assigned
==
NULL
)
{
taosArrayDestroy
(
pSub
->
availConsumer
);
taosArrayDestroy
(
pSub
->
unassignedConsumer
);
taosArrayDestroy
(
pSub
->
unassignedVg
);
free
(
pSub
);
return
NULL
;
}
return
NULL
;
}
static
FORCE_INLINE
int32_t
tEncodeSubscribeObj
(
void
**
buf
,
const
SMqSubscribeObj
*
pSub
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pSub
->
key
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSub
->
epoch
);
int32_t
sz
;
sz
=
taosArrayGetSize
(
pSub
->
availConsumer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
*
pConsumerId
=
taosArrayGet
(
pSub
->
availConsumer
,
i
);
tlen
+=
taosEncodeFixedI64
(
buf
,
*
pConsumerId
);
}
sz
=
taosArrayGetSize
(
pSub
->
assigned
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
assigned
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
sz
=
taosArrayGetSize
(
pSub
->
unassignedConsumer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
unassignedConsumer
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
sz
=
taosArrayGetSize
(
pSub
->
unassignedVg
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
unassignedVg
,
i
);
tlen
+=
tEncodeSMqConsumerEp
(
buf
,
pCEp
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSubscribeObj
(
void
*
buf
,
SMqSubscribeObj
*
pSub
)
{
buf
=
taosDecodeStringTo
(
buf
,
pSub
->
key
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSub
->
epoch
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
assigned
=
taosArrayInit
(
sz
,
sizeof
(
int64_t
));
if
(
pSub
->
assigned
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
consumerId
;
buf
=
taosDecodeFixedI64
(
buf
,
&
consumerId
);
taosArrayPush
(
pSub
->
assigned
,
&
consumerId
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
unassignedConsumer
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
unassignedConsumer
==
NULL
)
{
taosArrayDestroy
(
pSub
->
assigned
);
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
cEp
;
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
cEp
);
taosArrayPush
(
pSub
->
unassignedConsumer
,
&
cEp
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
unassignedVg
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
unassignedVg
==
NULL
)
{
taosArrayDestroy
(
pSub
->
assigned
);
taosArrayDestroy
(
pSub
->
unassignedConsumer
);
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
cEp
;
buf
=
tDecodeSMqConsumerEp
(
buf
,
&
cEp
);
taosArrayPush
(
pSub
->
unassignedVg
,
&
cEp
);
}
return
buf
;
}
typedef
struct
SMqCGroup
{
char
name
[
TSDB_CONSUMER_GROUP_LEN
];
...
...
@@ -358,8 +499,8 @@ typedef struct SMqTopicObj {
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
SHashObj
*
cgroups
;
// SHashObj<SMqCGroup>
SHashObj
*
consumers
;
// SHashObj<SMqConsumerObj>
//
SHashObj *cgroups; // SHashObj<SMqCGroup>
//
SHashObj *consumers; // SHashObj<SMqConsumerObj>
}
SMqTopicObj
;
// TODO: add cache and change name to id
...
...
@@ -367,18 +508,93 @@ typedef struct SMqConsumerTopic {
char
name
[
TSDB_TOPIC_FNAME_LEN
];
int32_t
epoch
;
//TODO: replace with something with ep
SList
*
vgroups
;
// SList<int32_t>
//SList *vgroups; // SList<int32_t>
//vg assigned to the consumer on the topic
SArray
*
pVgInfo
;
// SArray<int32_t>
}
SMqConsumerTopic
;
static
FORCE_INLINE
SMqConsumerTopic
*
tNewConsumerTopic
(
int64_t
consumerId
,
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
)
{
SMqConsumerTopic
*
pCTopic
=
malloc
(
sizeof
(
SMqConsumerTopic
));
if
(
pCTopic
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
strcpy
(
pCTopic
->
name
,
pTopic
->
name
);
pCTopic
->
epoch
=
0
;
pCTopic
->
pVgInfo
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
int32_t
unassignedVgSz
=
taosArrayGetSize
(
pSub
->
unassignedVg
);
if
(
unassignedVgSz
>
0
)
{
SMqConsumerEp
*
pCEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
pCEp
->
consumerId
=
consumerId
;
taosArrayPush
(
pCTopic
->
pVgInfo
,
&
pCEp
->
vgId
);
taosArrayPush
(
pSub
->
assigned
,
pCEp
);
}
return
pCTopic
;
}
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerTopic
(
void
**
buf
,
SMqConsumerTopic
*
pConsumerTopic
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pConsumerTopic
->
name
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerTopic
->
epoch
);
int32_t
sz
=
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
*
pVgInfo
=
taosArrayGet
(
pConsumerTopic
->
pVgInfo
,
i
);
tlen
+=
taosEncodeFixedI32
(
buf
,
*
pVgInfo
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerTopic
(
void
*
buf
,
SMqConsumerTopic
*
pConsumerTopic
)
{
buf
=
taosDecodeStringTo
(
buf
,
pConsumerTopic
->
name
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerTopic
->
epoch
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumerTopic
->
pVgInfo
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerTopic
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
vgInfo
;
buf
=
taosDecodeFixedI32
(
buf
,
&
vgInfo
);
taosArrayPush
(
pConsumerTopic
->
pVgInfo
,
&
vgInfo
);
}
return
buf
;
}
typedef
struct
SMqConsumerObj
{
int64_t
consumerId
;
SRWLatch
lock
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<SMqConsumerTopic>
SHashObj
*
topicHash
;
//SHashObj<SMqConsumerTopic
>
//SHashObj *topicHash; //SHashObj<SMqTopicObj
>
}
SMqConsumerObj
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerObj
(
void
**
buf
,
const
SMqConsumerObj
*
pConsumer
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerTopic
*
pConsumerTopic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
tlen
+=
tEncodeSMqConsumerTopic
(
buf
,
pConsumerTopic
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerObj
(
void
*
buf
,
SMqConsumerObj
*
pConsumer
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerObj
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerTopic
cTopic
;
buf
=
tDecodeSMqConsumerTopic
(
buf
,
&
cTopic
);
taosArrayPush
(
pConsumer
->
topics
,
&
cTopic
);
}
return
buf
;
}
typedef
struct
SMqSubConsumerObj
{
int64_t
consumerUid
;
// if -1, unassigned
SList
*
vgId
;
// SList<int32_t>
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
af9178c5
...
...
@@ -80,6 +80,7 @@ typedef struct SMnode {
SReplica
replicas
[
TSDB_MAX_REPLICA
];
tmr_h
timer
;
tmr_h
transTimer
;
tmr_h
mqTimer
;
char
*
path
;
SMnodeCfg
cfg
;
int64_t
checkTime
;
...
...
source/dnode/mnode/impl/inc/mndSubscribe.h
0 → 100644
浏览文件 @
af9178c5
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_SUBSCRIBE_H_
#define _TD_MND_SUBSCRIBE_H_
#include "mndInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
);
void
mndCleanupSubscribe
(
SMnode
*
pMnode
);
SMqSubscribeObj
*
mndAcquireSubscribe
(
SMnode
*
pMnode
,
char
*
CGroup
,
char
*
topicName
);
void
mndReleaseSubscribe
(
SMnode
*
pMnode
,
SMqSubscribeObj
*
pSub
);
SSdbRaw
*
mndSubscribeActionEncode
(
SMqSubscribeObj
*
pSub
);
SSdbRow
*
mndSubscribeActionDecode
(
SSdbRaw
*
pRaw
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_MND_SUBSCRIBE_H_*/
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
af9178c5
...
...
@@ -30,24 +30,14 @@
#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64
static
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
);
static
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndConsumerActionInsert
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
);
static
int32_t
mndConsumerActionDelete
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
);
static
int32_t
mndConsumerActionUpdate
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
,
SMqConsumerObj
*
pNewConsumer
);
static
int32_t
mndProcessCreateConsumerMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropConsumerMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropConsumerInRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessConsumerMetaMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetConsumerMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveConsumer
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextConsumer
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndProcessSubscribeReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeInternalReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeInternalRsp
(
SMnodeMsg
*
pMsg
);
int32_t
mndInitConsumer
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_CONSUMER
,
.
keyType
=
SDB_KEY_BINARY
,
...
...
@@ -57,26 +47,29 @@ int32_t mndInitConsumer(SMnode *pMnode) {
.
updateFp
=
(
SdbUpdateFp
)
mndConsumerActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndConsumerActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_SUBSCRIBE
,
mndProcessSubscribeReq
);
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle
(
pMnode
,
TDMT_VND_SUBSCRIBE_RSP
,
mndProcessSubscribeInternalRsp
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupConsumer
(
SMnode
*
pMnode
)
{}
static
void
*
mndBuildMqVGroupSetReq
(
SMnode
*
pMnode
,
char
*
topicName
,
int32_t
vgId
,
int64_t
consumerId
,
char
*
cgroup
)
{
return
0
;
}
static
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
)
{
int32_t
size
=
sizeof
(
SMqConsumerObj
)
+
MND_CONSUMER_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_CONSUMER
,
MND_CONSUMER_VER_NUMBER
,
size
);
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
tlen
=
tEncodeSMqConsumerObj
(
NULL
,
pConsumer
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_CONSUMER
,
MND_CONSUMER_VER_NUMBER
,
tlen
);
if
(
pRaw
==
NULL
)
goto
CM_ENCODE_OVER
;
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
goto
CM_ENCODE_OVER
;
void
*
abuf
=
buf
;
tEncodeSMqConsumerObj
(
&
abuf
,
pConsumer
);
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
tlen
,
CM_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
buf
,
tlen
,
CM_ENCODE_OVER
);
#if 0
int32_t topicNum = taosArrayGetSize(pConsumer->topics);
SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER);
int32_t len = strlen(pConsumer->cgroup);
...
...
@@ -101,10 +94,13 @@ static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
}
}
#endif
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_CONSUMER_RESERVE_SIZE
,
CM_ENCODE_OVER
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
CM_ENCODE_OVER
);
terrno
=
TSDB_CODE_SUCCESS
;
CM_ENCODE_OVER:
if
(
terrno
!=
0
)
{
mError
(
"consumer:%ld, failed to encode to raw:%p since %s"
,
pConsumer
->
consumerId
,
pRaw
,
terrstr
());
...
...
@@ -116,7 +112,7 @@ CM_ENCODE_OVER:
return
pRaw
;
}
static
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
)
{
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int8_t
sver
=
0
;
...
...
@@ -127,18 +123,27 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
goto
CONSUME_DECODE_OVER
;
}
int32_t
size
=
sizeof
(
SMqConsumerObj
);
SSdbRow
*
pRow
=
sdbAllocRow
(
size
);
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SMqConsumerObj
));
if
(
pRow
==
NULL
)
goto
CONSUME_DECODE_OVER
;
SMqConsumerObj
*
pConsumer
=
sdbGetRowObj
(
pRow
);
if
(
pConsumer
==
NULL
)
goto
CONSUME_DECODE_OVER
;
int32_t
dataPos
=
0
;
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pConsumer
->
consumerId
,
CONSUME_DECODE_OVER
);
int32_t
len
,
topicNum
;
int32_t
len
;
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
CONSUME_DECODE_OVER
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pConsumer
->
cgroup
,
len
,
CONSUME_DECODE_OVER
);
void
*
buf
=
malloc
(
len
);
if
(
buf
==
NULL
)
goto
CONSUME_DECODE_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
len
,
CONSUME_DECODE_OVER
);
tDecodeSMqConsumerObj
(
buf
,
pConsumer
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_CONSUMER_RESERVE_SIZE
,
CONSUME_DECODE_OVER
);
terrno
=
TSDB_CODE_SUCCESS
;
#if 0
SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER);
for (int i = 0; i < topicNum; i++) {
int32_t topicLen;
...
...
@@ -154,6 +159,7 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
int32_t vgSize;
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
}
#endif
CONSUME_DECODE_OVER:
if
(
terrno
!=
0
)
{
...
...
@@ -162,8 +168,6 @@ CONSUME_DECODE_OVER:
return
NULL
;
}
/*SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);*/
return
pRow
;
}
...
...
@@ -201,214 +205,13 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
sdbRelease
(
pSdb
,
pConsumer
);
}
static
int32_t
mndProcessSubscribeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
char
*
msgStr
=
pMsg
->
rpcMsg
.
pCont
;
SCMSubscribeReq
subscribe
;
tDeserializeSCMSubscribeReq
(
msgStr
,
&
subscribe
);
int64_t
consumerId
=
subscribe
.
consumerId
;
char
*
consumerGroup
=
subscribe
.
consumerGroup
;
int32_t
cgroupLen
=
strlen
(
consumerGroup
);
SArray
*
newSub
=
NULL
;
int
newTopicNum
=
subscribe
.
topicNum
;
if
(
newTopicNum
)
{
newSub
=
taosArrayInit
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
}
SMqConsumerTopic
*
pConsumerTopics
=
calloc
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
if
(
pConsumerTopics
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int
i
=
0
;
i
<
newTopicNum
;
i
++
)
{
char
*
newTopicName
=
taosArrayGetP
(
newSub
,
i
);
SMqConsumerTopic
*
pConsumerTopic
=
&
pConsumerTopics
[
i
];
strcpy
(
pConsumerTopic
->
name
,
newTopicName
);
pConsumerTopic
->
vgroups
=
tdListNew
(
sizeof
(
int64_t
));
}
taosArrayAddBatch
(
newSub
,
pConsumerTopics
,
newTopicNum
);
free
(
pConsumerTopics
);
taosArraySortString
(
newSub
,
taosArrayCompareString
);
SArray
*
oldSub
=
NULL
;
int
oldTopicNum
=
0
;
// create consumer if not exist
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
// create consumer
pConsumer
=
malloc
(
sizeof
(
SMqConsumerObj
));
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pConsumer
->
consumerId
=
consumerId
;
strcpy
(
pConsumer
->
cgroup
,
consumerGroup
);
}
else
{
oldSub
=
pConsumer
->
topics
;
oldTopicNum
=
taosArrayGetSize
(
oldSub
);
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
//TODO: free memory
return
-
1
;
}
int
i
=
0
,
j
=
0
;
while
(
i
<
newTopicNum
||
j
<
oldTopicNum
)
{
SMqConsumerTopic
*
pOldTopic
=
NULL
;
SMqConsumerTopic
*
pNewTopic
=
NULL
;
if
(
i
>=
newTopicNum
)
{
// encode unset topic msg to all vnodes related to that topic
pOldTopic
=
taosArrayGet
(
oldSub
,
j
);
j
++
;
}
else
if
(
j
>=
oldTopicNum
)
{
pNewTopic
=
taosArrayGet
(
newSub
,
i
);
i
++
;
}
else
{
pNewTopic
=
taosArrayGet
(
newSub
,
i
);
pOldTopic
=
taosArrayGet
(
oldSub
,
j
);
char
*
newName
=
pNewTopic
->
name
;
char
*
oldName
=
pOldTopic
->
name
;
int
comp
=
compareLenPrefixedStr
(
newName
,
oldName
);
if
(
comp
==
0
)
{
// do nothing
pOldTopic
=
pNewTopic
=
NULL
;
i
++
;
j
++
;
continue
;
}
else
if
(
comp
<
0
)
{
pOldTopic
=
NULL
;
i
++
;
}
else
{
pNewTopic
=
NULL
;
j
++
;
}
}
if
(
pOldTopic
!=
NULL
)
{
//cancel subscribe of that old topic
ASSERT
(
pNewTopic
==
NULL
);
char
*
oldTopicName
=
pOldTopic
->
name
;
SList
*
vgroups
=
pOldTopic
->
vgroups
;
SListIter
iter
;
tdListInitIter
(
vgroups
,
&
iter
,
TD_LIST_FORWARD
);
SListNode
*
pn
;
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
oldTopicName
);
ASSERT
(
pTopic
!=
NULL
);
SMqCGroup
*
pGroup
=
taosHashGet
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
);
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
int32_t
vgId
=
*
(
int64_t
*
)
pn
->
data
;
// acquire and get epset
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
// TODO what time to release?
if
(
pVgObj
==
NULL
)
{
// TODO handle error
continue
;
}
//build reset msg
void
*
pMqVgSetReq
=
mndBuildMqVGroupSetReq
(
pMnode
,
oldTopicName
,
vgId
,
consumerId
,
consumerGroup
);
// TODO:serialize
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
pMqVgSetReq
;
action
.
contLen
=
0
;
// TODO
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMqVgSetReq
);
mndTransDrop
(
pTrans
);
// TODO free
return
-
1
;
}
}
//delete data in mnode
taosHashRemove
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
);
mndReleaseTopic
(
pMnode
,
pTopic
);
}
else
if
(
pNewTopic
!=
NULL
)
{
// save subscribe info to mnode
ASSERT
(
pOldTopic
==
NULL
);
char
*
newTopicName
=
pNewTopic
->
name
;
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
newTopicName
);
ASSERT
(
pTopic
!=
NULL
);
SMqCGroup
*
pGroup
=
taosHashGet
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
);
if
(
pGroup
==
NULL
)
{
// add new group
pGroup
=
malloc
(
sizeof
(
SMqCGroup
));
if
(
pGroup
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pGroup
->
consumerIds
=
tdListNew
(
sizeof
(
int64_t
));
if
(
pGroup
->
consumerIds
==
NULL
)
{
free
(
pGroup
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pGroup
->
status
=
0
;
// add into cgroups
taosHashPut
(
pTopic
->
cgroups
,
consumerGroup
,
cgroupLen
,
pGroup
,
sizeof
(
SMqCGroup
));
}
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
tdListAppend
(
pGroup
->
consumerIds
,
&
consumerId
);
SSdbRaw
*
pTopicRaw
=
mndTopicActionEncode
(
pTopic
);
sdbSetRawStatus
(
pTopicRaw
,
SDB_STATUS_READY
);
// TODO: error handling
mndTransAppendRedolog
(
pTrans
,
pTopicRaw
);
mndReleaseTopic
(
pMnode
,
pTopic
);
}
else
{
ASSERT
(
0
);
}
}
// destroy old sub
taosArrayDestroy
(
oldSub
);
// put new sub into consumerobj
pConsumer
->
topics
=
newSub
;
// persist consumerObj
SSdbRaw
*
pConsumerRaw
=
mndConsumerActionEncode
(
pConsumer
);
sdbSetRawStatus
(
pConsumerRaw
,
SDB_STATUS_READY
);
// TODO: error handling
mndTransAppendRedolog
(
pTrans
,
pConsumerRaw
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
-
1
;
}
// TODO: free memory
mndTransDrop
(
pTrans
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
return
0
;
}
static
int32_t
mndProcessSubscribeInternalRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
#if 0
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);
#if 0
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
...
...
@@ -463,7 +266,6 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
pMsg->contLen = contLen;
mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags);
#endif
return 0;
}
...
...
@@ -546,3 +348,4 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
#endif
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
af9178c5
...
...
@@ -273,6 +273,7 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
}
static
SClientHbRsp
*
mndMqHbBuildRsp
(
SMnode
*
pMnode
,
SClientHbReq
*
pReq
)
{
#if 0
SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
...
...
@@ -332,6 +333,8 @@ static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
pRsp->body = buf;
pRsp->bodyLen = tlen;
return pRsp;
#endif
return
NULL
;
}
static
int32_t
mndProcessHeartBeatReq
(
SMnodeMsg
*
pReq
)
{
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
0 → 100644
浏览文件 @
af9178c5
此差异已折叠。
点击以展开。
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
af9178c5
...
...
@@ -79,8 +79,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
logicalPlanLen
,
TOPIC_ENCODE_OVER
);
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
pTopic
->
physicalPlan
=
calloc
(
physicalPlanLen
,
sizeof
(
char
));
if
(
pTopic
->
physicalPlan
==
NULL
)
goto
TOPIC_ENCODE_OVER
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
strlen
(
pTopic
->
physicalPlan
)
+
1
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
...
...
@@ -92,12 +90,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
TOPIC_ENCODE_OVER:
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"topic:%s, failed to encode to raw:%p since %s"
,
pTopic
->
name
,
pRaw
,
terrstr
());
/*if (pTopic->logicalPlan) {*/
/*free(pTopic->logicalPlan);*/
/*}*/
/*if (pTopic->physicalPlan) {*/
/*free(pTopic->physicalPlan);*/
/*}*/
sdbFreeRaw
(
pRaw
);
return
NULL
;
}
...
...
@@ -138,7 +130,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
pTopic
->
logicalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
pTopic
->
logicalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
if
(
pTopic
->
logicalPlan
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
...
...
@@ -146,7 +138,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
len
+
1
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
pTopic
->
log
icalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
pTopic
->
phys
icalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
if
(
pTopic
->
physicalPlan
==
NULL
)
{
free
(
pTopic
->
logicalPlan
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -154,7 +146,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
len
+
1
,
TOPIC_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
)
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
)
;
terrno
=
TSDB_CODE_SUCCESS
;
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
af9178c5
...
...
@@ -18,6 +18,7 @@
#include "mndAuth.h"
#include "mndBnode.h"
#include "mndCluster.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
...
...
@@ -27,6 +28,7 @@
#include "mndShow.h"
#include "mndSnode.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndSync.h"
#include "mndTelem.h"
#include "mndTopic.h"
...
...
@@ -69,15 +71,15 @@ static void mndTransReExecute(void *param, void *tmrId) {
taosTmrReset
(
mndTransReExecute
,
3000
,
pMnode
,
pMnode
->
timer
,
&
pMnode
->
transTimer
);
}
static
void
mndCalMqRebalance
(
void
*
param
,
void
*
tmrId
)
{
SMnode
*
pMnode
=
param
;
static
void
mndCalMqRebalance
(
void
*
param
,
void
*
tmrId
)
{
SMnode
*
pMnode
=
param
;
if
(
mndIsMaster
(
pMnode
))
{
// iterate cgroup, cal rebalance
// sync with raft
// write sdb
SMqTmrMsg
*
pMsg
=
rpcMallocCont
(
sizeof
(
SMqTmrMsg
));
SRpcMsg
rpcMsg
=
{.
msgType
=
TDMT_MND_MQ_TIMER
,
.
pCont
=
pMsg
,
.
contLen
=
sizeof
(
SMqTmrMsg
)};
pMnode
->
putReqToMWriteQFp
(
pMnode
->
pDnode
,
&
rpcMsg
);
}
taosTmrReset
(
mndCalMqRebalance
,
3000
,
pMnode
,
pMnode
->
timer
,
&
pMnode
->
trans
Timer
);
taosTmrReset
(
mndCalMqRebalance
,
3000
,
pMnode
,
pMnode
->
timer
,
&
pMnode
->
mq
Timer
);
}
static
int32_t
mndInitTimer
(
SMnode
*
pMnode
)
{
...
...
@@ -95,6 +97,11 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return
-
1
;
}
if
(
taosTmrReset
(
mndCalMqRebalance
,
3000
,
pMnode
,
pMnode
->
timer
,
&
pMnode
->
mqTimer
))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
return
0
;
}
...
...
@@ -102,6 +109,8 @@ static void mndCleanupTimer(SMnode *pMnode) {
if
(
pMnode
->
timer
!=
NULL
)
{
taosTmrStop
(
pMnode
->
transTimer
);
pMnode
->
transTimer
=
NULL
;
taosTmrStop
(
pMnode
->
mqTimer
);
pMnode
->
mqTimer
=
NULL
;
taosTmrCleanUp
(
pMnode
->
timer
);
pMnode
->
timer
=
NULL
;
}
...
...
@@ -171,6 +180,8 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if
(
mndAllocStep
(
pMnode
,
"mnode-auth"
,
mndInitAuth
,
mndCleanupAuth
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-acct"
,
mndInitAcct
,
mndCleanupAcct
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-topic"
,
mndInitTopic
,
mndCleanupTopic
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-consumer"
,
mndInitConsumer
,
mndCleanupConsumer
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-subscribe"
,
mndInitSubscribe
,
mndCleanupSubscribe
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-vgroup"
,
mndInitVgroup
,
mndCleanupVgroup
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-stb"
,
mndInitStb
,
mndCleanupStb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-db"
,
mndInitDb
,
mndCleanupDb
)
!=
0
)
return
-
1
;
...
...
@@ -377,7 +388,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
return
NULL
;
}
if
(
pRpcMsg
->
msgType
!=
TDMT_MND_TRANS
)
{
if
(
pRpcMsg
->
msgType
!=
TDMT_MND_TRANS
&&
pRpcMsg
->
msgType
!=
TDMT_MND_MQ_TIMER
)
{
SRpcConnInfo
connInfo
=
{
0
};
if
((
pRpcMsg
->
msgType
&
1U
)
&&
rpcGetConnInfo
(
pRpcMsg
->
handle
,
&
connInfo
)
!=
0
)
{
taosFreeQitem
(
pMsg
);
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
af9178c5
...
...
@@ -38,10 +38,10 @@ const char *sdbTableName(ESdbType type) {
return
"auth"
;
case
SDB_ACCT
:
return
"acct"
;
case
SDB_SUBSCRIBE
:
return
"subscribe"
;
case
SDB_CONSUMER
:
return
"consumer"
;
case
SDB_CGROUP
:
return
"cgroup"
;
case
SDB_TOPIC
:
return
"topic"
;
case
SDB_VGROUP
:
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
af9178c5
...
...
@@ -108,6 +108,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error
}
break
;
case
TDMT_VND_MQ_SET_CONN
:
{
char
*
reqStr
=
ptr
;
SMqSetCVgReq
req
;
/*tDecodeSMqSetCVgReq(reqStr, &req);*/
// create topic if not exist
// convert to task
// write mq meta
}
break
;
default:
ASSERT
(
0
);
break
;
...
...
source/libs/scheduler/CMakeLists.txt
浏览文件 @
af9178c5
...
...
@@ -9,9 +9,9 @@ target_include_directories(
target_link_libraries
(
scheduler
P
RIVATE
os util planner qcom common catalog transport
P
UBLIC
os util planner qcom common catalog transport
)
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
endif
(
${
BUILD_TEST
}
)
\ No newline at end of file
endif
(
${
BUILD_TEST
}
)
source/libs/transport/inc/transComm.h
浏览文件 @
af9178c5
...
...
@@ -68,6 +68,25 @@ typedef void* queue[2];
QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
}
#define QUEUE_SPLIT(h, q, n) \
do { \
QUEUE_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(n) = (n); \
QUEUE_NEXT(n) = (q); \
QUEUE_PREV(h) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(h) = (h); \
QUEUE_PREV(q) = (n); \
} while (0)
#define QUEUE_MOVE(h, n) \
do { \
if (QUEUE_IS_EMPTY(h)) { \
QUEUE_INIT(n); \
} else { \
queue* q = QUEUE_HEAD(h); \
QUEUE_SPLIT(h, q, n); \
} \
} while (0)
/* Return the element at the front of the queue. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
...
...
source/libs/transport/src/trans.c
浏览文件 @
af9178c5
...
...
@@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) {
if
(
pInit
->
label
)
{
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
strlen
(
pInit
->
label
));
}
pRpc
->
cfp
=
pInit
->
cfp
;
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
tcphandle
=
(
*
taosHandle
[
pRpc
->
connType
])(
0
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
af9178c5
...
...
@@ -20,12 +20,16 @@
typedef
struct
SCliConn
{
uv_connect_t
connReq
;
uv_stream_t
*
stream
;
uv_write_t
*
writeReq
;
void
*
data
;
queue
conn
;
char
spi
;
char
secured
;
}
SCliConn
;
typedef
struct
SCliMsg
{
SRpcReqContext
*
context
;
queue
q
;
uint64_t
st
;
}
SCliMsg
;
typedef
struct
SCliThrdObj
{
...
...
@@ -45,86 +49,169 @@ typedef struct SClientObj {
SCliThrdObj
**
pThreadObj
;
}
SClientObj
;
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
);
// conn pool
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
);
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
);
static
void
clientAllocrReadBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
static
void
clientReadCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
static
void
clientConnCb
(
struct
uv_connect_s
*
req
,
int
status
);
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
);
static
void
clientConnCb
(
uv_connect_t
*
req
,
int
status
);
static
void
clientAsyncCb
(
uv_async_t
*
handle
);
static
void
clientDestroy
(
uv_handle_t
*
handle
);
static
void
clientConnDestroy
(
SCliConn
*
pConn
);
static
void
*
clientThread
(
void
*
arg
);
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
)
{
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientAllocrReadBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
// impl later
}
static
void
client
FailedCb
(
uv_handle_t
*
handle
)
{
static
void
client
ReadCb
(
uv_stream_t
*
handle
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
// impl later
tDebug
(
"close handle"
);
SCliConn
*
conn
=
handle
->
data
;
if
(
nread
>
0
)
{
return
;
}
//
uv_close
((
uv_handle_t
*
)
handle
,
clientDestroy
);
}
static
void
clientReadCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
static
void
clientConnDestroy
(
SCliConn
*
conn
)
{
// impl later
//
}
static
void
clientDestroy
(
uv_handle_t
*
handle
)
{
SCliConn
*
conn
=
handle
->
data
;
clientConnDestroy
(
conn
);
}
static
void
clientConnCb
(
struct
uv_connect_s
*
req
,
int
status
)
{
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
)
{
SCliConn
*
pConn
=
req
->
data
;
if
(
status
==
0
)
{
tDebug
(
"data already was written on stream"
);
}
else
{
uv_close
((
uv_handle_t
*
)
pConn
->
stream
,
clientDestroy
);
return
;
}
uv_read_start
((
uv_stream_t
*
)
pConn
->
stream
,
clientAllocrReadBufferCb
,
clientReadCb
);
// impl later
}
static
void
clientWrite
(
SCliConn
*
pConn
)
{
SCliMsg
*
pMsg
=
pConn
->
data
;
SEpSet
*
pEpSet
=
&
pMsg
->
context
->
epSet
;
SRpcHead
*
pHead
=
rpcHeadFromCont
(
pMsg
->
context
->
pCont
);
int
msgLen
=
rpcMsgLenFromCont
(
pMsg
->
context
->
contLen
);
char
*
msg
=
(
char
*
)(
pHead
);
uv_buf_t
wb
=
uv_buf_init
(
msg
,
msgLen
);
uv_write
(
pConn
->
writeReq
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
clientWriteCb
);
}
static
void
clientConnCb
(
uv_connect_t
*
req
,
int
status
)
{
// impl later
SCliConn
*
pConn
=
req
->
data
;
if
(
status
!=
0
)
{
tError
(
"failed to connect %s"
,
uv_err_name
(
status
));
clientConnDestroy
(
pConn
);
return
;
}
SCliMsg
*
pMsg
=
pConn
->
data
;
SEpSet
*
pEpSet
=
&
pMsg
->
context
->
epSet
;
SRpcMsg
rpcMsg
;
// rpcMsg.ahandle = pMsg->context->ahandle;
// rpcMsg.pCont = NULL;
char
*
fqdn
=
pEpSet
->
fqdn
[
pEpSet
->
inUse
];
uint32_t
port
=
pEpSet
->
port
[
pEpSet
->
inUse
];
if
(
status
!=
0
)
{
// call user fp later
tError
(
"failed to connect server(%s, %d), errmsg: %s"
,
fqdn
,
port
,
uv_strerror
(
status
));
uv_close
((
uv_handle_t
*
)
req
->
handle
,
clientFailedCb
);
SRpcInfo
*
pRpc
=
pMsg
->
context
->
pRpc
;
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
pEpSet
);
uv_close
((
uv_handle_t
*
)
req
->
handle
,
clientDestroy
);
return
;
}
assert
(
pConn
->
stream
==
req
->
handle
);
// impl later
}
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
)
{
// impl later
return
NULL
;
}
static
void
clientAsyncCb
(
uv_async_t
*
handle
)
{
SCliThrdObj
*
pThrd
=
handle
->
data
;
SCliMsg
*
pMsg
=
NULL
;
pthread_mutex_lock
(
&
pThrd
->
msgMtx
);
if
(
!
QUEUE_IS_EMPTY
(
&
pThrd
->
msg
))
{
queue
*
head
=
QUEUE_HEAD
(
&
pThrd
->
msg
);
pMsg
=
QUEUE_DATA
(
head
,
SCliMsg
,
q
);
QUEUE_REMOVE
(
head
);
}
pthread_mutex_unlock
(
&
pThrd
->
msgMtx
);
static
void
addConnToCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
)
{
// impl later
}
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
SEpSet
*
pEpSet
=
&
pMsg
->
context
->
epSet
;
SEpSet
*
pEpSet
=
&
pMsg
->
context
->
epSet
;
char
*
fqdn
=
pEpSet
->
fqdn
[
pEpSet
->
inUse
];
uint32_t
port
=
pEpSet
->
port
[
pEpSet
->
inUse
];
uint64_t
el
=
taosGetTimestampUs
()
-
pMsg
->
st
;
tDebug
(
"msg tran time cost: %"
PRIu64
""
,
el
);
SCliConn
*
conn
=
getConnFromCache
(
pThrd
->
cache
,
fqdn
,
port
);
if
(
conn
!=
NULL
)
{
// impl later
conn
->
data
=
pMsg
;
conn
->
writeReq
->
data
=
conn
;
clientWrite
(
conn
);
// uv_buf_t wb;
// uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb);
}
else
{
SCliConn
*
conn
=
malloc
(
sizeof
(
SCliConn
));
conn
->
stream
=
(
uv_stream_t
*
)
malloc
(
sizeof
(
uv_tcp_t
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)(
conn
->
stream
));
conn
->
writeReq
=
malloc
(
sizeof
(
uv_write_t
));
conn
->
connReq
.
data
=
conn
;
conn
->
data
=
pMsg
;
struct
sockaddr_in
addr
;
uv_ip4_addr
(
fqdn
,
port
,
&
addr
);
// handle error in callback if
connect error
// handle error in callback if
fail to connect
uv_tcp_connect
(
&
conn
->
connReq
,
(
uv_tcp_t
*
)(
conn
->
stream
),
(
const
struct
sockaddr
*
)
&
addr
,
clientConnCb
);
}
// SRpcReqContext* pCxt = pMsg->context;
// SRpcMsg rpcMsg;
// SEpSet* pEpSet = &pMsg->context->epSet;
// SRpcInfo* pRpc = pMsg->context->pRpc;
//// rpcMsg.ahandle = pMsg->context->ahandle;
// rpcMsg.pCont = NULL;
// rpcMsg.ahandle = pMsg->context->ahandle;
// uint64_t el1 = taosGetTimestampUs() - et;
// tError("msg tran back first: time cost: %" PRIu64 "", el1);
// et = taosGetTimestampUs();
//(pRpc->cfp)(NULL, &rpcMsg, pEpSet);
// uint64_t el2 = taosGetTimestampUs() - et;
// tError("msg tran back second: time cost: %" PRIu64 "", el2);
}
}
static
void
clientAsyncCb
(
uv_async_t
*
handle
)
{
SCliThrdObj
*
pThrd
=
handle
->
data
;
SCliMsg
*
pMsg
=
NULL
;
queue
wq
;
//
SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont);
// char* msg = (char*)pHead
;
// int len = rpcMsgLenFromCont(pCtx->contLen
);
// tmsg_t msgType = pCtx->msgType
;
//
batch process to avoid to lock/unlock frequently
pthread_mutex_lock
(
&
pThrd
->
msgMtx
)
;
QUEUE_MOVE
(
&
pThrd
->
msg
,
&
wq
);
pthread_mutex_unlock
(
&
pThrd
->
msgMtx
)
;
// impl later
int
count
=
0
;
while
(
!
QUEUE_IS_EMPTY
(
&
wq
))
{
queue
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE_REMOVE
(
h
);
pMsg
=
QUEUE_DATA
(
h
,
SCliMsg
,
q
);
clientHandleReq
(
pMsg
,
pThrd
);
count
++
;
if
(
count
>=
2
)
{
tError
(
"send batch size: %d"
,
count
);
}
}
}
static
void
*
clientThread
(
void
*
arg
)
{
...
...
@@ -142,9 +229,6 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
calloc
(
1
,
sizeof
(
SCliThrdObj
));
QUEUE_INIT
(
&
pThrd
->
msg
);
pthread_mutex_init
(
&
pThrd
->
msgMtx
,
NULL
);
// QUEUE_INIT(&pThrd->clientCache);
pThrd
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
uv_loop_init
(
pThrd
->
loop
);
...
...
@@ -186,6 +270,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
}
SCliMsg
*
msg
=
malloc
(
sizeof
(
SCliMsg
));
msg
->
context
=
pContext
;
msg
->
st
=
taosGetTimestampUs
();
SCliThrdObj
*
thrd
=
((
SClientObj
*
)
pRpc
->
tcphandle
)
->
pThreadObj
[
index
%
pRpc
->
numOfThreads
];
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
af9178c5
...
...
@@ -277,10 +277,6 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
}
return
;
}
if
(
terrno
!=
0
)
{
// handle err code
}
if
(
nread
!=
UV_EOF
)
{
tDebug
(
"Read error %s
\n
"
,
uv_err_name
(
nread
));
}
...
...
@@ -309,21 +305,23 @@ void uvOnWriteCb(uv_write_t* req, int status) {
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
)
{
SWorkThrdObj
*
pThrd
=
container_of
(
handle
,
SWorkThrdObj
,
workerAsync
);
SConn
*
conn
=
NULL
;
//
opt later
queue
wq
;
//
batch process to avoid to lock/unlock frequently
pthread_mutex_lock
(
&
pThrd
->
connMtx
);
if
(
!
QUEUE_IS_EMPTY
(
&
pThrd
->
conn
))
{
queue
*
head
=
QUEUE_HEAD
(
&
pThrd
->
conn
);
conn
=
QUEUE_DATA
(
head
,
SConn
,
queue
);
QUEUE_REMOVE
(
head
);
}
QUEUE_MOVE
(
&
pThrd
->
conn
,
&
wq
);
pthread_mutex_unlock
(
&
pThrd
->
connMtx
);
if
(
conn
==
NULL
)
{
tError
(
"except occurred, do nothing"
);
return
;
while
(
!
QUEUE_IS_EMPTY
(
&
wq
))
{
queue
*
head
=
QUEUE_HEAD
(
&
wq
);
QUEUE_REMOVE
(
head
);
SConn
*
conn
=
QUEUE_DATA
(
head
,
SConn
,
queue
);
if
(
conn
==
NULL
)
{
tError
(
"except occurred, do nothing"
);
return
;
}
uv_buf_t
wb
=
uv_buf_init
(
conn
->
writeBuf
.
buf
,
conn
->
writeBuf
.
len
);
uv_write
(
conn
->
pWriter
,
(
uv_stream_t
*
)
conn
->
pTcp
,
&
wb
,
1
,
uvOnWriteCb
);
}
uv_buf_t
wb
=
uv_buf_init
(
conn
->
writeBuf
.
buf
,
conn
->
writeBuf
.
len
);
uv_write
(
conn
->
pWriter
,
(
uv_stream_t
*
)
conn
->
pTcp
,
&
wb
,
1
,
uvOnWriteCb
);
}
void
uvOnAcceptCb
(
uv_stream_t
*
stream
,
int
status
)
{
...
...
source/libs/transport/test/rclient.c
浏览文件 @
af9178c5
...
...
@@ -34,8 +34,8 @@ typedef struct {
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
ahandle
;
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
//
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
//
pMsg->code);
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
...
...
@@ -57,7 +57,7 @@ static void *sendRequest(void *param) {
rpcMsg
.
contLen
=
pInfo
->
msgSize
;
rpcMsg
.
ahandle
=
pInfo
;
rpcMsg
.
msgType
=
1
;
tDebug
(
"thread:%d, send request, contLen:%d num:%d"
,
pInfo
->
index
,
pInfo
->
msgSize
,
pInfo
->
num
);
//
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest
(
pInfo
->
pRpc
,
&
pInfo
->
epSet
,
&
rpcMsg
,
NULL
);
if
(
pInfo
->
num
%
20000
==
0
)
tInfo
(
"thread:%d, %d requests have been sent"
,
pInfo
->
index
,
pInfo
->
num
);
tsem_wait
(
&
pInfo
->
rspSem
);
...
...
tests/script/sh/massiveTable/deployCluster.sh
浏览文件 @
af9178c5
...
...
@@ -5,6 +5,49 @@
set
-e
#set -x
masterDnode
=
slave
dataRootDir
=
"/data"
firstEp
=
"trd02:7000"
startPort
=
7000
dnodeNumber
=
1
updateSrc
=
no
while
getopts
"hm:f:n:r:p:u:"
arg
do
case
$arg
in
m
)
masterDnode
=
$(
echo
$OPTARG
)
;;
n
)
dnodeNumber
=
$(
echo
$OPTARG
)
;;
u
)
updateSrc
=
$(
echo
$OPTARG
)
;;
f
)
firstEp
=
$(
echo
$OPTARG
)
;;
p
)
startPort
=
$(
echo
$OPTARG
)
;;
r
)
dataRootDir
=
$(
echo
$OPTARG
)
;;
h
)
echo
"Usage:
`
basename
$0
`
-m [if master dnode] "
echo
" -n [ dnode number] "
echo
" -f [ first ep] "
echo
" -p [ start port] "
echo
" -r [ dnode root dir] "
exit
0
;;
?
)
#unknow option
echo
"unkonw argument"
exit
1
;;
esac
done
# deployCluster.sh
curr_dir
=
$(
readlink
-f
"
$(
dirname
"
$0
"
)
"
)
echo
$curr_dir
...
...
@@ -12,13 +55,21 @@ echo $curr_dir
${
curr_dir
}
/cleanCluster.sh
-r
"/data"
${
curr_dir
}
/cleanCluster.sh
-r
"/data2"
${
curr_dir
}
/compileVersion.sh
-r
${
curr_dir
}
/../../../../
-v
"3.0"
if
[[
"
${
updateSrc
}
"
==
"yes"
]]
;
then
${
curr_dir
}
/compileVersion.sh
-r
${
curr_dir
}
/../../../../
-v
"3.0"
fi
${
curr_dir
}
/setupDnodes.sh
-r
"/data"
-n
1
-f
"trd02:7000"
-p
7000
${
curr_dir
}
/setupDnodes.sh
-r
"/data2"
-n
1
-f
"trd02:7000"
-p
8000
${
curr_dir
}
/setupDnodes.sh
-r
"/data"
-n
${
dnodeNumber
}
-f
${
firstEp
}
-p
7000
${
curr_dir
}
/setupDnodes.sh
-r
"/data2"
-n
${
dnodeNumber
}
-f
${
firstEp
}
-p
8000
#./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000
#./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000
if
[[
"
${
masterDnode
}
"
==
"master"
]]
;
then
# create all dnode into cluster
taos
-s
"create dnode trd02 port 8000;"
taos
-s
"create dnode trd03 port 7000;"
taos
-s
"create dnode trd03 port 8000;"
taos
-s
"create dnode trd04 port 7000;"
taos
-s
"create dnode trd04 port 8000;"
fi
...
...
tests/test/c/create_table.c
浏览文件 @
af9178c5
...
...
@@ -28,9 +28,14 @@ int32_t numOfThreads = 1;
int64_t
numOfTables
=
200000
;
int32_t
createTable
=
1
;
int32_t
insertData
=
0
;
int32_t
batchNum
=
100
;
int32_t
batchNumOfTbl
=
100
;
int32_t
batchNumOfRow
=
1
;
int32_t
numOfVgroups
=
2
;
int32_t
showTablesFlag
=
0
;
int32_t
queryFlag
=
0
;
int64_t
startTimestamp
=
1640966400000
;
// 2020-01-01 00:00:00.000
typedef
struct
{
int64_t
tableBeginIndex
;
...
...
@@ -167,7 +172,7 @@ void showTables() {
void
*
threadFunc
(
void
*
param
)
{
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
char
*
qstr
=
malloc
(
2000
*
1000
);
char
*
qstr
=
malloc
(
batchNumOfTbl
*
batchNumOfRow
*
128
);
int32_t
code
=
0
;
TAOS
*
con
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -192,7 +197,7 @@ void *threadFunc(void *param) {
// batch = MIN(batch, batchNum);
int32_t
len
=
sprintf
(
qstr
,
"create table"
);
for
(
int32_t
i
=
0
;
i
<
batchNum
;)
{
for
(
int32_t
i
=
0
;
i
<
batchNum
OfTbl
;)
{
len
+=
sprintf
(
qstr
+
len
,
" %s_t%"
PRId64
" using %s tags(%"
PRId64
")"
,
stbName
,
t
,
stbName
,
t
);
t
++
;
i
++
;
...
...
@@ -204,7 +209,7 @@ void *threadFunc(void *param) {
int64_t
startTs
=
taosGetTimestampUs
();
TAOS_RES
*
pRes
=
taos_query
(
con
,
qstr
);
code
=
taos_errno
(
pRes
);
if
((
code
!=
0
)
&&
(
code
!=
0x0002
))
{
if
((
code
!=
0
)
&&
(
code
!=
TSDB_CODE_RPC_AUTH_REQUIRED
))
{
pError
(
"failed to create table t%"
PRId64
", reason:%s"
,
t
,
tstrerror
(
code
));
}
taos_free_result
(
pRes
);
...
...
@@ -227,31 +232,49 @@ void *threadFunc(void *param) {
if
(
insertData
)
{
int64_t
curMs
=
0
;
int64_t
beginMs
=
taosGetTimestampMs
();
pInfo
->
startMs
=
beginMs
;
int64_t
t
=
pInfo
->
tableBeginIndex
;
for
(;
t
<=
pInfo
->
tableEndIndex
;)
{
// int64_t batch = (pInfo->tableEndIndex - t);
// batch = MIN(batch, batchNum);
pInfo
->
startMs
=
taosGetTimestampMs
();
for
(
int64_t
t
=
pInfo
->
tableBeginIndex
;
t
<
pInfo
->
tableEndIndex
;
++
t
)
{
int64_t
batch
=
(
pInfo
->
tableEndIndex
-
t
);
batch
=
MIN
(
batch
,
batchNum
);
int32_t
len
=
sprintf
(
qstr
,
"insert into"
);
for
(
int32_t
i
=
0
;
i
<
batch
;
++
i
)
{
len
+=
sprintf
(
qstr
+
len
,
" t%"
PRId64
" values(now, %"
PRId64
")"
,
t
+
i
,
t
+
i
);
}
int32_t
len
=
sprintf
(
qstr
,
"insert into "
);
for
(
int32_t
i
=
0
;
i
<
batchNumOfTbl
;)
{
int64_t
ts
=
startTimestamp
;
len
+=
sprintf
(
qstr
+
len
,
"%s_t%"
PRId64
" values "
,
stbName
,
t
);
for
(
int32_t
j
=
0
;
j
<
batchNumOfRow
;
j
++
)
{
len
+=
sprintf
(
qstr
+
len
,
"(%"
PRId64
", 6666) "
,
ts
++
);
}
t
++
;
i
++
;
if
(
t
>
pInfo
->
tableEndIndex
)
{
break
;
}
}
int64_t
startTs
=
taosGetTimestampUs
();
TAOS_RES
*
pRes
=
taos_query
(
con
,
qstr
);
code
=
taos_errno
(
pRes
);
if
(
code
!=
0
)
{
pError
(
"failed to insert
table t%"
PRId64
", reason:%s"
,
t
,
tstrerror
(
code
));
if
(
(
code
!=
0
)
&&
(
code
!=
TSDB_CODE_RPC_AUTH_REQUIRED
)
)
{
pError
(
"failed to insert
%s_t%"
PRId64
", reason:%s"
,
stbName
,
t
,
tstrerror
(
code
));
}
taos_free_result
(
pRes
);
int64_t
endTs
=
taosGetTimestampUs
();
int64_t
delay
=
endTs
-
startTs
;
// printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay);
if
(
delay
>
pInfo
->
maxDelay
)
pInfo
->
maxDelay
=
delay
;
if
(
delay
<
pInfo
->
minDelay
)
pInfo
->
minDelay
=
delay
;
curMs
=
taosGetTimestampMs
();
if
(
curMs
-
beginMs
>
10000
)
{
beginMs
=
curMs
;
// printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t);
printInsertProgress
(
pInfo
,
t
);
}
t
+=
(
batch
-
1
);
}
printInsertProgress
(
pInfo
,
pInfo
->
tableEndIndex
);
printInsertProgress
(
pInfo
,
t
);
}
taos_close
(
con
);
...
...
@@ -280,9 +303,13 @@ void printHelp() {
printf
(
"%s%s
\n
"
,
indent
,
"-i"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"insertData, default is "
,
insertData
);
printf
(
"%s%s
\n
"
,
indent
,
"-b"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"batchNum
, default is "
,
batchNum
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"batchNum
OfTbl, default is "
,
batchNumOfTbl
);
printf
(
"%s%s
\n
"
,
indent
,
"-w"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"showTablesFlag, default is "
,
showTablesFlag
);
printf
(
"%s%s
\n
"
,
indent
,
"-q"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"queryFlag, default is "
,
queryFlag
);
printf
(
"%s%s
\n
"
,
indent
,
"-l"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"batchNumOfRow, default is "
,
batchNumOfRow
);
exit
(
EXIT_SUCCESS
);
}
...
...
@@ -309,10 +336,15 @@ void parseArgument(int32_t argc, char *argv[]) {
}
else
if
(
strcmp
(
argv
[
i
],
"-i"
)
==
0
)
{
insertData
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-b"
)
==
0
)
{
batchNum
=
atoi
(
argv
[
++
i
]);
batchNumOfTbl
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
)
{
batchNumOfRow
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-w"
)
==
0
)
{
showTablesFlag
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-q"
)
==
0
)
{
queryFlag
=
atoi
(
argv
[
++
i
]);
}
else
{
pPrint
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
}
}
...
...
@@ -324,8 +356,10 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint
(
"%s numOfVgroups:%d %s"
,
GREEN
,
numOfVgroups
,
NC
);
pPrint
(
"%s createTable:%d %s"
,
GREEN
,
createTable
,
NC
);
pPrint
(
"%s insertData:%d %s"
,
GREEN
,
insertData
,
NC
);
pPrint
(
"%s batchNum:%d %s"
,
GREEN
,
batchNum
,
NC
);
pPrint
(
"%s batchNumOfTbl:%d %s"
,
GREEN
,
batchNumOfTbl
,
NC
);
pPrint
(
"%s batchNumOfRow:%d %s"
,
GREEN
,
batchNumOfRow
,
NC
);
pPrint
(
"%s showTablesFlag:%d %s"
,
GREEN
,
showTablesFlag
,
NC
);
pPrint
(
"%s queryFlag:%d %s"
,
GREEN
,
queryFlag
,
NC
);
pPrint
(
"%s start create table performace test %s"
,
GREEN
,
NC
);
}
...
...
@@ -338,8 +372,15 @@ int32_t main(int32_t argc, char *argv[]) {
return
0
;
}
createDbAndStb
();
if
(
queryFlag
)
{
//selectRowsFromTable();
return
0
;
}
if
(
createTable
)
{
createDbAndStb
();
}
pPrint
(
"%d threads are spawned to create %"
PRId64
" tables"
,
numOfThreads
,
numOfTables
);
pthread_attr_t
thattr
;
...
...
@@ -396,9 +437,11 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed
+=
pInfo
[
i
].
insertDataSpeed
;
}
pPrint
(
"%s total %"
PRId64
" tables, %.1f tables/second, threads:%d, maxDelay: %"
PRId64
"us, minDelay: %"
PRId64
if
(
createTable
)
{
pPrint
(
"%s total %"
PRId64
" tables, %.1f tables/second, threads:%d, maxDelay: %"
PRId64
"us, minDelay: %"
PRId64
"us %s"
,
GREEN
,
numOfTables
,
createTableSpeed
,
numOfThreads
,
maxDelay
,
minDelay
,
NC
);
}
if
(
insertData
)
{
pPrint
(
"%s total %"
PRId64
" tables, %.1f rows/second, threads:%d %s"
,
GREEN
,
numOfTables
,
insertDataSpeed
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录