From e2a186b03cc1a87cf26644db18f28a20f10bd739 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Mon, 16 Apr 2007 18:30:04 +0000 Subject: [PATCH] Add a multi-worker capability to autovacuum. This allows multiple worker processes to be running simultaneously. Also, now autovacuum processes do not count towards the max_connections limit; they are counted separately from regular processes, and are limited by the new GUC variable autovacuum_max_workers. The launcher now has intelligence to launch workers on each database every autovacuum_naptime seconds, limited only on the max amount of worker slots available. Also, the global worker I/O utilization is limited by the vacuum cost-based delay feature. Workers are "balanced" so that the total I/O consumption does not exceed the established limit. This part of the patch was contributed by ITAGAKI Takahiro. Per discussion. --- doc/src/sgml/config.sgml | 30 +- doc/src/sgml/maintenance.sgml | 49 +- src/backend/commands/vacuum.c | 5 +- src/backend/postmaster/autovacuum.c | 1102 +++++++++++++++-- src/backend/storage/lmgr/proc.c | 62 +- src/backend/utils/init/globals.c | 9 +- src/backend/utils/misc/guc.c | 51 +- src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/miscadmin.h | 3 +- src/include/postmaster/autovacuum.h | 11 +- src/include/storage/lwlock.h | 3 +- src/include/storage/proc.h | 4 +- 12 files changed, 1171 insertions(+), 159 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 57a618faa6..e10d2d753a 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1,4 +1,4 @@ - + Server Configuration @@ -3166,7 +3166,7 @@ SELECT * FROM parent WHERE key = 2400; Controls whether the server should run the - autovacuum daemon. This is off by default. + autovacuum launcher daemon. This is on by default. stats_start_collector and stats_row_level must also be turned on for autovacuum to work. 
This parameter can only be set in the postgresql.conf @@ -3175,6 +3175,21 @@ SELECT * FROM parent WHERE key = 2400; + + autovacuum_max_workers (integer) + + autovacuum_max_workers configuration parameter + + + + Specifies the maximum number of autovacuum processes (other than the + autovacuum launcher) which may be running at any one time. The default + is three (3). This parameter can only be set in + the postgresql.conf file or on the server command line. + + + + autovacuum_naptime (integer) @@ -3182,9 +3197,9 @@ SELECT * FROM parent WHERE key = 2400; - Specifies the delay between activity rounds for the autovacuum - daemon. In each round the daemon examines one database - and issues VACUUM and ANALYZE commands + Specifies the minimum delay between autovacuum runs on any given + database. In each round the daemon examines the + database and issues VACUUM and ANALYZE commands as needed for tables in that database. The delay is measured in seconds, and the default is one minute (1m). This parameter can only be set in the postgresql.conf @@ -3318,7 +3333,10 @@ SELECT * FROM parent WHERE key = 2400; Specifies the cost limit value that will be used in automatic VACUUM operations. If -1 is specified (which is the default), the regular - value will be used. + value will be used. Note that + the value is distributed proportionally among the running autovacuum + workers, if there is more than one, so that the sum of the limits of + each worker never exceeds the limit on this variable. This parameter can only be set in the postgresql.conf file or on the server command line. This setting can be overridden for individual tables by entries in diff --git a/doc/src/sgml/maintenance.sgml b/doc/src/sgml/maintenance.sgml index fe5369c19c..2be11332c2 100644 --- a/doc/src/sgml/maintenance.sgml +++ b/doc/src/sgml/maintenance.sgml @@ -1,4 +1,4 @@ - + Routine Database Maintenance Tasks @@ -466,26 +466,43 @@ HINT: Stop the postmaster and use a standalone backend to VACUUM in "mydb". 
general information - Beginning in PostgreSQL 8.1, there is a - separate optional server process called the autovacuum - daemon, whose purpose is to automate the execution of + Beginning in PostgreSQL 8.1, there is an + optional feature called autovacuum, + whose purpose is to automate the execution of VACUUM and ANALYZE commands. - When enabled, the autovacuum daemon runs periodically and checks for + When enabled, autovacuum checks for tables that have had a large number of inserted, updated or deleted tuples. These checks use the row-level statistics collection facility; - therefore, the autovacuum daemon cannot be used unless and are set to true. Also, - it's important to allow a slot for the autovacuum process when choosing - the value of . In - the default configuration, autovacuuming is enabled and the related + linkend="guc-stats-row-level"> are set to true. + In the default configuration, autovacuuming is enabled and the related configuration parameters are appropriately set. - The autovacuum daemon, when enabled, runs every seconds. On each run, it selects - one database to process and checks each table within that database. + Beginning in PostgreSQL 8.3, autovacuum has a + multi-process architecture: there is a daemon process, called the + autovacuum launcher, which is in charge of starting + an autovacuum worker process on each database every + seconds. + + + + There is a limit of worker + processes that may be running at any time, so if the VACUUM + and ANALYZE work to do takes too long to run, the deadline may + not be met for other databases. Also, if a particular database + takes a long time to process, more than one worker may be processing it + simultaneously. The workers are smart enough to avoid repeating work that + other workers have done, so this is normally not a problem. Note that the + number of running workers does not count towards the nor the limits.
+ + + + On each run, the worker process checks each table within that database, and VACUUM or ANALYZE commands are issued as needed. @@ -581,6 +598,12 @@ analyze threshold = analyze base threshold + analyze scale factor * number of tu + + When multiple workers are running, the cost limit is "balanced" among all + the running workers, so that the total impact on the system is the same, + regardless of the number of workers actually running. + + diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index d350420ab2..f275448756 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -13,7 +13,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.349 2007/03/14 18:48:55 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.350 2007/04/16 18:29:50 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -3504,6 +3504,9 @@ vacuum_delay_point(void) VacuumCostBalance = 0; + /* update balance values for workers */ + AutoVacuumUpdateDelay(); + /* Might have gotten an interrupt while sleeping */ CHECK_FOR_INTERRUPTS(); } diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 4631c636c0..9893fa680b 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.40 2007/03/28 22:17:12 alvherre Exp $ + * $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.41 2007/04/16 18:29:52 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -52,6 +52,7 @@ #include "utils/syscache.h" +static volatile sig_atomic_t got_SIGUSR1 = false; static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t avlauncher_shutdown_request = false; @@ -59,6 +60,7 @@ static volatile sig_atomic_t avlauncher_shutdown_request = false; * GUC parameters */ 
bool autovacuum_start_daemon = false; +int autovacuum_max_workers; int autovacuum_naptime; int autovacuum_vac_thresh; double autovacuum_vac_scale; @@ -69,7 +71,7 @@ int autovacuum_freeze_max_age; int autovacuum_vac_cost_delay; int autovacuum_vac_cost_limit; -/* Flag to tell if we are in the autovacuum daemon process */ +/* Flags to tell if we are in an autovacuum process */ static bool am_autovacuum_launcher = false; static bool am_autovacuum_worker = false; @@ -82,14 +84,22 @@ static int default_freeze_min_age; /* Memory context for long-lived data */ static MemoryContext AutovacMemCxt; -/* struct to keep list of candidate databases for vacuum */ -typedef struct autovac_dbase +/* struct to keep track of databases in launcher */ +typedef struct avl_dbase { - Oid ad_datid; - char *ad_name; - TransactionId ad_frozenxid; - PgStat_StatDBEntry *ad_entry; -} autovac_dbase; + Oid adl_datid; /* hash key -- must be first */ + TimestampTz adl_next_worker; + int adl_score; +} avl_dbase; + +/* struct to keep track of databases in worker */ +typedef struct avw_dbase +{ + Oid adw_datid; + char *adw_name; + TransactionId adw_frozenxid; + PgStat_StatDBEntry *adw_entry; +} avw_dbase; /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */ typedef struct av_relation @@ -110,14 +120,73 @@ typedef struct autovac_table int at_vacuum_cost_limit; } autovac_table; +/*------------- + * This struct holds information about a single worker's whereabouts. We keep + * an array of these in shared memory, sized according to + * autovacuum_max_workers. 
+ * + * wi_links entry into free list or running list + * wi_dboid OID of the database this worker is supposed to work on + * wi_tableoid OID of the table currently being vacuumed + * wi_workerpid PID of the running worker, 0 if not yet started + * wi_launchtime Time at which this worker was launched + * wi_cost_* Vacuum cost-based delay parameters current in this worker + * + * All fields are protected by AutovacuumLock, except for wi_tableoid which is + * protected by AutovacuumScheduleLock (which is read-only for everyone except + * that worker itself). + *------------- + */ +typedef struct WorkerInfoData +{ + SHM_QUEUE wi_links; + Oid wi_dboid; + Oid wi_tableoid; + int wi_workerpid; + TimestampTz wi_launchtime; + int wi_cost_delay; + int wi_cost_limit; + int wi_cost_limit_base; +} WorkerInfoData; + +typedef struct WorkerInfoData *WorkerInfo; + +/*------------- + * The main autovacuum shmem struct. On shared memory we store this main + * struct and the array of WorkerInfo structs. This struct keeps: + * + * av_launcherpid the PID of the autovacuum launcher + * av_freeWorkers the WorkerInfo freelist + * av_runningWorkers the WorkerInfo non-free queue + * av_startingWorker pointer to WorkerInfo currently being started (cleared by + * the worker itself as soon as it's up and running) + * av_rebalance true when a worker determines that cost limits must be + * rebalanced + * + * This struct is protected by AutovacuumLock. 
+ *------------- + */ typedef struct { - Oid process_db; /* OID of database to process */ - int worker_pid; /* PID of the worker process, if any */ + pid_t av_launcherpid; + SHMEM_OFFSET av_freeWorkers; + SHM_QUEUE av_runningWorkers; + SHMEM_OFFSET av_startingWorker; + bool av_rebalance; } AutoVacuumShmemStruct; static AutoVacuumShmemStruct *AutoVacuumShmem; +/* the database list in the launcher, and the context that contains it */ +static Dllist *DatabaseList = NULL; +static MemoryContext DatabaseListCxt = NULL; + +/* Pointer to my own WorkerInfo, valid on each worker */ +static WorkerInfo MyWorkerInfo = NULL; + +/* PID of launcher, valid only in worker while shutting down */ +int AutovacuumLauncherPid = 0; + #ifdef EXEC_BACKEND static pid_t avlauncher_forkexec(void); static pid_t avworker_forkexec(void); @@ -125,9 +194,16 @@ static pid_t avworker_forkexec(void); NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]); NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]); -static void do_start_worker(void); +static Oid do_start_worker(void); +static uint64 launcher_determine_sleep(bool canlaunch, bool recursing); +static void launch_worker(TimestampTz now); +static List *get_database_list(void); +static void rebuild_database_list(Oid newdb); +static int db_comparator(const void *a, const void *b); +static void autovac_balance_cost(void); + static void do_autovacuum(void); -static List *autovac_get_database_list(void); +static void FreeWorkerInfo(int code, Datum arg); static void relation_check_autovac(Oid relid, Form_pg_class classForm, Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry, @@ -147,6 +223,7 @@ static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared, PgStat_StatDBEntry *dbentry); static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid); static void avl_sighup_handler(SIGNAL_ARGS); +static void avl_sigusr1_handler(SIGNAL_ARGS); static void avlauncher_shutdown(SIGNAL_ARGS); static void 
avl_quickdie(SIGNAL_ARGS); @@ -230,12 +307,34 @@ StartAutoVacLauncher(void) /* * Main loop for the autovacuum launcher process. + * + * The signalling between launcher and worker is as follows: + * + * When the worker has finished starting up, it stores its PID in wi_workerpid + * and sends a SIGUSR1 signal to the launcher. The launcher then knows that + * the postmaster is ready to start a new worker. We do it this way because + * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't + * yet processed the last one, in which case the second signal would be lost. + * This is only useful when two workers need to be started close to one + * another, which should be rare but it's possible. + * + * When a worker exits, it resets the WorkerInfo struct and puts it back into + * the free list. If there is no free worker slot, it will also signal the + * launcher, which then wakes up and can launch a new worker if it needs to. + * Note that we only need to do it when there's no free worker slot, because + * otherwise there is no need -- the launcher would be awakened normally per + * schedule. + * + * There is a potential problem if, for some reason, a worker starts and is not + * able to bootstrap itself correctly. To prevent this situation from starving + * the whole system, the launcher checks the launch time of the "starting + * worker". If it's too old (older than autovacuum_naptime seconds), it resets + * the worker entry and puts it back into the free list. */ NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; - MemoryContext avlauncher_cxt; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; @@ -264,9 +363,6 @@ AutoVacLauncherMain(int argc, char *argv[]) * Set up signal handlers. Since this is an auxiliary process, it has * particular signal requirements -- no deadlock checker or sinval * catchup, for example. 
- * - * XXX It may be a good idea to receive signals when an avworker process - * finishes. */ pqsignal(SIGHUP, avl_sighup_handler); @@ -276,7 +372,7 @@ AutoVacLauncherMain(int argc, char *argv[]) pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR1, avl_sigusr1_handler); /* We don't listen for async notifies */ pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGFPE, FloatExceptionHandler); @@ -300,12 +396,12 @@ AutoVacLauncherMain(int argc, char *argv[]) * that we can reset the context during error recovery and thereby avoid * possible memory leaks. */ - avlauncher_cxt = AllocSetContextCreate(TopMemoryContext, - "Autovacuum Launcher", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(avlauncher_cxt); + AutovacMemCxt = AllocSetContextCreate(TopMemoryContext, + "Autovacuum Launcher", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(AutovacMemCxt); /* @@ -336,11 +432,15 @@ AutoVacLauncherMain(int argc, char *argv[]) * Now return to normal top-level context and clear ErrorContext for * next time. 
*/ - MemoryContextSwitchTo(avlauncher_cxt); + MemoryContextSwitchTo(AutovacMemCxt); FlushErrorState(); /* Flush any leaked data in the top-level context */ - MemoryContextResetAndDeleteChildren(avlauncher_cxt); + MemoryContextResetAndDeleteChildren(AutovacMemCxt); + + /* don't leave dangling pointers to freed memory */ + DatabaseListCxt = NULL; + DatabaseList = NULL; /* Make sure pgstat also considers our stat data as gone */ pgstat_clear_snapshot(); @@ -361,18 +461,32 @@ AutoVacLauncherMain(int argc, char *argv[]) ereport(LOG, (errmsg("autovacuum launcher started"))); + /* must unblock signals before calling rebuild_database_list */ PG_SETMASK(&UnBlockSig); + /* in emergency mode, just start a worker and go away */ + if (!autovacuum_start_daemon) + { + do_start_worker(); + proc_exit(0); /* done */ + } + + AutoVacuumShmem->av_launcherpid = MyProcPid; + /* - * take a nap before executing the first iteration, unless we were - * requested an emergency run. + * Create the initial database list. The invariant we want this list to + * keep is that it's ordered by decreasing next_time. As soon as an entry + * is updated to a higher time, it will be moved to the front (which is + * correct because the only operation is to add autovacuum_naptime to the + * entry, and time always increases). */ - if (autovacuum_start_daemon) - pg_usleep(autovacuum_naptime * 1000000L); + rebuild_database_list(InvalidOid); for (;;) { - int worker_pid; + uint64 micros; + bool can_launch; + TimestampTz current_time = 0; /* * Emergency bailout if postmaster has died. 
This is to avoid the @@ -381,6 +495,13 @@ AutoVacLauncherMain(int argc, char *argv[]) if (!PostmasterIsAlive(true)) exit(1); + micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers != + INVALID_OFFSET, false); + + /* Sleep for a while according to schedule */ + pg_usleep(micros); + + /* the normal shutdown case */ if (avlauncher_shutdown_request) break; @@ -388,82 +509,455 @@ AutoVacLauncherMain(int argc, char *argv[]) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); + + /* rebalance in case the default cost parameters changed */ + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + autovac_balance_cost(); + LWLockRelease(AutovacuumLock); + + /* rebuild the list in case the naptime changed */ + rebuild_database_list(InvalidOid); + } + + /* a worker started up or finished */ + if (got_SIGUSR1) + { + got_SIGUSR1 = false; + + /* rebalance cost limits, if needed */ + if (AutoVacuumShmem->av_rebalance) + { + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + AutoVacuumShmem->av_rebalance = false; + autovac_balance_cost(); + LWLockRelease(AutovacuumLock); + } } /* - * if there's a worker already running, sleep until it - * disappears. + * There are some conditions that we need to check before trying to + * start a worker. First, we need to make sure that there is a + * worker slot available. Second, we need to make sure that no other + * worker is still starting up.
*/ + LWLockAcquire(AutovacuumLock, LW_SHARED); - worker_pid = AutoVacuumShmem->worker_pid; - LWLockRelease(AutovacuumLock); - if (worker_pid != 0) - { - PGPROC *proc = BackendPidGetProc(worker_pid); + can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET); - if (proc != NULL && proc->isAutovacuum) - goto sleep; + if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET) + { + long secs; + int usecs; + WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker); + + if (current_time == 0) + current_time = GetCurrentTimestamp(); + + /* + * We can't launch another worker when another one is still + * starting up, so just sleep for a bit more; that worker will wake + * us up again as soon as it's ready. We will only wait + * autovacuum_naptime seconds for this to happen however. Note + * that failure to connect to a particular database is not a + * problem here, because the worker removes itself from the + * startingWorker pointer before trying to connect; only low-level + * problems, like fork() failure, can get us here. + */ + TimestampDifference(worker->wi_launchtime, current_time, + &secs, &usecs); + + /* ignore microseconds, as they cannot make any difference */ + if (secs > autovacuum_naptime) + { + LWLockRelease(AutovacuumLock); + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + /* + * No other process can put a worker in starting mode, so if + * startingWorker is still set (not INVALID) after exchanging our lock, + * we assume it's the same one we saw above (so we don't + * recheck the launch time).
+ */ + if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET) + { + worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker); + worker->wi_dboid = InvalidOid; + worker->wi_tableoid = InvalidOid; + worker->wi_workerpid = 0; + worker->wi_launchtime = 0; + worker->wi_links.next = AutoVacuumShmem->av_freeWorkers; + AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker); + AutoVacuumShmem->av_startingWorker = INVALID_OFFSET; + } + } else { /* - * if the worker is not really running (or it's a process - * that's not an autovacuum worker), remove the PID from shmem. - * This should not happen, because either the worker exits - * cleanly, in which case it'll remove the PID, or it dies, in - * which case postmaster will cause a system reset cycle. + * maybe the postmaster neglected this start signal -- + * resend it. Note: the constraints in + * launcher_determine_sleep keep us from delivering signals too + * quickly (at most once every 100ms). */ - LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); - worker_pid = 0; - LWLockRelease(AutovacuumLock); + SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER); + can_launch = false; } } + LWLockRelease(AutovacuumLock); /* either shared or exclusive */ - do_start_worker(); + if (can_launch) + { + Dlelem *elem; -sleep: - /* - * in emergency mode, exit immediately so that the postmaster can - * request another run right away if needed. - * - * XXX -- maybe it would be better to handle this inside the launcher - * itself. - */ - if (!autovacuum_start_daemon) - break; + elem = DLGetTail(DatabaseList); - /* have pgstat read the file again next time */ - pgstat_clear_snapshot(); + if (current_time == 0) + current_time = GetCurrentTimestamp(); + + if (elem != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + long secs; + int usecs; - /* now sleep until the next autovac iteration */ - pg_usleep(autovacuum_naptime * 1000000L); + TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs); + + /* do we have to start a worker? 
*/ + if (secs <= 0 && usecs <= 0) + launch_worker(current_time); + } + else + { + /* + * Special case when the list is empty: start a worker right + * away. This covers the initial case, when no database is in + * pgstats (thus the list is empty). Note that the constraints + * in launcher_determine_sleep keep us from starting workers + * too quickly (at most once every autovacuum_naptime when the + * list is empty). + */ + launch_worker(current_time); + } + } } /* Normal exit from the autovac launcher is here */ ereport(LOG, (errmsg("autovacuum launcher shutting down"))); + AutoVacuumShmem->av_launcherpid = 0; proc_exit(0); /* done */ } +/* + * Determine the time to sleep, in microseconds, based on the database list. + * + * The "canlaunch" parameter indicates whether we can start a worker right now, + * for example due to the workers being all busy. + */ +static uint64 +launcher_determine_sleep(bool canlaunch, bool recursing) +{ + long secs; + int usecs; + Dlelem *elem; + + /* + * We sleep until the next scheduled vacuum. We trust that when the + * database list was built, care was taken so that no entries have times in + * the past; if the first entry has too close a next_worker value, or a + * time in the past, we will sleep a small nominal time. + */ + if (!canlaunch) + { + secs = autovacuum_naptime; + usecs = 0; + } + else if ((elem = DLGetTail(DatabaseList)) != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + TimestampTz current_time = GetCurrentTimestamp(); + TimestampTz next_wakeup; + + next_wakeup = avdb->adl_next_worker; + TimestampDifference(current_time, next_wakeup, &secs, &usecs); + } + else + { + /* list is empty, sleep for whole autovacuum_naptime seconds */ + secs = autovacuum_naptime; + usecs = 0; + } + + /* + * If the result is exactly zero, it means a database had an entry with + * time in the past. Rebuild the list so that the databases are evenly + * distributed again, and recalculate the time to sleep. 
This can happen + * if there are more tables needing vacuum than workers, and they all take + * longer to vacuum than autovacuum_naptime. + * + * We only recurse once. rebuild_database_list should always return times + * in the future, but it seems best not to rely too much on that. + */ + if (secs == 0L && usecs == 0 && !recursing) + { + rebuild_database_list(InvalidOid); + return launcher_determine_sleep(canlaunch, true); + } + + /* 100ms is the smallest time we'll allow the launcher to sleep */ + if (secs <= 0L && usecs <= 100000) + { + secs = 0L; + usecs = 100000; /* 100 ms */ + } + + return secs * 1000000 + usecs; +} + +/* + * Build an updated DatabaseList. It must only contain databases that appear + * in pgstats, and must be sorted by next_worker from highest to lowest, + * distributed regularly across the next autovacuum_naptime interval. + * + * Receives the Oid of the database that made this list be generated (we call + * this the "new" database, because when the database was already present on + * the list, we expect that this function is not called at all). The + * preexisting list, if any, will be used to preserve the order of the + * databases in the autovacuum_naptime period. The new database is put at the + * end of the interval. The actual values are not saved, which should not be + * much of a problem.
+ */ +static void +rebuild_database_list(Oid newdb) +{ + List *dblist; + ListCell *cell; + MemoryContext newcxt; + MemoryContext oldcxt; + MemoryContext tmpcxt; + HASHCTL hctl; + int score; + int nelems; + HTAB *dbhash; + + /* use fresh stats */ + pgstat_clear_snapshot(); + + newcxt = AllocSetContextCreate(AutovacMemCxt, + "AV dblist", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + tmpcxt = AllocSetContextCreate(newcxt, + "tmp AV dblist", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldcxt = MemoryContextSwitchTo(tmpcxt); + + /* + * Implementing this is not as simple as it sounds, because we need to put + * the new database at the end of the list; next the databases that were + * already on the list, and finally (at the tail of the list) all the other + * databases that are not on the existing list. + * + * To do this, we build an empty hash table of scored databases. We will + * start with the lowest score (zero) for the new database, then increasing + * scores for the databases in the existing list, in order, and lastly + * increasing scores for all databases gotten via get_database_list() that + * are not already on the hash. + * + * Then we will put all the hash elements into an array, sort the array by + * score, and finally put the array elements into the new doubly linked + * list. 
+ */ + hctl.keysize = sizeof(Oid); + hctl.entrysize = sizeof(avl_dbase); + hctl.hash = oid_hash; + hctl.hcxt = tmpcxt; + dbhash = hash_create("db hash", 20, &hctl, /* magic number here FIXME */ + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + /* start by inserting the new database */ + score = 0; + if (OidIsValid(newdb)) + { + avl_dbase *db; + PgStat_StatDBEntry *entry; + + /* only consider this database if it has a pgstat entry */ + entry = pgstat_fetch_stat_dbentry(newdb); + if (entry != NULL) + { + /* we assume it isn't found because the hash was just created */ + db = hash_search(dbhash, &newdb, HASH_ENTER, NULL); + + /* hash_search already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } + } + + /* Now insert the databases from the existing list */ + if (DatabaseList != NULL) + { + Dlelem *elem; + + elem = DLGetHead(DatabaseList); + while (elem != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + avl_dbase *db; + bool found; + PgStat_StatDBEntry *entry; + + elem = DLGetSucc(elem); + + /* + * skip databases with no stat entries -- in particular, this + * gets rid of dropped databases + */ + entry = pgstat_fetch_stat_dbentry(avdb->adl_datid); + if (entry == NULL) + continue; + + db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found); + + if (!found) + { + /* hash_search already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } + } + } + + /* finally, insert all qualifying databases not previously inserted */ + dblist = get_database_list(); + foreach(cell, dblist) + { + avw_dbase *avdb = lfirst(cell); + avl_dbase *db; + bool found; + PgStat_StatDBEntry *entry; + + /* only consider databases with a pgstat entry */ + entry = pgstat_fetch_stat_dbentry(avdb->adw_datid); + if (entry == NULL) + continue; + + db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found); + /* only update the score if the database was not already on the hash */ + if (!found) + { + /* hash_search 
already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } + } + nelems = score; + + /* from here on, the allocated memory belongs to the new list */ + MemoryContextSwitchTo(newcxt); + DatabaseList = DLNewList(); + + if (nelems > 0) + { + TimestampTz current_time; + int millis_increment; + avl_dbase *dbary; + avl_dbase *db; + HASH_SEQ_STATUS seq; + int i; + + /* put all the hash elements into an array */ + dbary = palloc(nelems * sizeof(avl_dbase)); + + i = 0; + hash_seq_init(&seq, dbhash); + while ((db = hash_seq_search(&seq)) != NULL) + memcpy(&(dbary[i++]), db, sizeof(avl_dbase)); + + /* sort the array */ + qsort(dbary, nelems, sizeof(avl_dbase), db_comparator); + + /* this is the time interval between databases in the schedule */ + millis_increment = 1000.0 * autovacuum_naptime / nelems; + current_time = GetCurrentTimestamp(); + + /* + * move the elements from the array into the dllist, setting the + * next_worker while walking the array + */ + for (i = 0; i < nelems; i++) + { + avl_dbase *db = &(dbary[i]); + Dlelem *elem; + + current_time = TimestampTzPlusMilliseconds(current_time, + millis_increment); + db->adl_next_worker = current_time; + + elem = DLNewElem(db); + /* later elements should go closer to the head of the list */ + DLAddHead(DatabaseList, elem); + } + } + + /* all done, clean up memory */ + if (DatabaseListCxt != NULL) + MemoryContextDelete(DatabaseListCxt); + MemoryContextDelete(tmpcxt); + DatabaseListCxt = newcxt; + MemoryContextSwitchTo(oldcxt); +} + +/* qsort comparator for avl_dbase, using adl_score */ +static int +db_comparator(const void *a, const void *b) +{ + if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score) + return 0; + else + return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1; +} + /* * do_start_worker * * Bare-bones procedure for starting an autovacuum worker from the launcher. 
* It determines what database to work on, sets up shared memory stuff and - * signals postmaster to start the worker. + * signals postmaster to start the worker. It fails gracefully if invoked when + * autovacuum_workers are already active. + * + * Return value is the OID of the database that the worker is going to process, + * or InvalidOid if no worker was actually started. */ -static void +static Oid do_start_worker(void) { List *dblist; - bool for_xid_wrap; - autovac_dbase *db; - ListCell *cell; + ListCell *cell; TransactionId xidForceLimit; + bool for_xid_wrap; + avw_dbase *avdb; + TimestampTz current_time; + bool skipit = false; + + /* return quickly when there are no free workers */ + LWLockAcquire(AutovacuumLock, LW_SHARED); + if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET) + { + LWLockRelease(AutovacuumLock); + return InvalidOid; + } + LWLockRelease(AutovacuumLock); + + /* use fresh stats */ + pgstat_clear_snapshot(); /* Get a list of databases */ - dblist = autovac_get_database_list(); + dblist = get_database_list(); /* * Determine the oldest datfrozenxid/relfrozenxid that we will allow @@ -495,21 +989,23 @@ do_start_worker(void) * isn't clear how to construct a metric that measures that and not cause * starvation for less busy databases. 
*/ - db = NULL; + avdb = NULL; for_xid_wrap = false; + current_time = GetCurrentTimestamp(); foreach(cell, dblist) { - autovac_dbase *tmp = lfirst(cell); + avw_dbase *tmp = lfirst(cell); + Dlelem *elem; /* Find pgstat entry if any */ - tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid); + tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid); /* Check to see if this one is at risk of wraparound */ - if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit)) + if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit)) { - if (db == NULL || - TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid)) - db = tmp; + if (avdb == NULL || + TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid)) + avdb = tmp; for_xid_wrap = true; continue; } @@ -520,26 +1016,156 @@ do_start_worker(void) * Otherwise, skip a database with no pgstat entry; it means it * hasn't seen any activity. */ - if (!tmp->ad_entry) + if (!tmp->adw_entry) + continue; + + /* + * Also, skip a database that appears on the database list as having + * been processed recently (less than autovacuum_naptime seconds ago). + * We do this so that we don't select a database which we just + * selected, but that pgstat hasn't gotten around to updating the last + * autovacuum time yet. + */ + skipit = false; + elem = DatabaseList ? DLGetTail(DatabaseList) : NULL; + + while (elem != NULL) + { + avl_dbase *dbp = DLE_VAL(elem); + + if (dbp->adl_datid == tmp->adw_datid) + { + TimestampTz curr_plus_naptime; + TimestampTz next = dbp->adl_next_worker; + + curr_plus_naptime = + TimestampTzPlusMilliseconds(current_time, + autovacuum_naptime * 1000); + + /* + * What we want here is to skip if next_worker falls between + * the current time and the current time plus naptime.
+ */ + if (timestamp_cmp_internal(current_time, next) > 0) + skipit = false; + else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0) + skipit = false; + else + skipit = true; + + break; + } + elem = DLGetPred(elem); + } + if (skipit) continue; /* * Remember the db with oldest autovac time. (If we are here, * both tmp->entry and db->entry must be non-null.) */ - if (db == NULL || - tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time) - db = tmp; + if (avdb == NULL || + tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time) + avdb = tmp; } /* Found a database -- process it */ - if (db != NULL) + if (avdb != NULL) { + WorkerInfo worker; + SHMEM_OFFSET sworker; + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); - AutoVacuumShmem->process_db = db->ad_datid; + + /* + * Get a worker entry from the freelist. We checked above, so there + * really should be a free slot -- complain very loudly if there isn't. + */ + sworker = AutoVacuumShmem->av_freeWorkers; + if (sworker == INVALID_OFFSET) + elog(FATAL, "no free worker found"); + + worker = (WorkerInfo) MAKE_PTR(sworker); + AutoVacuumShmem->av_freeWorkers = worker->wi_links.next; + + worker->wi_dboid = avdb->adw_datid; + worker->wi_workerpid = 0; + worker->wi_launchtime = GetCurrentTimestamp(); + + AutoVacuumShmem->av_startingWorker = sworker; + LWLockRelease(AutovacuumLock); SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER); + + return avdb->adw_datid; + } + else if (skipit) + { + /* + * If we skipped all databases on the list, rebuild it, because it + * probably contains a dropped database. + */ + rebuild_database_list(InvalidOid); + } + + return InvalidOid; +} + +/* + * launch_worker + * + * Wrapper for starting a worker from the launcher. Besides actually starting + * it, update the database list to reflect the next time that another one will + * need to be started on the selected database. The actual database choice is + * left to do_start_worker. 
+ * + * This routine is also expected to insert an entry into the database list if + * the selected database was previously absent from the list. The list is + * updated in place; nothing is returned. + */ +static void +launch_worker(TimestampTz now) +{ + Oid dbid; + Dlelem *elem; + + dbid = do_start_worker(); + if (OidIsValid(dbid)) + { + /* + * Walk the database list and update the corresponding entry. If the + * database is not on the list, we'll recreate the list. + */ + elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList); + while (elem != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + + if (avdb->adl_datid == dbid) + { + /* + * add autovacuum_naptime seconds to the current time, and use + * that as the new "next_worker" field for this database. + */ + avdb->adl_next_worker = + TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000); + + DLMoveToFront(elem); + break; + } + elem = DLGetSucc(elem); + } + + /* + * If the database was not present in the database list, we rebuild the + * list. It's possible that the database does not get into the list + * anyway, for example if it's a database that doesn't have a pgstat + * entry, but this is not a problem because we don't want to schedule + * workers regularly into those in any case.
+ */ + if (elem == NULL) + rebuild_database_list(dbid); } } @@ -550,6 +1176,13 @@ avl_sighup_handler(SIGNAL_ARGS) got_SIGHUP = true; } +/* SIGUSR1: a worker is up and running, or just finished */ +static void +avl_sigusr1_handler(SIGNAL_ARGS) +{ + got_SIGUSR1 = true; +} + static void avlauncher_shutdown(SIGNAL_ARGS) { @@ -665,7 +1298,7 @@ NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; - Oid dbid; + Oid dbid = InvalidOid; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; @@ -763,18 +1396,35 @@ AutoVacWorkerMain(int argc, char *argv[]) SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE); /* - * Get the database Id we're going to work on, and announce our PID - * in the shared memory area. We remove the database OID immediately - * from the shared memory area. + * Force statement_timeout to zero to avoid a timeout setting from + * preventing regular maintenance from being executed. */ - LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE); - dbid = AutoVacuumShmem->process_db; - AutoVacuumShmem->process_db = InvalidOid; - AutoVacuumShmem->worker_pid = MyProcPid; + /* + * Get the info about the database we're going to work on. 
+ */ + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker); + dbid = MyWorkerInfo->wi_dboid; + MyWorkerInfo->wi_workerpid = MyProcPid; + /* insert into the running list */ + SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers, + &MyWorkerInfo->wi_links); + /* + * remove from the "starting" pointer, so that the launcher can start a new + * worker if required + */ + AutoVacuumShmem->av_startingWorker = INVALID_OFFSET; LWLockRelease(AutovacuumLock); + on_shmem_exit(FreeWorkerInfo, 0); + + /* wake up the launcher */ + if (AutoVacuumShmem->av_launcherpid != 0) + kill(AutoVacuumShmem->av_launcherpid, SIGUSR1); + if (OidIsValid(dbid)) { char *dbname; @@ -803,7 +1453,7 @@ AutoVacWorkerMain(int argc, char *argv[]) /* Create the memory context where cross-transaction state is stored */ AutovacMemCxt = AllocSetContextCreate(TopMemoryContext, - "Autovacuum context", + "AV worker", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -814,25 +1464,152 @@ AutoVacWorkerMain(int argc, char *argv[]) } /* - * Now remove our PID from shared memory, so that the launcher can start - * another worker as soon as appropriate. + * FIXME -- we need to notify the launcher when we are gone. But this + * should be done after our PGPROC is released, in ProcKill. */ - LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); - AutoVacuumShmem->worker_pid = 0; - LWLockRelease(AutovacuumLock); /* All done, go away */ proc_exit(0); } /* - * autovac_get_database_list + * Return a WorkerInfo to the free list */ +static void +FreeWorkerInfo(int code, Datum arg) +{ + if (MyWorkerInfo != NULL) + { + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + /* + * If this worker shuts down when there is no free worker slot, wake + * the launcher up so that he can launch a new worker immediately if + * required. 
We only save the launcher's PID in local memory here -- + * the actual signal will be sent when the PGPROC is recycled, because + * that is when the new worker can actually be launched. + * + * We somewhat ignore the risk that the launcher changes its PID + * between our reading it and the actual kill; we expect ProcKill to be + * called shortly after us, and we assume that PIDs are not reused too + * quickly after a process exits. + */ + if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET) + AutovacuumLauncherPid = AutoVacuumShmem->av_launcherpid; + + SHMQueueDelete(&MyWorkerInfo->wi_links); + MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers; + MyWorkerInfo->wi_dboid = InvalidOid; + MyWorkerInfo->wi_tableoid = InvalidOid; + MyWorkerInfo->wi_workerpid = 0; + MyWorkerInfo->wi_launchtime = 0; + MyWorkerInfo->wi_cost_delay = 0; + MyWorkerInfo->wi_cost_limit = 0; + MyWorkerInfo->wi_cost_limit_base = 0; + AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo); + /* not mine anymore */ + MyWorkerInfo = NULL; + + /* + * now that we're inactive, cause a rebalancing of the surviving + * workers + */ + AutoVacuumShmem->av_rebalance = true; + LWLockRelease(AutovacuumLock); + } +} + +/* + * Update the cost-based delay parameters, so that multiple workers consume + * each a fraction of the total available I/O. + */ +void +AutoVacuumUpdateDelay(void) +{ + if (MyWorkerInfo) + { + VacuumCostDelay = MyWorkerInfo->wi_cost_delay; + VacuumCostLimit = MyWorkerInfo->wi_cost_limit; + } +} + +/* + * autovac_balance_cost + * Recalculate the cost limit setting for each active worker. + * + * Caller must hold the AutovacuumLock in exclusive mode. + */ +static void +autovac_balance_cost(void) +{ + WorkerInfo worker; + int vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ? + autovacuum_vac_cost_limit : VacuumCostLimit); + int vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
+ autovacuum_vac_cost_delay : VacuumCostDelay); + double cost_total; + double cost_avail; + + /* not set? nothing to do */ + if (vac_cost_limit <= 0 || vac_cost_delay <= 0) + return; + + /* calculate the total base cost limit of active workers */ + cost_total = 0.0; + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &AutoVacuumShmem->av_runningWorkers, + offsetof(WorkerInfoData, wi_links)); + while (worker) + { + if (worker->wi_workerpid != 0 && + worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0) + cost_total += + (double) worker->wi_cost_limit_base / worker->wi_cost_delay; + + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &worker->wi_links, + offsetof(WorkerInfoData, wi_links)); + } + /* there are no cost limits -- nothing to do */ + if (cost_total <= 0) + return; + + /* + * Adjust each cost limit of active workers to balance the total of + * cost limit to autovacuum_vacuum_cost_limit. + */ + cost_avail = (double) vac_cost_limit / vac_cost_delay; + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &AutoVacuumShmem->av_runningWorkers, + offsetof(WorkerInfoData, wi_links)); + while (worker) + { + if (worker->wi_workerpid != 0 && + worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0) + { + int limit = (int) + (cost_avail * worker->wi_cost_limit_base / cost_total); + + worker->wi_cost_limit = Min(limit, worker->wi_cost_limit_base); + + elog(DEBUG2, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)", + worker->wi_workerpid, worker->wi_dboid, + worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay); + } + + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &worker->wi_links, + offsetof(WorkerInfoData, wi_links)); + } +} + +/* + * get_database_list * * Return a list of all databases. Note we cannot use pg_database, * because we aren't connected; we use the flat database file.
*/ static List * -autovac_get_database_list(void) +get_database_list(void) { char *filename; List *dblist = NIL; @@ -852,15 +1629,15 @@ autovac_get_database_list(void) while (read_pg_database_line(db_file, thisname, &db_id, &db_tablespace, &db_frozenxid)) { - autovac_dbase *avdb; + avw_dbase *avdb; - avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase)); + avdb = (avw_dbase *) palloc(sizeof(avw_dbase)); - avdb->ad_datid = db_id; - avdb->ad_name = pstrdup(thisname); - avdb->ad_frozenxid = db_frozenxid; + avdb->adw_datid = db_id; + avdb->adw_name = pstrdup(thisname); + avdb->adw_frozenxid = db_frozenxid; /* this gets set later: */ - avdb->ad_entry = NULL; + avdb->adw_entry = NULL; dblist = lappend(dblist, avdb); } @@ -1008,12 +1785,12 @@ do_autovacuum(void) * Add to the list of tables to vacuum, the OIDs of the tables that * correspond to the saved OIDs of toast tables needing vacuum. */ - foreach (cell, toast_oids) + foreach(cell, toast_oids) { Oid toastoid = lfirst_oid(cell); ListCell *cell2; - foreach (cell2, table_toast_list) + foreach(cell2, table_toast_list) { av_relation *ar = lfirst(cell2); @@ -1038,9 +1815,55 @@ do_autovacuum(void) Oid relid = lfirst_oid(cell); autovac_table *tab; char *relname; + WorkerInfo worker; + bool skipit; CHECK_FOR_INTERRUPTS(); + /* + * hold schedule lock from here until we're sure that this table + * still needs vacuuming. We also need the AutovacuumLock to walk + * the worker array, but we'll let go of that one quickly. + */ + LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE); + LWLockAcquire(AutovacuumLock, LW_SHARED); + + /* + * Check whether the table is being vacuumed concurrently by another + * worker. 
+ */ + skipit = false; + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &AutoVacuumShmem->av_runningWorkers, + offsetof(WorkerInfoData, wi_links)); + while (worker) + { + /* ignore myself */ + if (worker == MyWorkerInfo) + goto next_worker; + + /* ignore workers in other databases */ + if (worker->wi_dboid != MyDatabaseId) + goto next_worker; + + if (worker->wi_tableoid == relid) + { + skipit = true; + break; + } + +next_worker: + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &worker->wi_links, + offsetof(WorkerInfoData, wi_links)); + } + LWLockRelease(AutovacuumLock); + if (skipit) + { + LWLockRelease(AutovacuumScheduleLock); + continue; + } + /* * Check whether pgstat data still says we need to vacuum this table. * It could have changed if something else processed the table while we @@ -1053,11 +1876,18 @@ do_autovacuum(void) if (tab == NULL) { /* someone else vacuumed the table */ + LWLockRelease(AutovacuumScheduleLock); continue; } - /* Ok, good to go! */ - /* Set the vacuum cost parameters for this table */ + /* + * Ok, good to go. Store the table in shared memory before releasing + * the lock so that other workers don't vacuum it concurrently. + */ + MyWorkerInfo->wi_tableoid = relid; + LWLockRelease(AutovacuumScheduleLock); + + /* Set the initial vacuum cost parameters for this table */ VacuumCostDelay = tab->at_vacuum_cost_delay; VacuumCostLimit = tab->at_vacuum_cost_limit; @@ -1067,6 +1897,18 @@ do_autovacuum(void) (tab->at_doanalyze ? 
" ANALYZE" : ""), relname); + /* + * Advertise my cost delay parameters for the balancing algorithm, and + * do a balance + */ + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay; + MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit; + MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit; + autovac_balance_cost(); + LWLockRelease(AutovacuumLock); + + /* have at it */ autovacuum_do_vac_analyze(tab->at_relid, tab->at_dovacuum, tab->at_doanalyze, @@ -1211,7 +2053,7 @@ table_recheck_autovac(Oid relid) PgStat_StatDBEntry *shared; PgStat_StatDBEntry *dbentry; - /* We need fresh pgstat data for this */ + /* use fresh stats */ pgstat_clear_snapshot(); shared = pgstat_fetch_stat_dbentry(InvalidOid); @@ -1219,8 +2061,8 @@ table_recheck_autovac(Oid relid) /* fetch the relation's relcache entry */ classTup = SearchSysCacheCopy(RELOID, - ObjectIdGetDatum(relid), - 0, 0, 0); + ObjectIdGetDatum(relid), + 0, 0, 0); if (!HeapTupleIsValid(classTup)) return NULL; classForm = (Form_pg_class) GETSTRUCT(classTup); @@ -1630,7 +2472,16 @@ IsAutoVacuumWorkerProcess(void) Size AutoVacuumShmemSize(void) { - return sizeof(AutoVacuumShmemStruct); + Size size; + + /* + * Need the fixed struct and the array of WorkerInfoData. 
+ */ + size = sizeof(AutoVacuumShmemStruct); + size = MAXALIGN(size); + size = add_size(size, mul_size(autovacuum_max_workers, + sizeof(WorkerInfoData))); + return size; } /* @@ -1650,8 +2501,29 @@ AutoVacuumShmemInit(void) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough shared memory for autovacuum"))); - if (found) - return; /* already initialized */ - MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct)); + if (!IsUnderPostmaster) + { + WorkerInfo worker; + int i; + + Assert(!found); + + AutoVacuumShmem->av_launcherpid = 0; + AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET; + SHMQueueInit(&AutoVacuumShmem->av_runningWorkers); + AutoVacuumShmem->av_startingWorker = INVALID_OFFSET; + + worker = (WorkerInfo) ((char *) AutoVacuumShmem + + MAXALIGN(sizeof(AutoVacuumShmemStruct))); + + /* initialize the WorkerInfo free list */ + for (i = 0; i < autovacuum_max_workers; i++) + { + worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers; + AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]); + } + } + else + Assert(found); } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 2691a50582..e2b8f22d35 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.187 2007/04/03 16:34:36 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.188 2007/04/16 18:29:53 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -96,7 +96,7 @@ ProcGlobalShmemSize(void) size = add_size(size, sizeof(PROC_HDR)); /* AuxiliaryProcs */ size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC))); - /* MyProcs */ + /* MyProcs, including autovacuum */ size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC))); /* ProcStructLock */ size = add_size(size, sizeof(slock_t)); @@ -110,7 +110,10 @@ ProcGlobalShmemSize(void) int ProcGlobalSemas(void) { - /* 
We need a sema per backend, plus one for each auxiliary process. */ + /* + * We need a sema per backend (including autovacuum), plus one for each + * auxiliary process. + */ return MaxBackends + NUM_AUXILIARY_PROCS; } @@ -127,8 +130,8 @@ ProcGlobalSemas(void) * running out when trying to start another backend is a common failure. * So, now we grab enough semaphores to support the desired max number * of backends immediately at initialization --- if the sysadmin has set - * MaxBackends higher than his kernel will support, he'll find out sooner - * rather than later. + * MaxConnections or autovacuum_max_workers higher than his kernel will + * support, he'll find out sooner rather than later. * * Another reason for creating semaphores here is that the semaphore * implementation typically requires us to create semaphores in the @@ -163,25 +166,39 @@ InitProcGlobal(void) * Initialize the data structures. */ ProcGlobal->freeProcs = INVALID_OFFSET; + ProcGlobal->autovacFreeProcs = INVALID_OFFSET; ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY; /* * Pre-create the PGPROC structures and create a semaphore for each. 
*/ - procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC)); + procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC)); if (!procs) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"))); - MemSet(procs, 0, MaxBackends * sizeof(PGPROC)); - for (i = 0; i < MaxBackends; i++) + MemSet(procs, 0, MaxConnections * sizeof(PGPROC)); + for (i = 0; i < MaxConnections; i++) { PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = ProcGlobal->freeProcs; ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]); } + procs = (PGPROC *) ShmemAlloc((autovacuum_max_workers) * sizeof(PGPROC)); + if (!procs) + ereport(FATAL, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"))); + MemSet(procs, 0, autovacuum_max_workers * sizeof(PGPROC)); + for (i = 0; i < autovacuum_max_workers; i++) + { + PGSemaphoreCreate(&(procs[i].sem)); + procs[i].links.next = ProcGlobal->autovacFreeProcs; + ProcGlobal->autovacFreeProcs = MAKE_OFFSET(&procs[i]); + } + MemSet(AuxiliaryProcs, 0, NUM_AUXILIARY_PROCS * sizeof(PGPROC)); for (i = 0; i < NUM_AUXILIARY_PROCS; i++) { @@ -226,12 +243,18 @@ InitProcess(void) set_spins_per_delay(procglobal->spins_per_delay); - myOffset = procglobal->freeProcs; + if (IsAutoVacuumWorkerProcess()) + myOffset = procglobal->autovacFreeProcs; + else + myOffset = procglobal->freeProcs; if (myOffset != INVALID_OFFSET) { MyProc = (PGPROC *) MAKE_PTR(myOffset); - procglobal->freeProcs = MyProc->links.next; + if (IsAutoVacuumWorkerProcess()) + procglobal->autovacFreeProcs = MyProc->links.next; + else + procglobal->freeProcs = MyProc->links.next; SpinLockRelease(ProcStructLock); } else @@ -239,7 +262,8 @@ InitProcess(void) /* * If we reach here, all the PGPROCs are in use. This is one of the * possible places to detect "too many backends", so give the standard - * error message. + * error message. XXX do we need to give a different failure message + * in the autovacuum case? 
*/ SpinLockRelease(ProcStructLock); ereport(FATAL, @@ -571,8 +595,16 @@ ProcKill(int code, Datum arg) SpinLockAcquire(ProcStructLock); /* Return PGPROC structure (and semaphore) to freelist */ - MyProc->links.next = procglobal->freeProcs; - procglobal->freeProcs = MAKE_OFFSET(MyProc); + if (IsAutoVacuumWorkerProcess()) + { + MyProc->links.next = procglobal->autovacFreeProcs; + procglobal->autovacFreeProcs = MAKE_OFFSET(MyProc); + } + else + { + MyProc->links.next = procglobal->freeProcs; + procglobal->freeProcs = MAKE_OFFSET(MyProc); + } /* PGPROC struct isn't mine anymore */ MyProc = NULL; @@ -581,6 +613,10 @@ ProcKill(int code, Datum arg) procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay); SpinLockRelease(ProcStructLock); + + /* wake autovac launcher if needed -- see comments in FreeWorkerInfo */ + if (AutovacuumLauncherPid != 0) + kill(AutovacuumLauncherPid, SIGUSR1); } /* diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 130244b5e5..b5b1150056 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.100 2007/01/05 22:19:44 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.101 2007/04/16 18:29:54 alvherre Exp $ * * NOTES * Globals used all over the place should be declared here and not @@ -95,9 +95,14 @@ bool allowSystemTableMods = false; int work_mem = 1024; int maintenance_work_mem = 16384; -/* Primary determinants of sizes of shared-memory structures: */ +/* + * Primary determinants of sizes of shared-memory structures. 
MaxBackends is + * MaxConnections + autovacuum_max_workers (it is computed by the GUC assign + * hook): + */ int NBuffers = 1000; int MaxBackends = 100; +int MaxConnections = 90; int VacuumCostPageHit = 1; /* GUC parameters for vacuum */ int VacuumCostPageMiss = 10; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 385411c058..83ea00c568 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -10,7 +10,7 @@ * Written by Peter Eisentraut . * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.384 2007/04/12 06:53:47 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.385 2007/04/16 18:29:55 alvherre Exp $ * *-------------------------------------------------------------------- */ @@ -163,6 +163,8 @@ static bool assign_tcp_keepalives_count(int newval, bool doit, GucSource source) static const char *show_tcp_keepalives_idle(void); static const char *show_tcp_keepalives_interval(void); static const char *show_tcp_keepalives_count(void); +static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source); +static bool assign_maxconnections(int newval, bool doit, GucSource source); /* * GUC option variables that are exported from this module @@ -1149,16 +1151,19 @@ static struct config_int ConfigureNamesInt[] = * number. * * MaxBackends is limited to INT_MAX/4 because some places compute - * 4*MaxBackends without any overflow check. Likewise we have to limit - * NBuffers to INT_MAX/2. + * 4*MaxBackends without any overflow check. This check is made on + * assign_maxconnections, since MaxBackends is computed as MaxConnections + + * autovacuum_max_workers. + * + * Likewise we have to limit NBuffers to INT_MAX/2. 
*/ { {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the maximum number of concurrent connections."), NULL }, - &MaxBackends, - 100, 1, INT_MAX / 4, NULL, NULL + &MaxConnections, + 100, 1, INT_MAX / 4, assign_maxconnections, NULL }, { @@ -1622,6 +1627,15 @@ static struct config_int ConfigureNamesInt[] = &autovacuum_freeze_max_age, 200000000, 100000000, 2000000000, NULL, NULL }, + { + /* see max_connections */ + {"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM, + gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."), + NULL + }, + &autovacuum_max_workers, + 3, 1, INT_MAX / 4, assign_autovacuum_max_workers, NULL + }, { {"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER, @@ -6692,5 +6706,32 @@ show_tcp_keepalives_count(void) return nbuf; } +static bool +assign_maxconnections(int newval, bool doit, GucSource source) +{ + if (doit) + { + if (newval + autovacuum_max_workers > INT_MAX / 4) + return false; + + MaxBackends = newval + autovacuum_max_workers; + } + + return true; +} + +static bool +assign_autovacuum_max_workers(int newval, bool doit, GucSource source) +{ + if (doit) + { + if (newval + MaxConnections > INT_MAX / 4) + return false; + + MaxBackends = newval + MaxConnections; + } + + return true; +} #include "guc-file.c" diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 22f9685bbd..bc5b642d02 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -376,6 +376,7 @@ #autovacuum = on # enable autovacuum subprocess? 
# 'on' requires stats_start_collector # and stats_row_level to also be on +#autovacuum_max_workers = 3 # max # of autovacuum subprocesses #autovacuum_naptime = 1min # time between autovacuum runs #autovacuum_vacuum_threshold = 500 # min # of tuple updates before # vacuum diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index ca5cc799c5..c5090581c5 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -13,7 +13,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.193 2007/03/01 14:52:04 petere Exp $ + * $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.194 2007/04/16 18:29:56 alvherre Exp $ * * NOTES * some of the information in this file should be moved to other files. @@ -129,6 +129,7 @@ extern DLLIMPORT char *DataDir; extern DLLIMPORT int NBuffers; extern int MaxBackends; +extern int MaxConnections; extern DLLIMPORT int MyProcPid; extern DLLIMPORT struct Port *MyProcPort; diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h index facf9de52b..ccd982b681 100644 --- a/src/include/postmaster/autovacuum.h +++ b/src/include/postmaster/autovacuum.h @@ -7,15 +7,18 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/postmaster/autovacuum.h,v 1.8 2007/02/15 23:23:23 alvherre Exp $ + * $PostgreSQL: pgsql/src/include/postmaster/autovacuum.h,v 1.9 2007/04/16 18:30:03 alvherre Exp $ * *------------------------------------------------------------------------- */ #ifndef AUTOVACUUM_H #define AUTOVACUUM_H +#include "storage/lock.h" + /* GUC variables */ extern bool autovacuum_start_daemon; +extern int autovacuum_max_workers; extern int autovacuum_naptime; extern int autovacuum_vac_thresh; extern double autovacuum_vac_scale; @@ -25,6 +28,9 @@ 
extern int autovacuum_freeze_max_age; extern int autovacuum_vac_cost_delay; extern int autovacuum_vac_cost_limit; +/* autovacuum launcher PID, only valid when worker is shutting down */ +extern int AutovacuumLauncherPid; + /* Status inquiry functions */ extern bool AutoVacuumingActive(void); extern bool IsAutoVacuumLauncherProcess(void); @@ -35,6 +41,9 @@ extern void autovac_init(void); extern int StartAutoVacLauncher(void); extern int StartAutoVacWorker(void); +/* autovacuum cost-delay balancer */ +extern void AutoVacuumUpdateDelay(void); + #ifdef EXEC_BACKEND extern void AutoVacLauncherMain(int argc, char *argv[]); extern void AutoVacWorkerMain(int argc, char *argv[]); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index c47256a159..477284b7d1 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.35 2007/04/03 16:34:36 tgl Exp $ + * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.36 2007/04/16 18:30:04 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -61,6 +61,7 @@ typedef enum LWLockId BtreeVacuumLock, AddinShmemInitLock, AutovacuumLock, + AutovacuumScheduleLock, /* Individual lock IDs end here */ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 772cf52cdf..1fd4e264f0 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.97 2007/04/03 16:34:36 tgl Exp $ + * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.98 
2007/04/16 18:30:04 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -115,6 +115,8 @@ typedef struct PROC_HDR { /* Head of list of free PGPROC structures */ SHMEM_OFFSET freeProcs; + /* Head of list of autovacuum's free PGPROC structures */ + SHMEM_OFFSET autovacFreeProcs; /* Current shared estimate of appropriate spins_per_delay value */ int spins_per_delay; } PROC_HDR; -- GitLab