diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index deb1719527dd7e8d54cfc90cedb6c92703629040..cc416e1cb7c358727ce75faedc40c682834f3ec7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -180,7 +180,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - pSql->ipList->ip[0] = inet_addr("192.168.0.1"); + pSql->ipList->ip[0] = inet_addr(tsPrivateIp); if (pSql->cmd.command < TSDB_SQL_MGMT) { pSql->ipList->port = tsDnodeShellPort; @@ -197,7 +197,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg); } else { pSql->ipList->port = tsMnodeShellPort; - tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); + tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { .msgType = pSql->cmd.msgType, @@ -213,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { } void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { - tscPrint("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code)); + tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code)); SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 20fc948844e814326cfabfdde397f15a054a4b4d..9ba6e343dc06f86150724a13220d7a5af22db63f 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -70,25 +70,31 @@ void dnodeCleanupRead() { } void dnodeRead(SRpcMsg *pMsg) { + int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - int32_t contLen = 0; - int32_t numOfVnodes = 0; - int32_t vgId = 0; SRpcContext *pRpcContext = NULL; - // parse head, get number of vnodes; - if ( numOfVnodes > 1) { - pRpcContext = calloc(sizeof(SRpcContext), 1); - pRpcContext->numOfVnodes = 1; +// SMsgDesc *pDesc = pCont; +// pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); +// pCont += sizeof(SMsgDesc); +// if (pDesc->numOfVnodes > 1) { +// pRpcContext = calloc(sizeof(SRpcContext), 1); +// pRpcContext->numOfVnodes = pDesc->numOfVnodes; +// } + if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { + queuedMsgNum = 0; } while (leftLen > 0) { - // todo: parse head, get vgId, contLen + SMsgHead *pHead = (SMsgHead *) pCont; + pHead->vgId = 1; //htonl(pHead->vgId); + pHead->contLen = pMsg->contLen; //htonl(pHead->contLen); - // get pVnode from vgId - void *pVnode = dnodeGetVnode(vgId); + void *pVnode = dnodeGetVnode(pHead->vgId); if (pVnode == NULL) { + leftLen -= pHead->contLen; + pCont -= pHead->contLen; continue; } @@ -96,7 +102,7 @@ void dnodeRead(SRpcMsg *pMsg) { SReadMsg readMsg; readMsg.rpcMsg = *pMsg; readMsg.pCont = pCont; - readMsg.contLen = contLen; + readMsg.contLen = pHead->contLen; readMsg.pRpcContext = pRpcContext; readMsg.pVnode = pVnode; @@ -104,11 +110,23 @@ void dnodeRead(SRpcMsg *pMsg) { taosWriteQitem(queue, &readMsg); // next vnode - leftLen -= contLen; - pCont -= contLen; + leftLen -= pHead->contLen; + pCont -= pHead->contLen; + queuedMsgNum++; dnodeReleaseVnode(pVnode); } + + if (queuedMsgNum == 0) { + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .pCont = NULL, + .contLen = 0, + .code = TSDB_CODE_INVALID_VGROUP_ID, + .msgType = 0 + }; + rpcSendResponse(&rpcRsp); + } } void *dnodeAllocateReadWorker() { diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 8d35e6afd9004d2a51acd1d2f1a0f2962e39d4e3..b10ca16467073d7f5da7751f914717b14e70178e 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -94,9 +94,9 @@ void dnodeWrite(SRpcMsg *pMsg) { SRpcContext *pRpcContext = NULL; if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { - SWriteMsgDesc *pDesc = pCont; + SMsgDesc *pDesc = pCont; pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); - pCont += sizeof(SWriteMsgDesc); + pCont += sizeof(SMsgDesc); if (pDesc->numOfVnodes > 1) { pRpcContext = calloc(sizeof(SRpcContext), 1); pRpcContext->numOfVnodes = pDesc->numOfVnodes; @@ -104,7 +104,7 @@ void dnodeWrite(SRpcMsg *pMsg) { } while (leftLen > 0) { - SWriteMsgHead *pHead = (SWriteMsgHead *) pCont; + SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); @@ -322,7 +322,7 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { for (int i = pTable->numOfColumns; i < totalCols; i++) { tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); } - tsdbTableSetSchema(&tCfg, pDestTagSchema, false); + tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); char *pTagData = pTable->data + totalCols * sizeof(SSchema); int accumBytes = 0; diff --git a/src/inc/mnode.h b/src/inc/mnode.h index dd996c74018c0c190e6405cd00fdbde99ad35744..48aeb2dfe6b2736f93437065b87f4c21f550e8a0 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -166,6 +166,7 @@ typedef struct _vg_obj { typedef struct _db_obj { char name[TSDB_DB_NAME_LEN + 1]; + int8_t dirty; int64_t createdTime; SDbCfg cfg; int8_t dropStatus; @@ -175,10 +176,8 @@ typedef struct _db_obj { int32_t numOfVgroups; int32_t numOfTables; int32_t numOfSuperTables; - int32_t vgStatus; - SVgObj *pHead; // empty vgroup first - SVgObj *pTail; // empty vgroup end - void * vgTimer; + SVgObj *pHead; + SVgObj *pTail; } SDbObj; struct _acctObj; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index ca8ce136f87c2c52f777de710b420821f34d4f7c..e4b083bb9d7d73162dac378e9b818c668df8c91b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -236,12 +236,12 @@ typedef struct { typedef struct { int32_t numOfVnodes; -} SWriteMsgDesc; +} SMsgDesc; typedef struct { int32_t contLen; int32_t vgId; -} SWriteMsgHead; +} SMsgHead; typedef struct { int32_t contLen; diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 20f6ca9c8db3394162e39889b7560139fa86f0fa..9ba5da50241f67c756dc6418de368be996fc079c 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -38,6 +38,8 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); +void mgmtDropAllChildTables(SDbObj *pDropDb); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index 6f7bd5321b88eb34a72a046c2c94601ec41b8653..c78d9fce43b27a5c470a188f44f27e65f1ce9b81 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -37,6 +37,8 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta *pMeta, bool usePublicIp); +void mgmtDropAllNormalTables(SDbObj *pDropDb); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index 2acf34144f00c383f1706cf2e149504b2c3d9cda..4b0599b359a660693ee8032940c8e9bc3042ab01 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -45,6 +45,8 @@ void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable); int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); +void mgmtDropAllSuperTables(SDbObj *pDropDb); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 9cd28c379ec76fc1111666c074ed9edc0b0ddc59..a6c537a49b51734dd42717cef75fa657892aea5a 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -32,16 +32,9 @@ STableInfo* mgmtGetTable(char *tableId); STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); -int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); -int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter); - void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); -SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); -SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 524cc87460581740cc6ccc54360c78903ba3ae92..56a8fd054f2ce2bd35c6ed2d311f237c3ca85dee 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -30,7 +30,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); void mgmtCreateVgroup(SQueuedMsg *pMsg); -int32_t mgmtDropVgroup(SVgObj *pVgroup); +void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtUpdateVgroup(SVgObj *pVgroup); void mgmtSetVgroupIdPool(); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 0c6d964ae7f58e499e811eb8daf4930cb3d52e70..1e9e55e38121696f5fb3d8329cb0432d292b391e 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -123,7 +123,6 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); return NULL; } @@ -462,13 +461,37 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { if (usePublicIp) { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } else { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); } pMeta->numOfVpeers = pVgroup->numOfVnodes; return TSDB_CODE_SUCCESS; } + +void mgmtDropAllChildTables(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); + SChildTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + sdbDeleteRow(tsChildTableSdb, pTable); + pNode = pLastNode; + numOfTables ++; + continue; + } + } + + mTrace("db:%s, all child tables:%d is dropped", pDropDb->name, numOfTables); +} \ No newline at end of file diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index ca441bfb2e154923210da3e46cb982725b070e1c..10a60cf927be2c291b544e40ca5d0732e891ca53 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -27,6 +27,9 @@ #include "mgmtMnode.h" #include "mgmtGrant.h" #include "mgmtShell.h" +#include "mgmtNormalTable.h" +#include "mgmtChildTable.h" +#include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtUser.h" #include "mgmtVgroup.h" @@ -34,10 +37,9 @@ static void *tsDbSdb = NULL; static int32_t tsDbUpdateSize; -static int32_t mgmtUpdateDb(SDbObj *pDb); static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); -static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists); -static int32_t mgmtDropDb(SDbObj *pDb); +static void mgmtDropDb(void *handle, void *tmrId); +static void mgmtSetDbDirty(SDbObj *pDb); static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -60,7 +62,7 @@ static void mgmtDbActionInit() { mgmtDbActionFp[SDB_TYPE_UPDATE] = mgmtDbActionUpdate; mgmtDbActionFp[SDB_TYPE_ENCODE] = mgmtDbActionEncode; mgmtDbActionFp[SDB_TYPE_DECODE] = mgmtDbActionDecode; - mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset; + mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset; mgmtDbActionFp[SDB_TYPE_DESTROY] = mgmtDbActionDestroy; } @@ -98,8 +100,6 @@ int32_t mgmtInitDbs() { pDb->numOfTables = 0; pDb->numOfVgroups = 0; pDb->numOfSuperTables = 0; - pDb->vgStatus = TSDB_VG_STATUS_READY; - pDb->vgTimer = NULL; pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct != NULL) mgmtAddDbIntoAcct(pAcct, pDb); @@ -293,135 +293,6 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { return code; } -static int32_t mgmtUpdateDb(SDbObj *pDb) { - return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1); -} - -static int32_t mgmtSetDbDropping(SDbObj *pDb) { - if (pDb->dropStatus == TSDB_DB_STATUS_DROP_FROM_SDB) return 0; - - SVgObj *pVgroup = pDb->pHead; - while (pVgroup != NULL) { - for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) { - SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; - SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); - if (pDnode == NULL) continue; - - SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode]; - if (pVload->dropStatus != TSDB_VN_DROP_STATUS_DROPPING) { - pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING; - - mPrint("dnode:%s vnode:%d db:%s set to dropping status", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name); - if (mgmtUpdateDnode(pDnode) < 0) { - mError("db:%s drop failed, dnode sdb update error", pDb->name); - return TSDB_CODE_SDB_ERROR; - } - } - } - - //void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { - // mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle); - // - // for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - // SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - // mgmtSendDropVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); - // } - //} - // -// mgmtSendDropVgroupMsg(pVgroup); - pVgroup = pVgroup->next; - } - - if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) return 0; - - pDb->dropStatus = TSDB_DB_STATUS_DROPPING; - if (mgmtUpdateDb(pDb) < 0) { - mError("db:%s drop failed, db sdb update error", pDb->name); - return TSDB_CODE_SDB_ERROR; - } - - mPrint("db:%s set to dropping status", pDb->name); - return 0; -} - -static bool mgmtCheckDropDbFinished(SDbObj *pDb) { - SVgObj *pVgroup = pDb->pHead; - while (pVgroup) { - for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) { - SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; - SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); - - if (pDnode == NULL) continue; - if (pDnode->status == TSDB_DN_STATUS_OFFLINE) continue; - - SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode]; - if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { - mTrace("dnode:%s, vnode:%d db:%s wait dropping", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name); - return false; - } - } - pVgroup = pVgroup->next; - } - - mPrint("db:%s all vnodes drop finished", pDb->name); - return true; -} - -static void mgmtDropDbFromSdb(SDbObj *pDb) { - while (pDb->pHead) mgmtDropVgroup(pDb->pHead); - -// SSuperTableObj *pMetric = pDb->pSTable; -// while (pMetric) { -// SSuperTableObj *pNext = pMetric->next; -// mgmtDropTable(pDb, pMetric->tableId, 0); -// pMetric = pNext; -// } - - mPrint("db:%s all meters drop finished", pDb->name); - sdbDeleteRow(tsDbSdb, pDb); - mPrint("db:%s database drop finished", pDb->name); -} - -static int32_t mgmtDropDb(SDbObj *pDb) { - - if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) { - bool finished = mgmtCheckDropDbFinished(pDb); - if (!finished) { - SVgObj *pVgroup = pDb->pHead; - while (pVgroup != NULL) { - //mgmtSendDropVgroupMsg(pVgroup, NULL); - pVgroup = pVgroup->next; - } - return TSDB_CODE_ACTION_IN_PROGRESS; - } - - // don't sync this action - pDb->dropStatus = TSDB_DB_STATUS_DROP_FROM_SDB; - mgmtDropDbFromSdb(pDb); - return 0; - } else { - int32_t code = mgmtSetDbDropping(pDb); - if (code != 0) return code; - return TSDB_CODE_ACTION_IN_PROGRESS; - } -} - -UNUSED_FUNC -static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { - SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name); - if (pDb == NULL) { - if (ignoreNotExists) return TSDB_CODE_SUCCESS; - mWarn("db:%s is not there", name); - return TSDB_CODE_INVALID_DB; - } - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - return TSDB_CODE_MONITOR_DB_FORBIDDEN; - } - - return mgmtDropDb(pDb); -} - bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { char dbName[TSDB_DB_NAME_LEN + 1] = {0}; extractDBName(db, dbName); @@ -430,20 +301,6 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb)); } -UNUSED_FUNC -static void mgmtMonitorDbDrop(void *unused, void *unusedt) { - void * pNode = NULL; - SDbObj *pDb = NULL; - - while (1) { - pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb); - if (pDb == NULL) break; - if (pDb->dropStatus != TSDB_DB_STATUS_DROPPING) continue; - mgmtDropDb(pDb); - break; - } -} - static int32_t mgmtAlterDb(SAcctObj *pAcct, SCMAlterDbMsg *pAlter) { return 0; // int32_t code = TSDB_CODE_SUCCESS; @@ -840,7 +697,6 @@ void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { pDb->pTail = NULL; pDb->numOfVgroups = 0; pDb->numOfTables = 0; - pDb->vgTimer = NULL; mgmtAddDbIntoAcct(pAcct, pDb); return NULL; @@ -851,6 +707,10 @@ void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); mgmtRemoveDbFromAcct(pAcct, pDb); + mgmtDropAllNormalTables(pDb); + mgmtDropAllChildTables(pDb); + mgmtDropAllSuperTables(pDb); + return NULL; } @@ -906,6 +766,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfTables, -1); } +static void mgmtSetDbDirty(SDbObj *pDb) { + pDb->dirty = true; +} + static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; @@ -919,7 +783,6 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { pCreate->commitTime = htonl(pCreate->commitTime); pCreate->blocksPerTable = htons(pCreate->blocksPerTable); pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock); - // pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks); int32_t code; if (mgmtCheckExpired()) { @@ -957,21 +820,76 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, code); } +static void mgmtDropDb(void *handle, void *tmrId) { + SQueuedMsg *newMsg = handle; + SDbObj *pDb = newMsg->ahandle; + mPrint("db:%s, drop db from sdb", pDb->name); + + int32_t code = sdbDeleteRow(tsDbSdb, pDb); + if (code != 0) { + code = TSDB_CODE_SDB_ERROR; + } + + mgmtSendSimpleResp(newMsg->thandle, code); + rpcFreeCont(newMsg->pCont); + free(newMsg); +} + static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; - int32_t code; - if (pMsg->pUser->superAuth) { - //SCMDropDbMsg *pDrop = rpcMsg->pCont; - //rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); - //if (rpcRsp.code == TSDB_CODE_SUCCESS) { - // mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); - //} - } else { - code = TSDB_CODE_NO_RIGHTS; + SCMDropDbMsg *pDrop = pMsg->pCont; + mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle); + + if (mgmtCheckExpired()) { + mError("db:%s, failed to drop, grant expired", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); + return; + } + + if (!pMsg->pUser->writeAuth) { + mError("db:%s, failed to drop, no rights", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); + return; + } + + SDbObj *pDb = mgmtGetDb(pDrop->db); + if (pDb == NULL) { + if (pDrop->ignoreNotExists) { + mTrace("db:%s, db is not exist, think drop success", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + return; + } else { + mError("db:%s, failed to drop, invalid db", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); + return; + } + } + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + mError("db:%s, can't drop monitor database", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); + return; } - if (code != TSDB_CODE_SUCCESS) { - mgmtSendSimpleResp(pMsg->thandle, code); + mgmtSetDbDirty(pDb); + + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; + + SVgObj *pVgroup = pDb->pHead; + if (pVgroup != NULL) { + mPrint("vgroup:%d, will be dropped", pVgroup->vgId); + newMsg->ahandle = pVgroup; + newMsg->expected = pVgroup->numOfVnodes; + mgmtDropVgroup(pVgroup, newMsg); + return; } + + mTrace("db:%s, all vgroups is dropped", pDb->name); + + void *tmpTmr; + newMsg->ahandle = pDb; + taosTmrReset(mgmtDropDb, 10, newMsg, tsMgmtTmr, &tmpTmr); } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 4ff99308a0ec0dd9445c6fa0c02857e62351c39a..799d123ac3cdce6cdde6dcc15480531da444463e 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -125,7 +125,6 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *s SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); return NULL; } @@ -540,14 +539,35 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { if (usePublicIp) { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } else { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); } pMeta->numOfVpeers = pVgroup->numOfVnodes; return TSDB_CODE_SUCCESS; } +void mgmtDropAllNormalTables(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); + SNormalTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) break; + + if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + sdbDeleteRow(tsNormalTableSdb, pTable); + pNode = pLastNode; + numOfTables ++; + continue; + } + } + + mTrace("db:%s, all normal tables:%d is dropped", pDropDb->name, numOfTables); +} diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 4970d8794af0293e92b2cc7c2babd602915e73bd..223e67ebfb8e5f4e2d9b1e8ac1c271b1b75360a4 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -242,6 +242,8 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { } SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; + mTrace("show:%p, type:%s, retrieve data", pShow, taosGetShowTypeStr(pShow->type)); + if (!mgmtCheckQhandle(pRetrieve->qhandle)) { mError("pShow:%p, query memory is corrupted", pShow); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED); diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 02cb466d0520123f86827784697361b470bd1c59..a4539f7b9f2b6200e7b4aecdaaa9113902f16743 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -612,6 +612,30 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v return numOfRows; } +void mgmtDropAllSuperTables(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); + SSuperTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + sdbDeleteRow(tsSuperTableSdb, pTable); + pNode = pLastNode; + numOfTables ++; + continue; + } + } + + mTrace("db:%s, all super tables:%d is dropped", pDropDb->name, numOfTables); +} + void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) { pStable->numOfTables++; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index d77d9cbb50274c1b2e4980f1f837afd1c4c41055..abf1d9162c216e7fe14b6081abc2c9bcbaaa3e65 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -346,13 +346,6 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void * return numOfRows; } -SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) { - SMDDropTableMsg *pRemove = NULL; - - - return pRemove; -} - void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; @@ -439,7 +432,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } - pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, pTable); + pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable); if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -451,7 +444,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; } - pMDCreate = mgmtBuildCreateNormalTableMsg(pTable); + pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -475,7 +468,7 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; SCMDropTableMsg *pDrop = pMsg->pCont; - mTrace("table:%s, drop msg is received from thandle:%p", pDrop->tableId, pMsg->thandle); + mTrace("table:%s, drop table msg is received from thandle:%p", pDrop->tableId, pMsg->thandle); if (mgmtCheckExpired()) { mError("table:%s, failed to drop, grant expired", pDrop->tableId); @@ -832,7 +825,7 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { if (pVgroup->numOfTables <= 0) { mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); - mgmtDropVgroup(pVgroup); + mgmtDropVgroup(pVgroup, NULL); } mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index e700fcd877bdaa76ec790dc914f671aa88932873..c2f3e199517d2e13b00d382087ec72969c3aa589 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -138,18 +138,6 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { return pDb->pHead; } -void mgmtProcessVgTimer(void *handle, void *tmrId) { - SDbObj *pDb = (SDbObj *)handle; - if (pDb == NULL) return; - - if (pDb->vgStatus > TSDB_VG_STATUS_IN_PROGRESS) { - mTrace("db:%s, set vgroup status from %d to ready", pDb->name, pDb->vgStatus); - pDb->vgStatus = TSDB_VG_STATUS_READY; - } - - pDb->vgTimer = NULL; -} - void mgmtCreateVgroup(SQueuedMsg *pMsg) { SDbObj *pDb = pMsg->pDb; if (pDb == NULL) { @@ -188,22 +176,14 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { mgmtSendCreateVgroupMsg(pVgroup, pMsg); } -int32_t mgmtDropVgroup(SVgObj *pVgroup) { -// STableInfo *pTable; - - if (pVgroup->numOfTables > 0) { -// for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { -// if (pVgroup->tableList != NULL) { -// pTable = pVgroup->tableList[i]; -// if (pTable) mgmtDropTable(pDb, pTable->tableId, 0); -// } -// } +void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) { + if (ahandle != NULL) { + mgmtSendDropVgroupMsg(pVgroup, ahandle); + } else { + mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); + mgmtSendDropVgroupMsg(pVgroup, NULL); + sdbDeleteRow(tsVgroupSdb, pVgroup); } - - mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); - mgmtSendDropVgroupMsg(pVgroup, NULL); - sdbDeleteRow(tsVgroupSdb, pVgroup); - return TSDB_CODE_SUCCESS; } void mgmtSetVgroupIdPool() { @@ -666,4 +646,33 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { mTrace("drop vnode msg is received"); + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + if (rpcMsg->code == TSDB_CODE_SUCCESS) { + queueMsg->code = rpcMsg->code; + queueMsg->successed++; + } + + SVgObj *pVgroup = queueMsg->ahandle; + mTrace("vgroup:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", + pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected, + queueMsg->thandle, rpcMsg->handle); + + if (queueMsg->received != queueMsg->expected) return; + + sdbDeleteRow(tsVgroupSdb, pVgroup); + + SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); + newMsg->msgType = queueMsg->msgType; + newMsg->thandle = queueMsg->thandle; + newMsg->pDb = queueMsg->pDb; + newMsg->pUser = queueMsg->pUser; + newMsg->contLen = queueMsg->contLen; + newMsg->pCont = rpcMallocCont(newMsg->contLen); + memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + mgmtAddToShellQueue(newMsg); + + free(queueMsg); } \ No newline at end of file