提交 e7377e41 编写于 作者: L Liu Jicong

refactor(mnode): check drop and alter stb for stream and topic

上级 0e0e8cfa
...@@ -1145,7 +1145,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p ...@@ -1145,7 +1145,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
return 0; return 0;
} }
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId) { static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
...@@ -1154,7 +1154,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t ...@@ -1154,7 +1154,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t
if (pIter == NULL) break; if (pIter == NULL) break;
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s", mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql); pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
continue; continue;
...@@ -1192,20 +1192,66 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t ...@@ -1192,20 +1192,66 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
} }
return 0;
}
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
SNode *pAst = NULL;
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
ASSERT(0);
return -1;
}
SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid) {
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
goto NEXT;
}
if (pCol->colId > 0 && pCol->colId == colId) {
sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst);
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
return -1;
}
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
}
NEXT:
sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst);
}
return 0;
}
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) { while (1) {
SSmaObj *pSma = NULL; SSmaObj *pSma = NULL;
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break; if (pIter == NULL) break;
mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbname, mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name,
suid, colId, pSma->sql); stbFullName, suid, colId, pSma->sql);
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(pSma->ast, &pAst) != 0) { if (nodesStringToNode(pSma->ast, &pAst) != 0) {
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT; terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err", mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
pSma->name, stbname, suid, colId); pSma->name, stbFullName, suid, colId);
return -1; return -1;
} }
...@@ -1218,7 +1264,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t ...@@ -1218,7 +1264,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t
if ((pCol->tableId != suid) && (pSma->stbUid != suid)) { if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
goto NEXT2; goto NEXT;
} }
if ((pCol->colId) > 0 && (pCol->colId == colId)) { if ((pCol->colId) > 0 && (pCol->colId == colId)) {
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
...@@ -1230,11 +1276,24 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t ...@@ -1230,11 +1276,24 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
} }
NEXT2: NEXT:
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
} }
return 0;
}
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
if (mndCheckAlterColForTopic(pMnode, stbFullName, suid, colId) < 0) {
return -1;
}
if (mndCheckAlterColForStream(pMnode, stbFullName, suid, colId) < 0) {
return -1;
}
if (mndCheckAlterColForTSma(pMnode, stbFullName, suid, colId) < 0) {
return -1;
}
return 0; return 0;
} }
...@@ -1930,6 +1989,90 @@ _OVER: ...@@ -1930,6 +1989,90 @@ _OVER:
return code; return code;
} }
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break;
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
if (pTopic->stbUid == suid) {
sdbRelease(pSdb, pTopic);
return -1;
}
}
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
sdbRelease(pSdb, pTopic);
continue;
}
SNode *pAst = NULL;
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
ASSERT(0);
return -1;
}
SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid) {
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
return -1;
} else {
goto NEXT;
}
}
NEXT:
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
}
return 0;
}
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
SNode *pAst = NULL;
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
ASSERT(0);
return -1;
}
SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid) {
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst);
return -1;
} else {
goto NEXT;
}
}
NEXT:
sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst);
}
return 0;
}
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
...@@ -1971,6 +2114,16 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { ...@@ -1971,6 +2114,16 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
if (mndCheckDropStbForTopic(pMnode, dropReq.name, pStb->uid) < 0) {
terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
goto _OVER;
}
if (mndCheckDropStbForStream(pMnode, dropReq.name, pStb->uid) < 0) {
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
goto _OVER;
}
code = mndDropStb(pMnode, pReq, pDb, pStb); code = mndDropStb(pMnode, pReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册