提交 edaec52b 编写于 作者: L Liu Jicong

refactor: add debug flag

上级 414673db
...@@ -42,23 +42,27 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -42,23 +42,27 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
// TODO: if a block was set but not consumed, // TODO: if a block was set but not consumed,
// prevent setting a different type of block // prevent setting a different type of block
pInfo->blockType = type;
pInfo->validBlockIndex = 0; pInfo->validBlockIndex = 0;
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
ASSERT(numOfBlocks > 1);
for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
taosArrayPush(pInfo->pBlockLists, &pReq);
}
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
/*if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {*/ /*if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {*/
/*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/ /*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/
/*return TSDB_CODE_QRY_APP_ERROR;*/ /*return TSDB_CODE_QRY_APP_ERROR;*/
/*}*/ /*}*/
if (numOfBlocks == 1) { ASSERT(numOfBlocks == 1);
taosArrayPush(pInfo->pBlockLists, &input); /*if (numOfBlocks == 1) {*/
} else { taosArrayPush(pInfo->pBlockLists, &input);
for (int32_t i = 0; i < numOfBlocks; i++) { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); /*} else {*/
taosArrayPush(pInfo->pBlockLists, &pReq); /*}*/
}
}
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
...@@ -71,6 +75,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -71,6 +75,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p); taosArrayPush(pInfo->pBlockLists, &p);
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
...@@ -36,7 +36,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -36,7 +36,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data; SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
SArray* blocks = pMerged->reqs; SArray* blocks = pMerged->reqs;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_SUBMIT, false); qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT, false);
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -147,7 +148,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { ...@@ -147,7 +148,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
while (1) { while (1) {
int32_t cnt = 0; int32_t cnt = 1;
void* data = NULL; void* data = NULL;
while (1) { while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册