提交 7989190b 编写于 作者: X Xiaoyu Wang

fix: some problems of rollup sma

上级 e27f8e8a
...@@ -48,11 +48,12 @@ typedef struct SParseContext { ...@@ -48,11 +48,12 @@ typedef struct SParseContext {
} SParseContext; } SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
bool isInsertSql(const char* pStr, size_t length); bool qIsInsertSql(const char* pStr, size_t length);
void qDestroyQuery(SQuery* pQueryNode); void qDestroyQuery(SQuery* pQueryNode);
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t qSetSTableIdForRSma(SNode* pStmt, int64_t uid);
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
int32_t qResetStmtDataBlock(void* block, bool keepBuf); int32_t qResetStmtDataBlock(void* block, bool keepBuf);
......
...@@ -48,7 +48,8 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) { ...@@ -48,7 +48,8 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
break; break;
case STMT_EXECUTE: case STMT_EXECUTE:
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL)) { if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
STMT_STATUS_NE(BIND_COL)) {
code = TSDB_CODE_TSC_STMT_API_ERROR; code = TSDB_CODE_TSC_STMT_API_ERROR;
} }
} else { } else {
...@@ -230,22 +231,6 @@ int32_t stmtParseSql(STscStmt* pStmt) { ...@@ -230,22 +231,6 @@ int32_t stmtParseSql(STscStmt* pStmt) {
pStmt->sql.type = STMT_TYPE_QUERY; pStmt->sql.type = STMT_TYPE_QUERY;
} }
/*
switch (nodeType(pStmt->sql.pQuery->pRoot)) {
case QUERY_NODE_VNODE_MODIF_STMT:
if (0 == pStmt->sql.type) {
pStmt->sql.type = STMT_TYPE_INSERT;
}
break;
case QUERY_NODE_SELECT_STMT:
pStmt->sql.type = STMT_TYPE_QUERY;
break;
default:
tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
}
*/
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -823,7 +808,7 @@ _return: ...@@ -823,7 +808,7 @@ _return:
code = stmtUpdateTableUid(pStmt, pRsp); code = stmtUpdateTableUid(pStmt, pRsp);
} }
} }
tFreeSSubmitRsp(pRsp); tFreeSSubmitRsp(pRsp);
++pStmt->sql.runTimes; ++pStmt->sql.runTimes;
...@@ -861,7 +846,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) { ...@@ -861,7 +846,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
if (pStmt->sql.type) { if (pStmt->sql.type) {
*insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type); *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
} else { } else {
*insert = isInsertSql(pStmt->sql.sqlStr, 0); *insert = qIsInsertSql(pStmt->sql.sqlStr, 0);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -29,7 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -29,7 +29,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen); int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr,
int32_t* pLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -28,13 +28,15 @@ ...@@ -28,13 +28,15 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "parser.h"
#include "tcompare.h" #include "tcompare.h"
#include "tname.h" #include "tname.h"
#include "tuuid.h" #include "tuuid.h"
extern bool tsStreamSchedV; extern bool tsStreamSchedV;
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen) { int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr,
int32_t* pLen) {
SNode* pAst = NULL; SNode* pAst = NULL;
SQueryPlan* pPlan = NULL; SQueryPlan* pPlan = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
...@@ -44,6 +46,11 @@ int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermar ...@@ -44,6 +46,11 @@ int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermar
goto END; goto END;
} }
if (qSetSTableIdForRSma(pAst, uid) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
goto END;
}
SPlanContext cxt = { SPlanContext cxt = {
.pAstRoot = pAst, .pAstRoot = pAst,
.topicQuery = false, .topicQuery = false,
......
...@@ -401,12 +401,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt ...@@ -401,12 +401,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
req.pRSmaParam.delay = pStb->delay; req.pRSmaParam.delay = pStb->delay;
if (pStb->ast1Len > 0) { if (pStb->ast1Len > 0) {
if (mndConvertRSmaTask(pStb->pAst1, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len) != TSDB_CODE_SUCCESS) { if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len) !=
TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
} }
if (pStb->ast2Len > 0) { if (pStb->ast2Len > 0) {
if (mndConvertRSmaTask(pStb->pAst2, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len) != TSDB_CODE_SUCCESS) { if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len) !=
TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
} }
......
...@@ -46,8 +46,9 @@ typedef struct SFullDatabaseName { ...@@ -46,8 +46,9 @@ typedef struct SFullDatabaseName {
char fullDbName[TSDB_DB_FNAME_LEN]; char fullDbName[TSDB_DB_FNAME_LEN];
} SFullDatabaseName; } SFullDatabaseName;
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode); static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode); static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode);
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal);
static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; } static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; }
...@@ -542,17 +543,18 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) { ...@@ -542,17 +543,18 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) {
return res; return res;
} }
static int32_t parseTimeFromValueNode(SValueNode* pVal) { static int32_t parseTimeFromValueNode(STranslateContext* pCxt, SValueNode* pVal) {
if (IS_SIGNED_NUMERIC_TYPE(pVal->node.resType.type)) { if (IS_NUMERIC_TYPE(pVal->node.resType.type) || TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) {
return TSDB_CODE_SUCCESS; if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) {
} else if (IS_UNSIGNED_NUMERIC_TYPE(pVal->node.resType.type)) { return pCxt->errCode;
pVal->datum.i = pVal->datum.u; }
return TSDB_CODE_SUCCESS; if (IS_UNSIGNED_NUMERIC_TYPE(pVal->node.resType.type)) {
} else if (IS_FLOAT_TYPE(pVal->node.resType.type)) { pVal->datum.i = pVal->datum.u;
pVal->datum.i = pVal->datum.d; } else if (IS_FLOAT_TYPE(pVal->node.resType.type)) {
return TSDB_CODE_SUCCESS; pVal->datum.i = pVal->datum.d;
} else if (TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) { } else if (TSDB_DATA_TYPE_BOOL == pVal->node.resType.type) {
pVal->datum.i = pVal->datum.b; pVal->datum.i = pVal->datum.b;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (IS_VAR_DATA_TYPE(pVal->node.resType.type) || TSDB_DATA_TYPE_TIMESTAMP == pVal->node.resType.type) { } else if (IS_VAR_DATA_TYPE(pVal->node.resType.type) || TSDB_DATA_TYPE_TIMESTAMP == pVal->node.resType.type) {
if (TSDB_CODE_SUCCESS == taosParseTime(pVal->literal, &pVal->datum.i, pVal->node.resType.bytes, if (TSDB_CODE_SUCCESS == taosParseTime(pVal->literal, &pVal->datum.i, pVal->node.resType.bytes,
...@@ -588,62 +590,52 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD ...@@ -588,62 +590,52 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
*(bool*)&pVal->typeData = pVal->datum.b; *(bool*)&pVal->typeData = pVal->datum.b;
break; break;
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
char* endPtr = NULL; pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10);
pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10);
*(int8_t*)&pVal->typeData = pVal->datum.i; *(int8_t*)&pVal->typeData = pVal->datum.i;
break; break;
} }
case TSDB_DATA_TYPE_SMALLINT: { case TSDB_DATA_TYPE_SMALLINT: {
char* endPtr = NULL; pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10);
pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10);
*(int16_t*)&pVal->typeData = pVal->datum.i; *(int16_t*)&pVal->typeData = pVal->datum.i;
break; break;
} }
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
char* endPtr = NULL; pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10);
pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10);
*(int32_t*)&pVal->typeData = pVal->datum.i; *(int32_t*)&pVal->typeData = pVal->datum.i;
break; break;
} }
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
char* endPtr = NULL; pVal->datum.i = taosStr2Int64(pVal->literal, NULL, 10);
pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10);
*(int64_t*)&pVal->typeData = pVal->datum.i; *(int64_t*)&pVal->typeData = pVal->datum.i;
break; break;
} }
case TSDB_DATA_TYPE_UTINYINT: { case TSDB_DATA_TYPE_UTINYINT: {
char* endPtr = NULL; pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10);
pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10);
*(uint8_t*)&pVal->typeData = pVal->datum.u; *(uint8_t*)&pVal->typeData = pVal->datum.u;
break; break;
} }
case TSDB_DATA_TYPE_USMALLINT: { case TSDB_DATA_TYPE_USMALLINT: {
char* endPtr = NULL; pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10);
pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10);
*(uint16_t*)&pVal->typeData = pVal->datum.u; *(uint16_t*)&pVal->typeData = pVal->datum.u;
break; break;
} }
case TSDB_DATA_TYPE_UINT: { case TSDB_DATA_TYPE_UINT: {
char* endPtr = NULL; pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10);
pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10);
*(uint32_t*)&pVal->typeData = pVal->datum.u; *(uint32_t*)&pVal->typeData = pVal->datum.u;
break; break;
} }
case TSDB_DATA_TYPE_UBIGINT: { case TSDB_DATA_TYPE_UBIGINT: {
char* endPtr = NULL; pVal->datum.u = taosStr2UInt64(pVal->literal, NULL, 10);
pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10);
*(uint64_t*)&pVal->typeData = pVal->datum.u; *(uint64_t*)&pVal->typeData = pVal->datum.u;
break; break;
} }
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
char* endPtr = NULL; pVal->datum.d = taosStr2Double(pVal->literal, NULL);
pVal->datum.d = taosStr2Double(pVal->literal, &endPtr);
*(float*)&pVal->typeData = pVal->datum.d; *(float*)&pVal->typeData = pVal->datum.d;
break; break;
} }
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
char* endPtr = NULL; pVal->datum.d = taosStr2Double(pVal->literal, NULL);
pVal->datum.d = taosStr2Double(pVal->literal, &endPtr);
*(double*)&pVal->typeData = pVal->datum.d; *(double*)&pVal->typeData = pVal->datum.d;
break; break;
} }
...@@ -659,7 +651,7 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD ...@@ -659,7 +651,7 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
break; break;
} }
case TSDB_DATA_TYPE_TIMESTAMP: { case TSDB_DATA_TYPE_TIMESTAMP: {
if (TSDB_CODE_SUCCESS != parseTimeFromValueNode(pVal)) { if (TSDB_CODE_SUCCESS != parseTimeFromValueNode(pCxt, pVal)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
} }
*(int64_t*)&pVal->typeData = pVal->datum.i; *(int64_t*)&pVal->typeData = pVal->datum.i;
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "parInt.h" #include "parInt.h"
#include "parToken.h" #include "parToken.h"
bool isInsertSql(const char* pStr, size_t length) { bool qIsInsertSql(const char* pStr, size_t length) {
if (NULL == pStr) { if (NULL == pStr) {
return false; return false;
} }
...@@ -169,7 +169,7 @@ static void rewriteExprAlias(SNode* pRoot) { ...@@ -169,7 +169,7 @@ static void rewriteExprAlias(SNode* pRoot) {
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) { if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSql(pCxt, pQuery); code = parseInsertSql(pCxt, pQuery);
} else { } else {
code = parseSqlIntoAst(pCxt, pQuery); code = parseSqlIntoAst(pCxt, pQuery);
...@@ -184,6 +184,18 @@ int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** p ...@@ -184,6 +184,18 @@ int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** p
return extractResultSchema(pRoot, numOfCols, pSchema); return extractResultSchema(pRoot, numOfCols, pSchema);
} }
int32_t qSetSTableIdForRSma(SNode* pStmt, int64_t uid) {
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
SNode* pTable = ((SSelectStmt*)pStmt)->pFromTable;
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
((SRealTableNode*)pTable)->pMeta->uid = uid;
((SRealTableNode*)pTable)->pMeta->suid = uid;
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_FAILED;
}
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx) { int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册