From 29df3ea39bed7bca698460aa49ef20c27efd8341 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 21 Jan 2020 14:11:05 +0800 Subject: [PATCH] support order for super table projection query. #1166. [tbase-1477] --- src/client/inc/tscUtil.h | 4 +++ src/client/inc/tsclient.h | 5 ++- src/client/src/tscFunctionImpl.c | 44 +++++++++++++---------- src/client/src/tscSQLParser.c | 40 ++++++++++++++------- src/client/src/tscSecondaryMerge.c | 27 +++++++++----- src/client/src/tscServer.c | 1 - src/client/src/tscUtil.c | 31 +++++++++++----- src/system/detail/src/vnodeQueryImpl.c | 14 ++++---- src/system/detail/src/vnodeQueryProcess.c | 6 ++-- src/system/detail/src/vnodeRead.c | 14 -------- 10 files changed, 113 insertions(+), 73 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3999377149..9868f703c3 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -97,7 +97,11 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); */ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo); + bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); +bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); +bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); + bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo); bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index e03ef5eb1f..411b9b9470 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -226,7 +226,10 @@ typedef struct SQueryInfo { struct STSBuf * tsBuf; int64_t * defaultVal; // default value for interpolation char * msg; // pointer to the pCmd->payload to keep error message temporarily - int64_t clauseLimit; // limit for this sub clause + int64_t clauseLimit; // limit for current sub clause + + // offset value in the original sql expression, NOT sent to virtual node, only applied at client side + int64_t prjOffset; } SQueryInfo; // data source from sql string or from file diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index b4667fa502..32ddb6aee6 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2020,11 +2020,11 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { int32_t step = 0; // in case of second stage merge, always use incremental output. - if (pCtx->currentStage == SECONDARY_STAGE_MERGE) { +// if (pCtx->currentStage == SECONDARY_STAGE_MERGE) { step = QUERY_ASC_FORWARD_STEP; - } else { - step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); - } +// } else { +// step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); +// } int32_t len = GET_RES_INFO(pCtx)->numOfRes; @@ -2891,16 +2891,23 @@ static void col_project_function(SQLFunctionCtx *pCtx) { INC_INIT_VAL(pCtx, pCtx->size); char *pDest = 0; - if (pCtx->order == TSQL_SO_ASC) { +// if (pCtx->order == TSQL_SO_ASC) { pDest = pCtx->aOutputBuf; - } else { - pDest = pCtx->aOutputBuf - (pCtx->size - 1) * pCtx->inputBytes; - } +// } else { +// pDest = pCtx->aOutputBuf - (pCtx->size - 1) * pCtx->inputBytes; +// } char *pData = GET_INPUT_CHAR(pCtx); - memcpy(pDest, pData, (size_t)pCtx->size * pCtx->inputBytes); + if (pCtx->order == TSQL_SO_ASC) { + memcpy(pDest, pData, (size_t)pCtx->size * pCtx->inputBytes); + } else { + for(int32_t i = 0; i < pCtx->size; ++i) { + memcpy(pDest + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes, + pCtx->inputBytes); + } + } - pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; } static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { @@ -2915,7 +2922,7 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); - pCtx->aOutputBuf += pCtx->inputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + pCtx->aOutputBuf += pCtx->inputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; } /** @@ -2927,18 +2934,18 @@ static void tag_project_function(SQLFunctionCtx *pCtx) { INC_INIT_VAL(pCtx, pCtx->size); assert(pCtx->inputBytes == pCtx->outputBytes); - int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); +// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); for (int32_t i = 0; i < pCtx->size; ++i) { tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType); - pCtx->aOutputBuf += pCtx->outputBytes * factor; + pCtx->aOutputBuf += pCtx->outputBytes/* * factor*/; } } static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { INC_INIT_VAL(pCtx, 1); tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType); - pCtx->aOutputBuf += pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + pCtx->aOutputBuf += pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; } /** @@ -2986,9 +2993,10 @@ static void diff_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + int32_t step = 1 /*GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; - int32_t i = (pCtx->order == TSQL_SO_ASC) ? 0 : pCtx->size - 1; +// int32_t i = (pCtx->order == TSQL_SO_ASC) ? 0 : pCtx->size - 1; + int32_t i = 0; TSKEY * pTimestamp = pCtx->ptsOutputBuf; switch (pCtx->inputType) { @@ -3289,7 +3297,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) { tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, arithmetic_callback_function); - pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size * GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; pCtx->param[1].pz = NULL; } @@ -3301,7 +3309,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) { tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order, arithmetic_callback_function); - pCtx->aOutputBuf += pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + pCtx->aOutputBuf += pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; } #define LIST_MINMAX_N(ctx, minOutput, maxOutput, elemCnt, data, type, tsdbType, numOfNotNullElem) \ diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 945c0aff22..f5b1b76607 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4460,21 +4460,23 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* if (queryOnTags == true) { // local handle the metric tag query pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS; } else { - if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - if (pQueryInfo->order.orderColId >= 0) { - if (pQueryInfo->limit.limit == -1) { - return invalidSqlErrMsg(pQueryInfo->msg, msg4); - } else if (pQueryInfo->limit.limit > 10000) { // the result set can not be larger than 10000 - //todo use global config parameter - return invalidSqlErrMsg(pQueryInfo->msg, msg5); - } - } - + if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) { return invalidSqlErrMsg(pQueryInfo->msg, msg3); } - - pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // for projection query on super table, all queries are subqueries + + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { +// if (pQueryInfo->order.orderColId >= 0) { +// if (pQueryInfo->limit.limit == -1) { +// return invalidSqlErrMsg(pQueryInfo->msg, msg4); +// } else if (pQueryInfo->limit.limit > 10000) { // the result set can not be larger than 10000 +// //todo use global config parameter +// return invalidSqlErrMsg(pQueryInfo->msg, msg5); +// } +// } + + pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // for projection query on super table, all queries are subqueries + } } } @@ -4504,6 +4506,20 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* // keep original limitation value in globalLimit pQueryInfo->clauseLimit = pQueryInfo->limit.limit; + pQueryInfo->prjOffset = pQueryInfo->limit.offset; + + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + /* + * the limitation/offset value should be removed during retrieve data from virtual node, + * since the global order are done in client side, so the limitation should also + * be done at the client side. + */ + if (pQueryInfo->limit.limit > 0) { + pQueryInfo->limit.limit = -1; + } + + pQueryInfo->limit.offset = 0; + } } else { if (pQueryInfo->slimit.limit != -1 || pQueryInfo->slimit.offset != 0) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 956ca8249b..30f1dfad77 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -306,8 +306,15 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd // we change the maxCapacity of schema to denote that there is only one row in temp buffer pReducer->pDesc->pSchema->maxCapacity = 1; + + //restore the limitation value at the last stage + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + pQueryInfo->limit.limit = pQueryInfo->clauseLimit; + pQueryInfo->limit.offset = pQueryInfo->prjOffset; + } + pReducer->offset = pQueryInfo->limit.offset; - + pRes->pLocalReducer = pReducer; pRes->numOfGroups = 0; @@ -510,7 +517,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm } // primary timestamp column is involved in final result - if (pQueryInfo->nAggTimeInterval != 0) { + if (pQueryInfo->nAggTimeInterval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { numOfGroupByCols++; } @@ -549,6 +556,12 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId; // disable merge procedure for column projection query + assert(functionId != TSDB_FUNC_ARITHM); + + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + return true; + } + if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { return false; } @@ -848,11 +861,11 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo int32_t rowSize = tscGetResRowLength(pQueryInfo); // handle the descend order output - if (pQueryInfo->order.order == TSQL_SO_ASC) { +// if (pQueryInfo->order.order == TSQL_SO_ASC) { memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize); - } else { - reversedCopyResultToDstBuf(pQueryInfo, pRes, pFinalDataPage); - } +// } else { +// reversedCopyResultToDstBuf(pQueryInfo, pRes, pFinalDataPage); +// } pFinalDataPage->numOfElems = 0; return; @@ -1400,8 +1413,6 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) { #if defined(_DEBUG_VIEW) printf("chosen row:\t"); SSrcColumnInfo colInfo[256] = {0}; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - tscGetSrcColumnInfo(colInfo, pQueryInfo); tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->numOfElems, pModel->maxCapacity, colInfo); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e281e4d752..50614d2501 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1182,7 +1182,6 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { #ifdef _DEBUG_VIEW printf("received data from vnode: %d rows\n", pRes->numOfRows); SSrcColumnInfo colInfo[256] = {0}; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tscGetSrcColumnInfo(colInfo, pQueryInfo); tColModelDisplayEx(pDesc->pSchema, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c54c23acdb..31a4433fb5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -228,9 +228,9 @@ bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { return false; } -bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { +bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex); - + /* * In following cases, return false for non ordered project query on super table * 1. failed to get metermeta from server; 2. not a super table; 3. limitation is 0; @@ -240,17 +240,12 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pQueryInfo->exprsInfo.numOfExprs == 0) { return false; } - + // only query on tag, not a projection query if (tscQueryMetricTags(pQueryInfo)) { return false; } - // order by column exists, not a non-ordered projection query - if (pQueryInfo->order.orderColId >= 0) { - return false; - } - // for project query, only the following two function is allowed for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; @@ -259,10 +254,28 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI return false; } } - + return true; } +bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { + if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) { + return false; + } + + // order by column exists, not a non-ordered projection query + return pQueryInfo->order.orderColId < 0; +} + +bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { + if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) { + return false; + } + + // order by column exists, a non-ordered projection query + return pQueryInfo->order.orderColId >= 0; +} + bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index e721174cfe..9c4f442bbc 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -5391,14 +5391,14 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; - if (QUERY_IS_ASC_QUERY(pQuery)) { +// if (QUERY_IS_ASC_QUERY(pQuery)) { memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes); - } else { // DESC query - int32_t maxrows = pQuery->pointsToRead; - - memmove(pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes, - pQuery->sdata[i]->data + (maxrows - size) * bytes, pQuery->pointsRead * bytes); - } +// } else { // DESC query +// int32_t maxrows = pQuery->pointsToRead; +// +// memmove(pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes, +// pQuery->sdata[i]->data + (maxrows - size) * bytes, pQuery->pointsRead * bytes); +// } pRuntimeEnv->pCtx[i].aOutputBuf -= bytes * numOfSkip * step; diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index d9c51567f0..b8b8c9ea80 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -747,10 +747,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pSupporter->meterIdx++; // if the buffer is full or group by each table, we need to jump out of the loop - if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) || - isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) { +// if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) || +// isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) { break; - } +// } } else { // forward query range diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 81830ec6da..936fde3a0e 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -392,11 +392,6 @@ __clean_memory: return NULL; } -//static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) { -// SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; -// vnodeFreeQInfo(pQInfo, true); -//} - void vnodeFreeQInfoInQueue(void *param) { SQInfo *pQInfo = (SQInfo *)param; @@ -406,15 +401,6 @@ void vnodeFreeQInfoInQueue(void *param) { dTrace("QInfo:%p set kill flag to free QInfo"); vnodeDecRefCount(pQInfo); - -// dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo); -// SSchedMsg schedMsg = {0}; -// schedMsg.fp = vnodeFreeQInfoInQueueImpl; - -// schedMsg.msg = NULL; -// schedMsg.thandle = (void *)1; -// schedMsg.ahandle = param; -// taosScheduleTask(queryQhandle, &schedMsg); } void vnodeFreeQInfo(void *param, bool decQueryRef) { -- GitLab