未验证 提交 62d79763 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1290 from taosdata/refact/rpc

Refact/rpc
...@@ -48,7 +48,7 @@ typedef struct { ...@@ -48,7 +48,7 @@ typedef struct {
char spi:3; // security parameter index char spi:3; // security parameter index
char encrypt:3; // encrypt algorithm, 0: no encryption char encrypt:3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID uint16_t tranId; // transcation ID
uint32_t uid; // for unique ID inside a client uint32_t linkUid; // for unique connection ID assigned by client
uint32_t sourceId; // source ID, an index for connection list uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario uint32_t destIp; // destination IP address, for NAT scenario
......
...@@ -94,8 +94,9 @@ typedef struct _RpcConn { ...@@ -94,8 +94,9 @@ typedef struct _RpcConn {
char encrypt; // encryption, 0:1 char encrypt; // encryption, 0:1
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
char secured; // if set to 1, no authentication
uint16_t localPort; // for UDP only uint16_t localPort; // for UDP only
uint32_t peerUid; // peer UID uint32_t linkUid; // connection unique ID assigned by client
uint32_t peerIp; // peer IP uint32_t peerIp; // peer IP
uint32_t destIp; // server destination IP to handle NAT uint32_t destIp; // server destination IP to handle NAT
uint16_t peerPort; // peer port uint16_t peerPort; // peer port
...@@ -264,7 +265,7 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -264,7 +265,7 @@ void *rpcOpen(SRpcInit *pInit) {
return NULL; return NULL;
} }
} else { } else {
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime);
if ( pRpc->pCache == NULL ) { if ( pRpc->pCache == NULL ) {
tError("%s failed to init connection cache", pRpc->label); tError("%s failed to init connection cache", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
...@@ -399,10 +400,9 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -399,10 +400,9 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->uid = 0; pHead->linkUid = pConn->linkUid;
pHead->port = htons(pConn->localPort); pHead->port = htons(pConn->localPort);
pHead->code = htonl(code); pHead->code = htonl(code);
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set pConn parameters // set pConn parameters
pConn->inType = 0; pConn->inType = 0;
...@@ -417,6 +417,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -417,6 +417,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
rpcSendMsgToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
pConn->secured = 1; // connection shall be secured
return; return;
} }
...@@ -499,7 +500,7 @@ static void rpcCloseConn(void *thandle) { ...@@ -499,7 +500,7 @@ static void rpcCloseConn(void *thandle) {
if ( pRpc->connType == TAOS_CONN_SERVER) { if ( pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0}; char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->peerUid, pConn->peerId, pConn->connType); sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
taosDeleteStrHash(pRpc->hash, hashstr); taosDeleteStrHash(pRpc->hash, hashstr);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
...@@ -535,6 +536,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { ...@@ -535,6 +536,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn->sid = sid; pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
pConn->spi = pRpc->spi; pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt; pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
...@@ -548,7 +550,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -548,7 +550,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
char hashstr[40] = {0}; char hashstr[40] = {0};
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType); sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
// check if it is already allocated // check if it is already allocated
SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
...@@ -567,6 +569,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -567,6 +569,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->sid = sid; pConn->sid = sid;
pConn->tranId = (uint16_t)(rand() & 0xFFFF); pConn->tranId = (uint16_t)(rand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
pConn->linkUid = pHead->linkUid;
if (pRpc->afp && (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey) < 0) { if (pRpc->afp && (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey) < 0) {
tWarn("%s %p, user not there", pRpc->label, pConn); tWarn("%s %p, user not there", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released taosFreeId(pRpc->idPool, sid); // sid shall be released
...@@ -601,8 +604,8 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { ...@@ -601,8 +604,8 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
} }
if (pConn) { if (pConn) {
if (memcmp(pConn->user, pHead->user, tListLen(pConn->user)) != 0) { if (pConn->linkUid != pHead->linkUid) {
tTrace("%s %p, user:%s is not matched, received:%s", pRpc->label, pConn, pConn->user, pHead->user); tTrace("%s %p, linkUid:0x%x not matched, received:0x%x", pRpc->label, pConn, pConn->linkUid, pHead->linkUid);
terrno = TSDB_CODE_MISMATCHED_METER_ID; terrno = TSDB_CODE_MISMATCHED_METER_ID;
pConn = NULL; pConn = NULL;
} }
...@@ -748,7 +751,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -748,7 +751,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if (pRecv->port) pConn->peerPort = pRecv->port; if (pRecv->port) pConn->peerPort = pRecv->port;
if (pHead->port) pConn->peerPort = htons(pHead->port); if (pHead->port) pConn->peerPort = htons(pHead->port);
if (pHead->uid) pConn->peerUid = pHead->uid;
terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
...@@ -813,7 +815,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -813,7 +815,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
} }
if (pConn && pRpc->idleTime) { if (pRpc->connType == TAOS_CONN_SERVER && pConn && pRpc->idleTime) {
// only for server, starts the idle timer. For client, it is started by cache mgmt
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
} }
...@@ -881,7 +884,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { ...@@ -881,7 +884,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->uid = 0; pHead->linkUid = pConn->linkUid;
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));
pHead->code = htonl(code); pHead->code = htonl(code);
...@@ -905,7 +908,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { ...@@ -905,7 +908,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
pReplyHead->tranId = pRecvHead->tranId; pReplyHead->tranId = pRecvHead->tranId;
pReplyHead->sourceId = pRecvHead->destId; pReplyHead->sourceId = pRecvHead->destId;
pReplyHead->destId = pRecvHead->sourceId; pReplyHead->destId = pRecvHead->sourceId;
memcpy(pReplyHead->user, pRecvHead->user, tListLen(pReplyHead->user)); pReplyHead->linkUid = pRecvHead->linkUid;
pReplyHead->code = htonl(code); pReplyHead->code = htonl(code);
msgLen = sizeof(SRpcHead); msgLen = sizeof(SRpcHead);
...@@ -951,8 +954,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -951,8 +954,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->destIp = pConn->destIp; pHead->destIp = pConn->destIp;
pHead->port = 0; pHead->port = 0;
pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); pHead->linkUid = pConn->linkUid;
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set the connection parameters // set the connection parameters
pConn->outType = msgType; pConn->outType = msgType;
...@@ -1026,8 +1029,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1026,8 +1029,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pConn->retry++; pConn->retry++;
if (pConn->retry < 4) { if (pConn->retry < 4) {
tTrace("%s %p, re-send msg:%s to %s:%hu retry:%d", pRpc->label, pConn, tTrace("%s %p, re-send msg:%s to %s:%hud", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn->retry); taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort);
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else { } else {
...@@ -1179,7 +1182,7 @@ static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) { ...@@ -1179,7 +1182,7 @@ static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
if (pConn->spi) { if (pConn->spi && pConn->secured == 0) {
// add auth part // add auth part
pHead->spi = pConn->spi; pHead->spi = pConn->spi;
SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen); SRpcDigest *pDigest = (SRpcDigest *)(msg + msgLen);
...@@ -1188,6 +1191,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1188,6 +1191,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else { } else {
pHead->spi = 0;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
} }
...@@ -1197,9 +1201,10 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1197,9 +1201,10 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
int32_t code = 0; int code = 0;
if (pConn->spi == 0) { if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){
// secured link, or no authentication
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
return 0; return 0;
} }
...@@ -1214,7 +1219,6 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1214,7 +1219,6 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
} }
code = 0; code = 0;
if (pHead->spi == pConn->spi) { if (pHead->spi == pConn->spi) {
// authentication // authentication
SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest)); SRpcDigest *pDigest = (SRpcDigest *)((char *)pHead + msgLen - sizeof(SRpcDigest));
...@@ -1231,6 +1235,8 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1231,6 +1235,8 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
code = TSDB_CODE_AUTH_FAILURE; code = TSDB_CODE_AUTH_FAILURE;
} else { } else {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client
tTrace("%s %p, message is authenticated", pRpc->label, pConn);
} }
} }
} else { } else {
...@@ -1243,7 +1249,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1243,7 +1249,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
static void rpcLockConn(SRpcConn *pConn) { static void rpcLockConn(SRpcConn *pConn) {
int64_t tid = taosGetPthreadId(); int64_t tid = taosGetPthreadId();
int i = 0; int i = 0;
while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) { while (atomic_val_compare_exchange_64(&(pConn->lockedBy), 0, tid) != 0) {
if (++i % 1000 == 0) { if (++i % 1000 == 0) {
sched_yield(); sched_yield();
......
...@@ -106,11 +106,12 @@ int main(int argc, char *argv[]) { ...@@ -106,11 +106,12 @@ int main(int argc, char *argv[]) {
rpcInit.cfp = processResponse; rpcInit.cfp = processResponse;
rpcInit.ufp = processUpdateIpSet; rpcInit.ufp = processUpdateIpSet;
rpcInit.sessions = 100; rpcInit.sessions = 100;
rpcInit.idleTime = 2000; rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael"; rpcInit.user = "michael";
rpcInit.secret = "mypassword"; rpcInit.secret = "mypassword";
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
...@@ -159,8 +160,8 @@ int main(int argc, char *argv[]) { ...@@ -159,8 +160,8 @@ int main(int argc, char *argv[]) {
} }
} }
rpcInit.connType = TAOS_CONN_CLIENT;
taosInitLog("client.log", 100000, 10); taosInitLog("client.log", 100000, 10);
tPrint("rpcDebugFlag:%d", rpcDebugFlag);
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
...@@ -200,7 +201,7 @@ int main(int argc, char *argv[]) { ...@@ -200,7 +201,7 @@ int main(int argc, char *argv[]) {
tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
taosCloseLog(); taosCloseLogger();
return 0; return 0;
} }
......
...@@ -110,7 +110,7 @@ int main(int argc, char *argv[]) { ...@@ -110,7 +110,7 @@ int main(int argc, char *argv[]) {
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg; rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000; rpcInit.sessions = 1000;
rpcInit.idleTime = 2000; rpcInit.idleTime = tsShellActivityTimer*1500;
rpcInit.afp = retrieveAuthInfo; rpcInit.afp = retrieveAuthInfo;
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册