提交 c56be8f8 编写于 作者: H Haojun Liao

enh(query): add interp function.

上级 531b85ce
......@@ -639,9 +639,10 @@ typedef struct STimeSliceOperatorInfo {
STimeWindow win;
SInterval interval;
int64_t current;
SGroupResInfo groupResInfo; // multiple results build supporter
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pCols; // SArray<SColumn>
int32_t fillType; // fill type
struct SFillColInfo* pFillColInfo; // fill column info
} STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo {
......@@ -841,7 +842,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
......
......@@ -28,8 +28,6 @@ struct SSDataBlock;
typedef struct SFillColInfo {
SExprInfo *pExpr;
// SResSchema schema;
// int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
SVariant fillVal;
......
......@@ -1128,40 +1128,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->increase = false;
pCtx->param = pFunct->pParam;
// for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// // set the order information for top/bottom query
// int32_t functionId = pCtx->functionId;
// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
// int32_t f = getExprFunctionId(&pExpr[0]);
// assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY);
//
// // pCtx->param[2].i = pQueryAttr->order.order;
// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
// // pCtx->param[3].i = functionId;
// // pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
//
// // pCtx->param[1].i = pQueryAttr->order.col.info.colId;
// } else if (functionId == FUNCTION_INTERP) {
// // pCtx->param[2].i = (int8_t)pQueryAttr->fillType;
// // if (pQueryAttr->fillVal != NULL) {
// // if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) {
// // pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
// // } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value
// // if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
// // taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i],
// pCtx->inputBytes, pCtx->inputType);
// // }
// // }
// // }
// } else if (functionId == FUNCTION_TWA) {
// // pCtx->param[1].i = pQueryAttr->window.skey;
// // pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
// // pCtx->param[2].i = pQueryAttr->window.ekey;
// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
// } else if (functionId == FUNCTION_ARITHM) {
// // pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
// }
// }
}
for (int32_t i = 1; i < numOfOutput; ++i) {
......@@ -4317,6 +4283,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
taosMemoryFree(pInfo->pFillInfo);
taosMemoryFree(pInfo->p);
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
......
......@@ -332,12 +332,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo);
// if (!stableQuery) { // finalize include the update of result rows
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
// } else {
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo,
// pInfo->binfo.rowCellInfoOffset);
// }
#if 0
if(pOperator->fpSet.encodeResultRow){
char *result = NULL;
......
......@@ -17,6 +17,7 @@
#include "functionMgt.h"
#include "tdatablock.h"
#include "ttime.h"
#include "tfill.h"
typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1,
......@@ -1689,6 +1690,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pBInfo->pRes;
}
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
// null data should not be kept since it can not be used to perform interpolation
if (!colDataIsNull_s(pColInfoData, i)) {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i);
pkey->isNull = false;
char* val = colDataGetData(pColInfoData, i);
memcpy(pkey->pData, val, pkey->bytes);
}
}
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -1697,19 +1714,20 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
return pResBlock;
}
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
// doSetOperatorCompleted(pOperator);
// }
//
// return pResBlock;
// }
int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t numOfRows = 0;
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
......@@ -1724,7 +1742,19 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
int64_t ts = *(int64_t*) colDataGetData(pTsCol, i);
if (ts == pSliceInfo->current) {
// output the result
for(int32_t j = 0; j < pOperator->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pOperator->pExpr[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot);
char* v = colDataGetData(pSrc, i);
colDataAppend(pDst, numOfRows, v, false);
}
numOfRows += 1;
pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
......@@ -1736,32 +1766,104 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
// output the result
pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pOperator->pExpr[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) {
case TSDB_FILL_NULL:
colDataAppendNULL(pDst, numOfRows);
break;
case TSDB_FILL_SET_VALUE: {
SVariant* pVar = &pSliceInfo->pFillColInfo[i].fillVal;
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, numOfRows, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, numOfRows, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, numOfRows, (char*)&v, false);
}
}
break;
case TSDB_FILL_LINEAR:
#if 0
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
// goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = ts, .val = &v1};
SPoint point2 = {.key = nextTs, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
#endif
break;
case TSDB_FILL_PREV: {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
colDataAppend(pDst, numOfRows, pkey->pData, false);
} break;
case TSDB_FILL_NEXT: {
} break;
case TSDB_FILL_NONE:
default:
break;
}
pSliceInfo->current +=
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
}
} else {
// keep current row
// ignore current row, and do nothing
}
} else { // it is the last row of current block
// keep current row
doKeepPrevRows(pSliceInfo, pBlock);
}
}
}
}
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pSliceInfo->binfo.resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfExprs);
// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
if (pResBlock->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
}
......@@ -1796,7 +1898,7 @@ static int32_t initTimesliceInfo(STimeSliceOperatorInfo* pInfo, SqlFunctionCtx*
}
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo) {
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL || pInfo == NULL) {
......@@ -1809,8 +1911,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfCols, pValNode);
pInfo->binfo.pRes = pResultBlock;
pInfo->binfo.pRes = pResultBlock;
pOperator->name = "TimeSliceOperator";
// pOperator->operatorType = OP_AllTimeWindow;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册