提交 9dc0bf6c 编写于 作者: H Haojun Liao

[TD-2339]<feature>: interpolation can be applied along with the time window.

上级 d50054c7
......@@ -2194,6 +2194,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (getColumnIndexByName(pCmd, &pParamElem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
......@@ -4524,10 +4525,10 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
size_t size = tscNumOfFields(pQueryInfo);
size_t numOfFields = tscNumOfFields(pQueryInfo);
if (pQueryInfo->fillVal == NULL) {
pQueryInfo->fillVal = calloc(size, sizeof(int64_t));
pQueryInfo->fillVal = calloc(numOfFields, sizeof(int64_t));
if (pQueryInfo->fillVal == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -4537,7 +4538,7 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
pQueryInfo->fillType = TSDB_FILL_NONE;
} else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->fillType = TSDB_FILL_NULL;
for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) {
for (int32_t i = START_INTERPO_COL_IDX; i < numOfFields; ++i) {
TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
setNull((char*)&pQueryInfo->fillVal[i], pField->type, pField->bytes);
}
......@@ -4551,7 +4552,7 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
pQueryInfo->fillType = TSDB_FILL_SET_VALUE;
size_t num = taosArrayGetSize(pFillToken);
if (num == 1) {
if (num == 1) { // no actual value, return with error code
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
......@@ -4562,11 +4563,11 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
if (tscIsPointInterpQuery(pQueryInfo)) {
startPos = 0;
if (numOfFillVal > size) {
numOfFillVal = (int32_t)size;
if (numOfFillVal > numOfFields) {
numOfFillVal = (int32_t)numOfFields;
}
} else {
numOfFillVal = (int16_t)((num > (int32_t)size) ? (int32_t)size : num);
numOfFillVal = (int16_t)((num > (int32_t)numOfFields) ? (int32_t)numOfFields : num);
}
int32_t j = 1;
......@@ -4586,10 +4587,10 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
}
}
if ((num < size) || ((num - 1 < size) && (tscIsPointInterpQuery(pQueryInfo)))) {
if ((num < numOfFields) || ((num - 1 < numOfFields) && (tscIsPointInterpQuery(pQueryInfo)))) {
tVariantListItem* lastItem = taosArrayGetLast(pFillToken);
for (int32_t i = numOfFillVal; i < size; ++i) {
for (int32_t i = numOfFillVal; i < numOfFields; ++i) {
TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_NCHAR) {
......
......@@ -114,6 +114,8 @@ void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_
char* tsdbGetTableName(void *pTable);
#define TSDB_TABLEID(_table) ((STableId*) (_table))
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
......@@ -141,7 +143,6 @@ typedef struct {
int64_t tableTotalDataSize; // In bytes
int64_t tableTotalDiskSize; // In bytes
} STableInfo;
STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tid);
// -- FOR INSERT DATA
/**
......@@ -163,6 +164,7 @@ typedef struct STsdbQueryCond {
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
} STsdbQueryCond;
typedef struct SMemRef {
......@@ -240,6 +242,8 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond
*/
bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type);
/**
* Get current data block information
*
......
......@@ -45,6 +45,7 @@ typedef struct tstr {
case TSDB_DATA_TYPE_USMALLINT: \
(_v) = (_finalType)GET_UINT16_VAL(_data); \
break; \
case TSDB_DATA_TYPE_TIMESTAMP:\
case TSDB_DATA_TYPE_BIGINT: \
(_v) = (_finalType)(GET_INT64_VAL(_data)); \
break; \
......@@ -66,6 +67,43 @@ typedef struct tstr {
} \
} while (0)
#define SET_TYPED_DATA(_v, _type, _data) \
do { \
switch (_type) { \
case TSDB_DATA_TYPE_BOOL: \
case TSDB_DATA_TYPE_TINYINT: \
*(int8_t *)(_v) = (_data); \
break; \
case TSDB_DATA_TYPE_UTINYINT: \
*(uint8_t *)(_v) = (_data); \
break; \
case TSDB_DATA_TYPE_SMALLINT: \
*(int16_t *)(_v) = (_data); \
break; \
case TSDB_DATA_TYPE_USMALLINT: \
*(uint16_t *)(_v) = (_data); \
break; \
case TSDB_DATA_TYPE_BIGINT: \
*(int64_t *)(_v) = (_data); \
break; \
case TSDB_DATA_TYPE_UBIGINT: \
*(uint64_t *)(_v) = (_data); \
break; \
case TSDB_DATA_TYPE_FLOAT: \
*(float *)(_v) = (_data); \
break; \
case TSDB_DATA_TYPE_DOUBLE: \
*(double *)(_v) = (_data); \
break; \
case TSDB_DATA_TYPE_UINT: \
*(uint32_t *)(_v) = (_data); \
break; \
default: \
*(int32_t *)(_v) = (_data); \
break; \
} \
} while (0)
#define IS_SIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_TINYINT && (_t) <= TSDB_DATA_TYPE_BIGINT)
#define IS_UNSIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_UTINYINT && (_t) <= TSDB_DATA_TYPE_UBIGINT)
#define IS_FLOAT_TYPE(_t) ((_t) == TSDB_DATA_TYPE_FLOAT || (_t) == TSDB_DATA_TYPE_DOUBLE)
......
......@@ -169,6 +169,7 @@ typedef struct SQuery {
int32_t pos;
tFilePage** sdata;
STableQueryInfo* current;
int32_t numOfCheckedBlocks; // number of check data blocks
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo;
......
......@@ -86,7 +86,7 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo);
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
......
......@@ -153,7 +153,7 @@ typedef struct SResultRowCellInfo {
typedef struct SPoint1 {
int64_t key;
double val;
union{double val; char* ptr;};
} SPoint1;
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo)))
......
......@@ -3776,89 +3776,67 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
*
* @param pCtx
*/
static void interp_function(SQLFunctionCtx *pCtx) {
// at this point, the value is existed, return directly
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SInterpInfoDetail* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
assert(pCtx->startOffset == 0);
if (pCtx->size == 1) {
char *pData = GET_INPUT_DATA_LIST(pCtx);
assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType);
} else {
/*
* use interpolation to generate the result.
* Note: the result of primary timestamp column uses the timestamp specified by user in the query sql
*/
assert(pCtx->size == 2);
if (pInfo->type == TSDB_FILL_NONE) { // set no output result
static void interp_function_impl(SQLFunctionCtx *pCtx) {
int32_t type = pCtx->param[2].i64;
if (type == TSDB_FILL_NONE) {
return;
}
if (pInfo->primaryCol == 1) {
*(TSKEY *) pCtx->aOutputBuf = pInfo->ts;
} else {
if (pInfo->type == TSDB_FILL_NULL) {
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
if (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) {
*(TSKEY *) pCtx->aOutputBuf = pCtx->nStartQueryTimestamp;
} else {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
if (pCtx->start.key == INT64_MIN) {
assert(pCtx->end.key == INT64_MIN);
return;
}
SET_VAL(pCtx, pCtx->size, 1);
} else if (pInfo->type == TSDB_FILL_SET_VALUE) {
if (type == TSDB_FILL_NULL) {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
} else if (type == TSDB_FILL_SET_VALUE) {
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType, true);
} else if (pInfo->type == TSDB_FILL_PREV) {
char *data = GET_INPUT_DATA(pCtx, 0);
assignVal(pCtx->aOutputBuf, data, pCtx->outputBytes, pCtx->outputType);
SET_VAL(pCtx, pCtx->size, 1);
} else if (pInfo->type == TSDB_FILL_LINEAR) {
char *data1 = GET_INPUT_DATA(pCtx, 0);
char *data2 = GET_INPUT_DATA(pCtx, 1);
TSKEY key1 = pCtx->ptsList[0];
TSKEY key2 = pCtx->ptsList[1];
SPoint point1 = {.key = key1, .val = data1};
SPoint point2 = {.key = key2, .val = data2};
SPoint point = {.key = pInfo->ts, .val = pCtx->aOutputBuf};
int32_t srcType = pCtx->inputType;
if ((srcType >= TSDB_DATA_TYPE_TINYINT && srcType <= TSDB_DATA_TYPE_BIGINT) ||
srcType == TSDB_DATA_TYPE_TIMESTAMP || srcType == TSDB_DATA_TYPE_DOUBLE) {
point1.val = data1;
point2.val = data2;
if (isNull(data1, srcType) || isNull(data2, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else if (type == TSDB_FILL_PREV) {
if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) {
SET_TYPED_DATA(pCtx->aOutputBuf, pCtx->inputType, pCtx->start.val);
} else {
taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
assignVal(pCtx->aOutputBuf, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType);
}
} else if (srcType == TSDB_DATA_TYPE_FLOAT) {
point1.val = data1;
point2.val = data2;
} else if (type == TSDB_FILL_LINEAR) {
SPoint point1 = {.key = pCtx->start.key, .val = &pCtx->start.val};
SPoint point2 = {.key = pCtx->end.key, .val = &pCtx->end.val};
SPoint point = {.key = pCtx->nStartQueryTimestamp, .val = pCtx->aOutputBuf};
if (isNull(data1, srcType) || isNull(data2, srcType)) {
int32_t srcType = pCtx->inputType;
if (IS_NUMERIC_TYPE(srcType)) { // TODO should find the not null data?
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else {
taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
}
} else {
if (srcType == TSDB_DATA_TYPE_BINARY || srcType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->inputType);
} else {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
}
}
}
SET_VAL(pCtx, 1, 1);
}
static void interp_function(SQLFunctionCtx *pCtx) {
// at this point, the value is existed, return directly
if (pCtx->size > 0) {
// impose the timestamp check
TSKEY key = GET_TS_DATA(pCtx, 0);
if (key == pCtx->nStartQueryTimestamp) {
char *pData = GET_INPUT_DATA(pCtx, 0);
assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType);
SET_VAL(pCtx, 1, 1);
} else {
interp_function_impl(pCtx);
}
} else { //no qualified data rows and interpolation is required
interp_function_impl(pCtx);
}
SET_VAL(pCtx, pCtx->size, 1);
}
static bool ts_comp_function_setup(SQLFunctionCtx *pCtx) {
......
......@@ -408,7 +408,7 @@ static bool isTopBottomQuery(SQuery *pQuery) {
static bool timeWindowInterpoRequired(SQuery *pQuery) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TWA) {
if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP) {
return true;
}
}
......@@ -818,6 +818,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
return num;
}
// TODO decouple the data block and the SQLFunctionCtx
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
......@@ -825,8 +826,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
bool hasPrev = pCtx[0].preAggVals.isSet;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep;
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
int32_t functionId = pQuery->pExpr1[k].base.functionId;
......@@ -1029,7 +1030,8 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in
}
// window start key interpolation
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) {
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows,
SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY curTs = tsCols[pos];
......@@ -1118,6 +1120,8 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
assert(pDataBlock != NULL);
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t fillType = pQuery->fillType;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0);
......@@ -1126,7 +1130,7 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) {
int32_t startRowIndex = startPos;
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, win);
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, win, fillType);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
......@@ -1134,6 +1138,12 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
// point interpolation does not require the end key time window interpolation.
if (isPointInterpoQuery(pQuery)) {
return;
}
// interpolation query does not generate the time window end interpolation
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) {
int32_t endRowIndex = startPos + (forwardStep - 1) * step;
......@@ -1259,7 +1269,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].nStartQueryTimestamp = pDataBlockInfo->window.skey;
pCtx[k].nStartQueryTimestamp = pQuery->window.skey;
aAggs[functionId].xFunction(&pCtx[k]);
}
}
......@@ -1423,18 +1433,20 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return true;
}
void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) {
SQuery* pQuery = pRuntimeEnv->pQuery;
void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv *pRuntimeEnv, SArray *pDataBlock, TSKEY prevTs,
int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey,
int32_t type) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionId != TSDB_FUNC_TWA) {
if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP) {
pRuntimeEnv->pCtx[k].start.key = INT64_MIN;
continue;
}
SColIndex* pColIndex = &pQuery->pExpr1[k].base.colInfo;
SColIndex * pColIndex = &pQuery->pExpr1[k].base.colInfo;
int16_t index = pColIndex->colIndex;
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, index);
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, index);
assert(pColInfo->info.colId == pColIndex->colId && curTs != windowKey);
double v1 = 0, v2 = 0, v = 0;
......@@ -1450,7 +1462,9 @@ void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDa
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = curTs, .val = &v2};
SPoint point = (SPoint){.key = windowKey, .val = &v};
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
if (functionId == TSDB_FUNC_TWA) {
taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
if (type == RESULT_ROW_START_INTERP) {
pRuntimeEnv->pCtx[k].start.key = point.key;
......@@ -1459,6 +1473,15 @@ void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDa
pRuntimeEnv->pCtx[k].end.key = point.key;
pRuntimeEnv->pCtx[k].end.val = v;
}
} else {
if (type == RESULT_ROW_START_INTERP) {
pRuntimeEnv->pCtx[k].start.key = prevTs;
pRuntimeEnv->pCtx[k].start.val = v1;
pRuntimeEnv->pCtx[k].end.key = curTs;
pRuntimeEnv->pCtx[k].end.val = v2;
}
}
}
}
......@@ -1796,13 +1819,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
pCtx->preAggVals.statis.max = pBlockInfo->window.ekey;
}
} else if (functionId == TSDB_FUNC_INTERP) {
SResultRowCellInfo* pInfo = GET_RES_INFO(pCtx);
SInterpInfoDetail *pInterpInfo = (SInterpInfoDetail *)GET_ROWCELL_INTERBUF(pInfo);
pInterpInfo->type = (int8_t)pQuery->fillType;
pInterpInfo->ts = pQuery->window.skey;
pInterpInfo->primaryCol = (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
pCtx->param[2].i64 = (int8_t) pQuery->fillType;
if (pQuery->fillVal != NULL) {
if (isNull((const char*) &pQuery->fillVal[colIndex], pCtx->inputType)) {
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
......@@ -2579,7 +2596,6 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) {
*status = BLK_DATA_ALL_NEEDED;
} else { // check if this data block is required to load
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) {
......@@ -2818,6 +2834,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (IS_MASTER_SCAN(pRuntimeEnv)) {
pQuery->numOfCheckedBlocks += 1;
}
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -3557,7 +3577,7 @@ void setQueryStatus(SQuery *pQuery, int8_t status) {
}
}
bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
bool needRepeatScan(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false;
......@@ -3711,7 +3731,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
}
}
if (!needScanDataBlocksAgain(pRuntimeEnv)) {
if (!needRepeatScan(pRuntimeEnv)) {
// restore the status code and jump out of loop
if (pRuntimeEnv->scanFlag == REPEAT_SCAN) {
pQuery->status = qstatus.status;
......@@ -3737,17 +3757,9 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"PRId64"-%"PRId64, pQInfo,
cond.twindow.skey, cond.twindow.ekey);
// check if query is killed or not
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
}
if (!needReverseScan(pQuery)) {
return;
}
if (needReverseScan(pQuery)) {
setEnvBeforeReverseScan(pRuntimeEnv, &qstatus);
// reverse scan from current position
......@@ -3755,6 +3767,62 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
doScanAllDataBlocks(pRuntimeEnv);
clearEnvAfterReverseScan(pRuntimeEnv, &qstatus);
}
if (isPointInterpoQuery(pQuery) && pQuery->numOfCheckedBlocks == 0) {
SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_PREV_ROW);
SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_NEXT_ROW);
if (prev == NULL || next == NULL) {
return;
}
// setup the pCtx->start/end info and calculate the interpolation value
SColumnInfoData *startTs = taosArrayGet(prev, 0);
SColumnInfoData *endTs = taosArrayGet(next, 0);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[i];
int32_t functionId = pQuery->pExpr1[i].base.functionId;
SColIndex *pColIndex = &pQuery->pExpr1[i].base.colInfo;
if (!TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
aAggs[functionId].xFunction(pCtx);
continue;
}
SColumnInfoData *p = taosArrayGet(prev, pColIndex->colIndex);
SColumnInfoData *n = taosArrayGet(next, pColIndex->colIndex);
assert(p->info.colId == pColIndex->colId);
pCtx->start.key = *(TSKEY *)startTs->pData;
pCtx->end.key = *(TSKEY *)endTs->pData;
if (p->info.type != TSDB_DATA_TYPE_BINARY && p->info.type != TSDB_DATA_TYPE_NCHAR) {
GET_TYPED_DATA(pCtx->start.val, double, p->info.type, p->pData);
GET_TYPED_DATA(pCtx->end.val, double, n->info.type, n->pData);
} else { // string pointer
pCtx->start.ptr = p->pData;
pCtx->end.ptr = n->pData;
}
pCtx->param[2].i64 = (int8_t)pQuery->fillType;
pCtx->nStartQueryTimestamp = pQuery->window.skey;
if (pQuery->fillVal != NULL) {
if (isNull((const char*) &pQuery->fillVal[i], pCtx->inputType)) {
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
} else { // todo refactor, tVariantCreateFromBinary should handle the NULL value
if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(&pCtx->param[1], (char*) &pQuery->fillVal[i], pCtx->inputBytes, pCtx->inputType);
}
}
}
aAggs[functionId].xFunction(pCtx);
}
}
}
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
......@@ -4891,6 +4959,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
.loadExternalRows = false,
};
// todo refactor
......@@ -4985,6 +5054,7 @@ STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) {
.colList = pQuery->colList,
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
.loadExternalRows = false,
};
TIME_WINDOW_COPY(cond.twindow, *win);
......@@ -5727,8 +5797,17 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
}
}
// TODO opt performance
// if (isPointInterpoQuery(pQuery)) {
// SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_PREV_ROW);
//
// for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
// SColumnInfoData *p = taosArrayGet(prev, i);
// memcpy(pRuntimeEnv->prevRow[i], p->pData, p->info.bytes);
// }
// }
scanOneTableDataBlocks(pRuntimeEnv, newStartKey);
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));
finalizeQueryResult(pRuntimeEnv);
......@@ -5736,7 +5815,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
pQuery->rec.rows = 0;
// not fill or no result generated during this query
if (pQuery->fillType == TSDB_FILL_NONE || pRuntimeEnv->windowResInfo.size == 0) {
if (pQuery->fillType == TSDB_FILL_NONE || pRuntimeEnv->windowResInfo.size == 0 || isPointInterpoQuery(pQuery)) {
// all data scanned, the group by normal column can return
int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
if (pQuery->limit.offset > numOfClosed) {
......@@ -5771,7 +5850,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
if (hasNotReturnedResults(pRuntimeEnv)) {
if (pQuery->fillType != TSDB_FILL_NONE) {
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
/*
* There are remain results that are not returned due to result interpolation
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
......
......@@ -120,7 +120,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
taosGetLinearInterpolationVal(type, &point1, &point2, &point);
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
}
} else {
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
......@@ -479,25 +479,13 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
}
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
double v1 = -1;
double v2 = -1;
GET_TYPED_DATA(v1, double, type, point1->val);
GET_TYPED_DATA(v2, double, type, point2->val);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType) {
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, inputType, point1->val);
GET_TYPED_DATA(v2, double, inputType, point2->val);
double r = DO_INTERPOLATION(v1, v2, point1->key, point2->key, point->key);
switch(type) {
case TSDB_DATA_TYPE_TINYINT: *(int8_t*) point->val = (int8_t) r;break;
case TSDB_DATA_TYPE_SMALLINT: *(int16_t*) point->val = (int16_t) r;break;
case TSDB_DATA_TYPE_INT: *(int32_t*) point->val = (int32_t) r;break;
case TSDB_DATA_TYPE_BIGINT: *(int64_t*) point->val = (int64_t) r;break;
case TSDB_DATA_TYPE_DOUBLE: *(double*) point->val = (double) r;break;
case TSDB_DATA_TYPE_FLOAT: *(float*) point->val = (float) r;break;
default:
assert(0);
}
SET_TYPED_DATA(point->val, outputType, r);
return TSDB_CODE_SUCCESS;
}
......
此差异已折叠。
......@@ -638,209 +638,293 @@ if $data24 != NULL then
return -1
endi
## interp(*) from stb + group by + fill(prev)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(prev) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
## interp(*) from stb + group by + fill(prev)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(prev) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
return -1
endi
if $data00 != @18-09-17 09:00:01.000@ then
endi
if $data00 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data01 != 0 then
endi
if $data01 != 0 then
return -1
endi
if $data02 != 0 then
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0.00000 then
endi
if $data03 != 0.00000 then
return -1
endi
if $data04 != 0.000000000 then
endi
if $data04 != 0.000000000 then
return -1
endi
if $data05 != 0 then
endi
if $data05 != 0 then
return -1
endi
if $data06 != 0 then
endi
if $data06 != 0 then
return -1
endi
if $data07 != 1 then
endi
if $data07 != 1 then
return -1
endi
if $data08 != binary0 then
endi
if $data08 != binary0 then
return -1
endi
if $data09 != nchar0 then
endi
if $data09 != nchar0 then
return -1
endi
if $data20 != @18-09-17 09:00:01.000@ then
endi
if $data20 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data21 != 0 then
endi
if $data21 != 0 then
return -1
endi
if $data22 != NULL then
endi
if $data22 != NULL then
return -1
endi
if $data23 != 0.00000 then
endi
if $data23 != 0.00000 then
return -1
endi
if $data24 != NULL then
endi
if $data24 != NULL then
return -1
endi
if $data25 != 0 then
endi
if $data25 != 0 then
return -1
endi
if $data26 != 0 then
endi
if $data26 != 0 then
return -1
endi
if $data27 != 1 then
endi
if $data27 != 1 then
return -1
endi
if $data28 != binary0 then
endi
if $data28 != binary0 then
return -1
endi
if $data29 != nchar0 then
endi
if $data29 != nchar0 then
return -1
endi
endi
## interp(*) from stb + group by + fill(linear)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(linear) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
## interp(*) from stb + group by + fill(linear)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(linear) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
return -1
endi
if $data00 != @18-09-17 09:00:01.000@ then
endi
if $data00 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data01 != 0 then
endi
if $data01 != 0 then
return -1
endi
if $data02 != 0 then
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0.00167 then
endi
if $data03 != 0.00167 then
return -1
endi
if $data04 != 0.001666667 then
endi
if $data04 != 0.001666667 then
return -1
endi
if $data05 != 0 then
endi
if $data05 != 0 then
return -1
endi
if $data06 != 0 then
endi
if $data06 != 0 then
return -1
endi
if $data07 != NULL then
endi
if $data07 != NULL then
return -1
endi
if $data08 != NULL then
endi
if $data08 != NULL then
return -1
endi
if $data09 != NULL then
endi
if $data09 != NULL then
return -1
endi
if $data20 != @18-09-17 09:00:01.000@ then
endi
if $data20 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data21 != 0 then
endi
if $data21 != 0 then
return -1
endi
if $data22 != NULL then
endi
if $data22 != NULL then
return -1
endi
if $data23 != 0.00167 then
endi
if $data23 != 0.00167 then
return -1
endi
if $data24 != NULL then
endi
if $data24 != NULL then
return -1
endi
if $data25 != 0 then
endi
if $data25 != 0 then
return -1
endi
if $data26 != 0 then
endi
if $data26 != 0 then
return -1
endi
if $data27 != NULL then
endi
if $data27 != NULL then
return -1
endi
if $data28 != NULL then
endi
if $data28 != NULL then
return -1
endi
if $data29 != NULL then
endi
if $data29 != NULL then
return -1
endi
endi
## interp(*) from stb + group by + fill(value)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(value, -1, -2) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(value, -1, -2) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
return -1
endi
if $data00 != @18-09-17 09:00:01.000@ then
endi
if $data00 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data01 != -2 then
endi
if $data01 != -2 then
return -1
endi
if $data02 != -2 then
endi
if $data02 != -2 then
return -1
endi
if $data03 != -2.00000 then
endi
if $data03 != -2.00000 then
return -1
endi
if $data04 != -2.000000000 then
endi
if $data04 != -2.000000000 then
return -1
endi
if $data05 != -2 then
endi
if $data05 != -2 then
return -1
endi
if $data06 != -2 then
endi
if $data06 != -2 then
return -1
endi
if $data07 != 1 then
endi
if $data07 != 1 then
return -1
endi
if $data08 != NULL then
endi
if $data08 != NULL then
return -1
endi
if $data09 != NULL then
endi
if $data09 != NULL then
return -1
endi
if $data20 != @18-09-17 09:00:01.000@ then
endi
if $data20 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data21 != -2 then
endi
if $data21 != -2 then
return -1
endi
if $data22 != -2 then
endi
if $data22 != -2 then
return -1
endi
if $data23 != -2.00000 then
endi
if $data23 != -2.00000 then
return -1
endi
if $data24 != -2.000000000 then
endi
if $data24 != -2.000000000 then
return -1
endi
if $data25 != -2 then
endi
if $data25 != -2 then
return -1
endi
if $data26 != -2 then
endi
if $data26 != -2 then
return -1
endi
if $data27 != 1 then
endi
if $data27 != 1 then
return -1
endi
if $data28 != NULL then
endi
if $data28 != NULL then
return -1
endi
if $data29 != NULL then
endi
if $data29 != NULL then
return -1
endi
endi
sql_error select interp(ts,c1) from intp_tb0 where ts>'2018-11-25 19:19:00' and ts<'2018-11-25 19:19:12';
sql select interp(ts,c1) from intp_tb0 where ts>'2018-11-25 19:19:00' and ts<'2018-11-25 19:19:12' interval(1s) fill(linear);
if $rows != 0 then
return -1
endi
sql select interp(c1) from intp_tb0 where ts>'2018-11-25 18:09:00' and ts<'2018-11-25 19:20:12' interval(18m);
if $rows != 0 then
return -1
endi
if $data00 != @2018-11-25 18:30:00.000@ then
return -1
endi
if $data01 != 3 then
return -1
endi
sql select interp(c1,c3,c4,ts) from intp_tb0 where ts>'2018-11-25 18:09:00' and ts<'2018-11-25 19:20:12' interval(18m) fill(linear)
if $rows != 5 then
return -1
endi
if $data00 != @2018-11-25 17:54:00.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data02 != 0.00000 then
return -1
endi
if $data03 != 0.000000000 then
return -1
endi
if $data04 != @2018-11-25 17:54:00.000@ then
return -1
endi
if $data10 != @2018-11-25 18:12:00.000@ then
return -1
endi
if $data11 != 1 then
return -1
endi
if $data12 != 1.20000 then
return -1
endi
if $data13 != 1.200000000 then
return -1
endi
if $data14 != @2018-11-25 18:12:00.000@ then
return -1
endi
if $data40 != @2018-11-25 19:06:00.000@ then
return -1
endi
if $data41 != 6 then
return -1
endi
if $data42 != 6.60000 then
return -1
endi
if $data43 != 6.600000000 then
return -1
endi
if $data44 != @2018-11-25 19:06:00.000@ then
return -1
endi
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册