提交 89271550 编写于 作者: S slzhou@taodata.com

feat:add udf dedicated errors

上级 c95d3cdb
...@@ -654,6 +654,7 @@ int32_t* taosGetErrno(); ...@@ -654,6 +654,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_UDF_PIPE_NO_PIPE TAOS_DEF_ERROR_CODE(0, 0x2904) #define TSDB_CODE_UDF_PIPE_NO_PIPE TAOS_DEF_ERROR_CODE(0, 0x2904)
#define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905) #define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905)
#define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906) #define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906)
#define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907)
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)
#define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001) #define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001)
......
...@@ -444,7 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -444,7 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "startUdfd", tsStartUdfd, 0) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
return 0; return 0;
} }
...@@ -585,7 +585,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -585,7 +585,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsStartUdfd = cfgGetItem(pCfg, "startUdfd")->bval; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
if (tsQueryBufferSize >= 0) { if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
......
...@@ -1497,7 +1497,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { ...@@ -1497,7 +1497,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
taosArrayDestroy(tempBlock.pDataBlock); taosArrayDestroy(tempBlock.pDataBlock);
taosMemoryFree(newState.buf); taosMemoryFree(newState.buf);
return TSDB_CODE_SUCCESS; return udfCode;
} }
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
......
...@@ -140,20 +140,14 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { ...@@ -140,20 +140,14 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
return 0; return 0;
} }
void udfdProcessRequest(uv_work_t *req) { void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request);
switch (request.type) {
case UDF_TASK_SETUP: {
// TODO: tracable id from client. connect, setup, call, teardown // TODO: tracable id from client. connect, setup, call, teardown
fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName); fnInfo("%" PRId64 " setup request. udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request.setup; SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS;
SUdf *udf = NULL; SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName)); SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
if (udfInHash) { if (udfInHash) {
++(*udfInHash)->refCount; ++(*udfInHash)->refCount;
udf = *udfInHash; udf = *udfInHash;
...@@ -166,14 +160,14 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -166,14 +160,14 @@ void udfdProcessRequest(uv_work_t *req) {
uv_mutex_init(&udfNew->lock); uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady); uv_cond_init(&udfNew->condReady);
udf = udfNew; udf = udfNew;
taosHashPut(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName), &udfNew, sizeof(&udfNew)); taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
} }
uv_mutex_lock(&udf->lock); uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) { if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING; udf->state = UDF_STATE_LOADING;
udfdLoadUdf(setup->udfName, udf); code = udfdLoadUdf(setup->udfName, udf);
if (udf->initFunc) { if (udf->initFunc) {
udf->initFunc(); udf->initFunc();
} }
...@@ -188,14 +182,16 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -188,14 +182,16 @@ void udfdProcessRequest(uv_work_t *req) {
} }
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf; handle->udf = udf;
SUdfResponse rsp; SUdfResponse rsp;
rsp.seqNum = request.seqNum; rsp.seqNum = request->seqNum;
rsp.type = request.type; rsp.type = request->type;
rsp.code = 0; rsp.code = code;
rsp.setupRsp.udfHandle = (int64_t)(handle); rsp.setupRsp.udfHandle = (int64_t)(handle);
rsp.setupRsp.outputType = udf->outputType; rsp.setupRsp.outputType = udf->outputType;
rsp.setupRsp.outputLen = udf->outputLen; rsp.setupRsp.outputLen = udf->outputLen;
rsp.setupRsp.bufSize = udf->bufSize; rsp.setupRsp.bufSize = udf->bufSize;
int32_t len = encodeUdfResponse(NULL, &rsp); int32_t len = encodeUdfResponse(NULL, &rsp);
rsp.msgLen = len; rsp.msgLen = len;
void *bufBegin = taosMemoryMalloc(len); void *bufBegin = taosMemoryMalloc(len);
...@@ -205,12 +201,12 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -205,12 +201,12 @@ void udfdProcessRequest(uv_work_t *req) {
uvUdf->output = uv_buf_init(bufBegin, len); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
break; return;
} }
case UDF_TASK_CALL: { void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfCallRequest *call = &request.call; SUdfCallRequest *call = &request->call;
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType, fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType,
call->udfHandle); call->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
...@@ -218,13 +214,14 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -218,13 +214,14 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfResponse *rsp = &response; SUdfResponse *rsp = &response;
SUdfCallResponse *subRsp = &rsp->callRsp; SUdfCallResponse *subRsp = &rsp->callRsp;
int32_t code = TSDB_CODE_SUCCESS;
switch(call->callType) { switch(call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: { case TSDB_UDF_CALL_SCALA_PROC: {
SUdfColumn output = {0}; SUdfColumn output = {0};
SUdfDataBlock input = {0}; SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input); convertDataBlockToUdfDataBlock(&call->block, &input);
udf->scalarProcFunc(&input, &output); code = udf->scalarProcFunc(&input, &output);
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
freeUdfColumn(&output); freeUdfColumn(&output);
...@@ -244,7 +241,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -244,7 +241,7 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
.bufLen= udf->bufSize, .bufLen= udf->bufSize,
.numOfResult = 0}; .numOfResult = 0};
udf->aggProcFunc(&input, &call->interBuf, &outBuf); code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
subRsp->resultBuf = outBuf; subRsp->resultBuf = outBuf;
break; break;
...@@ -253,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -253,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
.bufLen= udf->bufSize, .bufLen= udf->bufSize,
.numOfResult = 0}; .numOfResult = 0};
udf->aggFinishFunc(&call->interBuf, &outBuf); code = udf->aggFinishFunc(&call->interBuf, &outBuf);
subRsp->resultBuf = outBuf; subRsp->resultBuf = outBuf;
break; break;
} }
...@@ -261,9 +258,9 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -261,9 +258,9 @@ void udfdProcessRequest(uv_work_t *req) {
break; break;
} }
rsp->seqNum = request.seqNum; rsp->seqNum = request->seqNum;
rsp->type = request.type; rsp->type = request->type;
rsp->code = 0; rsp->code = code;
subRsp->callType = call->callType; subRsp->callType = call->callType;
int32_t len = encodeUdfResponse(NULL, rsp); int32_t len = encodeUdfResponse(NULL, rsp);
...@@ -274,14 +271,17 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -274,14 +271,17 @@ void udfdProcessRequest(uv_work_t *req) {
uvUdf->output = uv_buf_init(bufBegin, len); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
break; return;
} }
case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = &request.teardown; void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle = SUdfTeardownRequest *teardown = &request->teardown;
(SUdfcFuncHandle *)(teardown->udfHandle); fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request->seqNum, teardown->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
bool unloadUdf = false; bool unloadUdf = false;
int32_t code = TSDB_CODE_SUCCESS;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
udf->refCount--; udf->refCount--;
if (udf->refCount == 0) { if (udf->refCount == 0) {
...@@ -302,9 +302,9 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -302,9 +302,9 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfResponse response; SUdfResponse response;
SUdfResponse *rsp = &response; SUdfResponse *rsp = &response;
rsp->seqNum = request.seqNum; rsp->seqNum = request->seqNum;
rsp->type = request.type; rsp->type = request->type;
rsp->code = 0; rsp->code = code;
int32_t len = encodeUdfResponse(NULL, rsp); int32_t len = encodeUdfResponse(NULL, rsp);
rsp->msgLen = len; rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len); void *bufBegin = taosMemoryMalloc(len);
...@@ -313,6 +313,26 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -313,6 +313,26 @@ void udfdProcessRequest(uv_work_t *req) {
uvUdf->output = uv_buf_init(bufBegin, len); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
return;
}
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request);
switch (request.type) {
case UDF_TASK_SETUP: {
udfdProcessSetupRequest(uvUdf, &request);
break;
}
case UDF_TASK_CALL: {
udfdProcessCallRequest(uvUdf, &request);
break;
}
case UDF_TASK_TEARDOWN: {
udfdProcessTeardownRequest(uvUdf, &request);
break; break;
} }
default: { default: {
......
...@@ -27,6 +27,12 @@ int32_t udf2_start(SUdfInterBuf *buf) { ...@@ -27,6 +27,12 @@ int32_t udf2_start(SUdfInterBuf *buf) {
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf; int64_t sumSquares = *(int64_t*)interBuf->buf;
int8_t numOutput = 0; int8_t numOutput = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (col->colMeta.type != TSDB_DATA_TYPE_INT) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) { for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i]; SUdfColumn* col = block->udfCols[i];
......
...@@ -461,6 +461,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect erro ...@@ -461,6 +461,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect erro
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input")
//schemaless //schemaless
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册