diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 598d6e47bed1987587dbc7fcc9dff25df68457dc..028596c041c50cea431a04eafee48086c589d19d 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -829,6 +829,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); if (pRsp != NULL) { + pRsp->ahandle = pMsg->ahandle; rpcSendResponse(pRsp); free(pRsp); } else { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 7c12714a46f187b52a72794cc221ee76fa4b7c6d..d7e8768f65c228f143e258d79d263b66d4f59153 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -541,6 +541,15 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) { } static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { + SSdbRaw *pRedoRaw = mndDbActionEncode(pOldDb); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1; + + return 0; +} + +static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb); if (pRedoRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; @@ -549,24 +558,57 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO return 0; } -static int32_t mndSetUpdateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { - SSdbRaw *pUndoRaw = mndDbActionEncode(pOldDb); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; +static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + STransAction action = {0}; + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SAlterVnodeMsg *pMsg = (SAlterVnodeMsg *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); + if (pMsg == NULL) return -1; + + action.pCont = pMsg; + action.contLen = sizeof(SAlterVnodeMsg); + action.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + } return 0; } -static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } +static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; -static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; -static int32_t mndSetUpdateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } + if (pVgroup->dbUid == pNewDb->uid) { + if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNewDb, pVgroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + } + + sdbRelease(pSdb, pVgroup); + } + + return 0; +} static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("db:%s, failed to update since %s", pOldDb->name, terrstr()); return terrno; @@ -579,11 +621,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO goto UPDATE_DB_OVER; } - if (mndSetUpdateDbUndoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto UPDATE_DB_OVER; - } - if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); goto UPDATE_DB_OVER; @@ -594,11 +631,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO goto UPDATE_DB_OVER; } - if (mndSetUpdateDbUndoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto UPDATE_DB_OVER; - } - if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); goto UPDATE_DB_OVER; @@ -660,31 +692,87 @@ static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) return 0; } -static int32_t mndSetDropDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { - SSdbRaw *pUndoRaw = mndDbActionEncode(pDb); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; - - return 0; -} - static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdbRaw *pCommitRaw = mndDbActionEncode(pDb); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); + if (pVgRaw == NULL || mndTransAppendCommitlog(pTrans, pVgRaw) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED); + } + + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + +static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + STransAction action = {0}; + SVnodeGid * pVgid = pVgroup->vnodeGid + vn; + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); + if (pMsg == NULL) return -1; + + action.pCont = pMsg; + action.contLen = sizeof(SCreateVnodeMsg); + action.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + } + return 0; } -static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } +static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; -static int32_t mndSetDropDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + if (mndBuildDropVgroupAction(pMnode, pTrans, pDb, pVgroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + } + + sdbRelease(pSdb, pVgroup); + } + + return 0; +} static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("db:%s, failed to drop since %s", pDb->name, terrstr()); return -1; @@ -697,11 +785,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { goto DROP_DB_OVER; } - if (mndSetDropDbUndoLogs(pMnode, pTrans, pDb) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto DROP_DB_OVER; - } - if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); goto DROP_DB_OVER; @@ -712,11 +795,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { goto DROP_DB_OVER; } - if (mndSetDropDbUndoActions(pMnode, pTrans, pDb) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_DB_OVER; - } - if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); goto DROP_DB_OVER; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 53e3bc52036dee574f43dcd96889146bdc312445..eca49cf2d03e18c3128fcbe38b57c9969d4e4662 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -696,7 +696,8 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgReceived && pAction->errCode == 0) continue; + if (pAction->msgSent && !pAction->msgReceived) continue; + if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; int64_t signature = pTrans->id; signature = (signature << 32); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index c65436efcb844d3b9387fdc9467673205d03d4dc..92bcbb73943ca81251b4666fadf78c7ec9c61d57 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -337,8 +337,16 @@ static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + +static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 1c8b82fd92cd3ac8ef536a744533e077c4a79588..0c311b86b6fcf3ed48d7e4c2cc5f64ef8cb88bb4 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -199,6 +199,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) { SSdbRow *pRow = *ppRow; switch (pRow->status) { case SDB_STATUS_READY: + case SDB_STATUS_UPDATING: atomic_add_fetch_32(&pRow->refCount, 1); pRet = pRow->pObj; break; diff --git a/tests/script/general/db/basic1.sim b/tests/script/general/db/basic1.sim index dc62c33842ef9a1a1af4bff341867c77dbc7ed6c..666a9dfd9e2a1e641dad52b84006bae1634db524 100644 --- a/tests/script/general/db/basic1.sim +++ b/tests/script/general/db/basic1.sim @@ -1,7 +1,6 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start -sleep 2000 sql connect print =============== create database diff --git a/tests/script/general/table/basic1.sim b/tests/script/general/table/basic1.sim index c26c3c33e4102723805925cccb61c63d01c866b8..ded8d79a3fc814aa82eef1a658d12c6ae14aab12 100644 --- a/tests/script/general/table/basic1.sim +++ b/tests/script/general/table/basic1.sim @@ -1,7 +1,6 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start -sleep 2000 sql connect print =============== create database @@ -13,29 +12,24 @@ endi print $data00 $data01 $data02 -print =============== create normal table -sql create table d1.n1 (ts timestamp, i int) -sql show d1.tables -if $rows != 1 then - return -1 -endi - -print $data00 $data01 $data02 +sql use d1 print =============== create super table -sql create table d1.st (ts timestamp, i int) tags (j int) -sql show d1.stables +sql create table st (ts timestamp, i int) tags (j int) +sql show stables if $rows != 1 then return -1 endi print $data00 $data01 $data02 +return + print =============== create child table -sql create table d1.c1 using d1.st tags(1) -sql create table d1.c2 using d1.st tags(2) -sql show d1.tables -if $rows != 3 then +sql create table c1 using st tags(1) +sql create table c2 using st tags(2) +sql show tables +if $rows != 2 then return -1 endi @@ -44,12 +38,12 @@ print $data10 $data11 $data22 print $data20 $data11 $data22 print =============== insert data -sql insert into d1.n1 values(now+1s, 1) -sql insert into d1.n1 values(now+2s, 2) -sql insert into d1.n1 values(now+3s, 3) +sql insert into c1 values(now+1s, 1) +sql insert into c1 values(now+2s, 2) +sql insert into c1 values(now+3s, 3) print =============== query data -sql select * from d1.n1 +sql select * from c1 if $rows != 3 then return -1 endi