diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 6479a7ecba505c90025b66c41f8740651132cb66..280c8b7ec2b8887d161fce04e5c22efecc0c4e46 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4599,7 +4599,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu const char* msg0 = "only support order by primary timestamp"; const char* msg1 = "invalid column name"; const char* msg2 = "only support order by primary timestamp or queried column"; - const char* msg3 = "only support order by primary timestamp or first tag in groupby clause"; + const char* msg3 = "invalid column in order by clause, only primary timestamp or first tag in groupby clause allowed"; setDefaultOrderInfo(pQueryInfo); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -5279,8 +5279,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn const char* msg0 = "soffset/offset can not be less than 0"; const char* msg1 = "slimit/soffset only available for STable query"; - const char* msg2 = "functions mixed up in table query"; - const char* msg3 = "slimit/soffset can not apply to projection query"; + const char* msg2 = "slimit/soffset can not apply to projection query"; // handle the limit offset value, validate the limit pQueryInfo->limit = pQuerySql->limit; @@ -5305,7 +5304,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } // for projection query on super table, all queries are subqueries @@ -5363,24 +5362,6 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn if (pQueryInfo->slimit.limit != -1 || pQueryInfo->slimit.offset != 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - - size_t size = taosArrayGetSize(pQueryInfo->exprList); - - bool hasTags = false; - bool hasOtherFunc = false; - // filter the query functions operating on "tbname" column that are not supported by normal columns. - for (int32_t i = 0; i < size; ++i) { - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { - hasTags = true; - } else { - hasOtherFunc = true; - } - } - - if (hasTags && hasOtherFunc) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); - } } return TSDB_CODE_SUCCESS; @@ -6282,10 +6263,12 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { const char* msg1 = "invalid table name"; + const char* msg2 = "functions not allowed in CQ"; const char* msg3 = "fill only available for interval query"; const char* msg4 = "fill option not supported in stream computing"; const char* msg5 = "sql too long"; // todo ADD support const char* msg6 = "from missing in subclause"; + const char* msg7 = "time interval is required"; SSqlCmd* pCmd = &pSql->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); @@ -6295,10 +6278,10 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); // if sql specifies db, use it, otherwise use default db - SStrToken* pzTableName = &(pCreateTable->name); + SStrToken* pName = &(pCreateTable->name); SQuerySQL* pQuerySql = pCreateTable->pSelect; - if (tscValidateName(pzTableName) != TSDB_CODE_SUCCESS) { + if (tscValidateName(pName) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } @@ -6337,15 +6320,19 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { // set interval value if (parseIntervalClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; - } else { - if ((pQueryInfo->interval.interval > 0) && - (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { - return TSDB_CODE_TSC_INVALID_SQL; - } + } + + if ((pQueryInfo->interval.interval > 0) && + (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } + + if (!tscIsProjectionQuery(pQueryInfo) && pQueryInfo->interval.interval == 0) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7); } // set the created table[stream] name - code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql); + code = tscSetTableFullName(pTableMetaInfo, pName, pSql); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index c1ed9b0ba09f956207eeeccb2c7b8123538a2151..d1004fff62db5d58c0ae0ef45bd4571ee7368b20 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -65,44 +65,51 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in return retryDelta; } -static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { - SSqlStream *pStream = (SSqlStream *)pMsg->ahandle; - SSqlObj * pSql = pStream->pSql; +static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { + SSqlStream *pStream = (SSqlStream *)param; + assert(pStream->pSql == tres && code == TSDB_CODE_SUCCESS); - pSql->fp = tscProcessStreamQueryCallback; - pSql->fetchFp = tscProcessStreamQueryCallback; - pSql->param = pStream; + SSqlObj* pSql = (SSqlObj*) tres; + pSql->fp = doLaunchQuery; + pSql->fetchFp = doLaunchQuery; pSql->res.completed = false; - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int code = tscGetTableMeta(pSql, pTableMetaInfo); - pSql->res.code = code; - + code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscGetSTableVgroupInfo(pSql, 0); - pSql->res.code = code; } - // failed to get meter/metric meta, retry in 10sec. - if (code != TSDB_CODE_SUCCESS) { - int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); - tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime); - tscSetRetryTimer(pStream, pSql, retryDelayTime); - - } else { + // failed to get table Meta or vgroup list, retry in 10sec. + if (code == TSDB_CODE_SUCCESS) { tscTansformSQLFuncForSTableQuery(pQueryInfo); - tscDebug("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name); - tscDoQuery(pStream->pSql); + tscDebug("%p stream:%p, start stream query on:%s", pSql, pStream, pTableMetaInfo->name); + + pSql->fp = tscProcessStreamQueryCallback; + pSql->fetchFp = tscProcessStreamQueryCallback; + tscDoQuery(pSql); tscIncStreamExecutionCount(pStream); + } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + pSql->res.code = code; + int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); + tscDebug("%p stream:%p, get table Meta failed, retry in %" PRId64 "ms", pSql, pStream, retryDelayTime); + tscSetRetryTimer(pStream, pSql, retryDelayTime); } } +static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { + SSqlStream *pStream = (SSqlStream *)pMsg->ahandle; + doLaunchQuery(pStream, pStream->pSql, 0); +} + static void tscProcessStreamTimer(void *handle, void *tmrId) { SSqlStream *pStream = (SSqlStream *)handle; - if (pStream == NULL) return; - if (pStream->pTimer != tmrId) return; + if (pStream == NULL || pStream->pTimer != tmrId) { + return; + } + pStream->pTimer = NULL; pStream->numOfRes = 0; // reset the numOfRes. @@ -392,11 +399,16 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { tscSetRetryTimer(pStream, pSql, timer); } -static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { +static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { int64_t minIntervalTime = (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + if (!pStream->isProject && pQueryInfo->interval.interval == 0) { + sprintf(pSql->cmd.payload, "the interval value is 0"); + return -1; + } if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.interval < minIntervalTime) { tscWarn("%p stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream, @@ -436,6 +448,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { pQueryInfo->interval.interval = 0; // clear the interval value to avoid the force time window split by query processor pQueryInfo->interval.sliding = 0; } + + return TSDB_CODE_SUCCESS; } static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { @@ -485,34 +499,19 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } -static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) { - if (pSql == NULL) { - return; - } - - SSqlCmd* pCmd = &pSql->cmd; - - pSql->res.code = code; - - if (info != NULL) { - strncpy(pCmd->payload, info, pCmd->payloadLen); - } -} - static void tscCreateStream(void *param, TAOS_RES *res, int code) { SSqlStream* pStream = (SSqlStream*)param; SSqlObj* pSql = pStream->pSql; SSqlCmd* pCmd = &pSql->cmd; if (code != TSDB_CODE_SUCCESS) { - setErrorInfo(pSql, code, pCmd->payload); - tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, pSql->sqlstr, pCmd->payload, code); + pSql->res.code = code; + tscError("%p open stream failed, sql:%s, reason:%s, code:%s", pSql, pSql->sqlstr, pCmd->payload, tstrerror(code)); + pStream->fp(pStream->param, NULL, NULL); return; } - registerSqlObj(pSql); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); @@ -523,13 +522,22 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->ctime = taosGetTimestamp(pStream->precision); pStream->etime = pQueryInfo->window.ekey; - tscAddIntoStreamList(pStream); + if (tscSetSlidingWindowInfo(pSql, pStream) != TSDB_CODE_SUCCESS) { + pSql->res.code = code; + + tscError("%p stream %p open failed, since the interval value is incorrect", pSql, pStream); + pStream->fp(pStream->param, NULL, NULL); + return; + } - tscSetSlidingWindowInfo(pSql, pStream); pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); int64_t starttime = tscGetLaunchTimestamp(pStream); pCmd->command = TSDB_SQL_SELECT; + + registerSqlObj(pSql); + tscAddIntoStreamList(pStream); + taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer); tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 681291d0db7039af09147d86f6db1fa48ccb233c..063b6af0e621a2e0609577b7a0a1ac75324ab047 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2094,6 +2094,13 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo } void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { + // the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this + // function while kill query by a user. + if (param == NULL) { + assert(code != TSDB_CODE_SUCCESS); + return; + } + SRetrieveSupport *trsupport = (SRetrieveSupport *) param; SSqlObj* pParentSql = trsupport->pParentSql; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 32a68549fabd73104c56ea89a705f7cf5a6a30d1..c3d60e21dc59ccbbf9940c4a155ee29d57d2f8a2 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -152,7 +152,7 @@ typedef struct SQuery { int16_t precision; int16_t numOfOutput; int16_t fillType; - int16_t checkBuffer; // check if the buffer is full during scan each block + int16_t checkResultBuf; // check if the buffer is full during scan each block SLimitVal limit; int32_t rowSize; SSqlGroupbyExpr* pGroupbyExpr; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 05487d435d933d019fda5fbe1551a5a2118fad22..001b8654b117abf56aa8321ae6eed8f4a0363a5e 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1708,7 +1708,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl numOfRes = (int32_t) getNumOfResult(pRuntimeEnv); // update the number of output result - if (numOfRes > 0 && pQuery->checkBuffer == 1) { + if (numOfRes > 0 && pQuery->checkResultBuf == 1) { assert(numOfRes >= pQuery->rec.rows); pQuery->rec.rows = numOfRes; @@ -2222,9 +2222,9 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6 static void setScanLimitationByResultBuffer(SQuery *pQuery) { if (isTopBottomQuery(pQuery)) { - pQuery->checkBuffer = 0; + pQuery->checkResultBuf = 0; } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - pQuery->checkBuffer = 0; + pQuery->checkResultBuf = 0; } else { bool hasMultioutput = false; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -2239,7 +2239,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { } } - pQuery->checkBuffer = hasMultioutput ? 1 : 0; + pQuery->checkResultBuf = hasMultioutput ? 1 : 0; } } @@ -2911,7 +2911,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SExprInfo *pExprInfo = &pQuery->pExpr1[0]; - if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { + if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pRuntimeEnv->stableQuery) { assert(pExprInfo->base.numOfParams == 1); int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; @@ -3674,6 +3674,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv, start); SET_MASTER_SCAN_FLAG(pRuntimeEnv); + if (!pRuntimeEnv->groupbyColumn && pRuntimeEnv->hasTagResults) { + setTagVal(pRuntimeEnv, pTableQueryInfo->pTable, pQInfo->tsdb); + } + while (1) { doScanAllDataBlocks(pRuntimeEnv); @@ -4757,20 +4761,21 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { } } +// TODO refactor: setAdditionalInfo static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQuery* pQuery = pQInfo->runtimeEnv.pQuery; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { - setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step); - } else { // interval query + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { TSKEY nextKey = pBlockInfo->window.skey; setIntervalQueryRange(pQInfo, nextKey); if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) { setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo); } + } else { // non-interval query + setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step); } } @@ -5626,8 +5631,6 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) return; } - pQuery->current = pTableInfo; // set current query table info - scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey); finalizeQueryResult(pRuntimeEnv); @@ -5646,10 +5649,8 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = pRuntimeEnv->pQuery; - pQuery->current = pTableInfo; - // for ts_comp query, re-initialized is not allowed + SQuery *pQuery = pRuntimeEnv->pQuery; if (!isTSCompQuery(pQuery)) { resetDefaultResInfoOutputBuf(pRuntimeEnv); } @@ -5701,9 +5702,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) // handle time interval query on table static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv); - SQuery *pQuery = pRuntimeEnv->pQuery; - pQuery->current = pTableInfo; TSKEY newStartKey = QUERY_IS_ASC_QUERY(pQuery)? INT64_MIN:INT64_MAX; @@ -5773,7 +5772,6 @@ static void tableQueryImpl(SQInfo *pQInfo) { } qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total); - return; } else { pQuery->rec.rows = 0; assert(pRuntimeEnv->windowResInfo.size > 0); @@ -5791,9 +5789,9 @@ static void tableQueryImpl(SQInfo *pQInfo) { if (pQuery->rec.rows <= 0 || pRuntimeEnv->windowResInfo.size <= pQInfo->groupIndex) { qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pQuery->rec.total); } - - return; } + + return; } // number of points returned during this query @@ -5802,7 +5800,9 @@ static void tableQueryImpl(SQInfo *pQInfo) { assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1); SArray* g = GET_TABLEGROUP(pQInfo, 0); + STableQueryInfo* item = taosArrayGetP(g, 0); + pQuery->current = item; // group by normal column, sliding window query, interval query are handled by interval query processor if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyColumn) { // interval (down sampling operation) @@ -5810,7 +5810,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { } else if (isFixedOutputQuery(pRuntimeEnv)) { tableAggregationProcess(pQInfo, item); } else { // diff/add/multiply/subtract/division - assert(pQuery->checkBuffer == 1); + assert(pQuery->checkResultBuf == 1); tableProjectionProcess(pQInfo, item); } @@ -5830,7 +5830,7 @@ static void stableQueryImpl(SQInfo *pQInfo) { (isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyColumn))) { multiTableQueryProcess(pQInfo); } else { - assert((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) || + assert((pQuery->checkResultBuf == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyColumn); sequentialTableProcess(pQInfo); diff --git a/tests/script/general/parser/select_with_tags.sim b/tests/script/general/parser/select_with_tags.sim index dab76f60044c7b587e6c98bb256f895aeedb656e..c254d31ffc20693918ae32db0e9afb036a7fd44f 100644 --- a/tests/script/general/parser/select_with_tags.sim +++ b/tests/script/general/parser/select_with_tags.sim @@ -813,8 +813,6 @@ sql_error select first(ts), first(c1),tbname from select_tags_mt0; sql_error select first(ts), last(ts), tbname from select_tags_mt0; sql_error select last_row(*), first(ts), tbname, t1, t2 from select_tags_mt0; sql_error select tbname, last_row(*), t1, first(ts) from select_tags_mt0; -sql_error select first(ts), tbname from select_tags_tb0; -sql_error select last_row(*), t1 from select_tags_tb0; sql_error select count(*), tbname from select_tags_mt0; sql_error select sum(c2), tbname from select_tags_mt0; sql_error select avg(c3), tbname from select_tags_mt0;