Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4d5bd478
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4d5bd478
编写于
5月 21, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
5月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12806 from taosdata/fix/valgrind
fix: santizer error in sim
上级
62e3cb16
94230001
变更
32
隐藏空白更改
内联
并排
Showing
32 changed file
with
157 addition
and
227 deletion
+157
-227
include/dnode/qnode/qnode.h
include/dnode/qnode/qnode.h
+0
-1
include/util/taoserror.h
include/util/taoserror.h
+2
-3
source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
+3
-5
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
+3
-7
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+2
-24
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
+17
-50
source/dnode/mgmt/mgmt_snode/src/smWorker.c
source/dnode/mgmt/mgmt_snode/src/smWorker.c
+3
-5
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+11
-12
source/dnode/mgmt/node_mgmt/inc/dmMgmt.h
source/dnode/mgmt/node_mgmt/inc/dmMgmt.h
+0
-1
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
+0
-5
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+2
-1
source/dnode/mnode/impl/src/mndBnode.c
source/dnode/mnode/impl/src/mndBnode.c
+4
-4
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+1
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+6
-6
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+4
-4
source/dnode/mnode/impl/src/mndFunc.c
source/dnode/mnode/impl/src/mndFunc.c
+4
-4
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+4
-4
source/dnode/mnode/impl/src/mndOffset.c
source/dnode/mnode/impl/src/mndOffset.c
+1
-1
source/dnode/mnode/impl/src/mndQnode.c
source/dnode/mnode/impl/src/mndQnode.c
+4
-4
source/dnode/mnode/impl/src/mndQuery.c
source/dnode/mnode/impl/src/mndQuery.c
+23
-25
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+4
-4
source/dnode/mnode/impl/src/mndSnode.c
source/dnode/mnode/impl/src/mndSnode.c
+4
-4
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+6
-6
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+2
-2
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+3
-3
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+5
-5
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+6
-6
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+1
-1
source/dnode/qnode/src/qnode.c
source/dnode/qnode/src/qnode.c
+30
-25
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+0
-3
source/util/src/terror.c
source/util/src/terror.c
+0
-1
tests/tsim/src/simSystem.c
tests/tsim/src/simSystem.c
+2
-0
未找到文件。
include/dnode/qnode/qnode.h
浏览文件 @
4d5bd478
...
...
@@ -72,7 +72,6 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad);
* @param pMsg The request message
*/
int32_t
qndProcessQueryMsg
(
SQnode
*
pQnode
,
SRpcMsg
*
pMsg
);
int32_t
qndProcessFetchMsg
(
SQnode
*
pQnode
,
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
include/util/taoserror.h
浏览文件 @
4d5bd478
...
...
@@ -129,9 +129,8 @@ int32_t* taosGetErrno();
// mnode-common
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0304)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0303)
// mnode-show
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310)
...
...
source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
浏览文件 @
4d5bd478
...
...
@@ -47,10 +47,8 @@ static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) {
static
void
bmProcessMonitorQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SBnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
dTrace
(
"msg:%p, get from bnode-monitor queue"
,
pMsg
);
SRpcMsg
*
pRpc
=
pMsg
;
int32_t
code
=
-
1
;
if
(
pMsg
->
msgType
==
TDMT_MON_BM_INFO
)
{
code
=
bmProcessGetMonBmInfoReq
(
pMgmt
,
pMsg
);
...
...
@@ -58,13 +56,13 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
}
if
(
pRpc
->
msgType
&
1U
)
{
if
(
IsReq
(
pMsg
)
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
bmSendRsp
(
pMsg
,
code
);
}
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
p
Rpc
->
pCont
);
rpcFreeCont
(
p
Msg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
...
...
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
浏览文件 @
4d5bd478
...
...
@@ -19,7 +19,6 @@
static
void
*
dmStatusThreadFp
(
void
*
param
)
{
SDnodeMgmt
*
pMgmt
=
param
;
int64_t
lastTime
=
taosGetTimestampMs
();
setThreadName
(
"dnode-status"
);
while
(
1
)
{
...
...
@@ -40,7 +39,6 @@ static void *dmStatusThreadFp(void *param) {
static
void
*
dmMonitorThreadFp
(
void
*
param
)
{
SDnodeMgmt
*
pMgmt
=
param
;
int64_t
lastTime
=
taosGetTimestampMs
();
setThreadName
(
"dnode-monitor"
);
while
(
1
)
{
...
...
@@ -103,11 +101,9 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
static
void
dmProcessMgmtQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SDnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
tmsg_t
msgType
=
pMsg
->
msgType
;
bool
isRequest
=
msgType
&
1u
;
dTrace
(
"msg:%p, will be processed in dnode-mgmt queue, type:%s"
,
pMsg
,
TMSG_INFO
(
msgType
));
dTrace
(
"msg:%p, will be processed in dnode queue, type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
switch
(
msgType
)
{
switch
(
pMsg
->
msgType
)
{
case
TDMT_DND_CONFIG_DNODE
:
code
=
dmProcessConfigReq
(
pMgmt
,
pMsg
);
break
;
...
...
@@ -149,7 +145,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
break
;
}
if
(
isRequest
)
{
if
(
IsReq
(
pMsg
)
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
SRpcMsg
rsp
=
{
.
code
=
code
,
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
4d5bd478
...
...
@@ -46,7 +46,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
code
=
mndProcessMsg
(
pMsg
);
}
if
(
IsReq
(
pMsg
)
&&
pMsg
->
info
.
handle
!=
NULL
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
IsReq
(
pMsg
)
&&
pMsg
->
info
.
handle
!=
NULL
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
mmSendRsp
(
pMsg
,
code
);
}
...
...
@@ -56,28 +56,6 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem
(
pMsg
);
}
static
void
mmProcessQueryQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
tmsg_t
msgType
=
pMsg
->
msgType
;
bool
isRequest
=
msgType
&
1U
;
dTrace
(
"msg:%p, get from mnode-query queue"
,
pMsg
);
pMsg
->
info
.
node
=
pMgmt
->
pMnode
;
code
=
mndProcessMsg
(
pMsg
);
if
(
isRequest
)
{
if
(
pMsg
->
info
.
handle
!=
NULL
&&
code
!=
0
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
mmSendRsp
(
pMsg
,
code
);
}
}
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
int32_t
mmPutNodeMsgToWorker
(
SSingleWorker
*
pWorker
,
SRpcMsg
*
pMsg
)
{
dTrace
(
"msg:%p, put into worker %s, type:%s"
,
pMsg
,
pWorker
->
name
,
TMSG_INFO
(
pMsg
->
msgType
));
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
...
...
@@ -135,7 +113,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
tsNumOfMnodeQueryThreads
,
.
max
=
tsNumOfMnodeQueryThreads
,
.
name
=
"mnode-query"
,
.
fp
=
(
FItem
)
mmProcessQue
ryQue
ue
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
queryWorker
,
&
qCfg
)
!=
0
)
{
...
...
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
浏览文件 @
4d5bd478
...
...
@@ -19,70 +19,39 @@
static
inline
void
qmSendRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{
.
code
=
code
,
.
info
=
pMsg
->
info
,
.
pCont
=
pMsg
->
info
.
rsp
,
.
contLen
=
pMsg
->
info
.
rspLen
,
.
info
=
pMsg
->
info
,
};
tmsgSendRsp
(
&
rsp
);
}
static
void
qmProcess
Monitor
Queue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
static
void
qmProcessQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SQnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
dTrace
(
"msg:%p, get from qnode queue"
,
pMsg
);
dTrace
(
"msg:%p, get from qnode-monitor queue"
,
pMsg
);
SRpcMsg
*
pRpc
=
pMsg
;
int32_t
code
=
-
1
;
if
(
pMsg
->
msgType
==
TDMT_MON_QM_INFO
)
{
code
=
qmProcessGetMonitorInfoReq
(
pMgmt
,
pMsg
);
}
else
{
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
switch
(
pMsg
->
msgType
)
{
case
TDMT_MON_QM_INFO
:
code
=
qmProcessGetMonitorInfoReq
(
pMgmt
,
pMsg
);
break
;
default:
code
=
qndProcessQueryMsg
(
pMgmt
->
pQnode
,
pMsg
);
break
;
}
if
(
pRpc
->
msgType
&
1U
)
{
if
(
IsReq
(
pMsg
)
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
qmSendRsp
(
pMsg
,
code
);
}
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pRpc
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
void
qmProcessQueryQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SQnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
dTrace
(
"msg:%p, get from qnode-query queue"
,
pMsg
);
SRpcMsg
*
pRpc
=
pMsg
;
int32_t
code
=
qndProcessQueryMsg
(
pMgmt
->
pQnode
,
pRpc
);
if
(
pRpc
->
msgType
&
1U
&&
code
!=
0
)
{
qmSendRsp
(
pMsg
,
code
);
}
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
void
qmProcessFetchQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SQnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
dTrace
(
"msg:%p, get from qnode-fetch queue"
,
pMsg
);
SRpcMsg
*
pRpc
=
pMsg
;
int32_t
code
=
qndProcessFetchMsg
(
pMgmt
->
pQnode
,
pRpc
);
if
(
pRpc
->
msgType
&
1U
&&
code
!=
0
)
{
qmSendRsp
(
pMsg
,
code
);
}
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
int32_t
qmPutNodeMsgToWorker
(
SSingleWorker
*
pWorker
,
SRpcMsg
*
pMsg
)
{
dTrace
(
"msg:%p, put into worker %s
"
,
pMsg
,
pWorker
->
name
);
dTrace
(
"msg:%p, put into worker %s
, type:%s"
,
pMsg
,
pWorker
->
name
,
TMSG_INFO
(
pMsg
->
msgType
)
);
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
return
0
;
}
...
...
@@ -101,9 +70,7 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
static
int32_t
qmPutRpcMsgToWorker
(
SQnodeMgmt
*
pMgmt
,
SSingleWorker
*
pWorker
,
SRpcMsg
*
pRpc
)
{
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
if
(
pMsg
==
NULL
)
{
return
-
1
;
}
if
(
pMsg
==
NULL
)
return
-
1
;
dTrace
(
"msg:%p, create and put into worker:%s, type:%s"
,
pMsg
,
pWorker
->
name
,
TMSG_INFO
(
pRpc
->
msgType
));
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
...
...
@@ -141,7 +108,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
.
min
=
tsNumOfVnodeQueryThreads
,
.
max
=
tsNumOfVnodeQueryThreads
,
.
name
=
"qnode-query"
,
.
fp
=
(
FItem
)
qmProcessQue
ryQue
ue
,
.
fp
=
(
FItem
)
qmProcessQueue
,
.
param
=
pMgmt
,
};
...
...
@@ -154,7 +121,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
.
min
=
tsNumOfQnodeFetchThreads
,
.
max
=
tsNumOfQnodeFetchThreads
,
.
name
=
"qnode-fetch"
,
.
fp
=
(
FItem
)
qmProcess
Fetch
Queue
,
.
fp
=
(
FItem
)
qmProcessQueue
,
.
param
=
pMgmt
,
};
...
...
@@ -167,7 +134,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
.
min
=
1
,
.
max
=
1
,
.
name
=
"qnode-monitor"
,
.
fp
=
(
FItem
)
qmProcess
Monitor
Queue
,
.
fp
=
(
FItem
)
qmProcessQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
...
...
source/dnode/mgmt/mgmt_snode/src/smWorker.c
浏览文件 @
4d5bd478
...
...
@@ -28,10 +28,8 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
static
void
smProcessMonitorQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SSnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
dTrace
(
"msg:%p, get from snode-monitor queue"
,
pMsg
);
SRpcMsg
*
pRpc
=
pMsg
;
int32_t
code
=
-
1
;
if
(
pMsg
->
msgType
==
TDMT_MON_SM_INFO
)
{
code
=
smProcessGetMonitorInfoReq
(
pMgmt
,
pMsg
);
...
...
@@ -39,13 +37,13 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
}
if
(
pRpc
->
msgType
&
1U
)
{
if
(
IsReq
(
pMsg
)
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
smSendRsp
(
pMsg
,
code
);
}
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
p
Rpc
->
pCont
);
rpcFreeCont
(
p
Msg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
4d5bd478
...
...
@@ -29,7 +29,7 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
tmsgSendRsp
(
&
rsp
);
}
static
void
vmProcess
MgmtMonitor
Queue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
static
void
vmProcessQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SVnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
dTrace
(
"msg:%p, get from vnode queue, type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
...
...
@@ -92,7 +92,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static
void
vmProcessWriteQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SArray
*
pArray
=
taosArrayInit
(
numOfMsgs
,
sizeof
(
SRpcMsg
*
));
SArray
*
pArray
=
taosArrayInit
(
numOfMsgs
,
sizeof
(
SRpcMsg
*
));
if
(
pArray
==
NULL
)
{
dError
(
"failed to process %d msgs in write-queue since %s"
,
numOfMsgs
,
terrstr
());
return
;
...
...
@@ -222,8 +222,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
}
static
int32_t
vmPutNodeMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
,
EQueueType
qtype
)
{
SRpcMsg
*
pRpc
=
pMsg
;
SMsgHead
*
pHead
=
pRpc
->
pCont
;
SMsgHead
*
pHead
=
pMsg
->
pCont
;
int32_t
code
=
0
;
pHead
->
contLen
=
ntohl
(
pHead
->
contLen
);
...
...
@@ -237,23 +236,23 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType
switch
(
qtype
)
{
case
QUERY_QUEUE
:
dTrace
(
"msg:%p, put into vnode-query worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Rpc
->
msgType
));
dTrace
(
"msg:%p, put into vnode-query worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Msg
->
msgType
));
taosWriteQitem
(
pVnode
->
pQueryQ
,
pMsg
);
break
;
case
FETCH_QUEUE
:
dTrace
(
"msg:%p, put into vnode-fetch worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Rpc
->
msgType
));
dTrace
(
"msg:%p, put into vnode-fetch worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Msg
->
msgType
));
taosWriteQitem
(
pVnode
->
pFetchQ
,
pMsg
);
break
;
case
WRITE_QUEUE
:
dTrace
(
"msg:%p, put into vnode-write worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Rpc
->
msgType
));
dTrace
(
"msg:%p, put into vnode-write worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Msg
->
msgType
));
taosWriteQitem
(
pVnode
->
pWriteQ
,
pMsg
);
break
;
case
SYNC_QUEUE
:
dTrace
(
"msg:%p, put into vnode-sync worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Rpc
->
msgType
));
dTrace
(
"msg:%p, put into vnode-sync worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Msg
->
msgType
));
taosWriteQitem
(
pVnode
->
pSyncQ
,
pMsg
);
break
;
case
MERGE_QUEUE
:
dTrace
(
"msg:%p, put into vnode-merge worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Rpc
->
msgType
));
dTrace
(
"msg:%p, put into vnode-merge worker, type:%s"
,
pMsg
,
TMSG_INFO
(
p
Msg
->
msgType
));
taosWriteQitem
(
pVnode
->
pMergeQ
,
pMsg
);
break
;
default:
...
...
@@ -301,7 +300,7 @@ int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
static
int32_t
vmPutRpcMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pRpc
,
EQueueType
qtype
)
{
SMsgHead
*
pHead
=
pRpc
->
pCont
;
SMsgHead
*
pHead
=
pRpc
->
pCont
;
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
return
-
1
;
...
...
@@ -469,7 +468,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-mgmt"
,
.
fp
=
(
FItem
)
vmProcess
MgmtMonitor
Queue
,
.
fp
=
(
FItem
)
vmProcessQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
mgmtWorker
,
&
cfg
)
!=
0
)
{
...
...
@@ -481,7 +480,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-monitor"
,
.
fp
=
(
FItem
)
vmProcess
MgmtMonitor
Queue
,
.
fp
=
(
FItem
)
vmProcessQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
...
...
source/dnode/mgmt/node_mgmt/inc/dmMgmt.h
浏览文件 @
4d5bd478
...
...
@@ -137,7 +137,6 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
void
dmSetStatus
(
SDnode
*
pDnode
,
EDndRunStatus
stype
);
void
dmProcessServerStartupStatus
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmProcessNetTestReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmProcessFetchRsp
(
SRpcMsg
*
pMsg
);
// dmNodes.c
int32_t
dmOpenNode
(
SMgmtWrapper
*
pWrapper
);
...
...
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
浏览文件 @
4d5bd478
...
...
@@ -314,8 +314,3 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
rpcSendResponse
(
&
rsp
);
rpcFreeCont
(
pMsg
->
pCont
);
}
void
dmProcessFetchRsp
(
SRpcMsg
*
pMsg
)
{
qWorkerProcessFetchRsp
(
NULL
,
NULL
,
pMsg
);
// rpcFreeCont(pMsg->pCont);
}
\ No newline at end of file
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
4d5bd478
...
...
@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "qworker.h"
static
void
dmSendRedirectRsp
(
SRpcMsg
*
pMsg
,
const
SEpSet
*
pNewEpSet
);
static
void
dmSendRsp
(
SRpcMsg
*
pMsg
);
...
...
@@ -61,7 +62,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmProcessNetTestReq
(
pDnode
,
pRpc
);
return
;
}
else
if
(
pRpc
->
msgType
==
TDMT_MND_SYSTABLE_RETRIEVE_RSP
||
pRpc
->
msgType
==
TDMT_VND_FETCH_RSP
)
{
dmProcessFetchRsp
(
pRpc
);
qWorkerProcessFetchRsp
(
NULL
,
NULL
,
pRpc
);
return
;
}
else
{
}
...
...
source/dnode/mnode/impl/src/mndBnode.c
浏览文件 @
4d5bd478
...
...
@@ -304,10 +304,10 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
}
code
=
mndCreateBnode
(
pMnode
,
pReq
,
pDnode
,
&
createReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"bnode:%d, failed to create since %s"
,
createReq
.
dnodeId
,
terrstr
());
}
...
...
@@ -414,10 +414,10 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
}
code
=
mndDropBnode
(
pMnode
,
pReq
,
pObj
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"bnode:%d, failed to drop since %s"
,
dropReq
.
dnodeId
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
4d5bd478
...
...
@@ -511,7 +511,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
SUBSCRIBE_OVER
;
}
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
SUBSCRIBE_OVER:
mndTransDrop
(
pTrans
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
4d5bd478
...
...
@@ -605,10 +605,10 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
}
code
=
mndCreateDb
(
pMnode
,
pReq
,
&
createReq
,
pUser
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to create since %s"
,
createReq
.
db
,
terrstr
());
}
...
...
@@ -839,10 +839,10 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
dbObj
.
cfgVersion
++
;
dbObj
.
updateTime
=
taosGetTimestampMs
();
code
=
mndAlterDb
(
pMnode
,
pReq
,
pDb
,
&
dbObj
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to alter since %s"
,
alterReq
.
db
,
terrstr
());
}
...
...
@@ -1110,10 +1110,10 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
}
code
=
mndDropDb
(
pMnode
,
pReq
,
pDb
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to drop since %s"
,
dropReq
.
db
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
4d5bd478
...
...
@@ -504,10 +504,10 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
}
code
=
mndCreateDnode
(
pMnode
,
pReq
,
&
createReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
CREATE_DNODE_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"dnode:%s:%d, failed to create since %s"
,
createReq
.
fqdn
,
createReq
.
port
,
terrstr
());
}
...
...
@@ -585,10 +585,10 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
}
code
=
mndDropDnode
(
pMnode
,
pReq
,
pDnode
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
DROP_DNODE_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"dnode:%d, failed to drop since %s"
,
dropReq
.
dnodeId
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndFunc.c
浏览文件 @
4d5bd478
...
...
@@ -330,10 +330,10 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
}
code
=
mndCreateFunc
(
pMnode
,
pReq
,
&
createReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"func:%s, failed to create since %s"
,
createReq
.
name
,
terrstr
());
}
...
...
@@ -386,10 +386,10 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
}
code
=
mndDropFunc
(
pMnode
,
pReq
,
pFunc
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"func:%s, failed to drop since %s"
,
dropReq
.
name
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
4d5bd478
...
...
@@ -402,10 +402,10 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
}
code
=
mndCreateMnode
(
pMnode
,
pReq
,
pDnode
,
&
createReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"mnode:%d, failed to create since %s"
,
createReq
.
dnodeId
,
terrstr
());
}
...
...
@@ -574,10 +574,10 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
}
code
=
mndDropMnode
(
pMnode
,
pReq
,
pObj
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"mnode:%d, failed to drop since %s"
,
dropReq
.
dnodeId
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndOffset.c
浏览文件 @
4d5bd478
...
...
@@ -205,7 +205,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
}
mndTransDrop
(
pTrans
);
return
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
static
int32_t
mndOffsetActionInsert
(
SSdb
*
pSdb
,
SMqOffsetObj
*
pOffset
)
{
...
...
source/dnode/mnode/impl/src/mndQnode.c
浏览文件 @
4d5bd478
...
...
@@ -306,10 +306,10 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
}
code
=
mndCreateQnode
(
pMnode
,
pReq
,
pDnode
,
&
createReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"qnode:%d, failed to create since %s"
,
createReq
.
dnodeId
,
terrstr
());
}
...
...
@@ -416,10 +416,10 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
}
code
=
mndDropQnode
(
pMnode
,
pReq
,
pObj
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"qnode:%d, failed to drop since %s"
,
dropReq
.
dnodeId
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndQuery.c
浏览文件 @
4d5bd478
...
...
@@ -18,37 +18,35 @@
#include "mndMnode.h"
#include "qworker.h"
int32_t
mndProcessQueryMsg
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
int32_t
mndProcessQueryMsg
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
-
1
;
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SReadHandle
handle
=
{.
mnd
=
pMnode
,
.
pMsgCb
=
&
pMnode
->
msgCb
};
mTrace
(
"msg:%p, in query queue is processing"
,
p
Req
);
switch
(
p
Req
->
msgType
)
{
mTrace
(
"msg:%p, in query queue is processing"
,
p
Msg
);
switch
(
p
Msg
->
msgType
)
{
case
TDMT_VND_QUERY
:
return
qWorkerProcessQueryMsg
(
&
handle
,
pMnode
->
pQuery
,
pReq
);
code
=
qWorkerProcessQueryMsg
(
&
handle
,
pMnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_QUERY_CONTINUE
:
return
qWorkerProcessCQueryMsg
(
&
handle
,
pMnode
->
pQuery
,
pReq
);
default:
mError
(
"unknown msg type:%d in query queue"
,
pReq
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
}
}
int32_t
mndProcessFetchMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
mTrace
(
"msg:%p, in fetch queue is processing"
,
pMsg
);
switch
(
pMsg
->
msgType
)
{
code
=
qWorkerProcessCQueryMsg
(
&
handle
,
pMnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_FETCH
:
return
qWorkerProcessFetchMsg
(
pMnode
,
pMnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessFetchMsg
(
pMnode
,
pMnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_DROP_TASK
:
return
qWorkerProcessDropMsg
(
pMnode
,
pMnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessDropMsg
(
pMnode
,
pMnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_QUERY_HEARTBEAT
:
return
qWorkerProcessHbMsg
(
pMnode
,
pMnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessHbMsg
(
pMnode
,
pMnode
->
pQuery
,
pMsg
);
break
;
default:
mError
(
"unknown msg type:%d in fetch queue"
,
pMsg
->
msgType
)
;
return
TSDB_CODE_VND_APP_ERROR
;
terrno
=
TSDB_CODE_VND_APP_ERROR
;
mError
(
"unknown msg type:%d in query queue"
,
pMsg
->
msgType
)
;
}
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
return
code
;
}
int32_t
mndInitQuery
(
SMnode
*
pMnode
)
{
...
...
@@ -59,9 +57,9 @@ int32_t mndInitQuery(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_VND_QUERY
,
mndProcessQueryMsg
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_QUERY_CONTINUE
,
mndProcessQueryMsg
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_FETCH
,
mndProcess
Fetch
Msg
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_TASK
,
mndProcess
Fetch
Msg
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_QUERY_HEARTBEAT
,
mndProcess
Fetch
Msg
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_FETCH
,
mndProcess
Query
Msg
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_TASK
,
mndProcess
Query
Msg
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_QUERY_HEARTBEAT
,
mndProcess
Query
Msg
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
4d5bd478
...
...
@@ -562,10 +562,10 @@ static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq) {
}
code
=
mndCreateSma
(
pMnode
,
pReq
,
&
createReq
,
pDb
,
pStb
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"sma:%s, failed to create since %s"
,
createReq
.
name
,
terrstr
(
terrno
));
}
...
...
@@ -706,10 +706,10 @@ static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq) {
}
code
=
mndDropSma
(
pMnode
,
pReq
,
pDb
,
pSma
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"sma:%s, failed to drop since %s"
,
dropReq
.
name
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndSnode.c
浏览文件 @
4d5bd478
...
...
@@ -312,10 +312,10 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
}
code
=
mndCreateSnode
(
pMnode
,
pReq
,
pDnode
,
&
createReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"snode:%d, failed to create since %s"
,
createReq
.
dnodeId
,
terrstr
());
return
-
1
;
}
...
...
@@ -424,10 +424,10 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
}
code
=
mndDropSnode
(
pMnode
,
pReq
,
pObj
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"snode:%d, failed to drop since %s"
,
dropReq
.
dnodeId
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
4d5bd478
...
...
@@ -827,10 +827,10 @@ static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq) {
}
code
=
mndCreateStb
(
pMnode
,
pReq
,
&
createReq
,
pDb
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"stb:%s, failed to create since %s"
,
createReq
.
name
,
terrstr
());
}
...
...
@@ -1334,10 +1334,10 @@ static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) {
}
code
=
mndAlterStb
(
pMnode
,
pReq
,
&
alterReq
,
pDb
,
pStb
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"stb:%s, failed to alter since %s"
,
alterReq
.
name
,
terrstr
());
}
...
...
@@ -1475,10 +1475,10 @@ static int32_t mndProcessMDropStbReq(SRpcMsg *pReq) {
}
code
=
mndDropStb
(
pMnode
,
pReq
,
pDb
,
pStb
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"stb:%s, failed to drop since %s"
,
dropReq
.
name
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
4d5bd478
...
...
@@ -472,10 +472,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
code
=
mndCreateStream
(
pMnode
,
pReq
,
&
createStreamReq
,
pDb
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
CREATE_STREAM_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"stream:%s, failed to create since %s"
,
createStreamReq
.
name
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
4d5bd478
...
...
@@ -457,10 +457,10 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
}
code
=
mndCreateTopic
(
pMnode
,
pReq
,
&
createTopicReq
,
pDb
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
CREATE_TOPIC_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"topic:%s, failed to create since %s"
,
createTopicReq
.
name
,
terrstr
());
}
...
...
@@ -547,7 +547,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return
-
1
;
}
return
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessDropTopicInRsp
(
SRpcMsg
*
pRsp
)
{
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
4d5bd478
...
...
@@ -1035,13 +1035,13 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
}
}
else
{
mDebug
(
"trans:%d, %d of %d actions executed"
,
pTrans
->
id
,
numOfReceived
,
numOfActions
);
return
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
}
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
redoActions
);
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"failed to execute redoActions since:%s, code:0x%x"
,
terrstr
(),
terrno
);
}
return
code
;
...
...
@@ -1049,7 +1049,7 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
undoActions
);
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"failed to execute undoActions since %s"
,
terrstr
());
}
return
code
;
...
...
@@ -1088,7 +1088,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
pTrans
->
stage
=
TRN_STAGE_COMMIT
;
mDebug
(
"trans:%d, stage from redoAction to commit"
,
pTrans
->
id
);
continueExec
=
true
;
}
else
if
(
code
==
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mDebug
(
"trans:%d, stage keep on redoAction since %s"
,
pTrans
->
id
,
tstrerror
(
code
));
continueExec
=
false
;
}
else
{
...
...
@@ -1176,7 +1176,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
pTrans
->
stage
=
TRN_STAGE_UNDO_LOG
;
mDebug
(
"trans:%d, stage from undoAction to undoLog"
,
pTrans
->
id
);
continueExec
=
true
;
}
else
if
(
code
==
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mDebug
(
"trans:%d, stage keep on undoAction since %s"
,
pTrans
->
id
,
tstrerror
(
code
));
continueExec
=
false
;
}
else
{
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
4d5bd478
...
...
@@ -331,10 +331,10 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
}
code
=
mndCreateUser
(
pMnode
,
pOperUser
->
acct
,
&
createReq
,
pReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"user:%s, failed to create since %s"
,
createReq
.
user
,
terrstr
());
}
...
...
@@ -536,10 +536,10 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
}
code
=
mndAlterUser
(
pMnode
,
pUser
,
&
newUser
,
pReq
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"user:%s, failed to alter since %s"
,
alterReq
.
user
,
terrstr
());
}
...
...
@@ -613,10 +613,10 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
}
code
=
mndDropUser
(
pMnode
,
pReq
,
pUser
);
if
(
code
==
0
)
code
=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"user:%s, failed to drop since %s"
,
dropReq
.
user
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
4d5bd478
...
...
@@ -367,7 +367,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
}
int32_t
code
=
(
*
fp
)(
pMsg
);
if
(
code
==
TSDB_CODE_
MND_
ACTION_IN_PROGRESS
)
{
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
terrno
=
code
;
mTrace
(
"msg:%p, in progress, app:%p"
,
pMsg
,
ahandle
);
}
else
if
(
code
!=
0
)
{
...
...
source/dnode/qnode/src/qnode.c
浏览文件 @
4d5bd478
...
...
@@ -43,44 +43,49 @@ void qndClose(SQnode *pQnode) {
int32_t
qndGetLoad
(
SQnode
*
pQnode
,
SQnodeLoad
*
pLoad
)
{
return
0
;
}
int32_t
qndProcessQueryMsg
(
SQnode
*
pQnode
,
SRpcMsg
*
pMsg
)
{
qTrace
(
"message in qnode query queue is processing"
)
;
int32_t
code
=
-
1
;
SReadHandle
handle
=
{.
pMsgCb
=
&
pQnode
->
msgCb
};
qTrace
(
"message in qnode queue is processing"
);
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:
{
return
qWorkerProcessQueryMsg
(
&
handle
,
pQnode
->
pQuery
,
pMsg
);
}
case
TDMT_VND_QUERY
:
code
=
qWorkerProcessQueryMsg
(
&
handle
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_QUERY_CONTINUE
:
return
qWorkerProcessCQueryMsg
(
&
handle
,
pQnode
->
pQuery
,
pMsg
);
default:
qError
(
"unknown msg type:%d in query queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
}
}
int32_t
qndProcessFetchMsg
(
SQnode
*
pQnode
,
SRpcMsg
*
pMsg
)
{
qTrace
(
"message in fetch queue is processing"
);
switch
(
pMsg
->
msgType
)
{
code
=
qWorkerProcessCQueryMsg
(
&
handle
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_FETCH
:
return
qWorkerProcessFetchMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessFetchMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_FETCH_RSP
:
return
qWorkerProcessFetchRsp
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessFetchRsp
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_RES_READY
:
return
qWorkerProcessReadyMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessReadyMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_TASKS_STATUS
:
return
qWorkerProcessStatusMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessStatusMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_CANCEL_TASK
:
return
qWorkerProcessCancelMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessCancelMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_DROP_TASK
:
return
qWorkerProcessDropMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessDropMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
case
TDMT_VND_TABLE_META
:
// return vnodeGetTableMeta(pQnode, pMsg);
// code = vnodeGetTableMeta(pQnode, pMsg);
// break;
case
TDMT_VND_CONSUME
:
// return tqProcessConsumeReq(pQnode->pTq, pMsg);
// code = tqProcessConsumeReq(pQnode->pTq, pMsg);
// break;
case
TDMT_VND_QUERY_HEARTBEAT
:
return
qWorkerProcessHbMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
code
=
qWorkerProcessHbMsg
(
pQnode
,
pQnode
->
pQuery
,
pMsg
);
break
;
default:
qError
(
"unknown msg type:%d in
fetch
queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
qError
(
"unknown msg type:%d in
qnode
queue"
,
pMsg
->
msgType
);
terrno
=
TSDB_CODE_VND_APP_ERROR
;
}
if
(
code
==
0
)
return
TSDB_CODE_ACTION_IN_PROGRESS
;
return
code
;
}
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
4d5bd478
...
...
@@ -147,9 +147,6 @@ _err:
int
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in vnode query queue is processing"
);
#if 0
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
#endif
SReadHandle
handle
=
{.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
,
.
vnode
=
pVnode
,
.
pMsgCb
=
&
pVnode
->
msgCb
};
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:
...
...
source/util/src/terror.c
浏览文件 @
4d5bd478
...
...
@@ -134,7 +134,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt cl
// mnode-common
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_APP_ERROR
,
"Mnode internal error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_NOT_READY
,
"Mnode not ready"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_ACTION_IN_PROGRESS
,
"Message is progressing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_NO_RIGHTS
,
"Insufficient privilege for operation"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_CONNECTION
,
"Invalid message connection"
)
...
...
tests/tsim/src/simSystem.c
浏览文件 @
4d5bd478
...
...
@@ -98,6 +98,8 @@ SScript *simProcessCallOver(SScript *script) {
return
NULL
;
}
if
(
simScriptPos
==
-
1
)
return
NULL
;
return
simScriptList
[
simScriptPos
];
}
else
{
simDebug
(
"script:%s, is stopped"
,
script
->
fileName
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录