提交 9a9468d1 编写于 作者: S slguan

[TD-185] add db alter options

上级 b13c6c02
...@@ -1377,11 +1377,6 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1377,11 +1377,6 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = sizeof(SCMAlterDbMsg); pCmd->payloadLen = sizeof(SCMAlterDbMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB; pCmd->msgType = TSDB_MSG_TYPE_CM_ALTER_DB;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload; SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pAlterDbMsg->db, pTableMetaInfo->name); strcpy(pAlterDbMsg->db, pTableMetaInfo->name);
......
...@@ -128,6 +128,7 @@ static void dnodeCloseVnodes() { ...@@ -128,6 +128,7 @@ static void dnodeCloseVnodes() {
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables); pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize); pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks); pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
......
...@@ -505,6 +505,7 @@ typedef struct SRetrieveTableRsp { ...@@ -505,6 +505,7 @@ typedef struct SRetrieveTableRsp {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t cfgVersion;
int64_t totalStorage; int64_t totalStorage;
int64_t compStorage; int64_t compStorage;
int64_t pointsWritten; int64_t pointsWritten;
...@@ -599,6 +600,7 @@ typedef struct { ...@@ -599,6 +600,7 @@ typedef struct {
typedef struct { typedef struct {
uint32_t vgId; uint32_t vgId;
int32_t cfgVersion;
int32_t cacheBlockSize; int32_t cacheBlockSize;
int32_t totalBlocks; int32_t totalBlocks;
int32_t maxTables; int32_t maxTables;
......
...@@ -164,6 +164,7 @@ typedef struct SDbObj { ...@@ -164,6 +164,7 @@ typedef struct SDbObj {
char name[TSDB_DB_NAME_LEN + 1]; char name[TSDB_DB_NAME_LEN + 1];
char acct[TSDB_USER_LEN + 1]; char acct[TSDB_USER_LEN + 1];
int64_t createdTime; int64_t createdTime;
int32_t cfgVersion;
SDbCfg cfg; SDbCfg cfg;
int8_t status; int8_t status;
int8_t reserved[14]; int8_t reserved[14];
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "ttime.h" #include "ttime.h"
#include "tname.h" #include "tname.h"
#include "tbalance.h"
#include "mgmtDef.h" #include "mgmtDef.h"
#include "mgmtLog.h" #include "mgmtLog.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
...@@ -715,37 +716,63 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { ...@@ -715,37 +716,63 @@ static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { static SDbCfg mgmtGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
SDbCfg newCfg = pDb->cfg; SDbCfg newCfg = pDb->cfg;
int32_t daysToKeep = htonl(pAlter->daysToKeep); int32_t cacheBlockSize = htonl(pAlter->daysToKeep);
int32_t totalBlocks = htonl(pAlter->totalBlocks);
int32_t maxTables = htonl(pAlter->maxSessions); int32_t maxTables = htonl(pAlter->maxSessions);
int32_t daysToKeep = htonl(pAlter->daysToKeep);
int32_t daysToKeep1 = htonl(pAlter->daysToKeep1);
int32_t daysToKeep2 = htonl(pAlter->daysToKeep2);
int8_t compression = pAlter->compression;
int8_t replications = pAlter->replications; int8_t replications = pAlter->replications;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (cacheBlockSize > 0 && cacheBlockSize != pDb->cfg.cacheBlockSize) {
mTrace("db:%s, cache:%d change to %d", pDb->name, pDb->cfg.cacheBlockSize, cacheBlockSize);
newCfg.cacheBlockSize = cacheBlockSize;
}
if (totalBlocks > 0 && totalBlocks != pDb->cfg.totalBlocks) {
mTrace("db:%s, blocks:%d change to %d", pDb->name, pDb->cfg.totalBlocks, totalBlocks);
newCfg.totalBlocks = totalBlocks;
}
if (maxTables > 0 && maxTables != pDb->cfg.maxTables) {
mTrace("db:%s, tables:%d change to %d", pDb->name, pDb->cfg.maxTables, maxTables);
newCfg.maxTables = maxTables;
if (newCfg.maxTables < pDb->cfg.maxTables) {
mTrace("db:%s, tables:%d should larger than origin:%d", pDb->name, newCfg.maxTables, pDb->cfg.maxTables);
terrno = TSDB_CODE_INVALID_OPTION;
}
}
if (daysToKeep > 0 && daysToKeep != pDb->cfg.daysToKeep) { if (daysToKeep > 0 && daysToKeep != pDb->cfg.daysToKeep) {
mTrace("db:%s, daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, daysToKeep); mTrace("db:%s, daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, daysToKeep);
newCfg.daysToKeep = daysToKeep; newCfg.daysToKeep = daysToKeep;
} }
if (replications > 0 && replications != pDb->cfg.replications) { if (daysToKeep1 > 0 && daysToKeep1 != pDb->cfg.daysToKeep1) {
mTrace("db:%s, replica:%d change to %d", pDb->name, pDb->cfg.replications, replications); mTrace("db:%s, daysToKeep1:%d change to %d", pDb->name, pDb->cfg.daysToKeep1, daysToKeep1);
if (replications < TSDB_MIN_REPLICA_NUM || replications > TSDB_MAX_REPLICA_NUM) { newCfg.daysToKeep1 = daysToKeep1;
mError("invalid db option replica: %d valid range: %d--%d", replications, TSDB_MIN_REPLICA_NUM, TSDB_MAX_REPLICA_NUM);
terrno = TSDB_CODE_INVALID_OPTION;
} }
newCfg.replications = replications;
if (daysToKeep2 > 0 && daysToKeep2 != pDb->cfg.daysToKeep2) {
mTrace("db:%s, daysToKeep2:%d change to %d", pDb->name, pDb->cfg.daysToKeep2, daysToKeep2);
newCfg.daysToKeep2 = daysToKeep2;
} }
if (maxTables > 0 && maxTables != pDb->cfg.maxTables) { if (compression > 0 && compression != pDb->cfg.compression) {
mTrace("db:%s, tables:%d change to %d", pDb->name, pDb->cfg.maxTables, maxTables); mTrace("db:%s, compression:%d change to %d", pDb->name, pDb->cfg.compression, compression);
if (maxTables < TSDB_MIN_TABLES || maxTables > TSDB_MAX_TABLES) { newCfg.compression = compression;
mError("invalid db option tables: %d valid range: %d--%d", maxTables, TSDB_MIN_TABLES, TSDB_MAX_TABLES);
terrno = TSDB_CODE_INVALID_OPTION;
} }
if (maxTables < pDb->cfg.maxTables) {
mError("invalid db option tables: %d should larger than original:%d", maxTables, pDb->cfg.maxTables); if (replications > 0 && replications != pDb->cfg.replications) {
terrno = TSDB_CODE_INVALID_OPTION; mTrace("db:%s, replications:%d change to %d", pDb->name, pDb->cfg.replications, replications);
newCfg.replications = replications;
} }
newCfg.maxTables = maxTables; if (replications > mgmtGetDnodesNum()) {
mError("db:%s, no enough dnode to change replica:%d", pDb->name, replications);
terrno = TSDB_CODE_NO_ENOUGH_DNODES;
} }
return newCfg; return newCfg;
...@@ -757,8 +784,16 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { ...@@ -757,8 +784,16 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
return terrno; return terrno;
} }
int32_t code = mgmtCheckDbCfg(&newCfg);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t oldReplica = pDb->cfg.replications;
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
pDb->cfg = newCfg; pDb->cfg = newCfg;
pDb->cfgVersion++;
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDbSdb, .table = tsDbSdb,
...@@ -772,6 +807,19 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { ...@@ -772,6 +807,19 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
} }
} }
void *pNode = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
if (pVgroup == NULL) break;
mgmtSendCreateVgroupMsg(pVgroup, NULL);
mgmtDecVgroupRef(pVgroup);
}
if (oldReplica != pDb->cfg.replications) {
balanceNotify();
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -799,16 +847,6 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { ...@@ -799,16 +847,6 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
return; return;
} }
SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgroup:%d, will be altered", pVgroup->vgId);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtAlterVgroup(pVgroup, newMsg);
return;
}
mTrace("db:%s, all vgroups is altered", pDb->name); mTrace("db:%s, all vgroups is altered", pDb->name);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
} }
......
...@@ -336,13 +336,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -336,13 +336,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
} else { } else {
//mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess);
} }
int32_t openVnodes = htons(pStatus->openVnodes); int32_t openVnodes = htons(pStatus->openVnodes);
for (int32_t j = 0; j < openVnodes; ++j) { for (int32_t j = 0; j < openVnodes; ++j) {
SVnodeLoad *pVload = &pStatus->load[j]; SVnodeLoad *pVload = &pStatus->load[j];
pVload->vgId = htonl(pVload->vgId); pVload->vgId = htonl(pVload->vgId);
pVload->cfgVersion = htonl(pVload->cfgVersion);
SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId); SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
......
...@@ -272,8 +272,9 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo ...@@ -272,8 +272,9 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo
pVgroup->pointsWritten = htobe64(pVload->pointsWritten); pVgroup->pointsWritten = htobe64(pVload->pointsWritten);
} }
if (pVload->replica != pVgroup->numOfVnodes) { if (pVload->cfgVersion != pVgroup->pDb->cfgVersion || pVload->replica != pVgroup->numOfVnodes) {
mError("dnode:%d, vgroup:%d replica:%d not match with mgmt:%d", pDnode->dnodeId, pVload->vgId, pVload->replica, mError("dnode:%d, vgroup:%d, vnode cfgVersion:%d repica:%d not match with mgmt cfgVersion:%d replica:%d",
pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion,
pVgroup->numOfVnodes); pVgroup->numOfVnodes);
mgmtSendCreateVgroupMsg(pVgroup, NULL); mgmtSendCreateVgroupMsg(pVgroup, NULL);
} }
...@@ -535,6 +536,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { ...@@ -535,6 +536,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
SMDVnodeCfg *pCfg = &pVnode->cfg; SMDVnodeCfg *pCfg = &pVnode->cfg;
pCfg->vgId = htonl(pVgroup->vgId); pCfg->vgId = htonl(pVgroup->vgId);
pCfg->cfgVersion = htonl(pDb->cfgVersion);
pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks); pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks);
pCfg->maxTables = htonl(pDb->cfg.maxTables); pCfg->maxTables = htonl(pDb->cfg.maxTables);
...@@ -769,15 +771,3 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) { ...@@ -769,15 +771,3 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups); mPrint("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups);
} }
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle) {
assert(ahandle != NULL);
if (pVgroup->numOfVnodes != pVgroup->pDb->cfg.replications) {
// TODO:
// mgmtSendAlterVgroupMsg(pVgroup, NULL);
} else {
mgmtAddToShellQueue(ahandle);
}
}
...@@ -36,6 +36,7 @@ typedef struct { ...@@ -36,6 +36,7 @@ typedef struct {
void *sync; void *sync;
void *events; void *events;
void *cq; // continuous query void *cq; // continuous query
int32_t cfgVersion;
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
SSyncCfg syncCfg; SSyncCfg syncCfg;
SWalCfg walCfg; SWalCfg walCfg;
......
...@@ -331,6 +331,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { ...@@ -331,6 +331,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId); pLoad->vgId = htonl(pVnode->vgId);
pLoad->cfgVersion = htonl(pVnode->cfgVersion);
pLoad->totalStorage = htobe64(pLoad->totalStorage); pLoad->totalStorage = htobe64(pLoad->totalStorage);
pLoad->compStorage = htobe64(pLoad->compStorage); pLoad->compStorage = htobe64(pLoad->compStorage);
pLoad->pointsWritten = htobe64(pLoad->pointsWritten); pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
...@@ -389,6 +390,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -389,6 +390,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion);
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize); len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks); len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks);
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables); len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
...@@ -463,6 +465,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -463,6 +465,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
if (!cfgVersion || cfgVersion->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, cfgVersion not found", pVnode, pVnode->vgId);
goto PARSE_OVER;
}
pVnode->cfgVersion = cfgVersion->valueint;
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, cacheBlockSize not found", pVnode, pVnode->vgId); dError("pVnode:%p vgId:%d, failed to read vnode cfg, cacheBlockSize not found", pVnode, pVnode->vgId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册