From 190fbe849e7979bfae51e78d2c6c97e32eba4f0d Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 12 Dec 2022 20:20:24 +0800 Subject: [PATCH] fix: udfd pipe can close before sending response --- source/libs/function/src/udfd.c | 115 ++++++++++++++++++++------------ 1 file changed, 72 insertions(+), 43 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 2ab1e8b64c..9ee9c787ee 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -28,39 +28,46 @@ #include "tmsg.h" #include "trpc.h" #include "tmisce.h" -// clang-foramt on +// clang-format on typedef struct SUdfdContext { - uv_loop_t * loop; + uv_loop_t *loop; uv_pipe_t ctrlPipe; uv_signal_t intrSignal; char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_pipe_t listeningPipe; - void * clientRpc; + void *clientRpc; SCorEpSet mgmtEp; uv_mutex_t udfsMutex; - SHashObj * udfsHash; + SHashObj *udfsHash; - SArray* residentFuncs; + SArray *residentFuncs; bool printVersion; } SUdfdContext; SUdfdContext global; +struct SUdfdUvConn; +struct SUvUdfWork; + typedef struct SUdfdUvConn { uv_stream_t *client; - char * inputBuf; + char *inputBuf; int32_t inputLen; int32_t inputCap; int32_t inputTotal; + + struct SUvUdfWork *pWorkList; // head of work list } SUdfdUvConn; typedef struct SUvUdfWork { - uv_stream_t *client; + SUdfdUvConn *conn; uv_buf_t input; uv_buf_t output; + + struct SUvUdfWork *pWorkNext; } SUvUdfWork; typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState; @@ -70,7 +77,7 @@ typedef struct SUdf { EUdfState state; uv_mutex_t lock; uv_cond_t condReady; - bool resident; + bool resident; char name[TSDB_FUNC_NAME_LEN + 1]; int8_t funcType; @@ -107,7 +114,7 @@ typedef enum EUdfdRpcReqRspType { typedef struct SUdfdRpcSendRecvInfo { EUdfdRpcReqRspType rpcType; int32_t code; - void * param; + void *param; uv_sem_t resultSem; } SUdfdRpcSendRecvInfo; @@ -178,7 +185,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); SUdfSetupRequest *setup = &request->setup; int32_t code = TSDB_CODE_SUCCESS; - SUdf * udf = NULL; + SUdf *udf = NULL; uv_mutex_lock(&global.udfsMutex); SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName)); if (udfInHash) { @@ -193,7 +200,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_cond_init(&udfNew->condReady); udf = udfNew; - SUdf** pUdf = &udf; + SUdf **pUdf = &udf; taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES); uv_mutex_unlock(&global.udfsMutex); } @@ -207,7 +214,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } udf->resident = false; for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { - char* funcName = taosArrayGet(global.residentFuncs, i); + char *funcName = taosArrayGet(global.residentFuncs, i); if (strcmp(setup->udfName, funcName) == 0) { udf->resident = true; break; @@ -248,11 +255,12 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfCallRequest *call = &request->call; - fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64 , call->callType, call->udfHandle, request->seqNum); - SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle); - SUdf * udf = handle->udf; + fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle, + request->seqNum); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); + SUdf *udf = handle->udf; SUdfResponse response = {0}; - SUdfResponse * rsp = &response; + SUdfResponse *rsp = &response; SUdfCallResponse *subRsp = &rsp->callRsp; int32_t code = TSDB_CODE_SUCCESS; @@ -352,7 +360,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfTeardownRequest *teardown = &request->teardown; fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); - SUdf * udf = handle->udf; + SUdf *udf = handle->udf; bool unloadUdf = false; int32_t code = TSDB_CODE_SUCCESS; @@ -409,15 +417,14 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { SConnectRsp connectRsp = {0}; tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); - + int32_t now = taosGetTimestampSec(); int32_t delta = abs(now - connectRsp.svrTimestamp); if (delta > 900) { msgInfo->code = TSDB_CODE_TIME_UNSYNCED; goto _return; } - - + if (connectRsp.epSet.numOfEps == 0) { msgInfo->code = TSDB_CODE_APP_ERROR; goto _return; @@ -434,7 +441,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { goto _return; } SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - SUdf * udf = msgInfo->param; + SUdf *udf = msgInfo->param; udf->funcType = pFuncInfo->funcType; udf->scriptType = pFuncInfo->scriptType; udf->outputType = pFuncInfo->outputType; @@ -487,7 +494,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { taosArrayPush(retrieveReq.pFuncNames, udfName); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); taosArrayDestroy(retrieveReq.pFuncNames); @@ -522,7 +529,7 @@ int32_t udfdConnectToMnode() { connReq.startTime = taosGetTimestampMs(); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); - void * pReq = rpcMallocCont(contLen); + void *pReq = rpcMallocCont(contLen); tSerializeSConnectReq(pReq, contLen, &connReq); SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); @@ -589,7 +596,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); - char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *mergeSuffix = "_merge"; strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); @@ -601,9 +608,10 @@ static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || + msgType == TDMT_SCH_MERGE_FETCH) { return false; - } + } return true; } else { return false; @@ -663,7 +671,7 @@ int32_t udfdOpenClientRpc() { rpcInit.parent = &global; rpcInit.rfp = udfdRpcRfp; rpcInit.compressSize = tsCompressMsgSize; - + global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); @@ -684,6 +692,17 @@ void udfdOnWrite(uv_write_t *req, int status) { if (status < 0) { fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status)); } + // remove work from the connection work list + if (work->conn != NULL) { + SUvUdfWork **ppWork; + for (ppWork = &work->conn->pWorkList; *ppWork && (*ppWork != work); ppWork = &((*ppWork)->pWorkNext)) { + } + if (*ppWork == work) { + *ppWork = work->pWorkNext; + } else { + fnError("work not in conn any more"); + } + } taosMemoryFree(work->output.base); taosMemoryFree(work); taosMemoryFree(req); @@ -692,10 +711,11 @@ void udfdOnWrite(uv_write_t *req, int status) { void udfdSendResponse(uv_work_t *work, int status) { SUvUdfWork *udfWork = (SUvUdfWork *)(work->data); - uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); - write_req->data = udfWork; - uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite); - + if (udfWork->conn != NULL) { + uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); + write_req->data = udfWork; + uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite); + } taosMemoryFree(work); } @@ -716,8 +736,8 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { buf->len = 0; } } else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) { - buf->base = ctx->inputBuf + ctx->inputLen; - buf->len = msgHeadSize - ctx->inputLen; + buf->base = ctx->inputBuf + ctx->inputLen; + buf->len = msgHeadSize - ctx->inputLen; } else { ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap; void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap); @@ -744,14 +764,16 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { } void udfdHandleRequest(SUdfdUvConn *conn) { - char* inputBuf = taosMemoryMalloc(conn->inputLen); + char *inputBuf = taosMemoryMalloc(conn->inputLen); memcpy(inputBuf, conn->inputBuf, conn->inputLen); int32_t inputLen = conn->inputLen; taosMemoryFree(conn->inputBuf); - uv_work_t * work = taosMemoryMalloc(sizeof(uv_work_t)); + uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t)); SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork)); - udfWork->client = conn->client; + udfWork->conn = conn; + udfWork->pWorkNext = conn->pWorkList; + conn->pWorkList = udfWork; udfWork->input = uv_buf_init(inputBuf, inputLen); conn->inputBuf = NULL; conn->inputLen = 0; @@ -763,6 +785,12 @@ void udfdHandleRequest(SUdfdUvConn *conn) { void udfdPipeCloseCb(uv_handle_t *pipe) { SUdfdUvConn *conn = pipe->data; + SUvUdfWork* pWork = conn->pWorkList; + while (pWork != NULL) { + pWork->conn = NULL; + pWork = pWork->pWorkNext; + } + taosMemoryFree(conn->client); taosMemoryFree(conn->inputBuf); taosMemoryFree(conn); @@ -804,6 +832,7 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { uv_pipe_init(global.loop, client, 0); if (uv_accept(server, (uv_stream_t *)client) == 0) { SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); + ctx->pWorkList = NULL; ctx->client = (uv_stream_t *)client; ctx->inputBuf = 0; ctx->inputLen = 0; @@ -896,7 +925,7 @@ static int32_t udfdUvInit() { } global.loop = loop; - if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit + if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit uv_pipe_init(global.loop, &global.ctrlPipe, 1); uv_pipe_open(&global.ctrlPipe, 0); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); @@ -971,10 +1000,10 @@ int32_t udfdInitResidentFuncs() { } global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN); - char* pSave = tsUdfdResFuncs; - char* token; + char *pSave = tsUdfdResFuncs; + char *token; while ((token = strtok_r(pSave, ",", &pSave)) != NULL) { - char func[TSDB_FUNC_NAME_LEN+1] = {0}; + char func[TSDB_FUNC_NAME_LEN + 1] = {0}; strncpy(func, token, TSDB_FUNC_NAME_LEN); fnInfo("udfd add resident function %s", func); taosArrayPush(global.residentFuncs, func); @@ -985,10 +1014,10 @@ int32_t udfdInitResidentFuncs() { int32_t udfdDeinitResidentFuncs() { for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { - char* funcName = taosArrayGet(global.residentFuncs, i); - SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); + char *funcName = taosArrayGet(global.residentFuncs, i); + SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); if (udfInHash) { - SUdf* udf = *udfInHash; + SUdf *udf = *udfInHash; if (udf->destroyFunc) { (udf->destroyFunc)(); } -- GitLab