提交 676819f3 编写于 作者: S Shengliang Guan

[TD-10430] add transport for compile errors

上级 0d36837e
......@@ -55,8 +55,9 @@ typedef struct {
char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(SRpcMsg *, SRpcEpSet *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void *owner;
void (*cfp)(void * owner, SRpcMsg *, SRpcEpSet *);
int (*afp)(void * owner, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t refCount;
void *idPool; // handle to ID pool
......@@ -259,6 +260,7 @@ void *rpcOpen(const SRpcInit *pInit) {
if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
pRpc->spi = pInit->spi;
pRpc->owner = pInit->owner;
pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp;
pRpc->refCount = 1;
......@@ -741,7 +743,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if (pConn->user[0] == 0) {
terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
} else {
terrno = (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
terrno = (*pRpc->afp)(pRpc->owner, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
}
if (terrno != 0) {
......@@ -1021,7 +1023,7 @@ static void doRpcReportBrokenLinkToServer(void *param, void *id) {
SRpcMsg *pRpcMsg = (SRpcMsg *)(param);
SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
SRpcInfo *pRpc = pConn->pRpc;
(*(pRpc->cfp))(pRpcMsg, NULL);
(*(pRpc->cfp))(pRpc->owner, pRpcMsg, NULL);
free(pRpcMsg);
}
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
......@@ -1136,7 +1138,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect)
pEpSet = &pContext->epSet;
(*pRpc->cfp)(pMsg, pEpSet);
(*pRpc->cfp)(pRpc->owner, pMsg, pEpSet);
}
// free the request message
......@@ -1160,7 +1162,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
rpcAddRef(pRpc); // add the refCount for requests
// notify the server app
(*(pRpc->cfp))(&rpcMsg, NULL);
(*(pRpc->cfp))(pRpc->owner, &rpcMsg, NULL);
} else {
// it's a response
rpcMsg.handle = pContext;
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(trpc ${SRC})
TARGET_LINK_LIBRARIES(trpc tutil lz4 common)
ADD_SUBDIRECTORY(test)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc)
IF (TD_LINUX)
LIST(APPEND CLIENT_SRC ./rclient.c)
ADD_EXECUTABLE(rclient ${CLIENT_SRC})
TARGET_LINK_LIBRARIES(rclient trpc)
LIST(APPEND SCLIENT_SRC ./rsclient.c)
ADD_EXECUTABLE(rsclient ${SCLIENT_SRC})
TARGET_LINK_LIBRARIES(rsclient trpc)
LIST(APPEND SERVER_SRC ./rserver.c)
ADD_EXECUTABLE(rserver ${SERVER_SRC})
TARGET_LINK_LIBRARIES(rserver trpc)
ENDIF ()
IF (TD_DARWIN)
LIST(APPEND CLIENT_SRC ./rclient.c)
ADD_EXECUTABLE(rclient ${CLIENT_SRC})
TARGET_LINK_LIBRARIES(rclient trpc)
LIST(APPEND SCLIENT_SRC ./rsclient.c)
ADD_EXECUTABLE(rsclient ${SCLIENT_SRC})
TARGET_LINK_LIBRARIES(rsclient trpc)
LIST(APPEND SERVER_SRC ./rserver.c)
ADD_EXECUTABLE(rserver ${SERVER_SRC})
TARGET_LINK_LIBRARIES(rserver trpc)
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tutil.h"
#include "tglobal.h"
#include "rpcLog.h"
#include "trpc.h"
#include "taoserror.h"
typedef struct {
int index;
SRpcEpSet epSet;
int num;
int numOfReqs;
int msgSize;
tsem_t rspSem;
tsem_t *pOverSem;
pthread_t thread;
void *pRpc;
} SInfo;
static void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont);
tsem_post(&pInfo->rspSem);
}
static int tcount = 0;
static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0};
tDebug("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if ( pInfo->num % 20000 == 0 )
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
tsem_wait(&pInfo->rspSem);
}
tDebug("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SRpcEpSet epSet;
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
char secret[TSDB_KEY_LEN] = "mypassword";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
epSet.numOfEps = 1;
epSet.inUse = 0;
epSet.port[0] = 7000;
epSet.port[1] = 7000;
strcpy(epSet.fqdn[0], serverIp);
strcpy(epSet.fqdn[1], "192.168.0.1");
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse;
rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
epSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0]));
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
tError("failed to initialize RPC");
return -1;
}
tInfo("client is initialized");
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<appThreads; ++i) {
pInfo->index = i;
pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
tsem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
usleep(1);
} while ( tcount < appThreads);
gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime)/1000.0f; // mseconds
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
int ch = getchar();
UNUSED(ch);
taosCloseLog();
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tutil.h"
#include "tglobal.h"
#include "rpcLog.h"
#include "trpc.h"
#include "taoserror.h"
typedef struct {
int index;
SRpcEpSet epSet;
int num;
int numOfReqs;
int msgSize;
tsem_t rspSem;
tsem_t *pOverSem;
pthread_t thread;
void *pRpc;
} SInfo;
static int tcount = 0;
static int terror = 0;
static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg, rspMsg;
tDebug("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.handle = pInfo;
rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &rspMsg);
// handle response
if (rspMsg.code != 0) terror++;
tDebug("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code);
rpcFreeCont(rspMsg.pCont);
if ( pInfo->num % 20000 == 0 )
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
}
tDebug("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SRpcEpSet epSet;
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
char secret[TSDB_KEY_LEN] = "mypassword";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
epSet.numOfEps = 1;
epSet.inUse = 0;
epSet.port[0] = 7000;
epSet.port[1] = 7000;
strcpy(epSet.fqdn[0], serverIp);
strcpy(epSet.fqdn[1], "192.168.0.1");
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
//rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
epSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0]));
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
tError("failed to initialize RPC");
return -1;
}
tInfo("client is initialized");
gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<appThreads; ++i) {
pInfo->index = i;
pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
tsem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
usleep(1);
} while ( tcount < appThreads);
gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime)/1000.0; // mseconds
tInfo("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror);
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
taosCloseLog();
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "rpcLog.h"
#include "trpc.h"
#include "tqueue.h"
int msgSize = 128;
int commit = 0;
int dataFd = -1;
void *qhandle = NULL;
void *qset = NULL;
void processShellMsg() {
static int num = 0;
taos_qall qall;
SRpcMsg *pRpcMsg, rpcMsg;
int type;
void *pvnode;
qall = taosAllocateQall();
while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode);
tDebug("%d shell msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break;
for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &type, (void **)&pRpcMsg);
if (dataFd >=0) {
if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) {
tInfo("failed to write data file, reason:%s", strerror(errno));
}
}
}
if (commit >=2) {
num += numOfMsgs;
if ( taosFsync(dataFd) < 0 ) {
tInfo("failed to flush data to file, reason:%s", strerror(errno));
}
if (num % 10000 == 0) {
tInfo("%d request have been written into disk", num);
}
}
taosResetQitems(qall);
for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &type, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
taosFreeQitem(pRpcMsg);
}
}
taosFreeQall(qall);
}
int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
if (strcmp(meterId, "michael") == 0) {
*spi = 1;
*encrypt = 0;
strcpy(secret, "mypassword");
strcpy(ckey, "key");
} else if (strcmp(meterId, "jeff") == 0) {
*spi = 0;
*encrypt = 0;
} else {
ret = -1; // user not there
}
return ret;
}
void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
char dataName[20] = "server.data";
taosBlockSIGPIPE();
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 7000;
rpcInit.label = "SER";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.idleTime = tsShellActivityTimer*1500;
rpcInit.afp = retrieveAuthInfo;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-w")==0 && i < argc-1) {
commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
dDebugFlag = rpcDebugFlag;
uDebugFlag = rpcDebugFlag;
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
tsAsyncLog = 0;
rpcInit.connType = TAOS_CONN_SERVER;
taosInitLog("server.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
tError("failed to start RPC server");
return -1;
}
tInfo("RPC server is running, ctrl-c to exit");
if (commit) {
dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
if (dataFd<0)
tInfo("failed to open data file, reason:%s", strerror(errno));
}
qhandle = taosOpenQueue(sizeof(SRpcMsg));
qset = taosOpenQset();
taosAddIntoQset(qset, qhandle, NULL);
processShellMsg();
if (dataFd >= 0) {
close(dataFd);
remove(dataName);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册