提交 01268a09 编写于 作者: H Haojun Liao

[td-3188]<feature>: support udf.

上级 8b936d5d
......@@ -230,6 +230,7 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
int32_t tscGetUdfFromNode(SSqlObj *pSql);
void tscResetForNextRetrieve(SSqlRes* pRes);
void tscDoQuery(SSqlObj* pSql);
......
......@@ -47,6 +47,9 @@ enum {
DATA_FROM_DATA_FILE = 2,
};
#define TSDB_UDF_TYPE_SCALAR 1
#define TSDB_UDF_TYPE_AGGREGATE 2
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef struct STableComInfo {
......@@ -259,9 +262,9 @@ typedef struct {
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
char reserve4[3]; // fix bus error on arm32
char reserve4[3]; // fix bus error on arm32
int8_t submitSchema; // submit block is built with table schema
char reserve5[3]; // fix bus error on arm32
char reserve5[3]; // fix bus error on arm32
STagData tagData; // NOTE: pTagData->data is used as a variant length array
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
......@@ -269,6 +272,7 @@ typedef struct {
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
SArray *pUdfInfo; // user defined function information SArray<SUdfInfo>
} SSqlCmd;
typedef struct SResRec {
......@@ -276,6 +280,16 @@ typedef struct SResRec {
int numOfTotal;
} SResRec;
typedef struct SUdfInfo {
int32_t functionId; // system assigned function id
char *name; // function name
int16_t resType; // result type
int16_t resBytes; // result byte
int32_t funcType; // scalar function or aggregate function
int32_t contLen; // content length
char *content; // binary content
} SUdfInfo;
typedef struct {
int32_t numOfRows; // num of results in current retrieval
int64_t numOfRowsGroup; // num of results of current group
......
......@@ -1725,6 +1725,34 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
return false;
}
static int32_t checkForUdf(SSqlObj* pSql, tSQLExprList* pSelection) {
SSqlCmd* pCmd = &pSql->cmd;
if (pCmd->pUdfInfo != NULL) {
return TSDB_CODE_SUCCESS;
}
pCmd->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo));
for (int32_t i = 0; i < pSelection->nExpr; ++i) {
tSqlExprItem* pItem = &pSelection->a[i];
int32_t type = pItem->pNode->type;
if (type == SQL_NODE_SQLFUNCTION) {
pItem->pNode->functionId = isValidFunction(pItem->pNode->operand.z, pItem->pNode->operand.n);
if (pItem->pNode->functionId < 0) { // extract all possible user defined function
struct SUdfInfo info = {0};
info.name = strndup(pItem->pNode->operand.z, pItem->pNode->operand.n);
taosArrayPush(pCmd->pUdfInfo, &info);
}
}
}
if (taosArrayGetSize(pCmd->pUdfInfo) > 0) {
return tscGetUdfFromNode(pSql);
} else {
return TSDB_CODE_SUCCESS;
}
}
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool timeWindowQuery) {
assert(pSelection != NULL && pCmd != NULL);
......@@ -1752,6 +1780,14 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
if (type == SQL_NODE_SQLFUNCTION) {
pItem->pNode->functionId = isValidFunction(pItem->pNode->operand.z, pItem->pNode->operand.n);
if (pItem->pNode->functionId < 0) {
// extract all possible user defined function
struct SUdfInfo info = {0};
memcpy(info.name, pItem->pNode->operand.z, pItem->pNode->operand.n);
if (pCmd->pUdfInfo == NULL) {
pCmd->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo));
}
taosArrayPush(pCmd->pUdfInfo, &info);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
......@@ -6719,6 +6755,11 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return code;
}
code = checkForUdf(pSql, pQuerySql->pSelection);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable, false, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -6970,6 +7011,12 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
int32_t timeWindowQuery = !(pQuerySql->interval.interval.type == 0 || pQuerySql->interval.interval.n == 0 ||
pQuerySql->sessionVal.gap.n == 0);
code = checkForUdf(pSql, pQuerySql->pSelection);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......
......@@ -448,6 +448,7 @@ int doProcessSql(SSqlObj *pSql) {
pCmd->command == TSDB_SQL_CONNECT ||
pCmd->command == TSDB_SQL_HB ||
pCmd->command == TSDB_SQL_META ||
pCmd->command == TSDB_SQL_RETRIEVE_FUNC ||
pCmd->command == TSDB_SQL_STABLEVGROUP) {
pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
}
......@@ -1319,14 +1320,11 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SShowInfo *pShowInfo = &pInfo->pMiscInfo->showOpt;
SShowMsg *pShowMsg = (SShowMsg *)pCmd->payload;
SShowMsg *pShowMsg = (SShowMsg *)pCmd->payload;
if (pShowInfo->showType == TSDB_MGMT_TABLE_FUNCTION) {
pShowMsg->type = pShowInfo->showType;
pShowMsg->payloadLen = 0;
pCmd->payloadLen = sizeof(SShowMsg);
return TSDB_CODE_SUCCESS;
......@@ -1807,6 +1805,28 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
int tscBuildRetrieveFuncMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
char *pMsg = pCmd->payload;
int32_t numOfFuncs = taosArrayGetSize(pCmd->pUdfInfo);
SRetrieveFuncMsg *pRetrieveFuncMsg = (SRetrieveFuncMsg *)pMsg;
pRetrieveFuncMsg->num = htonl(numOfFuncs);
pMsg += sizeof(SRetrieveFuncMsg);
for(int32_t i = 0; i < numOfFuncs; ++i) {
SUdfInfo* pUdf = taosArrayGet(pCmd->pUdfInfo, i);
STR_TO_VARSTR(pMsg, pUdf->name);
pMsg += varDataTLen(pMsg);
}
pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE_FUNC;
pCmd->payloadLen = (int32_t)(pMsg - pCmd->payload);
return TSDB_CODE_SUCCESS;
}
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
......@@ -2068,6 +2088,32 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SUdfFuncMsg* pFuncMsg = (SUdfFuncMsg *)pSql->res.pRsp;
pFuncMsg->num = htonl(pFuncMsg->num);
char* pMsg = pFuncMsg->content;
for(int32_t i = 0; i < pFuncMsg->num; ++i) {
SFunctionInfoMsg* pFunc = (SFunctionInfoMsg*) pMsg;
SUdfInfo info = {0};
info.name = strndup(pFunc->name, TSDB_FUNC_NAME_LEN);
info.resBytes = htons(pFunc->resBytes);
info.resType = htons(pFunc->resType);
info.funcType = TSDB_UDF_TYPE_SCALAR;
info.contLen = htons(pFunc->contentLen);
info.content = malloc(pFunc->contentLen);
memcpy(info.content, pFunc->content, info.contLen);
taosArrayPush(pCmd->pUdfInfo, &info);
pMsg += sizeof(SFunctionInfoMsg) + info.contLen;
}
return TSDB_CODE_SUCCESS;
}
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
// master sqlObj locates in param
SSqlObj* parent = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param);
......@@ -2480,6 +2526,50 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
return tscGetTableMeta(pSql, pTableMetaInfo);
}
int32_t tscGetUdfFromNode(SSqlObj *pSql) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (NULL == pNew) {
tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_RETRIEVE_FUNC;
pNew->cmd.pUdfInfo = taosArrayInit(4, sizeof(SUdfInfo));
for(int32_t i = 0; i < taosArrayGetSize(pSql->cmd.pUdfInfo); ++i) {
SUdfInfo info = {0};
SUdfInfo* p1 = taosArrayGet(pSql->cmd.pUdfInfo, i);
info = *p1;
info.name = strdup(p1->name);
taosArrayPush(pNew->cmd.pUdfInfo, &info);
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
tscError("%p malloc failed for payload to get table meta", pSql);
tscFreeSqlObj(pNew);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tscDebug("%p new pSqlObj:%p to retrieve udf", pSql, pNew);
registerSqlObj(pNew);
pNew->fp = tscTableMetaCallBack;
pNew->param = (void *)pSql->self;
tscDebug("%p metaRid from %" PRId64 " to %" PRId64 , pSql, pSql->metaRid, pNew->self);
pSql->metaRid = pNew->self;
int32_t code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify application that current process needs to be terminated
}
return code;
}
/**
* retrieve table meta from mnode, and update the local table meta hashmap.
* @param pSql sql object
......@@ -2609,6 +2699,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
......@@ -2627,6 +2718,7 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
......
......@@ -73,6 +73,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_STABLEVGROUP, "stable-vgroup" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MULTI_META, "multi-meta" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_HB, "heart-beat" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_FUNC, "retrieve-function" )
// SQL below for client local
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" )
......
......@@ -72,6 +72,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
......
......@@ -183,7 +183,7 @@ do { \
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 33
#define TSDB_FUNC_NAME_LEN 128
#define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_CODE_LEN (4096 - 512)
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
......
......@@ -93,7 +93,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONFIG_DNODE, "cm-config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE_FUNC, "retrieve-func" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" )
......@@ -583,11 +583,28 @@ typedef struct {
char code[];
} SCreateFuncMsg;
typedef struct {
int32_t num;
char name[];
} SRetrieveFuncMsg;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
int16_t resType;
int16_t resBytes;
int16_t contentLen;
char content[];
} SFunctionInfoMsg;
typedef struct {
int32_t num;
char content[];
} SUdfFuncMsg;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
} SDropFuncMsg;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists;
......
......@@ -32,6 +32,7 @@
#include "mnodeShow.h"
#include "mnodeFunc.h"
#include "mnodeWrite.h"
#include "mnodeRead.h"
#include "mnodePeer.h"
int64_t tsFuncRid = -1;
......@@ -39,6 +40,7 @@ static void * tsFuncSdb = NULL;
static int32_t tsFuncUpdateSize = 0;
static int32_t mnodeGetFuncMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveFuncs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateFuncMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropFuncMsg(SMnodeMsg *pMsg);
......@@ -134,6 +136,8 @@ int32_t mnodeInitFuncs() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_FUNCTION, mnodeProcessCreateFuncMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_FUNCTION, mnodeProcessDropFuncMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_RETRIEVE_FUNC, mnodeProcessRetrieveFuncImplMsg);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_FUNCTION, mnodeGetFuncMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_FUNCTION, mnodeRetrieveFuncs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_FUNCTION, mnodeCancelGetNextFunc);
......@@ -418,3 +422,32 @@ static int32_t mnodeProcessDropFuncMsg(SMnodeMsg *pMsg) {
return mnodeDropFunc(pFunc, pMsg);
}
static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) {
SRetrieveFuncMsg *pInfo = pMsg->rpcMsg.pCont;
pInfo->num = htonl(pInfo->num);
int32_t t = sizeof(SUdfFuncMsg) + sizeof(SFunctionInfoMsg) * pInfo->num + 16384;
SUdfFuncMsg *pFuncMsg = rpcMallocCont(t);
pFuncMsg->num = htonl(pInfo->num);
char* pOutput = pFuncMsg->content;
for(int32_t i = 0; i < pInfo->num; ++i) {
tstr* name = (tstr*) pInfo->name;
char buf[TSDB_FUNC_NAME_LEN] = {0};
tstrncpy(buf, name->data, TSDB_FUNC_NAME_LEN);
SFuncObj* pFuncObj = mnodeGetFunc(buf);
SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) pOutput;
strcpy(pFuncInfo->name, buf);
pFuncInfo->contentLen = htonl(pFuncObj->codeLen);
pFuncInfo->resType = htons(pFuncObj->outputType);
pOutput += sizeof(SFunctionInfoMsg) + pFuncObj->codeLen;
}
pMsg->rpcRsp.rsp = pFuncMsg;
pMsg->rpcRsp.len = (pOutput - (char*)pFuncMsg);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
......@@ -215,7 +215,7 @@ typedef struct tSQLExpr {
// the whole string of the function(col, param), while the function name is kept in token
SStrToken operand;
uint32_t functionId; // function id
int32_t functionId; // function id
SStrToken colInfo; // table column info
tVariant value; // the use input value
......
......@@ -117,7 +117,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty")
// mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_NEED_REPROCESSED, "Messag need to be reprocessed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_NEED_REPROCESSED, "Message need to be reprocessed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Unexpected generic error in mnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册