From dc15a3dadf06d55531e037ff20e51d2df9e5a529 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 5 Aug 2022 11:23:36 +0800 Subject: [PATCH] opt rpc send/recv --- source/libs/transport/inc/transComm.h | 16 ++----- source/libs/transport/src/transComm.c | 63 +++++---------------------- 2 files changed, 13 insertions(+), 66 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a81d6db80f..853f24b0ce 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -127,7 +127,7 @@ typedef struct { int8_t retryCnt; int8_t retryLimit; - // bool setMaxRetry; + STransCtx appCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API @@ -194,17 +194,7 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co #define transLabel(trans) ((STrans*)trans)->label -// int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); -// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); -//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); -// -// int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); -// void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); -// bool transCompressMsg(char* msg, int32_t len, int32_t* flen); -// bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); - void transFreeMsg(void* msg); - // typedef struct SConnBuffer { char* buf; @@ -321,8 +311,8 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); // request list typedef struct STransReq { - queue q; - void* data; + queue q; + uv_write_t wreq; } STransReq; void transReqQueueInit(queue* q); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 8cf525a506..ca405fa536 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -23,33 +23,6 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; static int32_t instMgt; -int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { - T_MD5_CTX context; - int ret = -1; - - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t*)pMsg, msgLen); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; - - return ret; -} - -void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { - T_MD5_CTX context; - - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t*)pMsg, msgLen); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - memcpy(pAuth, context.digest, sizeof(context.digest)); -} - bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { return false; // SRpcHead* pHead = rpcHeadFromCont(pCont); @@ -176,7 +149,6 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { * info--->| */ SConnBuffer* p = connBuf; - uvBuf->base = p->buf + p->len; if (p->left == -1) { uvBuf->len = p->cap - p->len; @@ -267,14 +239,9 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { uv_async_t* async = &(pool->asyncs[idx]); SAsyncItem* item = async->data; - int64_t st = taosGetTimestampUs(); taosThreadMutexLock(&item->mtx); QUEUE_PUSH(&item->qmsg, q); taosThreadMutexUnlock(&item->mtx); - int64_t el = taosGetTimestampUs() - st; - if (el > 50) { - // tInfo("lock and unlock cost:%d", (int)el); - } return uv_async_send(async); } @@ -350,30 +317,21 @@ void transReqQueueInit(queue* q) { QUEUE_INIT(q); } void* transReqQueuePush(queue* q) { - uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); - STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq)); - wreq->data = req; - req->data = wreq; - QUEUE_PUSH(q, &wreq->q); - return req; + STransReq* req = taosMemoryCalloc(1, sizeof(STransReq)); + req->wreq.data = req; + QUEUE_PUSH(q, &req->q); + return &req->wreq; } void* transReqQueueRemove(void* arg) { void* ret = NULL; - uv_write_t* req = arg; - STransReq* wreq = req && req->data ? req->data : NULL; - - assert(wreq->data == req); - if (wreq == NULL || wreq->data == NULL) { - taosMemoryFree(wreq->data); - taosMemoryFree(wreq); - return req; - } + uv_write_t* wreq = arg; - QUEUE_REMOVE(&wreq->q); + STransReq* req = wreq ? wreq->data : NULL; + if (req == NULL) return NULL; + QUEUE_REMOVE(&req->q); - ret = req && req->handle ? req->handle->data : NULL; - taosMemoryFree(wreq->data); - taosMemoryFree(wreq); + ret = wreq && wreq->handle ? wreq->handle->data : NULL; + taosMemoryFree(req); return ret; } @@ -382,7 +340,6 @@ void transReqQueueClear(queue* q) { queue* h = QUEUE_HEAD(q); QUEUE_REMOVE(h); STransReq* req = QUEUE_DATA(h, STransReq, q); - taosMemoryFree(req->data); taosMemoryFree(req); } } -- GitLab