Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ad2d2c7c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ad2d2c7c
编写于
4月 21, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-14481-3.0
上级
e3d32ca9
ec8f6a5c
变更
12
显示空白变更内容
内联
并排
Showing
12 changed file
with
161 addition
and
125 deletion
+161
-125
include/common/tmsg.h
include/common/tmsg.h
+3
-1
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+0
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+31
-24
source/common/src/tmsg.c
source/common/src/tmsg.c
+5
-3
source/dnode/mgmt/implement/src/dmTransport.c
source/dnode/mgmt/implement/src/dmTransport.c
+4
-4
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+49
-42
source/dnode/mnode/impl/test/profile/profile.cpp
source/dnode/mnode/impl/test/profile/profile.cpp
+13
-0
source/dnode/mnode/impl/test/show/show.cpp
source/dnode/mnode/impl/test/show/show.cpp
+7
-1
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+12
-11
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+3
-22
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+33
-15
tests/script/runAllSimCases.sh
tests/script/runAllSimCases.sh
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
ad2d2c7c
...
...
@@ -331,6 +331,8 @@ typedef struct {
int32_t
pid
;
char
app
[
TSDB_APP_NAME_LEN
];
char
db
[
TSDB_DB_NAME_LEN
];
char
user
[
TSDB_USER_LEN
];
char
passwd
[
TSDB_PASSWORD_LEN
];
int64_t
startTime
;
}
SConnectReq
;
...
...
source/client/inc/clientInt.h
浏览文件 @
ad2d2c7c
...
...
@@ -306,7 +306,6 @@ void hbMgrInitMqHbRspHandle();
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
int32_t
code
,
bool
keepQuery
);
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientImpl.c
浏览文件 @
ad2d2c7c
...
...
@@ -146,7 +146,8 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
(
*
pRequest
)
->
sqlstr
[
sqlLen
]
=
0
;
(
*
pRequest
)
->
sqlLen
=
sqlLen
;
if
(
taosHashPut
(
pTscObj
->
pRequests
,
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
),
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
)))
{
if
(
taosHashPut
(
pTscObj
->
pRequests
,
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
),
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
)))
{
destroyRequest
(
*
pRequest
);
*
pRequest
=
NULL
;
tscError
(
"put request to request hash failed"
);
...
...
@@ -263,7 +264,8 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
}
void
setResPrecision
(
SReqResultInfo
*
pResInfo
,
int32_t
precision
)
{
if
(
precision
!=
TSDB_TIME_PRECISION_MILLI
&&
precision
!=
TSDB_TIME_PRECISION_MICRO
&&
precision
!=
TSDB_TIME_PRECISION_NANO
)
{
if
(
precision
!=
TSDB_TIME_PRECISION_MILLI
&&
precision
!=
TSDB_TIME_PRECISION_MICRO
&&
precision
!=
TSDB_TIME_PRECISION_NANO
)
{
return
;
}
...
...
@@ -336,7 +338,6 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
return
pRequest
;
}
SRequestObj
*
launchQuery
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
)
{
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQuery
=
NULL
;
...
...
@@ -522,6 +523,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
connectReq
.
pid
=
htonl
(
appInfo
.
pid
);
connectReq
.
startTime
=
htobe64
(
appInfo
.
startTime
);
tstrncpy
(
connectReq
.
app
,
appInfo
.
appName
,
sizeof
(
connectReq
.
app
));
tstrncpy
(
connectReq
.
user
,
pObj
->
user
,
sizeof
(
connectReq
.
user
));
tstrncpy
(
connectReq
.
passwd
,
pObj
->
pass
,
sizeof
(
connectReq
.
passwd
));
int32_t
contLen
=
tSerializeSConnectReq
(
NULL
,
0
,
&
connectReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
...
...
@@ -751,44 +754,48 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
char
*
pStart
=
pCol
->
offset
[
j
]
+
pCol
->
pData
;
int32_t
jsonInnerType
=
*
pStart
;
char
*
jsonInnerData
=
pStart
+
CHAR_BYTES
;
char
*
jsonInnerData
=
pStart
+
CHAR_BYTES
;
char
dst
[
TSDB_MAX_JSON_TAG_LEN
]
=
{
0
};
if
(
jsonInnerType
==
TSDB_DATA_TYPE_NULL
)
{
if
(
jsonInnerType
==
TSDB_DATA_TYPE_NULL
)
{
sprintf
(
varDataVal
(
dst
),
"%s"
,
TSDB_DATA_NULL_STR_L
);
varDataSetLen
(
dst
,
strlen
(
varDataVal
(
dst
)));
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_JSON
){
int32_t
length
=
taosUcs4ToMbs
((
TdUcs4
*
)
varDataVal
(
jsonInnerData
),
varDataLen
(
jsonInnerData
),
varDataVal
(
dst
));
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_JSON
)
{
int32_t
length
=
taosUcs4ToMbs
((
TdUcs4
*
)
varDataVal
(
jsonInnerData
),
varDataLen
(
jsonInnerData
),
varDataVal
(
dst
));
if
(
length
<=
0
)
{
tscError
(
"charset:%s to %s. val:%s convert failed."
,
DEFAULT_UNICODE_ENCODEC
,
tsCharset
,
varDataVal
(
jsonInnerData
));
tscError
(
"charset:%s to %s. val:%s convert failed."
,
DEFAULT_UNICODE_ENCODEC
,
tsCharset
,
varDataVal
(
jsonInnerData
));
length
=
0
;
}
varDataSetLen
(
dst
,
length
);
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_NCHAR
)
{
// value -> "value"
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_NCHAR
)
{
// value -> "value"
*
(
char
*
)
varDataVal
(
dst
)
=
'\"'
;
int32_t
length
=
taosUcs4ToMbs
((
TdUcs4
*
)
varDataVal
(
jsonInnerData
),
varDataLen
(
jsonInnerData
),
varDataVal
(
dst
)
+
CHAR_BYTES
);
int32_t
length
=
taosUcs4ToMbs
((
TdUcs4
*
)
varDataVal
(
jsonInnerData
),
varDataLen
(
jsonInnerData
),
varDataVal
(
dst
)
+
CHAR_BYTES
);
if
(
length
<=
0
)
{
tscError
(
"charset:%s to %s. val:%s convert failed."
,
DEFAULT_UNICODE_ENCODEC
,
tsCharset
,
varDataVal
(
jsonInnerData
));
tscError
(
"charset:%s to %s. val:%s convert failed."
,
DEFAULT_UNICODE_ENCODEC
,
tsCharset
,
varDataVal
(
jsonInnerData
));
length
=
0
;
}
varDataSetLen
(
dst
,
length
+
CHAR_BYTES
*
2
);
varDataSetLen
(
dst
,
length
+
CHAR_BYTES
*
2
);
*
(
char
*
)(
varDataVal
(
dst
),
length
+
CHAR_BYTES
)
=
'\"'
;
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_DOUBLE
)
{
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_DOUBLE
)
{
double
jsonVd
=
*
(
double
*
)(
jsonInnerData
);
sprintf
(
varDataVal
(
dst
),
"%.9lf"
,
jsonVd
);
varDataSetLen
(
dst
,
strlen
(
varDataVal
(
dst
)));
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_BIGINT
)
{
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_BIGINT
)
{
int64_t
jsonVd
=
*
(
int64_t
*
)(
jsonInnerData
);
sprintf
(
varDataVal
(
dst
),
"%"
PRId64
,
jsonVd
);
varDataSetLen
(
dst
,
strlen
(
varDataVal
(
dst
)));
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_BOOL
)
{
sprintf
(
varDataVal
(
dst
),
"%s"
,
(
*
((
char
*
)
jsonInnerData
)
==
1
)
?
"true"
:
"false"
);
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_BOOL
)
{
sprintf
(
varDataVal
(
dst
),
"%s"
,
(
*
((
char
*
)
jsonInnerData
)
==
1
)
?
"true"
:
"false"
);
varDataSetLen
(
dst
,
strlen
(
varDataVal
(
dst
)));
}
else
{
}
else
{
ASSERT
(
0
);
}
if
(
len
+
varDataTLen
(
dst
)
>
colLength
[
i
])
{
if
(
len
+
varDataTLen
(
dst
)
>
colLength
[
i
])
{
p
=
taosMemoryRealloc
(
pResultInfo
->
convertBuf
[
i
],
len
+
varDataTLen
(
dst
));
if
(
p
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/common/src/tmsg.c
浏览文件 @
ad2d2c7c
...
...
@@ -2758,6 +2758,8 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
if
(
tEncodeI32
(
&
encoder
,
pReq
->
pid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
app
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
db
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
user
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
passwd
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
startTime
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
...
...
@@ -2775,6 +2777,8 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
pid
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
app
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
db
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
user
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
passwd
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
startTime
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
...
...
@@ -3034,7 +3038,6 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
return
0
;
}
int32_t
tSerializeSAlterVnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SAlterVnodeReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
...
...
@@ -3087,7 +3090,6 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
return
0
;
}
int32_t
tSerializeSKillQueryReq
(
void
*
buf
,
int32_t
bufLen
,
SKillQueryReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
...
...
source/dnode/mgmt/implement/src/dmTransport.c
浏览文件 @
ad2d2c7c
...
...
@@ -119,10 +119,10 @@ _OVER:
}
static
void
dmProcessMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
tmsg_t
msgType
=
pMsg
->
msgType
;
bool
isReq
=
msgType
&
1u
;
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMgmtWrapper
*
pWrapper
=
pHandle
->
pNdWrapper
;
if
(
msgType
==
TDMT_DND_SERVER_STATUS
)
{
...
...
@@ -443,7 +443,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq
authReq
=
{
0
};
tstrncpy
(
authReq
.
user
,
user
,
TSDB_USER_LEN
);
int32_t
contLen
=
tSerializeSAuthReq
(
NULL
,
0
,
&
authReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSAuthReq
(
pReq
,
contLen
,
&
authReq
);
SRpcMsg
rpcMsg
=
{.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_AUTH
,
.
ahandle
=
(
void
*
)
9528
};
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
ad2d2c7c
...
...
@@ -37,7 +37,7 @@ typedef struct {
int64_t
lastAccessTimeMs
;
uint64_t
killId
;
int32_t
numOfQueries
;
SArray
*
pQueries
;
//
SArray<SQueryDesc>
SArray
*
pQueries
;
//
SArray<SQueryDesc>
}
SConnObj
;
static
SConnObj
*
mndCreateConn
(
SMnode
*
pMnode
,
const
char
*
user
,
int8_t
connType
,
uint32_t
ip
,
uint16_t
port
,
...
...
@@ -45,7 +45,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
static
void
mndFreeConn
(
SConnObj
*
pConn
);
static
SConnObj
*
mndAcquireConn
(
SMnode
*
pMnode
,
uint32_t
connId
);
static
void
mndReleaseConn
(
SMnode
*
pMnode
,
SConnObj
*
pConn
);
static
void
*
mndGetNextConn
(
SMnode
*
pMnode
,
SCacheIter
*
pIter
);
static
void
*
mndGetNextConn
(
SMnode
*
pMnode
,
SCacheIter
*
pIter
);
static
void
mndCancelGetNextConn
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndProcessHeartBeatReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessConnectReq
(
SNodeMsg
*
pReq
);
...
...
@@ -71,9 +71,9 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_KILL_QUERY
,
mndProcessKillQueryReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_KILL_CONN
,
mndProcessKillConnReq
);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_CONNS
,
mndCancelGetNextConn
);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_QUERIES
,
mndCancelGetNextQuery
);
return
0
;
...
...
@@ -174,10 +174,10 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
}
static
int32_t
mndProcessConnectReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SUserObj
*
pUser
=
NULL
;
SDbObj
*
pDb
=
NULL
;
SConnObj
*
pConn
=
NULL
;
SMnode
*
pMnode
=
pReq
->
pNode
;
SUserObj
*
pUser
=
NULL
;
SDbObj
*
pDb
=
NULL
;
SConnObj
*
pConn
=
NULL
;
int32_t
code
=
-
1
;
SConnectReq
connReq
=
{
0
};
char
ip
[
30
]
=
{
0
};
...
...
@@ -194,6 +194,11 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
mError
(
"user:%s, failed to login while acquire user since %s"
,
pReq
->
user
,
terrstr
());
goto
CONN_OVER
;
}
if
(
0
!=
strncmp
(
connReq
.
passwd
,
pUser
->
pass
,
TSDB_PASSWORD_LEN
-
1
))
{
mError
(
"user:%s, failed to auth while acquire user
\n
%s
\r\n
%s"
,
pReq
->
user
,
connReq
.
passwd
,
pUser
->
pass
);
code
=
TSDB_CODE_RPC_AUTH_FAILURE
;
goto
CONN_OVER
;
}
if
(
connReq
.
db
[
0
])
{
char
db
[
TSDB_DB_FNAME_LEN
];
...
...
@@ -324,7 +329,8 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
return
NULL
;
}
static
int32_t
mndProcessQueryHeartBeat
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
,
SClientHbReq
*
pHbReq
,
SClientHbBatchRsp
*
pBatchRsp
)
{
static
int32_t
mndProcessQueryHeartBeat
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
,
SClientHbReq
*
pHbReq
,
SClientHbBatchRsp
*
pBatchRsp
)
{
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
SClientHbRsp
hbRsp
=
{.
connKey
=
pHbReq
->
connKey
,
.
status
=
0
,
.
info
=
NULL
,
.
query
=
NULL
};
...
...
@@ -336,7 +342,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
SConnObj
*
pConn
=
mndAcquireConn
(
pMnode
,
pBasic
->
connId
);
if
(
pConn
==
NULL
)
{
pConn
=
mndCreateConn
(
pMnode
,
connInfo
.
user
,
CONN_TYPE__QUERY
,
connInfo
.
clientIp
,
connInfo
.
clientPort
,
pBasic
->
pid
,
pBasic
->
app
,
0
);
pConn
=
mndCreateConn
(
pMnode
,
connInfo
.
user
,
CONN_TYPE__QUERY
,
connInfo
.
clientIp
,
connInfo
.
clientPort
,
pBasic
->
pid
,
pBasic
->
app
,
0
);
if
(
pConn
==
NULL
)
{
mError
(
"user:%s, conn:%u is freed and failed to create new since %s"
,
connInfo
.
user
,
pBasic
->
connId
,
terrstr
());
return
-
1
;
...
...
@@ -369,8 +376,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
}
rspBasic
->
connId
=
pConn
->
id
;
rspBasic
->
totalDnodes
=
1
;
//
TODO
rspBasic
->
onlineDnodes
=
1
;
//
TODO
rspBasic
->
totalDnodes
=
1
;
//
TODO
rspBasic
->
onlineDnodes
=
1
;
//
TODO
mndGetMnodeEpSet
(
pMnode
,
&
rspBasic
->
epSet
);
mndReleaseConn
(
pMnode
,
pConn
);
...
...
@@ -396,7 +403,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
switch
(
kv
->
key
)
{
case
HEARTBEAT_KEY_DBINFO
:
{
void
*
rspMsg
=
NULL
;
void
*
rspMsg
=
NULL
;
int32_t
rspLen
=
0
;
mndValidateDbInfo
(
pMnode
,
kv
->
value
,
kv
->
valueLen
/
sizeof
(
SDbVgVersion
),
&
rspMsg
,
&
rspLen
);
if
(
rspMsg
&&
rspLen
>
0
)
{
...
...
@@ -406,7 +413,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
break
;
}
case
HEARTBEAT_KEY_STBINFO
:
{
void
*
rspMsg
=
NULL
;
void
*
rspMsg
=
NULL
;
int32_t
rspLen
=
0
;
mndValidateStbInfo
(
pMnode
,
kv
->
value
,
kv
->
valueLen
/
sizeof
(
SSTableMetaVersion
),
&
rspMsg
,
&
rspLen
);
if
(
rspMsg
&&
rspLen
>
0
)
{
...
...
@@ -457,7 +464,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
taosArrayDestroyEx
(
batchReq
.
reqs
,
tFreeClientHbReq
);
int32_t
tlen
=
tSerializeSClientHbBatchRsp
(
NULL
,
0
,
&
batchRsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
void
*
buf
=
rpcMallocCont
(
tlen
);
tSerializeSClientHbBatchRsp
(
buf
,
tlen
,
&
batchRsp
);
int32_t
rspNum
=
(
int32_t
)
taosArrayGetSize
(
batchRsp
.
rsps
);
...
...
@@ -479,7 +486,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
}
static
int32_t
mndProcessKillQueryReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SMnode
*
pMnode
=
pReq
->
pNode
;
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
SUserObj
*
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
...
...
@@ -513,7 +520,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) {
}
static
int32_t
mndProcessKillConnReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SMnode
*
pMnode
=
pReq
->
pNode
;
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
SUserObj
*
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
...
...
@@ -545,11 +552,11 @@ static int32_t mndProcessKillConnReq(SNodeMsg *pReq) {
}
static
int32_t
mndRetrieveConns
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
numOfRows
=
0
;
SConnObj
*
pConn
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
char
*
pWrite
;
char
ipStr
[
TSDB_IPv4ADDR_LEN
+
6
];
if
(
pShow
->
pIter
==
NULL
)
{
...
...
source/dnode/mnode/impl/test/profile/profile.cpp
浏览文件 @
ad2d2c7c
...
...
@@ -30,8 +30,15 @@ int32_t MndTestProfile::connId;
TEST_F
(
MndTestProfile
,
01
_ConnectMsg
)
{
SConnectReq
connectReq
=
{
0
};
connectReq
.
pid
=
1234
;
char
passwd
[]
=
"taosdata"
;
char
secretEncrypt
[
TSDB_PASSWORD_LEN
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)
passwd
,
strlen
(
passwd
),
secretEncrypt
);
strcpy
(
connectReq
.
app
,
"mnode_test_profile"
);
strcpy
(
connectReq
.
db
,
""
);
strcpy
(
connectReq
.
user
,
"root"
);
strcpy
(
connectReq
.
passwd
,
secretEncrypt
);
int32_t
contLen
=
tSerializeSConnectReq
(
NULL
,
0
,
&
connectReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
...
...
@@ -58,10 +65,16 @@ TEST_F(MndTestProfile, 01_ConnectMsg) {
}
TEST_F
(
MndTestProfile
,
02
_ConnectMsg_InvalidDB
)
{
char
passwd
[]
=
"taosdata"
;
char
secretEncrypt
[
TSDB_PASSWORD_LEN
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)
passwd
,
strlen
(
passwd
),
secretEncrypt
);
SConnectReq
connectReq
=
{
0
};
connectReq
.
pid
=
1234
;
strcpy
(
connectReq
.
app
,
"mnode_test_profile"
);
strcpy
(
connectReq
.
db
,
"invalid_db"
);
strcpy
(
connectReq
.
user
,
"root"
);
strcpy
(
connectReq
.
passwd
,
secretEncrypt
);
int32_t
contLen
=
tSerializeSConnectReq
(
NULL
,
0
,
&
connectReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
...
...
source/dnode/mnode/impl/test/show/show.cpp
浏览文件 @
ad2d2c7c
...
...
@@ -54,10 +54,16 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
}
TEST_F
(
MndTestShow
,
03
_ShowMsg_Conn
)
{
char
passwd
[]
=
"taosdata"
;
char
secretEncrypt
[
TSDB_PASSWORD_LEN
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)
passwd
,
strlen
(
passwd
),
secretEncrypt
);
SConnectReq
connectReq
=
{
0
};
connectReq
.
pid
=
1234
;
strcpy
(
connectReq
.
app
,
"mnode_test_show"
);
strcpy
(
connectReq
.
db
,
""
);
strcpy
(
connectReq
.
user
,
"root"
);
strcpy
(
connectReq
.
passwd
,
secretEncrypt
);
int32_t
contLen
=
tSerializeSConnectReq
(
NULL
,
0
,
&
connectReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
ad2d2c7c
...
...
@@ -158,6 +158,7 @@ typedef struct {
char
secured
:
2
;
char
spi
:
2
;
char
user
[
TSDB_UNI_LEN
];
uint64_t
ahandle
;
// ahandle assigned by client
uint32_t
code
;
// del later
uint32_t
msgType
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
ad2d2c7c
...
...
@@ -614,35 +614,16 @@ void cliSend(SCliConn* pConn) {
pMsg
->
pCont
=
(
void
*
)
rpcMallocCont
(
0
);
pMsg
->
contLen
=
0
;
}
STransMsgHead
*
pHead
=
transHeadFromCont
(
pMsg
->
pCont
);
pHead
->
ahandle
=
pCtx
!=
NULL
?
(
uint64_t
)
pCtx
->
ahandle
:
0
;
int
msgLen
=
transMsgLenFromCont
(
pMsg
->
contLen
);
if
(
!
pConn
->
secured
)
{
char
*
buf
=
taosMemoryCalloc
(
1
,
msgLen
+
sizeof
(
STransUserMsg
));
memcpy
(
buf
,
(
char
*
)
pHead
,
msgLen
);
STransUserMsg
*
uMsg
=
(
STransUserMsg
*
)(
buf
+
msgLen
);
memcpy
(
uMsg
->
user
,
pTransInst
->
user
,
tListLen
(
uMsg
->
user
));
memcpy
(
uMsg
->
secret
,
pTransInst
->
secret
,
tListLen
(
uMsg
->
secret
));
// to avoid mem leak
destroyUserdata
(
pMsg
);
pMsg
->
pCont
=
(
char
*
)
buf
+
sizeof
(
STransMsgHead
);
pMsg
->
contLen
=
msgLen
+
sizeof
(
STransUserMsg
)
-
sizeof
(
STransMsgHead
);
pHead
=
(
STransMsgHead
*
)
buf
;
pHead
->
secured
=
1
;
msgLen
+=
sizeof
(
STransUserMsg
);
}
STransMsgHead
*
pHead
=
transHeadFromCont
(
pMsg
->
pCont
);
pHead
->
ahandle
=
pCtx
!=
NULL
?
(
uint64_t
)
pCtx
->
ahandle
:
0
;
pHead
->
noResp
=
REQUEST_NO_RESP
(
pMsg
)
?
1
:
0
;
pHead
->
persist
=
REQUEST_PERSIS_HANDLE
(
pMsg
)
?
1
:
0
;
pHead
->
msgType
=
pMsg
->
msgType
;
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
pHead
->
release
=
REQUEST_RELEASE_HANDLE
(
pCliMsg
)
?
1
:
0
;
memcpy
(
pHead
->
user
,
pTransInst
->
user
,
strlen
(
pTransInst
->
user
));
uv_buf_t
wb
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
tDebug
(
"%s cli conn %p %s is send to %s:%d, local info %s:%d"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
ad2d2c7c
...
...
@@ -46,7 +46,6 @@ typedef struct SSrvConn {
struct
sockaddr_in
addr
;
struct
sockaddr_in
locaddr
;
char
secured
;
int
spi
;
char
info
[
64
];
char
user
[
TSDB_UNI_LEN
];
// user ID for the link
...
...
@@ -104,6 +103,13 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
);
static
void
uvAcceptAsyncCb
(
uv_async_t
*
handle
);
static
void
uvShutDownCb
(
uv_shutdown_t
*
req
,
int
status
);
/*
* time-consuming task throwed into BG work thread
*/
static
void
uvWorkDoTask
(
uv_work_t
*
req
);
static
void
uvWorkAfterTask
(
uv_work_t
*
req
,
int
status
);
static
void
uvWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
);
static
void
uvFreeCb
(
uv_handle_t
*
handle
);
...
...
@@ -181,16 +187,16 @@ static void uvHandleReq(SSrvConn* pConn) {
uint32_t
msgLen
=
pBuf
->
len
;
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)
msg
;
if
(
pHead
->
secured
==
1
)
{
STransUserMsg
*
uMsg
=
(
STransUserMsg
*
)((
char
*
)
msg
+
msgLen
-
sizeof
(
STransUserMsg
));
memcpy
(
pConn
->
user
,
uMsg
->
user
,
tListLen
(
uMsg
->
user
));
memcpy
(
pConn
->
secret
,
uMsg
->
secret
,
tListLen
(
uMsg
->
secret
));
}
pHead
->
code
=
htonl
(
pHead
->
code
);
pHead
->
msgLen
=
htonl
(
pHead
->
msgLen
);
if
(
pHead
->
secured
==
1
)
{
pHead
->
msgLen
-=
sizeof
(
STransUserMsg
);
}
memcpy
(
pConn
->
user
,
pHead
->
user
,
strlen
(
pHead
->
user
));
// TODO(dengyihao): time-consuming task throwed into BG Thread
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
// wreq->data = pConn;
// uv_read_stop((uv_stream_t*)pConn->pTcp);
// transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrdObj*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
CONN_SHOULD_RELEASE
(
pConn
,
pHead
);
...
...
@@ -344,12 +350,6 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
STransMsgHead
*
pHead
=
transHeadFromCont
(
pMsg
->
pCont
);
pHead
->
ahandle
=
(
uint64_t
)
pMsg
->
ahandle
;
// pHead->secured = pMsg->code == 0 ? 1 : 0; //
if
(
!
pConn
->
secured
)
{
pConn
->
secured
=
pMsg
->
code
==
0
?
1
:
0
;
}
pHead
->
secured
=
pConn
->
secured
;
if
(
pConn
->
status
==
ConnNormal
)
{
pHead
->
msgType
=
pConn
->
inType
+
1
;
}
else
{
...
...
@@ -464,6 +464,24 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
taosMemoryFree
(
req
);
}
static
void
uvWorkDoTask
(
uv_work_t
*
req
)
{
// doing time-consumeing task
// only auth conn currently, add more func later
tTrace
(
"server conn %p start to be processed in BG Thread"
,
req
->
data
);
return
;
}
static
void
uvWorkAfterTask
(
uv_work_t
*
req
,
int
status
)
{
if
(
status
!=
0
)
{
tTrace
(
"server conn %p failed to processed "
,
req
->
data
);
}
// Done time-consumeing task
// add more func later
// this func called in main loop
tTrace
(
"server conn %p already processed "
,
req
->
data
);
taosMemoryFree
(
req
);
}
void
uvOnAcceptCb
(
uv_stream_t
*
stream
,
int
status
)
{
if
(
status
==
-
1
)
{
return
;
...
...
tests/script/runAllSimCases.sh
浏览文件 @
ad2d2c7c
!
/bin/bash
#
!/bin/bash
##################################################
#
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录