diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c04871471205a066e5c4f5fadda866823a7cac0c..a9b2e12fd5c061a8099d3dff2960f4d781ec20a8 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1377,11 +1377,6 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pCmd->payloadLen = sizeof(SCMAlterDbMsg); pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB; - if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { - tscError("%p failed to malloc for query msg", pSql); - return TSDB_CODE_CLI_OUT_OF_MEMORY; - } - SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); strcpy(pAlterDbMsg->db, pTableMetaInfo->name); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index a319c0bb9a733febe4c093159e6c3749f7878b4f..fef91c578662c7c7b52bb66663d504c288528a7e 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -128,6 +128,7 @@ static void dnodeCloseVnodes() { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); + pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion); pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables); pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize); pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 54523196dd54c9b3d3ff104e5cfe1a293455bd4d..1b422af31f2652272e9c4ccb886106911315dfb5 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -505,6 +505,7 @@ typedef struct SRetrieveTableRsp { typedef struct { int32_t vgId; + int32_t cfgVersion; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; @@ -599,6 +600,7 @@ typedef struct { typedef struct { uint32_t vgId; + int32_t cfgVersion; int32_t cacheBlockSize; int32_t totalBlocks; int32_t maxTables; diff --git a/src/mnode/inc/mgmtDef.h b/src/mnode/inc/mgmtDef.h index f91a8d6bb4c98c1abb88ce5a3367af22445ccb89..a7e75f977761937c7ad272d907e9046f2f6fbc92 100644 --- a/src/mnode/inc/mgmtDef.h +++ b/src/mnode/inc/mgmtDef.h @@ -164,6 +164,7 @@ typedef struct SDbObj { char name[TSDB_DB_NAME_LEN + 1]; char acct[TSDB_USER_LEN + 1]; int64_t createdTime; + int32_t cfgVersion; SDbCfg cfg; int8_t status; int8_t reserved[14]; diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 844d40b6056c23e9d59e03e010c9b5279026c3ee..29d23432319f2db0525e7606ab3256816b6a3c37 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -22,6 +22,7 @@ #include "tglobal.h" #include "ttime.h" #include "tname.h" +#include "tbalance.h" #include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtAcct.h" @@ -715,39 +716,65 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { SDbCfg newCfg = pDb->cfg; - int32_t daysToKeep = htonl(pAlter->daysToKeep); + int32_t cacheBlockSize = htonl(pAlter->daysToKeep); + int32_t totalBlocks = htonl(pAlter->totalBlocks); int32_t maxTables = htonl(pAlter->maxSessions); + int32_t daysToKeep = htonl(pAlter->daysToKeep); + int32_t daysToKeep1 = htonl(pAlter->daysToKeep1); + int32_t daysToKeep2 = htonl(pAlter->daysToKeep2); + int8_t compression = pAlter->compression; int8_t replications = pAlter->replications; terrno = TSDB_CODE_SUCCESS; + if (cacheBlockSize > 0 && cacheBlockSize != pDb->cfg.cacheBlockSize) { + mTrace("db:%s, cache:%d change to %d", pDb->name, pDb->cfg.cacheBlockSize, cacheBlockSize); + newCfg.cacheBlockSize = cacheBlockSize; + } + + if (totalBlocks > 0 && totalBlocks != pDb->cfg.totalBlocks) { + mTrace("db:%s, blocks:%d change to %d", pDb->name, pDb->cfg.totalBlocks, totalBlocks); + newCfg.totalBlocks = totalBlocks; + } + + if (maxTables > 0 && maxTables != pDb->cfg.maxTables) { + mTrace("db:%s, tables:%d change to %d", pDb->name, pDb->cfg.maxTables, maxTables); + newCfg.maxTables = maxTables; + if (newCfg.maxTables < pDb->cfg.maxTables) { + mTrace("db:%s, tables:%d should larger than origin:%d", pDb->name, newCfg.maxTables, pDb->cfg.maxTables); + terrno = TSDB_CODE_INVALID_OPTION; + } + } + if (daysToKeep > 0 && daysToKeep != pDb->cfg.daysToKeep) { mTrace("db:%s, daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, daysToKeep); newCfg.daysToKeep = daysToKeep; - } - + } + + if (daysToKeep1 > 0 && daysToKeep1 != pDb->cfg.daysToKeep1) { + mTrace("db:%s, daysToKeep1:%d change to %d", pDb->name, pDb->cfg.daysToKeep1, daysToKeep1); + newCfg.daysToKeep1 = daysToKeep1; + } + + if (daysToKeep2 > 0 && daysToKeep2 != pDb->cfg.daysToKeep2) { + mTrace("db:%s, daysToKeep2:%d change to %d", pDb->name, pDb->cfg.daysToKeep2, daysToKeep2); + newCfg.daysToKeep2 = daysToKeep2; + } + + if (compression > 0 && compression != pDb->cfg.compression) { + mTrace("db:%s, compression:%d change to %d", pDb->name, pDb->cfg.compression, compression); + newCfg.compression = compression; + } + if (replications > 0 && replications != pDb->cfg.replications) { - mTrace("db:%s, replica:%d change to %d", pDb->name, pDb->cfg.replications, replications); - if (replications < TSDB_MIN_REPLICA_NUM || replications > TSDB_MAX_REPLICA_NUM) { - mError("invalid db option replica: %d valid range: %d--%d", replications, TSDB_MIN_REPLICA_NUM, TSDB_MAX_REPLICA_NUM); - terrno = TSDB_CODE_INVALID_OPTION; - } + mTrace("db:%s, replications:%d change to %d", pDb->name, pDb->cfg.replications, replications); newCfg.replications = replications; } - - if (maxTables > 0 && maxTables != pDb->cfg.maxTables) { - mTrace("db:%s, tables:%d change to %d", pDb->name, pDb->cfg.maxTables, maxTables); - if (maxTables < TSDB_MIN_TABLES || maxTables > TSDB_MAX_TABLES) { - mError("invalid db option tables: %d valid range: %d--%d", maxTables, TSDB_MIN_TABLES, TSDB_MAX_TABLES); - terrno = TSDB_CODE_INVALID_OPTION; - } - if (maxTables < pDb->cfg.maxTables) { - mError("invalid db option tables: %d should larger than original:%d", maxTables, pDb->cfg.maxTables); - terrno = TSDB_CODE_INVALID_OPTION; - } - newCfg.maxTables = maxTables; + if (replications > mgmtGetDnodesNum()) { + mError("db:%s, no enough dnode to change replica:%d", pDb->name, replications); + terrno = TSDB_CODE_NO_ENOUGH_DNODES; } - + return newCfg; } @@ -757,8 +784,16 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { return terrno; } + int32_t code = mgmtCheckDbCfg(&newCfg); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int32_t oldReplica = pDb->cfg.replications; + if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { pDb->cfg = newCfg; + pDb->cfgVersion++; SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDbSdb, @@ -771,7 +806,20 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { return TSDB_CODE_SDB_ERROR; } } - + + void *pNode = NULL; + while (1) { + SVgObj *pVgroup = NULL; + pNode = mgmtGetNextVgroup(pNode, &pVgroup); + if (pVgroup == NULL) break; + mgmtSendCreateVgroupMsg(pVgroup, NULL); + mgmtDecVgroupRef(pVgroup); + } + + if (oldReplica != pDb->cfg.replications) { + balanceNotify(); + } + return TSDB_CODE_SUCCESS; } @@ -799,16 +847,6 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { return; } - SVgObj *pVgroup = pDb->pHead; - if (pVgroup != NULL) { - mPrint("vgroup:%d, will be altered", pVgroup->vgId); - SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); - newMsg->ahandle = pVgroup; - newMsg->expected = pVgroup->numOfVnodes; - mgmtAlterVgroup(pVgroup, newMsg); - return; - } - mTrace("db:%s, all vgroups is altered", pDb->name); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 8afeadeb0c3b54d7a44c07e49d91479f6f966559..efa0b03ded09c95db07f9abeb9113614beb8e90a 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -336,13 +336,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { if (pStatus->dnodeId == 0) { mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); } else { - //mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); + mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); } int32_t openVnodes = htons(pStatus->openVnodes); for (int32_t j = 0; j < openVnodes; ++j) { SVnodeLoad *pVload = &pStatus->load[j]; pVload->vgId = htonl(pVload->vgId); + pVload->cfgVersion = htonl(pVload->cfgVersion); SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId); if (pVgroup == NULL) { diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index feedf5fdc3b9148e3a31d0e2c8accc27a5cf53d9..45512b86b320c5f6b706dc404f0b22c6315c2aff 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -272,8 +272,9 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo pVgroup->pointsWritten = htobe64(pVload->pointsWritten); } - if (pVload->replica != pVgroup->numOfVnodes) { - mError("dnode:%d, vgroup:%d replica:%d not match with mgmt:%d", pDnode->dnodeId, pVload->vgId, pVload->replica, + if (pVload->cfgVersion != pVgroup->pDb->cfgVersion || pVload->replica != pVgroup->numOfVnodes) { + mError("dnode:%d, vgroup:%d, vnode cfgVersion:%d repica:%d not match with mgmt cfgVersion:%d replica:%d", + pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion, pVgroup->numOfVnodes); mgmtSendCreateVgroupMsg(pVgroup, NULL); } @@ -535,6 +536,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { SMDVnodeCfg *pCfg = &pVnode->cfg; pCfg->vgId = htonl(pVgroup->vgId); + pCfg->cfgVersion = htonl(pDb->cfgVersion); pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks); pCfg->maxTables = htonl(pDb->cfg.maxTables); @@ -769,15 +771,3 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) { mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups); } - -void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle) { - assert(ahandle != NULL); - - if (pVgroup->numOfVnodes != pVgroup->pDb->cfg.replications) { - // TODO: - // mgmtSendAlterVgroupMsg(pVgroup, NULL); - } else { - mgmtAddToShellQueue(ahandle); - } -} - diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index f1b9d55d54daf224c96529499ff8bf9ffc94a512..d99cb1c96c044f83305f05829b2565f79d4993d0 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -36,6 +36,7 @@ typedef struct { void *sync; void *events; void *cq; // continuous query + int32_t cfgVersion; STsdbCfg tsdbCfg; SSyncCfg syncCfg; SWalCfg walCfg; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 153bdbffd806f4addc6aa68b9f445f5a9aab40c6..96470c4e3a81bf20651642e423fd0f3a26871db7 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -331,6 +331,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; pLoad->vgId = htonl(pVnode->vgId); + pLoad->cfgVersion = htonl(pVnode->cfgVersion); pLoad->totalStorage = htobe64(pLoad->totalStorage); pLoad->compStorage = htobe64(pLoad->compStorage); pLoad->pointsWritten = htobe64(pLoad->pointsWritten); @@ -389,6 +390,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion); len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize); len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks); len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables); @@ -463,6 +465,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { goto PARSE_OVER; } + cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion"); + if (!cfgVersion || cfgVersion->type != cJSON_Number) { + dError("pVnode:%p vgId:%d, failed to read vnode cfg, cfgVersion not found", pVnode, pVnode->vgId); + goto PARSE_OVER; + } + pVnode->cfgVersion = cfgVersion->valueint; + cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { dError("pVnode:%p vgId:%d, failed to read vnode cfg, cacheBlockSize not found", pVnode, pVnode->vgId);