diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index b5b8d6ab66c5a9ac947638edc8d3243cd28d6b6b..5795cdd91921fd4ad8c21dfda492b65282999df4 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -73,7 +73,7 @@ typedef struct SRpcInit { void *(*mfp)(void *parent, tmsg_t msgType); // call back to handle except when query/fetch in progress - void (*efp)(void *parent, tmsg_t msgType); + bool (*efp)(void *parent, tmsg_t msgType); void *parent; } SRpcInit; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index e760fe1de949133cae86425247a920c52630b282..e7393804674341417e6589d5b6f99961d39e856d 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -65,7 +65,7 @@ typedef struct { int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); bool (*pfp)(void* parent, tmsg_t msgType); void* (*mfp)(void* parent, tmsg_t msgType); - void (*efp)(void* parent, tmsg_t msgType); + bool (*efp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 960e064b8fbce0a374b9ee58be44c47971c63d9f..59bc85e91d22923441a136f6e9e6a2c1f6cf3529 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -106,6 +106,8 @@ static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSrvMsg* msg); +static void uvNotifyLinkBrokenToApp(SSrvConn* conn); + static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static SSrvConn* createConn(void* hThrd); @@ -233,7 +235,7 @@ static void uvHandleReq(SSrvConn* pConn) { ntohs(pConn->locaddr.sin_port), transMsg.contLen); STrans* pTransInst = (STrans*)p->shandle; - (*((STrans*)p->shandle)->cfp)(pTransInst->parent, &transMsg, NULL); + (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth // validate msg type @@ -261,13 +263,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tError("server conn %p read error: %s", conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; - transUnrefSrvHandle(conn); + uvNotifyLinkBrokenToApp(conn); - // if (conn->ref > 1) { - // conn->ref++; // ref > 1 signed that write is in progress + // STrans* pTransInst = conn->pTransInst; + // if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) { //} - // tError("server conn %p read error: %s", conn, uv_err_name(nread)); - // destroyConn(conn, true); + transUnrefSrvHandle(conn); } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -373,6 +374,17 @@ static void uvStartSendResp(SSrvMsg* smsg) { uvStartSendRespInternal(smsg); return; } + +static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { + STrans* pTransInst = conn->pTransInst; + if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { + STransMsg transMsg = {0}; + transMsg.msgType = conn->inType; + transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + // transRefSrvHandle(conn); + (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); + } +} static void destroySmsg(SSrvMsg* smsg) { if (smsg == NULL) { return; diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 46f6424b0b79d22c1e286f30f7b1b19d1dea4f95..b3fbade050eea96917dc1e7430af3d95630e8316 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -15,6 +15,7 @@ #include #include #include +#include "rpcLog.h" #include "tdatablock.h" #include "tglobal.h" #include "tlog.h" @@ -48,7 +49,9 @@ static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) { return NULL; } // server except -static void NotifyAppLinkBroken(void *parent, tmsg_t msgType) {} +static bool handleExcept(void *parent, tmsg_t msgType) { + return msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP; +} typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); @@ -83,6 +86,10 @@ class Client { rpcInit_.cfp = cb; this->transCli = rpcOpen(&rpcInit_); } + void Stop() { + rpcClose(this->transCli); + this->transCli = NULL; + } void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); rpcInit_.pfp = pfp; @@ -157,7 +164,7 @@ class Server { rpcClose(this->transSrv); this->transSrv = NULL; } - void SetExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { + void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { this->Stop(); rpcInit_.efp = efp; this->Start(); @@ -207,6 +214,7 @@ static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { Client *client = (Client *)parent; client->SetResp(pMsg); client->SemPost(); + tDebug("received resp"); } static void initEnv() { @@ -266,7 +274,7 @@ class TransObj { cli->SetPAndMFp(pfp, mfp); } // call when link broken, and notify query or fetch stop - void SetSrvExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { + void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { //////// srv->SetExceptFp(efp); } @@ -275,6 +283,10 @@ class TransObj { srv->SetSrvContinueSend(cfp); } void RestartSrv() { srv->Restart(); } + void cliStop() { + /////// + cli->Stop(); + } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); } @@ -417,20 +429,31 @@ TEST_F(TransEnv, srvContinueSend) { } TEST_F(TransEnv, srvPersistHandleExcept) { - // conn breken + tr->SetSrvContinueSend(processContinueSend); + tr->SetCliPersistFp(cliPersistHandle); + SRpcMsg resp = {0}; + for (int i = 0; i < 5; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + if (i > 2) { + tr->cliStop(); + break; + } + } + taosMsleep(2000); + // conn broken // } -TEST_F(TransEnv, cliPersistHandleExcept) { - // conn breken -} -TEST_F(TransEnv, multiCliPersisHandleExcept) { - // conn breken -} -TEST_F(TransEnv, multiSrvPersisHandleExcept) { - // conn breken +TEST_F(TransEnv, multiCliPersistHandleExcept) { + // conn broken } TEST_F(TransEnv, queryExcept) { + tr->SetSrvExceptFp(handleExcept); + // query and conn is broken } TEST_F(TransEnv, noResp) {