提交 4fc0e3bd 编写于 作者: S Shengliang Guan

enh: adjust the number of vnode threads so that one vnode has one write thread

上级 fc41f43c
...@@ -735,19 +735,17 @@ To prevent system resource from being exhausted by multiple concurrent streams, ...@@ -735,19 +735,17 @@ To prevent system resource from being exhausted by multiple concurrent streams,
| 44 | numOfVnodeQueryThreads | No | Yes | | 44 | numOfVnodeQueryThreads | No | Yes |
| 45 | numOfVnodeStreamThreads | No | Yes | | 45 | numOfVnodeStreamThreads | No | Yes |
| 46 | numOfVnodeFetchThreads | No | Yes | | 46 | numOfVnodeFetchThreads | No | Yes |
| 47 | numOfVnodeWriteThreads | No | Yes | | 47 | numOfVnodeRsmaThreads | No | Yes |
| 48 | numOfVnodeSyncThreads | No | Yes | | 48 | numOfQnodeQueryThreads | No | Yes |
| 49 | numOfVnodeRsmaThreads | No | Yes | | 49 | numOfQnodeFetchThreads | No | Yes |
| 50 | numOfQnodeQueryThreads | No | Yes | | 50 | numOfSnodeSharedThreads | No | Yes |
| 51 | numOfQnodeFetchThreads | No | Yes | | 51 | numOfSnodeUniqueThreads | No | Yes |
| 52 | numOfSnodeSharedThreads | No | Yes | | 52 | rpcQueueMemoryAllowed | No | Yes |
| 53 | numOfSnodeUniqueThreads | No | Yes | | 53 | logDir | Yes | Yes |
| 54 | rpcQueueMemoryAllowed | No | Yes | | 54 | minimalLogDirGB | Yes | Yes |
| 55 | logDir | Yes | Yes | | 55 | numOfLogLines | Yes | Yes |
| 56 | minimalLogDirGB | Yes | Yes | | 56 | asyncLog | Yes | Yes |
| 57 | numOfLogLines | Yes | Yes | | 57 | logKeepDays | Yes | Yes |
| 58 | asyncLog | Yes | Yes |
| 59 | logKeepDays | Yes | Yes |
| 60 | debugFlag | Yes | Yes | | 60 | debugFlag | Yes | Yes |
| 61 | tmrDebugFlag | Yes | Yes | | 61 | tmrDebugFlag | Yes | Yes |
| 62 | uDebugFlag | Yes | Yes | | 62 | uDebugFlag | Yes | Yes |
......
...@@ -711,19 +711,17 @@ charset 的有效值是 UTF-8。 ...@@ -711,19 +711,17 @@ charset 的有效值是 UTF-8。
| 44 | numOfVnodeQueryThreads | 否 | 是 | | | 44 | numOfVnodeQueryThreads | 否 | 是 | |
| 45 | numOfVnodeStreamThreads | 否 | 是 | | | 45 | numOfVnodeStreamThreads | 否 | 是 | |
| 46 | numOfVnodeFetchThreads | 否 | 是 | | | 46 | numOfVnodeFetchThreads | 否 | 是 | |
| 47 | numOfVnodeWriteThreads | 否 | 是 | | | 47 | numOfVnodeRsmaThreads | 否 | 是 | |
| 48 | numOfVnodeSyncThreads | 否 | 是 | | | 48 | numOfQnodeQueryThreads | 否 | 是 | |
| 49 | numOfVnodeRsmaThreads | 否 | 是 | | | 49 | numOfQnodeFetchThreads | 否 | 是 | |
| 50 | numOfQnodeQueryThreads | 否 | 是 | | | 50 | numOfSnodeSharedThreads | 否 | 是 | |
| 51 | numOfQnodeFetchThreads | 否 | 是 | | | 51 | numOfSnodeUniqueThreads | 否 | 是 | |
| 52 | numOfSnodeSharedThreads | 否 | 是 | | | 52 | rpcQueueMemoryAllowed | 否 | 是 | |
| 53 | numOfSnodeUniqueThreads | 否 | 是 | | | 53 | logDir | 是 | 是 | |
| 54 | rpcQueueMemoryAllowed | 否 | 是 | | | 54 | minimalLogDirGB | 是 | 是 | |
| 55 | logDir | 是 | 是 | | | 55 | numOfLogLines | 是 | 是 | |
| 56 | minimalLogDirGB | 是 | 是 | | | 56 | asyncLog | 是 | 是 | |
| 57 | numOfLogLines | 是 | 是 | | | 57 | logKeepDays | 是 | 是 | |
| 58 | asyncLog | 是 | 是 | |
| 59 | logKeepDays | 是 | 是 | |
| 60 | debugFlag | 是 | 是 | | | 60 | debugFlag | 是 | 是 | |
| 61 | tmrDebugFlag | 是 | 是 | | | 61 | tmrDebugFlag | 是 | 是 | |
| 62 | uDebugFlag | 是 | 是 | | | 62 | uDebugFlag | 是 | 是 | |
......
...@@ -55,8 +55,6 @@ extern int32_t tsNumOfMnodeReadThreads; ...@@ -55,8 +55,6 @@ extern int32_t tsNumOfMnodeReadThreads;
extern int32_t tsNumOfVnodeQueryThreads; extern int32_t tsNumOfVnodeQueryThreads;
extern int32_t tsNumOfVnodeStreamThreads; extern int32_t tsNumOfVnodeStreamThreads;
extern int32_t tsNumOfVnodeFetchThreads; extern int32_t tsNumOfVnodeFetchThreads;
extern int32_t tsNumOfVnodeWriteThreads;
extern int32_t tsNumOfVnodeSyncThreads;
extern int32_t tsNumOfVnodeRsmaThreads; extern int32_t tsNumOfVnodeRsmaThreads;
extern int32_t tsNumOfQnodeQueryThreads; extern int32_t tsNumOfQnodeQueryThreads;
extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfQnodeFetchThreads;
......
...@@ -26,9 +26,9 @@ typedef struct SQWorkerPool SQWorkerPool; ...@@ -26,9 +26,9 @@ typedef struct SQWorkerPool SQWorkerPool;
typedef struct SWWorkerPool SWWorkerPool; typedef struct SWWorkerPool SWWorkerPool;
typedef struct SQWorker { typedef struct SQWorker {
int32_t id; // worker ID int32_t id; // worker id
TdThread thread; // thread int64_t pid; // thread pid
int64_t pid; TdThread thread; // thread id
SQWorkerPool *pool; SQWorkerPool *pool;
} SQWorker; } SQWorker;
...@@ -44,10 +44,10 @@ typedef struct SQWorkerPool { ...@@ -44,10 +44,10 @@ typedef struct SQWorkerPool {
typedef struct SWWorker { typedef struct SWWorker {
int32_t id; // worker id int32_t id; // worker id
TdThread thread; // thread int64_t pid; // thread pid
int64_t pid; TdThread thread; // thread id
STaosQall *qall; STaosQall *qall;
STaosQset *qset; // queue set STaosQset *qset;
SWWorkerPool *pool; SWWorkerPool *pool;
} SWWorker; } SWWorker;
......
...@@ -50,8 +50,6 @@ int32_t tsNumOfMnodeReadThreads = 1; ...@@ -50,8 +50,6 @@ int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeQueryThreads = 4;
int32_t tsNumOfVnodeStreamThreads = 2; int32_t tsNumOfVnodeStreamThreads = 2;
int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeQueryThreads = 4;
int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfQnodeFetchThreads = 1;
...@@ -374,14 +372,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -374,14 +372,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeWriteThreads = tsNumOfCores;
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeSyncThreads = tsNumOfCores * 2;
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 16);
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeRsmaThreads = tsNumOfCores; tsNumOfVnodeRsmaThreads = tsNumOfCores;
tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4); tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, 0) != 0) return -1;
...@@ -506,22 +496,6 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { ...@@ -506,22 +496,6 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype; pItem->stype = stype;
} }
pItem = cfgGetItem(tsCfg, "numOfVnodeWriteThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeWriteThreads = numOfCores;
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
pItem->i32 = tsNumOfVnodeWriteThreads;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfVnodeSyncThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeSyncThreads = numOfCores * 2;
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 16);
pItem->i32 = tsNumOfVnodeSyncThreads;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfVnodeRsmaThreads"); pItem = cfgGetItem(tsCfg, "numOfVnodeRsmaThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfVnodeRsmaThreads = numOfCores; tsNumOfVnodeRsmaThreads = numOfCores;
...@@ -699,8 +673,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -699,8 +673,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32; tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32;
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
...@@ -943,10 +915,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -943,10 +915,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
} else if (strcasecmp("numOfVnodeFetchThreads", name) == 0) { } else if (strcasecmp("numOfVnodeFetchThreads", name) == 0) {
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
*/ */
} else if (strcasecmp("numOfVnodeWriteThreads", name) == 0) {
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
} else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
} else if (strcasecmp("numOfVnodeRsmaThreads", name) == 0) { } else if (strcasecmp("numOfVnodeRsmaThreads", name) == 0) {
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
......
...@@ -33,10 +33,6 @@ typedef struct SVnodeMgmt { ...@@ -33,10 +33,6 @@ typedef struct SVnodeMgmt {
SQWorkerPool queryPool; SQWorkerPool queryPool;
SQWorkerPool streamPool; SQWorkerPool streamPool;
SWWorkerPool fetchPool; SWWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool syncCtrlPool;
SWWorkerPool writePool;
SWWorkerPool applyPool;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
SHashObj *hash; SHashObj *hash;
TdThreadRwlock lock; TdThreadRwlock lock;
...@@ -58,10 +54,10 @@ typedef struct { ...@@ -58,10 +54,10 @@ typedef struct {
int8_t dropped; int8_t dropped;
char *path; char *path;
SVnode *pImpl; SVnode *pImpl;
STaosQueue *pWriteQ; SMultiWorker pWriteW;
STaosQueue *pSyncQ; SMultiWorker pSyncW;
STaosQueue *pSyncCtrlQ; SMultiWorker pSyncCtrlW;
STaosQueue *pApplyQ; SMultiWorker pApplyW;
STaosQueue *pQueryQ; STaosQueue *pQueryQ;
STaosQueue *pStreamQ; STaosQueue *pStreamQ;
STaosQueue *pFetchQ; STaosQueue *pFetchQ;
......
...@@ -90,21 +90,21 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -90,21 +90,21 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId); dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
while (pVnode->refCount > 0) taosMsleep(10); while (pVnode->refCount > 0) taosMsleep(10);
dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteQ, dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
pVnode->pWriteQ->threadId); pVnode->pWriteW.queue->threadId);
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); tMultiWorkerCleanup(&pVnode->pWriteW);
dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncQ, dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
pVnode->pSyncQ->threadId); pVnode->pSyncW.queue->threadId);
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); tMultiWorkerCleanup(&pVnode->pSyncW);
dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlQ, dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId,
pVnode->pSyncCtrlQ->threadId); pVnode->pSyncCtrlW.queue, pVnode->pSyncCtrlW.queue->threadId);
while (!taosQueueEmpty(pVnode->pSyncCtrlQ)) taosMsleep(10); tMultiWorkerCleanup(&pVnode->pSyncCtrlW);
dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyQ, dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyQ->threadId); pVnode->pApplyW.queue->threadId);
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); tMultiWorkerCleanup(&pVnode->pApplyW);
dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ); dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
......
...@@ -188,30 +188,20 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp ...@@ -188,30 +188,20 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else { } else {
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg); taosWriteQitem(pVnode->pWriteW.queue, pMsg);
#if 0 // tests for batch writes
if (pMsg->msgType == TDMT_VND_CREATE_TABLE) {
SRpcMsg *pDup = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
memcpy(pDup, pMsg, sizeof(SRpcMsg));
pDup->pCont = rpcMallocCont(pMsg->contLen);
memcpy(pDup->pCont, pMsg->pCont, pMsg->contLen);
pDup->info.handle = NULL;
taosWriteQitem(pVnode->pWriteQ, pDup);
}
#endif
} }
break; break;
case SYNC_QUEUE: case SYNC_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncQ, pMsg); taosWriteQitem(pVnode->pSyncW.queue, pMsg);
break; break;
case SYNC_CTRL_QUEUE: case SYNC_CTRL_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncCtrlQ, pMsg); taosWriteQitem(pVnode->pSyncCtrlW.queue, pMsg);
break; break;
case APPLY_QUEUE: case APPLY_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pApplyQ, pMsg); taosWriteQitem(pVnode->pApplyW.queue, pMsg);
break; break;
default: default:
code = -1; code = -1;
...@@ -276,13 +266,13 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { ...@@ -276,13 +266,13 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
if (pVnode != NULL) { if (pVnode != NULL) {
switch (qtype) { switch (qtype) {
case WRITE_QUEUE: case WRITE_QUEUE:
size = taosQueueItemSize(pVnode->pWriteQ); size = taosQueueItemSize(pVnode->pWriteW.queue);
break; break;
case SYNC_QUEUE: case SYNC_QUEUE:
size = taosQueueItemSize(pVnode->pSyncQ); size = taosQueueItemSize(pVnode->pSyncW.queue);
break; break;
case APPLY_QUEUE: case APPLY_QUEUE:
size = taosQueueItemSize(pVnode->pApplyQ); size = taosQueueItemSize(pVnode->pApplyW.queue);
break; break;
case QUERY_QUEUE: case QUERY_QUEUE:
size = taosQueueItemSize(pVnode->pQueryQ); size = taosQueueItemSize(pVnode->pQueryQ);
...@@ -306,27 +296,33 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { ...@@ -306,27 +296,33 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
} }
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg); SMultiWorkerCfg wcfg = {.max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl};
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncQueue); SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-ctrl", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg); SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode};
(void)tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
(void)tMultiWorkerInit(&pVnode->pSyncW, &scfg);
(void)tMultiWorkerInit(&pVnode->pSyncCtrlW, &sccfg);
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL || if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL ||
pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) { pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteQ, dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
pVnode->pWriteQ->threadId); pVnode->pWriteW.queue->threadId);
dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncQ, pVnode->pSyncQ->threadId); dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
dInfo("vgId:%d, sync-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlQ, pVnode->pSyncW.queue->threadId);
pVnode->pSyncCtrlQ->threadId); dInfo("vgId:%d, sync-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlW.queue,
dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyQ, pVnode->pSyncCtrlW.queue->threadId);
pVnode->pApplyQ->threadId); dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyW.queue->threadId);
dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ); dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
pVnode->pFetchQ->threadId); pVnode->pFetchQ->threadId);
...@@ -335,16 +331,9 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { ...@@ -335,16 +331,9 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
} }
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
tWWorkerFreeQueue(&pMgmt->syncCtrlPool, pVnode->pSyncCtrlQ);
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
pVnode->pWriteQ = NULL;
pVnode->pSyncQ = NULL;
pVnode->pApplyQ = NULL;
pVnode->pQueryQ = NULL; pVnode->pQueryQ = NULL;
pVnode->pStreamQ = NULL; pVnode->pStreamQ = NULL;
pVnode->pFetchQ = NULL; pVnode->pFetchQ = NULL;
...@@ -369,26 +358,6 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { ...@@ -369,26 +358,6 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pFPool->max = tsNumOfVnodeFetchThreads; pFPool->max = tsNumOfVnodeFetchThreads;
if (tWWorkerInit(pFPool) != 0) return -1; if (tWWorkerInit(pFPool) != 0) return -1;
SWWorkerPool *pWPool = &pMgmt->writePool;
pWPool->name = "vnode-write";
pWPool->max = tsNumOfVnodeWriteThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
SWWorkerPool *pAPool = &pMgmt->applyPool;
pAPool->name = "vnode-apply";
pAPool->max = tsNumOfVnodeWriteThreads;
if (tWWorkerInit(pAPool) != 0) return -1;
SWWorkerPool *pSPool = &pMgmt->syncPool;
pSPool->name = "vnode-sync";
pSPool->max = tsNumOfVnodeSyncThreads;
if (tWWorkerInit(pSPool) != 0) return -1;
SWWorkerPool *pSCPool = &pMgmt->syncCtrlPool;
pSCPool->name = "vnode-sync-ctrl";
pSCPool->max = tsNumOfVnodeSyncThreads;
if (tWWorkerInit(pSCPool) != 0) return -1;
SSingleWorkerCfg mgmtCfg = { SSingleWorkerCfg mgmtCfg = {
.min = 1, .min = 1,
.max = 1, .max = 1,
...@@ -403,10 +372,6 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { ...@@ -403,10 +372,6 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
} }
void vmStopWorker(SVnodeMgmt *pMgmt) { void vmStopWorker(SVnodeMgmt *pMgmt) {
tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->applyPool);
tWWorkerCleanup(&pMgmt->syncPool);
tWWorkerCleanup(&pMgmt->syncCtrlPool);
tQWorkerCleanup(&pMgmt->queryPool); tQWorkerCleanup(&pMgmt->queryPool);
tQWorkerCleanup(&pMgmt->streamPool); tQWorkerCleanup(&pMgmt->streamPool);
tWWorkerCleanup(&pMgmt->fetchPool); tWWorkerCleanup(&pMgmt->fetchPool);
......
...@@ -43,7 +43,6 @@ int32_t tQWorkerInit(SQWorkerPool *pool) { ...@@ -43,7 +43,6 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
void tQWorkerCleanup(SQWorkerPool *pool) { void tQWorkerCleanup(SQWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i; SQWorker *worker = pool->workers + i;
// if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset); taosQsetThreadResume(pool->qset);
} }
...@@ -51,7 +50,6 @@ void tQWorkerCleanup(SQWorkerPool *pool) { ...@@ -51,7 +50,6 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
for (int32_t i = 0; i < pool->max; ++i) { for (int32_t i = 0; i < pool->max; ++i) {
SQWorker *worker = pool->workers + i; SQWorker *worker = pool->workers + i;
// if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL); taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread); taosThreadClear(&worker->thread);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册