diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 6bc28e3ea03719e16f75a0aab98fa9b9228eed1f..af5afb51c5c065dbe5c644bda1bcabba65c36af2 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -38,13 +38,13 @@ typedef struct SRpcConnInfo { typedef struct SRpcMsg { tmsg_t msgType; - tmsg_t expectMsgType; void * pCont; int contLen; int32_t code; - void * handle; // rpc handle returned to app - void * ahandle; // app handle set by client - int noResp; // has response or not(default 0 indicate resp); + void * handle; // rpc handle returned to app + void * ahandle; // app handle set by client + int noResp; // has response or not(default 0, 0: resp, 1: no resp); + int persistHandle; // persist handle or not } SRpcMsg; @@ -69,18 +69,19 @@ typedef struct SRpcInit { // call back to retrieve the client auth info, for server app only int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); - // call back to keep conn or not - bool (*pfp)(void *parent, tmsg_t msgType); - - // to support Send messages multiple times on a link - void *(*mfp)(void *parent, tmsg_t msgType); - - // call back to handle except when query/fetch in progress - bool (*efp)(void *parent, tmsg_t msgType); - void *parent; } SRpcInit; +typedef struct { + void * val; + int32_t len; + void (*free)(void *arg); +} SRpcCtxVal; + +typedef struct { + SHashObj *args; +} SRpcCtx; + int32_t rpcInit(); void rpcCleanup(); void * rpcOpen(const SRpcInit *pRpc); @@ -89,16 +90,17 @@ void * rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void * rpcReallocCont(void *ptr, int contLen); void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendResponse(const SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(int64_t rid); - +void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); + +void rpcSendResponse(const SRpcMsg *pMsg); +void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcReportProgress(void *pConn, char *pCont, int contLen); +void rpcCancelRequest(int64_t rid); +void rpcRegisterBrokenLinkArg(SRpcMsg *msg); // just release client conn to rpc instance, no close sock -void rpcReleaseHandle(void *handle, int8_t type); - +void rpcReleaseHandle(void *handle, int8_t type); // void rpcRefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 2233742625e6e733f184bd2c4e7eac376afd9139..24abbd263c7750787c99a3472002c1b993842bd0 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -90,7 +90,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.label = "TSC"; rpcInit.numOfThreads = numOfThread; rpcInit.cfp = processMsgFromServer; - rpcInit.pfp = persistConnForSpecificMsg; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)user; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 63fbf59c064a91a883a0ae52ad7fb9e1c56b1c55..b39d3e6e374815efc30ee439928f9f574f97f7f5 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -155,6 +155,10 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp .ahandle = (void*)pInfo, .handle = pInfo->msgInfo.handle, .code = 0}; + if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH || + pInfo->msgType == TDMT_VND_QUERY_CONTINUE) { + rpcMsg.persistHandle = 1; + } assert(pInfo->fp != NULL); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 8ea65b193d2911878bd9e74af8c814232c954a27..c861ed350e229fdd0d2a159a1e018c5fc17273ee 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -14,6 +14,10 @@ */ #ifdef USE_UV +#ifdef __cplusplus +extern "C" { +#endif + #include #include "lz4.h" #include "os.h" @@ -121,24 +125,21 @@ typedef struct { } SRpcReqContext; typedef SRpcMsg STransMsg; +typedef SRpcCtx STransCtx; +typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; typedef struct { - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - tmsg_t msgType; // message type - uint8_t* pCont; // content provided by app - int32_t contLen; // content length - // int32_t code; // error code - // int16_t numOfTry; // number of try for different servers - // int8_t oldInUse; // server EP inUse passed by app - // int8_t redirect; // flag to indicate redirect + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + tmsg_t msgType; // message type int8_t connType; // connection type cli/srv int64_t rid; // refId returned by taosAddRef - STransMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API + STransCtx appCtx; // + STransMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API int hThrdIdx; char* ip; @@ -150,11 +151,12 @@ typedef struct { typedef struct { char version : 4; // RPC version - char comp : 4; // compression algorithm, 0:no compression 1:lz4 - char resflag : 2; // reserved bits - char spi : 1; // security parameter index + char comp : 2; // compression algorithm, 0:no compression 1:lz4 + char noResp : 2; // noResp bits, 0: resp, 1: resp + char persist : 2; // persist handle,0: no persit, 1: persist handle + char release : 2; char secured : 2; - char encrypt : 3; // encrypt algorithm, 0: no encryption + char spi : 2; uint32_t code; // del later uint32_t msgType; @@ -179,6 +181,9 @@ typedef struct { #pragma pack(pop) +typedef enum { Normal, Quit, Release, Register } STransMsgType; +typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus; + #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) #define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) @@ -255,9 +260,10 @@ void transUnrefCliHandle(void* handle); void transReleaseCliHandle(void* handle); void transReleaseSrvHandle(void* handle); -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); -void transSendResponse(const STransMsg* pMsg); +void transSendResponse(const STransMsg* msg); +void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); @@ -266,4 +272,14 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void transCloseClient(void* arg); void transCloseServer(void* arg); +void transCtxInit(STransCtx* ctx); +void transCtxDestroy(STransCtx* ctx); +void transCtxClear(STransCtx* ctx); +void transCtxMerge(STransCtx* dst, STransCtx* src); +void* transCtxDumpVal(STransCtx* ctx, int32_t key); + +#ifdef __cplusplus +} +#endif + #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index e7393804674341417e6589d5b6f99961d39e856d..139540896092be9b7b3c28f1ba9781b4bd63deca 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,9 +63,6 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); - bool (*pfp)(void* parent, tmsg_t msgType); - void* (*mfp)(void* parent, tmsg_t msgType); - bool (*efp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 2cab03f133620fce7d2129f251320946bcc010ea..317f80c48d00b0da50505a059e56b3ea05ff309a 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,9 +39,6 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; - pRpc->pfp = pInit->pfp; - pRpc->mfp = pInit->mfp; - pRpc->efp = pInit->efp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; @@ -121,7 +118,12 @@ void rpcCancelRequest(int64_t rid) { return; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRequest(shandle, ip, port, pMsg); + transSendRequest(shandle, ip, port, pMsg, NULL); +} +void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; + transSendRequest(shandle, ip, port, pMsg, pCtx); } void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); @@ -142,6 +144,7 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } +void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { rpcSendResponse(msg); } void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index df35b8d67b2e2b0c91c154c4ff3d7cb951d0d8b1..842b5a1b4b7839ce3edcf63e8cb2cd9fc9fc445a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,11 +17,6 @@ #include "transComm.h" -// Normal(default): send/recv msg -// Quit: quit rpc inst -// Release: release handle to rpc inst -typedef enum { Normal, Quit, Release } SCliMsgType; - typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -35,8 +30,10 @@ typedef struct SCliConn { uint64_t expireTime; int hThrdIdx; bool broken; // link broken or not + STransCtx ctx; - int persist; // + ConnStatus status; // + int release; // 1: release // spi configure char spi; char secured; @@ -55,7 +52,7 @@ typedef struct SCliMsg { STransMsg msg; queue q; uint64_t st; - SCliMsgType type; + STransMsgType type; } SCliMsg; typedef struct SCliThrdObj { @@ -113,10 +110,12 @@ static void cliSend(SCliConn* pConn); static void cliHandleResp(SCliConn* conn); // handle except about conn static void cliHandleExcept(SCliConn* conn); + // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease}; static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(STransMsg* userdata); @@ -133,6 +132,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + if (T_REF_VAL_GET(conn) == 1) { \ + SCliThrdObj* thrd = conn->hostThrd; \ + addConnToPool(thrd->pool, conn); \ + } \ + goto _RETURN; \ + } \ + } while (0) + #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ @@ -151,14 +164,16 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_SET_PERSIST_BY_APP(conn) \ do { \ - if (conn->persist == false) { \ - conn->persist = true; \ + if (conn->status == ConnNormal) { \ + conn->status = ConnAcquire; \ transRefCliHandle(conn); \ } \ } while (0) -#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) +#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) +#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) +#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) static void* cliWorkThread(void* arg); @@ -177,7 +192,6 @@ void cliHandleResp(SCliConn* conn) { STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); - STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -185,6 +199,8 @@ void cliHandleResp(SCliConn* conn) { transMsg.msgType = pHead->msgType; transMsg.ahandle = NULL; + CONN_SHOULD_RELEASE(conn, pHead); + SCliMsg* pMsg = NULL; if (taosArrayGetSize(conn->cliMsgs) > 0) { pMsg = taosArrayGetP(conn->cliMsgs, 0); @@ -193,16 +209,15 @@ void cliHandleResp(SCliConn* conn) { STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; + transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } // buf's mem alread translated to transMsg.pCont transClearBuffer(&conn->readBuf); - if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, transMsg.msgType)) { + if (!CONN_NO_PERSIST_BY_APP(conn)) { transMsg.handle = conn; - CONN_SET_PERSIST_BY_APP(conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } @@ -241,6 +256,8 @@ void cliHandleResp(SCliConn* conn) { if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } +_RETURN: + return; } void cliHandleExcept(SCliConn* pConn) { @@ -268,7 +285,7 @@ void cliHandleExcept(SCliConn* pConn) { transMsg.ahandle = NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { - transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; + transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } @@ -359,6 +376,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { static void addConnToPool(void* pool, SCliConn* conn) { char key[128] = {0}; + transCtxDestroy(&conn->ctx); tstrncpy(key, conn->ip, strlen(conn->ip)); tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); @@ -367,6 +385,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + conn->status = ConnNormal; // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -420,16 +439,16 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { conn->writeReq.data = conn; conn->connReq.data = conn; conn->cliMsgs = taosArrayInit(2, sizeof(void*)); - QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; - conn->persist = false; - conn->broken = false; + conn->status = ConnNormal; + conn->broken = 0; transRefCliHandle(conn); return conn; } static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); + QUEUE_REMOVE(&conn->conn); if (clear) { uv_close((uv_handle_t*)conn->stream, cliDestroy); @@ -439,6 +458,7 @@ static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->ip); free(conn->stream); + transCtxDestroy(&conn->ctx); taosArrayDestroy(conn->cliMsgs); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); @@ -490,7 +510,10 @@ void cliSend(SCliConn* pConn) { STrans* pTransInst = pThrd->pTransInst; STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); - + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); int msgLen = transMsgLenFromCont(pMsg->contLen); @@ -513,15 +536,22 @@ void cliSend(SCliConn* pConn) { msgLen += sizeof(STransUserMsg); } - pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0; + pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; + + pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + if (pHead->persist == 1) { + CONN_SET_PERSIST_BY_APP(pConn); + } + pConn->writeReq.data = pConn; uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); @@ -562,22 +592,13 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { } static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = pMsg->msg.handle; - tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn); + tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); - while (taosArrayGetSize(conn->cliMsgs) > 0) { - SCliMsg* pMsg = taosArrayGetP(conn->cliMsgs, 0); - destroyCmsg(pMsg); - taosArrayRemove(conn->cliMsgs, 0); - } - - transDestroyBuffer(&conn->readBuf); - if (conn->persist && T_REF_VAL_GET(conn) >= 2) { - conn->persist = false; - transUnrefCliHandle(conn); - addConnToPool(pThrd->pool, conn); - } else { - transUnrefCliHandle(conn); + taosArrayPush(conn->cliMsgs, &pMsg); + if (taosArrayGetSize(conn->cliMsgs) >= 2) { + return; // send one by one } + cliSend(conn); } SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { @@ -609,10 +630,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (conn != NULL) { conn->hThrdIdx = pCtx->hThrdIdx; + transCtxMerge(&conn->ctx, &pCtx->appCtx); if (taosArrayGetSize(conn->cliMsgs) > 0) { taosArrayPush(conn->cliMsgs, &pMsg); return; } + taosArrayPush(conn->cliMsgs, &pMsg); transDestroyBuffer(&conn->readBuf); cliSend(conn); @@ -652,14 +675,10 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - - if (pMsg->type == Normal) { - cliHandleReq(pMsg, pThrd); - } else if (pMsg->type == Quit) { - cliHandleQuit(pMsg, pThrd); - } else if (pMsg->type == Release) { - cliHandleRelease(pMsg, pThrd); + if (pMsg == NULL) { + continue; } + (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); count++; } if (count >= 2) { @@ -802,13 +821,13 @@ void transReleaseCliHandle(void* handle) { STransMsg tmsg = {.handle = handle}; SCliMsg* cmsg = calloc(1, sizeof(SCliMsg)); - cmsg->type = Release; cmsg->msg = tmsg; + cmsg->type = Release; transSendAsync(thrd->asyncPool, &cmsg->q); } -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle); if (index == -1) { @@ -818,7 +837,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } - tDebug("send request at thread:%d %p", index, pMsg); + tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; @@ -826,6 +845,10 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p pCtx->port = port; pCtx->hThrdIdx = index; + if (ctx != NULL) { + pCtx->appCtx = *ctx; + } + assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not @@ -833,10 +856,12 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p cliMsg->ctx = pCtx; cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); + cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } + void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); @@ -858,6 +883,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); + cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 7123593a33bc5e64f28f9763ec4429e1531167fc..2c90efc3aa5d236a3bb65d1063f595bc3e9b72ff 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -155,9 +155,9 @@ bool transReadComplete(SConnBuffer* connBuf) { } return false; } -int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {return 0;} +int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; } -int transUnpackMsg(STransMsgHead* msgHead) {return 0;} +int transUnpackMsg(STransMsgHead* msgHead) { return 0; } int transDestroyBuffer(SConnBuffer* buf) { if (buf->cap > 0) { tfree(buf->buf); @@ -224,4 +224,56 @@ int transSendAsync(SAsyncPool* pool, queue* q) { return uv_async_send(async); } +void transCtxInit(STransCtx* ctx) { + // init transCtx + ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK); +} +void transCtxDestroy(STransCtx* ctx) { + if (ctx->args == NULL) { + return; + } + + STransCtxVal* iter = taosHashIterate(ctx->args, NULL); + while (iter) { + iter->free(iter->val); + iter = taosHashIterate(ctx->args, iter); + } + taosHashCleanup(ctx->args); +} + +void transCtxMerge(STransCtx* dst, STransCtx* src) { + if (dst->args == NULL) { + dst->args = src->args; + src->args = NULL; + return; + } + void* key = NULL; + size_t klen = 0; + void* iter = taosHashIterate(src->args, NULL); + while (iter) { + STransCtxVal* sVal = (STransCtxVal*)iter; + key = taosHashGetKey(sVal, &klen); + + STransCtxVal* dVal = taosHashGet(dst->args, key, klen); + if (dVal) { + dVal->free(dVal->val); + } + taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal)); + iter = taosHashIterate(src->args, iter); + } + taosHashCleanup(src->args); +} +void* transCtxDumpVal(STransCtx* ctx, int32_t key) { + if (ctx->args == NULL) { + return NULL; + } + STransCtxVal* cVal = taosHashGet(ctx->args, (const void*)&key, sizeof(key)); + if (cVal == NULL) { + return NULL; + } + char* ret = calloc(1, cVal->len); + memcpy(ret, (char*)cVal->val, cVal->len); + return (void*)ret; +} + #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 73d836319fcabd96fc42790b391fb39d8a6a154d..108b12542c1b89f5672f60f5242d8f98cd6baf70 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -17,6 +17,12 @@ #include "transComm.h" +typedef struct { + int notifyCount; // + int init; // init or not + STransMsg msg; +} SSrvRegArg; + typedef struct SSrvConn { T_REF_DECLARE() uv_tcp_t* pTcp; @@ -33,8 +39,10 @@ typedef struct SSrvConn { void* hostThrd; SArray* srvMsgs; - bool broken; // conn broken; + SSrvRegArg regArg; + bool broken; // conn broken; + ConnStatus status; struct sockaddr_in addr; struct sockaddr_in locaddr; @@ -47,18 +55,18 @@ typedef struct SSrvConn { } SSrvConn; typedef struct SSrvMsg { - SSrvConn* pConn; - STransMsg msg; - queue q; + SSrvConn* pConn; + STransMsg msg; + queue q; + STransMsgType type; } SSrvMsg; typedef struct SWorkThrdObj { - pthread_t thread; - uv_pipe_t* pipe; - uv_os_fd_t fd; - uv_loop_t* loop; - SAsyncPool* asyncPool; - + pthread_t thread; + uv_pipe_t* pipe; + uv_os_fd_t fd; + uv_loop_t* loop; + SAsyncPool* asyncPool; queue msg; pthread_mutex_t msgMtx; @@ -85,6 +93,15 @@ typedef struct SServerObj { static const char* notify = "a"; +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + goto _RETURE; \ + } \ + } while (0) // refactor later static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen); @@ -113,6 +130,13 @@ static void destroySmsg(SSrvMsg* smsg); static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); +static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); +static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); +static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd); +static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease, + uvHandleRegister}; + static void uvDestroyConn(uv_handle_t* handle); // server and worker thread @@ -217,8 +241,8 @@ static void uvHandleReq(SSrvConn* pConn) { if (pHead->secured == 1) { pHead->msgLen -= sizeof(STransUserMsg); } - // } + CONN_SHOULD_RELEASE(pConn, pHead); STransMsg transMsg; transMsg.contLen = transContLenFromMsg(pHead->msgLen); @@ -230,24 +254,34 @@ static void uvHandleReq(SSrvConn* pConn) { transClearBuffer(&pConn->readBuf); pConn->inType = pHead->msgType; - - if (pHead->resflag == 0) { + if (pConn->status == ConnNormal) { + if (pHead->persist == 1) { + pConn->status = ConnAcquire; + transRefSrvHandle(pConn); + } + } + if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - transMsg.handle = pConn; tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); } else { - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn, + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); + // no ref here + } + + if (pHead->noResp == 0) { + transMsg.handle = pConn; } STrans* pTransInst = (STrans*)p->shandle; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth - // validate msg type +_RETURE: + return; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -272,11 +306,13 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tError("server conn %p read error: %s", conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; - uvNotifyLinkBrokenToApp(conn); - - // STrans* pTransInst = conn->pTransInst; - // if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) { - //} + if (conn->status == ConnAcquire) { + if (conn->regArg.init) { + STrans* pTransInst = conn->pTransInst; + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + } transUnrefSrvHandle(conn); } } @@ -301,8 +337,22 @@ void uvOnSendCb(uv_write_t* req, int status) { SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); taosArrayRemove(conn->srvMsgs, 0); + if (msg->type == Release && conn->status != ConnNormal) { + conn->status = ConnNormal; + transUnrefSrvHandle(conn); + } else if (msg->type == Register && conn->status == ConnAcquire) { + conn->regArg.notifyCount = 0; + conn->regArg.init = 1; + conn->regArg.msg = msg->msg; + if (conn->broken) { + STrans* pTransInst = conn->pTransInst; + (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + free(msg); + return; + } destroySmsg(msg); - // send second data, just use for push if (taosArrayGetSize(conn->srvMsgs) > 0) { tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); @@ -312,7 +362,7 @@ void uvOnSendCb(uv_write_t* req, int status) { } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); - conn->broken = false; + conn->broken = true; transUnrefSrvHandle(conn); } } @@ -339,6 +389,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { pHead->secured = pMsg->code == 0 ? 1 : 0; // pHead->msgType = smsg->pConn->inType + 1; + pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); // add more info char* msg = (char*)pHead; @@ -368,13 +419,16 @@ static void uvStartSendResp(SSrvMsg* smsg) { SSrvConn* pConn = smsg->pConn; if (pConn->broken == true) { + // persist by transUnrefSrvHandle(pConn); return; } - transUnrefSrvHandle(pConn); + if (pConn->status == ConnNormal) { + transUnrefSrvHandle(pConn); + } if (taosArrayGetSize(pConn->srvMsgs) > 0) { - tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr), + tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); taosArrayPush(pConn->srvMsgs, &smsg); return; @@ -384,16 +438,6 @@ static void uvStartSendResp(SSrvMsg* smsg) { return; } -static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { - STrans* pTransInst = conn->pTransInst; - if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { - STransMsg transMsg = {0}; - transMsg.msgType = conn->inType; - transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - // transRefSrvHandle(conn); - (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); - } -} static void destroySmsg(SSrvMsg* smsg) { if (smsg == NULL) { return; @@ -408,6 +452,9 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { QUEUE_INIT(h); SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); + while (T_REF_VAL_GET(c) >= 2) { + transUnrefSrvHandle(c); + } transUnrefSrvHandle(c); } } @@ -431,20 +478,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("unexcept occurred, continue"); continue; } - if (msg->pConn == NULL) { - free(msg); - bool noConn = QUEUE_IS_EMPTY(&pThrd->conn); - if (noConn == true) { - uv_loop_close(pThrd->loop); - uv_stop(pThrd->loop); - } else { - destroyAllConn(pThrd); - // uv_loop_close(pThrd->loop); - pThrd->quit = true; - } - } else { - uvStartSendResp(msg); - } + (*transAsyncHandle[msg->type])(msg, pThrd); } } static void uvAcceptAsyncCb(uv_async_t* async) { @@ -632,7 +666,9 @@ static SSrvConn* createConn(void* hThrd) { pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); + memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; + pConn->status = ConnNormal; transRefSrvHandle(pConn); return pConn; @@ -748,7 +784,58 @@ End: transCloseServer(srv); return NULL; } - +void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { + if (QUEUE_IS_EMPTY(&thrd->conn)) { + uv_loop_close(thrd->loop); + uv_stop(thrd->loop); + } else { + destroyAllConn(thrd); + thrd->quit = true; + } + free(msg); +} +void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { + // release handle to rpc init + SSrvConn* conn = msg->pConn; + if (conn->status == ConnAcquire) { + if (taosArrayGetSize(conn->srvMsgs) > 0) { + taosArrayPush(conn->srvMsgs, &msg); + return; + } + taosArrayPush(conn->srvMsgs, &msg); + uvStartSendRespInternal(msg); + return; + } else if (conn->status == ConnRelease) { + // already release by server app, do nothing + } else if (conn->status == ConnNormal) { + // no nothing + // user should not call this rpcRelease handle; + } + free(msg); +} +void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) { + // send msg to client + uvStartSendResp(msg); +} +void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) { + SSrvConn* conn = msg->pConn; + if (conn->status == ConnAcquire) { + if (taosArrayGetSize(conn->srvMsgs) > 0) { + taosArrayPush(conn->srvMsgs, &msg); + return; + } + conn->regArg.notifyCount = 0; + conn->regArg.init = 1; + conn->regArg.msg = msg->msg; + + if (conn->broken) { + STrans* pTransInst = conn->pTransInst; + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + free(msg); + } +} void destroyWorkThrd(SWorkThrdObj* pThrd) { if (pThrd == NULL) { return; @@ -759,10 +846,10 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { free(pThrd); } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { - SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + SSrvMsg* msg = calloc(1, sizeof(SSrvMsg)); + msg->type = Quit; tDebug("server send quit msg to work thread"); - - transSendAsync(pThrd->asyncPool, &srvMsg->q); + transSendAsync(pThrd->asyncPool, &msg->q); } void transCloseServer(void* arg) { @@ -813,8 +900,21 @@ void transUnrefSrvHandle(void* handle) { } void transReleaseSrvHandle(void* handle) { - // do nothing currently - // + if (handle == NULL) { + return; + } + SSrvConn* pConn = handle; + SWorkThrdObj* pThrd = pConn->hostThrd; + + STransMsg tmsg = {.handle = handle, .code = 0}; + + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + srvMsg->msg = tmsg; + srvMsg->type = Release; + srvMsg->pConn = pConn; + + tTrace("server conn %p start to release", pConn); + transSendAsync(pThrd->asyncPool, &srvMsg->q); } void transSendResponse(const STransMsg* pMsg) { if (pMsg->handle == NULL) { @@ -826,6 +926,21 @@ void transSendResponse(const STransMsg* pMsg) { SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); srvMsg->pConn = pConn; srvMsg->msg = *pMsg; + srvMsg->type = Normal; + tTrace("server conn %p start to send resp", pConn); + transSendAsync(pThrd->asyncPool, &srvMsg->q); +} +void transRegisterMsg(const STransMsg* msg) { + if (msg->handle == NULL) { + return; + } + SSrvConn* pConn = msg->handle; + SWorkThrdObj* pThrd = pConn->hostThrd; + + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + srvMsg->pConn = pConn; + srvMsg->msg = *msg; + srvMsg->type = Register; tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); } diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index ec89d695a2bac264b4ee1f844c3baa7752c046f5..deccd633d8a0e929a0a72fbfdaf07170814a6d7e 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -31,11 +31,6 @@ class Server; int port = 7000; // server process -static bool cliPersistHandle(void *parent, tmsg_t msgType) { - // client persist handle - return msgType == 2 || msgType == 4; -} - typedef struct CbArgs { tmsg_t msgType; } CbArgs; @@ -91,21 +86,8 @@ class Client { rpcClose(this->transCli); this->transCli = NULL; } - void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { - rpcClose(this->transCli); - rpcInit_.pfp = pfp; - this->transCli = rpcOpen(&rpcInit_); - } void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); - rpcInit_.mfp = mfp; - this->transCli = rpcOpen(&rpcInit_); - } - void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { - rpcClose(this->transCli); - - rpcInit_.pfp = pfp; - rpcInit_.mfp = mfp; this->transCli = rpcOpen(&rpcInit_); } @@ -149,7 +131,6 @@ class Server { rpcInit_.label = (char *)label; rpcInit_.numOfThreads = 5; rpcInit_.cfp = processReq; - rpcInit_.efp = NULL; rpcInit_.user = (char *)user; rpcInit_.secret = (char *)secret; rpcInit_.ckey = (char *)ckey; @@ -165,11 +146,6 @@ class Server { rpcClose(this->transSrv); this->transSrv = NULL; } - void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { - this->Stop(); - rpcInit_.efp = efp; - this->Start(); - } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { this->Stop(); rpcInit_.cfp = cfp; @@ -262,23 +238,11 @@ class TransObj { // srv->Stop(); } - void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) { - // do nothing - cli->SetPersistFP(pfp); - } void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) { // do nothing cli->SetConstructFP(mfp); } - void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { - // do nothing - cli->SetPAndMFp(pfp, mfp); - } // call when link broken, and notify query or fetch stop - void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { - //////// - srv->SetExceptFp(efp); - } void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { /////// srv->SetSrvContinueSend(cfp); @@ -358,10 +322,10 @@ TEST_F(TransEnv, clientUserDefined) { } TEST_F(TransEnv, cliPersistHandle) { - tr->SetCliPersistFp(cliPersistHandle); + // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle, .noResp = 0}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -378,31 +342,22 @@ TEST_F(TransEnv, cliPersistHandle) { } TEST_F(TransEnv, cliReleaseHandle) { - tr->SetCliPersistFp(cliPersistHandle); - SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; tr->cliSendAndRecvNoHandle(&req, &resp); - // if (i == 5) { - // std::cout << "stop server" << std::endl; - // tr->StopSrv(); - //} - // if (i >= 6) { EXPECT_TRUE(resp.code == 0); //} } ////////////////// } TEST_F(TransEnv, cliReleaseHandleExcept) { - tr->SetCliPersistFp(cliPersistHandle); - SRpcMsg resp = {0}; for (int i = 0; i < 10; i++) { - SRpcMsg req = {.handle = resp.handle}; + SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; @@ -431,7 +386,7 @@ TEST_F(TransEnv, srvContinueSend) { TEST_F(TransEnv, srvPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); - tr->SetCliPersistFp(cliPersistHandle); + // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 5; i++) { SRpcMsg req = {.handle = resp.handle}; @@ -450,7 +405,6 @@ TEST_F(TransEnv, srvPersistHandleExcept) { } TEST_F(TransEnv, cliPersistHandleExcept) { tr->SetSrvContinueSend(processContinueSend); - tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; for (int i = 0; i < 5; i++) { SRpcMsg req = {.handle = resp.handle}; @@ -472,7 +426,7 @@ TEST_F(TransEnv, multiCliPersistHandleExcept) { // conn broken } TEST_F(TransEnv, queryExcept) { - tr->SetSrvExceptFp(handleExcept); + // tr->SetSrvExceptFp(handleExcept); // query and conn is broken } diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 53910aa30c1d7bcea1ce0d6dc46f1bb579c3c3c1..1f8c8e8ff20d3ae57d120e02dfd04c8e5a84850d 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -136,4 +136,98 @@ TEST_F(QueueEnv, testIter) { assert(result.size() == vals.size()); } +class TransCtxEnv : public ::testing::Test { + protected: + virtual void SetUp() { + ctx = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(ctx); + // TODO + } + virtual void TearDown() { + transCtxDestroy(ctx); + // formate + } + STransCtx *ctx; +}; + +TEST_F(TransCtxEnv, mergeTest) { + int key = 1; + { + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + EXPECT_EQ(2, taosHashGetSize(ctx->args)); + { + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = malloc(12); + val1.len = 12; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + std::string val("Hello"); + EXPECT_EQ(4, taosHashGetSize(ctx->args)); + { + key = 1; + STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); + transCtxInit(src); + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = calloc(1, 11); + memcpy(val1.val, val.c_str(), val.size()); + val1.len = 11; + + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + { + STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + val1.val = calloc(1, 11); + memcpy(val1.val, val.c_str(), val.size()); + val1.len = 11; + taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); + key++; + } + transCtxMerge(ctx, src); + free(src); + } + EXPECT_EQ(4, taosHashGetSize(ctx->args)); + + char *skey = (char *)transCtxDumpVal(ctx, 1); + EXPECT_EQ(0, strcmp(skey, val.c_str())); + free(skey); + + skey = (char *)transCtxDumpVal(ctx, 2); + EXPECT_EQ(0, strcmp(skey, val.c_str())); +} #endif