From 457c544dc77967a201b35e63f16032aa5c012b3d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 11 Feb 2022 21:34:31 +0800 Subject: [PATCH] fix push crashed --- source/libs/transport/inc/transComm.h | 11 ++++++++- source/libs/transport/src/trans.c | 3 +++ source/libs/transport/src/transCli.c | 28 ++++++++++++++++++++--- source/libs/transport/src/transComm.c | 6 ++++- source/libs/transport/src/transSrv.c | 5 ++++ source/libs/transport/test/CMakeLists.txt | 19 ++++++++++++++- source/libs/transport/test/pushClient.c | 15 +++++++----- 7 files changed, 75 insertions(+), 12 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 846f2d5099..cc65f04a39 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -151,7 +151,8 @@ typedef struct { char version : 4; // RPC version char comp : 4; // compression algorithm, 0:no compression 1:lz4 char resflag : 2; // reserved bits - char spi : 3; // security parameter index + char spi : 1; // security parameter index + char secured : 2; char encrypt : 3; // encrypt algorithm, 0: no encryption uint32_t code; // del later @@ -170,6 +171,10 @@ typedef struct { uint8_t auth[TSDB_AUTH_LEN]; } STransDigestMsg; +typedef struct { + uint8_t user[TSDB_UNI_LEN]; +} STransUserMsg; + #pragma pack(pop) #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) @@ -236,4 +241,8 @@ int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); +// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen); + +// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool ); + #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 5fc937bccd..b03adafaff 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -36,6 +36,9 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->parent = pInit->parent; + if (pInit->user) { + memcpy(pRpc->user, pInit->user, strlen(pInit->user)); + } return pRpc; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 29770831fa..6630cd7d31 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -154,7 +154,7 @@ static void clientHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; // user owns conn->persist = 1 - if (conn->push != NULL) { + if (conn->push == NULL) { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); } @@ -382,10 +382,32 @@ static void clientWriteCb(uv_write_t* req, int status) { static void clientWrite(SCliConn* pConn) { SCliMsg* pCliMsg = pConn->data; - SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); + STransConnCtx* pCtx = pCliMsg->ctx; + + SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + int msgLen = transMsgLenFromCont(pMsg->contLen); + + if (!pConn->secured) { + char* buf = calloc(1, msgLen + sizeof(STransUserMsg)); + memcpy(buf, (char*)pHead, msgLen); + + STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen); + memcpy(uMsg->user, pCtx->pTransInst->user, tListLen(uMsg->user)); - int msgLen = transMsgLenFromCont(pMsg->contLen); + // to avoid mem leak + destroyUserdata(pMsg); + + pMsg->pCont = (char*)buf + sizeof(STransMsgHead); + pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead); + + pConn->secured = 1; // del later + + pHead = (STransMsgHead*)buf; + pHead->secured = 0; + msgLen += sizeof(STransUserMsg); + } pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index d0e504a0a1..05b732b8cb 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -211,7 +211,8 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { /* * formate of data buffer: * |<--------------------------data from socket------------------------------->| - * |<------STransMsgHead------->|<-------------------other data--------------->| + * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user + * info--->| */ static const int CAPACITY = 1024; @@ -239,6 +240,9 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { } return 0; } +int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {} + +int transUnpackMsg(STransMsgHead* msgHead) {} int transDestroyBuffer(SConnBuffer* buf) { if (buf->cap > 0) { tfree(buf->buf); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index b8bbea92ce..a731024aba 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -231,6 +231,10 @@ static void uvHandleReq(SSrvConn* pConn) { p->chandle = NULL; STransMsgHead* pHead = (STransMsgHead*)p->msg; + if (pHead->secured == 0) { + STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg)); + memcpy(pConn->user, uMsg->user, tListLen(uMsg->user)); + } pConn->inType = pHead->msgType; assert(transIsReq(pHead->msgType)); @@ -339,6 +343,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); pHead->msgType = smsg->pConn->inType + 1; + pHead->code = htonl(pMsg->code); // add more info char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index 3c9c40f46a..26b45ed09b 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -3,12 +3,12 @@ add_executable(client "") add_executable(server "") add_executable(transUT "") add_executable(syncClient "") +add_executable(pushClient "") target_sources(transUT PRIVATE "transUT.cc" ) - target_sources(transportTest PRIVATE "transportTests.cc" @@ -26,6 +26,11 @@ target_sources (syncClient "syncClient.c" ) +target_sources(pushClient + PRIVATE + "pushClient.c" +) + target_include_directories(transportTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/transport" @@ -92,4 +97,16 @@ target_link_libraries (syncClient transport ) +target_include_directories(pushClient + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_link_libraries (pushClient + os + util + common + gtest_main + transport +) diff --git a/source/libs/transport/test/pushClient.c b/source/libs/transport/test/pushClient.c index 2756eb4666..f1aadafacc 100644 --- a/source/libs/transport/test/pushClient.c +++ b/source/libs/transport/test/pushClient.c @@ -49,9 +49,8 @@ static int tcount = 0; typedef struct SPushArg { tsem_t sem; - } SPushArg; - +// ping int pushCallback(void *arg, SRpcMsg *msg) { SPushArg *push = arg; tsem_post(&push->sem); @@ -59,7 +58,8 @@ int pushCallback(void *arg, SRpcMsg *msg) { SRpcPush *createPushArg() { SRpcPush *push = calloc(1, sizeof(SRpcPush)); push->arg = calloc(1, sizeof(SPushArg)); - tsem_init(&push->arg->sem, 0, 0); + + tsem_init(&(((SPushArg *)push->arg)->sem), 0, 0); push->callback = pushCallback; return push; } @@ -83,14 +83,17 @@ static void *sendRequest(void *param) { rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; rpcMsg.push = push; - ; // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); int64_t start = taosGetTimestampUs(); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); - tsem_wait(&pInfo->rspSem); + tsem_wait(&pInfo->rspSem); // ping->pong // tsem_wait(&pInfo->rspSem); - tsem_wait(&push->sem); + SPushArg *arg = push->arg; + /// e + tsem_wait(&arg->sem); // push callback + + // query_fetch(client->h) int64_t end = taosGetTimestampUs() - start; if (end <= 100) { u100++; -- GitLab