提交 649ef3d3 编写于 作者: S slguan

#1177

上级 5e054576
...@@ -151,12 +151,13 @@ typedef struct SSuperTableObj { ...@@ -151,12 +151,13 @@ typedef struct SSuperTableObj {
int32_t schemaSize; int32_t schemaSize;
int8_t reserved[7]; int8_t reserved[7];
int8_t updateEnd[1]; int8_t updateEnd[1];
int16_t nextColId;
pthread_rwlock_t rwLock; pthread_rwlock_t rwLock;
tSkipList * pSkipList;
struct SSuperTableObj *pHead;
struct SSuperTableObj *prev, *next; struct SSuperTableObj *prev, *next;
int8_t* schema; int16_t nextColId;
int8_t *schema;
} SSuperTableObj; } SSuperTableObj;
typedef struct { typedef struct {
...@@ -180,8 +181,8 @@ typedef struct { ...@@ -180,8 +181,8 @@ typedef struct {
int32_t sversion; int32_t sversion;
int32_t numOfColumns; int32_t numOfColumns;
int32_t schemaSize; int32_t schemaSize;
char reserved[3]; int8_t reserved[3];
char updateEnd[1]; int8_t updateEnd[1];
int16_t nextColId; int16_t nextColId;
char* schema; char* schema;
} SNormalTableObj; } SNormalTableObj;
...@@ -196,8 +197,8 @@ typedef struct { ...@@ -196,8 +197,8 @@ typedef struct {
int32_t numOfColumns; int32_t numOfColumns;
int32_t schemaSize; int32_t schemaSize;
int16_t sqlLen; int16_t sqlLen;
char reserved[3]; int8_t reserved[3];
char updateEnd[1]; int8_t updateEnd[1];
int16_t nextColId; int16_t nextColId;
char* pSql; //null-terminated string char* pSql; //null-terminated string
char* schema; char* schema;
...@@ -216,8 +217,8 @@ typedef struct _vg_obj { ...@@ -216,8 +217,8 @@ typedef struct _vg_obj {
int32_t lbIp; int32_t lbIp;
int32_t lbTime; int32_t lbTime;
int8_t lbStatus; int8_t lbStatus;
char reserved[16]; int8_t reserved[16];
char updateEnd[1]; int8_t updateEnd[1];
struct _vg_obj *prev, *next; struct _vg_obj *prev, *next;
void * idPool; void * idPool;
STabObj ** meterList; STabObj ** meterList;
......
...@@ -45,6 +45,10 @@ extern void (*mgmtCleanUpAccts)(); ...@@ -45,6 +45,10 @@ extern void (*mgmtCleanUpAccts)();
extern int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); extern int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
extern int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); extern int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
void mgmtAddMeterStatisticToAcct(SAcctObj *pAcct, int numOfColumns);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -184,3 +184,9 @@ int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) ...@@ -184,3 +184,9 @@ int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn)
int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { return 0; } int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { return 0; }
int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int rows, SConnObj *pConn) = mgmtRetrieveAcctsImp; int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int rows, SConnObj *pConn) = mgmtRetrieveAcctsImp;
void mgmtAddMeterStatisticToAcct(SAcctObj *pAcct, int numOfColumns) {
pAcct->acctInfo.numOfTimeSeries += (numOfColumns - 1);
}
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnodeInt.h" #include "mgmtDnodeInt.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
#include "mgmtSupertableQuery.h"
#include "mgmtTable.h" #include "mgmtTable.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tast.h" #include "tast.h"
...@@ -38,9 +37,167 @@ ...@@ -38,9 +37,167 @@
#include "sdb.h" #include "sdb.h"
#include "mgmtChildTable.h" #include "mgmtChildTable.h"
#include "mgmtSuperTable.h" #include "mgmtChildTable.h"
void *tsChildTableSdb;
void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionReset(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionDestroy(void *row, char *str, int size, int *ssize);
static void mgmtDestroyChildTable(SChildTableObj *pTable) {
free(pTable);
}
static void mgmtChildTableActionInit() {
mgmtChildTableActionFp[SDB_TYPE_INSERT] = mgmtChildTableActionInsert;
mgmtChildTableActionFp[SDB_TYPE_DELETE] = mgmtChildTableActionDelete;
mgmtChildTableActionFp[SDB_TYPE_UPDATE] = mgmtChildTableActionUpdate;
mgmtChildTableActionFp[SDB_TYPE_ENCODE] = mgmtChildTableActionEncode;
mgmtChildTableActionFp[SDB_TYPE_DECODE] = mgmtChildTableActionDecode;
mgmtChildTableActionFp[SDB_TYPE_RESET] = mgmtChildTableActionReset;
mgmtChildTableActionFp[SDB_TYPE_DESTROY] = mgmtChildTableActionDestroy;
}
void *mgmtChildTableActionReset(void *row, char *str, int size, int *ssize) {
return NULL;
}
void *mgmtChildTableActionDestroy(void *row, char *str, int size, int *ssize) {
SChildTableObj *pTable = (SChildTableObj *)row;
mgmtDestroyChildTable(pTable);
return NULL;
}
void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize) {
SChildTableObj *pTable = (SChildTableObj *) row;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
return NULL;
}
if (!sdbMaster) {
int sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->sid);
return NULL;
}
}
mgmtAddMeterIntoMetric(pTable->superTableId, pTable);
pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
pVgroup->numOfMeters++;
pDb->numOfTables++;
pVgroup->meterList[pTable->sid] = pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
}
return NULL;
}
void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize) {
SChildTableObj *pTable = (SChildTableObj *) row;
if (pTable->vgId == 0) {
return NULL;
}
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
return NULL;
}
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
pVgroup->meterList[pTable->sid] = NULL;
pVgroup->numOfMeters--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid);
mgmtRemoveMeterFromMetric(pTable->superTable, pTable);
if (pVgroup->numOfMeters > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
}
return NULL;
}
void *mgmtChildTableActionUpdate(void *row, char *str, int size, int *ssize) {
return mgmtChildTableActionReset(row, str, size, NULL);
}
void *mgmtChildTableActionEncode(void *row, char *str, int size, int *ssize) {
SChildTableObj *pTable = (SChildTableObj *) row;
assert(row != NULL && str != NULL);
int tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(str, pTable, tsize);
return NULL;
}
void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize) {
assert(str != NULL);
SChildTableObj *pTable = (SChildTableObj *)malloc(sizeof(SChildTableObj));
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(STabObj));
int tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
mgmtDestroyChildTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
return (void *)pTable;
}
void *mgmtChildTableAction(char action, void *row, char *str, int size, int *ssize) {
if (mgmtChildTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtChildTableActionFp[(uint8_t)action]))(row, str, size, ssize);
}
return NULL;
}
int32_t mgmtInitChildTables() { int32_t mgmtInitChildTables() {
return 0; return 0;
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnodeInt.h" #include "mgmtDnodeInt.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
#include "mgmtSupertableQuery.h"
#include "mgmtTable.h" #include "mgmtTable.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tast.h" #include "tast.h"
...@@ -39,6 +38,178 @@ ...@@ -39,6 +38,178 @@
#include "sdb.h" #include "sdb.h"
#include "mgmtNormalTable.h" #include "mgmtNormalTable.h"
void *tsSuperTableSdb;
void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionReset(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionDestroy(void *row, char *str, int size, int *ssize);
static void mgmtDestroyNormalTable(SNormalTableObj *pTable) {
free(pTable->schema);
free(pTable);
}
static void mgmtNormalTableActionInit() {
mgmtNormalTableActionFp[SDB_TYPE_INSERT] = mgmtNormalTableActionInsert;
mgmtNormalTableActionFp[SDB_TYPE_DELETE] = mgmtNormalTableActionDelete;
mgmtNormalTableActionFp[SDB_TYPE_UPDATE] = mgmtNormalTableActionUpdate;
mgmtNormalTableActionFp[SDB_TYPE_ENCODE] = mgmtNormalTableActionEncode;
mgmtNormalTableActionFp[SDB_TYPE_DECODE] = mgmtNormalTableActionDecode;
mgmtNormalTableActionFp[SDB_TYPE_RESET] = mgmtNormalTableActionReset;
mgmtNormalTableActionFp[SDB_TYPE_DESTROY] = mgmtNormalTableActionDestroy;
}
void *mgmtNormalTableActionReset(void *row, char *str, int size, int *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
int tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize);
return NULL;
}
void *mgmtNormalTableActionDestroy(void *row, char *str, int size, int *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *)row;
mgmtDestroyNormalTable(pTable);
return NULL;
}
void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
return NULL;
}
if (!sdbMaster) {
int sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->sid);
return NULL;
}
}
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
pVgroup->numOfMeters++;
pDb->numOfTables++;
pVgroup->meterList[pTable->sid] = pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
}
return NULL;
}
void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
if (pTable->vgId == 0) {
return NULL;
}
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
return NULL;
}
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
pVgroup->meterList[pTable->sid] = NULL;
pVgroup->numOfMeters--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid);
if (pVgroup->numOfMeters > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
}
return NULL;
}
void *mgmtNormalTableActionUpdate(void *row, char *str, int size, int *ssize) {
return mgmtNormalTableActionReset(row, str, size, NULL);
}
void *mgmtNormalTableActionEncode(void *row, char *str, int size, int *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
assert(row != NULL && str != NULL);
int tsize = pTable->updateEnd - (int8_t *) pTable;
if (size < tsize + pTable->schemaSize + 1) {
*ssize = -1;
return NULL;
}
memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, pTable->schemaSize);
*ssize = tsize + pTable->schemaSize;
return NULL;
}
void *mgmtNormalTableActionDecode(void *row, char *str, int size, int *ssize) {
assert(str != NULL);
SNormalTableObj *pTable = (SNormalTableObj *)malloc(sizeof(SNormalTableObj));
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(STabObj));
int tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
mgmtDestroyNormalTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
pTable->schema = (char *)malloc(pTable->schemaSize);
if (pTable->schema == NULL) {
mgmtDestroyNormalTable(pTable);
return NULL;
}
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
return (void *)pTable;
}
void *mgmtNormalTableAction(char action, void *row, char *str, int size, int *ssize) {
if (mgmtNormalTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtNormalTableActionFp[(uint8_t)action]))(row, str, size, ssize);
}
return NULL;
}
int32_t mgmtInitNormalTables() { int32_t mgmtInitNormalTables() {
return 0; return 0;
} }
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnodeInt.h" #include "mgmtDnodeInt.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
#include "mgmtStreamtableQuery.h"
#include "mgmtTable.h" #include "mgmtTable.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tast.h" #include "tast.h"
...@@ -39,6 +38,190 @@ ...@@ -39,6 +38,190 @@
#include "sdb.h" #include "sdb.h"
#include "mgmtStreamTable.h" #include "mgmtStreamTable.h"
void *tsSuperTableSdb;
void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize);
static void mgmtDestroyStreamTable(SStreamTableObj *pTable) {
free(pTable->schema);
free(pTable->pSql);
free(pTable);
}
static void mgmtStreamTableActionInit() {
mgmtStreamTableActionFp[SDB_TYPE_INSERT] = mgmtStreamTableActionInsert;
mgmtStreamTableActionFp[SDB_TYPE_DELETE] = mgmtStreamTableActionDelete;
mgmtStreamTableActionFp[SDB_TYPE_UPDATE] = mgmtStreamTableActionUpdate;
mgmtStreamTableActionFp[SDB_TYPE_ENCODE] = mgmtStreamTableActionEncode;
mgmtStreamTableActionFp[SDB_TYPE_DECODE] = mgmtStreamTableActionDecode;
mgmtStreamTableActionFp[SDB_TYPE_RESET] = mgmtStreamTableActionReset;
mgmtStreamTableActionFp[SDB_TYPE_DESTROY] = mgmtStreamTableActionDestroy;
}
void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize) {
SStreamTableObj *pTable = (SStreamTableObj *) row;
int tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize);
pTable->schema = (char *) realloc(pTable->schema, pTable->schemaSize);
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
pTable->pSql = (char *) realloc(pTable->pSql, pTable->sqlLen);
memcpy(pTable->pSql, str + tsize + pTable->schemaSize, pTable->sqlLen);
return NULL;
}
void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (STabObj *)row;
mgmtDestroyStreamTable(pTable);
return NULL;
}
void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
return NULL;
}
if (!sdbMaster) {
int sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->sid);
return NULL;
}
}
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
pVgroup->numOfMeters++;
pDb->numOfTables++;
pVgroup->meterList[pTable->sid] = pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
}
return NULL;
}
void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
if (pTable->vgId == 0) {
return NULL;
}
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return NULL;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
return NULL;
}
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
pVgroup->meterList[pTable->sid] = NULL;
pVgroup->numOfMeters--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid);
if (pVgroup->numOfMeters > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
}
return NULL;
}
void *mgmtStreamTableActionUpdate(void *row, char *str, int size, int *ssize) {
return mgmtStreamTableActionReset(row, str, size, NULL);
}
void *mgmtStreamTableActionEncode(void *row, char *str, int size, int *ssize) {
SStreamTableObj *pTable = (SStreamTableObj *) row;
assert(row != NULL && str != NULL);
int tsize = pTable->updateEnd - (int8_t *) pTable;
if (size < tsize + pTable->schemaSize + pTable->sqlLen + 1) {
*ssize = -1;
return NULL;
}
memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, pTable->schemaSize);
memcpy(str + tsize + pTable->schemaSize, pTable->pSql, pTable->sqlLen);
*ssize = tsize + pTable->schemaSize + pTable->sqlLen;
return NULL;
}
void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize) {
assert(str != NULL);
SStreamTableObj *pTable = (SStreamTableObj *)malloc(sizeof(SNormalTableObj));
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(STabObj));
int tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
mgmtDestroyStreamTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
pTable->schema = (char *)malloc(pTable->schemaSize);
if (pTable->schema == NULL) {
mgmtDestroyStreamTable(pTable);
return NULL;
}
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
pTable->pSql = (char *)malloc(pTable->sqlLen);
if (pTable->pSql == NULL) {
mgmtDestroyStreamTable(pTable);
return NULL;
}
memcpy(pTable->pSql, str + tsize + pTable->schemaSize, pTable->sqlLen);
return (void *)pTable;
}
void *mgmtStreamTableAction(char action, void *row, char *str, int size, int *ssize) {
if (mgmtStreamTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtStreamTableActionFp[(uint8_t)action]))(row, str, size, ssize);
}
return NULL;
}
int32_t mgmtInitStreamTables() { int32_t mgmtInitStreamTables() {
return 0; return 0;
} }
......
...@@ -40,13 +40,7 @@ ...@@ -40,13 +40,7 @@
#include "mgmtSuperTable.h" #include "mgmtSuperTable.h"
#include "mgmtChildTable.h" #include "mgmtChildTable.h"
#include "tutil.h"
#define mgmtDestroyMeter(pMetric) \
do { \
tfree(pMetric->schema); \
pMetric->pSkipList = tSkipListDestroy((pMetric)->pSkipList); \
tfree(pMetric); \
} while (0)
typedef struct { typedef struct {
...@@ -69,7 +63,211 @@ typedef struct { ...@@ -69,7 +63,211 @@ typedef struct {
char data[]; char data[];
} SMeterUpdateMsg; } SMeterUpdateMsg;
void *tsSuperTableSdb;
void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *mgmtSuperTableActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtSuperTableActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtSuperTableActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtSuperTableActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtSuperTableActionReset(void *row, char *str, int size, int *ssize);
void *mgmtSuperTableActionDestroy(void *row, char *str, int size, int *ssize);
static void mgmtDestroySuperTable(SSuperTableObj *pTable) {
free(pTable->schema);
free(pTable);
}
static void mgmtSuperTableActionInit() {
mgmtSuperTableActionFp[SDB_TYPE_INSERT] = mgmtSuperTableActionInsert;
mgmtSuperTableActionFp[SDB_TYPE_DELETE] = mgmtSuperTableActionDelete;
mgmtSuperTableActionFp[SDB_TYPE_UPDATE] = mgmtSuperTableActionUpdate;
mgmtSuperTableActionFp[SDB_TYPE_ENCODE] = mgmtSuperTableActionEncode;
mgmtSuperTableActionFp[SDB_TYPE_DECODE] = mgmtSuperTableActionDecode;
mgmtSuperTableActionFp[SDB_TYPE_RESET] = mgmtSuperTableActionReset;
mgmtSuperTableActionFp[SDB_TYPE_DESTROY] = mgmtSuperTableActionDestroy;
}
void *mgmtSuperTableActionReset(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row;
int tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize);
pTable->schema = (char *) realloc(pTable->schema, pTable->schemaSize);
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
return NULL;
}
void *mgmtSuperTableActionDestroy(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row;
mgmtDestroySuperTable(pTable);
return NULL;
}
void *mgmtSuperTableActionInsert(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row;
SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
if (pDb) {
mgmtAddMetricIntoDb(pDb, pTable);
}
return NULL;
}
void *mgmtSuperTableActionDelete(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row;
SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
if (pDb) {
mgmtRemoveMetricFromDb(pDb, pTable);
}
return NULL;
}
void *mgmtSuperTableActionUpdate(void *row, char *str, int size, int *ssize) {
return mgmtSuperTableActionReset(row, str, size, NULL);
}
void *mgmtSuperTableActionEncode(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row;
assert(row != NULL && str != NULL);
int tsize = pTable->updateEnd - (int8_t *) pTable;
if (size < tsize + pTable->schemaSize + 1) {
*ssize = -1;
return NULL;
}
memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, pTable->schemaSize);
*ssize = tsize + pTable->schemaSize;
return NULL;
}
void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize) {
assert(str != NULL);
SSuperTableObj *pTable = (SSuperTableObj *)malloc(sizeof(SSuperTableObj));
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(STabObj));
int tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
mgmtDestroySuperTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
pTable->schema = (char *)malloc(pTable->schemaSize);
if (pTable->schema == NULL) {
mgmtDestroySuperTable(pTable);
return NULL;
}
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
return (void *)pTable;
}
void *mgmtSuperTableAction(char action, void *row, char *str, int size, int *ssize) {
if (mgmtSuperTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtSuperTableActionFp[(uint8_t)action]))(row, str, size, ssize);
}
return NULL;
}
int32_t mgmtInitSuperTables() { int32_t mgmtInitSuperTables() {
void * pNode = NULL;
void * pLastNode = NULL;
SVgObj * pVgroup = NULL;
STabObj * pTable = NULL;
STabObj * pMetric = NULL;
SDbObj * pDb = NULL;
SAcctObj *pAcct = NULL;
// TODO: Make sure this function only run once
mgmtSuperTableActionInit();
tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN,
"meters", SDB_KEYTYPE_STRING, mgmtDirectory, mgmtSuperTableAction);
if (meterSdb == NULL) {
mError("failed to init meter data");
return -1;
}
pNode = NULL;
while (1) {
pNode = sdbFetchRow(meterSdb, pNode, (void **)&pTable);
if (pTable == NULL) break;
if (mgmtIsSuperTable(pTable)) pTable->numOfMeters = 0;
}
pNode = NULL;
while (1) {
pLastNode = pNode;
pNode = sdbFetchRow(meterSdb, pNode, (void **)&pTable);
if (pTable == NULL) break;
pDb = mgmtGetDbByMeterId(pTable->meterId);
if (pDb == NULL) {
mError("meter:%s, failed to get db, discard it", pTable->meterId, pTable->gid.vgId, pTable->gid.sid);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
if (mgmtIsNormalTable(pTable)) {
pVgroup = mgmtGetVgroup(pTable->gid.vgId);
if (pVgroup == NULL) {
mError("meter:%s, failed to get vgroup:%d sid:%d, discard it", pTable->meterId, pTable->gid.vgId, pTable->gid.sid);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
if (strcmp(pVgroup->dbName, pDb->name) != 0) {
mError("meter:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->meterId, pDb->name, pTable->gid.vgId, pVgroup->dbName, pTable->gid.sid);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
if ( pVgroup->meterList == NULL) {
mError("meter:%s, vgroup:%d meterlist is null", pTable->meterId, pTable->gid.vgId);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
pVgroup->meterList[pTable->gid.sid] = pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->gid.sid, 1);
if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) {
pTable->pSql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns;
}
if (mgmtTableCreateFromSuperTable(pTable)) {
pTable->pTagData = (char *)pTable->schema; // + sizeof(SSchema)*pTable->numOfColumns;
pMetric = mgmtGetTable(pTable->pTagData);
if (pMetric) mgmtAddMeterIntoMetric(pMetric, pTable);
}
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct) mgmtAddMeterStatisticToAcct(pTable, pAcct);
} else {
if (pDb) mgmtAddMetricIntoDb(pDb, pTable);
}
}
mgmtSetVgroupIdPool();
mTrace("meter is initialized");
return 0; return 0;
} }
...@@ -176,6 +374,15 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pMetric, SSchema schema[], int32_t ...@@ -176,6 +374,15 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pMetric, SSchema schema[], int32_t
int32_t ret = sdbBatchUpdateRow(tsSuperTableSdb, msg, size); int32_t ret = sdbBatchUpdateRow(tsSuperTableSdb, msg, size);
tfree(msg); tfree(msg);
//
// if (msg->type == SDB_TYPE_INSERT) { // Insert schema
// uint32_t total_cols = pTable->numOfColumns + pTable->numOfTags;
// pTable->schema = realloc(pTable->schema, (total_cols + msg->cols) * sizeof(SSchema));
// pTable->schemaSize = (total_cols + msg->cols) * sizeof(SSchema);
// pTable->numOfTags += msg->cols;
// memcpy(pTable->schema + total_cols * sizeof(SSchema), msg->data, msg->cols * sizeof(SSchema));
//
// }
if (ret < 0) { if (ret < 0) {
mError("Failed to add tag column %s to table %s", schema[0].name, pMetric->tableId); mError("Failed to add tag column %s to table %s", schema[0].name, pMetric->tableId);
...@@ -200,7 +407,20 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pMetric, char *tagName) { ...@@ -200,7 +407,20 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pMetric, char *tagName) {
memcpy(msg->meterId, pMetric->tableId, TSDB_TABLE_ID_LEN); memcpy(msg->meterId, pMetric->tableId, TSDB_TABLE_ID_LEN);
msg->type = SDB_TYPE_DELETE; msg->type = SDB_TYPE_DELETE;
msg->cols = 1; msg->cols = 1;
//
// // Make sure the order of tag columns
// SchemaUnit *schemaUnit = (SchemaUnit *)(msg->data);
// int col = schemaUnit->col;
// assert(col > 0 && col < pTable->numOfTags);
// if (col < pTable->numOfTags - 1) {
// memmove(pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col),
// pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col + 1),
// pTable->schemaSize - (sizeof(SSchema) * (pTable->numOfColumns + col + 1)));
// }
// pTable->schemaSize -= sizeof(SSchema);
// pTable->numOfTags--;
// pTable->schema = realloc(pTable->schema, pTable->schemaSize);
//
((SchemaUnit *) (msg->data))->col = col; ((SchemaUnit *) (msg->data))->col = col;
((SchemaUnit *) (msg->data))->pos = mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN; ((SchemaUnit *) (msg->data))->pos = mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN;
((SchemaUnit *) (msg->data))->schema = *(SSchema *) (pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + col)); ((SchemaUnit *) (msg->data))->schema = *(SSchema *) (pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + col));
...@@ -242,7 +462,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pMetric, char *oldTagN ...@@ -242,7 +462,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pMetric, char *oldTagN
if (msg == NULL) return TSDB_CODE_APP_ERROR; if (msg == NULL) return TSDB_CODE_APP_ERROR;
memset(msg, 0, size); memset(msg, 0, size);
mgmtMeterActionEncode(pMetric, msg, size, &rowSize); mgmtSuperTableActionEncode(pMetric, msg, size, &rowSize);
int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, rowSize, 1); int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, rowSize, 1);
tfree(msg); tfree(msg);
...@@ -471,3 +691,42 @@ int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pCo ...@@ -471,3 +691,42 @@ int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pCo
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable) {
if (pTable == NULL || pMetric == NULL) return -1;
pthread_rwlock_wrlock(&(pMetric->rwLock));
// add meter into skip list
pTable->next = pMetric->pHead;
pTable->prev = NULL;
if (pMetric->pHead) pMetric->pHead->prev = pTable;
pMetric->pHead = pTable;
pMetric->numOfMeters++;
addMeterIntoMetricIndex(pMetric, pTable);
pthread_rwlock_unlock(&(pMetric->rwLock));
return 0;
}
int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable) {
pthread_rwlock_wrlock(&(pMetric->rwLock));
if (pTable->prev) pTable->prev->next = pTable->next;
if (pTable->next) pTable->next->prev = pTable->prev;
if (pTable->prev == NULL) pMetric->pHead = pTable->next;
pMetric->numOfMeters--;
removeMeterFromMetricIndex(pMetric, pTable);
pthread_rwlock_unlock(&(pMetric->rwLock));
return 0;
}
\ No newline at end of file
...@@ -48,17 +48,6 @@ extern int64_t sdbVersion; ...@@ -48,17 +48,6 @@ extern int64_t sdbVersion;
void *meterSdb = NULL; void *meterSdb = NULL;
void *(*mgmtMeterActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); void *(*mgmtMeterActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
// Function declaration
void *mgmtMeterActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtMeterActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtMeterActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtMeterActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtMeterActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtMeterActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtMeterActionBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtMeterActionAfterBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtMeterActionReset(void *row, char *str, int size, int *ssize);
void *mgmtMeterActionDestroy(void *row, char *str, int size, int *ssize);
int32_t mgmtMeterAddTags(STabObj *pMetric, SSchema schema[], int ntags); int32_t mgmtMeterAddTags(STabObj *pMetric, SSchema schema[], int ntags);
static void removeMeterFromMetricIndex(STabObj *pMetric, STabObj *pTable); static void removeMeterFromMetricIndex(STabObj *pMetric, STabObj *pTable);
static void addMeterIntoMetricIndex(STabObj *pMetric, STabObj *pTable); static void addMeterIntoMetricIndex(STabObj *pMetric, STabObj *pTable);
...@@ -70,19 +59,6 @@ int32_t mgmtMeterDropColumnByName(STabObj *pTable, const char *name); ...@@ -70,19 +59,6 @@ int32_t mgmtMeterDropColumnByName(STabObj *pTable, const char *name);
static int dropMeterImp(SDbObj *pDb, STabObj * pTable, SAcctObj *pAcct); static int dropMeterImp(SDbObj *pDb, STabObj * pTable, SAcctObj *pAcct);
static void dropAllMetersOfMetric(SDbObj *pDb, STabObj * pMetric, SAcctObj *pAcct); static void dropAllMetersOfMetric(SDbObj *pDb, STabObj * pMetric, SAcctObj *pAcct);
static void mgmtMeterActionInit() {
mgmtMeterActionFp[SDB_TYPE_INSERT] = mgmtMeterActionInsert;
mgmtMeterActionFp[SDB_TYPE_DELETE] = mgmtMeterActionDelete;
mgmtMeterActionFp[SDB_TYPE_UPDATE] = mgmtMeterActionUpdate;
mgmtMeterActionFp[SDB_TYPE_ENCODE] = mgmtMeterActionEncode;
mgmtMeterActionFp[SDB_TYPE_DECODE] = mgmtMeterActionDecode;
mgmtMeterActionFp[SDB_TYPE_BEFORE_BATCH_UPDATE] = mgmtMeterActionBeforeBatchUpdate;
mgmtMeterActionFp[SDB_TYPE_BATCH_UPDATE] = mgmtMeterActionBatchUpdate;
mgmtMeterActionFp[SDB_TYPE_AFTER_BATCH_UPDATE] = mgmtMeterActionAfterBatchUpdate;
mgmtMeterActionFp[SDB_TYPE_RESET] = mgmtMeterActionReset;
mgmtMeterActionFp[SDB_TYPE_DESTROY] = mgmtMeterActionDestroy;
}
static int32_t mgmtGetReqTagsLength(STabObj *pMetric, int16_t *cols, int32_t numOfCols) { static int32_t mgmtGetReqTagsLength(STabObj *pMetric, int16_t *cols, int32_t numOfCols) {
assert(mgmtIsSuperTable(pMetric) && numOfCols >= 0 && numOfCols <= TSDB_MAX_TAGS + 1); assert(mgmtIsSuperTable(pMetric) && numOfCols >= 0 && numOfCols <= TSDB_MAX_TAGS + 1);
...@@ -110,300 +86,6 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_ ...@@ -110,300 +86,6 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_
} }
} }
void *mgmtMeterActionReset(void *row, char *str, int size, int *ssize) {
STabObj *pTable = (STabObj *)row;
int tsize = pTable->updateEnd - (char *)pTable;
memcpy(pTable, str, tsize);
pTable->schema = (char *)realloc(pTable->schema, pTable->schemaSize);
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
if (mgmtTableCreateFromSuperTable(pTable)) {
pTable->pTagData = pTable->schema;
}
return NULL;
}
void *mgmtMeterActionDestroy(void *row, char *str, int size, int *ssize) {
STabObj *pTable = (STabObj *)row;
mgmtDestroyMeter(pTable);
return NULL;
}
void *mgmtMeterActionInsert(void *row, char *str, int size, int *ssize) {
STabObj * pTable = NULL;
SVgObj * pVgroup = NULL;
SDbObj * pDb = NULL;
STabObj * pMetric = NULL;
SAcctObj *pAcct = NULL;
pTable = (STabObj *)row;
if (mgmtIsNormalTable(pTable)) {
pVgroup = mgmtGetVgroup(pTable->gid.vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->meterId, pTable->gid.vgId);
return NULL;
}
pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
pAcct = mgmtGetAcct(pDb->cfg.acct);
// TODO : check if account exists.
if (pAcct == NULL) {
mError("account not exists");
return NULL;
}
}
if (mgmtTableCreateFromSuperTable(pTable)) {
pTable->pTagData = (char *)pTable->schema;
pMetric = mgmtGetTable(pTable->pTagData);
assert(pMetric != NULL);
}
if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) {
pTable->pSql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns;
}
if (mgmtIsNormalTable(pTable)) {
if (pMetric) mgmtAddMeterIntoMetric(pMetric, pTable);
if (!sdbMaster) {
int sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->gid.sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->gid.sid);
return NULL;
}
}
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
pVgroup->numOfMeters++;
pDb->numOfTables++;
pVgroup->meterList[pTable->gid.sid] = pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) mgmtMoveVgroupToTail(pDb, pVgroup);
} else {
// insert a metric
pTable->pHead = NULL;
pTable->pSkipList = NULL;
pDb = mgmtGetDbByMeterId(pTable->meterId);
if (pDb) {
mgmtAddMetricIntoDb(pDb, pTable);
}
}
return NULL;
}
void *mgmtMeterActionDelete(void *row, char *str, int size, int *ssize) {
STabObj *pTable = NULL;
SVgObj * pVgroup = NULL;
SDbObj * pDb = NULL;
STabObj *pMetric = NULL;
pTable = (STabObj *)row;
if (mgmtIsNormalTable(pTable)) {
if (pTable->gid.vgId == 0) {
return NULL;
}
pVgroup = mgmtGetVgroup(pTable->gid.vgId);
if (pVgroup == NULL) {
mError("id:%s not in vgroup:%d", pTable->meterId, pTable->gid.vgId);
return NULL;
}
pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName);
return NULL;
}
}
if (mgmtTableCreateFromSuperTable(pTable)) {
pTable->pTagData = (char *)pTable->schema;
pMetric = mgmtGetTable(pTable->pTagData);
assert(pMetric != NULL);
}
if (mgmtIsNormalTable(pTable)) {
if (pMetric) mgmtRemoveMeterFromMetric(pMetric, pTable);
pVgroup->meterList[pTable->gid.sid] = NULL;
pVgroup->numOfMeters--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->gid.sid);
if (pVgroup->numOfMeters > 0) mgmtMoveVgroupToHead(pDb, pVgroup);
} else {
// remove a metric
// remove all the associated meters
pDb = mgmtGetDbByMeterId(pTable->meterId);
if (pDb) mgmtRemoveMetricFromDb(pDb, pTable);
}
return NULL;
}
void *mgmtMeterActionUpdate(void *row, char *str, int size, int *ssize) {
STabObj *pTable = NULL;
STabObj *pMetric = NULL;
pTable = (STabObj *)row;
STabObj *pNew = (STabObj *)str;
if (pNew->isDirty) {
pMetric = mgmtGetTable(pTable->pTagData);
removeMeterFromMetricIndex(pMetric, pTable);
}
mgmtMeterActionReset(pTable, str, size, NULL);
pTable->pTagData = pTable->schema;
if (pNew->isDirty) {
addMeterIntoMetricIndex(pMetric, pTable);
pTable->isDirty = 0;
}
return NULL;
}
void *mgmtMeterActionEncode(void *row, char *str, int size, int *ssize) {
assert(row != NULL && str != NULL);
STabObj *pTable = (STabObj *)row;
int tsize = pTable->updateEnd - (char *)pTable;
if (size < tsize + pTable->schemaSize + 1) {
*ssize = -1;
return NULL;
}
memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, pTable->schemaSize);
*ssize = tsize + pTable->schemaSize;
return NULL;
}
void *mgmtMeterActionDecode(void *row, char *str, int size, int *ssize) {
assert(str != NULL);
STabObj *pTable = (STabObj *)malloc(sizeof(STabObj));
if (pTable == NULL) return NULL;
memset(pTable, 0, sizeof(STabObj));
int tsize = pTable->updateEnd - (char *)pTable;
if (size < tsize) {
mgmtDestroyMeter(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
pTable->schema = (char *)malloc(pTable->schemaSize);
if (pTable->schema == NULL) {
mgmtDestroyMeter(pTable);
return NULL;
}
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
return (void *)pTable;
}
void *mgmtMeterActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize) {
STabObj *pMetric = (STabObj *)row;
pthread_rwlock_wrlock(&(pMetric->rwLock));
return NULL;
}
void *mgmtMeterActionBatchUpdate(void *row, char *str, int size, int *ssize) {
STabObj * pTable = (STabObj *)row;
SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *)str;
if (mgmtIsSuperTable(pTable)) {
if (msg->type == SDB_TYPE_INSERT) { // Insert schema
uint32_t total_cols = pTable->numOfColumns + pTable->numOfTags;
pTable->schema = realloc(pTable->schema, (total_cols + msg->cols) * sizeof(SSchema));
pTable->schemaSize = (total_cols + msg->cols) * sizeof(SSchema);
pTable->numOfTags += msg->cols;
memcpy(pTable->schema + total_cols * sizeof(SSchema), msg->data, msg->cols * sizeof(SSchema));
} else if (msg->type == SDB_TYPE_DELETE) { // Delete schema
// Make sure the order of tag columns
SchemaUnit *schemaUnit = (SchemaUnit *)(msg->data);
int col = schemaUnit->col;
assert(col > 0 && col < pTable->numOfTags);
if (col < pTable->numOfTags - 1) {
memmove(pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col),
pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col + 1),
pTable->schemaSize - (sizeof(SSchema) * (pTable->numOfColumns + col + 1)));
}
pTable->schemaSize -= sizeof(SSchema);
pTable->numOfTags--;
pTable->schema = realloc(pTable->schema, pTable->schemaSize);
}
return pTable->pHead;
} else if (mgmtTableCreateFromSuperTable(pTable)) {
if (msg->type == SDB_TYPE_INSERT) {
SSchema *schemas = (SSchema *)msg->data;
int total_size = 0;
for (int i = 0; i < msg->cols; i++) {
total_size += schemas[i].bytes;
}
pTable->schema = realloc(pTable->schema, pTable->schemaSize + total_size);
pTable->pTagData = pTable->schema;
memset(pTable->schema + pTable->schemaSize, 0, total_size);
pTable->schemaSize += total_size;
// TODO: set the data as default value
} else if (msg->type == SDB_TYPE_DELETE) { // Delete values in MTABLEs
SchemaUnit *schemaUnit = (SchemaUnit *)(msg->data);
int32_t pos = schemaUnit->pos;
int32_t bytes = schemaUnit->schema.bytes;
assert(pos + bytes <= pTable->schemaSize);
if (pos + bytes != pTable->schemaSize) {
memmove(pTable->schema + pos, pTable->schema + pos + bytes, pTable->schemaSize - (pos + bytes));
}
pTable->schemaSize -= bytes;
pTable->schema = realloc(pTable->schema, pTable->schemaSize);
}
return pTable->next;
}
return NULL;
}
void *mgmtMeterActionAfterBatchUpdate(void *row, char *str, int size, int *ssize) {
STabObj *pMetric = (STabObj *)row;
pthread_rwlock_unlock(&(pMetric->rwLock));
return NULL;
}
void *mgmtMeterAction(char action, void *row, char *str, int size, int *ssize) {
if (mgmtMeterActionFp[(uint8_t)action] != NULL) {
return (*(mgmtMeterActionFp[(uint8_t)action]))(row, str, size, ssize);
}
return NULL;
}
void mgmtAddMeterStatisticToAcct(STabObj *pTable, SAcctObj *pAcct) {
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
}
int mgmtInitMeters() { int mgmtInitMeters() {
void * pNode = NULL; void * pNode = NULL;
void * pLastNode = NULL; void * pLastNode = NULL;
...@@ -663,44 +345,6 @@ int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) { ...@@ -663,44 +345,6 @@ int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) {
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
} }
int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable) {
if (pTable == NULL || pMetric == NULL) return -1;
pthread_rwlock_wrlock(&(pMetric->rwLock));
// add meter into skip list
pTable->next = pMetric->pHead;
pTable->prev = NULL;
if (pMetric->pHead) pMetric->pHead->prev = pTable;
pMetric->pHead = pTable;
pMetric->numOfMeters++;
addMeterIntoMetricIndex(pMetric, pTable);
pthread_rwlock_unlock(&(pMetric->rwLock));
return 0;
}
int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable) {
pthread_rwlock_wrlock(&(pMetric->rwLock));
if (pTable->prev) pTable->prev->next = pTable->next;
if (pTable->next) pTable->next->prev = pTable->prev;
if (pTable->prev == NULL) pMetric->pHead = pTable->next;
pMetric->numOfMeters--;
removeMeterFromMetricIndex(pMetric, pTable);
pthread_rwlock_unlock(&(pMetric->rwLock));
return 0;
}
void mgmtCleanUpMeters() { void mgmtCleanUpMeters() {
mgmtCleanUpNormalTables(); mgmtCleanUpNormalTables();
mgmtCleanUpStreamTables(); mgmtCleanUpStreamTables();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册