diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 2d8fd9a93cadc1275e937ade9a8b859dcebe7dc9..788512e0e8f424b5654f36c6cd7a0af13ff6da3d 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -48,11 +48,12 @@ typedef struct SParseContext { } SParseContext; int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); -bool isInsertSql(const char* pStr, size_t length); +bool qIsInsertSql(const char* pStr, size_t length); void qDestroyQuery(SQuery* pQueryNode); int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); +int32_t qSetSTableIdForRSma(SNode* pStmt, int64_t uid); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); int32_t qResetStmtDataBlock(void* block, bool keepBuf); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 17468584822361d8a3f7ff048cb797a8174b1836..01d785ef73107778c818437c18d98c778d1f8893 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -48,7 +48,8 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) { break; case STMT_EXECUTE: if (STMT_TYPE_QUERY == pStmt->sql.type) { - if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL)) { + if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) && + STMT_STATUS_NE(BIND_COL)) { code = TSDB_CODE_TSC_STMT_API_ERROR; } } else { @@ -230,22 +231,6 @@ int32_t stmtParseSql(STscStmt* pStmt) { pStmt->sql.type = STMT_TYPE_QUERY; } -/* - switch (nodeType(pStmt->sql.pQuery->pRoot)) { - case QUERY_NODE_VNODE_MODIF_STMT: - if (0 == pStmt->sql.type) { - pStmt->sql.type = STMT_TYPE_INSERT; - } - break; - case QUERY_NODE_SELECT_STMT: - pStmt->sql.type = STMT_TYPE_QUERY; - break; - default: - tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot)); - STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR); - } -*/ - return TSDB_CODE_SUCCESS; } @@ -823,7 +808,7 @@ _return: code = stmtUpdateTableUid(pStmt, pRsp); } } - + tFreeSSubmitRsp(pRsp); ++pStmt->sql.runTimes; @@ -861,7 +846,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) { if (pStmt->sql.type) { *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type); } else { - *insert = isInsertSql(pStmt->sql.sqlStr, 0); + *insert = qIsInsertSql(pStmt->sql.sqlStr, 0); } return TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 33af040915688fd83c4a82af3c89047be5d20dae..9f4e377dd17dffc94ab04366e2c1ba61e170b92f 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -29,7 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); -int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen); +int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr, + int32_t* pLen); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 22a5f37334b4f18a422249afa9e870068e0e5f83..1abce71a88080f5d9dea5ea8d0e53501e808e25f 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -28,13 +28,15 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "parser.h" #include "tcompare.h" #include "tname.h" #include "tuuid.h" extern bool tsStreamSchedV; -int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen) { +int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr, + int32_t* pLen) { SNode* pAst = NULL; SQueryPlan* pPlan = NULL; terrno = TSDB_CODE_SUCCESS; @@ -44,6 +46,11 @@ int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermar goto END; } + if (qSetSTableIdForRSma(pAst, uid) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + goto END; + } + SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = false, diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 61f115e2bab32b64ee6a57e967fb0c8e5c287d0f..dbfced547dbe12659a91d146da4ec73fca9a1693 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -401,12 +401,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; req.pRSmaParam.delay = pStb->delay; if (pStb->ast1Len > 0) { - if (mndConvertRSmaTask(pStb->pAst1, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len) != TSDB_CODE_SUCCESS) { + if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len) != + TSDB_CODE_SUCCESS) { return NULL; } } if (pStb->ast2Len > 0) { - if (mndConvertRSmaTask(pStb->pAst2, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len) != TSDB_CODE_SUCCESS) { + if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len) != + TSDB_CODE_SUCCESS) { return NULL; } } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 559fe3e85a7e37b1a811c757c965448a9b9bfabf..d7267bb85de8f126c9cc2110a4097b144f36ce05 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -46,8 +46,9 @@ typedef struct SFullDatabaseName { char fullDbName[TSDB_DB_FNAME_LEN]; } SFullDatabaseName; -static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode); -static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode); +static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode); +static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode); +static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal); static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; } @@ -542,17 +543,18 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) { return res; } -static int32_t parseTimeFromValueNode(SValueNode* pVal) { - if (IS_SIGNED_NUMERIC_TYPE(pVal->node.resType.type)) { - return TSDB_CODE_SUCCESS; - } else if (IS_UNSIGNED_NUMERIC_TYPE(pVal->node.resType.type)) { - pVal->datum.i = pVal->datum.u; - return TSDB_CODE_SUCCESS; - } else if (IS_FLOAT_TYPE(pVal->node.resType.type)) { - pVal->datum.i = pVal->datum.d; - return TSDB_CODE_SUCCESS; - } else if (TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) { - pVal->datum.i = pVal->datum.b; +static int32_t parseTimeFromValueNode(STranslateContext* pCxt, SValueNode* pVal) { + if (IS_NUMERIC_TYPE(pVal->node.resType.type) || TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) { + if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) { + return pCxt->errCode; + } + if (IS_UNSIGNED_NUMERIC_TYPE(pVal->node.resType.type)) { + pVal->datum.i = pVal->datum.u; + } else if (IS_FLOAT_TYPE(pVal->node.resType.type)) { + pVal->datum.i = pVal->datum.d; + } else if (TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) { + pVal->datum.i = pVal->datum.b; + } return TSDB_CODE_SUCCESS; } else if (IS_VAR_DATA_TYPE(pVal->node.resType.type) || TSDB_DATA_TYPE_TIMESTAMP == pVal->node.resType.type) { if (TSDB_CODE_SUCCESS == taosParseTime(pVal->literal, &pVal->datum.i, pVal->node.resType.bytes, @@ -588,62 +590,52 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD *(bool*)&pVal->typeData = pVal->datum.b; break; case TSDB_DATA_TYPE_TINYINT: { - char* endPtr = NULL; - pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10); + pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10); *(int8_t*)&pVal->typeData = pVal->datum.i; break; } case TSDB_DATA_TYPE_SMALLINT: { - char* endPtr = NULL; - pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10); + pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10); *(int16_t*)&pVal->typeData = pVal->datum.i; break; } case TSDB_DATA_TYPE_INT: { - char* endPtr = NULL; - pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10); + pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10); *(int32_t*)&pVal->typeData = pVal->datum.i; break; } case TSDB_DATA_TYPE_BIGINT: { - char* endPtr = NULL; - pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10); + pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10); *(int64_t*)&pVal->typeData = pVal->datum.i; break; } case TSDB_DATA_TYPE_UTINYINT: { - char* endPtr = NULL; - pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10); + pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10); *(uint8_t*)&pVal->typeData = pVal->datum.u; break; } case TSDB_DATA_TYPE_USMALLINT: { - char* endPtr = NULL; - pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10); + pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10); *(uint16_t*)&pVal->typeData = pVal->datum.u; break; } case TSDB_DATA_TYPE_UINT: { - char* endPtr = NULL; - pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10); + pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10); *(uint32_t*)&pVal->typeData = pVal->datum.u; break; } case TSDB_DATA_TYPE_UBIGINT: { - char* endPtr = NULL; - pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10); + pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10); *(uint64_t*)&pVal->typeData = pVal->datum.u; break; } case TSDB_DATA_TYPE_FLOAT: { - char* endPtr = NULL; - pVal->datum.d = taosStr2Double(pVal->literal, &endPtr); + pVal->datum.d = taosStr2Double(pVal->literal, NULL); *(float*)&pVal->typeData = pVal->datum.d; break; } case TSDB_DATA_TYPE_DOUBLE: { - char* endPtr = NULL; - pVal->datum.d = taosStr2Double(pVal->literal, &endPtr); + pVal->datum.d = taosStr2Double(pVal->literal, NULL); *(double*)&pVal->typeData = pVal->datum.d; break; } @@ -659,7 +651,7 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD break; } case TSDB_DATA_TYPE_TIMESTAMP: { - if (TSDB_CODE_SUCCESS != parseTimeFromValueNode(pVal)) { + if (TSDB_CODE_SUCCESS != parseTimeFromValueNode(pCxt, pVal)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal); } *(int64_t*)&pVal->typeData = pVal->datum.i; diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 688e20063a4f02f3b077b116e1b702c428562c71..6dac1e1250df2de8d41df4c9b342f783e54f2b22 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -19,7 +19,7 @@ #include "parInt.h" #include "parToken.h" -bool isInsertSql(const char* pStr, size_t length) { +bool qIsInsertSql(const char* pStr, size_t length) { if (NULL == pStr) { return false; } @@ -169,7 +169,7 @@ static void rewriteExprAlias(SNode* pRoot) { int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { int32_t code = TSDB_CODE_SUCCESS; - if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) { + if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) { code = parseInsertSql(pCxt, pQuery); } else { code = parseSqlIntoAst(pCxt, pQuery); @@ -184,6 +184,18 @@ int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** p return extractResultSchema(pRoot, numOfCols, pSchema); } +int32_t qSetSTableIdForRSma(SNode* pStmt, int64_t uid) { + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { + SNode* pTable = ((SSelectStmt*)pStmt)->pFromTable; + if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { + ((SRealTableNode*)pTable)->pMeta->uid = uid; + ((SRealTableNode*)pTable)->pMeta->suid = uid; + return TSDB_CODE_SUCCESS; + } + } + return TSDB_CODE_FAILED; +} + int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx) { int32_t code = TSDB_CODE_SUCCESS;