提交 597d7d3d 编写于 作者: dengyihao's avatar dengyihao

opt transport

上级 d22e97d2
...@@ -22,13 +22,27 @@ typedef struct SConnList { ...@@ -22,13 +22,27 @@ typedef struct SConnList {
typedef struct { typedef struct {
queue wq; queue wq;
int32_t wLen; int32_t len;
int32_t batchSize; //
int32_t batch; int connMax;
int connCnt;
int batchLenLimit;
char* dst; char* dst;
char* ip; char* ip;
uint16_t port; uint16_t port;
} SCliBatchList;
typedef struct {
queue wq;
queue listq;
int32_t wLen;
int32_t batchSize; //
int32_t batch;
SCliBatchList* pList;
} SCliBatch; } SCliBatch;
typedef struct SCliConn { typedef struct SCliConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
...@@ -866,14 +880,21 @@ void cliSendBatch(SCliConn* pConn) { ...@@ -866,14 +880,21 @@ void cliSendBatch(SCliConn* pConn) {
pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->magicNum = htonl(TRANS_MAGIC_NUM);
} }
pHead->timestamp = taosHton64(taosGetTimestampUs()); pHead->timestamp = taosHton64(taosGetTimestampUs());
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
if (pHead->comp == 0) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
} else {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
}
wb[i++] = uv_buf_init((char*)pHead, msgLen); wb[i++] = uv_buf_init((char*)pHead, msgLen);
} }
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn; req->data = pConn;
tDebug("%p conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
pBatch->wLen, pBatch->batchSize); pBatch->wLen, pBatch->batchSize);
uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
taosMemoryFree(wb); taosMemoryFree(wb);
...@@ -962,36 +983,14 @@ _RETURN: ...@@ -962,36 +983,14 @@ _RETURN:
return; return;
} }
static SCliBatch* cliDumpBatch(SCliBatch* pBatch) {
SCliBatch* pNewBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
QUEUE_INIT(&pNewBatch->wq);
QUEUE_MOVE(&pBatch->wq, &pNewBatch->wq);
pNewBatch->batchSize = pBatch->batchSize;
pNewBatch->batch = pBatch->batch;
pNewBatch->wLen = pBatch->wLen;
pNewBatch->dst = strdup(pBatch->dst);
pNewBatch->ip = strdup(pBatch->ip);
pNewBatch->port = pBatch->port;
QUEUE_INIT(&pBatch->wq);
pBatch->batchSize = 0;
pBatch->batch = 0;
pBatch->wLen = 0;
return pNewBatch;
}
static void cliDestroyBatch(SCliBatch* pBatch) { static void cliDestroyBatch(SCliBatch* pBatch) {
while (!QUEUE_IS_EMPTY(&pBatch->wq)) { while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
queue* h = QUEUE_HEAD(&pBatch->wq); queue* h = QUEUE_HEAD(&pBatch->wq);
SCliMsg* p = QUEUE_DATA(h, SCliMsg, q); QUEUE_REMOVE(h);
QUEUE_REMOVE(&p->q); SCliMsg* p = QUEUE_DATA(h, SCliMsg, q);
destroyCmsg(p); destroyCmsg(p);
} }
taosMemoryFree(pBatch->ip);
taosMemoryFree(pBatch->dst);
taosMemoryFree(pBatch); taosMemoryFree(pBatch);
} }
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
...@@ -999,25 +998,22 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { ...@@ -999,25 +998,22 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
return; return;
} }
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliBatchList* pList = pBatch->pList;
SCliBatch* pNewBatch = cliDumpBatch(pBatch); SCliConn* conn = getConnFromPool(pThrd->pool, pList->ip, pList->port);
SCliConn* conn = getConnFromPool(pThrd->pool, pNewBatch->ip, pNewBatch->port); if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pList->ip, pList->port)) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen,
if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pNewBatch->ip, pNewBatch->port)) { pBatch->batchSize);
tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pNewBatch->wLen, cliDestroyBatch(pBatch);
pNewBatch->batchSize);
cliDestroyBatch(pNewBatch);
return; return;
} }
if (conn == NULL) { if (conn == NULL) {
conn = cliCreateConn(pThrd); conn = cliCreateConn(pThrd);
conn->pBatch = pNewBatch; conn->pBatch = pBatch;
conn->ip = strdup(pNewBatch->dst); conn->ip = strdup(pList->dst);
char* ip = pNewBatch->ip; uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
uint16_t port = pNewBatch->port;
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip);
if (ipaddr == 0xffffffff) { if (ipaddr == 0xffffffff) {
uv_timer_stop(conn->timer); uv_timer_stop(conn->timer);
conn->timer->data = NULL; conn->timer->data = NULL;
...@@ -1030,9 +1026,9 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { ...@@ -1030,9 +1026,9 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
struct sockaddr_in addr; struct sockaddr_in addr;
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = ipaddr; addr.sin_addr.s_addr = ipaddr;
addr.sin_port = (uint16_t)htons(port); addr.sin_port = (uint16_t)htons(pList->port);
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pBatch->dst); tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
if (fd == -1) { if (fd == -1) {
tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
...@@ -1066,7 +1062,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { ...@@ -1066,7 +1062,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
return; return;
} }
conn->pBatch = pNewBatch; conn->pBatch = pBatch;
cliSendBatch(conn); cliSendBatch(conn);
} }
static void cliSendBatchCb(uv_write_t* req, int status) { static void cliSendBatchCb(uv_write_t* req, int status) {
...@@ -1074,15 +1070,33 @@ static void cliSendBatchCb(uv_write_t* req, int status) { ...@@ -1074,15 +1070,33 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
SCliThrd* thrd = conn->hostThrd; SCliThrd* thrd = conn->hostThrd;
SCliBatch* p = conn->pBatch; SCliBatch* p = conn->pBatch;
SCliBatchList* pBatchList = p->pList;
int32_t empty = QUEUE_IS_EMPTY(&pBatchList->wq);
pBatchList->connCnt -= 1;
conn->pBatch = NULL; conn->pBatch = NULL;
if (status != 0) { if (status != 0) {
tDebug("%p conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
p->wLen, p->batchSize, uv_err_name(status)); p->wLen, p->batchSize, uv_err_name(status));
cliHandleExcept(conn); cliHandleExcept(conn);
} else { } else {
tDebug("%p conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
p->batchSize); p->batchSize);
if (empty == false) {
queue* h = QUEUE_HEAD(&pBatchList->wq);
QUEUE_REMOVE(h);
conn->pBatch = QUEUE_DATA(h, SCliBatch, listq);
pBatchList->connCnt += 1;
pBatchList->len -= 1;
cliSendBatch(conn);
return;
}
addConnToPool(thrd->pool, conn); addConnToPool(thrd->pool, conn);
} }
...@@ -1468,23 +1482,65 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { ...@@ -1468,23 +1482,65 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
char key[TSDB_FQDN_LEN + 64] = {0}; char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port); CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key)); // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
if (ppBatch == NULL || *ppBatch == NULL) { SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
QUEUE_INIT(&pBatchList->wq);
pBatchList->connMax = 200;
pBatchList->connCnt = 0;
pBatchList->batchLenLimit = 16 * 1024;
pBatchList->ip = strdup(ip);
pBatchList->dst = strdup(key);
pBatchList->port = port;
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
QUEUE_INIT(&pBatch->wq); QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, h); QUEUE_PUSH(&pBatch->wq, h);
pBatch->wLen += 1; pBatch->wLen += 1;
pBatch->batchSize += pMsg->msg.contLen; pBatch->batchSize += pMsg->msg.contLen;
pBatch->pList = pBatchList;
QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
pBatch->dst = strdup(key); taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*));
pBatch->ip = strdup(ip); } else {
pBatch->port = (uint16_t)port; if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, h);
pBatch->wLen += 1;
pBatch->batchSize = pMsg->msg.contLen;
pBatch->pList = *ppBatchList;
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
(*ppBatchList)->len += 1;
continue;
}
taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatch, sizeof(void*)); queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h);
pBatch->batchSize += pMsg->msg.contLen;
} else { } else {
QUEUE_PUSH(&(*ppBatch)->wq, h); SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
(*ppBatch)->wLen += 1; QUEUE_INIT(&pBatch->wq);
(*ppBatch)->batchSize += pMsg->msg.contLen; QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, h);
pBatch->wLen += 1;
pBatch->batchSize += pMsg->msg.contLen;
pBatch->pList = *ppBatchList;
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
(*ppBatchList)->len += 1;
}
} }
continue; continue;
} }
...@@ -1494,7 +1550,16 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { ...@@ -1494,7 +1550,16 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
void** pIter = taosHashIterate(pThrd->batchCache, NULL); void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SCliBatch* batch = (SCliBatch*)(*pIter); SCliBatchList* batchList = (SCliBatchList*)(*pIter);
if (QUEUE_IS_EMPTY(&batchList->wq) || batchList->connCnt >= batchList->connMax) {
continue;
}
queue* hr = QUEUE_HEAD(&batchList->wq);
QUEUE_REMOVE(hr);
batchList->len -= 1;
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
cliHandleBatchReq(batch, pThrd); cliHandleBatchReq(batch, pThrd);
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
...@@ -1775,8 +1840,15 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1775,8 +1840,15 @@ static void destroyThrdObj(SCliThrd* pThrd) {
void** pIter = taosHashIterate(pThrd->batchCache, NULL); void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SCliBatch* batch = (SCliBatch*)(*pIter); SCliBatchList* pBatchList = (SCliBatchList*)(*pIter);
cliDestroyBatch(batch); while (!QUEUE_IS_EMPTY(&pBatchList->wq)) {
queue* h = QUEUE_HEAD(&pBatchList->wq);
QUEUE_REMOVE(h);
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch);
}
taosMemoryFree(pBatchList);
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
} }
taosHashCleanup(pThrd->batchCache); taosHashCleanup(pThrd->batchCache);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册