diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 815d6a71ef4666e71842884409818a1719a07c5d..17954178b137cf3074236c3b37be0d2da32efa82 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3445,8 +3445,7 @@ static bool isWstartColumnExist(SFillOperatorInfo* pInfo) { } for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { SExprInfo* exprInfo = pInfo->pNotFillExprInfo + i; - if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && - exprInfo->base.numOfParams == 1 && + if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && exprInfo->base.numOfParams == 1 && exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) { return true; } @@ -3462,7 +3461,8 @@ static int32_t createWStartTsAsNotFillExpr(SFillOperatorInfo* pInfo, SFillPhysiN return TSDB_CODE_QRY_SYS_ERROR; } - SExprInfo* notFillExprs = taosMemoryRealloc(pInfo->pNotFillExprInfo, (pInfo->numOfNotFillExpr + 1) * sizeof(SExprInfo)); + SExprInfo* notFillExprs = + taosMemoryRealloc(pInfo->pNotFillExprInfo, (pInfo->numOfNotFillExpr + 1) * sizeof(SExprInfo)); if (notFillExprs == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -3473,7 +3473,7 @@ static int32_t createWStartTsAsNotFillExpr(SFillOperatorInfo* pInfo, SFillPhysiN pInfo->pNotFillExprInfo = notFillExprs; return TSDB_CODE_SUCCESS; } - + return TSDB_CODE_SUCCESS; } @@ -3513,8 +3513,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, - (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, - pTaskInfo->id.str, pInterval, type, order); + (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, + pTaskInfo->id.str, pInterval, type, order); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4461,6 +4461,9 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI }; char* value = NULL; int32_t size = pAggSup->resultRowSize; + /*if (streamStateGet(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {*/ + /*value = taosMemoryCalloc(1, size);*/ + /*}*/ if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -4474,6 +4477,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult) { streamStateReleaseBuf(pTaskInfo->streamInfo.pState, pKey, pResult); + /*taosMemoryFree((*(void**)pResult));*/ return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b3a85807ae4274654badace53dcb8c3f998a077e..a19b6f817936b8a360298b5b7de627b1de24f5f8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5704,7 +5704,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); // new disc buf - // doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); + /*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/ } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a2a45938e4f0091b0588bcb3ca4ac653045d27f7..7cdb7c0db95cd582fad03174d0fa6927cb1fd668 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -248,9 +248,12 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - // TODO: get hash function by hashMethod - uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); + /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ + SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo; + uint32_t hashValue = + taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); taosMemoryFree(ctbName); + bool found = false; // TODO: optimize search int32_t j;