diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index ca2fffe24c76b48dc647a08d08ffdce9e8873240..eec6d45e23440547462f6cf78e9750cc35719c5b 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -53,6 +53,7 @@ typedef struct { void * rowData; int32_t rowSize; int32_t retCode; // for callback in sdb queue + int32_t processedCount; // for sync fwd callback int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code); struct SMnodeMsg *pMsg; } SSdbOper; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 0f657bdde8e75b0b23c7a1fa4d90bd459fb1dc51..4aed82095886e03f42c1cf21f97a617a2e7c24c1 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -247,20 +247,22 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { assert(param); SSdbOper * pOper = param; SMnodeMsg *pMsg = pOper->pMsg; + if (code <= 0) pOper->retCode = code; - if (code > 0) { + int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1); + if (processedCount <= 1) { if (pMsg != NULL) { - sdbDebug("app:%p:%p, waiting for slave to confirm this operation", pMsg->rpcMsg.ahandle, pMsg); + sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount); } return; } if (pMsg != NULL) { - sdbDebug("app:%p:%p, is confirmed and will do callback func, code:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code)); + sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); } if (pOper->cb != NULL) { - code = (*pOper->cb)(pMsg, code); + code = (*pOper->cb)(pMsg, pOper->retCode); } dnodeSendRpcMnodeWriteRsp(pMsg, code); @@ -543,31 +545,34 @@ static int sdbWrite(void *param, void *data, int type) { pthread_mutex_unlock(&tsSdbObj.mutex); return code; } - - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync - int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); - pthread_mutex_unlock(&tsSdbObj.mutex); - if (syncCode < 0) { - sdbDebug("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, - tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - return syncCode; - } else if (syncCode > 0) { - sdbDebug("table:%s, forward request is sent, syncCode:%d action:%s record:%s version:%" PRId64, pTable->tableName, - syncCode, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - } else { - } + pthread_mutex_unlock(&tsSdbObj.mutex); // from app, oper is created if (pOper != NULL) { - sdbDebug("table:%s, record from app is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + // forward to peers + int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); + if (syncCode <= 0) atomic_add_fetch_32(&pOper->processedCount, 1); + + if (syncCode < 0) { + sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else if (syncCode > 0) { + sdbDebug("table:%s, forward request is sent, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else { + sdbTrace("table:%s, no need to send fwd request, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } return syncCode; - } else { - sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); } + sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + + // even it is WAL/FWD, it shall be called to update version in sync + syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); + // from wal or forward msg, oper not created, should add into hash if (action == SDB_ACTION_INSERT) { SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; @@ -972,11 +977,6 @@ static void *sdbWorkerFp(void *param) { if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; - if (pOper == NULL) { - taosFreeQitem(item); - continue; - } - sdbDecRef(pOper->table, pOper->pObj); sdbConfirmForward(NULL, pOper, pOper->retCode); } else if (type == TAOS_QTYPE_FWD) {