diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 90d857155a2d890505d9199e6be47e828376820c..b53c66e00c3771d0745064ac5976510e4960ff17 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -131,8 +131,8 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { taosFreeQitem(pWrite); } -void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { - SMnodeMsg *pWrite = pRaw; +void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) { + SMnodeMsg *pWrite = pMsg; if (pWrite == NULL) return; if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index ca2fffe24c76b48dc647a08d08ffdce9e8873240..eec6d45e23440547462f6cf78e9750cc35719c5b 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -53,6 +53,7 @@ typedef struct { void * rowData; int32_t rowSize; int32_t retCode; // for callback in sdb queue + int32_t processedCount; // for sync fwd callback int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code); struct SMnodeMsg *pMsg; } SSdbOper; diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 4707616ba85c303bc92c21eee8d63d9d2578c60c..804a64be0895c44727d245c4f598eacac83dcbc6 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -88,13 +88,13 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) { } static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) { - SDnodeObj *pDnode = pOper->pObj; - SDnodeObj *pSaved = mnodeGetDnode(pDnode->dnodeId); - if (pSaved != NULL && pDnode != pSaved) { - memcpy(pSaved, pDnode, pOper->rowSize); - free(pDnode); - mnodeDecDnodeRef(pSaved); + SDnodeObj *pNew = pOper->pObj; + SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId); + if (pDnode != NULL && pNew != pDnode) { + memcpy(pDnode, pNew, pOper->rowSize); + free(pNew); } + mnodeDecDnodeRef(pDnode); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 761dce6720c1f43a14ff1c4d01ec3e844ae039ac..0c2e478a07baebdf8011e9f6632b8871c0f8e2de 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -72,8 +72,6 @@ typedef struct { void * sync; void * wal; SSyncCfg cfg; - sem_t sem; - int32_t code; int32_t numOfTables; SSdbTable *tableList[SDB_TABLE_MAX]; pthread_mutex_t mutex; @@ -244,27 +242,36 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { sdbUpdateMnodeRoles(); } +FORCE_INLINE static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { - tsSdbObj.code = code; - sem_post(&tsSdbObj.sem); - sdbDebug("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code)); -} + assert(param); + SSdbOper * pOper = param; + SMnodeMsg *pMsg = pOper->pMsg; + if (code <= 0) pOper->retCode = code; + + int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1); + if (processedCount <= 1) { + if (pMsg != NULL) { + sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount); + } + return; + } - static int32_t sdbForwardToPeer(SWalHead *pHead) { - if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; + if (pMsg != NULL) { + sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg); + } - int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC); - if (code > 0) { - sdbDebug("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code); - sem_wait(&tsSdbObj.sem); - return tsSdbObj.code; - } - return code; + if (pOper->cb != NULL) { + pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode); + } + + dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); + taosFreeQitem(pOper); } void sdbUpdateSync() { SSyncCfg syncCfg = {0}; - int32_t index = 0; + int32_t index = 0; SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); for (int32_t i = 0; i < mnodes->nodeNum; ++i) { @@ -298,7 +305,7 @@ void sdbUpdateSync() { } syncCfg.replica = index; - syncCfg.quorum = (syncCfg.replica == 1) ? 1:2; + syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2; bool hasThisDnode = false; for (int32_t i = 0; i < syncCfg.replica; ++i) { @@ -325,10 +332,10 @@ void sdbUpdateSync() { syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; syncInfo.writeToCache = sdbWriteToQueue; - syncInfo.confirmForward = sdbConfirmForward; + syncInfo.confirmForward = sdbConfirmForward; syncInfo.notifyRole = sdbNotifyRole; tsSdbObj.cfg = syncCfg; - + if (tsSdbObj.sync) { syncReconfig(tsSdbObj.sync, &syncCfg); } else { @@ -339,7 +346,6 @@ void sdbUpdateSync() { int32_t sdbInit() { pthread_mutex_init(&tsSdbObj.mutex, NULL); - sem_init(&tsSdbObj.sem, 0, 0); if (sdbInitWriteWorker() != 0) { return -1; @@ -379,7 +385,6 @@ void sdbCleanUp() { tsSdbObj.wal = NULL; } - sem_destroy(&tsSdbObj.sem); pthread_mutex_destroy(&tsSdbObj.mutex); } @@ -513,24 +518,22 @@ static int sdbWrite(void *param, void *data, int type) { assert(pTable != NULL); pthread_mutex_lock(&tsSdbObj.mutex); + if (pHead->version == 0) { - // assign version + // assign version tsSdbObj.version++; pHead->version = tsSdbObj.version; } else { // for data from WAL or forward, version may be smaller if (pHead->version <= tsSdbObj.version) { pthread_mutex_unlock(&tsSdbObj.mutex); - if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) { - sdbDebug("forward request is received, version:%" PRIu64 " confirm it", pHead->version); - syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); - } + sdbDebug("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64, + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); return TSDB_CODE_SUCCESS; } else if (pHead->version != tsSdbObj.version + 1) { pthread_mutex_unlock(&tsSdbObj.mutex); - sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64, - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, - tsSdbObj.version); + sdbError("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64, + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); return TSDB_CODE_MND_APP_ERROR; } else { tsSdbObj.version = pHead->version; @@ -542,28 +545,36 @@ static int sdbWrite(void *param, void *data, int type) { pthread_mutex_unlock(&tsSdbObj.mutex); return code; } - - code = sdbForwardToPeer(pHead); + pthread_mutex_unlock(&tsSdbObj.mutex); // from app, oper is created if (pOper != NULL) { - sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s", - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, - tstrerror(code)); - return code; + // forward to peers + pOper->processedCount = 0; + int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); + if (syncCode <= 0) pOper->processedCount = 1; + + if (syncCode < 0) { + sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName, + tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else if (syncCode > 0) { + sdbDebug("table:%s, forward request is sent, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } else { + sdbTrace("table:%s, no need to send fwd request, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + } + return syncCode; } - // from wal or forward msg, oper not created, should add into hash - if (tsSdbObj.sync != NULL) { - sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it", - pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - syncConfirmForward(tsSdbObj.sync, pHead->version, code); - } else { - sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName, - sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); - } + sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName, + sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version); + // even it is WAL/FWD, it shall be called to update version in sync + syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC); + + // from wal or forward msg, oper not created, should add into hash if (action == SDB_ACTION_INSERT) { SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; code = (*pTable->decodeFp)(&oper); @@ -627,7 +638,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -677,7 +688,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -727,7 +738,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { memcpy(pNewOper, pOper, sizeof(SSdbOper)); if (pNewOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, + sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); } @@ -943,20 +954,20 @@ static void *sdbWorkerFp(void *param) { taosGetQitem(tsSdbWriteQall, &type, &item); if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; + pOper->processedCount = 1; pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; + if (pOper->pMsg != NULL) { + sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue", + pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj, + sdbGetKeyStr(pOper->table, pHead->cont), pHead->version); + } } else { pHead = (SWalHead *)item; pOper = NULL; } - if (pOper != NULL && pOper->pMsg != NULL) { - sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue", - pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj, - sdbGetKeyStr(pOper->table, pHead->cont), pHead->version); - } - int32_t code = sdbWrite(pOper, pHead, type); - if (pOper) pOper->retCode = code; + if (pOper && code <= 0) pOper->retCode = code; } walFsync(tsSdbObj.wal); @@ -965,25 +976,17 @@ static void *sdbWorkerFp(void *param) { taosResetQitems(tsSdbWriteQall); for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(tsSdbWriteQall, &type, &item); + if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; - if (pOper != NULL && pOper->cb != NULL) { - sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i); - pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); - } - - if (pOper != NULL && pOper->pMsg != NULL) { - sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, - tstrerror(pOper->retCode)); - } - - if (pOper != NULL) { - sdbDecRef(pOper->table, pOper->pObj); - } - - dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); + sdbDecRef(pOper->table, pOper->pObj); + sdbConfirmForward(NULL, pOper, pOper->retCode); + } else if (type == TAOS_QTYPE_FWD) { + syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS); + taosFreeQitem(item); + } else { + taosFreeQitem(item); } - taosFreeQitem(item); } }