diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 5f643295d6e3d70999de55da47470ad0f0c0df93..6b1c3a87f9b84aa53bd78f5ab3a17717bf4849e0 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -23,13 +23,6 @@ extern "C" { #include "trpc.h" #include "twal.h" -typedef enum _VN_STATUS { - TAOS_VN_STATUS_INIT = 0, - TAOS_VN_STATUS_READY = 1, - TAOS_VN_STATUS_CLOSING = 2, - TAOS_VN_STATUS_UPDATING = 3, - TAOS_VN_STATUS_RESET = 4, -} EVnodeStatus; typedef struct { int32_t len; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index e468c2e83e1560b0711e42d827da9604473f47b0..7d03df5ecf57256cc2499a2130693010db36460d 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -24,6 +24,7 @@ extern "C" { #include "tsync.h" #include "twal.h" #include "tcq.h" +#include "tsdb.h" extern int32_t vDebugFlag; @@ -63,6 +64,7 @@ typedef struct { tsem_t sem; int8_t dropped; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN]; + pthread_mutex_t statusMutex; } SVnodeObj; void vnodeInitWriteFp(void); diff --git a/src/vnode/inc/vnodeStatus.h b/src/vnode/inc/vnodeStatus.h new file mode 100644 index 0000000000000000000000000000000000000000..911f48e3924d61b5dce19f34120967c5bff9e002 --- /dev/null +++ b/src/vnode/inc/vnodeStatus.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_STATUS_H +#define TDENGINE_VNODE_STATUS_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum _VN_STATUS { + TAOS_VN_STATUS_INIT = 0, + TAOS_VN_STATUS_READY = 1, + TAOS_VN_STATUS_CLOSING = 2, + TAOS_VN_STATUS_UPDATING = 3, + TAOS_VN_STATUS_RESET = 4, +} EVnodeStatus; + +bool vnodeSetInitStatus(SVnodeObj* pVnode); +bool vnodeSetReadyStatus(SVnodeObj* pVnode); +bool vnodeSetClosingStatus(SVnodeObj* pVnode); +bool vnodeSetUpdatingStatus(SVnodeObj* pVnode); +bool vnodeSetResetStatus(SVnodeObj* pVnode); + +bool vnodeInInitStatus(SVnodeObj* pVnode); +bool vnodeInReadyStatus(SVnodeObj* pVnode); +bool vnodeInClosingStatus(SVnodeObj* pVnode); +bool vnodeInResetStatus(SVnodeObj* pVnode); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 6452cc449950a175f0d31ca771cc50e6fb84577b..3a4881672207ed523aca8224178f6750b6be095b 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -19,14 +19,14 @@ #include "taosmsg.h" #include "tglobal.h" #include "trpc.h" -#include "tsdb.h" #include "tutil.h" #include "vnode.h" #include "vnodeInt.h" -#include "query.h" -#include "dnode.h" #include "vnodeCfg.h" +#include "vnodeStatus.h" #include "vnodeVersion.h" +#include "query.h" +#include "dnode.h" #include "dnodeVWrite.h" #include "dnodeVRead.h" #include "tfs.h" @@ -52,14 +52,6 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *cfg) { return 0; } void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {} #endif -char* vnodeStatus[] = { - "init", - "ready", - "closing", - "updating", - "reset" -}; - int32_t vnodeInitResources() { int32_t code = syncInit(); if (code != 0) return code; @@ -168,52 +160,57 @@ int32_t vnodeDrop(int32_t vgId) { return TSDB_CODE_SUCCESS; } -int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) { - SVnodeObj *pVnode = vparam; - - // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS - // cfgVersion can be corrected by status msg - if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) { - vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); - return TSDB_CODE_SUCCESS; - } - +static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) { int32_t code = vnodeWriteCfg(pVnodeCfg); if (code != TSDB_CODE_SUCCESS) { - pVnode->status = TAOS_VN_STATUS_READY; return code; } code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { - pVnode->status = TAOS_VN_STATUS_READY; return code; } code = walAlter(pVnode->wal, &pVnode->walCfg); if (code != TSDB_CODE_SUCCESS) { - pVnode->status = TAOS_VN_STATUS_READY; return code; } code = syncReconfig(pVnode->sync, &pVnode->syncCfg); if (code != TSDB_CODE_SUCCESS) { - pVnode->status = TAOS_VN_STATUS_READY; return code; - } + } if (pVnode->tsdb) { code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); if (code != TSDB_CODE_SUCCESS) { - pVnode->status = TAOS_VN_STATUS_READY; - return code; + return code; } } - pVnode->status = TAOS_VN_STATUS_READY; - vDebug("vgId:%d, vnode is altered", pVnode->vgId); + return 0; +} - return TSDB_CODE_SUCCESS; +int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) { + SVnodeObj *pVnode = vparam; + + // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS + // cfgVersion can be corrected by status msg + if (vnodeSetUpdatingStatus(pVnode) != 0) { + vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); + return TSDB_CODE_SUCCESS; + } + + int32_t code = vnodeAlterImp(pVnode, pVnodeCfg); + vnodeSetReadyStatus(pVnode); + + if (code != 0) { + vError("vgId:%d, failed to alter vnode, code:0x%x", pVnode->vgId, code); + } else { + vDebug("vgId:%d, vnode is altered", pVnode->vgId); + } + + return code; } int32_t vnodeOpen(int32_t vnode, char *rootDir) { @@ -228,13 +225,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { atomic_add_fetch_32(&pVnode->refCount, 1); pVnode->vgId = vnode; - pVnode->status = TAOS_VN_STATUS_INIT; pVnode->fversion = 0; pVnode->version = 0; pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->rootDir = strdup(rootDir); pVnode->accessState = TSDB_VN_ALL_ACCCESS; tsem_init(&pVnode->sem, 0, 0); + pthread_mutex_init(&pVnode->statusMutex, NULL); + vnodeSetInitStatus(pVnode); int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { @@ -360,7 +358,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } #endif - pVnode->status = TAOS_VN_STATUS_READY; + vnodeSetReadyStatus(pVnode); return TSDB_CODE_SUCCESS; } @@ -386,7 +384,7 @@ void vnodeRelease(void *vparam) { assert(refCount >= 0); if (refCount > 0) { - if (pVnode->status == TAOS_VN_STATUS_RESET && refCount <= 3) { + if (vnodeInResetStatus(pVnode) && refCount <= 3) { tsem_post(&pVnode->sem); } return; @@ -455,6 +453,7 @@ void vnodeRelease(void *vparam) { } tsem_destroy(&pVnode->sem); + pthread_mutex_destroy(&pVnode->statusMutex); free(pVnode); tsdbDecCommitRef(vgId); @@ -495,7 +494,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) { int64_t compStorage = 0; int64_t pointsWritten = 0; - if (pVnode->status != TAOS_VN_STATUS_READY) return; + if (!vnodeInReadyStatus(pVnode)) return; if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; if (pVnode->tsdb) { @@ -565,11 +564,10 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t)); - if (pVnode->status != TAOS_VN_STATUS_INIT) { + if (!vnodeInInitStatus(pVnode)) { // it may be in updateing or reset state, then it shall wait int32_t i = 0; - while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != - TAOS_VN_STATUS_READY) { + while (!vnodeSetClosingStatus(pVnode)) { if (++i % 1000 == 0) { sched_yield(); } @@ -605,7 +603,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { pVnode->isCommiting = 1; pVnode->fversion = pVnode->version; vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); - if (pVnode->status != TAOS_VN_STATUS_INIT) { + if (!vnodeInInitStatus(pVnode)) { return walRenew(pVnode->wal); } return 0; @@ -615,7 +613,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); pVnode->isCommiting = 0; pVnode->isFull = 0; - if (pVnode->status != TAOS_VN_STATUS_INIT) { + if (!vnodeInInitStatus(pVnode)) { walRemoveOneOldFile(pVnode->wal); } return vnodeSaveVersion(pVnode); @@ -691,8 +689,8 @@ static int32_t vnodeResetTsdb(SVnodeObj *pVnode) { char rootDir[128] = "\0"; sprintf(rootDir, "vnode/vnode%d/tsdb", pVnode->vgId); - if (pVnode->status != TAOS_VN_STATUS_CLOSING && pVnode->status != TAOS_VN_STATUS_INIT) { - pVnode->status = TAOS_VN_STATUS_RESET; + if (!vnodeSetResetStatus(pVnode)) { + return -1; } void *tsdb = pVnode->tsdb; @@ -715,7 +713,7 @@ static int32_t vnodeResetTsdb(SVnodeObj *pVnode) { appH.cqDropFunc = cqDrop; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); - pVnode->status = TAOS_VN_STATUS_READY; + vnodeSetReadyStatus(pVnode); vnodeRelease(pVnode); return 0; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 5ef79cfbf01fb49ebbfd4603b8a9163b3ae9bc32..3f7e54d46da251398b1fb042e04b196129097373 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -24,6 +24,7 @@ #include "tsdb.h" #include "vnode.h" #include "vnodeInt.h" +#include "vnodeStatus.h" #include "tqueue.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); @@ -54,7 +55,7 @@ int32_t vnodeProcessRead(void *vparam, SVReadMsg *pRead) { } static int32_t vnodeCheckRead(SVnodeObj *pVnode) { - if (pVnode->status != TAOS_VN_STATUS_READY) { + if (!vnodeInReadyStatus(pVnode)) { vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY; diff --git a/src/vnode/src/vnodeStatus.c b/src/vnode/src/vnodeStatus.c new file mode 100644 index 0000000000000000000000000000000000000000..4dce0bd9619b624799e333c8824c19960fc9f26d --- /dev/null +++ b/src/vnode/src/vnodeStatus.c @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosmsg.h" +#include "vnode.h" +#include "vnodeInt.h" +#include "vnodeStatus.h" + +char* vnodeStatus[] = { + "init", + "ready", + "closing", + "updating", + "reset" +}; + +bool vnodeSetInitStatus(SVnodeObj* pVnode) { + pthread_mutex_lock(&pVnode->statusMutex); + pVnode->status = TAOS_VN_STATUS_INIT; + pthread_mutex_unlock(&pVnode->statusMutex); + return true; +} + +bool vnodeSetReadyStatus(SVnodeObj* pVnode) { + bool set = false; + pthread_mutex_lock(&pVnode->statusMutex); + + if (pVnode->status == TAOS_VN_STATUS_INIT || pVnode->status == TAOS_VN_STATUS_READY || + pVnode->status == TAOS_VN_STATUS_UPDATING || pVnode->status == TAOS_VN_STATUS_RESET) { + pVnode->status = TAOS_VN_STATUS_READY; + set = true; + } else { + vDebug("vgId:%d, cannot set status:ready, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]); + } + + pthread_mutex_unlock(&pVnode->statusMutex); + return set; +} + +bool vnodeSetClosingStatus(SVnodeObj* pVnode) { + bool set = false; + pthread_mutex_lock(&pVnode->statusMutex); + + if (pVnode->status == TAOS_VN_STATUS_READY) { + pVnode->status = TAOS_VN_STATUS_CLOSING; + set = true; + } else { + vTrace("vgId:%d, cannot set status:closing, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]); + } + + pthread_mutex_unlock(&pVnode->statusMutex); + return set; +} + +bool vnodeSetUpdatingStatus(SVnodeObj* pVnode) { + bool set = false; + pthread_mutex_lock(&pVnode->statusMutex); + + if (pVnode->status == TAOS_VN_STATUS_READY) { + pVnode->status = TAOS_VN_STATUS_UPDATING; + set = true; + } else { + vDebug("vgId:%d, cannot set status:updating, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]); + } + + pthread_mutex_unlock(&pVnode->statusMutex); + return set; +} + +bool vnodeSetResetStatus(SVnodeObj* pVnode) { + bool set = false; + pthread_mutex_lock(&pVnode->statusMutex); + + if (pVnode->status != TAOS_VN_STATUS_CLOSING && pVnode->status != TAOS_VN_STATUS_INIT) { + pVnode->status = TAOS_VN_STATUS_RESET; + set = true; + } else { + vDebug("vgId:%d, cannot set status:reset, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]); + } + + pthread_mutex_unlock(&pVnode->statusMutex); + return set; +} + +bool vnodeInInitStatus(SVnodeObj* pVnode) { + bool in = false; + pthread_mutex_lock(&pVnode->statusMutex); + + if (pVnode->status == TAOS_VN_STATUS_INIT) { + in = true; + } + + pthread_mutex_unlock(&pVnode->statusMutex); + return in; +} + +bool vnodeInReadyStatus(SVnodeObj* pVnode) { + bool in = false; + pthread_mutex_lock(&pVnode->statusMutex); + + if (pVnode->status == TAOS_VN_STATUS_READY) { + in = true; + } + + pthread_mutex_unlock(&pVnode->statusMutex); + return in; +} + +bool vnodeInClosingStatus(SVnodeObj* pVnode) { + bool in = false; + pthread_mutex_lock(&pVnode->statusMutex); + + if (pVnode->status == TAOS_VN_STATUS_CLOSING) { + in = true; + } + + pthread_mutex_unlock(&pVnode->statusMutex); + return in; +} + +bool vnodeInResetStatus(SVnodeObj* pVnode) { + bool in = false; + pthread_mutex_lock(&pVnode->statusMutex); + + if (pVnode->status == TAOS_VN_STATUS_RESET) { + in = true; + } + + pthread_mutex_unlock(&pVnode->statusMutex); + return in; +} diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 268d1fb53b35f485ebefddf8467062b17d1950d0..91fc6d399897b21c6cfa96eafd3e67333799a210 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -27,6 +27,7 @@ #include "tdataformat.h" #include "vnode.h" #include "vnodeInt.h" +#include "vnodeStatus.h" #include "syncInt.h" #include "tcq.h" #include "dnode.h" @@ -68,7 +69,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara taosMsg[pHead->msgType], qtypeStr[qtype], pHead->version, pVnode->version); if (pHead->version == 0) { // from client or CQ - if (pVnode->status != TAOS_VN_STATUS_READY) { + if (!vnodeInReadyStatus(pVnode)) { vDebug("vgId:%d, msg:%s not processed since vstatus:%d, qtype:%s hver:%" PRIu64, pVnode->vgId, taosMsg[pHead->msgType], pVnode->status, qtypeStr[qtype], pHead->version); return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state @@ -118,7 +119,7 @@ static int32_t vnodeCheckWrite(void *vparam) { return TSDB_CODE_APP_NOT_READY; } - if (pVnode->status == TAOS_VN_STATUS_CLOSING) { + if (vnodeInClosingStatus(pVnode)) { vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY;