提交 f94f8aad 编写于 作者: S Shengliang Guan

dnode may offline while create vnode

上级 59d23065
...@@ -94,6 +94,7 @@ typedef struct { ...@@ -94,6 +94,7 @@ typedef struct {
pthread_t *threadId; pthread_t *threadId;
SRWLatch latch; SRWLatch latch;
SDnodeWorker mgmtWorker; SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
} SDnodeMgmt; } SDnodeMgmt;
typedef struct { typedef struct {
......
...@@ -536,6 +536,11 @@ int32_t dndInitMgmt(SDnode *pDnode) { ...@@ -536,6 +536,11 @@ int32_t dndInitMgmt(SDnode *pDnode) {
return -1; return -1;
} }
if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode);
if (pMgmt->threadId == NULL) { if (pMgmt->threadId == NULL) {
dError("failed to init dnode thread"); dError("failed to init dnode thread");
...@@ -550,6 +555,7 @@ int32_t dndInitMgmt(SDnode *pDnode) { ...@@ -550,6 +555,7 @@ int32_t dndInitMgmt(SDnode *pDnode) {
void dndStopMgmt(SDnode *pDnode) { void dndStopMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupWorker(&pMgmt->mgmtWorker); dndCleanupWorker(&pMgmt->mgmtWorker);
dndCleanupWorker(&pMgmt->statusWorker);
if (pMgmt->threadId != NULL) { if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId); taosDestoryThread(pMgmt->threadId);
...@@ -587,7 +593,12 @@ void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -587,7 +593,12 @@ void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndUpdateMnodeEpSet(pDnode, pEpSet); dndUpdateMnodeEpSet(pDnode, pEpSet);
} }
if (dndWriteMsgToWorker(&pMgmt->mgmtWorker, pMsg, sizeof(SRpcMsg)) != 0) { SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
if (pMsg->msgType == TDMT_MND_STATUS_RSP) {
pWorker = &pMgmt->statusWorker;
}
if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)) != 0) {
if (pMsg->msgType & 1u) { if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
......
...@@ -235,7 +235,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { ...@@ -235,7 +235,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) { bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
int64_t interval = ABS(pDnode->lastAccessTime - curMs); int64_t interval = ABS(pDnode->lastAccessTime - curMs);
if (interval > 10000 * pMnode->cfg.statusInterval) { if (interval > 3500 * pMnode->cfg.statusInterval) {
if (pDnode->rebootTime > 0) { if (pDnode->rebootTime > 0) {
pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT; pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册