未验证 提交 38424a65 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #16790 from taosdata/feature/stream

feat(stream): support hash suffix and prefix
...@@ -3445,8 +3445,7 @@ static bool isWstartColumnExist(SFillOperatorInfo* pInfo) { ...@@ -3445,8 +3445,7 @@ static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
} }
for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
SExprInfo* exprInfo = pInfo->pNotFillExprInfo + i; SExprInfo* exprInfo = pInfo->pNotFillExprInfo + i;
if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && exprInfo->base.numOfParams == 1 &&
exprInfo->base.numOfParams == 1 &&
exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) { exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
return true; return true;
} }
...@@ -3462,7 +3461,8 @@ static int32_t createWStartTsAsNotFillExpr(SFillOperatorInfo* pInfo, SFillPhysiN ...@@ -3462,7 +3461,8 @@ static int32_t createWStartTsAsNotFillExpr(SFillOperatorInfo* pInfo, SFillPhysiN
return TSDB_CODE_QRY_SYS_ERROR; return TSDB_CODE_QRY_SYS_ERROR;
} }
SExprInfo* notFillExprs = taosMemoryRealloc(pInfo->pNotFillExprInfo, (pInfo->numOfNotFillExpr + 1) * sizeof(SExprInfo)); SExprInfo* notFillExprs =
taosMemoryRealloc(pInfo->pNotFillExprInfo, (pInfo->numOfNotFillExpr + 1) * sizeof(SExprInfo));
if (notFillExprs == NULL) { if (notFillExprs == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -3473,7 +3473,7 @@ static int32_t createWStartTsAsNotFillExpr(SFillOperatorInfo* pInfo, SFillPhysiN ...@@ -3473,7 +3473,7 @@ static int32_t createWStartTsAsNotFillExpr(SFillOperatorInfo* pInfo, SFillPhysiN
pInfo->pNotFillExprInfo = notFillExprs; pInfo->pNotFillExprInfo = notFillExprs;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3513,8 +3513,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3513,8 +3513,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID); &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
pTaskInfo->id.str, pInterval, type, order); pTaskInfo->id.str, pInterval, type, order);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4461,6 +4461,9 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI ...@@ -4461,6 +4461,9 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
}; };
char* value = NULL; char* value = NULL;
int32_t size = pAggSup->resultRowSize; int32_t size = pAggSup->resultRowSize;
/*if (streamStateGet(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {*/
/*value = taosMemoryCalloc(1, size);*/
/*}*/
if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) { if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
...@@ -4474,6 +4477,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI ...@@ -4474,6 +4477,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult) { int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult) {
streamStateReleaseBuf(pTaskInfo->streamInfo.pState, pKey, pResult); streamStateReleaseBuf(pTaskInfo->streamInfo.pState, pKey, pResult);
/*taosMemoryFree((*(void**)pResult));*/
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -5704,7 +5704,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5704,7 +5704,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.window.ekey);
doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
// new disc buf // new disc buf
// doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); /*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
......
...@@ -248,9 +248,12 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S ...@@ -248,9 +248,12 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId);
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
// TODO: get hash function by hashMethod /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
uint32_t hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
taosMemoryFree(ctbName); taosMemoryFree(ctbName);
bool found = false; bool found = false;
// TODO: optimize search // TODO: optimize search
int32_t j; int32_t j;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册