diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 2372fa30e5bfc1e27b32ff10672929a328029865..07066d2251235d325aa9f64f2751c8248e8c7e54 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -75,6 +75,7 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, voi void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname); void mndTransSetSerial(STrans *pTrans); void mndTransSetOper(STrans *pTrans, EOperType oper); +int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransProcessRsp(SRpcMsg *pRsp); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 43155124c11fef5cede04eb2c17987d99ed80e5c..0715556da2966b2f09d279ef2b10bcea0b6185d9 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -776,6 +776,8 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p int32_t code = -1; mndTransSetDbName(pTrans, pOld->name, NULL); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) return -1; + if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; @@ -835,12 +837,14 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + if (terrno != 0) code = terrno; mError("db:%s, failed to alter since %s", alterReq.db, terrstr()); } mndReleaseDb(pMnode, pDb); taosArrayDestroy(dbObj.cfg.pRetensions); + terrno = code; return code; } @@ -1183,7 +1187,8 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); - if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable || pReq->stateTs < pDb->stateTs) { + if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable || + pReq->stateTs < pDb->stateTs) { mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos); } @@ -1298,21 +1303,22 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, SUseDbRsp usedbRsp = {0}; - if ((0 == strcasecmp(pDbVgVersion->dbFName, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcasecmp(pDbVgVersion->dbFName, TSDB_PERFORMANCE_SCHEMA_DB)))) { + if ((0 == strcasecmp(pDbVgVersion->dbFName, TSDB_INFORMATION_SCHEMA_DB) || + (0 == strcasecmp(pDbVgVersion->dbFName, TSDB_PERFORMANCE_SCHEMA_DB)))) { memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN); int32_t vgVersion = mndGetGlobalVgroupVersion(pMnode); if (pDbVgVersion->vgVersion < vgVersion) { usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo)); - + mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos); usedbRsp.vgVersion = vgVersion++; } else { usedbRsp.vgVersion = pDbVgVersion->vgVersion; } usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); - + taosArrayPush(batchUseRsp.pArray, &usedbRsp); - + continue; } @@ -1328,7 +1334,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); - if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable && + if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable && pDbVgVersion->stateTs == pDb->stateTs) { mTrace("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64 " numOfTables:%d, not changed vgVersion:%d stateTs:%" PRId64 " numOfTables:%d", diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 4b11884050752a17e57d2a73b237de3bf02eddc1..42037304b941a5280ea7f591317125673729784d 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -838,7 +838,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { return conflict; } -int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { +int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) { if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) { terrno = TSDB_CODE_MND_TRANS_CONFLICT; @@ -853,6 +853,14 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } + return 0; +} + +int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { + if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + return -1; + } + if (taosArrayGetSize(pTrans->commitActions) <= 0) { terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); @@ -1027,6 +1035,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) { if (pAction != NULL) { pAction->msgReceived = 1; pAction->errCode = pRsp->code; + pTrans->lastErrorNo = pRsp->code; } mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage), @@ -1238,7 +1247,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) if (numOfActions == 0) return code; if (pTrans->redoActionPos >= numOfActions) return code; - mInfo("trans:%d, execute %d actions serial", pTrans->id, numOfActions); + mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos); for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); @@ -1289,13 +1298,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mInfo("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id); break; - } else if (code == pAction->retryCode) { + } else if (code == pAction->retryCode || code == TSDB_CODE_SYN_PROPOSE_NOT_READY || + code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_SYN_NOT_LEADER) { mInfo("trans:%d, %s:%d receive code:0x%x and retry", pTrans->id, mndTransStr(pAction->stage), pAction->id, code); + pTrans->lastErrorNo = code; taosMsleep(300); action--; continue; } else { terrno = code; + pTrans->lastErrorNo = code; pTrans->code = code; mInfo("trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d", pTrans->id, mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 958200da4c612bc0454e2ca7af4fd0b8739f4fa6..094468c66682f34d517cbc31f22ae96531e8e373 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -322,32 +322,34 @@ struct STsdbKeepCfg { }; struct SVnode { - char* path; - SVnodeCfg config; - SVState state; - SVStatis statis; - STfs* pTfs; - SMsgCb msgCb; - TdThreadMutex mutex; - TdThreadCond poolNotEmpty; - SVBufPool* pPool; - SVBufPool* inUse; - SMeta* pMeta; - SSma* pSma; - STsdb* pTsdb; - SWal* pWal; - STQ* pTq; - SSink* pSink; - tsem_t canCommit; - int64_t sync; - TdThreadMutex lock; - bool blocked; - bool restored; - tsem_t syncSem; - int32_t blockSec; - int64_t blockSeq; + char* path; + SVnodeCfg config; + SVState state; + SVStatis statis; + STfs* pTfs; + SMsgCb msgCb; + TdThreadMutex mutex; + TdThreadCond poolNotEmpty; + SVBufPool* pPool; + SVBufPool* inUse; + SMeta* pMeta; + SSma* pSma; + STsdb* pTsdb; + SWal* pWal; + STQ* pTq; + SSink* pSink; + tsem_t canCommit; + int64_t sync; + TdThreadMutex lock; + bool blocked; + bool restored; + tsem_t syncSem; + int32_t blockSec; + int64_t blockSeq; + SQHandle* pQuery; +#if 0 SRpcHandleInfo blockInfo; - SQHandle* pQuery; +#endif }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 8e907261ffbef0d1aa3a14eee59e5c7fe7136c4a..2c23646db1c874a0cbb671784fbcdcec0e5918ec 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -216,7 +216,9 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak pVnode->blocked = true; pVnode->blockSec = taosGetTimestampSec(); pVnode->blockSeq = seq; +#if 0 pVnode->blockInfo = pMsg->info; +#endif } taosThreadMutexUnlock(&pVnode->lock); @@ -628,10 +630,12 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) { vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64, pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq); if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) { +#if 0 SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo}; - vInfo("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq, + vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq, rpcMsg.info.handle, rpcMsg.info.ahandle); rpcSendResponse(&rpcMsg); +#endif } pVnode->blocked = false; pVnode->blockSec = 0;