提交 599ea89a 编写于 作者: 5 54liuyao

stream scan

上级 557531da
......@@ -45,6 +45,12 @@ enum {
STREAM_TRIGGER__BY_EVENT_TIME,
};
typedef enum EStreamType {
STREAM_NORMAL = 1,
STREAM_INVERT,
STREAM_INVALID,
} EStreamType;
typedef struct {
uint32_t numOfTables;
SArray* pGroupList;
......@@ -71,6 +77,7 @@ typedef struct SDataBlockInfo {
int16_t numOfCols;
int16_t hasVarCol;
int32_t capacity;
EStreamType type;
} SDataBlockInfo;
typedef struct SSDataBlock {
......
......@@ -168,6 +168,9 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId);
#ifdef __cplusplus
}
......
......@@ -36,7 +36,7 @@ typedef struct SUpdateInfo {
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo);
#ifdef __cplusplus
......
......@@ -38,6 +38,7 @@ extern "C" {
#include "tlockfree.h"
#include "tmsg.h"
#include "tpagedbuf.h"
#include "tstreamUpdate.h"
#include "vnode.h"
#include "executorInt.h"
......@@ -386,6 +387,9 @@ typedef struct SStreamBlockScanInfo {
void* readerHandle; // stream block reader handle
SArray* pColMatchInfo; //
SNode* pCondition;
SArray* tsArray;
SUpdateInfo* pUpdateInfo;
int32_t primaryTsIndex; // primary time stamp slot id
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
......@@ -446,6 +450,7 @@ typedef struct SIntervalAggOperatorInfo {
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup;
struct SFillInfo* pFillInfo; // fill info
bool invertible;
} SIntervalAggOperatorInfo;
typedef struct SAggOperatorInfo {
......
......@@ -198,6 +198,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
pBlock->info.blockId = pNode->dataBlockId;
pBlock->info.rowSize = pNode->totalRowSize; // todo ??
pBlock->info.type = STREAM_INVALID;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {{0}};
......
......@@ -512,6 +512,25 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists);
}
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo) {
SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex);
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) {
taosArrayPush(pInfo->tsArray, ts+i);
}
}
if (taosArrayGetSize(pInfo->tsArray) > 0) {
//TODO(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
return NULL;
}
return NULL;
}
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -523,8 +542,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return NULL;
}
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
size_t total = taosArrayGetSize(pInfo->pBlockLists);
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo);
pOperator->status = OP_EXEC_DONE;
......@@ -534,6 +553,14 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++;
return taosArrayGetP(pInfo->pBlockLists, current);
} else {
if (total > 0) {
ASSERT(total == 2);
SSDataBlock* pRes = taosArrayGetP(pInfo->pBlockLists, 0);
SSDataBlock* pUpRes = taosArrayGetP(pInfo->pBlockLists, 1);
blockDataDestroy(pUpRes);
taosArrayClear(pInfo->pBlockLists);
return pRes;
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes);
......@@ -554,6 +581,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.groupId = groupId;
pInfo->pRes->info.rows = numOfRows;
pInfo->pRes->info.uid = uid;
pInfo->pRes->info.type = STREAM_NORMAL;
int32_t numOfCols = pInfo->pRes->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -598,6 +626,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if (rows == 0) {
pOperator->status = OP_EXEC_DONE;
} else {
SSDataBlock* upRes = getUpdateDataBlock(pInfo);
if (upRes) {
taosArrayPush(pInfo->pBlockLists, &(pInfo->pRes));
taosArrayPush(pInfo->pBlockLists, &upRes);
return upRes;
}
}
return (rows == 0) ? NULL : pInfo->pRes;
......@@ -636,6 +671,21 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
goto _error;
}
pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY));
if (pInfo->tsArray == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
pInfo->primaryTsIndex = 0; //TODO(liuyao) get it from physical plan
pInfo->pUpdateInfo = updateInfoInit(60000, 0, 100); //TODO(liuyao) get it from physical plan
if (pInfo->pUpdateInfo == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
pInfo->readerHandle = streamReadHandle;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
......
#include "ttime.h"
#include "tdatablock.h"
#include "executorimpl.h"
#include "functionMgt.h"
typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1,
......@@ -979,6 +980,15 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr
releaseBufPage(pBuf, bufPage);
}
}
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
for ( int i = 0; i < num; i++) {
if (type == STREAM_INVERT) {
fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
} else if (type == STREAM_NORMAL){
fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
}
}
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
......@@ -1016,6 +1026,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
}
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
}
......@@ -1043,6 +1056,15 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
cleanupAggSup(&pInfo->aggSup);
}
bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
for (int32_t i = 0; i < numOfCols; i++) {
if (!fmIsInvertible(pFCtx[i].functionId)) {
return false;
}
}
return true;
}
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
......@@ -1068,6 +1090,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
......
......@@ -36,6 +36,7 @@ typedef struct SBuiltinFuncDefinition {
FExecProcess processFunc;
FScalarExecProcess sprocessFunc;
FExecFinalize finalizeFunc;
FExecProcess invertFunc;
} SBuiltinFuncDefinition;
extern const SBuiltinFuncDefinition funcMgtBuiltins[];
......
......@@ -30,10 +30,12 @@ int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t countFunction(SqlFunctionCtx *pCtx);
int32_t countInvertFunction(SqlFunctionCtx *pCtx);
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t sumFunction(SqlFunctionCtx *pCtx);
int32_t sumInvertFunction(SqlFunctionCtx *pCtx);
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
......@@ -45,11 +47,13 @@ bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t avgFunction(SqlFunctionCtx* pCtx);
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t avgInvertFunction(SqlFunctionCtx* pCtx);
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stddevFunction(SqlFunctionCtx* pCtx);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx);
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
......
......@@ -473,7 +473,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getCountFuncEnv,
.initFunc = functionSetup,
.processFunc = countFunction,
.finalizeFunc = functionFinalize
.finalizeFunc = functionFinalize,
.invertFunc = countInvertFunction
},
{
.name = "sum",
......@@ -484,7 +485,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getSumFuncEnv,
.initFunc = functionSetup,
.processFunc = sumFunction,
.finalizeFunc = functionFinalize
.finalizeFunc = functionFinalize,
.invertFunc = sumInvertFunction
},
{
.name = "min",
......@@ -516,7 +518,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getStddevFuncEnv,
.initFunc = stddevFunctionSetup,
.processFunc = stddevFunction,
.finalizeFunc = stddevFinalize
.finalizeFunc = stddevFinalize,
.invertFunc = stddevInvertFunction
},
{
.name = "avg",
......@@ -526,7 +529,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getAvgFuncEnv,
.initFunc = avgFunctionSetup,
.processFunc = avgFunction,
.finalizeFunc = avgFinalize
.finalizeFunc = avgFinalize,
.invertFunc = avgInvertFunction
},
{
.name = "percentile",
......
......@@ -209,11 +209,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true;
}
/*
* count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/
int32_t countFunction(SqlFunctionCtx* pCtx) {
static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
/*
......@@ -240,7 +236,14 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
numOfElem = pInput->numOfRows;
}
}
return numOfElem;
}
/*
* count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/
int32_t countFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = getNumofElem(pCtx);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
*((int64_t*)buf) += numOfElem;
......@@ -249,6 +252,17 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = getNumofElem(pCtx);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
*((int64_t*)buf) -= numOfElem;
SET_VAL(pResInfo, *((int64_t*)buf), 1);
return TSDB_CODE_SUCCESS;
}
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \
do { \
_t* d = (_t*)(_col->pData); \
......@@ -261,6 +275,18 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
} \
} while (0)
#define LIST_SUB_N(_res, _col, _start, _rows, _t, numOfElem) \
do { \
_t* d = (_t*)(_col->pData); \
for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
}; \
(_res) -= (d)[i]; \
(numOfElem)++; \
} \
} while (0)
int32_t sumFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
......@@ -320,6 +346,65 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t sumInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
int32_t type = pInput->pData[0]->info.type;
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (pInput->colDataAggIsSet) {
numOfElem = pInput->numOfRows - pAgg->numOfNull;
ASSERT(numOfElem >= 0);
if (IS_SIGNED_NUMERIC_TYPE(type)) {
pSumRes->isum -= pAgg->sum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pSumRes->usum -= pAgg->sum;
} else if (IS_FLOAT_TYPE(type)) {
pSumRes->dsum -= GET_DOUBLE_VAL((const char*)&(pAgg->sum));
}
} else { // computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
LIST_SUB_N(pSumRes->isum, pCol, start, numOfRows, int8_t, numOfElem);
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
LIST_SUB_N(pSumRes->isum, pCol, start, numOfRows, int16_t, numOfElem);
} else if (type == TSDB_DATA_TYPE_INT) {
LIST_SUB_N(pSumRes->isum, pCol, start, numOfRows, int32_t, numOfElem);
} else if (type == TSDB_DATA_TYPE_BIGINT) {
LIST_SUB_N(pSumRes->isum, pCol, start, numOfRows, int64_t, numOfElem);
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
if (type == TSDB_DATA_TYPE_UTINYINT) {
LIST_SUB_N(pSumRes->usum, pCol, start, numOfRows, uint8_t, numOfElem);
} else if (type == TSDB_DATA_TYPE_USMALLINT) {
LIST_SUB_N(pSumRes->usum, pCol, start, numOfRows, uint16_t, numOfElem);
} else if (type == TSDB_DATA_TYPE_UINT) {
LIST_SUB_N(pSumRes->usum, pCol, start, numOfRows, uint32_t, numOfElem);
} else if (type == TSDB_DATA_TYPE_UBIGINT) {
LIST_SUB_N(pSumRes->usum, pCol, start, numOfRows, uint64_t, numOfElem);
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
LIST_SUB_N(pSumRes->dsum, pCol, start, numOfRows, double, numOfElem);
} else if (type == TSDB_DATA_TYPE_FLOAT) {
LIST_SUB_N(pSumRes->dsum, pCol, start, numOfRows, float, numOfElem);
}
}
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSumRes);
return true;
......@@ -451,6 +536,69 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
#define LIST_AVG_N(sumT, T) \
do { \
T* plist = (T*)pCol->pData; \
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { \
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
continue; \
} \
\
numOfElem += 1; \
pAvgRes->count -= 1; \
sumT -= plist[i]; \
} \
} while (0)
int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
switch (type) {
case TSDB_DATA_TYPE_TINYINT: {
LIST_AVG_N(pAvgRes->sum.isum, int8_t);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
LIST_AVG_N(pAvgRes->sum.isum, int16_t);
break;
}
case TSDB_DATA_TYPE_INT: {
LIST_AVG_N(pAvgRes->sum.isum, int32_t);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
LIST_AVG_N(pAvgRes->sum.isum, int64_t);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
LIST_AVG_N(pAvgRes->sum.dsum, float);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
LIST_AVG_N(pAvgRes->sum.dsum, double);
break;
}
default:
break;
}
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
......@@ -885,6 +1033,69 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
#define LIST_STDDEV_SUB_N(sumT, T) \
do { \
T* plist = (T*)pCol->pData; \
for (int32_t i = start; i < numOfRows + start; ++i) { \
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
continue; \
} \
numOfElem += 1; \
pStddevRes->count -= 1; \
sumT -= plist[i]; \
pStddevRes->quadraticISum -= plist[i] * plist[i]; \
} \
} while (0)
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
switch (type) {
case TSDB_DATA_TYPE_TINYINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, int8_t);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, int16_t);
break;
}
case TSDB_DATA_TYPE_INT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, int32_t);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, int64_t);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
LIST_STDDEV_SUB_N(pStddevRes->dsum, float);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
LIST_STDDEV_SUB_N(pStddevRes->dsum, double);
break;
}
default:
break;
}
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
......
......@@ -177,3 +177,35 @@ void fmFuncMgtDestroy() {
taosHashCleanup(m);
}
}
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet) {
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return TSDB_CODE_FAILED;
}
pFpSet->process = funcMgtBuiltins[funcId].invertFunc;
return TSDB_CODE_SUCCESS;
}
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet) {
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return TSDB_CODE_FAILED;
}
pFpSet->process = funcMgtBuiltins[funcId].processFunc;
return TSDB_CODE_SUCCESS;
}
bool fmIsInvertible(int32_t funcId) {
bool res = false;
switch (funcMgtBuiltins[funcId].type) {
case FUNCTION_TYPE_COUNT:
case FUNCTION_TYPE_SUM:
case FUNCTION_TYPE_STDDEV:
case FUNCTION_TYPE_AVG:
res = true;
break;
default:
break;
}
return res;
}
......@@ -138,7 +138,7 @@ static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) {
return res;
}
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
int32_t res = TSDB_CODE_FAILED;
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
SScalableBf* pSBf = getSBf(pInfo, ts);
......
......@@ -9,40 +9,40 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
int64_t interval = 20 * 1000;
int64_t watermark = 10 * 60 * 1000;
SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
GTEST_ASSERT_EQ(isUpdated(pSU,1, 0), true);
GTEST_ASSERT_EQ(isUpdated(pSU,1, -1), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, 0), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, -1), true);
for(int i=0; i < 1024; i++) {
GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 1), false);
}
for(int i=0; i < 1024; i++) {
GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 1), true);
}
for(int i=0; i < 1024; i++) {
GTEST_ASSERT_EQ(isUpdated(pSU,i, 2), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 2), false);
}
for(int i=0; i < 1024; i++) {
GTEST_ASSERT_EQ(isUpdated(pSU,i, 2), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 2), true);
}
for(int i=0; i < 1024; i++) {
GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 1), true);
}
for(int i=3; i < 1024; i++) {
GTEST_ASSERT_EQ(isUpdated(pSU,0, i), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), false);
}
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023);
for(int i=3; i < 1024; i++) {
GTEST_ASSERT_EQ(isUpdated(pSU,0, i), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), true);
}
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023);
SUpdateInfo *pSU1 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
for(int i=1; i <= watermark / interval; i++) {
GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU1, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(pSU1->minTS, interval);
GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval);
}
......@@ -53,7 +53,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
}
for(int i= watermark / interval + 1, j = 2 ; i <= watermark / interval + 10; i++,j++) {
GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU1, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(pSU1->minTS, interval*j);
GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval);
SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU1->pTsSBFs, pSU1->numSBFs - 1);
......@@ -62,16 +62,16 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
}
for(int i= watermark / interval * 100, j = 0; j < 10; i+= (watermark / interval * 2), j++) {
GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU1, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(pSU1->minTS, (i-(pSU1->numSBFs-1))*interval);
GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval);
}
SUpdateInfo *pSU2 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
GTEST_ASSERT_EQ(isUpdated(pSU2, 1, 1 * interval + 5), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU2, 1, 1 * interval + 5), false);
GTEST_ASSERT_EQ(pSU2->minTS, interval);
for(int i= watermark / interval * 100, j = 0; j < 10; i+= (watermark / interval * 10), j++) {
GTEST_ASSERT_EQ(isUpdated(pSU2, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU2, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(pSU2->minTS, (i-(pSU2->numSBFs-1))*interval);
GTEST_ASSERT_EQ(pSU2->numSBFs, watermark / interval);
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU2->pTsBuckets,1), i * interval + 5);
......@@ -80,7 +80,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
SUpdateInfo *pSU3 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
for(int j = 1; j < 100; j++) {
for(int i = 0; i < pSU3->numSBFs; i++) {
GTEST_ASSERT_EQ(isUpdated(pSU3, i, i * interval + 5 * j), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU3, i, i * interval + 5 * j), false);
GTEST_ASSERT_EQ(pSU3->minTS, 0);
GTEST_ASSERT_EQ(pSU3->numSBFs, watermark / interval);
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU3->pTsBuckets, i), i * interval + 5 * j);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册