未验证 提交 c1ed5d58 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #13162 from taosdata/feature/3_liaohj

fix(query): add tags in stream scanner.
...@@ -375,30 +375,34 @@ typedef struct SessionWindowSupporter { ...@@ -375,30 +375,34 @@ typedef struct SessionWindowSupporter {
} SessionWindowSupporter; } SessionWindowSupporter;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock. SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex; int32_t updateResIndex;
int32_t blockType; // current block type int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
void* streamBlockReader;// stream block reader handle void* streamBlockReader;// stream block reader handle
SArray* pColMatchInfo; // SArray* pColMatchInfo; //
SNode* pCondition; SNode* pCondition;
SArray* tsArray; SArray* tsArray;
SUpdateInfo* pUpdateInfo; SUpdateInfo* pUpdateInfo;
int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader; SExprInfo* pPseudoExpr;
SReadHandle readHandle; int32_t numOfPseudoExpr;
uint64_t tableUid; // queried super table uid
int32_t primaryTsIndex; // primary time stamp slot id
void* pDataReader;
SReadHandle readHandle;
uint64_t tableUid; // queried super table uid
EStreamScanMode scanMode; EStreamScanMode scanMode;
SOperatorInfo* pOperatorDumy; SOperatorInfo* pOperatorDumy;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SCatchSupporter childAggSup; SCatchSupporter childAggSup;
SArray* childIds; SArray* childIds;
SessionWindowSupporter sessionSup; SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
...@@ -737,10 +741,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -737,10 +741,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
uint64_t uid, SSDataBlock* pResBlock, SArray* pColList, SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo);
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
SOperatorInfo* pOperatorDumy);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
......
...@@ -4493,8 +4493,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4493,8 +4493,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
tsdbReaderT pDataReader = NULL; tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) { if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
...@@ -4503,24 +4501,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4503,24 +4501,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
qDebug("pDataReader is NULL"); qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
// return NULL; // return NULL;
} else { } else {
qDebug("pDataReader is not NULL"); qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
} }
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableListInfo); SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* pCols =
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator =
createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols,
tableIdList, pTaskInfo, pScanPhyNode->node.pConditions, pOperatorDumy);
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
...@@ -4915,8 +4904,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod ...@@ -4915,8 +4904,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return pList; return pList;
} }
int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond) {
STableListInfo* pListInfo, SNode* pTagCond) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
......
...@@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn ...@@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return false; return false;
} }
static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock); static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock);
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) { uint32_t* status) {
...@@ -250,7 +250,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -250,7 +250,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) { if (pTableScanInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(pTableScanInfo, pBlock); addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, pBlock);
} }
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
...@@ -287,18 +287,18 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction ...@@ -287,18 +287,18 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
compareTimeWindow); compareTimeWindow);
} }
void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock) {
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr == 0) { if (numOfPseudoExpr == 0) {
return; return;
} }
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, pBlock->info.uid); metaGetTableEntryByUid(&mr, pBlock->info.uid);
for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) { for (int32_t j = 0; j < numOfPseudoExpr; ++j) {
SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j]; SExprInfo* pExpr = &pPseudoExpr[j];
int32_t dstSlotId = pExpr->base.resSchema.slotId; int32_t dstSlotId = pExpr->base.resSchema.slotId;
...@@ -309,7 +309,7 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) ...@@ -309,7 +309,7 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
// this is to handle the tbname // this is to handle the tbname
if (fmIsScanPseudoColumnFunc(functionId)) { if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId); setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId);
} else { // these are tags } else { // these are tags
const char* p = NULL; const char* p = NULL;
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
...@@ -910,8 +910,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -910,8 +910,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.groupId = groupId; pInfo->pRes->info.groupId = groupId;
} }
int32_t numOfCols = pInfo->pRes->info.numOfCols; for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) { if (!pColMatchInfo->output) {
continue; continue;
...@@ -941,10 +940,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -941,10 +940,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
rows = pBlockInfo->rows; rows = pBlockInfo->rows;
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL); doFilter(pInfo->pCondition, pInfo->pRes, NULL);
blockDataUpdateTsWindow(pInfo->pRes, 0); blockDataUpdateTsWindow(pInfo->pRes, 0);
break; break;
} }
...@@ -972,10 +977,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -972,10 +977,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
} }
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle, SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo) {
uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
SOperatorInfo* pOperatorDumy) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -983,22 +985,28 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR ...@@ -983,22 +985,28 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
goto _error; goto _error;
} }
STableScanInfo* pSTInfo = (STableScanInfo*)pOperatorDumy->info; SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
int32_t numOfOutput = taosArrayGetSize(pColList); SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColMatchInfo* id = taosArrayGet(pColList, i); SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i);
int16_t colId = id->colId;
int16_t colId = id->colId;
taosArrayPush(pColIds, &colId); taosArrayPush(pColIds, &colId);
} }
pInfo->pColMatchInfo = pColList;
// set the extract column id to streamHandle // set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds); tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList); int32_t code = tqReadHandleSetTbUidList(pHandle->reader, pTableIdList);
if (code != 0) { if (code != 0) {
goto _error; goto _error;
} }
...@@ -1021,27 +1029,32 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR ...@@ -1021,27 +1029,32 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pInfo->pUpdateInfo = NULL; pInfo->pUpdateInfo = NULL;
} }
pInfo->readHandle = *pHandle; // create the pseduo columns info
pInfo->tableUid = uid; if (pTableScanNode->scan.pScanPseudoCols != NULL) {
pInfo->streamBlockReader = streamReadHandle; pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
pInfo->pRes = pResBlock; }
pInfo->pCondition = pCondition;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pOperatorDumy;
pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval",
"/tmp/"); // TODO(liuyao) get row size from phy plan
pOperator->name = "StreamBlockScanOperator"; pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid;
pInfo->streamBlockReader = pHandle->reader;
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pTableScanDummy;
pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
// TODO(liuyao) get row size from phy plan
initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", TD_TMP_DIR_PATH);
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->numOfExprs = pInfo->pRes->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册