diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 22e84b521b094ec7b125f47b31e4126bfffd0551..831416bc296d81b89d22082732ea929cca2c035e 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2545,21 +2545,13 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { SSchema * pSchema; uint8_t ieType; - char *rsp = pSql->res.pRsp; - - ieType = *rsp; - if (ieType != TSDB_IE_TYPE_META) { - tscError("invalid ie type:%d", ieType); - return TSDB_CODE_INVALID_IE; - } - - rsp++; - pMeta = (STableMeta *)rsp; + pMeta = (STableMeta *)pSql->res.pRsp; pMeta->sid = htonl(pMeta->sid); pMeta->sversion = htons(pMeta->sversion); pMeta->vgid = htonl(pMeta->vgid); pMeta->uid = htobe64(pMeta->uid); + pMeta->contLen = htons(pMeta->contLen); if (pMeta->sid < 0 || pMeta->vgid < 0) { tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid); @@ -2583,8 +2575,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { } pMeta->rowSize = 0; - rsp += sizeof(STableMeta); - pSchema = (SSchema *)rsp; + pSchema = (SSchema *)(pSql->res.pRsp + sizeof(STableMeta)); int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags; for (int i = 0; i < numOfTotalCols; ++i) { @@ -2598,29 +2589,29 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { pSchema++; } - rsp += numOfTotalCols * sizeof(SSchema); - - int32_t tagLen = 0; - SSchema *pTagsSchema = tsGetTagSchema(pMeta); - - if (pMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) { - for (int32_t i = 0; i < pMeta->numOfTags; ++i) { - tagLen += pTagsSchema[i].bytes; - } - } - - rsp += tagLen; - int32_t size = (int32_t)(rsp - (char *)pMeta); +// rsp += numOfTotalCols * sizeof(SSchema); +// +// int32_t tagLen = 0; +// SSchema *pTagsSchema = tsGetTagSchema(pMeta); +// +// if (pMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) { +// for (int32_t i = 0; i < pMeta->numOfTags; ++i) { +// tagLen += pTagsSchema[i].bytes; +// } +// } +// +// rsp += tagLen; +// int32_t size = (int32_t)(rsp - (char *)pMeta); // pMeta->index = rand() % TSDB_VNODES_SUPPORT; - pMeta->index = 0; +// pMeta->index = 0; // todo add one more function: taosAddDataIfNotExists(); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); assert(pMeterMetaInfo->pMeterMeta == NULL); pMeterMetaInfo->pMeterMeta = (STableMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta, - size, tsMeterMetaKeepTimer); + pMeta->contLen, tsMeterMetaKeepTimer); // todo handle out of memory case if (pMeterMetaInfo->pMeterMeta == NULL) return 0; diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 013e64f9576ffca32123b1a27e15838c8ba95cc7..70ded6a8151fb049fd0a7bc23181fd9097c13cc7 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -47,11 +47,7 @@ extern void *tsMgmtTmr; extern void *tsMgmtTranQhandle; extern char tsMgmtDirectory[]; -extern int tsAcctUpdateSize; extern int tsDbUpdateSize; -extern int tsDnodeUpdateSize; -extern int tsMnodeUpdateSize; -extern int tsVgUpdateSize; typedef struct { uint32_t privateIp; @@ -102,7 +98,7 @@ typedef struct { } STableGid; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; uint64_t uid; int32_t sid; @@ -113,7 +109,7 @@ typedef struct { struct _vg_obj; typedef struct SSuperTableObj { - char tableId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; uint64_t uid; int32_t sid; @@ -130,35 +126,20 @@ typedef struct SSuperTableObj { } SSuperTableObj; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; uint64_t uid; int32_t sid; int32_t vgId; int64_t createdTime; - char superTableId[TSDB_TABLE_ID_LEN]; + char superTableId[TSDB_TABLE_ID_LEN + 1]; int8_t reserved[7]; int8_t updateEnd[1]; SSuperTableObj *superTable; } SChildTableObj; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; - int8_t type; - uint64_t uid; - int32_t sid; - int32_t vgId; - int64_t createdTime; - int32_t sversion; - int32_t numOfColumns; - int8_t reserved[3]; - int8_t updateEnd[1]; - int16_t nextColId; - SSchema* schema; -} SNormalTableObj; - -typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; uint64_t uid; int32_t sid; @@ -169,14 +150,14 @@ typedef struct { int16_t sqlLen; int8_t reserved[3]; int8_t updateEnd[1]; - int16_t nextColId; char* sql; //null-terminated string + int16_t nextColId; SSchema* schema; -} SStreamTableObj; +} SNormalTableObj; typedef struct _vg_obj { uint32_t vgId; - char dbName[TSDB_DB_NAME_LEN]; + char dbName[TSDB_DB_NAME_LEN + 1]; int64_t createdTime; uint64_t lastCreate; uint64_t lastRemove; @@ -194,7 +175,7 @@ typedef struct _vg_obj { } SVgObj; typedef struct _db_obj { - char name[TSDB_DB_NAME_LEN]; + char name[TSDB_DB_NAME_LEN + 1]; int64_t createdTime; SDbCfg cfg; int8_t dropStatus; @@ -213,9 +194,9 @@ typedef struct _db_obj { struct _acctObj; typedef struct _user_obj { - char user[TSDB_USER_LEN]; - char pass[TSDB_KEY_LEN]; - char acct[TSDB_USER_LEN]; + char user[TSDB_USER_LEN + 1]; + char pass[TSDB_KEY_LEN + 1]; + char acct[TSDB_USER_LEN + 1]; int64_t createdTime; int8_t superAuth; int8_t writeAuth; @@ -246,8 +227,8 @@ typedef struct { } SAcctInfo; typedef struct _acctObj { - char user[TSDB_USER_LEN]; - char pass[TSDB_KEY_LEN]; + char user[TSDB_USER_LEN + 1]; + char pass[TSDB_KEY_LEN + 1]; SAcctCfg cfg; int32_t acctId; int64_t createdTime; @@ -259,16 +240,9 @@ typedef struct _acctObj { pthread_mutex_t mutex; } SAcctObj; -typedef struct { - char spi; - char encrypt; - char secret[TSDB_KEY_LEN]; - char cipheringKey[TSDB_KEY_LEN]; -} SSecInfo; - typedef struct { int8_t type; - char db[TSDB_DB_NAME_LEN]; + char db[TSDB_DB_NAME_LEN + 1]; void * pNode; int16_t numOfColumns; int32_t rowSize; diff --git a/src/inc/sdb.h b/src/inc/sdb.h index d48adc5bbd0b1550a6aaafc5ee58e787ead3ad85..d0239522a9d2939cdd09374a5ad97937949d2400 100644 --- a/src/inc/sdb.h +++ b/src/inc/sdb.h @@ -136,11 +136,8 @@ int64_t sdbGetVersion(); int32_t sdbGetRunStatus(); -#define TSDB_MAX_TABLES 1000 -extern void* tsChildTableSdb; -extern void* tsNormalTableSdb; -extern void* tsStreamTableSdb; -extern void* tsSuperTableSdb; +#define TSDB_MAX_NORMAL_TABLES 10000 +#define TSDB_MAX_SUPER_TABLES 1000 #ifdef __cplusplus } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 900fee960d7f7d0aee3c92885edb10724b154c01..67efef10f2935493aa3a923f22da50ea0fa352ce 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -224,7 +224,7 @@ typedef struct { typedef struct SSchema { uint8_t type; - char name[TSDB_COL_NAME_LEN]; + char name[TSDB_COL_NAME_LEN + 1]; int16_t colId; int16_t bytes; } SSchema; @@ -247,8 +247,8 @@ typedef struct { } SDCreateTableMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; - char db[TSDB_DB_NAME_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; + char db[TSDB_DB_NAME_LEN + 1]; int8_t igExists; int16_t numOfTags; int16_t numOfColumns; @@ -258,14 +258,14 @@ typedef struct { } SCreateTableMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; - char db[TSDB_DB_NAME_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; + char db[TSDB_DB_NAME_LEN + 1]; int8_t igNotExists; } SDropTableMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; - char db[TSDB_DB_NAME_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; + char db[TSDB_DB_NAME_LEN + 1]; int16_t type; /* operation type */ char tagVal[TSDB_MAX_BYTES_PER_ROW]; int8_t numOfCols; /* number of schema */ @@ -275,11 +275,11 @@ typedef struct { typedef struct { char clientVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN]; - char db[TSDB_TABLE_ID_LEN]; + char db[TSDB_TABLE_ID_LEN + 1]; } SConnectMsg; typedef struct { - char acctId[TSDB_ACCT_LEN]; + char acctId[TSDB_ACCT_LEN + 1]; char serverVersion[TSDB_VERSION_LEN]; int8_t writeAuth; int8_t superAuth; @@ -301,24 +301,24 @@ typedef struct { } SAcctCfg; typedef struct { - char user[TSDB_USER_LEN]; - char pass[TSDB_KEY_LEN]; + char user[TSDB_USER_LEN + 1]; + char pass[TSDB_KEY_LEN + 1]; SAcctCfg cfg; } SCreateAcctMsg, SAlterAcctMsg; typedef struct { - char user[TSDB_USER_LEN]; + char user[TSDB_USER_LEN + 1]; } SDropUserMsg, SDropAcctMsg; typedef struct { - char user[TSDB_USER_LEN]; - char pass[TSDB_KEY_LEN]; + char user[TSDB_USER_LEN + 1]; + char pass[TSDB_KEY_LEN + 1]; int8_t privilege; int8_t flag; } SCreateUserMsg, SAlterUserMsg; typedef struct { - char db[TSDB_TABLE_ID_LEN]; + char db[TSDB_TABLE_ID_LEN + 1]; } SMgmtHead; typedef struct { @@ -444,7 +444,7 @@ typedef struct { int64_t nAggTimeInterval; // time interval for aggregation, in million second int64_t slidingTime; // value for sliding window - + // tag schema, used to parse tag information in pSidExtInfo uint64_t pTagSchema; @@ -517,8 +517,8 @@ typedef struct { * NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN / 4 */ typedef struct { - char acct[TSDB_USER_LEN]; - char db[TSDB_DB_NAME_LEN]; + char acct[TSDB_USER_LEN + 1]; + char db[TSDB_DB_NAME_LEN + 1]; uint32_t vgId; int32_t maxSessions; int32_t cacheBlockSize; @@ -543,7 +543,7 @@ typedef struct { } SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg; typedef struct { - char db[TSDB_TABLE_ID_LEN]; + char db[TSDB_TABLE_ID_LEN + 1]; uint8_t ignoreNotExists; } SDropDbMsg, SUseDbMsg; @@ -583,7 +583,7 @@ typedef struct { typedef struct { uint32_t destId; uint32_t destIp; - char tableId[TSDB_UNI_LEN]; + char tableId[TSDB_UNI_LEN + 1]; char empty[3]; uint8_t msgType; int32_t msgLen; @@ -615,7 +615,7 @@ typedef struct { } SVPeersMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; int16_t createFlag; char tags[]; } STableInfoMsg; @@ -626,7 +626,7 @@ typedef struct { } SMultiTableInfoMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; } SSuperTableInfoMsg; typedef struct { @@ -637,7 +637,7 @@ typedef struct { typedef struct { int16_t elemLen; - char tableId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN + 1]; int16_t orderIndex; int16_t orderType; // used in group by xx order by xxx @@ -678,7 +678,7 @@ typedef struct { } SSuperTableMeta; typedef struct STableMeta { - char tableId[TSDB_TABLE_ID_LEN]; // note: This field must be at the front + char tableId[TSDB_TABLE_ID_LEN + 1]; // note: This field must be at the front int32_t contLen; uint8_t numOfTags : 6; uint8_t precision : 2; @@ -701,7 +701,7 @@ typedef struct SMultiTableMeta { } SMultiTableMeta; typedef struct { - char name[TSDB_TABLE_ID_LEN]; + char name[TSDB_TABLE_ID_LEN + 1]; char data[TSDB_MAX_TAGS_LEN]; } STagData; @@ -712,7 +712,7 @@ typedef struct { */ typedef struct { int8_t type; - char db[TSDB_DB_NAME_LEN]; + char db[TSDB_DB_NAME_LEN + 1]; uint16_t payloadLen; char payload[]; } SShowMsg; @@ -741,14 +741,14 @@ typedef struct { } SCfgDnodeMsg; typedef struct { - char sql[TSDB_SHOW_SQL_LEN]; + char sql[TSDB_SHOW_SQL_LEN + 1]; uint32_t queryId; int64_t useconds; int64_t stime; } SQueryDesc; typedef struct { - char sql[TSDB_SHOW_SQL_LEN]; + char sql[TSDB_SHOW_SQL_LEN + 1]; uint32_t streamId; int64_t num; // number of computing/cycles int64_t useconds; @@ -781,7 +781,7 @@ typedef struct { } SHeartBeatRsp; typedef struct { - char queryId[TSDB_KILL_MSG_LEN]; + char queryId[TSDB_KILL_MSG_LEN + 1]; } SKillQueryMsg, SKillStreamMsg, SKillConnectionMsg; typedef struct { diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 78674e6c13d4b13617ee352d5cb3404a031730ea..94220b74da39ee046aa17736f9b567a3c0037b68 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -86,7 +86,7 @@ int main(int argc, char* argv[]) { { printf("=== this a test for debug usage\n"); void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0); - taos_query(taos, "create database db"); + taos_query(taos, "create table d1.c2 using d1.st2 tags(1)"); while (1) { sleep(1000); } diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index 20cb7e5e81a80090073a2f418a582d148008bfb9..816d916a2a6fa7ca0585a113d6d2255edbadb406 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -28,7 +28,6 @@ extern void *mgmtStatusTimer; int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup); int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup); -int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup); int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup); int mgmtSendVPeersMsg(SVgObj *pVgroup); diff --git a/src/mnode/inc/mgmtGrant.h b/src/mnode/inc/mgmtGrant.h index 96ecc4a3a300e725a04858bbf89c2a7c1f97872c..e68e6ae71e31e6d961cf3f3a2ce352cb9a46d16f 100644 --- a/src/mnode/inc/mgmtGrant.h +++ b/src/mnode/inc/mgmtGrant.h @@ -25,8 +25,8 @@ extern "C" { #include "mnode.h" extern bool (*mgmtCheckExpired)(); -extern void (*mgmtAddTimeSeries)(uint32_t timeSeriesNum); -extern void (*mgmtRestoreTimeSeries)(uint32_t timeseries); +extern void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum); +extern void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeseries); extern int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries); extern int32_t (*mgmtCheckUserGrant)(); extern int32_t (*mgmtCheckDbGrant)(); diff --git a/src/mnode/inc/mgmtStreamTable.h b/src/mnode/inc/mgmtStreamTable.h deleted file mode 100644 index 43d3341db13084f0c91ffc48b57e368fe4a54ec4..0000000000000000000000000000000000000000 --- a/src/mnode/inc/mgmtStreamTable.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TBASE_MNODE_STREAM_TABLE_H -#define TBASE_MNODE_STREAM_TABLE_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include "mnode.h" - -int32_t mgmtInitStreamTables(); -void mgmtCleanUpStreamTables(); - -void * mgmtGetStreamTable(char *tableId); - -int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); -int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable); -int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter); -int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup); - -int32_t mgmtGetStreamTableMeta(SDbObj *pDb, SStreamTableObj *pTable, STableMeta *pMeta, bool usePublicIp); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index b13f9b985155d508d6f262bfe6e50c6d283a55bd..65bdd076655cacc8d0e21b2ebc0b064030b3a146 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -25,16 +25,21 @@ extern "C" { #include "mnode.h" int32_t mgmtInitVgroups(); +void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); + SVgObj *mgmtCreateVgroup(SDbObj *pDb); int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup); -void mgmtSetVgroupIdPool(); +void mgmtUpdateVgroup(SVgObj *pVgroup); + int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); -void mgmtCleanUpVgroups(); -SVgObj *mgmtGetAvailVgroup(SDbObj *pDb); -int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup); +void mgmtSetVgroupIdPool(); +SVgObj *mgmtGetAvailVgroup(SDbObj *pDb, int32_t *sid); + +void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable); +void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index bae81a64c338322201b45051d810b2c4e293bf2b..a9454c689f363dfe6eb018e5a9bb24f60f550bfd 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -36,6 +36,7 @@ #include "mgmtVgroup.h" void *tsChildTableSdb; +int32_t tsChildTableUpdateSize; void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); @@ -61,6 +62,8 @@ static void mgmtChildTableActionInit() { } void *mgmtChildTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { + SChildTableObj *pTable = (SChildTableObj *) row; + memcpy(pTable, str, tsChildTableUpdateSize); return NULL; } @@ -102,10 +105,9 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss pTable->superTable = mgmtGetSuperTable(pTable->superTableId); mgmtAddTableIntoSuperTable(pTable->superTable); - pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1); - pVgroup->numOfTables++; - pDb->numOfTables++; - pVgroup->tableList[pTable->sid] = (STableInfo *) pTable; + mgmtAddTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); + mgmtAddTableIntoDb(pDb); + mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable); if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { mgmtMoveVgroupToTail(pDb, pVgroup); @@ -138,11 +140,9 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss return NULL; } - pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); - pVgroup->tableList[pTable->sid] = NULL; - pVgroup->numOfTables--; - pDb->numOfTables--; - taosFreeId(pVgroup->idPool, pTable->sid); + mgmtRestoreTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); + mgmtRemoveTableFromDb(pDb); + mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable); mgmtRemoveTableFromSuperTable(pTable->superTable); @@ -161,8 +161,8 @@ void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ss SChildTableObj *pTable = (SChildTableObj *) row; assert(row != NULL && str != NULL); - int32_t tsize = pTable->updateEnd - (int8_t *) pTable; - memcpy(str, pTable, tsize); + memcpy(str, pTable, tsChildTableUpdateSize); + *ssize = tsChildTableUpdateSize; return NULL; } @@ -170,18 +170,14 @@ void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ss void *mgmtChildTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { assert(str != NULL); - SChildTableObj *pTable = (SChildTableObj *)malloc(sizeof(SChildTableObj)); - if (pTable == NULL) { - return NULL; - } - memset(pTable, 0, sizeof(SChildTableObj)); + SChildTableObj *pTable = (SChildTableObj *)calloc(sizeof(SChildTableObj), 1); + if (pTable == NULL) return NULL; - int32_t tsize = pTable->updateEnd - (int8_t *)pTable; - if (size < tsize) { + if (size < tsChildTableUpdateSize) { mgmtDestroyChildTable(pTable); return NULL; } - memcpy(pTable, str, tsize); + memcpy(pTable, str, tsChildTableUpdateSize); return (void *)pTable; } @@ -199,8 +195,10 @@ int32_t mgmtInitChildTables() { SChildTableObj *pTable = NULL; mgmtChildTableActionInit(); + SChildTableObj tObj; + tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - tsChildTableSdb = sdbOpenTable(tsMaxTables, sizeof(SChildTableObj), + tsChildTableSdb = sdbOpenTable(tsMaxTables, tsChildTableUpdateSize, "ctables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtChildTableAction); if (tsChildTableSdb == NULL) { mError("failed to init child table data"); @@ -216,22 +214,63 @@ int32_t mgmtInitChildTables() { SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { - mError("super table:%s, failed to get db, discard it", pTable->tableId); + mError("ctable:%s, failed to get db, discard it", pTable->tableId); sdbDeleteRow(tsChildTableSdb, pTable); pNode = pLastNode; continue; } - mgmtAddTableIntoDb(pDb); - } + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid); + pTable->vgId = 0; + sdbDeleteRow(tsChildTableSdb, pTable); + pNode = pLastNode; + continue; + } - mgmtSetVgroupIdPool(); + if (strcmp(pVgroup->dbName, pDb->name) != 0) { + mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", + pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); + pTable->vgId = 0; + sdbDeleteRow(tsChildTableSdb, pTable); + pNode = pLastNode; + continue; + } + + if (pVgroup->tableList == NULL) { + mError("ctable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId); + pTable->vgId = 0; + sdbDeleteRow(tsChildTableSdb, pTable); + pNode = pLastNode; + continue; + } + + pVgroup->tableList[pTable->sid] = (STableInfo*)pTable; + taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1); + + SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTable->superTableId); + if (pSuperTable == NULL) { + mError("ctable:%s, stable:%s not exist", pTable->tableId, pTable->superTableId); + pTable->vgId = 0; + sdbDeleteRow(tsChildTableSdb, pTable); + pNode = pLastNode; + continue; + } + + pTable->superTable = pSuperTable; + mgmtAddTableIntoSuperTable(pSuperTable); + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + mgmtAddTimeSeries(pAcct, pTable->superTable->numOfColumns - 1); + } mTrace("child table is initialized"); return 0; } void mgmtCleanUpChildTables() { + sdbCloseTable(tsChildTableSdb); } int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) { @@ -285,6 +324,7 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr } strcpy(pTable->tableId, pCreate->tableId); strcpy(pTable->superTableId, pSuperTable->tableId); + pTable->type = TSDB_TABLE_TYPE_CHILD_TABLE; pTable->createdTime = taosGetTimestampMs(); pTable->superTable = pSuperTable; pTable->vgId = pVgroup->vgId; @@ -299,47 +339,35 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr mError("table:%s, corresponding super table schema is null", pCreate->tableId); return TSDB_CODE_INVALID_TABLE; } - memcpy(schema, pTagData + TSDB_TABLE_ID_LEN + 1, size); + // memcpy(schema, pTagData + TSDB_TABLE_ID_LEN + 1, size); if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { mError("table:%s, update sdb error", pCreate->tableId); return TSDB_CODE_SDB_ERROR; } - mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1); mgmtSendCreateTableMsg(pTable, pVgroup); mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); - mgmtAddTableIntoDb(pDb); return 0; } int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { - SVgObj *pVgroup; - SAcctObj *pAcct; - - pAcct = mgmtGetAcct(pDb->cfg.acct); - - if (pAcct != NULL) { - pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); - } - - pVgroup = mgmtGetVgroup(pTable->vgId); + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { return TSDB_CODE_OTHERS; } - mgmtRestoreTimeSeries(pTable->superTable->numOfColumns - 1); mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup); + sdbDeleteRow(tsChildTableSdb, pTable); if (pVgroup->numOfTables <= 0) { mgmtDropVgroup(pDb, pVgroup); } - mgmtRemoveTableFromDb(pDb); return 0; } diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 5c454f647ee7409b32e30276f0a99433b869fa1c..c34670330c3532751ddd69801545fc8518abadd6 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -463,7 +463,7 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { } if (pAlter->maxSessions > 0) { //rebuild meterList in mgmtVgroup.c - sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 0); + mgmtUpdateVgroup(pVgroup); } mgmtSendVPeersMsg(pVgroup); pVgroup = pVgroup->next; diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 2d93eb6ab9c21ffe0dab6b1f3bf17a82d9d42f4b..27e83abbcaa68598a50f377992931488ec27ce2e 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -32,7 +32,6 @@ #include "dnodeSystem.h" #include "mgmtChildTable.h" #include "mgmtNormalTable.h" -#include "mgmtStreamTable.h" void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); int mgmtSendVPeersMsg(SVgObj *pVgroup); @@ -234,29 +233,6 @@ int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) { return 0; } -int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) { -// uint64_t timeStamp = taosGetTimestampMs(); -// -// for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) { -// SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip); -// if (pObj == NULL) { -// continue; -// } -// -// int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000); -// if (pStart == NULL) { -// continue; -// } -// -// int8_t *pMsg = mgmtBuildCreateStreamTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode); -// int32_t msgLen = pMsg - pStart; -// -// mgmtSendMsgToDnode(pObj, pStart, msgLen); -// } -// -// pVgroup->lastCreate = timeStamp; - return 0; -} int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { // uint64_t timeStamp = taosGetTimestampMs(); diff --git a/src/mnode/src/mgmtGrant.c b/src/mnode/src/mgmtGrant.c index f3a8f608cb7a2c33f5eefad7d8a81723e6a6f859..37a0753c23eb77db07d054b6227f0457f2bcf289 100644 --- a/src/mnode/src/mgmtGrant.c +++ b/src/mnode/src/mgmtGrant.c @@ -25,11 +25,15 @@ int32_t (*mgmtCheckUserGrant)() = mgmtCheckUserGrantImp; int32_t mgmtCheckDbGrantImp() { return 0; } int32_t (*mgmtCheckDbGrant)() = mgmtCheckDbGrantImp; -void mgmtAddTimeSeriesImp(uint32_t timeSeriesNum) {} -void (*mgmtAddTimeSeries)(uint32_t timeSeriesNum) = mgmtAddTimeSeriesImp; - -void mgmtRestoreTimeSeriesImp(uint32_t timeSeriesNum) {} -void (*mgmtRestoreTimeSeries)(uint32_t timeSeriesNum) = mgmtRestoreTimeSeriesImp; +void mgmtAddTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) { + pAcct->acctInfo.numOfTimeSeries += timeSeriesNum; +} +void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtAddTimeSeriesImp; + +void mgmtRestoreTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) { + pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum; +} +void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtRestoreTimeSeriesImp; int32_t mgmtCheckTimeSeriesImp(uint32_t timeseries) { return 0; } int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries) = mgmtCheckTimeSeriesImp; diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 9411807aa4dfcb7ddfed6812530bd855a5d775fa..adb14560edb85f07c4a220a16298320223328502 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -36,6 +36,7 @@ #include "mgmtVgroup.h" void *tsNormalTableSdb; +int32_t tsNormalTableUpdateSize; void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); @@ -48,6 +49,7 @@ void *mgmtNormalTableActionDestroy(void *row, char *str, int32_t size, int32_t * static void mgmtDestroyNormalTable(SNormalTableObj *pTable) { free(pTable->schema); + free(pTable->sql); free(pTable); } @@ -63,8 +65,13 @@ static void mgmtNormalTableActionInit() { void *mgmtNormalTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { SNormalTableObj *pTable = (SNormalTableObj *) row; - int32_t tsize = pTable->updateEnd - (int8_t *) pTable; - memcpy(pTable, str, tsize); + memcpy(pTable, str, tsNormalTableUpdateSize); + + int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns) + pTable->sqlLen; + pTable->schema = realloc(pTable->schema, schemaSize); + pTable->sql = (char*)pTable->schema + sizeof(SSchema) * (pTable->numOfColumns); + memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize); + return NULL; } @@ -103,10 +110,9 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *s } } - pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); - pVgroup->numOfTables++; - pDb->numOfTables++; - pVgroup->tableList[pTable->sid] = (STableInfo *) pTable; + mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1); + mgmtAddTableIntoDb(pDb); + mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable); if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { mgmtMoveVgroupToTail(pDb, pVgroup); @@ -139,11 +145,9 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *s return NULL; } - pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); - pVgroup->tableList[pTable->sid] = NULL; - pVgroup->numOfTables--; - pDb->numOfTables--; - taosFreeId(pVgroup->idPool, pTable->sid); + mgmtRestoreTimeSeries(pAcct, pTable->numOfColumns - 1); + mgmtRemoveTableFromDb(pDb); + mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable); if (pVgroup->numOfTables > 0) { mgmtMoveVgroupToHead(pDb, pVgroup); @@ -160,16 +164,16 @@ void *mgmtNormalTableActionEncode(void *row, char *str, int32_t size, int32_t *s SNormalTableObj *pTable = (SNormalTableObj *) row; assert(row != NULL && str != NULL); - int32_t tsize = pTable->updateEnd - (int8_t *) pTable; int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - if (size < tsize + schemaSize + 1) { + if (size < tsNormalTableUpdateSize + schemaSize + 1) { *ssize = -1; return NULL; } - memcpy(str, pTable, tsize); - memcpy(str + tsize, pTable->schema, schemaSize); - *ssize = tsize + schemaSize; + memcpy(str, pTable, tsNormalTableUpdateSize); + memcpy(str + tsNormalTableUpdateSize, pTable->schema, schemaSize); + memcpy(str + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen); + *ssize = tsNormalTableUpdateSize + schemaSize + pTable->sqlLen; return NULL; } @@ -183,12 +187,11 @@ void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *s } memset(pTable, 0, sizeof(SNormalTableObj)); - int32_t tsize = pTable->updateEnd - (int8_t *)pTable; - if (size < tsize) { + if (size < tsNormalTableUpdateSize) { mgmtDestroyNormalTable(pTable); return NULL; } - memcpy(pTable, str, tsize); + memcpy(pTable, str, tsNormalTableUpdateSize); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); pTable->schema = (SSchema *)malloc(schemaSize); @@ -197,7 +200,14 @@ void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *s return NULL; } - memcpy(pTable->schema, str + tsize, schemaSize); + memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize); + + pTable->sql = (char *)malloc(pTable->sqlLen); + if (pTable->sql == NULL) { + mgmtDestroyNormalTable(pTable); + return NULL; + } + memcpy(pTable->sql, str + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen); return (void *)pTable; } @@ -211,37 +221,69 @@ void *mgmtNormalTableAction(char action, void *row, char *str, int32_t size, int int32_t mgmtInitNormalTables() { void *pNode = NULL; void *pLastNode = NULL; - SChildTableObj *pTable = NULL; + SNormalTableObj *pTable = NULL; mgmtNormalTableActionInit(); + SNormalTableObj tObj; + tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsNormalTableSdb = sdbOpenTable(tsMaxTables, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, "ntables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtNormalTableAction); if (tsNormalTableSdb == NULL) { - mError("failed to init normal table data"); + mError("failed to init ntables data"); return -1; } - pNode = NULL; while (1) { + pLastNode = pNode; pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable); - if (pTable == NULL) { - break; - } + if (pTable == NULL) break; SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL) { - mError("normal table:%s, failed to get db, discard it", pTable->tableId); + mError("ntable:%s, failed to get db, discard it", pTable->tableId); sdbDeleteRow(tsNormalTableSdb, pTable); pNode = pLastNode; continue; } - mgmtAddTableIntoDb(pDb); - } - mgmtSetVgroupIdPool(); + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("ntable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid); + pTable->vgId = 0; + sdbDeleteRow(tsNormalTableSdb, pTable); + pNode = pLastNode; + continue; + } + + if (strcmp(pVgroup->dbName, pDb->name) != 0) { + mError("ntable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", + pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); + pTable->vgId = 0; + sdbDeleteRow(tsNormalTableSdb, pTable); + pNode = pLastNode; + continue; + } + + if (pVgroup->tableList == NULL) { + mError("ntable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId); + pTable->vgId = 0; + sdbDeleteRow(tsNormalTableSdb, pTable); + pNode = pLastNode; + continue; + } + + mgmtAddTableIntoVgroup(pVgroup, pTable); + //pVgroup->tableList[pTable->sid] = (STableInfo*)pTable; + taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1); + + pTable->sql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns; - mTrace("normal table is initialized"); + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1); + } + + mTrace("ntables is initialized"); return 0; } @@ -278,9 +320,9 @@ int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) { } int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { - int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb); - if (numOfTables >= TSDB_MAX_TABLES) { - mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_TABLES); + int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb); + if (numOfTables >= TSDB_MAX_NORMAL_TABLES) { + mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES); return TSDB_CODE_TOO_MANY_TABLES; } @@ -290,6 +332,7 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg } strcpy(pTable->tableId, pCreate->tableId); + pTable->type = TSDB_TABLE_TYPE_NORMAL_TABLE; pTable->createdTime = taosGetTimestampMs(); pTable->vgId = pVgroup->vgId; pTable->sid = sid; @@ -297,13 +340,12 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg pTable->sversion = 0; pTable->numOfColumns = pCreate->numOfColumns; - int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags; + int32_t numOfCols = pCreate->numOfColumns; int32_t schemaSize = numOfCols * sizeof(SSchema); pTable->schema = (SSchema *) calloc(1, schemaSize); if (pTable->schema == NULL) { free(pTable); - mError("table:%s, no schema input", pCreate->tableId); - return TSDB_CODE_INVALID_TABLE; + return TSDB_CODE_SERV_OUT_OF_MEMORY; } memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); @@ -313,50 +355,46 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg tschema[col].colId = pTable->nextColId++; } + pTable->sqlLen = pCreate->sqlLen; + if (pTable->sqlLen != 0) { + pTable->type = TSDB_TABLE_TYPE_STREAM_TABLE; + pTable->sql = calloc(1, pTable->sqlLen); + if (pTable->sql == NULL) { + free(pTable); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen); + pTable->sql[pCreate->sqlLen - 1] = 0; + mTrace("table:%s, stream sql len:%d sql:%s", pCreate->tableId, pCreate->sqlLen, pTable->sql); + } + if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) { mError("table:%s, update sdb error", pCreate->tableId); return TSDB_CODE_SDB_ERROR; } - mgmtAddTimeSeries(pTable->numOfColumns - 1); - mgmtSendCreateNormalTableMsg(pTable, pVgroup); - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" - PRIu64 - " db:%s", + mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); - mgmtAddTableIntoDb(pDb); return 0; } int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { - SVgObj *pVgroup; - SAcctObj *pAcct; - - pAcct = mgmtGetAcct(pDb->cfg.acct); - - if (pAcct != NULL) { - pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); - } - - pVgroup = mgmtGetVgroup(pTable->vgId); + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { return TSDB_CODE_OTHERS; } - mgmtRestoreTimeSeries(pTable->numOfColumns - 1); - mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup); - sdbDeleteRow(tsChildTableSdb, pTable); + sdbDeleteRow(tsNormalTableSdb, pTable); if (pVgroup->numOfTables <= 0) { mgmtDropVgroup(pDb, pVgroup); } - mgmtRemoveTableFromDb(pDb); return 0; } diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index f702ed15c0df4135de230c32ce0cdd515d4090f0..c55d3279faf3fbdbe97c9a52a96a66a996f850ce 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -33,7 +33,6 @@ #include "mgmtNormalTable.h" #include "mgmtProfile.h" #include "mgmtShell.h" -#include "mgmtStreamTable.h" #include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtUser.h" @@ -180,7 +179,7 @@ int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS); int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); - if (code == TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS) { rpcFreeCont(pMeta); rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0); } else { @@ -253,7 +252,7 @@ int32_t mgmtProcessSuperTableMetaMsg(void *pCont, int32_t contLen, void *ahandle SRpcConnInfo connInfo; rpcGetConnInfo(ahandle, &connInfo); - bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); +// bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); SSuperTableInfoMsg *pInfo = pCont; STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); @@ -810,7 +809,9 @@ int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { } } - rpcSendResponse(ahandle, code, NULL, 0); + if (code != TSDB_CODE_ACTION_IN_PROGRESS) { + rpcSendResponse(ahandle, code, NULL, 0); + } return code; } diff --git a/src/mnode/src/mgmtStreamTable.c b/src/mnode/src/mgmtStreamTable.c deleted file mode 100644 index aaceb24d296d3b2c25a603f716875d7690ed9ae0..0000000000000000000000000000000000000000 --- a/src/mnode/src/mgmtStreamTable.c +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "taosmsg.h" -#include "tast.h" -#include "textbuffer.h" -#include "tschemautil.h" -#include "tscompression.h" -#include "tskiplist.h" -#include "tsqlfunction.h" -#include "ttime.h" -#include "tstatus.h" -#include "tutil.h" -#include "mnode.h" -#include "mgmtAcct.h" -#include "mgmtDb.h" -#include "mgmtDnodeInt.h" -#include "mgmtGrant.h" -#include "mgmtStreamTable.h" -#include "mgmtSuperTable.h" -#include "mgmtTable.h" -#include "mgmtVgroup.h" - -void *tsStreamTableSdb; -void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); - -void *mgmtStreamTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtStreamTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtStreamTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtStreamTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtStreamTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtStreamTableActionReset(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtStreamTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); - -static void mgmtDestroyStreamTable(SStreamTableObj *pTable) { - free(pTable->schema); - free(pTable->sql); - free(pTable); -} - -static void mgmtStreamTableActionInit() { - mgmtStreamTableActionFp[SDB_TYPE_INSERT] = mgmtStreamTableActionInsert; - mgmtStreamTableActionFp[SDB_TYPE_DELETE] = mgmtStreamTableActionDelete; - mgmtStreamTableActionFp[SDB_TYPE_UPDATE] = mgmtStreamTableActionUpdate; - mgmtStreamTableActionFp[SDB_TYPE_ENCODE] = mgmtStreamTableActionEncode; - mgmtStreamTableActionFp[SDB_TYPE_DECODE] = mgmtStreamTableActionDecode; - mgmtStreamTableActionFp[SDB_TYPE_RESET] = mgmtStreamTableActionReset; - mgmtStreamTableActionFp[SDB_TYPE_DESTROY] = mgmtStreamTableActionDestroy; -} - -void *mgmtStreamTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { - SStreamTableObj *pTable = (SStreamTableObj *) row; - int32_t tsize = pTable->updateEnd - (int8_t *) pTable; - memcpy(pTable, str, tsize); - - int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - pTable->schema = (SSchema *) realloc(pTable->schema, schemaSize); - memcpy(pTable->schema, str + tsize, schemaSize); - - pTable->sql = (char *) realloc(pTable->sql, pTable->sqlLen); - memcpy(pTable->sql, str + tsize + schemaSize, pTable->sqlLen); - return NULL; -} - -void *mgmtStreamTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { - SStreamTableObj *pTable = (SStreamTableObj *)row; - mgmtDestroyStreamTable(pTable); - return NULL; -} - -void *mgmtStreamTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { - SNormalTableObj *pTable = (SNormalTableObj *) row; - - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); - return NULL; - } - - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); - return NULL; - } - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("account not exists"); - return NULL; - } - - if (!sdbMaster) { - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid != pTable->sid) { - mError("sid:%d is not matched from the master:%d", sid, pTable->sid); - return NULL; - } - } - - pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); - pVgroup->numOfTables++; - pDb->numOfTables++; - pVgroup->tableList[pTable->sid] = (STableInfo *) pTable; - - if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { - mgmtMoveVgroupToTail(pDb, pVgroup); - } - - return NULL; -} - -void *mgmtStreamTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { - SNormalTableObj *pTable = (SNormalTableObj *) row; - if (pTable->vgId == 0) { - return NULL; - } - - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); - return NULL; - } - - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); - return NULL; - } - - SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("account not exists"); - return NULL; - } - - pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); - pVgroup->tableList[pTable->sid] = NULL; - pVgroup->numOfTables--; - pDb->numOfTables--; - taosFreeId(pVgroup->idPool, pTable->sid); - - if (pVgroup->numOfTables > 0) { - mgmtMoveVgroupToHead(pDb, pVgroup); - } - - return NULL; -} - -void *mgmtStreamTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { - return mgmtStreamTableActionReset(row, str, size, NULL); -} - -void *mgmtStreamTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { - SStreamTableObj *pTable = (SStreamTableObj *) row; - assert(row != NULL && str != NULL); - - int32_t tsize = pTable->updateEnd - (int8_t *) pTable; - int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - if (size < tsize + schemaSize + pTable->sqlLen + 1) { - *ssize = -1; - return NULL; - } - - memcpy(str, pTable, tsize); - memcpy(str + tsize, pTable->schema, schemaSize); - memcpy(str + tsize + schemaSize, pTable->sql, pTable->sqlLen); - *ssize = tsize + schemaSize + pTable->sqlLen; - - return NULL; -} - -void *mgmtStreamTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { - assert(str != NULL); - - SStreamTableObj *pTable = (SStreamTableObj *)malloc(sizeof(SNormalTableObj)); - if (pTable == NULL) { - return NULL; - } - memset(pTable, 0, sizeof(SStreamTableObj)); - - int32_t tsize = pTable->updateEnd - (int8_t *)pTable; - if (size < tsize) { - mgmtDestroyStreamTable(pTable); - return NULL; - } - memcpy(pTable, str, tsize); - - int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - pTable->schema = (SSchema *)malloc(schemaSize); - if (pTable->schema == NULL) { - mgmtDestroyStreamTable(pTable); - return NULL; - } - memcpy(pTable->schema, str + tsize, schemaSize); - - pTable->sql = (char *)malloc(pTable->sqlLen); - if (pTable->sql == NULL) { - mgmtDestroyStreamTable(pTable); - return NULL; - } - memcpy(pTable->sql, str + tsize + schemaSize, pTable->sqlLen); - return (void *)pTable; -} - -void *mgmtStreamTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { - if (mgmtStreamTableActionFp[(uint8_t)action] != NULL) { - return (*(mgmtStreamTableActionFp[(uint8_t)action]))(row, str, size, ssize); - } - return NULL; -} - -int32_t mgmtInitStreamTables() { - void *pNode = NULL; - void *pLastNode = NULL; - SChildTableObj *pTable = NULL; - - mgmtStreamTableActionInit(); - - tsStreamTableSdb = sdbOpenTable(tsMaxTables, sizeof(SStreamTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN, - "streams", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtStreamTableAction); - if (tsStreamTableSdb == NULL) { - mError("failed to init stream table data"); - return -1; - } - - pNode = NULL; - while (1) { - pNode = sdbFetchRow(tsStreamTableSdb, pNode, (void **)&pTable); - if (pTable == NULL) { - break; - } - - SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); - if (pDb == NULL) { - mError("stream table:%s, failed to get db, discard it", pTable->tableId); - sdbDeleteRow(tsStreamTableSdb, pTable); - pNode = pLastNode; - continue; - } - mgmtAddTableIntoDb(pDb); - } - - mgmtSetVgroupIdPool(); - - mTrace("stream table is initialized"); - return 0; -} - -void mgmtCleanUpStreamTables() { -} - -int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) { -// SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg; -// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); -// pCreateTable->vnode = htonl(vnode); -// pCreateTable->sid = htonl(pTable->sid); -// pCreateTable->uid = pTable->uid; -// pCreateTable->createdTime = htobe64(pTable->createdTime); -// pCreateTable->sversion = htonl(pTable->sversion); -// pCreateTable->numOfColumns = htons(pTable->numOfColumns); -// //pCreateTable->sqlLen = htons(pTable->sqlLen); -// -// SSchema *pSchema = pTable->schema; -// int32_t totalCols = pCreateTable->numOfColumns; - -// for (int32_t col = 0; col < totalCols; ++col) { -// SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; -// colData->type = pSchema[col].type; -// colData->bytes = htons(pSchema[col].bytes); -// colData->colId = htons(pSchema[col].colId); -// } - -// int32_t totalColsSize = sizeof(SMColumn *) * totalCols; -// pMsg = pCreateTable->data + totalColsSize + pTable->sqlLen; - -// char *sql = pTable->schema + pTable->schemaSize; -// memcpy(pCreateTable->data + totalColsSize, pTable->sqlLen, sql); - -// return pMsg; - return NULL; -} - -int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { - int32_t numOfTables = sdbGetNumOfRows(tsStreamTableSdb); - if (numOfTables >= TSDB_MAX_TABLES) { - mError("stream table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_TABLES); - return TSDB_CODE_TOO_MANY_TABLES; - } - - SStreamTableObj *pTable = (SStreamTableObj *) calloc(sizeof(SStreamTableObj), 1); - if (pTable == NULL) { - return TSDB_CODE_SERV_OUT_OF_MEMORY; - } - - strcpy(pTable->tableId, pCreate->tableId); - pTable->createdTime = taosGetTimestampMs(); - pTable->vgId = pVgroup->vgId; - pTable->sid = sid; - pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); - pTable->sversion = 0; - pTable->numOfColumns = pCreate->numOfColumns; - - int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags; - int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - pTable->schema = (SSchema *) calloc(1, schemaSize); - if (pTable->schema == NULL) { - free(pTable); - mError("table:%s, no schema input", pCreate->tableId); - return TSDB_CODE_INVALID_TABLE; - } - memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); - - pTable->nextColId = 0; - for (int32_t col = 0; col < pCreate->numOfColumns; col++) { - SSchema *tschema = (SSchema *) pTable->schema; - tschema[col].colId = pTable->nextColId++; - } - - pTable->sql = (char*)(pTable->schema + numOfCols * sizeof(SSchema)); - memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen); - pTable->sql[pCreate->sqlLen - 1] = 0; - mTrace("table:%s, stream sql len:%d sql:%s", pCreate->tableId, pCreate->sqlLen, pTable->sql); - - if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) { - mError("table:%s, update sdb error", pCreate->tableId); - return TSDB_CODE_SDB_ERROR; - } - - mgmtAddTimeSeries(pTable->numOfColumns - 1); - - mgmtSendCreateStreamTableMsg(pTable, pVgroup); - - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" - PRIu64 - " db:%s", - pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); - - mgmtAddTableIntoDb(pDb); - return 0; -} - -int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) { - SVgObj * pVgroup; - SAcctObj *pAcct; - - pAcct = mgmtGetAcct(pDb->cfg.acct); - - if (pAcct != NULL) { - pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); - } - - pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - return TSDB_CODE_OTHERS; - } - - mgmtRestoreTimeSeries(pTable->numOfColumns - 1); - - mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup); - - sdbDeleteRow(tsChildTableSdb, pTable); - - if (pVgroup->numOfTables <= 0) { - mgmtDropVgroup(pDb, pVgroup); - } - - mgmtRemoveTableFromDb(pDb); - return 0; -} - -void* mgmtGetStreamTable(char *tableId) { - return sdbGetRow(tsStreamTableSdb, tableId); -} - -static int32_t mgmtSetSchemaFromStreamTable(SSchema *pSchema, SStreamTableObj *pTable) { - int32_t numOfCols = pTable->numOfColumns; - for (int32_t i = 0; i < numOfCols; ++i) { - strcpy(pSchema->name, pTable->schema[i].name); - pSchema->type = pTable->schema[i].type; - pSchema->bytes = htons(pTable->schema[i].bytes); - pSchema->colId = htons(pTable->schema[i].colId); - pSchema++; - } - - return numOfCols * sizeof(SSchema); -} - -int32_t mgmtGetStreamTableMeta(SDbObj *pDb, SStreamTableObj *pTable, STableMeta *pMeta, bool usePublicIp) { - pMeta->uid = htobe64(pTable->uid); - pMeta->sid = htonl(pTable->sid); - pMeta->vgid = htonl(pTable->vgId); - pMeta->sversion = htons(pTable->sversion); - pMeta->precision = pDb->cfg.precision; - pMeta->numOfTags = 0; - pMeta->numOfColumns = htons(pTable->numOfColumns); - pMeta->tableType = pTable->type; - pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromStreamTable(pMeta->schema, pTable); - - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - return TSDB_CODE_INVALID_TABLE; - } - for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - if (usePublicIp) { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } else { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } - } - - return TSDB_CODE_SUCCESS; -} diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 894ac7beba3ca4a60ac4b0e0840dd2b4a5815708..d420391d73d6bd116a86cfceda6a1073c8272348 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -37,17 +37,17 @@ #include "mgmtUser.h" #include "mgmtVgroup.h" -void *tsSuperTableSdb; -int32_t tsSuperTableUpdateSize; -void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); - -void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtSuperTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); +static void *tsSuperTableSdb; +static int32_t tsSuperTableUpdateSize; + +static void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtSuperTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); static void mgmtDestroySuperTable(SSuperTableObj *pTable) { free(pTable->schema); @@ -55,22 +55,25 @@ static void mgmtDestroySuperTable(SSuperTableObj *pTable) { } static void mgmtSuperTableActionInit() { - mgmtSuperTableActionFp[SDB_TYPE_INSERT] = mgmtSuperTableActionInsert; - mgmtSuperTableActionFp[SDB_TYPE_DELETE] = mgmtSuperTableActionDelete; - mgmtSuperTableActionFp[SDB_TYPE_UPDATE] = mgmtSuperTableActionUpdate; - mgmtSuperTableActionFp[SDB_TYPE_ENCODE] = mgmtSuperTableActionEncode; - mgmtSuperTableActionFp[SDB_TYPE_DECODE] = mgmtSuperTableActionDecode; - mgmtSuperTableActionFp[SDB_TYPE_RESET] = mgmtSuperTableActionReset; + SSuperTableObj tObj; + tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; + + mgmtSuperTableActionFp[SDB_TYPE_INSERT] = mgmtSuperTableActionInsert; + mgmtSuperTableActionFp[SDB_TYPE_DELETE] = mgmtSuperTableActionDelete; + mgmtSuperTableActionFp[SDB_TYPE_UPDATE] = mgmtSuperTableActionUpdate; + mgmtSuperTableActionFp[SDB_TYPE_ENCODE] = mgmtSuperTableActionEncode; + mgmtSuperTableActionFp[SDB_TYPE_DECODE] = mgmtSuperTableActionDecode; + mgmtSuperTableActionFp[SDB_TYPE_RESET] = mgmtSuperTableActionReset; mgmtSuperTableActionFp[SDB_TYPE_DESTROY] = mgmtSuperTableActionDestroy; } void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { SSuperTableObj *pTable = (SSuperTableObj *) row; - memcpy(pTable, str, tsDbUpdateSize); + memcpy(pTable, str, tsSuperTableUpdateSize); int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags); pTable->schema = realloc(pTable->schema, schemaSize); - memcpy(pTable->schema, str + tsDbUpdateSize, schemaSize); + memcpy(pTable->schema, str + tsSuperTableUpdateSize, schemaSize); return NULL; } @@ -82,10 +85,20 @@ void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *s } void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { + STableInfo *pTable = (STableInfo *) row; + SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); + if (pDb) { + mgmtAddSuperTableIntoDb(pDb); + } return NULL; } void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { + STableInfo *pTable = (STableInfo *) row; + SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); + if (pDb) { + mgmtRemoveSuperTableFromDb(pDb); + } return NULL; } @@ -114,7 +127,7 @@ void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ss void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { assert(str != NULL); - SSuperTableObj *pTable = (SSuperTableObj *)malloc(sizeof(SSuperTableObj)); + SSuperTableObj *pTable = (SSuperTableObj *) malloc(sizeof(SSuperTableObj)); if (pTable == NULL) { return NULL; } @@ -134,35 +147,33 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ss } memcpy(pTable->schema, str + tsSuperTableUpdateSize, schemaSize); - return (void *)pTable; + return (void *) pTable; } void *mgmtSuperTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { - if (mgmtSuperTableActionFp[(uint8_t)action] != NULL) { - return (*(mgmtSuperTableActionFp[(uint8_t)action]))(row, str, size, ssize); + if (mgmtSuperTableActionFp[(uint8_t) action] != NULL) { + return (*(mgmtSuperTableActionFp[(uint8_t) action]))(row, str, size, ssize); } return NULL; } int32_t mgmtInitSuperTables() { - void * pNode = NULL; - void * pLastNode = NULL; - SSuperTableObj * pTable = NULL; + void *pNode = NULL; + void *pLastNode = NULL; + SSuperTableObj *pTable = NULL; mgmtSuperTableActionInit(); - SSuperTableObj tObj; - tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj; tsSuperTableSdb = sdbOpenTable(tsMaxTables, tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS, "stables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction); if (tsSuperTableSdb == NULL) { - mError("failed to init super table data"); + mError("failed to init stables data"); return -1; } pNode = NULL; while (1) { - pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable); + pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **) &pTable); if (pTable == NULL) { break; } @@ -178,9 +189,7 @@ int32_t mgmtInitSuperTables() { mgmtAddSuperTableIntoDb(pDb); } - mgmtSetVgroupIdPool(); - - mTrace("super table is initialized"); + mTrace("stables is initialized"); return 0; } @@ -190,8 +199,8 @@ void mgmtCleanUpSuperTables() { int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) { int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb); - if (numOfTables >= TSDB_MAX_TABLES) { - mError("super table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_TABLES); + if (numOfTables >= TSDB_MAX_SUPER_TABLES) { + mError("stable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_SUPER_TABLES); return TSDB_CODE_TOO_MANY_TABLES; } @@ -201,6 +210,7 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) { } strcpy(pStable->tableId, pCreate->tableId); + pStable->type = TSDB_TABLE_TYPE_SUPER_TABLE; pStable->createdTime = taosGetTimestampMs(); pStable->vgId = 0; pStable->sid = 0; @@ -214,7 +224,7 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) { pStable->schema = (SSchema *)calloc(1, schemaSize); if (pStable->schema == NULL) { free(pStable); - mError("table:%s, no schema input", pCreate->tableId); + mError("stable:%s, no schema input", pCreate->tableId); return TSDB_CODE_INVALID_TABLE; } memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); @@ -230,7 +240,6 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) { return TSDB_CODE_SDB_ERROR; } - mgmtAddSuperTableIntoDb(pDb); return TSDB_CODE_SUCCESS; } @@ -634,20 +643,6 @@ int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMeta *p pMeta->tableType = pTable->type; pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable); - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - return TSDB_CODE_INVALID_TABLE; - } - for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - if (usePublicIp) { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } else { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); - } - } - return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index d92c85abfffd99d4c7e7b651d21b457dd7cd1c27..b9e80529026121a77a150a69fab57fd55e3b1ac9 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -33,11 +33,14 @@ #include "mgmtDnodeInt.h" #include "mgmtGrant.h" #include "mgmtNormalTable.h" -#include "mgmtStreamTable.h" #include "mgmtSuperTable.h" #include "mgmtTable.h" +#include "mgmtUser.h" #include "mgmtVgroup.h" +extern void *tsNormalTableSdb; +extern void *tsChildTableSdb; + int32_t mgmtInitTables() { int32_t code = mgmtInitSuperTables(); if (code != TSDB_CODE_SUCCESS) { @@ -49,16 +52,13 @@ int32_t mgmtInitTables() { return code; } - code = mgmtInitStreamTables(); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - code = mgmtInitChildTables(); if (code != TSDB_CODE_SUCCESS) { return code; } + mgmtSetVgroupIdPool(); + return TSDB_CODE_SUCCESS; } @@ -73,12 +73,7 @@ STableInfo* mgmtGetTable(char *tableId) { return tableInfo; } - tableInfo = (STableInfo *) mgmtGetStreamTable(tableId); - if (tableInfo != NULL) { - return tableInfo; - } - - tableInfo = (STableInfo *) mgmtGetNormalTable(tableId); + tableInfo = (STableInfo *) mgmtGetChildTable(tableId); if (tableInfo != NULL) { return tableInfo; } @@ -102,8 +97,6 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) { int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp) { if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { mgmtGetChildTableMeta(pDb, (SChildTableObj *) pTable, pMeta, usePublicIp); - } else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) { - mgmtGetStreamTableMeta(pDb, (SStreamTableObj *) pTable, pMeta, usePublicIp); } else if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { mgmtGetNormalTableMeta(pDb, (SNormalTableObj *) pTable, pMeta, usePublicIp); } else if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) { @@ -147,22 +140,17 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { return grantCode; } - SVgObj *pVgroup = mgmtGetAvailVgroup(pDb); + int32_t sid; + SVgObj *pVgroup = mgmtGetAvailVgroup(pDb, &sid); if (pVgroup == NULL) { - return terrno; - } - - int32_t sid = mgmtAllocateSid(pDb, pVgroup); - if (sid < 0) { - return terrno; - } - - if (pCreate->numOfColumns == 0) { - return mgmtCreateChildTable(pDb, pCreate, pVgroup, sid); - } else if (pCreate->sqlLen > 0) { - return mgmtCreateStreamTable(pDb, pCreate, pVgroup, sid); + // process it in a callback function + return TSDB_CODE_ACTION_IN_PROGRESS; } else { - return mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid); + if (pCreate->numOfColumns == 0) { + return mgmtCreateChildTable(pDb, pCreate, pVgroup, sid); + } else { + return mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid); + } } } else { return mgmtCreateSuperTable(pDb, pCreate); @@ -188,8 +176,6 @@ int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); case TSDB_TABLE_TYPE_CHILD_TABLE: return mgmtDropChildTable(pDb, (SChildTableObj *) pTable); - case TSDB_TABLE_TYPE_STREAM_TABLE: - return mgmtDropStreamTable(pDb, (SStreamTableObj *) pTable); case TSDB_TABLE_TYPE_NORMAL_TABLE: return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); default: @@ -242,7 +228,6 @@ int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) { void mgmtCleanUpMeters() { mgmtCleanUpNormalTables(); - mgmtCleanUpStreamTables(); mgmtCleanUpChildTables(); mgmtCleanUpSuperTables(); } @@ -306,116 +291,102 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_ } int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + SDbObj *pDb = mgmtGetDb(pShow->db); + if (pDb == NULL) return 0; + + SUserObj *pUser = mgmtGetUserFromConn(pConn); + if (pUser == NULL) return 0; + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && + strcmp(pUser->user, "monitor") != 0) { + return 0; + } + } + int32_t numOfRows = 0; -// int32_t numOfRead = 0; -// int32_t cols = 0; -// void *pTable = NULL; -// char *pWrite = NULL; -// -// int16_t numOfColumns; -// int64_t createdTime; -// char *tableId; -// char *superTableId; -// SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; -// -// SDbObj *pDb = NULL; -// if (pConn->pDb != NULL) { -// pDb = mgmtGetDb(pConn->pDb->name); -// } -// -// if (pDb == NULL) { -// return 0; -// } -// -// if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { -// if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && -// strcmp(pConn->pUser->user, "monitor") != 0) { -// return 0; -// } -// } -// -// char prefix[20] = {0}; -// strcpy(prefix, pDb->name); -// strcat(prefix, TS_PATH_DELIMITER); -// int32_t prefixLen = strlen(prefix); -// -// while (numOfRows < rows) { -// void *pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable); -// if (pTable != NULL) { -// SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable; -// pShow->pNode = pNormalTableNode; -// tableId = pNormalTable->tableId; -// superTableId = NULL; -// createdTime = pNormalTable->createdTime; -// numOfColumns = pNormalTable->numOfColumns; -// } else { -// void *pStreamTableNode = sdbFetchRow(tsStreamTableSdb, pShow->pNode, (void **) &pTable); -// if (pTable != NULL) { -// SStreamTableObj *pChildTable = (SStreamTableObj *) pTable; -// pShow->pNode = pStreamTableNode; -// tableId = pChildTable->tableId; -// superTableId = NULL; -// createdTime = pChildTable->createdTime; -// numOfColumns = pChildTable->numOfColumns; -// } else { -// void *pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); -// if (pTable != NULL) { -// SChildTableObj *pChildTable = (SChildTableObj *) pTable; -// pShow->pNode = pChildTableNode; -// tableId = pChildTable->tableId; -// superTableId = NULL; -// createdTime = pChildTable->createdTime; -// numOfColumns = pChildTable->superTable->numOfColumns; -// } else { -// break; -// } -// } -// } -// -// // not belong to current db -// if (strncmp(tableId, prefix, prefixLen)) { -// continue; -// } -// -// char meterName[TSDB_TABLE_NAME_LEN] = {0}; -// memset(meterName, 0, tListLen(meterName)); -// numOfRead++; -// -// // pattern compare for meter name -// extractTableName(tableId, meterName); -// -// if (pShow->payloadLen > 0 && -// patternMatch(pShow->payload, meterName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) { -// continue; -// } -// -// cols = 0; -// -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// strncpy(pWrite, meterName, TSDB_TABLE_NAME_LEN); -// cols++; -// -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// *(int64_t *) pWrite = createdTime; -// cols++; -// -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// *(int16_t *) pWrite = numOfColumns; -// cols++; -// -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// if (superTableId != NULL) { -// extractTableName(superTableId, pWrite); -// } -// cols++; -// -// numOfRows++; -// } -// -// pShow->numOfReads += numOfRead; -// const int32_t NUM_OF_COLUMNS = 4; -// -// mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + int32_t numOfRead = 0; + int32_t cols = 0; + void *pTable = NULL; + char *pWrite = NULL; + char prefix[20] = {0}; + SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; + + strcpy(prefix, pDb->name); + strcat(prefix, TS_PATH_DELIMITER); + int32_t prefixLen = strlen(prefix); + + while (numOfRows < rows) { + int16_t numOfColumns = 0; + int64_t createdTime = 0; + char *tableId = NULL; + char *superTableId = NULL; + void *pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable); + if (pTable != NULL) { + SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable; + pShow->pNode = pNormalTableNode; + tableId = pNormalTable->tableId; + superTableId = NULL; + createdTime = pNormalTable->createdTime; + numOfColumns = pNormalTable->numOfColumns; + } else { + void *pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); + if (pTable != NULL) { + SChildTableObj *pChildTable = (SChildTableObj *) pTable; + pShow->pNode = pChildTableNode; + tableId = pChildTable->tableId; + superTableId = pChildTable->superTableId; + createdTime = pChildTable->createdTime; + numOfColumns = pChildTable->superTable->numOfColumns; + } else { + break; + } + } + + // not belong to current db + if (strncmp(tableId, prefix, prefixLen)) { + continue; + } + + char tableName[TSDB_TABLE_NAME_LEN] = {0}; + memset(tableName, 0, tListLen(tableName)); + numOfRead++; + + // pattern compare for meter name + extractTableName(tableId, tableName); + + if (pShow->payloadLen > 0 && + patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) { + continue; + } + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strncpy(pWrite, tableName, TSDB_TABLE_NAME_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *) pWrite = createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *) pWrite = numOfColumns; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + if (superTableId != NULL) { + extractTableName(superTableId, pWrite); + } + cols++; + + numOfRows++; + } + + pShow->numOfReads += numOfRead; + const int32_t NUM_OF_COLUMNS = 4; + + mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); return numOfRows; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index e218b8bd6fae7ec5865812960c10da8bd05c67d7..1cef4492476cff3aba03c89855355e8944963841 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -27,47 +27,47 @@ #include "mgmtTable.h" #include "mgmtVgroup.h" -void *tsVgroupSdb = NULL; -int32_t tsVgUpdateSize; - -void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); - -void mgmtVgroupActionInit() { - mgmtVgroupActionFp[SDB_TYPE_INSERT] = mgmtVgroupActionInsert; - mgmtVgroupActionFp[SDB_TYPE_DELETE] = mgmtVgroupActionDelete; - mgmtVgroupActionFp[SDB_TYPE_UPDATE] = mgmtVgroupActionUpdate; - mgmtVgroupActionFp[SDB_TYPE_ENCODE] = mgmtVgroupActionEncode; - mgmtVgroupActionFp[SDB_TYPE_DECODE] = mgmtVgroupActionDecode; - mgmtVgroupActionFp[SDB_TYPE_RESET] = mgmtVgroupActionReset; +static void *tsVgroupSdb = NULL; +static int32_t tsVgUpdateSize = 0; + +static void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize); +static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); + +static void mgmtVgroupActionInit() { + SVgObj tObj; + tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj; + + mgmtVgroupActionFp[SDB_TYPE_INSERT] = mgmtVgroupActionInsert; + mgmtVgroupActionFp[SDB_TYPE_DELETE] = mgmtVgroupActionDelete; + mgmtVgroupActionFp[SDB_TYPE_UPDATE] = mgmtVgroupActionUpdate; + mgmtVgroupActionFp[SDB_TYPE_ENCODE] = mgmtVgroupActionEncode; + mgmtVgroupActionFp[SDB_TYPE_DECODE] = mgmtVgroupActionDecode; + mgmtVgroupActionFp[SDB_TYPE_RESET] = mgmtVgroupActionReset; mgmtVgroupActionFp[SDB_TYPE_DESTROY] = mgmtVgroupActionDestroy; } -void *mgmtVgroupAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { - if (mgmtVgroupActionFp[(uint8_t)action] != NULL) { - return (*(mgmtVgroupActionFp[(uint8_t)action]))(row, str, size, ssize); +static void *mgmtVgroupAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { + if (mgmtVgroupActionFp[(uint8_t) action] != NULL) { + return (*(mgmtVgroupActionFp[(uint8_t) action]))(row, str, size, ssize); } return NULL; } int32_t mgmtInitVgroups() { - void * pNode = NULL; + void *pNode = NULL; SVgObj *pVgroup = NULL; mgmtVgroupActionInit(); - SVgObj tObj; - tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj; - - tsVgroupSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction); + tsVgroupSdb = sdbOpenTable(tsMaxVGroups, tsVgUpdateSize, "vgroups", SDB_KEYTYPE_AUTO, tsMgmtDirectory, mgmtVgroupAction); if (tsVgroupSdb == NULL) { - mError("failed to init vgroup data"); + mError("failed to init vgroups data"); return -1; } @@ -80,6 +80,7 @@ int32_t mgmtInitVgroups() { pVgroup->prev = NULL; pVgroup->next = NULL; + int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions; pVgroup->tableList = (STableInfo **)malloc(size); if (pVgroup->tableList == NULL) { @@ -98,7 +99,7 @@ int32_t mgmtInitVgroups() { taosIdPoolReinit(pVgroup->idPool); - if (pVgroup->vnodeGid[0].publicIp == 0) { + if (tsIsCluster && pVgroup->vnodeGid[0].publicIp == 0) { pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp); pVgroup->vnodeGid[0].ip = inet_addr(tsPrivateIp); sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1); @@ -115,54 +116,66 @@ SVgObj *mgmtGetVgroup(int32_t vgId) { return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); } -SVgObj *mgmtGetAvailVgroup(SDbObj *pDb) { +//TODO: temp way for debug usage +SVgObj *mgmtGetAvailVgroup(SDbObj *pDb, int32_t *sid) { SVgObj *pVgroup = pDb->pHead; - if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) { - terrno = TSDB_CODE_ACTION_IN_PROGRESS; - return NULL; - } - - if (pDb->vgStatus == TSDB_VG_STATUS_FULL) { - mError("db:%s, vgroup is full", pDb->name); - terrno = TSDB_CODE_NO_ENOUGH_DNODES; - return NULL; - } - - if (pDb->vgStatus == TSDB_VG_STATUS_NO_DISK_PERMISSIONS || - pDb->vgStatus == TSDB_VG_STATUS_SERVER_NO_PACE || - pDb->vgStatus == TSDB_VG_STATUS_SERV_OUT_OF_MEMORY || - pDb->vgStatus == TSDB_VG_STATUS_INIT_FAILED ) { - mError("db:%s, vgroup init failed, reason:%d %s", pDb->name, pDb->vgStatus, taosGetVgroupStatusStr(pDb->vgStatus)); - terrno = pDb->vgStatus; - return NULL; - } - if (pVgroup == NULL) { - pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; - mgmtCreateVgroup(pDb); - mTrace("db:%s, vgroup malloced, wait for create progress finished", pDb->name); - terrno = TSDB_CODE_ACTION_IN_PROGRESS; - return NULL; + pVgroup = mgmtCreateVgroup(pDb); } - terrno = 0; + *sid = taosAllocateId(pVgroup->idPool); return pVgroup; -} -int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) { - int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid < 0) { - mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool)); - pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; - mgmtCreateVgroup(pDb); - terrno = TSDB_CODE_ACTION_IN_PROGRESS; - } - - terrno = 0; - return sid; +// if (pDb->vgStatus) +// +// if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) { +// terrno = TSDB_CODE_ACTION_IN_PROGRESS; +// return NULL; +// } +// +// if (pDb->vgStatus == TSDB_VG_STATUS_FULL) { +// mError("db:%s, vgroup is full", pDb->name); +// terrno = TSDB_CODE_NO_ENOUGH_DNODES; +// return NULL; +// } +// +// if (pDb->vgStatus == TSDB_VG_STATUS_NO_DISK_PERMISSIONS || +// pDb->vgStatus == TSDB_VG_STATUS_SERVER_NO_PACE || +// pDb->vgStatus == TSDB_VG_STATUS_SERV_OUT_OF_MEMORY || +// pDb->vgStatus == TSDB_VG_STATUS_INIT_FAILED ) { +// mError("db:%s, vgroup init failed, reason:%d %s", pDb->name, pDb->vgStatus, taosGetVgroupStatusStr(pDb->vgStatus)); +// terrno = pDb->vgStatus; +// return NULL; +// } +// +// if (pVgroup == NULL) { +// //TODO +// //pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; +// pDb->vgStatus = TSDB_VG_STATUS_READY; +// mgmtCreateVgroup(pDb); +// mTrace("db:%s, vgroup malloced, wait for create progress finished", pDb->name); +// terrno = TSDB_CODE_ACTION_IN_PROGRESS; +// return NULL; +// } +// +// terrno = 0; +// return pVgroup; } +//int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) { +// int32_t sid = taosAllocateId(pVgroup->idPool); +// if (sid < 0) { +// mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool)); +// pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; +// mgmtCreateVgroup(pDb); +// terrno = TSDB_CODE_ACTION_IN_PROGRESS; +// } +// +// terrno = 0; +// return sid; +//} + void mgmtProcessVgTimer(void *handle, void *tmrId) { SDbObj *pDb = (SDbObj *)handle; if (pDb == NULL) return; @@ -176,12 +189,7 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) { } SVgObj *mgmtCreateVgroup(SDbObj *pDb) { - SVgObj *pVgroup; - int32_t size; - - size = sizeof(SVgObj); - pVgroup = (SVgObj *)malloc(size); - memset(pVgroup, 0, size); + SVgObj *pVgroup = (SVgObj *)calloc(sizeof(SVgObj), 1); strcpy(pVgroup->dbName, pDb->name); pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->createdTime = taosGetTimestampMs(); @@ -195,6 +203,13 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { return NULL; } + pVgroup->tableList = (STableInfo **) calloc(sizeof(STableInfo *), pDb->cfg.maxSessions); + pVgroup->numOfTables = 0; + pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); + + mgmtAddVgroupIntoDb(pDb, pVgroup); + mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); + sdbInsertRow(tsVgroupSdb, pVgroup, 0); mTrace("vgroup:%d, vgroup is created, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); @@ -246,204 +261,193 @@ void mgmtSetVgroupIdPool() { } } -void mgmtCleanUpVgroups() { sdbCloseTable(tsVgroupSdb); } +void mgmtCleanUpVgroups() { + sdbCloseTable(tsVgroupSdb); +} int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { -// int32_t cols = 0; -// -// SDbObj *pDb = NULL; -// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); -// -// if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; -// -// SSchema *pSchema = tsGetSchema(pMeta); -// -// pShow->bytes[cols] = 4; -// pSchema[cols].type = TSDB_DATA_TYPE_INT; -// strcpy(pSchema[cols].name, "vgId"); -// pSchema[cols].bytes = htons(pShow->bytes[cols]); -// cols++; -// -// pShow->bytes[cols] = 4; -// pSchema[cols].type = TSDB_DATA_TYPE_INT; -// strcpy(pSchema[cols].name, "meters"); -// pSchema[cols].bytes = htons(pShow->bytes[cols]); -// cols++; -// -// pShow->bytes[cols] = 9; -// pSchema[cols].type = TSDB_DATA_TYPE_BINARY; -// strcpy(pSchema[cols].name, "vgroup status"); -// pSchema[cols].bytes = htons(pShow->bytes[cols]); -// cols++; -// -// int32_t maxReplica = 0; -// SVgObj *pVgroup = NULL; -// STableInfo *pTable = NULL; -// if (pShow->payloadLen > 0 ) { -// pTable = mgmtGetTable(pShow->payload); -// if (NULL == pTable) { -// return TSDB_CODE_INVALID_TABLE_ID; -// } -// -// pVgroup = mgmtGetVgroup(pTable->vgId); -// if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID; -// -// maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; -// } else { -// SVgObj *pVgroup = pDb->pHead; -// while (pVgroup != NULL) { -// maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; -// pVgroup = pVgroup->next; -// } -// } -// -// for (int32_t i = 0; i < maxReplica; ++i) { -// pShow->bytes[cols] = 16; -// pSchema[cols].type = TSDB_DATA_TYPE_BINARY; -// strcpy(pSchema[cols].name, "ip"); -// pSchema[cols].bytes = htons(pShow->bytes[cols]); -// cols++; -// -// pShow->bytes[cols] = 2; -// pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; -// strcpy(pSchema[cols].name, "vnode"); -// pSchema[cols].bytes = htons(pShow->bytes[cols]); -// cols++; -// -// pShow->bytes[cols] = 9; -// pSchema[cols].type = TSDB_DATA_TYPE_BINARY; -// strcpy(pSchema[cols].name, "vnode status"); -// pSchema[cols].bytes = htons(pShow->bytes[cols]); -// cols++; -// -// pShow->bytes[cols] = 16; -// pSchema[cols].type = TSDB_DATA_TYPE_BINARY; -// strcpy(pSchema[cols].name, "public ip"); -// pSchema[cols].bytes = htons(pShow->bytes[cols]); -// cols++; -// } -// -// pMeta->numOfColumns = htons(cols); -// pShow->numOfColumns = cols; -// -// pShow->offset[0] = 0; -// for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; -// -// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; -// -// if (NULL == pTable) { -// pShow->numOfRows = pDb->numOfVgroups; -// pShow->pNode = pDb->pHead; -// } else { -// pShow->numOfRows = 1; -// pShow->pNode = pVgroup; -// } + SDbObj *pDb = mgmtGetDb(pShow->db); + if (pDb == NULL) { + return TSDB_CODE_DB_NOT_SELECTED; + } + + int32_t cols = 0; + SSchema *pSchema = tsGetSchema(pMeta); + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "vgId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "tables"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 9; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "vgroup status"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + int32_t maxReplica = 0; + SVgObj *pVgroup = NULL; + STableInfo *pTable = NULL; + if (pShow->payloadLen > 0 ) { + pTable = mgmtGetTable(pShow->payload); + if (NULL == pTable) { + return TSDB_CODE_INVALID_TABLE_ID; + } + + pVgroup = mgmtGetVgroup(pTable->vgId); + if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID; + + maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; + } else { + SVgObj *pVgroup = pDb->pHead; + while (pVgroup != NULL) { + maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; + pVgroup = pVgroup->next; + } + } + + for (int32_t i = 0; i < maxReplica; ++i) { + pShow->bytes[cols] = 16; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "vnode"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 9; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "vnode status"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 16; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "public ip"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + } + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + if (NULL == pTable) { + pShow->numOfRows = pDb->numOfVgroups; + pShow->pNode = pDb->pHead; + } else { + pShow->numOfRows = 1; + pShow->pNode = pVgroup; + } return 0; } int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; -// SVgObj *pVgroup = NULL; -// char * pWrite; -// int32_t cols = 0; -// char ipstr[20]; -// -// int32_t maxReplica = 0; -// -// SDbObj *pDb = NULL; -// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); -// assert(pDb != NULL); -// -// pVgroup = pDb->pHead; -// while (pVgroup != NULL) { -// maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; -// pVgroup = pVgroup->next; -// } -// -// while (numOfRows < rows) { -// // pShow->pNode = sdbFetchRow(tsVgroupSdb, pShow->pNode, (void **)&pVgroup); -// pVgroup = (SVgObj *)pShow->pNode; -// if (pVgroup == NULL) break; -// pShow->pNode = (void *)pVgroup->next; -// -// cols = 0; -// -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// *(int32_t *)pWrite = pVgroup->vgId; -// cols++; -// -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// *(int32_t *)pWrite = pVgroup->numOfTables; -// cols++; -// -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus)); -// cols++; -// -// for (int32_t i = 0; i < maxReplica; ++i) { -// tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip); -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// strcpy(pWrite, ipstr); -// cols++; -// -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// *(int16_t *)pWrite = pVgroup->vnodeGid[i].vnode; -// cols++; -// -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// if (pVgroup->vnodeGid[i].ip != 0) { -// char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i); -// strcpy(pWrite, vnodeStatus); -// } else { -// strcpy(pWrite, "null"); -// } -// cols++; -// -// tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp); -// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; -// strcpy(pWrite, ipstr); -// cols++; -// } -// -// numOfRows++; -// } -// -// pShow->numOfReads += numOfRows; - return numOfRows; -} + SVgObj *pVgroup = NULL; + int32_t maxReplica = 0; + int32_t cols = 0; + char ipstr[20]; + char * pWrite; + + SDbObj *pDb = mgmtGetDb(pShow->db); + if (pDb == NULL) return 0; + + pVgroup = pDb->pHead; + while (pVgroup != NULL) { + maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; + pVgroup = pVgroup->next; + } -void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *)row; - SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + while (numOfRows < rows) { + pVgroup = (SVgObj *) pShow->pNode; + if (pVgroup == NULL) break; + pShow->pNode = (void *) pVgroup->next; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *) pWrite = pVgroup->vgId; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *) pWrite = pVgroup->numOfTables; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus)); + cols++; + + for (int32_t i = 0; i < maxReplica; ++i) { + tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, ipstr); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *) pWrite = pVgroup->vnodeGid[i].vnode; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + if (pVgroup->vnodeGid[i].ip != 0) { + char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i); + strcpy(pWrite, vnodeStatus); + } else { + strcpy(pWrite, "null"); + } + cols++; - if (pDb == NULL) return NULL; + tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, ipstr); + cols++; + } - int32_t tsize = sizeof(STableInfo *) * pDb->cfg.maxSessions; - pVgroup->tableList = (STableInfo **)malloc(tsize); - memset(pVgroup->tableList, 0, tsize); - pVgroup->numOfTables = 0; - pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); - mgmtAddVgroupIntoDb(pDb, pVgroup); - mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); + numOfRows++; + } + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; } -void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *)row; +static void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { + SVgObj *pVgroup = row; SDbObj *pDb = mgmtGetDb(pVgroup->dbName); - if (pDb != NULL) mgmtRemoveVgroupFromDb(pDb, pVgroup); + if (pDb != NULL) { + mgmtRemoveVgroupFromDb(pDb, pVgroup); + } + mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes); tfree(pVgroup->tableList); return NULL; } -void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { +static void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { mgmtVgroupActionReset(row, str, size, ssize); - SVgObj *pVgroup = (SVgObj *)row; + + SVgObj *pVgroup = (SVgObj *) row; int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool); SDbObj *pDb = mgmtGetDb(pVgroup->dbName); @@ -461,41 +465,37 @@ void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) return NULL; } -void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *)row; - int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup; - if (size < tsize) { +static void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { + SVgObj *pVgroup = (SVgObj *) row; + if (size < tsVgUpdateSize) { *ssize = -1; } else { - memcpy(str, pVgroup, tsize); - *ssize = tsize; + memcpy(str, pVgroup, tsVgUpdateSize); + *ssize = tsVgUpdateSize; } return NULL; } -void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *)malloc(sizeof(SVgObj)); +static void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { + SVgObj *pVgroup = (SVgObj *) malloc(sizeof(SVgObj)); if (pVgroup == NULL) return NULL; memset(pVgroup, 0, sizeof(SVgObj)); - int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup; - memcpy(pVgroup, str, tsize); + int32_t tsVgUpdateSize = pVgroup->updateEnd - (int8_t *) pVgroup; + memcpy(pVgroup, str, tsVgUpdateSize); - return (void *)pVgroup; + return (void *) pVgroup; } -void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *)row; - int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup; - - memcpy(pVgroup, str, tsize); - +static void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize) { + SVgObj *pVgroup = (SVgObj *) row; + memcpy(pVgroup, str, tsVgUpdateSize); return NULL; } -void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { - SVgObj *pVgroup = (SVgObj *)row; +static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { + SVgObj *pVgroup = (SVgObj *) row; if (pVgroup->idPool) { taosIdPoolCleanUp(pVgroup->idPool); pVgroup->idPool = NULL; @@ -504,3 +504,20 @@ void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize tfree(row); return NULL; } + +void mgmtUpdateVgroup(SVgObj *pVgroup) { + sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 0); +} + +void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) { + pVgroup->numOfTables++; + if (pTable->sid >= 0) + pVgroup->tableList[pTable->sid] = pTable; +} + +void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) { + pVgroup->numOfTables--; + if (pTable->sid >= 0) + pVgroup->tableList[pTable->sid] = NULL; + taosFreeId(pVgroup->idPool, pTable->sid); +}