提交 36989803 编写于 作者: dengyihao's avatar dengyihao

refactor code and test reading half packet

上级 04679404
...@@ -32,6 +32,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -32,6 +32,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
return pRpc; return pRpc;
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "transComm.h" #include "transComm.h"
#define CONN_PERSIST_TIME(para) (para * 1000 * 100) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
typedef struct SCliConn { typedef struct SCliConn {
uv_connect_t connReq; uv_connect_t connReq;
...@@ -65,15 +65,15 @@ typedef struct SConnList { ...@@ -65,15 +65,15 @@ typedef struct SConnList {
// conn pool // conn pool
// add expire timeout and capacity limit // add expire timeout and capacity limit
static void* connPoolCreate(int size); static void* creatConnPool(int size);
static void* connPoolDestroy(void* pool); static void* destroyConnPool(void* pool);
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle); static void clientTimeoutCb(uv_timer_t* handle);
// process data read from server, auth/decompress etc later // process data read from server, auth/decompress etc later
static void clientProcessData(SCliConn* conn); static void clientHandleResp(SCliConn* conn);
// check whether already read complete packet from server // check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf); static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read // alloc buf for read
...@@ -86,9 +86,11 @@ static void clientWriteCb(uv_write_t* req, int status); ...@@ -86,9 +86,11 @@ static void clientWriteCb(uv_write_t* req, int status);
static void clientConnCb(uv_connect_t* req, int status); static void clientConnCb(uv_connect_t* req, int status);
static void clientAsyncCb(uv_async_t* handle); static void clientAsyncCb(uv_async_t* handle);
static void clientDestroy(uv_handle_t* handle); static void clientDestroy(uv_handle_t* handle);
static void clientConnDestroy(SCliConn* pConn); static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void clientMsgDestroy(SCliMsg* pMsg); static void clientMsgDestroy(SCliMsg* pMsg);
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
// thread obj // thread obj
static SCliThrdObj* createThrdObj(); static SCliThrdObj* createThrdObj();
...@@ -96,9 +98,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -96,9 +98,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
// thread // thread
static void* clientThread(void* arg); static void* clientThread(void* arg);
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleResp(SCliConn* conn) {
static void clientProcessData(SCliConn* conn) {
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
SRpcInfo* pRpc = pCtx->pRpc; SRpcInfo* pRpc = pCtx->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -131,7 +131,9 @@ static void clientTimeoutCb(uv_timer_t* handle) { ...@@ -131,7 +131,9 @@ static void clientTimeoutCb(uv_timer_t* handle) {
SCliConn* c = QUEUE_DATA(h, SCliConn, conn); SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
if (c->expireTime < currentTime) { if (c->expireTime < currentTime) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
clientConnDestroy(c); // uv_stream_t stm = *(c->stream);
// uv_close((uv_handle_t*)&stm, clientDestroy);
clientConnDestroy(c, true);
} else { } else {
break; break;
} }
...@@ -142,18 +144,18 @@ static void clientTimeoutCb(uv_timer_t* handle) { ...@@ -142,18 +144,18 @@ static void clientTimeoutCb(uv_timer_t* handle) {
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
static void* connPoolCreate(int size) { static void* creatConnPool(int size) {
SHashObj* pool = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); // thread local, no lock
return pool; return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
} }
static void* connPoolDestroy(void* pool) { static void* destroyConnPool(void* pool) {
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
while (connList != NULL) { while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conn)) { while (!QUEUE_IS_EMPTY(&connList->conn)) {
queue* h = QUEUE_HEAD(&connList->conn); queue* h = QUEUE_HEAD(&connList->conn);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn); SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
clientConnDestroy(c); clientConnDestroy(c, true);
} }
connList = taosHashIterate((SHashObj*)pool, connList); connList = taosHashIterate((SHashObj*)pool, connList);
} }
...@@ -245,28 +247,37 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -245,28 +247,37 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
pBuf->len += nread; pBuf->len += nread;
if (clientReadComplete(pBuf)) { if (clientReadComplete(pBuf)) {
tDebug("alread read complete"); tDebug("alread read complete");
clientProcessData(conn); clientHandleResp(conn);
} else { } else {
tDebug("read halp packet, continue to read"); tDebug("read half packet, continue to read");
} }
return; return;
} }
assert(nread <= 0);
if (nread == 0) {
return;
}
if (nread != UV_EOF) { if (nread != UV_EOF) {
tDebug("Read error %s\n", uv_err_name(nread)); tDebug("read error %s", uv_err_name(nread));
} }
// // tDebug("Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t*)handle, clientDestroy); // uv_close((uv_handle_t*)handle, clientDestroy);
} }
static void clientConnDestroy(SCliConn* conn) { static void clientConnDestroy(SCliConn* conn, bool clear) {
// impl later tDebug("conn %p destroy", conn);
// if (clear) {
uv_close((uv_handle_t*)conn->stream, NULL);
}
free(conn->stream);
free(conn->readBuf.buf);
free(conn->writeReq);
free(conn);
} }
static void clientDestroy(uv_handle_t* handle) { static void clientDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
QUEUE_REMOVE(&conn->conn); // QUEUE_REMOVE(&conn->conn);
clientConnDestroy(conn); clientConnDestroy(conn, false);
} }
static void clientWriteCb(uv_write_t* req, int status) { static void clientWriteCb(uv_write_t* req, int status) {
...@@ -274,7 +285,8 @@ static void clientWriteCb(uv_write_t* req, int status) { ...@@ -274,7 +285,8 @@ static void clientWriteCb(uv_write_t* req, int status) {
if (status == 0) { if (status == 0) {
tDebug("data already was written on stream"); tDebug("data already was written on stream");
} else { } else {
uv_close((uv_handle_t*)pConn->stream, clientDestroy); tError("failed to write: %s", uv_err_name(status));
clientConnDestroy(pConn, true);
return; return;
} }
SCliThrdObj* pThrd = pConn->hostThrd; SCliThrdObj* pThrd = pConn->hostThrd;
...@@ -317,7 +329,9 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -317,7 +329,9 @@ static void clientConnCb(uv_connect_t* req, int status) {
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
// SRpcInfo* pRpc = pMsg->ctx->pRpc; // SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL); (pRpc->cfp)(NULL, &rpcMsg, NULL);
uv_close((uv_handle_t*)req->handle, clientDestroy);
clientConnDestroy(pConn, true);
// uv_close((uv_handle_t*)req->handle, clientDestroy);
return; return;
} }
...@@ -421,7 +435,6 @@ static void clientMsgDestroy(SCliMsg* pMsg) { ...@@ -421,7 +435,6 @@ static void clientMsgDestroy(SCliMsg* pMsg) {
} }
static SCliThrdObj* createThrdObj() { static SCliThrdObj* createThrdObj() {
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL); pthread_mutex_init(&pThrd->msgMtx, NULL);
...@@ -436,7 +449,7 @@ static SCliThrdObj* createThrdObj() { ...@@ -436,7 +449,7 @@ static SCliThrdObj* createThrdObj() {
uv_timer_init(pThrd->loop, pThrd->pTimer); uv_timer_init(pThrd->loop, pThrd->pTimer);
pThrd->pTimer->data = pThrd; pThrd->pTimer->data = pThrd;
pThrd->pool = connPoolCreate(1); pThrd->pool = creatConnPool(1);
return pThrd; return pThrd;
} }
static void destroyThrdObj(SCliThrdObj* pThrd) { static void destroyThrdObj(SCliThrdObj* pThrd) {
......
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
*/ */
#ifdef USE_UV #ifdef USE_UV
#include "transComm.h"
#include "transComm.h"
typedef struct SConn { typedef struct SConn {
uv_tcp_t* pTcp; uv_tcp_t* pTcp;
uv_write_t* pWriter; uv_write_t* pWriter;
...@@ -84,12 +84,12 @@ static void uvWorkerAsyncCb(uv_async_t* handle); ...@@ -84,12 +84,12 @@ static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb); static void uvPrepareSendData(SConn* conn, uv_buf_t* wb);
// already read complete packet // check whether already read complete packet
static bool readComplete(SConnBuffer* buf); static bool readComplete(SConnBuffer* buf);
static SConn* createConn();
static void destroyConn(SConn* conn, bool clear /*clear handle or not*/);
static SConn* connCreate(); static void uvDestroyConn(uv_handle_t* handle);
static void connDestroy(SConn* conn);
static void uvConnDestroy(uv_handle_t* handle);
// server and worker thread // server and worker thread
static void* workerThread(void* arg); static void* workerThread(void* arg);
...@@ -283,6 +283,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -283,6 +283,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->connBuf; SConnBuffer* pBuf = &conn->connBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tDebug("on read %p, total read: %d, current read: %d", cli, pBuf->len, (int)nread);
if (readComplete(pBuf)) { if (readComplete(pBuf)) {
tDebug("alread read complete packet"); tDebug("alread read complete packet");
uvProcessData(conn); uvProcessData(conn);
...@@ -291,11 +292,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -291,11 +292,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
} }
return; return;
} }
if (nread != UV_EOF) { if (nread == 0) {
tDebug("Read error %s", uv_err_name(nread)); return;
} }
if (nread != UV_EOF) {
tDebug("read error %s", uv_err_name(nread)); tDebug("read error %s", uv_err_name(nread));
uv_close((uv_handle_t*)cli, uvConnDestroy); }
} }
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(sizeof(char)); buf->base = malloc(sizeof(char));
...@@ -318,7 +320,7 @@ void uvOnWriteCb(uv_write_t* req, int status) { ...@@ -318,7 +320,7 @@ void uvOnWriteCb(uv_write_t* req, int status) {
tDebug("data already was written on stream"); tDebug("data already was written on stream");
} else { } else {
tDebug("failed to write data, %s", uv_err_name(status)); tDebug("failed to write data, %s", uv_err_name(status));
connDestroy(conn); destroyConn(conn, true);
} }
// opt // opt
} }
...@@ -331,7 +333,8 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { ...@@ -331,7 +333,8 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
} }
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
// impl later // impl later;
tDebug("prepare to send back");
SRpcMsg* pMsg = &conn->sendMsg; SRpcMsg* pMsg = &conn->sendMsg;
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0); pMsg->pCont = (void*)rpcMallocCont(0);
...@@ -394,6 +397,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { ...@@ -394,6 +397,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
} else { } else {
uv_close((uv_handle_t*)cli, NULL); uv_close((uv_handle_t*)cli, NULL);
free(cli);
} }
} }
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
...@@ -403,7 +407,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -403,7 +407,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tError("read error %s", uv_err_name(nread)); tError("read error %s", uv_err_name(nread));
} }
// TODO(log other failure reason) // TODO(log other failure reason)
uv_close((uv_handle_t*)q, NULL); // uv_close((uv_handle_t*)q, NULL);
return; return;
} }
// free memory allocated by // free memory allocated by
...@@ -422,7 +426,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -422,7 +426,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_handle_type pending = uv_pipe_pending_type(pipe); uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP); assert(pending == UV_TCP);
SConn* pConn = connCreate(); SConn* pConn = createConn();
pConn->pTransInst = pThrd->pTransInst; pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/ /* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t)); pConn->pTimer = malloc(sizeof(uv_timer_t));
...@@ -448,7 +452,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -448,7 +452,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
} else { } else {
tDebug("failed to create new connection"); tDebug("failed to create new connection");
connDestroy(pConn); destroyConn(pConn, true);
} }
} }
...@@ -509,7 +513,7 @@ void* workerThread(void* arg) { ...@@ -509,7 +513,7 @@ void* workerThread(void* arg) {
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
static SConn* connCreate() { static SConn* createConn() {
SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); SConn* pConn = (SConn*)calloc(1, sizeof(SConn));
return pConn; return pConn;
} }
...@@ -517,22 +521,24 @@ static void connCloseCb(uv_handle_t* handle) { ...@@ -517,22 +521,24 @@ static void connCloseCb(uv_handle_t* handle) {
// impl later // impl later
// //
} }
static void connDestroy(SConn* conn) { static void destroyConn(SConn* conn, bool clear) {
if (conn == NULL) { if (conn == NULL) {
return; return;
} }
if (clear) {
uv_handle_t handle = *((uv_handle_t*)conn->pTcp);
uv_close(&handle, NULL);
}
uv_timer_stop(conn->pTimer); uv_timer_stop(conn->pTimer);
free(conn->pTimer); free(conn->pTimer);
// uv_close((uv_handle_t*)conn->pTcp, connCloseCb);
free(conn->pTcp); free(conn->pTcp);
free(conn->connBuf.buf); free(conn->connBuf.buf);
free(conn->pWriter); free(conn->pWriter);
// free(conn); free(conn);
// handle
} }
static void uvConnDestroy(uv_handle_t* handle) { static void uvDestroyConn(uv_handle_t* handle) {
SConn* conn = handle->data; SConn* conn = handle->data;
connDestroy(conn); destroyConn(conn, false);
} }
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) { static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) {
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
......
...@@ -34,8 +34,8 @@ typedef struct { ...@@ -34,8 +34,8 @@ typedef struct {
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle; SInfo *pInfo = (SInfo *)pMsg->ahandle;
// tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
// pMsg->code); pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet; if (pEpSet) pInfo->epSet = *pEpSet;
...@@ -63,6 +63,8 @@ static void *sendRequest(void *param) { ...@@ -63,6 +63,8 @@ static void *sendRequest(void *param) {
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
// tsem_wait(&pInfo->rspSem); // tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem);
tDebug("recv response");
// usleep(100000000);
} }
tDebug("thread:%d, it is over", pInfo->index); tDebug("thread:%d, it is over", pInfo->index);
...@@ -98,7 +100,7 @@ int main(int argc, char *argv[]) { ...@@ -98,7 +100,7 @@ int main(int argc, char *argv[]) {
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse; rpcInit.cfp = processResponse;
rpcInit.sessions = 100; rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = 100;
rpcInit.user = "michael"; rpcInit.user = "michael";
rpcInit.secret = secret; rpcInit.secret = secret;
rpcInit.ckey = "key"; rpcInit.ckey = "key";
......
...@@ -122,7 +122,7 @@ int main(int argc, char *argv[]) { ...@@ -122,7 +122,7 @@ int main(int argc, char *argv[]) {
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg; rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000; rpcInit.sessions = 1000;
rpcInit.idleTime = tsShellActivityTimer * 1500; rpcInit.idleTime = 2 * 1500;
rpcInit.afp = retrieveAuthInfo; rpcInit.afp = retrieveAuthInfo;
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册