提交 7d020d70 编写于 作者: H Haojun Liao

[TD-2293]

上级 6f6fc46b
...@@ -136,7 +136,7 @@ typedef struct SSqlExpr { ...@@ -136,7 +136,7 @@ typedef struct SSqlExpr {
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3 tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression. int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id int16_t resColId; // result column id
} SSqlExpr; } SSqlExpr;
typedef struct SColumnIndex { typedef struct SColumnIndex {
...@@ -252,7 +252,7 @@ typedef struct SQueryInfo { ...@@ -252,7 +252,7 @@ typedef struct SQueryInfo {
int64_t clauseLimit; // limit for current sub clause int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t tableLimit; // table limit in case of super table projection query + global order + limit int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id int16_t resColumnId; // result column id
......
...@@ -726,10 +726,14 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -726,10 +726,14 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema p1 = {0}; SSchema p1 = {0};
if (pExpr->colInfo.colIndex != TSDB_TBNAME_COLUMN_INDEX) { if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
} else {
p1 = tGetTableNameColumnSchema(); p1 = tGetTableNameColumnSchema();
} else if (pExpr->colInfo.colIndex == TSDB_UD_COLUMN_INDEX) {
p1.bytes = pExpr->resBytes;
p1.type = pExpr->resType;
tstrncpy(p1.name, pExpr->aliasName, tListLen(p1.name));
} else {
p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
} }
int32_t inter = 0; int32_t inter = 0;
......
...@@ -1310,7 +1310,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1310,7 +1310,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
SColumnIndex index = {.tableIndex = tableIndex}; SColumnIndex index = {.tableIndex = tableIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double), SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double),
-1000, sizeof(double), false); getNewResColId(pQueryInfo), sizeof(double), false);
char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z; char* name = (pItem->aliasName != NULL)? pItem->aliasName:pItem->pNode->token.z;
size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1); size_t len = MIN(sizeof(pExpr->aliasName), pItem->pNode->token.n + 1);
...@@ -5312,7 +5312,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn ...@@ -5312,7 +5312,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
// keep original limitation value in globalLimit // keep original limitation value in globalLimit
pQueryInfo->clauseLimit = pQueryInfo->limit.limit; pQueryInfo->clauseLimit = pQueryInfo->limit.limit;
pQueryInfo->prjOffset = pQueryInfo->limit.offset; pQueryInfo->prjOffset = pQueryInfo->limit.offset;
pQueryInfo->tableLimit = -1; pQueryInfo->vgroupLimit = -1;
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
/* /*
...@@ -5322,7 +5322,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn ...@@ -5322,7 +5322,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
* than or equal to the value of limit. * than or equal to the value of limit.
*/ */
if (pQueryInfo->limit.limit > 0) { if (pQueryInfo->limit.limit > 0) {
pQueryInfo->tableLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset; pQueryInfo->vgroupLimit = pQueryInfo->limit.limit + pQueryInfo->limit.offset;
pQueryInfo->limit.limit = -1; pQueryInfo->limit.limit = -1;
} }
......
...@@ -681,7 +681,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -681,7 +681,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->tableLimit = htobe64(pQueryInfo->tableLimit); pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
......
...@@ -2023,6 +2023,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2023,6 +2023,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNewQueryInfo->limit = pQueryInfo->limit; pNewQueryInfo->limit = pQueryInfo->limit;
pNewQueryInfo->slimit = pQueryInfo->slimit; pNewQueryInfo->slimit = pQueryInfo->slimit;
pNewQueryInfo->order = pQueryInfo->order; pNewQueryInfo->order = pQueryInfo->order;
pNewQueryInfo->vgroupLimit = pQueryInfo->vgroupLimit;
pNewQueryInfo->tsBuf = NULL; pNewQueryInfo->tsBuf = NULL;
pNewQueryInfo->fillType = pQueryInfo->fillType; pNewQueryInfo->fillType = pQueryInfo->fillType;
pNewQueryInfo->fillVal = NULL; pNewQueryInfo->fillVal = NULL;
......
...@@ -476,7 +476,7 @@ typedef struct { ...@@ -476,7 +476,7 @@ typedef struct {
int16_t numOfGroupCols; // num of group by columns int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx; int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx int16_t orderType; // used in group by xx order by xxx
int64_t tableLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int16_t prjOrder; // global order in super table projection query. int16_t prjOrder; // global order in super table projection query.
int64_t limit; int64_t limit;
int64_t offset; int64_t offset;
......
...@@ -140,6 +140,11 @@ typedef struct SQueryCostInfo { ...@@ -140,6 +140,11 @@ typedef struct SQueryCostInfo {
uint64_t numOfTimeWindows; uint64_t numOfTimeWindows;
} SQueryCostInfo; } SQueryCostInfo;
typedef struct {
int64_t vgroupLimit;
int64_t ts;
} SOrderedPrjQueryInfo;
typedef struct SQuery { typedef struct SQuery {
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags; int16_t numOfTags;
...@@ -167,6 +172,7 @@ typedef struct SQuery { ...@@ -167,6 +172,7 @@ typedef struct SQuery {
tFilePage** sdata; tFilePage** sdata;
STableQueryInfo* current; STableQueryInfo* current;
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
} SQuery; } SQuery;
......
...@@ -5479,6 +5479,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5479,6 +5479,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// return; // return;
// } // }
if (pQuery->prjInfo.vgroupLimit != -1) {
assert(pQuery->limit.limit == -1 && pQuery->limit.offset == 0);
} else if (pQuery->limit.limit != -1) {
assert(pQuery->prjInfo.vgroupLimit == -1);
}
bool hasMoreBlock = true; bool hasMoreBlock = true;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SQueryCostInfo *summary = &pRuntimeEnv->summary; SQueryCostInfo *summary = &pRuntimeEnv->summary;
...@@ -5491,7 +5497,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5491,7 +5497,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
STableQueryInfo **pTableQueryInfo = STableQueryInfo **pTableQueryInfo =
(STableQueryInfo **)taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); (STableQueryInfo **) taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
if (pTableQueryInfo == NULL) { if (pTableQueryInfo == NULL) {
break; break;
} }
...@@ -5503,6 +5509,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5503,6 +5509,25 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb); setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb);
} }
if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->current->windowResInfo.size > pQuery->prjInfo.vgroupLimit) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
// it is a super table ordered projection query, check for the number of output for each vgroup
if (pQuery->prjInfo.vgroupLimit > 0 && pQuery->rec.rows >= pQuery->prjInfo.vgroupLimit) {
if (QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.skey >= pQuery->prjInfo.ts) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
} else if (!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.window.ekey <= pQuery->prjInfo.ts) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
}
uint32_t status = 0; uint32_t status = 0;
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL; SArray *pDataBlock = NULL;
...@@ -5520,6 +5545,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5520,6 +5545,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
ensureOutputBuffer(pRuntimeEnv, &blockInfo); ensureOutputBuffer(pRuntimeEnv, &blockInfo);
int64_t prev = getNumOfResult(pRuntimeEnv);
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
...@@ -5530,17 +5557,30 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -5530,17 +5557,30 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery->rec.rows = getNumOfResult(pRuntimeEnv); pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
int64_t inc = pQuery->rec.rows - prev;
pQuery->current->windowResInfo.size += inc;
// the flag may be set by tableApplyFunctionsOnBlock, clear it here // the flag may be set by tableApplyFunctionsOnBlock, clear it here
CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED); CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED);
updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo);
skipResults(pRuntimeEnv);
// the limitation of output result is reached, set the query completed if (pQuery->prjInfo.vgroupLimit >= 0) {
if (limitResults(pRuntimeEnv)) { if (((pQuery->rec.rows + pQuery->rec.total) < pQuery->prjInfo.vgroupLimit) || ((pQuery->rec.rows + pQuery->rec.total) > pQuery->prjInfo.vgroupLimit && prev < pQuery->prjInfo.vgroupLimit)) {
setQueryStatus(pQuery, QUERY_COMPLETED); if (QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts < blockInfo.window.ekey) {
SET_STABLE_QUERY_OVER(pQInfo); pQuery->prjInfo.ts = blockInfo.window.ekey;
break; } else if (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->prjInfo.ts > blockInfo.window.skey) {
pQuery->prjInfo.ts = blockInfo.window.skey;
}
}
} else {
// the limitation of output result is reached, set the query completed
skipResults(pRuntimeEnv);
if (limitResults(pRuntimeEnv)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
SET_STABLE_QUERY_OVER(pQInfo);
break;
}
} }
// while the output buffer is full or limit/offset is applied, query may be paused here // while the output buffer is full or limit/offset is applied, query may be paused here
...@@ -6284,7 +6324,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -6284,7 +6324,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset); pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset);
pQueryMsg->limit = htobe64(pQueryMsg->limit); pQueryMsg->limit = htobe64(pQueryMsg->limit);
pQueryMsg->offset = htobe64(pQueryMsg->offset); pQueryMsg->offset = htobe64(pQueryMsg->offset);
pQueryMsg->tableLimit = htobe64(pQueryMsg->tableLimit); pQueryMsg->vgroupLimit = htobe64(pQueryMsg->vgroupLimit);
pQueryMsg->order = htons(pQueryMsg->order); pQueryMsg->order = htons(pQueryMsg->order);
pQueryMsg->orderColId = htons(pQueryMsg->orderColId); pQueryMsg->orderColId = htons(pQueryMsg->orderColId);
...@@ -6885,6 +6925,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6885,6 +6925,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->fillType = pQueryMsg->fillType; pQuery->fillType = pQueryMsg->fillType;
pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->numOfTags = pQueryMsg->numOfTags;
pQuery->tagColList = pTagCols; pQuery->tagColList = pTagCols;
pQuery->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit;
pQuery->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX;
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) { if (pQuery->colList == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册