提交 7d8c04ed 编写于 作者: S Shengliang Guan

fix bug in transaction

上级 b5429120
...@@ -829,6 +829,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t ...@@ -829,6 +829,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
if (pRsp != NULL) { if (pRsp != NULL) {
pRsp->ahandle = pMsg->ahandle;
rpcSendResponse(pRsp); rpcSendResponse(pRsp);
free(pRsp); free(pRsp);
} else { } else {
......
...@@ -541,6 +541,15 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) { ...@@ -541,6 +541,15 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) {
} }
static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pOldDb);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1;
return 0;
}
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb); SSdbRaw *pRedoRaw = mndDbActionEncode(pNewDb);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
...@@ -549,24 +558,57 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO ...@@ -549,24 +558,57 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
return 0; return 0;
} }
static int32_t mndSetUpdateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
SSdbRaw *pUndoRaw = mndDbActionEncode(pOldDb); for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
if (pUndoRaw == NULL) return -1; STransAction action = {0};
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SAlterVnodeMsg *pMsg = (SAlterVnodeMsg *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1;
action.pCont = pMsg;
action.contLen = sizeof(SAlterVnodeMsg);
action.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
return -1;
}
}
return 0; return 0;
} }
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
static int32_t mndSetUpdateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { return 0; } if (pVgroup->dbUid == pNewDb->uid) {
if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNewDb, pVgroup) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("db:%s, failed to update since %s", pOldDb->name, terrstr()); mError("db:%s, failed to update since %s", pOldDb->name, terrstr());
return terrno; return terrno;
...@@ -579,11 +621,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO ...@@ -579,11 +621,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
goto UPDATE_DB_OVER; goto UPDATE_DB_OVER;
} }
if (mndSetUpdateDbUndoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) {
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER; goto UPDATE_DB_OVER;
...@@ -594,11 +631,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO ...@@ -594,11 +631,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
goto UPDATE_DB_OVER; goto UPDATE_DB_OVER;
} }
if (mndSetUpdateDbUndoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER; goto UPDATE_DB_OVER;
...@@ -660,31 +692,87 @@ static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) ...@@ -660,31 +692,87 @@ static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb)
return 0; return 0;
} }
static int32_t mndSetDropDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdbRaw *pUndoRaw = mndDbActionEncode(pDb);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdbRaw *pCommitRaw = mndDbActionEncode(pDb); SSdbRaw *pCommitRaw = mndDbActionEncode(pDb);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL || mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED);
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
STransAction action = {0};
SVnodeGid * pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1;
action.pCont = pMsg;
action.contLen = sizeof(SCreateVnodeMsg);
action.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
return -1;
}
}
return 0; return 0;
} }
static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
static int32_t mndSetDropDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
if (mndBuildDropVgroupAction(pMnode, pTrans, pDb, pVgroup) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("db:%s, failed to drop since %s", pDb->name, terrstr()); mError("db:%s, failed to drop since %s", pDb->name, terrstr());
return -1; return -1;
...@@ -697,11 +785,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { ...@@ -697,11 +785,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
goto DROP_DB_OVER; goto DROP_DB_OVER;
} }
if (mndSetDropDbUndoLogs(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) { if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto DROP_DB_OVER; goto DROP_DB_OVER;
...@@ -712,11 +795,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { ...@@ -712,11 +795,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
goto DROP_DB_OVER; goto DROP_DB_OVER;
} }
if (mndSetDropDbUndoActions(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto DROP_DB_OVER; goto DROP_DB_OVER;
......
...@@ -696,7 +696,8 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -696,7 +696,8 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue; if (pAction == NULL) continue;
if (pAction->msgReceived && pAction->errCode == 0) continue; if (pAction->msgSent && !pAction->msgReceived) continue;
if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue;
int64_t signature = pTrans->id; int64_t signature = pTrans->id;
signature = (signature << 32); signature = (signature << 32);
......
...@@ -337,8 +337,16 @@ static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { ...@@ -337,8 +337,16 @@ static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) {
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { return 0; } mndTransHandleActionRsp(pMsg);
return 0;
}
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
return 0;
}
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
......
...@@ -199,6 +199,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) { ...@@ -199,6 +199,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
SSdbRow *pRow = *ppRow; SSdbRow *pRow = *ppRow;
switch (pRow->status) { switch (pRow->status) {
case SDB_STATUS_READY: case SDB_STATUS_READY:
case SDB_STATUS_UPDATING:
atomic_add_fetch_32(&pRow->refCount, 1); atomic_add_fetch_32(&pRow->refCount, 1);
pRet = pRow->pObj; pRet = pRow->pObj;
break; break;
......
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect sql connect
print =============== create database print =============== create database
......
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect sql connect
print =============== create database print =============== create database
...@@ -13,29 +12,24 @@ endi ...@@ -13,29 +12,24 @@ endi
print $data00 $data01 $data02 print $data00 $data01 $data02
print =============== create normal table sql use d1
sql create table d1.n1 (ts timestamp, i int)
sql show d1.tables
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
print =============== create super table print =============== create super table
sql create table d1.st (ts timestamp, i int) tags (j int) sql create table st (ts timestamp, i int) tags (j int)
sql show d1.stables sql show stables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
print $data00 $data01 $data02 print $data00 $data01 $data02
return
print =============== create child table print =============== create child table
sql create table d1.c1 using d1.st tags(1) sql create table c1 using st tags(1)
sql create table d1.c2 using d1.st tags(2) sql create table c2 using st tags(2)
sql show d1.tables sql show tables
if $rows != 3 then if $rows != 2 then
return -1 return -1
endi endi
...@@ -44,12 +38,12 @@ print $data10 $data11 $data22 ...@@ -44,12 +38,12 @@ print $data10 $data11 $data22
print $data20 $data11 $data22 print $data20 $data11 $data22
print =============== insert data print =============== insert data
sql insert into d1.n1 values(now+1s, 1) sql insert into c1 values(now+1s, 1)
sql insert into d1.n1 values(now+2s, 2) sql insert into c1 values(now+2s, 2)
sql insert into d1.n1 values(now+3s, 3) sql insert into c1 values(now+3s, 3)
print =============== query data print =============== query data
sql select * from d1.n1 sql select * from c1
if $rows != 3 then if $rows != 3 then
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册