未验证 提交 db530633 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3945 from taosdata/feature/os

Feature/os
...@@ -216,8 +216,8 @@ static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { ...@@ -216,8 +216,8 @@ static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
SVnodeGid *pVnode = pVgroup->vnodeGid + i; SVnodeGid *pVnode = pVgroup->vnodeGid + i;
if (pVnode == pRmVnode) continue; if (pVnode == pRmVnode) continue;
mTrace("vgId:%d, change vgroup status, dnode:%d status:%d", pVgroup->vgId, pVnode->pDnode->dnodeId, mTrace("vgId:%d, check vgroup status, dnode:%d status:%d, vnode role:%s", pVgroup->vgId, pVnode->pDnode->dnodeId,
pVnode->pDnode->status); pVnode->pDnode->status, syncRole[pVnode->role]);
if (pVnode->pDnode->status == TAOS_DN_STATUS_DROPPING) continue; if (pVnode->pDnode->status == TAOS_DN_STATUS_DROPPING) continue;
if (pVnode->pDnode->status == TAOS_DN_STATUS_OFFLINE) continue; if (pVnode->pDnode->status == TAOS_DN_STATUS_OFFLINE) continue;
......
...@@ -588,7 +588,8 @@ static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) { ...@@ -588,7 +588,8 @@ static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
return true; return true;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return true; return true;
} }
...@@ -702,6 +703,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -702,6 +703,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return; return;
} }
...@@ -750,6 +752,7 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -750,6 +752,7 @@ void taos_stop_query(TAOS_RES *res) {
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->pRpcCtx == NULL); assert(pSql->pRpcCtx == NULL);
tscKillSTableQuery(pSql); tscKillSTableQuery(pSql);
......
...@@ -114,9 +114,9 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -114,9 +114,9 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
} }
// for select query super table, the super table vgroup list can not be null in any cases. // for select query super table, the super table vgroup list can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->vgroupList != NULL); // assert(pTableMetaInfo->vgroupList != NULL);
} // }
if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
return false; return false;
......
...@@ -189,7 +189,6 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { ...@@ -189,7 +189,6 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcFreeCont(pRead->rpcMsg.pCont); rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode); vnodeRelease(pVnode);
return;
} }
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
......
...@@ -41,6 +41,8 @@ typedef struct { ...@@ -41,6 +41,8 @@ typedef struct {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
} SReadMsg; } SReadMsg;
extern char *vnodeStatus[];
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeOpen(int32_t vgId, char *rootDir);
......
...@@ -111,7 +111,6 @@ void mnodeReleaseConn(SConnObj *pConn) { ...@@ -111,7 +111,6 @@ void mnodeReleaseConn(SConnObj *pConn) {
} }
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) { SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) {
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t)); SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) { if (pConn == NULL) {
mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
...@@ -126,7 +125,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po ...@@ -126,7 +125,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
} }
// mDebug("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port); // mDebug("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port);
pConn->lastAccess = expireTime; pConn->lastAccess = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
return pConn; return pConn;
} }
...@@ -626,7 +625,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { ...@@ -626,7 +625,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont; SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont;
int32_t connId = atoi(pKill->queryId); int32_t connId = atoi(pKill->queryId);
SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t)); SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%s, failed to kill, conn not exist", pKill->queryId); mError("connId:%s, failed to kill, conn not exist", pKill->queryId);
return TSDB_CODE_MND_INVALID_CONN_ID; return TSDB_CODE_MND_INVALID_CONN_ID;
......
...@@ -56,6 +56,14 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } ...@@ -56,6 +56,14 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
#endif #endif
char* vnodeStatus[] = {
"init",
"ready",
"closing",
"updating",
"reset"
};
int32_t vnodeInitResources() { int32_t vnodeInitResources() {
int code = syncInit(); int code = syncInit();
if (code != 0) return code; if (code != 0) return code;
...@@ -74,6 +82,7 @@ int32_t vnodeInitResources() { ...@@ -74,6 +82,7 @@ int32_t vnodeInitResources() {
void vnodeCleanupResources() { void vnodeCleanupResources() {
if (tsDnodeVnodesHash != NULL) { if (tsDnodeVnodesHash != NULL) {
vDebug("vnode list is cleanup");
taosHashCleanup(tsDnodeVnodesHash); taosHashCleanup(tsDnodeVnodesHash);
tsDnodeVnodesHash = NULL; tsDnodeVnodesHash = NULL;
} }
...@@ -84,9 +93,10 @@ void vnodeCleanupResources() { ...@@ -84,9 +93,10 @@ void vnodeCleanupResources() {
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code; int32_t code;
SVnodeObj *pTemp = (SVnodeObj *)taosHashGet(tsDnodeVnodesHash, (const char *)&pVnodeCfg->cfg.vgId, sizeof(int32_t)); SVnodeObj *pVnode = vnodeAcquire(pVnodeCfg->cfg.vgId);
if (pTemp != NULL) { if (pVnode != NULL) {
vInfo("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp); vDebug("vgId:%d, vnode already exist, refCount:%d pVnode:%p", pVnodeCfg->cfg.vgId, pVnode->refCount, pVnode);
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -143,22 +153,24 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -143,22 +153,24 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
return TSDB_CODE_VND_INIT_FAILED; return TSDB_CODE_VND_INIT_FAILED;
} }
vInfo("vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod); vInfo("vgId:%d, vnode dir is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel,
pVnodeCfg->cfg.fsyncPeriod);
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
return code; return code;
} }
int32_t vnodeDrop(int32_t vgId) { int32_t vnodeDrop(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (ppVnode == NULL || *ppVnode == NULL) { if (pVnode == NULL) {
vDebug("vgId:%d, failed to drop, vgId not find", vgId); vDebug("vgId:%d, failed to drop, vnode not find", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
} }
SVnodeObj *pVnode = *ppVnode; vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount);
pVnode->dropped = 1; pVnode->dropped = 1;
vnodeRelease(pVnode);
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -340,11 +352,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -340,11 +352,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
int32_t vnodeClose(int32_t vgId) { int32_t vnodeClose(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (ppVnode == NULL || *ppVnode == NULL) return 0; if (pVnode == NULL) return 0;
SVnodeObj *pVnode = *ppVnode; vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
vDebug("vgId:%d, vnode will be closed", pVnode->vgId); vnodeRelease(pVnode);
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return 0; return 0;
...@@ -355,21 +367,27 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -355,21 +367,27 @@ void vnodeRelease(void *pVnodeRaw) {
int32_t vgId = pVnode->vgId; int32_t vgId = pVnode->vgId;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, refCount, pVnode);
assert(refCount >= 0); assert(refCount >= 0);
if (refCount > 0) { if (refCount > 0) {
vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) {
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2)
tsem_post(&pVnode->sem); tsem_post(&pVnode->sem);
}
return; return;
} }
vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, refCount, pVnode);
if (pVnode->qMgmt) {
qCleanupQueryMgmt(pVnode->qMgmt); qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL; pVnode->qMgmt = NULL;
}
if (pVnode->tsdb) if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb, 1); tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
}
// stop continuous query // stop continuous query
if (pVnode->cq) { if (pVnode->cq) {
...@@ -378,17 +396,20 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -378,17 +396,20 @@ void vnodeRelease(void *pVnodeRaw) {
cqClose(cq); cqClose(cq);
} }
if (pVnode->wal) if (pVnode->wal) {
walClose(pVnode->wal); walClose(pVnode->wal);
pVnode->wal = NULL; pVnode->wal = NULL;
}
if (pVnode->wqueue) if (pVnode->wqueue) {
dnodeFreeVnodeWqueue(pVnode->wqueue); dnodeFreeVnodeWqueue(pVnode->wqueue);
pVnode->wqueue = NULL; pVnode->wqueue = NULL;
}
if (pVnode->rqueue) if (pVnode->rqueue) {
dnodeFreeVnodeRqueue(pVnode->rqueue); dnodeFreeVnodeRqueue(pVnode->rqueue);
pVnode->rqueue = NULL; pVnode->rqueue = NULL;
}
taosTFree(pVnode->rootDir); taosTFree(pVnode->rootDir);
...@@ -413,22 +434,31 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -413,22 +434,31 @@ void vnodeRelease(void *pVnodeRaw) {
free(pVnode); free(pVnode);
int32_t count = taosHashGetSize(tsDnodeVnodesHash); int32_t count = taosHashGetSize(tsDnodeVnodesHash);
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
}
static void vnodeIncRef(void *ptNode) {
assert(ptNode != NULL);
SVnodeObj **ppVnode = (SVnodeObj **)ptNode;
assert(ppVnode);
assert(*ppVnode);
SVnodeObj *pVnode = *ppVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
} }
void *vnodeAcquire(int32_t vgId) { void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); SVnodeObj **ppVnode = taosHashGetCB(tsDnodeVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
if (ppVnode == NULL || *ppVnode == NULL) { if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vInfo("vgId:%d, not exist", vgId); vDebug("vgId:%d, not exist", vgId);
return NULL; return NULL;
} }
SVnodeObj *pVnode = *ppVnode; return *ppVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
return pVnode;
} }
void *vnodeAcquireRqueue(int32_t vgId) { void *vnodeAcquireRqueue(int32_t vgId) {
...@@ -528,7 +558,7 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { ...@@ -528,7 +558,7 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
if (pVnode != NULL) { if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState; pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState) vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
} }
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
...@@ -538,11 +568,12 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { ...@@ -538,11 +568,12 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed // remove from hash, so new messages wont be consumed
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
int i = 0;
if (pVnode->status != TAOS_VN_STATUS_INIT) { if (pVnode->status != TAOS_VN_STATUS_INIT) {
// it may be in updateing or reset state, then it shall wait // it may be in updateing or reset state, then it shall wait
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) { int i = 0;
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) !=
TAOS_VN_STATUS_READY) {
if (++i % 1000 == 0) { if (++i % 1000 == 0) {
sched_yield(); sched_yield();
} }
...@@ -556,7 +587,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -556,7 +587,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
syncStop(sync); syncStop(sync);
} }
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); vDebug("vgId:%d, vnode will cleanup, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
// release local resources only after cutting off outside connections // release local resources only after cutting off outside connections
qQueryMgmtNotifyClosed(pVnode->qMgmt); qQueryMgmtNotifyClosed(pVnode->qMgmt);
...@@ -613,8 +644,9 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) ...@@ -613,8 +644,9 @@ static int vnodeResetTsdb(SVnodeObj *pVnode)
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) {
return -1; return -1;
}
void *tsdb = pVnode->tsdb; void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
...@@ -622,8 +654,9 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) ...@@ -622,8 +654,9 @@ static int vnodeResetTsdb(SVnodeObj *pVnode)
// acquire vnode // acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
if (refCount > 2) if (refCount > 2) {
tsem_wait(&pVnode->sem); tsem_wait(&pVnode->sem);
}
// close tsdb, then open tsdb // close tsdb, then open tsdb
tsdbCloseRepo(tsdb, 0); tsdbCloseRepo(tsdb, 0);
......
...@@ -38,8 +38,7 @@ void vnodeInitReadFp(void) { ...@@ -38,8 +38,7 @@ void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
} }
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType; int msgType = pReadMsg->rpcMsg.msgType;
if (vnodeProcessReadMsgFp[msgType] == NULL) { if (vnodeProcessReadMsgFp[msgType] == NULL) {
...@@ -48,16 +47,23 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -48,16 +47,23 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
} }
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); vDebug("vgId:%d, msgType:%s not processed, vnode status is %s", pVnode->vgId, taosMsg[msgType],
vnodeStatus[pVnode->status]);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY; if (pVnode->tsdb == NULL) {
if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY; vDebug("vgId:%d, msgType:%s not processed, tsdb is null", pVnode->vgId, taosMsg[msgType]);
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
vDebug("vgId:%d, msgType:%s not processed, vstatus is %s", pVnode->vgId, taosMsg[msgType],
vnodeStatus[pVnode->status]);
return TSDB_CODE_APP_NOT_READY;
}
// TODO: Later, let slave to support query
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType], vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType],
pVnode->syncCfg.replica, syncRole[pVnode->role]); pVnode->syncCfg.replica, syncRole[pVnode->role]);
...@@ -67,6 +73,25 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -67,6 +73,25 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
} }
int32_t vnodeProcessRead(void *param, SReadMsg *pRead) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int32_t code = vnodeProcessReadImp(pVnode, pRead);
if (code == TSDB_CODE_APP_NOT_READY && pRead->rpcMsg.msgType == TSDB_MSG_TYPE_QUERY) {
// After the fetch request enters the vnode queue
// If the vnode cannot provide services, the following operations are still required
// Or, there will be a deadlock
void **qhandle = (void **)pRead->pCont;
vError("QInfo:%p msg:%p will be killed for vstatus is %s", *qhandle, pRead, vnodeStatus[pVnode->status]);
// qKillQuery(*qhandle);
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
return TSDB_CODE_APP_NOT_READY;
} else {
return code;
}
}
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
......
...@@ -202,7 +202,7 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { ...@@ -202,7 +202,7 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) {
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("CQ: vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pSync); taosWriteQitem(pVnode->wqueue, type, pSync);
...@@ -219,7 +219,7 @@ int vnodeWriteToQueue(void *param, void *data, int type) { ...@@ -219,7 +219,7 @@ int vnodeWriteToQueue(void *param, void *data, int type) {
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
taosWriteQitem(pVnode->wqueue, type, pWal); taosWriteQitem(pVnode->wqueue, type, pWal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册