提交 266bf53a 编写于 作者: S Shengliang Guan

shm

上级 ae4835db
......@@ -25,7 +25,6 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode;
typedef struct SMnode SMnode;
typedef struct SMndMsg SMndMsg;
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
......@@ -33,7 +32,7 @@ typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef struct SMndMsg {
typedef struct {
char user[TSDB_USER_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t acctId;
......
......@@ -22,43 +22,26 @@
extern "C" {
#endif
typedef struct {
int32_t contLen;
char pCont[];
} SBlockItem;
typedef void *(*ProcFp)(void *parent, SBlockItem *pItem);
typedef struct SProcQueue SProcQueue;
typedef struct SProcObj SProcObj;
typedef void *(*ProcFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
typedef struct {
void *pParent;
bool testFlag;
int32_t childQueueSize;
int32_t parentQueueSize;
ProcFp childFp;
ProcFp parentFp;
void *pParent;
bool testFlag;
} SProcCfg;
typedef struct {
int32_t pid;
SProcQueue *pChildQueue;
SProcQueue *pParentQueue;
pthread_t childThread;
pthread_t parentThread;
ProcFp childFp;
ProcFp parentFp;
void *pParent;
bool stopFlag;
bool testFlag;
bool isChild;
} SProcObj;
SProcObj *taosProcInit(const SProcCfg *pCfg);
void taosProcCleanup(SProcObj *pProc);
int32_t taosProcStart(SProcObj *pProc);
void taosProcStop(SProcObj *pProc);
void taosProcCleanup(SProcObj *pProc);
int32_t taosProcPushChild(SProcObj *pProc, void *pCont, int32_t contLen);
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
#ifdef __cplusplus
}
......
......@@ -52,8 +52,8 @@ void mmInitMsgFp(SMnodeMgmt *pMgmt);
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void mmConsumeChildQueue(SDnode *pDnode, SBlockItem *pBlock);
void mmConsumeParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pBlock);
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
#ifdef __cplusplus
}
......
......@@ -164,7 +164,7 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) {
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = -1;
SMndMsg *pMsg = NULL;
SMndMsg *pMsg = NULL;
MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)];
if (msgFp == NULL) {
......@@ -185,7 +185,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pMgmt->singleProc) {
code = (*msgFp)(pDnode, pMsg);
} else {
code = taosProcPushChild(pMgmt->pProcess, pMsg, contLen);
code = taosProcPutToChildQueue(pMgmt->pProcess, pMsg, sizeof(pMsg), pRpc->pCont, pRpc->contLen);
}
_OVER:
......@@ -243,15 +243,15 @@ static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMs
}
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
int32_t contLen = sizeof(SMndMsg) + pRpc->contLen;
int32_t contLen = sizeof(SMndMsg) + pRpc->contLen;
SMndMsg *pMsg = taosAllocateQitem(contLen);
if (pMsg == NULL) {
return -1;
}
pMsg->contLen = pRpc->contLen;
pMsg->pCont = (char *)pMsg + sizeof(SMndMsg);
memcpy(pMsg->pCont, pRpc->pCont, pRpc->contLen);
pMsg->rpcMsg = *pRpc;
pMsg->rpcMsg.pCont = (char *)pMsg + sizeof(SMndMsg);
memcpy(pMsg->rpcMsg.pCont, pRpc->pCont, pRpc->contLen);
rpcFreeCont(pRpc->pCont);
int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg);
......@@ -262,15 +262,14 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs
return code;
}
void mmConsumeChildQueue(SDnode *pDnode, SBlockItem *pBlock) {
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMndMsg *pMsg = (SMndMsg *)pBlock->pCont;
SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = (char *)pMsg + sizeof(SMndMsg);
MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)];
int32_t code = (*msgFp)(pDnode, pMsg);
int32_t code = (*msgFp)(pDnode, pMsg);
if (code == 0) return;
......@@ -287,7 +286,7 @@ void mmConsumeChildQueue(SDnode *pDnode, SBlockItem *pBlock) {
taosFreeQitem(pMsg);
}
void mmConsumeParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pBlock) {}
void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {}
static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
......
......@@ -20,7 +20,7 @@
#include "tqueue.h"
#define SHM_DEFAULT_SIZE (20 * 1024 * 1024)
#define CEIL4(n) (ceil((float)(n) / 4) * 4)
#define CEIL8(n) (ceil((float)(n) / 8) * 8)
typedef struct SProcQueue {
int32_t head;
......@@ -33,11 +33,25 @@ typedef struct SProcQueue {
pthread_mutex_t mutex;
} SProcQueue;
typedef struct SProcObj {
SProcQueue *pChildQueue;
SProcQueue *pParentQueue;
pthread_t childThread;
pthread_t parentThread;
ProcFp childFp;
ProcFp parentFp;
void *pParent;
int32_t pid;
bool isChild;
bool stopFlag;
bool testFlag;
} SProcObj;
static SProcQueue *taosProcQueueInit(int32_t size) {
if (size <= 0) size = SHM_DEFAULT_SIZE;
int32_t bufSize = CEIL4(size);
int32_t headSize = CEIL4(sizeof(SProcQueue));
int32_t bufSize = CEIL8(size);
int32_t headSize = CEIL8(sizeof(SProcQueue));
SProcQueue *pQueue = malloc(bufSize + headSize);
if (pQueue == NULL) {
......@@ -45,35 +59,40 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
return NULL;
}
pQueue->total = bufSize;
pQueue->avail = bufSize;
pQueue->head = 0;
pQueue->tail = 0;
pQueue->items = 0;
pQueue->pBuffer = (char *)pQueue + headSize;
if (pthread_mutex_init(&pQueue->mutex, NULL) != 0) {
free(pQueue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tsem_init(&pQueue->sem, 0, 0);
if (tsem_init(&pQueue->sem, 0, 0) != 0) {
pthread_mutex_destroy(&pQueue->mutex);
free(pQueue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pQueue->head = 0;
pQueue->tail = 0;
pQueue->total = bufSize;
pQueue->avail = bufSize;
pQueue->items = 0;
pQueue->pBuffer = (char *)pQueue + headSize;
return pQueue;
}
static void taosProcQueueCleanup(SProcQueue *pQueue) {
pthread_mutex_destroy(&pQueue->mutex);
tsem_destroy(&pQueue->sem);
free(pQueue);
if (pQueue != NULL) {
pthread_mutex_destroy(&pQueue->mutex);
tsem_destroy(&pQueue->sem);
free(pQueue);
}
}
static int32_t taosProcQueuePush(SProcQueue *pQueue, void *pItem, int32_t itemLen) {
char *pHead = NULL;
char *pBody1 = NULL;
char *pBody2 = NULL;
int32_t body1Len = 0;
int32_t body2Len = 0;
int32_t fullLen = CEIL4(itemLen) + 4;
static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHeadLen, char *pBody, int32_t rawBodyLen) {
const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8;
pthread_mutex_lock(&pQueue->mutex);
if (fullLen > pQueue->avail) {
......@@ -82,55 +101,54 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, void *pItem, int32_t itemLe
return -1;
}
if (pQueue->tail < pQueue->total) {
*(int32_t *)(pQueue->pBuffer + pQueue->head) = headLen;
*(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen;
} else {
*(int32_t *)(pQueue->pBuffer) = headLen;
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
}
if (pQueue->tail < pQueue->head) {
pHead = pQueue->pBuffer + pQueue->tail;
pBody1 = pQueue->pBuffer + pQueue->tail + 4;
body1Len = itemLen;
pQueue->tail += fullLen;
memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, rawBodyLen);
pQueue->tail = pQueue->tail + 8 + headLen + bodyLen;
} else {
int32_t remain = pQueue->total - pQueue->tail;
if (remain >= fullLen) {
pHead = pQueue->pBuffer + pQueue->tail;
pBody1 = pQueue->pBuffer + pQueue->tail + 4;
body1Len = itemLen;
pQueue->tail += fullLen;
if (remain == 0) {
memcpy(pQueue->pBuffer + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + 8 + headLen, pBody, rawBodyLen);
pQueue->tail = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(pQueue->pBuffer, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen);
pQueue->tail = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, remain - 8);
memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
pQueue->tail = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + bodyLen) {
memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + pQueue->head + 8 + headLen, pBody, remain - 8 - headLen);
memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
pQueue->tail = bodyLen - (remain - 8 - headLen);
} else {
if (remain == 0) {
pHead = pQueue->pBuffer;
pBody1 = pQueue->pBuffer + 4;
body1Len = itemLen;
pQueue->tail = fullLen;
} else if (remain == 4) {
pHead = pQueue->pBuffer + pQueue->tail;
pBody1 = pQueue->pBuffer;
body1Len = itemLen;
pQueue->tail = fullLen - 4;
} else {
pHead = pQueue->pBuffer + pQueue->tail;
pBody1 = pQueue->pBuffer + pQueue->tail + 4;
body1Len = remain - 4;
pBody2 = pQueue->pBuffer;
body2Len = itemLen - body1Len;
pQueue->tail = fullLen - body1Len;
}
memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + pQueue->head + headLen + 8, pBody, rawBodyLen);
pQueue->tail = pQueue->head + headLen + bodyLen + 8;
}
}
*(int32_t *)(pHead) = fullLen;
memcpy(pBody1, pItem, body1Len);
if (pBody2 && body2Len != 0) {
memcpy(pBody1, pItem + body1Len, body2Len);
}
pQueue->avail -= fullLen;
pQueue->items++;
pthread_mutex_unlock(&pQueue->mutex);
tsem_post(&pQueue->sem);
return 0;
}
static int32_t taosProcQueuePop(SProcQueue *pQueue, SBlockItem **ppItem) {
static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHeadLen, void **ppBody,
int32_t *pBodyLen) {
tsem_wait(&pQueue->sem);
pthread_mutex_lock(&pQueue->mutex);
......@@ -141,38 +159,66 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, SBlockItem **ppItem) {
return -1;
}
SBlockItem *pBlock = (SBlockItem *)(pQueue->pBuffer + pQueue->head);
int32_t headLen = 0;
int32_t bodyLen = 0;
if (pQueue->head < pQueue->total) {
headLen = *(int32_t *)(pQueue->pBuffer + pQueue->head);
bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
} else {
headLen = *(int32_t *)(pQueue->pBuffer);
bodyLen = *(int32_t *)(pQueue->pBuffer + 4);
}
SBlockItem *pItem = taosAllocateQitem(pBlock->contLen);
if (pItem == NULL) {
void *pHead = taosAllocateQitem(headLen);
void *pBody = malloc(bodyLen);
if (pHead == NULL || pBody == NULL) {
pthread_mutex_unlock(&pQueue->mutex);
tsem_post(&pQueue->sem);
taosFreeQitem(pHead);
free(pBody);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (pQueue->head < pQueue->tail) {
memcpy(pItem, pQueue->pBuffer + pQueue->head, pBlock->contLen);
pQueue->head += pBlock->contLen;
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen);
pQueue->head = pQueue->head + 8 + headLen + bodyLen;
} else {
int32_t remain = pQueue->total - pQueue->head;
if (remain >= pBlock->contLen) {
memcpy(pItem, pQueue->pBuffer + pQueue->head, pBlock->contLen);
pQueue->head += pBlock->contLen;
if (remain == 0) {
memcpy(pHead, pQueue->pBuffer + 8, headLen);
memcpy(pBody, pQueue->pBuffer + 8 + headLen, bodyLen);
pQueue->head = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(pHead, pQueue->pBuffer, headLen);
memcpy(pBody, pQueue->pBuffer + headLen, bodyLen);
pQueue->head = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8);
memcpy(pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8));
memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen);
pQueue->head = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + bodyLen) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen);
memcpy(pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen));
pQueue->head = bodyLen - (remain - 8 - headLen);
} else {
memcpy(pItem, pQueue->pBuffer + pQueue->head, remain);
memcpy(pItem + remain, pQueue->pBuffer, pBlock->contLen - remain);
pQueue->head = pBlock->contLen - remain;
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + headLen + 8, bodyLen);
pQueue->head = pQueue->head + headLen + bodyLen + 8;
}
}
pQueue->avail += pBlock->contLen;
pQueue->avail = pQueue->avail + headLen + bodyLen + 8;
pQueue->items--;
pItem->contLen = pBlock->contLen - 4;
*ppItem = pItem;
pthread_mutex_unlock(&pQueue->mutex);
*ppHead = pHead;
*ppBody = pBody;
*pHeadLen = headLen;
*pBodyLen = bodyLen;
return 0;
}
......@@ -190,16 +236,27 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize);
pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcQueueCleanup(pProc->pChildQueue);
taosProcQueueCleanup(pProc->pParentQueue);
free(pProc);
return NULL;
}
// todo
pProc->isChild = 0;
return pProc;
}
static bool taosProcIsChild(SProcObj *pProc) { return pProc->pid == 0; }
static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) {
SBlockItem *pItem = NULL;
void *pHead;
void *pBody;
int32_t headLen;
int32_t bodyLen;
while (1) {
int32_t code = taosProcQueuePop(pQueue, &pItem);
int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen);
if (code < 0) {
uDebug("queue:%p, got no message and exiting", pQueue);
break;
......@@ -208,7 +265,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent)
taosMsleep(1);
continue;
} else {
(*procFp)(pParent, pItem);
(*procFp)(pParent, pHead, headLen, pBody, bodyLen);
}
}
}
......@@ -230,8 +287,7 @@ int32_t taosProcStart(SProcObj *pProc) {
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
bool isChild = taosProcIsChild(pProc);
if (isChild || !pProc->testFlag) {
if (pProc->isChild || pProc->testFlag) {
if (pthread_create(&pProc->childThread, &thAttr, taosProcThreadChildLoop, pProc) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr());
......@@ -239,7 +295,7 @@ int32_t taosProcStart(SProcObj *pProc) {
}
}
if (!isChild || !pProc->testFlag) {
if (!pProc->isChild || pProc->testFlag) {
if (pthread_create(&pProc->parentThread, &thAttr, taosProcThreadParentLoop, pProc) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr());
......@@ -252,12 +308,22 @@ int32_t taosProcStart(SProcObj *pProc) {
void taosProcStop(SProcObj *pProc) {
pProc->stopFlag = true;
// todo join
// todo
// join
}
void taosProcCleanup(SProcObj *pProc) {}
void taosProcCleanup(SProcObj *pProc) {
if (pProc != NULL) {
taosProcQueueCleanup(pProc->pChildQueue);
taosProcQueueCleanup(pProc->pParentQueue);
free(pProc);
}
}
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) {
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen);
}
int32_t taosProcPushChild(SProcObj *pProc, void *pCont, int32_t contLen) {
SProcQueue *pQueue = pProc->pChildQueue;
taosProcQueuePush(pQueue, pCont, contLen);
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) {
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册