From a682999c1d38a3dd70bc890ca33f4391621780e2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 2 Jun 2022 17:43:03 +0800 Subject: [PATCH] refactor: adjust vnode queue --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 14 +- source/dnode/mgmt/mgmt_vnode/src/vmFile.c | 10 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 102 +++++----- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 14 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 182 +++++++----------- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 1 + 6 files changed, 143 insertions(+), 180 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 5ec33fe810..f74fd119c5 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -108,13 +108,13 @@ int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype); -int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index cf5a7ad885..71f11e0d97 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +#define MAX_CONTENT_LEN 1024 * 1024 + SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { taosThreadRwlockRdlock(&pMgmt->lock); @@ -47,7 +49,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; - int32_t maxLen = 1024 * 1024; + int32_t maxLen = MAX_CONTENT_LEN; char *content = taosMemoryCalloc(1, maxLen + 1); cJSON *root = NULL; FILE *fp = NULL; @@ -128,7 +130,7 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes = vnodesNum; code = 0; - dDebug("succcessed to read file %s", file); + dDebug("succcessed to read file %s, numOfVnodes:%d", file, vnodesNum); _OVER: if (content != NULL) taosMemoryFree(content); @@ -156,7 +158,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) { SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); int32_t len = 0; - int32_t maxLen = 1024 * 1024; + int32_t maxLen = MAX_CONTENT_LEN; char *content = taosMemoryCalloc(1, maxLen + 1); if (content == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -195,6 +197,6 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) { taosMemoryFree(pVnodes); } - dDebug("successed to write %s", realfile); + dDebug("successed to write %s, numOfVnodes:%d", realfile, numOfVnodes); return taosRenameFile(file, realfile); } \ No newline at end of file diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 86087a0165..f43f00e02a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -297,59 +297,59 @@ SArray *vmGetMsgHandles() { SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; // Requests handled by VNODE - if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONSUME, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_QUERY, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - // if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutNodeMsgToWriteQueue, 0)== NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; - - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; - - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + // if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutMsgToWriteQueue, 0)== NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 0c8d492ef4..5d3479f74b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -18,21 +18,17 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; - int32_t refCount = 0; taosThreadRwlockRdlock(&pMgmt->lock); taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; } else { - refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount); } taosThreadRwlockUnlock(&pMgmt->lock); - if (pVnode != NULL) { - dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); - } - return pVnode; } @@ -41,8 +37,8 @@ void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { taosThreadRwlockRdlock(&pMgmt->lock); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, release vnode, ref:%d", pVnode->vgId, refCount); taosThreadRwlockUnlock(&pMgmt->lock); - dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { @@ -138,7 +134,7 @@ static void *vmOpenVnodeInThread(void *param) { } } - dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, + dDebug("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, pThread->failed); return NULL; } @@ -160,7 +156,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { pMgmt->state.totalVnodes = numOfVnodes; - int32_t threadNum = 1; // tsNumOfCores; + int32_t threadNum = tsNumOfCores; int32_t vnodesPerThread = numOfVnodes / threadNum + 1; SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 03db74abd7..e5953b8383 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -29,11 +29,11 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { tmsgSendRsp(&rsp); } -static void vmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { +static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; - dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + dTrace("msg:%p, get from vnode-mgmt queue", pMsg); switch (pMsg->msgType) { case TDMT_MON_VM_INFO: code = vmProcessGetMonitorInfoReq(pMgmt, pMsg); @@ -49,11 +49,14 @@ static void vmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dError("msg:%p, not processed in vnode queue", pMsg); + dError("msg:%p, not processed in vnode-mgmt queue", pMsg); } if (IsReq(pMsg)) { - if (code != 0 && terrno != 0) code = terrno; + if (code != 0 && terrno != 0) { + dError("msg:%p failed to process since %s", pMsg, terrstr()); + code = terrno; + } vmSendRsp(pMsg, code); } @@ -65,13 +68,15 @@ static void vmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeObj *pVnode = pInfo->ahandle; - dTrace("msg:%p, get from vnode-query queue", pMsg); + dTrace("vgId:%d, msg:%p get from vnode-query queue", pVnode->vgId, pMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); if (code != 0) { if (terrno != 0) code = terrno; + dError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr()); vmSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + + dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -79,13 +84,15 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeObj *pVnode = pInfo->ahandle; - dTrace("msg:%p, get from vnode-fetch queue", pMsg); + dTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; + dError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr()); vmSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + + dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -99,7 +106,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO for (int32_t m = 0; m < numOfMsgs; m++) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - dTrace("vgId:%d, get msg:%p from vnode-write queue, type:%s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); + dTrace("vgId:%d, msg:%p get from vnode-write queue", pVnode->vgId, pMsg); if (taosArrayPush(pArray, &pMsg) == NULL) { dError("vgId:%d, failed to push msg:%p to vnode-write array", pVnode->vgId, pMsg); @@ -113,7 +120,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO if (code == TSDB_CODE_ACTION_IN_PROGRESS) continue; if (code != 0) { - dError("vgId:%d, msg:%p failed to process since %s", pVnode->vgId, pMsg, tstrerror(code)); + dError("vgId:%d, msg:%p failed to write since %s", pVnode->vgId, pMsg, tstrerror(code)); vmSendRsp(pMsg, code); continue; } @@ -129,14 +136,14 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; tmsgSendRedirectRsp(&rsp, &newEpSet); } else { - dError("vgId:%d, msg:%p failed to process since %s", pVnode->vgId, pMsg, tstrerror(code)); + dError("vgId:%d, msg:%p failed to write since %s", pVnode->vgId, pMsg, tstrerror(code)); vmSendRsp(pMsg, code); } } for (int32_t i = 0; i < numOfMsgs; i++) { pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); - dTrace("vgId:%d, msg:%p, is freed", pVnode->vgId, pMsg); + dTrace("vgId:%d, msg:%p is freed", pVnode->vgId, pMsg); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -146,10 +153,11 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; + SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { - SRpcMsg *pMsg = NULL; - taosGetQitem(qall, (void **)&pMsg); + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + dTrace("vgId:%d, msg:%p get from vnode-apply queue", pVnode->vgId, pMsg); // init response rpc msg SRpcMsg rsp = {0}; @@ -164,7 +172,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO // apply data into tsdb if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) { rsp.code = terrno; - dTrace("msg:%p, process write error since %s", pMsg, terrstr()); + dError("vgId:%d, msg:%p failed to apply since %s", pVnode->vgId, pMsg, terrstr()); } syncApplyMsgDestroy(pSyncApplyMsg); @@ -176,6 +184,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO tmsgSendRsp(&rsp); } + dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, rsp.code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -183,23 +192,22 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; + SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { - SRpcMsg *pMsg = NULL; - taosGetQitem(qall, (void **)&pMsg); + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + dTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg); int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL); if (code != 0) { + dError("vgId:%d, msg:%p failed to sync since %s", pVnode->vgId, pMsg, terrstr()); if (pMsg->info.handle != NULL) { - SRpcMsg rsp = { - .code = (terrno < 0) ? terrno : code, - .info = pMsg->info, - }; - dTrace("msg:%p, failed to process sync queue since %s", pMsg, terrstr()); - tmsgSendRsp(&rsp); + if (terrno != 0) code = terrno; + vmSendRsp(pMsg, code); } } + dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -207,24 +215,26 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; + SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { - SRpcMsg *pMsg = NULL; - taosGetQitem(qall, (void **)&pMsg); + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + dTrace("vgId:%d, msg:%p get from vnode-merge queue", pVnode->vgId, pMsg); - dTrace("msg:%p, get from vnode-merge queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { + dError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr()); if (terrno != 0) code = terrno; vmSendRsp(pMsg, code); } + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } -static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { +static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { SMsgHead *pHead = pMsg->pCont; int32_t code = 0; @@ -233,31 +243,35 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { - dError("vgId:%d, failed to put msg:%p into vnode-queue since %s", pHead->vgId, pMsg, terrstr()); + dError("vgId:%d, failed to put msg:%p into vnode queue since %s", pHead->vgId, pMsg, terrstr()); return terrno != 0 ? terrno : -1; } switch (qtype) { case QUERY_QUEUE: - dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + dTrace("vgId:%d, msg:%p put into vnode-query queue, type:%s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pQueryQ, pMsg); break; case FETCH_QUEUE: - dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + dTrace("vgId:%d, msg:%p put into vnode-fetch queue, type:%s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pFetchQ, pMsg); break; case WRITE_QUEUE: - dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + dTrace("vgId:%d, msg:%p put into vnode-write queue, type:%s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pWriteQ, pMsg); break; case SYNC_QUEUE: - dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + dTrace("vgId:%d, msg:%p put into vnode-sync queue, type:%s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pSyncQ, pMsg); break; case MERGE_QUEUE: - dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + dTrace("vgId:%d, msg:%p put into vnode-merge queue, type:%s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pMergeQ, pMsg); break; + case APPLY_QUEUE: + dTrace("vgId:%d, msg:%p put into vnode-apply queue, type:%s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); + taosWriteQitem(pVnode->pApplyQ, pMsg); + break; default: code = -1; terrno = TSDB_CODE_INVALID_PARA; @@ -268,88 +282,44 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType return code; } -int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); -} +int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } -int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); -} +int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } -int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); -} +int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); } -int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); -} +int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } -int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); -} +int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); } -int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SSingleWorker *pWorker = &pMgmt->mgmtWorker; - dTrace("msg:%p, put into vnode-mgmt worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - taosWriteQitem(pWorker->queue, pMsg); +int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + dTrace("msg:%p, put into vnode-mgmt queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg); return 0; } -int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SSingleWorker *pWorker = &pMgmt->monitorWorker; - dTrace("msg:%p, put into vnode-monitor worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - taosWriteQitem(pWorker->queue, pMsg); +int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + dTrace("msg:%p, put into vnode-monitor queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + taosWriteQitem(pMgmt->monitorWorker.queue, pMsg); return 0; } static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) { - SMsgHead *pHead = pRpc->pCont; - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); - if (pVnode == NULL) return -1; - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); - int32_t code = 0; - if (pMsg == NULL) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; - code = -1; - } else { - memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - switch (qtype) { - case WRITE_QUEUE: - dTrace("msg:%p, create and put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); - taosWriteQitem(pVnode->pWriteQ, pMsg); - break; - case QUERY_QUEUE: - dTrace("msg:%p, create and put into vnode-query queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); - taosWriteQitem(pVnode->pQueryQ, pMsg); - break; - case FETCH_QUEUE: - dTrace("msg:%p, create and put into vnode-fetch queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); - taosWriteQitem(pVnode->pFetchQ, pMsg); - break; - case APPLY_QUEUE: - dTrace("msg:%p, create and put into vnode-apply queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); - taosWriteQitem(pVnode->pApplyQ, pMsg); - break; - case MERGE_QUEUE: - dTrace("msg:%p, create and put into vnode-merge queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); - taosWriteQitem(pVnode->pMergeQ, pMsg); - break; - case SYNC_QUEUE: - dTrace("msg:%p, create and put into vnode-sync queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); - taosWriteQitem(pVnode->pSyncQ, pMsg); - break; - default: - code = -1; - terrno = TSDB_CODE_INVALID_PARA; - break; - } + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - vmReleaseVnode(pMgmt, pVnode); - return code; + SMsgHead *pHead = pRpc->pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + + dTrace("msg:%p, is created and will put into vnode queue", pMsg); + return vmPutMsgToQueue(pMgmt, pMsg, qtype); } int32_t vmPutRpcMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) { @@ -467,29 +437,23 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pMPool->max = tsNumOfVnodeMergeThreads; if (tWWorkerInit(pMPool) != 0) return -1; - SSingleWorkerCfg cfg = { + SSingleWorkerCfg mgmtCfg = { .min = 1, .max = 1, .name = "vnode-mgmt", - .fp = (FItem)vmProcessQueue, + .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { - dError("failed to start vnode-mgmt worker since %s", terrstr()); - return -1; - } + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1; - SSingleWorkerCfg mCfg = { + SSingleWorkerCfg monitorCfg = { .min = 1, .max = 1, .name = "vnode-monitor", - .fp = (FItem)vmProcessQueue, + .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { - dError("failed to start vnode-monitor worker since %s", terrstr()); - return -1; - } + if (tSingleWorkerInit(&pMgmt->monitorWorker, &monitorCfg) != 0) return -1; dDebug("vnode workers are initialized"); return 0; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index e5893fd947..955b94dd08 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -117,6 +117,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pMsg == NULL) { goto _OVER; } + dTrace("msg:%p, is created", pMsg); if (dmBuildNodeMsg(pMsg, pRpc) != 0) { goto _OVER; -- GitLab