diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index f072866c67c3f1c7d62ef5868b0d99169783bd67..41183d5533bcf24833769f86bf1488c1c2b2c4eb 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -22,6 +22,8 @@ extern "C" { //====================================================================================== //begin API to taosd and qworker +#define TSDB_UDF_MAX_COLUMNS 4 + enum { UDFC_CODE_STOPPING = -1, UDFC_CODE_RESTARTING = -2, @@ -49,15 +51,22 @@ enum { TSDB_UDF_SCRIPT_LUA = 1, }; +typedef struct SUdfColumnMeta { + int16_t type; + int32_t bytes; // <0 var length, others fixed length bytes + uint8_t precision; + uint8_t scale; +} SUdfColumnMeta; + typedef struct SUdfInfo { char *udfName; // function name int32_t udfType; // scalar function or aggregate function int8_t scriptType; char *path; - int8_t resType; // result type - int16_t resBytes; // result byte - int32_t bufSize; //interbuf size + // known info between qworker and udf + // struct SUdfColumnMeta resultMeta; + // int32_t bufSize; //interbuf size } SUdfInfo; @@ -72,33 +81,50 @@ typedef void *UdfHandle; int32_t setupUdf(SUdfInfo* udf, UdfHandle *handle); -enum { - TSDB_UDF_STEP_NORMAL = 0, - TSDB_UDF_STEP_MERGE, - TSDb_UDF_STEP_FINALIZE, - TSDB_UDF_STEP_MAX_NUM -}; -/** - * call udf - * @param handle udf handle - * @param step - * @param state - * @param stateSize - * @param input - * @param newstate - * @param newStateSize - * @param output - * @return error code - */ +typedef struct SUdfColumnData { + int32_t numOfRows; + bool varLengthColumn; + union { + int32_t nullBitmapLen; + char* nullBitmap; + int32_t dataLen; + char* data; + }; + + union { + int32_t varOffsetsLen; + char* varOffsets; + int32_t payloadLen; + char* payload; + }; +} SUdfColumnData; + + +typedef struct SUdfColumn { + SUdfColumnMeta colMeta; + SUdfColumnData colData; +} SUdfColumn; -//TODO: must change the following after metadata flow and data flow between qworker and udfd is well defined typedef struct SUdfDataBlock { - char* data; - int32_t size; + int32_t numOfRows; + int32_t numOfCols; + SUdfColumn udfCols[TSDB_UDF_MAX_COLUMNS]; } SUdfDataBlock; -int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newstate, - int32_t *newStateSize, SUdfDataBlock *output); +typedef struct SUdfInterBuf { + int32_t bufLen; + char* buf; +} SUdfInterBuf; + +// input: block, initFirst +// output: interbuf +int32_t callUdfAggProcess(SUdfDataBlock block, SUdfInterBuf *interBuf, bool initFirst); +// input: interBuf +// output: resultData +int32_t callUdfAggFinalize(SUdfInterBuf interBuf, SUdfColumnData* resultData); +// input: block +// output: resultData +int32_t callUdfScalaProcess(SUdfDataBlock block, SUdfColumnData* resultData); /** * tearn down udf @@ -109,30 +135,16 @@ int32_t teardownUdf(UdfHandle handle); // end API to taosd and qworker //============================================================================================================================= -// TODO: Must change // begin API to UDF writer. -// script - -//typedef int32_t (*scriptInitFunc)(void* pCtx); -//typedef void (*scriptNormalFunc)(void* pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows, -// int64_t* ptList, int64_t key, char* dataOutput, char* tsOutput, int32_t* numOfOutput, -// int16_t oType, int16_t oBytes); -//typedef void (*scriptFinalizeFunc)(void* pCtx, int64_t key, char* dataOutput, int32_t* numOfOutput); -//typedef void (*scriptMergeFunc)(void* pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput); -//typedef void (*scriptDestroyFunc)(void* pCtx); - -// dynamic lib +// dynamic lib init and destroy typedef int32_t (*TUdfInitFunc)(); -typedef void (*TUdfDestroyFunc)(); - -typedef void (*TUdfFunc)(int8_t step, - char *state, int32_t stateSize, SUdfDataBlock input, - char **newstate, int32_t *newStateSize, SUdfDataBlock *output); - -//typedef void (*udfMergeFunc)(char *data, int32_t numOfRows, char *dataOutput, int32_t* numOfOutput); -//typedef void (*udfFinalizeFunc)(char* state, int32_t stateSize, SUdfDataBlock *output); +typedef int32_t (*TUdfDestroyFunc)(); +typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumnData *resultData); +typedef int32_t (*TUdfAggInit)(SUdfInterBuf *buf); +typedef int32_t (*TUdfAggProcess)(SUdfDataBlock block, SUdfInterBuf *interBuf); +typedef int32_t (*TUdfAggFinalize)(SUdfInterBuf buf, SUdfColumnData *resultData); // end API to UDF writer //======================================================================================================================= diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 5f757c1ef0a36d47d52a1b74670d3f1b888b676f..2e0b40591602b94e7d24791f634616ad18ff2328 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -30,6 +30,12 @@ enum { }; +enum { + TSDB_UDF_CALL_AGG_PROC = 0, + TSDb_UDF_CALL_AGG_FIN, + TSDB_UDF_CALL_SCALA_PROC, +}; + typedef struct SUdfSetupRequest { char udfName[16]; // int8_t scriptType; // 0:c, 1: lua, 2:js @@ -42,24 +48,18 @@ typedef struct SUdfSetupResponse { int64_t udfHandle; } SUdfSetupResponse; - typedef struct SUdfCallRequest { int64_t udfHandle; - int8_t step; + int8_t callType; - int32_t inputBytes; - char *input; - - int32_t stateBytes; - char *state; + SUdfDataBlock block; + SUdfInterBuf interBuf; + bool initFirst; } SUdfCallRequest; - typedef struct SUdfCallResponse { - int32_t outputBytes; - char *output; - int32_t newStateBytes; - char *newState; + SUdfColumnData resultData; + SUdfInterBuf interBuf; } SUdfCallResponse; @@ -76,7 +76,11 @@ typedef struct SUdfRequest { int64_t seqNum; int8_t type; - void *subReq; + union { + SUdfSetupRequest setup; + SUdfCallRequest call; + SUdfTeardownRequest teardown; + }; } SUdfRequest; typedef struct SUdfResponse { @@ -85,13 +89,17 @@ typedef struct SUdfResponse { int8_t type; int32_t code; - void *subRsp; + union { + SUdfSetupResponse setupRsp; + SUdfCallResponse callRsp; + SUdfTeardownResponse teardownRsp; + }; } SUdfResponse; -int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest); -int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response); +int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest *pRequest); int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request); -int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse **pResponse); +int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse *pResponse); +int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response); #ifdef __cplusplus } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 0cc5b1f92d589bc9f2170b71fd623cfa9f5400b7..d5089ca0ebe53ca8c07e7435e52890e7f8edd6db 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -19,11 +19,17 @@ #include "tudfInt.h" #include "tarray.h" -//TODO: when startup, set thread poll size. add it to cfg +//TODO: when startup, set thread poll size. add it to cfg +//TODO: test for udfd restart //TODO: udfd restart when exist or aborts +//TODO: deal with uv task that has been started and then udfd core dumped //TODO: network error processing. //TODO: add unit test -//TODO: test libuv queue +//TODO: include all global variable under context struct +/* Copyright (c) 2013, Ben Noordhuis + * The QUEUE is copied from queue.h under libuv + * */ + typedef void *QUEUE[2]; /* Private macros. */ @@ -205,194 +211,359 @@ int8_t gUdfcState = UDFC_STATE_INITAL; QUEUE gUdfTaskQueue = {0}; -//TODO: deal with uv task that has been started and then udfd core dumped QUEUE gUvProcTaskQueue = {0}; -int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { - debugPrint("%s", "encoding request"); +int32_t serializeUdfColumnData(SUdfColumnData* data, char* pBuf) { + char* bufBeg = pBuf; + *(int32_t*)pBuf = data->numOfRows; + pBuf += sizeof(int32_t); + *(bool*)pBuf = data->varLengthColumn; + pBuf += sizeof(bool); + + if (!data->varLengthColumn) { + *(int32_t*)pBuf = data->nullBitmapLen; + pBuf += sizeof(int32_t); + memcpy(pBuf, data->nullBitmap, data->nullBitmapLen); + pBuf += data->nullBitmapLen; + *(int32_t*)pBuf = data->dataLen; + pBuf += sizeof(int32_t); + memcpy(pBuf, data->data, data->dataLen); + pBuf += data->dataLen; + } else { + *(int32_t*)pBuf = data->varOffsetsLen; + pBuf += sizeof(int32_t); + memcpy(pBuf, data->varOffsets, data->varOffsetsLen); + pBuf += data->varOffsetsLen; + *(int32_t*)pBuf = data->payloadLen; + pBuf += sizeof(int32_t); + memcpy(pBuf, data->payload, data->payloadLen); + pBuf += data->payloadLen; + } + int32_t len = pBuf - bufBeg; + return len; +} - int len = sizeof(SUdfRequest) - sizeof(void *); - switch (request->type) { - case UDF_TASK_SETUP: { - SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq); - len += sizeof(SUdfSetupRequest) - 1 * sizeof(char *) + setup->pathSize; - break; - } - case UDF_TASK_CALL: { - SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq); - len += sizeof(SUdfCallRequest) - 2 * sizeof(char *) + call->inputBytes + call->stateBytes; - break; - } - case UDF_TASK_TEARDOWN: { - SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq); - len += sizeof(SUdfTeardownRequest); - break; - } - default: - break; +int32_t deserializeUdfColumnData(SUdfColumnData* pData, char* buf) { + char* bufBeg = buf; + pData->numOfRows = *(int32_t*)buf; + buf += sizeof(int32_t); + pData->varLengthColumn = *(bool*)buf; + buf += sizeof(bool); + + if (!pData->varLengthColumn) { + pData->nullBitmapLen = *(int32_t*)buf; + buf += sizeof(int32_t); + + pData->nullBitmap = buf; + buf += pData->nullBitmapLen; + + pData->dataLen = *(int32_t*)buf; + buf += sizeof(int32_t); + + pData->data = buf; + buf += pData->dataLen; + } else { + pData->varOffsetsLen = *(int32_t*)buf; + buf += sizeof(int32_t); + + pData->varOffsets = buf; + buf += pData->varOffsetsLen; + + pData->payloadLen = *(int32_t*)buf; + buf += sizeof(int32_t); + + pData->payload = buf; + buf += pData->payloadLen; } + int32_t len = buf - bufBeg; + return len; +} - char *bufBegin = taosMemoryMalloc(len); - char *buf = bufBegin; +int32_t serializeUdfColumnMeta(SUdfColumnMeta *meta, char* pBuf) { + char* bufBeg = pBuf; + memcpy(pBuf, meta, sizeof(SUdfColumnMeta)); + pBuf += sizeof(SUdfColumnMeta); - //skip msgLen first - buf += sizeof(int32_t); + int32_t len = pBuf - bufBeg; + return len; +} - *(int64_t *) buf = request->seqNum; - buf += sizeof(int64_t); - *(int8_t *) buf = request->type; - buf += sizeof(int8_t); +int32_t deserializeUdfColumnMeta(SUdfColumnMeta *pMeta, char* buf) { + char *bufBegin = buf; + memcpy(pMeta, buf, sizeof(SUdfColumnMeta)); + buf += sizeof(SUdfColumnMeta); - switch (request->type) { - case UDF_TASK_SETUP: { - SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq); - memcpy(buf, setup->udfName, 16); - buf += 16; - *(int8_t *) buf = setup->scriptType; - buf += sizeof(int8_t); - *(int8_t *) buf = setup->udfType; - buf += sizeof(int8_t); - *(int16_t *) buf = setup->pathSize; - buf += sizeof(int16_t); - memcpy(buf, setup->path, setup->pathSize); - buf += setup->pathSize; - break; - } + int32_t len = buf - bufBegin; + return len; +} - case UDF_TASK_CALL: { - SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq); - *(int64_t *) buf = call->udfHandle; - buf += sizeof(int64_t); - *(int8_t *) buf = call->step; - buf += sizeof(int8_t); - *(int32_t *) buf = call->inputBytes; - buf += sizeof(int32_t); - memcpy(buf, call->input, call->inputBytes); - buf += call->inputBytes; - *(int32_t *) buf = call->stateBytes; - buf += sizeof(int32_t); - memcpy(buf, call->state, call->stateBytes); - buf += call->stateBytes; - break; - } +int32_t serializeUdfColumn(SUdfColumn *udfCol, char *pBuf) { + char *bufBegin = pBuf; - case UDF_TASK_TEARDOWN: { - SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq); - *(int64_t *) buf = teardown->udfHandle; - buf += sizeof(int64_t); - break; - } - default: - break; + int32_t len = serializeUdfColumnMeta(&udfCol->colMeta, pBuf); + pBuf += len; + + len = serializeUdfColumnData(&udfCol->colData, pBuf); + pBuf += len; + + int32_t totalLen = pBuf - bufBegin; + return totalLen; +} + +int32_t deserializeUdfColumn(SUdfColumn *pUdfCol, char *buf) { + char *bufBegin = buf; + + int32_t len = deserializeUdfColumnMeta(&pUdfCol->colMeta, buf); + buf += len; + + len = deserializeUdfColumnData(&pUdfCol->colData, buf); + buf += len; + + int32_t totalLen = buf - bufBegin; + return totalLen; +} + +int32_t serializeUdfDataBlock(SUdfDataBlock *block, char *pBuf) { + char *bufBegin = pBuf; + + *(int32_t*)pBuf = block->numOfRows; + pBuf += sizeof(int32_t); + + *(int32_t*)pBuf = block->numOfCols; + pBuf += sizeof(int32_t); + + for (int32_t i = 0; i < block->numOfCols; ++i) { + SUdfColumn* col = block->udfCols + i; + int32_t l = serializeUdfColumn(col, pBuf); + pBuf += l; } - request->msgLen = buf - bufBegin; - *(int32_t *) bufBegin = request->msgLen; - *pBuf = bufBegin; - *pBufLen = request->msgLen; - debugPrint("\tLen: estimate: %d, actual:%d", len, *pBufLen); - return 0; + int32_t totalLen = pBuf - bufBegin; + return totalLen; } -int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) { - debugPrint("%s", "decoding request"); - if (*(int32_t *) bufMsg != bufLen) { - debugPrint("%s", "decoding request error"); - return -1; +int32_t deserailizeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) { + char *bufBegin = buf; + + pBlock->numOfRows = *(int32_t*)buf; + buf += sizeof(int32_t); + + pBlock->numOfCols = *(int32_t*)buf; + buf += sizeof(int32_t); + + for (int32_t i = 0; i < pBlock->numOfCols; ++i) { + int32_t l = deserializeUdfColumn(pBlock->udfCols + i, buf); + buf += l; } - char *buf = bufMsg; - SUdfRequest *request = taosMemoryMalloc(sizeof(SUdfRequest)); - request->subReq = NULL; - request->msgLen = *(int32_t *) (buf); + + int32_t totalLen = buf - bufBegin; + return totalLen; +} + +int32_t serializeUdfInterBuf(SUdfInterBuf *state, char *pBuf) { + char *bufBegin = pBuf; + + *(int32_t*)pBuf = state->bufLen; + pBuf += sizeof(int32_t); + + memcpy(pBuf, state->buf, state->bufLen); + pBuf += state->bufLen; + + return pBuf-bufBegin; +} + +int32_t deserializeUdfInterBuf(SUdfInterBuf *pState, char *buf) { + char* bufBegin = buf; + pState->bufLen = *(int32_t*)buf; buf += sizeof(int32_t); - request->seqNum = *(int64_t *) (buf); - buf += sizeof(int64_t); - request->type = *(int8_t *) (buf); + pState->buf = buf; + buf += pState->bufLen; + return buf - bufBegin; +} + +int32_t serializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) { + char *bufBegin = buf; + + memcpy(buf, setup->udfName, 16); + buf += 16; + *(int8_t *) buf = setup->scriptType; buf += sizeof(int8_t); + *(int8_t *) buf = setup->udfType; + buf += sizeof(int8_t); + *(int16_t *) buf = setup->pathSize; + buf += sizeof(int16_t); + memcpy(buf, setup->path, setup->pathSize); + buf += setup->pathSize; - switch (request->type) { - case UDF_TASK_SETUP: { - SUdfSetupRequest *setup = taosMemoryMalloc(sizeof(SUdfSetupRequest)); - - memcpy(setup->udfName, buf, 16); - buf += 16; - setup->scriptType = *(int8_t *) buf; - buf += sizeof(int8_t); - setup->udfType = *(int8_t *) buf; - buf += sizeof(int8_t); - setup->pathSize = *(int16_t *) buf; - buf += sizeof(int16_t); - setup->path = buf; - buf += setup->pathSize; - - request->subReq = setup; - break; - } - case UDF_TASK_CALL: { - SUdfCallRequest *call = taosMemoryMalloc(sizeof(SUdfCallRequest)); - - call->udfHandle = *(int64_t *) buf; - buf += sizeof(int64_t); - call->step = *(int8_t *) buf; - buf += sizeof(int8_t); - call->inputBytes = *(int32_t *) buf; - buf += sizeof(int32_t); - call->input = buf; - buf += call->inputBytes; - call->stateBytes = *(int32_t *) buf; - buf += sizeof(int32_t); - call->state = buf; - buf += call->stateBytes; - - request->subReq = call; - break; - } + return buf - bufBegin; +}; - case UDF_TASK_TEARDOWN: { - SUdfTeardownRequest *teardown = taosMemoryMalloc(sizeof(SUdfTeardownRequest)); +int32_t deserializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) { + char* bufBegin = buf; + + memcpy(setup->udfName, buf, 16); + buf += 16; + setup->scriptType = *(int8_t *) buf; + buf += sizeof(int8_t); + setup->udfType = *(int8_t *) buf; + buf += sizeof(int8_t); + setup->pathSize = *(int16_t *) buf; + buf += sizeof(int16_t); + setup->path = buf; + buf += setup->pathSize; - teardown->udfHandle = *(int64_t *) buf; - buf += sizeof(int64_t); + return buf - bufBegin; +} - request->subReq = teardown; - } +int32_t serializeUdfTeardownRequest(SUdfTeardownRequest *teardown, char *buf) { + char* bufBegin = buf; + *(int64_t *) buf = teardown->udfHandle; + buf += sizeof(int64_t); - } - if (buf - bufMsg != bufLen) { - debugPrint("%s", "decode request error"); - taosMemoryFree(request->subReq); - taosMemoryFree(request); - return -1; - } - *pRequest = request; + return buf - bufBegin; +} + +int32_t deserializeUdfTeardownRequest(SUdfTeardownRequest *teardown, char *buf) { + char *bufBegin = buf; + teardown->udfHandle = *(int64_t *) buf; + buf += sizeof(int64_t); + return buf - bufBegin; +} + +int32_t serializeUdfSetupResponse(SUdfSetupResponse *setupRsp, char *buf) { + char *bufBegin = buf; + + *(int64_t *) buf = setupRsp->udfHandle; + buf += sizeof(int64_t); + + return buf-bufBegin; +} + +int32_t deserializeUdfSetupResponse(SUdfSetupResponse *setupRsp, char *buf) { + char *bufBegin = buf; + setupRsp->udfHandle = *(int64_t *) buf; + buf += sizeof(int64_t); + return buf-bufBegin; +} + +int32_t serializeUdfTeardownResponse(SUdfTeardownResponse *teardownRsp, char *buf) { return 0; } -int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) { - debugPrint("%s", "encoding response"); +int32_t deserializeUdfTeardownResponse(SUdfTeardownResponse *teardownRsp, char *buf) { + return 0; +} - int32_t len = sizeof(SUdfResponse) - sizeof(void *); +int32_t serializeUdfCallRequest(SUdfCallRequest *call, char *buf) { + char* bufBegin = buf; + *(int64_t *) buf = call->udfHandle; + buf += sizeof(int64_t); + *(int8_t *) buf = call->callType; + buf += sizeof(int8_t); + int32_t l = 0; + l = serializeUdfDataBlock(&call->block, buf); + buf += l; + l = serializeUdfInterBuf(&call->interBuf, buf); + buf += l; - switch (response->type) { - case UDF_TASK_SETUP: { - len += sizeof(SUdfSetupResponse); - break; - } - case UDF_TASK_CALL: { - SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp); - len += sizeof(SUdfCallResponse) - 2 * sizeof(char *) + - callResp->outputBytes + callResp->newStateBytes; - break; - } - case UDF_TASK_TEARDOWN: { - len += sizeof(SUdfTeardownResponse); - break; - } - } + *(bool*)buf = call->initFirst; + buf += sizeof(bool); - char *bufBegin = taosMemoryMalloc(len); - char *buf = bufBegin; + return buf - bufBegin; +} + +int32_t deserializeUdfCallRequest(SUdfCallRequest *call, char *buf) { + char* bufBegin = buf; + call->udfHandle = *(int64_t *) buf; + buf += sizeof(int64_t); + call->callType = *(int8_t *) buf; + buf += sizeof(int8_t); + int32_t l = 0; + l = deserailizeUdfDataBlock(&call->block, buf); + buf += l; + l = deserializeUdfInterBuf(&call->interBuf, buf); + buf += l; + call->initFirst = *(bool*)buf; + buf += sizeof(bool); + return buf - bufBegin; +} + +int32_t serializeUdfCallResponse(SUdfCallResponse *callRsp, char * buf) { + char *bufBegin = buf; + int32_t l = 0; + l = serializeUdfColumnData(&callRsp->resultData, buf); + buf += l; + l = serializeUdfInterBuf(&callRsp->interBuf, buf); + buf += l; + + return buf - bufBegin; +} + +int32_t deserializeUdfCallResponse(SUdfCallResponse *callRsp, char * buf) { + char *bufBegin = buf; + int32_t l = 0; + l =deserializeUdfColumnData(&callRsp->resultData, buf); + buf += l; + l = deserializeUdfInterBuf(&callRsp->interBuf, buf); + buf += l; + return buf - bufBegin; +} + +int32_t serializeUdfRequest(SUdfRequest *request, char *buf) { + char* bufBegin = buf; + //skip msglen first + buf += sizeof(int32_t); + + *(int64_t *) buf = request->seqNum; + buf += sizeof(int64_t); + *(int8_t *) buf = request->type; + buf += sizeof(int8_t); + + int32_t l = 0; + if (request->type == UDF_TASK_SETUP) { + l = serializeUdfSetupRequest(&request->setup, buf); + buf += l; + } else if (request->type == UDF_TASK_CALL) { + l = serializeUdfCallRequest(&request->call, buf); + buf += l; + } else if (request->type == UDF_TASK_TEARDOWN){ + l = serializeUdfTeardownRequest(&request->teardown, buf); + buf += l; + } + *(int32_t*)bufBegin = buf - bufBegin; + return buf - bufBegin; +} + +int32_t deserializeUdfRequest(SUdfRequest *request, char *buf) { + char* bufBegin = buf; + request->msgLen = *(int32_t *) (buf); + buf += sizeof(int32_t); + request->seqNum = *(int64_t *) (buf); + buf += sizeof(int64_t); + request->type = *(int8_t *) (buf); + buf += sizeof(int8_t); + + int32_t l = 0; + if (request->type == UDF_TASK_SETUP) { + l = deserializeUdfSetupRequest(&request->setup, buf); + buf += l; + } else if (request->type == UDF_TASK_CALL) { + l = deserializeUdfCallRequest(&request->call, buf); + buf += l; + } else if (request->type == UDF_TASK_TEARDOWN){ + l = deserializeUdfTeardownRequest(&request->teardown, buf); + buf += l; + } + int32_t totalLen = buf-bufBegin; + if (totalLen != request->msgLen) { + debugPrint("decoding request error"); + return -1; + } + return buf - bufBegin; +} +int32_t serializeUdfResponse(SUdfResponse *response, char *buf) { + char* bufBegin = buf; //skip msgLen buf += sizeof(int32_t); @@ -402,51 +573,30 @@ int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) { buf += sizeof(int8_t); *(int32_t *) buf = response->code; buf += sizeof(int32_t); - - + int32_t l = 0; switch (response->type) { case UDF_TASK_SETUP: { - SUdfSetupResponse *setupResp = (SUdfSetupResponse *) (response->subRsp); - *(int64_t *) buf = setupResp->udfHandle; - buf += sizeof(int64_t); + l = serializeUdfSetupResponse(&response->setupRsp, buf); + buf += l; break; } case UDF_TASK_CALL: { - SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp); - *(int32_t *) buf = callResp->outputBytes; - buf += sizeof(int32_t); - memcpy(buf, callResp->output, callResp->outputBytes); - buf += callResp->outputBytes; - - *(int32_t *) buf = callResp->newStateBytes; - buf += sizeof(int32_t); - memcpy(buf, callResp->newState, callResp->newStateBytes); - buf += callResp->newStateBytes; + l = serializeUdfCallResponse(&response->callRsp, buf); + buf += l; break; } case UDF_TASK_TEARDOWN: { - SUdfTeardownResponse *teardownResp = (SUdfTeardownResponse *) (response->subRsp); - break; + l = serializeUdfTeardownResponse(&response->teardownRsp, buf); + buf += l; } - default: - break; } - response->msgLen = buf - bufBegin; - *(int32_t *) bufBegin = response->msgLen; - *pBuf = bufBegin; - *pBufLen = response->msgLen; - return 0; -} -int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) { - debugPrint("%s", "decoding response"); + *(int32_t*)bufBegin = buf - bufBegin; + return buf - bufBegin; +} - if (*(int32_t *) bufMsg != bufLen) { - debugPrint("%s", "can not decode response"); - return -1; - } - char *buf = bufMsg; - SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse)); +int32_t deserializeUdfResponse(SUdfResponse *rsp, char *buf) { + char* bufBegin = buf; rsp->msgLen = *(int32_t *) buf; buf += sizeof(int32_t); rsp->seqNum = *(int64_t *) buf; @@ -455,47 +605,115 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) { buf += sizeof(int8_t); rsp->code = *(int32_t *) buf; buf += sizeof(int32_t); - + int32_t l = 0; switch (rsp->type) { case UDF_TASK_SETUP: { - SUdfSetupResponse *setupRsp = (SUdfSetupResponse *) taosMemoryMalloc(sizeof(SUdfSetupResponse)); - setupRsp->udfHandle = *(int64_t *) buf; - buf += sizeof(int64_t); - rsp->subRsp = (char *) setupRsp; + l = deserializeUdfSetupResponse(&rsp->setupRsp, buf); + buf += l; break; } case UDF_TASK_CALL: { - SUdfCallResponse *callRsp = (SUdfCallResponse *) taosMemoryMalloc(sizeof(SUdfCallResponse)); - callRsp->outputBytes = *(int32_t *) buf; - buf += sizeof(int32_t); - - callRsp->output = buf; - buf += callRsp->outputBytes; - - callRsp->newStateBytes = *(int32_t *) buf; - buf += sizeof(int32_t); - - callRsp->newState = buf; - buf += callRsp->newStateBytes; - - rsp->subRsp = callRsp; + l = deserializeUdfCallResponse(&rsp->callRsp, buf); + buf += l; break; } case UDF_TASK_TEARDOWN: { - SUdfTeardownResponse *teardownRsp = (SUdfTeardownResponse *) taosMemoryMalloc(sizeof(SUdfTeardownResponse)); - rsp->subRsp = teardownRsp; - break; + l = deserializeUdfTeardownResponse(&rsp->teardownRsp, buf); + buf += l; } - default: - break; } - if (buf - bufMsg != bufLen) { + int32_t total = buf - bufBegin; + if (total != rsp->msgLen) { + debugPrint("decode response error"); + return -1; + } + return buf - bufBegin; +} + +int32_t estimateUdfRequestLen(SUdfRequest *request) { + // a larger estimated is generated + int32_t size = sizeof(SUdfRequest); + if (request->type == UDF_TASK_SETUP) { + size += request->setup.pathSize; + } else if (request->type == UDF_TASK_CALL) { + for (int32_t i = 0; i < request->call.block.numOfCols; ++i) { + SUdfColumn* col = request->call.block.udfCols + i; + if (col->colData.varLengthColumn) { + size += col->colData.varOffsetsLen; + size += col->colData.payloadLen; + } else { + size += col->colData.nullBitmapLen; + size += col->colData.dataLen; + } + } + size += request->call.interBuf.bufLen; + } + return size; +} + +int32_t estimateUdfResponseLen(SUdfResponse *response) { + int32_t size = sizeof(SUdfResponse); + if (response->type == UDF_TASK_CALL) { + size += response->callRsp.interBuf.bufLen; + SUdfColumnData *resultData = &response->callRsp.resultData; + if (!resultData->varLengthColumn) { + size += resultData->nullBitmapLen; + size += resultData->dataLen; + } else { + size += resultData->varOffsetsLen; + size += resultData->payloadLen; + } + } + + return size; +} + +int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { + debugPrint("%s", "encoding request"); + + int len = estimateUdfRequestLen(request); + + char *bufBegin = taosMemoryMalloc(len); + char *buf = bufBegin; + serializeUdfRequest(request, buf); + *pBuf = bufBegin; + *pBufLen = request->msgLen; + debugPrint("\tLen: estimate: %d, actual:%d", len, *pBufLen); + return 0; +} + +int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest *pRequest) { + debugPrint("%s", "decoding request"); + if (*(int32_t *) bufMsg != bufLen) { + debugPrint("%s", "decoding request error"); + return -1; + } + char *buf = bufMsg; + deserializeUdfRequest(pRequest, buf); + return 0; +} + +int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) { + debugPrint("%s", "encoding response"); + int32_t len = estimateUdfResponseLen(response); + + char *bufBegin = taosMemoryMalloc(len); + char *buf = bufBegin; + serializeUdfResponse(response, buf); + *pBuf = bufBegin; + *pBufLen = response->msgLen; + return 0; +} + +int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse *rsp) { + debugPrint("%s", "decoding response"); + + if (*(int32_t *) bufMsg != bufLen) { debugPrint("%s", "can not decode response"); - taosMemoryFree(rsp->subRsp); - taosMemoryFree(rsp); return -1; } - *pResponse = rsp; + char *buf = bufMsg; + deserializeUdfResponse(rsp, buf); return 0; } @@ -519,23 +737,23 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT debugPrint("%s", "get uv task result"); if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->rspBuf.base != NULL) { - SUdfResponse *rsp; + SUdfResponse rsp; decodeResponse(uvTask->rspBuf.base, uvTask->rspBuf.len, &rsp); - task->errCode = rsp->code; + task->errCode = rsp.code; switch (task->type) { case UDF_TASK_SETUP: { //TODO: copy or not - task->_setup.rsp = *(SUdfSetupResponse *) (rsp->subRsp); + task->_setup.rsp = rsp.setupRsp; break; } case UDF_TASK_CALL: { - task->_call.rsp = *(SUdfCallResponse *) (rsp->subRsp); + task->_call.rsp = rsp.callRsp; //TODO: copy or not break; } case UDF_TASK_TEARDOWN: { - task->_teardown.rsp = *(SUdfTeardownResponse *) (rsp->subRsp); + task->_teardown.rsp = rsp.teardownRsp; //TODO: copy or not? break; } @@ -546,8 +764,6 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT // TODO: the call buffer is setup and freed by udf invocation taosMemoryFree(uvTask->rspBuf.base); - taosMemoryFree(rsp->subRsp); - taosMemoryFree(rsp); } else { task->errCode = uvTask->errCode; } @@ -714,13 +930,13 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN request.seqNum = gUdfTaskSeqNum++; if (task->type == UDF_TASK_SETUP) { - request.subReq = &task->_setup.req; + request.setup = task->_setup.req; request.type = UDF_TASK_SETUP; } else if (task->type == UDF_TASK_CALL) { - request.subReq = &task->_call.req; + request.call = task->_call.req; request.type = UDF_TASK_CALL; } else if (task->type == UDF_TASK_TEARDOWN) { - request.subReq = &task->_teardown.req; + request.teardown = task->_teardown.req; request.type = UDF_TASK_TEARDOWN; } else { //TODO log and return error @@ -947,8 +1163,7 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { udfcGetUvTaskResponseResult(task, uvTask); if (uvTaskType == UV_TASK_CONNECT) { task->session->udfSvcPipe = uvTask->pipe; - } - taosMemoryFree(uvTask); + } taosMemoryFree(uvTask); uvTask = NULL; return task->errCode; } @@ -983,8 +1198,8 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) { return err; } -int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newState, - int32_t *newStateSize, SUdfDataBlock *output) { +int32_t callUdf(UdfHandle handle, int8_t callType, SUdfDataBlock *input, SUdfInterBuf *state, + SUdfColumnData* output, SUdfInterBuf *newState, bool initFirst) { debugPrint("%s", "client call udf"); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); @@ -993,24 +1208,47 @@ int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, S task->type = UDF_TASK_CALL; SUdfCallRequest *req = &task->_call.req; + switch (callType) { + case TSDB_UDF_CALL_AGG_PROC: { + req->block = *input; + req->interBuf = *state; + req->initFirst = initFirst; + break; + } + + case TSDb_UDF_CALL_AGG_FIN: { + req->interBuf = *state; + break; + } + case TSDB_UDF_CALL_SCALA_PROC: { + req->block = *input; + break; + } + } + - req->state = state; - req->stateBytes = stateSize; - req->inputBytes = input.size; - req->input = input.data; - req->udfHandle = task->session->severHandle; - req->step = step; udfcRunUvTask(task, UV_TASK_REQ_RSP); SUdfCallResponse *rsp = &task->_call.rsp; - *newState = rsp->newState; - *newStateSize = rsp->newStateBytes; - output->size = rsp->outputBytes; - output->data = rsp->output; - int32_t err = task->errCode; + switch (callType) { + case TSDB_UDF_CALL_AGG_PROC: { + *newState = rsp->interBuf; + break; + } + + case TSDb_UDF_CALL_AGG_FIN: { + *output = rsp->resultData; + break; + } + case TSDB_UDF_CALL_SCALA_PROC: { + *output = rsp->resultData; + break; + } + } + taosMemoryFree(task); - return err; + return task->errCode; } int32_t teardownUdf(UdfHandle handle) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 2d9344709a8a281f9c1eb4a1d19af354390eff61..41995c819265246d45a4134d386b8ea23bf66b6b 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -44,7 +44,7 @@ typedef struct SUdf { int8_t type; uv_lib_t lib; - TUdfFunc normalFunc; + TUdfScalarProcFunc normalFunc; } SUdf; //TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix