提交 4f87ff34 编写于 作者: S shenglian zhou

Merge branch 'szhou/cenc' of github.com:taosdata/TDengine into szhou/cenc

...@@ -28,39 +28,46 @@ ...@@ -28,39 +28,46 @@
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
#include "tmisce.h" #include "tmisce.h"
// clang-foramt on // clang-format on
typedef struct SUdfdContext { typedef struct SUdfdContext {
uv_loop_t * loop; uv_loop_t *loop;
uv_pipe_t ctrlPipe; uv_pipe_t ctrlPipe;
uv_signal_t intrSignal; uv_signal_t intrSignal;
char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_pipe_t listeningPipe; uv_pipe_t listeningPipe;
void * clientRpc; void *clientRpc;
SCorEpSet mgmtEp; SCorEpSet mgmtEp;
uv_mutex_t udfsMutex; uv_mutex_t udfsMutex;
SHashObj * udfsHash; SHashObj *udfsHash;
SArray* residentFuncs; SArray *residentFuncs;
bool printVersion; bool printVersion;
} SUdfdContext; } SUdfdContext;
SUdfdContext global; SUdfdContext global;
struct SUdfdUvConn;
struct SUvUdfWork;
typedef struct SUdfdUvConn { typedef struct SUdfdUvConn {
uv_stream_t *client; uv_stream_t *client;
char * inputBuf; char *inputBuf;
int32_t inputLen; int32_t inputLen;
int32_t inputCap; int32_t inputCap;
int32_t inputTotal; int32_t inputTotal;
struct SUvUdfWork *pWorkList; // head of work list
} SUdfdUvConn; } SUdfdUvConn;
typedef struct SUvUdfWork { typedef struct SUvUdfWork {
uv_stream_t *client; SUdfdUvConn *conn;
uv_buf_t input; uv_buf_t input;
uv_buf_t output; uv_buf_t output;
struct SUvUdfWork *pWorkNext;
} SUvUdfWork; } SUvUdfWork;
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState; typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState;
...@@ -107,7 +114,7 @@ typedef enum EUdfdRpcReqRspType { ...@@ -107,7 +114,7 @@ typedef enum EUdfdRpcReqRspType {
typedef struct SUdfdRpcSendRecvInfo { typedef struct SUdfdRpcSendRecvInfo {
EUdfdRpcReqRspType rpcType; EUdfdRpcReqRspType rpcType;
int32_t code; int32_t code;
void * param; void *param;
uv_sem_t resultSem; uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo; } SUdfdRpcSendRecvInfo;
...@@ -178,7 +185,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -178,7 +185,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request->setup; SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SUdf * udf = NULL; SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName)); SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
if (udfInHash) { if (udfInHash) {
...@@ -193,7 +200,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -193,7 +200,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_cond_init(&udfNew->condReady); uv_cond_init(&udfNew->condReady);
udf = udfNew; udf = udfNew;
SUdf** pUdf = &udf; SUdf **pUdf = &udf;
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES); taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES);
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
} }
...@@ -207,7 +214,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -207,7 +214,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
} }
udf->resident = false; udf->resident = false;
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char* funcName = taosArrayGet(global.residentFuncs, i); char *funcName = taosArrayGet(global.residentFuncs, i);
if (strcmp(setup->udfName, funcName) == 0) { if (strcmp(setup->udfName, funcName) == 0) {
udf->resident = true; udf->resident = true;
break; break;
...@@ -248,11 +255,12 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -248,11 +255,12 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfCallRequest *call = &request->call; SUdfCallRequest *call = &request->call;
fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64 , call->callType, call->udfHandle, request->seqNum); fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle,
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle); request->seqNum);
SUdf * udf = handle->udf; SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf *udf = handle->udf;
SUdfResponse response = {0}; SUdfResponse response = {0};
SUdfResponse * rsp = &response; SUdfResponse *rsp = &response;
SUdfCallResponse *subRsp = &rsp->callRsp; SUdfCallResponse *subRsp = &rsp->callRsp;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -352,7 +360,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -352,7 +360,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfTeardownRequest *teardown = &request->teardown; SUdfTeardownRequest *teardown = &request->teardown;
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle); fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf * udf = handle->udf; SUdf *udf = handle->udf;
bool unloadUdf = false; bool unloadUdf = false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -417,7 +425,6 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -417,7 +425,6 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
goto _return; goto _return;
} }
if (connectRsp.epSet.numOfEps == 0) { if (connectRsp.epSet.numOfEps == 0) {
msgInfo->code = TSDB_CODE_APP_ERROR; msgInfo->code = TSDB_CODE_APP_ERROR;
goto _return; goto _return;
...@@ -434,7 +441,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -434,7 +441,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
goto _return; goto _return;
} }
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf * udf = msgInfo->param; SUdf *udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType; udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType; udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->outputType; udf->outputType = pFuncInfo->outputType;
...@@ -487,7 +494,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { ...@@ -487,7 +494,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
taosArrayPush(retrieveReq.pFuncNames, udfName); taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void * pReq = rpcMallocCont(contLen); void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames); taosArrayDestroy(retrieveReq.pFuncNames);
...@@ -522,7 +529,7 @@ int32_t udfdConnectToMnode() { ...@@ -522,7 +529,7 @@ int32_t udfdConnectToMnode() {
connReq.startTime = taosGetTimestampMs(); connReq.startTime = taosGetTimestampMs();
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void * pReq = rpcMallocCont(contLen); void *pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq); tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
...@@ -601,7 +608,8 @@ static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { ...@@ -601,7 +608,8 @@ static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) { code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false; return false;
} }
return true; return true;
...@@ -684,6 +692,17 @@ void udfdOnWrite(uv_write_t *req, int status) { ...@@ -684,6 +692,17 @@ void udfdOnWrite(uv_write_t *req, int status) {
if (status < 0) { if (status < 0) {
fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status)); fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status));
} }
// remove work from the connection work list
if (work->conn != NULL) {
SUvUdfWork **ppWork;
for (ppWork = &work->conn->pWorkList; *ppWork && (*ppWork != work); ppWork = &((*ppWork)->pWorkNext)) {
}
if (*ppWork == work) {
*ppWork = work->pWorkNext;
} else {
fnError("work not in conn any more");
}
}
taosMemoryFree(work->output.base); taosMemoryFree(work->output.base);
taosMemoryFree(work); taosMemoryFree(work);
taosMemoryFree(req); taosMemoryFree(req);
...@@ -692,10 +711,11 @@ void udfdOnWrite(uv_write_t *req, int status) { ...@@ -692,10 +711,11 @@ void udfdOnWrite(uv_write_t *req, int status) {
void udfdSendResponse(uv_work_t *work, int status) { void udfdSendResponse(uv_work_t *work, int status) {
SUvUdfWork *udfWork = (SUvUdfWork *)(work->data); SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
if (udfWork->conn != NULL) {
uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
write_req->data = udfWork; write_req->data = udfWork;
uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite); uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
}
taosMemoryFree(work); taosMemoryFree(work);
} }
...@@ -744,14 +764,14 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { ...@@ -744,14 +764,14 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
} }
void udfdHandleRequest(SUdfdUvConn *conn) { void udfdHandleRequest(SUdfdUvConn *conn) {
char* inputBuf = taosMemoryMalloc(conn->inputLen); char *inputBuf = conn->inputBuf;
memcpy(inputBuf, conn->inputBuf, conn->inputLen);
int32_t inputLen = conn->inputLen; int32_t inputLen = conn->inputLen;
taosMemoryFree(conn->inputBuf);
uv_work_t * work = taosMemoryMalloc(sizeof(uv_work_t)); uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork)); SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
udfWork->client = conn->client; udfWork->conn = conn;
udfWork->pWorkNext = conn->pWorkList;
conn->pWorkList = udfWork;
udfWork->input = uv_buf_init(inputBuf, inputLen); udfWork->input = uv_buf_init(inputBuf, inputLen);
conn->inputBuf = NULL; conn->inputBuf = NULL;
conn->inputLen = 0; conn->inputLen = 0;
...@@ -763,13 +783,19 @@ void udfdHandleRequest(SUdfdUvConn *conn) { ...@@ -763,13 +783,19 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
void udfdPipeCloseCb(uv_handle_t *pipe) { void udfdPipeCloseCb(uv_handle_t *pipe) {
SUdfdUvConn *conn = pipe->data; SUdfdUvConn *conn = pipe->data;
SUvUdfWork* pWork = conn->pWorkList;
while (pWork != NULL) {
pWork->conn = NULL;
pWork = pWork->pWorkNext;
}
taosMemoryFree(conn->client); taosMemoryFree(conn->client);
taosMemoryFree(conn->inputBuf); taosMemoryFree(conn->inputBuf);
taosMemoryFree(conn); taosMemoryFree(conn);
} }
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
fnDebug("udf read %zd bytes from client", nread); fnDebug("udfd read %zd bytes from client", nread);
if (nread == 0) return; if (nread == 0) return;
SUdfdUvConn *conn = client->data; SUdfdUvConn *conn = client->data;
...@@ -785,10 +811,10 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { ...@@ -785,10 +811,10 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
} }
if (nread < 0) { if (nread < 0) {
fnError("Receive error %s", uv_err_name(nread));
if (nread == UV_EOF) { if (nread == UV_EOF) {
// TODO check more when close fnError("udfd read EOF");
} else { } else {
fnError("Receive error %s", uv_err_name(nread));
} }
udfdUvHandleError(conn); udfdUvHandleError(conn);
} }
...@@ -804,6 +830,7 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { ...@@ -804,6 +830,7 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
uv_pipe_init(global.loop, client, 0); uv_pipe_init(global.loop, client, 0);
if (uv_accept(server, (uv_stream_t *)client) == 0) { if (uv_accept(server, (uv_stream_t *)client) == 0) {
SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
ctx->pWorkList = NULL;
ctx->client = (uv_stream_t *)client; ctx->client = (uv_stream_t *)client;
ctx->inputBuf = 0; ctx->inputBuf = 0;
ctx->inputLen = 0; ctx->inputLen = 0;
...@@ -971,10 +998,10 @@ int32_t udfdInitResidentFuncs() { ...@@ -971,10 +998,10 @@ int32_t udfdInitResidentFuncs() {
} }
global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN); global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
char* pSave = tsUdfdResFuncs; char *pSave = tsUdfdResFuncs;
char* token; char *token;
while ((token = strtok_r(pSave, ",", &pSave)) != NULL) { while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
char func[TSDB_FUNC_NAME_LEN+1] = {0}; char func[TSDB_FUNC_NAME_LEN + 1] = {0};
strncpy(func, token, TSDB_FUNC_NAME_LEN); strncpy(func, token, TSDB_FUNC_NAME_LEN);
fnInfo("udfd add resident function %s", func); fnInfo("udfd add resident function %s", func);
taosArrayPush(global.residentFuncs, func); taosArrayPush(global.residentFuncs, func);
...@@ -985,10 +1012,10 @@ int32_t udfdInitResidentFuncs() { ...@@ -985,10 +1012,10 @@ int32_t udfdInitResidentFuncs() {
int32_t udfdDeinitResidentFuncs() { int32_t udfdDeinitResidentFuncs() {
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char* funcName = taosArrayGet(global.residentFuncs, i); char *funcName = taosArrayGet(global.residentFuncs, i);
SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
if (udfInHash) { if (udfInHash) {
SUdf* udf = *udfInHash; SUdf *udf = *udfInHash;
if (udf->destroyFunc) { if (udf->destroyFunc) {
(udf->destroyFunc)(); (udf->destroyFunc)();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册