提交 474939ee 编写于 作者: S Shengliang Guan

shm

上级 1436d2f2
...@@ -178,7 +178,6 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -178,7 +178,6 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
.parentFreeHeadFp = (ProcFreeFp)free, .parentFreeHeadFp = (ProcFreeFp)free,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.testFlag = 0,
.pParent = pWrapper, .pParent = pWrapper,
.name = pWrapper->name}; .name = pWrapper->name};
SProcObj *pProc = taosProcInit(&cfg); SProcObj *pProc = taosProcInit(&cfg);
......
...@@ -56,7 +56,6 @@ typedef struct SProcObj { ...@@ -56,7 +56,6 @@ typedef struct SProcObj {
int32_t pid; int32_t pid;
bool isChild; bool isChild;
bool stopFlag; bool stopFlag;
bool testFlag;
} SProcObj; } SProcObj;
static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
...@@ -77,7 +76,7 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { ...@@ -77,7 +76,7 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
goto _OVER; goto _OVER;
} }
shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), 0600); shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), IPC_CREAT | 0600);
if (shmid <= 0) { if (shmid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while shmget since %s", terrstr()); uError("failed to init mutex while shmget since %s", terrstr());
...@@ -101,8 +100,13 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { ...@@ -101,8 +100,13 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
_OVER: _OVER:
if (code != 0) { if (code != 0) {
if (pMutex != NULL) {
taosThreadMutexDestroy(pMutex); taosThreadMutexDestroy(pMutex);
shmdt(pMutex);
}
if (shmid >= 0) {
shmctl(shmid, IPC_RMID, NULL); shmctl(shmid, IPC_RMID, NULL);
}
} else { } else {
*ppMutex = pMutex; *ppMutex = pMutex;
*pShmid = shmid; *pShmid = shmid;
...@@ -112,12 +116,12 @@ _OVER: ...@@ -112,12 +116,12 @@ _OVER:
return code; return code;
} }
static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t *pShmid) { static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t shmid) {
if (pMutex != NULL) { if (pMutex != NULL) {
taosThreadMutexDestroy(pMutex); taosThreadMutexDestroy(pMutex);
} }
if (*pShmid > 0) { if (shmid >= 0) {
shmctl(*pShmid, IPC_RMID, NULL); shmctl(shmid, IPC_RMID, NULL);
} }
} }
...@@ -141,9 +145,10 @@ static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) { ...@@ -141,9 +145,10 @@ static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) {
return shmid; return shmid;
} }
static void taosProcDestroyBuffer(void *pBuffer, int32_t *pShmid) { static void taosProcDestroyBuffer(void *pBuffer, int32_t shmid) {
if (*pShmid > 0) { if (shmid > 0) {
shmctl(*pShmid, IPC_RMID, NULL); shmdt(pBuffer);
shmctl(shmid, IPC_RMID, NULL);
} }
} }
...@@ -155,29 +160,28 @@ static SProcQueue *taosProcQueueInit(int32_t size) { ...@@ -155,29 +160,28 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
SProcQueue *pQueue = NULL; SProcQueue *pQueue = NULL;
int32_t shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize); int32_t shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize);
if (shmId <= 0) { if (shmId < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pQueue->bufferShmid = shmId; pQueue->bufferShmid = shmId;
if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
free(pQueue); taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
return NULL; return NULL;
} }
if (tsem_init(&pQueue->sem, 1, 0) != 0) { if (tsem_init(&pQueue->sem, 1, 0) != 0) {
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid); taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
free(pQueue); taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
tsem_destroy(&pQueue->sem); tsem_destroy(&pQueue->sem);
free(pQueue); taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
return NULL; return NULL;
} }
...@@ -190,12 +194,12 @@ static SProcQueue *taosProcQueueInit(int32_t size) { ...@@ -190,12 +194,12 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
return pQueue; return pQueue;
} }
static void taosProcQueueCleanup(SProcQueue *pQueue) { static void taosProcCleanupQueue(SProcQueue *pQueue) {
if (pQueue != NULL) { if (pQueue != NULL) {
uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue); uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue);
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
tsem_destroy(&pQueue->sem); tsem_destroy(&pQueue->sem);
free(pQueue); taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
} }
} }
...@@ -344,12 +348,10 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -344,12 +348,10 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
} }
pProc->name = pCfg->name; pProc->name = pCfg->name;
pProc->testFlag = pCfg->testFlag;
pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize); pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize);
pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize); pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcQueueCleanup(pProc->pChildQueue); taosProcCleanupQueue(pProc->pChildQueue);
free(pProc); free(pProc);
return NULL; return NULL;
} }
...@@ -369,17 +371,15 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -369,17 +371,15 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp; pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp;
pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp; pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp;
uDebug("proc:%s, initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue); uDebug("proc:%s, is initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue);
if (!pProc->testFlag) {
pProc->pid = fork(); pProc->pid = fork();
if (pProc->pid == 0) { if (pProc->pid == 0) {
pProc->isChild = 1; pProc->isChild = 1;
uInfo("this is child process, pid:%d", pProc->pid); uInfo("this is child process, pid:%d", pProc->pid);
} else { } else {
pProc->isChild = 0; pProc->isChild = 0;
uInfo("this is parent process, pid:%d", pProc->pid); uInfo("this is parent process, child pid:%d", pProc->pid);
}
} }
return pProc; return pProc;
...@@ -398,7 +398,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { ...@@ -398,7 +398,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
if (code < 0) { if (code < 0) {
uDebug("proc:%s, get no message from queue:%p and exiting", pQueue->name, pQueue); uDebug("proc:%s, get no message from queue:%p and exiting", pQueue->name, pQueue);
break; break;
} else if (code < 0) { } else if (code == 0) {
uTrace("proc:%s, get no message from queue:%p since %s", pQueue->name, pQueue, terrstr()); uTrace("proc:%s, get no message from queue:%p since %s", pQueue->name, pQueue, terrstr());
taosMsleep(1); taosMsleep(1);
continue; continue;
...@@ -413,16 +413,14 @@ int32_t taosProcRun(SProcObj *pProc) { ...@@ -413,16 +413,14 @@ int32_t taosProcRun(SProcObj *pProc) {
taosThreadAttrInit(&thAttr); taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pProc->isChild || pProc->testFlag) { if (pProc->isChild) {
if (taosThreadCreate(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) { if (taosThreadCreate(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr()); uError("failed to create thread since %s", terrstr());
return -1; return -1;
} }
uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue); uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue);
} } else {
if (!pProc->isChild || pProc->testFlag) {
if (taosThreadCreate(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) { if (taosThreadCreate(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr()); uError("failed to create thread since %s", terrstr());
...@@ -445,8 +443,8 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -445,8 +443,8 @@ void taosProcCleanup(SProcObj *pProc) {
if (pProc != NULL) { if (pProc != NULL) {
uDebug("proc:%s, clean up", pProc->name); uDebug("proc:%s, clean up", pProc->name);
taosProcStop(pProc); taosProcStop(pProc);
taosProcQueueCleanup(pProc->pChildQueue); taosProcCleanupQueue(pProc->pChildQueue);
taosProcQueueCleanup(pProc->pParentQueue); taosProcCleanupQueue(pProc->pParentQueue);
free(pProc); free(pProc);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册