未验证 提交 7fc333c0 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10149 from taosdata/feature/transImpl

enhance interface
......@@ -134,10 +134,12 @@ typedef struct {
// int16_t numOfTry; // number of try for different servers
// int8_t oldInUse; // server EP inUse passed by app
// int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
char* ip;
uint32_t port;
// SEpSet* pSet; // for synchronous API
......
......@@ -813,8 +813,8 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
SRpcInfo *pRpc = pContext->pRpc;
SEpSet * pEpSet = &pContext->epSet;
pConn =
rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port,
pContext->connType);
if (pConn == NULL || pConn->user[0] == 0) {
pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
}
......
......@@ -63,17 +63,41 @@ void rpcFreeCont(void* cont) {
}
free((char*)cont - TRANS_MSG_OVERHEAD);
}
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void* rpcReallocCont(void* ptr, int contLen) {
if (ptr == NULL) {
return rpcMallocCont(contLen);
}
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
int sz = contLen + TRANS_MSG_OVERHEAD;
st = realloc(st, sz);
if (st == NULL) {
return NULL;
}
return st + TRANS_MSG_OVERHEAD;
}
void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.contLen = sizeof(SEpSet);
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
if (rpcMsg.pCont == NULL) return;
memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
rpcMsg.handle = thandle;
rpcSendResponse(&rpcMsg);
}
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
int32_t rpcInit(void) {
// impl later
return -1;
return 0;
}
void rpcCleanup(void) {
......
......@@ -123,9 +123,14 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.code = pHead->code;
rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle;
tDebug("conn %p handle resp", conn);
(pRpc->cfp)(NULL, &rpcMsg, NULL);
if (pCtx->pSem == NULL) {
tDebug("conn %p handle resp", conn);
(pRpc->cfp)(NULL, &rpcMsg, NULL);
} else {
tDebug("conn %p handle resp", conn);
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
tsem_post(pCtx->pSem);
}
conn->notifyCount += 1;
// buf's mem alread translated to rpcMsg.pCont
......@@ -159,14 +164,20 @@ static void clientHandleExcept(SCliConn* pConn) {
SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
pConn->notifyCount += 1;
if (pCtx->pSem == NULL) {
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
} else {
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
// SRpcMsg rpcMsg
tsem_post(pCtx->pSem);
}
destroyCmsg(pMsg);
pConn->data = NULL;
// transDestroyConnCtx(pCtx);
clientConnDestroy(pConn, true);
pConn->notifyCount += 1;
}
static void clientTimeoutCb(uv_timer_t* handle) {
......@@ -463,6 +474,7 @@ static void clientAsyncCb(uv_async_t* handle) {
static void* clientThread(void* arg) {
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
setThreadName("trans-client-work");
uv_run(pThrd->loop, UV_RUN_DEFAULT);
}
......@@ -568,8 +580,8 @@ void taosCloseClient(void* arg) {
}
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later
char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]);
uint32_t port = pEpSet->port[pEpSet->inUse];
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pRpc = (SRpcInfo*)shandle;
......@@ -609,4 +621,45 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
// int end = taosGetTimestampUs() - start;
// tError("client sent to rpc, time cost: %d", (int)end);
}
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pRpc = (SRpcInfo*)shandle;
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->pTransInst = (SRpcInfo*)shandle;
pCtx->ahandle = pReq->ahandle;
pCtx->msgType = pReq->msgType;
pCtx->ip = strdup(ip);
pCtx->port = port;
pCtx->pSem = calloc(1, sizeof(tsem_t));
pCtx->pRsp = pRsp;
tsem_init(pCtx->pSem, 0, 0);
int64_t index = pRpc->index;
if (pRpc->index++ >= pRpc->numOfThreads) {
pRpc->index = 0;
}
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
cliMsg->ctx = pCtx;
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
// pthread_mutex_lock(&thrd->msgMtx);
// QUEUE_PUSH(&thrd->msg, &cliMsg->q);
// pthread_mutex_unlock(&thrd->msgMtx);
// int start = taosGetTimestampUs();
transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_t* pSem = pCtx->pSem;
tsem_wait(pSem);
tsem_destroy(pSem);
free(pSem);
return;
}
#endif
......@@ -33,6 +33,8 @@ typedef struct SSrvConn {
void* hostThrd;
void* pSrvMsg;
struct sockaddr peername;
// SRpcMsg sendMsg;
// del later
char secured;
......@@ -487,7 +489,13 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tDebug("conn %p created, fd: %d", pConn, fd);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
int namelen = sizeof(pConn->peername);
if (0 != uv_tcp_getpeername(pConn->pTcp, &pConn->peername, &namelen)) {
tError("failed to get peer name");
destroyConn(pConn, true);
} else {
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
}
} else {
tDebug("failed to create new connection");
destroyConn(pConn, true);
......@@ -496,6 +504,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
void* acceptThread(void* arg) {
// opt
setThreadName("trans-accept");
SServerObj* srv = (SServerObj*)arg;
uv_run(srv->loop, UV_RUN_DEFAULT);
}
......@@ -548,6 +557,7 @@ static bool addHandleToAcceptloop(void* arg) {
return true;
}
void* workerThread(void* arg) {
setThreadName("trans-worker");
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
uv_run(pThrd->loop, UV_RUN_DEFAULT);
}
......@@ -723,4 +733,16 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
// uv_async_send(pThrd->workerAsync);
}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
SSrvConn* pConn = thandle;
struct sockaddr* pPeerName = &pConn->peername;
struct sockaddr_in caddr = *(struct sockaddr_in*)(pPeerName);
pInfo->clientIp = (uint32_t)(caddr.sin_addr.s_addr);
pInfo->clientPort = ntohs(caddr.sin_port);
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
return 0;
}
#endif
......@@ -2,6 +2,7 @@ add_executable(transportTest "")
add_executable(client "")
add_executable(server "")
add_executable(transUT "")
add_executable(syncClient "")
target_sources(transUT
PRIVATE
......@@ -20,6 +21,10 @@ target_sources (server
PRIVATE
"rserver.c"
)
target_sources (syncClient
PRIVATE
"syncClient.c"
)
target_include_directories(transportTest
PUBLIC
......@@ -67,7 +72,6 @@ target_include_directories(transUT
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (server
os
util
......@@ -75,4 +79,17 @@ target_link_libraries (server
gtest_main
transport
)
target_include_directories(syncClient
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (syncClient
os
util
common
gtest_main
transport
)
......@@ -33,7 +33,6 @@ typedef struct {
pthread_t thread;
void * pRpc;
} SInfo;
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/time.h>
#include <tep.h>
#include "os.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "trpc.h"
#include "tutil.h"
typedef struct {
int index;
SEpSet epSet;
int num;
int numOfReqs;
int msgSize;
tsem_t rspSem;
tsem_t * pOverSem;
pthread_t thread;
void * pRpc;
} SInfo;
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont);
// tsem_post(&pInfo->rspSem);
tsem_post(&pInfo->rspSem);
}
static int tcount = 0;
static void *sendRequest(void *param) {
SInfo * pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0};
tDebug("thread:%d, start to send request", pInfo->index);
tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
int u100 = 0;
int u500 = 0;
int u1000 = 0;
int u10000 = 0;
SRpcMsg respMsg = {0};
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
int64_t start = taosGetTimestampUs();
rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &respMsg);
// rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
// tsem_wait(&pInfo->rspSem);
// wtsem_wait(&pInfo->rspSem);
int64_t end = taosGetTimestampUs() - start;
if (end <= 100) {
u100++;
} else if (end > 100 && end <= 500) {
u500++;
} else if (end > 500 && end < 1000) {
u1000++;
} else {
u10000++;
}
tDebug("recv response succefully");
// usleep(100000000);
}
tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
tDebug("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SEpSet epSet = {0};
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
char secret[20] = "mypassword";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
epSet.inUse = 0;
addEpIntoEpSet(&epSet, serverIp, 7000);
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse;
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = "michael";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
epSet.eps[0].port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
tError("failed to initialize RPC");
return -1;
}
tInfo("client is initialized");
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i = 0; i < appThreads; ++i) {
pInfo->index = i;
pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
tsem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
usleep(1);
} while (tcount < appThreads);
gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime) / 1000.0f; // mseconds
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
int ch = getchar();
UNUSED(ch);
taosCloseLog();
return 0;
}
......@@ -15,6 +15,7 @@
#include <gtest/gtest.h>
#include <cstdio>
#include <cstring>
#include "tep.h"
#include "trpc.h"
using namespace std;
......@@ -50,6 +51,25 @@ class TransObj {
trans = rpcOpen(&rpcInit);
return trans != NULL ? true : false;
}
bool sendAndRecv() {
SEpSet epSet = {0};
epSet.inUse = 0;
addEpIntoEpSet(&epSet, "192.168.1.1", 7000);
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
if (trans == NULL) {
return false;
}
SRpcMsg rpcMsg = {0}, reqMsg = {0};
reqMsg.pCont = rpcMallocCont(10);
reqMsg.contLen = 10;
reqMsg.ahandle = NULL;
rpcSendRecv(trans, &epSet, &reqMsg, &rpcMsg);
int code = rpcMsg.code;
std::cout << tstrerror(code) << std::endl;
return true;
}
bool stop() {
rpcClose(trans);
trans = NULL;
......@@ -75,6 +95,7 @@ class TransEnv : public ::testing::Test {
};
TEST_F(TransEnv, test_start_stop) {
assert(tr->startCli());
assert(tr->sendAndRecv());
assert(tr->stop());
assert(tr->startSrv());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册