未验证 提交 d5dbbb77 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2609 from taosdata/feature/query

Feature/query
......@@ -87,6 +87,16 @@ typedef struct SVgroupTableInfo {
SArray* itemList; //SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
return pCmd->pQueryInfo[subClauseIndex];
}
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
......
......@@ -477,7 +477,13 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("%p redo parse sql string to build submit block", pSql);
pCmd->parseFinished = false;
if ((code = tsParseSql(pSql, true)) == TSDB_CODE_SUCCESS) {
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
if (code == TSDB_CODE_SUCCESS) {
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
......
......@@ -340,13 +340,12 @@ bool stableQueryFunctChanged(int32_t funcId) {
*/
void resetResultInfo(SResultInfo *pResInfo) { pResInfo->initialized = false; }
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable) {
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf) {
assert(pResInfo->interResultBuf == NULL);
pResInfo->bufLen = size;
pResInfo->superTableQ = superTable;
pResInfo->interResultBuf = calloc(1, (size_t)size);
pResInfo->interResultBuf = buf;
}
// set the query flag to denote that query is completed
......
......@@ -1464,16 +1464,6 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return pQueryInfo->pTableMetaInfo[tableIndex];
}
SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
return pCmd->pQueryInfo[subClauseIndex];
}
int32_t tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) {
int32_t ret = TSDB_CODE_SUCCESS;
......@@ -2097,7 +2087,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
}
void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, columnIndex);//tscFieldInfoGetSupp(pFieldInfo, columnIndex);
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType;
......@@ -2112,7 +2102,7 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
if (isNull(pData, type)) {
pRes->tsrow[columnIndex] = NULL;
} else {
pRes->tsrow[columnIndex] = pData + VARSTR_HEADER_SIZE;
pRes->tsrow[columnIndex] = ((tstr*)pData)->data;
}
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
......
......@@ -172,6 +172,7 @@ typedef struct SQueryRuntimeEnv {
bool topBotQuery; // false
bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv;
......
......@@ -15,6 +15,8 @@
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
int32_t getOutputInterResultBufSize(SQuery* pQuery);
void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes);
void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
......@@ -35,7 +37,7 @@ SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot);
#define curTimeWindow(_winres) ((_winres)->curIndex)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize);
char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult);
......
......@@ -272,7 +272,7 @@ bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *mi
bool stableQueryFunctChanged(int32_t funcId);
void resetResultInfo(SResultInfo *pResInfo);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) {
pResInfo->initialized = true; // the this struct has been initialized flag
......
......@@ -123,6 +123,14 @@ static void setQueryStatus(SQuery *pQuery, int8_t status);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
// previous time window may not be of the same size of pQuery->intervalTime
#define GET_NEXT_TIMEWINDOW(_q, tw) \
do { \
int32_t factor = GET_FORWARD_DIRECTION_FACTOR((_q)->order.order); \
(tw)->skey += ((_q)->slidingTime * factor); \
(tw)->ekey = (tw)->skey + ((_q)->intervalTime - 1); \
} while (0)
// todo move to utility
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group);
......@@ -130,7 +138,6 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
static void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, void *param, int32_t colIndex);
......@@ -419,7 +426,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) {
SPosInfo pos = {-1, -1};
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos);
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos, pRuntimeEnv->interBufSize);
}
pWindowResInfo->capacity = newCap;
}
......@@ -551,19 +558,29 @@ static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int
static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos,
int16_t order, int64_t *pData) {
int32_t endPos = searchFn((char *)pData, numOfRows, ekey, order);
int32_t forwardStep = 0;
if (endPos >= 0) {
forwardStep = (order == TSDB_ORDER_ASC) ? (endPos - pos) : (pos - endPos);
assert(forwardStep >= 0);
if (order == TSDB_ORDER_ASC) {
int32_t end = searchFn((char*) &pData[pos], numOfRows - pos, ekey, order);
if (end >= 0) {
forwardStep = end;
// endPos data is equalled to the key so, we do need to read the element in endPos
if (pData[endPos] == ekey) {
forwardStep += 1;
if (pData[end + pos] == ekey) {
forwardStep += 1;
}
}
} else {
int32_t end = searchFn((char *)pData, pos + 1, ekey, order);
if (end >= 0) {
forwardStep = pos - end;
if (pData[end] == ekey) {
forwardStep += 1;
}
}
}
assert(forwardStep > 0);
return forwardStep;
}
......@@ -686,7 +703,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
}
}
assert(num >= 0);
assert(num > 0);
return num;
}
......@@ -736,59 +753,60 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
}
}
static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNextWin,
SDataBlockInfo *pDataBlockInfo, TSKEY *primaryKeys,
__block_search_fn_t searchFn) {
static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNext, SDataBlockInfo *pDataBlockInfo,
TSKEY *primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// tumbling time window query, a special case of sliding time window query
if (pQuery->slidingTime == pQuery->intervalTime) {
// todo opt
}
getNextTimeWindow(pQuery, pNextWin);
GET_NEXT_TIMEWINDOW(pQuery, pNext);
// next time window is not in current block
if ((pNextWin->skey > pDataBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pNextWin->ekey < pDataBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
if ((pNext->skey > pDataBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pNext->ekey < pDataBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
return -1;
}
TSKEY startKey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
startKey = pNextWin->skey;
startKey = pNext->skey;
if (startKey < pQuery->window.skey) {
startKey = pQuery->window.skey;
}
} else {
startKey = pNextWin->ekey;
startKey = pNext->ekey;
if (startKey > pQuery->window.skey) {
startKey = pQuery->window.skey;
}
}
int32_t startPos = searchFn((char *)primaryKeys, pDataBlockInfo->rows, startKey, pQuery->order.order);
int32_t startPos = 0;
// tumbling time window query, a special case of sliding time window query
if (pQuery->slidingTime == pQuery->intervalTime && prevPosition != -1) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
startPos = prevPosition + factor;
} else {
startPos = searchFn((char *)primaryKeys, pDataBlockInfo->rows, startKey, pQuery->order.order);
}
/*
* This time window does not cover any data, try next time window,
* this case may happen when the time window is too small
*/
if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNextWin->ekey) {
if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNext->ekey) {
TSKEY next = primaryKeys[startPos];
pNextWin->ekey += ((next - pNextWin->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
pNextWin->skey = pNextWin->ekey - pQuery->intervalTime + 1;
} else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNextWin->skey) {
pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
pNext->skey = pNext->ekey - pQuery->intervalTime + 1;
} else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNext->skey) {
TSKEY next = primaryKeys[startPos];
pNextWin->skey -= ((pNextWin->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
pNextWin->ekey = pNextWin->skey + pQuery->intervalTime - 1;
pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
pNext->ekey = pNext->skey + pQuery->intervalTime - 1;
}
return startPos;
}
static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
static FORCE_INLINE TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
ekey = pWindow->ekey;
......@@ -924,20 +942,23 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
return;
}
int32_t forwardStep = 0;
int32_t startPos = pQuery->pos;
if (hasTimeWindow) {
TSKEY ekey = reviseWindowEkey(pQuery, &win);
int32_t forwardStep =
getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, tsCols, pDataBlockInfo->rows);
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
}
int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win;
while (1) {
int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn);
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos);
if (startPos < 0) {
break;
}
......@@ -953,7 +974,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
TSKEY ekey = reviseWindowEkey(pQuery, &nextWin);
int32_t forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
SWindowStatus* pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
......@@ -1224,7 +1245,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
int32_t index = pWindowResInfo->curIndex;
while (1) {
getNextTimeWindow(pQuery, &nextWin);
GET_NEXT_TIMEWINDOW(pQuery, &nextWin);
if (/*pWindowResInfo->startTime > nextWin.skey ||*/
(nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.skey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
......@@ -1236,7 +1257,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// null data, failed to allocate more memory buffer
bool hasTimeWindow = false;
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
break;
}
......@@ -1459,11 +1480,13 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
}
}
static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery, char* buf) {
char* p = buf;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interBytes, isStableQuery);
int32_t size = pQuery->pSelectExpr[i].interBytes;
setResultInfoBuf(&pResultInfo[i], size, isStableQuery, p);
p += size;
}
}
......@@ -1542,8 +1565,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
}
}
char* buf = calloc(1, pRuntimeEnv->interBufSize);
// set the intermediate result output buffer
setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, pRuntimeEnv->stableQuery);
setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, pRuntimeEnv->stableQuery, buf);
// if it is group by normal column, do not set output buffer, the output buffer is pResult
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !pRuntimeEnv->stableQuery) {
......@@ -1581,9 +1606,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tVariantDestroy(&pCtx->tag);
tfree(pCtx->tagInfo.pTagCtxList);
tfree(pRuntimeEnv->resultInfo[i].interResultBuf);
}
tfree(pRuntimeEnv->resultInfo[0].interResultBuf);
tfree(pRuntimeEnv->resultInfo);
tfree(pRuntimeEnv->pCtx);
}
......@@ -2017,14 +2042,6 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
return true;
}
// previous time window may not be of the same size of pQuery->intervalTime
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
pTimeWindow->skey += (pQuery->slidingTime * factor);
pTimeWindow->ekey = pTimeWindow->skey + (pQuery->intervalTime - 1);
}
SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -2737,7 +2754,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery);
char* buf = calloc(1, pRuntimeEnv->interBufSize);
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery, buf);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
int64_t lastTimestamp = -1;
......@@ -2823,11 +2841,9 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
tfree(pTree);
pQInfo->offset = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
tfree(pResultInfo[i].interResultBuf);
}
tfree(pResultInfo);
tfree(buf);
return pQInfo->numOfGroupResultPages;
}
......@@ -2980,14 +2996,16 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) {
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize) {
int32_t numOfCols = pQuery->numOfOutput;
pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo));
pResultRow->pos = *posInfo;
char* buf = calloc(1, interBufSize);
// set the intermediate result output buffer
setWindowResultInfo(pResultRow->resultInfo, pQuery, isSTableQuery);
setWindowResultInfo(pResultRow->resultInfo, pQuery, isSTableQuery, buf);
}
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
......@@ -3365,7 +3383,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
// set more initial size of interval/groupby query
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
int32_t initialSize = 20;
int32_t initialSize = 16;
int32_t initialThreshold = 100;
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
} else { // in other aggregate query, do not initialize the windowResInfo
......@@ -3591,20 +3609,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
return loadPrimaryTS;
}
static int32_t getNumOfSubset(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t totalSubset = 0;
if (pQInfo->runtimeEnv.groupbyNormalCol || (QUERY_IS_INTERVAL_QUERY(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else {
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
}
return totalSubset;
}
static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orderType) {
static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_t orderType) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -3613,17 +3618,18 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
int32_t step = -1;
qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo);
int32_t totalSubset = getNumOfSubset(pQInfo);
int32_t totalSet = numOfClosedTimeWindow(pResultInfo);
SWindowResult* result = pResultInfo->pResult;
if (orderType == TSDB_ORDER_ASC) {
startIdx = pQInfo->groupIndex;
step = 1;
} else { // desc order copy all data
startIdx = totalSubset - pQInfo->groupIndex - 1;
startIdx = totalSet - pQInfo->groupIndex - 1;
step = -1;
}
for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) {
for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) {
if (result[i].numOfRows == 0) {
pQInfo->offset = 0;
pQInfo->groupIndex += 1;
......@@ -3678,11 +3684,11 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
* @param pQInfo
* @param result
*/
void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
int32_t numOfResult = doCopyToSData(pQInfo, result, orderType);
int32_t numOfResult = doCopyToSData(pQInfo, pResultInfo, orderType);
pQuery->rec.rows += numOfResult;
......@@ -4013,7 +4019,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
}
STimeWindow tw = win;
getNextTimeWindow(pQuery, &tw);
GET_NEXT_TIMEWINDOW(pQuery, &tw);
if (pQuery->limit.offset == 0) {
if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
......@@ -4025,7 +4031,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
tw = win;
int32_t startPos =
getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey);
getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
assert(startPos >= 0);
// set the abort info
......@@ -4068,7 +4074,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
tw = win;
int32_t startPos =
getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey);
getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
assert(startPos >= 0);
// set the abort info
......@@ -4197,7 +4203,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type = TSDB_DATA_TYPE_INT; // group id
}
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 32, 4096, type);
}
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
......@@ -4505,7 +4511,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQInfo->groupIndex = 0;
ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size);
copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
copyFromWindowResToSData(pQInfo, pWindowResInfo);
pQInfo->groupIndex = currentGroupIndex; //restore the group index
assert(pQuery->rec.rows == pWindowResInfo->size);
......@@ -4520,7 +4526,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* we need to return it to client in the first place.
*/
if (pQInfo->groupIndex > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
pQuery->rec.total += pQuery->rec.rows;
if (pQuery->rec.rows > 0) {
......@@ -4721,7 +4727,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
#endif
} else {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
}
qDebug("QInfo:%p current:%"PRId64", total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
......@@ -4772,7 +4778,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
#endif
}
} else { // not a interval query
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
}
// handle the limitation of output buffer
......@@ -4927,7 +4933,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
pQInfo->groupIndex = 0; // always start from 0
pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
}
......@@ -4956,7 +4962,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result
pQInfo->groupIndex = 0;
pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
}
......@@ -4988,7 +4994,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
pQInfo->groupIndex = 0; // always start from 0
if (pRuntimeEnv->windowResInfo.size > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
if (pQuery->rec.rows > 0) {
......@@ -5736,7 +5742,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
STimeWindow window = pQueryMsg->window;
taosArraySort(pTableIdList, compareTableIdInfo);
// TODO optimize the STableQueryInfo malloc strategy
pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery);
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
int32_t index = 0;
......
......@@ -17,15 +17,24 @@
#include "hash.h"
#include "taosmsg.h"
#include "qextbuffer.h"
#include "ttime.h"
#include "qfill.h"
#include "ttime.h"
#include "qExecutor.h"
#include "qUtil.h"
int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t size = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
size += pQuery->pSelectExpr[i].interBytes;
}
assert(size > 0);
return size;
}
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type) {
pWindowResInfo->capacity = size;
......@@ -43,7 +52,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->pResult = calloc(threshold, sizeof(SWindowResult));
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
SPosInfo posInfo = {-1, -1};
createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo);
createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo, pRuntimeEnv->interBufSize);
}
return TSDB_CODE_SUCCESS;
......@@ -54,11 +63,7 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
return;
}
// TODO opt malloc strategy
for (int32_t i = 0; i < nOutputCols; ++i) {
free(pWindowRes->resultInfo[i].interResultBuf);
}
free(pWindowRes->resultInfo[0].interResultBuf);
free(pWindowRes->resultInfo);
}
......@@ -241,10 +246,9 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
}
pWindowRes->numOfRows = 0;
// pWindowRes->nAlloc = 0;
pWindowRes->pos = (SPosInfo){-1, -1};
pWindowRes->status.closed = false;
pWindowRes->window = (STimeWindow){0, 0};
pWindowRes->window = TSWINDOW_INITIALIZER;
}
/**
......@@ -254,7 +258,6 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
*/
void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) {
dst->numOfRows = src->numOfRows;
// dst->nAlloc = src->nAlloc;
dst->window = src->window;
dst->status = src->status;
......
system sh/stop_dnodes.sh
system sh/ip.sh -i 1 -s up
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/cfg.sh -n dnode1 -c commitLog -v 0
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
......
sleep 2000
run general/parser/alter.sim
sleep 2000
run general/parser/alter1.sim
......@@ -7,7 +8,6 @@ sleep 2000
run general/parser/auto_create_tb.sim
sleep 2000
run general/parser/auto_create_tb_drop_tb.sim
sleep 2000
run general/parser/col_arithmetic_operation.sim
sleep 2000
......@@ -23,65 +23,61 @@ run general/parser/create_tb.sim
sleep 2000
run general/parser/dbtbnameValidate.sim
sleep 2000
run general/parser/fill.sim
sleep 2000
run general/parser/fill_stb.sim
sleep 2000
#run general/parser/fill_us.sim #
sleep 2000
run general/parser/first_last.sim
sleep 2000
run general/parser/import_commit1.sim
sleep 2000
run general/parser/import_commit2.sim
sleep 2000
run general/parser/import_commit3.sim
sleep 2000
#run general/parser/import_file.sim
sleep 2000
run general/parser/insert_tb.sim
sleep 2000
run general/parser/first_last.sim
run general/parser/tags_dynamically_specifiy.sim
sleep 2000
#run general/parser/import_file.sim
run general/parser/interp.sim
sleep 2000
run general/parser/lastrow.sim
sleep 2000
run general/parser/nchar.sim
sleep 2000
#run general/parser/null_char.sim
run general/parser/limit.sim
sleep 2000
run general/parser/single_row_in_tb.sim
run general/parser/limit1.sim
sleep 2000
run general/parser/select_from_cache_disk.sim
run general/parser/limit1_tblocks100.sim
sleep 2000
run general/parser/selectResNum.sim
run general/parser/limit2.sim
sleep 2000
run general/parser/mixed_blocks.sim
sleep 2000
run general/parser/limit1.sim
run general/parser/nchar.sim
sleep 2000
run general/parser/limit.sim
run general/parser/null_char.sim
sleep 2000
run general/parser/limit1_tblocks100.sim
run general/parser/selectResNum.sim
sleep 2000
run general/parser/select_across_vnodes.sim
sleep 2000
run general/parser/slimit1.sim
sleep 2000
run general/parser/tbnameIn.sim
sleep 2000
run general/parser/projection_limit_offset.sim
sleep 2000
run general/parser/limit2.sim
sleep 2000
run general/parser/fill.sim
run general/parser/select_from_cache_disk.sim
sleep 2000
run general/parser/fill_stb.sim
run general/parser/set_tag_vals.sim
sleep 2000
run general/parser/where.sim
run general/parser/single_row_in_tb.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/interp.sim
run general/parser/slimit1.sim
sleep 2000
run general/parser/tags_dynamically_specifiy.sim
run general/parser/slimit_alter_tags.sim
sleep 2000
run general/parser/groupby.sim
sleep 2000
run general/parser/set_tag_vals.sim
run general/parser/tbnameIn.sim
sleep 2000
run general/parser/slimit_alter_tags.sim # persistent failed
sleep 2000
......@@ -89,11 +85,19 @@ run general/parser/join.sim
sleep 2000
run general/parser/join_multivnode.sim
sleep 2000
run general/parser/repeatAlter.sim
run general/parser/projection_limit_offset.sim
sleep 2000
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/groupby.sim
sleep 2000
run general/parser/union.sim
sleep 2000
run general/parser/sliding.sim
sleep 2000
run general/parser/binary_escapeCharacter.sim
run general/parser/fill_us.sim
sleep 2000
run general/parser/bug.sim
run general/parser/tags_filter.sim
#sleep 2000
#run general/parser/repeatStream.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册