提交 ce906917 编写于 作者: S slguan

create table in mnode

上级 78812e6d
......@@ -2545,21 +2545,13 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
SSchema * pSchema;
uint8_t ieType;
char *rsp = pSql->res.pRsp;
ieType = *rsp;
if (ieType != TSDB_IE_TYPE_META) {
tscError("invalid ie type:%d", ieType);
return TSDB_CODE_INVALID_IE;
}
rsp++;
pMeta = (STableMeta *)rsp;
pMeta = (STableMeta *)pSql->res.pRsp;
pMeta->sid = htonl(pMeta->sid);
pMeta->sversion = htons(pMeta->sversion);
pMeta->vgid = htonl(pMeta->vgid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->contLen = htons(pMeta->contLen);
if (pMeta->sid < 0 || pMeta->vgid < 0) {
tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
......@@ -2583,8 +2575,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
}
pMeta->rowSize = 0;
rsp += sizeof(STableMeta);
pSchema = (SSchema *)rsp;
pSchema = (SSchema *)(pSql->res.pRsp + sizeof(STableMeta));
int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
for (int i = 0; i < numOfTotalCols; ++i) {
......@@ -2598,29 +2589,29 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
pSchema++;
}
rsp += numOfTotalCols * sizeof(SSchema);
int32_t tagLen = 0;
SSchema *pTagsSchema = tsGetTagSchema(pMeta);
if (pMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) {
for (int32_t i = 0; i < pMeta->numOfTags; ++i) {
tagLen += pTagsSchema[i].bytes;
}
}
rsp += tagLen;
int32_t size = (int32_t)(rsp - (char *)pMeta);
// rsp += numOfTotalCols * sizeof(SSchema);
//
// int32_t tagLen = 0;
// SSchema *pTagsSchema = tsGetTagSchema(pMeta);
//
// if (pMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) {
// for (int32_t i = 0; i < pMeta->numOfTags; ++i) {
// tagLen += pTagsSchema[i].bytes;
// }
// }
//
// rsp += tagLen;
// int32_t size = (int32_t)(rsp - (char *)pMeta);
// pMeta->index = rand() % TSDB_VNODES_SUPPORT;
pMeta->index = 0;
// pMeta->index = 0;
// todo add one more function: taosAddDataIfNotExists();
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
assert(pMeterMetaInfo->pMeterMeta == NULL);
pMeterMetaInfo->pMeterMeta = (STableMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
size, tsMeterMetaKeepTimer);
pMeta->contLen, tsMeterMetaKeepTimer);
// todo handle out of memory case
if (pMeterMetaInfo->pMeterMeta == NULL) return 0;
......
......@@ -47,11 +47,7 @@ extern void *tsMgmtTmr;
extern void *tsMgmtTranQhandle;
extern char tsMgmtDirectory[];
extern int tsAcctUpdateSize;
extern int tsDbUpdateSize;
extern int tsDnodeUpdateSize;
extern int tsMnodeUpdateSize;
extern int tsVgUpdateSize;
typedef struct {
uint32_t privateIp;
......@@ -102,7 +98,7 @@ typedef struct {
} STableGid;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
uint64_t uid;
int32_t sid;
......@@ -113,7 +109,7 @@ typedef struct {
struct _vg_obj;
typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
uint64_t uid;
int32_t sid;
......@@ -130,35 +126,20 @@ typedef struct SSuperTableObj {
} SSuperTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
char superTableId[TSDB_TABLE_ID_LEN];
char superTableId[TSDB_TABLE_ID_LEN + 1];
int8_t reserved[7];
int8_t updateEnd[1];
SSuperTableObj *superTable;
} SChildTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int8_t reserved[3];
int8_t updateEnd[1];
int16_t nextColId;
SSchema* schema;
} SNormalTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
uint64_t uid;
int32_t sid;
......@@ -169,14 +150,14 @@ typedef struct {
int16_t sqlLen;
int8_t reserved[3];
int8_t updateEnd[1];
int16_t nextColId;
char* sql; //null-terminated string
int16_t nextColId;
SSchema* schema;
} SStreamTableObj;
} SNormalTableObj;
typedef struct _vg_obj {
uint32_t vgId;
char dbName[TSDB_DB_NAME_LEN];
char dbName[TSDB_DB_NAME_LEN + 1];
int64_t createdTime;
uint64_t lastCreate;
uint64_t lastRemove;
......@@ -194,7 +175,7 @@ typedef struct _vg_obj {
} SVgObj;
typedef struct _db_obj {
char name[TSDB_DB_NAME_LEN];
char name[TSDB_DB_NAME_LEN + 1];
int64_t createdTime;
SDbCfg cfg;
int8_t dropStatus;
......@@ -213,9 +194,9 @@ typedef struct _db_obj {
struct _acctObj;
typedef struct _user_obj {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char acct[TSDB_USER_LEN];
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
char acct[TSDB_USER_LEN + 1];
int64_t createdTime;
int8_t superAuth;
int8_t writeAuth;
......@@ -246,8 +227,8 @@ typedef struct {
} SAcctInfo;
typedef struct _acctObj {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
SAcctCfg cfg;
int32_t acctId;
int64_t createdTime;
......@@ -259,16 +240,9 @@ typedef struct _acctObj {
pthread_mutex_t mutex;
} SAcctObj;
typedef struct {
char spi;
char encrypt;
char secret[TSDB_KEY_LEN];
char cipheringKey[TSDB_KEY_LEN];
} SSecInfo;
typedef struct {
int8_t type;
char db[TSDB_DB_NAME_LEN];
char db[TSDB_DB_NAME_LEN + 1];
void * pNode;
int16_t numOfColumns;
int32_t rowSize;
......
......@@ -136,11 +136,8 @@ int64_t sdbGetVersion();
int32_t sdbGetRunStatus();
#define TSDB_MAX_TABLES 1000
extern void* tsChildTableSdb;
extern void* tsNormalTableSdb;
extern void* tsStreamTableSdb;
extern void* tsSuperTableSdb;
#define TSDB_MAX_NORMAL_TABLES 10000
#define TSDB_MAX_SUPER_TABLES 1000
#ifdef __cplusplus
}
......
......@@ -224,7 +224,7 @@ typedef struct {
typedef struct SSchema {
uint8_t type;
char name[TSDB_COL_NAME_LEN];
char name[TSDB_COL_NAME_LEN + 1];
int16_t colId;
int16_t bytes;
} SSchema;
......@@ -247,8 +247,8 @@ typedef struct {
} SDCreateTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
int8_t igExists;
int16_t numOfTags;
int16_t numOfColumns;
......@@ -258,14 +258,14 @@ typedef struct {
} SCreateTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
int8_t igNotExists;
} SDropTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
int16_t type; /* operation type */
char tagVal[TSDB_MAX_BYTES_PER_ROW];
int8_t numOfCols; /* number of schema */
......@@ -275,11 +275,11 @@ typedef struct {
typedef struct {
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_ID_LEN];
char db[TSDB_TABLE_ID_LEN + 1];
} SConnectMsg;
typedef struct {
char acctId[TSDB_ACCT_LEN];
char acctId[TSDB_ACCT_LEN + 1];
char serverVersion[TSDB_VERSION_LEN];
int8_t writeAuth;
int8_t superAuth;
......@@ -301,24 +301,24 @@ typedef struct {
} SAcctCfg;
typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
SAcctCfg cfg;
} SCreateAcctMsg, SAlterAcctMsg;
typedef struct {
char user[TSDB_USER_LEN];
char user[TSDB_USER_LEN + 1];
} SDropUserMsg, SDropAcctMsg;
typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
int8_t privilege;
int8_t flag;
} SCreateUserMsg, SAlterUserMsg;
typedef struct {
char db[TSDB_TABLE_ID_LEN];
char db[TSDB_TABLE_ID_LEN + 1];
} SMgmtHead;
typedef struct {
......@@ -444,7 +444,7 @@ typedef struct {
int64_t nAggTimeInterval; // time interval for aggregation, in million second
int64_t slidingTime; // value for sliding window
// tag schema, used to parse tag information in pSidExtInfo
uint64_t pTagSchema;
......@@ -517,8 +517,8 @@ typedef struct {
* NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN / 4
*/
typedef struct {
char acct[TSDB_USER_LEN];
char db[TSDB_DB_NAME_LEN];
char acct[TSDB_USER_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
uint32_t vgId;
int32_t maxSessions;
int32_t cacheBlockSize;
......@@ -543,7 +543,7 @@ typedef struct {
} SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg;
typedef struct {
char db[TSDB_TABLE_ID_LEN];
char db[TSDB_TABLE_ID_LEN + 1];
uint8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg;
......@@ -583,7 +583,7 @@ typedef struct {
typedef struct {
uint32_t destId;
uint32_t destIp;
char tableId[TSDB_UNI_LEN];
char tableId[TSDB_UNI_LEN + 1];
char empty[3];
uint8_t msgType;
int32_t msgLen;
......@@ -615,7 +615,7 @@ typedef struct {
} SVPeersMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
int16_t createFlag;
char tags[];
} STableInfoMsg;
......@@ -626,7 +626,7 @@ typedef struct {
} SMultiTableInfoMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
} SSuperTableInfoMsg;
typedef struct {
......@@ -637,7 +637,7 @@ typedef struct {
typedef struct {
int16_t elemLen;
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
int16_t orderIndex;
int16_t orderType; // used in group by xx order by xxx
......@@ -678,7 +678,7 @@ typedef struct {
} SSuperTableMeta;
typedef struct STableMeta {
char tableId[TSDB_TABLE_ID_LEN]; // note: This field must be at the front
char tableId[TSDB_TABLE_ID_LEN + 1]; // note: This field must be at the front
int32_t contLen;
uint8_t numOfTags : 6;
uint8_t precision : 2;
......@@ -701,7 +701,7 @@ typedef struct SMultiTableMeta {
} SMultiTableMeta;
typedef struct {
char name[TSDB_TABLE_ID_LEN];
char name[TSDB_TABLE_ID_LEN + 1];
char data[TSDB_MAX_TAGS_LEN];
} STagData;
......@@ -712,7 +712,7 @@ typedef struct {
*/
typedef struct {
int8_t type;
char db[TSDB_DB_NAME_LEN];
char db[TSDB_DB_NAME_LEN + 1];
uint16_t payloadLen;
char payload[];
} SShowMsg;
......@@ -741,14 +741,14 @@ typedef struct {
} SCfgDnodeMsg;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
char sql[TSDB_SHOW_SQL_LEN + 1];
uint32_t queryId;
int64_t useconds;
int64_t stime;
} SQueryDesc;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
char sql[TSDB_SHOW_SQL_LEN + 1];
uint32_t streamId;
int64_t num; // number of computing/cycles
int64_t useconds;
......@@ -781,7 +781,7 @@ typedef struct {
} SHeartBeatRsp;
typedef struct {
char queryId[TSDB_KILL_MSG_LEN];
char queryId[TSDB_KILL_MSG_LEN + 1];
} SKillQueryMsg, SKillStreamMsg, SKillConnectionMsg;
typedef struct {
......
......@@ -86,7 +86,7 @@ int main(int argc, char* argv[]) {
{
printf("=== this a test for debug usage\n");
void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
taos_query(taos, "create database db");
taos_query(taos, "create table d1.c2 using d1.st2 tags(1)");
while (1) {
sleep(1000);
}
......
......@@ -28,7 +28,6 @@ extern void *mgmtStatusTimer;
int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup);
int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup);
int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup);
int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup);
int mgmtSendVPeersMsg(SVgObj *pVgroup);
......
......@@ -25,8 +25,8 @@ extern "C" {
#include "mnode.h"
extern bool (*mgmtCheckExpired)();
extern void (*mgmtAddTimeSeries)(uint32_t timeSeriesNum);
extern void (*mgmtRestoreTimeSeries)(uint32_t timeseries);
extern void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum);
extern void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeseries);
extern int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries);
extern int32_t (*mgmtCheckUserGrant)();
extern int32_t (*mgmtCheckDbGrant)();
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_MNODE_STREAM_TABLE_H
#define TBASE_MNODE_STREAM_TABLE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "mnode.h"
int32_t mgmtInitStreamTables();
void mgmtCleanUpStreamTables();
void * mgmtGetStreamTable(char *tableId);
int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable);
int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup);
int32_t mgmtGetStreamTableMeta(SDbObj *pDb, SStreamTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
#ifdef __cplusplus
}
#endif
#endif
......@@ -25,16 +25,21 @@ extern "C" {
#include "mnode.h"
int32_t mgmtInitVgroups();
void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtCreateVgroup(SDbObj *pDb);
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup);
void mgmtSetVgroupIdPool();
void mgmtUpdateVgroup(SVgObj *pVgroup);
int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtCleanUpVgroups();
SVgObj *mgmtGetAvailVgroup(SDbObj *pDb);
int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup);
void mgmtSetVgroupIdPool();
SVgObj *mgmtGetAvailVgroup(SDbObj *pDb, int32_t *sid);
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable);
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable);
#ifdef __cplusplus
}
......
......@@ -36,6 +36,7 @@
#include "mgmtVgroup.h"
void *tsChildTableSdb;
int32_t tsChildTableUpdateSize;
void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
......@@ -61,6 +62,8 @@ static void mgmtChildTableActionInit() {
}
void *mgmtChildTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SChildTableObj *pTable = (SChildTableObj *) row;
memcpy(pTable, str, tsChildTableUpdateSize);
return NULL;
}
......@@ -102,10 +105,9 @@ void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ss
pTable->superTable = mgmtGetSuperTable(pTable->superTableId);
mgmtAddTableIntoSuperTable(pTable->superTable);
pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
pVgroup->numOfTables++;
pDb->numOfTables++;
pVgroup->tableList[pTable->sid] = (STableInfo *) pTable;
mgmtAddTimeSeries(pAcct, pTable->superTable->numOfColumns - 1);
mgmtAddTableIntoDb(pDb);
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable);
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
......@@ -138,11 +140,9 @@ void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ss
return NULL;
}
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
pVgroup->tableList[pTable->sid] = NULL;
pVgroup->numOfTables--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid);
mgmtRestoreTimeSeries(pAcct, pTable->superTable->numOfColumns - 1);
mgmtRemoveTableFromDb(pDb);
mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable);
mgmtRemoveTableFromSuperTable(pTable->superTable);
......@@ -161,8 +161,8 @@ void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ss
SChildTableObj *pTable = (SChildTableObj *) row;
assert(row != NULL && str != NULL);
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(str, pTable, tsize);
memcpy(str, pTable, tsChildTableUpdateSize);
*ssize = tsChildTableUpdateSize;
return NULL;
}
......@@ -170,18 +170,14 @@ void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ss
void *mgmtChildTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
assert(str != NULL);
SChildTableObj *pTable = (SChildTableObj *)malloc(sizeof(SChildTableObj));
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(SChildTableObj));
SChildTableObj *pTable = (SChildTableObj *)calloc(sizeof(SChildTableObj), 1);
if (pTable == NULL) return NULL;
int32_t tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
if (size < tsChildTableUpdateSize) {
mgmtDestroyChildTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
memcpy(pTable, str, tsChildTableUpdateSize);
return (void *)pTable;
}
......@@ -199,8 +195,10 @@ int32_t mgmtInitChildTables() {
SChildTableObj *pTable = NULL;
mgmtChildTableActionInit();
SChildTableObj tObj;
tsChildTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsChildTableSdb = sdbOpenTable(tsMaxTables, sizeof(SChildTableObj),
tsChildTableSdb = sdbOpenTable(tsMaxTables, tsChildTableUpdateSize,
"ctables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtChildTableAction);
if (tsChildTableSdb == NULL) {
mError("failed to init child table data");
......@@ -216,22 +214,63 @@ int32_t mgmtInitChildTables() {
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("super table:%s, failed to get db, discard it", pTable->tableId);
mError("ctable:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsChildTableSdb, pTable);
pNode = pLastNode;
continue;
}
mgmtAddTableIntoDb(pDb);
}
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable);
pNode = pLastNode;
continue;
}
mgmtSetVgroupIdPool();
if (strcmp(pVgroup->dbName, pDb->name) != 0) {
mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable);
pNode = pLastNode;
continue;
}
if (pVgroup->tableList == NULL) {
mError("ctable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable);
pNode = pLastNode;
continue;
}
pVgroup->tableList[pTable->sid] = (STableInfo*)pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1);
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTable->superTableId);
if (pSuperTable == NULL) {
mError("ctable:%s, stable:%s not exist", pTable->tableId, pTable->superTableId);
pTable->vgId = 0;
sdbDeleteRow(tsChildTableSdb, pTable);
pNode = pLastNode;
continue;
}
pTable->superTable = pSuperTable;
mgmtAddTableIntoSuperTable(pSuperTable);
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mgmtAddTimeSeries(pAcct, pTable->superTable->numOfColumns - 1);
}
mTrace("child table is initialized");
return 0;
}
void mgmtCleanUpChildTables() {
sdbCloseTable(tsChildTableSdb);
}
int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) {
......@@ -285,6 +324,7 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr
}
strcpy(pTable->tableId, pCreate->tableId);
strcpy(pTable->superTableId, pSuperTable->tableId);
pTable->type = TSDB_TABLE_TYPE_CHILD_TABLE;
pTable->createdTime = taosGetTimestampMs();
pTable->superTable = pSuperTable;
pTable->vgId = pVgroup->vgId;
......@@ -299,47 +339,35 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr
mError("table:%s, corresponding super table schema is null", pCreate->tableId);
return TSDB_CODE_INVALID_TABLE;
}
memcpy(schema, pTagData + TSDB_TABLE_ID_LEN + 1, size);
// memcpy(schema, pTagData + TSDB_TABLE_ID_LEN + 1, size);
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->tableId);
return TSDB_CODE_SDB_ERROR;
}
mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1);
mgmtSendCreateTableMsg(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
mgmtAddTableIntoDb(pDb);
return 0;
}
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
SVgObj *pVgroup;
SAcctObj *pAcct;
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
}
pVgroup = mgmtGetVgroup(pTable->vgId);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_OTHERS;
}
mgmtRestoreTimeSeries(pTable->superTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
if (pVgroup->numOfTables <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
mgmtRemoveTableFromDb(pDb);
return 0;
}
......
......@@ -463,7 +463,7 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
}
if (pAlter->maxSessions > 0) {
//rebuild meterList in mgmtVgroup.c
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 0);
mgmtUpdateVgroup(pVgroup);
}
mgmtSendVPeersMsg(pVgroup);
pVgroup = pVgroup->next;
......
......@@ -32,7 +32,6 @@
#include "dnodeSystem.h"
#include "mgmtChildTable.h"
#include "mgmtNormalTable.h"
#include "mgmtStreamTable.h"
void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
int mgmtSendVPeersMsg(SVgObj *pVgroup);
......@@ -234,29 +233,6 @@ int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) {
return 0;
}
int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) {
// uint64_t timeStamp = taosGetTimestampMs();
//
// for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) {
// SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip);
// if (pObj == NULL) {
// continue;
// }
//
// int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000);
// if (pStart == NULL) {
// continue;
// }
//
// int8_t *pMsg = mgmtBuildCreateStreamTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode);
// int32_t msgLen = pMsg - pStart;
//
// mgmtSendMsgToDnode(pObj, pStart, msgLen);
// }
//
// pVgroup->lastCreate = timeStamp;
return 0;
}
int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
// uint64_t timeStamp = taosGetTimestampMs();
......
......@@ -25,11 +25,15 @@ int32_t (*mgmtCheckUserGrant)() = mgmtCheckUserGrantImp;
int32_t mgmtCheckDbGrantImp() { return 0; }
int32_t (*mgmtCheckDbGrant)() = mgmtCheckDbGrantImp;
void mgmtAddTimeSeriesImp(uint32_t timeSeriesNum) {}
void (*mgmtAddTimeSeries)(uint32_t timeSeriesNum) = mgmtAddTimeSeriesImp;
void mgmtRestoreTimeSeriesImp(uint32_t timeSeriesNum) {}
void (*mgmtRestoreTimeSeries)(uint32_t timeSeriesNum) = mgmtRestoreTimeSeriesImp;
void mgmtAddTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) {
pAcct->acctInfo.numOfTimeSeries += timeSeriesNum;
}
void (*mgmtAddTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtAddTimeSeriesImp;
void mgmtRestoreTimeSeriesImp(SAcctObj *pAcct, uint32_t timeSeriesNum) {
pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum;
}
void (*mgmtRestoreTimeSeries)(SAcctObj *pAcct, uint32_t timeSeriesNum) = mgmtRestoreTimeSeriesImp;
int32_t mgmtCheckTimeSeriesImp(uint32_t timeseries) { return 0; }
int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries) = mgmtCheckTimeSeriesImp;
......
......@@ -36,6 +36,7 @@
#include "mgmtVgroup.h"
void *tsNormalTableSdb;
int32_t tsNormalTableUpdateSize;
void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
......@@ -48,6 +49,7 @@ void *mgmtNormalTableActionDestroy(void *row, char *str, int32_t size, int32_t *
static void mgmtDestroyNormalTable(SNormalTableObj *pTable) {
free(pTable->schema);
free(pTable->sql);
free(pTable);
}
......@@ -63,8 +65,13 @@ static void mgmtNormalTableActionInit() {
void *mgmtNormalTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize);
memcpy(pTable, str, tsNormalTableUpdateSize);
int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns) + pTable->sqlLen;
pTable->schema = realloc(pTable->schema, schemaSize);
pTable->sql = (char*)pTable->schema + sizeof(SSchema) * (pTable->numOfColumns);
memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize);
return NULL;
}
......@@ -103,10 +110,9 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *s
}
}
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
pVgroup->numOfTables++;
pDb->numOfTables++;
pVgroup->tableList[pTable->sid] = (STableInfo *) pTable;
mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1);
mgmtAddTableIntoDb(pDb);
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable);
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
......@@ -139,11 +145,9 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *s
return NULL;
}
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
pVgroup->tableList[pTable->sid] = NULL;
pVgroup->numOfTables--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid);
mgmtRestoreTimeSeries(pAcct, pTable->numOfColumns - 1);
mgmtRemoveTableFromDb(pDb);
mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable);
if (pVgroup->numOfTables > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
......@@ -160,16 +164,16 @@ void *mgmtNormalTableActionEncode(void *row, char *str, int32_t size, int32_t *s
SNormalTableObj *pTable = (SNormalTableObj *) row;
assert(row != NULL && str != NULL);
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
if (size < tsize + schemaSize + 1) {
if (size < tsNormalTableUpdateSize + schemaSize + 1) {
*ssize = -1;
return NULL;
}
memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, schemaSize);
*ssize = tsize + schemaSize;
memcpy(str, pTable, tsNormalTableUpdateSize);
memcpy(str + tsNormalTableUpdateSize, pTable->schema, schemaSize);
memcpy(str + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen);
*ssize = tsNormalTableUpdateSize + schemaSize + pTable->sqlLen;
return NULL;
}
......@@ -183,12 +187,11 @@ void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *s
}
memset(pTable, 0, sizeof(SNormalTableObj));
int32_t tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
if (size < tsNormalTableUpdateSize) {
mgmtDestroyNormalTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
memcpy(pTable, str, tsNormalTableUpdateSize);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *)malloc(schemaSize);
......@@ -197,7 +200,14 @@ void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *s
return NULL;
}
memcpy(pTable->schema, str + tsize, schemaSize);
memcpy(pTable->schema, str + tsNormalTableUpdateSize, schemaSize);
pTable->sql = (char *)malloc(pTable->sqlLen);
if (pTable->sql == NULL) {
mgmtDestroyNormalTable(pTable);
return NULL;
}
memcpy(pTable->sql, str + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen);
return (void *)pTable;
}
......@@ -211,37 +221,69 @@ void *mgmtNormalTableAction(char action, void *row, char *str, int32_t size, int
int32_t mgmtInitNormalTables() {
void *pNode = NULL;
void *pLastNode = NULL;
SChildTableObj *pTable = NULL;
SNormalTableObj *pTable = NULL;
mgmtNormalTableActionInit();
SNormalTableObj tObj;
tsNormalTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsNormalTableSdb = sdbOpenTable(tsMaxTables, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
"ntables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtNormalTableAction);
if (tsNormalTableSdb == NULL) {
mError("failed to init normal table data");
mError("failed to init ntables data");
return -1;
}
pNode = NULL;
while (1) {
pLastNode = pNode;
pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
if (pTable == NULL) break;
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("normal table:%s, failed to get db, discard it", pTable->tableId);
mError("ntable:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsNormalTableSdb, pTable);
pNode = pLastNode;
continue;
}
mgmtAddTableIntoDb(pDb);
}
mgmtSetVgroupIdPool();
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("ntable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsNormalTableSdb, pTable);
pNode = pLastNode;
continue;
}
if (strcmp(pVgroup->dbName, pDb->name) != 0) {
mError("ntable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
pTable->vgId = 0;
sdbDeleteRow(tsNormalTableSdb, pTable);
pNode = pLastNode;
continue;
}
if (pVgroup->tableList == NULL) {
mError("ntable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId);
pTable->vgId = 0;
sdbDeleteRow(tsNormalTableSdb, pTable);
pNode = pLastNode;
continue;
}
mgmtAddTableIntoVgroup(pVgroup, pTable);
//pVgroup->tableList[pTable->sid] = (STableInfo*)pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid, 1);
pTable->sql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns;
mTrace("normal table is initialized");
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1);
}
mTrace("ntables is initialized");
return 0;
}
......@@ -278,9 +320,9 @@ int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) {
}
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) {
mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_TABLES);
int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb);
if (numOfTables >= TSDB_MAX_NORMAL_TABLES) {
mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
}
......@@ -290,6 +332,7 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
}
strcpy(pTable->tableId, pCreate->tableId);
pTable->type = TSDB_TABLE_TYPE_NORMAL_TABLE;
pTable->createdTime = taosGetTimestampMs();
pTable->vgId = pVgroup->vgId;
pTable->sid = sid;
......@@ -297,13 +340,12 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
pTable->sversion = 0;
pTable->numOfColumns = pCreate->numOfColumns;
int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
int32_t numOfCols = pCreate->numOfColumns;
int32_t schemaSize = numOfCols * sizeof(SSchema);
pTable->schema = (SSchema *) calloc(1, schemaSize);
if (pTable->schema == NULL) {
free(pTable);
mError("table:%s, no schema input", pCreate->tableId);
return TSDB_CODE_INVALID_TABLE;
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
......@@ -313,50 +355,46 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
tschema[col].colId = pTable->nextColId++;
}
pTable->sqlLen = pCreate->sqlLen;
if (pTable->sqlLen != 0) {
pTable->type = TSDB_TABLE_TYPE_STREAM_TABLE;
pTable->sql = calloc(1, pTable->sqlLen);
if (pTable->sql == NULL) {
free(pTable);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen);
pTable->sql[pCreate->sqlLen - 1] = 0;
mTrace("table:%s, stream sql len:%d sql:%s", pCreate->tableId, pCreate->sqlLen, pTable->sql);
}
if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->tableId);
return TSDB_CODE_SDB_ERROR;
}
mgmtAddTimeSeries(pTable->numOfColumns - 1);
mgmtSendCreateNormalTableMsg(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%"
PRIu64
" db:%s",
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
mgmtAddTableIntoDb(pDb);
return 0;
}
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
SVgObj *pVgroup;
SAcctObj *pAcct;
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
}
pVgroup = mgmtGetVgroup(pTable->vgId);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_OTHERS;
}
mgmtRestoreTimeSeries(pTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
sdbDeleteRow(tsNormalTableSdb, pTable);
if (pVgroup->numOfTables <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
mgmtRemoveTableFromDb(pDb);
return 0;
}
......
......@@ -33,7 +33,6 @@
#include "mgmtNormalTable.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtStreamTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
......@@ -180,7 +179,7 @@ int32_t mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp);
if (code == TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pMeta);
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0);
} else {
......@@ -253,7 +252,7 @@ int32_t mgmtProcessSuperTableMetaMsg(void *pCont, int32_t contLen, void *ahandle
SRpcConnInfo connInfo;
rpcGetConnInfo(ahandle, &connInfo);
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
// bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SSuperTableInfoMsg *pInfo = pCont;
STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId);
......@@ -810,7 +809,9 @@ int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) {
}
}
rpcSendResponse(ahandle, code, NULL, 0);
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
rpcSendResponse(ahandle, code, NULL, 0);
}
return code;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
#include "tschemautil.h"
#include "tscompression.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "ttime.h"
#include "tstatus.h"
#include "tutil.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtStreamTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
void *tsStreamTableSdb;
void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static void mgmtDestroyStreamTable(SStreamTableObj *pTable) {
free(pTable->schema);
free(pTable->sql);
free(pTable);
}
static void mgmtStreamTableActionInit() {
mgmtStreamTableActionFp[SDB_TYPE_INSERT] = mgmtStreamTableActionInsert;
mgmtStreamTableActionFp[SDB_TYPE_DELETE] = mgmtStreamTableActionDelete;
mgmtStreamTableActionFp[SDB_TYPE_UPDATE] = mgmtStreamTableActionUpdate;
mgmtStreamTableActionFp[SDB_TYPE_ENCODE] = mgmtStreamTableActionEncode;
mgmtStreamTableActionFp[SDB_TYPE_DECODE] = mgmtStreamTableActionDecode;
mgmtStreamTableActionFp[SDB_TYPE_RESET] = mgmtStreamTableActionReset;
mgmtStreamTableActionFp[SDB_TYPE_DESTROY] = mgmtStreamTableActionDestroy;
}
void *mgmtStreamTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SStreamTableObj *pTable = (SStreamTableObj *) row;
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *) realloc(pTable->schema, schemaSize);
memcpy(pTable->schema, str + tsize, schemaSize);
pTable->sql = (char *) realloc(pTable->sql, pTable->sqlLen);
memcpy(pTable->sql, str + tsize + schemaSize, pTable->sqlLen);
return NULL;
}
void *mgmtStreamTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
SStreamTableObj *pTable = (SStreamTableObj *)row;
mgmtDestroyStreamTable(pTable);
return NULL;
}
void *mgmtStreamTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
return NULL;
}
if (!sdbMaster) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->sid);
return NULL;
}
}
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
pVgroup->numOfTables++;
pDb->numOfTables++;
pVgroup->tableList[pTable->sid] = (STableInfo *) pTable;
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
}
return NULL;
}
void *mgmtStreamTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
if (pTable->vgId == 0) {
return NULL;
}
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
return NULL;
}
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
pVgroup->tableList[pTable->sid] = NULL;
pVgroup->numOfTables--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid);
if (pVgroup->numOfTables > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
}
return NULL;
}
void *mgmtStreamTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
return mgmtStreamTableActionReset(row, str, size, NULL);
}
void *mgmtStreamTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
SStreamTableObj *pTable = (SStreamTableObj *) row;
assert(row != NULL && str != NULL);
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
if (size < tsize + schemaSize + pTable->sqlLen + 1) {
*ssize = -1;
return NULL;
}
memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, schemaSize);
memcpy(str + tsize + schemaSize, pTable->sql, pTable->sqlLen);
*ssize = tsize + schemaSize + pTable->sqlLen;
return NULL;
}
void *mgmtStreamTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
assert(str != NULL);
SStreamTableObj *pTable = (SStreamTableObj *)malloc(sizeof(SNormalTableObj));
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(SStreamTableObj));
int32_t tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
mgmtDestroyStreamTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *)malloc(schemaSize);
if (pTable->schema == NULL) {
mgmtDestroyStreamTable(pTable);
return NULL;
}
memcpy(pTable->schema, str + tsize, schemaSize);
pTable->sql = (char *)malloc(pTable->sqlLen);
if (pTable->sql == NULL) {
mgmtDestroyStreamTable(pTable);
return NULL;
}
memcpy(pTable->sql, str + tsize + schemaSize, pTable->sqlLen);
return (void *)pTable;
}
void *mgmtStreamTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
if (mgmtStreamTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtStreamTableActionFp[(uint8_t)action]))(row, str, size, ssize);
}
return NULL;
}
int32_t mgmtInitStreamTables() {
void *pNode = NULL;
void *pLastNode = NULL;
SChildTableObj *pTable = NULL;
mgmtStreamTableActionInit();
tsStreamTableSdb = sdbOpenTable(tsMaxTables, sizeof(SStreamTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN,
"streams", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtStreamTableAction);
if (tsStreamTableSdb == NULL) {
mError("failed to init stream table data");
return -1;
}
pNode = NULL;
while (1) {
pNode = sdbFetchRow(tsStreamTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("stream table:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsStreamTableSdb, pTable);
pNode = pLastNode;
continue;
}
mgmtAddTableIntoDb(pDb);
}
mgmtSetVgroupIdPool();
mTrace("stream table is initialized");
return 0;
}
void mgmtCleanUpStreamTables() {
}
int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) {
// SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg;
// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
// pCreateTable->vnode = htonl(vnode);
// pCreateTable->sid = htonl(pTable->sid);
// pCreateTable->uid = pTable->uid;
// pCreateTable->createdTime = htobe64(pTable->createdTime);
// pCreateTable->sversion = htonl(pTable->sversion);
// pCreateTable->numOfColumns = htons(pTable->numOfColumns);
// //pCreateTable->sqlLen = htons(pTable->sqlLen);
//
// SSchema *pSchema = pTable->schema;
// int32_t totalCols = pCreateTable->numOfColumns;
// for (int32_t col = 0; col < totalCols; ++col) {
// SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
// colData->type = pSchema[col].type;
// colData->bytes = htons(pSchema[col].bytes);
// colData->colId = htons(pSchema[col].colId);
// }
// int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
// pMsg = pCreateTable->data + totalColsSize + pTable->sqlLen;
// char *sql = pTable->schema + pTable->schemaSize;
// memcpy(pCreateTable->data + totalColsSize, pTable->sqlLen, sql);
// return pMsg;
return NULL;
}
int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int32_t numOfTables = sdbGetNumOfRows(tsStreamTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) {
mError("stream table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
}
SStreamTableObj *pTable = (SStreamTableObj *) calloc(sizeof(SStreamTableObj), 1);
if (pTable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pTable->tableId, pCreate->tableId);
pTable->createdTime = taosGetTimestampMs();
pTable->vgId = pVgroup->vgId;
pTable->sid = sid;
pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sversion = 0;
pTable->numOfColumns = pCreate->numOfColumns;
int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *) calloc(1, schemaSize);
if (pTable->schema == NULL) {
free(pTable);
mError("table:%s, no schema input", pCreate->tableId);
return TSDB_CODE_INVALID_TABLE;
}
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
pTable->nextColId = 0;
for (int32_t col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *) pTable->schema;
tschema[col].colId = pTable->nextColId++;
}
pTable->sql = (char*)(pTable->schema + numOfCols * sizeof(SSchema));
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen);
pTable->sql[pCreate->sqlLen - 1] = 0;
mTrace("table:%s, stream sql len:%d sql:%s", pCreate->tableId, pCreate->sqlLen, pTable->sql);
if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->tableId);
return TSDB_CODE_SDB_ERROR;
}
mgmtAddTimeSeries(pTable->numOfColumns - 1);
mgmtSendCreateStreamTableMsg(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%"
PRIu64
" db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
mgmtAddTableIntoDb(pDb);
return 0;
}
int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) {
SVgObj * pVgroup;
SAcctObj *pAcct;
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
}
pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_OTHERS;
}
mgmtRestoreTimeSeries(pTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
if (pVgroup->numOfTables <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
mgmtRemoveTableFromDb(pDb);
return 0;
}
void* mgmtGetStreamTable(char *tableId) {
return sdbGetRow(tsStreamTableSdb, tableId);
}
static int32_t mgmtSetSchemaFromStreamTable(SSchema *pSchema, SStreamTableObj *pTable) {
int32_t numOfCols = pTable->numOfColumns;
for (int32_t i = 0; i < numOfCols; ++i) {
strcpy(pSchema->name, pTable->schema[i].name);
pSchema->type = pTable->schema[i].type;
pSchema->bytes = htons(pTable->schema[i].bytes);
pSchema->colId = htons(pTable->schema[i].colId);
pSchema++;
}
return numOfCols * sizeof(SSchema);
}
int32_t mgmtGetStreamTableMeta(SDbObj *pDb, SStreamTableObj *pTable, STableMeta *pMeta, bool usePublicIp) {
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgid = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->sversion);
pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = 0;
pMeta->numOfColumns = htons(pTable->numOfColumns);
pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromStreamTable(pMeta->schema, pTable);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_TABLE;
}
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -37,17 +37,17 @@
#include "mgmtUser.h"
#include "mgmtVgroup.h"
void *tsSuperTableSdb;
int32_t tsSuperTableUpdateSize;
void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static void *tsSuperTableSdb;
static int32_t tsSuperTableUpdateSize;
static void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtSuperTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static void mgmtDestroySuperTable(SSuperTableObj *pTable) {
free(pTable->schema);
......@@ -55,22 +55,25 @@ static void mgmtDestroySuperTable(SSuperTableObj *pTable) {
}
static void mgmtSuperTableActionInit() {
mgmtSuperTableActionFp[SDB_TYPE_INSERT] = mgmtSuperTableActionInsert;
mgmtSuperTableActionFp[SDB_TYPE_DELETE] = mgmtSuperTableActionDelete;
mgmtSuperTableActionFp[SDB_TYPE_UPDATE] = mgmtSuperTableActionUpdate;
mgmtSuperTableActionFp[SDB_TYPE_ENCODE] = mgmtSuperTableActionEncode;
mgmtSuperTableActionFp[SDB_TYPE_DECODE] = mgmtSuperTableActionDecode;
mgmtSuperTableActionFp[SDB_TYPE_RESET] = mgmtSuperTableActionReset;
SSuperTableObj tObj;
tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
mgmtSuperTableActionFp[SDB_TYPE_INSERT] = mgmtSuperTableActionInsert;
mgmtSuperTableActionFp[SDB_TYPE_DELETE] = mgmtSuperTableActionDelete;
mgmtSuperTableActionFp[SDB_TYPE_UPDATE] = mgmtSuperTableActionUpdate;
mgmtSuperTableActionFp[SDB_TYPE_ENCODE] = mgmtSuperTableActionEncode;
mgmtSuperTableActionFp[SDB_TYPE_DECODE] = mgmtSuperTableActionDecode;
mgmtSuperTableActionFp[SDB_TYPE_RESET] = mgmtSuperTableActionReset;
mgmtSuperTableActionFp[SDB_TYPE_DESTROY] = mgmtSuperTableActionDestroy;
}
void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row;
memcpy(pTable, str, tsDbUpdateSize);
memcpy(pTable, str, tsSuperTableUpdateSize);
int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
pTable->schema = realloc(pTable->schema, schemaSize);
memcpy(pTable->schema, str + tsDbUpdateSize, schemaSize);
memcpy(pTable->schema, str + tsSuperTableUpdateSize, schemaSize);
return NULL;
}
......@@ -82,10 +85,20 @@ void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *s
}
void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
STableInfo *pTable = (STableInfo *) row;
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb) {
mgmtAddSuperTableIntoDb(pDb);
}
return NULL;
}
void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
STableInfo *pTable = (STableInfo *) row;
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb) {
mgmtRemoveSuperTableFromDb(pDb);
}
return NULL;
}
......@@ -114,7 +127,7 @@ void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ss
void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
assert(str != NULL);
SSuperTableObj *pTable = (SSuperTableObj *)malloc(sizeof(SSuperTableObj));
SSuperTableObj *pTable = (SSuperTableObj *) malloc(sizeof(SSuperTableObj));
if (pTable == NULL) {
return NULL;
}
......@@ -134,35 +147,33 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ss
}
memcpy(pTable->schema, str + tsSuperTableUpdateSize, schemaSize);
return (void *)pTable;
return (void *) pTable;
}
void *mgmtSuperTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
if (mgmtSuperTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtSuperTableActionFp[(uint8_t)action]))(row, str, size, ssize);
if (mgmtSuperTableActionFp[(uint8_t) action] != NULL) {
return (*(mgmtSuperTableActionFp[(uint8_t) action]))(row, str, size, ssize);
}
return NULL;
}
int32_t mgmtInitSuperTables() {
void * pNode = NULL;
void * pLastNode = NULL;
SSuperTableObj * pTable = NULL;
void *pNode = NULL;
void *pLastNode = NULL;
SSuperTableObj *pTable = NULL;
mgmtSuperTableActionInit();
SSuperTableObj tObj;
tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsSuperTableSdb = sdbOpenTable(tsMaxTables, tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS,
"stables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction);
if (tsSuperTableSdb == NULL) {
mError("failed to init super table data");
mError("failed to init stables data");
return -1;
}
pNode = NULL;
while (1) {
pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable);
pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **) &pTable);
if (pTable == NULL) {
break;
}
......@@ -178,9 +189,7 @@ int32_t mgmtInitSuperTables() {
mgmtAddSuperTableIntoDb(pDb);
}
mgmtSetVgroupIdPool();
mTrace("super table is initialized");
mTrace("stables is initialized");
return 0;
}
......@@ -190,8 +199,8 @@ void mgmtCleanUpSuperTables() {
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) {
mError("super table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_TABLES);
if (numOfTables >= TSDB_MAX_SUPER_TABLES) {
mError("stable:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_SUPER_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
}
......@@ -201,6 +210,7 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
}
strcpy(pStable->tableId, pCreate->tableId);
pStable->type = TSDB_TABLE_TYPE_SUPER_TABLE;
pStable->createdTime = taosGetTimestampMs();
pStable->vgId = 0;
pStable->sid = 0;
......@@ -214,7 +224,7 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
pStable->schema = (SSchema *)calloc(1, schemaSize);
if (pStable->schema == NULL) {
free(pStable);
mError("table:%s, no schema input", pCreate->tableId);
mError("stable:%s, no schema input", pCreate->tableId);
return TSDB_CODE_INVALID_TABLE;
}
memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
......@@ -230,7 +240,6 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
return TSDB_CODE_SDB_ERROR;
}
mgmtAddSuperTableIntoDb(pDb);
return TSDB_CODE_SUCCESS;
}
......@@ -634,20 +643,6 @@ int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMeta *p
pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_TABLE;
}
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -33,11 +33,14 @@
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtNormalTable.h"
#include "mgmtStreamTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
extern void *tsNormalTableSdb;
extern void *tsChildTableSdb;
int32_t mgmtInitTables() {
int32_t code = mgmtInitSuperTables();
if (code != TSDB_CODE_SUCCESS) {
......@@ -49,16 +52,13 @@ int32_t mgmtInitTables() {
return code;
}
code = mgmtInitStreamTables();
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = mgmtInitChildTables();
if (code != TSDB_CODE_SUCCESS) {
return code;
}
mgmtSetVgroupIdPool();
return TSDB_CODE_SUCCESS;
}
......@@ -73,12 +73,7 @@ STableInfo* mgmtGetTable(char *tableId) {
return tableInfo;
}
tableInfo = (STableInfo *) mgmtGetStreamTable(tableId);
if (tableInfo != NULL) {
return tableInfo;
}
tableInfo = (STableInfo *) mgmtGetNormalTable(tableId);
tableInfo = (STableInfo *) mgmtGetChildTable(tableId);
if (tableInfo != NULL) {
return tableInfo;
}
......@@ -102,8 +97,6 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) {
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp) {
if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) {
mgmtGetChildTableMeta(pDb, (SChildTableObj *) pTable, pMeta, usePublicIp);
} else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) {
mgmtGetStreamTableMeta(pDb, (SStreamTableObj *) pTable, pMeta, usePublicIp);
} else if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
mgmtGetNormalTableMeta(pDb, (SNormalTableObj *) pTable, pMeta, usePublicIp);
} else if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) {
......@@ -147,22 +140,17 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
return grantCode;
}
SVgObj *pVgroup = mgmtGetAvailVgroup(pDb);
int32_t sid;
SVgObj *pVgroup = mgmtGetAvailVgroup(pDb, &sid);
if (pVgroup == NULL) {
return terrno;
}
int32_t sid = mgmtAllocateSid(pDb, pVgroup);
if (sid < 0) {
return terrno;
}
if (pCreate->numOfColumns == 0) {
return mgmtCreateChildTable(pDb, pCreate, pVgroup, sid);
} else if (pCreate->sqlLen > 0) {
return mgmtCreateStreamTable(pDb, pCreate, pVgroup, sid);
// process it in a callback function
return TSDB_CODE_ACTION_IN_PROGRESS;
} else {
return mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid);
if (pCreate->numOfColumns == 0) {
return mgmtCreateChildTable(pDb, pCreate, pVgroup, sid);
} else {
return mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid);
}
}
} else {
return mgmtCreateSuperTable(pDb, pCreate);
......@@ -188,8 +176,6 @@ int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) {
return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
case TSDB_TABLE_TYPE_CHILD_TABLE:
return mgmtDropChildTable(pDb, (SChildTableObj *) pTable);
case TSDB_TABLE_TYPE_STREAM_TABLE:
return mgmtDropStreamTable(pDb, (SStreamTableObj *) pTable);
case TSDB_TABLE_TYPE_NORMAL_TABLE:
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
default:
......@@ -242,7 +228,6 @@ int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) {
void mgmtCleanUpMeters() {
mgmtCleanUpNormalTables();
mgmtCleanUpStreamTables();
mgmtCleanUpChildTables();
mgmtCleanUpSuperTables();
}
......@@ -306,116 +291,102 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_
}
int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) return 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 &&
strcmp(pUser->user, "monitor") != 0) {
return 0;
}
}
int32_t numOfRows = 0;
// int32_t numOfRead = 0;
// int32_t cols = 0;
// void *pTable = NULL;
// char *pWrite = NULL;
//
// int16_t numOfColumns;
// int64_t createdTime;
// char *tableId;
// char *superTableId;
// SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) {
// pDb = mgmtGetDb(pConn->pDb->name);
// }
//
// if (pDb == NULL) {
// return 0;
// }
//
// if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
// if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 &&
// strcmp(pConn->pUser->user, "monitor") != 0) {
// return 0;
// }
// }
//
// char prefix[20] = {0};
// strcpy(prefix, pDb->name);
// strcat(prefix, TS_PATH_DELIMITER);
// int32_t prefixLen = strlen(prefix);
//
// while (numOfRows < rows) {
// void *pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable);
// if (pTable != NULL) {
// SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable;
// pShow->pNode = pNormalTableNode;
// tableId = pNormalTable->tableId;
// superTableId = NULL;
// createdTime = pNormalTable->createdTime;
// numOfColumns = pNormalTable->numOfColumns;
// } else {
// void *pStreamTableNode = sdbFetchRow(tsStreamTableSdb, pShow->pNode, (void **) &pTable);
// if (pTable != NULL) {
// SStreamTableObj *pChildTable = (SStreamTableObj *) pTable;
// pShow->pNode = pStreamTableNode;
// tableId = pChildTable->tableId;
// superTableId = NULL;
// createdTime = pChildTable->createdTime;
// numOfColumns = pChildTable->numOfColumns;
// } else {
// void *pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable);
// if (pTable != NULL) {
// SChildTableObj *pChildTable = (SChildTableObj *) pTable;
// pShow->pNode = pChildTableNode;
// tableId = pChildTable->tableId;
// superTableId = NULL;
// createdTime = pChildTable->createdTime;
// numOfColumns = pChildTable->superTable->numOfColumns;
// } else {
// break;
// }
// }
// }
//
// // not belong to current db
// if (strncmp(tableId, prefix, prefixLen)) {
// continue;
// }
//
// char meterName[TSDB_TABLE_NAME_LEN] = {0};
// memset(meterName, 0, tListLen(meterName));
// numOfRead++;
//
// // pattern compare for meter name
// extractTableName(tableId, meterName);
//
// if (pShow->payloadLen > 0 &&
// patternMatch(pShow->payload, meterName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
// continue;
// }
//
// cols = 0;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// strncpy(pWrite, meterName, TSDB_TABLE_NAME_LEN);
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int64_t *) pWrite = createdTime;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int16_t *) pWrite = numOfColumns;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// if (superTableId != NULL) {
// extractTableName(superTableId, pWrite);
// }
// cols++;
//
// numOfRows++;
// }
//
// pShow->numOfReads += numOfRead;
// const int32_t NUM_OF_COLUMNS = 4;
//
// mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
int32_t numOfRead = 0;
int32_t cols = 0;
void *pTable = NULL;
char *pWrite = NULL;
char prefix[20] = {0};
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
strcpy(prefix, pDb->name);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix);
while (numOfRows < rows) {
int16_t numOfColumns = 0;
int64_t createdTime = 0;
char *tableId = NULL;
char *superTableId = NULL;
void *pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) {
SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable;
pShow->pNode = pNormalTableNode;
tableId = pNormalTable->tableId;
superTableId = NULL;
createdTime = pNormalTable->createdTime;
numOfColumns = pNormalTable->numOfColumns;
} else {
void *pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) {
SChildTableObj *pChildTable = (SChildTableObj *) pTable;
pShow->pNode = pChildTableNode;
tableId = pChildTable->tableId;
superTableId = pChildTable->superTableId;
createdTime = pChildTable->createdTime;
numOfColumns = pChildTable->superTable->numOfColumns;
} else {
break;
}
}
// not belong to current db
if (strncmp(tableId, prefix, prefixLen)) {
continue;
}
char tableName[TSDB_TABLE_NAME_LEN] = {0};
memset(tableName, 0, tListLen(tableName));
numOfRead++;
// pattern compare for meter name
extractTableName(tableId, tableName);
if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
continue;
}
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strncpy(pWrite, tableName, TSDB_TABLE_NAME_LEN);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *) pWrite = createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *) pWrite = numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (superTableId != NULL) {
extractTableName(superTableId, pWrite);
}
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRead;
const int32_t NUM_OF_COLUMNS = 4;
mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
return numOfRows;
}
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册