提交 9a144323 编写于 作者: K Kenan Yao

Wrong data inserted when COPY files to an altered partition table.

上级 d9496670
......@@ -3670,11 +3670,16 @@ CopyFrom(CopyState cstate)
int attnum;
int i;
Oid in_func_oid;
Datum *values;
bool *nulls;
Datum *values = NULL;
bool *nulls = NULL;
Datum *partValues = NULL;
bool *partNulls = NULL;
Datum *baseValues = NULL;
bool *baseNulls = NULL;
bool isnull;
ResultRelInfo *resultRelInfo;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
TupleTableSlot *baseSlot;
TupleTableSlot *slot;
bool file_has_oids;
int *defmap;
......@@ -3739,6 +3744,8 @@ CopyFrom(CopyState cstate)
use_wal = false;
}
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of
......@@ -3764,7 +3771,7 @@ CopyFrom(CopyState cstate)
CopyInitPartitioningState(estate);
/* Set up a tuple slot too */
slot = MakeSingleTupleTableSlot(tupDesc);
baseSlot = MakeSingleTupleTableSlot(tupDesc);
econtext = GetPerTupleExprContext(estate);
......@@ -3823,10 +3830,13 @@ CopyFrom(CopyState cstate)
file_has_oids = cstate->oids; /* must rely on user to tell us this... */
values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
baseValues = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
baseNulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
attr_offsets = (int *) palloc(num_phys_attrs * sizeof(int));
partValues = (Datum *) palloc(attr_count * sizeof(Datum));
partNulls = (bool *) palloc(attr_count * sizeof(bool));
/* Set up callback to identify error line number */
errcontext.callback = copy_in_error_callback;
errcontext.arg = (void *) cstate;
......@@ -3886,8 +3896,8 @@ CopyFrom(CopyState cstate)
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
MemSet(baseValues, 0, num_phys_attrs * sizeof(Datum));
MemSet(baseNulls, true, num_phys_attrs * sizeof(bool));
/* reset attribute pointers */
MemSet(attr_offsets, 0, num_phys_attrs * sizeof(int));
......@@ -4002,9 +4012,9 @@ CopyFrom(CopyState cstate)
PG_TRY();
{
if (cstate->csv_mode)
CopyReadAttributesCSV(cstate, nulls, attr_offsets, num_phys_attrs, attr);
CopyReadAttributesCSV(cstate, baseNulls, attr_offsets, num_phys_attrs, attr);
else
CopyReadAttributesText(cstate, nulls, attr_offsets, num_phys_attrs, attr);
CopyReadAttributesText(cstate, baseNulls, attr_offsets, num_phys_attrs, attr);
/*
* Loop to read the user attributes on the line.
......@@ -4017,7 +4027,7 @@ CopyFrom(CopyState cstate)
string = cstate->attribute_buf.data + attr_offsets[m];
if (nulls[m])
if (baseNulls[m])
isnull = true;
else
isnull = false;
......@@ -4030,11 +4040,11 @@ CopyFrom(CopyState cstate)
cstate->cur_attname = NameStr(attr[m]->attname);
values[m] = InputFunctionCall(&in_functions[m],
baseValues[m] = InputFunctionCall(&in_functions[m],
isnull ? NULL : string,
typioparams[m],
attr[m]->atttypmod);
nulls[m] = isnull;
baseNulls[m] = isnull;
cstate->cur_attname = NULL;
}
......@@ -4045,11 +4055,11 @@ CopyFrom(CopyState cstate)
*/
for (i = 0; i < num_defaults; i++)
{
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
baseValues[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
&isnull, NULL);
if (!isnull)
nulls[defmap[i]] = false;
baseNulls[defmap[i]] = false;
}
}
PG_CATCH();
......@@ -4070,10 +4080,10 @@ CopyFrom(CopyState cstate)
*/
PG_TRY();
{
MemoryContextSwitchTo(oldcontext);
MemoryContextSwitchTo(estate->es_query_cxt);
if (estate->es_result_partitions)
{
resultRelInfo = values_get_partition(values, nulls,
resultRelInfo = values_get_partition(baseValues, baseNulls,
tupDesc, estate);
estate->es_result_relation_info = resultRelInfo;
}
......@@ -4121,6 +4131,27 @@ CopyFrom(CopyState cstate)
/*
* And now we can form the input tuple.
*/
if (resultRelInfo->ri_partSlot != NULL)
{
AttrMap *map = resultRelInfo->ri_partInsertMap;
Assert(map != NULL);
MemSet(partValues, 0, attr_count * sizeof(Datum));
MemSet(partNulls, true, attr_count * sizeof(bool));
reconstructTupleValues(map, baseValues, baseNulls, (int) num_phys_attrs,
partValues, partNulls, (int) attr_count);
values = partValues;
nulls = partNulls;
}
else
{
values = baseValues;
nulls = baseNulls;
}
if (relstorage == RELSTORAGE_AOROWS)
{
/* form a mem tuple */
......@@ -4139,7 +4170,7 @@ CopyFrom(CopyState cstate)
else
{
/* form a regular heap tuple */
tuple = (HeapTuple) heap_form_tuple(tupDesc, values, nulls);
tuple = (HeapTuple) heap_form_tuple(resultRelInfo->ri_RelationDesc->rd_att, values, nulls);
if (cstate->oids && file_has_oids)
HeapTupleSetOid((HeapTuple)tuple, loaded_oid);
......@@ -4149,7 +4180,7 @@ CopyFrom(CopyState cstate)
/*
* Triggers and stuff need to be invoked in query context.
*/
MemoryContextSwitchTo(oldcontext);
MemoryContextSwitchTo(estate->es_query_cxt);
/* Partitions don't support triggers yet */
Assert(!(estate->es_result_partitions &&
......@@ -4185,6 +4216,16 @@ CopyFrom(CopyState cstate)
{
char relstorage = RelinfoGetStorage(resultRelInfo);
if (resultRelInfo->ri_partSlot != NULL)
{
Assert(resultRelInfo->ri_partInsertMap != NULL);
slot = resultRelInfo->ri_partSlot;
}
else
{
slot = baseSlot;
}
if (relstorage != RELSTORAGE_AOCOLS)
{
/* Place tuple in tuple slot */
......@@ -4273,7 +4314,7 @@ CopyFrom(CopyState cstate)
*/
error_context_stack = errcontext.previous;
MemoryContextSwitchTo(oldcontext);
MemoryContextSwitchTo(estate->es_query_cxt);
/*
* Execute AFTER STATEMENT insertion triggers
......@@ -4295,14 +4336,11 @@ CopyFrom(CopyState cstate)
if (estate->es_result_partitions && Gp_role == GP_ROLE_EXECUTE)
SendAOTupCounts(estate);
pfree(attr_offsets);
pfree(in_functions);
pfree(typioparams);
pfree(defmap);
pfree(defexprs);
/* NB: do not pfree baseValues/baseNulls and partValues/partNulls here, since
* there may be duplicate free in ExecDropSingleTupleTableSlot; if not, they
* would be freed by FreeExecutorState anyhow */
ExecDropSingleTupleTableSlot(slot);
ExecDropSingleTupleTableSlot(baseSlot);
/*
* If we skipped writing WAL, then we need to sync the heap (but not
......@@ -4333,6 +4371,8 @@ CopyFrom(CopyState cstate)
}
cstate->rel = NULL; /* closed above */
MemoryContextSwitchTo(oldcontext);
FreeExecutorState(estate);
}
......
......@@ -7819,7 +7819,7 @@ copy pt_td_leak from '/tmp/pt_td_leak.out' with delimiter ',';
select * from pt_td_leak where col1 = 5;
col1 | col2 | col3
------+------+------
5 | 5 |
5 | 5 | 5
(1 row)
drop table pt_td_leak;
......
......@@ -7790,7 +7790,7 @@ copy pt_td_leak from '/tmp/pt_td_leak.out' with delimiter ',';
select * from pt_td_leak where col1 = 5;
col1 | col2 | col3
------+------+------
5 | 5 |
5 | 5 | 5
(1 row)
drop table pt_td_leak;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册