Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1899c460
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
1899c460
编写于
5月 10, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
5月 10, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12220 from taosdata/feature/dnode
refactor: remove rpc client in executor and scanoperator
上级
74b12592
87aca17f
变更
22
显示空白变更内容
内联
并排
Showing
22 changed file
with
132 addition
and
172 deletion
+132
-172
include/common/tmsgcb.h
include/common/tmsgcb.h
+1
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+7
-5
include/libs/qcom/query.h
include/libs/qcom/query.h
+3
-1
source/dnode/mgmt/implement/inc/dmImp.h
source/dnode/mgmt/implement/inc/dmImp.h
+4
-2
source/dnode/mgmt/implement/src/dmExec.c
source/dnode/mgmt/implement/src/dmExec.c
+26
-15
source/dnode/mgmt/implement/src/dmHandle.c
source/dnode/mgmt/implement/src/dmHandle.c
+3
-2
source/dnode/mgmt/implement/src/dmObj.c
source/dnode/mgmt/implement/src/dmObj.c
+0
-3
source/dnode/mgmt/implement/src/dmTransport.c
source/dnode/mgmt/implement/src/dmTransport.c
+58
-75
source/dnode/mgmt/interface/inc/dmInt.h
source/dnode/mgmt/interface/inc/dmInt.h
+1
-1
source/dnode/mgmt/interface/src/dmInt.c
source/dnode/mgmt/interface/src/dmInt.c
+6
-4
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+0
-1
source/dnode/mnode/impl/src/mndQuery.c
source/dnode/mnode/impl/src/mndQuery.c
+1
-1
source/dnode/qnode/src/qnode.c
source/dnode/qnode/src/qnode.c
+1
-1
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-0
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+2
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+3
-3
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-6
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-25
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+3
-24
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+2
-1
source/util/test/encodeTest.cpp
source/util/test/encodeTest.cpp
+1
-0
未找到文件。
include/common/tmsgcb.h
浏览文件 @
1899c460
...
...
@@ -57,6 +57,7 @@ typedef struct {
RegisterBrokenLinkArgFp
registerBrokenLinkArgFp
;
ReleaseHandleFp
releaseHandleFp
;
ReportStartup
reportStartupFp
;
void
*
clientRpc
;
}
SMsgCb
;
void
tmsgSetDefaultMsgCb
(
const
SMsgCb
*
pMsgCb
);
...
...
include/libs/executor/executor.h
浏览文件 @
1899c460
...
...
@@ -22,6 +22,7 @@ extern "C" {
#include "query.h"
#include "tcommon.h"
#include "tmsgcb.h"
typedef
void
*
qTaskInfo_t
;
typedef
void
*
DataSinkHandle
;
...
...
@@ -34,6 +35,7 @@ typedef struct SReadHandle {
void
*
config
;
void
*
vnode
;
void
*
mnd
;
SMsgCb
*
pMsgCb
;
}
SReadHandle
;
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
...
...
include/libs/qcom/query.h
浏览文件 @
1899c460
...
...
@@ -24,6 +24,7 @@ extern "C" {
#include "thash.h"
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
typedef
enum
{
JOB_TASK_STATUS_NULL
=
0
,
...
...
@@ -149,7 +150,8 @@ int32_t cleanupTaskQueue();
*/
int32_t
taosAsyncExec
(
__async_exec_fn_t
execFn
,
void
*
execParam
,
int32_t
*
code
);
int32_t
asyncSendMsgToServerExt
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
,
bool
persistHandle
,
void
*
ctx
);
int32_t
asyncSendMsgToServerExt
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
,
bool
persistHandle
,
void
*
ctx
);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
...
...
source/dnode/mgmt/implement/inc/dmImp.h
浏览文件 @
1899c460
...
...
@@ -26,8 +26,10 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper);
void
dmCloseNode
(
SMgmtWrapper
*
pWrapper
);
// dmTransport.c
int32_t
dmInitTrans
(
SDnode
*
pDnode
);
void
dmCleanupTrans
(
SDnode
*
pDnode
);
int32_t
dmInitServer
(
SDnode
*
pDnode
);
void
dmCleanupServer
(
SDnode
*
pDnode
);
int32_t
dmInitClient
(
SDnode
*
pDnode
);
void
dmCleanupClient
(
SDnode
*
pDnode
);
SProcCfg
dmGenProcCfg
(
SMgmtWrapper
*
pWrapper
);
SMsgCb
dmGetMsgcb
(
SMgmtWrapper
*
pWrapper
);
int32_t
dmInitMsgHandle
(
SDnode
*
pDnode
);
...
...
source/dnode/mgmt/implement/src/dmExec.c
浏览文件 @
1899c460
...
...
@@ -213,10 +213,12 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
}
pWrapper
->
procType
=
DND_PROC_CHILD
;
if
(
dmInitClient
(
pDnode
)
!=
0
)
{
return
-
1
;
}
SMsgCb
msgCb
=
pDnode
->
data
.
msgCb
;
msgCb
.
pWrapper
=
pWrapper
;
tmsgSetDefaultMsgCb
(
&
msgCb
);
pDnode
->
data
.
msgCb
=
dmGetMsgcb
(
pWrapper
);
tmsgSetDefaultMsgCb
(
&
pDnode
->
data
.
msgCb
);
if
(
dmOpenNode
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to open since %s"
,
pWrapper
->
name
,
terrstr
());
...
...
@@ -234,6 +236,15 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
pWrapper
->
procType
=
DND_PROC_SINGLE
;
}
if
(
n
==
DNODE
)
{
if
(
dmInitClient
(
pDnode
)
!=
0
)
{
return
-
1
;
}
pDnode
->
data
.
msgCb
=
dmGetMsgcb
(
pWrapper
);
tmsgSetDefaultMsgCb
(
&
pDnode
->
data
.
msgCb
);
}
if
(
dmOpenNode
(
pWrapper
)
!=
0
)
{
dError
(
"node:%s, failed to open since %s"
,
pWrapper
->
name
,
terrstr
());
return
-
1
;
...
...
@@ -281,13 +292,14 @@ static void dmProcessProcHandle(void *handle) {
}
static
void
dmWatchNodes
(
SDnode
*
pDnode
)
{
if
(
pDnode
->
ptype
!=
DND_PROC_PARENT
)
return
;
if
(
pDnode
->
ntype
==
NODE_END
)
return
;
taosThreadMutexLock
(
&
pDnode
->
mutex
);
if
(
pDnode
->
ptype
==
DND_PROC_PARENT
)
{
for
(
EDndNodeType
n
=
DNODE
+
1
;
n
<
NODE_END
;
++
n
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
n
];
if
(
!
pWrapper
->
required
)
continue
;
if
(
pWrapper
->
procType
!=
DND_PROC_PARENT
)
continue
;
if
(
pDnode
->
ntype
==
NODE_END
)
continue
;
if
(
pWrapper
->
procId
<=
0
||
!
taosProcExist
(
pWrapper
->
procId
))
{
dWarn
(
"node:%s, process:%d is killed and needs to be restarted"
,
pWrapper
->
name
,
pWrapper
->
procId
);
...
...
@@ -297,7 +309,6 @@ static void dmWatchNodes(SDnode *pDnode) {
dmNewNodeProc
(
pWrapper
,
n
);
}
}
}
taosThreadMutexUnlock
(
&
pDnode
->
mutex
);
}
...
...
source/dnode/mgmt/implement/src/dmHandle.c
浏览文件 @
1899c460
...
...
@@ -242,7 +242,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
return
-
1
;
}
if
(
dmInit
Trans
(
pDnode
)
!=
0
)
{
if
(
dmInit
Server
(
pDnode
)
!=
0
)
{
dError
(
"failed to init transport since %s"
,
terrstr
());
return
-
1
;
}
...
...
@@ -275,7 +275,8 @@ static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
}
taosWUnLockLatch
(
&
pDnode
->
data
.
latch
);
dmCleanupTrans
(
pDnode
);
dmCleanupClient
(
pDnode
);
dmCleanupServer
(
pDnode
);
dInfo
(
"dnode-mgmt is cleaned up"
);
}
...
...
source/dnode/mgmt/implement/src/dmObj.c
浏览文件 @
1899c460
...
...
@@ -124,9 +124,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto
_OVER
;
}
pDnode
->
data
.
msgCb
=
dmGetMsgcb
(
&
pDnode
->
wrappers
[
DNODE
]);
tmsgSetDefaultMsgCb
(
&
pDnode
->
data
.
msgCb
);
dInfo
(
"dnode is created, data:%p"
,
pDnode
);
code
=
0
;
...
...
source/dnode/mgmt/implement/src/dmTransport.c
浏览文件 @
1899c460
...
...
@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE
#include "dmImp.h"
#include "qworker.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
...
...
@@ -85,17 +87,14 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
if
((
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)))
==
NULL
)
goto
_OVER
;
if
(
dmBuildMsg
(
pMsg
,
pRpc
)
!=
0
)
goto
_OVER
;
if
(
pWrapper
->
procType
==
DND_PROC_SINGLE
)
{
if
(
pWrapper
->
procType
!=
DND_PROC_PARENT
)
{
dTrace
(
"msg:%p, created, type:%s handle:%p user:%s"
,
pMsg
,
TMSG_INFO
(
msgType
),
pRpc
->
handle
,
pMsg
->
user
);
code
=
(
*
msgFp
)(
pWrapper
,
pMsg
);
}
else
if
(
pWrapper
->
procType
==
DND_PROC_PARENT
)
{
}
else
{
dTrace
(
"msg:%p, created and put into child queue, type:%s handle:%p code:0x%04x user:%s contLen:%d"
,
pMsg
,
TMSG_INFO
(
msgType
),
pRpc
->
handle
,
pMsg
->
rpcMsg
.
code
&
0XFFFF
,
pMsg
->
user
,
pRpc
->
contLen
);
code
=
taosProcPutToChildQ
(
pWrapper
->
procObj
,
pMsg
,
sizeof
(
SNodeMsg
),
pRpc
->
pCont
,
pRpc
->
contLen
,
(
isReq
&&
(
pMsg
->
rpcMsg
.
code
==
0
))
?
pRpc
->
handle
:
NULL
,
pRpc
->
refId
,
PROC_FUNC_REQ
);
}
else
{
dTrace
(
"msg:%p, should not processed in child process, handle:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pMsg
->
user
);
ASSERT
(
1
);
}
_OVER:
...
...
@@ -108,7 +107,7 @@ _OVER:
}
else
{
dError
(
"msg:%p, type:%s handle:%p failed to process since 0x%04x:%s"
,
pMsg
,
TMSG_INFO
(
msgType
),
pRpc
->
handle
,
code
&
0XFFFF
,
terrstr
());
if
(
msgType
&
1U
)
{
if
(
isReq
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
if
(
code
==
TSDB_CODE_NODE_NOT_DEPLOYED
||
code
==
TSDB_CODE_NODE_OFFLINE
)
{
if
(
msgType
>
TDMT_MND_MSG
&&
msgType
<
TDMT_VND_MSG
)
{
...
...
@@ -130,21 +129,26 @@ _OVER:
}
static
void
dmProcessMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
tmsg_t
msgType
=
pMsg
->
msgType
;
bool
isReq
=
msgType
&
1u
;
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMgmtWrapper
*
pWrapper
=
pHandle
->
pNdWrapper
;
if
(
msgType
==
TDMT_DND_SERVER_STATUS
)
{
switch
(
msgType
)
{
case
TDMT_DND_SERVER_STATUS
:
dTrace
(
"server status req will be processed, handle:%p, app:%p"
,
pMsg
->
handle
,
pMsg
->
ahandle
);
dmProcessServerStatusReq
(
pDnode
,
pMsg
);
return
;
}
if
(
msgType
==
TDMT_DND_NET_TEST
)
{
case
TDMT_DND_NET_TEST
:
dTrace
(
"net test req will be processed, handle:%p, app:%p"
,
pMsg
->
handle
,
pMsg
->
ahandle
);
dmProcessServerStatusReq
(
pDnode
,
pMsg
);
dmProcessNetTestReq
(
pDnode
,
pMsg
);
return
;
case
TDMT_MND_SYSTABLE_RETRIEVE_RSP
:
case
TDMT_VND_FETCH_RSP
:
dTrace
(
"retrieve rsp is received"
);
qWorkerProcessFetchRsp
(
NULL
,
NULL
,
pMsg
);
pMsg
->
pCont
=
NULL
;
// already freed in qworker
return
;
}
...
...
@@ -233,16 +237,6 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return
0
;
}
static
inline
int32_t
dmSendRpcReq
(
SDnode
*
pDnode
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
)
{
if
(
pDnode
->
trans
.
clientRpc
==
NULL
)
{
terrno
=
TSDB_CODE_NODE_OFFLINE
;
return
-
1
;
}
rpcSendRequest
(
pDnode
->
trans
.
clientRpc
,
pEpSet
,
pReq
,
NULL
);
return
0
;
}
static
void
dmSendRpcRedirectRsp
(
SDnode
*
pDnode
,
const
SRpcMsg
*
pReq
)
{
SEpSet
epSet
=
{
0
};
dmGetMnodeEpSet
(
pDnode
,
&
epSet
);
...
...
@@ -288,28 +282,20 @@ void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
}
static
inline
int32_t
dmSendReq
(
SMgmtWrapper
*
pWrapper
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
)
{
if
(
pWrapper
->
pDnode
->
status
!=
DND_STAT_RUNNING
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
if
(
pDnode
->
status
!=
DND_STAT_RUNNING
)
{
terrno
=
TSDB_CODE_NODE_OFFLINE
;
dError
(
"failed to send rpc msg since %s, handle:%p"
,
terrstr
(),
pReq
->
handle
);
return
-
1
;
}
if
(
pWrapper
->
procType
!=
DND_PROC_CHILD
)
{
return
dmSendRpcReq
(
pWrapper
->
pDnode
,
pEpSet
,
pReq
);
}
else
{
char
*
pHead
=
taosMemoryMalloc
(
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
));
if
(
pHead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
if
(
pDnode
->
trans
.
clientRpc
==
NULL
)
{
terrno
=
TSDB_CODE_NODE_OFFLINE
;
return
-
1
;
}
memcpy
(
pHead
,
pReq
,
sizeof
(
SRpcMsg
));
memcpy
(
pHead
+
sizeof
(
SRpcMsg
),
pEpSet
,
sizeof
(
SEpSet
));
taosProcPutToParentQ
(
pWrapper
->
procObj
,
pHead
,
sizeof
(
SRpcMsg
)
+
sizeof
(
SEpSet
),
pReq
->
pCont
,
pReq
->
contLen
,
PROC_FUNC_REQ
);
taosMemoryFree
(
pHead
);
rpcSendRequest
(
pDnode
->
trans
.
clientRpc
,
pEpSet
,
pReq
,
NULL
);
return
0
;
}
}
static
inline
void
dmSendRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
)
{
...
...
@@ -396,9 +382,10 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
pMsg
->
pCont
=
pCont
;
if
(
ftype
==
PROC_FUNC_REQ
)
{
ASSERT
(
1
);
dTrace
(
"msg:%p, get from parent queue, send req:%s handle:%p code:0x%04x, app:%p"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
handle
,
code
,
pMsg
->
ahandle
);
dmSendR
pcReq
(
pWrapper
->
pDnode
,
(
SEpSet
*
)((
char
*
)
pMsg
+
sizeof
(
SRpcMsg
)),
pMsg
);
dmSendR
eq
(
pWrapper
,
(
SEpSet
*
)((
char
*
)
pMsg
+
sizeof
(
SRpcMsg
)),
pMsg
);
}
else
if
(
ftype
==
PROC_FUNC_RSP
)
{
dTrace
(
"msg:%p, get from parent queue, rsp handle:%p code:0x%04x, app:%p"
,
pMsg
,
pMsg
->
handle
,
code
,
pMsg
->
ahandle
);
pMsg
->
refId
=
taosProcRemoveHandle
(
pWrapper
->
procObj
,
pMsg
->
handle
);
...
...
@@ -421,7 +408,8 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
}
SProcCfg
dmGenProcCfg
(
SMgmtWrapper
*
pWrapper
)
{
SProcCfg
cfg
=
{.
childConsumeFp
=
(
ProcConsumeFp
)
dmConsumeChildQueue
,
SProcCfg
cfg
=
{
.
childConsumeFp
=
(
ProcConsumeFp
)
dmConsumeChildQueue
,
.
childMallocHeadFp
=
(
ProcMallocFp
)
taosAllocateQitem
,
.
childFreeHeadFp
=
(
ProcFreeFp
)
taosFreeQitem
,
.
childMallocBodyFp
=
(
ProcMallocFp
)
rpcMallocCont
,
...
...
@@ -433,11 +421,12 @@ SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper) {
.
parentFreeBodyFp
=
(
ProcFreeFp
)
rpcFreeCont
,
.
shm
=
pWrapper
->
procShm
,
.
parent
=
pWrapper
,
.
name
=
pWrapper
->
name
};
.
name
=
pWrapper
->
name
,
};
return
cfg
;
}
bool
rpcRfp
(
int32_t
code
)
{
static
bool
rpcRfp
(
int32_t
code
)
{
if
(
code
==
TSDB_CODE_RPC_REDIRECT
)
{
return
true
;
}
else
{
...
...
@@ -445,7 +434,7 @@ bool rpcRfp(int32_t code) {
}
}
static
int32_t
dmInitClient
(
SDnode
*
pDnode
)
{
int32_t
dmInitClient
(
SDnode
*
pDnode
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SRpcInit
rpcInit
=
{
0
};
...
...
@@ -471,11 +460,15 @@ static int32_t dmInitClient(SDnode *pDnode) {
return
-
1
;
}
pDnode
->
data
.
msgCb
=
dmGetMsgcb
(
&
pDnode
->
wrappers
[
DNODE
]);
tmsgSetDefaultMsgCb
(
&
pDnode
->
data
.
msgCb
);
dDebug
(
"dnode rpc client is initialized"
);
return
0
;
}
static
void
dmCleanupClient
(
SDnode
*
pDnode
)
{
void
dmCleanupClient
(
SDnode
*
pDnode
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
if
(
pTrans
->
clientRpc
)
{
rpcClose
(
pTrans
->
clientRpc
);
...
...
@@ -517,7 +510,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq
authReq
=
{
0
};
tstrncpy
(
authReq
.
user
,
user
,
TSDB_USER_LEN
);
int32_t
contLen
=
tSerializeSAuthReq
(
NULL
,
0
,
&
authReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSAuthReq
(
pReq
,
contLen
,
&
authReq
);
SRpcMsg
rpcMsg
=
{.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_AUTH
,
.
ahandle
=
(
void
*
)
9528
};
...
...
@@ -543,7 +536,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
return
rpcRsp
.
code
;
}
static
int32_t
dmInitServer
(
SDnode
*
pDnode
)
{
int32_t
dmInitServer
(
SDnode
*
pDnode
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SRpcInit
rpcInit
=
{
0
};
...
...
@@ -569,7 +562,7 @@ static int32_t dmInitServer(SDnode *pDnode) {
return
0
;
}
static
void
dmCleanupServer
(
SDnode
*
pDnode
)
{
void
dmCleanupServer
(
SDnode
*
pDnode
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
if
(
pTrans
->
serverRpc
)
{
rpcClose
(
pTrans
->
serverRpc
);
...
...
@@ -578,17 +571,6 @@ static void dmCleanupServer(SDnode *pDnode) {
}
}
int32_t
dmInitTrans
(
SDnode
*
pDnode
)
{
if
(
dmInitServer
(
pDnode
)
!=
0
)
return
-
1
;
if
(
dmInitClient
(
pDnode
)
!=
0
)
return
-
1
;
return
0
;
}
void
dmCleanupTrans
(
SDnode
*
pDnode
)
{
dmCleanupServer
(
pDnode
);
dmCleanupClient
(
pDnode
);
}
SMsgCb
dmGetMsgcb
(
SMgmtWrapper
*
pWrapper
)
{
SMsgCb
msgCb
=
{
.
sendReqFp
=
dmSendReq
,
...
...
@@ -598,6 +580,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
.
releaseHandleFp
=
dmReleaseHandle
,
.
reportStartupFp
=
dmReportStartupByWrapper
,
.
pWrapper
=
pWrapper
,
.
clientRpc
=
pWrapper
->
pDnode
->
trans
.
clientRpc
,
};
return
msgCb
;
}
source/dnode/mgmt/interface/inc/dmInt.h
浏览文件 @
1899c460
...
...
@@ -37,7 +37,7 @@ void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgF
void
dmReportStartup
(
SDnode
*
pDnode
,
const
char
*
pName
,
const
char
*
pDesc
);
void
dmReportStartupByWrapper
(
SMgmtWrapper
*
pWrapper
,
const
char
*
pName
,
const
char
*
pDesc
);
void
dmProcessServerStatusReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmProcessNet
t
estReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmProcessNet
T
estReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmGetMonitorSysInfo
(
SMonSysInfo
*
pInfo
);
// dmFile.c
...
...
source/dnode/mgmt/interface/src/dmInt.c
浏览文件 @
1899c460
...
...
@@ -171,16 +171,17 @@ static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
}
}
void
dmProcessNet
testReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpc
)
{
void
dmProcessNet
TestReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
dDebug
(
"net test req is received"
);
SRpcMsg
rsp
=
{.
handle
=
pR
pc
->
handle
,
.
refId
=
pRpc
->
refId
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
0
};
rsp
.
pCont
=
rpcMallocCont
(
pR
pc
->
contLen
);
SRpcMsg
rsp
=
{.
handle
=
pR
eq
->
handle
,
.
refId
=
pReq
->
refId
,
.
ahandle
=
pReq
->
ahandle
,
.
code
=
0
};
rsp
.
pCont
=
rpcMallocCont
(
pR
eq
->
contLen
);
if
(
rsp
.
pCont
==
NULL
)
{
rsp
.
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
rsp
.
contLen
=
pR
pc
->
contLen
;
rsp
.
contLen
=
pR
eq
->
contLen
;
}
rpcSendResponse
(
&
rsp
);
rpcFreeCont
(
pReq
->
pCont
);
}
void
dmProcessServerStatusReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
...
...
@@ -208,6 +209,7 @@ void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
_OVER:
rpcSendResponse
(
&
rspMsg
);
rpcFreeCont
(
pReq
->
pCont
);
}
void
dmGetMonitorSysInfo
(
SMonSysInfo
*
pInfo
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
1899c460
...
...
@@ -289,7 +289,6 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY
,
vmProcessQueryMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY_CONTINUE
,
vmProcessQueryMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_FETCH
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_FETCH_RSP
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_ALTER_TABLE
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_UPDATE_TAG_VAL
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_TABLE_META
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mnode/impl/src/mndQuery.c
浏览文件 @
1899c460
...
...
@@ -20,7 +20,7 @@
int32_t
mndProcessQueryMsg
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SReadHandle
handle
=
{.
mnd
=
pMnode
};
SReadHandle
handle
=
{.
mnd
=
pMnode
,
.
pMsgCb
=
&
pMnode
->
msgCb
};
mTrace
(
"msg:%p, in query queue is processing"
,
pReq
);
switch
(
pReq
->
rpcMsg
.
msgType
)
{
...
...
source/dnode/qnode/src/qnode.c
浏览文件 @
1899c460
...
...
@@ -51,7 +51,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
int32_t
qndProcessQueryMsg
(
SQnode
*
pQnode
,
SRpcMsg
*
pMsg
)
{
qTrace
(
"message in qnode query queue is processing"
);
SReadHandle
handle
=
{
0
};
SReadHandle
handle
=
{
.
pMsgCb
=
&
pQnode
->
msgCb
};
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:
{
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
1899c460
...
...
@@ -34,6 +34,7 @@
#include "tlockfree.h"
#include "tlosertree.h"
#include "tmallocator.h"
#include "tmsgcb.h"
#include "tskiplist.h"
#include "tstream.h"
#include "ttime.h"
...
...
@@ -121,7 +122,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
// sma
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
);
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
,
SMsgCb
*
pMsgCb
);
int32_t
tsdbFetchTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
**
ppStore
,
tb_uid_t
suid
,
tb_uid_t
uid
);
int32_t
tsdbUpdateTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
*
pUidStore
);
void
tsdbUidStoreDestory
(
STbUidStore
*
pStore
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
1899c460
...
...
@@ -378,6 +378,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
SReadHandle
handle
=
{
.
reader
=
pReadHandle
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
pTopic
->
buffer
.
output
[
j
].
pReadHandle
=
pReadHandle
;
pTopic
->
buffer
.
output
[
j
].
task
=
qCreateStreamExecTaskInfo
(
pTopic
->
qmsg
,
&
handle
);
...
...
@@ -857,6 +858,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SReadHandle
handle
=
{
.
reader
=
pExec
->
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
pExec
->
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pExec
->
qmsg
,
&
handle
);
ASSERT
(
pExec
->
task
[
i
]);
...
...
@@ -914,6 +916,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
SReadHandle
handle
=
{
.
reader
=
pStreamReader
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
pTask
->
exec
.
runners
[
i
].
inputHandle
=
pStreamReader
;
pTask
->
exec
.
runners
[
i
].
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
1899c460
...
...
@@ -1698,7 +1698,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
* @param pReq
* @return int32_t
*/
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
)
{
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
,
SMsgCb
*
pMsgCb
)
{
if
(
!
pReq
->
rollup
)
{
tsdbDebug
(
"vgId:%d return directly since no rollup for stable %s %"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1742,6 +1742,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) {
SReadHandle
handle
=
{
.
reader
=
pReadHandle
,
.
meta
=
pMeta
,
.
pMsgCb
=
pMsgCb
,
};
if
(
param
->
qmsg1
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
1899c460
...
...
@@ -142,9 +142,9 @@ _err:
int
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in vnode query queue is processing"
);
#if 0
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode};
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode
, .pMsgCb = &pVnode->msgCb
};
#endif
SReadHandle
handle
=
{.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
,
.
vnode
=
pVnode
};
SReadHandle
handle
=
{.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
,
.
vnode
=
pVnode
,
.
pMsgCb
=
&
pVnode
->
msgCb
};
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:
return
qWorkerProcessQueryMsg
(
&
handle
,
pVnode
->
pQuery
,
pMsg
);
...
...
@@ -305,7 +305,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
goto
_err
;
}
tsdbRegisterRSma
(
pVnode
->
pTsdb
,
pVnode
->
pMeta
,
&
req
);
tsdbRegisterRSma
(
pVnode
->
pTsdb
,
pVnode
->
pMeta
,
&
req
,
&
pVnode
->
msgCb
);
tDecoderClear
(
&
coder
);
return
0
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
1899c460
...
...
@@ -392,10 +392,7 @@ typedef struct SStreamBlockScanInfo {
}
SStreamBlockScanInfo
;
typedef
struct
SSysTableScanInfo
{
union
{
void
*
pTransporter
;
SReadHandle
readHandle
;
};
SRetrieveMetaTableRsp
*
pRsp
;
SRetrieveTableReq
req
;
...
...
@@ -663,8 +660,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
groupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
);
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pDataReader
,
SQueryTableDataCond
*
pCond
,
int32_t
numOfOutput
,
int32_t
dataLoadFlag
,
const
uint8_t
*
scanInfo
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SInterval
*
pInterval
,
double
sampleRatio
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
1899c460
...
...
@@ -3264,7 +3264,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SExchangeInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExchangeInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -3307,29 +3308,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator
->
fpSet
=
createOperatorFpSet
(
prepareLoadRemoteData
,
doLoadRemoteData
,
NULL
,
NULL
,
destroyExchangeOperatorInfo
,
NULL
,
NULL
,
NULL
);
#if 1
{
// todo refactor
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"EX"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
qProcessFetchRsp
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
"root"
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
(
char
*
)
"dcc5bed04851fec854c035b2e40263b6"
;
pInfo
->
pTransporter
=
rpcOpen
(
&
rpcInit
);
if
(
pInfo
->
pTransporter
==
NULL
)
{
return
NULL
;
// todo
}
}
#endif
pInfo
->
pTransporter
=
pTransporter
;
return
pOperator
;
...
...
@@ -4801,7 +4780,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
type
)
{
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
pPhyNode
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pExchange
->
node
.
pOutputDataBlockDesc
);
return
createExchangeOperatorInfo
(
pExchange
->
pSrcEndPoints
,
pResBlock
,
pTaskInfo
);
return
createExchangeOperatorInfo
(
p
Handle
->
pMsgCb
->
clientRpc
,
p
Exchange
->
pSrcEndPoints
,
pResBlock
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
==
type
)
{
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
// simple child table.
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
1899c460
...
...
@@ -1057,7 +1057,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pMsgSendInfo
->
fp
=
loadSysTableCallback
;
int64_t
transporterId
=
0
;
int32_t
code
=
asyncSendMsgToServer
(
pInfo
->
pTransporter
,
&
pInfo
->
epSet
,
&
transporterId
,
pMsgSendInfo
);
int32_t
code
=
asyncSendMsgToServer
(
pInfo
->
readHandle
.
pMsgCb
->
clientRpc
,
&
pInfo
->
epSet
,
&
transporterId
,
pMsgSendInfo
);
tsem_wait
(
&
pInfo
->
ready
);
if
(
pTaskInfo
->
code
)
{
...
...
@@ -1182,29 +1183,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
}
else
{
tsem_init
(
&
pInfo
->
ready
,
0
,
0
);
pInfo
->
epSet
=
epset
;
#if 1
{
// todo refactor
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"DB-META"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
qProcessFetchRsp
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
"root"
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
(
char
*
)
"dcc5bed04851fec854c035b2e40263b6"
;
pInfo
->
pTransporter
=
rpcOpen
(
&
rpcInit
);
if
(
pInfo
->
pTransporter
==
NULL
)
{
return
NULL
;
// todo
}
}
#endif
pInfo
->
readHandle
=
*
(
SReadHandle
*
)
readHandle
;
}
pOperator
->
name
=
"SysTableScanOperator"
;
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
1899c460
...
...
@@ -136,7 +136,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
return
0
;
}
int32_t
asyncSendMsgToServerExt
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
,
bool
persistHandle
,
void
*
rpcCtx
)
{
int32_t
asyncSendMsgToServerExt
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
,
bool
persistHandle
,
void
*
rpcCtx
)
{
char
*
pMsg
=
rpcMallocCont
(
pInfo
->
msgInfo
.
len
);
if
(
NULL
==
pMsg
)
{
qError
(
"0x%"
PRIx64
" msg:%s malloc failed"
,
pInfo
->
requestId
,
TMSG_INFO
(
pInfo
->
msgType
));
...
...
source/util/test/encodeTest.cpp
浏览文件 @
1899c460
...
...
@@ -440,6 +440,7 @@ TEST(td_encode_test, compound_struct_encode_test) {
tCoderClear(&decoder);
}
#endif
#pragma GCC diagnostic pop
#endif
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录