提交 3343bef1 编写于 作者: S Shengliang Guan

shm

上级 ad1609fe
...@@ -43,6 +43,7 @@ typedef struct { ...@@ -43,6 +43,7 @@ typedef struct {
ProcFreeFp parentFreeBodyFp; ProcFreeFp parentFreeBodyFp;
bool testFlag; bool testFlag;
void *pParent; void *pParent;
const char *name;
} SProcCfg; } SProcCfg;
SProcObj *taosProcInit(const SProcCfg *pCfg); SProcObj *taosProcInit(const SProcCfg *pCfg);
......
...@@ -309,7 +309,6 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { ...@@ -309,7 +309,6 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "os release", info.release, 1) != 0) return -1; if (cfgAddString(pCfg, "os release", info.release, 1) != 0) return -1;
if (cfgAddString(pCfg, "os version", info.version, 1) != 0) return -1; if (cfgAddString(pCfg, "os version", info.version, 1) != 0) return -1;
if (cfgAddString(pCfg, "os machine", info.machine, 1) != 0) return -1; if (cfgAddString(pCfg, "os machine", info.machine, 1) != 0) return -1;
if (cfgAddString(pCfg, "os sysname", info.sysname, 1) != 0) return -1;
if (cfgAddString(pCfg, "version", version, 1) != 0) return -1; if (cfgAddString(pCfg, "version", version, 1) != 0) return -1;
if (cfgAddString(pCfg, "compatible_version", compatible_version, 1) != 0) return -1; if (cfgAddString(pCfg, "compatible_version", compatible_version, 1) != 0) return -1;
......
...@@ -134,8 +134,9 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -134,8 +134,9 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
.parentFreeHeadFp = (ProcFreeFp)free, .parentFreeHeadFp = (ProcFreeFp)free,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.testFlag = true, .testFlag = 0,
.pParent = pDnode}; .pParent = pDnode,
.name = "mnode"};
pMgmt->pProcess = taosProcInit(&cfg); pMgmt->pProcess = taosProcInit(&cfg);
if (pMgmt->pProcess == NULL) { if (pMgmt->pProcess == NULL) {
......
...@@ -159,7 +159,6 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { ...@@ -159,7 +159,6 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) {
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
pMsg->createdTime = taosGetTimestampSec(); pMsg->createdTime = taosGetTimestampSec();
dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user);
return 0; return 0;
} }
...@@ -183,6 +182,8 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -183,6 +182,8 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER; goto _OVER;
} }
dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user);
if (pMgmt->singleProc) { if (pMgmt->singleProc) {
code = (*msgFp)(pDnode, pMsg); code = (*msgFp)(pDnode, pMsg);
} else { } else {
...@@ -193,6 +194,7 @@ _OVER: ...@@ -193,6 +194,7 @@ _OVER:
if (code == 0) { if (code == 0) {
if (!pMgmt->singleProc) { if (!pMgmt->singleProc) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }
...@@ -202,6 +204,7 @@ _OVER: ...@@ -202,6 +204,7 @@ _OVER:
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
mmSendRpcRsp(pDnode, &rsp); mmSendRpcRsp(pDnode, &rsp);
} }
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }
...@@ -231,6 +234,7 @@ static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMs ...@@ -231,6 +234,7 @@ static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMs
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) return -1; if (pMnode == NULL) return -1;
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0); int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0);
mmRelease(pDnode, pMnode); mmRelease(pDnode, pMnode);
return code; return code;
...@@ -242,11 +246,13 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs ...@@ -242,11 +246,13 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs
return -1; return -1;
} }
dTrace("msg:%p, is created", pMsg);
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
pMsg->createdTime = taosGetTimestampSec(); pMsg->createdTime = taosGetTimestampSec();
int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg); int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg);
if (code != 0) { if (code != 0) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }
...@@ -271,6 +277,7 @@ void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) { ...@@ -271,6 +277,7 @@ void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) {
} }
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from child queue", pMsg);
SMndMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
...@@ -285,18 +292,22 @@ void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pC ...@@ -285,18 +292,22 @@ void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pC
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
mmPutRpcRspToWorker(pDnode, &rsp); mmPutRpcRspToWorker(pDnode, &rsp);
} }
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pCont); rpcFreeCont(pCont);
} }
} }
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from parent queue", pMsg);
pMsg->pCont = pCont; pMsg->pCont = pCont;
mmSendRpcRsp(pDnode, pMsg); mmSendRpcRsp(pDnode, pMsg);
free(pMsg); free(pMsg);
} }
static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
dTrace("msg:%p, get from msg queue", pMsg);
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pDnode);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
bool isReq = (pRpc->msgType & 1U); bool isReq = (pRpc->msgType & 1U);
...@@ -321,6 +332,7 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { ...@@ -321,6 +332,7 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
} }
} }
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
...@@ -19,25 +19,32 @@ ...@@ -19,25 +19,32 @@
#include "tlog.h" #include "tlog.h"
#include "tqueue.h" #include "tqueue.h"
// todo
#include <sys/shm.h>
#include <sys/wait.h>
#define SHM_DEFAULT_SIZE (20 * 1024 * 1024) #define SHM_DEFAULT_SIZE (20 * 1024 * 1024)
#define CEIL8(n) (ceil((float)(n) / 8) * 8) #define CEIL8(n) (ceil((float)(n) / 8) * 8)
typedef void *(*ProcThreadFp)(void *param); typedef void *(*ProcThreadFp)(void *param);
typedef struct SProcQueue { typedef struct SProcQueue {
int32_t head; int32_t head;
int32_t tail; int32_t tail;
int32_t total; int32_t total;
int32_t avail; int32_t avail;
int32_t items; int32_t items;
char *pBuffer; char *pBuffer;
ProcMallocFp mallocHeadFp; ProcMallocFp mallocHeadFp;
ProcFreeFp freeHeadFp; ProcFreeFp freeHeadFp;
ProcMallocFp mallocBodyFp; ProcMallocFp mallocBodyFp;
ProcFreeFp freeBodyFp; ProcFreeFp freeBodyFp;
ProcConsumeFp consumeFp; ProcConsumeFp consumeFp;
void *pParent; void *pParent;
tsem_t sem; tsem_t sem;
pthread_mutex_t mutex; pthread_mutex_t *mutex;
int32_t mutexShmid;
int32_t bufferShmid;
const char *name;
} SProcQueue; } SProcQueue;
typedef struct SProcObj { typedef struct SProcObj {
...@@ -45,37 +52,135 @@ typedef struct SProcObj { ...@@ -45,37 +52,135 @@ typedef struct SProcObj {
SProcQueue *pChildQueue; SProcQueue *pChildQueue;
pthread_t parentThread; pthread_t parentThread;
SProcQueue *pParentQueue; SProcQueue *pParentQueue;
const char *name;
int32_t pid; int32_t pid;
bool isChild; bool isChild;
bool stopFlag; bool stopFlag;
bool testFlag; bool testFlag;
} SProcObj; } SProcObj;
static int32_t taosProcInitMutex(pthread_mutex_t **ppMutex, int32_t *pShmid) {
pthread_mutex_t *pMutex = NULL;
pthread_mutexattr_t mattr = {0};
int32_t shmid = -1;
int32_t code = -1;
if (pthread_mutexattr_init(&mattr) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while init attr since %s", terrstr());
goto _OVER;
}
if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while set shared since %s", terrstr());
goto _OVER;
}
shmid = shmget(IPC_PRIVATE, sizeof(pthread_mutex_t), 0600);
if (shmid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while shmget since %s", terrstr());
goto _OVER;
}
pMutex = (pthread_mutex_t *)shmat(shmid, NULL, 0);
if (pMutex == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while shmat since %s", terrstr());
goto _OVER;
}
if (pthread_mutex_init(pMutex, &mattr) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex since %s", terrstr());
goto _OVER;
}
code = 0;
_OVER:
if (code != 0) {
pthread_mutex_destroy(pMutex);
shmctl(shmid, IPC_RMID, NULL);
} else {
*ppMutex = pMutex;
*pShmid = shmid;
}
pthread_mutexattr_destroy(&mattr);
return code;
}
static void taosProcDestroyMutex(pthread_mutex_t *pMutex, int32_t *pShmid) {
if (pMutex != NULL) {
pthread_mutex_destroy(pMutex);
}
if (*pShmid > 0) {
shmctl(*pShmid, IPC_RMID, NULL);
}
}
static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) {
int32_t shmid = shmget(IPC_PRIVATE, size, IPC_CREAT | 0600);
if (shmid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init buffer while shmget since %s", terrstr());
return -1;
}
void *shmptr = (pthread_mutex_t *)shmat(shmid, NULL, 0);
if (shmptr == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init buffer while shmat since %s", terrstr());
shmctl(shmid, IPC_RMID, NULL);
return -1;
}
*ppBuffer = shmptr;
return shmid;
}
static void taosProcDestroyBuffer(void *pBuffer, int32_t *pShmid) {
if (*pShmid > 0) {
shmctl(*pShmid, IPC_RMID, NULL);
}
}
static SProcQueue *taosProcQueueInit(int32_t size) { static SProcQueue *taosProcQueueInit(int32_t size) {
if (size <= 0) size = SHM_DEFAULT_SIZE; if (size <= 0) size = SHM_DEFAULT_SIZE;
int32_t bufSize = CEIL8(size); int32_t bufSize = CEIL8(size);
int32_t headSize = CEIL8(sizeof(SProcQueue)); int32_t headSize = CEIL8(sizeof(SProcQueue));
SProcQueue *pQueue = malloc(bufSize + headSize); SProcQueue *pQueue = NULL;
if (pQueue == NULL) { int32_t shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize);
if (shmId <= 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
if (pthread_mutex_init(&pQueue->mutex, NULL) != 0) { pQueue->bufferShmid = shmId;
if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
free(pQueue); free(pQueue);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
if (tsem_init(&pQueue->sem, 0, 0) != 0) { if (tsem_init(&pQueue->sem, 1, 0) != 0) {
pthread_mutex_destroy(&pQueue->mutex); taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
free(pQueue); free(pQueue);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
tsem_destroy(&pQueue->sem);
free(pQueue);
return NULL;
}
pQueue->head = 0; pQueue->head = 0;
pQueue->tail = 0; pQueue->tail = 0;
pQueue->total = bufSize; pQueue->total = bufSize;
...@@ -87,7 +192,8 @@ static SProcQueue *taosProcQueueInit(int32_t size) { ...@@ -87,7 +192,8 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
static void taosProcQueueCleanup(SProcQueue *pQueue) { static void taosProcQueueCleanup(SProcQueue *pQueue) {
if (pQueue != NULL) { if (pQueue != NULL) {
pthread_mutex_destroy(&pQueue->mutex); uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue);
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
tsem_destroy(&pQueue->sem); tsem_destroy(&pQueue->sem);
free(pQueue); free(pQueue);
} }
...@@ -98,9 +204,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea ...@@ -98,9 +204,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8; const int32_t fullLen = headLen + bodyLen + 8;
pthread_mutex_lock(&pQueue->mutex); pthread_mutex_lock(pQueue->mutex);
if (fullLen > pQueue->avail) { if (fullLen > pQueue->avail) {
pthread_mutex_unlock(&pQueue->mutex); pthread_mutex_unlock(pQueue->mutex);
terrno = TSDB_CODE_OUT_OF_SHM_MEM; terrno = TSDB_CODE_OUT_OF_SHM_MEM;
return -1; return -1;
} }
...@@ -146,11 +252,10 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea ...@@ -146,11 +252,10 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
pQueue->avail -= fullLen; pQueue->avail -= fullLen;
pQueue->items++; pQueue->items++;
pthread_mutex_unlock(&pQueue->mutex); pthread_mutex_unlock(pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
// (*pQueue->freeHeadFp)(pHead); uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, rawHeadLen, pBody, rawBodyLen, pQueue);
// (*pQueue->freeBodyFp)(pBody);
return 0; return 0;
} }
...@@ -158,9 +263,9 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea ...@@ -158,9 +263,9 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
int32_t *pBodyLen) { int32_t *pBodyLen) {
tsem_wait(&pQueue->sem); tsem_wait(&pQueue->sem);
pthread_mutex_lock(&pQueue->mutex); pthread_mutex_lock(pQueue->mutex);
if (pQueue->total - pQueue->avail <= 0) { if (pQueue->total - pQueue->avail <= 0) {
pthread_mutex_unlock(&pQueue->mutex); pthread_mutex_unlock(pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
terrno = TSDB_CODE_OUT_OF_SHM_MEM; terrno = TSDB_CODE_OUT_OF_SHM_MEM;
return -1; return -1;
...@@ -179,7 +284,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea ...@@ -179,7 +284,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
void *pHead = (*pQueue->mallocHeadFp)(headLen); void *pHead = (*pQueue->mallocHeadFp)(headLen);
void *pBody = (*pQueue->mallocBodyFp)(bodyLen); void *pBody = (*pQueue->mallocBodyFp)(bodyLen);
if (pHead == NULL || pBody == NULL) { if (pHead == NULL || pBody == NULL) {
pthread_mutex_unlock(&pQueue->mutex); pthread_mutex_unlock(pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
(*pQueue->freeHeadFp)(pHead); (*pQueue->freeHeadFp)(pHead);
(*pQueue->freeBodyFp)(pBody); (*pQueue->freeBodyFp)(pBody);
...@@ -220,12 +325,14 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea ...@@ -220,12 +325,14 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
pQueue->avail = pQueue->avail + headLen + bodyLen + 8; pQueue->avail = pQueue->avail + headLen + bodyLen + 8;
pQueue->items--; pQueue->items--;
pthread_mutex_unlock(&pQueue->mutex); pthread_mutex_unlock(pQueue->mutex);
*ppHead = pHead; *ppHead = pHead;
*ppBody = pBody; *ppBody = pBody;
*pHeadLen = headLen; *pHeadLen = headLen;
*pBodyLen = bodyLen; *pBodyLen = bodyLen;
uTrace("proc:%s, get msg:%p:%d cont:%p:%d from queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue);
return 0; return 0;
} }
...@@ -236,22 +343,25 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -236,22 +343,25 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
return NULL; return NULL;
} }
pProc->name = pCfg->name;
pProc->testFlag = pCfg->testFlag;
pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize); pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize);
pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize); pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcQueueCleanup(pProc->pChildQueue); taosProcQueueCleanup(pProc->pChildQueue);
taosProcQueueCleanup(pProc->pParentQueue);
free(pProc); free(pProc);
return NULL; return NULL;
} }
pProc->testFlag = pCfg->testFlag; pProc->pChildQueue->name = pCfg->name;
pProc->pChildQueue->pParent = pCfg->pParent; pProc->pChildQueue->pParent = pCfg->pParent;
pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp; pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp;
pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp; pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp;
pProc->pChildQueue->mallocBodyFp = pCfg->childMallocBodyFp; pProc->pChildQueue->mallocBodyFp = pCfg->childMallocBodyFp;
pProc->pChildQueue->freeBodyFp = pCfg->childFreeBodyFp; pProc->pChildQueue->freeBodyFp = pCfg->childFreeBodyFp;
pProc->pChildQueue->consumeFp = pCfg->childConsumeFp; pProc->pChildQueue->consumeFp = pCfg->childConsumeFp;
pProc->pParentQueue->name = pCfg->name;
pProc->pParentQueue->pParent = pCfg->pParent; pProc->pParentQueue->pParent = pCfg->pParent;
pProc->pParentQueue->mallocHeadFp = pCfg->parentdMallocHeadFp; pProc->pParentQueue->mallocHeadFp = pCfg->parentdMallocHeadFp;
pProc->pParentQueue->freeHeadFp = pCfg->parentFreeHeadFp; pProc->pParentQueue->freeHeadFp = pCfg->parentFreeHeadFp;
...@@ -259,8 +369,20 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -259,8 +369,20 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp; pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp;
pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp; pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp;
// todo uDebug("proc:%s, initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue);
pProc->isChild = 0;
if (!pProc->testFlag) {
pProc->pid = fork();
if (pProc->pid == 0) {
tsLogInited = 0;
taosInitLog("mnodelog", 1);
pProc->isChild = 1;
uInfo("this is child process, pid:%d", pProc->pid);
} else {
pProc->isChild = 0;
uInfo("this is parent process, pid:%d", pProc->pid);
}
}
return pProc; return pProc;
} }
...@@ -271,13 +393,15 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { ...@@ -271,13 +393,15 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
void *pHead, *pBody; void *pHead, *pBody;
int32_t headLen, bodyLen; int32_t headLen, bodyLen;
uDebug("proc:%s, start to get message from queue:%p", pQueue->name, pQueue);
while (1) { while (1) {
int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen);
if (code < 0) { if (code < 0) {
uDebug("queue:%p, got no message and exiting", pQueue); uDebug("proc:%s, get no message from queue:%p and exiting", pQueue->name, pQueue);
break; break;
} else if (code < 0) { } else if (code < 0) {
uTrace("queue:%p, got no message since %s", pQueue, terrstr()); uTrace("proc:%s, get no message from queue:%p since %s", pQueue->name, pQueue, terrstr());
taosMsleep(1); taosMsleep(1);
continue; continue;
} else { } else {
...@@ -297,6 +421,7 @@ int32_t taosProcStart(SProcObj *pProc) { ...@@ -297,6 +421,7 @@ int32_t taosProcStart(SProcObj *pProc) {
uError("failed to create thread since %s", terrstr()); uError("failed to create thread since %s", terrstr());
return -1; return -1;
} }
uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue);
} }
if (!pProc->isChild || pProc->testFlag) { if (!pProc->isChild || pProc->testFlag) {
...@@ -305,6 +430,7 @@ int32_t taosProcStart(SProcObj *pProc) { ...@@ -305,6 +430,7 @@ int32_t taosProcStart(SProcObj *pProc) {
uError("failed to create thread since %s", terrstr()); uError("failed to create thread since %s", terrstr());
return -1; return -1;
} }
uDebug("proc:%s, parent start to consume queue:%p", pProc->name, pProc->pParentQueue);
} }
return 0; return 0;
...@@ -318,6 +444,7 @@ void taosProcStop(SProcObj *pProc) { ...@@ -318,6 +444,7 @@ void taosProcStop(SProcObj *pProc) {
void taosProcCleanup(SProcObj *pProc) { void taosProcCleanup(SProcObj *pProc) {
if (pProc != NULL) { if (pProc != NULL) {
uDebug("proc:%s, clean up", pProc->name);
taosProcQueueCleanup(pProc->pChildQueue); taosProcQueueCleanup(pProc->pChildQueue);
taosProcQueueCleanup(pProc->pParentQueue); taosProcQueueCleanup(pProc->pParentQueue);
free(pProc); free(pProc);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册