提交 3de71e39 编写于 作者: dengyihao's avatar dengyihao

opt code

上级 a75e1701
...@@ -164,6 +164,8 @@ static void cliSend(SCliConn* pConn); ...@@ -164,6 +164,8 @@ static void cliSend(SCliConn* pConn);
static void cliSendBatch(SCliConn* pConn); static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port);
// cli util func // cli util func
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
...@@ -1000,7 +1002,14 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { ...@@ -1000,7 +1002,14 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
SCliBatch* pNewBatch = cliDumpBatch(pBatch); SCliBatch* pNewBatch = cliDumpBatch(pBatch);
SCliConn* conn = getConnFromPool(pThrd->pool, pBatch->ip, pBatch->port); SCliConn* conn = getConnFromPool(pThrd->pool, pNewBatch->ip, pNewBatch->port);
if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pNewBatch->ip, pNewBatch->port)) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pNewBatch->wLen,
pNewBatch->batchSize);
cliDestroyBatch(pNewBatch);
return;
}
if (conn == NULL) { if (conn == NULL) {
conn = cliCreateConn(pThrd); conn = cliCreateConn(pThrd);
conn->pBatch = pNewBatch; conn->pBatch = pNewBatch;
...@@ -1064,16 +1073,17 @@ static void cliSendBatchCb(uv_write_t* req, int status) { ...@@ -1064,16 +1073,17 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
SCliConn* conn = req->data; SCliConn* conn = req->data;
taosMemoryFree(req); taosMemoryFree(req);
tDebug("%p conn %p send batch msg out, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, conn->pBatch->wLen,
conn->pBatch->batchSize);
SCliThrd* thrd = conn->hostThrd; SCliThrd* thrd = conn->hostThrd;
cliDestroyBatch(conn->pBatch); cliDestroyBatch(conn->pBatch);
conn->pBatch = NULL; conn->pBatch = NULL;
if (status != 0) { if (status != 0) {
tDebug("%p conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
conn->pBatch->wLen, conn->pBatch->batchSize, uv_err_name(status));
cliHandleExcept(conn); cliHandleExcept(conn);
} else { } else {
tDebug("%p conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn,
conn->pBatch->wLen, conn->pBatch->batchSize);
addConnToPool(thrd->pool, conn); addConnToPool(thrd->pool, conn);
} }
} }
...@@ -1282,12 +1292,12 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { ...@@ -1282,12 +1292,12 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
return; return;
} }
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, SCliMsg* pMsg) { static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx; // STransConnCtx* pCtx = pMsg->ctx;
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); // char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); // int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0}; char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port); CONN_CONSTRUCT_HASH_KEY(key, ip, port);
...@@ -1306,6 +1316,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1306,6 +1316,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
if (!EPSET_IS_VALID(&pCtx->epSet)) { if (!EPSET_IS_VALID(&pCtx->epSet)) {
tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
...@@ -1314,9 +1326,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1314,9 +1326,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
} }
if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); char key[TSDB_FQDN_LEN + 64] = {0};
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port); CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
...@@ -1344,7 +1354,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1344,7 +1354,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return; return;
} }
if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, pMsg)) { if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, ip, port)) {
tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType), tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType),
tstrerror(TSDB_CODE_RPC_MAX_SESSIONS)); tstrerror(TSDB_CODE_RPC_MAX_SESSIONS));
destroyCmsg(pMsg); destroyCmsg(pMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册