提交 8c12733e 编写于 作者: S shenglian zhou

pass compilation before change log

上级 8e3d1025
aux_source_directory(src FUNCTION_SRC)
list(REMOVE_ITEM FUNCTION_SRC src/udfd.c)
add_library(function STATIC ${FUNCTION_SRC})
target_include_directories(
function
......@@ -9,4 +10,16 @@ target_include_directories(
target_link_libraries(
function
PRIVATE os util common nodes
)
add_executable(udfd src/udfd.c)
target_include_directories(
udfd
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/function"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
udfd
PRIVATE os util common nodes function
)
\ No newline at end of file
......@@ -20,10 +20,6 @@
extern "C" {
#endif
#include "os.h"
#include "taoserror.h"
#include "tcommon.h"
//======================================================================================
//begin API to taosd and qworker
/**
......@@ -38,10 +34,20 @@ int32_t startUdfService();
*/
int32_t stopUdfService();
enum {
TSDB_UDF_TYPE_SCALAR = 0,
TSDB_UDF_TYPE_AGGREGATE = 1
};
enum {
TSDB_UDF_SCRIPT_BIN_LIB = 0,
TSDB_UDF_SCRIPT_LUA = 1,
};
typedef struct SUdfInfo {
char *udfName; // function name
int32_t funcType; // scalar function or aggregate function
int8_t isScript;
int32_t udfType; // scalar function or aggregate function
int8_t scriptType;
char *path;
int8_t resType; // result type
......@@ -80,16 +86,14 @@ enum {
* @return error code
*/
//TODO: must change the following after metadata flow and data flow between qworker and udfd is well defined
typedef struct SUdfDataBlock {
int16_t numOfCols;
struct {
char* data;
int32_t length;
} *colsData;
char* data;
int32_t size;
} SUdfDataBlock;
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock *input, char **newstate,
int32_t *newStateSize, SUdfDataBlock **output);
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newstate,
int32_t *newStateSize, SUdfDataBlock *output);
/**
* tearn down udf
......@@ -100,7 +104,8 @@ int32_t teardownUdf(UdfHandle handle);
// end API to taosd and qworker
//=============================================================================================================================
// begin API to UDF writer
// TODO: Must change
// begin API to UDF writer.
// script
......@@ -113,24 +118,18 @@ int32_t teardownUdf(UdfHandle handle);
//typedef void (*scriptDestroyFunc)(void* pCtx);
// dynamic lib
typedef int32_t (*udfInitFunc)();
typedef void (*udfDestroyFunc)();
typedef int32_t (*TUdfInitFunc)();
typedef void (*TUdfDestroyFunc)();
typedef void (*TUdfFunc)(int8_t step,
char *state, int32_t stateSize, SUdfDataBlock input,
char **newstate, int32_t *newStateSize, SUdfDataBlock *output);
typedef void (*udfNormalFunc)(char *state, int32_t stateSize, SUdfDataBlock input, char **newstate,
int32_t *newStateSize, SUdfDataBlock *output);
typedef void (*udfMergeFunc)(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput);
typedef void (*udfFinalizeFunc)(char* state, int32_t stateSize, SUdfDataBlock *output);
//typedef void (*udfMergeFunc)(char *data, int32_t numOfRows, char *dataOutput, int32_t* numOfOutput);
//typedef void (*udfFinalizeFunc)(char* state, int32_t stateSize, SUdfDataBlock *output);
// end API to UDF writer
//=======================================================================================================================
enum {
TSDB_UDF_FUNC_NORMAL = 0,
TSDB_UDF_FUNC_INIT,
TSDB_UDF_FUNC_FINALIZE,
TSDB_UDF_FUNC_MERGE,
TSDB_UDF_FUNC_DESTROY,
TSDB_UDF_FUNC_MAX_NUM
};
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TUDF_INT_H
#define TDENGINE_TUDF_INT_H
#ifdef __cplusplus
extern "C" {
#endif
enum {
UDF_TASK_SETUP = 0,
UDF_TASK_CALL = 1,
UDF_TASK_TEARDOWN = 2
};
typedef struct SUdfSetupRequest {
char udfName[16]; //
int8_t scriptType; // 0:c, 1: lua, 2:js
int8_t udfType; //udaf, udf
int16_t pathSize;
char *path;
} SUdfSetupRequest;
typedef struct SUdfSetupResponse {
int64_t udfHandle;
} SUdfSetupResponse;
typedef struct SUdfCallRequest {
int64_t udfHandle;
int8_t step;
int32_t inputBytes;
char *input;
int32_t stateBytes;
char *state;
} SUdfCallRequest;
typedef struct SUdfCallResponse {
int32_t outputBytes;
char *output;
int32_t newStateBytes;
char *newState;
} SUdfCallResponse;
typedef struct SUdfTeardownRequest {
int64_t udfHandle;
} SUdfTeardownRequest;
typedef struct SUdfTeardownResponse {
} SUdfTeardownResponse;
typedef struct SUdfRequest {
int32_t msgLen;
int64_t seqNum;
int8_t type;
void *subReq;
} SUdfRequest;
typedef struct SUdfResponse {
int32_t msgLen;
int64_t seqNum;
int8_t type;
int32_t code;
void *subRsp;
} SUdfResponse;
int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest);
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response);
int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse **pResponse);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TUDF_INT_H
此差异已折叠。
此差异已折叠。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
#include <udf.h>
#include <stdbool.h>
uv_loop_t *loop;
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "uv.h"
#include "os.h"
#include "tudf.h"
#include "tudfInt.h"
//TODO replaces them with qDebug
#define DEBUG
#ifdef DEBUG
#define debugPrint(...) fprintf(__VA_ARGS__)
#else
#define debugPrint(...) /**/
#endif
static uv_loop_t *loop;
typedef struct SUdfdUvConn {
uv_stream_t *client;
......@@ -21,10 +42,6 @@ typedef struct SUvUdfWork {
uv_buf_t output;
} SUvUdfWork;
typedef void (*TUdfNormalFunc)(char *state, int32_t stateSize, char **newstate, int32_t *newStateSize,
SSDataBlock input, SSDataBlock *output);
typedef struct SUdf {
int32_t refCount;
......@@ -32,8 +49,7 @@ typedef struct SUdf {
int8_t type;
uv_lib_t lib;
TUdfNormalFunc normalFunc;
TUdfFunc normalFunc;
} SUdf;
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
......@@ -43,10 +59,6 @@ typedef struct SUdfHandle {
} SUdfHandle;
typedef void (*TUdfNormalFunc)(char *state, int32_t stateSize, char **newstate, int32_t *newStateSize,
SSDataBlock input, SSDataBlock *output);
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data);
SUdfRequest *request = NULL;
......@@ -67,8 +79,7 @@ void udfdProcessRequest(uv_work_t *req) {
char normalFuncName[32] = {0};
strcpy(normalFuncName, setup->udfName);
strcat(normalFuncName, "_normal");
//TODO error,
//TODO error,
//TODO find all functions normal, init, destroy, normal, merge, finalize
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->normalFunc));
......@@ -104,10 +115,10 @@ void udfdProcessRequest(uv_work_t *req) {
SUdf *udf = handle->udf;
char *newState;
int32_t newStateSize;
SSDataBlock input = {.data = call->input, .size= call->inputBytes};
SSDataBlock output;
SUdfDataBlock input = {.data = call->input, .size= call->inputBytes};
SUdfDataBlock output;
//TODO: call different functions according to the step
udf->normalFunc(call->state, call->stateBytes, &newState, &newStateSize, input, &output);
udf->normalFunc(call->step, call->state, call->stateBytes, input, &newState, &newStateSize, &output);
SUdfResponse *rsp = malloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册