diff --git a/src/dnode/src/dnodeVMgmt.c b/src/dnode/src/dnodeVMgmt.c index 7e3807d98376ef4e9aecae8a11dc029cb235db04..bc24d1bf623ec014dd4a4ad35442218549aaf335 100644 --- a/src/dnode/src/dnodeVMgmt.c +++ b/src/dnode/src/dnodeVMgmt.c @@ -143,7 +143,7 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod); pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); - for (int32_t j = 0; j < pCreate->cfg.replications; ++j) { + for (int32_t j = 0; j < pCreate->cfg.vgReplica; ++j) { pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); } diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 69c01e6763eb6459c350e6f3536c34525eaf6668..12cff90be2d776d4ae146512937e6f632e4d15a2 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -209,9 +209,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Database memory is full for commit failed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, 0, 0x050C, "Database memory is full for waiting commit") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, 0, 0x050D, "Database is dropping") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_BALANCING, 0, 0x050E, "Database is balancing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Database write operation denied") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_SYNCING, 0, 0x0513, "Database is syncing") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, 0, 0x0513, "Database is syncing") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b4480951c906d6827e5ddaece41f8b1c42ca094d..62c06122b6a8ad5d6679f1b477c0da2779dd0539 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -664,13 +664,14 @@ typedef struct { int8_t precision; int8_t compression; int8_t walLevel; - int8_t replications; + int8_t vgReplica; int8_t wals; int8_t quorum; int8_t update; int8_t cacheLastRow; int32_t vgCfgVersion; - int8_t reserved[10]; + int8_t dbReplica; + int8_t reserved[9]; } SVnodeCfg; typedef struct { diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index b79425afbbe42c834ee21d63f531c918d46b5997..5b2e89ce16447a6cc011155f82a6db5e203e242b 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -861,11 +861,12 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { pCfg->precision = pDb->cfg.precision; pCfg->compression = pDb->cfg.compression; pCfg->walLevel = pDb->cfg.walLevel; - pCfg->replications = (int8_t) pVgroup->numOfVnodes; + pCfg->vgReplica = (int8_t) pVgroup->numOfVnodes; pCfg->wals = 3; pCfg->quorum = pDb->cfg.quorum; pCfg->update = pDb->cfg.update; pCfg->cacheLastRow = pDb->cfg.cacheLastRow; + pCfg->dbReplica = pDb->cfg.replications; SVnodeDesc *pNodes = pVnode->nodes; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index fb1868ab13cd677542d0a5a70d48e8b8ee2df829..7adeda590a509df9554c5dc83fa66758ed8ed9d2 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -45,6 +45,9 @@ typedef struct { int8_t accessState; int8_t isFull; int8_t isCommiting; + int8_t dbReplica; + int8_t dropped; + int8_t reserved; uint64_t version; // current version uint64_t cversion; // version while commit start uint64_t fversion; // version on saved data file @@ -64,7 +67,6 @@ typedef struct { void * qMgmt; char * rootDir; tsem_t sem; - int8_t dropped; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN]; pthread_mutex_t statusMutex; } SVnodeObj; diff --git a/src/vnode/src/vnodeCfg.c b/src/vnode/src/vnodeCfg.c index b47fedd46ecd34b4d763b9654a7aab1d9e1833ee..0b32f97939a04b695c641a076e863fe37d85a437 100644 --- a/src/vnode/src/vnodeCfg.c +++ b/src/vnode/src/vnodeCfg.c @@ -38,8 +38,9 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel; pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod; pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP; - pVnode->syncCfg.replica = vnodeMsg->cfg.replications; + pVnode->syncCfg.replica = vnodeMsg->cfg.vgReplica; pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum; + pVnode->dbReplica = vnodeMsg->cfg.dbReplica; for (int i = 0; i < pVnode->syncCfg.replica; ++i) { SVnodeDesc *node = &vnodeMsg->nodes[i]; @@ -203,12 +204,21 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { } vnodeMsg.cfg.wals = (int8_t)wals->valueint; - cJSON *replica = cJSON_GetObjectItem(root, "replica"); - if (!replica || replica->type != cJSON_Number) { + cJSON *vgReplica = cJSON_GetObjectItem(root, "replica"); + if (!vgReplica || vgReplica->type != cJSON_Number) { vError("vgId:%d, failed to read %s, replica not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.replications = (int8_t)replica->valueint; + vnodeMsg.cfg.vgReplica = (int8_t)vgReplica->valueint; + + cJSON *dbReplica = cJSON_GetObjectItem(root, "dbReplica"); + if (!dbReplica || dbReplica->type != cJSON_Number) { + vError("vgId:%d, failed to read %s, dbReplica not found", pVnode->vgId, file); + vnodeMsg.cfg.dbReplica = vnodeMsg.cfg.vgReplica; + vnodeMsg.cfg.vgCfgVersion = 0; + } else { + vnodeMsg.cfg.dbReplica = (int8_t)dbReplica->valueint; + } cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); if (!quorum || quorum->type != cJSON_Number) { @@ -220,8 +230,8 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow"); if (!cacheLastRow || cacheLastRow->type != cJSON_Number) { vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file); - //goto PARSE_VCFG_ERROR; vnodeMsg.cfg.cacheLastRow = 0; + vnodeMsg.cfg.vgCfgVersion = 0; } else { vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint; } @@ -233,7 +243,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { } int size = cJSON_GetArraySize(nodeInfos); - if (size != vnodeMsg.cfg.replications) { + if (size != vnodeMsg.cfg.vgReplica) { vError("vgId:%d, failed to read %s, nodeInfos size not matched", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } @@ -311,17 +321,18 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) { len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pMsg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pMsg->cfg.walLevel); len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pMsg->cfg.fsyncPeriod); - len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications); + len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.vgReplica); + len += snprintf(content + len, maxLen - len, " \"dbReplica\": %d,\n", pMsg->cfg.dbReplica); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum); len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow); len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); - for (int32_t i = 0; i < pMsg->cfg.replications; i++) { + for (int32_t i = 0; i < pMsg->cfg.vgReplica; i++) { SVnodeDesc *node = &pMsg->nodes[i]; dnodeUpdateEp(node->nodeId, node->nodeEp, NULL, NULL); len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", node->nodeId); len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", node->nodeEp); - if (i < pMsg->cfg.replications - 1) { + if (i < pMsg->cfg.vgReplica - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { len += snprintf(content + len, maxLen - len, " }]\n"); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 7cbe73fd45ab564dff1c08a9f3ec47f53acc05dc..a3a88e8b7b143684f047ac395b91e830c2884fe8 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -108,6 +108,13 @@ static int32_t vnodeCheckWrite(void *vparam) { return TSDB_CODE_VND_NO_WRITE_AUTH; } + if (pVnode->dbReplica != pVnode->syncCfg.replica && + pVnode->syncCfg.nodeInfo[pVnode->syncCfg.replica - 1].nodeId == dnodeGetDnodeId()) { + vDebug("vgId:%d, vnode is balancing and will be dropped, dbReplica:%d vgReplica:%d, refCount:%d pVnode:%p", + pVnode->vgId, pVnode->dbReplica, pVnode->syncCfg.replica, pVnode->refCount, pVnode); + return TSDB_CODE_VND_IS_BALANCING; + } + // tsdb may be in reset state if (pVnode->tsdb == NULL) { vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); @@ -271,7 +278,7 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { SVWriteMsg *pWrite = param; SVnodeObj * pVnode = pWrite->pVnode; - int32_t code = TSDB_CODE_VND_SYNCING; + int32_t code = TSDB_CODE_VND_IS_SYNCING; if (pVnode->flowctrlLevel <= 0) code = TSDB_CODE_VND_IS_FLOWCTRL;