提交 1bf7fbbc 编写于 作者: H Haojun Liao

feature(query): support interp function, and do some internal refactor.

上级 ad0595b9
......@@ -887,6 +887,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
SRequestObj *pRequest = res;
pRequest->body.fetchFp = fp;
pRequest->body.param = param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
if (taos_num_fields(pRequest) == 0) {
......
......@@ -123,4 +123,6 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode);
#endif // TDENGINE_QUERYUTIL_H
......@@ -565,13 +565,14 @@ typedef struct SStreamSessionAggOperatorInfo {
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
SOptrBasicInfo binfo;
SSDataBlock* pRes;
STimeWindow win;
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pCols; // SArray<SColumn>
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
} STimeSliceOperatorInfo;
......@@ -669,7 +670,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSup(SExprSupp* pSup);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
......@@ -756,8 +757,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, /*SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, */SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode,
SExecTaskInfo* pTaskInfo);
......
......@@ -686,4 +686,32 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
taosMemoryFree(pCond->twindows);
taosMemoryFree(pCond->colList);
}
\ No newline at end of file
}
int32_t convertFillType(int32_t mode) {
int32_t type = TSDB_FILL_NONE;
switch (mode) {
case FILL_MODE_PREV:
type = TSDB_FILL_PREV;
break;
case FILL_MODE_NONE:
type = TSDB_FILL_NONE;
break;
case FILL_MODE_NULL:
type = TSDB_FILL_NULL;
break;
case FILL_MODE_NEXT:
type = TSDB_FILL_NEXT;
break;
case FILL_MODE_VALUE:
type = TSDB_FILL_SET_VALUE;
break;
case FILL_MODE_LINEAR:
type = TSDB_FILL_LINEAR;
break;
default:
type = TSDB_FILL_NONE;
}
return type;
}
......@@ -2751,7 +2751,11 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
goto _error;
}
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, num, &pOperator->exprSupp.rowEntryInfoOffset);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo);
if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
......@@ -2759,7 +2763,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -2780,12 +2784,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->name = "SortedMerge";
// pOperator->operatorType = OP_SortedMerge;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
......@@ -3405,7 +3406,11 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
size_t keyBufSize, const char* pkey) {
initExprSupp(pSup, pExprInfo, numOfCols);
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
......@@ -3428,12 +3433,17 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
initResultRowInfo(&pInfo->resultRowInfo);
}
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
pSup->pExprInfo = pExprInfo;
pSup->numOfExprs = numOfExpr;
if (pSup->pExprInfo != NULL) {
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
if (pSup->pCtx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......@@ -3455,7 +3465,10 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
initBasicInfo(&pInfo->binfo, pResultBlock);
initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->groupId = INT32_MIN;
pOperator->name = "TableAggregate";
......@@ -3719,13 +3732,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);
if (pPhyNode->pExprs != NULL) {
SExprSupp* pSup1 = &pInfo->scalarSup;
pSup1->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup1->numOfExprs);
pSup1->pCtx = createSqlFunctionCtx(pSup1->pExprInfo, pSup1->numOfExprs, &pSup1->rowEntryInfoOffset);
int32_t num = 0;
SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
int32_t code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
;
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
......@@ -3742,15 +3757,14 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfExpr;
pOperator->pTaskInfo = pTaskInfo;
......@@ -3791,34 +3805,6 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
}
}
static int32_t convertFillType(int32_t mode) {
int32_t type = TSDB_FILL_NONE;
switch (mode) {
case FILL_MODE_PREV:
type = TSDB_FILL_PREV;
break;
case FILL_MODE_NONE:
type = TSDB_FILL_NONE;
break;
case FILL_MODE_NULL:
type = TSDB_FILL_NULL;
break;
case FILL_MODE_NEXT:
type = TSDB_FILL_NEXT;
break;
case FILL_MODE_VALUE:
type = TSDB_FILL_SET_VALUE;
break;
case FILL_MODE_LINEAR:
type = TSDB_FILL_LINEAR;
break;
default:
type = TSDB_FILL_NONE;
}
return type;
}
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
......@@ -3848,8 +3834,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -4269,6 +4255,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else {
ASSERT(0);
}
......
......@@ -387,11 +387,12 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
pInfo->scalarSup.pExprInfo = pScalarExprInfo;
pInfo->scalarSup.numOfExprs = numOfScalarExpr;
pInfo->scalarSup.pCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowEntryInfoOffset);
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -697,10 +698,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) {
pInfo->scalarSup.numOfExprs = 0;
pInfo->scalarSup.pExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSup.numOfExprs);
pInfo->scalarSup.pCtx = createSqlFunctionCtx(
pInfo->scalarSup.pExprInfo, pInfo->scalarSup.numOfExprs, &pInfo->scalarSup.rowEntryInfoOffset);
int32_t num = 0;
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
......
......@@ -686,7 +686,10 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "DataBlockDistScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
......@@ -1869,7 +1872,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList;
......
......@@ -1705,7 +1705,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pBInfo->pRes;
}
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock) {
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
......@@ -1715,21 +1715,140 @@ static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i);
pkey->isNull = false;
char* val = colDataGetData(pColInfoData, i);
char* val = colDataGetData(pColInfoData, rowIndex);
memcpy(pkey->pData, val, pkey->bytes);
}
}
}
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock,
int32_t rowIndex, SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows;
// todo set the correct primary timestamp column
// output the result
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) {
case TSDB_FILL_NULL:
colDataAppendNULL(pDst, rows);
break;
case TSDB_FILL_SET_VALUE: {
SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal;
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
}
} break;
case TSDB_FILL_LINEAR:
#if 0
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
// goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = ts, .val = &v1};
SPoint point2 = {.key = nextTs, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
#endif
break;
case TSDB_FILL_PREV: {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
colDataAppend(pDst, rows, pkey->pData, false);
} break;
case TSDB_FILL_NEXT: {
char* p = colDataGetData(pSrc, rowIndex);
colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex));
} break;
case TSDB_FILL_NONE:
default:
break;
}
}
pResBlock->info.rows += 1;
}
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
if (pInfo->pPrevRow != NULL) {
return TSDB_CODE_SUCCESS;
}
pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
if (pInfo->pPrevRow == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfCols = pBlock->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SGroupKeys key = {0};
key.bytes = pColInfo->info.bytes;
key.type = pColInfo->info.type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
taosArrayPush(pInfo->pPrevRow, &key);
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->binfo.pRes;
SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp;
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
......@@ -1750,10 +1869,15 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break;
}
int32_t code = initPrevRowsKeeper(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*) colDataGetData(pTsCol, i);
......@@ -1771,107 +1895,45 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
numOfRows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i);
pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else if (ts < pSliceInfo->current) {
if (i != pBlock->info.window.ekey) {
if (i < pBlock->info.rows - 1) {
int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
// output the result
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) {
case TSDB_FILL_NULL:
colDataAppendNULL(pDst, numOfRows);
break;
case TSDB_FILL_SET_VALUE: {
SVariant* pVar = &pSliceInfo->pFillColInfo[i].fillVal;
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, numOfRows, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, numOfRows, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, numOfRows, (char*)&v, false);
}
}
break;
case TSDB_FILL_LINEAR:
#if 0
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
// goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = ts, .val = &v1};
SPoint point2 = {.key = nextTs, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
#endif
break;
case TSDB_FILL_PREV: {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
colDataAppend(pDst, numOfRows, pkey->pData, false);
} break;
case TSDB_FILL_NEXT: {
} break;
case TSDB_FILL_NONE:
default:
break;
}
pSliceInfo->current +=
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock);
numOfRows += 1;
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else {
// ignore current row, and do nothing
}
} else { // it is the last row of current block
doKeepPrevRows(pSliceInfo, pBlock);
doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // ts > pSliceInfo->current
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock);
numOfRows += 1;
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
}
}
......@@ -1886,59 +1948,46 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
return pResBlock->info.rows == 0 ? NULL : pResBlock;
}
static int32_t initTimesliceInfo(STimeSliceOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfCols) {
pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
pInfo->pCols = taosArrayInit(4, sizeof(SColumn));
if (pInfo->pPrevRow == NULL || pInfo->pCols == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = pCtx[i].pExpr;
SFunctParam* pParam = &pExpr->base.pParam[0];
SColumn c = *pParam->pCol;
taosArrayPush(pInfo->pCols, &c);
SGroupKeys key = {0};
key.bytes = c.bytes;
key.type = c.type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, c.bytes);
taosArrayPush(pInfo->pPrevRow, &key);
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pPhyNode, SExecTaskInfo* pTaskInfo) {
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL || pInfo == NULL) {
goto _error;
}
SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t code = initTimesliceInfo(pInfo, pSup->pCtx, numOfCols);
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
int32_t code = initExprSupp(pSup, pExprInfo, numOfExprs);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfCols, pValNode);
if (pInterpPhyNode->pExprs != NULL) {
int32_t num = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
initResultSizeInfo(pOperator, 4096);
pInfo->binfo.pRes = pResultBlock;
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->win = pInterpPhyNode->timeRange;
pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey;
pOperator->name = "TimeSliceOperator";
// pOperator->operatorType = OP_AllTimeWindow;
pOperator->blocking = true;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -2432,7 +2481,11 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
}
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) {
initExprSupp(pSup, pExprInfo, numOfCols);
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
initBasicInfo(pBasicInfo, pResultBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册