提交 442902e5 编写于 作者: B Benguang Zhao

fix: send rpc response on closing sync or failing to enqueue

上级 8a773d6b
...@@ -134,6 +134,13 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf ...@@ -134,6 +134,13 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
} }
} }
static void vmSendResponse(SRpcMsg *pMsg) {
if (pMsg->info.handle) {
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
rpcSendResponse(&rsp);
}
}
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
if (pMsg->contLen < sizeof(SMsgHead)) { if (pMsg->contLen < sizeof(SMsgHead)) {
...@@ -152,7 +159,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp ...@@ -152,7 +159,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
if (pVnode == NULL) { if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen); terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
return terrno != 0 ? terrno : -1; terrno = (terrno != 0) ? terrno : -1;
vmSendResponse(pMsg);
return terrno;
} }
switch (qtype) { switch (qtype) {
......
...@@ -182,7 +182,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -182,7 +182,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version, vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
pVnode->state.applied); pVnode->state.applied);
terrno = TSDB_CODE_VND_DUP_REQUEST; terrno = TSDB_CODE_VND_DUP_REQUEST;
pRsp->info.handle = NULL;
return -1; return -1;
} }
......
...@@ -1252,6 +1252,9 @@ void syncNodePreClose(SSyncNode* pSyncNode) { ...@@ -1252,6 +1252,9 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
// stop heartbeat timer // stop heartbeat timer
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
// clean rsp
syncRespCleanRsp(pSyncNode->pSyncRespMgr);
} }
void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); } void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
...@@ -2667,16 +2670,12 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn ...@@ -2667,16 +2670,12 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
} }
int32_t code = syncNodeAppend(ths, pEntry); int32_t code = syncNodeAppend(ths, pEntry);
if (code < 0) {
sNError(ths, "failed to append blocking msg");
}
return code; return code;
} else { } else {
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
pEntry = NULL; pEntry = NULL;
return -1;
} }
return -1;
} }
int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) { int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册