提交 dc15a3da 编写于 作者: dengyihao's avatar dengyihao

opt rpc send/recv

上级 876443bb
...@@ -127,7 +127,7 @@ typedef struct { ...@@ -127,7 +127,7 @@ typedef struct {
int8_t retryCnt; int8_t retryCnt;
int8_t retryLimit; int8_t retryLimit;
// bool setMaxRetry;
STransCtx appCtx; // STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API tsem_t* pSem; // for synchronous API
...@@ -194,17 +194,7 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co ...@@ -194,17 +194,7 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define transLabel(trans) ((STrans*)trans)->label #define transLabel(trans) ((STrans*)trans)->label
// int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
//
// int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
// bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
// bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transFreeMsg(void* msg); void transFreeMsg(void* msg);
// //
typedef struct SConnBuffer { typedef struct SConnBuffer {
char* buf; char* buf;
...@@ -321,8 +311,8 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); ...@@ -321,8 +311,8 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// request list // request list
typedef struct STransReq { typedef struct STransReq {
queue q; queue q;
void* data; uv_write_t wreq;
} STransReq; } STransReq;
void transReqQueueInit(queue* q); void transReqQueueInit(queue* q);
......
...@@ -23,33 +23,6 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; ...@@ -23,33 +23,6 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt; static int32_t refMgt;
static int32_t instMgt; static int32_t instMgt;
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
int ret = -1;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
return ret;
}
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
memcpy(pAuth, context.digest, sizeof(context.digest));
}
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
return false; return false;
// SRpcHead* pHead = rpcHeadFromCont(pCont); // SRpcHead* pHead = rpcHeadFromCont(pCont);
...@@ -176,7 +149,6 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { ...@@ -176,7 +149,6 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
* info--->| * info--->|
*/ */
SConnBuffer* p = connBuf; SConnBuffer* p = connBuf;
uvBuf->base = p->buf + p->len; uvBuf->base = p->buf + p->len;
if (p->left == -1) { if (p->left == -1) {
uvBuf->len = p->cap - p->len; uvBuf->len = p->cap - p->len;
...@@ -267,14 +239,9 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { ...@@ -267,14 +239,9 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
uv_async_t* async = &(pool->asyncs[idx]); uv_async_t* async = &(pool->asyncs[idx]);
SAsyncItem* item = async->data; SAsyncItem* item = async->data;
int64_t st = taosGetTimestampUs();
taosThreadMutexLock(&item->mtx); taosThreadMutexLock(&item->mtx);
QUEUE_PUSH(&item->qmsg, q); QUEUE_PUSH(&item->qmsg, q);
taosThreadMutexUnlock(&item->mtx); taosThreadMutexUnlock(&item->mtx);
int64_t el = taosGetTimestampUs() - st;
if (el > 50) {
// tInfo("lock and unlock cost:%d", (int)el);
}
return uv_async_send(async); return uv_async_send(async);
} }
...@@ -350,30 +317,21 @@ void transReqQueueInit(queue* q) { ...@@ -350,30 +317,21 @@ void transReqQueueInit(queue* q) {
QUEUE_INIT(q); QUEUE_INIT(q);
} }
void* transReqQueuePush(queue* q) { void* transReqQueuePush(queue* q) {
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq)); req->wreq.data = req;
wreq->data = req; QUEUE_PUSH(q, &req->q);
req->data = wreq; return &req->wreq;
QUEUE_PUSH(q, &wreq->q);
return req;
} }
void* transReqQueueRemove(void* arg) { void* transReqQueueRemove(void* arg) {
void* ret = NULL; void* ret = NULL;
uv_write_t* req = arg; uv_write_t* wreq = arg;
STransReq* wreq = req && req->data ? req->data : NULL;
assert(wreq->data == req);
if (wreq == NULL || wreq->data == NULL) {
taosMemoryFree(wreq->data);
taosMemoryFree(wreq);
return req;
}
QUEUE_REMOVE(&wreq->q); STransReq* req = wreq ? wreq->data : NULL;
if (req == NULL) return NULL;
QUEUE_REMOVE(&req->q);
ret = req && req->handle ? req->handle->data : NULL; ret = wreq && wreq->handle ? wreq->handle->data : NULL;
taosMemoryFree(wreq->data); taosMemoryFree(req);
taosMemoryFree(wreq);
return ret; return ret;
} }
...@@ -382,7 +340,6 @@ void transReqQueueClear(queue* q) { ...@@ -382,7 +340,6 @@ void transReqQueueClear(queue* q) {
queue* h = QUEUE_HEAD(q); queue* h = QUEUE_HEAD(q);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
STransReq* req = QUEUE_DATA(h, STransReq, q); STransReq* req = QUEUE_DATA(h, STransReq, q);
taosMemoryFree(req->data);
taosMemoryFree(req); taosMemoryFree(req);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册