diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 32e57565d4b94d1aa71ed9cc434983e96c309a9a..0b309bc8f56abd3870a07eebdb94e61f357ee23c 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -837,9 +837,34 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { } int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) { - output->info.rows = input->numOfRows; + int32_t numOfRows = 0; + for (int32_t i = 0; i < numOfCols; ++i) { + numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows; + } + output->info.rows = numOfRows; output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); for (int32_t i = 0; i < numOfCols; ++i) { + if ((input+i)->numOfRows < numOfRows) { + SColumnInfoData* pColInfoData = (input+i)->columnData; + int32_t startRow = (input+i)->numOfRows; + int32_t expandRows = numOfRows - startRow; + colInfoDataEnsureCapacity(pColInfoData, numOfRows, false); + bool isNull = colDataIsNull_s(pColInfoData, (input+i)->numOfRows - 1); + if (isNull) { + colDataAppendNNULL(pColInfoData, startRow, expandRows); + } else { + char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1); + int32_t bytes = pColInfoData->info.bytes; + char* data = taosMemoryMalloc(bytes); + memcpy(data, src, bytes); + for (int j = 0; j < expandRows; ++j) { + colDataAppend(pColInfoData, startRow+j, data, false); + } + //colDataAppendNItems(pColInfoData, startRow, data, expandRows); + taosMemoryFree(data); + } + } + taosArrayPush(output->pDataBlock, (input + i)->columnData); if (IS_VAR_DATA_TYPE((input + i)->columnData->info.type)) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 2f3db636c801277bc6e4809f1a9c3b6dd50283ed..40c75ce6baacbaad258ad10874af16f9466ad2d4 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -28,39 +28,46 @@ #include "tmsg.h" #include "trpc.h" #include "tmisce.h" -// clang-foramt on +// clang-format on typedef struct SUdfdContext { - uv_loop_t * loop; + uv_loop_t *loop; uv_pipe_t ctrlPipe; uv_signal_t intrSignal; char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_pipe_t listeningPipe; - void * clientRpc; + void *clientRpc; SCorEpSet mgmtEp; uv_mutex_t udfsMutex; - SHashObj * udfsHash; + SHashObj *udfsHash; - SArray* residentFuncs; + SArray *residentFuncs; bool printVersion; } SUdfdContext; SUdfdContext global; +struct SUdfdUvConn; +struct SUvUdfWork; + typedef struct SUdfdUvConn { uv_stream_t *client; - char * inputBuf; + char *inputBuf; int32_t inputLen; int32_t inputCap; int32_t inputTotal; + + struct SUvUdfWork *pWorkList; // head of work list } SUdfdUvConn; typedef struct SUvUdfWork { - uv_stream_t *client; + SUdfdUvConn *conn; uv_buf_t input; uv_buf_t output; + + struct SUvUdfWork *pWorkNext; } SUvUdfWork; typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState; @@ -70,7 +77,7 @@ typedef struct SUdf { EUdfState state; uv_mutex_t lock; uv_cond_t condReady; - bool resident; + bool resident; char name[TSDB_FUNC_NAME_LEN + 1]; int8_t funcType; @@ -107,7 +114,7 @@ typedef enum EUdfdRpcReqRspType { typedef struct SUdfdRpcSendRecvInfo { EUdfdRpcReqRspType rpcType; int32_t code; - void * param; + void *param; uv_sem_t resultSem; } SUdfdRpcSendRecvInfo; @@ -178,7 +185,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); SUdfSetupRequest *setup = &request->setup; int32_t code = TSDB_CODE_SUCCESS; - SUdf * udf = NULL; + SUdf *udf = NULL; uv_mutex_lock(&global.udfsMutex); SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName)); if (udfInHash) { @@ -193,7 +200,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_cond_init(&udfNew->condReady); udf = udfNew; - SUdf** pUdf = &udf; + SUdf **pUdf = &udf; taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES); uv_mutex_unlock(&global.udfsMutex); } @@ -207,7 +214,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } udf->resident = false; for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { - char* funcName = taosArrayGet(global.residentFuncs, i); + char *funcName = taosArrayGet(global.residentFuncs, i); if (strcmp(setup->udfName, funcName) == 0) { udf->resident = true; break; @@ -248,11 +255,12 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfCallRequest *call = &request->call; - fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64 , call->callType, call->udfHandle, request->seqNum); - SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle); - SUdf * udf = handle->udf; + fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle, + request->seqNum); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); + SUdf *udf = handle->udf; SUdfResponse response = {0}; - SUdfResponse * rsp = &response; + SUdfResponse *rsp = &response; SUdfCallResponse *subRsp = &rsp->callRsp; int32_t code = TSDB_CODE_SUCCESS; @@ -352,7 +360,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfTeardownRequest *teardown = &request->teardown; fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); - SUdf * udf = handle->udf; + SUdf *udf = handle->udf; bool unloadUdf = false; int32_t code = TSDB_CODE_SUCCESS; @@ -409,15 +417,14 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { SConnectRsp connectRsp = {0}; tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); - + int32_t now = taosGetTimestampSec(); int32_t delta = abs(now - connectRsp.svrTimestamp); if (delta > 900) { msgInfo->code = TSDB_CODE_TIME_UNSYNCED; goto _return; } - - + if (connectRsp.epSet.numOfEps == 0) { msgInfo->code = TSDB_CODE_APP_ERROR; goto _return; @@ -434,7 +441,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { goto _return; } SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - SUdf * udf = msgInfo->param; + SUdf *udf = msgInfo->param; udf->funcType = pFuncInfo->funcType; udf->scriptType = pFuncInfo->scriptType; udf->outputType = pFuncInfo->outputType; @@ -487,7 +494,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { taosArrayPush(retrieveReq.pFuncNames, udfName); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); taosArrayDestroy(retrieveReq.pFuncNames); @@ -522,7 +529,7 @@ int32_t udfdConnectToMnode() { connReq.startTime = taosGetTimestampMs(); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSConnectReq(pReq, contLen, &connReq); SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); @@ -589,7 +596,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); - char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *mergeSuffix = "_merge"; strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); @@ -601,9 +608,10 @@ static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || + msgType == TDMT_SCH_MERGE_FETCH) { return false; - } + } return true; } else { return false; @@ -663,7 +671,7 @@ int32_t udfdOpenClientRpc() { rpcInit.parent = &global; rpcInit.rfp = udfdRpcRfp; rpcInit.compressSize = tsCompressMsgSize; - + global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); @@ -684,6 +692,17 @@ void udfdOnWrite(uv_write_t *req, int status) { if (status < 0) { fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status)); } + // remove work from the connection work list + if (work->conn != NULL) { + SUvUdfWork **ppWork; + for (ppWork = &work->conn->pWorkList; *ppWork && (*ppWork != work); ppWork = &((*ppWork)->pWorkNext)) { + } + if (*ppWork == work) { + *ppWork = work->pWorkNext; + } else { + fnError("work not in conn any more"); + } + } taosMemoryFree(work->output.base); taosMemoryFree(work); taosMemoryFree(req); @@ -692,10 +711,11 @@ void udfdOnWrite(uv_write_t *req, int status) { void udfdSendResponse(uv_work_t *work, int status) { SUvUdfWork *udfWork = (SUvUdfWork *)(work->data); - uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); - write_req->data = udfWork; - uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite); - + if (udfWork->conn != NULL) { + uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); + write_req->data = udfWork; + uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite); + } taosMemoryFree(work); } @@ -716,8 +736,8 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { buf->len = 0; } } else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) { - buf->base = ctx->inputBuf + ctx->inputLen; - buf->len = msgHeadSize - ctx->inputLen; + buf->base = ctx->inputBuf + ctx->inputLen; + buf->len = msgHeadSize - ctx->inputLen; } else { ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap; void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap); @@ -744,10 +764,15 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { } void udfdHandleRequest(SUdfdUvConn *conn) { - uv_work_t * work = taosMemoryMalloc(sizeof(uv_work_t)); + char *inputBuf = conn->inputBuf; + int32_t inputLen = conn->inputLen; + + uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t)); SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork)); - udfWork->client = conn->client; - udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen); + udfWork->conn = conn; + udfWork->pWorkNext = conn->pWorkList; + conn->pWorkList = udfWork; + udfWork->input = uv_buf_init(inputBuf, inputLen); conn->inputBuf = NULL; conn->inputLen = 0; conn->inputCap = 0; @@ -758,13 +783,19 @@ void udfdHandleRequest(SUdfdUvConn *conn) { void udfdPipeCloseCb(uv_handle_t *pipe) { SUdfdUvConn *conn = pipe->data; + SUvUdfWork* pWork = conn->pWorkList; + while (pWork != NULL) { + pWork->conn = NULL; + pWork = pWork->pWorkNext; + } + taosMemoryFree(conn->client); taosMemoryFree(conn->inputBuf); taosMemoryFree(conn); } void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { - fnDebug("udf read %zd bytes from client", nread); + fnDebug("udfd read %zd bytes from client", nread); if (nread == 0) return; SUdfdUvConn *conn = client->data; @@ -780,10 +811,10 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { } if (nread < 0) { - fnError("Receive error %s", uv_err_name(nread)); if (nread == UV_EOF) { - // TODO check more when close + fnInfo("udfd pipe read EOF"); } else { + fnError("Receive error %s", uv_err_name(nread)); } udfdUvHandleError(conn); } @@ -799,6 +830,7 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { uv_pipe_init(global.loop, client, 0); if (uv_accept(server, (uv_stream_t *)client) == 0) { SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); + ctx->pWorkList = NULL; ctx->client = (uv_stream_t *)client; ctx->inputBuf = 0; ctx->inputLen = 0; @@ -891,7 +923,7 @@ static int32_t udfdUvInit() { } global.loop = loop; - if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit + if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit uv_pipe_init(global.loop, &global.ctrlPipe, 1); uv_pipe_open(&global.ctrlPipe, 0); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); @@ -966,10 +998,10 @@ int32_t udfdInitResidentFuncs() { } global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN); - char* pSave = tsUdfdResFuncs; - char* token; + char *pSave = tsUdfdResFuncs; + char *token; while ((token = strtok_r(pSave, ",", &pSave)) != NULL) { - char func[TSDB_FUNC_NAME_LEN+1] = {0}; + char func[TSDB_FUNC_NAME_LEN + 1] = {0}; strncpy(func, token, TSDB_FUNC_NAME_LEN); fnInfo("udfd add resident function %s", func); taosArrayPush(global.residentFuncs, func); @@ -980,10 +1012,10 @@ int32_t udfdInitResidentFuncs() { int32_t udfdDeinitResidentFuncs() { for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { - char* funcName = taosArrayGet(global.residentFuncs, i); - SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); + char *funcName = taosArrayGet(global.residentFuncs, i); + SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); if (udfInHash) { - SUdf* udf = *udfInHash; + SUdf *udf = *udfInHash; if (udf->destroyFunc) { (udf->destroyFunc)(); } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 4cbba106ba4ef117bdf9c7fa21f73273a7eee389..5361ba356fccc9a31f96e35b6587c3a777661776 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -174,6 +174,7 @@ ,,y,script,./test.sh -f tsim/query/scalarNull.sim ,,y,script,./test.sh -f tsim/query/session.sim ,,y,script,./test.sh -f tsim/query/udf.sim +,,y,script,./test.sh -f tsim/query/udf_with_const.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/script/sh/compile_udf.sh b/tests/script/sh/compile_udf.sh index c7148d7d7d5200fe12cb2af32172dfd6b1d9c68a..5265e5a99b3400ac93702d7ee8a972f5d2893b33 100755 --- a/tests/script/sh/compile_udf.sh +++ b/tests/script/sh/compile_udf.sh @@ -1,10 +1,11 @@ set +e -rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so +rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so /tmp/udf/libgpd.so mkdir -p /tmp/udf echo "compile udf bit_and and sqr_sum" gcc -fPIC -shared sh/bit_and.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libbitand.so gcc -fPIC -shared sh/l2norm.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libl2norm.so +gcc -fPIC -shared sh/gpd.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libgpd.so echo "debug show /tmp/udf/*.so" ls /tmp/udf/*.so diff --git a/tests/script/sh/gpd.c b/tests/script/sh/gpd.c index 8d69bacb5edac29783ce860fb3a8dd5b407541d8..2259efa64a500cd7bf331642445c47a08883bec1 100644 --- a/tests/script/sh/gpd.c +++ b/tests/script/sh/gpd.c @@ -12,13 +12,10 @@ TAOS* taos = NULL; DLL_EXPORT int32_t gpd_init() { - taos = taos_connect("localhost", "root", "taosdata", "", 7100); return 0; } DLL_EXPORT int32_t gpd_destroy() { - taos_close(taos); - taos_cleanup(); return 0; } @@ -32,43 +29,18 @@ DLL_EXPORT int32_t gpd(SUdfDataBlock* block, SUdfColumn *resultCol) { SUdfColumnData *resultData = &resultCol->colData; resultData->numOfRows = block->numOfRows; for (int32_t i = 0; i < resultData->numOfRows; ++i) { - int j = 0; - for (; j < block->numOfCols; ++j) { - if (udfColDataIsNull(block->udfCols[j], i)) { - udfColDataSetNull(resultCol, i); - break; - } - } - if ( j == block->numOfCols) { - int32_t luckyNum = 88; - udfColDataSet(resultCol, i, (char *)&luckyNum, false); - } + int64_t* calc_ts = (int64_t*)udfColDataGetData(block->udfCols[0], i); + char* varTbname = udfColDataGetData(block->udfCols[1], i); + char* varDbname = udfColDataGetData(block->udfCols[2], i); + + char dbName[256] = {0}; + char tblName[256] = {0}; + memcpy(dbName, varDataVal(varDbname), varDataLen(varDbname)); + memcpy(tblName, varDataVal(varTbname), varDataLen(varTbname)); + printf("%s, %s\n", dbName, tblName); + int32_t result = 0; + udfColDataSet(resultCol, i, (char*)&result, false); } - TAOS_RES* res = taos_query(taos, "create database if not exists gpd"); - if (taos_errno(res) != 0) { - char* errstr = taos_errstr(res); - } - res = taos_query(taos, "create table gpd.st (ts timestamp, f int) tags(t int)"); - if (taos_errno(res) != 0) { - char* errstr = taos_errstr(res); - } - taos_query(taos, "insert into gpd.t using gpd.st tags(1) values(now, 1) "); - if (taos_errno(res) != 0) { - char* errstr = taos_errstr(res); - } - - taos_query(taos, "select * from gpd.t"); - if (taos_errno(res) != 0) { - char* errstr = taos_errstr(res); - } - - //to simulate actual processing delay by udf -#ifdef LINUX - usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second) -#endif -#ifdef WINDOWS - Sleep(1); -#endif return 0; } diff --git a/tests/script/tsim/query/udf_with_const.sim b/tests/script/tsim/query/udf_with_const.sim new file mode 100644 index 0000000000000000000000000000000000000000..7a2a3389bd155e1c384823ce94c3fdd96b14139e --- /dev/null +++ b/tests/script/tsim/query/udf_with_const.sim @@ -0,0 +1,45 @@ +system_content printf %OS% +if $system_content == Windows_NT then + return 0; +endi + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c udf -v 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print ======== step1 udf +system sh/compile_udf.sh +sql create database udf vgroups 3; +sql use udf; + +sql create table t (ts timestamp, f int); +sql insert into t values(now, 1)(now+1s, 2)(now+2s,3)(now+3s,4)(now+4s,5)(now+5s,6)(now+6s,7); + +system_content printf %OS% +if $system_content == Windows_NT then + return 0; +endi + +if $system_content == Windows_NT then + sql create function gpd as 'C:\\Windows\\Temp\\gpd.dll' outputtype int bufSize 8; +else + sql create function gpd as '/tmp/udf/libgpd.so' outputtype int bufSize 8; +endi +sql show functions; +if $rows != 1 then + return -1 +endi + +sql select gpd(ts, tbname, 'detail') from t; +if $rows != 7 then + return -1 +endi +print $data00 $data10 +if $data00 != @0@ then + return -1 +endi +sql drop function gpd; + +system sh/exec.sh -n dnode1 -s stop -x SIGINT