提交 ec902f8b 编写于 作者: H hjxilinx

[td-98] refactor code in reversed scan

上级 47a983fb
......@@ -30,8 +30,6 @@ extern "C" {
struct tExprNode;
struct SSchema;
struct tSkipList;
struct tSkipListNode;
enum {
TSQL_NODE_EXPR = 0x1,
......
......@@ -50,6 +50,7 @@
#define GET_QINFO_ADDR(x) ((void*)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC))
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
......@@ -84,15 +85,24 @@ typedef enum {
QUERY_OVER = 0x8u,
} vnodeQueryStatus;
static void setQueryStatus(SQuery *pQuery, int8_t status);
bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
enum {
TS_JOIN_TS_EQUAL = 0,
TS_JOIN_TS_NOT_EQUALS = 1,
TS_JOIN_TAG_NOT_EQUALS = 2,
};
typedef struct {
int32_t status; // query status
TSKEY lastKey; // the lastKey value before query executed
STimeWindow w; // whole query time window
STimeWindow current; // current query window
int32_t windowIndex; // index of active time window result for interval query
STSCursor cur;
} SQueryStatusInfo;
static void setQueryStatus(SQuery *pQuery, int8_t status);
bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group);
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
......@@ -2226,102 +2236,11 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
}
//int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
// if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
// dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
// pQuery->window.ekey, pQuery->order.order);
//
// sem_post(&pQInfo->dataReady);
// return TSDB_CODE_SUCCESS;
// }
//
// pQuery->status = 0;
// pQuery->rec = (SResultRec){0};
//
// changeExecuteScanOrder(pQuery, true);
// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
//
// /*
// * since we employ the output control mechanism in main loop.
// * so, disable it during data block scan procedure.
// */
// setScanLimitationByResultBuffer(pQuery);
//
// // save raw query range for applying to each subgroup
// pQuery->lastKey = pQuery->window.skey;
//
// // create runtime environment
// // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
//
// // get one queried meter
// assert(0);
// // SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid);
//
// pRuntimeEnv->pTSBuf = param;
// pRuntimeEnv->cur.vnodeIndex = -1;
//
// // set the ts-comp file traverse order
// if (param != NULL) {
// int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
// }
//
// assert(0);
// // int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true);
// // if (ret != TSDB_CODE_SUCCESS) {
// // return ret;
// // }
//
// // createTableGroup(pQInfo->pSidSet);
//
// int32_t size = getInitialPageNum(pQInfo);
// int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
//
// if (pQuery->intervalTime == 0) {
// int16_t type = TSDB_DATA_TYPE_NULL;
//
// if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags;
// type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
// } else {
// type = TSDB_DATA_TYPE_INT; // group id
// }
//
// initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
// }
//
// pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true);
//
// STsdbQueryCond cond = {
// .twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey},
// .order = pQuery->order.order,
// .colList = pQuery->colList,
//
// };
//
// pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo);
//
// // metric query do not invoke interpolation, it will be done at the second-stage merge
// if (!isPointInterpoQuery(pQuery)) {
// pQuery->interpoType = TSDB_INTERPO_NONE;
// }
//
// TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit,
// pQuery->precision);
// taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0);
// pRuntimeEnv->stableQuery = true;
//
// return TSDB_CODE_SUCCESS;
//}
/**
* decrease the refcount for each table involved in this query
* @param pQInfo
*/
void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
if (pQInfo != NULL) {
// assert(taosHashGetSize(pQInfo->groupInfo) >= 1);
}
......@@ -2355,7 +2274,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
#endif
}
void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) {
UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
......@@ -3255,10 +3174,10 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
pQuery->order.order = (pQuery->order.order) ^ 1u;
}
void setCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);// = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
}
}
......@@ -3373,66 +3292,6 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
typedef struct SQueryStatus {
int8_t status;
// TSKEY lastKey;
STSCursor cur;
} SQueryStatus;
// todo refactor
static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery;
pStatus->status = pQuery->status;
// pStatus->lastKey = pQuery->lastKey;
pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor
if (pRuntimeEnv->pTSBuf) {
pRuntimeEnv->pTSBuf->cur.order ^= 1u;
tsBufNextPos(pRuntimeEnv->pTSBuf);
}
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
// SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
// pQuery->lastKey = pQuery->window.skey;
}
static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
// pQuery->lastKey = pStatus->lastKey;
pQuery->status = pStatus->status;
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur);
}
static UNUSED_FUNC void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryStatus qStatus = {0};
if (!needReverseScan(pQuery)) {
return;
}
dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv));
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
// close necessary function execution during supplementary scan
disableFuncInReverseScan(pRuntimeEnv);
queryStatusSave(pRuntimeEnv, &qStatus);
// reverse scan from current position
doScanAllDataBlocks(pRuntimeEnv);
queryStatusRestore(pRuntimeEnv, &qStatus);
setCtxOrder(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
}
void setQueryStatus(SQuery *pQuery, int8_t status) {
if (status == QUERY_NOT_COMPLETED) {
pQuery->status = status;
......@@ -3488,45 +3347,118 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
return toContinue;
}
static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SQueryStatusInfo info = {
.status = pQuery->status,
.windowIndex = pRuntimeEnv->windowResInfo.curIndex,
.lastKey = pQuery->lastKey,
.w = pQuery->window,
};
return info;
}
static void setEnvBeforeReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SQueryStatusInfo* pStatus) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery* pQuery = pRuntimeEnv->pQuery;
// the step should be placed before order changed
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor
if (pRuntimeEnv->pTSBuf) {
SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order);
tsBufNextPos(pRuntimeEnv->pTSBuf);
}
// reverse order time range
pQuery->window.skey = pQuery->lastKey - step;
pQuery->window.ekey = pStatus->lastKey; // the start timestamp of current query
SWITCH_ORDER(pQuery->order.order);
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
STsdbQueryCond cond = {
.twindow = pQuery->window,
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
// clean unused handle
if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
switchCtxOrder(pRuntimeEnv);
disableFuncInReverseScan(pRuntimeEnv);
}
static void clearEnvAfterReverseScan(SQueryRuntimeEnv* pRuntimeEnv, TSKEY lastKey, SQueryStatusInfo* pStatus) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SWITCH_ORDER(pQuery->order.order);
switchCtxOrder(pRuntimeEnv);
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur);
if (pRuntimeEnv->pTSBuf) {
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
}
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query
// during reverse scan
pQuery->lastKey = lastKey;
pQuery->status = pStatus->status;
pQuery->window = pStatus->w;
}
void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
// store the start query position
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
int64_t skey = pQuery->lastKey;
int32_t status = pQuery->status;
int32_t prevSlot = pRuntimeEnv->windowResInfo.curIndex;
SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
while (1) {
doScanAllDataBlocks(pRuntimeEnv);
if (pRuntimeEnv->scanFlag == MASTER_SCAN) {
qstatus.status = pQuery->status;
}
if (!needScanDataBlocksAgain(pRuntimeEnv)) {
// restore the status
// restore the status code and jump out of loop
if (pRuntimeEnv->scanFlag == REPEAT_SCAN) {
pQuery->status = status; // restore the status code when abort from repeat scan
pQuery->status = qstatus.status;
}
break;
}
STsdbQueryCond cond = {
.twindow = {.skey = skey, .ekey = pQuery->lastKey - step},
.twindow = {.skey = qstatus.lastKey, .ekey = pQuery->lastKey - step},
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
if (pRuntimeEnv->pSecQueryHandle == NULL) {
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
status = pQuery->status; // backup the status
pRuntimeEnv->windowResInfo.curIndex = prevSlot;
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN;
......@@ -3541,63 +3473,14 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
return;
}
// save the query time window
STimeWindow prev = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey};
// reverse order time range
pQuery->window.skey = pQuery->lastKey - step;
pQuery->window.ekey = skey;
pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
STsdbQueryCond cond = {
.twindow = pQuery->window,
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
// clean unused handle
if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
TSKEY lastKey = pQuery->lastKey;
setEnvBeforeReverseScan(pRuntimeEnv, &qstatus);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv));
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
int32_t status1 = pQuery->status;
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor
if (pRuntimeEnv->pTSBuf) {
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
tsBufNextPos(pRuntimeEnv->pTSBuf);
}
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
setCtxOrder(pRuntimeEnv);
disableFuncInReverseScan(pRuntimeEnv);
// reverse scan from current position
dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv));
doScanAllDataBlocks(pRuntimeEnv);
pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
tsBufSetCursor(pRuntimeEnv->pTSBuf, &cur);
if (pRuntimeEnv->pTSBuf) {
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
}
setCtxOrder(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query
// during reverse scan
pQuery->lastKey = prev.skey;
pQuery->status = status1;
pQuery->window.ekey = prev.ekey;
clearEnvAfterReverseScan(pRuntimeEnv, lastKey, &qstatus);
}
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
......@@ -4776,7 +4659,7 @@ static void doRestoreContext(SQInfo* pQInfo) {
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1;
}
setCtxOrder(pRuntimeEnv);
switchCtxOrder(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
}
......
......@@ -1049,7 +1049,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) {
if (pTable->mem != NULL) {
// create mem table iterator if it is not created yet
assert(pCheckInfo->iter != NULL);
rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 2, &skey, &ekey, pHandle);
rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 4000, &skey, &ekey, pHandle);
// update the last key value
pCheckInfo->lastKey = ekey + step;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册