未验证 提交 99cf4ec4 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12062 from taosdata/feature/udf

feat(udf): udaf pass the first time
......@@ -1176,7 +1176,7 @@ int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInte
// input: interBuf
// output: resultData
int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int8_t callType = TSDB_UDF_CALL_AGG_FIN;
int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
return err;
}
......@@ -1243,12 +1243,12 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
}
SUdfUvSession *session = (SUdfUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
memset(udfRes, 0, envSize);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
udfRes->session = (SUdfUvSession *)handle;
SUdfInterBuf buf = {0};
if (callUdfAggInit(handle, &buf) != 0) {
......@@ -1260,7 +1260,6 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
}
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
int32_t numOfCols = pInput->numOfInputCols;
......@@ -1320,13 +1319,15 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf resultBuf = {.buf = udfRes->finalResBuf,
.bufLen = session->outputLen,
.numOfResult = udfRes->finalResNum};
SUdfInterBuf resultBuf = {0};
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
callUdfAggFinalize(session, &state, &resultBuf);
udfRes->finalResBuf = resultBuf.buf;
udfRes->finalResNum = resultBuf.numOfResult;
teardownUdf(session);
if (resultBuf.numOfResult == 1) {
......
......@@ -124,7 +124,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggFinishFunc));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
//TODO: merge
}
return 0;
......
......@@ -27,7 +27,7 @@ int32_t udf2_start(SUdfInterBuf *buf) {
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf;
for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i];
//TODO: check the bitmap for null value
int32_t* rows = (int32_t*)col->colData.fixLenCol.data;
......@@ -35,7 +35,7 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte
}
}
*(int64_t*)newInterBuf = sumSquares;
*(int64_t*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(int64_t);
//TODO: if all null value, numOfResult = 0;
newInterBuf->numOfResult = 1;
......
......@@ -154,6 +154,7 @@ static SNode* logicConditionNodeCopy(const SLogicConditionNode* pSrc, SLogicCond
}
static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst);
COPY_CHAR_ARRAY_FIELD(functionName);
COPY_SCALAR_FIELD(funcId);
......
......@@ -2077,6 +2077,7 @@ static const char* jkFunctionName = "Name";
static const char* jkFunctionId = "Id";
static const char* jkFunctionType = "Type";
static const char* jkFunctionParameter = "Parameters";
static const char* jkFunctionUdfBufSize = "UdfBufSize";
static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
const SFunctionNode* pNode = (const SFunctionNode*)pObj;
......@@ -2094,6 +2095,9 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkFunctionParameter, pNode->pParameterList);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkFunctionUdfBufSize, pNode->udfBufSize);
}
return code;
}
......@@ -2114,6 +2118,9 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkFunctionParameter, &pNode->pParameterList);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkFunctionUdfBufSize, &pNode->udfBufSize);
}
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册