提交 b1862f4c 编写于 作者: S Shengliang Guan

TD-2640

上级 e288233c
......@@ -86,6 +86,7 @@ void qDestroyQueryInfo(qinfo_t qHandle);
void* qOpenQueryMgmt(int32_t vgId);
void qQueryMgmtNotifyClosed(void* pExecutor);
void qQueryMgmtReOpen(void *pExecutor);
void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key);
......
......@@ -7697,6 +7697,19 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn);
}
void qQueryMgmtReOpen(void *pQMgmt) {
if (pQMgmt == NULL) {
return;
}
SQueryMgmt *pQueryMgmt = pQMgmt;
qDebug("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = false;
pthread_mutex_unlock(&pQueryMgmt->lock);
}
void qCleanupQueryMgmt(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
......
......@@ -27,6 +27,7 @@ void vnodeCleanupRead(void);
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
void vnodeWaitReadCompleted(void *pVnode);
#ifdef __cplusplus
}
......
......@@ -27,6 +27,7 @@ void vnodeCleanupWrite(void);
int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
void vnodeWaitWriteCompleted(void *pVnode);
#ifdef __cplusplus
}
......
......@@ -88,22 +88,15 @@ void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) {
vnodeRelease(pVnode);
}
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam;
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_QUERY) {
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
}
static SVReadMsg *vnodeBuildVReadMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, int8_t qtype, SRpcMsg *pRpcMsg) {
int32_t size = sizeof(SVReadMsg) + contLen;
SVReadMsg *pRead = taosAllocateQitem(size);
if (pRead == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
return NULL;
}
if (rparam != NULL) {
SRpcMsg *pRpcMsg = rparam;
if (pRpcMsg != NULL) {
pRead->rpcHandle = pRpcMsg->handle;
pRead->rpcAhandle = pRpcMsg->ahandle;
pRead->msgType = pRpcMsg->msgType;
......@@ -119,13 +112,35 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
pRead->qtype = qtype;
atomic_add_fetch_32(&pVnode->refCount, 1);
return pRead;
}
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
SVReadMsg *pRead = vnodeBuildVReadMsg(vparam, pCont, contLen, qtype, rparam);
if (pRead == NULL) {
assert(terrno != 0);
return terrno;
}
SVnodeObj *pVnode = vparam;
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pRead);
vnodeRelease(pVnode);
return code;
}
atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount,
pVnode->queuedRMsg);
return taosWriteQitem(pVnode->fqueue, qtype, pRead);
} else {
vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount,
pVnode->queuedRMsg);
return taosWriteQitem(pVnode->qqueue, qtype, pRead);
}
}
......@@ -420,3 +435,5 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle);
return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg));
}
void vnodeWaitReadCompleted(void *pVnode) {}
\ No newline at end of file
......@@ -120,12 +120,6 @@ static int32_t vnodeCheckWrite(SVnodeObj *pVnode) {
return TSDB_CODE_APP_NOT_READY;
}
if (vnodeInClosingStatus(pVnode)) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->isFull) {
vDebug("vgId:%d, vnode is full, refCount:%d", pVnode->vgId, pVnode->refCount);
return TSDB_CODE_VND_IS_FULL;
......@@ -254,6 +248,14 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
}
}
if (!vnodeInReadyStatus(pVnode)) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
return TSDB_CODE_APP_NOT_READY;
}
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) {
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
......@@ -337,4 +339,6 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
pWrite->processedCount);
return TSDB_CODE_VND_ACTION_IN_PROGRESS;
}
}
\ No newline at end of file
}
void vnodeWaitWriteCompleted(void *pVnode) {}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册