未验证 提交 e8b467d9 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12476 from taosdata/feature/udf

feat: fix session error that is caused by udfd restart
...@@ -657,6 +657,7 @@ int32_t* taosGetErrno(); ...@@ -657,6 +657,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905) #define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905)
#define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906) #define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906)
#define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907) #define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907)
#define TSDB_CODE_UDF_NO_FUNC_HANDLE TAOS_DEF_ERROR_CODE(0, 0x2908)
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)
#define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001) #define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001)
......
...@@ -1072,6 +1072,8 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { ...@@ -1072,6 +1072,8 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask); fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
int32_t code = 0;
switch (uvTask->type) { switch (uvTask->type) {
case UV_TASK_CONNECT: { case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
...@@ -1091,22 +1093,34 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1091,22 +1093,34 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
connReq->data = uvTask; connReq->data = uvTask;
uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect); uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
code = 0;
break; break;
} }
case UV_TASK_REQ_RSP: { case UV_TASK_REQ_RSP: {
uv_pipe_t *pipe = uvTask->pipe; uv_pipe_t *pipe = uvTask->pipe;
uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); if (pipe == NULL) {
write->data = uvTask; code = TSDB_CODE_UDF_PIPE_NO_PIPE;
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite); } else {
if (err != 0) { uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err)); write->data = uvTask;
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite);
if (err != 0) {
fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err));
}
code = err;
} }
break; break;
} }
case UV_TASK_DISCONNECT: { case UV_TASK_DISCONNECT: {
SClientUvConn *conn = uvTask->pipe->data; uv_pipe_t *pipe = uvTask->pipe;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); if (pipe == NULL) {
uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose); code = TSDB_CODE_UDF_PIPE_NO_PIPE;
} else {
SClientUvConn *conn = pipe->data;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
code = 0;
}
break; break;
} }
default: { default: {
...@@ -1115,7 +1129,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1115,7 +1129,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
} }
} }
return 0; return code;
} }
void udfClientAsyncCb(uv_async_t *async) { void udfClientAsyncCb(uv_async_t *async) {
...@@ -1133,6 +1147,9 @@ void udfClientAsyncCb(uv_async_t *async) { ...@@ -1133,6 +1147,9 @@ void udfClientAsyncCb(uv_async_t *async) {
int32_t code = udfcStartUvTask(task); int32_t code = udfcStartUvTask(task);
if (code == 0) { if (code == 0) {
QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue); QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
} else {
task->errCode = code;
uv_sem_post(&task->taskSem);
} }
} }
...@@ -1483,6 +1500,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { ...@@ -1483,6 +1500,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SClientUdfUvSession *session = udfRes->session; SClientUdfUvSession *session = udfRes->session;
if (session == NULL) {
return TSDB_CODE_UDF_NO_FUNC_HANDLE;
}
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
...@@ -1535,6 +1555,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { ...@@ -1535,6 +1555,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SClientUdfUvSession *session = udfRes->session; SClientUdfUvSession *session = udfRes->session;
if (session == NULL) {
return TSDB_CODE_UDF_NO_FUNC_HANDLE;
}
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
......
...@@ -27,13 +27,11 @@ typedef struct SScalarCtx { ...@@ -27,13 +27,11 @@ typedef struct SScalarCtx {
SArray *pBlockList; /* element is SSDataBlock* */ SArray *pBlockList; /* element is SSDataBlock* */
SHashObj *pRes; /* element is SScalarParam */ SHashObj *pRes; /* element is SScalarParam */
void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values
SHashObj *udf2Handle;
} SScalarCtx; } SScalarCtx;
#define SCL_DATA_TYPE_DUMMY_HASH 9000 #define SCL_DATA_TYPE_DUMMY_HASH 9000
#define SCL_DEFAULT_OP_NUM 10 #define SCL_DEFAULT_OP_NUM 10
#define SCL_DEFAULT_UDF_NUM 8
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type)) #define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type))
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList) #define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
......
...@@ -154,18 +154,6 @@ void sclFreeRes(SHashObj *res) { ...@@ -154,18 +154,6 @@ void sclFreeRes(SHashObj *res) {
taosHashCleanup(res); taosHashCleanup(res);
} }
void sclFreeUdfHandles(SHashObj *udf2handle) {
void *pIter = taosHashIterate(udf2handle, NULL);
while (pIter) {
UdfcFuncHandle *handle = (UdfcFuncHandle *)pIter;
if (handle) {
teardownUdf(*handle);
}
pIter = taosHashIterate(udf2handle, pIter);
}
taosHashCleanup(udf2handle);
}
void sclFreeParam(SScalarParam *param) { void sclFreeParam(SScalarParam *param) {
if (param->columnData != NULL) { if (param->columnData != NULL) {
colDataDestroy(param->columnData); colDataDestroy(param->columnData);
...@@ -375,28 +363,22 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp ...@@ -375,28 +363,22 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
if (fmIsUserDefinedFunc(node->funcId)) { if (fmIsUserDefinedFunc(node->funcId)) {
UdfcFuncHandle udfHandle = NULL; UdfcFuncHandle udfHandle = NULL;
char* udfName = node->functionName;
if (ctx->udf2Handle) { code = setupUdf(node->functionName, &udfHandle);
UdfcFuncHandle *pHandle = taosHashGet(ctx->udf2Handle, udfName, strlen(udfName)); if (code != 0) {
if (pHandle) { sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code);
udfHandle = *pHandle; goto _return;
}
}
if (udfHandle == NULL) {
code = setupUdf(udfName, &udfHandle);
if (code != 0) {
sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", udfName, code);
goto _return;
}
if (ctx->udf2Handle) {
taosHashPut(ctx->udf2Handle, udfName, strlen(udfName), &udfHandle, sizeof(UdfcFuncHandle));
}
} }
code = callUdfScalarFunc(udfHandle, params, paramNum, output); code = callUdfScalarFunc(udfHandle, params, paramNum, output);
if (code != 0) { if (code != 0) {
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return; goto _return;
} }
code = teardownUdf(udfHandle);
if (code != 0) {
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return;
}
} else { } else {
SScalarFuncExecFuncs ffpSet = {0}; SScalarFuncExecFuncs ffpSet = {0};
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
...@@ -910,20 +892,15 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { ...@@ -910,20 +892,15 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
SScalarCtx ctx = {0}; SScalarCtx ctx = {0};
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) { if (NULL == ctx.pRes) {
sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM); sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == ctx.udf2Handle) {
sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx); nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
SCL_ERR_JRET(ctx.code); SCL_ERR_JRET(ctx.code);
*pRes = pNode; *pRes = pNode;
_return: _return:
sclFreeUdfHandles(ctx.udf2Handle);
sclFreeRes(ctx.pRes); sclFreeRes(ctx.pRes);
return code; return code;
} }
...@@ -939,14 +916,10 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { ...@@ -939,14 +916,10 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
// TODO: OPT performance // TODO: OPT performance
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) { if (NULL == ctx.pRes) {
sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM); sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == ctx.udf2Handle) {
sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx); nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
SCL_ERR_JRET(ctx.code); SCL_ERR_JRET(ctx.code);
...@@ -964,7 +937,6 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { ...@@ -964,7 +937,6 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
_return: _return:
//nodesDestroyNode(pNode); //nodesDestroyNode(pNode);
sclFreeUdfHandles(ctx.udf2Handle);
sclFreeRes(ctx.pRes); sclFreeRes(ctx.pRes);
return code; return code;
} }
...@@ -463,7 +463,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe") ...@@ -463,7 +463,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function handle")
//schemaless //schemaless
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type") TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册