提交 34e37463 编写于 作者: S Shengliang Guan

TD-2393

上级 259e2941
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tqueue.h" #include "tqueue.h"
#include "tworker.h"
#include "dnodeVMgmt.h" #include "dnodeVMgmt.h"
typedef struct { typedef struct {
...@@ -23,9 +24,8 @@ typedef struct { ...@@ -23,9 +24,8 @@ typedef struct {
char pCont[]; char pCont[];
} SMgmtMsg; } SMgmtMsg;
static taos_qset tsMgmtQset = NULL; static SWorkerPool tsVMgmtWP;
static taos_queue tsMgmtQueue = NULL; static taos_queue tsVMgmtQueue = NULL;
static pthread_t tsQthread;
static void * dnodeProcessMgmtQueue(void *param); static void * dnodeProcessMgmtQueue(void *param);
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
...@@ -47,45 +47,23 @@ int32_t dnodeInitVMgmt() { ...@@ -47,45 +47,23 @@ int32_t dnodeInitVMgmt() {
int32_t code = vnodeInitMgmt(); int32_t code = vnodeInitMgmt();
if (code != TSDB_CODE_SUCCESS) return -1; if (code != TSDB_CODE_SUCCESS) return -1;
tsMgmtQset = taosOpenQset(); tsVMgmtWP.name = "vmgmt";
if (tsMgmtQset == NULL) { tsVMgmtWP.workerFp = dnodeProcessMgmtQueue;
dError("failed to create the vmgmt queue set"); tsVMgmtWP.min = 1;
return -1; tsVMgmtWP.max = 1;
} if (tWorkerInit(&tsVMgmtWP) != 0) return -1;
tsMgmtQueue = taosOpenQueue();
if (tsMgmtQueue == NULL) {
dError("failed to create the vmgmt queue");
return -1;
}
taosAddIntoQset(tsMgmtQset, tsMgmtQueue, NULL);
pthread_attr_t thAttr; tsVMgmtQueue = tWorkerAllocQueue(&tsVMgmtWP, NULL);
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
pthread_attr_destroy(&thAttr);
if (code != 0) {
dError("failed to create thread to process vmgmt queue, reason:%s", strerror(errno));
return -1;
}
dInfo("dnode vmgmt is initialized"); dInfo("dnode vmgmt is initialized");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeCleanupVMgmt() { void dnodeCleanupVMgmt() {
if (tsMgmtQset) taosQsetThreadResume(tsMgmtQset); tWorkerFreeQueue(&tsVMgmtWP, tsVMgmtQueue);
if (tsQthread) pthread_join(tsQthread, NULL); tWorkerCleanup(&tsVMgmtWP);
if (tsMgmtQueue) taosCloseQueue(tsMgmtQueue);
if (tsMgmtQset) taosCloseQset(tsMgmtQset);
tsMgmtQset = NULL;
tsMgmtQueue = NULL;
tsVMgmtQueue = NULL;
vnodeCleanupMgmt(); vnodeCleanupMgmt();
} }
...@@ -97,7 +75,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { ...@@ -97,7 +75,7 @@ static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
pMgmt->rpcMsg = *pMsg; pMgmt->rpcMsg = *pMsg;
pMgmt->rpcMsg.pCont = pMgmt->pCont; pMgmt->rpcMsg.pCont = pMgmt->pCont;
memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen); memcpy(pMgmt->pCont, pMsg->pCont, pMsg->contLen);
taosWriteQitem(tsMgmtQueue, TAOS_QTYPE_RPC, pMgmt); taosWriteQitem(tsVMgmtQueue, TAOS_QTYPE_RPC, pMgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -112,16 +90,18 @@ void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) { ...@@ -112,16 +90,18 @@ void dnodeDispatchToVMgmtQueue(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
static void *dnodeProcessMgmtQueue(void *param) { static void *dnodeProcessMgmtQueue(void *wparam) {
SMgmtMsg *pMgmt; SWorker * pWorker = wparam;
SWorkerPool *pPool = pWorker->pPool;
SMgmtMsg * pMgmt;
SRpcMsg * pMsg; SRpcMsg * pMsg;
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
int32_t qtype; int32_t qtype;
void * handle; void * handle;
while (1) { while (1) {
if (taosReadQitemFromQset(tsMgmtQset, &qtype, (void **)&pMgmt, &handle) == 0) { if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pMgmt, &handle) == 0) {
dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset); dDebug("qdnode mgmt got no message from qset:%p, , exit", pPool->qset);
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册