Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
859c74a4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
859c74a4
编写于
1月 26, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-11818] refactor sepset struct.
上级
9648fe40
变更
33
隐藏空白更改
内联
并排
Showing
33 changed file
with
314 addition
and
432 deletion
+314
-432
include/common/tep.h
include/common/tep.h
+12
-2
include/common/tmsg.h
include/common/tmsg.h
+12
-21
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-12
include/libs/transport/transport.h
include/libs/transport/transport.h
+0
-15
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+5
-19
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+3
-2
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+97
-98
source/common/src/tep.c
source/common/src/tep.c
+20
-9
source/common/src/tglobal.c
source/common/src/tglobal.c
+5
-7
source/dnode/mgmt/impl/src/dndMgmt.c
source/dnode/mgmt/impl/src/dndMgmt.c
+22
-19
source/dnode/mgmt/impl/test/sut/src/client.cpp
source/dnode/mgmt/impl/test/sut/src/client.cpp
+2
-5
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+5
-5
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+4
-4
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+2
-2
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+7
-5
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+1
-3
source/dnode/mnode/impl/test/db/db.cpp
source/dnode/mnode/impl/test/db/db.cpp
+6
-6
source/dnode/mnode/impl/test/profile/profile.cpp
source/dnode/mnode/impl/test/profile/profile.cpp
+3
-3
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+1
-17
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+18
-18
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-7
source/libs/parser/src/astValidate.c
source/libs/parser/src/astValidate.c
+4
-7
source/libs/parser/src/dCDAstProcess.c
source/libs/parser/src/dCDAstProcess.c
+1
-7
source/libs/parser/src/parserUtil.c
source/libs/parser/src/parserUtil.c
+0
-42
source/libs/parser/test/mockCatalogService.cpp
source/libs/parser/test/mockCatalogService.cpp
+21
-6
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+11
-24
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+7
-7
source/libs/planner/test/phyPlanTests.cpp
source/libs/planner/test/phyPlanTests.cpp
+2
-4
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+2
-2
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+6
-20
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+20
-20
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+6
-6
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+6
-8
未找到文件。
include/common/tep.h
浏览文件 @
859c74a4
#ifndef TDENGINE_TEP_H
#define TDENGINE_TEP_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "os.h"
#include "tmsg.h"
...
...
@@ -9,10 +13,16 @@ typedef struct SCorEpSet {
SEpSet
epSet
;
}
SCorEpSet
;
int
taosGetFqdnPortFromEp
(
const
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
int
taosGetFqdnPortFromEp
(
const
char
*
ep
,
SEp
*
pEp
);
void
addEpIntoEpSet
(
SEpSet
*
pEpSet
,
const
char
*
fqdn
,
uint16_t
port
);
bool
isEpsetEqual
(
const
SEpSet
*
s1
,
const
SEpSet
*
s2
);
void
updateEpSet_s
(
SCorEpSet
*
pEpSet
,
SEpSet
*
pNewEpSet
);
void
updateEpSet_s
(
SCorEpSet
*
pEpSet
,
SEpSet
*
pNewEpSet
);
SEpSet
getEpSet_s
(
SCorEpSet
*
pEpSet
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TEP_H
include/common/tmsg.h
浏览文件 @
859c74a4
...
...
@@ -154,10 +154,10 @@ typedef struct {
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
typedef
struct
{
typedef
struct
SEp
{
char
fqdn
[
TSDB_FQDN_LEN
];
uint16_t
port
;
}
SEp
Addr
;
}
SEp
;
typedef
struct
{
int32_t
contLen
;
...
...
@@ -266,8 +266,7 @@ typedef struct {
typedef
struct
SEpSet
{
int8_t
inUse
;
int8_t
numOfEps
;
uint16_t
port
[
TSDB_MAX_REPLICA
];
char
fqdn
[
TSDB_MAX_REPLICA
][
TSDB_FQDN_LEN
];
SEp
eps
[
TSDB_MAX_REPLICA
];
}
SEpSet
;
static
FORCE_INLINE
int
taosEncodeSEpSet
(
void
**
buf
,
const
SEpSet
*
pEp
)
{
...
...
@@ -275,8 +274,8 @@ static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
tlen
+=
taosEncodeFixedI8
(
buf
,
pEp
->
inUse
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pEp
->
numOfEps
);
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
i
++
)
{
tlen
+=
taosEncodeFixedU16
(
buf
,
pEp
->
port
[
i
]
);
tlen
+=
taosEncodeString
(
buf
,
pEp
->
fqdn
[
i
]
);
tlen
+=
taosEncodeFixedU16
(
buf
,
pEp
->
eps
[
i
].
port
);
tlen
+=
taosEncodeString
(
buf
,
pEp
->
eps
[
i
].
fqdn
);
}
return
tlen
;
}
...
...
@@ -285,8 +284,8 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
buf
=
taosDecodeFixedI8
(
buf
,
&
pEp
->
inUse
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pEp
->
numOfEps
);
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
i
++
)
{
buf
=
taosDecodeFixedU16
(
buf
,
&
pEp
->
port
[
i
]
);
buf
=
taosDecodeStringTo
(
buf
,
pEp
->
fqdn
[
i
]
);
buf
=
taosDecodeFixedU16
(
buf
,
&
pEp
->
eps
[
i
].
port
);
buf
=
taosDecodeStringTo
(
buf
,
pEp
->
eps
[
i
].
fqdn
);
}
return
buf
;
}
...
...
@@ -617,8 +616,7 @@ typedef struct {
int32_t
id
;
int8_t
isMnode
;
int8_t
align
;
uint16_t
port
;
char
fqdn
[
TSDB_FQDN_LEN
];
SEp
ep
;
}
SDnodeEp
;
typedef
struct
{
...
...
@@ -691,24 +689,17 @@ typedef struct {
char
tableNames
[];
}
SMultiTableInfoReq
;
// todo refactor
typedef
struct
SVgroupInfo
{
int32_t
vgId
;
uint32_t
hashBegin
;
uint32_t
hashEnd
;
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
SEpSet
epset
;
}
SVgroupInfo
;
typedef
struct
{
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupMsg
;
typedef
struct
{
int32_t
numOfVgroups
;
SVgroupMsg
vgroups
[];
int32_t
numOfVgroups
;
SVgroupInfo
vgroups
[];
}
SVgroupsInfo
;
typedef
struct
{
...
...
include/libs/qcom/query.h
浏览文件 @
859c74a4
...
...
@@ -128,20 +128,9 @@ typedef struct SMsgSendInfo {
typedef
struct
SQueryNodeAddr
{
int32_t
nodeId
;
// vgId or qnodeId
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
SEpSet
epset
;
}
SQueryNodeAddr
;
static
FORCE_INLINE
void
tConvertQueryAddrToEpSet
(
SEpSet
*
pEpSet
,
const
SQueryNodeAddr
*
pAddr
)
{
pEpSet
->
inUse
=
pAddr
->
inUse
;
pEpSet
->
numOfEps
=
pAddr
->
numOfEps
;
for
(
int
j
=
0
;
j
<
TSDB_MAX_REPLICA
;
j
++
)
{
pEpSet
->
port
[
j
]
=
pAddr
->
epAddr
[
j
].
port
;
memcpy
(
pEpSet
->
fqdn
[
j
],
pAddr
->
epAddr
[
j
].
fqdn
,
TSDB_FQDN_LEN
);
}
}
int32_t
initTaskQueue
();
int32_t
cleanupTaskQueue
();
...
...
include/libs/transport/transport.h
浏览文件 @
859c74a4
...
...
@@ -20,21 +20,6 @@
extern
"C"
{
#endif
//typedef struct SEpAddr {
// char fqdn[TSDB_FQDN_LEN];
// uint16_t port;
//} SEpAddr;
//
//typedef struct SVgroup {
// int32_t vgId;
// int8_t numOfEps;
// SEpAddr epAddr[TSDB_MAX_REPLICA];
//} SVgroup;
//
//typedef struct SVgroupsInfo {
// int32_t numOfVgroups;
// SVgroup vgroups[];
//} SVgroupsInfo;
#ifdef __cplusplus
}
...
...
source/client/src/clientImpl.c
浏览文件 @
859c74a4
...
...
@@ -101,7 +101,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
}
if
(
port
)
{
epSet
.
epSet
.
port
[
0
]
=
port
;
epSet
.
epSet
.
eps
[
0
].
port
=
port
;
}
}
else
{
if
(
initEpSetFromCfg
(
tsFirst
,
tsSecond
,
&
epSet
)
<
0
)
{
...
...
@@ -701,7 +701,7 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
return
-
1
;
}
taosGetFqdnPortFromEp
(
firstEp
,
mgmtEpSet
->
fqdn
[
0
],
&
(
mgmtEpSet
->
port
[
0
])
);
taosGetFqdnPortFromEp
(
firstEp
,
&
mgmtEpSet
->
eps
[
0
]
);
mgmtEpSet
->
numOfEps
++
;
}
...
...
@@ -711,7 +711,7 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
return
-
1
;
}
taosGetFqdnPortFromEp
(
secondEp
,
mgmtEpSet
->
fqdn
[
mgmtEpSet
->
numOfEps
],
&
(
mgmtEpSet
->
port
[
mgmtEpSet
->
numOfEps
])
);
taosGetFqdnPortFromEp
(
secondEp
,
&
mgmtEpSet
->
eps
[
mgmtEpSet
->
numOfEps
]
);
mgmtEpSet
->
numOfEps
++
;
}
...
...
@@ -916,14 +916,7 @@ void* doFetchRow(SRequestObj* pRequest) {
SShowReqInfo
*
pShowReqInfo
=
&
pRequest
->
body
.
showInfo
;
SVgroupInfo
*
pVgroupInfo
=
taosArrayGet
(
pShowReqInfo
->
pArray
,
pShowReqInfo
->
currentIndex
);
epSet
.
numOfEps
=
pVgroupInfo
->
numOfEps
;
epSet
.
inUse
=
pVgroupInfo
->
inUse
;
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
strncpy
(
epSet
.
fqdn
[
i
],
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
tListLen
(
epSet
.
fqdn
[
i
]));
epSet
.
port
[
i
]
=
pVgroupInfo
->
epAddr
[
i
].
port
;
}
epSet
=
pVgroupInfo
->
epset
;
}
else
if
(
pRequest
->
type
==
TDMT_VND_SHOW_TABLES_FETCH
)
{
pRequest
->
type
=
TDMT_VND_SHOW_TABLES
;
SShowReqInfo
*
pShowReqInfo
=
&
pRequest
->
body
.
showInfo
;
...
...
@@ -940,14 +933,7 @@ void* doFetchRow(SRequestObj* pRequest) {
pRequest
->
body
.
requestMsg
.
pData
=
pShowReq
;
SMsgSendInfo
*
body
=
buildMsgInfoImpl
(
pRequest
);
epSet
.
numOfEps
=
pVgroupInfo
->
numOfEps
;
epSet
.
inUse
=
pVgroupInfo
->
inUse
;
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
strncpy
(
epSet
.
fqdn
[
i
],
pVgroupInfo
->
epAddr
[
i
].
fqdn
,
tListLen
(
epSet
.
fqdn
[
i
]));
epSet
.
port
[
i
]
=
pVgroupInfo
->
epAddr
[
i
].
port
;
}
epSet
=
pVgroupInfo
->
epset
;
int64_t
transporterId
=
0
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
859c74a4
...
...
@@ -53,7 +53,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert
(
pConnect
->
epSet
.
numOfEps
>
0
);
for
(
int32_t
i
=
0
;
i
<
pConnect
->
epSet
.
numOfEps
;
++
i
)
{
pConnect
->
epSet
.
port
[
i
]
=
htons
(
pConnect
->
epSet
.
port
[
i
]
);
pConnect
->
epSet
.
eps
[
i
].
port
=
htons
(
pConnect
->
epSet
.
eps
[
i
].
port
);
}
if
(
!
isEpsetEqual
(
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
pConnect
->
epSet
))
{
...
...
@@ -61,7 +61,8 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
}
for
(
int
i
=
0
;
i
<
pConnect
->
epSet
.
numOfEps
;
++
i
)
{
tscDebug
(
"0x%"
PRIx64
" epSet.fqdn[%d]:%s port:%d, connObj:0x%"
PRIx64
,
pRequest
->
requestId
,
i
,
pConnect
->
epSet
.
fqdn
[
i
],
pConnect
->
epSet
.
port
[
i
],
pTscObj
->
id
);
tscDebug
(
"0x%"
PRIx64
" epSet.fqdn[%d]:%s port:%d, connObj:0x%"
PRIx64
,
pRequest
->
requestId
,
i
,
pConnect
->
epSet
.
eps
[
i
].
fqdn
,
pConnect
->
epSet
.
eps
[
i
].
port
,
pTscObj
->
id
);
}
pTscObj
->
connId
=
pConnect
->
connId
;
...
...
source/client/test/clientTests.cpp
浏览文件 @
859c74a4
...
...
@@ -401,7 +401,16 @@ TEST(testCase, create_multiple_tables) {
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
...
...
@@ -411,6 +420,13 @@ TEST(testCase, create_multiple_tables) {
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, k int) tags(a int)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stable tables, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table t_2 using st1 tags(1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create multiple tables, reason:%s
\n
"
,
taos_errstr
(
pRes
));
...
...
@@ -528,6 +544,25 @@ TEST(testCase, generated_request_id_test) {
taosHashCleanup
(
phash
);
}
TEST
(
testCase
,
insert_test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into t_2 values(now, 1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create into table t_2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
#if 0
TEST(testCase, create_topic_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
...
...
@@ -552,25 +587,7 @@ TEST(testCase, create_topic_Test) {
taos_close(pConn);
}
TEST
(
testCase
,
insert_test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into t_2 values(now, 1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create multiple tables, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
#if 0
TEST(testCase, tmq_subscribe_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
...
...
@@ -597,24 +614,6 @@ TEST(testCase, tmq_commit_TEST) {
}
#endif
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into t_2 values(now, 1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST
(
testCase
,
projection_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
...
...
@@ -666,66 +665,66 @@ TEST(testCase, projection_query_tables) {
taos_close
(
pConn
);
}
//
TEST(testCase, projection_query_stables) {
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//
ASSERT_NE(pConn, nullptr);
//
//
TAOS_RES* pRes = taos_query(pConn, "use abc1");
//
taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select ts from m
1");
//
if (taos_errno(pRes) != 0) {
//
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
//
taos_free_result(pRes);
//
ASSERT_TRUE(false);
//
}
//
//
TAOS_ROW pRow = NULL;
//
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
//
int32_t numOfFields = taos_num_fields(pRes);
//
//
char str[512] = {0};
//
while ((pRow = taos_fetch_row(pRes)) != NULL) {
//
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
//
printf("%s\n", str);
//
}
//
//
taos_free_result(pRes);
//
taos_close(pConn);
//
}
//
TEST(testCase, agg_query_tables) {
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//
ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use dbv
");
//
taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tx using st
tags(111111111111111)");
//
if (taos_errno(pRes) != 0) {
//
printf("failed to create table, reason:%s\n", taos_errstr(pRes));
//
}
//
taos_free_result(pRes);
//
//
pRes = taos_query(pConn, "select count(*) from tu");
//
if (taos_errno(pRes) != 0) {
//
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
//
taos_free_result(pRes);
//
ASSERT_TRUE(false);
//
}
//
//
TAOS_ROW pRow = NULL;
//
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
//
int32_t numOfFields = taos_num_fields(pRes);
//
//
char str[512] = {0};
//
while ((pRow = taos_fetch_row(pRes)) != NULL) {
//
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
//
printf("%s\n", str);
//
}
//
//
taos_free_result(pRes);
//
taos_close(pConn);
//
}
TEST
(
testCase
,
projection_query_stables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"select ts from st
1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to select from table, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
agg_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1
"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table tx using st1
tags(111111111111111)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create table, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"select count(*) from tu"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to select from table, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
#pragma GCC diagnostic pop
source/common/src/tep.c
浏览文件 @
859c74a4
...
...
@@ -2,32 +2,43 @@
#include "tglobal.h"
#include "tlockfree.h"
int
taosGetFqdnPortFromEp
(
const
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
)
{
*
port
=
0
;
strcpy
(
fqdn
,
ep
);
int
taosGetFqdnPortFromEp
(
const
char
*
ep
,
SEp
*
pEp
)
{
pEp
->
port
=
0
;
strcpy
(
pEp
->
fqdn
,
ep
);
char
*
temp
=
strchr
(
fqdn
,
':'
);
char
*
temp
=
strchr
(
pEp
->
fqdn
,
':'
);
if
(
temp
)
{
*
temp
=
0
;
*
port
=
atoi
(
temp
+
1
);
pEp
->
port
=
atoi
(
temp
+
1
);
}
if
(
*
port
==
0
)
{
*
port
=
tsServerPort
;
if
(
pEp
->
port
==
0
)
{
pEp
->
port
=
tsServerPort
;
return
-
1
;
}
return
0
;
}
void
addEpIntoEpSet
(
SEpSet
*
pEpSet
,
const
char
*
fqdn
,
uint16_t
port
)
{
if
(
pEpSet
==
NULL
||
fqdn
==
NULL
||
strlen
(
fqdn
)
==
0
)
{
return
;
}
int32_t
index
=
pEpSet
->
numOfEps
;
tstrncpy
(
pEpSet
->
eps
[
index
].
fqdn
,
fqdn
,
tListLen
(
pEpSet
->
eps
[
index
].
fqdn
));
pEpSet
->
eps
[
index
].
port
=
port
;
pEpSet
->
numOfEps
+=
1
;
}
bool
isEpsetEqual
(
const
SEpSet
*
s1
,
const
SEpSet
*
s2
)
{
if
(
s1
->
numOfEps
!=
s2
->
numOfEps
||
s1
->
inUse
!=
s2
->
inUse
)
{
return
false
;
}
for
(
int32_t
i
=
0
;
i
<
s1
->
numOfEps
;
i
++
)
{
if
(
s1
->
port
[
i
]
!=
s2
->
port
[
i
]
||
strncmp
(
s1
->
fqdn
[
i
],
s2
->
fqdn
[
i
]
,
TSDB_FQDN_LEN
)
!=
0
)
if
(
s1
->
eps
[
i
].
port
!=
s2
->
eps
[
i
].
port
||
strncmp
(
s1
->
eps
[
i
].
fqdn
,
s2
->
eps
[
i
].
fqdn
,
TSDB_FQDN_LEN
)
!=
0
)
return
false
;
}
return
true
;
...
...
source/common/src/tglobal.c
浏览文件 @
859c74a4
...
...
@@ -1080,9 +1080,7 @@ static void doInitGlobalConfig(void) {
void
taosInitGlobalCfg
()
{
pthread_once
(
&
tsInitGlobalCfgOnce
,
doInitGlobalConfig
);
}
int32_t
taosCheckAndPrintCfg
()
{
char
fqdn
[
TSDB_FQDN_LEN
];
uint16_t
port
;
SEp
ep
=
{
0
};
if
(
debugFlag
&
DEBUG_TRACE
||
debugFlag
&
DEBUG_DEBUG
||
debugFlag
&
DEBUG_DUMP
)
{
taosSetAllDebugFlag
();
}
...
...
@@ -1097,15 +1095,15 @@ int32_t taosCheckAndPrintCfg() {
if
(
tsFirst
[
0
]
==
0
)
{
strcpy
(
tsFirst
,
tsLocalEp
);
}
else
{
taosGetFqdnPortFromEp
(
tsFirst
,
fqdn
,
&
port
);
snprintf
(
tsFirst
,
sizeof
(
tsFirst
),
"%s:%u"
,
fqdn
,
port
);
taosGetFqdnPortFromEp
(
tsFirst
,
&
ep
);
snprintf
(
tsFirst
,
sizeof
(
tsFirst
),
"%s:%u"
,
ep
.
fqdn
,
ep
.
port
);
}
if
(
tsSecond
[
0
]
==
0
)
{
strcpy
(
tsSecond
,
tsLocalEp
);
}
else
{
taosGetFqdnPortFromEp
(
tsSecond
,
fqdn
,
&
port
);
snprintf
(
tsSecond
,
sizeof
(
tsSecond
),
"%s:%u"
,
fqdn
,
port
);
taosGetFqdnPortFromEp
(
tsSecond
,
&
ep
);
snprintf
(
tsSecond
,
sizeof
(
tsSecond
),
"%s:%u"
,
ep
.
fqdn
,
ep
.
port
);
}
taosCheckDataDirCfg
();
...
...
source/dnode/mgmt/impl/src/dndMgmt.c
浏览文件 @
859c74a4
...
...
@@ -57,13 +57,13 @@ void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
pMgmt
->
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
pDnodeEp
!=
NULL
)
{
if
(
pPort
!=
NULL
)
{
*
pPort
=
pDnodeEp
->
port
;
*
pPort
=
pDnodeEp
->
ep
.
port
;
}
if
(
pFqdn
!=
NULL
)
{
tstrncpy
(
pFqdn
,
pDnodeEp
->
fqdn
,
TSDB_FQDN_LEN
);
tstrncpy
(
pFqdn
,
pDnodeEp
->
ep
.
fqdn
,
TSDB_FQDN_LEN
);
}
if
(
pEp
!=
NULL
)
{
snprintf
(
pEp
,
TSDB_EP_LEN
,
"%s:%u"
,
pDnodeEp
->
fqdn
,
pDnodeEp
->
port
);
snprintf
(
pEp
,
TSDB_EP_LEN
,
"%s:%u"
,
pDnodeEp
->
ep
.
fqdn
,
pDnodeEp
->
ep
.
port
);
}
}
...
...
@@ -85,12 +85,12 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
dDebug
(
"RPC %p, req:%s is redirected, num:%d use:%d"
,
pReq
->
handle
,
TMSG_INFO
(
msgType
),
epSet
.
numOfEps
,
epSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
dDebug
(
"mnode index:%d %s:%u"
,
i
,
epSet
.
fqdn
[
i
],
epSet
.
port
[
i
]
);
if
(
strcmp
(
epSet
.
fqdn
[
i
],
pDnode
->
cfg
.
localFqdn
)
==
0
&&
epSet
.
port
[
i
]
==
pDnode
->
cfg
.
serverPort
)
{
dDebug
(
"mnode index:%d %s:%u"
,
i
,
epSet
.
eps
[
i
].
fqdn
,
epSet
.
eps
[
i
].
port
);
if
(
strcmp
(
epSet
.
eps
[
i
].
fqdn
,
pDnode
->
cfg
.
localFqdn
)
==
0
&&
epSet
.
eps
[
i
].
port
==
pDnode
->
cfg
.
serverPort
)
{
epSet
.
inUse
=
(
i
+
1
)
%
epSet
.
numOfEps
;
}
epSet
.
port
[
i
]
=
htons
(
epSet
.
port
[
i
]
);
epSet
.
eps
[
i
].
port
=
htons
(
epSet
.
eps
[
i
].
port
);
}
rpcSendRedirectRsp
(
pReq
->
handle
,
&
epSet
);
...
...
@@ -104,7 +104,7 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
pMgmt
->
mnodeEpSet
=
*
pEpSet
;
for
(
int32_t
i
=
0
;
i
<
pEpSet
->
numOfEps
;
++
i
)
{
dInfo
(
"mnode index:%d %s:%u"
,
i
,
pEpSet
->
fqdn
[
i
],
pEpSet
->
port
[
i
]
);
dInfo
(
"mnode index:%d %s:%u"
,
i
,
pEpSet
->
eps
[
i
].
fqdn
,
pEpSet
->
eps
[
i
].
port
);
}
taosWUnLockLatch
(
&
pMgmt
->
latch
);
...
...
@@ -116,7 +116,7 @@ static void dndPrintDnodes(SDnode *pDnode) {
dDebug
(
"print dnode ep list, num:%d"
,
pMgmt
->
dnodeEps
->
num
);
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
dnodeEps
->
num
;
i
++
)
{
SDnodeEp
*
pEp
=
&
pMgmt
->
dnodeEps
->
eps
[
i
];
dDebug
(
"dnode:%d, fqdn:%s port:%u isMnode:%d"
,
pEp
->
id
,
pEp
->
fqdn
,
pEp
->
port
,
pEp
->
isMnode
);
dDebug
(
"dnode:%d, fqdn:%s port:%u isMnode:%d"
,
pEp
->
id
,
pEp
->
ep
.
fqdn
,
pEp
->
ep
.
port
,
pEp
->
isMnode
);
}
}
...
...
@@ -145,8 +145,8 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) {
if
(
!
pDnodeEp
->
isMnode
)
continue
;
if
(
mIndex
>=
TSDB_MAX_REPLICA
)
continue
;
pMgmt
->
mnodeEpSet
.
numOfEps
++
;
strcpy
(
pMgmt
->
mnodeEpSet
.
fqdn
[
mIndex
],
pDnodeEp
->
fqdn
);
pMgmt
->
mnodeEpSet
.
port
[
mIndex
]
=
pDnodeEp
->
port
;
pMgmt
->
mnodeEpSet
.
eps
[
mIndex
]
=
pDnodeEp
->
ep
;
mIndex
++
;
}
...
...
@@ -167,7 +167,7 @@ static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) {
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
pMgmt
->
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
pDnodeEp
!=
NULL
)
{
char
epstr
[
TSDB_EP_LEN
+
1
];
snprintf
(
epstr
,
TSDB_EP_LEN
,
"%s:%u"
,
pDnodeEp
->
fqdn
,
pDnodeEp
->
port
);
snprintf
(
epstr
,
TSDB_EP_LEN
,
"%s:%u"
,
pDnodeEp
->
ep
.
fqdn
,
pDnodeEp
->
ep
.
port
);
changed
=
strcmp
(
pEp
,
epstr
)
!=
0
;
}
...
...
@@ -251,11 +251,12 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
SDnodeEp
*
pDnodeEp
=
&
pMgmt
->
dnodeEps
->
eps
[
i
];
cJSON
*
d
nodeI
d
=
cJSON_GetObjectItem
(
node
,
"id"
);
if
(
!
d
nodeId
||
dnodeI
d
->
type
!=
cJSON_Number
)
{
cJSON
*
d
i
d
=
cJSON_GetObjectItem
(
node
,
"id"
);
if
(
!
d
id
||
di
d
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dnodeId not found"
,
pMgmt
->
file
);
goto
PRASE_DNODE_OVER
;
}
pDnodeEp
->
id
=
dnodeId
->
valueint
;
cJSON
*
dnodeFqdn
=
cJSON_GetObjectItem
(
node
,
"fqdn"
);
...
...
@@ -263,14 +264,15 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
dError
(
"failed to read %s since dnodeFqdn not found"
,
pMgmt
->
file
);
goto
PRASE_DNODE_OVER
;
}
tstrncpy
(
pDnodeEp
->
fqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
tstrncpy
(
pDnodeEp
->
ep
.
fqdn
,
dnodeFqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
dnodePort
=
cJSON_GetObjectItem
(
node
,
"port"
);
if
(
!
dnodePort
||
dnodePort
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dnodePort not found"
,
pMgmt
->
file
);
goto
PRASE_DNODE_OVER
;
}
pDnodeEp
->
port
=
dnodePort
->
valueint
;
pDnodeEp
->
ep
.
port
=
dnodePort
->
valueint
;
cJSON
*
isMnode
=
cJSON_GetObjectItem
(
node
,
"isMnode"
);
if
(
!
isMnode
||
isMnode
->
type
!=
cJSON_Number
)
{
...
...
@@ -298,7 +300,8 @@ PRASE_DNODE_OVER:
pMgmt
->
dnodeEps
=
calloc
(
1
,
sizeof
(
SDnodeEps
)
+
sizeof
(
SDnodeEp
));
pMgmt
->
dnodeEps
->
num
=
1
;
pMgmt
->
dnodeEps
->
eps
[
0
].
isMnode
=
1
;
taosGetFqdnPortFromEp
(
pDnode
->
cfg
.
firstEp
,
pMgmt
->
dnodeEps
->
eps
[
0
].
fqdn
,
&
pMgmt
->
dnodeEps
->
eps
[
0
].
port
);
taosGetFqdnPortFromEp
(
pDnode
->
cfg
.
firstEp
,
&
(
pMgmt
->
dnodeEps
->
eps
[
0
].
ep
));
}
dndResetDnodes
(
pDnode
,
pMgmt
->
dnodeEps
);
...
...
@@ -329,8 +332,8 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
dnodeEps
->
num
;
++
i
)
{
SDnodeEp
*
pDnodeEp
=
&
pMgmt
->
dnodeEps
->
eps
[
i
];
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
id
\"
: %d,
\n
"
,
pDnodeEp
->
id
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fqdn
\"
:
\"
%s
\"
,
\n
"
,
pDnodeEp
->
fqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
port
\"
: %u,
\n
"
,
pDnodeEp
->
port
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fqdn
\"
:
\"
%s
\"
,
\n
"
,
pDnodeEp
->
ep
.
fqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
port
\"
: %u,
\n
"
,
pDnodeEp
->
ep
.
port
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
isMnode
\"
: %d
\n
"
,
pDnodeEp
->
isMnode
);
if
(
i
<
pMgmt
->
dnodeEps
->
num
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
...
...
@@ -450,7 +453,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
pDnodeEps
->
num
=
htonl
(
pDnodeEps
->
num
);
for
(
int32_t
i
=
0
;
i
<
pDnodeEps
->
num
;
++
i
)
{
pDnodeEps
->
eps
[
i
].
id
=
htonl
(
pDnodeEps
->
eps
[
i
].
id
);
pDnodeEps
->
eps
[
i
].
port
=
htons
(
pDnodeEps
->
eps
[
i
]
.
port
);
pDnodeEps
->
eps
[
i
].
ep
.
port
=
htons
(
pDnodeEps
->
eps
[
i
].
ep
.
port
);
}
dndUpdateDnodeEps
(
pDnode
,
pDnodeEps
);
...
...
source/dnode/mgmt/impl/test/sut/src/client.cpp
浏览文件 @
859c74a4
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tep.h"
#include "sut.h"
static
void
processClientRsp
(
void
*
parent
,
SRpcMsg
*
pRsp
,
SEpSet
*
pEpSet
)
{
...
...
@@ -61,11 +62,7 @@ void TestClient::Cleanup() {
SRpcMsg
*
TestClient
::
SendReq
(
SRpcMsg
*
pReq
)
{
SEpSet
epSet
=
{
0
};
epSet
.
inUse
=
0
;
epSet
.
numOfEps
=
1
;
epSet
.
port
[
0
]
=
port
;
memcpy
(
epSet
.
fqdn
[
0
],
fqdn
,
TSDB_FQDN_LEN
);
addEpIntoEpSet
(
&
epSet
,
fqdn
,
port
);
rpcSendRequest
(
clientRpc
,
&
epSet
,
pReq
,
NULL
);
tsem_wait
(
&
sem
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
859c74a4
...
...
@@ -840,18 +840,18 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
pInfo
->
vgId
=
htonl
(
pVgroup
->
vgId
);
pInfo
->
hashBegin
=
htonl
(
pVgroup
->
hashBegin
);
pInfo
->
hashEnd
=
htonl
(
pVgroup
->
hashEnd
);
pInfo
->
numOfEps
=
pVgroup
->
replica
;
pInfo
->
epset
.
numOfEps
=
pVgroup
->
replica
;
for
(
int32_t
gid
=
0
;
gid
<
pVgroup
->
replica
;
++
gid
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
gid
];
SEp
Addr
*
pEpArrr
=
&
pInfo
->
epAddr
[
gid
];
SEp
*
pEp
=
&
pInfo
->
epset
.
eps
[
gid
];
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
!=
NULL
)
{
memcpy
(
pEp
Arrr
->
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
pEp
Arrr
->
port
=
htons
(
pDnode
->
port
);
memcpy
(
pEp
->
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
pEp
->
port
=
htons
(
pDnode
->
port
);
}
mndReleaseDnode
(
pMnode
,
pDnode
);
if
(
pVgid
->
role
==
TAOS_SYNC_STATE_LEADER
)
{
pInfo
->
inUse
=
gid
;
pInfo
->
epset
.
inUse
=
gid
;
}
}
vindex
++
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
859c74a4
...
...
@@ -203,8 +203,8 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
}
SEpSet
mndGetDnodeEpset
(
SDnodeObj
*
pDnode
)
{
SEpSet
epSet
=
{
.
inUse
=
0
,
.
numOfEps
=
1
,
.
port
[
0
]
=
pDnode
->
port
};
memcpy
(
epSet
.
fqdn
[
0
],
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
SEpSet
epSet
=
{
0
};
addEpIntoEpSet
(
&
epSet
,
pDnode
->
fqdn
,
pDnode
->
port
);
return
epSet
;
}
...
...
@@ -261,8 +261,8 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) {
SDnodeEp
*
pEp
=
&
pEps
->
eps
[
numOfEps
];
pEp
->
id
=
htonl
(
pDnode
->
id
);
pEp
->
port
=
htons
(
pDnode
->
port
);
memcpy
(
pEp
->
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
pEp
->
ep
.
port
=
htons
(
pDnode
->
port
);
memcpy
(
pEp
->
ep
.
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
pEp
->
isMnode
=
0
;
if
(
mndIsMnode
(
pMnode
,
pDnode
->
id
))
{
pEp
->
isMnode
=
1
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
859c74a4
...
...
@@ -237,8 +237,8 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
if
(
pIter
==
NULL
)
break
;
if
(
pObj
->
pDnode
==
NULL
)
break
;
pEpSet
->
port
[
pEpSet
->
numOfEps
]
=
htons
(
pObj
->
pDnode
->
port
);
memcpy
(
pEpSet
->
fqdn
[
pEpSet
->
numOfEps
]
,
pObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
pEpSet
->
eps
[
pEpSet
->
numOfEps
].
port
=
htons
(
pObj
->
pDnode
->
port
);
memcpy
(
pEpSet
->
eps
[
pEpSet
->
numOfEps
].
fqdn
,
pObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
if
(
pObj
->
role
==
TAOS_SYNC_STATE_LEADER
)
{
pEpSet
->
inUse
=
pEpSet
->
numOfEps
;
}
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
859c74a4
...
...
@@ -151,11 +151,13 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
SArray
*
pArray
;
SArray
*
inner
=
taosArrayGet
(
pDag
->
pSubplans
,
0
);
SSubplan
*
plan
=
taosArrayGetP
(
inner
,
0
);
plan
->
execNode
.
inUse
=
0
;
strcpy
(
plan
->
execNode
.
epAddr
[
0
].
fqdn
,
"localhost"
);
plan
->
execNode
.
epAddr
[
0
].
port
=
6030
;
plan
->
execNode
.
nodeId
=
2
;
plan
->
execNode
.
numOfEps
=
1
;
SEpSet
*
pEpSet
=
&
plan
->
execNode
.
epset
;
pEpSet
->
inUse
=
0
;
pEpSet
->
numOfEps
=
0
;
addEpIntoEpSet
(
pEpSet
,
"localhost"
,
6030
);
if
(
schedulerConvertDagToTaskList
(
pDag
,
&
pArray
)
<
0
)
{
return
-
1
;
...
...
@@ -167,7 +169,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
CEp
.
status
=
0
;
CEp
.
lastConsumerHbTs
=
CEp
.
lastVgHbTs
=
-
1
;
STaskInfo
*
pTaskInfo
=
taosArrayGet
(
pArray
,
i
);
tConvertQueryAddrToEpSet
(
&
CEp
.
epSet
,
&
pTaskInfo
->
addr
)
;
CEp
.
epSet
=
pTaskInfo
->
addr
.
epset
;
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
CEp
.
vgId
=
pTaskInfo
->
addr
.
nodeId
;
CEp
.
qmsgLen
=
pTaskInfo
->
msg
->
contentLen
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
859c74a4
...
...
@@ -424,9 +424,7 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
epset
.
inUse
=
epset
.
numOfEps
;
}
epset
.
port
[
epset
.
numOfEps
]
=
pDnode
->
port
;
memcpy
(
&
epset
.
fqdn
[
epset
.
numOfEps
],
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
epset
.
numOfEps
++
;
addEpIntoEpSet
(
&
epset
,
pDnode
->
fqdn
,
pDnode
->
port
);
mndReleaseDnode
(
pMnode
,
pDnode
);
}
...
...
source/dnode/mnode/impl/test/db/db.cpp
浏览文件 @
859c74a4
...
...
@@ -277,9 +277,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_GT
(
pInfo
->
vgId
,
0
);
EXPECT_EQ
(
pInfo
->
hashBegin
,
0
);
EXPECT_EQ
(
pInfo
->
hashEnd
,
UINT32_MAX
/
2
-
1
);
EXPECT_EQ
(
pInfo
->
inUse
,
0
);
EXPECT_EQ
(
pInfo
->
numOfEps
,
1
);
SEp
Addr
*
pAddr
=
&
pInfo
->
epAddr
[
0
];
EXPECT_EQ
(
pInfo
->
epset
.
inUse
,
0
);
EXPECT_EQ
(
pInfo
->
epset
.
numOfEps
,
1
);
SEp
*
pAddr
=
&
pInfo
->
epset
.
eps
[
0
];
pAddr
->
port
=
htons
(
pAddr
->
port
);
EXPECT_EQ
(
pAddr
->
port
,
9030
);
EXPECT_STREQ
(
pAddr
->
fqdn
,
"localhost"
);
...
...
@@ -293,9 +293,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_GT
(
pInfo
->
vgId
,
0
);
EXPECT_EQ
(
pInfo
->
hashBegin
,
UINT32_MAX
/
2
);
EXPECT_EQ
(
pInfo
->
hashEnd
,
UINT32_MAX
);
EXPECT_EQ
(
pInfo
->
inUse
,
0
);
EXPECT_EQ
(
pInfo
->
numOfEps
,
1
);
SEp
Addr
*
pAddr
=
&
pInfo
->
epAddr
[
0
];
EXPECT_EQ
(
pInfo
->
epset
.
inUse
,
0
);
EXPECT_EQ
(
pInfo
->
epset
.
numOfEps
,
1
);
SEp
*
pAddr
=
&
pInfo
->
epset
.
eps
[
0
];
pAddr
->
port
=
htons
(
pAddr
->
port
);
EXPECT_EQ
(
pAddr
->
port
,
9030
);
EXPECT_STREQ
(
pAddr
->
fqdn
,
"localhost"
);
...
...
source/dnode/mnode/impl/test/profile/profile.cpp
浏览文件 @
859c74a4
...
...
@@ -44,7 +44,7 @@ TEST_F(MndTestProfile, 01_ConnectMsg) {
pRsp
->
acctId
=
htonl
(
pRsp
->
acctId
);
pRsp
->
clusterId
=
htobe64
(
pRsp
->
clusterId
);
pRsp
->
connId
=
htonl
(
pRsp
->
connId
);
pRsp
->
epSet
.
port
[
0
]
=
htons
(
pRsp
->
epSet
.
port
[
0
]
);
pRsp
->
epSet
.
eps
[
0
].
port
=
htons
(
pRsp
->
epSet
.
eps
[
0
].
port
);
EXPECT_EQ
(
pRsp
->
acctId
,
1
);
EXPECT_GT
(
pRsp
->
clusterId
,
0
);
...
...
@@ -53,8 +53,8 @@ TEST_F(MndTestProfile, 01_ConnectMsg) {
EXPECT_EQ
(
pRsp
->
epSet
.
inUse
,
0
);
EXPECT_EQ
(
pRsp
->
epSet
.
numOfEps
,
1
);
EXPECT_EQ
(
pRsp
->
epSet
.
port
[
0
]
,
9031
);
EXPECT_STREQ
(
pRsp
->
epSet
.
fqdn
[
0
]
,
"localhost"
);
EXPECT_EQ
(
pRsp
->
epSet
.
eps
[
0
].
port
,
9031
);
EXPECT_STREQ
(
pRsp
->
epSet
.
eps
[
0
].
fqdn
,
"localhost"
);
connId
=
pRsp
->
connId
;
}
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
859c74a4
...
...
@@ -65,7 +65,6 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
int32_t
ctgGetDBVgroupFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SBuildUseDBInput
*
input
,
SUseDbOutput
*
out
)
{
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
ctgDebug
(
"try to get db vgroup from mnode, db:%s"
,
input
->
db
);
...
...
@@ -216,17 +215,6 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN
return
TSDB_CODE_SUCCESS
;
}
void
ctgGenEpSet
(
SEpSet
*
epSet
,
SVgroupInfo
*
vgroupInfo
)
{
epSet
->
inUse
=
0
;
epSet
->
numOfEps
=
vgroupInfo
->
numOfEps
;
for
(
int32_t
i
=
0
;
i
<
vgroupInfo
->
numOfEps
;
++
i
)
{
memcpy
(
&
epSet
->
port
[
i
],
&
vgroupInfo
->
epAddr
[
i
].
port
,
sizeof
(
epSet
->
port
[
i
]));
memcpy
(
&
epSet
->
fqdn
[
i
],
&
vgroupInfo
->
epAddr
[
i
].
fqdn
,
sizeof
(
epSet
->
fqdn
[
i
]));
}
}
int32_t
ctgGetTableMetaFromMnodeImpl
(
struct
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
char
*
tbFullName
,
STableMetaOutput
*
output
)
{
SBuildTableMetaInput
bInput
=
{.
vgId
=
0
,
.
dbName
=
NULL
,
.
tableFullName
=
tbFullName
};
char
*
msg
=
NULL
;
...
...
@@ -292,7 +280,6 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
SBuildTableMetaInput
bInput
=
{.
vgId
=
vgroupInfo
->
vgId
,
.
dbName
=
dbFullName
,
.
tableFullName
=
(
char
*
)
tNameGetTableName
(
pTableName
)};
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)](
&
bInput
,
&
msg
,
0
,
&
msgLen
);
...
...
@@ -308,10 +295,7 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
};
SRpcMsg
rpcRsp
=
{
0
};
SEpSet
epSet
;
ctgGenEpSet
(
&
epSet
,
vgroupInfo
);
rpcSendRecv
(
pTransporter
,
&
epSet
,
&
rpcMsg
,
&
rpcRsp
);
rpcSendRecv
(
pTransporter
,
&
vgroupInfo
->
epset
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
if
(
CTG_TABLE_NOT_EXIST
(
rpcRsp
.
code
))
{
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
859c74a4
...
...
@@ -195,10 +195,10 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
vgInfo
.
vgId
=
i
+
1
;
vgInfo
.
hashBegin
=
i
*
hashUnit
;
vgInfo
.
hashEnd
=
hashUnit
*
(
i
+
1
)
-
1
;
vgInfo
.
numOfEps
=
i
%
TSDB_MAX_REPLICA
+
1
;
vgInfo
.
inUse
=
i
%
vgInfo
.
numOfEps
;
for
(
int32_t
n
=
0
;
n
<
vgInfo
.
numOfEps
;
++
n
)
{
SEp
Addr
*
addr
=
&
vgInfo
.
epAddr
[
n
];
vgInfo
.
epset
.
numOfEps
=
i
%
TSDB_MAX_REPLICA
+
1
;
vgInfo
.
epset
.
inUse
=
i
%
vgInfo
.
epset
.
numOfEps
;
for
(
int32_t
n
=
0
;
n
<
vgInfo
.
epset
.
numOfEps
;
++
n
)
{
SEp
*
addr
=
&
vgInfo
.
epset
.
eps
[
n
];
strcpy
(
addr
->
fqdn
,
"a0"
);
addr
->
port
=
htons
(
n
+
22
);
}
...
...
@@ -229,10 +229,10 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
vg
->
vgId
=
htonl
(
i
+
1
);
vg
->
hashBegin
=
htonl
(
i
*
hashUnit
);
vg
->
hashEnd
=
htonl
(
hashUnit
*
(
i
+
1
)
-
1
);
vg
->
numOfEps
=
i
%
TSDB_MAX_REPLICA
+
1
;
vg
->
inUse
=
i
%
vg
->
numOfEps
;
for
(
int32_t
n
=
0
;
n
<
vg
->
numOfEps
;
++
n
)
{
SEp
Addr
*
addr
=
&
vg
->
epAddr
[
n
];
vg
->
epset
.
numOfEps
=
i
%
TSDB_MAX_REPLICA
+
1
;
vg
->
epset
.
inUse
=
i
%
vg
->
epset
.
numOfEps
;
for
(
int32_t
n
=
0
;
n
<
vg
->
epset
.
numOfEps
;
++
n
)
{
SEp
*
addr
=
&
vg
->
epset
.
eps
[
n
];
strcpy
(
addr
->
fqdn
,
"a0"
);
addr
->
port
=
htons
(
n
+
22
);
}
...
...
@@ -693,7 +693,7 @@ TEST(tableMeta, normalTable) {
code
=
catalogGetTableHashVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
&
n
,
&
vgInfo
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
vgInfo
.
vgId
,
8
);
ASSERT_EQ
(
vgInfo
.
numOfEps
,
3
);
ASSERT_EQ
(
vgInfo
.
epset
.
numOfEps
,
3
);
ctgTestSetPrepareTableMeta
();
...
...
@@ -983,7 +983,7 @@ TEST(tableDistVgroup, normalTable) {
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
8
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
3
);
ASSERT_EQ
(
vgInfo
->
epset
.
numOfEps
,
3
);
catalogDestroy
();
}
...
...
@@ -1015,7 +1015,7 @@ TEST(tableDistVgroup, childTableCase) {
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
9
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
4
);
ASSERT_EQ
(
vgInfo
->
epset
.
numOfEps
,
4
);
catalogDestroy
();
}
...
...
@@ -1046,13 +1046,13 @@ TEST(tableDistVgroup, superTableCase) {
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
10
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
1
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
1
);
ASSERT_EQ
(
vgInfo
->
epset
.
numOfEps
,
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
1
);
ASSERT_EQ
(
vgInfo
->
vgId
,
2
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
2
);
ASSERT_EQ
(
vgInfo
->
epset
.
numOfEps
,
2
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
2
);
ASSERT_EQ
(
vgInfo
->
vgId
,
3
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
3
);
ASSERT_EQ
(
vgInfo
->
epset
.
numOfEps
,
3
);
catalogDestroy
();
}
...
...
@@ -1088,14 +1088,14 @@ TEST(dbVgroup, getSetDbVgroupCase) {
code
=
catalogGetTableHashVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
&
n
,
&
vgInfo
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
vgInfo
.
vgId
,
8
);
ASSERT_EQ
(
vgInfo
.
numOfEps
,
3
);
ASSERT_EQ
(
vgInfo
.
epset
.
numOfEps
,
3
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
&
n
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
pvgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
pvgInfo
->
vgId
,
8
);
ASSERT_EQ
(
pvgInfo
->
numOfEps
,
3
);
ASSERT_EQ
(
pvgInfo
->
epset
.
numOfEps
,
3
);
taosArrayDestroy
(
vgList
);
ctgTestBuildDBVgroup
(
&
dbVgroup
);
...
...
@@ -1105,14 +1105,14 @@ TEST(dbVgroup, getSetDbVgroupCase) {
code
=
catalogGetTableHashVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
&
n
,
&
vgInfo
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
vgInfo
.
vgId
,
7
);
ASSERT_EQ
(
vgInfo
.
numOfEps
,
2
);
ASSERT_EQ
(
vgInfo
.
epset
.
numOfEps
,
2
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
&
n
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
pvgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
pvgInfo
->
vgId
,
8
);
ASSERT_EQ
(
pvgInfo
->
numOfEps
,
3
);
ASSERT_EQ
(
pvgInfo
->
epset
.
numOfEps
,
3
);
taosArrayDestroy
(
vgList
);
catalogDestroy
();
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
859c74a4
...
...
@@ -5163,14 +5163,9 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SDownstreamSource
*
pSource
=
taosArrayGet
(
pExchangeInfo
->
pSources
,
pExchangeInfo
->
current
);
SEpSet
epSet
=
{
0
};
epSet
.
numOfEps
=
pSource
->
addr
.
numOfEps
;
epSet
.
port
[
0
]
=
pSource
->
addr
.
epAddr
[
0
].
port
;
tstrncpy
(
epSet
.
fqdn
[
0
],
pSource
->
addr
.
epAddr
[
0
].
fqdn
,
tListLen
(
epSet
.
fqdn
[
0
]));
int64_t
startTs
=
taosGetTimestampUs
();
qDebug
(
"%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
", %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
epSet
.
fqdn
[
0
]
,
pSource
->
taskId
,
pExchangeInfo
->
current
,
totalSources
);
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
addr
.
epset
.
eps
[
0
].
fqdn
,
pSource
->
taskId
,
pExchangeInfo
->
current
,
totalSources
);
pMsg
->
header
.
vgId
=
htonl
(
pSource
->
addr
.
nodeId
);
pMsg
->
sId
=
htobe64
(
pSource
->
schedId
);
...
...
@@ -5192,7 +5187,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
pMsgSendInfo
->
fp
=
loadRemoteDataCallback
;
int64_t
transporterId
=
0
;
int32_t
code
=
asyncSendMsgToServer
(
pExchangeInfo
->
pTransporter
,
&
epS
et
,
&
transporterId
,
pMsgSendInfo
);
int32_t
code
=
asyncSendMsgToServer
(
pExchangeInfo
->
pTransporter
,
&
pSource
->
addr
.
eps
et
,
&
transporterId
,
pMsgSendInfo
);
tsem_wait
(
&
pExchangeInfo
->
ready
);
SRetrieveTableRsp
*
pRsp
=
pExchangeInfo
->
pRsp
;
...
...
source/libs/parser/src/astValidate.c
浏览文件 @
859c74a4
...
...
@@ -3644,6 +3644,7 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
return
TSDB_CODE_SUCCESS
;
}
//TODO remove it
int32_t
setTableVgroupList
(
SParseContext
*
pCtx
,
SName
*
name
,
SVgroupsInfo
**
pVgList
)
{
SArray
*
vgroupList
=
NULL
;
int32_t
code
=
catalogGetTableDistVgroup
(
pCtx
->
pCatalog
,
pCtx
->
pTransporter
,
&
pCtx
->
mgmtEpSet
,
name
,
&
vgroupList
);
...
...
@@ -3651,21 +3652,17 @@ int32_t setTableVgroupList(SParseContext *pCtx, SName* name, SVgroupsInfo **pVgL
return
code
;
}
int32
_t
vgroupNum
=
taosArrayGetSize
(
vgroupList
);
size
_t
vgroupNum
=
taosArrayGetSize
(
vgroupList
);
SVgroupsInfo
*
vgList
=
calloc
(
1
,
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SVgroupMsg
)
*
vgroupNum
);
SVgroupsInfo
*
vgList
=
calloc
(
1
,
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SVgroupInfo
)
*
vgroupNum
);
vgList
->
numOfVgroups
=
vgroupNum
;
for
(
int32_t
i
=
0
;
i
<
vgroupNum
;
++
i
)
{
SVgroupInfo
*
vg
=
taosArrayGet
(
vgroupList
,
i
);
vgList
->
vgroups
[
i
].
vgId
=
vg
->
vgId
;
vgList
->
vgroups
[
i
].
numOfEps
=
vg
->
numOfEps
;
memcpy
(
vgList
->
vgroups
[
i
].
epAddr
,
vg
->
epAddr
,
sizeof
(
vgList
->
vgroups
[
i
].
epAddr
));
vgList
->
vgroups
[
i
]
=
*
vg
;
}
*
pVgList
=
vgList
;
taosArrayDestroy
(
vgroupList
);
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/parser/src/dCDAstProcess.c
浏览文件 @
859c74a4
...
...
@@ -58,13 +58,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
SVgroupInfo
*
info
=
taosArrayGet
(
array
,
0
);
pShowReq
->
head
.
vgId
=
htonl
(
info
->
vgId
);
pEpSet
->
numOfEps
=
info
->
numOfEps
;
pEpSet
->
inUse
=
info
->
inUse
;
for
(
int32_t
i
=
0
;
i
<
pEpSet
->
numOfEps
;
++
i
)
{
strncpy
(
pEpSet
->
fqdn
[
i
],
info
->
epAddr
[
i
].
fqdn
,
tListLen
(
pEpSet
->
fqdn
[
i
]));
pEpSet
->
port
[
i
]
=
info
->
epAddr
[
i
].
port
;
}
*
pEpSet
=
info
->
epset
;
*
outputLen
=
sizeof
(
SVShowTablesReq
);
*
output
=
pShowReq
;
...
...
source/libs/parser/src/parserUtil.c
浏览文件 @
859c74a4
...
...
@@ -1426,35 +1426,6 @@ bool isQueryWithLimit(SQueryStmtInfo* pQueryInfo) {
return
false
;
}
SVgroupsInfo
*
vgroupInfoClone
(
SVgroupsInfo
*
vgroupList
)
{
if
(
vgroupList
==
NULL
)
{
return
NULL
;
}
size_t
size
=
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SVgroupMsg
)
*
vgroupList
->
numOfVgroups
;
SVgroupsInfo
*
pNew
=
malloc
(
size
);
if
(
pNew
==
NULL
)
{
return
NULL
;
}
pNew
->
numOfVgroups
=
vgroupList
->
numOfVgroups
;
for
(
int32_t
i
=
0
;
i
<
vgroupList
->
numOfVgroups
;
++
i
)
{
SVgroupMsg
*
pNewVInfo
=
&
pNew
->
vgroups
[
i
];
SVgroupMsg
*
pvInfo
=
&
vgroupList
->
vgroups
[
i
];
pNewVInfo
->
vgId
=
pvInfo
->
vgId
;
pNewVInfo
->
numOfEps
=
pvInfo
->
numOfEps
;
for
(
int32_t
j
=
0
;
j
<
pvInfo
->
numOfEps
;
++
j
)
{
pNewVInfo
->
epAddr
[
j
].
port
=
pvInfo
->
epAddr
[
j
].
port
;
tstrncpy
(
pNewVInfo
->
epAddr
[
j
].
fqdn
,
pvInfo
->
epAddr
[
j
].
fqdn
,
TSDB_FQDN_LEN
);
}
}
return
pNew
;
}
void
*
vgroupInfoClear
(
SVgroupsInfo
*
vgroupList
)
{
if
(
vgroupList
==
NULL
)
{
return
NULL
;
...
...
@@ -1505,19 +1476,6 @@ STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
return
p
;
}
SVgroupsInfo
*
vgroupsInfoDup
(
SVgroupsInfo
*
pVgroupsInfo
)
{
assert
(
pVgroupsInfo
!=
NULL
);
size_t
size
=
sizeof
(
SVgroupMsg
)
*
pVgroupsInfo
->
numOfVgroups
+
sizeof
(
SVgroupsInfo
);
SVgroupsInfo
*
pInfo
=
calloc
(
1
,
size
);
pInfo
->
numOfVgroups
=
pVgroupsInfo
->
numOfVgroups
;
for
(
int32_t
m
=
0
;
m
<
pVgroupsInfo
->
numOfVgroups
;
++
m
)
{
memcpy
(
&
pInfo
->
vgroups
[
m
],
&
pVgroupsInfo
->
vgroups
[
m
],
sizeof
(
SVgroupMsg
));
}
return
pInfo
;
}
int32_t
getNumOfOutput
(
SFieldInfo
*
pFieldInfo
)
{
return
pFieldInfo
->
numOfOutput
;
}
...
...
source/libs/parser/test/mockCatalogService.cpp
浏览文件 @
859c74a4
...
...
@@ -15,6 +15,7 @@
#include "mockCatalogService.h"
#include "tep.h"
#include <iomanip>
#include <iostream>
#include <map>
...
...
@@ -39,7 +40,16 @@ public:
virtual
TableBuilder
&
setVgid
(
int16_t
vgid
)
{
schema
()
->
vgId
=
vgid
;
meta_
->
vgs
.
emplace_back
(
SVgroupInfo
{.
vgId
=
vgid
,
.
hashBegin
=
0
,
.
hashEnd
=
0
,
.
inUse
=
0
,
.
numOfEps
=
3
,
.
epAddr
=
{{
"dnode_1"
,
6030
},
{
"dnode_2"
,
6030
},
{
"dnode_3"
,
6030
}}});
SVgroupInfo
vgroup
=
{.
vgId
=
vgid
,
.
hashBegin
=
0
,
.
hashEnd
=
0
,
};
vgroup
.
epset
.
eps
[
0
]
=
(
SEp
){
"dnode_1"
,
6030
};
vgroup
.
epset
.
eps
[
1
]
=
(
SEp
){
"dnode_2"
,
6030
};
vgroup
.
epset
.
eps
[
2
]
=
(
SEp
){
"dnode_3"
,
6030
};
vgroup
.
epset
.
inUse
=
0
;
vgroup
.
epset
.
numOfEps
=
3
;
meta_
->
vgs
.
emplace_back
(
vgroup
);
return
*
this
;
}
...
...
@@ -112,9 +122,7 @@ public:
int32_t
catalogGetTableHashVgroup
(
const
SName
*
pTableName
,
SVgroupInfo
*
vgInfo
)
const
{
// todo
vgInfo
->
vgId
=
1
;
vgInfo
->
numOfEps
=
1
;
vgInfo
->
epAddr
[
0
].
port
=
6030
;
strcpy
(
vgInfo
->
epAddr
[
0
].
fqdn
,
"node1"
);
addEpIntoEpSet
(
&
vgInfo
->
epset
,
"node1"
,
6030
);
return
0
;
}
...
...
@@ -133,9 +141,16 @@ public:
meta_
[
db
][
tbname
].
reset
(
new
MockTableMeta
());
meta_
[
db
][
tbname
]
->
schema
.
reset
(
table
.
release
());
meta_
[
db
][
tbname
]
->
schema
->
uid
=
id_
++
;
meta_
[
db
][
tbname
]
->
vgs
.
emplace_back
((
SVgroupInfo
){.
vgId
=
vgid
,
.
hashBegin
=
0
,
.
hashEnd
=
0
,
.
inUse
=
0
,
.
numOfEps
=
3
,
.
epAddr
=
{{
"dnode_1"
,
6030
},
{
"dnode_2"
,
6030
},
{
"dnode_3"
,
6030
}}});
SVgroupInfo
vgroup
=
{.
vgId
=
vgid
,
.
hashBegin
=
0
,
.
hashEnd
=
0
,};
addEpIntoEpSet
(
&
vgroup
.
epset
,
"dnode_1"
,
6030
);
addEpIntoEpSet
(
&
vgroup
.
epset
,
"dnode_2"
,
6030
);
addEpIntoEpSet
(
&
vgroup
.
epset
,
"dnode_3"
,
6030
);
vgroup
.
epset
.
inUse
=
0
;
meta_
[
db
][
tbname
]
->
vgs
.
emplace_back
(
vgroup
);
// super table
meta_
[
db
][
stbname
]
->
vgs
.
emplace_back
(
(
SVgroupInfo
){.
vgId
=
vgid
,
.
hashBegin
=
0
,
.
hashEnd
=
0
,
.
inUse
=
0
,
.
numOfEps
=
3
,
.
epAddr
=
{{
"dnode_1"
,
6030
},
{
"dnode_2"
,
6030
},
{
"dnode_3"
,
6030
}}}
);
meta_
[
db
][
stbname
]
->
vgs
.
emplace_back
(
vgroup
);
}
void
showTables
()
const
{
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
859c74a4
...
...
@@ -251,24 +251,9 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
return
subplan
;
}
static
void
vgroupInfoToEpSet
(
const
SVgroupInfo
*
vg
,
SQueryNodeAddr
*
execNode
)
{
execNode
->
nodeId
=
vg
->
vgId
;
execNode
->
inUse
=
vg
->
inUse
;
execNode
->
numOfEps
=
vg
->
numOfEps
;
for
(
int8_t
i
=
0
;
i
<
vg
->
numOfEps
;
++
i
)
{
execNode
->
epAddr
[
i
]
=
vg
->
epAddr
[
i
];
}
return
;
}
static
void
vgroupMsgToEpSet
(
const
SVgroupMsg
*
vg
,
SQueryNodeAddr
*
execNode
)
{
execNode
->
nodeId
=
vg
->
vgId
;
execNode
->
inUse
=
0
;
// todo
execNode
->
numOfEps
=
vg
->
numOfEps
;
for
(
int8_t
i
=
0
;
i
<
vg
->
numOfEps
;
++
i
)
{
execNode
->
epAddr
[
i
]
=
vg
->
epAddr
[
i
];
}
return
;
static
void
vgroupInfoToNodeAddr
(
const
SVgroupInfo
*
vg
,
SQueryNodeAddr
*
pNodeAddr
)
{
pNodeAddr
->
nodeId
=
vg
->
vgId
;
pNodeAddr
->
epset
=
vg
->
epset
;
}
static
uint64_t
splitSubplanByTable
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTableInfo
)
{
...
...
@@ -277,7 +262,8 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
STORE_CURRENT_SUBPLAN
(
pCxt
);
SSubplan
*
subplan
=
initSubplan
(
pCxt
,
QUERY_TYPE_SCAN
);
subplan
->
msgType
=
TDMT_VND_QUERY
;
vgroupMsgToEpSet
(
&
(
pTableInfo
->
pMeta
->
vgroupList
->
vgroups
[
i
]),
&
subplan
->
execNode
);
vgroupInfoToNodeAddr
(
&
(
pTableInfo
->
pMeta
->
vgroupList
->
vgroups
[
i
]),
&
subplan
->
execNode
);
subplan
->
pNode
=
createMultiTableScanNode
(
pPlanNode
,
pTableInfo
);
subplan
->
pDataSink
=
createDataDispatcher
(
pCxt
,
pPlanNode
,
subplan
->
pNode
);
RECOVERY_CURRENT_SUBPLAN
(
pCxt
);
...
...
@@ -297,11 +283,12 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
return
(
TSDB_SUPER_TABLE
==
pTable
->
pMeta
->
pTableMeta
->
tableType
);
}
static
SPhyNode
*
createSingleTableScanNode
(
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
,
SSubplan
*
subplan
)
{
vgroupMsgToEpSet
(
&
(
pTable
->
pMeta
->
vgroupList
->
vgroups
[
0
]),
&
subplan
->
execNode
);
// TODO: the SVgroupInfo index
static
SPhyNode
*
createSingleTableScanNode
(
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTableInfo
,
SSubplan
*
subplan
)
{
SVgroupsInfo
*
pVgroupsInfo
=
pTableInfo
->
pMeta
->
vgroupList
;
vgroupInfoToNodeAddr
(
&
(
pVgroupsInfo
->
vgroups
[
0
]),
&
subplan
->
execNode
);
int32_t
type
=
(
pPlanNode
->
info
.
type
==
QNODE_TABLESCAN
)
?
OP_TableScan
:
OP_StreamScan
;
return
createUserTableScanNode
(
pPlanNode
,
pTable
,
type
);
return
createUserTableScanNode
(
pPlanNode
,
pTable
Info
,
type
);
}
static
SPhyNode
*
createTableScanNode
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
)
{
...
...
@@ -374,7 +361,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
SSubplan
*
subplan
=
initSubplan
(
pCxt
,
QUERY_TYPE_MODIFY
);
SVgDataBlocks
*
blocks
=
(
SVgDataBlocks
*
)
taosArrayGetP
(
pPayload
->
payload
,
i
);
vgroupInfoToEpSet
(
&
blocks
->
vg
,
&
subplan
->
execNode
)
;
subplan
->
execNode
.
epset
=
blocks
->
vg
.
epset
;
subplan
->
pDataSink
=
createDataInserter
(
pCxt
,
blocks
,
NULL
);
subplan
->
pNode
=
NULL
;
subplan
->
type
=
QUERY_TYPE_MODIFY
;
...
...
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
859c74a4
...
...
@@ -736,7 +736,7 @@ static const char* jkEpAddrFqdn = "Fqdn";
static
const
char
*
jkEpAddrPort
=
"Port"
;
static
bool
epAddrToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SEp
Addr
*
ep
=
(
const
SEpAddr
*
)
obj
;
const
SEp
*
ep
=
(
const
SEp
*
)
obj
;
bool
res
=
cJSON_AddStringToObject
(
json
,
jkEpAddrFqdn
,
ep
->
fqdn
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkEpAddrPort
,
ep
->
port
);
...
...
@@ -745,7 +745,7 @@ static bool epAddrToJson(const void* obj, cJSON* json) {
}
static
bool
epAddrFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SEp
Addr
*
ep
=
(
SEpAddr
*
)
obj
;
SEp
*
ep
=
(
SEp
*
)
obj
;
copyString
(
json
,
jkEpAddrFqdn
,
ep
->
fqdn
);
ep
->
port
=
getNumber
(
json
,
jkEpAddrPort
);
return
true
;
...
...
@@ -763,11 +763,11 @@ static bool queryNodeAddrToJson(const void* obj, cJSON* json) {
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeAddrId
,
pAddr
->
nodeId
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeAddrInUse
,
pAddr
->
inUse
);
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeAddrInUse
,
pAddr
->
epset
.
inUse
);
}
if
(
res
)
{
res
=
addRawArray
(
json
,
jkNodeAddrEpAddrs
,
epAddrToJson
,
pAddr
->
ep
Addr
,
sizeof
(
SEpAddr
),
pAddr
->
numOfEps
);
res
=
addRawArray
(
json
,
jkNodeAddrEpAddrs
,
epAddrToJson
,
pAddr
->
ep
set
.
eps
,
sizeof
(
SEp
),
pAddr
->
epset
.
numOfEps
);
}
return
res
;
}
...
...
@@ -776,11 +776,11 @@ static bool queryNodeAddrFromJson(const cJSON* json, void* obj) {
SQueryNodeAddr
*
pAddr
=
(
SQueryNodeAddr
*
)
obj
;
pAddr
->
nodeId
=
getNumber
(
json
,
jkNodeAddrId
);
pAddr
->
inUse
=
getNumber
(
json
,
jkNodeAddrInUse
);
pAddr
->
epset
.
inUse
=
getNumber
(
json
,
jkNodeAddrInUse
);
int32_t
numOfEps
=
0
;
bool
res
=
fromRawArray
(
json
,
jkNodeAddrEpAddrs
,
epAddrFromJson
,
pAddr
->
ep
Addr
,
sizeof
(
SEpAddr
),
&
numOfEps
);
pAddr
->
numOfEps
=
numOfEps
;
bool
res
=
fromRawArray
(
json
,
jkNodeAddrEpAddrs
,
epAddrFromJson
,
pAddr
->
ep
set
.
eps
,
sizeof
(
SEp
),
&
numOfEps
);
pAddr
->
epset
.
numOfEps
=
numOfEps
;
return
res
;
}
...
...
source/libs/planner/test/phyPlanTests.cpp
浏览文件 @
859c74a4
...
...
@@ -124,12 +124,10 @@ private:
}
void
copyStorageMeta
(
SVgroupsInfo
**
dst
,
const
std
::
vector
<
SVgroupInfo
>&
src
)
{
*
dst
=
(
SVgroupsInfo
*
)
myCalloc
(
1
,
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SVgroup
Msg
)
*
src
.
size
());
*
dst
=
(
SVgroupsInfo
*
)
myCalloc
(
1
,
sizeof
(
SVgroupsInfo
)
+
sizeof
(
SVgroup
Info
)
*
src
.
size
());
(
*
dst
)
->
numOfVgroups
=
src
.
size
();
for
(
int32_t
i
=
0
;
i
<
src
.
size
();
++
i
)
{
(
*
dst
)
->
vgroups
[
i
].
vgId
=
src
[
i
].
vgId
;
(
*
dst
)
->
vgroups
[
i
].
numOfEps
=
src
[
i
].
numOfEps
;
memcpy
((
*
dst
)
->
vgroups
[
i
].
epAddr
,
src
[
i
].
epAddr
,
src
[
i
].
numOfEps
);
(
*
dst
)
->
vgroups
[
i
]
=
src
[
i
];
}
}
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
859c74a4
...
...
@@ -127,8 +127,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pRsp
->
vgroupInfo
[
i
].
hashBegin
=
ntohl
(
pRsp
->
vgroupInfo
[
i
].
hashBegin
);
pRsp
->
vgroupInfo
[
i
].
hashEnd
=
ntohl
(
pRsp
->
vgroupInfo
[
i
].
hashEnd
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
ep
Addr
[
n
].
port
=
ntohs
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
epset
.
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
ep
set
.
eps
[
n
].
port
=
ntohs
(
pRsp
->
vgroupInfo
[
i
].
epset
.
eps
[
n
].
port
);
}
if
(
0
!=
taosHashPut
(
pOut
->
dbVgroup
.
vgInfo
,
&
pRsp
->
vgroupInfo
[
i
].
vgId
,
sizeof
(
pRsp
->
vgroupInfo
[
i
].
vgId
),
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
])))
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
859c74a4
...
...
@@ -417,13 +417,13 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
pTask
->
plan
->
execNode
.
numOfEps
>
0
)
{
if
(
pTask
->
plan
->
execNode
.
epset
.
numOfEps
>
0
)
{
if
(
NULL
==
taosArrayPush
(
pTask
->
candidateAddrs
,
&
pTask
->
plan
->
execNode
))
{
SCH_TASK_ELOG
(
"taosArrayPush execNode to candidate addrs failed, errno:%d"
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_DLOG
(
"use execNode from plan as candidate addr, numOfEps:%d"
,
pTask
->
plan
->
execNode
.
numOfEps
);
SCH_TASK_DLOG
(
"use execNode from plan as candidate addr, numOfEps:%d"
,
pTask
->
plan
->
execNode
.
epset
.
numOfEps
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -446,7 +446,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
if
(
addNum
<=
0
)
{
SCH_TASK_ELOG
(
"no available execNode as candidate
addr
, nodeNum:%d"
,
nodeNum
);
SCH_TASK_ELOG
(
"no available execNode as candidate
s
, nodeNum:%d"
,
nodeNum
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
...
...
@@ -1050,31 +1050,19 @@ _return:
SCH_RET
(
code
);
}
void
schConvertAddrToEpSet
(
SQueryNodeAddr
*
addr
,
SEpSet
*
epSet
)
{
epSet
->
inUse
=
addr
->
inUse
;
epSet
->
numOfEps
=
addr
->
numOfEps
;
for
(
int8_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
strncpy
(
epSet
->
fqdn
[
i
],
addr
->
epAddr
[
i
].
fqdn
,
sizeof
(
addr
->
epAddr
[
i
].
fqdn
));
epSet
->
port
[
i
]
=
addr
->
epAddr
[
i
].
port
;
}
}
int32_t
schBuildAndSendMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
msgType
)
{
uint32_t
msgSize
=
0
;
void
*
msg
=
NULL
;
int32_t
code
=
0
;
bool
isCandidateAddr
=
false
;
SEpSet
epSet
;
if
(
NULL
==
addr
)
{
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
atomic_load_8
(
&
pTask
->
candidateIdx
));
isCandidateAddr
=
true
;
}
schConvertAddrToEpSet
(
addr
,
&
epSet
)
;
SEpSet
epSet
=
addr
->
epset
;
switch
(
msgType
)
{
case
TDMT_VND_CREATE_TABLE
:
case
TDMT_VND_SUBMIT
:
{
...
...
@@ -1218,8 +1206,6 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG
(
"subplanToString error, code:%x, msg:%p, len:%d"
,
code
,
pTask
->
msg
,
pTask
->
msgLen
);
SCH_ERR_JRET
(
code
);
}
// printf("physical plan:%s\n", pTask->msg);
}
SCH_ERR_JRET
(
schSetTaskCandidateAddrs
(
pJob
,
pTask
));
...
...
@@ -1300,7 +1286,7 @@ void schDropJobAllTasks(SSchJob *pJob) {
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryDag
*
pDag
,
struct
SSchJob
**
job
,
bool
syncSchedule
)
{
qDebug
(
"QID:0x%"
PRIx64
" job started"
,
pDag
->
queryId
);
if
(
pNodeList
&&
taosArrayGetSize
(
pNodeList
)
<=
0
)
{
if
(
pNodeList
==
NULL
||
(
pNodeList
&&
taosArrayGetSize
(
pNodeList
)
<=
0
)
)
{
qDebug
(
"QID:0x%"
PRIx64
" input exec nodeList is empty"
,
pDag
->
queryId
);
}
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
859c74a4
...
...
@@ -29,7 +29,6 @@
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wliteral-suffix"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
...
...
@@ -92,11 +91,11 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan
->
id
.
templateId
=
0x0000000000000002
;
scanPlan
->
id
.
subplanId
=
0x0000000000000003
;
scanPlan
->
type
=
QUERY_TYPE_SCAN
;
scanPlan
->
execNode
.
numOfEps
=
1
;
scanPlan
->
execNode
.
nodeId
=
1
;
scanPlan
->
execNode
.
inUse
=
0
;
scanPlan
->
execNode
.
epAddr
[
0
].
port
=
6030
;
strcpy
(
scanPlan
->
execNode
.
epAddr
[
0
].
fqdn
,
"ep0"
);
scanPlan
->
execNode
.
epset
.
inUse
=
0
;
addEpIntoEpSet
(
&
scanPlan
->
execNode
.
epset
,
"ep0"
,
6030
)
;
scanPlan
->
pChildren
=
NULL
;
scanPlan
->
level
=
1
;
scanPlan
->
pParents
=
taosArrayInit
(
1
,
POINTER_BYTES
);
...
...
@@ -108,7 +107,8 @@ void schtBuildQueryDag(SQueryDag *dag) {
mergePlan
->
id
.
subplanId
=
0x5555555555
;
mergePlan
->
type
=
QUERY_TYPE_MERGE
;
mergePlan
->
level
=
0
;
mergePlan
->
execNode
.
numOfEps
=
0
;
mergePlan
->
execNode
.
epset
.
numOfEps
=
0
;
mergePlan
->
pChildren
=
taosArrayInit
(
1
,
POINTER_BYTES
);
mergePlan
->
pParents
=
NULL
;
mergePlan
->
pNode
=
(
SPhyNode
*
)
calloc
(
1
,
sizeof
(
SPhyNode
));
...
...
@@ -144,11 +144,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan
[
0
].
id
.
subplanId
=
0x0000000000000004
;
insertPlan
[
0
].
type
=
QUERY_TYPE_MODIFY
;
insertPlan
[
0
].
level
=
0
;
insertPlan
[
0
].
execNode
.
numOfEps
=
1
;
insertPlan
[
0
].
execNode
.
nodeId
=
1
;
insertPlan
[
0
].
execNode
.
inUse
=
0
;
insertPlan
[
0
].
execNode
.
epAddr
[
0
].
port
=
6030
;
strcpy
(
insertPlan
[
0
].
execNode
.
epAddr
[
0
].
fqdn
,
"ep0"
);
insertPlan
[
0
].
execNode
.
epset
.
inUse
=
0
;
addEpIntoEpSet
(
&
insertPlan
[
0
].
execNode
.
epset
,
"ep0"
,
6030
)
;
insertPlan
[
0
].
pChildren
=
NULL
;
insertPlan
[
0
].
pParents
=
NULL
;
insertPlan
[
0
].
pNode
=
NULL
;
...
...
@@ -160,11 +160,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan
[
1
].
id
.
subplanId
=
0x0000000000000005
;
insertPlan
[
1
].
type
=
QUERY_TYPE_MODIFY
;
insertPlan
[
1
].
level
=
0
;
insertPlan
[
1
].
execNode
.
numOfEps
=
1
;
insertPlan
[
1
].
execNode
.
nodeId
=
1
;
insertPlan
[
1
].
execNode
.
inUse
=
1
;
insertPlan
[
1
].
execNode
.
epAddr
[
0
].
port
=
6030
;
strcpy
(
insertPlan
[
1
].
execNode
.
epAddr
[
0
].
fqdn
,
"ep1"
);
insertPlan
[
1
].
execNode
.
epset
.
inUse
=
0
;
addEpIntoEpSet
(
&
insertPlan
[
1
].
execNode
.
epset
,
"ep0"
,
6030
)
;
insertPlan
[
1
].
pChildren
=
NULL
;
insertPlan
[
1
].
pParents
=
NULL
;
insertPlan
[
1
].
pNode
=
NULL
;
...
...
@@ -371,9 +371,9 @@ void* schtRunJobThread(void *aa) {
while
(
!
schtTestStop
)
{
schtBuildQueryDag
(
&
dag
);
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEp
Addr
));
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEp
));
SEp
Addr
qnodeAddr
=
{
0
};
SEp
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
...
...
@@ -523,9 +523,9 @@ TEST(queryTest, normalCase) {
schtInitLogFile
();
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEp
Addr
));
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEp
));
SEp
Addr
qnodeAddr
=
{
0
};
SEp
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
...
...
@@ -627,9 +627,9 @@ TEST(insertTest, normalCase) {
schtInitLogFile
();
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEp
Addr
));
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEp
));
SEp
Addr
qnodeAddr
=
{
0
};
SEp
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
...
...
source/libs/transport/src/rpcMain.c
浏览文件 @
859c74a4
...
...
@@ -814,9 +814,9 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
SEpSet
*
pEpSet
=
&
pContext
->
epSet
;
pConn
=
rpcGetConnFromCache
(
pRpc
->
pCache
,
pEpSet
->
fqdn
[
pEpSet
->
inUse
],
pEpSet
->
port
[
pEpSet
->
inUse
]
,
pContext
->
connType
);
rpcGetConnFromCache
(
pRpc
->
pCache
,
pEpSet
->
eps
[
pEpSet
->
inUse
].
fqdn
,
pEpSet
->
eps
[
pEpSet
->
inUse
].
port
,
pContext
->
connType
);
if
(
pConn
==
NULL
||
pConn
->
user
[
0
]
==
0
)
{
pConn
=
rpcOpenConn
(
pRpc
,
pEpSet
->
fqdn
[
pEpSet
->
inUse
],
pEpSet
->
port
[
pEpSet
->
inUse
]
,
pContext
->
connType
);
pConn
=
rpcOpenConn
(
pRpc
,
pEpSet
->
eps
[
pEpSet
->
inUse
].
fqdn
,
pEpSet
->
eps
[
pEpSet
->
inUse
].
port
,
pContext
->
connType
);
}
if
(
pConn
)
{
...
...
@@ -1188,7 +1188,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
// for UDP, port may be changed by server, the port in epSet shall be used for cache
if
(
pHead
->
code
!=
TSDB_CODE_RPC_TOO_SLOW
)
{
rpcAddConnIntoCache
(
pRpc
->
pCache
,
pConn
,
pConn
->
peerFqdn
,
pContext
->
epSet
.
port
[
pContext
->
epSet
.
inUse
]
,
rpcAddConnIntoCache
(
pRpc
->
pCache
,
pConn
,
pConn
->
peerFqdn
,
pContext
->
epSet
.
eps
[
pContext
->
epSet
.
inUse
].
port
,
pConn
->
connType
);
}
else
{
rpcCloseConn
(
pConn
);
...
...
@@ -1202,9 +1202,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
tDebug
(
"%s, redirect is received, numOfEps:%d inUse:%d"
,
pConn
->
info
,
pContext
->
epSet
.
numOfEps
,
pContext
->
epSet
.
inUse
);
for
(
int
i
=
0
;
i
<
pContext
->
epSet
.
numOfEps
;
++
i
)
{
pContext
->
epSet
.
port
[
i
]
=
htons
(
pContext
->
epSet
.
port
[
i
]
);
tDebug
(
"%s, redirect is received, index:%d ep:%s:%u"
,
pConn
->
info
,
i
,
pContext
->
epSet
.
fqdn
[
i
]
,
pContext
->
epSet
.
port
[
i
]
);
pContext
->
epSet
.
eps
[
i
].
port
=
htons
(
pContext
->
epSet
.
eps
[
i
].
port
);
tDebug
(
"%s, redirect is received, index:%d ep:%s:%u"
,
pConn
->
info
,
i
,
pContext
->
epSet
.
eps
[
i
].
fqdn
,
pContext
->
epSet
.
eps
[
i
].
port
);
}
}
rpcSendReqToServer
(
pRpc
,
pContext
);
...
...
source/libs/transport/test/rclient.c
浏览文件 @
859c74a4
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tep.h>
#include "os.h"
#include "rpcLog.h"
#include "taoserror.h"
...
...
@@ -86,12 +87,9 @@ int main(int argc, char *argv[]) {
pthread_attr_t
thattr
;
// server info
epSet
.
numOfEps
=
1
;
epSet
.
inUse
=
0
;
epSet
.
port
[
0
]
=
7000
;
epSet
.
port
[
1
]
=
7000
;
strcpy
(
epSet
.
fqdn
[
0
],
serverIp
);
strcpy
(
epSet
.
fqdn
[
1
],
"192.168.0.1"
);
addEpIntoEpSet
(
&
epSet
,
serverIp
,
7000
);
addEpIntoEpSet
(
&
epSet
,
"192.168.0.1"
,
7000
);
// client info
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
...
...
@@ -109,9 +107,9 @@ int main(int argc, char *argv[]) {
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
epSet
.
port
[
0
]
=
atoi
(
argv
[
++
i
]);
epSet
.
eps
[
0
].
port
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-i"
)
==
0
&&
i
<
argc
-
1
)
{
tstrncpy
(
epSet
.
fqdn
[
0
],
argv
[
++
i
],
sizeof
(
epSet
.
fqdn
[
0
]
));
tstrncpy
(
epSet
.
eps
[
0
].
fqdn
,
argv
[
++
i
],
sizeof
(
epSet
.
eps
[
0
].
fqdn
));
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
numOfThreads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-m"
)
==
0
&&
i
<
argc
-
1
)
{
...
...
@@ -135,7 +133,7 @@ int main(int argc, char *argv[]) {
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-i ip]: first server IP address, default is:%s
\n
"
,
serverIp
);
printf
(
" [-p port]: server port number, default is:%d
\n
"
,
epSet
.
port
[
0
]
);
printf
(
" [-p port]: server port number, default is:%d
\n
"
,
epSet
.
eps
[
0
].
port
);
printf
(
" [-t threads]: number of rpc threads, default is:%d
\n
"
,
rpcInit
.
numOfThreads
);
printf
(
" [-s sessions]: number of rpc sessions, default is:%d
\n
"
,
rpcInit
.
sessions
);
printf
(
" [-m msgSize]: message body size, default is:%d
\n
"
,
msgSize
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录