提交 206ffa6c 编写于 作者: D David Kimura

QE writer should cancel QE readers before aborting

If a QE writer marks the transaction as aborted while a reader is
still executing the query the reader may affect shared memory state.
E.g. consider a transaction being aborted and a table needs to be
dropped as part of abort processing.  If the writer has dropped all
the buffers belonging to the table from shared buffer cache and is
about to unlink the file for the table.  Concurrently, a reader,
unaware of the writer's abort, is still executing the query.  The
reader may bring in a page from the file that the writer is about to
unlink into shared buffer cache.

In order to prevent such situations the writer walks procArray to find
the readers and sends SIGINT to them.

Walking procArray is expensive, and is avoided as much as possible.
The patch walks the procArray only if the command being aborted (or
the last command in a transaction that is being aborted) performed a
write and at least one reader slice was part of the query plan.

To avoid confusion on the QD due to "canceling MPP operation" error
messages emitted by the readers upon receiving the SIGINT, the readers
do not emit them on the libpq channel.

Discussion:
https://groups.google.com/a/greenplum.org/d/msg/gpdb-dev/S2WL1FtJEJ0/Wh6DfJ-RBwAJCo-authored-by: NEkta Khanna <ekhanna@pivotal.io>
Co-authored-by: NAsim R P <apraveen@pivotal.io>
Co-authored-by: NTaylor Vesely <tvesely@pivotal.io>
上级 012476fd
......@@ -183,6 +183,7 @@ typedef struct TransactionStateData
bool startedInRecovery; /* did we start in recovery? */
bool didLogXid; /* has xid been included in WAL record? */
bool executorSaysXactDoesWrites; /* GP executor says xact does writes */
bool xactUsesReaders; /* signal readers to cancel query execution before abort */
struct TransactionStateData *parent; /* back link to parent */
struct TransactionStateData *fastLink; /* back link to jump to parent for efficient search */
......@@ -220,6 +221,7 @@ static TransactionStateData TopTransactionStateData = {
false, /* startedInRecovery */
false, /* didLogXid */
false, /* executorSaysXactDoesWrites */
false, /* xactUsesReaders */
NULL /* link to parent state block */
};
......@@ -3229,6 +3231,7 @@ AbortTransaction(void)
AtAbort_Notify();
AtEOXact_RelationMap(false);
AtAbort_Twophase();
AtAbort_Readers();
/*
* Advertise the fact that we aborted in pg_clog (assuming that we got as
......@@ -5869,6 +5872,30 @@ xactGetCommittedChildren(TransactionId **ptr)
return s->nChildXids;
}
/*
* Mark this transaction as using QE reader. This is used during abort to
* signal readers to cancel query execution before marking this transaction
* aborted.
*/
void
SetCurrentTransactionUsesReaders(void)
{
Assert(Gp_is_writer);
CurrentTransactionState->xactUsesReaders = true;
}
bool
GetCurrentTransactionUsesReaders(void)
{
return CurrentTransactionState->xactUsesReaders;
}
void
ResetCurrentTransactionUsesReaders(void)
{
CurrentTransactionState->xactUsesReaders = false;
}
/*
* XLOG support routines
*/
......
......@@ -4185,7 +4185,6 @@ PostmasterStateMachine(void)
}
}
/*
* Send a signal to a postmaster child process
*
......
......@@ -1548,6 +1548,7 @@ updateSharedLocalSnapshot(DtxContextInfo *dtxContextInfo, Snapshot snapshot, cha
SharedLocalSnapshotSlot->QDxid = dtxContextInfo->distributedXid;
SharedLocalSnapshotSlot->ready = true;
SharedLocalSnapshotSlot->writerSentCancel = false;
SharedLocalSnapshotSlot->segmateSync = dtxContextInfo->segmateSync;
......@@ -4863,6 +4864,40 @@ ListAllGxid(void)
return gxids;
}
void
CancelMyReaders(void)
{
int numProcs;
int i;
int pid_index = 0;
ProcArrayStruct *arrayP = procArray;
pid_t *reader_pids = (pid_t *) palloc(arrayP->maxProcs * sizeof(pid_t));
Assert(Gp_is_writer);
LWLockAcquire(ProcArrayLock, LW_SHARED);
numProcs = arrayP->numProcs;
for (i = 0; i < numProcs; i++)
{
volatile PGPROC *proc = &allProcs[arrayP->pgprocnos[i]];
if (proc->mppSessionId == MyProc->mppSessionId &&
proc->pid != MyProcPid)
{
Assert(!proc->mppIsWriter);
Assert(proc->databaseId == MyDatabaseId);
reader_pids[pid_index++] = proc->pid;
}
}
LWLockRelease(ProcArrayLock);
while (--pid_index >= 0)
{
kill(reader_pids[pid_index], SIGINT); /* ignore error */
SIMPLE_FAULT_INJECTOR(CancelledReaderDuringAbort);
}
}
/*
* This function returns the corresponding process id given by a
* DistributedTransaction Id.
......
......@@ -94,6 +94,7 @@
#include "cdb/cdbgang.h"
#include "cdb/ml_ipc.h"
#include "utils/guc.h"
#include "utils/sharedsnapshot.h"
#include "access/twophase.h"
#include "postmaster/backoff.h"
#include "utils/resource_manager.h"
......@@ -236,6 +237,7 @@ static bool RecoveryConflictRetryable = true;
static ProcSignalReason RecoveryConflictReason;
static DtxContextInfo TempDtxContextInfo = DtxContextInfo_StaticInit;
static bool HandlingWriterCancelRequest = false;
extern void CheckForQDMirroringWork(void);
......@@ -1052,7 +1054,7 @@ exec_mpp_query(const char *query_string,
QueryDispatchDesc *ddesc = NULL;
CmdType commandType = CMD_UNKNOWN;
SliceTable *sliceTable = NULL;
Slice *slice = NULL;
Slice *currentSlice = NULL;
ParamListInfo paramLI = NULL;
Assert(Gp_role == GP_ROLE_EXECUTE);
......@@ -1146,14 +1148,14 @@ exec_mpp_query(const char *query_string,
/* Identify slice to execute */
foreach(lc, sliceTable->slices)
{
slice = (Slice *)lfirst(lc);
if (bms_is_member(qe_identifier, slice->processesMap))
currentSlice = (Slice *)lfirst(lc);
if (bms_is_member(qe_identifier, currentSlice->processesMap))
break;
}
sliceTable->localSlice = slice->sliceIndex;
sliceTable->localSlice = currentSlice->sliceIndex;
Assert(IsA(slice, Slice));
Assert(IsA(currentSlice, Slice));
/* Set global sliceid variable for elog. */
currentSliceId = sliceTable->localSlice;
......@@ -1185,10 +1187,10 @@ exec_mpp_query(const char *query_string,
commandType = plan->commandType;
}
if ( slice )
if ( currentSlice )
{
/* Non root slices don't need update privileges. */
if (sliceTable->localSlice != slice->rootIndex)
if (sliceTable->localSlice != currentSlice->rootIndex)
{
ListCell *rtcell;
RangeTblEntry *rte;
......@@ -1304,6 +1306,21 @@ exec_mpp_query(const char *query_string,
/* Make sure we are in a transaction command */
start_xact_command();
/*
* In a query plan there can be at the most one writer that executes
* the root slice. All other slices are executed by readers. If
* readers exist in our plan, we capture that information in the
* transaction state, to be used later during abort. In case of a
* read-only command, no slice should have gangType as PRIMARY_WRITER.
* We can skip the overhead of walking through proc array during abort
* if the command is read-only.
*/
if (sliceTable && list_length(sliceTable->slices) > 1 && Gp_is_writer &&
currentSlice->gangType == GANGTYPE_PRIMARY_WRITER)
SetCurrentTransactionUsesReaders();
else
ResetCurrentTransactionUsesReaders();
/* If we got a cancel signal in parsing or prior command, quit */
CHECK_FOR_INTERRUPTS();
......@@ -3871,9 +3888,23 @@ ProcessInterrupts(const char* filename, int lineno)
DisableCatchupInterrupt();
if (Gp_role == GP_ROLE_EXECUTE)
{
HandlingWriterCancelRequest =
!Gp_is_writer && SharedLocalSnapshotSlot &&
SharedLocalSnapshotSlot->writerSentCancel;
/*
* Do not send the error message back to QD if the writer
* asked us to cancel the query. Otherwise, if QD receives
* "canceling MPP operation" error before the error reported
* by the writer, the writer's error will not be reported to
* the client.
*/
if (HandlingWriterCancelRequest)
whereToSendOutput = DestNone;
ereport(ERROR,
(errcode(ERRCODE_GP_OPERATION_CANCELED),
errmsg("canceling MPP operation")));
}
else if (HasCancelMessage())
{
char *buffer = palloc0(MAX_CANCEL_MSG);
......@@ -4967,6 +4998,16 @@ PostgresMain(int argc, char *argv[],
/* Report the error to the client and/or server log */
EmitErrorReport();
if (HandlingWriterCancelRequest)
{
/*
* Restore CommandDest so that any error from now on will be
* delivered back to QD.
*/
Assert(!Gp_is_writer);
whereToSendOutput = DestRemote;
HandlingWriterCancelRequest = false;
}
/*
* Make sure debug_query_string gets reset before we possibly clobber
......
......@@ -942,3 +942,26 @@ LogDistributedSnapshotInfo(Snapshot snapshot, const char *prefix)
elog(LOG, "%s", buf.data);
pfree(buf.data);
}
/*
* QE writer must let other QE readers participating in the same transaction
* know that the transaction must be canceled. This reduces the window
* between a transaction being marked as aborted by the QE writer and QE
* readers cancel executing part of the transaction / plan.
*/
void
AtAbort_Readers(void)
{
/*
* Walk the procArray only if the current transaction made any writes and
* there are readers
*/
if (Gp_is_writer && TransactionIdIsValid(GetCurrentTransactionIdIfAny()) &&
GetCurrentTransactionUsesReaders())
{
Assert(SharedLocalSnapshotSlot != NULL);
SharedLocalSnapshotSlot->writerSentCancel = true;
CancelMyReaders();
}
/* TODO: how to ensure that the readers have acted upon the signal? */
}
......@@ -292,6 +292,9 @@ extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
extern void RecordDistributedForgetCommitted(struct TMGXACT_LOG *gxact_log);
extern int xactGetCommittedChildren(TransactionId **ptr);
extern void SetCurrentTransactionUsesReaders(void);
extern bool GetCurrentTransactionUsesReaders(void);
extern void ResetCurrentTransactionUsesReaders(void);
extern void xact_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record);
extern void xact_desc(StringInfo buf, XLogRecord *record);
......
......@@ -96,6 +96,7 @@ extern void GetSlotTableDebugInfo(void **snapshotArray, int *maxSlots);
extern void getDtxCheckPointInfo(char **result, int *result_size);
extern List *ListAllGxid(void);
extern void CancelMyReaders(void);
extern int GetPidByGxid(DistributedTransactionId gxid);
extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
......
......@@ -255,6 +255,8 @@ FI_IDENT(CreateGangInProgress, "create_gang_in_progress")
FI_IDENT(DecreaseToastMaxChunkSize, "decrease_toast_max_chunk_size")
/* inject fault to let cleanupGang return false */
FI_IDENT(CleanupQE, "cleanup_qe")
/* inject fault after a writer has cancelled a reader during abort transaction */
FI_IDENT(CancelledReaderDuringAbort, "cancelled_reader_during_abort")
#endif
/*
......
......@@ -36,6 +36,7 @@ typedef struct SharedSnapshotSlot
ComboCidKeyData combocids[MaxComboCids];
SnapshotData snapshot;
LWLockId slotLock;
bool writerSentCancel;
} SharedSnapshotSlot;
extern volatile SharedSnapshotSlot *SharedLocalSnapshotSlot;
......@@ -53,6 +54,7 @@ extern void dumpSharedLocalSnapshot_forCursor(void);
extern void readSharedLocalSnapshot_forCursor(Snapshot snapshot);
extern void AtEOXact_SharedSnapshot(void);
extern void AtAbort_Readers(void);
#define NUM_SHARED_SNAPSHOT_SLOTS (2 * max_prepared_xacts)
......
-- Tests to validate that in a multi-slice query a QE writer signals QE readers
-- to cancel query execution before marking the transaction as aborted.
-- The tests make use of a "skip" fault to determine if the control reached a
-- specific location of interest. In this case, the location of the fault
-- "cancelled_reader_during_abort" is right after a writer sends SIGINT to
-- corresponding readers.
CREATE EXTENSION IF NOT EXISTS gp_inject_fault;
create table writer_aborts_before_reader_a(i int, j int) distributed by (i);
alter table writer_aborts_before_reader_a add constraint check_j check (j > 0);
insert into writer_aborts_before_reader_a select 4,i from generate_series(1,12) i;
create table writer_aborts_before_reader_b (like writer_aborts_before_reader_a) distributed by (i);
insert into writer_aborts_before_reader_b select * from writer_aborts_before_reader_a;
-- first test: one writer with one or more readers
select gp_inject_fault('cancelled_reader_during_abort', 'skip', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: (seg0 127.0.0.1:25432 pid=51789)
gp_inject_fault
-----------------
t
(1 row)
begin;
-- Make a write in this transaction so that a TransactionId will be assigned.
insert into writer_aborts_before_reader_a values (4, 4);
-- This multi-slice update is aborted because QE writer encounters constraint
-- failure. The writer must send cancel signal to the readers before aborting the
-- transaction. The fault, therefore, is expected to hit at least once.
update writer_aborts_before_reader_a set j = -1 from writer_aborts_before_reader_b;
ERROR: new row for relation "writer_aborts_before_reader_a" violates check constraint "check_j" (seg0 127.0.0.1:25432 pid=51789)
DETAIL: Failing row contains (4, -1).
end;
select gp_wait_until_triggered_fault('cancelled_reader_during_abort', 1, dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: (seg0 127.0.0.1:25432 pid=51789)
gp_wait_until_triggered_fault
-------------------------------
t
(1 row)
select gp_inject_fault('cancelled_reader_during_abort', 'reset', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: (seg0 127.0.0.1:25432 pid=51789)
gp_inject_fault
-----------------
t
(1 row)
-- second test: one writer with no readers
select gp_inject_fault('cancelled_reader_during_abort', 'skip', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: (seg0 127.0.0.1:25432 pid=51789)
gp_inject_fault
-----------------
t
(1 row)
begin;
-- Make a write in this transaction so that a TransactionId will be assigned.
insert into writer_aborts_before_reader_a values (4, 4);
-- No reader gangs. This is a single slice update. The writer aborts the
-- transaction because of constraint failure.
update writer_aborts_before_reader_a set j = -1;
ERROR: new row for relation "writer_aborts_before_reader_a" violates check constraint "check_j" (seg0 127.0.0.1:25432 pid=51789)
DETAIL: Failing row contains (4, -1).
end;
-- The writer from the previous update statement is not expceted to walk the
-- proc array and look for readers because that was a single-slice statement.
-- Therefore, hit count for the fault should be 0.
select gp_inject_fault('cancelled_reader_during_abort', 'status', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: fault name:'cancelled_reader_during_abort' fault type:'skip' ddl statement:'' database name:'' table name:'' start occurrence:'1' end occurrence:'1' extra arg:'0' fault injection state:'set' num times hit:'0' (seg0 127.0.0.1:25432 pid=51789)
gp_inject_fault
-----------------
t
(1 row)
select gp_inject_fault('cancelled_reader_during_abort', 'reset', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: (seg0 127.0.0.1:25432 pid=51789)
gp_inject_fault
-----------------
t
(1 row)
-- third test: writer and reader, but the transaction does not write
select gp_inject_fault('cancelled_reader_during_abort', 'skip', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: (seg0 127.0.0.1:25432 pid=51789)
gp_inject_fault
-----------------
t
(1 row)
begin;
-- This is a non-colocated join, with two slices.
select count(*) from writer_aborts_before_reader_a, writer_aborts_before_reader_b
where writer_aborts_before_reader_a.i = writer_aborts_before_reader_b.j;
count
-------
12
(1 row)
abort;
-- The previous abort should cause the QE writer to go through abort workflow.
-- But the writer should not walk the proc array and look for readers because the
-- transaction did not make any writes. Therefore, the fault status is expected
-- to return hit count as 0.
select gp_inject_fault('cancelled_reader_during_abort', 'status', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: fault name:'cancelled_reader_during_abort' fault type:'skip' ddl statement:'' database name:'' table name:'' start occurrence:'1' end occurrence:'1' extra arg:'0' fault injection state:'set' num times hit:'0' (seg0 127.0.0.1:25432 pid=51789)
gp_inject_fault
-----------------
t
(1 row)
select gp_inject_fault('cancelled_reader_during_abort', 'reset', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: (seg0 127.0.0.1:25432 pid=51789)
gp_inject_fault
-----------------
t
(1 row)
-- fourth test: the transaction does write but the last command before
-- abort is read-only
select gp_inject_fault('cancelled_reader_during_abort', 'skip', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: (seg0 127.0.1.1:25432 pid=1390)
gp_inject_fault
-----------------
t
(1 row)
begin;
-- Make a write in this transaction so that a TransactionId will be assigned.
insert into writer_aborts_before_reader_a values (4, 4);
-- Both the slices in this query plan have gangType PRIMARY_READER
select count(*) from writer_aborts_before_reader_a, writer_aborts_before_reader_b
where writer_aborts_before_reader_a.i = writer_aborts_before_reader_b.j;
count
-------
13
(1 row)
abort;
-- QE writer should not walk through proc array during abort to look
-- for readers because none of the slices in the last command had
-- gangType PRIMARY_WRITER. Hit count reported by the status should
-- therefore be 0.
select gp_inject_fault('cancelled_reader_during_abort', 'status', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: fault name:'cancelled_reader_during_abort' fault type:'skip' ddl statement:'' database name:'' table name:'' start occurrence:'1' end occurrence:'1' extra arg:'0' fault injection state:'set' num times hit:'0' (seg0 127.0.1.1:25432 pid=1390)
gp_inject_fault
-----------------
t
(1 row)
select gp_inject_fault('cancelled_reader_during_abort', 'reset', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
NOTICE: Success: (seg0 127.0.1.1:25432 pid=1390)
gp_inject_fault
-----------------
t
(1 row)
......@@ -142,7 +142,7 @@ test: catalog
test: bfv_catalog bfv_index bfv_olap bfv_aggregate bfv_partition bfv_partition_plans DML_over_joins gporca bfv_statistic
# NOTE: gporca_faults uses gp_fault_injector - so do not add to a parallel group
test: gporca_faults
test: aggregate_with_groupingsets
test: nested_case_null sort bb_mpph
......@@ -247,4 +247,10 @@ test: autovacuum-template0
# Online expand introduce the partial tables, check them if they can run correctly
test: gangsize
# This test uses fault injectors in abort transaction workflow. It
# may be run in parallel with other tests as long as they don't abort
# a transaction where mult-slice query was executed and the writer was
# assigned a transaction ID.
test: writer_aborts_reader
# end of tests
-- Tests to validate that in a multi-slice query a QE writer signals QE readers
-- to cancel query execution before marking the transaction as aborted.
-- The tests make use of a "skip" fault to determine if the control reached a
-- specific location of interest. In this case, the location of the fault
-- "cancelled_reader_during_abort" is right after a writer sends SIGINT to
-- corresponding readers.
CREATE EXTENSION IF NOT EXISTS gp_inject_fault;
create table writer_aborts_before_reader_a(i int, j int) distributed by (i);
alter table writer_aborts_before_reader_a add constraint check_j check (j > 0);
insert into writer_aborts_before_reader_a select 4,i from generate_series(1,12) i;
create table writer_aborts_before_reader_b (like writer_aborts_before_reader_a) distributed by (i);
insert into writer_aborts_before_reader_b select * from writer_aborts_before_reader_a;
-- first test: one writer with one or more readers
select gp_inject_fault('cancelled_reader_during_abort', 'skip', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
begin;
-- Make a write in this transaction so that a TransactionId will be assigned.
insert into writer_aborts_before_reader_a values (4, 4);
-- This multi-slice update is aborted because QE writer encounters constraint
-- failure. The writer must send cancel signal to the readers before aborting the
-- transaction. The fault, therefore, is expected to hit at least once.
update writer_aborts_before_reader_a set j = -1 from writer_aborts_before_reader_b;
end;
select gp_wait_until_triggered_fault('cancelled_reader_during_abort', 1, dbid) from
gp_segment_configuration where role = 'p' and content = 0;
select gp_inject_fault('cancelled_reader_during_abort', 'reset', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
-- second test: one writer with no readers
select gp_inject_fault('cancelled_reader_during_abort', 'skip', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
begin;
-- Make a write in this transaction so that a TransactionId will be assigned.
insert into writer_aborts_before_reader_a values (4, 4);
-- No reader gangs. This is a single slice update. The writer aborts the
-- transaction because of constraint failure.
update writer_aborts_before_reader_a set j = -1;
end;
-- The writer from the previous update statement is not expceted to walk the
-- proc array and look for readers because that was a single-slice statement.
-- Therefore, hit count for the fault should be 0.
select gp_inject_fault('cancelled_reader_during_abort', 'status', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
select gp_inject_fault('cancelled_reader_during_abort', 'reset', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
-- third test: writer and reader, but the transaction does not write
select gp_inject_fault('cancelled_reader_during_abort', 'skip', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
begin;
-- This is a non-colocated join, with two slices.
select count(*) from writer_aborts_before_reader_a, writer_aborts_before_reader_b
where writer_aborts_before_reader_a.i = writer_aborts_before_reader_b.j;
abort;
-- The previous abort should cause the QE writer to go through abort workflow.
-- But the writer should not walk the proc array and look for readers because the
-- transaction did not make any writes. Therefore, the fault status is expected
-- to return hit count as 0.
select gp_inject_fault('cancelled_reader_during_abort', 'status', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
select gp_inject_fault('cancelled_reader_during_abort', 'reset', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
-- fourth test: the transaction does write but the last command before
-- abort is read-only
select gp_inject_fault('cancelled_reader_during_abort', 'skip', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
begin;
-- Make a write in this transaction so that a TransactionId will be assigned.
insert into writer_aborts_before_reader_a values (4, 4);
-- Both the slices in this query plan have gangType PRIMARY_READER
select count(*) from writer_aborts_before_reader_a, writer_aborts_before_reader_b
where writer_aborts_before_reader_a.i = writer_aborts_before_reader_b.j;
abort;
-- QE writer should not walk through proc array during abort to look
-- for readers because none of the slices in the last command had
-- gangType PRIMARY_WRITER. Hit count reported by the status should
-- therefore be 0.
select gp_inject_fault('cancelled_reader_during_abort', 'status', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
select gp_inject_fault('cancelled_reader_during_abort', 'reset', dbid) from
gp_segment_configuration where role = 'p' and content = 0;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册