提交 ca3dd878 编写于 作者: S Shengliang Guan

TD-2289

上级 4a78a061
......@@ -23,13 +23,6 @@ extern "C" {
#include "trpc.h"
#include "twal.h"
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING = 3,
TAOS_VN_STATUS_RESET = 4,
} EVnodeStatus;
typedef struct {
int32_t len;
......
......@@ -24,6 +24,7 @@ extern "C" {
#include "tsync.h"
#include "twal.h"
#include "tcq.h"
#include "tsdb.h"
extern int32_t vDebugFlag;
......@@ -63,6 +64,7 @@ typedef struct {
tsem_t sem;
int8_t dropped;
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
pthread_mutex_t statusMutex;
} SVnodeObj;
void vnodeInitWriteFp(void);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_STATUS_H
#define TDENGINE_VNODE_STATUS_H
#ifdef __cplusplus
extern "C" {
#endif
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING = 3,
TAOS_VN_STATUS_RESET = 4,
} EVnodeStatus;
bool vnodeSetInitStatus(SVnodeObj* pVnode);
bool vnodeSetReadyStatus(SVnodeObj* pVnode);
bool vnodeSetClosingStatus(SVnodeObj* pVnode);
bool vnodeSetUpdatingStatus(SVnodeObj* pVnode);
bool vnodeSetResetStatus(SVnodeObj* pVnode);
bool vnodeInInitStatus(SVnodeObj* pVnode);
bool vnodeInReadyStatus(SVnodeObj* pVnode);
bool vnodeInClosingStatus(SVnodeObj* pVnode);
bool vnodeInResetStatus(SVnodeObj* pVnode);
#ifdef __cplusplus
}
#endif
#endif
......@@ -19,14 +19,14 @@
#include "taosmsg.h"
#include "tglobal.h"
#include "trpc.h"
#include "tsdb.h"
#include "tutil.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "query.h"
#include "dnode.h"
#include "vnodeCfg.h"
#include "vnodeStatus.h"
#include "vnodeVersion.h"
#include "query.h"
#include "dnode.h"
#include "dnodeVWrite.h"
#include "dnodeVRead.h"
#include "tfs.h"
......@@ -52,14 +52,6 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *cfg) { return 0; }
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {}
#endif
char* vnodeStatus[] = {
"init",
"ready",
"closing",
"updating",
"reset"
};
int32_t vnodeInitResources() {
int32_t code = syncInit();
if (code != 0) return code;
......@@ -168,52 +160,57 @@ int32_t vnodeDrop(int32_t vgId) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = vparam;
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
int32_t code = vnodeWriteCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = walAlter(pVnode->wal, &pVnode->walCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
}
if (pVnode->tsdb) {
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
return code;
}
}
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is altered", pVnode->vgId);
return 0;
}
return TSDB_CODE_SUCCESS;
int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = vparam;
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg
if (vnodeSetUpdatingStatus(pVnode) != 0) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
int32_t code = vnodeAlterImp(pVnode, pVnodeCfg);
vnodeSetReadyStatus(pVnode);
if (code != 0) {
vError("vgId:%d, failed to alter vnode, code:0x%x", pVnode->vgId, code);
} else {
vDebug("vgId:%d, vnode is altered", pVnode->vgId);
}
return code;
}
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
......@@ -228,13 +225,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
atomic_add_fetch_32(&pVnode->refCount, 1);
pVnode->vgId = vnode;
pVnode->status = TAOS_VN_STATUS_INIT;
pVnode->fversion = 0;
pVnode->version = 0;
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
pVnode->rootDir = strdup(rootDir);
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
tsem_init(&pVnode->sem, 0, 0);
pthread_mutex_init(&pVnode->statusMutex, NULL);
vnodeSetInitStatus(pVnode);
int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
......@@ -360,7 +358,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
}
#endif
pVnode->status = TAOS_VN_STATUS_READY;
vnodeSetReadyStatus(pVnode);
return TSDB_CODE_SUCCESS;
}
......@@ -386,7 +384,7 @@ void vnodeRelease(void *vparam) {
assert(refCount >= 0);
if (refCount > 0) {
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount <= 3) {
if (vnodeInResetStatus(pVnode) && refCount <= 3) {
tsem_post(&pVnode->sem);
}
return;
......@@ -455,6 +453,7 @@ void vnodeRelease(void *vparam) {
}
tsem_destroy(&pVnode->sem);
pthread_mutex_destroy(&pVnode->statusMutex);
free(pVnode);
tsdbDecCommitRef(vgId);
......@@ -495,7 +494,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
int64_t compStorage = 0;
int64_t pointsWritten = 0;
if (pVnode->status != TAOS_VN_STATUS_READY) return;
if (!vnodeInReadyStatus(pVnode)) return;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
if (pVnode->tsdb) {
......@@ -565,11 +564,10 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed
taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t));
if (pVnode->status != TAOS_VN_STATUS_INIT) {
if (!vnodeInInitStatus(pVnode)) {
// it may be in updateing or reset state, then it shall wait
int32_t i = 0;
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) !=
TAOS_VN_STATUS_READY) {
while (!vnodeSetClosingStatus(pVnode)) {
if (++i % 1000 == 0) {
sched_yield();
}
......@@ -605,7 +603,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
pVnode->isCommiting = 1;
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
if (pVnode->status != TAOS_VN_STATUS_INIT) {
if (!vnodeInInitStatus(pVnode)) {
return walRenew(pVnode->wal);
}
return 0;
......@@ -615,7 +613,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
pVnode->isCommiting = 0;
pVnode->isFull = 0;
if (pVnode->status != TAOS_VN_STATUS_INIT) {
if (!vnodeInInitStatus(pVnode)) {
walRemoveOneOldFile(pVnode->wal);
}
return vnodeSaveVersion(pVnode);
......@@ -691,8 +689,8 @@ static int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
char rootDir[128] = "\0";
sprintf(rootDir, "vnode/vnode%d/tsdb", pVnode->vgId);
if (pVnode->status != TAOS_VN_STATUS_CLOSING && pVnode->status != TAOS_VN_STATUS_INIT) {
pVnode->status = TAOS_VN_STATUS_RESET;
if (!vnodeSetResetStatus(pVnode)) {
return -1;
}
void *tsdb = pVnode->tsdb;
......@@ -715,7 +713,7 @@ static int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
appH.cqDropFunc = cqDrop;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
pVnode->status = TAOS_VN_STATUS_READY;
vnodeSetReadyStatus(pVnode);
vnodeRelease(pVnode);
return 0;
......
......@@ -24,6 +24,7 @@
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "vnodeStatus.h"
#include "tqueue.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
......@@ -54,7 +55,7 @@ int32_t vnodeProcessRead(void *vparam, SVReadMsg *pRead) {
}
static int32_t vnodeCheckRead(SVnodeObj *pVnode) {
if (pVnode->status != TAOS_VN_STATUS_READY) {
if (!vnodeInReadyStatus(pVnode)) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "vnodeStatus.h"
char* vnodeStatus[] = {
"init",
"ready",
"closing",
"updating",
"reset"
};
bool vnodeSetInitStatus(SVnodeObj* pVnode) {
pthread_mutex_lock(&pVnode->statusMutex);
pVnode->status = TAOS_VN_STATUS_INIT;
pthread_mutex_unlock(&pVnode->statusMutex);
return true;
}
bool vnodeSetReadyStatus(SVnodeObj* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_INIT || pVnode->status == TAOS_VN_STATUS_READY ||
pVnode->status == TAOS_VN_STATUS_UPDATING || pVnode->status == TAOS_VN_STATUS_RESET) {
pVnode->status = TAOS_VN_STATUS_READY;
set = true;
} else {
vDebug("vgId:%d, cannot set status:ready, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]);
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY) {
pVnode->status = TAOS_VN_STATUS_CLOSING;
set = true;
} else {
vTrace("vgId:%d, cannot set status:closing, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]);
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeSetUpdatingStatus(SVnodeObj* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY) {
pVnode->status = TAOS_VN_STATUS_UPDATING;
set = true;
} else {
vDebug("vgId:%d, cannot set status:updating, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]);
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeSetResetStatus(SVnodeObj* pVnode) {
bool set = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status != TAOS_VN_STATUS_CLOSING && pVnode->status != TAOS_VN_STATUS_INIT) {
pVnode->status = TAOS_VN_STATUS_RESET;
set = true;
} else {
vDebug("vgId:%d, cannot set status:reset, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]);
}
pthread_mutex_unlock(&pVnode->statusMutex);
return set;
}
bool vnodeInInitStatus(SVnodeObj* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_INIT) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
bool vnodeInReadyStatus(SVnodeObj* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_READY) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
bool vnodeInClosingStatus(SVnodeObj* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
bool vnodeInResetStatus(SVnodeObj* pVnode) {
bool in = false;
pthread_mutex_lock(&pVnode->statusMutex);
if (pVnode->status == TAOS_VN_STATUS_RESET) {
in = true;
}
pthread_mutex_unlock(&pVnode->statusMutex);
return in;
}
......@@ -27,6 +27,7 @@
#include "tdataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "vnodeStatus.h"
#include "syncInt.h"
#include "tcq.h"
#include "dnode.h"
......@@ -68,7 +69,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
taosMsg[pHead->msgType], qtypeStr[qtype], pHead->version, pVnode->version);
if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) {
if (!vnodeInReadyStatus(pVnode)) {
vDebug("vgId:%d, msg:%s not processed since vstatus:%d, qtype:%s hver:%" PRIu64, pVnode->vgId,
taosMsg[pHead->msgType], pVnode->status, qtypeStr[qtype], pHead->version);
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
......@@ -118,7 +119,7 @@ static int32_t vnodeCheckWrite(void *vparam) {
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
if (vnodeInClosingStatus(pVnode)) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册