From ea0b5b9bb9468664f91aeec57f394eb4602ab1f0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 May 2021 14:43:41 +0800 Subject: [PATCH] [td-4312] --- src/client/src/tscUtil.c | 102 ++++++++++++++++++++------------------ src/query/inc/qExecutor.h | 2 + src/query/src/qExecutor.c | 19 +++++++ 3 files changed, 75 insertions(+), 48 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 952246524e..c5bf04d142 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -646,9 +646,10 @@ static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) { } typedef struct SDummyInputInfo { - SSDataBlock *block; - SSqlObj *pSql; // refactor: remove it - int32_t numOfFilterCols; + SSDataBlock *block; + STableQueryInfo *pTableQueryInfo; + SSqlObj *pSql; // refactor: remove it + int32_t numOfFilterCols; SSingleColumnFilterInfo *pFilterInfo; } SDummyInputInfo; @@ -665,7 +666,7 @@ typedef struct SJoinOperatorInfo { SRspResultInfo resultInfo; // todo refactor, add this info for each operator } SJoinOperatorInfo; -static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock) { +static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) { int32_t offset = 0; char* pData = pRes->data; @@ -680,6 +681,18 @@ static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock) { offset += pColData->info.bytes; } + // filter data if needed + if (numOfFilterCols > 0) { + doSetFilterColumnInfo(pFilterInfo, numOfFilterCols, pBlock); + int8_t* p = calloc(pBlock->info.rows, sizeof(int8_t)); + bool all = doFilterDataBlock(pFilterInfo, numOfFilterCols, pBlock->info.rows, p); + if (!all) { + doCompactSDataBlock(pBlock, pBlock->info.rows, p); + } + + tfree(p); + } + // todo refactor: extract method // set the timestamp range of current result data block SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, 0); @@ -703,22 +716,11 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { SSqlRes* pRes = &pSql->res; SSDataBlock* pBlock = pInput->block; + pOperator->pRuntimeEnv->current = pInput->pTableQueryInfo; pBlock->info.rows = pRes->numOfRows; if (pRes->numOfRows != 0) { - doSetupSDataBlock(pRes, pBlock); - - if (pInput->numOfFilterCols > 0) { - doSetFilterColumnInfo(pInput->pFilterInfo, pInput->numOfFilterCols, pBlock); - int8_t* p = calloc(pBlock->info.rows, sizeof(int8_t)); - bool all = doFilterDataBlock(pInput->pFilterInfo, pInput->numOfFilterCols, pBlock->info.rows, p); - if (!all) { - doCompactSDataBlock(pBlock, pBlock->info.rows, p); - } - - tfree(p); - } - + doSetupSDataBlock(pRes, pBlock, pInput->pFilterInfo, pInput->numOfFilterCols); *newgroup = false; return pBlock; } @@ -733,7 +735,7 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { } pBlock->info.rows = pRes->numOfRows; - doSetupSDataBlock(pRes, pBlock); + doSetupSDataBlock(pRes, pBlock, pInput->pFilterInfo, pInput->numOfFilterCols); *newgroup = false; return pBlock; } @@ -890,11 +892,14 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { // todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) { assert(numOfCols > 0); + STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; + SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo)); pInfo->pSql = pSql; pInfo->pFilterInfo = pFilterInfo; pInfo->numOfFilterCols = numOfFilterCols; + pInfo->pTableQueryInfo = createTmpTableQueryInfo(win); pInfo->block = calloc(numOfCols, sizeof(SSDataBlock)); pInfo->block->info.numOfCols = numOfCols; @@ -980,12 +985,32 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { pRes->completed = (pRes->numOfRows == 0); } +static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t* numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo) { + SColumnInfo* tableCols = calloc(numOfCol1, sizeof(SColumnInfo)); + for(int32_t i = 0; i < numOfCol1; ++i) { + SColumn* pCol = taosArrayGetP(px->colList, i); + if (pCol->info.flist.numOfFilters > 0) { + (*numOfFilterCols) += 1; + } + + tableCols[i] = pCol->info; + } + + if ((*numOfFilterCols) > 0) { + doCreateFilterInfo(tableCols, numOfCol1, (*numOfFilterCols), pFilterInfo, 0); + } + + tfree(tableCols); +} + void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) { // handle the following query process if (px->pQInfo == NULL) { SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList); - SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta); + STableMeta* pTableMeta = tscGetMetaInfo(px, 0)->pTableMeta; + SSchema* pSchema = tscGetTableSchema(pTableMeta); + STableGroupInfo tableGroupInfo = { .numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES), @@ -1001,23 +1026,11 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue taosArrayPush(tableGroupInfo.pGroupList, &group); // if it is a join query, create join operator here - int32_t numOfCol1 = px->pTableMetaInfo[0]->pTableMeta->tableInfo.numOfColumns; + int32_t numOfCol1 = pTableMeta->tableInfo.numOfColumns; int32_t numOfFilterCols = 0; - SColumnInfo* tableCols = calloc(numOfCol1, sizeof(SColumnInfo)); - for(int32_t i = 0; i < numOfCol1; ++i) { - SColumn* pCol = taosArrayGetP(px->colList, i); - if (pCol->info.flist.numOfFilters > 0) { - numOfFilterCols += 1; - } - - tableCols[i] = pCol->info; - } - SSingleColumnFilterInfo* pFilterInfo = NULL; - if (numOfFilterCols > 0) { - doCreateFilterInfo(tableCols, numOfCol1, numOfFilterCols, &pFilterInfo, 0); - } + createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo); SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols); @@ -1033,24 +1046,14 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue int32_t offset = pSourceOperator->numOfOutput; for(int32_t i = 1; i < px->numOfTables; ++i) { - SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[i]->pTableMeta); - int32_t n = px->pTableMetaInfo[i]->pTableMeta->tableInfo.numOfColumns; - - int32_t numOfFilterCols1 = 0; - SColumnInfo* tableCols1 = calloc(numOfCol1, sizeof(SColumnInfo)); - for(int32_t j = 0; j < numOfCol1; ++j) { - SColumn* pCol = taosArrayGetP(px->colList, j); - if (pCol->info.flist.numOfFilters > 0) { - numOfFilterCols += 1; - } + STableMeta* pTableMeta1 = tscGetMetaInfo(px, i)->pTableMeta; - tableCols1[j] = pCol->info; - } + SSchema* pSchema1 = tscGetTableSchema(pTableMeta1); + int32_t n = pTableMeta1->tableInfo.numOfColumns; + int32_t numOfFilterCols1 = 0; SSingleColumnFilterInfo* pFilterInfo1 = NULL; - if (numOfFilterCols1 > 0) { - doCreateFilterInfo(tableCols1, numOfCol1, numOfFilterCols1, &pFilterInfo1, 0); - } + createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols1, &pFilterInfo1); p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n, pFilterInfo1, numOfFilterCols1); memcpy(&schema[offset], pSchema1, n * sizeof(SSchema)); @@ -1068,6 +1071,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN); tfree(pColumnInfo); tfree(schema); + + // set the pRuntimeEnv for pSourceOperator + pSourceOperator->pRuntimeEnv = &px->pQInfo->runtimeEnv; } uint64_t qId = 0; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 35799bd35f..c30856812e 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -552,6 +552,8 @@ int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf); +STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); + int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); bool isQueryKilled(SQInfo *pQInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 05f6c235d7..348d493a52 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3237,6 +3237,25 @@ STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool return pTableQueryInfo; } +STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) { + STableQueryInfo* pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); + + pTableQueryInfo->win = win; + pTableQueryInfo->lastKey = win.skey; + + pTableQueryInfo->pTable = NULL; + pTableQueryInfo->cur.vgroupIndex = -1; + + // set more initial size of interval/groupby query + int32_t initialSize = 16; + int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize, TSDB_DATA_TYPE_INT); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + return pTableQueryInfo; +} + void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; -- GitLab