提交 a018c701 编写于 作者: S slzhou

udfd proxy init and close for udf calling

上级 d044a4ee
...@@ -34,23 +34,24 @@ extern "C" { ...@@ -34,23 +34,24 @@ extern "C" {
enum { enum {
UDFC_CODE_STOPPING = -1, UDFC_CODE_STOPPING = -1,
UDFC_CODE_RESTARTING = -2,
UDFC_CODE_PIPE_READ_ERR = -3, UDFC_CODE_PIPE_READ_ERR = -3,
}; };
typedef void *UdfcHandle;
typedef void *UdfcFuncHandle;
/** /**
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* @return error code * @return error code
*/ */
int32_t createUdfdProxy(int32_t dnodeId); int32_t udfcOpen(int32_t dnodeId, UdfcHandle* proxyHandle);
/** /**
* destroy udfd proxy * destroy udfd proxy
* @return error code * @return error code
*/ */
int32_t destroyUdfdProxy(int32_t dnodeId); int32_t udfcClose(UdfcHandle proxyhandle);
typedef void *UdfHandle;
/** /**
* setup udf * setup udf
...@@ -58,7 +59,7 @@ typedef void *UdfHandle; ...@@ -58,7 +59,7 @@ typedef void *UdfHandle;
* @param handle, out * @param handle, out
* @return error code * @return error code
*/ */
int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle); int32_t setupUdf(UdfcHandle proxyHandle, char udfName[], SEpSet *epSet, UdfcFuncHandle *handle);
typedef struct SUdfColumnMeta { typedef struct SUdfColumnMeta {
int16_t type; int16_t type;
...@@ -105,26 +106,26 @@ typedef struct SUdfInterBuf { ...@@ -105,26 +106,26 @@ typedef struct SUdfInterBuf {
} SUdfInterBuf; } SUdfInterBuf;
// output: interBuf // output: interBuf
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf); int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
// input: block, state // input: block, state
// output: newState // output: newState
int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
// input: interBuf // input: interBuf
// output: resultData // output: resultData
int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
// input: interbuf1, interbuf2 // input: interbuf1, interbuf2
// output: resultBuf // output: resultBuf
int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
// input: block // input: block
// output: resultData // output: resultData
int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData); int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData);
/** /**
* tearn down udf * tearn down udf
* @param handle * @param handle
* @return * @return
*/ */
int32_t teardownUdf(UdfHandle handle); int32_t teardownUdf(UdfcFuncHandle handle);
// end API to taosd and qworker // end API to taosd and qworker
//============================================================================================================================= //=============================================================================================================================
......
...@@ -30,20 +30,20 @@ typedef struct SUdfInfo { ...@@ -30,20 +30,20 @@ typedef struct SUdfInfo {
char *path; char *path;
} SUdfInfo; } SUdfInfo;
typedef void *UdfHandle; typedef void *UdfcFuncHandle;
int32_t createUdfdProxy(); int32_t createUdfdProxy();
int32_t destroyUdfdProxy(); int32_t destroyUdfdProxy();
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles); //int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfcFuncHandle *handles);
int32_t setupUdf(SUdfInfo* udf, UdfHandle* handle); int32_t setupUdf(SUdfInfo* udf, UdfcFuncHandle* handle);
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate, int32_t callUdf(UdfcFuncHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate,
int32_t *newStateSize, SSDataBlock *output); int32_t *newStateSize, SSDataBlock *output);
int32_t teardownUdf(UdfHandle handle); int32_t teardownUdf(UdfcFuncHandle handle);
typedef struct SUdfSetupRequest { typedef struct SUdfSetupRequest {
char udfName[16]; // char udfName[16]; //
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "uv.h" #include "uv.h"
#include "os.h" #include "os.h"
#include "tlog.h"
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#include "tarray.h" #include "tarray.h"
...@@ -122,12 +121,35 @@ enum { ...@@ -122,12 +121,35 @@ enum {
UV_TASK_DISCONNECT = 2 UV_TASK_DISCONNECT = 2
}; };
int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfdProxy {
int32_t dnodeId;
uv_barrier_t gUdfInitBarrier;
uv_loop_t gUdfdLoop;
uv_thread_t gUdfLoopThread;
uv_async_t gUdfLoopTaskAync;
uv_async_t gUdfLoopStopAsync;
uv_mutex_t gUdfTaskQueueMutex;
int8_t gUdfcState;
QUEUE gUdfTaskQueue;
QUEUE gUvProcTaskQueue;
// int8_t gUdfcState = UDFC_STATE_INITAL;
// QUEUE gUdfTaskQueue = {0};
// QUEUE gUvProcTaskQueue = {0};
} SUdfdProxy;
typedef struct SUdfUvSession { typedef struct SUdfUvSession {
SUdfdProxy *udfc;
int64_t severHandle; int64_t severHandle;
uv_pipe_t *udfSvcPipe; uv_pipe_t *udfSvcPipe;
} SUdfUvSession; } SUdfUvSession;
typedef struct SClientUvTaskNode { typedef struct SClientUvTaskNode {
SUdfdProxy *udfc;
int8_t type; int8_t type;
int errCode; int errCode;
...@@ -166,7 +188,6 @@ typedef struct SClientUdfTask { ...@@ -166,7 +188,6 @@ typedef struct SClientUdfTask {
} _teardown; } _teardown;
}; };
} SClientUdfTask; } SClientUdfTask;
typedef struct SClientConnBuf { typedef struct SClientConnBuf {
...@@ -182,31 +203,13 @@ typedef struct SClientUvConn { ...@@ -182,31 +203,13 @@ typedef struct SClientUvConn {
SClientConnBuf readBuf; SClientConnBuf readBuf;
} SClientUvConn; } SClientUvConn;
uv_barrier_t gUdfInitBarrier;
uv_loop_t gUdfdLoop;
uv_thread_t gUdfLoopThread;
uv_async_t gUdfLoopTaskAync;
uv_async_t gUdfLoopStopAsync;
uv_mutex_t gUdfTaskQueueMutex;
int64_t gUdfTaskSeqNum = 0;
enum { enum {
UDFC_STATE_INITAL = 0, // initial state UDFC_STATE_INITAL = 0, // initial state
UDFC_STATE_STARTNG, // starting after createUdfdProxy UDFC_STATE_STARTNG, // starting after udfcOpen
UDFC_STATE_READY, // started and begin to receive quests UDFC_STATE_READY, // started and begin to receive quests
UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy UDFC_STATE_STOPPING, // stopping after udfcClose
UDFC_STATUS_FINAL, // stopped UDFC_STATUS_FINAL, // stopped
}; };
int8_t gUdfcState = UDFC_STATE_INITAL;
//double circular linked list
QUEUE gUdfTaskQueue = {0};
QUEUE gUvProcTaskQueue = {0};
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
int32_t len = 0; int32_t len = 0;
...@@ -771,13 +774,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) { ...@@ -771,13 +774,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) {
int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
uvTask->type = uvTaskType; uvTask->type = uvTaskType;
uvTask->udfc = task->session->udfc;
if (uvTaskType == UV_TASK_CONNECT) { if (uvTaskType == UV_TASK_CONNECT) {
} else if (uvTaskType == UV_TASK_REQ_RSP) { } else if (uvTaskType == UV_TASK_REQ_RSP) {
uvTask->pipe = task->session->udfSvcPipe; uvTask->pipe = task->session->udfSvcPipe;
SUdfRequest request; SUdfRequest request;
request.type = task->type; request.type = task->type;
request.seqNum = gUdfTaskSeqNum++; request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);
if (task->type == UDF_TASK_SETUP) { if (task->type == UDF_TASK_SETUP) {
request.setup = task->_setup.req; request.setup = task->_setup.req;
...@@ -809,11 +813,11 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN ...@@ -809,11 +813,11 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, %d", "queue uv task", uvTask->type); debugPrint("%s, %d", "queue uv task", uvTask->type);
SUdfdProxy *udfc = uvTask->udfc;
uv_mutex_lock(&gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_INSERT_TAIL(&gUdfTaskQueue, &uvTask->recvTaskQueue); QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&gUdfTaskQueueMutex); uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
uv_async_send(&gUdfLoopTaskAync); uv_async_send(&udfc->gUdfLoopTaskAync);
uv_sem_wait(&uvTask->taskSem); uv_sem_wait(&uvTask->taskSem);
uv_sem_destroy(&uvTask->taskSem); uv_sem_destroy(&uvTask->taskSem);
...@@ -826,7 +830,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -826,7 +830,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
switch (uvTask->type) { switch (uvTask->type) {
case UV_TASK_CONNECT: { case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
uv_pipe_init(&gUdfdLoop, pipe, 0); uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0);
uvTask->pipe = pipe; uvTask->pipe = pipe;
SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn)); SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn));
...@@ -867,45 +871,46 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -867,45 +871,46 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
} }
void udfClientAsyncCb(uv_async_t *async) { void udfClientAsyncCb(uv_async_t *async) {
SUdfdProxy *udfc = async->data;
QUEUE wq; QUEUE wq;
uv_mutex_lock(&gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_MOVE(&gUdfTaskQueue, &wq); QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq);
uv_mutex_unlock(&gUdfTaskQueueMutex); uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
while (!QUEUE_EMPTY(&wq)) { while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq); QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
startUvUdfTask(task); startUvUdfTask(task);
QUEUE_INSERT_TAIL(&gUvProcTaskQueue, &task->procTaskQueue); QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue);
} }
} }
void cleanUpUvTasks() { void cleanUpUvTasks(SUdfdProxy *udfc) {
QUEUE wq; QUEUE wq;
uv_mutex_lock(&gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_MOVE(&gUdfTaskQueue, &wq); QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq);
uv_mutex_unlock(&gUdfTaskQueueMutex); uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
while (!QUEUE_EMPTY(&wq)) { while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq); QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
if (gUdfcState == UDFC_STATE_STOPPING) { if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING; task->errCode = UDFC_CODE_STOPPING;
} }
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
} }
// TODO: deal with tasks that are waiting result. // TODO: deal with tasks that are waiting result.
while (!QUEUE_EMPTY(&gUvProcTaskQueue)) { while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&gUvProcTaskQueue); QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
if (gUdfcState == UDFC_STATE_STOPPING) { if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING; task->errCode = UDFC_CODE_STOPPING;
} }
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
...@@ -913,42 +918,51 @@ void cleanUpUvTasks() { ...@@ -913,42 +918,51 @@ void cleanUpUvTasks() {
} }
void udfStopAsyncCb(uv_async_t *async) { void udfStopAsyncCb(uv_async_t *async) {
cleanUpUvTasks(); SUdfdProxy *udfc = async->data;
if (gUdfcState == UDFC_STATE_STOPPING) { cleanUpUvTasks(udfc);
uv_stop(&gUdfdLoop); if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
uv_stop(&udfc->gUdfdLoop);
} }
} }
void constructUdfService(void *argsThread) { void constructUdfService(void *argsThread) {
uv_loop_init(&gUdfdLoop); SUdfdProxy *udfc = (SUdfdProxy*)argsThread;
uv_loop_init(&udfc->gUdfdLoop);
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopTaskAync, udfClientAsyncCb);
uv_mutex_init(&gUdfTaskQueueMutex); udfc->gUdfLoopTaskAync.data = udfc;
QUEUE_INIT(&gUdfTaskQueue); uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopStopAsync, udfStopAsyncCb);
QUEUE_INIT(&gUvProcTaskQueue); udfc->gUdfLoopStopAsync.data = udfc;
uv_barrier_wait(&gUdfInitBarrier); uv_mutex_init(&udfc->gUdfTaskQueueMutex);
QUEUE_INIT(&udfc->gUdfTaskQueue);
QUEUE_INIT(&udfc->gUvProcTaskQueue);
uv_barrier_wait(&udfc->gUdfInitBarrier);
//TODO return value of uv_run //TODO return value of uv_run
uv_run(&gUdfdLoop, UV_RUN_DEFAULT); uv_run(&udfc->gUdfdLoop, UV_RUN_DEFAULT);
uv_loop_close(&gUdfdLoop); uv_loop_close(&udfc->gUdfdLoop);
} }
int32_t createUdfdProxy(int32_t dnodeId) { int32_t udfcOpen(int32_t dnodeId, UdfcHandle *udfc) {
gUdfcState = UDFC_STATE_STARTNG; SUdfdProxy *proxy = taosMemoryCalloc(1, sizeof(SUdfdProxy));
uv_barrier_init(&gUdfInitBarrier, 2); proxy->dnodeId = dnodeId;
uv_thread_create(&gUdfLoopThread, constructUdfService, 0); proxy->gUdfcState = UDFC_STATE_STARTNG;
uv_barrier_wait(&gUdfInitBarrier); uv_barrier_init(&proxy->gUdfInitBarrier, 2);
gUdfcState = UDFC_STATE_READY; uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
uv_barrier_wait(&proxy->gUdfInitBarrier);
proxy->gUdfcState = UDFC_STATE_READY;
*udfc = proxy;
return 0; return 0;
} }
int32_t destroyUdfdProxy(int32_t dnodeId) { int32_t udfcClose(UdfcHandle udfcHandle) {
gUdfcState = UDFC_STATE_STOPPING; SUdfdProxy *udfc = udfcHandle;
uv_async_send(&gUdfLoopStopAsync); udfc->gUdfcState = UDFC_STATE_STOPPING;
uv_thread_join(&gUdfLoopThread); uv_async_send(&udfc->gUdfLoopStopAsync);
uv_mutex_destroy(&gUdfTaskQueueMutex); uv_thread_join(&udfc->gUdfLoopThread);
uv_barrier_destroy(&gUdfInitBarrier); uv_mutex_destroy(&udfc->gUdfTaskQueueMutex);
gUdfcState = UDFC_STATUS_FINAL; uv_barrier_destroy(&udfc->gUdfInitBarrier);
udfc->gUdfcState = UDFC_STATUS_FINAL;
taosMemoryFree(udfc);
return 0; return 0;
} }
...@@ -966,11 +980,12 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { ...@@ -966,11 +980,12 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
return task->errCode; return task->errCode;
} }
int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle *funcHandle) {
debugPrint("%s", "client setup udf"); debugPrint("%s", "client setup udf");
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
task->session = taosMemoryMalloc(sizeof(SUdfUvSession)); task->session = taosMemoryMalloc(sizeof(SUdfUvSession));
task->session->udfc = udfc;
task->type = UDF_TASK_SETUP; task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req; SUdfSetupRequest *req = &task->_setup.req;
...@@ -986,13 +1001,13 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { ...@@ -986,13 +1001,13 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) {
SUdfSetupResponse *rsp = &task->_setup.rsp; SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle; task->session->severHandle = rsp->udfHandle;
*handle = task->session; *funcHandle = task->session;
int32_t err = task->errCode; int32_t err = task->errCode;
taosMemoryFree(task); taosMemoryFree(task);
return err; return err;
} }
int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) { SSDataBlock* output, SUdfInterBuf *newState) {
debugPrint("%s", "client call udf"); debugPrint("%s", "client call udf");
...@@ -1061,7 +1076,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter ...@@ -1061,7 +1076,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter
} }
//TODO: translate these calls to callUdf //TODO: translate these calls to callUdf
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) { int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_INIT; int8_t callType = TSDB_UDF_CALL_AGG_INIT;
int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf); int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf);
...@@ -1071,7 +1086,7 @@ int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) { ...@@ -1071,7 +1086,7 @@ int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) {
// input: block, state // input: block, state
// output: interbuf, // output: interbuf,
int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC; int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState); int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
return err; return err;
...@@ -1079,7 +1094,7 @@ int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *st ...@@ -1079,7 +1094,7 @@ int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *st
// input: interbuf1, interbuf2 // input: interbuf1, interbuf2
// output: resultBuf // output: resultBuf
int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_MERGE; int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
return err; return err;
...@@ -1087,7 +1102,7 @@ int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf ...@@ -1087,7 +1102,7 @@ int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf
// input: interBuf // input: interBuf
// output: resultData // output: resultData
int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC; int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
return err; return err;
...@@ -1095,13 +1110,13 @@ int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBu ...@@ -1095,13 +1110,13 @@ int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBu
// input: block // input: block
// output: resultData // output: resultData
int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData) { int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData) {
int8_t callType = TSDB_UDF_CALL_SCALA_PROC; int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL); int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL);
return err; return err;
} }
int32_t teardownUdf(UdfHandle handle) { int32_t teardownUdf(UdfcFuncHandle handle) {
debugPrint("%s", "client teardown udf"); debugPrint("%s", "client teardown udf");
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
......
...@@ -76,9 +76,9 @@ typedef struct SUdf { ...@@ -76,9 +76,9 @@ typedef struct SUdf {
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix // TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
// TODO: add private udf structure. // TODO: add private udf structure.
typedef struct SUdfHandle { typedef struct SUdfcFuncHandle {
SUdf *udf; SUdf *udf;
} SUdfHandle; } SUdfcFuncHandle;
int32_t udfdLoadUdf(char* udfName, SUdf* udf) { int32_t udfdLoadUdf(char* udfName, SUdf* udf) {
strcpy(udf->name, udfName); strcpy(udf->name, udfName);
...@@ -143,7 +143,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -143,7 +143,7 @@ void udfdProcessRequest(uv_work_t *req) {
} }
uv_mutex_unlock(&udf->lock); uv_mutex_unlock(&udf->lock);
} }
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle)); SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf; handle->udf = udf;
// TODO: allocate private structure and call init function and set it to handle // TODO: allocate private structure and call init function and set it to handle
SUdfResponse rsp; SUdfResponse rsp;
...@@ -166,7 +166,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -166,7 +166,7 @@ void udfdProcessRequest(uv_work_t *req) {
case UDF_TASK_CALL: { case UDF_TASK_CALL: {
SUdfCallRequest *call = &request.call; SUdfCallRequest *call = &request.call;
fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle); fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle);
SUdfHandle *handle = (SUdfHandle *)(call->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
SUdfDataBlock input = {0}; SUdfDataBlock input = {0};
...@@ -204,7 +204,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -204,7 +204,7 @@ void udfdProcessRequest(uv_work_t *req) {
case UDF_TASK_TEARDOWN: { case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = &request.teardown; SUdfTeardownRequest *teardown = &request.teardown;
fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle) fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle)
SUdfHandle *handle = (SUdfHandle *)(teardown->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
bool unloadUdf = false; bool unloadUdf = false;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
......
...@@ -8,7 +8,8 @@ ...@@ -8,7 +8,8 @@
#include "tdatablock.h" #include "tdatablock.h"
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
createUdfdProxy(1); UdfcHandle udfc;
udfcOpen(1, &udfc);
uv_sleep(1000); uv_sleep(1000);
char path[256] = {0}; char path[256] = {0};
size_t cwdSize = 256; size_t cwdSize = 256;
...@@ -20,9 +21,9 @@ int main(int argc, char *argv[]) { ...@@ -20,9 +21,9 @@ int main(int argc, char *argv[]) {
fprintf(stdout, "current working directory:%s\n", path); fprintf(stdout, "current working directory:%s\n", path);
strcat(path, "/libudf1.so"); strcat(path, "/libudf1.so");
UdfHandle handle; UdfcFuncHandle handle;
SEpSet epSet; SEpSet epSet;
setupUdf("udf1", &epSet, &handle); setupUdf(udfc, "udf1", &epSet, &handle);
SSDataBlock block = {0}; SSDataBlock block = {0};
SSDataBlock* pBlock = █ SSDataBlock* pBlock = █
...@@ -53,5 +54,5 @@ int main(int argc, char *argv[]) { ...@@ -53,5 +54,5 @@ int main(int argc, char *argv[]) {
} }
teardownUdf(handle); teardownUdf(handle);
destroyUdfdProxy(1); udfcClose(udfc);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册