提交 b7050af1 编写于 作者: S slguan

reorgnize mgmtVgroup.c

上级 79341c73
...@@ -133,9 +133,18 @@ typedef struct _tab_obj { ...@@ -133,9 +133,18 @@ typedef struct _tab_obj {
// SSchema schema[]; // SSchema schema[];
} STabObj; } STabObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
} STableInfo;
typedef struct SSuperTableObj { typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
uint64_t uid; uint64_t uid;
int32_t sid; int32_t sid;
int32_t vgId; int32_t vgId;
...@@ -156,11 +165,12 @@ typedef struct SSuperTableObj { ...@@ -156,11 +165,12 @@ typedef struct SSuperTableObj {
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1]; int8_t type;
uint64_t uid; uint64_t uid;
int32_t sid; int32_t sid;
int32_t vgId; int32_t vgId;
int64_t createdTime; int64_t createdTime;
char superTableId[TSDB_TABLE_ID_LEN + 1];
int8_t reserved[7]; int8_t reserved[7];
int8_t updateEnd[1]; int8_t updateEnd[1];
SSuperTableObj *superTable; SSuperTableObj *superTable;
...@@ -168,6 +178,7 @@ typedef struct { ...@@ -168,6 +178,7 @@ typedef struct {
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
uint64_t uid; uint64_t uid;
int32_t sid; int32_t sid;
int32_t vgId; int32_t vgId;
...@@ -183,6 +194,7 @@ typedef struct { ...@@ -183,6 +194,7 @@ typedef struct {
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
uint64_t uid; uint64_t uid;
int32_t sid; int32_t sid;
int32_t vgId; int32_t vgId;
...@@ -198,7 +210,6 @@ typedef struct { ...@@ -198,7 +210,6 @@ typedef struct {
char* schema; char* schema;
} SStreamTableObj; } SStreamTableObj;
typedef struct _vg_obj { typedef struct _vg_obj {
uint32_t vgId; uint32_t vgId;
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
...@@ -215,7 +226,7 @@ typedef struct _vg_obj { ...@@ -215,7 +226,7 @@ typedef struct _vg_obj {
int8_t updateEnd[1]; int8_t updateEnd[1];
struct _vg_obj *prev, *next; struct _vg_obj *prev, *next;
void * idPool; void * idPool;
void ** meterList; STableInfo ** tableList;
} SVgObj; } SVgObj;
typedef struct _db_obj { typedef struct _db_obj {
......
...@@ -32,8 +32,8 @@ typedef struct { ...@@ -32,8 +32,8 @@ typedef struct {
} STableObj; } STableObj;
int mgmtInitMeters(); int mgmtInitMeters();
STableObj mgmtGetTable(char *tableId); STableInfo* mgmtGetTable(char *tableId);
STableObj mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
STabObj *mgmtGetTableInfo(char *src, char *tags[]); STabObj *mgmtGetTableInfo(char *src, char *tags[]);
int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pInfo); int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
......
...@@ -111,7 +111,7 @@ void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize) { ...@@ -111,7 +111,7 @@ void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize) {
pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
pVgroup->numOfMeters++; pVgroup->numOfMeters++;
pDb->numOfTables++; pDb->numOfTables++;
pVgroup->meterList[pTable->sid] = pTable; pVgroup->tableList[pTable->sid] = pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup); mgmtMoveVgroupToTail(pDb, pVgroup);
...@@ -145,7 +145,7 @@ void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize) { ...@@ -145,7 +145,7 @@ void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize) {
} }
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
pVgroup->meterList[pTable->sid] = NULL; pVgroup->tableList[pTable->sid] = NULL;
pVgroup->numOfMeters--; pVgroup->numOfMeters--;
pDb->numOfTables--; pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid); taosFreeId(pVgroup->idPool, pTable->sid);
......
...@@ -87,19 +87,19 @@ int32_t mgmtProcessMeterCfgMsg(int8_t *pCont, int32_t contLen, void *pConn) { ...@@ -87,19 +87,19 @@ int32_t mgmtProcessMeterCfgMsg(int8_t *pCont, int32_t contLen, void *pConn) {
int32_t vnode = htonl(cfg->vnode); int32_t vnode = htonl(cfg->vnode);
int32_t sid = htonl(cfg->sid); int32_t sid = htonl(cfg->sid);
STableObj table = mgmtGetTableByPos(0, vnode, sid); STableInfo *pTable = mgmtGetTableByPos(0, vnode, sid);
if (table.obj == NULL) { if (pTable == NULL) {
mgmtSendSimpleRspToDnode(pConn, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_INVALID_TABLE); mgmtSendSimpleRspToDnode(pConn, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_INVALID_TABLE);
return TSDB_CODE_INVALID_TABLE_ID; return TSDB_CODE_INVALID_TABLE_ID;
} }
int8_t *pCreateTableMsg = NULL; int8_t *pCreateTableMsg = NULL;
if (table.type == TSDB_TABLE_TYPE_NORMAL_TABLE) { if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)table.obj, vnode); pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable, vnode);
} else if (table.type == TSDB_TABLE_TYPE_CHILD_TABLE) { } else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) {
pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)table.obj, vnode); pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable, vnode);
} else if (table.type == TSDB_TABLE_TYPE_STREAM_TABLE) { } else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) {
pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)table.obj, vnode); pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable, vnode);
} else {} } else {}
if (pCreateTableMsg != NULL) { if (pCreateTableMsg != NULL) {
......
...@@ -110,7 +110,7 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) { ...@@ -110,7 +110,7 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) {
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
pVgroup->numOfMeters++; pVgroup->numOfMeters++;
pDb->numOfTables++; pDb->numOfTables++;
pVgroup->meterList[pTable->sid] = pTable; pVgroup->tableList[pTable->sid] = pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup); mgmtMoveVgroupToTail(pDb, pVgroup);
...@@ -144,7 +144,7 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize) { ...@@ -144,7 +144,7 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize) {
} }
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
pVgroup->meterList[pTable->sid] = NULL; pVgroup->tableList[pTable->sid] = NULL;
pVgroup->numOfMeters--; pVgroup->numOfMeters--;
pDb->numOfTables--; pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid); taosFreeId(pVgroup->idPool, pTable->sid);
......
...@@ -167,7 +167,7 @@ bool mgmtCheckMeterMetaMsgType(char *pMsg) { ...@@ -167,7 +167,7 @@ bool mgmtCheckMeterMetaMsgType(char *pMsg) {
SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
int16_t autoCreate = htons(pInfo->createFlag); int16_t autoCreate = htons(pInfo->createFlag);
STableObj table = mgmtGetTable(pInfo->meterId); STableInfo *table = mgmtGetTable(pInfo->meterId);
// If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue // If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue
// bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1); // bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1);
......
...@@ -115,7 +115,7 @@ void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) { ...@@ -115,7 +115,7 @@ void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) {
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
pVgroup->numOfMeters++; pVgroup->numOfMeters++;
pDb->numOfTables++; pDb->numOfTables++;
pVgroup->meterList[pTable->sid] = pTable; pVgroup->tableList[pTable->sid] = pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup); mgmtMoveVgroupToTail(pDb, pVgroup);
...@@ -149,7 +149,7 @@ void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize) { ...@@ -149,7 +149,7 @@ void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize) {
} }
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
pVgroup->meterList[pTable->sid] = NULL; pVgroup->tableList[pTable->sid] = NULL;
pVgroup->numOfMeters--; pVgroup->numOfMeters--;
pDb->numOfTables--; pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid); taosFreeId(pVgroup->idPool, pTable->sid);
......
...@@ -95,44 +95,37 @@ int mgmtInitMeters() { ...@@ -95,44 +95,37 @@ int mgmtInitMeters() {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableObj mgmtGetTable(char *tableId) { STableInfo* mgmtGetTable(char *tableId) {
STableObj table = {.type = TSDB_TABLE_TYPE_MAX, .obj = NULL}; STableInfo *tableInfo = (STableInfo *) mgmtGetSuperTable(tableId);
if (tableInfo != NULL) {
table.obj = mgmtGetSuperTable(tableId); return tableInfo;
if (table.obj != NULL) {
table.type = TSDB_TABLE_TYPE_SUPER_TABLE;
return table;
} }
table.obj = mgmtGetNormalTable(tableId); tableInfo = (STableInfo *) mgmtGetNormalTable(tableId);
if (table.obj != NULL) { if (tableInfo != NULL) {
table.type = TSDB_TABLE_TYPE_NORMAL_TABLE; return tableInfo;
return table;
} }
table.obj = mgmtGetStreamTable(tableId); tableInfo = (STableInfo *) mgmtGetStreamTable(tableId);
if (table.obj != NULL) { if (tableInfo != NULL) {
table.type = TSDB_TABLE_TYPE_STREAM_TABLE; return tableInfo;
return table;
} }
table.obj = mgmtGetNormalTable(tableId); tableInfo = (STableInfo *) mgmtGetNormalTable(tableId);
if (table.obj != NULL) { if (tableInfo != NULL) {
table.type = TSDB_TABLE_TYPE_CHILD_TABLE; return tableInfo;
return table;
} }
return table; return NULL;
} }
STableObj mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) { STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) {
STableObj table = {0}; return NULL;
return table;
} }
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
STableObj table = mgmtGetTable(pCreate->meterId); STableInfo *table = mgmtGetTable(pCreate->meterId);
if (table.obj != NULL) { if (table != NULL) {
if (pCreate->igExists) { if (pCreate->igExists) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
...@@ -183,8 +176,8 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { ...@@ -183,8 +176,8 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
} }
int mgmtDropTable(SDbObj *pDb, char *tableId, int ignore) { int mgmtDropTable(SDbObj *pDb, char *tableId, int ignore) {
STableObj table = mgmtGetTable(tableId); STableInfo *table = mgmtGetTable(tableId);
if (table.obj == NULL) { if (table == NULL) {
if (ignore) { if (ignore) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
...@@ -197,23 +190,23 @@ int mgmtDropTable(SDbObj *pDb, char *tableId, int ignore) { ...@@ -197,23 +190,23 @@ int mgmtDropTable(SDbObj *pDb, char *tableId, int ignore) {
return TSDB_CODE_MONITOR_DB_FORBIDDEN; return TSDB_CODE_MONITOR_DB_FORBIDDEN;
} }
switch (table.type) { switch (table->type) {
case TSDB_TABLE_TYPE_SUPER_TABLE: case TSDB_TABLE_TYPE_SUPER_TABLE:
return mgmtDropSuperTable(pDb, table.obj); return mgmtDropSuperTable(pDb, table);
case TSDB_TABLE_TYPE_CHILD_TABLE: case TSDB_TABLE_TYPE_CHILD_TABLE:
return mgmtDropChildTable(pDb, table.obj); return mgmtDropChildTable(pDb, table);
case TSDB_TABLE_TYPE_STREAM_TABLE: case TSDB_TABLE_TYPE_STREAM_TABLE:
return mgmtDropStreamTable(pDb, table.obj); return mgmtDropStreamTable(pDb, table);
case TSDB_TABLE_TYPE_NORMAL_TABLE: case TSDB_TABLE_TYPE_NORMAL_TABLE:
return mgmtDropNormalTable(pDb, table.obj); return mgmtDropNormalTable(pDb, table);
default: default:
return TSDB_CODE_INVALID_TABLE; return TSDB_CODE_INVALID_TABLE;
} }
} }
int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) { int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) {
STableObj table = mgmtGetTable(pAlter->meterId); STableInfo *table = mgmtGetTable(pAlter->meterId);
if (table.obj == NULL) { if (table == NULL) {
return TSDB_CODE_INVALID_TABLE; return TSDB_CODE_INVALID_TABLE;
} }
...@@ -232,32 +225,32 @@ int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) { ...@@ -232,32 +225,32 @@ int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) {
// todo add // todo add
/* mgmtMeterAddTags */ /* mgmtMeterAddTags */
if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) {
if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { if (table->type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtAddSuperTableTag(table.obj, pAlter->schema, 1); return mgmtAddSuperTableTag(table, pAlter->schema, 1);
} }
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) {
if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { if (table->type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtDropSuperTableTag(table.obj, pAlter->schema[0].name); return mgmtDropSuperTableTag(table, pAlter->schema[0].name);
} }
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { if (table->type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtModifySuperTableTagNameByName(table.obj, pAlter->schema[0].name, pAlter->schema[1].name); return mgmtModifySuperTableTagNameByName(table, pAlter->schema[0].name, pAlter->schema[1].name);
} }
} else if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { } else if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
if (table.type == TSDB_TABLE_TYPE_CHILD_TABLE) { if (table->type == TSDB_TABLE_TYPE_CHILD_TABLE) {
return mgmtModifyChildTableTagValueByName(table.obj, pAlter->schema[0].name, pAlter->tagVal); return mgmtModifyChildTableTagValueByName(table, pAlter->schema[0].name, pAlter->tagVal);
} }
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
if (table.type == TSDB_TABLE_TYPE_NORMAL_TABLE) { if (table->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
return mgmtAddNormalTableColumn(table.obj, pAlter->schema, 1); return mgmtAddNormalTableColumn(table, pAlter->schema, 1);
} else if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { } else if (table->type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtAddSuperTableColumn(table.obj, pAlter->schema, 1); return mgmtAddSuperTableColumn(table, pAlter->schema, 1);
} else {} } else {}
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
if (table.type == TSDB_TABLE_TYPE_NORMAL_TABLE) { if (table->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
return mgmtDropNormalTableColumnByName(table.obj, pAlter->schema[0].name); return mgmtDropNormalTableColumnByName(table, pAlter->schema[0].name);
} else if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { } else if (table->type == TSDB_TABLE_TYPE_SUPER_TABLE) {
return mgmtDropSuperTableColumnByName(table.obj, pAlter->schema[0].name); return mgmtDropSuperTableColumnByName(table, pAlter->schema[0].name);
} else {} } else {}
} else {} } else {}
......
...@@ -27,10 +27,8 @@ ...@@ -27,10 +27,8 @@
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
void * tsVgroupSdb = NULL; void *tsVgroupSdb = NULL;
int32_t tsVgUpdateSize; int32_t tsVgUpdateSize;
extern void *tsDbSdb;
extern void *tsUserSdb;
void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize); void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
...@@ -38,9 +36,6 @@ void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize) ...@@ -38,9 +36,6 @@ void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize)
void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize); void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize); void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize); void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
...@@ -85,19 +80,19 @@ int32_t mgmtInitVgroups() { ...@@ -85,19 +80,19 @@ int32_t mgmtInitVgroups() {
pVgroup->prev = NULL; pVgroup->prev = NULL;
pVgroup->next = NULL; pVgroup->next = NULL;
int32_t size = sizeof(STabObj *) * pDb->cfg.maxSessions; int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions;
pVgroup->meterList = (STabObj **)malloc(size); pVgroup->tableList = (STableInfo **)malloc(size);
if (pVgroup->meterList == NULL) { if (pVgroup->tableList == NULL) {
mError("failed to malloc(size:%d) for the meterList of vgroups", size); mError("failed to malloc(size:%d) for the tableList of vgroups", size);
return -1; return -1;
} }
memset(pVgroup->meterList, 0, size); memset(pVgroup->tableList, 0, size);
pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
if (pVgroup->idPool == NULL) { if (pVgroup->idPool == NULL) {
mError("failed to taosInitIdPool for vgroups"); mError("failed to taosInitIdPool for vgroups");
free(pVgroup->meterList); free(pVgroup->tableList);
return -1; return -1;
} }
...@@ -182,7 +177,7 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) { ...@@ -182,7 +177,7 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) {
SVgObj *mgmtCreateVgroup(SDbObj *pDb) { SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
SVgObj *pVgroup; SVgObj *pVgroup;
int32_t size; int32_t size;
size = sizeof(SVgObj); size = sizeof(SVgObj);
pVgroup = (SVgObj *)malloc(size); pVgroup = (SVgObj *)malloc(size);
...@@ -212,13 +207,13 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { ...@@ -212,13 +207,13 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
} }
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
STabObj *pTable; STableInfo *pTable;
if (pVgroup->numOfMeters > 0) { if (pVgroup->numOfMeters > 0) {
for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) {
if (pVgroup->meterList != NULL) { if (pVgroup->tableList != NULL) {
pTable = pVgroup->meterList[i]; pTable = pVgroup->tableList[i];
if (pTable) mgmtDropTable(pDb, pTable->meterId, 0); if (pTable) mgmtDropTable(pDb, pTable->tableId, 0);
} }
} }
} }
...@@ -281,18 +276,18 @@ int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { ...@@ -281,18 +276,18 @@ int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
int32_t maxReplica = 0; int32_t maxReplica = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
STabObj *pTable = NULL; STableInfo *pTable = NULL;
if (pShow->payloadLen > 0 ) { if (pShow->payloadLen > 0 ) {
// pTable = mgmtGetTable(pShow->payload); pTable = mgmtGetTable(pShow->payload);
// if (NULL == pTable) { if (NULL == pTable) {
// return TSDB_CODE_INVALID_TABLE_ID; return TSDB_CODE_INVALID_TABLE_ID;
// } }
//
// pVgroup = mgmtGetVgroup(pTable->gid.vgId); pVgroup = mgmtGetVgroup(pTable->vgId);
// if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID; if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
//
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
} else { } else {
SVgObj *pVgroup = pDb->pHead; SVgObj *pVgroup = pDb->pHead;
...@@ -348,10 +343,10 @@ int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { ...@@ -348,10 +343,10 @@ int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
} }
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) { int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
char * pWrite; char * pWrite;
int32_t cols = 0; int32_t cols = 0;
char ipstr[20]; char ipstr[20];
int32_t maxReplica = 0; int32_t maxReplica = 0;
...@@ -424,9 +419,9 @@ void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize) ...@@ -424,9 +419,9 @@ void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize)
if (pDb == NULL) return NULL; if (pDb == NULL) return NULL;
int32_t tsize = sizeof(STabObj *) * pDb->cfg.maxSessions; int32_t tsize = sizeof(STableInfo *) * pDb->cfg.maxSessions;
pVgroup->meterList = (STabObj **)malloc(tsize); pVgroup->tableList = (STableInfo **)malloc(tsize);
memset(pVgroup->meterList, 0, tsize); memset(pVgroup->tableList, 0, tsize);
pVgroup->numOfMeters = 0; pVgroup->numOfMeters = 0;
pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
mgmtAddVgroupIntoDb(pDb, pVgroup); mgmtAddVgroupIntoDb(pDb, pVgroup);
...@@ -441,7 +436,7 @@ void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize) ...@@ -441,7 +436,7 @@ void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize)
if (pDb != NULL) mgmtRemoveVgroupFromDb(pDb, pVgroup); if (pDb != NULL) mgmtRemoveVgroupFromDb(pDb, pVgroup);
mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes); mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes);
tfree(pVgroup->meterList); tfree(pVgroup->tableList);
return NULL; return NULL;
} }
...@@ -456,8 +451,8 @@ void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) ...@@ -456,8 +451,8 @@ void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize)
if (pDb->cfg.maxSessions != oldTables) { if (pDb->cfg.maxSessions != oldTables) {
mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions); mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions);
taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions); taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions);
int32_t size = sizeof(STabObj *) * pDb->cfg.maxSessions; int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions;
pVgroup->meterList = (STabObj **)realloc(pVgroup->meterList, size); pVgroup->tableList = (STableInfo **)realloc(pVgroup->tableList, size);
} }
} }
...@@ -465,9 +460,10 @@ void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) ...@@ -465,9 +460,10 @@ void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize)
return NULL; return NULL;
} }
void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)row; SVgObj *pVgroup = (SVgObj *)row;
int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup; int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
if (size < tsize) { if (size < tsize) {
*ssize = -1; *ssize = -1;
} else { } else {
...@@ -477,6 +473,7 @@ void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize) ...@@ -477,6 +473,7 @@ void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize)
return NULL; return NULL;
} }
void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)malloc(sizeof(SVgObj)); SVgObj *pVgroup = (SVgObj *)malloc(sizeof(SVgObj));
if (pVgroup == NULL) return NULL; if (pVgroup == NULL) return NULL;
...@@ -487,24 +484,23 @@ void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize) ...@@ -487,24 +484,23 @@ void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize)
return (void *)pVgroup; return (void *)pVgroup;
} }
void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; }
void *mgmtVgroupActionBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; }
void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; }
void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize) { void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)row; SVgObj *pVgroup = (SVgObj *)row;
int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup; int32_t tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
memcpy(pVgroup, str, tsize); memcpy(pVgroup, str, tsize);
return NULL; return NULL;
} }
void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
SVgObj *pVgroup = (SVgObj *)row; SVgObj *pVgroup = (SVgObj *)row;
if (pVgroup->idPool) { if (pVgroup->idPool) {
taosIdPoolCleanUp(pVgroup->idPool); taosIdPoolCleanUp(pVgroup->idPool);
pVgroup->idPool = NULL; pVgroup->idPool = NULL;
} }
if (pVgroup->meterList) tfree(pVgroup->meterList); if (pVgroup->tableList) tfree(pVgroup->tableList);
tfree(row); tfree(row);
return NULL; return NULL;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册