提交 4bc5466b 编写于 作者: H Haojun Liao

fix(query): add tags in stream scanner.

上级 49ac4b0e
......@@ -375,30 +375,34 @@ typedef struct SessionWindowSupporter {
} SessionWindowSupporter;
typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info
uint64_t numOfExec; // execution times
void* streamBlockReader;// stream block reader handle
SArray* pColMatchInfo; //
SNode* pCondition;
SArray* tsArray;
SUpdateInfo* pUpdateInfo;
int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader;
SReadHandle readHandle;
uint64_t tableUid; // queried super table uid
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info
uint64_t numOfExec; // execution times
void* streamBlockReader;// stream block reader handle
SArray* pColMatchInfo; //
SNode* pCondition;
SArray* tsArray;
SUpdateInfo* pUpdateInfo;
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader;
SReadHandle readHandle;
uint64_t tableUid; // queried super table uid
EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SCatchSupporter childAggSup;
SArray* childIds;
SArray* childIds;
SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
......@@ -737,10 +741,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
SOperatorInfo* pOperatorDumy);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
......
......@@ -4493,8 +4493,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
......@@ -4503,24 +4501,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
if (pDataReader == NULL && terrno != 0) {
qDebug("pDataReader is NULL");
qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
// return NULL;
} else {
qDebug("pDataReader is not NULL");
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
}
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* pCols =
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator =
createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols,
tableIdList, pTaskInfo, pScanPhyNode->node.pConditions, pOperatorDumy);
taosArrayDestroy(tableIdList);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
......@@ -4915,8 +4904,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return pList;
}
int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid,
STableListInfo* pListInfo, SNode* pTagCond) {
int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond) {
int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
......
......@@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return false;
}
static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock);
static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock);
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
......@@ -250,7 +250,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(pTableScanInfo, pBlock);
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, pBlock);
}
int64_t st = taosGetTimestampMs();
......@@ -287,18 +287,18 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
compareTimeWindow);
}
void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock) {
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr == 0) {
if (numOfPseudoExpr == 0) {
return;
}
SMetaReader mr = {0};
metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0);
metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, pBlock->info.uid);
for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) {
SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j];
for (int32_t j = 0; j < numOfPseudoExpr; ++j) {
SExprInfo* pExpr = &pPseudoExpr[j];
int32_t dstSlotId = pExpr->base.resSchema.slotId;
......@@ -309,7 +309,7 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
// this is to handle the tbname
if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId);
} else { // these are tags
const char* p = NULL;
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
......@@ -910,8 +910,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.groupId = groupId;
}
int32_t numOfCols = pInfo->pRes->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
......@@ -941,10 +940,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pTaskInfo->code = terrno;
return NULL;
}
rows = pBlockInfo->rows;
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
blockDataUpdateTsWindow(pInfo->pRes, 0);
break;
}
......@@ -972,10 +977,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
SOperatorInfo* pOperatorDumy) {
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -983,22 +985,28 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
goto _error;
}
STableScanInfo* pSTInfo = (STableScanInfo*)pOperatorDumy->info;
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
int32_t numOfOutput = taosArrayGetSize(pColList);
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
for (int32_t i = 0; i < numOfOutput; ++i) {
SColMatchInfo* id = taosArrayGet(pColList, i);
int16_t colId = id->colId;
SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i);
int16_t colId = id->colId;
taosArrayPush(pColIds, &colId);
}
pInfo->pColMatchInfo = pColList;
// set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds);
int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, pTableIdList);
if (code != 0) {
goto _error;
}
......@@ -1021,27 +1029,32 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pInfo->pUpdateInfo = NULL;
}
pInfo->readHandle = *pHandle;
pInfo->tableUid = uid;
pInfo->streamBlockReader = streamReadHandle;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pOperatorDumy;
pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval",
"/tmp/"); // TODO(liuyao) get row size from phy plan
// create the pseduo columns info
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
}
pOperator->name = "StreamBlockScanOperator";
pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid;
pInfo->streamBlockReader = pHandle->reader;
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pTableScanDummy;
pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
// TODO(liuyao) get row size from phy plan
initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", TD_TMP_DIR_PATH);
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pRes->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册