From db3cbbf22d6179a2cb9d148b0008e7cab848abbd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sun, 8 May 2022 00:21:17 +0800 Subject: [PATCH] fix(query): column match --- source/libs/executor/src/scanoperator.c | 193 ++++++++++++------------ source/libs/stream/src/tstream.c | 2 +- 2 files changed, 101 insertions(+), 94 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 72f285231b..0296a312d0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -38,7 +38,8 @@ #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); -static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); +static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, + const char* dbName); static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { @@ -159,7 +160,8 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } -static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { +static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, + uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableScanInfo* pInfo = pOperator->info; @@ -189,7 +191,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { pCost->loadBlockStatis += 1; - bool allColumnsHaveAgg = true; + bool allColumnsHaveAgg = true; SColumnDataAgg** pColAgg = NULL; tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); @@ -261,7 +263,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; - SSDataBlock* pBlock = pTableScanInfo->pResBlock; + SSDataBlock* pBlock = pTableScanInfo->pResBlock; while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { if (isTaskKilled(pOperator->pTaskInfo)) { @@ -344,7 +346,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { pTableScanInfo->scanFlag = REPEAT_SCAN; qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64 - "-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); + "-%" PRId64, + GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); // do prepare for the next round table scan operation tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -373,22 +376,22 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon pInfo->cond = *pCond; pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]}; - pInfo->interval = *pInterval; - pInfo->sampleRatio = sampleRatio; + pInfo->interval = *pInterval; + pInfo->sampleRatio = sampleRatio; pInfo->dataBlockLoadFlag = dataLoadFlag; - pInfo->pResBlock = pResBlock; - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pDataReader; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; - - pOperator->name = "TableScanOperator"; // for dubug purpose - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = numOfOutput; - pOperator->pTaskInfo = pTaskInfo; + pInfo->pResBlock = pResBlock; + pInfo->pFilterNode = pCondition; + pInfo->dataReader = pDataReader; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; + + pOperator->name = "TableScanOperator"; // for dubug purpose + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL); @@ -404,17 +407,17 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pInfo->dataReader = pReadHandle; -// pInfo->prevGroupId = -1; + pInfo->dataReader = pReadHandle; + // pInfo->prevGroupId = -1; - pOperator->name = "TableSeqScanOperator"; + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); return pOperator; @@ -514,18 +517,18 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) { SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex); - TSKEY* ts = (TSKEY*)pColDataInfo->pData; + TSKEY* ts = (TSKEY*)pColDataInfo->pData; for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) { if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) { - taosArrayPush(pInfo->tsArray, ts+i); + taosArrayPush(pInfo->tsArray, ts + i); } } if (taosArrayGetSize(pInfo->tsArray) > 0) { - //TODO(liuyao) get from tsdb - // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); - // p->info.type = STREAM_INVERT; - // taosArrayClear(pInfo->tsArray); - // return p; + // TODO(liuyao) get from tsdb + // SSDataBlock* p = createOneDataBlock(pInfo->pRes, true); + // p->info.type = STREAM_INVERT; + // taosArrayClear(pInfo->tsArray); + // return p; return NULL; } return NULL; @@ -535,7 +538,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; - int32_t rows = 0; + int32_t rows = 0; pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { @@ -571,7 +574,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { int32_t numOfRows = 0; int16_t outputCol = 0; - int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol); + int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &uid, &numOfRows, &outputCol); if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { pTaskInfo->code = code; @@ -652,8 +655,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); for (int32_t i = 0; i < numOfOutput; ++i) { - int16_t* id = taosArrayGet(pColList, i); - taosArrayPush(pColIds, id); + SColMatchInfo* id = taosArrayGet(pColList, i); + int16_t colId = id->colId; + taosArrayPush(pColIds, &colId); } pInfo->pColMatchInfo = pColList; @@ -678,8 +682,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* return NULL; } - pInfo->primaryTsIndex = 0; //TODO(liuyao) get it from physical plan - pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); //TODO(liuyao) get it from physical plan + pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan + pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); // TODO(liuyao) get it from physical plan if (pInfo->pUpdateInfo == NULL) { taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -687,25 +691,26 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* } pInfo->readerHandle = streamReadHandle; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; - - pOperator->name = "StreamBlockScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pResBlock->info.numOfCols; - pOperator->fpSet._openFn = operatorDummyOpenFn; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; + + pOperator->name = "StreamBlockScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pResBlock->info.numOfCols; + pOperator->fpSet._openFn = operatorDummyOpenFn; pOperator->fpSet.getNextFn = doStreamBlockScan; - pOperator->fpSet.closeFn = operatorDummyCloseFn; + pOperator->fpSet.closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); return pOperator; - _error: +_error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; @@ -774,7 +779,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) { if (NULL == pCondition) { return; } - nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*) dbName); + nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName); } static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { @@ -809,7 +814,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { code = filterSetDataFromSlotId(filter, ¶m1); int8_t* rowRes = NULL; - bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); + bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); filterFreeInfo(filter); SSDataBlock* px = createOneDataBlock(pInfo->pRes, false); @@ -853,13 +858,13 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { static SSDataBlock* buildSysTableMetaBlock() { SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - size_t size = 0; - const SSysTableMeta *pMeta = NULL; + size_t size = 0; + const SSysTableMeta* pMeta = NULL; getInfosDbMeta(&pMeta, &size); int32_t index = 0; - for(int32_t i = 0; i < size; ++i) { - if(strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) { + for (int32_t i = 0; i < size; ++i) { + if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) { index = i; break; } @@ -867,7 +872,7 @@ static SSDataBlock* buildSysTableMetaBlock() { pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData)); - for(int32_t i = 0; i < pMeta[index].colNum; ++i) { + for (int32_t i = 0; i < pMeta[index].colNum; ++i) { SColumnInfoData colInfoData = {0}; colInfoData.info.colId = i + 1; colInfoData.info.type = pMeta[index].schema[i].type; @@ -1091,7 +1096,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { SSDataBlock* p = buildSysTableMetaBlock(); blockDataEnsureCapacity(p, capacity); - size_t size = 0; + size_t size = 0; const SSysTableMeta* pSysDbTableMeta = NULL; getInfosDbMeta(&pSysDbTableMeta, &size); @@ -1100,18 +1105,19 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { getPerfDbMeta(&pSysDbTableMeta, &size); p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); -// blockDataDestroy(p); todo handle memory leak + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); + // blockDataDestroy(p); todo handle memory leak pInfo->pRes->info.rows = p->info.rows; return p->info.rows; } -int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName) { - char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; +int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, + const char* dbName) { + char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; int32_t numOfRows = p->info.rows; - for(int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < size; ++i) { const SSysTableMeta* pm = &pSysDbTableMeta[i]; SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0); @@ -1132,7 +1138,7 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT pColInfoData = taosArrayGet(p->pDataBlock, 3); colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false); - for(int32_t j = 4; j <= 8; ++j) { + for (int32_t j = 4; j <= 8; ++j) { pColInfoData = taosArrayGet(p->pDataBlock, j); colDataAppendNULL(pColInfoData, numOfRows); } @@ -1160,18 +1166,18 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe return NULL; } - pInfo->accountId = accountId; + pInfo->accountId = accountId; pInfo->showRewrite = showRewrite; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; - pInfo->scanCols = colList; + pInfo->pRes = pResBlock; + pInfo->pCondition = pCondition; + pInfo->scanCols = colList; initResultSizeInfo(pOperator, 4096); tNameAssign(&pInfo->name, pName); const char* name = tNameGetTableName(&pInfo->name); if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { - pInfo->readHandle = *(SReadHandle*) readHandle; + pInfo->readHandle = *(SReadHandle*)readHandle; blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); } else { tsem_init(&pInfo->ready, 0, 0); @@ -1201,14 +1207,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe #endif } - pOperator->name = "SysTableScanOperator"; + pOperator->name = "SysTableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pResBlock->info.numOfCols; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, - NULL, NULL, NULL); + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pResBlock->info.numOfCols; + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -1355,26 +1361,27 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, - SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, SArray* pColMatchInfo, + STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pTableGroups = pTableGroupInfo; - pInfo->pColMatchInfo = pColMatchInfo; - pInfo->pRes = pResBlock; - pInfo->readHandle = *pReadHandle; - pInfo->curPos = 0; - pOperator->name = "TagScanOperator"; + pInfo->pTableGroups = pTableGroupInfo; + pInfo->pColMatchInfo = pColMatchInfo; + pInfo->pRes = pResBlock; + pInfo->readHandle = *pReadHandle; + pInfo->curPos = 0; + pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExpr; - pOperator->numOfExprs = numOfOutput; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExpr; + pOperator->numOfExprs = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index a78f818ded..3b49dca800 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,7 +152,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - blockDebugShowData(pRes); + /*blockDebugShowData(pRes);*/ ASSERT(pTask->tbSink.pTSchema); SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid); tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); -- GitLab