提交 e424202e 编写于 作者: H hjxilinx

[td-171] refactor codes

上级 5e221d72
......@@ -200,14 +200,14 @@ typedef struct SQueryInfo {
uint16_t type; // query/insert/import type
char slidingTimeUnit;
int64_t etime, stime;
STimeWindow window;
int64_t intervalTime; // aggregation time interval
int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray * colList; // SArray<SColumn*>
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprsInfo; // SArray<SSqlExpr*>
SArray * exprsInfo; // SArray<SSqlExpr*>
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
......
......@@ -2656,7 +2656,7 @@ typedef struct SCondExpr {
bool tsJoin;
} SCondExpr;
static int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t optr, int16_t timePrecision);
static int32_t getTimeRange(STimeWindow* win, tSQLExpr* pRight, int32_t optr, int16_t timePrecision);
static int32_t tSQLExprNodeToString(tSQLExpr* pExpr, char** str) {
if (pExpr->nSQLOptr == TK_ID) { // column name
......@@ -3631,20 +3631,18 @@ static int32_t getTimeRangeFromExpr(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) {
tSQLExpr* pRight = pExpr->pRight;
TSKEY stime = 0;
TSKEY etime = INT64_MAX;
if (getTimeRange(&stime, &etime, pRight, pExpr->nSQLOptr, tinfo.precision) != TSDB_CODE_SUCCESS) {
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
if (getTimeRange(&win, pRight, pExpr->nSQLOptr, tinfo.precision) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg0);
}
// update the timestamp query range
if (pQueryInfo->stime < stime) {
pQueryInfo->stime = stime;
if (pQueryInfo->window.skey < win.skey) {
pQueryInfo->window.skey = win.skey;
}
if (pQueryInfo->etime > etime) {
pQueryInfo->etime = etime;
if (pQueryInfo->window.ekey > win.ekey) {
pQueryInfo->window.ekey = win.ekey;
}
}
......@@ -3756,8 +3754,8 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql
int32_t ret = TSDB_CODE_SUCCESS;
pQueryInfo->stime = 0;
pQueryInfo->etime = INT64_MAX;
pQueryInfo->window.skey = 0;
pQueryInfo->window.ekey = INT64_MAX;
// tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space
SStringBuilder sb = {0};
......@@ -3823,7 +3821,7 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql
return ret;
}
int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t optr, int16_t timePrecision) {
int32_t getTimeRange(STimeWindow* win, tSQLExpr* pRight, int32_t optr, int16_t timePrecision) {
// this is join condition, do nothing
if (pRight->nSQLOptr == TK_ID) {
return TSDB_CODE_SUCCESS;
......@@ -3898,16 +3896,15 @@ int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t o
}
if (optr == TK_LE) {
*etime = val;
win->ekey = val;
} else if (optr == TK_LT) {
*etime = val - delta;
win->ekey = val - delta;
} else if (optr == TK_GT) {
*stime = val + delta;
win->skey = val + delta;
} else if (optr == TK_GE) {
*stime = val;
win->skey = val;
} else if (optr == TK_EQ) {
*stime = val;
*etime = *stime;
win->ekey = win->skey = val;
}
return TSDB_CODE_SUCCESS;
}
......@@ -4612,7 +4609,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) {
return true;
}
return (pQueryInfo->stime == pQueryInfo->etime) && (pQueryInfo->stime != 0);
return (pQueryInfo->window.skey == pQueryInfo->window.ekey) && (pQueryInfo->window.skey != 0);
}
int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* pQuerySql, SSqlObj* pSql) {
......@@ -5757,22 +5754,22 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
pQuerySql->pWhere = NULL;
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
pQueryInfo->stime = pQueryInfo->stime / 1000;
pQueryInfo->etime = pQueryInfo->etime / 1000;
pQueryInfo->window.skey = pQueryInfo->window.skey / 1000;
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
}
} else { // set the time rang
pQueryInfo->stime = 0;
pQueryInfo->etime = INT64_MAX;
pQueryInfo->window.skey = 0;
pQueryInfo->window.ekey = INT64_MAX;
}
// user does not specified the query time window, twa is not allowed in such case.
if ((pQueryInfo->stime == 0 || pQueryInfo->etime == INT64_MAX ||
(pQueryInfo->etime == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) {
if ((pQueryInfo->window.skey == 0 || pQueryInfo->window.ekey == INT64_MAX ||
(pQueryInfo->window.ekey == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
// no result due to invalid query time range
if (pQueryInfo->stime > pQueryInfo->etime) {
if (pQueryInfo->window.skey > pQueryInfo->window.ekey) {
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
return TSDB_CODE_SUCCESS;
}
......@@ -5783,9 +5780,9 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
// in case of join query, time range is required.
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime);
int64_t timeRange = labs(pQueryInfo->window.skey - pQueryInfo->window.ekey);
if (timeRange == 0 && pQueryInfo->stime == 0) {
if (timeRange == 0 && pQueryInfo->window.skey == 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
}
......@@ -5814,7 +5811,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
}
if (pQueryInfo->intervalTime > 0) {
int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime);
int64_t timeRange = labs(pQueryInfo->window.skey - pQueryInfo->window.ekey);
// number of result is not greater than 10,000,000
if ((timeRange == 0) || (timeRange / pQueryInfo->intervalTime) > MAX_RETRIEVE_ROWS_IN_INTERVAL_QUERY) {
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
......
......@@ -327,7 +327,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t prec = tinfo.precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
......@@ -787,7 +787,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t prec = tinfo.precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
......@@ -912,7 +912,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
int64_t actualETime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
int64_t actualETime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
......@@ -1295,7 +1295,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
// for group result interpolation, do not return if not data is generated
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t newTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
......@@ -1361,7 +1361,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
prevGroupCompleted) {
// if interpoType == TSDB_INTERPO_NONE, return directly
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->slidingTimeUnit, precision);
......
......@@ -669,11 +669,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
pQueryMsg->window.ekey = htobe64(pQueryInfo->etime);
pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
} else {
pQueryMsg->window.skey = htobe64(pQueryInfo->etime);
pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey);
pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
}
pQueryMsg->numOfTables = htonl(numOfTables);
......
......@@ -116,18 +116,18 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if (isProjectStream(pQueryInfo)) {
/*
* pQueryInfo->etime, which is the start time, does not change in case of
* pQueryInfo->window.ekey, which is the start time, does not change in case of
* repeat first execution, once the first execution failed.
*/
pQueryInfo->stime = pStream->stime; // start time
pQueryInfo->window.skey = pStream->stime; // start time
pQueryInfo->etime = taosGetTimestamp(pStream->precision); // end time
if (pQueryInfo->etime > pStream->etime) {
pQueryInfo->etime = pStream->etime;
pQueryInfo->window.ekey = taosGetTimestamp(pStream->precision); // end time
if (pQueryInfo->window.ekey > pStream->etime) {
pQueryInfo->window.ekey = pStream->etime;
}
} else {
pQueryInfo->stime = pStream->stime - pStream->interval;
pQueryInfo->etime = pStream->stime - 1;
pQueryInfo->window.skey = pStream->stime - pStream->interval;
pQueryInfo->window.ekey = pStream->stime - 1;
}
// launch stream computing in a new thread
......@@ -425,10 +425,10 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
pStream->slidingTime = tsProjectExecInterval;
if (stime != 0) { // first projection start from the latest event timestamp
assert(stime >= pQueryInfo->stime);
assert(stime >= pQueryInfo->window.skey);
stime += 1; // exclude the last records from table
} else {
stime = pQueryInfo->stime;
stime = pQueryInfo->window.skey;
}
} else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now
......@@ -548,7 +548,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream->precision = tinfo.precision;
pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->etime;
pStream->etime = pQueryInfo->window.ekey;
pSql->pStream = pStream;
tscAddIntoStreamList(pStream);
......
......@@ -416,10 +416,10 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter
// update the query time range according to the join results on timestamp
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) {
assert(pQueryInfo->stime <= st && pQueryInfo->etime >= et);
assert(pQueryInfo->window.skey <= st && pQueryInfo->window.ekey >= et);
pQueryInfo->stime = st;
pQueryInfo->etime = et;
pQueryInfo->window.skey = st;
pQueryInfo->window.ekey = et;
}
static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
......
......@@ -1866,8 +1866,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
size, pNewQueryInfo->fieldsInfo.numOfOutput, pFinalInfo->name, pNewQueryInfo->stime,
pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
size, pNewQueryInfo->fieldsInfo.numOfOutput, pFinalInfo->name, pNewQueryInfo->window.skey,
pNewQueryInfo->window.ekey, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
tscPrintSelectClause(pNew, 0);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册