未验证 提交 b44493e4 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15405 from taosdata/fix/td-17801-fix

fix(stream): continuously sink
...@@ -68,10 +68,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -68,10 +68,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
// todo remove it soon // todo remove it soon
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pInfo->mergeDataBlocks = false; pInfo->mergeDataBlocks = false;
} }
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
...@@ -181,6 +183,16 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS ...@@ -181,6 +183,16 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} }
void printDataBlock1(SSDataBlock* pBlock, const char* flag) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("===stream===printDataBlock: Block is Null or Empty");
return;
}
char* pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
taosMemoryFreeClear(pBuf);
}
SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SProjectOperatorInfo* pProjectInfo = pOperator->info; SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo; SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
...@@ -229,6 +241,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -229,6 +241,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// for stream interval // for stream interval
if (pBlock->info.type == STREAM_RETRIEVE) { if (pBlock->info.type == STREAM_RETRIEVE) {
// printDataBlock1(pBlock, "project1");
return pBlock; return pBlock;
} }
...@@ -302,7 +315,8 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -302,7 +315,8 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if (pOperator->cost.openCost == 0) { if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
// printDataBlock1(p, "project");
return (p->info.rows > 0) ? p : NULL; return (p->info.rows > 0) ? p : NULL;
} }
...@@ -587,4 +601,4 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { ...@@ -587,4 +601,4 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
} }
return (pRes->info.rows > 0) ? pRes : NULL; return (pRes->info.rows > 0) ? pRes : NULL;
} }
\ No newline at end of file
...@@ -26,7 +26,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -26,7 +26,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan); ASSERT(pTask->isDataScan);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qDebug("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false); qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data; SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
...@@ -72,6 +72,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -72,6 +72,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
continue; continue;
} }
qDebug("task %d(child %d) executed and get block");
SSDataBlock block = {0}; SSDataBlock block = {0};
assignOneDataBlock(&block, output); assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
...@@ -188,7 +190,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -188,7 +190,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (pTask->execType == TASK_EXEC__NONE) { if (pTask->execType == TASK_EXEC__NONE) {
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, data); streamTaskOutput(pTask, data);
return pRes; continue;
} }
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册