提交 d668ee7b 编写于 作者: S slzhou

fix: aggregate memory leaking

上级 c94c4395
......@@ -87,6 +87,7 @@ typedef struct SUdfInterBuf {
} SUdfInterBuf;
typedef void *UdfcFuncHandle;
//low level APIs
/**
* setup udf
* @param udf, in
......@@ -115,6 +116,9 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
*/
int32_t doTeardownUdf(UdfcFuncHandle handle);
void freeUdfInterBuf(SUdfInterBuf *buf);
//high level APIs
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
......
......@@ -836,7 +836,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *
fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) {
SUdfResponse rsp;
SUdfResponse rsp = {0};
void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
task->errCode = rsp.code;
......@@ -1569,6 +1569,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
}
udfRes->interResNum = buf.numOfResult;
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
freeUdfInterBuf(&buf);
return true;
}
......@@ -1626,7 +1627,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock);
taosMemoryFree(newState.buf);
freeUdfInterBuf(&newState);
return udfCode;
}
......@@ -1655,6 +1656,8 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
}
freeUdfInterBuf(&resultBuf);
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName);
return udfCallCode == 0 ? numOfResults : udfCallCode;
......
......@@ -34,17 +34,7 @@ static int32_t initLog() {
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
}
int main(int argc, char *argv[]) {
parseArgs(argc, argv);
initLog();
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error");
return -1;
}
udfcOpen();
uv_sleep(1000);
int scalarFuncTest() {
UdfcFuncHandle handle;
if (doSetupUdf("udf1", &handle) != 0) {
......@@ -87,5 +77,68 @@ int main(int argc, char *argv[]) {
taosMemoryFree(output.columnData);
doTeardownUdf(handle);
return 0;
}
int aggregateFuncTest() {
UdfcFuncHandle handle;
if (doSetupUdf("udf2", &handle) != 0) {
fnError("setup udf failure");
return -1;
}
SSDataBlock block = {0};
SSDataBlock *pBlock = █
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pBlock->info.numOfCols = 1;
pBlock->info.rows = 4;
char data[16] = {0};
char bitmap[4] = {0};
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfo.pData = data;
colInfo.nullbitmap = bitmap;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j);
}
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
SUdfInterBuf buf = {0};
SUdfInterBuf newBuf = {0};
SUdfInterBuf resultBuf = {0};
doCallUdfAggInit(handle, &buf);
doCallUdfAggProcess(handle, pBlock, &buf, &newBuf);
taosArrayDestroy(pBlock->pDataBlock);
doCallUdfAggFinalize(handle, &newBuf, &resultBuf);
fprintf(stderr, "agg result: %f\n", *(double*)resultBuf.buf);
freeUdfInterBuf(&buf);
freeUdfInterBuf(&newBuf);
freeUdfInterBuf(&resultBuf);
doTeardownUdf(handle);
return 0;
}
int main(int argc, char *argv[]) {
parseArgs(argc, argv);
initLog();
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error");
return -1;
}
udfcOpen();
uv_sleep(1000);
scalarFuncTest();
aggregateFuncTest();
udfcClose();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册