提交 778270f4 编写于 作者: H Haojun Liao

[td-3188]

上级 af7ff7c8
......@@ -1753,6 +1753,18 @@ static int32_t checkForUdf(SSqlObj* pSql, tSQLExprList* pSelection) {
}
}
static SUdfInfo* isValidUdf(SArray* pUdfInfo, const char* name, int32_t len) {
size_t t = taosArrayGetSize(pUdfInfo);
for(int32_t i = 0; i < t; ++i) {
SUdfInfo* pUdf = taosArrayGet(pUdfInfo, i);
if (strlen(pUdf->name) == len && strncasecmp(pUdf->name, name, len) == 0) {
return pUdf;
}
}
return NULL;
}
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool timeWindowQuery) {
assert(pSelection != NULL && pCmd != NULL);
......@@ -1780,15 +1792,10 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
if (type == SQL_NODE_SQLFUNCTION) {
pItem->pNode->functionId = isValidFunction(pItem->pNode->operand.z, pItem->pNode->operand.n);
if (pItem->pNode->functionId < 0) {
// extract all possible user defined function
struct SUdfInfo info = {0};
memcpy(info.name, pItem->pNode->operand.z, pItem->pNode->operand.n);
if (pCmd->pUdfInfo == NULL) {
pCmd->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo));
SUdfInfo* pUdfInfo = isValidUdf(pCmd->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n);
if (pUdfInfo == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
taosArrayPush(pCmd->pUdfInfo, &info);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
// sql function in selection clause, append sql function info in pSqlCmd structure sequentially
......@@ -2641,10 +2648,50 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_SUCCESS;
}
default:
return TSDB_CODE_TSC_INVALID_SQL;
default: {
SUdfInfo* pUdfInfo = isValidUdf(pCmd->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n);
tSqlExprItem* pParamElem = &(pItem->pNode->pParam->a[0]);
if (pParamElem->pNode->tokenId != TK_ID) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, &pParamElem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
// functions can not be applied to tags
if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, pUdfInfo->resType, pUdfInfo->resBytes,
getNewResColId(pQueryInfo), pUdfInfo->resBytes, false);
memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName));
getColumnName(pItem, pExpr->aliasName, sizeof(pExpr->aliasName) - 1);
SColumnList ids = getColumnList(1, 0, index.columnIndex);
if (finalResult) {
insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->aliasName, pExpr);
} else {
for (int32_t i = 0; i < ids.num; ++i) {
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i]));
}
}
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_TSC_INVALID_SQL;
}
// todo refactor
......
......@@ -758,6 +758,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->sw.gap = htobe64(pQueryInfo->sessionWindow.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
if (pCmd->pUdfInfo != NULL) {
pQueryMsg->udfNum = htonl((uint32_t) taosArrayGetSize(pCmd->pUdfInfo));
} else {
pQueryMsg->udfNum = 0;
}
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
......@@ -1056,6 +1062,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
}
// support only one udf
if (pCmd->pUdfInfo != NULL) {
assert(taosArrayGetSize(pCmd->pUdfInfo) == 1);
pQueryMsg->udfContentOffset = htonl((int32_t) (pMsg - pCmd->payload));
for(int32_t i = 0; i < taosArrayGetSize(pCmd->pUdfInfo); ++i) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, i);
STR_TO_VARSTR(pMsg, pUdfInfo->name);
pMsg += varDataTLen(pMsg);
pQueryMsg->udfContentLen = htonl(pUdfInfo->contLen);
memcpy(pMsg, pUdfInfo->content, pUdfInfo->contLen);
pMsg += pUdfInfo->contLen;
}
}
memcpy(pMsg, pSql->sqlstr, sqlLen);
pMsg += sqlLen;
......@@ -2100,11 +2123,11 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
SUdfInfo info = {0};
info.name = strndup(pFunc->name, TSDB_FUNC_NAME_LEN);
info.resBytes = htons(pFunc->resBytes);
info.resType = htons(pFunc->resType);
info.resType = htons(pFunc->resType);
info.funcType = TSDB_UDF_TYPE_SCALAR;
info.contLen = htons(pFunc->contentLen);
info.content = malloc(pFunc->contentLen);
info.contLen = htonl(pFunc->len);
info.content = malloc(pFunc->len);
memcpy(info.content, pFunc->content, info.contLen);
taosArrayPush(pCmd->pUdfInfo, &info);
......
......@@ -511,6 +511,9 @@ typedef struct {
int32_t numOfTags; // number of tags columns involved
int32_t sqlstrLen; // sql query string
int32_t prevResultLen; // previous result length
int32_t udfNum; // number of udf function
int32_t udfContentOffset;
int32_t udfContentLen;
SColumnInfo colList[];
} SQueryTableMsg;
......@@ -592,7 +595,7 @@ typedef struct {
char name[TSDB_FUNC_NAME_LEN];
int16_t resType;
int16_t resBytes;
int16_t contentLen;
int32_t len;
char content[];
} SFunctionInfoMsg;
......
......@@ -216,12 +216,14 @@ typedef struct SUserObj {
typedef struct SFuncObj {
char name[TSDB_FUNC_NAME_LEN];
char path[PATH_MAX];
int32_t codeLen;
char code[TSDB_FUNC_CODE_LEN];
char path[128];
int32_t contLen;
char cont[TSDB_FUNC_CODE_LEN];
int64_t createdTime;
uint8_t outputType;
int16_t outputLen;
uint8_t resType;
int16_t resBytes;
int64_t sig; // partial md5 sign
int16_t type; // [lua script|so|js]
int8_t reserved[64];
int8_t updateEnd[4];
int32_t refCount;
......
......@@ -52,7 +52,7 @@ static int32_t mnodeFuncActionDestroy(SSdbRow *pRow) {
static int32_t mnodeFuncActionInsert(SSdbRow *pRow) {
SFuncObj *pFunc = pRow->pObj;
mTrace("func:%s, length: %d, insert into sdb", pFunc->name, pFunc->codeLen);
mTrace("func:%s, contLen: %d, insert into sdb", pFunc->name, pFunc->contLen);
return TSDB_CODE_SUCCESS;
}
......@@ -60,7 +60,7 @@ static int32_t mnodeFuncActionInsert(SSdbRow *pRow) {
static int32_t mnodeFuncActionDelete(SSdbRow *pRow) {
SFuncObj *pFunc = pRow->pObj;
mTrace("func:%s, length: %d, delete from sdb", pFunc->name, pFunc->codeLen);
mTrace("func:%s, length: %d, delete from sdb", pFunc->name, pFunc->contLen);
return TSDB_CODE_SUCCESS;
}
......@@ -73,8 +73,8 @@ static int32_t mnodeFuncActionUpdate(SSdbRow *pRow) {
memcpy(pSaved, pFunc, tsFuncUpdateSize);
free(pFunc);
}
mnodeDecFuncRef(pSaved);
mnodeDecFuncRef(pSaved);
return TSDB_CODE_SUCCESS;
}
......@@ -230,12 +230,14 @@ int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code
pFunc = calloc(1, sizeof(SFuncObj));
tstrncpy(pFunc->name, name, TSDB_FUNC_NAME_LEN);
tstrncpy(pFunc->path, path, PATH_MAX);
tstrncpy(pFunc->code, codeScript, TSDB_FUNC_CODE_LEN);
pFunc->codeLen = codeLen;
tstrncpy(pFunc->path, path, tListLen(pFunc->path));
tstrncpy(pFunc->cont, codeScript, codeLen);
pFunc->contLen = codeLen;
pFunc->createdTime = taosGetTimestampMs();
pFunc->outputType = outputType;
pFunc->outputLen = outputLen;
pFunc->resType = outputType;
pFunc->resBytes = outputLen;
pFunc->sig = 0;
pFunc->type = 1; //lua script, refactor
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
......@@ -380,7 +382,7 @@ static int32_t mnodeRetrieveFuncs(SShowObj *pShow, char *data, int32_t rows, voi
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->outputType, pFunc->outputLen), pShow->bytes[cols]);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->resType, pFunc->resBytes), pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -388,11 +390,11 @@ static int32_t mnodeRetrieveFuncs(SShowObj *pShow, char *data, int32_t rows, voi
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pFunc->codeLen;
*(int32_t *)pWrite = pFunc->contLen;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->code, pShow->bytes[cols]);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->cont, pShow->bytes[cols]);
cols++;
numOfRows++;
......@@ -442,9 +444,11 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) {
SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) pOutput;
strcpy(pFuncInfo->name, buf);
pFuncInfo->contentLen = htonl(pFuncObj->codeLen);
pFuncInfo->resType = htons(pFuncObj->outputType);
pOutput += sizeof(SFunctionInfoMsg) + pFuncObj->codeLen;
pFuncInfo->len = htonl(pFuncObj->contLen);
memcpy(pFuncInfo->content, pFuncObj->cont, pFuncObj->contLen);
pFuncInfo->resType = htons(pFuncObj->resType);
pOutput += sizeof(SFunctionInfoMsg) + pFuncObj->contLen;
}
pMsg->rpcRsp.rsp = pFuncMsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册