提交 193a8471 编写于 作者: S slzhou

udaf pass the first time

上级 00509d0e
...@@ -1176,7 +1176,7 @@ int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInte ...@@ -1176,7 +1176,7 @@ int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInte
// input: interBuf // input: interBuf
// output: resultData // output: resultData
int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC; int8_t callType = TSDB_UDF_CALL_AGG_FIN;
int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
return err; return err;
} }
...@@ -1243,12 +1243,12 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult ...@@ -1243,12 +1243,12 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
} }
SUdfUvSession *session = (SUdfUvSession *)handle; SUdfUvSession *session = (SUdfUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo); SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
memset(udfRes, 0, envSize); memset(udfRes, 0, envSize);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
udfRes->session = (SUdfUvSession *)handle; udfRes->session = (SUdfUvSession *)handle;
SUdfInterBuf buf = {0}; SUdfInterBuf buf = {0};
if (callUdfAggInit(handle, &buf) != 0) { if (callUdfAggInit(handle, &buf) != 0) {
...@@ -1260,7 +1260,6 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult ...@@ -1260,7 +1260,6 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
} }
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
int32_t numOfCols = pInput->numOfInputCols; int32_t numOfCols = pInput->numOfInputCols;
...@@ -1320,13 +1319,15 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { ...@@ -1320,13 +1319,15 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf resultBuf = {.buf = udfRes->finalResBuf, SUdfInterBuf resultBuf = {0};
.bufLen = session->outputLen,
.numOfResult = udfRes->finalResNum};
SUdfInterBuf state = {.buf = udfRes->interResBuf, SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize, .bufLen = session->bufSize,
.numOfResult = udfRes->interResNum}; .numOfResult = udfRes->interResNum};
callUdfAggFinalize(session, &state, &resultBuf); callUdfAggFinalize(session, &state, &resultBuf);
udfRes->finalResBuf = resultBuf.buf;
udfRes->finalResNum = resultBuf.numOfResult;
teardownUdf(session); teardownUdf(session);
if (resultBuf.numOfResult == 1) { if (resultBuf.numOfResult == 1) {
......
...@@ -124,7 +124,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { ...@@ -124,7 +124,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
char *finishSuffix = "_finish"; char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, strlen(processFuncName)); strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggFinishFunc)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
//TODO: merge //TODO: merge
} }
return 0; return 0;
......
...@@ -27,7 +27,7 @@ int32_t udf2_start(SUdfInterBuf *buf) { ...@@ -27,7 +27,7 @@ int32_t udf2_start(SUdfInterBuf *buf) {
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf; int64_t sumSquares = *(int64_t*)interBuf->buf;
for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++i) { for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i]; SUdfColumn* col = block->udfCols[i];
//TODO: check the bitmap for null value //TODO: check the bitmap for null value
int32_t* rows = (int32_t*)col->colData.fixLenCol.data; int32_t* rows = (int32_t*)col->colData.fixLenCol.data;
...@@ -35,7 +35,7 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte ...@@ -35,7 +35,7 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte
} }
} }
*(int64_t*)newInterBuf = sumSquares; *(int64_t*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(int64_t); newInterBuf->bufLen = sizeof(int64_t);
//TODO: if all null value, numOfResult = 0; //TODO: if all null value, numOfResult = 0;
newInterBuf->numOfResult = 1; newInterBuf->numOfResult = 1;
......
...@@ -154,6 +154,7 @@ static SNode* logicConditionNodeCopy(const SLogicConditionNode* pSrc, SLogicCond ...@@ -154,6 +154,7 @@ static SNode* logicConditionNodeCopy(const SLogicConditionNode* pSrc, SLogicCond
} }
static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) { static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst); exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst);
COPY_CHAR_ARRAY_FIELD(functionName); COPY_CHAR_ARRAY_FIELD(functionName);
COPY_SCALAR_FIELD(funcId); COPY_SCALAR_FIELD(funcId);
......
...@@ -1885,6 +1885,7 @@ static const char* jkFunctionName = "Name"; ...@@ -1885,6 +1885,7 @@ static const char* jkFunctionName = "Name";
static const char* jkFunctionId = "Id"; static const char* jkFunctionId = "Id";
static const char* jkFunctionType = "Type"; static const char* jkFunctionType = "Type";
static const char* jkFunctionParameter = "Parameters"; static const char* jkFunctionParameter = "Parameters";
static const char* jkFunctionUdfBufSize = "UdfBufSize";
static int32_t functionNodeToJson(const void* pObj, SJson* pJson) { static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
const SFunctionNode* pNode = (const SFunctionNode*)pObj; const SFunctionNode* pNode = (const SFunctionNode*)pObj;
...@@ -1902,6 +1903,9 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1902,6 +1903,9 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkFunctionParameter, pNode->pParameterList); code = nodeListToJson(pJson, jkFunctionParameter, pNode->pParameterList);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkFunctionUdfBufSize, pNode->udfBufSize);
}
return code; return code;
} }
...@@ -1922,6 +1926,9 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) { ...@@ -1922,6 +1926,9 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkFunctionParameter, &pNode->pParameterList); code = jsonToNodeList(pJson, jkFunctionParameter, &pNode->pParameterList);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkFunctionUdfBufSize, &pNode->udfBufSize);
}
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册