diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 6ad34eb0c061fc4564d1cdf4eb97f6bd38d881cb..1d40093c45ca70e495735129e270b058fca1c317 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -75,6 +75,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x010B) #define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x010C) #define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x010D) +#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x010E) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111) #define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112) diff --git a/include/util/tprocess.h b/include/util/tprocess.h new file mode 100644 index 0000000000000000000000000000000000000000..067404a0252e602253f55923d97f9f66e80e917f --- /dev/null +++ b/include/util/tprocess.h @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_PROCESS_H_ +#define _TD_UTIL_PROCESS_H_ + +#include "os.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + int32_t contLen; + char pCont[]; +} SBlockItem; + +typedef void *(*ProcFp)(void *parent, SBlockItem *pItem); + +typedef struct SProcQueue SProcQueue; + +typedef struct { + int32_t childQueueSize; + int32_t parentQueueSize; + ProcFp childFp; + ProcFp parentFp; +} SProcCfg; + +typedef struct { + int32_t pid; + SProcCfg cfg; + SProcQueue *pChildQueue; + SProcQueue *pParentQueue; + pthread_t childThread; + pthread_t parentThread; + void *pParent; + bool stopFlag; + bool testFlag; +} SProcObj; + +SProcObj *taosProcInit(const SProcCfg *pCfg); +int32_t taosProcStart(SProcObj *pProc); +void taosProcStop(SProcObj *pProc); +void taosProcCleanup(SProcObj *pProc); +int32_t taosProcPushChild(SProcObj *pProc, void *pCont, int32_t contLen); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_PROCESS_H_*/ diff --git a/include/util/tthread.h b/include/util/tthread.h index 49412069445d0f1cb89631683daa6411ca88879d..b9eaae01b7b8c1ef21eb6e9c802acb0c646c431a 100644 --- a/include/util/tthread.h +++ b/include/util/tthread.h @@ -26,6 +26,8 @@ pthread_t* taosCreateThread(void* (*__start_routine)(void*), void* param); bool taosDestoryThread(pthread_t* pthread); bool taosThreadRunning(pthread_t* pthread); +typedef void *(*ThreadFp)(void *param); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 13ef101908906d309a88d2349382fdedb765cf30..7f8e11b7aabc924ff50ec73eed2f880f3a1e41a2 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -74,6 +74,10 @@ typedef struct { int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; + + // + bool multiProcess; + SProcObj *pProcess; } SMnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 4ca6b97ad44d3e1fb21fe76cfdfa198def67552c..10e79f6710351aa6ca6e09d5fdac7f9f9779213f 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -47,6 +47,8 @@ extern "C" { #include "vnode.h" #include "tfs.h" +#include "tprocess.h" + #define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} #define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} #define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 0ea47c89d8cfa6af4307001a753fa0bfb00b09a2..531f0b84cda52e3fb9b50755d93bf22512b97e3e 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -364,6 +364,41 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) { return 0; } +static void dndMnodeProcessChildQueue(SDnode *pDnode, SBlockItem *pBlock) { + SRpcMsg *pMsg = (SRpcMsg*)pBlock->pCont; + dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); + free(pBlock); +} + +static void dndMnodeProcessParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pItem) { + +} + +static int32_t dndMnodeOpen(SDnode *pDnode, SMnodeOpt *pOption) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + pMgmt->multiProcess = true; + + int32_t code = dndOpenMnode(pDnode, pOption); + + if (code == 0 && pMgmt->multiProcess) { + SProcCfg cfg = {0}; + cfg.childFp = (ProcFp)dndMnodeProcessChildQueue; + cfg.parentFp = (ProcFp)dndMnodeProcessParentQueue; + cfg.childQueueSize = 1024 * 1024; + cfg.parentQueueSize = 1024 * 1024; + + pMgmt->pProcess = taosProcInit(&cfg); + if (pMgmt->pProcess == NULL) { + return -1; + } + pMgmt->pProcess->pParent = pDnode; + pMgmt->pProcess->testFlag = true; + return taosProcStart(pMgmt->pProcess); + } + + return code; +} + static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -557,16 +592,23 @@ static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc } } +static void dndMnodeWriteToChildQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + taosProcPushChild(pMgmt->pProcess, pMsg, sizeof(SRpcMsg)); +} + void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); + dndMnodeWriteToChildQueue(&pDnode->mmgmt, pMsg); + // dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); } void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg); + dndMnodeWriteToChildQueue(&pDnode->mmgmt, pMsg); + // dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg); } void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg); + dndMnodeWriteToChildQueue(&pDnode->mmgmt, pMsg); + // dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg); } int32_t dndInitMnode(SDnode *pDnode) { @@ -594,12 +636,12 @@ int32_t dndInitMnode(SDnode *pDnode) { dInfo("start to deploy mnode"); SMnodeOpt option = {0}; dndBuildMnodeDeployOption(pDnode, &option); - return dndOpenMnode(pDnode, &option); + return dndMnodeOpen(pDnode, &option); } else { dInfo("start to open mnode"); SMnodeOpt option = {0}; dndBuildMnodeOpenOption(pDnode, &option); - return dndOpenMnode(pDnode, &option); + return dndMnodeOpen(pDnode, &option); } } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c1cb4f8a410665fae27b4a24848c4de8ff3a7f30..7b348787ec241eb4f75eabf91b15834f7fbe4b88 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -84,6 +84,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PARA, "Invalid parameters") TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization") TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG, "Invalid config option") +TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_SHM_MEM, "Out of Share memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c new file mode 100644 index 0000000000000000000000000000000000000000..e0ebed744c5adf6e66601000229e742b61d0a5fc --- /dev/null +++ b/source/util/src/tprocess.c @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "tprocess.h" +#include "taoserror.h" +#include "tlog.h" + +#define SHM_DEFAULT_SIZE (20 * 1024 * 1024) +#define CEIL4(n) (ceil((float)(n) / 4) * 4) + +typedef struct SProcQueue { + int32_t head; + int32_t tail; + int32_t total; + int32_t avail; + int32_t items; + char *pBuffer; + tsem_t sem; + pthread_mutex_t mutex; +} SProcQueue; + +static SProcQueue *taosProcQueueInit(int32_t size) { + int32_t bufSize = CEIL4(size); + int32_t headSize = CEIL4(sizeof(SProcQueue)); + + SProcQueue *pQueue = malloc(bufSize + headSize); + if (pQueue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pQueue->total = bufSize; + pQueue->avail = bufSize; + pQueue->head = 0; + pQueue->tail = 0; + pQueue->items = 0; + pQueue->pBuffer = (char *)pQueue + headSize; + + if (pthread_mutex_init(&pQueue->mutex, NULL) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tsem_init(&pQueue->sem, 0, 0); + return pQueue; +} + +static void taosProcQueueCleanup(SProcQueue *pQueue) { + pthread_mutex_destroy(&pQueue->mutex); + tsem_destroy(&pQueue->sem); + free(pQueue); +} + +static int32_t taosProcQueuePush(SProcQueue *pQueue, void *pItem, int32_t itemLen) { + char *pHead = NULL; + char *pBody1 = NULL; + char *pBody2 = NULL; + int32_t body1Len = 0; + int32_t body2Len = 0; + int32_t fullLen = CEIL4(itemLen) + 4; + + pthread_mutex_lock(&pQueue->mutex); + if (fullLen > pQueue->avail) { + pthread_mutex_unlock(&pQueue->mutex); + terrno = TSDB_CODE_OUT_OF_SHM_MEM; + return -1; + } + + if (pQueue->tail < pQueue->head) { + pHead = pQueue->pBuffer + pQueue->tail; + pBody1 = pQueue->pBuffer + pQueue->tail + 4; + body1Len = itemLen; + pQueue->tail += fullLen; + } else { + int32_t remain = pQueue->total - pQueue->tail; + if (remain >= fullLen) { + pHead = pQueue->pBuffer + pQueue->tail; + pBody1 = pQueue->pBuffer + pQueue->tail + 4; + body1Len = itemLen; + pQueue->tail += fullLen; + } else { + if (remain == 0) { + pHead = pQueue->pBuffer; + pBody1 = pQueue->pBuffer + 4; + body1Len = itemLen; + pQueue->tail = fullLen; + } else if (remain == 4) { + pHead = pQueue->pBuffer + pQueue->tail; + pBody1 = pQueue->pBuffer; + body1Len = itemLen; + pQueue->tail = fullLen - 4; + } else { + pHead = pQueue->pBuffer + pQueue->tail; + pBody1 = pQueue->pBuffer + pQueue->tail + 4; + body1Len = remain - 4; + pBody2 = pQueue->pBuffer; + body2Len = itemLen - body1Len; + pQueue->tail = fullLen - body1Len; + } + } + } + + *(int32_t *)(pHead) = fullLen; + memcpy(pBody1, pItem, body1Len); + if (pBody2 && body2Len != 0) { + memcpy(pBody1, pItem + body1Len, body2Len); + } + + pQueue->avail -= fullLen; + pQueue->items++; + + pthread_mutex_unlock(&pQueue->mutex); + tsem_post(&pQueue->sem); + return 0; +} + +static int32_t taosProcQueuePop(SProcQueue *pQueue, SBlockItem **ppItem) { + tsem_wait(&pQueue->sem); + + pthread_mutex_lock(&pQueue->mutex); + if (pQueue->total - pQueue->avail <= 0) { + pthread_mutex_unlock(&pQueue->mutex); + tsem_post(&pQueue->sem); + terrno = TSDB_CODE_OUT_OF_SHM_MEM; + return -1; + } + + SBlockItem *pBlock = (SBlockItem *)(pQueue->pBuffer + pQueue->head); + + SBlockItem *pItem = malloc(pBlock->contLen); + if (pItem == NULL) { + pthread_mutex_unlock(&pQueue->mutex); + tsem_post(&pQueue->sem); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (pQueue->head < pQueue->tail) { + memcpy(pItem, pQueue->pBuffer + pQueue->head, pBlock->contLen); + pQueue->head += pBlock->contLen; + } else { + int32_t remain = pQueue->total - pQueue->head; + if (remain >= pBlock->contLen) { + memcpy(pItem, pQueue->pBuffer + pQueue->head, pBlock->contLen); + pQueue->head += pBlock->contLen; + } else { + memcpy(pItem, pQueue->pBuffer + pQueue->head, remain); + memcpy(pItem + remain, pQueue->pBuffer, pBlock->contLen - remain); + pQueue->head = pBlock->contLen - remain; + } + } + + pQueue->avail += pBlock->contLen; + pQueue->items--; + + pItem->contLen = pBlock->contLen - 4; + *ppItem = pItem; + pthread_mutex_unlock(&pQueue->mutex); + + return 0; +} + +SProcObj *taosProcInit(const SProcCfg *pCfg) { + SProcObj *pProc = calloc(1, sizeof(SProcObj)); + if (pProc == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pProc->cfg = *pCfg; + + if (pProc->cfg.childQueueSize <= 0) { + pProc->cfg.childQueueSize = SHM_DEFAULT_SIZE; + } + + if (pProc->cfg.parentQueueSize <= 0) { + pProc->cfg.parentQueueSize = SHM_DEFAULT_SIZE; + } + + pProc->pChildQueue = taosProcQueueInit(pProc->cfg.childQueueSize); + pProc->pParentQueue = taosProcQueueInit(pProc->cfg.parentQueueSize); + + return pProc; +} + +static bool taosProcIsChild(SProcObj *pProc) { return pProc->pid == 0; } + +static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) { + SBlockItem *pItem = NULL; + + while (1) { + int32_t code = taosProcQueuePop(pQueue, &pItem); + if (code < 0) { + uDebug("queue:%p, got no message and exiting", pQueue); + break; + } else if (code < 0) { + uTrace("queue:%p, got no message since %s", pQueue, terrstr()); + taosMsleep(1); + continue; + } else { + (*procFp)(pParent, pItem); + } + } +} + +static void *taosProcThreadChildLoop(void *param) { + SProcObj *pProc = param; + taosProcThreadLoop(pProc->pChildQueue, pProc->cfg.childFp, pProc->pParent); + return NULL; +} + +static void *taosProcThreadParentLoop(void *param) { + SProcObj *pProc = param; + taosProcThreadLoop(pProc->pParentQueue, pProc->cfg.parentFp, pProc->pParent); + return NULL; +} + +int32_t taosProcStart(SProcObj *pProc) { + pthread_attr_t thAttr = {0}; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + bool isChild = taosProcIsChild(pProc); + if (isChild || !pProc->testFlag) { + if (pthread_create(&pProc->childThread, &thAttr, taosProcThreadChildLoop, pProc) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to create thread since %s", terrstr()); + return -1; + } + } + + if (!isChild || !pProc->testFlag) { + if (pthread_create(&pProc->parentThread, &thAttr, taosProcThreadParentLoop, pProc) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to create thread since %s", terrstr()); + return -1; + } + } + + return 0; +} + +void taosProcStop(SProcObj *pProc) { + pProc->stopFlag = true; + // todo join +} + +void taosProcCleanup(SProcObj *pProc) {} + +int32_t taosProcPushChild(SProcObj *pProc, void *pCont, int32_t contLen) { + SProcQueue *pQueue = pProc->pChildQueue; + taosProcQueuePush(pQueue, pCont, contLen); +} diff --git a/source/util/test/queueTest.cpp b/source/util/test/queueTest.cpp index 310ae4350e2f354e01fc0cced7e7506fe6df9b26..7a448bd704b1a54bef2c34571ba796d75b0b3ba9 100644 --- a/source/util/test/queueTest.cpp +++ b/source/util/test/queueTest.cpp @@ -14,6 +14,9 @@ #include "os.h" #include "tqueue.h" +#include +#include + class UtilTestQueue : public ::testing::Test { public: void SetUp() override {} @@ -24,6 +27,104 @@ class UtilTestQueue : public ::testing::Test { static void TearDownTestSuite() {} }; -TEST_F(UtilTestQueue, 01_ReadQitemFromQsetByThread) { - EXPECT_EQ(0, 0); +TEST_F(UtilTestQueue, 01_fork) { + pid_t pid; + int shmid; + int* shmptr; + int* tmp; + + int err; + pthread_mutexattr_t mattr; + if ((err = pthread_mutexattr_init(&mattr)) < 0) { + printf("mutex addr init error:%s\n", strerror(err)); + exit(1); + } + + if ((err = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) < 0) { + printf("mutex addr get shared error:%s\n", strerror(err)); + exit(1); + } + + pthread_mutex_t* m; + int mid = shmget(IPC_PRIVATE, sizeof(pthread_mutex_t), 0600); + m = (pthread_mutex_t*)shmat(mid, NULL, 0); + + if ((err = pthread_mutex_init(m, &mattr)) < 0) { + printf("mutex mutex init error:%s\n", strerror(err)); + exit(1); + } + + if ((shmid = shmget(IPC_PRIVATE, 1000, IPC_CREAT | 0600)) < 0) { + perror("shmget error"); + exit(1); + } + + if ((shmptr = (int*)shmat(shmid, 0, 0)) == (void*)-1) { + perror("shmat error"); + exit(1); + } + + tmp = shmptr; + + int shmid2; + int** shmptr2; + if ((shmid2 = shmget(IPC_PRIVATE, 20, IPC_CREAT | 0600)) < 0) { + perror("shmget2 error"); + exit(1); + } + + if ((shmptr2 = (int**)shmat(shmid2, 0, 0)) == (void*)-1) { + perror("shmat2 error"); + exit(1); + } + + *shmptr2 = shmptr; + + if ((pid = fork()) < 0) { + perror("fork error"); + exit(1); + } else if (pid == 0) { + if ((err = pthread_mutex_lock(m)) < 0) { + printf("lock error:%s\n", strerror(err)); + exit(1); + } + for (int i = 0; i < 30; ++i) { + **shmptr2 = i; + (*shmptr2)++; + } + + if ((err = pthread_mutex_unlock(m)) < 0) { + printf("unlock error:%s\n", strerror(err)); + exit(1); + } + exit(0); + + } else { + if ((err = pthread_mutex_lock(m)) < 0) { + printf("lock error:%s\n", strerror(err)); + exit(1); + } + for (int i = 10; i < 42; ++i) { + **shmptr2 = i; + (*shmptr2)++; + } + if ((err = pthread_mutex_unlock(m)) < 0) { + printf("unlock error:%s\n", strerror(err)); + exit(1); + } + } + + wait(NULL); + + for (int i = 0; i < 70; ++i) { + printf("%d ", tmp[i]); + } + + printf("\n"); + + pthread_mutexattr_destroy(&mattr); + //销毁mutex + pthread_mutex_destroy(m); + + exit(0); } \ No newline at end of file