提交 5e7da351 编写于 作者: S Shengliang Guan

fix(cluster): add refid in rpc, and handle it in multi-process mode

上级 fcd0d272
...@@ -53,8 +53,8 @@ int32_t taosProcRun(SProcObj *pProc); ...@@ -53,8 +53,8 @@ int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc); void taosProcStop(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, EProcFuncType ftype); void *handle, int64_t handleRef, EProcFuncType ftype);
void taosProcRemoveHandle(SProcObj *pProc, void *handle); int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle);
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)); void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle));
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype); EProcFuncType ftype);
......
...@@ -88,7 +88,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe ...@@ -88,7 +88,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType),
pRpc->handle, pMsg->user); pRpc->handle, pMsg->user);
code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle, code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
PROC_FUNC_REQ); pRpc->refId, PROC_FUNC_REQ);
} else { } else {
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
ASSERT(1); ASSERT(1);
...@@ -356,7 +356,7 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t ...@@ -356,7 +356,7 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
break; break;
case PROC_FUNC_RSP: case PROC_FUNC_RSP:
taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
dmSendRpcRsp(pWrapper->pDnode, pMsg); dmSendRpcRsp(pWrapper->pDnode, pMsg);
break; break;
default: default:
......
...@@ -154,7 +154,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { ...@@ -154,7 +154,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
} }
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, EProcFuncType ftype) { const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
EProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) { if (rawHeadLen == 0 || pHead == NULL) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return -1; return -1;
...@@ -172,7 +173,7 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char ...@@ -172,7 +173,7 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
} }
if (handle != 0 && ftype == PROC_FUNC_REQ) { if (handle != 0 && ftype == PROC_FUNC_REQ) {
if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handle, sizeof(int64_t)) != 0) { if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
taosThreadMutexUnlock(&pQueue->mutex); taosThreadMutexUnlock(&pQueue->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -286,13 +287,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea ...@@ -286,13 +287,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
pQueue->head = headLen + bodyLen; pQueue->head = headLen + bodyLen;
} else if (remain < 8 + headLen) { } else if (remain < 8 + headLen) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8); memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8);
memcpy((char*)pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8)); memcpy((char *)pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8));
memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen); memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen);
pQueue->head = headLen - (remain - 8) + bodyLen; pQueue->head = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) { } else if (remain < 8 + headLen + bodyLen) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen); memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen);
memcpy((char*)pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen)); memcpy((char *)pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen));
pQueue->head = bodyLen - (remain - 8 - headLen); pQueue->head = bodyLen - (remain - 8 - headLen);
} else { } else {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
...@@ -454,19 +455,25 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -454,19 +455,25 @@ void taosProcCleanup(SProcObj *pProc) {
} }
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, EProcFuncType ftype) { void *handle, int64_t handleRef, EProcFuncType ftype) {
if (ftype != PROC_FUNC_REQ) { if (ftype != PROC_FUNC_REQ) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return -1; return -1;
} }
return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype); return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, handleRef,
ftype);
} }
void taosProcRemoveHandle(SProcObj *pProc, void *handle) { int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle) {
int64_t h = (int64_t)handle; int64_t h = (int64_t)handle;
taosThreadMutexLock(&pProc->pChildQueue->mutex); taosThreadMutexLock(&pProc->pChildQueue->mutex);
int64_t *handleRef = taosHashGet(pProc->hash, &h, sizeof(int64_t));
taosHashRemove(pProc->hash, &h, sizeof(int64_t)); taosHashRemove(pProc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&pProc->pChildQueue->mutex); taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
if (handleRef == NULL) return 0;
return *handleRef;
} }
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
...@@ -484,7 +491,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { ...@@ -484,7 +491,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype) { EProcFuncType ftype) {
int32_t retry = 0; int32_t retry = 0;
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
retry++; retry++;
taosMsleep(retry); taosMsleep(retry);
......
...@@ -120,20 +120,20 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { ...@@ -120,20 +120,20 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
SProcObj *cproc = taosProcInit(&cfg); SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr); ASSERT_NE(cproc, nullptr);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RSP), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RSP), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REGIST), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REGIST), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RELEASE), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RELEASE), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, PROC_FUNC_REQ), 0);
for (int32_t j = 0; j < 1000; j++) { for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0; int32_t i = 0;
for (i = 0; i < 20; ++i) { for (i = 0; i < 20; ++i) {
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0); ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
} }
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
cfg.isChild = true; cfg.isChild = true;
cfg.name = "1235_p"; cfg.name = "1235_p";
...@@ -236,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) { ...@@ -236,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) {
int32_t i = 0; int32_t i = 0;
for (i = 0; i < 20; ++i) { for (i = 0; i < 20; ++i) {
head.handle = (void *)((int64_t)i); head.handle = (void *)((int64_t)i);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_FUNC_REQ), 0); ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, PROC_FUNC_REQ), 0);
} }
cfg.isChild = true; cfg.isChild = true;
...@@ -246,9 +246,14 @@ TEST_F(UtilTesProc, 03_Handle) { ...@@ -246,9 +246,14 @@ TEST_F(UtilTesProc, 03_Handle) {
taosProcRun(pproc); taosProcRun(pproc);
taosProcCleanup(pproc); taosProcCleanup(pproc);
taosProcRemoveHandle(cproc, (void *)((int64_t)3)); int64_t ref = 0;
taosProcRemoveHandle(cproc, (void *)((int64_t)5));
taosProcRemoveHandle(cproc, (void *)((int64_t)6)); ref = taosProcRemoveHandle(cproc, (void *)((int64_t)3));
EXPECT_EQ(ref, 3);
ref = taosProcRemoveHandle(cproc, (void *)((int64_t)5));
EXPECT_EQ(ref, 5);
ref = taosProcRemoveHandle(cproc, (void *)((int64_t)6));
EXPECT_EQ(ref, 6);
taosProcCloseHandles(cproc, processHandle); taosProcCloseHandles(cproc, processHandle);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册