From 7dba7215f46876897829731ded0e18e7039c45b5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 12 Feb 2022 18:14:28 +0800 Subject: [PATCH] add demo --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/trans.c | 3 + source/libs/transport/src/transCli.c | 30 ++-- source/libs/transport/src/transSrv.c | 6 +- source/libs/transport/test/CMakeLists.txt | 18 ++ source/libs/transport/test/pushServer.c | 199 ++++++++++++++++++++++ 6 files changed, 241 insertions(+), 16 deletions(-) create mode 100644 source/libs/transport/test/pushServer.c diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index cc65f04a39..bec0375dbe 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -173,6 +173,7 @@ typedef struct { typedef struct { uint8_t user[TSDB_UNI_LEN]; + uint8_t secret[TSDB_PASSWORD_LEN]; } STransUserMsg; #pragma pack(pop) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index b03adafaff..707f23113b 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -39,6 +39,9 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->user) { memcpy(pRpc->user, pInit->user, strlen(pInit->user)); } + if (pInit->secret) { + memcpy(pRpc->secret, pInit->secret, strlen(pInit->secret)); + } return pRpc; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9d13ec8225..2a4a1891ed 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -135,9 +135,10 @@ static void clientHandleResp(SCliConn* conn) { if (conn->push != NULL && conn->notifyCount != 0) { (*conn->push->callback)(conn->push->arg, &rpcMsg); + conn->push = NULL; } else { if (pCtx->pSem == NULL) { - tTrace("client conn(sync) %p handle resp", conn); + tTrace("client conn%p handle resp", conn); (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); } else { tTrace("client conn(sync) %p handle resp", conn); @@ -146,6 +147,7 @@ static void clientHandleResp(SCliConn* conn) { } } conn->notifyCount += 1; + conn->secured = pHead->secured; // buf's mem alread translated to rpcMsg.pCont transClearBuffer(&conn->readBuf); @@ -156,10 +158,10 @@ static void clientHandleResp(SCliConn* conn) { // user owns conn->persist = 1 if (conn->push == NULL) { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); - } - destroyCmsg(pMsg); - conn->data = NULL; + destroyCmsg(conn->data); + conn->data = NULL; + } // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); @@ -182,21 +184,23 @@ static void clientHandleExcept(SCliConn* pConn) { if (pConn->push != NULL && pConn->notifyCount != 0) { (*pConn->push->callback)(pConn->push->arg, &rpcMsg); + pConn->push = NULL; } else { if (pCtx->pSem == NULL) { (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); } else { memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); - // SRpcMsg rpcMsg tsem_post(pCtx->pSem); } if (pConn->push != NULL) { (*pConn->push->callback)(pConn->push->arg, &rpcMsg); } + pConn->push = NULL; + } + if (pConn->push == NULL) { + destroyCmsg(pConn->data); + pConn->data = NULL; } - - destroyCmsg(pMsg); - pConn->data = NULL; // transDestroyConnCtx(pCtx); clientConnDestroy(pConn, true); pConn->notifyCount += 1; @@ -383,6 +387,7 @@ static void clientWriteCb(uv_write_t* req, int status) { static void clientWrite(SCliConn* pConn) { SCliMsg* pCliMsg = pConn->data; STransConnCtx* pCtx = pCliMsg->ctx; + SRpcInfo* pTransInst = pCtx->pTransInst; SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); @@ -394,7 +399,8 @@ static void clientWrite(SCliConn* pConn) { memcpy(buf, (char*)pHead, msgLen); STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen); - memcpy(uMsg->user, pCtx->pTransInst->user, tListLen(uMsg->user)); + memcpy(uMsg->user, pTransInst->user, tListLen(uMsg->user)); + memcpy(uMsg->secret, pTransInst->secret, tListLen(uMsg->secret)); // to avoid mem leak destroyUserdata(pMsg); @@ -402,8 +408,6 @@ static void clientWrite(SCliConn* pConn) { pMsg->pCont = (char*)buf + sizeof(STransMsgHead); pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead); - pConn->secured = 1; // del later - pHead = (STransMsgHead*)buf; pHead->secured = 1; msgLen += sizeof(STransUserMsg); @@ -450,10 +454,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("client msg tran time cost: %" PRIu64 "", el); et = taosGetTimestampUs(); - // if (pMsg->msg.handle != NULL) { - // // handle - //} - STransConnCtx* pCtx = pMsg->ctx; SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index cc9933f919..5f4daef344 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -234,6 +234,7 @@ static void uvHandleReq(SSrvConn* pConn) { if (pHead->secured == 1) { STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg)); memcpy(pConn->user, uMsg->user, tListLen(uMsg->user)); + memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret)); } pConn->inType = pHead->msgType; @@ -335,13 +336,16 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { // impl later; tTrace("server conn %p prepare to send resp", smsg->pConn); - SRpcMsg* pMsg = &smsg->msg; + SSrvConn* pConn = smsg->pConn; + SRpcMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + + pHead->secured = pMsg->code == 0 ? 1 : 0; // pHead->msgType = smsg->pConn->inType + 1; pHead->code = htonl(pMsg->code); // add more info diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index 26b45ed09b..b4f50219ff 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -4,6 +4,7 @@ add_executable(server "") add_executable(transUT "") add_executable(syncClient "") add_executable(pushClient "") +add_executable(pushServer "") target_sources(transUT PRIVATE @@ -30,6 +31,10 @@ target_sources(pushClient PRIVATE "pushClient.c" ) +target_sources(pushServer + PRIVATE + "pushServer.c" +) target_include_directories(transportTest PUBLIC @@ -110,3 +115,16 @@ target_link_libraries (pushClient transport ) +target_include_directories(pushServer + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_link_libraries (pushServer + os + util + common + gtest_main + transport +) + diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c new file mode 100644 index 0000000000..f9115d3d4f --- /dev/null +++ b/source/libs/transport/test/pushServer.c @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include "os.h" +#include "rpcLog.h" +#include "tglobal.h" +#include "tqueue.h" +#include "trpc.h" + +int msgSize = 128; +int commit = 0; +int dataFd = -1; +STaosQueue *qhandle = NULL; +STaosQset * qset = NULL; + +void processShellMsg() { + static int num = 0; + STaosQall *qall; + SRpcMsg * pRpcMsg, rpcMsg; + int type; + void * pvnode; + + qall = taosAllocateQall(); + + while (1) { + int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, NULL); + tDebug("%d shell msgs are received", numOfMsgs); + if (numOfMsgs <= 0) break; + + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + + if (dataFd >= 0) { + if (write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) < 0) { + tInfo("failed to write data file, reason:%s", strerror(errno)); + } + } + } + + if (commit >= 2) { + num += numOfMsgs; + // if (taosFsync(dataFd) < 0) { + // tInfo("failed to flush data to file, reason:%s", strerror(errno)); + //} + + if (num % 10000 == 0) { + tInfo("%d request have been written into disk", num); + } + } + + taosResetQitems(qall); + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + rpcFreeCont(pRpcMsg->pCont); + + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.pCont = rpcMallocCont(msgSize); + rpcMsg.contLen = msgSize; + rpcMsg.handle = pRpcMsg->handle; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); + + void *handle = pRpcMsg->handle; + taosFreeQitem(pRpcMsg); + + { + sleep(1); + SRpcMsg nRpcMsg = {0}; + nRpcMsg.pCont = rpcMallocCont(msgSize); + nRpcMsg.contLen = msgSize; + nRpcMsg.handle = handle; + nRpcMsg.code = TSDB_CODE_CTG_NOT_READY; + rpcSendResponse(&nRpcMsg); + } + } + } + + taosFreeQall(qall); +} + +int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { + // app shall retrieve the auth info based on meterID from DB or a data file + // demo code here only for simple demo + int ret = 0; + + if (strcmp(meterId, "michael") == 0) { + *spi = 1; + *encrypt = 0; + strcpy(secret, "mypassword"); + strcpy(ckey, "key"); + } else if (strcmp(meterId, "jeff") == 0) { + *spi = 0; + *encrypt = 0; + } else { + ret = -1; // user not there + } + + return ret; +} + +void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); + taosWriteQitem(qhandle, pTemp); +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + char dataName[20] = "server.data"; + + taosBlockSIGPIPE(); + + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 7000; + rpcInit.label = "SER"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processRequestMsg; + rpcInit.sessions = 1000; + rpcInit.idleTime = 2 * 1500; + rpcInit.afp = retrieveAuthInfo; + + for (int i = 1; i < argc; ++i) { + if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { + rpcInit.localPort = atoi(argv[++i]); + } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) { + rpcInit.numOfThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) { + msgSize = atoi(argv[++i]); + } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) { + rpcInit.sessions = atoi(argv[++i]); + } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) { + tsCompressMsgSize = atoi(argv[++i]); + } else if (strcmp(argv[i], "-w") == 0 && i < argc - 1) { + commit = atoi(argv[++i]); + } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) { + rpcDebugFlag = atoi(argv[++i]); + dDebugFlag = rpcDebugFlag; + uDebugFlag = rpcDebugFlag; + } else { + printf("\nusage: %s [options] \n", argv[0]); + printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort); + printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); + printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions); + printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); + printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize); + printf(" [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit); + printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag); + printf(" [-h help]: print out this help\n\n"); + exit(0); + } + } + + tsAsyncLog = 0; + rpcInit.connType = TAOS_CONN_SERVER; + taosInitLog("server.log", 100000, 10); + + void *pRpc = rpcOpen(&rpcInit); + if (pRpc == NULL) { + tError("failed to start RPC server"); + return -1; + } + // sleep(5); + + tInfo("RPC server is running, ctrl-c to exit"); + + if (commit) { + dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO); + if (dataFd < 0) tInfo("failed to open data file, reason:%s", strerror(errno)); + } + qhandle = taosOpenQueue(); + qset = taosOpenQset(); + taosAddIntoQset(qset, qhandle, NULL); + + processShellMsg(); + + if (dataFd >= 0) { + close(dataFd); + remove(dataName); + } + + return 0; +} -- GitLab