diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 390abe6f1f358330e0aa94f39e695b816cd958c5..090ef8010aa6cb70783679e12246190907e85d72 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -57,6 +57,7 @@ #include "parser/parser.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" +#include "rewrite/rewriteManip.h" #include "storage/smgr.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -170,7 +171,6 @@ static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); static void add_nonduplicate_constraint(Constraint *cdef, ConstrCheck *check, int *ncheck); -static bool change_varattnos_walker(Node *node, const AttrNumber *newattno); static void StoreCatalogInheritance(Oid relationId, List *supers); static void StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation inhRelation); @@ -868,7 +868,7 @@ MergeAttributes(List *schema, List *supers, bool istemp, * parents after the first one, nor if we have dropped columns.) */ newattno = (AttrNumber *) - palloc(tupleDesc->natts * sizeof(AttrNumber)); + palloc0(tupleDesc->natts * sizeof(AttrNumber)); for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) @@ -882,15 +882,7 @@ MergeAttributes(List *schema, List *supers, bool istemp, * Ignore dropped columns in the parent. */ if (attribute->attisdropped) - { - /* - * change_varattnos_of_a_node asserts that this is greater - * than zero, so if anything tries to use it, we should find - * out. - */ - newattno[parent_attno - 1] = 0; - continue; - } + continue; /* leave newattno entry as zero */ /* * Does it conflict with some previously inherited column? @@ -1000,13 +992,31 @@ MergeAttributes(List *schema, List *supers, bool istemp, { Constraint *cdef = makeNode(Constraint); Node *expr; + bool found_whole_row; + + /* Adjust Vars to match new table's column numbering */ + expr = map_variable_attnos(stringToNode(check[i].ccbin), + 1, 0, + newattno, tupleDesc->natts, + &found_whole_row); + + /* + * For the moment we have to reject whole-row variables. + * We could convert them, if we knew the new table's rowtype + * OID, but that hasn't been assigned yet. + */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".", + check[i].ccname, + RelationGetRelationName(relation)))); cdef->contype = CONSTR_CHECK; cdef->name = pstrdup(check[i].ccname); cdef->raw_expr = NULL; - /* adjust varattnos of ccbin here */ - expr = stringToNode(check[i].ccbin); - change_varattnos_of_a_node(expr, newattno); + cdef->cooked_expr = nodeToString(expr); constraints = lappend(constraints, cdef); } @@ -1160,101 +1170,6 @@ add_nonduplicate_constraint(Constraint *cdef, ConstrCheck *check, int *ncheck) } -/* - * Replace varattno values in an expression tree according to the given - * map array, that is, varattno N is replaced by newattno[N-1]. It is - * caller's responsibility to ensure that the array is long enough to - * define values for all user varattnos present in the tree. System column - * attnos remain unchanged. - * - * Note that the passed node tree is modified in-place! - */ -void -change_varattnos_of_a_node(Node *node, const AttrNumber *newattno) -{ - /* no setup needed, so away we go */ - (void) change_varattnos_walker(node, newattno); -} - -static bool -change_varattnos_walker(Node *node, const AttrNumber *newattno) -{ - if (node == NULL) - return false; - if (IsA(node, Var)) - { - Var *var = (Var *) node; - - if (var->varlevelsup == 0 && var->varno == 1 && - var->varattno > 0) - { - /* - * ??? the following may be a problem when the node is multiply - * referenced though stringToNode() doesn't create such a node - * currently. - */ - Assert(newattno[var->varattno - 1] > 0); - var->varattno = var->varoattno = newattno[var->varattno - 1]; - } - return false; - } - return expression_tree_walker(node, change_varattnos_walker, - (void *) newattno); -} - -/* - * Generate a map for change_varattnos_of_a_node from old and new TupleDesc's, - * matching according to column name. - */ -AttrNumber * -varattnos_map(TupleDesc old, TupleDesc new) -{ - AttrNumber *attmap; - int i, - j; - - attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts); - for (i = 1; i <= old->natts; i++) - { - if (old->attrs[i - 1]->attisdropped) - continue; /* leave the entry as zero */ - - for (j = 1; j <= new->natts; j++) - { - if (strcmp(NameStr(old->attrs[i - 1]->attname), - NameStr(new->attrs[j - 1]->attname)) == 0) - { - attmap[i - 1] = j; - break; - } - } - } - return attmap; -} - -/* - * Generate a map for change_varattnos_of_a_node from a TupleDesc and a list - * of ColumnDefs - */ -AttrNumber * -varattnos_map_schema(TupleDesc old, List *schema) -{ - AttrNumber *attmap; - int i; - - attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts); - for (i = 1; i <= old->natts; i++) - { - if (old->attrs[i - 1]->attisdropped) - continue; /* leave the entry as zero */ - - attmap[i - 1] = findAttrByName(NameStr(old->attrs[i - 1]->attname), - schema); - } - return attmap; -} - - /* * StoreCatalogInheritance * Updates the system catalogs with proper inheritance information. diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index a9079ff2a9edfe3766a1b8613bd6111bf0dad477..3c9462fcf86ebbbbcdf00fdb8a66e5e65c1a7363 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -101,7 +101,8 @@ static void transformTableConstraint(ParseState *pstate, static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, InhRelation *inhrelation); static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, - Relation parent_index, AttrNumber *attmap); + Relation source_idx, + const AttrNumber *attmap, int attmap_length); static List *get_opclass(Oid opclass, Oid actual_datatype); static void transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt); @@ -525,6 +526,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, Relation relation; TupleDesc tupleDesc; TupleConstr *constr; + AttrNumber *attmap; AclResult aclresult; bool including_defaults = false; bool including_constraints = false; @@ -581,6 +583,13 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } } + /* + * Initialize column number map for map_variable_attnos(). We need this + * since dropped columns in the source table aren't copied, so the new + * table can have different column numbers. + */ + attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * tupleDesc->natts); + /* * Insert the copied attributes into the cxt for the new table definition. */ @@ -592,7 +601,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, ColumnDef *def; /* - * Ignore dropped columns in the parent. + * Ignore dropped columns in the parent. attmap entry is left zero. */ if (attribute->attisdropped) continue; @@ -619,6 +628,8 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, */ cxt->columns = lappend(cxt->columns, def); + attmap[parent_attno - 1] = list_length(cxt->columns); + /* * Copy default, if present and the default has been requested */ @@ -652,21 +663,38 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, /* * Copy CHECK constraints if requested, being careful to adjust attribute - * numbers + * numbers so they match the child. */ if (including_constraints && tupleDesc->constr) { - AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); int ccnum; for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++) { char *ccname = tupleDesc->constr->check[ccnum].ccname; char *ccbin = tupleDesc->constr->check[ccnum].ccbin; - Node *ccbin_node = stringToNode(ccbin); Constraint *n = makeNode(Constraint); + Node *ccbin_node; + bool found_whole_row; - change_varattnos_of_a_node(ccbin_node, attmap); + ccbin_node = map_variable_attnos(stringToNode(ccbin), + 1, 0, + attmap, tupleDesc->natts, + &found_whole_row); + + /* + * We reject whole-row variables because the whole point of LIKE + * is that the new table's rowtype might later diverge from the + * parent's. So, while translation might be possible right now, + * it wouldn't be possible to guarantee it would work in future. + */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".", + ccname, + RelationGetRelationName(relation)))); n->contype = CONSTR_CHECK; n->name = pstrdup(ccname); @@ -682,7 +710,6 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, */ if (including_indexes && relation->rd_rel->relhasindex) { - AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); List *parent_indexes; ListCell *l; @@ -697,7 +724,8 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, parent_index = index_open(parent_index_oid, AccessShareLock); /* Build CREATE INDEX statement to recreate the parent_index */ - index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap); + index_stmt = generateClonedIndexStmt(cxt, parent_index, + attmap, tupleDesc->natts); /* Save it in the inh_indexes list for the time being */ cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt); @@ -720,7 +748,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, */ static IndexStmt * generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, - AttrNumber *attmap) + const AttrNumber *attmap, int attmap_length) { Oid source_relid = RelationGetRelid(source_idx); HeapTuple ht_idxrel; @@ -833,14 +861,26 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, { /* Expressional index */ Node *indexkey; + bool found_whole_row; if (indexpr_item == NULL) elog(ERROR, "too few entries in indexprs list"); indexkey = (Node *) lfirst(indexpr_item); indexpr_item = lnext(indexpr_item); - /* OK to modify indexkey since we are working on a private copy */ - change_varattnos_of_a_node(indexkey, attmap); + /* Adjust Vars to match new table's column numbering */ + indexkey = map_variable_attnos(indexkey, + 1, 0, + attmap, attmap_length, + &found_whole_row); + + /* As in transformTableLikeClause, reject whole-row variables */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Index \"%s\" contains a whole-row table reference.", + RelationGetRelationName(source_idx)))); iparam->name = NULL; iparam->expr = indexkey; @@ -891,12 +931,28 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, if (!isnull) { char *pred_str; + Node *pred_tree; + bool found_whole_row; /* Convert text string to node tree */ pred_str = DatumGetCString(DirectFunctionCall1(textout, datum)); - index->whereClause = (Node *) stringToNode(pred_str); - /* Adjust attribute numbers */ - change_varattnos_of_a_node(index->whereClause, attmap); + pred_tree = (Node *) stringToNode(pred_str); + + /* Adjust Vars to match new table's column numbering */ + pred_tree = map_variable_attnos(pred_tree, + 1, 0, + attmap, attmap_length, + &found_whole_row); + + /* As in transformTableLikeClause, reject whole-row variables */ + if (found_whole_row) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert whole-row table reference"), + errdetail("Index \"%s\" contains a whole-row table reference.", + RelationGetRelationName(source_idx)))); + + index->whereClause = pred_tree; } /* Clean up */ diff --git a/src/backend/rewrite/rewriteManip.c b/src/backend/rewrite/rewriteManip.c index 27ccb9b9aaa0d13ba3350d554dab444b6fcaa49c..3cef28b6479d089ae9edc71f7d9d1d4ba482d75e 100644 --- a/src/backend/rewrite/rewriteManip.c +++ b/src/backend/rewrite/rewriteManip.c @@ -863,6 +863,119 @@ AddInvertedQual(Query *parsetree, Node *qual) } +/* + * map_variable_attnos() finds all user-column Vars in an expression tree + * that reference a particular RTE, and adjusts their varattnos according + * to the given mapping array (varattno n is replaced by attno_map[n-1]). + * Vars for system columns are not modified. + * + * A zero in the mapping array represents a dropped column, which should not + * appear in the expression. + * + * If the expression tree contains a whole-row Var for the target RTE, + * the Var is not changed but *found_whole_row is returned as TRUE. + * For most callers this is an error condition, but we leave it to the caller + * to report the error so that useful context can be provided. (In some + * usages it would be appropriate to modify the Var's vartype and insert a + * ConvertRowtypeExpr node to map back to the original vartype. We might + * someday extend this function's API to support that. For now, the only + * concession to that future need is that this function is a tree mutator + * not just a walker.) + * + * This could be built using replace_rte_variables and a callback function, + * but since we don't ever need to insert sublinks, replace_rte_variables is + * overly complicated. + */ + +typedef struct +{ + int target_varno; /* RTE index to search for */ + int sublevels_up; /* (current) nesting depth */ + const AttrNumber *attno_map; /* map array for user attnos */ + int map_length; /* number of entries in attno_map[] */ + bool *found_whole_row; /* output flag */ +} map_variable_attnos_context; + +static Node * +map_variable_attnos_mutator(Node *node, + map_variable_attnos_context *context) +{ + if (node == NULL) + return NULL; + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + if (var->varno == context->target_varno && + var->varlevelsup == context->sublevels_up) + { + /* Found a matching variable, make the substitution */ + Var *newvar = (Var *) palloc(sizeof(Var)); + int attno = var->varattno; + + *newvar = *var; + if (attno > 0) + { + /* user-defined column, replace attno */ + if (attno > context->map_length || + context->attno_map[attno - 1] == 0) + elog(ERROR, "unexpected varattno %d in expression to be mapped", + attno); + newvar->varattno = newvar->varoattno = context->attno_map[attno - 1]; + } + else if (attno == 0) + { + /* whole-row variable, warn caller */ + *(context->found_whole_row) = true; + } + return (Node *) newvar; + } + /* otherwise fall through to copy the var normally */ + } + else if (IsA(node, Query)) + { + /* Recurse into RTE subquery or not-yet-planned sublink subquery */ + Query *newnode; + + context->sublevels_up++; + newnode = query_tree_mutator((Query *) node, + map_variable_attnos_mutator, + (void *) context, + 0); + context->sublevels_up--; + return (Node *) newnode; + } + return expression_tree_mutator(node, map_variable_attnos_mutator, + (void *) context); +} + +Node * +map_variable_attnos(Node *node, + int target_varno, int sublevels_up, + const AttrNumber *attno_map, int map_length, + bool *found_whole_row) +{ + map_variable_attnos_context context; + + context.target_varno = target_varno; + context.sublevels_up = sublevels_up; + context.attno_map = attno_map; + context.map_length = map_length; + context.found_whole_row = found_whole_row; + + *found_whole_row = false; + + /* + * Must be prepared to start with a Query or a bare expression tree; if + * it's a Query, we don't want to increment sublevels_up. + */ + return query_or_expression_tree_mutator(node, + map_variable_attnos_mutator, + (void *) &context, + 0); +} + + /* * ResolveNew - replace Vars with corresponding items from a targetlist * diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index e9c965439cee5511468e6314839897e4ffa2f406..66380749c60ea7b6060fddbc5b26b3f5cccac8bd 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -52,10 +52,6 @@ extern void find_composite_type_dependencies(Oid typeOid, const char *origTblName, const char *origTypeName); -extern AttrNumber *varattnos_map(TupleDesc old, TupleDesc new); -extern AttrNumber *varattnos_map_schema(TupleDesc old, List *schema); -extern void change_varattnos_of_a_node(Node *node, const AttrNumber *newattno); - extern void register_on_commit_action(Oid relid, OnCommitAction action); extern void remove_on_commit_action(Oid relid); diff --git a/src/include/rewrite/rewriteManip.h b/src/include/rewrite/rewriteManip.h index 04fa98975694a0462748496cb200a1d1d08dd1b4..5f226552db4de5fab432dc3312bed274a88c5127 100644 --- a/src/include/rewrite/rewriteManip.h +++ b/src/include/rewrite/rewriteManip.h @@ -38,6 +38,11 @@ extern void AddInvertedQual(Query *parsetree, Node *qual); extern bool checkExprHasAggs(Node *node); extern bool checkExprHasSubLink(Node *node); +extern Node *map_variable_attnos(Node *node, + int target_varno, int sublevels_up, + const AttrNumber *attno_map, int map_length, + bool *found_whole_row); + extern Node *ResolveNew(Node *node, int target_varno, int sublevels_up, RangeTblEntry *target_rte, List *targetlist, int event, int update_varno);