提交 dbae9a47 编写于 作者: S Shengliang Guan

fix: check memory while alter db buffer

上级 37c193b3
......@@ -212,38 +212,47 @@ static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
}
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SCreateVnodeReq createReq = {0};
SCreateVnodeReq req = {0};
SVnodeCfg vnodeCfg = {0};
SWrapperCfg wrapperCfg = {0};
int32_t code = -1;
char path[TSDB_FILENAME_LEN] = {0};
if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
dInfo(
"vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d sstTrigger:%d "
"tsdbPageSize:%d",
createReq.vgId, createReq.isTsma, createReq.standby, createReq.cacheLast, createReq.cacheLastSize,
createReq.sstTrigger, createReq.tsdbPageSize);
dInfo("vgId:%d, hashMethod:%d begin:%u end:%u prefix:%d surfix:%d", createReq.vgId, createReq.hashMethod,
createReq.hashBegin, createReq.hashEnd, createReq.hashPrefix, createReq.hashSuffix);
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
dError("vgId:%d, failed to adjust tsma days since %s", createReq.vgId, terrstr());
"vgId:%d, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
" cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
"days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d, wal "
"fsync:%d level:%d retentionPeriod:%d retentionSize:%d rollPeriod:%d segSize:%d, hash method:%d begin:%u end:%u "
"prefix:%d surfix:%d replica:%d selfIndex:%d strict:%d",
req.vgId, req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid,
req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression,
req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
req.replica, req.selfIndex, req.strict);
for (int32_t i = 0; i < req.replica; ++i) {
dInfo("vgId:%d, replica:%d fqdn:%s port:%u", req.vgId, req.replicas[i].id, req.replicas[i].fqdn,
req.replicas[i].port);
}
vmGenerateVnodeCfg(&req, &vnodeCfg);
if (vmTsmaAdjustDays(&vnodeCfg, &req) < 0) {
dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, terrstr());
code = terrno;
goto _OVER;
}
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist", createReq.vgId);
tFreeSCreateVnodeReq(&createReq);
dDebug("vgId:%d, already exist", req.vgId);
tFreeSCreateVnodeReq(&req);
vmReleaseVnode(pMgmt, pVnode);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
code = terrno;
......@@ -252,36 +261,36 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
tFreeSCreateVnodeReq(&createReq);
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
tFreeSCreateVnodeReq(&req);
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
code = terrno;
goto _OVER;
}
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
code = terrno;
goto _OVER;
}
code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
if (code != 0) {
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
code = terrno;
goto _OVER;
}
code = vmTsmaProcessCreate(pImpl, &createReq);
code = vmTsmaProcessCreate(pImpl, &req);
if (code != 0) {
dError("vgId:%d, failed to create tsma since %s", createReq.vgId, terrstr());
dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
code = terrno;
goto _OVER;
}
code = vnodeStart(pImpl);
if (code != 0) {
dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
goto _OVER;
}
......@@ -296,10 +305,10 @@ _OVER:
vnodeClose(pImpl);
vnodeDestroy(path, pMgmt->pTfs);
} else {
dInfo("vgId:%d, vnode is created", createReq.vgId);
dInfo("vgId:%d, vnode is created", req.vgId);
}
tFreeSCreateVnodeReq(&createReq);
tFreeSCreateVnodeReq(&req);
terrno = code;
return code;
}
......
......@@ -44,7 +44,8 @@ int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pV
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
int32_t mndSetMoveVgroupInfoToTrans(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vn, SArray *pArray);
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId);
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray);
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
SArray *pArray);
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
......
......@@ -746,7 +746,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0;
}
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
......@@ -756,8 +756,8 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (mndVgroupInDb(pVgroup, pNew->uid)) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup, pArray) != 0) {
if (mndVgroupInDb(pVgroup, pNewDb->uid)) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
taosArrayDestroy(pArray);
......
......@@ -1530,44 +1530,81 @@ _OVER:
#endif
}
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
return mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_CONFIG);
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
SVgObj *pNewVgroup, SArray *pArray) {
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
SDnodeObj *pDnode = taosArrayGet(pArray, i);
bool inVgroup = false;
for (int32_t j = 0; j < pOldVgroup->replica; ++j) {
SVnodeGid *pVgId = &pOldVgroup->vnodeGid[i];
if (pDnode->id == pVgId->dnodeId) {
pDnode->memUsed -= mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
inVgroup = true;
}
}
for (int32_t j = 0; j < pNewVgroup->replica; ++j) {
SVnodeGid *pVgId = &pNewVgroup->vnodeGid[i];
if (pDnode->id == pVgId->dnodeId) {
pDnode->memUsed += mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
inVgroup = true;
}
}
if (pDnode->memAvail - pDnode->memUsed <= 0) {
mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
return -1;
} else if (inVgroup) {
mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
} else {
}
}
return 0;
}
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
SArray *pArray) {
SVgObj newVgroup = {0};
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, pVgroup, TDMT_VND_ALTER_CONFIG) != 0) return -1;
if (mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray) != 0) return -1;
return 0;
}
mndTransSetSerial(pTrans);
if (newVgroup.replica < pDb->cfg.replications) {
if (newVgroup.replica < pNewDb->cfg.replications) {
mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
} else if (newVgroup.replica > pDb->cfg.replications) {
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
} else if (newVgroup.replica > pNewDb->cfg.replications) {
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
SVnodeGid del1 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
SVnodeGid del2 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
} else {
}
......@@ -1648,8 +1685,8 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg2, TDMT_VND_ALTER_HASHRANGE) != 0) goto _OVER;
// adjust vgroup
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, &newVg1, pArray) != 0) goto _OVER;
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, &newVg2, pArray) != 0) goto _OVER;
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
_OVER:
mndTransDrop(pTrans);
......
......@@ -1024,71 +1024,75 @@ static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, vo
}
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SAlterVnodeReq alterReq = {0};
SAlterVnodeReq req = {0};
bool walChanged = false;
bool tsdbChanged = false;
if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
if (tDeserializeSAlterVnodeReq(pReq, len, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG;
}
vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
alterReq.cacheLastSize);
if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) {
pVnode->config.cacheLastSize = alterReq.cacheLastSize;
vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
" cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d strict:%d",
TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
req.walFsyncPeriod, req.walLevel, req.strict);
if (pVnode->config.cacheLastSize != req.cacheLastSize) {
pVnode->config.cacheLastSize = req.cacheLastSize;
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
}
if (pVnode->config.szBuf != alterReq.buffer * 1024LL * 1024LL) {
if (pVnode->config.szBuf != req.buffer * 1024LL * 1024LL) {
vInfo("vgId:%d vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
alterReq.buffer * 1024LL * 1024LL);
pVnode->config.szBuf = alterReq.buffer * 1024LL * 1024LL;
req.buffer * 1024LL * 1024LL);
pVnode->config.szBuf = req.buffer * 1024LL * 1024LL;
}
if (pVnode->config.szCache != alterReq.pages) {
if (metaAlterCache(pVnode->pMeta, alterReq.pages) < 0) {
if (pVnode->config.szCache != req.pages) {
if (metaAlterCache(pVnode->pMeta, req.pages) < 0) {
vError("vgId:%d failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
pVnode->config.szCache, alterReq.pages, tstrerror(errno));
pVnode->config.szCache, req.pages, tstrerror(errno));
return errno;
} else {
vInfo("vgId:%d vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, alterReq.pages);
pVnode->config.szCache = alterReq.pages;
vInfo("vgId:%d vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
pVnode->config.szCache = req.pages;
}
}
if (pVnode->config.cacheLast != alterReq.cacheLast) {
pVnode->config.cacheLast = alterReq.cacheLast;
if (pVnode->config.cacheLast != req.cacheLast) {
pVnode->config.cacheLast = req.cacheLast;
}
if (pVnode->config.walCfg.fsyncPeriod != alterReq.walFsyncPeriod) {
pVnode->config.walCfg.fsyncPeriod = alterReq.walFsyncPeriod;
if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
walChanged = true;
}
if (pVnode->config.walCfg.level != alterReq.walLevel) {
pVnode->config.walCfg.level = alterReq.walLevel;
if (pVnode->config.walCfg.level != req.walLevel) {
pVnode->config.walCfg.level = req.walLevel;
walChanged = true;
}
if (pVnode->config.tsdbCfg.keep0 != alterReq.daysToKeep0) {
pVnode->config.tsdbCfg.keep0 = alterReq.daysToKeep0;
if (pVnode->config.tsdbCfg.keep0 != req.daysToKeep0) {
pVnode->config.tsdbCfg.keep0 = req.daysToKeep0;
if (!VND_IS_RSMA(pVnode)) {
tsdbChanged = true;
}
}
if (pVnode->config.tsdbCfg.keep1 != alterReq.daysToKeep1) {
pVnode->config.tsdbCfg.keep1 = alterReq.daysToKeep1;
if (pVnode->config.tsdbCfg.keep1 != req.daysToKeep1) {
pVnode->config.tsdbCfg.keep1 = req.daysToKeep1;
if (!VND_IS_RSMA(pVnode)) {
tsdbChanged = true;
}
}
if (pVnode->config.tsdbCfg.keep2 != alterReq.daysToKeep2) {
pVnode->config.tsdbCfg.keep2 = alterReq.daysToKeep2;
if (pVnode->config.tsdbCfg.keep2 != req.daysToKeep2) {
pVnode->config.tsdbCfg.keep2 = req.daysToKeep2;
if (!VND_IS_RSMA(pVnode)) {
tsdbChanged = true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册