diff --git a/source/libs/transport/src/transport.c b/source/libs/transport/src/transport.c index a6c9cee0b7cd11fea255dc579024dfc1be560a6e..f33d54c4f3e4f2c08bdd2ef43bf5ddfb8f4a7638 100644 --- a/source/libs/transport/src/transport.c +++ b/source/libs/transport/src/transport.c @@ -151,6 +151,7 @@ typedef struct SRpcConn { } SRpcConn; // auth function +static int uvAuthMsg(SRpcConn* pConn, char* msg, int msgLen); static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen); @@ -259,7 +260,7 @@ static bool isReadAll(SConnBuffer* data) { SRpcHead rpcHead; int32_t headLen = sizeof(rpcHead); if (data->len >= headLen) { - memcpy((char*)&rpcHead, data->buf, headLen); + memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen); int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); if (msgLen > data->len) { data->left = msgLen - data->len; @@ -283,7 +284,7 @@ static void uvDoProcess(SRecvInfo* pRecv) { // do auth and check } -static int uvAuthData(SRpcConn* pConn, char* msg, int len) { +static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) { SRpcHead* pHead = (SRpcHead*)msg; int code = 0; @@ -334,16 +335,16 @@ static int uvAuthData(SRpcConn* pConn, char* msg, int len) { return code; } -static void uvProcessData(SRpcConn* ctx) { +static void uvProcessData(SRpcConn* pConn) { SRecvInfo info; SRecvInfo* p = &info; - SConnBuffer* pBuf = &ctx->connBuf; + SConnBuffer* pBuf = &pConn->connBuf; p->msg = pBuf->buf + RPC_RESERVE_SIZE; p->msgLen = pBuf->len; p->ip = 0; p->port = 0; - p->shandle = ctx->shandle; // - p->thandle = ctx; + p->shandle = pConn->shandle; // + p->thandle = pConn; p->chandle = NULL; // @@ -351,9 +352,14 @@ static void uvProcessData(SRpcConn* ctx) { assert(rpcIsReq(pHead->msgType)); SRpcInfo* pRpc = (SRpcInfo*)p->shandle; - SRpcConn* pConn = (SRpcConn*)p->thandle; - pConn->ahandle = (void*)pHead->ahandle; + // auth here + + int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); + if (code != 0) { + terrno = code; + } + // rpcCheckAuthentication(pConn, (char*)pHead, pBuf->len); pHead->code = htonl(pHead->code); SRpcMsg rpcMsg; @@ -383,6 +389,9 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } + if (terrno != 0) { + // handle err code + } if (nread != UV_EOF) { tDebug("Read error %s\n", uv_err_name(nread)); @@ -547,6 +556,7 @@ static void connDestroy(SRpcConn* conn) { uv_timer_stop(conn->pTimer); free(conn->pTimer); uv_close((uv_handle_t*)conn->pTcp, NULL); + free(conn->connBuf.buf); free(conn->pTcp); free(conn->pWriter); free(conn);