From ab805d418e8c237672a0dc010be51d3b9fb8e6c3 Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 15 Mar 2020 21:08:51 +0800 Subject: [PATCH] [TD-10] refact create table --- src/dnode/src/dnodeWrite.c | 1 + src/inc/taosmsg.h | 3 +- src/mnode/src/mgmtChildTable.c | 12 +-- src/mnode/src/mgmtNormalTable.c | 6 -- src/mnode/src/mgmtTable.c | 170 +++++++++++++++----------------- src/mnode/src/mgmtVgroup.c | 12 +-- 6 files changed, 93 insertions(+), 111 deletions(-) diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 1b6c6c148f..b59c2882f2 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -402,3 +402,4 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; rpcSendResponse(&rpcRsp); } + diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 94692fb15f..6e15da71d4 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -332,11 +332,10 @@ typedef struct { } SMgmtHead; typedef struct { + int32_t contLen; int32_t vgId; int32_t sid; - int32_t numOfVPeers; uint64_t uid; - SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS]; char tableId[TSDB_TABLE_ID_LEN + 1]; } SMDDropTableMsg; diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 79b2280c17..386d4fecd3 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -373,14 +373,10 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { } strcpy(pRemove->tableId, pTable->tableId); - pRemove->sid = htonl(pTable->sid); - pRemove->uid = htobe64(pTable->uid); - - pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes); - for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); - pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } + pRemove->vgId = htonl(pTable->vgId); + pRemove->contLen = htonl(sizeof(SMDDropTableMsg)); + pRemove->sid = htonl(pTable->sid); + pRemove->uid = htobe64(pTable->uid); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index eb511bcd44..d1556562e5 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -411,12 +411,6 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { pRemove->sid = htonl(pTable->sid); pRemove->uid = htobe64(pTable->uid); - pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes); - for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); - pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); mTrace("table:%s, send drop table msg", pRemove->tableId); SRpcMsg rpcMsg = { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 49b0d3bca0..bfdd5d57a8 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -135,80 +135,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo } static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) { - SCMCreateTableMsg *pCreate = pMsg->pCont; - - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid < 0) { - mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); - mgmtCreateVgroup(pMsg); - return; - } - - int32_t code; - STableInfo *pTable; - SMDCreateTableMsg *pMDCreate = NULL; - - if (pCreate->numOfColumns == 0) { - mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); - } else { - mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); - } - if (code != TSDB_CODE_SUCCESS) { - mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId); - mgmtSendSimpleResp(pMsg->thandle, code); - return; - } - - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - SRpcMsg rpcMsg = { - .handle = pMsg, - .pCont = pMDCreate, - .contLen = htonl(pMDCreate->contLen), - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE - }; - - pMsg->ahandle = pTable; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); -} - -int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { - STableInfo *pTable = mgmtGetTable(tableId); - if (pTable == NULL) { - if (ignore) { - mTrace("table:%s, table is not exist, think it success", tableId); - return TSDB_CODE_SUCCESS; - } else { - mError("table:%s, failed to create table, table not exist", tableId); - return TSDB_CODE_INVALID_TABLE; - } - } - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - mError("table:%s, failed to create table, in monitor database", tableId); - return TSDB_CODE_MONITOR_DB_FORBIDDEN; - } - - switch (pTable->type) { - case TSDB_SUPER_TABLE: - mTrace("table:%s, start to drop super table", tableId); - return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); - case TSDB_CHILD_TABLE: - mTrace("table:%s, start to drop child table", tableId); - return mgmtDropChildTable(pDb, (SChildTableObj *) pTable); - case TSDB_NORMAL_TABLE: - mTrace("table:%s, start to drop normal table", tableId); - return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); - case TSDB_STREAM_TABLE: - mTrace("table:%s, start to drop stream table", tableId); - return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); - default: - mError("table:%s, invalid table type:%d", tableId, pTable->type); - return TSDB_CODE_INVALID_TABLE; - } } int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) { @@ -494,29 +421,58 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { if (pVgroup == NULL) { mTrace("table:%s, start to create a new vgroup", pCreate->tableId); mgmtCreateVgroup(newMsg); + return; + } + + int32_t sid = taosAllocateId(pVgroup->idPool); + if (sid < 0) { + mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); + mgmtCreateVgroup(newMsg); + return; + } + + SMDCreateTableMsg *pMDCreate = NULL; + if (pCreate->numOfColumns == 0) { + mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); + code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); } else { - mTrace("table:%s, vgroup:%d is selected", pCreate->tableId, pVgroup->vgId); - mgmtCreateTable(pVgroup, newMsg); + mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); + code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); } + + if (code != TSDB_CODE_SUCCESS) { + mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId); + mgmtSendSimpleResp(pMsg->thandle, code); + return; + } + + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + SRpcMsg rpcMsg = { + .handle = newMsg, + .pCont = pMDCreate, + .contLen = htonl(pMDCreate->contLen), + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE + }; + + newMsg->ahandle = pTable; + mgmtSendMsgToDnode(&ipSet, &rpcMsg); } void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { - SCMDropTableMsg *pDrop = pMsg->pCont; + if (mgmtCheckRedirect(pMsg->thandle)) return; - if (mgmtCheckRedirect(pMsg->thandle)) { - mError("thandle:%p, failed to drop table:%s, need redirect message", pMsg->thandle, pDrop->tableId); - return; - } + SCMDropTableMsg *pDrop = pMsg->pCont; + mTrace("table:%s, drop msg is received from thandle:%p", pDrop->tableId, pMsg->thandle); - SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); - if (pUser == NULL) { - mError("table:%s, failed to drop table, invalid user", pDrop->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); + if (mgmtCheckExpired()) { + mError("table:%s, failed to drop, grant expired", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); return; } - if (!pUser->writeAuth) { - mError("table:%s, failed to drop table, no rights", pDrop->tableId); + if (!pMsg->pUser->writeAuth) { + mError("table:%s, failed to drop, no rights", pDrop->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } @@ -528,9 +484,45 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { return; } - int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists); - if (code != TSDB_CODE_ACTION_IN_PROGRESS) { - mgmtSendSimpleResp(pMsg->thandle, code); + STableInfo *pTable = mgmtGetTable(pDrop->tableId); + if (pTable == NULL) { + if (pDrop->igNotExists) { + mTrace("table:%s, table is not exist, think drop success", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + return; + } else { + mError("table:%s, failed to drop table, table not exist", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + return; + } + } + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + mError("table:%s, failed to create table, in monitor database", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); + return; + } + + switch (pTable->type) { + case TSDB_SUPER_TABLE: + mTrace("table:%s, start to drop super table", pDrop->tableId); + mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); + break; + case TSDB_CHILD_TABLE: + mTrace("table:%s, start to drop child table", pDrop->tableId); + mgmtDropChildTable(pDb, (SChildTableObj *) pTable); + break; + case TSDB_NORMAL_TABLE: + mTrace("table:%s, start to drop normal table", pDrop->tableId); + mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); + break; + case TSDB_STREAM_TABLE: + mTrace("table:%s, start to drop stream table", pDrop->tableId); + mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); + break; + default: + mError("table:%s, invalid table type:%d", pDrop->tableId, pTable->type); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); } } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index a8c701a213..000bb79d7b 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -189,12 +189,12 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { STableInfo *pTable; if (pVgroup->numOfTables > 0) { - for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { - if (pVgroup->tableList != NULL) { - pTable = pVgroup->tableList[i]; - if (pTable) mgmtDropTable(pDb, pTable->tableId, 0); - } - } +// for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { +// if (pVgroup->tableList != NULL) { +// pTable = pVgroup->tableList[i]; +// if (pTable) mgmtDropTable(pDb, pTable->tableId, 0); +// } +// } } mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); -- GitLab