提交 f8be6486 编写于 作者: M Minglei Jin

fix: new alter keep[012], walLevel, walFsyncPeriod, cacheLast

上级 c2faef73
...@@ -297,8 +297,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -297,8 +297,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && pMsg->msgType == TDMT_VND_BATCH_META) &&
!vnodeIsLeader(pVnode)) { !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg); vnodeRedirectRpcMsg(pVnode, pMsg);
return 0; return 0;
...@@ -486,7 +486,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR ...@@ -486,7 +486,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
rcode = -1; rcode = -1;
goto _exit; goto _exit;
} }
// validate hash // validate hash
sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name); sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
if (vnodeValidateTableHash(pVnode, tbName) < 0) { if (vnodeValidateTableHash(pVnode, tbName) < 0) {
...@@ -516,7 +516,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR ...@@ -516,7 +516,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
tdUidStoreFree(pStore); tdUidStoreFree(pStore);
// prepare rsp // prepare rsp
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen); pRsp->pCont = rpcMallocCont(pRsp->contLen);
if (pRsp->pCont == NULL) { if (pRsp->pCont == NULL) {
...@@ -977,6 +977,8 @@ static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, vo ...@@ -977,6 +977,8 @@ static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, vo
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SAlterVnodeReq alterReq = {0}; SAlterVnodeReq alterReq = {0};
bool walChanged = false;
if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) { if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
...@@ -986,9 +988,50 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void ...@@ -986,9 +988,50 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
alterReq.cacheLastSize); alterReq.cacheLastSize);
if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) { if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) {
pVnode->config.cacheLastSize = alterReq.cacheLastSize; pVnode->config.cacheLastSize = alterReq.cacheLastSize;
// TODO: save config
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024); tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
} }
if (pVnode->config.cacheLast != alterReq.cacheLast) {
pVnode->config.cacheLast = alterReq.cacheLast;
}
if (pVnode->config.walCfg.fsyncPeriod != alterReq.walFsyncPeriod) {
pVnode->config.walCfg.fsyncPeriod = alterReq.walFsyncPeriod;
walChanged = true;
}
if (pVnode->config.walCfg.level != alterReq.walLevel) {
pVnode->config.walCfg.level = alterReq.walLevel;
walChanged = true;
}
if (pVnode->config.tsdbCfg.keep0 != alterReq.daysToKeep0) {
pVnode->config.tsdbCfg.keep0 != alterReq.daysToKeep0;
if (!VND_IS_RSMA(pVnode)) {
pVnode->pTsdb->keepCfg.keep0 = alterReq.daysToKeep0;
}
}
if (pVnode->config.tsdbCfg.keep1 != alterReq.daysToKeep1) {
pVnode->config.tsdbCfg.keep1 != alterReq.daysToKeep1;
if (!VND_IS_RSMA(pVnode)) {
pVnode->pTsdb->keepCfg.keep1 = alterReq.daysToKeep1;
}
}
if (pVnode->config.tsdbCfg.keep2 != alterReq.daysToKeep2) {
pVnode->config.tsdbCfg.keep2 != alterReq.daysToKeep2;
if (!VND_IS_RSMA(pVnode)) {
pVnode->pTsdb->keepCfg.keep2 = alterReq.daysToKeep2;
}
}
if (walChanged) {
walAlter(pVnode->pWal, pVnode->config.walCfg);
}
return 0; return 0;
} }
...@@ -1021,10 +1064,10 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -1021,10 +1064,10 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
taosArrayDestroy(pRes->uidList); taosArrayDestroy(pRes->uidList);
SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows}; SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows};
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen); pRsp->pCont = rpcMallocCont(pRsp->contLen);
SEncoder ec = {0}; SEncoder ec = {0};
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen); tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
tEncodeSVDeleteRsp(&ec, &rsp); tEncodeSVDeleteRsp(&ec, &rsp);
tEncoderClear(&ec); tEncoderClear(&ec);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册