提交 5b35fcac 编写于 作者: S slzhou

fix: teardown udf functions handles

上级 89ba9439
......@@ -121,6 +121,8 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
int32_t teardownUdfs();
// end API to taosd and qworker
//=============================================================================================================================
// begin API to UDF writer.
......
......@@ -21,13 +21,14 @@
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tudf.h"
#include "thash.h"
#include "executorimpl.h"
#include "executor.h"
#include "executorimpl.h"
#include "query.h"
#include "thash.h"
#include "tlosertree.h"
#include "ttypes.h"
#include "query.h"
typedef struct STaskMgmt {
TdThreadMutex lock;
......@@ -156,6 +157,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
pTaskInfo->totalRows += current;
teardownUdfs();
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
......
......@@ -313,6 +313,7 @@ int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfcFuncStub {
char udfName[TSDB_FUNC_NAME_LEN];
UdfcFuncHandle handle;
int32_t refCount;
} SUdfcFuncStub;
typedef struct SUdfcProxy {
......@@ -338,7 +339,7 @@ typedef struct SUdfcProxy {
SUdfcProxy gUdfdProxy = {0};
typedef struct SClientUdfUvSession {
typedef struct SUdfcUvSession {
SUdfcProxy *udfc;
int64_t severHandle;
uv_pipe_t *udfUvPipe;
......@@ -346,7 +347,9 @@ typedef struct SClientUdfUvSession {
int8_t outputType;
int32_t outputLen;
int32_t bufSize;
} SClientUdfUvSession;
char udfName[TSDB_FUNC_NAME_LEN];
} SUdfcUvSession;
typedef struct SClientUvTaskNode {
SUdfcProxy *udfc;
......@@ -369,7 +372,7 @@ typedef struct SClientUvTaskNode {
typedef struct SClientUdfTask {
int8_t type;
SClientUdfUvSession *session;
SUdfcUvSession *session;
int32_t errCode;
......@@ -401,7 +404,7 @@ typedef struct SClientUvConn {
uv_pipe_t *pipe;
QUEUE taskQueue;
SClientConnBuf readBuf;
SClientUdfUvSession *session;
SUdfcUvSession *session;
} SClientUvConn;
enum {
......@@ -825,11 +828,6 @@ void onUdfcPipeClose(uv_handle_t *handle) {
taosMemoryFree(conn->readBuf.buf);
taosMemoryFree(conn);
taosMemoryFree((uv_pipe_t *) handle);
//clear the udf handles cache TODO move to other thread
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
taosArrayClear(gUdfdProxy.udfStubs);
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
......@@ -1283,7 +1281,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
}
SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
task->errCode = 0;
task->session = taosMemoryCalloc(1, sizeof(SClientUdfUvSession));
task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
task->session->udfc = &gUdfdProxy;
task->type = UDF_TASK_SETUP;
......@@ -1303,6 +1301,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
task->session->outputType = rsp->outputType;
task->session->outputLen = rsp->outputLen;
task->session->bufSize = rsp->bufSize;
strcpy(task->session->udfName, udfName);
if (task->errCode != 0) {
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
} else {
......@@ -1317,14 +1316,14 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) {
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
SUdfcUvSession *session = (SUdfcUvSession *) handle;
if (session->udfUvPipe == NULL) {
fnError("No pipe to udfd");
return TSDB_CODE_UDF_PIPE_NO_PIPE;
}
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
task->errCode = 0;
task->session = (SClientUdfUvSession *) handle;
task->session = (SUdfcUvSession *) handle;
task->type = UDF_TASK_CALL;
SUdfCallRequest *req = &task->_call.req;
......@@ -1440,7 +1439,7 @@ int compareUdfcFuncSub(const void* elem1, const void* elem2) {
return strcmp(stub1->udfName, stub2->udfName);
}
int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) {
int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
int32_t code = 0;
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
......@@ -1449,6 +1448,7 @@ int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) {
if (foundStub != NULL) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
*pHandle = foundStub->handle;
++foundStub->refCount;
return 0;
}
*pHandle = NULL;
......@@ -1457,6 +1457,7 @@ int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) {
SUdfcFuncStub stub = {0};
strcpy(stub.udfName, udfName);
stub.handle = *pHandle;
++stub.refCount;
taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else {
......@@ -1467,13 +1468,25 @@ int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) {
return code;
}
void releaseUdfFuncHandle(char* udfName) {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
ASSERT(foundStub);
--foundStub->refCount;
ASSERT(foundStub->refCount>=0);
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
UdfcFuncHandle handle = NULL;
int32_t code = setupUdf(udfName, &handle);
int32_t code = accquireUdfFuncHandle(udfName, &handle);
if (code != 0) {
return code;
}
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
releaseUdfFuncHandle(udfName);
return code;
}
......@@ -1481,7 +1494,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
int32_t doTeardownUdf(UdfcFuncHandle handle) {
fnInfo("tear down udf. udf func handle: %p", handle);
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
SUdfcUvSession *session = (SUdfcUvSession *) handle;
if (session->udfUvPipe == NULL) {
fnError("pipe to udfd does not exist");
return TSDB_CODE_UDF_PIPE_NO_PIPE;
......@@ -1511,7 +1524,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes {
SClientUdfUvSession *session;
SUdfcUvSession *session;
int8_t finalResNum;
int8_t interResNum;
char* finalResBuf;
......@@ -1532,11 +1545,11 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
}
UdfcFuncHandle handle;
int32_t udfCode = 0;
if ((udfCode = setupUdf((char *)pCtx->udfName, &handle)) != 0) {
if ((udfCode = accquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
return false;
}
SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
SUdfcUvSession *session = (SUdfcUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
memset(udfRes, 0, envSize);
......@@ -1544,7 +1557,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
udfRes->session = (SClientUdfUvSession *)handle;
udfRes->session = (SUdfcUvSession *)handle;
SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
......@@ -1560,7 +1573,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t numOfCols = pInput->numOfInputCols;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SClientUdfUvSession *session = udfRes->session;
SUdfcUvSession *session = udfRes->session;
if (session == NULL) {
return TSDB_CODE_UDF_NO_FUNC_HANDLE;
}
......@@ -1615,7 +1628,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SClientUdfUvSession *session = udfRes->session;
SUdfcUvSession *session = udfRes->session;
if (session == NULL) {
return TSDB_CODE_UDF_NO_FUNC_HANDLE;
}
......@@ -1644,5 +1657,25 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
// }
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName);
return udfCallCode == 0 ? numOfResults : udfCallCode;
}
int32_t teardownUdfs() {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0;
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) {
doTeardownUdf(stub->handle);
} else {
taosArrayPush(udfStubs, stub);
}
++i;
}
taosArrayDestroy(gUdfdProxy.udfStubs);
gUdfdProxy.udfStubs = udfStubs;
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册