提交 29df3ea3 编写于 作者: H hjxilinx

support order for super table projection query. #1166. [tbase-1477]

上级 44acd0ee
......@@ -97,7 +97,11 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
......
......@@ -226,7 +226,10 @@ typedef struct SQueryInfo {
struct STSBuf * tsBuf;
int64_t * defaultVal; // default value for interpolation
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for this sub clause
int64_t clauseLimit; // limit for current sub clause
// offset value in the original sql expression, NOT sent to virtual node, only applied at client side
int64_t prjOffset;
} SQueryInfo;
// data source from sql string or from file
......
......@@ -2020,11 +2020,11 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
int32_t step = 0;
// in case of second stage merge, always use incremental output.
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
// if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
step = QUERY_ASC_FORWARD_STEP;
} else {
step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
}
// } else {
// step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
// }
int32_t len = GET_RES_INFO(pCtx)->numOfRes;
......@@ -2891,16 +2891,23 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL(pCtx, pCtx->size);
char *pDest = 0;
if (pCtx->order == TSQL_SO_ASC) {
// if (pCtx->order == TSQL_SO_ASC) {
pDest = pCtx->aOutputBuf;
} else {
pDest = pCtx->aOutputBuf - (pCtx->size - 1) * pCtx->inputBytes;
}
// } else {
// pDest = pCtx->aOutputBuf - (pCtx->size - 1) * pCtx->inputBytes;
// }
char *pData = GET_INPUT_CHAR(pCtx);
memcpy(pDest, pData, (size_t)pCtx->size * pCtx->inputBytes);
if (pCtx->order == TSQL_SO_ASC) {
memcpy(pDest, pData, (size_t)pCtx->size * pCtx->inputBytes);
} else {
for(int32_t i = 0; i < pCtx->size; ++i) {
memcpy(pDest + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes,
pCtx->inputBytes);
}
}
pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
}
static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
......@@ -2915,7 +2922,7 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);
pCtx->aOutputBuf += pCtx->inputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->inputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
}
/**
......@@ -2927,18 +2934,18 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL(pCtx, pCtx->size);
assert(pCtx->inputBytes == pCtx->outputBytes);
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
for (int32_t i = 0; i < pCtx->size; ++i) {
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType);
pCtx->aOutputBuf += pCtx->outputBytes * factor;
pCtx->aOutputBuf += pCtx->outputBytes/* * factor*/;
}
}
static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
INC_INIT_VAL(pCtx, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
pCtx->aOutputBuf += pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
}
/**
......@@ -2986,9 +2993,10 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t step = 1 /*GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
int32_t i = (pCtx->order == TSQL_SO_ASC) ? 0 : pCtx->size - 1;
// int32_t i = (pCtx->order == TSQL_SO_ASC) ? 0 : pCtx->size - 1;
int32_t i = 0;
TSKEY * pTimestamp = pCtx->ptsOutputBuf;
switch (pCtx->inputType) {
......@@ -3289,7 +3297,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
pCtx->param[1].pz = NULL;
}
......@@ -3301,7 +3309,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
}
#define LIST_MINMAX_N(ctx, minOutput, maxOutput, elemCnt, data, type, tsdbType, numOfNotNullElem) \
......
......@@ -4460,21 +4460,23 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
if (queryOnTags == true) { // local handle the metric tag query
pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS;
} else {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->order.orderColId >= 0) {
if (pQueryInfo->limit.limit == -1) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4);
} else if (pQueryInfo->limit.limit > 10000) { // the result set can not be larger than 10000
//todo use global config parameter
return invalidSqlErrMsg(pQueryInfo->msg, msg5);
}
}
if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // for projection query on super table, all queries are subqueries
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// if (pQueryInfo->order.orderColId >= 0) {
// if (pQueryInfo->limit.limit == -1) {
// return invalidSqlErrMsg(pQueryInfo->msg, msg4);
// } else if (pQueryInfo->limit.limit > 10000) { // the result set can not be larger than 10000
// //todo use global config parameter
// return invalidSqlErrMsg(pQueryInfo->msg, msg5);
// }
// }
pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // for projection query on super table, all queries are subqueries
}
}
}
......@@ -4504,6 +4506,20 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
// keep original limitation value in globalLimit
pQueryInfo->clauseLimit = pQueryInfo->limit.limit;
pQueryInfo->prjOffset = pQueryInfo->limit.offset;
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
/*
* the limitation/offset value should be removed during retrieve data from virtual node,
* since the global order are done in client side, so the limitation should also
* be done at the client side.
*/
if (pQueryInfo->limit.limit > 0) {
pQueryInfo->limit.limit = -1;
}
pQueryInfo->limit.offset = 0;
}
} else {
if (pQueryInfo->slimit.limit != -1 || pQueryInfo->slimit.offset != 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
......
......@@ -306,8 +306,15 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
// we change the maxCapacity of schema to denote that there is only one row in temp buffer
pReducer->pDesc->pSchema->maxCapacity = 1;
//restore the limitation value at the last stage
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit;
pQueryInfo->limit.offset = pQueryInfo->prjOffset;
}
pReducer->offset = pQueryInfo->limit.offset;
pRes->pLocalReducer = pReducer;
pRes->numOfGroups = 0;
......@@ -510,7 +517,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
// primary timestamp column is involved in final result
if (pQueryInfo->nAggTimeInterval != 0) {
if (pQueryInfo->nAggTimeInterval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
numOfGroupByCols++;
}
......@@ -549,6 +556,12 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId;
// disable merge procedure for column projection query
assert(functionId != TSDB_FUNC_ARITHM);
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
return true;
}
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) {
return false;
}
......@@ -848,11 +861,11 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
int32_t rowSize = tscGetResRowLength(pQueryInfo);
// handle the descend order output
if (pQueryInfo->order.order == TSQL_SO_ASC) {
// if (pQueryInfo->order.order == TSQL_SO_ASC) {
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
} else {
reversedCopyResultToDstBuf(pQueryInfo, pRes, pFinalDataPage);
}
// } else {
// reversedCopyResultToDstBuf(pQueryInfo, pRes, pFinalDataPage);
// }
pFinalDataPage->numOfElems = 0;
return;
......@@ -1400,8 +1413,6 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) {
#if defined(_DEBUG_VIEW)
printf("chosen row:\t");
SSrcColumnInfo colInfo[256] = {0};
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->numOfElems, pModel->maxCapacity, colInfo);
......
......@@ -1182,7 +1182,6 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
#ifdef _DEBUG_VIEW
printf("received data from vnode: %d rows\n", pRes->numOfRows);
SSrcColumnInfo colInfo[256] = {0};
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pSchema, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
......
......@@ -228,9 +228,9 @@ bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return false;
}
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);
/*
* In following cases, return false for non ordered project query on super table
* 1. failed to get metermeta from server; 2. not a super table; 3. limitation is 0;
......@@ -240,17 +240,12 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI
pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pQueryInfo->exprsInfo.numOfExprs == 0) {
return false;
}
// only query on tag, not a projection query
if (tscQueryMetricTags(pQueryInfo)) {
return false;
}
// order by column exists, not a non-ordered projection query
if (pQueryInfo->order.orderColId >= 0) {
return false;
}
// for project query, only the following two function is allowed
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
......@@ -259,10 +254,28 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI
return false;
}
}
return true;
}
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
return false;
}
// order by column exists, not a non-ordered projection query
return pQueryInfo->order.orderColId < 0;
}
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
return false;
}
// order by column exists, a non-ordered projection query
return pQueryInfo->order.orderColId >= 0;
}
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
......
......@@ -5391,14 +5391,14 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
if (QUERY_IS_ASC_QUERY(pQuery)) {
// if (QUERY_IS_ASC_QUERY(pQuery)) {
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes);
} else { // DESC query
int32_t maxrows = pQuery->pointsToRead;
memmove(pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes,
pQuery->sdata[i]->data + (maxrows - size) * bytes, pQuery->pointsRead * bytes);
}
// } else { // DESC query
// int32_t maxrows = pQuery->pointsToRead;
//
// memmove(pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes,
// pQuery->sdata[i]->data + (maxrows - size) * bytes, pQuery->pointsRead * bytes);
// }
pRuntimeEnv->pCtx[i].aOutputBuf -= bytes * numOfSkip * step;
......
......@@ -747,10 +747,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pSupporter->meterIdx++;
// if the buffer is full or group by each table, we need to jump out of the loop
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) ||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) {
// if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) ||
// isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) {
break;
}
// }
} else {
// forward query range
......
......@@ -392,11 +392,6 @@ __clean_memory:
return NULL;
}
//static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) {
// SQInfo *pQInfo = (SQInfo *)pMsg->ahandle;
// vnodeFreeQInfo(pQInfo, true);
//}
void vnodeFreeQInfoInQueue(void *param) {
SQInfo *pQInfo = (SQInfo *)param;
......@@ -406,15 +401,6 @@ void vnodeFreeQInfoInQueue(void *param) {
dTrace("QInfo:%p set kill flag to free QInfo");
vnodeDecRefCount(pQInfo);
// dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo);
// SSchedMsg schedMsg = {0};
// schedMsg.fp = vnodeFreeQInfoInQueueImpl;
// schedMsg.msg = NULL;
// schedMsg.thandle = (void *)1;
// schedMsg.ahandle = param;
// taosScheduleTask(queryQhandle, &schedMsg);
}
void vnodeFreeQInfo(void *param, bool decQueryRef) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册