diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 5afafa08a38e8ce1c6acf69b527184582a0c764d..f913ba06d090f5e651364952153283e2bdff0f74 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -29,6 +29,8 @@ extern "C" { extern int tsRpcHeadSize; +typedef struct SRpcPush SRpcPush; + typedef struct SRpcConnInfo { uint32_t clientIp; uint16_t clientPort; @@ -43,8 +45,17 @@ typedef struct SRpcMsg { int32_t code; void * handle; // rpc handle returned to app void * ahandle; // app handle set by client + int persist; // keep handle or not, default 0 + + SRpcPush *push; + } SRpcMsg; +typedef struct SRpcPush { + void *arg; + int (*callback)(void *arg, SRpcMsg *rpcMsg); +} SRpcPush; + typedef struct SRpcInit { uint16_t localPort; // local port char * label; // for debug purpose @@ -83,6 +94,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index decd0af484737f943ae087a7658071ed76a7ab3c..f957f3b695764f238992e9d6f9ab1c2de7289f2a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,12 +27,16 @@ typedef struct SCliConn { SConnBuffer readBuf; void* data; queue conn; - char spi; - char secured; uint64_t expireTime; int8_t notifyCount; // timers already notify to client - int32_t ref; + SRpcPush* push; + int persist; // + // spi configure + char spi; + char secured; + int32_t ref; + // debug and log info struct sockaddr_in addr; } SCliConn; @@ -128,13 +132,18 @@ static void clientHandleResp(SCliConn* conn) { tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port)); - if (pCtx->pSem == NULL) { - tTrace("client conn(sync) %p handle resp", conn); - (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); + + if (conn->push != NULL) { + (*conn->push->callback)(conn->push->arg, &rpcMsg); } else { - tTrace("client conn(sync) %p handle resp", conn); - memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); - tsem_post(pCtx->pSem); + if (pCtx->pSem == NULL) { + tTrace("client conn(sync) %p handle resp", conn); + (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); + } else { + tTrace("client conn(sync) %p handle resp", conn); + memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); + tsem_post(pCtx->pSem); + } } conn->notifyCount += 1; @@ -144,7 +153,10 @@ static void clientHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); SCliThrdObj* pThrd = conn->hostThrd; - addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + // user owns conn->persist = 1 + if (conn->push != NULL) { + addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + } destroyCmsg(pMsg); conn->data = NULL; @@ -154,7 +166,7 @@ static void clientHandleResp(SCliConn* conn) { } } static void clientHandleExcept(SCliConn* pConn) { - if (pConn->data == NULL) { + if (pConn->data == NULL && pConn->push == NULL) { // handle conn except in conn pool clientConnDestroy(pConn, true); return; @@ -169,13 +181,17 @@ static void clientHandleExcept(SCliConn* pConn) { SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - if (pCtx->pSem == NULL) { - // SRpcInfo* pRpc = pMsg->ctx->pRpc; - (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); + + if (pConn->push != NULL) { + (*pConn->push->callback)(pConn->push->arg, &rpcMsg); } else { - memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); - // SRpcMsg rpcMsg - tsem_post(pCtx->pSem); + if (pCtx->pSem == NULL) { + (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); + } else { + memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); + // SRpcMsg rpcMsg + tsem_post(pCtx->pSem); + } } destroyCmsg(pMsg); @@ -411,6 +427,10 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("client msg tran time cost: %" PRIu64 "", el); et = taosGetTimestampUs(); + // if (pMsg->msg.handle != NULL) { + // // handle + //} + STransConnCtx* pCtx = pMsg->ctx; SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { @@ -426,6 +446,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { } clientWrite(conn); + conn->push = pMsg->msg.push; + } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); conn->ref++; @@ -444,6 +466,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->data = pMsg; conn->hostThrd = pThrd; + conn->push = pMsg->msg.push; + struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect