From 778270f4355d4d4e01443171990ba2ca8a5ab694 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 18 Mar 2021 14:17:24 +0800 Subject: [PATCH] [td-3188] --- src/client/src/tscSQLParser.c | 69 +++++++++++++++++++++++++++++------ src/client/src/tscServer.c | 29 +++++++++++++-- src/inc/taosmsg.h | 5 ++- src/mnode/inc/mnodeDef.h | 12 +++--- src/mnode/src/mnodeFunc.c | 32 +++++++++------- 5 files changed, 113 insertions(+), 34 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index d413f52bdf..96d3e9632c 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1753,6 +1753,18 @@ static int32_t checkForUdf(SSqlObj* pSql, tSQLExprList* pSelection) { } } +static SUdfInfo* isValidUdf(SArray* pUdfInfo, const char* name, int32_t len) { + size_t t = taosArrayGetSize(pUdfInfo); + for(int32_t i = 0; i < t; ++i) { + SUdfInfo* pUdf = taosArrayGet(pUdfInfo, i); + if (strlen(pUdf->name) == len && strncasecmp(pUdf->name, name, len) == 0) { + return pUdf; + } + } + + return NULL; +} + int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool timeWindowQuery) { assert(pSelection != NULL && pCmd != NULL); @@ -1780,15 +1792,10 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel if (type == SQL_NODE_SQLFUNCTION) { pItem->pNode->functionId = isValidFunction(pItem->pNode->operand.z, pItem->pNode->operand.n); if (pItem->pNode->functionId < 0) { - // extract all possible user defined function - struct SUdfInfo info = {0}; - memcpy(info.name, pItem->pNode->operand.z, pItem->pNode->operand.n); - if (pCmd->pUdfInfo == NULL) { - pCmd->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo)); + SUdfInfo* pUdfInfo = isValidUdf(pCmd->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n); + if (pUdfInfo == NULL) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } - - taosArrayPush(pCmd->pUdfInfo, &info); - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } // sql function in selection clause, append sql function info in pSqlCmd structure sequentially @@ -2641,10 +2648,50 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_SUCCESS; } - default: - return TSDB_CODE_TSC_INVALID_SQL; + default: { + SUdfInfo* pUdfInfo = isValidUdf(pCmd->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n); + + tSqlExprItem* pParamElem = &(pItem->pNode->pParam->a[0]); + if (pParamElem->pNode->tokenId != TK_ID) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } + + SColumnIndex index = COLUMN_INDEX_INITIALIZER; + if (getColumnIndexByName(pCmd, &pParamElem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } + + if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); + } + + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + + // functions can not be applied to tags + if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); + } + + SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, pUdfInfo->resType, pUdfInfo->resBytes, + getNewResColId(pQueryInfo), pUdfInfo->resBytes, false); + + memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); + getColumnName(pItem, pExpr->aliasName, sizeof(pExpr->aliasName) - 1); + + SColumnList ids = getColumnList(1, 0, index.columnIndex); + if (finalResult) { + insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->aliasName, pExpr); + } else { + for (int32_t i = 0; i < ids.num; ++i) { + tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i])); + } + } + + return TSDB_CODE_SUCCESS; + } } - + + return TSDB_CODE_TSC_INVALID_SQL; } // todo refactor diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d7e142f1e5..2f6d053351 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -758,6 +758,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->sw.gap = htobe64(pQueryInfo->sessionWindow.gap); pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); + if (pCmd->pUdfInfo != NULL) { + pQueryMsg->udfNum = htonl((uint32_t) taosArrayGetSize(pCmd->pUdfInfo)); + } else { + pQueryMsg->udfNum = 0; + } + size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number @@ -1056,6 +1062,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks); } + // support only one udf + if (pCmd->pUdfInfo != NULL) { + assert(taosArrayGetSize(pCmd->pUdfInfo) == 1); + + pQueryMsg->udfContentOffset = htonl((int32_t) (pMsg - pCmd->payload)); + for(int32_t i = 0; i < taosArrayGetSize(pCmd->pUdfInfo); ++i) { + SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, i); + STR_TO_VARSTR(pMsg, pUdfInfo->name); + + pMsg += varDataTLen(pMsg); + pQueryMsg->udfContentLen = htonl(pUdfInfo->contLen); + memcpy(pMsg, pUdfInfo->content, pUdfInfo->contLen); + + pMsg += pUdfInfo->contLen; + } + } + memcpy(pMsg, pSql->sqlstr, sqlLen); pMsg += sqlLen; @@ -2100,11 +2123,11 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) { SUdfInfo info = {0}; info.name = strndup(pFunc->name, TSDB_FUNC_NAME_LEN); info.resBytes = htons(pFunc->resBytes); - info.resType = htons(pFunc->resType); + info.resType = htons(pFunc->resType); info.funcType = TSDB_UDF_TYPE_SCALAR; - info.contLen = htons(pFunc->contentLen); - info.content = malloc(pFunc->contentLen); + info.contLen = htonl(pFunc->len); + info.content = malloc(pFunc->len); memcpy(info.content, pFunc->content, info.contLen); taosArrayPush(pCmd->pUdfInfo, &info); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index ba6e47aa25..bf0acf3c3f 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -511,6 +511,9 @@ typedef struct { int32_t numOfTags; // number of tags columns involved int32_t sqlstrLen; // sql query string int32_t prevResultLen; // previous result length + int32_t udfNum; // number of udf function + int32_t udfContentOffset; + int32_t udfContentLen; SColumnInfo colList[]; } SQueryTableMsg; @@ -592,7 +595,7 @@ typedef struct { char name[TSDB_FUNC_NAME_LEN]; int16_t resType; int16_t resBytes; - int16_t contentLen; + int32_t len; char content[]; } SFunctionInfoMsg; diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index f9a5ccbcf6..ba685a9bdc 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -216,12 +216,14 @@ typedef struct SUserObj { typedef struct SFuncObj { char name[TSDB_FUNC_NAME_LEN]; - char path[PATH_MAX]; - int32_t codeLen; - char code[TSDB_FUNC_CODE_LEN]; + char path[128]; + int32_t contLen; + char cont[TSDB_FUNC_CODE_LEN]; int64_t createdTime; - uint8_t outputType; - int16_t outputLen; + uint8_t resType; + int16_t resBytes; + int64_t sig; // partial md5 sign + int16_t type; // [lua script|so|js] int8_t reserved[64]; int8_t updateEnd[4]; int32_t refCount; diff --git a/src/mnode/src/mnodeFunc.c b/src/mnode/src/mnodeFunc.c index 871b7ffbad..464ad3f110 100644 --- a/src/mnode/src/mnodeFunc.c +++ b/src/mnode/src/mnodeFunc.c @@ -52,7 +52,7 @@ static int32_t mnodeFuncActionDestroy(SSdbRow *pRow) { static int32_t mnodeFuncActionInsert(SSdbRow *pRow) { SFuncObj *pFunc = pRow->pObj; - mTrace("func:%s, length: %d, insert into sdb", pFunc->name, pFunc->codeLen); + mTrace("func:%s, contLen: %d, insert into sdb", pFunc->name, pFunc->contLen); return TSDB_CODE_SUCCESS; } @@ -60,7 +60,7 @@ static int32_t mnodeFuncActionInsert(SSdbRow *pRow) { static int32_t mnodeFuncActionDelete(SSdbRow *pRow) { SFuncObj *pFunc = pRow->pObj; - mTrace("func:%s, length: %d, delete from sdb", pFunc->name, pFunc->codeLen); + mTrace("func:%s, length: %d, delete from sdb", pFunc->name, pFunc->contLen); return TSDB_CODE_SUCCESS; } @@ -73,8 +73,8 @@ static int32_t mnodeFuncActionUpdate(SSdbRow *pRow) { memcpy(pSaved, pFunc, tsFuncUpdateSize); free(pFunc); } - mnodeDecFuncRef(pSaved); + mnodeDecFuncRef(pSaved); return TSDB_CODE_SUCCESS; } @@ -230,12 +230,14 @@ int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code pFunc = calloc(1, sizeof(SFuncObj)); tstrncpy(pFunc->name, name, TSDB_FUNC_NAME_LEN); - tstrncpy(pFunc->path, path, PATH_MAX); - tstrncpy(pFunc->code, codeScript, TSDB_FUNC_CODE_LEN); - pFunc->codeLen = codeLen; + tstrncpy(pFunc->path, path, tListLen(pFunc->path)); + tstrncpy(pFunc->cont, codeScript, codeLen); + pFunc->contLen = codeLen; pFunc->createdTime = taosGetTimestampMs(); - pFunc->outputType = outputType; - pFunc->outputLen = outputLen; + pFunc->resType = outputType; + pFunc->resBytes = outputLen; + pFunc->sig = 0; + pFunc->type = 1; //lua script, refactor SSdbRow row = { .type = SDB_OPER_GLOBAL, @@ -380,7 +382,7 @@ static int32_t mnodeRetrieveFuncs(SShowObj *pShow, char *data, int32_t rows, voi cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->outputType, pFunc->outputLen), pShow->bytes[cols]); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->resType, pFunc->resBytes), pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -388,11 +390,11 @@ static int32_t mnodeRetrieveFuncs(SShowObj *pShow, char *data, int32_t rows, voi cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pFunc->codeLen; + *(int32_t *)pWrite = pFunc->contLen; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->code, pShow->bytes[cols]); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->cont, pShow->bytes[cols]); cols++; numOfRows++; @@ -442,9 +444,11 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) { SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) pOutput; strcpy(pFuncInfo->name, buf); - pFuncInfo->contentLen = htonl(pFuncObj->codeLen); - pFuncInfo->resType = htons(pFuncObj->outputType); - pOutput += sizeof(SFunctionInfoMsg) + pFuncObj->codeLen; + pFuncInfo->len = htonl(pFuncObj->contLen); + memcpy(pFuncInfo->content, pFuncObj->cont, pFuncObj->contLen); + + pFuncInfo->resType = htons(pFuncObj->resType); + pOutput += sizeof(SFunctionInfoMsg) + pFuncObj->contLen; } pMsg->rpcRsp.rsp = pFuncMsg; -- GitLab