diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 919e05ddd0c5fe67bd548ab100736725ee1441f2..8cfab02be02cada4dfce66687dfbc845026aaf61 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -211,6 +211,8 @@ bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid); SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema); void tscColumnListDestroy(SArray* pColList); +void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo); + void tscDequoteAndTrimToken(SStrToken* pToken); int32_t tscValidateName(SStrToken* pToken); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 2ebba3a492fe1eeb8b99482ef23d57558bb23665..56d61fba8638b923ba7575771a6f9703fd80ca2c 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -218,11 +218,11 @@ typedef struct SQueryInfo { SQInfo* pQInfo; // global merge operator SArray* pDSOperator; // data source operator SArray* pPhyOperator; // physical query execution plan - SQueryAttr* pQueryAttr; // query object + SQueryAttr* pQueryAttr; // query object struct SQueryInfo *sibling; // sibling - SArray *pUpstream; // SArray - SArray *pDownstream; // SArray + SArray *pUpstream; // SArray + struct SQueryInfo *pDownstream; } SQueryInfo; typedef struct { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 60f6f543356d5ca1e95a408e5d7c43463db3331d..ca293a8cf9ea6ffc6376e659b5a58a204def1d68 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -77,7 +77,7 @@ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SC static uint8_t convertOptr(SStrToken *pToken); -static int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelNodeList, bool isSTable, bool joinQuery, bool timeWindowQuery); +static int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SQueryInfo* pQueryInfo, SArray* pSelNodeList, bool isSTable, bool joinQuery, bool timeWindowQuery); static bool validateIpAddress(const char* ip, size_t size); static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); @@ -1607,7 +1607,7 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) { return false; } -int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelNodeList, bool isSTable, bool joinQuery, +int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SQueryInfo* pQueryInfo, SArray* pSelNodeList, bool isSTable, bool joinQuery, bool timeWindowQuery) { assert(pSelNodeList != NULL && pCmd != NULL); @@ -1617,8 +1617,6 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelN const char* msg4 = "only support distinct one tag"; const char* msg5 = "invalid function name"; - SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex); - // too many result columns not support order by in query if (taosArrayGetSize(pSelNodeList) > TSDB_MAX_COLUMNS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); @@ -6710,7 +6708,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { } bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - if (validateSelectNodeList(&pSql->cmd, 0, pQuerySqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) { + if (validateSelectNodeList(&pSql->cmd, 0, pQueryInfo, pQuerySqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -6932,18 +6930,30 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind } if (pQuerySqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { - // support only one nestquery + // parse the subquery in the first place + code = validateSqlNode(pSql, pQuerySqlNode->from->pNode.pClause[0], 0); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return code; + } + pQueryInfo = pCmd->pQueryInfo[0]; + SQueryInfo* current = calloc(1, sizeof(SQueryInfo)); + + tscInitQueryInfo(current); + taosArrayPush(current->pUpstream, &pQueryInfo); + + STableMeta* pTableMeta = extractTempTableMetaFromNestQuery(pQueryInfo); STableMetaInfo* pTableMetaInfo1 = calloc(1, sizeof(STableMetaInfo)); - STableMeta* pTableMeta = extractTempTableMetaFromNestQuery(taosArrayGetP(pQueryInfo->pUpstream, 0)); pTableMetaInfo1->pTableMeta = pTableMeta; - pQueryInfo->pTableMetaInfo = calloc(1, POINTER_BYTES); - pQueryInfo->pTableMetaInfo[0] = pTableMetaInfo1; + current->pTableMetaInfo = calloc(1, POINTER_BYTES); + current->pTableMetaInfo[0] = pTableMetaInfo1; - // parse the group by clause in the first place - if (validateSelectNodeList(pCmd, index, pQuerySqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) { + pCmd->pQueryInfo[0] = current; + pQueryInfo->pDownstream = current; + + if (validateSelectNodeList(pCmd, index, current, pQuerySqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -7002,7 +7012,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind int32_t timeWindowQuery = (TPARSER_HAS_TOKEN(pQuerySqlNode->interval.interval) || TPARSER_HAS_TOKEN(pQuerySqlNode->sessionVal.gap)); - if (validateSelectNodeList(pCmd, index, pQuerySqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) != + if (validateSelectNodeList(pCmd, index, pQueryInfo, pQuerySqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5590c9449fd1ea61aad256862f90e3df6cf0e61b..764876f3dd3510f02dbbab8a6703451d7bf8e674 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -615,8 +615,8 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) { tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen; } -static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg, int32_t *succeed) { - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); +static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, char *pMsg, + int32_t *succeed) { TSKEY dfltKey = htobe64(pQueryMsg->window.skey); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; @@ -878,7 +878,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t succeed = 1; // serialize the table info (sid, uid, tags) - pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg, &succeed); + pMsg = doSerializeTableInfo(pQueryMsg, pSql, pTableMetaInfo, pMsg, &succeed); if (succeed == 0) { return TSDB_CODE_TSC_APP_ERROR; } @@ -1591,18 +1591,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { uint64_t localQueryId = 0; qTableQuery(pQueryInfo->pQInfo, &localQueryId); - SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; - pRes->numOfRows = (p != NULL)? p->info.rows: 0; - - //pRes->code = tscDoLocalMerge(pSql); - - if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { - tscCreateResPointerInfo(pRes, pQueryInfo); - tscSetResRawPtrRv(pRes, pQueryInfo, p); - } - - pRes->row = 0; - pRes->completed = (pRes->numOfRows == 0); + convertQueryResult(pRes, pQueryInfo); code = pRes->code; if (pRes->code == TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 652551e34dbde73eaca6c37b2168214d40374433..54ef04e6a59a1c3c0a0834e0c13b08c05b78c8ae 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -584,7 +584,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { pQueryInfo->fieldsInfo = pSupporter->fieldsInfo; pQueryInfo->groupbyExpr = pSupporter->groupInfo; pQueryInfo->pUpstream = taosArrayInit(4, sizeof(POINTER_BYTES)); - pQueryInfo->pDownstream = taosArrayInit(4, sizeof(POINTER_BYTES)); assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); @@ -3590,7 +3589,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST STsBufInfo bufInfo = {0}; SQueryParam param = {.pOperator = pa}; - /*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, ¶m, NULL, 0, merger); + /*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, ¶m, NULL, 0, merger); return pQInfo; _cleanup: diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b9c68ebdce0a09bd97aa8393bfc1642df049048a..77d8d3e130a09244caa4eb3784c4796f3dc81081 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -665,12 +665,24 @@ SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t return pOptr; } +void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { + // set the correct result + SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; + pRes->numOfRows = (p != NULL)? p->info.rows: 0; + + if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { + tscCreateResPointerInfo(pRes, pQueryInfo); + tscSetResRawPtrRv(pRes, pQueryInfo, p); + } + + pRes->row = 0; + pRes->completed = (pRes->numOfRows == 0); +} + void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { - if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { + if (pQueryInfo->pDownstream != NULL) { // handle the following query process - SQueryInfo* px = taosArrayGetP(pQueryInfo->pDownstream, 0); - printf("%d\n", px->type); - + SQueryInfo *px = pQueryInfo->pDownstream; SColumnInfo* colInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList); int32_t numOfOutput = tscSqlExprNumOfExprs(px); @@ -696,11 +708,11 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput); - SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN); - SSDataBlock* pres = pQInfo->runtimeEnv.outputBuf; + px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN); - // build result - pRes->numOfRows = pres->info.rows; + uint64_t qId = 0; + qTableQuery(px->pQInfo, &qId); + convertQueryResult(pRes, px); } } @@ -2172,7 +2184,6 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { pQueryInfo->slimit.limit = -1; pQueryInfo->slimit.offset = 0; pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); - pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES); } int32_t tscAddQueryInfo(SSqlCmd* pCmd) { @@ -2223,10 +2234,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { tfree(pQueryInfo->buf); taosArrayDestroy(pQueryInfo->pUpstream); - taosArrayDestroy(pQueryInfo->pDownstream); - pQueryInfo->pUpstream = NULL; - pQueryInfo->pDownstream = NULL; } void tscClearSubqueryInfo(SSqlCmd* pCmd) { @@ -2697,6 +2705,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { SQueryInfo* pq = taosArrayGetP(pQueryInfo->pUpstream, 0); pSql->cmd.active = pq; + pSql->cmd.command = TSDB_SQL_SELECT; + executeQuery(pSql, pq); // merge nest query result and generate final results diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index e7ff0f32eda1381a8f9721c03a0ee3bbe2aa79dc..84a2dda4af7eabd7046381a3c1dbbe2449a03a87 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -520,7 +520,7 @@ SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex * SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId); -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start, +int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start, int32_t prevResultLen, void* merger); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d28c04bc240a2a978e4af15e43fd25efc4e07974..5d72fc739560fb596b5bd9692508ca425dc8cf3e 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1715,7 +1715,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Aggregate: { pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + if (pRuntimeEnv->pTableScanner->operatorType != OP_DummyInput) { + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + } break; } @@ -3811,9 +3813,8 @@ void queryCostStatis(SQInfo *pQInfo) { static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); -static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; +static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) { + SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; // TODO set the tags scan handle if (onlyQueryTags(pQueryAttr)) { @@ -3839,7 +3840,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) terrno = TSDB_CODE_SUCCESS; if (isFirstLastRowQuery(pQueryAttr)) { - pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, pQInfo->qId, &pQueryAttr->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); // update the query time window pQueryAttr->window = cond.twindow; @@ -3860,9 +3861,9 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) } } } else if (pQueryAttr->pointInterpQuery) { - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, pQInfo->qId, &pQueryAttr->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } else { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, pQInfo->qId, &pQueryAttr->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } return terrno; @@ -3894,7 +3895,7 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in return pFillCol; } -int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* tsdb, int32_t tbScanner, +int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator, void* param) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; @@ -3904,13 +3905,12 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* ts pRuntimeEnv->prevResult = prevResult; if (tsdb != NULL) { - int32_t code = setupQueryHandle(tsdb, pQInfo, pQueryAttr->stableQuery); + int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery); if (code != TSDB_CODE_SUCCESS) { return code; } } - pQueryAttr->tsdb = tsdb; pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr); pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (pQueryAttr->stableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0); @@ -3941,6 +3941,12 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* ts break; } } + + if (sourceOptr != NULL) { + assert(pRuntimeEnv->pTableScanner == NULL); + pRuntimeEnv->pTableScanner = sourceOptr; + } + if (pTsBuf != NULL) { int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order); @@ -4538,7 +4544,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { break; } - setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + if (pRuntimeEnv->current != NULL) { + setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + } if (upstream->operatorType == OP_DataBlocksOptScan) { STableScanInfo* pScanInfo = upstream->info; @@ -6558,7 +6566,7 @@ bool isValidQInfo(void *param) { return (sig == (uint64_t)pQInfo); } -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start, +int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start, int32_t prevResultLen, void* merger) { int32_t code = TSDB_CODE_SUCCESS; @@ -6604,7 +6612,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryPara } // filter the qualified - if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) { + if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, sourceOptr, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 542bbd67e63407fe6eba1af3d1cf48c12623cae1..46eb56e3f2365d40b73f308cfc88493f83d612d3 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -172,7 +172,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - code = initQInfo(&pQueryMsg->tsBuf, tsdb, *pQInfo, ¶m, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL); + code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pQInfo, ¶m, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL); _over: if (param.pGroupbyExpr != NULL) {