提交 c6f51591 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/3.0_query_integrate' into 3.0_query_integrate

......@@ -25,23 +25,25 @@ extern "C" {
typedef struct SRpcMsg SRpcMsg;
typedef struct SEpSet SEpSet;
typedef struct SMgmtWrapper SMgmtWrapper;
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp);
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
typedef struct {
SMgmtWrapper* pWrapper;
PutToQueueFp queueFps[QUEUE_MAX];
GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SMsgCb;
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp);
......
......@@ -42,8 +42,14 @@ shall be used to set up the protection.
typedef struct STaosQueue STaosQueue;
typedef struct STaosQset STaosQset;
typedef struct STaosQall STaosQall;
typedef void (*FItem)(void *ahandle, void *pItem);
typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems);
typedef struct {
void *ahandle;
int32_t workerId;
int32_t threadNum;
} SQueueInfo;
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);
typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue);
......
......@@ -51,6 +51,7 @@ typedef struct SWWorker {
typedef struct SWWorkerPool {
int32_t max; // max number of workers
int32_t num;
int32_t nextId; // from 0 to max-1, cyclic
const char *name;
SWWorker *workers;
......@@ -73,31 +74,31 @@ typedef struct {
int32_t maxNum;
FItem fp;
void *param;
} SQWorkerAllCfg;
} SSingleWorkerCfg;
typedef struct {
const char *name;
STaosQueue *queue;
SQWorkerPool pool;
} SQWorkerAll;
} SSingleWorker;
typedef struct {
const char *name;
int32_t maxNum;
FItems fp;
void *param;
} SWWorkerAllCfg;
} SMultiWorkerCfg;
typedef struct {
const char *name;
STaosQueue *queue;
SWWorkerPool pool;
} SWWorkerAll;
} SMultiWorker;
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg);
void tQWorkerAllCleanup(SQWorkerAll *pWorker);
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg);
void tWWorkerAllCleanup(SWWorkerAll *pWorker);
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg);
void tSingleWorkerCleanup(SSingleWorker *pWorker);
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg);
void tMultiWorkerCleanup(SMultiWorker *pWorker);
#ifdef __cplusplus
}
......
......@@ -20,6 +20,10 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
}
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype);
}
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) {
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
}
......
......@@ -28,7 +28,7 @@ typedef struct SBnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SDnodeWorker writeWorker;
SMultiWorker writeWorker;
} SBnodeMgmt;
// bmInt.c
......
......@@ -33,7 +33,8 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num
}
}
static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) {
static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SBnodeMgmt *pMgmt = pInfo->ahandle;
SMgmtWrapper *pWrapper = pMgmt->pWrapper;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
......@@ -63,14 +64,15 @@ static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs
}
int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->writeWorker;
SMultiWorker *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) {
SMultiWorkerCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt};
if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start bnode write worker since %s", terrstr());
return -1;
}
......@@ -78,4 +80,4 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
return 0;
}
void bmStopWorker(SBnodeMgmt *pMgmt) { dndCleanupWorker(&pMgmt->writeWorker); }
void bmStopWorker(SBnodeMgmt *pMgmt) { tMultiWorkerCleanup(&pMgmt->writeWorker); }
......@@ -74,20 +74,6 @@ typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required);
typedef struct {
EWorkerType type;
const char *name;
int32_t minNum;
int32_t maxNum;
void *queueFp;
void *param;
STaosQueue *queue;
union {
SQWorkerPool pool;
SWWorkerPool mpool;
};
} SDnodeWorker;
typedef struct SMsgHandle {
int32_t vgId;
NodeMsgFp vgIdMsgFp;
......@@ -161,11 +147,6 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp);
void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndInt.h"
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp) {
if (pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
pWorker->type = type;
pWorker->name = name;
pWorker->minNum = minNum;
pWorker->maxNum = maxNum;
pWorker->queueFp = queueFp;
pWorker->param = param;
if (pWorker->type == DND_WORKER_SINGLE) {
SQWorkerPool *pPool = &pWorker->pool;
pPool->name = name;
pPool->min = minNum;
pPool->max = maxNum;
if (tQWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tQWorkerAllocQueue(pPool, param, (FItem)queueFp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
} else if (pWorker->type == DND_WORKER_MULTI) {
SWWorkerPool *pPool = &pWorker->mpool;
pPool->name = name;
pPool->max = maxNum;
if (tWWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tWWorkerAllocQueue(pPool, param, (FItems)queueFp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
} else {
terrno = TSDB_CODE_INVALID_PARA;
}
return 0;
}
void dndCleanupWorker(SDnodeWorker *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
taosMsleep(10);
}
if (pWorker->type == DND_WORKER_SINGLE) {
tQWorkerCleanup(&pWorker->pool);
tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
} else if (pWorker->type == DND_WORKER_MULTI) {
tWWorkerCleanup(&pWorker->mpool);
tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue);
} else {
}
}
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg) {
if (pWorker == NULL || pWorker->queue == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
if (taosWriteQitem(pWorker->queue, pMsg) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
......@@ -31,8 +31,8 @@ typedef struct SDnodeMgmt {
SArray *dnodeEps;
TdThread *threadId;
SRWLatch latch;
SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
SSingleWorker mgmtWorker;
SSingleWorker statusWorker;
const char *path;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
......
......@@ -52,7 +52,9 @@ static void *dmThreadRoutine(void *param) {
}
}
static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnodeMgmt *pMgmt = pInfo->ahandle;
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
......@@ -98,13 +100,17 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) {
SSingleWorkerCfg mgmtCfg = {
.minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
SSingleWorkerCfg statusCfg = {
.minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) {
dError("failed to start dnode status worker since %s", terrstr());
return -1;
}
......@@ -123,8 +129,8 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) {
}
void dmStopWorker(SDnodeMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->mgmtWorker);
dndCleanupWorker(&pMgmt->statusWorker);
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
tSingleWorkerCleanup(&pMgmt->statusWorker);
if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId);
......@@ -133,11 +139,11 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
}
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) {
pWorker = &pMgmt->statusWorker;
}
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
......@@ -28,9 +28,9 @@ typedef struct SMnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SQWorkerAll readWorker;
SQWorkerAll writeWorker;
SQWorkerAll syncWorker;
SSingleWorker readWorker;
SSingleWorker writeWorker;
SSingleWorker syncWorker;
SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica;
int8_t selfIndex;
......
......@@ -16,7 +16,9 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in mnode queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
......@@ -42,7 +44,7 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) {
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
}
......@@ -59,7 +61,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) {
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
return -1;
......@@ -89,19 +91,19 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) {
if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) {
dError("failed to start mnode-read worker since %s", terrstr());
return -1;
}
if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) {
if (tSingleWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start mnode-write worker since %s", terrstr());
return -1;
}
if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) {
if (tSingleWorkerInit(&pMgmt->syncWorker, &cfg) != 0) {
dError("failed to start mnode sync-worker since %s", terrstr());
return -1;
}
......@@ -110,7 +112,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
}
void mmStopWorker(SMnodeMgmt *pMgmt) {
tQWorkerAllCleanup(&pMgmt->readWorker);
tQWorkerAllCleanup(&pMgmt->writeWorker);
tQWorkerAllCleanup(&pMgmt->syncWorker);
tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker);
}
......@@ -28,8 +28,8 @@ typedef struct SQnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
SSingleWorker queryWorker;
SSingleWorker fetchWorker;
} SQnodeMgmt;
// qmInt.c
......@@ -44,6 +44,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// qmWorker.c
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype);
int32_t qmStartWorker(SQnodeMgmt *pMgmt);
void qmStopWorker(SQnodeMgmt *pMgmt);
......
......@@ -23,6 +23,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
msgCb.qsizeFp = qmGetQueueSize;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
......
......@@ -21,7 +21,9 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
dndSendRsp(pWrapper, &rsp);
}
static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in qnode-query queue", pMsg);
int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg);
if (code != 0) {
......@@ -33,7 +35,9 @@ static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg);
int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg);
if (code != 0) {
......@@ -45,16 +49,16 @@ static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) {
static int32_t qmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); }
int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); }
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
return -1;
......@@ -63,7 +67,7 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc;
int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
int32_t code = taosWriteQitem(pWorker->queue, pMsg);
if (code != 0) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
......@@ -83,21 +87,48 @@ int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc);
}
int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t size = -1;
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
switch (qtype) {
case QUERY_QUEUE:
size = taosQueueSize(pMgmt->queryWorker.queue);
break;
case FETCH_QUEUE:
size = taosQueueSize(pMgmt->fetchWorker.queue);
break;
default:
break;
}
return size;
}
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
int32_t maxFetchThreads = 4;
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
int32_t maxQueryThreads = minQueryThreads;
if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads,
qmProcessQueryQueue) != 0) {
dError("failed to start qnode query worker since %s", terrstr());
SSingleWorkerCfg queryCfg = {.minNum = minQueryThreads,
.maxNum = maxQueryThreads,
.name = "qnode-query",
.fp = (FItem)qmProcessQueryQueue,
.param = pMgmt};
if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
dError("failed to start qnode-query worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads,
qmProcessFetchQueue) != 0) {
dError("failed to start qnode fetch worker since %s", terrstr());
SSingleWorkerCfg fetchCfg = {.minNum = minFetchThreads,
.maxNum = maxFetchThreads,
.name = "qnode-fetch",
.fp = (FItem)qmProcessFetchQueue,
.param = pMgmt};
if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) {
dError("failed to start qnode-fetch worker since %s", terrstr());
return -1;
}
......@@ -105,6 +136,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
}
void qmStopWorker(SQnodeMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->queryWorker);
dndCleanupWorker(&pMgmt->fetchWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->fetchWorker);
}
......@@ -30,8 +30,8 @@ typedef struct SSnodeMgmt {
const char *path;
SRWLatch latch;
int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SDnodeWorker*>
SDnodeWorker sharedWorker;
SArray *uniqueWorkers; // SArray<SMultiWorker*>
SSingleWorker sharedWorker;
} SSnodeMgmt;
// smInt.c
......
......@@ -16,7 +16,9 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) {
static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
......@@ -30,7 +32,9 @@ static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t num
}
}
static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in snode shared queue", pMsg);
sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg);
......@@ -40,20 +44,23 @@ static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SMultiWorker *));
if (pMgmt->uniqueWorkers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) {
SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker));
SMultiWorker *pUniqueWorker = malloc(sizeof(SMultiWorker));
if (pUniqueWorker == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndInitWorker(pMgmt, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, smProcessSharedQueue) != 0) {
dError("failed to start snode unique worker since %s", terrstr());
SMultiWorkerCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt};
if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
dError("failed to start snode-unique worker since %s", terrstr());
return -1;
}
if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) {
......@@ -62,9 +69,14 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
}
}
if (dndInitWorker(pMgmt, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM,
SND_SHARED_THREAD_NUM, smProcessSharedQueue)) {
dError("failed to start snode shared worker since %s", terrstr());
SSingleWorkerCfg cfg = {.minNum = SND_SHARED_THREAD_NUM,
.maxNum = SND_SHARED_THREAD_NUM,
.name = "snode-shared",
.fp = (FItem)smProcessSharedQueue,
.param = pMgmt};
if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) {
dError("failed to start snode shared-worker since %s", terrstr());
return -1;
}
......@@ -73,11 +85,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
void smStopWorker(SSnodeMgmt *pMgmt) {
for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i);
dndCleanupWorker(worker);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i);
tMultiWorkerCleanup(pWorker);
}
taosArrayDestroy(pMgmt->uniqueWorkers);
dndCleanupWorker(&pMgmt->sharedWorker);
tSingleWorkerCleanup(&pMgmt->sharedWorker);
}
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
......@@ -93,33 +105,33 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
}
int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->sharedWorker;
SSingleWorker *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......
......@@ -36,7 +36,7 @@ typedef struct SVnodesMgmt {
const char *path;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
SDnodeWorker mgmtWorker;
SSingleWorker mgmtWorker;
} SVnodesMgmt;
typedef struct {
......@@ -104,6 +104,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype);
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
......
......@@ -133,6 +133,7 @@ static void *vmOpenVnodeFunc(void *param) {
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.qsizeFp = vmGetQueueSize;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
......
......@@ -87,6 +87,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.qsizeFp = vmGetQueueSize;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
......
......@@ -21,7 +21,9 @@ static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
dndSendRsp(pWrapper, &rsp);
}
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);
......@@ -57,7 +59,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
......@@ -68,7 +72,9 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
}
}
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
......@@ -79,7 +85,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
}
}
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
......@@ -126,7 +134,8 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
taosArrayDestroy(pArray);
}
static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
......@@ -138,7 +147,8 @@ static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
}
}
static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
......@@ -190,26 +200,18 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
return code;
}
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
}
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
}
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); }
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
}
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); }
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
}
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
......@@ -258,6 +260,34 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
}
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t size = -1;
SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId);
if (pVnode != NULL) {
switch (qtype) {
case QUERY_QUEUE:
size = taosQueueSize(pVnode->pQueryQ);
break;
case FETCH_QUEUE:
size = taosQueueSize(pVnode->pFetchQ);
break;
case WRITE_QUEUE:
size = taosQueueSize(pVnode->pWriteQ);
break;
case SYNC_QUEUE:
size = taosQueueSize(pVnode->pSyncQ);
break;
case APPLY_QUEUE:
size = taosQueueSize(pVnode->pApplyQ);
break;
default:
break;
}
}
vmReleaseVnode(pWrapper->pMgmt, pVnode);
return size;
}
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
......@@ -319,7 +349,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
pWPool->max = maxSyncThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
SSingleWorkerCfg cfg = {
.minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
dError("failed to start vnode-mgmt worker since %s", terrstr());
return -1;
}
......@@ -329,7 +361,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
}
void vmStopWorker(SVnodesMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->mgmtWorker);
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
tQWorkerCleanup(&pMgmt->fetchPool);
tQWorkerCleanup(&pMgmt->queryPool);
tWWorkerCleanup(&pMgmt->writePool);
......
......@@ -86,7 +86,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
}
if (fp != NULL) {
(*fp)(ahandle, msg);
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
(*fp)(&info, msg);
}
}
......@@ -210,7 +211,8 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
}
if (fp != NULL) {
(*fp)(ahandle, worker->qall, numOfMsgs);
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
(*fp)(&info, worker->qall, numOfMsgs);
}
}
......@@ -264,6 +266,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
}
taosThreadAttrDestroy(&thAttr);
pool->num++;
if (pool->num > pool->max) pool->num = pool->max;
} else {
taosAddIntoQset(worker->qset, queue, ahandle);
pool->nextId = (pool->nextId + 1) % pool->max;
......@@ -280,7 +284,7 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
}
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
SQWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name;
pPool->min = pCfg->minNum;
......@@ -298,7 +302,7 @@ int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
return 0;
}
void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
......@@ -309,7 +313,7 @@ void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
SWWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name;
pPool->max = pCfg->maxNum;
......@@ -326,7 +330,7 @@ int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
return 0;
}
void tWWorkerAllCleanup(SWWorkerAll *pWorker) {
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册