From 58118cfa39051db559f7e940c9e7d341eac8b81d Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 16 Mar 2020 14:14:12 +0800 Subject: [PATCH] [TD-10] drop stable message --- src/dnode/src/dnodeWrite.c | 146 ++++++++++++++++++++------------ src/inc/taosmsg.h | 8 +- src/mnode/inc/mgmtChildTable.h | 2 +- src/mnode/inc/mgmtNormalTable.h | 2 +- src/mnode/inc/mgmtVgroup.h | 2 +- src/mnode/src/mgmtChildTable.c | 30 +++---- src/mnode/src/mgmtDb.c | 2 +- src/mnode/src/mgmtNormalTable.c | 29 +++---- src/mnode/src/mgmtSuperTable.c | 15 ++-- src/mnode/src/mgmtTable.c | 66 ++++++++++++--- src/mnode/src/mgmtVgroup.c | 4 +- 11 files changed, 192 insertions(+), 114 deletions(-) diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index b59c2882f2..25430b91e4 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -93,17 +93,14 @@ void dnodeWrite(SRpcMsg *pMsg) { char *pCont = (char *) pMsg->pCont; SRpcContext *pRpcContext = NULL; - int32_t numOfVnodes = 0; - if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { - // TODO parse head, get number of vnodes; - numOfVnodes = 1; - } else { - numOfVnodes = 1; - } - - if (numOfVnodes > 1) { - pRpcContext = calloc(sizeof(SRpcContext), 1); - pRpcContext->numOfVnodes = numOfVnodes; + if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { + SWriteMsgDesc *pDesc = pCont; + pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); + pCont += sizeof(SWriteMsgDesc); + if (pDesc->numOfVnodes > 1) { + pRpcContext = calloc(sizeof(SRpcContext), 1); + pRpcContext->numOfVnodes = pDesc->numOfVnodes; + } } while (leftLen > 0) { @@ -291,26 +288,9 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; - dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId); - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - void *pVnode = dnodeGetVnode(pTable->vgId); - if (pVnode == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID; - dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); - return; - } - - void *pTsdb = dnodeGetVnodeTsdb(pVnode); - if (pTsdb == NULL) { - dnodeReleaseVnode(pVnode); - rpcRsp.code = TSDB_CODE_NOT_ACTIVE_VNODE; - dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); - return; - } + dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId); pTable->numOfColumns = htons(pTable->numOfColumns); pTable->numOfTags = htons(pTable->numOfTags); pTable->sid = htonl(pTable->sid); @@ -344,7 +324,6 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { } tsdbTableSetSchema(&tCfg, pDestTagSchema, false); - // TODO: add data row char *pTagData = pTable->data + totalCols * sizeof(SSchema); int accumBytes = 0; SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); @@ -356,50 +335,107 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { tsdbTableSetTagValue(&tCfg, dataRow, false); } + void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg); - dnodeReleaseVnode(pVnode); + dnodeReleaseVnode(pMsg->pVnode); - if (rpcRsp.code != TSDB_CODE_SUCCESS) { - dError("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); - } else { - dTrace("table:%s, created in dnode", pTable->tableId); - rpcSendResponse(&rpcRsp); - } + dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); + rpcSendResponse(&rpcRsp); } static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont; - dPrint("table:%s, sid:%d is dropped", pTable->tableId, pTable->sid); - -// pTable->sid = htonl(pTable->sid); -// pTable->numOfVPeers = htonl(pTable->numOfVPeers); -// pTable->uid = htobe64(pTable->uid); -// -// for (int i = 0; i < pTable->numOfVPeers; ++i) { -// pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip); -// pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode); -// } -// -// int32_t code = dnodeDropTable(pTable); -// SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + dTrace("table:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + STableId tableId = { + .uid = htobe64(pTable->uid), + .tid = htonl(pTable->sid) + }; + + void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + + rpcRsp.code = tsdbDropTable(pTsdb, tableId); + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) { SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; - dPrint("table:%s, sid:%d is alterd", pTable->tableId, pTable->sid); - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + dTrace("table:%s, start to alter in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + pTable->numOfColumns = htons(pTable->numOfColumns); + pTable->numOfTags = htons(pTable->numOfTags); + pTable->sid = htonl(pTable->sid); + pTable->sversion = htonl(pTable->sversion); + pTable->tagDataLen = htonl(pTable->tagDataLen); + pTable->sqlDataLen = htonl(pTable->sqlDataLen); + pTable->uid = htobe64(pTable->uid); + pTable->superTableUid = htobe64(pTable->superTableUid); + pTable->createdTime = htobe64(pTable->createdTime); + SSchema *pSchema = (SSchema *) pTable->data; + + int totalCols = pTable->numOfColumns + pTable->numOfTags; + for (int i = 0; i < totalCols; i++) { + pSchema[i].colId = htons(pSchema[i].colId); + pSchema[i].bytes = htons(pSchema[i].bytes); + } + + STableCfg tCfg; + tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid); + + STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns); + for (int i = 0; i < pTable->numOfColumns; i++) { + tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetSchema(&tCfg, pDestSchema, false); + + if (pTable->numOfTags != 0) { + STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags); + for (int i = pTable->numOfColumns; i < totalCols; i++) { + tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetSchema(&tCfg, pDestTagSchema, false); + + char *pTagData = pTable->data + totalCols * sizeof(SSchema); + int accumBytes = 0; + SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); + + for (int i = 0; i < pTable->numOfTags; i++) { + tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); + accumBytes += pSchema[i + pTable->numOfColumns].bytes; + } + tsdbTableSetTagValue(&tCfg, dataRow, false); + } + + void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + + rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg); + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont; - dPrint("stable:%s, is dropped", pTable->tableId); - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + dTrace("stable:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + pTable->uid = htobe64(pTable->uid); + + // TODO: drop stable in vvnode + //void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + //rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid); + + rpcRsp.code = TSDB_CODE_SUCCESS; + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 4b5068aee4..6a96b658bc 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -234,6 +234,10 @@ typedef struct { uint32_t ip; } SVnodeDesc; +typedef struct { + int32_t numOfVnodes; +} SWriteMsgDesc; + typedef struct { int32_t contLen; int32_t vgId; @@ -341,8 +345,10 @@ typedef struct { } SMDDropTableMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; + int32_t contLen; + int32_t vgId; int64_t uid; + char tableId[TSDB_TABLE_ID_LEN + 1]; } SMDDropSTableMsg; typedef struct { diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 680c48dc23..7847357d65 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -33,7 +33,7 @@ void * mgmtGetChildTable(char *tableId); void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable); -int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); +int32_t mgmtDropChildTable(SChildTableObj *pTable); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index c612c1325a..0577647d22 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -31,7 +31,7 @@ void * mgmtGetNormalTable(char *tableId); void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); -int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); +int32_t mgmtDropNormalTable(SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 7ac97076c7..10f404f386 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -30,7 +30,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); void mgmtCreateVgroup(SQueuedMsg *pMsg); -int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); +int32_t mgmtDropVgroup(SVgObj *pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup); void mgmtSetVgroupIdPool(); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 0b577f710f..6544d969a8 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -355,45 +355,37 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return pTable; } -int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { +int32_t mgmtDropChildTable(SChildTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId); return TSDB_CODE_OTHERS; } - SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg)); - if (pRemove == NULL) { + SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); + if (pDrop == NULL) { mError("table:%s, failed to drop child table, no enough memory", pTable->tableId); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - strcpy(pRemove->tableId, pTable->tableId); - pRemove->vgId = htonl(pTable->vgId); - pRemove->contLen = htonl(sizeof(SMDDropTableMsg)); - pRemove->sid = htonl(pTable->sid); - pRemove->uid = htobe64(pTable->uid); + strcpy(pDrop->tableId, pTable->tableId); + pDrop->vgId = htonl(pTable->vgId); + pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); + pDrop->sid = htonl(pTable->sid); + pDrop->uid = htobe64(pTable->uid); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mTrace("table:%s, send drop table msg", pRemove->tableId); + mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { .handle = 0, - .pCont = pRemove, + .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); - - if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { - mError("table:%s, update ctables sdb error", pTable->tableId); - return TSDB_CODE_SDB_ERROR; - } - if (pVgroup->numOfTables <= 0) { - mgmtDropVgroup(pDb, pVgroup); - } + mgmtSendMsgToDnode(&ipSet, &rpcMsg); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 12c34ad057..92699b69a6 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -368,7 +368,7 @@ static bool mgmtCheckDropDbFinished(SDbObj *pDb) { } static void mgmtDropDbFromSdb(SDbObj *pDb) { - while (pDb->pHead) mgmtDropVgroup(pDb, pDb->pHead); + while (pDb->pHead) mgmtDropVgroup(pDb->pHead); // SSuperTableObj *pMetric = pDb->pSTable; // while (pMetric) { diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 8c6645b6c2..5328945bb9 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -389,43 +389,36 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return pTable; } -int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { +int32_t mgmtDropNormalTable(SNormalTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId); return TSDB_CODE_OTHERS; } - SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg)); - if (pRemove == NULL) { + SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); + if (pDrop == NULL) { mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - strcpy(pRemove->tableId, pTable->tableId); - pRemove->sid = htonl(pTable->sid); - pRemove->uid = htobe64(pTable->uid); + strcpy(pDrop->tableId, pTable->tableId); + pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); + pDrop->vgId = htonl(pVgroup->vgId); + pDrop->sid = htonl(pTable->sid); + pDrop->uid = htobe64(pTable->uid); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mTrace("table:%s, send drop table msg", pRemove->tableId); + mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { .handle = 0, - .pCont = pRemove, + .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); - - if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { - mError("table:%s, update ntables sdb error", pTable->tableId); - return TSDB_CODE_SDB_ERROR; - } - - if (pVgroup->numOfTables <= 0) { - mgmtDropVgroup(pDb, pVgroup); - } + mgmtSendMsgToDnode(&ipSet, &rpcMsg); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 34665d1db7..2f3debf158 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -250,11 +250,16 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { return TSDB_CODE_SUCCESS; } -int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) { - //TODO drop all child tables - - mgmtRemoveSuperTableFromDb(pDb); - return sdbDeleteRow(tsSuperTableSdb, pSuperTable); +int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pStable) { + if (pStable->numOfTables != 0) { + mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables); + return TSDB_CODE_OTHERS; + } else { + //TODO: drop child tables + mError("stable:%s, is dropped from sdb", pStable->tableId); + mgmtRemoveSuperTableFromDb(pDb); + return TSDB_CODE_OTHERS; + } } void* mgmtGetSuperTable(char *tableId) { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 97f2542a78..50d1697cd8 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -51,6 +51,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessSuperTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); static int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); @@ -82,7 +83,7 @@ int32_t mgmtInitTables() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, NULL); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, NULL); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, NULL); @@ -439,19 +440,19 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { return; } pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, pTable); - if (pCreate == NULL) { + if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } } else { mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateNormalTable(pCreate, pVgroup, sid); + pTable = mgmtCreateNormalTable(pCreate, pVgroup, sid); if (pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } pMDCreate = mgmtBuildCreateNormalTableMsg(pTable); - if (pCreate == NULL) { + if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } @@ -509,31 +510,36 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { } if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - mError("table:%s, failed to create table, in monitor database", pDrop->tableId); + mError("table:%s, failed to drop table, in monitor database", pDrop->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); return; } + int32_t code; switch (pTable->type) { case TSDB_SUPER_TABLE: mTrace("table:%s, start to drop super table", pDrop->tableId); - mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); + code = mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); break; case TSDB_CHILD_TABLE: mTrace("table:%s, start to drop child table", pDrop->tableId); - mgmtDropChildTable(pDb, (SChildTableObj *) pTable); + code = mgmtDropChildTable((SChildTableObj *) pTable); break; case TSDB_NORMAL_TABLE: mTrace("table:%s, start to drop normal table", pDrop->tableId); - mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); + code = mgmtDropNormalTable((SNormalTableObj *) pTable); break; case TSDB_STREAM_TABLE: mTrace("table:%s, start to drop stream table", pDrop->tableId); - mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); + code = mgmtDropNormalTable((SNormalTableObj *) pTable); break; default: + code = TSDB_CODE_INVALID_TABLE_TYPE; mError("table:%s, invalid table type:%d", pDrop->tableId, pTable->type); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + } + + if (code != TSDB_CODE_SUCCESS) { + mgmtSendSimpleResp(pMsg->thandle, code); } } @@ -778,3 +784,43 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { free(queueMsg); } + +static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->handle == NULL) return; + + STableInfo *pTable = rpcMsg->handle; + mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, rpcMsg->handle, tstrerror(rpcMsg->code)); + + if (rpcMsg->code != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); + mgmtSendSimpleResp(rpcMsg->handle, rpcMsg->code); + return; + } else { + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("table:%s, failed to get vgroup", pTable->tableId); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_VGROUP_ID); + return; + } + + if (pTable->type == TSDB_CHILD_TABLE) { + if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { + mError("table:%s, update ctables sdb error", pTable->tableId); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR); + return; + } + } else if (pTable->type == TSDB_NORMAL_TABLE){ + if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { + mError("table:%s, update ntables sdb error", pTable->tableId); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR); + return; + } + } + + if (pVgroup->numOfTables <= 0) { + mgmtDropVgroup(pVgroup); + } + } + + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); +} diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 000bb79d7b..7b3fb02e8a 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -185,7 +185,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { mgmtSendCreateVgroupMsg(pVgroup, pMsg); } -int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { +int32_t mgmtDropVgroup(SVgObj *pVgroup) { STableInfo *pTable; if (pVgroup->numOfTables > 0) { @@ -197,7 +197,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { // } } - mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); + mTrace("vgroup:%d, replica:%d is deleted", pVgroup->vgId, pVgroup->numOfVnodes); //mgmtSendDropVgroupMsg(pVgroup, NULL); -- GitLab