提交 6a150f01 编写于 作者: H Haojun Liao

[td-225] opt query perf.

上级 b1f85ec0
......@@ -235,9 +235,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
* Get current data block information
*
* @param pQueryHandle
* @param pBlockInfo
* @return
*/
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle);
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo* pBlockInfo);
/**
*
......
......@@ -119,7 +119,6 @@ static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
static void setQueryStatus(SQuery *pQuery, int8_t status);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
// todo move to utility
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group);
......@@ -562,7 +561,7 @@ static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t sea
*/
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!isIntervalQuery(pQuery))) {
if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!QUERY_IS_INTERVAL_QUERY(pQuery))) {
return pWindowResInfo->size;
}
......@@ -1178,7 +1177,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// interval window query
if (isIntervalQuery(pQuery)) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
// decide the time window according to the primary timestamp
int64_t ts = tsCols[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
......@@ -1282,7 +1281,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// interval query with limit applied
int32_t numOfRes = 0;
if (isIntervalQuery(pQuery)) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
} else {
numOfRes = getNumOfResult(pRuntimeEnv);
......@@ -1868,7 +1867,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
num = 128;
} else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table
} else if (QUERY_IS_INTERVAL_QUERY(pQuery)) { // time window query, allocate one page for each table
size_t s = pQInfo->tableqinfoGroupInfo.numOfTables;
num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset
......@@ -2018,7 +2017,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId);
}
if (pRuntimeEnv->pTSBuf > 0 || isIntervalQuery(pQuery)) {
if (pRuntimeEnv->pTSBuf > 0 || QUERY_IS_INTERVAL_QUERY(pQuery)) {
r |= BLK_DATA_ALL_NEEDED;
}
}
......@@ -2202,13 +2201,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->order.order);
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
SDataBlockInfo blockInfo = {0};
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return 0;
}
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
// todo extract methods
if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
......@@ -2913,7 +2914,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
// group by normal columns and interval query on normal table
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order);
} else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
......@@ -3088,7 +3089,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false;
if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
......@@ -3280,7 +3281,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pRuntimeEnv->groupbyNormalCol) {
......@@ -3326,9 +3327,9 @@ static bool hasMainOutput(SQuery *pQuery) {
}
static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win, void* buf) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo *pTableQueryInfo = buf;//calloc(1, sizeof(STableQueryInfo));
STableQueryInfo *pTableQueryInfo = buf;
pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey;
......@@ -3336,16 +3337,14 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
pTableQueryInfo->pTable = pTable;
pTableQueryInfo->cur.vgroupIndex = -1;
int32_t initialSize = 1;
int32_t initialThreshold = 1;
// set more initial size of interval/groupby query
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
initialSize = 20;
initialThreshold = 100;
int32_t initialSize = 20;
int32_t initialThreshold = 100;
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
} else { // in other aggregate query, do not initialize the windowResInfo
}
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
return pTableQueryInfo;
}
......@@ -3569,7 +3568,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t totalSubset = 0;
if (pQInfo->runtimeEnv.groupbyNormalCol || (isIntervalQuery(pQuery))) {
if (pQInfo->runtimeEnv.groupbyNormalCol || (QUERY_IS_INTERVAL_QUERY(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else {
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
......@@ -3734,7 +3733,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
} else {
// there are results waiting for returned to client.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) &&
(pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) &&
(pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) &&
(pRuntimeEnv->windowResInfo.size > 0)) {
return true;
}
......@@ -3917,12 +3916,13 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
STableQueryInfo* pTableQueryInfo = pQuery->current;
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
SDataBlockInfo blockInfo = {0};
while (tsdbNextDataBlock(pQueryHandle)) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return;
}
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
if (pQuery->limit.offset > blockInfo.rows) {
pQuery->limit.offset -= blockInfo.rows;
......@@ -3959,8 +3959,9 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
STableQueryInfo *pTableQueryInfo = pQuery->current;
SDataBlockInfo blockInfo = {0};
while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle);
tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo);
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
......@@ -4066,7 +4067,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
return;
}
if (isSTableQuery && (!isIntervalQuery(pQuery)) && (!isFixedOutputQuery(pQuery))) {
if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) {
return;
}
......@@ -4080,7 +4081,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
if (!isSTableQuery
&& (pQInfo->tableqinfoGroupInfo.numOfTables == 1)
&& (cond.order == TSDB_ORDER_ASC)
&& (!isIntervalQuery(pQuery))
&& (!QUERY_IS_INTERVAL_QUERY(pQuery))
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
&& (!isFixedOutputQuery(pQuery))
) {
......@@ -4173,7 +4174,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
}
} else if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
int32_t rows = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
......@@ -4224,13 +4225,15 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
int64_t st = taosGetTimestampMs();
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
SDataBlockInfo blockInfo = {0};
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (isQueryKilled(pQInfo)) {
break;
}
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
STableQueryInfo **pTableQueryInfo = (STableQueryInfo**) taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
if(pTableQueryInfo == NULL) {
break;
......@@ -4243,7 +4246,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
if (!pRuntimeEnv->groupbyNormalCol) {
if (!isIntervalQuery(pQuery)) {
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
} else { // interval query
......@@ -4654,7 +4657,7 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (isIntervalQuery(pQuery)) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray *group = GET_TABLEGROUP(pQInfo, i);
......@@ -4680,7 +4683,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
* if the groupIndex > 0, the query process must be completed yet, we only need to
* copy the data into output buffer
*/
if (isIntervalQuery(pQuery)) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
......@@ -4889,7 +4892,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
while (1) {
tableIntervalProcessImpl(pRuntimeEnv, newStartKey);
if (isIntervalQuery(pQuery)) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
pQInfo->groupIndex = 0; // always start from 0
pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
......
......@@ -249,8 +249,6 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
pCheckInfo->initBuf = true;
int32_t order = pHandle->order;
// tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem);
// no data in buffer, abort
if (pHandle->mem == NULL && pHandle->imem == NULL) {
return false;
......@@ -392,8 +390,11 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
STable* pTable = pCheckInfo->pTableObj;
assert(pTable != NULL);
initTableMemIterator(pHandle, pCheckInfo);
if (!pCheckInfo->initBuf) {
initTableMemIterator(pHandle, pCheckInfo);
}
SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (row == NULL) {
return false;
......@@ -1477,7 +1478,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0);
SDataBlockInfo blockInfo = {0};
if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->order = TSDB_ORDER_DESC;
......@@ -1487,7 +1489,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
}
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle);
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, sa);
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
......@@ -1558,7 +1560,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
assert(ret);
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle);
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pSecQueryHandle, sa);
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -1750,7 +1752,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
return numOfRows;
}
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* pDataBlockInfo) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
SQueryFilePos* cur = &pHandle->cur;
STable* pTable = NULL;
......@@ -1763,16 +1765,12 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
pTable = pCheckInfo->pTableObj;
}
SDataBlockInfo blockInfo = {
.uid = pTable->tableId.uid,
.tid = pTable->tableId.tid,
.rows = cur->rows,
.window = cur->win,
.numOfCols = QH_GET_NUM_OF_COLS(pHandle),
};
return blockInfo;
pDataBlockInfo->uid = pTable->tableId.uid;
pDataBlockInfo->tid = pTable->tableId.tid;
pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win;
pDataBlockInfo->numOfCols = QH_GET_NUM_OF_COLS(pHandle);
}
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册