提交 39fe4bc5 编写于 作者: H hjxilinx

fix bugs in group by normal columns.

上级 1e01ce39
...@@ -714,12 +714,13 @@ static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY ...@@ -714,12 +714,13 @@ static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); return BLK_DATA_ALL_NEEDED;
if (pInfo->hasResult != DATA_SET_FLAG) { // SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
return BLK_DATA_ALL_NEEDED; // if (pInfo->hasResult != DATA_SET_FLAG) {
} else { // return BLK_DATA_ALL_NEEDED;
return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; // } else {
} // return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
// }
} }
////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////
......
...@@ -100,10 +100,11 @@ extern "C" { ...@@ -100,10 +100,11 @@ extern "C" {
#define TOP_BOTTOM_QUERY_LIMIT 100 #define TOP_BOTTOM_QUERY_LIMIT 100
enum { enum {
MASTER_SCAN = 0x0, MASTER_SCAN = 0x0u,
SUPPLEMENTARY_SCAN = 0x1, SUPPLEMENTARY_SCAN = 0x1u,
FIRST_STAGE_MERGE = 0x10, REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
SECONDARY_STAGE_MERGE = 0x20, FIRST_STAGE_MERGE = 0x10u,
SECONDARY_STAGE_MERGE = 0x20u,
}; };
#define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0) #define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0)
...@@ -164,7 +165,7 @@ typedef struct SQLFunctionCtx { ...@@ -164,7 +165,7 @@ typedef struct SQLFunctionCtx {
int32_t startOffset; int32_t startOffset;
int32_t size; // number of rows int32_t size; // number of rows
int32_t order; // asc|desc int32_t order; // asc|desc
int32_t scanFlag; // TODO merge with currentStage uint32_t scanFlag; // TODO merge with currentStage
int16_t inputType; int16_t inputType;
int16_t inputBytes; int16_t inputBytes;
......
...@@ -111,8 +111,8 @@ typedef enum { ...@@ -111,8 +111,8 @@ typedef enum {
DISK_DATA_DISCARDED = 0x01, DISK_DATA_DISCARDED = 0x01,
} vnodeDiskLoadStatus; } vnodeDiskLoadStatus;
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) #define IS_MASTER_SCAN(runtime) (((runtime)->scanFlag & 1u) == MASTER_SCAN)
#define IS_SUPPLEMENT_SCAN(runtime) (!IS_MASTER_SCAN(runtime)) #define IS_SUPPLEMENT_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN)
#define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN) #define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
......
...@@ -2138,19 +2138,30 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { ...@@ -2138,19 +2138,30 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
} }
} }
static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) {
if (isNull(pData, type)) { // ignore the null value if (isNull(pData, type)) { // ignore the null value
return -1; return -1;
} }
int32_t GROUPRESULTID = 1;
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes); SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes);
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return -1; return -1;
} }
// not assign result buffer yet, add new result buffer
if (pWindowRes->pos.pageId == -1) {
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage);
if (ret != 0) {
return -1;
}
}
setWindowResOutputBuf(pRuntimeEnv, pWindowRes); setWindowResOutputBuf(pRuntimeEnv, pWindowRes);
initCtxOutputBuf(pRuntimeEnv); initCtxOutputBuf(pRuntimeEnv);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2368,7 +2379,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -2368,7 +2379,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
if (groupbyStateValue) { if (groupbyStateValue) {
char *stateVal = groupbyColumnData + bytes * offset; char *stateVal = groupbyColumnData + bytes * offset;
int32_t ret = setGroupResultFromKey(pRuntimeEnv, stateVal, type, bytes); int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, stateVal, type, bytes);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue; continue;
} }
...@@ -2495,7 +2506,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockI ...@@ -2495,7 +2506,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockI
} }
TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst;
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); //todo refactor merge
// interval query with limit applied // interval query with limit applied
if (isIntervalQuery(pQuery) && pQuery->limit.limit > 0 && if (isIntervalQuery(pQuery) && pQuery->limit.limit > 0 &&
...@@ -4815,12 +4826,6 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { ...@@ -4815,12 +4826,6 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
} }
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
// }
// } else {
// ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
} }
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true);
...@@ -5702,7 +5707,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery ...@@ -5702,7 +5707,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes); char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes);
TSKEY ts = GET_INT64_VAL(b); TSKEY ts = GET_INT64_VAL(b);
assert(ts > 0 && ts == pWindowRes->window.skey); assert(ts == pWindowRes->window.skey);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes); int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes);
if (num <= 0) { if (num <= 0) {
cs.position[pos] += 1; cs.position[pos] += 1;
...@@ -6243,7 +6248,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -6243,7 +6248,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) { void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
...@@ -6314,48 +6319,6 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -6314,48 +6319,6 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
return maxOutput; return maxOutput;
} }
/*
* forward the query range for next interval query
*/
// void forwardIntervalQueryRange(STableQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv) {
// SQuery *pQuery = pRuntimeEnv->pQuery;
// if (pQuery->slidingTime > 0 && isIntervalQuery(pQuery)) {
// if ((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->ekey) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->ekey)) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// } else {
// /*TSKEY nextTimestamp =*/loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
// }
//
// return;
// }
//
// // int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey);
// // if (r == QUERY_COMPLETED) {
// // setQueryStatus(pQuery, QUERY_COMPLETED);
// // return;
// // }
// //
// // getNextTimeWindow(pRuntimeEnv, &pRuntimeEnv->intervalWindow);
// //
// // /* ensure the search in cache will return right position */
// // pQuery->lastKey = pQuery->skey;
// //
// // TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
// // if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
// // (nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) ||
// // Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
// // setQueryStatus(pQuery, QUERY_COMPLETED);
// // return;
// // }
// //
// // // bridge the gap in group by time function
// // if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
// // (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
// // getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey);
// // }
//}
static int32_t offsetComparator(const void *pLeft, const void *pRight) { static int32_t offsetComparator(const void *pLeft, const void *pRight) {
SMeterDataInfo **pLeft1 = (SMeterDataInfo **)pLeft; SMeterDataInfo **pLeft1 = (SMeterDataInfo **)pLeft;
SMeterDataInfo **pRight1 = (SMeterDataInfo **)pRight; SMeterDataInfo **pRight1 = (SMeterDataInfo **)pRight;
...@@ -7072,7 +7035,7 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO ...@@ -7072,7 +7035,7 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO
SWindowResInfo *pWindowResInfo = &pMeterQueryInfo->windowResInfo; SWindowResInfo *pWindowResInfo = &pMeterQueryInfo->windowResInfo;
doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey); doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey);
pWindowResInfo->startTime = windowSKey; pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp
// assert(pWindowResInfo->startTime > 0); // assert(pWindowResInfo->startTime > 0);
if (pWindowResInfo->prevSKey == 0) { if (pWindowResInfo->prevSKey == 0) {
......
...@@ -1174,10 +1174,11 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { ...@@ -1174,10 +1174,11 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
} }
// all data scanned, the group by normal column can return // all data scanned, the group by normal column can return
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {//todo refactor with merge interval time result
pSupporter->subgroupIdx = 0; pSupporter->subgroupIdx = 0;
pQuery->pointsRead = 0; pQuery->pointsRead = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx);
} }
pQInfo->pointsRead += pQuery->pointsRead; pQInfo->pointsRead += pQuery->pointsRead;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册