提交 f8dd3a13 编写于 作者: S slzhou

feature(udf):refactor code and fix bugs

上级 f54cdb02
...@@ -44,7 +44,8 @@ enum { ...@@ -44,7 +44,8 @@ enum {
UDFC_CODE_PIPE_READ_ERR = -2, UDFC_CODE_PIPE_READ_ERR = -2,
UDFC_CODE_CONNECT_PIPE_ERR = -3, UDFC_CODE_CONNECT_PIPE_ERR = -3,
UDFC_CODE_LOAD_UDF_FAILURE = -4, UDFC_CODE_LOAD_UDF_FAILURE = -4,
UDFC_CODE_INVALID_STATE = -5 UDFC_CODE_INVALID_STATE = -5,
UDFC_CODE_NO_PIPE = -6,
}; };
typedef void *UdfcFuncHandle; typedef void *UdfcFuncHandle;
...@@ -140,6 +141,44 @@ typedef int32_t (*TUdfDestroyFunc)(); ...@@ -140,6 +141,44 @@ typedef int32_t (*TUdfDestroyFunc)();
#define UDF_MEMORY_EXP_GROWTH 1.5 #define UDF_MEMORY_EXP_GROWTH 1.5
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
} while (0)
#define udfColDataSetNotNull_f(pColumn, r_) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
} else {
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
}
}
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
if (udfColDataIsNull_var(pColumn, row)) {
return true;
}
char* data = udfColDataGetData(pColumn, row);
return (*data == TSDB_DATA_TYPE_NULL);
} else {
return udfColDataIsNull_var(pColumn, row);
}
} else {
return udfColDataIsNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) { static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta; SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData; SUdfColumnData *data = &pColumn->colData;
...@@ -186,17 +225,22 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t ne ...@@ -186,17 +225,22 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t ne
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) { static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row+1);
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row);
} else {
udfColDataSetNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta; SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData; SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1); udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type); bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) { if (isNull) {
if (isVarCol) { udfColDataSetNull(pColumn, currentRow);
data->varLenCol.varOffsets[currentRow] = -1;
} else {
colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow);
}
} else { } else {
if (!isVarCol) { if (!isVarCol) {
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow); colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
......
...@@ -146,15 +146,15 @@ typedef struct SUdfdProxy { ...@@ -146,15 +146,15 @@ typedef struct SUdfdProxy {
SUdfdProxy gUdfdProxy = {0}; SUdfdProxy gUdfdProxy = {0};
typedef struct SUdfUvSession { typedef struct SClientUdfUvSession {
SUdfdProxy *udfc; SUdfdProxy *udfc;
int64_t severHandle; int64_t severHandle;
uv_pipe_t *udfSvcPipe; uv_pipe_t *udfUvPipe;
int8_t outputType; int8_t outputType;
int32_t outputLen; int32_t outputLen;
int32_t bufSize; int32_t bufSize;
} SUdfUvSession; } SClientUdfUvSession;
typedef struct SClientUvTaskNode { typedef struct SClientUvTaskNode {
SUdfdProxy *udfc; SUdfdProxy *udfc;
...@@ -177,7 +177,7 @@ typedef struct SClientUvTaskNode { ...@@ -177,7 +177,7 @@ typedef struct SClientUvTaskNode {
typedef struct SClientUdfTask { typedef struct SClientUdfTask {
int8_t type; int8_t type;
SUdfUvSession *session; SClientUdfUvSession *session;
int32_t errCode; int32_t errCode;
...@@ -209,6 +209,7 @@ typedef struct SClientUvConn { ...@@ -209,6 +209,7 @@ typedef struct SClientUvConn {
uv_pipe_t *pipe; uv_pipe_t *pipe;
QUEUE taskQueue; QUEUE taskQueue;
SClientConnBuf readBuf; SClientConnBuf readBuf;
SClientUdfUvSession *session;
} SClientUvConn; } SClientUvConn;
enum { enum {
...@@ -617,18 +618,17 @@ void onUdfcPipeClose(uv_handle_t *handle) { ...@@ -617,18 +618,17 @@ void onUdfcPipeClose(uv_handle_t *handle) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue); QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = 0; task->errCode = 0;
uv_sem_post(&task->taskSem);
QUEUE_REMOVE(&task->procTaskQueue); QUEUE_REMOVE(&task->procTaskQueue);
uv_sem_post(&task->taskSem);
} }
conn->session->udfUvPipe = NULL;
taosMemoryFree(conn->readBuf.buf); taosMemoryFree(conn->readBuf.buf);
taosMemoryFree(conn); taosMemoryFree(conn);
taosMemoryFree((uv_pipe_t *) handle); taosMemoryFree((uv_pipe_t *) handle);
} }
int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) { int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
fnDebug("udfc get uv task result. task: %p", task); fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) { if (uvTask->rspBuf.base != NULL) {
SUdfResponse rsp; SUdfResponse rsp;
...@@ -748,8 +748,8 @@ void udfcUvHandleRsp(SClientUvConn *conn) { ...@@ -748,8 +748,8 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
if (taskFound) { if (taskFound) {
taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len); taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
QUEUE_REMOVE(&taskFound->connTaskQueue); QUEUE_REMOVE(&taskFound->connTaskQueue);
uv_sem_post(&taskFound->taskSem);
QUEUE_REMOVE(&taskFound->procTaskQueue); QUEUE_REMOVE(&taskFound->procTaskQueue);
uv_sem_post(&taskFound->taskSem);
} else { } else {
fnError("no task is waiting for the response."); fnError("no task is waiting for the response.");
} }
...@@ -764,14 +764,12 @@ void udfcUvHandleError(SClientUvConn *conn) { ...@@ -764,14 +764,12 @@ void udfcUvHandleError(SClientUvConn *conn) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue); QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = UDFC_CODE_PIPE_READ_ERR; task->errCode = UDFC_CODE_PIPE_READ_ERR;
uv_sem_post(&task->taskSem); QUEUE_REMOVE(&task->connTaskQueue);
QUEUE_REMOVE(&task->procTaskQueue); QUEUE_REMOVE(&task->procTaskQueue);
uv_sem_post(&task->taskSem);
} }
uv_close((uv_handle_t *) conn->pipe, NULL); uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
taosMemoryFree(conn->pipe);
taosMemoryFree(conn->readBuf.buf);
taosMemoryFree(conn);
} }
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
...@@ -788,9 +786,9 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { ...@@ -788,9 +786,9 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
} }
if (nread < 0) { if (nread < 0) {
fnError("udfc client pipe %p read error: %s", client, uv_strerror(nread)); fnError("udfc client pipe %p read error: %zd, %s.", client, nread, uv_strerror(nread));
if (nread == UV_EOF) { if (nread == UV_EOF) {
fnError("udfc client pipe %p closed", client); fnError("\tudfc client pipe %p closed", client);
} }
udfcUvHandleError(conn); udfcUvHandleError(conn);
} }
...@@ -823,14 +821,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) { ...@@ -823,14 +821,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) {
QUEUE_REMOVE(&uvTask->procTaskQueue); QUEUE_REMOVE(&uvTask->procTaskQueue);
} }
int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
uvTask->type = uvTaskType; uvTask->type = uvTaskType;
uvTask->udfc = task->session->udfc; uvTask->udfc = task->session->udfc;
if (uvTaskType == UV_TASK_CONNECT) { if (uvTaskType == UV_TASK_CONNECT) {
} else if (uvTaskType == UV_TASK_REQ_RSP) { } else if (uvTaskType == UV_TASK_REQ_RSP) {
uvTask->pipe = task->session->udfSvcPipe; uvTask->pipe = task->session->udfUvPipe;
SUdfRequest request; SUdfRequest request;
request.type = task->type; request.type = task->type;
request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1); request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);
...@@ -855,7 +853,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN ...@@ -855,7 +853,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
uvTask->reqBuf = uv_buf_init(bufBegin, bufLen); uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
uvTask->seqNum = request.seqNum; uvTask->seqNum = request.seqNum;
} else if (uvTaskType == UV_TASK_DISCONNECT) { } else if (uvTaskType == UV_TASK_DISCONNECT) {
uvTask->pipe = task->session->udfSvcPipe; uvTask->pipe = task->session->udfUvPipe;
} }
uv_sem_init(&uvTask->taskSem, 0); uv_sem_init(&uvTask->taskSem, 0);
...@@ -863,7 +861,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN ...@@ -863,7 +861,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
return 0; return 0;
} }
int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask); fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
SUdfdProxy *udfc = uvTask->udfc; SUdfdProxy *udfc = uvTask->udfc;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
...@@ -872,12 +870,13 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -872,12 +870,13 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
uv_async_send(&udfc->gUdfLoopTaskAync); uv_async_send(&udfc->gUdfLoopTaskAync);
uv_sem_wait(&uvTask->taskSem); uv_sem_wait(&uvTask->taskSem);
fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask);
uv_sem_destroy(&uvTask->taskSem); uv_sem_destroy(&uvTask->taskSem);
return 0; return 0;
} }
int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask); fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
switch (uvTask->type) { switch (uvTask->type) {
case UV_TASK_CONNECT: { case UV_TASK_CONNECT: {
...@@ -885,7 +884,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -885,7 +884,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0); uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0);
uvTask->pipe = pipe; uvTask->pipe = pipe;
SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn)); SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
conn->pipe = pipe; conn->pipe = pipe;
conn->readBuf.len = 0; conn->readBuf.len = 0;
conn->readBuf.cap = 0; conn->readBuf.cap = 0;
...@@ -933,13 +932,14 @@ void udfClientAsyncCb(uv_async_t *async) { ...@@ -933,13 +932,14 @@ void udfClientAsyncCb(uv_async_t *async) {
QUEUE* h = QUEUE_HEAD(&wq); QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
startUvUdfTask(task); udfcStartUvTask(task);
QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue); QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue);
} }
} }
void cleanUpUvTasks(SUdfdProxy *udfc) { void cleanUpUvTasks(SUdfdProxy *udfc) {
fnDebug("clean up uv tasks")
QUEUE wq; QUEUE wq;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
...@@ -956,7 +956,6 @@ void cleanUpUvTasks(SUdfdProxy *udfc) { ...@@ -956,7 +956,6 @@ void cleanUpUvTasks(SUdfdProxy *udfc) {
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
} }
// TODO: deal with tasks that are waiting result.
while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) { while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue); QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
...@@ -1027,14 +1026,16 @@ int32_t udfcClose() { ...@@ -1027,14 +1026,16 @@ int32_t udfcClose() {
return 0; return 0;
} }
int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
SClientUvTaskNode *uvTask = NULL; SClientUvTaskNode *uvTask = NULL;
createUdfcUvTask(task, uvTaskType, &uvTask); udfcCreateUvTask(task, uvTaskType, &uvTask);
queueUvUdfTask(uvTask); udfcQueueUvTask(uvTask);
udfcGetUvTaskResponseResult(task, uvTask); udfcGetUdfTaskResultFromUvTask(task, uvTask);
if (uvTaskType == UV_TASK_CONNECT) { if (uvTaskType == UV_TASK_CONNECT) {
task->session->udfSvcPipe = uvTask->pipe; task->session->udfUvPipe = uvTask->pipe;
SClientUvConn *conn = uvTask->pipe->data;
conn->session = task->session;
} }
taosMemoryFree(uvTask); taosMemoryFree(uvTask);
uvTask = NULL; uvTask = NULL;
...@@ -1046,22 +1047,22 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { ...@@ -1046,22 +1047,22 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) { if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
return UDFC_CODE_INVALID_STATE; return UDFC_CODE_INVALID_STATE;
} }
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
task->session = taosMemoryMalloc(sizeof(SUdfUvSession)); task->session = taosMemoryCalloc(1, sizeof(SClientUdfUvSession));
task->session->udfc = &gUdfdProxy; task->session->udfc = &gUdfdProxy;
task->type = UDF_TASK_SETUP; task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req; SUdfSetupRequest *req = &task->_setup.req;
memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT); int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) { if (errCode != 0) {
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName); fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
return UDFC_CODE_CONNECT_PIPE_ERR; return UDFC_CODE_CONNECT_PIPE_ERR;
} }
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
SUdfSetupResponse *rsp = &task->_setup.rsp; SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle; task->session->severHandle = rsp->udfHandle;
...@@ -1082,10 +1083,14 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { ...@@ -1082,10 +1083,14 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) { SSDataBlock* output, SUdfInterBuf *newState) {
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle); fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); if (session->udfUvPipe == NULL) {
fnError("No pipe to udfd");
return UDFC_CODE_NO_PIPE;
}
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
task->session = (SUdfUvSession *) handle; task->session = (SClientUdfUvSession *) handle;
task->type = UDF_TASK_CALL; task->type = UDF_TASK_CALL;
SUdfCallRequest *req = &task->_call.req; SUdfCallRequest *req = &task->_call.req;
...@@ -1117,7 +1122,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf ...@@ -1117,7 +1122,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
} }
} }
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
if (task->errCode != 0) { if (task->errCode != 0) {
fnError("call udf failure. err: %d", task->errCode); fnError("call udf failure. err: %d", task->errCode);
...@@ -1145,9 +1150,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf ...@@ -1145,9 +1150,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
break; break;
} }
} }
} };
int err = task->errCode;
taosMemoryFree(task); taosMemoryFree(task);
return task->errCode; return err;
} }
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
...@@ -1188,28 +1194,36 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu ...@@ -1188,28 +1194,36 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu
convertScalarParamToDataBlock(input, numOfCols, &inputBlock); convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
SSDataBlock resultBlock = {0}; SSDataBlock resultBlock = {0};
int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL); int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
convertDataBlockToScalarParm(&resultBlock, output); if (err == 0) {
convertDataBlockToScalarParm(&resultBlock, output);
}
return err; return err;
} }
int32_t teardownUdf(UdfcFuncHandle handle) { int32_t teardownUdf(UdfcFuncHandle handle) {
fnInfo("tear down udf. udf func handle: %p", handle); fnInfo("tear down udf. udf func handle: %p", handle);
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
if (session->udfUvPipe == NULL) {
fnError("pipe to udfd does not exist");
return UDFC_CODE_NO_PIPE;
}
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
task->session = (SUdfUvSession *) handle; task->session = session;
task->type = UDF_TASK_TEARDOWN; task->type = UDF_TASK_TEARDOWN;
SUdfTeardownRequest *req = &task->_teardown.req; SUdfTeardownRequest *req = &task->_teardown.req;
req->udfHandle = task->session->severHandle; req->udfHandle = task->session->severHandle;
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
SUdfTeardownResponse *rsp = &task->_teardown.rsp; SUdfTeardownResponse *rsp = &task->_teardown.rsp;
int32_t err = task->errCode; int32_t err = task->errCode;
udfcRunUvTask(task, UV_TASK_DISCONNECT); udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
taosMemoryFree(task->session); taosMemoryFree(task->session);
taosMemoryFree(task); taosMemoryFree(task);
...@@ -1219,7 +1233,7 @@ int32_t teardownUdf(UdfcFuncHandle handle) { ...@@ -1219,7 +1233,7 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----| //memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes { typedef struct SUdfAggRes {
SUdfUvSession *session; SClientUdfUvSession *session;
int8_t finalResNum; int8_t finalResNum;
int8_t interResNum; int8_t interResNum;
char* finalResBuf; char* finalResBuf;
...@@ -1242,7 +1256,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult ...@@ -1242,7 +1256,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
if (setupUdf((char*)pCtx->udfName, &handle) != 0) { if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
return false; return false;
} }
SUdfUvSession *session = (SUdfUvSession *)handle; SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo); SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
memset(udfRes, 0, envSize); memset(udfRes, 0, envSize);
...@@ -1250,7 +1264,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult ...@@ -1250,7 +1264,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
udfRes->session = (SUdfUvSession *)handle; udfRes->session = (SClientUdfUvSession *)handle;
SUdfInterBuf buf = {0}; SUdfInterBuf buf = {0};
if (callUdfAggInit(handle, &buf) != 0) { if (callUdfAggInit(handle, &buf) != 0) {
return false; return false;
...@@ -1265,7 +1279,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { ...@@ -1265,7 +1279,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t numOfCols = pInput->numOfInputCols; int32_t numOfCols = pInput->numOfInputCols;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SUdfUvSession *session = udfRes->session; SClientUdfUvSession *session = udfRes->session;
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
...@@ -1315,7 +1329,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { ...@@ -1315,7 +1329,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SUdfUvSession *session = udfRes->session; SClientUdfUvSession *session = udfRes->session;
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
......
...@@ -26,11 +26,18 @@ int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { ...@@ -26,11 +26,18 @@ int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData; SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows; resultData->numOfRows = block->numOfRows;
SUdfColumnData *srcData = &block->udfCols[0]->colData;
for (int32_t i = 0; i < resultData->numOfRows; ++i) { for (int32_t i = 0; i < resultData->numOfRows; ++i) {
int32_t luckyNum = 88; int j = 0;
udfColSetRow(resultCol, i, (char*)&luckyNum, false); for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
}
if ( j == block->numOfCols) {
int32_t luckyNum = 88;
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
}
} }
return 0; return 0;
......
...@@ -26,24 +26,34 @@ int32_t udf2_start(SUdfInterBuf *buf) { ...@@ -26,24 +26,34 @@ int32_t udf2_start(SUdfInterBuf *buf) {
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf; int64_t sumSquares = *(int64_t*)interBuf->buf;
int8_t numOutput = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) { for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i]; SUdfColumn* col = block->udfCols[i];
//TODO: check the bitmap for null value if (udfColDataIsNull(col, j)) {
int32_t* rows = (int32_t*)col->colData.fixLenCol.data; continue;
sumSquares += rows[j] * rows[j]; }
char* cell = udfColDataGetData(col, j);
int32_t num = *(int32_t*)cell;
sumSquares += num * num;
numOutput = 1;
} }
} }
*(int64_t*)(newInterBuf->buf) = sumSquares; if (numOutput == 1) {
newInterBuf->bufLen = sizeof(int64_t); *(int64_t*)(newInterBuf->buf) = sumSquares;
//TODO: if all null value, numOfResult = 0; newInterBuf->bufLen = sizeof(int64_t);
newInterBuf->numOfResult = 1; }
newInterBuf->numOfResult = numOutput;
return 0; return 0;
} }
int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) { int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
//TODO: check numOfResults; if (buf->numOfResult == 0) {
resultData->numOfResult = 0;
return 0;
}
int64_t sumSquares = *(int64_t*)(buf->buf); int64_t sumSquares = *(int64_t*)(buf->buf);
*(double*)(resultData->buf) = sqrt(sumSquares); *(double*)(resultData->buf) = sqrt(sumSquares);
resultData->bufLen = sizeof(double); resultData->bufLen = sizeof(double);
......
...@@ -66,4 +66,4 @@ endi ...@@ -66,4 +66,4 @@ endi
#sql drop function udf1; #sql drop function udf1;
#sql drop function udf2; #sql drop function udf2;
system sh/exec.sh -n dnode1 -s stop -x SIGKILL system sh/exec.sh -n dnode1 -s stop -x SIGTERM
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册