From e424202e578c8e19114cdd3035920fe2d0798d09 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Thu, 23 Apr 2020 15:06:20 +0800 Subject: [PATCH] [td-171] refactor codes --- src/client/inc/tsclient.h | 6 ++-- src/client/src/tscSQLParser.c | 55 ++++++++++++++---------------- src/client/src/tscSecondaryMerge.c | 10 +++--- src/client/src/tscServer.c | 8 ++--- src/client/src/tscStream.c | 20 +++++------ src/client/src/tscSubquery.c | 6 ++-- src/client/src/tscUtil.c | 4 +-- 7 files changed, 53 insertions(+), 56 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 89bed123c4..337b6b9628 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -200,14 +200,14 @@ typedef struct SQueryInfo { uint16_t type; // query/insert/import type char slidingTimeUnit; - int64_t etime, stime; + STimeWindow window; int64_t intervalTime; // aggregation time interval int64_t slidingTime; // sliding window in mseconds SSqlGroupbyExpr groupbyExpr; // group by tags info - SArray * colList; // SArray + SArray * colList; // SArray SFieldInfo fieldsInfo; - SArray * exprsInfo; // SArray + SArray * exprsInfo; // SArray SLimitVal limit; SLimitVal slimit; STagCond tagCond; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 8b7cea356f..fce83c8790 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2656,7 +2656,7 @@ typedef struct SCondExpr { bool tsJoin; } SCondExpr; -static int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t optr, int16_t timePrecision); +static int32_t getTimeRange(STimeWindow* win, tSQLExpr* pRight, int32_t optr, int16_t timePrecision); static int32_t tSQLExprNodeToString(tSQLExpr* pExpr, char** str) { if (pExpr->nSQLOptr == TK_ID) { // column name @@ -3631,20 +3631,18 @@ static int32_t getTimeRangeFromExpr(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) { tSQLExpr* pRight = pExpr->pRight; - TSKEY stime = 0; - TSKEY etime = INT64_MAX; - - if (getTimeRange(&stime, &etime, pRight, pExpr->nSQLOptr, tinfo.precision) != TSDB_CODE_SUCCESS) { + STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; + if (getTimeRange(&win, pRight, pExpr->nSQLOptr, tinfo.precision) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg0); } // update the timestamp query range - if (pQueryInfo->stime < stime) { - pQueryInfo->stime = stime; + if (pQueryInfo->window.skey < win.skey) { + pQueryInfo->window.skey = win.skey; } - if (pQueryInfo->etime > etime) { - pQueryInfo->etime = etime; + if (pQueryInfo->window.ekey > win.ekey) { + pQueryInfo->window.ekey = win.ekey; } } @@ -3756,8 +3754,8 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql int32_t ret = TSDB_CODE_SUCCESS; - pQueryInfo->stime = 0; - pQueryInfo->etime = INT64_MAX; + pQueryInfo->window.skey = 0; + pQueryInfo->window.ekey = INT64_MAX; // tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space SStringBuilder sb = {0}; @@ -3823,7 +3821,7 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql return ret; } -int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t optr, int16_t timePrecision) { +int32_t getTimeRange(STimeWindow* win, tSQLExpr* pRight, int32_t optr, int16_t timePrecision) { // this is join condition, do nothing if (pRight->nSQLOptr == TK_ID) { return TSDB_CODE_SUCCESS; @@ -3898,16 +3896,15 @@ int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t o } if (optr == TK_LE) { - *etime = val; + win->ekey = val; } else if (optr == TK_LT) { - *etime = val - delta; + win->ekey = val - delta; } else if (optr == TK_GT) { - *stime = val + delta; + win->skey = val + delta; } else if (optr == TK_GE) { - *stime = val; + win->skey = val; } else if (optr == TK_EQ) { - *stime = val; - *etime = *stime; + win->ekey = win->skey = val; } return TSDB_CODE_SUCCESS; } @@ -4612,7 +4609,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) { return true; } - return (pQueryInfo->stime == pQueryInfo->etime) && (pQueryInfo->stime != 0); + return (pQueryInfo->window.skey == pQueryInfo->window.ekey) && (pQueryInfo->window.skey != 0); } int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* pQuerySql, SSqlObj* pSql) { @@ -5757,22 +5754,22 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { pQuerySql->pWhere = NULL; if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { - pQueryInfo->stime = pQueryInfo->stime / 1000; - pQueryInfo->etime = pQueryInfo->etime / 1000; + pQueryInfo->window.skey = pQueryInfo->window.skey / 1000; + pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000; } } else { // set the time rang - pQueryInfo->stime = 0; - pQueryInfo->etime = INT64_MAX; + pQueryInfo->window.skey = 0; + pQueryInfo->window.ekey = INT64_MAX; } // user does not specified the query time window, twa is not allowed in such case. - if ((pQueryInfo->stime == 0 || pQueryInfo->etime == INT64_MAX || - (pQueryInfo->etime == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) { + if ((pQueryInfo->window.skey == 0 || pQueryInfo->window.ekey == INT64_MAX || + (pQueryInfo->window.ekey == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9); } // no result due to invalid query time range - if (pQueryInfo->stime > pQueryInfo->etime) { + if (pQueryInfo->window.skey > pQueryInfo->window.ekey) { pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; return TSDB_CODE_SUCCESS; } @@ -5783,9 +5780,9 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { // in case of join query, time range is required. if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { - int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime); + int64_t timeRange = labs(pQueryInfo->window.skey - pQueryInfo->window.ekey); - if (timeRange == 0 && pQueryInfo->stime == 0) { + if (timeRange == 0 && pQueryInfo->window.skey == 0) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); } } @@ -5814,7 +5811,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { } if (pQueryInfo->intervalTime > 0) { - int64_t timeRange = labs(pQueryInfo->stime - pQueryInfo->etime); + int64_t timeRange = labs(pQueryInfo->window.skey - pQueryInfo->window.ekey); // number of result is not greater than 10,000,000 if ((timeRange == 0) || (timeRange / pQueryInfo->intervalTime) > MAX_RETRIEVE_ROWS_IN_INTERVAL_QUERY) { return invalidSqlErrMsg(pQueryInfo->msg, msg6); diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index bd2e6e9019..84f14abf4c 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -327,7 +327,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); int16_t prec = tinfo.precision; - int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; + int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); @@ -787,7 +787,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); int16_t prec = tinfo.precision; - int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; + int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); @@ -912,7 +912,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; - int64_t actualETime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime; + int64_t actualETime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { @@ -1295,7 +1295,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer // for group result interpolation, do not return if not data is generated if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { - int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime; + int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; int64_t newTime = taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision); @@ -1361,7 +1361,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { prevGroupCompleted) { // if interpoType == TSDB_INTERPO_NONE, return directly if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { - int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime; + int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 332d56c1b1..88d0be3fb9 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -669,11 +669,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } if (pQueryInfo->order.order == TSDB_ORDER_ASC) { - pQueryMsg->window.skey = htobe64(pQueryInfo->stime); - pQueryMsg->window.ekey = htobe64(pQueryInfo->etime); + pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey); + pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey); } else { - pQueryMsg->window.skey = htobe64(pQueryInfo->etime); - pQueryMsg->window.ekey = htobe64(pQueryInfo->stime); + pQueryMsg->window.skey = htobe64(pQueryInfo->window.ekey); + pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey); } pQueryMsg->numOfTables = htonl(numOfTables); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index bae1c0f86a..9f7d4887d1 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -116,18 +116,18 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { if (isProjectStream(pQueryInfo)) { /* - * pQueryInfo->etime, which is the start time, does not change in case of + * pQueryInfo->window.ekey, which is the start time, does not change in case of * repeat first execution, once the first execution failed. */ - pQueryInfo->stime = pStream->stime; // start time + pQueryInfo->window.skey = pStream->stime; // start time - pQueryInfo->etime = taosGetTimestamp(pStream->precision); // end time - if (pQueryInfo->etime > pStream->etime) { - pQueryInfo->etime = pStream->etime; + pQueryInfo->window.ekey = taosGetTimestamp(pStream->precision); // end time + if (pQueryInfo->window.ekey > pStream->etime) { + pQueryInfo->window.ekey = pStream->etime; } } else { - pQueryInfo->stime = pStream->stime - pStream->interval; - pQueryInfo->etime = pStream->stime - 1; + pQueryInfo->window.skey = pStream->stime - pStream->interval; + pQueryInfo->window.ekey = pStream->stime - 1; } // launch stream computing in a new thread @@ -425,10 +425,10 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in pStream->slidingTime = tsProjectExecInterval; if (stime != 0) { // first projection start from the latest event timestamp - assert(stime >= pQueryInfo->stime); + assert(stime >= pQueryInfo->window.skey); stime += 1; // exclude the last records from table } else { - stime = pQueryInfo->stime; + stime = pQueryInfo->window.skey; } } else { // timewindow based aggregation stream if (stime == 0) { // no data in meter till now @@ -548,7 +548,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p pStream->precision = tinfo.precision; pStream->ctime = taosGetTimestamp(pStream->precision); - pStream->etime = pQueryInfo->etime; + pStream->etime = pQueryInfo->window.ekey; pSql->pStream = pStream; tscAddIntoStreamList(pStream); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 00a4c8c448..fba82657aa 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -416,10 +416,10 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter // update the query time range according to the join results on timestamp static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) { - assert(pQueryInfo->stime <= st && pQueryInfo->etime >= et); + assert(pQueryInfo->window.skey <= st && pQueryInfo->window.ekey >= et); - pQueryInfo->stime = st; - pQueryInfo->etime = et; + pQueryInfo->window.skey = st; + pQueryInfo->window.ekey = et; } static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d0fbc4c373..d7db533e7d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1866,8 +1866,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void "%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64, pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), - size, pNewQueryInfo->fieldsInfo.numOfOutput, pFinalInfo->name, pNewQueryInfo->stime, - pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); + size, pNewQueryInfo->fieldsInfo.numOfOutput, pFinalInfo->name, pNewQueryInfo->window.skey, + pNewQueryInfo->window.ekey, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); tscPrintSelectClause(pNew, 0); } else { -- GitLab