未验证 提交 b5a84be4 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #15797 from taosdata/feature/3.0_wxy

fix: add checks for stream query
......@@ -1253,20 +1253,21 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p
if (!fmIsRepeatScanFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (isSelectStmt(pCxt->pCurrStmt)) {
// select percentile() without from clause is also valid
if (NULL == ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) {
return TSDB_CODE_SUCCESS;
}
SNode* pTable = ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable;
if (QUERY_NODE_REAL_TABLE == nodeType(pTable) &&
(TSDB_CHILD_TABLE == ((SRealTableNode*)pTable)->pMeta->tableType ||
TSDB_NORMAL_TABLE == ((SRealTableNode*)pTable)->pMeta->tableType)) {
return TSDB_CODE_SUCCESS;
}
if (!isSelectStmt(pCxt->pCurrStmt)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName);
}
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName);
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SNode* pTable = pSelect->pFromTable;
// select percentile() without from clause is also valid
if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
(TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) ||
NULL != pSelect->pPartitionByList) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static bool isStar(SNode* pNode) {
......@@ -2509,9 +2510,31 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static int32_t translateStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
static bool isPartitionByTbname(SNodeList* pPartitionByList) {
if (1 != LIST_LENGTH(pPartitionByList)) {
return false;
}
SNode* pPartKey = nodesListGetNode(pPartitionByList, 0);
return QUERY_NODE_FUNCTION != nodeType(pPartKey) || FUNCTION_TYPE_TBNAME != ((SFunctionNode*)pPartKey)->funcType;
}
static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pCxt->createStream) {
return TSDB_CODE_SUCCESS;
}
if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!isPartitionByTbname(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
SStateWindowNode* pState = (SStateWindowNode*)pSelect->pWindow;
nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt);
// todo check for "function not support for state_window"
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = checkStateWindowForStream(pCxt, pSelect);
}
return pCxt->errCode;
}
......@@ -2522,14 +2545,13 @@ static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNod
if (PRIMARYKEY_TIMESTAMP_COL_ID != pSession->pCol->colId) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_COL);
}
// todo check for "function not support for session"
return TSDB_CODE_SUCCESS;
}
static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_STATE_WINDOW:
return translateStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow);
return translateStateWindow(pCxt, pSelect);
case QUERY_NODE_SESSION_WINDOW:
return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow);
case QUERY_NODE_INTERVAL_WINDOW:
......@@ -4708,7 +4730,7 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
}
}
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
......
......@@ -85,7 +85,7 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown
}
int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.groupId, groupId);
planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId);
return setSubplanExecutionNode(subplan->pNode, groupId, pSource);
}
......@@ -104,7 +104,10 @@ static void clearSubplanExecutionNode(SPhysiNode* pNode) {
FOREACH(pChild, pNode->pChildren) { clearSubplanExecutionNode((SPhysiNode*)pChild); }
}
void qClearSubplanExecutionNode(SSubplan* pSubplan) { clearSubplanExecutionNode(pSubplan->pNode); }
void qClearSubplanExecutionNode(SSubplan* pSubplan) {
planDebug("QID:0x%" PRIx64 " clear subplan execution node, groupId:%d", pSubplan->id.queryId, pSubplan->id.groupId);
clearSubplanExecutionNode(pSubplan->pNode);
}
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
......
......@@ -284,7 +284,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
for (int32_t i = 0; i < parentNum; ++i) {
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
SCH_LOCK(SCH_WRITE, &parent->planLock);
SDownstreamSourceNode source = {
......@@ -298,6 +297,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->planLock);
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
SCH_ERR_RET(schLaunchTask(pJob, parent));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册