提交 14e9fa35 编写于 作者: X Xiaoyu Wang

fix: some problems of udf and 'create table'

上级 b05c263e
......@@ -275,7 +275,6 @@ typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
float xFilesFactor;
int32_t aggregationMethod;
int32_t delay;
int32_t ttl;
int32_t numOfColumns;
......@@ -1495,10 +1494,8 @@ typedef struct {
int32_t delay;
int32_t qmsg1Len;
int32_t qmsg2Len;
func_id_t* pFuncIds;
char* qmsg1; // not null: pAst1:qmsg1:SRetention1 => trigger aggr task1
char* qmsg2; // not null: pAst2:qmsg2:SRetention2 => trigger aggr task2
int8_t nFuncIds;
} SRSmaParam;
typedef struct SVCreateTbReq {
......
......@@ -125,15 +125,15 @@ typedef struct SFmGetFuncInfoParam {
struct SCatalog* pCtg;
void *pRpc;
const SEpSet* pMgmtEps;
char* pErrBuf;
int32_t errBufLen;
} SFmGetFuncInfoParam;
int32_t fmFuncMgtInit();
void fmFuncMgtDestroy();
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType);
int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc);
bool fmIsAggFunc(int32_t funcId);
bool fmIsScalarFunc(int32_t funcId);
......
......@@ -119,6 +119,7 @@ typedef struct SFunctionNode {
int32_t funcId;
int32_t funcType;
SNodeList* pParameterList;
int32_t udfBufSize;
} SFunctionNode;
typedef struct STableNode {
......@@ -135,7 +136,7 @@ typedef struct SRealTableNode {
STableNode table; // QUERY_NODE_REAL_TABLE
struct STableMeta* pMeta;
SVgroupsInfo* pVgroupList;
char useDbName[TSDB_DB_NAME_LEN];
char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES
double ratio;
} SRealTableNode;
......
......@@ -573,7 +573,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x2602)
#define TSDB_CODE_PAR_AMBIGUOUS_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2603)
#define TSDB_CODE_PAR_WRONG_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2604)
#define TSDB_CODE_PAR_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2605)
#define TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION TAOS_DEF_ERROR_CODE(0, 0x2608)
#define TSDB_CODE_PAR_WRONG_NUMBER_OF_SELECT TAOS_DEF_ERROR_CODE(0, 0x2609)
#define TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260A)
......@@ -633,6 +632,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801)
#define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802)
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
#define TSDB_CODE_FUNC_INVALID_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2604)
#ifdef __cplusplus
}
......
......@@ -442,10 +442,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeBinary(buf, (const void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
tlen += taosEncodeFixedI32(buf, param->delay);
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
for (int8_t i = 0; i < param->nFuncIds; ++i) {
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
}
tlen += taosEncodeFixedI32(buf, param->qmsg1Len);
if (param->qmsg1Len > 0) {
tlen += taosEncodeString(buf, param->qmsg1);
......@@ -475,10 +471,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
tlen += taosEncodeBinary(buf, (const void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
tlen += taosEncodeFixedI32(buf, param->delay);
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
for (int8_t i = 0; i < param->nFuncIds; ++i) {
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
}
}
break;
default:
......@@ -521,13 +513,6 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
buf = taosDecodeBinaryTo(buf, (void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
buf = taosDecodeFixedI32(buf, &param->delay);
buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if (param->nFuncIds > 0) {
param->pFuncIds = (func_id_t *)taosMemoryCalloc(param->nFuncIds, sizeof(func_id_t));
for (int8_t i = 0; i < param->nFuncIds; ++i) {
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
}
}
buf = taosDecodeFixedI32(buf, &param->qmsg1Len);
if (param->qmsg1Len > 0) {
buf = taosDecodeString(buf, &param->qmsg1);
......@@ -561,15 +546,6 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
buf = taosDecodeBinaryTo(buf, (void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
buf = taosDecodeFixedI32(buf, &param->delay);
buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if (param->nFuncIds > 0) {
param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t));
for (int8_t i = 0; i < param->nFuncIds; ++i) {
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
}
} else {
param->pFuncIds = NULL;
}
} else {
pReq->ntbCfg.pRSmaParam = NULL;
}
......@@ -632,7 +608,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeFloat(&encoder, pReq->xFilesFactor) < 0) return -1;
if (tEncodeI32(&encoder, pReq->aggregationMethod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->delay) < 0) return -1;
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
......@@ -687,7 +662,6 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeFloat(&decoder, &pReq->xFilesFactor) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->aggregationMethod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->delay) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
......
......@@ -350,7 +350,6 @@ typedef struct {
int32_t version;
int32_t nextColId;
float xFilesFactor;
int32_t aggregationMethod;
int32_t delay;
int32_t ttl;
int32_t numOfColumns;
......
......@@ -87,7 +87,6 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32(pRaw, dataPos, pStb->version, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->aggregationMethod, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->delay, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->ttl, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, _OVER)
......@@ -175,7 +174,6 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
int32_t xFilesFactor = 0;
SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, _OVER)
pStb->xFilesFactor = xFilesFactor / 10000.0f;
SDB_GET_INT32(pRaw, dataPos, &pStb->aggregationMethod, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->delay, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, _OVER)
......@@ -404,7 +402,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.name = (char *)tNameGetTableName(&name);
req.ttl = 0;
req.keep = 0;
req.rollup = pStb->aggregationMethod > -1 ? 1 : 0;
req.rollup = pStb->pAst1 > 0 ? 1 : 0;
req.type = TD_SUPER_TABLE;
req.stbCfg.suid = pStb->uid;
req.stbCfg.nCols = pStb->numOfColumns;
......@@ -433,20 +431,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
pRSmaParam->xFilesFactor = pStb->xFilesFactor;
pRSmaParam->delay = pStb->delay;
pRSmaParam->nFuncIds = 1; // only 1 aggregation method supported currently
pRSmaParam->pFuncIds = (func_id_t *)taosMemoryCalloc(pRSmaParam->nFuncIds, sizeof(func_id_t));
if (pRSmaParam->pFuncIds == NULL) {
taosMemoryFreeClear(req.stbCfg.pRSmaParam);
taosMemoryFreeClear(req.stbCfg.pSchema);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t f = 0; f < pRSmaParam->nFuncIds; ++f) {
*(pRSmaParam->pFuncIds + f) = pStb->aggregationMethod;
}
if (pStb->ast1Len > 0) {
if (mndConvertRSmaTask(pStb->pAst1, 0, 0, &pRSmaParam->qmsg1, &pRSmaParam->qmsg1Len) != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(req.stbCfg.pRSmaParam);
taosMemoryFreeClear(req.stbCfg.pSchema);
return NULL;
......@@ -455,7 +441,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
if (pStb->ast2Len > 0) {
int32_t qmsgLen2 = 0;
if (mndConvertRSmaTask(pStb->pAst2, 0, 0, &pRSmaParam->qmsg2, &pRSmaParam->qmsg2Len) != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam->qmsg1);
taosMemoryFreeClear(req.stbCfg.pRSmaParam);
taosMemoryFreeClear(req.stbCfg.pSchema);
......@@ -470,7 +455,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
if (pRSmaParam) {
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam->qmsg1);
taosMemoryFreeClear(pRSmaParam->qmsg2);
taosMemoryFreeClear(pRSmaParam);
......@@ -488,7 +472,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
*pContLen = contLen;
if (pRSmaParam) {
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam->qmsg1);
taosMemoryFreeClear(pRSmaParam->qmsg2);
taosMemoryFreeClear(pRSmaParam);
......@@ -706,7 +689,6 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
stbObj.version = 1;
stbObj.nextColId = 1;
stbObj.xFilesFactor = pCreate->xFilesFactor;
stbObj.aggregationMethod = pCreate->aggregationMethod;
stbObj.delay = pCreate->delay;
stbObj.ttl = pCreate->ttl;
stbObj.numOfColumns = pCreate->numOfColumns;
......
......@@ -294,7 +294,6 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
if (vCreateTbReq.stbCfg.pRSmaParam) {
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->qmsg1);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->qmsg2);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
......@@ -334,7 +333,6 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
if (pCreateTbReq->stbCfg.pRSmaParam) {
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
}
} else if (pCreateTbReq->type == TD_CHILD_TABLE) {
......@@ -342,7 +340,6 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
} else {
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
if (pCreateTbReq->ntbCfg.pRSmaParam) {
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
}
}
......@@ -371,7 +368,6 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
if (vAlterTbReq.stbCfg.pRSmaParam) {
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
}
taosMemoryFree(vAlterTbReq.name);
......
......@@ -41,7 +41,9 @@ extern "C" {
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
#define FUNC_UDF_ID_START_OFFSET_VAL 5000
#define FUNC_UDF_ID_START 5000
#define FUNC_AGGREGATE_UDF_ID 5001
#define FUNC_SCALAR_UDF_ID 5002
extern const int funcMgtUdfNum;
......
......@@ -24,7 +24,6 @@
typedef struct SFuncMgtService {
SHashObj* pFuncNameHashTable;
SArray* pUdfTable; // SUdfInfo
} SFuncMgtService;
typedef struct SUdfInfo {
......@@ -49,18 +48,12 @@ static void doInitFunctionTable() {
return;
}
}
gFunMgtService.pUdfTable = NULL;
}
static int8_t getUdfType(int32_t funcId) {
SUdfInfo* pUdf = taosArrayGet(gFunMgtService.pUdfTable, funcId - FUNC_UDF_ID_START_OFFSET_VAL - 1);
return pUdf->funcType;
}
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
if (fmIsUserDefinedFunc(funcId)) {
return getUdfType(funcId);
return FUNC_MGT_AGG_FUNC == classification ? FUNC_AGGREGATE_UDF_ID == funcId :
(FUNC_MGT_SCALAR_FUNC == classification ? FUNC_SCALAR_UDF_ID == funcId : false);
}
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
......@@ -68,33 +61,23 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
}
static int32_t getUdfId(SFmGetFuncInfoParam* pParam, const char* pFuncName) {
static int32_t getUdfInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
SFuncInfo* pInfo = NULL;
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFuncName, &pInfo);
if (TSDB_CODE_SUCCESS != code || NULL == pInfo) {
return -1;
}
if (NULL == gFunMgtService.pUdfTable) {
gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo));
}
SUdfInfo info = { .outputDt.type = pInfo->outputType, .outputDt.bytes = pInfo->outputLen, .funcType = pInfo->funcType };
taosArrayPush(gFunMgtService.pUdfTable, &info);
int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFunc->functionName, &pInfo);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (NULL == pInfo) {
snprintf(pParam->pErrBuf, pParam->errBufLen, "Invalid function name: %s", pFunc->functionName);
return TSDB_CODE_FUNC_INVALID_FUNTION;
}
pFunc->funcType = FUNCTION_TYPE_UDF;
pFunc->funcId = TSDB_FUNC_TYPE_AGGREGATE == pInfo->funcType ? FUNC_AGGREGATE_UDF_ID : FUNC_SCALAR_UDF_ID;
pFunc->node.resType.type = pInfo->outputType;
pFunc->node.resType.bytes = pInfo->outputLen;
pFunc->udfBufSize = pInfo->bufSize;
tFreeSFuncInfo(pInfo);
taosMemoryFree(pInfo);
return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL;
}
static int32_t getFuncId(SFmGetFuncInfoParam* pParam, const char* pFuncName) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName));
if (NULL == pVal) {
return getUdfId(pParam, pFuncName);
}
return *(int32_t*)pVal;
}
static int32_t getUdfResultType(SFunctionNode* pFunc) {
SUdfInfo* pUdf = taosArrayGet(gFunMgtService.pUdfTable, pFunc->funcId - FUNC_UDF_ID_START_OFFSET_VAL - 1);
pFunc->node.resType = pUdf->outputDt;
return TSDB_CODE_SUCCESS;
}
......@@ -103,28 +86,14 @@ int32_t fmFuncMgtInit() {
return initFunctionCode;
}
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
*pFuncId = getFuncId(pParam, pFuncName);
if (*pFuncId < 0) {
return TSDB_CODE_FAILED;
}
if (fmIsUserDefinedFunc(*pFuncId)) {
*pFuncType = FUNCTION_TYPE_UDF;
} else {
*pFuncType = funcMgtBuiltins[*pFuncId].type;
}
return TSDB_CODE_SUCCESS;
}
int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (fmIsUserDefinedFunc(pFunc->funcId)) {
return getUdfResultType(pFunc);
}
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
return TSDB_CODE_FAILED;
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc->functionName, strlen(pFunc->functionName));
if (NULL != pVal) {
pFunc->funcId = *(int32_t*)pVal;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pParam->pErrBuf, pParam->errBufLen);
}
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pErrBuf, len);
return getUdfInfo(pParam, pFunc);
}
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
......@@ -194,7 +163,7 @@ bool fmIsMultiResFunc(int32_t funcId) {
}
bool fmIsUserDefinedFunc(int32_t funcId) {
return funcId > FUNC_UDF_ID_START_OFFSET_VAL;
return funcId > FUNC_UDF_ID_START;
}
void fmFuncMgtDestroy() {
......
......@@ -95,7 +95,6 @@ SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pL
SNode* createBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft, SNode* pRight);
SNode* createNotBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft, SNode* pRight);
SNode* createFunctionNode(SAstCreateContext* pCxt, const SToken* pFuncName, SNodeList* pParameterList);
SNode* createFunctionNodeNoArg(SAstCreateContext* pCxt, const SToken* pFuncName);
SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType dt);
SNode* createNodeListNode(SAstCreateContext* pCxt, SNodeList* pList);
SNode* createNodeListNodeEx(SAstCreateContext* pCxt, SNode* p1, SNode* p2);
......
......@@ -498,6 +498,7 @@ signed_literal(A) ::= NK_BOOL(B).
signed_literal(A) ::= TIMESTAMP NK_STRING(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_TIMESTAMP, &B); }
signed_literal(A) ::= duration_literal(B). { A = releaseRawExprNode(pCxt, B); }
signed_literal(A) ::= NULL. { A = createValueNode(pCxt, TSDB_DATA_TYPE_NULL, NULL); }
signed_literal(A) ::= literal_func(B). { A = releaseRawExprNode(pCxt, B); }
%type literal_list { SNodeList* }
%destructor literal_list { nodesDestroyList($$); }
......@@ -610,8 +611,10 @@ pseudo_column(A) ::= WDURATION(B).
function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
function_expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
function_expression(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNodeNoArg(pCxt, &B)); }
//function_expression(A) ::= NOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
function_expression(A) ::= literal_func(B). { A = B; }
literal_func(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNode(pCxt, &B, NULL)); }
literal_func(A) ::= NOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
%type noarg_func { SToken }
%destructor noarg_func { }
......
......@@ -368,39 +368,6 @@ SNode* createFunctionNode(SAstCreateContext* pCxt, const SToken* pFuncName, SNod
return (SNode*)func;
}
SNode* createFunctionNodeNoArg(SAstCreateContext* pCxt, const SToken* pFuncName) {
SFunctionNode* func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
CHECK_OUT_OF_MEM(func);
char buf[64] = {0};
int32_t dataType;
switch (pFuncName->type) {
case TK_NOW: {
int64_t ts = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI);
snprintf(buf, sizeof(buf), "%"PRId64, ts);
dataType = TSDB_DATA_TYPE_BIGINT;
break;
}
case TK_TODAY: {
int64_t ts = taosGetTimestampToday(TSDB_TIME_PRECISION_MILLI);
snprintf(buf, sizeof(buf), "%"PRId64, ts);
dataType = TSDB_DATA_TYPE_BIGINT;
break;
}
case TK_TIMEZONE: {
strncpy(buf, tsTimezoneStr, strlen(tsTimezoneStr));
dataType = TSDB_DATA_TYPE_BINARY;
break;
}
}
SToken token = {.type = pFuncName->type, .n = strlen(buf), .z = buf};
SNodeList *pParameterList = createNodeList(pCxt, createValueNode(pCxt, dataType, &token));
strncpy(func->functionName, pFuncName->z, pFuncName->n);
func->pParameterList = pParameterList;
return (SNode*)func;
}
SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType dt) {
SFunctionNode* func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
CHECK_OUT_OF_MEM(func);
......@@ -443,10 +410,7 @@ SNode* createRealTableNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pTa
} else {
strncpy(realTable->table.tableAlias, pTableName->z, pTableName->n);
}
strncpy(realTable->table.tableName, pTableName->z, pTableName->n);
if (NULL != pCxt->pQueryCxt->db) {
strcpy(realTable->useDbName, pCxt->pQueryCxt->db);
}
strncpy(realTable->table.tableName, pTableName->z, pTableName->n);
return (SNode*)realTable;
}
......
......@@ -19,6 +19,7 @@
#include "cmdnodes.h"
#include "functionMgt.h"
#include "parUtil.h"
#include "scalar.h"
#include "tglobal.h"
#include "ttime.h"
......@@ -587,13 +588,14 @@ static EDealRes haveAggFunction(SNode* pNode, void* pContext) {
}
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
.pRpc = pCxt->pParseCxt->pTransporter,
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
}
pCxt->errCode = fmGetFuncResultType(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len);
SFmGetFuncInfoParam param = {
.pCtg = pCxt->pParseCxt->pCatalog,
.pRpc = pCxt->pParseCxt->pTransporter,
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet,
.pErrBuf = pCxt->msgBuf.buf,
.errBufLen = pCxt->msgBuf.len
};
pCxt->errCode = fmGetFuncInfo(&param, pFunc);
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
return DEAL_RES_ERROR;
}
......@@ -759,6 +761,19 @@ static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t addMnodeToVgroupList(const SEpSet* pEpSet, SArray** pVgroupList) {
if (NULL == *pVgroupList) {
*pVgroupList = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVgroupInfo));
if (NULL == *pVgroupList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SVgroupInfo vg = { .vgId = MNODE_HANDLE };
memcpy(&vg.epSet, pEpSet, sizeof(SEpSet));
taosArrayPush(*pVgroupList, &vg);
return TSDB_CODE_SUCCESS;
}
static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
return TSDB_CODE_SUCCESS;
......@@ -766,12 +781,20 @@ static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRea
int32_t code = TSDB_CODE_SUCCESS;
SArray* vgroupList = NULL;
if ('\0' != pRealTable->useDbName[0]) {
code = getDBVgInfo(pCxt, pRealTable->useDbName, &vgroupList);
if ('\0' != pRealTable->qualDbName[0]) {
// todo release after mnode can be processed
// if (0 != strcmp(pRealTable->qualDbName, TSDB_INFORMATION_SCHEMA_DB)) {
code = getDBVgInfo(pCxt, pRealTable->qualDbName, &vgroupList);
// }
} else {
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
}
// todo release after mnode can be processed
// if (TSDB_CODE_SUCCESS == code) {
// code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
// }
if (TSDB_CODE_SUCCESS == code) {
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
}
......@@ -1965,13 +1988,6 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs
if (1 != LIST_LENGTH(pFuncs)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION);
}
SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0);
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
.pRpc = pCxt->pParseCxt->pTransporter,
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
......@@ -2001,13 +2017,6 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt
return code;
}
static int32_t getAggregationMethod(SNodeList* pFuncs) {
if (NULL == pFuncs) {
return -1;
}
return ((SFunctionNode*)nodesListGetNode(pFuncs, 0))->funcId;
}
static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) {
int8_t flags = 0;
if (pCol->sma) {
......@@ -2229,7 +2238,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) {
pReq->igExists = pStmt->ignoreExists;
pReq->aggregationMethod = getAggregationMethod(pStmt->pOptions->pFuncs);
pReq->xFilesFactor = GET_OPTION_VAL(pStmt->pOptions->pFilesFactor, TSDB_DEFAULT_DB_FILE_FACTOR);
pReq->delay = GET_OPTION_VAL(pStmt->pOptions->pDelay, TSDB_DEFAULT_DB_DELAY);
columnDefNodeToField(pStmt->pCols, &pReq->pColumns);
......@@ -3193,7 +3201,7 @@ static int32_t createShowCondition(const SShowStmt* pShow, SSelectStmt* pSelect)
}
if (NULL != pShow->pDbName) {
strcpy(((SRealTableNode*)pSelect->pFromTable)->useDbName, ((SValueNode*)pShow->pDbName)->literal);
strcpy(((SRealTableNode*)pSelect->pFromTable)->qualDbName, ((SValueNode*)pShow->pDbName)->literal);
}
return TSDB_CODE_SUCCESS;
......@@ -3235,14 +3243,6 @@ static int32_t buildSmaParam(STableOptions* pOptions, SVCreateTbReq* pReq) {
}
pReq->ntbCfg.pRSmaParam->delay = GET_OPTION_VAL(pOptions->pDelay, TSDB_DEFAULT_DB_DELAY);
pReq->ntbCfg.pRSmaParam->xFilesFactor = GET_OPTION_VAL(pOptions->pFilesFactor, TSDB_DEFAULT_DB_FILE_FACTOR);
pReq->ntbCfg.pRSmaParam->nFuncIds = LIST_LENGTH(pOptions->pFuncs);
pReq->ntbCfg.pRSmaParam->pFuncIds = taosMemoryCalloc(pReq->ntbCfg.pRSmaParam->nFuncIds, sizeof(func_id_t));
if (NULL == pReq->ntbCfg.pRSmaParam->pFuncIds) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t index = 0;
SNode* pFunc = NULL;
FOREACH(pFunc, pOptions->pFuncs) { pReq->ntbCfg.pRSmaParam->pFuncIds[index++] = ((SFunctionNode*)pFunc)->funcId; }
return TSDB_CODE_SUCCESS;
}
......@@ -3428,10 +3428,6 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS
return parseJsontoTagData(pVal->literal, pBuilder, &pCxt->msgBuf, pSchema->colId);
}
if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) {
return pCxt->errCode;
}
if (pVal->node.resType.type == TSDB_DATA_TYPE_NULL) {
// todo
} else {
......@@ -3442,6 +3438,23 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS
return TSDB_CODE_SUCCESS;
}
static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* pFunc, SValueNode** pVal) {
if (DEAL_RES_ERROR == translateFunction(pCxt, pFunc)) {
return pCxt->errCode;
}
return scalarCalculateConstants((SNode*)pFunc, (SNode**)pVal);
}
static int32_t translateTagVal(STranslateContext* pCxt, SNode* pNode, SValueNode** pVal) {
if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
return createValueFromFunction(pCxt, (SFunctionNode*)pNode, pVal);
} else if (QUERY_NODE_VALUE == nodeType(pNode)) {
return (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pNode) ? pCxt->errCode : TSDB_CODE_SUCCESS);
} else {
return TSDB_CODE_FAILED;
}
}
static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta,
SKVRowBuilder* pBuilder) {
int32_t numOfTags = getNumOfTags(pSuperTableMeta);
......@@ -3451,8 +3464,8 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
}
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
SNode * pTag, *pVal;
FORBOTH(pTag, pStmt->pSpecificTags, pVal, pStmt->pValsOfTags) {
SNode * pTag, *pNode;
FORBOTH(pTag, pStmt->pSpecificTags, pNode, pStmt->pValsOfTags) {
SColumnNode* pCol = (SColumnNode*)pTag;
SSchema* pSchema = NULL;
for (int32_t i = 0; i < numOfTags; ++i) {
......@@ -3464,7 +3477,18 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
if (NULL == pSchema) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, pCol->colName);
}
int32_t code = addValToKVRow(pCxt, (SValueNode*)pVal, pSchema, pBuilder);
SValueNode* pVal = NULL;
int32_t code = translateTagVal(pCxt, pNode, &pVal);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pVal) {
pVal = (SValueNode*)pNode;
} else {
REPLACE_LIST2_NODE(pVal);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = addValToKVRow(pCxt, pVal, pSchema, pBuilder);
}
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......@@ -3480,10 +3504,21 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
}
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
SNode* pVal;
SNode* pNode;
int32_t index = 0;
FOREACH(pVal, pStmt->pValsOfTags) {
int32_t code = addValToKVRow(pCxt, (SValueNode*)pVal, pTagSchema + index++, pBuilder);
FOREACH(pNode, pStmt->pValsOfTags) {
SValueNode* pVal = NULL;
int32_t code = translateTagVal(pCxt, pNode, &pVal);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pVal) {
pVal = (SValueNode*)pNode;
} else {
REPLACE_NODE(pVal);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = addValToKVRow(pCxt, pVal, pTagSchema + index++, pBuilder);
}
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......
......@@ -30,8 +30,6 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Column ambiguously defined: %s";
case TSDB_CODE_PAR_WRONG_VALUE_TYPE:
return "Invalid value type: %s";
case TSDB_CODE_PAR_INVALID_FUNTION:
return "Invalid function name: %s";
case TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION:
return "There mustn't be aggregation";
case TSDB_CODE_PAR_WRONG_NUMBER_OF_SELECT:
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
......@@ -353,7 +353,7 @@ TEST_F(ParserTest, selectSemanticError) {
// TSDB_CODE_PAR_INVALID_FUNTION
bind("SELECT cnt(*) FROM t1");
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_FUNTION));
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_FUNC_INVALID_FUNTION));
// TSDB_CODE_PAR_FUNTION_PARA_NUM
// TSDB_CODE_PAR_FUNTION_PARA_TYPE
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册