提交 6fa6a016 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

add testing files

上级 99849ec7
......@@ -68,14 +68,16 @@ typedef struct {
} SRpcInfo;
typedef struct {
SRpcIpSet ipSet;
void *ahandle; // handle provided by app
SRpcInfo *pRpc; // associated SRpcInfo
SRpcIpSet ipSet; // ip list provided by app
void *ahandle; // handle provided by app
char msgType; // message type
char *pCont; // content provided by app
int contLen; // content length
int numOfRetry; // number of retry for different servers
uint8_t *pCont; // content provided by app
int32_t contLen; // content length
int32_t code; // error code
int16_t numOfTry; // number of try for different servers
int8_t oldIndex; // server IP index passed by app
int8_t redirect; // flag to indicate redirect
char msg[0]; // RpcHeader starts from here
} SRpcReqContext;
......@@ -131,6 +133,11 @@ typedef struct {
uint8_t content[0]; // message body starts from here
} SRpcHeader;
typedef struct {
int32_t reserved;
int32_t contLen;
} SRpcComp;
typedef struct {
uint32_t timeStamp;
uint8_t auth[TSDB_AUTH_LEN];
......@@ -333,6 +340,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet ipSet, char type, void *pCont, int
pContext->contLen = contLen;
pContext->pCont = pCont;
pContext->msgType = type;
pContext->oldIndex = ipSet.index;
rpcSendReqToServer(pRpc, pContext);
......@@ -779,11 +787,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHeader *pHeader) {
taosAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pConn->peerPort, pConn->meterId);
if (code == TSDB_CODE_REDIRECT) {
pContext->redirect = 1;
pContext->numOfTry = 0;
memcpy(&pContext->ipSet, pHeader->content, sizeof(pContext->ipSet));
rpcSendReqToServer(pRpc, pContext);
} else {
rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg
(*(pRpc->cfp))(pHeader->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index);
if ( pContext->ipSet.index != pContext->oldIndex || pContext->redirect )
(*pRpc->ufp)(pContext->ahandle, pContext->ipSet);
(*pRpc->cfp)(pHeader->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index);
}
}
}
......@@ -853,6 +865,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
int msgLen = rpcMsgLenFromCont(pContext->contLen);
char msgType = pContext->msgType;
pContext->numOfTry++;
SRpcConn *pConn = rpcSetConnToServer(pRpc, pContext->ipSet);
if (pConn == NULL) {
pContext->code = terrno;
......@@ -922,7 +935,7 @@ static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcReqContext *)param;
SRpcInfo *pRpc = pContext->pRpc;
if ( pContext->numOfRetry >= pContext->ipSet.numOfIps ) {
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
rpcFreeOutMsg(rpcHeaderFromCont(pContext->pCont)); // free the request msg
(*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code);
} else {
......@@ -1004,11 +1017,6 @@ static void rpcFreeOutMsg(void *msg) {
free(req);
}
typedef struct {
int32_t reserved;
int32_t contLen;
} SRpcComp;
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHeader *pHeader = rpcHeaderFromCont(pCont);
int32_t finalLen = 0;
......
......@@ -19,6 +19,10 @@
#include "trpc.h"
#include <stdint.h>
void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
dPrint("response is received, type:%d, contLen:%d code:%d, ahandle:%p", type, contLen, code, ahandle);
}
int32_t main(int32_t argc, char *argv[]) {
dPrint("unit test for rpc module");
......@@ -28,18 +32,28 @@ int32_t main(int32_t argc, char *argv[]) {
rpcInit.localPort = 7000;
rpcInit.label = "unittest";
rpcInit.numOfThreads = 1;
rpcInit.fp = NULL;
rpcInit.cfp = processMsg;
rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.connType = TAOS_CONN_UDPC;
rpcInit.idleTime = 2000;
void *pConn = rpcOpen(&rpcInit);
if (pConn != NULL) {
dPrint("conection is opened");
} else {
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
dError("failed to initialize rpc");
return -1;
}
SRpcIpSet ipSet;
ipSet.numOfIps = 2;
ipSet.index = 0;
ipSet.ip[0] = inet_addr("127.0.0.1");
ipSet.ip[1] = inet_addr("192.168.0.1");
void *cont = rpcMallocCont(100);
rpcSendRequest(pRpc, ipSet, 1, cont, 100, 1);
getchar();
return 0;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include <stdint.h>
void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
dPrint("request is received, type:%d, contLen:%d", type, contLen);
void *rsp = rpcMallocCont(128);
rpcSendResponse(ahandle, 1, rsp, 128);
rpcFreeCont(pCont);
}
int32_t main(int32_t argc, char *argv[]) {
dPrint("unit test for rpc module");
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 7000;
rpcInit.label = "unittest";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processMsg;
rpcInit.sessions = 1000;
rpcInit.connType = TAOS_CONN_UDPS;
rpcInit.idleTime = 2000;
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
dError("failed to initialize rpc");
return -1;
}
/*
SRpcIpSet ipSet;
ipSet.numOfIps = 2;
ipSet.index = 0;
ipSet.ip[0] = inet_addr("127.0.0.1");
ipSet.ip[1] = inet_addr("192.168.0.1");
*/
dPrint("server is running...");
getchar();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册