提交 b894ba6f 编写于 作者: dengyihao's avatar dengyihao

opt trans

上级 597d7d3d
...@@ -159,11 +159,9 @@ static void cliIdleCb(uv_idle_t* handle); ...@@ -159,11 +159,9 @@ static void cliIdleCb(uv_idle_t* handle);
static void cliPrepareCb(uv_prepare_t* handle); static void cliPrepareCb(uv_prepare_t* handle);
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
// static void cliConnBatchCb(uv_connect_t* req, int status);
static void cliSendBatchCb(uv_write_t* req, int status); static void cliSendBatchCb(uv_write_t* req, int status);
// static void cliConnBatchCb(uv_connect_t* req, int status);
// callback after conn to server SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
// static void cliConnBatchCb(uv_connect_t* req, int status);
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
...@@ -847,8 +845,11 @@ void cliSendBatch(SCliConn* pConn) { ...@@ -847,8 +845,11 @@ void cliSendBatch(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliBatch* pBatch = pConn->pBatch; SCliBatch* pBatch = pConn->pBatch;
int32_t wLen = pBatch->wLen; SCliBatchList* pList = pBatch->pList;
pList->connCnt += 1;
int32_t wLen = pBatch->wLen;
uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
int i = 0; int i = 0;
...@@ -994,7 +995,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) { ...@@ -994,7 +995,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
taosMemoryFree(pBatch); taosMemoryFree(pBatch);
} }
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
if (pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
return; return;
} }
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
...@@ -1071,8 +1072,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) { ...@@ -1071,8 +1072,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
SCliBatch* p = conn->pBatch; SCliBatch* p = conn->pBatch;
SCliBatchList* pBatchList = p->pList; SCliBatchList* pBatchList = p->pList;
SCliBatch* nxtBatch = cliGetHeadFromList(pBatchList);
int32_t empty = QUEUE_IS_EMPTY(&pBatchList->wq);
pBatchList->connCnt -= 1; pBatchList->connCnt -= 1;
conn->pBatch = NULL; conn->pBatch = NULL;
...@@ -1081,23 +1081,17 @@ static void cliSendBatchCb(uv_write_t* req, int status) { ...@@ -1081,23 +1081,17 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
p->wLen, p->batchSize, uv_err_name(status)); p->wLen, p->batchSize, uv_err_name(status));
cliHandleExcept(conn); cliHandleExcept(conn);
cliHandleBatchReq(nxtBatch, thrd);
} else { } else {
tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
p->batchSize); p->batchSize);
if (empty == false) { if (nxtBatch != NULL) {
queue* h = QUEUE_HEAD(&pBatchList->wq); conn->pBatch = nxtBatch;
QUEUE_REMOVE(h);
conn->pBatch = QUEUE_DATA(h, SCliBatch, listq);
pBatchList->connCnt += 1;
pBatchList->len -= 1;
cliSendBatch(conn); cliSendBatch(conn);
return; } else {
addConnToPool(thrd->pool, conn);
} }
addConnToPool(thrd->pool, conn);
} }
cliDestroyBatch(p); cliDestroyBatch(p);
...@@ -1466,6 +1460,18 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { ...@@ -1466,6 +1460,18 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
tTrace("cli process batch size:%d", count); tTrace("cli process batch size:%d", count);
} }
} }
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt >= pList->connMax) {
return NULL;
}
queue* hr = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(hr);
pList->len -= 1;
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
return batch;
}
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
int count = 0; int count = 0;
...@@ -1528,6 +1534,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { ...@@ -1528,6 +1534,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) { if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h); QUEUE_PUSH(&pBatch->wq, h);
pBatch->batchSize += pMsg->msg.contLen; pBatch->batchSize += pMsg->msg.contLen;
pBatch->wLen += 1;
} else { } else {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
QUEUE_INIT(&pBatch->wq); QUEUE_INIT(&pBatch->wq);
...@@ -1551,17 +1558,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { ...@@ -1551,17 +1558,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
void** pIter = taosHashIterate(pThrd->batchCache, NULL); void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SCliBatchList* batchList = (SCliBatchList*)(*pIter); SCliBatchList* batchList = (SCliBatchList*)(*pIter);
if (QUEUE_IS_EMPTY(&batchList->wq) || batchList->connCnt >= batchList->connMax) { SCliBatch* batch = cliGetHeadFromList(batchList);
continue; if (batch != NULL) {
cliHandleBatchReq(batch, pThrd);
} }
queue* hr = QUEUE_HEAD(&batchList->wq);
QUEUE_REMOVE(hr);
batchList->len -= 1;
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
cliHandleBatchReq(batch, pThrd);
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
} }
...@@ -1848,7 +1848,10 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1848,7 +1848,10 @@ static void destroyThrdObj(SCliThrd* pThrd) {
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch); cliDestroyBatch(pBatch);
} }
taosMemoryFree(pBatchList->ip);
taosMemoryFree(pBatchList->dst);
taosMemoryFree(pBatchList); taosMemoryFree(pBatchList);
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
} }
taosHashCleanup(pThrd->batchCache); taosHashCleanup(pThrd->batchCache);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册