diff --git a/src/dnode/inc/dnodeVWrite.h b/src/dnode/inc/dnodeVWrite.h index 323405143fd10b7b4f73bac5103ad63ed870044f..759e9ca8a5599236d08228ddd87ed8d4f8c55dca 100644 --- a/src/dnode/inc/dnodeVWrite.h +++ b/src/dnode/inc/dnodeVWrite.h @@ -24,8 +24,8 @@ int32_t dnodeInitVWrite(); void dnodeCleanupVWrite(); void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg); void * dnodeAllocVWriteQueue(void *pVnode); -void dnodeFreeVWriteQueue(void *wqueue); -void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); +void dnodeFreeVWriteQueue(void *pWqueue); +void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 9d1d16e51e60ced5a07f3aa7d0a5a0f946771be2..da1a902fb3f6f3d84298d15cec840a7cc352f028 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -38,11 +38,11 @@ typedef struct { } SVWriteWorkerPool; static SVWriteWorkerPool tsVWriteWP; -static void *dnodeProcessVWriteQueue(void *param); +static void *dnodeProcessVWriteQueue(void *pWorker); int32_t dnodeInitVWrite() { tsVWriteWP.max = tsNumOfCores; - tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max); + tsVWriteWP.worker = tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max); if (tsVWriteWP.worker == NULL) return -1; pthread_mutex_init(&tsVWriteWP.mutex, NULL); @@ -162,13 +162,13 @@ void *dnodeAllocVWriteQueue(void *pVnode) { return queue; } -void dnodeFreeVWriteQueue(void *wqueue) { - taosCloseQueue(wqueue); +void dnodeFreeVWriteQueue(void *pWqueue) { + taosCloseQueue(pWqueue); } -void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { - if (param == NULL) return; - SVWriteMsg *pWrite = param; +void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { + if (wparam == NULL) return; + SVWriteMsg *pWrite = wparam; if (code < 0) pWrite->code = code; int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); @@ -183,13 +183,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { }; rpcSendResponse(&rpcRsp); - taosFreeQitem(pWrite); - - vnodeRelease(pVnode); + vnodeFreeFromWQueue(pVnode, pWrite); } -static void *dnodeProcessVWriteQueue(void *param) { - SVWriteWorker *pWorker = param; +static void *dnodeProcessVWriteQueue(void *wparam) { + SVWriteWorker *pWorker = wparam; SVWriteMsg * pWrite; void * pVnode; int32_t numOfMsgs; @@ -232,8 +230,7 @@ static void *dnodeProcessVWriteQueue(void *param) { if (pWrite->rspRet.rsp) { rpcFreeCont(pWrite->rspRet.rsp); } - taosFreeQitem(pWrite); - vnodeRelease(pVnode); + vnodeFreeFromWQueue(pVnode, pWrite); } } } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 9454b97e388e6d51b52e7fd6c1fe586b1e5bdbbd..d71fdd5e0a12511d84366a661f6c9dd8a73e8c9a 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -54,8 +54,8 @@ void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeAllocVWriteQueue(void *pVnode); -void dnodeFreeVWriteQueue(void *wqueue); -void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); +void dnodeFreeVWriteQueue(void *pWqueue); +void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code); void *dnodeAllocVReadQueue(void *pVnode); void dnodeFreeVReadQueue(void *rqueue); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 018e96e1938336809fc8829be42ed3e946c6ac98..563d035898758a69fb57d6e1e5d8dc7e99886af4 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -70,11 +70,12 @@ void* vnodeAcquire(int32_t vgId); // add refcount void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); -int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam); -int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam); +int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg); +void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); +int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); -void vnodeBuildStatusMsg(void *param); -void vnodeConfirmForward(void *param, uint64_t version, int32_t code); +void vnodeBuildStatusMsg(void *pStatus); +void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); int32_t vnodeInitResources(); diff --git a/src/os/src/detail/osTimer.c b/src/os/src/detail/osTimer.c index 22f7b94c3aea55432b3e59c4ad8664cb3ef6020e..9883a03a0933075616b69800fe6e34a36fc6c746 100644 --- a/src/os/src/detail/osTimer.c +++ b/src/os/src/detail/osTimer.c @@ -111,6 +111,9 @@ void taosUninitTimer() { pthread_sigmask(SIG_BLOCK, &set, NULL); */ void taosMsleep(int mseconds) { +#if 1 + usleep(mseconds * 1000); +#else struct timeval timeout; int seconds, useconds; @@ -126,7 +129,8 @@ void taosMsleep(int mseconds) { select(0, NULL, NULL, NULL, &timeout); - /* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */ +/* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */ +#endif } #endif \ No newline at end of file diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 0f06af390ca37189bb9dfc2ec88e2b4107630692..c9689ce8b266acd88b0b6212637768934e4833ed 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -37,6 +37,7 @@ extern int32_t vDebugFlag; typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count + int32_t queuedMsg; int32_t delay; int8_t status; int8_t role; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index ea7eb94efe81b60761f81e237c5de114b3236977..2234b4f8eda7caa8622b60be8a9018d6aae40ad1 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -28,13 +28,15 @@ #include "syncInt.h" #include "tcq.h" -static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); -static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); -static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet); +#define MAX_QUEUED_MSG_NUM 10000 + +static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *); +static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); +static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); +static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); +static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); +static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); +static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; @@ -100,8 +102,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara return syncCode; } -static int32_t vnodeCheckWrite(void *param) { - SVnodeObj *pVnode = param; +static int32_t vnodeCheckWrite(void *vparam) { + SVnodeObj *pVnode = vparam; if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); return TSDB_CODE_VND_NO_WRITE_AUTH; @@ -127,8 +129,8 @@ static int32_t vnodeCheckWrite(void *param) { return TSDB_CODE_SUCCESS; } -void vnodeConfirmForward(void *param, uint64_t version, int32_t code) { - SVnodeObj *pVnode = (SVnodeObj *)param; +void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) { + SVnodeObj *pVnode = vparam; syncConfirmForward(pVnode->sync, version, code); } @@ -242,8 +244,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len); atomic_add_fetch_32(&pVnode->refCount, 1); - vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); + + int32_t queued = atomic_add_fetch_32(&pVnode->queuedMsg, 1); + if (queued > MAX_QUEUED_MSG_NUM) { + vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued); + taosMsleep(1); + } + + vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedMsg); taosWriteQitem(pVnode->wqueue, qtype, pWrite); return TSDB_CODE_SUCCESS; } + +void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { + SVnodeObj *pVnode = vparam; + + atomic_sub_fetch_32(&pVnode->queuedMsg, 1); + vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedMsg); + + taosFreeQitem(pWrite); + vnodeRelease(pVnode); +}