diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 792a8ec1cce02ed0b00ef5d44b32624ccfb9ca0e..80a5e2b36c491e938fce127221178b6ceb20d01c 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -98,67 +98,45 @@ typedef struct { } SVnodeGid; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; - int8_t type; - int8_t dirty; - uint64_t uid; - int32_t sid; - int32_t vgId; - int64_t createdTime; + char tableId[TSDB_TABLE_ID_LEN]; + int8_t type; + int8_t dirty; } STableInfo; struct _vg_obj; typedef struct SSuperTableObj { - char tableId[TSDB_TABLE_ID_LEN + 1]; - int8_t type; - int8_t dirty; - uint64_t uid; - int32_t sid; - int32_t vgId; - int64_t createdTime; - int32_t sversion; - int32_t numOfColumns; - int32_t numOfTags; - int8_t reserved[15]; - int8_t updateEnd[1]; - int32_t numOfTables; - int16_t nextColId; - SSchema *schema; + STableInfo info; + uint64_t uid; + int64_t createdTime; + int32_t sversion; + int32_t numOfColumns; + int32_t numOfTags; + int8_t reserved[15]; + int8_t updateEnd[1]; + int32_t numOfTables; + int16_t nextColId; + SSchema * schema; } SSuperTableObj; typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; - int8_t type; - int8_t dirty; - uint64_t uid; - int32_t sid; - int32_t vgId; - int64_t createdTime; - char superTableId[TSDB_TABLE_ID_LEN + 1]; - int8_t reserved[1]; - int8_t updateEnd[1]; + STableInfo info; + uint64_t uid; + int64_t createdTime; + int32_t sversion; //used by normal table + int32_t numOfColumns; //used by normal table + int32_t sid; + int32_t vgId; + char superTableId[TSDB_TABLE_ID_LEN + 1]; + int32_t sqlLen; + int8_t reserved[1]; + int8_t updateEnd[1]; + int16_t nextColId; //used by normal table + char* sql; //used by normal table + SSchema* schema; //used by normal table SSuperTableObj *superTable; } SChildTableObj; -typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; - int8_t type; - int8_t dirty; - uint64_t uid; - int32_t sid; - int32_t vgId; - int64_t createdTime; - int32_t sversion; - int32_t numOfColumns; - int32_t sqlLen; - int8_t reserved[7]; - int8_t updateEnd[1]; - char* sql; //null-terminated string - int16_t nextColId; - SSchema* schema; -} SNormalTableObj; - struct _db_obj; typedef struct _vg_obj { @@ -260,7 +238,7 @@ typedef struct { typedef struct { uint8_t msgType; - int8_t expected; + int8_t usePublicIp; int8_t received; int8_t successed; int32_t contLen; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2fcc91a7bb84d98ff9ae4e898dc77472b332b008..f1e07c98a8728c8e8c1be96e89233f74efdc1d5a 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -94,8 +94,8 @@ extern "C" { #define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62 #define TSDB_MSG_TYPE_CM_TABLE_META 63 #define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64 -#define TSDB_MSG_TYPE_CM_STABLE_META 65 -#define TSDB_MSG_TYPE_CM_STABLE_META_RSP 66 +#define TSDB_MSG_TYPE_CM_STABLE_VGROUP 65 +#define TSDB_MSG_TYPE_CM_STABLE_VGROUP_RSP 66 #define TSDB_MSG_TYPE_CM_TABLES_META 67 #define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68 #define TSDB_MSG_TYPE_CM_ALTER_STREAM 69 diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 9252a7d485ae02055de81835f2c8c0a0d0da5cbe..3f6b1d8b858a8e9bcbee8f59a65ab07b87c99965 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -23,23 +23,18 @@ extern "C" { #include #include #include "taosdef.h" - #include "mnode.h" int32_t mgmtInitChildTables(); void mgmtCleanUpChildTables(); void * mgmtGetChildTable(char *tableId); -void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); -void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable); - -int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable); -int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); - -int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp); - -void mgmtDropAllChildTables(SDbObj *pDropDb); -void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable); +void mgmtCreateChildTable(SQueuedMsg *pMsg); +void mgmtDropChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable); +void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable); +void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable); +void mgmtDropAllChildTables(SDbObj *pDropDb); +void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h deleted file mode 100644 index dd09a62bb4bc5f2051d374725e3759ddf5c2dae5..0000000000000000000000000000000000000000 --- a/src/mnode/inc/mgmtNormalTable.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TBASE_MNODE_NORMAL_TABLE_H -#define TBASE_MNODE_NORMAL_TABLE_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include "mnode.h" - -int32_t mgmtInitNormalTables(); -void mgmtCleanUpNormalTables(); -void * mgmtGetNormalTable(char *tableId); - -void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); -void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); - -int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable); -int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); -int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); - -int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp); - -void mgmtDropAllNormalTables(SDbObj *pDropDb); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index 6d0c565c3022f09460a4ec3eb84bc66c797030b7..73809148dec89f0e5c97a59164baa6490d0c4689 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -22,31 +22,19 @@ extern "C" { #include #include - #include "taosdef.h" #include "mnode.h" int32_t mgmtInitSuperTables(); void mgmtCleanUpSuperTables(); - void * mgmtGetSuperTable(char *tableId); -int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate); -int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pTable); -int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); -int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); -int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName); -int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols); -int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName); - -int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp); -void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable); - -int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName); -int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); - +void mgmtCreateSuperTable(SQueuedMsg *pMsg); +void mgmtDropSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable); +void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable); +void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable); void mgmtDropAllSuperTables(SDbObj *pDropDb); -int32_t mgmtExtractTableName(const char* tableId, char* name); +int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index b145210e9bbf1800ad6e80b46f635407102fb86c..66557b33621bc1b3d950491009e02cf2799e042a 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -27,13 +27,8 @@ extern "C" { int32_t mgmtInitTables(); void mgmtCleanUpTables(); -STableInfo* mgmtGetTable(char *tableId); - -STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); -int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMetaMsg *pMeta, bool usePublicIp); - -void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); -void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); +STableInfo* mgmtGetTable(char* tableId); +void mgmtExtractTableName(char* tableId, char* tableName); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtUser.h b/src/mnode/inc/mgmtUser.h index d1f927f6de629d9d0fb374deff58ff73cf8baa62..5001bc277014b12fe05216eb0b76ec463c71a83c 100644 --- a/src/mnode/inc/mgmtUser.h +++ b/src/mnode/inc/mgmtUser.h @@ -24,7 +24,7 @@ extern "C" { int32_t mgmtInitUsers(); void mgmtCleanUpUsers(); SUserObj *mgmtGetUser(char *name); -SUserObj *mgmtGetUserFromConn(void *pConn); +SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp) #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 3379a93f28f0dc8933fcff3cf51bc587c01cc444..aba95fb478fccfe3e78a582c1970bbdb6f779dc5 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -36,8 +36,8 @@ void mgmtUpdateVgroup(SVgObj *pVgroup); void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle); SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); -void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable); -void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); +void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable); +void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable); void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 8fbf210118622f50df6ddf843cb0737db8bdac5f..9b4be2fdc2cb07525b7d33f9d7193d685488102f 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -35,10 +35,19 @@ #include "mgmtTable.h" #include "mgmtVgroup.h" -void *tsChildTableSdb; -int32_t tsChildTableUpdateSize; +static void *tsChildTableSdb; +static int32_t tsChildTableUpdateSize; +static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); +static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); +static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static void mgmtDestroyChildTable(SChildTableObj *pTable) { + tfree(pTable->schema); + tfree(pTable->sql); tfree(pTable); } @@ -52,28 +61,31 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("ctable:%s, not in vgroup:%d", pTable->tableId, pTable->vgId); + mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId); return TSDB_CODE_INVALID_VGROUP_ID; } SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { - mError("ctable:%s, vgroup:%d not in db:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); + mError("ctable:%s, vgroup:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { - mError("ctable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct); + mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); return TSDB_CODE_INVALID_ACCT; } - pTable->superTable = mgmtGetSuperTable(pTable->superTableId); - mgmtAddTableIntoSuperTable(pTable->superTable); - - mgmtAddTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); + if (pTable->info.type == TSDB_CHILD_TABLE) { + pTable->superTable = mgmtGetSuperTable(pTable->superTableId); + pStable->numOfTables++; + mgmtAddTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); + } else { + mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1); + } mgmtAddTableIntoDb(pDb); - mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable); + mgmtAddTableIntoVgroup(pVgroup, pTable); return TSDB_CODE_SUCCESS; } @@ -91,21 +103,25 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) { SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { - mError("ctable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); + mError("ctable:%s, vgroup:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { - mError("ctable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct); + mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); return TSDB_CODE_INVALID_ACCT; } - mgmtRestoreTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); + if (pTable->info.type == TSDB_CHILD_TABLE) { + mgmtRestoreTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); + pStable->numOfTables--; + } else { + mgmtRestoreTimeSeries(pAcct, pTable->numOfColumns - 1); + } mgmtRemoveTableFromDb(pDb); mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable); - mgmtRemoveTableFromSuperTable(pTable->superTable); - + return TSDB_CODE_SUCCESS; } @@ -117,22 +133,50 @@ static int32_t mgmtChildTableActionEncode(SSdbOperDesc *pOper) { SChildTableObj *pTable = pOper->pObj; assert(pTable != NULL && pOper->rowData != NULL); - memcpy(pOper->rowData, pTable, tsChildTableUpdateSize); - pOper->rowSize = tsChildTableUpdateSize; + if (pTable->info.type == TSDB_CHILD_TABLE) { + memcpy(pOper->rowData, pTable, tsChildTableUpdateSize); + pOper->rowSize = tsChildTableUpdateSize; + } else { + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + if (pOper->maxRowSize < tsChildTableUpdateSize + schemaSize) { + return TSDB_CODE_INVALID_MSG_LEN; + } + memcpy(pOper->rowData, pTable, tsChildTableUpdateSize); + memcpy(pOper->rowData + tsChildTableUpdateSize, pTable->schema, schemaSize); + memcpy(pOper->rowData + tsChildTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen); + pOper->rowSize = tsChildTableUpdateSize + schemaSize + pTable->sqlLen; + } return TSDB_CODE_SUCCESS; } static int32_t mgmtChildTableActionDecode(SSdbOperDesc *pOper) { assert(pOper->rowData != NULL); - - pOper->pObj = calloc(1, sizeof(SChildTableObj)); - if (pOper->pObj == NULL) { + SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj)); + if (pTable == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } - memcpy(pOper->pObj, pOper->rowData, tsChildTableUpdateSize); + memcpy(pTable, pOper->rowData, tsChildTableUpdateSize); + if (pTable->info.type != TSDB_CHILD_TABLE) { + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + pTable->schema = (SSchema *)malloc(schemaSize); + if (pTable->schema == NULL) { + mgmtDestroyNormalTable(pTable); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + memcpy(pTable->schema, pOper->rowData + tsChildTableUpdateSize, schemaSize); + + pTable->sql = (char *)malloc(pTable->sqlLen); + if (pTable->sql == NULL) { + mgmtDestroyNormalTable(pTable); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + memcpy(pTable->sql, pOper->rowData + tsChildTableUpdateSize + schemaSize, pTable->sqlLen); + } + + pOper->pObj = pTable; return TSDB_CODE_SUCCESS; } @@ -147,7 +191,7 @@ int32_t mgmtInitChildTables() { SSdbTableDesc tableDesc = { .tableName = "ctables", .hashSessions = tsMaxTables, - .maxRowSize = tsChildTableUpdateSize, + .maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,, .keyType = SDB_KEY_TYPE_STRING, .insertFp = mgmtChildTableActionInsert, .deleteFp = mgmtChildTableActionDelete, @@ -165,14 +209,15 @@ int32_t mgmtInitChildTables() { pNode = NULL; while (1) { + pLastNode = pNode; pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); if (pTable == NULL) { break; } - SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); + SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId); if (pDb == NULL) { - mError("ctable:%s, failed to get db, discard it", pTable->tableId); + mError("ctable:%s, failed to get db, discard it", pTable->info.tableId); SSdbOperDesc desc = {0}; desc.type = SDB_OPER_TYPE_LOCAL; desc.pObj = pTable; @@ -184,7 +229,7 @@ int32_t mgmtInitChildTables() { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid); + mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid); pTable->vgId = 0; SSdbOperDesc desc = {0}; desc.type = SDB_OPER_TYPE_LOCAL; @@ -197,7 +242,7 @@ int32_t mgmtInitChildTables() { if (strcmp(pVgroup->dbName, pDb->name) != 0) { mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", - pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); + pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); pTable->vgId = 0; SSdbOperDesc desc = {0}; desc.type = SDB_OPER_TYPE_LOCAL; @@ -209,7 +254,7 @@ int32_t mgmtInitChildTables() { } if (pVgroup->tableList == NULL) { - mError("ctable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId); + mError("ctable:%s, vgroup:%d tableList is null", pTable->info.tableId, pTable->vgId); pTable->vgId = 0; SSdbOperDesc desc = {0}; desc.type = SDB_OPER_TYPE_LOCAL; @@ -220,20 +265,30 @@ int32_t mgmtInitChildTables() { continue; } - SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTable->superTableId); - if (pSuperTable == NULL) { - mError("ctable:%s, stable:%s not exist", pTable->tableId, pTable->superTableId); - pTable->vgId = 0; - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_LOCAL; - desc.pObj = pTable; - desc.table = tsChildTableSdb; - sdbDeleteRow(&desc); - pNode = pLastNode; - continue; + if (pTable->info.type == TSDB_CHILD_TABLE) { + SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTable->superTableId); + if (pSuperTable == NULL) { + mError("ctable:%s, stable:%s not exist", pTable->info.tableId, pTable->superTableId); + pTable->vgId = 0; + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_LOCAL; + desc.pObj = pTable; + desc.table = tsChildTableSdb; + sdbDeleteRow(&desc); + pNode = pLastNode; + continue; + } } } + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLES_META, mgmtProcessMultiTableMetaMsg); + mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); + mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); + mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); + mTrace("child table is initialized"); return 0; } @@ -242,79 +297,141 @@ void mgmtCleanUpChildTables() { sdbCloseTable(tsChildTableSdb); } -void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) { +static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) { char * pTagData = NULL; int32_t tagDataLen = 0; - if (pMsg != NULL) { + int32_t totalCols = 0; + int32_t contLen = 0; + if (pTable->info.type == TSDB_CHILD_TABLE && pMsg != NULL) { pTagData = pMsg->schema + TSDB_TABLE_ID_LEN + 1; tagDataLen = htonl(pMsg->contLen) - sizeof(SCMCreateTableMsg) - TSDB_TABLE_ID_LEN - 1; + totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; + contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen + pTable->sqlLen; + } else { + totalCols = pTable->numOfColumns; + contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen; } - int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; - int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen; - SMDCreateTableMsg *pCreate = rpcMallocCont(contLen); if (pCreate == NULL) { terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } - memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1); + memcpy(pCreate->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN + 1); memcpy(pCreate->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN + 1); pCreate->contLen = htonl(contLen); pCreate->vgId = htonl(pTable->vgId); - pCreate->tableType = pTable->type; - pCreate->numOfColumns = htons(pTable->superTable->numOfColumns); - pCreate->numOfTags = htons(pTable->superTable->numOfTags); + pCreate->tableType = pTable->info.type; + pCreate->createdTime = htobe64(pTable->createdTime); pCreate->sid = htonl(pTable->sid); - pCreate->sversion = htonl(pTable->superTable->sversion); - pCreate->tagDataLen = htonl(tagDataLen); - pCreate->sqlDataLen = 0; + pCreate->sqlDataLen = htonl(pTable->sqlLen); pCreate->uid = htobe64(pTable->uid); - pCreate->superTableUid = htobe64(pTable->superTable->uid); - pCreate->createdTime = htobe64(pTable->createdTime); - + + if (pTable->info.type == TSDB_CHILD_TABLE) { + pCreate->numOfColumns = htons(pTable->superTable->numOfColumns); + pCreate->numOfTags = htons(pTable->superTable->numOfTags); + pCreate->sversion = htonl(pTable->superTable->sversion); + pCreate->tagDataLen = htonl(tagDataLen); + pCreate->superTableUid = htobe64(pTable->superTable->uid); + } else { + pCreate->numOfColumns = htons(pTable->numOfColumns); + pCreate->numOfTags = 0; + pCreate->sversion = htonl(pTable->sversion); + pCreate->tagDataLen = 0; + pCreate->superTableUid = 0; + } + SSchema *pSchema = (SSchema *) pCreate->data; - memcpy(pSchema, pTable->superTable->schema, totalCols * sizeof(SSchema)); + if (pTable->info.type == TSDB_CHILD_TABLE) { + memcpy(pSchema, pTable->superTable->schema, totalCols * sizeof(SSchema)); + } else { + memcpy(pSchema, pTable->schema, totalCols * sizeof(SSchema)); + } for (int32_t col = 0; col < totalCols; ++col) { pSchema->bytes = htons(pSchema->bytes); pSchema->colId = htons(pSchema->colId); pSchema++; } - if (pMsg != NULL) { + if (pTable->info.type == TSDB_CHILD_TABLE && pMsg != NULL) { memcpy(pCreate->data + totalCols * sizeof(SSchema), pTagData, tagDataLen); + memcpy(pCreate->data + totalCols * sizeof(SSchema) + tagDataLen, pTable->sql, pTable->sqlLen); } return pCreate; } -void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { - char *pTagData = (char *) pCreate->schema; // it is a tag key - SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); - if (pSuperTable == NULL) { - mError("ctable:%s, corresponding super table does not exist", pCreate->tableId); - terrno = TSDB_CODE_INVALID_TABLE; - return NULL; - } - - SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1); +static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { + SChildTableObj *pTable = (SChildTableObj *) calloc(1, sizeof(SChildTableObj)); if (pTable == NULL) { mError("ctable:%s, failed to alloc memory", pCreate->tableId); terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; return NULL; } - strcpy(pTable->tableId, pCreate->tableId); - strcpy(pTable->superTableId, pSuperTable->tableId); - pTable->type = TSDB_CHILD_TABLE; + if (pCreate->numOfColumns == 0) { + pTable->info.type = TSDB_CHILD_TABLE; + } else { + pTable->info.type = TSDB_NORMAL_TABLE; + } + + strcpy(pTable->info.tableId, pCreate->tableId); pTable->createdTime = taosGetTimestampMs(); - pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + - (sdbGetVersion() & ((1ul << 16) - 1ul)); pTable->sid = tid; pTable->vgId = pVgroup->vgId; - pTable->superTable = pSuperTable; + + if (Table->info.type == TSDB_CHILD_TABLE) { + char *pTagData = (char *) pCreate->schema; // it is a tag key + SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); + if (pSuperTable == NULL) { + mError("ctable:%s, corresponding super table does not exist", pCreate->tableId); + free(pTable); + terrno = TSDB_CODE_INVALID_TABLE; + return NULL; + } + + strcpy(pTable->superTableId, pSuperTable->tableId); + pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + + (sdbGetVersion() & ((1ul << 16) - 1ul)); + pTable->superTable = pSuperTable; + } else { + pTable->uid = (((uint64_t) pTable->createdTime) << 16) + (sdbGetVersion() & ((1ul << 16) - 1ul)); + pTable->sversion = 0; + pTable->numOfColumns = htons(pCreate->numOfColumns); + pTable->sqlLen = htons(pCreate->sqlLen); + + int32_t numOfCols = pTable->numOfColumns; + int32_t schemaSize = numOfCols * sizeof(SSchema); + pTable->schema = (SSchema *) calloc(1, schemaSize); + if (pTable->schema == NULL) { + free(pTable); + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; + } + memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); + pTable->nextColId = 0; + for (int32_t col = 0; col < numOfCols; col++) { + SSchema *tschema = pTable->schema; + tschema[col].colId = pTable->nextColId++; + tschema[col].bytes = htons(tschema[col].bytes); + } + + if (pTable->sqlLen != 0) { + pTable->info.type = TSDB_STREAM_TABLE; + pTable->sql = calloc(1, pTable->sqlLen); + if (pTable->sql == NULL) { + free(pTable); + terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; + return NULL; + } + memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen); + pTable->sql[pTable->sqlLen - 1] = 0; + mTrace("table:%s, stream sql len:%d sql:%s", pTable->info.tableId, pTable->sqlLen, pTable->sql); + } + } + SSdbOperDesc desc = {0}; desc.type = SDB_OPER_TYPE_GLOBAL; desc.pObj = pTable; @@ -327,24 +444,81 @@ void* mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t return NULL; } - mTrace("ctable:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid); + mTrace("ctable:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->info.tableId, pTable->uid); return pTable; } -int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) { +void mgmtCreateChildTable(SQueuedMsg *pMsg) { + SCMCreateTableMsg *pCreate = pMsg->pCont; + + int32_t code = mgmtCheckTimeSeries(htons(pCreate->numOfColumns)); + if (code != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to create, timeseries exceed the limit", pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, code); + return; + } + + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; + + SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); + if (pVgroup == NULL) { + mTrace("table:%s, start to create a new vgroup", pCreate->tableId); + mgmtCreateVgroup(newMsg); + return; + } + + int32_t sid = taosAllocateId(pVgroup->idPool); + if (sid < 0) { + mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); + mgmtCreateVgroup(newMsg); + return; + } + + SChildTableObj *pTable = mgmtDoCreateChildTable(pCreate, pVgroup, sid); + if (pTable == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + mgmtFreeQueuedMsg(newMsg); + return; + } + + SMDCreateTableMsg *pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable); + if (pMDCreate == NULL) { + mgmtSendSimpleResp(pMsg->thandle, terrno); + mgmtFreeQueuedMsg(newMsg); + return; + } + + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + SRpcMsg rpcMsg = { + .handle = newMsg, + .pCont = pMDCreate, + .contLen = htonl(pMDCreate->contLen), + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE + }; + + newMsg->ahandle = pTable; + mgmtSendMsgToDnode(&ipSet, &rpcMsg); +} + +static int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("ctable:%s, failed to drop child table, vgroup not exist", pTable->tableId); - return TSDB_CODE_OTHERS; + mError("ctable:%s, failed to drop child table, vgroup not exist", pTable->info.tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS); + return; } SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); if (pDrop == NULL) { - mError("ctable:%s, failed to drop child table, no enough memory", pTable->tableId); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + mError("ctable:%s, failed to drop child table, no enough memory", pTable->info.tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); + return; } - strcpy(pDrop->tableId, pTable->tableId); + strcpy(pDrop->tableId, pTable->info.tableId); pDrop->vgId = htonl(pTable->vgId); pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); pDrop->sid = htonl(pTable->sid); @@ -363,8 +537,6 @@ int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable) { newMsg->ahandle = pTable; mgmtSendMsgToDnode(&ipSet, &rpcMsg); - - return TSDB_CODE_SUCCESS; } void* mgmtGetChildTable(char *tableId) { @@ -379,7 +551,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName // } // // //TODO send msg to dnode -// mTrace("Succeed to modify tag column %d of table %s", col, pTable->tableId); +// mTrace("Succeed to modify tag column %d of table %s", col, pTable->info.tableId); // return TSDB_CODE_SUCCESS; // int32_t rowSize = 0; @@ -411,42 +583,220 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName // if (pTable->isDirty) pTable->isDirty = 0; // // if (ret < 0) { -// mError("Failed to modify tag column %d of table %s", col, pTable->tableId); +// mError("Failed to modify tag column %d of table %s", col, pTable->info.tableId); // return TSDB_CODE_APP_ERROR; // } // -// mTrace("Succeed to modify tag column %d of table %s", col, pTable->tableId); +// mTrace("Succeed to modify tag column %d of table %s", col, pTable->info.tableId); // return TSDB_CODE_SUCCESS; return 0; } -int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) { - pMeta->uid = htobe64(pTable->uid); - pMeta->sid = htonl(pTable->sid); - pMeta->vgId = htonl(pTable->vgId); - pMeta->sversion = htons(pTable->superTable->sversion); - pMeta->precision = pDb->cfg.precision; - pMeta->numOfTags = pTable->superTable->numOfTags; - pMeta->numOfColumns = htons(pTable->superTable->numOfColumns); - pMeta->tableType = pTable->type; - pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable); - strncpy(pMeta->tableId, pTable->tableId, tListLen(pTable->tableId)); +static int32_t mgmtFindNormalTableColumnIndex(SNormalTableObj *pTable, char *colName) { + SSchema *schema = (SSchema *) pTable->schema; + for (int32_t i = 0; i < pTable->numOfColumns; i++) { + if (strcasecmp(schema[i].name, colName) == 0) { + return i; + } + } + + return -1; +} + +static int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols) { + if (ncols <= 0) { + return TSDB_CODE_APP_ERROR; + } + + for (int32_t i = 0; i < ncols; i++) { + if (mgmtFindNormalTableColumnIndex(pTable, schema[i].name) > 0) { + return TSDB_CODE_APP_ERROR; + } + } + + SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId); + if (pDb == NULL) { + mError("table: %s not belongs to any database", pTable->info.tableId); + return TSDB_CODE_APP_ERROR; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("DB: %s not belongs to andy account", pDb->name); + return TSDB_CODE_APP_ERROR; + } + + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + pTable->schema = realloc(pTable->schema, schemaSize + sizeof(SSchema) * ncols); + + memcpy(pTable->schema + schemaSize, schema, sizeof(SSchema) * ncols); + + SSchema *tschema = (SSchema *) (pTable->schema + sizeof(SSchema) * pTable->numOfColumns); + for (int32_t i = 0; i < ncols; i++) { + tschema[i].colId = pTable->nextColId++; + } + + pTable->numOfColumns += ncols; + pTable->sversion++; + pAcct->acctInfo.numOfTimeSeries += ncols; + + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_GLOBAL; + desc.pObj = pTable; + desc.table = tsNormalTableSdb; + desc.rowData = pTable; + desc.rowSize = tsChildTableUpdateSize; + sdbUpdateRow(&desc); + + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) { + int32_t col = mgmtFindNormalTableColumnIndex(pTable, colName); + if (col < 0) { + return TSDB_CODE_APP_ERROR; + } + + SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId); + if (pDb == NULL) { + mError("table: %s not belongs to any database", pTable->info.tableId); + return TSDB_CODE_APP_ERROR; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("DB: %s not belongs to any account", pDb->name); + return TSDB_CODE_APP_ERROR; + } + + memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1), + sizeof(SSchema) * (pTable->numOfColumns - col - 1)); + + pTable->numOfColumns--; + pTable->sversion++; + + pAcct->acctInfo.numOfTimeSeries--; + + SSdbOperDesc desc = {0}; + desc.type = SDB_OPER_TYPE_GLOBAL; + desc.pObj = pTable; + desc.table = tsNormalTableSdb; + desc.rowData = pTable; + desc.rowSize = tsChildTableUpdateSize; + sdbUpdateRow(&desc); + + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) { + int32_t numOfCols = pTable->numOfColumns; + for (int32_t i = 0; i < numOfCols; ++i) { + strcpy(pSchema->name, pTable->schema[i].name); + pSchema->type = pTable->schema[i].type; + pSchema->bytes = htons(pTable->schema[i].bytes); + pSchema->colId = htons(pTable->schema[i].colId); + pSchema++; + } + + return numOfCols * sizeof(SSchema); +} + +static void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SDbObj *pDb, SChildTableObj *pTable) { + STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); + if (pMeta == NULL) { + mError("table:%s, failed to get table meta, no enough memory", pTable->info.tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); + return; + } + + pMeta->uid = htobe64(pTable->uid); + pMeta->sid = htonl(pTable->sid); + pMeta->vgId = htonl(pTable->vgId); + pMeta->precision = pDb->cfg.precision; + pMeta->tableType = pTable->info.type; + strncpy(pMeta->tableId, pTable->info.tableId, tListLen(pTable->info.tableId)); + + if (pTable->info.type == TSDB_CHILD_TABLE) { + pMeta->sversion = htons(pTable->superTable->sversion); + pMeta->numOfTags = htons(pTable->superTable->numOfTags); + pMeta->numOfColumns = htons(pTable->superTable->numOfColumns); + pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable); + } else { + pMeta->sversion = htons(pTable->sversion); + pMeta->numOfTags = 0; + pMeta->numOfColumns = htons(pTable->numOfColumns); + pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable); + } + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - return TSDB_CODE_INVALID_TABLE; + mError("table:%s, failed to get table meta, db not selected", pTable->info.tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); + return; } + for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - if (usePublicIp) { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; + if (pMsg->usePublicIp) { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; } else { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; } pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } pMeta->numOfVpeers = pVgroup->numOfVnodes; - return TSDB_CODE_SUCCESS; + SRpcMsg rpcRsp = { + .handle = pMsg->thandle, + .pCont = pMeta, + .contLen = pMeta->contLen, + }; + pMeta->contLen = htons(pMeta->contLen); + rpcSendResponse(&rpcRsp); + + mTrace("table:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); +} + +void mgmtProcessChildTableMetaMsg(SQueuedMsg *pMsg) { + SCMTableInfoMsg *pInfo = pMsg->pCont; + SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId); + if (pDb == NULL || pDb->dirty) { + mError("table:%s, failed to get table meta, db not selected", pTable->info.tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); + return; + } + + STableInfo *pTable = mgmtGetTable(pInfo->tableId); + if (pTable == NULL) { + if (htons(pInfo->createFlag) != 1) { + mError("table:%s, failed to get table meta, table not exist", pInfo->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + return; + } else { + //TODO: on demand create table from super table if table does not exists + int32_t contLen = sizeof(SCMCreateTableMsg) + sizeof(STagData); + SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); + if (pCreateMsg == NULL) { + mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); + return; + } + memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); + strcpy(pCreateMsg->tableId, pInfo->tableId); + + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + pMsg->pCont = NULL; + + newMsg->ahandle = newMsg->pCont; + newMsg->pCont = pCreateMsg; + mTrace("table:%s, start to create in demand", pInfo->tableId); + mgmtAddToShellQueue(newMsg); + return; + } + } + + mgmtGetChildTableMeta(pMsg, pDb, pTable); } void mgmtDropAllChildTables(SDbObj *pDropDb) { @@ -462,7 +812,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) { break; } - if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) { SSdbOperDesc oper = { .type = SDB_OPER_TYPE_LOCAL, .table = tsChildTableSdb, @@ -504,4 +854,360 @@ void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { } mTrace("stable:%s, all child tables:%d is dropped from sdb", pStable->tableId, numOfTables); +} + +static STableInfo* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_t sid) { + SDnodeObj *pObj = mgmtGetDnode(dnodeId); + SVgObj *pVgroup = mgmtGetVgroup(vnode); + + if (pObj == NULL || pVgroup == NULL) { + return NULL; + } + + return pVgroup->tableList[sid]; +} + +static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { + if (mgmtCheckRedirect(rpcMsg->handle)) return; + + SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) rpcMsg->pCont; + pCfg->dnode = htonl(pCfg->dnode); + pCfg->vnode = htonl(pCfg->vnode); + pCfg->sid = htonl(pCfg->sid); + mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); + + STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); + if (pTable == NULL) { + mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_TABLE); + return; + } + + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); + + SMDCreateTableMsg *pMDCreate = NULL; + pMDCreate = mgmtBuildCreateChildTableMsg(NULL, (SChildTableObj *) pTable); + if (pMDCreate == NULL) { + return; + } + + SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); + SRpcMsg rpcRsp = { + .handle = NULL, + .pCont = pMDCreate, + .contLen = htonl(pMDCreate->contLen), + .code = 0, + .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE + }; + mgmtSendMsgToDnode(&ipSet, &rpcRsp); +} + +static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + + STableInfo *pTable = queueMsg->ahandle; + mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle, tstrerror(rpcMsg->code)); + + if (rpcMsg->code != TSDB_CODE_SUCCESS) { + mError("table:%s, failed to drop in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code)); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + free(queueMsg); + return; + } + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("table:%s, failed to get vgroup", pTable->info.tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); + free(queueMsg); + return; + } + + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsChildTableSdb, + .pObj = pTable + }; + int32_t code = sdbDeleteRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + mError("table:%s, update ctables sdb error", pTable->info.tableId); + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); + free(queueMsg); + return; + } + + if (pVgroup->numOfTables <= 0) { + mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); + mgmtDropVgroup(pVgroup, NULL); + } + + mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); + free(queueMsg); +} + +static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) { + mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); +} + +static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { + if (rpcMsg->handle == NULL) return; + + SQueuedMsg *queueMsg = rpcMsg->handle; + queueMsg->received++; + + STableInfo *pTable = queueMsg->ahandle; + mTrace("table:%s, create table rsp received, thandle:%p ahandle:%p result:%s", pTable->info.tableId, queueMsg->thandle, + rpcMsg->handle, tstrerror(rpcMsg->code)); + + if (rpcMsg->code != TSDB_CODE_SUCCESS) { + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsChildTableSdb, + .pObj = pTable + }; + sdbDeleteRow(&oper); + + mError("table:%s, failed to create in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code)); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + } else { + mTrace("table:%s, created in dnode", pTable->info.tableId); + if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) { + SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); + newMsg->msgType = queueMsg->msgType; + newMsg->thandle = queueMsg->thandle; + newMsg->pDb = queueMsg->pDb; + newMsg->pUser = queueMsg->pUser; + newMsg->contLen = queueMsg->contLen; + newMsg->pCont = rpcMallocCont(newMsg->contLen); + memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + mTrace("table:%s, start to get meta", pTable->info.tableId); + mgmtAddToShellQueue(newMsg); + } else { + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + } + } + + free(queueMsg); +} + +static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { + mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); +} + +static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { + SRpcConnInfo connInfo; + if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { + mError("conn:%p is already released while get mulit table meta", pMsg->thandle); + return; + } + + bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); + SUserObj *pUser = mgmtGetUser(connInfo.user); + if (pUser == NULL) { + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); + return; + } + + SCMMultiTableInfoMsg *pInfo = pMsg->pCont; + pInfo->numOfTables = htonl(pInfo->numOfTables); + + int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice + SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen); + if (pMultiMeta == NULL) { + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); + return; + } + + pMultiMeta->contLen = sizeof(SMultiTableMeta); + pMultiMeta->numOfTables = 0; + + for (int t = 0; t < pInfo->numOfTables; ++t) { + char *tableId = (char*)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN); + STableInfo *pTable = mgmtGetTable(tableId); + if (pTable == NULL) continue; + + SDbObj *pDb = mgmtGetDbByTableId(tableId); + if (pDb == NULL) continue; + + int availLen = totalMallocLen - pMultiMeta->contLen; + if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS) { + //TODO realloc + //totalMallocLen *= 2; + //pMultiMeta = rpcReMalloc(pMultiMeta, totalMallocLen); + //if (pMultiMeta == NULL) { + /// rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + // return TSDB_CODE_SERV_OUT_OF_MEMORY; + //} else { + // t--; + // continue; + //} + } + + STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->metas + pMultiMeta->contLen); + int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); + if (code == TSDB_CODE_SUCCESS) { + pMultiMeta->numOfTables ++; + pMultiMeta->contLen += pMeta->contLen; + } + } + + SRpcMsg rpcRsp = {0}; + rpcRsp.handle = pMsg->thandle; + rpcRsp.pCont = pMultiMeta; + rpcRsp.contLen = pMultiMeta->contLen; + rpcSendResponse(&rpcRsp); +} + +static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SDbObj *pDb = mgmtGetDb(pShow->db); + if (pDb == NULL) { + return TSDB_CODE_DB_NOT_SELECTED; + } + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "table name"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "columns"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "stable name"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = pDb->numOfTables; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + return 0; +} + +static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) { + if (rows < capacity) { + for (int32_t i = 0; i < numOfCols; ++i) { + memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows); + } + } +} + +static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + SDbObj *pDb = mgmtGetDb(pShow->db); + if (pDb == NULL) return 0; + + SUserObj *pUser = mgmtGetUserFromConn(pConn); + if (pUser == NULL) return 0; + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && + strcmp(pUser->user, "monitor") != 0) { + return 0; + } + } + + int32_t numOfRows = 0; + SChildTableObj *pTable = NULL; + SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; + + char prefix[64] = {0}; + strcpy(prefix, pDb->name); + strcat(prefix, TS_PATH_DELIMITER); + int32_t prefixLen = strlen(prefix); + + while (numOfRows < rows) { + pShow->pNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable); + if (pTable == NULL) break; + + // not belong to current db + if (strncmp(tableId, prefix, prefixLen)) { + continue; + } + + char tableName[TSDB_TABLE_NAME_LEN] = {0}; + memset(tableName, 0, tListLen(tableName)); + numOfRead++; + + // pattern compare for meter name + mgmtExtractTableName(pTable->info.tableId, tableName); + + if (pShow->payloadLen > 0 && + patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) { + continue; + } + + int32_t cols = 0; + + char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strncpy(pWrite, tableName, TSDB_TABLE_NAME_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *) pWrite = pTable->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + if (pTable->info.type == TSDB_CHILD_TABLE) { + *(int16_t *)pWrite = pTable->superTable->numOfColumns; + } else { + *(int16_t *)pWrite = pTable->numOfColumns; + } + + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + if (pTable->info.type == TSDB_CHILD_TABLE) { + mgmtExtractTableName(pTable->superTableId, pWrite); + } + cols++; + + numOfRows++; + } + + pShow->numOfReads += numOfRead; + const int32_t NUM_OF_COLUMNS = 4; + + mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + + return numOfRows; +} + +void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable) { + int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; + SCMAlterTableMsg *pAlter = pMsg->pCont;; + + if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { + code = mgmtModifyChildTableTagValueByName((SChildTableObj *)pTable, pAlter->schema[0].name, pAlter->tagVal); + } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { + code = mgmtAddNormalTableColumn((SNormalTableObj *)pTable, pAlter->schema, 1); + } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { + code = mgmtDropNormalTableColumnByName((SNormalTableObj *)pTable, pAlter->schema[0].name); + } else { + } + + mgmtSendSimpleResp(pMsg->thandle, code); } \ No newline at end of file diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c deleted file mode 100644 index fde9db2d743c947148f2412099470d9058ba481a..0000000000000000000000000000000000000000 --- a/src/mnode/src/mgmtNormalTable.c +++ /dev/null @@ -1,551 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "taosmsg.h" -#include "tscompression.h" -#include "tskiplist.h" -#include "ttime.h" -#include "tstatus.h" -#include "tutil.h" -#include "mnode.h" -#include "mgmtAcct.h" -#include "mgmtDb.h" -#include "mgmtDClient.h" -#include "mgmtGrant.h" -#include "mgmtMnode.h" -#include "mgmtNormalTable.h" -#include "mgmtSdb.h" -#include "mgmtSuperTable.h" -#include "mgmtTable.h" -#include "mgmtVgroup.h" - -void *tsNormalTableSdb; -int32_t tsNormalTableUpdateSize; - -static void mgmtDestroyNormalTable(SNormalTableObj *pTable) { - tfree(pTable->schema); - tfree(pTable->sql); - tfree(pTable); -} - -static int32_t mgmtNormalTableActionDestroy(SSdbOperDesc *pOper) { - mgmtDestroyNormalTable(pOper->pObj); - return TSDB_CODE_SUCCESS; -} - -static int32_t mgmtNormalTableActionInsert(SSdbOperDesc *pOper) { - SNormalTableObj *pTable = pOper->pObj; - - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mError("ntable:%s not in vgroup:%d", pTable->tableId, pTable->vgId); - return TSDB_CODE_INVALID_VGROUP_ID; - } - - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("ntable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); - return TSDB_CODE_INVALID_DB; - } - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("ntable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct); - return TSDB_CODE_INVALID_ACCT; - } - - mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1); - mgmtAddTableIntoDb(pDb); - mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable); - - return TSDB_CODE_SUCCESS; -} - -static int32_t mgmtNormalTableActionDelete(SSdbOperDesc *pOper) { - SNormalTableObj *pTable = pOper->pObj; - if (pTable->vgId == 0) { - return TSDB_CODE_INVALID_VGROUP_ID; - } - - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - return TSDB_CODE_INVALID_VGROUP_ID; - } - - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("ntable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName); - return TSDB_CODE_INVALID_DB; - } - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("account not exists"); - return TSDB_CODE_INVALID_ACCT; - } - - mgmtRestoreTimeSeries(pAcct, pTable->numOfColumns - 1); - mgmtRemoveTableFromDb(pDb); - mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable); - - return TSDB_CODE_SUCCESS; -} - -static int32_t mgmtNormalTableActionUpdate(SSdbOperDesc *pOper) { - return TSDB_CODE_SUCCESS; -} - -static int32_t mgmtNormalTableActionEncode(SSdbOperDesc *pOper) { - SNormalTableObj *pTable = pOper->pObj; - assert(pOper->pObj != NULL && pOper->rowData != NULL); - - int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - if (pOper->maxRowSize < tsNormalTableUpdateSize + schemaSize) { - return TSDB_CODE_INVALID_MSG_LEN; - } - - memcpy(pOper->rowData, pTable, tsNormalTableUpdateSize); - memcpy(pOper->rowData + tsNormalTableUpdateSize, pTable->schema, schemaSize); - memcpy(pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen); - - pOper->rowSize = tsNormalTableUpdateSize + schemaSize + pTable->sqlLen; - return TSDB_CODE_SUCCESS; -} - -static int32_t mgmtNormalTableActionDecode(SSdbOperDesc *pOper) { - assert(pOper->rowData != NULL); - - SNormalTableObj *pTable = (SNormalTableObj *)calloc(1, sizeof(SNormalTableObj)); - if (pTable == NULL) TSDB_CODE_SERV_OUT_OF_MEMORY; - - memcpy(pTable, pOper->rowData, tsNormalTableUpdateSize); - - int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - pTable->schema = (SSchema *)malloc(schemaSize); - if (pTable->schema == NULL) { - mgmtDestroyNormalTable(pTable); - return -1; - } - - memcpy(pTable->schema, pOper->rowData + tsNormalTableUpdateSize, schemaSize); - - pTable->sql = (char *)malloc(pTable->sqlLen); - if (pTable->sql == NULL) { - mgmtDestroyNormalTable(pTable); - return -1; - } - memcpy(pTable->sql, pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen); - - pOper->pObj = pTable; - return TSDB_CODE_SUCCESS; -} - -int32_t mgmtInitNormalTables() { - void *pNode = NULL; - void *pLastNode = NULL; - SNormalTableObj *pTable = NULL; - - SNormalTableObj tObj; - tsNormalTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; - - SSdbTableDesc tableDesc = { - .tableName = "ntables", - .hashSessions = TSDB_MAX_NORMAL_TABLES, - .maxRowSize = sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, - .keyType = SDB_KEY_TYPE_STRING, - .insertFp = mgmtNormalTableActionInsert, - .deleteFp = mgmtNormalTableActionDelete, - .updateFp = mgmtNormalTableActionUpdate, - .encodeFp = mgmtNormalTableActionEncode, - .decodeFp = mgmtNormalTableActionDecode, - .destroyFp = mgmtNormalTableActionDestroy, - }; - - tsNormalTableSdb = sdbOpenTable(&tableDesc); - if (tsNormalTableSdb == NULL) { - mError("failed to init ntables data"); - return -1; - } - - while (1) { - pLastNode = pNode; - pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable); - if (pTable == NULL) break; - - SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); - if (pDb == NULL) { - mError("ntable:%s, failed to get db, discard it", pTable->tableId); - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_LOCAL; - desc.pObj = pTable; - desc.table = tsNormalTableSdb; - sdbDeleteRow(&desc); - pNode = pLastNode; - continue; - } - - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mError("ntable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid); - pTable->vgId = 0; - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_LOCAL; - desc.pObj = pTable; - desc.table = tsNormalTableSdb; - sdbDeleteRow(&desc); - pNode = pLastNode; - continue; - } - - if (strcmp(pVgroup->dbName, pDb->name) != 0) { - mError("ntable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", - pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); - pTable->vgId = 0; - - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_LOCAL; - desc.pObj = pTable; - desc.table = tsNormalTableSdb; - sdbDeleteRow(&desc); - pNode = pLastNode; - continue; - } - - if (pVgroup->tableList == NULL) { - mError("ntable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId); - pTable->vgId = 0; - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_LOCAL; - desc.pObj = pTable; - desc.table = tsNormalTableSdb; - sdbDeleteRow(&desc); - pNode = pLastNode; - continue; - } - } - - mTrace("ntables is initialized"); - return 0; -} - -void mgmtCleanUpNormalTables() { - sdbCloseTable(tsNormalTableSdb); -} - -void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) { - int32_t totalCols = pTable->numOfColumns; - int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen; - - SMDCreateTableMsg *pCreate = rpcMallocCont(contLen); - if (pCreate == NULL) { - terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; - return NULL; - } - - memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1); - pCreate->contLen = htonl(contLen); - pCreate->vgId = htonl(pTable->vgId); - pCreate->tableType = pTable->type; - pCreate->numOfColumns = htons(pTable->numOfColumns); - pCreate->numOfTags = 0; - pCreate->sid = htonl(pTable->sid); - pCreate->sversion = htonl(pTable->sversion); - pCreate->tagDataLen = 0; - pCreate->sqlDataLen = htonl(pTable->sqlLen); - pCreate->uid = htobe64(pTable->uid); - pCreate->superTableUid = 0; - pCreate->createdTime = htobe64(pTable->createdTime); - - SSchema *pSchema = (SSchema *) pCreate->data; - memcpy(pSchema, pTable->schema, totalCols * sizeof(SSchema)); - for (int32_t col = 0; col < totalCols; ++col) { - pSchema->bytes = htons(pSchema->bytes); - pSchema->colId = htons(pSchema->colId); - pSchema++; - } - - memcpy(pCreate + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen); - return pCreate; -} - -void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { - SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1); - if (pTable == NULL) { - mError("table:%s, failed to alloc memory", pCreate->tableId); - terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; - return NULL; - } - - strcpy(pTable->tableId, pCreate->tableId); - pTable->type = TSDB_NORMAL_TABLE; - pTable->vgId = pVgroup->vgId; - pTable->createdTime = taosGetTimestampMs(); - pTable->uid = (((uint64_t) pTable->createdTime) << 16) + (sdbGetVersion() & ((1ul << 16) - 1ul)); - pTable->sid = sid; - pTable->sversion = 0; - pTable->numOfColumns = htons(pCreate->numOfColumns); - pTable->sqlLen = htons(pCreate->sqlLen); - - int32_t numOfCols = pTable->numOfColumns; - int32_t schemaSize = numOfCols * sizeof(SSchema); - pTable->schema = (SSchema *) calloc(1, schemaSize); - if (pTable->schema == NULL) { - free(pTable); - terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; - return NULL; - } - memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); - - pTable->nextColId = 0; - for (int32_t col = 0; col < numOfCols; col++) { - SSchema *tschema = pTable->schema; - tschema[col].colId = pTable->nextColId++; - tschema[col].bytes = htons(tschema[col].bytes); - } - - if (pTable->sqlLen != 0) { - pTable->type = TSDB_STREAM_TABLE; - pTable->sql = calloc(1, pTable->sqlLen); - if (pTable->sql == NULL) { - free(pTable); - terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; - return NULL; - } - memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen); - pTable->sql[pTable->sqlLen - 1] = 0; - mTrace("table:%s, stream sql len:%d sql:%s", pTable->tableId, pTable->sqlLen, pTable->sql); - } - - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_GLOBAL; - desc.pObj = pTable; - desc.table = tsNormalTableSdb; - if (sdbInsertRow(&desc) != TSDB_CODE_SUCCESS) { - mError("table:%s, update sdb error", pTable->tableId); - free(pTable); - terrno = TSDB_CODE_SDB_ERROR; - return NULL; - } - - mTrace("table:%s, create ntable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid); - return pTable; -} - -int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable) { - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId); - return TSDB_CODE_OTHERS; - } - - SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); - if (pDrop == NULL) { - mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId); - return TSDB_CODE_SERV_OUT_OF_MEMORY; - } - - strcpy(pDrop->tableId, pTable->tableId); - pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); - pDrop->vgId = htonl(pVgroup->vgId); - pDrop->sid = htonl(pTable->sid); - pDrop->uid = htobe64(pTable->uid); - - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - mTrace("table:%s, send drop table msg", pDrop->tableId); - SRpcMsg rpcMsg = { - .handle = newMsg, - .pCont = pDrop, - .contLen = sizeof(SMDDropTableMsg), - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE - }; - - newMsg->ahandle = pTable; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); - return TSDB_CODE_SUCCESS; -} - -void* mgmtGetNormalTable(char *tableId) { - return sdbGetRow(tsNormalTableSdb, tableId); -} - -static int32_t mgmtFindNormalTableColumnIndex(SNormalTableObj *pTable, char *colName) { - SSchema *schema = (SSchema *) pTable->schema; - for (int32_t i = 0; i < pTable->numOfColumns; i++) { - if (strcasecmp(schema[i].name, colName) == 0) { - return i; - } - } - - return -1; -} - -int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols) { - if (ncols <= 0) { - return TSDB_CODE_APP_ERROR; - } - - for (int32_t i = 0; i < ncols; i++) { - if (mgmtFindNormalTableColumnIndex(pTable, schema[i].name) > 0) { - return TSDB_CODE_APP_ERROR; - } - } - - SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); - if (pDb == NULL) { - mError("table: %s not belongs to any database", pTable->tableId); - return TSDB_CODE_APP_ERROR; - } - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to andy account", pDb->name); - return TSDB_CODE_APP_ERROR; - } - - int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - pTable->schema = realloc(pTable->schema, schemaSize + sizeof(SSchema) * ncols); - - memcpy(pTable->schema + schemaSize, schema, sizeof(SSchema) * ncols); - - SSchema *tschema = (SSchema *) (pTable->schema + sizeof(SSchema) * pTable->numOfColumns); - for (int32_t i = 0; i < ncols; i++) { - tschema[i].colId = pTable->nextColId++; - } - - pTable->numOfColumns += ncols; - pTable->sversion++; - pAcct->acctInfo.numOfTimeSeries += ncols; - - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_GLOBAL; - desc.pObj = pTable; - desc.table = tsNormalTableSdb; - desc.rowData = pTable; - desc.rowSize = tsNormalTableUpdateSize; - sdbUpdateRow(&desc); - - return TSDB_CODE_SUCCESS; -} - -int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) { - int32_t col = mgmtFindNormalTableColumnIndex(pTable, colName); - if (col < 0) { - return TSDB_CODE_APP_ERROR; - } - - SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); - if (pDb == NULL) { - mError("table: %s not belongs to any database", pTable->tableId); - return TSDB_CODE_APP_ERROR; - } - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to any account", pDb->name); - return TSDB_CODE_APP_ERROR; - } - - memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1), - sizeof(SSchema) * (pTable->numOfColumns - col - 1)); - - pTable->numOfColumns--; - pTable->sversion++; - - pAcct->acctInfo.numOfTimeSeries--; - - SSdbOperDesc desc = {0}; - desc.type = SDB_OPER_TYPE_GLOBAL; - desc.pObj = pTable; - desc.table = tsNormalTableSdb; - desc.rowData = pTable; - desc.rowSize = tsNormalTableUpdateSize; - sdbUpdateRow(&desc); - - return TSDB_CODE_SUCCESS; -} - -static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SNormalTableObj *pTable) { - int32_t numOfCols = pTable->numOfColumns; - for (int32_t i = 0; i < numOfCols; ++i) { - strcpy(pSchema->name, pTable->schema[i].name); - pSchema->type = pTable->schema[i].type; - pSchema->bytes = htons(pTable->schema[i].bytes); - pSchema->colId = htons(pTable->schema[i].colId); - pSchema++; - } - - return numOfCols * sizeof(SSchema); -} - -int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) { - pMeta->uid = htobe64(pTable->uid); - pMeta->sid = htonl(pTable->sid); - pMeta->vgId = htonl(pTable->vgId); - pMeta->sversion = htons(pTable->sversion); - pMeta->precision = pDb->cfg.precision; - pMeta->numOfTags = 0; - pMeta->numOfColumns = htons(pTable->numOfColumns); - pMeta->tableType = pTable->type; - pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable); - - strncpy(pMeta->tableId, pTable->tableId, tListLen(pTable->tableId)); - - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - return TSDB_CODE_INVALID_TABLE; - } - for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - if (usePublicIp) { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - } else { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; - } - - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } - pMeta->numOfVpeers = pVgroup->numOfVnodes; - - return TSDB_CODE_SUCCESS; -} - -void mgmtDropAllNormalTables(SDbObj *pDropDb) { - void *pNode = NULL; - void *pLastNode = NULL; - int32_t numOfTables = 0; - int32_t dbNameLen = strlen(pDropDb->name); - SNormalTableObj *pTable = NULL; - - while (1) { - pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable); - if (pTable == NULL) break; - - if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { - SSdbOperDesc oper = { - .type = SDB_OPER_TYPE_LOCAL, - .table = tsNormalTableSdb, - .pObj = pTable, - }; - sdbDeleteRow(&oper); - pNode = pLastNode; - numOfTables++; - continue; - } - } - - mTrace("db:%s, all normal tables:%d is dropped from sdb", pDropDb->name, numOfTables); -} diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 3ff78b985e4cb55d07d3284f750a56bc4fb99faf..aa7a494a9fb7063536a900f8d1e081d3cec89658 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -138,13 +138,19 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } + if (mgmtCheckExpired()) { + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); + return; + } + if (tsMgmtProcessShellMsgFp[rpcMsg->msgType] == NULL) { mgmtProcessUnSupportMsg(rpcMsg); rpcFreeCont(rpcMsg->pCont); return; } - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); + bool usePublicIp = false; + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp); if (pUser == NULL) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER); rpcFreeCont(rpcMsg->pCont); @@ -158,6 +164,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { queuedMsg.contLen = rpcMsg->contLen; queuedMsg.pCont = rpcMsg->pCont; queuedMsg.pUser = pUser; + queuedMsg.usePublicIp = usePublicIp; (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(&queuedMsg); rpcFreeCont(rpcMsg->pCont); } else { @@ -167,6 +174,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { queuedMsg->contLen = rpcMsg->contLen; queuedMsg->pCont = rpcMsg->pCont; queuedMsg->pUser = pUser; + queuedMsg.usePublicIp = usePublicIp; mgmtAddToShellQueue(queuedMsg); } } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index e8161deb4dc10b27277ab18e7248acf7a7b08967..4b8a4aba76424ace02e58d36200f8867b1738580 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -15,7 +15,8 @@ #define _DEFAULT_SOURCE #include "os.h" - +#include "name.h" +#include "tsqlfunction.h" #include "mgmtAcct.h" #include "mgmtChildTable.h" #include "mgmtDb.h" @@ -27,14 +28,11 @@ #include "mgmtTable.h" #include "mgmtUser.h" #include "mgmtVgroup.h" -#include "mnode.h" - -#include "name.h" -#include "tsqlfunction.h" static void *tsSuperTableSdb; static int32_t tsSuperTableUpdateSize; - +static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *queueMsg); +static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); @@ -132,8 +130,10 @@ int32_t mgmtInitSuperTables() { return -1; } + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_VGROUP, mgmtProcessSuperTableVgroupMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropStableRsp); mTrace("stables is initialized"); return 0; @@ -143,10 +143,12 @@ void mgmtCleanUpSuperTables() { sdbCloseTable(tsSuperTableSdb); } -int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { +void mgmtCreateSuperTable(SQueuedMsg *pMsg) { + SCMCreateTableMsg *pCreate = pMsg->pCont; SSuperTableObj *pStable = (SSuperTableObj *)calloc(1, sizeof(SSuperTableObj)); if (pStable == NULL) { - return TSDB_CODE_SERV_OUT_OF_MEMORY; + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); + return; } strcpy(pStable->tableId, pCreate->tableId); @@ -165,7 +167,8 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { if (pStable->schema == NULL) { free(pStable); mError("stable:%s, no schema input", pCreate->tableId); - return TSDB_CODE_INVALID_TABLE; + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + return; } memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); @@ -186,17 +189,17 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { int32_t code = sdbInsertRow(&oper); if (code != TSDB_CODE_SUCCESS) { mgmtDestroySuperTable(pStable); - return TSDB_CODE_SDB_ERROR; + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SDB_ERROR); } else { mLPrint("stable:%s, is created, tags:%d cols:%d", pStable->tableId, pStable->numOfTags, pStable->numOfColumns); - return TSDB_CODE_SUCCESS; + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); } } -int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pStable) { +void mgmtDropSuperTable(SQueuedMsg *newMsg, SSuperTableObj *pTable) { if (pStable->numOfTables != 0) { mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables); - return TSDB_CODE_OTHERS; + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS); } else { SSdbOperDesc oper = { .type = SDB_OPER_TYPE_GLOBAL, @@ -205,7 +208,7 @@ int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pSta }; int32_t code = sdbDeleteRow(&oper); mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->tableId, tstrerror(code)); - return code; + mgmtSendSimpleResp(pMsg->thandle, code); } } @@ -213,14 +216,14 @@ void* mgmtGetSuperTable(char *tableId) { return sdbGetRow(tsSuperTableSdb, tableId); } -void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) { +static void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) { SCMSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SCMSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum()); rsp->numOfDnodes = htonl(1); rsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp)); return rsp; } -int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) { +static int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) { for (int32_t i = 0; i < pStable->numOfTags; i++) { SSchema *schema = (SSchema *)(pStable->schema + (pStable->numOfColumns + i) * sizeof(SSchema)); if (strcasecmp(tagName, schema->name) == 0) { @@ -231,7 +234,7 @@ int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) return -1; } -int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) { +static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) { if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) { return TSDB_CODE_APP_ERROR; } @@ -279,7 +282,7 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t return TSDB_CODE_SUCCESS; } -int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { +static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { int32_t col = mgmtFindSuperTableTagIndex(pStable, tagName); if (col <= 0 || col >= pStable->numOfTags) { return TSDB_CODE_APP_ERROR; @@ -311,7 +314,7 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { return TSDB_CODE_SUCCESS; } -int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) { +static int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) { int32_t col = mgmtFindSuperTableTagIndex(pStable, oldTagName); if (col < 0) { // Tag name does not exist @@ -362,7 +365,7 @@ static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colN return -1; } -int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32_t ncols) { +static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32_t ncols) { if (ncols <= 0) { return TSDB_CODE_APP_ERROR; } @@ -406,7 +409,7 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32 return TSDB_CODE_SUCCESS; } -int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) { +static int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) { int32_t col = mgmtFindSuperTableColumnIndex(pStable, colName); if (col < 0) { return TSDB_CODE_APP_ERROR; @@ -519,12 +522,12 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v while (numOfRows < rows) { pShow->pNode = sdbFetchRow(tsSuperTableSdb, pShow->pNode, (void **) &pTable); if (pTable == NULL) break; - if (strncmp(pTable->tableId, prefix, prefixLen)) { + if (strncmp(pTable->info.tableId, prefix, prefixLen)) { continue; } memset(stableName, 0, tListLen(stableName)); - mgmtExtractTableName(pTable->tableId, stableName); + mgmtExtractTableName(pTable->info.tableId, stableName); if (pShow->payloadLen > 0 && patternMatch(pShow->payload, stableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) @@ -572,7 +575,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { break; } - if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { + if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) { SSdbOperDesc oper = { .type = SDB_OPER_TYPE_LOCAL, .table = tsSuperTableSdb, @@ -588,14 +591,6 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables); } -void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) { - pStable->numOfTables++; -} - -void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable) { - pStable->numOfTables--; -} - int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; for (int32_t i = 0; i < numOfCols; ++i) { @@ -609,32 +604,67 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); } -int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) { +void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable) { + SCMTableInfoMsg *pInfo = pMsg->pCont; + SDbObj *pDb = pMsg->pDb; + + STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); pMeta->uid = htobe64(pTable->uid); - pMeta->sid = htonl(pTable->sid); - pMeta->vgId = htonl(pTable->vgId); pMeta->sversion = htons(pTable->sversion); pMeta->precision = pDb->cfg.precision; pMeta->numOfTags = pTable->numOfTags; pMeta->numOfColumns = htons(pTable->numOfColumns); - pMeta->tableType = pTable->type; + pMeta->tableType = pTable->info.type; pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable); - strcpy(pMeta->tableId, pTable->tableId); + strcpy(pMeta->tableId, pTable->info.tableId); - return TSDB_CODE_SUCCESS; -} + SRpcMsg rpcRsp = { + .handle = pMsg->thandle, + .pCont = pMeta, + .contLen = pMeta->contLen, + }; + pMeta->contLen = htons(pMeta->contLen); + rpcSendResponse(&rpcRsp); -int32_t mgmtExtractTableName(const char* tableId, char* name) { - int pos = -1; - int num = 0; - for (pos = 0; tableId[pos] != 0; ++pos) { - if (tableId[pos] == '.') num++; - if (num == 2) break; - } + mTrace("stable:%%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); +} - if (num == 2) { - strcpy(name, tableId + pos + 1); +static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { + SCMSuperTableInfoMsg *pInfo = pMsg->pCont; + STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); + if (pTable == NULL) { + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); + return; + } + + SCMSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); + if (pRsp != NULL) { + int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); + SRpcMsg rpcRsp = {0}; + rpcRsp.handle = pMsg->thandle; + rpcRsp.pCont = pRsp; + rpcRsp.contLen = msgLen; + rpcSendResponse(&rpcRsp); + } else { + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); } - return 0; } +void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable) { + int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; + SCMAlterTableMsg *pAlter = pMsg->pCont; + + if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { + code = mgmtAddSuperTableTag((SSuperTableObj *) pTable, pAlter->schema, 1); + } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { + code = mgmtDropSuperTableTag((SSuperTableObj *) pTable, pAlter->schema[0].name); + } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { + code = mgmtModifySuperTableTagNameByName((SSuperTableObj *) pTable, pAlter->schema[0].name, pAlter->schema[1].name); + } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { + code = mgmtAddSuperTableColumn((SSuperTableObj *) pTable, pAlter->schema, 1); + } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { + code = mgmtDropSuperTableColumnByName((SSuperTableObj *) pTable, pAlter->schema[0].name); + } else {} + + mgmtSendSimpleResp(pMsg->thandle, code); +} \ No newline at end of file diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index c9a0333b1733ad122e992a63989daa8a09249f76..0d02e5f8e192b6f8e8f9bf8fd01ae534813d4f4c 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -15,8 +15,6 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "mgmtTable.h" #include "mgmtAcct.h" #include "mgmtChildTable.h" #include "mgmtDClient.h" @@ -25,44 +23,17 @@ #include "mgmtDServer.h" #include "mgmtGrant.h" #include "mgmtMnode.h" -#include "mgmtNormalTable.h" #include "mgmtProfile.h" #include "mgmtSdb.h" #include "mgmtShell.h" #include "mgmtSuperTable.h" +#include "mgmtTable.h" #include "mgmtUser.h" -#include "mgmtVgroup.h" -#include "mnode.h" - -#include "qast.h" -#include "qextbuffer.h" -#include "taoserror.h" -#include "taosmsg.h" -#include "tscompression.h" -#include "tskiplist.h" -#include "tsqlfunction.h" -#include "tstatus.h" -#include "ttime.h" -#include "name.h" - -extern void *tsNormalTableSdb; -extern void *tsChildTableSdb; static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); -static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); -static void mgmtProcessSuperTableMetaMsg(SQueuedMsg *queueMsg); -static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg); -static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); -static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); -static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); -static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle); -static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); int32_t mgmtInitTables() { int32_t code = mgmtInitSuperTables(); @@ -70,11 +41,6 @@ int32_t mgmtInitTables() { return code; } - code = mgmtInitNormalTables(); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - code = mgmtInitChildTables(); if (code != TSDB_CODE_SUCCESS) { return code; @@ -84,31 +50,17 @@ int32_t mgmtInitTables() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_TABLE, mgmtProcessDropTableMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_ALTER_TABLE, mgmtProcessAlterTableMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLE_META, mgmtProcessTableMetaMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLES_META, mgmtProcessMultiTableMetaMsg); - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_META, mgmtProcessSuperTableMetaMsg); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropStableRsp); - mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); return TSDB_CODE_SUCCESS; } -STableInfo* mgmtGetTable(char *tableId) { - STableInfo *tableInfo = (STableInfo *) mgmtGetSuperTable(tableId); +STableInfo *mgmtGetTable(char *tableId) { + STableInfo *tableInfo = mgmtGetSuperTable(tableId); if (tableInfo != NULL) { return tableInfo; } - tableInfo = (STableInfo *) mgmtGetNormalTable(tableId); - if (tableInfo != NULL) { - return tableInfo; - } - - tableInfo = (STableInfo *) mgmtGetChildTable(tableId); + tableInfo = mgmtGetChildTable(tableId); if (tableInfo != NULL) { return tableInfo; } @@ -116,250 +68,29 @@ STableInfo* mgmtGetTable(char *tableId) { return NULL; } -STableInfo* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_t sid) { - SDnodeObj *pObj = mgmtGetDnode(dnodeId); - SVgObj *pVgroup = mgmtGetVgroup(vnode); - - if (pObj == NULL || pVgroup == NULL) { - return NULL; - } - - return pVgroup->tableList[sid]; -} - -int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMetaMsg *pMeta, bool usePublicIp) { - if (pTable->type == TSDB_CHILD_TABLE) { - mgmtGetChildTableMeta(pDb, (SChildTableObj *) pTable, pMeta, usePublicIp); - } else if (pTable->type == TSDB_NORMAL_TABLE) { - mgmtGetNormalTableMeta(pDb, (SNormalTableObj *) pTable, pMeta, usePublicIp); - } else if (pTable->type == TSDB_SUPER_TABLE) { - mgmtGetSuperTableMeta(pDb, (SSuperTableObj *) pTable, pMeta, usePublicIp); - } else { - mTrace("%s, uid:%" PRIu64 " table meta retrieve failed, invalid type", pTable->tableId, pTable->uid); - return TSDB_CODE_INVALID_TABLE; - } - - mTrace("%s, uid:%" PRIu64 " table meta is retrieved", pTable->tableId, pTable->uid); - return TSDB_CODE_SUCCESS; -} - -int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) { - STableInfo *pTable = mgmtGetTable(pAlter->tableId); - if (pTable == NULL) { - return TSDB_CODE_INVALID_TABLE; - } - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - return TSDB_CODE_MONITOR_DB_FORBIDDEN; - } - - if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { - if (pTable->type == TSDB_SUPER_TABLE) { - return mgmtAddSuperTableTag((SSuperTableObj *) pTable, pAlter->schema, 1); - } - } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { - if (pTable->type == TSDB_SUPER_TABLE) { - return mgmtDropSuperTableTag((SSuperTableObj *) pTable, pAlter->schema[0].name); - } - } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { - if (pTable->type == TSDB_SUPER_TABLE) { - return mgmtModifySuperTableTagNameByName((SSuperTableObj *) pTable, pAlter->schema[0].name, pAlter->schema[1].name); - } - } else if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { - if (pTable->type == TSDB_CHILD_TABLE) { - return mgmtModifyChildTableTagValueByName((SChildTableObj *) pTable, pAlter->schema[0].name, pAlter->tagVal); - } - } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { - if (pTable->type == TSDB_NORMAL_TABLE) { - return mgmtAddNormalTableColumn((SNormalTableObj *) pTable, pAlter->schema, 1); - } else if (pTable->type == TSDB_SUPER_TABLE) { - return mgmtAddSuperTableColumn((SSuperTableObj *) pTable, pAlter->schema, 1); - } else {} - } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { - if (pTable->type == TSDB_NORMAL_TABLE) { - return mgmtDropNormalTableColumnByName((SNormalTableObj *) pTable, pAlter->schema[0].name); - } else if (pTable->type == TSDB_SUPER_TABLE) { - return mgmtDropSuperTableColumnByName((SSuperTableObj *) pTable, pAlter->schema[0].name); - } else {} - } else {} - - return TSDB_CODE_OPS_NOT_SUPPORT; -} - void mgmtCleanUpTables() { - mgmtCleanUpNormalTables(); mgmtCleanUpChildTables(); mgmtCleanUpSuperTables(); } -int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { - SDbObj *pDb = mgmtGetDb(pShow->db); - if (pDb == NULL) { - return TSDB_CODE_DB_NOT_SELECTED; +void mgmtExtractTableName(const char* tableId, char* name) { + int pos = -1; + int num = 0; + for (pos = 0; tableId[pos] != 0; ++pos) { + if (tableId[pos] == '.') num++; + if (num == 2) break; } - int32_t cols = 0; - SSchema *pSchema = pMeta->schema; - - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "table name"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 2; - pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "columns"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "stable name"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htons(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) { - pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + if (num == 2) { + strcpy(name, tableId + pos + 1); } - - pShow->numOfRows = pDb->numOfTables; - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - - return 0; } -static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) { - if (rows < capacity) { - for (int32_t i = 0; i < numOfCols; ++i) { - memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows); - } - } -} - -int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - SDbObj *pDb = mgmtGetDb(pShow->db); - if (pDb == NULL) return 0; - - SUserObj *pUser = mgmtGetUserFromConn(pConn); - if (pUser == NULL) return 0; - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && - strcmp(pUser->user, "monitor") != 0) { - return 0; - } - } - - int32_t numOfRows = 0; - int32_t numOfRead = 0; - int32_t cols = 0; - void *pTable = NULL; - char *pWrite = NULL; - char prefix[20] = {0}; - SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; - - strcpy(prefix, pDb->name); - strcat(prefix, TS_PATH_DELIMITER); - int32_t prefixLen = strlen(prefix); - - while (numOfRows < rows) { - int16_t numOfColumns = 0; - int64_t createdTime = 0; - char *tableId = NULL; - char *superTableId = NULL; - void *pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable); - if (pTable != NULL) { - SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable; - pShow->pNode = pNormalTableNode; - tableId = pNormalTable->tableId; - superTableId = NULL; - createdTime = pNormalTable->createdTime; - numOfColumns = pNormalTable->numOfColumns; - } else { - pShow->pNode = NULL; - void *pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); - if (pTable != NULL) { - SChildTableObj *pChildTable = (SChildTableObj *) pTable; - pShow->pNode = pChildTableNode; - tableId = pChildTable->tableId; - superTableId = pChildTable->superTableId; - createdTime = pChildTable->createdTime; - numOfColumns = pChildTable->superTable->numOfColumns; - } else { - break; - } - } - - // not belong to current db - if (strncmp(tableId, prefix, prefixLen)) { - continue; - } - - char tableName[TSDB_TABLE_NAME_LEN] = {0}; - memset(tableName, 0, tListLen(tableName)); - numOfRead++; - - // pattern compare for meter name - mgmtExtractTableName(tableId, tableName); - - if (pShow->payloadLen > 0 && - patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) { - continue; - } - - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strncpy(pWrite, tableName, TSDB_TABLE_NAME_LEN); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *) pWrite = createdTime; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *) pWrite = numOfColumns; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - if (superTableId != NULL) { - mgmtExtractTableName(superTableId, pWrite); - } - cols++; - - numOfRows++; - } - - pShow->numOfReads += numOfRead; - const int32_t NUM_OF_COLUMNS = 4; - - mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); - - return numOfRows; -} - -void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { - if (mgmtCheckRedirect(pMsg->thandle)) return; - +static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->thandle); - if (mgmtCheckExpired()) { - mError("table:%s, failed to create, grant expired", pCreate->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); - return; - } + if (mgmtCheckRedirect(pMsg->thandle)) return; if (!pMsg->pUser->writeAuth) { mError("table:%s, failed to create, no rights", pCreate->tableId); @@ -367,16 +98,8 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { return; } - SAcctObj *pAcct = pMsg->pUser->pAcct; - int32_t code = mgmtCheckTableLimit(pAcct, htons(pCreate->numOfColumns)); - if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create, exceed the limit", pCreate->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); - return; - } - pMsg->pDb = mgmtGetDb(pCreate->db); - if (pMsg->pDb == NULL) { + if (pMsg->pDb == NULL || pMsg->pDb->dirty) { mError("table:%s, failed to create, db not selected", pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; @@ -396,92 +119,19 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { } if (pCreate->numOfTags != 0) { - mTrace("table:%s, is a super table", pCreate->tableId); - code = mgmtCreateSuperTable(pCreate); - mgmtSendSimpleResp(pMsg->thandle, code); - return; - } - - code = mgmtCheckTimeSeries(pCreate->numOfColumns); - if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create, timeseries exceed the limit", pCreate->tableId); - mgmtSendSimpleResp(pMsg->thandle, code); - return; - } - - SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); - memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); - pMsg->pCont = NULL; - - SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); - if (pVgroup == NULL) { - mTrace("table:%s, start to create a new vgroup", pCreate->tableId); - mgmtCreateVgroup(newMsg); - return; - } - - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid < 0) { - mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); - mgmtCreateVgroup(newMsg); - return; - } - - SMDCreateTableMsg *pMDCreate = NULL; - if (pCreate->numOfColumns == 0) { - mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - pTable = mgmtCreateChildTable(pCreate, pVgroup, sid); - if (pTable == NULL) { - mgmtSendSimpleResp(pMsg->thandle, terrno); - mgmtFreeQueuedMsg(newMsg); - return; - } - pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable); - if (pMDCreate == NULL) { - mgmtSendSimpleResp(pMsg->thandle, terrno); - mgmtFreeQueuedMsg(newMsg); - return; - } + mTrace("table:%s, is a stable", pCreate->tableId); + mgmtCreateSuperTable(pMsg); } else { - mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg); - pTable = mgmtCreateNormalTable(pCreate, pVgroup, sid); - if (pTable == NULL) { - mgmtSendSimpleResp(pMsg->thandle, terrno); - mgmtFreeQueuedMsg(newMsg); - return; - } - pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); - if (pMDCreate == NULL) { - mgmtSendSimpleResp(pMsg->thandle, terrno); - mgmtFreeQueuedMsg(newMsg); - return; - } + mTrace("table:%s, is a ctable", pCreate->tableId); + mgmtCreateChildTable(pMsg); } - - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); - SRpcMsg rpcMsg = { - .handle = newMsg, - .pCont = pMDCreate, - .contLen = htonl(pMDCreate->contLen), - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE - }; - - newMsg->ahandle = pTable; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); } -void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { - if (mgmtCheckRedirect(pMsg->thandle)) return; - +static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { SCMDropTableMsg *pDrop = pMsg->pCont; mTrace("table:%s, drop table msg is received from thandle:%p", pDrop->tableId, pMsg->thandle); - if (mgmtCheckExpired()) { - mError("table:%s, failed to drop, grant expired", pDrop->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED); - return; - } + if (mgmtCheckRedirect(pMsg->thandle)) return; if (!pMsg->pUser->writeAuth) { mError("table:%s, failed to drop, no rights", pDrop->tableId); @@ -489,13 +139,19 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { return; } - SDbObj *pDb = mgmtGetDbByTableId(pDrop->tableId); - if (pDb == NULL) { + pMsg->pDb = mgmtGetDb(pCreate->db); + if (pMsg->pDb == NULL || pMsg->pDb->dirty) { mError("table:%s, failed to drop table, db not selected", pDrop->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; } + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + mError("table:%s, failed to drop table, in monitor database", pDrop->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); + return; + } + STableInfo *pTable = mgmtGetTable(pDrop->tableId); if (pTable == NULL) { if (pDrop->igNotExists) { @@ -509,409 +165,75 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { } } - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - mError("table:%s, failed to drop table, in monitor database", pDrop->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); - return; - } - - SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); - memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); - pMsg->pCont = NULL; - int32_t code; - - switch (pTable->type) { - case TSDB_SUPER_TABLE: - mTrace("table:%s, start to drop super table", pDrop->tableId); - code = mgmtDropSuperTable(newMsg, pDb, (SSuperTableObj *) pTable); - break; - case TSDB_CHILD_TABLE: - mTrace("table:%s, start to drop child table", pDrop->tableId); - code = mgmtDropChildTable(newMsg, (SChildTableObj *) pTable); - break; - case TSDB_NORMAL_TABLE: - mTrace("table:%s, start to drop normal table", pDrop->tableId); - code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable); - break; - case TSDB_STREAM_TABLE: - mTrace("table:%s, start to drop stream table", pDrop->tableId); - code = mgmtDropNormalTable(newMsg, (SNormalTableObj *) pTable); - break; - default: - code = TSDB_CODE_INVALID_TABLE_TYPE; - mError("table:%s, invalid table type:%d", pDrop->tableId, pTable->type); - } - - if (code != TSDB_CODE_SUCCESS) { - free(newMsg); - mgmtSendSimpleResp(pMsg->thandle, code); - } -} - -void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { - if (mgmtCheckRedirect(pMsg->thandle)) { - return; - } - - SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); - if (pUser == NULL) { - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); - return; - } - - SCMAlterTableMsg *pAlter = pMsg->pCont; - - int32_t code; - if (!pUser->writeAuth) { - code = TSDB_CODE_NO_RIGHTS; + if (pTable->type == TSDB_SUPER_TABLE) { + mTrace("table:%s, start to drop stable", pDrop->tableId); + mgmtDropSuperTable(pMsg, (SSuperTableObj *)pTable); } else { - pAlter->type = htons(pAlter->type); - pAlter->numOfCols = htons(pAlter->numOfCols); - - if (pAlter->numOfCols > 2) { - mError("table:%s error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); - code = TSDB_CODE_APP_ERROR; - } else { - SDbObj *pDb = mgmtGetDb(pAlter->db); - if (pDb) { - for (int32_t i = 0; i < pAlter->numOfCols; ++i) { - pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); - } - - code = mgmtAlterTable(pDb, pAlter); - if (code == 0) { - mLPrint("table:%s is altered by %s", pAlter->tableId, pUser->user); - } - } else { - code = TSDB_CODE_DB_NOT_SELECTED; - } - } + mTrace("table:%s, start to drop ctable", pDrop->tableId); + mgmtDropChildTable(pMsg, (SNormalTableObj *)pTable); } - - mgmtSendSimpleResp(pMsg->thandle, code); } -void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { - SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SDbObj* pDb = mgmtGetDbByTableId(pTable->tableId); - if (pDb == NULL || pDb->dirty) { - mError("table:%s, failed to get table meta, db not selected", pTable->tableId); - rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED; - rpcSendResponse(&rpcRsp); - return; - } - - SRpcConnInfo connInfo; - if (rpcGetConnInfo(thandle, &connInfo) != 0) { - mError("conn:%p is already released while get table meta", thandle); - return; - } - - bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); - - STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); - rpcRsp.code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); - - if (rpcRsp.code != TSDB_CODE_SUCCESS) { - rpcFreeCont(pMeta); - } else { - rpcRsp.pCont = pMeta; - rpcRsp.contLen = pMeta->contLen; - - pMeta->contLen = htons(pMeta->contLen); - } - - rpcSendResponse(&rpcRsp); -} - -void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { - SCMTableInfoMsg *pInfo = pMsg->pCont; - pInfo->createFlag = htons(pInfo->createFlag); - - SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); - if (pUser == NULL) { - mError("table:%s, failed to get table meta, invalid user", pInfo->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); - return; - } +static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { + SCMAlterTableMsg *pAlter = pMsg->pCont; + mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); - STableInfo *pTable = mgmtGetTable(pInfo->tableId); - if (pTable == NULL) { - if (pInfo->createFlag != 1) { - mError("table:%s, failed to get table meta, table not exist", pInfo->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); - return; - } else { - // on demand create table from super table if table does not exists - if (mgmtCheckRedirect(pMsg->thandle)) { - mError("table:%s, failed to create table while get meta info, need redirect message", pInfo->tableId); - return; - } - - int32_t contLen = sizeof(SCMCreateTableMsg) + sizeof(STagData); - SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); - if (pCreateMsg == NULL) { - mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); - return; - } - - memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); - strcpy(pCreateMsg->tableId, pInfo->tableId); - - mError("table:%s, start to create table while get meta info", pInfo->tableId); -// mgmtCreateTable(pCreateMsg, contLen, pMsg->thandle, true); - } - } else { - mgmtProcessGetTableMeta(pTable, pMsg->thandle); - } -} + if (mgmtCheckRedirect(pMsg->thandle)) return; -void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { - SRpcConnInfo connInfo; - if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { - mError("conn:%p is already released while get mulit table meta", pMsg->thandle); + if (!pMsg->pUser->writeAuth) { + mTrace("table:%s, failed to alter table, no rights", pAlter->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); return; } - bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); - SUserObj *pUser = mgmtGetUser(connInfo.user); - if (pUser == NULL) { - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER); + pMsg->pDb = mgmtGetDbByTableId(pTable->tableId); + if (pMsg->pDb == NULL || pMsg->pDb->dirty) { + mError("table:%s, failed to alter table, db not selected", pTable->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; } - SCMMultiTableInfoMsg *pInfo = pMsg->pCont; - pInfo->numOfTables = htonl(pInfo->numOfTables); - - int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice - SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen); - if (pMultiMeta == NULL) { - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + mError("table:%s, failed to alter table, its log db", pTable->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); return; } - pMultiMeta->contLen = sizeof(SMultiTableMeta); - pMultiMeta->numOfTables = 0; - - for (int t = 0; t < pInfo->numOfTables; ++t) { - char *tableId = (char*)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN); - STableInfo *pTable = mgmtGetTable(tableId); - if (pTable == NULL) continue; - - SDbObj *pDb = mgmtGetDbByTableId(tableId); - if (pDb == NULL) continue; - - int availLen = totalMallocLen - pMultiMeta->contLen; - if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS) { - //TODO realloc - //totalMallocLen *= 2; - //pMultiMeta = rpcReMalloc(pMultiMeta, totalMallocLen); - //if (pMultiMeta == NULL) { - /// rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - // return TSDB_CODE_SERV_OUT_OF_MEMORY; - //} else { - // t--; - // continue; - //} - } - - STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->metas + pMultiMeta->contLen); - int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); - if (code == TSDB_CODE_SUCCESS) { - pMultiMeta->numOfTables ++; - pMultiMeta->contLen += pMeta->contLen; - } - } - - SRpcMsg rpcRsp = {0}; - rpcRsp.handle = pMsg->thandle; - rpcRsp.pCont = pMultiMeta; - rpcRsp.contLen = pMultiMeta->contLen; - rpcSendResponse(&rpcRsp); -} - -void mgmtProcessSuperTableMetaMsg(SQueuedMsg *pMsg) { - SCMSuperTableInfoMsg *pInfo = pMsg->pCont; - STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); + STableInfo *pTable = mgmtGetTable(pAlter->tableId); if (pTable == NULL) { + mError("table:%s, failed to alter table, table not exist", pTable->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } - SCMSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); - if (pRsp != NULL) { - int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); - SRpcMsg rpcRsp = {0}; - rpcRsp.handle = pMsg->thandle; - rpcRsp.pCont = pRsp; - rpcRsp.contLen = msgLen; - rpcSendResponse(&rpcRsp); - } else { - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); - } -} - -static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { - if (rpcMsg->handle == NULL) return; - - SQueuedMsg *queueMsg = rpcMsg->handle; - queueMsg->received++; - - STableInfo *pTable = queueMsg->ahandle; - mTrace("table:%s, create table rsp received, thandle:%p ahandle:%p result:%s", pTable->tableId, queueMsg->thandle, - rpcMsg->handle, tstrerror(rpcMsg->code)); - - if (rpcMsg->code != TSDB_CODE_SUCCESS) { - if (pTable->type == TSDB_CHILD_TABLE) { - SSdbOperDesc oper = { - .type = SDB_OPER_TYPE_GLOBAL, - .table = tsChildTableSdb, - .pObj = pTable - }; - sdbDeleteRow(&oper); - } else if (pTable->type == TSDB_NORMAL_TABLE){ - SSdbOperDesc oper = { - .type = SDB_OPER_TYPE_GLOBAL, - .table = tsNormalTableSdb, - .pObj = pTable - }; - sdbDeleteRow(&oper); - } else {} - mError("table:%s, failed to create in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); - mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); - } else { - mTrace("table:%s, created in dnode", pTable->tableId); - if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) { - SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); - newMsg->msgType = queueMsg->msgType; - newMsg->thandle = queueMsg->thandle; - newMsg->pDb = queueMsg->pDb; - newMsg->pUser = queueMsg->pUser; - newMsg->contLen = queueMsg->contLen; - newMsg->pCont = rpcMallocCont(newMsg->contLen); - memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); - mTrace("table:%s, start to get meta", pTable->tableId); - mgmtAddToShellQueue(newMsg); - } else { - mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); - } - } - - free(queueMsg); -} - -static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { - mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); -} - -static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { - if (rpcMsg->handle == NULL) return; - - SQueuedMsg *queueMsg = rpcMsg->handle; - queueMsg->received++; - - STableInfo *pTable = queueMsg->ahandle; - mTrace("table:%s, drop table rsp received, thandle:%p result:%s", pTable->tableId, queueMsg->thandle, tstrerror(rpcMsg->code)); - - if (rpcMsg->code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to drop in dnode, reason:%s", pTable->tableId, tstrerror(rpcMsg->code)); - mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); - free(queueMsg); - return; - } - - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mError("table:%s, failed to get vgroup", pTable->tableId); - mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); - free(queueMsg); + pAlter->numOfCols = htons(pAlter->numOfCols); + if (pAlter->numOfCols > 2) { + mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_APP_ERROR); return; } - if (pTable->type == TSDB_CHILD_TABLE) { - SSdbOperDesc oper = { - .type = SDB_OPER_TYPE_GLOBAL, - .table = tsChildTableSdb, - .pObj = pTable - }; - int32_t code = sdbDeleteRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, update ctables sdb error", pTable->tableId); - mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); - free(queueMsg); - return; - } - } else if (pTable->type == TSDB_NORMAL_TABLE){ - SSdbOperDesc oper = { - .type = SDB_OPER_TYPE_GLOBAL, - .table = tsNormalTableSdb, - .pObj = pTable - }; - int32_t code = sdbDeleteRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, update ntables sdb error", pTable->tableId); - mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); - free(queueMsg); - return; - } + for (int32_t i = 0; i < pAlter->numOfCols; ++i) { + pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); } - if (pVgroup->numOfTables <= 0) { - mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); - mgmtDropVgroup(pVgroup, NULL); + if (pTable->type == TSDB_SUPER_TABLE) { + mTrace("table:%s, start to alter stable", pDrop->tableId); + mgmtAlterSuperTable(pMsg, (SSuperTableObj *)pTable); + } else { + mTrace("table:%s, start to alter ctable", pDrop->tableId); + mgmtAlterChildTable(pMsg, (SNormalTableObj *)pTable); } - - mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); - free(queueMsg); -} - -static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) { - mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); } -static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { - if (mgmtCheckRedirect(rpcMsg->handle)) return; +static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { + SCMTableInfoMsg *pInfo = mgmtGetTable(pInfo->tableId); + mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle); - SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) rpcMsg->pCont; - pCfg->dnode = htonl(pCfg->dnode); - pCfg->vnode = htonl(pCfg->vnode); - pCfg->sid = htonl(pCfg->sid); - mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); - - STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); - if (pTable == NULL) { - mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_TABLE); - return; - } - - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); - - SMDCreateTableMsg *pMDCreate = NULL; - if (pTable->type == TSDB_CHILD_TABLE) { - mTrace("table:%s, is a child table, vgroup:%d sid:%d", pTable->tableId, pCfg->vnode, pCfg->sid); - pMDCreate = mgmtBuildCreateChildTableMsg(NULL, (SChildTableObj *) pTable); - if (pMDCreate == NULL) { - return; - } - } else if (pTable->type == TSDB_NORMAL_TABLE) { - mTrace("table:%s, is a normal table, vgroup:%d sid:%d", pTable->tableId, pCfg->vnode, pCfg->sid); - pMDCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable); - if (pMDCreate == NULL) { - return; - } + STableInfo *pTable = mgmtGetTable(pInfo->tableId); + if (pTable == NULL || pTable->type != TSDB_SUPER_TABLE) { + mgmtGetChildTableMeta(pMsg, (SSuperTableObj *)pTable); } else { - mError("table:%s, invalid msg type, vgroup:%d sid:%d", pTable->tableId, pCfg->vnode, pCfg->sid); + mgmtGetSuperTableMeta(pMsg, (SNormalTableObj *)pTable); } - - SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); - SRpcMsg rpcRsp = { - .handle = NULL, - .pCont = pMDCreate, - .contLen = htonl(pMDCreate->contLen), - .code = 0, - .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE - }; - mgmtSendMsgToDnode(&ipSet, &rpcRsp); -} +} \ No newline at end of file diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 576b326669e6bdd8e29e505cdd350a31147ccf6d..8c72520563276e9caa66955fa3a9df6654aa8e96 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -310,9 +310,10 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void return numOfRows; } -SUserObj *mgmtGetUserFromConn(void *pConn) { +SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp) { SRpcConnInfo connInfo; if (rpcGetConnInfo(pConn, &connInfo) == 0) { + *usePublicIp = (connInfo.serverIp == tsPublicIpInt); return mgmtGetUser(connInfo.user); } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 9e52574b8c8890289f12770229675e99d46cb749..f0649dab81b8890a0e2929ca11992e32aba80b62 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -428,7 +428,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo return numOfRows; } -void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) { +void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] == NULL) { pVgroup->tableList[pTable->sid] = pTable; taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); @@ -439,7 +439,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) { mgmtAddVgroupIntoDbTail(pVgroup); } -void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) { +void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] != NULL) { pVgroup->tableList[pTable->sid] = NULL; taosFreeId(pVgroup->idPool, pTable->sid);