提交 0e6c01c0 编写于 作者: S Shengliang Guan

[TD-468] the first version of tables' allocation

上级 e9eb13e7
......@@ -68,6 +68,7 @@ extern int64_t tsMaxRetentWindow;
extern int32_t tsCacheBlockSize;
extern int32_t tsBlocksPerVnode;
extern int32_t tsMaxTablePerVnode;
extern int32_t tsMaxVgroupsPerDb;
extern int16_t tsDaysPerFile;
extern int32_t tsDaysToKeep;
extern int32_t tsMinRowsInFileBlock;
......
......@@ -109,13 +109,8 @@ int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION;
int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL;
int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
int32_t tsReplications = TSDB_DEFAULT_REPLICA_NUM;
#ifdef _TD_ARM_32_
int32_t tsMaxTablePerVnode = 100;
#else
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;
#endif
int32_t tsMaxVgroupsPerDb = 0;
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;
// balance
int32_t tsEnableBalance = 1;
int32_t tsAlternativeRole = 0;
......@@ -594,6 +589,16 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "maxVgroupsPerDb";
cfg.ptr = &tsMaxVgroupsPerDb;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 8192;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// database configs
cfg.option = "maxtablesPerVnode";
cfg.ptr = &tsMaxTablePerVnode;
......
......@@ -271,8 +271,9 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_DEFAULT_TOTAL_BLOCKS 4
#define TSDB_MIN_TABLES 4
#define TSDB_MAX_TABLES 200000
#define TSDB_DEFAULT_TABLES 1000
#define TSDB_MAX_TABLES 5000000
#define TSDB_DEFAULT_TABLES 500000
#define TSDB_TABLES_STEP 10000
#define TSDB_MIN_DAYS_PER_FILE 1
#define TSDB_MAX_DAYS_PER_FILE 3650
......
......@@ -44,10 +44,7 @@ void mnodeRemoveSuperTableFromDb(SDbObj *pDb);
void mnodeAddTableIntoDb(SDbObj *pDb);
void mnodeRemoveTableFromDb(SDbObj *pDb);
void mnodeAddVgroupIntoDb(SVgObj *pVgroup);
void mnodeAddVgroupIntoDbTail(SVgObj *pVgroup);
void mnodeRemoveVgroupFromDb(SVgObj *pVgroup);
void mnodeMoveVgroupToTail(SVgObj *pVgroup);
void mnodeMoveVgroupToHead(SVgObj *pVgroup);
#ifdef __cplusplus
}
......
......@@ -144,10 +144,8 @@ typedef struct SVgObj {
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
struct SVgObj *prev, *next;
struct SDbObj *pDb;
void * idPool;
SChildTableObj **tableList;
} SVgObj;
typedef struct {
......@@ -182,9 +180,11 @@ typedef struct SDbObj {
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfSuperTables;
SVgObj *pHead;
SVgObj *pTail;
int32_t vgListSize;
int32_t vgListIndex;
SVgObj **vgList;
struct SAcctObj *pAcct;
pthread_mutex_t mutex;
} SDbObj;
typedef struct SUserObj {
......@@ -245,7 +245,8 @@ typedef struct {
int16_t offset[TSDB_MAX_COLUMNS];
int16_t bytes[TSDB_MAX_COLUMNS];
int32_t numOfReads;
int8_t reserved0[2];
int8_t maxReplica;
int8_t reserved0[0];
uint16_t payloadLen;
char payload[];
} SShowObj;
......
......@@ -40,6 +40,7 @@ char* mnodeGetDnodeStatusStr(int32_t dnodeStatus);
void mgmtMonitorDnodeModule();
int32_t mnodeGetDnodesNum();
int32_t mnodeGetOnlinDnodesCpuCoreNum();
int32_t mnodeGetOnlinDnodesNum();
void * mnodeGetNextDnode(void *pIter, SDnodeObj **pDnode);
void mnodeIncDnodeRef(SDnodeObj *pDnode);
......
......@@ -30,17 +30,17 @@ void mnodeDecVgroupRef(SVgObj *pVgroup);
void mnodeDropAllDbVgroups(SDbObj *pDropDb);
void mnodeSendDropAllDbVgroupsMsg(SDbObj *pDropDb);
void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode);
void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb);
//void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb);
void * mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup);
void mnodeUpdateVgroup(SVgObj *pVgroup);
void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVload);
void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_t openVnodes);
int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg, SDbObj *pDb);
int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg);
void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle);
void mnodeAlterVgroup(SVgObj *pVgroup, void *ahandle);
SVgObj *mnodeGetAvailableVgroup(SDbObj *pDb);
int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid);
void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
......
......@@ -38,6 +38,7 @@
#include "mnodeUser.h"
#include "mnodeVgroup.h"
#define VG_LIST_SIZE 1
static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
......@@ -50,8 +51,14 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
static void mnodeDestroyDb(SDbObj *pDb) {
pthread_mutex_destroy(&pDb->mutex);
tfree(pDb->vgList);
tfree(pDb);
}
static int32_t mnodeDbActionDestroy(SSdbOper *pOper) {
tfree(pOper->pObj);
mnodeDestroyDb(pOper->pObj);
return TSDB_CODE_SUCCESS;
}
......@@ -59,8 +66,9 @@ static int32_t mnodeDbActionInsert(SSdbOper *pOper) {
SDbObj *pDb = pOper->pObj;
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
pDb->pHead = NULL;
pDb->pTail = NULL;
pthread_mutex_init(&pDb->mutex, NULL);
pDb->vgListSize = VG_LIST_SIZE;
pDb->vgList = calloc(pDb->vgListSize, sizeof(SVgObj *));
pDb->numOfVgroups = 0;
pDb->numOfTables = 0;
pDb->numOfSuperTables = 0;
......@@ -94,14 +102,15 @@ static int32_t mnodeDbActionDelete(SSdbOper *pOper) {
}
static int32_t mnodeDbActionUpdate(SSdbOper *pOper) {
SDbObj *pDb = pOper->pObj;
SDbObj *pSaved = mnodeGetDb(pDb->name);
if (pDb != pSaved) {
memcpy(pSaved, pDb, pOper->rowSize);
free(pDb);
SDbObj *pNew = pOper->pObj;
SDbObj *pDb = mnodeGetDb(pNew->name);
if (pDb != NULL && pNew != pDb) {
memcpy(pDb, pNew, pOper->rowSize);
free(pNew->vgList);
free(pNew);
}
mnodeUpdateAllDbVgroups(pSaved);
mnodeDecDbRef(pSaved);
//mnodeUpdateAllDbVgroups(pDb);
mnodeDecDbRef(pDb);
return TSDB_CODE_SUCCESS;
}
......@@ -416,45 +425,33 @@ void mnodePrintVgroups(SDbObj *pDb, char *oper) {
void mnodeAddVgroupIntoDb(SVgObj *pVgroup) {
SDbObj *pDb = pVgroup->pDb;
pVgroup->next = pDb->pHead;
pVgroup->prev = NULL;
if (pDb->pHead) pDb->pHead->prev = pVgroup;
if (pDb->pTail == NULL) pDb->pTail = pVgroup;
pDb->pHead = pVgroup;
pDb->numOfVgroups++;
}
void mnodeAddVgroupIntoDbTail(SVgObj *pVgroup) {
SDbObj *pDb = pVgroup->pDb;
pVgroup->next = NULL;
pVgroup->prev = pDb->pTail;
if (pDb->pTail) pDb->pTail->next = pVgroup;
if (pDb->pHead == NULL) pDb->pHead = pVgroup;
pthread_mutex_lock(&pDb->mutex);
int32_t vgPos = pDb->numOfVgroups++;
if (vgPos >= pDb->vgListSize) {
pDb->vgList = realloc(pDb->vgList, pDb->vgListSize * 2 * sizeof(SVgObj *));
memset(pDb->vgList + pDb->vgListSize, 0, pDb->vgListSize * sizeof(SVgObj *));
pDb->vgListSize *= 2;
}
pDb->pTail = pVgroup;
pDb->numOfVgroups++;
pDb->vgList[vgPos] = pVgroup;
pthread_mutex_lock(&pDb->mutex);
}
void mnodeRemoveVgroupFromDb(SVgObj *pVgroup) {
SDbObj *pDb = pVgroup->pDb;
if (pVgroup->prev) pVgroup->prev->next = pVgroup->next;
if (pVgroup->next) pVgroup->next->prev = pVgroup->prev;
if (pVgroup->prev == NULL) pDb->pHead = pVgroup->next;
if (pVgroup->next == NULL) pDb->pTail = pVgroup->prev;
pDb->numOfVgroups--;
}
void mnodeMoveVgroupToTail(SVgObj *pVgroup) {
mnodeRemoveVgroupFromDb(pVgroup);
mnodeAddVgroupIntoDbTail(pVgroup);
}
pthread_mutex_lock(&pDb->mutex);
for (int32_t v1 = 0; v1 < pDb->numOfVgroups; ++v1) {
if (pDb->vgList[v1] == pVgroup) {
for (int32_t v2 = v1; v2 < pDb->numOfVgroups - 1; ++v2) {
pDb->vgList[v2] = pDb->vgList[v2 + 1];
}
pDb->numOfVgroups--;
break;
}
}
void mnodeMoveVgroupToHead(SVgObj *pVgroup) {
mnodeRemoveVgroupFromDb(pVgroup);
mnodeAddVgroupIntoDb(pVgroup);
pthread_mutex_lock(&pDb->mutex);
}
void mnodeCleanupDbs() {
......@@ -525,11 +522,6 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
#ifndef __CLOUD_VERSION__
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
#endif
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "maxtables");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......@@ -555,12 +547,6 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "ctime(Sec.)");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "wallevel");
......@@ -670,10 +656,6 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
#ifndef __CLOUD_VERSION__
if (strcmp(pUser->user, TSDB_DEFAULT_USER) == 0) {
#endif
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxTables; // table num can be created should minus 1
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.cacheBlockSize;
cols++;
......@@ -690,10 +672,6 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
*(int32_t *)pWrite = pDb->cfg.maxRowsPerFileBlock;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.commitTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.walLevel;
cols++;
......
......@@ -186,7 +186,27 @@ int32_t mnodeGetDnodesNum() {
return sdbGetNumOfRows(tsDnodeSdb);
}
int32_t mnodeGetOnlinDnodesNum(char *ep) {
int32_t mnodeGetOnlinDnodesCpuCoreNum() {
SDnodeObj *pDnode = NULL;
void * pIter = NULL;
int32_t cpuCores = 0;
while (1) {
pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
if (pDnode->status != TAOS_DN_STATUS_OFFLINE) {
cpuCores += pDnode->numOfCores;
}
mnodeDecDnodeRef(pDnode);
}
sdbFreeIter(pIter);
if (cpuCores < 2) cpuCores = 2;
return cpuCores;
}
int32_t mnodeGetOnlinDnodesNum() {
SDnodeObj *pDnode = NULL;
void * pIter = NULL;
int32_t onlineDnodes = 0;
......
......@@ -314,15 +314,6 @@ static int32_t mnodeChildTableActionRestored() {
continue;
}
if (pVgroup->tableList == NULL) {
mError("ctable:%s, vgId:%d tableList is null", pTable->info.tableId, pTable->vgId);
pTable->vgId = 0;
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
mnodeDecTableRef(pTable);
continue;
}
if (pTable->info.type == TSDB_CHILD_TABLE) {
SSuperTableObj *pSuperTable = mnodeGetSuperTableByUid(pTable->suid);
if (pSuperTable == NULL) {
......@@ -1686,19 +1677,15 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
return code;
}
SVgObj *pVgroup = mnodeGetAvailableVgroup(pMsg->pDb);
if (pVgroup == NULL) {
mDebug("app:%p:%p, table:%s, start to create a new vgroup", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
return mnodeCreateVgroup(pMsg, pMsg->pDb);
}
if (pMsg->retry == 0) {
if (pMsg->pTable == NULL) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) {
mDebug("app:%p:%p, table:%s, no enough sid in vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
pVgroup->vgId);
return mnodeCreateVgroup(pMsg, pMsg->pDb);
SVgObj *pVgroup;
int32_t sid;
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &sid);
if (code != TSDB_CODE_SUCCESS) {
mDebug("app:%p:%p, table:%s, failed to get available vgroup, reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pCreate->tableId, tstrerror(code));
return code;
}
if (pMsg->pVgroup == NULL) {
......@@ -2105,6 +2092,7 @@ static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) {
mInfo("stable:%s, all child tables:%d is dropped from sdb", pStable->info.tableId, numOfTables);
}
#if 0
static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t sid) {
SVgObj *pVgroup = mnodeGetVgroup(vnode);
if (pVgroup == NULL) return NULL;
......@@ -2115,8 +2103,11 @@ static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t sid) {
mnodeDecVgroupRef(pVgroup);
return pTable;
}
#endif
static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_COM_OPS_NOT_SUPPORT;
#if 0
SDMConfigTableMsg *pCfg = pMsg->rpcMsg.pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->vgId = htonl(pCfg->vgId);
......@@ -2140,6 +2131,7 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pCreate;
pMsg->rpcRsp.len = htonl(pCreate->contLen);
return TSDB_CODE_SUCCESS;
#endif
}
// handle drop child response
......
......@@ -41,6 +41,7 @@
static void *tsVgroupSdb = NULL;
static int32_t tsVgUpdateSize = 0;
static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup);
static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
......@@ -54,9 +55,6 @@ static int32_t mnodeVgroupActionDestroy(SSdbOper *pOper) {
taosIdPoolCleanUp(pVgroup->idPool);
pVgroup->idPool = NULL;
}
if (pVgroup->tableList) {
tfree(pVgroup->tableList);
}
tfree(pOper->pObj);
return TSDB_CODE_SUCCESS;
......@@ -72,21 +70,9 @@ static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) {
}
pVgroup->pDb = pDb;
pVgroup->prev = NULL;
pVgroup->next = NULL;
pVgroup->accessState = TSDB_VN_ALL_ACCCESS;
int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxTables;
pVgroup->tableList = calloc(pDb->cfg.maxTables, sizeof(SChildTableObj *));
if (pVgroup->tableList == NULL) {
mError("vgId:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size);
return -1;
}
pVgroup->idPool = taosInitIdPool(pDb->cfg.maxTables);
if (pVgroup->idPool == NULL) {
mError("vgId:%d, failed to taosInitIdPool for vgroups", pVgroup->vgId);
tfree(pVgroup->tableList);
if (mnodeAllocVgroupIdPool(pVgroup) < 0) {
mError("vgId:%d, failed to init idpool for vgroups", pVgroup->vgId);
return -1;
}
......@@ -124,20 +110,6 @@ static int32_t mnodeVgroupActionDelete(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
static void mnodeVgroupUpdateIdPool(SVgObj *pVgroup) {
int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool);
SDbObj *pDb = pVgroup->pDb;
if (pDb != NULL) {
if (pDb->cfg.maxTables != oldTables) {
mInfo("vgId:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxTables);
taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxTables);
int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxTables;
pVgroup->tableList = (SChildTableObj **)realloc(pVgroup->tableList, size);
memset(pVgroup->tableList + oldTables, 0, (pDb->cfg.maxTables - oldTables) * sizeof(SChildTableObj *));
}
}
}
static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
SVgObj *pNew = pOper->pObj;
SVgObj *pVgroup = mnodeGetVgroup(pNew->vgId);
......@@ -162,8 +134,6 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
}
}
mnodeVgroupUpdateIdPool(pVgroup);
mnodeDecVgroupRef(pVgroup);
mDebug("vgId:%d, is updated, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes);
......@@ -325,8 +295,122 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
}
}
SVgObj *mnodeGetAvailableVgroup(SDbObj *pDb) {
return pDb->pHead;
static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
SDbObj *pDb = pInputVgroup->pDb;
if (pDb == NULL) return TSDB_CODE_MND_APP_ERROR;
int32_t currIdPoolSize = TSDB_MIN_TABLES;
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
SVgObj *pVgroup = pDb->vgList[v];
if (pVgroup == NULL) continue;
int32_t idPoolSize = taosIdPoolMaxSize(pVgroup->idPool);
currIdPoolSize = MAX(currIdPoolSize, idPoolSize);
}
// new vgroup
if (pInputVgroup->idPool == NULL) {
pInputVgroup->idPool = taosInitIdPool(currIdPoolSize);
if (pInputVgroup->idPool == NULL) {
mError("vgId:%d, failed to init idPool for vgroup, size:%d", pInputVgroup->vgId, currIdPoolSize);
return TSDB_CODE_MND_OUT_OF_MEMORY;
} else {
mDebug("vgId:%d, init idPool for vgroup, size:%d", pInputVgroup->vgId, currIdPoolSize);
return TSDB_CODE_SUCCESS;
}
}
// realloc all vgroups in db
int32_t newIdPoolSize;
if (currIdPoolSize < TSDB_TABLES_STEP) {
newIdPoolSize = currIdPoolSize * 2;
} else {
newIdPoolSize = ((currIdPoolSize / TSDB_TABLES_STEP) + 1) * TSDB_TABLES_STEP;
}
if (newIdPoolSize > tsMaxTablePerVnode) {
mDebug("db:%s, currIdPoolSize:%d newIdPoolSize%d larger than %d", pDb->name, currIdPoolSize, newIdPoolSize,
tsMaxTablePerVnode);
return TSDB_CODE_MND_NO_ENOUGH_DNODES;
}
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
SVgObj *pVgroup = pDb->vgList[v];
if (pVgroup == NULL) continue;
int32_t oldIdPoolSize = taosIdPoolMaxSize(pVgroup->idPool);
if (taosUpdateIdPool(pVgroup->idPool, newIdPoolSize) < 0) {
mError("vgId:%d, failed to update idPoolSize from %d to %d", pVgroup->vgId, oldIdPoolSize, newIdPoolSize);
return TSDB_CODE_MND_NO_ENOUGH_DNODES;
} else {
mDebug("vgId:%d, idPoolSize update from %d to %d", pVgroup->vgId, oldIdPoolSize, newIdPoolSize);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSid) {
SDbObj *pDb = pMsg->pDb;
pthread_mutex_lock(&pDb->mutex);
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
int vgIndex = (v + pDb->vgListIndex) % pDb->numOfVgroups;
SVgObj *pVgroup = pDb->vgList[vgIndex];
if (pVgroup == NULL) {
mError("db:%s, index:%d vgroup is null", pDb->name, vgIndex);
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_APP_ERROR;
}
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) {
mDebug("app:%p:%p, db:%s, no enough sid in vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pDb->name, pVgroup->vgId);
continue;
}
*pSid = sid;
*ppVgroup = pVgroup;
pDb->vgListIndex = vgIndex;
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_SUCCESS;
}
int maxVgroupsPerDb = tsMaxVgroupsPerDb;
if (maxVgroupsPerDb <= 0) {
maxVgroupsPerDb = mnodeGetOnlinDnodesCpuCoreNum();
maxVgroupsPerDb = MIN(maxVgroupsPerDb, 2);
}
if (pDb->numOfVgroups < maxVgroupsPerDb) {
mDebug("app:%p:%p, db:%s, start to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d", pMsg->rpcMsg.ahandle, pMsg,
pDb->name, pDb->numOfVgroups, maxVgroupsPerDb);
pthread_mutex_unlock(&pDb->mutex);
return mnodeCreateVgroup(pMsg);
}
SVgObj *pVgroup = pDb->vgList[0];
int32_t code = mnodeAllocVgroupIdPool(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
pthread_mutex_unlock(&pDb->mutex);
return code;
}
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) {
mError("app:%p:%p, db:%s, no enough sid in vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pDb->name, pVgroup->vgId);
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_NO_ENOUGH_DNODES;
}
*pSid = sid;
*ppVgroup = pVgroup;
pDb->vgListIndex = 0;
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_SUCCESS;
}
void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) {
......@@ -354,8 +438,9 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SDbObj *pDb = pMsg->pDb;
SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
tstrncpy(pVgroup->dbName, pDb->name, TSDB_ACCT_LEN + TSDB_DB_NAME_LEN);
......@@ -430,29 +515,21 @@ int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
int32_t maxReplica = 0;
SVgObj *pVgroup = NULL;
STableObj *pTable = NULL;
if (pShow->payloadLen > 0 ) {
pTable = mnodeGetTable(pShow->payload);
if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) {
mnodeDecTableRef(pTable);
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
mnodeDecTableRef(pTable);
pVgroup = mnodeGetVgroup(((SChildTableObj*)pTable)->vgId);
if (NULL == pVgroup) return TSDB_CODE_MND_INVALID_TABLE_NAME;
mnodeDecVgroupRef(pVgroup);
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
} else {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
pVgroup = pVgroup->next;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "maxTables");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->maxReplica = 1;
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
SVgObj *pVgroup = pDb->vgList[v];
if (pVgroup != NULL) {
pShow->maxReplica = pVgroup->numOfVnodes > pShow->maxReplica ? pVgroup->numOfVnodes : pShow->maxReplica;
}
}
for (int32_t i = 0; i < maxReplica; ++i) {
for (int32_t i = 0; i < pShow->maxReplica; ++i) {
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "dnode");
......@@ -476,43 +553,48 @@ int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = pDb->numOfVgroups;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
if (NULL == pTable) {
pShow->numOfRows = pDb->numOfVgroups;
pShow->pIter = pDb->pHead;
} else {
pShow->numOfRows = 1;
pShow->pIter = pVgroup;
}
mnodeDecDbRef(pDb);
return 0;
}
mnodeDecDbRef(pDb);
static bool mnodeFilterVgroups(SVgObj *pVgroup, STableObj *pTable) {
if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) {
return true;
}
return 0;
SChildTableObj *pCTable = (SChildTableObj *)pTable;
if (pVgroup->vgId == pCTable->vgId) {
return true;
} else {
return false;
}
}
int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
int32_t maxReplica = 0;
int32_t cols = 0;
char * pWrite;
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
pVgroup = pDb->pHead;
while (pVgroup != NULL) {
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
pVgroup = pVgroup->next;
STableObj *pTable = NULL;
if (pShow->payloadLen > 0 ) {
pTable = mnodeGetTable(pShow->payload);
}
while (numOfRows < rows) {
pVgroup = (SVgObj *) pShow->pIter;
pShow->pIter = mnodeGetNextVgroup(pShow->pIter, &pVgroup);
if (pVgroup == NULL) break;
pShow->pIter = (void *) pVgroup->next;
if (!mnodeFilterVgroups(pVgroup, pTable)) continue;
cols = 0;
......@@ -524,7 +606,11 @@ int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pC
*(int32_t *) pWrite = pVgroup->numOfTables;
cols++;
for (int32_t i = 0; i < maxReplica; ++i) {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *) pWrite = tsMaxTablePerVnode;
cols++;
for (int32_t i = 0; i < pShow->maxReplica; ++i) {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId;
cols++;
......@@ -552,38 +638,36 @@ int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pC
}
}
mnodeDecVgroupRef(pVgroup);
numOfRows++;
}
pShow->numOfReads += numOfRows;
mnodeDecTableRef(pTable);
mnodeDecDbRef(pDb);
return numOfRows;
}
void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
if (pTable->sid >= 1 && pVgroup->tableList[pTable->sid - 1] == NULL) {
pVgroup->tableList[pTable->sid - 1] = pTable;
int32_t idPoolSize = taosIdPoolMaxSize(pVgroup->idPool);
if (pTable->sid > idPoolSize) {
mnodeAllocVgroupIdPool(pVgroup);
}
if (pTable->sid >= 1) {
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
pVgroup->numOfTables++;
mnodeIncVgroupRef(pVgroup);
}
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxTables) {
mnodeMoveVgroupToTail(pVgroup);
}
mnodeIncVgroupRef(pVgroup);
}
void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
if (pTable->sid >= 1 && pVgroup->tableList[pTable->sid - 1] != NULL) {
pVgroup->tableList[pTable->sid - 1] = NULL;
if (pTable->sid >= 1) {
taosFreeId(pVgroup->idPool, pTable->sid);
pVgroup->numOfTables--;
mnodeDecVgroupRef(pVgroup);
}
mnodeMoveVgroupToHead(pVgroup);
mnodeDecVgroupRef(pVgroup);
}
SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) {
......@@ -594,13 +678,14 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) {
if (pVnode == NULL) return NULL;
strcpy(pVnode->db, pVgroup->dbName);
int32_t maxTables = taosIdPoolMaxSize(pVgroup->idPool);
SMDVnodeCfg *pCfg = &pVnode->cfg;
pCfg->vgId = htonl(pVgroup->vgId);
pCfg->cfgVersion = htonl(pDb->cfgVersion);
pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks);
pCfg->maxTables = htonl(pDb->cfg.maxTables + 1);
pCfg->maxTables = htonl(maxTables + 1);
pCfg->daysPerFile = htonl(pDb->cfg.daysPerFile);
pCfg->daysToKeep = htonl(pDb->cfg.daysToKeep);
pCfg->daysToKeep1 = htonl(pDb->cfg.daysToKeep1);
......@@ -822,6 +907,7 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
mInfo("dnode:%d, all vgroups is dropped from sdb", pDropDnode->dnodeId);
}
#if 0
void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb) {
void * pIter = NULL;
SVgObj *pVgroup = NULL;
......@@ -843,6 +929,7 @@ void mnodeUpdateAllDbVgroups(SDbObj *pAlterDb) {
mInfo("db:%s, all vgroups is updated in sdb", pAlterDb->name);
}
#endif
void mnodeDropAllDbVgroups(SDbObj *pDropDb) {
void * pIter = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册