提交 67a8a7bf 编写于 作者: S Shengliang Guan

TD-2429

上级 c2f62c98
...@@ -143,7 +143,7 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -143,7 +143,7 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod); pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) { for (int32_t j = 0; j < pCreate->cfg.vgReplica; ++j) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
} }
......
...@@ -209,9 +209,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected ...@@ -209,9 +209,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Database memory is full for commit failed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Database memory is full for commit failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, 0, 0x050C, "Database memory is full for waiting commit") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, 0, 0x050C, "Database memory is full for waiting commit")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, 0, 0x050D, "Database is dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_BALANCING, 0, 0x050E, "Database is balancing")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Database write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Database write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_SYNCING, 0, 0x0513, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, 0, 0x0513, "Database is syncing")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID")
......
...@@ -664,13 +664,14 @@ typedef struct { ...@@ -664,13 +664,14 @@ typedef struct {
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t walLevel; int8_t walLevel;
int8_t replications; int8_t vgReplica;
int8_t wals; int8_t wals;
int8_t quorum; int8_t quorum;
int8_t update; int8_t update;
int8_t cacheLastRow; int8_t cacheLastRow;
int32_t vgCfgVersion; int32_t vgCfgVersion;
int8_t reserved[10]; int8_t dbReplica;
int8_t reserved[9];
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef struct {
......
...@@ -861,11 +861,12 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { ...@@ -861,11 +861,12 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
pCfg->precision = pDb->cfg.precision; pCfg->precision = pDb->cfg.precision;
pCfg->compression = pDb->cfg.compression; pCfg->compression = pDb->cfg.compression;
pCfg->walLevel = pDb->cfg.walLevel; pCfg->walLevel = pDb->cfg.walLevel;
pCfg->replications = (int8_t) pVgroup->numOfVnodes; pCfg->vgReplica = (int8_t) pVgroup->numOfVnodes;
pCfg->wals = 3; pCfg->wals = 3;
pCfg->quorum = pDb->cfg.quorum; pCfg->quorum = pDb->cfg.quorum;
pCfg->update = pDb->cfg.update; pCfg->update = pDb->cfg.update;
pCfg->cacheLastRow = pDb->cfg.cacheLastRow; pCfg->cacheLastRow = pDb->cfg.cacheLastRow;
pCfg->dbReplica = pDb->cfg.replications;
SVnodeDesc *pNodes = pVnode->nodes; SVnodeDesc *pNodes = pVnode->nodes;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
......
...@@ -45,6 +45,9 @@ typedef struct { ...@@ -45,6 +45,9 @@ typedef struct {
int8_t accessState; int8_t accessState;
int8_t isFull; int8_t isFull;
int8_t isCommiting; int8_t isCommiting;
int8_t dbReplica;
int8_t dropped;
int8_t reserved;
uint64_t version; // current version uint64_t version; // current version
uint64_t cversion; // version while commit start uint64_t cversion; // version while commit start
uint64_t fversion; // version on saved data file uint64_t fversion; // version on saved data file
...@@ -64,7 +67,6 @@ typedef struct { ...@@ -64,7 +67,6 @@ typedef struct {
void * qMgmt; void * qMgmt;
char * rootDir; char * rootDir;
tsem_t sem; tsem_t sem;
int8_t dropped;
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
pthread_mutex_t statusMutex; pthread_mutex_t statusMutex;
} SVnodeObj; } SVnodeObj;
......
...@@ -38,8 +38,9 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { ...@@ -38,8 +38,9 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel; pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod; pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP; pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP;
pVnode->syncCfg.replica = vnodeMsg->cfg.replications; pVnode->syncCfg.replica = vnodeMsg->cfg.vgReplica;
pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum; pVnode->syncCfg.quorum = vnodeMsg->cfg.quorum;
pVnode->dbReplica = vnodeMsg->cfg.dbReplica;
for (int i = 0; i < pVnode->syncCfg.replica; ++i) { for (int i = 0; i < pVnode->syncCfg.replica; ++i) {
SVnodeDesc *node = &vnodeMsg->nodes[i]; SVnodeDesc *node = &vnodeMsg->nodes[i];
...@@ -203,12 +204,21 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -203,12 +204,21 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
vnodeMsg.cfg.wals = (int8_t)wals->valueint; vnodeMsg.cfg.wals = (int8_t)wals->valueint;
cJSON *replica = cJSON_GetObjectItem(root, "replica"); cJSON *vgReplica = cJSON_GetObjectItem(root, "replica");
if (!replica || replica->type != cJSON_Number) { if (!vgReplica || vgReplica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, replica not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, replica not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.replications = (int8_t)replica->valueint; vnodeMsg.cfg.vgReplica = (int8_t)vgReplica->valueint;
cJSON *dbReplica = cJSON_GetObjectItem(root, "dbReplica");
if (!dbReplica || dbReplica->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, dbReplica not found", pVnode->vgId, file);
vnodeMsg.cfg.dbReplica = vnodeMsg.cfg.vgReplica;
vnodeMsg.cfg.vgCfgVersion = 0;
} else {
vnodeMsg.cfg.dbReplica = (int8_t)dbReplica->valueint;
}
cJSON *quorum = cJSON_GetObjectItem(root, "quorum"); cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
if (!quorum || quorum->type != cJSON_Number) { if (!quorum || quorum->type != cJSON_Number) {
...@@ -220,8 +230,8 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -220,8 +230,8 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow"); cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) { if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file); vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file);
//goto PARSE_VCFG_ERROR;
vnodeMsg.cfg.cacheLastRow = 0; vnodeMsg.cfg.cacheLastRow = 0;
vnodeMsg.cfg.vgCfgVersion = 0;
} else { } else {
vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint; vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint;
} }
...@@ -233,7 +243,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -233,7 +243,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
int size = cJSON_GetArraySize(nodeInfos); int size = cJSON_GetArraySize(nodeInfos);
if (size != vnodeMsg.cfg.replications) { if (size != vnodeMsg.cfg.vgReplica) {
vError("vgId:%d, failed to read %s, nodeInfos size not matched", pVnode->vgId, file); vError("vgId:%d, failed to read %s, nodeInfos size not matched", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
...@@ -311,17 +321,18 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) { ...@@ -311,17 +321,18 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pMsg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pMsg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pMsg->cfg.walLevel); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pMsg->cfg.walLevel);
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pMsg->cfg.fsyncPeriod); len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pMsg->cfg.fsyncPeriod);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.vgReplica);
len += snprintf(content + len, maxLen - len, " \"dbReplica\": %d,\n", pMsg->cfg.dbReplica);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow); len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pMsg->cfg.replications; i++) { for (int32_t i = 0; i < pMsg->cfg.vgReplica; i++) {
SVnodeDesc *node = &pMsg->nodes[i]; SVnodeDesc *node = &pMsg->nodes[i];
dnodeUpdateEp(node->nodeId, node->nodeEp, NULL, NULL); dnodeUpdateEp(node->nodeId, node->nodeEp, NULL, NULL);
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", node->nodeId); len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", node->nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", node->nodeEp); len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", node->nodeEp);
if (i < pMsg->cfg.replications - 1) { if (i < pMsg->cfg.vgReplica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n"); len += snprintf(content + len, maxLen - len, " },{\n");
} else { } else {
len += snprintf(content + len, maxLen - len, " }]\n"); len += snprintf(content + len, maxLen - len, " }]\n");
......
...@@ -108,6 +108,13 @@ static int32_t vnodeCheckWrite(void *vparam) { ...@@ -108,6 +108,13 @@ static int32_t vnodeCheckWrite(void *vparam) {
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
} }
if (pVnode->dbReplica != pVnode->syncCfg.replica &&
pVnode->syncCfg.nodeInfo[pVnode->syncCfg.replica - 1].nodeId == dnodeGetDnodeId()) {
vDebug("vgId:%d, vnode is balancing and will be dropped, dbReplica:%d vgReplica:%d, refCount:%d pVnode:%p",
pVnode->vgId, pVnode->dbReplica, pVnode->syncCfg.replica, pVnode->refCount, pVnode);
return TSDB_CODE_VND_IS_BALANCING;
}
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) { if (pVnode->tsdb == NULL) {
vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
...@@ -271,7 +278,7 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { ...@@ -271,7 +278,7 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
SVWriteMsg *pWrite = param; SVWriteMsg *pWrite = param;
SVnodeObj * pVnode = pWrite->pVnode; SVnodeObj * pVnode = pWrite->pVnode;
int32_t code = TSDB_CODE_VND_SYNCING; int32_t code = TSDB_CODE_VND_IS_SYNCING;
if (pVnode->flowctrlLevel <= 0) code = TSDB_CODE_VND_IS_FLOWCTRL; if (pVnode->flowctrlLevel <= 0) code = TSDB_CODE_VND_IS_FLOWCTRL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册