diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index fe29f4f838b42b5ac31649f723731a2b149c772a..a4f720edc49f6c8c1fdad19288403fa0402142eb 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -300,9 +300,9 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta); uint32_t tscGetTableMetaMaxSize(); int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name); STableMeta* tscTableMetaDup(STableMeta* pTableMeta); -SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryNodeInfo); +int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryNodeInfo, SQuery* pQuery); -void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx); +void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema); void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, uint64_t* qId, char* sql); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index ddc1a76665d4a56b50bb27651884cda48419fd2c..5095a9355f3be482f798bcb9c3b86b7f8ab13c25 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -57,7 +57,7 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { } // todo merge with vnode side function -void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx) { +void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema) { size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < size; ++i) { @@ -66,9 +66,15 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx) { pCtx[i].order = pQueryInfo->order.order; pCtx[i].functionId = pExpr->base.functionId; + pCtx[i].order = pQueryInfo->order.order; + pCtx[i].functionId = pExpr->base.functionId; + // input buffer hold only one point data - pCtx[i].inputType = pExpr->base.colType; - pCtx[i].inputBytes = pExpr->base.colBytes; + SSchema *s = &pSchema[i].field; + + // input data format comes from pModel + pCtx[i].inputType = s->type; + pCtx[i].inputBytes = s->bytes; pCtx[i].outputBytes = pExpr->base.resBytes; pCtx[i].outputType = pExpr->base.resType; @@ -133,6 +139,11 @@ static void setCtxInputOutputBuffer(SQueryInfo* pQueryInfo, SQLFunctionCtx *pCtx // input buffer hold only one point data int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); pCtx[i].pInput = pReducer->pTempBuffer->data + offset; + + int32_t functionId = pCtx[i].functionId; + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { + pCtx[i].ptsOutputBuf = pCtx[0].pOutput; + } } } @@ -144,7 +155,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { for(int32_t i = 0; i < numOfCols; ++i) { SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); - if (pIField->pExpr->pExpr != NULL) { + if (pIField->pExpr->pExpr == NULL) { SExprInfo* pExpr = pIField->pExpr; pFillCol[i].col.bytes = pExpr->base.resBytes; @@ -363,8 +374,9 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde pReducer->pTempBuffer->num = 0; tscCreateResPointerInfo(pRes, pQueryInfo); - tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx); + tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx, pDesc->pColumnModel->pFields); setCtxInputOutputBuffer(pQueryInfo, pReducer->pCtx, pReducer, pDesc); + // we change the capacity of schema to denote that there is only one row in temp buffer pReducer->pDesc->pColumnModel->capacity = 1; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index fc7671f7aa5eb065e8d5c14e287128a9971097f3..dd17977949ced29640a3344a5f4cab664d2d0e0a 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5283,7 +5283,11 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) { return true; } - return (pQueryInfo->window.skey == pQueryInfo->window.ekey) && (pQueryInfo->window.skey != 0); + if (pQueryInfo->window.skey == INT64_MIN || pQueryInfo->window.ekey == INT64_MAX) { + return false; + } + + return !(pQueryInfo->window.skey != pQueryInfo->window.ekey && pQueryInfo->interval.interval == 0); } int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index bd9936dc91457c97703d64ab6b93ad94f54f87cb..df146a641269765f7c15f02cb88f63de8e679099 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -24,6 +24,7 @@ #include "tsclient.h" #include "ttimer.h" #include "tlockfree.h" +#include "qPlan.h" int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -705,8 +706,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); - SQuery* pQuery = tscCreateQueryFromQueryNodeInfo(pQueryInfo); - UNUSED(pQuery); + SQuery query = {0}; + tscCreateQueryFromQueryInfo(pQueryInfo, &query); + SArray* tableScanOperator = createTableScanPlan(&query); + SArray* queryOperator = createExecOperatorPlan(&query); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; @@ -731,54 +734,63 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { */ { + + SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); - int32_t numOfTags = pQuery->numOfTags; + int32_t numOfTags = query.numOfTags; int32_t sqlLen = (int32_t) strlen(pSql->sqlstr); - if (pQuery->order.order == TSDB_ORDER_ASC) { - pQueryMsg->window.skey = htobe64(pQuery->window.skey); - pQueryMsg->window.ekey = htobe64(pQuery->window.ekey); + if (taosArrayGetSize(tableScanOperator) == 0) { + pQueryMsg->tableScanOperator = htonl(-1); + } else { + int32_t* tablescanOp = taosArrayGet(tableScanOperator, 0); + pQueryMsg->tableScanOperator = htonl(*tablescanOp); + } + + if (query.order.order == TSDB_ORDER_ASC) { + pQueryMsg->window.skey = htobe64(query.window.skey); + pQueryMsg->window.ekey = htobe64(query.window.ekey); } else { - pQueryMsg->window.skey = htobe64(pQuery->window.ekey); - pQueryMsg->window.ekey = htobe64(pQuery->window.skey); + pQueryMsg->window.skey = htobe64(query.window.ekey); + pQueryMsg->window.ekey = htobe64(query.window.skey); } - pQueryMsg->order = htons(pQuery->order.order); - pQueryMsg->orderColId = htons(pQuery->order.orderColId); - pQueryMsg->fillType = htons(pQuery->fillType); - pQueryMsg->limit = htobe64(pQuery->limit.limit); - pQueryMsg->offset = htobe64(pQuery->limit.offset); + pQueryMsg->order = htons(query.order.order); + pQueryMsg->orderColId = htons(query.order.orderColId); + pQueryMsg->fillType = htons(query.fillType); + pQueryMsg->limit = htobe64(query.limit.limit); + pQueryMsg->offset = htobe64(query.limit.offset); + pQueryMsg->numOfCols = htons(query.numOfCols); + + pQueryMsg->interval.interval = htobe64(query.interval.interval); + pQueryMsg->interval.sliding = htobe64(query.interval.sliding); + pQueryMsg->interval.offset = htobe64(query.interval.offset); + pQueryMsg->interval.intervalUnit = query.interval.intervalUnit; + pQueryMsg->interval.slidingUnit = query.interval.slidingUnit; + pQueryMsg->interval.offsetUnit = query.interval.offsetUnit; - pQueryMsg->numOfCols = htons(pQuery->numOfCols); + pQueryMsg->numOfTags = htonl(numOfTags); + pQueryMsg->sqlstrLen = htonl(sqlLen); + pQueryMsg->sw.gap = htobe64(query.sw.gap); + pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); - pQueryMsg->interval.interval = htobe64(pQuery->interval.interval); - pQueryMsg->interval.sliding = htobe64(pQuery->interval.sliding); - pQueryMsg->interval.offset = htobe64(pQuery->interval.offset); - pQueryMsg->interval.intervalUnit = pQuery->interval.intervalUnit; - pQueryMsg->interval.slidingUnit = pQuery->interval.slidingUnit; - pQueryMsg->interval.offsetUnit = pQuery->interval.offsetUnit; + pQueryMsg->secondStageOutput = htonl(query.numOfExpr2); + pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->tbnameCondLen = htonl(pQueryInfo->tagCond.tbnameCond.len); - pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->queryType = htonl(pQueryInfo->type); -// pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit); - pQueryMsg->sqlstrLen = htonl(sqlLen); pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen); - pQueryMsg->sw.gap = htobe64(pQuery->sw.gap); - pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); - - pQueryMsg->numOfOutput = htons((int16_t)pQuery->numOfOutput); // this is the stage one output column number // set column list ids size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo); for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfo *pCol = &pQuery->colList[i]; + SColumnInfo *pCol = &query.colList[i]; pQueryMsg->colList[i].colId = htons(pCol->colId); pQueryMsg->colList[i].bytes = htons(pCol->bytes); @@ -814,23 +826,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } { - pQueryMsg->stableQuery = pQuery->stableQuery; - pQueryMsg->topBotQuery = pQuery->topBotQuery; - pQueryMsg->groupbyColumn = pQuery->groupbyColumn; - pQueryMsg->hasTagResults = pQuery->hasTagResults; - pQueryMsg->timeWindowInterpo = pQuery->timeWindowInterpo; - pQueryMsg->queryBlockDist = pQuery->queryBlockDist; - pQueryMsg->stabledev = pQuery->stabledev; - pQueryMsg->tsCompQuery = pQuery->tsCompQuery; - pQueryMsg->simpleAgg = pQuery->simpleAgg; - pQueryMsg->pointInterpQuery = pQuery->pointInterpQuery; - pQueryMsg->needReverseScan = pQuery->needReverseScan; + pQueryMsg->stableQuery = query.stableQuery; + pQueryMsg->topBotQuery = query.topBotQuery; + pQueryMsg->groupbyColumn = query.groupbyColumn; + pQueryMsg->hasTagResults = query.hasTagResults; + pQueryMsg->timeWindowInterpo = query.timeWindowInterpo; + pQueryMsg->queryBlockDist = query.queryBlockDist; + pQueryMsg->stabledev = query.stabledev; + pQueryMsg->tsCompQuery = query.tsCompQuery; + pQueryMsg->simpleAgg = query.simpleAgg; + pQueryMsg->pointInterpQuery = query.pointInterpQuery; + pQueryMsg->needReverseScan = query.needReverseScan; } SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg; - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlExpr *pExpr = &pQuery->pExpr1[i].base; + for (int32_t i = 0; i < query.numOfOutput; ++i) { + SSqlExpr *pExpr = &query.pExpr1[i].base; // the queried table has been removed and a new table with the same name has already been created already // return error msg @@ -849,17 +861,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId); pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag); + pSqlExpr->uid = htobe64(pExpr->uid); + pSqlExpr->colType = htons(pExpr->colType); + pSqlExpr->colBytes = htons(pExpr->colBytes); + pSqlExpr->resType = htons(pExpr->resType); + pSqlExpr->resBytes = htons(pExpr->resBytes); + pSqlExpr->functionId = htons(pExpr->functionId); + pSqlExpr->numOfParams = htons(pExpr->numOfParams); + pSqlExpr->resColId = htons(pExpr->resColId); - pSqlExpr->colType = htons(pExpr->colType); - pSqlExpr->colBytes = htons(pExpr->colBytes); - pSqlExpr->resType = htons(pExpr->resType); - pSqlExpr->resBytes = htons(pExpr->resBytes); - - pSqlExpr->functionId = htons(pExpr->functionId); - pSqlExpr->numOfParams = htons(pExpr->numOfParams); - pSqlExpr->resColId = htons(pExpr->resColId); pMsg += sizeof(SSqlExpr); - for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen); @@ -875,9 +886,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlExpr = (SSqlExpr *)pMsg; } - if (pQuery->numOfExpr2 > 0) { - for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { - SSqlExpr *pExpr = &pQuery->pExpr2[i].base; + if (query.numOfExpr2 > 0) { + for (int32_t i = 0; i < query.numOfExpr2; ++i) { + SSqlExpr *pExpr = &query.pExpr2[i].base; // the queried table has been removed and a new table with the same name has already been created already // return error msg @@ -886,23 +897,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_TSC_INVALID_TABLE_NAME; } - if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { - tscError("%p table schema is not matched with parsed sql", pSql); - return TSDB_CODE_TSC_INVALID_SQL; - } - assert(pExpr->resColId < 0); pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId); pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag); + pSqlExpr->uid = htobe64(pExpr->uid); - pSqlExpr->colType = htons(pExpr->colType); + pSqlExpr->colType = htons(pExpr->colType); pSqlExpr->colBytes = htons(pExpr->colBytes); - pSqlExpr->resType = htons(pExpr->resType); + pSqlExpr->resType = htons(pExpr->resType); pSqlExpr->resBytes = htons(pExpr->resBytes); - pSqlExpr->functionId = htons(pExpr->functionId); + pSqlExpr->functionId = htons(pExpr->functionId); pSqlExpr->numOfParams = htons(pExpr->numOfParams); pSqlExpr->resColId = htons(pExpr->resColId); pMsg += sizeof(SSqlExpr); @@ -921,14 +928,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlExpr = (SSqlExpr *)pMsg; } - } else { - pQueryMsg->secondStageOutput = 0; } // serialize the table info (sid, uid, tags) pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); - SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr; + SSqlGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr; if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) { pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); pQueryMsg->orderType = htons(pGroupbyExpr->orderType); @@ -950,16 +955,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } - if (pQuery->fillType != TSDB_FILL_NONE) { - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - *((int64_t *)pMsg) = htobe64(pQuery->fillVal[i]); - pMsg += sizeof(pQuery->fillVal[0]); + if (query.fillType != TSDB_FILL_NONE) { + for (int32_t i = 0; i < query.numOfOutput; ++i) { + *((int64_t *)pMsg) = htobe64(query.fillVal[i]); + pMsg += sizeof(query.fillVal[0]); } } - if (pQuery->numOfTags > 0) { - for (int32_t i = 0; i < pQuery->numOfTags; ++i) { - SColumnInfo* pTag = &pQuery->tagColList[i]; + if (query.numOfTags > 0) { + for (int32_t i = 0; i < query.numOfTags; ++i) { + SColumnInfo* pTag = &query.tagColList[i]; SColumnInfo* pTagCol = (SColumnInfo*) pMsg; pTagCol->colId = htons(pTag->colId); @@ -1013,6 +1018,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); } + int32_t numOfOperator = taosArrayGetSize(queryOperator); + pQueryMsg->numOfOperator = htonl(numOfOperator); + for(int32_t i = 0; i < numOfOperator; ++i) { + int32_t *operator = taosArrayGet(queryOperator, i); + *(int32_t*)pMsg = htonl(*operator); + + pMsg += sizeof(int32_t); + } + memcpy(pMsg, pSql->sqlstr, sqlLen); pMsg += sqlLen; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 552bdac7ad1a7502973fb8571171597071f80023..3eb53cb863b79670478ce21742f0a7c065416ba4 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3218,8 +3218,6 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, uint64_t* qId, char* sql) { assert(pQueryNodeInfo != NULL); - - int16_t numOfCols = taosArrayGetSize(pQueryNodeInfo->colList); int16_t numOfOutput = pQueryNodeInfo->fieldsInfo.numOfOutput; SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); @@ -3229,42 +3227,10 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - SQuery* pQuery = &pQInfo->query; - pQInfo->runtimeEnv.pQuery = pQuery; - - pQuery->tableGroupInfo = *pTableGroupInfo; - pQuery->numOfCols = numOfCols; - pQuery->numOfOutput = numOfOutput; - pQuery->limit = pQueryNodeInfo->limit; - pQuery->order = pQueryNodeInfo->order; - pQuery->pExpr1 = pExprs; - pQuery->pExpr2 = NULL; // not support yet. - pQuery->numOfExpr2 = 0; - pQuery->pGroupbyExpr = NULL; - memcpy(&pQuery->interval, &pQueryNodeInfo->interval, sizeof(pQuery->interval)); - pQuery->fillType = pQueryNodeInfo->fillType; - pQuery->numOfTags = 0; - pQuery->tagColList = NULL; -// pQuery->prjInfo.vgroupLimit = pQueryNodeInfo->vgroupLimit; - pQuery->prjInfo.ts = (pQueryNodeInfo->order.order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX; - pQuery->sw = pQueryNodeInfo->sessionWindow; - pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); - if (pQuery->colList == NULL) { - goto _cleanup; - } - - pQuery->srcRowSize = 0; - pQuery->maxSrcColumnSize = 0; - for (int16_t i = 0; i < numOfCols; ++i) { - SColumn* pCol = taosArrayGet(pQueryNodeInfo->colList, i); - pQuery->colList[i] = pCol->info; - pQuery->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQuery->colList[i].numOfFilters); + SQuery *pQuery = &pQInfo->query; - pQuery->srcRowSize += pQuery->colList[i].bytes; - if (pQuery->maxSrcColumnSize < pQuery->colList[i].bytes) { - pQuery->maxSrcColumnSize = pQuery->colList[i].bytes; - } - } + tscCreateQueryFromQueryInfo(pQueryNodeInfo, pQuery); + pQInfo->runtimeEnv.pQuery = pQuery; // calculate the result row size for (int16_t col = 0; col < numOfOutput; ++col) { @@ -3283,16 +3249,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs // goto _cleanup; // } - if (pQuery->fillType != TSDB_FILL_NONE) { - pQuery->fillVal = malloc(sizeof(int64_t) * pQuery->numOfOutput); - if (pQuery->fillVal == NULL) { - goto _cleanup; - } - - // the first column is the timestamp - memcpy(pQuery->fillVal, (char *)pQueryNodeInfo->fillVal, pQuery->numOfOutput * sizeof(int64_t)); - } - size_t numOfGroups = 0; if (pTableGroupInfo->pGroupList != NULL) { numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); @@ -3311,14 +3267,13 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->rspContext = NULL; pQInfo->sql = sql; + pthread_mutex_init(&pQInfo->lock, NULL); tsem_init(&pQInfo->ready, 0, 0); - pQuery->window = pQueryNodeInfo->window; // changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - bool groupByCol = false;//isGroupbyColumn(pQuery->pGroupbyExpr); STimeWindow window = pQuery->window; @@ -3339,7 +3294,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs window.skey = info->lastKey; void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo); - STableQueryInfo* item = createTableQueryInfo(pQuery, info->pTable, groupByCol, window, buf); + STableQueryInfo* item = createTableQueryInfo(pQuery, info->pTable, pQuery->groupbyColumn, window, buf); if (item == NULL) { goto _cleanup; } @@ -3347,7 +3302,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs item->groupIndex = i; taosArrayPush(p1, &item); -// STableId* id = TSDB_TABLEID(info->pTable); STableId id = {.tid = 0, .uid = 0}; taosHashPut(pRuntimeEnv->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES); index += 1; @@ -3356,10 +3310,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs // colIdCheck(pQuery, pQInfo); - // todo refactor - pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); - - pQInfo->qId = 0;//atomic_add_fetch_64(&queryHandleId, 1); + pQInfo->qId = 0; if (qId != NULL) { *qId = pQInfo->qId; } @@ -3384,7 +3335,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs STsBufInfo bufInfo = {0}; SQueryParam param = {0}; - /*int32_t code = */initQInfo(&bufInfo, NULL, 0, pQInfo, ¶m, NULL, 0, false); + /*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, ¶m, NULL, 0); qTableQuery(pQInfo); return pQInfo; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b5682139e5735125ac5f58ce48bdf35384820db1..82243103b037d5aa9e15734aca6b2f5b5e45229c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -211,7 +211,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) { assert(pExpr != NULL); int32_t functionId = pExpr->base.functionId; - if (functionId == TSDB_FUNC_TAG) { + if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS) { continue; } @@ -555,7 +555,7 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { int32_t numOfCols = taosArrayGetSize(px->colList); SQueriedTableInfo info = {.colList = colInfo, .numOfCols = numOfCols,}; /*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL); - tsCreateSQLFunctionCtx(px, pCtx); + tsCreateSQLFunctionCtx(px, pCtx, NULL); STableGroupInfo tableGroupInfo = {0}; tableGroupInfo.numOfTables = 1; @@ -3073,20 +3073,24 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) { return p; } -SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) { - SQuery* pQuery = calloc(1, sizeof(SQuery)); +int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery) { + memset(pQuery, 0, sizeof(SQuery)); - int16_t numOfCols = taosArrayGetSize(pQueryInfo->colList); - int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; - - pQuery->tsdb = NULL; - pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo); - pQuery->hasTagResults = hasTagValOutput(pQueryInfo); + pQuery->tsdb = NULL; + pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo); + pQuery->hasTagResults = hasTagValOutput(pQueryInfo); + pQuery->stabledev = isStabledev(pQueryInfo); + pQuery->tsCompQuery = isTsCompQuery(pQueryInfo); + pQuery->simpleAgg = isSimpleAggregate(pQueryInfo); + pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo); + pQuery->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); + pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo); + pQuery->queryBlockDist = isBlockDistQuery(pQueryInfo); + pQuery->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); - pQuery->stabledev = isStabledev(pQueryInfo); - pQuery->tsCompQuery = isTsCompQuery(pQueryInfo); - pQuery->simpleAgg = isSimpleAggregate(pQueryInfo); - pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo); + + int16_t numOfCols = taosArrayGetSize(pQueryInfo->colList); + int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; pQuery->numOfCols = numOfCols; pQuery->numOfOutput = numOfOutput; @@ -3096,18 +3100,19 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) { pQuery->pExpr2 = NULL; // not support yet. pQuery->numOfExpr2 = 0; pQuery->pGroupbyExpr = NULL; - memcpy(&pQuery->interval, &pQueryInfo->interval, sizeof(pQuery->interval)); - pQuery->fillType = pQueryInfo->fillType; pQuery->numOfTags = 0; pQuery->tagColList = NULL; + memcpy(&pQuery->interval, &pQueryInfo->interval, sizeof(pQuery->interval)); + STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; pQuery->vgId = 0; pQuery->stableQuery = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo); pQuery->window = pQueryInfo->window; + pQuery->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr)); *pQuery->pGroupbyExpr = pQueryInfo->groupbyExpr; @@ -3128,6 +3133,7 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) { } pQuery->colList[i] = pCol->info; + pQuery->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQuery->colList[i].numOfFilters); } } @@ -3139,34 +3145,48 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); - SExprInfo *pExpr = pField->pExpr; - SSqlExpr *pse = &pQuery->pExpr2[i].base; + SExprInfo* pExpr = pField->pExpr; + + SSqlExpr* pse = &pQuery->pExpr2[i].base; + pse->uid = pTableMetaInfo->pTableMeta->id.uid; // this should be switched to projection query if (pExpr->pExpr == NULL) { - pse->numOfParams = 0; // no params for projection query - pse->functionId = TSDB_FUNC_PRJ; + pse->numOfParams = 0; // no params for projection query + pse->functionId = TSDB_FUNC_PRJ; pse->colInfo.colId = pExpr->base.resColId; - pse->colInfo.flag = TSDB_COL_NORMAL; - pse->colType = pExpr->base.resType; - pse->colBytes = pExpr->base.resBytes; - pse->resType = pExpr->base.resType; - pse->resBytes = pExpr->base.resBytes; + + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { + if (pQuery->pExpr1[j].base.resColId == pse->colInfo.colId) { + pse->colInfo.colIndex = j; + } + } + pse->colInfo.flag = TSDB_COL_NORMAL; + pse->colType = pExpr->base.resType; + pse->colBytes = pExpr->base.resBytes; + pse->resType = pExpr->base.resType; + pse->resBytes = pExpr->base.resBytes; } else { assert(pField->pExpr->pExpr != NULL); pse->colInfo.colId = pExpr->base.colInfo.colId; - pse->colType = pExpr->base.colType; + pse->colType = pExpr->base.colType; pse->colBytes = pExpr->base.colBytes; pse->resBytes = sizeof(double); - pse->resType = TSDB_DATA_TYPE_DOUBLE; + pse->resType = TSDB_DATA_TYPE_DOUBLE; - pse->functionId = pExpr->base.functionId; + pse->functionId = pExpr->base.functionId; pse->numOfParams = pExpr->base.numOfParams; + + memset(pse->param, 0, sizeof(tVariant) * tListLen(pse->param)); + for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { + tVariantAssign(&pse->param[j], &pExpr->base.param[j]); + } } + + pse->resColId = pExpr->base.resColId; + pse->uid = pTableMetaInfo->pTableMeta->id.uid; } - } else { - pQuery->numOfExpr2 = 0; } } @@ -3195,7 +3215,7 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) { // tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s", // pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, total, numOfTagColumns, pCol->colIndex.columnIndex, pColSchema->name); - return NULL; + return TSDB_CODE_SUCCESS; } SColumnInfo* pTagCol = &pQuery->tagColList[i]; @@ -3208,20 +3228,20 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryInfo* pQueryInfo) { } } - pQuery->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); - pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo); - pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo); - pQuery->hasTagResults = hasTagValOutput(pQueryInfo); - pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); - pQuery->queryBlockDist = isBlockDistQuery(pQueryInfo); - pQuery->stabledev = isStabledev(pQueryInfo); - pQuery->tsCompQuery = isTsCompQuery(pQueryInfo); - pQuery->simpleAgg = isSimpleAggregate(pQueryInfo); - pQuery->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); - pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo); + if (pQuery->fillType != TSDB_FILL_NONE) { + pQuery->fillVal = calloc(pQuery->numOfOutput, sizeof(int64_t)); + memcpy(pQuery->fillVal, pQueryInfo->fillVal, pQuery->numOfOutput * sizeof(int64_t)); + } + + pQuery->srcRowSize = 0; + pQuery->maxSrcColumnSize = 0; + for (int16_t i = 0; i < numOfCols; ++i) { + pQuery->srcRowSize += pQuery->colList[i].bytes; + if (pQuery->maxSrcColumnSize < pQuery->colList[i].bytes) { + pQuery->maxSrcColumnSize = pQuery->colList[i].bytes; + } + } pQuery->interBufSize = getOutputInterResultBufSize(pQuery); - return pQuery; + return TSDB_CODE_SUCCESS; } - - diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 653f4b8bf162ccfc69f85ba8f45e3d73fa86f275..8a92c6dedbaf6234b2c8b9f58681ac0752f9042e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -493,6 +493,8 @@ typedef struct { int32_t numOfTags; // number of tags columns involved int32_t sqlstrLen; // sql query string int32_t prevResultLen; // previous result length + int32_t numOfOperator; + int32_t tableScanOperator;// table scan operator. -1 means no scan operator SColumnInfo colList[]; } SQueryTableMsg; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 655c76e58202a638688f2d74f679713949a18f99..9504b44aa103d89494f096f7752a4da726b77124 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -350,6 +350,8 @@ typedef struct SQueryParam { SColIndex *pGroupColIndex; SColumnInfo *pTagColumnInfo; SSqlGroupbyExpr *pGroupbyExpr; + int32_t tableScanOperator; + SArray *pOperator; } SQueryParam; typedef struct STableScanInfo { @@ -442,9 +444,10 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, - SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId); -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, int32_t vgId, SQInfo* pQInfo, SQueryParam* param, char* start, - int32_t prevResultLen, bool isSTable); + SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId); + +int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start, + int32_t prevResultLen); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2080cdff8442f9f48a6cbae804df7167e9a7d155..cc986537221297b5c15c7ca515805dbb5aa8f419 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1711,7 +1711,7 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { return NULL; } -static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) { +static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator) { qDebug("QInfo:%p setup runtime env", pRuntimeEnv->qinfo); SQuery *pQuery = pRuntimeEnv->pQuery; @@ -1749,6 +1749,85 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf // group by normal column, sliding window query, interval query are handled by interval query processor // interval (down sampling operation) + int32_t numOfOperator = taosArrayGetSize(pOperator); + for(int32_t i = 0; i < numOfOperator; ++i) { + int32_t* op = taosArrayGet(pOperator, i); + + switch (*op) { + case OP_TagScan: { + pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); + break; + } + case OP_MultiTableTimeInterval: { + pRuntimeEnv->proot = + createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + break; + } + case OP_TimeWindow: { + pRuntimeEnv->proot = + createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + break; + } + case OP_Groupby: { + pRuntimeEnv->proot = + createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + break; + } + case OP_SessionWindow: { + pRuntimeEnv->proot = + createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + break; + } + case OP_MultiTableAggregate: { + pRuntimeEnv->proot = + createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + break; + } + case OP_Aggregate: { + pRuntimeEnv->proot = + createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + break; + } + + case OP_Arithmetic: { + SOperatorInfo* prev = pRuntimeEnv->pTableScanner; + if (i >= 1) { + prev = pRuntimeEnv->proot; + } + + pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + break; + } + + case OP_Limit: { + pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); + break; + } + + case OP_Offset: { + pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); + break; + } + + case OP_Fill: { + SOperatorInfo* pInfo = pRuntimeEnv->proot; + pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); + break; + } + + default: { + assert(0); + } + } + } + /* if (onlyQueryTags(pQuery)) { // do nothing for tags query } else if (QUERY_IS_INTERVAL_QUERY(pQuery)) { @@ -1816,7 +1895,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf if (pQuery->limit.limit > 0) { pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); } - +*/ return TSDB_CODE_SUCCESS; _clean: @@ -1966,29 +2045,6 @@ static bool isFirstLastRowQuery(SQuery *pQuery) { return false; } -static bool needReverseScan(SQuery *pQuery) { - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pExpr1[i].base.functionId; - if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) { - continue; - } - - if ((functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_FIRST_DST) && !QUERY_IS_ASC_QUERY(pQuery)) { - return true; - } - - if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) { - // the scan order to acquire the last result of the specified column - int32_t order = (int32_t)pQuery->pExpr1[i].base.param[0].i64; - if (order != pQuery->order.order) { - return true; - } - } - } - - return false; -} - /** * The following 4 kinds of query are treated as the tags query * tagprj, tid_tag query, count(tbname), 'abc' (user defined constant value column) query @@ -3911,7 +3967,7 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in SExprInfo* pExprInfo = &pExpr[i]; pFillCol[i].col.bytes = pExprInfo->base.resBytes; - pFillCol[i].col.type = (int8_t)pExprInfo->base.resBytes; + pFillCol[i].col.type = (int8_t)pExprInfo->base.resType; pFillCol[i].col.offset = offset; pFillCol[i].tagIndex = -2; pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query @@ -3924,42 +3980,36 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in return pFillCol; } -int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t vgId, bool isSTableQuery) { +int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t tbScanner, SArray* pOperator) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; pQuery->tsdb = tsdb; -// pQuery->topBotQuery = isTopBottomQuery(pQuery); -// pQuery->hasTagResults = hasTagValOutput(pQuery); -// pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery); -// pQuery->stabledev = isStabledev(pQuery); - pRuntimeEnv->prevResult = prevResult; pRuntimeEnv->qinfo = pQInfo; if (tsdb != NULL) { - int32_t code = setupQueryHandle(tsdb, pQInfo, isSTableQuery); + int32_t code = setupQueryHandle(tsdb, pQInfo, pQuery->stableQuery); if (code != TSDB_CODE_SUCCESS) { return code; } } pQuery->tsdb = tsdb; - pQuery->vgId = vgId; - pQuery->stableQuery = isSTableQuery; pQuery->interBufSize = getOutputInterResultBufSize(pQuery); - pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0); + pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (pQuery->stableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0); pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pTsBuf = pTsBuf; pRuntimeEnv->cur.vgroupIndex = -1; setResultBufSize(pQuery, &pRuntimeEnv->resultInfo); + /* if (onlyQueryTags(pQuery)) { - pRuntimeEnv->resultInfo.capacity = 4096; - pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); +// pRuntimeEnv->resultInfo.capacity = 4096; +// pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); } else if (pQuery->queryBlockDist) { pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); } else if (pQuery->tsCompQuery || pQuery->pointInterpQuery) { @@ -3969,7 +4019,29 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts } else { pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); } - +*/ + switch(tbScanner) { + case OP_TableBlockInfoScan: { + pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + break; + } + case OP_TableSeqScan: { + pRuntimeEnv->pTableScanner = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + break; + } + case OP_DataBlocksOptScan: { + pRuntimeEnv->pTableScanner = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); + break; + } + case OP_TableScan: { + pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); + break; + } + default: { + // do nothing + break; + } + } if (pTsBuf != NULL) { int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order); @@ -3987,7 +4059,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts // create runtime environment int32_t numOfTables = (int32_t)pQuery->tableGroupInfo.numOfTables; pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); - code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables); + code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables, pOperator); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5446,7 +5518,6 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { } pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); - pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey); pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval); @@ -5476,6 +5547,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pQueryMsg->prevResultLen = htonl(pQueryMsg->prevResultLen); pQueryMsg->sw.gap = htobe64(pQueryMsg->sw.gap); pQueryMsg->sw.primaryColId = htonl(pQueryMsg->sw.primaryColId); + pQueryMsg->tableScanOperator = htonl(pQueryMsg->tableScanOperator); + pQueryMsg->numOfOperator = htonl(pQueryMsg->numOfOperator); // query msg safety check if (!validateQueryMsg(pQueryMsg)) { @@ -5536,6 +5609,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { } } + param->tableScanOperator = pQueryMsg->tableScanOperator; param->pExpr = calloc(pQueryMsg->numOfOutput, POINTER_BYTES); if (param->pExpr == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -5727,6 +5801,14 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pMsg = (char *)pQueryMsg + pQueryMsg->tsBuf.tsOffset + pQueryMsg->tsBuf.tsLen; } + param->pOperator = taosArrayInit(pQueryMsg->numOfOperator, sizeof(int32_t)); + for(int32_t i = 0; i < pQueryMsg->numOfOperator; ++i) { + int32_t op = htonl(*(int32_t*)pMsg); + taosArrayPush(param->pOperator, &op); + + pMsg += sizeof(int32_t); + } + param->sql = strndup(pMsg, pQueryMsg->sqlstrLen); SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->colList}; @@ -6091,7 +6173,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { } SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, - SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, + SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId) { int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfOutput = pQueryMsg->numOfOutput; @@ -6136,6 +6218,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr pQuery->simpleAgg = pQueryMsg->simpleAgg; pQuery->pointInterpQuery = pQueryMsg->pointInterpQuery; pQuery->needReverseScan = pQueryMsg->needReverseScan; + pQuery->vgId = vgId; pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); if (pQuery->colList == NULL) { @@ -6203,11 +6286,9 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr tsem_init(&pQInfo->ready, 0, 0); pQuery->window = pQueryMsg->window; - changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); + changeExecuteScanOrder(pQInfo, pQueryMsg, pQuery->stableQuery); SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; -// bool groupByCol = isGroupbyColumn(pQuery->pGroupbyExpr); - STimeWindow window = pQuery->window; int32_t index = 0; @@ -6243,9 +6324,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr colIdCheck(pQuery, pQInfo); - // todo refactor - pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); - pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1); *qId = pQInfo->qId; qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo); @@ -6289,8 +6367,7 @@ bool isValidQInfo(void *param) { return (sig == (uint64_t)pQInfo); } -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, int32_t vgId, SQInfo* pQInfo, SQueryParam* param, char* start, - int32_t prevResultLen, bool isSTable) { +int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryParam* param, char* start, int32_t prevResultLen) { int32_t code = TSDB_CODE_SUCCESS; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; @@ -6299,7 +6376,8 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, int32_t vgId, SQInfo* pQIn STSBuf *pTsBuf = NULL; if (pTsBufInfo->tsLen > 0) { // open new file to save the result char *tsBlock = start + pTsBufInfo->tsOffset; - pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder, vgId); + pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder, + pQuery->vgId); tsBufResetPos(pTsBuf); bool ret = tsBufNextPos(pTsBuf); @@ -6332,7 +6410,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, int32_t vgId, SQInfo* pQIn } // filter the qualified - if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) { + if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, param->tableScanOperator, param->pOperator)) != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 3248e28df9f90c38678412d13cf100d89cc95aa1..ec0d118261dc9711110db5d39509b6719adf805a 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -36,7 +36,7 @@ SQueryNode* queryPlanFromString() { return NULL; } -UNUSED_FUNC SArray* createTableScanPlan(SQuery* pQuery) { +SArray* createTableScanPlan(SQuery* pQuery) { SArray* plan = taosArrayInit(4, sizeof(int32_t)); int32_t op = 0; @@ -56,7 +56,7 @@ UNUSED_FUNC SArray* createTableScanPlan(SQuery* pQuery) { return plan; } -UNUSED_FUNC SArray* createExecOperatorPlan(SQuery* pQuery) { +SArray* createExecOperatorPlan(SQuery* pQuery) { SArray* plan = taosArrayInit(4, sizeof(int32_t)); int32_t op = 0; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index b6eef5a090352dc7267dc1db8d2c92f44c67a5b5..10fec83a30bf5da4a0d089b7676545425036e63d 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -157,7 +157,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId); + assert(pQueryMsg->stableQuery == isSTableQuery); + (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, + param.pTagColumnInfo, vgId, param.sql, qId); param.sql = NULL; param.pExprs = NULL; @@ -170,7 +172,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - code = initQInfo(&pQueryMsg->tsBuf, tsdb, vgId, *pQInfo, ¶m, (char*)pQueryMsg, pQueryMsg->prevResultLen, isSTableQuery); + code = initQInfo(&pQueryMsg->tsBuf, tsdb, *pQInfo, ¶m, (char*)pQueryMsg, pQueryMsg->prevResultLen); _over: if (param.pGroupbyExpr != NULL) { diff --git a/tests/script/general/parser/col_arithmetic_query.sim b/tests/script/general/parser/col_arithmetic_query.sim index 2c56c6445fa8134ab28837940a11ff6f4127b7c7..191f56fcfb7d13caf1cb981f518e8bdd439c42b3 100644 --- a/tests/script/general/parser/col_arithmetic_query.sim +++ b/tests/script/general/parser/col_arithmetic_query.sim @@ -414,6 +414,7 @@ if $rows != 1 then endi if $data00 != 0.204545455 then + print expect 0.204545455, actual: $data00 return -1 endi