提交 00e18b70 编写于 作者: S slguan

[TD-15] refact code of tidpool

上级 4271952d
......@@ -159,6 +159,8 @@ typedef struct {
SSchema* schema;
} SNormalTableObj;
struct _db_obj;
typedef struct _vg_obj {
uint32_t vgId;
char dbName[TSDB_DB_NAME_LEN + 1];
......@@ -172,6 +174,7 @@ typedef struct _vg_obj {
int8_t reserved[14];
int8_t updateEnd[1];
struct _vg_obj *prev, *next;
struct _db_obj *pDb;
void * idPool;
STableInfo ** tableList;
} SVgObj;
......
......@@ -22,11 +22,11 @@ extern "C" {
#include "mnode.h"
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup);
void mgmtAddVgroupIntoDb(SVgObj *pVgroup);
void mgmtAddVgroupIntoDbTail(SVgObj *pVgroup);
void mgmtRemoveVgroupFromDb(SVgObj *pVgroup);
void mgmtMoveVgroupToTail(SVgObj *pVgroup);
void mgmtMoveVgroupToHead(SVgObj *pVgroup);
int32_t mgmtInitDbs();
void mgmtCleanUpDbs();
......
......@@ -28,6 +28,8 @@ bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle);
void mgmtFreeQhandle(void *qhandle);
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -28,13 +28,12 @@ int32_t mgmtInitVgroups();
void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
void mgmtDropAllVgroups(SDbObj *pDropDb);
void mgmtCreateVgroup(SQueuedMsg *pMsg);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtUpdateVgroupIp(SDnodeObj *pDnode);
void mgmtSetVgroupIdPool();
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle);
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable);
......
......@@ -85,7 +85,9 @@ int32_t mgmtRemoveUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) {
pUser->prev->next = pUser->next;
}
if (pUser->next) pUser->next->prev = pUser->prev;
if (pUser->next) {
pUser->next->prev = pUser->prev;
}
if (pUser->prev == NULL) {
pAcct->pUser = pUser->next;
......
......@@ -333,7 +333,8 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) {
return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb));
}
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) {
void mgmtAddVgroupIntoDb(SVgObj *pVgroup) {
SDbObj *pDb = pVgroup->pDb;
pVgroup->next = pDb->pHead;
pVgroup->prev = NULL;
......@@ -342,11 +343,10 @@ int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) {
pDb->pHead = pVgroup;
pDb->numOfVgroups++;
return 0;
}
int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) {
void mgmtAddVgroupIntoDbTail(SVgObj *pVgroup) {
SDbObj *pDb = pVgroup->pDb;
pVgroup->next = NULL;
pVgroup->prev = pDb->pTail;
......@@ -355,32 +355,25 @@ int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup) {
pDb->pTail = pVgroup;
pDb->numOfVgroups++;
return 0;
}
int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup) {
void mgmtRemoveVgroupFromDb(SVgObj *pVgroup) {
SDbObj *pDb = pVgroup->pDb;
if (pVgroup->prev) pVgroup->prev->next = pVgroup->next;
if (pVgroup->next) pVgroup->next->prev = pVgroup->prev;
if (pVgroup->prev == NULL) pDb->pHead = pVgroup->next;
if (pVgroup->next == NULL) pDb->pTail = pVgroup->prev;
pDb->numOfVgroups--;
return 0;
}
int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup) {
mgmtRemoveVgroupFromDb(pDb, pVgroup);
mgmtAddVgroupIntoDbTail(pDb, pVgroup);
return 0;
void mgmtMoveVgroupToTail(SVgObj *pVgroup) {
mgmtRemoveVgroupFromDb(pVgroup);
mgmtAddVgroupIntoDbTail(pVgroup);
}
int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup) {
mgmtRemoveVgroupFromDb(pDb, pVgroup);
mgmtAddVgroupIntoDb(pDb, pVgroup);
return 0;
void mgmtMoveVgroupToHead(SVgObj *pVgroup) {
mgmtRemoveVgroupFromDb(pVgroup);
mgmtAddVgroupIntoDb(pVgroup);
}
void mgmtCleanUpDbs() {
......@@ -600,7 +593,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
if (strcmp(pUser->user, "root") == 0) {
#endif
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxSessions - 1; // table num can be created should minus 1
*(int32_t *)pWrite = pDb->cfg.maxSessions; // table num can be created should minus 1
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -613,9 +606,9 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
#ifdef _TD_ARM_32_
*(int32_t *)pWrite = (pDb->cfg.cacheNumOfBlocks.totalBlocks * 1.0 / (pDb->cfg.maxSessions - 1));
*(int32_t *)pWrite = (pDb->cfg.cacheNumOfBlocks.totalBlocks * 1.0 / (pDb->cfg.maxSessions));
#else
*(float *)pWrite = (pDb->cfg.cacheNumOfBlocks.totalBlocks * 1.0 / (pDb->cfg.maxSessions - 1));
*(float *)pWrite = (pDb->cfg.cacheNumOfBlocks.totalBlocks * 1.0 / (pDb->cfg.maxSessions));
#endif
cols++;
......@@ -721,7 +714,7 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
SDbCfg newCfg = pDb->cfg;
int32_t daysToKeep = htonl(pAlter->daysToKeep);
int32_t maxSessions = htonl(pAlter->maxSessions) + 1;
int32_t maxSessions = htonl(pAlter->maxSessions);
int8_t replications = pAlter->replications;
terrno = TSDB_CODE_SUCCESS;
......
......@@ -566,11 +566,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mgmtSetDnodeMaxVnodes(pDnode);
}
if (lastPrivateIp != pDnode->privateIp || lastPublicIp != pDnode->publicIp) {
mgmtUpdateVgroupIp(pDnode);
//mgmtUpdateMnodeIp();
}
int32_t openVnodes = htons(pStatus->openVnodes);
for (int32_t j = 0; j < openVnodes; ++j) {
pDnode->vload[j].vgId = htonl(pStatus->load[j].vgId);
......
......@@ -762,3 +762,13 @@ int32_t mgmtInitProfile() {
void mgmtCleanUpProfile() {
}
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
if (pMsg != NULL) {
if (pMsg->pCont != NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
free(pMsg);
}
}
\ No newline at end of file
......@@ -23,6 +23,7 @@
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
......@@ -36,49 +37,76 @@ static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg);
static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) ;
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
static int32_t mgmtVgroupActionDestroy(void *pObj) {
SVgObj *pVgroup = (SVgObj *) pObj;
static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) {
SVgObj *pVgroup = pOper->pObj;
if (pVgroup->idPool) {
taosIdPoolCleanUp(pVgroup->idPool);
pVgroup->idPool = NULL;
}
if (pVgroup->tableList) tfree(pVgroup->tableList);
tfree(pObj);
if (pVgroup->tableList) {
tfree(pVgroup->tableList);
}
tfree(pOper->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtVgroupActionInsert(void *pObj) {
SVgObj *pVgroup = pObj;
static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
SVgObj *pVgroup = pOper->pObj;
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
return TSDB_CODE_INVALID_DB;
}
pVgroup->pDb = pDb;
pVgroup->prev = NULL;
pVgroup->next = NULL;
int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions;
pVgroup->tableList = (STableInfo **)calloc(pDb->cfg.maxSessions, sizeof(STableInfo *));
if (pVgroup->tableList == NULL) {
mError("vgroup:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size);
return -1;
}
pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
if (pVgroup->idPool == NULL) {
mError("vgroup:%d, failed to taosInitIdPool for vgroups", pVgroup->vgId);
tfree(pVgroup->tableList);
return -1;
}
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
pVgroup->vnodeGid[i].vnode = pVgroup->vgId;
}
mgmtAddVgroupIntoDb(pVgroup);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtVgroupActionDelete(void *pObj) {
SVgObj *pVgroup = pObj;
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) {
SVgObj *pVgroup = pOper->pObj;
if (pDb != NULL) {
mgmtRemoveVgroupFromDb(pDb, pVgroup);
if (pVgroup->pDb != NULL) {
mgmtRemoveVgroupFromDb(pVgroup);
}
// mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes);
tfree(pVgroup->tableList);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtVgroupActionUpdate(void *pObj) {
SVgObj *pVgroup = (SVgObj *) pObj;
static int32_t mgmtVgroupActionUpdate(SSdbOperDesc *pOper) {
SVgObj *pVgroup = pOper->pObj;
int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool);
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
SDbObj *pDb = pVgroup->pDb;
if (pDb != NULL) {
if (pDb->cfg.maxSessions != oldTables) {
mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions);
......@@ -88,33 +116,31 @@ static int32_t mgmtVgroupActionUpdate(void *pObj) {
}
}
mTrace("vgroup:%d update, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes);
mTrace("vgroup:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxSessions, pVgroup->numOfVnodes);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtVgroupActionEncode(void *pObj, void *pData, int32_t maxRowSize) {
SVgObj *pVgroup = (SVgObj *) pObj;
if (maxRowSize < tsVgUpdateSize) {
static int32_t mgmtVgroupActionEncode(SSdbOperDesc *pOper) {
SVgObj *pVgroup = pOper->pObj;
if (pOper->maxRowSize < tsVgUpdateSize) {
return -1;
} else {
memcpy(pData, pVgroup, tsVgUpdateSize);
return tsVgUpdateSize;
memcpy(pOper->rowData, pVgroup, tsVgUpdateSize);
pOper->rowSize = tsVgUpdateSize;
return TSDB_CODE_SUCCESS;
}
}
static void *mgmtVgroupActionDecode(void *pObj) {
SVgObj *pVgroup = (SVgObj *) malloc(sizeof(SVgObj));
if (pVgroup == NULL) return NULL;
memset(pVgroup, 0, sizeof(SVgObj));
memcpy(pVgroup, pObj, tsVgUpdateSize);
static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) {
SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj));
if (pVgroup == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
return pVgroup;
memcpy(pVgroup, pOper->rowData, tsVgUpdateSize);
pOper->pObj = pVgroup;
return TSDB_CODE_SUCCESS;
}
int32_t mgmtInitVgroups() {
void *pNode = NULL;
SVgObj *pVgroup = NULL;
SVgObj tObj;
tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
......@@ -122,7 +148,7 @@ int32_t mgmtInitVgroups() {
.tableName = "vgroups",
.hashSessions = TSDB_MAX_VGROUPS,
.maxRowSize = tsVgUpdateSize,
.keyType = SDB_KEYTYPE_AUTO,
.keyType = SDB_KEY_TYPE_AUTO,
.insertFp = mgmtVgroupActionInsert,
.deleteFp = mgmtVgroupActionDelete,
.updateFp = mgmtVgroupActionUpdate,
......@@ -137,47 +163,11 @@ int32_t mgmtInitVgroups() {
return -1;
}
while (1) {
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
if (pVgroup == NULL) break;
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) continue;
pVgroup->prev = NULL;
pVgroup->next = NULL;
int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions;
pVgroup->tableList = (STableInfo **)malloc(size);
if (pVgroup->tableList == NULL) {
mError("failed to malloc(size:%d) for the tableList of vgroups", size);
return -1;
}
memset(pVgroup->tableList, 0, size);
pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
if (pVgroup->idPool == NULL) {
mError("failed to taosInitIdPool for vgroups");
free(pVgroup->tableList);
return -1;
}
taosIdPoolReinit(pVgroup->idPool);
if (tsIsCluster && pVgroup->vnodeGid[0].publicIp == 0) {
pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp);
pVgroup->vnodeGid[0].privateIp = inet_addr(tsPrivateIp);
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_GLOBAL);
}
// mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
}
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg);
mTrace("vgroup is initialized");
return 0;
......@@ -196,32 +186,41 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg) {
if (pDb == NULL) {
mError("failed to create vgroup, db not found");
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
mgmtFreeQueuedMsg(pMsg);
return;
}
SVgObj *pVgroup = (SVgObj *)calloc(sizeof(SVgObj), 1);
SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
strcpy(pVgroup->dbName, pDb->name);
pVgroup->numOfVnodes = pDb->cfg.replications;
pVgroup->createdTime = taosGetTimestampMs();
if (mgmtAllocVnodes(pVgroup) != 0) {
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes);
free(pVgroup);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES);
mgmtFreeQueuedMsg(pMsg);
return;
}
pVgroup->createdTime = taosGetTimestampMs();
pVgroup->tableList = (STableInfo **) calloc(sizeof(STableInfo *), pDb->cfg.maxSessions);
pVgroup->numOfTables = 0;
pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
mgmtAddVgroupIntoDb(pDb, pVgroup);
// mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
.rowSize = sizeof(SVgObj)
};
sdbInsertRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL);
int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
tfree(pVgroup);
code = TSDB_CODE_SDB_ERROR;
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SDB_ERROR);
mgmtFreeQueuedMsg(pMsg);
return;
}
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mPrint("vgroup:%d, dnode:%d vnode:%d", pVgroup->vgId, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode);
mPrint("vgroup:%d, index:%d, dnode:%d vnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode);
}
pMsg->ahandle = pVgroup;
......@@ -235,28 +234,12 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
} else {
mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
mgmtSendDropVgroupMsg(pVgroup, NULL);
sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL);
}
}
void mgmtSetVgroupIdPool() {
void * pNode = NULL;
SVgObj *pVgroup = NULL;
SDbObj *pDb;
while (1) {
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
if (pVgroup == NULL || pVgroup->idPool == 0) break;
taosIdPoolSetFreeList(pVgroup->idPool);
pVgroup->numOfTables = taosIdPoolNumOfUsed(pVgroup->idPool);
pDb = mgmtGetDb(pVgroup->dbName);
pDb->numOfTables += pVgroup->numOfTables;
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1)
mgmtAddVgroupIntoDbTail(pDb, pVgroup);
else
mgmtAddVgroupIntoDb(pDb, pVgroup);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup
};
sdbDeleteRow(&oper);
}
}
......@@ -287,7 +270,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pShow->bytes[cols] = 9;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vgroup_status");
strcpy(pSchema[cols].name, "vgroup status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -327,13 +310,13 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pShow->bytes[cols] = 9;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vnode_status");
strcpy(pSchema[cols].name, "vnode status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public_ip");
strcpy(pSchema[cols].name, "public ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
}
......@@ -445,25 +428,36 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
return numOfRows;
}
void mgmtUpdateVgroup(SVgObj *pVgroup) {
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_LOCAL);
}
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) {
pVgroup->numOfTables++;
if (pTable->sid >= 0)
if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] == NULL) {
pVgroup->tableList[pTable->sid] = pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
pVgroup->numOfTables++;
pVgroup->pDb->numOfTables++;
}
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions)
mgmtAddVgroupIntoDbTail(pVgroup);
else
mgmtAddVgroupIntoDb(pVgroup);
}
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) {
pVgroup->numOfTables--;
if (pTable->sid >= 0)
if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] != NULL) {
pVgroup->tableList[pTable->sid] = NULL;
taosFreeId(pVgroup->idPool, pTable->sid);
pVgroup->numOfTables--;
pVgroup->pDb->numOfTables--;
}
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxSessions)
mgmtAddVgroupIntoDbTail(pVgroup);
else
mgmtAddVgroupIntoDb(pVgroup);
}
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
SDbObj *pDb = pVgroup->pDb;
if (pDb == NULL) return NULL;
SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg));
......@@ -487,6 +481,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
SVnodeDesc *vpeerDesc = pVnode->vpeerDesc;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode);
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp);
}
......@@ -576,13 +571,23 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
newMsg->contLen = queueMsg->contLen;
newMsg->pCont = rpcMallocCont(newMsg->contLen);
memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
queueMsg->pCont = NULL;
mgmtAddToShellQueue(newMsg);
} else {
sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup
};
int32_t code = sdbDeleteRow(&oper);
if (code != 0) {
code = TSDB_CODE_SDB_ERROR;
}
mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code);
}
free(queueMsg);
mgmtFreeQueuedMsg(queueMsg);
}
static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(int32_t vgId) {
......@@ -632,7 +637,15 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
if (queueMsg->received != queueMsg->expected) return;
sdbDeleteRow(tsVgroupSdb, pVgroup, SDB_OPER_GLOBAL);
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup
};
int32_t code = sdbDeleteRow(&oper);
if (code != 0) {
code = TSDB_CODE_SDB_ERROR;
}
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
newMsg->msgType = queueMsg->msgType;
......@@ -644,50 +657,64 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
mgmtAddToShellQueue(newMsg);
free(queueMsg);
queueMsg->pCont = NULL;
mgmtFreeQueuedMsg(queueMsg);
}
void mgmtUpdateVgroupIp(SDnodeObj *pDnode) {
void * pNode = NULL;
static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
if (mgmtCheckRedirect(rpcMsg->handle)) return;
SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) rpcMsg->pCont;
pCfg->dnode = htonl(pCfg->dnode);
pCfg->vnode = htonl(pCfg->vnode);
SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode);
if (pVgroup == NULL) {
mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
return;
}
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL);
}
void mgmtDropAllVgroups(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SVgObj *pVgroup = NULL;
while (1) {
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
if (pVgroup == NULL) break;
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *vnodeGid = pVgroup->vnodeGid + i;
if (vnodeGid->dnodeId == pDnode->dnodeId) {
mPrint("vgroup:%d, dnode:%d, privateIp:%s change to %s, publicIp:%s change to %s",
pVgroup->vgId, vnodeGid->dnodeId, pDnode->privateIp, taosIpStr(vnodeGid->privateIp),
pDnode->publicIp, taosIpStr(vnodeGid->publicIp));
vnodeGid->publicIp = pDnode->publicIp;
vnodeGid->privateIp = pDnode->privateIp;
sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, SDB_OPER_GLOBAL);
}
if (strncmp(pDropDb->name, pVgroup->dbName, dbNameLen) == 0) {
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_LOCAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfTables++;
continue;
}
}
mTrace("db:%s, all vgroups is dropped from sdb", pDropDb->name, numOfTables);
}
//static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) {
// if (!sdbMaster) {
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
// return;
// }
//
// SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pCont;
// pCfg->dnode = htonl(pCfg->dnode);
// pCfg->vnode = htonl(pCfg->vnode);
//
// SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode);
// if (pVgroup == NULL) {
// mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode);
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_NOT_ACTIVE_VNODE, NULL, 0);
// return;
// }
//
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
//
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
// mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL);
//}
//
\ No newline at end of file
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle) {
assert(ahandle != NULL);
if (pVgroup->numOfVnodes != pVgroup->pDb->cfg.replications) {
// TODO:
// mgmtSendAlterVgroupMsg(pVgroup, NULL);
} else {
mgmtAddToShellQueue(ahandle);
}
}
\ No newline at end of file
......@@ -34,11 +34,7 @@ void taosIdPoolCleanUp(void *handle);
int taosIdPoolNumOfUsed(void *handle);
void taosIdPoolReinit(void *handle);
void taosIdPoolMarkStatus(void *handle, int id, int status);
void taosIdPoolSetFreeList(void *handle);
void taosIdPoolMarkStatus(void *handle, int id);
#ifdef __cplusplus
}
......
......@@ -25,78 +25,74 @@ typedef struct {
} id_pool_t;
void *taosInitIdPool(int maxId) {
id_pool_t *pIdPool;
int * idList, i;
if (maxId < 3) maxId = 3;
pIdPool = (id_pool_t *)malloc(sizeof(id_pool_t));
id_pool_t *pIdPool = calloc(1, sizeof(id_pool_t));
if (pIdPool == NULL) return NULL;
idList = (int *)malloc(sizeof(int) * (size_t)maxId);
if (idList == NULL) {
pIdPool->freeList = malloc(sizeof(int) * (size_t)maxId);
if (pIdPool->freeList == NULL) {
free(pIdPool);
return NULL;
}
memset(pIdPool, 0, sizeof(id_pool_t));
pIdPool->maxId = maxId;
pIdPool->numOfFree = maxId - 1;
pIdPool->numOfFree = maxId;
pIdPool->freeSlot = 0;
pIdPool->freeList = idList;
pthread_mutex_init(&pIdPool->mutex, NULL);
for (i = 1; i < maxId; ++i) idList[i - 1] = i;
for (int i = 0; i < maxId; ++i) {
pIdPool->freeList[i] = -1;
}
pTrace("pool:%p is setup, maxId:%d", pIdPool, pIdPool->maxId);
return (void *)pIdPool;
return pIdPool;
}
int taosAllocateId(void *handle) {
id_pool_t *pIdPool;
int id = -1;
if (handle == NULL) return id;
pIdPool = (id_pool_t *)handle;
if (pIdPool->maxId < 3) pError("pool:%p is messed up, maxId:%d", pIdPool, pIdPool->maxId);
id_pool_t *pIdPool = handle;
if (handle == NULL) {
return -1;
}
if (pthread_mutex_lock(&pIdPool->mutex) != 0) perror("lock pIdPool Mutex");
int slot = -1;
pthread_mutex_lock(&pIdPool->mutex);
if (pIdPool->numOfFree > 0) {
id = pIdPool->freeList[pIdPool->freeSlot];
pIdPool->freeSlot = (pIdPool->freeSlot + 1) % pIdPool->maxId;
for (int i = 0; i < pIdPool->maxId; ++i) {
slot = (i + pIdPool->freeSlot) % pIdPool->maxId;
if (pIdPool->freeList[slot] == -1) {
pIdPool->freeList[slot] = slot;
pIdPool->freeSlot = slot + 1;
pIdPool->numOfFree--;
break;
}
}
}
if (pthread_mutex_unlock(&pIdPool->mutex) != 0) perror("unlock pIdPool Mutex");
return id;
pthread_mutex_unlock(&pIdPool->mutex);
return slot;
}
void taosFreeId(void *handle, int id) {
id_pool_t *pIdPool;
int slot;
id_pool_t *pIdPool = handle;
if (handle == NULL) return;
pIdPool = (id_pool_t *)handle;
if (pIdPool->freeList == NULL || pIdPool->maxId == 0) return;
if (id <= 0 || id >= pIdPool->maxId) return;
if (pthread_mutex_lock(&pIdPool->mutex) != 0) perror("lock pIdPool Mutex");
pthread_mutex_lock(&pIdPool->mutex);
slot = (pIdPool->freeSlot + pIdPool->numOfFree) % pIdPool->maxId;
pIdPool->freeList[slot] = id;
int slot = id % pIdPool->maxId;
if (pIdPool->freeList[slot] != -1) {
pIdPool->freeList[slot] = -1;
pIdPool->numOfFree++;
}
if (pthread_mutex_unlock(&pIdPool->mutex) != 0) perror("unlock pIdPool Mutex");
pthread_mutex_unlock(&pIdPool->mutex);
}
void taosIdPoolCleanUp(void *handle) {
id_pool_t *pIdPool;
id_pool_t *pIdPool = handle;
if (handle == NULL) return;
pIdPool = (id_pool_t *)handle;
if (pIdPool == NULL) return;
pTrace("pool:%p is cleaned", pIdPool);
......@@ -110,42 +106,21 @@ void taosIdPoolCleanUp(void *handle) {
}
int taosIdPoolNumOfUsed(void *handle) {
id_pool_t *pIdPool = (id_pool_t *)handle;
return pIdPool->maxId - pIdPool->numOfFree - 1;
id_pool_t *pIdPool = handle;
return pIdPool->maxId - pIdPool->numOfFree;
}
void taosIdPoolReinit(void *handle) {
id_pool_t *pIdPool;
void taosIdPoolMarkStatus(void *handle, int id) {
id_pool_t *pIdPool = handle;
pthread_mutex_lock(&pIdPool->mutex);
pIdPool = (id_pool_t *)handle;
pIdPool->numOfFree = 0;
pIdPool->freeSlot = 0;
for (int i = 0; i < pIdPool->maxId; ++i) pIdPool->freeList[i] = 0;
}
void taosIdPoolMarkStatus(void *handle, int id, int status) {
id_pool_t *pIdPool = (id_pool_t *)handle;
pIdPool->freeList[id] = status;
}
void taosIdPoolSetFreeList(void *handle) {
id_pool_t *pIdPool;
int pos = 0;
pIdPool = (id_pool_t *)handle;
pIdPool->numOfFree = 0;
pIdPool->freeSlot = 0;
for (int i = 1; i < pIdPool->maxId; ++i) {
if (pIdPool->freeList[i] == 0) {
pIdPool->freeList[pos] = i;
pIdPool->numOfFree++;
pos++;
}
int slot = id % pIdPool->maxId;
if (pIdPool->freeList[slot] == -1) {
pIdPool->freeList[slot] = slot;
pIdPool->numOfFree--;
}
pthread_mutex_unlock(&pIdPool->mutex);
}
int taosUpdateIdPool(id_pool_t *handle, int maxId) {
......@@ -154,18 +129,18 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) {
return -1;
}
int *idList, i;
idList = (int *)malloc(sizeof(int) * (size_t)maxId);
int *idList = malloc(sizeof(int) * maxId);
if (idList == NULL) {
return -1;
}
for (i = 1; i < maxId; ++i) {
idList[i - 1] = i;
}
if (pthread_mutex_lock(&pIdPool->mutex) != 0) perror("lock pIdPool Mutex");
pthread_mutex_lock(&pIdPool->mutex);
for (int i = 0; i < maxId; ++i) {
idList[i] = -1;
}
memcpy(idList, pIdPool->freeList, sizeof(int) * (size_t)pIdPool->maxId);
memcpy(idList, pIdPool->freeList, sizeof(int) * pIdPool->maxId);
pIdPool->numOfFree += (maxId - pIdPool->maxId);
pIdPool->maxId = maxId;
......@@ -173,7 +148,7 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) {
pIdPool->freeList = idList;
free(oldIdList);
if (pthread_mutex_unlock(&pIdPool->mutex) != 0) perror("unlock pIdPool Mutex");
pthread_mutex_unlock(&pIdPool->mutex);
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册