提交 75029e98 编写于 作者: H hjxilinx

[td-225] 1. fix bugs for first/last query in super table; 2. add STableIdTag info function

上级 5476ccaa
......@@ -62,7 +62,7 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8)
SET(TD_LINUX_64 TRUE)
SET(TD_OS_DIR ${TD_COMMUNITY_DIR}/src/os/linux)
ADD_DEFINITIONS(-D_M_X64)
ADD_DEFINITIONS(-D_M_X64 -D_DEBUG_VIEW)
MESSAGE(STATUS "The current platform is Linux 64-bit")
ELSEIF (${CMAKE_SIZEOF_VOID_P} MATCHES 4)
IF (TD_ARM)
......
......@@ -168,9 +168,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
// (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
*type = TSDB_DATA_TYPE_BINARY;
*bytes = dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); // (uid, tid) + VGID + TAGSIZE
*bytes = dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE;
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
......@@ -5285,10 +5286,10 @@ SQLAggFuncElem aAggs[] = {{
},
{
// 34
"tid_tag", // return table id and the corresponding tags for join match
"tid_tag", // return table id and the corresponding tags for join match and subscribe
TSDB_FUNC_TID_TAG,
TSDB_FUNC_TID_TAG,
TSDB_FUNCSTATE_MO,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE,
function_setup,
noop1,
noop2,
......
......@@ -1123,7 +1123,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
if (addProjectionExprAndResultField(pQueryInfo, pItem) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
}
} else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_AVG_IRATE) {
} else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_TBID) {
// sql function in selection clause, append sql function info in pSqlCmd structure sequentially
if (addExprAndResultField(pQueryInfo, outputIndex, pItem, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
......@@ -1468,7 +1468,8 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult) {
STableMetaInfo* pTableMetaInfo = NULL;
int32_t optr = pItem->pNode->nSQLOptr;
int32_t optr = pItem->pNode->nSQLOptr;
const char* msg1 = "not support column types";
const char* msg2 = "invalid parameters";
......@@ -1476,7 +1477,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
const char* msg4 = "invalid table name";
const char* msg5 = "parameter is out of range [0, 100]";
const char* msg6 = "function applied to tags not allowed";
const char* msg7 = "normal table can not apply this function";
switch (optr) {
case TK_COUNT: {
if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) {
......@@ -1858,13 +1860,68 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
}
}
return TSDB_CODE_SUCCESS;
};
case TK_TBID: {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg7);
}
// no parameters or more than one parameter for function
if (pItem->pNode->pParam == NULL || pItem->pNode->pParam->nExpr != 1) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
tSQLExpr* pParam = pItem->pNode->pParam->a[0].pNode;
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pParam->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
// functions can not be applied to normal columns
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
if (index.columnIndex < numOfCols) {
return invalidSqlErrMsg(pQueryInfo->msg, msg6);
}
index.columnIndex -= numOfCols;
// 2. valid the column type
int16_t colType = pSchema[index.columnIndex].type;
if (colType == TSDB_DATA_TYPE_BOOL || colType >= TSDB_DATA_TYPE_BINARY) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
SSchema s = pTagSchema[index.columnIndex];
int16_t bytes = 0;
int16_t type = 0;
int16_t inter = 0;
int32_t ret = getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);
assert(ret == TSDB_CODE_SUCCESS);
s.type = type;
s.bytes = bytes;
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG);
return TSDB_CODE_SUCCESS;
}
default:
return TSDB_CODE_INVALID_SQL;
}
}
// todo refactor
......
......@@ -412,10 +412,6 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et)
static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
SSqlObj* pParentSql = pSupporter->pObj;
// SSqlCmd* pCmd = &pSql->cmd;
// SSqlRes* pRes = &pSql->res;
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
// if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
......@@ -602,21 +598,6 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
// tscError("%p abort query due to other subquery failure. code:%d, global code:%s", pSql, numOfRows,
// tstrerror(pSupporter->pState->code));
//
// quitAllSubquery(pParentSql, pSupporter);
// return;
// }
//
// if (numOfRows < 0) {
// tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
// pSupporter->pState->code = numOfRows;
// quitAllSubquery(pParentSql, pSupporter);
// return;
// }
// response of tag retrieve
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
if (numOfRows == 0 || pSql->res.completed) {
......@@ -1051,6 +1032,55 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
/////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
//static void ddx(TAOS* taos) {
//
// SSqlCmd* pCmd = &pSql->cmd;
//
// tscPartiallyFreeSqlObj(pSql);
//// tscTrace("continue parse sql: %s", pSql->cmd.curSql);
//
// SQueryInfo* pQueryInfo = NULL;
// int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
// assert(code == TSDB_CODE_SUCCESS);
//
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
//
// if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // return the tableId & tag
// SSchema s = {0};
// SColumnIndex index = {0};
//
// size_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
// for (int32_t i = 0; i < numOfTags; ++i) {
// SColumn* c = taosArrayGetP(pTableMetaInfo->tagColList, i);
// index = (SColumnIndex){.tableIndex = 0, .columnIndex = c->colIndex.columnIndex};
//
// SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
// s = pTagSchema[c->colIndex.columnIndex];
//
// int16_t bytes = 0;
// int16_t type = 0;
// int16_t inter = 0;
//
// getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);
//
// s.type = type;
// s.bytes = bytes;
//// pSupporter->tagSize = s.bytes;
// }
//
// // set get tags query type
// TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
//
// tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG);
// size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
//
// tscTrace( "%p vgroupIndex:%d, type:%d, tid_tag query to retrieve (tableId, tags), "
// "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
// pSql, pTableMetaInfo->vgroupIndex, pQueryInfo->type, tscSqlExprNumOfExprs(pQueryInfo),
// numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->pTableMetaInfo[0]->name);
// }
//}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
// todo merge with callback
......
......@@ -206,19 +206,20 @@
#define TK_SUM_IRATE 188
#define TK_AVG_RATE 189
#define TK_AVG_IRATE 190
#define TK_SEMI 191
#define TK_NONE 192
#define TK_PREV 193
#define TK_LINEAR 194
#define TK_IMPORT 195
#define TK_METRIC 196
#define TK_TBNAME 197
#define TK_JOIN 198
#define TK_METRICS 199
#define TK_STABLE 200
#define TK_INSERT 201
#define TK_INTO 202
#define TK_VALUES 203
#define TK_TBID 191
#define TK_SEMI 192
#define TK_NONE 193
#define TK_PREV 194
#define TK_LINEAR 195
#define TK_IMPORT 196
#define TK_METRIC 197
#define TK_TBNAME 198
#define TK_JOIN 199
#define TK_METRICS 200
#define TK_STABLE 201
#define TK_INSERT 202
#define TK_INTO 203
#define TK_VALUES 204
#endif
......
......@@ -655,5 +655,5 @@ cmd ::= KILL QUERY IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
LIKE MATCH KEY OF OFFSET RAISE REPLACE RESTRICT ROW STATEMENT TRIGGER VIEW ALL
COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF
SPREAD TWA INTERP LAST_ROW RATE IRATE SUM_RATE SUM_IRATE AVG_RATE AVG_IRATE NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT
SPREAD TWA INTERP LAST_ROW RATE IRATE SUM_RATE SUM_IRATE AVG_RATE AVG_IRATE TBID NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT
METRIC TBNAME JOIN METRICS STABLE NULL INSERT INTO VALUES.
......@@ -245,21 +245,8 @@ enum {
BLK_DATA_ALL_NEEDED = 0x3,
};
#define IS_FILE_BLOCK(x) (((x)&BLK_FILE_BLOCK) != 0)
#define SET_FILE_BLOCK_FLAG(x) \
do { \
(x) &= (~BLK_CACHE_BLOCK); \
(x) |= BLK_FILE_BLOCK; \
} while (0);
#define SET_CACHE_BLOCK_FLAG(x) ((x) = BLK_CACHE_BLOCK | BLK_BLOCK_LOADED);
#define SET_DATA_BLOCK_NOT_LOADED(x) ((x) &= (~BLK_BLOCK_LOADED));
#define SET_DATA_BLOCK_LOADED(x) ((x) |= BLK_BLOCK_LOADED);
#define IS_DATA_BLOCK_LOADED(x) (((x)&BLK_BLOCK_LOADED) != 0)
typedef struct STwaInfo {
TSKEY lastKey;
int8_t hasResult; // flag to denote has value
......@@ -291,7 +278,6 @@ bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *mi
bool stableQueryFunctChanged(int32_t funcId);
void resetResultInfo(SResultInfo *pResInfo);
void initResultInfo(SResultInfo *pResInfo);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable);
......
......@@ -225,6 +225,7 @@ static SKeyword keywordTable[] = {
{"TBNAME", TK_TBNAME},
{"JOIN", TK_JOIN},
{"METRICS", TK_METRICS},
{"TBID", TK_TBID},
{"STABLE", TK_STABLE},
{"FILE", TK_FILE},
{"VNODES", TK_VNODES},
......
......@@ -39,10 +39,10 @@
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define IS_MASTER_SCAN(runtime) (((runtime)->scanFlag & 1u) == MASTER_SCAN)
#define IS_SUPPLEMENT_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN)
#define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define GET_QINFO_ADDR(x) ((void *)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
......@@ -1101,7 +1101,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
}
// in the supplementary scan, only the following functions need to be executed
if (IS_SUPPLEMENT_SCAN(pRuntimeEnv) &&
if (IS_REVERSE_SCAN(pRuntimeEnv) &&
!(functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST ||
functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) {
return false;
......@@ -2450,8 +2450,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
TsdbQueryHandleT pQueryHandle =
pRuntimeEnv->scanFlag == MASTER_SCAN ? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return 0;
......@@ -2835,11 +2834,12 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
return; // failed to save data in the disk
}
// set current query completed
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == pQInfo->pSidSet->numOfSubSet) {
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTables;
// return;
// }
// check if all results has been sent to client
int32_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) {
pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; // set query completed
return;
}
}
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -3087,7 +3087,31 @@ void setTableDataInfo(STableQueryInfo *pTableQueryInfo, int32_t tableIndex, int3
pTableQueryInfo->tableIndex = tableIndex;
}
static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *pWindowResInfo, int32_t order) {
static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
}
// order has change already!
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (!QUERY_IS_ASC_QUERY(pQuery)) {
assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
} else {
assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
}
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
SWITCH_ORDER(pTableQueryInfo->cur.order);
pTableQueryInfo->cur.vgroupIndex = -1;
}
static void disableFuncInReverseScanImpl(SQInfo* pQInfo, SWindowResInfo *pWindowResInfo, int32_t order) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, i);
if (!pStatus->closed) {
......@@ -3108,18 +3132,32 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
}
}
}
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
qTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1);
size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) {
SGroupItem *item = taosArrayGet(group, j);
updateTableQueryInfoForReverseScan(pQuery, item->info);
}
}
}
void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
void disableFuncInReverseScan(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
// group by normal columns and interval query on normal table
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order);
} else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
int32_t functId = pQuery->pSelectExpr[j].base.functionId;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j];
......@@ -3134,34 +3172,10 @@ void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
}
if (isIntervalQuery(pQuery)) {
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableQueryInfo[i].pTableQInfo;
// SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
//
// doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
// }
} else {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
}
pQuery->order.order = (pQuery->order.order) ^ 1u;
}
void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SWITCH_ORDER(pRuntimeEnv->pCtx[i]
.order); // = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
SWITCH_ORDER(pRuntimeEnv->pCtx[i] .order);
}
}
......@@ -3358,7 +3372,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWITCH_ORDER(pQuery->order.order);
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
STsdbQueryCond cond = {
.twindow = pQuery->window,
......@@ -3376,7 +3390,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
switchCtxOrder(pRuntimeEnv);
disableFuncInReverseScan(pRuntimeEnv);
disableFuncInReverseScan(pQInfo);
}
static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
......@@ -3533,28 +3547,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
free(pTableQueryInfo);
}
void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
}
// order has change already!
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (!QUERY_IS_ASC_QUERY(pQuery)) {
assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
} else {
assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
}
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
pTableQueryInfo->cur.order = pTableQueryInfo->cur.order ^ 1u;
pTableQueryInfo->cur.vgroupIndex = -1;
}
void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -3943,9 +3935,16 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
data += bytes * numOfRows;
}
// all data returned, set query over
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
setQueryStatus(pQuery, QUERY_OVER);
if (pQInfo->runtimeEnv.stableQuery && isIntervalQuery(pQuery)) {
if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) {
setQueryStatus(pQuery, QUERY_OVER);
}
} else {
setQueryStatus(pQuery, QUERY_OVER);
}
}
}
......@@ -4368,7 +4367,8 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
int64_t st = taosGetTimestampMs();
TsdbQueryHandleT *pQueryHandle = pRuntimeEnv->pQueryHandle;
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
if (isQueryKilled(pQInfo)) {
break;
......@@ -4400,7 +4400,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
}
}
assert(pTableQueryInfo != NULL && pTableQueryInfo != NULL);
assert(pTableQueryInfo != NULL);
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
SDataStatis *pStatis = NULL;
......@@ -4759,28 +4759,35 @@ static void createTableQueryInfo(SQInfo *pQInfo) {
}
}
static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) {
// SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
// STableQueryInfo *pTableQueryInfo = pQInfo->pTableQueryInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
// }
}
static void doSaveContext(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
disableFuncForReverseScan(pQInfo, pQuery->order.order);
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWITCH_ORDER(pQuery->order.order);
if (pRuntimeEnv->pTSBuf != NULL) {
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1u;
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
}
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
prepareQueryInfoForReverseScan(pQInfo);
STsdbQueryCond cond = {
.twindow = pQuery->window,
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
// clean unused handle
if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableIdGroupInfo);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
switchCtxOrder(pRuntimeEnv);
disableFuncInReverseScan(pQInfo);
}
static void doRestoreContext(SQInfo *pQInfo) {
......@@ -4835,8 +4842,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
}
pQuery->rec.rows += pQuery->rec.rows;
if (pQuery->rec.rows == 0) {
// vnodePrintQueryStatistics(pSupporter);
}
......@@ -6287,7 +6292,10 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
SGroupItem* item = taosArrayGet(pa, i);
char* output = pQuery->sdata[0]->data + i * rsize;
*(int64_t*) output = item->id.uid; // memory align problem
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output);
*(int64_t*) output = item->id.uid; // memory align problem, todo serialize
output += sizeof(item->id.uid);
*(int32_t*) output = item->id.tid;
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册