提交 f43efb9f 编写于 作者: dengyihao's avatar dengyihao

handle except and update UT

上级 6e9bc0cb
...@@ -73,7 +73,7 @@ typedef struct SRpcInit { ...@@ -73,7 +73,7 @@ typedef struct SRpcInit {
void *(*mfp)(void *parent, tmsg_t msgType); void *(*mfp)(void *parent, tmsg_t msgType);
// call back to handle except when query/fetch in progress // call back to handle except when query/fetch in progress
void (*efp)(void *parent, tmsg_t msgType); bool (*efp)(void *parent, tmsg_t msgType);
void *parent; void *parent;
} SRpcInit; } SRpcInit;
......
...@@ -65,7 +65,7 @@ typedef struct { ...@@ -65,7 +65,7 @@ typedef struct {
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
bool (*pfp)(void* parent, tmsg_t msgType); bool (*pfp)(void* parent, tmsg_t msgType);
void* (*mfp)(void* parent, tmsg_t msgType); void* (*mfp)(void* parent, tmsg_t msgType);
void (*efp)(void* parent, tmsg_t msgType); bool (*efp)(void* parent, tmsg_t msgType);
int32_t refCount; int32_t refCount;
void* parent; void* parent;
......
...@@ -106,6 +106,8 @@ static void uvStartSendRespInternal(SSrvMsg* smsg); ...@@ -106,6 +106,8 @@ static void uvStartSendRespInternal(SSrvMsg* smsg);
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg); static void uvStartSendResp(SSrvMsg* msg);
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);
static void destroySmsg(SSrvMsg* smsg); static void destroySmsg(SSrvMsg* smsg);
// check whether already read complete packet // check whether already read complete packet
static SSrvConn* createConn(void* hThrd); static SSrvConn* createConn(void* hThrd);
...@@ -233,7 +235,7 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -233,7 +235,7 @@ static void uvHandleReq(SSrvConn* pConn) {
ntohs(pConn->locaddr.sin_port), transMsg.contLen); ntohs(pConn->locaddr.sin_port), transMsg.contLen);
STrans* pTransInst = (STrans*)p->shandle; STrans* pTransInst = (STrans*)p->shandle;
(*((STrans*)p->shandle)->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth // auth
// validate msg type // validate msg type
...@@ -261,13 +263,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -261,13 +263,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
tError("server conn %p read error: %s", conn, uv_err_name(nread)); tError("server conn %p read error: %s", conn, uv_err_name(nread));
if (nread < 0) { if (nread < 0) {
conn->broken = true; conn->broken = true;
transUnrefSrvHandle(conn); uvNotifyLinkBrokenToApp(conn);
// if (conn->ref > 1) { // STrans* pTransInst = conn->pTransInst;
// conn->ref++; // ref > 1 signed that write is in progress // if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) {
//} //}
// tError("server conn %p read error: %s", conn, uv_err_name(nread)); transUnrefSrvHandle(conn);
// destroyConn(conn, true);
} }
} }
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
...@@ -373,6 +374,17 @@ static void uvStartSendResp(SSrvMsg* smsg) { ...@@ -373,6 +374,17 @@ static void uvStartSendResp(SSrvMsg* smsg) {
uvStartSendRespInternal(smsg); uvStartSendRespInternal(smsg);
return; return;
} }
static void uvNotifyLinkBrokenToApp(SSrvConn* conn) {
STrans* pTransInst = conn->pTransInst;
if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) {
STransMsg transMsg = {0};
transMsg.msgType = conn->inType;
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// transRefSrvHandle(conn);
(*pTransInst->cfp)(pTransInst->parent, &transMsg, 0);
}
}
static void destroySmsg(SSrvMsg* smsg) { static void destroySmsg(SSrvMsg* smsg) {
if (smsg == NULL) { if (smsg == NULL) {
return; return;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <cstdio> #include <cstdio>
#include <cstring> #include <cstring>
#include "rpcLog.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
#include "tlog.h" #include "tlog.h"
...@@ -48,7 +49,9 @@ static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) { ...@@ -48,7 +49,9 @@ static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) {
return NULL; return NULL;
} }
// server except // server except
static void NotifyAppLinkBroken(void *parent, tmsg_t msgType) {} static bool handleExcept(void *parent, tmsg_t msgType) {
return msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP;
}
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
...@@ -83,6 +86,10 @@ class Client { ...@@ -83,6 +86,10 @@ class Client {
rpcInit_.cfp = cb; rpcInit_.cfp = cb;
this->transCli = rpcOpen(&rpcInit_); this->transCli = rpcOpen(&rpcInit_);
} }
void Stop() {
rpcClose(this->transCli);
this->transCli = NULL;
}
void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
rpcClose(this->transCli); rpcClose(this->transCli);
rpcInit_.pfp = pfp; rpcInit_.pfp = pfp;
...@@ -157,7 +164,7 @@ class Server { ...@@ -157,7 +164,7 @@ class Server {
rpcClose(this->transSrv); rpcClose(this->transSrv);
this->transSrv = NULL; this->transSrv = NULL;
} }
void SetExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) {
this->Stop(); this->Stop();
rpcInit_.efp = efp; rpcInit_.efp = efp;
this->Start(); this->Start();
...@@ -207,6 +214,7 @@ static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -207,6 +214,7 @@ static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
Client *client = (Client *)parent; Client *client = (Client *)parent;
client->SetResp(pMsg); client->SetResp(pMsg);
client->SemPost(); client->SemPost();
tDebug("received resp");
} }
static void initEnv() { static void initEnv() {
...@@ -266,7 +274,7 @@ class TransObj { ...@@ -266,7 +274,7 @@ class TransObj {
cli->SetPAndMFp(pfp, mfp); cli->SetPAndMFp(pfp, mfp);
} }
// call when link broken, and notify query or fetch stop // call when link broken, and notify query or fetch stop
void SetSrvExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) {
//////// ////////
srv->SetExceptFp(efp); srv->SetExceptFp(efp);
} }
...@@ -275,6 +283,10 @@ class TransObj { ...@@ -275,6 +283,10 @@ class TransObj {
srv->SetSrvContinueSend(cfp); srv->SetSrvContinueSend(cfp);
} }
void RestartSrv() { srv->Restart(); } void RestartSrv() { srv->Restart(); }
void cliStop() {
///////
cli->Stop();
}
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); } void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
...@@ -417,20 +429,31 @@ TEST_F(TransEnv, srvContinueSend) { ...@@ -417,20 +429,31 @@ TEST_F(TransEnv, srvContinueSend) {
} }
TEST_F(TransEnv, srvPersistHandleExcept) { TEST_F(TransEnv, srvPersistHandleExcept) {
// conn breken tr->SetSrvContinueSend(processContinueSend);
tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecv(&req, &resp);
if (i > 2) {
tr->cliStop();
break;
}
}
taosMsleep(2000);
// conn broken
// //
} }
TEST_F(TransEnv, cliPersistHandleExcept) {
// conn breken
}
TEST_F(TransEnv, multiCliPersisHandleExcept) { TEST_F(TransEnv, multiCliPersistHandleExcept) {
// conn breken // conn broken
}
TEST_F(TransEnv, multiSrvPersisHandleExcept) {
// conn breken
} }
TEST_F(TransEnv, queryExcept) { TEST_F(TransEnv, queryExcept) {
tr->SetSrvExceptFp(handleExcept);
// query and conn is broken // query and conn is broken
} }
TEST_F(TransEnv, noResp) { TEST_F(TransEnv, noResp) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册