提交 1d52dd9d 编写于 作者: H Haojun Liao

[td-13039] fix bug.

上级 f6bcae19
......@@ -415,6 +415,7 @@ typedef struct STableScanInfo {
int32_t* rowCellInfoOffset;
SExprInfo* pExpr;
SSDataBlock block;
SArray* pColMatchInfo;
int32_t numOfOutput;
int64_t elapsedTime;
int32_t prevGroupId; // previous table group id
......@@ -646,8 +647,8 @@ typedef struct SDistinctOperatorInfo {
} SDistinctOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
......
......@@ -66,6 +66,11 @@ typedef enum SResultTsInterpType {
RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;
typedef struct SColMatchInfo {
int32_t colId;
int32_t targetSlotId;
} SColMatchInfo;
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = taosRand();
......@@ -2944,12 +2949,21 @@ int32_t loadDataBlock(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo,
*status = BLK_DATA_ALL_NEEDED;
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
if (pBlock->pDataBlock == NULL) {
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
if (pCols == NULL) {
return terrno;
} else {
return TSDB_CODE_SUCCESS;
}
int32_t numOfCols = pBlock->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pCols, i);
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
ASSERT(pColMatchInfo->colId == p->info.colId);
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
}
return TSDB_CODE_SUCCESS;
}
int32_t loadDataBlockOnDemand(SExecTaskInfo *pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
......@@ -5374,7 +5388,8 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
return pResBlock;
}
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo,
SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
......@@ -5387,12 +5402,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
return NULL;
}
pInfo->block.pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData idata = {0};
taosArrayPush(pInfo->block.pDataBlock, &idata);
}
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->order = order;
pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false;
......@@ -8497,6 +8519,7 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractScanColumnId(SNodeList* pNodeList);
static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* extractColMatchInfo(SNodeList* pNodeList);
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
......@@ -8505,7 +8528,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
......@@ -8683,6 +8708,28 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return pList;
}
SArray* extractColMatchInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for(int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*) pNode->pExpr;
SColMatchInfo c = {0};
c.colId = pColNode->colId;
c.targetSlotId = pNode->slotId;
taosArrayPush(pList, &c);
}
return pList;
}
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = 0;
if (tableType == TSDB_SUPER_TABLE) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册