提交 7dba7215 编写于 作者: dengyihao's avatar dengyihao

add demo

上级 4bc48166
...@@ -173,6 +173,7 @@ typedef struct { ...@@ -173,6 +173,7 @@ typedef struct {
typedef struct { typedef struct {
uint8_t user[TSDB_UNI_LEN]; uint8_t user[TSDB_UNI_LEN];
uint8_t secret[TSDB_PASSWORD_LEN];
} STransUserMsg; } STransUserMsg;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -39,6 +39,9 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -39,6 +39,9 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->user) { if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user)); memcpy(pRpc->user, pInit->user, strlen(pInit->user));
} }
if (pInit->secret) {
memcpy(pRpc->secret, pInit->secret, strlen(pInit->secret));
}
return pRpc; return pRpc;
} }
......
...@@ -135,9 +135,10 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -135,9 +135,10 @@ static void clientHandleResp(SCliConn* conn) {
if (conn->push != NULL && conn->notifyCount != 0) { if (conn->push != NULL && conn->notifyCount != 0) {
(*conn->push->callback)(conn->push->arg, &rpcMsg); (*conn->push->callback)(conn->push->arg, &rpcMsg);
conn->push = NULL;
} else { } else {
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
tTrace("client conn(sync) %p handle resp", conn); tTrace("client conn%p handle resp", conn);
(pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
} else { } else {
tTrace("client conn(sync) %p handle resp", conn); tTrace("client conn(sync) %p handle resp", conn);
...@@ -146,6 +147,7 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -146,6 +147,7 @@ static void clientHandleResp(SCliConn* conn) {
} }
} }
conn->notifyCount += 1; conn->notifyCount += 1;
conn->secured = pHead->secured;
// buf's mem alread translated to rpcMsg.pCont // buf's mem alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
...@@ -156,10 +158,10 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -156,10 +158,10 @@ static void clientHandleResp(SCliConn* conn) {
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->push == NULL) { if (conn->push == NULL) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
}
destroyCmsg(pMsg); destroyCmsg(conn->data);
conn->data = NULL; conn->data = NULL;
}
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
...@@ -182,21 +184,23 @@ static void clientHandleExcept(SCliConn* pConn) { ...@@ -182,21 +184,23 @@ static void clientHandleExcept(SCliConn* pConn) {
if (pConn->push != NULL && pConn->notifyCount != 0) { if (pConn->push != NULL && pConn->notifyCount != 0) {
(*pConn->push->callback)(pConn->push->arg, &rpcMsg); (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
pConn->push = NULL;
} else { } else {
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
(pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
} else { } else {
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
// SRpcMsg rpcMsg
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
if (pConn->push != NULL) { if (pConn->push != NULL) {
(*pConn->push->callback)(pConn->push->arg, &rpcMsg); (*pConn->push->callback)(pConn->push->arg, &rpcMsg);
} }
pConn->push = NULL;
}
if (pConn->push == NULL) {
destroyCmsg(pConn->data);
pConn->data = NULL;
} }
destroyCmsg(pMsg);
pConn->data = NULL;
// transDestroyConnCtx(pCtx); // transDestroyConnCtx(pCtx);
clientConnDestroy(pConn, true); clientConnDestroy(pConn, true);
pConn->notifyCount += 1; pConn->notifyCount += 1;
...@@ -383,6 +387,7 @@ static void clientWriteCb(uv_write_t* req, int status) { ...@@ -383,6 +387,7 @@ static void clientWriteCb(uv_write_t* req, int status) {
static void clientWrite(SCliConn* pConn) { static void clientWrite(SCliConn* pConn) {
SCliMsg* pCliMsg = pConn->data; SCliMsg* pCliMsg = pConn->data;
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
SRpcInfo* pTransInst = pCtx->pTransInst;
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
...@@ -394,7 +399,8 @@ static void clientWrite(SCliConn* pConn) { ...@@ -394,7 +399,8 @@ static void clientWrite(SCliConn* pConn) {
memcpy(buf, (char*)pHead, msgLen); memcpy(buf, (char*)pHead, msgLen);
STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen); STransUserMsg* uMsg = (STransUserMsg*)(buf + msgLen);
memcpy(uMsg->user, pCtx->pTransInst->user, tListLen(uMsg->user)); memcpy(uMsg->user, pTransInst->user, tListLen(uMsg->user));
memcpy(uMsg->secret, pTransInst->secret, tListLen(uMsg->secret));
// to avoid mem leak // to avoid mem leak
destroyUserdata(pMsg); destroyUserdata(pMsg);
...@@ -402,8 +408,6 @@ static void clientWrite(SCliConn* pConn) { ...@@ -402,8 +408,6 @@ static void clientWrite(SCliConn* pConn) {
pMsg->pCont = (char*)buf + sizeof(STransMsgHead); pMsg->pCont = (char*)buf + sizeof(STransMsgHead);
pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead); pMsg->contLen = msgLen + sizeof(STransUserMsg) - sizeof(STransMsgHead);
pConn->secured = 1; // del later
pHead = (STransMsgHead*)buf; pHead = (STransMsgHead*)buf;
pHead->secured = 1; pHead->secured = 1;
msgLen += sizeof(STransUserMsg); msgLen += sizeof(STransUserMsg);
...@@ -450,10 +454,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -450,10 +454,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace("client msg tran time cost: %" PRIu64 "", el); tTrace("client msg tran time cost: %" PRIu64 "", el);
et = taosGetTimestampUs(); et = taosGetTimestampUs();
// if (pMsg->msg.handle != NULL) {
// // handle
//}
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) { if (conn != NULL) {
......
...@@ -234,6 +234,7 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -234,6 +234,7 @@ static void uvHandleReq(SSrvConn* pConn) {
if (pHead->secured == 1) { if (pHead->secured == 1) {
STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg)); STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg));
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user)); memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
} }
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
...@@ -335,13 +336,16 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { ...@@ -335,13 +336,16 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
// impl later; // impl later;
tTrace("server conn %p prepare to send resp", smsg->pConn); tTrace("server conn %p prepare to send resp", smsg->pConn);
SRpcMsg* pMsg = &smsg->msg;
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
SRpcMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0); pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0; pMsg->contLen = 0;
} }
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->secured = pMsg->code == 0 ? 1 : 0; //
pHead->msgType = smsg->pConn->inType + 1; pHead->msgType = smsg->pConn->inType + 1;
pHead->code = htonl(pMsg->code); pHead->code = htonl(pMsg->code);
// add more info // add more info
......
...@@ -4,6 +4,7 @@ add_executable(server "") ...@@ -4,6 +4,7 @@ add_executable(server "")
add_executable(transUT "") add_executable(transUT "")
add_executable(syncClient "") add_executable(syncClient "")
add_executable(pushClient "") add_executable(pushClient "")
add_executable(pushServer "")
target_sources(transUT target_sources(transUT
PRIVATE PRIVATE
...@@ -30,6 +31,10 @@ target_sources(pushClient ...@@ -30,6 +31,10 @@ target_sources(pushClient
PRIVATE PRIVATE
"pushClient.c" "pushClient.c"
) )
target_sources(pushServer
PRIVATE
"pushServer.c"
)
target_include_directories(transportTest target_include_directories(transportTest
PUBLIC PUBLIC
...@@ -110,3 +115,16 @@ target_link_libraries (pushClient ...@@ -110,3 +115,16 @@ target_link_libraries (pushClient
transport transport
) )
target_include_directories(pushServer
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (pushServer
os
util
common
gtest_main
transport
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "rpcLog.h"
#include "tglobal.h"
#include "tqueue.h"
#include "trpc.h"
int msgSize = 128;
int commit = 0;
int dataFd = -1;
STaosQueue *qhandle = NULL;
STaosQset * qset = NULL;
void processShellMsg() {
static int num = 0;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
int type;
void * pvnode;
qall = taosAllocateQall();
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, NULL);
tDebug("%d shell msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
if (dataFd >= 0) {
if (write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) < 0) {
tInfo("failed to write data file, reason:%s", strerror(errno));
}
}
}
if (commit >= 2) {
num += numOfMsgs;
// if (taosFsync(dataFd) < 0) {
// tInfo("failed to flush data to file, reason:%s", strerror(errno));
//}
if (num % 10000 == 0) {
tInfo("%d request have been written into disk", num);
}
}
taosResetQitems(qall);
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
void *handle = pRpcMsg->handle;
taosFreeQitem(pRpcMsg);
{
sleep(1);
SRpcMsg nRpcMsg = {0};
nRpcMsg.pCont = rpcMallocCont(msgSize);
nRpcMsg.contLen = msgSize;
nRpcMsg.handle = handle;
nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
rpcSendResponse(&nRpcMsg);
}
}
}
taosFreeQall(qall);
}
int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
if (strcmp(meterId, "michael") == 0) {
*spi = 1;
*encrypt = 0;
strcpy(secret, "mypassword");
strcpy(ckey, "key");
} else if (strcmp(meterId, "jeff") == 0) {
*spi = 0;
*encrypt = 0;
} else {
ret = -1; // user not there
}
return ret;
}
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(qhandle, pTemp);
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
char dataName[20] = "server.data";
taosBlockSIGPIPE();
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 7000;
rpcInit.label = "SER";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500;
rpcInit.afp = retrieveAuthInfo;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-w") == 0 && i < argc - 1) {
commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
rpcDebugFlag = atoi(argv[++i]);
dDebugFlag = rpcDebugFlag;
uDebugFlag = rpcDebugFlag;
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
tsAsyncLog = 0;
rpcInit.connType = TAOS_CONN_SERVER;
taosInitLog("server.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
tError("failed to start RPC server");
return -1;
}
// sleep(5);
tInfo("RPC server is running, ctrl-c to exit");
if (commit) {
dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
if (dataFd < 0) tInfo("failed to open data file, reason:%s", strerror(errno));
}
qhandle = taosOpenQueue();
qset = taosOpenQset();
taosAddIntoQset(qset, qhandle, NULL);
processShellMsg();
if (dataFd >= 0) {
close(dataFd);
remove(dataName);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册