未验证 提交 9f04cfe0 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #10978 from taosdata/szhou/3.0_udfd

first pull request of udfd. a lot of improvements shall follow this.
aux_source_directory(src FUNCTION_SRC) aux_source_directory(src FUNCTION_SRC)
list(REMOVE_ITEM FUNCTION_SRC src/udfd.c)
add_library(function STATIC ${FUNCTION_SRC}) add_library(function STATIC ${FUNCTION_SRC})
target_include_directories( target_include_directories(
function function
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/function" PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/function"
"${CMAKE_SOURCE_DIR}/contrib/libuv/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
target_link_libraries( target_link_libraries(
function function
PUBLIC uv_a
PRIVATE os util common nodes PRIVATE os util common nodes
) )
\ No newline at end of file
add_executable(runUdf test/runUdf.c)
target_include_directories(
runUdf
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/function"
"${CMAKE_SOURCE_DIR}/contrib/libuv/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
runUdf
PUBLIC uv_a
PRIVATE os util common nodes function
)
add_library(udf1 MODULE test/udf1.c)
target_include_directories(
udf1
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/function"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
#SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
add_executable(udfd src/udfd.c)
target_include_directories(
udfd
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/function"
"${CMAKE_SOURCE_DIR}/contrib/libuv/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
udfd
PUBLIC uv_a
PRIVATE os util common nodes function
)
...@@ -20,68 +20,116 @@ ...@@ -20,68 +20,116 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h" //======================================================================================
#include "taoserror.h" //begin API to taosd and qworker
/**
* start udf dameon service
* @return error code
*/
int32_t startUdfService();
/**
* stop udf dameon service
* @return error code
*/
int32_t stopUdfService();
enum { enum {
TSDB_UDF_FUNC_NORMAL = 0, TSDB_UDF_TYPE_SCALAR = 0,
TSDB_UDF_FUNC_INIT, TSDB_UDF_TYPE_AGGREGATE = 1
TSDB_UDF_FUNC_FINALIZE,
TSDB_UDF_FUNC_MERGE,
TSDB_UDF_FUNC_DESTROY,
TSDB_UDF_FUNC_MAX_NUM
}; };
typedef struct SUdfInit { enum {
int32_t maybe_null; /* 1 if function can return NULL */ TSDB_UDF_SCRIPT_BIN_LIB = 0,
uint32_t decimals; /* for real functions */ TSDB_UDF_SCRIPT_LUA = 1,
uint64_t length; /* For string functions */ };
char* ptr; /* free pointer for function data */
int32_t const_item; /* 0 if result is independent of arguments */
// script like lua/javascript
void* script_ctx;
void (*destroyCtxFunc)(void* script_ctx);
} SUdfInit;
typedef struct SUdfInfo { typedef struct SUdfInfo {
int32_t functionId; // system assigned function id char *udfName; // function name
int32_t funcType; // scalar function or aggregate function int32_t udfType; // scalar function or aggregate function
int8_t resType; // result type int8_t scriptType;
int16_t resBytes; // result byte char *path;
int32_t contLen; // content length
int32_t bufSize; // interbuf size int8_t resType; // result type
char* name; // function name int16_t resBytes; // result byte
void* handle; // handle loaded in mem int32_t bufSize; //interbuf size
void* funcs[TSDB_UDF_FUNC_MAX_NUM]; // function ptr
// for script like lua/javascript only
int isScript;
void* pScriptCtx;
SUdfInit init;
char* content;
char* path;
} SUdfInfo; } SUdfInfo;
typedef void *UdfHandle;
/**
* setup udf
* @param udf, in
* @param handle, out
* @return error code
*/
int32_t setupUdf(SUdfInfo* udf, UdfHandle *handle);
enum {
TSDB_UDF_STEP_NORMAL = 0,
TSDB_UDF_STEP_MERGE,
TSDb_UDF_STEP_FINALIZE,
TSDB_UDF_STEP_MAX_NUM
};
/**
* call udf
* @param handle udf handle
* @param step
* @param state
* @param stateSize
* @param input
* @param newstate
* @param newStateSize
* @param output
* @return error code
*/
//TODO: must change the following after metadata flow and data flow between qworker and udfd is well defined
typedef struct SUdfDataBlock {
char* data;
int32_t size;
} SUdfDataBlock;
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newstate,
int32_t *newStateSize, SUdfDataBlock *output);
/**
* tearn down udf
* @param handle
* @return
*/
int32_t teardownUdf(UdfHandle handle);
// end API to taosd and qworker
//=============================================================================================================================
// TODO: Must change
// begin API to UDF writer.
// script // script
typedef int32_t (*scriptInitFunc)(void* pCtx); //typedef int32_t (*scriptInitFunc)(void* pCtx);
typedef void (*scriptNormalFunc)(void* pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows, //typedef void (*scriptNormalFunc)(void* pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows,
int64_t* ptList, int64_t key, char* dataOutput, char* tsOutput, int32_t* numOfOutput, // int64_t* ptList, int64_t key, char* dataOutput, char* tsOutput, int32_t* numOfOutput,
int16_t oType, int16_t oBytes); // int16_t oType, int16_t oBytes);
typedef void (*scriptFinalizeFunc)(void* pCtx, int64_t key, char* dataOutput, int32_t* numOfOutput); //typedef void (*scriptFinalizeFunc)(void* pCtx, int64_t key, char* dataOutput, int32_t* numOfOutput);
typedef void (*scriptMergeFunc)(void* pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput); //typedef void (*scriptMergeFunc)(void* pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput);
typedef void (*scriptDestroyFunc)(void* pCtx); //typedef void (*scriptDestroyFunc)(void* pCtx);
// dynamic lib // dynamic lib
typedef void (*udfNormalFunc)(char* data, int16_t itype, int16_t iBytes, int32_t numOfRows, int64_t* ts, typedef int32_t (*TUdfInitFunc)();
char* dataOutput, char* interBuf, char* tsOutput, int32_t* numOfOutput, int16_t oType, typedef void (*TUdfDestroyFunc)();
int16_t oBytes, SUdfInit* buf);
typedef int32_t (*udfInitFunc)(SUdfInit* data); typedef void (*TUdfFunc)(int8_t step,
typedef void (*udfFinalizeFunc)(char* dataOutput, char* interBuf, int32_t* numOfOutput, SUdfInit* buf); char *state, int32_t stateSize, SUdfDataBlock input,
typedef void (*udfMergeFunc)(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf); char **newstate, int32_t *newStateSize, SUdfDataBlock *output);
typedef void (*udfDestroyFunc)(SUdfInit* buf);
//typedef void (*udfMergeFunc)(char *data, int32_t numOfRows, char *dataOutput, int32_t* numOfOutput);
//typedef void (*udfFinalizeFunc)(char* state, int32_t stateSize, SUdfDataBlock *output);
// end API to UDF writer
//=======================================================================================================================
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TUDF_INT_H
#define TDENGINE_TUDF_INT_H
#ifdef __cplusplus
extern "C" {
#endif
//TODO replaces them with fnDebug
//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__)
#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");}
enum {
UDF_TASK_SETUP = 0,
UDF_TASK_CALL = 1,
UDF_TASK_TEARDOWN = 2
};
typedef struct SUdfSetupRequest {
char udfName[16]; //
int8_t scriptType; // 0:c, 1: lua, 2:js
int8_t udfType; //udaf, udf
int16_t pathSize;
char *path;
} SUdfSetupRequest;
typedef struct SUdfSetupResponse {
int64_t udfHandle;
} SUdfSetupResponse;
typedef struct SUdfCallRequest {
int64_t udfHandle;
int8_t step;
int32_t inputBytes;
char *input;
int32_t stateBytes;
char *state;
} SUdfCallRequest;
typedef struct SUdfCallResponse {
int32_t outputBytes;
char *output;
int32_t newStateBytes;
char *newState;
} SUdfCallResponse;
typedef struct SUdfTeardownRequest {
int64_t udfHandle;
} SUdfTeardownRequest;
typedef struct SUdfTeardownResponse {
} SUdfTeardownResponse;
typedef struct SUdfRequest {
int32_t msgLen;
int64_t seqNum;
int8_t type;
void *subReq;
} SUdfRequest;
typedef struct SUdfResponse {
int32_t msgLen;
int64_t seqNum;
int8_t type;
int32_t code;
void *subRsp;
} SUdfResponse;
int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest);
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response);
int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse **pResponse);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TUDF_INT_H
//
// Created by shenglian on 28/02/22.
//
#ifndef UDF_UDF_H
#define UDF_UDF_H
#include <stdlib.h>
#define DEBUG
#ifdef DEBUG
#define debugPrint(...) fprintf(__VA_ARGS__)
#else
#define debugPrint(...) /**/
#endif
enum {
UDF_TASK_SETUP = 0,
UDF_TASK_CALL = 1,
UDF_TASK_TEARDOWN = 2
};
typedef struct SSDataBlock{
char *data;
int32_t size;
} SSDataBlock;
typedef struct SUdfInfo {
char *udfName;
char *path;
} SUdfInfo;
typedef void *UdfHandle;
int32_t startUdfService();
int32_t stopUdfService();
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles);
int32_t setupUdf(SUdfInfo* udf, UdfHandle* handle);
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate,
int32_t *newStateSize, SSDataBlock *output);
int32_t teardownUdf(UdfHandle handle);
typedef struct SUdfSetupRequest {
char udfName[16]; //
int8_t scriptType; // 0:c, 1: lua, 2:js
int8_t udfType; //udaf, udf, udtf
int16_t pathSize;
char *path;
} SUdfSetupRequest;
typedef struct SUdfSetupResponse {
int64_t udfHandle;
} SUdfSetupResponse;
typedef struct SUdfCallRequest {
int64_t udfHandle;
int8_t step;
int32_t inputBytes;
char *input;
int32_t stateBytes;
char *state;
} SUdfCallRequest;
typedef struct SUdfCallResponse {
int32_t outputBytes;
char *output;
int32_t newStateBytes;
char *newState;
} SUdfCallResponse;
typedef struct SUdfTeardownRequest {
int64_t udfHandle;
} SUdfTeardownRequest;
typedef struct SUdfTeardownResponse {
} SUdfTeardownResponse;
typedef struct SUdfRequest {
int32_t msgLen;
int64_t seqNum;
int8_t type;
void *subReq;
} SUdfRequest;
typedef struct SUdfResponse {
int32_t msgLen;
int64_t seqNum;
int8_t type;
int32_t code;
void *subRsp;
} SUdfResponse;
int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest);
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response);
int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse **pResponse);
#endif //UDF_UDF_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "uv.h"
#include "os.h"
#include "tlog.h"
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h"
static char* getUdfFuncName(char* funcname, char* name, int type) { //TODO: when startup, set thread poll size. add it to cfg
switch (type) { //TODO: udfd restart when exist or aborts
case TSDB_UDF_FUNC_NORMAL: //TODO: network error processing.
strcpy(funcname, name); //TODO: add unit test
//TODO: add lua support
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
enum {
UV_TASK_CONNECT = 0,
UV_TASK_REQ_RSP = 1,
UV_TASK_DISCONNECT = 2
};
typedef struct SUdfUvSession {
int64_t severHandle;
uv_pipe_t *udfSvcPipe;
} SUdfUvSession;
typedef struct SClientUvTaskNode {
int8_t type;
int errCode;
uv_pipe_t *pipe;
int64_t seqNum;
uv_buf_t reqBuf;
uv_sem_t taskSem;
uv_buf_t rspBuf;
struct SClientUvTaskNode *prev;
struct SClientUvTaskNode *next;
} SClientUvTaskNode;
typedef struct SClientUdfTask {
int8_t type;
SUdfUvSession *session;
int32_t errCode;
union {
struct {
SUdfSetupRequest req;
SUdfSetupResponse rsp;
} _setup;
struct {
SUdfCallRequest req;
SUdfCallResponse rsp;
} _call;
struct {
SUdfTeardownRequest req;
SUdfTeardownResponse rsp;
} _teardown;
};
} SClientUdfTask;
typedef struct SClientConnBuf {
char *buf;
int32_t len;
int32_t cap;
int32_t total;
} SClientConnBuf;
typedef struct SClientUvConn {
uv_pipe_t *pipe;
SClientUvTaskNode taskQueue;
SClientConnBuf readBuf;
} SClientUvConn;
uv_process_t gUdfdProcess;
uv_barrier_t gUdfInitBarrier;
uv_loop_t gUdfdLoop;
uv_thread_t gUdfLoopThread;
uv_async_t gUdfLoopTaskAync;
uv_async_t gUdfLoopStopAsync;
uv_mutex_t gUdfTaskQueueMutex;
int64_t gUdfTaskSeqNum = 0;
//double circular linked list
typedef SClientUvTaskNode *SClientUvTaskQueue;
SClientUvTaskNode gUdfQueueNode;
SClientUvTaskQueue gUdfTaskQueue = &gUdfQueueNode;
//add SClientUvTaskNode task that close conn
void udfTaskQueueInit(SClientUvTaskQueue q) {
q->next = q;
q->prev = q;
}
bool udfTaskQueueIsEmpty(SClientUvTaskQueue q) {
return q == q->next;
}
void udfTaskQueueInsertTail(SClientUvTaskQueue q, SClientUvTaskNode *e) {
e->next = q;
e->prev = q->prev;
e->prev->next = e;
q->prev = e;
}
void udfTaskQueueInsertTaskAtHead(SClientUvTaskQueue q, SClientUvTaskNode *e) {
e->next = q->next;
e->prev = q;
q->next->prev = e;
q->next = e;
}
void udfTaskQueueRemoveTask(SClientUvTaskNode *e) {
e->prev->next = e->next;
e->next->prev = e->prev;
}
void udfTaskQueueSplit(SClientUvTaskQueue q, SClientUvTaskNode *from, SClientUvTaskQueue n) {
n->prev = q->prev;
n->prev->next = n;
n->next = from;
q->prev = from->prev;
q->prev->next = q;
from->prev = n;
}
SClientUvTaskNode *udfTaskQueueHeadTask(SClientUvTaskQueue q) {
return q->next;
}
SClientUvTaskNode *udfTaskQueueTailTask(SClientUvTaskQueue q) {
return q->prev;
}
SClientUvTaskNode *udfTaskQueueNext(SClientUvTaskNode *e) {
return e->next;
}
void udfTaskQueueMove(SClientUvTaskQueue q, SClientUvTaskQueue n) {
if (udfTaskQueueIsEmpty(q)) {
udfTaskQueueInit(n);
} else {
SClientUvTaskNode *h = udfTaskQueueHeadTask(q);
udfTaskQueueSplit(q, h, n);
}
}
int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
debugPrint("%s", "encoding request");
int len = sizeof(SUdfRequest) - sizeof(void *);
switch (request->type) {
case UDF_TASK_SETUP: {
SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq);
len += sizeof(SUdfSetupRequest) - 1 * sizeof(char *) + setup->pathSize;
break; break;
case TSDB_UDF_FUNC_INIT: }
sprintf(funcname, "%s_init", name); case UDF_TASK_CALL: {
SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
len += sizeof(SUdfCallRequest) - 2 * sizeof(char *) + call->inputBytes + call->stateBytes;
break; break;
case TSDB_UDF_FUNC_FINALIZE: }
sprintf(funcname, "%s_finalize", name); case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
len += sizeof(SUdfTeardownRequest);
break; break;
case TSDB_UDF_FUNC_MERGE: }
sprintf(funcname, "%s_merge", name); default:
break; break;
case TSDB_UDF_FUNC_DESTROY: }
sprintf(funcname, "%s_destroy", name);
char *bufBegin = malloc(len);
char *buf = bufBegin;
//skip msgLen first
buf += sizeof(int32_t);
*(int64_t *) buf = request->seqNum;
buf += sizeof(int64_t);
*(int8_t *) buf = request->type;
buf += sizeof(int8_t);
switch (request->type) {
case UDF_TASK_SETUP: {
SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq);
memcpy(buf, setup->udfName, 16);
buf += 16;
*(int8_t *) buf = setup->scriptType;
buf += sizeof(int8_t);
*(int8_t *) buf = setup->udfType;
buf += sizeof(int8_t);
*(int16_t *) buf = setup->pathSize;
buf += sizeof(int16_t);
memcpy(buf, setup->path, setup->pathSize);
buf += setup->pathSize;
break;
}
case UDF_TASK_CALL: {
SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
*(int64_t *) buf = call->udfHandle;
buf += sizeof(int64_t);
*(int8_t *) buf = call->step;
buf += sizeof(int8_t);
*(int32_t *) buf = call->inputBytes;
buf += sizeof(int32_t);
memcpy(buf, call->input, call->inputBytes);
buf += call->inputBytes;
*(int32_t *) buf = call->stateBytes;
buf += sizeof(int32_t);
memcpy(buf, call->state, call->stateBytes);
buf += call->stateBytes;
break; break;
}
case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
*(int64_t *) buf = teardown->udfHandle;
buf += sizeof(int64_t);
break;
}
default: default:
assert(0);
break; break;
} }
return funcname; request->msgLen = buf - bufBegin;
*(int32_t *) bufBegin = request->msgLen;
*pBuf = bufBegin;
*pBufLen = request->msgLen;
return 0;
} }
#if 0 int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
int32_t initUdfInfo(SUdfInfo* pUdfInfo) { debugPrint("%s", "decoding request");
if (pUdfInfo == NULL) { if (*(int32_t *) bufMsg != bufLen) {
return TSDB_CODE_SUCCESS; debugPrint("%s", "decoding request error");
return -1;
} }
////qError("script len: %d", pUdfInfo->contLen); char *buf = bufMsg;
if (isValidScript(pUdfInfo->content, pUdfInfo->contLen)) { SUdfRequest *request = malloc(sizeof(SUdfRequest));
pUdfInfo->isScript = 1; request->subReq = NULL;
pUdfInfo->pScriptCtx = createScriptCtx(pUdfInfo->content, pUdfInfo->resType, pUdfInfo->resBytes); request->msgLen = *(int32_t *) (buf);
if (pUdfInfo->pScriptCtx == NULL) { buf += sizeof(int32_t);
return TSDB_CODE_QRY_SYS_ERROR; request->seqNum = *(int64_t *) (buf);
buf += sizeof(int64_t);
request->type = *(int8_t *) (buf);
buf += sizeof(int8_t);
switch (request->type) {
case UDF_TASK_SETUP: {
SUdfSetupRequest *setup = malloc(sizeof(SUdfSetupRequest));
memcpy(setup->udfName, buf, 16);
buf += 16;
setup->scriptType = *(int8_t *) buf;
buf += sizeof(int8_t);
setup->udfType = *(int8_t *) buf;
buf += sizeof(int8_t);
setup->pathSize = *(int16_t *) buf;
buf += sizeof(int16_t);
setup->path = buf;
buf += setup->pathSize;
request->subReq = setup;
break;
} }
tfree(pUdfInfo->content); case UDF_TASK_CALL: {
SUdfCallRequest *call = malloc(sizeof(SUdfCallRequest));
pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadScriptInit;
if (pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] == NULL call->udfHandle = *(int64_t *) buf;
|| (*(scriptInitFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_INIT])(pUdfInfo->pScriptCtx) != TSDB_CODE_SUCCESS) { buf += sizeof(int64_t);
return TSDB_CODE_QRY_SYS_ERROR; call->step = *(int8_t *) buf;
buf += sizeof(int8_t);
call->inputBytes = *(int32_t *) buf;
buf += sizeof(int32_t);
call->input = buf;
buf += call->inputBytes;
call->stateBytes = *(int32_t *) buf;
buf += sizeof(int32_t);
call->state = buf;
buf += call->stateBytes;
request->subReq = call;
break;
} }
pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadScriptNormal; case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = malloc(sizeof(SUdfTeardownRequest));
teardown->udfHandle = *(int64_t *) buf;
buf += sizeof(int64_t);
if (pUdfInfo->funcType == FUNCTION_TYPE_AGG) { request->subReq = teardown;
pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadScriptFinalize;
pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadScriptMerge;
} }
pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadScriptDestroy;
} else { }
char path[PATH_MAX] = {0}; if (buf - bufMsg != bufLen) {
taosGetTmpfilePath("script", path, tsTempDir); debugPrint("%s", "decode request error");
free(request->subReq);
free(request);
return -1;
}
*pRequest = request;
return 0;
}
int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
debugPrint("%s", "encoding response");
int32_t len = sizeof(SUdfResponse) - sizeof(void *);
switch (response->type) {
case UDF_TASK_SETUP: {
len += sizeof(SUdfSetupResponse);
break;
}
case UDF_TASK_CALL: {
SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
len += sizeof(SUdfCallResponse) - 2 * sizeof(char *) +
callResp->outputBytes + callResp->newStateBytes;
break;
}
case UDF_TASK_TEARDOWN: {
len += sizeof(SUdfTeardownResponse);
break;
}
}
FILE* file = fopen(path, "w+"); char *bufBegin = malloc(len);
char *buf = bufBegin;
// TODO check for failure of flush to disk //skip msgLen
/*size_t t = */ fwrite(pUdfInfo->content, pUdfInfo->contLen, 1, file); buf += sizeof(int32_t);
fclose(file);
tfree(pUdfInfo->content);
pUdfInfo->path = strdup(path); *(int64_t *) buf = response->seqNum;
buf += sizeof(int64_t);
*(int8_t *) buf = response->type;
buf += sizeof(int8_t);
*(int32_t *) buf = response->code;
buf += sizeof(int32_t);
pUdfInfo->handle = taosLoadDll(path);
if (NULL == pUdfInfo->handle) { switch (response->type) {
return TSDB_CODE_QRY_SYS_ERROR; case UDF_TASK_SETUP: {
SUdfSetupResponse *setupResp = (SUdfSetupResponse *) (response->subRsp);
*(int64_t *) buf = setupResp->udfHandle;
buf += sizeof(int64_t);
break;
}
case UDF_TASK_CALL: {
SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
*(int32_t *) buf = callResp->outputBytes;
buf += sizeof(int32_t);
memcpy(buf, callResp->output, callResp->outputBytes);
buf += callResp->outputBytes;
*(int32_t *) buf = callResp->newStateBytes;
buf += sizeof(int32_t);
memcpy(buf, callResp->newState, callResp->newStateBytes);
buf += callResp->newStateBytes;
break;
} }
case UDF_TASK_TEARDOWN: {
SUdfTeardownResponse *teardownResp = (SUdfTeardownResponse *) (response->subRsp);
break;
}
default:
break;
}
response->msgLen = buf - bufBegin;
*(int32_t *) bufBegin = response->msgLen;
*pBuf = bufBegin;
*pBufLen = response->msgLen;
return 0;
}
char funcname[FUNCTIONS_NAME_MAX_LENGTH + 10] = {0}; int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_NORMAL)); debugPrint("%s", "decoding response");
if (NULL == pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) {
return TSDB_CODE_QRY_SYS_ERROR; if (*(int32_t *) bufMsg != bufLen) {
debugPrint("%s", "can not decode response");
return -1;
}
char *buf = bufMsg;
SUdfResponse *rsp = malloc(sizeof(SUdfResponse));
rsp->msgLen = *(int32_t *) buf;
buf += sizeof(int32_t);
rsp->seqNum = *(int64_t *) buf;
buf += sizeof(int64_t);
rsp->type = *(int8_t *) buf;
buf += sizeof(int8_t);
rsp->code = *(int32_t *) buf;
buf += sizeof(int32_t);
switch (rsp->type) {
case UDF_TASK_SETUP: {
SUdfSetupResponse *setupRsp = (SUdfSetupResponse *) malloc(sizeof(SUdfSetupResponse));
setupRsp->udfHandle = *(int64_t *) buf;
buf += sizeof(int64_t);
rsp->subRsp = (char *) setupRsp;
break;
} }
case UDF_TASK_CALL: {
SUdfCallResponse *callRsp = (SUdfCallResponse *) malloc(sizeof(SUdfCallResponse));
callRsp->outputBytes = *(int32_t *) buf;
buf += sizeof(int32_t);
pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_INIT)); callRsp->output = buf;
buf += callRsp->outputBytes;
if (pUdfInfo->funcType == FUNCTION_TYPE_AGG) { callRsp->newStateBytes = *(int32_t *) buf;
pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE)); buf += sizeof(int32_t);
pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_MERGE));
callRsp->newState = buf;
buf += callRsp->newStateBytes;
rsp->subRsp = callRsp;
break;
} }
case UDF_TASK_TEARDOWN: {
SUdfTeardownResponse *teardownRsp = (SUdfTeardownResponse *) malloc(sizeof(SUdfTeardownResponse));
rsp->subRsp = teardownRsp;
break;
}
default:
break;
}
if (buf - bufMsg != bufLen) {
debugPrint("%s", "can not decode response");
free(rsp->subRsp);
free(rsp);
return -1;
}
*pResponse = rsp;
return 0;
}
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal);
uv_close((uv_handle_t *) req, NULL);
//TODO: restart the udfd process
}
pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_DESTROY)); void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data;
if (!udfTaskQueueIsEmpty(&conn->taskQueue)) {
SClientUvTaskNode *task = udfTaskQueueHeadTask(&conn->taskQueue);
task->errCode = 0;
uv_sem_post(&task->taskSem);
}
free(conn->readBuf.buf);
free(conn);
free((uv_pipe_t *) handle);
}
int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
debugPrint("%s", "get uv task result");
if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) {
SUdfResponse *rsp;
decodeResponse(uvTask->rspBuf.base, uvTask->rspBuf.len, &rsp);
task->errCode = rsp->code;
switch (task->type) {
case UDF_TASK_SETUP: {
//TODO: copy or not
task->_setup.rsp = *(SUdfSetupResponse *) (rsp->subRsp);
break;
}
case UDF_TASK_CALL: {
task->_call.rsp = *(SUdfCallResponse *) (rsp->subRsp);
//TODO: copy or not
break;
}
case UDF_TASK_TEARDOWN: {
task->_teardown.rsp = *(SUdfTeardownResponse *) (rsp->subRsp);
//TODO: copy or not?
break;
}
default: {
break;
}
}
// TODO: the call buffer is setup and freed by udf invocation
free(uvTask->rspBuf.base);
free(rsp->subRsp);
free(rsp);
} else {
task->errCode = uvTask->errCode;
}
} else if (uvTask->type == UV_TASK_CONNECT) {
task->errCode = uvTask->errCode;
} else if (uvTask->type == UV_TASK_DISCONNECT) {
task->errCode = uvTask->errCode;
}
return 0;
}
if (pUdfInfo->funcs[TSDB_UDF_FUNC_INIT]) { void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
return (*(udfInitFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_INIT])(&pUdfInfo->init); debugPrint("%s", "client allocate buffer to receive from pipe");
SClientUvConn *conn = handle->data;
SClientConnBuf *connBuf = &conn->readBuf;
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
if (connBuf->cap == 0) {
connBuf->buf = malloc(msgHeadSize);
if (connBuf->buf) {
connBuf->len = 0;
connBuf->cap = msgHeadSize;
connBuf->total = -1;
buf->base = connBuf->buf;
buf->len = connBuf->cap;
} else {
//TODO: log error
buf->base = NULL;
buf->len = 0;
}
} else {
connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
void *resultBuf = realloc(connBuf->buf, connBuf->cap);
if (resultBuf) {
connBuf->buf = resultBuf;
buf->base = connBuf->buf + connBuf->len;
buf->len = connBuf->cap - connBuf->len;
} else {
//TODO: log error free connBuf->buf
buf->base = NULL;
buf->len = 0;
} }
} }
return TSDB_CODE_SUCCESS; debugPrint("\tconn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
} }
void destroyUdfInfo(SUdfInfo* pUdfInfo) { bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
if (pUdfInfo == NULL) { if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
return; connBuf->total = *(int32_t *) (connBuf->buf);
} }
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
return true;
}
return false;
}
void udfcUvHandleRsp(SClientUvConn *conn) {
SClientConnBuf *connBuf = &conn->readBuf;
int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen int32_t then seqnum
if (pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY]) { if (udfTaskQueueIsEmpty(&conn->taskQueue)) {
if (pUdfInfo->isScript) { //LOG error
(*(scriptDestroyFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY])(pUdfInfo->pScriptCtx); return;
tfree(pUdfInfo->content); }
}else{ bool found = false;
(*(udfDestroyFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY])(&pUdfInfo->init); SClientUvTaskNode *taskFound = NULL;
SClientUvTaskNode *task = udfTaskQueueNext(&conn->taskQueue);
while (task != &conn->taskQueue) {
if (task->seqNum == seqNum) {
if (found == false) {
found = true;
taskFound = task;
} else {
//LOG error;
continue;
}
} }
task = udfTaskQueueNext(task);
} }
tfree(pUdfInfo->name); if (taskFound) {
taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
udfTaskQueueRemoveTask(taskFound);
uv_sem_post(&taskFound->taskSem);
} else {
//LOG error
}
connBuf->buf = NULL;
connBuf->total = -1;
connBuf->len = 0;
connBuf->cap = 0;
}
void udfcUvHandleError(SClientUvConn *conn) {
uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
}
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
debugPrint("%s, nread: %zd", "client read from pipe", nread);
if (nread == 0) return;
SClientUvConn *conn = client->data;
SClientConnBuf *connBuf = &conn->readBuf;
if (nread > 0) {
connBuf->len += nread;
if (isUdfcUvMsgComplete(connBuf)) {
udfcUvHandleRsp(conn);
}
if (pUdfInfo->path) { }
unlink(pUdfInfo->path); if (nread < 0) {
debugPrint("\tclient read error: %s", uv_strerror(nread));
if (nread == UV_EOF) {
//TODO:
}
udfcUvHandleError(conn);
} }
tfree(pUdfInfo->path);
tfree(pUdfInfo->content);
taosCloseDll(pUdfInfo->handle);
tfree(pUdfInfo);
} }
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx *pCtx, int32_t idx, int32_t type) { void onUdfClientWrite(uv_write_t *write, int status) {
int32_t output = 0; debugPrint("%s", "after writing to pipe");
SClientUvTaskNode *uvTask = write->data;
if (status == 0) {
uv_pipe_t *pipe = uvTask->pipe;
SClientUvConn *conn = pipe->data;
udfTaskQueueInsertTail(&conn->taskQueue, uvTask);
} else {
//TODO Log error;
}
debugPrint("\tlength:%zu", uvTask->reqBuf.len);
free(write);
free(uvTask->reqBuf.base);
}
if (pUdfInfo == NULL || pUdfInfo->funcs[type] == NULL) { void onUdfClientConnect(uv_connect_t *connect, int status) {
//qError("empty udf function, type:%d", type); SClientUvTaskNode *uvTask = connect->data;
return; uvTask->errCode = status;
if (status != 0) {
//TODO: LOG error
} }
uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead);
free(connect);
uv_sem_post(&uvTask->taskSem);
}
// //qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[type]); int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
SClientUvTaskNode *uvTask = calloc(1, sizeof(SClientUvTaskNode));
uvTask->type = uvTaskType;
if (uvTaskType == UV_TASK_CONNECT) {
} else if (uvTaskType == UV_TASK_REQ_RSP) {
uvTask->pipe = task->session->udfSvcPipe;
SUdfRequest request;
request.type = task->type;
request.seqNum = gUdfTaskSeqNum++;
if (task->type == UDF_TASK_SETUP) {
request.subReq = &task->_setup.req;
request.type = UDF_TASK_SETUP;
} else if (task->type == UDF_TASK_CALL) {
request.subReq = &task->_call.req;
request.type = UDF_TASK_CALL;
} else if (task->type == UDF_TASK_TEARDOWN) {
request.subReq = &task->_teardown.req;
request.type = UDF_TASK_TEARDOWN;
} else {
//TODO log and return error
}
char *buf = NULL;
int32_t bufLen = 0;
encodeRequest(&buf, &bufLen, &request);
uvTask->reqBuf = uv_buf_init(buf, bufLen);
uvTask->seqNum = request.seqNum;
} else if (uvTaskType == UV_TASK_DISCONNECT) {
uvTask->pipe = task->session->udfSvcPipe;
}
uv_sem_init(&uvTask->taskSem, 0);
switch (type) { *pUvTask = uvTask;
case TSDB_UDF_FUNC_NORMAL: return 0;
if (pUdfInfo->isScript) { }
(*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx,
(char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->startTs, pCtx->pOutput,
(char *)pCtx->ptsOutputBuf, &output, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
} else {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, %d", "queue uv task", uvTask->type);
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, uv_mutex_lock(&gUdfTaskQueueMutex);
pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes, &pUdfInfo->init); udfTaskQueueInsertTail(gUdfTaskQueue, uvTask);
} uv_mutex_unlock(&gUdfTaskQueueMutex);
uv_async_send(&gUdfLoopTaskAync);
if (pUdfInfo->funcType == TSDB_FUNC_TYPE_AGGREGATE) { uv_sem_wait(&uvTask->taskSem);
pCtx->resultInfo->numOfRes = output; uv_sem_destroy(&uvTask->taskSem);
} else {
pCtx->resultInfo->numOfRes += output;
}
if (pCtx->resultInfo->numOfRes > 0) { return 0;
pCtx->resultInfo->hasResult = DATA_SET_FLAG; }
}
break; int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, type %d", "start uv task ", uvTask->type);
switch (uvTask->type) {
case UV_TASK_CONNECT: {
uv_pipe_t *pipe = malloc(sizeof(uv_pipe_t));
uv_pipe_init(&gUdfdLoop, pipe, 0);
uvTask->pipe = pipe;
case TSDB_UDF_FUNC_MERGE: SClientUvConn *conn = malloc(sizeof(SClientUvConn));
if (pUdfInfo->isScript) { conn->pipe = pipe;
(*(scriptMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pUdfInfo->pScriptCtx, pCtx->pInput, pCtx->size, pCtx->pOutput, &output); conn->readBuf.len = 0;
} else { conn->readBuf.cap = 0;
(*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init); conn->readBuf.buf = 0;
} conn->readBuf.total = -1;
udfTaskQueueInit(&conn->taskQueue);
// set the output value exist pipe->data = conn;
pCtx->resultInfo->numOfRes = output;
if (output > 0) { uv_connect_t *connReq = malloc(sizeof(uv_connect_t));
pCtx->resultInfo->hasResult = DATA_SET_FLAG; connReq->data = uvTask;
}
uv_pipe_connect(connReq, pipe, "udf.sock", onUdfClientConnect);
break;
}
case UV_TASK_REQ_RSP: {
uv_pipe_t *pipe = uvTask->pipe;
uv_write_t *write = malloc(sizeof(uv_write_t));
write->data = uvTask;
uv_write(write, (uv_stream_t *) pipe, &uvTask->reqBuf, 1, onUdfClientWrite);
break; break;
}
case UV_TASK_DISCONNECT: {
SClientUvConn *conn = uvTask->pipe->data;
udfTaskQueueInsertTail(&conn->taskQueue, uvTask);
uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose);
break;
}
default: {
break;
}
}
case TSDB_UDF_FUNC_FINALIZE: { return 0;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); }
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
if (pUdfInfo->isScript) {
(*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->startTs, pCtx->pOutput, &output);
} else {
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init);
}
// set the output value exist
pCtx->resultInfo->numOfRes = output;
if (output > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break; void udfClientAsyncCb(uv_async_t *async) {
} SClientUvTaskNode node;
SClientUvTaskQueue q = &node;
udfTaskQueueInit(q);
uv_mutex_lock(&gUdfTaskQueueMutex);
udfTaskQueueMove(gUdfTaskQueue, q);
uv_mutex_unlock(&gUdfTaskQueueMutex);
while (!udfTaskQueueIsEmpty(q)) {
SClientUvTaskNode *task = udfTaskQueueHeadTask(q);
udfTaskQueueRemoveTask(task);
startUvUdfTask(task);
}
}
void udfStopAsyncCb(uv_async_t *async) {
uv_stop(&gUdfdLoop);
uv_loop_close(&gUdfdLoop);
}
void startUdfd(void *argsThread) {
uv_loop_init(&gUdfdLoop);
//TODO: path
uv_process_options_t options;
static char path[256] = {0};
size_t cwdSize;
uv_cwd(path, &cwdSize);
strcat(path, "./udfd");
char* args[2] = {path, NULL};
options.args = args;
options.file = path;
options.exit_cb = onUdfdExit;
int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options);
if (err != 0) {
debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
}
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
uv_mutex_init(&gUdfTaskQueueMutex);
udfTaskQueueInit(gUdfTaskQueue);
uv_barrier_wait(&gUdfInitBarrier);
uv_run(&gUdfdLoop, UV_RUN_DEFAULT);
}
int32_t startUdfService() {
uv_barrier_init(&gUdfInitBarrier, 2);
uv_thread_create(&gUdfLoopThread, startUdfd, 0);
uv_barrier_wait(&gUdfInitBarrier);
return 0;
}
int32_t stopUdfService() {
uv_barrier_destroy(&gUdfInitBarrier);
uv_process_kill(&gUdfdProcess, SIGINT);
uv_async_send(&gUdfLoopStopAsync);
uv_mutex_destroy(&gUdfTaskQueueMutex);
uv_thread_join(&gUdfLoopThread);
return 0;
}
int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
SClientUvTaskNode *uvTask = NULL;
createUdfcUvTask(task, uvTaskType, &uvTask);
queueUvUdfTask(uvTask);
udfcGetUvTaskResponseResult(task, uvTask);
if (uvTaskType == UV_TASK_CONNECT) {
task->session->udfSvcPipe = uvTask->pipe;
}
free(uvTask);
uvTask = NULL;
return task->errCode;
}
int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) {
debugPrint("%s", "client setup udf");
SClientUdfTask *task = malloc(sizeof(SClientUdfTask));
task->errCode = 0;
task->session = malloc(sizeof(SUdfUvSession));
task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req;
memcpy(req->udfName, udfInfo->udfName, 16);
req->path = udfInfo->path;
req->pathSize = strlen(req->path) + 1;
req->udfType = udfInfo->udfType;
req->scriptType = udfInfo->scriptType;
int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) {
//TODO: log error
return -1;
} }
udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle;
*handle = task->session;
int32_t err = task->errCode;
free(task);
return err;
} }
#endif int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newState,
\ No newline at end of file int32_t *newStateSize, SUdfDataBlock *output) {
debugPrint("%s", "client call udf");
SClientUdfTask *task = malloc(sizeof(SClientUdfTask));
task->errCode = 0;
task->session = (SUdfUvSession *) handle;
task->type = UDF_TASK_CALL;
SUdfCallRequest *req = &task->_call.req;
req->state = state;
req->stateBytes = stateSize;
req->inputBytes = input.size;
req->input = input.data;
req->udfHandle = task->session->severHandle;
req->step = step;
udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfCallResponse *rsp = &task->_call.rsp;
*newState = rsp->newState;
*newStateSize = rsp->newStateBytes;
output->size = rsp->outputBytes;
output->data = rsp->output;
int32_t err = task->errCode;
free(task);
return err;
}
int32_t teardownUdf(UdfHandle handle) {
debugPrint("%s", "client teardown udf");
SClientUdfTask *task = malloc(sizeof(SClientUdfTask));
task->errCode = 0;
task->session = (SUdfUvSession *) handle;
task->type = UDF_TASK_TEARDOWN;
SUdfTeardownRequest *req = &task->_teardown.req;
req->udfHandle = task->session->severHandle;
udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfTeardownResponse *rsp = &task->_teardown.rsp;
int32_t err = task->errCode;
udfcRunUvTask(task, UV_TASK_DISCONNECT);
free(task->session);
free(task);
return err;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "uv.h"
#include "os.h"
#include "tlog.h"
#include "tudf.h"
#include "tudfInt.h"
static uv_loop_t *loop;
typedef struct SUdfdUvConn {
uv_stream_t *client;
char *inputBuf;
int32_t inputLen;
int32_t inputCap;
int32_t inputTotal;
} SUdfdUvConn;
typedef struct SUvUdfWork {
uv_stream_t *client;
uv_buf_t input;
uv_buf_t output;
} SUvUdfWork;
typedef struct SUdf {
int32_t refCount;
char name[16];
int8_t type;
uv_lib_t lib;
TUdfFunc normalFunc;
} SUdf;
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
//TODO: add private udf structure.
typedef struct SUdfHandle {
SUdf *udf;
} SUdfHandle;
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data);
SUdfRequest *request = NULL;
decodeRequest(uvUdf->input.base, uvUdf->input.len, &request);
switch (request->type) {
case UDF_TASK_SETUP: {
debugPrint("%s", "process setup request");
SUdf *udf = malloc(sizeof(SUdf));
udf->refCount = 0;
SUdfSetupRequest *setup = request->subReq;
strcpy(udf->name, setup->udfName);
int err = uv_dlopen(setup->path, &udf->lib);
if (err != 0) {
debugPrint("can not load library %s. error: %s", setup->path, uv_strerror(err));
//TODO set error
}
char normalFuncName[32] = {0};
strcpy(normalFuncName, setup->udfName);
//TODO error,
//TODO find all functions normal, init, destroy, normal, merge, finalize
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->normalFunc));
SUdfHandle *handle = malloc(sizeof(SUdfHandle));
handle->udf = udf;
udf->refCount++;
//TODO: allocate private structure and call init function and set it to handle
SUdfResponse *rsp = malloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = 0;
SUdfSetupResponse *subRsp = malloc(sizeof(SUdfSetupResponse));
subRsp->udfHandle = (int64_t) (handle);
rsp->subRsp = subRsp;
char *buf;
int32_t len;
encodeResponse(&buf, &len, rsp);
uvUdf->output = uv_buf_init(buf, len);
free(rsp->subRsp);
free(rsp);
free(request->subReq);
free(request);
free(uvUdf->input.base);
break;
}
case UDF_TASK_CALL: {
debugPrint("%s", "process call request");
SUdfCallRequest *call = request->subReq;
SUdfHandle *handle = (SUdfHandle *) (call->udfHandle);
SUdf *udf = handle->udf;
char *newState;
int32_t newStateSize;
SUdfDataBlock input = {.data = call->input, .size= call->inputBytes};
SUdfDataBlock output;
//TODO: call different functions according to the step
udf->normalFunc(call->step, call->state, call->stateBytes, input, &newState, &newStateSize, &output);
SUdfResponse *rsp = malloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = 0;
SUdfCallResponse *subRsp = malloc(sizeof(SUdfCallResponse));
subRsp->outputBytes = output.size;
subRsp->output = output.data;
subRsp->newStateBytes = newStateSize;
subRsp->newState = newState;
rsp->subRsp = subRsp;
char *buf;
int32_t len;
encodeResponse(&buf, &len, rsp);
uvUdf->output = uv_buf_init(buf, len);
free(rsp->subRsp);
free(rsp);
free(newState);
free(output.data);
free(request->subReq);
free(request);
free(uvUdf->input.base);
break;
}
case UDF_TASK_TEARDOWN: {
debugPrint("%s", "process teardown request");
SUdfTeardownRequest *teardown = request->subReq;
SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle);
SUdf *udf = handle->udf;
udf->refCount--;
if (udf->refCount == 0) {
uv_dlclose(&udf->lib);
}
free(udf);
//TODO: call destroy and free udf private
free(handle);
SUdfResponse *rsp = malloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = 0;
SUdfTeardownResponse *subRsp = malloc(sizeof(SUdfTeardownResponse));
rsp->subRsp = subRsp;
char *buf;
int32_t len;
encodeResponse(&buf, &len, rsp);
uvUdf->output = uv_buf_init(buf, len);
free(rsp->subRsp);
free(rsp);
free(request->subReq);
free(request);
free(uvUdf->input.base);
break;
}
default: {
break;
}
}
}
void udfdOnWrite(uv_write_t *req, int status) {
debugPrint("%s", "after writing to pipe");
if (status < 0) {
debugPrint("Write error %s", uv_err_name(status));
}
SUvUdfWork *work = (SUvUdfWork *) req->data;
debugPrint("\tlength: %zu", work->output.len);
free(work->output.base);
free(work);
free(req);
}
void udfdSendResponse(uv_work_t *work, int status) {
debugPrint("%s", "send response");
SUvUdfWork *udfWork = (SUvUdfWork *) (work->data);
uv_write_t *write_req = malloc(sizeof(uv_write_t));
write_req->data = udfWork;
uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite);
free(work);
}
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
debugPrint("%s", "allocate buffer for read");
SUdfdUvConn *ctx = handle->data;
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
if (ctx->inputCap == 0) {
ctx->inputBuf = malloc(msgHeadSize);
if (ctx->inputBuf) {
ctx->inputLen = 0;
ctx->inputCap = msgHeadSize;
ctx->inputTotal = -1;
buf->base = ctx->inputBuf;
buf->len = ctx->inputCap;
} else {
//TODO: log error
buf->base = NULL;
buf->len = 0;
}
} else {
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
void *inputBuf = realloc(ctx->inputBuf, ctx->inputCap);
if (inputBuf) {
ctx->inputBuf = inputBuf;
buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = ctx->inputCap - ctx->inputLen;
} else {
//TODO: log error
buf->base = NULL;
buf->len = 0;
}
}
debugPrint("\tinput buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
}
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
pipe->inputTotal = *(int32_t *) (pipe->inputBuf);
}
if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) {
return true;
}
return false;
}
void udfdHandleRequest(SUdfdUvConn *conn) {
uv_work_t *work = malloc(sizeof(uv_work_t));
SUvUdfWork *udfWork = malloc(sizeof(SUvUdfWork));
udfWork->client = conn->client;
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
conn->inputBuf = NULL;
conn->inputLen = 0;
conn->inputCap = 0;
conn->inputTotal = -1;
work->data = udfWork;
uv_queue_work(loop, work, udfdProcessRequest, udfdSendResponse);
}
void udfdPipeCloseCb(uv_handle_t *pipe) {
SUdfdUvConn *conn = pipe->data;
free(conn->client);
free(conn->inputBuf);
free(conn);
}
void udfdUvHandleError(SUdfdUvConn *conn) {
uv_close((uv_handle_t *) conn->client, udfdPipeCloseCb);
}
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
debugPrint("%s, nread: %zd", "read from pipe", nread);
if (nread == 0) return;
SUdfdUvConn *conn = client->data;
if (nread > 0) {
conn->inputLen += nread;
if (isUdfdUvMsgComplete(conn)) {
udfdHandleRequest(conn);
} else {
//log error or continue;
}
return;
}
if (nread < 0) {
debugPrint("Read error %s", uv_err_name(nread));
if (nread == UV_EOF) {
//TODO check more when close
} else {
}
udfdUvHandleError(conn);
}
}
void udfdOnNewConnection(uv_stream_t *server, int status) {
debugPrint("%s", "on new connection");
if (status < 0) {
// TODO
return;
}
uv_pipe_t *client = (uv_pipe_t *) malloc(sizeof(uv_pipe_t));
uv_pipe_init(loop, client, 0);
if (uv_accept(server, (uv_stream_t *) client) == 0) {
SUdfdUvConn *ctx = malloc(sizeof(SUdfdUvConn));
ctx->client = (uv_stream_t *) client;
ctx->inputBuf = 0;
ctx->inputLen = 0;
ctx->inputCap = 0;
client->data = ctx;
ctx->client = (uv_stream_t *) client;
uv_read_start((uv_stream_t *) client, udfdAllocBuffer, udfdPipeRead);
} else {
uv_close((uv_handle_t *) client, NULL);
}
}
void removeListeningPipe(int sig) {
uv_fs_t req;
uv_fs_unlink(loop, &req, "udf.sock", NULL);
exit(0);
}
int main() {
debugPrint("libuv version: %x", UV_VERSION_HEX);
loop = uv_default_loop();
uv_fs_t req;
uv_fs_unlink(loop, &req, "udf.sock", NULL);
uv_pipe_t server;
uv_pipe_init(loop, &server, 0);
signal(SIGINT, removeListeningPipe);
int r;
if ((r = uv_pipe_bind(&server, "udf.sock"))) {
debugPrint("Bind error %s\n", uv_err_name(r));
removeListeningPipe(0);
return 1;
}
if ((r = uv_listen((uv_stream_t *) &server, 128, udfdOnNewConnection))) {
debugPrint("Listen error %s", uv_err_name(r));
return 2;
}
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "uv.h"
#include "tudf.h"
int main(int argc, char *argv[]) {
startUdfService();
uv_sleep(1000);
char path[256] = {0};
size_t cwdSize = 256;
int err = uv_cwd(path, &cwdSize);
if (err != 0) {
fprintf(stderr, "err cwd: %s\n", uv_strerror(err));
return err;
}
fprintf(stdout, "current working directory:%s\n", path);
strcat(path, "/libudf1.so");
SUdfInfo udfInfo = {.udfName="udf1", .path=path};
UdfHandle handle;
setupUdf(&udfInfo, &handle);
//char state[5000000] = "state";
//char input[5000000] = "input";
int dataSize = 500;
int callCount = 2;
if (argc > 1) dataSize = atoi(argv[1]);
if (argc > 2) callCount = atoi(argv[2]);
char *state = malloc(dataSize);
char *input = malloc(dataSize);
SUdfDataBlock blockInput = {.data = input, .size = dataSize};
SUdfDataBlock blockOutput;
char* newState;
int32_t newStateSize;
for (int l = 0; l < callCount; ++l) {
callUdf(handle, 0, state, dataSize, blockInput, &newState, &newStateSize, &blockOutput);
}
free(state);
free(input);
teardownUdf(handle);
stopUdfService();
}
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "tudf.h"
void udf1(int8_t step, char *state, int32_t stateSize, SUdfDataBlock input,
char **newState, int32_t *newStateSize, SUdfDataBlock *output) {
fprintf(stdout, "%s, step:%d\n", "udf function called", step);
char *newStateBuf = malloc(stateSize);
memcpy(newStateBuf, state, stateSize);
*newState = newStateBuf;
*newStateSize = stateSize;
char *outputBuf = malloc(input.size);
memcpy(outputBuf, input.data, input.size);
output->data = outputBuf;
output->size = input.size;
return;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册