未验证 提交 a8454a0d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4266 from taosdata/feature/wal

[TD-2014]<fix>: add flow ctrl to vnode 
...@@ -24,8 +24,8 @@ int32_t dnodeInitVWrite(); ...@@ -24,8 +24,8 @@ int32_t dnodeInitVWrite();
void dnodeCleanupVWrite(); void dnodeCleanupVWrite();
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg); void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
void * dnodeAllocVWriteQueue(void *pVnode); void * dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue); void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -38,11 +38,11 @@ typedef struct { ...@@ -38,11 +38,11 @@ typedef struct {
} SVWriteWorkerPool; } SVWriteWorkerPool;
static SVWriteWorkerPool tsVWriteWP; static SVWriteWorkerPool tsVWriteWP;
static void *dnodeProcessVWriteQueue(void *param); static void *dnodeProcessVWriteQueue(void *pWorker);
int32_t dnodeInitVWrite() { int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores; tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max); tsVWriteWP.worker = tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1; if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL); pthread_mutex_init(&tsVWriteWP.mutex, NULL);
...@@ -162,13 +162,13 @@ void *dnodeAllocVWriteQueue(void *pVnode) { ...@@ -162,13 +162,13 @@ void *dnodeAllocVWriteQueue(void *pVnode) {
return queue; return queue;
} }
void dnodeFreeVWriteQueue(void *wqueue) { void dnodeFreeVWriteQueue(void *pWqueue) {
taosCloseQueue(wqueue); taosCloseQueue(pWqueue);
} }
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (param == NULL) return; if (wparam == NULL) return;
SVWriteMsg *pWrite = param; SVWriteMsg *pWrite = wparam;
if (code < 0) pWrite->code = code; if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
...@@ -183,13 +183,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { ...@@ -183,13 +183,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
taosFreeQitem(pWrite); vnodeFreeFromWQueue(pVnode, pWrite);
vnodeRelease(pVnode);
} }
static void *dnodeProcessVWriteQueue(void *param) { static void *dnodeProcessVWriteQueue(void *wparam) {
SVWriteWorker *pWorker = param; SVWriteWorker *pWorker = wparam;
SVWriteMsg * pWrite; SVWriteMsg * pWrite;
void * pVnode; void * pVnode;
int32_t numOfMsgs; int32_t numOfMsgs;
...@@ -232,8 +230,7 @@ static void *dnodeProcessVWriteQueue(void *param) { ...@@ -232,8 +230,7 @@ static void *dnodeProcessVWriteQueue(void *param) {
if (pWrite->rspRet.rsp) { if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp); rpcFreeCont(pWrite->rspRet.rsp);
} }
taosFreeQitem(pWrite); vnodeFreeFromWQueue(pVnode, pWrite);
vnodeRelease(pVnode);
} }
} }
} }
......
...@@ -54,8 +54,8 @@ void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet ...@@ -54,8 +54,8 @@ void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode); void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue); void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode); void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue); void dnodeFreeVReadQueue(void *rqueue);
......
...@@ -70,11 +70,12 @@ void* vnodeAcquire(int32_t vgId); // add refcount ...@@ -70,11 +70,12 @@ void* vnodeAcquire(int32_t vgId); // add refcount
void vnodeRelease(void *pVnode); // dec refCount void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam); int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam); void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param); void vnodeBuildStatusMsg(void *pStatus);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code); void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources(); int32_t vnodeInitResources();
......
...@@ -111,6 +111,9 @@ void taosUninitTimer() { ...@@ -111,6 +111,9 @@ void taosUninitTimer() {
pthread_sigmask(SIG_BLOCK, &set, NULL); pthread_sigmask(SIG_BLOCK, &set, NULL);
*/ */
void taosMsleep(int mseconds) { void taosMsleep(int mseconds) {
#if 1
usleep(mseconds * 1000);
#else
struct timeval timeout; struct timeval timeout;
int seconds, useconds; int seconds, useconds;
...@@ -126,7 +129,8 @@ void taosMsleep(int mseconds) { ...@@ -126,7 +129,8 @@ void taosMsleep(int mseconds) {
select(0, NULL, NULL, NULL, &timeout); select(0, NULL, NULL, NULL, &timeout);
/* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */ /* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */
#endif
} }
#endif #endif
\ No newline at end of file
...@@ -85,7 +85,7 @@ static void httpProcessHttpData(void *param) { ...@@ -85,7 +85,7 @@ static void httpProcessHttpData(void *param) {
while (1) { while (1) {
struct epoll_event events[HTTP_MAX_EVENTS]; struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1); fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, TAOS_EPOLL_WAIT_TIME);
if (pThread->stop) { if (pThread->stop) {
httpDebug("%p, http thread get stop event, exiting...", pThread); httpDebug("%p, http thread get stop event, exiting...", pThread);
break; break;
......
...@@ -37,6 +37,7 @@ extern int32_t vDebugFlag; ...@@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
int32_t queuedMsg;
int32_t delay; int32_t delay;
int8_t status; int8_t status;
int8_t role; int8_t role;
......
...@@ -28,13 +28,15 @@ ...@@ -28,13 +28,15 @@
#include "syncInt.h" #include "syncInt.h"
#include "tcq.h" #include "tcq.h"
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); #define MAX_QUEUED_MSG_NUM 10000
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet); static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
void vnodeInitWriteFp(void) { void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
...@@ -100,8 +102,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -100,8 +102,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
return syncCode; return syncCode;
} }
static int32_t vnodeCheckWrite(void *param) { static int32_t vnodeCheckWrite(void *vparam) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = vparam;
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
...@@ -127,8 +129,8 @@ static int32_t vnodeCheckWrite(void *param) { ...@@ -127,8 +129,8 @@ static int32_t vnodeCheckWrite(void *param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) { void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code); syncConfirmForward(pVnode->sync, version, code);
} }
...@@ -242,8 +244,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -242,8 +244,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len); memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
int32_t queued = atomic_add_fetch_32(&pVnode->queuedMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) {
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued);
taosMsleep(1);
}
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedMsg);
taosWriteQitem(pVnode->wqueue, qtype, pWrite); taosWriteQitem(pVnode->wqueue, qtype, pWrite);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam;
atomic_sub_fetch_32(&pVnode->queuedMsg, 1);
vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedMsg);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册