未验证 提交 598a72e9 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #6059 from taosdata/fix/TD-4081-v2

[TD-4081]<fix>: [v2] fix vnode confirm forward missing response & write msg freeing
...@@ -218,7 +218,7 @@ int32_t* taosGetErrno(); ...@@ -218,7 +218,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) //"Database suspended") #define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) //"Database suspended")
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied") #define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing") #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing")
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0513) //"Invalid tsdb state") #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state")
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID") #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID")
......
...@@ -119,7 +119,6 @@ void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) { ...@@ -119,7 +119,6 @@ void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
void *pVnode = vnodeAcquire(vgId); void *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
vError("vgId:%d, vnode not found while confirm forward", vgId); vError("vgId:%d, vnode not found while confirm forward", vgId);
return;
} }
dnodeSendRpcVWriteRsp(pVnode, wparam, code); dnodeSendRpcVWriteRsp(pVnode, wparam, code);
...@@ -162,4 +161,4 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) { ...@@ -162,4 +161,4 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) { void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code, force); syncConfirmForward(pVnode->sync, version, code, force);
} }
\ No newline at end of file
...@@ -317,12 +317,13 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -317,12 +317,13 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
if (pVnode) {
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1); vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len); pWrite->rpcMsg.ahandle, queued, queuedSize);
}
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
pWrite->rpcMsg.ahandle, queued, queuedSize);
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
...@@ -371,8 +372,8 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { ...@@ -371,8 +372,8 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
taosMsleep(ms); taosMsleep(ms);
return 0; return 0;
} else { } else {
void *unUsed = NULL; void *unUsedTimerId = NULL;
taosTmrReset(vnodeFlowCtrlMsgToWQueue, 100, pWrite, tsDnodeTmr, &unUsed); taosTmrReset(vnodeFlowCtrlMsgToWQueue, 100, pWrite, tsDnodeTmr, &unUsedTimerId);
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, retry:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, retry:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle,
pWrite->processedCount); pWrite->processedCount);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册