未验证 提交 c52f23ee 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4825 from taosdata/feature/query

Feature/query
......@@ -4599,7 +4599,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
const char* msg0 = "only support order by primary timestamp";
const char* msg1 = "invalid column name";
const char* msg2 = "only support order by primary timestamp or queried column";
const char* msg3 = "only support order by primary timestamp or first tag in groupby clause";
const char* msg3 = "invalid column in order by clause, only primary timestamp or first tag in groupby clause allowed";
setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -5279,8 +5279,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
const char* msg0 = "soffset/offset can not be less than 0";
const char* msg1 = "slimit/soffset only available for STable query";
const char* msg2 = "functions mixed up in table query";
const char* msg3 = "slimit/soffset can not apply to projection query";
const char* msg2 = "slimit/soffset can not apply to projection query";
// handle the limit offset value, validate the limit
pQueryInfo->limit = pQuerySql->limit;
......@@ -5305,7 +5304,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query
if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
// for projection query on super table, all queries are subqueries
......@@ -5363,24 +5362,6 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
if (pQueryInfo->slimit.limit != -1 || pQueryInfo->slimit.offset != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
size_t size = taosArrayGetSize(pQueryInfo->exprList);
bool hasTags = false;
bool hasOtherFunc = false;
// filter the query functions operating on "tbname" column that are not supported by normal columns.
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
hasTags = true;
} else {
hasOtherFunc = true;
}
}
if (hasTags && hasOtherFunc) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
}
return TSDB_CODE_SUCCESS;
......@@ -6282,10 +6263,12 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg1 = "invalid table name";
const char* msg2 = "functions not allowed in CQ";
const char* msg3 = "fill only available for interval query";
const char* msg4 = "fill option not supported in stream computing";
const char* msg5 = "sql too long"; // todo ADD support
const char* msg6 = "from missing in subclause";
const char* msg7 = "time interval is required";
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
......@@ -6295,10 +6278,10 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// if sql specifies db, use it, otherwise use default db
SStrToken* pzTableName = &(pCreateTable->name);
SStrToken* pName = &(pCreateTable->name);
SQuerySQL* pQuerySql = pCreateTable->pSelect;
if (tscValidateName(pzTableName) != TSDB_CODE_SUCCESS) {
if (tscValidateName(pName) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
......@@ -6337,15 +6320,19 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
// set interval value
if (parseIntervalClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
} else {
if ((pQueryInfo->interval.interval > 0) &&
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
if ((pQueryInfo->interval.interval > 0) &&
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (!tscIsProjectionQuery(pQueryInfo) && pQueryInfo->interval.interval == 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
// set the created table[stream] name
code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql);
code = tscSetTableFullName(pTableMetaInfo, pName, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -65,44 +65,51 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in
return retryDelta;
}
static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SSqlStream *pStream = (SSqlStream *)pMsg->ahandle;
SSqlObj * pSql = pStream->pSql;
static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
SSqlStream *pStream = (SSqlStream *)param;
assert(pStream->pSql == tres && code == TSDB_CODE_SUCCESS);
pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
pSql->param = pStream;
SSqlObj* pSql = (SSqlObj*) tres;
pSql->fp = doLaunchQuery;
pSql->fetchFp = doLaunchQuery;
pSql->res.completed = false;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int code = tscGetTableMeta(pSql, pTableMetaInfo);
pSql->res.code = code;
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0);
pSql->res.code = code;
}
// failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
} else {
// failed to get table Meta or vgroup list, retry in 10sec.
if (code == TSDB_CODE_SUCCESS) {
tscTansformSQLFuncForSTableQuery(pQueryInfo);
tscDebug("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
tscDoQuery(pStream->pSql);
tscDebug("%p stream:%p, start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
tscDoQuery(pSql);
tscIncStreamExecutionCount(pStream);
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
pSql->res.code = code;
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscDebug("%p stream:%p, get table Meta failed, retry in %" PRId64 "ms", pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
}
}
static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SSqlStream *pStream = (SSqlStream *)pMsg->ahandle;
doLaunchQuery(pStream, pStream->pSql, 0);
}
static void tscProcessStreamTimer(void *handle, void *tmrId) {
SSqlStream *pStream = (SSqlStream *)handle;
if (pStream == NULL) return;
if (pStream->pTimer != tmrId) return;
if (pStream == NULL || pStream->pTimer != tmrId) {
return;
}
pStream->pTimer = NULL;
pStream->numOfRes = 0; // reset the numOfRes.
......@@ -392,11 +399,16 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
tscSetRetryTimer(pStream, pSql, timer);
}
static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
sprintf(pSql->cmd.payload, "the interval value is 0");
return -1;
}
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.interval < minIntervalTime) {
tscWarn("%p stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
......@@ -436,6 +448,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
pQueryInfo->interval.interval = 0; // clear the interval value to avoid the force time window split by query processor
pQueryInfo->interval.sliding = 0;
}
return TSDB_CODE_SUCCESS;
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
......@@ -485,34 +499,19 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
if (pSql == NULL) {
return;
}
SSqlCmd* pCmd = &pSql->cmd;
pSql->res.code = code;
if (info != NULL) {
strncpy(pCmd->payload, info, pCmd->payloadLen);
}
}
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = pStream->pSql;
SSqlCmd* pCmd = &pSql->cmd;
if (code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, pSql->sqlstr, pCmd->payload, code);
pSql->res.code = code;
tscError("%p open stream failed, sql:%s, reason:%s, code:%s", pSql, pSql->sqlstr, pCmd->payload, tstrerror(code));
pStream->fp(pStream->param, NULL, NULL);
return;
}
registerSqlObj(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -523,13 +522,22 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->window.ekey;
tscAddIntoStreamList(pStream);
if (tscSetSlidingWindowInfo(pSql, pStream) != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscError("%p stream %p open failed, since the interval value is incorrect", pSql, pStream);
pStream->fp(pStream->param, NULL, NULL);
return;
}
tscSetSlidingWindowInfo(pSql, pStream);
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;
registerSqlObj(pSql);
tscAddIntoStreamList(pStream);
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
......
......@@ -2094,6 +2094,13 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
}
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
// the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
// function while kill query by a user.
if (param == NULL) {
assert(code != TSDB_CODE_SUCCESS);
return;
}
SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
SSqlObj* pParentSql = trsupport->pParentSql;
......
......@@ -152,7 +152,7 @@ typedef struct SQuery {
int16_t precision;
int16_t numOfOutput;
int16_t fillType;
int16_t checkBuffer; // check if the buffer is full during scan each block
int16_t checkResultBuf; // check if the buffer is full during scan each block
SLimitVal limit;
int32_t rowSize;
SSqlGroupbyExpr* pGroupbyExpr;
......
......@@ -1708,7 +1708,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
numOfRes = (int32_t) getNumOfResult(pRuntimeEnv);
// update the number of output result
if (numOfRes > 0 && pQuery->checkBuffer == 1) {
if (numOfRes > 0 && pQuery->checkResultBuf == 1) {
assert(numOfRes >= pQuery->rec.rows);
pQuery->rec.rows = numOfRes;
......@@ -2222,9 +2222,9 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6
static void setScanLimitationByResultBuffer(SQuery *pQuery) {
if (isTopBottomQuery(pQuery)) {
pQuery->checkBuffer = 0;
pQuery->checkResultBuf = 0;
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
pQuery->checkBuffer = 0;
pQuery->checkResultBuf = 0;
} else {
bool hasMultioutput = false;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
......@@ -2239,7 +2239,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
}
}
pQuery->checkBuffer = hasMultioutput ? 1 : 0;
pQuery->checkResultBuf = hasMultioutput ? 1 : 0;
}
}
......@@ -2911,7 +2911,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SExprInfo *pExprInfo = &pQuery->pExpr1[0];
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pRuntimeEnv->stableQuery) {
assert(pExprInfo->base.numOfParams == 1);
int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64;
......@@ -3674,6 +3674,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv, start);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
if (!pRuntimeEnv->groupbyColumn && pRuntimeEnv->hasTagResults) {
setTagVal(pRuntimeEnv, pTableQueryInfo->pTable, pQInfo->tsdb);
}
while (1) {
doScanAllDataBlocks(pRuntimeEnv);
......@@ -4757,20 +4761,21 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
// TODO refactor: setAdditionalInfo
static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
} else { // interval query
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
TSKEY nextKey = pBlockInfo->window.skey;
setIntervalQueryRange(pQInfo, nextKey);
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
}
} else { // non-interval query
setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
}
}
......@@ -5626,8 +5631,6 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
return;
}
pQuery->current = pTableInfo; // set current query table info
scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
finalizeQueryResult(pRuntimeEnv);
......@@ -5646,10 +5649,8 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableInfo;
// for ts_comp query, re-initialized is not allowed
SQuery *pQuery = pRuntimeEnv->pQuery;
if (!isTSCompQuery(pQuery)) {
resetDefaultResInfoOutputBuf(pRuntimeEnv);
}
......@@ -5701,9 +5702,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
// handle time interval query on table
static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableInfo;
TSKEY newStartKey = QUERY_IS_ASC_QUERY(pQuery)? INT64_MIN:INT64_MAX;
......@@ -5773,7 +5772,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
}
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
return;
} else {
pQuery->rec.rows = 0;
assert(pRuntimeEnv->windowResInfo.size > 0);
......@@ -5791,9 +5789,9 @@ static void tableQueryImpl(SQInfo *pQInfo) {
if (pQuery->rec.rows <= 0 || pRuntimeEnv->windowResInfo.size <= pQInfo->groupIndex) {
qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pQuery->rec.total);
}
return;
}
return;
}
// number of points returned during this query
......@@ -5802,7 +5800,9 @@ static void tableQueryImpl(SQInfo *pQInfo) {
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
SArray* g = GET_TABLEGROUP(pQInfo, 0);
STableQueryInfo* item = taosArrayGetP(g, 0);
pQuery->current = item;
// group by normal column, sliding window query, interval query are handled by interval query processor
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyColumn) { // interval (down sampling operation)
......@@ -5810,7 +5810,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
} else if (isFixedOutputQuery(pRuntimeEnv)) {
tableAggregationProcess(pQInfo, item);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkBuffer == 1);
assert(pQuery->checkResultBuf == 1);
tableProjectionProcess(pQInfo, item);
}
......@@ -5830,7 +5830,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyColumn))) {
multiTableQueryProcess(pQInfo);
} else {
assert((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) ||
assert((pQuery->checkResultBuf == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) ||
pRuntimeEnv->groupbyColumn);
sequentialTableProcess(pQInfo);
......
......@@ -813,8 +813,6 @@ sql_error select first(ts), first(c1),tbname from select_tags_mt0;
sql_error select first(ts), last(ts), tbname from select_tags_mt0;
sql_error select last_row(*), first(ts), tbname, t1, t2 from select_tags_mt0;
sql_error select tbname, last_row(*), t1, first(ts) from select_tags_mt0;
sql_error select first(ts), tbname from select_tags_tb0;
sql_error select last_row(*), t1 from select_tags_tb0;
sql_error select count(*), tbname from select_tags_mt0;
sql_error select sum(c2), tbname from select_tags_mt0;
sql_error select avg(c3), tbname from select_tags_mt0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册