diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 66135a93e9e6c824ce5219a4f5ef65b230bbce95..bbea1a5e0bb52a56cfbf5a23dd29a17e094586a9 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - void *pVnode; while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - pVnode = vnodeAcquireVnode(pHead->vgId); + taos_queue queue = vnodeAcquireRqueue(pHead->vgId); - if (pVnode == NULL) { + if (queue == NULL) { leftLen -= pHead->contLen; pCont -= pHead->contLen; continue; } // put message into queue - taos_queue queue = vnodeGetRqueue(pVnode); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg = *pMsg; pRead->pCont = pCont; @@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) { // dynamically adjust the number of threads } -void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) { - SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); - pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - pRead->pCont = qhandle; - pRead->contLen = 0; - - assert(pVnode != NULL); - taos_queue queue = vnodeAcquireRqueue(pVnode); - - taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead); -} - void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { SRpcMsg rpcRsp = { .handle = pRead->rpcMsg.handle, diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index ba36e537a6c2b7a6327ef17ed0be83b880009310..dc09a03e1497cb52718279b542e3318fb2789d08 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - taos_queue queue = vnodeGetWqueue(pHead->vgId); + taos_queue queue = vnodeAcquireWqueue(pHead->vgId); if (queue) { // put message into queue SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 096aae58f2ad9cb157ba5b700581bdf52a23f6eb..b561c407a3415d7db27333d96e21a72d4f159d8b 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); void dnodeFreeVnodeWqueue(void *queue); void *dnodeAllocateVnodeRqueue(void *pVnode); void dnodeFreeVnodeRqueue(void *rqueue); -void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); int32_t dnodeAllocateMnodePqueue(); diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 972db294f62a35ba7910576534f4291c021b6272..65b91d87e479abd4c0b497cf4b95bc561db8fae5 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); typedef void (*FNotifyRole)(void *ahandle, int8_t role); // when data file is synced successfully, notity app -typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); +typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef struct { int32_t vgId; // vgroup ID diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 1e6cfa97006e110f5ce1e36073f165540adeda26..d25953ec686da8519e88ee1840001e254f9d628c 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -23,6 +23,7 @@ extern "C" { typedef enum _VN_STATUS { TAOS_VN_STATUS_INIT, TAOS_VN_STATUS_UPDATING, + TAOS_VN_STATUS_RESET, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING, TAOS_VN_STATUS_DELETING, @@ -48,13 +49,11 @@ int32_t vnodeStartStream(int32_t vgId); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); -void vnodeRelease(void *pVnode); void* vnodeAcquireVnode(int32_t vgId); // add refcount -void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged +void vnodeRelease(void *pVnode); -void* vnodeAcquireRqueue(void *); -void* vnodeGetRqueue(void *); -void* vnodeGetWqueue(int32_t vgId); +void* vnodeAcquireRqueue(int32_t vgId); +void* vnodeAcquireWqueue(int32_t vgId); void* vnodeGetWal(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 4f22c7784d535b4190bf9b27e76f9ceed684211c..2e6d52106cfbddc5189cfe0053662fd2e2e38916 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -37,7 +37,7 @@ extern int32_t vDebugFlag; typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count - int status; + int8_t status; int8_t role; int8_t accessState; int64_t version; // current version @@ -55,6 +55,7 @@ typedef struct { SWalCfg walCfg; void *qMgmt; char *rootDir; + tsem_t sem; char db[TSDB_DB_NAME_LEN]; } SVnodeObj; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 9cff420a06a6fb23b4adce485ca8b3cba277b4e0..1c771f07ac2492ec55f85934c9954404b4f08c35 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); -static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } @@ -224,6 +224,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->rootDir = strdup(rootDir); pVnode->accessState = TSDB_VN_ALL_ACCCESS; + tsem_init(&pVnode->sem, 0, 0); int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { @@ -348,6 +349,8 @@ void vnodeRelease(void *pVnodeRaw) { if (refCount > 0) { vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); + if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) + tsem_post(&pVnode->sem); return; } @@ -384,13 +387,14 @@ void vnodeRelease(void *pVnodeRaw) { taosRemoveDir(rootDir); } + tsem_destroy(&pVnode->sem); free(pVnode); int32_t count = taosHashGetSize(tsDnodeVnodesHash); vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); } -void *vnodeGetVnode(int32_t vgId) { +void *vnodeAcquireVnode(int32_t vgId) { SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; @@ -398,12 +402,12 @@ void *vnodeGetVnode(int32_t vgId) { return NULL; } - return *ppVnode; -} - -void *vnodeAcquireVnode(int32_t vgId) { - SVnodeObj *pVnode = vnodeGetVnode(vgId); - if (pVnode == NULL) return pVnode; + SVnodeObj *pVnode = *ppVnode; + if (pVnode->status == TAOS_VN_STATUS_RESET) { + terrno = TSDB_CODE_VND_INVALID_STATUS; + vInfo("vgId:%d, status is in reset", vgId); + return NULL; + } atomic_add_fetch_32(&pVnode->refCount, 1); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); @@ -411,20 +415,13 @@ void *vnodeAcquireVnode(int32_t vgId) { return pVnode; } -void *vnodeAcquireRqueue(void *param) { - SVnodeObj *pVnode = param; +void *vnodeAcquireRqueue(int32_t vgId) { + SVnodeObj *pVnode = vnodeAcquireVnode(vgId); if (pVnode == NULL) return NULL; - - atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount); - return ((SVnodeObj *)pVnode)->rqueue; + return pVnode->rqueue; } -void *vnodeGetRqueue(void *pVnode) { - return ((SVnodeObj *)pVnode)->rqueue; -} - -void *vnodeGetWqueue(int32_t vgId) { +void *vnodeAcquireWqueue(int32_t vgId) { SVnodeObj *pVnode = vnodeAcquireVnode(vgId); if (pVnode == NULL) return NULL; return pVnode->wqueue; @@ -563,7 +560,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { cqStop(pVnode->cq); } -static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { +static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { SVnodeObj *pVnode = ahandle; vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion); @@ -573,8 +570,19 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); - // clsoe tsdb, then open tsdb + + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) + return -1; + + // acquire vnode + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + + if (refCount > 2) + tsem_wait(&pVnode->sem); + + // close tsdb, then open tsdb void *tsdb = pVnode->tsdb; + pVnode->tsdb = NULL; tsdbCloseRepo(tsdb, 0); STsdbAppH appH = {0}; @@ -585,6 +593,12 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { appH.cqDropFunc = cqDrop; appH.configFunc = dnodeSendCfgTableToRecv; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); + + // vnode status may be changed to DELETING or CLOSING + atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_RESET, TAOS_VN_STATUS_READY); + vnodeRelease(pVnode); + + return 0; } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index f529b713cf659d923e64ba8bf49798a0dde0f195..fb27da4be5aee46f11941cfd6fe47d871c06adf0 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -26,6 +26,7 @@ #include "tsdb.h" #include "vnode.h" #include "vnodeInt.h" +#include "tqueue.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); @@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return TSDB_CODE_VND_INVALID_STATUS; } + // tsdb may be in reset state + if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->status == TAOS_VN_STATUS_CLOSING || pVnode->status == TAOS_VN_STATUS_DELETING) + return TSDB_CODE_RPC_NOT_READY; + // TODO: Later, let slave to support query if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); @@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } +static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + pRead->pCont = qhandle; + pRead->contLen = 0; + + atomic_add_fetch_32(&pVnode->refCount, 1); + taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); +} + static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void *pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; @@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); - dnodePutItemIntoReadQueue(pVnode, *handle); + vnodePutItemIntoReadQueue(pVnode, *handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } @@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } else { // if failed to dump result, free qhandle immediately if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) { if (qHasMoreResultsToRetrieve(*handle)) { - dnodePutItemIntoReadQueue(pVnode, *handle); + vnodePutItemIntoReadQueue(pVnode, *handle); pRet->qhandle = *handle; freeHandle = false; } else { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 540cd9da6edfdccc5a91e14131d27c0f0d58b306..e2b502a3a486f0f231990758aa7174daada800dc 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -60,7 +60,9 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { } // tsdb may be in reset state - if (pVnode->tsdb == NULL) return 0; + if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->status == TAOS_VN_STATUS_CLOSING || pVnode->status == TAOS_VN_STATUS_DELETING) + return TSDB_CODE_RPC_NOT_READY; if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) {