diff --git a/src/inc/query.h b/src/inc/query.h index 5e1de77889cc469566cc94b729c55622e5462bd6..7342221cb9de1b632ad0f398f2f3a8d27621747a 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -86,6 +86,7 @@ void qDestroyQueryInfo(qinfo_t qHandle); void* qOpenQueryMgmt(int32_t vgId); void qQueryMgmtNotifyClosed(void* pExecutor); +void qQueryMgmtReOpen(void *pExecutor); void qCleanupQueryMgmt(void* pExecutor); void** qRegisterQInfo(void* pMgmt, uint64_t qInfo); void** qAcquireQInfo(void* pMgmt, uint64_t key); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d9630edb9ea6fa94294761bc98f6c17d89d000e7..1f76d479687c987af7be02e7680630e43ce7e147 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -7697,6 +7697,19 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) { taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn); } +void qQueryMgmtReOpen(void *pQMgmt) { + if (pQMgmt == NULL) { + return; + } + + SQueryMgmt *pQueryMgmt = pQMgmt; + qDebug("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId); + + pthread_mutex_lock(&pQueryMgmt->lock); + pQueryMgmt->closed = false; + pthread_mutex_unlock(&pQueryMgmt->lock); +} + void qCleanupQueryMgmt(void* pQMgmt) { if (pQMgmt == NULL) { return; diff --git a/src/vnode/inc/vnodeRead.h b/src/vnode/inc/vnodeRead.h index f2953d79f4d07c3dac821e9a086d86c53647d9c7..f5375d6ab0c19277e2c39a987fd4428fae1885e1 100644 --- a/src/vnode/inc/vnodeRead.h +++ b/src/vnode/inc/vnodeRead.h @@ -27,6 +27,7 @@ void vnodeCleanupRead(void); int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); +void vnodeWaitReadCompleted(void *pVnode); #ifdef __cplusplus } diff --git a/src/vnode/inc/vnodeWrite.h b/src/vnode/inc/vnodeWrite.h index 8b3f0fdb58c8a510bcfc6da3aa36adb85297efca..5238e45b81fc7955e592970fd6634199940470e0 100644 --- a/src/vnode/inc/vnodeWrite.h +++ b/src/vnode/inc/vnodeWrite.h @@ -27,6 +27,7 @@ void vnodeCleanupWrite(void); int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg); void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); +void vnodeWaitWriteCompleted(void *pVnode); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index a972ffec1cb5279b191dc1256b85c28e0e5cd9eb..c864bc995bb58e81535111b1f2c7654699578930 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -88,22 +88,15 @@ void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) { vnodeRelease(pVnode); } -int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) { - SVnodeObj *pVnode = vparam; - - if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_QUERY) { - int32_t code = vnodeCheckRead(pVnode); - if (code != TSDB_CODE_SUCCESS) return code; - } - +static SVReadMsg *vnodeBuildVReadMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, int8_t qtype, SRpcMsg *pRpcMsg) { int32_t size = sizeof(SVReadMsg) + contLen; SVReadMsg *pRead = taosAllocateQitem(size); if (pRead == NULL) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + return NULL; } - if (rparam != NULL) { - SRpcMsg *pRpcMsg = rparam; + if (pRpcMsg != NULL) { pRead->rpcHandle = pRpcMsg->handle; pRead->rpcAhandle = pRpcMsg->ahandle; pRead->msgType = pRpcMsg->msgType; @@ -119,13 +112,35 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt pRead->qtype = qtype; atomic_add_fetch_32(&pVnode->refCount, 1); + + return pRead; +} + +int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) { + SVReadMsg *pRead = vnodeBuildVReadMsg(vparam, pCont, contLen, qtype, rparam); + if (pRead == NULL) { + assert(terrno != 0); + return terrno; + } + + SVnodeObj *pVnode = vparam; + + int32_t code = vnodeCheckRead(pVnode); + if (code != TSDB_CODE_SUCCESS) { + taosFreeQitem(pRead); + vnodeRelease(pVnode); + return code; + } + atomic_add_fetch_32(&pVnode->queuedRMsg, 1); - if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { - vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); + if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { + vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, + pVnode->queuedRMsg); return taosWriteQitem(pVnode->fqueue, qtype, pRead); } else { - vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); + vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, + pVnode->queuedRMsg); return taosWriteQitem(pVnode->qqueue, qtype, pRead); } } @@ -420,3 +435,5 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle); return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg)); } + +void vnodeWaitReadCompleted(void *pVnode) {} \ No newline at end of file diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 571c7d667adfd285387ab0c11a308dc7472bb0af..b65251508d0667459d1e148c3d2c58a79c0fb215 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -120,12 +120,6 @@ static int32_t vnodeCheckWrite(SVnodeObj *pVnode) { return TSDB_CODE_APP_NOT_READY; } - if (vnodeInClosingStatus(pVnode)) { - vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], - pVnode->refCount, pVnode); - return TSDB_CODE_APP_NOT_READY; - } - if (pVnode->isFull) { vDebug("vgId:%d, vnode is full, refCount:%d", pVnode->vgId, pVnode->refCount); return TSDB_CODE_VND_IS_FULL; @@ -254,6 +248,14 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { } } + if (!vnodeInReadyStatus(pVnode)) { + vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], + pVnode->refCount, pVnode); + taosFreeQitem(pWrite); + vnodeRelease(pVnode); + return TSDB_CODE_APP_NOT_READY; + } + int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); if (queued > MAX_QUEUED_MSG_NUM) { int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3; @@ -337,4 +339,6 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { pWrite->processedCount); return TSDB_CODE_VND_ACTION_IN_PROGRESS; } -} \ No newline at end of file +} + +void vnodeWaitWriteCompleted(void *pVnode) {} \ No newline at end of file