提交 a25e2cd6 编写于 作者: H Heikki Linnakangas

Fix confusion with distribution keys of queries with FULL JOINs.

There was some confusion on how NULLs are distributed, when CdbPathLocus
is of Hashed or HashedOJ type. The comment in cdbpathlocus.h suggested
that NULLs can be on any segment. But the rest of the code assumed that
that's true only for HashedOJ, and that for Hashed, all NULLs are stored
on a particular segment. There was a comment in cdbgroup.c that said "Or
would HashedOJ ok, too?"; the answer to that is "No!". Given the comment
in cdbpathlocus.h, I'm not suprised that the author was not very sure
about that. Clarify the comments in cdbpathlocus.h and cdbgroup.c on that.

There were a few cases where we got that actively wrong. repartitionPlan()
function is used to inject a Redistribute Motion into queries used for
CREATE TABLE AS and INSERT, if the "current" locus didn't match the target
table's policy. It did not check for HashedOJ. Because of that, if the
query contained FULL JOINs, NULL values might end up on all segments. Code
elsewhere, particularly in cdbgroup.c, assumes that all NULLs in a table
are stored on a single segment, identified by the cdbhash value of a NULL
datum. Fix that, by adding a check for HashedOJ in repartitionPlan(), and
forcing a Redistribute Motion.

CREATE TABLE AS had a similar problem, in the code to decide which
distribution key to use, if the user didn't specify DISTRIBUTED BY
explicitly. The default behaviour is to choose a distribution key that
matches the distribution of the query, so that we can avoid adding an
extra Redistribute Motion. After fixing repartitionPlan, there was no
correctness problem, but if we chose the key based on a HashedOJ locus,
there is no performance benefit because we'd need a Redistribute Motion
anyway. So modify the code that chooses the CTAS distribution key to
ignore HashedOJ.

While we're at it, refactor the code to choose the CTAS distribution key,
by moving it to a separate function. It had become ridiculously deeply
indented.

