提交 a6c3ffd5 编写于 作者: S Shengliang Guan

[TD-335] fix invalid read in mwrite queue

上级 5c889fff
......@@ -32,7 +32,7 @@ void* dnodeGetVnodeWal(void *pVnode);
void* dnodeGetVnodeTsdb(void *pVnode);
void dnodeReleaseVnode(void *pVnode);
void dnodeSendRedirectMsg(int32_t msgType, void *thandle, bool forShell);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
void dnodeGetMnodeIpSetForPeer(void *ipSet);
void dnodeGetMnodeIpSetForShell(void *ipSe);
......
......@@ -111,7 +111,7 @@ void dnodeFreeMnodePqueue() {
void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMPeerQueue == NULL) {
dnodeSendRedirectMsg(pMsg->msgType, pMsg->handle, false);
dnodeSendRedirectMsg(pMsg, false);
return;
}
......@@ -120,18 +120,23 @@ void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg) {
taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer);
}
static void dnodeFreeMnodePeadMsg(SMnodeMsg *pPeer) {
mnodeCleanupMsg(pPeer);
taosFreeQitem(pPeer);
}
static void dnodeSendRpcMnodePeerRsp(SMnodeMsg *pPeer, int32_t code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
SRpcMsg rpcRsp = {
.handle = pPeer->thandle,
.handle = pPeer->rpcMsg.handle,
.pCont = pPeer->rpcRsp.rsp,
.contLen = pPeer->rpcRsp.len,
.code = code,
};
rpcSendResponse(&rpcRsp);
mnodeCleanupMsg(pPeer);
dnodeFreeMnodePeadMsg(pPeer);
}
static void *dnodeProcessMnodePeerQueue(void *param) {
......@@ -145,10 +150,9 @@ static void *dnodeProcessMnodePeerQueue(void *param) {
break;
}
dTrace("%p, msg:%s will be processed in mpeer queue", pPeerMsg->ahandle, taosMsg[pPeerMsg->msgType]);
dTrace("%p, msg:%s will be processed in mpeer queue", pPeerMsg->rpcMsg.ahandle, taosMsg[pPeerMsg->rpcMsg.msgType]);
int32_t code = mnodeProcessPeerReq(pPeerMsg);
dnodeSendRpcMnodePeerRsp(pPeerMsg, code);
taosFreeQitem(pPeerMsg);
}
return NULL;
......
......@@ -116,7 +116,7 @@ void dnodeFreeMnodeRqueue() {
void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMReadQueue == NULL) {
dnodeSendRedirectMsg(pMsg->msgType, pMsg->handle, true);
dnodeSendRedirectMsg(pMsg, true);
return;
}
......@@ -125,18 +125,23 @@ void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) {
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
}
static void dnodeFreeMnodeReadMsg(SMnodeMsg *pRead) {
mnodeCleanupMsg(pRead);
taosFreeQitem(pRead);
}
static void dnodeSendRpcMnodeReadRsp(SMnodeMsg *pRead, int32_t code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
SRpcMsg rpcRsp = {
.handle = pRead->thandle,
.handle = pRead->rpcMsg.handle,
.pCont = pRead->rpcRsp.rsp,
.contLen = pRead->rpcRsp.len,
.code = code,
};
rpcSendResponse(&rpcRsp);
mnodeCleanupMsg(pRead);
dnodeFreeMnodeReadMsg(pRead);
}
static void *dnodeProcessMnodeReadQueue(void *param) {
......@@ -150,10 +155,9 @@ static void *dnodeProcessMnodeReadQueue(void *param) {
break;
}
dTrace("%p, msg:%s will be processed in mread queue", pReadMsg->ahandle, taosMsg[pReadMsg->msgType]);
dTrace("%p, msg:%s will be processed in mread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
int32_t code = mnodeProcessRead(pReadMsg);
dnodeSendRpcMnodeReadRsp(pReadMsg, code);
taosFreeQitem(pReadMsg);
dnodeSendRpcMnodeReadRsp(pReadMsg, code);
}
return NULL;
......
......@@ -113,7 +113,7 @@ void dnodeFreeMnodeWqueue() {
void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
dnodeSendRedirectMsg(pMsg->msgType, pMsg->handle, true);
dnodeSendRedirectMsg(pMsg, true);
return;
}
......@@ -122,19 +122,24 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
mnodeCleanupMsg(pWrite);
taosFreeQitem(pWrite);
}
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
SMnodeMsg *pWrite = pRaw;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
SRpcMsg rpcRsp = {
.handle = pWrite->thandle,
.handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rpcRsp.rsp,
.contLen = pWrite->rpcRsp.len,
.code = code,
};
rpcSendResponse(&rpcRsp);
mnodeCleanupMsg(pWrite);
dnodeFreeMnodeWriteMsg(pWrite);
}
static void *dnodeProcessMnodeWriteQueue(void *param) {
......@@ -148,26 +153,19 @@ static void *dnodeProcessMnodeWriteQueue(void *param) {
break;
}
dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->ahandle, taosMsg[pWriteMsg->msgType]);
dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->rpcMsg.ahandle, taosMsg[pWriteMsg->rpcMsg.msgType]);
int32_t code = mnodeProcessWrite(pWriteMsg);
dnodeSendRpcMnodeWriteRsp(pWriteMsg, code);
taosFreeQitem(pWriteMsg);
dnodeSendRpcMnodeWriteRsp(pWriteMsg, code);
}
return NULL;
}
static void dnodeFreeMnodeWriteMsg(void *pMsg) {
SMnodeMsg *pWrite = pMsg;
mnodeCleanupMsg(pWrite);
taosFreeQitem(pWrite);
}
void dnodeReprocessMnodeWriteMsg(void *pMsg) {
SMnodeMsg *pWrite = pMsg;
if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
dnodeSendRedirectMsg(pWrite->msgType, pWrite->thandle, true);
dnodeSendRedirectMsg(pMsg, true);
dnodeFreeMnodeWriteMsg(pWrite);
} else {
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
......
......@@ -609,9 +609,9 @@ int32_t dnodeGetDnodeId() {
return tsDnodeCfg.dnodeId;
}
void dnodeSendRedirectMsg(int32_t msgType, void *thandle, bool forShell) {
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo;
rpcGetConnInfo(thandle, &connInfo);
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcIpSet ipSet = {0};
if (forShell) {
......@@ -620,7 +620,7 @@ void dnodeSendRedirectMsg(int32_t msgType, void *thandle, bool forShell) {
dnodeGetMnodeIpSetForPeer(&ipSet);
}
dTrace("msg:%s will be redirected, dnodeIp:%s user:%s, numOfIps:%d inUse:%d", taosMsg[msgType],
dTrace("msg:%s will be redirected, dnodeIp:%s user:%s, numOfIps:%d inUse:%d", taosMsg[rpcMsg->msgType],
taosIpStr(connInfo.clientIp), connInfo.user, ipSet.numOfIps, ipSet.inUse);
for (int i = 0; i < ipSet.numOfIps; ++i) {
......@@ -628,5 +628,5 @@ void dnodeSendRedirectMsg(int32_t msgType, void *thandle, bool forShell) {
ipSet.port[i] = htons(ipSet.port[i]);
}
rpcSendRedirectRsp(thandle, &ipSet);
rpcSendRedirectRsp(rpcMsg->handle, &ipSet);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册