diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 86a222b3e6af1109df949657fed54f8e0f7653d7..8e1696c80207de54b57c9c497e589d2b7b6137ac 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -399,7 +399,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); - void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId); + void *pVnode = vnodeAcquire(pCreate->cfg.vgId); if (pVnode != NULL) { dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); vnodeRelease(pVnode); @@ -413,7 +413,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg); - void *pVnode = vnodeAcquireVnode(pAlter->cfg.vgId); + void *pVnode = vnodeAcquire(pAlter->cfg.vgId); if (pVnode != NULL) { dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId); int32_t code = vnodeAlter(pVnode, pAlter); diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 66135a93e9e6c824ce5219a4f5ef65b230bbce95..bbea1a5e0bb52a56cfbf5a23dd29a17e094586a9 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - void *pVnode; while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - pVnode = vnodeAcquireVnode(pHead->vgId); + taos_queue queue = vnodeAcquireRqueue(pHead->vgId); - if (pVnode == NULL) { + if (queue == NULL) { leftLen -= pHead->contLen; pCont -= pHead->contLen; continue; } // put message into queue - taos_queue queue = vnodeGetRqueue(pVnode); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg = *pMsg; pRead->pCont = pCont; @@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) { // dynamically adjust the number of threads } -void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) { - SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); - pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - pRead->pCont = qhandle; - pRead->contLen = 0; - - assert(pVnode != NULL); - taos_queue queue = vnodeAcquireRqueue(pVnode); - - taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead); -} - void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { SRpcMsg rpcRsp = { .handle = pRead->rpcMsg.handle, diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index ba36e537a6c2b7a6327ef17ed0be83b880009310..dc09a03e1497cb52718279b542e3318fb2789d08 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - taos_queue queue = vnodeGetWqueue(pHead->vgId); + taos_queue queue = vnodeAcquireWqueue(pHead->vgId); if (queue) { // put message into queue SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 096aae58f2ad9cb157ba5b700581bdf52a23f6eb..b561c407a3415d7db27333d96e21a72d4f159d8b 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); void dnodeFreeVnodeWqueue(void *queue); void *dnodeAllocateVnodeRqueue(void *pVnode); void dnodeFreeVnodeRqueue(void *rqueue); -void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); int32_t dnodeAllocateMnodePqueue(); diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 972db294f62a35ba7910576534f4291c021b6272..65b91d87e479abd4c0b497cf4b95bc561db8fae5 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); typedef void (*FNotifyRole)(void *ahandle, int8_t role); // when data file is synced successfully, notity app -typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); +typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef struct { int32_t vgId; // vgroup ID diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 77c72c24516e17fcac81d648a10061537b38249f..15ddb6afee7c3ac2914df8133df24f6ef80a0a8a 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -22,10 +22,10 @@ extern "C" { typedef enum _VN_STATUS { TAOS_VN_STATUS_INIT, - TAOS_VN_STATUS_UPDATING, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING, - TAOS_VN_STATUS_DELETING, + TAOS_VN_STATUS_UPDATING, + TAOS_VN_STATUS_RESET, } EVnStatus; typedef struct { @@ -47,13 +47,10 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); -void vnodeRelease(void *pVnode); -void* vnodeAcquireVnode(int32_t vgId); // add refcount -void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged - -void* vnodeAcquireRqueue(void *); -void* vnodeGetRqueue(void *); -void* vnodeGetWqueue(int32_t vgId); +void* vnodeAcquire(int32_t vgId); // add refcount +void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue +void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue +void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index b225dfa36a4572ff3f56649e22ffdc81879d4d3c..5de61a3d57a0cc480781ec581bdc76d54419bf87 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -270,6 +270,14 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI return -1; } + /* set REUSEADDR option, so the portnumber can be re-used */ + int reuse = 1; + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + close(sockFd); + return -1; + }; + if ( clientIp != 0) { memset((char *)&clientAddr, 0, sizeof(clientAddr)); clientAddr.sin_family = AF_INET; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 77db4fd04c0f18569aec21bd459b4342a6ab568c..74cfbf1e731fd67c47f7375198a8353740429060 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -37,7 +37,7 @@ extern int32_t vDebugFlag; typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count - int status; + int8_t status; int8_t role; int8_t accessState; int64_t version; // current version @@ -55,6 +55,8 @@ typedef struct { SWalCfg walCfg; void *qMgmt; char *rootDir; + tsem_t sem; + int8_t dropped; char db[TSDB_DB_NAME_LEN]; } SVnodeObj; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 539f9c68511a96369a707265bb1a7dad462f44d2..bf98824570cff209102c6a0302ae03a82f83ed96 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); -static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } @@ -153,7 +153,7 @@ int32_t vnodeDrop(int32_t vgId) { SVnodeObj *pVnode = *ppVnode; vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount); - pVnode->status = TAOS_VN_STATUS_DELETING; + pVnode->dropped = 1; vnodeCleanUp(pVnode); return TSDB_CODE_SUCCESS; @@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // cfgVersion can be corrected by status msg - if (pVnode->status != TAOS_VN_STATUS_READY) { + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); return TSDB_CODE_SUCCESS; } - // the vnode may always fail to synchronize because of it in low cfgVersion - // so cannot use the following codes - // if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED) - // return TSDB_CODE_VND_NOT_SYNCED; - - pVnode->status = TAOS_VN_STATUS_UPDATING; - int32_t code = vnodeSaveCfg(pVnodeCfg); if (code != TSDB_CODE_SUCCESS) { pVnode->status = TAOS_VN_STATUS_READY; @@ -194,10 +187,12 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { return code; } - code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); - if (code != TSDB_CODE_SUCCESS) { - pVnode->status = TAOS_VN_STATUS_READY; - return code; + if (pVnode->tsdb) { + code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); + if (code != TSDB_CODE_SUCCESS) { + pVnode->status = TAOS_VN_STATUS_READY; + return code; + } } pVnode->status = TAOS_VN_STATUS_READY; @@ -223,6 +218,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->rootDir = strdup(rootDir); pVnode->accessState = TSDB_VN_ALL_ACCCESS; + tsem_init(&pVnode->sem, 0, 0); int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { @@ -319,7 +315,6 @@ int32_t vnodeClose(int32_t vgId) { SVnodeObj *pVnode = *ppVnode; vDebug("vgId:%d, vnode will be closed", pVnode->vgId); - pVnode->status = TAOS_VN_STATUS_CLOSING; vnodeCleanUp(pVnode); return 0; @@ -334,6 +329,8 @@ void vnodeRelease(void *pVnodeRaw) { if (refCount > 0) { vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); + if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) + tsem_post(&pVnode->sem); return; } @@ -344,11 +341,6 @@ void vnodeRelease(void *pVnodeRaw) { tsdbCloseRepo(pVnode->tsdb, 1); pVnode->tsdb = NULL; - // stop continuous query - if (pVnode->cq) - cqClose(pVnode->cq); - pVnode->cq = NULL; - if (pVnode->wal) walClose(pVnode->wal); pVnode->wal = NULL; @@ -363,20 +355,21 @@ void vnodeRelease(void *pVnodeRaw) { tfree(pVnode->rootDir); - if (pVnode->status == TAOS_VN_STATUS_DELETING) { + if (pVnode->dropped) { char rootDir[TSDB_FILENAME_LEN] = {0}; sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId); taosMvDir(tsVnodeBakDir, rootDir); taosRemoveDir(rootDir); } + tsem_destroy(&pVnode->sem); free(pVnode); int32_t count = taosHashGetSize(tsDnodeVnodesHash); vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); } -void *vnodeGetVnode(int32_t vgId) { +void *vnodeAcquire(int32_t vgId) { SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; @@ -384,35 +377,38 @@ void *vnodeGetVnode(int32_t vgId) { return NULL; } - return *ppVnode; -} - -void *vnodeAcquireVnode(int32_t vgId) { - SVnodeObj *pVnode = vnodeGetVnode(vgId); - if (pVnode == NULL) return pVnode; - + SVnodeObj *pVnode = *ppVnode; atomic_add_fetch_32(&pVnode->refCount, 1); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); return pVnode; } -void *vnodeAcquireRqueue(void *param) { - SVnodeObj *pVnode = param; +void *vnodeAcquireRqueue(int32_t vgId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) return NULL; - atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount); - return ((SVnodeObj *)pVnode)->rqueue; -} + if (pVnode->status == TAOS_VN_STATUS_RESET) { + terrno = TSDB_CODE_VND_INVALID_STATUS; + vInfo("vgId:%d, status is in reset", vgId); + vnodeRelease(pVnode); + return NULL; + } -void *vnodeGetRqueue(void *pVnode) { - return ((SVnodeObj *)pVnode)->rqueue; + return pVnode->rqueue; } -void *vnodeGetWqueue(int32_t vgId) { - SVnodeObj *pVnode = vnodeAcquireVnode(vgId); +void *vnodeAcquireWqueue(int32_t vgId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) return NULL; + + if (pVnode->status == TAOS_VN_STATUS_RESET) { + terrno = TSDB_CODE_VND_INVALID_STATUS; + vInfo("vgId:%d, status is in reset", vgId); + vnodeRelease(pVnode); + return NULL; + } + return pVnode->wqueue; } @@ -484,7 +480,7 @@ void vnodeBuildStatusMsg(void *param) { void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { for (int32_t i = 0; i < numOfVnodes; ++i) { pAccess[i].vgId = htonl(pAccess[i].vgId); - SVnodeObj *pVnode = vnodeAcquireVnode(pAccess[i].vgId); + SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId); if (pVnode != NULL) { pVnode->accessState = pAccess[i].accessState; if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { @@ -498,11 +494,29 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + int i = 0; + + if (pVnode->status != TAOS_VN_STATUS_INIT) { + // it may be in updateing or reset state, then it shall wait + while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) { + if (++i % 1000 == 0) { + sched_yield(); + } + } + } // stop replication module if (pVnode->sync) { - syncStop(pVnode->sync); + void *sync = pVnode->sync; pVnode->sync = NULL; + syncStop(sync); + } + + // stop continuous query + if (pVnode->cq) { + void *cq = pVnode->cq; + pVnode->cq = NULL; + cqClose(cq); } vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); @@ -549,18 +563,25 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { cqStop(pVnode->cq); } -static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { - SVnodeObj *pVnode = ahandle; - vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion); - - pVnode->fversion = fversion; - pVnode->version = fversion; - vnodeSaveVersion(pVnode); - +static int vnodeResetTsdb(SVnodeObj *pVnode) +{ char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); - // clsoe tsdb, then open tsdb - tsdbCloseRepo(pVnode->tsdb, 0); + + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) + return -1; + + void *tsdb = pVnode->tsdb; + pVnode->tsdb = NULL; + + // acquire vnode + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + + if (refCount > 2) + tsem_wait(&pVnode->sem); + + // close tsdb, then open tsdb + tsdbCloseRepo(tsdb, 0); STsdbAppH appH = {0}; appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; @@ -569,6 +590,22 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { appH.cqDropFunc = cqDrop; appH.configFunc = dnodeSendCfgTableToRecv; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); + + pVnode->status = TAOS_VN_STATUS_READY; + vnodeRelease(pVnode); + + return 0; +} + +static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { + SVnodeObj *pVnode = ahandle; + vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion); + + pVnode->fversion = fversion; + pVnode->version = fversion; + vnodeSaveVersion(pVnode); + + return vnodeResetTsdb(pVnode); } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index f529b713cf659d923e64ba8bf49798a0dde0f195..973df7c5a10f2ba84a77718602c8ffddd4b14134 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -26,6 +26,7 @@ #include "tsdb.h" #include "vnode.h" #include "vnodeInt.h" +#include "tqueue.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); @@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return TSDB_CODE_VND_INVALID_STATUS; } + // tsdb may be in reset state + if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->status == TAOS_VN_STATUS_CLOSING) + return TSDB_CODE_RPC_NOT_READY; + // TODO: Later, let slave to support query if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); @@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } +static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + pRead->pCont = qhandle; + pRead->contLen = 0; + + atomic_add_fetch_32(&pVnode->refCount, 1); + taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); +} + static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void *pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; @@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); - dnodePutItemIntoReadQueue(pVnode, *handle); + vnodePutItemIntoReadQueue(pVnode, *handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } @@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } else { // if failed to dump result, free qhandle immediately if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) { if (qHasMoreResultsToRetrieve(*handle)) { - dnodePutItemIntoReadQueue(pVnode, *handle); + vnodePutItemIntoReadQueue(pVnode, *handle); pRet->qhandle = *handle; freeHandle = false; } else { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 09e4b43ed38091247da0a51a56a41c52f2bb010e..6b9b8ca4fd5a5c42028936e48b53f45ffa811903 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -59,13 +59,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { return TSDB_CODE_VND_NO_WRITE_AUTH; } + // tsdb may be in reset state + if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->status == TAOS_VN_STATUS_CLOSING) + return TSDB_CODE_RPC_NOT_READY; + if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status); return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state } - if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { + if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); return TSDB_CODE_RPC_NOT_READY; }