From 649ef3d307061bcaf122b580100aa158287ec85a Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 14 Feb 2020 16:56:20 +0800 Subject: [PATCH] #1177 --- src/inc/mnode.h | 23 ++- src/mnode/inc/mgmtAcct.h | 4 + src/mnode/src/mgmtAcct.c | 6 + src/mnode/src/mgmtChildTable.c | 161 ++++++++++++++- src/mnode/src/mgmtNormalTable.c | 173 +++++++++++++++- src/mnode/src/mgmtStreamTable.c | 185 ++++++++++++++++- src/mnode/src/mgmtSuperTable.c | 277 ++++++++++++++++++++++++- src/mnode/src/mgmtTable.c | 356 -------------------------------- 8 files changed, 805 insertions(+), 380 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 5c1ac25bc1..4d7591fdd3 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -151,12 +151,13 @@ typedef struct SSuperTableObj { int32_t schemaSize; int8_t reserved[7]; int8_t updateEnd[1]; - int16_t nextColId; - pthread_rwlock_t rwLock; - tSkipList * pSkipList; - struct SSuperTableObj *pHead; + + + pthread_rwlock_t rwLock; struct SSuperTableObj *prev, *next; - int8_t* schema; + int16_t nextColId; + + int8_t *schema; } SSuperTableObj; typedef struct { @@ -180,8 +181,8 @@ typedef struct { int32_t sversion; int32_t numOfColumns; int32_t schemaSize; - char reserved[3]; - char updateEnd[1]; + int8_t reserved[3]; + int8_t updateEnd[1]; int16_t nextColId; char* schema; } SNormalTableObj; @@ -196,8 +197,8 @@ typedef struct { int32_t numOfColumns; int32_t schemaSize; int16_t sqlLen; - char reserved[3]; - char updateEnd[1]; + int8_t reserved[3]; + int8_t updateEnd[1]; int16_t nextColId; char* pSql; //null-terminated string char* schema; @@ -216,8 +217,8 @@ typedef struct _vg_obj { int32_t lbIp; int32_t lbTime; int8_t lbStatus; - char reserved[16]; - char updateEnd[1]; + int8_t reserved[16]; + int8_t updateEnd[1]; struct _vg_obj *prev, *next; void * idPool; STabObj ** meterList; diff --git a/src/mnode/inc/mgmtAcct.h b/src/mnode/inc/mgmtAcct.h index 8d0fb72667..3124f0e4df 100644 --- a/src/mnode/inc/mgmtAcct.h +++ b/src/mnode/inc/mgmtAcct.h @@ -45,6 +45,10 @@ extern void (*mgmtCleanUpAccts)(); extern int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); extern int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); + + +void mgmtAddMeterStatisticToAcct(SAcctObj *pAcct, int numOfColumns); + #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index 67b14c319e..be695a036d 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -184,3 +184,9 @@ int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { return 0; } int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int rows, SConnObj *pConn) = mgmtRetrieveAcctsImp; + + + +void mgmtAddMeterStatisticToAcct(SAcctObj *pAcct, int numOfColumns) { + pAcct->acctInfo.numOfTimeSeries += (numOfColumns - 1); +} diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index b94282cf57..da1b9c9a6e 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -23,7 +23,6 @@ #include "mgmtDb.h" #include "mgmtDnodeInt.h" #include "mgmtVgroup.h" -#include "mgmtSupertableQuery.h" #include "mgmtTable.h" #include "taosmsg.h" #include "tast.h" @@ -38,9 +37,167 @@ #include "sdb.h" #include "mgmtChildTable.h" -#include "mgmtSuperTable.h" +#include "mgmtChildTable.h" + + + +void *tsChildTableSdb; +void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); + +void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize); +void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize); +void *mgmtChildTableActionUpdate(void *row, char *str, int size, int *ssize); +void *mgmtChildTableActionEncode(void *row, char *str, int size, int *ssize); +void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize); +void *mgmtChildTableActionReset(void *row, char *str, int size, int *ssize); +void *mgmtChildTableActionDestroy(void *row, char *str, int size, int *ssize); + +static void mgmtDestroyChildTable(SChildTableObj *pTable) { + free(pTable); +} + +static void mgmtChildTableActionInit() { + mgmtChildTableActionFp[SDB_TYPE_INSERT] = mgmtChildTableActionInsert; + mgmtChildTableActionFp[SDB_TYPE_DELETE] = mgmtChildTableActionDelete; + mgmtChildTableActionFp[SDB_TYPE_UPDATE] = mgmtChildTableActionUpdate; + mgmtChildTableActionFp[SDB_TYPE_ENCODE] = mgmtChildTableActionEncode; + mgmtChildTableActionFp[SDB_TYPE_DECODE] = mgmtChildTableActionDecode; + mgmtChildTableActionFp[SDB_TYPE_RESET] = mgmtChildTableActionReset; + mgmtChildTableActionFp[SDB_TYPE_DESTROY] = mgmtChildTableActionDestroy; +} + +void *mgmtChildTableActionReset(void *row, char *str, int size, int *ssize) { + return NULL; +} + +void *mgmtChildTableActionDestroy(void *row, char *str, int size, int *ssize) { + SChildTableObj *pTable = (SChildTableObj *)row; + mgmtDestroyChildTable(pTable); + return NULL; +} + +void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize) { + SChildTableObj *pTable = (SChildTableObj *) row; + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); + return NULL; + } + + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + if (pDb == NULL) { + mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + return NULL; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("account not exists"); + return NULL; + } + + + if (!sdbMaster) { + int sid = taosAllocateId(pVgroup->idPool); + if (sid != pTable->sid) { + mError("sid:%d is not matched from the master:%d", sid, pTable->sid); + return NULL; + } + } + + mgmtAddMeterIntoMetric(pTable->superTableId, pTable); + + pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1); + pVgroup->numOfMeters++; + pDb->numOfTables++; + pVgroup->meterList[pTable->sid] = pTable; + + if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { + mgmtMoveVgroupToTail(pDb, pVgroup); + } + + return NULL; +} +void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize) { + SChildTableObj *pTable = (SChildTableObj *) row; + if (pTable->vgId == 0) { + return NULL; + } + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); + return NULL; + } + + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + if (pDb == NULL) { + mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + return NULL; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("account not exists"); + return NULL; + } + + pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); + pVgroup->meterList[pTable->sid] = NULL; + pVgroup->numOfMeters--; + pDb->numOfTables--; + taosFreeId(pVgroup->idPool, pTable->sid); + + mgmtRemoveMeterFromMetric(pTable->superTable, pTable); + + if (pVgroup->numOfMeters > 0) { + mgmtMoveVgroupToHead(pDb, pVgroup); + } + + return NULL; +} + +void *mgmtChildTableActionUpdate(void *row, char *str, int size, int *ssize) { + return mgmtChildTableActionReset(row, str, size, NULL); +} + +void *mgmtChildTableActionEncode(void *row, char *str, int size, int *ssize) { + SChildTableObj *pTable = (SChildTableObj *) row; + assert(row != NULL && str != NULL); + + int tsize = pTable->updateEnd - (int8_t *) pTable; + memcpy(str, pTable, tsize); + + return NULL; +} + +void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize) { + assert(str != NULL); + SChildTableObj *pTable = (SChildTableObj *)malloc(sizeof(SChildTableObj)); + if (pTable == NULL) { + return NULL; + } + memset(pTable, 0, sizeof(STabObj)); + + int tsize = pTable->updateEnd - (int8_t *)pTable; + if (size < tsize) { + mgmtDestroyChildTable(pTable); + return NULL; + } + memcpy(pTable, str, tsize); + + return (void *)pTable; +} + +void *mgmtChildTableAction(char action, void *row, char *str, int size, int *ssize) { + if (mgmtChildTableActionFp[(uint8_t)action] != NULL) { + return (*(mgmtChildTableActionFp[(uint8_t)action]))(row, str, size, ssize); + } + return NULL; +} int32_t mgmtInitChildTables() { return 0; diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index a4ed370043..0551e3bc99 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -23,7 +23,6 @@ #include "mgmtDb.h" #include "mgmtDnodeInt.h" #include "mgmtVgroup.h" -#include "mgmtSupertableQuery.h" #include "mgmtTable.h" #include "taosmsg.h" #include "tast.h" @@ -39,6 +38,178 @@ #include "sdb.h" #include "mgmtNormalTable.h" + +void *tsSuperTableSdb; +void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); + +void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize); +void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize); +void *mgmtNormalTableActionUpdate(void *row, char *str, int size, int *ssize); +void *mgmtNormalTableActionEncode(void *row, char *str, int size, int *ssize); +void *mgmtNormalTableActionDecode(void *row, char *str, int size, int *ssize); +void *mgmtNormalTableActionReset(void *row, char *str, int size, int *ssize); +void *mgmtNormalTableActionDestroy(void *row, char *str, int size, int *ssize); + +static void mgmtDestroyNormalTable(SNormalTableObj *pTable) { + free(pTable->schema); + free(pTable); +} + +static void mgmtNormalTableActionInit() { + mgmtNormalTableActionFp[SDB_TYPE_INSERT] = mgmtNormalTableActionInsert; + mgmtNormalTableActionFp[SDB_TYPE_DELETE] = mgmtNormalTableActionDelete; + mgmtNormalTableActionFp[SDB_TYPE_UPDATE] = mgmtNormalTableActionUpdate; + mgmtNormalTableActionFp[SDB_TYPE_ENCODE] = mgmtNormalTableActionEncode; + mgmtNormalTableActionFp[SDB_TYPE_DECODE] = mgmtNormalTableActionDecode; + mgmtNormalTableActionFp[SDB_TYPE_RESET] = mgmtNormalTableActionReset; + mgmtNormalTableActionFp[SDB_TYPE_DESTROY] = mgmtNormalTableActionDestroy; +} + +void *mgmtNormalTableActionReset(void *row, char *str, int size, int *ssize) { + SNormalTableObj *pTable = (SNormalTableObj *) row; + int tsize = pTable->updateEnd - (int8_t *) pTable; + memcpy(pTable, str, tsize); + return NULL; +} + +void *mgmtNormalTableActionDestroy(void *row, char *str, int size, int *ssize) { + SNormalTableObj *pTable = (SNormalTableObj *)row; + mgmtDestroyNormalTable(pTable); + return NULL; +} + +void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) { + SNormalTableObj *pTable = (SNormalTableObj *) row; + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); + return NULL; + } + + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + if (pDb == NULL) { + mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + return NULL; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("account not exists"); + return NULL; + } + + if (!sdbMaster) { + int sid = taosAllocateId(pVgroup->idPool); + if (sid != pTable->sid) { + mError("sid:%d is not matched from the master:%d", sid, pTable->sid); + return NULL; + } + } + + pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); + pVgroup->numOfMeters++; + pDb->numOfTables++; + pVgroup->meterList[pTable->sid] = pTable; + + if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { + mgmtMoveVgroupToTail(pDb, pVgroup); + } + + return NULL; +} + +void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize) { + SNormalTableObj *pTable = (SNormalTableObj *) row; + if (pTable->vgId == 0) { + return NULL; + } + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); + return NULL; + } + + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + if (pDb == NULL) { + mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + return NULL; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("account not exists"); + return NULL; + } + + pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); + pVgroup->meterList[pTable->sid] = NULL; + pVgroup->numOfMeters--; + pDb->numOfTables--; + taosFreeId(pVgroup->idPool, pTable->sid); + + if (pVgroup->numOfMeters > 0) { + mgmtMoveVgroupToHead(pDb, pVgroup); + } + + return NULL; +} + +void *mgmtNormalTableActionUpdate(void *row, char *str, int size, int *ssize) { + return mgmtNormalTableActionReset(row, str, size, NULL); +} + +void *mgmtNormalTableActionEncode(void *row, char *str, int size, int *ssize) { + SNormalTableObj *pTable = (SNormalTableObj *) row; + assert(row != NULL && str != NULL); + + int tsize = pTable->updateEnd - (int8_t *) pTable; + if (size < tsize + pTable->schemaSize + 1) { + *ssize = -1; + return NULL; + } + + memcpy(str, pTable, tsize); + memcpy(str + tsize, pTable->schema, pTable->schemaSize); + *ssize = tsize + pTable->schemaSize; + + return NULL; +} + +void *mgmtNormalTableActionDecode(void *row, char *str, int size, int *ssize) { + assert(str != NULL); + + SNormalTableObj *pTable = (SNormalTableObj *)malloc(sizeof(SNormalTableObj)); + if (pTable == NULL) { + return NULL; + } + memset(pTable, 0, sizeof(STabObj)); + + int tsize = pTable->updateEnd - (int8_t *)pTable; + if (size < tsize) { + mgmtDestroyNormalTable(pTable); + return NULL; + } + memcpy(pTable, str, tsize); + + pTable->schema = (char *)malloc(pTable->schemaSize); + if (pTable->schema == NULL) { + mgmtDestroyNormalTable(pTable); + return NULL; + } + + memcpy(pTable->schema, str + tsize, pTable->schemaSize); + return (void *)pTable; +} + +void *mgmtNormalTableAction(char action, void *row, char *str, int size, int *ssize) { + if (mgmtNormalTableActionFp[(uint8_t)action] != NULL) { + return (*(mgmtNormalTableActionFp[(uint8_t)action]))(row, str, size, ssize); + } + return NULL; +} + int32_t mgmtInitNormalTables() { return 0; } diff --git a/src/mnode/src/mgmtStreamTable.c b/src/mnode/src/mgmtStreamTable.c index d6958db075..ef9aaa0086 100644 --- a/src/mnode/src/mgmtStreamTable.c +++ b/src/mnode/src/mgmtStreamTable.c @@ -23,7 +23,6 @@ #include "mgmtDb.h" #include "mgmtDnodeInt.h" #include "mgmtVgroup.h" -#include "mgmtStreamtableQuery.h" #include "mgmtTable.h" #include "taosmsg.h" #include "tast.h" @@ -39,6 +38,190 @@ #include "sdb.h" #include "mgmtStreamTable.h" + +void *tsSuperTableSdb; +void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); + +void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize); +void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize); +void *mgmtStreamTableActionUpdate(void *row, char *str, int size, int *ssize); +void *mgmtStreamTableActionEncode(void *row, char *str, int size, int *ssize); +void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize); +void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize); +void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize); + +static void mgmtDestroyStreamTable(SStreamTableObj *pTable) { + free(pTable->schema); + free(pTable->pSql); + free(pTable); +} + +static void mgmtStreamTableActionInit() { + mgmtStreamTableActionFp[SDB_TYPE_INSERT] = mgmtStreamTableActionInsert; + mgmtStreamTableActionFp[SDB_TYPE_DELETE] = mgmtStreamTableActionDelete; + mgmtStreamTableActionFp[SDB_TYPE_UPDATE] = mgmtStreamTableActionUpdate; + mgmtStreamTableActionFp[SDB_TYPE_ENCODE] = mgmtStreamTableActionEncode; + mgmtStreamTableActionFp[SDB_TYPE_DECODE] = mgmtStreamTableActionDecode; + mgmtStreamTableActionFp[SDB_TYPE_RESET] = mgmtStreamTableActionReset; + mgmtStreamTableActionFp[SDB_TYPE_DESTROY] = mgmtStreamTableActionDestroy; +} + +void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize) { + SStreamTableObj *pTable = (SStreamTableObj *) row; + int tsize = pTable->updateEnd - (int8_t *) pTable; + memcpy(pTable, str, tsize); + pTable->schema = (char *) realloc(pTable->schema, pTable->schemaSize); + memcpy(pTable->schema, str + tsize, pTable->schemaSize); + pTable->pSql = (char *) realloc(pTable->pSql, pTable->sqlLen); + memcpy(pTable->pSql, str + tsize + pTable->schemaSize, pTable->sqlLen); + return NULL; +} + +void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize) { + SSuperTableObj *pTable = (STabObj *)row; + mgmtDestroyStreamTable(pTable); + return NULL; +} + +void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) { + SNormalTableObj *pTable = (SNormalTableObj *) row; + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); + return NULL; + } + + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + if (pDb == NULL) { + mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + return NULL; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("account not exists"); + return NULL; + } + + if (!sdbMaster) { + int sid = taosAllocateId(pVgroup->idPool); + if (sid != pTable->sid) { + mError("sid:%d is not matched from the master:%d", sid, pTable->sid); + return NULL; + } + } + + pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); + pVgroup->numOfMeters++; + pDb->numOfTables++; + pVgroup->meterList[pTable->sid] = pTable; + + if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { + mgmtMoveVgroupToTail(pDb, pVgroup); + } + + return NULL; +} + +void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize) { + SNormalTableObj *pTable = (SNormalTableObj *) row; + if (pTable->vgId == 0) { + return NULL; + } + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + mError("id:%s not in vgroup:%d", pTable->tableId, pTable->vgId); + return NULL; + } + + SDbObj *pDb = mgmtGetDb(pVgroup->dbName); + if (pDb == NULL) { + mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); + return NULL; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("account not exists"); + return NULL; + } + + pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); + pVgroup->meterList[pTable->sid] = NULL; + pVgroup->numOfMeters--; + pDb->numOfTables--; + taosFreeId(pVgroup->idPool, pTable->sid); + + if (pVgroup->numOfMeters > 0) { + mgmtMoveVgroupToHead(pDb, pVgroup); + } + + return NULL; +} + +void *mgmtStreamTableActionUpdate(void *row, char *str, int size, int *ssize) { + return mgmtStreamTableActionReset(row, str, size, NULL); +} + +void *mgmtStreamTableActionEncode(void *row, char *str, int size, int *ssize) { + SStreamTableObj *pTable = (SStreamTableObj *) row; + assert(row != NULL && str != NULL); + + int tsize = pTable->updateEnd - (int8_t *) pTable; + if (size < tsize + pTable->schemaSize + pTable->sqlLen + 1) { + *ssize = -1; + return NULL; + } + + memcpy(str, pTable, tsize); + memcpy(str + tsize, pTable->schema, pTable->schemaSize); + memcpy(str + tsize + pTable->schemaSize, pTable->pSql, pTable->sqlLen); + *ssize = tsize + pTable->schemaSize + pTable->sqlLen; + + return NULL; +} + +void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize) { + assert(str != NULL); + + SStreamTableObj *pTable = (SStreamTableObj *)malloc(sizeof(SNormalTableObj)); + if (pTable == NULL) { + return NULL; + } + memset(pTable, 0, sizeof(STabObj)); + + int tsize = pTable->updateEnd - (int8_t *)pTable; + if (size < tsize) { + mgmtDestroyStreamTable(pTable); + return NULL; + } + memcpy(pTable, str, tsize); + + pTable->schema = (char *)malloc(pTable->schemaSize); + if (pTable->schema == NULL) { + mgmtDestroyStreamTable(pTable); + return NULL; + } + memcpy(pTable->schema, str + tsize, pTable->schemaSize); + + pTable->pSql = (char *)malloc(pTable->sqlLen); + if (pTable->pSql == NULL) { + mgmtDestroyStreamTable(pTable); + return NULL; + } + memcpy(pTable->pSql, str + tsize + pTable->schemaSize, pTable->sqlLen); + return (void *)pTable; +} + +void *mgmtStreamTableAction(char action, void *row, char *str, int size, int *ssize) { + if (mgmtStreamTableActionFp[(uint8_t)action] != NULL) { + return (*(mgmtStreamTableActionFp[(uint8_t)action]))(row, str, size, ssize); + } + return NULL; +} + int32_t mgmtInitStreamTables() { return 0; } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 5092ee8a74..3542979fc2 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -40,13 +40,7 @@ #include "mgmtSuperTable.h" #include "mgmtChildTable.h" - -#define mgmtDestroyMeter(pMetric) \ - do { \ - tfree(pMetric->schema); \ - pMetric->pSkipList = tSkipListDestroy((pMetric)->pSkipList); \ - tfree(pMetric); \ - } while (0) +#include "tutil.h" typedef struct { @@ -69,7 +63,211 @@ typedef struct { char data[]; } SMeterUpdateMsg; +void *tsSuperTableSdb; +void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); + +void *mgmtSuperTableActionInsert(void *row, char *str, int size, int *ssize); +void *mgmtSuperTableActionDelete(void *row, char *str, int size, int *ssize); +void *mgmtSuperTableActionUpdate(void *row, char *str, int size, int *ssize); +void *mgmtSuperTableActionEncode(void *row, char *str, int size, int *ssize); +void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize); +void *mgmtSuperTableActionReset(void *row, char *str, int size, int *ssize); +void *mgmtSuperTableActionDestroy(void *row, char *str, int size, int *ssize); + +static void mgmtDestroySuperTable(SSuperTableObj *pTable) { + free(pTable->schema); + free(pTable); +} + +static void mgmtSuperTableActionInit() { + mgmtSuperTableActionFp[SDB_TYPE_INSERT] = mgmtSuperTableActionInsert; + mgmtSuperTableActionFp[SDB_TYPE_DELETE] = mgmtSuperTableActionDelete; + mgmtSuperTableActionFp[SDB_TYPE_UPDATE] = mgmtSuperTableActionUpdate; + mgmtSuperTableActionFp[SDB_TYPE_ENCODE] = mgmtSuperTableActionEncode; + mgmtSuperTableActionFp[SDB_TYPE_DECODE] = mgmtSuperTableActionDecode; + mgmtSuperTableActionFp[SDB_TYPE_RESET] = mgmtSuperTableActionReset; + mgmtSuperTableActionFp[SDB_TYPE_DESTROY] = mgmtSuperTableActionDestroy; +} + +void *mgmtSuperTableActionReset(void *row, char *str, int size, int *ssize) { + SSuperTableObj *pTable = (SSuperTableObj *) row; + int tsize = pTable->updateEnd - (int8_t *) pTable; + memcpy(pTable, str, tsize); + pTable->schema = (char *) realloc(pTable->schema, pTable->schemaSize); + memcpy(pTable->schema, str + tsize, pTable->schemaSize); + return NULL; +} + +void *mgmtSuperTableActionDestroy(void *row, char *str, int size, int *ssize) { + SSuperTableObj *pTable = (SSuperTableObj *) row; + mgmtDestroySuperTable(pTable); + return NULL; +} + +void *mgmtSuperTableActionInsert(void *row, char *str, int size, int *ssize) { + SSuperTableObj *pTable = (SSuperTableObj *) row; + SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId); + if (pDb) { + mgmtAddMetricIntoDb(pDb, pTable); + } + return NULL; +} + +void *mgmtSuperTableActionDelete(void *row, char *str, int size, int *ssize) { + SSuperTableObj *pTable = (SSuperTableObj *) row; + SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId); + if (pDb) { + mgmtRemoveMetricFromDb(pDb, pTable); + } + return NULL; +} + +void *mgmtSuperTableActionUpdate(void *row, char *str, int size, int *ssize) { + return mgmtSuperTableActionReset(row, str, size, NULL); +} + +void *mgmtSuperTableActionEncode(void *row, char *str, int size, int *ssize) { + SSuperTableObj *pTable = (SSuperTableObj *) row; + assert(row != NULL && str != NULL); + + int tsize = pTable->updateEnd - (int8_t *) pTable; + if (size < tsize + pTable->schemaSize + 1) { + *ssize = -1; + return NULL; + } + + memcpy(str, pTable, tsize); + memcpy(str + tsize, pTable->schema, pTable->schemaSize); + *ssize = tsize + pTable->schemaSize; + + return NULL; +} + +void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize) { + assert(str != NULL); + + SSuperTableObj *pTable = (SSuperTableObj *)malloc(sizeof(SSuperTableObj)); + if (pTable == NULL) { + return NULL; + } + memset(pTable, 0, sizeof(STabObj)); + + int tsize = pTable->updateEnd - (int8_t *)pTable; + if (size < tsize) { + mgmtDestroySuperTable(pTable); + return NULL; + } + memcpy(pTable, str, tsize); + + pTable->schema = (char *)malloc(pTable->schemaSize); + if (pTable->schema == NULL) { + mgmtDestroySuperTable(pTable); + return NULL; + } + + memcpy(pTable->schema, str + tsize, pTable->schemaSize); + return (void *)pTable; +} + +void *mgmtSuperTableAction(char action, void *row, char *str, int size, int *ssize) { + if (mgmtSuperTableActionFp[(uint8_t)action] != NULL) { + return (*(mgmtSuperTableActionFp[(uint8_t)action]))(row, str, size, ssize); + } + return NULL; +} + int32_t mgmtInitSuperTables() { + void * pNode = NULL; + void * pLastNode = NULL; + SVgObj * pVgroup = NULL; + STabObj * pTable = NULL; + STabObj * pMetric = NULL; + SDbObj * pDb = NULL; + SAcctObj *pAcct = NULL; + + // TODO: Make sure this function only run once + mgmtSuperTableActionInit(); + + tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN, + "meters", SDB_KEYTYPE_STRING, mgmtDirectory, mgmtSuperTableAction); + if (meterSdb == NULL) { + mError("failed to init meter data"); + return -1; + } + + pNode = NULL; + while (1) { + pNode = sdbFetchRow(meterSdb, pNode, (void **)&pTable); + if (pTable == NULL) break; + if (mgmtIsSuperTable(pTable)) pTable->numOfMeters = 0; + } + + pNode = NULL; + while (1) { + pLastNode = pNode; + pNode = sdbFetchRow(meterSdb, pNode, (void **)&pTable); + if (pTable == NULL) break; + + pDb = mgmtGetDbByMeterId(pTable->meterId); + if (pDb == NULL) { + mError("meter:%s, failed to get db, discard it", pTable->meterId, pTable->gid.vgId, pTable->gid.sid); + pTable->gid.vgId = 0; + sdbDeleteRow(meterSdb, pTable); + pNode = pLastNode; + continue; + } + + if (mgmtIsNormalTable(pTable)) { + pVgroup = mgmtGetVgroup(pTable->gid.vgId); + + if (pVgroup == NULL) { + mError("meter:%s, failed to get vgroup:%d sid:%d, discard it", pTable->meterId, pTable->gid.vgId, pTable->gid.sid); + pTable->gid.vgId = 0; + sdbDeleteRow(meterSdb, pTable); + pNode = pLastNode; + continue; + } + + if (strcmp(pVgroup->dbName, pDb->name) != 0) { + mError("meter:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", + pTable->meterId, pDb->name, pTable->gid.vgId, pVgroup->dbName, pTable->gid.sid); + pTable->gid.vgId = 0; + sdbDeleteRow(meterSdb, pTable); + pNode = pLastNode; + continue; + } + + if ( pVgroup->meterList == NULL) { + mError("meter:%s, vgroup:%d meterlist is null", pTable->meterId, pTable->gid.vgId); + pTable->gid.vgId = 0; + sdbDeleteRow(meterSdb, pTable); + pNode = pLastNode; + continue; + } + + pVgroup->meterList[pTable->gid.sid] = pTable; + taosIdPoolMarkStatus(pVgroup->idPool, pTable->gid.sid, 1); + + if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) { + pTable->pSql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns; + } + + if (mgmtTableCreateFromSuperTable(pTable)) { + pTable->pTagData = (char *)pTable->schema; // + sizeof(SSchema)*pTable->numOfColumns; + pMetric = mgmtGetTable(pTable->pTagData); + if (pMetric) mgmtAddMeterIntoMetric(pMetric, pTable); + } + + pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct) mgmtAddMeterStatisticToAcct(pTable, pAcct); + } else { + if (pDb) mgmtAddMetricIntoDb(pDb, pTable); + } + } + + mgmtSetVgroupIdPool(); + + mTrace("meter is initialized"); return 0; } @@ -176,6 +374,15 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pMetric, SSchema schema[], int32_t int32_t ret = sdbBatchUpdateRow(tsSuperTableSdb, msg, size); tfree(msg); +// +// if (msg->type == SDB_TYPE_INSERT) { // Insert schema +// uint32_t total_cols = pTable->numOfColumns + pTable->numOfTags; +// pTable->schema = realloc(pTable->schema, (total_cols + msg->cols) * sizeof(SSchema)); +// pTable->schemaSize = (total_cols + msg->cols) * sizeof(SSchema); +// pTable->numOfTags += msg->cols; +// memcpy(pTable->schema + total_cols * sizeof(SSchema), msg->data, msg->cols * sizeof(SSchema)); +// +// } if (ret < 0) { mError("Failed to add tag column %s to table %s", schema[0].name, pMetric->tableId); @@ -200,7 +407,20 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pMetric, char *tagName) { memcpy(msg->meterId, pMetric->tableId, TSDB_TABLE_ID_LEN); msg->type = SDB_TYPE_DELETE; msg->cols = 1; - +// +// // Make sure the order of tag columns +// SchemaUnit *schemaUnit = (SchemaUnit *)(msg->data); +// int col = schemaUnit->col; +// assert(col > 0 && col < pTable->numOfTags); +// if (col < pTable->numOfTags - 1) { +// memmove(pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col), +// pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col + 1), +// pTable->schemaSize - (sizeof(SSchema) * (pTable->numOfColumns + col + 1))); +// } +// pTable->schemaSize -= sizeof(SSchema); +// pTable->numOfTags--; +// pTable->schema = realloc(pTable->schema, pTable->schemaSize); +// ((SchemaUnit *) (msg->data))->col = col; ((SchemaUnit *) (msg->data))->pos = mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN; ((SchemaUnit *) (msg->data))->schema = *(SSchema *) (pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + col)); @@ -242,7 +462,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pMetric, char *oldTagN if (msg == NULL) return TSDB_CODE_APP_ERROR; memset(msg, 0, size); - mgmtMeterActionEncode(pMetric, msg, size, &rowSize); + mgmtSuperTableActionEncode(pMetric, msg, size, &rowSize); int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, rowSize, 1); tfree(msg); @@ -470,4 +690,43 @@ int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pCo pShow->numOfReads += numOfRows; return numOfRows; +} + + +int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable) { + if (pTable == NULL || pMetric == NULL) return -1; + + pthread_rwlock_wrlock(&(pMetric->rwLock)); + // add meter into skip list + pTable->next = pMetric->pHead; + pTable->prev = NULL; + + if (pMetric->pHead) pMetric->pHead->prev = pTable; + + pMetric->pHead = pTable; + pMetric->numOfMeters++; + + addMeterIntoMetricIndex(pMetric, pTable); + + pthread_rwlock_unlock(&(pMetric->rwLock)); + + return 0; +} + +int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable) { + pthread_rwlock_wrlock(&(pMetric->rwLock)); + + if (pTable->prev) pTable->prev->next = pTable->next; + + if (pTable->next) pTable->next->prev = pTable->prev; + + if (pTable->prev == NULL) pMetric->pHead = pTable->next; + + pMetric->numOfMeters--; + + removeMeterFromMetricIndex(pMetric, pTable); + + pthread_rwlock_unlock(&(pMetric->rwLock)); + + return 0; } \ No newline at end of file diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 06f13d1f56..6aa90f34f1 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -48,17 +48,6 @@ extern int64_t sdbVersion; void *meterSdb = NULL; void *(*mgmtMeterActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); -// Function declaration -void *mgmtMeterActionInsert(void *row, char *str, int size, int *ssize); -void *mgmtMeterActionDelete(void *row, char *str, int size, int *ssize); -void *mgmtMeterActionUpdate(void *row, char *str, int size, int *ssize); -void *mgmtMeterActionEncode(void *row, char *str, int size, int *ssize); -void *mgmtMeterActionDecode(void *row, char *str, int size, int *ssize); -void *mgmtMeterActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize); -void *mgmtMeterActionBatchUpdate(void *row, char *str, int size, int *ssize); -void *mgmtMeterActionAfterBatchUpdate(void *row, char *str, int size, int *ssize); -void *mgmtMeterActionReset(void *row, char *str, int size, int *ssize); -void *mgmtMeterActionDestroy(void *row, char *str, int size, int *ssize); int32_t mgmtMeterAddTags(STabObj *pMetric, SSchema schema[], int ntags); static void removeMeterFromMetricIndex(STabObj *pMetric, STabObj *pTable); static void addMeterIntoMetricIndex(STabObj *pMetric, STabObj *pTable); @@ -70,19 +59,6 @@ int32_t mgmtMeterDropColumnByName(STabObj *pTable, const char *name); static int dropMeterImp(SDbObj *pDb, STabObj * pTable, SAcctObj *pAcct); static void dropAllMetersOfMetric(SDbObj *pDb, STabObj * pMetric, SAcctObj *pAcct); -static void mgmtMeterActionInit() { - mgmtMeterActionFp[SDB_TYPE_INSERT] = mgmtMeterActionInsert; - mgmtMeterActionFp[SDB_TYPE_DELETE] = mgmtMeterActionDelete; - mgmtMeterActionFp[SDB_TYPE_UPDATE] = mgmtMeterActionUpdate; - mgmtMeterActionFp[SDB_TYPE_ENCODE] = mgmtMeterActionEncode; - mgmtMeterActionFp[SDB_TYPE_DECODE] = mgmtMeterActionDecode; - mgmtMeterActionFp[SDB_TYPE_BEFORE_BATCH_UPDATE] = mgmtMeterActionBeforeBatchUpdate; - mgmtMeterActionFp[SDB_TYPE_BATCH_UPDATE] = mgmtMeterActionBatchUpdate; - mgmtMeterActionFp[SDB_TYPE_AFTER_BATCH_UPDATE] = mgmtMeterActionAfterBatchUpdate; - mgmtMeterActionFp[SDB_TYPE_RESET] = mgmtMeterActionReset; - mgmtMeterActionFp[SDB_TYPE_DESTROY] = mgmtMeterActionDestroy; -} - static int32_t mgmtGetReqTagsLength(STabObj *pMetric, int16_t *cols, int32_t numOfCols) { assert(mgmtIsSuperTable(pMetric) && numOfCols >= 0 && numOfCols <= TSDB_MAX_TAGS + 1); @@ -110,300 +86,6 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_ } } -void *mgmtMeterActionReset(void *row, char *str, int size, int *ssize) { - STabObj *pTable = (STabObj *)row; - int tsize = pTable->updateEnd - (char *)pTable; - memcpy(pTable, str, tsize); - pTable->schema = (char *)realloc(pTable->schema, pTable->schemaSize); - memcpy(pTable->schema, str + tsize, pTable->schemaSize); - - if (mgmtTableCreateFromSuperTable(pTable)) { - pTable->pTagData = pTable->schema; - } - - return NULL; -} - -void *mgmtMeterActionDestroy(void *row, char *str, int size, int *ssize) { - STabObj *pTable = (STabObj *)row; - mgmtDestroyMeter(pTable); - return NULL; -} - -void *mgmtMeterActionInsert(void *row, char *str, int size, int *ssize) { - STabObj * pTable = NULL; - SVgObj * pVgroup = NULL; - SDbObj * pDb = NULL; - STabObj * pMetric = NULL; - SAcctObj *pAcct = NULL; - - pTable = (STabObj *)row; - - if (mgmtIsNormalTable(pTable)) { - pVgroup = mgmtGetVgroup(pTable->gid.vgId); - if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->meterId, pTable->gid.vgId); - return NULL; - } - - pDb = mgmtGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); - return NULL; - } - - pAcct = mgmtGetAcct(pDb->cfg.acct); - // TODO : check if account exists. - if (pAcct == NULL) { - mError("account not exists"); - return NULL; - } - } - - if (mgmtTableCreateFromSuperTable(pTable)) { - pTable->pTagData = (char *)pTable->schema; - pMetric = mgmtGetTable(pTable->pTagData); - assert(pMetric != NULL); - } - - if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) { - pTable->pSql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns; - } - - if (mgmtIsNormalTable(pTable)) { - if (pMetric) mgmtAddMeterIntoMetric(pMetric, pTable); - - if (!sdbMaster) { - int sid = taosAllocateId(pVgroup->idPool); - if (sid != pTable->gid.sid) { - mError("sid:%d is not matched from the master:%d", sid, pTable->gid.sid); - return NULL; - } - } - - pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); - pVgroup->numOfMeters++; - pDb->numOfTables++; - pVgroup->meterList[pTable->gid.sid] = pTable; - - if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) mgmtMoveVgroupToTail(pDb, pVgroup); - } else { - // insert a metric - pTable->pHead = NULL; - pTable->pSkipList = NULL; - pDb = mgmtGetDbByMeterId(pTable->meterId); - if (pDb) { - mgmtAddMetricIntoDb(pDb, pTable); - } - } - - return NULL; -} - -void *mgmtMeterActionDelete(void *row, char *str, int size, int *ssize) { - STabObj *pTable = NULL; - SVgObj * pVgroup = NULL; - SDbObj * pDb = NULL; - STabObj *pMetric = NULL; - - pTable = (STabObj *)row; - - if (mgmtIsNormalTable(pTable)) { - if (pTable->gid.vgId == 0) { - return NULL; - } - - pVgroup = mgmtGetVgroup(pTable->gid.vgId); - if (pVgroup == NULL) { - mError("id:%s not in vgroup:%d", pTable->meterId, pTable->gid.vgId); - return NULL; - } - - pDb = mgmtGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("vgroup:%d not in DB:%s", pVgroup->vgId, pVgroup->dbName); - return NULL; - } - } - - if (mgmtTableCreateFromSuperTable(pTable)) { - pTable->pTagData = (char *)pTable->schema; - pMetric = mgmtGetTable(pTable->pTagData); - assert(pMetric != NULL); - } - - if (mgmtIsNormalTable(pTable)) { - if (pMetric) mgmtRemoveMeterFromMetric(pMetric, pTable); - - pVgroup->meterList[pTable->gid.sid] = NULL; - pVgroup->numOfMeters--; - pDb->numOfTables--; - taosFreeId(pVgroup->idPool, pTable->gid.sid); - - if (pVgroup->numOfMeters > 0) mgmtMoveVgroupToHead(pDb, pVgroup); - } else { - // remove a metric - // remove all the associated meters - - pDb = mgmtGetDbByMeterId(pTable->meterId); - if (pDb) mgmtRemoveMetricFromDb(pDb, pTable); - } - - return NULL; -} - -void *mgmtMeterActionUpdate(void *row, char *str, int size, int *ssize) { - STabObj *pTable = NULL; - STabObj *pMetric = NULL; - - pTable = (STabObj *)row; - STabObj *pNew = (STabObj *)str; - - if (pNew->isDirty) { - pMetric = mgmtGetTable(pTable->pTagData); - removeMeterFromMetricIndex(pMetric, pTable); - } - mgmtMeterActionReset(pTable, str, size, NULL); - pTable->pTagData = pTable->schema; - if (pNew->isDirty) { - addMeterIntoMetricIndex(pMetric, pTable); - pTable->isDirty = 0; - } - - return NULL; -} - -void *mgmtMeterActionEncode(void *row, char *str, int size, int *ssize) { - assert(row != NULL && str != NULL); - - STabObj *pTable = (STabObj *)row; - int tsize = pTable->updateEnd - (char *)pTable; - - if (size < tsize + pTable->schemaSize + 1) { - *ssize = -1; - return NULL; - } - - memcpy(str, pTable, tsize); - memcpy(str + tsize, pTable->schema, pTable->schemaSize); - - *ssize = tsize + pTable->schemaSize; - - return NULL; -} - -void *mgmtMeterActionDecode(void *row, char *str, int size, int *ssize) { - assert(str != NULL); - - STabObj *pTable = (STabObj *)malloc(sizeof(STabObj)); - if (pTable == NULL) return NULL; - memset(pTable, 0, sizeof(STabObj)); - - int tsize = pTable->updateEnd - (char *)pTable; - if (size < tsize) { - mgmtDestroyMeter(pTable); - return NULL; - } - memcpy(pTable, str, tsize); - - pTable->schema = (char *)malloc(pTable->schemaSize); - if (pTable->schema == NULL) { - mgmtDestroyMeter(pTable); - return NULL; - } - - memcpy(pTable->schema, str + tsize, pTable->schemaSize); - return (void *)pTable; -} - -void *mgmtMeterActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize) { - STabObj *pMetric = (STabObj *)row; - - pthread_rwlock_wrlock(&(pMetric->rwLock)); - - return NULL; -} - -void *mgmtMeterActionBatchUpdate(void *row, char *str, int size, int *ssize) { - STabObj * pTable = (STabObj *)row; - SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *)str; - - if (mgmtIsSuperTable(pTable)) { - if (msg->type == SDB_TYPE_INSERT) { // Insert schema - uint32_t total_cols = pTable->numOfColumns + pTable->numOfTags; - pTable->schema = realloc(pTable->schema, (total_cols + msg->cols) * sizeof(SSchema)); - pTable->schemaSize = (total_cols + msg->cols) * sizeof(SSchema); - pTable->numOfTags += msg->cols; - memcpy(pTable->schema + total_cols * sizeof(SSchema), msg->data, msg->cols * sizeof(SSchema)); - - } else if (msg->type == SDB_TYPE_DELETE) { // Delete schema - // Make sure the order of tag columns - SchemaUnit *schemaUnit = (SchemaUnit *)(msg->data); - int col = schemaUnit->col; - assert(col > 0 && col < pTable->numOfTags); - if (col < pTable->numOfTags - 1) { - memmove(pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col), - pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + col + 1), - pTable->schemaSize - (sizeof(SSchema) * (pTable->numOfColumns + col + 1))); - } - pTable->schemaSize -= sizeof(SSchema); - pTable->numOfTags--; - pTable->schema = realloc(pTable->schema, pTable->schemaSize); - } - - return pTable->pHead; - - } else if (mgmtTableCreateFromSuperTable(pTable)) { - if (msg->type == SDB_TYPE_INSERT) { - SSchema *schemas = (SSchema *)msg->data; - int total_size = 0; - for (int i = 0; i < msg->cols; i++) { - total_size += schemas[i].bytes; - } - pTable->schema = realloc(pTable->schema, pTable->schemaSize + total_size); - pTable->pTagData = pTable->schema; - memset(pTable->schema + pTable->schemaSize, 0, total_size); - pTable->schemaSize += total_size; - // TODO: set the data as default value - } else if (msg->type == SDB_TYPE_DELETE) { // Delete values in MTABLEs - SchemaUnit *schemaUnit = (SchemaUnit *)(msg->data); - int32_t pos = schemaUnit->pos; - int32_t bytes = schemaUnit->schema.bytes; - assert(pos + bytes <= pTable->schemaSize); - - if (pos + bytes != pTable->schemaSize) { - memmove(pTable->schema + pos, pTable->schema + pos + bytes, pTable->schemaSize - (pos + bytes)); - } - - pTable->schemaSize -= bytes; - pTable->schema = realloc(pTable->schema, pTable->schemaSize); - } - - return pTable->next; - } - - return NULL; -} - -void *mgmtMeterActionAfterBatchUpdate(void *row, char *str, int size, int *ssize) { - STabObj *pMetric = (STabObj *)row; - - pthread_rwlock_unlock(&(pMetric->rwLock)); - - return NULL; -} - -void *mgmtMeterAction(char action, void *row, char *str, int size, int *ssize) { - if (mgmtMeterActionFp[(uint8_t)action] != NULL) { - return (*(mgmtMeterActionFp[(uint8_t)action]))(row, str, size, ssize); - } - return NULL; -} - -void mgmtAddMeterStatisticToAcct(STabObj *pTable, SAcctObj *pAcct) { - pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); -} - int mgmtInitMeters() { void * pNode = NULL; void * pLastNode = NULL; @@ -663,44 +345,6 @@ int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) { return TSDB_CODE_OPS_NOT_SUPPORT; } -int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable) { - if (pTable == NULL || pMetric == NULL) return -1; - - pthread_rwlock_wrlock(&(pMetric->rwLock)); - // add meter into skip list - pTable->next = pMetric->pHead; - pTable->prev = NULL; - - if (pMetric->pHead) pMetric->pHead->prev = pTable; - - pMetric->pHead = pTable; - pMetric->numOfMeters++; - - addMeterIntoMetricIndex(pMetric, pTable); - - pthread_rwlock_unlock(&(pMetric->rwLock)); - - return 0; -} - -int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable) { - pthread_rwlock_wrlock(&(pMetric->rwLock)); - - if (pTable->prev) pTable->prev->next = pTable->next; - - if (pTable->next) pTable->next->prev = pTable->prev; - - if (pTable->prev == NULL) pMetric->pHead = pTable->next; - - pMetric->numOfMeters--; - - removeMeterFromMetricIndex(pMetric, pTable); - - pthread_rwlock_unlock(&(pMetric->rwLock)); - - return 0; -} - void mgmtCleanUpMeters() { mgmtCleanUpNormalTables(); mgmtCleanUpStreamTables(); -- GitLab