diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 4636e8d1c6fc74b93e5c48a282fc9f444bc91270..04681f4196299ce74739cc4cccf5ca305cffae97 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -22,6 +22,7 @@ #include "lib/stringinfo.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" +#include "miscadmin.h" #include "nodes/pg_list.h" #include "replication/basebackup.h" #include "replication/walsender.h" @@ -30,7 +31,6 @@ #include "storage/ipc.h" #include "utils/builtins.h" #include "utils/elog.h" -#include "utils/memutils.h" #include "utils/ps_status.h" typedef struct @@ -370,19 +370,10 @@ void SendBaseBackup(BaseBackupCmd *cmd) { DIR *dir; - MemoryContext backup_context; - MemoryContext old_context; basebackup_options opt; parse_basebackup_options(cmd->options, &opt); - backup_context = AllocSetContextCreate(CurrentMemoryContext, - "Streaming base backup context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - old_context = MemoryContextSwitchTo(backup_context); - WalSndSetState(WALSNDSTATE_BACKUP); if (update_process_title) @@ -403,9 +394,6 @@ SendBaseBackup(BaseBackupCmd *cmd) perform_base_backup(&opt, dir); FreeDir(dir); - - MemoryContextSwitchTo(old_context); - MemoryContextDelete(backup_context); } static void @@ -606,7 +594,7 @@ sendDir(char *path, int basepathlen, bool sizeonly) * error in that case. The error handler further up will call * do_pg_abort_backup() for us. */ - if (walsender_shutdown_requested || walsender_ready_to_stop) + if (ProcDiePending || walsender_ready_to_stop) ereport(ERROR, (errmsg("shutdown requested, aborting active base backup"))); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index cc27848318bc223e931b842bcd94003a9fa556d7..0ba2ad4414062ead6c20c50cd6d5c84ba97b6086 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -78,6 +78,8 @@ bool am_walsender = false; /* Am I a walsender process ? */ bool am_cascading_walsender = false; /* Am I cascading WAL to * another standby ? */ +static bool replication_started = false; /* Started streaming yet? */ + /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int replication_timeout = 60 * 1000; /* maximum time to send one @@ -113,21 +115,16 @@ static TimestampTz last_reply_timestamp; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; -volatile sig_atomic_t walsender_shutdown_requested = false; volatile sig_atomic_t walsender_ready_to_stop = false; /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); -static void WalSndShutdownHandler(SIGNAL_ARGS); -static void WalSndQuickDieHandler(SIGNAL_ARGS); static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -static bool HandleReplicationCommand(const char *cmd_string); static void WalSndLoop(void) __attribute__((noreturn)); -static void InitWalSnd(void); -static void WalSndHandshake(void); +static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); static void XLogSend(char *msgbuf, bool *caughtup); static void IdentifySystem(void); @@ -139,147 +136,48 @@ static void ProcessRepliesIfAny(void); static void WalSndKeepalive(char *msgbuf); -/* Main entry point for walsender process */ +/* Initialize walsender process before entering the main command loop */ void -WalSenderMain(void) +InitWalSender(void) { - MemoryContext walsnd_context; - am_cascading_walsender = RecoveryInProgress(); /* Create a per-walsender data structure in shared memory */ - InitWalSnd(); - - /* - * Create a memory context that we will do all our work in. We do this so - * that we can reset the context during error recovery and thereby avoid - * possible memory leaks. Formerly this code just ran in - * TopMemoryContext, but resetting that would be a really bad idea. - * - * XXX: we don't actually attempt error recovery in walsender, we just - * close the connection and exit. - */ - walsnd_context = AllocSetContextCreate(TopMemoryContext, - "Wal Sender", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(walsnd_context); + InitWalSenderSlot(); /* Set up resource owner */ CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner"); - /* Unblock signals (they were blocked when the postmaster forked us) */ - PG_SETMASK(&UnBlockSig); - /* * Use the recovery target timeline ID during recovery */ if (am_cascading_walsender) ThisTimeLineID = GetRecoveryTargetTLI(); - - /* Tell the standby that walsender is ready for receiving commands */ - ReadyForQuery(DestRemote); - - /* Handle handshake messages before streaming */ - WalSndHandshake(); - - /* Initialize shared memory status */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = MyWalSnd; - - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = sentPtr; - SpinLockRelease(&walsnd->mutex); - } - - SyncRepInitConfig(); - - /* Main loop of walsender */ - WalSndLoop(); } /* - * Execute commands from walreceiver, until we enter streaming mode. + * Clean up after an error. + * + * WAL sender processes don't use transactions like regular backends do. + * This function does any cleanup requited after an error in a WAL sender + * process, similar to what transaction abort does in a regular backend. */ -static void -WalSndHandshake(void) +void +WalSndErrorCleanup() { - StringInfoData input_message; - bool replication_started = false; - - initStringInfo(&input_message); - - while (!replication_started) + if (sendFile >= 0) { - int firstchar; - - WalSndSetState(WALSNDSTATE_STARTUP); - set_ps_display("idle", false); - - /* Wait for a command to arrive */ - firstchar = pq_getbyte(); - - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - */ - if (!PostmasterIsAlive()) - exit(1); - - /* - * Check for any other interesting events that happened while we - * slept. - */ - if (got_SIGHUP) - { - got_SIGHUP = false; - ProcessConfigFile(PGC_SIGHUP); - } - - if (firstchar != EOF) - { - /* - * Read the message contents. This is expected to be done without - * blocking because we've been able to get message type code. - */ - if (pq_getmessage(&input_message, 0)) - firstchar = EOF; /* suitable message already logged */ - } - - /* Handle the very limited subset of commands expected in this phase */ - switch (firstchar) - { - case 'Q': /* Query message */ - { - const char *query_string; - - query_string = pq_getmsgstring(&input_message); - pq_getmsgend(&input_message); - - if (HandleReplicationCommand(query_string)) - replication_started = true; - } - break; - - case 'X': - /* standby is closing the connection */ - proc_exit(0); - - case EOF: - /* standby disconnected unexpectedly */ - ereport(COMMERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("unexpected EOF on standby connection"))); - proc_exit(0); - - default: - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid standby handshake message type %d", firstchar))); - } + close(sendFile); + sendFile = -1; } + + /* + * Don't return back to the command loop after we've started replicating. + * We've already marked us as an actively streaming WAL sender in the + * PMSignal slot, and there's currently no way to undo that. + */ + if (replication_started) + proc_exit(0); } /* @@ -350,15 +248,13 @@ IdentifySystem(void) pq_sendbytes(&buf, (char *) xpos, strlen(xpos)); pq_endmessage(&buf); - - /* Send CommandComplete and ReadyForQuery messages */ - EndCommand("SELECT", DestRemote); - ReadyForQuery(DestRemote); - /* ReadyForQuery did pq_flush for us */ } /* - * START_REPLICATION + * Handle START_REPLICATION command. + * + * At the moment, this never returns, but an ereport(ERROR) will take us back + * to the main loop. */ static void StartReplication(StartReplicationCmd *cmd) @@ -374,6 +270,7 @@ StartReplication(StartReplicationCmd *cmd) */ MarkPostmasterChildWalSender(); SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); + replication_started = true; /* * When promoting a cascading standby, postmaster sends SIGUSR2 to any @@ -435,15 +332,29 @@ StartReplication(StartReplicationCmd *cmd) * be shipped from that position */ sentPtr = cmd->startpoint; + + /* Also update the start position status in shared memory */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + + SyncRepInitConfig(); + + /* Main loop of walsender */ + WalSndLoop(); } /* * Execute an incoming replication command. */ -static bool -HandleReplicationCommand(const char *cmd_string) +void +exec_replication_command(const char *cmd_string) { - bool replication_started = false; int parse_rc; Node *cmd_node; MemoryContext cmd_context; @@ -451,6 +362,8 @@ HandleReplicationCommand(const char *cmd_string) elog(DEBUG1, "received replication command: %s", cmd_string); + CHECK_FOR_INTERRUPTS(); + cmd_context = AllocSetContextCreate(CurrentMemoryContext, "Replication command context", ALLOCSET_DEFAULT_MINSIZE, @@ -476,18 +389,10 @@ HandleReplicationCommand(const char *cmd_string) case T_StartReplicationCmd: StartReplication((StartReplicationCmd *) cmd_node); - - /* break out of the loop */ - replication_started = true; break; case T_BaseBackupCmd: SendBaseBackup((BaseBackupCmd *) cmd_node); - - /* Send CommandComplete and ReadyForQuery messages */ - EndCommand("SELECT", DestRemote); - ReadyForQuery(DestRemote); - /* ReadyForQuery did pq_flush for us */ break; default: @@ -500,7 +405,8 @@ HandleReplicationCommand(const char *cmd_string) MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); - return replication_started; + /* Send CommandComplete message */ + EndCommand("SELECT", DestRemote); } /* @@ -710,7 +616,7 @@ ProcessStandbyHSFeedbackMessage(void) MyPgXact->xmin = reply.xmin; } -/* Main loop of walsender process */ +/* Main loop of walsender process that streams the WAL over Copy messages. */ static void WalSndLoop(void) { @@ -754,15 +660,7 @@ WalSndLoop(void) SyncRepInitConfig(); } - /* Normal exit from the walsender is here */ - if (walsender_shutdown_requested) - { - /* Inform the standby that XLOG streaming is done */ - pq_puttextmessage('C', "COPY 0"); - pq_flush(); - - proc_exit(0); - } + CHECK_FOR_INTERRUPTS(); /* Check for input from the client */ ProcessRepliesIfAny(); @@ -813,7 +711,7 @@ WalSndLoop(void) XLogSend(output_message, &caughtup); if (caughtup && !pq_is_send_pending()) { - walsender_shutdown_requested = true; + ProcDiePending = true; continue; /* don't want to wait more */ } } @@ -854,8 +752,11 @@ WalSndLoop(void) } /* Sleep until something happens or replication timeout */ + ImmediateInterruptOK = true; + CHECK_FOR_INTERRUPTS(); WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, MyProcPort->sock, sleeptime); + ImmediateInterruptOK = false; /* * Check for replication timeout. Note we ignore the corner case @@ -892,7 +793,7 @@ WalSndLoop(void) /* Initialize a per-walsender data structure for this walsender process */ static void -InitWalSnd(void) +InitWalSenderSlot(void) { int i; @@ -1284,58 +1185,6 @@ WalSndSigHupHandler(SIGNAL_ARGS) errno = save_errno; } -/* SIGTERM: set flag to shut down */ -static void -WalSndShutdownHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - walsender_shutdown_requested = true; - if (MyWalSnd) - SetLatch(&MyWalSnd->latch); - - /* - * Set the standard (non-walsender) state as well, so that we can abort - * things like do_pg_stop_backup(). - */ - InterruptPending = true; - ProcDiePending = true; - - errno = save_errno; -} - -/* - * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. - * - * Some backend has bought the farm, - * so we need to stop what we're doing and exit. - */ -static void -WalSndQuickDieHandler(SIGNAL_ARGS) -{ - PG_SETMASK(&BlockSig); - - /* - * We DO NOT want to run proc_exit() callbacks -- we're here because - * shared memory may be corrupted, so we don't want to try to clean up our - * transaction. Just nail the windows shut and get out of town. Now that - * there's an atexit callback to prevent third-party code from breaking - * things by calling exit() directly, we have to reset the callbacks - * explicitly to make this work as intended. - */ - on_exit_reset(); - - /* - * Note we do exit(2) not exit(0). This is to force the postmaster into a - * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random - * backend. This is necessary precisely because we don't clean up our - * shared memory state. (The "dead man switch" mechanism in pmsignal.c - * should ensure the postmaster sees this as a crash, too, but no harm in - * being doubly sure.) - */ - exit(2); -} - /* SIGUSR1: set flag to send WAL records */ static void WalSndXLogSendHandler(SIGNAL_ARGS) @@ -1368,8 +1217,8 @@ WalSndSignals(void) pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config * file */ pqsignal(SIGINT, SIG_IGN); /* not used */ - pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */ - pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */ + pqsignal(SIGTERM, die); /* request shutdown */ + pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index f1248a851bf90188da8d3a7e8b61ac99bf78ebbd..585db1af89cd37ba7e8f5327bbd992c45beaf288 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -192,6 +192,7 @@ static int InteractiveBackend(StringInfo inBuf); static int interactive_getc(void); static int SocketBackend(StringInfo inBuf); static int ReadCommand(StringInfo inBuf); +static void forbidden_in_wal_sender(char firstchar); static List *pg_rewrite_query(Query *query); static bool check_log_statement(List *stmt_list); static int errdetail_execute(List *raw_parsetree_list); @@ -3720,12 +3721,9 @@ PostgresMain(int argc, char *argv[], const char *username) if (IsUnderPostmaster && Log_disconnections) on_proc_exit(log_disconnections, 0); - /* If this is a WAL sender process, we're done with initialization. */ + /* Perform initialization specific to a WAL sender process. */ if (am_walsender) - { - WalSenderMain(); /* does not return */ - abort(); - } + InitWalSender(); /* * process any libraries that should be preloaded at backend start (this @@ -3835,6 +3833,9 @@ PostgresMain(int argc, char *argv[], const char *username) */ AbortCurrentTransaction(); + if (am_walsender) + WalSndErrorCleanup(); + /* * Now return to normal top-level context and clear ErrorContext for * next time. @@ -3969,7 +3970,10 @@ PostgresMain(int argc, char *argv[], const char *username) query_string = pq_getmsgstring(&input_message); pq_getmsgend(&input_message); - exec_simple_query(query_string); + if (am_walsender) + exec_replication_command(query_string); + else + exec_simple_query(query_string); send_ready_for_query = true; } @@ -3982,6 +3986,8 @@ PostgresMain(int argc, char *argv[], const char *username) int numParams; Oid *paramTypes = NULL; + forbidden_in_wal_sender(firstchar); + /* Set statement_timestamp() */ SetCurrentStatementStartTimestamp(); @@ -4004,6 +4010,8 @@ PostgresMain(int argc, char *argv[], const char *username) break; case 'B': /* bind */ + forbidden_in_wal_sender(firstchar); + /* Set statement_timestamp() */ SetCurrentStatementStartTimestamp(); @@ -4019,6 +4027,8 @@ PostgresMain(int argc, char *argv[], const char *username) const char *portal_name; int max_rows; + forbidden_in_wal_sender(firstchar); + /* Set statement_timestamp() */ SetCurrentStatementStartTimestamp(); @@ -4031,6 +4041,8 @@ PostgresMain(int argc, char *argv[], const char *username) break; case 'F': /* fastpath function call */ + forbidden_in_wal_sender(firstchar); + /* Set statement_timestamp() */ SetCurrentStatementStartTimestamp(); @@ -4078,6 +4090,8 @@ PostgresMain(int argc, char *argv[], const char *username) int close_type; const char *close_target; + forbidden_in_wal_sender(firstchar); + close_type = pq_getmsgbyte(&input_message); close_target = pq_getmsgstring(&input_message); pq_getmsgend(&input_message); @@ -4120,6 +4134,8 @@ PostgresMain(int argc, char *argv[], const char *username) int describe_type; const char *describe_target; + forbidden_in_wal_sender(firstchar); + /* Set statement_timestamp() (needed for xact) */ SetCurrentStatementStartTimestamp(); @@ -4201,6 +4217,29 @@ PostgresMain(int argc, char *argv[], const char *username) } /* end of input-reading loop */ } +/* + * Throw an error if we're a WAL sender process. + * + * This is used to forbid anything else than simple query protocol messages + * in a WAL sender process. 'firstchar' specifies what kind of a forbidden + * message was received, and is used to construct the error message. + */ +static void +forbidden_in_wal_sender(char firstchar) +{ + if (am_walsender) + { + if (firstchar == 'F') + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("fastpath function calls not supported in a replication connection"))); + else + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("extended query protocol not supported in a replication connection"))); + } +} + /* * Obtain platform stack depth limit (in bytes) diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index bb85ccf7b22cea2a927a82be0ba6219c6fb5efe8..78e8558299ce844d8a7bd31ebeafc5a8f507f5a2 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -19,7 +19,6 @@ /* global state */ extern bool am_walsender; extern bool am_cascading_walsender; -extern volatile sig_atomic_t walsender_shutdown_requested; extern volatile sig_atomic_t walsender_ready_to_stop; extern bool wake_wal_senders; @@ -27,7 +26,9 @@ extern bool wake_wal_senders; extern int max_wal_senders; extern int replication_timeout; -extern void WalSenderMain(void) __attribute__((noreturn)); +extern void InitWalSender(void); +extern void exec_replication_command(const char *query_string); +extern void WalSndErrorCleanup(void); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void);