diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 987fc5441653a09c27d889b03af30150622f96a3..e5893fd94740fa20fa244bd1957a02a50e39bf08 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -130,7 +130,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { _OVER: if (code != 0) { - dError("msg:%p, failed to process since %s", pMsg, terrstr()); + dTrace("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pRpc->msgType)); if (terrno != 0) code = terrno; if (IsReq(pRpc)) { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index d9408467ad7bdee87bfec48913f453bb6a64407d..a7e1f7cd025d55ff1a7e9a38125d444928e36084 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -31,17 +31,18 @@ typedef enum { typedef struct { int32_t id; - tmsg_t msgType; - int8_t msgSent; - int8_t msgReceived; + int32_t errCode; + int32_t acceptableCode; + int8_t stage; int8_t isRaw; int8_t rawWritten; - SSdbRaw *pRaw; + int8_t msgSent; + int8_t msgReceived; + tmsg_t msgType; SEpSet epSet; - int32_t errCode; - int32_t acceptableCode; int32_t contLen; void *pCont; + SSdbRaw *pRaw; } STransAction; typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index a4fde4b70670952dbf14554aa0fce15f77cb49f5..f3ec3a421b6290dbe00997ba13707d62459dccff 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -78,10 +78,8 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("acct:%s, will be created while deploy sdb, raw:%p", acctObj.acct, pRaw); -#if 0 - return sdbWrite(pMnode->pSdb, pRaw); -#else + mDebug("acct:%s, will be created when deploying, raw:%p", acctObj.acct, pRaw); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_ACCT, NULL); if (pTrans == NULL) { mError("acct:%s, failed to create since %s", acctObj.acct, terrstr()); @@ -94,7 +92,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { mndTransDrop(pTrans); return -1; } - sdbSetRawStatus(pRaw, SDB_STATUS_READY); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); @@ -104,7 +101,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { mndTransDrop(pTrans); return 0; -#endif } static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) { diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index a421be5c062a709bdd1e74f583a95142da2aac82..76c8acf407762cbb4d6d455f2bd552f055ecd0f4 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -172,13 +172,13 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN); clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id); pMnode->clusterId = clusterObj.id; - mDebug("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name); + mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name); SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("cluster:%" PRId64 ", will be created while deploy sdb, raw:%p", clusterObj.id, pRaw); + mDebug("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw); #if 0 return sdbWrite(pMnode->pSdb, pRaw); #else diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 95d3383ee10e378c4c5a66e9d16de4fda90db9ed..e3f843f0c7a128b12e51df5891e8189d512101fb 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1314,7 +1314,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName); if (pDb == NULL) { - mDebug("db:%s, no exist", pDbVgVersion->dbFName); + mTrace("db:%s, no exist", pDbVgVersion->dbFName); memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN); usedbRsp.uid = pDbVgVersion->dbId; usedbRsp.vgVersion = -1; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 22f858c60bdbfd56652570195b89cbf3f207651a..8e06139c8c041672a292672fc9712bbabfcb74fd 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -98,7 +98,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { if (pRaw == NULL) return -1; if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1; - mDebug("dnode:%d, will be created while deploy sdb, raw:%p", dnodeObj.id, pRaw); + mDebug("dnode:%d, will be created when deploying, raw:%p", dnodeObj.id, pRaw); #if 0 return sdbWrite(pMnode->pSdb, pRaw); @@ -388,9 +388,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mndReleaseMnode(pMnode, pObj); } + int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); - bool dnodeChanged = (statusReq.dnodeVer != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); + bool dnodeChanged = (statusReq.dnodeVer != dnodeVer); bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool needCheck = !online || dnodeChanged || reboot; @@ -433,7 +434,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { if (!online) { mInfo("dnode:%d, from offline to online", pDnode->id); } else { - mDebug("dnode:%d, send dnode eps", pDnode->id); + mDebug("dnode:%d, send dnode epset, online:%d ver:% " PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online, + statusReq.dnodeVer, dnodeVer, reboot); } pDnode->rebootTime = statusReq.rebootTime; @@ -441,7 +443,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes; SStatusRsp statusRsp = {0}; - statusRsp.dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); + statusRsp.dnodeVer = dnodeVer; statusRsp.dnodeCfg.dnodeId = pDnode->id; statusRsp.dnodeCfg.clusterId = pMnode->clusterId; statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp)); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 0ac36c20ed3463a5bd9e794c83edab22b1675408..2a2a45a45d5759dfb8fbe8f0dc4662cfab8fcd14 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { } else if (code == 0) { mTrace("msg:%p, successfully processed and response", pMsg); } else { - mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, + mDebug("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 23634be77b3ca6f21f31019275353de12ab6c83b..8c5ea840af85ce6d926d11c2985959b0fa0c9c04 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -90,7 +90,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("mnode:%d, will be created while deploy sdb, raw:%p", mnodeObj.id, pRaw); + mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw); #if 0 return sdbWrite(pMnode->pSdb, pRaw); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b33c09a0f9d0a4740a3b0b9ce9fb06dd5ea878ae..81c3b24d97ef19c652dda58c551ea075c6323a71 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1597,7 +1597,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) { pReq->info.rspLen = rspLen; code = 0; - mDebug("stb:%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName); + mTrace("%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName); _OVER: if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 8b602d796c47f29efa8dcfb059d2aff5b3b9de40..245f0938b906300af29bf3f6caf71c834877eaa1 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -65,7 +65,7 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { void mndRestoreFinish(struct SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; if (!pMnode->deploy) { - mInfo("mnode sync restore finished"); + mInfo("mnode sync restore finished, and will handle outstanding transactions"); mndTransPullup(pMnode); mndSetRestore(pMnode, true); } else { @@ -244,7 +244,7 @@ void mndSyncStart(SMnode *pMnode) { } else { syncStart(pMgmt->sync); } - mDebug("sync:%" PRId64 " is started, standby:%d", pMgmt->sync, pMgmt->standby); + mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); } void mndSyncStop(SMnode *pMnode) {} diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 239e1bf4b1c0f9502e0073f807c342b98eb340c1..c5a1e0ba5ae9c4125cbba165de6f6966ff05cfee 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -37,7 +37,6 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); static void mndTransDropActions(SArray *pArray); static void mndTransDropData(STrans *pTrans); -static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); @@ -133,15 +132,21 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); + SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) if (pAction->isRaw) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) } else { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } @@ -149,15 +154,21 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); + SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) if (pAction->isRaw) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) } else { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } @@ -165,15 +176,21 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < commitActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->commitActions, i); + SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) if (pAction->isRaw) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) } else { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } @@ -259,19 +276,25 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { if (pTrans->commitActions == NULL) goto _OVER; for (int32_t i = 0; i < redoActionNum; ++i) { + SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) if (action.isRaw) { + SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; + action.pRaw = taosMemoryMalloc(dataLen); + if (action.pRaw == NULL) goto _OVER; mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->redoActions, &pData) == NULL) goto _OVER; - pData = NULL; + SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); + if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; + action.pRaw = NULL; } else { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) action.pCont = taosMemoryMalloc(action.contLen); if (action.pCont == NULL) goto _OVER; @@ -282,19 +305,25 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { } for (int32_t i = 0; i < undoActionNum; ++i) { + SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) if (action.isRaw) { + SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; + action.pRaw = taosMemoryMalloc(dataLen); + if (action.pRaw == NULL) goto _OVER; mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->undoActions, &pData) == NULL) goto _OVER; - pData = NULL; + SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); + if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; + action.pRaw = NULL; } else { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) action.pCont = taosMemoryMalloc(action.contLen); if (action.pCont == NULL) goto _OVER; @@ -305,19 +334,25 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { } for (int32_t i = 0; i < commitActionNum; ++i) { + SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) if (action.isRaw) { + SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; + action.pRaw = taosMemoryMalloc(dataLen); + if (action.pRaw == NULL) goto _OVER; mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->commitActions, &pData) == NULL) goto _OVER; - pData = NULL; + SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); + if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER; + action.pRaw = NULL; } else { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) action.pCont = taosMemoryMalloc(action.contLen); if (action.pCont == NULL) goto _OVER; @@ -344,7 +379,6 @@ _OVER: mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr()); mndTransDropData(pTrans); taosMemoryFreeClear(pRow); - taosMemoryFreeClear(pData); taosMemoryFreeClear(action.pCont); return NULL; } @@ -502,7 +536,7 @@ static void mndTransDropData(STrans *pTrans) { } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { - mDebug("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), + mTrace("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), callFunc); if (pTrans->stopFunc > 0 && callFunc) { TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc); @@ -515,20 +549,34 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { return 0; } -static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { - if (pNew->stage == TRN_STAGE_COMMIT) { - pNew->stage = TRN_STAGE_COMMIT_ACTION; - mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_ACTION)); - } - - if (pNew->stage == TRN_STAGE_ROLLBACK) { - pNew->stage = TRN_STAGE_FINISHED; - mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_ROLLBACK), mndTransStr(TRN_STAGE_FINISHED)); +static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) { + for (int32_t i = 0; i < taosArrayGetSize(pOldArray); ++i) { + STransAction *pOldAction = taosArrayGet(pOldArray, i); + STransAction *pNewAction = taosArrayGet(pNewArray, i); + pOldAction->rawWritten = pNewAction->rawWritten; + pOldAction->msgSent = pNewAction->msgSent; + pOldAction->msgReceived = pNewAction->msgReceived; + pOldAction->errCode = pNewAction->errCode; } +} +static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld, mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage)); + mndTransUpdateActions(pOld->redoActions, pNew->redoActions); + mndTransUpdateActions(pOld->undoActions, pNew->undoActions); + mndTransUpdateActions(pOld->commitActions, pNew->commitActions); pOld->stage = pNew->stage; + + if (pOld->stage == TRN_STAGE_COMMIT) { + pOld->stage = TRN_STAGE_COMMIT_ACTION; + mTrace("trans:%d, stage from commit to commitAction", pNew->id); + } + + if (pOld->stage == TRN_STAGE_ROLLBACK) { + pOld->stage = TRN_STAGE_FINISHED; + mTrace("trans:%d, stage from rollback to finished", pNew->id); + } return 0; } @@ -557,8 +605,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; pTrans->type = type; + pTrans->parallel = TRN_EXEC_PARALLEL; pTrans->createdTime = taosGetTimestampMs(); - if (pReq != NULL) pTrans->rpcInfo = pReq->info; pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); @@ -569,7 +617,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S return NULL; } - mDebug("trans:%d, local object is created, data:%p", pTrans->id, pTrans); + if (pReq != NULL) pTrans->rpcInfo = pReq->info; + mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans); return pTrans; } @@ -578,7 +627,7 @@ static void mndTransDropActions(SArray *pArray) { for (int32_t i = 0; i < size; ++i) { STransAction *pAction = taosArrayGet(pArray, i); if (pAction->isRaw) { - sdbFreeRaw(pAction->pRaw); + taosMemoryFreeClear(pAction->pRaw); } else { taosMemoryFreeClear(pAction->pCont); } @@ -590,12 +639,14 @@ static void mndTransDropActions(SArray *pArray) { void mndTransDrop(STrans *pTrans) { if (pTrans != NULL) { mndTransDropData(pTrans); - mDebug("trans:%d, local object is freed, data:%p", pTrans->id, pTrans); + mTrace("trans:%d, local object is freed, data:%p", pTrans->id, pTrans); taosMemoryFreeClear(pTrans); } } static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { + pAction->id = taosArrayGetSize(pArray); + void *ptr = taosArrayPush(pArray, pAction); if (ptr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -606,25 +657,27 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { } int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.isRaw = true, .pRaw = pRaw}; + STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .isRaw = true, .pRaw = pRaw}; return mndTransAppendAction(pTrans->redoActions, &action); } int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.isRaw = true, .pRaw = pRaw}; + STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .isRaw = true, .pRaw = pRaw}; return mndTransAppendAction(pTrans->undoActions, &action); } int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.isRaw = true, .pRaw = pRaw}; + STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .isRaw = true, .pRaw = pRaw}; return mndTransAppendAction(pTrans->commitActions, &action); } int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { + pAction->stage = TRN_STAGE_REDO_ACTION; return mndTransAppendAction(pTrans->redoActions, pAction); } int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { + pAction->stage = TRN_STAGE_UNDO_ACTION; return mndTransAppendAction(pTrans->undoActions, pAction); } @@ -821,7 +874,8 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } taosMemoryFree(pTrans->rpcRsp); - mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, code, pTrans->stage, pTrans->rpcInfo.ahandle); + mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage), + pTrans->rpcInfo.ahandle); SRpcMsg rspMsg = { .code = code, .pCont = rpcCont, @@ -877,55 +931,46 @@ void mndTransProcessRsp(SRpcMsg *pRsp) { } } - mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%04x", transId, action, pRsp->code, - pAction->acceptableCode); + mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x", transId, mndTransStr(pAction->stage), action, + pRsp->code, pAction->acceptableCode); mndTransExecute(pMnode, pTrans); _OVER: mndReleaseTrans(pMnode, pTrans); } -static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { - SSdb *pSdb = pMnode->pSdb; - int32_t arraySize = taosArrayGetSize(pArray); - - if (arraySize == 0) return 0; - - int32_t code = 0; - for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGetP(pArray, i); - if (sdbWriteWithoutFree(pSdb, pRaw) != 0) { - code = ((terrno != 0) ? terrno : -1); - } - } - - terrno = code; - return code; -} - - static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; - if (pAction->rawWritten && pAction->errCode == 0) continue; + if (pAction->msgSent && pAction->msgReceived && + (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) + continue; + if (pAction->rawWritten && (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) continue; + pAction->rawWritten = 0; pAction->msgSent = 0; pAction->msgReceived = 0; pAction->errCode = 0; - mDebug("trans:%d, action:%d execute status is reset", pTrans->id, action); + mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action); } } static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + if (pAction->rawWritten) return 0; + int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); - if (code == 0) { - mDebug("trans:%d, action:%d write to sdb", pTrans->id, pAction->id); + if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + pAction->rawWritten = true; + pAction->errCode = 0; + code = 0; + mDebug("trans:%d, %s:%d write to sdb", pTrans->id, mndTransStr(pAction->stage), pAction->id); } else { - mError("trans:%d, action:%d failed to write sdb since %s", pTrans->id, pAction->id, terrstr()); + pAction->errCode = (terrno != 0) ? terrno : code; + mError("trans:%d, %s:%d failed to write sdb since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, + terrstr()); } return code; @@ -952,13 +997,13 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio pAction->msgSent = 1; pAction->msgReceived = 0; pAction->errCode = 0; - mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, pAction->id, + mDebug("trans:%d, %s:%d is sent to %s:%u", pTrans->id, mndTransStr(pAction->stage), pAction->id, pAction->epSet.eps[pAction->epSet.inUse].fqdn, pAction->epSet.eps[pAction->epSet.inUse].port); } else { pAction->msgSent = 0; pAction->msgReceived = 0; pAction->errCode = (terrno != 0) ? terrno : code; - mError("trans:%d, action:%d not send since %s", pTrans->id, pAction->id, terrstr()); + mError("trans:%d, %s:%d not send since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr()); } return code; @@ -995,20 +1040,20 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA return -1; } - int32_t numOfReceived = 0; + int32_t numOfExecuted = 0; int32_t errCode = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgSent && pAction->msgReceived) { - numOfReceived++; + if ((pAction->msgSent && pAction->msgReceived) || pAction->rawWritten) { + numOfExecuted++; if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { errCode = pAction->errCode; } } } - if (numOfReceived == numOfActions) { + if (numOfExecuted == numOfActions) { if (errCode == 0) { mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions); return 0; @@ -1019,7 +1064,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA return errCode; } } else { - mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions); + mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfExecuted, numOfActions); return TSDB_CODE_ACTION_IN_PROGRESS; } } @@ -1041,7 +1086,7 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { } static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitActions); + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions); if (code != 0) { mError("failed to execute commitActions since %s", terrstr()); } @@ -1063,14 +1108,16 @@ static bool mndTransExecuteRedoActionsOneByOne(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecSingleAction(pMnode, pTrans, pAction); if (code == 0) { pTrans->redoActionPos++; - mDebug("trans:%d, redo action:%d is executed and need sync to other mnodes", pTrans->id, pAction->id); + mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), + pAction->id); // todo sync these infos } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - mDebug("trans:%d, redo action:%d is in progress and wait it finish", pTrans->id, pAction->id); + mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id); continueExec = false; } else { - mError("trans:%d, redo action:%d failed to execute since %s", pTrans->id, pAction->id, terrstr()); + mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, + terrstr()); continueExec = false; } @@ -1207,8 +1254,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); } - mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); - + mDebug("trans:%d, execute finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); return continueExec; } @@ -1271,15 +1317,15 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { if (pAction == NULL) continue; if (pAction->msgReceived == 0) { - mInfo("trans:%d, action:%d set processed for kill msg received", pTrans->id, i); + mInfo("trans:%d, %s:%d set processed for kill msg received", pTrans->id, mndTransStr(pAction->stage), i); pAction->msgSent = 1; pAction->msgReceived = 1; pAction->errCode = 0; } if (pAction->errCode != 0) { - mInfo("trans:%d, action:%d set processed for kill msg received, errCode from %s to success", pTrans->id, i, - tstrerror(pAction->errCode)); + mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id, + mndTransStr(pAction->stage), i, tstrerror(pAction->errCode)); pAction->msgSent = 1; pAction->msgReceived = 1; pAction->errCode = 0; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index cc6364c4571b7b56b096d282c4f8f29a7b624dca..83d00c86e3eb8797f5b40fa58243df429e905d5a 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -77,7 +77,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("user:%s, will be created while deploy sdb, raw:%p", userObj.user, pRaw); + mDebug("user:%s, will be created when deploying, raw:%p", userObj.user, pRaw); #if 0 return sdbWrite(pMnode->pSdb, pRaw); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e05b38a7c0345293eb53caeab2eb680f6d113651..161fc5379c45a166ab331bce7b6cbf0f6e2f8c98 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -501,7 +501,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { *ppVgroups = pVgroups; code = 0; - mInfo("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications); + mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications); _OVER: if (code != 0) taosMemoryFree(pVgroups); @@ -539,7 +539,7 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { pVgid->role = TAOS_SYNC_STATE_FOLLOWER; pDnode->numOfVnodes++; - mInfo("db:%s, vgId:%d, vn:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId); + mInfo("db:%s, vgId:%d, vnode_index:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId); maxPos++; if (maxPos == 3) return 0; } diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index c66b47a24b13f0c9efd55dc965743416737177ea..1fd0260d0d22849dafcd1698c89e98392fbe31b0 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -168,6 +168,7 @@ typedef struct SSdb { char *currDir; char *tmpDir; int64_t lastCommitVer; + int64_t lastCommitTerm; int64_t curVer; int64_t curTerm; int64_t tableVer[SDB_MAX]; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 485b729deb52ffcdf4c5b76c1999124a5157f5b2..0526ea5c2d65cee2b57d6312b92b90830bad0b8b 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -55,6 +55,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { pSdb->curVer = -1; pSdb->curTerm = -1; pSdb->lastCommitVer = -1; + pSdb->lastCommitTerm = -1; pSdb->pMnode = pOption->pMnode; taosThreadMutexInit(&pSdb->filelock, NULL); mDebug("sdb init successfully"); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 834e7a00c8c58638a9ac51ec498f87d66abe2b1e..83135491a993e5f8106ed05409255951342c0ac7 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -70,6 +70,7 @@ static void sdbResetData(SSdb *pSdb) { pSdb->curVer = -1; pSdb->curTerm = -1; pSdb->lastCommitVer = -1; + pSdb->lastCommitTerm = -1; mDebug("sdb reset successfully"); } @@ -211,12 +212,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { char file[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - mDebug("start to read file:%s", file); + mDebug("start to read sdb file:%s", file); SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100); if (pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("failed read file since %s", terrstr()); + mError("failed read sdb file since %s", terrstr()); return -1; } @@ -224,12 +225,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { if (pFile == NULL) { taosMemoryFree(pRaw); terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to read file:%s since %s", file, terrstr()); + mError("failed to read sdb file:%s since %s", file, terrstr()); return 0; } if (sdbReadFileHead(pSdb, pFile) != 0) { - mError("failed to read file:%s head since %s", file, terrstr()); + mError("failed to read sdb file:%s head since %s", file, terrstr()); taosMemoryFree(pRaw); taosCloseFile(&pFile); return -1; @@ -245,13 +246,13 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { if (ret < 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } if (ret != readLen) { code = TSDB_CODE_FILE_CORRUPTED; - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } @@ -259,34 +260,36 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ret = taosReadFile(pFile, pRaw->pData, readLen); if (ret < 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } if (ret != readLen) { code = TSDB_CODE_FILE_CORRUPTED; - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t); if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) { code = TSDB_CODE_CHECKSUM_ERROR; - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } code = sdbWriteWithoutFree(pSdb, pRaw); if (code != 0) { - mError("failed to read file:%s since %s", file, terrstr()); + mError("failed to read sdb file:%s since %s", file, terrstr()); goto _OVER; } } code = 0; pSdb->lastCommitVer = pSdb->curVer; + pSdb->lastCommitTerm = pSdb->curTerm; memcpy(pSdb->tableVer, tableVer, sizeof(tableVer)); - mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer); + mDebug("read sdb file:%s successfully, ver:%" PRId64 " term:%" PRId64, file, pSdb->lastCommitVer, + pSdb->lastCommitTerm); _OVER: taosCloseFile(&pFile); @@ -302,7 +305,7 @@ int32_t sdbReadFile(SSdb *pSdb) { sdbResetData(pSdb); int32_t code = sdbReadFileImp(pSdb); if (code != 0) { - mError("failed to read sdb since %s", terrstr()); + mError("failed to read sdb file since %s", terrstr()); sdbResetData(pSdb); } @@ -318,18 +321,19 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { char curfile[PATH_MAX] = {0}; snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer, - pSdb->curTerm, pSdb->lastCommitVer); + mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64 + " file:%s", + pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to open file:%s for write since %s", tmpfile, terrstr()); + mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr()); return -1; } if (sdbWriteFileHead(pSdb, pFile) != 0) { - mError("failed to write file:%s head since %s", tmpfile, terrstr()); + mError("failed to write sdb file:%s head since %s", tmpfile, terrstr()); taosCloseFile(&pFile); return -1; } @@ -338,7 +342,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { SdbEncodeFp encodeFp = pSdb->encodeFps[i]; if (encodeFp == NULL) continue; - mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); + mTrace("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); SHashObj *hash = pSdb->hashObjs[i]; TdThreadRwlock *pLock = &pSdb->locks[i]; @@ -394,7 +398,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { code = taosFsyncFile(pFile); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to sync file:%s since %s", tmpfile, tstrerror(code)); + mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code)); } } @@ -404,15 +408,17 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { code = taosRenameFile(tmpfile, curfile); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to write file:%s since %s", curfile, tstrerror(code)); + mError("failed to write sdb file:%s since %s", curfile, tstrerror(code)); } } if (code != 0) { - mError("failed to write file:%s since %s", curfile, tstrerror(code)); + mError("failed to write sdb file:%s since %s", curfile, tstrerror(code)); } else { pSdb->lastCommitVer = pSdb->curVer; - mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm); + pSdb->lastCommitTerm = pSdb->curTerm; + mDebug("write sdb file successfully, ver:%" PRId64 " term:%" PRId64 " file:%s", pSdb->lastCommitVer, + pSdb->lastCommitTerm, curfile); } terrno = code; @@ -427,7 +433,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { taosThreadMutexLock(&pSdb->filelock); int32_t code = sdbWriteFileImp(pSdb); if (code != 0) { - mError("failed to write sdb since %s", terrstr()); + mError("failed to write sdb file since %s", terrstr()); } taosThreadMutexUnlock(&pSdb->filelock); return code; @@ -493,7 +499,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { if (taosCopyFile(datafile, pIter->name) < 0) { taosThreadMutexUnlock(&pSdb->filelock); terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to copy file %s to %s since %s", datafile, pIter->name, terrstr()); + mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr()); sdbCloseIter(pIter); return -1; } @@ -502,7 +508,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { pIter->file = taosOpenFile(pIter->name, TD_FILE_READ); if (pIter->file == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to open file:%s since %s", pIter->name, terrstr()); + mError("failed to open sdb file:%s since %s", pIter->name, terrstr()); sdbCloseIter(pIter); return -1; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7201820854e6a87a1dffc12a47c37b8d6b692668..655dcbc853adba2dcfa2ebedac89bb0b35bb343b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -79,7 +79,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { if (taskHandle) { code = qExecTask(taskHandle, &pRes, &useconds); if (code) { - QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); + if (code != TSDB_CODE_OPS_NOT_SUPPORT) { + QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); + } else { + QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); + } QW_ERR_RET(code); } }