提交 2f09d49a 编写于 作者: S Shengliang Guan

refact dnode - vnodes

上级 ab4b640f
...@@ -655,8 +655,8 @@ typedef struct { ...@@ -655,8 +655,8 @@ typedef struct {
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
int32_t vnodeNum; int32_t num;
SVnodeLoad vnodeLoads[]; SVnodeLoad data[];
} SVnodeLoads; } SVnodeLoads;
typedef struct SStatusMsg { typedef struct SStatusMsg {
......
...@@ -47,12 +47,18 @@ typedef struct { ...@@ -47,12 +47,18 @@ typedef struct {
SVnodeDesc replicas[TSDB_MAX_REPLICA]; SVnodeDesc replicas[TSDB_MAX_REPLICA];
} SVnodeCfg; } SVnodeCfg;
typedef enum {
VN_MSG_TYPE_WRITE = 1,
VN_MSG_TYPE_APPLY,
VN_MSG_TYPE_SYNC,
VN_MSG_TYPE_QUERY,
VN_MSG_TYPE_FETCH
} EVMType;
typedef struct SVnodeMsg { typedef struct SVnodeMsg {
int32_t msgType; int32_t curNum;
int32_t code; int32_t allocNum;
SRpcMsg rpcMsg; // original message from rpc SRpcMsg rpcMsg[];
int32_t contLen;
char pCont[];
} SVnodeMsg; } SVnodeMsg;
int32_t vnodeInit(); int32_t vnodeInit();
...@@ -67,7 +73,11 @@ int32_t vnodeCompact(SVnode *pVnode); ...@@ -67,7 +73,11 @@ int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode);
void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg);
SVnodeMsg *vnodeInitMsg(int32_t msgNum);
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg);
void vnodeCleanupMsg(SVnodeMsg *pMsg);
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVMType msgType);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -40,8 +40,8 @@ shall be used to set up the protection. ...@@ -40,8 +40,8 @@ shall be used to set up the protection.
typedef void *taos_queue; typedef void *taos_queue;
typedef void *taos_qset; typedef void *taos_qset;
typedef void *taos_qall; typedef void *taos_qall;
typedef void *(*FProcessItem)(void *pItem, void *ahandle); typedef void (*FProcessItem)(void *ahandle, void *pItem);
typedef void *(*FProcessItems)(taos_qall qall, int numOfItems, void *ahandle); typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems);
taos_queue taosOpenQueue(); taos_queue taosOpenQueue();
void taosCloseQueue(taos_queue); void taosCloseQueue(taos_queue);
......
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
int32_t dnodeInitVnodes(); int32_t dnodeInitVnodes();
void dnodeCleanupVnodes(); void dnodeCleanupVnodes();
void dnodeGetVnodes(SVnodeLoads *pVloads); void dnodeGetVnodeLoads(SVnodeLoads *pVloads);
void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
......
...@@ -372,8 +372,8 @@ static void dnodeSendStatusMsg() { ...@@ -372,8 +372,8 @@ static void dnodeSendStatusMsg() {
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
dnodeGetVnodes(&pStatus->vnodeLoads); dnodeGetVnodeLoads(&pStatus->vnodeLoads);
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.vnodeNum * sizeof(SVnodeLoad); contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS};
dnodeSendMsgToMnode(&rpcMsg); dnodeSendMsgToMnode(&rpcMsg);
......
...@@ -45,27 +45,66 @@ typedef struct { ...@@ -45,27 +45,66 @@ typedef struct {
} SVThread; } SVThread;
static struct { static struct {
SHashObj *hash; SHashObj *hash;
SWorkerPool mgmtPool; SWorkerPool mgmtPool;
taos_queue pMgmtQ; SWorkerPool queryPool;
SSteps *pSteps; SWorkerPool fetchPool;
int32_t openVnodes; SMWorkerPool syncPool;
int32_t totalVnodes; SMWorkerPool writePool;
char file[PATH_MAX + 20]; taos_queue pMgmtQ;
SSteps *pSteps;
int32_t openVnodes;
int32_t totalVnodes;
char file[PATH_MAX + 20];
} tsVnodes; } tsVnodes;
static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode);
static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode);
static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode);
static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode);
static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode);
static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) {
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
pVnode->vgId = vgId; pVnode->vgId = vgId;
pVnode->refCount = 0; pVnode->refCount = 0;
pVnode->dropped = 0; pVnode->dropped = 0;
pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pImpl = pImpl; pVnode->pImpl = pImpl;
pVnode->pWriteQ = NULL;
pVnode->pSyncQ = NULL; int32_t code = dnodeAllocVnodeQueryQueue(pVnode);
pVnode->pApplyQ = NULL; if (code != 0) {
pVnode->pQueryQ = NULL; return code;
pVnode->pFetchQ = NULL; }
code = dnodeAllocVnodeFetchQueue(pVnode);
if (code != 0) {
return code;
}
code = dnodeAllocVnodeWriteQueue(pVnode);
if (code != 0) {
return code;
}
code = dnodeAllocVnodeApplyQueue(pVnode);
if (code != 0) {
return code;
}
code = dnodeAllocVnodeSyncQueue(pVnode);
if (code != 0) {
return code;
}
return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
} }
...@@ -74,11 +113,11 @@ static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { ...@@ -74,11 +113,11 @@ static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) {
taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t)); taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t));
//todo wait all queue empty //todo wait all queue empty
pVnode->pWriteQ = NULL; dnodeFreeVnodeQueryQueue(pVnode);
pVnode->pSyncQ = NULL; dnodeFreeVnodeFetchQueue(pVnode);
pVnode->pApplyQ = NULL; dnodeFreeVnodeWriteQueue(pVnode);
pVnode->pQueryQ = NULL; dnodeFreeVnodeApplyQueue(pVnode);
pVnode->pFetchQ = NULL; dnodeFreeVnodeSyncQueue(pVnode);
} }
static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) { static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) {
...@@ -465,7 +504,7 @@ static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) { ...@@ -465,7 +504,7 @@ static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) {
return code; return code;
} }
static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) { static void dnodeProcessVnodeMgmtQueue(void *unused, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
switch (pMsg->msgType) { switch (pMsg->msgType) {
...@@ -498,7 +537,44 @@ static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) { ...@@ -498,7 +537,44 @@ static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { static void dnodeProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_QUERY);
}
static void dnodeProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_FETCH);
}
static void dnodeProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs);
SRpcMsg *pRpcMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
vnodeAppendMsg(pMsg, pRpcMsg);
taosFreeQitem(pRpcMsg);
}
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_WRITE);
}
static void dnodeProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
SVnodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_APPLY);
}
}
static void dnodeProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
SVnodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_SYNC);
}
}
static int32_t dnodeWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
int32_t code = 0; int32_t code = 0;
if (pQueue == NULL) { if (pQueue == NULL) {
...@@ -520,6 +596,28 @@ static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { ...@@ -520,6 +596,28 @@ static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
} }
} }
static int32_t dnodeWriteVnodeMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
int32_t code = 0;
if (pQueue == NULL) {
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
} else {
SVnodeMsg *pMsg = vnodeInitMsg(1);
if (pMsg == NULL) {
code = TSDB_CODE_DND_OUT_OF_MEMORY;
} else {
vnodeAppendMsg(pMsg, pRpcMsg);
code = taosWriteQitem(pQueue, pMsg);
}
}
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp);
rpcFreeCont(pRpcMsg->pCont);
}
}
static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) { static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) {
SMsgHead *pHead = (SMsgHead *)pMsg->pCont; SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
...@@ -534,12 +632,12 @@ static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) { ...@@ -534,12 +632,12 @@ static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) {
return pVnode; return pVnode;
} }
void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteToVnodeQueue(tsVnodes.pMgmtQ, pMsg); } void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteRpcMsgToVnodeQueue(tsVnodes.pMgmtQ, pMsg); }
void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) { if (pVnode != NULL) {
dnodeWriteToVnodeQueue(pVnode->pWriteQ, pMsg); dnodeWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg);
dnodeReleaseVnode(pVnode); dnodeReleaseVnode(pVnode);
} }
} }
...@@ -547,7 +645,7 @@ void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -547,7 +645,7 @@ void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) { if (pVnode != NULL) {
dnodeWriteToVnodeQueue(pVnode->pSyncQ, pMsg); dnodeWriteVnodeMsgToVnodeQueue(pVnode->pSyncQ, pMsg);
dnodeReleaseVnode(pVnode); dnodeReleaseVnode(pVnode);
} }
} }
...@@ -555,7 +653,7 @@ void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -555,7 +653,7 @@ void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) { if (pVnode != NULL) {
dnodeWriteToVnodeQueue(pVnode->pQueryQ, pMsg); dnodeWriteVnodeMsgToVnodeQueue(pVnode->pQueryQ, pMsg);
dnodeReleaseVnode(pVnode); dnodeReleaseVnode(pVnode);
} }
} }
...@@ -563,7 +661,7 @@ void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -563,7 +661,7 @@ void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) { if (pVnode != NULL) {
dnodeWriteToVnodeQueue(pVnode->pFetchQ, pMsg); dnodeWriteVnodeMsgToVnodeQueue(pVnode->pFetchQ, pMsg);
dnodeReleaseVnode(pVnode); dnodeReleaseVnode(pVnode);
} }
} }
...@@ -577,7 +675,7 @@ static int32_t dnodeInitVnodeMgmtWorker() { ...@@ -577,7 +675,7 @@ static int32_t dnodeInitVnodeMgmtWorker() {
return TSDB_CODE_VND_OUT_OF_MEMORY; return TSDB_CODE_VND_OUT_OF_MEMORY;
} }
tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtReq); tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtQueue);
if (tsVnodes.pMgmtQ == NULL) { if (tsVnodes.pMgmtQ == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY; return TSDB_CODE_VND_OUT_OF_MEMORY;
} }
...@@ -591,12 +689,137 @@ static void dnodeCleanupVnodeMgmtWorker() { ...@@ -591,12 +689,137 @@ static void dnodeCleanupVnodeMgmtWorker() {
tsVnodes.pMgmtQ = NULL; tsVnodes.pMgmtQ = NULL;
} }
static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode) {
pVnode->pQueryQ = tWorkerAllocQueue(&tsVnodes.queryPool, pVnode, (FProcessItem)dnodeProcessVnodeQueryQueue);
if (pVnode->pQueryQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode) {
tWorkerFreeQueue(&tsVnodes.queryPool, pVnode->pQueryQ);
pVnode->pQueryQ = NULL;
}
static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode) {
pVnode->pFetchQ = tWorkerAllocQueue(&tsVnodes.fetchPool, pVnode, (FProcessItem)dnodeProcessVnodeFetchQueue);
if (pVnode->pFetchQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode) {
tWorkerFreeQueue(&tsVnodes.fetchPool, pVnode->pFetchQ);
pVnode->pFetchQ = NULL;
}
static int32_t dnodeInitVnodeReadWorker() {
int32_t maxFetchThreads = 4;
float threadsForQuery = MAX(tsNumOfCores * tsRatioOfQueryCores, 1);
SWorkerPool *pPool = &tsVnodes.queryPool;
pPool->name = "vnode-query";
pPool->min = (int32_t)threadsForQuery;
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
pPool = &tsVnodes.fetchPool;
pPool->name = "vnode-fetch";
pPool->min = MIN(maxFetchThreads, tsNumOfCores);
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
TSDB_CODE_VND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeCleanupVnodeReadWorker() {
tWorkerCleanup(&tsVnodes.fetchPool);
tWorkerCleanup(&tsVnodes.queryPool);
}
static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode) {
pVnode->pWriteQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeWriteQueue);
if (pVnode->pWriteQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode) {
tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pWriteQ);
pVnode->pWriteQ = NULL;
}
static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode) {
pVnode->pApplyQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeApplyQueue);
if (pVnode->pApplyQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode) {
tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pApplyQ);
pVnode->pApplyQ = NULL;
}
static int32_t dnodeInitVnodeWriteWorker() {
SMWorkerPool *pPool = &tsVnodes.writePool;
pPool->name = "vnode-write";
pPool->max = tsNumOfCores;
if (tMWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeCleanupVnodeWriteWorker() { tMWorkerCleanup(&tsVnodes.writePool); }
static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode) {
pVnode->pSyncQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeSyncQueue);
if (pVnode->pSyncQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode) {
tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pSyncQ);
pVnode->pSyncQ = NULL;
}
static int32_t dnodeInitVnodeSyncWorker() {
int32_t maxThreads = tsNumOfCores / 2;
if (maxThreads < 1) maxThreads = 1;
SMWorkerPool *pPool = &tsVnodes.writePool;
pPool->name = "vnode-sync";
pPool->max = maxThreads;
if (tMWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeCleanupVnodeSyncWorker() { tMWorkerCleanup(&tsVnodes.syncPool); }
int32_t dnodeInitVnodes() { int32_t dnodeInitVnodes() {
dInfo("dnode-vnodes start to init"); dInfo("dnode-vnodes start to init");
SSteps *pSteps = taosStepInit(3, dnodeReportStartup); SSteps *pSteps = taosStepInit(3, dnodeReportStartup);
taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup); taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup);
taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker); taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker);
taosStepAdd(pSteps, "dnode-vnode-read", dnodeInitVnodeReadWorker, dnodeCleanupVnodeReadWorker);
taosStepAdd(pSteps, "dnode-vnode-write", dnodeInitVnodeWriteWorker, dnodeCleanupVnodeWriteWorker);
taosStepAdd(pSteps, "dnode-vnode-sync", dnodeInitVnodeSyncWorker, dnodeCleanupVnodeSyncWorker);
taosStepAdd(pSteps, "dnode-vnodes", dnodeOpenVnodes, dnodeCleanupVnodes); taosStepAdd(pSteps, "dnode-vnodes", dnodeOpenVnodes, dnodeCleanupVnodes);
tsVnodes.pSteps = pSteps; tsVnodes.pSteps = pSteps;
...@@ -612,24 +835,26 @@ void dnodeCleanupVnodes() { ...@@ -612,24 +835,26 @@ void dnodeCleanupVnodes() {
} }
} }
void dnodeGetVnodes(SVnodeLoads *pLoads) { void dnodeGetVnodeLoads(SVnodeLoads *pLoads) {
pLoads->vnodeNum = taosHashGetSize(tsVnodes.hash); pLoads->num = taosHashGetSize(tsVnodes.hash);
int32_t v = 0; int32_t v = 0;
void *pIter = taosHashIterate(tsVnodes.hash, NULL); void *pIter = taosHashIterate(tsVnodes.hash, NULL);
while (pIter) { while (pIter) {
SVnodeObj **ppVnode = pIter; SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL) continue; if (ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
if (pVnode) { if (pVnode == NULL) continue;
SVnodeLoad *pLoad = &pLoads->vnodeLoads[v++];
vnodeGetLoad(pVnode->pImpl, pLoad); SVnodeLoad *pLoad = &pLoads->data[v++];
pLoad->vgId = htonl(pLoad->vgId); vnodeGetLoad(pVnode->pImpl, pLoad);
pLoad->totalStorage = htobe64(pLoad->totalStorage); pLoad->vgId = htonl(pLoad->vgId);
pLoad->compStorage = htobe64(pLoad->compStorage); pLoad->totalStorage = htobe64(pLoad->totalStorage);
pLoad->pointsWritten = htobe64(pLoad->pointsWritten); pLoad->compStorage = htobe64(pLoad->compStorage);
pLoad->tablesNum = htobe64(pLoad->tablesNum); pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
} pLoad->tablesNum = htobe64(pLoad->tablesNum);
pIter = taosHashIterate(tsVnodes.hash, pIter); pIter = taosHashIterate(tsVnodes.hash, pIter);
} }
} }
\ No newline at end of file
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vnodeInt.h" #include "vnodeInt.h"
#include "tqueue.h"
int32_t vnodeInit() { return 0; } int32_t vnodeInit() { return 0; }
void vnodeCleanup() {} void vnodeCleanup() {}
...@@ -27,5 +28,45 @@ int32_t vnodeDrop(SVnode *pVnode) { return 0; } ...@@ -27,5 +28,45 @@ int32_t vnodeDrop(SVnode *pVnode) { return 0; }
int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeCompact(SVnode *pVnode) { return 0; }
int32_t vnodeSync(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; }
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) {}
void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {} void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {}
SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
return NULL;
} else {
pMsg->allocNum = msgNum;
return pMsg;
}
}
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) {
if (pMsg->curNum >= pMsg->allocNum) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg;
}
void vnodeCleanupMsg(SVnodeMsg *pMsg) {
for (int32_t i = 0; i < pMsg->curNum; ++i) {
rpcFreeCont(pMsg->rpcMsg[i].pCont);
}
taosFreeQitem(pMsg);
}
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVMType msgType) {
switch (msgType) {
case VN_MSG_TYPE_WRITE:
break;
case VN_MSG_TYPE_APPLY:
break;
case VN_MSG_TYPE_SYNC:
break;
case VN_MSG_TYPE_QUERY:
break;
case VN_MSG_TYPE_FETCH:
break;
}
}
...@@ -76,7 +76,7 @@ static void *tWorkerThreadFp(SWorker *worker) { ...@@ -76,7 +76,7 @@ static void *tWorkerThreadFp(SWorker *worker) {
} }
if (fp) { if (fp) {
(*fp)(msg, ahandle); (*fp)(ahandle, msg);
} }
} }
...@@ -186,7 +186,7 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { ...@@ -186,7 +186,7 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) {
} }
if (fp) { if (fp) {
(*fp)(worker->qall, numOfMsgs, ahandle); (*fp)(ahandle, worker->qall, numOfMsgs);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册