未验证 提交 dd78eabd 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1245 from taosdata/refact/rpc

add more code for testing
......@@ -45,6 +45,12 @@ typedef struct {
uint32_t ip[TSDB_MAX_MPEERS];
} SRpcIpSet;
typedef struct {
uint32_t sourceIp;
uint16_t sourcePort;
char *user;
} SRpcConnInfo;
typedef struct {
char *localIp; // local IP used
uint16_t localPort; // local port
......@@ -55,7 +61,7 @@ typedef struct {
int idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client security only
char *meterId; // meter ID
char *user; // user name
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
......@@ -78,7 +84,7 @@ void rpcFreeCont(void *pCont);
void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle);
void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen);
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
#ifdef __cplusplus
}
......
......@@ -32,7 +32,7 @@ typedef struct {
uint32_t uid; // for unique ID inside a client
uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list
char meterId[TSDB_UNI_LEN];
char user[TSDB_UNI_LEN];
uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved
uint8_t msgType; // message type
......
......@@ -72,7 +72,8 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
pTcp->numOfFds--;
if (pTcp->numOfFds < 0) tError("%s number of TCP FDs shall never be negative", pTcp->label);
if (pTcp->numOfFds < 0)
tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj);
// remove from the FdObject list
......@@ -91,7 +92,7 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
// notify the upper layer to clean the associated context
if (pFdObj->thandle) (*(pTcp->processData))(NULL, 0, 0, 0, pTcp->shandle, pFdObj->thandle, NULL);
tTrace("%s TCP FD is cleaned up, numOfFds:%d", pTcp->label, pTcp->numOfFds);
tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds);
memset(pFdObj, 0, sizeof(STcpFd));
......@@ -302,7 +303,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16
pthread_mutex_unlock(&(pTcp->mutex));
tTrace("%s TCP connection to ip:%s port:%hu is created, numOfFds:%d", pTcp->label, ip, port, pTcp->numOfFds);
tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds);
return pFdObj;
}
......
......@@ -49,14 +49,14 @@ typedef struct {
int connType;
char label[12];
char meterId[TSDB_UNI_LEN]; // meter ID
char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
void *idPool; // handle to ID pool
......@@ -86,7 +86,7 @@ typedef struct _RpcConn {
int sid; // session ID
uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID
char meterId[TSDB_UNI_LEN]; // user ID for the link
char user[TSDB_UNI_LEN]; // user ID for the link
char spi; // security parameter index
char encrypt; // encryption, 0:1
char secret[TSDB_KEY_LEN]; // secret for the link
......@@ -95,7 +95,7 @@ typedef struct _RpcConn {
uint32_t peerUid; // peer UID
uint32_t peerIp; // peer IP
uint16_t peerPort; // peer port
char peerIpstr[20]; // peer IP string
char peerIpstr[TSDB_IPv4ADDR_LEN]; // peer IP string
uint16_t tranId; // outgoing transcation ID, for build message
uint16_t outTranId; // outgoing transcation ID
uint16_t inTranId; // transcation ID for incoming msg
......@@ -160,8 +160,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort)
static void rpcCloseConn(void *thandle);
static SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet);
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr);
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
......@@ -198,7 +198,7 @@ void *rpcOpen(SRpcInit *pInit) {
pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp;
pRpc->sessions = pInit->sessions;
if (pInit->meterId) strcpy(pRpc->meterId, pInit->meterId);
if (pInit->user) strcpy(pRpc->user, pInit->user);
if (pInit->secret) strcpy(pRpc->secret, pInit->secret);
if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey);
pRpc->spi = pInit->spi;
......@@ -263,7 +263,7 @@ void rpcClose(void *param) {
(*taosCleanUpConn[pRpc->connType])(pRpc->shandle);
for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList[i].meterId[0]) {
if (pRpc->connList[i].user[0]) {
rpcCloseConn((void *)(pRpc->connList + i));
}
}
......@@ -292,8 +292,10 @@ void *rpcMallocCont(int size) {
}
void rpcFreeCont(void *cont) {
char *msg = ((char *)cont) - sizeof(SRpcHead);
free(msg);
if ( cont ) {
char *msg = ((char *)cont) - sizeof(SRpcHead);
free(msg);
}
}
void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) {
......@@ -333,7 +335,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pthread_mutex_lock(&pRpc->mutex);
if ( pConn->inType == 0 || pConn->meterId[0] == 0 ) {
if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn);
pthread_mutex_lock(&pRpc->mutex);
return;
......@@ -350,7 +352,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pHead->destId = pConn->peerId;
pHead->uid = 0;
pHead->code = htonl(code);
memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId));
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set pConn parameters
pConn->inType = 0;
......@@ -383,6 +385,15 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) {
return;
}
void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = pConn->pRpc;
pInfo->sourceIp = pConn->peerIp;
pInfo->sourcePort = pConn->peerPort;
strcpy(pInfo->user, pConn->user);
}
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) {
SRpcConn *pConn;
......@@ -392,16 +403,16 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort)
strcpy(pConn->peerIpstr, peerIpStr);
pConn->peerIp = inet_addr(peerIpStr);
pConn->peerPort = peerPort;
strcpy(pConn->meterId, pRpc->meterId);
strcpy(pConn->user, pRpc->user);
if (taosOpenConn[pRpc->connType]) {
pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label,
pConn, pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
pConn, pConn->sid, pRpc->user, pConn->peerIpstr, pConn->peerPort, pConn->localPort);
} else {
tError("%s %p, failed to set up nw connection to ip:%s:%hu", pRpc->label, pConn,
pConn->sid, pRpc->meterId, pConn->peerIpstr, pConn->peerPort);
tError("%s %p, failed to set up connection to ip:%s:%hu", pRpc->label, pConn,
pConn->peerIpstr, pConn->peerPort);
terrno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn);
pConn = NULL;
......@@ -418,14 +429,14 @@ static void rpcCloseConn(void *thandle) {
pthread_mutex_lock(&pRpc->mutex);
if (pConn->meterId[0]) {
pConn->meterId[0] = 0;
if (pConn->user[0]) {
pConn->user[0] = 0;
if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle);
taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer);
if ( pRpc->connType == TAOS_CONN_UDPS || TAOS_CONN_TCPS) {
if ( pRpc->connType == TAOS_CONN_UDPS || pRpc->connType == TAOS_CONN_TCPS) {
char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId);
taosDeleteStrHash(pRpc->hash, hashstr);
......@@ -471,7 +482,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
return pConn;
}
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hashstr) {
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr) {
SRpcConn *pConn = NULL;
// check if it is already allocated
......@@ -486,13 +497,13 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash
} else {
pConn = pRpc->connList + sid;
memset(pConn, 0, sizeof(SRpcConn));
memcpy(pConn->meterId, meterId, tListLen(pConn->meterId));
memcpy(pConn->user, user, tListLen(pConn->user));
pConn->pRpc = pRpc;
pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid);
if (pRpc->afp && (*pRpc->afp)(meterId, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) {
tWarn("%s %p, meterId not there", pRpc->label, pConn);
if (pRpc->afp && (*pRpc->afp)(user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) {
tWarn("%s %p, user not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released
terrno = TSDB_CODE_INVALID_USER;
pConn = NULL;
......@@ -501,24 +512,24 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *meterId, char *hash
if (pConn) {
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->meterId);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->user);
}
return pConn;
}
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *hashstr) {
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr) {
SRpcConn *pConn = NULL;
if (sid) {
pConn = pRpc->connList + sid;
} else {
pConn = rpcAllocateServerConn(pRpc, meterId, hashstr);
pConn = rpcAllocateServerConn(pRpc, user, hashstr);
}
if (pConn) {
if (memcmp(pConn->meterId, meterId, tListLen(pConn->meterId)) != 0) {
tTrace("%s %p, meterId:%s is not matched, received:%s", pRpc->label, pConn, pConn->meterId, meterId);
if (memcmp(pConn->user, user, tListLen(pConn->user)) != 0) {
tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, user);
terrno = TSDB_CODE_MISMATCHED_METER_ID;
pConn = NULL;
}
......@@ -530,7 +541,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has
SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) {
SRpcConn *pConn;
pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId);
pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->user);
if ( pConn == NULL ) {
char ipstr[20] = {0};
tinet_ntoa(ipstr, ipSet.ip[ipSet.index]);
......@@ -654,7 +665,7 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int
}
if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHead->uid, pHead->sourceId);
pConn = rpcGetConnObj(pRpc, sid, pHead->meterId, hashstr);
pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr);
if (pConn == NULL ) return terrno;
*ppConn = pConn;
......@@ -687,6 +698,9 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int
static void rpcProcessBrokenLink(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc;
tTrace("%s %p, link is broken", pRpc->label, pConn);
pConn->chandle = NULL;
if (pConn->outType) {
SRpcReqContext *pContext = pConn->pContext;
pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
......@@ -770,7 +784,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
int32_t code = pHead->code;
SRpcReqContext *pContext = pConn->pContext;
pConn->pContext = NULL;
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->user);
if (code == TSDB_CODE_REDIRECT) {
pContext->redirect = 1;
......@@ -803,7 +817,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->uid = 0;
memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId));
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code);
rpcSendMsgToPeer(pConn, msg, 0);
......@@ -827,7 +841,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint
pReplyHead->tranId = pRecvHead->tranId;
pReplyHead->sourceId = 0;
pReplyHead->destId = pRecvHead->sourceId;
memcpy(pReplyHead->meterId, pRecvHead->meterId, tListLen(pReplyHead->meterId));
memcpy(pReplyHead->user, pRecvHead->user, tListLen(pReplyHead->user));
pReplyHead->code = htonl(code);
msgLen = sizeof(SRpcHead);
......@@ -874,7 +888,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->destId = pConn->peerId;
pHead->port = 0;
pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId));
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set the connection parameters
pConn->outType = msgType;
......@@ -886,7 +900,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pthread_mutex_unlock(&pRpc->mutex);
rpcSendMsgToPeer(pConn, msg, msgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
//taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
}
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
......@@ -921,6 +935,8 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcReqContext *)param;
SRpcInfo *pRpc = pContext->pRpc;
tTrace("%s connection error happens", pRpc->label);
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
......@@ -940,7 +956,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pthread_mutex_lock(&pRpc->mutex);
if (pConn->outType && pConn->meterId[0]) {
if (pConn->outType && pConn->user[0]) {
tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
pConn->pTimer = NULL;
pConn->retry++;
......@@ -962,8 +978,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pthread_mutex_unlock(&pRpc->mutex);
pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
if (reportDisc) {
if (reportDisc && pConn->pContext) {
pConn->pContext->code = TSDB_CODE_NETWORK_UNAVAIL;
rpcProcessConnError(pConn->pContext, NULL);
rpcCloseConn(pConn);
}
......@@ -973,7 +989,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->meterId[0]) {
if (pConn->user[0]) {
tTrace("%s %p, close the connection since no activity", pRpc->label, pConn);
rpcCloseConn(pConn);
} else {
......@@ -987,7 +1003,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
pthread_mutex_lock(&pRpc->mutex);
if (pConn->inType && pConn->meterId[0]) {
if (pConn->inType && pConn->user[0]) {
tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer);
......@@ -1015,7 +1031,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
char *buf = malloc (contLen + overhead + 8); // 8 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d, reason:%s", contLen, strerror(errno));
tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
return contLen;
}
......@@ -1033,7 +1049,6 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
pHead->comp = 1;
tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead;
} else {
finalLen = contLen;
......@@ -1055,20 +1070,20 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
int contLen = htonl(pComp->contLen);
// prepare the temporary buffer to decompress message
char *buf = rpcMallocCont(contLen);
pNewHead = (SRpcHead *)malloc(contLen + RPC_MSG_OVERHEAD);
if (buf) {
pNewHead = rpcHeadFromCont(buf);
if (pNewHead) {
int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen);
assert(originalLen == contLen);
int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
assert(origLen == contLen);
memcpy(pNewHead, pHead, sizeof(SRpcHead));
pNewHead->msgLen = rpcMsgLenFromCont(originalLen);
pNewHead->msgLen = rpcMsgLenFromCont(origLen);
free(pHead); // free the compressed message buffer
pHead = pNewHead;
tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
} else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
}
}
......
......@@ -101,8 +101,8 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
// notify the upper layer, so it will clean the associated context
if (pFdObj->thandle) (*(pThreadObj->processData))(NULL, 0, 0, 0, pThreadObj->shandle, pFdObj->thandle, NULL);
tTrace("%s TCP thread:%d, FD is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId,
pThreadObj->numOfFds);
tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId,
pFdObj, pThreadObj->numOfFds);
memset(pFdObj, 0, sizeof(SFdObj));
......@@ -292,8 +292,8 @@ void taosAcceptTcpConnection(void *arg) {
pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s TCP thread:%d, a new connection, ip:%s port:%hu, numOfFds:%d", pServerObj->label, pThreadObj->threadId,
pFdObj->ipstr, pFdObj->port, pThreadObj->numOfFds);
tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds);
// pick up next thread for next connection
threadId++;
......
......@@ -13,58 +13,189 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include "taoserror.h"
#include <stdint.h>
#include <unistd.h>
void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
dPrint("response is received, type:%d, contLen:%d code:%x:%s", type, contLen, code, tstrerror(code));
typedef struct {
int index;
SRpcIpSet ipSet;
int num;
int numOfReqs;
int msgSize;
sem_t rspSem;
sem_t *pOverSem;
pthread_t thread;
void *pRpc;
} SInfo;
void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
SInfo *pInfo = (SInfo *)ahandle;
tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, type, contLen, code);
if (pCont) rpcFreeCont(pCont);
sem_post(&pInfo->rspSem);
}
void processUpdate(void *handle, SRpcIpSet *pIpSet) {
dPrint("ip set is changed, index:%d", pIpSet->index);
void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->index);
pInfo->ipSet = *pIpSet;
}
int32_t main(int32_t argc, char *argv[]) {
int tcount = 0;
taosInitLog("client.log", 100000, 10);
dPrint("unit test for rpc module");
void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
char *cont;
tTrace("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
cont = rpcMallocCont(pInfo->msgSize);
tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, 1, cont, pInfo->msgSize, pInfo);
if ( pInfo->num % 20000 == 0 )
tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
sem_wait(&pInfo->rspSem);
}
SRpcInit rpcInit;
tTrace("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SRpcIpSet ipSet;
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char socketType[20] = "udp";
char serverIp[40] = "127.0.0.1";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
ipSet.numOfIps = 1;
ipSet.index = 0;
ipSet.port = 7000;
ipSet.ip[0] = inet_addr(serverIp);
ipSet.ip[1] = inet_addr("192.168.0.1");
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processMsg;
rpcInit.ufp = processUpdate;
rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_UDPC;
rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 100;
rpcInit.idleTime = 2000;
rpcInit.meterId = "jefftao";
rpcInit.secret = "password";
rpcInit.user = "michael";
rpcInit.secret = "mypassword";
rpcInit.ckey = "key";
for (int i=1; i<argc; ++i) {
if ( strcmp(argv[i], "-c")==0 && i < argc-1 ) {
strcpy(socketType, argv[++i]);
} else if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
ipSet.ip[0] = inet_addr(argv[++i]);
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-c ctype]: connection type:udp or tpc, default is:%s\n", socketType);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-l localIp]: local IP address, default is:%s\n", rpcInit.localIp);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
rpcInit.connType = strcasecmp(socketType, "udp") == 0 ? TAOS_CONN_UDPC : TAOS_CONN_TCPC;
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
dError("failed to initialize rpc");
dError("failed to initialize RPC");
return -1;
}
SRpcIpSet ipSet;
ipSet.numOfIps = 2;
ipSet.index = 0;
ipSet.port = 7000;
ipSet.ip[0] = inet_addr("192.168.0.1");
ipSet.ip[1] = inet_addr("127.0.0.1");
tPrint("client is initialized");
gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<appThreads; ++i) {
pInfo->index = i;
pInfo->ipSet = ipSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
sem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
usleep(1);
} while ( tcount < appThreads);
gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime)/1000.0; // mseconds
void *cont = rpcMallocCont(100);
rpcSendRequest(pRpc, &ipSet, 1, cont, 100, 1);
tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000*numOfReqs*appThreads/usedTime, msgSize);
getchar();
taosCloseLog();
return 0;
}
......
......@@ -19,12 +19,33 @@
#include "trpc.h"
#include <stdint.h>
void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
dPrint("request is received, type:%d, contLen:%d", type, contLen);
void *rsp = rpcMallocCont(128);
int msgSize = 128;
int commit = 0;
int dataFd = -1;
void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) {
static int num = 0;
tTrace("request is received, type:%d, contLen:%d", type, contLen);
if (dataFd >=0)
write(dataFd, pCont, contLen);
if (commit >=2) {
++num;
if ( fsync(dataFd) < 0 ) {
tPrint("failed to flush data to file, reason:%s", strerror(errno));
}
if (num % 10000 == 0) {
tPrint("%d request have been written into disk", num);
}
}
void *rsp = rpcMallocCont(msgSize);
//rpcSendResponse(ahandle, 1, rsp, 128);
rpcSendResponse(thandle, 1, rsp, msgSize);
/*
SRpcIpSet ipSet;
ipSet.numOfIps = 1;
ipSet.index = 0;
......@@ -32,46 +53,87 @@ void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code
ipSet.ip[0] = inet_addr("192.168.0.2");
rpcSendRedirectRsp(ahandle, &ipSet);
*/
rpcFreeCont(pCont);
}
int32_t main(int32_t argc, char *argv[]) {
taosInitLog("server.log", 100000, 10);
dPrint("unit test for rpc module");
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
char socketType[20] = "udp";
char dataName[20] = "server.data";
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 7000;
rpcInit.label = "APP";
rpcInit.label = "SER";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processMsg;
rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_UDPS;
rpcInit.idleTime = 2000;
rpcInit.meterId = "jefftao";
rpcInit.secret = "password";
rpcInit.ckey = "key";
for (int i=1; i<argc; ++i) {
if ( strcmp(argv[i], "-c")==0 && i < argc-1 ) {
strcpy(socketType, argv[++i]);
} else if (strcmp(argv[i], "-p")==0 && i < argc-1) {
rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-w")==0 && i < argc-1) {
commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-c ctype]: connection type:udp or tcp, default is:%s\n", socketType);
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
printf(" [-t threads]: number of threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
rpcInit.connType = strcasecmp(socketType, "udp") == 0 ? TAOS_CONN_UDPS : TAOS_CONN_TCPS;
taosInitLog("server.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
dError("failed to initialize rpc");
tError("failed to start RPC server");
return -1;
}
/*
SRpcIpSet ipSet;
ipSet.numOfIps = 2;
ipSet.index = 0;
ipSet.ip[0] = inet_addr("127.0.0.1");
ipSet.ip[1] = inet_addr("192.168.0.1");
*/
tPrint("RPC server is running, ctrl-c to exit");
dPrint("server is running...");
if (commit) {
dataFd = open(dataName, O_APPEND | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (dataFd<0)
tPrint("failed to open data file, reason:%s", strerror(errno));
}
// loop forever
while(1) {
sleep(1);
}
getchar();
if (dataFd >= 0) {
close(dataFd);
remove(dataName);
}
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册