diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f2ac77fe61e41cbc8d041cfb82d80d3a39f0e505..06d5a7da915d53969a7008ee2cb8755d84a55642 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -120,6 +120,10 @@ typedef struct { // SEpSet* pSet; // for synchronous API } SRpcReqContext; +typedef SRpcMsg STransMsg; +typedef SRpcInfo STrans; +typedef SRpcConnInfo STransHandleInfo; + typedef struct { SEpSet epSet; // ip list provided by app void* ahandle; // handle provided by app @@ -134,7 +138,7 @@ typedef struct { int8_t connType; // connection type int64_t rid; // refId returned by taosAddRef - SRpcMsg* pRsp; // for synchronous API + STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API int hThrdIdx; @@ -143,6 +147,7 @@ typedef struct { // SEpSet* pSet; // for synchronous API } STransConnCtx; + #pragma pack(push, 1) typedef struct { @@ -243,10 +248,24 @@ bool transReadComplete(SConnBuffer* connBuf); int transSetConnOption(uv_tcp_t* stream); + void transRefSrvHandle(void* handle); void transUnrefSrvHandle(void* handle); void transRefCliHandle(void* handle); void transUnrefCliHandle(void* handle); + +void transSendRequest(void *shandle, const char *ip, uint32_t port, STransMsg *pMsg); +void transSendRecv(void* shandle, const char *ip, uint32_t port, STransMsg *pMsg, STransMsg *pRsp); +void transSendResponse(const STransMsg* pMsg); +int transGetConnInfo(void *thandle, STransHandleInfo *pInfo); + + +void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); + +void transCloseClient(void *arg); +void transCloseServer(void *arg); + #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 4d244665c7d84dbfedded1e9339097d4a1bdfb3c..015018f73f32d113706f0b27f4b75a0e1713caed 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -18,8 +18,9 @@ #include "transComm.h" void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { - taosInitServer, taosInitClient}; -void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient}; + transInitServer, transInitClient}; + +void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; void* rpcOpen(const SRpcInit* pInit) { SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); @@ -34,11 +35,12 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; pRpc->pfp = pInit->pfp; + pRpc->mfp = pInit->mfp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } else { - pRpc->numOfThreads = pInit->numOfThreads; + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } pRpc->connType = pInit->connType; @@ -116,6 +118,24 @@ int32_t rpcInit() { return 0; } +void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg* pMsg, int64_t *pRid) { + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; + transSendRequest(shandle, ip, port, pMsg); +} +void rpcSendRecv(void* shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; + transSendRecv(shandle, ip, port, pMsg, pRsp); +} + +void rpcSendResponse(const SRpcMsg *pMsg) { + transSendResponse(pMsg); +} +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { + return transGetConnInfo((void *)thandle, pInfo); +} + void rpcCleanup(void) { // impl later // @@ -129,6 +149,7 @@ void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*taosRefHandle[type])(handle); } + void rpcUnrefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*taosUnRefHandle[type])(handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index ce3c1c2dc89b8540c7d78d9c23d0f877d018a603..c55d699e4f83c7eee2c7470a47b2fc74b365c44a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -42,7 +42,7 @@ typedef struct SCliConn { typedef struct SCliMsg { STransConnCtx* ctx; - SRpcMsg msg; + STransMsg msg; queue q; uint64_t st; } SCliMsg; @@ -105,9 +105,9 @@ static void cliHandleExcept(SCliConn* conn); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliSendQuit(SCliThrdObj* thrd); -static void destroyUserdata(SRpcMsg* userdata); +static void destroyUserdata(STransMsg* userdata); -static int cliRBChoseIdx(SRpcInfo* pTransInst); +static int cliRBChoseIdx(STrans* pTransInst); static void destroyCmsg(SCliMsg* cmsg); static void transDestroyConnCtx(STransConnCtx* ctx); @@ -118,7 +118,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) -#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ @@ -135,14 +135,14 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0); -#define CONN_PERSIST_BY_APP(conn) do { if (conn->persist == false) { conn->persist = true; transRefCliHandle(conn);}} while(0) +#define CONN_SET_PERSIST_BY_APP(conn) do { if (conn->persist == false) { conn->persist = true; transRefCliHandle(conn);}} while(0) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) static void* cliWorkThread(void* arg); -static void cliHandleResp(SCliConn* conn) { +void cliHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; - SRpcInfo* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); @@ -151,7 +151,7 @@ static void cliHandleResp(SCliConn* conn) { // buf's mem alread translated to rpcMsg.pCont transClearBuffer(&conn->readBuf); - SRpcMsg rpcMsg = {0}; + STransMsg rpcMsg = {0}; rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = transContFromHead((char*)pHead); rpcMsg.code = pHead->code; @@ -172,7 +172,7 @@ static void cliHandleResp(SCliConn* conn) { if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { rpcMsg.handle = conn; - CONN_PERSIST_BY_APP(conn); + CONN_SET_PERSIST_BY_APP(conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } @@ -204,7 +204,8 @@ static void cliHandleResp(SCliConn* conn) { // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } -static void cliHandleExcept(SCliConn* pConn) { + +void cliHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { transUnrefCliHandle(pConn); @@ -212,12 +213,12 @@ static void cliHandleExcept(SCliConn* pConn) { } } SCliThrdObj* pThrd = pConn->hostThrd; - SRpcInfo* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; SCliMsg* pMsg = pConn->data; STransConnCtx *pCtx = pMsg ? pMsg->ctx : NULL; - SRpcMsg rpcMsg = {0}; + STransMsg rpcMsg = {0}; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; rpcMsg.ahandle = NULL; @@ -243,9 +244,9 @@ static void cliHandleExcept(SCliConn* pConn) { transUnrefCliHandle(pConn); } -static void cliTimeoutCb(uv_timer_t* handle) { +void cliTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; - SRpcInfo* pRpc = pThrd->pTransInst; + STrans* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label); @@ -267,11 +268,12 @@ static void cliTimeoutCb(uv_timer_t* handle) { pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } -static void* createConnPool(int size) { + +void* createConnPool(int size) { // thread local, no lock return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); } -static void* destroyConnPool(void* pool) { +void* destroyConnPool(void* pool) { SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { @@ -316,7 +318,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); - SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; + STrans* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); @@ -411,16 +413,16 @@ static void cliSendCb(uv_write_t* req, int status) { uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb); } -static void cliSend(SCliConn* pConn) { +void cliSend(SCliConn* pConn) { CONN_HANDLE_BROKEN(pConn); SCliMsg* pCliMsg = pConn->data; STransConnCtx* pCtx = pCliMsg->ctx; SCliThrdObj* pThrd = pConn->hostThrd; - SRpcInfo* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; - SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); + STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); int msgLen = transMsgLenFromCont(pMsg->contLen); @@ -458,7 +460,8 @@ static void cliSend(SCliConn* pConn) { _RETURE: return; } -static void cliConnCb(uv_connect_t* req, int status) { + +void cliConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; if (status != 0) { @@ -488,7 +491,8 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { pThrd->quit = true; uv_stop(pThrd->loop); } -static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { + +SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; if (pMsg->msg.handle != NULL) { conn = (SCliConn*)(pMsg->msg.handle); @@ -502,13 +506,14 @@ static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { } return conn; } -static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { + +void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); + tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el); STransConnCtx* pCtx = pMsg->ctx; - SRpcInfo* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { @@ -567,10 +572,10 @@ static void* cliWorkThread(void* arg) { uv_run(pThrd->loop, UV_RUN_DEFAULT); } -void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { +void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SCliObj* cli = calloc(1, sizeof(SCliObj)); - SRpcInfo* pRpc = shandle; + STrans* pRpc = shandle; memcpy(cli->label, label, strlen(label)); cli->numOfThreads = numOfThreads; cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); @@ -589,7 +594,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, return cli; } -static void destroyUserdata(SRpcMsg* userdata) { +static void destroyUserdata(STransMsg* userdata) { if (userdata->pCont == NULL) { return; } @@ -645,12 +650,20 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { free(ctx); } // -static void cliSendQuit(SCliThrdObj* thrd) { +void cliSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = calloc(1, sizeof(SCliMsg)); transSendAsync(thrd->asyncPool, &msg->q); } -void taosCloseClient(void* arg) { +int cliRBChoseIdx(STrans* pTransInst) { + int64_t index = pTransInst->index; + if (pTransInst->index++ >= pTransInst->numOfThreads) { + pTransInst->index = 0; + } + return index % pTransInst->numOfThreads; +} + +void transCloseClient(void* arg) { SCliObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { cliSendQuit(cli->pThreadObj[i]); @@ -659,13 +672,6 @@ void taosCloseClient(void* arg) { free(cli->pThreadObj); free(cli); } -static int cliRBChoseIdx(SRpcInfo* pTransInst) { - int64_t index = pTransInst->index; - if (pTransInst->index++ >= pTransInst->numOfThreads) { - pTransInst->index = 0; - } - return index % pTransInst->numOfThreads; -} void transRefCliHandle(void* handle) { if (handle == NULL) { return; @@ -681,17 +687,11 @@ void transUnrefCliHandle(void* handle) { if (ref == 0) { cliDestroyConn((SCliConn*)handle, true); } - - // unref cli handle } -void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { - // impl later - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - - SRpcInfo* pTransInst = (SRpcInfo*)shandle; - int index = CONN_HOST_THREAD_INDEX(pMsg->handle); +void transSendRequest(void *shandle, const char *ip, uint32_t port, STransMsg *pMsg) { + STrans* pTransInst = (STrans*)shandle; + int index = CONN_HOST_THREAD_INDEX((SCliConn *)pMsg->handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); } @@ -718,13 +718,8 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } - -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - - SRpcInfo* pTransInst = (SRpcInfo*)shandle; - +void transSendRecv(void* shandle, const char *ip, uint32_t port, STransMsg *pReq, STransMsg *pRsp) { + STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); @@ -751,7 +746,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { tsem_wait(pSem); tsem_destroy(pSem); free(pSem); - - return; } + #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c4c3d9ed0a62445f9fb95b6c9d44bca45cb69581..f9c014902c406a394f7af96239cc83bbf099907d 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -37,8 +37,7 @@ typedef struct SSrvConn { struct sockaddr_in addr; struct sockaddr_in locaddr; - // SRpcMsg sendMsg; - // del later + char secured; int spi; char info[64]; @@ -49,7 +48,7 @@ typedef struct SSrvConn { typedef struct SSrvMsg { SSrvConn* pConn; - SRpcMsg msg; + STransMsg msg; queue q; } SSrvMsg; @@ -207,20 +206,20 @@ static void uvHandleReq(SSrvConn* pConn) { pConn->inType = pHead->msgType; - SRpcInfo* pRpc = (SRpcInfo*)p->shandle; + STrans* pRpc = (STrans*)p->shandle; pHead->code = htonl(pHead->code); int32_t dlen = 0; if (transDecompressMsg(NULL, 0, NULL)) { // add compress later - // pHead = rpcDecompressRpcMsg(pHead); + // pHead = rpcDecompresSTransMsg(pHead); } else { pHead->msgLen = htonl(pHead->msgLen); // impl later // } - SRpcMsg rpcMsg; + STransMsg rpcMsg; rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; @@ -319,7 +318,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { tTrace("server conn %p prepare to send resp", smsg->pConn); SSrvConn* pConn = smsg->pConn; - SRpcMsg* pMsg = &smsg->msg; + STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); pMsg->contLen = 0; @@ -547,7 +546,7 @@ static bool addHandleToWorkloop(void* arg) { return false; } - // SRpcInfo* pRpc = pThrd->shandle; + // STrans* pRpc = pThrd->shandle; uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_open(pThrd->pipe, pThrd->fd); @@ -668,7 +667,7 @@ static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) { return msgLen; } -void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { +void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = calloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); srv->numOfThreads = numOfThreads; @@ -720,7 +719,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, return srv; End: - taosCloseServer(srv); + transCloseServer(srv); return NULL; } @@ -740,7 +739,7 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { transSendAsync(pThrd->asyncPool, &srvMsg->q); } -void taosCloseServer(void* arg) { +void transCloseServer(void* arg) { // impl later SServerObj* srv = arg; for (int i = 0; i < srv->numOfThreads; i++) { @@ -786,7 +785,7 @@ void transUnrefSrvHandle(void* handle) { } // unref srv handle } -void rpcSendResponse(const SRpcMsg* pMsg) { +void transSendResponse(const STransMsg* pMsg) { if (pMsg->handle == NULL) { return; } @@ -799,14 +798,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) { tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); } - -int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { +int transGetConnInfo(void *thandle, STransHandleInfo *pInfo) { SSrvConn* pConn = thandle; - struct sockaddr_in addr = pConn->addr; + pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr); pInfo->clientPort = ntohs(addr.sin_port); - tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); return 0; }