diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ed905a8d544197a7474e7fc4256d1bc4e676750d..cc416e1cb7c358727ce75faedc40c682834f3ec7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -180,7 +180,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - pSql->ipList->ip[0] = inet_addr("192.168.0.1"); + pSql->ipList->ip[0] = inet_addr(tsPrivateIp); if (pSql->cmd.command < TSDB_SQL_MGMT) { pSql->ipList->port = tsDnodeShellPort; @@ -197,7 +197,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg); } else { pSql->ipList->port = tsMnodeShellPort; - tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); + tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { .msgType = pSql->cmd.msgType, @@ -213,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { } void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { - tscPrint("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code)); + tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code)); SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); @@ -2032,6 +2032,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscClearFieldInfo(&pQueryInfo->fieldsInfo); msgLen = pMsg - (char*)pCreateTableMsg; + pCreateTableMsg->contLen = htonl(msgLen); pCmd->payloadLen = msgLen; pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 20fc948844e814326cfabfdde397f15a054a4b4d..9ba6e343dc06f86150724a13220d7a5af22db63f 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -70,25 +70,31 @@ void dnodeCleanupRead() { } void dnodeRead(SRpcMsg *pMsg) { + int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - int32_t contLen = 0; - int32_t numOfVnodes = 0; - int32_t vgId = 0; SRpcContext *pRpcContext = NULL; - // parse head, get number of vnodes; - if ( numOfVnodes > 1) { - pRpcContext = calloc(sizeof(SRpcContext), 1); - pRpcContext->numOfVnodes = 1; +// SMsgDesc *pDesc = pCont; +// pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); +// pCont += sizeof(SMsgDesc); +// if (pDesc->numOfVnodes > 1) { +// pRpcContext = calloc(sizeof(SRpcContext), 1); +// pRpcContext->numOfVnodes = pDesc->numOfVnodes; +// } + if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { + queuedMsgNum = 0; } while (leftLen > 0) { - // todo: parse head, get vgId, contLen + SMsgHead *pHead = (SMsgHead *) pCont; + pHead->vgId = 1; //htonl(pHead->vgId); + pHead->contLen = pMsg->contLen; //htonl(pHead->contLen); - // get pVnode from vgId - void *pVnode = dnodeGetVnode(vgId); + void *pVnode = dnodeGetVnode(pHead->vgId); if (pVnode == NULL) { + leftLen -= pHead->contLen; + pCont -= pHead->contLen; continue; } @@ -96,7 +102,7 @@ void dnodeRead(SRpcMsg *pMsg) { SReadMsg readMsg; readMsg.rpcMsg = *pMsg; readMsg.pCont = pCont; - readMsg.contLen = contLen; + readMsg.contLen = pHead->contLen; readMsg.pRpcContext = pRpcContext; readMsg.pVnode = pVnode; @@ -104,11 +110,23 @@ void dnodeRead(SRpcMsg *pMsg) { taosWriteQitem(queue, &readMsg); // next vnode - leftLen -= contLen; - pCont -= contLen; + leftLen -= pHead->contLen; + pCont -= pHead->contLen; + queuedMsgNum++; dnodeReleaseVnode(pVnode); } + + if (queuedMsgNum == 0) { + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .pCont = NULL, + .contLen = 0, + .code = TSDB_CODE_INVALID_VGROUP_ID, + .msgType = 0 + }; + rpcSendResponse(&rpcRsp); + } } void *dnodeAllocateReadWorker() { diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 1b6c6c148f234daf4db713b199b4a1d22a68ed08..b10ca16467073d7f5da7751f914717b14e70178e 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -93,21 +93,18 @@ void dnodeWrite(SRpcMsg *pMsg) { char *pCont = (char *) pMsg->pCont; SRpcContext *pRpcContext = NULL; - int32_t numOfVnodes = 0; - if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { - // TODO parse head, get number of vnodes; - numOfVnodes = 1; - } else { - numOfVnodes = 1; - } - - if (numOfVnodes > 1) { - pRpcContext = calloc(sizeof(SRpcContext), 1); - pRpcContext->numOfVnodes = numOfVnodes; + if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { + SMsgDesc *pDesc = pCont; + pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); + pCont += sizeof(SMsgDesc); + if (pDesc->numOfVnodes > 1) { + pRpcContext = calloc(sizeof(SRpcContext), 1); + pRpcContext->numOfVnodes = pDesc->numOfVnodes; + } } while (leftLen > 0) { - SWriteMsgHead *pHead = (SWriteMsgHead *) pCont; + SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); @@ -291,26 +288,9 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; - dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId); - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - void *pVnode = dnodeGetVnode(pTable->vgId); - if (pVnode == NULL) { - rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID; - dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); - return; - } - - void *pTsdb = dnodeGetVnodeTsdb(pVnode); - if (pTsdb == NULL) { - dnodeReleaseVnode(pVnode); - rpcRsp.code = TSDB_CODE_NOT_ACTIVE_VNODE; - dTrace("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); - return; - } + dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId); pTable->numOfColumns = htons(pTable->numOfColumns); pTable->numOfTags = htons(pTable->numOfTags); pTable->sid = htonl(pTable->sid); @@ -342,9 +322,8 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { for (int i = pTable->numOfColumns; i < totalCols; i++) { tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); } - tsdbTableSetSchema(&tCfg, pDestTagSchema, false); + tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); - // TODO: add data row char *pTagData = pTable->data + totalCols * sizeof(SSchema); int accumBytes = 0; SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); @@ -356,49 +335,107 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { tsdbTableSetTagValue(&tCfg, dataRow, false); } + void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg); - dnodeReleaseVnode(pVnode); + dnodeReleaseVnode(pMsg->pVnode); - if (rpcRsp.code != TSDB_CODE_SUCCESS) { - dError("table:%s, failed to create in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code)); - rpcSendResponse(&rpcRsp); - } else { - dTrace("table:%s, created in dnode", pTable->tableId); - rpcSendResponse(&rpcRsp); - } + dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); + rpcSendResponse(&rpcRsp); } static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont; - dPrint("table:%s, sid:%d is dropped", pTable->tableId, pTable->sid); - -// pTable->sid = htonl(pTable->sid); -// pTable->numOfVPeers = htonl(pTable->numOfVPeers); -// pTable->uid = htobe64(pTable->uid); -// -// for (int i = 0; i < pTable->numOfVPeers; ++i) { -// pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip); -// pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode); -// } -// -// int32_t code = dnodeDropTable(pTable); -// SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + dTrace("table:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + STableId tableId = { + .uid = htobe64(pTable->uid), + .tid = htonl(pTable->sid) + }; + + void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + + rpcRsp.code = tsdbDropTable(pTsdb, tableId); + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) { SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; - dPrint("table:%s, sid:%d is alterd", pTable->tableId, pTable->sid); - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + dTrace("table:%s, start to alter in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + pTable->numOfColumns = htons(pTable->numOfColumns); + pTable->numOfTags = htons(pTable->numOfTags); + pTable->sid = htonl(pTable->sid); + pTable->sversion = htonl(pTable->sversion); + pTable->tagDataLen = htonl(pTable->tagDataLen); + pTable->sqlDataLen = htonl(pTable->sqlDataLen); + pTable->uid = htobe64(pTable->uid); + pTable->superTableUid = htobe64(pTable->superTableUid); + pTable->createdTime = htobe64(pTable->createdTime); + SSchema *pSchema = (SSchema *) pTable->data; + + int totalCols = pTable->numOfColumns + pTable->numOfTags; + for (int i = 0; i < totalCols; i++) { + pSchema[i].colId = htons(pSchema[i].colId); + pSchema[i].bytes = htons(pSchema[i].bytes); + } + + STableCfg tCfg; + tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid); + + STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns); + for (int i = 0; i < pTable->numOfColumns; i++) { + tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetSchema(&tCfg, pDestSchema, false); + + if (pTable->numOfTags != 0) { + STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags); + for (int i = pTable->numOfColumns; i < totalCols; i++) { + tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + } + tsdbTableSetSchema(&tCfg, pDestTagSchema, false); + + char *pTagData = pTable->data + totalCols * sizeof(SSchema); + int accumBytes = 0; + SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); + + for (int i = 0; i < pTable->numOfTags; i++) { + tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); + accumBytes += pSchema[i + pTable->numOfColumns].bytes; + } + tsdbTableSetTagValue(&tCfg, dataRow, false); + } + + void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + + rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg); + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont; - dPrint("stable:%s, is dropped", pTable->tableId); - SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + dTrace("stable:%s, start to it drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + pTable->uid = htobe64(pTable->uid); + + // TODO: drop stable in vvnode + //void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + //rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid); + + rpcRsp.code = TSDB_CODE_SUCCESS; + dnodeReleaseVnode(pMsg->pVnode); + + dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } + diff --git a/src/inc/mnode.h b/src/inc/mnode.h index b96905ff826441b6539382cbd858330ecae897f9..48aeb2dfe6b2736f93437065b87f4c21f550e8a0 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -39,7 +39,7 @@ extern "C" { #include "ttimer.h" #include "tutil.h" - typedef struct { +typedef struct { uint32_t privateIp; int32_t sid; uint32_t moduleStatus; @@ -77,6 +77,7 @@ extern "C" { } SDnodeObj; typedef struct { + int32_t dnodeId; uint32_t ip; uint32_t publicIp; int32_t vnode; @@ -97,6 +98,7 @@ struct _vg_obj; typedef struct SSuperTableObj { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; + int8_t dirty; uint64_t uid; int32_t sid; int32_t vgId; @@ -104,7 +106,7 @@ typedef struct SSuperTableObj { int32_t sversion; int32_t numOfColumns; int32_t numOfTags; - int8_t reserved[7]; + int8_t reserved[5]; int8_t updateEnd[1]; int32_t numOfTables; int16_t nextColId; @@ -114,12 +116,13 @@ typedef struct SSuperTableObj { typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; + int8_t dirty; uint64_t uid; int32_t sid; int32_t vgId; int64_t createdTime; char superTableId[TSDB_TABLE_ID_LEN + 1]; - int8_t reserved[7]; + int8_t reserved[1]; int8_t updateEnd[1]; SSuperTableObj *superTable; } SChildTableObj; @@ -127,13 +130,14 @@ typedef struct { typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; + int8_t dirty; uint64_t uid; int32_t sid; int32_t vgId; int64_t createdTime; int32_t sversion; int32_t numOfColumns; - int16_t sqlLen; + int32_t sqlLen; int8_t reserved[3]; int8_t updateEnd[1]; char* sql; //null-terminated string @@ -162,6 +166,7 @@ typedef struct _vg_obj { typedef struct _db_obj { char name[TSDB_DB_NAME_LEN + 1]; + int8_t dirty; int64_t createdTime; SDbCfg cfg; int8_t dropStatus; @@ -171,10 +176,8 @@ typedef struct _db_obj { int32_t numOfVgroups; int32_t numOfTables; int32_t numOfSuperTables; - int32_t vgStatus; - SVgObj *pHead; // empty vgroup first - SVgObj *pTail; // empty vgroup end - void * vgTimer; + SVgObj *pHead; + SVgObj *pTail; } SDbObj; struct _acctObj; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 94692fb15f9f834d3fb491cd150e9db449226c4f..e4b083bb9d7d73162dac378e9b818c668df8c91b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -234,10 +234,14 @@ typedef struct { uint32_t ip; } SVnodeDesc; +typedef struct { + int32_t numOfVnodes; +} SMsgDesc; + typedef struct { int32_t contLen; int32_t vgId; -} SWriteMsgHead; +} SMsgHead; typedef struct { int32_t contLen; @@ -264,7 +268,8 @@ typedef struct { int16_t numOfTags; int16_t numOfColumns; int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string - int16_t reserved[16]; + int32_t contLen; + int8_t reserved[16]; char schema[]; } SCMCreateTableMsg; @@ -332,22 +337,22 @@ typedef struct { } SMgmtHead; typedef struct { + int32_t contLen; int32_t vgId; int32_t sid; - int32_t numOfVPeers; uint64_t uid; - SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS]; char tableId[TSDB_TABLE_ID_LEN + 1]; } SMDDropTableMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; + int32_t contLen; + int32_t vgId; int64_t uid; + char tableId[TSDB_TABLE_ID_LEN + 1]; } SMDDropSTableMsg; typedef struct { - int32_t vgId; - int32_t vnode; + int32_t vgId; } SMDDropVnodeMsg; typedef struct SColIndexEx { diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 5012ae8f17d21bd6f2817e6ed7dc6b0bd0184d30..9ba5da50241f67c756dc6418de368be996fc079c 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -30,13 +30,16 @@ int32_t mgmtInitChildTables(); void mgmtCleanUpChildTables(); void * mgmtGetChildTable(char *tableId); -int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, - SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); -int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); +void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable); + +int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); +void mgmtDropAllChildTables(SDbObj *pDropDb); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index f740765ed1475e951a0f5e5cc08e9667dbc11d15..c78d9fce43b27a5c470a188f44f27e65f1ce9b81 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -28,14 +28,17 @@ int32_t mgmtInitNormalTables(); void mgmtCleanUpNormalTables(); void * mgmtGetNormalTable(char *tableId); -int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, - SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); -int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); +void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); + +int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta *pMeta, bool usePublicIp); +void mgmtDropAllNormalTables(SDbObj *pDropDb); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index e9da9e546df7e2215f340dca6850a1d28e214e50..4b0599b359a660693ee8032940c8e9bc3042ab01 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -31,8 +31,8 @@ void mgmtCleanUpSuperTables(); void * mgmtGetSuperTable(char *tableId); -int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate); -int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); +int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate); +int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pTable); int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName); @@ -45,6 +45,8 @@ void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable); int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); +void mgmtDropAllSuperTables(SDbObj *pDropDb); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 9cd28c379ec76fc1111666c074ed9edc0b0ddc59..a6c537a49b51734dd42717cef75fa657892aea5a 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -32,16 +32,9 @@ STableInfo* mgmtGetTable(char *tableId); STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); -int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); -int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter); - void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); -SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); -SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 7ac97076c7f9113cb45082692213e02a5c76fe27..56a8fd054f2ce2bd35c6ed2d311f237c3ca85dee 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -30,7 +30,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode); void mgmtCreateVgroup(SQueuedMsg *pMsg); -int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); +void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtUpdateVgroup(SVgObj *pVgroup); void mgmtSetVgroupIdPool(); @@ -41,7 +41,6 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode); void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle); -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup); SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 79b2280c17e823a2818ae91250732d47a42be449..1e9e55e38121696f5fb3d8329cb0432d292b391e 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -123,7 +123,6 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); return NULL; } @@ -272,19 +271,22 @@ void mgmtCleanUpChildTables() { sdbCloseTable(tsChildTableSdb); } -static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) { - int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; - int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen; +void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) { + char *pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1; + int32_t tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; + int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; + int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen; SMDCreateTableMsg *pCreate = rpcMallocCont(contLen); if (pCreate == NULL) { + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } - memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); - memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN); + memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1); + memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN + 1); pCreate->contLen = htonl(contLen); - pCreate->vgId = htonl(pVgroup->vgId); + pCreate->vgId = htonl(pTable->vgId); pCreate->tableType = pTable->type; pCreate->numOfColumns = htons(pTable->superTable->numOfColumns); pCreate->numOfTags = htons(pTable->superTable->numOfTags); @@ -305,103 +307,85 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou } memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen); - return pCreate; } -int32_t mgmtCreateChildTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, - SMDCreateTableMsg **pMDCreateOut, STableInfo **pTableOut) { +void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb); if (numOfTables >= tsMaxTables) { mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables); - return TSDB_CODE_TOO_MANY_TABLES; + terrno = TSDB_CODE_TOO_MANY_TABLES; + return NULL; } char *pTagData = (char *) pCreate->schema; // it is a tag key SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); if (pSuperTable == NULL) { mError("table:%s, corresponding super table does not exist", pCreate->tableId); - return TSDB_CODE_INVALID_TABLE; + terrno = TSDB_CODE_INVALID_TABLE; + return NULL; } SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1); if (pTable == NULL) { mError("table:%s, failed to alloc memory", pCreate->tableId); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; } + strcpy(pTable->tableId, pCreate->tableId); strcpy(pTable->superTableId, pSuperTable->tableId); pTable->type = TSDB_CHILD_TABLE; + pTable->createdTime = taosGetTimestampMs(); pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); - pTable->sid = sid; + pTable->sid = tid; pTable->vgId = pVgroup->vgId; - pTable->createdTime = taosGetTimestampMs(); pTable->superTable = pSuperTable; if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { free(pTable); mError("table:%s, update sdb error", pCreate->tableId); - return TSDB_CODE_SDB_ERROR; - } - - pTagData += (TSDB_TABLE_ID_LEN + 1); - int32_t tagDataLen = contLen - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; - *pMDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen); - if (*pMDCreateOut == NULL) { - mError("table:%s, failed to build create table message", pCreate->tableId); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SDB_ERROR; + return NULL; } - *pTableOut = (STableInfo *) pTable; - mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid); - return TSDB_CODE_SUCCESS; + return pTable; } -int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { +int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId); return TSDB_CODE_OTHERS; } - SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg)); - if (pRemove == NULL) { + SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); + if (pDrop == NULL) { mError("table:%s, failed to drop child table, no enough memory", pTable->tableId); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - strcpy(pRemove->tableId, pTable->tableId); - pRemove->sid = htonl(pTable->sid); - pRemove->uid = htobe64(pTable->uid); - - pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes); - for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); - pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } + strcpy(pDrop->tableId, pTable->tableId); + pDrop->vgId = htonl(pTable->vgId); + pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); + pDrop->sid = htonl(pTable->sid); + pDrop->uid = htobe64(pTable->uid); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mTrace("table:%s, send drop table msg", pRemove->tableId); + mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { - .handle = 0, - .pCont = pRemove, + .handle = newMsg, + .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); - - if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { - mError("table:%s, update ctables sdb error", pTable->tableId); - return TSDB_CODE_SDB_ERROR; - } - if (pVgroup->numOfTables <= 0) { - mgmtDropVgroup(pDb, pVgroup); - } + newMsg->ahandle = pTable; + mgmtSendMsgToDnode(&ipSet, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -477,13 +461,37 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { if (usePublicIp) { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } else { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); } pMeta->numOfVpeers = pVgroup->numOfVnodes; return TSDB_CODE_SUCCESS; } + +void mgmtDropAllChildTables(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); + SChildTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + sdbDeleteRow(tsChildTableSdb, pTable); + pNode = pLastNode; + numOfTables ++; + continue; + } + } + + mTrace("db:%s, all child tables:%d is dropped", pDropDb->name, numOfTables); +} \ No newline at end of file diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 12c34ad05762217a164a235ef554cced31c0218a..10a60cf927be2c291b544e40ca5d0732e891ca53 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -27,6 +27,9 @@ #include "mgmtMnode.h" #include "mgmtGrant.h" #include "mgmtShell.h" +#include "mgmtNormalTable.h" +#include "mgmtChildTable.h" +#include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtUser.h" #include "mgmtVgroup.h" @@ -34,10 +37,9 @@ static void *tsDbSdb = NULL; static int32_t tsDbUpdateSize; -static int32_t mgmtUpdateDb(SDbObj *pDb); static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); -static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists); -static int32_t mgmtDropDb(SDbObj *pDb); +static void mgmtDropDb(void *handle, void *tmrId); +static void mgmtSetDbDirty(SDbObj *pDb); static int32_t mgmtGetDbMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -60,7 +62,7 @@ static void mgmtDbActionInit() { mgmtDbActionFp[SDB_TYPE_UPDATE] = mgmtDbActionUpdate; mgmtDbActionFp[SDB_TYPE_ENCODE] = mgmtDbActionEncode; mgmtDbActionFp[SDB_TYPE_DECODE] = mgmtDbActionDecode; - mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset; + mgmtDbActionFp[SDB_TYPE_RESET] = mgmtDbActionReset; mgmtDbActionFp[SDB_TYPE_DESTROY] = mgmtDbActionDestroy; } @@ -98,8 +100,6 @@ int32_t mgmtInitDbs() { pDb->numOfTables = 0; pDb->numOfVgroups = 0; pDb->numOfSuperTables = 0; - pDb->vgStatus = TSDB_VG_STATUS_READY; - pDb->vgTimer = NULL; pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct != NULL) mgmtAddDbIntoAcct(pAcct, pDb); @@ -293,135 +293,6 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { return code; } -static int32_t mgmtUpdateDb(SDbObj *pDb) { - return sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1); -} - -static int32_t mgmtSetDbDropping(SDbObj *pDb) { - if (pDb->dropStatus == TSDB_DB_STATUS_DROP_FROM_SDB) return 0; - - SVgObj *pVgroup = pDb->pHead; - while (pVgroup != NULL) { - for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) { - SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; - SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); - if (pDnode == NULL) continue; - - SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode]; - if (pVload->dropStatus != TSDB_VN_DROP_STATUS_DROPPING) { - pVload->dropStatus = TSDB_VN_DROP_STATUS_DROPPING; - - mPrint("dnode:%s vnode:%d db:%s set to dropping status", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name); - if (mgmtUpdateDnode(pDnode) < 0) { - mError("db:%s drop failed, dnode sdb update error", pDb->name); - return TSDB_CODE_SDB_ERROR; - } - } - } - - //void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { - // mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle); - // - // for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - // SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - // mgmtSendDropVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); - // } - //} - // -// mgmtSendDropVgroupMsg(pVgroup); - pVgroup = pVgroup->next; - } - - if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) return 0; - - pDb->dropStatus = TSDB_DB_STATUS_DROPPING; - if (mgmtUpdateDb(pDb) < 0) { - mError("db:%s drop failed, db sdb update error", pDb->name); - return TSDB_CODE_SDB_ERROR; - } - - mPrint("db:%s set to dropping status", pDb->name); - return 0; -} - -static bool mgmtCheckDropDbFinished(SDbObj *pDb) { - SVgObj *pVgroup = pDb->pHead; - while (pVgroup) { - for (int32_t i = 0; i < pVgroup->numOfVnodes; i++) { - SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; - SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip); - - if (pDnode == NULL) continue; - if (pDnode->status == TSDB_DN_STATUS_OFFLINE) continue; - - SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode]; - if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) { - mTrace("dnode:%s, vnode:%d db:%s wait dropping", taosIpStr(pDnode->privateIp), pVnodeGid->vnode, pDb->name); - return false; - } - } - pVgroup = pVgroup->next; - } - - mPrint("db:%s all vnodes drop finished", pDb->name); - return true; -} - -static void mgmtDropDbFromSdb(SDbObj *pDb) { - while (pDb->pHead) mgmtDropVgroup(pDb, pDb->pHead); - -// SSuperTableObj *pMetric = pDb->pSTable; -// while (pMetric) { -// SSuperTableObj *pNext = pMetric->next; -// mgmtDropTable(pDb, pMetric->tableId, 0); -// pMetric = pNext; -// } - - mPrint("db:%s all meters drop finished", pDb->name); - sdbDeleteRow(tsDbSdb, pDb); - mPrint("db:%s database drop finished", pDb->name); -} - -static int32_t mgmtDropDb(SDbObj *pDb) { - - if (pDb->dropStatus == TSDB_DB_STATUS_DROPPING) { - bool finished = mgmtCheckDropDbFinished(pDb); - if (!finished) { - SVgObj *pVgroup = pDb->pHead; - while (pVgroup != NULL) { - //mgmtSendDropVgroupMsg(pVgroup, NULL); - pVgroup = pVgroup->next; - } - return TSDB_CODE_ACTION_IN_PROGRESS; - } - - // don't sync this action - pDb->dropStatus = TSDB_DB_STATUS_DROP_FROM_SDB; - mgmtDropDbFromSdb(pDb); - return 0; - } else { - int32_t code = mgmtSetDbDropping(pDb); - if (code != 0) return code; - return TSDB_CODE_ACTION_IN_PROGRESS; - } -} - -UNUSED_FUNC -static int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { - SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, name); - if (pDb == NULL) { - if (ignoreNotExists) return TSDB_CODE_SUCCESS; - mWarn("db:%s is not there", name); - return TSDB_CODE_INVALID_DB; - } - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - return TSDB_CODE_MONITOR_DB_FORBIDDEN; - } - - return mgmtDropDb(pDb); -} - bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { char dbName[TSDB_DB_NAME_LEN + 1] = {0}; extractDBName(db, dbName); @@ -430,20 +301,6 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb)); } -UNUSED_FUNC -static void mgmtMonitorDbDrop(void *unused, void *unusedt) { - void * pNode = NULL; - SDbObj *pDb = NULL; - - while (1) { - pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb); - if (pDb == NULL) break; - if (pDb->dropStatus != TSDB_DB_STATUS_DROPPING) continue; - mgmtDropDb(pDb); - break; - } -} - static int32_t mgmtAlterDb(SAcctObj *pAcct, SCMAlterDbMsg *pAlter) { return 0; // int32_t code = TSDB_CODE_SUCCESS; @@ -840,7 +697,6 @@ void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { pDb->pTail = NULL; pDb->numOfVgroups = 0; pDb->numOfTables = 0; - pDb->vgTimer = NULL; mgmtAddDbIntoAcct(pAcct, pDb); return NULL; @@ -851,6 +707,10 @@ void *mgmtDbActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); mgmtRemoveDbFromAcct(pAcct, pDb); + mgmtDropAllNormalTables(pDb); + mgmtDropAllChildTables(pDb); + mgmtDropAllSuperTables(pDb); + return NULL; } @@ -906,6 +766,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfTables, -1); } +static void mgmtSetDbDirty(SDbObj *pDb) { + pDb->dirty = true; +} + static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; @@ -919,7 +783,6 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { pCreate->commitTime = htonl(pCreate->commitTime); pCreate->blocksPerTable = htons(pCreate->blocksPerTable); pCreate->rowsInFileBlock = htonl(pCreate->rowsInFileBlock); - // pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks); int32_t code; if (mgmtCheckExpired()) { @@ -957,22 +820,76 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, code); } +static void mgmtDropDb(void *handle, void *tmrId) { + SQueuedMsg *newMsg = handle; + SDbObj *pDb = newMsg->ahandle; + mPrint("db:%s, drop db from sdb", pDb->name); + + int32_t code = sdbDeleteRow(tsDbSdb, pDb); + if (code != 0) { + code = TSDB_CODE_SDB_ERROR; + } + + mgmtSendSimpleResp(newMsg->thandle, code); + rpcFreeCont(newMsg->pCont); + free(newMsg); +} + static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; - int32_t code; - if (pMsg->pUser->superAuth) { - code = TSDB_CODE_OPS_NOT_SUPPORT; - //SCMDropDbMsg *pDrop = rpcMsg->pCont; - //rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); - //if (rpcRsp.code == TSDB_CODE_SUCCESS) { - // mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); - //} - } else { - code = TSDB_CODE_NO_RIGHTS; + SCMDropDbMsg *pDrop = pMsg->pCont; + mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle); + + if (mgmtCheckExpired()) { + mError("db:%s, failed to drop, grant expired", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); + return; + } + + if (!pMsg->pUser->writeAuth) { + mError("db:%s, failed to drop, no rights", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); + return; + } + + SDbObj *pDb = mgmtGetDb(pDrop->db); + if (pDb == NULL) { + if (pDrop->ignoreNotExists) { + mTrace("db:%s, db is not exist, think drop success", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + return; + } else { + mError("db:%s, failed to drop, invalid db", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); + return; + } + } + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + mError("db:%s, can't drop monitor database", pDrop->db); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); + return; } - if (code != TSDB_CODE_SUCCESS) { - mgmtSendSimpleResp(pMsg->thandle, code); + mgmtSetDbDirty(pDb); + + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; + + SVgObj *pVgroup = pDb->pHead; + if (pVgroup != NULL) { + mPrint("vgroup:%d, will be dropped", pVgroup->vgId); + newMsg->ahandle = pVgroup; + newMsg->expected = pVgroup->numOfVnodes; + mgmtDropVgroup(pVgroup, newMsg); + return; } + + mTrace("db:%s, all vgroups is dropped", pDb->name); + + void *tmpTmr; + newMsg->ahandle = pDb; + taosTmrReset(mgmtDropDb, 10, newMsg, tsMgmtTmr, &tmpTmr); } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index eb511bcd44a74f7ba745c64a9e97f68183d91711..799d123ac3cdce6cdde6dcc15480531da444463e 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -125,7 +125,6 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *s SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); return NULL; } @@ -287,18 +286,19 @@ void mgmtCleanUpNormalTables() { sdbCloseTable(tsNormalTableSdb); } -static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { +void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) { int32_t totalCols = pTable->numOfColumns; int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen; SMDCreateTableMsg *pCreate = rpcMallocCont(contLen); if (pCreate == NULL) { + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1); pCreate->contLen = htonl(contLen); - pCreate->vgId = htonl(pVgroup->vgId); + pCreate->vgId = htonl(pTable->vgId); pCreate->tableType = pTable->type; pCreate->numOfColumns = htons(pTable->numOfColumns); pCreate->numOfTags = 0; @@ -319,30 +319,30 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr } memcpy(pCreate + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen); - return pCreate; } -int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, - SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { +void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb); if (numOfTables >= TSDB_MAX_NORMAL_TABLES) { mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES); - return TSDB_CODE_TOO_MANY_TABLES; + terrno = TSDB_CODE_TOO_MANY_TABLES; + return NULL; } SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1); if (pTable == NULL) { mError("table:%s, failed to alloc memory", pCreate->tableId); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; } strcpy(pTable->tableId, pCreate->tableId); pTable->type = TSDB_NORMAL_TABLE; pTable->vgId = pVgroup->vgId; + pTable->createdTime = taosGetTimestampMs(); pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); pTable->sid = sid; - pTable->createdTime = taosGetTimestampMs(); pTable->sversion = 0; pTable->numOfColumns = htons(pCreate->numOfColumns); pTable->sqlLen = htons(pCreate->sqlLen); @@ -352,7 +352,8 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb pTable->schema = (SSchema *) calloc(1, schemaSize); if (pTable->schema == NULL) { free(pTable); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; } memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); @@ -368,7 +369,8 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb pTable->sql = calloc(1, pTable->sqlLen); if (pTable->sql == NULL) { free(pTable); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; } memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen); pTable->sql[pTable->sqlLen - 1] = 0; @@ -378,65 +380,45 @@ int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgOb if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) { mError("table:%s, update sdb error", pTable->tableId); free(pTable); - return TSDB_CODE_SDB_ERROR; - } - - *pDCreateOut = mgmtBuildCreateNormalTableMsg(pTable, pVgroup); - if (*pDCreateOut == NULL) { - mError("table:%s, failed to build create table message", pTable->tableId); - sdbDeleteRow(tsNormalTableSdb, pTable); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + terrno = TSDB_CODE_SDB_ERROR; + return NULL; } - *pTableOut = (STableInfo *) pTable; - mTrace("table:%s, create ntable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid); - return TSDB_CODE_SUCCESS; + return pTable; } -int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { +int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId); return TSDB_CODE_OTHERS; } - SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg)); - if (pRemove == NULL) { + SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); + if (pDrop == NULL) { mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - strcpy(pRemove->tableId, pTable->tableId); - pRemove->sid = htonl(pTable->sid); - pRemove->uid = htobe64(pTable->uid); - - pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes); - for (int i = 0; i < pVgroup->numOfVnodes; ++i) { - pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); - pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } + strcpy(pDrop->tableId, pTable->tableId); + pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); + pDrop->vgId = htonl(pVgroup->vgId); + pDrop->sid = htonl(pTable->sid); + pDrop->uid = htobe64(pTable->uid); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mTrace("table:%s, send drop table msg", pRemove->tableId); + mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { - .handle = 0, - .pCont = pRemove, + .handle = newMsg, + .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); - - if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { - mError("table:%s, update ntables sdb error", pTable->tableId); - return TSDB_CODE_SDB_ERROR; - } - - if (pVgroup->numOfTables <= 0) { - mgmtDropVgroup(pDb, pVgroup); - } + newMsg->ahandle = pTable; + mgmtSendMsgToDnode(&ipSet, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -557,14 +539,35 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { if (usePublicIp) { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } else { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); } pMeta->numOfVpeers = pVgroup->numOfVnodes; return TSDB_CODE_SUCCESS; } +void mgmtDropAllNormalTables(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); + SNormalTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) break; + + if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + sdbDeleteRow(tsNormalTableSdb, pTable); + pNode = pLastNode; + numOfTables ++; + continue; + } + } + + mTrace("db:%s, all normal tables:%d is dropped", pDropDb->name, numOfTables); +} diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 4970d8794af0293e92b2cc7c2babd602915e73bd..223e67ebfb8e5f4e2d9b1e8ac1c271b1b75360a4 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -242,6 +242,8 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) { } SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; + mTrace("show:%p, type:%s, retrieve data", pShow, taosGetShowTypeStr(pShow->type)); + if (!mgmtCheckQhandle(pRetrieve->qhandle)) { mError("pShow:%p, query memory is corrupted", pShow); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED); diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 7778534424029ce029e4ec9e4f4d936fd5f606f3..a4539f7b9f2b6200e7b4aecdaaa9113902f16743 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -202,7 +202,7 @@ void mgmtCleanUpSuperTables() { sdbCloseTable(tsSuperTableSdb); } -int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) { +int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb); if (numOfTables >= TSDB_MAX_SUPER_TABLES) { mError("stable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_SUPER_TABLES); @@ -250,11 +250,16 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) { return TSDB_CODE_SUCCESS; } -int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) { - //TODO drop all child tables - - mgmtRemoveSuperTableFromDb(pDb); - return sdbDeleteRow(tsSuperTableSdb, pSuperTable); +int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pStable) { + if (pStable->numOfTables != 0) { + mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables); + return TSDB_CODE_OTHERS; + } else { + //TODO: drop child tables + mError("stable:%s, is dropped from sdb", pStable->tableId); + mgmtRemoveSuperTableFromDb(pDb); + return TSDB_CODE_OTHERS; + } } void* mgmtGetSuperTable(char *tableId) { @@ -607,6 +612,30 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v return numOfRows; } +void mgmtDropAllSuperTables(SDbObj *pDropDb) { + void *pNode = NULL; + void *pLastNode = NULL; + int32_t numOfTables = 0; + int32_t dbNameLen = strlen(pDropDb->name); + SSuperTableObj *pTable = NULL; + + while (1) { + pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + sdbDeleteRow(tsSuperTableSdb, pTable); + pNode = pLastNode; + numOfTables ++; + continue; + } + } + + mTrace("db:%s, all super tables:%d is dropped", pDropDb->name, numOfTables); +} + void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) { pStable->numOfTables++; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 49b0d3bca02f9b7e5024fc6b17534a1cdb99bf14..abf1d9162c216e7fe14b6081abc2c9bcbaaa3e65 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -51,6 +51,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessSuperTableMetaMsg(SQueuedMsg *queueMsg); static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); static int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); @@ -82,6 +83,9 @@ int32_t mgmtInitTables() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, NULL); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, NULL); return TSDB_CODE_SUCCESS; } @@ -134,83 +138,6 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo return TSDB_CODE_SUCCESS; } -static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) { - SCMCreateTableMsg *pCreate = pMsg->pCont; - - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid < 0) { - mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); - mgmtCreateVgroup(pMsg); - return; - } - - int32_t code; - STableInfo *pTable; - SMDCreateTableMsg *pMDCreate = NULL; - - if (pCreate->numOfColumns == 0) { - mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); - } else { - mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable); - } - - if (code != TSDB_CODE_SUCCESS) { - mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId); - mgmtSendSimpleResp(pMsg->thandle, code); - return; - } - - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - SRpcMsg rpcMsg = { - .handle = pMsg, - .pCont = pMDCreate, - .contLen = htonl(pMDCreate->contLen), - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE - }; - - pMsg->ahandle = pTable; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); -} - -int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { - STableInfo *pTable = mgmtGetTable(tableId); - if (pTable == NULL) { - if (ignore) { - mTrace("table:%s, table is not exist, think it success", tableId); - return TSDB_CODE_SUCCESS; - } else { - mError("table:%s, failed to create table, table not exist", tableId); - return TSDB_CODE_INVALID_TABLE; - } - } - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - mError("table:%s, failed to create table, in monitor database", tableId); - return TSDB_CODE_MONITOR_DB_FORBIDDEN; - } - - switch (pTable->type) { - case TSDB_SUPER_TABLE: - mTrace("table:%s, start to drop super table", tableId); - return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); - case TSDB_CHILD_TABLE: - mTrace("table:%s, start to drop child table", tableId); - return mgmtDropChildTable(pDb, (SChildTableObj *) pTable); - case TSDB_NORMAL_TABLE: - mTrace("table:%s, start to drop normal table", tableId); - return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); - case TSDB_STREAM_TABLE: - mTrace("table:%s, start to drop stream table", tableId); - return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); - default: - mError("table:%s, invalid table type:%d", tableId, pTable->type); - return TSDB_CODE_INVALID_TABLE; - } -} - int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) { STableInfo *pTable = mgmtGetTable(pAlter->tableId); if (pTable == NULL) { @@ -419,13 +346,6 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void * return numOfRows; } -SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) { - SMDDropTableMsg *pRemove = NULL; - - - return pRemove; -} - void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { if (mgmtCheckRedirect(pMsg->thandle)) return; @@ -474,7 +394,7 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { if (pCreate->numOfTags != 0) { mTrace("table:%s, is a super table", pCreate->tableId); - code = mgmtCreateSuperTable(pMsg->pDb, pCreate); + code = mgmtCreateSuperTable(pCreate); mgmtSendSimpleResp(pMsg->thandle, code); return; } @@ -494,29 +414,70 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { if (pVgroup == NULL) { mTrace("table:%s, start to create a new vgroup", pCreate->tableId); mgmtCreateVgroup(newMsg); + return; + } + + int32_t sid = taosAllocateId(pVgroup->idPool); + if (sid < 0) { + mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); + mgmtCreateVgroup(newMsg); + return; + } + + SMDCreateTableMsg *pMDCreate = NULL; + if (pCreate->numOfColumns == 0) { + mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); + pTable = mgmtCreateChildTable(pCreate, pVgroup, sid); + if (pTable == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + return; + } + pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable); + if (pMDCreate == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + return; + } } else { - mTrace("table:%s, vgroup:%d is selected", pCreate->tableId, pVgroup->vgId); - mgmtCreateTable(pVgroup, newMsg); + mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); + pTable = mgmtCreateNormalTable(pCreate, pVgroup, sid); + if (pTable == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + return; + } + pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); + if (pMDCreate == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + return; + } } + + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + SRpcMsg rpcMsg = { + .handle = newMsg, + .pCont = pMDCreate, + .contLen = htonl(pMDCreate->contLen), + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE + }; + + newMsg->ahandle = pTable; + mgmtSendMsgToDnode(&ipSet, &rpcMsg); } void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { - SCMDropTableMsg *pDrop = pMsg->pCont; + if (mgmtCheckRedirect(pMsg->thandle)) return; - if (mgmtCheckRedirect(pMsg->thandle)) { - mError("thandle:%p, failed to drop table:%s, need redirect message", pMsg->thandle, pDrop->tableId); - return; - } + SCMDropTableMsg *pDrop = pMsg->pCont; + mTrace("table:%s, drop table msg is received from thandle:%p", pDrop->tableId, pMsg->thandle); - SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); - if (pUser == NULL) { - mError("table:%s, failed to drop table, invalid user", pDrop->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); + if (mgmtCheckExpired()) { + mError("table:%s, failed to drop, grant expired", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); return; } - if (!pUser->writeAuth) { - mError("table:%s, failed to drop table, no rights", pDrop->tableId); + if (!pMsg->pUser->writeAuth) { + mError("table:%s, failed to drop, no rights", pDrop->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } @@ -528,8 +489,54 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { return; } - int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists); - if (code != TSDB_CODE_ACTION_IN_PROGRESS) { + STableInfo *pTable = mgmtGetTable(pDrop->tableId); + if (pTable == NULL) { + if (pDrop->igNotExists) { + mTrace("table:%s, table is not exist, think drop success", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + return; + } else { + mError("table:%s, failed to drop table, table not exist", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + return; + } + } + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + mError("table:%s, failed to drop table, in monitor database", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); + return; + } + + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; + int32_t code; + + switch (pTable->type) { + case TSDB_SUPER_TABLE: + mTrace("table:%s, start to drop super table", pDrop->tableId); + code = mgmtDropSuperTable(newMsg, pDb, (SSuperTableObj *) pTable); + break; + case TSDB_CHILD_TABLE: + mTrace("table:%s, start to drop child table", pDrop->tableId); + code = mgmtDropChildTable(newMsg, (SChildTableObj *) pTable); + break; + case TSDB_NORMAL_TABLE: + mTrace("table:%s, start to drop normal table", pDrop->tableId); + code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable); + break; + case TSDB_STREAM_TABLE: + mTrace("table:%s, start to drop stream table", pDrop->tableId); + code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable); + break; + default: + code = TSDB_CODE_INVALID_TABLE_TYPE; + mError("table:%s, invalid table type:%d", pDrop->tableId, pTable->type); + } + + if (code != TSDB_CODE_SUCCESS) { + free(newMsg); mgmtSendSimpleResp(pMsg->thandle, code); } } @@ -775,3 +782,52 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { free(queueMsg); } + +static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + + STableInfo *pTable = queueMsg->ahandle; + mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, queueMsg->thandle, tstrerror(rpcMsg->code)); + + if (rpcMsg->code != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + free(queueMsg); + return; + } + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("table:%s, failed to get vgroup", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); + free(queueMsg); + return; + } + + if (pTable->type == TSDB_CHILD_TABLE) { + if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { + mError("table:%s, update ctables sdb error", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + free(queueMsg); + return; + } + } else if (pTable->type == TSDB_NORMAL_TABLE){ + if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { + mError("table:%s, update ntables sdb error", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + free(queueMsg); + return; + } + } + + if (pVgroup->numOfTables <= 0) { + mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); + mgmtDropVgroup(pVgroup, NULL); + } + + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); + free(queueMsg); +} diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index a8c701a2137b0957b4f890032b44deb3f5ce0bb7..c2f3e199517d2e13b00d382087ec72969c3aa589 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -44,8 +44,10 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg); -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); +static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); +static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mgmtVgroupActionInit() { SVgObj tObj; @@ -119,6 +121,7 @@ int32_t mgmtInitVgroups() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); mTrace("vgroup is initialized"); return 0; @@ -135,18 +138,6 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { return pDb->pHead; } -void mgmtProcessVgTimer(void *handle, void *tmrId) { - SDbObj *pDb = (SDbObj *)handle; - if (pDb == NULL) return; - - if (pDb->vgStatus > TSDB_VG_STATUS_IN_PROGRESS) { - mTrace("db:%s, set vgroup status from %d to ready", pDb->name, pDb->vgStatus); - pDb->vgStatus = TSDB_VG_STATUS_READY; - } - - pDb->vgTimer = NULL; -} - void mgmtCreateVgroup(SQueuedMsg *pMsg) { SDbObj *pDb = pMsg->pDb; if (pDb == NULL) { @@ -185,25 +176,14 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { mgmtSendCreateVgroupMsg(pVgroup, pMsg); } -int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { - STableInfo *pTable; - - if (pVgroup->numOfTables > 0) { - for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { - if (pVgroup->tableList != NULL) { - pTable = pVgroup->tableList[i]; - if (pTable) mgmtDropTable(pDb, pTable->tableId, 0); - } - } +void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) { + if (ahandle != NULL) { + mgmtSendDropVgroupMsg(pVgroup, ahandle); + } else { + mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); + mgmtSendDropVgroupMsg(pVgroup, NULL); + sdbDeleteRow(tsVgroupSdb, pVgroup); } - - mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); - - //mgmtSendDropVgroupMsg(pVgroup, NULL); - - sdbDeleteRow(tsVgroupSdb, pVgroup); - - return TSDB_CODE_SUCCESS; } void mgmtSetVgroupIdPool() { @@ -632,5 +612,67 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); } + free(queueMsg); +} + +static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(SVgObj *pVgroup) { + SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg)); + if (pDrop == NULL) return NULL; + + pDrop->vgId = htonl(pVgroup->vgId); + return pDrop; +} + +static void mgmtSendDropVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { + mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", pVgroup->vgId, ahandle); + SMDDropVnodeMsg *pDrop = mgmtBuildDropVnodeMsg(pVgroup); + SRpcMsg rpcMsg = { + .handle = ahandle, + .pCont = pDrop, + .contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0, + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE + }; + mgmtSendMsgToDnode(ipSet, &rpcMsg); +} + +static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { + mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); + mgmtSendDropVnodeMsg(pVgroup, &ipSet, ahandle); + } +} + +static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { + mTrace("drop vnode msg is received"); + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + if (rpcMsg->code == TSDB_CODE_SUCCESS) { + queueMsg->code = rpcMsg->code; + queueMsg->successed++; + } + + SVgObj *pVgroup = queueMsg->ahandle; + mTrace("vgroup:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", + pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected, + queueMsg->thandle, rpcMsg->handle); + + if (queueMsg->received != queueMsg->expected) return; + + sdbDeleteRow(tsVgroupSdb, pVgroup); + + SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); + newMsg->msgType = queueMsg->msgType; + newMsg->thandle = queueMsg->thandle; + newMsg->pDb = queueMsg->pDb; + newMsg->pUser = queueMsg->pUser; + newMsg->contLen = queueMsg->contLen; + newMsg->pCont = rpcMallocCont(newMsg->contLen); + memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + mgmtAddToShellQueue(newMsg); + free(queueMsg); } \ No newline at end of file diff --git a/src/vnode/tsdb/CMakeLists.txt b/src/vnode/tsdb/CMakeLists.txt index 8a7c7a1a5197e3e47ed7e36cdb2ebcdcef2d6b49..b2154969d6209243511768f43686e2b47d787936 100644 --- a/src/vnode/tsdb/CMakeLists.txt +++ b/src/vnode/tsdb/CMakeLists.txt @@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(tsdb common tutil) # Someone has no gtest directory, so comment it - ADD_SUBDIRECTORY(tests) + # ADD_SUBDIRECTORY(tests) ENDIF ()