未验证 提交 72ad0453 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12827 from taosdata/feature/udf

feat: udf code refactoring
......@@ -24,7 +24,6 @@
#include "builtinsimpl.h"
#include "functionMgt.h"
//TODO: add unit test
typedef struct SUdfdData {
bool startCalled;
bool needCleanUp;
......@@ -45,7 +44,15 @@ typedef struct SUdfdData {
SUdfdData udfdGlobal = {0};
int32_t udfStartUdfd(int32_t startDnodeId);
int32_t udfStopUdfd();
static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal);
static int32_t udfSpawnUdfd(SUdfdData* pData);
static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg);
static void udfUdfdStopAsyncCb(uv_async_t *async);
static void udfWatchUdfd(void *args);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
......@@ -413,6 +420,34 @@ enum {
UDFC_STATE_STOPPING, // stopping after udfcClose
};
int32_t getUdfdPipeName(char* pipeName, int32_t size);
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request);
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state);
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state);
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call);
void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call);
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown);
void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown);
int32_t encodeUdfRequest(void** buf, const SUdfRequest* request);
void* decodeUdfRequest(const void* buf, SUdfRequest* request);
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp);
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp);
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp);
void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp);
int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp);
void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse);
int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp);
void* decodeUdfResponse(const void* buf, SUdfResponse* rsp);
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
void freeUdfColumn(SUdfColumn* col);
void freeUdfDataDataBlock(SUdfDataBlock *block);
void freeUdfInterBuf(SUdfInterBuf *buf);
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);
int32_t getUdfdPipeName(char* pipeName, int32_t size) {
char dnodeId[8] = {0};
size_t dnodeIdSize = sizeof(dnodeId);
......@@ -650,7 +685,7 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
break;
default:
//TODO: log error
fnError("encode udf response, invalid udf response type %d", rsp->type);
break;
}
return len;
......@@ -676,7 +711,7 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
break;
default:
//TODO: log error
fnError("decode udf response, invalid udf response type %d", rsp->type);
break;
}
return (void*)buf;
......@@ -817,189 +852,499 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
return 0;
}
void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data;
if (!QUEUE_EMPTY(&conn->taskQueue)) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = 0;
QUEUE_REMOVE(&task->procTaskQueue);
uv_sem_post(&task->taskSem);
}
conn->session->udfUvPipe = NULL;
taosMemoryFree(conn->readBuf.buf);
taosMemoryFree(conn);
taosMemoryFree((uv_pipe_t *) handle);
}
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) {
SUdfResponse rsp = {0};
void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
task->errCode = rsp.code;
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes {
int8_t finalResNum;
int8_t interResNum;
char* finalResBuf;
char* interResBuf;
} SUdfAggRes;
void onUdfcPipeClose(uv_handle_t *handle);
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf);
void udfcUvHandleRsp(SClientUvConn *conn);
void udfcUvHandleError(SClientUvConn *conn);
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
void onUdfcPipetWrite(uv_write_t *write, int status);
void onUdfcPipeConnect(uv_connect_t *connect, int status);
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask);
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
void udfcAsyncTaskCb(uv_async_t *async);
void cleanUpUvTasks(SUdfcProxy *udfc);
void udfStopAsyncCb(uv_async_t *async);
void constructUdfService(void *argsThread);
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
int compareUdfcFuncSub(const void* elem1, const void* elem2);
int32_t doTeardownUdf(UdfcFuncHandle handle);
switch (task->type) {
case UDF_TASK_SETUP: {
//TODO: copy or not
task->_setup.rsp = rsp.setupRsp;
break;
}
case UDF_TASK_CALL: {
task->_call.rsp = rsp.callRsp;
//TODO: copy or not
break;
}
case UDF_TASK_TEARDOWN: {
task->_teardown.rsp = rsp.teardownRsp;
//TODO: copy or not?
break;
}
default: {
break;
}
}
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState);
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
int32_t udfcOpen();
int32_t udfcClose();
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle);
void releaseUdfFuncHandle(char* udfName);
int32_t cleanUpUdfs();
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
// TODO: the call buffer is setup and freed by udf invocation
taosMemoryFree(uvTask->rspBuf.base);
} else {
task->errCode = uvTask->errCode;
}
} else if (uvTask->type == UV_TASK_CONNECT) {
task->errCode = uvTask->errCode;
} else if (uvTask->type == UV_TASK_DISCONNECT) {
task->errCode = uvTask->errCode;
}
return 0;
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
return strcmp(stub1->udfName, stub2->udfName);
}
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
SClientUvConn *conn = handle->data;
SClientConnBuf *connBuf = &conn->readBuf;
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
if (connBuf->cap == 0) {
connBuf->buf = taosMemoryMalloc(msgHeadSize);
if (connBuf->buf) {
connBuf->len = 0;
connBuf->cap = msgHeadSize;
connBuf->total = -1;
buf->base = connBuf->buf;
buf->len = connBuf->cap;
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
int32_t code = 0;
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (stubIndex != -1) {
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
UdfcFuncHandle handle = foundStub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
*pHandle = foundStub->handle;
++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
} else {
fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
buf->base = NULL;
buf->len = 0;
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
udfName, foundStub->refCount, foundStub->lastRefTime);
taosArrayRemove(gUdfdProxy.udfStubs, stubIndex);
}
}
*pHandle = NULL;
code = doSetupUdf(udfName, pHandle);
if (code == TSDB_CODE_SUCCESS) {
SUdfcFuncStub stub = {0};
strcpy(stub.udfName, udfName);
stub.handle = *pHandle;
++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else {
connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
void *resultBuf = taosMemoryRealloc(connBuf->buf, connBuf->cap);
if (resultBuf) {
connBuf->buf = resultBuf;
buf->base = connBuf->buf + connBuf->len;
buf->len = connBuf->cap - connBuf->len;
} else {
fnError("udfc re-allocate buffer failure. size: %d", connBuf->cap);
buf->base = NULL;
buf->len = 0;
}
*pHandle = NULL;
}
fnTrace("conn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return code;
}
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
connBuf->total = *(int32_t *) (connBuf->buf);
void releaseUdfFuncHandle(char* udfName) {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub) {
return;
}
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
fnTrace("udfc complete message is received, now handle it");
return true;
if (foundStub->refCount > 0) {
--foundStub->refCount;
}
return false;
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}
void udfcUvHandleRsp(SClientUvConn *conn) {
SClientConnBuf *connBuf = &conn->readBuf;
int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen then seqnum
if (QUEUE_EMPTY(&conn->taskQueue)) {
fnError("udfc no task waiting for response on connection");
return;
}
bool found = false;
SClientUvTaskNode *taskFound = NULL;
QUEUE* h = QUEUE_NEXT(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
while (h != &conn->taskQueue) {
if (task->seqNum == seqNum) {
if (found == false) {
found = true;
taskFound = task;
int32_t cleanUpUdfs() {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0;
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub);
} else {
fnError("udfc more than one task waiting for the same response");
continue;
fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
stub->udfName, stub->refCount, stub->lastRefTime);
}
}
h = QUEUE_NEXT(h);
task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
++i;
}
taosArrayDestroy(gUdfdProxy.udfStubs);
gUdfdProxy.udfStubs = udfStubs;
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
}
if (taskFound) {
taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
QUEUE_REMOVE(&taskFound->connTaskQueue);
QUEUE_REMOVE(&taskFound->procTaskQueue);
uv_sem_post(&taskFound->taskSem);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
UdfcFuncHandle handle = NULL;
int32_t code = acquireUdfFuncHandle(udfName, &handle);
if (code != 0) {
return code;
}
SUdfcUvSession *session = handle;
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (output->columnData == NULL) {
fnError("udfc scalar function calculate error. no column data");
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
} else {
fnError("no task is waiting for the response.");
if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) {
fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType,
session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
connBuf->buf = NULL;
connBuf->total = -1;
connBuf->len = 0;
connBuf->cap = 0;
releaseUdfFuncHandle(udfName);
return code;
}
void udfcUvHandleError(SClientUvConn *conn) {
while (!QUEUE_EMPTY(&conn->taskQueue)) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
QUEUE_REMOVE(&task->connTaskQueue);
QUEUE_REMOVE(&task->procTaskQueue);
uv_sem_post(&task->taskSem);
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
if (fmIsScalarFunc(pFunc->funcId)) {
return false;
}
uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
return true;
}
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
if (nread == 0) return;
SClientUvConn *conn = client->data;
SClientConnBuf *connBuf = &conn->readBuf;
if (nread > 0) {
connBuf->len += nread;
if (isUdfcUvMsgComplete(connBuf)) {
udfcUvHandleRsp(conn);
}
}
if (nread < 0) {
fnError("udfc client pipe %p read error: %zd, %s.", client, nread, uv_strerror(nread));
if (nread == UV_EOF) {
fnError("\tudfc client pipe %p closed", client);
}
udfcUvHandleError(conn);
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
if (functionSetup(pCtx, pResultCellInfo) != true) {
return false;
}
UdfcFuncHandle handle;
int32_t udfCode = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
return false;
}
SUdfcUvSession *session = (SUdfcUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
memset(udfRes, 0, envSize);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
udfRes->interResNum = buf.numOfResult;
if (buf.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
releaseUdfFuncHandle(pCtx->udfName);
freeUdfInterBuf(&buf);
return true;
}
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t udfCode = 0;
UdfcFuncHandle handle = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
return udfCode;
}
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SInputColumnInfoData* pInput = &pCtx->input;
int32_t numOfCols = pInput->numOfInputCols;
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
SSDataBlock tempBlock = {0};
tempBlock.info.numOfCols = numOfCols;
tempBlock.info.rows = pInput->totalRows;
tempBlock.info.uid = pInput->uid;
bool hasVarCol = false;
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *col = pInput->pData[i];
if (IS_VAR_DATA_TYPE(col->info.type)) {
hasVarCol = true;
}
taosArrayPush(tempBlock.pDataBlock, col);
}
tempBlock.info.hasVarCol = hasVarCol;
SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
if (udfCode != 0) {
fnError("udfAggProcess error. code: %d", udfCode);
newState.numOfResult = 0;
} else {
udfRes->interResNum = newState.numOfResult;
if (newState.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
}
}
if (newState.numOfResult == 1 || state.numOfResult == 1) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock);
releaseUdfFuncHandle(pCtx->udfName);
freeUdfInterBuf(&newState);
return udfCode;
}
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
int32_t udfCode = 0;
UdfcFuncHandle handle = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
return udfCode;
}
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf resultBuf = {0};
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
int32_t udfCallCode= 0;
udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode != 0) {
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
GET_RES_INFO(pCtx)->numOfRes = 0;
} else {
if (resultBuf.bufLen <= session->outputLen) {
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
udfRes->finalResNum = resultBuf.numOfResult;
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
} else {
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen);
GET_RES_INFO(pCtx)->numOfRes = 0;
udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
freeUdfInterBuf(&resultBuf);
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName);
return udfCallCode == 0 ? numOfResults : udfCallCode;
}
void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data;
if (!QUEUE_EMPTY(&conn->taskQueue)) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = 0;
QUEUE_REMOVE(&task->procTaskQueue);
uv_sem_post(&task->taskSem);
}
conn->session->udfUvPipe = NULL;
taosMemoryFree(conn->readBuf.buf);
taosMemoryFree(conn);
taosMemoryFree((uv_pipe_t *) handle);
}
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) {
SUdfResponse rsp = {0};
void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
task->errCode = rsp.code;
switch (task->type) {
case UDF_TASK_SETUP: {
task->_setup.rsp = rsp.setupRsp;
break;
}
case UDF_TASK_CALL: {
task->_call.rsp = rsp.callRsp;
break;
}
case UDF_TASK_TEARDOWN: {
task->_teardown.rsp = rsp.teardownRsp;
break;
}
default: {
break;
}
}
// TODO: the call buffer is setup and freed by udf invocation
taosMemoryFree(uvTask->rspBuf.base);
} else {
task->errCode = uvTask->errCode;
}
} else if (uvTask->type == UV_TASK_CONNECT) {
task->errCode = uvTask->errCode;
} else if (uvTask->type == UV_TASK_DISCONNECT) {
task->errCode = uvTask->errCode;
}
return 0;
}
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
SClientUvConn *conn = handle->data;
SClientConnBuf *connBuf = &conn->readBuf;
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
if (connBuf->cap == 0) {
connBuf->buf = taosMemoryMalloc(msgHeadSize);
if (connBuf->buf) {
connBuf->len = 0;
connBuf->cap = msgHeadSize;
connBuf->total = -1;
buf->base = connBuf->buf;
buf->len = connBuf->cap;
} else {
fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
buf->base = NULL;
buf->len = 0;
}
} else {
connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
void *resultBuf = taosMemoryRealloc(connBuf->buf, connBuf->cap);
if (resultBuf) {
connBuf->buf = resultBuf;
buf->base = connBuf->buf + connBuf->len;
buf->len = connBuf->cap - connBuf->len;
} else {
fnError("udfc re-allocate buffer failure. size: %d", connBuf->cap);
buf->base = NULL;
buf->len = 0;
}
}
fnTrace("conn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
}
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
connBuf->total = *(int32_t *) (connBuf->buf);
}
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
fnTrace("udfc complete message is received, now handle it");
return true;
}
return false;
}
void udfcUvHandleRsp(SClientUvConn *conn) {
SClientConnBuf *connBuf = &conn->readBuf;
int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen then seqnum
if (QUEUE_EMPTY(&conn->taskQueue)) {
fnError("udfc no task waiting for response on connection");
return;
}
bool found = false;
SClientUvTaskNode *taskFound = NULL;
QUEUE* h = QUEUE_NEXT(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
while (h != &conn->taskQueue) {
if (task->seqNum == seqNum) {
if (found == false) {
found = true;
taskFound = task;
} else {
fnError("udfc more than one task waiting for the same response");
continue;
}
}
h = QUEUE_NEXT(h);
task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
}
if (taskFound) {
taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
QUEUE_REMOVE(&taskFound->connTaskQueue);
QUEUE_REMOVE(&taskFound->procTaskQueue);
uv_sem_post(&taskFound->taskSem);
} else {
fnError("no task is waiting for the response.");
}
connBuf->buf = NULL;
connBuf->total = -1;
connBuf->len = 0;
connBuf->cap = 0;
}
void udfcUvHandleError(SClientUvConn *conn) {
while (!QUEUE_EMPTY(&conn->taskQueue)) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
QUEUE_REMOVE(&task->connTaskQueue);
QUEUE_REMOVE(&task->procTaskQueue);
uv_sem_post(&task->taskSem);
}
uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
}
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
if (nread == 0) return;
SClientUvConn *conn = client->data;
SClientConnBuf *connBuf = &conn->readBuf;
if (nread > 0) {
connBuf->len += nread;
if (isUdfcUvMsgComplete(connBuf)) {
udfcUvHandleRsp(conn);
}
}
if (nread < 0) {
fnError("udfc client pipe %p read error: %zd, %s.", client, nread, uv_strerror(nread));
if (nread == UV_EOF) {
fnError("\tudfc client pipe %p closed", client);
}
udfcUvHandleError(conn);
}
}
}
void onUdfcPipetWrite(uv_write_t *write, int status) {
SClientUvTaskNode *uvTask = write->data;
uv_pipe_t *pipe = uvTask->pipe;
......@@ -1050,7 +1395,7 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
request.teardown = task->_teardown.req;
request.type = UDF_TASK_TEARDOWN;
} else {
//TODO log and return error
fnError("udfc create uv task, invalid task type : %d", task->type);
}
int32_t bufLen = encodeUdfRequest(NULL, &request);
request.msgLen = bufLen;
......@@ -1271,134 +1616,47 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
SClientUvConn *conn = uvTask->pipe->data;
conn->session = task->session;
}
taosMemoryFree(uvTask);
uvTask = NULL;
return task->errCode;
}
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
return TSDB_CODE_UDF_INVALID_STATE;
}
SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
task->errCode = 0;
task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
task->session->udfc = &gUdfdProxy;
task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req;
strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) {
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
}
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle;
task->session->outputType = rsp->outputType;
task->session->outputLen = rsp->outputLen;
task->session->bufSize = rsp->bufSize;
strcpy(task->session->udfName, udfName);
if (task->errCode != 0) {
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
} else {
fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
*funcHandle = task->session;
}
int32_t err = task->errCode;
taosMemoryFree(task);
return err;
}
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
return strcmp(stub1->udfName, stub2->udfName);
}
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
int32_t code = 0;
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (stubIndex != -1) {
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
UdfcFuncHandle handle = foundStub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
*pHandle = foundStub->handle;
++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
} else {
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
udfName, foundStub->refCount, foundStub->lastRefTime);
taosArrayRemove(gUdfdProxy.udfStubs, stubIndex);
}
}
*pHandle = NULL;
code = doSetupUdf(udfName, pHandle);
if (code == TSDB_CODE_SUCCESS) {
SUdfcFuncStub stub = {0};
strcpy(stub.udfName, udfName);
stub.handle = *pHandle;
++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else {
*pHandle = NULL;
}
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return code;
taosMemoryFree(uvTask);
uvTask = NULL;
return task->errCode;
}
void releaseUdfFuncHandle(char* udfName) {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub) {
return;
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
return TSDB_CODE_UDF_INVALID_STATE;
}
if (foundStub->refCount > 0) {
--foundStub->refCount;
SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
task->errCode = 0;
task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
task->session->udfc = &gUdfdProxy;
task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req;
strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) {
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
}
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}
int32_t cleanUpUdfs() {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0;
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub);
} else {
fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
stub->udfName, stub->refCount, stub->lastRefTime);
}
}
++i;
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle;
task->session->outputType = rsp->outputType;
task->session->outputLen = rsp->outputLen;
task->session->bufSize = rsp->bufSize;
strcpy(task->session->udfName, udfName);
if (task->errCode != 0) {
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
} else {
fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
*funcHandle = task->session;
}
taosArrayDestroy(gUdfdProxy.udfStubs);
gUdfdProxy.udfStubs = udfStubs;
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
int32_t err = task->errCode;
taosMemoryFree(task);
return err;
}
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
......@@ -1524,29 +1782,6 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
return err;
}
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
UdfcFuncHandle handle = NULL;
int32_t code = acquireUdfFuncHandle(udfName, &handle);
if (code != 0) {
return code;
}
SUdfcUvSession *session = handle;
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (output->columnData == NULL) {
fnError("udfc scalar function calculate error. no column data");
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
} else {
if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) {
fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType,
session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
releaseUdfFuncHandle(udfName);
return code;
}
int32_t doTeardownUdf(UdfcFuncHandle handle) {
SUdfcUvSession *session = (SUdfcUvSession *) handle;
......@@ -1576,165 +1811,3 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
return err;
}
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes {
int8_t finalResNum;
int8_t interResNum;
char* finalResBuf;
char* interResBuf;
} SUdfAggRes;
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
if (fmIsScalarFunc(pFunc->funcId)) {
return false;
}
pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
return true;
}
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
if (functionSetup(pCtx, pResultCellInfo) != true) {
return false;
}
UdfcFuncHandle handle;
int32_t udfCode = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
return false;
}
SUdfcUvSession *session = (SUdfcUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
memset(udfRes, 0, envSize);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
udfRes->interResNum = buf.numOfResult;
if (buf.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
releaseUdfFuncHandle(pCtx->udfName);
freeUdfInterBuf(&buf);
return true;
}
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t udfCode = 0;
UdfcFuncHandle handle = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
return udfCode;
}
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SInputColumnInfoData* pInput = &pCtx->input;
int32_t numOfCols = pInput->numOfInputCols;
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
SSDataBlock tempBlock = {0};
tempBlock.info.numOfCols = numOfCols;
tempBlock.info.rows = pInput->totalRows;
tempBlock.info.uid = pInput->uid;
bool hasVarCol = false;
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *col = pInput->pData[i];
if (IS_VAR_DATA_TYPE(col->info.type)) {
hasVarCol = true;
}
taosArrayPush(tempBlock.pDataBlock, col);
}
tempBlock.info.hasVarCol = hasVarCol;
SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
if (udfCode != 0) {
fnError("udfAggProcess error. code: %d", udfCode);
newState.numOfResult = 0;
} else {
udfRes->interResNum = newState.numOfResult;
if (newState.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
}
}
if (newState.numOfResult == 1 || state.numOfResult == 1) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock);
releaseUdfFuncHandle(pCtx->udfName);
freeUdfInterBuf(&newState);
return udfCode;
}
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
int32_t udfCode = 0;
UdfcFuncHandle handle = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
return udfCode;
}
SUdfcUvSession *session = handle;
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf resultBuf = {0};
SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize,
.numOfResult = udfRes->interResNum};
int32_t udfCallCode= 0;
udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode != 0) {
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
GET_RES_INFO(pCtx)->numOfRes = 0;
} else {
if (resultBuf.bufLen <= session->outputLen) {
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
udfRes->finalResNum = resultBuf.numOfResult;
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
} else {
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen);
GET_RES_INFO(pCtx)->numOfRes = 0;
udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
freeUdfInterBuf(&resultBuf);
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName);
return udfCallCode == 0 ? numOfResults : udfCallCode;
}
......@@ -103,6 +103,262 @@ typedef struct SUdfdRpcSendRecvInfo {
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
static int32_t udfdConnectToMnode();
static int32_t udfdLoadUdf(char *udfName, SUdf *udf);
static bool udfdRpcRfp(int32_t code);
static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t udfdOpenClientRpc();
static int32_t udfdCloseClientRpc();
static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessRequest(uv_work_t *req);
static void udfdOnWrite(uv_write_t *req, int status);
static void udfdSendResponse(uv_work_t *work, int status);
static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe);
static void udfdHandleRequest(SUdfdUvConn *conn);
static void udfdPipeCloseCb(uv_handle_t *pipe);
static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
static void udfdOnNewConnection(uv_stream_t *server, int status);
static void udfdIntrSignalHandler(uv_signal_t *handle, int signum);
static int32_t removeListeningPipe();
static void udfdPrintVersion();
static int32_t udfdParseArgs(int32_t argc, char *argv[]);
static int32_t udfdInitLog();
static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
static int32_t udfdUvInit();
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
static int32_t udfdRun();
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request);
switch (request.type) {
case UDF_TASK_SETUP: {
udfdProcessSetupRequest(uvUdf, &request);
break;
}
case UDF_TASK_CALL: {
udfdProcessCallRequest(uvUdf, &request);
break;
}
case UDF_TASK_TEARDOWN: {
udfdProcessTeardownRequest(uvUdf, &request);
break;
}
default: {
break;
}
}
}
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS;
SUdf * udf = NULL;
uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
if (udfInHash) {
++(*udfInHash)->refCount;
udf = *udfInHash;
uv_mutex_unlock(&global.udfsMutex);
} else {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udf = udfNew;
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
uv_mutex_unlock(&global.udfsMutex);
}
uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING;
code = udfdLoadUdf(setup->udfName, udf);
if (udf->initFunc) {
udf->initFunc();
}
udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock);
} else {
while (udf->state != UDF_STATE_READY) {
uv_cond_wait(&udf->condReady, &udf->lock);
}
uv_mutex_unlock(&udf->lock);
}
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf;
SUdfResponse rsp;
rsp.seqNum = request->seqNum;
rsp.type = request->type;
rsp.code = code;
rsp.setupRsp.udfHandle = (int64_t)(handle);
rsp.setupRsp.outputType = udf->outputType;
rsp.setupRsp.outputLen = udf->outputLen;
rsp.setupRsp.bufSize = udf->bufSize;
int32_t len = encodeUdfResponse(NULL, &rsp);
rsp.msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, &rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
return;
}
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfCallRequest *call = &request->call;
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle);
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf * udf = handle->udf;
SUdfResponse response = {0};
SUdfResponse * rsp = &response;
SUdfCallResponse *subRsp = &rsp->callRsp;
int32_t code = TSDB_CODE_SUCCESS;
switch (call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
SUdfColumn output = {0};
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
code = udf->scalarProcFunc(&input, &output);
freeUdfDataDataBlock(&input);
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
freeUdfColumn(&output);
break;
}
case TSDB_UDF_CALL_AGG_INIT: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
udf->aggStartFunc(&outBuf);
subRsp->resultBuf = outBuf;
break;
}
case TSDB_UDF_CALL_AGG_PROC: {
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
freeUdfInterBuf(&call->interBuf);
freeUdfDataDataBlock(&input);
subRsp->resultBuf = outBuf;
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
freeUdfInterBuf(&call->interBuf);
subRsp->resultBuf = outBuf;
break;
}
default:
break;
}
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = code;
subRsp->callType = call->callType;
int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
switch (call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
tDeleteSSDataBlock(&call->block);
tDeleteSSDataBlock(&subRsp->resultData);
break;
}
case TSDB_UDF_CALL_AGG_INIT: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_PROC: {
tDeleteSSDataBlock(&call->block);
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
default:
break;
}
taosMemoryFree(uvUdf->input.base);
return;
}
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfTeardownRequest *teardown = &request->teardown;
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf * udf = handle->udf;
bool unloadUdf = false;
int32_t code = TSDB_CODE_SUCCESS;
uv_mutex_lock(&global.udfsMutex);
udf->refCount--;
if (udf->refCount == 0) {
unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
}
uv_mutex_unlock(&global.udfsMutex);
if (unloadUdf) {
uv_cond_destroy(&udf->condReady);
uv_mutex_destroy(&udf->lock);
if (udf->destroyFunc) {
(udf->destroyFunc)();
}
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
}
taosMemoryFree(handle);
SUdfResponse response;
SUdfResponse *rsp = &response;
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = code;
int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
return;
}
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
ASSERT(pMsg->info.ahandle != NULL);
......@@ -147,8 +403,11 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name);
TdFilePtr file =
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed");
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
}
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
tFreeSFuncInfo(pFuncInfo);
......@@ -265,235 +524,88 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
char *startSuffix = "_start";
strncpy(startFuncName, processFuncName, strlen(processFuncName));
strncat(startFuncName, startSuffix, strlen(startSuffix));
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc));
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
// TODO: merge
}
return 0;
}
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS;
SUdf * udf = NULL;
uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
if (udfInHash) {
++(*udfInHash)->refCount;
udf = *udfInHash;
uv_mutex_unlock(&global.udfsMutex);
} else {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udf = udfNew;
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
uv_mutex_unlock(&global.udfsMutex);
}
uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING;
code = udfdLoadUdf(setup->udfName, udf);
if (udf->initFunc) {
udf->initFunc();
}
udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock);
} else {
while (udf->state != UDF_STATE_READY) {
uv_cond_wait(&udf->condReady, &udf->lock);
}
uv_mutex_unlock(&udf->lock);
}
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf;
SUdfResponse rsp;
rsp.seqNum = request->seqNum;
rsp.type = request->type;
rsp.code = code;
rsp.setupRsp.udfHandle = (int64_t)(handle);
rsp.setupRsp.outputType = udf->outputType;
rsp.setupRsp.outputLen = udf->outputLen;
rsp.setupRsp.bufSize = udf->bufSize;
int32_t len = encodeUdfResponse(NULL, &rsp);
rsp.msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, &rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
return;
}
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfCallRequest *call = &request->call;
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle);
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf * udf = handle->udf;
SUdfResponse response = {0};
SUdfResponse * rsp = &response;
SUdfCallResponse *subRsp = &rsp->callRsp;
int32_t code = TSDB_CODE_SUCCESS;
switch (call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
SUdfColumn output = {0};
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
code = udf->scalarProcFunc(&input, &output);
freeUdfDataDataBlock(&input);
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
freeUdfColumn(&output);
break;
}
case TSDB_UDF_CALL_AGG_INIT: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
udf->aggStartFunc(&outBuf);
subRsp->resultBuf = outBuf;
break;
}
case TSDB_UDF_CALL_AGG_PROC: {
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
freeUdfInterBuf(&call->interBuf);
freeUdfDataDataBlock(&input);
subRsp->resultBuf = outBuf;
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
freeUdfInterBuf(&call->interBuf);
subRsp->resultBuf = outBuf;
break;
}
default:
break;
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc));
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
// TODO: merge
}
return 0;
}
static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = code;
subRsp->callType = call->callType;
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
pEpSet->version = 0;
int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
// init mnode ip set
SEpSet *mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
switch (call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
tDeleteSSDataBlock(&call->block);
tDeleteSSDataBlock(&subRsp->resultData);
break;
}
case TSDB_UDF_CALL_AGG_INIT: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_PROC: {
tDeleteSSDataBlock(&call->block);
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
if (firstEp && firstEp[0] != 0) {
if (strlen(firstEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
default:
break;
}
taosMemoryFree(uvUdf->input.base);
return;
}
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfTeardownRequest *teardown = &request->teardown;
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf * udf = handle->udf;
bool unloadUdf = false;
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return terrno;
}
uv_mutex_lock(&global.udfsMutex);
udf->refCount--;
if (udf->refCount == 0) {
unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
mgmtEpSet->numOfEps++;
}
uv_mutex_unlock(&global.udfsMutex);
if (unloadUdf) {
uv_cond_destroy(&udf->condReady);
uv_mutex_destroy(&udf->lock);
if (udf->destroyFunc) {
(udf->destroyFunc)();
if (secondEp && secondEp[0] != 0) {
if (strlen(secondEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++;
}
taosMemoryFree(handle);
SUdfResponse response;
SUdfResponse *rsp = &response;
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = code;
int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosMemoryFree(uvUdf->input.base);
return;
return 0;
}
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request);
switch (request.type) {
case UDF_TASK_SETUP: {
udfdProcessSetupRequest(uvUdf, &request);
break;
}
int32_t udfdOpenClientRpc() {
SRpcInit rpcInit = {0};
rpcInit.label = "UDFD";
rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
case UDF_TASK_CALL: {
udfdProcessCallRequest(uvUdf, &request);
break;
}
case UDF_TASK_TEARDOWN: {
udfdProcessTeardownRequest(uvUdf, &request);
break;
}
default: {
break;
}
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
return -1;
}
return 0;
}
int32_t udfdCloseClientRpc() {
rpcClose(global.clientRpc);
return 0;
}
void udfdOnWrite(uv_write_t *req, int status) {
......@@ -529,7 +641,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->base = ctx->inputBuf;
buf->len = ctx->inputCap;
} else {
// TODO: log error
fnError("udfd can not allocate enough memory")
buf->base = NULL;
buf->len = 0;
}
......@@ -541,7 +653,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = ctx->inputCap - ctx->inputLen;
} else {
// TODO: log error
fnError("udfd can not allocate enough memory")
buf->base = NULL;
buf->len = 0;
}
......@@ -580,8 +692,6 @@ void udfdPipeCloseCb(uv_handle_t *pipe) {
taosMemoryFree(conn);
}
void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
fnDebug("udf read %zd bytes from client", nread);
if (nread == 0) return;
......@@ -599,7 +709,7 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
if (nread < 0) {
fnDebug("Receive error %s", uv_err_name(nread));
fnError("Receive error %s", uv_err_name(nread));
if (nread == UV_EOF) {
// TODO check more when close
} else {
......@@ -638,91 +748,6 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
uv_stop(global.loop);
}
static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
pEpSet->version = 0;
// init mnode ip set
SEpSet *mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
if (firstEp && firstEp[0] != 0) {
if (strlen(firstEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return terrno;
}
mgmtEpSet->numOfEps++;
}
if (secondEp && secondEp[0] != 0) {
if (strlen(secondEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++;
}
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
return 0;
}
int32_t udfdOpenClientRpc() {
SRpcInit rpcInit = {0};
rpcInit.label = "UDFD";
rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
return -1;
}
return 0;
}
int32_t udfdCloseClientRpc() {
rpcClose(global.clientRpc);
return 0;
}
static void udfdPrintVersion() {
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("buildInfo: %s\n", buildinfo);
}
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
......@@ -745,6 +770,17 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
return 0;
}
static void udfdPrintVersion() {
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("buildInfo: %s\n", buildinfo);
}
static int32_t udfdInitLog() {
char logName[12] = {0};
snprintf(logName, sizeof(logName), "%slog", "udfd");
......@@ -868,8 +904,8 @@ int main(int argc, char *argv[]) {
int32_t retryMnodeTimes = 0;
int32_t code = 0;
while (retryMnodeTimes++ < TSDB_MAX_REPLICA) {
uv_sleep(500 * (1 << retryMnodeTimes));
while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) {
uv_sleep(100 * (1 << retryMnodeTimes));
code = udfdConnectToMnode();
if (code == 0) {
break;
......@@ -890,6 +926,7 @@ int main(int argc, char *argv[]) {
udfdRun();
removeListeningPipe();
udfdCloseClientRpc();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册