Unverified commit fa762b69 authored by Ning Yu, committed by GitHub

Fix motion hazard between outer and joinqual

A motion hazard is a deadlock between motions.  A classic motion hazard
in a join executor is formed by its inner and outer motions; it can be
prevented by prefetching the inner plan, see motion_sanity_check()
for details.

A similar motion hazard can be formed by the outer motion and the join
qual motion.  A join executor fetches an outer tuple, filters it with the
join qual, then repeats the process on all the outer tuples.  When there
are motions in both the outer plan and the join qual, the following state
is possible:

0. processes A and B belong to the join slice, process C belongs to the
   outer slice, process D belongs to the JoinQual slice;
1. A has read the first outer tuple and is fetching tuples from D;
2. D is waiting for ACK from B;
3. B is fetching the first outer tuple from C;
4. C is waiting for ACK from A;

So a deadlock is formed: A->D->B->C->A.  We can also prevent it by
prefetching the join qual.
Reviewed-by: Jesse Zhang <jzhang@pivotal.io>
Reviewed-by: Gang Xiong <gxiong@pivotal.io>
Reviewed-by: Zhenghua Lyu <zlv@pivotal.io>
Parent 76e7b822
......@@ -1682,6 +1682,87 @@ ExecGetShareNodeEntry(EState* estate, int shareidx, bool fCreate)
return (ShareNodeEntry *) list_nth(*estate->es_sharenode, shareidx);
}
/*
* Prefetch JoinQual to prevent motion hazard.
*
* A motion hazard is a deadlock between motions.  A classic motion hazard in a
* join executor is formed by its inner and outer motions; it can be prevented
* by prefetching the inner plan, see motion_sanity_check() for details.
*
* A similar motion hazard can be formed by the outer motion and the join qual
* motion.  A join executor fetches an outer tuple, filters it with the join
* qual, then repeats the process on all the outer tuples.  When there are
* motions in both the outer plan and the join qual, the following state is
* possible:
*
* 0. processes A and B belong to the join slice, process C belongs to the
* outer slice, process D belongs to the JoinQual slice;
* 1. A has read the first outer tuple and is fetching tuples from D;
* 2. D is waiting for ACK from B;
* 3. B is fetching the first outer tuple from C;
* 4. C is waiting for ACK from A;
*
* So a deadlock is formed: A->D->B->C->A.  We can also prevent it by
* prefetching the join qual.
*
* An example is demonstrated and explained in test case
* src/test/regress/sql/deadlock2.sql.
*
* Return true if the JoinQual is prefetched.
*/
bool
ExecPrefetchJoinQual(JoinState *node)
{
	EState	   *estate = node->ps.state;
	ExprContext *econtext = node->ps.ps_ExprContext;
	PlanState  *inner = innerPlanState(node);
	PlanState  *outer = outerPlanState(node);
	List	   *joinqual = node->joinqual;
	TupleTableSlot *innertuple = econtext->ecxt_innertuple;

	if (!joinqual)
		return false;

	/* Outer tuples should not be fetched before us */
	Assert(econtext->ecxt_outertuple == NULL);

	/* Build fake inner & outer tuples */
	econtext->ecxt_innertuple = ExecInitNullTupleSlot(estate,
													  ExecGetResultType(inner));
	econtext->ecxt_outertuple = ExecInitNullTupleSlot(estate,
													  ExecGetResultType(outer));

	/* Fetch subplan with the fake inner & outer tuples */
	ExecQual(joinqual, econtext, false);

	/* Restore previous state */
	econtext->ecxt_innertuple = innertuple;
	econtext->ecxt_outertuple = NULL;

	return true;
}
/*
* Decide whether the joinqual should be prefetched.
*
* The joinqual should be prefetched when both the outer plan and the joinqual
* contain motions.  In the create_*join_plan() functions we set
* prefetch_joinqual according to the outer motions; here we check for
* joinqual motions to make the final decision.
*
* See ExecPrefetchJoinQual() for details.
*
* This function should be called in ExecInit*Join() functions.
*
* Return true if JoinQual should be prefetched.
*/
bool
ShouldPrefetchJoinQual(EState *estate, Join *join)
{
	return (join->prefetch_joinqual &&
			findSenderMotion(estate->es_plannedstmt,
							 estate->currentSliceIdInPlan));
}
/* ----------------------------------------------------------------
* CDB Slice Table utilities
* ----------------------------------------------------------------
......
......@@ -234,6 +234,14 @@ ExecHashJoin_guts(HashJoinState *node)
if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
return NULL;
/*
* Prefetch JoinQual to prevent motion hazard.
*
* See ExecPrefetchJoinQual() for details.
*/
if (node->prefetch_joinqual && ExecPrefetchJoinQual(&node->js))
node->prefetch_joinqual = false;
/*
* We just scanned the entire inner side and built the hashtable
* (and its overflow batches). Check here and remember if the inner
......@@ -592,6 +600,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
* the fix to MPP-989)
*/
hjstate->prefetch_inner = node->join.prefetch_inner;
hjstate->prefetch_joinqual = ShouldPrefetchJoinQual(estate, &node->join);
/*
* initialize child nodes
......
......@@ -680,6 +680,14 @@ ExecMergeJoin_guts(MergeJoinState *node)
node->prefetch_inner = false;
}
/*
* Prefetch JoinQual to prevent motion hazard.
*
* See ExecPrefetchJoinQual() for details.
*/
if (node->prefetch_joinqual && ExecPrefetchJoinQual(&node->js))
node->prefetch_joinqual = false;
/*
* ok, everything is setup.. let's go to work
*/
......@@ -1564,6 +1572,7 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
mergestate->mj_ConstFalseJoin = false;
mergestate->prefetch_inner = node->join.prefetch_inner;
mergestate->prefetch_joinqual = ShouldPrefetchJoinQual(estate, &node->join);
/* Prepare inner operators for rewind after the prefetch */
rewindflag = mergestate->prefetch_inner ? EXEC_FLAG_REWIND : 0;
......
......@@ -142,6 +142,14 @@ ExecNestLoop_guts(NestLoopState *node)
node->reset_inner = false;
}
/*
* Prefetch JoinQual to prevent motion hazard.
*
* See ExecPrefetchJoinQual() for details.
*/
if (node->prefetch_joinqual && ExecPrefetchJoinQual(&node->js))
node->prefetch_joinqual = false;
/*
* Ok, everything is setup for the join so now loop until we return a
* qualifying join tuple.
......@@ -382,6 +390,7 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
nlstate->shared_outer = node->shared_outer;
nlstate->prefetch_inner = node->join.prefetch_inner;
nlstate->prefetch_joinqual = ShouldPrefetchJoinQual(estate, &node->join);
/*CDB-OLAP*/
nlstate->reset_inner = false;
......
......@@ -871,6 +871,7 @@ CopyJoinFields(const Join *from, Join *newnode)
CopyPlanFields((const Plan *) from, (Plan *) newnode);
COPY_SCALAR_FIELD(prefetch_inner);
COPY_SCALAR_FIELD(prefetch_joinqual);
COPY_SCALAR_FIELD(jointype);
COPY_NODE_FIELD(joinqual);
......
......@@ -406,6 +406,7 @@ _outJoinPlanInfo(StringInfo str, const Join *node)
_outPlanInfo(str, (const Plan *) node);
WRITE_BOOL_FIELD(prefetch_inner);
WRITE_BOOL_FIELD(prefetch_joinqual);
WRITE_ENUM_FIELD(jointype, JoinType);
WRITE_NODE_FIELD(joinqual);
......
......@@ -2427,6 +2427,7 @@ void readJoinInfo(Join *local_node)
readPlanInfo((Plan *) local_node);
READ_BOOL_FIELD(prefetch_inner);
READ_BOOL_FIELD(prefetch_joinqual);
READ_ENUM_FIELD(jointype, JoinType);
READ_NODE_FIELD(joinqual);
......
......@@ -776,6 +776,18 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
if (partition_selector_created)
((Join *) plan)->prefetch_inner = true;
/*
* A motion deadlock can also happen when outer and joinqual both contain
* motions. It is not easy to check for joinqual here, so we set the
* prefetch_joinqual mark only according to outer motion, and check for
* joinqual later in the executor.
*
* See ExecPrefetchJoinQual() for details.
*/
if (best_path->outerjoinpath &&
best_path->outerjoinpath->motionHazard)
((Join *) plan)->prefetch_joinqual = true;
plan->flow = cdbpathtoplan_create_flow(root,
best_path->path.locus,
best_path->path.parent ? best_path->path.parent->relids
......@@ -3189,6 +3201,18 @@ create_nestloop_plan(PlannerInfo *root,
if (prefetch)
join_plan->join.prefetch_inner = true;
/*
* A motion deadlock can also happen when outer and joinqual both contain
* motions. It is not easy to check for joinqual here, so we set the
* prefetch_joinqual mark only according to outer motion, and check for
* joinqual later in the executor.
*
* See ExecPrefetchJoinQual() for details.
*/
if (best_path->outerjoinpath &&
best_path->outerjoinpath->motionHazard)
join_plan->join.prefetch_joinqual = true;
return join_plan;
}
......@@ -3514,6 +3538,25 @@ create_mergejoin_plan(PlannerInfo *root,
join_plan->join.prefetch_inner = prefetch;
/*
* A motion deadlock can also happen when outer and joinqual both contain
* motions. It is not easy to check for joinqual here, so we set the
* prefetch_joinqual mark only according to outer motion, and check for
* joinqual later in the executor.
*
* See ExecPrefetchJoinQual() for details.
*/
if (best_path->jpath.outerjoinpath &&
best_path->jpath.outerjoinpath->motionHazard)
join_plan->join.prefetch_joinqual = true;
/*
* If the inner motion is not under a Material or Sort node then there could
* also be a motion deadlock between the inner plan and the joinqual in merge
* join.
*/
if (best_path->jpath.innerjoinpath &&
best_path->jpath.innerjoinpath->motionHazard)
join_plan->join.prefetch_joinqual = true;
/* Costs of sort and material steps are included in path cost already */
copy_path_costsize(root, &join_plan->join.plan, &best_path->jpath.path);
......@@ -3668,6 +3711,18 @@ create_hashjoin_plan(PlannerInfo *root,
join_plan->join.prefetch_inner = true;
}
/*
* A motion deadlock can also happen when outer and joinqual both contain
* motions. It is not easy to check for joinqual here, so we set the
* prefetch_joinqual mark only according to outer motion, and check for
* joinqual later in the executor.
*
* See ExecPrefetchJoinQual() for details.
*/
if (best_path->jpath.outerjoinpath &&
best_path->jpath.outerjoinpath->motionHazard)
join_plan->join.prefetch_joinqual = true;
copy_path_costsize(root, &join_plan->join.plan, &best_path->jpath.path);
return join_plan;
......
......@@ -470,6 +470,9 @@ extern void UnregisterExprContextCallback(ExprContext *econtext,
/* Share input utilities defined in execUtils.c */
extern ShareNodeEntry * ExecGetShareNodeEntry(EState *estate, int shareid, bool fCreate);
extern bool ExecPrefetchJoinQual(JoinState *node);
extern bool ShouldPrefetchJoinQual(EState *estate, Join *join);
/* ResultRelInfo and Append Only segment assignment */
void ResultRelInfoSetSegno(ResultRelInfo *resultRelInfo, List *mapping);
......
......@@ -2264,6 +2264,7 @@ typedef struct NestLoopState
bool nl_MatchedOuter;
bool shared_outer;
bool prefetch_inner;
bool prefetch_joinqual;
bool reset_inner; /*CDB-OLAP*/
bool require_inner_reset; /*CDB-OLAP*/
......@@ -2319,6 +2320,7 @@ typedef struct MergeJoinState
ExprContext *mj_OuterEContext;
ExprContext *mj_InnerEContext;
bool prefetch_inner; /* MPP-3300 */
bool prefetch_joinqual;
} MergeJoinState;
/* ----------------
......@@ -2376,6 +2378,7 @@ typedef struct HashJoinState
bool hj_OuterNotEmpty;
bool hj_InnerEmpty; /* set to true if inner side is empty */
bool prefetch_inner;
bool prefetch_joinqual;
bool hj_nonequijoin;
/* set if the operator created workfiles */
......
......@@ -867,6 +867,7 @@ typedef struct Join
List *joinqual; /* JOIN quals (in addition to plan.qual) */
bool prefetch_inner; /* to avoid deadlock in MPP */
bool prefetch_joinqual; /* to avoid deadlock in MPP */
} Join;
/* ----------------
......
-- A classic motion hazard / deadlock is between the inner and outer motions,
-- but one can also form between the outer and joinqual / subplan motions.
-- A sample plan is shown below:
--
-- Gather Motion 3:1 (slice4; segments: 3)
-- -> Hash Left Join
-- Hash Cond: (t_outer.c2 = t_inner.c2)
-- Join Filter: (NOT (SubPlan))
-- -> Redistribute Motion 3:3 (slice1; segments: 3)
-- Hash Key: t_outer.c2
-- -> Seq Scan on t_outer
-- -> Hash
-- -> Redistribute Motion 3:3 (slice2; segments: 3)
-- Hash Key: t_inner.c2
-- -> Seq Scan on t_inner
-- SubPlan 1 (slice4; segments: 3)
-- -> Result
-- Filter: (t_subplan.c2 = t_outer.c1)
-- -> Materialize
-- -> Broadcast Motion 3:3 (slice3; segments: 3)
-- -> Seq Scan on t_subplan
-- Suppose :x0 is distributed to seg0; it does not matter if it is not.
-- This assumption is only to simplify the explanation.
\set x0 1
\set scale 10000
drop schema if exists deadlock2 cascade;
NOTICE: schema "deadlock2" does not exist, skipping
create schema deadlock2;
set search_path = deadlock2;
create table t_inner (c1 int, c2 int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
create table t_outer (c1 int, c2 int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
create table t_subplan (c1 int, c2 int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
-- First load enough data on all the relations to generate redistribute-both
-- motion instead of broadcast-one motion.
insert into t_inner select i, i from generate_series(1,:scale) i;
insert into t_outer select i, i from generate_series(1,:scale) i;
insert into t_subplan select i, i from generate_series(1,:scale) i;
analyze t_inner;
analyze t_outer;
analyze t_subplan;
-- Then delete all of them and load the real data, do not use TRUNCATE as it
-- will clear the analyze information.
delete from t_inner;
delete from t_outer;
delete from t_subplan;
-- t_inner is the inner relation, it does not need much data as long as it is
-- not empty.
insert into t_inner values (:x0, :x0);
-- t_outer is the outer relation of the hash join; all its data is on seg0
-- and is redistributed to seg0, so outer@slice1@seg0 sends data to
-- hashjoin@slice4@seg0 and waits for an ACK from it. Only after all the data
-- is sent will it send EOS to all the segments of hashjoin@slice4. So once
-- hashjoin@slice4@seg1 reads from outer@slice1 it has to wait for the EOS.
insert into t_outer select :x0, :x0 from generate_series(1,:scale) i;
-- t_subplan is the subplan relation of the hash join; all its data is on
-- seg0 and is broadcast to seg0 and seg1. When subplan@slice3@seg0 sends data
-- to hashjoin@slice4@seg1 it has to wait for an ACK from it; this can happen
-- before hashjoin@slice4@seg1 reads from outer@slice1.
insert into t_subplan select :x0, :x0 from generate_series(1,:scale) i;
-- In the past hash join did its job like this:
--
-- 10. read all the inner tuples and build the hash table;
-- 20. read an outer tuple;
-- 30. pass the outer tuple to the join qual, which reads through the motion
--     in the subplan;
-- 40. if there are more outer tuples goto 20.
--
-- So it is possible that hashjoin@slice4@seg0 is reading from
-- subplan@slice3@seg0 while it is itself being waited on by
-- outer@slice1@seg0, which forms a deadlock:
--
-- outer@slice1@seg0 --[waits for ACK]--> hashjoin@slice4@seg0
--          ^                                       |
--          |                                       |
--  [waits for outer tuple]          [waits for subplan tuple]
--          |                                       |
--          |                                       v
-- hashjoin@slice4@seg1 <--[waits for ACK]-- subplan@slice3@seg0
--
-- This deadlock is prevented by prefetching the subplan.
-- In theory this deadlock exists in all of hash join, merge join and nestloop
-- join, but so far we have only constructed a reproducer for hash join.
set enable_hashjoin to on;
set enable_mergejoin to off;
set enable_nestloop to off;
select count(*) from t_inner right join t_outer on t_inner.c2=t_outer.c2
and not exists (select 0 from t_subplan where t_subplan.c2=t_outer.c1);
count
-------
10000
(1 row)
......@@ -49,6 +49,7 @@ test: event_trigger_gp
# deadlock tests run separately - because we don't know which one gets stuck.
test: deadlock
test: deadlock2
# test workfiles
test: workfile/hashagg_spill workfile/hashjoin_spill workfile/materialize_spill workfile/sisc_mat_sort workfile/sisc_sort_spill workfile/sort_spill workfile/spilltodisk
......
-- A classic motion hazard / deadlock is between the inner and outer motions,
-- but one can also form between the outer and joinqual / subplan motions.
-- A sample plan is shown below:
--
-- Gather Motion 3:1 (slice4; segments: 3)
-- -> Hash Left Join
-- Hash Cond: (t_outer.c2 = t_inner.c2)
-- Join Filter: (NOT (SubPlan))
-- -> Redistribute Motion 3:3 (slice1; segments: 3)
-- Hash Key: t_outer.c2
-- -> Seq Scan on t_outer
-- -> Hash
-- -> Redistribute Motion 3:3 (slice2; segments: 3)
-- Hash Key: t_inner.c2
-- -> Seq Scan on t_inner
-- SubPlan 1 (slice4; segments: 3)
-- -> Result
-- Filter: (t_subplan.c2 = t_outer.c1)
-- -> Materialize
-- -> Broadcast Motion 3:3 (slice3; segments: 3)
-- -> Seq Scan on t_subplan
-- Suppose :x0 is distributed to seg0; it does not matter if it is not.
-- This assumption is only to simplify the explanation.
\set x0 1
\set scale 10000
drop schema if exists deadlock2 cascade;
create schema deadlock2;
set search_path = deadlock2;
create table t_inner (c1 int, c2 int);
create table t_outer (c1 int, c2 int);
create table t_subplan (c1 int, c2 int);
-- First load enough data on all the relations to generate redistribute-both
-- motion instead of broadcast-one motion.
insert into t_inner select i, i from generate_series(1,:scale) i;
insert into t_outer select i, i from generate_series(1,:scale) i;
insert into t_subplan select i, i from generate_series(1,:scale) i;
analyze t_inner;
analyze t_outer;
analyze t_subplan;
-- Then delete all of them and load the real data, do not use TRUNCATE as it
-- will clear the analyze information.
delete from t_inner;
delete from t_outer;
delete from t_subplan;
-- t_inner is the inner relation, it does not need much data as long as it is
-- not empty.
insert into t_inner values (:x0, :x0);
-- t_outer is the outer relation of the hash join; all its data is on seg0
-- and is redistributed to seg0, so outer@slice1@seg0 sends data to
-- hashjoin@slice4@seg0 and waits for an ACK from it. Only after all the data
-- is sent will it send EOS to all the segments of hashjoin@slice4. So once
-- hashjoin@slice4@seg1 reads from outer@slice1 it has to wait for the EOS.
insert into t_outer select :x0, :x0 from generate_series(1,:scale) i;
-- t_subplan is the subplan relation of the hash join; all its data is on
-- seg0 and is broadcast to seg0 and seg1. When subplan@slice3@seg0 sends data
-- to hashjoin@slice4@seg1 it has to wait for an ACK from it; this can happen
-- before hashjoin@slice4@seg1 reads from outer@slice1.
insert into t_subplan select :x0, :x0 from generate_series(1,:scale) i;
-- In the past hash join did its job like this:
--
-- 10. read all the inner tuples and build the hash table;
-- 20. read an outer tuple;
-- 30. pass the outer tuple to the join qual, which reads through the motion
--     in the subplan;
-- 40. if there are more outer tuples goto 20.
--
-- So it is possible that hashjoin@slice4@seg0 is reading from
-- subplan@slice3@seg0 while it is itself being waited on by
-- outer@slice1@seg0, which forms a deadlock:
--
-- outer@slice1@seg0 --[waits for ACK]--> hashjoin@slice4@seg0
--          ^                                       |
--          |                                       |
--  [waits for outer tuple]          [waits for subplan tuple]
--          |                                       |
--          |                                       v
-- hashjoin@slice4@seg1 <--[waits for ACK]-- subplan@slice3@seg0
--
-- This deadlock is prevented by prefetching the subplan.
-- In theory this deadlock exists in all of hash join, merge join and nestloop
-- join, but so far we have only constructed a reproducer for hash join.
set enable_hashjoin to on;
set enable_mergejoin to off;
set enable_nestloop to off;
select count(*) from t_inner right join t_outer on t_inner.c2=t_outer.c2
and not exists (select 0 from t_subplan where t_subplan.c2=t_outer.c1);