提交 946b565e 编写于 作者: S shenglian zhou

sync home and office

上级 3f62f8a3
......@@ -5,6 +5,7 @@ target_include_directories(
function
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
# util headers; the directory is spelled "include" - a misspelled path does not exist and is silently ignored
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/contrib/libuv/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
......@@ -21,6 +22,7 @@ target_include_directories(
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/contrib/libuv/include"
# util headers; the directory is spelled "include" - a misspelled path does not exist and is silently ignored
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
......@@ -35,6 +37,7 @@ target_include_directories(
udf1
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
......@@ -46,6 +49,7 @@ target_include_directories(
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/contrib/libuv/include"
# util headers; the directory is spelled "include" - a misspelled path does not exist and is silently ignored
"${TD_SOURCE_DIR}/include/util"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
......
......@@ -16,13 +16,17 @@
#ifndef TDENGINE_TUDF_H
#define TDENGINE_TUDF_H
#include <stdint.h>
#include <stdbool.h>
#include "tmsg.h"
#ifdef __cplusplus
extern "C" {
#endif
//======================================================================================
//begin API to taosd and qworker
#define TSDB_UDF_MAX_COLUMNS 4
enum {
UDFC_CODE_STOPPING = -1,
......@@ -41,35 +45,6 @@ int32_t startUdfService();
*/
int32_t stopUdfService();
// Kinds of user-defined function: scalar or aggregate.
// NOTE(review): presumably the values stored in SUdfInfo.udfType - confirm.
enum {
TSDB_UDF_TYPE_SCALAR = 0,
TSDB_UDF_TYPE_AGGREGATE = 1
};
// Forms a UDF implementation can take: a binary library or a Lua script.
// NOTE(review): presumably the values stored in SUdfInfo.scriptType - confirm.
enum {
TSDB_UDF_SCRIPT_BIN_LIB = 0,
TSDB_UDF_SCRIPT_LUA = 1,
};
// Type description of a single UDF column.
typedef struct SUdfColumnMeta {
int16_t type; // data type code of the column (NOTE(review): value set not visible here - confirm)
int32_t bytes; // <0 var length, others fixed length bytes
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
// Description of a UDF as handed to the SUdfInfo-based setupUdf() declaration.
typedef struct SUdfInfo {
char *udfName; // function name
int32_t udfType; // scalar function or aggregate function
int8_t scriptType; // NOTE(review): presumably one of the TSDB_UDF_SCRIPT_* values - confirm
char *path; // location of the UDF implementation; the setup request forwards it together with its length
// known info between qworker and udf
// struct SUdfColumnMeta resultMeta;
// int32_t bufSize; //interbuf size
} SUdfInfo;
typedef void *UdfHandle;
/**
......@@ -78,8 +53,14 @@ typedef void *UdfHandle;
* @param handle, out
* @return error code
*/
int32_t setupUdf(SUdfInfo* udf, UdfHandle *handle);
int32_t setupUdf(char udfName[], SEpSet epSet, UdfHandle *handle);
// Type description of a single UDF column.
typedef struct SUdfColumnMeta {
int16_t type; // data type code of the column (NOTE(review): value set not visible here - confirm)
int32_t bytes; // <0 var length, others fixed length bytes
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
typedef struct SUdfColumnData {
int32_t numOfRows;
......@@ -108,7 +89,7 @@ typedef struct SUdfColumn {
// A block of columns passed to a UDF call: numOfCols columns of numOfRows rows each.
// NOTE(review): two `udfCols` members are shown below (a fixed array and a pointer array).
// The (de)serializers and the free helper index it as `udfCols[i]` yielding an SUdfColumn*,
// i.e. the pointer-array form with individually allocated columns - confirm only that one is kept.
typedef struct SUdfDataBlock {
int32_t numOfRows;
int32_t numOfCols;
SUdfColumn udfCols[TSDB_UDF_MAX_COLUMNS];
SUdfColumn **udfCols;
} SUdfDataBlock;
typedef struct SUdfInterBuf {
......@@ -138,13 +119,17 @@ int32_t teardownUdf(UdfHandle handle);
// begin API to UDF writer.
// dynamic lib init and destroy
// Function pointer types for the entry points a UDF library exposes. All return an int32_t code.
// NOTE(review): two naming sets coexist below (Init/Destroy vs Setup/Teardown, and the Agg*
// types with and without the "Func" suffix) with identical signatures - confirm which set is current.
typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfDestroyFunc)();
typedef int32_t (*TUdfSetupFunc)();
typedef int32_t (*TUdfTeardownFunc)();
// Releases the buffers of a result column produced by the UDF library itself.
typedef int32_t (*TUdfFreeUdfColumnDataFunc)(SUdfColumnData* columnData);
// Scalar processing: consumes an input block and fills resultData.
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumnData *resultData);
// Aggregate processing in three steps: init the intermediate buffer, process a block into it, finalize into resultData.
typedef int32_t (*TUdfAggInit)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcess)(SUdfDataBlock block, SUdfInterBuf *interBuf);
typedef int32_t (*TUdfAggFinalize)(SUdfInterBuf buf, SUdfColumnData *resultData);
typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock block, SUdfInterBuf *interBuf);
typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfColumnData *resultData);
// end API to UDF writer
//=======================================================================================================================
......
......@@ -15,7 +15,6 @@
#ifndef TDENGINE_TUDF_INT_H
#define TDENGINE_TUDF_INT_H
#ifdef __cplusplus
extern "C" {
#endif
......@@ -37,11 +36,8 @@ enum {
};
// Payload of a UDF setup request, as handled by serializeUdfSetupRequest/deserializeUdfSetupRequest.
// NOTE(review): `udfName` is declared twice below (16 bytes and TSDB_FUNC_NAME_LEN bytes), which looks
// like the old and new field sets shown together - confirm which members are current.
typedef struct SUdfSetupRequest {
char udfName[16]; //
int8_t scriptType; // 0:c, 1: lua, 2:js
int8_t udfType; //udaf, udf
int16_t pathSize; // number of bytes of `path` that get serialized
char *path;
char udfName[TSDB_FUNC_NAME_LEN];
SEpSet epSet;
} SUdfSetupRequest;
typedef struct SUdfSetupResponse {
......@@ -101,6 +97,10 @@ int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse *pResponse);
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response);
void freeUdfColumnData(SUdfColumnData *data);
void freeUdfColumn(SUdfColumn* col);
void freeUdfDataDataBlock(SUdfDataBlock *block);
#ifdef __cplusplus
}
#endif
......
......@@ -254,25 +254,30 @@ int32_t deserializeUdfColumnData(SUdfColumnData* pData, char* buf) {
pData->nullBitmapLen = *(int32_t*)buf;
buf += sizeof(int32_t);
pData->nullBitmap = buf;
//TODO: optimize for less memory copy
pData->nullBitmap = taosMemoryMalloc(pData->nullBitmapLen);
memcpy(pData->nullBitmap, buf, pData->nullBitmapLen);
buf += pData->nullBitmapLen;
pData->dataLen = *(int32_t*)buf;
buf += sizeof(int32_t);
pData->data = buf;
pData->data = taosMemoryMalloc(pData->dataLen);
memcpy(pData->data, buf, pData->dataLen);
buf += pData->dataLen;
} else {
pData->varOffsetsLen = *(int32_t*)buf;
buf += sizeof(int32_t);
pData->varOffsets = buf;
pData->varOffsets = taosMemoryMalloc(pData->varOffsetsLen);
memcpy(pData->varOffsets, buf, pData->varOffsetsLen);
buf += pData->varOffsetsLen;
pData->payloadLen = *(int32_t*)buf;
buf += sizeof(int32_t);
pData->payload = buf;
pData->payload = taosMemoryMalloc(pData->payloadLen);
memcpy(pData->payload, buf, pData->payloadLen);
buf += pData->payloadLen;
}
int32_t len = buf - bufBeg;
......@@ -333,7 +338,7 @@ int32_t serializeUdfDataBlock(SUdfDataBlock *block, char *pBuf) {
pBuf += sizeof(int32_t);
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols + i;
SUdfColumn* col = block->udfCols[i];
int32_t l = serializeUdfColumn(col, pBuf);
pBuf += l;
}
......@@ -342,7 +347,7 @@ int32_t serializeUdfDataBlock(SUdfDataBlock *block, char *pBuf) {
return totalLen;
}
int32_t deserailizeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) {
int32_t deserializeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) {
char *bufBegin = buf;
pBlock->numOfRows = *(int32_t*)buf;
......@@ -351,8 +356,10 @@ int32_t deserailizeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) {
pBlock->numOfCols = *(int32_t*)buf;
buf += sizeof(int32_t);
pBlock->udfCols = taosMemoryMalloc(sizeof(SUdfColumn*) * pBlock->numOfCols);
for (int32_t i = 0; i < pBlock->numOfCols; ++i) {
int32_t l = deserializeUdfColumn(pBlock->udfCols + i, buf);
pBlock->udfCols[i] = taosMemoryMalloc(sizeof(SUdfColumn));
int32_t l = deserializeUdfColumn(pBlock->udfCols[i], buf);
buf += l;
}
......@@ -369,14 +376,16 @@ int32_t serializeUdfInterBuf(SUdfInterBuf *state, char *pBuf) {
memcpy(pBuf, state->buf, state->bufLen);
pBuf += state->bufLen;
return pBuf-bufBegin;
return pBuf - bufBegin;
}
/**
 * Decodes an SUdfInterBuf from a byte stream laid out as an int32_t length followed by
 * that many payload bytes.
 *
 * The payload is copied into a newly allocated buffer owned by pState (release it with
 * taosMemoryFree), so `buf` does not need to outlive the decoded state.
 *
 * The return value only reports how far to advance the read cursor, so there is no error
 * channel: when the encoded length is not positive, or the allocation fails, pState is
 * left as an empty state (buf == NULL, bufLen == 0).
 *
 * @param pState  out, receives the decoded length and the copied payload
 * @param buf     in, start of the encoded inter buffer
 * @return number of bytes consumed from buf
 */
int32_t deserializeUdfInterBuf(SUdfInterBuf *pState, char *buf) {
  char *bufBegin = buf;

  // Read through memcpy: the stream position carries no alignment guarantee for int32_t.
  int32_t encodedLen = 0;
  memcpy(&encodedLen, buf, sizeof(int32_t));
  buf += sizeof(int32_t);

  pState->buf = NULL;
  pState->bufLen = 0;
  if (encodedLen > 0) {
    pState->buf = taosMemoryMalloc(encodedLen);
    if (pState->buf != NULL) {
      memcpy(pState->buf, buf, encodedLen);
      pState->bufLen = encodedLen;
    }
    // Skip the payload even if it could not be copied, so the caller's cursor stays in sync.
    buf += encodedLen;
  }

  return buf - bufBegin;
}
......@@ -384,16 +393,11 @@ int32_t deserializeUdfInterBuf(SUdfInterBuf *pState, char *buf) {
// Writes a setup request into buf and returns the number of bytes written.
// No bounds are checked here; the caller must provide a buffer that is large enough.
// NOTE(review): the body writes udfName twice (16 bytes, then TSDB_FUNC_NAME_LEN bytes) and emits
// scriptType/udfType/pathSize/path before epSet. This looks like the old and new wire layouts shown
// together - confirm which statements are current. The layout must mirror deserializeUdfSetupRequest.
int32_t serializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) {
char *bufBegin = buf; // remembered to compute the written length at the end
memcpy(buf, setup->udfName, 16);
buf += 16;
*(int8_t *) buf = setup->scriptType;
buf += sizeof(int8_t);
*(int8_t *) buf = setup->udfType;
buf += sizeof(int8_t);
*(int16_t *) buf = setup->pathSize;
buf += sizeof(int16_t);
memcpy(buf, setup->path, setup->pathSize);
buf += setup->pathSize;
memcpy(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
buf += TSDB_FUNC_NAME_LEN;
// epSet is copied as a raw struct image
memcpy(buf, &setup->epSet, sizeof(SEpSet));
buf += sizeof(SEpSet);
return buf - bufBegin;
};
......@@ -401,17 +405,11 @@ int32_t serializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) {
// Reads a setup request from buf into *setup and returns the number of bytes consumed.
// NOTE(review): the body reads udfName twice (16 bytes, then TSDB_FUNC_NAME_LEN bytes) and reads
// scriptType/udfType/pathSize/path before epSet. This looks like the old and new wire layouts shown
// together - confirm which statements are current. The layout must mirror serializeUdfSetupRequest.
int32_t deserializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) {
char* bufBegin = buf; // remembered to compute the consumed length at the end
memcpy(setup->udfName, buf, 16);
buf += 16;
setup->scriptType = *(int8_t *) buf;
buf += sizeof(int8_t);
setup->udfType = *(int8_t *) buf;
buf += sizeof(int8_t);
setup->pathSize = *(int16_t *) buf;
buf += sizeof(int16_t);
// path is not copied: it points into buf and is only valid while buf is
setup->path = buf;
buf += setup->pathSize;
memcpy(setup->udfName, buf, TSDB_FUNC_NAME_LEN);
buf += TSDB_FUNC_NAME_LEN;
// epSet is restored from a raw struct image
memcpy(&setup->epSet, buf, sizeof(SEpSet));
buf += sizeof(SEpSet);
return buf - bufBegin;
}
......@@ -479,7 +477,7 @@ int32_t deserializeUdfCallRequest(SUdfCallRequest *call, char *buf) {
call->callType = *(int8_t *) buf;
buf += sizeof(int8_t);
int32_t l = 0;
l = deserailizeUdfDataBlock(&call->block, buf);
l = deserializeUdfDataBlock(&call->block, buf);
buf += l;
l = deserializeUdfInterBuf(&call->interBuf, buf);
buf += l;
......@@ -633,11 +631,11 @@ int32_t deserializeUdfResponse(SUdfResponse *rsp, char *buf) {
int32_t estimateUdfRequestLen(SUdfRequest *request) {
// a larger estimated is generated
int32_t size = sizeof(SUdfRequest);
if (request->type == UDF_TASK_SETUP) {
size += request->setup.pathSize;
} else if (request->type == UDF_TASK_CALL) {
if (request->type == UDF_TASK_CALL) {
size += request->call.block.numOfCols * sizeof(SUdfColumn*);
for (int32_t i = 0; i < request->call.block.numOfCols; ++i) {
SUdfColumn* col = request->call.block.udfCols + i;
size += sizeof(SUdfColumn);
SUdfColumn* col = request->call.block.udfCols[i];
if (col->colData.varLengthColumn) {
size += col->colData.varOffsetsLen;
size += col->colData.payloadLen;
......@@ -717,6 +715,39 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse *rsp) {
return 0;
}
/**
 * Frees the heap buffers held by a column data object and clears the freed pointers.
 *
 * A variable-length column releases varOffsets and payload; a fixed-length column releases
 * nullBitmap and data. The SUdfColumnData object itself is not freed.
 *
 * @param data  column data whose buffers are released
 */
void freeUdfColumnData(SUdfColumnData *data) {
  if (!data->varLengthColumn) {
    // fixed-length layout
    taosMemoryFree(data->nullBitmap);
    data->nullBitmap = NULL;
    taosMemoryFree(data->data);
    data->data = NULL;
    return;
  }

  // variable-length layout
  taosMemoryFree(data->varOffsets);
  data->varOffsets = NULL;
  taosMemoryFree(data->payload);
  data->payload = NULL;
}
/**
 * Frees the buffers owned by a column's data part. The SUdfColumn object itself is left
 * for the caller to release.
 *
 * @param col  column whose colData buffers are released
 */
void freeUdfColumn(SUdfColumn* col) {
  SUdfColumnData *colData = &col->colData;
  freeUdfColumnData(colData);
}
/**
 * Frees every column of a data block together with the column pointer array, and clears
 * the freed pointers. The SUdfDataBlock object itself is not freed.
 *
 * Calling it with NULL, or on a block whose udfCols is already NULL (for example a second
 * call on the same block), is a no-op; numOfCols is left unchanged. NULL column slots are
 * skipped.
 *
 * @param block  data block whose columns are released
 */
void freeUdfDataDataBlock(SUdfDataBlock *block) {
  // Nothing to walk: without this guard numOfCols would still drive the loop over a NULL array.
  if (block == NULL || block->udfCols == NULL) {
    return;
  }

  for (int32_t i = 0; i < block->numOfCols; ++i) {
    SUdfColumn *col = block->udfCols[i];
    if (col == NULL) {
      continue;
    }
    freeUdfColumn(col);    // buffers owned by the column
    taosMemoryFree(col);   // the column object itself
    block->udfCols[i] = NULL;
  }

  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}
/**
 * Frees the payload of an intermediate buffer and clears the pointer. The SUdfInterBuf
 * object itself is not freed and bufLen is left as it was.
 *
 * @param buf  intermediate buffer whose payload is released
 */
void freeUdfInterBuf(SUdfInterBuf *buf) {
  void *payload = buf->buf;
  buf->buf = NULL;
  taosMemoryFree(payload);
}
void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data;
if (!QUEUE_EMPTY(&conn->taskQueue)) {
......@@ -1168,7 +1199,7 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
return task->errCode;
}
int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) {
int32_t setupUdf(char udfName[TSDB_FUNC_NAME_LEN], UdfHandle *handle) {
debugPrint("%s", "client setup udf");
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0;
......@@ -1176,11 +1207,7 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) {
task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req;
memcpy(req->udfName, udfInfo->udfName, 16);
req->path = udfInfo->path;
req->pathSize = strlen(req->path) + 1;
req->udfType = udfInfo->udfType;
req->scriptType = udfInfo->scriptType;
memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) {
......
......@@ -44,7 +44,8 @@ typedef struct SUdf {
int8_t type;
uv_lib_t lib;
TUdfScalarProcFunc normalFunc;
TUdfScalarProcFunc scalarProcFunc;
TUdfFreeUdfColumnDataFunc freeUdfColumnData;
} SUdf;
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
......@@ -56,19 +57,21 @@ typedef struct SUdfHandle {
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data);
SUdfRequest *request = NULL;
SUdfRequest request = {0};
decodeRequest(uvUdf->input.base, uvUdf->input.len, &request);
switch (request->type) {
switch (request.type) {
case UDF_TASK_SETUP: {
debugPrint("%s", "process setup request");
SUdf *udf = taosMemoryMalloc(sizeof(SUdf));
udf->refCount = 0;
SUdfSetupRequest *setup = request->subReq;
SUdfSetupRequest *setup = &request.setup;
strcpy(udf->name, setup->udfName);
int err = uv_dlopen(setup->path, &udf->lib);
//TODO: retrive udf info from mnode
char* path = "udf1.so";
int err = uv_dlopen(path, &udf->lib);
if (err != 0) {
debugPrint("can not load library %s. error: %s", setup->path, uv_strerror(err));
debugPrint("can not load library %s. error: %s", path, uv_strerror(err));
//TODO set error
}
......@@ -76,99 +79,86 @@ void udfdProcessRequest(uv_work_t *req) {
strcpy(normalFuncName, setup->udfName);
//TODO error,
//TODO find all functions normal, init, destroy, normal, merge, finalize
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->normalFunc));
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc));
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle));
handle->udf = udf;
udf->refCount++;
//TODO: allocate private structure and call init function and set it to handle
SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = 0;
SUdfSetupResponse *subRsp = taosMemoryMalloc(sizeof(SUdfSetupResponse));
subRsp->udfHandle = (int64_t) (handle);
rsp->subRsp = subRsp;
SUdfResponse rsp;
rsp.seqNum = request.seqNum;
rsp.type = request.type;
rsp.code = 0;
rsp.setupRsp.udfHandle = (int64_t) (handle);
char *buf;
int32_t len;
encodeResponse(&buf, &len, rsp);
encodeResponse(&buf, &len, &rsp);
uvUdf->output = uv_buf_init(buf, len);
taosMemoryFree(rsp->subRsp);
taosMemoryFree(rsp);
taosMemoryFree(request->subReq);
taosMemoryFree(request);
taosMemoryFree(uvUdf->input.base);
break;
}
case UDF_TASK_CALL: {
debugPrint("%s", "process call request");
SUdfCallRequest *call = request->subReq;
SUdfCallRequest *call = &request.call;
SUdfHandle *handle = (SUdfHandle *) (call->udfHandle);
SUdf *udf = handle->udf;
char *newState;
int32_t newStateSize;
SUdfDataBlock input = {.data = call->input, .size= call->inputBytes};
SUdfDataBlock output;
//TODO: call different functions according to the step
udf->normalFunc(call->step, call->state, call->stateBytes, input, &newState, &newStateSize, &output);
SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = 0;
SUdfCallResponse *subRsp = taosMemoryMalloc(sizeof(SUdfCallResponse));
subRsp->outputBytes = output.size;
subRsp->output = output.data;
subRsp->newStateBytes = newStateSize;
subRsp->newState = newState;
rsp->subRsp = subRsp;
SUdfDataBlock input = call->block;
SUdfColumnData output;
//TODO: call different functions according to call type, for now just calar
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
udf->scalarProcFunc(input, &output);
}
SUdfResponse response = {0};
SUdfResponse *rsp = &response;
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
rsp->seqNum = request.seqNum;
rsp->type = request.type;
rsp->code = 0;
SUdfCallResponse *subRsp = &rsp->callRsp;
subRsp->resultData = output;
}
char *buf;
int32_t len;
encodeResponse(&buf, &len, rsp);
uvUdf->output = uv_buf_init(buf, len);
taosMemoryFree(rsp->subRsp);
taosMemoryFree(rsp);
taosMemoryFree(newState);
taosMemoryFree(output.data);
taosMemoryFree(request->subReq);
taosMemoryFree(request);
//TODO: free
udf->freeUdfColumnData(&output);
taosMemoryFree(uvUdf->input.base);
break;
}
case UDF_TASK_TEARDOWN: {
debugPrint("%s", "process teardown request");
SUdfTeardownRequest *teardown = request->subReq;
SUdfTeardownRequest *teardown = &request.teardown;
SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle);
SUdf *udf = handle->udf;
udf->refCount--;
if (udf->refCount == 0) {
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
}
taosMemoryFree(udf);
//TODO: call destroy and free udf private
//TODO: call destroy and free udf private
taosMemoryFree(handle);
SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
rsp->type = request->type;
SUdfResponse response;
SUdfResponse *rsp = &response;
rsp->seqNum = request.seqNum;
rsp->type = request.type;
rsp->code = 0;
SUdfTeardownResponse *subRsp = taosMemoryMalloc(sizeof(SUdfTeardownResponse));
rsp->subRsp = subRsp;
SUdfTeardownResponse *subRsp = &response.teardownRsp;
char *buf;
int32_t len;
encodeResponse(&buf, &len, rsp);
uvUdf->output = uv_buf_init(buf, len);
taosMemoryFree(rsp->subRsp);
taosMemoryFree(rsp);
taosMemoryFree(request->subReq);
taosMemoryFree(request);
taosMemoryFree(uvUdf->input.base);
break;
}
......
......@@ -3,17 +3,53 @@
#include <stdio.h>
#include "tudf.h"
void udf1(int8_t step, char *state, int32_t stateSize, SUdfDataBlock input,
char **newState, int32_t *newStateSize, SUdfDataBlock *output) {
fprintf(stdout, "%s, step:%d\n", "udf function called", step);
char *newStateBuf = malloc(stateSize);
memcpy(newStateBuf, state, stateSize);
*newState = newStateBuf;
*newStateSize = stateSize;
char *outputBuf = malloc(input.size);
memcpy(outputBuf, input.data, input.size);
output->data = outputBuf;
output->size = input.size;
return;
/**
 * Setup entry of the udf1 sample library. It has nothing to prepare.
 *
 * @return always 0
 */
int32_t udf1_setup() {
  const int32_t code = 0;
  return code;
}
/**
 * Teardown entry of the udf1 sample library. It has nothing to release.
 *
 * @return always 0
 */
int32_t udf1_teardown() {
  const int32_t code = 0;
  return code;
}
/*
 * Returns a heap copy of the first `len` bytes of `src`. At least one byte is allocated so a
 * zero-length buffer still yields a non-NULL pointer. Returns NULL when `len` is negative,
 * when `src` is NULL although bytes are expected, or when the allocation fails.
 */
static void *udf1_clone(const void *src, int32_t len) {
  if (len < 0 || (len > 0 && src == NULL)) {
    return NULL;
  }
  void *copy = malloc(len > 0 ? (size_t)len : 1);
  if (copy != NULL && len > 0) {
    memcpy(copy, src, (size_t)len);
  }
  return copy;
}

/**
 * Sample scalar UDF: copies the first input column into resultData unchanged.
 *
 * The copied buffers are allocated with malloc and are owned by resultData; release them
 * with udf1_free().
 *
 * @param block       input block; its first column is the one copied
 * @param resultData  out, receives the row count, the column layout flag and buffer copies
 * @return 0 on success; -1 when resultData is NULL, the block has no first column, a
 *         source length is negative, or memory cannot be allocated. On every failure except
 *         a NULL resultData the result is left as an empty fixed-length column (0 rows, NULL
 *         buffers), so it is still safe to pass to udf1_free().
 */
int32_t udf1(SUdfDataBlock block, SUdfColumnData *resultData) {
  if (resultData == NULL) {
    return -1;
  }

  // Start from a well-defined empty result so early exits never expose uninitialized pointers.
  resultData->numOfRows = 0;
  resultData->varLengthColumn = false;
  resultData->nullBitmapLen = 0;
  resultData->nullBitmap = NULL;
  resultData->dataLen = 0;
  resultData->data = NULL;

  if (block.numOfCols < 1 || block.udfCols == NULL || block.udfCols[0] == NULL) {
    return -1;
  }

  SUdfColumnData *srcData = &block.udfCols[0]->colData;

  // Copy both buffers first; resultData is only touched again once both copies exist.
  void *first = NULL;
  void *second = NULL;
  if (srcData->varLengthColumn) {
    first = udf1_clone(srcData->varOffsets, srcData->varOffsetsLen);
    second = udf1_clone(srcData->payload, srcData->payloadLen);
  } else {
    first = udf1_clone(srcData->nullBitmap, srcData->nullBitmapLen);
    second = udf1_clone(srcData->data, srcData->dataLen);
  }
  if (first == NULL || second == NULL) {
    free(first);
    free(second);
    return -1;
  }

  resultData->numOfRows = block.numOfRows;
  resultData->varLengthColumn = srcData->varLengthColumn;
  if (resultData->varLengthColumn) {
    resultData->varOffsetsLen = srcData->varOffsetsLen;
    resultData->varOffsets = first;
    resultData->payloadLen = srcData->payloadLen;
    resultData->payload = second;
  } else {
    resultData->nullBitmapLen = srcData->nullBitmapLen;
    resultData->nullBitmap = first;
    resultData->dataLen = srcData->dataLen;
    resultData->data = second;
  }
  return 0;
}
/**
 * Releases the buffers of a result column produced by udf1() and clears the freed pointers.
 *
 * A variable-length column releases varOffsets and payload; a fixed-length column releases
 * nullBitmap and data. The SUdfColumnData object itself is not freed.
 *
 * @param data  result column whose buffers are released
 * @return always 0
 */
int32_t udf1_free(SUdfColumnData *data) {
  if (!data->varLengthColumn) {
    // fixed-length layout
    free(data->nullBitmap);
    data->nullBitmap = NULL;
    free(data->data);
    data->data = NULL;
    return 0;
  }

  // variable-length layout
  free(data->varOffsets);
  data->varOffsets = NULL;
  free(data->payload);
  data->payload = NULL;
  return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册