Fixes https://github.com/greenplum-db/gpdb/issues/6154, and adds tests.
Reviewed-by: NMelanie Plageman <mplageman@pivotal.io>
上级 9457fe71
......@@ -3008,7 +3008,14 @@ cdbpathlocus_collocates(PlannerInfo *root, CdbPathLocus locus, List *pathkeys,
return true;
if (!CdbPathLocus_IsHashed(locus))
return false; /* Or would HashedOJ ok, too? */
{
/*
* Note: HashedOJ can *not* be used for grouping. In HashedOJ, NULL
* values can be located on any segment, so we would end up with
* multiple NULL groups.
*/
return false;
}
if (exact_match && list_length(pathkeys) != list_length(locus.partkey_h))
return false;
......
......@@ -1123,7 +1123,8 @@ repartitionPlan(Plan *plan, bool stable, bool rescannable,
plan->flow->flotype == FLOW_SINGLETON);
/* Already partitioned on the given hashExpr? Do nothing. */
if (hashExpr && plan->flow->numsegments == numsegments)
if (hashExpr && plan->flow->numsegments == numsegments &&
plan->flow->locustype == CdbLocusType_Hashed)
{
if (loci_compatible(hashExpr, plan->flow->hashExpr))
return true;
......
......@@ -219,6 +219,125 @@ directDispatchCalculateHash(Plan *plan, GpPolicy *targetPolicy)
pfree(nulls);
}
/*
* Create a GpPolicy that matches the Flow of the given plan.
*
* This is used with CREATE TABLE AS, to derive the distribution
* key for the table from the query plan.
*/
static GpPolicy *
get_partitioned_policy_from_flow(Plan *plan)
{
/* Find out what the flow is partitioned on */
List *policykeys;
ListCell *exp1;
/*
* Is it a Hashed distribution?
*
* NOTE: HashedOJ is not OK, because we cannot let the NULLs be stored
* multiple segments.
*/
if (plan->flow->locustype != CdbLocusType_Hashed)
{
return NULL;
}
/*
* Sometimes the planner produces a Flow with CdbLocusType_Hashed,
* but hashExpr are not set because we have lost track of the
* expressions it's hashed on.
*/
if (!plan->flow->hashExpr)
return NULL;
policykeys = NIL;
foreach(exp1, plan->flow->hashExpr)
{
Expr *var1 = (Expr *) lfirst(exp1);
AttrNumber n;
bool found_expr = false;
/* See if this Expr is a column of the result table */
for (n = 1; n <= list_length(plan->targetlist); n++)
{
TargetEntry *target = get_tle_by_resno(plan->targetlist, n);
Var *new_var;
if (target->resjunk)
continue;
/*
* Right side variable may be encapsulated by a relabel node.
* Motion, however, does not care about relabel nodes.
*/
if (IsA(var1, RelabelType))
var1 = ((RelabelType *) var1)->arg;
/*
* If subplan expr is a Var, copy to preserve its EXPLAIN info.
*/
if (IsA(target->expr, Var))
{
new_var = copyObject(target->expr);
new_var->varno = OUTER_VAR;
new_var->varattno = n;
}
/*
* Make a Var that references the target list entry at this
* offset, using OUTER_VAR as the varno
*/
else
new_var = makeVar(OUTER_VAR,
n,
exprType((Node *) target->expr),
exprTypmod((Node *) target->expr),
exprCollation((Node *) target->expr),
0);
if (equal(var1, new_var))
{
/*
* If it is, use it to partition the result table, to avoid
* unnecessary redistribution of data
*/
Assert(list_length(policykeys) < MaxPolicyAttributeNumber);
if (list_member_int(policykeys, n))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("duplicate DISTRIBUTED BY column '%s'",
target->resname ? target->resname : "???")));
policykeys = lappend_int(policykeys, n);
found_expr = true;
break;
}
}
if (!found_expr)
{
/*
* This distribution key is not present in the target list. Give
* up.
*/
return NULL;
}
}
/*
* We use the default number of segments, even if the flow was partially
* distributed. That defeats the performance benefit of using the same
* distribution key columns, because we'll need a Restribute Motion
* anyway. But presumably if the user had expanded the cluster, they want
* to use all the segments for new tables.
*/
return createHashPartitionedPolicy(policykeys, GP_POLICY_DEFAULT_NUMSEGMENTS);
}
/* -------------------------------------------------------------------------
* Function apply_motion() and apply_motion_mutator() add motion nodes to a
* top-level Plan tree as directed by the Flow nodes in the plan.
......@@ -281,16 +400,6 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
/* If the query comes from 'CREATE TABLE AS' or 'SELECT INTO' */
if (query->parentStmtType != PARENTSTMTTYPE_NONE)
{
List *hashExpr;
ListCell *exp1;
/*
* In CTAS the source distribution policy is not inherited,
* always set numsegments to DEFAULT unless a DISTRIBUTED BY
* clause is specified.
*/
numsegments = GP_POLICY_DEFAULT_NUMSEGMENTS;
if (query->intoPolicy != NULL)
{
targetPolicy = query->intoPolicy;
......@@ -301,7 +410,7 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
}
else if (gp_create_table_random_default_distribution)
{
targetPolicy = createRandomPartitionedPolicy(numsegments);
targetPolicy = createRandomPartitionedPolicy(GP_POLICY_DEFAULT_NUMSEGMENTS);
ereport(NOTICE,
(errcode(ERRCODE_SUCCESSFUL_COMPLETION),
errmsg("Using default RANDOM distribution since no distribution was specified."),
......@@ -309,100 +418,17 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
}
else
{
/* Find out what the flow is partitioned on */
List *policykeys = NIL;
hashExpr = plan->flow->hashExpr;
if (hashExpr)
foreach(exp1, hashExpr)
{
AttrNumber n;
bool found_expr = false;
/* See if this Expr is a column of the result table */
/* First try to deduce the distribution from the query */
targetPolicy = get_partitioned_policy_from_flow(plan);
for (n = 1; n <= list_length(plan->targetlist); n++)
{
Var *new_var = NULL;
TargetEntry *target = get_tle_by_resno(plan->targetlist, n);
if (!target->resjunk)
{
Expr *var1 = (Expr *) lfirst(exp1);
/*
* right side variable may be encapsulated by
* a relabel node. motion, however, does not
* care about relabel nodes.
*/
if (IsA(var1, RelabelType))
var1 = ((RelabelType *) var1)->arg;
/*
* If subplan expr is a Var, copy to preserve
* its EXPLAIN info.
*/
if (IsA(target->expr, Var))
{
new_var = copyObject(target->expr);
new_var->varno = OUTER_VAR;
new_var->varattno = n;
}
/*
* Make a Var that references the target list
* entry at this offset, using OUTER_VAR as the
* varno
*/
else
new_var = makeVar(OUTER_VAR,
n,
exprType((Node *) target->expr),
exprTypmod((Node *) target->expr),
exprCollation((Node *) target->expr),
0);
if (equal(var1, new_var))
{
/*
* If it is, use it to partition the
* result table, to avoid unnecessary
* redistibution of data
*/
Assert(list_length(policykeys) < MaxPolicyAttributeNumber);
if (list_member_int(policykeys, n))
{
TargetEntry *target = get_tle_by_resno(plan->targetlist, n);
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("duplicate DISTRIBUTED BY column '%s'",
target->resname ? target->resname : "???")));
}
policykeys = lappend_int(policykeys, n);
found_expr = true;
break;
}
}
}
if (!found_expr)
break;
}
/* do we know how to partition? */
if (policykeys == NIL)
/*
* If that fails, hash on the first hashable column we can
* find.
*/
if (!targetPolicy)
{
/*
* hash on the first hashable column we can find.
*/
int i;
List *policykeys = NIL;
for (i = 0; i < list_length(plan->targetlist); i++)
{
......@@ -420,11 +446,10 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
}
}
}
targetPolicy = createHashPartitionedPolicy(policykeys,
GP_POLICY_DEFAULT_NUMSEGMENTS);
}
targetPolicy = createHashPartitionedPolicy(policykeys,
numsegments);
/* If we deduced the policy from the query, give a NOTICE */
if (query->parentStmtType == PARENTSTMTTYPE_CTAS)
{
......
......@@ -81,20 +81,24 @@ typedef enum CdbLocusType
* expressions (e1, e2, ..., em) forms an equivalence class
* with respect to the partitioning function.
*
* We use this information primarily to optimize joins for which we
* have an equijoin predicate on every column of the partitioning key.
* In such a join, a row in which any partitioning key column is NULL
* cannot join with other rows. So for our purposes here, equivalence
* with respect to partitioning pertains only to rows in which no
* partitioning key column is NULL. In particular, we can disregard
* the distribution of null-augmented rows generated by outer join.
* We use this information for a few different purposes:
*
* 1. To optimize joins for which we have an equijoin predicate on every
* column of the partitioning key.
*
* 2. To optimize grouping and set operations (i.e. UNION, INTERSECT).
* If the GROUP BY clause contains all the partitioning key columns, we
* can perform the grouping separately on each segment.
*
* 3. In CREATE TABLE AS, we use the result distribution as the
* distribution key of the new table.
*
* If locustype == CdbLocusType_Hashed:
* Rows are distributed on a hash function of the partitioning key.
* The 'partkey' field points to a pathkey list (see pathkeys.c).
* Each of the sets 'Ei' is represented as a PathKey, and in particular
* its equivalence class, which is a List of expressions that are
* constrained by equality predicates to be equal to one another.
* its equivalence class, which contains a list of expressions (members)
* are constrained by equality predicates to be equal to one another.
*
* If locustype == CdbLocusType_HashedOJ:
* Rows are distributed on a hash function of the partitioning key,
......@@ -104,6 +108,17 @@ typedef enum CdbLocusType
* classes can be considered as equal for the purposes of the locus in
* a join relation. This case arises in the result of outer join.
*
* NB: The important distinction between Hashed and HashedOJ, is the
* semantics for NULLs. In a Hashed distribution, a NULL is considered
* like any other value, and all NULLs are located on a particular
* segment, based on the hash value of a NULL datum. But with HashedOJ,
* the NULL values can legitimately appear on any segment! For join
* optimization, either one will do. In an inner join on A=B, any NULL
* rows won't match anyway. And for an OUTER JOIN, it doesn't matter
* which segment the NULL rows output on, as long as we correctly mark
* the resulting locus also as HashedOJ. But for grouping, HashedOJ can
* not be used, because NULL groups might appear multiple segments!
*
* If locustype == CdbLocusType_Strewn:
* Rows are distributed according to a criterion that is unknown or
* may depend on inputs that are unknown or unavailable in the present
......
......@@ -553,3 +553,88 @@ select * from inhdisttest_b;
1 | lastname c | 422
(2 rows)
--
-- Test that NULLs are distributed correctly, by a CTAS involving an outer join
--
create temporary table even (i int4, j int4) distributed by (i);
insert into even select g*2, g*2 from generate_series(1, 10) g;
create temporary table odd (i int4, j int4) distributed by (i);
insert into odd select g*2+1, g*2+1 from generate_series(1, 10) g;
create temporary table ctas_x as
select even.j, even.i as a, odd.i as b from even full outer join odd on (even.i = odd.i)
distributed by (a);
-- Check that all the rows with NULL distribution key are stored on the same segment.
select count(distinct gp_segment_id) from ctas_x where a is null;
count
-------
1
(1 row)
select a from ctas_x group by a;
a
----
18
4
8
20
10
6
12
2
16
14
(11 rows)
-- The same, but let the planner deduce the distribution key by itself. The
-- codepaths to deduce it, and to check if the explicitly given distribution
-- needs a Redistribute, are different. It should not choose 'a', even though
-- the result is distributed on 'a', because there are NULLs on all segments.
create temporary table ctas_y as
select even.j, even.i as a, odd.i as b
from even full outer join odd on (even.i = odd.i);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'j' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
select a from ctas_y group by a;
a
----
14
18
4
8
20
10
6
12
2
16
(11 rows)
-- Same for INSERT.
create temporary table insert_z (j int4, a int4, b int4) distributed by (a);
insert into insert_z
select even.j, even.i as a, odd.i as b
from even full outer join odd on (even.i = odd.i);
select count(distinct gp_segment_id) from insert_z where a is null;
count
-------
1
(1 row)
select a from insert_z group by a;
a
----
18
4
8
20
10
6
12
2
16
14
(11 rows)
......@@ -458,3 +458,39 @@ INSERT INTO inhdisttest_c (ssn, lastname, junk, id, morejunk, uid1, uid2, uid3)
select * from inhdisttest_a;
select * from inhdisttest_b;
--
-- Test that NULLs are distributed correctly, by a CTAS involving an outer join
--
create temporary table even (i int4, j int4) distributed by (i);
insert into even select g*2, g*2 from generate_series(1, 10) g;
create temporary table odd (i int4, j int4) distributed by (i);
insert into odd select g*2+1, g*2+1 from generate_series(1, 10) g;
create temporary table ctas_x as
select even.j, even.i as a, odd.i as b from even full outer join odd on (even.i = odd.i)
distributed by (a);
-- Check that all the rows with NULL distribution key are stored on the same segment.
select count(distinct gp_segment_id) from ctas_x where a is null;
select a from ctas_x group by a;
-- The same, but let the planner deduce the distribution key by itself. The
-- codepaths to deduce it, and to check if the explicitly given distribution
-- needs a Redistribute, are different. It should not choose 'a', even though
-- the result is distributed on 'a', because there are NULLs on all segments.
create temporary table ctas_y as
select even.j, even.i as a, odd.i as b
from even full outer join odd on (even.i = odd.i);
select a from ctas_y group by a;
-- Same for INSERT.
create temporary table insert_z (j int4, a int4, b int4) distributed by (a);
insert into insert_z
select even.j, even.i as a, odd.i as b
from even full outer join odd on (even.i = odd.i);
select count(distinct gp_segment_id) from insert_z where a is null;
select a from insert_z group by a;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册