未验证 提交 ab5b0d57 编写于 作者: P plum-lihui 提交者: GitHub

Merge pull request #1281 from taosdata/feature/liaohj

Feature/liaohj
...@@ -584,9 +584,14 @@ static bool isTopBottomQuery(SQueryInfo* pQueryInfo) { ...@@ -584,9 +584,14 @@ static bool isTopBottomQuery(SQueryInfo* pQueryInfo) {
int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
const char* msg1 = "invalid query expression"; const char* msg1 = "invalid query expression";
const char* msg2 = "interval cannot be less than 10 ms"; const char* msg2 = "interval cannot be less than 10 ms";
const char* msg3 = "interval required";
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if (pQuerySql->interval.type == 0 && pQuerySql->sliding.type != 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
if (pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0) { if (pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -88,7 +88,7 @@ int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf); ...@@ -88,7 +88,7 @@ int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf);
* destroy result buffer * destroy result buffer
* @param pResultBuf * @param pResultBuf
*/ */
void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf); void destroyDiskbasedResultBuf(SQueryDiskbasedResultBuf* pResultBuf);
/** /**
* *
......
...@@ -277,9 +277,9 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, con ...@@ -277,9 +277,9 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, con
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size, int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type); int32_t threshold, int16_t type);
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv); void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, int32_t numOfCols);
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo); void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
......
...@@ -585,7 +585,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue ...@@ -585,7 +585,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo); void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
static void destroyTimeWindowRes(SWindowResult *pOneOutputRes, int32_t nOutputCols); static void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols);
static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
int32_t firstSlot = 0; int32_t firstSlot = 0;
...@@ -1740,7 +1740,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat ...@@ -1740,7 +1740,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep; pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1); pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1);
pCtx[k].ptsList = (TSKEY *)((char*)pRuntimeEnv->primaryColBuffer->data + pCtx[k].startOffset * TSDB_KEYSIZE);
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
...@@ -1785,7 +1786,19 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow ...@@ -1785,7 +1786,19 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
return -1; return -1;
} }
TSKEY startKey = QUERY_IS_ASC_QUERY(pQuery) ? pNextWin->skey : pNextWin->ekey; TSKEY startKey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
startKey = pNextWin->skey;
if (startKey < pQuery->skey) {
startKey = pQuery->skey;
}
} else {
startKey = pNextWin->ekey;
if (startKey > pQuery->skey) {
startKey = pQuery->skey;
}
}
int32_t startPos = searchFn((char *)primaryKeys, pBlockInfo->size, startKey, pQuery->order.order); int32_t startPos = searchFn((char *)primaryKeys, pBlockInfo->size, startKey, pQuery->order.order);
/* /*
...@@ -2008,10 +2021,6 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx * ...@@ -2008,10 +2021,6 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size, int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type) { int32_t threshold, int16_t type) {
if (size < threshold) {
size = threshold;
}
pWindowResInfo->capacity = size; pWindowResInfo->capacity = size;
pWindowResInfo->threshold = threshold; pWindowResInfo->threshold = threshold;
...@@ -2025,7 +2034,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun ...@@ -2025,7 +2034,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
// use the pointer arraylist // use the pointer arraylist
pWindowResInfo->pResult = calloc(threshold, sizeof(SWindowResult)); pWindowResInfo->pResult = calloc(threshold, sizeof(SWindowResult));
for (int32_t i = 0; i < threshold; ++i) { for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
SPosInfo posInfo = {-1, -1}; SPosInfo posInfo = {-1, -1};
createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo); createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo);
} }
...@@ -2033,15 +2042,15 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun ...@@ -2033,15 +2042,15 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv) { void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL); assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL);
return; return;
} }
for (int32_t i = 0; i < pWindowResInfo->size; ++i) { for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i]; SWindowResult *pResult = &pWindowResInfo->pResult[i];
destroyTimeWindowRes(pResult, pRuntimeEnv->pQuery->numOfOutputCols); destroyTimeWindowRes(pResult, numOfCols);
} }
taosCleanUpHashTable(pWindowResInfo->hashList); taosCleanUpHashTable(pWindowResInfo->hashList);
...@@ -2846,16 +2855,18 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2846,16 +2855,18 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
return; return;
} }
dTrace("QInfo:%p teardown runtime env", GET_QINFO_ADDR(pRuntimeEnv->pQuery)); SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) {
dTrace("QInfo:%p teardown runtime env", GET_QINFO_ADDR(pQuery));
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
tfree(pRuntimeEnv->colDataBuffer[i]); tfree(pRuntimeEnv->colDataBuffer[i]);
} }
tfree(pRuntimeEnv->secondaryUnzipBuffer); tfree(pRuntimeEnv->secondaryUnzipBuffer);
cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv); cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutputCols);
if (pRuntimeEnv->pCtx != NULL) { if (pRuntimeEnv->pCtx != NULL) {
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
for (int32_t j = 0; j < pCtx->numOfParams; ++j) { for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
...@@ -2873,7 +2884,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2873,7 +2884,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->unzipBuffer); tfree(pRuntimeEnv->unzipBuffer);
if (pRuntimeEnv->pQuery && (!PRIMARY_TSCOL_LOADED(pRuntimeEnv->pQuery))) { if (pQuery && (!PRIMARY_TSCOL_LOADED(pQuery))) {
tfree(pRuntimeEnv->primaryColBuffer); tfree(pRuntimeEnv->primaryColBuffer);
} }
...@@ -2887,13 +2898,14 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2887,13 +2898,14 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosDestoryInterpoInfo(&pRuntimeEnv->interpoInfo); taosDestoryInterpoInfo(&pRuntimeEnv->interpoInfo);
if (pRuntimeEnv->pInterpoBuf != NULL) { if (pRuntimeEnv->pInterpoBuf != NULL) {
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
tfree(pRuntimeEnv->pInterpoBuf[i]); tfree(pRuntimeEnv->pInterpoBuf[i]);
} }
tfree(pRuntimeEnv->pInterpoBuf); tfree(pRuntimeEnv->pInterpoBuf);
} }
destroyDiskbasedResultBuf(pRuntimeEnv->pResultBuf);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
} }
...@@ -4504,19 +4516,19 @@ static void allocMemForInterpo(STableQuerySupportObj *pSupporter, SQuery *pQuery ...@@ -4504,19 +4516,19 @@ static void allocMemForInterpo(STableQuerySupportObj *pSupporter, SQuery *pQuery
static int32_t getInitialPageNum(STableQuerySupportObj *pSupporter) { static int32_t getInitialPageNum(STableQuerySupportObj *pSupporter) {
SQuery *pQuery = pSupporter->runtimeEnv.pQuery; SQuery *pQuery = pSupporter->runtimeEnv.pQuery;
int32_t INITIAL_RESULT_ROWS_VALUE = 16;
int32_t num = 0; int32_t num = 0;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
num = 128; num = 128;
} else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table } else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table
num = pSupporter->numOfMeters; num = MAX(pSupporter->numOfMeters, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset } else { // for super table query, one page for each subset
num = pSupporter->pSidSet->numOfSubSet; num = pSupporter->pSidSet->numOfSubSet;
} }
assert(num > 0); assert(num > 0);
return num; return num;
} }
...@@ -4816,20 +4828,6 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { ...@@ -4816,20 +4828,6 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
pSupporter->pMetersHashTable = NULL; pSupporter->pMetersHashTable = NULL;
} }
if (pSupporter->pSidSet != NULL || isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) ||
isIntervalQuery(pQuery)) {
int32_t size = 0;
if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || isIntervalQuery(pQuery)) {
size = 10000;
} else if (pSupporter->pSidSet != NULL) {
size = pSupporter->pSidSet->numOfSubSet;
}
for (int32_t i = 0; i < size; ++i) {
// destroyTimeWindowRes(&pSupporter->pResult[i], pQInfo->query.numOfOutputCols);
}
}
tSidSetDestroy(&pSupporter->pSidSet); tSidSetDestroy(&pSupporter->pSidSet);
if (pSupporter->pMeterDataInfo != NULL) { if (pSupporter->pMeterDataInfo != NULL) {
...@@ -5874,10 +5872,13 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery ...@@ -5874,10 +5872,13 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
tfree(pTree); tfree(pTree);
tfree(pTableList); tfree(pTableList);
tfree(posList); tfree(posList);
tfree(pResultInfo);
pSupporter->offset = 0; pSupporter->offset = 0;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
tfree(pResultInfo[i].interResultBuf);
}
tfree(pResultInfo);
return pSupporter->numOfGroupResultPages; return pSupporter->numOfGroupResultPages;
} }
...@@ -6345,6 +6346,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -6345,6 +6346,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
int64_t skey = pQuery->lastKey; int64_t skey = pQuery->lastKey;
int32_t status = pQuery->over; int32_t status = pQuery->over;
int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex;
SET_MASTER_SCAN_FLAG(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
...@@ -6370,6 +6372,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -6370,6 +6372,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
status = pQuery->over; status = pQuery->over;
pQuery->ekey = pQuery->lastKey - step; pQuery->ekey = pQuery->lastKey - step;
pQuery->lastKey = pQuery->skey; pQuery->lastKey = pQuery->skey;
pRuntimeEnv->windowResInfo.curIndex = activeSlot;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN; pRuntimeEnv->scanFlag = REPEAT_SCAN;
...@@ -6629,13 +6632,8 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols) ...@@ -6629,13 +6632,8 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols)
if (pMeterQueryInfo == NULL) { if (pMeterQueryInfo == NULL) {
return; return;
} }
// free(pMeterQueryInfo->pageList); cleanupTimeWindowInfo(&pMeterQueryInfo->windowResInfo, numOfCols);
// for (int32_t i = 0; i < numOfCols; ++i) {
// tfree(pMeterQueryInfo->[i].interResultBuf);
// }
// free(pMeterQueryInfo->resultInfo);
free(pMeterQueryInfo); free(pMeterQueryInfo);
} }
......
...@@ -189,7 +189,7 @@ SIDList getDataBufPagesIdList(SQueryDiskbasedResultBuf* pResultBuf, int32_t grou ...@@ -189,7 +189,7 @@ SIDList getDataBufPagesIdList(SQueryDiskbasedResultBuf* pResultBuf, int32_t grou
} }
} }
void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf) { void destroyDiskbasedResultBuf(SQueryDiskbasedResultBuf* pResultBuf) {
if (pResultBuf == NULL) { if (pResultBuf == NULL) {
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册