提交 81fef75f 编写于 作者: S slguan

#1177

上级 d1f7112e
......@@ -22,6 +22,11 @@ extern "C" {
#include "os.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "sdb.h"
#include "tglobalcfg.h"
#include "thash.h"
......@@ -132,6 +137,72 @@ typedef struct _tab_obj {
// SSchema schema[];
} STabObj;
typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN + 1];
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfTags;
int32_t numOfMeters;
int32_t numOfColumns;
int32_t schemaSize;
int8_t reserved[7];
int8_t updateEnd[1];
int16_t nextColId;
pthread_rwlock_t rwLock;
tSkipList * pSkipList;
struct SSuperTableObj *pHead;
struct SSuperTableObj *prev, *next;
int8_t* schema;
} SSuperTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int8_t reserved[7];
int8_t updateEnd[1];
SSuperTableObj *superTable;
} SChildTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int32_t schemaSize;
char reserved[3];
char updateEnd[1];
int16_t nextColId;
char* schema;
} SNormalTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int32_t schemaSize;
char reserved[3];
char updateEnd[1];
int16_t nextColId;
char* pSql; //null-terminated string
char* schema;
} SStreamTableObj;
typedef struct _vg_obj {
uint32_t vgId;
char dbName[TSDB_DB_NAME_LEN];
......
......@@ -136,6 +136,15 @@ void sdbCleanUpPeers();
int sdbCfgNode(char *cont);
int64_t sdbGetVersion();
#define TSDB_MAX_TABLES 1000
extern void* tsChildTableSdb;
extern void* tsNormalTableSdb;
extern void* tsStreamTableSdb;
extern void* tsSuperTableSdb;
#ifdef __cplusplus
}
#endif
......
......@@ -20,6 +20,9 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "tsdb.h"
#include "taoserror.h"
......@@ -294,6 +297,11 @@ typedef struct SMColumn {
short bytes;
} SMColumn;
typedef struct {
int32_t size;
int8_t* data;
} SVariableMsg;
typedef struct {
short vnode;
int32_t sid;
......@@ -310,6 +318,9 @@ typedef struct {
char reserved[16];
int32_t sversion;
SMColumn schema[];
SVariableMsg tags;
} SCreateMsg;
typedef struct {
......
......@@ -24,27 +24,14 @@ extern "C" {
#include <stdbool.h>
#include "taosdef.h"
struct SSuperTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
int64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
char reserved[3];
char updateEnd[1];
struct SSuperTableObj *superTable;
} SChildTableObj;
#include "mnode.h"
int32_t mgmtInitChildTables();
void mgmtCleanUpChildTables();
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate);
int32_t mgmtDropChildTable(SDbObj *pDb, char *meterId, int ignore);
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid);
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
SChildTableObj* mgmtGetChildTable(char *tableId);
SSchema* mgmtGetChildTableSchema(SChildTableObj *pTable);
......
......@@ -23,41 +23,16 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int64_t uid;
int32_t sid;
int32_t vgId;
int32_t sversion; // schema version
int32_t createdTime;
int32_t numOfTags; // for metric
int32_t numOfMeters; // for metric
int32_t numOfColumns;
int32_t schemaSize;
short nextColId;
char tableType : 4;
char status : 3;
char isDirty : 1; // if the table change tag column 1 value
char reserved[15];
char updateEnd[1];
pthread_rwlock_t rwLock;
tSkipList * pSkipList;
struct _tab_obj *pHead; // for metric, a link list for all meters created
// according to this metric
char *pTagData; // TSDB_TABLE_ID_LEN(metric_name)+
// tags_value1/tags_value2/tags_value3
struct _tab_obj *prev, *next;
char * pSql; // pointer to SQL, for SC, null-terminated string
char * pReserve1;
char * pReserve2;
char * schema;
// SSchema schema[];
} SNormalTableObj;
int32_t mgmtInitSTable();
#include "mnode.h"
int32_t mgmtInitNormalTables();
void mgmtCleanUpNormalTables();
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid);
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
SNormalTableObj* mgmtGetNormalTable(char *tableId);
SSchema* mgmtGetNormalTableSchema(SNormalTableObj *pTable);
#ifdef __cplusplus
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_MNODE_STABLE_H
#define TBASE_MNODE_STABLE_H
#ifndef TBASE_MNODE_STREAM_TABLE_H
#define TBASE_MNODE_STREAM_TABLE_H
#ifdef __cplusplus
extern "C" {
......@@ -23,41 +23,16 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
typedef struct {
char meterId[TSDB_TABLE_ID_LEN + 1];
uint64_t uid;
STableGid gid;
#include "mnode.h"
int32_t sversion; // schema version
int32_t createdTime;
int32_t mgmtInitStreamTables();
void mgmtCleanUpStreamTables();
int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid);
int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable);
int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter);
SStreamTableObj* mgmtGetStreamTable(char *tableId);
SSchema* mgmtGetStreamTableSchema(SStreamTableObj *pTable);
int32_t numOfTags; // for metric
int32_t numOfMeters; // for metric
int32_t numOfColumns;
int32_t schemaSize;
short nextColId;
char tableType : 4;
char status : 3;
char isDirty : 1; // if the table change tag column 1 value
char reserved[15];
char updateEnd[1];
pthread_rwlock_t rwLock;
tSkipList * pSkipList;
struct _tab_obj *pHead; // for metric, a link list for all meters created
// according to this metric
char *pTagData; // TSDB_TABLE_ID_LEN(metric_name)+
// tags_value1/tags_value2/tags_value3
struct _tab_obj *prev, *next;
char * pSql; // pointer to SQL, for SC, null-terminated string
char * pReserve1;
char * pReserve2;
char * schema;
// SSchema schema[];
} STabObj;
int32_t mgmtInitSTable();
#ifdef __cplusplus
......
......@@ -24,25 +24,20 @@ extern "C" {
#include <stdbool.h>
#include "taosdef.h"
typedef struct {
char superTableId[TSDB_TABLE_ID_LEN + 1];
int64_t uid;
int32_t sid;
int32_t vgId;
int32_t sversion;
int32_t createdTime;
char reserved[7];
char updateEnd[1];
} SSuperTableObj;
#include "mnode.h"
int32_t mgmtInitSuperTables();
void mgmtCleanUpSuperTables();
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate);
int32_t mgmtDropSuperTable(SDbObj *pDb, char *meterId, int ignore);
int32_t mgmtAlterSuperTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable);
SSuperTableObj* mgmtGetSuperTable(char *tableId);
int32_t mgmtFindTagCol(SSuperTableObj *pTable, const char *tagName);
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags);
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName);
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName);
int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName);
SSchema* mgmtGetSuperTableSchema(SSuperTableObj *pTable);
......
......@@ -25,24 +25,29 @@ extern "C" {
#include <stdint.h>
#include "mnode.h"
typedef struct {
ETableType type;
void* obj;
} STableObj;
int mgmtInitMeters();
STabObj *mgmtGetTable(char *meterId);
STableObj mgmtGetTable(char *tableId);
STabObj *mgmtGetTableInfo(char *src, char *tags[]);
int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate);
int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore);
int mgmtAlterMeter(SDbObj *pDb, SAlterTableMsg *pAlter);
int mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate);
int mgmtDropTable(SDbObj *pDb, char *meterId, int ignore);
int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
void mgmtCleanUpMeters();
SSchema *mgmtGetTableSchema(STabObj *pTable); // get schema for a meter
int32_t mgmtFindTagCol(STabObj * pTable, const char * tagName);
int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable);
int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable);
int mgmtGetMetricMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
......
......@@ -33,6 +33,10 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
void mgmtCleanUpVgroups();
SVgObj *mgmtGetAvailVgroup(SDbObj *pDb);
int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup);
#ifdef __cplusplus
}
#endif
......
......@@ -35,3 +35,149 @@
#include "ttime.h"
#include "tstatus.h"
#include "sdb.h"
#include "mgmtChildTable.h"
#include "mgmtSuperTable.h"
int32_t mgmtInitChildTables() {
return 0;
}
void mgmtCleanUpChildTables() {
}
char *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, char *pMsg, int vnode) {
}
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) {
int numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) {
mError("child table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables);
return TSDB_CODE_TOO_MANY_TABLES;
}
char *pTagData = (char *)pCreate->schema; // it is a tag key
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
if (pSuperTable == NULL) {
mError("table:%s, corresponding super table does not exist", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE;
}
SChildTableObj *pTable = (SChildTableObj *)calloc(sizeof(SChildTableObj), 1);
if (pTable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pTable->tableId, pCreate->meterId);
strcpy(pTable->superTableId, pSuperTable->tableId);
pTable->createdTime = taosGetTimestampMs();
pTable->superTable = pSuperTable;
pTable->vgId = vgId;
pTable->sid = sid;
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
SVariableMsg tags = {0};
tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t)TSDB_TABLE_ID_LEN;
tags.data = (char *)calloc(1, tags.size);
if (tags.data == NULL) {
free(pTable);
mError("table:%s, corresponding super table schema is null", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE;
}
memcpy(tags.data, pTagData, tags.size);
if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->meterId);
return TSDB_CODE_SDB_ERROR;
}
mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1);
mgmtSendCreateMsgToVgroup(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->tableId, vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0;
}
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
SVgObj *pVgroup;
SAcctObj *pAcct;
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
}
pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_OTHERS;
}
mgmtRestoreTimeSeries(pTable->superTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
if (pVgroup->numOfMeters <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
return 0;
}
SChildTableObj* mgmtGetChildTable(char *tableId) {
return (SChildTableObj *)sdbGetRow(tsChildTableSdb, tableId);
}
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) {
int col = mgmtFindTagCol(pTable->superTable, tagName);
if (col < 0 || col > pTable->superTable->numOfTags) {
return TSDB_CODE_APP_ERROR;
}
//TODO send msg to dnode
mTrace("Succeed to modify tag column %d of table %s", col, pTable->tableId);
return TSDB_CODE_SUCCESS;
// int rowSize = 0;
// SSchema *schema = (SSchema *)(pSuperTable->schema + (pSuperTable->numOfColumns + col) * sizeof(SSchema));
//
// if (col == 0) {
// pTable->isDirty = 1;
// removeMeterFromMetricIndex(pSuperTable, pTable);
// }
// memcpy(pTable->pTagData + mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN, nContent, schema->bytes);
// if (col == 0) {
// addMeterIntoMetricIndex(pMetric, pTable);
// }
//
// // Encode the string
// int size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1;
// char *msg = (char *)malloc(size);
// if (msg == NULL) {
// mError("failed to allocate message memory while modify tag value");
// return TSDB_CODE_APP_ERROR;
// }
// memset(msg, 0, size);
//
// mgmtMeterActionEncode(pTable, msg, size, &rowSize);
//
// int32_t ret = sdbUpdateRow(meterSdb, msg, rowSize, 1); // Need callback function
// tfree(msg);
//
// if (pTable->isDirty) pTable->isDirty = 0;
//
// if (ret < 0) {
// mError("Failed to modify tag column %d of table %s", col, pTable->meterId);
// return TSDB_CODE_APP_ERROR;
// }
//
// mTrace("Succeed to modify tag column %d of table %s", col, pTable->meterId);
// return TSDB_CODE_SUCCESS;
}
......@@ -282,7 +282,7 @@ void mgmtDropDbFromSdb(SDbObj *pDb) {
STabObj *pMetric = pDb->pMetric;
while (pMetric) {
STabObj *pNext = pMetric->next;
mgmtDropMeter(pDb, pMetric->meterId, 0);
mgmtDropTable(pDb, pMetric->meterId, 0);
pMetric = pNext;
}
......@@ -324,7 +324,7 @@ int mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) {
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
return TSDB_CODE_MONITOR_DB_FORBEIDEN;
}
return mgmtDropDb(pDb);
......
......@@ -229,7 +229,7 @@ char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode) {
return pMsg;
}
int mgmtSendCreateMsgToVgroup(STabObj *pTable, SVgObj *pVgroup) {
int mgmtSendCreateMsgToVgroup(STabObj table, SVgObj *pVgroup) {
char * pMsg, *pStart;
int i, msgLen = 0;
SDnodeObj *pObj;
......
......@@ -35,3 +35,184 @@
#include "ttime.h"
#include "tstatus.h"
#include "sdb.h"
#include "mgmtNormalTable.h"
int32_t mgmtInitNormalTables() {
return 0;
}
void mgmtCleanUpNormalTables() {
sdbCloseTable(tsNormalTableSdb);
}
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) {
int numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) {
mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
}
SNormalTableObj *pTable = (SNormalTableObj *)calloc(sizeof(SNormalTableObj), 1);
if (pTable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pTable->tableId, pCreate->meterId);
pTable->createdTime = taosGetTimestampMs();
pTable->vgId = vgId;
pTable->sid = sid;
pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sversion = 0;
pTable->numOfColumns = pCreate->numOfColumns;
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
pTable->schemaSize = numOfCols * sizeof(SSchema);
pTable->schema = (int8_t *)calloc(1, pTable->schemaSize);
if (pTable->schema == NULL) {
free(pTable);
mError("table:%s, no schema input", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE;
}
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
pTable->nextColId = 0;
for (int col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *)pTable->schema;
tschema[col].colId = pTable->nextColId++;
}
if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->meterId);
return TSDB_CODE_SDB_ERROR;
}
// mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d",
// pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode);
//
// mgmtAddTimeSeries(pTable->numOfColumns - 1);
// mgmtSendCreateMsgToVgroup(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0;
}
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
SVgObj *pVgroup;
SAcctObj *pAcct;
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
}
pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_OTHERS;
}
mgmtRestoreTimeSeries(pTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
if (pVgroup->numOfMeters <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
return 0;
}
SNormalTableObj* mgmtGetNormalTable(char *tableId) {
return (SNormalTableObj *)sdbGetRow(tsNormalTableSdb, tableId);
}
static int32_t mgmtFindNormalTableColumnIndex(SNormalTableObj *pTable, char *colName) {
SSchema *schema = (SSchema *) pTable->schema;
for (int32_t i = 0; i < pTable->numOfColumns; i++) {
if (strcasecmp(schema[i].name, colName) == 0) {
return i;
}
}
return -1;
}
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int ncols) {
if (ncols <= 0) {
return TSDB_CODE_APP_ERROR;
}
for (int i = 0; i < ncols; i++) {
if (mgmtFindNormalTableColumnIndex(pTable, schema[i].name) > 0) {
return TSDB_CODE_APP_ERROR;
}
}
SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pTable->tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
pTable->schema = realloc(pTable->schema, pTable->schemaSize + sizeof(SSchema) * ncols);
memcpy(pTable->schema + pTable->schemaSize, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *) (pTable->schema + sizeof(SSchema) * pTable->numOfColumns);
for (int i = 0; i < ncols; i++) {
tschema[i].colId = pTable->nextColId++;
}
pTable->schemaSize += sizeof(SSchema) * ncols;
pTable->numOfColumns += ncols;
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries += ncols;
sdbUpdateRow(tsNormalTableSdb, pTable, 0, 1);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) {
int32_t col = mgmtFindNormalTableColumnIndex(pTable, colName);
if (col < 0) {
return TSDB_CODE_APP_ERROR;
}
SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pTable->tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1),
sizeof(SSchema) * (pTable->numOfColumns - col - 1));
pTable->schemaSize -= sizeof(SSchema);
pTable->numOfColumns--;
pTable->schema = realloc(pTable->schema, pTable->schemaSize);
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries--;
sdbUpdateRow(tsNormalTableSdb, pTable, 0, 1);
return TSDB_CODE_SUCCESS;
}
......@@ -230,7 +230,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
pCreateMsg->meterId, pDb, pDb->name, pMeterDb, pMeterDb->name);
assert(pDb == pMeterDb);
int32_t code = mgmtCreateMeter(pDb, pCreateMsg);
int32_t code = mgmtCreateTable(pDb, pCreateMsg);
char stableName[TSDB_TABLE_ID_LEN] = {0};
strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN);
......@@ -896,7 +896,7 @@ static void mgmtInitShowMsgFp() {
mgmtGetMetaFp[TSDB_MGMT_TABLE_DNODE] = mgmtGetDnodeMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_MNODE] = mgmtGetMnodeMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_VGROUP] = mgmtGetVgroupMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_METRIC] = mgmtGetMetricMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_METRIC] = mgmtGetSuperTableMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_MODULE] = mgmtGetModuleMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_QUERIES] = mgmtGetQueryMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_STREAMS] = mgmtGetStreamMeta;
......@@ -910,11 +910,11 @@ static void mgmtInitShowMsgFp() {
mgmtRetrieveFp[TSDB_MGMT_TABLE_ACCT] = mgmtRetrieveAccts;
mgmtRetrieveFp[TSDB_MGMT_TABLE_USER] = mgmtRetrieveUsers;
mgmtRetrieveFp[TSDB_MGMT_TABLE_DB] = mgmtRetrieveDbs;
mgmtRetrieveFp[TSDB_MGMT_TABLE_TABLE] = mgmtRetrieveMeters;
mgmtRetrieveFp[TSDB_MGMT_TABLE_TABLE] = mgmtRetrieveTables;
mgmtRetrieveFp[TSDB_MGMT_TABLE_DNODE] = mgmtRetrieveDnodes;
mgmtRetrieveFp[TSDB_MGMT_TABLE_MNODE] = mgmtRetrieveMnodes;
mgmtRetrieveFp[TSDB_MGMT_TABLE_VGROUP] = mgmtRetrieveVgroups;
mgmtRetrieveFp[TSDB_MGMT_TABLE_METRIC] = mgmtRetrieveMetrics;
mgmtRetrieveFp[TSDB_MGMT_TABLE_METRIC] = mgmtRetrieveSuperTables;
mgmtRetrieveFp[TSDB_MGMT_TABLE_MODULE] = mgmtRetrieveModules;
mgmtRetrieveFp[TSDB_MGMT_TABLE_QUERIES] = mgmtRetrieveQueries;
mgmtRetrieveFp[TSDB_MGMT_TABLE_STREAMS] = mgmtRetrieveStreams;
......@@ -1097,7 +1097,7 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pDb) {
code = mgmtCreateMeter(pDb, pCreate);
code = mgmtCreateTable(pDb, pCreate);
} else {
code = TSDB_CODE_DB_NOT_SELECTED;
}
......@@ -1139,7 +1139,7 @@ int mgmtProcessDropTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
code = mgmtDropMeter(pDb, pDrop->meterId, pDrop->igNotExists);
code = mgmtDropTable(pDb, pDrop->meterId, pDrop->igNotExists);
if (code == 0) {
mTrace("meter:%s is dropped by user:%s", pDrop->meterId, pConn->pUser->user);
// mLPrint("meter:%s is dropped by user:%s", pDrop->meterId, pConn->pUser->user);
......@@ -1177,7 +1177,7 @@ int mgmtProcessAlterTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes);
}
code = mgmtAlterMeter(pDb, pAlter);
code = mgmtAlterTable(pDb, pAlter);
if (code == 0) {
mLPrint("meter:%s is altered by %s", pAlter->meterId, pConn->pUser->user);
}
......
......@@ -23,7 +23,7 @@
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtVgroup.h"
#include "mgmtSupertableQuery.h"
#include "mgmtStreamtableQuery.h"
#include "mgmtTable.h"
#include "taosmsg.h"
#include "tast.h"
......@@ -35,3 +35,103 @@
#include "ttime.h"
#include "tstatus.h"
#include "sdb.h"
#include "mgmtStreamTable.h"
int32_t mgmtInitStreamTables() {
return 0;
}
void mgmtCleanUpStreamTables() {
}
int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) {
int numOfTables = sdbGetNumOfRows(tsStreamTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) {
mError("stream table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
}
SStreamTableObj *pTable = (SStreamTableObj *)calloc(sizeof(SStreamTableObj), 1);
if (pTable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pTable->tableId, pCreate->meterId);
pTable->createdTime = taosGetTimestampMs();
pTable->vgId = vgId;
pTable->sid = sid;
pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sversion = 0;
pTable->numOfColumns = pCreate->numOfColumns;
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
pTable->schemaSize = numOfCols * sizeof(SSchema) + pCreate->sqlLen;
pTable->schema = (int8_t *)calloc(1, pTable->schemaSize);
if (pTable->schema == NULL) {
free(pTable);
mError("table:%s, no schema input", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE;
}
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
pTable->nextColId = 0;
for (int col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *)pTable->schema;
tschema[col].colId = pTable->nextColId++;
}
pTable->pSql = pTable->schema + numOfCols * sizeof(SSchema);
memcpy(pTable->pSql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen);
pTable->pSql[pCreate->sqlLen - 1] = 0;
mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->pSql);
if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->meterId);
return TSDB_CODE_SDB_ERROR;
}
// mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d",
// pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode);
//
// mgmtAddTimeSeries(pTable->numOfColumns - 1);
// mgmtSendCreateMsgToVgroup(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0;
}
int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) {
SVgObj * pVgroup;
SAcctObj *pAcct;
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL) {
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
}
pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_OTHERS;
}
mgmtRestoreTimeSeries(pTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
if (pVgroup->numOfMeters <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
return 0;
}
SStreamTableObj* mgmtGetStreamTable(char *tableId); {
return (SStreamTableObj *)sdbGetRow(tsStreamTableSdb, tableId);
}
\ No newline at end of file
......@@ -35,3 +35,439 @@
#include "ttime.h"
#include "tstatus.h"
#include "sdb.h"
#include "mgmtSuperTable.h"
#include "mgmtChildTable.h"
#define mgmtDestroyMeter(pMetric) \
do { \
tfree(pMetric->schema); \
pMetric->pSkipList = tSkipListDestroy((pMetric)->pSkipList); \
tfree(pMetric); \
} while (0)
typedef struct {
char meterId[TSDB_TABLE_ID_LEN + 1];
char type;
uint32_t cols;
char data[];
} SMeterBatchUpdateMsg;
typedef struct {
int32_t col;
int32_t pos;
SSchema schema;
} SchemaUnit;
typedef struct {
char meterId[TSDB_TABLE_ID_LEN + 1];
char action;
int32_t dataSize;
char data[];
} SMeterUpdateMsg;
int32_t mgmtInitSuperTables() {
return 0;
}
void mgmtCleanUpSuperTables() {
}
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
int numOfTables = sdbGetNumOfRows(tsSuperTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) {
mError("super table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
}
SSuperTableObj *pMetric = (SSuperTableObj *)calloc(sizeof(SSuperTableObj), 1);
if (pMetric == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pMetric->tableId, pCreate->meterId);
pMetric->createdTime = taosGetTimestampMs();
pMetric->vgId = 0;
pMetric->sid = 0;
pMetric->uid = (((uint64_t)pMetric->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul));
pMetric->sversion = 0;
pMetric->numOfColumns = pCreate->numOfColumns;
pMetric->numOfTags = pCreate->numOfTags;
pMetric->numOfMeters = 0;
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
pMetric->schemaSize = numOfCols * sizeof(SSchema);
pMetric->schema = (int8_t *)calloc(1, pMetric->schemaSize);
if (pMetric->schema == NULL) {
free(pMetric);
mError("table:%s, no schema input", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE;
}
memcpy(pMetric->schema, pCreate->schema, numOfCols * sizeof(SSchema));
pMetric->nextColId = 0;
for (int col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *)pMetric->schema;
tschema[col].colId = pMetric->nextColId++;
}
if (sdbInsertRow(tsSuperTableSdb, pMetric, 0) < 0) {
mError("table:%s, update sdb error", pCreate->meterId);
return TSDB_CODE_SDB_ERROR;
}
return 0;
}
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) {
SChildTableObj *pMetric;
while ((pMetric = pSuperTable->pHead) != NULL) {
mgmtDropChildTable(pDb, pMetric);
}
sdbDeleteRow(tsSuperTableSdb, pMetric);
}
SSuperTableObj* mgmtGetSuperTable(char *tableId) {
return (SSuperTableObj *)sdbGetRow(tsSuperTableSdb, tableId);
}
int32_t mgmtFindTagCol(SSuperTableObj *pMetric, const char *tagName) {
for (int i = 0; i < pMetric->numOfTags; i++) {
SSchema *schema = (SSchema *)(pMetric->schema + (pMetric->numOfColumns + i) * sizeof(SSchema));
if (strcasecmp(tagName, schema->name) == 0) {
return i;
}
}
return -1;
}
int32_t mgmtAddSuperTableTag(SSuperTableObj *pMetric, SSchema schema[], int32_t ntags) {
if (pMetric->numOfTags + ntags > TSDB_MAX_TAGS) {
return TSDB_CODE_APP_ERROR;
}
// check if schemas have the same name
for (int i = 1; i < ntags; i++) {
for (int j = 0; j < i; j++) {
if (strcasecmp(schema[i].name, schema[j].name) == 0) {
return TSDB_CODE_APP_ERROR;
}
}
}
for (int i = 0; i < ntags; i++) {
if (mgmtFindTagCol(pMetric, schema[i].name) >= 0) {
return TSDB_CODE_APP_ERROR;
}
}
uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SSchema) * ntags;
SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *) malloc(size);
memset(msg, 0, size);
memcpy(msg->meterId, pMetric->tableId, TSDB_TABLE_ID_LEN);
msg->type = SDB_TYPE_INSERT;
msg->cols = ntags;
memcpy(msg->data, schema, sizeof(SSchema) * ntags);
int32_t ret = sdbBatchUpdateRow(tsSuperTableSdb, msg, size);
tfree(msg);
if (ret < 0) {
mError("Failed to add tag column %s to table %s", schema[0].name, pMetric->tableId);
return TSDB_CODE_APP_ERROR;
}
mTrace("Succeed to add tag column %s to table %s", schema[0].name, pMetric->tableId);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtDropSuperTableTag(SSuperTableObj *pMetric, char *tagName) {
int col = mgmtFindTagCol(pMetric, tagName);
if (col <= 0 || col >= pMetric->numOfTags) {
return TSDB_CODE_APP_ERROR;
}
// Pack message to do batch update
uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SchemaUnit);
SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *) malloc(size);
memset(msg, 0, size);
memcpy(msg->meterId, pMetric->tableId, TSDB_TABLE_ID_LEN);
msg->type = SDB_TYPE_DELETE;
msg->cols = 1;
((SchemaUnit *) (msg->data))->col = col;
((SchemaUnit *) (msg->data))->pos = mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN;
((SchemaUnit *) (msg->data))->schema = *(SSchema *) (pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + col));
int32_t ret = sdbBatchUpdateRow(tsSuperTableSdb, msg, size);
tfree(msg);
if (ret < 0) {
mError("Failed to drop tag column: %d from table: %s", col, pMetric->tableId);
return TSDB_CODE_APP_ERROR;
}
mTrace("Succeed to drop tag column: %d from table: %s", col, pMetric->tableId);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pMetric, char *oldTagName, char *newTagName) {
int col = mgmtFindTagCol(pMetric, oldTagName);
if (col < 0) {
// Tag name does not exist
mError("Failed to modify table %s tag column, oname: %s, nname: %s", pMetric->tableId, oldTagName, newTagName);
return TSDB_CODE_INVALID_MSG_TYPE;
}
int rowSize = 0;
uint32_t len = strlen(newTagName);
if (col >= pMetric->numOfTags || len >= TSDB_COL_NAME_LEN || mgmtFindTagCol(pMetric, newTagName) >= 0) {
return TSDB_CODE_APP_ERROR;
}
// update
SSchema *schema = (SSchema *) (pMetric->schema + (pMetric->numOfColumns + col) * sizeof(SSchema));
strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN);
// Encode string
int size = 1 + sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW;
char *msg = (char *) malloc(size);
if (msg == NULL) return TSDB_CODE_APP_ERROR;
memset(msg, 0, size);
mgmtMeterActionEncode(pMetric, msg, size, &rowSize);
int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, rowSize, 1);
tfree(msg);
if (ret < 0) {
mError("Failed to modify table %s tag column", pMetric->tableId);
return TSDB_CODE_APP_ERROR;
}
mTrace("Succeed to modify table %s tag column", pMetric->tableId);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtFindSuperTableColumnIndex(SNormalTableObj *pMetric, char *colName) {
SSchema *schema = (SSchema *) pMetric->schema;
for (int32_t i = 0; i < pMetric->numOfColumns; i++) {
if (strcasecmp(schema[i].name, colName) == 0) {
return i;
}
}
return -1;
}
int32_t mgmtAddSuperTableColumn(SSuperTableObj *pMetric, SSchema schema[], int ncols) {
if (ncols <= 0) {
return TSDB_CODE_APP_ERROR;
}
for (int i = 0; i < ncols; i++) {
if (mgmtFindSuperTableColumnIndex(pMetric, schema[i].name) > 0) {
return TSDB_CODE_APP_ERROR;
}
}
SDbObj *pDb = mgmtGetDbByMeterId(pMetric->tableId);
if (pDb == NULL) {
mError("meter: %s not belongs to any database", pMetric->tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
pMetric->schema = realloc(pMetric->schema, pMetric->schemaSize + sizeof(SSchema) * ncols);
memmove(pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + ncols),
pMetric->schema + sizeof(SSchema) * pMetric->numOfColumns, sizeof(SSchema) * pMetric->numOfTags);
memcpy(pMetric->schema + sizeof(SSchema) * pMetric->numOfColumns, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *) (pMetric->schema + sizeof(SSchema) * pMetric->numOfColumns);
for (int i = 0; i < ncols; i++) {
tschema[i].colId = pMetric->nextColId++;
}
pMetric->schemaSize += sizeof(SSchema) * ncols;
pMetric->numOfColumns += ncols;
pMetric->sversion++;
pAcct->acctInfo.numOfTimeSeries += (ncols * pMetric->numOfMeters);
sdbUpdateRow(tsSuperTableSdb, pMetric, 0, 1);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pMetric, char *colName) {
int32_t col = mgmtFindSuperTableColumnIndex(pMetric, colName);
if (col < 0) {
return TSDB_CODE_APP_ERROR;
}
SDbObj *pDb = mgmtGetDbByMeterId(pMetric->tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pMetric->tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
memmove(pMetric->schema + sizeof(SSchema) * col, pMetric->schema + sizeof(SSchema) * (col + 1),
sizeof(SSchema) * (pMetric->numOfColumns + pMetric->numOfTags - col - 1));
pMetric->schemaSize -= sizeof(SSchema);
pMetric->numOfColumns--;
pMetric->schema = realloc(pMetric->schema, pMetric->schemaSize);
pMetric->sversion++;
pAcct->acctInfo.numOfTimeSeries -= (pMetric->numOfMeters);
sdbUpdateRow(tsSuperTableSdb, pMetric, 0, 1);
return TSDB_CODE_SUCCESS;
}
int mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_METER_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tables");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = pDb->numOfMetrics;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
char * pWrite;
int cols = 0;
SSuperTableObj *pTable = NULL;
char prefix[20] = {0};
int32_t prefixLen;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) {
pDb = mgmtGetDb(pConn->pDb->name);
}
if (pDb == NULL) {
return 0;
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) {
return 0;
}
}
strcpy(prefix, pDb->name);
strcat(prefix, TS_PATH_DELIMITER);
prefixLen = strlen(prefix);
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char metricName[TSDB_METER_NAME_LEN] = {0};
while (numOfRows < rows) {
pTable = (SSuperTableObj *)pShow->pNode;
if (pTable == NULL) break;
pShow->pNode = (void *)pTable->next;
if (strncmp(pTable->tableId, prefix, prefixLen)) {
continue;
}
memset(metricName, 0, tListLen(metricName));
extractTableName(pTable->tableId, metricName);
if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, metricName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
continue;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
extractTableName(pTable->tableId, pWrite);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pTable->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pTable->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pTable->numOfTags;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pTable->numOfMeters;
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
\ No newline at end of file
......@@ -40,43 +40,10 @@
#include "mgmtNormalTable.h"
#include "mgmtStreamTable.h"
#include "taoserror.h"
extern int64_t sdbVersion;
#define mgmtDestroyMeter(pTable) \
do { \
tfree(pTable->schema); \
pTable->pSkipList = tSkipListDestroy((pTable)->pSkipList); \
tfree(pTable); \
} while (0)
enum _Meter_Update_Action {
METER_UPDATE_TAG_NAME,
METER_UPDATE_TAG_VALUE,
METER_UPDATE_TAG_VALUE_COL0,
METER_UPDATE_NULL,
MAX_METER_UPDATE_ACTION
};
typedef struct {
int32_t col;
int32_t pos;
SSchema schema;
} SchemaUnit;
typedef struct {
char meterId[TSDB_TABLE_ID_LEN + 1];
char type;
uint32_t cols;
char data[];
} SMeterBatchUpdateMsg;
typedef struct {
char meterId[TSDB_TABLE_ID_LEN + 1];
char action;
int32_t dataSize;
char data[];
} SMeterUpdateMsg;
void *meterSdb = NULL;
void *(*mgmtMeterActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
......@@ -532,216 +499,91 @@ int mgmtInitMeters() {
return 0;
}
STabObj *mgmtGetTable(char *meterId) { return (STabObj *)sdbGetRow(meterSdb, meterId); }
STableObj mgmtGetTable(char *tableId) {
STableObj table = {.type = TSDB_TABLE_TYPE_MAX, .obj = NULL};
int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
STabObj * pTable = NULL;
STabObj * pMetric = NULL;
SVgObj * pVgroup = NULL;
int size = 0;
SAcctObj *pAcct = NULL;
int numOfTables = sdbGetNumOfRows(meterSdb);
if (numOfTables >= tsMaxTables) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables);
return TSDB_CODE_TOO_MANY_TABLES;
table.obj = mgmtGetSuperTable(tableId);
if (table.obj != NULL) {
table.type = TSDB_TABLE_TYPE_SUPER_TABLE;
return table;
}
pAcct = mgmtGetAcct(pDb->cfg.acct);
assert(pAcct != NULL);
int code = mgmtCheckTableLimit(pAcct, pCreate);
if (code != 0) {
mError("table:%s, exceed the limit", pCreate->meterId);
return code;
table.obj = mgmtGetNormalTable(tableId);
if (table.obj != NULL) {
table.type = TSDB_TABLE_TYPE_NORMAL_TABLE;
return table;
}
// does table exist?
pTable = mgmtGetTable(pCreate->meterId);
if (pTable) {
if (pCreate->igExists) {
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_TABLE_ALREADY_EXIST;
}
table.obj = mgmtGetStreamTable(tableId);
if (table.obj != NULL) {
table.type = TSDB_TABLE_TYPE_STREAM_TABLE;
return table;
}
// Create the table object
pTable = (STabObj *)malloc(sizeof(STabObj));
if (pTable == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
memset(pTable, 0, sizeof(STabObj));
//
ETableType tableType = TSDB_TABLE_TYPE_MAX;
char *tagData = NULL;
int32_t tagDataSize = 0;
if (pCreate->numOfColumns == 0 && pCreate->numOfTags == 0) {
tableType = TSDB_TABLE_TYPE_CHILD_TABLE;
tagData = (char *)pCreate->schema; // it is a tag key
SSuperTableObj *pSuperTable = mgmtGetSuperTable(tagData);
if (pSuperTable == NULL) {
mError("table:%s, corresponding super table does not exist", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE;
}
tagDataSize = mgmtGetTagsLength(pMetric, INT_MAX) + (uint32_t)TSDB_TABLE_ID_LEN;
/*
* for meters created according to metrics, the schema of this meter isn't needed.
* so, we don't allocate memory for it in order to save a huge amount of
* memory when a large amount of meters are created according to this super table.
*/
size = mgmtGetTagsLength(pMetric, INT_MAX) + (uint32_t)TSDB_TABLE_ID_LEN;
pTable->schema = (char *)malloc(size);
if (pTable->schema == NULL) {
mgmtDestroyMeter(pTable);
mError("table:%s, corresponding super table schema is null", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE;
}
memset(pTable->schema, 0, size);
pTable->schemaSize = size;
pTable->numOfColumns = pMetric->numOfColumns;
pTable->sversion = pMetric->sversion;
pTable->pTagData = pTable->schema;
pTable->nextColId = pMetric->nextColId;
memcpy(pTable->pTagData, pTagData, size);
} else {
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
size = numOfCols * sizeof(SSchema) + pCreate->sqlLen;
pTable->schema = (char *)malloc(size);
if (pTable->schema == NULL) {
mgmtDestroyMeter(pTable);
mError("table:%s, no schema input", pCreate->meterId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
memset(pTable->schema, 0, size);
pTable->numOfColumns = pCreate->numOfColumns;
pTable->sversion = 0;
pTable->numOfTags = pCreate->numOfTags;
pTable->schemaSize = size;
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
table.obj = mgmtGetNormalTable(tableId);
if (table.obj != NULL) {
table.type = TSDB_TABLE_TYPE_CHILD_TABLE;
return table;
}
for (int k = 0; k < pCreate->numOfColumns; k++) {
SSchema *tschema = (SSchema *)pTable->schema;
tschema[k].colId = pTable->nextColId++;
}
return table;
}
if (pCreate->sqlLen > 0) {
pTable->tableType = TSDB_TABLE_TYPE_STREAM_TABLE;
pTable->pSql = pTable->schema + numOfCols * sizeof(SSchema);
memcpy(pTable->pSql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen);
pTable->pSql[pCreate->sqlLen - 1] = 0;
mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->pSql);
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
STableObj table = mgmtGetTable(pCreate->meterId);
if (table.obj != NULL) {
if (pCreate->igExists) {
return TSDB_CODE_SUCCESS;
} else {
if (pCreate->numOfTags > 0) {
pTable->tableType = TSDB_TABLE_TYPE_SUPER_TABLE;
} else {
pTable->tableType = TSDB_TABLE_TYPE_NORMAL_TABLE;
}
return TSDB_CODE_TABLE_ALREADY_EXIST;
}
}
pTable->createdTime = taosGetTimestampMs();
strcpy(pTable->meterId, pCreate->meterId);
if (pthread_rwlock_init(&pTable->rwLock, NULL)) {
mError("table:%s, failed to init meter lock", pCreate->meterId);
mgmtDestroyMeter(pTable);
return TSDB_CODE_FAILED_TO_LOCK_RESOURCES;
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
assert(pAcct != NULL);
int code = mgmtCheckTableLimit(pAcct, pCreate);
if (code != 0) {
mError("table:%s, exceed the limit", pCreate->meterId);
return code;
}
if (mgmtCheckExpired()) {
mError("failed to create meter:%s, reason:grant expired", pTable->meterId);
mError("failed to create meter:%s, reason:grant expired", pCreate->meterId);
return TSDB_CODE_GRANT_EXPIRED;
}
if (pCreate->numOfTags == 0) {
int grantCode = mgmtCheckTimeSeries(pTable->numOfColumns);
int grantCode = mgmtCheckTimeSeries(pCreate->numOfColumns);
if (grantCode != 0) {
mError("table:%s, grant expired", pCreate->meterId);
return grantCode;
}
}
if (pCreate->numOfTags == 0) { // handle normal meter creation
pVgroup = pDb->pHead;
if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) {
mgmtDestroyMeter(pTable);
//mTrace("table:%s, vgroup in creating progress", pCreate->meterId);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
if (pDb->vgStatus == TSDB_VG_STATUS_FULL) {
mgmtDestroyMeter(pTable);
mError("table:%s, vgroup is full", pCreate->meterId);
return TSDB_CODE_NO_ENOUGH_DNODES;
}
if (pDb->vgStatus == TSDB_VG_STATUS_NO_DISK_PERMISSIONS ||
pDb->vgStatus == TSDB_VG_STATUS_SERVER_NO_PACE ||
pDb->vgStatus == TSDB_VG_STATUS_SERV_OUT_OF_MEMORY ||
pDb->vgStatus == TSDB_VG_STATUS_INIT_FAILED ) {
mgmtDestroyMeter(pTable);
mError("table:%s, vgroup init failed, reason:%d %s", pCreate->meterId, pDb->vgStatus, taosGetVgroupStatusStr(pDb->vgStatus));
return pDb->vgStatus;
}
SVgObj *pVgroup = mgmtGetAvailVgroup(pDb);
if (pVgroup == NULL) {
pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
mgmtCreateVgroup(pDb);
mgmtDestroyMeter(pTable);
mTrace("table:%s, vgroup malloced, wait for create progress finished", pCreate->meterId);
return TSDB_CODE_ACTION_IN_PROGRESS;
return terrno;
}
int sid = taosAllocateId(pVgroup->idPool);
int32_t sid = mgmtAllocateSid(pDb, pVgroup);
if (sid < 0) {
mWarn("table:%s, vgroup:%d run out of ID, num:%d", pCreate->meterId, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool));
pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
mgmtCreateVgroup(pDb);
mgmtDestroyMeter(pTable);
return TSDB_CODE_ACTION_IN_PROGRESS;
return terrno;
}
pTable->gid.sid = sid;
pTable->gid.vgId = pVgroup->vgId;
pTable->uid = (((uint64_t)pTable->gid.vgId) << 40) + ((((uint64_t)pTable->gid.sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t)sdbVersion & ((1ul << 16) - 1ul));
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
if (pCreate->numOfColumns == 0) {
return mgmtCreateChildTable(pDb, pCreate, pVgroup->vgId, sid);
} else if (pCreate->sqlLen > 0) {
return mgmtCreateStreamTable(pDb, pCreate, pVgroup->vgId, sid);
} else {
return mgmtCreateNormalTable(pDb, pCreate, pVgroup->vgId, sid);
}
} else {
pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbVersion & ((1ul << 16) - 1ul));
return mgmtCreateSuperTable(pDb, pCreate);
}
if (sdbInsertRow(meterSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->meterId);
return TSDB_CODE_SDB_ERROR;
}
// send create message to the selected vnode servers
if (pCreate->numOfTags == 0) {
mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d",
pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode);
mgmtAddTimeSeries(pTable->numOfColumns - 1);
mgmtSendCreateMsgToVgroup(pTable, pVgroup);
}
return 0;
}
int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore) {
STabObj * pTable;
SAcctObj *pAcct;
pTable = mgmtGetTable(meterId);
if (pTable == NULL) {
int mgmtDropTable(SDbObj *pDb, char *tableId, int ignore) {
STableObj table = mgmtGetTable(tableId);
if (table.obj == NULL) {
if (ignore) {
return TSDB_CODE_SUCCESS;
} else {
......@@ -749,159 +591,76 @@ int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore) {
}
}
pAcct = mgmtGetAcct(pDb->cfg.acct);
// 0.log
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
}
if (mgmtIsNormalTable(pTable)) {
return dropMeterImp(pDb, pTable, pAcct);
} else {
// remove a metric
/*
if (pTable->numOfMeters > 0) {
assert(pTable->pSkipList != NULL && pTable->pSkipList->nSize > 0);
return TSDB_CODE_RELATED_TABLES_EXIST;
}
*/
// first delet all meters of metric
dropAllMetersOfMetric(pDb, pTable, pAcct);
// finally delete metric
sdbDeleteRow(meterSdb, pTable);
return TSDB_CODE_MONITOR_DB_FORBIDDEN;
}
switch (table.type) {
case TSDB_TABLE_TYPE_SUPER_TABLE:
return mgmtDropSuperTable(pDb, table.obj);
case TSDB_TABLE_TYPE_CHILD_TABLE:
return mgmtDropChildTable(pDb, table.obj);
case TSDB_TABLE_TYPE_STREAM_TABLE:
return mgmtDropStreamTable(pDb, table.obj);
case TSDB_TABLE_TYPE_NORMAL_TABLE:
return mgmtDropNormalTable(pDb, table.obj);
default:
return TSDB_CODE_INVALID_TABLE;
}
return 0;
}
int mgmtAlterMeter(SDbObj *pDb, SAlterTableMsg *pAlter) {
STabObj *pTable;
pTable = mgmtGetTable(pAlter->meterId);
if (pTable == NULL) {
int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) {
STableObj table = mgmtGetTable(tableId);
if (table.obj == NULL) {
return TSDB_CODE_INVALID_TABLE;
}
// 0.log
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
if (!mgmtIsNormalTable(pTable) || !mgmtTableCreateFromSuperTable(pTable)) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
return TSDB_CODE_MONITOR_DB_FORBIDDEN;
}
// if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
// return mgmtUpdate
// if (!mgmtIsNormalTable(pTable) || !mgmtTableCreateFromSuperTable(pTable)) {
// return TSDB_CODE_OPS_NOT_SUPPORT;
// }
// }
// todo add
/* mgmtMeterAddTags */
if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) {
mTrace("alter table %s to add tag column:%s, type:%d", pTable->meterId, pAlter->schema[0].name,
pAlter->schema[0].type);
return mgmtMeterAddTags(pTable, pAlter->schema, 1);
if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtAddSuperTableTag(table.obj, pAlter->schema, 1);
}
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) {
mTrace("alter table %s to drop tag column:%s", pTable->meterId, pAlter->schema[0].name);
return mgmtMeterDropTagByName(pTable, pAlter->schema[0].name);
if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtDropSuperTableTag(table.obj, pAlter->schema[0].name);
}
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
mTrace("alter table %s to change tag column name, old: %s, new: %s", pTable->meterId, pAlter->schema[0].name,
pAlter->schema[1].name);
return mgmtMeterModifyTagNameByName(pTable, pAlter->schema[0].name, pAlter->schema[1].name);
if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtModifySuperTableTagNameByName(table.obj, pAlter->schema[0].name, pAlter->schema[1].name);
}
} else if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
mTrace("alter table %s to modify tag value, tag name:%s", pTable->meterId, pAlter->schema[0].name);
return mgmtMeterModifyTagValueByName(pTable, pAlter->schema[0].name, pAlter->tagVal);
if (table.type == TSDB_TABLE_TYPE_CHILD_TABLE) {
return mgmtModifyChildTableTagValueByName(table.obj, pAlter->schema[0].name, pAlter->tagVal);
}
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
mTrace("alter table %s to add column:%s, type:%d", pTable->meterId, pAlter->schema[0].name, pAlter->schema[0].type);
return mgmtMeterAddColumn(pTable, pAlter->schema, 1);
if (table.type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
return mgmtAddNormalTableColumn(table.obj, pAlter->schema, 1);
} else if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtAddSuperTableColumn(table.obj, pAlter->schema, 1);
} else {}
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
mTrace("alter table %s to drop column:%s", pTable->meterId, pAlter->schema[0].name);
return mgmtMeterDropColumnByName(pTable, pAlter->schema[0].name);
} else {
return TSDB_CODE_INVALID_MSG_TYPE;
}
return TSDB_CODE_SUCCESS;
}
static int dropMeterImp(SDbObj *pDb, STabObj * pTable, SAcctObj *pAcct) {
SVgObj * pVgroup;
if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
pVgroup = mgmtGetVgroup(pTable->gid.vgId);
if (pVgroup == NULL) return TSDB_CODE_OTHERS;
mgmtRestoreTimeSeries(pTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup);
sdbDeleteRow(meterSdb, pTable);
if (pVgroup->numOfMeters <= 0) mgmtDropVgroup(pDb, pVgroup);
return 0;
}
static void dropAllMetersOfMetric(SDbObj *pDb, STabObj * pMetric, SAcctObj *pAcct) {
STabObj * pTable = NULL;
while ((pTable = pMetric->pHead) != NULL) {
(void)dropMeterImp(pDb, pTable, pAcct);
}
}
/*
* create key of each meter for skip list, which is generated from first tag
* column
*/
static void createKeyFromTagValue(STabObj *pMetric, STabObj *pTable, tSkipListKey *pKey) {
SSchema * pTagSchema = (SSchema *)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema));
const int16_t KEY_COLUMN_OF_TAGS = 0;
if (table.type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
return mgmtDropNormalTableColumnByName(table.obj, pAlter->schema[0].name);
} else if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtDropSuperTableColumnByName(table.obj, pAlter->schema[0].name);
} else {}
} else {}
char *tagVal = pTable->pTagData + TSDB_TABLE_ID_LEN; // tag start position
*pKey = tSkipListCreateKey(pTagSchema[KEY_COLUMN_OF_TAGS].type, tagVal, pTagSchema[KEY_COLUMN_OF_TAGS].bytes);
}
/*
* add a meter into a metric's skip list
*/
static void addMeterIntoMetricIndex(STabObj *pMetric, STabObj *pTable) {
const int16_t KEY_COLUMN_OF_TAGS = 0;
SSchema * pTagSchema = (SSchema *)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema));
if (pMetric->pSkipList == NULL) {
pMetric->pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, pTagSchema[KEY_COLUMN_OF_TAGS].type,
pTagSchema[KEY_COLUMN_OF_TAGS].bytes);
}
if (pMetric->pSkipList) {
tSkipListKey key = {0};
createKeyFromTagValue(pMetric, pTable, &key);
tSkipListPut(pMetric->pSkipList, pTable, &key, 1);
tSkipListDestroyKey(&key);
}
}
static void removeMeterFromMetricIndex(STabObj *pMetric, STabObj *pTable) {
if (pMetric->pSkipList == NULL) {
return;
}
tSkipListKey key = {0};
createKeyFromTagValue(pMetric, pTable, &key);
tSkipListNode **pRes = NULL;
int32_t num = tSkipListGets(pMetric->pSkipList, &key, &pRes);
for (int32_t i = 0; i < num; ++i) {
STabObj *pOneMeter = (STabObj *)pRes[i]->pData;
if (pOneMeter->gid.sid == pTable->gid.sid && pOneMeter->gid.vgId == pTable->gid.vgId) {
assert(pTable == pOneMeter);
tSkipListRemoveNode(pMetric->pSkipList, pRes[i]);
}
}
tSkipListDestroyKey(&key);
if (num != 0) {
free(pRes);
}
return TSDB_CODE_OPS_NOT_SUPPORT;
}
int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable) {
......@@ -942,15 +701,24 @@ int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable) {
return 0;
}
void mgmtCleanUpMeters() { sdbCloseTable(meterSdb); }
void mgmtCleanUpMeters() {
mgmtCleanUpNormalTables();
mgmtCleanUpStreamTables();
mgmtCleanUpChildTables();
mgmtCleanUpSuperTables();
}
int mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pConn->pDb != NULL) {
pDb = mgmtGetDb(pConn->pDb->name);
}
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
if (pDb == NULL) {
return TSDB_CODE_DB_NOT_SELECTED;
}
SSchema *pSchema = tsGetSchema(pMeta);
......@@ -982,320 +750,43 @@ int mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
// pShow->numOfRows = sdbGetNumOfRows (meterSdb);
pShow->numOfRows = pDb->numOfTables;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
SSchema *mgmtGetTableSchema(STabObj *pTable) {
if (pTable == NULL) {
return NULL;
}
if (!mgmtTableCreateFromSuperTable(pTable)) {
return (SSchema *)pTable->schema;
}
STabObj *pMetric = mgmtGetTable(pTable->pTagData);
assert(pMetric != NULL);
return (SSchema *)pMetric->schema;
}
static int32_t mgmtSerializeTagValue(char* pMsg, STabObj* pTable, int16_t* tagsId, int32_t numOfTags) {
int32_t offset = 0;
for (int32_t j = 0; j < numOfTags; ++j) {
if (tagsId[j] == TSDB_TBNAME_COLUMN_INDEX) { // handle the table name tags
char name[TSDB_METER_NAME_LEN] = {0};
extractTableName(pTable->meterId, name);
memcpy(pMsg + offset, name, TSDB_METER_NAME_LEN);
offset += TSDB_METER_NAME_LEN;
} else {
SSchema s = {0};
char * tag = mgmtTableGetTag(pTable, tagsId[j], &s);
memcpy(pMsg + offset, tag, (size_t)s.bytes);
offset += s.bytes;
}
}
return offset;
}
/*
* serialize SVnodeSidList to byte array
*/
static char *mgmtBuildMetricMetaMsg(SConnObj *pConn, STabObj *pTable, int32_t *ovgId, SVnodeSidList **pList, SMetricMeta *pMeta,
int32_t tagLen, int16_t numOfTags, int16_t *tagsId, int32_t maxNumOfMeters,
char *pMsg) {
if (pTable->gid.vgId != *ovgId || ((*pList) != NULL && (*pList)->numOfSids >= maxNumOfMeters)) {
/*
* here we construct a new vnode group for 2 reasons
* 1. the query msg may be larger than 64k,
* 2. the following meters belong to different vnodes
*/
(*pList) = (SVnodeSidList *)pMsg;
(*pList)->numOfSids = 0;
(*pList)->index = 0;
pMeta->numOfVnodes++;
SVgObj *pVgroup = mgmtGetVgroup(pTable->gid.vgId);
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (pConn->usePublicIp) {
(*pList)->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
(*pList)->vpeerDesc[i].vnode = pVgroup->vnodeGid[i].vnode;
} else {
(*pList)->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
(*pList)->vpeerDesc[i].vnode = pVgroup->vnodeGid[i].vnode;
}
}
pMsg += sizeof(SVnodeSidList);
(*ovgId) = pTable->gid.vgId;
}
pMeta->numOfMeters++;
(*pList)->numOfSids++;
SMeterSidExtInfo *pSMeterTagInfo = (SMeterSidExtInfo *)pMsg;
pSMeterTagInfo->sid = htonl(pTable->gid.sid);
pSMeterTagInfo->uid = htobe64(pTable->uid);
pMsg += sizeof(SMeterSidExtInfo);
int32_t offset = mgmtSerializeTagValue(pMsg, pTable, tagsId, numOfTags);
assert(offset == tagLen);
pMsg += offset;
return pMsg;
}
// get total number of vnodes in final result set
static int32_t mgmtGetNumOfVnodesInResult(tQueryResultset *pResult) {
int32_t numOfVnodes = 0;
int32_t prevGid = -1;
for (int32_t i = 0; i < pResult->num; ++i) {
STabObj *pTable = pResult->pRes[i];
if (prevGid == -1) {
prevGid = pTable->gid.vgId;
numOfVnodes++;
} else if (prevGid != pTable->gid.vgId) {
prevGid = pTable->gid.vgId;
numOfVnodes++;
}
}
return numOfVnodes;
}
static int32_t mgmtGetMetricMetaMsgSize(tQueryResultset *pResult, int32_t tagLength, int32_t maxMetersPerQuery) {
int32_t numOfVnodes = mgmtGetNumOfVnodesInResult(pResult);
int32_t size = (sizeof(SMeterSidExtInfo) + tagLength) * pResult->num +
((pResult->num / maxMetersPerQuery) + 1 + numOfVnodes) * sizeof(SVnodeSidList) + sizeof(SMetricMeta) +
1024;
return size;
}
static SMetricMetaElemMsg *doConvertMetricMetaMsg(SSuperTableMetaMsg *pSuperTableMetaMsg, int32_t tableIndex) {
SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)((char *)pSuperTableMetaMsg + pSuperTableMetaMsg->metaElem[tableIndex]);
pElem->orderIndex = htons(pElem->orderIndex);
pElem->orderType = htons(pElem->orderType);
pElem->numOfTags = htons(pElem->numOfTags);
pElem->numOfGroupCols = htons(pElem->numOfGroupCols);
pElem->condLen = htonl(pElem->condLen);
pElem->cond = htonl(pElem->cond);
pElem->elemLen = htons(pElem->elemLen);
pElem->tableCond = htonl(pElem->tableCond);
pElem->tableCondLen = htonl(pElem->tableCondLen);
pElem->rel = htons(pElem->rel);
for (int32_t i = 0; i < pElem->numOfTags; ++i) {
pElem->tagCols[i] = htons(pElem->tagCols[i]);
}
pElem->groupbyTagColumnList = htonl(pElem->groupbyTagColumnList);
SColIndexEx *groupColIds = (SColIndexEx*) (((char *)pSuperTableMetaMsg) + pElem->groupbyTagColumnList);
for (int32_t i = 0; i < pElem->numOfGroupCols; ++i) {
groupColIds[i].colId = htons(groupColIds[i].colId);
groupColIds[i].colIdx = htons(groupColIds[i].colIdx);
groupColIds[i].flag = htons(groupColIds[i].flag);
groupColIds[i].colIdxInBuf = 0;
}
return pElem;
}
static int32_t mgmtBuildMetricMetaRspMsg(SConnObj *pConn, SSuperTableMetaMsg *pSuperTableMetaMsg, tQueryResultset *pResult,
char **pStart, int32_t *tagLen, int32_t rspMsgSize, int32_t maxTablePerVnode,
int32_t code) {
*pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, rspMsgSize);
if (*pStart == NULL) {
return 0;
}
char * pMsg = (*pStart);
STaosRsp *pRsp = (STaosRsp *)pMsg;
pRsp->code = code;
pMsg += sizeof(STaosRsp);
*pMsg = TSDB_IE_TYPE_META;
pMsg++;
if (code != TSDB_CODE_SUCCESS) {
return pMsg - (*pStart); // one bit in payload
}
int32_t msgLen = 0;
*(int16_t *)pMsg = htons(pSuperTableMetaMsg->numOfMeters);
pMsg += sizeof(int16_t);
for (int32_t j = 0; j < pSuperTableMetaMsg->numOfMeters; ++j) {
SVnodeSidList *pList = NULL;
int ovgId = -1;
SMetricMeta *pMeta = (SMetricMeta *)pMsg;
pMeta->numOfMeters = 0;
pMeta->numOfVnodes = 0;
pMeta->tagLen = htons((uint16_t)tagLen[j]);
pMsg = (char *)pMeta + sizeof(SMetricMeta);
SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)((char *)pSuperTableMetaMsg + pSuperTableMetaMsg->metaElem[j]);
for (int32_t i = 0; i < pResult[j].num; ++i) {
STabObj *pTable = pResult[j].pRes[i];
pMsg = mgmtBuildMetricMetaMsg(pConn, pTable, &ovgId, &pList, pMeta, tagLen[j], pElem->numOfTags, pElem->tagCols,
maxTablePerVnode, pMsg);
}
mTrace("metric:%s metric-meta tables:%d, vnode:%d", pElem->meterId, pMeta->numOfMeters, pMeta->numOfVnodes);
pMeta->numOfMeters = htonl(pMeta->numOfMeters);
pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
}
msgLen = pMsg - (*pStart);
mTrace("metric-meta msg size %d", msgLen);
return msgLen;
}
int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pSuperTableMetaMsg) {
/*
* naive method: Do not limit the maximum number of meters in each
* vnode(subquery), split the result according to vnodes
*
* todo: split the number of vnodes to make sure each vnode has the same
* number of tables to query, while not break the upper limit of number of vnode queries
*/
int32_t maxMetersPerVNodeForQuery = INT32_MAX;
int msgLen = 0;
int ret = TSDB_CODE_SUCCESS;
tQueryResultset *result = calloc(1, pSuperTableMetaMsg->numOfMeters * sizeof(tQueryResultset));
int32_t * tagLen = calloc(1, sizeof(int32_t) * pSuperTableMetaMsg->numOfMeters);
if (result == NULL || tagLen == NULL) {
tfree(result);
tfree(tagLen);
return -1;
}
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
SMetricMetaElemMsg *pElem = doConvertMetricMetaMsg(pSuperTableMetaMsg, i);
STabObj * pMetric = mgmtGetTable(pElem->meterId);
if (!mgmtIsSuperTable(pMetric)) {
ret = TSDB_CODE_NOT_SUPER_TABLE;
break;
}
tagLen[i] = mgmtGetReqTagsLength(pMetric, (int16_t *)pElem->tagCols, pElem->numOfTags);
}
#if 0
//todo: opt for join process
int64_t num = 0;
int32_t index = 0;
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg*) ((char *) pSuperTableMetaMsg + pSuperTableMetaMsg->metaElem[i]);
STabObj *pMetric = mgmtGetTable(pElem->meterId);
if (pMetric->pSkipList->nSize > num) {
index = i;
num = pMetric->pSkipList->nSize;
}
}
#endif
if (ret == TSDB_CODE_SUCCESS) {
// todo opt performance
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
ret = mgmtRetrieveMetersFromSuperTable(pSuperTableMetaMsg, i, &result[i]);
}
}
if (ret == TSDB_CODE_SUCCESS) {
ret = mgmtDoJoin(pSuperTableMetaMsg, result);
}
if (ret == TSDB_CODE_SUCCESS) {
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
mgmtReorganizeMetersInMetricMeta(pSuperTableMetaMsg, i, &result[i]);
}
}
if (ret == TSDB_CODE_SUCCESS) {
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
msgLen += mgmtGetMetricMetaMsgSize(&result[i], tagLen[i], maxMetersPerVNodeForQuery);
}
} else {
msgLen = 512;
}
msgLen = mgmtBuildMetricMetaRspMsg(pConn, pSuperTableMetaMsg, result, pStart, tagLen, msgLen, maxMetersPerVNodeForQuery, ret);
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
tQueryResultClean(&result[i]);
}
free(tagLen);
free(result);
return msgLen;
}
int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int32_t numOfRows = 0;
STabObj *pTable = NULL;
char * pWrite;
int cols = 0;
int prefixLen;
int numOfRead = 0;
int32_t cols = 0;
int32_t prefixLen;
int32_t numOfRead = 0;
char prefix[20] = {0};
int16_t numOfColumns;
char * tableId;
char * superTableId;
int64_t createdTime;
void * pNormalTableNode;
void * pChildTableNode;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pConn->pDb != NULL) {
pDb = mgmtGetDb(pConn->pDb->name);
}
if (pDb == NULL) {
return 0;
}
if (pDb == NULL) return 0;
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) {
if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 &&
strcmp(pConn->pUser->user, "monitor") != 0) {
return 0;
}
}
......@@ -1305,26 +796,46 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
prefixLen = strlen(prefix);
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char meterName[TSDB_METER_NAME_LEN] = {0};
char meterName[TSDB_METER_NAME_LEN] = {0};
while (numOfRows < rows) {
pShow->pNode = sdbFetchRow(meterSdb, pShow->pNode, (void **)&pTable);
if (pTable == NULL) break;
if (mgmtIsSuperTable(pTable)) continue;
pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) {
pShow->pNode = pNormalTableNode;
SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable;
tableId = pNormalTable->tableId;
superTableId = NULL;
createdTime = pNormalTable->createdTime;
numOfColumns = pNormalTable->numOfColumns;
} else {
pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) {
pShow->pNode = pChildTableNode;
SChildTableObj *pChildTable = (SChildTableObj *) pTable;
tableId = pChildTable->tableId;
superTableId = NULL;
createdTime = pChildTable->createdTime;
numOfColumns = pChildTable->superTable->numOfColumns;
} else {
break;
}
}
// not belong to current db
if (strncmp(pTable->meterId, prefix, prefixLen)) continue;
if (strncmp(tableId, prefix, prefixLen)) {
continue;
}
numOfRead++;
memset(meterName, 0, tListLen(meterName));
// pattern compare for meter name
extractTableName(pTable->meterId, meterName);
extractTableName(tableId, meterName);
if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, meterName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
patternMatch(pShow->payload, meterName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
continue;
}
cols = 0;
......@@ -1333,16 +844,16 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pTable->createdTime;
*(int64_t *) pWrite = createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pTable->numOfColumns;
*(int16_t *) pWrite = numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pTable->pTagData) {
extractTableName(pTable->pTagData, pWrite);
extractTableName(superTableId, pWrite);
}
cols++;
......@@ -1357,452 +868,18 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
return numOfRows;
}
int mgmtGetMetricMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_METER_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tables");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = pDb->numOfMetrics;
pShow->pNode = pDb->pMetric;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
STabObj *pMetric = NULL;
char * pWrite;
int cols = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pDb == NULL) return 0;
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) {
return 0;
}
}
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char metricName[TSDB_METER_NAME_LEN] = {0};
while (numOfRows < rows) {
pMetric = (STabObj *)pShow->pNode;
if (pMetric == NULL) break;
pShow->pNode = (void *)pMetric->next;
memset(metricName, 0, tListLen(metricName));
extractTableName(pMetric->meterId, metricName);
if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, metricName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
continue;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
extractTableName(pMetric->meterId, pWrite);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pMetric->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pMetric->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pMetric->numOfTags;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pMetric->numOfMeters;
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
int32_t mgmtFindTagCol(STabObj *pMetric, const char *tagName) {
if (!mgmtIsSuperTable(pMetric)) return -1;
SSchema *schema = NULL;
for (int i = 0; i < pMetric->numOfTags; i++) {
schema = (SSchema *)(pMetric->schema + (pMetric->numOfColumns + i) * sizeof(SSchema));
if (strcasecmp(tagName, schema->name) == 0) return i;
}
return -1;
}
int32_t mgmtMeterModifyTagNameByCol(STabObj *pMetric, uint32_t col, const char *nname) {
int rowSize = 0;
assert(col >= 0);
uint32_t len = strlen(nname);
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric)) || col >= pMetric->numOfTags || len >= TSDB_COL_NAME_LEN ||
mgmtFindTagCol(pMetric, nname) >= 0)
return TSDB_CODE_APP_ERROR;
// update
SSchema *schema = (SSchema *)(pMetric->schema + (pMetric->numOfColumns + col) * sizeof(SSchema));
strncpy(schema->name, nname, TSDB_COL_NAME_LEN);
// Encode string
int size = 1 + sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW;
char *msg = (char *)malloc(size);
if (msg == NULL) return TSDB_CODE_APP_ERROR;
memset(msg, 0, size);
mgmtMeterActionEncode(pMetric, msg, size, &rowSize);
int32_t ret = sdbUpdateRow(meterSdb, msg, rowSize, 1);
tfree(msg);
if (ret < 0) {
mError("Failed to modify table %s tag column", pMetric->meterId);
return TSDB_CODE_APP_ERROR;
SSchema *mgmtGetTableSchema(STabObj *pTable) {
if (pTable == NULL) {
return NULL;
}
mTrace("Succeed to modify table %s tag column", pMetric->meterId);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtMeterModifyTagNameByName(STabObj *pMetric, const char *oname, const char *nname) {
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric))) return TSDB_CODE_APP_ERROR;
int index = mgmtFindTagCol(pMetric, oname);
if (index < 0) {
// Tag name does not exist
mError("Failed to modify table %s tag column, oname: %s, nname: %s", pMetric->meterId, oname, nname);
return TSDB_CODE_INVALID_MSG_TYPE;
if (!mgmtTableCreateFromSuperTable(pTable)) {
return (SSchema *)pTable->schema;
}
return mgmtMeterModifyTagNameByCol(pMetric, index, nname);
}
int32_t mgmtMeterModifyTagValueByCol(STabObj *pTable, int col, const char *nContent) {
int rowSize = 0;
if (pTable == NULL || nContent == NULL || (!mgmtTableCreateFromSuperTable(pTable))) return TSDB_CODE_APP_ERROR;
STabObj *pMetric = mgmtGetTable(pTable->pTagData);
assert(pMetric != NULL);
if (col < 0 || col > pMetric->numOfTags) return TSDB_CODE_APP_ERROR;
SSchema *schema = (SSchema *)(pMetric->schema + (pMetric->numOfColumns + col) * sizeof(SSchema));
if (col == 0) {
pTable->isDirty = 1;
removeMeterFromMetricIndex(pMetric, pTable);
}
memcpy(pTable->pTagData + mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN, nContent, schema->bytes);
if (col == 0) {
addMeterIntoMetricIndex(pMetric, pTable);
}
// Encode the string
int size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1;
char *msg = (char *)malloc(size);
if (msg == NULL) {
mError("failed to allocate message memory while modify tag value");
return TSDB_CODE_APP_ERROR;
}
memset(msg, 0, size);
mgmtMeterActionEncode(pTable, msg, size, &rowSize);
int32_t ret = sdbUpdateRow(meterSdb, msg, rowSize, 1); // Need callback function
tfree(msg);
if (pTable->isDirty) pTable->isDirty = 0;
if (ret < 0) {
mError("Failed to modify tag column %d of table %s", col, pTable->meterId);
return TSDB_CODE_APP_ERROR;
}
mTrace("Succeed to modify tag column %d of table %s", col, pTable->meterId);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtMeterModifyTagValueByName(STabObj *pTable, char *tagName, char *nContent) {
if (pTable == NULL || tagName == NULL || nContent == NULL || (!mgmtTableCreateFromSuperTable(pTable)))
return TSDB_CODE_INVALID_MSG_TYPE;
STabObj *pMetric = mgmtGetTable(pTable->pTagData);
if (pMetric == NULL) return TSDB_CODE_APP_ERROR;
int col = mgmtFindTagCol(pMetric, tagName);
if (col < 0) return TSDB_CODE_APP_ERROR;
return mgmtMeterModifyTagValueByCol(pTable, col, nContent);
}
int32_t mgmtMeterAddTags(STabObj *pMetric, SSchema schema[], int ntags) {
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric))) return TSDB_CODE_INVALID_TABLE;
if (pMetric->numOfTags + ntags > TSDB_MAX_TAGS) return TSDB_CODE_APP_ERROR;
// check if schemas have the same name
for (int i = 1; i < ntags; i++) {
for (int j = 0; j < i; j++) {
if (strcasecmp(schema[i].name, schema[j].name) == 0) {
return TSDB_CODE_APP_ERROR;
}
}
}
for (int i = 0; i < ntags; i++) {
if (mgmtFindTagCol(pMetric, schema[i].name) >= 0) {
return TSDB_CODE_APP_ERROR;
}
}
uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SSchema) * ntags;
SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *)malloc(size);
memset(msg, 0, size);
memcpy(msg->meterId, pMetric->meterId, TSDB_TABLE_ID_LEN);
msg->type = SDB_TYPE_INSERT;
msg->cols = ntags;
memcpy(msg->data, schema, sizeof(SSchema) * ntags);
int32_t ret = sdbBatchUpdateRow(meterSdb, msg, size);
tfree(msg);
if (ret < 0) {
mError("Failed to add tag column %s to table %s", schema[0].name, pMetric->meterId);
return TSDB_CODE_APP_ERROR;
}
mTrace("Succeed to add tag column %s to table %s", schema[0].name, pMetric->meterId);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtMeterDropTagByCol(STabObj *pMetric, int col) {
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric)) || col <= 0 || col >= pMetric->numOfTags) return TSDB_CODE_APP_ERROR;
// Pack message to do batch update
uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SchemaUnit);
SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *)malloc(size);
memset(msg, 0, size);
memcpy(msg->meterId, pMetric->meterId, TSDB_TABLE_ID_LEN);
msg->type = SDB_TYPE_DELETE; // TODO: what should here be ?
msg->cols = 1;
((SchemaUnit *)(msg->data))->col = col;
((SchemaUnit *)(msg->data))->pos = mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN;
((SchemaUnit *)(msg->data))->schema = *(SSchema *)(pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + col));
int32_t ret = sdbBatchUpdateRow(meterSdb, msg, size);
tfree(msg);
if (ret < 0) {
mError("Failed to drop tag column: %d from table: %s", col, pMetric->meterId);
return TSDB_CODE_APP_ERROR;
}
mTrace("Succeed to drop tag column: %d from table: %s", col, pMetric->meterId);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtMeterDropTagByName(STabObj *pMetric, char *name) {
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric))) {
mTrace("Failed to drop tag name: %s from table: %s", name, pMetric->meterId);
return TSDB_CODE_INVALID_TABLE;
}
int col = mgmtFindTagCol(pMetric, name);
return mgmtMeterDropTagByCol(pMetric, col);
}
int32_t mgmtFindColumnIndex(STabObj *pTable, const char *colName) {
STabObj *pMetric = NULL;
SSchema *schema = NULL;
if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE || pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) {
schema = (SSchema *)pTable->schema;
for (int32_t i = 0; i < pTable->numOfColumns; i++) {
if (strcasecmp(schema[i].name, colName) == 0) {
return i;
}
}
} else if (pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) {
pMetric = mgmtGetTable(pTable->pTagData);
if (pMetric == NULL) {
mError("MTable not belongs to any metric, meter: %s", pTable->meterId);
return -1;
}
schema = (SSchema *)pMetric->schema;
for (int32_t i = 0; i < pMetric->numOfColumns; i++) {
if (strcasecmp(schema[i].name, colName) == 0) {
return i;
}
}
}
return -1;
}
int32_t mgmtMeterAddColumn(STabObj *pTable, SSchema schema[], int ncols) {
SAcctObj *pAcct = NULL;
SDbObj * pDb = NULL;
if (pTable == NULL || pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE || pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE || ncols <= 0)
return TSDB_CODE_APP_ERROR;
// ASSUMPTION: no two tags are the same
for (int i = 0; i < ncols; i++)
if (mgmtFindColumnIndex(pTable, schema[i].name) > 0) return TSDB_CODE_APP_ERROR;
pDb = mgmtGetDbByMeterId(pTable->meterId);
if (pDb == NULL) {
mError("meter: %s not belongs to any database", pTable->meterId);
return TSDB_CODE_APP_ERROR;
}
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
pTable->schema = realloc(pTable->schema, pTable->schemaSize + sizeof(SSchema) * ncols);
if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE) {
memcpy(pTable->schema + pTable->schemaSize, schema, sizeof(SSchema) * ncols);
} else if (pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) {
memmove(pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + ncols),
pTable->schema + sizeof(SSchema) * pTable->numOfColumns, sizeof(SSchema) * pTable->numOfTags);
memcpy(pTable->schema + sizeof(SSchema) * pTable->numOfColumns, schema, sizeof(SSchema) * ncols);
}
SSchema *tschema = (SSchema *)(pTable->schema + sizeof(SSchema) * pTable->numOfColumns);
for (int i = 0; i < ncols; i++) tschema[i].colId = pTable->nextColId++;
pTable->schemaSize += sizeof(SSchema) * ncols;
pTable->numOfColumns += ncols;
pTable->sversion++;
if (mgmtIsNormalTable(pTable))
pAcct->acctInfo.numOfTimeSeries += ncols;
else
pAcct->acctInfo.numOfTimeSeries += (ncols * pTable->numOfMeters);
sdbUpdateRow(meterSdb, pTable, 0, 1);
if (pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) {
for (STabObj *pObj = pTable->pHead; pObj != NULL; pObj = pObj->next) {
pObj->numOfColumns++;
pObj->nextColId = pTable->nextColId;
pObj->sversion = pTable->sversion;
sdbUpdateRow(meterSdb, pObj, 0, 1);
}
}
return TSDB_CODE_SUCCESS;
return (SSchema *)pMetric->schema;
}
int32_t mgmtMeterDropColumnByName(STabObj *pTable, const char *name) {
SAcctObj *pAcct = NULL;
SDbObj * pDb = NULL;
if (pTable == NULL || pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE || pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE)
return TSDB_CODE_APP_ERROR;
int32_t index = mgmtFindColumnIndex(pTable, name);
if (index < 0) return TSDB_CODE_APP_ERROR;
pDb = mgmtGetDbByMeterId(pTable->meterId);
if (pDb == NULL) {
mError("meter: %s not belongs to any database", pTable->meterId);
return TSDB_CODE_APP_ERROR;
}
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE) {
memmove(pTable->schema + sizeof(SSchema) * index, pTable->schema + sizeof(SSchema) * (index + 1),
sizeof(SSchema) * (pTable->numOfColumns - index - 1));
} else if (pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) {
memmove(pTable->schema + sizeof(SSchema) * index, pTable->schema + sizeof(SSchema) * (index + 1),
sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags - index - 1));
}
pTable->schemaSize -= sizeof(SSchema);
pTable->numOfColumns--;
if (mgmtIsNormalTable(pTable))
pAcct->acctInfo.numOfTimeSeries--;
else
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfMeters);
pTable->schema = realloc(pTable->schema, pTable->schemaSize);
pTable->sversion++;
sdbUpdateRow(meterSdb, pTable, 0, 1);
if (pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) {
for (STabObj *pObj = pTable->pHead; pObj != NULL; pObj = pObj->next) {
pObj->numOfColumns--;
pObj->sversion = pTable->sversion;
sdbUpdateRow(meterSdb, pObj, 0, 1);
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -26,6 +26,7 @@
#include "tschemautil.h"
#include "tlog.h"
#include "tstatus.h"
#include "taoserror.h"
void * vgSdb = NULL;
int tsVgUpdateSize;
......@@ -128,6 +129,54 @@ int mgmtInitVgroups() {
SVgObj *mgmtGetVgroup(int vgId) { return (SVgObj *)sdbGetRow(vgSdb, &vgId); }
SVgObj *mgmtGetAvailVgroup(SDbObj *pDb) {
SVgObj *pVgroup = pDb->pHead;
if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) {
terrno = TSDB_CODE_ACTION_IN_PROGRESS;
return NULL;
}
if (pDb->vgStatus == TSDB_VG_STATUS_FULL) {
mError("db:%s, vgroup is full", pDb->name);
terrno = TSDB_CODE_NO_ENOUGH_DNODES;
return NULL;
}
if (pDb->vgStatus == TSDB_VG_STATUS_NO_DISK_PERMISSIONS ||
pDb->vgStatus == TSDB_VG_STATUS_SERVER_NO_PACE ||
pDb->vgStatus == TSDB_VG_STATUS_SERV_OUT_OF_MEMORY ||
pDb->vgStatus == TSDB_VG_STATUS_INIT_FAILED ) {
mError("db:%s, vgroup init failed, reason:%d %s", pDb->name, pDb->vgStatus, taosGetVgroupStatusStr(pDb->vgStatus));
terrno = pDb->vgStatus;
return NULL;
}
if (pVgroup == NULL) {
pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
mgmtCreateVgroup(pDb);
mTrace("db:%s, vgroup malloced, wait for create progress finished", pDb->name);
terrno = TSDB_CODE_ACTION_IN_PROGRESS;
return NULL;
}
terrno = 0;
return pVgroup;
}
int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool));
pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
mgmtCreateVgroup(pDb);
terrno = TSDB_CODE_ACTION_IN_PROGRESS;
}
terrno = 0;
return sid;
}
void mgmtProcessVgTimer(void *handle, void *tmrId) {
SDbObj *pDb = (SDbObj *)handle;
if (pDb == NULL) return;
......@@ -178,7 +227,7 @@ int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
for (int i = 0; i < pDb->cfg.maxSessions; ++i) {
if (pVgroup->meterList != NULL) {
pTable = pVgroup->meterList[i];
if (pTable) mgmtDropMeter(pDb, pTable->meterId, 0);
if (pTable) mgmtDropTable(pDb, pTable->meterId, 0);
}
}
}
......
......@@ -144,4 +144,5 @@ int sdbRetrieveRows(int fd, SSdbTable *pTable, uint64_t version);
void sdbResetTable(SSdbTable *pTable);
extern const int16_t sdbFileVersion;
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册