From afdd343e1e15880a304fd7ee8f93042286216eb6 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Wed, 15 Jul 2020 15:14:32 +0000 Subject: [PATCH] minor changes --- src/vnode/src/vnodeMain.c | 71 ++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 29aeaf2b1f..3bf0ca9f55 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // cfgVersion can be corrected by status msg - if (pVnode->status != TAOS_VN_STATUS_READY) { + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); return TSDB_CODE_SUCCESS; } - // the vnode may always fail to synchronize because of it in low cfgVersion - // so cannot use the following codes - // if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED) - // return TSDB_CODE_VND_NOT_SYNCED; - - pVnode->status = TAOS_VN_STATUS_UPDATING; - int32_t code = vnodeSaveCfg(pVnodeCfg); if (code != TSDB_CODE_SUCCESS) { pVnode->status = TAOS_VN_STATUS_READY; @@ -403,12 +396,6 @@ void *vnodeAcquireVnode(int32_t vgId) { } SVnodeObj *pVnode = *ppVnode; - if (pVnode->status == TAOS_VN_STATUS_RESET) { - terrno = TSDB_CODE_VND_INVALID_STATUS; - vInfo("vgId:%d, status is in reset", vgId); - return NULL; - } - atomic_add_fetch_32(&pVnode->refCount, 1); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); @@ -418,12 +405,28 @@ void *vnodeAcquireVnode(int32_t vgId) { void *vnodeAcquireRqueue(int32_t vgId) { SVnodeObj *pVnode = vnodeAcquireVnode(vgId); if (pVnode == NULL) return NULL; + + if (pVnode->status == TAOS_VN_STATUS_RESET) { + terrno = TSDB_CODE_VND_INVALID_STATUS; + vInfo("vgId:%d, status is in reset", vgId); + vnodeRelease(pVnode); + return NULL; + } + return pVnode->rqueue; } void *vnodeAcquireWqueue(int32_t vgId) { SVnodeObj *pVnode = vnodeAcquireVnode(vgId); if (pVnode == NULL) return NULL; + + if (pVnode->status == TAOS_VN_STATUS_RESET) { + terrno = TSDB_CODE_VND_INVALID_STATUS; + vInfo("vgId:%d, status is in reset", vgId); + vnodeRelease(pVnode); + return NULL; + } + return pVnode->wqueue; } @@ -509,8 +512,16 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + int i = 0; - pVnode->status = TSDB_VN_STATUS_CLOSING; + if (pVnode->status != TAOS_VN_STATUS_INIT) { + // it may be in updateing or reset state, then it shall wait + while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) { + if (++i % 1000 == 0) { + sched_yield(); + } + } + } // stop replication module if (pVnode->sync) { @@ -562,20 +573,17 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { cqStop(pVnode->cq); } -static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { - SVnodeObj *pVnode = ahandle; - vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion); - - pVnode->fversion = fversion; - pVnode->version = fversion; - vnodeSaveVersion(pVnode); - +static int vnodeResetTsdb(SVnodeObj *pVnode) +{ char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) return -1; + void *tsdb = pVnode->tsdb; + pVnode->tsdb = NULL; + // acquire vnode int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); @@ -583,9 +591,6 @@ static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { tsem_wait(&pVnode->sem); // close tsdb, then open tsdb - void *tsdb = pVnode->tsdb; - - pVnode->tsdb = NULL; tsdbCloseRepo(tsdb, 0); STsdbAppH appH = {0}; appH.appH = (void *)pVnode; @@ -596,13 +601,23 @@ static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { appH.configFunc = dnodeSendCfgTableToRecv; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); - // vnode status may be changed to DELETING or CLOSING - atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_RESET, TAOS_VN_STATUS_READY); + pVnode->status = TAOS_VN_STATUS_READY; vnodeRelease(pVnode); return 0; } +static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { + SVnodeObj *pVnode = ahandle; + vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion); + + pVnode->fversion = fversion; + pVnode->version = fversion; + vnodeSaveVersion(pVnode); + + return vnodeResetTsdb(pVnode); +} + static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId); -- GitLab