未验证 提交 054befc7 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4802 from taosdata/feature/sim

TD-2640
...@@ -86,6 +86,7 @@ void qDestroyQueryInfo(qinfo_t qHandle); ...@@ -86,6 +86,7 @@ void qDestroyQueryInfo(qinfo_t qHandle);
void* qOpenQueryMgmt(int32_t vgId); void* qOpenQueryMgmt(int32_t vgId);
void qQueryMgmtNotifyClosed(void* pExecutor); void qQueryMgmtNotifyClosed(void* pExecutor);
void qQueryMgmtReOpen(void *pExecutor);
void qCleanupQueryMgmt(void* pExecutor); void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo); void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key); void** qAcquireQInfo(void* pMgmt, uint64_t key);
......
...@@ -7635,6 +7635,19 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) { ...@@ -7635,6 +7635,19 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn); taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn);
} }
void qQueryMgmtReOpen(void *pQMgmt) {
if (pQMgmt == NULL) {
return;
}
SQueryMgmt *pQueryMgmt = pQMgmt;
qDebug("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = false;
pthread_mutex_unlock(&pQueryMgmt->lock);
}
void qCleanupQueryMgmt(void* pQMgmt) { void qCleanupQueryMgmt(void* pQMgmt) {
if (pQMgmt == NULL) { if (pQMgmt == NULL) {
return; return;
......
...@@ -27,6 +27,7 @@ void vnodeCleanupRead(void); ...@@ -27,6 +27,7 @@ void vnodeCleanupRead(void);
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead); void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
void vnodeWaitReadCompleted(void *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,6 +27,7 @@ void vnodeCleanupWrite(void); ...@@ -27,6 +27,7 @@ void vnodeCleanupWrite(void);
int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg); int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
void vnodeWaitWriteCompleted(void *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -153,6 +153,11 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) { ...@@ -153,6 +153,11 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) { int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
if (pVnode->dbCfgVersion == pVnodeCfg->cfg.dbCfgVersion && pVnode->vgCfgVersion == pVnodeCfg->cfg.vgCfgVersion) {
vDebug("vgId:%d, dbCfgVersion:%d and vgCfgVersion:%d not change", pVnode->vgId, pVnode->dbCfgVersion,
pVnode->vgCfgVersion);
return TSDB_CODE_SUCCESS;
}
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// dbCfgVersion can be corrected by status msg // dbCfgVersion can be corrected by status msg
...@@ -411,15 +416,12 @@ void vnodeDestroy(SVnodeObj *pVnode) { ...@@ -411,15 +416,12 @@ void vnodeDestroy(SVnodeObj *pVnode) {
} }
void vnodeCleanUp(SVnodeObj *pVnode) { void vnodeCleanUp(SVnodeObj *pVnode) {
if (!vnodeInInitStatus(pVnode)) { vDebug("vgId:%d, vnode will cleanup, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
// it may be in updateing or reset state, then it shall wait
int32_t i = 0; vnodeSetClosingStatus(pVnode);
while (!vnodeSetClosingStatus(pVnode)) {
if (++i % 1000 == 0) { // release local resources only after cutting off outside connections
sched_yield(); qQueryMgmtNotifyClosed(pVnode->qMgmt);
}
}
}
// stop replication module // stop replication module
if (pVnode->sync > 0) { if (pVnode->sync > 0) {
...@@ -428,10 +430,7 @@ void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -428,10 +430,7 @@ void vnodeCleanUp(SVnodeObj *pVnode) {
syncStop(sync); syncStop(sync);
} }
vDebug("vgId:%d, vnode will cleanup, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vDebug("vgId:%d, vnode is cleaned, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
// release local resources only after cutting off outside connections
qQueryMgmtNotifyClosed(pVnode->qMgmt);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
......
...@@ -88,22 +88,15 @@ void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) { ...@@ -88,22 +88,15 @@ void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) { static SVReadMsg *vnodeBuildVReadMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, int8_t qtype, SRpcMsg *pRpcMsg) {
SVnodeObj *pVnode = vparam;
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_QUERY) {
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
}
int32_t size = sizeof(SVReadMsg) + contLen; int32_t size = sizeof(SVReadMsg) + contLen;
SVReadMsg *pRead = taosAllocateQitem(size); SVReadMsg *pRead = taosAllocateQitem(size);
if (pRead == NULL) { if (pRead == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY; terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
return NULL;
} }
if (rparam != NULL) { if (pRpcMsg != NULL) {
SRpcMsg *pRpcMsg = rparam;
pRead->rpcHandle = pRpcMsg->handle; pRead->rpcHandle = pRpcMsg->handle;
pRead->rpcAhandle = pRpcMsg->ahandle; pRead->rpcAhandle = pRpcMsg->ahandle;
pRead->msgType = pRpcMsg->msgType; pRead->msgType = pRpcMsg->msgType;
...@@ -119,13 +112,35 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt ...@@ -119,13 +112,35 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
pRead->qtype = qtype; pRead->qtype = qtype;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
return pRead;
}
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
SVReadMsg *pRead = vnodeBuildVReadMsg(vparam, pCont, contLen, qtype, rparam);
if (pRead == NULL) {
assert(terrno != 0);
return terrno;
}
SVnodeObj *pVnode = vparam;
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pRead);
vnodeRelease(pVnode);
return code;
}
atomic_add_fetch_32(&pVnode->queuedRMsg, 1); atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount,
pVnode->queuedRMsg);
return taosWriteQitem(pVnode->fqueue, qtype, pRead); return taosWriteQitem(pVnode->fqueue, qtype, pRead);
} else { } else {
vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount,
pVnode->queuedRMsg);
return taosWriteQitem(pVnode->qqueue, qtype, pRead); return taosWriteQitem(pVnode->qqueue, qtype, pRead);
} }
} }
...@@ -420,3 +435,5 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { ...@@ -420,3 +435,5 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle); vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle);
return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg)); return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg));
} }
void vnodeWaitReadCompleted(void *pVnode) {}
\ No newline at end of file
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "query.h"
#include "vnodeStatus.h" #include "vnodeStatus.h"
char* vnodeStatus[] = { char* vnodeStatus[] = {
...@@ -44,11 +46,13 @@ bool vnodeSetReadyStatus(SVnodeObj* pVnode) { ...@@ -44,11 +46,13 @@ bool vnodeSetReadyStatus(SVnodeObj* pVnode) {
vDebug("vgId:%d, cannot set status:ready, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]); vDebug("vgId:%d, cannot set status:ready, old:%s", pVnode->vgId, vnodeStatus[pVnode->status]);
} }
qQueryMgmtReOpen(pVnode->qMgmt);
pthread_mutex_unlock(&pVnode->statusMutex); pthread_mutex_unlock(&pVnode->statusMutex);
return set; return set;
} }
bool vnodeSetClosingStatus(SVnodeObj* pVnode) { static bool vnodeSetClosingStatusImp(SVnodeObj* pVnode) {
bool set = false; bool set = false;
pthread_mutex_lock(&pVnode->statusMutex); pthread_mutex_lock(&pVnode->statusMutex);
...@@ -63,6 +67,20 @@ bool vnodeSetClosingStatus(SVnodeObj* pVnode) { ...@@ -63,6 +67,20 @@ bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
return set; return set;
} }
bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
if (!vnodeInInitStatus(pVnode)) {
// it may be in updating or reset state, then it shall wait
int32_t i = 0;
while (!vnodeSetClosingStatusImp(pVnode)) {
if (++i % 1000 == 0) {
sched_yield();
}
}
}
return true;
}
bool vnodeSetUpdatingStatus(SVnodeObj* pVnode) { bool vnodeSetUpdatingStatus(SVnodeObj* pVnode) {
bool set = false; bool set = false;
pthread_mutex_lock(&pVnode->statusMutex); pthread_mutex_lock(&pVnode->statusMutex);
......
...@@ -120,12 +120,6 @@ static int32_t vnodeCheckWrite(SVnodeObj *pVnode) { ...@@ -120,12 +120,6 @@ static int32_t vnodeCheckWrite(SVnodeObj *pVnode) {
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
if (vnodeInClosingStatus(pVnode)) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->isFull) { if (pVnode->isFull) {
vDebug("vgId:%d, vnode is full, refCount:%d", pVnode->vgId, pVnode->refCount); vDebug("vgId:%d, vnode is full, refCount:%d", pVnode->vgId, pVnode->refCount);
return TSDB_CODE_VND_IS_FULL; return TSDB_CODE_VND_IS_FULL;
...@@ -254,6 +248,14 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { ...@@ -254,6 +248,14 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
} }
} }
if (!vnodeInReadyStatus(pVnode)) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
return TSDB_CODE_APP_NOT_READY;
}
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) { if (queued > MAX_QUEUED_MSG_NUM) {
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3; int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
...@@ -337,4 +339,6 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { ...@@ -337,4 +339,6 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
pWrite->processedCount); pWrite->processedCount);
return TSDB_CODE_VND_ACTION_IN_PROGRESS; return TSDB_CODE_VND_ACTION_IN_PROGRESS;
} }
} }
\ No newline at end of file
void vnodeWaitWriteCompleted(void *pVnode) {}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册