提交 27c94691 编写于 作者: S shenglian zhou

move scalar function from qAggMain to tExpr

上级 6b2461ce
......@@ -623,7 +623,7 @@ static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
aAggs[functionId].xFinalize(&pCtx[j]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].xFinalize(&pCtx[j]);
assert(0);
}
}
}
......
......@@ -1749,7 +1749,7 @@ static int32_t handleScalarExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t e
int32_t tableIndex = columnList->ids[0].tableIndex;
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
// all columns in arithmetic expression must belong to the same table
// all columns in scalar expression must belong to the same table
for (int32_t f = 1; f < columnList->num; ++f) {
if (columnList->ids[f].tableIndex != tableIndex) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
......@@ -1776,7 +1776,7 @@ static int32_t handleScalarExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t e
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
// check for if there is a tag in the arithmetic express
// check for if there is a tag in the scalar expression
size_t numOfNode = taosArrayGetSize(colList);
for(int32_t k = 0; k < numOfNode; ++k) {
SColIndex* pIndex = taosArrayGet(colList, k);
......@@ -1827,6 +1827,7 @@ static int32_t handleAggregateExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_
char aliasName[TSDB_COL_NAME_LEN] = {0};
getColumnName(pItem, aliasName, rawName, TSDB_COL_NAME_LEN);
// the expr associated with the result field will become exprList1 in SQueryInfo, then pExpr2 in SQueryAttr
insertResultField(pQueryInfo, exprIndex, columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, aliasName, NULL);
int32_t slot = tscNumOfFields(pQueryInfo) - 1;
......@@ -4387,6 +4388,7 @@ static int32_t validateSQLExprSQLFunc(SSqlCmd* pCmd, tSqlExpr* pExpr,
}
}
// add the aggregate function to SQueryInfo exprList, which is pExpr1/global aggregate pExpr3 in SQueryAttr
if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, false, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......
......@@ -19,12 +19,6 @@
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
int16_t type;
int16_t bytes;
int16_t numOfRows;
char* data;
} tExprOperandInfo;
typedef void (*_arithmetic_operator_fn_t)(void *left, int32_t numLeft, int32_t leftType, void *right, int32_t numRight,
int32_t rightType, void *output, int32_t order);
......@@ -32,10 +26,6 @@ typedef void (*_arithmetic_operator_fn_t)(void *left, int32_t numLeft, int32_t l
_arithmetic_operator_fn_t getArithmeticOperatorFn(int32_t arithmeticOptr);
typedef void (*_expr_scalar_function_t)(tExprOperandInfo* pInputs, uint8_t numInputs, tExprOperandInfo* pOutput, int32_t order);
_expr_scalar_function_t getExprScalarFunction(uint16_t scalar);
#ifdef __cplusplus
}
#endif
......
......@@ -49,6 +49,27 @@ struct SSchema;
#define TSDB_FUNC_SCALAR_POW (TSDB_FUNC_FLAG_SCALAR | 0x0000)
#define TSDB_FUNC_SCALAR_LOG (TSDB_FUNC_FLAG_SCALAR | 0x0001)
#define TSDB_FUNC_SCALAR_MAX_NUM 2
#define TSDB_FUNC_SCALAR_NAME_MAX_LEN 16
typedef struct {
int16_t type;
int16_t bytes;
int16_t numOfRows;
char* data;
} tExprOperandInfo;
typedef void (*_expr_scalar_function_t)(tExprOperandInfo* pInputs, uint8_t numInputs, tExprOperandInfo* pOutput, int32_t order);
_expr_scalar_function_t getExprScalarFunction(uint16_t scalar);
typedef struct tScalarFunctionInfo{
int16_t functionId; // scalar function id & ~TSDB_FUNC_FLAG_SCALAR == index
char name[TSDB_FUNC_SCALAR_NAME_MAX_LEN];
} tScalarFunctionInfo;
/* global scalar sql functions array */
extern struct tScalarFunctionInfo aScalarFunctions[TSDB_FUNC_SCALAR_MAX_NUM];
typedef bool (*__result_filter_fn_t)(const void *, void *);
typedef void (*__do_filter_suppl_fn_t)(void *, void *);
......
......@@ -411,60 +411,3 @@ _arithmetic_operator_fn_t getArithmeticOperatorFn(int32_t arithmeticOptr) {
return NULL;
}
}
void vectorPow(tExprOperandInfo* pInputs, uint8_t numInputs, tExprOperandInfo* pOutput, int32_t order) {
assert(numInputs == 2);
assert(pInputs[1].numOfRows == 1 && pInputs[0].numOfRows >= 1);
int numOfRows = pInputs[0].numOfRows;
double base = 0;
GET_TYPED_DATA(base, double, pInputs[1].type, pInputs[1].data);
for (int i = 0; i < numOfRows; ++i) {
char* pInputData = pInputs[0].data + i * pInputs[0].bytes;
char* pOutputData = pOutput->data + i * pOutput->bytes;
if (isNull(pInputData, pInputs[0].type)) {
setNull(pOutputData, pOutput->type, pOutput->bytes);
} else {
double v1 = 0;
GET_TYPED_DATA(v1, double, pInputs[0].type, pInputData);
double result = pow(v1, base);
SET_TYPED_DATA(pOutputData, pOutput->type, result);
}
}
}
void vectorLog(tExprOperandInfo* pInputs, uint8_t numInputs, tExprOperandInfo* pOutput, int32_t order) {
assert(numInputs == 2);
assert(pInputs[1].numOfRows == 1 && pInputs[0].numOfRows >= 1);
int numOfRows = pInputs[0].numOfRows;
double base = 0;
GET_TYPED_DATA(base, double, pInputs[1].type, pInputs[1].data);
for (int i = 0; i < numOfRows; ++i) {
char* pInputData = pInputs[0].data + i * pInputs[0].bytes;
char* pOutputData = pOutput->data + i * pOutput->bytes;
if (isNull(pInputData, pInputs[0].type)) {
setNull(pOutputData, pOutput->type, pOutput->bytes);
} else {
double v1 = 0;
GET_TYPED_DATA(v1, double, pInputs[0].type, pInputData);
double result = log(v1) / log(base);
SET_TYPED_DATA(pOutputData, pOutput->type, result);
}
}
}
_expr_scalar_function_t getExprScalarFunction(uint16_t scalarFunc) {
switch (scalarFunc) {
case TSDB_FUNC_SCALAR_POW:
return vectorPow;
case TSDB_FUNC_SCALAR_LOG:
return vectorLog;
default:
assert(0);
return NULL;
}
}
\ No newline at end of file
......@@ -743,3 +743,71 @@ tExprNode* exprdup(tExprNode* pNode) {
return pCloned;
}
void vectorPow(tExprOperandInfo* pInputs, uint8_t numInputs, tExprOperandInfo* pOutput, int32_t order) {
assert(numInputs == 2);
assert(pInputs[1].numOfRows == 1 && pInputs[0].numOfRows >= 1);
int numOfRows = pInputs[0].numOfRows;
double base = 0;
GET_TYPED_DATA(base, double, pInputs[1].type, pInputs[1].data);
for (int i = 0; i < numOfRows; ++i) {
char* pInputData = pInputs[0].data + i * pInputs[0].bytes;
char* pOutputData = pOutput->data + i * pOutput->bytes;
if (isNull(pInputData, pInputs[0].type)) {
setNull(pOutputData, pOutput->type, pOutput->bytes);
} else {
double v1 = 0;
GET_TYPED_DATA(v1, double, pInputs[0].type, pInputData);
double result = pow(v1, base);
SET_TYPED_DATA(pOutputData, pOutput->type, result);
}
}
}
void vectorLog(tExprOperandInfo* pInputs, uint8_t numInputs, tExprOperandInfo* pOutput, int32_t order) {
assert(numInputs == 2);
assert(pInputs[1].numOfRows == 1 && pInputs[0].numOfRows >= 1);
int numOfRows = pInputs[0].numOfRows;
double base = 0;
GET_TYPED_DATA(base, double, pInputs[1].type, pInputs[1].data);
for (int i = 0; i < numOfRows; ++i) {
char* pInputData = pInputs[0].data + i * pInputs[0].bytes;
char* pOutputData = pOutput->data + i * pOutput->bytes;
if (isNull(pInputData, pInputs[0].type)) {
setNull(pOutputData, pOutput->type, pOutput->bytes);
} else {
double v1 = 0;
GET_TYPED_DATA(v1, double, pInputs[0].type, pInputData);
double result = log(v1) / log(base);
SET_TYPED_DATA(pOutputData, pOutput->type, result);
}
}
}
_expr_scalar_function_t getExprScalarFunction(uint16_t scalarFunc) {
switch (scalarFunc) {
case TSDB_FUNC_SCALAR_POW:
return vectorPow;
case TSDB_FUNC_SCALAR_LOG:
return vectorLog;
default:
assert(0);
return NULL;
}
}
tScalarFunctionInfo aScalarFunctions[] = {
{
TSDB_FUNC_SCALAR_POW,
"pow"
},
{
TSDB_FUNC_SCALAR_LOG,
"log"
},
};
\ No newline at end of file
Subproject commit f56aa0f485d7bb6aebbcefc2007eeecdccb767c8
......@@ -224,15 +224,6 @@ typedef struct SAggFunctionInfo {
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo;
typedef struct SScalarFunctionInfo {
int16_t functionId; // scalar function id & ~TSDB_FUNC_FLAG_SCALAR == index
char name[TSDB_FUNCTIONS_NAME_MAX_LENGTH];
bool (*init)(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultCellInfo);
void (*xFunction)(SQLFunctionCtx *pCtx);
void (*xFinalize)(SQLFunctionCtx *pCtx);
} SScalarFunctionInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
......@@ -267,9 +258,6 @@ void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDi
/* global sql function array */
extern struct SAggFunctionInfo aAggs[];
/* global scalar sql functions array */
extern struct SScalarFunctionInfo aScalarFunctions[TSDB_FUNC_SCALAR_MAX_NUM];
extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval);
......
......@@ -5554,74 +5554,4 @@ SAggFunctionInfo aAggs[] = {{
block_func_merge,
dataBlockRequired,
},
};
static void scalar_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
int32_t numElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size -1;
TSKEY* pTimestamp = pCtx->ptsOutputBuf;
qDebug("%p scalar_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
for (; i < pCtx->size && i >= 0; i += step) {
char* pData = GET_INPUT_DATA(pCtx, i);
switch (pCtx->functionId) {
case TSDB_FUNC_SCALAR_POW: {
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes);
} else {
double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, pData);
double result = pow(v, pCtx->param[0].dKey);
SET_TYPED_DATA(pCtx->pOutput, pCtx->outputType, result);
}
break;
}
case TSDB_FUNC_SCALAR_LOG: {
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
setNull(pCtx->pOutput, TSDB_DATA_TYPE_DOUBLE, tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes);
} else {
double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, pData);
double result = log(v) / log(pCtx->param[0].dKey);
SET_TYPED_DATA(pCtx->pOutput, pCtx->outputType, result);
}
break;
}
default:
qError("invalid function id %d", pCtx->functionId);
break;
}
++numElems;
pCtx->pOutput += pCtx->outputBytes;
pTimestamp++;
}
if (numElems != 0) {
pResInfo->numOfRes += numElems;
pResInfo->hasResult = DATA_SET_FLAG;
}
}
SScalarFunctionInfo aScalarFunctions[] = {
{
TSDB_FUNC_SCALAR_POW,
"pow",
function_setup,
scalar_function,
doFinalizer,
},
{
TSDB_FUNC_SCALAR_LOG,
"log",
function_setup,
scalar_function,
doFinalizer,
},
};
\ No newline at end of file
......@@ -1046,7 +1046,7 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
aAggs[functionId].xFunction(&pCtx[k]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].xFunction(&pCtx[k]);
assert(0);
}
}
......@@ -1314,7 +1314,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
} else if (!TSDB_FUNC_IS_SCALAR(functionId)){
aAggs[functionId].xFunction(&pCtx[k]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].xFunction(&pCtx[k]);
assert(0);
}
}
}
......@@ -1337,7 +1337,7 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
} else if (!TSDB_FUNC_IS_SCALAR(pCtx[k].functionId)) {
aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(pCtx[k].functionId)].xFunction(&pCtx[k]);
assert(0);
}
}
}
......@@ -3815,7 +3815,7 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
} else if (!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)) {
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(pCtx[j].functionId)].init(&pCtx[j], pCtx[j].resultInfo);
assert(0);
}
}
}
......@@ -3877,7 +3877,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
} else if (!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)) {
aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(pCtx[j].functionId)].xFinalize(&pCtx[j]);
assert(0);
}
}
......@@ -3896,7 +3896,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
} else if (!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)) {
aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
} else {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(pCtx[j].functionId)].xFinalize(&pCtx[j]);
assert(0);
}
}
}
......@@ -3993,10 +3993,10 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
if (!pResInfo->initialized) {
if (functionId < 0 ) {
doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[i], 0, TSDB_UDF_FUNC_INIT);
} else if (TSDB_FUNC_IS_SCALAR(functionId)) {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].init(&pCtx[i], pResInfo);
} else {
} else if (!TSDB_FUNC_IS_SCALAR(functionId)) {
aAggs[functionId].init(&pCtx[i], pResInfo);
} else {
assert(0);
}
}
}
......@@ -5914,10 +5914,10 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
if (functionId < 0 ) {
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pInfo->pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else if (TSDB_FUNC_IS_SCALAR(functionId)) {
aScalarFunctions[TSDB_FUNC_SCALAR_INDEX(functionId)].xFinalize(&pInfo->pCtx[j]);
} else {
} else if (!TSDB_FUNC_IS_SCALAR(functionId)) {
aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]);
} else {
assert(0);
}
}
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册