From 0d738dc6f8c1fae65584aff16c692d45b35a5df9 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 24 May 2021 16:14:58 +0800 Subject: [PATCH] [TD-4270]: the first round rest udf implementation --- src/plugins/http/inc/httpParser.h | 2 +- src/plugins/http/src/httpRestHandle.c | 86 +++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/src/plugins/http/inc/httpParser.h b/src/plugins/http/inc/httpParser.h index 85ba843716..764e5cdbad 100644 --- a/src/plugins/http/inc/httpParser.h +++ b/src/plugins/http/inc/httpParser.h @@ -17,7 +17,7 @@ #define HTTP_PARSER_H #include "httpGzip.h" -#define HTTP_MAX_URL 5 // http url stack size +#define HTTP_MAX_URL 6 // http url stack size typedef enum HTTP_PARSER_STATE { HTTP_PARSER_BEGIN, diff --git a/src/plugins/http/src/httpRestHandle.c b/src/plugins/http/src/httpRestHandle.c index 8728b9e37a..aee6d1a5d0 100644 --- a/src/plugins/http/src/httpRestHandle.c +++ b/src/plugins/http/src/httpRestHandle.c @@ -119,6 +119,90 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) { return true; } +#define REST_FUNC_URL_POS 2 +#define REST_OUTP_URL_POS 3 +#define REST_AGGR_URL_POS 4 +#define REST_BUFF_URL_POS 5 + +#define HTTP_FUNC_LEN 32 +#define HTTP_OUTP_LEN 16 +#define HTTP_AGGR_LEN 2 +#define HTTP_BUFF_LEN 32 + +static int udfSaveFile(const char *fname, const char *content, long len) { + int fd = open(fname, O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0755); + if (fd < 0) + return -1; + if (taosWrite(fd, (void *)content, len) < 0) + return -1; + + return 0; +} + +static bool restProcessUdfRequest(HttpContext* pContext) { + HttpParser* pParser = pContext->parser; + if (pParser->path[REST_FUNC_URL_POS].pos >= HTTP_FUNC_LEN || pParser->path[REST_FUNC_URL_POS].pos <= 0) { + return false; + } + + if (pParser->path[REST_OUTP_URL_POS].pos >= HTTP_OUTP_LEN || pParser->path[REST_OUTP_URL_POS].pos <= 0) { + return false; + } + + if (pParser->path[REST_AGGR_URL_POS].pos >= HTTP_AGGR_LEN || pParser->path[REST_AGGR_URL_POS].pos <= 0) { + return false; + } + + if (pParser->path[REST_BUFF_URL_POS].pos >= HTTP_BUFF_LEN || pParser->path[REST_BUFF_URL_POS].pos <= 0) { + return false; + } + + char* sql = pContext->parser->body.str; + int len = pContext->parser->body.size; + if (sql == NULL) { + httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_SQL_INPUT); + return false; + } + + char udfDir[256] = {0}; + char buf[64] = "udf-"; + char funcName[64] = {0}; + int aggr = 0; + char outputType[16] = {0}; + char buffSize[32] = {0}; + + tstrncpy(funcName, pParser->path[REST_FUNC_URL_POS].str, HTTP_FUNC_LEN); + tstrncpy(buf + 4, funcName, HTTP_FUNC_LEN); + + if (pParser->path[REST_AGGR_URL_POS].str[0] != '0') { + aggr = 1; + } + + tstrncpy(outputType, pParser->path[REST_OUTP_URL_POS].str, HTTP_OUTP_LEN); + tstrncpy(buffSize, pParser->path[REST_BUFF_URL_POS].str, HTTP_BUFF_LEN); + + taosGetTmpfilePath(funcName, udfDir); + + udfSaveFile(udfDir, sql, len); + + tfree(sql); + pContext->parser->body.str = malloc(1024); + sql = pContext->parser->body.str; + int sqlLen = sprintf(sql, "create %s function %s as \"%s\" outputtype %s bufsize %s", + aggr == 1 ? "aggregate" : " ", funcName, udfDir, outputType, buffSize); + + pContext->parser->body.pos = sqlLen; + pContext->parser->body.size = sqlLen + 1; + + HttpSqlCmd* cmd = &(pContext->singleCmd); + cmd->nativSql = sql; + + pContext->reqType = HTTP_REQTYPE_SINGLE_SQL; + pContext->encodeMethod = &restEncodeSqlLocalTimeStringMethod; + + return true; +} + bool restProcessRequest(struct HttpContext* pContext) { if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) { restGetUserFromUrl(pContext); @@ -138,6 +222,8 @@ bool restProcessRequest(struct HttpContext* pContext) { return restProcessSqlRequest(pContext, REST_TIMESTAMP_FMT_UTC_STRING); } else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) { return restProcessLoginRequest(pContext); + } else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "udf")) { + return restProcessUdfRequest(pContext); } else { } -- GitLab