提交 1bf35e9e 编写于 作者: S Shengliang

refactor: node mgmt

上级 628a5bba
......@@ -61,6 +61,12 @@ static int32_t qmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
}
tmsgReportStartup("qnode-impl", "initialized");
if (udfcOpen() != 0) {
dError("qnode can not open udfc");
qmClose(pMgmt);
return -1;
}
if (qmStartWorker(pMgmt) != 0) {
dError("failed to start qnode worker since %s", terrstr());
qmClose(pMgmt);
......
......@@ -25,14 +25,11 @@
extern "C" {
#endif
typedef struct SVnodesMgmt {
SHashObj *hash;
SRWLatch latch;
SVnodesStat state;
typedef struct SVnodeMgmt {
SMsgCb msgCb;
const char *path;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
STfs *pTfs;
const char *name;
int32_t dnodeId;
SQWorkerPool queryPool;
SQWorkerPool fetchPool;
SWWorkerPool syncPool;
......@@ -40,7 +37,11 @@ typedef struct SVnodesMgmt {
SWWorkerPool mergePool;
SSingleWorker mgmtWorker;
SSingleWorker monitorWorker;
} SVnodesMgmt;
SHashObj *hash;
SRWLatch latch;
SVnodesStat state;
STfs *pTfs;
} SVnodeMgmt;
typedef struct {
int32_t vgId;
......@@ -52,22 +53,21 @@ typedef struct {
} SWrapperCfg;
typedef struct {
int32_t vgId;
int32_t refCount;
int32_t vgVersion;
int8_t dropped;
int8_t accessState;
uint64_t dbUid;
char *db;
char *path;
SVnode *pImpl;
STaosQueue *pWriteQ;
STaosQueue *pSyncQ;
STaosQueue *pApplyQ;
STaosQueue *pQueryQ;
STaosQueue *pFetchQ;
STaosQueue *pMergeQ;
SMgmtWrapper *pWrapper;
int32_t vgId;
int32_t refCount;
int32_t vgVersion;
int8_t dropped;
int8_t accessState;
uint64_t dbUid;
char *db;
char *path;
SVnode *pImpl;
STaosQueue *pWriteQ;
STaosQueue *pSyncQ;
STaosQueue *pApplyQ;
STaosQueue *pQueryQ;
STaosQueue *pFetchQ;
STaosQueue *pMergeQ;
} SVnodeObj;
typedef struct {
......@@ -76,50 +76,49 @@ typedef struct {
int32_t failed;
int32_t threadIndex;
TdThread thread;
SVnodesMgmt *pMgmt;
SVnodeMgmt *pMgmt;
SWrapperCfg *pCfgs;
} SVnodeThread;
// vmInt.c
SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId);
void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
// vmHandle.c
void vmInitMsgHandle(SMgmtWrapper *pWrapper);
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq);
// vmFile.c
int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt);
SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes);
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt);
SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes);
// vmWorker.c
int32_t vmStartWorker(SVnodesMgmt *pMgmt);
void vmStopWorker(SVnodesMgmt *pMgmt);
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype);
int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrappert, SNodeMsg *pMsg);
int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmStartWorker(SVnodeMgmt *pMgmt);
void vmStopWorker(SVnodeMgmt *pMgmt);
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutRpcMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToApplyQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype);
int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) {
SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
taosRLockLatch(&pMgmt->latch);
int32_t num = 0;
......@@ -44,7 +44,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) {
return pVnodes;
}
int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
int32_t len = 0;
int32_t maxLen = 30000;
......@@ -148,9 +148,9 @@ _OVER:
return code;
}
int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt) {
char file[PATH_MAX];
char realfile[PATH_MAX];
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP);
snprintf(realfile, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
......
......@@ -16,9 +16,7 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
if (pInfo->pVloads == NULL) return;
......@@ -39,11 +37,9 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
taosRUnLockLatch(&pMgmt->latch);
}
void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
static void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pWrapper, &vloads);
vmGetVnodeLoads(pMgmt, &vloads);
SArray *pVloads = vloads.pVloads;
if (pVloads == NULL) return;
......@@ -86,10 +82,10 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
taosArrayDestroy(pVloads);
}
int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pWrapper, &vmInfo);
dmGetMonitorSysInfo(&vmInfo.sys);
vmGetMonitorInfo(pMgmt, &vmInfo);
dmGetSystemInfo(&vmInfo.sys);
monGetLogs(&vmInfo.log);
int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo);
......@@ -111,9 +107,9 @@ int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
return 0;
}
int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pWrapper, &vloads);
vmGetVnodeLoads(pMgmt, &vloads);
int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads);
if (rspLen < 0) {
......@@ -163,7 +159,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
}
}
static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
pCfg->vgId = pCreate->vgId;
pCfg->vgVersion = pCreate->vgVersion;
pCfg->dropped = 0;
......@@ -172,7 +168,7 @@ static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, S
snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
}
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SCreateVnodeReq createReq = {0};
int32_t code = -1;
......@@ -207,17 +203,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[MERGE_QUEUE] = vmPutMsgToMergeQueue;
msgCb.qsizeFp = vmGetQueueSize;
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
goto _OVER;
......@@ -249,7 +235,7 @@ _OVER:
return code;
}
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......@@ -280,57 +266,71 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return 0;
}
void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, 0);
SArray *mmGetMsgHandles() {
int32_t code = -1;
SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle));
if (pArray == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
// Requests handled by VNODE
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, 0);
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE, vmProcessWriteMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, vmProcessSyncMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, vmProcessSyncMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, vmProcessSyncMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, vmProcessSyncMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmProcessSyncMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, vmProcessSyncMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmProcessSyncMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, vmProcessSyncMsg, 0);
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmProcessSyncMsg, 0);
if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONSUME, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_QUERY, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutNodeMsgToWriteQueue, 0)== NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_RES_READY, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASKS_STATUS, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_PIPE_EXEC, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
code = 0;
_OVER:
if (code != 0) {
taosArrayDestroy(pArray);
return NULL;
} else {
return pArray;
}
}
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId) {
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pVnode = NULL;
int32_t refCount = 0;
......@@ -36,7 +36,7 @@ SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId) {
return pVnode;
}
void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
if (pVnode == NULL) return;
taosRLockLatch(&pMgmt->latch);
......@@ -45,7 +45,7 @@ void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
}
int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -61,7 +61,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode->db = tstrdup(pCfg->db);
pVnode->path = tstrdup(pCfg->path);
pVnode->pImpl = pImpl;
pVnode->pWrapper = pMgmt->pWrapper;
if (pVnode->path == NULL || pVnode->db == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -80,7 +79,7 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
return code;
}
void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
char path[TSDB_FILENAME_LEN] = {0};
taosWLockLatch(&pMgmt->latch);
......@@ -115,8 +114,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
static void *vmOpenVnodeInThread(void *param) {
SVnodeThread *pThread = param;
SVnodesMgmt *pMgmt = pThread->pMgmt;
SDnode *pDnode = pMgmt->pDnode;
SVnodeMgmt *pMgmt = pThread->pMgmt;
char path[TSDB_FILENAME_LEN];
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
......@@ -128,19 +126,10 @@ static void *vmOpenVnodeInThread(void *param) {
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
dmReportStartup(pDnode, "vnode-open", stepDesc);
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[MERGE_QUEUE] = vmPutMsgToMergeQueue;
msgCb.qsizeFp = vmGetQueueSize;
tmsgReportStartup("vnode-open", stepDesc);
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++;
......@@ -157,9 +146,7 @@ static void *vmOpenVnodeInThread(void *param) {
return NULL;
}
static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) {
SDnode *pDnode = pMgmt->pDnode;
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->hash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -227,7 +214,7 @@ static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) {
}
}
static void vmCloseVnodes(SVnodesMgmt *pMgmt) {
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
dInfo("start to close all vnodes");
int32_t numOfVnodes = 0;
......@@ -249,40 +236,44 @@ static void vmCloseVnodes(SVnodesMgmt *pMgmt) {
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
static void vmCleanup(SMgmtWrapper *pWrapper) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
static void vmCleanup(SVnodeMgmt *pMgmt) {
dInfo("vnode-mgmt start to cleanup");
vmCloseVnodes(pMgmt);
vmStopWorker(pMgmt);
vnodeCleanup();
tfsClose(pMgmt->pTfs);
taosMemoryFree(pMgmt);
pWrapper->pMgmt = NULL;
dInfo("vnode-mgmt is cleaned up");
}
static int32_t vmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SVnodesMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodesMgmt));
int32_t code = -1;
static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("vnode-mgmt start to init");
int32_t code = -1;
SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
if (pMgmt == NULL) goto _OVER;
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pWrapper->pDnode;
pMgmt->pWrapper = pWrapper;
pMgmt->path = pInput->path;
pMgmt->name = pInput->name;
pMgmt->dnodeId = pInput->dnodeId;
pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue;
pMgmt->msgCb.queueFps[APPLY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToApplyQueue;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)vmPutRpcMsgToFetchQueue;
pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue;
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.pMgmt = pMgmt;
taosInitRWLatch(&pMgmt->latch);
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, pDnode->data.dataDir, TSDB_FILENAME_LEN);
tstrncpy(dCfg.dir, pInput->dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
SDiskCfg *pDisks = pDnode->data.disks;
int32_t numOfDisks = pDnode->data.numOfDisks;
SDiskCfg *pDisks = tsDiskCfg;
int32_t numOfDisks = tsDiskCfgNum;
if (numOfDisks <= 0 || pDisks == NULL) {
pDisks = &dCfg;
numOfDisks = 1;
......@@ -293,64 +284,64 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
dError("failed to init tfs since %s", terrstr());
goto _OVER;
}
dmReportStartup(pDnode, "vnode-tfs", "initialized");
tmsgReportStartup("vnode-tfs", "initialized");
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
goto _OVER;
}
dmReportStartup(pDnode, "vnode-wal", "initialized");
tmsgReportStartup("vnode-wal", "initialized");
if (syncInit() != 0) {
dError("failed to open sync since %s", terrstr());
return -1;
goto _OVER;
}
tmsgReportStartup("vnode-sync", "initialized");
if (vnodeInit(tsNumOfCommitThreads) != 0) {
dError("failed to init vnode since %s", terrstr());
goto _OVER;
}
dmReportStartup(pDnode, "vnode-commit", "initialized");
tmsgReportStartup("vnode-commit", "initialized");
if (vmStartWorker(pMgmt) != 0) {
dError("failed to init workers since %s", terrstr()) goto _OVER;
dError("failed to init workers since %s", terrstr());
goto _OVER;
}
dmReportStartup(pDnode, "vnode-worker", "initialized");
tmsgReportStartup("vnode-worker", "initialized");
if (vmOpenVnodes(pMgmt) != 0) {
dError("failed to open vnode since %s", terrstr());
return -1;
goto _OVER;
}
dmReportStartup(pDnode, "vnode-vnodes", "initialized");
tmsgReportStartup("vnode-vnodes", "initialized");
if (udfcOpen() != 0) {
dError("failed to open udfc in vnode");
goto _OVER;
}
code = 0;
_OVER:
if (code == 0) {
pWrapper->pMgmt = pMgmt;
pOutput->pMgmt = pMgmt;
dInfo("vnodes-mgmt is initialized");
} else {
dError("failed to init vnodes-mgmt since %s", terrstr());
vmCleanup(pWrapper);
vmCleanup(pMgmt);
}
return 0;
return code;
}
static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) {
SDnode *pDnode = pWrapper->pDnode;
*required = pDnode->data.supportVnodes > 0;
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
*required = pInput->supportVnodes > 0;
return 0;
}
static int32_t vmStart(SMgmtWrapper *pWrapper) {
static int32_t vmStart(SVnodeMgmt *pMgmt) {
dDebug("vnode-mgmt start to run");
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
......@@ -367,20 +358,18 @@ static int32_t vmStart(SMgmtWrapper *pWrapper) {
return 0;
}
static void vmStop(SMgmtWrapper *pWrapper) {
static void vmStop(SVnodeMgmt *pMgmt) {
// process inside the vnode
}
void vmInitWrapper(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = vmInit;
mgmtFp.closeFp = vmCleanup;
mgmtFp.startFp = vmStart;
mgmtFp.stopFp = vmStop;
mgmtFp.requiredFp = vmRequire;
vmInitMsgHandle(pWrapper);
pWrapper->name = "vnode";
pWrapper->fp = mgmtFp;
}
SMgmtFunc vmGetMgmtFunc() {
SMgmtFunc mgmtFunc = {0};
mgmtFunc.openFp = vmInit;
mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
mgmtFunc.startFp = (NodeStartFp)vmStart;
mgmtFunc.stopFp = (NodeStopFp)vmStop;
mgmtFunc.requiredFp = vmRequire;
mgmtFunc.getHandlesFp = vmGetMsgHandles;
return mgmtFunc;
}
......@@ -32,7 +32,7 @@ static inline void vmSendRsp(SNodeMsg *pMsg, int32_t code) {
}
static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pInfo->ahandle;
SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
......@@ -40,10 +40,10 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
switch (msgType) {
case TDMT_MON_VM_INFO:
code = vmProcessGetMonVmInfoReq(pMgmt->pWrapper, pMsg);
code = vmProcessGetMonitorInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_VM_LOAD:
code = vmProcessGetVnodeLoadsReq(pMgmt->pWrapper, pMsg);
code = vmProcessGetLoadsReq(pMgmt, pMsg);
break;
case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
......@@ -240,7 +240,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
}
}
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
SMsgHead *pHead = pRpc->pCont;
int32_t code = 0;
......@@ -285,41 +285,34 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
return code;
}
int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
}
int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
}
int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
}
int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
}
int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE);
}
int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be put into vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, will be put into vnode-monitor queue, worker:%s", pMsg, pWorker->name);
......@@ -327,9 +320,8 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0;
}
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SMsgHead *pHead = pRpc->pCont;
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
SMsgHead *pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1;
......@@ -377,33 +369,31 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
return code;
}
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE);
int32_t vmPutRpcMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, WRITE_QUEUE);
}
int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, SYNC_QUEUE);
}
int32_t vmPutRpcMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pMgmt, pRpc, SYNC_QUEUE); }
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
int32_t vmPutRpcMsgToApplyQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, APPLY_QUEUE);
}
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
int32_t vmPutRpcMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, QUERY_QUEUE);
}
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE);
int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, FETCH_QUEUE);
}
int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, MERGE_QUEUE);
int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, MERGE_QUEUE);
}
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t size = -1;
SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
if (pVnode != NULL) {
switch (qtype) {
case WRITE_QUEUE:
......@@ -428,11 +418,11 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
break;
}
}
vmReleaseVnode(pWrapper->pMgmt, pVnode);
vmReleaseVnode(pMgmt, pVnode);
return size;
}
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
......@@ -450,7 +440,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
return 0;
}
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
......@@ -466,7 +456,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
}
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
SQWorkerPool *pQPool = &pMgmt->queryPool;
pQPool->name = "vnode-query";
pQPool->min = tsNumOfVnodeQueryThreads;
......@@ -524,7 +514,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
return 0;
}
void vmStopWorker(SVnodesMgmt *pMgmt) {
void vmStopWorker(SVnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
tWWorkerCleanup(&pMgmt->writePool);
......
......@@ -26,10 +26,6 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
return NULL;
}
if (udfcOpen() != 0) {
qError("qnode can not open udfc");
}
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
taosMemoryFreeClear(pQnode);
return NULL;
......@@ -41,9 +37,6 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
void qndClose(SQnode *pQnode) {
qWorkerDestroy((void **)&pQnode->pQuery);
udfcClose();
taosMemoryFree(pQnode);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册