diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 006d9e749cfd273f4f112e84e435d495f29125b1..2fb934aaad735240e1a249447b5d041853819d82 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); ASSERT(smaObj.uid != 0); char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; - snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name); + snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name); memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN); smaObj.stbUid = pStb->uid; @@ -530,7 +530,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.sourceDbUid = pDb->uid; streamObj.targetDbUid = pDb->uid; streamObj.version = 1; - streamObj.sql = pCreate->sql; + streamObj.sql = strdup(pCreate->sql); streamObj.smaId = smaObj.uid; streamObj.watermark = pCreate->watermark; streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE; @@ -585,6 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea return -1; } if (pAst != NULL) nodesDestroyNode(pAst); + nodesDestroyNode((SNode *)pPlan); int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq); @@ -609,6 +610,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea code = 0; _OVER: + tFreeStreamObj(&streamObj); mndDestroySmaObj(&smaObj); mndTransDrop(pTrans); return code; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 0567ec4e1425a1815af45448505a47cfada71259..09eed7fb32e8831e6b6c863b44edd3e9e28110a3 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -509,6 +509,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) { pVgroup->replica = 1; if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1; + taosArrayDestroy(pArray); mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId); return 0; @@ -1862,4 +1863,4 @@ _OVER: #endif } -bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } \ No newline at end of file +bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index b8803b20fc21e68b2af71c1d80e2475ef06aae63..e6a331f20e1943a3e40b672a0ef214322db09c5c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -341,7 +341,7 @@ FAIL: return -1; } -void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } +void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; } int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { if (pReader->tbIdHash) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6134e9eafac9e1ad6d345e11d49b88447caf9a0c..9366f014c1fdd68747bb5f42d9e1131e9e1ae9ae 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3223,8 +3223,8 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa } static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); -static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, - SExecTaskInfo* pTaskInfo) { +static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, + SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; SSDataBlock* pResBlock = pInfo->pFinalRes; @@ -3248,8 +3248,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOp pInfo->existNewGroupBlock = NULL; } -static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, - SExecTaskInfo* pTaskInfo) { +static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, + SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows); @@ -3265,8 +3265,8 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) { SFillOperatorInfo* pInfo = pOperator->info; - SExprSupp* pSup = &pOperator->exprSupp; - SSDataBlock* pResBlock = pInfo->pFinalRes; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pResBlock = pInfo->pFinalRes; setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL); @@ -3276,13 +3276,13 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId); colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info); - for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { + for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr]; ASSERT(pCol->notFillCol); SExprInfo* pExpr = pCol->pExpr; - int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; - int32_t dstSlotId = pExpr->base.resSchema.slotId; + int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; + int32_t dstSlotId = pExpr->base.resSchema.slotId; SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId); SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId); @@ -3670,7 +3670,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { taosRemoveRef(exchangeObjRefPool, pExInfo->self); } -void freeSourceDataInfo(void *p) { +void freeSourceDataInfo(void* p) { SSourceDataInfo* pInfo = (SSourceDataInfo*)p; taosMemoryFreeClear(pInfo->pRsp); } @@ -3700,8 +3700,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); - pInfo->pFillInfo = - taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id); + pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, + pInfo->primaryTsCol, order, id); pInfo->win = win; pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); @@ -3727,10 +3727,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr); - SInterval* pInterval = + SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType - ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval - : &((SIntervalAggOperatorInfo*)downstream->info)->interval; + ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval + : &((SIntervalAggOperatorInfo*)downstream->info)->interval; int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); @@ -3747,9 +3747,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); - int32_t code = - initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues, - pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order); + int32_t code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, + (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, + pTaskInfo->id.str, pInterval, type, order); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index aca0078592ec1f85c89fbc28788733a5721f2118..7b13aa8ad8fe5f33a8427d5a4d1e49e2ef1690ff 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -139,7 +139,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } assert(w.ekey > pBlockInfo->window.ekey); - if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) { + if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) { return true; } } @@ -147,7 +147,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey); assert(w.skey <= pBlockInfo->window.ekey); - if (w.skey > pBlockInfo->window.skey) { + if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) { return true; } @@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } assert(w.skey < pBlockInfo->window.skey); - if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) { + if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) { return true; } } @@ -1649,6 +1649,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } taosArrayDestroy(tableIdList); memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond)); + } else { + taosArrayDestroy(pColIds); } // create the pseduo columns info @@ -2038,10 +2040,34 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { metaReaderClear(&smr); if (numOfRows >= pOperator->resultInfo.capacity) { - break; + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + + if (pInfo->pRes->info.rows > 0) { + break; + } } } + if (numOfRows > 0) { + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + } + + blockDataDestroy(p); + // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { metaCloseTbCursor(pInfo->pCur); @@ -2049,14 +2075,6 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - p->info.rows = numOfRows; - pInfo->pRes->info.rows = numOfRows; - - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); - doFilterResult(pInfo); - - blockDataDestroy(p); - pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } @@ -2213,10 +2231,34 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { colDataAppend(pColInfoData, numOfRows, n, false); if (++numOfRows >= pOperator->resultInfo.capacity) { - break; + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + + if (pInfo->pRes->info.rows > 0) { + break; + } } } + if (numOfRows > 0) { + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + } + + blockDataDestroy(p); + // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { metaCloseTbCursor(pInfo->pCur); @@ -2224,14 +2266,6 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - p->info.rows = numOfRows; - pInfo->pRes->info.rows = numOfRows; - - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); - doFilterResult(pInfo); - - blockDataDestroy(p); - pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 757ab09d1ab843e733a0d631bd4fafa3fbc2e095..02be01cdb831d2bb6c91a6ffbe8b80fae9eb55ee 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1706,14 +1706,19 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pInfo->pPullDataRes); taosArrayDestroy(pInfo->pRecycledPages); blockDataDestroy(pInfo->pUpdateRes); + taosArrayDestroy(pInfo->pDelWins); + blockDataDestroy(pInfo->pDelRes); if (pInfo->pChildren) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput); + taosMemoryFree(pChildOp->pDownstream); + cleanupExprSupp(&pChildOp->exprSupp); taosMemoryFreeClear(pChildOp); } + taosArrayDestroy(pInfo->pChildren); } nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); @@ -2644,7 +2649,6 @@ void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } - SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d588d90543d90e19699c5c901672b6ab4fdd2c88..4009a47c65af469bc8a6f4fe5443411306e4ec2b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -152,11 +152,12 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } void tFreeSStreamTask(SStreamTask* pTask) { + qDebug("free stream task %d", pTask->taskId); if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); - taosArrayDestroy(pTask->childEpInfo); + taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree); if (pTask->outputType == TASK_OUTPUT__TABLE) { tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index bc1c6386f6e0d9a61b21e5ca0e3aa198c1da043e..c6f3066be72c289b3e2bd57d1b4b995c15fc7dac 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -104,6 +104,12 @@ typedef void* queue[2]; #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 + +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) + typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; typedef SRpcCtxVal STransCtxVal; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8fdfcd5309c927e4e49386340fbd44496a4a7688..763483cbf6245448afa6d70e7952862ce88cbfee 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -318,10 +318,17 @@ void cliHandleResp(SCliConn* conn) { } STransMsgHead* pHead = NULL; - transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) { + tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + return; + } pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + if (cliRecvReleaseReq(conn, pHead)) { + return; + } + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -333,10 +340,6 @@ void cliHandleResp(SCliConn* conn) { SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; - if (cliRecvReleaseReq(conn, pHead)) { - return; - } - if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); @@ -598,7 +601,12 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { pBuf->len += nread; while (transReadComplete(pBuf)) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); - cliHandleResp(conn); + if (pBuf->invalid) { + cliHandleExcept(conn); + break; + } else { + cliHandleResp(conn); + } } return; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 3ba8186e9dbebb52260eeb24766de603f957a9e3..a4d679b281512ff13757eab7c9c42a11e0edb36b 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -188,7 +188,6 @@ bool transReadComplete(SConnBuffer* connBuf) { p->left = 0; } } - return (p->left == 0 || p->invalid) ? true : false; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index c980b70abddd570e63d424842205e5147e2ae31f..db05aefe7b1930b286e3a2cc8aa92a251077939d 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -198,15 +198,16 @@ static bool uvHandleReq(SSvrConn* pConn) { pHead->msgLen = htonl(pHead->msgLen); memcpy(pConn->user, pHead->user, strlen(pHead->user)); + if (uvRecvReleaseReq(pConn, pHead)) { + return true; + } + // TODO(dengyihao): time-consuming task throwed into BG Thread // uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t)); // wreq->data = pConn; // uv_read_stop((uv_stream_t*)pConn->pTcp); // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - if (uvRecvReleaseReq(pConn, pHead)) { - return true; - } STransMsg transMsg; memset(&transMsg, 0, sizeof(transMsg)); @@ -274,14 +275,16 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (pBuf->len <= TRANS_PACKET_LIMIT) { while (transReadComplete(pBuf)) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - if (uvHandleReq(conn) == false) { + if (pBuf->invalid) { + tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn); destroyConn(conn, true); return; + } else { + if (false == uvHandleReq(conn)) break; } } return; } else { - tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn); destroyConn(conn, true); return; } @@ -872,6 +875,7 @@ static int reallocConnRef(SSvrConn* conn) { } static void uvDestroyConn(uv_handle_t* handle) { SSvrConn* conn = handle->data; + if (conn == NULL) { return; } @@ -887,9 +891,8 @@ static void uvDestroyConn(uv_handle_t* handle) { SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); destroySmsg(msg); } - - transReqQueueClear(&conn->wreqQueue); transQueueDestroy(&conn->srvMsgs); + transReqQueueClear(&conn->wreqQueue); QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); diff --git a/tests/script/tsim/stream/basic0.sim b/tests/script/tsim/stream/basic0.sim index 9a5fb8012f082ca9a4d10f0ef8f6c2c710d0ac54..6d05f69dcfc542de9b4ebb195479263db4d76881 100644 --- a/tests/script/tsim/stream/basic0.sim +++ b/tests/script/tsim/stream/basic0.sim @@ -1,7 +1,7 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/exec.sh -n dnode1 -s start -sleep 50 +system sh/cfg.sh -n dnode1 -c debugflag -v 131 +system sh/exec.sh -n dnode1 -s start -v sql connect print =============== create database @@ -137,4 +137,17 @@ if $data13 != 789 then return -1 endi +_OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT +print =============== check +$null= + +system_content sh/checkValgrind.sh -n dnode1 +print cmd return result ----> [ $system_content ] +if $system_content > 0 then + return -1 +endi + +if $system_content == $null then + return -1 +endi