diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 792200639a3d0cb05b3b12612a2dd8d90f8faace..4b14f9f2c792e7727a50679609d5d9832a022c5e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -68,6 +68,25 @@ typedef void* queue[2]; QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ } +#define QUEUE_SPLIT(h, q, n) \ + do { \ + QUEUE_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(n) = (n); \ + QUEUE_NEXT(n) = (q); \ + QUEUE_PREV(h) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(h) = (h); \ + QUEUE_PREV(q) = (n); \ + } while (0) + +#define QUEUE_MOVE(h, n) \ + do { \ + if (QUEUE_IS_EMPTY(h)) { \ + QUEUE_INIT(n); \ + } else { \ + queue* q = QUEUE_HEAD(h); \ + QUEUE_SPLIT(h, q, n); \ + } \ + } while (0) /* Return the element at the front of the queue. */ #define QUEUE_HEAD(q) (QUEUE_NEXT(q)) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 89361b13ad59c65ab08c0cd8a801d957f6e5ec5b..cb8ef87b48acd9bae0ae8acf7cfab43ccdd61942 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->label) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } + pRpc->cfp = pInit->cfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->connType = pInit->connType; pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 29f3361b10b1778094f476802041eb093152935b..7e5874d8cd563685bb6b4ad702c73b5eeab4f050 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -20,12 +20,14 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; + uv_write_t* writeReq; void* data; queue conn; } SCliConn; typedef struct SCliMsg { SRpcReqContext* context; queue q; + uint64_t st; } SCliMsg; typedef struct SCliThrdObj { @@ -45,86 +47,153 @@ typedef struct SClientObj { SCliThrdObj** pThreadObj; } SClientObj; -static void clientWriteCb(uv_write_t* req, int status); +static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -static void clientConnCb(struct uv_connect_s* req, int status); +static void clientWriteCb(uv_write_t* req, int status); +static void clientConnCb(uv_connect_t* req, int status); static void clientAsyncCb(uv_async_t* handle); +static void clientDestroy(uv_handle_t* handle); +static void clientConnDestroy(SCliConn* pConn); static void* clientThread(void* arg); -static void clientWriteCb(uv_write_t* req, int status) { +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); + +static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + // impl later +} +static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later + SCliConn* conn = handle->data; + if (nread > 0) { + return; + } + // + uv_close((uv_handle_t*)handle, clientDestroy); } -static void clientFailedCb(uv_handle_t* handle) { + +static void clientConnDestroy(SCliConn* conn) { // impl later - tDebug("close handle"); + // } -static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { +static void clientDestroy(uv_handle_t* handle) { + SCliConn* conn = handle->data; + clientConnDestroy(conn); +} + +static void clientWriteCb(uv_write_t* req, int status) { + SCliConn* pConn = req->data; + if (status == 0) { + tDebug("data already was written on stream"); + } else { + uv_close((uv_handle_t*)pConn->stream, clientDestroy); + return; + } + + uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb); // impl later } -static void clientConnCb(struct uv_connect_s* req, int status) { +static void clientConnCb(uv_connect_t* req, int status) { + // impl later SCliConn* pConn = req->data; - SCliMsg* pMsg = pConn->data; - SEpSet* pEpSet = &pMsg->context->epSet; + if (status != 0) { + tError("failed to connect %s", uv_err_name(status)); + clientConnDestroy(pConn); + return; + } + + SCliMsg* pMsg = pConn->data; + SEpSet* pEpSet = &pMsg->context->epSet; + SRpcMsg rpcMsg; + rpcMsg.ahandle = pMsg->context->ahandle; + rpcMsg.pCont = NULL; char* fqdn = pEpSet->fqdn[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse]; if (status != 0) { // call user fp later tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status)); - uv_close((uv_handle_t*)req->handle, clientFailedCb); + SRpcInfo* pRpc = pMsg->context->pRpc; + (pRpc->cfp)(NULL, &rpcMsg, pEpSet); + uv_close((uv_handle_t*)req->handle, clientDestroy); return; } assert(pConn->stream == req->handle); - // impl later + uv_buf_t wb; + uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { // impl later return NULL; } -static void clientAsyncCb(uv_async_t* handle) { - SCliThrdObj* pThrd = handle->data; - SCliMsg* pMsg = NULL; - pthread_mutex_lock(&pThrd->msgMtx); - if (!QUEUE_IS_EMPTY(&pThrd->msg)) { - queue* head = QUEUE_HEAD(&pThrd->msg); - pMsg = QUEUE_DATA(head, SCliMsg, q); - QUEUE_REMOVE(head); - } - pthread_mutex_unlock(&pThrd->msgMtx); - SEpSet* pEpSet = &pMsg->context->epSet; +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { + SEpSet* pEpSet = &pMsg->context->epSet; + char* fqdn = pEpSet->fqdn[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse]; + uint64_t el = taosGetTimestampUs() - pMsg->st; + tDebug("msg tran time cost: %" PRIu64 "", el); + SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); if (conn != NULL) { + conn->data = pMsg; + conn->writeReq->data = conn; + uv_buf_t wb; + uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb); // impl later } else { SCliConn* conn = malloc(sizeof(SCliConn)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + conn->writeReq = malloc(sizeof(uv_write_t)); conn->connReq.data = conn; conn->data = pMsg; - struct sockaddr_in addr; uv_ip4_addr(fqdn, port, &addr); - // handle error in callback if connect error + // handle error in callback if fail to connect uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); - } - // SRpcReqContext* pCxt = pMsg->context; + // SRpcMsg rpcMsg; + // SEpSet* pEpSet = &pMsg->context->epSet; + // SRpcInfo* pRpc = pMsg->context->pRpc; + //// rpcMsg.ahandle = pMsg->context->ahandle; + // rpcMsg.pCont = NULL; + // rpcMsg.ahandle = pMsg->context->ahandle; + // uint64_t el1 = taosGetTimestampUs() - et; + // tError("msg tran back first: time cost: %" PRIu64 "", el1); + // et = taosGetTimestampUs(); + //(pRpc->cfp)(NULL, &rpcMsg, pEpSet); + // uint64_t el2 = taosGetTimestampUs() - et; + // tError("msg tran back second: time cost: %" PRIu64 "", el2); + } +} +static void clientAsyncCb(uv_async_t* handle) { + SCliThrdObj* pThrd = handle->data; + SCliMsg* pMsg = NULL; + queue wq; - // SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont); - // char* msg = (char*)pHead; - // int len = rpcMsgLenFromCont(pCtx->contLen); - // tmsg_t msgType = pCtx->msgType; + // batch process to avoid to lock/unlock frequently + pthread_mutex_lock(&pThrd->msgMtx); + QUEUE_MOVE(&pThrd->msg, &wq); + pthread_mutex_unlock(&pThrd->msgMtx); - // impl later + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + pMsg = QUEUE_DATA(h, SCliMsg, q); + clientHandleReq(pMsg, pThrd); + count++; + if (count >= 2) { + tError("send batch size: %d", count); + } + } } static void* clientThread(void* arg) { @@ -142,9 +211,6 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); - - // QUEUE_INIT(&pThrd->clientCache); - pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); @@ -186,6 +252,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* } SCliMsg* msg = malloc(sizeof(SCliMsg)); msg->context = pContext; + msg->st = taosGetTimestampUs(); SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 0bf39b9985f6c2cb4f660bb36d22eb80200b0ab5..bc4cc695b008328614eab4385482d89094d3b901 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -277,10 +277,6 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } - if (terrno != 0) { - // handle err code - } - if (nread != UV_EOF) { tDebug("Read error %s\n", uv_err_name(nread)); } @@ -309,21 +305,23 @@ void uvOnWriteCb(uv_write_t* req, int status) { void uvWorkerAsyncCb(uv_async_t* handle) { SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync); SConn* conn = NULL; - - // opt later + queue wq; + // batch process to avoid to lock/unlock frequently pthread_mutex_lock(&pThrd->connMtx); - if (!QUEUE_IS_EMPTY(&pThrd->conn)) { - queue* head = QUEUE_HEAD(&pThrd->conn); - conn = QUEUE_DATA(head, SConn, queue); - QUEUE_REMOVE(head); - } + QUEUE_MOVE(&pThrd->conn, &wq); pthread_mutex_unlock(&pThrd->connMtx); - if (conn == NULL) { - tError("except occurred, do nothing"); - return; + + while (!QUEUE_IS_EMPTY(&wq)) { + queue* head = QUEUE_HEAD(&wq); + QUEUE_REMOVE(head); + SConn* conn = QUEUE_DATA(head, SConn, queue); + if (conn == NULL) { + tError("except occurred, do nothing"); + return; + } + uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); + uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } - uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); - uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } void uvOnAcceptCb(uv_stream_t* stream, int status) { diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 58fbf6ae8559aaebbdbd83eb589eafebc408a436..6339e58560515b7884bfd720b727731a12ff7ec8 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -34,8 +34,8 @@ typedef struct { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; - tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, - pMsg->code); + // tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + // pMsg->code); if (pEpSet) pInfo->epSet = *pEpSet; @@ -57,7 +57,7 @@ static void *sendRequest(void *param) { rpcMsg.contLen = pInfo->msgSize; rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; - tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); tsem_wait(&pInfo->rspSem);