提交 4d30dc9d 编写于 作者: S Shengliang Guan

TD-1657

上级 1abffd94
......@@ -33,7 +33,8 @@ typedef struct {
} SMPeerWorker;
typedef struct {
int32_t num;
int32_t curNum;
int32_t maxNum;
SMPeerWorker *peerWorker;
} SMPeerWorkerPool;
......@@ -46,37 +47,44 @@ static void *dnodeProcessMnodePeerQueue(void *param);
int32_t dnodeInitMnodePeer() {
tsMPeerQset = taosOpenQset();
tsMPeerPool.num = 1;
tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.num);
tsMPeerPool.maxNum = 1;
tsMPeerPool.curNum = 0;
tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.maxNum);
if (tsMPeerPool.peerWorker == NULL) return -1;
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
pWorker->workerId = i;
dDebug("dnode mpeer worker:%d is created", i);
}
dInfo("dnode mpeer is opened");
dDebug("dnode mpeer is opened, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset);
return 0;
}
void dnodeCleanupMnodePeer() {
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsMPeerQset);
}
dDebug("dnode mpeer worker:%d is closed", i);
}
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
dDebug("dnode mpeer worker:%d start to join", i);
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
dDebug("dnode mpeer worker:%d join success", i);
}
dDebug("dnode mpeer is closed, qset:%p", tsMPeerQset);
taosCloseQset(tsMPeerQset);
tsMPeerQset = NULL;
taosTFree(tsMPeerPool.peerWorker);
dInfo("dnode mpeer is closed");
}
int32_t dnodeAllocateMnodePqueue() {
......@@ -85,7 +93,7 @@ int32_t dnodeAllocateMnodePqueue() {
taosAddIntoQset(tsMPeerQset, tsMPeerQueue, NULL);
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
for (int32_t i = tsMPeerPool.curNum; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
pWorker->workerId = i;
......@@ -98,7 +106,9 @@ int32_t dnodeAllocateMnodePqueue() {
}
pthread_attr_destroy(&thAttr);
dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.num);
tsMPeerPool.curNum = i + 1;
dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.maxNum);
}
dDebug("dnode mpeer queue:%p is allocated", tsMPeerQueue);
......@@ -106,6 +116,7 @@ int32_t dnodeAllocateMnodePqueue() {
}
void dnodeFreeMnodePqueue() {
dDebug("dnode mpeer queue:%p is freed", tsMPeerQueue);
taosCloseQueue(tsMPeerQueue);
tsMPeerQueue = NULL;
}
......
......@@ -33,7 +33,8 @@ typedef struct {
} SMReadWorker;
typedef struct {
int32_t num;
int32_t curNum;
int32_t maxNum;
SMReadWorker *readWorker;
} SMReadWorkerPool;
......@@ -46,40 +47,46 @@ static void *dnodeProcessMnodeReadQueue(void *param);
int32_t dnodeInitMnodeRead() {
tsMReadQset = taosOpenQset();
tsMReadPool.num = tsNumOfCores * tsNumOfThreadsPerCore / 2;
tsMReadPool.num = MAX(2, tsMReadPool.num);
tsMReadPool.num = MIN(4, tsMReadPool.num);
tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.num);
tsMReadPool.maxNum = tsNumOfCores * tsNumOfThreadsPerCore / 2;
tsMReadPool.maxNum = MAX(2, tsMReadPool.maxNum);
tsMReadPool.maxNum = MIN(4, tsMReadPool.maxNum);
tsMReadPool.curNum = 0;
tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.maxNum);
if (tsMReadPool.readWorker == NULL) return -1;
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
pWorker->workerId = i;
dDebug("dnode mread worker:%d is created", i);
}
dInfo("dnode mread is opened");
dDebug("dnode mread is opened, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset);
return 0;
}
void dnodeCleanupMnodeRead() {
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsMReadQset);
}
dDebug("dnode mread worker:%d is closed", i);
}
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
dDebug("dnode mread worker:%d start to join", i);
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
dDebug("dnode mread worker:%d start to join", i);
}
dDebug("dnode mread is closed, qset:%p", tsMReadQset);
taosCloseQset(tsMReadQset);
tsMReadQset = NULL;
free(tsMReadPool.readWorker);
dInfo("dnode mread is closed");
}
int32_t dnodeAllocateMnodeRqueue() {
......@@ -88,7 +95,7 @@ int32_t dnodeAllocateMnodeRqueue() {
taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL);
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
for (int32_t i = tsMReadPool.curNum; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
pWorker->workerId = i;
......@@ -101,7 +108,8 @@ int32_t dnodeAllocateMnodeRqueue() {
}
pthread_attr_destroy(&thAttr);
dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.num);
tsMReadPool.curNum = i + 1;
dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.maxNum);
}
dDebug("dnode mread queue:%p is allocated", tsMReadQueue);
......@@ -109,6 +117,7 @@ int32_t dnodeAllocateMnodeRqueue() {
}
void dnodeFreeMnodeRqueue() {
dDebug("dnode mread queue:%p is freed", tsMReadQueue);
taosCloseQueue(tsMReadQueue);
tsMReadQueue = NULL;
}
......
......@@ -34,7 +34,8 @@ typedef struct {
} SMWriteWorker;
typedef struct {
int32_t num;
int32_t curNum;
int32_t maxNum;
SMWriteWorker *writeWorker;
} SMWriteWorkerPool;
......@@ -47,38 +48,45 @@ static void *dnodeProcessMnodeWriteQueue(void *param);
int32_t dnodeInitMnodeWrite() {
tsMWriteQset = taosOpenQset();
tsMWritePool.num = 1;
tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.num);
tsMWritePool.maxNum = 1;
tsMWritePool.curNum = 0;
tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.maxNum);
if (tsMWritePool.writeWorker == NULL) return -1;
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
pWorker->workerId = i;
dDebug("dnode mwrite worker:%d is created", i);
}
dInfo("dnode mwrite is opened");
dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
return 0;
}
void dnodeCleanupMnodeWrite() {
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsMWriteQset);
}
dDebug("dnode mwrite worker:%d is closed", i);
}
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
dDebug("dnode mwrite worker:%d start to join", i);
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
dDebug("dnode mwrite worker:%d join success", i);
}
dDebug("dnode mwrite is closed, qset:%p", tsMWriteQset);
taosCloseQset(tsMWriteQset);
tsMWriteQset = NULL;
taosTFree(tsMWritePool.writeWorker);
dInfo("dnode mwrite is closed");
}
int32_t dnodeAllocateMnodeWqueue() {
......@@ -87,7 +95,7 @@ int32_t dnodeAllocateMnodeWqueue() {
taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL);
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
for (int32_t i = tsMWritePool.curNum; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
pWorker->workerId = i;
......@@ -100,7 +108,8 @@ int32_t dnodeAllocateMnodeWqueue() {
}
pthread_attr_destroy(&thAttr);
dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.num);
tsMWritePool.curNum = i + 1;
dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.maxNum);
}
dDebug("dnode mwrite queue:%p is allocated", tsMWriteQueue);
......@@ -108,6 +117,7 @@ int32_t dnodeAllocateMnodeWqueue() {
}
void dnodeFreeMnodeWqueue() {
dDebug("dnode mwrite queue:%p is freed", tsMWriteQueue);
taosCloseQueue(tsMWriteQueue);
tsMWriteQueue = NULL;
}
......@@ -122,11 +132,15 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
mnodeCreateMsg(pWrite, pMsg);
dDebug("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]);
dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
dDebug("app:%p:%p, msg:%s is freed from mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
mnodeCleanupMsg(pWrite);
taosFreeQitem(pWrite);
}
......@@ -182,8 +196,8 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) {
dnodeSendRedirectMsg(pMsg, true);
dnodeFreeMnodeWriteMsg(pWrite);
} else {
dDebug("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
dDebug("app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册