提交 7874adec 编写于 作者: H hjxilinx

fix bugs in handling the sliding query.

上级 08df5bf9
...@@ -56,7 +56,7 @@ typedef enum { ...@@ -56,7 +56,7 @@ typedef enum {
* the program will call this function again, if this status is set. * the program will call this function again, if this status is set.
* used to transfer from QUERY_RESBUF_FULL * used to transfer from QUERY_RESBUF_FULL
*/ */
QUERY_NOT_COMPLETED = 0x1, QUERY_NOT_COMPLETED = 0x1u,
/* /*
* output buffer is full, so, the next query will be employed, * output buffer is full, so, the next query will be employed,
...@@ -66,7 +66,7 @@ typedef enum { ...@@ -66,7 +66,7 @@ typedef enum {
* this status is only exist in group-by clause and * this status is only exist in group-by clause and
* diff/add/division/multiply/ query. * diff/add/division/multiply/ query.
*/ */
QUERY_RESBUF_FULL = 0x2, QUERY_RESBUF_FULL = 0x2u,
/* /*
* query is over * query is over
...@@ -76,14 +76,13 @@ typedef enum { ...@@ -76,14 +76,13 @@ typedef enum {
* 2. when the query range on timestamp is satisfied, it is also denoted as * 2. when the query range on timestamp is satisfied, it is also denoted as
* query_compeleted * query_compeleted
*/ */
QUERY_COMPLETED = 0x4, QUERY_COMPLETED = 0x4u,
/* /*
* all data has been scanned, so current search is stopped, * all data has been scanned, so current search is stopped,
* At last, the function will transfer this status to QUERY_COMPLETED * At last, the function will transfer this status to QUERY_COMPLETED
*/ */
QUERY_NO_DATA_TO_CHECK = 0x8, QUERY_NO_DATA_TO_CHECK = 0x8u,
} vnodeQueryStatus; } vnodeQueryStatus;
typedef struct SPointInterpoSupporter { typedef struct SPointInterpoSupporter {
...@@ -170,7 +169,7 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); ...@@ -170,7 +169,7 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter); int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter);
void copyFromGroupBuf(SQInfo* pQInfo, SWindowResult* result); void copyFromWindowResToSData(SQInfo* pQInfo, SWindowResult* result);
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType); SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType);
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot); SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot);
...@@ -291,6 +290,8 @@ int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRun ...@@ -291,6 +290,8 @@ int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRun
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv); void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv);
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo); void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
......
...@@ -663,7 +663,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { ...@@ -663,7 +663,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
* we need to return it to client in the first place. * we need to return it to client in the first place.
*/ */
if (pSupporter->subgroupIdx > 0) { if (pSupporter->subgroupIdx > 0) {
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQInfo->pointsRead += pQuery->pointsRead; pQInfo->pointsRead += pQuery->pointsRead;
if (pQuery->pointsRead > 0) { if (pQuery->pointsRead > 0) {
...@@ -814,7 +814,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { ...@@ -814,7 +814,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
pQInfo->pTableQuerySupporter->subgroupIdx = 0; pQInfo->pTableQuerySupporter->subgroupIdx = 0;
pQuery->pointsRead = 0; pQuery->pointsRead = 0;
copyFromGroupBuf(pQInfo, pWindowResInfo->pResult); copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
} }
pQInfo->pointsRead += pQuery->pointsRead; pQInfo->pointsRead += pQuery->pointsRead;
...@@ -915,7 +915,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { ...@@ -915,7 +915,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len);
#endif #endif
} else { } else {
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
} }
pQInfo->pointsRead += pQuery->pointsRead; pQInfo->pointsRead += pQuery->pointsRead;
...@@ -979,7 +979,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { ...@@ -979,7 +979,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
#endif #endif
} }
} else { // not a interval query } else { // not a interval query
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
} }
// handle the limitation of output buffer // handle the limitation of output buffer
...@@ -1090,7 +1090,6 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter ...@@ -1090,7 +1090,6 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
while (1) { while (1) {
initCtxOutputBuf(pRuntimeEnv); initCtxOutputBuf(pRuntimeEnv);
clearClosedTimeWindow(pRuntimeEnv);
vnodeScanAllData(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv);
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
...@@ -1101,18 +1100,19 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter ...@@ -1101,18 +1100,19 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
doFinalizeResult(pRuntimeEnv); doFinalizeResult(pRuntimeEnv);
int64_t maxOutput = getNumOfResult(pRuntimeEnv); // int64_t maxOutput = getNumOfResult(pRuntimeEnv);
// here we can ignore the records in case of no interpolation // here we can ignore the records in case of no interpolation
// todo handle offset, in case of top/bottom interval query
if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 &&
pQuery->interpoType == TSDB_INTERPO_NONE) { pQuery->interpoType == TSDB_INTERPO_NONE) {
// maxOutput <= 0, means current query does not generate any results // maxOutput <= 0, means current query does not generate any results
// todo handle offset, in case of top/bottom interval query int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo);
if (maxOutput > 0) {
pQuery->limit.offset--; int32_t c = MIN(numOfClosed, pQuery->limit.offset);
} clearFirstNTimeWindow(pRuntimeEnv, c);
pQuery->limit.offset -= c;
} else { } else {
// assert(0);
// pQuery->pointsRead += maxOutput; // pQuery->pointsRead += maxOutput;
// forwardCtxOutputBuf(pRuntimeEnv, maxOutput); // forwardCtxOutputBuf(pRuntimeEnv, maxOutput);
} }
...@@ -1126,16 +1126,16 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter ...@@ -1126,16 +1126,16 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
break; break;
} }
/* // /*
* the scan limitation mechanism is upon here, // * the scan limitation mechanism is upon here,
* 1. since there is only one(k) record is generated in one scan operation // * 1. since there is only one(k) record is generated in one scan operation
* 2. remain space is not sufficient for next query output, abort // * 2. remain space is not sufficient for next query output, abort
*/ // */
if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) || // if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) ||
((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) { // ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL); // setQueryStatus(pQuery, QUERY_RESBUF_FULL);
break; // break;
} // }
} }
} }
...@@ -1154,9 +1154,11 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { ...@@ -1154,9 +1154,11 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
vnodeSingleMeterIntervalMainLooper(pSupporter, pRuntimeEnv); vnodeSingleMeterIntervalMainLooper(pSupporter, pRuntimeEnv);
if (pQuery->intervalTime > 0) { if (pQuery->intervalTime > 0) {
pSupporter->subgroupIdx = 0; pSupporter->subgroupIdx = 0; // always start from 0
pQuery->pointsRead = 0; pQuery->pointsRead = 0;
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx);
} }
// the offset is handled at prepare stage if no interpolation involved // the offset is handled at prepare stage if no interpolation involved
...@@ -1190,7 +1192,7 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { ...@@ -1190,7 +1192,7 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
pSupporter->subgroupIdx = 0; pSupporter->subgroupIdx = 0;
pQuery->pointsRead = 0; pQuery->pointsRead = 0;
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
} }
pQInfo->pointsRead += pQuery->pointsRead; pQInfo->pointsRead += pQuery->pointsRead;
...@@ -1220,13 +1222,14 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { ...@@ -1220,13 +1222,14 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
SQuery * pQuery = &pQInfo->query; SQuery * pQuery = &pQInfo->query;
SMeterObj *pMeterObj = pQInfo->pObj; SMeterObj *pMeterObj = pQInfo->pObj;
STableQuerySupportObj* pSupporter = pQInfo->pTableQuerySupporter;
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
assert(pRuntimeEnv->pMeterObj == pMeterObj);
dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid, dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pMeterObj->numOfQueries, pQInfo); pMeterObj->meterId, pMeterObj->numOfQueries, pQInfo);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pTableQuerySupporter->runtimeEnv;
assert(pRuntimeEnv->pMeterObj == pMeterObj);
if (vnodeHasRemainResults(pQInfo)) { if (vnodeHasRemainResults(pQInfo)) {
/* /*
* There are remain results that are not returned due to result interpolation * There are remain results that are not returned due to result interpolation
...@@ -1258,12 +1261,16 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { ...@@ -1258,12 +1261,16 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
// here we have scan all qualified data in both data file and cache // here we have scan all qualified data in both data file and cache
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
// continue to get push data from the group result // continue to get push data from the group result
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || pQuery->intervalTime > 0) {
pQuery->pointsRead = 0; pQuery->pointsRead = 0;
if (pQInfo->pTableQuerySupporter->subgroupIdx > 0) { pSupporter->subgroupIdx = 0; // always start from 0
copyFromGroupBuf(pQInfo, pQInfo->pTableQuerySupporter->runtimeEnv.windowResInfo.pResult);
if (pRuntimeEnv->windowResInfo.size > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQInfo->pointsRead += pQuery->pointsRead; pQInfo->pointsRead += pQuery->pointsRead;
clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx);
if (pQuery->pointsRead > 0) { if (pQuery->pointsRead > 0) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d totalReturn:%d", dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d totalReturn:%d",
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead,
...@@ -1281,7 +1288,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { ...@@ -1281,7 +1288,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pQInfo->pointsRead); pMeterObj->meterId, pQInfo->pointsRead);
vnodePrintQueryStatistics(pQInfo->pTableQuerySupporter); vnodePrintQueryStatistics(pSupporter);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
vnodeDecRefCount(pQInfo); vnodeDecRefCount(pQInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册