提交 62700c1f 编写于 作者: dengyihao's avatar dengyihao

handle persist conn

上级 9913d0c7
...@@ -47,8 +47,6 @@ typedef struct SRpcMsg { ...@@ -47,8 +47,6 @@ typedef struct SRpcMsg {
void * ahandle; // app handle set by client void * ahandle; // app handle set by client
int persist; // keep handle or not, default 0 int persist; // keep handle or not, default 0
SRpcPush *push;
} SRpcMsg; } SRpcMsg;
typedef struct SRpcPush { typedef struct SRpcPush {
......
...@@ -17,11 +17,6 @@ ...@@ -17,11 +17,6 @@
#include "transComm.h" #include "transComm.h"
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
typedef struct SCliConn { typedef struct SCliConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
...@@ -32,7 +27,6 @@ typedef struct SCliConn { ...@@ -32,7 +27,6 @@ typedef struct SCliConn {
void* data; void* data;
queue conn; queue conn;
uint64_t expireTime; uint64_t expireTime;
int8_t ctnRdCnt; // continue read count
int hThrdIdx; int hThrdIdx;
bool broken; // link broken or not bool broken; // link broken or not
...@@ -92,9 +86,9 @@ static void clientTimeoutCb(uv_timer_t* handle); ...@@ -92,9 +86,9 @@ static void clientTimeoutCb(uv_timer_t* handle);
// alloc buf for read // alloc buf for read
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket // callback after read nbytes from socket
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void clientRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after write data to socket // callback after write data to socket
static void clientWriteCb(uv_write_t* req, int status); static void clientSendDataCb(uv_write_t* req, int status);
// callback after conn to server // callback after conn to server
static void clientConnCb(uv_connect_t* req, int status); static void clientConnCb(uv_connect_t* req, int status);
static void clientAsyncCb(uv_async_t* handle); static void clientAsyncCb(uv_async_t* handle);
...@@ -120,7 +114,27 @@ static void transDestroyConnCtx(STransConnCtx* ctx); ...@@ -120,7 +114,27 @@ static void transDestroyConnCtx(STransConnCtx* ctx);
// thread obj // thread obj
static SCliThrdObj* createThrdObj(); static SCliThrdObj* createThrdObj();
static void destroyThrdObj(SCliThrdObj* pThrd); static void destroyThrdObj(SCliThrdObj* pThrd);
// thread
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \
if (thrd->quit) { \
clientHandleExcept(conn); \
} \
goto _RETURE; \
} while (0)
#define CONN_HANDLE_BROKEN(conn) \
do { \
if (conn->broken) { \
clientHandleExcept(conn); \
} \
goto _RETURE; \
} while (0);
static void* clientThread(void* arg); static void* clientThread(void* arg);
static void* clientNotifyApp() {} static void* clientNotifyApp() {}
...@@ -147,6 +161,8 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -147,6 +161,8 @@ static void clientHandleResp(SCliConn* conn) {
if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
rpcMsg.handle = conn; rpcMsg.handle = conn;
transRefCliHandle(conn);
conn->persist = 1; conn->persist = 1;
tDebug("client conn %p persist by app", conn); tDebug("client conn %p persist by app", conn);
} }
...@@ -165,9 +181,8 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -165,9 +181,8 @@ static void clientHandleResp(SCliConn* conn) {
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
conn->ctnRdCnt += 1;
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientRecvCb);
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->persist == 0) { if (conn->persist == 0) {
...@@ -290,7 +305,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { ...@@ -290,7 +305,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
conn->ctnRdCnt = 0;
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
...@@ -300,7 +314,7 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b ...@@ -300,7 +314,7 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
} }
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later // impl later
if (handle->data == NULL) { if (handle->data == NULL) {
return; return;
...@@ -363,7 +377,7 @@ static void clientDestroy(uv_handle_t* handle) { ...@@ -363,7 +377,7 @@ static void clientDestroy(uv_handle_t* handle) {
free(conn); free(conn);
} }
static void clientWriteCb(uv_write_t* req, int status) { static void clientSendDataCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status == 0) { if (status == 0) {
...@@ -378,10 +392,12 @@ static void clientWriteCb(uv_write_t* req, int status) { ...@@ -378,10 +392,12 @@ static void clientWriteCb(uv_write_t* req, int status) {
clientHandleExcept(pConn); clientHandleExcept(pConn);
return; return;
} }
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb); uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientRecvCb);
} }
static void clientWrite(SCliConn* pConn) { static void clientSendData(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn);
SCliMsg* pCliMsg = pConn->data; SCliMsg* pCliMsg = pConn->data;
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
...@@ -420,7 +436,11 @@ static void clientWrite(SCliConn* pConn) { ...@@ -420,7 +436,11 @@ static void clientWrite(SCliConn* pConn) {
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientSendDataCb);
return;
_RETURE:
return;
} }
static void clientConnCb(uv_connect_t* req, int status) { static void clientConnCb(uv_connect_t* req, int status) {
// impl later // impl later
...@@ -439,7 +459,7 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -439,7 +459,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
clientWrite(pConn); clientSendData(pConn);
} }
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
...@@ -452,35 +472,34 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -452,35 +472,34 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
pThrd->quit = true; pThrd->quit = true;
uv_stop(pThrd->loop); uv_stop(pThrd->loop);
} }
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { static SCliConn* clientGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); SCliConn* conn = NULL;
uint64_t el = et - pMsg->st;
tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el);
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pTransInst = pThrd->pTransInst;
SCliConn* conn = NULL;
if (pMsg->msg.handle != NULL) { if (pMsg->msg.handle != NULL) {
conn = (SCliConn*)(pMsg->msg.handle); conn = (SCliConn*)(pMsg->msg.handle);
transUnrefCliHandle(conn);
if (conn != NULL) { if (conn != NULL) {
tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn);
} }
} else { } else {
STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
} }
return conn;
}
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st;
tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el);
STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pTransInst = pThrd->pTransInst;
SCliConn* conn = clientGetConn(pMsg, pThrd);
if (conn != NULL) { if (conn != NULL) {
conn->data = pMsg; conn->data = pMsg;
conn->writeReq.data = conn;
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
clientSendData(conn);
if (pThrd->quit) {
clientHandleExcept(conn);
return;
}
clientWrite(conn);
} else { } else {
conn = clientConnCreate(pThrd); conn = clientConnCreate(pThrd);
conn->data = pMsg; conn->data = pMsg;
...@@ -495,8 +514,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -495,8 +514,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
} }
conn->ctnRdCnt = 0;
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
} }
static void clientAsyncCb(uv_async_t* handle) { static void clientAsyncCb(uv_async_t* handle) {
......
...@@ -92,9 +92,9 @@ static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen); ...@@ -92,9 +92,9 @@ static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle); static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status); static void uvOnSendCb(uv_write_t* req, int status);
static void uvOnPipeWriteCb(uv_write_t* req, int status); static void uvOnPipeWriteCb(uv_write_t* req, int status);
static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
...@@ -240,7 +240,7 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -240,7 +240,7 @@ static void uvHandleReq(SSrvConn* pConn) {
// validate msg type // validate msg type
} }
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt // opt
SSrvConn* conn = cli->data; SSrvConn* conn = cli->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
...@@ -282,7 +282,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) { ...@@ -282,7 +282,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
tError("server conn %p time out", pConn); tError("server conn %p time out", pConn);
} }
void uvOnWriteCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
SSrvConn* conn = req->data; SSrvConn* conn = req->data;
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
if (status == 0) { if (status == 0) {
...@@ -350,7 +350,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { ...@@ -350,7 +350,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) {
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
uv_timer_stop(&pConn->pTimer); uv_timer_stop(&pConn->pTimer);
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
} }
static void uvStartSendResp(SSrvMsg* smsg) { static void uvStartSendResp(SSrvMsg* smsg) {
// impl // impl
...@@ -526,7 +526,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -526,7 +526,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnRecvCb);
} else { } else {
tDebug("failed to create new connection"); tDebug("failed to create new connection");
...@@ -641,7 +641,7 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -641,7 +641,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
uv_timer_stop(&conn->pTimer); uv_timer_stop(&conn->pTimer);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
free(conn->pTcp); free(conn->pTcp);
// free(conn); free(conn);
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
uv_loop_close(thrd->loop); uv_loop_close(thrd->loop);
......
...@@ -82,7 +82,7 @@ static void *sendRequest(void *param) { ...@@ -82,7 +82,7 @@ static void *sendRequest(void *param) {
rpcMsg.contLen = pInfo->msgSize; rpcMsg.contLen = pInfo->msgSize;
rpcMsg.ahandle = pInfo; rpcMsg.ahandle = pInfo;
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
rpcMsg.push = push; // rpcMsg.push = push;
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
int64_t start = taosGetTimestampUs(); int64_t start = taosGetTimestampUs();
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册