diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 831416bc296d81b89d22082732ea929cca2c035e..252129e2d79f0ec4e85a898207ba1a7cd4223e4d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1804,6 +1804,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SDropTableMsg *pDropTableMsg = (SDropTableMsg*)pCmd->payload; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); strcpy(pDropTableMsg->tableId, pMeterMetaInfo->name); + pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0; pCmd->msgType = TSDB_MSG_TYPE_DROP_TABLE; return TSDB_CODE_SUCCESS; diff --git a/src/dnode/inc/dnodeVnodeMgmt.h b/src/dnode/inc/dnodeVnodeMgmt.h index 321ac3083bd429c549b5195077f7eeb0313f3502..3cc705499fee32a659f0cd7d1916cc079e8f68b9 100644 --- a/src/dnode/inc/dnodeVnodeMgmt.h +++ b/src/dnode/inc/dnodeVnodeMgmt.h @@ -45,7 +45,7 @@ bool dnodeCheckVnodeExist(int32_t vid); * Create vnode with specified configuration and open it * if exist, config it */ -int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg); +int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode); /* * Remove vnode from local repository diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 1b49ce5f3260068df6d494243962e84f5f276d31..c9e2b7ce0350840a7abf1e66a1d7b5677192a542 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -177,9 +177,9 @@ void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SDRemoveTableMsg *pTable = pCont; - pTable->sid = htonl(pTable->sid); + pTable->sid = htonl(pTable->sid); pTable->numOfVPeers = htonl(pTable->numOfVPeers); - pTable->uid = htobe64(pTable->uid); + pTable->uid = htobe64(pTable->uid); for (int i = 0; i < pTable->numOfVPeers; ++i) { pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip); @@ -194,9 +194,8 @@ void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void int32_t code = htonl(*((int32_t *) pCont)); if (code == TSDB_CODE_SUCCESS) { - SVPeersMsg *vpeer = (SVPeersMsg *) (pCont + sizeof(int32_t)); - int32_t vnode = htonl(vpeer->vnode); - dnodeCreateVnode(vnode, vpeer); + SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) (pCont + sizeof(int32_t)); + dnodeCreateVnode(pVnode); } else if (code == TSDB_CODE_INVALID_VNODE_ID) { SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t)); int32_t vnode = htonl(vpeer->vnode); @@ -207,22 +206,15 @@ void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void } } -void dnodeProcessVPeersMsg(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SVPeersMsg *vpeer = (SVPeersMsg *) pCont; - int32_t vnode = htonl(vpeer->vnode); - - dPrint("vnode:%d, start to config", vnode); - - int32_t code = dnodeCreateVnode(vnode, vpeer); +void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { + SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont; + int32_t code = dnodeCreateVnode(pVnode); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont; - int32_t vnode = htonl(vpeer->vnode); - - dPrint("vnode:%d, remove it", vnode); - + SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont; + int32_t vnode = htonl(pVnode->vnode); int32_t code = dnodeDropVnode(vnode); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } @@ -256,7 +248,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { void dnodeInitProcessShellMsg() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; diff --git a/src/dnode/src/dnodeVnodeMgmt.c b/src/dnode/src/dnodeVnodeMgmt.c index c39f7848e15103584e0f1b175a3cef4a665a726a..de05919c6a40f7b11721f8edd45210de8bca2494 100644 --- a/src/dnode/src/dnodeVnodeMgmt.c +++ b/src/dnode/src/dnodeVnodeMgmt.c @@ -31,11 +31,13 @@ bool dnodeCheckVnodeExist(int32_t vnode) { return true; } -int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg) { +int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode) { + dPrint("vnode:%d, is created", htonl(pVnode->vnode)); return 0; } int32_t dnodeDropVnode(int32_t vnode) { + dPrint("vnode:%d, is dropped", vnode); return 0; } diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 0608d8f45b4186623db2f036aaa01c829651fa9c..6caaa34a1990e75e4afcac3acc47d4ef52cd2237 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -70,7 +70,7 @@ int32_t dnodeCreateTable(SDCreateTableMsg *pTable) { * Remove table from local repository */ int32_t dnodeDropTable(SDRemoveTableMsg *pTable) { - dPrint("table:%s, sid:%d will be removed", pTable->tableId, pTable->sid); + dPrint("table:%s, sid:%d is removed", pTable->tableId, pTable->sid); return TSDB_CODE_SUCCESS; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 8ea4968b6858f0797a04cb5f8d62070ca35f6506..0a68c8b4f94e93fb11eb4ba61a0de5aff0f3ae7a 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -41,7 +41,7 @@ extern "C" { #define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10 #define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11 #define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12 -#define TSDB_MSG_TYPE_DNODE_VPEERS 13 +#define TSDB_MSG_TYPE_DNODE_CREATE_VNODE 13 #define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14 #define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15 #define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16 @@ -266,7 +266,6 @@ typedef struct { typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; - char db[TSDB_DB_NAME_LEN + 1]; int8_t igNotExists; } SDropTableMsg; @@ -623,7 +622,7 @@ typedef struct { int32_t vnode; SVnodeCfg cfg; SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; -} SVPeersMsg; +} SCreateVnodeMsg; typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index 2ba85d24fa435e646a45bfb67fe8da53a7277030..772c2ba3310366c0006e7d1ce76a2e35652c0cc0 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -32,7 +32,7 @@ void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle); void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle); void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle); - +void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle); extern int32_t (*mgmtInitDnodeInt)(); extern void (*mgmtCleanUpDnodeInt)(); diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 94e4f44bcc1a86658fe02100849af012795eb974..959f9e65ab93e220567384461de4bc93889be041 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -39,11 +39,10 @@ int32_t mgmtKillStream(char *qidstr, void *pConn); int32_t mgmtKillConnection(char *qidstr, void *pConn); enum { - TSDB_PROCESS_CREATE_TABLE, TSDB_PROCESS_CREATE_VGROUP, - TSDB_PROCESS_CREATE_VGROUP_AND_TABLE, - TSDB_PROCESS_CREATE_VNODE, - TSDB_PROCESS_TABLE_CFG, + TSDB_PROCESS_CREATE_VGROUP_GET_META, + TSDB_PROCESS_CREATE_TABLE, + TSDB_PROCESS_CREATE_TABLE_GET_META, }; typedef struct { diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index bda949cb42baa2c801efabbc2b6c70892bc7f63f..56668b512ffd6ee886d7e1bdef91007db07a8794 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -36,6 +36,21 @@ extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahand extern void (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle); +/* + * If table not exist, will create it + */ +void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); + +/* + * If vgroup not exist, will create vgroup + */ +void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); + +/* + * If vgroup create returned, will then create table + */ +void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 974203fb421ffc02f8eff3384fc8ff96e7222de0..a6cc811c35009a94e69bb20ac73a6255ac18be65 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -31,7 +31,7 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle); +int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta); int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore); int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); @@ -45,7 +45,6 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty); SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable); SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable); -void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 7c76a1b189c8d29a2e7121b995ed89a2b9f3b86b..8b3bcff73809aef890beb94d4c5ffe7b8373c1c5 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -42,7 +42,7 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable); void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); -SVPeersMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode); +SCreateVnodeMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode); SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup); SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index e57b83724f5b7b44e77296d8cd8ddfafa8b9797f..34f3fa2e8dd742a7221706d2d9c423b0104c31a0 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -311,7 +311,7 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou pSchema++; } - memcpy(pCreateTable->data + totalCols * sizeof(SSchema), pTagData, tagDataLen); + memcpy(pCreateTable + sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen); return pCreateTable; } @@ -380,6 +380,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } + strcpy(pRemove->tableId, pTable->tableId); pRemove->sid = htonl(pTable->sid); pRemove->uid = htobe64(pTable->uid); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 3ce254d6c6900536584c30a97c4b6494853ab82e..857e05fddc7297b4349b65b067ccb510fb02f28d 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -295,7 +295,7 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) { } } } -// mgmtSendFreeVnodesMsg(pVgroup); +// mgmtSendRemoveVgroupMsg(pVgroup); pVgroup = pVgroup->next; } @@ -355,7 +355,7 @@ int32_t mgmtDropDb(SDbObj *pDb) { if (!finished) { SVgObj *pVgroup = pDb->pHead; while (pVgroup != NULL) { - mgmtSendFreeVnodesMsg(pVgroup); + mgmtSendRemoveVgroupMsg(pVgroup, NULL); pVgroup = pVgroup->next; } return TSDB_CODE_ACTION_IN_PROGRESS; diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index f9db010798e5f180770bdf0529d8c3ccd7486b4c..d3e139539f5caf124ff30f258ce81d4f72151be4 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -27,6 +27,7 @@ #include "mgmtDnode.h" #include "mgmtDnodeInt.h" #include "mgmtProfile.h" +#include "mgmtShell.h" #include "mgmtTable.h" #include "mgmtVgroup.h" @@ -137,7 +138,7 @@ static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t con if (thandle == NULL) return; SProcessInfo *info = thandle; - assert(info->type == TSDB_PROCESS_CREATE_TABLE); + assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META); STableInfo *pTable = info->ahandle; if (code != TSDB_CODE_SUCCESS) { @@ -148,7 +149,17 @@ static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t con mgmtSetTableDirty(pTable, false); } - rpcSendResponse(info->thandle, code, NULL, 0); + if (code != TSDB_CODE_SUCCESS) { + rpcSendResponse(info->thandle, code, NULL, 0); + } else { + if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) { + mTrace("table:%s, start to process get meta", pTable->tableId); + mgmtProcessGetTableMeta(pTable, thandle); + } else { + rpcSendResponse(info->thandle, code, NULL, 0); + } + } + free(info); } @@ -169,7 +180,7 @@ void mgmtSendRemoveTableMsg(SDRemoveTableMsg *pRemove, SRpcIpSet *ipSet, void *a } static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { - mTrace("free vnode rsp received, handle:%p code:%d", thandle, code); + mTrace("free vnode rsp received, thandle:%p code:%d", thandle, code); } static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { @@ -177,13 +188,18 @@ static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t con if (thandle == NULL) return; SProcessInfo *info = thandle; - assert(info->type == TSDB_PROCESS_CREATE_VGROUP); + assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META); info->received++; SVgObj *pVgroup = info->ahandle; + bool isGetMeta = false; + if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) { + isGetMeta = true; + } + mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes); if (info->received == pVgroup->numOfVnodes) { - mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle); + mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta); free(info); } } @@ -198,9 +214,9 @@ void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); - SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); + SCreateVnodeMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); if (pVpeer != NULL) { - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), ahandle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_VNODE, pVpeer, sizeof(SCreateVnodeMsg), ahandle); } } @@ -236,23 +252,25 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) { - mTrace("table:%s, sid:%d send alter stream msg, handle:%p", pTable->tableId, pTable->sid); + mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle); } -void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *handle) { - mTrace("vnode:%d send free vnode msg, handle:%p", vnode, handle); +void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { + mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle); SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg)); if (pFreeVnode != NULL) { pFreeVnode->vnode = htonl(vnode); - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), handle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle); } } -void mgmtSendFreeVnodesMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *handle) { +void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle) { + mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle); + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip); - mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, handle); + mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle); } } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index d9c9d45e6fd37ad66e04996e0dd0fc6b9b858815..bc1d3f8c32ad819f9a9dd99dddd5b5fcf821ceb8 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -416,6 +416,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } + strcpy(pRemove->tableId, pTable->tableId); pRemove->sid = htonl(pTable->sid); pRemove->uid = htobe64(pTable->uid); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 40e2bc2cc3bb69dbd2a52fe27ae77d4ce7d3e790..598ec600db73452549a880076eb286dce00d1390 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -28,6 +28,7 @@ #include "mgmtConn.h" #include "mgmtDb.h" #include "mgmtDnode.h" +#include "mgmtDnodeInt.h" #include "mgmtGrant.h" #include "mgmtMnode.h" #include "mgmtNormalTable.h" @@ -38,23 +39,20 @@ #include "mgmtUser.h" #include "mgmtVgroup.h" - -#define MAX_LEN_OF_METER_META (sizeof(SMultiTableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN) - typedef int32_t (*GetMateFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*RetrieveMetaFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static GetMateFp* mgmtGetMetaFp; -static RetrieveMetaFp* mgmtRetrieveFp; -static void mgmtInitShowMsgFp(); - -void *tsShellConnServer = NULL; +static GetMateFp mgmtGetMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; +static RetrieveMetaFp mgmtRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; +static void mgmtInitShowMsgFp(); static void mgmtInitProcessShellMsg(); static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code); static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, void *ahandle); static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); +void *tsShellConnServer = NULL; + void mgmtProcessTranRequest(SSchedMsg *sched) { int8_t msgType = *(int8_t *) (sched->msg); int32_t contLen = *(int32_t *) (sched->msg + sizeof(int8_t)); @@ -118,68 +116,44 @@ void mgmtCleanUpShell() { } void mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { - SRpcConnInfo connInfo; - rpcGetConnInfo(ahandle, &connInfo); - - bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); - SUserObj *pUser = mgmtGetUser(connInfo.user); - if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); - return; - } - STableInfoMsg *pInfo = pCont; pInfo->createFlag = htons(pInfo->createFlag); - SDbObj* pDb = mgmtGetDbByTableId(pInfo->tableId); - if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_DB, NULL, 0); + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("table:%s, failed to get table meta, invalid user", pInfo->tableId); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); return; } STableInfo *pTable = mgmtGetTable(pInfo->tableId); - - // on demand create table from super table if meter does not exists - if (pTable == NULL && pInfo->createFlag == 1) { - // write operation needs to redirect to master mnode - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { - return; - } - - SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData)); - if (pCreateMsg == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + if (pTable == NULL) { + if (pInfo->createFlag != 1) { + mError("table:%s, failed to get table meta, table not exist", pInfo->tableId); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0); return; - } - - memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); - strcpy(pCreateMsg->tableId, pInfo->tableId); - - mgmtCreateTable(pCreateMsg, contLen, NULL); - - char stableName[TSDB_TABLE_ID_LEN] = {0}; - strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN); - mTrace("table:%s is auto created by %s from %s", pCreateMsg->tableId, pUser->user, stableName); - - tfree(pCreateMsg); - - pTable = mgmtGetTable(pInfo->tableId); - } + } else { + // on demand create table from super table if table does not exists + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to create table while get meta info, need redirect message", pInfo->tableId); + return; + } - if (pTable == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0); - return; - } + SCreateTableMsg *pCreateMsg = rpcMallocCont(sizeof(SCreateTableMsg) + sizeof(STagData)); + if (pCreateMsg == NULL) { + mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId); + rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + return; + } - STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS); - int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); + memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); + strcpy(pCreateMsg->tableId, pInfo->tableId); - if (code != TSDB_CODE_SUCCESS) { - rpcFreeCont(pMeta); - rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0); + mError("table:%s, start to create table while get meta info", pInfo->tableId); + mgmtCreateTable(pCreateMsg, contLen, ahandle, true); + } } else { - pMeta->contLen = htons(pMeta->contLen); - rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMeta, pMeta->contLen); + mgmtProcessGetTableMeta(pTable, ahandle); } } @@ -609,7 +583,6 @@ void mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) { } static void mgmtInitShowMsgFp() { - mgmtGetMetaFp = (GetMateFp *)malloc(TSDB_MGMT_TABLE_MAX * sizeof(GetMateFp)); mgmtGetMetaFp[TSDB_MGMT_TABLE_ACCT] = mgmtGetAcctMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_USER] = mgmtGetUserMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_DB] = mgmtGetDbMeta; @@ -627,7 +600,6 @@ static void mgmtInitShowMsgFp() { mgmtGetMetaFp[TSDB_MGMT_TABLE_GRANTS] = mgmtGetGrantsMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_VNODES] = mgmtGetVnodeMeta; - mgmtRetrieveFp = (RetrieveMetaFp *)malloc(TSDB_MGMT_TABLE_MAX * sizeof(RetrieveMetaFp)); mgmtRetrieveFp[TSDB_MGMT_TABLE_ACCT] = mgmtRetrieveAccts; mgmtRetrieveFp[TSDB_MGMT_TABLE_USER] = mgmtRetrieveUsers; mgmtRetrieveFp[TSDB_MGMT_TABLE_DB] = mgmtRetrieveDbs; @@ -782,7 +754,7 @@ void mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { return; } - int32_t code = mgmtCreateTable(pCreate, contLen, ahandle); + int32_t code = mgmtCreateTable(pCreate, contLen, ahandle, false); if (code != TSDB_CODE_ACTION_IN_PROGRESS) { rpcSendResponse(ahandle, code, NULL, 0); } @@ -809,9 +781,9 @@ void mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) { return; } - SDbObj *pDb = mgmtGetDb(pDrop->db); + SDbObj *pDb = mgmtGetDbByTableId(pDrop->tableId); if (pDb == NULL) { - mError("table:%s, failed to drop table, db:%s not selected", pDrop->tableId, pDrop->db); + mError("table:%s, failed to drop table, db not selected", pDrop->tableId); rpcSendResponse(ahandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0); return; } @@ -1107,6 +1079,111 @@ void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessSuperTableMetaMsg; } +void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { + SDbObj *pDb = mgmtGetDb(pCreate->db); + if (pDb == NULL) { + mError("table:%s, failed to create vgroup, db not found", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_INVALID_DB, NULL, 0); + return; + } + + SVgObj *pVgroup = mgmtCreateVgroup(pDb); + if (pVgroup == NULL) { + mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_NO_ENOUGH_DNODES, NULL, 0); + return; + } + + void *cont = rpcMallocCont(contLen); + if (cont == NULL) { + mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId); + rpcSendResponse(thandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + return; + } + + memcpy(cont, pCreate, contLen); + + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_VGROUP; + info->thandle = thandle; + info->ahandle = pVgroup; + info->cont = cont; + info->contLen = contLen; + + if (isGetMeta) { + info->type = TSDB_PROCESS_CREATE_VGROUP_GET_META; + } + + mgmtSendCreateVgroupMsg(pVgroup, info); +} + +void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { + assert(pVgroup != NULL); + + int32_t sid = taosAllocateId(pVgroup->idPool); + if (sid < 0) { + mTrace("table:%s, no enough sid in vgroup:%d, start to create a new vgroup", pCreate->tableId, pVgroup->vgId); + mgmtProcessCreateVgroup(pCreate, contLen, thandle, isGetMeta); + return; + } + + int32_t code; + STableInfo *pTable; + SDCreateTableMsg *pDCreate = NULL; + + if (pCreate->numOfColumns == 0) { + mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); + code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); + } else { + mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); + code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); + } + + if (code != TSDB_CODE_SUCCESS) { + mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid); + rpcSendResponse(thandle, code, NULL, 0); + return; + } + + assert(pDCreate != NULL); + assert(pTable != NULL); + + SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); + info->type = TSDB_PROCESS_CREATE_TABLE; + info->thandle = thandle; + info->ahandle = pTable; + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + if (isGetMeta) { + info->type = TSDB_PROCESS_CREATE_TABLE_GET_META; + } + + mgmtSendCreateTableMsg(pDCreate, &ipSet, info); +} + +void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { + SDbObj* pDb = mgmtGetDbByTableId(pTable->tableId); + if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) { + mError("table:%s, failed to get table meta, db not selected", pTable->tableId); + rpcSendResponse(thandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0); + return; + } + + SRpcConnInfo connInfo; + rpcGetConnInfo(thandle, &connInfo); + bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); + + STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS); + int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); + + if (code != TSDB_CODE_SUCCESS) { + rpcFreeCont(pMeta); + rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0); + } else { + pMeta->contLen = htons(pMeta->contLen); + rpcSendResponse(thandle, TSDB_CODE_SUCCESS, pMeta, pMeta->contLen); + } +} + static int32_t mgmtCheckRedirectMsgImp(void *pConn) { return 0; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index da0bebefc877c85a46abe3ab543aad58bf3f86e5..cc95d9f8cc2488487ad1d2676fe2a010f8905364 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -34,6 +34,7 @@ #include "mgmtGrant.h" #include "mgmtNormalTable.h" #include "mgmtProfile.h" +#include "mgmtShell.h" #include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtUser.h" @@ -111,81 +112,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo return TSDB_CODE_SUCCESS; } -void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) { - SDbObj *pDb = mgmtGetDb(pCreate->db); - if (pDb == NULL) { - mError("table:%s, failed to create vgroup, db not found", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_INVALID_DB, NULL, 0); - return; - } - - SVgObj *pVgroup = mgmtCreateVgroup(pDb); - if (pVgroup == NULL) { - mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_NO_ENOUGH_DNODES, NULL, 0); - return; - } - - void *cont = rpcMallocCont(contLen); - if (cont == NULL) { - mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - return; - } - - memcpy(cont, pCreate, contLen); - - SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); - info->type = TSDB_PROCESS_CREATE_VGROUP; - info->thandle = thandle; - info->ahandle = pVgroup; - info->cont = cont; - info->contLen = contLen; - - mgmtSendCreateVgroupMsg(pVgroup, info); -} - -void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle) { - assert(pVgroup != NULL); - - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid < 0) { - mTrace("table:%s, no enough sid in vgroup:%d, start to create a new vgroup", pCreate->tableId, pVgroup->vgId); - mgmtProcessCreateVgroup(pCreate, contLen, thandle); - return; - } - - int32_t code; - STableInfo *pTable; - SDCreateTableMsg *pDCreate = NULL; - - if (pCreate->numOfColumns == 0) { - mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); - code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); - } else { - mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); - code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); - } - - if (code != TSDB_CODE_SUCCESS) { - mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid); - rpcSendResponse(thandle, code, NULL, 0); - return; - } - - assert(pDCreate != NULL); - assert(pTable != NULL); - - SProcessInfo *info = calloc(1, sizeof(SProcessInfo)); - info->type = TSDB_PROCESS_CREATE_TABLE; - info->thandle = thandle; - info->ahandle = pTable; - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - - mgmtSendCreateTableMsg(pDCreate, &ipSet, info); -} - -int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) { +int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb == NULL) { mError("table:%s, failed to create table, db not selected", pCreate->tableId); @@ -232,10 +159,10 @@ int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb); if (pVgroup == NULL) { mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId); - mgmtProcessCreateVgroup(pCreate, contLen, thandle); + mgmtProcessCreateVgroup(pCreate, contLen, thandle, isGetMeta); } else { mTrace("table:%s, try to create table in vgroup:%d", pCreate->tableId, pVgroup->vgId); - mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle); + mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle, isGetMeta); } return TSDB_CODE_ACTION_IN_PROGRESS; @@ -494,4 +421,5 @@ SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) { void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) { pTable->dirty = isDirty; -} \ No newline at end of file +} + diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 657706be547b8569cd9e5ee7936cfc2fc9e72ed0..10b4244bf861fe06235048a92f949791ef440b11 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -192,10 +192,12 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { } mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); -// mgmtSendFreeVnodesMsg(pVgroup); + + mgmtSendRemoveVgroupMsg(pVgroup, NULL); + sdbDeleteRow(tsVgroupSdb, pVgroup); - return 0; + return TSDB_CODE_SUCCESS; } void mgmtSetVgroupIdPool() { @@ -480,11 +482,11 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) { taosFreeId(pVgroup->idPool, pTable->sid); } -SVPeersMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode) { +SCreateVnodeMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode) { SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) return NULL; - SVPeersMsg *pVPeers = rpcMallocCont(sizeof(SVPeersMsg)); + SCreateVnodeMsg *pVPeers = rpcMallocCont(sizeof(SCreateVnodeMsg)); if (pVPeers == NULL) return NULL; pVPeers->vnode = htonl(vnode); diff --git a/src/util/src/tstring.c b/src/util/src/tstring.c index 536e8489ac9eac61be01689b58d27a482df65100..c9c06ea6e7bda7d6ddd7406aba0cf6d7e0ee8749 100644 --- a/src/util/src/tstring.c +++ b/src/util/src/tstring.c @@ -28,8 +28,8 @@ char *taosMsg[] = { "remove-table", "remove-table-rsp", - "vpeers", - "vpeers-rsp", + "create-vnode", + "create-vnode-rsp", "free-vnode", "free-vnode-rsp", "cfg-dnode",