提交 344ef7e1 编写于 作者: S Shengliang Guan

fix: drop tsma

上级 0cbc2248
...@@ -47,6 +47,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, S ...@@ -47,6 +47,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, S
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby); void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildAlterVnodeReq(SMnode *, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -645,7 +645,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -645,7 +645,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid == pNew->uid) { if (mndVgroupInDb(pVgroup, pNew->uid)) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup, pArray) != 0) { if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup, pArray) != 0) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -1006,7 +1006,7 @@ static int32_t mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode) { ...@@ -1006,7 +1006,7 @@ static int32_t mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) { if (mndVgroupInDb(pVgroup, pDb->uid)) {
numOfTables += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT; numOfTables += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT;
vindex++; vindex++;
} }
......
...@@ -225,10 +225,11 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -225,10 +225,11 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
SVgObj* pVgroup; SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (strcmp(pVgroup->dbName, pStream->targetDb) != 0) { if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) { if (pTask == NULL) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -420,10 +421,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -420,10 +421,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SVgObj* pVgroup; SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->sourceDbUid) { if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pInnerTask == NULL) { if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -483,10 +485,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -483,10 +485,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SVgObj* pVgroup; SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->sourceDbUid) { if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) { if (pTask == NULL) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -559,7 +562,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -559,7 +562,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) { if (!mndVgroupInDb(pVgroup, pTopic->uid)) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
......
...@@ -822,6 +822,17 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p ...@@ -822,6 +822,17 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
if (pSma->stbUid == pStb->uid) { if (pSma->stbUid == pStb->uid) {
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
if (pVgroup == NULL) goto _OVER; if (pVgroup == NULL) goto _OVER;
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
if (pStream != NULL && pStream->smaId == pSma->uid) {
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
goto _OVER;
}
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
goto _OVER;
}
}
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
......
...@@ -621,12 +621,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -621,12 +621,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) { if (!mndVgroupInDb(pVgroup, pDb->uid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (pVgroup->isTsma) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
...@@ -664,12 +659,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -664,12 +659,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) { if (!mndVgroupInDb(pVgroup, pDb->uid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (pVgroup->isTsma) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
...@@ -1297,12 +1287,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -1297,12 +1287,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) { if (!mndVgroupInDb(pVgroup, pDb->uid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (pVgroup->isTsma) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
...@@ -1688,12 +1673,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -1688,12 +1673,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) { if (!mndVgroupInDb(pVgroup, pDb->uid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (pVgroup->isTsma) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
......
...@@ -1146,13 +1146,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1146,13 +1146,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} else { } else {
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
} }
} } else if (pAction->rawWritten) {
if (pAction->rawWritten) {
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
code = pAction->errCode; code = pAction->errCode;
} else { } else {
mDebug("trans:%d, %s:%d write successfully", pTrans->id, mndTransStr(pAction->stage), action); mDebug("trans:%d, %s:%d write successfully", pTrans->id, mndTransStr(pAction->stage), action);
} }
} else {
} }
} }
......
...@@ -1751,4 +1751,6 @@ _OVER: ...@@ -1751,4 +1751,6 @@ _OVER:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
return code; return code;
} }
\ No newline at end of file
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
\ No newline at end of file
...@@ -63,7 +63,7 @@ python3 ./test.py -f 2-query/To_unixtimestamp.py ...@@ -63,7 +63,7 @@ python3 ./test.py -f 2-query/To_unixtimestamp.py
python3 ./test.py -f 2-query/timetruncate.py python3 ./test.py -f 2-query/timetruncate.py
python3 ./test.py -f 2-query/diff.py python3 ./test.py -f 2-query/diff.py
python3 ./test.py -f 2-query/Timediff.py python3 ./test.py -f 2-query/Timediff.py
#python3 ./test.py -f 2-query/json_tag.py python3 ./test.py -f 2-query/json_tag.py
python3 ./test.py -f 2-query/top.py python3 ./test.py -f 2-query/top.py
python3 ./test.py -f 2-query/bottom.py python3 ./test.py -f 2-query/bottom.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册