diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 25430b91e420f732c3ccc8758eeea90824b94e19..8d35e6afd9004d2a51acd1d2f1a0f2962e39d4e3 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -425,7 +425,7 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - dTrace("stable:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); + dTrace("stable:%s, start to it drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId); pTable->uid = htobe64(pTable->uid); // TODO: drop stable in vvnode diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 7a07a81d262cc567a6d19b543de3738b0e7d08bd..dd996c74018c0c190e6405cd00fdbde99ad35744 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -77,6 +77,7 @@ typedef struct { } SDnodeObj; typedef struct { + int32_t dnodeId; uint32_t ip; uint32_t publicIp; int32_t vnode; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 6a96b658bc6f0ca0f2ab80910b3af587750011fb..ca8ce136f87c2c52f777de710b420821f34d4f7c 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -352,8 +352,7 @@ typedef struct { } SMDDropSTableMsg; typedef struct { - int32_t vgId; - int32_t vnode; + int32_t vgId; } SMDDropVnodeMsg; typedef struct SColIndexEx { diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 7847357d65eb2bb77506400ef10fba94d074d8c2..20f6ca9c8db3394162e39889b7560139fa86f0fa 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -33,7 +33,7 @@ void * mgmtGetChildTable(char *tableId); void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable); -int32_t mgmtDropChildTable(SChildTableObj *pTable); +int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index 0577647d2231b5d2a34fa6f581933ed61096bb4c..6f7bd5321b88eb34a72a046c2c94601ec41b8653 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -31,7 +31,7 @@ void * mgmtGetNormalTable(char *tableId); void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); -int32_t mgmtDropNormalTable(SNormalTableObj *pTable); +int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index ba66ebe582e7860eed2201c20e92e1f2acfd616e..2acf34144f00c383f1706cf2e149504b2c3d9cda 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -32,7 +32,7 @@ void mgmtCleanUpSuperTables(); void * mgmtGetSuperTable(char *tableId); int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate); -int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); +int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pTable); int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 10f404f38680de974b866a252e009870e612a199..524cc87460581740cc6ccc54360c78903ba3ae92 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -41,7 +41,6 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode); void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle); -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup); SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 6544d969a89ca3ec6a8a1c5e67bbfcce12f0a407..0c6d964ae7f58e499e811eb8daf4930cb3d52e70 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -337,11 +337,11 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t strcpy(pTable->tableId, pCreate->tableId); strcpy(pTable->superTableId, pSuperTable->tableId); pTable->type = TSDB_CHILD_TABLE; + pTable->createdTime = taosGetTimestampMs(); pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); pTable->sid = tid; pTable->vgId = pVgroup->vgId; - pTable->createdTime = taosGetTimestampMs(); pTable->superTable = pSuperTable; if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { @@ -355,7 +355,7 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return pTable; } -int32_t mgmtDropChildTable(SChildTableObj *pTable) { +int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId); @@ -378,13 +378,14 @@ int32_t mgmtDropChildTable(SChildTableObj *pTable) { mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { - .handle = 0, + .handle = newMsg, .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; + newMsg->ahandle = pTable; mgmtSendMsgToDnode(&ipSet, &rpcMsg); return TSDB_CODE_SUCCESS; diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 92699b69a6edd7ba74dc7861d2a31cf5975566c6..ca441bfb2e154923210da3e46cb982725b070e1c 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -962,7 +962,6 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { int32_t code; if (pMsg->pUser->superAuth) { - code = TSDB_CODE_OPS_NOT_SUPPORT; //SCMDropDbMsg *pDrop = rpcMsg->pCont; //rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); //if (rpcRsp.code == TSDB_CODE_SUCCESS) { diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 5328945bb93bab5c0a1617402f7ee729248cfc10..4ff99308a0ec0dd9445c6fa0c02857e62351c39a 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -341,9 +341,9 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t strcpy(pTable->tableId, pCreate->tableId); pTable->type = TSDB_NORMAL_TABLE; pTable->vgId = pVgroup->vgId; + pTable->createdTime = taosGetTimestampMs(); pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); pTable->sid = sid; - pTable->createdTime = taosGetTimestampMs(); pTable->sversion = 0; pTable->numOfColumns = htons(pCreate->numOfColumns); pTable->sqlLen = htons(pCreate->sqlLen); @@ -389,7 +389,7 @@ void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return pTable; } -int32_t mgmtDropNormalTable(SNormalTableObj *pTable) { +int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId); @@ -411,13 +411,14 @@ int32_t mgmtDropNormalTable(SNormalTableObj *pTable) { SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); mTrace("table:%s, send drop table msg", pDrop->tableId); SRpcMsg rpcMsg = { - .handle = 0, + .handle = newMsg, .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; + newMsg->ahandle = pTable; mgmtSendMsgToDnode(&ipSet, &rpcMsg); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 2f3debf1589fa22adc0667e9352dd500ad705f6b..02cb466d0520123f86827784697361b470bd1c59 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -250,7 +250,7 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { return TSDB_CODE_SUCCESS; } -int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pStable) { +int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pStable) { if (pStable->numOfTables != 0) { mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables); return TSDB_CODE_OTHERS; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 50d1697cd8187f580b657508d170b1c10d608d99..d77d9cbb50274c1b2e4980f1f837afd1c4c41055 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -515,23 +515,27 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { return; } + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; int32_t code; + switch (pTable->type) { case TSDB_SUPER_TABLE: mTrace("table:%s, start to drop super table", pDrop->tableId); - code = mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); + code = mgmtDropSuperTable(newMsg, pDb, (SSuperTableObj *) pTable); break; case TSDB_CHILD_TABLE: mTrace("table:%s, start to drop child table", pDrop->tableId); - code = mgmtDropChildTable((SChildTableObj *) pTable); + code = mgmtDropChildTable(newMsg, (SChildTableObj *) pTable); break; case TSDB_NORMAL_TABLE: mTrace("table:%s, start to drop normal table", pDrop->tableId); - code = mgmtDropNormalTable((SNormalTableObj *) pTable); + code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable); break; case TSDB_STREAM_TABLE: mTrace("table:%s, start to drop stream table", pDrop->tableId); - code = mgmtDropNormalTable((SNormalTableObj *) pTable); + code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable); break; default: code = TSDB_CODE_INVALID_TABLE_TYPE; @@ -539,6 +543,7 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { } if (code != TSDB_CODE_SUCCESS) { + free(newMsg); mgmtSendSimpleResp(pMsg->thandle, code); } } @@ -788,39 +793,48 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; - STableInfo *pTable = rpcMsg->handle; - mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, rpcMsg->handle, tstrerror(rpcMsg->code)); + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + + STableInfo *pTable = queueMsg->ahandle; + mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, queueMsg->thandle, tstrerror(rpcMsg->code)); if (rpcMsg->code != TSDB_CODE_SUCCESS) { mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); - mgmtSendSimpleResp(rpcMsg->handle, rpcMsg->code); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + free(queueMsg); return; - } else { - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mError("table:%s, failed to get vgroup", pTable->tableId); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_VGROUP_ID); + } + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("table:%s, failed to get vgroup", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); + free(queueMsg); + return; + } + + if (pTable->type == TSDB_CHILD_TABLE) { + if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { + mError("table:%s, update ctables sdb error", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + free(queueMsg); return; } - - if (pTable->type == TSDB_CHILD_TABLE) { - if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { - mError("table:%s, update ctables sdb error", pTable->tableId); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR); - return; - } - } else if (pTable->type == TSDB_NORMAL_TABLE){ - if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { - mError("table:%s, update ntables sdb error", pTable->tableId); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SDB_ERROR); - return; - } + } else if (pTable->type == TSDB_NORMAL_TABLE){ + if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { + mError("table:%s, update ntables sdb error", pTable->tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + free(queueMsg); + return; } + } - if (pVgroup->numOfTables <= 0) { - mgmtDropVgroup(pVgroup); - } + if (pVgroup->numOfTables <= 0) { + mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); + mgmtDropVgroup(pVgroup); } - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); + free(queueMsg); } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 7b3fb02e8a8fb305c55491d3e1be71a017b92e4f..e700fcd877bdaa76ec790dc914f671aa88932873 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -44,8 +44,10 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg); -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); +static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle); +static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); static void mgmtVgroupActionInit() { SVgObj tObj; @@ -119,6 +121,7 @@ int32_t mgmtInitVgroups() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); mTrace("vgroup is initialized"); return 0; @@ -186,7 +189,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) { } int32_t mgmtDropVgroup(SVgObj *pVgroup) { - STableInfo *pTable; +// STableInfo *pTable; if (pVgroup->numOfTables > 0) { // for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { @@ -197,12 +200,9 @@ int32_t mgmtDropVgroup(SVgObj *pVgroup) { // } } - mTrace("vgroup:%d, replica:%d is deleted", pVgroup->vgId, pVgroup->numOfVnodes); - - //mgmtSendDropVgroupMsg(pVgroup, NULL); - + mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); + mgmtSendDropVgroupMsg(pVgroup, NULL); sdbDeleteRow(tsVgroupSdb, pVgroup); - return TSDB_CODE_SUCCESS; } @@ -633,4 +633,37 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { } free(queueMsg); +} + +static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(SVgObj *pVgroup) { + SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg)); + if (pDrop == NULL) return NULL; + + pDrop->vgId = htonl(pVgroup->vgId); + return pDrop; +} + +static void mgmtSendDropVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { + mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", pVgroup->vgId, ahandle); + SMDDropVnodeMsg *pDrop = mgmtBuildDropVnodeMsg(pVgroup); + SRpcMsg rpcMsg = { + .handle = ahandle, + .pCont = pDrop, + .contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0, + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE + }; + mgmtSendMsgToDnode(ipSet, &rpcMsg); +} + +static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { + mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); + mgmtSendDropVnodeMsg(pVgroup, &ipSet, ahandle); + } +} + +static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { + mTrace("drop vnode msg is received"); } \ No newline at end of file diff --git a/src/vnode/tsdb/CMakeLists.txt b/src/vnode/tsdb/CMakeLists.txt index 8a7c7a1a5197e3e47ed7e36cdb2ebcdcef2d6b49..b2154969d6209243511768f43686e2b47d787936 100644 --- a/src/vnode/tsdb/CMakeLists.txt +++ b/src/vnode/tsdb/CMakeLists.txt @@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(tsdb common tutil) # Someone has no gtest directory, so comment it - ADD_SUBDIRECTORY(tests) + # ADD_SUBDIRECTORY(tests) ENDIF ()