提交 1ae860b0 编写于 作者: S Shengliang Guan

TD-1899

上级 98d254d5
...@@ -24,7 +24,7 @@ int32_t dnodeInitVRead(); ...@@ -24,7 +24,7 @@ int32_t dnodeInitVRead();
void dnodeCleanupVRead(); void dnodeCleanupVRead();
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg); void dnodeDispatchToVReadQueue(SRpcMsg *pMsg);
void * dnodeAllocVReadQueue(void *pVnode); void * dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue); void dnodeFreeVReadQueue(void *pRqueue);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -35,7 +35,7 @@ typedef struct { ...@@ -35,7 +35,7 @@ typedef struct {
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SVReadWorkerPool; } SVReadWorkerPool;
static void *dnodeProcessReadQueue(void *param); static void *dnodeProcessReadQueue(void *pWorker);
// module global variable // module global variable
static SVReadWorkerPool tsVReadWP; static SVReadWorkerPool tsVReadWP;
...@@ -47,7 +47,7 @@ int32_t dnodeInitVRead() { ...@@ -47,7 +47,7 @@ int32_t dnodeInitVRead() {
tsVReadWP.min = tsNumOfCores; tsVReadWP.min = tsNumOfCores;
tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore; tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min; if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min;
tsVReadWP.worker = (SVReadWorker *)calloc(sizeof(SVReadWorker), tsVReadWP.max); tsVReadWP.worker = calloc(sizeof(SVReadWorker), tsVReadWP.max);
pthread_mutex_init(&tsVReadWP.mutex, NULL); pthread_mutex_init(&tsVReadWP.mutex, NULL);
if (tsVReadWP.worker == NULL) return -1; if (tsVReadWP.worker == NULL) return -1;
...@@ -85,7 +85,7 @@ void dnodeCleanupVRead() { ...@@ -85,7 +85,7 @@ void dnodeCleanupVRead() {
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0; int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char * pCont = (char *)pMsg->pCont; char * pCont = pMsg->pCont;
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *)pCont; SMsgHead *pHead = (SMsgHead *)pCont;
...@@ -146,8 +146,8 @@ void *dnodeAllocVReadQueue(void *pVnode) { ...@@ -146,8 +146,8 @@ void *dnodeAllocVReadQueue(void *pVnode) {
return queue; return queue;
} }
void dnodeFreeVReadQueue(void *rqueue) { void dnodeFreeVReadQueue(void *pRqueue) {
taosCloseQueue(rqueue); taosCloseQueue(pRqueue);
} }
void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) { void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
...@@ -159,14 +159,12 @@ void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) { ...@@ -159,14 +159,12 @@ void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
vnodeRelease(pVnode);
} }
void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) { void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
vnodeRelease(pVnode);
} }
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *pWorker) {
SVReadMsg *pRead; SVReadMsg *pRead;
int32_t qtype; int32_t qtype;
void * pVnode; void * pVnode;
...@@ -193,7 +191,7 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -193,7 +191,7 @@ static void *dnodeProcessReadQueue(void *param) {
} }
} }
taosFreeQitem(pRead); vnodeFreeFromRQueue(pVnode, pRead);
} }
return NULL; return NULL;
......
...@@ -57,7 +57,7 @@ void *dnodeAllocVWriteQueue(void *pVnode); ...@@ -57,7 +57,7 @@ void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *pWqueue); void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode); void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue); void dnodeFreeVReadQueue(void *pRqueue);
int32_t dnodeAllocateMPeerQueue(); int32_t dnodeAllocateMPeerQueue();
void dnodeFreeMPeerQueue(); void dnodeFreeMPeerQueue();
......
...@@ -23,12 +23,12 @@ extern "C" { ...@@ -23,12 +23,12 @@ extern "C" {
#include "twal.h" #include "twal.h"
typedef enum _VN_STATUS { typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT, TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY, TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING, TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING, TAOS_VN_STATUS_UPDATING = 3,
TAOS_VN_STATUS_RESET, TAOS_VN_STATUS_RESET = 4,
} EVnStatus; } EVnodeStatus;
typedef struct { typedef struct {
int32_t len; int32_t len;
...@@ -81,7 +81,8 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); ...@@ -81,7 +81,8 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources(); int32_t vnodeInitResources();
void vnodeCleanupResources(); void vnodeCleanupResources();
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam); int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -37,8 +37,9 @@ extern int32_t vDebugFlag; ...@@ -37,8 +37,9 @@ extern int32_t vDebugFlag;
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
int32_t queuedMsg; int32_t queuedWMsg;
int32_t delay; int32_t queuedRMsg;
int32_t delayMs;
int8_t status; int8_t status;
int8_t role; int8_t role;
int8_t accessState; int8_t accessState;
......
...@@ -378,9 +378,9 @@ int32_t vnodeClose(int32_t vgId) { ...@@ -378,9 +378,9 @@ int32_t vnodeClose(int32_t vgId) {
return 0; return 0;
} }
void vnodeRelease(void *pVnodeRaw) { void vnodeRelease(void *vparam) {
if (pVnodeRaw == NULL) return; if (vparam == NULL) return;
SVnodeObj *pVnode = pVnodeRaw; SVnodeObj *pVnode = vparam;
int32_t code = 0; int32_t code = 0;
int32_t vgId = pVnode->vgId; int32_t vgId = pVnode->vgId;
...@@ -643,18 +643,19 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -643,18 +643,19 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
pVnode->role = role; pVnode->role = role;
dnodeSendStatusMsgToMnode(); dnodeSendStatusMsgToMnode();
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) if (pVnode->role == TAOS_SYNC_ROLE_MASTER) {
cqStart(pVnode->cq); cqStart(pVnode->cq);
else } else {
cqStop(pVnode->cq); cqStop(pVnode->cq);
}
} }
static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) { static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
if (pVnode->delay != mseconds) { if (pVnode->delayMs != mseconds) {
vInfo("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); pVnode->delayMs = mseconds;
vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
} }
pVnode->delay = mseconds;
} }
static int vnodeResetTsdb(SVnodeObj *pVnode) { static int vnodeResetTsdb(SVnodeObj *pVnode) {
......
...@@ -41,8 +41,8 @@ void vnodeInitReadFp(void) { ...@@ -41,8 +41,8 @@ void vnodeInitReadFp(void) {
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the // still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
// request enters the queue // request enters the queue
// //
int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) { int32_t vnodeProcessRead(void *vparam, SVReadMsg *pRead) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = vparam;
int32_t msgType = pRead->msgType; int32_t msgType = pRead->msgType;
if (vnodeProcessReadMsgFp[msgType] == NULL) { if (vnodeProcessReadMsgFp[msgType] == NULL) {
...@@ -53,8 +53,8 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) { ...@@ -53,8 +53,8 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pRead); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pRead);
} }
static int32_t vnodeCheckRead(void *param) { static int32_t vnodeCheckRead(void *vparam) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = vparam;
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode); pVnode->refCount, pVnode);
...@@ -76,6 +76,16 @@ static int32_t vnodeCheckRead(void *param) { ...@@ -76,6 +76,16 @@ static int32_t vnodeCheckRead(void *param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) {
SVnodeObj *pVnode = vparam;
atomic_sub_fetch_32(&pVnode->queuedRMsg, 1);
vTrace("vgId:%d, free from vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
taosFreeQitem(pRead);
vnodeRelease(pVnode);
}
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) { int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
...@@ -108,7 +118,8 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt ...@@ -108,7 +118,8 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
pRead->qtype = qtype; pRead->qtype = qtype;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode rqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
taosWriteQitem(pVnode->rqueue, qtype, pRead); taosWriteQitem(pVnode->rqueue, qtype, pRead);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -77,7 +77,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -77,7 +77,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// assign version // assign version
pHead->version = pVnode->version + 1; pHead->version = pVnode->version + 1;
if (pVnode->delay) usleep(pVnode->delay * 1000); if (pVnode->delayMs) taosMsleep(pVnode->delayMs);
} else { // from wal or forward } else { // from wal or forward
// for data from WAL or forward, version may be smaller // for data from WAL or forward, version may be smaller
...@@ -245,13 +245,13 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -245,13 +245,13 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
int32_t queued = atomic_add_fetch_32(&pVnode->queuedMsg, 1); int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) { if (queued > MAX_QUEUED_MSG_NUM) {
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued); vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued);
taosMsleep(1); taosMsleep(1);
} }
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedMsg); vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
taosWriteQitem(pVnode->wqueue, qtype, pWrite); taosWriteQitem(pVnode->wqueue, qtype, pWrite);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -260,8 +260,8 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -260,8 +260,8 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
atomic_sub_fetch_32(&pVnode->queuedMsg, 1); atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedMsg); vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册