提交 cc53df1a 编写于 作者: H Haojun Liao

[td-13039] fix bug in creating streamscanner.

上级 56f38425
......@@ -5372,7 +5372,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
return pOperator;
}
SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5382,16 +5382,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
return NULL;
}
// todo dynamic set the value of 4096
pInfo->pRes = createOutputBuf_rv(pExprInfo, 4096);
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo);
SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t));
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId);
}
// set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList);
int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
......@@ -5401,15 +5391,16 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
return NULL;
}
pInfo->readerHandle = streamReadHandle;
pInfo->readerHandle = streamReadHandle;
pInfo->pRes = pResBlock;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->nextDataFn = doStreamBlockScan;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->nextDataFn = doStreamBlockScan;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
}
......@@ -8097,6 +8088,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId);
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractScanColumnId(SNodeList* pNodeList);
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node
......@@ -8119,30 +8112,19 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableGroupInfo groupInfo = {0};
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
SArray* idList = NULL;
SArray* tableIdList = extractTableIdList(&groupInfo);
if (groupInfo.numOfTables > 0) {
SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0);
ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1);
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols);
// Transfer the Array of STableKeyInfo into uid list.
size_t numOfTables = taosArrayGetSize(pa);
idList = taosArrayInit(numOfTables, sizeof(uint64_t));
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo);
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pa, i);
taosArrayPush(idList, &pkeyInfo->uid);
}
} else {
idList = taosArrayInit(4, sizeof(uint64_t));
}
// SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pScanPhyNode->pScanCols, idList, pTaskInfo);
taosArrayDestroy(idList);
// return pOperator;
taosArrayDestroy(tableIdList);
return pOperator;
}
}
......@@ -8203,7 +8185,24 @@ static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STa
return tsdbQueryTables(readHandle, &cond, pGroupInfo, queryId, taskId);
}
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) {
SArray* extractScanColumnId(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(int16_t));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for(int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*) pNode->pExpr;
taosArrayPush(pList, &pColNode->colId);
}
return pList;
}
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = 0;
if (tableType == TSDB_SUPER_TABLE) {
code = tsdbQuerySTableByTagCond(metaHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId);
......@@ -8214,7 +8213,25 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
return code;
}
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) {
SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) {
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
if (pTableGroupInfo->numOfTables > 0) {
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, 0);
ASSERT(taosArrayGetSize(pTableGroupInfo->pGroupList) == 1);
// Transfer the Array of STableKeyInfo into uid list.
size_t numOfTables = taosArrayGetSize(pa);
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pa, i);
taosArrayPush(tableIdList, &pkeyInfo->uid);
}
}
return tableIdList;
}
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) {
STableGroupInfo groupInfo = {0};
uint64_t uid = pTableScanNode->scan.uid;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册