未验证 提交 0aaf581f 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15282 from taosdata/feature/stream

refactor(stream): do not merge output
...@@ -215,10 +215,10 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) { ...@@ -215,10 +215,10 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) {
if (pIter == NULL) break; if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t sz = taosArrayGetSize(pExec->colIdList); int32_t sz = pExec->execHandle.pSchemaWrapper->nCols;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t forbidColId = *(int32_t*)taosArrayGet(pExec->colIdList, i); SSchema* pSchema = &pExec->execHandle.pSchemaWrapper->pSchema[i];
if (forbidColId == colId) { if (pSchema->colId == colId) {
taosHashCancelIterate(pTq->handles, pIter); taosHashCancelIterate(pTq->handles, pIter);
return -1; return -1;
} }
...@@ -523,7 +523,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -523,7 +523,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.version = ver, .version = ver,
}; };
pHandle->execHandle.execCol.task = pHandle->execHandle.execCol.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper); qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.execCol.task); ASSERT(pHandle->execHandle.execCol.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner);
......
...@@ -108,6 +108,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -108,6 +108,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
} }
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %ld", TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
tqOffsetResetToLog(pOffset, pHandle->snapshotVer); tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
qStreamPrepareScan(task, pOffset); qStreamPrepareScan(task, pOffset);
continue; continue;
......
...@@ -120,7 +120,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -120,7 +120,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return code; return code;
} }
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper) { qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols,
SSchemaWrapper** pSchemaWrapper) {
if (msg == NULL) { if (msg == NULL) {
// TODO create raw scan // TODO create raw scan
return NULL; return NULL;
...@@ -146,7 +147,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -146,7 +147,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
*numOfCols = 0; *numOfCols = 0;
SNode* pNode; SNode* pNode;
FOREACH(pNode, pDescNode->pSlots) { FOREACH(pNode, pDescNode->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) { if (pSlotDesc->output) {
...@@ -249,9 +250,11 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -249,9 +250,11 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
// add to qTaskInfo // add to qTaskInfo
// todo refactor STableList // todo refactor STableList
for(int32_t i = 0; i < taosArrayGetSize(qa); ++i) { for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
uint64_t* uid = taosArrayGet(qa, i); uint64_t* uid = taosArrayGet(qa, i);
qDebug("table %ld added to task info", *uid);
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
} }
......
...@@ -341,11 +341,20 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { ...@@ -341,11 +341,20 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
return -1; return -1;
} }
} }
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
bool found = false;
#ifndef NDEBUG
qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid,
pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif
bool found = false;
for (int32_t i = 0; i < tableSz; i++) { for (int32_t i = 0; i < tableSz; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
if (pTableInfo->uid == uid) { if (pTableInfo->uid == uid) {
......
...@@ -376,9 +376,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow ...@@ -376,9 +376,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
} }
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); }
colDataDestroy(pColData);
}
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
...@@ -524,8 +522,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -524,8 +522,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
// NOTE: the last parameter is the primary timestamp column // NOTE: the last parameter is the primary timestamp column
// todo: refactor this // todo: refactor this
if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data. pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP); // ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
} }
ASSERT(pInput->pData[j] != NULL); ASSERT(pInput->pData[j] != NULL);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
...@@ -633,7 +631,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -633,7 +631,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
ASSERT(pResult->info.capacity > 0); ASSERT(pResult->info.capacity > 0);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
colDataDestroy(&idata); colDataDestroy(&idata);
numOfRows = dest.numOfRows; numOfRows = dest.numOfRows;
taosArrayDestroy(pBlockList); taosArrayDestroy(pBlockList);
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
...@@ -835,7 +833,7 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q ...@@ -835,7 +833,7 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
STimeWindow win = {0}; STimeWindow win = {0};
win.skey = taosTimeTruncate(key, pInterval, precision); win.skey = taosTimeTruncate(key, pInterval, precision);
/* /*
...@@ -2378,7 +2376,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2378,7 +2376,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
while(1) { while (1) {
SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator); SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
if (pBlock == NULL) { if (pBlock == NULL) {
return NULL; return NULL;
...@@ -3431,13 +3429,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -3431,13 +3429,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) { if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) {
pInfo->curGroupId = pBlock->info.groupId; // the first data block pInfo->curGroupId = pBlock->info.groupId; // the first data block
pInfo->totalInputRows += pBlock->info.rows; pInfo->totalInputRows += pBlock->info.rows;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
} else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block } else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block
pInfo->existNewGroupBlock = pBlock; pInfo->existNewGroupBlock = pBlock;
// Fill the previous group data block, before handle the data block of new group. // Fill the previous group data block, before handle the data block of new group.
...@@ -3511,7 +3509,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { ...@@ -3511,7 +3509,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExpr[i]; SExprInfo* pExprInfo = &pExpr[i];
for(int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) { if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol); taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
} }
...@@ -3604,7 +3602,7 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf ...@@ -3604,7 +3602,7 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows) { void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
ASSERT(numOfRows != 0); ASSERT(numOfRows != 0);
pResultInfo->capacity = numOfRows; pResultInfo->capacity = numOfRows;
pResultInfo->threshold = numOfRows * 0.75; pResultInfo->threshold = numOfRows * 0.75;
...@@ -3724,7 +3722,6 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -3724,7 +3722,6 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
static void freeItem(void* pItem) { static void freeItem(void* pItem) {
void** p = pItem; void** p = pItem;
if (*p != NULL) { if (*p != NULL) {
...@@ -4051,8 +4048,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t ...@@ -4051,8 +4048,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, pInfo->pFillInfo =
fillType, pColInfo, pInfo->primaryTsCol, id); taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, id);
pInfo->win = win; pInfo->win = win;
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
...@@ -4066,7 +4063,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t ...@@ -4066,7 +4063,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
} }
} }
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -4149,8 +4147,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, ...@@ -4149,8 +4147,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
metaReaderInit(&mr, pHandle->meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid); int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to get the table meta, uid:0x%"PRIx64", suid:0x%"PRIx64 ", %s", pScanNode->uid, pScanNode->suid, qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
metaReaderClear(&mr); metaReaderClear(&mr);
return terrno; return terrno;
...@@ -4180,11 +4178,11 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, ...@@ -4180,11 +4178,11 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
} }
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols); int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
pqSw->pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema)); pqSw->pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
for(int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i); STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
...@@ -4387,21 +4385,23 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC ...@@ -4387,21 +4385,23 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
pCond->suid = uid; pCond->suid = uid;
pCond->type = BLOCK_LOAD_OFFSET_ORDER; pCond->type = BLOCK_LOAD_OFFSET_ORDER;
pCond->startVersion = -1; pCond->startVersion = -1;
pCond->endVersion = -1; pCond->endVersion = -1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) { STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* pUser) {
int32_t type = nodeType(pPhyNode); int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, int32_t code =
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -4420,8 +4420,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4420,8 +4420,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, int32_t code =
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -4433,8 +4434,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4433,8 +4434,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
SOperatorInfo* pOperator = SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
...@@ -4445,13 +4445,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4445,13 +4445,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
if (pHandle->vnode) { if (pHandle->vnode) {
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, int32_t code =
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
#ifndef NDEBUG
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
qDebug("creating stream task: add table %ld", pKeyInfo->uid);
}
} }
#endif
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
...@@ -4486,7 +4495,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4486,7 +4495,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
SQueryTableDataCond cond = {0}; SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
...@@ -4499,7 +4508,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4499,7 +4508,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -4961,7 +4971,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4961,7 +4971,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->sql = sql; (*pTaskInfo)->sql = sql;
(*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code; code = (*pTaskInfo)->code;
......
...@@ -159,6 +159,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -159,6 +159,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (data == NULL) { if (data == NULL) {
data = qItem; data = qItem;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->execType == TASK_EXEC__NONE) break;
/*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/ /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
/*}*/ /*}*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册