提交 d7579e02 编写于 作者: H Haojun Liao

[td-225] fix the bugs in fill for interval query

上级 21f0afe5
......@@ -822,7 +822,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
}
}
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
// discard following dataset in the same group and reset the interpolation information
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -865,64 +865,66 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe
}
}
/*
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
* by "interuptHandler" function in shell
*/
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo) {
assert(pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE);
// no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->num;
tFilePage * pBeforeFillData = pLocalReducer->pResultBuf;
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t)pFinalDataPage->num;
tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pBeforeFillData->num;
/* remove the hole in column model */
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t)pBeforeFillData->num;
tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
pRes->numOfRows -= pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
} else {
pQueryInfo->limit.offset -= pRes->numOfRows;
pRes->numOfRows = 0;
}
/* remove the hole in column model */
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
pRes->numOfRows -= pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
} else {
pQueryInfo->limit.offset -= pRes->numOfRows;
pRes->numOfRows = 0;
}
}
pRes->numOfRowsGroup += pRes->numOfRows;
pRes->numOfRowsGroup += pRes->numOfRows;
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
/* impose the limitation of output rows on the final result */
int32_t prevSize = (int32_t)pFinalDataPage->num;
int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit);
assert(overflow < pRes->numOfRows);
// impose the limitation of output rows on the final result
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
int32_t prevSize = (int32_t)pBeforeFillData->num;
int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit);
assert(overflow < pRes->numOfRows);
pRes->numOfRowsGroup = pQueryInfo->limit.limit;
pRes->numOfRows -= overflow;
pFinalDataPage->num -= overflow;
pRes->numOfRowsGroup = pQueryInfo->limit.limit;
pRes->numOfRows -= overflow;
pBeforeFillData->num -= overflow;
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
/* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
}
// set remain data to be discarded, and reset the interpolation information
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
}
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize);
memcpy(pRes->data, pBeforeFillData->data, pRes->numOfRows * pLocalReducer->finalRowSize);
pRes->numOfClauseTotal += pRes->numOfRows;
pFinalDataPage->num = 0;
return;
}
pRes->numOfClauseTotal += pRes->numOfRows;
pBeforeFillData->num = 0;
}
/*
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
* by "interuptHandler" function in shell
*/
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tFilePage *pBeforeFillData = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
int64_t actualETime = MAX(pQueryInfo->window.skey, pQueryInfo->window.ekey);
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
......@@ -960,7 +962,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
break;
}
/* all output for current group are completed */
// all output in current group are completed
int32_t totalRemainRows = (int32_t)getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity);
if (totalRemainRows <= 0) {
break;
......@@ -970,15 +972,16 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
}
if (pRes->numOfRows > 0) {
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRows > pQueryInfo->limit.limit) {
int32_t overflow = (int32_t)(pRes->numOfRows - pQueryInfo->limit.limit);
pRes->numOfRows -= overflow;
pFinalDataPage->num -= overflow;
int32_t currentTotal = pRes->numOfRowsGroup + pRes->numOfRows;
assert(pRes->numOfRows >= 0 && pFinalDataPage->num > 0);
if (pQueryInfo->limit.limit >= 0 && currentTotal > pQueryInfo->limit.limit) {
int32_t overflow = (int32_t)(currentTotal - pQueryInfo->limit.limit);
pRes->numOfRows -= overflow;
assert(pRes->numOfRows >= 0);
/* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo);
}
if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
......@@ -995,7 +998,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
pRes->numOfClauseTotal += pRes->numOfRows;
}
pFinalDataPage->num = 0;
pBeforeFillData->num = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
taosTFree(pResPages[i]);
}
......@@ -1212,7 +1215,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
* @param noMoreCurrentGroupRes
* @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups
*/
bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) {
bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -1246,13 +1249,21 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
#endif
SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
if (pFillInfo != NULL) {
taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
// no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
genFinalResWithoutFill(pRes, pLocalReducer, pQueryInfo);
} else {
SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
if (pFillInfo != NULL) {
taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
}
return true;
}
......@@ -1337,7 +1348,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
if (rows > 0) {
doFillResult(pSql, pLocalReducer, true);
}
}
......@@ -1502,7 +1513,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
*/
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) {
// does not belong to the same group
bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup);
bool notSkipped = genFinalResults(pSql, pLocalReducer, !sameGroup);
// this row needs to discard, since it belongs to the group of previous
if (pLocalReducer->discard && sameGroup) {
......@@ -1571,7 +1582,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
}
if (pLocalReducer->pResultBuf->num) {
doGenerateFinalResults(pSql, pLocalReducer, true);
genFinalResults(pSql, pLocalReducer, true);
}
assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0);
......
......@@ -248,17 +248,17 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
return 0;
}
static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, SFillTagColInfo *pTags, int32_t start, int32_t num) {
for(int32_t j = 0; j < pColInfo->numOfCols; ++j) {
SFillColInfo* pCol = &pColInfo->pFillCol[j];
static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t num) {
for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
SFillColInfo* pCol = &pFillInfo->pFillCol[j];
if (pCol->flag == TSDB_COL_NORMAL) {
continue;
}
char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, num);
for(int32_t i = 0; i < pColInfo->numOfTags; ++i) {
SFillTagColInfo* pTag = &pColInfo->pTags[i];
for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
SFillTagColInfo* pTag = &pFillInfo->pTags[i];
if (pTag->col.colId == pCol->col.colId) {
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
break;
......@@ -267,8 +267,8 @@ static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, SFillTagColInfo
}
}
static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData,
int64_t ts, SFillTagColInfo* pTags, bool outOfBound) {
static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, int64_t ts,
bool outOfBound) {
char* prevValues = pFillInfo->prevValues;
char* nextValues = pFillInfo->nextValues;
......@@ -312,7 +312,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
}
}
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
setTagsValue(pFillInfo, data, *num);
} else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value
if (prevValues != NULL && !outOfBound) {
......@@ -337,7 +337,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
taosDoLinearInterpolation(type, &point1, &point2, &point);
}
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
setTagsValue(pFillInfo, data, *num);
} else {
for (int32_t i = 1; i < numOfValCols; ++i) {
......@@ -352,7 +352,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
}
}
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
setTagsValue(pFillInfo, data, *num);
}
} else { /* fill the default value */
......@@ -363,7 +363,7 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
}
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
setTagsValue(pFillInfo, data, *num);
}
pFillInfo->start += (pFillInfo->slidingTime * step);
......@@ -397,8 +397,6 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
char** nextValues = &pFillInfo->nextValues;
int32_t numOfTags = pFillInfo->numOfTags;
SFillTagColInfo* pTags = pFillInfo->pTags;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
if (numOfRows == 0) {
/*
......@@ -406,7 +404,7 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
*/
while (num < outputRows) {
doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true);
doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, true);
}
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
......@@ -433,12 +431,11 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
doFillResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false);
doFillResultImpl(pFillInfo, data, &num, srcData, ts, false);
}
/* output buffer is full, abort */
if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) ||
(num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) {
if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) {
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return outputRows;
}
......@@ -447,10 +444,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
initBeforeAfterDataBuf(pFillInfo, prevValues);
// assign rows to dst buffer
int32_t i = 0;
for (; i < pFillInfo->numOfCols - numOfTags; ++i) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (pCol->flag == TSDB_COL_TAG) {
continue;
}
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx);
......@@ -472,10 +471,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
}
// set the tag value for final result
setTagsValue(pFillInfo, data, pTags, pFillInfo->numOfCols - numOfTags, num);
setTagsValue(pFillInfo, data, num);
pFillInfo->start += (pFillInfo->slidingTime * step);
pFillInfo->rowIdx += 1;
pFillInfo->numOfCurrent +=1;
num += 1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册