From c4264da2ec090eab16bf5a96bfe242b660812834 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 8 May 2022 22:20:53 +0800 Subject: [PATCH] refactor: vnodes mgmt --- source/dnode/mgmt/interface/inc/dmDef.h | 2 + source/dnode/mgmt/interface/src/dmEnv.c | 1 + source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 19 +-- source/dnode/mgmt/mgmt_vnode/src/vmFile.c | 32 ++-- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 128 +++++++++------ source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 83 ++-------- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 170 +++++++++----------- source/dnode/vnode/inc/vnode.h | 2 +- 8 files changed, 202 insertions(+), 235 deletions(-) diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index 087892e741..2e8ad982d8 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -41,6 +41,8 @@ #include "monitor.h" #include "sync.h" +#include "libs/function/function.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/dnode/mgmt/interface/src/dmEnv.c b/source/dnode/mgmt/interface/src/dmEnv.c index 92a6e018cb..2c836714ce 100644 --- a/source/dnode/mgmt/interface/src/dmEnv.c +++ b/source/dnode/mgmt/interface/src/dmEnv.c @@ -55,6 +55,7 @@ void dmCleanup() { monCleanup(); syncCleanUp(); walCleanUp(); + udfcClose(); taosStopCacheRefreshWorker(); dInfo("dnode env is cleaned up"); } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index fa5e1d78a0..51b3860461 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -29,15 +29,15 @@ typedef struct SVnodesMgmt { SHashObj *hash; SRWLatch latch; SVnodesStat state; + const char *path; + SDnode *pDnode; + SMgmtWrapper *pWrapper; STfs *pTfs; SQWorkerPool queryPool; SQWorkerPool fetchPool; SWWorkerPool syncPool; SWWorkerPool writePool; SWWorkerPool mergePool; - const char *path; - SDnode *pDnode; - SMgmtWrapper *pWrapper; SSingleWorker mgmtWorker; SSingleWorker monitorWorker; } SVnodesMgmt; @@ -95,9 +95,9 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); // vmFile.c -int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); -int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt); -SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes); +int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); +int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt); +SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes); // vmWorker.c int32_t vmStartWorker(SVnodesMgmt *pMgmt); @@ -105,11 +105,12 @@ void vmStopWorker(SVnodesMgmt *pMgmt); int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); -int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration -int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); +int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 7e00a022b2..f251dd120e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { +SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { taosRLockLatch(&pMgmt->latch); int32_t num = 0; @@ -44,14 +44,14 @@ SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { return pVnodes; } -int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { +int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; int32_t maxLen = 30000; char *content = taosMemoryCalloc(1, maxLen + 1); cJSON *root = NULL; FILE *fp = NULL; - char file[PATH_MAX]; + char file[PATH_MAX] = {0}; SWrapperCfg *pCfgs = NULL; TdFilePtr pFile = NULL; @@ -61,26 +61,26 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n if (pFile == NULL) { dDebug("file %s not exist", file); code = 0; - goto PRASE_VNODE_OVER; + goto _OVER; } len = (int32_t)taosReadFile(pFile, content, maxLen); if (len <= 0) { dError("failed to read %s since content is null", file); - goto PRASE_VNODE_OVER; + goto _OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { dError("failed to read %s since invalid json format", file); - goto PRASE_VNODE_OVER; + goto _OVER; } cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes"); if (!vnodes || vnodes->type != cJSON_Array) { dError("failed to read %s since vnodes not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } int32_t vnodesNum = cJSON_GetArraySize(vnodes); @@ -88,7 +88,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n pCfgs = taosMemoryCalloc(vnodesNum, sizeof(SWrapperCfg)); if (pCfgs == NULL) { dError("failed to read %s since out of memory", file); - goto PRASE_VNODE_OVER; + goto _OVER; } for (int32_t i = 0; i < vnodesNum; ++i) { @@ -98,7 +98,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); if (!vgId || vgId->type != cJSON_Number) { dError("failed to read %s since vgId not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } pCfg->vgId = vgId->valueint; snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId); @@ -106,28 +106,28 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped"); if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } pCfg->dropped = dropped->valueint; cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion"); if (!vgVersion || vgVersion->type != cJSON_Number) { dError("failed to read %s since vgVersion not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } pCfg->vgVersion = vgVersion->valueint; cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid"); if (!dbUid || dbUid->type != cJSON_String) { dError("failed to read %s since dbUid not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } pCfg->dbUid = atoll(dbUid->valuestring); cJSON *db = cJSON_GetObjectItem(vnode, "db"); if (!db || db->type != cJSON_String) { dError("failed to read %s since db not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN); } @@ -139,7 +139,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n code = 0; dInfo("succcessed to read file %s", file); -PRASE_VNODE_OVER: +_OVER: if (content != NULL) taosMemoryFree(content); if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); @@ -148,7 +148,7 @@ PRASE_VNODE_OVER: return code; } -int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt) { +int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt) { char file[PATH_MAX]; char realfile[PATH_MAX]; snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP); @@ -162,7 +162,7 @@ int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt) { } int32_t numOfVnodes = 0; - SVnodeObj **pVnodes = vmGetVnodesFromHash(pMgmt, &numOfVnodes); + SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); int32_t len = 0; int32_t maxLen = 65536; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7fcfd5133d..b19602f3ff 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -16,12 +16,37 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + + pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); + if (pInfo->pVloads == NULL) return; + + taosRLockLatch(&pMgmt->latch); + + void *pIter = taosHashIterate(pMgmt->hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL || *ppVnode == NULL) continue; + + SVnodeObj *pVnode = *ppVnode; + SVnodeLoad vload = {0}; + vnodeGetLoad(pVnode->pImpl, &vload); + taosArrayPush(pInfo->pVloads, &vload); + pIter = taosHashIterate(pMgmt->hash, pIter); + } + + taosRUnLockLatch(&pMgmt->latch); +} + void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SMonVloadInfo vloads = {0}; vmGetVnodeLoads(pWrapper, &vloads); - if (vloads.pVloads == NULL) return; + + SArray *pVloads = vloads.pVloads; + if (pVloads == NULL) return; int32_t totalVnodes = 0; int32_t masterNum = 0; @@ -31,8 +56,8 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { int64_t numOfBatchInsertReqs = 0; int64_t numOfBatchInsertSuccessReqs = 0; - for (int32_t i = 0; i < taosArrayGetSize(vloads.pVloads); ++i) { - SVnodeLoad *pLoad = taosArrayGet(vloads.pVloads, i); + for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) { + SVnodeLoad *pLoad = taosArrayGet(pVloads, i); numOfSelectReqs += pLoad->numOfSelectReqs; numOfInsertReqs += pLoad->numOfInsertReqs; numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs; @@ -49,10 +74,16 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs - pMgmt->state.numOfInsertSuccessReqs; pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs - pMgmt->state.numOfBatchInsertReqs; pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs; - pMgmt->state = pInfo->vstat; + pMgmt->state.totalVnodes = totalVnodes; + pMgmt->state.masterNum = masterNum; + pMgmt->state.numOfSelectReqs = numOfSelectReqs; + pMgmt->state.numOfInsertReqs = numOfInsertReqs; + pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs; + pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs; + pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs); - taosArrayDestroy(vloads.pVloads); + taosArrayDestroy(pVloads); } int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { @@ -107,12 +138,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg)); pCfg->vgId = pCreate->vgId; - strcpy(pCfg->dbname, pCreate->db); + tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname)); + pCfg->dbId = pCreate->dbUid; pCfg->isWeak = true; pCfg->tsdbCfg.days = 10; - pCfg->tsdbCfg.keep2 = 3650; pCfg->tsdbCfg.keep0 = 3650; pCfg->tsdbCfg.keep1 = 3650; + pCfg->tsdbCfg.keep2 = 3650; for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) { memcpy(&pCfg->tsdbCfg.retentions[i], taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); } @@ -121,30 +153,30 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->hashEnd = pCreate->hashEnd; pCfg->hashMethod = pCreate->hashMethod; - // sync integration pCfg->syncCfg.myIndex = pCreate->selfIndex; pCfg->syncCfg.replicaNum = pCreate->replica; - memset(&(pCfg->syncCfg.nodeInfo), 0, sizeof(pCfg->syncCfg.nodeInfo)); + memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo)); for (int i = 0; i < pCreate->replica; ++i) { - (pCfg->syncCfg.nodeInfo)[i].nodePort = (pCreate->replicas)[i].port; - snprintf((pCfg->syncCfg.nodeInfo)[i].nodeFqdn, sizeof((pCfg->syncCfg.nodeInfo)[i].nodeFqdn), "%s", - (pCreate->replicas)[i].fqdn); + pCfg->syncCfg.nodeInfo[i].nodePort = pCreate->replicas[i].port; + snprintf(pCfg->syncCfg.nodeInfo[i].nodeFqdn, sizeof(pCfg->syncCfg.nodeInfo[i].nodeFqdn), "%s", + pCreate->replicas[i].fqdn); } } static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) { - memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN); - pCfg->dbUid = pCreate->dbUid; - pCfg->dropped = 0; - snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId); pCfg->vgId = pCreate->vgId; pCfg->vgVersion = pCreate->vgVersion; + pCfg->dropped = 0; + pCfg->dbUid = pCreate->dbUid; + tstrncpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN); + snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId); } int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SCreateVnodeReq createReq = {0}; - char path[TSDB_FILENAME_LEN]; + int32_t code = -1; + char path[TSDB_FILENAME_LEN] = {0}; if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -161,14 +193,13 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId); if (pVnode != NULL) { - tFreeSCreateVnodeReq(&createReq); dDebug("vgId:%d, already exist", createReq.vgId); + tFreeSCreateVnodeReq(&createReq); vmReleaseVnode(pMgmt, pVnode); terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; return -1; } - // create vnode snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { tFreeSCreateVnodeReq(&createReq); @@ -179,49 +210,43 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; + msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; + msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; - msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; - msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; // sync integration + msgCb.queueFps[MERGE_QUEUE] = vmPutMsgToMergeQueue; msgCb.qsizeFp = vmGetQueueSize; SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); - tFreeSCreateVnodeReq(&createReq); - return -1; + goto _OVER; } - int32_t code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl); + code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl); if (code != 0) { - tFreeSCreateVnodeReq(&createReq); dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr()); - vnodeClose(pImpl); - vnodeDestroy(path, pMgmt->pTfs); - terrno = code; - return code; + goto _OVER; } code = vnodeStart(pImpl); if (code != 0) { - tFreeSCreateVnodeReq(&createReq); dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr()); - vnodeClose(pImpl); - vnodeDestroy(path, pMgmt->pTfs); - terrno = code; - return code; + goto _OVER; } - code = vmWriteVnodesToFile(pMgmt); + code = vmWriteVnodeListToFile(pMgmt); + if (code != 0) goto _OVER; + +_OVER: if (code != 0) { - tFreeSCreateVnodeReq(&createReq); vnodeClose(pImpl); vnodeDestroy(path, pMgmt->pTfs); - terrno = code; - return code; } - return 0; + tFreeSCreateVnodeReq(&createReq); + terrno = code; + return code; } int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { @@ -243,14 +268,14 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { } pVnode->dropped = 1; - if (vmWriteVnodesToFile(pMgmt) != 0) { + if (vmWriteVnodeListToFile(pMgmt) != 0) { pVnode->dropped = 0; vmReleaseVnode(pMgmt, pVnode); return -1; } vmCloseVnode(pMgmt, pVnode); - vmWriteVnodesToFile(pMgmt); + vmWriteVnodeListToFile(pMgmt); return 0; } @@ -287,7 +312,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, DEFAULT_HANDLE); @@ -300,14 +325,13 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); - // sync integration - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 17c7563885..ab4174857a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -15,7 +15,6 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -#include "libs/function/function.h" SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; @@ -55,14 +54,14 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { pVnode->vgId = pCfg->vgId; pVnode->refCount = 0; + pVnode->vgVersion = pCfg->vgVersion; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; - pVnode->pWrapper = pMgmt->pWrapper; - pVnode->pImpl = pImpl; - pVnode->vgVersion = pCfg->vgVersion; pVnode->dbUid = pCfg->dbUid; pVnode->db = tstrdup(pCfg->db); pVnode->path = tstrdup(pCfg->path); + pVnode->pImpl = pImpl; + pVnode->pWrapper = pMgmt->pWrapper; if (pVnode->path == NULL || pVnode->db == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -78,14 +77,11 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); taosWUnLockLatch(&pMgmt->latch); - if (code != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - } return code; } void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { - char path[TSDB_FILENAME_LEN]; + char path[TSDB_FILENAME_LEN] = {0}; taosWLockLatch(&pMgmt->latch); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); @@ -98,6 +94,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pMergeQ)) taosMsleep(10); vmFreeQueue(pMgmt, pVnode); vnodeClose(pVnode->pImpl); @@ -116,7 +113,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { taosMemoryFree(pVnode); } -static void *vmOpenVnodeFunc(void *param) { +static void *vmOpenVnodeInThread(void *param) { SVnodeThread *pThread = param; SVnodesMgmt *pMgmt = pThread->pMgmt; SDnode *pDnode = pMgmt->pDnode; @@ -136,10 +133,11 @@ static void *vmOpenVnodeFunc(void *param) { SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; + msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; + msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; - msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; - msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; // sync integration + msgCb.queueFps[MERGE_QUEUE] = vmPutMsgToMergeQueue; msgCb.qsizeFp = vmGetQueueSize; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); @@ -148,12 +146,10 @@ static void *vmOpenVnodeFunc(void *param) { pThread->failed++; } else { vmOpenVnode(pMgmt, pCfg, pImpl); - // vnodeStart(pImpl); dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; + atomic_add_fetch_32(&pMgmt->state.openVnodes, 1); } - - atomic_add_fetch_32(&pMgmt->state.openVnodes, 1); } dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, @@ -163,29 +159,24 @@ static void *vmOpenVnodeFunc(void *param) { static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) { SDnode *pDnode = pMgmt->pDnode; - taosInitRWLatch(&pMgmt->latch); pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pMgmt->hash == NULL) { - dError("failed to init vnode hash"); terrno = TSDB_CODE_OUT_OF_MEMORY; + dError("failed to init vnode hash since %s", terrstr()); return -1; } SWrapperCfg *pCfgs = NULL; int32_t numOfVnodes = 0; - if (vmGetVnodesFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) { + if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) { dInfo("failed to get vnode list from disk since %s", terrstr()); return -1; } pMgmt->state.totalVnodes = numOfVnodes; -#if 0 - int32_t threadNum = tsNumOfCores; -#else - int32_t threadNum = 1; -#endif + int32_t threadNum = 1; // tsNumOfCores; int32_t vnodesPerThread = numOfVnodes / threadNum + 1; SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); @@ -210,7 +201,7 @@ static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) { TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeFunc, pThread) != 0) { + if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) { dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno)); } @@ -240,7 +231,7 @@ static void vmCloseVnodes(SVnodesMgmt *pMgmt) { dInfo("start to close all vnodes"); int32_t numOfVnodes = 0; - SVnodeObj **pVnodes = vmGetVnodesFromHash(pMgmt, &numOfVnodes); + SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { vmCloseVnode(pMgmt, pVnodes[i]); @@ -267,12 +258,9 @@ static void vmCleanup(SMgmtWrapper *pWrapper) { vmStopWorker(pMgmt); vnodeCleanup(); tfsClose(pMgmt->pTfs); - // walCleanUp(); taosMemoryFree(pMgmt); pWrapper->pMgmt = NULL; - // syncCleanUp(); - udfcClose(); dInfo("vnode-mgmt is cleaned up"); } @@ -313,7 +301,6 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { } dmReportStartup(pDnode, "vnode-wal", "initialized"); - // sync integration if (syncInit() != 0) { dError("failed to open sync since %s", terrstr()); return -1; @@ -381,23 +368,7 @@ static int32_t vmStart(SMgmtWrapper *pWrapper) { } static void vmStop(SMgmtWrapper *pWrapper) { -#if 0 - dDebug("vnode-mgmt start to stop"); - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - taosRLockLatch(&pMgmt->latch); - - void *pIter = taosHashIterate(pMgmt->hash, NULL); - while (pIter) { - SVnodeObj **ppVnode = pIter; - if (ppVnode == NULL || *ppVnode == NULL) continue; - - SVnodeObj *pVnode = *ppVnode; - vnodeStop(pVnode->pImpl); - pIter = taosHashIterate(pMgmt->hash, pIter); - } - - taosRUnLockLatch(&pMgmt->latch); -#endif + // process inside the vnode } void vmSetMgmtFp(SMgmtWrapper *pWrapper) { @@ -413,25 +384,3 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) { pWrapper->fp = mgmtFp; } -void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - - pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); - if (pInfo->pVloads == NULL) return; - - taosRLockLatch(&pMgmt->latch); - - void *pIter = taosHashIterate(pMgmt->hash, NULL); - while (pIter) { - SVnodeObj **ppVnode = pIter; - if (ppVnode == NULL || *ppVnode == NULL) continue; - - SVnodeObj *pVnode = *ppVnode; - SVnodeLoad vload = {0}; - vnodeGetLoad(pVnode->pImpl, &vload); - taosArrayPush(pInfo->pVloads, &vload); - pIter = taosHashIterate(pMgmt->hash, pIter); - } - - taosRUnLockLatch(&pMgmt->latch); -} diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 06e24cb48a..11afe33335 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -14,28 +14,29 @@ */ #define _DEFAULT_SOURCE - #include "vmInt.h" #include "sync.h" #include "syncTools.h" -static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, - .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen}; +static inline void vmSendRsp(SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = { + .handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .refId = pMsg->rpcMsg.refId, + .code = code, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen, + }; tmsgSendRsp(&rsp); } -static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SVnodesMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; - dTrace("msg:%p, will be processed in vnode-m queue", pMsg); + dTrace("msg:%p, will be processed in vnode-mgmt/monitor queue", pMsg); switch (msgType) { case TDMT_MON_VM_INFO: @@ -52,12 +53,12 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dError("msg:%p, not processed in vnode-mgmt queue", pMsg); + dError("msg:%p, not processed in vnode-mgmt/monitor queue", pMsg); } if (msgType & 1u) { if (code != 0 && terrno != 0) code = terrno; - vmSendRsp(pMgmt->pWrapper, pMsg, code); + vmSendRsp(pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); @@ -71,7 +72,9 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { dTrace("msg:%p, will be processed in vnode-query queue", pMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { - vmSendRsp(pVnode->pWrapper, pMsg, code); + if (terrno != 0) code = terrno; + vmSendRsp(pMsg, code); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); @@ -84,7 +87,9 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo); if (code != 0) { - vmSendRsp(pVnode->pWrapper, pMsg, code); + if (terrno != 0) code = terrno; + vmSendRsp(pMsg, code); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); @@ -108,32 +113,10 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO dTrace("msg:%p, will be processed in vnode-write queue", pMsg); if (taosArrayPush(pArray, &pMsg) == NULL) { dTrace("msg:%p, failed to process since %s", pMsg, terrstr()); - vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY); + vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); } } -#if 0 - int64_t version; - - vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version); - - numOfMsgs = taosArrayGetSize(pArray); - for (int32_t i = 0; i < numOfMsgs; i++) { - SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); - SRpcMsg *pRpc = &pMsg->rpcMsg; - - rsp.pCont = NULL; - rsp.contLen = 0; - rsp.code = 0; - rsp.handle = pRpc->handle; - rsp.ahandle = pRpc->ahandle; - rsp.refId = pRpc->refId; - - int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp); - tmsgSendRsp(&rsp); - } -#else - // sync integration response for (int i = 0; i < taosArrayGetSize(pArray); i++) { SNodeMsg *pMsg; SRpcMsg *pRpc; @@ -168,7 +151,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO assert(0); } } -#endif for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); @@ -186,9 +168,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg rsp; for (int32_t i = 0; i < numOfMsgs; ++i) { -#if 1 - // sync integration - taosGetQitem(qall, (void **)&pMsg); // init response rpc msg @@ -219,7 +198,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.refId = pMsg->rpcMsg.refId; tmsgSendRsp(&rsp); } -#endif } } @@ -246,7 +224,9 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO dTrace("msg:%p, will be processed in vnode-merge queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo); if (code != 0) { - vmSendRsp(pVnode->pWrapper, pMsg, code); + if (terrno != 0) code = terrno; + vmSendRsp(pMsg, code); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); @@ -257,16 +237,17 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { SRpcMsg *pRpc = &pMsg->rpcMsg; SMsgHead *pHead = pRpc->pCont; + int32_t code = 0; + pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr()); - return terrno; + return terrno != 0 ? terrno : -1; } - int32_t code = 0; switch (qtype) { case QUERY_QUEUE: dTrace("msg:%p, type:%s will be written into vnode-query queue", pMsg, TMSG_INFO(pRpc->msgType)); @@ -326,7 +307,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->mgmtWorker; - dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, will be put into vnode-mgmt queue, worker:%s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -335,7 +316,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->monitorWorker; - dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, will be put into vnode-monitor queue, worker:%s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -350,9 +331,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); int32_t code = 0; - if (pMsg == NULL) { - code = -1; - } else { + if (pMsg != NULL) { dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; // if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0); @@ -377,7 +356,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT dTrace("msg:%p, will be put into vnode-merge queue", pMsg); taosWriteQitem(pVnode->pMergeQ, pMsg); break; - case SYNC_QUEUE: // sync integration + case SYNC_QUEUE: dTrace("msg:%p, will be put into vnode-sync queue", pMsg); taosWriteQitem(pVnode->pSyncQ, pMsg); break; @@ -387,6 +366,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT break; } } + vmReleaseVnode(pMgmt, pVnode); return code; } @@ -395,6 +375,14 @@ int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE); } +int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, SYNC_QUEUE); +} + +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); +} + int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE); } @@ -403,30 +391,15 @@ int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE); } -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); -} - int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, MERGE_QUEUE); } -// sync integration -int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, SYNC_QUEUE); -} - int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { int32_t size = -1; SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId); if (pVnode != NULL) { switch (qtype) { - case QUERY_QUEUE: - size = taosQueueSize(pVnode->pQueryQ); - break; - case FETCH_QUEUE: - size = taosQueueSize(pVnode->pFetchQ); - break; case WRITE_QUEUE: size = taosQueueSize(pVnode->pWriteQ); break; @@ -436,6 +409,12 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { case APPLY_QUEUE: size = taosQueueSize(pVnode->pApplyQ); break; + case QUERY_QUEUE: + size = taosQueueSize(pVnode->pQueryQ); + break; + case FETCH_QUEUE: + size = taosQueueSize(pVnode->pFetchQ); + break; case MERGE_QUEUE: size = taosQueueSize(pVnode->pMergeQ); break; @@ -449,14 +428,14 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); - pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); - pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); - pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); + pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); + pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue); - if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || - pVnode->pQueryQ == NULL || pVnode->pMergeQ == NULL) { + if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL || + pVnode->pFetchQ == NULL || pVnode->pMergeQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -466,17 +445,17 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { } void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { - tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); + tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ); - tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); pVnode->pWriteQ = NULL; - pVnode->pApplyQ = NULL; pVnode->pSyncQ = NULL; - pVnode->pFetchQ = NULL; + pVnode->pApplyQ = NULL; pVnode->pQueryQ = NULL; + pVnode->pFetchQ = NULL; pVnode->pMergeQ = NULL; dDebug("vgId:%d, vnode queue is freed", pVnode->vgId); } @@ -499,17 +478,23 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = tsNumOfVnodeWriteThreads; if (tWWorkerInit(pWPool) != 0) return -1; - pWPool = &pMgmt->syncPool; - pWPool->name = "vnode-sync"; - pWPool->max = tsNumOfVnodeSyncThreads; - if (tWWorkerInit(pWPool) != 0) return -1; - - pWPool = &pMgmt->mergePool; - pWPool->name = "vnode-merge"; - pWPool->max = tsNumOfVnodeMergeThreads; - if (tWWorkerInit(pWPool) != 0) return -1; - - SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + SWWorkerPool *pSPool = &pMgmt->syncPool; + pSPool->name = "vnode-sync"; + pSPool->max = tsNumOfVnodeSyncThreads; + if (tWWorkerInit(pSPool) != 0) return -1; + + SWWorkerPool *pMPool = &pMgmt->mergePool; + pMPool->name = "vnode-merge"; + pMPool->max = tsNumOfVnodeMergeThreads; + if (tWWorkerInit(pMPool) != 0) return -1; + + SSingleWorkerCfg cfg = { + .min = 1, + .max = 1, + .name = "vnode-mgmt", + .fp = (FItem)vmProcessMgmtMonitorQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; @@ -517,7 +502,12 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { if (tsMultiProcess) { SSingleWorkerCfg mCfg = { - .min = 1, .max = 1, .name = "vnode-monitor", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + .min = 1, + .max = 1, + .name = "vnode-monitor", + .fp = (FItem)vmProcessMgmtMonitorQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start mnode vnode-monitor worker since %s", terrstr()); return -1; @@ -531,10 +521,10 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { void vmStopWorker(SVnodesMgmt *pMgmt) { tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->mgmtWorker); - tQWorkerCleanup(&pMgmt->fetchPool); - tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->syncPool); + tQWorkerCleanup(&pMgmt->queryPool); + tQWorkerCleanup(&pMgmt->fetchPool); tWWorkerCleanup(&pMgmt->mergePool); dDebug("vnode workers are closed"); } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 28f3b3edb9..8850b35397 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -148,7 +148,7 @@ struct STsdbCfg { struct SVnodeCfg { int32_t vgId; - char dbname[TSDB_DB_NAME_LEN]; + char dbname[TSDB_DB_FNAME_LEN]; uint64_t dbId; int32_t szPage; int32_t szCache; -- GitLab