From 266bf53a099b2542e02d980056dbc3d7d9501379 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 11 Mar 2022 14:44:17 +0800 Subject: [PATCH] shm --- include/dnode/mnode/mnode.h | 3 +- include/util/tprocess.h | 33 +-- source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h | 4 +- .../dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c | 21 +- source/util/src/tprocess.c | 234 +++++++++++------- 5 files changed, 171 insertions(+), 124 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index e1c068d88a..513da09f79 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -25,7 +25,6 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SDnode SDnode; typedef struct SMnode SMnode; -typedef struct SMndMsg SMndMsg; typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); @@ -33,7 +32,7 @@ typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef struct SMndMsg { +typedef struct { char user[TSDB_USER_LEN]; char db[TSDB_DB_FNAME_LEN]; int32_t acctId; diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 0e6f25e637..11aafe710e 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,43 +22,26 @@ extern "C" { #endif -typedef struct { - int32_t contLen; - char pCont[]; -} SBlockItem; - -typedef void *(*ProcFp)(void *parent, SBlockItem *pItem); - typedef struct SProcQueue SProcQueue; +typedef struct SProcObj SProcObj; +typedef void *(*ProcFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); typedef struct { - void *pParent; - bool testFlag; int32_t childQueueSize; int32_t parentQueueSize; ProcFp childFp; ProcFp parentFp; + void *pParent; + bool testFlag; } SProcCfg; -typedef struct { - int32_t pid; - SProcQueue *pChildQueue; - SProcQueue *pParentQueue; - pthread_t childThread; - pthread_t parentThread; - ProcFp childFp; - ProcFp parentFp; - void *pParent; - bool stopFlag; - bool testFlag; - bool isChild; -} SProcObj; - SProcObj *taosProcInit(const SProcCfg *pCfg); +void taosProcCleanup(SProcObj *pProc); int32_t taosProcStart(SProcObj *pProc); void taosProcStop(SProcObj *pProc); -void taosProcCleanup(SProcObj *pProc); -int32_t taosProcPushChild(SProcObj *pProc, void *pCont, int32_t contLen); + +int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); +int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h index 7e8d01836e..88a0d2483a 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h +++ b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h @@ -52,8 +52,8 @@ void mmInitMsgFp(SMnodeMgmt *pMgmt); void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void mmConsumeChildQueue(SDnode *pDnode, SBlockItem *pBlock); -void mmConsumeParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pBlock); +void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 8bcbea4f4d..8cac511a4d 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -164,7 +164,7 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = -1; - SMndMsg *pMsg = NULL; + SMndMsg *pMsg = NULL; MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; if (msgFp == NULL) { @@ -185,7 +185,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pMgmt->singleProc) { code = (*msgFp)(pDnode, pMsg); } else { - code = taosProcPushChild(pMgmt->pProcess, pMsg, contLen); + code = taosProcPutToChildQueue(pMgmt->pProcess, pMsg, sizeof(pMsg), pRpc->pCont, pRpc->contLen); } _OVER: @@ -243,15 +243,15 @@ static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMs } static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) { - int32_t contLen = sizeof(SMndMsg) + pRpc->contLen; + int32_t contLen = sizeof(SMndMsg) + pRpc->contLen; SMndMsg *pMsg = taosAllocateQitem(contLen); if (pMsg == NULL) { return -1; } - pMsg->contLen = pRpc->contLen; - pMsg->pCont = (char *)pMsg + sizeof(SMndMsg); - memcpy(pMsg->pCont, pRpc->pCont, pRpc->contLen); + pMsg->rpcMsg = *pRpc; + pMsg->rpcMsg.pCont = (char *)pMsg + sizeof(SMndMsg); + memcpy(pMsg->rpcMsg.pCont, pRpc->pCont, pRpc->contLen); rpcFreeCont(pRpc->pCont); int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg); @@ -262,15 +262,14 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs return code; } -void mmConsumeChildQueue(SDnode *pDnode, SBlockItem *pBlock) { +void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMndMsg *pMsg = (SMndMsg *)pBlock->pCont; - + SRpcMsg *pRpc = &pMsg->rpcMsg; pRpc->pCont = (char *)pMsg + sizeof(SMndMsg); MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; - int32_t code = (*msgFp)(pDnode, pMsg); + int32_t code = (*msgFp)(pDnode, pMsg); if (code == 0) return; @@ -287,7 +286,7 @@ void mmConsumeChildQueue(SDnode *pDnode, SBlockItem *pBlock) { taosFreeQitem(pMsg); } -void mmConsumeParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pBlock) {} +void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {} static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index e445f7fcd0..3f5c4c7477 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -20,7 +20,7 @@ #include "tqueue.h" #define SHM_DEFAULT_SIZE (20 * 1024 * 1024) -#define CEIL4(n) (ceil((float)(n) / 4) * 4) +#define CEIL8(n) (ceil((float)(n) / 8) * 8) typedef struct SProcQueue { int32_t head; @@ -33,11 +33,25 @@ typedef struct SProcQueue { pthread_mutex_t mutex; } SProcQueue; +typedef struct SProcObj { + SProcQueue *pChildQueue; + SProcQueue *pParentQueue; + pthread_t childThread; + pthread_t parentThread; + ProcFp childFp; + ProcFp parentFp; + void *pParent; + int32_t pid; + bool isChild; + bool stopFlag; + bool testFlag; +} SProcObj; + static SProcQueue *taosProcQueueInit(int32_t size) { if (size <= 0) size = SHM_DEFAULT_SIZE; - int32_t bufSize = CEIL4(size); - int32_t headSize = CEIL4(sizeof(SProcQueue)); + int32_t bufSize = CEIL8(size); + int32_t headSize = CEIL8(sizeof(SProcQueue)); SProcQueue *pQueue = malloc(bufSize + headSize); if (pQueue == NULL) { @@ -45,35 +59,40 @@ static SProcQueue *taosProcQueueInit(int32_t size) { return NULL; } - pQueue->total = bufSize; - pQueue->avail = bufSize; - pQueue->head = 0; - pQueue->tail = 0; - pQueue->items = 0; - pQueue->pBuffer = (char *)pQueue + headSize; - if (pthread_mutex_init(&pQueue->mutex, NULL) != 0) { + free(pQueue); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - tsem_init(&pQueue->sem, 0, 0); + if (tsem_init(&pQueue->sem, 0, 0) != 0) { + pthread_mutex_destroy(&pQueue->mutex); + free(pQueue); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pQueue->head = 0; + pQueue->tail = 0; + pQueue->total = bufSize; + pQueue->avail = bufSize; + pQueue->items = 0; + pQueue->pBuffer = (char *)pQueue + headSize; return pQueue; } static void taosProcQueueCleanup(SProcQueue *pQueue) { - pthread_mutex_destroy(&pQueue->mutex); - tsem_destroy(&pQueue->sem); - free(pQueue); + if (pQueue != NULL) { + pthread_mutex_destroy(&pQueue->mutex); + tsem_destroy(&pQueue->sem); + free(pQueue); + } } -static int32_t taosProcQueuePush(SProcQueue *pQueue, void *pItem, int32_t itemLen) { - char *pHead = NULL; - char *pBody1 = NULL; - char *pBody2 = NULL; - int32_t body1Len = 0; - int32_t body2Len = 0; - int32_t fullLen = CEIL4(itemLen) + 4; +static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHeadLen, char *pBody, int32_t rawBodyLen) { + const int32_t headLen = CEIL8(rawHeadLen); + const int32_t bodyLen = CEIL8(rawBodyLen); + const int32_t fullLen = headLen + bodyLen + 8; pthread_mutex_lock(&pQueue->mutex); if (fullLen > pQueue->avail) { @@ -82,55 +101,54 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, void *pItem, int32_t itemLe return -1; } + if (pQueue->tail < pQueue->total) { + *(int32_t *)(pQueue->pBuffer + pQueue->head) = headLen; + *(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen; + } else { + *(int32_t *)(pQueue->pBuffer) = headLen; + *(int32_t *)(pQueue->pBuffer + 4) = bodyLen; + } + if (pQueue->tail < pQueue->head) { - pHead = pQueue->pBuffer + pQueue->tail; - pBody1 = pQueue->pBuffer + pQueue->tail + 4; - body1Len = itemLen; - pQueue->tail += fullLen; + memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen); + memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, rawBodyLen); + pQueue->tail = pQueue->tail + 8 + headLen + bodyLen; } else { int32_t remain = pQueue->total - pQueue->tail; - if (remain >= fullLen) { - pHead = pQueue->pBuffer + pQueue->tail; - pBody1 = pQueue->pBuffer + pQueue->tail + 4; - body1Len = itemLen; - pQueue->tail += fullLen; + if (remain == 0) { + memcpy(pQueue->pBuffer + 8, pHead, rawHeadLen); + memcpy(pQueue->pBuffer + 8 + headLen, pBody, rawBodyLen); + pQueue->tail = 8 + headLen + bodyLen; + } else if (remain == 8) { + memcpy(pQueue->pBuffer, pHead, rawHeadLen); + memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen); + pQueue->tail = headLen + bodyLen; + } else if (remain < 8 + headLen) { + memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, remain - 8); + memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8)); + memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); + pQueue->tail = headLen - (remain - 8) + bodyLen; + } else if (remain < 8 + bodyLen) { + memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen); + memcpy(pQueue->pBuffer + pQueue->head + 8 + headLen, pBody, remain - 8 - headLen); + memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); + pQueue->tail = bodyLen - (remain - 8 - headLen); } else { - if (remain == 0) { - pHead = pQueue->pBuffer; - pBody1 = pQueue->pBuffer + 4; - body1Len = itemLen; - pQueue->tail = fullLen; - } else if (remain == 4) { - pHead = pQueue->pBuffer + pQueue->tail; - pBody1 = pQueue->pBuffer; - body1Len = itemLen; - pQueue->tail = fullLen - 4; - } else { - pHead = pQueue->pBuffer + pQueue->tail; - pBody1 = pQueue->pBuffer + pQueue->tail + 4; - body1Len = remain - 4; - pBody2 = pQueue->pBuffer; - body2Len = itemLen - body1Len; - pQueue->tail = fullLen - body1Len; - } + memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen); + memcpy(pQueue->pBuffer + pQueue->head + headLen + 8, pBody, rawBodyLen); + pQueue->tail = pQueue->head + headLen + bodyLen + 8; } } - *(int32_t *)(pHead) = fullLen; - memcpy(pBody1, pItem, body1Len); - if (pBody2 && body2Len != 0) { - memcpy(pBody1, pItem + body1Len, body2Len); - } - pQueue->avail -= fullLen; pQueue->items++; - pthread_mutex_unlock(&pQueue->mutex); tsem_post(&pQueue->sem); return 0; } -static int32_t taosProcQueuePop(SProcQueue *pQueue, SBlockItem **ppItem) { +static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHeadLen, void **ppBody, + int32_t *pBodyLen) { tsem_wait(&pQueue->sem); pthread_mutex_lock(&pQueue->mutex); @@ -141,38 +159,66 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, SBlockItem **ppItem) { return -1; } - SBlockItem *pBlock = (SBlockItem *)(pQueue->pBuffer + pQueue->head); + int32_t headLen = 0; + int32_t bodyLen = 0; + if (pQueue->head < pQueue->total) { + headLen = *(int32_t *)(pQueue->pBuffer + pQueue->head); + bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); + } else { + headLen = *(int32_t *)(pQueue->pBuffer); + bodyLen = *(int32_t *)(pQueue->pBuffer + 4); + } - SBlockItem *pItem = taosAllocateQitem(pBlock->contLen); - if (pItem == NULL) { + void *pHead = taosAllocateQitem(headLen); + void *pBody = malloc(bodyLen); + if (pHead == NULL || pBody == NULL) { pthread_mutex_unlock(&pQueue->mutex); tsem_post(&pQueue->sem); + taosFreeQitem(pHead); + free(pBody); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } if (pQueue->head < pQueue->tail) { - memcpy(pItem, pQueue->pBuffer + pQueue->head, pBlock->contLen); - pQueue->head += pBlock->contLen; + memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); + memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen); + pQueue->head = pQueue->head + 8 + headLen + bodyLen; } else { int32_t remain = pQueue->total - pQueue->head; - if (remain >= pBlock->contLen) { - memcpy(pItem, pQueue->pBuffer + pQueue->head, pBlock->contLen); - pQueue->head += pBlock->contLen; + if (remain == 0) { + memcpy(pHead, pQueue->pBuffer + 8, headLen); + memcpy(pBody, pQueue->pBuffer + 8 + headLen, bodyLen); + pQueue->head = 8 + headLen + bodyLen; + } else if (remain == 8) { + memcpy(pHead, pQueue->pBuffer, headLen); + memcpy(pBody, pQueue->pBuffer + headLen, bodyLen); + pQueue->head = headLen + bodyLen; + } else if (remain < 8 + headLen) { + memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8); + memcpy(pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8)); + memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen); + pQueue->head = headLen - (remain - 8) + bodyLen; + } else if (remain < 8 + bodyLen) { + memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); + memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen); + memcpy(pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen)); + pQueue->head = bodyLen - (remain - 8 - headLen); } else { - memcpy(pItem, pQueue->pBuffer + pQueue->head, remain); - memcpy(pItem + remain, pQueue->pBuffer, pBlock->contLen - remain); - pQueue->head = pBlock->contLen - remain; + memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); + memcpy(pBody, pQueue->pBuffer + pQueue->head + headLen + 8, bodyLen); + pQueue->head = pQueue->head + headLen + bodyLen + 8; } } - pQueue->avail += pBlock->contLen; + pQueue->avail = pQueue->avail + headLen + bodyLen + 8; pQueue->items--; - - pItem->contLen = pBlock->contLen - 4; - *ppItem = pItem; pthread_mutex_unlock(&pQueue->mutex); + *ppHead = pHead; + *ppBody = pBody; + *pHeadLen = headLen; + *pBodyLen = bodyLen; return 0; } @@ -190,16 +236,27 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize); pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize); + if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { + taosProcQueueCleanup(pProc->pChildQueue); + taosProcQueueCleanup(pProc->pParentQueue); + free(pProc); + return NULL; + } + + // todo + pProc->isChild = 0; + return pProc; } -static bool taosProcIsChild(SProcObj *pProc) { return pProc->pid == 0; } - static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) { - SBlockItem *pItem = NULL; + void *pHead; + void *pBody; + int32_t headLen; + int32_t bodyLen; while (1) { - int32_t code = taosProcQueuePop(pQueue, &pItem); + int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); if (code < 0) { uDebug("queue:%p, got no message and exiting", pQueue); break; @@ -208,7 +265,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) taosMsleep(1); continue; } else { - (*procFp)(pParent, pItem); + (*procFp)(pParent, pHead, headLen, pBody, bodyLen); } } } @@ -230,8 +287,7 @@ int32_t taosProcStart(SProcObj *pProc) { pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - bool isChild = taosProcIsChild(pProc); - if (isChild || !pProc->testFlag) { + if (pProc->isChild || pProc->testFlag) { if (pthread_create(&pProc->childThread, &thAttr, taosProcThreadChildLoop, pProc) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to create thread since %s", terrstr()); @@ -239,7 +295,7 @@ int32_t taosProcStart(SProcObj *pProc) { } } - if (!isChild || !pProc->testFlag) { + if (!pProc->isChild || pProc->testFlag) { if (pthread_create(&pProc->parentThread, &thAttr, taosProcThreadParentLoop, pProc) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to create thread since %s", terrstr()); @@ -252,12 +308,22 @@ int32_t taosProcStart(SProcObj *pProc) { void taosProcStop(SProcObj *pProc) { pProc->stopFlag = true; - // todo join + // todo + // join } -void taosProcCleanup(SProcObj *pProc) {} +void taosProcCleanup(SProcObj *pProc) { + if (pProc != NULL) { + taosProcQueueCleanup(pProc->pChildQueue); + taosProcQueueCleanup(pProc->pParentQueue); + free(pProc); + } +} + +int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) { + return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen); +} -int32_t taosProcPushChild(SProcObj *pProc, void *pCont, int32_t contLen) { - SProcQueue *pQueue = pProc->pChildQueue; - taosProcQueuePush(pQueue, pCont, contLen); +int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) { + return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen); } -- GitLab