提交 04e9996d 编写于 作者: X Xiaoyu Wang

feat: the function manager adds an interface for manually configuring the...

feat: the function manager adds an interface for manually configuring the parameters of the merge function
上级 c0074588
...@@ -24,22 +24,24 @@ extern "C" { ...@@ -24,22 +24,24 @@ extern "C" {
typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len); typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow); typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
typedef int32_t (*FCreateMergeFuncParameters)(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters);
typedef struct SBuiltinFuncDefinition { typedef struct SBuiltinFuncDefinition {
const char* name; const char* name;
EFunctionType type; EFunctionType type;
uint64_t classification; uint64_t classification;
FTranslateFunc translateFunc; FTranslateFunc translateFunc;
FFuncDataRequired dataRequiredFunc; FFuncDataRequired dataRequiredFunc;
FExecGetEnv getEnvFunc; FExecGetEnv getEnvFunc;
FExecInit initFunc; FExecInit initFunc;
FExecProcess processFunc; FExecProcess processFunc;
FScalarExecProcess sprocessFunc; FScalarExecProcess sprocessFunc;
FExecFinalize finalizeFunc; FExecFinalize finalizeFunc;
FExecProcess invertFunc; FExecProcess invertFunc;
FExecCombine combineFunc; FExecCombine combineFunc;
const char* pPartialFunc; const char* pPartialFunc;
const char* pMergeFunc; const char* pMergeFunc;
FCreateMergeFuncParameters createMergeParaFuc;
} SBuiltinFuncDefinition; } SBuiltinFuncDefinition;
extern const SBuiltinFuncDefinition funcMgtBuiltins[]; extern const SBuiltinFuncDefinition funcMgtBuiltins[];
......
...@@ -51,8 +51,6 @@ extern "C" { ...@@ -51,8 +51,6 @@ extern "C" {
#define FUNC_UDF_ID_START 5000 #define FUNC_UDF_ID_START 5000
extern const int funcMgtUdfNum;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -419,6 +419,14 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) ...@@ -419,6 +419,14 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) {
int32_t code = nodesListMakeAppend(pParameters, pPartialRes);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(*pParameters, nodesCloneNode(nodesListGetNode(pRawParameters, 1)));
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
...@@ -1728,7 +1736,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1728,7 +1736,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = topBotFinalize, .finalizeFunc = topBotFinalize,
.combineFunc = topCombine, .combineFunc = topCombine,
.pPartialFunc = "_top_partial", .pPartialFunc = "_top_partial",
.pMergeFunc = "_top_merge" .pMergeFunc = "_top_merge",
.createMergeParaFuc = topCreateMergePara
}, },
{ {
.name = "_top_partial", .name = "_top_partial",
......
...@@ -794,8 +794,8 @@ int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { ...@@ -794,8 +794,8 @@ int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start); char* data = colDataGetData(pCol, start);
SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data); SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data);
avgTransferInfo(pInputInfo, pInfo); avgTransferInfo(pInputInfo, pInfo);
...@@ -908,9 +908,9 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -908,9 +908,9 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getAvgInfoSize(); int32_t resultBytes = getAvgInfoSize();
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes); memcpy(varDataVal(res), pInfo, resultBytes);
varDataSetLen(res, resultBytes); varDataSetLen(res, resultBytes);
...@@ -1418,7 +1418,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -1418,7 +1418,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) { void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colDataAppendNULL(pDstCol, rowIndex); colDataAppendNULL(pDstCol, rowIndex);
...@@ -1663,8 +1663,8 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) { ...@@ -1663,8 +1663,8 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start); char* data = colDataGetData(pCol, start);
SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data); SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data);
stddevTransferInfo(pInputInfo, pInfo); stddevTransferInfo(pInputInfo, pInfo);
...@@ -1756,9 +1756,9 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -1756,9 +1756,9 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getStddevInfoSize(); int32_t resultBytes = getStddevInfoSize();
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes); memcpy(varDataVal(res), pInfo, resultBytes);
varDataSetLen(res, resultBytes); varDataSetLen(res, resultBytes);
...@@ -2360,12 +2360,10 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) ...@@ -2360,12 +2360,10 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getFirstLastInfoSize(int32_t resBytes) { int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t); }
return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t);
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = (SColumnNode *)nodesListGetNode(pFunc->pParameterList, 0); SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t); pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t);
return true; return true;
} }
...@@ -2390,14 +2388,14 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { ...@@ -2390,14 +2388,14 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type; int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes; int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes; pInfo->bytes = bytes;
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
...@@ -2488,14 +2486,14 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -2488,14 +2486,14 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type; int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes; int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes; pInfo->bytes = bytes;
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
...@@ -2567,8 +2565,8 @@ static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, ...@@ -2567,8 +2565,8 @@ static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput,
return; return;
} }
pOutput->bytes = pInput->bytes; pOutput->bytes = pInput->bytes;
TSKEY *tsIn = (TSKEY*)(pInput->buf + pInput->bytes); TSKEY* tsIn = (TSKEY*)(pInput->buf + pInput->bytes);
TSKEY *tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); TSKEY* tsOut = (TSKEY*)(pOutput->buf + pInput->bytes);
if (pOutput->hasResult) { if (pOutput->hasResult) {
if (isFirst) { if (isFirst) {
if (*tsIn > *tsOut) { if (*tsIn > *tsOut) {
...@@ -2586,16 +2584,16 @@ static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput, ...@@ -2586,16 +2584,16 @@ static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput,
return; return;
} }
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx *pCtx, bool isFirstQuery) { static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start); char* data = colDataGetData(pCol, start);
SFirstLastRes* pInputInfo = (SFirstLastRes *)varDataVal(data); SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
firstLastTransferInfo(pInputInfo, pInfo, isFirstQuery); firstLastTransferInfo(pInputInfo, pInfo, isFirstQuery);
...@@ -2604,13 +2602,9 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx *pCtx, bool isFirstQuer ...@@ -2604,13 +2602,9 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx *pCtx, bool isFirstQuer
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx) { int32_t firstFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMergeImpl(pCtx, true); }
return firstLastFunctionMergeImpl(pCtx, true);
}
int32_t lastFunctionMerge(SqlFunctionCtx *pCtx) { int32_t lastFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMergeImpl(pCtx, false); }
return firstLastFunctionMergeImpl(pCtx, false);
}
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
...@@ -2627,9 +2621,9 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -2627,9 +2621,9 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); int32_t resultBytes = getFirstLastInfoSize(pRes->bytes);
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pRes, resultBytes); memcpy(varDataVal(res), pRes, resultBytes);
varDataSetLen(res, resultBytes); varDataSetLen(res, resultBytes);
...@@ -3369,8 +3363,8 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) { ...@@ -3369,8 +3363,8 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) {
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start); char* data = colDataGetData(pCol, start);
SSpreadInfo* pInputInfo = (SSpreadInfo*)varDataVal(data); SSpreadInfo* pInputInfo = (SSpreadInfo*)varDataVal(data);
spreadTransferInfo(pInputInfo, pInfo); spreadTransferInfo(pInputInfo, pInfo);
...@@ -3390,9 +3384,9 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3390,9 +3384,9 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getSpreadInfoSize(); int32_t resultBytes = getSpreadInfoSize();
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes); memcpy(varDataVal(res), pInfo, resultBytes);
varDataSetLen(res, resultBytes); varDataSetLen(res, resultBytes);
...@@ -3484,8 +3478,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { ...@@ -3484,8 +3478,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0);
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->start.key == INT64_MIN) { if (pCtx->start.key == INT64_MIN) {
pInfo->max = pInfo->max =
...@@ -5176,11 +5170,11 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -5176,11 +5170,11 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
} }
typedef struct SDerivInfo { typedef struct SDerivInfo {
double prevValue; // previous value double prevValue; // previous value
TSKEY prevTs; // previous timestamp TSKEY prevTs; // previous timestamp
bool ignoreNegative;// ignore the negative value bool ignoreNegative; // ignore the negative value
int64_t tsWindow; // time window for derivative int64_t tsWindow; // time window for derivative
bool valueSet; // the value has been set already bool valueSet; // the value has been set already
} SDerivInfo; } SDerivInfo;
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
...@@ -5188,7 +5182,7 @@ bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { ...@@ -5188,7 +5182,7 @@ bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return true; return true;
} }
bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) { bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) { if (!functionSetup(pCtx, pResInfo)) {
return false; // not initialized since it has been initialized return false; // not initialized since it has been initialized
} }
...@@ -5196,25 +5190,25 @@ bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) { ...@@ -5196,25 +5190,25 @@ bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) {
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDerivInfo->ignoreNegative = pCtx->param[2].param.i; pDerivInfo->ignoreNegative = pCtx->param[2].param.i;
pDerivInfo->prevTs = -1; pDerivInfo->prevTs = -1;
pDerivInfo->tsWindow = pCtx->param[1].param.i; pDerivInfo->tsWindow = pCtx->param[1].param.i;
pDerivInfo->valueSet = false; pDerivInfo->valueSet = false;
return true; return true;
} }
int32_t derivativeFunction(SqlFunctionCtx *pCtx) { int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t numOfElems = 0; int32_t numOfElems = 0;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
SColumnInfoData* pTsOutput = pCtx->pTsOutput; SColumnInfoData* pTsOutput = pCtx->pTsOutput;
int32_t i = pInput->startRowIndex; int32_t i = pInput->startRowIndex;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData; TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
double v = 0; double v = 0;
...@@ -5224,7 +5218,7 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) { ...@@ -5224,7 +5218,7 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) {
continue; continue;
} }
char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i;
GET_TYPED_DATA(v, double, pInputCol->info.type, d); GET_TYPED_DATA(v, double, pInputCol->info.type, d);
int32_t pos = pCtx->offset + numOfElems; int32_t pos = pCtx->offset + numOfElems;
...@@ -5251,7 +5245,7 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) { ...@@ -5251,7 +5245,7 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) {
continue; continue;
} }
char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i;
GET_TYPED_DATA(v, double, pInputCol->info.type, d); GET_TYPED_DATA(v, double, pInputCol->info.type, d);
int32_t pos = pCtx->offset + numOfElems; int32_t pos = pCtx->offset + numOfElems;
...@@ -5277,7 +5271,7 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) { ...@@ -5277,7 +5271,7 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) {
return numOfElems; return numOfElems;
} }
int32_t interpFunction(SqlFunctionCtx *pCtx) { int32_t interpFunction(SqlFunctionCtx* pCtx) {
#if 0 #if 0
int32_t fillType = (int32_t) pCtx->param[2].i64; int32_t fillType = (int32_t) pCtx->param[2].i64;
//bool ascQuery = (pCtx->order == TSDB_ORDER_ASC); //bool ascQuery = (pCtx->order == TSDB_ORDER_ASC);
......
...@@ -261,14 +261,14 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis ...@@ -261,14 +261,14 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis
return pFunc; return pFunc;
} }
static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) { static SNode* createColumnByFunc(const SFunctionNode* pFunc) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == pCol) {
return NULL; return NULL;
} }
strcpy(pCol->colName, pFunc->node.aliasName); strcpy(pCol->colName, pFunc->node.aliasName);
pCol->node.resType = pFunc->node.resType; pCol->node.resType = pFunc->node.resType;
return pCol; return (SNode*)pCol;
} }
bool fmIsDistExecFunc(int32_t funcId) { bool fmIsDistExecFunc(int32_t funcId) {
...@@ -296,21 +296,45 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod ...@@ -296,21 +296,45 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t createMergeFuncPara(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
SNodeList** pParameterList) {
SNode* pRes = createColumnByFunc(pPartialFunc);
if (NULL != funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc) {
return funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc(pSrcFunc->pParameterList, pRes, pParameterList);
} else {
return nodesListMakeStrictAppend(pParameterList, pRes);
}
}
static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc, static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
SFunctionNode** pMergeFunc) { SFunctionNode** pMergeFunc) {
SNodeList* pParameterList = NULL; SNodeList* pParameterList = NULL;
nodesListMakeStrictAppend(&pParameterList, (SNode*)createColumnByFunc(pPartialFunc)); SFunctionNode* pFunc = NULL;
*pMergeFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList);
if (NULL == *pMergeFunc) { int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
nodesDestroyList(pParameterList); if (TSDB_CODE_SUCCESS == code) {
return TSDB_CODE_OUT_OF_MEMORY; pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList);
if (NULL == pFunc) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
// overwrite function restype set by translate function if (TSDB_CODE_SUCCESS == code) {
if (fmIsSameInOutType(pSrcFunc->funcId)) { // overwrite function restype set by translate function
(*pMergeFunc)->node.resType = pSrcFunc->node.resType; if (fmIsSameInOutType(pSrcFunc->funcId)) {
(*pMergeFunc)->node.resType = pSrcFunc->node.resType;
}
strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName);
} }
strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName);
return TSDB_CODE_SUCCESS; if (TSDB_CODE_SUCCESS == code) {
*pMergeFunc = pFunc;
} else {
pFunc->pParameterList = NULL;
nodesDestroyNode((SNode*)pFunc);
nodesDestroyList(pParameterList);
}
return code;
} }
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) { int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册