diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index a6aed29e3b3a72941c878eee4304fe42e2e1e2a2..b53c66e00c3771d0745064ac5976510e4960ff17 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -140,8 +140,6 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) { return; } - if (code > 0) return; - SRpcMsg rpcRsp = { .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rpcRsp.rsp, diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index cdcb426ba273cb80fd98143575d656299e2b4123..70354cf55038fbed5e355d54697c0c16cee30eb5 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -244,12 +244,20 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { FORCE_INLINE static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { - SMnodeMsg *pMsg = param; + if (code > 0) return; - if (pMsg) { - sdbDebug("app:%p:%p, forward request is confirmed, result:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code)); + assert(param); + + SSdbOper * pOper = param; + SMnodeMsg *pMsg = pOper->pMsg; + + if (pOper->cb != NULL) { + sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); + pOper->retCode = (*pOper->cb)(pMsg, code); } - dnodeSendRpcMnodeWriteRsp(pMsg, code); + + dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); + taosFreeQitem(pOper); } void sdbUpdateSync() { @@ -529,7 +537,6 @@ static int sdbWrite(void *param, void *data, int type) { return code; } - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync void *mhandle = NULL; if (pOper != NULL) mhandle = pOper->pMsg; @@ -541,18 +548,19 @@ static int sdbWrite(void *param, void *data, int type) { tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); return syncCode; } else if (syncCode > 0) { - sdbDebug("table:%s, forward request is sent, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - } else {} + sdbDebug("table:%s, forward request is sent, syncCode:%d action:%s record:%s version:%" PRId64, pTable->tableName, + syncCode, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else { + } // from app, oper is created if (pOper != NULL) { - sdbDebug("table:%s, record from app is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + sdbDebug("table:%s, record from app is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); return syncCode; } else { - sdbDebug("table:%s, record from wal/fwd is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); } // from wal or forward msg, oper not created, should add into hash @@ -619,7 +627,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -669,7 +677,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -719,7 +727,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -964,18 +972,14 @@ static void *sdbWorkerFp(void *param) { continue; } - if (pOper->cb != NULL) { - sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i); - pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); - } - - dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); sdbDecRef(pOper->table, pOper->pObj); + sdbConfirmForward(NULL, pOper, pOper->retCode); } else if (type == TAOS_QTYPE_FWD) { syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); + taosFreeQitem(item); } else { + taosFreeQitem(item); } - taosFreeQitem(item); } }