提交 6c90a198 编写于 作者: H hjxilinx

support the interpolation search for interval query

上级 2fc2fb16
......@@ -20,6 +20,7 @@
#include "thistogram.h"
#include "tinterpolation.h"
#include "tlog.h"
#include "tpercentile.h"
#include "tscJoinProcess.h"
#include "tscSyntaxtreefunction.h"
#include "tscompression.h"
......@@ -27,7 +28,6 @@
#include "ttime.h"
#include "ttypes.h"
#include "tutil.h"
#include "tpercentile.h"
#define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes)
......@@ -4104,8 +4104,6 @@ static void twa_function(SQLFunctionCtx *pCtx) {
if (pResInfo->superTableQ) {
memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo));
}
// pCtx->numOfIteratedElems += notNullElems;
}
static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
......@@ -4138,7 +4136,6 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
pInfo->lastKey = primaryKey[index];
setTWALastVal(pCtx, pData, 0, pInfo);
// pCtx->numOfIteratedElems += 1;
pResInfo->hasResult = DATA_SET_FLAG;
if (pResInfo->superTableQ) {
......@@ -4403,10 +4400,8 @@ static double do_calc_rate(const SRateInfo* pRateInfo) {
}
}
int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey;
duration = (duration + 500) / 1000;
double resultVal = ((double)diff) / duration;
double duration = (pRateInfo->lastKey - pRateInfo->firstKey) / 1000;
double resultVal = diff / duration;
pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%f lastValue:%f CorrectionValue:%f resultVal:%f",
pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal);
......@@ -4447,62 +4442,156 @@ static void rate_function(SQLFunctionCtx *pCtx) {
TSKEY *primaryKey = pCtx->ptsList;
pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
for (int32_t i = 0; i < pCtx->size; ++i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
pTrace("%p rate_function() index of null data:%d", pCtx, i);
continue;
if (pCtx->order == TSQL_SO_ASC) {
// prev interpolation exists
if (pCtx->prev.key != -1) {
pRateInfo->firstValue = pCtx->prev.data;
pRateInfo->firstKey = pCtx->prev.key;
pCtx->prev.key = -1; // clear the flag
}
notNullElems++;
for (int32_t i = 0; i < pCtx->size; ++i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
pTrace("%p rate_function() index of null data:%d", pCtx, i);
continue;
}
double v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = (double)GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = (double)GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_INT:
v = (double)GET_INT32_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (double)GET_INT64_VAL(pData);
break;
case TSDB_DATA_TYPE_FLOAT:
v = (double)GET_FLOAT_VAL(pData);
break;
case TSDB_DATA_TYPE_DOUBLE:
v = (double)GET_DOUBLE_VAL(pData);
break;
default:
assert(0);
notNullElems++;
double v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = (double)GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = (double)GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_INT:
v = (double)GET_INT32_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (double)GET_INT64_VAL(pData);
break;
case TSDB_DATA_TYPE_FLOAT:
v = (double)GET_FLOAT_VAL(pData);
break;
case TSDB_DATA_TYPE_DOUBLE:
v = (double)GET_DOUBLE_VAL(pData);
break;
default:
assert(0);
}
if ((-DBL_MAX == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[i];
pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey);
}
if (-DBL_MAX == pRateInfo->lastValue) {
pRateInfo->lastValue = v;
} else if (v < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
pTrace("lastValue:%f lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey);
}
if ((-DBL_MAX == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[i];
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems);
}
if (pCtx->next.key != -1) {
if (pCtx->next.data < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->lastValue = pCtx->next.data;
pRateInfo->lastKey = pCtx->next.key;
pCtx->next.key = -1;
}
} else {
if (pCtx->next.key != -1) {
pRateInfo->lastValue = pCtx->next.data;
pRateInfo->lastKey = pCtx->next.key;
pCtx->next.key = -1;
}
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
pTrace("%p rate_function() index of null data:%d", pCtx, i);
continue;
}
notNullElems++;
double v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = (double)GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = (double)GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_INT:
v = (double)GET_INT32_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (double)GET_INT64_VAL(pData);
break;
case TSDB_DATA_TYPE_FLOAT:
v = (double)GET_FLOAT_VAL(pData);
break;
case TSDB_DATA_TYPE_DOUBLE:
v = (double)GET_DOUBLE_VAL(pData);
break;
default:
assert(0);
}
if ((-DBL_MAX == pRateInfo->lastValue) || (INT64_MIN == pRateInfo->lastKey)) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey);
}
if (-DBL_MAX == pRateInfo->firstValue) {
pRateInfo->firstValue = v;
} else if (v > pRateInfo->firstValue) {
pRateInfo->CorrectionValue += pRateInfo->firstValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[i];
pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey);
}
if (-DBL_MAX == pRateInfo->lastValue) {
pRateInfo->lastValue = v;
} else if (v < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems);
}
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
pTrace("lastValue:%f lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey);
}
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems);
}
if (pCtx->prev.key != -1) {
if (pCtx->prev.data > pRateInfo->firstValue) {
pRateInfo->CorrectionValue += pRateInfo->firstValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->firstValue = pCtx->prev.data;
pRateInfo->firstKey = pCtx->prev.key;
pCtx->prev.key = -1;
}
};
SET_VAL(pCtx, notNullElems, 1);
......
......@@ -83,6 +83,8 @@ int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoTyp
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point);
#ifdef __cplusplus
}
#endif
......
......@@ -169,7 +169,7 @@ typedef struct SExtTagsInfo {
typedef struct SBoundaryData {
TSKEY key;
char* data;
double data;
} SBoundaryData;
// sql function runtime context
......@@ -200,8 +200,8 @@ typedef struct SQLFunctionCtx {
SResultInfo *resultInfo;
SExtTagsInfo tagInfo;
SBoundaryData beforeRow; // this value may be less or equalled to the start time of time window
SBoundaryData afterRow; // this value may be greater or equalled to the end time of time window
SBoundaryData prev; // this value may be less or equalled to the start time of time window
SBoundaryData next; // this value may be greater or equalled to the end time of time window
} SQLFunctionCtx;
typedef struct SQLAggFuncElem {
......
......@@ -172,15 +172,16 @@ typedef struct SQueryRuntimeEnv {
SWindowResInfo windowResInfo;
// require time stamp that are direct before/after query time window
bool boundaryExternalTS;
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
bool stableQuery; // is super table query or not
SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool hasTimeWindow;
char** lastRowInBlock;
bool interpoSearch;
/*
* Temporarily hold the in-memory cache block info during scan cache blocks
* Here we do not use the cache block info from pMeterObj, simple because it may change anytime
......@@ -188,10 +189,6 @@ typedef struct SQueryRuntimeEnv {
* So we keep a copy of the support structure as well as the cache block data itself.
*/
SCacheBlock cacheBlock;
SPointInterpoSupporter* pInterpoSupporter;
bool hasTimeWindow;
bool interpoSearch;
} SQueryRuntimeEnv;
/* intermediate pos during multimeter query involves interval */
......
......@@ -1754,7 +1754,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1);
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0 || functionId == TSDB_FUNC_RATE) {
pCtx[k].ptsList = (TSKEY *)((char*)pRuntimeEnv->primaryColBuffer->data + pCtx[k].startOffset * TSDB_KEYSIZE);
}
......@@ -1841,6 +1841,31 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
}
}
static void handleInBlockEkeyInterpolation(SQueryRuntimeEnv* pRuntimeEnv, int32_t endPos,
const TSKEY* primaryKeyCol, STimeWindow* win, SQLFunctionCtx* pCtx) {
// this query time window ended in the current data block
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY lastKey = primaryKeyCol[endPos];
TSKEY e = win->skey + pQuery->intervalTime;
TSKEY next = primaryKeyCol[endPos + 1];
// the next key is beyond the query time range
if ((next > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (next > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
pCtx->next.key = -1;
return;
}
pCtx->next.key = e;
char *d = pCtx->aInputElemBuf + pCtx->inputBytes * endPos;
SPoint point1 = (SPoint){.key = lastKey, .val = d};
SPoint point2 = (SPoint){.key = next, .val = (d + pCtx->inputBytes)};
SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
}
static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
......@@ -1858,6 +1883,208 @@ static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
return ekey;
}
static void interpolateEndKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, STimeWindow* win,
int32_t endPos, SQLFunctionCtx* pCtx, int32_t index) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY *primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
// if current query window beyonds the whole query window, do not employ the interpolation
if ((win->ekey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(win->ekey >= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
pCtx->next.key = -1;
return;
}
if (!QUERY_IS_ASC_QUERY(pQuery) && win->skey >= pBlockInfo->keyLast) {
pCtx->next.key = -1;
return;
}
if (QUERY_IS_ASC_QUERY(pQuery)) {
/*
* the time window closed before current data block, use the interpolation to generate
* the final result part, endPos equals to -1 means that this time window ends before current data block.
*/
if (win->ekey < pBlockInfo->keyFirst) {
assert(endPos == -1);
TSKEY prev = *(int64_t*) pRuntimeEnv->lastRowInBlock[0];
TSKEY next = pBlockInfo->keyFirst;
pCtx->next.key = win->skey + pQuery->intervalTime;
char *d = pCtx->aInputElemBuf;
SPoint point1 = (SPoint){.key = prev, .val = pRuntimeEnv->lastRowInBlock[index]};
SPoint point2 = (SPoint){.key = next, .val = d};
SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
} else if (win->ekey < pBlockInfo->keyLast) {
handleInBlockEkeyInterpolation(pRuntimeEnv, endPos, primaryKeyCol, win, pCtx);
} else {
//do nothing now, the interpolation will be handled before processing the next data block
assert(win->ekey >= pBlockInfo->keyLast);
pCtx->next.key = -1;
}
} else { // desc order query
//the time window closed before current data block, use the interpolation to generate the final result part.
if (win->ekey >= pBlockInfo->keyLast) {
TSKEY prev = pBlockInfo->keyLast;
TSKEY next = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0];
pCtx->next.key = win->skey + pQuery->intervalTime;
char *d = pCtx->aInputElemBuf + (pBlockInfo->size - 1) * pCtx->inputBytes;
SPoint point1 = (SPoint){.key = prev, .val = d};
SPoint point2 = (SPoint){.key = next, .val = pRuntimeEnv->lastRowInBlock[index]};
SPoint point = (SPoint){.key = pCtx->next.key, .val = &pCtx->next.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
} else if (win->ekey < pBlockInfo->keyLast) {
handleInBlockEkeyInterpolation(pRuntimeEnv, endPos, primaryKeyCol, win, pCtx);
} else {
pCtx->next.key = -1;
}
}
}
static void handleInBlockSkeyInterpolation (SQueryRuntimeEnv* pRuntimeEnv, int32_t startPos,
const TSKEY* primaryKeyCol, STimeWindow* win, SQLFunctionCtx* pCtx) {
assert(startPos > 0);
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY prev = primaryKeyCol[startPos - 1];
TSKEY next = primaryKeyCol[startPos];
if (!QUERY_IS_ASC_QUERY(pQuery) && prev < pQuery->ekey) {
pCtx->prev.key = -1;
return;
}
pCtx->prev.key = win->skey;
char *d = pCtx->aInputElemBuf + pCtx->inputBytes * (startPos - 1);
SPoint point1 = (SPoint){.key = prev, .val = d};
SPoint point2 = (SPoint){.key = next, .val = (d + pCtx->inputBytes)};
SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
}
static void interpolateStartKeyValue(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, SWindowResInfo* pWindowResInfo,
STimeWindow* win, int32_t startPos, SQLFunctionCtx* pCtx, int32_t index) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY *primaryKeyCol = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
TSKEY skey = primaryKeyCol[startPos];
/*
* no need the start time interpolation
* 1. current window is the first window in either ascending or descending order output
* 2. time window start exactly from a timestamp with data
*/
if (skey == win->skey || win->skey < pWindowResInfo->startTime ||
(win->skey < pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) ||
(win->skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
pCtx->prev.key = -1;
return;
}
if (QUERY_IS_ASC_QUERY(pQuery)) {
// the queried time window and time window of data block must be intersect
assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast);
/*
* this win should not be the first time window that starts from a less timestamp than
* the skey of current data block
*/
if (win->skey < pBlockInfo->keyFirst) {
TSKEY prev = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0];
TSKEY next = pBlockInfo->keyFirst;
pCtx->prev.key = win->skey;
char *d = pCtx->aInputElemBuf;
SPoint point1 = (SPoint){.key = prev, .val = pRuntimeEnv->lastRowInBlock[index]};
SPoint point2 = (SPoint){.key = next, .val = d};
SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
} else {
handleInBlockSkeyInterpolation(pRuntimeEnv, startPos, primaryKeyCol, win, pCtx);
}
} else { // desc order
if (win->skey > pBlockInfo->keyLast) {
//this pBlockInfo located before current time window
TSKEY prev = pBlockInfo->keyLast;
TSKEY next = *(TSKEY*) pRuntimeEnv->lastRowInBlock[0];
pCtx->prev.key = win->skey;
char *d = pCtx->aInputElemBuf + (pBlockInfo->size - 1) * pCtx->inputBytes;
SPoint point1 = (SPoint){.key = prev, .val = d};
SPoint point2 = (SPoint){.key = next, .val = pRuntimeEnv->lastRowInBlock[index]};
SPoint point = (SPoint){.key = pCtx->prev.key, .val = &pCtx->prev.data};
taosDoLinearInterpolationD(pCtx->inputType, &point1, &point2, &point);
} else {
// the queried time window and time window of data block must be intersect
assert(win->ekey >= pBlockInfo->keyFirst && win->skey <= pBlockInfo->keyLast);
if (win->skey >= pBlockInfo->keyFirst) {
// the pBlockInfo is intersected with query time window
handleInBlockSkeyInterpolation(pRuntimeEnv, startPos, primaryKeyCol, win, pCtx);
} else {
assert(win->skey < pBlockInfo->keyFirst && win->ekey >= pBlockInfo->keyFirst);
pCtx->prev.key = -1;
}
}
}
}
static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo *pWindowResInfo,
SBlockInfo* pBlockInfo, STimeWindow* win, int32_t startPos, int32_t forwardStep) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY* primaryKeyCol = (TSKEY*) pRuntimeEnv->primaryColBuffer->data;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pRuntimeEnv->interpoSearch) {
int32_t s = startPos;
int32_t e = forwardStep * step + startPos - step;
if (!QUERY_IS_ASC_QUERY(pQuery)) {
SWAP(s, e, int32_t);
}
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo;
interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf);
}
// the first time window, do not employ the interpolation
if (primaryKeyCol[s] == pWindowResInfo->startTime) {
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].prev.key = -1;
}
} else {
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo;
interpolateStartKeyValue(pRuntimeEnv, pBlockInfo, pWindowResInfo, win, s, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf);
}
}
}
}
/**
*
* @param pRuntimeEnv
......@@ -1906,26 +2133,56 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField,
hasNull, &sasArray[k], pBlockInfo, index);
}
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo* pColInfo = &pQuery->colList[i].data;
int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0;
memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes);
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (isIntervalQuery(pQuery)) {
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
TSKEY ts = primaryKeyCol[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
// get current not closed time window
int32_t slot = pWindowResInfo->curIndex;
if (slot != -1) {
STimeWindow w = getWindowResult(pWindowResInfo, slot)->window;
// if current window is closed and locates before current active block, interpolate the result and close it
if (w.skey != win.skey || w.ekey != win.ekey) {
assert((w.ekey < win.skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) ||
(w.skey > win.ekey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery)));
forwardStep = 0;
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep);
closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo));
}
}
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) {
return 0;
}
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true);
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, offset, forwardStep);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep);
int32_t index = pWindowResInfo->curIndex;
int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win;
while (1) {
int32_t startPos =
getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, pBlockInfo, primaryKeyCol, searchFn);
......@@ -1941,7 +2198,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true);
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &nextWin, startPos, forwardStep);
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep);
}
......@@ -1955,6 +2214,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
*/
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
pCtx[k].next.key = -1;
pCtx[k].prev.key = -1;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
......@@ -2689,35 +2951,6 @@ void setExecParams(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int64_t
int32_t scanFlag = pRuntimeEnv->scanFlag;
int32_t blockStatus = pRuntimeEnv->blockStatus;
int64_t* tsArray = (int64_t*)primaryColumnData;
if (needsBoundaryTS(pQuery)) {
SPointInterpoSupporter* pSupporter = pRuntimeEnv->pInterpoSupporter;
if (pRuntimeEnv->hasTimeWindow) {
if (startQueryTimestamp != tsArray[startOffset]) {
assert(startQueryTimestamp <= tsArray[startOffset]);
pCtx->beforeRow = (SBoundaryData){*(int64_t*)pSupporter->pPrevPoint[0], pSupporter->pPrevPoint[index]};
} else {
pCtx->beforeRow.key = -1;
}
if (startOffset + size < pBlockInfo->size) {
if (pQuery->ekey < tsArray[startOffset + size]) {
getOneRowFromDataBlock(pRuntimeEnv, pRuntimeEnv->pInterpoSupporter->pNextPoint, startOffset + size);
pCtx->afterRow = (SBoundaryData){*(int64_t*)pSupporter->pNextPoint[0], pSupporter->pNextPoint[index]};
} else {
pCtx->afterRow.key = -1;
}
} else {// current query ekey is greater than current data block, do not set the after row value
pCtx->afterRow.key = -1;
}
} else {
pCtx->beforeRow.key = -1;
pCtx->afterRow.key = -1;
}
}
pCtx->nStartQueryTimestamp = startQueryTimestamp;
pCtx->scanFlag = scanFlag;
......@@ -3487,10 +3720,8 @@ static bool getNeighborPoints(STableQuerySupportObj *pSupporter, SMeterObj *pMet
} else {
assert(QUERY_IS_ASC_QUERY(pQuery));
}
assert(pPointInterpSupporter != NULL && pQuery->skey == pQuery->ekey);
SCacheBlock *pBlock = NULL;
qTrace("QInfo:%p get next data point, fileId:%d, slot:%d, pos:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId,
pQuery->slot, pQuery->pos);
......@@ -3697,13 +3928,11 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySup
}
// needs the data before the begin timestamp of query time window
if ((*key) != pQuery->skey && needsBoundaryTS(pQuery)) {
if ((*key) != pQuery->skey) {
if (!pRuntimeEnv->hasTimeWindow) {
pQuery->skey = nextKey; // change the query skey
return true;
} else {
return loadPrevDataPoint(pRuntimeEnv, pPointInterpSupporter->pPrevPoint);
pQuery->skey = nextKey; // change the query skey
}
return true;
} else {
return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter);
}
......@@ -4560,12 +4789,15 @@ static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *p
SQuery *pQuery = pRuntimeEnv->pQuery;
// To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system.
pRuntimeEnv->lastRowInBlock = calloc(pQuery->numOfCols, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
int32_t bytes = pQuery->colList[i].data.bytes;
pRuntimeEnv->colDataBuffer[i] = calloc(1, sizeof(SData) + EXTRA_BYTES + pMeterObj->pointsPerFileBlock * bytes);
if (pRuntimeEnv->colDataBuffer[i] == NULL) {
goto _error_clean;
}
pRuntimeEnv->lastRowInBlock[i] = calloc(1, bytes);
}
// record the maximum column width among columns of this meter/metric
......@@ -4679,9 +4911,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pMeterObj = pMeterObj;
pRuntimeEnv->boundaryExternalTS = needsBoundaryTS(pQuery);
pRuntimeEnv->hasTimeWindow = !notHasQueryTimeRange(pQuery);
pRuntimeEnv->interpoSearch = needsBoundaryTS(pQuery);
if ((code = allocateRuntimeEnvBuf(pRuntimeEnv, pMeterObj)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4736,9 +4968,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
/* query on single table */
pSupporter->numOfMeters = 1;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->pInterpoSupporter = calloc(1, sizeof(SPointInterpoSupporter));
pointInterpSupporterInit(pQuery, pRuntimeEnv->pInterpoSupporter);
SPointInterpoSupporter interpoSupporter = {0};
pointInterpSupporterInit(pQuery, &interpoSupporter);
/*
* in case of last_row query without query range, we set the query timestamp to
......@@ -4746,11 +4978,11 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
*/
if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) {
if (!normalizeUnBoundLastRowQuery(pSupporter, pRuntimeEnv->pInterpoSupporter)) {
if (!normalizeUnBoundLastRowQuery(pSupporter, &interpoSupporter)) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
// pointInterpSupporterDestroy(pRuntimeEnv->pInterpoSupporter);
pointInterpSupporterDestroy(&interpoSupporter);
return TSDB_CODE_SUCCESS;
}
} else { // find the skey and ekey in case of sliding query
......@@ -4764,23 +4996,34 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
}
int64_t skey = 0;
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, pRuntimeEnv->pInterpoSupporter, &skey) == false) ||
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpoSupporter, &skey) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
(isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
// pointInterpSupporterDestroy(&interpInfo);
pointInterpSupporterDestroy(&interpoSupporter);
return TSDB_CODE_SUCCESS;
}
pQuery->skey = skey;
if (!QUERY_IS_ASC_QUERY(pQuery)) {
win.skey = minKey;
win.ekey = skey;
pQuery->ekey = minKey;
} else {
win.skey = skey;
win.ekey = pQuery->ekey;
}
// empty result
if (QUERY_IS_ASC_QUERY(pQuery) && win.skey > win.ekey) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpoSupporter);
return TSDB_CODE_SUCCESS;
}
TSKEY skey1, ekey1;
TSKEY windowSKey = 0, windowEKey = 0;
......@@ -4799,13 +5042,13 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
pQuery->over = QUERY_NOT_COMPLETED;
} else {
int64_t ekey = 0;
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, pRuntimeEnv->pInterpoSupporter, &ekey) == false) ||
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpoSupporter, &ekey) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
(isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
// pointInterpSupporterDestroy(&interpInfo);
pointInterpSupporterDestroy(&interpoSupporter);
return TSDB_CODE_SUCCESS;
}
}
......@@ -4815,8 +5058,8 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
* here we set the value for before and after the specified time into the
* parameter for interpolation query
*/
pointInterpSupporterSetData(pQInfo, pRuntimeEnv->pInterpoSupporter);
// pointInterpSupporterDestroy(&interpInfo);
pointInterpSupporterSetData(pQInfo, &interpoSupporter);
pointInterpSupporterDestroy(&interpoSupporter);
if (!forwardQueryStartPosIfNeeded(pQInfo, pSupporter, dataInDisk, dataInCache)) {
return TSDB_CODE_SUCCESS;
......
......@@ -191,6 +191,49 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
return 0;
}
int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
switch (type) {
case TSDB_DATA_TYPE_INT: {
*(double*) point->val = doLinearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key,
point2->key, point->key);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(double*)point->val =
doLinearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_DOUBLE: {
*(double*)point->val =
doLinearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
*(double*)point->val = doLinearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key,
point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_SMALLINT: {
*(double*)point->val = doLinearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key,
point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_TINYINT: {
*(double*)point->val =
doLinearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key);
break;
};
default: {
// TODO: Deal with interpolation with bool and strings and timestamp
return -1;
}
}
return 0;
}
static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; }
static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册