提交 5d1a1e2b 编写于 作者: H Haojun Liao

[td-225] refactor code for fill

上级 cf0a90ce
......@@ -324,7 +324,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
tfree(pReducer->discardData);
tfree(pReducer->pResultBuf);
tfree(pReducer->pFinalRes);
// tfree(pReducer->pBufForInterpo);
tfree(pReducer->prevRowOfInput);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -363,7 +362,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->fillType, pFillCol);
4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit,
tinfo.precision, pQueryInfo->fillType, pFillCol);
}
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
......@@ -494,7 +494,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tscTrace("%p waiting for delete procedure, status: %d", pSql, status);
}
taosDestoryFillInfo(pLocalReducer->pFillInfo);
pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
......@@ -980,8 +980,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
}
/* all output for current group are completed */
int32_t totalRemainRows =
taosGetNumOfResultWithFill(pFillInfo, rpoints, pFillInfo->slidingTime, actualETime);
int32_t totalRemainRows = getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity);
if (totalRemainRows <= 0) {
break;
}
......@@ -1267,13 +1266,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
if (pFillInfo != NULL) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
TSKEY ekey = taosGetRevisedEndKey(pQueryInfo->window.ekey, pFillInfo->order, pFillInfo->slidingTime,
pQueryInfo->slidingTimeUnit, tinfo.precision);
taosFillSetStartInfo(pFillInfo, pResBuf->num, ekey);
taosFillSetStartInfo(pFillInfo, pResBuf->num, pQueryInfo->window.ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
......@@ -1327,23 +1320,15 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t p = tinfo.precision;
if (pFillInfo != NULL && taosNumOfRemainRows(pFillInfo) > 0) {
assert(pQueryInfo->fillType != TSDB_FILL_NONE);
tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf;
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1));
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1));
int32_t remain = taosNumOfRemainRows(pFillInfo);
TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, p);
// the first column must be the timestamp column
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
int32_t rows = getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do fill gap
doFillResult(pSql, pLocalReducer, false);
}
......@@ -1362,10 +1347,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) {
......@@ -1373,9 +1355,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->slidingTimeUnit, tinfo.precision);
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity);
assert(pFillInfo->numOfRows == 0);
int32_t rows = getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
doFillResult(pSql, pLocalReducer, true);
}
......
......@@ -50,7 +50,8 @@ typedef struct SFillInfo {
char * nextValues; // next row of data
char** pData; // original result data block involved in filling data
int32_t capacityInRows; // data buffer size in rows
int8_t slidingUnit; // sliding time unit
int8_t precision; // time resoluation
SFillColInfo* pFillCol; // column info for fill operations
} SFillInfo;
......@@ -61,12 +62,13 @@ typedef struct SPoint {
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision);
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity,
int32_t numOfCols, int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol);
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pFillCol);
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosDestoryFillInfo(SFillInfo *pFillInfo);
void* taosDestoryFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
......@@ -74,9 +76,7 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput)
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput);
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision);
int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows);
int64_t getFilledNumOfRes(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
......
......@@ -1466,7 +1466,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->pCtx);
}
taosDestoryFillInfo(pRuntimeEnv->pFillInfo);
pRuntimeEnv->pFillInfo = taosDestoryFillInfo(pRuntimeEnv->pFillInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
......@@ -3557,9 +3557,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
* first result row in the actual result set will fill nothing.
*/
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,
pQuery->slidingTimeUnit, pQuery->precision);
int32_t numOfTotal = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pQuery->rec.capacity);
int32_t numOfTotal = getFilledNumOfRes(pFillInfo, pQuery->window.ekey, pQuery->rec.capacity);
return numOfTotal > 0;
}
......@@ -3601,7 +3599,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
}
}
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t numOfRows, int32_t *numOfInterpo) {
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfInterpo) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
......@@ -4013,7 +4011,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery);
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput,
pQuery->slidingTime, pQuery->fillType, pColInfo);
pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision,
pQuery->fillType, pColInfo);
}
// todo refactor
......@@ -4666,13 +4665,11 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
limitResults(pRuntimeEnv);
break;
} else {
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,
pQuery->slidingTimeUnit, pQuery->precision);
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, ekey);
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey);
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
numOfInterpo = 0;
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, pQuery->rec.rows, &numOfInterpo);
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pRuntimeEnv);
break;
......@@ -4704,8 +4701,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/
int32_t numOfInterpo = 0;
int32_t remain = taosNumOfRemainRows(pRuntimeEnv->pFillInfo);
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, remain, &numOfInterpo);
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo);
if (pQuery->rec.rows > 0) {
limitResults(pRuntimeEnv);
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qfill.h"
#include "os.h"
#include "qfill.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "taosmsg.h"
......@@ -58,7 +58,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, ch
}
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol) {
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) {
if (fillType == TSDB_FILL_NONE) {
return NULL;
}
......@@ -72,8 +72,10 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
pFillInfo->pFillCol = pFillCol;
pFillInfo->numOfTags = numOfTags;
pFillInfo->numOfCols = numOfCols;
pFillInfo->precision = precision;
pFillInfo->slidingTime = slidingTime;
pFillInfo->slidingUnit = slidingUnit;
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
int32_t rowsize = 0;
......@@ -102,9 +104,9 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
pFillInfo->numOfTotal = 0;
}
void taosDestoryFillInfo(SFillInfo* pFillInfo) {
void* taosDestoryFillInfo(SFillInfo* pFillInfo) {
if (pFillInfo == NULL) {
return;
return NULL;
}
tfree(pFillInfo->prevValues);
......@@ -119,6 +121,15 @@ void taosDestoryFillInfo(SFillInfo* pFillInfo) {
tfree(pFillInfo->pFillCol);
tfree(pFillInfo);
return NULL;
}
static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) {
if (order == TSDB_ORDER_ASC) {
return ekey;
} else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
}
}
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
......@@ -126,8 +137,10 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
return;
}
pFillInfo->endKey = taosGetRevisedEndKey(endKey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit,
pFillInfo->precision);
pFillInfo->rowIdx = 0;
pFillInfo->endKey = endKey;
pFillInfo->numOfRows = numOfRows;
// ensure the space
......@@ -165,36 +178,29 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu
}
}
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) {
if (order == TSDB_ORDER_ASC) {
return ekey;
} else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
}
}
int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
int64_t* tsList = (int64_t*) pFillInfo->pData[0];
static int32_t taosGetTotalNumOfFilledRes(SFillInfo* pFillInfo, const TSKEY* tsArray, int32_t remain,
int64_t nInterval, int64_t ekey) {
if (remain > 0) { // still fill gap within current data block, not generating data after the result set.
TSKEY lastKey = tsArray[pFillInfo->numOfRows - 1];
int32_t total = (int32_t)(labs(lastKey - pFillInfo->start) / nInterval) + 1;
int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
TSKEY ekey1 = taosGetRevisedEndKey(ekey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit,
pFillInfo->precision);
assert(total >= remain);
return total;
int64_t numOfRes = -1;
if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set.
TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
numOfRes = (int64_t)(labs(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1;
assert(numOfRes >= numOfRows);
} else { // reach the end of data
if ((ekey < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) ||
(ekey > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) {
if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) ||
(ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) {
return 0;
} else {
return (int32_t)(labs(ekey - pFillInfo->start) / nInterval) + 1;
} else { // the numOfRes rows are all filled with specified policy
numOfRes = (labs(ekey1 - pFillInfo->start) / pFillInfo->slidingTime) + 1;
}
}
}
int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) {
int32_t numOfRes = taosGetTotalNumOfFilledRes(pFillInfo, (int64_t*) pFillInfo->pData[0], numOfRows,
pFillInfo->slidingTime, ekey);
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
}
......@@ -496,8 +502,8 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator?
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity);
int32_t rows = getFilledNumOfRes(pFillInfo, pFillInfo->endKey, capacity);
int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData);
assert(numOfRes == rows);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册