提交 1d8ffd95 编写于 作者: dengyihao's avatar dengyihao

handle except

上级 3d4280c7
...@@ -43,6 +43,7 @@ typedef struct SRpcMsg { ...@@ -43,6 +43,7 @@ typedef struct SRpcMsg {
int32_t code; int32_t code;
void * handle; // rpc handle returned to app void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client void * ahandle; // app handle set by client
int noResp; // has response or not(default 0 indicate resp);
} SRpcMsg; } SRpcMsg;
typedef struct SRpcInit { typedef struct SRpcInit {
......
...@@ -158,9 +158,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -158,9 +158,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} while (0) } while (0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
bool cliMayContinueSendMsg(SCliConn* conn) { bool cliMaySendCachedMsg(SCliConn* conn) {
if (taosArrayGetSize(conn->cliMsgs) > 0) { if (taosArrayGetSize(conn->cliMsgs) > 0) {
cliSend(conn); cliSend(conn);
return true; return true;
...@@ -226,7 +228,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -226,7 +228,7 @@ void cliHandleResp(SCliConn* conn) {
} }
destroyCmsg(pMsg); destroyCmsg(pMsg);
if (cliMayContinueSendMsg(conn) == true) { if (cliMaySendCachedMsg(conn) == true) {
return; return;
} }
...@@ -441,7 +443,24 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -441,7 +443,24 @@ static void cliDestroy(uv_handle_t* handle) {
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn); free(conn);
} }
static bool cliHandleNoResp(SCliConn* conn) {
bool res = false;
SArray* msgs = conn->cliMsgs;
if (taosArrayGetSize(msgs) > 0) {
SCliMsg* pMsg = taosArrayGetP(msgs, 0);
if (REQUEST_NO_RESP(&pMsg->msg)) {
taosArrayRemove(msgs, 0);
destroyCmsg(pMsg);
res = true;
}
if (res == true) {
if (cliMaySendCachedMsg(conn) == false) {
addConnToPool(conn->hostThrd, conn);
}
}
}
return res;
}
static void cliSendCb(uv_write_t* req, int status) { static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
...@@ -452,6 +471,10 @@ static void cliSendCb(uv_write_t* req, int status) { ...@@ -452,6 +471,10 @@ static void cliSendCb(uv_write_t* req, int status) {
cliHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
if (cliHandleNoResp(pConn) == true) {
tTrace("%s cli conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
return;
}
uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
} }
...@@ -489,6 +512,7 @@ void cliSend(SCliConn* pConn) { ...@@ -489,6 +512,7 @@ void cliSend(SCliConn* pConn) {
msgLen += sizeof(STransUserMsg); msgLen += sizeof(STransUserMsg);
} }
pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
......
...@@ -226,15 +226,22 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -226,15 +226,22 @@ static void uvHandleReq(SSrvConn* pConn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code; transMsg.code = pHead->code;
transMsg.ahandle = NULL; transMsg.ahandle = NULL;
transMsg.handle = pConn; transMsg.handle = NULL;
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
transRefSrvHandle(pConn);
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), if (pHead->resflag == 0) {
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), transRefSrvHandle(pConn);
ntohs(pConn->locaddr.sin_port), transMsg.contLen); transMsg.handle = pConn;
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
} else {
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn,
TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
}
STrans* pTransInst = (STrans*)p->shandle; STrans* pTransInst = (STrans*)p->shandle;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
......
...@@ -361,7 +361,7 @@ TEST_F(TransEnv, cliPersistHandle) { ...@@ -361,7 +361,7 @@ TEST_F(TransEnv, cliPersistHandle) {
tr->SetCliPersistFp(cliPersistHandle); tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle}; SRpcMsg req = {.handle = resp.handle, .noResp = 0};
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
...@@ -448,6 +448,25 @@ TEST_F(TransEnv, srvPersistHandleExcept) { ...@@ -448,6 +448,25 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
// conn broken // conn broken
// //
} }
TEST_F(TransEnv, cliPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend);
tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecv(&req, &resp);
if (i > 2) {
tr->StopSrv();
break;
}
}
taosMsleep(2000);
// conn broken
//
}
TEST_F(TransEnv, multiCliPersistHandleExcept) { TEST_F(TransEnv, multiCliPersistHandleExcept) {
// conn broken // conn broken
...@@ -458,5 +477,15 @@ TEST_F(TransEnv, queryExcept) { ...@@ -458,5 +477,15 @@ TEST_F(TransEnv, queryExcept) {
// query and conn is broken // query and conn is broken
} }
TEST_F(TransEnv, noResp) { TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {.noResp = 1};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecv(&req, &resp);
}
taosMsleep(2000);
// no resp // no resp
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册