未验证 提交 15506d57 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1256 from taosdata/refact/rpc

Refact/rpc
...@@ -29,8 +29,8 @@ extern "C" { ...@@ -29,8 +29,8 @@ extern "C" {
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
typedef struct { typedef struct {
int16_t index; int8_t inUse;
int16_t numOfIps; int8_t numOfIps;
uint16_t port; uint16_t port;
uint32_t ip[TSDB_MAX_MPEERS]; uint32_t ip[TSDB_MAX_MPEERS];
} SRpcIpSet; } SRpcIpSet;
...@@ -43,13 +43,13 @@ typedef struct { ...@@ -43,13 +43,13 @@ typedef struct {
} SRpcConnInfo; } SRpcConnInfo;
typedef struct { typedef struct {
char *localIp; // local IP used char *localIp; // local IP used
uint16_t localPort; // local port uint16_t localPort; // local port
char *label; // for debug purpose char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed int sessions; // number of sessions allowed
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled int idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
...@@ -72,6 +72,7 @@ void *rpcOpen(SRpcInit *pRpc); ...@@ -72,6 +72,7 @@ void *rpcOpen(SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle);
void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen);
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
......
...@@ -22,8 +22,8 @@ extern "C" { ...@@ -22,8 +22,8 @@ extern "C" {
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
void rpcCloseConnCache(void *handle); void rpcCloseConnCache(void *handle);
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user); void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType);
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user); void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "os.h" #include "os.h"
#include "tglobalcfg.h" #include "tglobalcfg.h"
#include "tlog.h" #include "tlog.h"
#include "tmempool.h" #include "tmempool.h"
...@@ -26,6 +25,7 @@ ...@@ -26,6 +25,7 @@
typedef struct _c_hash_t { typedef struct _c_hash_t {
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
char connType;
struct _c_hash_t *prev; struct _c_hash_t *prev;
struct _c_hash_t *next; struct _c_hash_t *next;
void * data; void * data;
...@@ -43,49 +43,77 @@ typedef struct { ...@@ -43,49 +43,77 @@ typedef struct {
void (*cleanFp)(void *); void (*cleanFp)(void *);
void *tmrCtrl; void *tmrCtrl;
void *pTimer; void *pTimer;
int64_t *lockedBy;
} SConnCache; } SConnCache;
int rpcHashConn(void *handle, uint32_t ip, uint16_t port, char *user) { static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType);
SConnCache *pCache = (SConnCache *)handle; static void rpcLockCache(int64_t *lockedBy);
int hash = 0; static void rpcUnlockCache(int64_t *lockedBy);
// size_t user_len = strlen(user); static void rpcCleanConnCache(void *handle, void *tmrId);
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time);
hash = ip >> 16; void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
hash += (unsigned short)(ip & 0xFFFF); SConnHash **connHashList;
hash += port; mpool_h connHashMemPool;
while (*user != '\0') { SConnCache *pCache;
hash += *user;
user++; connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash));
if (connHashMemPool == 0) return NULL;
connHashList = calloc(sizeof(SConnHash *), maxSessions);
if (connHashList == 0) {
taosMemPoolCleanUp(connHashMemPool);
return NULL;
} }
hash = hash % pCache->maxSessions; pCache = malloc(sizeof(SConnCache));
if (pCache == NULL) {
taosMemPoolCleanUp(connHashMemPool);
free(connHashList);
return NULL;
}
memset(pCache, 0, sizeof(SConnCache));
return hash; pCache->count = calloc(sizeof(int), maxSessions);
pCache->total = 0;
pCache->keepTimer = keepTimer;
pCache->maxSessions = maxSessions;
pCache->connHashMemPool = connHashMemPool;
pCache->connHashList = connHashList;
pCache->cleanFp = cleanFp;
pCache->tmrCtrl = tmrCtrl;
pCache->lockedBy = calloc(sizeof(int64_t), maxSessions);
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer);
pthread_mutex_init(&pCache->mutex, NULL);
return pCache;
} }
void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { void rpcCloseConnCache(void *handle) {
if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return; SConnCache *pCache;
SConnHash *pPrev = pNode->prev, *pNext; pCache = (SConnCache *)handle;
if (pCache == NULL || pCache->maxSessions == 0) return;
while (pNode) { pthread_mutex_lock(&pCache->mutex);
(*pCache->cleanFp)(pNode->data);
pNext = pNode->next;
pCache->total--;
pCache->count[hash]--;
tTrace("%p ip:0x%x:%hu:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, hash, pNode,
pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext;
}
if (pPrev) taosTmrStopA(&(pCache->pTimer));
pPrev->next = NULL;
else if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool);
pCache->connHashList[hash] = NULL;
tfree(pCache->connHashList);
tfree(pCache->count)
pthread_mutex_unlock(&pCache->mutex);
pthread_mutex_destroy(&pCache->mutex);
memset(pCache, 0, sizeof(SConnCache));
free(pCache);
} }
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) { void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType) {
int hash; int hash;
SConnHash * pNode; SConnHash * pNode;
SConnCache *pCache; SConnCache *pCache;
...@@ -96,54 +124,34 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, c ...@@ -96,54 +124,34 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, c
assert(pCache); assert(pCache);
assert(data); assert(data);
hash = rpcHashConn(pCache, ip, port, user); hash = rpcHashConn(pCache, ip, port, connType);
pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool); pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
pNode->ip = ip; pNode->ip = ip;
pNode->port = port; pNode->port = port;
pNode->connType = connType;
pNode->data = data; pNode->data = data;
pNode->prev = NULL; pNode->prev = NULL;
pNode->time = time; pNode->time = time;
pthread_mutex_lock(&pCache->mutex); rpcLockCache(pCache->lockedBy+hash);
pNode->next = pCache->connHashList[hash]; pNode->next = pCache->connHashList[hash];
if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode; if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
pCache->connHashList[hash] = pNode; pCache->connHashList[hash] = pNode;
pCache->total++;
pCache->count[hash]++; pCache->count[hash]++;
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
pthread_mutex_unlock(&pCache->mutex); rpcUnlockCache(pCache->lockedBy+hash);
tTrace("%p ip:0x%x:%hu:%d:%p added into cache, connections:%d", data, ip, port, hash, pNode, pCache->count[hash]);
return;
}
void rpcCleanConnCache(void *handle, void *tmrId) {
int hash;
SConnHash * pNode;
SConnCache *pCache;
pCache = (SConnCache *)handle; pCache->total++;
if (pCache == NULL || pCache->maxSessions == 0) return;
if (pCache->pTimer != tmrId) return;
uint64_t time = taosGetTimestampMs(); tTrace("%p ip:0x%x:%hu:%d:%d:%p added into cache, connections:%d", data, ip, port, connType, hash, pNode, pCache->count[hash]);
for (hash = 0; hash < pCache->maxSessions; ++hash) { return;
pthread_mutex_lock(&pCache->mutex);
pNode = pCache->connHashList[hash];
rpcRemoveExpiredNodes(pCache, pNode, hash, time);
pthread_mutex_unlock(&pCache->mutex);
}
// tTrace("timer, total connections in cache:%d", pCache->total);
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer);
} }
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) { void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType) {
int hash; int hash;
SConnHash * pNode; SConnHash * pNode;
SConnCache *pCache; SConnCache *pCache;
...@@ -154,8 +162,8 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) ...@@ -154,8 +162,8 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user)
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
hash = rpcHashConn(pCache, ip, port, user); hash = rpcHashConn(pCache, ip, port, connType);
pthread_mutex_lock(&pCache->mutex); rpcLockCache(pCache->lockedBy+hash);
pNode = pCache->connHashList[hash]; pNode = pCache->connHashList[hash];
while (pNode) { while (pNode) {
...@@ -165,7 +173,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) ...@@ -165,7 +173,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user)
break; break;
} }
if (pNode->ip == ip && pNode->port == port) break; if (pNode->ip == ip && pNode->port == port && pNode->connType == connType) break;
pNode = pNode->next; pNode = pNode->next;
} }
...@@ -189,71 +197,87 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) ...@@ -189,71 +197,87 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user)
pCache->count[hash]--; pCache->count[hash]--;
} }
pthread_mutex_unlock(&pCache->mutex); rpcUnlockCache(pCache->lockedBy+hash);
if (pData) { if (pData) {
tTrace("%p ip:0x%x:%hu:%d:%p retrieved from cache, connections:%d", pData, ip, port, hash, pNode, pCache->count[hash]); tTrace("%p ip:0x%x:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, ip, port, connType, hash, pNode, pCache->count[hash]);
} }
return pData; return pData;
} }
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { static void rpcCleanConnCache(void *handle, void *tmrId) {
SConnHash **connHashList; int hash;
mpool_h connHashMemPool; SConnHash * pNode;
SConnCache *pCache; SConnCache *pCache;
connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash)); pCache = (SConnCache *)handle;
if (connHashMemPool == 0) return NULL; if (pCache == NULL || pCache->maxSessions == 0) return;
if (pCache->pTimer != tmrId) return;
connHashList = calloc(sizeof(SConnHash *), maxSessions); uint64_t time = taosGetTimestampMs();
if (connHashList == 0) {
taosMemPoolCleanUp(connHashMemPool);
return NULL;
}
pCache = malloc(sizeof(SConnCache)); for (hash = 0; hash < pCache->maxSessions; ++hash) {
if (pCache == NULL) { rpcLockCache(pCache->lockedBy+hash);
taosMemPoolCleanUp(connHashMemPool); pNode = pCache->connHashList[hash];
free(connHashList); rpcRemoveExpiredNodes(pCache, pNode, hash, time);
return NULL; rpcUnlockCache(pCache->lockedBy+hash);
} }
memset(pCache, 0, sizeof(SConnCache));
pCache->count = calloc(sizeof(int), maxSessions); // tTrace("timer, total connections in cache:%d", pCache->total);
pCache->total = 0;
pCache->keepTimer = keepTimer;
pCache->maxSessions = maxSessions;
pCache->connHashMemPool = connHashMemPool;
pCache->connHashList = connHashList;
pCache->cleanFp = cleanFp;
pCache->tmrCtrl = tmrCtrl;
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer);
pthread_mutex_init(&pCache->mutex, NULL);
return pCache;
} }
void rpcCloseConnCache(void *handle) { static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
SConnCache *pCache; if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return;
pCache = (SConnCache *)handle; SConnHash *pPrev = pNode->prev, *pNext;
if (pCache == NULL || pCache->maxSessions == 0) return;
pthread_mutex_lock(&pCache->mutex); while (pNode) {
(*pCache->cleanFp)(pNode->data);
pNext = pNode->next;
pCache->total--;
pCache->count[hash]--;
tTrace("%p ip:0x%x:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, pNode->connType, hash, pNode,
pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext;
}
taosTmrStopA(&(pCache->pTimer)); if (pPrev)
pPrev->next = NULL;
else
pCache->connHashList[hash] = NULL;
}
if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool); static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType) {
SConnCache *pCache = (SConnCache *)handle;
int hash = 0;
tfree(pCache->connHashList); hash = ip >> 16;
tfree(pCache->count) hash += (unsigned short)(ip & 0xFFFF);
hash += port;
hash += connType;
hash = hash % pCache->maxSessions;
pthread_mutex_unlock(&pCache->mutex); return hash;
}
pthread_mutex_destroy(&pCache->mutex); static void rpcLockCache(int64_t *lockedBy) {
int64_t tid = taosGetPthreadId();
int i = 0;
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
if (++i % 100 == 0) {
sched_yield();
}
}
}
memset(pCache, 0, sizeof(SConnCache)); static void rpcUnlockCache(int64_t *lockedBy) {
free(pCache); int64_t tid = taosGetPthreadId();
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
assert(false);
}
} }
...@@ -41,12 +41,13 @@ ...@@ -41,12 +41,13 @@
#define rpcIsReq(type) (type & 1U) #define rpcIsReq(type) (type & 1U)
typedef struct { typedef struct {
int sessions; int sessions; // number of sessions allowed
int numOfThreads; int numOfThreads; // number of threads to process incoming messages
int idleTime; // milliseconds; int idleTime; // milliseconds;
char localIp[TSDB_IPv4ADDR_LEN]; char localIp[TSDB_IPv4ADDR_LEN];
uint16_t localPort; uint16_t localPort;
int connType; int8_t connType;
int index; // for UDP server only, round robin for multiple threads
char label[12]; char label[12];
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
...@@ -78,7 +79,7 @@ typedef struct { ...@@ -78,7 +79,7 @@ typedef struct {
int32_t contLen; // content length int32_t contLen; // content length
int32_t code; // error code int32_t code; // error code
int16_t numOfTry; // number of try for different servers int16_t numOfTry; // number of try for different servers
int8_t oldIndex; // server IP index passed by app int8_t oldInUse; // server IP inUse passed by app
int8_t redirect; // flag to indicate redirect int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type int8_t connType; // connection type
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
...@@ -115,7 +116,7 @@ typedef struct _RpcConn { ...@@ -115,7 +116,7 @@ typedef struct _RpcConn {
char *pReqMsg; // request message including header char *pReqMsg; // request message including header
int reqMsgLen; // request message length int reqMsgLen; // request message length
SRpcInfo *pRpc; // the associated SRpcInfo SRpcInfo *pRpc; // the associated SRpcInfo
int connType; // connection type int8_t connType; // connection type
int64_t lockedBy; // lock for connection int64_t lockedBy; // lock for connection
SRpcReqContext *pContext; // request context SRpcReqContext *pContext; // request context
} SRpcConn; } SRpcConn;
...@@ -172,8 +173,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, ...@@ -172,8 +173,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort,
static void rpcCloseConn(void *thandle); static void rpcCloseConn(void *thandle);
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext); static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
...@@ -207,8 +208,7 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -207,8 +208,7 @@ void *rpcOpen(SRpcInit *pInit) {
if(pInit->label) strcpy(pRpc->label, pInit->label); if(pInit->label) strcpy(pRpc->label, pInit->label);
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
// pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
pRpc->numOfThreads = 1;
if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp); if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp);
pRpc->localPort = pInit->localPort; pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
...@@ -299,17 +299,16 @@ void rpcClose(void *param) { ...@@ -299,17 +299,16 @@ void rpcClose(void *param) {
tfree(pRpc); tfree(pRpc);
} }
void *rpcMallocCont(int size) { void *rpcMallocCont(int contLen) {
char *pMsg = NULL; int size = contLen + RPC_MSG_OVERHEAD;
size += RPC_MSG_OVERHEAD; char *start = (char *)calloc(1, (size_t)size);
pMsg = (char *)calloc(1, (size_t)size); if (start == NULL) {
if (pMsg == NULL) {
tError("failed to malloc msg, size:%d", size); tError("failed to malloc msg, size:%d", size);
return NULL; return NULL;
} }
return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcFreeCont(void *cont) { void rpcFreeCont(void *cont) {
...@@ -319,6 +318,24 @@ void rpcFreeCont(void *cont) { ...@@ -319,6 +318,24 @@ void rpcFreeCont(void *cont) {
} }
} }
void *rpcReallocCont(void *ptr, int contLen) {
if (ptr == NULL) return rpcMallocCont(contLen);
char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
if (contLen == 0 ) {
free(start);
}
int size = contLen + RPC_MSG_OVERHEAD;
start = realloc(start, size);
if (start == NULL) {
tError("failed to realloc cont, size:%d", size);
return NULL;
}
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}
void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) { void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
...@@ -331,7 +348,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in ...@@ -331,7 +348,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
pContext->contLen = contLen; pContext->contLen = contLen;
pContext->pCont = pCont; pContext->pCont = pCont;
pContext->msgType = type; pContext->msgType = type;
pContext->oldIndex = pIpSet->index; pContext->oldInUse = pIpSet->inUse;
pContext->connType = RPC_CONN_UDPC; pContext->connType = RPC_CONN_UDPC;
if (contLen > 16000) pContext->connType = RPC_CONN_TCPC; if (contLen > 16000) pContext->connType = RPC_CONN_TCPC;
...@@ -381,6 +398,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -381,6 +398,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->uid = 0; pHead->uid = 0;
pHead->port = htons(pConn->localPort);
pHead->code = htonl(code); pHead->code = htonl(code);
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
...@@ -514,8 +532,12 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { ...@@ -514,8 +532,12 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
return pConn; return pConn;
} }
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr) { static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
char hashstr[40];
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType);
// check if it is already allocated // check if it is already allocated
SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
...@@ -529,12 +551,12 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr ...@@ -529,12 +551,12 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr
} else { } else {
pConn = pRpc->connList + sid; pConn = pRpc->connList + sid;
memset(pConn, 0, sizeof(SRpcConn)); memset(pConn, 0, sizeof(SRpcConn));
memcpy(pConn->user, user, tListLen(pConn->user)); memcpy(pConn->user, pHead->user, tListLen(pConn->user));
pConn->pRpc = pRpc; pConn->pRpc = pRpc;
pConn->sid = sid; pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
if (pRpc->afp && (*pRpc->afp)(user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) { if (pRpc->afp && (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey)) {
tWarn("%s %p, user not there", pRpc->label, pConn); tWarn("%s %p, user not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released taosFreeId(pRpc->idPool, sid); // sid shall be released
terrno = TSDB_CODE_INVALID_USER; terrno = TSDB_CODE_INVALID_USER;
...@@ -543,25 +565,33 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr ...@@ -543,25 +565,33 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr
} }
if (pConn) { if (pConn) {
if (pRecv->connType == RPC_CONN_UDPS && pRpc->numOfThreads > 1) {
// UDP server, assign to new connection
pRpc->index = (pRpc->index+1) % pRpc->numOfThreads;
pConn->localPort = (pRpc->localPort + pRpc->index);
}
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s", pRpc->label, pConn, sid, pConn->user); tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
pRpc->label, pConn, sid, pConn->user, pConn->localPort);
} }
return pConn; return pConn;
} }
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr) { static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
if (sid) { if (sid) {
pConn = pRpc->connList + sid; pConn = pRpc->connList + sid;
} else { } else {
pConn = rpcAllocateServerConn(pRpc, user, hashstr); pConn = rpcAllocateServerConn(pRpc, pRecv);
} }
if (pConn) { if (pConn) {
if (memcmp(pConn->user, user, tListLen(pConn->user)) != 0) { if (memcmp(pConn->user, pHead->user, tListLen(pConn->user)) != 0) {
tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, user); tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, pHead->user);
terrno = TSDB_CODE_MISMATCHED_METER_ID; terrno = TSDB_CODE_MISMATCHED_METER_ID;
pConn = NULL; pConn = NULL;
} }
...@@ -575,13 +605,15 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { ...@@ -575,13 +605,15 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SRpcIpSet *pIpSet = &pContext->ipSet; SRpcIpSet *pIpSet = &pContext->ipSet;
pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->index], pIpSet->port, pRpc->user); pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->inUse], pIpSet->port, pContext->connType);
if ( pConn == NULL ) { if ( pConn == NULL ) {
char ipstr[20] = {0}; char ipstr[20] = {0};
tinet_ntoa(ipstr, pIpSet->ip[pIpSet->index]); tinet_ntoa(ipstr, pIpSet->ip[pIpSet->inUse]);
pConn = rpcOpenConn(pRpc, ipstr, pIpSet->port, pContext->connType); pConn = rpcOpenConn(pRpc, ipstr, pIpSet->port, pContext->connType);
if (pConn) pConn->destIp = pIpSet->ip[pIpSet->index]; if (pConn) pConn->destIp = pIpSet->ip[pIpSet->inUse];
} } else {
tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn);
}
return pConn; return pConn;
} }
...@@ -670,16 +702,16 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -670,16 +702,16 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
int32_t sid; int32_t sid;
SRpcConn *pConn = NULL; SRpcConn *pConn = NULL;
char hashstr[40] = {0};
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
sid = htonl(pHead->destId); sid = htonl(pHead->destId);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
pHead->port = htons(pHead->port);
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
...@@ -698,8 +730,7 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -698,8 +730,7 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
terrno = TSDB_CODE_INVALID_SESSION_ID; return NULL; terrno = TSDB_CODE_INVALID_SESSION_ID; return NULL;
} }
if (sid == 0) sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType); pConn = rpcGetConnObj(pRpc, sid, pRecv);
pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr);
if (pConn == NULL) return NULL; if (pConn == NULL) return NULL;
rpcLockConn(pConn); rpcLockConn(pConn);
...@@ -714,7 +745,7 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -714,7 +745,7 @@ static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
} }
if (pRecv->port) pConn->peerPort = pRecv->port; if (pRecv->port) pConn->peerPort = pRecv->port;
if (pHead->port) pConn->peerPort = pHead->port; if (pHead->port) pConn->peerPort = pHead->port;
if (pHead->uid) pConn->peerUid = pHead->uid; if (pHead->uid) pConn->peerUid = pHead->uid;
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
...@@ -755,12 +786,11 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -755,12 +786,11 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
SRpcConn *pConn = (SRpcConn *)pRecv->thandle; SRpcConn *pConn = (SRpcConn *)pRecv->thandle;
int32_t code = 0;
tDump(pRecv->msg, pRecv->msgLen); tDump(pRecv->msg, pRecv->msgLen);
// underlying UDP layer does not know it is server or client // underlying UDP layer does not know it is server or client
pRecv->connType = pRecv->connType | pRpc->connType; pRecv->connType = pRecv->connType | pRpc->connType;
if (pRecv->ip==0 && pConn) { if (pRecv->ip==0 && pConn) {
rpcProcessBrokenLink(pConn); rpcProcessBrokenLink(pConn);
...@@ -768,30 +798,31 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -768,30 +798,31 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
return NULL; return NULL;
} }
pConn = rpcProcessHead(pRpc, pRecv); terrno = 0;
pConn = rpcProcessMsgHead(pRpc, pRecv);
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d port:%hu",
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, code, pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId); pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
} }
if (pConn && pRpc->idleTime) { if (pConn && pRpc->idleTime) {
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
} }
if (code != TSDB_CODE_ALREADY_PROCESSED) { if (terrno != TSDB_CODE_ALREADY_PROCESSED) {
if (code != 0) { // parsing error if (terrno != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcSendErrorMsgToPeer(pRecv, code); rpcSendErrorMsgToPeer(pRecv, terrno);
tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], terrno);
} }
} else { // parsing OK } else { // parsing OK
rpcProcessIncomingMsg(pConn, pHead); rpcProcessIncomingMsg(pConn, pHead);
} }
} }
if ( code != 0 ) free (pRecv->msg); if ( terrno ) free (pRecv->msg);
return pConn; return pConn;
} }
...@@ -812,7 +843,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -812,7 +843,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
int32_t code = pHead->code; int32_t code = pHead->code;
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
pConn->pContext = NULL; pConn->pContext = NULL;
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->user); // for UDP, port may be changed by server, the port in ipSet shall be used for cache
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pContext->ipSet.port, pConn->connType);
if (code == TSDB_CODE_REDIRECT) { if (code == TSDB_CODE_REDIRECT) {
pContext->redirect = 1; pContext->redirect = 1;
...@@ -821,7 +853,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -821,7 +853,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
} else { } else {
if ( pRpc->ufp && (pContext->ipSet.index != pContext->oldIndex || pContext->redirect) ) if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) )
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code); (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code);
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
...@@ -969,8 +1001,8 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -969,8 +1001,8 @@ static void rpcProcessConnError(void *param, void *id) {
(*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code); (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code);
} else { } else {
// move to next IP // move to next IP
pContext->ipSet.index++; pContext->ipSet.inUse++;
pContext->ipSet.index = pContext->ipSet.index % pContext->ipSet.numOfIps; pContext->ipSet.inUse = pContext->ipSet.inUse % pContext->ipSet.numOfIps;
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
} }
} }
......
...@@ -51,7 +51,7 @@ void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t ...@@ -51,7 +51,7 @@ void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t
void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
SInfo *pInfo = (SInfo *)handle; SInfo *pInfo = (SInfo *)handle;
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->index); tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
pInfo->ipSet = *pIpSet; pInfo->ipSet = *pIpSet;
} }
...@@ -92,7 +92,7 @@ int main(int argc, char *argv[]) { ...@@ -92,7 +92,7 @@ int main(int argc, char *argv[]) {
// server info // server info
ipSet.numOfIps = 1; ipSet.numOfIps = 1;
ipSet.index = 0; ipSet.inUse = 0;
ipSet.port = 7000; ipSet.port = 7000;
ipSet.ip[0] = inet_addr(serverIp); ipSet.ip[0] = inet_addr(serverIp);
ipSet.ip[1] = inet_addr("192.168.0.1"); ipSet.ip[1] = inet_addr("192.168.0.1");
...@@ -189,7 +189,7 @@ int main(int argc, char *argv[]) { ...@@ -189,7 +189,7 @@ int main(int argc, char *argv[]) {
float usedTime = (endTime - startTime)/1000.0; // mseconds float usedTime = (endTime - startTime)/1000.0; // mseconds
tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000*numOfReqs*appThreads/usedTime, msgSize); tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
taosCloseLog(); taosCloseLog();
......
...@@ -76,7 +76,7 @@ int main(int argc, char *argv[]) { ...@@ -76,7 +76,7 @@ int main(int argc, char *argv[]) {
rpcInit.localPort = atoi(argv[++i]); rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i")==0 && i < argc-1) { } else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]); strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) { } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]); rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) { } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]); msgSize = atoi(argv[++i]);
...@@ -92,7 +92,7 @@ int main(int argc, char *argv[]) { ...@@ -92,7 +92,7 @@ int main(int argc, char *argv[]) {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp); printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort); printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
printf(" [-t threads]: number of threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions); printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize); printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
...@@ -103,6 +103,7 @@ int main(int argc, char *argv[]) { ...@@ -103,6 +103,7 @@ int main(int argc, char *argv[]) {
} }
} }
tsAsyncLog = 0;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
taosInitLog("server.log", 100000, 10); taosInitLog("server.log", 100000, 10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册