提交 aea2e74e 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

add more code for testing

上级 6ae7357a
...@@ -45,6 +45,12 @@ typedef struct { ...@@ -45,6 +45,12 @@ typedef struct {
uint32_t ip[TSDB_MAX_MPEERS]; uint32_t ip[TSDB_MAX_MPEERS];
} SRpcIpSet; } SRpcIpSet;
typedef struct {
uint32_t sourceIp;
uint16_t sourcePort;
char *user;
} SRpcConnInfo;
typedef struct { typedef struct {
char *localIp; // local IP used char *localIp; // local IP used
uint16_t localPort; // local port uint16_t localPort; // local port
...@@ -55,7 +61,7 @@ typedef struct { ...@@ -55,7 +61,7 @@ typedef struct {
int idleTime; // milliseconds, 0 means idle timer is disabled int idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client security only // the following is for client security only
char *meterId; // meter ID char *user; // user name
char spi; // security parameter index char spi; // security parameter index
char encrypt; // encrypt algorithm char encrypt; // encrypt algorithm
char *secret; // key for authentication char *secret; // key for authentication
...@@ -78,7 +84,7 @@ void rpcFreeCont(void *pCont); ...@@ -78,7 +84,7 @@ void rpcFreeCont(void *pCont);
void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle);
void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen);
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -32,7 +32,7 @@ typedef struct { ...@@ -32,7 +32,7 @@ typedef struct {
uint32_t uid; // for unique ID inside a client uint32_t uid; // for unique ID inside a client
uint32_t sourceId; // source ID, an index for connection list uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list uint32_t destId; // destination ID, an index for connection list
char meterId[TSDB_UNI_LEN]; char user[TSDB_UNI_LEN];
uint16_t port; // for UDP only, port may be changed uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved char empty[1]; // reserved
uint8_t msgType; // message type uint8_t msgType; // message type
......
...@@ -72,7 +72,8 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { ...@@ -72,7 +72,8 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
pTcp->numOfFds--; pTcp->numOfFds--;
if (pTcp->numOfFds < 0) tError("%s number of TCP FDs shall never be negative", pTcp->label); if (pTcp->numOfFds < 0)
tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj);
// remove from the FdObject list // remove from the FdObject list
...@@ -91,7 +92,7 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { ...@@ -91,7 +92,7 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
// notify the upper layer to clean the associated context // notify the upper layer to clean the associated context
if (pFdObj->thandle) (*(pTcp->processData))(NULL, 0, 0, 0, pTcp->shandle, pFdObj->thandle, NULL); if (pFdObj->thandle) (*(pTcp->processData))(NULL, 0, 0, 0, pTcp->shandle, pFdObj->thandle, NULL);
tTrace("%s TCP FD is cleaned up, numOfFds:%d", pTcp->label, pTcp->numOfFds); tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds);
memset(pFdObj, 0, sizeof(STcpFd)); memset(pFdObj, 0, sizeof(STcpFd));
...@@ -302,7 +303,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16 ...@@ -302,7 +303,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16
pthread_mutex_unlock(&(pTcp->mutex)); pthread_mutex_unlock(&(pTcp->mutex));
tTrace("%s TCP connection to ip:%s port:%hu is created, numOfFds:%d", pTcp->label, ip, port, pTcp->numOfFds); tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds);
return pFdObj; return pFdObj;
} }
......
...@@ -49,14 +49,14 @@ typedef struct { ...@@ -49,14 +49,14 @@ typedef struct {
int connType; int connType;
char label[12]; char label[12];
char meterId[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index char spi; // security parameter index
char encrypt; // encrypt algorithm char encrypt; // encrypt algorithm
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
void *idPool; // handle to ID pool void *idPool; // handle to ID pool
...@@ -86,7 +86,7 @@ typedef struct _RpcConn { ...@@ -86,7 +86,7 @@ typedef struct _RpcConn {
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID uint32_t peerId; // peer link ID
char meterId[TSDB_UNI_LEN]; // user ID for the link char user[TSDB_UNI_LEN]; // user ID for the link
char spi; // security parameter index char spi; // security parameter index
char encrypt; // encryption, 0:1 char encrypt; // encryption, 0:1
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
...@@ -95,7 +95,7 @@ typedef struct _RpcConn { ...@@ -95,7 +95,7 @@ typedef struct _RpcConn {
uint32_t peerUid; // peer UID uint32_t peerUid; // peer UID
uint32_t peerIp; // peer IP uint32_t peerIp; // peer IP
uint16_t peerPort; // peer port uint16_t peerPort; // peer port
char peerIpstr[20]; // peer IP string char peerIpstr[TSDB_IPv4ADDR_LEN]; // peer IP string
uint16_t tranId; // outgoing transcation ID, for build message uint16_t tranId; // outgoing transcation ID, for build message
uint16_t outTranId; // outgoing transcation ID uint16_t outTranId; // outgoing transcation ID
uint16_t inTranId; // transcation ID for incoming msg uint16_t inTranId; // transcation ID for incoming msg
...@@ -160,8 +160,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) ...@@ -160,8 +160,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort)
static void rpcCloseConn(void *thandle); static void rpcCloseConn(void *thandle);
static SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet); static SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet);
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr);
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
...@@ -198,7 +198,7 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -198,7 +198,7 @@ void *rpcOpen(SRpcInit *pInit) {
pRpc->localPort = pInit->localPort; pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->sessions = pInit->sessions; pRpc->sessions = pInit->sessions;
if (pInit->meterId) strcpy(pRpc->meterId, pInit->meterId); if (pInit->user) strcpy(pRpc->user, pInit->user);
if (pInit->secret) strcpy(pRpc->secret, pInit->secret); if (pInit->secret) strcpy(pRpc->secret, pInit->secret);
if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey); if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey);
pRpc->spi = pInit->spi; pRpc->spi = pInit->spi;
...@@ -263,7 +263,7 @@ void rpcClose(void *param) { ...@@ -263,7 +263,7 @@ void rpcClose(void *param) {
(*taosCleanUpConn[pRpc->connType])(pRpc->shandle); (*taosCleanUpConn[pRpc->connType])(pRpc->shandle);
for (int i = 0; i < pRpc->sessions; ++i) { for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList[i].meterId[0]) { if (pRpc->connList[i].user[0]) {
rpcCloseConn((void *)(pRpc->connList + i)); rpcCloseConn((void *)(pRpc->connList + i));
} }
} }
...@@ -292,8 +292,10 @@ void *rpcMallocCont(int size) { ...@@ -292,8 +292,10 @@ void *rpcMallocCont(int size) {
} }
void rpcFreeCont(void *cont) { void rpcFreeCont(void *cont) {
char *msg = ((char *)cont) - sizeof(SRpcHead); if ( cont ) {
free(msg); char *msg = ((char *)cont) - sizeof(SRpcHead);
free(msg);
}
} }
void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) { void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) {
...@@ -333,7 +335,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -333,7 +335,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
if ( pConn->inType == 0 || pConn->meterId[0] == 0 ) { if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn); tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn);
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
return; return;
...@@ -350,7 +352,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -350,7 +352,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->uid = 0; pHead->uid = 0;
pHead->code = htonl(code); pHead->code = htonl(code);
memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set pConn parameters // set pConn parameters
pConn->inType = 0; pConn->inType = 0;
...@@ -383,6 +385,15 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { ...@@ -383,6 +385,15 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) {
return; return;
} }
void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = pConn->pRpc;
pInfo->sourceIp = pConn->peerIp;
pInfo->sourcePort = pConn->peerPort;
strcpy(pInfo->user, pConn->user);
}
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) { static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) {
SRpcConn *pConn; SRpcConn *pConn;
...@@ -392,16 +403,16 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) ...@@ -392,16 +403,16 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort)
strcpy(pConn->peerIpstr, peerIpStr); strcpy(pConn->peerIpstr, peerIpStr);
pConn->peerIp = inet_addr(peerIpStr); pConn->peerIp = inet_addr(peerIpStr);
pConn->peerPort = peerPort; pConn->peerPort = peerPort;
strcpy(pConn->meterId, pRpc->meterId); strcpy(pConn->user, pRpc->user);
if (taosOpenConn[pRpc->connType]) { if (taosOpenConn[pRpc->connType]) {
pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) { if (pConn->chandle) {
tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label,
pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort); pConn, pConn->sid, pRpc->user, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
} else { } else {
tError("%s %p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn, tError("%s %p, failed to set up connection to ip:%s:%hu", pRpc->label, pConn,
pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort); pConn->peerIpstr, pConn->peerPort);
terrno = TSDB_CODE_NETWORK_UNAVAIL; terrno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn); rpcCloseConn(pConn);
pConn = NULL; pConn = NULL;
...@@ -418,14 +429,14 @@ static void rpcCloseConn(void *thandle) { ...@@ -418,14 +429,14 @@ static void rpcCloseConn(void *thandle) {
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
if (pConn->meterId[0]) { if (pConn->user[0]) {
pConn->meterId[0] = 0; pConn->user[0] = 0;
if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle);
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer); taosTmrStopA(&pConn->pIdleTimer);
if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) { if ( pRpc->connType == TAOS_CONN_UDPS || pRpc->connType == TAOS_CONN_TCPS) {
char hashstr[40] = {0}; char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId);
taosDeleteStrHash(pRpc->hash, hashstr); taosDeleteStrHash(pRpc->hash, hashstr);
...@@ -471,7 +482,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { ...@@ -471,7 +482,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
return pConn; return pConn;
} }
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr) { static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr) {
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
// check if it is already allocated // check if it is already allocated
...@@ -486,13 +497,13 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash ...@@ -486,13 +497,13 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash
} else { } else {
pConn = pRpc->connList + sid; pConn = pRpc->connList + sid;
memset(pConn, 0, sizeof(SRpcConn)); memset(pConn, 0, sizeof(SRpcConn));
memcpy(pConn->meterId, meterId, tListLen(pConn->meterId)); memcpy(pConn->user, user, tListLen(pConn->user));
pConn->pRpc = pRpc; pConn->pRpc = pRpc;
pConn->sid = sid; pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
if (pRpc->afp && (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) { if (pRpc->afp && (*pRpc->afp)(user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) {
tWarn("%s %p, meterId not there", pRpc->label, pConn); tWarn("%s %p, user not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released taosFreeId(pRpc->idPool, sid); // sid shall be released
terrno = TSDB_CODE_INVALID_USER; terrno = TSDB_CODE_INVALID_USER;
pConn = NULL; pConn = NULL;
...@@ -501,24 +512,24 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash ...@@ -501,24 +512,24 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash
if (pConn) { if (pConn) {
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->meterId); tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->user);
} }
return pConn; return pConn;
} }
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr) { static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr) {
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
if (sid) { if (sid) {
pConn = pRpc->connList + sid; pConn = pRpc->connList + sid;
} else { } else {
pConn = rpcAllocateServerConn(pRpc, meterId, hashstr); pConn = rpcAllocateServerConn(pRpc, user, hashstr);
} }
if (pConn) { if (pConn) {
if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) { if (memcmp(pConn->user, user, tListLen(pConn->user)) != 0) {
tTrace("%s %p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId); tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, user);
terrno = TSDB_CODE_MISMATCHED_METER_ID; terrno = TSDB_CODE_MISMATCHED_METER_ID;
pConn = NULL; pConn = NULL;
} }
...@@ -530,7 +541,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has ...@@ -530,7 +541,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has
SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) { SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) {
SRpcConn *pConn; SRpcConn *pConn;
pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId); pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->user);
if ( pConn == NULL ) { if ( pConn == NULL ) {
char ipstr[20] = {0}; char ipstr[20] = {0};
tinet_ntoa(ipstr, ipSet.ip[ipSet.index]); tinet_ntoa(ipstr, ipSet.ip[ipSet.index]);
...@@ -654,7 +665,7 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int ...@@ -654,7 +665,7 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int
} }
if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHead->uid, pHead->sourceId); if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHead->uid, pHead->sourceId);
pConn = rpcGetConnObj(pRpc, sid, pHead->meterId, hashstr); pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr);
if (pConn == NULL ) return terrno; if (pConn == NULL ) return terrno;
*ppConn = pConn; *ppConn = pConn;
...@@ -687,6 +698,9 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int ...@@ -687,6 +698,9 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int
static void rpcProcessBrokenLink(SRpcConn *pConn) { static void rpcProcessBrokenLink(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
tTrace("%s %p, link is broken", pRpc->label, pConn);
pConn->chandle = NULL;
if (pConn->outType) { if (pConn->outType) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
pContext->code = TSDB_CODE_NETWORK_UNAVAIL; pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
...@@ -770,7 +784,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -770,7 +784,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
int32_t code = pHead->code; int32_t code = pHead->code;
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
pConn->pContext = NULL; pConn->pContext = NULL;
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId); rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->user);
if (code == TSDB_CODE_REDIRECT) { if (code == TSDB_CODE_REDIRECT) {
pContext->redirect = 1; pContext->redirect = 1;
...@@ -803,7 +817,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { ...@@ -803,7 +817,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->uid = 0; pHead->uid = 0;
memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code); pHead->code = htonl(code);
rpcSendMsgToPeer(pConn, msg, 0); rpcSendMsgToPeer(pConn, msg, 0);
...@@ -827,7 +841,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint ...@@ -827,7 +841,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint
pReplyHead->tranId = pRecvHead->tranId; pReplyHead->tranId = pRecvHead->tranId;
pReplyHead->sourceId = 0; pReplyHead->sourceId = 0;
pReplyHead->destId = pRecvHead->sourceId; pReplyHead->destId = pRecvHead->sourceId;
memcpy(pReplyHead->meterId, pRecvHead->meterId, tListLen(pReplyHead->meterId)); memcpy(pReplyHead->user, pRecvHead->user, tListLen(pReplyHead->user));
pReplyHead->code = htonl(code); pReplyHead->code = htonl(code);
msgLen = sizeof(SRpcHead); msgLen = sizeof(SRpcHead);
...@@ -874,7 +888,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -874,7 +888,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->port = 0; pHead->port = 0;
pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set the connection parameters // set the connection parameters
pConn->outType = msgType; pConn->outType = msgType;
...@@ -886,7 +900,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -886,7 +900,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
rpcSendMsgToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); //taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} }
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
...@@ -921,6 +935,8 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -921,6 +935,8 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
static void rpcProcessConnError(void *param, void *id) { static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcReqContext *)param; SRpcReqContext *pContext = (SRpcReqContext *)param;
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
tTrace("%s connection error happens", pRpc->label);
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) { if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
...@@ -940,7 +956,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -940,7 +956,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
if (pConn->outType && pConn->meterId[0]) { if (pConn->outType && pConn->user[0]) {
tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
pConn->pTimer = NULL; pConn->pTimer = NULL;
pConn->retry++; pConn->retry++;
...@@ -962,8 +978,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -962,8 +978,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL; if (reportDisc && pConn->pContext) {
if (reportDisc) { pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
rpcProcessConnError(pConn->pContext, NULL); rpcProcessConnError(pConn->pContext, NULL);
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
...@@ -973,7 +989,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { ...@@ -973,7 +989,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param; SRpcConn *pConn = (SRpcConn *)param;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if (pConn->meterId[0]) { if (pConn->user[0]) {
tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); tTrace("%s %p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn); rpcCloseConn(pConn);
} else { } else {
...@@ -987,7 +1003,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { ...@@ -987,7 +1003,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
if (pConn->inType && pConn->meterId[0]) { if (pConn->inType && pConn->user[0]) {
tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer);
...@@ -1015,7 +1031,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { ...@@ -1015,7 +1031,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
char *buf = malloc (contLen + overhead + 8); // 8 extra bytes char *buf = malloc (contLen + overhead + 8); // 8 extra bytes
if (buf == NULL) { if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno)); tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
return contLen; return contLen;
} }
...@@ -1033,7 +1049,6 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { ...@@ -1033,7 +1049,6 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
pHead->comp = 1; pHead->comp = 1;
tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead; finalLen = compLen + overhead;
} else { } else {
finalLen = contLen; finalLen = contLen;
...@@ -1055,20 +1070,20 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { ...@@ -1055,20 +1070,20 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
int contLen = htonl(pComp->contLen); int contLen = htonl(pComp->contLen);
// prepare the temporary buffer to decompress message // prepare the temporary buffer to decompress message
char *buf = rpcMallocCont(contLen); pNewHead = (SRpcHead *)malloc(contLen + RPC_MSG_OVERHEAD);
if (buf) { if (pNewHead) {
pNewHead = rpcHeadFromCont(buf);
int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen); int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
assert(originalLen == contLen); assert(origLen == contLen);
memcpy(pNewHead, pHead, sizeof(SRpcHead)); memcpy(pNewHead, pHead, sizeof(SRpcHead));
pNewHead->msgLen = rpcMsgLenFromCont(originalLen); pNewHead->msgLen = rpcMsgLenFromCont(origLen);
free(pHead); // free the compressed message buffer free(pHead); // free the compressed message buffer
pHead = pNewHead; pHead = pNewHead;
tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
} else { } else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno)); tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
} }
} }
......
...@@ -101,8 +101,8 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) { ...@@ -101,8 +101,8 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
// notify the upper layer, so it will clean the associated context // notify the upper layer, so it will clean the associated context
if (pFdObj->thandle) (*(pThreadObj->processData))(NULL, 0, 0, 0, pThreadObj->shandle, pFdObj->thandle, NULL); if (pFdObj->thandle) (*(pThreadObj->processData))(NULL, 0, 0, 0, pThreadObj->shandle, pFdObj->thandle, NULL);
tTrace("%s TCP thread:%d, FD is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId, tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId,
pThreadObj->numOfFds); pFdObj, pThreadObj->numOfFds);
memset(pFdObj, 0, sizeof(SFdObj)); memset(pFdObj, 0, sizeof(SFdObj));
...@@ -292,8 +292,8 @@ void taosAcceptTcpConnection(void *arg) { ...@@ -292,8 +292,8 @@ void taosAcceptTcpConnection(void *arg) {
pthread_mutex_unlock(&(pThreadObj->threadMutex)); pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s TCP thread:%d, a new connection, ip:%s port:%hu, numOfFds:%d", pServerObj->label, pThreadObj->threadId, tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
pFdObj->ipstr, pFdObj->port, pThreadObj->numOfFds); pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds);
// pick up next thread for next connection // pick up next thread for next connection
threadId++; threadId++;
......
...@@ -13,58 +13,189 @@ ...@@ -13,58 +13,189 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
//#define _DEFAULT_SOURCE #include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>
#include "os.h" #include "os.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "taoserror.h" #include "taoserror.h"
#include <stdint.h> #include <stdint.h>
#include <unistd.h>
void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { typedef struct {
dPrint("response is received, type:%d, contLen:%d code:%x:%s", type, contLen, code, tstrerror(code)); int index;
SRpcIpSet ipSet;
int num;
int numOfReqs;
int msgSize;
sem_t rspSem;
sem_t *pOverSem;
pthread_t thread;
void *pRpc;
} SInfo;
void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
SInfo *pInfo = (SInfo *)ahandle;
tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, type, contLen, code);
if (pCont) rpcFreeCont(pCont);
sem_post(&pInfo->rspSem);
} }
void processUpdate(void *handle, SRpcIpSet *pIpSet) { void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
dPrint("ip set is changed, index:%d", pIpSet->index); SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->index);
pInfo->ipSet = *pIpSet;
} }
int32_t main(int32_t argc, char *argv[]) { int tcount = 0;
taosInitLog("client.log", 100000, 10); void *sendRequest(void *param) {
dPrint("unit test for rpc module"); SInfo *pInfo = (SInfo *)param;
char *cont;
tTrace("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
cont = rpcMallocCont(pInfo->msgSize);
tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, 1, cont, pInfo->msgSize, pInfo);
if ( pInfo->num % 20000 == 0 )
tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
sem_wait(&pInfo->rspSem);
}
SRpcInit rpcInit; tTrace("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SRpcIpSet ipSet;
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char socketType[20] = "udp";
char serverIp[40] = "127.0.0.1";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
ipSet.numOfIps = 1;
ipSet.index = 0;
ipSet.port = 7000;
ipSet.ip[0] = inet_addr(serverIp);
ipSet.ip[1] = inet_addr("192.168.0.1");
// client info
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0"; rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "APP"; rpcInit.label = "APP";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processMsg; rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdate; rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 1000; rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_UDPC;
rpcInit.idleTime = 2000; rpcInit.idleTime = 2000;
rpcInit.meterId = "jefftao"; rpcInit.user = "michael";
rpcInit.secret = "password"; rpcInit.secret = "mypassword";
rpcInit.ckey = "key"; rpcInit.ckey = "key";
for (int i=1; i<argc; ++i) {
if ( strcmp(argv[i], "-c")==0 && i < argc-1 ) {
strcpy(socketType, argv[++i]);
} else if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
ipSet.ip[0] = inet_addr(argv[++i]);
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-c ctype]: connection type:udp or tpc, default is:%s\n", socketType);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-l localIp]: local IP address, default is:%s\n", rpcInit.localIp);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
rpcInit.connType = strcasecmp(socketType, "udp") == 0 ? TAOS_CONN_UDPC : TAOS_CONN_TCPC;
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
dError("failed to initialize rpc"); dError("failed to initialize RPC");
return -1; return -1;
} }
SRpcIpSet ipSet; tPrint("client is initialized");
ipSet.numOfIps = 2;
ipSet.index = 0; gettimeofday(&systemTime, NULL);
ipSet.port = 7000; startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
ipSet.ip[0] = inet_addr("192.168.0.1");
ipSet.ip[1] = inet_addr("127.0.0.1"); SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<appThreads; ++i) {
pInfo->index = i;
pInfo->ipSet = ipSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
sem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
usleep(1);
} while ( tcount < appThreads);
gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime)/1000.0; // mseconds
void *cont = rpcMallocCont(100); tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
rpcSendRequest(pRpc, &ipSet, 1, cont, 100, 1); tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000*numOfReqs*appThreads/usedTime, msgSize);
getchar(); taosCloseLog();
return 0; return 0;
} }
......
...@@ -19,12 +19,33 @@ ...@@ -19,12 +19,33 @@
#include "trpc.h" #include "trpc.h"
#include <stdint.h> #include <stdint.h>
void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { int msgSize = 128;
dPrint("request is received, type:%d, contLen:%d", type, contLen); int commit = 0;
void *rsp = rpcMallocCont(128); int dataFd = -1;
void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) {
static int num = 0;
tTrace("request is received, type:%d, contLen:%d", type, contLen);
if (dataFd >=0)
write(dataFd, pCont, contLen);
if (commit >=2) {
++num;
if ( fsync(dataFd) < 0 ) {
tPrint("failed to flush data to file, reason:%s", strerror(errno));
}
if (num % 10000 == 0) {
tPrint("%d request have been written into disk", num);
}
}
void *rsp = rpcMallocCont(msgSize);
//rpcSendResponse(ahandle, 1, rsp, 128); rpcSendResponse(thandle, 1, rsp, msgSize);
/*
SRpcIpSet ipSet; SRpcIpSet ipSet;
ipSet.numOfIps = 1; ipSet.numOfIps = 1;
ipSet.index = 0; ipSet.index = 0;
...@@ -32,46 +53,87 @@ void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code ...@@ -32,46 +53,87 @@ void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code
ipSet.ip[0] = inet_addr("192.168.0.2"); ipSet.ip[0] = inet_addr("192.168.0.2");
rpcSendRedirectRsp(ahandle, &ipSet); rpcSendRedirectRsp(ahandle, &ipSet);
*/
rpcFreeCont(pCont); rpcFreeCont(pCont);
} }
int32_t main(int32_t argc, char *argv[]) { int main(int argc, char *argv[]) {
taosInitLog("server.log", 100000, 10);
dPrint("unit test for rpc module");
SRpcInit rpcInit; SRpcInit rpcInit;
char socketType[20] = "udp";
char dataName[20] = "server.data";
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0"; rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 7000; rpcInit.localPort = 7000;
rpcInit.label = "APP"; rpcInit.label = "SER";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processMsg; rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000; rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_UDPS;
rpcInit.idleTime = 2000; rpcInit.idleTime = 2000;
rpcInit.meterId = "jefftao";
rpcInit.secret = "password"; for (int i=1; i<argc; ++i) {
rpcInit.ckey = "key"; if ( strcmp(argv[i], "-c")==0 && i < argc-1 ) {
strcpy(socketType, argv[++i]);
} else if (strcmp(argv[i], "-p")==0 && i < argc-1) {
rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-w")==0 && i < argc-1) {
commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-c ctype]: connection type:udp or tcp, default is:%s\n", socketType);
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
printf(" [-t threads]: number of threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
rpcInit.connType = strcasecmp(socketType, "udp") == 0 ? TAOS_CONN_UDPS : TAOS_CONN_TCPS;
taosInitLog("server.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
dError("failed to initialize rpc"); tError("failed to start RPC server");
return -1; return -1;
} }
/* tPrint("RPC server is running, ctrl-c to exit");
SRpcIpSet ipSet;
ipSet.numOfIps = 2;
ipSet.index = 0;
ipSet.ip[0] = inet_addr("127.0.0.1");
ipSet.ip[1] = inet_addr("192.168.0.1");
*/
dPrint("server is running..."); if (commit) {
dataFd = open(dataName, O_APPEND | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (dataFd<0)
tPrint("failed to open data file, reason:%s", strerror(errno));
}
// loop forever
while(1) {
sleep(1);
}
getchar(); if (dataFd >= 0) {
close(dataFd);
remove(dataName);
}
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册