diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index f747184ce017e9cb780ba2a61b87360cee0379d8..448fee936f5c4ad6d5428638b531d8ac742a1648 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -123,6 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i */ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo); +bool tscIsDiffQuery(SQueryInfo* pQueryInfo); bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo); bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo); @@ -132,7 +133,6 @@ bool hasTagValOutput(SQueryInfo* pQueryInfo); bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo); bool isStabledev(SQueryInfo* pQueryInfo); bool isTsCompQuery(SQueryInfo* pQueryInfo); -bool isSimpleAggregate(SQueryInfo* pQueryInfo); bool isBlockDistQuery(SQueryInfo* pQueryInfo); bool isSimpleAggregateRv(SQueryInfo* pQueryInfo); @@ -331,7 +331,7 @@ SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo); int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr); void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema); -void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage); +void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage, uint64_t qId); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 67fd34ffd7425cf921e927247149808540de3020..46aad90c42a0a4872f4961163d5722a7900f29d0 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -319,7 +319,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock); -void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput); +void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent); void destroyTableNameList(SInsertStatementParam* pInsertParam); void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 15276a38887523541cbe37ebf1add2152d2ffe80..d857d00e159d06df266187693913a6b6e404a2b3 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -144,7 +144,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { } // local merge has handle this situation during super table non-projection query. - if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) { + if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE) { pRes->numOfClauseTotal += pRes->numOfRows; } @@ -174,7 +174,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo } pSql->fp = fp; - if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { + if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } @@ -257,14 +257,14 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { } return; - } else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) { + } else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) { // in case of show command, return no data (*pSql->fetchFp)(param, pSql, 0); } else { assert(0); } } else { // current query is not completed, continue retrieve from node - if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { + if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index f97f54a6267d2754cb2edf2fd38ff6c075a2b285..2c9717306f2b26dfbbb5ec1d676b5baad3eb2d2d 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -323,7 +323,7 @@ TAOS_ROW tscFetchRow(void *param) { // current data set are exhausted, fetch more data from node if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) && (pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || + pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE || pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_SHOW || diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 15e40d59187a4eabd7c01caf7733da118deb548d..5d712bb980f92ebaf99979b5fd551e4203a50edd 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2928,8 +2928,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) { } bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { - const char* msg1 = "TWA not allowed to apply to super table directly"; - const char* msg2 = "TWA only support group by tbname for super table query"; + const char* msg1 = "TWA/Diff not allowed to apply to super table directly"; + const char* msg2 = "TWA/Diff only support group by tbname for super table query"; const char* msg3 = "function not support for super table query"; // filter sql function not supported by metric query yet. @@ -2942,7 +2942,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) } } - if (tscIsTWAQuery(pQueryInfo)) { + if (tscIsTWAQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) { if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) { invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return true; @@ -6276,7 +6276,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { } if (IS_MULTIOUTPUT(aAggs[functId].status) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM && - functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) { + functId != TSDB_FUNC_DIFF && functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } @@ -6758,7 +6758,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { const char* msg6 = "from missing in subclause"; const char* msg7 = "time interval is required"; const char* msg8 = "the first column should be primary timestamp column"; - + SSqlCmd* pCmd = &pSql->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); assert(pQueryInfo->numOfTables == 1); @@ -7719,6 +7719,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo); pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); + pQueryInfo->diffQuery = tscIsDiffQuery(pQueryInfo); SExprInfo** p = NULL; int32_t numOfExpr = 0; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 27630053036db1afa5b6f75eda620f58b3ec62be..0fd8d6a0c74566ba9d5ea778d06b4d2dc6fcf872 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1588,7 +1588,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, numOfRes); } -int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { +int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd* pCmd = &pSql->cmd; @@ -1615,10 +1615,11 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { taosArrayPush(group, &tableKeyInfo); taosArrayPush(tableGroupInfo.pGroupList, &group); - pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE); + tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self); + pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self); } - uint64_t localQueryId = 0; + uint64_t localQueryId = pSql->self; qTableQuery(pQueryInfo->pQInfo, &localQueryId); convertQueryResult(pRes, pQueryInfo); @@ -2689,7 +2690,7 @@ void tscInitMsgsFp() { tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp; + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 554ce351eb05a3196a08a79c3b3067df6de6aab8..1e9389287674996b63ff3a7cc2d8ece81d4bc130 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -456,7 +456,7 @@ static bool needToFetchNewBlock(SSqlObj* pSql) { return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) && (pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || + pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE || pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_SHOW || diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 4b1b3e9080fe9078380540792cb2c8835acc33dc..431357bf08fd3285f4276d29d66e2020569e5a66 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2422,7 +2422,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { // pRes->code check only serves in launching metric sub-queries if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE; // enable the abort of kill super table function. + pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function. return pRes->code; } @@ -2778,7 +2778,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) { pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty } else { - pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; + pParentSql->cmd.command = TSDB_SQL_RETRIEVE_GLOBALMERGE; } tscCreateResPointerInfo(&pParentSql->res, pPQueryInfo); @@ -3500,7 +3500,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { } void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator, - char* sql, void* merger, int32_t stage) { + char* sql, void* merger, int32_t stage, uint64_t qId) { assert(pQueryInfo != NULL); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { @@ -3509,7 +3509,7 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - + pQInfo->qId = qId; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = &pQInfo->query; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 8042f032c8a2fad85de23c4f2bbdc5c07670cdc3..7a9e2d2289212155924aad2d291acf478e884c2f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -222,6 +222,8 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM && functionId != TSDB_FUNC_TS_COMP && + functionId != TSDB_FUNC_DIFF && + functionId != TSDB_FUNC_TS_DUMMY && functionId != TSDB_FUNC_TID_TAG) { return false; } @@ -434,6 +436,23 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { return false; } + +bool tscIsDiffQuery(SQueryInfo* pQueryInfo) { + size_t num = tscNumOfExprs(pQueryInfo); + for(int32_t i = 0; i < num; ++i) { + SExprInfo* pExpr = tscExprGet(pQueryInfo, i); + if (pExpr == NULL || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + if (pExpr->base.functionId == TSDB_FUNC_DIFF) { + return true; + } + } + + return false; +} + bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) { return pQueryInfo->sessionWindow.gap > 0; } @@ -467,42 +486,12 @@ bool tscNeedReverseScan(SQueryInfo* pQueryInfo) { return false; } -bool isSimpleAggregate(SQueryInfo* pQueryInfo) { - if (pQueryInfo->interval.interval > 0) { - return false; - } - - // Note:top/bottom query is fixed output query - if (tscIsTopBotQuery(pQueryInfo) || tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) { - return true; - } - - size_t numOfExprs = tscNumOfExprs(pQueryInfo); - for (int32_t i = 0; i < numOfExprs; ++i) { - SExprInfo* pExpr = tscExprGet(pQueryInfo, i); - if (pExpr == NULL) { - continue; - } - - int32_t functionId = pExpr->base.functionId; - if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (!IS_MULTIOUTPUT(aAggs[functionId].status)) { - return true; - } - } - - return false; -} - bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { if (pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0) { return false; } - if (tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) { + if (tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo) || tscIsTopBotQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) { return false; } @@ -812,6 +801,7 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup) for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) { SJoinStatus* pStatus = &pJoinInfo->status[i]; if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) { + tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream); pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup); pStatus->index = 0; @@ -1088,7 +1078,9 @@ static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t tfree(tableCols); } -void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) { +void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pSql) { + SSqlRes* pOutput = &pSql->res; + // handle the following query process if (px->pQInfo == NULL) { SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList); @@ -1166,7 +1158,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue } } - px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN); + tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest query is ready", pSql->self, pSql->self); + px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self); + tfree(pColumnInfo); tfree(schema); @@ -1174,7 +1168,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue pSourceOperator->pRuntimeEnv = &px->pQInfo->runtimeEnv; } - uint64_t qId = 0; + uint64_t qId = pSql->self; qTableQuery(px->pQInfo, &qId); convertQueryResult(pOutput, px); } @@ -1372,7 +1366,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; int32_t cmd = pCmd->command; - if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || + if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscRemoveFromSqlList(pSql); } @@ -3436,7 +3430,7 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg) { } SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); - handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, &pSql->res); + handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, pSql); pSql->res.qId = -1; if (pSql->res.code == TSDB_CODE_SUCCESS) { @@ -4236,10 +4230,11 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->hasTagResults = hasTagValOutput(pQueryInfo); pQueryAttr->stabledev = isStabledev(pQueryInfo); pQueryAttr->tsCompQuery = isTsCompQuery(pQueryInfo); - pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo); + pQueryAttr->diffQuery = pQueryInfo->diffQuery; + pQueryAttr->simpleAgg = pQueryInfo->simpleAgg; pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo); pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); - pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo); + pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && pQueryInfo->groupbyColumn; pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo); pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); @@ -4255,7 +4250,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->fillType = pQueryInfo->fillType; pQueryAttr->havingNum = pQueryInfo->havingFieldNum; - if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor pQueryAttr->window = pQueryInfo->window; } else { diff --git a/src/common/inc/tcmdtype.h b/src/common/inc/tcmdtype.h index adf210cfebb15ddc0adcfce2aa85e606f79c44a5..90426519e66682ab9d5fab16a3a38e827a7d97df 100644 --- a/src/common/inc/tcmdtype.h +++ b/src/common/inc/tcmdtype.h @@ -76,7 +76,7 @@ enum { // SQL below for client local TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DESCRIBE_TABLE, "describe-table" ) - TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_LOCALMERGE, "retrieve-localmerge" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_GLOBALMERGE, "retrieve-globalmerge" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_TABLE_JOIN_RETRIEVE, "join-retrieve" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_TABLE, "show-create-table") diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b5b4fb58548f5067c26644cf63e0a5911b69ad8b..93f2b73d7ac6645563b7e830b82206ca00c0dbe0 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -185,6 +185,7 @@ typedef struct SQueryAttr { bool queryBlockDist; // if query data block distribution bool stabledev; // super table stddev query bool tsCompQuery; // is tscomp query + bool diffQuery; // is diff query bool simpleAgg; bool pointInterpQuery; // point interpolation query bool needReverseScan; // need reverse scan @@ -386,6 +387,7 @@ typedef struct STableScanInfo { int64_t elapsedTime; int32_t tableIndex; + int32_t prevGroupId; // previous table group id } STableScanInfo; typedef struct STagScanInfo { diff --git a/src/query/inc/qTableMeta.h b/src/query/inc/qTableMeta.h index 4fc252b644efab0c6bfdb97b19c3af3ef9c68920..1fd78ed324597ca338824931999bb6c1fa5e1219 100644 --- a/src/query/inc/qTableMeta.h +++ b/src/query/inc/qTableMeta.h @@ -138,6 +138,7 @@ typedef struct SQueryInfo { bool hasFilter; bool onlyTagQuery; bool orderProjectQuery; + bool diffQuery; bool stateWindow; } SQueryInfo; diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index ba6efcabb2f0788908485d3af246a6e0855ed2a1..74c95e7281e2438b77144d87a88e8090fa038fc0 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3393,7 +3393,7 @@ enum { }; static bool diff_function_setup(SQLFunctionCtx *pCtx) { - if (function_setup(pCtx)) { + if (!function_setup(pCtx)) { return false; } @@ -4606,7 +4606,7 @@ static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { pRateInfo->lastValue = v; pRateInfo->lastKey = primaryKey[index]; - + SET_VAL(pCtx, 1, 1); // set has result flag @@ -4630,7 +4630,7 @@ static void rate_func_copy(SQLFunctionCtx *pCtx) { static void rate_finalizer(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SRateInfo *pRateInfo = (SRateInfo *)GET_ROWCELL_INTERBUF(pResInfo); - + if (pRateInfo->hasResult != DATA_SET_FLAG) { setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); return; @@ -5215,7 +5215,7 @@ SAggFunctionInfo aAggs[] = {{ "diff", TSDB_FUNC_DIFF, TSDB_FUNC_INVALID_ID, - TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS, diff_function_setup, diff_function, diff_function_f, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9e1534eb24b7c4495c57fd9f0f60b1bd7a20ac1e..7972649e25be1b601beb2873db51e1fe1c98e8dd 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2652,7 +2652,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery)) { // stable aggregate, not interval aggregate or normal column aggregate + } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, pRuntimeEnv->current->groupIndex); @@ -4273,6 +4273,12 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { pRuntimeEnv->current = *pTableQueryInfo; doTableQueryInfoTimeWindowCheck(pQueryAttr, *pTableQueryInfo); + + if (pTableScanInfo->prevGroupId != -1 && pTableScanInfo->prevGroupId != (*pTableQueryInfo)->groupIndex) { + *newgroup = true; + } + + pTableScanInfo->prevGroupId = (*pTableQueryInfo)->groupIndex; } // this function never returns error? @@ -4419,6 +4425,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->current = 0; +// pInfo->prevGroupId = -1; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableScanOperator"; @@ -4441,6 +4448,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->current = 0; + pInfo->prevGroupId = -1; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableSeqScanOperator"; @@ -4545,6 +4553,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime pInfo->reverseTimes = reverseTime; pInfo->current = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; +// pInfo->prevGroupId = -1; SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "DataBlocksOptimizedScanOperator"; @@ -4923,10 +4932,17 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { } // Return result of the previous group in the firstly. - if (*newgroup && pRes->info.rows > 0) { - pArithInfo->existDataBlock = pBlock; - clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); - return pInfo->pRes; + if (*newgroup) { + if (pRes->info.rows > 0) { + pArithInfo->existDataBlock = pBlock; + clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); + return pInfo->pRes; + } else { // init output buffer for a new group data + for (int32_t j = 0; j < pOperator->numOfOutput; ++j) { + aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]); + } + initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput); + } } STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; @@ -7442,7 +7458,6 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data); } - pRuntimeEnv->resultInfo.total += pRuntimeEnv->outputBuf->info.rows; qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId, pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 24531c7f4e6e9486cb2ad388ec50fa422606de52..ee587a515dca39559bc6d061501d4e3397c0781a 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -531,7 +531,7 @@ SArray* createTableScanPlan(SQueryAttr* pQueryAttr) { } else { if (pQueryAttr->queryBlockDist) { op = OP_TableBlockInfoScan; - } else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) { + } else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery || pQueryAttr->diffQuery) { op = OP_TableSeqScan; } else if (pQueryAttr->needReverseScan) { op = OP_DataBlocksOptScan; @@ -605,7 +605,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { taosArrayPush(plan, &op); } } else if (pQueryAttr->simpleAgg) { - if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) { + if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery && !pQueryAttr->diffQuery) { op = OP_MultiTableAggregate; } else { op = OP_Aggregate; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index f00d5ceb41aad443911036925ef92a6fc3fd4030..fa2eac66194314cff252f61339097628ba4c4f25 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -241,15 +241,16 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { bool newgroup = false; pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup); + pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv); if (isQueryKilled(pQInfo)) { qDebug("QInfo:0x%"PRIx64" query is killed", pQInfo->qId); } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) { - qDebug("QInfo:0x%"PRIx64" over, %u tables queried, %"PRId64" rows are returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, + qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->resultInfo.total); } else { - qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, numOfTotal:%" PRId64 " rows", - pQInfo->qId, GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total + GET_NUM_OF_RESULTS(pRuntimeEnv)); + qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pQInfo->qId, + GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total); } return doBuildResCheck(pQInfo);