提交 54cd817f 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

fix many bugs

上级 e9470f0a
......@@ -20,10 +20,10 @@
extern "C" {
#endif
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
void taosCloseConnCache(void *handle);
void taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
void rpcCloseConnCache(void *handle);
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
#ifdef __cplusplus
}
......
......@@ -40,13 +40,13 @@ typedef struct {
int * count;
int64_t keepTimer;
pthread_mutex_t mutex;
void (*cleanFp)(void *);
void *tmrCtrl;
void *pTimer;
void (*cleanFp)(void *);
void *tmrCtrl;
void *pTimer;
} SConnCache;
int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) {
SConnCache *pObj = (SConnCache *)handle;
int rpcHashConn(void *handle, uint32_t ip, uint16_t port, char *user) {
SConnCache *pCache = (SConnCache *)handle;
int hash = 0;
// size_t user_len = strlen(user);
......@@ -58,109 +58,109 @@ int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) {
user++;
}
hash = hash % pObj->maxSessions;
hash = hash % pCache->maxSessions;
return hash;
}
void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) {
if (time < pObj->keepTimer + pNode->time) return;
void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return;
SConnHash *pPrev = pNode->prev, *pNext;
while (pNode) {
(*pObj->cleanFp)(pNode->data);
(*pCache->cleanFp)(pNode->data);
pNext = pNode->next;
pObj->total--;
pObj->count[hash]--;
tscTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode,
pObj->count[hash]);
taosMemPoolFree(pObj->connHashMemPool, (char *)pNode);
pCache->total--;
pCache->count[hash]--;
tTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode,
pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext;
}
if (pPrev)
pPrev->next = NULL;
else
pObj->connHashList[hash] = NULL;
pCache->connHashList[hash] = NULL;
}
void taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) {
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
SConnCache *pCache;
uint64_t time = taosGetTimestampMs();
pObj = (SConnCache *)handle;
assert(pObj);
pCache = (SConnCache *)handle;
assert(pCache);
assert(data);
hash = taosHashConn(pObj, ip, port, user);
pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool);
hash = rpcHashConn(pCache, ip, port, user);
pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
pNode->ip = ip;
pNode->port = port;
pNode->data = data;
pNode->prev = NULL;
pNode->time = time;
pthread_mutex_lock(&pObj->mutex);
pthread_mutex_lock(&pCache->mutex);
pNode->next = pObj->connHashList[hash];
if (pObj->connHashList[hash] != NULL) (pObj->connHashList[hash])->prev = pNode;
pObj->connHashList[hash] = pNode;
pNode->next = pCache->connHashList[hash];
if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
pCache->connHashList[hash] = pNode;
pObj->total++;
pObj->count[hash]++;
taosRemoveExpiredNodes(pObj, pNode->next, hash, time);
pCache->total++;
pCache->count[hash]++;
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
pthread_mutex_unlock(&pObj->mutex);
pthread_mutex_unlock(&pCache->mutex);
tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]);
tTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pCache->count[hash]);
return;
}
void taosCleanConnCache(void *handle, void *tmrId) {
void rpcCleanConnCache(void *handle, void *tmrId) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
SConnCache *pCache;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return;
if (pObj->pTimer != tmrId) return;
pCache = (SConnCache *)handle;
if (pCache == NULL || pCache->maxSessions == 0) return;
if (pCache->pTimer != tmrId) return;
uint64_t time = taosGetTimestampMs();
for (hash = 0; hash < pObj->maxSessions; ++hash) {
pthread_mutex_lock(&pObj->mutex);
pNode = pObj->connHashList[hash];
taosRemoveExpiredNodes(pObj, pNode, hash, time);
pthread_mutex_unlock(&pObj->mutex);
for (hash = 0; hash < pCache->maxSessions; ++hash) {
pthread_mutex_lock(&pCache->mutex);
pNode = pCache->connHashList[hash];
rpcRemoveExpiredNodes(pCache, pNode, hash, time);
pthread_mutex_unlock(&pCache->mutex);
}
// tscTrace("timer, total connections in cache:%d", pObj->total);
taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer);
// tTrace("timer, total connections in cache:%d", pCache->total);
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer);
}
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) {
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
SConnCache *pCache;
void * pData = NULL;
pObj = (SConnCache *)handle;
assert(pObj);
pCache = (SConnCache *)handle;
assert(pCache);
uint64_t time = taosGetTimestampMs();
hash = taosHashConn(pObj, ip, port, user);
pthread_mutex_lock(&pObj->mutex);
hash = rpcHashConn(pCache, ip, port, user);
pthread_mutex_lock(&pCache->mutex);
pNode = pObj->connHashList[hash];
pNode = pCache->connHashList[hash];
while (pNode) {
if (time >= pObj->keepTimer + pNode->time) {
taosRemoveExpiredNodes(pObj, pNode, hash, time);
if (time >= pCache->keepTimer + pNode->time) {
rpcRemoveExpiredNodes(pCache, pNode, hash, time);
pNode = NULL;
break;
}
......@@ -171,12 +171,12 @@ void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user)
}
if (pNode) {
taosRemoveExpiredNodes(pObj, pNode->next, hash, time);
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
if (pNode->prev) {
pNode->prev->next = pNode->next;
} else {
pObj->connHashList[hash] = pNode->next;
pCache->connHashList[hash] = pNode->next;
}
if (pNode->next) {
......@@ -184,24 +184,24 @@ void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user)
}
pData = pNode->data;
taosMemPoolFree(pObj->connHashMemPool, (char *)pNode);
pObj->total--;
pObj->count[hash]--;
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pCache->total--;
pCache->count[hash]--;
}
pthread_mutex_unlock(&pObj->mutex);
pthread_mutex_unlock(&pCache->mutex);
if (pData) {
tscTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]);
tTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pCache->count[hash]);
}
return pData;
}
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
SConnHash **connHashList;
mpool_h connHashMemPool;
SConnCache *pObj;
SConnCache *pCache;
connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash));
if (connHashMemPool == 0) return NULL;
......@@ -212,48 +212,48 @@ void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl,
return NULL;
}
pObj = malloc(sizeof(SConnCache));
if (pObj == NULL) {
pCache = malloc(sizeof(SConnCache));
if (pCache == NULL) {
taosMemPoolCleanUp(connHashMemPool);
free(connHashList);
return NULL;
}
memset(pObj, 0, sizeof(SConnCache));
memset(pCache, 0, sizeof(SConnCache));
pObj->count = calloc(sizeof(int), maxSessions);
pObj->total = 0;
pObj->keepTimer = keepTimer;
pObj->maxSessions = maxSessions;
pObj->connHashMemPool = connHashMemPool;
pObj->connHashList = connHashList;
pObj->cleanFp = cleanFp;
pObj->tmrCtrl = tmrCtrl;
taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer);
pCache->count = calloc(sizeof(int), maxSessions);
pCache->total = 0;
pCache->keepTimer = keepTimer;
pCache->maxSessions = maxSessions;
pCache->connHashMemPool = connHashMemPool;
pCache->connHashList = connHashList;
pCache->cleanFp = cleanFp;
pCache->tmrCtrl = tmrCtrl;
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer);
pthread_mutex_init(&pObj->mutex, NULL);
pthread_mutex_init(&pCache->mutex, NULL);
return pObj;
return pCache;
}
void taosCloseConnCache(void *handle) {
SConnCache *pObj;
void rpcCloseConnCache(void *handle) {
SConnCache *pCache;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return;
pCache = (SConnCache *)handle;
if (pCache == NULL || pCache->maxSessions == 0) return;
pthread_mutex_lock(&pObj->mutex);
pthread_mutex_lock(&pCache->mutex);
taosTmrStopA(&(pObj->pTimer));
taosTmrStopA(&(pCache->pTimer));
if (pObj->connHashMemPool) taosMemPoolCleanUp(pObj->connHashMemPool);
if (pCache->connHashMemPool) taosMemPoolCleanUp(pCache->connHashMemPool);
tfree(pObj->connHashList);
tfree(pObj->count)
tfree(pCache->connHashList);
tfree(pCache->count)
pthread_mutex_unlock(&pObj->mutex);
pthread_mutex_unlock(&pCache->mutex);
pthread_mutex_destroy(&pObj->mutex);
pthread_mutex_destroy(&pCache->mutex);
memset(pObj, 0, sizeof(SConnCache));
free(pObj);
memset(pCache, 0, sizeof(SConnCache));
free(pCache);
}
......@@ -32,11 +32,11 @@
#include "trpc.h"
#include "taoserror.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHeader) + sizeof(SRpcDigest))
#define rpcHeaderFromCont(cont) ((SRpcHeader *) (cont - sizeof(SRpcHeader)))
#define rpcContFromHeader(msg) (msg + sizeof(SRpcHeader))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHeader))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHeader))
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
typedef struct {
......@@ -48,11 +48,11 @@ typedef struct {
int connType;
char label[12];
char *meterId; // meter ID
char meterId[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
uint8_t secret[TSDB_KEY_LEN]; // secret for the link
uint8_t ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int (*afp)(char *meterId, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey);
......@@ -78,7 +78,7 @@ typedef struct {
int16_t numOfTry; // number of try for different servers
int8_t oldIndex; // server IP index passed by app
int8_t redirect; // flag to indicate redirect
char msg[0]; // RpcHeader starts from here
char msg[0]; // RpcHead starts from here
} SRpcReqContext;
typedef struct _RpcConn {
......@@ -114,6 +114,8 @@ typedef struct _RpcConn {
SRpcReqContext *pContext; // request context
} SRpcConn;
#pragma pack(push, 1)
typedef struct {
char version:4; // RPC version
char comp:4; // compression algorithm, 0:no compression 1:lz4
......@@ -131,7 +133,7 @@ typedef struct {
int32_t msgLen; // message length including the header iteslf
int32_t code;
uint8_t content[0]; // message body starts from here
} SRpcHeader;
} SRpcHead;
typedef struct {
int32_t reserved;
......@@ -143,6 +145,8 @@ typedef struct {
uint8_t auth[TSDB_AUTH_LEN];
} SRpcDigest;
#pragma pack(pop)
int tsRpcProgressTime = 10; // milliseocnds
// not configurable
......@@ -197,7 +201,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uin
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
static void rpcProcessConnError(void *param, void *id);
static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId);
......@@ -205,7 +209,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId);
static void rpcFreeOutMsg(void *msg);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader);
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
......@@ -218,7 +222,7 @@ void *rpcOpen(SRpcInit *pInit) {
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL;
strcpy(pRpc->label, pInit->label);
if(pInit->label) strcpy(pRpc->label, pInit->label);
pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->numOfThreads = pInit->numOfThreads;
......@@ -226,14 +230,14 @@ void *rpcOpen(SRpcInit *pInit) {
pRpc->numOfThreads = TSDB_MAX_RPC_THREADS;
}
strcpy(pRpc->localIp, pInit->localIp);
if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp);
pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp;
pRpc->sessions = pInit->sessions;
strcpy(pRpc->meterId, pInit->meterId);
if (pInit->meterId) strcpy(pRpc->meterId, pInit->meterId);
if (pInit->secret) strcpy(pRpc->secret, pInit->secret);
if (pInit->ckey) strcpy(pRpc->ckey, pInit->ckey);
pRpc->spi = pInit->spi;
strcpy(pRpc->secret, pInit->secret);
strcpy(pRpc->ckey, pInit->ckey);
pRpc->ufp = pInit->ufp;
pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp;
......@@ -275,7 +279,7 @@ void *rpcOpen(SRpcInit *pInit) {
return NULL;
}
pRpc->pCache = taosOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000);
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000);
if ( pRpc->pCache == NULL ) {
tError("%s failed to init connection cache", pRpc->label);
rpcClose(pRpc);
......@@ -303,7 +307,7 @@ void rpcClose(void *param) {
taosCleanUpStrHash(pRpc->hash);
taosTmrCleanUp(pRpc->tmrCtrl);
taosIdPoolCleanUp(pRpc->idPool);
taosCloseConnCache(pRpc->pCache);
rpcCloseConnCache(pRpc->pCache);
tfree(pRpc->connList);
pthread_mutex_destroy(&pRpc->mutex);
......@@ -320,11 +324,11 @@ void *rpcMallocCont(int size) {
return NULL;
}
return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHeader);
return pMsg + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}
void rpcFreeCont(void *cont) {
char *msg = ((char *)cont) - sizeof(SRpcHeader);
char *msg = ((char *)cont) - sizeof(SRpcHead);
free(msg);
}
......@@ -333,7 +337,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int
SRpcReqContext *pContext;
contLen = rpcCompressRpcMsg(pCont, contLen);
pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHeader)-sizeof(SRpcReqContext));
pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
pContext->ahandle = ahandle;
pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = ipSet;
......@@ -348,11 +352,11 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int
}
void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)handle;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
char *msg = (char *)pHeader;
int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)handle;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHead *pHead = rpcHeadFromCont(pCont);
char *msg = (char *)pHead;
if ( pCont == NULL ) {
pCont = rpcMallocCont(0);
......@@ -371,17 +375,17 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
}
// set msg header
pHeader->version = 1;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = pConn->inTranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->uid = 0;
pHeader->code = htonl(code);
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
pHead->version = 1;
pHead->msgType = pConn->inType+1;
pHead->spi = 0;
pHead->tcp = 0;
pHead->encrypt = 0;
pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->uid = 0;
pHead->code = htonl(code);
memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId));
// set pConn parameters
pConn->inType = 0;
......@@ -390,7 +394,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
rpcFreeOutMsg(pConn->pRspMsg);
pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen;
if (pHeader->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
pthread_mutex_unlock(&pRpc->mutex);
......@@ -487,7 +491,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
terrno = TSDB_CODE_MAX_SESSIONS;
} else {
tTrace("%s sid:%d, ID allocated, used:%d, old id:%d", pRpc->label, sid, taosIdPoolNumOfUsed(pRpc->idPool));
tTrace("%s sid:%d, ID allocated, used:%d", pRpc->label, sid, taosIdPoolNumOfUsed(pRpc->idPool));
pConn = pRpc->connList + sid;
memset(pConn, 0, sizeof(SRpcConn));
......@@ -562,7 +566,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *meterId, char *has
SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) {
SRpcConn *pConn;
pConn = taosGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId);
pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->meterId);
if ( pConn == NULL ) {
char ipstr[20] = {0};
tinet_ntoa(ipstr, ipSet.ip[ipSet.index]);
......@@ -572,29 +576,29 @@ SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) {
return pConn;
}
static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc= pConn->pRpc;
if (pConn->peerId == 0) {
pConn->peerId = pHeader->sourceId;
pConn->peerId = pHead->sourceId;
} else {
if (pConn->peerId != pHeader->sourceId) {
if (pConn->peerId != pHead->sourceId) {
tTrace("%s pConn:%p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn,
pConn->peerId, pHeader->sourceId);
pConn->peerId, pHead->sourceId);
return TSDB_CODE_INVALID_VALUE;
}
}
if (pConn->inTranId == pHeader->tranId) {
if (pConn->inType == pHeader->msgType) {
tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHeader->msgType]);
if (pConn->inTranId == pHead->tranId) {
if (pConn->inType == pHead->msgType) {
tTrace("%s pConn:%p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
} else if (pConn->inType == 0) {
tTrace("%s pConn:%p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHeader->msgType], pConn->inTranId);
taosMsg[pHead->msgType], pConn->inTranId);
rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
} else {
tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHeader->msgType]);
tTrace("%s pConn:%p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]);
}
// do not reply any message
......@@ -603,40 +607,40 @@ static int rpcProcessReqHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
if (pConn->inType != 0) {
tTrace("%s pConn:%p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn,
pConn->inTranId, pHeader->tranId);
pConn->inTranId, pHead->tranId);
return TSDB_CODE_LAST_SESSION_NOT_FINISHED;
}
pConn->inTranId = pHeader->tranId;
pConn->inType = pHeader->msgType;
pConn->inTranId = pHead->tranId;
pConn->inType = pHead->msgType;
return 0;
}
static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc = pConn->pRpc;
pConn->peerId = pHeader->sourceId;
pConn->peerId = pHead->sourceId;
if (pConn->outType == 0 || pConn->pContext == NULL) {
return TSDB_CODE_UNEXPECTED_RESPONSE;
}
if (pHeader->tranId != pConn->outTranId) {
if (pHead->tranId != pConn->outTranId) {
return TSDB_CODE_INVALID_TRAN_ID;
}
if (pHeader->msgType != pConn->outType + 1) {
if (pHead->msgType != pConn->outType + 1) {
return TSDB_CODE_INVALID_RESPONSE_TYPE;
}
if (*pHeader->content == TSDB_CODE_NOT_READY) {
if (*pHead->content == TSDB_CODE_NOT_READY) {
return TSDB_CODE_ALREADY_PROCESSED;
}
taosTmrStopA(&pConn->pTimer);
pConn->retry = 0;
if (*pHeader->content == TSDB_CODE_ACTION_IN_PROGRESS || pHeader->tcp) {
if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS || pHead->tcp) {
if (pConn->tretry <= tsRpcMaxRetry) {
pConn->tretry++;
tTrace("%s pConn:%p, peer is still processing the transaction", pRpc->label, pConn);
......@@ -644,7 +648,7 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
return TSDB_CODE_ALREADY_PROCESSED;
} else {
// peer still in processing, give up
*pHeader->content = TSDB_CODE_TOO_SLOW;
*pHead->content = TSDB_CODE_TOO_SLOW;
}
}
......@@ -656,77 +660,77 @@ static int rpcProcessRspHeader(SRpcConn *pConn, SRpcHeader *pHeader) {
return TSDB_CODE_SUCCESS;
}
static int rpcProcessHeader(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) {
static int rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) {
int32_t sid, code = 0;
SRpcConn * pConn = NULL;
char hashstr[40] = {0};
*ppConn = NULL;
SRpcHeader *pHeader = (SRpcHeader *)data;
SRpcHead *pHead = (SRpcHead *)data;
sid = htonl(pHeader->destId);
pHeader->code = htonl(pHeader->code);
pHeader->msgLen = (int32_t)htonl((uint32_t)pHeader->msgLen);
sid = htonl(pHead->destId);
pHead->code = htonl(pHead->code);
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
if (pHeader->msgType >= TSDB_MSG_TYPE_MAX || pHeader->msgType <= 0) {
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHeader->msgType);
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
return TSDB_CODE_INVALID_MSG_TYPE;
}
if (dataLen != pHeader->msgLen) {
if (dataLen != pHead->msgLen) {
tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid,
taosMsg[pHeader->msgType], dataLen, pHeader->msgLen);
taosMsg[pHead->msgType], dataLen, pHead->msgLen);
return TSDB_CODE_INVALID_MSG_LEN;
}
if (sid < 0 || sid >= pRpc->sessions) {
tTrace("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid,
pRpc->sessions, taosMsg[pHeader->msgType]);
pRpc->sessions, taosMsg[pHead->msgType]);
return TSDB_CODE_INVALID_SESSION_ID;
}
if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHeader->uid, pHeader->sourceId);
pConn = rpcGetConnObj(pRpc, sid, pHeader->meterId, hashstr);
if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHead->uid, pHead->sourceId);
pConn = rpcGetConnObj(pRpc, sid, pHead->meterId, hashstr);
if (pConn == NULL ) return terrno;
*ppConn = pConn;
sid = pConn->sid;
if (pHeader->uid) pConn->peerUid = pHeader->uid;
if (pHead->uid) pConn->peerUid = pHead->uid;
if (pHeader->tcp) {
if (pHead->tcp) {
tTrace("%s pConn:%p, content will be transfered via TCP", pRpc->label, pConn);
if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
return TSDB_CODE_ALREADY_PROCESSED;
}
code = rpcCheckAuthentication(pConn, (char *)pHeader, dataLen);
code = rpcCheckAuthentication(pConn, (char *)pHead, dataLen);
if ( code != 0 ) return code;
if (pHeader->msgType != TSDB_MSG_TYPE_REG && pHeader->encrypt) {
if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) {
// decrypt here
}
if ( rpcIsReq(pHeader->msgType) ) {
code = rpcProcessReqHeader(pConn, pHeader);
if ( rpcIsReq(pHead->msgType) ) {
code = rpcProcessReqHead(pConn, pHead);
} else {
code = rpcProcessRspHeader(pConn, pHeader);
code = rpcProcessRspHead(pConn, pHead);
}
return code;
}
static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) {
SRpcHeader *pHeader = (SRpcHeader *)data;
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn = NULL;
uint8_t code = 0;
SRpcHead *pHead = (SRpcHead *)data;
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcConn *pConn = NULL;
uint8_t code = 0;
tDump(data, dataLen);
pthread_mutex_lock(&pRpc->mutex);
code = rpcProcessHeader(pRpc, &pConn, data, dataLen, ip);
code = rpcProcessHead(pRpc, &pConn, data, dataLen, ip);
if (pConn) {
// update connection info
......@@ -739,16 +743,16 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_
}
if (port) pConn->peerPort = port;
if (pHeader->port) // port maybe changed by the peer
pConn->peerPort = pHeader->port;
if (pHead->port) // port maybe changed by the peer
pConn->peerPort = pHead->port;
}
pthread_mutex_unlock(&pRpc->mutex);
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], ip, port, code,
dataLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code,
dataLen, pHead->sourceId, pHead->destId, pHead->tranId);
}
if (pConn && pRpc->idleTime) {
......@@ -757,12 +761,12 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_
if (code != TSDB_CODE_ALREADY_PROCESSED) {
if (code != 0) { // parsing error
if ( rpcIsReq(pHeader->msgType) ) {
if ( rpcIsReq(pHead->msgType) ) {
rpcSendErrorMsgToPeer(pRpc, data, code, ip, port, chandle);
tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHeader->msgType+1], code);
tTrace("%s pConn:%p, %s is sent with error code:%u", pRpc->label, pConn, taosMsg[pHead->msgType+1], code);
}
} else { // parsing OK
rpcProcessIncomingMsg(pConn, pHeader);
rpcProcessIncomingMsg(pConn, pHead);
}
}
......@@ -770,100 +774,100 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_
return pConn;
}
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc = pConn->pRpc;
pHeader = rpcDecompressRpcMsg(pHeader);
int contLen = rpcContLenFromMsg(pHeader->msgLen);
uint8_t *pCont = pHeader->content;
pHead = rpcDecompressRpcMsg(pHead);
int contLen = rpcContLenFromMsg(pHead->msgLen);
uint8_t *pCont = pHead->content;
if ( rpcIsReq(pHeader->msgType) ) {
if ( rpcIsReq(pHead->msgType) ) {
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
(*(pRpc->cfp))(pHeader->msgType, pCont, contLen, pConn, 0);
(*(pRpc->cfp))(pHead->msgType, pCont, contLen, pConn, 0);
} else {
// it's a response
int32_t code = pHeader->code;
int32_t code = pHead->code;
SRpcReqContext *pContext = pConn->pContext;
pConn->pContext = NULL;
taosAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
if (code == TSDB_CODE_REDIRECT) {
pContext->redirect = 1;
pContext->numOfTry = 0;
memcpy(&pContext->ipSet, pHeader->content, sizeof(pContext->ipSet));
memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet));
rpcSendReqToServer(pRpc, pContext);
} else {
rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
if ( pContext->ipSet.index != pContext->oldIndex || pContext->redirect )
(*pRpc->ufp)(pContext->ahandle, pContext->ipSet);
(*pRpc->cfp)(pHeader->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index);
(*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index);
}
}
}
static void rpcSendQuickRsp(SRpcConn *pConn, char code) {
char msg[RPC_MSG_OVERHEAD];
SRpcHeader *pHeader;
char msg[RPC_MSG_OVERHEAD];
SRpcHead *pHead;
// set msg header
memset(msg, 0, sizeof(SRpcHeader));
pHeader = (SRpcHeader *)msg;
pHeader->version = 1;
pHeader->msgType = pConn->inType+1;
pHeader->spi = 0;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHeader->tranId = pConn->inTranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->uid = 0;
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
pHeader->code = htonl(code);
memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg;
pHead->version = 1;
pHead->msgType = pConn->inType+1;
pHead->spi = 0;
pHead->tcp = 0;
pHead->encrypt = 0;
pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->uid = 0;
memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId));
pHead->code = htonl(code);
rpcSendMsgToPeer(pConn, msg, 0);
}
static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) {
SRpcHeader *pRecvHeader, *pReplyHeader;
char msg[sizeof(SRpcHeader) + sizeof(SRpcDigest) + sizeof(uint32_t) ];
SRpcHead *pRecvHead, *pReplyHead;
char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ];
uint32_t timeStamp;
int msgLen;
pRecvHeader = (SRpcHeader *)pMsg;
pReplyHeader = (SRpcHeader *)msg;
pRecvHead = (SRpcHead *)pMsg;
pReplyHead = (SRpcHead *)msg;
memset(msg, 0, sizeof(SRpcHeader));
pReplyHeader->version = pRecvHeader->version;
pReplyHeader->msgType = (char)(pRecvHeader->msgType + 1);
pReplyHeader->tcp = 0;
pReplyHeader->spi = 0;
pReplyHeader->encrypt = 0;
pReplyHeader->tranId = pRecvHeader->tranId;
pReplyHeader->sourceId = 0;
pReplyHeader->destId = pRecvHeader->sourceId;
memcpy(pReplyHeader->meterId, pRecvHeader->meterId, tListLen(pReplyHeader->meterId));
memset(msg, 0, sizeof(SRpcHead));
pReplyHead->version = pRecvHead->version;
pReplyHead->msgType = (char)(pRecvHead->msgType + 1);
pReplyHead->tcp = 0;
pReplyHead->spi = 0;
pReplyHead->encrypt = 0;
pReplyHead->tranId = pRecvHead->tranId;
pReplyHead->sourceId = 0;
pReplyHead->destId = pRecvHead->sourceId;
memcpy(pReplyHead->meterId, pRecvHead->meterId, tListLen(pReplyHead->meterId));
pReplyHeader->code = htonl(code);
msgLen = sizeof(SRpcHeader);
pReplyHead->code = htonl(code);
msgLen = sizeof(SRpcHead);
if (code == TSDB_CODE_INVALID_TIME_STAMP) {
// include a time stamp if client's time is not synchronized well
uint8_t *pContent = pReplyHeader->content;
uint8_t *pContent = pReplyHead->content;
timeStamp = taosGetTimestampSec();
memcpy(pContent, &timeStamp, sizeof(timeStamp));
msgLen += sizeof(timeStamp);
}
pReplyHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
(*taosSendData[pRpc->connType])(ip, port, msg, msgLen, chandle);
return;
}
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHeader *pHeader = rpcHeaderFromCont(pContext->pCont);
char *msg = (char *)pHeader;
int msgLen = rpcMsgLenFromCont(pContext->contLen);
char msgType = pContext->msgType;
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen);
char msgType = pContext->msgType;
pContext->numOfTry++;
SRpcConn *pConn = rpcSetConnToServer(pRpc, pContext->ipSet);
......@@ -876,22 +880,22 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pthread_mutex_lock(&pRpc->mutex);
// set the message header
pHeader->version = 1;
pHeader->msgType = msgType;
pHeader->tcp = 0;
pHeader->encrypt = 0;
pHead->version = 1;
pHead->msgType = msgType;
pHead->tcp = 0;
pHead->encrypt = 0;
pConn->tranId++;
if ( pConn->tranId == 0 ) pConn->tranId++;
pHeader->tranId = pConn->tranId;
pHeader->sourceId = pConn->ownId;
pHeader->destId = pConn->peerId;
pHeader->port = 0;
pHeader->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
memcpy(pHeader->meterId, pConn->meterId, tListLen(pHeader->meterId));
pHead->tranId = pConn->tranId;
pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId;
pHead->port = 0;
pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
memcpy(pHead->meterId, pConn->meterId, tListLen(pHead->meterId));
// set the connection parameters
pConn->outType = msgType;
pConn->outTranId = pHeader->tranId;
pConn->outTranId = pHead->tranId;
pConn->pReqMsg = msg;
pConn->reqMsgLen = msgLen;
pConn->pContext = pContext;
......@@ -903,25 +907,25 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
}
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHeader *pHeader = (SRpcHeader *)msg;
int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHead *pHead = (SRpcHead *)msg;
msgLen = rpcAddAuthPart(pConn, msg, msgLen);
if ( rpcIsReq(pHeader->msgType)) {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
if ( rpcIsReq(pHead->msgType)) {
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s pConn:%p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr,
pConn->peerPort, msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr,
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else {
if (pHeader->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s pConn:%p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHeader->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHeader->content[0], msgLen, pHeader->sourceId, pHeader->destId, pHeader->tranId);
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
}
writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHeader, msgLen, pConn->chandle);
writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHead, msgLen, pConn->chandle);
if (writtenLen != msgLen) {
tError("%s pConn:%p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
......@@ -936,7 +940,7 @@ static void rpcProcessConnError(void *param, void *id) {
SRpcInfo *pRpc = pContext->pRpc;
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
(*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code);
} else {
// move to next IP
......@@ -959,7 +963,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pConn->retry++;
if (pConn->retry < 4) {
tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label,
tTrace("%s pConn:%p, re-send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer);
......@@ -1018,9 +1022,9 @@ static void rpcFreeOutMsg(void *msg) {
}
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
int32_t finalLen = 0;
int overhead = sizeof(SRpcComp);
SRpcHead *pHead = rpcHeadFromCont(pCont);
int32_t finalLen = 0;
int overhead = sizeof(SRpcComp);
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen;
......@@ -1044,7 +1048,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
pComp->contLen = htonl(contLen);
memcpy(pCont + overhead, buf, compLen);
pHeader->comp = 1;
pHead->comp = 1;
tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead;
......@@ -1056,13 +1060,13 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
return finalLen;
}
static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) {
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
int overhead = sizeof(SRpcComp);
SRpcHeader *pNewHeader = NULL;
uint8_t *pCont = pHeader->content;
SRpcComp *pComp = (SRpcComp *)pHeader->content;
SRpcHead *pNewHead = NULL;
uint8_t *pCont = pHead->content;
SRpcComp *pComp = (SRpcComp *)pHead->content;
if (pHeader->comp) {
if (pHead->comp) {
// decompress the content
assert(pComp->reserved == 0);
int contLen = htonl(pComp->contLen);
......@@ -1071,21 +1075,21 @@ static SRpcHeader *rpcDecompressRpcMsg(SRpcHeader *pHeader) {
char *buf = rpcMallocCont(contLen);
if (buf) {
pNewHeader = rpcHeaderFromCont(buf);
int compLen = rpcContLenFromMsg(pHeader->msgLen) - overhead;
pNewHead = rpcHeadFromCont(buf);
int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
int32_t originalLen = LZ4_decompress_safe((const char*)(pCont + overhead), buf, compLen, contLen);
assert(originalLen == contLen);
memcpy(pNewHeader, pHeader, sizeof(SRpcHeader));
pNewHeader->msgLen = rpcMsgLenFromCont(originalLen);
free(pHeader); // free the compressed message buffer
pHeader = pNewHeader;
memcpy(pNewHead, pHead, sizeof(SRpcHead));
pNewHead->msgLen = rpcMsgLenFromCont(originalLen);
free(pHead); // free the compressed message buffer
pHead = pNewHead;
} else {
tError("failed to allocate memory to decompress msg, contLen:%d, reason:%s", contLen, strerror(errno));
}
}
return pHeader;
return pHead;
}
static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
......@@ -1103,7 +1107,7 @@ static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t
return ret;
}
static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
static int rpcBuildAuthHead(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
MD5_CTX context;
MD5Init(&context);
......@@ -1118,33 +1122,33 @@ static int rpcBuildAuthHeader(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t
}
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
SRpcHead *pHead = (SRpcHead *)msg;
if (pConn->spi) {
// add auth part
pHeader->spi = pConn->spi;
pHead->spi = pConn->spi;
SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(SRpcDigest);
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHeader((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHead((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHeader->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
return msgLen;
}
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHeader *pHeader = (SRpcHeader *)msg;
SRpcInfo *pRpc = pConn->pRpc;
int code = 0;
SRpcHead *pHead = (SRpcHead *)msg;
SRpcInfo *pRpc = pConn->pRpc;
int code = 0;
if (pConn->spi == 0 ) return 0;
if (pHeader->spi == pConn->spi) {
if (pHead->spi == pConn->spi) {
// authentication
SRpcDigest *pDigest = (SRpcDigest *)((char *)pHeader + msgLen - sizeof(SRpcDigest));
SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));
int32_t delta;
delta = (int32_t)htonl(pDigest->timeStamp);
......@@ -1154,16 +1158,16 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
delta, htonl(pDigest->timeStamp));
code = TSDB_CODE_INVALID_TIME_STAMP;
} else {
if (rpcAuthenticateMsg((uint8_t *)pHeader, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
if (rpcAuthenticateMsg((uint8_t *)pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
tError("%s pConn:%p, authentication failed, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
} else {
pHeader->msgLen -= sizeof(SRpcDigest);
pHead->msgLen -= sizeof(SRpcDigest);
}
}
} else {
// if it is request or response with code 0, msg shall be discarded
if (rpcIsReq(pHeader->msgType) || (pHeader->content[0] == 0)) {
if (rpcIsReq(pHead->msgType) || (pHead->content[0] == 0)) {
tTrace("%s pConn:%p, auth spi not matched, msg discarded", pRpc->label, pConn);
code = TSDB_CODE_AUTH_FAILURE;
}
......
......@@ -24,18 +24,23 @@ void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code
}
int32_t main(int32_t argc, char *argv[]) {
taosInitLog("client.log", 100000, 10);
dPrint("unit test for rpc module");
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 7000;
rpcInit.label = "unittest";
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processMsg;
rpcInit.cfp = processMsg;
rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_UDPC;
rpcInit.idleTime = 2000;
rpcInit.meterId = "jefftao";
rpcInit.secret = "password";
rpcInit.ckey = "key";
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
......@@ -46,6 +51,7 @@ int32_t main(int32_t argc, char *argv[]) {
SRpcIpSet ipSet;
ipSet.numOfIps = 2;
ipSet.index = 0;
ipSet.port = 7000;
ipSet.ip[0] = inet_addr("127.0.0.1");
ipSet.ip[1] = inet_addr("192.168.0.1");
......
......@@ -27,18 +27,23 @@ void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code
}
int32_t main(int32_t argc, char *argv[]) {
taosInitLog("server.log", 100000, 10);
dPrint("unit test for rpc module");
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 7000;
rpcInit.label = "unittest";
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processMsg;
rpcInit.cfp = processMsg;
rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_UDPS;
rpcInit.idleTime = 2000;
rpcInit.meterId = "jefftao";
rpcInit.secret = "password";
rpcInit.ckey = "key";
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
......
......@@ -209,7 +209,7 @@ char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
int tsNumOfLogLines = 10000000;
uint32_t rpcDebugFlag = 131;
uint32_t rpcDebugFlag = 135;
uint32_t ddebugFlag = 131;
uint32_t mdebugFlag = 135;
uint32_t sdbDebugFlag = 135;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册