未验证 提交 b62b1786 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #11572 from taosdata/3.0_udfd

feat(udf): UDF service refactoring with new interface
...@@ -5,6 +5,9 @@ target_include_directories( ...@@ -5,6 +5,9 @@ target_include_directories(
function function
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/function" "${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/client"
"${TD_SOURCE_DIR}/contrib/libuv/include" "${TD_SOURCE_DIR}/contrib/libuv/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
...@@ -21,6 +24,9 @@ target_include_directories( ...@@ -21,6 +24,9 @@ target_include_directories(
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/function" "${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/contrib/libuv/include" "${TD_SOURCE_DIR}/contrib/libuv/include"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/client"
"${TD_SOURCE_DIR}/include/os" "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
...@@ -35,6 +41,9 @@ target_include_directories( ...@@ -35,6 +41,9 @@ target_include_directories(
udf1 udf1
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/function" "${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/client"
"${TD_SOURCE_DIR}/include/os" "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
...@@ -46,6 +55,10 @@ target_include_directories( ...@@ -46,6 +55,10 @@ target_include_directories(
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/function" "${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/contrib/libuv/include" "${TD_SOURCE_DIR}/contrib/libuv/include"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/libs/transport"
"${TD_SOURCE_DIR}/include/client"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
......
...@@ -16,12 +16,25 @@ ...@@ -16,12 +16,25 @@
#ifndef TDENGINE_TUDF_H #ifndef TDENGINE_TUDF_H
#define TDENGINE_TUDF_H #define TDENGINE_TUDF_H
#include <stdint.h>
#include <stdbool.h>
#include "tmsg.h"
#include "tcommon.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
//====================================================================================== //======================================================================================
//begin API to taosd and qworker //begin API to taosd and qworker
enum {
UDFC_CODE_STOPPING = -1,
UDFC_CODE_RESTARTING = -2,
UDFC_CODE_PIPE_READ_ERR = -3,
};
/** /**
* start udf dameon service * start udf dameon service
* @return error code * @return error code
...@@ -34,28 +47,6 @@ int32_t startUdfService(); ...@@ -34,28 +47,6 @@ int32_t startUdfService();
*/ */
int32_t stopUdfService(); int32_t stopUdfService();
enum {
TSDB_UDF_TYPE_SCALAR = 0,
TSDB_UDF_TYPE_AGGREGATE = 1
};
enum {
TSDB_UDF_SCRIPT_BIN_LIB = 0,
TSDB_UDF_SCRIPT_LUA = 1,
};
typedef struct SUdfInfo {
char *udfName; // function name
int32_t udfType; // scalar function or aggregate function
int8_t scriptType;
char *path;
int8_t resType; // result type
int16_t resBytes; // result byte
int32_t bufSize; //interbuf size
} SUdfInfo;
typedef void *UdfHandle; typedef void *UdfHandle;
/** /**
...@@ -64,36 +55,67 @@ typedef void *UdfHandle; ...@@ -64,36 +55,67 @@ typedef void *UdfHandle;
* @param handle, out * @param handle, out
* @return error code * @return error code
*/ */
int32_t setupUdf(SUdfInfo* udf, UdfHandle *handle); int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle);
typedef struct SUdfColumnMeta {
enum { int16_t type;
TSDB_UDF_STEP_NORMAL = 0, int32_t bytes; // <0 var length, others fixed length bytes
TSDB_UDF_STEP_MERGE, uint8_t precision;
TSDb_UDF_STEP_FINALIZE, uint8_t scale;
TSDB_UDF_STEP_MAX_NUM } SUdfColumnMeta;
};
/** typedef struct SUdfColumnData {
* call udf int32_t numOfRows;
* @param handle udf handle bool varLengthColumn;
* @param step union {
* @param state struct {
* @param stateSize int32_t nullBitmapLen;
* @param input char *nullBitmap;
* @param newstate int32_t dataLen;
* @param newStateSize char *data;
* @param output } fixLenCol;
* @return error code
*/ struct {
int32_t varOffsetsLen;
char *varOffsets;
int32_t payloadLen;
char *payload;
} varLenCol;
};
} SUdfColumnData;
typedef struct SUdfColumn {
SUdfColumnMeta colMeta;
SUdfColumnData colData;
} SUdfColumn;
//TODO: must change the following after metadata flow and data flow between qworker and udfd is well defined
typedef struct SUdfDataBlock { typedef struct SUdfDataBlock {
char* data; int32_t numOfRows;
int32_t size; int32_t numOfCols;
SUdfColumn **udfCols;
} SUdfDataBlock; } SUdfDataBlock;
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newstate, typedef struct SUdfInterBuf {
int32_t *newStateSize, SUdfDataBlock *output); int32_t bufLen;
char* buf;
} SUdfInterBuf;
//TODO: translate these calls to callUdf
// output: interBuf
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf);
// input: block, state
// output: newState
int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
// input: interBuf
// output: resultData
int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
// input: interbuf1, interbuf2
// output: resultBuf
int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
// input: block
// output: resultData
int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData);
/** /**
* tearn down udf * tearn down udf
...@@ -104,29 +126,25 @@ int32_t teardownUdf(UdfHandle handle); ...@@ -104,29 +126,25 @@ int32_t teardownUdf(UdfHandle handle);
// end API to taosd and qworker // end API to taosd and qworker
//============================================================================================================================= //=============================================================================================================================
// TODO: Must change
// begin API to UDF writer. // begin API to UDF writer.
// script // dynamic lib init and destroy
typedef int32_t (*TUdfSetupFunc)();
typedef int32_t (*TUdfTeardownFunc)();
//typedef int32_t (*scriptInitFunc)(void* pCtx); //TODO: add API to check function arguments type, number etc.
//typedef void (*scriptNormalFunc)(void* pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows, //TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory.
// int64_t* ptList, int64_t key, char* dataOutput, char* tsOutput, int32_t* numOfOutput, // then UDF framework will free the memory
// int16_t oType, int16_t oBytes); //typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data);
//typedef void (*scriptFinalizeFunc)(void* pCtx, int64_t key, char* dataOutput, int32_t* numOfOutput); //typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
//typedef void (*scriptMergeFunc)(void* pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput);
//typedef void (*scriptDestroyFunc)(void* pCtx);
// dynamic lib typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
typedef int32_t (*TUdfInitFunc)();
typedef void (*TUdfDestroyFunc)();
typedef void (*TUdfFunc)(int8_t step, typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol);
char *state, int32_t stateSize, SUdfDataBlock input, typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf);
char **newstate, int32_t *newStateSize, SUdfDataBlock *output); typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock block, SUdfInterBuf *interBuf);
typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfInterBuf *resultData);
//typedef void (*udfMergeFunc)(char *data, int32_t numOfRows, char *dataOutput, int32_t* numOfOutput);
//typedef void (*udfFinalizeFunc)(char* state, int32_t stateSize, SUdfDataBlock *output);
// end API to UDF writer // end API to UDF writer
//======================================================================================================================= //=======================================================================================================================
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#ifndef TDENGINE_TUDF_INT_H #ifndef TDENGINE_TUDF_INT_H
#define TDENGINE_TUDF_INT_H #define TDENGINE_TUDF_INT_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -30,36 +29,37 @@ enum { ...@@ -30,36 +29,37 @@ enum {
}; };
enum {
TSDB_UDF_CALL_AGG_INIT = 0,
TSDB_UDF_CALL_AGG_PROC,
TSDB_UDF_CALL_AGG_MERGE,
TSDB_UDF_CALL_AGG_FIN,
TSDB_UDF_CALL_SCALA_PROC,
};
typedef struct SUdfSetupRequest { typedef struct SUdfSetupRequest {
char udfName[16]; // char udfName[TSDB_FUNC_NAME_LEN];
int8_t scriptType; // 0:c, 1: lua, 2:js SEpSet epSet;
int8_t udfType; //udaf, udf
int16_t pathSize;
char *path;
} SUdfSetupRequest; } SUdfSetupRequest;
typedef struct SUdfSetupResponse { typedef struct SUdfSetupResponse {
int64_t udfHandle; int64_t udfHandle;
} SUdfSetupResponse; } SUdfSetupResponse;
typedef struct SUdfCallRequest { typedef struct SUdfCallRequest {
int64_t udfHandle; int64_t udfHandle;
int8_t step; int8_t callType;
int32_t inputBytes; SSDataBlock block;
char *input; SUdfInterBuf interBuf;
SUdfInterBuf interBuf2;
int32_t stateBytes; int8_t initFirst;
char *state;
} SUdfCallRequest; } SUdfCallRequest;
typedef struct SUdfCallResponse { typedef struct SUdfCallResponse {
int32_t outputBytes; int8_t callType;
char *output; SSDataBlock resultData;
int32_t newStateBytes; SUdfInterBuf resultBuf;
char *newState;
} SUdfCallResponse; } SUdfCallResponse;
...@@ -76,7 +76,11 @@ typedef struct SUdfRequest { ...@@ -76,7 +76,11 @@ typedef struct SUdfRequest {
int64_t seqNum; int64_t seqNum;
int8_t type; int8_t type;
void *subReq; union {
SUdfSetupRequest setup;
SUdfCallRequest call;
SUdfTeardownRequest teardown;
};
} SUdfRequest; } SUdfRequest;
typedef struct SUdfResponse { typedef struct SUdfResponse {
...@@ -85,13 +89,25 @@ typedef struct SUdfResponse { ...@@ -85,13 +89,25 @@ typedef struct SUdfResponse {
int8_t type; int8_t type;
int32_t code; int32_t code;
void *subRsp; union {
SUdfSetupResponse setupRsp;
SUdfCallResponse callRsp;
SUdfTeardownResponse teardownRsp;
};
} SUdfResponse; } SUdfResponse;
int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest); int32_t encodeUdfRequest(void **buf, const SUdfRequest* request);
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response); void* decodeUdfRequest(const void *buf, SUdfRequest* request);
int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse **pResponse); int32_t encodeUdfResponse(void **buf, const SUdfResponse *response);
void* decodeUdfResponse(const void* buf, SUdfResponse *response);
void freeUdfColumnData(SUdfColumnData *data);
void freeUdfColumn(SUdfColumn* col);
void freeUdfDataDataBlock(SUdfDataBlock *block);
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -17,13 +17,107 @@ ...@@ -17,13 +17,107 @@
#include "tlog.h" #include "tlog.h"
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#include "tarray.h"
#include "tdatablock.h"
//TODO: when startup, set thread poll size. add it to cfg //TODO: when startup, set thread poll size. add it to cfg
//TODO: test for udfd restart
//TODO: udfd restart when exist or aborts //TODO: udfd restart when exist or aborts
//TODO: deal with uv task that has been started and then udfd core dumped
//TODO: network error processing. //TODO: network error processing.
//TODO: add unit test //TODO: add unit test
//TODO: add lua support //TODO: include all global variable under context struct
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); /* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
* The QUEUE is copied from queue.h under libuv
* */
typedef void *QUEUE[2];
/* Private macros. */
#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Public macros. */
#define QUEUE_DATA(ptr, type, field) \
((type *) ((char *) (ptr) - offsetof(type, field)))
/* Important note: mutating the list while QUEUE_FOREACH is
* iterating over its elements results in undefined behavior.
*/
#define QUEUE_FOREACH(q, h) \
for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
#define QUEUE_EMPTY(q) \
((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))
#define QUEUE_HEAD(q) \
(QUEUE_NEXT(q))
#define QUEUE_INIT(q) \
do { \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
} \
while (0)
#define QUEUE_ADD(h, n) \
do { \
QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV(h) = QUEUE_PREV(n); \
QUEUE_PREV_NEXT(h) = (h); \
} \
while (0)
#define QUEUE_SPLIT(h, q, n) \
do { \
QUEUE_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(n) = (n); \
QUEUE_NEXT(n) = (q); \
QUEUE_PREV(h) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(h) = (h); \
QUEUE_PREV(q) = (n); \
} \
while (0)
#define QUEUE_MOVE(h, n) \
do { \
if (QUEUE_EMPTY(h)) \
QUEUE_INIT(n); \
else { \
QUEUE* q = QUEUE_HEAD(h); \
QUEUE_SPLIT(h, q, n); \
} \
} \
while (0)
#define QUEUE_INSERT_HEAD(h, q) \
do { \
QUEUE_NEXT(q) = QUEUE_NEXT(h); \
QUEUE_PREV(q) = (h); \
QUEUE_NEXT_PREV(q) = (q); \
QUEUE_NEXT(h) = (q); \
} \
while (0)
#define QUEUE_INSERT_TAIL(h, q) \
do { \
QUEUE_NEXT(q) = (h); \
QUEUE_PREV(q) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(q) = (q); \
QUEUE_PREV(h) = (q); \
} \
while (0)
#define QUEUE_REMOVE(q) \
do { \
QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
} \
while (0)
enum { enum {
UV_TASK_CONNECT = 0, UV_TASK_CONNECT = 0,
...@@ -48,8 +142,9 @@ typedef struct SClientUvTaskNode { ...@@ -48,8 +142,9 @@ typedef struct SClientUvTaskNode {
uv_sem_t taskSem; uv_sem_t taskSem;
uv_buf_t rspBuf; uv_buf_t rspBuf;
struct SClientUvTaskNode *prev; QUEUE recvTaskQueue;
struct SClientUvTaskNode *next; QUEUE procTaskQueue;
QUEUE connTaskQueue;
} SClientUvTaskNode; } SClientUvTaskNode;
typedef struct SClientUdfTask { typedef struct SClientUdfTask {
...@@ -86,7 +181,7 @@ typedef struct SClientConnBuf { ...@@ -86,7 +181,7 @@ typedef struct SClientConnBuf {
typedef struct SClientUvConn { typedef struct SClientUvConn {
uv_pipe_t *pipe; uv_pipe_t *pipe;
SClientUvTaskNode taskQueue; QUEUE taskQueue;
SClientConnBuf readBuf; SClientConnBuf readBuf;
} SClientUvConn; } SClientUvConn;
...@@ -103,376 +198,377 @@ uv_async_t gUdfLoopStopAsync; ...@@ -103,376 +198,377 @@ uv_async_t gUdfLoopStopAsync;
uv_mutex_t gUdfTaskQueueMutex; uv_mutex_t gUdfTaskQueueMutex;
int64_t gUdfTaskSeqNum = 0; int64_t gUdfTaskSeqNum = 0;
//double circular linked list enum {
typedef SClientUvTaskNode *SClientUvTaskQueue; UDFC_STATE_INITAL = 0, // initial state
SClientUvTaskNode gUdfQueueNode; UDFC_STATE_STARTNG, // starting after startUdfService
SClientUvTaskQueue gUdfTaskQueue = &gUdfQueueNode; UDFC_STATE_READY, // started and begin to receive quests
UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart.
UDFC_STATE_STOPPING, // stopping after stopUdfService
UDFC_STATUS_FINAL, // stopped
};
int8_t gUdfcState = UDFC_STATE_INITAL;
//add SClientUvTaskNode task that close conn //double circular linked list
QUEUE gUdfTaskQueue = {0};
QUEUE gUvProcTaskQueue = {0};
void udfTaskQueueInit(SClientUvTaskQueue q) { int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
q->next = q; int32_t len = 0;
q->prev = q; len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
len += taosEncodeSEpSet(buf, &setup->epSet);
return len;
} }
bool udfTaskQueueIsEmpty(SClientUvTaskQueue q) { void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
return q == q->next; buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
buf = taosDecodeSEpSet((void*)buf, &request->epSet);
return (void*)buf;
} }
void udfTaskQueueInsertTail(SClientUvTaskQueue q, SClientUvTaskNode *e) { int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
e->next = q; int32_t len = 0;
e->prev = q->prev; len += taosEncodeFixedI32(buf, state->bufLen);
e->prev->next = e; len += taosEncodeBinary(buf, state->buf, state->bufLen);
q->prev = e; return len;
} }
void udfTaskQueueInsertTaskAtHead(SClientUvTaskQueue q, SClientUvTaskNode *e) { void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
e->next = q->next; buf = taosDecodeFixedI32(buf, &state->bufLen);
e->prev = q; buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen);
q->next->prev = e; return (void*)buf;
q->next = e;
} }
void udfTaskQueueRemoveTask(SClientUvTaskNode *e) { int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
e->prev->next = e->next; int32_t len = 0;
e->next->prev = e->prev; len += taosEncodeFixedI64(buf, call->udfHandle);
len += taosEncodeFixedI8(buf, call->callType);
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
len += tEncodeDataBlock(buf, &call->block);
} else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
len += taosEncodeFixedI8(buf, call->initFirst);
} else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
len += tEncodeDataBlock(buf, &call->block);
len += encodeUdfInterBuf(buf, &call->interBuf);
} else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
len += encodeUdfInterBuf(buf, &call->interBuf);
len += encodeUdfInterBuf(buf, &call->interBuf2);
} else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
len += encodeUdfInterBuf(buf, &call->interBuf);
}
return len;
} }
void udfTaskQueueSplit(SClientUvTaskQueue q, SClientUvTaskNode *from, SClientUvTaskQueue n) { void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) {
n->prev = q->prev; buf = taosDecodeFixedI64(buf, &call->udfHandle);
n->prev->next = n; buf = taosDecodeFixedI8(buf, &call->callType);
n->next = from; switch (call->callType) {
q->prev = from->prev; case TSDB_UDF_CALL_SCALA_PROC:
q->prev->next = q; buf = tDecodeDataBlock(buf, &call->block);
from->prev = n; break;
case TSDB_UDF_CALL_AGG_INIT:
buf = taosDecodeFixedI8(buf, &call->initFirst);
break;
case TSDB_UDF_CALL_AGG_PROC:
buf = tDecodeDataBlock(buf, &call->block);
buf = decodeUdfInterBuf(buf, &call->interBuf);
break;
case TSDB_UDF_CALL_AGG_MERGE:
buf = decodeUdfInterBuf(buf, &call->interBuf);
buf = decodeUdfInterBuf(buf, &call->interBuf2);
break;
case TSDB_UDF_CALL_AGG_FIN:
buf = decodeUdfInterBuf(buf, &call->interBuf);
break;
}
return (void*)buf;
} }
SClientUvTaskNode *udfTaskQueueHeadTask(SClientUvTaskQueue q) { int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
return q->next; int32_t len = 0;
len += taosEncodeFixedI64(buf, teardown->udfHandle);
return len;
} }
SClientUvTaskNode *udfTaskQueueTailTask(SClientUvTaskQueue q) { void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown) {
return q->prev; buf = taosDecodeFixedI64(buf, &teardown->udfHandle);
return (void*)buf;
} }
SClientUvTaskNode *udfTaskQueueNext(SClientUvTaskNode *e) { int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) {
return e->next; int32_t len = 0;
if (buf == NULL) {
len += sizeof(request->msgLen);
} else {
*(int32_t*)(*buf) = request->msgLen;
*buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
}
len += taosEncodeFixedI64(buf, request->seqNum);
len += taosEncodeFixedI8(buf, request->type);
if (request->type == UDF_TASK_SETUP) {
len += encodeUdfSetupRequest(buf, &request->setup);
} else if (request->type == UDF_TASK_CALL) {
len += encodeUdfCallRequest(buf, &request->call);
} else if (request->type == UDF_TASK_TEARDOWN) {
len += encodeUdfTeardownRequest(buf, &request->teardown);
}
return len;
} }
void udfTaskQueueMove(SClientUvTaskQueue q, SClientUvTaskQueue n) { void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
if (udfTaskQueueIsEmpty(q)) { request->msgLen = *(int32_t*)(buf);
udfTaskQueueInit(n); buf = POINTER_SHIFT(buf, sizeof(request->msgLen));
} else {
SClientUvTaskNode *h = udfTaskQueueHeadTask(q); buf = taosDecodeFixedI64(buf, &request->seqNum);
udfTaskQueueSplit(q, h, n); buf = taosDecodeFixedI8(buf, &request->type);
if (request->type == UDF_TASK_SETUP) {
buf = decodeUdfSetupRequest(buf, &request->setup);
} else if (request->type == UDF_TASK_CALL) {
buf = decodeUdfCallRequest(buf, &request->call);
} else if (request->type == UDF_TASK_TEARDOWN) {
buf = decodeUdfTeardownRequest(buf, &request->teardown);
} }
return (void*)buf;
} }
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
int32_t len = 0;
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
return len;
}
int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
debugPrint("%s", "encoding request"); buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
return (void*)buf;
}
int len = sizeof(SUdfRequest) - sizeof(void *); int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
switch (request->type) { int32_t len = 0;
case UDF_TASK_SETUP: { len += taosEncodeFixedI8(buf, callRsp->callType);
SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq); switch (callRsp->callType) {
len += sizeof(SUdfSetupRequest) - 1 * sizeof(char *) + setup->pathSize; case TSDB_UDF_CALL_SCALA_PROC:
len += tEncodeDataBlock(buf, &callRsp->resultData);
break; break;
} case TSDB_UDF_CALL_AGG_INIT:
case UDF_TASK_CALL: { len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
len += sizeof(SUdfCallRequest) - 2 * sizeof(char *) + call->inputBytes + call->stateBytes;
break; break;
} case TSDB_UDF_CALL_AGG_PROC:
case UDF_TASK_TEARDOWN: { len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
len += sizeof(SUdfTeardownRequest);
break; break;
} case TSDB_UDF_CALL_AGG_MERGE:
default: len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_FIN:
len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break; break;
} }
return len;
}
char *bufBegin = taosMemoryMalloc(len); void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) {
char *buf = bufBegin; buf = taosDecodeFixedI8(buf, &callRsp->callType);
switch (callRsp->callType) {
//skip msgLen first case TSDB_UDF_CALL_SCALA_PROC:
buf += sizeof(int32_t); buf = tDecodeDataBlock(buf, &callRsp->resultData);
*(int64_t *) buf = request->seqNum;
buf += sizeof(int64_t);
*(int8_t *) buf = request->type;
buf += sizeof(int8_t);
switch (request->type) {
case UDF_TASK_SETUP: {
SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq);
memcpy(buf, setup->udfName, 16);
buf += 16;
*(int8_t *) buf = setup->scriptType;
buf += sizeof(int8_t);
*(int8_t *) buf = setup->udfType;
buf += sizeof(int8_t);
*(int16_t *) buf = setup->pathSize;
buf += sizeof(int16_t);
memcpy(buf, setup->path, setup->pathSize);
buf += setup->pathSize;
break; break;
} case TSDB_UDF_CALL_AGG_INIT:
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
case UDF_TASK_CALL: {
SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
*(int64_t *) buf = call->udfHandle;
buf += sizeof(int64_t);
*(int8_t *) buf = call->step;
buf += sizeof(int8_t);
*(int32_t *) buf = call->inputBytes;
buf += sizeof(int32_t);
memcpy(buf, call->input, call->inputBytes);
buf += call->inputBytes;
*(int32_t *) buf = call->stateBytes;
buf += sizeof(int32_t);
memcpy(buf, call->state, call->stateBytes);
buf += call->stateBytes;
break; break;
} case TSDB_UDF_CALL_AGG_PROC:
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
*(int64_t *) buf = teardown->udfHandle;
buf += sizeof(int64_t);
break; break;
} case TSDB_UDF_CALL_AGG_MERGE:
default: buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_FIN:
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break; break;
} }
return (void*)buf;
}
request->msgLen = buf - bufBegin; int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp) {
*(int32_t *) bufBegin = request->msgLen;
*pBuf = bufBegin;
*pBufLen = request->msgLen;
return 0; return 0;
} }
int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) { void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse) {
debugPrint("%s", "decoding request"); return (void*)buf;
if (*(int32_t *) bufMsg != bufLen) { }
debugPrint("%s", "decoding request error");
return -1;
}
char *buf = bufMsg;
SUdfRequest *request = taosMemoryMalloc(sizeof(SUdfRequest));
request->subReq = NULL;
request->msgLen = *(int32_t *) (buf);
buf += sizeof(int32_t);
request->seqNum = *(int64_t *) (buf);
buf += sizeof(int64_t);
request->type = *(int8_t *) (buf);
buf += sizeof(int8_t);
switch (request->type) {
case UDF_TASK_SETUP: {
SUdfSetupRequest *setup = taosMemoryMalloc(sizeof(SUdfSetupRequest));
memcpy(setup->udfName, buf, 16);
buf += 16;
setup->scriptType = *(int8_t *) buf;
buf += sizeof(int8_t);
setup->udfType = *(int8_t *) buf;
buf += sizeof(int8_t);
setup->pathSize = *(int16_t *) buf;
buf += sizeof(int16_t);
setup->path = buf;
buf += setup->pathSize;
request->subReq = setup;
break;
}
case UDF_TASK_CALL: {
SUdfCallRequest *call = taosMemoryMalloc(sizeof(SUdfCallRequest));
call->udfHandle = *(int64_t *) buf;
buf += sizeof(int64_t);
call->step = *(int8_t *) buf;
buf += sizeof(int8_t);
call->inputBytes = *(int32_t *) buf;
buf += sizeof(int32_t);
call->input = buf;
buf += call->inputBytes;
call->stateBytes = *(int32_t *) buf;
buf += sizeof(int32_t);
call->state = buf;
buf += call->stateBytes;
request->subReq = call;
break;
}
case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = taosMemoryMalloc(sizeof(SUdfTeardownRequest));
teardown->udfHandle = *(int64_t *) buf;
buf += sizeof(int64_t);
request->subReq = teardown;
}
int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
int32_t len = 0;
if (buf == NULL) {
len += sizeof(rsp->msgLen);
} else {
*(int32_t*)(*buf) = rsp->msgLen;
*buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
} }
if (buf - bufMsg != bufLen) {
debugPrint("%s", "decode request error");
taosMemoryFree(request->subReq);
taosMemoryFree(request);
return -1;
}
*pRequest = request;
return 0;
}
int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) { if (buf == NULL) {
debugPrint("%s", "encoding response"); len += sizeof(rsp->seqNum);
} else {
*(int64_t*)(*buf) = rsp->seqNum;
*buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
}
int32_t len = sizeof(SUdfResponse) - sizeof(void *); len += taosEncodeFixedI64(buf, rsp->seqNum);
len += taosEncodeFixedI8(buf, rsp->type);
len += taosEncodeFixedI32(buf, rsp->code);
switch (response->type) { switch (rsp->type) {
case UDF_TASK_SETUP: { case UDF_TASK_SETUP:
len += sizeof(SUdfSetupResponse); len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
break; break;
} case UDF_TASK_CALL:
case UDF_TASK_CALL: { len += encodeUdfCallResponse(buf, &rsp->callRsp);
SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
len += sizeof(SUdfCallResponse) - 2 * sizeof(char *) +
callResp->outputBytes + callResp->newStateBytes;
break; break;
} case UDF_TASK_TEARDOWN:
case UDF_TASK_TEARDOWN: { len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
len += sizeof(SUdfTeardownResponse); break;
default:
//TODO: log error
break; break;
}
} }
return len;
}
char *bufBegin = taosMemoryMalloc(len); void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
char *buf = bufBegin; rsp->msgLen = *(int32_t*)(buf);
buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen));
//skip msgLen rsp->seqNum = *(int64_t*)(buf);
buf += sizeof(int32_t); buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum));
buf = taosDecodeFixedI64(buf, &rsp->seqNum);
*(int64_t *) buf = response->seqNum; buf = taosDecodeFixedI8(buf, &rsp->type);
buf += sizeof(int64_t); buf = taosDecodeFixedI32(buf, &rsp->code);
*(int8_t *) buf = response->type;
buf += sizeof(int8_t);
*(int32_t *) buf = response->code;
buf += sizeof(int32_t);
switch (response->type) { switch (rsp->type) {
case UDF_TASK_SETUP: { case UDF_TASK_SETUP:
SUdfSetupResponse *setupResp = (SUdfSetupResponse *) (response->subRsp); buf = decodeUdfSetupResponse(buf, &rsp->setupRsp);
*(int64_t *) buf = setupResp->udfHandle;
buf += sizeof(int64_t);
break; break;
} case UDF_TASK_CALL:
case UDF_TASK_CALL: { buf = decodeUdfCallResponse(buf, &rsp->callRsp);
SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
*(int32_t *) buf = callResp->outputBytes;
buf += sizeof(int32_t);
memcpy(buf, callResp->output, callResp->outputBytes);
buf += callResp->outputBytes;
*(int32_t *) buf = callResp->newStateBytes;
buf += sizeof(int32_t);
memcpy(buf, callResp->newState, callResp->newStateBytes);
buf += callResp->newStateBytes;
break; break;
} case UDF_TASK_TEARDOWN:
case UDF_TASK_TEARDOWN: { buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
SUdfTeardownResponse *teardownResp = (SUdfTeardownResponse *) (response->subRsp);
break; break;
}
default: default:
//TODO: log error
break; break;
} }
response->msgLen = buf - bufBegin; return (void*)buf;
*(int32_t *) bufBegin = response->msgLen;
*pBuf = bufBegin;
*pBufLen = response->msgLen;
return 0;
} }
int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) { void freeUdfColumnData(SUdfColumnData *data) {
debugPrint("%s", "decoding response"); if (data->varLengthColumn) {
taosMemoryFree(data->varLenCol.varOffsets);
if (*(int32_t *) bufMsg != bufLen) { data->varLenCol.varOffsets = NULL;
debugPrint("%s", "can not decode response"); taosMemoryFree(data->varLenCol.payload);
return -1; data->varLenCol.payload = NULL;
} else {
taosMemoryFree(data->fixLenCol.nullBitmap);
data->fixLenCol.nullBitmap = NULL;
taosMemoryFree(data->fixLenCol.data);
data->fixLenCol.data = NULL;
} }
char *buf = bufMsg; }
SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse));
rsp->msgLen = *(int32_t *) buf;
buf += sizeof(int32_t);
rsp->seqNum = *(int64_t *) buf;
buf += sizeof(int64_t);
rsp->type = *(int8_t *) buf;
buf += sizeof(int8_t);
rsp->code = *(int32_t *) buf;
buf += sizeof(int32_t);
switch (rsp->type) { void freeUdfColumn(SUdfColumn* col) {
case UDF_TASK_SETUP: { freeUdfColumnData(&col->colData);
SUdfSetupResponse *setupRsp = (SUdfSetupResponse *) taosMemoryMalloc(sizeof(SUdfSetupResponse)); }
setupRsp->udfHandle = *(int64_t *) buf;
buf += sizeof(int64_t);
rsp->subRsp = (char *) setupRsp;
break;
}
case UDF_TASK_CALL: {
SUdfCallResponse *callRsp = (SUdfCallResponse *) taosMemoryMalloc(sizeof(SUdfCallResponse));
callRsp->outputBytes = *(int32_t *) buf;
buf += sizeof(int32_t);
callRsp->output = buf; void freeUdfDataDataBlock(SUdfDataBlock *block) {
buf += callRsp->outputBytes; for (int32_t i = 0; i < block->numOfCols; ++i) {
freeUdfColumn(block->udfCols[i]);
taosMemoryFree(block->udfCols[i]);
block->udfCols[i] = NULL;
}
taosMemoryFree(block->udfCols);
block->udfCols = NULL;
}
callRsp->newStateBytes = *(int32_t *) buf; void freeUdfInterBuf(SUdfInterBuf *buf) {
buf += sizeof(int32_t); taosMemoryFree(buf->buf);
buf->buf = NULL;
}
callRsp->newState = buf;
buf += callRsp->newStateBytes;
rsp->subRsp = callRsp; int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
break; udfBlock->numOfRows = block->info.rows;
} udfBlock->numOfCols = block->info.numOfCols;
case UDF_TASK_TEARDOWN: { udfBlock->udfCols = taosMemoryCalloc(udfBlock->numOfCols, sizeof(SUdfColumn*));
SUdfTeardownResponse *teardownRsp = (SUdfTeardownResponse *) taosMemoryMalloc(sizeof(SUdfTeardownResponse)); for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
rsp->subRsp = teardownRsp; udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
break; SColumnInfoData *col= (SColumnInfoData*)taosArrayGet(block->pDataBlock, i);
SUdfColumn *udfCol = udfBlock->udfCols[i];
udfCol->colMeta.type = col->info.type;
udfCol->colMeta.bytes = col->info.bytes;
udfCol->colMeta.scale = col->info.scale;
udfCol->colMeta.precision = col->info.precision;
udfCol->colData.numOfRows = udfBlock->numOfRows;
udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
if (udfCol->colData.varLengthColumn) {
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
} else {
udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
char* bitmap = udfCol->colData.fixLenCol.nullBitmap;
memcpy(bitmap, col->nullbitmap, bitmapLen);
udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
char* data = udfCol->colData.fixLenCol.data;
memcpy(data, col->pData, dataLen);
} }
default:
break;
} }
if (buf - bufMsg != bufLen) {
debugPrint("%s", "can not decode response");
taosMemoryFree(rsp->subRsp);
taosMemoryFree(rsp);
return -1;
}
*pResponse = rsp;
return 0; return 0;
} }
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal); block->info.numOfCols = 1;
uv_close((uv_handle_t *) req, NULL); block->info.rows = udfCol->colData.numOfRows;
//TODO: restart the udfd process block->info.hasVarCol = udfCol->colData.varLengthColumn;
block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
taosArraySetSize(block->pDataBlock, 1);
SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0);
SUdfColumnMeta *meta = &udfCol->colMeta;
col->info.precision = meta->precision;
col->info.bytes = meta->bytes;
col->info.scale = meta->scale;
col->info.type = meta->type;
SUdfColumnData *data = &udfCol->colData;
if (!IS_VAR_DATA_TYPE(meta->type)) {
col->nullbitmap = taosMemoryMalloc(data->fixLenCol.nullBitmapLen);
memcpy(col->nullbitmap, data->fixLenCol.nullBitmap, data->fixLenCol.nullBitmapLen);
col->pData = taosMemoryMalloc(data->fixLenCol.dataLen);
memcpy(col->pData, data->fixLenCol.data, data->fixLenCol.dataLen);
} else {
col->varmeta.offset = taosMemoryMalloc(data->varLenCol.varOffsetsLen);
memcpy(col->varmeta.offset, data->varLenCol.varOffsets, data->varLenCol.varOffsetsLen);
col->pData = taosMemoryMalloc(data->varLenCol.payloadLen);
memcpy(col->pData, data->varLenCol.payload, data->varLenCol.payloadLen);
}
return 0;
} }
void onUdfcPipeClose(uv_handle_t *handle) { void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data; SClientUvConn *conn = handle->data;
if (!udfTaskQueueIsEmpty(&conn->taskQueue)) { if (!QUEUE_EMPTY(&conn->taskQueue)) {
SClientUvTaskNode *task = udfTaskQueueHeadTask(&conn->taskQueue); QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = 0; task->errCode = 0;
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
QUEUE_REMOVE(&task->procTaskQueue);
} }
taosMemoryFree(conn->readBuf.buf); taosMemoryFree(conn->readBuf.buf);
...@@ -485,23 +581,24 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT ...@@ -485,23 +581,24 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
debugPrint("%s", "get uv task result"); debugPrint("%s", "get uv task result");
if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) { if (uvTask->rspBuf.base != NULL) {
SUdfResponse *rsp; SUdfResponse rsp;
decodeResponse(uvTask->rspBuf.base, uvTask->rspBuf.len, &rsp); void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
task->errCode = rsp->code; assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
task->errCode = rsp.code;
switch (task->type) { switch (task->type) {
case UDF_TASK_SETUP: { case UDF_TASK_SETUP: {
//TODO: copy or not //TODO: copy or not
task->_setup.rsp = *(SUdfSetupResponse *) (rsp->subRsp); task->_setup.rsp = rsp.setupRsp;
break; break;
} }
case UDF_TASK_CALL: { case UDF_TASK_CALL: {
task->_call.rsp = *(SUdfCallResponse *) (rsp->subRsp); task->_call.rsp = rsp.callRsp;
//TODO: copy or not //TODO: copy or not
break; break;
} }
case UDF_TASK_TEARDOWN: { case UDF_TASK_TEARDOWN: {
task->_teardown.rsp = *(SUdfTeardownResponse *) (rsp->subRsp); task->_teardown.rsp = rsp.teardownRsp;
//TODO: copy or not? //TODO: copy or not?
break; break;
} }
...@@ -512,8 +609,6 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT ...@@ -512,8 +609,6 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
// TODO: the call buffer is setup and freed by udf invocation // TODO: the call buffer is setup and freed by udf invocation
taosMemoryFree(uvTask->rspBuf.base); taosMemoryFree(uvTask->rspBuf.base);
taosMemoryFree(rsp->subRsp);
taosMemoryFree(rsp);
} else { } else {
task->errCode = uvTask->errCode; task->errCode = uvTask->errCode;
} }
...@@ -577,14 +672,16 @@ void udfcUvHandleRsp(SClientUvConn *conn) { ...@@ -577,14 +672,16 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
SClientConnBuf *connBuf = &conn->readBuf; SClientConnBuf *connBuf = &conn->readBuf;
int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen int32_t then seqnum int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen int32_t then seqnum
if (udfTaskQueueIsEmpty(&conn->taskQueue)) { if (QUEUE_EMPTY(&conn->taskQueue)) {
//LOG error //LOG error
return; return;
} }
bool found = false; bool found = false;
SClientUvTaskNode *taskFound = NULL; SClientUvTaskNode *taskFound = NULL;
SClientUvTaskNode *task = udfTaskQueueNext(&conn->taskQueue); QUEUE* h = QUEUE_NEXT(&conn->taskQueue);
while (task != &conn->taskQueue) { SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
while (h != &conn->taskQueue) {
if (task->seqNum == seqNum) { if (task->seqNum == seqNum) {
if (found == false) { if (found == false) {
found = true; found = true;
...@@ -594,15 +691,17 @@ void udfcUvHandleRsp(SClientUvConn *conn) { ...@@ -594,15 +691,17 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
continue; continue;
} }
} }
task = udfTaskQueueNext(task); h = QUEUE_NEXT(h);
task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
} }
if (taskFound) { if (taskFound) {
taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len); taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
udfTaskQueueRemoveTask(taskFound); QUEUE_REMOVE(&taskFound->connTaskQueue);
uv_sem_post(&taskFound->taskSem); uv_sem_post(&taskFound->taskSem);
QUEUE_REMOVE(&taskFound->procTaskQueue);
} else { } else {
//LOG error //TODO: LOG error
} }
connBuf->buf = NULL; connBuf->buf = NULL;
connBuf->total = -1; connBuf->total = -1;
...@@ -611,7 +710,18 @@ void udfcUvHandleRsp(SClientUvConn *conn) { ...@@ -611,7 +710,18 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
} }
void udfcUvHandleError(SClientUvConn *conn) { void udfcUvHandleError(SClientUvConn *conn) {
uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose); while (!QUEUE_EMPTY(&conn->taskQueue)) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = UDFC_CODE_PIPE_READ_ERR;
uv_sem_post(&task->taskSem);
QUEUE_REMOVE(&task->procTaskQueue);
}
uv_close((uv_handle_t *) conn->pipe, NULL);
taosMemoryFree(conn->pipe);
taosMemoryFree(conn->readBuf.buf);
taosMemoryFree(conn);
} }
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
...@@ -643,7 +753,7 @@ void onUdfClientWrite(uv_write_t *write, int status) { ...@@ -643,7 +753,7 @@ void onUdfClientWrite(uv_write_t *write, int status) {
if (status == 0) { if (status == 0) {
uv_pipe_t *pipe = uvTask->pipe; uv_pipe_t *pipe = uvTask->pipe;
SClientUvConn *conn = pipe->data; SClientUvConn *conn = pipe->data;
udfTaskQueueInsertTail(&conn->taskQueue, uvTask); QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
} else { } else {
//TODO Log error; //TODO Log error;
} }
...@@ -661,6 +771,7 @@ void onUdfClientConnect(uv_connect_t *connect, int status) { ...@@ -661,6 +771,7 @@ void onUdfClientConnect(uv_connect_t *connect, int status) {
uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead); uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead);
taosMemoryFree(connect); taosMemoryFree(connect);
uv_sem_post(&uvTask->taskSem); uv_sem_post(&uvTask->taskSem);
QUEUE_REMOVE(&uvTask->procTaskQueue);
} }
int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
...@@ -675,21 +786,23 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN ...@@ -675,21 +786,23 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
request.seqNum = gUdfTaskSeqNum++; request.seqNum = gUdfTaskSeqNum++;
if (task->type == UDF_TASK_SETUP) { if (task->type == UDF_TASK_SETUP) {
request.subReq = &task->_setup.req; request.setup = task->_setup.req;
request.type = UDF_TASK_SETUP; request.type = UDF_TASK_SETUP;
} else if (task->type == UDF_TASK_CALL) { } else if (task->type == UDF_TASK_CALL) {
request.subReq = &task->_call.req; request.call = task->_call.req;
request.type = UDF_TASK_CALL; request.type = UDF_TASK_CALL;
} else if (task->type == UDF_TASK_TEARDOWN) { } else if (task->type == UDF_TASK_TEARDOWN) {
request.subReq = &task->_teardown.req; request.teardown = task->_teardown.req;
request.type = UDF_TASK_TEARDOWN; request.type = UDF_TASK_TEARDOWN;
} else { } else {
//TODO log and return error //TODO log and return error
} }
char *buf = NULL; int32_t bufLen = encodeUdfRequest(NULL, &request);
int32_t bufLen = 0; request.msgLen = bufLen;
encodeRequest(&buf, &bufLen, &request); void *bufBegin = taosMemoryMalloc(bufLen);
uvTask->reqBuf = uv_buf_init(buf, bufLen); void *buf = bufBegin;
encodeUdfRequest(&buf, &request);
uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
uvTask->seqNum = request.seqNum; uvTask->seqNum = request.seqNum;
} else if (uvTaskType == UV_TASK_DISCONNECT) { } else if (uvTaskType == UV_TASK_DISCONNECT) {
uvTask->pipe = task->session->udfSvcPipe; uvTask->pipe = task->session->udfSvcPipe;
...@@ -704,7 +817,7 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -704,7 +817,7 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, %d", "queue uv task", uvTask->type); debugPrint("%s, %d", "queue uv task", uvTask->type);
uv_mutex_lock(&gUdfTaskQueueMutex); uv_mutex_lock(&gUdfTaskQueueMutex);
udfTaskQueueInsertTail(gUdfTaskQueue, uvTask); QUEUE_INSERT_TAIL(&gUdfTaskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&gUdfTaskQueueMutex); uv_mutex_unlock(&gUdfTaskQueueMutex);
uv_async_send(&gUdfLoopTaskAync); uv_async_send(&gUdfLoopTaskAync);
...@@ -728,7 +841,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -728,7 +841,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
conn->readBuf.cap = 0; conn->readBuf.cap = 0;
conn->readBuf.buf = 0; conn->readBuf.buf = 0;
conn->readBuf.total = -1; conn->readBuf.total = -1;
udfTaskQueueInit(&conn->taskQueue); QUEUE_INIT(&conn->taskQueue);
pipe->data = conn; pipe->data = conn;
...@@ -747,7 +860,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -747,7 +860,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
} }
case UV_TASK_DISCONNECT: { case UV_TASK_DISCONNECT: {
SClientUvConn *conn = uvTask->pipe->data; SClientUvConn *conn = uvTask->pipe->data;
udfTaskQueueInsertTail(&conn->taskQueue, uvTask); QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose); uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose);
break; break;
} }
...@@ -760,67 +873,145 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -760,67 +873,145 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
} }
void udfClientAsyncCb(uv_async_t *async) { void udfClientAsyncCb(uv_async_t *async) {
SClientUvTaskNode node; QUEUE wq;
SClientUvTaskQueue q = &node;
udfTaskQueueInit(q);
uv_mutex_lock(&gUdfTaskQueueMutex); uv_mutex_lock(&gUdfTaskQueueMutex);
udfTaskQueueMove(gUdfTaskQueue, q); QUEUE_MOVE(&gUdfTaskQueue, &wq);
uv_mutex_unlock(&gUdfTaskQueueMutex); uv_mutex_unlock(&gUdfTaskQueueMutex);
while (!udfTaskQueueIsEmpty(q)) { while (!QUEUE_EMPTY(&wq)) {
SClientUvTaskNode *task = udfTaskQueueHeadTask(q); QUEUE* h = QUEUE_HEAD(&wq);
udfTaskQueueRemoveTask(task); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
startUvUdfTask(task); startUvUdfTask(task);
QUEUE_INSERT_TAIL(&gUvProcTaskQueue, &task->procTaskQueue);
} }
} }
void cleanUpUvTasks() {
QUEUE wq;
uv_mutex_lock(&gUdfTaskQueueMutex);
QUEUE_MOVE(&gUdfTaskQueue, &wq);
uv_mutex_unlock(&gUdfTaskQueueMutex);
while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
if (gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING;
} else if (gUdfcState == UDFC_STATE_RESTARTING) {
task->errCode = UDFC_CODE_RESTARTING;
}
uv_sem_post(&task->taskSem);
}
// TODO: deal with tasks that are waiting result.
while (!QUEUE_EMPTY(&gUvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&gUvProcTaskQueue);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
if (gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING;
} else if (gUdfcState == UDFC_STATE_RESTARTING) {
task->errCode = UDFC_CODE_RESTARTING;
}
uv_sem_post(&task->taskSem);
}
}
void udfStopAsyncCb(uv_async_t *async) { void udfStopAsyncCb(uv_async_t *async) {
uv_stop(&gUdfdLoop); cleanUpUvTasks();
uv_loop_close(&gUdfdLoop); if (gUdfcState == UDFC_STATE_STOPPING) {
uv_stop(&gUdfdLoop);
}
} }
void startUdfd(void *argsThread) { int32_t startUdfd();
uv_loop_init(&gUdfdLoop);
//TODO: path void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
uv_process_options_t options; //TODO: pipe close will be first received
static char path[256] = {0}; debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal);
size_t cwdSize; uv_close((uv_handle_t *) req, NULL);
uv_cwd(path, &cwdSize); //TODO: restart the udfd process
strcat(path, "./udfd"); if (gUdfcState == UDFC_STATE_STOPPING) {
char* args[2] = {path, NULL}; if (term_signal != SIGINT) {
options.args = args; //TODO: log error
options.file = path;
options.exit_cb = onUdfdExit;
int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options);
if (err != 0) {
debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
} }
}
if (gUdfcState == UDFC_STATE_READY) {
gUdfcState = UDFC_STATE_RESTARTING;
//TODO: asynchronous without blocking. how to do it
cleanUpUvTasks();
startUdfd();
}
}
int32_t startUdfd() {
//TODO: path
uv_process_options_t options = {0};
static char path[256] = {0};
size_t cwdSize;
uv_cwd(path, &cwdSize);
strcat(path, "/udfd");
char* args[2] = {path, NULL};
options.args = args;
options.file = path;
options.exit_cb = onUdfdExit;
options.stdio_count = 3;
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_IGNORE;
child_stdio[1].flags = UV_INHERIT_FD;
child_stdio[1].data.fd = 1;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
options.stdio = child_stdio;
//TODO spawn error
int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options);
if (err != 0) {
debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
}
return err;
}
void constructUdfService(void *argsThread) {
uv_loop_init(&gUdfdLoop);
//TODO spawn error
startUdfd();
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb); uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
uv_mutex_init(&gUdfTaskQueueMutex); uv_mutex_init(&gUdfTaskQueueMutex);
udfTaskQueueInit(gUdfTaskQueue); QUEUE_INIT(&gUdfTaskQueue);
QUEUE_INIT(&gUvProcTaskQueue);
uv_barrier_wait(&gUdfInitBarrier); uv_barrier_wait(&gUdfInitBarrier);
//TODO return value of uv_run
uv_run(&gUdfdLoop, UV_RUN_DEFAULT); uv_run(&gUdfdLoop, UV_RUN_DEFAULT);
uv_loop_close(&gUdfdLoop);
} }
int32_t startUdfService() { int32_t startUdfService() {
gUdfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&gUdfInitBarrier, 2); uv_barrier_init(&gUdfInitBarrier, 2);
uv_thread_create(&gUdfLoopThread, startUdfd, 0); uv_thread_create(&gUdfLoopThread, constructUdfService, 0);
uv_barrier_wait(&gUdfInitBarrier); uv_barrier_wait(&gUdfInitBarrier); gUdfcState = UDFC_STATE_READY;
return 0; return 0;
} }
int32_t stopUdfService() { int32_t stopUdfService() {
gUdfcState = UDFC_STATE_STOPPING;
uv_barrier_destroy(&gUdfInitBarrier); uv_barrier_destroy(&gUdfInitBarrier);
uv_process_kill(&gUdfdProcess, SIGINT); if (gUdfcState == UDFC_STATE_STOPPING) {
uv_process_kill(&gUdfdProcess, SIGINT);
}
uv_async_send(&gUdfLoopStopAsync); uv_async_send(&gUdfLoopStopAsync);
uv_mutex_destroy(&gUdfTaskQueueMutex);
uv_thread_join(&gUdfLoopThread); uv_thread_join(&gUdfLoopThread);
uv_mutex_destroy(&gUdfTaskQueueMutex);
gUdfcState = UDFC_STATUS_FINAL;
return 0; return 0;
} }
...@@ -838,7 +1029,7 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { ...@@ -838,7 +1029,7 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
return task->errCode; return task->errCode;
} }
int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) { int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) {
debugPrint("%s", "client setup udf"); debugPrint("%s", "client setup udf");
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
...@@ -846,11 +1037,7 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) { ...@@ -846,11 +1037,7 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) {
task->type = UDF_TASK_SETUP; task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req; SUdfSetupRequest *req = &task->_setup.req;
memcpy(req->udfName, udfInfo->udfName, 16); memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
req->path = udfInfo->path;
req->pathSize = strlen(req->path) + 1;
req->udfType = udfInfo->udfType;
req->scriptType = udfInfo->scriptType;
int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT); int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) { if (errCode != 0) {
...@@ -868,8 +1055,8 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) { ...@@ -868,8 +1055,8 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) {
return err; return err;
} }
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newState, int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
int32_t *newStateSize, SUdfDataBlock *output) { SSDataBlock* output, SUdfInterBuf *newState) {
debugPrint("%s", "client call udf"); debugPrint("%s", "client call udf");
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
...@@ -878,23 +1065,102 @@ int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, S ...@@ -878,23 +1065,102 @@ int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, S
task->type = UDF_TASK_CALL; task->type = UDF_TASK_CALL;
SUdfCallRequest *req = &task->_call.req; SUdfCallRequest *req = &task->_call.req;
req->state = state;
req->stateBytes = stateSize;
req->inputBytes = input.size;
req->input = input.data;
req->udfHandle = task->session->severHandle; req->udfHandle = task->session->severHandle;
req->step = step; req->callType = callType;
switch (callType) {
case TSDB_UDF_CALL_AGG_INIT: {
req->initFirst = 1;
break;
}
case TSDB_UDF_CALL_AGG_PROC: {
req->block = *input;
req->interBuf = *state;
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
req->interBuf = *state;
req->interBuf2 = *state2;
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
req->interBuf = *state;
break;
}
case TSDB_UDF_CALL_SCALA_PROC: {
req->block = *input;
break;
}
}
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfCallResponse *rsp = &task->_call.rsp; SUdfCallResponse *rsp = &task->_call.rsp;
*newState = rsp->newState; switch (callType) {
*newStateSize = rsp->newStateBytes; case TSDB_UDF_CALL_AGG_INIT: {
output->size = rsp->outputBytes; *newState = rsp->resultBuf;
output->data = rsp->output; break;
int32_t err = task->errCode; }
case TSDB_UDF_CALL_AGG_PROC: {
*newState = rsp->resultBuf;
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
*newState = rsp->resultBuf;
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
*newState = rsp->resultBuf;
break;
}
case TSDB_UDF_CALL_SCALA_PROC: {
*output = rsp->resultData;
break;
}
}
taosMemoryFree(task); taosMemoryFree(task);
return task->errCode;
}
//TODO: translate these calls to callUdf
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_INIT;
int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf);
return err;
}
// input: block, state
// output: interbuf,
int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
return err;
}
// input: interbuf1, interbuf2
// output: resultBuf
int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
return err;
}
// input: interBuf
// output: resultData
int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
return err;
}
// input: block
// output: resultData
int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData) {
int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL);
return err; return err;
} }
......
...@@ -20,6 +20,10 @@ ...@@ -20,6 +20,10 @@
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tmsg.h"
#include "trpc.h"
static uv_loop_t *loop; static uv_loop_t *loop;
...@@ -44,7 +48,8 @@ typedef struct SUdf { ...@@ -44,7 +48,8 @@ typedef struct SUdf {
int8_t type; int8_t type;
uv_lib_t lib; uv_lib_t lib;
TUdfFunc normalFunc; TUdfScalarProcFunc scalarProcFunc;
TUdfFreeUdfColumnFunc freeUdfColumn;
} SUdf; } SUdf;
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix //TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
...@@ -56,119 +61,120 @@ typedef struct SUdfHandle { ...@@ -56,119 +61,120 @@ typedef struct SUdfHandle {
void udfdProcessRequest(uv_work_t *req) { void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data); SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data);
SUdfRequest *request = NULL; SUdfRequest request = {0};
decodeRequest(uvUdf->input.base, uvUdf->input.len, &request); decodeUdfRequest(uvUdf->input.base, &request);
switch (request->type) { switch (request.type) {
case UDF_TASK_SETUP: { case UDF_TASK_SETUP: {
debugPrint("%s", "process setup request"); debugPrint("%s", "process setup request");
SUdf *udf = taosMemoryMalloc(sizeof(SUdf)); SUdf *udf = taosMemoryMalloc(sizeof(SUdf));
udf->refCount = 0; udf->refCount = 0;
SUdfSetupRequest *setup = request->subReq; SUdfSetupRequest *setup = &request.setup;
strcpy(udf->name, setup->udfName); strcpy(udf->name, setup->udfName);
int err = uv_dlopen(setup->path, &udf->lib); //TODO: retrive udf info from mnode
char* path = "libudf1.so";
int err = uv_dlopen(path, &udf->lib);
if (err != 0) { if (err != 0) {
debugPrint("can not load library %s. error: %s", setup->path, uv_strerror(err)); debugPrint("can not load library %s. error: %s", path, uv_strerror(err));
//TODO set error //TODO set error
} }
char normalFuncName[32] = {0}; char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(normalFuncName, setup->udfName); strcpy(normalFuncName, setup->udfName);
//TODO error, //TODO error, multi-thread, same udf, lock it
//TODO find all functions normal, init, destroy, normal, merge, finalize //TODO find all functions normal, init, destroy, normal, merge, finalize
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->normalFunc)); uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc));
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *freeSuffix = "_free";
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle)); SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle));
handle->udf = udf; handle->udf = udf;
udf->refCount++; udf->refCount++;
//TODO: allocate private structure and call init function and set it to handle //TODO: allocate private structure and call init function and set it to handle
SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse)); SUdfResponse rsp;
rsp->seqNum = request->seqNum; rsp.seqNum = request.seqNum;
rsp->type = request->type; rsp.type = request.type;
rsp->code = 0; rsp.code = 0;
SUdfSetupResponse *subRsp = taosMemoryMalloc(sizeof(SUdfSetupResponse)); rsp.setupRsp.udfHandle = (int64_t) (handle);
subRsp->udfHandle = (int64_t) (handle); int32_t len = encodeUdfResponse(NULL, &rsp);
rsp->subRsp = subRsp; rsp.msgLen = len;
char *buf; void *bufBegin = taosMemoryMalloc(len);
int32_t len; void *buf = bufBegin;
encodeResponse(&buf, &len, rsp); encodeUdfResponse(&buf, &rsp);
uvUdf->output = uv_buf_init(buf, len); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(rsp->subRsp);
taosMemoryFree(rsp);
taosMemoryFree(request->subReq);
taosMemoryFree(request);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
break; break;
} }
case UDF_TASK_CALL: { case UDF_TASK_CALL: {
debugPrint("%s", "process call request"); debugPrint("%s", "process call request");
SUdfCallRequest *call = request->subReq; SUdfCallRequest *call = &request.call;
SUdfHandle *handle = (SUdfHandle *) (call->udfHandle); SUdfHandle *handle = (SUdfHandle *) (call->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
char *newState;
int32_t newStateSize; SUdfDataBlock input = {0};
SUdfDataBlock input = {.data = call->input, .size= call->inputBytes}; convertDataBlockToUdfDataBlock(&call->block, &input);
SUdfDataBlock output; SUdfColumn output = {0};
//TODO: call different functions according to the step //TODO: call different functions according to call type, for now just calar
udf->normalFunc(call->step, call->state, call->stateBytes, input, &newState, &newStateSize, &output); if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
udf->scalarProcFunc(input, &output);
SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse)); }
rsp->seqNum = request->seqNum;
rsp->type = request->type; SUdfResponse response = {0};
rsp->code = 0; SUdfResponse *rsp = &response;
SUdfCallResponse *subRsp = taosMemoryMalloc(sizeof(SUdfCallResponse)); if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
subRsp->outputBytes = output.size; rsp->seqNum = request.seqNum;
subRsp->output = output.data; rsp->type = request.type;
subRsp->newStateBytes = newStateSize; rsp->code = 0;
subRsp->newState = newState; SUdfCallResponse *subRsp = &rsp->callRsp;
rsp->subRsp = subRsp; subRsp->callType = call->callType;
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
char *buf; }
int32_t len;
encodeResponse(&buf, &len, rsp); int32_t len = encodeUdfResponse(NULL, rsp);
uvUdf->output = uv_buf_init(buf, len); rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
taosMemoryFree(rsp->subRsp); void *buf = bufBegin;
taosMemoryFree(rsp); encodeUdfResponse(&buf, rsp);
taosMemoryFree(newState); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(output.data);
taosMemoryFree(request->subReq); //TODO: free
taosMemoryFree(request); udf->freeUdfColumn(&output);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
break; break;
} }
case UDF_TASK_TEARDOWN: { case UDF_TASK_TEARDOWN: {
debugPrint("%s", "process teardown request"); debugPrint("%s", "process teardown request");
SUdfTeardownRequest *teardown = request->subReq; SUdfTeardownRequest *teardown = &request.teardown;
SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle); SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
udf->refCount--; udf->refCount--;
if (udf->refCount == 0) { if (udf->refCount == 0) {
uv_dlclose(&udf->lib); uv_dlclose(&udf->lib);
taosMemoryFree(udf);
} }
taosMemoryFree(udf); //TODO: call destroy and free udf private
//TODO: call destroy and free udf private
taosMemoryFree(handle); taosMemoryFree(handle);
SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse)); SUdfResponse response;
rsp->seqNum = request->seqNum; SUdfResponse *rsp = &response;
rsp->type = request->type; rsp->seqNum = request.seqNum;
rsp->type = request.type;
rsp->code = 0; rsp->code = 0;
SUdfTeardownResponse *subRsp = taosMemoryMalloc(sizeof(SUdfTeardownResponse)); int32_t len = encodeUdfResponse(NULL, rsp);
rsp->subRsp = subRsp; rsp->msgLen = len;
char *buf; void *bufBegin = taosMemoryMalloc(len);
int32_t len; void *buf = bufBegin;
encodeResponse(&buf, &len, rsp); encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(buf, len); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(rsp->subRsp);
taosMemoryFree(rsp);
taosMemoryFree(request->subReq);
taosMemoryFree(request);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
break; break;
} }
...@@ -181,7 +187,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -181,7 +187,7 @@ void udfdProcessRequest(uv_work_t *req) {
} }
void udfdOnWrite(uv_write_t *req, int status) { void udfdOnWrite(uv_write_t *req, int status) {
debugPrint("%s", "after writing to pipe"); debugPrint("%s", "server after writing to pipe");
if (status < 0) { if (status < 0) {
debugPrint("Write error %s", uv_err_name(status)); debugPrint("Write error %s", uv_err_name(status));
} }
...@@ -205,7 +211,7 @@ void udfdSendResponse(uv_work_t *work, int status) { ...@@ -205,7 +211,7 @@ void udfdSendResponse(uv_work_t *work, int status) {
} }
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
debugPrint("%s", "allocate buffer for read"); debugPrint("%s", "server allocate buffer for read");
SUdfdUvConn *ctx = handle->data; SUdfdUvConn *ctx = handle->data;
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t); int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
if (ctx->inputCap == 0) { if (ctx->inputCap == 0) {
...@@ -329,6 +335,77 @@ void removeListeningPipe(int sig) { ...@@ -329,6 +335,77 @@ void removeListeningPipe(int sig) {
exit(0); exit(0);
} }
typedef struct SServerContext {
void *clientRpc;
} SUdfdContext;
void udfdProcessRpcRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
return;
}
int32_t fetchUdfFuncInfo(void *clientRpc, SEpSet* pEpSet, char* udfNames[], int32_t numOfUdfs) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
for (int32_t i = 0; i < numOfUdfs; ++i) {
taosArrayPush(retrieveReq.pFuncNames, udfNames[i]);
}
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
SRpcMsg rpcRsp = {0};
rpcSendRecv(clientRpc, pEpSet, &rpcMsg, &rpcRsp);
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
SFuncInfo* pFuncInfo = (SFuncInfo*)taosArrayGet(retrieveRsp.pFuncInfos, 0);
taosArrayDestroy(retrieveRsp.pFuncInfos);
rpcFreeCont(rpcRsp.pCont);
return 0;
}
int32_t openUdfdClientRpc(SUdfdContext *ctx) {
char *pass = "taosdata";
char *user = "root";
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit = {0};
rpcInit.label = (char*)"UDFD";
rpcInit.numOfThreads = 1;
rpcInit.cfp = udfdProcessRpcRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000;
rpcInit.parent = ctx;
rpcInit.user = (char*)user;
rpcInit.ckey = (char*)"key";
rpcInit.secret = (char*)secretEncrypt;
rpcInit.spi = 1;
ctx->clientRpc = rpcOpen(&rpcInit);
return 0;
}
int32_t closeUdfdClientRpc(SUdfdContext *ctx) {
rpcClose(ctx->clientRpc);
return 0;
}
int main() { int main() {
debugPrint("libuv version: %x", UV_VERSION_HEX); debugPrint("libuv version: %x", UV_VERSION_HEX);
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include "uv.h" #include "uv.h"
#include "os.h" #include "os.h"
#include "tudf.h" #include "tudf.h"
#include "tdatablock.h"
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
startUdfService(); startUdfService();
...@@ -18,28 +19,38 @@ int main(int argc, char *argv[]) { ...@@ -18,28 +19,38 @@ int main(int argc, char *argv[]) {
} }
fprintf(stdout, "current working directory:%s\n", path); fprintf(stdout, "current working directory:%s\n", path);
strcat(path, "/libudf1.so"); strcat(path, "/libudf1.so");
SUdfInfo udfInfo = {.udfName="udf1", .path=path};
UdfHandle handle; UdfHandle handle;
setupUdf(&udfInfo, &handle); SEpSet epSet;
setupUdf("udf1", &epSet, &handle);
//char state[5000000] = "state"; SSDataBlock block = {0};
//char input[5000000] = "input"; SSDataBlock* pBlock = &block;
int dataSize = 500; pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
int callCount = 2; pBlock->info.numOfCols = 1;
if (argc > 1) dataSize = atoi(argv[1]); pBlock->info.rows = 4;
if (argc > 2) callCount = atoi(argv[2]); char data[16] = {0};
char *state = taosMemoryMalloc(dataSize); char bitmap[4] = {0};
char *input = taosMemoryMalloc(dataSize); for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SUdfDataBlock blockInput = {.data = input, .size = dataSize}; SColumnInfoData colInfo = {0};
SUdfDataBlock blockOutput; colInfo.info.type = TSDB_DATA_TYPE_INT;
char* newState; colInfo.info.bytes = sizeof(int32_t);
int32_t newStateSize; colInfo.info.colId = 1;
for (int l = 0; l < callCount; ++l) { colInfo.pData = data;
callUdf(handle, 0, state, dataSize, blockInput, &newState, &newStateSize, &blockOutput); colInfo.nullbitmap = bitmap;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j);
}
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
SSDataBlock output = {0};
callUdfScalaProcess(handle, pBlock, &output);
SColumnInfoData *col = taosArrayGet(output.pDataBlock, 0);
for (int32_t i = 0; i < output.info.rows; ++i) {
fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t)));
} }
taosMemoryFree(state);
taosMemoryFree(input);
teardownUdf(handle); teardownUdf(handle);
stopUdfService(); stopUdfService();
......
...@@ -2,20 +2,68 @@ ...@@ -2,20 +2,68 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include "os.h"
#include "tudf.h" #include "tudf.h"
void udf1(int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, #undef malloc
char **newState, int32_t *newStateSize, SUdfDataBlock *output) { #define malloc malloc
fprintf(stdout, "%s, step:%d\n", "udf function called", step); #undef free
char *newStateBuf = taosMemoryMalloc(stateSize); #define free free
memcpy(newStateBuf, state, stateSize);
*newState = newStateBuf; int32_t udf1_setup() {
*newStateSize = stateSize; return 0;
}
char *outputBuf = taosMemoryMalloc(input.size);
memcpy(outputBuf, input.data, input.size); int32_t udf1_teardown() {
output->data = outputBuf; return 0;
output->size = input.size; }
return;
int32_t udf1(SUdfDataBlock block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block.numOfRows;
SUdfColumnData *srcData = &block.udfCols[0]->colData;
resultData->varLengthColumn = srcData->varLengthColumn;
if (resultData->varLengthColumn) {
resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen;
resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen);
memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen);
resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen;
resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen);
memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen);
} else {
resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen;
resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen);
memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen);
resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen;
resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen);
memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data, srcData->fixLenCol.dataLen);
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
*(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88;
}
}
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
return 0;
} }
int32_t udf1_free(SUdfColumn *col) {
SUdfColumnData *data = &col->colData;
if (data->varLengthColumn) {
free(data->varLenCol.varOffsets);
data->varLenCol.varOffsets = NULL;
free(data->varLenCol.payload);
data->varLenCol.payload = NULL;
} else {
free(data->fixLenCol.nullBitmap);
data->fixLenCol.nullBitmap = NULL;
free(data->fixLenCol.data);
data->fixLenCol.data = NULL;
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册