提交 563d2ec6 编写于 作者: S Shengliang Guan

proc test

上级 16382206
...@@ -89,6 +89,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { ...@@ -89,6 +89,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
} }
static void dndProcessProcHandle(void *handle) { static void dndProcessProcHandle(void *handle) {
dInfo("handle:%p, the child process dies and send an offline rsp", handle);
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE}; SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE};
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
} }
......
...@@ -476,6 +476,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { ...@@ -476,6 +476,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
while (h != NULL) { while (h != NULL) {
void *handle = *((void **)h); void *handle = *((void **)h);
(*HandleFp)(handle); (*HandleFp)(handle);
h = taosHashIterate(pProc->hash, h);
} }
taosThreadMutexUnlock(&pProc->pChildQueue->mutex); taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
} }
......
...@@ -103,7 +103,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { ...@@ -103,7 +103,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = shm, .shm = shm,
.parent = (void *)((int64_t)1235), .parent = (void *)((int64_t)1235),
.name = "child_queue"}; .name = "1235_c"};
SProcObj *cproc = taosProcInit(&cfg); SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr); ASSERT_NE(cproc, nullptr);
...@@ -123,7 +123,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { ...@@ -123,7 +123,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0);
cfg.isChild = true; cfg.isChild = true;
cfg.name = "child_queue"; cfg.name = "1235_p";
SProcObj *pproc = taosProcInit(&cfg); SProcObj *pproc = taosProcInit(&cfg);
ASSERT_NE(pproc, nullptr); ASSERT_NE(pproc, nullptr);
taosProcRun(pproc); taosProcRun(pproc);
...@@ -160,12 +160,12 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { ...@@ -160,12 +160,12 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = shm, .shm = shm,
.parent = (void *)((int64_t)1235), .parent = (void *)((int64_t)1236),
.name = "child_queue"}; .name = "1236_c"};
SProcObj *cproc = taosProcInit(&cfg); SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr); ASSERT_NE(cproc, nullptr);
cfg.name = "parent_queue"; cfg.name = "1236_p";
cfg.isChild = true; cfg.isChild = true;
SProcObj *pproc = taosProcInit(&cfg); SProcObj *pproc = taosProcInit(&cfg);
ASSERT_NE(pproc, nullptr); ASSERT_NE(pproc, nullptr);
...@@ -184,3 +184,61 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { ...@@ -184,3 +184,61 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
taosProcCleanup(cproc); taosProcCleanup(cproc);
taosDropShm(&shm); taosDropShm(&shm);
} }
void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
SRpcMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
memcpy(body, pBody, bodyLen);
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d handle:%" PRId64 " body:%s <====", (int64_t)parent,
ftype, headLen, bodyLen, (int64_t)msg.handle, body);
rpcFreeCont(pBody);
taosFreeQitem(pHead);
}
void processHandle(void *handle) { uDebug("----> remove handle:%" PRId64 " <----", (int64_t)handle); }
TEST_F(UtilTesProc, 03_Handle) {
// uDebugFlag = 207;
shm.size = 3000;
ASSERT_EQ(taosCreateShm(&shm, 1237, shm.size), 0);
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild3,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentConsumeFp = (ProcConsumeFp)NULL,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = shm,
.parent = (void *)((int64_t)1235),
.name = "1237_p"};
SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr);
for (int32_t j = 0; j < 1; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
head.handle = (void *)((int64_t)i);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0);
}
cfg.isChild = true;
cfg.name = "child_queue";
SProcObj *pproc = taosProcInit(&cfg);
ASSERT_NE(pproc, nullptr);
taosProcRun(pproc);
taosProcCleanup(pproc);
taosProcRemoveHandle(cproc, (void *)((int64_t)3));
taosProcRemoveHandle(cproc, (void *)((int64_t)5));
taosProcRemoveHandle(cproc, (void *)((int64_t)6));
taosProcCloseHandles(cproc, processHandle);
}
taosProcCleanup(cproc);
taosDropShm(&shm);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册