From b7579334deed7b4c629ec977a6b8b4c7e3f47513 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 9 Apr 2021 02:53:30 +0000 Subject: [PATCH] [TD-3682]: Insufficient disk space may cause oom --- src/common/src/tglobal.c | 2 +- src/sync/src/syncMain.c | 11 +++++++++-- src/vnode/inc/vnodeInt.h | 1 + src/vnode/src/vnodeWrite.c | 24 ++++++++++++++++++++---- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 5f4ce046ed..ea5bf7954d 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -212,7 +212,7 @@ float tsAvailTmpDirectorySpace = 0; float tsAvailDataDirGB = 0; float tsUsedDataDirGB = 0; float tsReservedTmpDirectorySpace = 1.0f; -float tsMinimalDataDirGB = 1.0f; +float tsMinimalDataDirGB = 2.0f; int32_t tsTotalMemoryMB = 0; uint32_t tsVersion = 0; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 72442eee6c..29ced21291 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -997,17 +997,24 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); + int32_t code = 0; if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { // nodeVersion = pHead->version; - (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); + code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); } else { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { - syncSaveIntoBuffer(pPeer, pHead); + code = syncSaveIntoBuffer(pPeer, pHead); } else { sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus], pHead->version); + code = -1; } } + + if (code != 0) { + sError("%s, failed to process fwd msg, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); + syncRestartConnection(pPeer); + } } static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) { diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 4aa07196a7..d770a38e37 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -37,6 +37,7 @@ extern int32_t vDebugFlag; typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count + int64_t queuedWMsgSize; int32_t queuedWMsg; int32_t queuedRMsg; int32_t flowctrlLevel; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index a0be52db7a..aab685e678 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -25,6 +25,7 @@ #include "vnodeStatus.h" #define MAX_QUEUED_MSG_NUM 100000 +#define MAX_QUEUED_MSG_SIZE 1024*1024*1024 //1GB extern void * tsDnodeTmr; static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *); @@ -269,6 +270,13 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { } } + if (tsAvailDataDirGB <= tsMinimalDataDirGB) { + vError("vgId:%d, failed to write into vwqueue since no diskspace, avail:%fGB", pVnode->vgId, tsAvailDataDirGB); + taosFreeQitem(pWrite); + vnodeRelease(pVnode); + return TSDB_CODE_VND_NO_DISKSPACE; + } + if (!vnodeInReadyOrUpdatingStatus(pVnode)) { vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], pVnode->refCount, pVnode); @@ -278,14 +286,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { } int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); - if (queued > MAX_QUEUED_MSG_NUM) { + int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len); + + if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) { int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3; if (ms > 100) ms = 100; vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms); taosMsleep(ms); } - vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); + vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d size:%" PRId64, pVnode->vgId, pVnode->refCount, + pVnode->queuedWMsg, pVnode->queuedWMsgSize); taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); return TSDB_CODE_SUCCESS; @@ -308,7 +319,10 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { SVnodeObj *pVnode = vparam; int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1); - vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued); + int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len); + + vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite, + pWrite->rpcMsg.ahandle, queued, queuedSize); taosFreeQitem(pWrite); vnodeRelease(pVnode); @@ -344,7 +358,9 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { SVnodeObj *pVnode = pWrite->pVnode; if (pWrite->qtype != TAOS_QTYPE_RPC) return 0; - if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0; + if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->queuedWMsgSize < MAX_QUEUED_MSG_SIZE && + pVnode->flowctrlLevel <= 0) + return 0; if (tsEnableFlowCtrl == 0) { int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2); -- GitLab