提交 958f3586 编写于 作者: X Xiaoyu Wang

fix: the problem of data loss when interval is used for outer query

上级 d7c45493
...@@ -197,6 +197,7 @@ bool fmIsSystemInfoFunc(int32_t funcId); ...@@ -197,6 +197,7 @@ bool fmIsSystemInfoFunc(int32_t funcId);
bool fmIsImplicitTsFunc(int32_t funcId); bool fmIsImplicitTsFunc(int32_t funcId);
bool fmIsClientPseudoColumnFunc(int32_t funcId); bool fmIsClientPseudoColumnFunc(int32_t funcId);
bool fmIsMultiRowsFunc(int32_t funcId); bool fmIsMultiRowsFunc(int32_t funcId);
bool fmIsKeepOrderFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
......
...@@ -47,6 +47,7 @@ extern "C" { ...@@ -47,6 +47,7 @@ extern "C" {
#define FUNC_MGT_SYSTEM_INFO_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(18) #define FUNC_MGT_SYSTEM_INFO_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(18)
#define FUNC_MGT_CLIENT_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(19) #define FUNC_MGT_CLIENT_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(19)
#define FUNC_MGT_MULTI_ROWS_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(20) #define FUNC_MGT_MULTI_ROWS_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(20)
#define FUNC_MGT_KEEP_ORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(21)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......
...@@ -2097,7 +2097,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2097,7 +2097,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "top", .name = "top",
.type = FUNCTION_TYPE_TOP, .type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateTopBot, .translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup, .initFunc = topBotFunctionSetup,
...@@ -2112,7 +2112,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2112,7 +2112,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "bottom", .name = "bottom",
.type = FUNCTION_TYPE_BOTTOM, .type = FUNCTION_TYPE_BOTTOM,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateTopBot, .translateFunc = translateTopBot,
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = topBotFunctionSetup, .initFunc = topBotFunctionSetup,
...@@ -2480,7 +2480,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2480,7 +2480,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "sample", .name = "sample",
.type = FUNCTION_TYPE_SAMPLE, .type = FUNCTION_TYPE_SAMPLE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_ROWS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateSample, .translateFunc = translateSample,
.getEnvFunc = getSampleFuncEnv, .getEnvFunc = getSampleFuncEnv,
.initFunc = sampleFunctionSetup, .initFunc = sampleFunctionSetup,
...@@ -2906,7 +2906,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2906,7 +2906,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "_select_value", .name = "_select_value",
.type = FUNCTION_TYPE_SELECT_VALUE, .type = FUNCTION_TYPE_SELECT_VALUE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateSelectValue, .translateFunc = translateSelectValue,
.getEnvFunc = getSelectivityFuncEnv, // todo remove this function later. .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later.
.initFunc = functionSetup, .initFunc = functionSetup,
......
...@@ -183,6 +183,8 @@ bool fmIsClientPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc( ...@@ -183,6 +183,8 @@ bool fmIsClientPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(
bool fmIsMultiRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_ROWS_FUNC); } bool fmIsMultiRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_ROWS_FUNC); }
bool fmIsKeepOrderFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_KEEP_ORDER_FUNC); }
bool fmIsInterpFunc(int32_t funcId) { bool fmIsInterpFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false; return false;
......
...@@ -2010,15 +2010,16 @@ static int32_t rewriteUniqueOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* ...@@ -2010,15 +2010,16 @@ static int32_t rewriteUniqueOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&pProject->pChildren, (SNode*)pAgg); code = nodesListMakeAppend(&pProject->pChildren, (SNode*)pAgg);
pAgg->pParent = pProject;
pAgg = NULL;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pAgg->pParent = pProject;
pAgg = NULL;
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pIndef, pProject); code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pIndef, pProject);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = adjustLogicNodeDataRequirement( code = adjustLogicNodeDataRequirement(
pProject, NULL == pProject->pParent ? DATA_ORDER_LEVEL_NONE : pProject->pParent->requireDataOrder); pProject, NULL == pProject->pParent ? DATA_ORDER_LEVEL_NONE : pProject->pParent->requireDataOrder);
pProject = NULL;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pIndef); nodesDestroyNode((SNode*)pIndef);
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "functionMgt.h"
#include "planInt.h" #include "planInt.h"
static char* getUsageErrFormat(int32_t errCode) { static char* getUsageErrFormat(int32_t errCode) {
...@@ -140,13 +141,27 @@ static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel ...@@ -140,13 +141,27 @@ static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel
} }
static int32_t adjustJoinDataRequirement(SJoinLogicNode* pJoin, EDataOrderLevel requirement) { static int32_t adjustJoinDataRequirement(SJoinLogicNode* pJoin, EDataOrderLevel requirement) {
// The lowest sort level of join input and output data is DATA_ORDER_LEVEL_GLOBAL
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool isKeepOrderAggFunc(SNodeList* pFuncs) {
SNode* pFunc = NULL;
FOREACH(pFunc, pFuncs) {
if (!fmIsKeepOrderFunc(((SFunctionNode*)pFunc)->funcId)) {
return false;
}
}
return true;
}
static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) { static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) {
if (requirement > DATA_ORDER_LEVEL_NONE) { // The sort level of agg with group by output data can only be DATA_ORDER_LEVEL_NONE
if (requirement > DATA_ORDER_LEVEL_NONE && (NULL != pAgg->pGroupKeys || !isKeepOrderAggFunc(pAgg->pAggFuncs))) {
return TSDB_CODE_PLAN_INTERNAL_ERROR; return TSDB_CODE_PLAN_INTERNAL_ERROR;
} }
pAgg->node.resultDataOrder = requirement;
pAgg->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -157,11 +172,12 @@ static int32_t adjustProjectDataRequirement(SProjectLogicNode* pProject, EDataOr ...@@ -157,11 +172,12 @@ static int32_t adjustProjectDataRequirement(SProjectLogicNode* pProject, EDataOr
} }
static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= pWindow->node.resultDataOrder) { // The lowest sort level of interval output data is DATA_ORDER_LEVEL_IN_GROUP
return TSDB_CODE_SUCCESS; if (requirement < DATA_ORDER_LEVEL_IN_GROUP) {
requirement = DATA_ORDER_LEVEL_IN_GROUP;
} }
// The sort level of interval input data is always DATA_ORDER_LEVEL_IN_BLOCK
pWindow->node.resultDataOrder = requirement; pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -70,4 +70,6 @@ TEST_F(PlanSubqeuryTest, outerInterval) { ...@@ -70,4 +70,6 @@ TEST_F(PlanSubqeuryTest, outerInterval) {
run("SELECT COUNT(*) FROM (SELECT * FROM st1) INTERVAL(5s)"); run("SELECT COUNT(*) FROM (SELECT * FROM st1) INTERVAL(5s)");
run("SELECT COUNT(*) + SUM(c1) FROM (SELECT * FROM st1) INTERVAL(5s)"); run("SELECT COUNT(*) + SUM(c1) FROM (SELECT * FROM st1) INTERVAL(5s)");
run("SELECT COUNT(*) FROM (SELECT ts, TOP(c1, 10) FROM st1s1) INTERVAL(5s)");
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册