提交 bc98d601 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

...@@ -147,7 +147,7 @@ In both of these methods, configuring the watermark is essential for obtaining a ...@@ -147,7 +147,7 @@ In both of these methods, configuring the watermark is essential for obtaining a
## Supported functions ## Supported functions
All [scalar functions](../function/#scalar-functions) are available in stream processing. All [System information functions](../function/#system-information-functions) are <b>not</b> allowed in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings: All [scalar functions](../function/#scalar-functions) are available in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings:
- [leastsquares](../function/#leastsquares) - [leastsquares](../function/#leastsquares)
- [percentile](../function/#percentile) - [percentile](../function/#percentile)
- [top](../function/#top) - [top](../function/#top)
......
...@@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> { ...@@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
taos.exec_many([ taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"), format!("DROP TOPIC IF EXISTS tmq_meters"),
format!("DROP DATABASE IF EXISTS `{db}`"), format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"), format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"),
format!("USE `{db}`"), format!("USE `{db}`"),
// create super table // create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"), format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
......
...@@ -227,7 +227,7 @@ T = 最新事件时间 - DELETE_MARK ...@@ -227,7 +227,7 @@ T = 最新事件时间 - DELETE_MARK
## 流式计算支持的函数 ## 流式计算支持的函数
1. 所有的 [单行函数](../function/#单行函数) 均可用于流计算。 1. 所有的 [单行函数](../function/#单行函数) 均可用于流计算。
2. 以下 19 个聚合/选择函数 <b>不能</b> 应用在创建流计算的 SQL 语句[系统信息函数](../function/#系统信息函数) 也不能用于流计算中。此外的其他类型的函数均可用于流计算。 2. 以下 19 个聚合/选择函数 <b>不能</b> 应用在创建流计算的 SQL 语句。此外的其他类型的函数均可用于流计算。
- [leastsquares](../function/#leastsquares) - [leastsquares](../function/#leastsquares)
- [percentile](../function/#percentile) - [percentile](../function/#percentile)
......
...@@ -60,7 +60,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -60,7 +60,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (IsReq(pMsg)) { if (IsReq(pMsg)) {
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(code), TMSG_INFO(pMsg->msgType)); dGError("msg:%p, failed to process since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
} }
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
......
...@@ -118,7 +118,7 @@ typedef enum { ...@@ -118,7 +118,7 @@ typedef enum {
} ETrnPolicy; } ETrnPolicy;
typedef enum { typedef enum {
TRN_EXEC_PRARLLEL = 0, TRN_EXEC_PARALLEL = 0,
TRN_EXEC_SERIAL = 1, TRN_EXEC_SERIAL = 1,
} ETrnExec; } ETrnExec;
...@@ -177,6 +177,7 @@ typedef struct { ...@@ -177,6 +177,7 @@ typedef struct {
SArray* pRpcArray; SArray* pRpcArray;
SRWLatch lockRpcArray; SRWLatch lockRpcArray;
int64_t mTraceId; int64_t mTraceId;
TdThreadMutex mutex;
} STrans; } STrans;
typedef struct { typedef struct {
......
...@@ -76,6 +76,7 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); ...@@ -76,6 +76,7 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname); void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
void mndTransSetSerial(STrans *pTrans); void mndTransSetSerial(STrans *pTrans);
void mndTransSetParallel(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper); void mndTransSetOper(STrans *pTrans, EOperType oper);
int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans); int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans);
......
...@@ -546,6 +546,7 @@ static void mndTransDropData(STrans *pTrans) { ...@@ -546,6 +546,7 @@ static void mndTransDropData(STrans *pTrans) {
pTrans->param = NULL; pTrans->param = NULL;
pTrans->paramLen = 0; pTrans->paramLen = 0;
} }
(void)taosThreadMutexDestroy(&pTrans->mutex);
} }
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
...@@ -643,7 +644,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, ...@@ -643,7 +644,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->stage = TRN_STAGE_PREPARE; pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->conflict = conflict; pTrans->conflict = conflict;
pTrans->exec = TRN_EXEC_PRARLLEL; pTrans->exec = TRN_EXEC_PARALLEL;
pTrans->createdTime = taosGetTimestampMs(); pTrans->createdTime = taosGetTimestampMs();
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
...@@ -651,6 +652,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, ...@@ -651,6 +652,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0; pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0;
taosInitRWLatch(&pTrans->lockRpcArray); taosInitRWLatch(&pTrans->lockRpcArray);
taosThreadMutexInit(&pTrans->mutex, NULL);
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL || if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
pTrans->pRpcArray == NULL) { pTrans->pRpcArray == NULL) {
...@@ -793,6 +795,8 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) ...@@ -793,6 +795,8 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname)
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; }
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; } void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
...@@ -1307,7 +1311,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1307,7 +1311,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
int32_t code = 0; int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
if (numOfActions == 0) return code; if (numOfActions == 0) return code;
if (pTrans->redoActionPos >= numOfActions) return code;
taosThreadMutexLock(&pTrans->mutex);
if (pTrans->redoActionPos >= numOfActions) {
taosThreadMutexUnlock(&pTrans->mutex);
return code;
}
mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos); mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos);
...@@ -1377,6 +1387,8 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1377,6 +1387,8 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} }
} }
taosThreadMutexUnlock(&pTrans->mutex);
return code; return code;
} }
......
...@@ -1917,7 +1917,7 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra ...@@ -1917,7 +1917,7 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra
int32_t vgid = pVgroup->vgId; int32_t vgid = pVgroup->vgId;
int8_t replica = pVgroup->replica; int8_t replica = pVgroup->replica;
if(pVgroup->replica <= 1) { if(pVgroup->replica <= 1) {
mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica); mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
return -1; return -1;
} }
...@@ -1951,6 +1951,19 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra ...@@ -1951,6 +1951,19 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra
return -1; return -1;
} }
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb == NULL) {
mError("trans:%d, vgid:%d failed to be balanced to dnode:%d, because db not exist", pTrans->id, vgid, dnodeId);
return -1;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) {
mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
return -1;
}
mndReleaseDb(pMnode, pDb);
SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup); SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
if (pRaw == NULL) { if (pRaw == NULL) {
mError("trans:%d, vgid:%d failed to encode action to dnode:%d", pTrans->id, vgid, dnodeId); mError("trans:%d, vgid:%d failed to encode action to dnode:%d", pTrans->id, vgid, dnodeId);
...@@ -1965,7 +1978,8 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra ...@@ -1965,7 +1978,8 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra
} }
else else
{ {
mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist, online); mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d",
pTrans->id, vgid, dnodeId, exist, online);
} }
return 0; return 0;
...@@ -2103,6 +2117,18 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, ...@@ -2103,6 +2117,18 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
return 0; return 0;
} }
static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) {
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, vgStatus);
pRaw = NULL;
return 0;
_err:
sdbFreeRaw(pRaw);
return -1;
}
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = NULL; STrans *pTrans = NULL;
...@@ -2181,28 +2207,16 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro ...@@ -2181,28 +2207,16 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
if (pDb->cfg.replications != newVg1.replica) { if (pDb->cfg.replications != newVg1.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER; if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
} else { } else {
pRaw = mndVgroupActionEncode(&newVg1); if (mndTransCommitVgStatus(pTrans, &newVg1, SDB_STATUS_READY) < 0) goto _OVER;
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
} }
if (pDb->cfg.replications != newVg2.replica) { if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER; if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
} else { } else {
pRaw = mndVgroupActionEncode(&newVg2); if (mndTransCommitVgStatus(pTrans, &newVg2, SDB_STATUS_READY) < 0) goto _OVER;
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
} }
pRaw = mndVgroupActionEncode(pVgroup); if (mndTransCommitVgStatus(pTrans, pVgroup, SDB_STATUS_DROPPED) < 0) goto _OVER;
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
pRaw = NULL;
memcpy(&dbObj, pDb, sizeof(SDbObj)); memcpy(&dbObj, pDb, sizeof(SDbObj));
if (dbObj.cfg.pRetensions != NULL) { if (dbObj.cfg.pRetensions != NULL) {
...@@ -2229,42 +2243,13 @@ _OVER: ...@@ -2229,42 +2243,13 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SVgObj *pVgroup = NULL;
SDbObj *pDb = NULL;
SSplitVgroupReq req = {0};
if (tDeserializeSSplitVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mInfo("vgId:%d, start to split", req.vgId);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) {
goto _OVER;
}
pVgroup = mndAcquireVgroup(pMnode, req.vgId);
if (pVgroup == NULL) goto _OVER;
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb == NULL) goto _OVER;
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup); static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return mndProcessSplitVgroupMsgImp(pReq); }
if (code != 0) {
mError("vgId:%d, failed to start to split vgroup since %s, db:%s", pVgroup->vgId, terrstr(), pDb->name);
goto _OVER;
}
mInfo("vgId:%d, split vgroup started successfully. db:%s", pVgroup->vgId, pDb->name);
_OVER: #ifndef TD_ENTERPRISE
mndReleaseVgroup(pMnode, pVgroup); int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) { return 0; }
mndReleaseDb(pMnode, pDb); #endif
return code;
}
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
SDnodeObj *pSrc, SDnodeObj *pDst) { SDnodeObj *pSrc, SDnodeObj *pDst) {
......
...@@ -2289,6 +2289,25 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) ...@@ -2289,6 +2289,25 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t checkHavingGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == getGroupByList(pCxt) && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow) {
return code;
}
if (NULL != pSelect->pHaving) {
code = checkExprForGroupBy(pCxt, &pSelect->pHaving);
}
/*
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pProjectionList) {
code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList);
}
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pOrderByList) {
code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pOrderByList);
}
*/
return code;
}
static int32_t checkWindowFuncCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t checkWindowFuncCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) { if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3024,7 +3043,7 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect ...@@ -3024,7 +3043,7 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList); code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateFillValues(pCxt, pSelect); code = translateFillValues(pCxt, pSelect);
} }
...@@ -3041,9 +3060,6 @@ static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) { ...@@ -3041,9 +3060,6 @@ static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) {
} }
pCxt->currClause = SQL_CLAUSE_HAVING; pCxt->currClause = SQL_CLAUSE_HAVING;
int32_t code = translateExpr(pCxt, &pSelect->pHaving); int32_t code = translateExpr(pCxt, &pSelect->pHaving);
if (TSDB_CODE_SUCCESS == code && (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow)) {
code = checkExprForGroupBy(pCxt, &pSelect->pHaving);
}
return code; return code;
} }
...@@ -3626,6 +3642,9 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect ...@@ -3626,6 +3642,9 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateSelectList(pCxt, pSelect); code = translateSelectList(pCxt, pSelect);
} }
if (TSDB_CODE_SUCCESS == code) {
code = checkHavingGroupBy(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateOrderBy(pCxt, pSelect); code = translateOrderBy(pCxt, pSelect);
} }
......
...@@ -239,6 +239,19 @@ TEST_F(ParserSelectTest, groupBySemanticCheck) { ...@@ -239,6 +239,19 @@ TEST_F(ParserSelectTest, groupBySemanticCheck) {
run("SELECT COUNT(*) cnt, c2 FROM t1 WHERE c1 > 0 GROUP BY c1", TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION); run("SELECT COUNT(*) cnt, c2 FROM t1 WHERE c1 > 0 GROUP BY c1", TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION);
} }
TEST_F(ParserSelectTest, havingCheck) {
useDb("root", "test");
run("select tbname,count(*) from st1 partition by tbname having c1>0", TSDB_CODE_PAR_INVALID_OPTR_USAGE);
run("select tbname,count(*) from st1 group by tbname having c1>0", TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION);
run("select max(c1) from st1 group by tbname having c1>0");
run("select max(c1) from st1 partition by tbname having c1>0");
}
TEST_F(ParserSelectTest, orderBy) { TEST_F(ParserSelectTest, orderBy) {
useDb("root", "test"); useDb("root", "test");
......
...@@ -1070,7 +1070,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel ...@@ -1070,7 +1070,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pLimit, pSelect->pLimit);
TSWAP(pProject->node.pSlimit, pSelect->pSlimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit);
pProject->ignoreGroupId = pSelect->isSubquery ? true : (NULL == pSelect->pPartitionByList); pProject->ignoreGroupId = (pSelect->isSubquery && NULL == pProject->node.pLimit && NULL == pProject->node.pSlimit) ? true : (NULL == pSelect->pPartitionByList);
pProject->node.groupAction = pProject->node.groupAction =
(!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; (!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
......
...@@ -2351,6 +2351,17 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) { ...@@ -2351,6 +2351,17 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
static int32_t mergeProjectionsLogicNode(SLogicNode* pDstNode, SLogicNode* pSrcNode) {
SProjectLogicNode *pDstPro = (SProjectLogicNode*)pDstNode;
SProjectLogicNode *pSrcPro = (SProjectLogicNode*)pSrcNode;
if (!pSrcPro->ignoreGroupId) {
pDstPro->ignoreGroupId = pSrcPro->ignoreGroupId;
}
return TSDB_CODE_SUCCESS;
}
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) { static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0); SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
...@@ -2360,8 +2371,11 @@ static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* ...@@ -2360,8 +2371,11 @@ static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (1 == LIST_LENGTH(pChild->pChildren)) { if (1 == LIST_LENGTH(pChild->pChildren)) {
SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0); code = mergeProjectionsLogicNode(pSelfNode, pChild);
code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild); if (TSDB_CODE_SUCCESS == code) {
SLogicNode* pGrandChild = (SLogicNode*)nodesListGetNode(pChild->pChildren, 0);
code = replaceLogicNode(pLogicSubplan, pChild, pGrandChild);
}
} else { // no grand child } else { // no grand child
NODES_CLEAR_LIST(pSelfNode->pChildren); NODES_CLEAR_LIST(pSelfNode->pChildren);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册