提交 ee28ab3b 编写于 作者: dengyihao's avatar dengyihao

enh: batch send

上级 c35b9386
......@@ -837,10 +837,9 @@ void cliSendBatch(SCliConn* pConn) {
uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
int i = 0;
while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
queue* h = QUEUE_HEAD(&pBatch->wq);
queue* h = NULL;
QUEUE_FOREACH(h, &pBatch->wq) {
SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);
QUEUE_REMOVE(&pCliMsg->q);
STransConnCtx* pCtx = pCliMsg->ctx;
......@@ -878,7 +877,6 @@ void cliSendBatch(SCliConn* pConn) {
wb[i++] = uv_buf_init((char*)pHead, msgLen);
}
pBatch->wLen = 0;
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn;
uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
......@@ -970,7 +968,13 @@ _RETURN:
static SCliBatch* cliDumpBatch(SCliBatch* pBatch) {
SCliBatch* pNewBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
memcpy(pNewBatch->wq, pBatch->wq, sizeof(pBatch->wq));
QUEUE_INIT(&pNewBatch->wq);
while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
queue* h = QUEUE_HEAD(&pBatch->wq);
QUEUE_REMOVE(h);
QUEUE_PUSH(&pNewBatch->wq, h);
}
pNewBatch->batchSize = pBatch->batchSize;
pNewBatch->batch = pBatch->batch;
......@@ -1027,7 +1031,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
addr.sin_addr.s_addr = ipaddr;
addr.sin_port = (uint16_t)htons(pBatch->port);
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pBatch->ip);
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pBatch->dst);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
if (fd == -1) {
tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
......@@ -1079,7 +1083,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
static void cliHandleFastFail(SCliConn* pConn, int status) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
tError("conn %p free twice", pConn);
tError("conn %p free twice, reason:%s", pConn, uv_err_name(status));
if (pConn->pBatch == NULL) {
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
......@@ -1443,7 +1447,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (REQUEST_NO_RESP(&pMsg->msg)) {
if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
STransConnCtx* pCtx = pMsg->ctx;
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
......@@ -1469,7 +1473,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
(*ppBatch)->wLen += 1;
(*ppBatch)->batchSize += pMsg->msg.contLen;
}
return;
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
count++;
......@@ -1751,6 +1755,13 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosHashCleanup(pThrd->connLimitCache);
void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) {
SCliBatch* batch = (SCliBatch*)(*pIter);
cliDestroyBatch(batch);
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
}
taosHashCleanup(pThrd->batchCache);
taosMemoryFree(pThrd);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册