Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ca272200
TDengine
项目概览
taosdata
/
TDengine
10 个月 前同步成功
通知
1177
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
ca272200
编写于
7月 07, 2023
作者:
H
Haojun Liao
提交者:
GitHub
7月 07, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21962 from taosdata/fix/TD-25102
add version check in rpc
上级
47a7a45d
3bd99776
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
145 addition
and
106 deletion
+145
-106
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+2
-0
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+23
-22
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+17
-19
source/client/src/clientMain.c
source/client/src/clientMain.c
+25
-29
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+10
-1
source/dnode/mgmt/test/sut/src/client.cpp
source/dnode/mgmt/test/sut/src/client.cpp
+2
-0
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+2
-1
source/libs/sync/test/sync_test_lib/src/syncIO.c
source/libs/sync/test/sync_test_lib/src/syncIO.c
+13
-13
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+2
-0
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+4
-4
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+1
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+14
-4
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+1
-1
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+8
-4
source/libs/transport/test/cliBench.c
source/libs/transport/test/cliBench.c
+2
-1
source/libs/transport/test/svrBench.c
source/libs/transport/test/svrBench.c
+5
-2
source/libs/transport/test/transUT.cpp
source/libs/transport/test/transUT.cpp
+6
-2
tests/system-test/0-others/compatibility.py
tests/system-test/0-others/compatibility.py
+4
-3
tools/shell/src/shellNettest.c
tools/shell/src/shellNettest.c
+4
-0
未找到文件。
include/libs/transport/trpc.h
浏览文件 @
ca272200
...
...
@@ -46,6 +46,7 @@ typedef struct SRpcHandleInfo {
int8_t
noResp
;
// has response or not(default 0, 0: resp, 1: no resp)
int8_t
persistHandle
;
// persist handle or not
int8_t
hasEpSet
;
int32_t
cliVer
;
// app info
void
*
ahandle
;
// app handle set by client
...
...
@@ -83,6 +84,7 @@ typedef struct SRpcInit {
int32_t
sessions
;
// number of sessions allowed
int8_t
connType
;
// TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int32_t
idleTime
;
// milliseconds, 0 means idle timer is disabled
int32_t
compatibilityVer
;
int32_t
retryMinInterval
;
// retry init interval
int32_t
retryStepFactor
;
// retry interval factor
...
...
source/client/src/clientEnv.c
浏览文件 @
ca272200
...
...
@@ -29,6 +29,7 @@
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "tversion.h"
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
#include "cus_name.h"
...
...
@@ -111,7 +112,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
atomic_add_fetch_64
((
int64_t
*
)
&
pActivity
->
numOfSlowQueries
,
1
);
if
(
tsSlowLogScope
&
reqType
)
{
taosPrintSlowLog
(
"PID:%d, Conn:%u, QID:0x%"
PRIx64
", Start:%"
PRId64
", Duration:%"
PRId64
"us, SQL:%s"
,
taosGetPId
(),
pTscObj
->
connId
,
pRequest
->
requestId
,
pRequest
->
metric
.
start
,
duration
,
pRequest
->
sqlstr
);
taosGetPId
(),
pTscObj
->
connId
,
pRequest
->
requestId
,
pRequest
->
metric
.
start
,
duration
,
pRequest
->
sqlstr
);
}
}
...
...
@@ -175,6 +177,8 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit
.
connLimitNum
=
connLimitNum
;
rpcInit
.
timeToGetConn
=
tsTimeToGetAvailableConn
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
void
*
pDnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
pDnodeConn
==
NULL
)
{
tscError
(
"failed to init connection to server"
);
...
...
@@ -358,17 +362,16 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
int32_t
removeRequest
(
int64_t
rid
)
{
return
taosRemoveRef
(
clientReqRefPool
,
rid
);
}
void
destroySubRequests
(
SRequestObj
*
pRequest
)
{
int32_t
reqIdx
=
-
1
;
int32_t
reqIdx
=
-
1
;
SRequestObj
*
pReqList
[
16
]
=
{
NULL
};
uint64_t
tmpRefId
=
0
;
uint64_t
tmpRefId
=
0
;
if
(
pRequest
->
relation
.
userRefId
&&
pRequest
->
relation
.
userRefId
!=
pRequest
->
self
)
{
return
;
}
SRequestObj
*
pTmp
=
pRequest
;
SRequestObj
*
pTmp
=
pRequest
;
while
(
pTmp
->
relation
.
prevRefId
)
{
tmpRefId
=
pTmp
->
relation
.
prevRefId
;
pTmp
=
acquireRequest
(
tmpRefId
);
...
...
@@ -376,9 +379,9 @@ void destroySubRequests(SRequestObj *pRequest) {
pReqList
[
++
reqIdx
]
=
pTmp
;
releaseRequest
(
tmpRefId
);
}
else
{
tscError
(
"0x%"
PRIx64
", prev req ref 0x%"
PRIx64
" is not there, reqId:0x%"
PRIx64
,
pTmp
->
self
,
tmpRefId
,
pTmp
->
requestId
);
break
;
tscError
(
"0x%"
PRIx64
", prev req ref 0x%"
PRIx64
" is not there, reqId:0x%"
PRIx64
,
pTmp
->
self
,
tmpRefId
,
pTmp
->
requestId
);
break
;
}
}
...
...
@@ -391,16 +394,15 @@ void destroySubRequests(SRequestObj *pRequest) {
pTmp
=
acquireRequest
(
tmpRefId
);
if
(
pTmp
)
{
tmpRefId
=
pTmp
->
relation
.
nextRefId
;
removeRequest
(
pTmp
->
self
);
removeRequest
(
pTmp
->
self
);
releaseRequest
(
pTmp
->
self
);
}
else
{
tscError
(
"0x%"
PRIx64
" is not there"
,
tmpRefId
);
break
;
break
;
}
}
}
void
doDestroyRequest
(
void
*
p
)
{
if
(
NULL
==
p
)
{
return
;
...
...
@@ -412,7 +414,7 @@ void doDestroyRequest(void *p) {
tscTrace
(
"begin to destroy request %"
PRIx64
" p:%p"
,
reqId
,
pRequest
);
destroySubRequests
(
pRequest
);
taosHashRemove
(
pRequest
->
pTscObj
->
pRequests
,
&
pRequest
->
self
,
sizeof
(
pRequest
->
self
));
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
...
...
@@ -473,15 +475,15 @@ void taosStopQueryImpl(SRequestObj *pRequest) {
}
void
stopAllQueries
(
SRequestObj
*
pRequest
)
{
int32_t
reqIdx
=
-
1
;
int32_t
reqIdx
=
-
1
;
SRequestObj
*
pReqList
[
16
]
=
{
NULL
};
uint64_t
tmpRefId
=
0
;
uint64_t
tmpRefId
=
0
;
if
(
pRequest
->
relation
.
userRefId
&&
pRequest
->
relation
.
userRefId
!=
pRequest
->
self
)
{
return
;
}
SRequestObj
*
pTmp
=
pRequest
;
SRequestObj
*
pTmp
=
pRequest
;
while
(
pTmp
->
relation
.
prevRefId
)
{
tmpRefId
=
pTmp
->
relation
.
prevRefId
;
pTmp
=
acquireRequest
(
tmpRefId
);
...
...
@@ -489,9 +491,9 @@ void stopAllQueries(SRequestObj *pRequest) {
pReqList
[
++
reqIdx
]
=
pTmp
;
releaseRequest
(
tmpRefId
);
}
else
{
tscError
(
"0x%"
PRIx64
", prev req ref 0x%"
PRIx64
" is not there, reqId:0x%"
PRIx64
,
pTmp
->
self
,
tmpRefId
,
pTmp
->
requestId
);
break
;
tscError
(
"0x%"
PRIx64
", prev req ref 0x%"
PRIx64
" is not there, reqId:0x%"
PRIx64
,
pTmp
->
self
,
tmpRefId
,
pTmp
->
requestId
);
break
;
}
}
...
...
@@ -510,12 +512,11 @@ void stopAllQueries(SRequestObj *pRequest) {
releaseRequest
(
pTmp
->
self
);
}
else
{
tscError
(
"0x%"
PRIx64
" is not there"
,
tmpRefId
);
break
;
break
;
}
}
}
void
crashReportThreadFuncUnexpectedStopped
(
void
)
{
atomic_store_32
(
&
clientStop
,
-
1
);
}
static
void
*
tscCrashReportThreadFp
(
void
*
param
)
{
...
...
source/client/src/clientImpl.c
浏览文件 @
ca272200
...
...
@@ -26,7 +26,7 @@
#include "tpagedbuf.h"
#include "tref.h"
#include "tsched.h"
#include "tversion.h"
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
...
...
@@ -237,8 +237,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
return
TSDB_CODE_SUCCESS
;
}
int32_t
buildPreviousRequest
(
SRequestObj
*
pRequest
,
const
char
*
sql
,
SRequestObj
**
pNewRequest
)
{
int32_t
code
=
buildRequest
(
pRequest
->
pTscObj
->
id
,
sql
,
strlen
(
sql
),
pRequest
,
pRequest
->
validateOnly
,
pNewRequest
,
0
);
int32_t
buildPreviousRequest
(
SRequestObj
*
pRequest
,
const
char
*
sql
,
SRequestObj
**
pNewRequest
)
{
int32_t
code
=
buildRequest
(
pRequest
->
pTscObj
->
id
,
sql
,
strlen
(
sql
),
pRequest
,
pRequest
->
validateOnly
,
pNewRequest
,
0
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pRequest
->
relation
.
prevRefId
=
(
*
pNewRequest
)
->
self
;
(
*
pNewRequest
)
->
relation
.
nextRefId
=
pRequest
->
self
;
...
...
@@ -502,8 +503,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
pResInfo
->
userFields
[
i
].
bytes
=
pSchema
[
i
].
bytes
;
pResInfo
->
userFields
[
i
].
type
=
pSchema
[
i
].
type
;
if
(
pSchema
[
i
].
type
==
TSDB_DATA_TYPE_VARCHAR
||
pSchema
[
i
].
type
==
TSDB_DATA_TYPE_GEOMETRY
)
{
if
(
pSchema
[
i
].
type
==
TSDB_DATA_TYPE_VARCHAR
||
pSchema
[
i
].
type
==
TSDB_DATA_TYPE_GEOMETRY
)
{
pResInfo
->
userFields
[
i
].
bytes
-=
VARSTR_HEADER_SIZE
;
}
else
if
(
pSchema
[
i
].
type
==
TSDB_DATA_TYPE_NCHAR
||
pSchema
[
i
].
type
==
TSDB_DATA_TYPE_JSON
)
{
pResInfo
->
userFields
[
i
].
bytes
=
(
pResInfo
->
userFields
[
i
].
bytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
;
...
...
@@ -891,7 +891,7 @@ static bool incompletaFileParsing(SNode* pStmt) {
void
continuePostSubQuery
(
SRequestObj
*
pRequest
,
TAOS_ROW
row
)
{
SSqlCallbackWrapper
*
pWrapper
=
pRequest
->
pWrapper
;
int32_t
code
=
nodesAcquireAllocator
(
pWrapper
->
pParseCtx
->
allocatorId
);
int32_t
code
=
nodesAcquireAllocator
(
pWrapper
->
pParseCtx
->
allocatorId
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
int64_t
analyseStart
=
taosGetTimestampUs
();
code
=
qContinueParsePostQuery
(
pWrapper
->
pParseCtx
,
pRequest
->
pQuery
,
(
void
**
)
row
);
...
...
@@ -934,7 +934,7 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
TAOS_ROW
row
=
NULL
;
if
(
rowNum
>
0
)
{
row
=
taos_fetch_row
(
res
);
// for single row only now
row
=
taos_fetch_row
(
res
);
// for single row only now
}
SRequestObj
*
pNextReq
=
acquireRequest
(
pRequest
->
relation
.
nextRefId
);
...
...
@@ -2135,6 +2135,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
connLimitNum
=
TMIN
(
connLimitNum
,
500
);
rpcInit
.
connLimitNum
=
connLimitNum
;
rpcInit
.
timeToGetConn
=
tsTimeToGetAvailableConn
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
clientRpc
==
NULL
)
{
...
...
@@ -2494,11 +2495,10 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
return
pRequest
;
}
static
void
fetchCallback
(
void
*
pResult
,
void
*
param
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
static
void
fetchCallback
(
void
*
pResult
,
void
*
param
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
tscDebug
(
"0x%"
PRIx64
" enter scheduler fetch cb, code:%d - %s, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
requestId
);
...
...
@@ -2520,7 +2520,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
}
pRequest
->
code
=
setQueryResultFromRsp
(
pResultInfo
,
(
const
SRetrieveTableRsp
*
)
pResultInfo
->
pData
,
pResultInfo
->
convertUcs4
,
true
);
setQueryResultFromRsp
(
pResultInfo
,
(
const
SRetrieveTableRsp
*
)
pResultInfo
->
pData
,
pResultInfo
->
convertUcs4
,
true
);
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
pResultInfo
->
numOfRows
=
0
;
pRequest
->
code
=
code
;
...
...
@@ -2531,19 +2531,19 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
pRequest
->
self
,
pResultInfo
->
numOfRows
,
pResultInfo
->
totalRows
,
pResultInfo
->
completed
,
pRequest
->
requestId
);
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SAppClusterSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
atomic_add_fetch_64
((
int64_t
*
)
&
pActivity
->
fetchBytes
,
pRequest
->
body
.
resInfo
.
payloadLen
);
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SAppClusterSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
atomic_add_fetch_64
((
int64_t
*
)
&
pActivity
->
fetchBytes
,
pRequest
->
body
.
resInfo
.
payloadLen
);
}
pRequest
->
body
.
fetchFp
(
pRequest
->
body
.
param
,
pRequest
,
pResultInfo
->
numOfRows
);
}
void
taosAsyncFetchImpl
(
SRequestObj
*
pRequest
,
__taos_async_fn_t
fp
,
void
*
param
)
{
void
taosAsyncFetchImpl
(
SRequestObj
*
pRequest
,
__taos_async_fn_t
fp
,
void
*
param
)
{
pRequest
->
body
.
fetchFp
=
fp
;
pRequest
->
body
.
param
=
param
;
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
// this query has no results or error exists, return directly
if
(
taos_num_fields
(
pRequest
)
==
0
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2578,5 +2578,3 @@ void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param
schedulerFetchRows
(
pRequest
->
body
.
queryJob
,
&
req
);
}
source/client/src/clientMain.c
浏览文件 @
ca272200
...
...
@@ -558,13 +558,12 @@ int taos_select_db(TAOS *taos, const char *db) {
return
code
;
}
void
taos_stop_query
(
TAOS_RES
*
res
)
{
if
(
res
==
NULL
||
TD_RES_TMQ
(
res
)
||
TD_RES_TMQ_META
(
res
)
||
TD_RES_TMQ_METADATA
(
res
))
{
return
;
}
stopAllQueries
((
SRequestObj
*
)
res
);
stopAllQueries
((
SRequestObj
*
)
res
);
}
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
)
{
...
...
@@ -785,7 +784,7 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) {
taosMemoryFree
(
pWrapper
);
}
void
destroyCtxInRequest
(
SRequestObj
*
pRequest
)
{
void
destroyCtxInRequest
(
SRequestObj
*
pRequest
)
{
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
qDestroyQuery
(
pRequest
->
pQuery
);
pRequest
->
pQuery
=
NULL
;
...
...
@@ -793,7 +792,6 @@ void destroyCtxInRequest(SRequestObj* pRequest) {
pRequest
->
pWrapper
=
NULL
;
}
static
void
doAsyncQueryFromAnalyse
(
SMetaData
*
pResultMeta
,
void
*
param
,
int32_t
code
)
{
SSqlCallbackWrapper
*
pWrapper
=
(
SSqlCallbackWrapper
*
)
param
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
...
...
@@ -807,15 +805,15 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
qAnalyseSqlSemantic
(
pWrapper
->
pParseCtx
,
pWrapper
->
pCatalogReq
,
pResultMeta
,
pQuery
);
}
pRequest
->
metric
.
analyseCostUs
+=
taosGetTimestampUs
()
-
analyseStart
;
handleQueryAnslyseRes
(
pWrapper
,
pResultMeta
,
code
);
}
int32_t
cloneCatalogReq
(
SCatalogReq
*
*
ppTarget
,
SCatalogReq
*
pSrc
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SCatalogReq
*
pTarget
=
taosMemoryCalloc
(
1
,
sizeof
(
SCatalogReq
));
int32_t
cloneCatalogReq
(
SCatalogReq
**
ppTarget
,
SCatalogReq
*
pSrc
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SCatalogReq
*
pTarget
=
taosMemoryCalloc
(
1
,
sizeof
(
SCatalogReq
));
if
(
pTarget
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
...
...
@@ -842,17 +840,16 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) {
return
code
;
}
void
handleSubQueryFromAnalyse
(
SSqlCallbackWrapper
*
pWrapper
,
SMetaData
*
pResultMeta
,
SNode
*
pRoot
)
{
SRequestObj
*
pNewRequest
=
NULL
;
SSqlCallbackWrapper
*
pNewWrapper
=
NULL
;
int32_t
code
=
buildPreviousRequest
(
pWrapper
->
pRequest
,
pWrapper
->
pRequest
->
sqlstr
,
&
pNewRequest
);
void
handleSubQueryFromAnalyse
(
SSqlCallbackWrapper
*
pWrapper
,
SMetaData
*
pResultMeta
,
SNode
*
pRoot
)
{
SRequestObj
*
pNewRequest
=
NULL
;
SSqlCallbackWrapper
*
pNewWrapper
=
NULL
;
int32_t
code
=
buildPreviousRequest
(
pWrapper
->
pRequest
,
pWrapper
->
pRequest
->
sqlstr
,
&
pNewRequest
);
if
(
code
)
{
handleQueryAnslyseRes
(
pWrapper
,
pResultMeta
,
code
);
return
;
}
pNewRequest
->
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
pNewRequest
->
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
if
(
NULL
==
pNewRequest
->
pQuery
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
...
...
@@ -871,16 +868,16 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult
}
void
handleQueryAnslyseRes
(
SSqlCallbackWrapper
*
pWrapper
,
SMetaData
*
pResultMeta
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
SQuery
*
pQuery
=
pRequest
->
pQuery
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
SQuery
*
pQuery
=
pRequest
->
pQuery
;
if
(
code
==
TSDB_CODE_SUCCESS
&&
pQuery
->
pPrevRoot
)
{
SNode
*
prevRoot
=
pQuery
->
pPrevRoot
;
SNode
*
prevRoot
=
pQuery
->
pPrevRoot
;
pQuery
->
pPrevRoot
=
NULL
;
handleSubQueryFromAnalyse
(
pWrapper
,
pResultMeta
,
prevRoot
);
return
;
}
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pRequest
->
stableQuery
=
pQuery
->
stableQuery
;
if
(
pQuery
->
pRoot
)
{
...
...
@@ -1043,7 +1040,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
}
int32_t
prepareAndParseSqlSyntax
(
SSqlCallbackWrapper
**
ppWrapper
,
SRequestObj
*
pRequest
,
bool
updateMetaForce
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SSqlCallbackWrapper
*
pWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SSqlCallbackWrapper
));
if
(
pWrapper
==
NULL
)
{
...
...
@@ -1081,7 +1078,6 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p
return
code
;
}
void
doAsyncQuery
(
SRequestObj
*
pRequest
,
bool
updateMetaForce
)
{
SSqlCallbackWrapper
*
pWrapper
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -1128,12 +1124,12 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
}
void
restartAsyncQuery
(
SRequestObj
*
pRequest
,
int32_t
code
)
{
int32_t
reqIdx
=
0
;
int32_t
reqIdx
=
0
;
SRequestObj
*
pReqList
[
16
]
=
{
NULL
};
SRequestObj
*
pUserReq
=
NULL
;
pReqList
[
0
]
=
pRequest
;
uint64_t
tmpRefId
=
0
;
SRequestObj
*
pTmp
=
pRequest
;
uint64_t
tmpRefId
=
0
;
SRequestObj
*
pTmp
=
pRequest
;
while
(
pTmp
->
relation
.
prevRefId
)
{
tmpRefId
=
pTmp
->
relation
.
prevRefId
;
pTmp
=
acquireRequest
(
tmpRefId
);
...
...
@@ -1141,9 +1137,9 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
pReqList
[
++
reqIdx
]
=
pTmp
;
releaseRequest
(
tmpRefId
);
}
else
{
tscError
(
"0x%"
PRIx64
", prev req ref 0x%"
PRIx64
" is not there, reqId:0x%"
PRIx64
,
pTmp
->
self
,
tmpRefId
,
pTmp
->
requestId
);
break
;
tscError
(
"0x%"
PRIx64
", prev req ref 0x%"
PRIx64
" is not there, reqId:0x%"
PRIx64
,
pTmp
->
self
,
tmpRefId
,
pTmp
->
requestId
);
break
;
}
}
...
...
@@ -1152,11 +1148,11 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
pTmp
=
acquireRequest
(
tmpRefId
);
if
(
pTmp
)
{
tmpRefId
=
pTmp
->
relation
.
nextRefId
;
removeRequest
(
pTmp
->
self
);
removeRequest
(
pTmp
->
self
);
releaseRequest
(
pTmp
->
self
);
}
else
{
tscError
(
"0x%"
PRIx64
" is not there"
,
tmpRefId
);
break
;
break
;
}
}
...
...
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
ca272200
...
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "qworker.h"
#include "tversion.h"
static
inline
void
dmSendRsp
(
SRpcMsg
*
pMsg
)
{
rpcSendResponse
(
pMsg
);
}
...
...
@@ -73,6 +74,13 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dGTrace
(
"msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%"
PRId64
,
TMSG_INFO
(
pRpc
->
msgType
),
pRpc
->
info
.
handle
,
pRpc
->
contLen
,
pRpc
->
code
,
pRpc
->
info
.
ahandle
,
pRpc
->
info
.
refId
);
int32_t
svrVer
=
0
;
taosVersionStrToInt
(
version
,
&
svrVer
);
if
(
0
!=
taosCheckVersionCompatible
(
pRpc
->
info
.
cliVer
,
svrVer
,
3
))
{
dError
(
"Version not compatible, cli ver: %d, svr ver: %d"
,
pRpc
->
info
.
cliVer
,
svrVer
);
goto
_OVER
;
}
switch
(
pRpc
->
msgType
)
{
case
TDMT_DND_NET_TEST
:
dmProcessNetTestReq
(
pDnode
,
pRpc
);
...
...
@@ -305,6 +313,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit
.
supportBatch
=
1
;
rpcInit
.
batchSize
=
8
*
1024
;
rpcInit
.
timeToGetConn
=
tsTimeToGetAvailableConn
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
pTrans
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pTrans
->
clientRpc
==
NULL
)
{
...
...
@@ -339,7 +348,7 @@ int32_t dmInitServer(SDnode *pDnode) {
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
parent
=
pDnode
;
rpcInit
.
compressSize
=
tsCompressMsgSize
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
pTrans
->
serverRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pTrans
->
serverRpc
==
NULL
)
{
dError
(
"failed to init dnode rpc server"
);
...
...
source/dnode/mgmt/test/sut/src/client.cpp
浏览文件 @
ca272200
...
...
@@ -16,6 +16,7 @@
#include "sut.h"
#include "tdatablock.h"
#include "tmisce.h"
#include "tversion.h"
static
void
processClientRsp
(
void
*
parent
,
SRpcMsg
*
pRsp
,
SEpSet
*
pEpSet
)
{
TestClient
*
client
=
(
TestClient
*
)
parent
;
...
...
@@ -53,6 +54,7 @@ void TestClient::DoInit() {
rpcInit
.
parent
=
this
;
// rpcInit.secret = (char*)secretEncrypt;
// rpcInit.spi = 1;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
clientRpc
=
rpcOpen
(
&
rpcInit
);
ASSERT
(
clientRpc
);
...
...
source/libs/function/src/udfd.c
浏览文件 @
ca272200
...
...
@@ -29,6 +29,7 @@
#include "tmsg.h"
#include "trpc.h"
#include "tmisce.h"
#include "tversion.h"
// clang-format on
#define UDFD_MAX_SCRIPT_PLUGINS 64
...
...
@@ -1038,7 +1039,7 @@ int32_t udfdOpenClientRpc() {
connLimitNum
=
TMIN
(
connLimitNum
,
500
);
rpcInit
.
connLimitNum
=
connLimitNum
;
rpcInit
.
timeToGetConn
=
tsTimeToGetAvailableConn
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
global
.
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
global
.
clientRpc
==
NULL
)
{
fnError
(
"failed to init dnode rpc client"
);
...
...
source/libs/sync/test/sync_test_lib/src/syncIO.c
浏览文件 @
ca272200
...
...
@@ -21,6 +21,7 @@
#include "tglobal.h"
#include "ttimer.h"
#include "tutil.h"
#include "tversion.h"
bool
gRaftDetailLog
=
false
;
SSyncIO
*
gSyncIO
=
NULL
;
...
...
@@ -188,7 +189,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
rpcInit
.
idleTime
=
100
;
rpcInit
.
user
=
"sync-io"
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
io
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
io
->
clientRpc
==
NULL
)
{
sError
(
"failed to initialize RPC"
);
...
...
@@ -209,7 +210,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
rpcInit
.
idleTime
=
2
*
1500
;
rpcInit
.
parent
=
io
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
void
*
pRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pRpc
==
NULL
)
{
sError
(
"failed to start RPC server"
);
...
...
@@ -470,11 +471,10 @@ static void syncIOTickPing(void *param, void *tmrId) {
taosTmrReset
(
syncIOTickPing
,
io
->
pingTimerMS
,
io
,
io
->
timerMgr
,
&
io
->
pingTimer
);
}
void
syncEntryDestory
(
SSyncRaftEntry
*
pEntry
)
{}
void
syncEntryDestory
(
SSyncRaftEntry
*
pEntry
)
{}
void
syncUtilMsgNtoH
(
void
*
msg
)
{
SMsgHead
*
pHead
=
msg
;
void
syncUtilMsgNtoH
(
void
*
msg
)
{
SMsgHead
*
pHead
=
msg
;
pHead
->
contLen
=
ntohl
(
pHead
->
contLen
);
pHead
->
vgId
=
ntohl
(
pHead
->
vgId
);
}
...
...
@@ -487,9 +487,9 @@ static inline bool syncUtilCanPrint(char c) {
}
}
char
*
syncUtilPrintBin
(
char
*
ptr
,
uint32_t
len
)
{
char
*
syncUtilPrintBin
(
char
*
ptr
,
uint32_t
len
)
{
int64_t
memLen
=
(
int64_t
)(
len
+
1
);
char
*
s
=
taosMemoryMalloc
(
memLen
);
char
*
s
=
taosMemoryMalloc
(
memLen
);
ASSERT
(
s
!=
NULL
);
memset
(
s
,
0
,
len
+
1
);
memcpy
(
s
,
ptr
,
len
);
...
...
@@ -502,13 +502,13 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) {
return
s
;
}
char
*
syncUtilPrintBin2
(
char
*
ptr
,
uint32_t
len
)
{
char
*
syncUtilPrintBin2
(
char
*
ptr
,
uint32_t
len
)
{
uint32_t
len2
=
len
*
4
+
1
;
char
*
s
=
taosMemoryMalloc
(
len2
);
char
*
s
=
taosMemoryMalloc
(
len2
);
ASSERT
(
s
!=
NULL
);
memset
(
s
,
0
,
len2
);
char
*
p
=
s
;
char
*
p
=
s
;
for
(
int32_t
i
=
0
;
i
<
len
;
++
i
)
{
int32_t
n
=
sprintf
(
p
,
"%d,"
,
ptr
[
i
]);
p
+=
n
;
...
...
@@ -516,7 +516,7 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) {
return
s
;
}
void
syncUtilU642Addr
(
uint64_t
u64
,
char
*
host
,
int64_t
len
,
uint16_t
*
port
)
{
void
syncUtilU642Addr
(
uint64_t
u64
,
char
*
host
,
int64_t
len
,
uint16_t
*
port
)
{
uint32_t
hostU32
=
(
uint32_t
)((
u64
>>
32
)
&
0x00000000FFFFFFFF
);
struct
in_addr
addr
=
{.
s_addr
=
hostU32
};
...
...
@@ -524,7 +524,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) {
*
port
=
(
uint16_t
)((
u64
&
0x00000000FFFF0000
)
>>
16
);
}
uint64_t
syncUtilAddr2U64
(
const
char
*
host
,
uint16_t
port
)
{
uint64_t
syncUtilAddr2U64
(
const
char
*
host
,
uint16_t
port
)
{
uint32_t
hostU32
=
taosGetIpv4FromFqdn
(
host
);
if
(
hostU32
==
(
uint32_t
)
-
1
)
{
sError
(
"failed to resolve ipv4 addr, host:%s"
,
host
);
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
ca272200
...
...
@@ -154,6 +154,7 @@ typedef struct {
#pragma pack(push, 1)
#define TRANS_VER 2
typedef
struct
{
char
version
:
4
;
// RPC version
char
comp
:
2
;
// compression algorithm, 0:no compression 1:lz4
...
...
@@ -166,6 +167,7 @@ typedef struct {
uint64_t
timestamp
;
char
user
[
TSDB_UNI_LEN
];
int32_t
compatibilityVer
;
uint32_t
magicNum
;
STraceId
traceId
;
uint64_t
ahandle
;
// ahandle assigned by client
...
...
source/libs/transport/inc/transportInt.h
浏览文件 @
ca272200
...
...
@@ -46,10 +46,10 @@ typedef struct {
int8_t
connType
;
char
label
[
TSDB_LABEL_LEN
];
char
user
[
TSDB_UNI_LEN
];
// meter ID
int32_t
compressSize
;
// -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t
encryption
;
// encrypt or not
int32_t
compatibilityVer
;
int32_t
compressSize
;
// -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t
encryption
;
// encrypt or not
int32_t
retryMinInterval
;
// retry init interval
int32_t
retryStepFactor
;
// retry interval factor
int32_t
retryMaxInterval
;
// retry max interval
...
...
source/libs/transport/src/trans.c
浏览文件 @
ca272200
...
...
@@ -50,6 +50,7 @@ void* rpcOpen(const SRpcInit* pInit) {
}
pRpc
->
encryption
=
pInit
->
encryption
;
pRpc
->
compatibilityVer
=
pInit
->
compatibilityVer
;
pRpc
->
retryMinInterval
=
pInit
->
retryMinInterval
;
// retry init interval
pRpc
->
retryStepFactor
=
pInit
->
retryStepFactor
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
ca272200
...
...
@@ -391,6 +391,7 @@ void cliHandleResp(SCliConn* conn) {
transMsg
.
info
.
ahandle
=
NULL
;
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
transMsg
.
info
.
hasEpSet
=
pHead
->
hasEpSet
;
transMsg
.
info
.
cliVer
=
htonl
(
pHead
->
compatibilityVer
);
SCliMsg
*
pMsg
=
NULL
;
STransConnCtx
*
pCtx
=
NULL
;
...
...
@@ -488,6 +489,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
transMsg
.
code
=
code
==
-
1
?
(
pConn
->
broken
?
TSDB_CODE_RPC_BROKEN_LINK
:
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
:
code
;
transMsg
.
msgType
=
pMsg
?
pMsg
->
msg
.
msgType
+
1
:
0
;
transMsg
.
info
.
ahandle
=
NULL
;
transMsg
.
info
.
cliVer
=
pTransInst
->
compatibilityVer
;
if
(
pMsg
==
NULL
&&
!
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
transMsg
.
info
.
ahandle
=
transCtxDumpVal
(
&
pConn
->
ctx
,
transMsg
.
msgType
);
...
...
@@ -984,11 +986,10 @@ void cliSendBatch(SCliConn* pConn) {
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
SCliBatch
*
pBatch
=
pConn
->
pBatch
;
SCliBatchList
*
pList
=
pBatch
->
pList
;
pList
->
connCnt
+=
1
;
SCliBatch
*
pBatch
=
pConn
->
pBatch
;
int32_t
wLen
=
pBatch
->
wLen
;
int32_t
wLen
=
pBatch
->
wLen
;
pBatch
->
pList
->
connCnt
+=
1
;
uv_buf_t
*
wb
=
taosMemoryCalloc
(
wLen
,
sizeof
(
uv_buf_t
));
int
i
=
0
;
...
...
@@ -1018,6 +1019,8 @@ void cliSendBatch(SCliConn* pConn) {
memcpy
(
pHead
->
user
,
pTransInst
->
user
,
strlen
(
pTransInst
->
user
));
pHead
->
traceId
=
pMsg
->
info
.
traceId
;
pHead
->
magicNum
=
htonl
(
TRANS_MAGIC_NUM
);
pHead
->
version
=
TRANS_VER
;
pHead
->
compatibilityVer
=
htonl
(
pTransInst
->
compatibilityVer
);
}
pHead
->
timestamp
=
taosHton64
(
taosGetTimestampUs
());
...
...
@@ -1074,6 +1077,8 @@ void cliSend(SCliConn* pConn) {
memcpy
(
pHead
->
user
,
pTransInst
->
user
,
strlen
(
pTransInst
->
user
));
pHead
->
traceId
=
pMsg
->
info
.
traceId
;
pHead
->
magicNum
=
htonl
(
TRANS_MAGIC_NUM
);
pHead
->
version
=
TRANS_VER
;
pHead
->
compatibilityVer
=
htonl
(
pTransInst
->
compatibilityVer
);
}
pHead
->
timestamp
=
taosHton64
(
taosGetTimestampUs
());
...
...
@@ -1346,6 +1351,7 @@ static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
transMsg
.
info
.
ahandle
=
pMsg
->
ctx
->
ahandle
;
transMsg
.
info
.
traceId
=
pMsg
->
msg
.
info
.
traceId
;
transMsg
.
info
.
hasEpSet
=
false
;
transMsg
.
info
.
cliVer
=
pTransInst
->
compatibilityVer
;
if
(
pCtx
->
pSem
!=
NULL
)
{
if
(
pCtx
->
pRsp
==
NULL
)
{
}
else
{
...
...
@@ -1527,6 +1533,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
// persist conn already release by server
STransMsg
resp
;
cliBuildExceptResp
(
pMsg
,
&
resp
);
// refactorr later
resp
.
info
.
cliVer
=
pTransInst
->
compatibilityVer
;
if
(
pMsg
->
type
!=
Release
)
{
pTransInst
->
cfp
(
pTransInst
->
parent
,
&
resp
,
NULL
);
}
...
...
@@ -1836,6 +1845,7 @@ void cliIteraConnMsgs(SCliConn* conn) {
if
(
-
1
==
cliBuildExceptResp
(
cmsg
,
&
resp
))
{
continue
;
}
resp
.
info
.
cliVer
=
pTransInst
->
compatibilityVer
;
pTransInst
->
cfp
(
pTransInst
->
parent
,
&
resp
,
NULL
);
cmsg
->
ctx
->
ahandle
=
NULL
;
...
...
source/libs/transport/src/transComm.c
浏览文件 @
ca272200
...
...
@@ -192,7 +192,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
memcpy
((
char
*
)
&
head
,
connBuf
->
buf
,
sizeof
(
head
));
int32_t
msgLen
=
(
int32_t
)
htonl
(
head
.
msgLen
);
p
->
total
=
msgLen
;
p
->
invalid
=
TRANS_NOVALID_PACKET
(
htonl
(
head
.
magicNum
));
p
->
invalid
=
TRANS_NOVALID_PACKET
(
htonl
(
head
.
magicNum
))
||
head
.
version
!=
TRANS_VER
;
}
if
(
p
->
total
>=
p
->
len
)
{
p
->
left
=
p
->
total
-
p
->
len
;
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
ca272200
...
...
@@ -196,6 +196,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
tError
(
"%s conn %p recv invalid packet, failed to decompress"
,
transLabel
(
pTransInst
),
pConn
);
return
false
;
}
tDebug
(
"head version: %d 2"
,
pHead
->
version
);
pHead
->
code
=
htonl
(
pHead
->
code
);
pHead
->
msgLen
=
htonl
(
pHead
->
msgLen
);
...
...
@@ -236,8 +237,8 @@ static bool uvHandleReq(SSvrConn* pConn) {
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
transRefSrvHandle
(
pConn
);
if
(
cost
>=
EXCEPTION_LIMIT_US
)
{
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
(
int
)
cost
);
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
(
int
)
cost
);
}
else
{
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
(
int
)
cost
);
...
...
@@ -245,8 +246,8 @@ static bool uvHandleReq(SSvrConn* pConn) {
}
else
{
if
(
cost
>=
EXCEPTION_LIMIT_US
)
{
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
transMsg
.
code
,
(
int
)(
cost
));
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
transMsg
.
code
,
(
int
)(
cost
));
}
else
{
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
...
...
@@ -262,6 +263,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg
.
info
.
handle
=
(
void
*
)
transAcquireExHandle
(
transGetRefMgt
(),
pConn
->
refId
);
transMsg
.
info
.
refId
=
pConn
->
refId
;
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
transMsg
.
info
.
cliVer
=
htonl
(
pHead
->
compatibilityVer
);
tGTrace
(
"%s handle %p conn:%p translated to app, refId:%"
PRIu64
,
transLabel
(
pTransInst
),
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
...
...
@@ -410,6 +412,8 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead
->
traceId
=
pMsg
->
info
.
traceId
;
pHead
->
hasEpSet
=
pMsg
->
info
.
hasEpSet
;
pHead
->
magicNum
=
htonl
(
TRANS_MAGIC_NUM
);
pHead
->
compatibilityVer
=
htonl
(((
STrans
*
)
pConn
->
pTransInst
)
->
compatibilityVer
);
pHead
->
version
=
TRANS_VER
;
// handle invalid drop_task resp, TD-20098
if
(
pConn
->
inType
==
TDMT_SCH_DROP_TASK
&&
pMsg
->
code
==
TSDB_CODE_VND_INVALID_VGROUP_ID
)
{
...
...
source/libs/transport/test/cliBench.c
浏览文件 @
ca272200
...
...
@@ -19,6 +19,7 @@
#include "transLog.h"
#include "trpc.h"
#include "tutil.h"
#include "tversion.h"
typedef
struct
{
int
index
;
...
...
@@ -155,7 +156,7 @@ int main(int argc, char *argv[]) {
}
initLogEnv
();
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
void
*
pRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pRpc
==
NULL
)
{
tError
(
"failed to initialize RPC"
);
...
...
source/libs/transport/test/svrBench.c
浏览文件 @
ca272200
...
...
@@ -13,12 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
//
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "tqueue.h"
#include "transLog.h"
#include "trpc.h"
#include "tversion.h"
int
msgSize
=
128
;
int
commit
=
0
;
...
...
@@ -151,6 +152,8 @@ int main(int argc, char *argv[]) {
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processRequestMsg
;
rpcInit
.
idleTime
=
2
*
1500
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
rpcDebugFlag
=
131
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
...
...
@@ -187,7 +190,7 @@ int main(int argc, char *argv[]) {
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
initLogEnv
();
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
void
*
pRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pRpc
==
NULL
)
{
tError
(
"failed to start RPC server"
);
...
...
source/libs/transport/test/transUT.cpp
浏览文件 @
ca272200
...
...
@@ -18,10 +18,10 @@
#include "tdatablock.h"
#include "tglobal.h"
#include "tlog.h"
#include "tmisce.h"
#include "transLog.h"
#include "trpc.h"
#include "tmisce.h"
#include "tversion.h"
using
namespace
std
;
const
char
*
label
=
"APP"
;
...
...
@@ -54,6 +54,8 @@ class Client {
rpcInit_
.
user
=
(
char
*
)
user
;
rpcInit_
.
parent
=
this
;
rpcInit_
.
connType
=
TAOS_CONN_CLIENT
;
taosVersionStrToInt
(
version
,
&
(
rpcInit_
.
compatibilityVer
));
this
->
transCli
=
rpcOpen
(
&
rpcInit_
);
tsem_init
(
&
this
->
sem
,
0
,
0
);
}
...
...
@@ -66,6 +68,7 @@ class Client {
void
Restart
(
CB
cb
)
{
rpcClose
(
this
->
transCli
);
rpcInit_
.
cfp
=
cb
;
taosVersionStrToInt
(
version
,
&
(
rpcInit_
.
compatibilityVer
));
this
->
transCli
=
rpcOpen
(
&
rpcInit_
);
}
void
Stop
()
{
...
...
@@ -117,6 +120,7 @@ class Server {
rpcInit_
.
cfp
=
processReq
;
rpcInit_
.
user
=
(
char
*
)
user
;
rpcInit_
.
connType
=
TAOS_CONN_SERVER
;
taosVersionStrToInt
(
version
,
&
(
rpcInit_
.
compatibilityVer
));
}
void
Start
()
{
this
->
transSrv
=
rpcOpen
(
&
this
->
rpcInit_
);
...
...
tests/system-test/0-others/compatibility.py
浏览文件 @
ca272200
...
...
@@ -151,9 +151,10 @@ class TDTestCase:
os
.
system
(
"LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '"
)
os
.
system
(
"LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql"
)
cmd
=
f
" LD_LIBRARY_PATH=
{
bPath
}
/build/lib
{
bPath
}
/build/bin/taos -h localhost ;"
if
os
.
system
(
cmd
)
==
0
:
raise
Exception
(
"failed to execute system command. cmd: %s"
%
cmd
)
# cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
# tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}")
# if os.system(cmd) == 0:
# raise Exception("failed to execute system command. cmd: %s" % cmd)
os
.
system
(
"pkill taosd"
)
# make sure all the data are saved in disk.
self
.
checkProcessPid
(
"taosd"
)
...
...
tools/shell/src/shellNettest.c
浏览文件 @
ca272200
...
...
@@ -15,6 +15,7 @@
#define _GNU_SOURCE
#include "shellInt.h"
#include "tversion.h"
static
void
shellWorkAsClient
()
{
SShellArgs
*
pArgs
=
&
shell
.
args
;
...
...
@@ -33,6 +34,7 @@ static void shellWorkAsClient() {
rpcInit
.
user
=
"_dnd"
;
rpcInit
.
timeToGetConn
=
tsTimeToGetAvailableConn
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
clientRpc
==
NULL
)
{
printf
(
"failed to init net test client since %s
\r\n
"
,
terrstr
());
...
...
@@ -123,6 +125,8 @@ static void shellWorkAsServer() {
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
taosVersionStrToInt
(
version
,
&
(
rpcInit
.
compatibilityVer
));
void
*
serverRpc
=
rpcOpen
(
&
rpcInit
);
if
(
serverRpc
==
NULL
)
{
printf
(
"failed to init net test server since %s
\r\n
"
,
terrstr
());
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录