提交 c35b9386 编写于 作者: dengyihao's avatar dengyihao

fix invalid read/write

上级 a6a8daec
...@@ -20,6 +20,15 @@ typedef struct SConnList { ...@@ -20,6 +20,15 @@ typedef struct SConnList {
int32_t size; int32_t size;
} SConnList; } SConnList;
typedef struct {
queue wq;
int32_t wLen;
int32_t batchSize; //
int32_t batch;
char* dst;
char* ip;
uint16_t port;
} SCliBatch;
typedef struct SCliConn { typedef struct SCliConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
...@@ -64,16 +73,6 @@ typedef struct SCliMsg { ...@@ -64,16 +73,6 @@ typedef struct SCliMsg {
int sent; //(0: no send, 1: alread sent) int sent; //(0: no send, 1: alread sent)
} SCliMsg; } SCliMsg;
typedef struct {
queue wq;
int32_t wLen;
int32_t batchSize; //
int32_t batch;
char* dst;
char* ip;
uint16_t port;
} SCliBatch;
typedef struct SCliThrd { typedef struct SCliThrd {
TdThread thread; // tid TdThread thread; // tid
int64_t pid; // pid int64_t pid; // pid
...@@ -145,10 +144,12 @@ static void cliAsyncCb(uv_async_t* handle); ...@@ -145,10 +144,12 @@ static void cliAsyncCb(uv_async_t* handle);
static void cliIdleCb(uv_idle_t* handle); static void cliIdleCb(uv_idle_t* handle);
static void cliPrepareCb(uv_prepare_t* handle); static void cliPrepareCb(uv_prepare_t* handle);
static void cliSendBatch(const SCliBatch* pBatch, SCliThrd* pThrd); static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
// static void cliConnBatchCb(uv_connect_t* req, int status);
static void cliSendBatchCb(uv_write_t* req, int status); static void cliSendBatchCb(uv_write_t* req, int status);
// callback after conn to server // static void cliConnBatchCb(uv_connect_t* req, int status);
static void cliConnBatchCb(uv_connect_t* req, int status); // callback after conn to server
// static void cliConnBatchCb(uv_connect_t* req, int status);
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
...@@ -160,6 +161,7 @@ static SCliConn* cliCreateConn(SCliThrd* thrd); ...@@ -160,6 +161,7 @@ static SCliConn* cliCreateConn(SCliThrd* thrd);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle); static void cliDestroy(uv_handle_t* handle);
static void cliSend(SCliConn* pConn); static void cliSend(SCliConn* pConn);
static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
// cli util func // cli util func
...@@ -825,7 +827,63 @@ static void cliSendCb(uv_write_t* req, int status) { ...@@ -825,7 +827,63 @@ static void cliSendCb(uv_write_t* req, int status) {
} }
uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
} }
void cliSendBatch(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
SCliBatch* pBatch = pConn->pBatch;
int32_t wLen = pBatch->wLen;
uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
int i = 0;
while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
queue* h = QUEUE_HEAD(&pBatch->wq);
SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q);
QUEUE_REMOVE(&pCliMsg->q);
STransConnCtx* pCtx = pCliMsg->ctx;
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
}
int msgLen = transMsgLenFromCont(pMsg->contLen);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
if (pHead->comp == 0) {
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
if (pHead->comp == 0) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
} else {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
}
wb[i++] = uv_buf_init((char*)pHead, msgLen);
}
pBatch->wLen = 0;
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn;
uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
taosMemoryFree(wb);
}
void cliSend(SCliConn* pConn) { void cliSend(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
...@@ -911,8 +969,8 @@ _RETURN: ...@@ -911,8 +969,8 @@ _RETURN:
} }
static SCliBatch* cliDumpBatch(SCliBatch* pBatch) { static SCliBatch* cliDumpBatch(SCliBatch* pBatch) {
SCliBatch* pNewBatch = taosMemCalloc(1, sizeof(SClicBatch)); SCliBatch* pNewBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
pNewBatch->wq = pBatch->wq; memcpy(pNewBatch->wq, pBatch->wq, sizeof(pBatch->wq));
pNewBatch->batchSize = pBatch->batchSize; pNewBatch->batchSize = pBatch->batchSize;
pNewBatch->batch = pBatch->batch; pNewBatch->batch = pBatch->batch;
...@@ -929,19 +987,19 @@ static SCliBatch* cliDumpBatch(SCliBatch* pBatch) { ...@@ -929,19 +987,19 @@ static SCliBatch* cliDumpBatch(SCliBatch* pBatch) {
return pNewBatch; return pNewBatch;
} }
static void cliDestroyBatch(SCliBatch* pBatch) { static void cliDestroyBatch(SCliBatch* pBatch) {
while (!EMPTY_IS_EMPTY(&pBatch->wq)) { while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
queue* h = QUEUE_HEAD(&pBatch->wq); queue* h = QUEUE_HEAD(&pBatch->wq);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* p = QUEUE_DATA(h, SCliMsg, q);
QUEUE_REMOVE(&pMsg->q); QUEUE_REMOVE(&p->q);
destroyCmsg(p); destroyCmsg(p);
} }
taosMemoryFree(pBatch->ip); taosMemoryFree(pBatch->ip);
taosMemoryFree(pBatch->dst); taosMemoryFree(pBatch->dst);
taosMemoryFree(pBatch); taosMemoryFree(pBatch);
} }
static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) { static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
if (pBatch->wLen == 0 || EMPTY_IS_EMPTY(&pBatch->wq)) { if (pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
return; return;
} }
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
...@@ -961,33 +1019,32 @@ static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) { ...@@ -961,33 +1019,32 @@ static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) {
taosArrayPush(pThrd->timerList, &conn->timer); taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL; conn->timer = NULL;
cliHandleExcept(conn); cliHandleFastFail(conn, -1);
return; return;
} }
struct sockaddr_in addr; struct sockaddr_in addr;
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = ipaddr; addr.sin_addr.s_addr = ipaddr;
addr.sin_port = (uint16_t)htons(port); addr.sin_port = (uint16_t)htons(pBatch->port);
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip); tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pBatch->ip);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
if (fd == -1) { if (fd == -1) {
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
tstrerror(TAOS_SYSTEM_ERROR(errno))); tstrerror(TAOS_SYSTEM_ERROR(errno)));
cliHandleExcept(conn); cliHandleFastFail(conn, -1);
errno = 0;
return; return;
} }
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
if (ret != 0) { if (ret != 0) {
tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleExcept(conn); cliHandleFastFail(conn, -1);
return; return;
} }
ret = transSetConnOption((uv_tcp_t*)conn->stream); ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret != 0) { if (ret != 0) {
tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleExcept(conn); cliHandleFastFail(conn, -1);
return; return;
} }
...@@ -997,8 +1054,7 @@ static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) { ...@@ -997,8 +1054,7 @@ static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) {
conn->timer->data = NULL; conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer); taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL; conn->timer = NULL;
cliHandleFastFail(conn, -1);
cliHandleFastFail(conn, ret);
return; return;
} }
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
...@@ -1006,59 +1062,7 @@ static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) { ...@@ -1006,59 +1062,7 @@ static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) {
} }
conn->pBatch = pNewBatch; conn->pBatch = pNewBatch;
cliSendBatch(conn);
int32_t wLen = pBatch->wLen;
uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
int i = 0;
while (!EMPTY_IS_EMPTY(&pBatch->wq)) {
queue* h = QUEUE_HEAD(&pBatch->wq);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
QUEUE_REMOVE(&pMsg->q);
transQueuePush(conn->cliMsgs, pMsg);
STransConnCtx* pCtx = pCliMsg->ctx;
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
pMsg->contLen = 0;
}
int msgLen = transMsgLenFromCont(pMsg->contLen);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
if (pHead->comp == 0) {
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
if (pHead->comp == 0) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
} else {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
}
wb[i++] = uv_buf_init((char*)pHead, msgLen);
}
pBatch->wLen = 0;
uv_write_t* req = taosMemCalloc(1, sizeof(uv_write_t));
req->data = pConn;
uv_write(req, (uv_stream_t*)conn->stream, wb, wLen, cliSendBatchCb);
taosMemoryFree(wb);
} }
static void cliSendBatchCb(uv_write_t* req, int status) { static void cliSendBatchCb(uv_write_t* req, int status) {
SCliConn* conn = req->data; SCliConn* conn = req->data;
...@@ -1075,29 +1079,34 @@ static void cliSendBatchCb(uv_write_t* req, int status) { ...@@ -1075,29 +1079,34 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
static void cliHandleFastFail(SCliConn* pConn, int status) { static void cliHandleFastFail(SCliConn* pConn, int status) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
tError("conn %p free twice", pConn);
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); if (pConn->pBatch == NULL) {
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
STraceId* trace = &pMsg->msg.info.traceId;
tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), STraceId* trace = &pMsg->msg.info.traceId;
TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status)); tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status));
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip)); (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
int64_t cTimestamp = taosGetTimestampMs(); SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
if (item != NULL) { int64_t cTimestamp = taosGetTimestampMs();
int32_t elapse = cTimestamp - item->timestamp; if (item != NULL) {
if (elapse >= 0 && elapse <= pTransInst->failFastInterval) { int32_t elapse = cTimestamp - item->timestamp;
item->count++; if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
item->count++;
} else {
item->count = 1;
item->timestamp = cTimestamp;
}
} else { } else {
item->count = 1; SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
item->timestamp = cTimestamp; taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
} }
} else {
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
} }
} else {
cliDestroyBatch(pConn->pBatch);
pConn->pBatch = NULL;
} }
cliHandleExcept(pConn); cliHandleExcept(pConn);
} }
...@@ -1117,7 +1126,11 @@ void cliConnCb(uv_connect_t* req, int status) { ...@@ -1117,7 +1126,11 @@ void cliConnCb(uv_connect_t* req, int status) {
} }
if (status != 0) { if (status != 0) {
if (timeout == false) cliHandleFastFail(pConn, status); if (timeout == false) {
cliHandleFastFail(pConn, status);
} else if (timeout == true) {
// already deal by timeout
}
return; return;
} }
...@@ -1135,8 +1148,11 @@ void cliConnCb(uv_connect_t* req, int status) { ...@@ -1135,8 +1148,11 @@ void cliConnCb(uv_connect_t* req, int status) {
transSockInfo2Str(&sockname, pConn->src); transSockInfo2Str(&sockname, pConn->src);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
if (pConn->pBatch != NULL) {
cliSend(pConn); cliSendBatch(pConn);
} else {
cliSend(pConn);
}
} }
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
...@@ -1403,11 +1419,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1403,11 +1419,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
tGTrace("%s conn %p ready", pTransInst->label, conn); tGTrace("%s conn %p ready", pTransInst->label, conn);
} }
static void cliNoBatchDealReq(queue wq, SCliThrd* pThrd) { static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) {
int count = 0; int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) { while (!QUEUE_IS_EMPTY(wq)) {
queue* h = QUEUE_HEAD(&wq); queue* h = QUEUE_HEAD(wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
...@@ -1420,10 +1436,10 @@ static void cliNoBatchDealReq(queue wq, SCliThrd* pThrd) { ...@@ -1420,10 +1436,10 @@ static void cliNoBatchDealReq(queue wq, SCliThrd* pThrd) {
} }
} }
static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pThrd) { static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
int count = 0; int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) { while (!QUEUE_IS_EMPTY(wq)) {
queue* h = QUEUE_HEAD(&wq); queue* h = QUEUE_HEAD(wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
...@@ -1435,8 +1451,8 @@ static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pTh ...@@ -1435,8 +1451,8 @@ static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pTh
char key[TSDB_FQDN_LEN + 64] = {0}; char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port); CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SCliBatch *ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key))); SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
if (*ppBatch == NULL) { if (ppBatch == NULL || *ppBatch == NULL) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
QUEUE_INIT(&pBatch->wq); QUEUE_INIT(&pBatch->wq);
QUEUE_PUSH(&pBatch->wq, h); QUEUE_PUSH(&pBatch->wq, h);
...@@ -1450,20 +1466,21 @@ static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pTh ...@@ -1450,20 +1466,21 @@ static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pTh
taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatch, sizeof(void*)); taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatch, sizeof(void*));
} else { } else {
QUEUE_PUSH(&(*ppBatch)->wq, h); QUEUE_PUSH(&(*ppBatch)->wq, h);
(*pBatch)->wLen += 1; (*ppBatch)->wLen += 1;
(*pBatch)->batchSize += pMsg->msg.contLen; (*ppBatch)->batchSize += pMsg->msg.contLen;
} }
return;
} }
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd); (*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
count++; count++;
} }
void** pIter = taoskHashIterate(pThrd->batchCache, NULL); void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SCliBatch* batch = (SCliBatch*)(*pIter); SCliBatch* batch = (SCliBatch*)(*pIter);
cliSendBatch(batch, pThrd); cliHandleBatchReq(batch, pThrd);
pIter = (void**)taosHashIterate(info, pIter); pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
} }
if (count >= 2) { if (count >= 2) {
...@@ -1483,11 +1500,11 @@ static void cliAsyncCb(uv_async_t* handle) { ...@@ -1483,11 +1500,11 @@ static void cliAsyncCb(uv_async_t* handle) {
QUEUE_MOVE(&item->qmsg, &wq); QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx); taosThreadMutexUnlock(&item->mtx);
int8_t supportBatch = pTransInst->supprtBatch; int8_t supportBatch = pTransInst->supportBatch;
if (supportBatch == 0) { if (supportBatch == 0) {
cliNotBatchDealReq(wq, pThrd); cliNoBatchDealReq(&wq, pThrd);
} else if (supportBatch == 1) { } else if (supportBatch == 1) {
cliBatchDealReq(wq, pThrd); cliBatchDealReq(&wq, pThrd);
} }
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
...@@ -1704,7 +1721,7 @@ static SCliThrd* createThrdObj(void* trans) { ...@@ -1704,7 +1721,7 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK); pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK);
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, hash_no_lock); pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->quit = false; pThrd->quit = false;
return pThrd; return pThrd;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册