提交 edbdd272 编写于 作者: S shenglian zhou

[TD-6142]<feature>:support math scalar function

上级 544f1323
......@@ -602,8 +602,10 @@ static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, i
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
} else {
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
aAggs[functionId].mergeFunc(&pCtx[j]);
} else {
assert(0);
}
}
}
......@@ -618,8 +620,10 @@ static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else {
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
aAggs[functionId].xFinalize(&pCtx[j]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].xFinalize(&pCtx[j]);
}
}
}
......@@ -658,8 +662,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (pCtx[j].functionId < 0) {
continue;
}
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
if (TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)) {
assert(0);
} else {
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
}
}
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
......@@ -899,8 +906,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
clearOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity);
continue;
}
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
if (!TSDB_FUNC_SCALAR_INDEX(pCtx->functionId)) {
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
} else {
assert(0);
}
}
}
......
......@@ -2351,11 +2351,20 @@ void setResultColName(char* name, tSqlExprItem* pItem, int32_t functionId, SStrT
if (tsKeepOriginalColumnName) { // keep the original column name
tstrncpy(name, uname, TSDB_COL_NAME_LEN);
} else {
int32_t size = TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].name) + 2 + 1;
char tmp[TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].name) + 2 + 1] = {0};
snprintf(tmp, size, "%s(%s)", aAggs[functionId].name, uname);
if (!TSDB_FUNC_IS_SCALAR(functionId)) {
int32_t size = TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].name) + 2 + 1;
char tmp[TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].name) + 2 + 1] = {0};
snprintf(tmp, size, "%s(%s)", aAggs[functionId].name, uname);
tstrncpy(name, tmp, TSDB_COL_NAME_LEN);
tstrncpy(name, tmp, TSDB_COL_NAME_LEN);
} else {
int32_t index = TSDB_FUNC_SCALAR_INDEX(functionId);
int32_t size = TSDB_COL_NAME_LEN + tListLen(aScalarFunctions[index].name) + 2 + 1;
char tmp[TSDB_COL_NAME_LEN + tListLen(aScalarFunctions[index].name) + 2 + 1] = {0};
snprintf(tmp, size, "%s(%s)", aScalarFunctions[index].name, uname);
tstrncpy(name, tmp, TSDB_COL_NAME_LEN);
}
}
} else { // use the user-input result column name
int32_t len = MIN(pItem->pNode->exprToken.n + 1, TSDB_COL_NAME_LEN);
......@@ -2991,6 +3000,77 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_SUCCESS;
}
case TSDB_FUNC_SCALAR_LOG:
case TSDB_FUNC_SCALAR_POW: {
// 1. valid the number of parameters
if (pItem->pNode->Expr.paramList == NULL || taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) {
/* no parameters or more than one parameter for function */
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0);
if (pParamElem->pNode->tokenId != TK_ID) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
// functions can not be applied to tags
if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
// 2. valid the column type
if (!IS_NUMERIC_TYPE(pSchema->type)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
// 3. valid the parameters
if (pParamElem[1].pNode->tokenId == TK_ID) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
int16_t resultType = pSchema->type;
int16_t resultSize = pSchema->bytes;
int32_t interResult = 0;
tVariant* pVariant = &pParamElem[1].pNode->value;
char val[8] = {0};
tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true);
SExprInfo* pExpr = NULL;
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0, false,
pUdfInfo);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double ));
memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName));
getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token,sizeof(pExpr->base.aliasName) - 1);
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
if (finalResult) {
insertResultField(pQueryInfo, colIndex, &ids, resultSize, (int8_t)resultType, pExpr->base.aliasName, pExpr);
} else {
assert(ids.num == 1);
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
}
tscInsertPrimaryTsSourceColumn(pQueryInfo, pExpr->base.uid);
return TSDB_CODE_SUCCESS;
}
default: {
pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n);
if (pUdfInfo == NULL) {
......@@ -3337,7 +3417,11 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, k);
int16_t functionId = aAggs[pExpr->base.functionId].stableFuncId;
int16_t functionId = pExpr->base.functionId;
if (!TSDB_FUNC_IS_SCALAR(functionId)) {
functionId = aAggs[pExpr->base.functionId].stableFuncId;
}
int32_t colIndex = pExpr->base.colInfo.colIndex;
SSchema* pSrcSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, colIndex);
......@@ -3412,6 +3496,10 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
continue;
}
if (TSDB_FUNC_IS_SCALAR(functionId)) {
continue;
}
if ((aAggs[functionId].status & TSDB_FUNCSTATE_STABLE) == 0) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return true;
......@@ -3461,7 +3549,8 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
int32_t scalarUdf = 0;
int32_t prjNum = 0;
int32_t aggNum = 0;
int32_t scalNum = 0;
int32_t scalarFuncNum = 0;
int32_t funcCompatFactor = INT_MAX;
size_t numOfExpr = tscNumOfExprs(pQueryInfo);
assert(numOfExpr > 0);
......@@ -3494,17 +3583,27 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
}
if (functionId == TSDB_FUNC_CEIL || functionId == TSDB_FUNC_FLOOR || functionId == TSDB_FUNC_ROUND) {
++scalNum;
++scalarFuncNum;
}
if (TSDB_FUNC_IS_SCALAR(functionId)) {
++scalarFuncNum;
}
if (functionId == TSDB_FUNC_PRJ && (pExpr1->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX || TSDB_COL_IS_UD_COL(pExpr1->base.colInfo.flag))) {
continue;
}
if (TSDB_FUNC_IS_SCALAR(functionId)) {
funcCompatFactor = 1;
} else {
funcCompatFactor = functionCompatList[functionId];
}
if (factor == INT32_MAX) {
factor = functionCompatList[functionId];
factor = funcCompatFactor;
} else {
if (functionCompatList[functionId] != factor) {
if (funcCompatFactor != factor) {
return false;
} else {
if (factor == -1) { // two functions with the same -1 flag
......@@ -3518,19 +3617,19 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
}
}
aggNum = (int32_t)size - prjNum - scalNum - aggUdf - scalarUdf;
aggNum = (int32_t)size - prjNum - scalarFuncNum - aggUdf - scalarUdf;
assert(aggNum >= 0);
if (aggUdf > 0 && (prjNum > 0 || aggNum > 0 || scalNum > 0 || scalarUdf > 0)) {
if (aggUdf > 0 && (prjNum > 0 || aggNum > 0 || scalarFuncNum > 0 || scalarUdf > 0)) {
return false;
}
if (scalarUdf > 0 && (aggNum > 0 || scalNum > 0)) {
if (scalarUdf > 0 && (aggNum > 0 || scalarFuncNum > 0)) {
return false;
}
if (aggNum > 0 && scalNum > 0) {
if (aggNum > 0 && scalarFuncNum > 0) {
return false;
}
......@@ -6272,6 +6371,9 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < size; ++i) {
int32_t functId = tscExprGet(pQueryInfo, i)->base.functionId;
if (TSDB_FUNC_IS_SCALAR(functId)) {
continue;
}
if (!IS_STREAM_QUERY_VALID(aAggs[functId].status)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
......@@ -6300,6 +6402,11 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
}
}
if (TSDB_FUNC_IS_SCALAR(pExpr->base.functionId)) {
isProjectionFunction = true;
break;
}
// projection query on primary timestamp, the selectivity function needs to be present.
if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
bool hasSelectivity = false;
......@@ -6955,6 +7062,11 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
continue;
}
if (TSDB_FUNC_IS_SCALAR(functionId)) {
numOfScalar++;
continue;
}
if ((aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++;
} else if ((aAggs[functionId].status & TSDB_FUNCSTATE_SCALAR) != 0) {
......@@ -6989,6 +7101,10 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int16_t functionId = pExpr->base.functionId;
if (TSDB_FUNC_IS_SCALAR(functionId)) {
continue;
}
if (functionId == TSDB_FUNC_TAGPRJ || (aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) == 0) {
continue;
}
......@@ -7191,6 +7307,10 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
continue;
}
if (TSDB_FUNC_IS_SCALAR(f)) {
continue;
}
if ((!pQueryInfo->stateWindow) && (f == TSDB_FUNC_DIFF || f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE)) {
for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, j);
......@@ -7417,8 +7537,10 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) {
if (pExpr->base.functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * pExpr->base.functionId - 1);
name = pUdfInfo->name;
} else {
} else if (!TSDB_FUNC_IS_SCALAR(pExpr->base.functionId)) {
name = aAggs[pExpr->base.functionId].name;
} else {
name = aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(pExpr->base.functionId)].name;
}
tmpLen =
......
......@@ -277,7 +277,8 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
functionId != TSDB_FUNC_TID_TAG &&
functionId != TSDB_FUNC_CEIL &&
functionId != TSDB_FUNC_FLOOR &&
functionId != TSDB_FUNC_ROUND) {
functionId != TSDB_FUNC_ROUND &&
!TSDB_FUNC_IS_SCALAR(functionId)) {
return false;
}
}
......@@ -631,6 +632,10 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
continue;
}
if (TSDB_FUNC_IS_SCALAR(functionId)) {
continue;
}
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
......
......@@ -86,6 +86,16 @@ extern "C" {
// #define TSDB_FUNC_HLL 41
// #define TSDB_FUNC_MODE 42
#define TSDB_FUNC_FLAG_UDF 0x8000
#define TSDB_FUNC_FLAG_SCALAR 0x4000
#define TSDB_FUNC_IS_SCALAR(id) ((((id) & TSDB_FUNC_FLAG_UDF) == 0) && (((id) & TSDB_FUNC_FLAG_SCALAR) != 0))
#define TSDB_FUNC_SCALAR_INDEX(id) ((id) & ~TSDB_FUNC_FLAG_SCALAR)
///////////////////////////////////////////
// SCALAR FUNCTIONS
#define TSDB_FUNC_SCALAR_POW (TSDB_FUNC_FLAG_SCALAR | 0x0000)
#define TSDB_FUNC_SCALAR_LOG (TSDB_FUNC_FLAG_SCALAR | 0x0001)
#define TSDB_FUNC_SCALAR_MAX_NUM 2
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
#define TSDB_FUNCSTATE_STREAM 0x4u // function avail for stream
......@@ -218,6 +228,15 @@ typedef struct SAggFunctionInfo {
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo;
typedef struct SScalarFunctionInfo {
int16_t functionId; // scalar function id & ~TSDB_FUNC_FLAG_SCALAR == index
char name[TSDB_FUNCTIONS_NAME_MAX_LENGTH];
bool (*init)(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultCellInfo);
void (*xFunction)(SQLFunctionCtx *pCtx);
void (*xFinalize)(SQLFunctionCtx *pCtx);
} SScalarFunctionInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
......@@ -251,6 +270,9 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
/* global sql function array */
extern struct SAggFunctionInfo aAggs[];
/* global scalar sql functions array */
extern struct SScalarFunctionInfo aScalarFunctions[TSDB_FUNC_SCALAR_MAX_NUM];
extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval);
......@@ -266,11 +288,11 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, int32_t bufLen) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen);
}
......
......@@ -197,6 +197,25 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (TSDB_FUNC_IS_SCALAR(functionId)) {
switch (functionId) {
case TSDB_FUNC_SCALAR_POW: {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
break;
}
case TSDB_FUNC_SCALAR_LOG: {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
break;
}
default: {
qError("Illegal function id: %d", functionId);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ ||
......@@ -471,6 +490,17 @@ int32_t isValidFunction(const char* name, int32_t len) {
}
}
for (int32_t i = 0; i < TSDB_FUNC_SCALAR_MAX_NUM; ++i) {
int32_t nameLen = (int32_t) strlen(aScalarFunctions[i].name);
if (len != nameLen) {
continue;
}
if (strncasecmp(aScalarFunctions[i].name, name, len) == 0) {
return aScalarFunctions[i].functionId;
}
}
return -1;
}
......@@ -5352,3 +5382,71 @@ SAggFunctionInfo aAggs[] = {{
dataBlockRequired,
},
};
static void scalar_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
int32_t notNullElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size -1;
TSKEY* pTimestamp = pCtx->ptsOutputBuf;
qDebug("%p scalar_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
for (; i < pCtx->size && i >= 0; i += step) {
char* pData = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
qDebug("%p scalar_function() index of null data:%d", pCtx, i);
continue;
}
switch (pCtx->functionId) {
case TSDB_FUNC_SCALAR_POW: {
double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, pData);
double result = pow(v, pCtx->param[0].dKey);
SET_TYPED_DATA(pCtx->pOutput, pCtx->outputType, result);
break;
}
case TSDB_FUNC_SCALAR_LOG: {
double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, pData);
double result = log(v)/ log(pCtx->param[0].dKey);
SET_TYPED_DATA(pCtx->pOutput, pCtx->outputType, result);
break;
}
default:
qError("invalid function id %d", pCtx->functionId);
break;
}
++notNullElems;
pCtx->pOutput += pCtx->outputBytes;
pTimestamp++;
}
if (notNullElems == 0) {
assert(pCtx->hasNull);
} else {
pResInfo->numOfRes += notNullElems;
pResInfo->hasResult = DATA_SET_FLAG;
}
}
SScalarFunctionInfo aScalarFunctions[] = {
{
TSDB_FUNC_SCALAR_POW,
"pow",
function_setup,
scalar_function,
doFinalizer,
},
{
TSDB_FUNC_SCALAR_LOG,
"log",
function_setup,
scalar_function,
doFinalizer,
},
};
\ No newline at end of file
......@@ -392,6 +392,10 @@ static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput
for (int32_t i = 0; i < numOfOutput; ++i) {
int32_t functId = pCtx[i].functionId;
if (TSDB_FUNC_IS_SCALAR(functId)) {
continue;
}
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
hasTags = true;
continue;
......@@ -411,6 +415,11 @@ static bool isScalarWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) {
int32_t functId = pCtx[i].functionId;
if (TSDB_FUNC_IS_SCALAR(functId)) {
numOfScalar++;
continue;
}
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
hasTags = true;
continue;
......@@ -1024,8 +1033,10 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
if (functionId < 0) { // load the script and exec, pRuntimeEnv->pUdfInfo
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else {
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
aAggs[functionId].xFunction(&pCtx[k]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].xFunction(&pCtx[k]);
}
}
......@@ -1241,9 +1252,13 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);
if (pCtx[i].functionId < 0) {
if (pCtx[i].functionId < 0 || TSDB_FUNC_IS_SCALAR(pCtx[i].functionId)) {
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
pCtx[i].ptsList = (int64_t*) tsInfo->pData;
if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
pCtx[i].ptsList = (int64_t*)tsInfo->pData;
} else {
pCtx[i].ptsList = NULL;
}
continue;
}
......@@ -1285,8 +1300,10 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
if (functionId < 0) {
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else {
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
aAggs[functionId].xFunction(&pCtx[k]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].xFunction(&pCtx[k]);
}
}
}
......@@ -1309,8 +1326,10 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
// load the script and exec
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else {
} else if (!TSDB_FUNC_IS_SCALAR(pCtx[k].functionId)) {
aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(pCtx[k].functionId)].xFunction(&pCtx[k]);
}
}
}
......@@ -1973,6 +1992,9 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) {
int32_t functionId = pCtx[i].functionId;
if (functionId < 0 || TSDB_FUNC_IS_SCALAR(functionId)) {
continue;
}
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pCtx[i].outputBytes;
......@@ -3056,7 +3078,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId;
// group by + first/last should not apply the first/last block filter
if (functionId < 0) {
if (functionId < 0 || TSDB_FUNC_IS_SCALAR(functionId)) {
status |= BLK_DATA_ALL_NEEDED;
return status;
} else {
......@@ -3754,8 +3776,10 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
if (pCtx[j].functionId < 0) { // todo udf initialization
continue;
} else {
} else if (!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)) {
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(pCtx[j].functionId)].init(&pCtx[j], pCtx[j].resultInfo);
}
}
}
......@@ -3814,8 +3838,10 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
pCtx[j].startTs = buf->win.skey;
if (pCtx[j].functionId < 0) {
doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else {
} else if (!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)) {
aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(pCtx[j].functionId)].xFinalize(&pCtx[j]);
}
}
......@@ -3831,8 +3857,10 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
for (int32_t j = 0; j < numOfOutput; ++j) {
if (pCtx[j].functionId < 0) {
doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else {
} else if (!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)) {
aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(pCtx[j].functionId)].xFinalize(&pCtx[j]);
}
}
}
......@@ -3920,9 +3948,6 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
offset += pCtx[i].outputBytes;
int32_t functionId = pCtx[i].functionId;
if (functionId < 0) {
continue;
}
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE) {
......@@ -3930,7 +3955,13 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
}
if (!pResInfo->initialized) {
aAggs[functionId].init(&pCtx[i], pResInfo);
if (functionId < 0 ) {
doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[i], 0, TSDB_UDF_FUNC_INIT);
} else if (TSDB_FUNC_IS_SCALAR(functionId)) {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].init(&pCtx[i], pResInfo);
} else {
aAggs[functionId].init(&pCtx[i], pResInfo);
}
}
}
}
......@@ -5836,7 +5867,15 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
break;
} else { // init output buffer for a new group data
for (int32_t j = 0; j < pOperator->numOfOutput; ++j) {
aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]);
int16_t functionId = pInfo->pCtx[j].functionId;
if (functionId < 0 ) {
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pInfo->pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else if (TSDB_FUNC_IS_SCALAR(functionId)) {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].xFinalize(&pInfo->pCtx[j]);
} else {
aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]);
}
}
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册