提交 cc5ae34d 编写于 作者: dengyihao's avatar dengyihao

fix conn persist bug

上级 8bb83943
...@@ -140,6 +140,7 @@ typedef struct { ...@@ -140,6 +140,7 @@ typedef struct {
SRpcMsg* pRsp; // for synchronous API SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API tsem_t* pSem; // for synchronous API
int hThrdIdx;
char* ip; char* ip;
uint32_t port; uint32_t port;
// SEpSet* pSet; // for synchronous API // SEpSet* pSet; // for synchronous API
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "transComm.h" #include "transComm.h"
#define CONN_HOST_THREAD_INDEX(conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
typedef struct SCliConn { typedef struct SCliConn {
...@@ -29,8 +29,8 @@ typedef struct SCliConn { ...@@ -29,8 +29,8 @@ typedef struct SCliConn {
void* data; void* data;
queue conn; queue conn;
uint64_t expireTime; uint64_t expireTime;
int8_t ctnRdCnt; // timers already notify to client int8_t ctnRdCnt; // continue read count
int hostThreadIndex; int hThrdIdx;
SRpcPush* push; SRpcPush* push;
int persist; // int persist; //
...@@ -482,12 +482,12 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -482,12 +482,12 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
if (pMsg->msg.handle == NULL) { if (pMsg->msg.handle == NULL) {
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) { if (conn != NULL) {
tTrace("client conn %d get from conn pool", conn); tTrace("client conn %p get from conn pool", conn);
} }
} else { } else {
conn = (SCliConn*)(pMsg->msg.handle); conn = (SCliConn*)(pMsg->msg.handle);
if (conn != NULL) { if (conn != NULL) {
tTrace("client conn %d reused", conn); tTrace("client conn %p reused", conn);
} }
} }
...@@ -503,7 +503,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -503,7 +503,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
clientWrite(conn); clientWrite(conn);
} else { } else {
SCliConn* conn = calloc(1, sizeof(SCliConn)); conn = calloc(1, sizeof(SCliConn));
conn->ref++; conn->ref++;
// read/write stream handle // read/write stream handle
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
...@@ -652,15 +652,9 @@ static void clientSendQuit(SCliThrdObj* thrd) { ...@@ -652,15 +652,9 @@ static void clientSendQuit(SCliThrdObj* thrd) {
SCliMsg* msg = calloc(1, sizeof(SCliMsg)); SCliMsg* msg = calloc(1, sizeof(SCliMsg));
msg->ctx = NULL; // msg->ctx = NULL; //
// pthread_mutex_lock(&thrd->msgMtx);
// QUEUE_PUSH(&thrd->msg, &msg->q);
// pthread_mutex_unlock(&thrd->msgMtx);
transSendAsync(thrd->asyncPool, &msg->q); transSendAsync(thrd->asyncPool, &msg->q);
// uv_async_send(thrd->cliAsync);
} }
void taosCloseClient(void* arg) { void taosCloseClient(void* arg) {
// impl later
SClientObj* cli = arg; SClientObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) { for (int i = 0; i < cli->numOfThreads; i++) {
clientSendQuit(cli->pThreadObj[i]); clientSendQuit(cli->pThreadObj[i]);
...@@ -683,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* ...@@ -683,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
SRpcInfo* pRpc = (SRpcInfo*)shandle; SRpcInfo* pRpc = (SRpcInfo*)shandle;
int index = CONN_HOST_THREAD_INDEX(pMsg.handle); int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
if (index == -1) { if (index == -1) {
index = clientRBChoseIdx(pRpc); index = clientRBChoseIdx(pRpc);
} }
...@@ -717,7 +711,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { ...@@ -717,7 +711,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
SRpcInfo* pRpc = (SRpcInfo*)shandle; SRpcInfo* pRpc = (SRpcInfo*)shandle;
int index = CONN_HOST_THREAD_INDEX(pMsg.handle); int index = CONN_HOST_THREAD_INDEX(pReq->handle);
if (index == -1) { if (index == -1) {
index = clientRBChoseIdx(pRpc); index = clientRBChoseIdx(pRpc);
} }
...@@ -728,6 +722,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { ...@@ -728,6 +722,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
pCtx->ip = strdup(ip); pCtx->ip = strdup(ip);
pCtx->port = port; pCtx->port = port;
pCtx->hThrdIdx = index;
pCtx->pSem = calloc(1, sizeof(tsem_t)); pCtx->pSem = calloc(1, sizeof(tsem_t));
pCtx->pRsp = pRsp; pCtx->pRsp = pRsp;
tsem_init(pCtx->pSem, 0, 0); tsem_init(pCtx->pSem, 0, 0);
......
...@@ -96,7 +96,7 @@ static void *sendRequest(void *param) { ...@@ -96,7 +96,7 @@ static void *sendRequest(void *param) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SRpcInit rpcInit; SRpcInit rpcInit;
SEpSet epSet; SEpSet epSet = {0};
int msgSize = 128; int msgSize = 128;
int numOfReqs = 0; int numOfReqs = 0;
int appThreads = 1; int appThreads = 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册