未验证 提交 9f9aa0ee 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18952 from taosdata/fix/TS-2255

fix: reduce the frequency of retry in sync not ready case while alter db
...@@ -75,6 +75,7 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, voi ...@@ -75,6 +75,7 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, voi
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname); void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
void mndTransSetSerial(STrans *pTrans); void mndTransSetSerial(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper); void mndTransSetOper(STrans *pTrans, EOperType oper);
int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
int32_t mndTransProcessRsp(SRpcMsg *pRsp); int32_t mndTransProcessRsp(SRpcMsg *pRsp);
......
...@@ -776,6 +776,8 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p ...@@ -776,6 +776,8 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
int32_t code = -1; int32_t code = -1;
mndTransSetDbName(pTrans, pOld->name, NULL); mndTransSetDbName(pTrans, pOld->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) return -1;
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
...@@ -835,12 +837,14 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { ...@@ -835,12 +837,14 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (terrno != 0) code = terrno;
mError("db:%s, failed to alter since %s", alterReq.db, terrstr()); mError("db:%s, failed to alter since %s", alterReq.db, terrstr());
} }
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
taosArrayDestroy(dbObj.cfg.pRetensions); taosArrayDestroy(dbObj.cfg.pRetensions);
terrno = code;
return code; return code;
} }
...@@ -1183,7 +1187,8 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs ...@@ -1183,7 +1187,8 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable || pReq->stateTs < pDb->stateTs) { if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable ||
pReq->stateTs < pDb->stateTs) {
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos); mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos);
} }
...@@ -1298,21 +1303,22 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, ...@@ -1298,21 +1303,22 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
SUseDbRsp usedbRsp = {0}; SUseDbRsp usedbRsp = {0};
if ((0 == strcasecmp(pDbVgVersion->dbFName, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcasecmp(pDbVgVersion->dbFName, TSDB_PERFORMANCE_SCHEMA_DB)))) { if ((0 == strcasecmp(pDbVgVersion->dbFName, TSDB_INFORMATION_SCHEMA_DB) ||
(0 == strcasecmp(pDbVgVersion->dbFName, TSDB_PERFORMANCE_SCHEMA_DB)))) {
memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN); memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN);
int32_t vgVersion = mndGetGlobalVgroupVersion(pMnode); int32_t vgVersion = mndGetGlobalVgroupVersion(pMnode);
if (pDbVgVersion->vgVersion < vgVersion) { if (pDbVgVersion->vgVersion < vgVersion) {
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo)); usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos); mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
usedbRsp.vgVersion = vgVersion++; usedbRsp.vgVersion = vgVersion++;
} else { } else {
usedbRsp.vgVersion = pDbVgVersion->vgVersion; usedbRsp.vgVersion = pDbVgVersion->vgVersion;
} }
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
taosArrayPush(batchUseRsp.pArray, &usedbRsp); taosArrayPush(batchUseRsp.pArray, &usedbRsp);
continue; continue;
} }
...@@ -1328,7 +1334,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, ...@@ -1328,7 +1334,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable && if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable &&
pDbVgVersion->stateTs == pDb->stateTs) { pDbVgVersion->stateTs == pDb->stateTs) {
mTrace("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64 mTrace("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64
" numOfTables:%d, not changed vgVersion:%d stateTs:%" PRId64 " numOfTables:%d", " numOfTables:%d, not changed vgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
......
...@@ -838,7 +838,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { ...@@ -838,7 +838,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
return conflict; return conflict;
} }
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) { if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT; terrno = TSDB_CODE_MND_TRANS_CONFLICT;
...@@ -853,6 +853,14 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { ...@@ -853,6 +853,14 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1; return -1;
} }
return 0;
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
return -1;
}
if (taosArrayGetSize(pTrans->commitActions) <= 0) { if (taosArrayGetSize(pTrans->commitActions) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL; terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
...@@ -1027,6 +1035,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) { ...@@ -1027,6 +1035,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
if (pAction != NULL) { if (pAction != NULL) {
pAction->msgReceived = 1; pAction->msgReceived = 1;
pAction->errCode = pRsp->code; pAction->errCode = pRsp->code;
pTrans->lastErrorNo = pRsp->code;
} }
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage), mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage),
...@@ -1238,7 +1247,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1238,7 +1247,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
if (numOfActions == 0) return code; if (numOfActions == 0) return code;
if (pTrans->redoActionPos >= numOfActions) return code; if (pTrans->redoActionPos >= numOfActions) return code;
mInfo("trans:%d, execute %d actions serial", pTrans->id, numOfActions); mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos);
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
...@@ -1289,13 +1298,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1289,13 +1298,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mInfo("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id); mInfo("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
break; break;
} else if (code == pAction->retryCode) { } else if (code == pAction->retryCode || code == TSDB_CODE_SYN_PROPOSE_NOT_READY ||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_SYN_NOT_LEADER) {
mInfo("trans:%d, %s:%d receive code:0x%x and retry", pTrans->id, mndTransStr(pAction->stage), pAction->id, code); mInfo("trans:%d, %s:%d receive code:0x%x and retry", pTrans->id, mndTransStr(pAction->stage), pAction->id, code);
pTrans->lastErrorNo = code;
taosMsleep(300); taosMsleep(300);
action--; action--;
continue; continue;
} else { } else {
terrno = code; terrno = code;
pTrans->lastErrorNo = code;
pTrans->code = code; pTrans->code = code;
mInfo("trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d", pTrans->id, mInfo("trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d", pTrans->id,
mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes); mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes);
......
...@@ -322,32 +322,34 @@ struct STsdbKeepCfg { ...@@ -322,32 +322,34 @@ struct STsdbKeepCfg {
}; };
struct SVnode { struct SVnode {
char* path; char* path;
SVnodeCfg config; SVnodeCfg config;
SVState state; SVState state;
SVStatis statis; SVStatis statis;
STfs* pTfs; STfs* pTfs;
SMsgCb msgCb; SMsgCb msgCb;
TdThreadMutex mutex; TdThreadMutex mutex;
TdThreadCond poolNotEmpty; TdThreadCond poolNotEmpty;
SVBufPool* pPool; SVBufPool* pPool;
SVBufPool* inUse; SVBufPool* inUse;
SMeta* pMeta; SMeta* pMeta;
SSma* pSma; SSma* pSma;
STsdb* pTsdb; STsdb* pTsdb;
SWal* pWal; SWal* pWal;
STQ* pTq; STQ* pTq;
SSink* pSink; SSink* pSink;
tsem_t canCommit; tsem_t canCommit;
int64_t sync; int64_t sync;
TdThreadMutex lock; TdThreadMutex lock;
bool blocked; bool blocked;
bool restored; bool restored;
tsem_t syncSem; tsem_t syncSem;
int32_t blockSec; int32_t blockSec;
int64_t blockSeq; int64_t blockSeq;
SQHandle* pQuery;
#if 0
SRpcHandleInfo blockInfo; SRpcHandleInfo blockInfo;
SQHandle* pQuery; #endif
}; };
#define TD_VID(PVNODE) ((PVNODE)->config.vgId) #define TD_VID(PVNODE) ((PVNODE)->config.vgId)
......
...@@ -216,7 +216,9 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak ...@@ -216,7 +216,9 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
pVnode->blocked = true; pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec(); pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq; pVnode->blockSeq = seq;
#if 0
pVnode->blockInfo = pMsg->info; pVnode->blockInfo = pMsg->info;
#endif
} }
taosThreadMutexUnlock(&pVnode->lock); taosThreadMutexUnlock(&pVnode->lock);
...@@ -628,10 +630,12 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) { ...@@ -628,10 +630,12 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) {
vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64, vError("vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%" PRId64,
pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq); pVnode->config.vgId, pVnode->blockSec, curSec, delta, pVnode->blockSeq);
if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) { if (syncSendTimeoutRsp(pVnode->sync, pVnode->blockSeq) != 0) {
#if 0
SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo}; SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
vInfo("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq, vError("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
rpcMsg.info.handle, rpcMsg.info.ahandle); rpcMsg.info.handle, rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
#endif
} }
pVnode->blocked = false; pVnode->blocked = false;
pVnode->blockSec = 0; pVnode->blockSec = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册