提交 4bf606a5 编写于 作者: H hjxilinx

add order by ts support for super table query

上级 5ebd8a1b
......@@ -97,7 +97,7 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
......
......@@ -31,10 +31,13 @@ extern "C" {
#include "tsqlfunction.h"
#include "tutil.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
((res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) + \
(1 - ord.order) * (res->numOfRows - 1) * tscFieldInfoGetField(_queryinfo, col)->bytes)
//#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
// ((res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) + \
// (1 - ord.order) * (res->numOfRows - 1) * tscFieldInfoGetField(_queryinfo, col)->bytes)
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
(res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows)
// forward declaration
struct SSqlInfo;
......
......@@ -1466,7 +1466,9 @@ static void first_function(SQLFunctionCtx *pCtx) {
}
memcpy(pCtx->aOutputBuf, data, pCtx->inputBytes);
DO_UPDATE_TAG_COLUMNS(pCtx, i);
TSKEY k = pCtx->ptsList[i];
DO_UPDATE_TAG_COLUMNS(pCtx, k);
SResultInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->hasResult = DATA_SET_FLAG;
......
......@@ -467,7 +467,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
//todo refactor
if (tscProjectionQueryOnSTable(pParentQueryInfo, 0)) {
if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -532,7 +532,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pSql->res.numOfTotal += pSql->res.numOfRows;
}
if (tscProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -603,7 +603,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes &&
(!tscHasReachLimitation(pQueryInfo, pRes))) {
numOfFetch++;
......@@ -783,7 +783,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pMeterMetaInfo->vnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes);
pSupporter->pState->numOfCompleted = 0; // reset the record value
......
......@@ -4460,7 +4460,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
if (queryOnTags == true) { // local handle the metric tag query
pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS;
} else {
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->order.orderColId >= 0) {
if (pQueryInfo->limit.limit == -1) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4);
......@@ -4473,6 +4473,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; // for projection query on super table, all queries are subqueries
}
}
......@@ -5016,7 +5017,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
// projection query on metric does not compatible with "group by" syntax
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......
......@@ -257,9 +257,6 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
* in the edge edition, ip is 0, and at this time we use masterIp instead
* in the cluster edition, ip is vnode ip
*/
//(pSql->index) = (pSql->index + 1) % TSDB_VNODES_SUPPORT;
//continue;
pVPeersDesc[pSql->index].ip = tscMgmtIpList.ip[0];
}
*pCode = TSDB_CODE_SUCCESS;
......@@ -2484,12 +2481,12 @@ static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
pRes->bytes[i] = pField->bytes;
if (pQueryInfo->order.order == TSQL_SO_DESC) {
pRes->bytes[i] = -pRes->bytes[i];
pRes->tsrow[i] = ((pRes->data + offset * pRes->numOfRows) + (pRes->numOfRows - 1) * pField->bytes);
} else {
// if (pQueryInfo->order.order == TSQL_SO_DESC) {
// pRes->bytes[i] = -pRes->bytes[i];
// pRes->tsrow[i] = ((pRes->data + offset * pRes->numOfRows) + (pRes->numOfRows - 1) * pField->bytes);
// } else {
pRes->tsrow[i] = (pRes->data + offset * pRes->numOfRows);
}
// }
}
return 0;
......@@ -3422,7 +3419,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
* If the query result is exhausted, or current query is to free resource at server side,
* the connection will be recycled.
*/
if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnSTable(pQueryInfo, 0) && pRes->offset > 0)) ||
if ((pRes->numOfRows == 0 && !(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pRes->offset > 0)) ||
((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) {
tscTrace("%p no result or free resource, recycle connection", pSql);
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
......
......@@ -360,8 +360,9 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) +
pRes->bytes[i] * (1 - pQueryInfo->order.order) * (pRes->numOfRows - 1);
// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) +
// pRes->bytes[i] * (1 - pQueryInfo->order.order) * (pRes->numOfRows - 1);
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order);
}
*rows = pRes->tsrow;
......@@ -425,7 +426,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
......
......@@ -215,8 +215,8 @@ bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return false;
}
// for projection query, iterate all qualified vnodes sequentially
if (tscProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
// for ordered projection query, iterate all qualified vnodes sequentially
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
return false;
}
......@@ -228,12 +228,13 @@ bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return false;
}
bool tscProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);
/*
* In following cases, return false for project query on metric
* 1. failed to get metermeta from server; 2. not a metric; 3. limit 0; 4. show query, instead of a select query
* In following cases, return false for non ordered project query on super table
* 1. failed to get metermeta from server; 2. not a super table; 3. limitation is 0;
* 4. show queries, instead of a select query
*/
if (pMeterMetaInfo == NULL || !UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo) ||
pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pQueryInfo->exprsInfo.numOfExprs == 0) {
......@@ -244,6 +245,11 @@ bool tscProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (tscQueryMetricTags(pQueryInfo)) {
return false;
}
// order by column exists, not a non-ordered projection query
if (pQueryInfo->order.orderColId >= 0) {
return false;
}
// for project query, only the following two function is allowed
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
......@@ -2070,7 +2076,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
}
int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes;
return pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) &&
return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pMeterMetaInfo->vnodeIndex < totalVnode - 1);
}
......@@ -2084,7 +2090,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* no result returned from the current virtual node anymore, try the next vnode if exists
* if case of: multi-vnode super table projection query
*/
assert(pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes;
......
......@@ -162,8 +162,8 @@ typedef struct SExtTagsInfo {
// sql function runtime context
typedef struct SQLFunctionCtx {
int32_t startOffset;
int32_t size;
int32_t order;
int32_t size; // number of rows
int32_t order; // asc|desc
int32_t scanFlag; // TODO merge with currentStage
int16_t inputType;
......
......@@ -630,7 +630,14 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) {
setNullN(pData, type, bytes, pCacheBlock->numOfPoints);
} else {
pRead = pCacheBlock->offset[colIdx] + startPos * bytes;
memcpy(pData, pRead, numOfReads * bytes);
if (QUERY_IS_ASC_QUERY(pQuery)) {
memcpy(pData, pRead, numOfReads * bytes);
} else {
for(int32_t j = 0; j < numOfReads; ++j) {
memcpy(pData + bytes * j, pRead + (numOfReads - 1 - j) * bytes, bytes);
}
}
}
}
numOfQualifiedPoints = numOfReads;
......@@ -668,8 +675,7 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) {
}
ids[numOfQualifiedPoints] = j;
if (++numOfQualifiedPoints == numOfReads) {
// qualified data are enough
if (++numOfQualifiedPoints == numOfReads) { // qualified data are enough
break;
}
}
......@@ -691,23 +697,22 @@ int vnodeQueryFromCache(SMeterObj *pObj, SQuery *pQuery) {
if (!vnodeFilterData(pQuery, &numOfActualRead, j)) {
continue;
}
ids[numOfReads - numOfQualifiedPoints - 1] = j;
if (++numOfQualifiedPoints == numOfReads) {
// qualified data are enough
ids[numOfQualifiedPoints] = j;
if (++numOfQualifiedPoints == numOfReads) { // qualified data are enough
break;
}
}
}
int32_t start = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfReads - numOfQualifiedPoints;
// int32_t start = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfReads - numOfQualifiedPoints;
for (int32_t j = 0; j < numOfQualifiedPoints; ++j) {
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
int16_t colIndex = pQuery->pSelectExpr[col].pBase.colInfo.colIdx;
int32_t bytes = pObj->schema[colIndex].bytes;
pData = pQuery->sdata[col]->data + (pQuery->pointsOffset + j) * bytes;
pRead = pCacheBlock->offset[colIndex] + ids[j + start] * bytes;
pRead = pCacheBlock->offset[colIndex] + ids[j/* + start*/] * bytes;
memcpy(pData, pRead, bytes);
}
......
......@@ -1640,11 +1640,15 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
pData = pQuery->sdata[i]->data + pQuery->pointsOffset * bytes;
pRead = sdata[colBufferIndex]->data + startPos * bytes;
memcpy(pData, pRead, numOfReads * bytes);
if (QUERY_IS_ASC_QUERY(pQuery)) {
memcpy(pData, pRead, numOfReads * bytes);
} else { //reversed copy to output buffer
for(int32_t j = 0; j < numOfReads; ++j) {
memcpy(pData + bytes * j, pRead + (numOfReads - 1 - j) * bytes, bytes);
}
}
}
numOfQualifiedPoints = numOfReads;
} else {
// check each data one by one set the input column data
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
......@@ -1675,8 +1679,7 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
}
ids[numOfQualifiedPoints] = j;
if (++numOfQualifiedPoints == numOfReads) {
// qualified data are enough
if (++numOfQualifiedPoints == numOfReads) { // qualified data are enough
break;
}
}
......@@ -1698,22 +1701,21 @@ int vnodeQueryFromFile(SMeterObj *pObj, SQuery *pQuery) {
if (!vnodeFilterData(pQuery, &numOfActualRead, j)) {
continue;
}
ids[numOfReads - numOfQualifiedPoints - 1] = j;
if (++numOfQualifiedPoints == numOfReads) {
// qualified data are enough
ids[numOfQualifiedPoints] = j;
if (++numOfQualifiedPoints == numOfReads) { // qualified data are enough
break;
}
}
}
int32_t start = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfReads - numOfQualifiedPoints;
// int32_t start = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfReads - numOfQualifiedPoints;
for (int32_t j = 0; j < numOfQualifiedPoints; ++j) {
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
int16_t colIndexInBuffer = pQuery->pSelectExpr[col].pBase.colInfo.colIdxInBuf;
int32_t bytes = GET_COLUMN_BYTES(pQuery, col);
pData = pQuery->sdata[col]->data + (pQuery->pointsOffset + j) * bytes;
pRead = sdata[colIndexInBuffer]->data + ids[j + start] * bytes;
pRead = sdata[colIndexInBuffer]->data + ids[j/* + start*/] * bytes;
memcpy(pData, pRead, bytes);
}
......
......@@ -4461,7 +4461,7 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
int64_t start = taosGetTimestampUs();
if (IS_DISK_DATA_BLOCK(pQuery)) {
......@@ -5297,11 +5297,11 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
// ts_comp query does not required reversed output
if (QUERY_IS_ASC_QUERY(pQuery) || isTSCompQuery(pQuery)) {
// if (QUERY_IS_ASC_QUERY(pQuery) || isTSCompQuery(pQuery)) {
pCtx->aOutputBuf = pQuery->sdata[i]->data;
} else { // point to the last position of output buffer for desc query
pCtx->aOutputBuf = pQuery->sdata[i]->data + (rows - 1) * pCtx->outputBytes;
}
// } else { // point to the last position of output buffer for desc query
// pCtx->aOutputBuf = pQuery->sdata[i]->data + (rows - 1) * pCtx->outputBytes;
// }
/*
* set the output buffer information and intermediate buffer
......@@ -5333,7 +5333,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
// set next output position
if (IS_OUTER_FORWARD(aAggs[functionId].nStatus)) {
pRuntimeEnv->pCtx[j].aOutputBuf += pRuntimeEnv->pCtx[j].outputBytes * output * factor;
pRuntimeEnv->pCtx[j].aOutputBuf += pRuntimeEnv->pCtx[j].outputBytes * output /** factor*/;
}
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
......@@ -5415,20 +5415,20 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
* @param pRuntimeEnv
*/
void moveDescOrderResultsToFront(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t maxrows = pQuery->pointsToRead;
if (QUERY_IS_ASC_QUERY(pQuery) || isTSCompQuery(pQuery)) {
return;
}
if (pQuery->pointsRead > 0 && pQuery->pointsRead < maxrows) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes,
pQuery->pointsRead * bytes);
}
}
// SQuery *pQuery = pRuntimeEnv->pQuery;
// int32_t maxrows = pQuery->pointsToRead;
//
// if (QUERY_IS_ASC_QUERY(pQuery) || isTSCompQuery(pQuery)) {
// return;
// }
//
// if (pQuery->pointsRead > 0 && pQuery->pointsRead < maxrows) {
// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
// int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes,
// pQuery->pointsRead * bytes);
// }
// }
}
typedef struct SQueryStatus {
......
......@@ -1162,7 +1162,7 @@ static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) {
pQInfo->pointsRead += pQuery->pointsRead;
pQInfo->pointsInterpo += numOfInterpo;
moveDescOrderResultsToFront(pRuntimeEnv);
// moveDescOrderResultsToFront(pRuntimeEnv);
dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
......
......@@ -1603,7 +1603,7 @@ void tColModelAppend(tColModel *dstModel, tFilePage *dstPage, void *srcData, int
tOrderDescriptor *tOrderDesCreate(int32_t *orderColIdx, int32_t numOfOrderCols, tColModel *pModel,
int32_t tsOrderType) {
tOrderDescriptor *desc = (tOrderDescriptor *)malloc(sizeof(tOrderDescriptor) + sizeof(int32_t) * numOfOrderCols);
tOrderDescriptor *desc = (tOrderDescriptor *)calloc(1, sizeof(tOrderDescriptor) + sizeof(int32_t) * numOfOrderCols);
if (desc == NULL) {
return NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册