提交 4075136b 编写于 作者: F foyzur 提交者: GitHub

Extracting column names from ShareInputScan in ORCA plans to support proper...

Extracting column names from ShareInputScan in ORCA plans to support proper column name resolution using RTE_CTE (#992)

* Extracting column names from ShareInputScan in ORCA plans to support proper column name resolution using RTE_CTE.

* Code review on PR 992.

* PlannerGlobal has out-function support in the upstream, so removing it
  altogether doesn't seem like a good idea. I'm not sure if it get
  printed out with suitable verbose or debug flags, but I remember seeing
  it being printed out during debugging somehow, and it can be useful.

* Refactor the functions in cdbmutate.c, so that there's a separate function
  to do the DAG to Tree conversion, and a separate function for just
  collecting the producer nodes. It seems like a bad idea that a function
  called "apply_dag_to_tree" actually does something different, depending
  on a flag in a struct.

* Now that we have a separate array of producers, no need to hold the
  colnames etc. lists in ShareInputScan node itself. Since we can look up
  the producer node at will, we might as well look at the producer node's
  sub-tree directly every time we construct the CTE RTE.

* One complication from the previous change is that we can't call
  get_tle_name() in replace_shareinput_targetlists(), because that runs
  after the post-processing in setrefs.c, so all Vars have already been
  changed to use INNER/OUTER. get_tle_name() doesn't work with those.
  On closer inspection, I think this was a bit fiddly in the ORCA case
  before too, because in ORCA-generated plans, Vars always use the
  INNER/OUTER notation, so calling get_tle_name() on an ORCA-generated
  plan was always questionable. It happened to work, becuase ORCA also
  makes seems to always fill in TargetEntry.resname, so get_tle_name()
  always just picked that, rather than looking up the range table entry.
  This new structuring of the code avoids relying on that assumption.

* Refactored the code to create the fake CTE RTE to a separate function.
  replace_shareinput_targetlists_walker() had grown quite complex.

* Use the producers array in setrefs.c

* Get rid of separate sharedNodes list.

Now that we have an array of producers, conveniently indexed by share_id,
just use that. Mostly for sake of readability, although you might see
a performance gain in corner-cases involving a huge number of share input
scans.
上级 0718dca1
......@@ -1933,78 +1933,189 @@ assign_plannode_id_walker(Node *node, assign_plannode_id_walker_context *ctxt)
list_delete_first(ctxt->planNodes);
}
/*
* DAG to Tree
* Zap children if I am not the first sharer (not recursive down).
* Assign share_id to both ShareInputScan and Material/Sort.
/*
* Create a fake CTE range table entry that reflects the target list of a
* shared input.
*/
static bool shareinput_mutator_dag_to_tree(Node *node, PlannerGlobal *glob, bool fPop)
static RangeTblEntry *
create_shareinput_producer_rte(ApplyShareInputContext *ctxt, int share_id,
int refno)
{
int attno = 1;
ListCell *lc;
Plan *subplan;
char buf[100];
RangeTblEntry *rte;
List *colnames = NIL;
List *coltypes = NIL;
List *coltypmods = NIL;
ShareInputScan *producer;
Assert(ctxt->producer_count > share_id);
producer = ctxt->producers[share_id];
subplan = producer->scan.plan.lefttree;
foreach(lc, subplan->targetlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Oid vartype;
int32 vartypmod;
char *resname;
vartype = exprType((Node *) tle->expr);
vartypmod = exprTypmod((Node *) tle->expr);
/*
* We should've filled in tle->resname in
* shareinput_save_producer(). Note that it's too late to call
* get_tle_name() here, because this runs after all the varnos in Vars
* have already been changed to INNER/OUTER.
*/
resname = tle->resname;
if (!resname)
resname = pstrdup("unnamed_attr");
colnames = lappend(colnames, makeString(resname));
coltypes = lappend_oid(coltypes, vartype);
coltypmods = lappend_int(coltypmods, vartypmod);
attno++;
}
/*
* Create a new RTE. Note that we use a different RTE for each reference,
* because we want to give each reference a different name.
*/
snprintf(buf, sizeof(buf), "share%d_ref%d", share_id, refno);
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_CTE;
rte->ctename = pstrdup(buf);
rte->ctelevelsup = 0;
rte->self_reference = false;
rte->alias = NULL;
rte->eref = makeAlias(rte->ctename, colnames);
rte->ctecoltypes = coltypes;
rte->ctecoltypmods = coltypmods;
rte->inh = false;
rte->inFromCl = false;
rte->requiredPerms = 0;
rte->checkAsUser = InvalidOid;
return rte;
}
/*
* Memorize the producer of a shared input in an array of producers, one
* producer per share_id.
*/
static void
shareinput_save_producer(ShareInputScan *plan, ApplyShareInputContext *ctxt)
{
int share_id = plan->share_id;
int new_producer_count = (share_id + 1);
Assert(plan->share_id >= 0);
if (ctxt->producers == NULL)
{
ctxt->producers = palloc0(sizeof(ShareInputScan *) * new_producer_count);
ctxt->producer_count = new_producer_count;
}
else if (ctxt->producer_count < new_producer_count)
{
ctxt->producers = repalloc(ctxt->producers, new_producer_count * sizeof(ShareInputScan *));
memset(&ctxt->producers[ctxt->producer_count], 0, (new_producer_count - ctxt->producer_count) * sizeof(ShareInputScan *));
ctxt->producer_count = new_producer_count;
}
Assert(ctxt->producers[share_id] == NULL);
ctxt->producers[share_id] = plan;
}
/*
* When a plan comes out of the planner, all the ShareInputScan nodes belonging
* to the same "share" have the same child node. apply_shareinput_dag_to_tree()
* turns the DAG into a proper tree. The first occurrence of a ShareInput scan,
* with a particular child tree, becomes the "producer" of the share, and the
* others becomes consumers. The subtree is removed from all the consumer nodes.
*
* Also, a share_id is assigned to each ShareInputScan node, as well as the
* Material/Sort nodes below the producers. The producers and its consumers
* are linked together by the same share_id.
*/
static bool
shareinput_mutator_dag_to_tree(Node *node, PlannerGlobal *glob, bool fPop)
{
ApplyShareInputContext *ctxt = &glob->share;
Plan *plan = (Plan *) node;
Plan *plan = (Plan *) node;
if(fPop)
if (fPop)
return true;
if(IsA(plan, ShareInputScan))
if (IsA(plan, ShareInputScan))
{
ShareInputScan *siscan = (ShareInputScan *) plan;
Plan *subplan = plan->lefttree;
int i;
int attno;
ListCell *lc;
Assert(subplan);
Assert(IsA(subplan, Material) || IsA(subplan, Sort));
Assert(get_plan_share_id(plan) == SHARE_ID_NOT_ASSIGNED);
Assert(plan->righttree == NULL);
Assert(plan->initPlan == NULL);
/* Is there a producer for this sub-tree already? */
for (i = 0; i < ctxt->producer_count; i++)
{
if (ctxt->producers[i] && ctxt->producers[i]->scan.plan.lefttree == subplan)
{
/*
* Yes. This is a consumer. Remove the subtree, and assign
* the same share_id as the producer.
*/
Assert(get_plan_share_id((Plan *)ctxt->producers[i]) == i);
set_plan_share_id((Plan *) plan, ctxt->producers[i]->share_id);
siscan->scan.plan.lefttree = NULL;
return false;
}
}
/*
* Couldn't find a match in existing list of producers, so this
* is a producer. Add this to the list of producers, and assign
* a new share_id.
*/
Assert(get_plan_share_id(subplan) == SHARE_ID_NOT_ASSIGNED);
set_plan_share_id((Plan *) plan, ctxt->producer_count);
set_plan_share_id(subplan, ctxt->producer_count);
shareinput_save_producer(siscan, ctxt);
/*
* Before we zap the child, fill in information about the child's
* targetlist, so that we can still EXPLAIN this correctly.
* Also make sure that all the entries in the subplan's target list
* have human-readable column names. They are used for EXPLAIN.
*/
attno = 1;
foreach(lc, subplan->targetlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
char buf[100];
Oid vartype;
int32 vartypmod;
snprintf(buf, sizeof(buf), "col_%d", attno);
if (tle->resname == NULL)
{
char default_name[100];
char *resname;
vartype = exprType((Node *) tle->expr);
vartypmod = exprTypmod((Node *) tle->expr);
snprintf(default_name, sizeof(default_name), "col_%d", attno);
siscan->colnames = lappend(siscan->colnames, get_tle_name(tle, ctxt->curr_rtable, buf));
siscan->coltypes = lappend_oid(siscan->coltypes, vartype);
siscan->coltypmods = lappend_int(siscan->coltypmods, vartypmod);
resname = strVal(get_tle_name(tle, ctxt->curr_rtable, default_name));
tle->resname = pstrdup(resname);
}
attno++;
}
if(list_member_ptr(ctxt->sharedNodes, subplan))
{
Assert(get_plan_share_id(subplan) >= 0);
set_plan_share_id(plan, get_plan_share_id(subplan));
plan->lefttree = NULL;
return false;
}
else
{
Assert(get_plan_share_id(subplan) == SHARE_ID_NOT_ASSIGNED);
set_plan_share_id(subplan, list_length(ctxt->sharedNodes));
set_plan_share_id(plan, get_plan_share_id(subplan));
ctxt->sharedNodes = lappend(ctxt->sharedNodes, subplan);
return true;
}
}
return true;
}
/*
* Apply the share input mutator.
*/
Plan *
apply_shareinput_dag_to_tree(PlannerGlobal *glob, Plan *plan, List *rtable)
{
......@@ -2013,6 +2124,45 @@ apply_shareinput_dag_to_tree(PlannerGlobal *glob, Plan *plan, List *rtable)
return plan;
}
/*
* Collect all the producer ShareInput nodes into an array, for later use by
* replace_shareinput_targetlists().
*
* This is a stripped-down version of apply_shareinput_dag_to_tree(), for use
* on ORCA-produced plans. ORCA assigns share_ids to all ShareInputScan nodes,
* and only producer nodes have a subtree, so we don't need to do the DAG to
* tree conversion or assign share_ids here.
*/
static bool
collect_shareinput_producers_walker(Node *node, PlannerGlobal *glob, bool fPop)
{
ApplyShareInputContext *ctxt = &glob->share;
if (fPop)
return true;
if (IsA(node, ShareInputScan))
{
ShareInputScan *siscan = (ShareInputScan *) node;
Plan *subplan = siscan->scan.plan.lefttree;
Assert(get_plan_share_id((Plan *) siscan) >= 0);
if (subplan)
{
shareinput_save_producer(siscan, ctxt);
}
}
return true;
}
void
collect_shareinput_producers(PlannerGlobal *glob, Plan *plan, List *rtable)
{
glob->share.curr_rtable = rtable;
shareinput_walker(collect_shareinput_producers_walker, (Node *) plan, glob);
}
/* Some helper: implements a stack using List. */
static void shareinput_pushmot(ApplyShareInputContext *ctxt, int motid)
{
......@@ -2045,7 +2195,7 @@ static int shareinput_peekmot(ApplyShareInputContext *ctxt)
* Vars that point to the CTE instead of the child plan.
*/
Plan *
replace_shareinput_targetlists(PlannerGlobal *glob, Plan *plan)
replace_shareinput_targetlists(PlannerGlobal *glob, Plan *plan, List *rtable)
{
shareinput_walker(replace_shareinput_targetlists_walker, (Node *) plan, glob);
return plan;
......@@ -2067,7 +2217,6 @@ replace_shareinput_targetlists_walker(Node *node, PlannerGlobal *glob, bool fPop
int attno;
List *newtargetlist;
RangeTblEntry *rte;
char buf[100];
/*
* Note that even though the planner assigns sequential share_ids for each
......@@ -2098,27 +2247,8 @@ replace_shareinput_targetlists_walker(Node *node, PlannerGlobal *glob, bool fPop
* Create a new RTE. Note that we use a different RTE for each reference,
* because we want to give each reference a different name.
*/
snprintf(buf, sizeof(buf), "share%d_ref%d",
sisc->share_id,
ctxt->share_refcounts[share_id]);
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_CTE;
rte->ctename = pstrdup(buf);
rte->ctelevelsup = 0;
rte->self_reference = false;
rte->alias = NULL;
rte->eref = makeAlias(rte->ctename, sisc->colnames);
rte->ctecoltypes = sisc->coltypes;
rte->ctecoltypmods = sisc->coltypmods;
rte->inh = false;
rte->inFromCl = false;
rte->requiredPerms = 0;
rte->checkAsUser = InvalidOid;
rte = create_shareinput_producer_rte(ctxt, share_id,
ctxt->share_refcounts[share_id]);
glob->finalrtable = lappend(glob->finalrtable, rte);
sisc->scan.scanrelid = list_length(glob->finalrtable);
......@@ -2156,39 +2286,39 @@ typedef struct ShareNodeWithSliceMark
int slice_mark;
} ShareNodeWithSliceMark;
static bool shareinput_find_sharenode(ApplyShareInputContext *ctxt, int share_id, ShareNodeWithSliceMark *result)
static bool
shareinput_find_sharenode(ApplyShareInputContext *ctxt, int share_id, ShareNodeWithSliceMark *result)
{
ListCell *lc;
ListCell *lc_slicemark;
if(ctxt->sharedNodes == NULL)
ShareInputScan *siscan;
Plan *plan;
Assert(share_id < ctxt->producer_count);
if (share_id >= ctxt->producer_count)
return false;
forboth(lc, ctxt->sharedNodes, lc_slicemark, ctxt->sliceMarks)
{
Plan *plan = (Plan *) lfirst(lc);
siscan = ctxt->producers[share_id];
if (!siscan)
return false;
Assert(IsA(plan, Material) || IsA(plan, Sort));
if(get_plan_share_id(plan) == share_id)
{
if(result)
{
result->plan = plan;
result->slice_mark = lfirst_int(lc_slicemark);
}
return true;
}
plan = siscan->scan.plan.lefttree;
Assert(get_plan_share_id(plan) == share_id);
Assert(IsA(plan, Material) || IsA(plan, Sort));
if (result)
{
result->plan = plan;
result->slice_mark = ctxt->sliceMarks[share_id];
}
return false;
return true;
}
/*
* First walk on shareinput xslice. It does the following:
*
* 1. Build the sharedNodes in context.
* 2. Build the sliceMarks in context.
* 3. Build a list a share on QD
* 1. Build the sliceMarks in context.
* 2. Build a list a share on QD
*/
static bool
shareinput_mutator_xslice_1(Node* node, PlannerGlobal *glob, bool fPop)
......@@ -2225,12 +2355,17 @@ shareinput_mutator_xslice_1(Node* node, PlannerGlobal *glob, bool fPop)
if (shared)
{
Assert(shareinput_find_sharenode(ctxt, sisc->share_id, NULL) == false);
Assert(get_plan_share_id(plan) == get_plan_share_id(shared));
set_plan_driver_slice(shared, motId);
ctxt->sharedNodes = lappend(ctxt->sharedNodes, shared);
ctxt->sliceMarks = lappend_int(ctxt->sliceMarks, motId);
/*
* We need to repopulate the producers array. cdbparallelize() was
* run on the plan tree between shareinput_mutator_dag_to_tree() and
* here, which copies all the nodes, and the destroys the producers
* array in the process.
*/
ctxt->producers[sisc->share_id] = sisc;
ctxt->sliceMarks[sisc->share_id] = motId;
}
}
......@@ -2404,13 +2539,13 @@ apply_shareinput_xslice(Plan *plan, PlannerGlobal *glob)
ApplyShareInputContext *ctxt = &glob->share;
ListCell *lp;
ctxt->sharedNodes = NULL;
ctxt->sliceMarks = NULL;
ctxt->motStack = NULL;
ctxt->qdShares = NULL;
ctxt->qdSlices = NULL;
ctxt->nextPlanId = 0;
ctxt->sliceMarks = palloc0(ctxt->producer_count * sizeof(int));
shareinput_pushmot(ctxt, 0);
/*
......
......@@ -818,9 +818,6 @@ _copyShareInputScan(ShareInputScan *from)
COPY_SCALAR_FIELD(share_type);
COPY_SCALAR_FIELD(share_id);
COPY_SCALAR_FIELD(driver_slice);
COPY_NODE_FIELD(colnames);
COPY_NODE_FIELD(coltypes);
COPY_NODE_FIELD(coltypmods);
return newnode;
}
......
......@@ -851,9 +851,6 @@ _outShareInputScan(StringInfo str, ShareInputScan *node)
WRITE_ENUM_FIELD(share_type, ShareType);
WRITE_INT_FIELD(share_id);
WRITE_INT_FIELD(driver_slice);
WRITE_NODE_FIELD(colnames);
WRITE_NODE_FIELD(coltypes);
WRITE_NODE_FIELD(coltypmods);
_outPlanInfo(str, (Plan *) node);
}
......@@ -1967,8 +1964,6 @@ _outPlannerGlobal(StringInfo str, PlannerGlobal *node)
WRITE_NODE_FIELD(relationOids);
WRITE_NODE_FIELD(invalItems);
WRITE_BOOL_FIELD(transientPlan);
WRITE_NODE_FIELD(share.sharedNodes);
WRITE_NODE_FIELD(share.sliceMarks);
WRITE_NODE_FIELD(share.motStack);
WRITE_NODE_FIELD(share.qdShares);
WRITE_NODE_FIELD(share.qdSlices);
......
......@@ -2067,9 +2067,6 @@ _readShareInputScan(void)
READ_ENUM_FIELD(share_type, ShareType);
READ_INT_FIELD(share_id);
READ_INT_FIELD(driver_slice);
READ_NODE_FIELD(colnames);
READ_NODE_FIELD(coltypes);
READ_NODE_FIELD(coltypmods);
readPlanInfo((Plan *)local_node);
......
......@@ -193,8 +193,9 @@ optimize_query(Query *parse, ParamListInfo boundParams)
glob->subrtables = NIL;
glob->rewindPlanIDs = NULL;
glob->transientPlan = false;
glob->share.sharedNodes = NIL;
glob->share.sliceMarks = NIL;
glob->share.producers = NULL;
glob->share.producer_count = 0;
glob->share.sliceMarks = NULL;
glob->share.motStack = NIL;
glob->share.qdShares = NIL;
glob->share.qdSlices = NIL;
......@@ -242,6 +243,20 @@ optimize_query(Query *parse, ParamListInfo boundParams)
glob->finalrtable = result->rtable;
glob->subplans = result->subplans;
/*
* For optimizer, we already have share_id and the plan tree is already a tree.
* However, the apply_shareinput_dag_to_tree walker does more than DAG conversion.
* It will also populate column names for RTE_CTE entries that will be later used
* for readable column names in EXPLAIN, if needed.
*/
foreach(lp, glob->subplans)
{
Plan *subplan = (Plan *) lfirst(lp);
collect_shareinput_producers(glob, subplan, result->rtable);
}
collect_shareinput_producers(glob, result->planTree, result->rtable);
/* Post-process ShareInputScan nodes */
(void) apply_shareinput_xslice(result->planTree, glob);
......@@ -252,9 +267,9 @@ optimize_query(Query *parse, ParamListInfo boundParams)
foreach(lp, glob->subplans)
{
Plan *subplan = (Plan *) lfirst(lp);
lfirst(lp) = replace_shareinput_targetlists(glob, subplan);
lfirst(lp) = replace_shareinput_targetlists(glob, subplan, result->rtable);
}
result->planTree = replace_shareinput_targetlists(glob, result->planTree);
result->planTree = replace_shareinput_targetlists(glob, result->planTree, result->rtable);
/*
* To save on memory, and on the network bandwidth when the plan is
......@@ -425,8 +440,9 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
glob->invalItems = NIL;
glob->transientPlan = false;
/* ApplyShareInputContext initialization. */
glob->share.sharedNodes = NIL;
glob->share.sliceMarks = NIL;
glob->share.producers = NULL;
glob->share.producer_count = 0;
glob->share.sliceMarks = NULL;
glob->share.motStack = NIL;
glob->share.qdShares = NIL;
glob->share.qdSlices = NIL;
......@@ -521,6 +537,14 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
top_plan = cdbparallelize(root, top_plan, parse,
cursorOptions,
boundParams);
/*
* cdbparallelize() mutates all the nodes, so the producer nodes
* we memorized earlier are no longer valid. apply_shareinput_xslice()
* will re-populate it, but clear it for now, just to make sure
* that we don't access the obsolete copies of the nodes.
*/
if (glob->share.producer_count > 0)
memset(glob->share.producers, 0, glob->share.producer_count * sizeof(ShareInputScan *));
/* cdbparallelize may create additional slices that may affect share input.
* need to mark material nodes that are split acrossed multi slices.
......@@ -545,9 +569,9 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
foreach(lp, glob->subplans)
{
Plan *subplan = (Plan *) lfirst(lp);
lfirst(lp) = replace_shareinput_targetlists(glob, subplan);
lfirst(lp) = replace_shareinput_targetlists(glob, subplan, glob->finalrtable);
}
top_plan = replace_shareinput_targetlists(glob, top_plan);
top_plan = replace_shareinput_targetlists(glob, top_plan, glob->finalrtable);
/*
* To save on memory, and on the network bandwidth when the plan is dispatched
......
......@@ -726,10 +726,12 @@ set_plan_refs(PlannerGlobal *glob, Plan *plan, int rtoffset)
if (childPlan == NULL)
{
Assert(sisc->share_type != SHARE_NOTSHARED
&& sisc->share_id >= 0
&& glob->share.sharedNodes);
childPlan = list_nth(glob->share.sharedNodes, sisc->share_id);
ShareInputScan *producer;
Assert(sisc->share_type != SHARE_NOTSHARED);
Assert(sisc->share_id >= 0 && sisc->share_id < glob->share.producer_count);
producer = glob->share.producers[sisc->share_id];
childPlan = producer->scan.plan.lefttree;
}
#ifdef DEBUG
......
......@@ -39,7 +39,8 @@ extern void add_slice_to_motion(Motion *m,
extern Plan *zap_trivial_result(PlannerInfo *root, Plan *plan);
extern Plan *apply_shareinput_dag_to_tree(PlannerGlobal *glob, Plan *plan, List *rtable);
extern Plan *replace_shareinput_targetlists(PlannerGlobal *glob, Plan *plan);
extern void collect_shareinput_producers(PlannerGlobal *glob, Plan *plan, List *rtable);
extern Plan *replace_shareinput_targetlists(PlannerGlobal *glob, Plan *plan, List *rtable);
extern Plan *apply_shareinput_xslice(Plan *plan, PlannerGlobal *glob);
extern void assign_plannode_id(PlannedStmt *stmt);
......
......@@ -815,9 +815,6 @@ typedef struct ShareInputScan
ShareType share_type;
int share_id;
int driver_slice; /* slice id that will execute the underlying material/sort */
List *colnames; /* output column names (string Value nodes) */
List *coltypes; /* OID list of column type OIDs */
List *coltypmods; /* integer list of column typmods */
} ShareInputScan;
/* ----------------
......
......@@ -63,15 +63,18 @@ typedef struct QualCost
*/
typedef struct ApplyShareInputContext
{
List *sharedNodes;
List *curr_rtable;
int *share_refcounts;
int share_refcounts_sz; /* allocated sized of 'share_refcounts' */
List *sliceMarks;
List *motStack;
List *qdShares;
List *qdSlices;
int nextPlanId;
ShareInputScan **producers;
int *sliceMarks; /* one for each producer */
int producer_count;
} ApplyShareInputContext;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册