diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index b2491d4bfdd3289845f5df4ed5e76014b0cf8eaa..8414d79a9815287dfdfae3af5cc123304745c56d 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -33,7 +33,8 @@ typedef struct { } SMPeerWorker; typedef struct { - int32_t num; + int32_t curNum; + int32_t maxNum; SMPeerWorker *peerWorker; } SMPeerWorkerPool; @@ -46,37 +47,44 @@ static void *dnodeProcessMnodePeerQueue(void *param); int32_t dnodeInitMnodePeer() { tsMPeerQset = taosOpenQset(); - tsMPeerPool.num = 1; - tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.num); + tsMPeerPool.maxNum = 1; + tsMPeerPool.curNum = 0; + tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.maxNum); if (tsMPeerPool.peerWorker == NULL) return -1; - for (int32_t i = 0; i < tsMPeerPool.num; ++i) { + for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) { SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; pWorker->workerId = i; + dDebug("dnode mpeer worker:%d is created", i); } - dInfo("dnode mpeer is opened"); + dDebug("dnode mpeer is opened, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset); return 0; } void dnodeCleanupMnodePeer() { - for (int32_t i = 0; i < tsMPeerPool.num; ++i) { + for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) { SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; if (pWorker->thread) { taosQsetThreadResume(tsMPeerQset); } + dDebug("dnode mpeer worker:%d is closed", i); } - for (int32_t i = 0; i < tsMPeerPool.num; ++i) { + for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) { SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; + dDebug("dnode mpeer worker:%d start to join", i); if (pWorker->thread) { pthread_join(pWorker->thread, NULL); } + dDebug("dnode mpeer worker:%d join success", i); } + dDebug("dnode mpeer is closed, qset:%p", tsMPeerQset); + taosCloseQset(tsMPeerQset); + tsMPeerQset = NULL; taosTFree(tsMPeerPool.peerWorker); - dInfo("dnode mpeer is closed"); } int32_t dnodeAllocateMnodePqueue() { @@ -85,7 +93,7 @@ int32_t dnodeAllocateMnodePqueue() { taosAddIntoQset(tsMPeerQset, tsMPeerQueue, NULL); - for (int32_t i = 0; i < tsMPeerPool.num; ++i) { + for (int32_t i = tsMPeerPool.curNum; i < tsMPeerPool.maxNum; ++i) { SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i; pWorker->workerId = i; @@ -98,7 +106,9 @@ int32_t dnodeAllocateMnodePqueue() { } pthread_attr_destroy(&thAttr); - dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.num); + + tsMPeerPool.curNum = i + 1; + dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.maxNum); } dDebug("dnode mpeer queue:%p is allocated", tsMPeerQueue); @@ -106,6 +116,7 @@ int32_t dnodeAllocateMnodePqueue() { } void dnodeFreeMnodePqueue() { + dDebug("dnode mpeer queue:%p is freed", tsMPeerQueue); taosCloseQueue(tsMPeerQueue); tsMPeerQueue = NULL; } diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index a92c81a2f1f5eb832fe20e76d1fbb5a5dcf2a8fe..fdcbb5889f766ddebdb3f1e56ccffa0b5b129552 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -33,7 +33,8 @@ typedef struct { } SMReadWorker; typedef struct { - int32_t num; + int32_t curNum; + int32_t maxNum; SMReadWorker *readWorker; } SMReadWorkerPool; @@ -46,40 +47,46 @@ static void *dnodeProcessMnodeReadQueue(void *param); int32_t dnodeInitMnodeRead() { tsMReadQset = taosOpenQset(); - tsMReadPool.num = tsNumOfCores * tsNumOfThreadsPerCore / 2; - tsMReadPool.num = MAX(2, tsMReadPool.num); - tsMReadPool.num = MIN(4, tsMReadPool.num); - tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.num); + tsMReadPool.maxNum = tsNumOfCores * tsNumOfThreadsPerCore / 2; + tsMReadPool.maxNum = MAX(2, tsMReadPool.maxNum); + tsMReadPool.maxNum = MIN(4, tsMReadPool.maxNum); + tsMReadPool.curNum = 0; + tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.maxNum); if (tsMReadPool.readWorker == NULL) return -1; - for (int32_t i = 0; i < tsMReadPool.num; ++i) { + for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) { SMReadWorker *pWorker = tsMReadPool.readWorker + i; pWorker->workerId = i; + dDebug("dnode mread worker:%d is created", i); } - dInfo("dnode mread is opened"); + dDebug("dnode mread is opened, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset); return 0; } void dnodeCleanupMnodeRead() { - for (int32_t i = 0; i < tsMReadPool.num; ++i) { + for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) { SMReadWorker *pWorker = tsMReadPool.readWorker + i; if (pWorker->thread) { taosQsetThreadResume(tsMReadQset); } + dDebug("dnode mread worker:%d is closed", i); } - for (int32_t i = 0; i < tsMReadPool.num; ++i) { + for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) { SMReadWorker *pWorker = tsMReadPool.readWorker + i; + dDebug("dnode mread worker:%d start to join", i); if (pWorker->thread) { pthread_join(pWorker->thread, NULL); } + dDebug("dnode mread worker:%d start to join", i); } + dDebug("dnode mread is closed, qset:%p", tsMReadQset); + taosCloseQset(tsMReadQset); + tsMReadQset = NULL; free(tsMReadPool.readWorker); - - dInfo("dnode mread is closed"); } int32_t dnodeAllocateMnodeRqueue() { @@ -88,7 +95,7 @@ int32_t dnodeAllocateMnodeRqueue() { taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL); - for (int32_t i = 0; i < tsMReadPool.num; ++i) { + for (int32_t i = tsMReadPool.curNum; i < tsMReadPool.maxNum; ++i) { SMReadWorker *pWorker = tsMReadPool.readWorker + i; pWorker->workerId = i; @@ -101,7 +108,8 @@ int32_t dnodeAllocateMnodeRqueue() { } pthread_attr_destroy(&thAttr); - dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.num); + tsMReadPool.curNum = i + 1; + dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.maxNum); } dDebug("dnode mread queue:%p is allocated", tsMReadQueue); @@ -109,6 +117,7 @@ int32_t dnodeAllocateMnodeRqueue() { } void dnodeFreeMnodeRqueue() { + dDebug("dnode mread queue:%p is freed", tsMReadQueue); taosCloseQueue(tsMReadQueue); tsMReadQueue = NULL; } diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index b1191e23a9a9aa9dfcbd4692c6ee7fe69ccaff72..384a0fae75088197d7fb01dd67c6e1b9d38739cd 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -34,7 +34,8 @@ typedef struct { } SMWriteWorker; typedef struct { - int32_t num; + int32_t curNum; + int32_t maxNum; SMWriteWorker *writeWorker; } SMWriteWorkerPool; @@ -47,38 +48,45 @@ static void *dnodeProcessMnodeWriteQueue(void *param); int32_t dnodeInitMnodeWrite() { tsMWriteQset = taosOpenQset(); - - tsMWritePool.num = 1; - tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.num); + + tsMWritePool.maxNum = 1; + tsMWritePool.curNum = 0; + tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.maxNum); if (tsMWritePool.writeWorker == NULL) return -1; - for (int32_t i = 0; i < tsMWritePool.num; ++i) { + for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) { SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; pWorker->workerId = i; + dDebug("dnode mwrite worker:%d is created", i); } - dInfo("dnode mwrite is opened"); + dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset); return 0; } void dnodeCleanupMnodeWrite() { - for (int32_t i = 0; i < tsMWritePool.num; ++i) { + for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) { SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; if (pWorker->thread) { taosQsetThreadResume(tsMWriteQset); } + dDebug("dnode mwrite worker:%d is closed", i); } - for (int32_t i = 0; i < tsMWritePool.num; ++i) { + for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) { SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; + dDebug("dnode mwrite worker:%d start to join", i); if (pWorker->thread) { pthread_join(pWorker->thread, NULL); } + dDebug("dnode mwrite worker:%d join success", i); } + dDebug("dnode mwrite is closed, qset:%p", tsMWriteQset); + taosCloseQset(tsMWriteQset); + tsMWriteQset = NULL; taosTFree(tsMWritePool.writeWorker); - dInfo("dnode mwrite is closed"); } int32_t dnodeAllocateMnodeWqueue() { @@ -87,7 +95,7 @@ int32_t dnodeAllocateMnodeWqueue() { taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL); - for (int32_t i = 0; i < tsMWritePool.num; ++i) { + for (int32_t i = tsMWritePool.curNum; i < tsMWritePool.maxNum; ++i) { SMWriteWorker *pWorker = tsMWritePool.writeWorker + i; pWorker->workerId = i; @@ -100,7 +108,8 @@ int32_t dnodeAllocateMnodeWqueue() { } pthread_attr_destroy(&thAttr); - dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.num); + tsMWritePool.curNum = i + 1; + dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.maxNum); } dDebug("dnode mwrite queue:%p is allocated", tsMWriteQueue); @@ -108,6 +117,7 @@ int32_t dnodeAllocateMnodeWqueue() { } void dnodeFreeMnodeWqueue() { + dDebug("dnode mwrite queue:%p is freed", tsMWriteQueue); taosCloseQueue(tsMWriteQueue); tsMWriteQueue = NULL; } @@ -122,11 +132,15 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg)); mnodeCreateMsg(pWrite, pMsg); - dDebug("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]); + dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { + dDebug("app:%p:%p, msg:%s is freed from mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); + mnodeCleanupMsg(pWrite); taosFreeQitem(pWrite); } @@ -182,8 +196,8 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) { dnodeSendRedirectMsg(pMsg, true); dnodeFreeMnodeWriteMsg(pWrite); } else { - dDebug("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, - taosMsg[pWrite->rpcMsg.msgType], pWrite->retry); + dDebug("app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, + taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); }