未验证 提交 76dd2bac 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12583 from taosdata/feature/udf

feat: fix udf memory memory found by valgrind
......@@ -63,14 +63,14 @@ static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return
static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) {
if (buf != NULL) {
((int8_t *)(*buf))[0] = value ? 1 : 0;
((int8_t *)(*buf))[0] = (value ? 1 : 0);
*buf = POINTER_SHIFT(*buf, sizeof(int8_t));
}
return (int32_t)sizeof(int8_t);
}
static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) {
*value = ((int8_t *)buf)[0] == 0 ? false : true;
*value = ( (((int8_t *)buf)[0] == 0) ? false : true );
return POINTER_SHIFT(buf, sizeof(int8_t));
}
......
......@@ -1287,7 +1287,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req;
memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) {
......
......@@ -226,7 +226,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
code = udf->scalarProcFunc(&input, &output);
freeUdfDataDataBlock(&input);
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
freeUdfColumn(&output);
break;
......@@ -246,6 +246,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
.bufLen= udf->bufSize,
.numOfResult = 0};
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
freeUdfInterBuf(&call->interBuf);
freeUdfDataDataBlock(&input);
subRsp->resultBuf = outBuf;
break;
......@@ -255,6 +257,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
.bufLen= udf->bufSize,
.numOfResult = 0};
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
freeUdfInterBuf(&call->interBuf);
subRsp->resultBuf = outBuf;
break;
}
......@@ -274,6 +277,30 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
switch (call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
tDeleteSSDataBlock(&call->block);
tDeleteSSDataBlock(&subRsp->resultData);
break;
}
case TSDB_UDF_CALL_AGG_INIT: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_PROC: {
tDeleteSSDataBlock(&call->block);
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
default:
break;
}
taosMemoryFree(uvUdf->input.base);
return;
}
......@@ -348,9 +375,8 @@ void udfdProcessRequest(uv_work_t *req) {
void udfdOnWrite(uv_write_t *req, int status) {
SUvUdfWork *work = (SUvUdfWork *)req->data;
if (status < 0) {
// TODO:log error and process it.
fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status));
}
fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status));
taosMemoryFree(work->output.base);
taosMemoryFree(work);
taosMemoryFree(req);
......@@ -549,6 +575,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
tFreeSFuncInfo(pFuncInfo);
taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0;
}
......@@ -800,15 +827,26 @@ static int32_t udfdUvInit() {
return 0;
}
static void udfdCloseWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
uv_close(handle, NULL);
}
}
static int32_t udfdRun() {
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex);
fnInfo("start the udfd");
int code = uv_run(global.loop, UV_RUN_DEFAULT);
fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code);
int codeClose = uv_loop_close(global.loop);
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
fnInfo("start udfd event loop");
uv_run(global.loop, UV_RUN_DEFAULT);
fnInfo("udfd event loop stopped.");
uv_loop_close(global.loop);
uv_walk(global.loop, udfdCloseWalkCb, NULL);
uv_run(global.loop, UV_RUN_DEFAULT);
uv_loop_close(global.loop);
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册