提交 06db5c54 编写于 作者: S slguan

super table sdb

上级 95f748ca
...@@ -150,17 +150,13 @@ typedef struct SSuperTableObj { ...@@ -150,17 +150,13 @@ typedef struct SSuperTableObj {
int32_t vgId; int32_t vgId;
int64_t createdTime; int64_t createdTime;
int32_t sversion; int32_t sversion;
int32_t numOfTags; int32_t numOfTables;
int32_t numOfMeters;
int32_t numOfColumns; int32_t numOfColumns;
int32_t schemaSize; int32_t numOfTags;
int8_t reserved[7]; int8_t reserved[7];
int8_t updateEnd[1]; int8_t updateEnd[1];
pthread_rwlock_t rwLock;
int16_t nextColId; int16_t nextColId;
SSchema *schema;
int8_t *schema;
} SSuperTableObj; } SSuperTableObj;
typedef struct { typedef struct {
...@@ -244,7 +240,6 @@ typedef struct _db_obj { ...@@ -244,7 +240,6 @@ typedef struct _db_obj {
char reserved[16]; char reserved[16];
char updateEnd[1]; char updateEnd[1];
STabObj * pMetric;
struct _db_obj *prev, *next; struct _db_obj *prev, *next;
SVgObj * pHead; // empty vgroup first SVgObj * pHead; // empty vgroup first
SVgObj * pTail; // empty vgroup end SVgObj * pTail; // empty vgroup end
......
...@@ -31,7 +31,7 @@ void mgmtCleanUpSuperTables(); ...@@ -31,7 +31,7 @@ void mgmtCleanUpSuperTables();
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate); int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate);
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable);
SSuperTableObj* mgmtGetSuperTable(char *tableId); SSuperTableObj* mgmtGetSuperTable(char *tableId);
int32_t mgmtFindTagCol(SSuperTableObj *pTable, const char *tagName); int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags);
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName);
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName); int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName);
...@@ -40,9 +40,6 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *col ...@@ -40,9 +40,6 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *col
SSchema* mgmtGetSuperTableSchema(SSuperTableObj *pTable); SSchema* mgmtGetSuperTableSchema(SSuperTableObj *pTable);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -36,10 +36,9 @@ int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); ...@@ -36,10 +36,9 @@ int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
void mgmtCleanUpMeters(); void mgmtCleanUpMeters();
SSchema *mgmtGetTableSchema(STabObj *pTable); // get schema for a meter
int32_t mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable); int32_t mgmtAddMeterIntoMetric(SSuperTableObj *pMetric, SChildTableObj *pTable);
int32_t mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable); int32_t mgmtRemoveMeterFromMetric(SSuperTableObj *pMetric, SChildTableObj *pTable);
int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
......
...@@ -323,7 +323,7 @@ SChildTableObj* mgmtGetChildTable(char *tableId) { ...@@ -323,7 +323,7 @@ SChildTableObj* mgmtGetChildTable(char *tableId) {
} }
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) { int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) {
// int col = mgmtFindTagCol(pTable->superTable, tagName); // int col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName);
// if (col < 0 || col > pTable->superTable->numOfTags) { // if (col < 0 || col > pTable->superTable->numOfTags) {
// return TSDB_CODE_APP_ERROR; // return TSDB_CODE_APP_ERROR;
// } // }
......
...@@ -86,7 +86,6 @@ int32_t mgmtInitDbs() { ...@@ -86,7 +86,6 @@ int32_t mgmtInitDbs() {
pDb->numOfMetrics = 0; pDb->numOfMetrics = 0;
pDb->vgStatus = TSDB_VG_STATUS_READY; pDb->vgStatus = TSDB_VG_STATUS_READY;
pDb->vgTimer = NULL; pDb->vgTimer = NULL;
pDb->pMetric = NULL;
pAcct = mgmtGetAcct(pDb->cfg.acct); pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct != NULL) if (pAcct != NULL)
mgmtAddDbIntoAcct(pAcct, pDb); mgmtAddDbIntoAcct(pAcct, pDb);
...@@ -265,12 +264,12 @@ bool mgmtCheckDropDbFinished(SDbObj *pDb) { ...@@ -265,12 +264,12 @@ bool mgmtCheckDropDbFinished(SDbObj *pDb) {
void mgmtDropDbFromSdb(SDbObj *pDb) { void mgmtDropDbFromSdb(SDbObj *pDb) {
while (pDb->pHead) mgmtDropVgroup(pDb, pDb->pHead); while (pDb->pHead) mgmtDropVgroup(pDb, pDb->pHead);
STabObj *pMetric = pDb->pMetric; // SSuperTableObj *pMetric = pDb->pSTable;
while (pMetric) { // while (pMetric) {
STabObj *pNext = pMetric->next; // SSuperTableObj *pNext = pMetric->next;
mgmtDropTable(pDb, pMetric->meterId, 0); // mgmtDropTable(pDb, pMetric->meterId, 0);
pMetric = pNext; // pMetric = pNext;
} // }
mPrint("db:%s all meters drop finished", pDb->name); mPrint("db:%s all meters drop finished", pDb->name);
sdbDeleteRow(tsDbSdb, pDb); sdbDeleteRow(tsDbSdb, pDb);
...@@ -734,7 +733,6 @@ void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { ...@@ -734,7 +733,6 @@ void *mgmtDbActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
pDb->numOfVgroups = 0; pDb->numOfVgroups = 0;
pDb->numOfTables = 0; pDb->numOfTables = 0;
pDb->vgTimer = NULL; pDb->vgTimer = NULL;
pDb->pMetric = NULL;
mgmtAddDbIntoAcct(pAcct, pDb); mgmtAddDbIntoAcct(pAcct, pDb);
return NULL; return NULL;
......
...@@ -15,15 +15,6 @@ ...@@ -15,15 +15,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtGrant.h"
#include "mgmtUtil.h"
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtVgroup.h"
#include "mgmtTable.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tast.h" #include "tast.h"
#include "textbuffer.h" #include "textbuffer.h"
...@@ -33,45 +24,28 @@ ...@@ -33,45 +24,28 @@
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "ttime.h" #include "ttime.h"
#include "tstatus.h" #include "tstatus.h"
#include "sdb.h"
#include "mgmtSuperTable.h"
#include "mgmtChildTable.h"
#include "tutil.h" #include "tutil.h"
#include "mnode.h"
#include "mgmtAcct.h"
typedef struct { #include "mgmtChildTable.h"
char meterId[TSDB_TABLE_ID_LEN + 1]; #include "mgmtDb.h"
char type; #include "mgmtDnodeInt.h"
uint32_t cols; #include "mgmtGrant.h"
char data[]; #include "mgmtSuperTable.h"
} SMeterBatchUpdateMsg; #include "mgmtTable.h"
#include "mgmtUtil.h"
typedef struct { #include "mgmtVgroup.h"
int32_t col;
int32_t pos;
SSchema schema;
} SchemaUnit;
typedef struct {
char meterId[TSDB_TABLE_ID_LEN + 1];
char action;
int32_t dataSize;
char data[];
} SMeterUpdateMsg;
void *tsSuperTableSdb; void *tsSuperTableSdb;
void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionInsert(void *row, char *str, int size, int *ssize); void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionDelete(void *row, char *str, int size, int *ssize); void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionUpdate(void *row, char *str, int size, int *ssize); void *mgmtSuperTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionEncode(void *row, char *str, int size, int *ssize); void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize); void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionReset(void *row, char *str, int size, int *ssize); void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionDestroy(void *row, char *str, int size, int *ssize); void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static void mgmtDestroySuperTable(SSuperTableObj *pTable) { static void mgmtDestroySuperTable(SSuperTableObj *pTable) {
free(pTable->schema); free(pTable->schema);
...@@ -88,51 +62,56 @@ static void mgmtSuperTableActionInit() { ...@@ -88,51 +62,56 @@ static void mgmtSuperTableActionInit() {
mgmtSuperTableActionFp[SDB_TYPE_DESTROY] = mgmtSuperTableActionDestroy; mgmtSuperTableActionFp[SDB_TYPE_DESTROY] = mgmtSuperTableActionDestroy;
} }
void *mgmtSuperTableActionReset(void *row, char *str, int size, int *ssize) { void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row; SSuperTableObj *pTable = (SSuperTableObj *) row;
int tsize = pTable->updateEnd - (int8_t *) pTable; int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize); memcpy(pTable, str, tsize);
pTable->schema = realloc(pTable->schema, pTable->schemaSize);
memcpy(pTable->schema, str + tsize, pTable->schemaSize); int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
pTable->schema = realloc(pTable->schema, schemaSize);
memcpy(pTable->schema, str + tsize, schemaSize);
return NULL; return NULL;
} }
void *mgmtSuperTableActionDestroy(void *row, char *str, int size, int *ssize) { void *mgmtSuperTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row; SSuperTableObj *pTable = (SSuperTableObj *) row;
mgmtDestroySuperTable(pTable); mgmtDestroySuperTable(pTable);
return NULL; return NULL;
} }
void *mgmtSuperTableActionInsert(void *row, char *str, int size, int *ssize) { void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
return NULL; return NULL;
} }
void *mgmtSuperTableActionDelete(void *row, char *str, int size, int *ssize) { void *mgmtSuperTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
return NULL; return NULL;
} }
void *mgmtSuperTableActionUpdate(void *row, char *str, int size, int *ssize) { void *mgmtSuperTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
return mgmtSuperTableActionReset(row, str, size, NULL); return mgmtSuperTableActionReset(row, str, size, NULL);
} }
void *mgmtSuperTableActionEncode(void *row, char *str, int size, int *ssize) { void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row; SSuperTableObj *pTable = (SSuperTableObj *) row;
assert(row != NULL && str != NULL); assert(row != NULL && str != NULL);
int tsize = pTable->updateEnd - (int8_t *) pTable; int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
if (size < tsize + pTable->schemaSize + 1) { int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
if (size < tsize + schemaSize + 1) {
*ssize = -1; *ssize = -1;
return NULL; return NULL;
} }
memcpy(str, pTable, tsize); memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, pTable->schemaSize); memcpy(str + tsize, pTable->schema, schemaSize);
*ssize = tsize + pTable->schemaSize; *ssize = tsize + schemaSize;
return NULL; return NULL;
} }
void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize) { void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
assert(str != NULL); assert(str != NULL);
SSuperTableObj *pTable = (SSuperTableObj *)malloc(sizeof(SSuperTableObj)); SSuperTableObj *pTable = (SSuperTableObj *)malloc(sizeof(SSuperTableObj));
...@@ -141,24 +120,25 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize) { ...@@ -141,24 +120,25 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize) {
} }
memset(pTable, 0, sizeof(STabObj)); memset(pTable, 0, sizeof(STabObj));
int tsize = pTable->updateEnd - (int8_t *)pTable; int32_t tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) { if (size < tsize) {
mgmtDestroySuperTable(pTable); mgmtDestroySuperTable(pTable);
return NULL; return NULL;
} }
memcpy(pTable, str, tsize); memcpy(pTable, str, tsize);
pTable->schema = malloc(pTable->schemaSize); int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
pTable->schema = malloc(schemaSize);
if (pTable->schema == NULL) { if (pTable->schema == NULL) {
mgmtDestroySuperTable(pTable); mgmtDestroySuperTable(pTable);
return NULL; return NULL;
} }
memcpy(pTable->schema, str + tsize, pTable->schemaSize); memcpy(pTable->schema, str + tsize, schemaSize);
return (void *)pTable; return (void *)pTable;
} }
void *mgmtSuperTableAction(char action, void *row, char *str, int size, int *ssize) { void *mgmtSuperTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
if (mgmtSuperTableActionFp[(uint8_t)action] != NULL) { if (mgmtSuperTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtSuperTableActionFp[(uint8_t)action]))(row, str, size, ssize); return (*(mgmtSuperTableActionFp[(uint8_t)action]))(row, str, size, ssize);
} }
...@@ -170,13 +150,12 @@ int32_t mgmtInitSuperTables() { ...@@ -170,13 +150,12 @@ int32_t mgmtInitSuperTables() {
void * pLastNode = NULL; void * pLastNode = NULL;
SSuperTableObj * pTable = NULL; SSuperTableObj * pTable = NULL;
// TODO: Make sure this function only run once
mgmtSuperTableActionInit(); mgmtSuperTableActionInit();
tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN, tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN,
"meters", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction); "stable", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction);
if (tsSuperTableSdb == NULL) { if (tsSuperTableSdb == NULL) {
mError("failed to init meter data"); mError("failed to init super table data");
return -1; return -1;
} }
...@@ -194,7 +173,7 @@ int32_t mgmtInitSuperTables() { ...@@ -194,7 +173,7 @@ int32_t mgmtInitSuperTables() {
pNode = pLastNode; pNode = pLastNode;
continue; continue;
} }
pTable->numOfMeters = 0; pTable->numOfTables = 0;
} }
mgmtSetVgroupIdPool(); mgmtSetVgroupIdPool();
...@@ -207,44 +186,44 @@ void mgmtCleanUpSuperTables() { ...@@ -207,44 +186,44 @@ void mgmtCleanUpSuperTables() {
} }
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) { int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
int numOfTables = sdbGetNumOfRows(tsSuperTableSdb); int32_t numOfTables = sdbGetNumOfRows(tsSuperTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) { if (numOfTables >= TSDB_MAX_TABLES) {
mError("super table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); mError("super table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES);
return TSDB_CODE_TOO_MANY_TABLES; return TSDB_CODE_TOO_MANY_TABLES;
} }
SSuperTableObj *pMetric = (SSuperTableObj *)calloc(sizeof(SSuperTableObj), 1); SSuperTableObj *pStable = (SSuperTableObj *)calloc(sizeof(SSuperTableObj), 1);
if (pMetric == NULL) { if (pStable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
strcpy(pMetric->tableId, pCreate->meterId); strcpy(pStable->tableId, pCreate->meterId);
pMetric->createdTime = taosGetTimestampMs(); pStable->createdTime = taosGetTimestampMs();
pMetric->vgId = 0; pStable->vgId = 0;
pMetric->sid = 0; pStable->sid = 0;
pMetric->uid = (((uint64_t)pMetric->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul)); pStable->uid = (((uint64_t)pStable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul));
pMetric->sversion = 0; pStable->sversion = 0;
pMetric->numOfColumns = pCreate->numOfColumns; pStable->numOfColumns = pCreate->numOfColumns;
pMetric->numOfTags = pCreate->numOfTags; pStable->numOfTags = pCreate->numOfTags;
pMetric->numOfMeters = 0; pStable->numOfTables = 0;
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
pMetric->schemaSize = numOfCols * sizeof(SSchema); int32_t schemaSize = numOfCols * sizeof(SSchema);
pMetric->schema = (int8_t *)calloc(1, pMetric->schemaSize); pStable->schema = (SSchema *)calloc(1, schemaSize);
if (pMetric->schema == NULL) { if (pStable->schema == NULL) {
free(pMetric); free(pStable);
mError("table:%s, no schema input", pCreate->meterId); mError("table:%s, no schema input", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE; return TSDB_CODE_INVALID_TABLE;
} }
memcpy(pMetric->schema, pCreate->schema, numOfCols * sizeof(SSchema)); memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
pMetric->nextColId = 0; pStable->nextColId = 0;
for (int col = 0; col < pCreate->numOfColumns; col++) { for (int32_t col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *)pMetric->schema; SSchema *tschema = (SSchema *)pStable->schema;
tschema[col].colId = pMetric->nextColId++; tschema[col].colId = pStable->nextColId++;
} }
if (sdbInsertRow(tsSuperTableSdb, pMetric, 0) < 0) { if (sdbInsertRow(tsSuperTableSdb, pStable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->meterId); mError("table:%s, update sdb error", pCreate->meterId);
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
...@@ -261,9 +240,9 @@ SSuperTableObj* mgmtGetSuperTable(char *tableId) { ...@@ -261,9 +240,9 @@ SSuperTableObj* mgmtGetSuperTable(char *tableId) {
return (SSuperTableObj *)sdbGetRow(tsSuperTableSdb, tableId); return (SSuperTableObj *)sdbGetRow(tsSuperTableSdb, tableId);
} }
int32_t mgmtFindTagCol(SSuperTableObj *pMetric, const char *tagName) { int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) {
for (int i = 0; i < pMetric->numOfTags; i++) { for (int32_t i = 0; i < pStable->numOfTags; i++) {
SSchema *schema = (SSchema *)(pMetric->schema + (pMetric->numOfColumns + i) * sizeof(SSchema)); SSchema *schema = (SSchema *)(pStable->schema + (pStable->numOfColumns + i) * sizeof(SSchema));
if (strcasecmp(tagName, schema->name) == 0) { if (strcasecmp(tagName, schema->name) == 0) {
return i; return i;
} }
...@@ -272,143 +251,128 @@ int32_t mgmtFindTagCol(SSuperTableObj *pMetric, const char *tagName) { ...@@ -272,143 +251,128 @@ int32_t mgmtFindTagCol(SSuperTableObj *pMetric, const char *tagName) {
return -1; return -1;
} }
int32_t mgmtAddSuperTableTag(SSuperTableObj *pMetric, SSchema schema[], int32_t ntags) { int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) {
if (pMetric->numOfTags + ntags > TSDB_MAX_TAGS) { if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
// check if schemas have the same name // check if schemas have the same name
for (int i = 1; i < ntags; i++) { for (int32_t i = 1; i < ntags; i++) {
for (int j = 0; j < i; j++) { for (int32_t j = 0; j < i; j++) {
if (strcasecmp(schema[i].name, schema[j].name) == 0) { if (strcasecmp(schema[i].name, schema[j].name) == 0) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
} }
} }
for (int i = 0; i < ntags; i++) { SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId);
if (mgmtFindTagCol(pMetric, schema[i].name) >= 0) { if (pDb == NULL) {
return TSDB_CODE_APP_ERROR; mError("meter: %s not belongs to any database", pStable->tableId);
} return TSDB_CODE_APP_ERROR;
} }
uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SSchema) * ntags; SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *) malloc(size); if (pAcct == NULL) {
memset(msg, 0, size); mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
memcpy(msg->meterId, pMetric->tableId, TSDB_TABLE_ID_LEN); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
msg->type = SDB_TYPE_INSERT; pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ntags);
msg->cols = ntags;
memcpy(msg->data, schema, sizeof(SSchema) * ntags);
int32_t ret = sdbBatchUpdateRow(tsSuperTableSdb, msg, size); memmove(pStable->schema + sizeof(SSchema) * (pStable->numOfColumns + ntags),
tfree(msg); pStable->schema + sizeof(SSchema) * pStable->numOfColumns, sizeof(SSchema) * pStable->numOfTags);
// memcpy(pStable->schema + sizeof(SSchema) * pStable->numOfColumns, schema, sizeof(SSchema) * ntags);
// if (msg->type == SDB_TYPE_INSERT) { // Insert schema
// uint32_t total_cols = pTable->numOfColumns + pTable->numOfTags;
// pTable->schema = realloc(pTable->schema, (total_cols + msg->cols) * sizeof(SSchema));
// pTable->schemaSize = (total_cols + msg->cols) * sizeof(SSchema);
// pTable->numOfTags += msg->cols;
// memcpy(pTable->schema + total_cols * sizeof(SSchema), msg->data, msg->cols * sizeof(SSchema));
//
// }
if (ret < 0) { SSchema *tschema = (SSchema *) (pStable->schema + sizeof(SSchema) * pStable->numOfColumns);
mError("Failed to add tag column %s to table %s", schema[0].name, pMetric->tableId); for (int32_t i = 0; i < ntags; i++) {
return TSDB_CODE_APP_ERROR; tschema[i].colId = pStable->nextColId++;
} }
mTrace("Succeed to add tag column %s to table %s", schema[0].name, pMetric->tableId); pStable->numOfColumns += ntags;
pStable->sversion++;
pAcct->acctInfo.numOfTimeSeries += (ntags * pStable->numOfTables);
sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1);
mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->tableId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtDropSuperTableTag(SSuperTableObj *pMetric, char *tagName) { int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
int col = mgmtFindTagCol(pMetric, tagName); int32_t col = mgmtFindSuperTableTagIndex(pStable, tagName);
if (col <= 0 || col >= pMetric->numOfTags) { if (col <= 0 || col >= pStable->numOfTags) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
// Pack message to do batch update SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId);
uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SchemaUnit); if (pDb == NULL) {
SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *) malloc(size); mError("table: %s not belongs to any database", pStable->tableId);
memset(msg, 0, size); return TSDB_CODE_APP_ERROR;
}
memcpy(msg->meterId, pMetric->tableId, TSDB_TABLE_ID_LEN);
msg->type = SDB_TYPE_DELETE;
msg->cols = 1;
//
// // Make sure the order of tag columns
// SchemaUnit *schemaUnit = (SchemaUnit *)(msg->data);
// int col = schemaUnit->col;
// assert(col > 0 && col < pTable->numOfTags);
// if (col < pTable->numOfTags - 1) {
// memmove(pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col),
// pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col + 1),
// pTable->schemaSize - (sizeof(SSchema) * (pTable->numOfColumns + col + 1)));
// }
// pTable->schemaSize -= sizeof(SSchema);
// pTable->numOfTags--;
// pTable->schema = realloc(pTable->schema, pTable->schemaSize);
//
((SchemaUnit *) (msg->data))->col = col;
((SchemaUnit *) (msg->data))->pos = mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN;
((SchemaUnit *) (msg->data))->schema = *(SSchema *) (pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + col));
int32_t ret = sdbBatchUpdateRow(tsSuperTableSdb, msg, size);
tfree(msg);
if (ret < 0) { SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mError("Failed to drop tag column: %d from table: %s", col, pMetric->tableId); if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
mTrace("Succeed to drop tag column: %d from table: %s", col, pMetric->tableId); memmove(pStable->schema + sizeof(SSchema) * col, pStable->schema + sizeof(SSchema) * (col + 1),
sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1));
pStable->numOfTags--;
pStable->sversion++;
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize);
sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pMetric, char *oldTagName, char *newTagName) { int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) {
int col = mgmtFindTagCol(pMetric, oldTagName); int32_t col = mgmtFindSuperTableTagIndex(pStable, oldTagName);
if (col < 0) { if (col < 0) {
// Tag name does not exist // Tag name does not exist
mError("Failed to modify table %s tag column, oname: %s, nname: %s", pMetric->tableId, oldTagName, newTagName); mError("Failed to modify table %s tag column, oname: %s, nname: %s", pStable->tableId, oldTagName, newTagName);
return TSDB_CODE_INVALID_MSG_TYPE; return TSDB_CODE_INVALID_MSG_TYPE;
} }
int rowSize = 0; int32_t rowSize = 0;
uint32_t len = strlen(newTagName); uint32_t len = strlen(newTagName);
if (col >= pMetric->numOfTags || len >= TSDB_COL_NAME_LEN || mgmtFindTagCol(pMetric, newTagName) >= 0) { if (col >= pStable->numOfTags || len >= TSDB_COL_NAME_LEN || mgmtFindSuperTableTagIndex(pStable, newTagName) >= 0) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
// update // update
SSchema *schema = (SSchema *) (pMetric->schema + (pMetric->numOfColumns + col) * sizeof(SSchema)); SSchema *schema = (SSchema *) (pStable->schema + (pStable->numOfColumns + col) * sizeof(SSchema));
strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN); strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN);
// Encode string // Encode string
int size = 1 + sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW; int32_t size = 1 + sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW;
char *msg = (char *) malloc(size); char *msg = (char *) malloc(size);
if (msg == NULL) return TSDB_CODE_APP_ERROR; if (msg == NULL) return TSDB_CODE_APP_ERROR;
memset(msg, 0, size); memset(msg, 0, size);
mgmtSuperTableActionEncode(pMetric, msg, size, &rowSize); mgmtSuperTableActionEncode(pStable, msg, size, &rowSize);
int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, rowSize, 1); int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, rowSize, 1);
tfree(msg); tfree(msg);
if (ret < 0) { if (ret < 0) {
mError("Failed to modify table %s tag column", pMetric->tableId); mError("Failed to modify table %s tag column", pStable->tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
mTrace("Succeed to modify table %s tag column", pMetric->tableId); mTrace("Succeed to modify table %s tag column", pStable->tableId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) {
static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pMetric, char *colName) { SSchema *schema = (SSchema *) pStable->schema;
SSchema *schema = (SSchema *) pMetric->schema; for (int32_t i = 0; i < pStable->numOfColumns; i++) {
for (int32_t i = 0; i < pMetric->numOfColumns; i++) {
if (strcasecmp(schema[i].name, colName) == 0) { if (strcasecmp(schema[i].name, colName) == 0) {
return i; return i;
} }
...@@ -417,20 +381,20 @@ static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pMetric, char *colN ...@@ -417,20 +381,20 @@ static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pMetric, char *colN
return -1; return -1;
} }
int32_t mgmtAddSuperTableColumn(SSuperTableObj *pMetric, SSchema schema[], int ncols) { int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32_t ncols) {
if (ncols <= 0) { if (ncols <= 0) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
for (int i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
if (mgmtFindSuperTableColumnIndex(pMetric, schema[i].name) > 0) { if (mgmtFindSuperTableColumnIndex(pStable, schema[i].name) > 0) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
} }
SDbObj *pDb = mgmtGetDbByTableId(pMetric->tableId); SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId);
if (pDb == NULL) { if (pDb == NULL) {
mError("meter: %s not belongs to any database", pMetric->tableId); mError("meter: %s not belongs to any database", pStable->tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -440,36 +404,36 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pMetric, SSchema schema[], int n ...@@ -440,36 +404,36 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pMetric, SSchema schema[], int n
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
pMetric->schema = realloc(pMetric->schema, pMetric->schemaSize + sizeof(SSchema) * ncols); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pStable->schema = realloc(pStable->schema, schemaSize + sizeof(SSchema) * ncols);
memmove(pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + ncols), memmove(pStable->schema + sizeof(SSchema) * (pStable->numOfColumns + ncols),
pMetric->schema + sizeof(SSchema) * pMetric->numOfColumns, sizeof(SSchema) * pMetric->numOfTags); pStable->schema + sizeof(SSchema) * pStable->numOfColumns, sizeof(SSchema) * pStable->numOfTags);
memcpy(pMetric->schema + sizeof(SSchema) * pMetric->numOfColumns, schema, sizeof(SSchema) * ncols); memcpy(pStable->schema + sizeof(SSchema) * pStable->numOfColumns, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *) (pMetric->schema + sizeof(SSchema) * pMetric->numOfColumns); SSchema *tschema = (SSchema *) (pStable->schema + sizeof(SSchema) * pStable->numOfColumns);
for (int i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
tschema[i].colId = pMetric->nextColId++; tschema[i].colId = pStable->nextColId++;
} }
pMetric->schemaSize += sizeof(SSchema) * ncols; pStable->numOfColumns += ncols;
pMetric->numOfColumns += ncols; pStable->sversion++;
pMetric->sversion++;
pAcct->acctInfo.numOfTimeSeries += (ncols * pMetric->numOfMeters); pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables);
sdbUpdateRow(tsSuperTableSdb, pMetric, 0, 1); sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pMetric, char *colName) { int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) {
int32_t col = mgmtFindSuperTableColumnIndex(pMetric, colName); int32_t col = mgmtFindSuperTableColumnIndex(pStable, colName);
if (col < 0) { if (col < 0) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
SDbObj *pDb = mgmtGetDbByTableId(pMetric->tableId); SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId);
if (pDb == NULL) { if (pDb == NULL) {
mError("table: %s not belongs to any database", pMetric->tableId); mError("table: %s not belongs to any database", pStable->tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -479,23 +443,23 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pMetric, char *colName) { ...@@ -479,23 +443,23 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pMetric, char *colName) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
memmove(pMetric->schema + sizeof(SSchema) * col, pMetric->schema + sizeof(SSchema) * (col + 1), memmove(pStable->schema + sizeof(SSchema) * col, pStable->schema + sizeof(SSchema) * (col + 1),
sizeof(SSchema) * (pMetric->numOfColumns + pMetric->numOfTags - col - 1)); sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags - col - 1));
pStable->numOfColumns--;
pStable->sversion++;
pMetric->schemaSize -= sizeof(SSchema); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns);
pMetric->numOfColumns--; pStable->schema = realloc(pStable->schema, schemaSize);
pMetric->schema = realloc(pMetric->schema, pMetric->schemaSize);
pMetric->sversion++;
pAcct->acctInfo.numOfTimeSeries -= (pMetric->numOfMeters); pAcct->acctInfo.numOfTimeSeries -= (pStable->numOfTables);
sdbUpdateRow(tsSuperTableSdb, pMetric, 0, 1); sdbUpdateRow(tsSuperTableSdb, pStable, 0, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { int32_t cols = 0;
int cols = 0;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
...@@ -538,7 +502,7 @@ int mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { ...@@ -538,7 +502,7 @@ int mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
pShow->offset[0] = 0; pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = pDb->numOfMetrics; pShow->numOfRows = pDb->numOfMetrics;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
...@@ -546,10 +510,10 @@ int mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { ...@@ -546,10 +510,10 @@ int mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int numOfRows = 0; int32_t numOfRows = 0;
char * pWrite; char * pWrite;
int cols = 0; int32_t cols = 0;
SSuperTableObj *pTable = NULL; SSuperTableObj *pTable = NULL;
char prefix[20] = {0}; char prefix[20] = {0};
int32_t prefixLen; int32_t prefixLen;
...@@ -611,7 +575,7 @@ int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pCo ...@@ -611,7 +575,7 @@ int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pCo
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pTable->numOfMeters; *(int32_t *)pWrite = pTable->numOfTables;
cols++; cols++;
numOfRows++; numOfRows++;
...@@ -621,37 +585,14 @@ int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pCo ...@@ -621,37 +585,14 @@ int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pCo
return numOfRows; return numOfRows;
} }
int32_t mgmtAddMeterIntoMetric(SSuperTableObj *pStable, SChildTableObj *pTable) {
int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable) { if (pTable != NULL && pStable != NULL) return -1;
if (pTable == NULL || pMetric == NULL) return -1; pStable->numOfTables++;
pthread_rwlock_wrlock(&(pMetric->rwLock));
// add meter into skip list
pTable->next = pMetric->pHead;
pTable->prev = NULL;
if (pMetric->pHead) pMetric->pHead->prev = pTable;
pMetric->pHead = pTable;
pMetric->numOfMeters++;
pthread_rwlock_unlock(&(pMetric->rwLock));
return 0; return 0;
} }
int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable) { int32_t mgmtRemoveMeterFromMetric(SSuperTableObj *pStable, SChildTableObj *pTable) {
pthread_rwlock_wrlock(&(pMetric->rwLock)); if (pTable != NULL && pStable != NULL) return -1;
pStable->numOfTables--;
if (pTable->prev) pTable->prev->next = pTable->next;
if (pTable->next) pTable->next->prev = pTable->prev;
if (pTable->prev == NULL) pMetric->pHead = pTable->next;
pMetric->numOfMeters--;
pthread_rwlock_unlock(&(pMetric->rwLock));
return 0; return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册