提交 af3a8be5 编写于 作者: dengyihao's avatar dengyihao

add test UT

上级 ed788d39
...@@ -53,7 +53,6 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -53,7 +53,6 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->secret) { if (pInit->secret) {
memcpy(pRpc->secret, pInit->secret, strlen(pInit->secret)); memcpy(pRpc->secret, pInit->secret, strlen(pInit->secret));
} }
return pRpc; return pRpc;
} }
void rpcClose(void* arg) { void rpcClose(void* arg) {
...@@ -113,34 +112,19 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { ...@@ -113,34 +112,19 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; } void rpcCancelRequest(int64_t rid) { return; }
int32_t rpcInit() { void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later
return 0;
}
void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg* pMsg, int64_t *pRid) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port; uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRequest(shandle, ip, port, pMsg); transSendRequest(shandle, ip, port, pMsg);
} }
void rpcSendRecv(void* shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port; uint32_t port = pEpSet->eps[pEpSet->inUse].port;
transSendRecv(shandle, ip, port, pMsg, pRsp); transSendRecv(shandle, ip, port, pMsg, pRsp);
} }
void rpcSendResponse(const SRpcMsg *pMsg) { void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
transSendResponse(pMsg); int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
}
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
return transGetConnInfo((void *)thandle, pInfo);
}
void rpcCleanup(void) {
// impl later
//
return;
}
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
...@@ -155,4 +139,13 @@ void rpcUnrefHandle(void* handle, int8_t type) { ...@@ -155,4 +139,13 @@ void rpcUnrefHandle(void* handle, int8_t type) {
(*taosUnRefHandle[type])(handle); (*taosUnRefHandle[type])(handle);
} }
int32_t rpcInit() {
// impl later
return 0;
}
void rpcCleanup(void) {
// impl later
return;
}
#endif #endif
...@@ -84,7 +84,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co ...@@ -84,7 +84,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
static void cliTimeoutCb(uv_timer_t* handle); static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv // alloc buf for recv
static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket // callback after read nbytes from socket
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after write data to socket // callback after write data to socket
...@@ -154,50 +154,50 @@ void cliHandleResp(SCliConn* conn) { ...@@ -154,50 +154,50 @@ void cliHandleResp(SCliConn* conn) {
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
// buf's mem alread translated to rpcMsg.pCont STransMsg transMsg = {0};
transClearBuffer(&conn->readBuf); transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead);
STransMsg rpcMsg = {0}; transMsg.code = pHead->code;
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.msgType = pHead->msgType;
rpcMsg.pCont = transContFromHead((char*)pHead); transMsg.ahandle = NULL;
rpcMsg.code = pHead->code;
rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = NULL;
SCliMsg* pMsg = conn->data; SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) {
rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL;
} else { } else {
rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
} }
// if (rpcMsg.ahandle == NULL) { // if (rpcMsg.ahandle == NULL) {
// tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn); // tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn);
// return; // return;
//} //}
if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { // buf's mem alread translated to transMsg.pCont
rpcMsg.handle = conn; transClearBuffer(&conn->readBuf);
if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, transMsg.msgType)) {
transMsg.handle = conn;
CONN_SET_PERSIST_BY_APP(conn); CONN_SET_PERSIST_BY_APP(conn);
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
} }
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
conn->secured = pHead->secured; conn->secured = pHead->secured;
if (pCtx == NULL || pCtx->pSem == NULL) { if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, conn); tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else { } else {
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn);
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
if (CONN_NO_PERSIST_BY_APP(conn)) { if (CONN_NO_PERSIST_BY_APP(conn)) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
...@@ -224,23 +224,23 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -224,23 +224,23 @@ void cliHandleExcept(SCliConn* pConn) {
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg rpcMsg = {0}; STransMsg transMsg = {0};
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
rpcMsg.ahandle = NULL; transMsg.ahandle = NULL;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL;
} else { } else {
rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
} }
if (pCtx == NULL || pCtx->pSem == NULL) { if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else { } else {
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn); tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
destroyCmsg(pConn->data); destroyCmsg(pConn->data);
...@@ -252,9 +252,9 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -252,9 +252,9 @@ void cliHandleExcept(SCliConn* pConn) {
void cliTimeoutCb(uv_timer_t* handle) { void cliTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data; SCliThrdObj* pThrd = handle->data;
STrans* pRpc = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; int64_t currentTime = pThrd->nextTimeout;
tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label); tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pTransInst->label);
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) { while (p != NULL) {
...@@ -271,8 +271,8 @@ void cliTimeoutCb(uv_timer_t* handle) { ...@@ -271,8 +271,8 @@ void cliTimeoutCb(uv_timer_t* handle) {
p = taosHashIterate((SHashObj*)pThrd->pool, p); p = taosHashIterate((SHashObj*)pThrd->pool, p);
} }
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
} }
void* createConnPool(int size) { void* createConnPool(int size) {
...@@ -324,15 +324,15 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { ...@@ -324,15 +324,15 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
STrans* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
} }
static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
...@@ -416,7 +416,7 @@ static void cliSendCb(uv_write_t* req, int status) { ...@@ -416,7 +416,7 @@ static void cliSendCb(uv_write_t* req, int status) {
cliHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
} }
void cliSend(SCliConn* pConn) { void cliSend(SCliConn* pConn) {
...@@ -581,14 +581,14 @@ static void* cliWorkThread(void* arg) { ...@@ -581,14 +581,14 @@ static void* cliWorkThread(void* arg) {
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SCliObj* cli = calloc(1, sizeof(SCliObj)); SCliObj* cli = calloc(1, sizeof(SCliObj));
STrans* pRpc = shandle; STrans* pTransInst = shandle;
memcpy(cli->label, label, strlen(label)); memcpy(cli->label, label, strlen(label));
cli->numOfThreads = numOfThreads; cli->numOfThreads = numOfThreads;
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
for (int i = 0; i < cli->numOfThreads; i++) { for (int i = 0; i < cli->numOfThreads; i++) {
SCliThrdObj* pThrd = createThrdObj(); SCliThrdObj* pThrd = createThrdObj();
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
pThrd->pTransInst = shandle; pThrd->pTransInst = shandle;
int err = pthread_create(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); int err = pthread_create(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
......
...@@ -16,20 +16,6 @@ ...@@ -16,20 +16,6 @@
#include "transComm.h" #include "transComm.h"
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
int ret = -1;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
return ret;
}
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context; T_MD5_CTX context;
int ret = -1; int ret = -1;
...@@ -44,17 +30,7 @@ int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { ...@@ -44,17 +30,7 @@ int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
return ret; return ret;
} }
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
memcpy(pAuth, context.digest, sizeof(context.digest));
}
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context; T_MD5_CTX context;
...@@ -67,45 +43,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { ...@@ -67,45 +43,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
memcpy(pAuth, context.digest, sizeof(context.digest)); memcpy(pAuth, context.digest, sizeof(context.digest));
} }
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHead* pHead = rpcHeadFromCont(pCont);
int32_t finalLen = 0;
int overhead = sizeof(SRpcComp);
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen;
}
char* buf = malloc(contLen + overhead + 8); // 8 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
return contLen;
}
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if (compLen > 0 && compLen < contLen - overhead) {
SRpcComp* pComp = (SRpcComp*)pCont;
pComp->reserved = 0;
pComp->contLen = htonl(contLen);
memcpy(pCont + overhead, buf, compLen);
pHead->comp = 1;
tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead;
} else {
finalLen = contLen;
}
free(buf);
return finalLen;
}
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
return false; return false;
// SRpcHead* pHead = rpcHeadFromCont(pCont); // SRpcHead* pHead = rpcHeadFromCont(pCont);
...@@ -154,39 +91,6 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) { ...@@ -154,39 +91,6 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
return false; return false;
} }
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
int overhead = sizeof(SRpcComp);
SRpcHead* pNewHead = NULL;
uint8_t* pCont = pHead->content;
SRpcComp* pComp = (SRpcComp*)pHead->content;
if (pHead->comp) {
// decompress the content
assert(pComp->reserved == 0);
int contLen = htonl(pComp->contLen);
// prepare the temporary buffer to decompress message
char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD);
pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext
if (pNewHead) {
int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen);
assert(origLen == contLen);
memcpy(pNewHead, pHead, sizeof(SRpcHead));
pNewHead->msgLen = rpcMsgLenFromCont(origLen);
/// rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead;
tTrace("decomp malloc mem:%p", temp);
} else {
tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
}
}
return pHead;
}
void transConnCtxDestroy(STransConnCtx* ctx) { void transConnCtxDestroy(STransConnCtx* ctx) {
free(ctx->ip); free(ctx->ip);
free(ctx); free(ctx);
...@@ -315,7 +219,7 @@ int transSendAsync(SAsyncPool* pool, queue* q) { ...@@ -315,7 +219,7 @@ int transSendAsync(SAsyncPool* pool, queue* q) {
if (el > 50) { if (el > 50) {
// tInfo("lock and unlock cost: %d", (int)el); // tInfo("lock and unlock cost: %d", (int)el);
} }
return uv_async_send(async); return uv_async_send(async);
} }
#endif #endif
...@@ -58,10 +58,11 @@ typedef struct SWorkThrdObj { ...@@ -58,10 +58,11 @@ typedef struct SWorkThrdObj {
uv_os_fd_t fd; uv_os_fd_t fd;
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
// uv_async_t* workerAsync; //
queue msg; queue msg;
queue conn;
pthread_mutex_t msgMtx; pthread_mutex_t msgMtx;
queue conn;
void* pTransInst; void* pTransInst;
bool quit; bool quit;
} SWorkThrdObj; } SWorkThrdObj;
...@@ -90,7 +91,7 @@ static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen); ...@@ -90,7 +91,7 @@ static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen); static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle); static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnSendCb(uv_write_t* req, int status); static void uvOnSendCb(uv_write_t* req, int status);
...@@ -120,7 +121,7 @@ static void* acceptThread(void* arg); ...@@ -120,7 +121,7 @@ static void* acceptThread(void* arg);
static bool addHandleToWorkloop(void* arg); static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg); static bool addHandleToAcceptloop(void* arg);
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SSrvConn* conn = handle->data; SSrvConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
...@@ -162,7 +163,7 @@ static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) { ...@@ -162,7 +163,7 @@ static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
code = TSDB_CODE_RPC_INVALID_TIME_STAMP; code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
} else { } else {
if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { if (transAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
// tDebug("%s, authentication failed, msg discarded", pConn->info); // tDebug("%s, authentication failed, msg discarded", pConn->info);
code = TSDB_CODE_RPC_AUTH_FAILURE; code = TSDB_CODE_RPC_AUTH_FAILURE;
} else { } else {
...@@ -203,10 +204,6 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -203,10 +204,6 @@ static void uvHandleReq(SSrvConn* pConn) {
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user)); memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret)); memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
} }
pConn->inType = pHead->msgType;
STrans* pRpc = (STrans*)p->shandle;
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
int32_t dlen = 0; int32_t dlen = 0;
...@@ -219,21 +216,24 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -219,21 +216,24 @@ static void uvHandleReq(SSrvConn* pConn) {
// //
} }
STransMsg rpcMsg; STransMsg transMsg;
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content; transMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code; transMsg.code = pHead->code;
rpcMsg.ahandle = NULL; transMsg.ahandle = NULL;
rpcMsg.handle = pConn; transMsg.handle = pConn;
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
pConn->inType = pHead->msgType;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType),
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port), rpcMsg.contLen); ntohs(pConn->locaddr.sin_port), transMsg.contLen);
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
STrans* pTransInst = (STrans*)p->shandle;
(*((STrans*)p->shandle)->cfp)(pTransInst->parent, &transMsg, NULL);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth // auth
// validate msg type // validate msg type
...@@ -525,7 +525,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -525,7 +525,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnRecvCb); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
} else { } else {
tDebug("failed to create new connection"); tDebug("failed to create new connection");
...@@ -546,7 +546,6 @@ static bool addHandleToWorkloop(void* arg) { ...@@ -546,7 +546,6 @@ static bool addHandleToWorkloop(void* arg) {
return false; return false;
} }
// STrans* pRpc = pThrd->shandle;
uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd); uv_pipe_open(pThrd->pipe, pThrd->fd);
......
...@@ -29,24 +29,25 @@ const char *ckey = "ckey"; ...@@ -29,24 +29,25 @@ const char *ckey = "ckey";
class Server; class Server;
int port = 7000; int port = 7000;
// server process // server process
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
// client process; // client process;
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
class Client { class Client {
public: public:
void Init(int nThread) { void Init(int nThread) {
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit_, 0, sizeof(rpcInit_));
rpcInit.localPort = 0; rpcInit_.localPort = 0;
rpcInit.label = (char *)label; rpcInit_.label = (char *)label;
rpcInit.numOfThreads = nThread; rpcInit_.numOfThreads = nThread;
rpcInit.cfp = processResp; rpcInit_.cfp = processResp;
rpcInit.user = (char *)user; rpcInit_.user = (char *)user;
rpcInit.secret = (char *)secret; rpcInit_.secret = (char *)secret;
rpcInit.ckey = (char *)ckey; rpcInit_.ckey = (char *)ckey;
rpcInit.spi = 1; rpcInit_.spi = 1;
rpcInit.parent = this; rpcInit_.parent = this;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit_.connType = TAOS_CONN_CLIENT;
this->transCli = rpcOpen(&rpcInit); this->transCli = rpcOpen(&rpcInit_);
tsem_init(&this->sem, 0, 0); tsem_init(&this->sem, 0, 0);
} }
void SetResp(SRpcMsg *pMsg) { void SetResp(SRpcMsg *pMsg) {
...@@ -55,9 +56,27 @@ class Client { ...@@ -55,9 +56,27 @@ class Client {
} }
SRpcMsg *Resp() { return &this->resp; } SRpcMsg *Resp() { return &this->resp; }
void Restart() { void Restart(CB cb) {
rpcClose(this->transCli);
rpcInit_.cfp = cb;
this->transCli = rpcOpen(&rpcInit_);
}
void setPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
rpcClose(this->transCli);
rpcInit_.pfp = pfp;
this->transCli = rpcOpen(&rpcInit_);
}
void setConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
rpcClose(this->transCli);
rpcInit_.mfp = mfp;
this->transCli = rpcOpen(&rpcInit_);
}
void setPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
rpcClose(this->transCli); rpcClose(this->transCli);
this->transCli = rpcOpen(&rpcInit);
rpcInit_.pfp = pfp;
rpcInit_.mfp = mfp;
this->transCli = rpcOpen(&rpcInit_);
} }
void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) { void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) {
...@@ -79,7 +98,7 @@ class Client { ...@@ -79,7 +98,7 @@ class Client {
private: private:
tsem_t sem; tsem_t sem;
SRpcInit rpcInit; SRpcInit rpcInit_;
void * transCli; void * transCli;
SRpcMsg resp; SRpcMsg resp;
}; };
...@@ -133,9 +152,8 @@ static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -133,9 +152,8 @@ static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
client->SetResp(pMsg); client->SetResp(pMsg);
client->SemPost(); client->SemPost();
} }
class TransObj {
public: static void initEnv() {
TransObj() {
dDebugFlag = 143; dDebugFlag = 143;
vDebugFlag = 0; vDebugFlag = 0;
mDebugFlag = 143; mDebugFlag = 143;
...@@ -159,13 +177,31 @@ class TransObj { ...@@ -159,13 +177,31 @@ class TransObj {
if (taosInitLog("taosdlog", 1) != 0) { if (taosInitLog("taosdlog", 1) != 0) {
printf("failed to init log file\n"); printf("failed to init log file\n");
} }
}
class TransObj {
public:
TransObj() {
initEnv();
cli = new Client; cli = new Client;
cli->Init(1); cli->Init(1);
srv = new Server; srv = new Server;
srv->Start(); srv->Start();
} }
void RestartCli() { cli->Restart(); }
void RestartCli(CB cb) { cli->Restart(cb); }
void StopSrv() { srv->Stop(); } void StopSrv() { srv->Stop(); }
void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) {
// do nothing
cli->setPersistFP(pfp);
}
void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) {
// do nothing
cli->setConstructFP(mfp);
}
void SetMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
// do nothing
cli->setPAndMFp(pfp, mfp);
}
void RestartSrv() { srv->Restart(); } void RestartSrv() { srv->Restart(); }
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
~TransObj() { ~TransObj() {
...@@ -191,16 +227,16 @@ class TransEnv : public ::testing::Test { ...@@ -191,16 +227,16 @@ class TransEnv : public ::testing::Test {
TransObj *tr = NULL; TransObj *tr = NULL;
}; };
// TEST_F(TransEnv, 01sendAndRec) { TEST_F(TransEnv, 01sendAndRec) {
// for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
// SRpcMsg req = {0}, resp = {0}; SRpcMsg req = {0}, resp = {0};
// req.msgType = 0; req.msgType = 0;
// req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
// req.contLen = 10; req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp); tr->cliSendAndRecv(&req, &resp);
// assert(resp.code == 0); assert(resp.code == 0);
// } }
//} }
TEST_F(TransEnv, 02StopServer) { TEST_F(TransEnv, 02StopServer) {
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
...@@ -218,6 +254,31 @@ TEST_F(TransEnv, 02StopServer) { ...@@ -218,6 +254,31 @@ TEST_F(TransEnv, 02StopServer) {
tr->StopSrv(); tr->StopSrv();
// tr->RestartSrv(); // tr->RestartSrv();
tr->cliSendAndRecv(&req, &resp); tr->cliSendAndRecv(&req, &resp);
assert(resp.code != 0); assert(resp.code != 0);
} }
TEST_F(TransEnv, clientUserDefined) {}
TEST_F(TransEnv, cliPersistHandle) {
// impl late
}
TEST_F(TransEnv, srvPersistHandle) {
// impl later
}
TEST_F(TransEnv, srvPersisHandleExcept) {
// conn breken
//
}
TEST_F(TransEnv, cliPersisHandleExcept) {
// conn breken
}
TEST_F(TransEnv, multiCliPersisHandleExcept) {
// conn breken
}
TEST_F(TransEnv, multiSrvPersisHandleExcept) {
// conn breken
}
TEST_F(TransEnv, queryExcept) {
// query and conn is broken
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册