Commit 8bb83943 authored by dengyihao

add conn persist

Parent 2fb3d3f2
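This commit lets a client keep using the TCP connection that served a query: a TDMT_VND_QUERY_RSP or TDMT_VND_FETCH_RSP response now carries the live connection in rpcMsg.handle, and a request that passes the handle back is pinned to that connection and its owning thread instead of going through the pool. A hypothetical caller-side round trip, using the rpcSendRequest signature from the diff below (the message type and payload are placeholders):

// Hypothetical follow-up request on a persisted connection. Only the
// rpcSendRequest signature is taken from this commit; the rest is a sketch.
void onQueryRsp(void* shandle, SEpSet* epSet, SRpcMsg* pRsp) {
  SRpcMsg next = {0};
  next.msgType = TDMT_VND_FETCH;  // e.g. fetch rows for the query just opened
  next.handle  = pRsp->handle;    // reuse the connection the response came on
  rpcSendRequest(shandle, epSet, &next, NULL);
}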
@@ -17,6 +17,7 @@
#include "transComm.h"
+#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
typedef struct SCliConn {
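Before the SCliConn changes, a quick reading of the two macros above (assuming idleTime is configured in seconds, which this diff does not show):

// Worked example of the macros; the seconds-based idleTime is an assumption.
int windowMs = CONN_PERSIST_TIME(3);         // 3 * 1000 * 10 = 30000 ms
int timerMs  = CONN_PERSIST_TIME(3) / 2;     // 15000 ms, the pool-timer period used below
int idx      = CONN_HOST_THREAD_INDEX(NULL); // -1: no handle, fall back to round-robin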
@@ -28,7 +29,8 @@ typedef struct SCliConn {
void* data;
queue conn;
uint64_t expireTime;
- int8_t notifyCount; // timers already notify to client
+ int8_t ctnRdCnt;    // timers that have already notified the client
int hThrdIdx;
SRpcPush* push;
+ int persist; // 1: the user owns the connection; it is not returned to the pool
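Together these fields drive the cleanup decision in clientHandleResp below: a connection is recycled into the per-thread pool unless the user has taken ownership of it. A minimal restatement of that branch (illustrative sketch, not the committed code):

// Illustrative sketch: recycle the connection unless the caller owns it.
static void recycleIfUnowned(SCliThrdObj* pThrd, STransConnCtx* pCtx, SCliConn* conn) {
  if (conn->push == NULL || conn->persist == 0) {
    addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);  // back to the pool
    destroyCmsg(conn->data);                                 // free the finished message
    conn->data = NULL;
  }
  // otherwise the user keeps the handle and sends the next request on it
}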
@@ -131,11 +133,16 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle;
+ if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP) {
+   rpcMsg.handle = conn;
+   conn->persist = 1;
+ }
tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType),
inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr),
ntohs(conn->locaddr.sin_port));
- if (conn->push != NULL && conn->notifyCount != 0) {
+ if (conn->push != NULL && conn->ctnRdCnt != 0) {
(*conn->push->callback)(conn->push->arg, &rpcMsg);
conn->push = NULL;
} else {
@@ -148,7 +155,7 @@ static void clientHandleResp(SCliConn* conn) {
tsem_post(pCtx->pSem);
}
}
- conn->notifyCount += 1;
+ conn->ctnRdCnt += 1;
conn->secured = pHead->secured;
// buf's memory has already been transferred to rpcMsg.pCont
@@ -157,13 +164,15 @@ static void clientHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
SCliThrdObj* pThrd = conn->hostThrd;
+ // the user owns the connection when conn->persist == 1
- if (conn->push == NULL) {
+ if (conn->push == NULL || conn->persist == 0) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
destroyCmsg(conn->data);
conn->data = NULL;
}
// start the thread's conn-pool timer if it is not yet active
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
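The reaping side of the pool is not part of this hunk; a rough sketch of what clientTimeoutCb plausibly does, given the expireTime and timer setup above (the pTransInst back-pointer and the pool walk are assumptions, not shown in this diff):

// Hedged sketch of the pool timer: prune connections whose expireTime has
// passed, then re-arm at half the persist window, mirroring the code above.
static void clientTimeoutCbSketch(uv_timer_t* handle) {
  SCliThrdObj* pThrd = handle->data;
  SRpcInfo*    pRpc  = pThrd->pTransInst;  // assumed back-pointer to the instance
  int64_t      now   = taosGetTimestampMs();
  // walk each SConnList in pThrd->pool and destroy conns with expireTime < now
  uv_timer_start(handle, clientTimeoutCbSketch, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}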
@@ -189,7 +198,7 @@ static void clientHandleExcept(SCliConn* pConn) {
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcMsg.msgType = msgType + 1;
- if (pConn->push != NULL && pConn->notifyCount != 0) {
+ if (pConn->push != NULL && pConn->ctnRdCnt != 0) {
(*pConn->push->callback)(pConn->push->arg, &rpcMsg);
pConn->push = NULL;
} else {
@@ -210,7 +219,7 @@ static void clientHandleExcept(SCliConn* pConn) {
}
// transDestroyConnCtx(pCtx);
clientConnDestroy(pConn, true);
- pConn->notifyCount += 1;
+ pConn->ctnRdCnt += 1;
}
static void clientTimeoutCb(uv_timer_t* handle) {
@@ -292,7 +301,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
- conn->notifyCount = 0;
+ conn->ctnRdCnt = 0;
// the list was already created earlier
assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn);
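getConnFromPool is referenced below but not shown in this diff; a plausible counterpart to the put path above, under the same data-structure assumptions (a hash of per-endpoint intrusive queues; the key format is a guess):

// Hedged sketch of the pool's get path (not part of this diff).
static SCliConn* getConnFromPoolSketch(void* pool, char* ip, uint32_t port) {
  char key[64] = {0};
  snprintf(key, sizeof(key), "%s:%u", ip, port);  // assumed key format
  SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
  if (plist == NULL || QUEUE_IS_EMPTY(&plist->conn)) {
    return NULL;  // no idle connection; the caller creates a fresh one
  }
  queue* h = QUEUE_HEAD(&plist->conn);
  QUEUE_REMOVE(h);
  return QUEUE_DATA(h, SCliConn, conn);  // recover the conn from its queue node
}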
@@ -423,6 +432,8 @@ static void clientWrite(SCliConn* pConn) {
pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
+ // if (pHead->msgType == TDMT_VND_QUERY || pHead->msgType == TDMT_VND_)
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
@@ -462,14 +473,25 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st;
tTrace("client msg tran time cost: %" PRIu64 "", el);
tTrace("client msg tran time cost: %" PRIu64 "us", el);
et = taosGetTimestampUs();
STransConnCtx* pCtx = pMsg->ctx;
- SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
+ SCliConn* conn = NULL;
+ if (pMsg->msg.handle == NULL) {
+   conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
+   if (conn != NULL) {
+     tTrace("client conn %p got from conn pool", conn);
+   }
+ } else {
+   conn = (SCliConn*)(pMsg->msg.handle);
+   if (conn != NULL) {
+     tTrace("client conn %p reused", conn);
+   }
+ }
if (conn != NULL) {
// impl later
tTrace("client get conn %p from pool", conn);
conn->data = pMsg;
conn->writeReq->data = conn;
transDestroyBuffer(&conn->readBuf);
@@ -480,8 +502,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
}
clientWrite(conn);
- conn->push = pMsg->msg.push;
} else {
SCliConn* conn = calloc(1, sizeof(SCliConn));
conn->ref++;
@@ -500,13 +520,18 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->data = pMsg;
conn->hostThrd = pThrd;
- conn->push = pMsg->msg.push;
+ // conn->push = pMsg->msg.push;
+ // conn->ctnRdCnt = 0;
struct sockaddr_in addr;
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
// a failed connect is handled in the callback
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
}
+ conn->push = pMsg->msg.push;
+ conn->ctnRdCnt = 0;
+ conn->hThrdIdx = pCtx->hThrdIdx;
}
static void clientAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data;
@@ -644,6 +669,13 @@ void taosCloseClient(void* arg) {
free(cli->pThreadObj);
free(cli);
}
+ static int clientRBChoseIdx(SRpcInfo* pRpc) {
+   int64_t index = pRpc->index;
+   if (pRpc->index++ >= pRpc->numOfThreads) {
+     pRpc->index = 0;
+   }
+   return index % pRpc->numOfThreads;
+ }
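clientRBChoseIdx reads and advances pRpc->index without synchronization; the code it replaces carried the same doubt in its "// atomic or not" comment. If rpcSendRequest can be called from multiple threads, a C11-atomic counter closes the race; this is a suggested alternative, not what the commit does:

#include <stdatomic.h>
#include <stdint.h>

// Suggested lock-free variant of the round-robin chooser; the committed code
// uses a plain int64_t field on SRpcInfo instead of an atomic.
static int clientRBChoseIdxAtomic(_Atomic int64_t* index, int numOfThreads) {
  int64_t i = atomic_fetch_add_explicit(index, 1, memory_order_relaxed);
  return (int)(i % numOfThreads);
}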
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
@@ -651,48 +683,45 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
SRpcInfo* pRpc = (SRpcInfo*)shandle;
+ int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
+ if (index == -1) {
+   index = clientRBChoseIdx(pRpc);
+ }
int32_t flen = 0;
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// impl later
}
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->pTransInst = (SRpcInfo*)shandle;
pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType;
pCtx->ip = strdup(ip);
pCtx->port = port;
+ pCtx->hThrdIdx = index;
assert(pRpc->connType == TAOS_CONN_CLIENT);
- // atomic or not
- int64_t index = pRpc->index;
- if (pRpc->index++ >= pRpc->numOfThreads) {
-   pRpc->index = 0;
- }
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
cliMsg->ctx = pCtx;
cliMsg->msg = *pMsg;
cliMsg->st = taosGetTimestampUs();
- SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
- // pthread_mutex_lock(&thrd->msgMtx);
- // QUEUE_PUSH(&thrd->msg, &cliMsg->q);
- // pthread_mutex_unlock(&thrd->msgMtx);
- // int start = taosGetTimestampUs();
+ SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q));
- // uv_async_send(thrd->cliAsync);
- // int end = taosGetTimestampUs() - start;
- // tError("client sent to rpc, time cost: %d", (int)end);
}
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pRpc = (SRpcInfo*)shandle;
+ int index = CONN_HOST_THREAD_INDEX(pReq->handle);
+ if (index == -1) {
+   index = clientRBChoseIdx(pRpc);
+ }
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->pTransInst = (SRpcInfo*)shandle;
pCtx->ahandle = pReq->ahandle;
@@ -703,23 +732,13 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
pCtx->pRsp = pRsp;
tsem_init(pCtx->pSem, 0, 0);
- int64_t index = pRpc->index;
- if (pRpc->index++ >= pRpc->numOfThreads) {
-   pRpc->index = 0;
- }
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
cliMsg->ctx = pCtx;
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
- SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
- // pthread_mutex_lock(&thrd->msgMtx);
- // QUEUE_PUSH(&thrd->msg, &cliMsg->q);
- // pthread_mutex_unlock(&thrd->msgMtx);
- // int start = taosGetTimestampUs();
+ SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_t* pSem = pCtx->pSem;
tsem_wait(pSem);
tsem_destroy(pSem);
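rpcSendRecv turns the same asynchronous machinery into a blocking call: the context carries a semaphore that the response path posts. A hypothetical synchronous caller (everything except rpcSendRecv's signature is a placeholder):

// Hypothetical blocking call on the API above; payload fields are placeholders.
SRpcMsg req = {0}, rsp = {0};
req.msgType = TDMT_VND_QUERY;
rpcSendRecv(shandle, &epSet, &req, &rsp);  // returns after tsem_post fires in
                                           // the response handler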
......