提交 2af212f4 编写于 作者: S Shengliang Guan

refact worker

上级 f48fddc7
......@@ -74,31 +74,31 @@ typedef struct {
int32_t maxNum;
FItem fp;
void *param;
} SQWorkerAllCfg;
} SSingleWorkerCfg;
typedef struct {
const char *name;
STaosQueue *queue;
SQWorkerPool pool;
} SQWorkerAll;
} SSingleWorker;
typedef struct {
const char *name;
int32_t maxNum;
FItems fp;
void *param;
} SWWorkerAllCfg;
} SMultiWorkerCfg;
typedef struct {
const char *name;
STaosQueue *queue;
SWWorkerPool pool;
} SWWorkerAll;
} SMultiWorker;
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg);
void tQWorkerAllCleanup(SQWorkerAll *pWorker);
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg);
void tWWorkerAllCleanup(SWWorkerAll *pWorker);
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg);
void tSingleWorkerCleanup(SSingleWorker *pWorker);
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg);
void tMultiWorkerCleanup(SMultiWorker *pWorker);
#ifdef __cplusplus
}
......
......@@ -28,7 +28,7 @@ typedef struct SBnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SWWorkerAll writeWorker;
SMultiWorker writeWorker;
} SBnodeMgmt;
// bmInt.c
......
......@@ -64,15 +64,15 @@ static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs
}
int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SWWorkerAll *pWorker = &pMgmt->writeWorker;
SMultiWorker *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
SWWorkerAllCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt};
if (tWWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) {
SMultiWorkerCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt};
if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start bnode write worker since %s", terrstr());
return -1;
}
......@@ -80,4 +80,4 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
return 0;
}
void bmStopWorker(SBnodeMgmt *pMgmt) { tWWorkerAllCleanup(&pMgmt->writeWorker); }
void bmStopWorker(SBnodeMgmt *pMgmt) { tMultiWorkerCleanup(&pMgmt->writeWorker); }
......@@ -31,8 +31,8 @@ typedef struct SDnodeMgmt {
SArray *dnodeEps;
TdThread *threadId;
SRWLatch latch;
SQWorkerAll mgmtWorker;
SQWorkerAll statusWorker;
SSingleWorker mgmtWorker;
SSingleWorker statusWorker;
const char *path;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
......
......@@ -100,16 +100,16 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
SQWorkerAllCfg mgmtCfg = {
SSingleWorkerCfg mgmtCfg = {
.minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
SQWorkerAllCfg statusCfg = {
SSingleWorkerCfg statusCfg = {
.minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) {
dError("failed to start dnode status worker since %s", terrstr());
return -1;
}
......@@ -129,8 +129,8 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) {
}
void dmStopWorker(SDnodeMgmt *pMgmt) {
tQWorkerAllCleanup(&pMgmt->mgmtWorker);
tQWorkerAllCleanup(&pMgmt->statusWorker);
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
tSingleWorkerCleanup(&pMgmt->statusWorker);
if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId);
......@@ -139,7 +139,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
}
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SQWorkerAll *pWorker = &pMgmt->mgmtWorker;
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) {
pWorker = &pMgmt->statusWorker;
}
......
......@@ -28,9 +28,9 @@ typedef struct SMnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SQWorkerAll readWorker;
SQWorkerAll writeWorker;
SQWorkerAll syncWorker;
SSingleWorker readWorker;
SSingleWorker writeWorker;
SSingleWorker syncWorker;
SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica;
int8_t selfIndex;
......
......@@ -44,7 +44,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) {
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
}
......@@ -61,7 +61,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) {
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
return -1;
......@@ -91,19 +91,19 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) {
if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) {
dError("failed to start mnode-read worker since %s", terrstr());
return -1;
}
if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) {
if (tSingleWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start mnode-write worker since %s", terrstr());
return -1;
}
if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) {
if (tSingleWorkerInit(&pMgmt->syncWorker, &cfg) != 0) {
dError("failed to start mnode sync-worker since %s", terrstr());
return -1;
}
......@@ -112,7 +112,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
}
void mmStopWorker(SMnodeMgmt *pMgmt) {
tQWorkerAllCleanup(&pMgmt->readWorker);
tQWorkerAllCleanup(&pMgmt->writeWorker);
tQWorkerAllCleanup(&pMgmt->syncWorker);
tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker);
}
......@@ -28,8 +28,8 @@ typedef struct SQnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SQWorkerAll queryWorker;
SQWorkerAll fetchWorker;
SSingleWorker queryWorker;
SSingleWorker fetchWorker;
} SQnodeMgmt;
// qmInt.c
......
......@@ -49,7 +49,7 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static int32_t qmPutMsgToWorker(SQWorkerAll *pWorker, SNodeMsg *pMsg) {
static int32_t qmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
}
......@@ -58,7 +58,7 @@ int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgTo
int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); }
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) {
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
return -1;
......@@ -110,24 +110,24 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
int32_t maxQueryThreads = minQueryThreads;
SQWorkerAllCfg queryCfg = {.minNum = minQueryThreads,
SSingleWorkerCfg queryCfg = {.minNum = minQueryThreads,
.maxNum = maxQueryThreads,
.name = "qnode-query",
.fp = (FItem)qmProcessQueryQueue,
.param = pMgmt};
if (tQWorkerAllInit(&pMgmt->queryWorker, &queryCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
dError("failed to start qnode-query worker since %s", terrstr());
return -1;
}
SQWorkerAllCfg fetchCfg = {.minNum = minFetchThreads,
SSingleWorkerCfg fetchCfg = {.minNum = minFetchThreads,
.maxNum = maxFetchThreads,
.name = "qnode-fetch",
.fp = (FItem)qmProcessFetchQueue,
.param = pMgmt};
if (tQWorkerAllInit(&pMgmt->fetchWorker, &fetchCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) {
dError("failed to start qnode-fetch worker since %s", terrstr());
return -1;
}
......@@ -136,6 +136,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
}
void qmStopWorker(SQnodeMgmt *pMgmt) {
tQWorkerAllCleanup(&pMgmt->queryWorker);
tQWorkerAllCleanup(&pMgmt->fetchWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->fetchWorker);
}
......@@ -30,8 +30,8 @@ typedef struct SSnodeMgmt {
const char *path;
SRWLatch latch;
int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SWWorkerAll*>
SQWorkerAll sharedWorker;
SArray *uniqueWorkers; // SArray<SMultiWorker*>
SSingleWorker sharedWorker;
} SSnodeMgmt;
// smInt.c
......
......@@ -44,22 +44,22 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SWWorkerAll *));
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SMultiWorker *));
if (pMgmt->uniqueWorkers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) {
SWWorkerAll *pUniqueWorker = malloc(sizeof(SWWorkerAll));
SMultiWorker *pUniqueWorker = malloc(sizeof(SMultiWorker));
if (pUniqueWorker == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt};
SMultiWorkerCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt};
if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) {
if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
dError("failed to start snode-unique worker since %s", terrstr());
return -1;
}
......@@ -69,13 +69,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
}
}
SQWorkerAllCfg cfg = {.minNum = SND_SHARED_THREAD_NUM,
SSingleWorkerCfg cfg = {.minNum = SND_SHARED_THREAD_NUM,
.maxNum = SND_SHARED_THREAD_NUM,
.name = "snode-shared",
.fp = (FItem)smProcessSharedQueue,
.param = pMgmt};
if (tQWorkerAllInit(&pMgmt->sharedWorker, &cfg)) {
if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) {
dError("failed to start snode shared-worker since %s", terrstr());
return -1;
}
......@@ -85,11 +85,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
void smStopWorker(SSnodeMgmt *pMgmt) {
for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i);
tWWorkerAllCleanup(pWorker);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i);
tMultiWorkerCleanup(pWorker);
}
taosArrayDestroy(pMgmt->uniqueWorkers);
tQWorkerAllCleanup(&pMgmt->sharedWorker);
tSingleWorkerCleanup(&pMgmt->sharedWorker);
}
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
......@@ -105,7 +105,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
}
int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
......@@ -117,7 +117,7 @@ int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
......@@ -128,7 +128,7 @@ int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SQWorkerAll *pWorker = &pMgmt->sharedWorker;
SSingleWorker *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
......
......@@ -36,7 +36,7 @@ typedef struct SVnodesMgmt {
const char *path;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
SQWorkerAll mgmtWorker;
SSingleWorker mgmtWorker;
} SVnodesMgmt;
typedef struct {
......
......@@ -200,24 +200,16 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
return code;
}
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
}
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
}
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); }
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
}
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); }
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
}
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SQWorkerAll *pWorker = &pMgmt->mgmtWorker;
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
}
......@@ -357,9 +349,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
pWPool->max = maxSyncThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
SQWorkerAllCfg cfg = {
SSingleWorkerCfg cfg = {
.minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->mgmtWorker, &cfg) != 0) {
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
dError("failed to start vnode-mgmt worker since %s", terrstr());
return -1;
}
......@@ -369,7 +361,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
}
void vmStopWorker(SVnodesMgmt *pMgmt) {
tQWorkerAllCleanup(&pMgmt->mgmtWorker);
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
tQWorkerCleanup(&pMgmt->fetchPool);
tQWorkerCleanup(&pMgmt->queryPool);
tWWorkerCleanup(&pMgmt->writePool);
......
......@@ -284,7 +284,7 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
SQWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name;
pPool->min = pCfg->minNum;
......@@ -302,7 +302,7 @@ int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
return 0;
}
void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
......@@ -313,7 +313,7 @@ void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
SWWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name;
pPool->max = pCfg->maxNum;
......@@ -330,7 +330,7 @@ int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
return 0;
}
void tWWorkerAllCleanup(SWWorkerAll *pWorker) {
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册