diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index ec0aea73ac043e468fa163c67bff5b7f276acd12..03fd81640154da61c3216f3b70928582d1fb5a56 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -43,7 +43,7 @@ int32_t dnodeCreateTable(SDCreateTableMsg *pTable); /* * Remove table from local repository */ -int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid); +int32_t dnodeDropTable(SDRemoveTableMsg *pTable); /* * Create stream diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index a5e42845764eff7e7ea1469c64b4a3f8a54ed590..1b49ce5f3260068df6d494243962e84f5f276d31 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -33,7 +33,7 @@ void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgTyp void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; -static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn); +static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) { @@ -121,26 +121,25 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void //rpcFreeCont(pCont); } -void dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { int32_t code = htonl(*((int32_t *) pCont)); if (code == TSDB_CODE_SUCCESS) { SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t)); dnodeCreateTable(table); } else if (code == TSDB_CODE_INVALID_TABLE_ID) { - SDRemoveTableMsg *table = (SDRemoveTableMsg *) (pCont + sizeof(int32_t)); - int32_t vnode = htonl(table->vnode); - int32_t sid = htonl(table->sid); - uint64_t uid = htobe64(table->uid); - dError("vnode:%d, sid:%d table is not configured, remove it", vnode, sid); - dnodeDropTable(vnode, sid, uid); + SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t)); + pTable->sid = htonl(pTable->sid); + pTable->uid = htobe64(pTable->uid); + dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid); + dnodeDropTable(pTable); } else { dError("code:%d invalid message", code); } } -void dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SDCreateTableMsg *pTable = (SDCreateTableMsg *) pCont; +void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { + SDCreateTableMsg *pTable = pCont; pTable->numOfColumns = htons(pTable->numOfColumns); pTable->numOfTags = htons(pTable->numOfTags); pTable->sid = htonl(pTable->sid); @@ -170,24 +169,28 @@ void dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgTy dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont; int32_t code = dnodeCreateStream(stream); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SDRemoveTableMsg *table = (SDRemoveTableMsg *) pCont; - int32_t vnode = htonl(table->vnode); - int32_t sid = htonl(table->sid); - uint64_t uid = htobe64(table->uid); +void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { + SDRemoveTableMsg *pTable = pCont; + pTable->sid = htonl(pTable->sid); + pTable->numOfVPeers = htonl(pTable->numOfVPeers); + pTable->uid = htobe64(pTable->uid); - dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid); - int32_t code = dnodeDropTable(vnode, sid, uid); + for (int i = 0; i < pTable->numOfVPeers; ++i) { + pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip); + pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode); + } + + int32_t code = dnodeDropTable(pTable); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { int32_t code = htonl(*((int32_t *) pCont)); if (code == TSDB_CODE_SUCCESS) { @@ -204,7 +207,7 @@ void dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, voi } } -void dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessVPeersMsg(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SVPeersMsg *vpeer = (SVPeersMsg *) pCont; int32_t vnode = htonl(vpeer->vnode); @@ -214,7 +217,7 @@ void dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont; int32_t vnode = htonl(vpeer->vnode); @@ -224,7 +227,7 @@ void dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) { +void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont; int32_t code = tsCfgDynamicOptions(pCfg->config); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index f466b4f3f0e59ff911d6ce82a2f542698c930ff3..0608d8f45b4186623db2f036aaa01c829651fa9c 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -69,7 +69,8 @@ int32_t dnodeCreateTable(SDCreateTableMsg *pTable) { /* * Remove table from local repository */ -int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid) { +int32_t dnodeDropTable(SDRemoveTableMsg *pTable) { + dPrint("table:%s, sid:%d will be removed", pTable->tableId, pTable->sid); return TSDB_CODE_SUCCESS; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2661712adb81a1b3141fe8b3069195d0e0438173..8ea4968b6858f0797a04cb5f8d62070ca35f6506 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -329,10 +329,11 @@ typedef struct { } SMgmtHead; typedef struct { - short vnode; - int32_t sid; - uint64_t uid; - char tableId[TSDB_TABLE_ID_LEN + 1]; + int32_t sid; + int32_t numOfVPeers; + uint64_t uid; + SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; + char tableId[TSDB_TABLE_ID_LEN + 1]; } SDRemoveTableMsg; typedef struct { diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index d589e9bc97bacba3639a6cb856e790ddc2e7c17a..83c18ca46cb210c5c3c21b5eba9e7ae4a722c541 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -28,13 +28,11 @@ extern "C" { int32_t mgmtInitChildTables(); void mgmtCleanUpChildTables(); - void * mgmtGetChildTable(char *tableId); int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); -int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp); diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index fd1ba85b172c3b87c53949514e91508721bc73df..2ba85d24fa435e646a45bfb67fe8da53a7277030 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -27,10 +27,10 @@ extern "C" { extern void *mgmtStatusTimer; void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle); -void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); +void mgmtSendRemoveTableMsg(SDRemoveTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle); void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle); void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle); -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *table_info); +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index 270e4998ae792e2422516d6861496837dd34183c..e632b8b98759029d8f1217695ef5c61939379932 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -26,14 +26,13 @@ extern "C" { int32_t mgmtInitNormalTables(); void mgmtCleanUpNormalTables(); - void * mgmtGetNormalTable(char *tableId); -int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid, SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); +int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, + SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut); int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); -int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta *pMeta, bool usePublicIp); diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index 9938b6b43979e6dd0ac41049ad1d14b8107fbee2..4709772e84ab69575ea165e2c24d50437486f1f3 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -46,7 +46,6 @@ void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable); int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); -int32_t mgmtGetTagsLength(SSuperTableObj *pSuperTable, int32_t col); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index d5f91a4959dfa5046be26bec864dfc444e119cd0..974203fb421ffc02f8eff3384fc8ff96e7222de0 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -31,7 +31,7 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle); +int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); @@ -45,6 +45,8 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty); SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); +void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle); + #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 8cd88a73199fe3e9abce15344b568554541c77b9..e57b83724f5b7b44e77296d8cd8ddfafa8b9797f 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -370,18 +370,38 @@ int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj * int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { + mError("table:%s, failed to drop child table, vgroup not exist", pTable->tableId); return TSDB_CODE_OTHERS; } -// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); + SDRemoveTableMsg *pRemove = rpcMallocCont(sizeof(SDRemoveTableMsg)); + if (pRemove == NULL) { + mError("table:%s, failed to drop child table, no enough memory", pTable->tableId); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + pRemove->sid = htonl(pTable->sid); + pRemove->uid = htobe64(pTable->uid); + + pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes); + for (int i = 0; i < pVgroup->numOfVnodes; ++i) { + pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); + pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } - sdbDeleteRow(tsChildTableSdb, pTable); + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + mgmtSendRemoveTableMsg(pRemove, &ipSet, NULL); + + if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) { + mError("table:%s, update ctables sdb error", pTable->tableId); + return TSDB_CODE_SDB_ERROR; + } if (pVgroup->numOfTables <= 0) { mgmtDropVgroup(pDb, pVgroup); } - return 0; + return TSDB_CODE_SUCCESS; } void* mgmtGetChildTable(char *tableId) { diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index bed5ea6afc8f9b9e87f8149448657b96187777c2..f9db010798e5f180770bdf0529d8c3ccd7486b4c 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -149,6 +149,7 @@ static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t con } rpcSendResponse(info->thandle, code, NULL, 0); + free(info); } void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) { @@ -157,58 +158,49 @@ void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *a } static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { - mTrace("remove table rsp received, handle:%p code:%d", thandle, code); + mTrace("remove table rsp received, thandle:%p code:%d", thandle, code); +} + +void mgmtSendRemoveTableMsg(SDRemoveTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle) { + mTrace("table:%s, sid:%d send remove table msg, ahandle:%p", pRemove->tableId, htonl(pRemove->sid), ahandle); + if (pRemove != NULL) { + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), ahandle); + } } static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { mTrace("free vnode rsp received, handle:%p code:%d", thandle, code); } -static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *ahandle, int32_t code) { - SProcessInfo *vgprocess = ahandle; - mTrace("create vnode received, vgprocess:%p code:%d", vgprocess, code); - - if (vgprocess == NULL) { - rpcFreeCont(pCont); - return; - } - - assert(vgprocess->type == TSDB_PROCESS_CREATE_VGROUP); - - vgprocess->received++; - SVgObj *pVgroup = vgprocess->ahandle; - - mTrace("vgroup:%d, received:%d numOfVnodes:%d vgprocess:%p tbprocess::%p", - pVgroup->vgId, vgprocess->received, pVgroup->numOfVnodes, vgprocess->thandle); - if (vgprocess->received == pVgroup->numOfVnodes) { - - STableInfo *pTable = vgprocess->ahandle; - SProcessInfo *tbprocess = calloc(1, sizeof(SProcessInfo)); - tbprocess->type = TSDB_PROCESS_CREATE_TABLE; - tbprocess->thandle = tbprocess->thandle; - tbprocess->ahandle = pTable; - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mgmtSendCreateTableMsg(pTable, &ipSet, tbprocess); +static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { + mTrace("create vnode rsp received, thandle:%p code:%d", thandle, code); + if (thandle == NULL) return; - free(vgprocess); + SProcessInfo *info = thandle; + assert(info->type == TSDB_PROCESS_CREATE_VGROUP); + info->received++; + SVgObj *pVgroup = info->ahandle; + + mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes); + if (info->received == pVgroup->numOfVnodes) { + mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle); + free(info); } - - rpcFreeCont(pCont); } -void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *table_info) { - mTrace("vgroup:%d, send create all vnodes msg, table_info:%p", pVgroup->vgId, table_info); +void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { + mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); for (int i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, table_info); + mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); } } -void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *table_info) { - mTrace("vgroup:%d, send create vnode:%d msg, table_info:%p", pVgroup->vgId, vnode, table_info); +void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { + mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); if (pVpeer != NULL) { - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), table_info); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), ahandle); } } @@ -242,14 +234,6 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p } -void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { - mTrace("table:%s, sid:%d send remove table msg, handle:%p", pTable->tableId, pTable->sid); - - SDRemoveTableMsg *pRemove = mgmtBuildRemoveTableMsg(pTable); - if (pRemove != NULL) { - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), ahandle); - } -} void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { mTrace("table:%s, sid:%d send alter stream msg, handle:%p", pTable->tableId, pTable->sid); diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 4bff3c1e96e8e42388af266f4c2bff7b8bb96f01..d9c9d45e6fd37ad66e04996e0dd0fc6b9b858815 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -291,43 +291,58 @@ void mgmtCleanUpNormalTables() { sdbCloseTable(tsNormalTableSdb); } -int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) { -// int8_t *pMsg = NULL; -// SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg; -// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); -// pCreateTable->vnode = htobe32(vnode); -// pCreateTable->sid = htobe32(pTable->sid); -// pCreateTable->uid = htobe64(pTable->uid); -// pCreateTable->createdTime = htobe64(pTable->createdTime); -// pCreateTable->sversion = htobe32(pTable->sversion); -// pCreateTable->numOfColumns = htobe16(pTable->numOfColumns); -// -// SSchema *pSchema = pTable->schema; -// int32_t totalCols = pCreateTable->numOfColumns; - -// for (int32_t col = 0; col < totalCols; ++col) { -// SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; -// colData->type = pSchema[col].type; -// colData->bytes = htons(pSchema[col].bytes); -// colData->colId = htons(pSchema[col].colId); -// } - -// int32_t totalColsSize = sizeof(SMColumn *) * totalCols; -// pMsg = pCreateTable->data + totalColsSize; - -// return pMsg; - return NULL; +static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { + int32_t totalCols = pTable->numOfColumns; + int32_t contLen = sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen; + + SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen); + if (pCreateTable == NULL) { + return NULL; + } + + memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); + pCreateTable->tableType = pTable->type; + pCreateTable->numOfColumns = htons(pTable->numOfColumns); + pCreateTable->numOfTags = htons(0); + pCreateTable->sid = htonl(pTable->sid); + pCreateTable->sversion = htonl(pTable->sversion); + pCreateTable->tagDataLen = htonl(0); + pCreateTable->sqlDataLen = htonl(pTable->sqlLen); + pCreateTable->contLen = htonl(contLen); + pCreateTable->numOfVPeers = htonl(pVgroup->numOfVnodes); + pCreateTable->uid = htobe64(pTable->uid); + pCreateTable->superTableUid = htobe64(0); + pCreateTable->createdTime = htobe64(pTable->createdTime); + + for (int i = 0; i < pVgroup->numOfVnodes; ++i) { + pCreateTable->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); + pCreateTable->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } + + SSchema *pSchema = (SSchema *) pCreateTable->data; + memcpy(pSchema, pTable->schema, totalCols * sizeof(SSchema)); + for (int32_t col = 0; col < totalCols; ++col) { + pSchema->bytes = htons(pSchema->bytes); + pSchema->colId = htons(pSchema->colId); + pSchema++; + } + + memcpy(pCreateTable->data + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen); + + return pCreateTable; } -int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid, SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { +int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, + SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) { int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb); if (numOfTables >= TSDB_MAX_NORMAL_TABLES) { - mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES); + mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES); return TSDB_CODE_TOO_MANY_TABLES; } SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1); if (pTable == NULL) { + mError("table:%s, failed to alloc memory", pCreate->tableId); return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -339,6 +354,7 @@ int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); pTable->sversion = 0; pTable->numOfColumns = pCreate->numOfColumns; + pTable->sqlLen = pTable->sqlLen; int32_t numOfCols = pCreate->numOfColumns; int32_t schemaSize = numOfCols * sizeof(SSchema); @@ -373,6 +389,14 @@ int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return TSDB_CODE_SDB_ERROR; } + *pDCreateOut = mgmtBuildCreateNormalTableMsg(pTable, pVgroup); + if (*pDCreateOut == NULL) { + mError("table:%s, failed to build create table message", pCreate->tableId); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + *pTableOut = pTable; + mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); @@ -382,18 +406,38 @@ int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { + mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId); return TSDB_CODE_OTHERS; } -// mgmtSendRemoveTableMsg((STableInfo *) pTable, pVgroup); + SDRemoveTableMsg *pRemove = rpcMallocCont(sizeof(SDRemoveTableMsg)); + if (pRemove == NULL) { + mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + pRemove->sid = htonl(pTable->sid); + pRemove->uid = htobe64(pTable->uid); + + pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes); + for (int i = 0; i < pVgroup->numOfVnodes; ++i) { + pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip); + pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } - sdbDeleteRow(tsNormalTableSdb, pTable); + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + mgmtSendRemoveTableMsg(pRemove, &ipSet, NULL); + + if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) { + mError("table:%s, update ntables sdb error", pTable->tableId); + return TSDB_CODE_SDB_ERROR; + } if (pVgroup->numOfTables <= 0) { mgmtDropVgroup(pDb, pVgroup); } - return 0; + return TSDB_CODE_SUCCESS; } void* mgmtGetNormalTable(char *tableId) { diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index e9244e9e234b0c173ffd57871d69e13e82581177..40e2bc2cc3bb69dbd2a52fe27ae77d4ce7d3e790 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -782,38 +782,42 @@ void mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { return; } - mgmtCreateTable(pCreate, contLen, ahandle); + int32_t code = mgmtCreateTable(pCreate, contLen, ahandle); + if (code != TSDB_CODE_ACTION_IN_PROGRESS) { + rpcSendResponse(ahandle, code, NULL, 0); + } } void mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) { + SDropTableMsg *pDrop = (SDropTableMsg *) pCont; + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to drop table, need redirect message", pDrop->tableId); return; } SUserObj *pUser = mgmtGetUserFromConn(ahandle); if (pUser == NULL) { + mError("table:%s, failed to drop table, invalid user", pDrop->tableId); rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); return; } - SDropTableMsg *pDrop = (SDropTableMsg *) pCont; - int32_t code; - if (!pUser->writeAuth) { - code = TSDB_CODE_NO_RIGHTS; - } else { - SDbObj *pDb = mgmtGetDb(pDrop->db); - if (pDb) { - code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists); - if (code == TSDB_CODE_SUCCESS) { - mTrace("table:%s is dropped by user:%s", pDrop->tableId, pUser->user); - } - } else { - code = TSDB_CODE_DB_NOT_SELECTED; - } + mError("table:%s, failed to drop table, no rights", pDrop->tableId); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; } - if (code != TSDB_CODE_SUCCESS) { + SDbObj *pDb = mgmtGetDb(pDrop->db); + if (pDb == NULL) { + mError("table:%s, failed to drop table, db:%s not selected", pDrop->tableId, pDrop->db); + rpcSendResponse(ahandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0); + return; + } + + int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists); + if (code != TSDB_CODE_ACTION_IN_PROGRESS) { rpcSendResponse(ahandle, code, NULL, 0); } } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index d420391d73d6bd116a86cfceda6a1073c8272348..fa3d591680d94e952cc28e6cd727613c35209df6 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -608,17 +608,6 @@ void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable) { pStable->numOfTables--; } -int32_t mgmtGetTagsLength(SSuperTableObj* pSuperTable, int32_t col) { // length before column col - int32_t len = 0; - int32_t tagColumnIndexOffset = pSuperTable->numOfColumns; - - for (int32_t i = 0; i < pSuperTable->numOfTags && i < col; ++i) { - len += ((SSchema*)pSuperTable->schema)[tagColumnIndexOffset + i].bytes; - } - - return len; -} - int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; for (int32_t i = 0; i < numOfCols; ++i) { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 9a8a97d28f725debf1724c3326cf36bc810cb832..da0bebefc877c85a46abe3ab543aad58bf3f86e5 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -164,7 +164,7 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); } else { mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); - code = mgmtCreateNormalTable(pCreate, pVgroup, sid, &pDCreate, &pTable); + code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); } if (code != TSDB_CODE_SUCCESS) { @@ -178,31 +178,29 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); info->type = TSDB_PROCESS_CREATE_TABLE; - info->ahandle = pTable; info->thandle = thandle; + info->ahandle = pTable; SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); mgmtSendCreateTableMsg(pDCreate, &ipSet, info); } -void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) { +int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) { SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb == NULL) { mError("table:%s, failed to create table, db not selected", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0); - return; + return TSDB_CODE_DB_NOT_SELECTED; } STableInfo *pTable = mgmtGetTable(pCreate->tableId); if (pTable != NULL) { if (pCreate->igExists) { - mTrace("table:%s, table is alredy exist, think it success", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0); + mTrace("table:%s, table is already exist, think it success", pCreate->tableId); + return TSDB_CODE_SUCCESS; } else { mError("table:%s, failed to create table, table already exist", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_TABLE_ALREADY_EXIST, NULL, 0); + return TSDB_CODE_TABLE_ALREADY_EXIST; } - return; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); @@ -211,60 +209,70 @@ void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) { int32_t code = mgmtCheckTableLimit(pAcct, pCreate); if (code != TSDB_CODE_SUCCESS) { mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId); - rpcSendResponse(thandle, code, NULL, 0); - return; + return code; } if (mgmtCheckExpired()) { mError("table:%s, failed to create table, grant expired", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_GRANT_EXPIRED, NULL, 0); - return; + return TSDB_CODE_GRANT_EXPIRED; } if (pCreate->numOfTags != 0) { - mTrace("table:%s, start to create super table, tags:%d columns:%d", pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns); - mgmtCreateSuperTable(pDb, pCreate); - return; + mTrace("table:%s, start to create super table, tags:%d columns:%d", + pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns); + return mgmtCreateSuperTable(pDb, pCreate); } code = mgmtCheckTimeSeries(pCreate->numOfColumns); if (code != TSDB_CODE_SUCCESS) { mError("table:%s, failed to create table, timeseries exceed the limit", pCreate->tableId); - return; + return TSDB_CODE_SUCCESS; } SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb); if (pVgroup == NULL) { - mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId, pVgroup->vgId); + mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId); mgmtProcessCreateVgroup(pCreate, contLen, thandle); } else { mTrace("table:%s, try to create table in vgroup:%d", pCreate->tableId, pVgroup->vgId); mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle); } + + return TSDB_CODE_ACTION_IN_PROGRESS; } int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { STableInfo *pTable = mgmtGetTable(tableId); if (pTable == NULL) { if (ignore) { + mTrace("table:%s, table is not exist, think it success", tableId); return TSDB_CODE_SUCCESS; } else { + mError("table:%s, failed to create table, table not exist", tableId); return TSDB_CODE_INVALID_TABLE; } } if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + mError("table:%s, failed to create table, in monitor database", tableId); return TSDB_CODE_MONITOR_DB_FORBIDDEN; } switch (pTable->type) { case TSDB_TABLE_TYPE_SUPER_TABLE: + mTrace("table:%s, start to drop super table", tableId); return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); case TSDB_TABLE_TYPE_CHILD_TABLE: + mTrace("table:%s, start to drop child table", tableId); return mgmtDropChildTable(pDb, (SChildTableObj *) pTable); case TSDB_TABLE_TYPE_NORMAL_TABLE: + mTrace("table:%s, start to drop normal table", tableId); + return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); + case TSDB_TABLE_TYPE_STREAM_TABLE: + mTrace("table:%s, start to drop stream table", tableId); return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); default: + mError("table:%s, invalid table type:%d", tableId, pTable->type); return TSDB_CODE_INVALID_TABLE; } }