未验证 提交 02bf4a34 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12485 from taosdata/feature/udf

feat: reuse udf pipes connecting to udfd
......@@ -309,7 +309,7 @@ void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
/**
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* create udfd proxy, called once in a process that calls doSetupUdf/callUdfxxx/doTeardownUdf
* @return error code
*/
int32_t udfcOpen();
......
......@@ -39,16 +39,6 @@ extern "C" {
//======================================================================================
//begin API to taosd and qworker
typedef void *UdfcFuncHandle;
/**
* setup udf
* @param udf, in
* @param handle, out
* @return error code
*/
int32_t setupUdf(char udfName[], UdfcFuncHandle *handle);
typedef struct SUdfColumnMeta {
int16_t type;
int32_t bytes;
......@@ -95,32 +85,42 @@ typedef struct SUdfInterBuf {
char* buf;
int8_t numOfResult; //zero or one
} SUdfInterBuf;
typedef void *UdfcFuncHandle;
/**
* setup udf
* @param udfName, in
* @param funcHandle, out
* @return error code
*/
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
// output: interBuf
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
// input: block, state
// output: newState
int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
// input: interBuf
// output: resultData
int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
// input: interbuf1, interbuf2
// output: resultBuf
int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
// input: block
// output: resultData
int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
/**
* tear down udf
* @param handle
* @return
*/
int32_t teardownUdf(UdfcFuncHandle handle);
int32_t doTeardownUdf(UdfcFuncHandle handle);
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
// end API to taosd and qworker
//=============================================================================================================================
// begin API to UDF writer.
......
......@@ -43,7 +43,7 @@ int32_t setupUdf(SUdfInfo* udf, UdfcFuncHandle* handle);
int32_t callUdf(UdfcFuncHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate,
int32_t *newStateSize, SSDataBlock *output);
int32_t teardownUdf(UdfcFuncHandle handle);
int32_t doTeardownUdf(UdfcFuncHandle handle);
typedef struct SUdfSetupRequest {
char udfName[16]; //
......
......@@ -310,6 +310,11 @@ enum {
};
int64_t gUdfTaskSeqNum = 0;
// One entry of the proxy's udfStubs array: pairs a udf name with the handle stored for it.
typedef struct SUdfcFuncStub {
// name of the udf; compareUdfcFuncSub orders and matches stubs by this field
char udfName[TSDB_FUNC_NAME_LEN];
// handle recorded for this udf (setupUdf stores the handle produced by doSetupUdf here)
UdfcFuncHandle handle;
} SUdfcFuncStub;
typedef struct SUdfcProxy {
char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_barrier_t initBarrier;
......@@ -325,6 +330,9 @@ typedef struct SUdfcProxy {
QUEUE taskQueue;
QUEUE uvProcTaskQueue;
uv_mutex_t udfStubsMutex;
SArray* udfStubs; // SUdfcFuncStub
int8_t initialized;
} SUdfcProxy;
......@@ -1222,6 +1230,8 @@ int32_t udfcOpen() {
atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
proxy->udfcState = UDFC_STATE_READY;
uv_barrier_wait(&proxy->initBarrier);
uv_mutex_init(&proxy->udfStubsMutex);
proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
fnInfo("udfc initialized")
return 0;
}
......@@ -1238,6 +1248,8 @@ int32_t udfcClose() {
uv_thread_join(&udfc->loopThread);
uv_mutex_destroy(&udfc->taskQueueMutex);
uv_barrier_destroy(&udfc->initBarrier);
taosArrayDestroy(udfc->udfStubs);
uv_mutex_destroy(&udfc->udfStubsMutex);
udfc->udfcState = UDFC_STATE_INITAL;
fnInfo("udfc cleaned up");
return 0;
......@@ -1259,7 +1271,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
return task->errCode;
}
int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
fnInfo("udfc setup udf. udfName: %s", udfName);
if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
return TSDB_CODE_UDF_INVALID_STATE;
......@@ -1287,7 +1299,7 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
task->session->outputLen = rsp->outputLen;
task->session->bufSize = rsp->bufSize;
if (task->errCode != 0) {
fnError("failed to setup udf. err: %d", task->errCode)
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
} else {
fnInfo("sucessfully setup udf func handle. handle: %p", task->session);
*funcHandle = task->session;
......@@ -1373,7 +1385,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
return err;
}
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_INIT;
int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf);
......@@ -1383,7 +1395,7 @@ int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
// input: block, state
// output: newState
int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
return err;
......@@ -1391,7 +1403,7 @@ int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBu
// input: interbuf1, interbuf2
// output: resultBuf
int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
return err;
......@@ -1399,13 +1411,13 @@ int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInte
// input: interBuf
// output: resultData
int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
/**
 * Issue the aggregate "finalize" call for a udf.
 * @param handle, in. udf function handle
 * @param interBuf, in. intermediate state passed to callUdf
 * @param resultData, out. buffer handed to callUdf to receive the result
 * @return the code returned by callUdf
 */
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
  // Finalize takes a single state buffer: no input block and no second state.
  return callUdf(handle, TSDB_UDF_CALL_AGG_FIN, NULL, interBuf, NULL, NULL, resultData);
}
int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) {
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) {
int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
SSDataBlock inputBlock = {0};
convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
......@@ -1417,7 +1429,50 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu
return err;
}
int32_t teardownUdf(UdfcFuncHandle handle) {
/**
 * Comparison callback for SUdfcFuncStub elements, keyed on the udf name.
 * @param elem1 pointer to the first SUdfcFuncStub
 * @param elem2 pointer to the second SUdfcFuncStub
 * @return the strcmp() result for the two stubs' udfName fields
 */
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
  const SUdfcFuncStub *lhs = elem1;
  const SUdfcFuncStub *rhs = elem2;
  return strcmp(lhs->udfName, rhs->udfName);
}
/**
 * Get a handle for the named udf, reusing a previously stored one when available.
 *
 * Searches gUdfdProxy.udfStubs for udfName. On a miss it calls doSetupUdf() and,
 * when that succeeds, appends a stub for the new handle and re-sorts the array
 * by name. The lookup and the insert both run under gUdfdProxy.udfStubsMutex.
 *
 * @param udfName, in. Only the first TSDB_FUNC_NAME_LEN - 1 bytes are used as the lookup key.
 * @param pHandle, out. Receives the handle on success; NULL when doSetupUdf fails.
 * @return 0 when a stored handle is found, otherwise the code returned by doSetupUdf
 */
int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) {
  int32_t code = 0;
  uv_mutex_lock(&gUdfdProxy.udfStubsMutex);

  // Bounded copy: key is zero-initialized, so leaving the last byte untouched
  // keeps the name NUL-terminated even when udfName is too long for the buffer.
  SUdfcFuncStub key = {0};
  strncpy(key.udfName, udfName, sizeof(key.udfName) - 1);

  SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
  if (foundStub != NULL) {
    // Copy the handle out before releasing the lock: foundStub refers to an
    // element of udfStubs, which other threads push to and sort under this mutex.
    *pHandle = foundStub->handle;
    uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
    return 0;
  }

  *pHandle = NULL;
  code = doSetupUdf(udfName, pHandle);
  if (code == TSDB_CODE_SUCCESS) {
    // Reuse the already-bounded name from the lookup key for the stored stub.
    SUdfcFuncStub stub = key;
    stub.handle = *pHandle;
    taosArrayPush(gUdfdProxy.udfStubs, &stub);
    // Re-sort after every insert so the array stays ordered by name for taosArraySearch.
    taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
  } else {
    *pHandle = NULL;
  }
  uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
  return code;
}
/**
 * Run a scalar udf identified by name.
 *
 * Obtains a handle through setupUdf() and, if that succeeds, forwards the
 * call to doCallUdfScalarFunc().
 *
 * @param udfName, in. name of the udf
 * @param input, in. passed through to doCallUdfScalarFunc
 * @param numOfCols, in. passed through to doCallUdfScalarFunc
 * @param output, out. passed through to doCallUdfScalarFunc
 * @return the non-zero setupUdf code on failure, otherwise the doCallUdfScalarFunc code
 */
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  UdfcFuncHandle funcHandle = NULL;
  int32_t setupCode = setupUdf(udfName, &funcHandle);
  return (setupCode != 0) ? setupCode : doCallUdfScalarFunc(funcHandle, input, numOfCols, output);
}
int32_t doTeardownUdf(UdfcFuncHandle handle) {
fnInfo("tear down udf. udf func handle: %p", handle);
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
......@@ -1471,8 +1526,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
}
UdfcFuncHandle handle;
int32_t udfCode = 0;
if ((udfCode = setupUdf((char*)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step setupUdf. udf code: %d", udfCode);
if ((udfCode = setupUdf((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
return false;
}
SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
......@@ -1485,8 +1540,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
udfRes->session = (SClientUdfUvSession *)handle;
SUdfInterBuf buf = {0};
if ((udfCode = callUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step callUdfAggInit. udf code: %d", udfCode);
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
return false;
}
udfRes->interResNum = buf.numOfResult;
......@@ -1533,7 +1588,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
.numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
int32_t udfCode = callUdfAggProcess(session, inputBlock, &state, &newState);
int32_t udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
if (udfCode != 0) {
fnError("udfAggProcess error. code: %d", udfCode);
newState.numOfResult = 0;
......@@ -1567,9 +1622,9 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
int32_t udfCallCode= 0;
udfCallCode= callUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode!= 0) {
fnError("udfAggFinalize error. callUdfAggFinalize step. udf code:%d", udfCallCode);
udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode != 0) {
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
GET_RES_INFO(pCtx)->numOfRes = 0;
} else {
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
......@@ -1577,11 +1632,11 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
}
int32_t code = teardownUdf(session);
if (code != 0) {
fnError("udfAggFinalize error. teardownUdf step. udf code: %d", code);
}
return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
// int32_t code = doTeardownUdf(session);
// if (code != 0) {
// fnError("udfAggFinalize error. doTeardownUdf step. udf code: %d", code);
// }
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
return udfCallCode == 0 ? numOfResults : udfCallCode;
}
\ No newline at end of file
......@@ -47,7 +47,7 @@ int main(int argc, char *argv[]) {
UdfcFuncHandle handle;
setupUdf("udf1", &handle);
doSetupUdf("udf1", &handle);
SSDataBlock block = {0};
SSDataBlock *pBlock = █
......@@ -73,12 +73,12 @@ int main(int argc, char *argv[]) {
input.numOfRows = pBlock->info.rows;
input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0};
callUdfScalarFunc(handle, &input, 1, &output);
doCallUdfScalarFunc(handle, &input, 1, &output);
SColumnInfoData *col = output.columnData;
for (int32_t i = 0; i < output.numOfRows; ++i) {
fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
}
teardownUdf(handle);
doTeardownUdf(handle);
udfcClose();
}
......@@ -362,19 +362,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
if (fmIsUserDefinedFunc(node->funcId)) {
UdfcFuncHandle udfHandle = NULL;
code = setupUdf(node->functionName, &udfHandle);
if (code != 0) {
sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code);
goto _return;
}
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
if (code != 0) {
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return;
}
code = teardownUdf(udfHandle);
code = callUdfScalarFunc(node->functionName, params, paramNum, output);
if (code != 0) {
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册