pg_backup_archiver.c 110.6 KB
Newer Older
B
Bruce Momjian 已提交
1 2 3 4 5 6 7 8 9 10
/*-------------------------------------------------------------------------
 *
 * pg_backup_archiver.c
 *
 *	Private implementation of the archiver routines.
 *
 *	See the headers to pg_restore for more details.
 *
 * Copyright (c) 2000, Philip Warner
 *	Rights are granted to use this software in any way so long
 *	as this notice is not removed.
 *
 *	The author is not responsible for loss or damages that may
 *	result from its use.
 *
 *
 * IDENTIFICATION
 *		src/bin/pg_dump/pg_backup_archiver.c
 *
 *-------------------------------------------------------------------------
 */
22
#include "postgres_fe.h"
B
Bruce Momjian 已提交
23

24 25
#include "parallel.h"
#include "pg_backup_archiver.h"
26
#include "pg_backup_db.h"
27
#include "pg_backup_utils.h"
28

29
#include <ctype.h>
A
Andrew Dunstan 已提交
30
#include <fcntl.h>
31
#include <unistd.h>
32
#include <sys/stat.h>
33 34
#include <sys/types.h>
#include <sys/wait.h>
B
Bruce Momjian 已提交
35

36 37 38 39
#ifdef WIN32
#include <io.h>
#endif

40
#include "libpq/libpq-fs.h"
41

42 43 44
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"

45 46 47 48 49 50 51
/*
 * State needed to save/restore an archive's output target.
 *
 * Filled in by SaveOutput() and consumed by RestoreOutput(), so that a
 * restore can temporarily redirect output (e.g. to ropt->filename) and then
 * put the original target back.
 */
typedef struct _outputContext
{
	void	   *OF;			/* saved output handle; presumably a FILE * or a
							 * gzip handle depending on gzOut -- confirm
							 * against SaveOutput() */
	int			gzOut;		/* saved flag: nonzero if OF is a compressed
							 * (gzip) stream */
} OutputContext;

52
/* translator: this is a module name */
53
static const char *modulename = gettext_noop("archiver");
54 55


B
Bruce Momjian 已提交
56
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
A
Andrew Dunstan 已提交
57
	 const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr);
58
static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
B
Bruce Momjian 已提交
59
					  ArchiveHandle *AH);
60
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);
61
static char *replace_line_endings(const char *str);
62
static void _doSetFixedOutputState(ArchiveHandle *AH);
63
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
64
static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
65
static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
66 67
static void _becomeUser(ArchiveHandle *AH, const char *user);
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
68
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
69
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
70 71
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
72
static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt);
73
static bool _tocEntryIsACL(TocEntry *te);
B
Bruce Momjian 已提交
74 75
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
76
static void buildTocEntryArrays(ArchiveHandle *AH);
77
static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
B
Bruce Momjian 已提交
78
static int	_discoverArchiveFormat(ArchiveHandle *AH);
B
Bruce Momjian 已提交
79

80
static int	RestoringToDB(ArchiveHandle *AH);
81
static void dump_lo_buf(ArchiveHandle *AH);
82
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
83
static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
84 85
static OutputContext SaveOutput(ArchiveHandle *AH);
static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
86

87
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
88
				  RestoreOptions *ropt, bool is_parallel);
A
Andrew Dunstan 已提交
89 90 91 92
static void restore_toc_entries_prefork(ArchiveHandle *AH);
static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
							 TocEntry *pending_list);
static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
93 94 95
static void par_list_header_init(TocEntry *l);
static void par_list_append(TocEntry *l, TocEntry *te);
static void par_list_remove(TocEntry *te);
96
static TocEntry *get_next_work_item(ArchiveHandle *AH,
97
				   TocEntry *ready_list,
A
Andrew Dunstan 已提交
98
				   ParallelState *pstate);
99
static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
A
Andrew Dunstan 已提交
100 101
			   int worker, int status,
			   ParallelState *pstate);
102
static void fix_dependencies(ArchiveHandle *AH);
103
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
104 105
static void repoint_table_dependencies(ArchiveHandle *AH);
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
106
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
B
Bruce Momjian 已提交
107
					TocEntry *ready_list);
108 109
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
110

111
/*
T
Tom Lane 已提交
112
 * Allocate a new DumpOptions block containing all default values.
113 114 115 116
 */
DumpOptions *
NewDumpOptions(void)
{
T
Tom Lane 已提交
117
	DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
118

T
Tom Lane 已提交
119 120 121
	InitDumpOptions(opts);
	return opts;
}
122

T
Tom Lane 已提交
123 124 125 126 127 128 129
/*
 * InitDumpOptions
 *		Reset *opts to pg_dump's default option values.
 *
 * Everything is zeroed first; only the fields whose default is not zero are
 * then assigned explicitly.
 */
void
InitDumpOptions(DumpOptions *opts)
{
	memset(opts, 0, sizeof(*opts));

	opts->dumpSections = DUMP_UNSECTIONED;
	opts->include_everything = true;
}

/*
 * dumpOptionsFromRestoreOptions
 *		Build a freshly allocated DumpOptions whose settings mirror those of
 *		the given RestoreOptions.
 *
 * This is the inverse of what's at the end of pg_dump.c's main().  Any field
 * not copied below keeps the default established by NewDumpOptions().  The
 * caller owns the result.
 */
DumpOptions *
dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
{
	DumpOptions *result = NewDumpOptions();

	/* Which objects / sections to include */
	result->include_everything = ropt->include_everything;
	result->dataOnly = ropt->dataOnly;
	result->schemaOnly = ropt->schemaOnly;
	result->dumpSections = ropt->dumpSections;
	result->aclsSkip = ropt->aclsSkip;
	result->no_security_labels = ropt->no_security_labels;

	/* How DDL and ownership are emitted */
	result->outputClean = ropt->dropSchema;
	result->if_exists = ropt->if_exists;
	result->outputCreateDB = ropt->createDB;
	result->outputNoOwner = ropt->noOwner;
	result->outputNoTablespaces = ropt->noTablespace;
	result->outputSuperuser = ropt->superuser;
	result->disable_triggers = ropt->disable_triggers;
	result->use_setsessauth = ropt->use_setsessauth;
	result->disable_dollar_quoting = ropt->disable_dollar_quoting;

	/* How table data is emitted */
	result->dump_inserts = ropt->dump_inserts;
	result->column_inserts = ropt->column_inserts;

	/* Session behavior */
	result->lockWaitTimeout = ropt->lockWaitTimeout;
	result->enable_row_security = ropt->enable_row_security;

	return result;
}


B
Bruce Momjian 已提交
170
/*
 *	Wrapper functions.
 *
 *	The objective is to make writing new formats and dumpers as simple
 *	as possible, if necessary at the expense of extra function calls etc.
 *
 */

A
Andrew Dunstan 已提交
178 179 180 181 182 183
/*
 * The dump worker setup needs lots of knowledge of the internals of pg_dump,
 * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker
 * setup doesn't need to know anything much, so it's defined here.
 */
static void
184
setupRestoreWorker(Archive *AHX, DumpOptions *dopt, RestoreOptions *ropt)
A
Andrew Dunstan 已提交
185 186 187 188 189 190
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;

	(AH->ReopenPtr) (AH);
}

B
Bruce Momjian 已提交
191 192 193

/* Create a new archive */
/* Public */
194
Archive *
B
Bruce Momjian 已提交
195
CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
A
Andrew Dunstan 已提交
196
	 const int compression, ArchiveMode mode, SetupWorkerPtr setupDumpWorker)
197

B
Bruce Momjian 已提交
198
{
A
Andrew Dunstan 已提交
199
	ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode, setupDumpWorker);
B
Bruce Momjian 已提交
200 201

	return (Archive *) AH;
B
Bruce Momjian 已提交
202 203 204 205
}

/* Open an existing archive */
/* Public */
206
Archive *
B
Bruce Momjian 已提交
207
OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
B
Bruce Momjian 已提交
208
{
A
Andrew Dunstan 已提交
209
	ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead, setupRestoreWorker);
B
Bruce Momjian 已提交
210 211

	return (Archive *) AH;
B
Bruce Momjian 已提交
212 213 214
}

/* Public */
B
Bruce Momjian 已提交
215
void
216
CloseArchive(Archive *AHX, DumpOptions *dopt)
B
Bruce Momjian 已提交
217
{
B
Bruce Momjian 已提交
218 219 220
	int			res = 0;
	ArchiveHandle *AH = (ArchiveHandle *) AHX;

221
	(*AH->ClosePtr) (AH, dopt);
B
Bruce Momjian 已提交
222

B
Bruce Momjian 已提交
223 224
	/* Close the output */
	if (AH->gzOut)
225
		res = GZCLOSE(AH->OF);
B
Bruce Momjian 已提交
226
	else if (AH->OF != stdout)
227 228 229
		res = fclose(AH->OF);

	if (res != 0)
230 231
		exit_horribly(modulename, "could not close output file: %s\n",
					  strerror(errno));
B
Bruce Momjian 已提交
232 233 234
}

/* Public */
B
Bruce Momjian 已提交
235
void
236
SetArchiveRestoreOptions(Archive *AHX, RestoreOptions *ropt)
B
Bruce Momjian 已提交
237
{
B
Bruce Momjian 已提交
238
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
239 240 241 242 243 244 245 246 247 248
	TocEntry   *te;
	teSection	curSection;

	/* Save options for later access */
	AH->ropt = ropt;

	/* Decide which TOC entries will be dumped/restored, and mark them */
	curSection = SECTION_PRE_DATA;
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
		/*
		 * When writing an archive, we also take this opportunity to check
		 * that we have generated the entries in a sane order that respects
		 * the section divisions.  When reading, don't complain, since buggy
		 * old versions of pg_dump might generate out-of-order archives.
		 */
		if (AH->mode != archModeRead)
		{
			switch (te->section)
			{
				case SECTION_NONE:
					/* ok to be anywhere */
					break;
				case SECTION_PRE_DATA:
					if (curSection != SECTION_PRE_DATA)
						write_msg(modulename,
								  "WARNING: archive items not in correct section order\n");
					break;
				case SECTION_DATA:
					if (curSection == SECTION_POST_DATA)
						write_msg(modulename,
								  "WARNING: archive items not in correct section order\n");
					break;
				case SECTION_POST_DATA:
					/* ok no matter which section we were in */
					break;
				default:
					exit_horribly(modulename, "unexpected section code %d\n",
								  (int) te->section);
					break;
			}
		}

282 283
		if (te->section != SECTION_NONE)
			curSection = te->section;
284

285 286 287 288 289 290 291 292 293 294
		te->reqs = _tocEntryRequired(te, curSection, ropt);
	}
}

/* Public */
void
RestoreArchive(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->ropt;
295
	bool		parallel_mode;
296
	TocEntry   *te;
B
Bruce Momjian 已提交
297
	OutputContext sav;
B
Bruce Momjian 已提交
298

299
	AH->stage = STAGE_INITIALIZING;
300

301 302 303
	/*
	 * Check for nonsensical option combinations.
	 *
304
	 * -C is not compatible with -1, because we can't create a database inside
305
	 * a transaction block.
306
	 */
307
	if (ropt->createDB && ropt->single_txn)
308
		exit_horribly(modulename, "-C and -1 are incompatible options\n");
309

310 311 312
	/*
	 * If we're going to do parallel restore, there are some restrictions.
	 */
A
Andrew Dunstan 已提交
313
	parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
314 315 316 317
	if (parallel_mode)
	{
		/* We haven't got round to making this work for all archive formats */
		if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
318
			exit_horribly(modulename, "parallel restore is not supported with this archive file format\n");
319 320 321

		/* Doesn't work if the archive represents dependencies as OIDs */
		if (AH->version < K_VERS_1_8)
322
			exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
323 324 325 326 327 328 329 330

		/*
		 * It's also not gonna work if we can't reopen the input file, so
		 * let's try that immediately.
		 */
		(AH->ReopenPtr) (AH);
	}

331 332 333 334
	/*
	 * Make sure we won't need (de)compression we haven't got
	 */
#ifndef HAVE_LIBZ
335
	if (AH->compression != 0 && AH->PrintTocDataPtr !=NULL)
336 337 338
	{
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
339
			if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
340
				exit_horribly(modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
341 342 343 344
		}
	}
#endif

345 346 347 348 349 350 351
	/*
	 * Prepare index arrays, so we can assume we have them throughout restore.
	 * It's possible we already did this, though.
	 */
	if (AH->tocsByDumpId == NULL)
		buildTocEntryArrays(AH);

352 353 354 355 356
	/*
	 * If we're using a DB connection, then connect it.
	 */
	if (ropt->useDB)
	{
357
		ahlog(AH, 1, "connecting to database for restore\n");
358
		if (AH->version < K_VERS_1_3)
359
			exit_horribly(modulename, "direct database connections are not supported in pre-1.3 archives\n");
360

361 362 363 364 365 366
		/*
		 * We don't want to guess at whether the dump will successfully
		 * restore; allow the attempt regardless of the version of the restore
		 * target.
		 */
		AHX->minRemoteVersion = 0;
367 368
		AHX->maxRemoteVersion = 999999;

369 370
		ConnectDatabase(AHX, ropt->dbname,
						ropt->pghost, ropt->pgport, ropt->username,
371
						ropt->promptPassword);
B
Bruce Momjian 已提交
372 373

		/*
B
Bruce Momjian 已提交
374 375
		 * If we're talking to the DB directly, don't send comments since they
		 * obscure SQL when displaying errors
B
Bruce Momjian 已提交
376
		 */
377
		AH->noTocComments = 1;
378 379
	}

380
	/*
B
Bruce Momjian 已提交
381 382 383 384
	 * Work out if we have an implied data-only restore. This can happen if
	 * the dump was data only or if the user has used a toc list to exclude
	 * all of the schema data. All we do is look for schema entries - if none
	 * are found then we set the dataOnly flag.
385
	 *
B
Bruce Momjian 已提交
386
	 * We could scan for wanted TABLE entries, but that is not the same as
387
	 * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
B
Bruce Momjian 已提交
388 389 390
	 */
	if (!ropt->dataOnly)
	{
B
Bruce Momjian 已提交
391
		int			impliedDataOnly = 1;
392 393

		for (te = AH->toc->next; te != AH->toc; te = te->next)
B
Bruce Momjian 已提交
394
		{
395
			if ((te->reqs & REQ_SCHEMA) != 0)
B
Bruce Momjian 已提交
396
			{					/* It's schema, and it's wanted */
397 398 399 400 401 402 403
				impliedDataOnly = 0;
				break;
			}
		}
		if (impliedDataOnly)
		{
			ropt->dataOnly = impliedDataOnly;
404
			ahlog(AH, 1, "implied data-only restore\n");
405
		}
B
Bruce Momjian 已提交
406
	}
407

408
	/*
B
Bruce Momjian 已提交
409
	 * Setup the output file if necessary.
B
Bruce Momjian 已提交
410
	 */
411
	sav = SaveOutput(AH);
B
Bruce Momjian 已提交
412
	if (ropt->filename || ropt->compression)
413
		SetOutput(AH, ropt->filename, ropt->compression);
B
Bruce Momjian 已提交
414

415
	ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
B
Bruce Momjian 已提交
416

417 418 419 420 421 422 423 424 425
	if (AH->archiveRemoteVersion)
		ahprintf(AH, "-- Dumped from database version %s\n",
				 AH->archiveRemoteVersion);
	if (AH->archiveDumpVersion)
		ahprintf(AH, "-- Dumped by pg_dump version %s\n",
				 AH->archiveDumpVersion);

	ahprintf(AH, "\n");

426
	if (AH->public.verbose)
427 428
		dumpTimestamp(AH, "Started on", AH->createDate);

429
	if (ropt->single_txn)
430 431
	{
		if (AH->connection)
432
			StartTransaction(AHX);
433 434 435
		else
			ahprintf(AH, "BEGIN;\n\n");
	}
436

437 438 439
	/*
	 * Enable row-security if necessary.
	 */
S
Stephen Frost 已提交
440 441 442 443 444 445 446
	if (PQserverVersion(AH->connection) >= 90500)
	{
		if (!ropt->enable_row_security)
			ahprintf(AH, "SET row_security = off;\n");
		else
			ahprintf(AH, "SET row_security = on;\n");
	}
447

448 449 450 451 452
	/*
	 * Establish important parameter values right away.
	 */
	_doSetFixedOutputState(AH);

453 454
	AH->stage = STAGE_PROCESSING;

B
Bruce Momjian 已提交
455 456
	/*
	 * Drop the items at the start, in reverse order
457
	 */
B
Bruce Momjian 已提交
458 459
	if (ropt->dropSchema)
	{
460
		for (te = AH->toc->prev; te != AH->toc; te = te->prev)
B
Bruce Momjian 已提交
461
		{
462 463
			AH->currentTE = te;

464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
			/*
			 * In createDB mode, issue a DROP *only* for the database as a
			 * whole.  Issuing drops against anything else would be wrong,
			 * because at this point we're connected to the wrong database.
			 * Conversely, if we're not in createDB mode, we'd better not
			 * issue a DROP against the database at all.
			 */
			if (ropt->createDB)
			{
				if (strcmp(te->desc, "DATABASE") != 0)
					continue;
			}
			else
			{
				if (strcmp(te->desc, "DATABASE") == 0)
					continue;
			}

			/* Otherwise, drop anything that's selected and has a dropStmt */
483
			if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
484
			{
485
				ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
486
				/* Select owner and schema as necessary */
487
				_becomeOwner(AH, te);
488
				_selectOutputSchema(AH, te->namespace);
489 490 491 492 493 494 495 496 497 498

				/*
				 * Now emit the DROP command, if the object has one.  Note we
				 * don't necessarily emit it verbatim; at this point we add an
				 * appropriate IF EXISTS clause, if the user requested it.
				 */
				if (*te->dropStmt != '\0')
				{
					if (!ropt->if_exists)
					{
B
Bruce Momjian 已提交
499
						/* No --if-exists?	Then just use the original */
500 501 502 503 504
						ahprintf(AH, "%s", te->dropStmt);
					}
					else
					{
						/*
505 506 507 508 509 510
						 * Inject an appropriate spelling of "if exists".  For
						 * large objects, we have a separate routine that
						 * knows how to do it, without depending on
						 * te->dropStmt; use that.  For other objects we need
						 * to parse the command.
						 *
511
						 */
512
						if (strncmp(te->desc, "BLOB", 4) == 0)
513
						{
514
							DropBlobIfExists(AH, te->catalogId.oid);
515 516 517
						}
						else
						{
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
							char		buffer[40];
							char	   *mark;
							char	   *dropStmt = pg_strdup(te->dropStmt);
							char	   *dropStmtPtr = dropStmt;
							PQExpBuffer ftStmt = createPQExpBuffer();

							/*
							 * Need to inject IF EXISTS clause after ALTER
							 * TABLE part in ALTER TABLE .. DROP statement
							 */
							if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
							{
								appendPQExpBuffer(ftStmt,
												  "ALTER TABLE IF EXISTS");
								dropStmt = dropStmt + 11;
							}

							/*
							 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
							 * not support the IF EXISTS clause, and therefore
							 * we simply emit the original command for such
							 * objects. For other objects, we need to extract
							 * the first part of the DROP which includes the
							 * object type. Most of the time this matches
							 * te->desc, so search for that; however for the
							 * different kinds of CONSTRAINTs, we know to
							 * search for hardcoded "DROP CONSTRAINT" instead.
							 */
							if (strcmp(te->desc, "DEFAULT") == 0)
								appendPQExpBuffer(ftStmt, "%s", dropStmt);
548
							else
549 550 551 552 553 554 555 556
							{
								if (strcmp(te->desc, "CONSTRAINT") == 0 ||
								 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
									strcmp(te->desc, "FK CONSTRAINT") == 0)
									strcpy(buffer, "DROP CONSTRAINT");
								else
									snprintf(buffer, sizeof(buffer), "DROP %s",
											 te->desc);
557

558 559
								mark = strstr(dropStmt, buffer);
								Assert(mark != NULL);
560

561 562 563 564 565
								*mark = '\0';
								appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
												  dropStmt, buffer,
												  mark + strlen(buffer));
							}
566

567
							ahprintf(AH, "%s", ftStmt->data);
568

569
							destroyPQExpBuffer(ftStmt);
570

571 572
							pg_free(dropStmtPtr);
						}
573 574
					}
				}
575 576
			}
		}
577 578 579 580 581 582 583 584 585 586 587 588 589 590 591

		/*
		 * _selectOutputSchema may have set currSchema to reflect the effect
		 * of a "SET search_path" command it emitted.  However, by now we may
		 * have dropped that schema; or it might not have existed in the first
		 * place.  In either case the effective value of search_path will not
		 * be what we think.  Forcibly reset currSchema so that we will
		 * re-establish the search_path setting when needed (after creating
		 * the schema).
		 *
		 * If we treated users as pg_dump'able objects then we'd need to reset
		 * currUser here too.
		 */
		if (AH->currSchema)
			free(AH->currSchema);
592
		AH->currSchema = NULL;
B
Bruce Momjian 已提交
593
	}
B
Bruce Momjian 已提交
594

595
	/*
596 597 598
	 * In serial mode, we now process each non-ACL TOC entry.
	 *
	 * In parallel mode, turn control over to the parallel-restore logic.
599
	 */
600
	if (parallel_mode)
A
Andrew Dunstan 已提交
601 602 603 604 605 606 607 608 609 610 611
	{
		ParallelState *pstate;
		TocEntry	pending_list;

		par_list_header_init(&pending_list);

		/* This runs PRE_DATA items and then disconnects from the database */
		restore_toc_entries_prefork(AH);
		Assert(AH->connection == NULL);

		/* ParallelBackupStart() will actually fork the processes */
612
		pstate = ParallelBackupStart(AH, NULL, ropt);
A
Andrew Dunstan 已提交
613 614 615 616 617 618 619
		restore_toc_entries_parallel(AH, pstate, &pending_list);
		ParallelBackupEnd(AH, pstate);

		/* reconnect the master and see if we missed something */
		restore_toc_entries_postfork(AH, &pending_list);
		Assert(AH->connection != NULL);
	}
620
	else
B
Bruce Momjian 已提交
621
	{
622 623 624
		for (te = AH->toc->next; te != AH->toc; te = te->next)
			(void) restore_toc_entry(AH, te, ropt, false);
	}
B
Bruce Momjian 已提交
625

626 627 628
	/*
	 * Scan TOC again to output ownership commands and ACLs
	 */
629
	for (te = AH->toc->next; te != AH->toc; te = te->next)
630
	{
631 632
		AH->currentTE = te;

633
		/* Both schema and data objects might now have ownership/ACLs */
634
		if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
635
		{
636 637 638 639 640 641 642
			/* Show namespace if available */
			if (te->namespace)
				ahlog(AH, 1, "setting owner and privileges for %s \"%s\".\"%s\"\n",
					  te->desc, te->namespace, te->tag);
			else
				ahlog(AH, 1, "setting owner and privileges for %s \"%s\"\n",
					  te->desc, te->tag);
643 644 645 646
			_printTocEntry(AH, te, ropt, false, true);
		}
	}

647
	if (ropt->single_txn)
648 649
	{
		if (AH->connection)
650
			CommitTransaction(AHX);
651 652 653
		else
			ahprintf(AH, "COMMIT;\n\n");
	}
B
Bruce Momjian 已提交
654

655 656 657
	if (AH->public.verbose)
		dumpTimestamp(AH, "Completed on", time(NULL));

658 659
	ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");

660
	/*
661
	 * Clean up & we're done.
662
	 */
663 664
	AH->stage = STAGE_FINALIZING;

665
	if (ropt->filename || ropt->compression)
666
		RestoreOutput(AH, sav);
667 668

	if (ropt->useDB)
R
Robert Haas 已提交
669
		DisconnectDatabase(&AH->public);
B
Bruce Momjian 已提交
670 671
}

672 673 674 675 676 677 678 679 680 681 682
/*
 * restore_toc_entry
 *		Restore the schema and/or data part of a single TOC item.
 *
 * Used by both serial and parallel restore; is_parallel is true only when
 * we are running in a parallel-restore worker child process.
 *
 * Returns WORKER_OK normally.  In a worker, WORKER_CREATE_DONE or
 * WORKER_INHIBIT_DATA tells the parallel parent it must make the
 * corresponding status update itself.  If AH->public.n_errors is nonzero and
 * no other status was set, WORKER_IGNORED_ERRORS is returned instead.
 */
static int
restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
				  RestoreOptions *ropt, bool is_parallel)
{
	int			result = WORKER_OK;
	bool		printedDefn = false;
	teReqs		wanted;

	AH->currentTE = te;

	/*
	 * Work out what, if anything, we want from this entry.  ACLs are never
	 * restored here.  A DATABASE entry is ignored unless we should create
	 * it; that test must be made here, not in _tocEntryRequired, because the
	 * createDB option should not affect emitting a DATABASE entry to an
	 * archive file.
	 */
	if (_tocEntryIsACL(te))
		wanted = 0;
	else if (!ropt->createDB && strcmp(te->desc, "DATABASE") == 0)
		wanted = 0;
	else
		wanted = te->reqs;

	/* Pass any warnings recorded in the original dump on via write_msg */
	if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
	{
		if (!ropt->dataOnly && te->defn != NULL && te->defn[0] != '\0')
			write_msg(modulename, "warning from original dump file: %s\n", te->defn);
		else if (te->copyStmt != NULL && te->copyStmt[0] != '\0')
			write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
	}

	/* Schema (definition) part */
	if ((wanted & REQ_SCHEMA) != 0)
	{
		if (te->namespace != NULL)
			ahlog(AH, 1, "creating %s \"%s\".\"%s\"\n",
				  te->desc, te->namespace, te->tag);
		else
			ahlog(AH, 1, "creating %s \"%s\"\n", te->desc, te->tag);

		_printTocEntry(AH, te, ropt, false, false);
		printedDefn = true;

		if (strcmp(te->desc, "TABLE") == 0)
		{
			if (AH->lastErrorTE != te)
			{
				/*
				 * The table was created successfully, so mark the
				 * corresponding TABLE DATA for possible truncation.  A
				 * worker leaves this to the parent via the return value.
				 */
				if (is_parallel)
					result = WORKER_CREATE_DONE;
				else
					mark_create_done(AH, te);
			}
			else if (ropt->noDataForFailedTables)
			{
				/*
				 * Creating the table failed and --no-data-for-failed-tables
				 * was given, so mark the corresponding TABLE DATA to be
				 * ignored.  Again, a worker leaves this to the parent.
				 */
				if (is_parallel)
					result = WORKER_INHIBIT_DATA;
				else
					inhibit_data_for_failed_table(AH, te);
			}
		}

		/* If we created a DB, connect to it... */
		if (strcmp(te->desc, "DATABASE") == 0)
		{
			ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
			_reconnectToDB(AH, te->tag);
			ropt->dbname = pg_strdup(te->tag);
		}
	}

	/*
	 * Data part.  hadDumper is set when there is a genuine data component;
	 * otherwise the defn may hold statements that must be executed in a
	 * data-only restore, so print it unless the schema step already did.
	 */
	if ((wanted & REQ_DATA) != 0)
	{
		if (!te->hadDumper)
		{
			if (!printedDefn)
			{
				ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
				_printTocEntry(AH, te, ropt, false, false);
			}
		}
		else if (AH->PrintTocDataPtr != NULL)
		{
			_printTocEntry(AH, te, ropt, true, false);

			if (strcmp(te->desc, "BLOBS") == 0 ||
				strcmp(te->desc, "BLOB COMMENTS") == 0)
			{
				ahlog(AH, 1, "processing %s\n", te->desc);

				_selectOutputSchema(AH, "pg_catalog");

				/* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
				if (strcmp(te->desc, "BLOB COMMENTS") == 0)
					AH->outputKind = OUTPUT_OTHERDATA;

				(*AH->PrintTocDataPtr) (AH, te, ropt);

				AH->outputKind = OUTPUT_SQLCMDS;
			}
			else
			{
				_disableTriggersIfNecessary(AH, te, ropt);

				/* Select owner and schema as necessary */
				_becomeOwner(AH, te);
				_selectOutputSchema(AH, te->namespace);

				ahlog(AH, 1, "processing data for table \"%s\".\"%s\"\n",
					  te->namespace, te->tag);

				/*
				 * In parallel restore, if we created the table earlier in
				 * the run, wrap the COPY in a transaction and precede it
				 * with a TRUNCATE.  If archiving is not on this prevents
				 * WAL-logging the COPY, giving a speedup similar to
				 * single_txn mode in non-parallel restores.  Parallel
				 * restore always talks directly to a server, so there is no
				 * need to check whether to print BEGIN instead.  On servers
				 * >= 8.4, TRUNCATE ONLY keeps child tables from being wiped.
				 */
				if (is_parallel && te->created)
				{
					StartTransaction(&AH->public);

					ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
							 (PQserverVersion(AH->connection) >= 80400 ?
							  "ONLY " : ""),
							 fmtId(te->tag));
				}

				/* Use the COPY statement if there is one */
				if (te->copyStmt != NULL && te->copyStmt[0] != '\0')
				{
					ahprintf(AH, "%s", te->copyStmt);
					AH->outputKind = OUTPUT_COPYDATA;
				}
				else
					AH->outputKind = OUTPUT_OTHERDATA;

				(*AH->PrintTocDataPtr) (AH, te, ropt);

				/* Terminate COPY if needed */
				if (AH->outputKind == OUTPUT_COPYDATA && RestoringToDB(AH))
					EndDBCopyMode(&AH->public, te->tag);
				AH->outputKind = OUTPUT_SQLCMDS;

				/* Close out the transaction started above */
				if (is_parallel && te->created)
					CommitTransaction(&AH->public);

				_enableTriggersIfNecessary(AH, te, ropt);
			}
		}
	}

	if (AH->public.n_errors > 0 && result == WORKER_OK)
		result = WORKER_IGNORED_ERRORS;

	return result;
}

887 888 889 890
/*
 * Allocate a new RestoreOptions block.
 * This is mainly so we can initialize it, but also for future expansion,
 */
B
Bruce Momjian 已提交
891 892
RestoreOptions *
NewRestoreOptions(void)
B
Bruce Momjian 已提交
893
{
B
Bruce Momjian 已提交
894
	RestoreOptions *opts;
B
Bruce Momjian 已提交
895

896
	opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
B
Bruce Momjian 已提交
897

898
	/* set any fields that shouldn't default to zeroes */
B
Bruce Momjian 已提交
899
	opts->format = archUnknown;
900
	opts->promptPassword = TRI_DEFAULT;
901
	opts->dumpSections = DUMP_UNSECTIONED;
B
Bruce Momjian 已提交
902 903 904 905

	return opts;
}

B
Bruce Momjian 已提交
906 907
static void
_disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
B
Bruce Momjian 已提交
908
{
909 910
	/* This hack is only needed in a data-only restore */
	if (!ropt->dataOnly || !ropt->disable_triggers)
911 912
		return;

913 914
	ahlog(AH, 1, "disabling triggers for %s\n", te->tag);

915
	/*
B
Bruce Momjian 已提交
916
	 * Become superuser if possible, since they are the only ones who can
917 918 919
	 * disable constraint triggers.  If -S was not given, assume the initial
	 * user identity is a superuser.  (XXX would it be better to become the
	 * table owner?)
920
	 */
921
	_becomeUser(AH, ropt->superuser);
922 923

	/*
924
	 * Disable them.
925
	 */
926
	_selectOutputSchema(AH, te->namespace);
927

928 929
	ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
			 fmtId(te->tag));
B
Bruce Momjian 已提交
930 931
}

B
Bruce Momjian 已提交
932 933
/*
 * _enableTriggersIfNecessary
 *		Counterpart of _disableTriggersIfNecessary: when both ropt->dataOnly
 *		and ropt->disable_triggers are set, emit
 *		"ALTER TABLE <tag> ENABLE TRIGGER ALL" for the table named by te.
 *		Otherwise do nothing.
 *
 * We first switch to ropt->superuser, since superusers are the only ones who
 * can re-enable constraint triggers.  If -S was not given, assume the initial
 * user identity is a superuser.  (XXX would it be better to become the table
 * owner?)
 */
static void
_enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
{
	/* This hack is only needed in a data-only restore */
	if (!(ropt->dataOnly && ropt->disable_triggers))
		return;

	ahlog(AH, 1, "enabling triggers for %s\n", te->tag);

	_becomeUser(AH, ropt->superuser);
	_selectOutputSchema(AH, te->namespace);

	ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
			 fmtId(te->tag));
}
B
Bruce Momjian 已提交
957 958

/*
959
 * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
B
Bruce Momjian 已提交
960 961 962
 */

/* Public */
963
void
P
Peter Eisentraut 已提交
964
WriteData(Archive *AHX, const void *data, size_t dLen)
B
Bruce Momjian 已提交
965
{
B
Bruce Momjian 已提交
966
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
B
Bruce Momjian 已提交
967

968
	if (!AH->currToc)
969
		exit_horribly(modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
970

971 972 973
	(*AH->WriteDataPtr) (AH, data, dLen);

	return;
B
Bruce Momjian 已提交
974 975 976
}

/*
B
Bruce Momjian 已提交
977
 * Create a new TOC entry. The TOC was designed as a TOC, but is now the
B
Bruce Momjian 已提交
978 979 980 981
 * repository for all metadata. But the name has stuck.
 */

/* Public */
B
Bruce Momjian 已提交
982
void
983 984 985
ArchiveEntry(Archive *AHX,
			 CatalogId catalogId, DumpId dumpId,
			 const char *tag,
986
			 const char *namespace,
B
Bruce Momjian 已提交
987
			 const char *tablespace,
988
			 const char *owner, bool withOids,
989 990
			 const char *desc, teSection section,
			 const char *defn,
991 992
			 const char *dropStmt, const char *copyStmt,
			 const DumpId *deps, int nDeps,
B
Bruce Momjian 已提交
993
			 DataDumperPtr dumpFn, void *dumpArg)
B
Bruce Momjian 已提交
994
{
B
Bruce Momjian 已提交
995 996 997
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	TocEntry   *newToc;

998
	newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
B
Bruce Momjian 已提交
999

1000 1001 1002 1003
	AH->tocCount++;
	if (dumpId > AH->maxDumpId)
		AH->maxDumpId = dumpId;

B
Bruce Momjian 已提交
1004 1005 1006 1007 1008
	newToc->prev = AH->toc->prev;
	newToc->next = AH->toc;
	AH->toc->prev->next = newToc;
	AH->toc->prev = newToc;

1009 1010
	newToc->catalogId = catalogId;
	newToc->dumpId = dumpId;
1011
	newToc->section = section;
1012

1013 1014 1015 1016
	newToc->tag = pg_strdup(tag);
	newToc->namespace = namespace ? pg_strdup(namespace) : NULL;
	newToc->tablespace = tablespace ? pg_strdup(tablespace) : NULL;
	newToc->owner = pg_strdup(owner);
1017
	newToc->withOids = withOids;
1018 1019 1020 1021
	newToc->desc = pg_strdup(desc);
	newToc->defn = pg_strdup(defn);
	newToc->dropStmt = pg_strdup(dropStmt);
	newToc->copyStmt = copyStmt ? pg_strdup(copyStmt) : NULL;
1022

1023 1024
	if (nDeps > 0)
	{
1025
		newToc->dependencies = (DumpId *) pg_malloc(nDeps * sizeof(DumpId));
1026 1027 1028 1029 1030 1031 1032 1033
		memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
		newToc->nDeps = nDeps;
	}
	else
	{
		newToc->dependencies = NULL;
		newToc->nDeps = 0;
	}
1034

1035 1036
	newToc->dataDumper = dumpFn;
	newToc->dataDumperArg = dumpArg;
1037
	newToc->hadDumper = dumpFn ? true : false;
B
Bruce Momjian 已提交
1038

1039
	newToc->formatData = NULL;
B
Bruce Momjian 已提交
1040

B
Bruce Momjian 已提交
1041
	if (AH->ArchiveEntryPtr !=NULL)
B
Bruce Momjian 已提交
1042
		(*AH->ArchiveEntryPtr) (AH, newToc);
B
Bruce Momjian 已提交
1043 1044 1045
}

/* Public */
B
Bruce Momjian 已提交
1046 1047
void
PrintTOCSummary(Archive *AHX, RestoreOptions *ropt)
B
Bruce Momjian 已提交
1048
{
B
Bruce Momjian 已提交
1049
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1050
	TocEntry   *te;
1051
	teSection	curSection;
B
Bruce Momjian 已提交
1052
	OutputContext sav;
1053
	const char *fmtName;
1054
	char		stamp_str[64];
B
Bruce Momjian 已提交
1055

1056
	sav = SaveOutput(AH);
B
Bruce Momjian 已提交
1057
	if (ropt->filename)
1058
		SetOutput(AH, ropt->filename, 0 /* no compression */ );
B
Bruce Momjian 已提交
1059

1060 1061 1062 1063
	if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
				 localtime(&AH->createDate)) == 0)
		strcpy(stamp_str, "[unknown]");

1064
	ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1065
	ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
B
Bruce Momjian 已提交
1066
			 AH->archdbname, AH->tocCount, AH->compression);
1067

B
Bruce Momjian 已提交
1068 1069
	switch (AH->format)
	{
1070 1071 1072
		case archCustom:
			fmtName = "CUSTOM";
			break;
1073 1074 1075
		case archDirectory:
			fmtName = "DIRECTORY";
			break;
1076 1077 1078 1079 1080 1081
		case archTar:
			fmtName = "TAR";
			break;
		default:
			fmtName = "UNKNOWN";
	}
1082 1083

	ahprintf(AH, ";     Dump Version: %d.%d-%d\n", AH->vmaj, AH->vmin, AH->vrev);
1084
	ahprintf(AH, ";     Format: %s\n", fmtName);
T
Tom Lane 已提交
1085 1086
	ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
	ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
1087 1088 1089 1090 1091 1092
	if (AH->archiveRemoteVersion)
		ahprintf(AH, ";     Dumped from database version: %s\n",
				 AH->archiveRemoteVersion);
	if (AH->archiveDumpVersion)
		ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
				 AH->archiveDumpVersion);
1093

1094
	ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
B
Bruce Momjian 已提交
1095

1096
	curSection = SECTION_PRE_DATA;
1097
	for (te = AH->toc->next; te != AH->toc; te = te->next)
B
Bruce Momjian 已提交
1098
	{
1099 1100 1101 1102
		if (te->section != SECTION_NONE)
			curSection = te->section;
		if (ropt->verbose ||
			(_tocEntryRequired(te, curSection, ropt) & (REQ_SCHEMA | REQ_DATA)) != 0)
1103
			ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1104
					 te->catalogId.tableoid, te->catalogId.oid,
1105 1106
					 te->desc, te->namespace ? te->namespace : "-",
					 te->tag, te->owner);
1107 1108
		if (ropt->verbose && te->nDeps > 0)
		{
1109
			int			i;
1110 1111 1112 1113 1114 1115

			ahprintf(AH, ";\tdepends on:");
			for (i = 0; i < te->nDeps; i++)
				ahprintf(AH, " %d", te->dependencies[i]);
			ahprintf(AH, "\n");
		}
B
Bruce Momjian 已提交
1116
	}
B
Bruce Momjian 已提交
1117

B
Bruce Momjian 已提交
1118
	if (ropt->filename)
1119
		RestoreOutput(AH, sav);
B
Bruce Momjian 已提交
1120 1121
}

1122 1123 1124 1125 1126
/***********
 * BLOB Archival
 ***********/

/* Called by a dumper to signal start of a BLOB */
B
Bruce Momjian 已提交
1127
int
1128
StartBlob(Archive *AHX, Oid oid)
1129
{
B
Bruce Momjian 已提交
1130
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1131

B
Bruce Momjian 已提交
1132
	if (!AH->StartBlobPtr)
1133
		exit_horribly(modulename, "large-object output not supported in chosen format\n");
1134

B
Bruce Momjian 已提交
1135
	(*AH->StartBlobPtr) (AH, AH->currToc, oid);
1136

B
Bruce Momjian 已提交
1137
	return 1;
1138 1139 1140
}

/* Called by a dumper to signal end of a BLOB */
B
Bruce Momjian 已提交
1141
int
1142
EndBlob(Archive *AHX, Oid oid)
1143
{
B
Bruce Momjian 已提交
1144
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1145

B
Bruce Momjian 已提交
1146 1147
	if (AH->EndBlobPtr)
		(*AH->EndBlobPtr) (AH, AH->currToc, oid);
1148

B
Bruce Momjian 已提交
1149
	return 1;
1150 1151 1152 1153 1154 1155
}

/**********
 * BLOB Restoration
 **********/

1156
/*
B
Bruce Momjian 已提交
1157
 * Called by a format handler before any blobs are restored
1158
 */
B
Bruce Momjian 已提交
1159 1160
void
StartRestoreBlobs(ArchiveHandle *AH)
1161
{
1162 1163 1164
	if (!AH->ropt->single_txn)
	{
		if (AH->connection)
1165
			StartTransaction(&AH->public);
1166 1167 1168
		else
			ahprintf(AH, "BEGIN;\n\n");
	}
1169

1170 1171 1172 1173
	AH->blobCount = 0;
}

/*
B
Bruce Momjian 已提交
1174
 * Called by a format handler after all blobs are restored
1175
 */
B
Bruce Momjian 已提交
1176 1177
void
EndRestoreBlobs(ArchiveHandle *AH)
1178
{
1179 1180 1181
	if (!AH->ropt->single_txn)
	{
		if (AH->connection)
1182
			CommitTransaction(&AH->public);
1183 1184 1185
		else
			ahprintf(AH, "COMMIT;\n\n");
	}
1186

P
Peter Eisentraut 已提交
1187 1188 1189 1190
	ahlog(AH, 1, ngettext("restored %d large object\n",
						  "restored %d large objects\n",
						  AH->blobCount),
		  AH->blobCount);
1191 1192 1193
}


1194 1195 1196
/*
 * Called by a format handler to initiate restoration of a blob
 */
B
Bruce Momjian 已提交
1197
void
1198
StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
1199
{
1200
	bool		old_blob_style = (AH->version < K_VERS_1_12);
1201
	Oid			loOid;
1202

1203 1204
	AH->blobCount++;

1205 1206 1207
	/* Initialize the LO Buffer */
	AH->lo_buf_used = 0;

1208
	ahlog(AH, 1, "restoring large object with OID %u\n", oid);
1209

1210 1211
	/* With an old archive we must do drop and create logic here */
	if (old_blob_style && drop)
1212
		DropBlobIfExists(AH, oid);
1213

1214
	if (AH->connection)
1215
	{
1216 1217 1218 1219
		if (old_blob_style)
		{
			loOid = lo_create(AH->connection, oid);
			if (loOid == 0 || loOid != oid)
1220 1221
				exit_horribly(modulename, "could not create large object %u: %s",
							  oid, PQerrorMessage(AH->connection));
1222
		}
1223 1224
		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
		if (AH->loFd == -1)
1225 1226
			exit_horribly(modulename, "could not open large object %u: %s",
						  oid, PQerrorMessage(AH->connection));
1227 1228 1229
	}
	else
	{
1230 1231 1232 1233 1234 1235
		if (old_blob_style)
			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
					 oid, INV_WRITE);
		else
			ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
					 oid, INV_WRITE);
1236
	}
1237

B
Bruce Momjian 已提交
1238
	AH->writingBlob = 1;
1239 1240
}

B
Bruce Momjian 已提交
1241
void
1242
EndRestoreBlob(ArchiveHandle *AH, Oid oid)
1243
{
1244 1245 1246
	if (AH->lo_buf_used > 0)
	{
		/* Write remaining bytes from the LO buffer */
1247
		dump_lo_buf(AH);
1248
	}
1249

B
Bruce Momjian 已提交
1250
	AH->writingBlob = 0;
1251

1252
	if (AH->connection)
1253
	{
1254 1255 1256 1257 1258
		lo_close(AH->connection, AH->loFd);
		AH->loFd = -1;
	}
	else
	{
1259
		ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1260
	}
1261 1262
}

B
Bruce Momjian 已提交
1263 1264 1265 1266
/***********
 * Sorting and Reordering
 ***********/

B
Bruce Momjian 已提交
1267 1268
void
SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
B
Bruce Momjian 已提交
1269
{
B
Bruce Momjian 已提交
1270 1271
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	FILE	   *fh;
1272 1273
	char		buf[100];
	bool		incomplete_line;
B
Bruce Momjian 已提交
1274 1275

	/* Allocate space for the 'wanted' array, and init it */
1276
	ropt->idWanted = (bool *) pg_malloc(sizeof(bool) * AH->maxDumpId);
1277
	memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
B
Bruce Momjian 已提交
1278

B
Bruce Momjian 已提交
1279 1280 1281
	/* Setup the file */
	fh = fopen(ropt->tocFile, PG_BINARY_R);
	if (!fh)
1282 1283
		exit_horribly(modulename, "could not open TOC file \"%s\": %s\n",
					  ropt->tocFile, strerror(errno));
B
Bruce Momjian 已提交
1284

1285
	incomplete_line = false;
1286
	while (fgets(buf, sizeof(buf), fh) != NULL)
B
Bruce Momjian 已提交
1287
	{
1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308
		bool		prev_incomplete_line = incomplete_line;
		int			buflen;
		char	   *cmnt;
		char	   *endptr;
		DumpId		id;
		TocEntry   *te;

		/*
		 * Some lines in the file might be longer than sizeof(buf).  This is
		 * no problem, since we only care about the leading numeric ID which
		 * can be at most a few characters; but we have to skip continuation
		 * bufferloads when processing a long line.
		 */
		buflen = strlen(buf);
		if (buflen > 0 && buf[buflen - 1] == '\n')
			incomplete_line = false;
		else
			incomplete_line = true;
		if (prev_incomplete_line)
			continue;

1309
		/* Truncate line at comment, if any */
B
Bruce Momjian 已提交
1310 1311 1312 1313
		cmnt = strchr(buf, ';');
		if (cmnt != NULL)
			cmnt[0] = '\0';

1314
		/* Ignore if all blank */
1315
		if (strspn(buf, " \t\r\n") == strlen(buf))
B
Bruce Momjian 已提交
1316 1317
			continue;

1318
		/* Get an ID, check it's valid and not already seen */
B
Bruce Momjian 已提交
1319
		id = strtol(buf, &endptr, 10);
1320 1321
		if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
			ropt->idWanted[id - 1])
B
Bruce Momjian 已提交
1322
		{
1323
			write_msg(modulename, "WARNING: line ignored: %s\n", buf);
B
Bruce Momjian 已提交
1324 1325
			continue;
		}
B
Bruce Momjian 已提交
1326

B
Bruce Momjian 已提交
1327
		/* Find TOC entry */
1328
		te = getTocEntryByDumpId(AH, id);
B
Bruce Momjian 已提交
1329
		if (!te)
1330 1331
			exit_horribly(modulename, "could not find entry for ID %d\n",
						  id);
B
Bruce Momjian 已提交
1332

1333
		/* Mark it wanted */
1334
		ropt->idWanted[id - 1] = true;
B
Bruce Momjian 已提交
1335

1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347
		/*
		 * Move each item to the end of the list as it is selected, so that
		 * they are placed in the desired order.  Any unwanted items will end
		 * up at the front of the list, which may seem unintuitive but it's
		 * what we need.  In an ordinary serial restore that makes no
		 * difference, but in a parallel restore we need to mark unrestored
		 * items' dependencies as satisfied before we start examining
		 * restorable items.  Otherwise they could have surprising
		 * side-effects on the order in which restorable items actually get
		 * restored.
		 */
		_moveBefore(AH, AH->toc, te);
B
Bruce Momjian 已提交
1348
	}
B
Bruce Momjian 已提交
1349

B
Bruce Momjian 已提交
1350
	if (fclose(fh) != 0)
1351 1352
		exit_horribly(modulename, "could not close TOC file: %s\n",
					  strerror(errno));
B
Bruce Momjian 已提交
1353 1354 1355 1356 1357 1358 1359 1360
}

/**********************
 * 'Convenience functions that look like standard IO functions
 * for writing data when in dump mode.
 **********************/

/* Public */
1361
void
B
Bruce Momjian 已提交
1362 1363
archputs(const char *s, Archive *AH)
{
1364 1365
	WriteData(AH, s, strlen(s));
	return;
B
Bruce Momjian 已提交
1366 1367 1368
}

/* Public */
B
Bruce Momjian 已提交
1369 1370
int
archprintf(Archive *AH, const char *fmt,...)
B
Bruce Momjian 已提交
1371
{
1372 1373 1374
	char	   *p;
	size_t		len = 128;		/* initial assumption about buffer size */
	size_t		cnt;
B
Bruce Momjian 已提交
1375

1376
	for (;;)
B
Bruce Momjian 已提交
1377
	{
1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393
		va_list		args;

		/* Allocate work buffer. */
		p = (char *) pg_malloc(len);

		/* Try to format the data. */
		va_start(args, fmt);
		cnt = pvsnprintf(p, len, fmt, args);
		va_end(args);

		if (cnt < len)
			break;				/* success */

		/* Release buffer and loop around to try again with larger len. */
		free(p);
		len = cnt;
B
Bruce Momjian 已提交
1394
	}
1395

B
Bruce Momjian 已提交
1396 1397
	WriteData(AH, p, cnt);
	free(p);
1398
	return (int) cnt;
B
Bruce Momjian 已提交
1399 1400 1401 1402 1403 1404 1405
}


/*******************************
 * Stuff below here should be 'private' to the archiver routines
 *******************************/

1406
static void
1407
SetOutput(ArchiveHandle *AH, const char *filename, int compression)
B
Bruce Momjian 已提交
1408
{
1409
	int			fn;
B
Bruce Momjian 已提交
1410 1411

	if (filename)
1412
		fn = -1;
B
Bruce Momjian 已提交
1413 1414 1415 1416
	else if (AH->FH)
		fn = fileno(AH->FH);
	else if (AH->fSpec)
	{
1417
		fn = -1;
B
Bruce Momjian 已提交
1418 1419 1420 1421 1422 1423
		filename = AH->fSpec;
	}
	else
		fn = fileno(stdout);

	/* If compression explicitly requested, use gzopen */
1424
#ifdef HAVE_LIBZ
B
Bruce Momjian 已提交
1425 1426
	if (compression != 0)
	{
1427 1428 1429
		char		fmode[10];

		/* Don't use PG_BINARY_x since this is zlib */
1430
		sprintf(fmode, "wb%d", compression);
1431 1432
		if (fn >= 0)
			AH->OF = gzdopen(dup(fn), fmode);
B
Bruce Momjian 已提交
1433 1434
		else
			AH->OF = gzopen(filename, fmode);
1435
		AH->gzOut = 1;
B
Bruce Momjian 已提交
1436 1437
	}
	else
B
Bruce Momjian 已提交
1438
#endif
1439
	{							/* Use fopen */
1440 1441 1442 1443 1444 1445 1446
		if (AH->mode == archModeAppend)
		{
			if (fn >= 0)
				AH->OF = fdopen(dup(fn), PG_BINARY_A);
			else
				AH->OF = fopen(filename, PG_BINARY_A);
		}
B
Bruce Momjian 已提交
1447
		else
1448 1449 1450 1451 1452 1453
		{
			if (fn >= 0)
				AH->OF = fdopen(dup(fn), PG_BINARY_W);
			else
				AH->OF = fopen(filename, PG_BINARY_W);
		}
1454
		AH->gzOut = 0;
B
Bruce Momjian 已提交
1455
	}
B
Bruce Momjian 已提交
1456

1457
	if (!AH->OF)
1458 1459
	{
		if (filename)
1460 1461
			exit_horribly(modulename, "could not open output file \"%s\": %s\n",
						  filename, strerror(errno));
1462
		else
1463 1464
			exit_horribly(modulename, "could not open output file: %s\n",
						  strerror(errno));
1465
	}
1466 1467 1468 1469 1470 1471 1472 1473 1474
}

/*
 * SaveOutput
 *		Capture AH's current output stream and its gzOut flag, so that
 *		RestoreOutput can reinstate them later.
 */
static OutputContext
SaveOutput(ArchiveHandle *AH)
{
	OutputContext ctx;

	ctx.gzOut = AH->gzOut;
	ctx.OF = AH->OF;

	return ctx;
}

1479
static void
1480
RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
B
Bruce Momjian 已提交
1481
{
B
Bruce Momjian 已提交
1482
	int			res;
1483

B
Bruce Momjian 已提交
1484
	if (AH->gzOut)
1485
		res = GZCLOSE(AH->OF);
B
Bruce Momjian 已提交
1486
	else
1487 1488 1489
		res = fclose(AH->OF);

	if (res != 0)
1490
		exit_horribly(modulename, "could not close output file: %s\n",
1491
					  strerror(errno));
B
Bruce Momjian 已提交
1492

1493 1494
	AH->gzOut = savedContext.gzOut;
	AH->OF = savedContext.OF;
B
Bruce Momjian 已提交
1495 1496 1497 1498 1499
}



/*
B
Bruce Momjian 已提交
1500
 *	Print formatted text to the output file (usually stdout).
B
Bruce Momjian 已提交
1501
 */
B
Bruce Momjian 已提交
1502 1503
int
ahprintf(ArchiveHandle *AH, const char *fmt,...)
B
Bruce Momjian 已提交
1504
{
1505 1506 1507
	char	   *p;
	size_t		len = 128;		/* initial assumption about buffer size */
	size_t		cnt;
B
Bruce Momjian 已提交
1508

1509
	for (;;)
1510
	{
1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526
		va_list		args;

		/* Allocate work buffer. */
		p = (char *) pg_malloc(len);

		/* Try to format the data. */
		va_start(args, fmt);
		cnt = pvsnprintf(p, len, fmt, args);
		va_end(args);

		if (cnt < len)
			break;				/* success */

		/* Release buffer and loop around to try again with larger len. */
		free(p);
		len = cnt;
B
Bruce Momjian 已提交
1527
	}
1528

B
Bruce Momjian 已提交
1529 1530
	ahwrite(p, 1, cnt, AH);
	free(p);
1531
	return (int) cnt;
B
Bruce Momjian 已提交
1532 1533
}

B
Bruce Momjian 已提交
1534 1535
/*
 * ahlog
 *		Emit a progress/debug message via vwrite_msg.
 *
 * The message is shown when level <= AH->debugLevel, or when verbose mode
 * is on and level <= 1.
 */
void
ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
{
	va_list		ap;
	bool		show = (level <= AH->debugLevel) ||
		(AH->public.verbose && level <= 1);

	if (!show)
		return;

	va_start(ap, fmt);
	vwrite_msg(NULL, fmt, ap);
	va_end(ap);
}

1547 1548 1549
/*
 * Single place for logic which says 'We are restoring to a direct DB connection'.
 */
1550
static int
B
Bruce Momjian 已提交
1551
RestoringToDB(ArchiveHandle *AH)
1552 1553 1554 1555
{
	return (AH->ropt && AH->ropt->useDB && AH->connection);
}

1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566
/*
 * dump_lo_buf
 *		Flush the LO data buffer while writing a BLOB, then mark it empty.
 *
 * With a connection, the bytes go straight to lo_write on AH->loFd (fatal
 * on a short write).  In script output they are emitted as a bytea literal
 * in a "SELECT pg_catalog.lowrite(0, ...)" statement.
 */
static void
dump_lo_buf(ArchiveHandle *AH)
{
	if (AH->connection == NULL)
	{
		PQExpBuffer literal = createPQExpBuffer();

		appendByteaLiteralAHX(literal,
							  (const unsigned char *) AH->lo_buf,
							  AH->lo_buf_used,
							  AH);

		/* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
		AH->writingBlob = 0;
		ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", literal->data);
		AH->writingBlob = 1;

		destroyPQExpBuffer(literal);
	}
	else
	{
		size_t		written = lo_write(AH->connection, AH->loFd,
									   AH->lo_buf, AH->lo_buf_used);

		ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
							  "wrote %lu bytes of large object data (result = %lu)\n",
							  AH->lo_buf_used),
			  (unsigned long) AH->lo_buf_used, (unsigned long) written);

		if (written != AH->lo_buf_used)
			exit_horribly(modulename,
						  "could not write to large object (result: %lu, expected: %lu)\n",
						  (unsigned long) written, (unsigned long) AH->lo_buf_used);
	}

	/* The buffer is empty again either way */
	AH->lo_buf_used = 0;
}


B
Bruce Momjian 已提交
1596
/*
1597
 *	Write buffer to the output file (usually stdout). This is used for
B
Bruce Momjian 已提交
1598 1599
 *	outputting 'restore' scripts etc. It is even possible for an archive
 *	format to create a custom output routine to 'fake' a restore if it
1600
 *	wants to generate a script (see TAR output).
B
Bruce Momjian 已提交
1601
 */
1602
void
B
Bruce Momjian 已提交
1603
ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
B
Bruce Momjian 已提交
1604
{
B
Bruce Momjian 已提交
1605 1606
	int			bytes_written = 0;

B
Bruce Momjian 已提交
1607
	if (AH->writingBlob)
1608
	{
B
Bruce Momjian 已提交
1609
		size_t		remaining = size * nmemb;
1610 1611

		while (AH->lo_buf_used + remaining > AH->lo_buf_size)
P
Peter Eisentraut 已提交
1612
		{
1613 1614 1615 1616 1617 1618 1619
			size_t		avail = AH->lo_buf_size - AH->lo_buf_used;

			memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
			ptr = (const void *) ((const char *) ptr + avail);
			remaining -= avail;
			AH->lo_buf_used += avail;
			dump_lo_buf(AH);
P
Peter Eisentraut 已提交
1620 1621
		}

1622 1623 1624
		memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
		AH->lo_buf_used += remaining;

1625
		bytes_written = size * nmemb;
1626
	}
B
Bruce Momjian 已提交
1627
	else if (AH->gzOut)
1628
		bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
B
Bruce Momjian 已提交
1629
	else if (AH->CustomOutPtr)
1630
		bytes_written = AH->CustomOutPtr (AH, ptr, size * nmemb);
B
Bruce Momjian 已提交
1631

1632 1633 1634
	else
	{
		/*
B
Bruce Momjian 已提交
1635 1636 1637
		 * If we're doing a restore, and it's direct to DB, and we're
		 * connected then send it to the DB.
		 */
1638
		if (RestoringToDB(AH))
1639
			bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1640
		else
1641
			bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
1642
	}
1643 1644 1645 1646 1647

	if (bytes_written != size * nmemb)
		WRITE_ERROR_EXIT;

	return;
B
Bruce Momjian 已提交
1648
}
1649

1650 1651
/* on some error, we may decide to go on... */
void
1652
warn_or_exit_horribly(ArchiveHandle *AH,
1653
					  const char *modulename, const char *fmt,...)
1654
{
B
Bruce Momjian 已提交
1655
	va_list		ap;
1656

B
Bruce Momjian 已提交
1657 1658
	switch (AH->stage)
	{
1659 1660 1661 1662 1663 1664

		case STAGE_NONE:
			/* Do nothing special */
			break;

		case STAGE_INITIALIZING:
B
Bruce Momjian 已提交
1665
			if (AH->stage != AH->lastErrorStage)
1666 1667 1668 1669
				write_msg(modulename, "Error while INITIALIZING:\n");
			break;

		case STAGE_PROCESSING:
B
Bruce Momjian 已提交
1670
			if (AH->stage != AH->lastErrorStage)
1671 1672 1673 1674
				write_msg(modulename, "Error while PROCESSING TOC:\n");
			break;

		case STAGE_FINALIZING:
B
Bruce Momjian 已提交
1675
			if (AH->stage != AH->lastErrorStage)
1676 1677 1678
				write_msg(modulename, "Error while FINALIZING:\n");
			break;
	}
B
Bruce Momjian 已提交
1679 1680
	if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
	{
1681 1682
		write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
				  AH->currentTE->dumpId,
B
Bruce Momjian 已提交
1683 1684
			 AH->currentTE->catalogId.tableoid, AH->currentTE->catalogId.oid,
			  AH->currentTE->desc, AH->currentTE->tag, AH->currentTE->owner);
1685 1686 1687 1688
	}
	AH->lastErrorStage = AH->stage;
	AH->lastErrorTE = AH->currentTE;

1689
	va_start(ap, fmt);
1690 1691 1692
	vwrite_msg(modulename, fmt, ap);
	va_end(ap);

1693
	if (AH->public.exit_on_error)
1694
		exit_nicely(1);
1695 1696 1697
	else
		AH->public.n_errors++;
}
1698

1699 1700
#ifdef NOT_USED

/*
 * _moveAfter
 *		Unlink te from the TOC list and reinsert it immediately after pos.
 *		(Currently compiled out.)
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *oldPrev = te->prev;
	TocEntry   *oldNext = te->next;

	/* Detach te from its current neighbours */
	oldPrev->next = oldNext;
	oldNext->prev = oldPrev;

	/* Splice it in between pos and pos->next */
	te->next = pos->next;
	te->prev = pos;
	pos->next->prev = te;
	pos->next = te;
}
#endif
1715

B
Bruce Momjian 已提交
1716 1717
/*
 * _moveBefore
 *		Unlink te from the circular TOC list and reinsert it immediately
 *		before pos.  Passing pos = AH->toc (the list header) moves te to the
 *		end of the list.
 */
static void
_moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *oldPrev = te->prev;
	TocEntry   *oldNext = te->next;

	/* Detach te from its current neighbours */
	oldPrev->next = oldNext;
	oldNext->prev = oldPrev;

	/* Splice it in between pos->prev and pos */
	te->next = pos;
	te->prev = pos->prev;
	pos->prev->next = te;
	pos->prev = te;
}
1729

1730 1731 1732 1733 1734 1735 1736
/*
 * buildTocEntryArrays
 *		Build the index arrays AH->tocsByDumpId and AH->tableDataId for the
 *		TOC list.
 *
 * Invoke only after all TOC items have been created or read in.
 *
 * Both arrays are indexed by dump ID (entry zero is unused) and run only up
 * to maxDumpId.  Dependency dump IDs beyond that can occur (if the dump was
 * partial), so always check the array bound before touching an entry.
 */
static void
buildTocEntryArrays(ArchiveHandle *AH)
{
	DumpId		lastId = AH->maxDumpId;
	TocEntry   *entry;

	AH->tocsByDumpId = (TocEntry **) pg_malloc0((lastId + 1) * sizeof(TocEntry *));
	AH->tableDataId = (DumpId *) pg_malloc0((lastId + 1) * sizeof(DumpId));

	for (entry = AH->toc->next; entry != AH->toc; entry = entry->next)
	{
		DumpId		tableId;

		/* this check is purely paranoia, maxDumpId should be correct */
		if (entry->dumpId <= 0 || entry->dumpId > lastId)
			exit_horribly(modulename, "bad dumpId\n");

		/* tocsByDumpId indexes every TOC entry by its dump ID */
		AH->tocsByDumpId[entry->dumpId] = entry;

		/* Only TABLE DATA items that have a dependency feed tableDataId */
		if (strcmp(entry->desc, "TABLE DATA") != 0 || entry->nDeps <= 0)
			continue;

		/*
		 * tableDataId maps each TABLE item to its TABLE DATA item.  We get it
		 * by reversing the data item's dependency, knowing a TABLE DATA item
		 * has just one dependency and it is the TABLE item.  The TABLE item
		 * may be missing from a data-only dump, but its dump ID should still
		 * be below the data item's, so there is a slot for it.
		 */
		tableId = entry->dependencies[0];
		if (tableId <= 0 || tableId > lastId)
			exit_horribly(modulename, "bad table dumpId for TABLE DATA item\n");

		AH->tableDataId[tableId] = entry->dumpId;
	}
}

A
Andrew Dunstan 已提交
1782
TocEntry *
1783 1784 1785 1786 1787 1788 1789 1790 1791
getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
{
	/* build index arrays if we didn't already */
	if (AH->tocsByDumpId == NULL)
		buildTocEntryArrays(AH);

	if (id > 0 && id <= AH->maxDumpId)
		return AH->tocsByDumpId[id];

B
Bruce Momjian 已提交
1792
	return NULL;
B
Bruce Momjian 已提交
1793 1794
}

1795
teReqs
1796
TocIDRequired(ArchiveHandle *AH, DumpId id)
B
Bruce Momjian 已提交
1797
{
1798
	TocEntry   *te = getTocEntryByDumpId(AH, id);
B
Bruce Momjian 已提交
1799

B
Bruce Momjian 已提交
1800 1801
	if (!te)
		return 0;
B
Bruce Momjian 已提交
1802

1803
	return te->reqs;
B
Bruce Momjian 已提交
1804 1805
}

1806
size_t
1807
WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1808
{
B
Bruce Momjian 已提交
1809
	int			off;
1810 1811 1812 1813

	/* Save the flag */
	(*AH->WriteBytePtr) (AH, wasSet);

1814 1815
	/* Write out pgoff_t smallest byte first, prevents endian mismatch */
	for (off = 0; off < sizeof(pgoff_t); off++)
1816
	{
B
Bruce Momjian 已提交
1817
		(*AH->WriteBytePtr) (AH, o & 0xFF);
1818 1819
		o >>= 8;
	}
1820
	return sizeof(pgoff_t) + 1;
1821 1822 1823
}

int
B
Bruce Momjian 已提交
1824
ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1825
{
B
Bruce Momjian 已提交
1826 1827 1828
	int			i;
	int			off;
	int			offsetFlg;
1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839

	/* Initialize to zero */
	*o = 0;

	/* Check for old version */
	if (AH->version < K_VERS_1_7)
	{
		/* Prior versions wrote offsets using WriteInt */
		i = ReadInt(AH);
		/* -1 means not set */
		if (i < 0)
B
Bruce Momjian 已提交
1840
			return K_OFFSET_POS_NOT_SET;
1841
		else if (i == 0)
B
Bruce Momjian 已提交
1842
			return K_OFFSET_NO_DATA;
1843

1844 1845
		/* Cast to pgoff_t because it was written as an int. */
		*o = (pgoff_t) i;
1846 1847 1848 1849
		return K_OFFSET_POS_SET;
	}

	/*
B
Bruce Momjian 已提交
1850 1851
	 * Read the flag indicating the state of the data pointer. Check if valid
	 * and die if not.
1852
	 *
1853 1854
	 * This used to be handled by a negative or zero pointer, now we use an
	 * extra byte specifically for the state.
1855 1856 1857 1858 1859 1860 1861 1862 1863
	 */
	offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;

	switch (offsetFlg)
	{
		case K_OFFSET_POS_NOT_SET:
		case K_OFFSET_NO_DATA:
		case K_OFFSET_POS_SET:

B
Bruce Momjian 已提交
1864
			break;
1865 1866

		default:
1867
			exit_horribly(modulename, "unexpected data offset flag %d\n", offsetFlg);
1868 1869 1870 1871 1872 1873 1874
	}

	/*
	 * Read the bytes
	 */
	for (off = 0; off < AH->offSize; off++)
	{
1875 1876
		if (off < sizeof(pgoff_t))
			*o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
1877 1878 1879
		else
		{
			if ((*AH->ReadBytePtr) (AH) != 0)
1880
				exit_horribly(modulename, "file offset in dump file is too large\n");
1881 1882 1883 1884 1885 1886
		}
	}

	return offsetFlg;
}

P
Peter Eisentraut 已提交
1887
size_t
B
Bruce Momjian 已提交
1888
WriteInt(ArchiveHandle *AH, int i)
B
Bruce Momjian 已提交
1889
{
B
Bruce Momjian 已提交
1890 1891 1892
	int			b;

	/*
B
Bruce Momjian 已提交
1893 1894 1895 1896 1897
	 * This is a bit yucky, but I don't want to make the binary format very
	 * dependent on representation, and not knowing much about it, I write out
	 * a sign byte. If you change this, don't forget to change the file
	 * version #, and modify readInt to read the new format AS WELL AS the old
	 * formats.
B
Bruce Momjian 已提交
1898 1899 1900 1901 1902 1903
	 */

	/* SIGN byte */
	if (i < 0)
	{
		(*AH->WriteBytePtr) (AH, 1);
1904
		i = -i;
B
Bruce Momjian 已提交
1905 1906 1907 1908 1909 1910 1911
	}
	else
		(*AH->WriteBytePtr) (AH, 0);

	for (b = 0; b < AH->intSize; b++)
	{
		(*AH->WriteBytePtr) (AH, i & 0xFF);
1912
		i >>= 8;
B
Bruce Momjian 已提交
1913 1914 1915
	}

	return AH->intSize + 1;
B
Bruce Momjian 已提交
1916 1917
}

B
Bruce Momjian 已提交
1918 1919
/*
 * ReadInt
 *		Read an int written by WriteInt: an optional sign byte (absent in
 *		K_VERS_1_0 archives) followed by AH->intSize magnitude bytes, least
 *		significant first.
 *
 * The value is accumulated in unsigned arithmetic.  Shifting an int into or
 * past its sign bit is undefined behavior, which can happen for a high byte
 * >= 0x80 or when the archive's intSize exceeds sizeof(int).  Bytes lying
 * beyond the width of an int are ignored, so such values are truncated
 * modulo 2^(bits in int).
 */
int
ReadInt(ArchiveHandle *AH)
{
	unsigned int res = 0;
	int			bv,
				b;
	int			sign = 0;		/* Default positive */
	int			bitShift = 0;

	if (AH->version > K_VERS_1_0)
		/* Read a sign byte */
		sign = (*AH->ReadBytePtr) (AH);

	for (b = 0; b < AH->intSize; b++)
	{
		bv = (*AH->ReadBytePtr) (AH) & 0xFF;
		if (bv != 0 && bitShift < (int) (sizeof(res) * 8))
			res += (unsigned int) bv << bitShift;
		bitShift += 8;
	}

	/* Negate in unsigned arithmetic too, so INT_MIN round-trips cleanly */
	if (sign)
		res = 0U - res;

	return (int) res;
}

P
Peter Eisentraut 已提交
1945
size_t
1946
WriteStr(ArchiveHandle *AH, const char *c)
B
Bruce Momjian 已提交
1947
{
P
Peter Eisentraut 已提交
1948
	size_t		res;
1949 1950 1951

	if (c)
	{
B
Bruce Momjian 已提交
1952 1953
		int			len = strlen(c);

1954 1955 1956
		res = WriteInt(AH, len);
		(*AH->WriteBufPtr) (AH, c, len);
		res += len;
1957 1958 1959 1960
	}
	else
		res = WriteInt(AH, -1);

B
Bruce Momjian 已提交
1961
	return res;
B
Bruce Momjian 已提交
1962 1963
}

B
Bruce Momjian 已提交
1964 1965
/*
 * Read a string written by WriteStr().
 *
 * Returns a freshly pg_malloc'd, NUL-terminated copy of the string, or NULL
 * if the archive recorded a NULL string (negative length).  The caller owns
 * the result and must free it.
 */
char *
ReadStr(ArchiveHandle *AH)
{
	char	   *buf;
	int			l;

	l = ReadInt(AH);
	if (l < 0)
		buf = NULL;
	else
	{
		/* widen before adding 1 so that l == INT_MAX cannot overflow */
		buf = (char *) pg_malloc((size_t) l + 1);
		(*AH->ReadBufPtr) (AH, (void *) buf, l);

		buf[l] = '\0';
	}

	return buf;
}

T
Tom Lane 已提交
1984
static int
B
Bruce Momjian 已提交
1985
_discoverArchiveFormat(ArchiveHandle *AH)
B
Bruce Momjian 已提交
1986
{
B
Bruce Momjian 已提交
1987 1988
	FILE	   *fh;
	char		sig[6];			/* More than enough */
P
Peter Eisentraut 已提交
1989
	size_t		cnt;
B
Bruce Momjian 已提交
1990
	int			wantClose = 0;
B
Bruce Momjian 已提交
1991

1992
#if 0
1993
	write_msg(modulename, "attempting to ascertain archive format\n");
1994
#endif
1995 1996 1997 1998 1999

	if (AH->lookahead)
		free(AH->lookahead);

	AH->lookaheadSize = 512;
2000
	AH->lookahead = pg_malloc0(512);
2001 2002
	AH->lookaheadLen = 0;
	AH->lookaheadPos = 0;
2003

B
Bruce Momjian 已提交
2004 2005
	if (AH->fSpec)
	{
2006
		struct stat st;
2007

2008
		wantClose = 1;
2009 2010 2011 2012 2013 2014 2015 2016

		/*
		 * Check if the specified archive is a directory. If so, check if
		 * there's a "toc.dat" (or "toc.dat.gz") file in it.
		 */
		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
		{
			char		buf[MAXPGPATH];
2017

2018
			if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
2019 2020
				exit_horribly(modulename, "directory name too long: \"%s\"\n",
							  AH->fSpec);
2021 2022 2023 2024 2025 2026 2027 2028
			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
			{
				AH->format = archDirectory;
				return AH->format;
			}

#ifdef HAVE_LIBZ
			if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
2029 2030
				exit_horribly(modulename, "directory name too long: \"%s\"\n",
							  AH->fSpec);
2031 2032 2033 2034 2035 2036
			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
			{
				AH->format = archDirectory;
				return AH->format;
			}
#endif
2037 2038
			exit_horribly(modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n",
						  AH->fSpec);
2039
			fh = NULL;			/* keep compiler quiet */
2040 2041 2042 2043 2044
		}
		else
		{
			fh = fopen(AH->fSpec, PG_BINARY_R);
			if (!fh)
2045 2046
				exit_horribly(modulename, "could not open input file \"%s\": %s\n",
							  AH->fSpec, strerror(errno));
2047
		}
B
Bruce Momjian 已提交
2048 2049
	}
	else
2050
	{
2051
		fh = stdin;
2052
		if (!fh)
2053 2054
			exit_horribly(modulename, "could not open input file: %s\n",
						  strerror(errno));
2055
	}
B
Bruce Momjian 已提交
2056

2057
	if ((cnt = fread(sig, 1, 5, fh)) != 5)
2058 2059
	{
		if (ferror(fh))
2060
			exit_horribly(modulename, "could not read input file: %s\n", strerror(errno));
2061
		else
2062 2063
			exit_horribly(modulename, "input file is too short (read %lu, expected 5)\n",
						  (unsigned long) cnt);
2064
	}
B
Bruce Momjian 已提交
2065

B
Bruce Momjian 已提交
2066
	/* Save it, just in case we need it later */
2067
	memcpy(&AH->lookahead[0], sig, 5);
2068
	AH->lookaheadLen = 5;
B
Bruce Momjian 已提交
2069

B
Bruce Momjian 已提交
2070
	if (strncmp(sig, "PGDMP", 5) == 0)
2071
	{
B
Bruce Momjian 已提交
2072
		int			byteread;
S
Stephen Frost 已提交
2073

2074 2075 2076 2077 2078
		/*
		 * Finish reading (most of) a custom-format header.
		 *
		 * NB: this code must agree with ReadHead().
		 */
S
Stephen Frost 已提交
2079
		if ((byteread = fgetc(fh)) == EOF)
2080
			READ_ERROR_EXIT(fh);
S
Stephen Frost 已提交
2081 2082 2083 2084

		AH->vmaj = byteread;

		if ((byteread = fgetc(fh)) == EOF)
2085
			READ_ERROR_EXIT(fh);
S
Stephen Frost 已提交
2086 2087

		AH->vmin = byteread;
2088 2089 2090 2091 2092 2093

		/* Save these too... */
		AH->lookahead[AH->lookaheadLen++] = AH->vmaj;
		AH->lookahead[AH->lookaheadLen++] = AH->vmin;

		/* Check header version; varies from V1.0 */
B
Bruce Momjian 已提交
2094 2095
		if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))		/* Version > 1.0 */
		{
S
Stephen Frost 已提交
2096
			if ((byteread = fgetc(fh)) == EOF)
2097
				READ_ERROR_EXIT(fh);
S
Stephen Frost 已提交
2098 2099

			AH->vrev = byteread;
2100 2101 2102 2103 2104
			AH->lookahead[AH->lookaheadLen++] = AH->vrev;
		}
		else
			AH->vrev = 0;

2105 2106 2107
		/* Make a convenient integer <maj><min><rev>00 */
		AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;

S
Stephen Frost 已提交
2108
		if ((AH->intSize = fgetc(fh)) == EOF)
2109
			READ_ERROR_EXIT(fh);
2110 2111
		AH->lookahead[AH->lookaheadLen++] = AH->intSize;

2112 2113
		if (AH->version >= K_VERS_1_7)
		{
S
Stephen Frost 已提交
2114
			if ((AH->offSize = fgetc(fh)) == EOF)
2115
				READ_ERROR_EXIT(fh);
2116 2117 2118 2119 2120
			AH->lookahead[AH->lookaheadLen++] = AH->offSize;
		}
		else
			AH->offSize = AH->intSize;

S
Stephen Frost 已提交
2121
		if ((byteread = fgetc(fh)) == EOF)
2122
			READ_ERROR_EXIT(fh);
S
Stephen Frost 已提交
2123 2124

		AH->format = byteread;
2125
		AH->lookahead[AH->lookaheadLen++] = AH->format;
B
Bruce Momjian 已提交
2126 2127 2128
	}
	else
	{
2129
		/*
2130 2131
		 * *Maybe* we have a tar archive format file or a text dump ... So,
		 * read first 512 byte header...
2132 2133
		 */
		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2134
		/* read failure is checked below */
2135
		AH->lookaheadLen += cnt;
B
Bruce Momjian 已提交
2136

2137 2138 2139 2140
		if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
			(strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
			 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
		{
2141 2142 2143 2144
			/*
			 * looks like it's probably a text format dump. so suggest they
			 * try psql
			 */
2145
			exit_horribly(modulename, "input file appears to be a text format dump. Please use psql.\n");
2146 2147
		}

2148 2149 2150 2151 2152 2153 2154
		if (AH->lookaheadLen != 512)
		{
			if (feof(fh))
				exit_horribly(modulename, "input file does not appear to be a valid archive (too short?)\n");
			else
				READ_ERROR_EXIT(fh);
		}
B
Bruce Momjian 已提交
2155

2156
		if (!isValidTarHeader(AH->lookahead))
2157
			exit_horribly(modulename, "input file does not appear to be a valid archive\n");
B
Bruce Momjian 已提交
2158

2159 2160
		AH->format = archTar;
	}
B
Bruce Momjian 已提交
2161

B
Bruce Momjian 已提交
2162
	/* If we can't seek, then mark the header as read */
P
Peter Eisentraut 已提交
2163
	if (fseeko(fh, 0, SEEK_SET) != 0)
2164 2165
	{
		/*
B
Bruce Momjian 已提交
2166 2167
		 * NOTE: Formats that use the lookahead buffer can unset this in their
		 * Init routine.
2168 2169 2170 2171
		 */
		AH->readHeader = 1;
	}
	else
B
Bruce Momjian 已提交
2172
		AH->lookaheadLen = 0;	/* Don't bother since we've reset the file */
2173

B
Bruce Momjian 已提交
2174 2175
	/* Close the file */
	if (wantClose)
2176
		if (fclose(fh) != 0)
2177 2178
			exit_horribly(modulename, "could not close input file: %s\n",
						  strerror(errno));
B
Bruce Momjian 已提交
2179

B
Bruce Momjian 已提交
2180
	return AH->format;
B
Bruce Momjian 已提交
2181 2182 2183 2184 2185 2186
}


/*
 * Allocate an archive handle
 */
B
Bruce Momjian 已提交
2187 2188
static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
A
Andrew Dunstan 已提交
2189
	  const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr)
2190
{
B
Bruce Momjian 已提交
2191
	ArchiveHandle *AH;
B
Bruce Momjian 已提交
2192

2193
#if 0
2194
	write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
2195
#endif
2196

2197
	AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
B
Bruce Momjian 已提交
2198

2199 2200
	/* AH->debugLevel = 100; */

B
Bruce Momjian 已提交
2201 2202
	AH->vmaj = K_VERS_MAJOR;
	AH->vmin = K_VERS_MINOR;
2203
	AH->vrev = K_VERS_REV;
B
Bruce Momjian 已提交
2204

2205 2206 2207
	/* Make a convenient integer <maj><min><rev>00 */
	AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;

2208
	/* initialize for backwards compatible string processing */
2209
	AH->public.encoding = 0;	/* PG_SQL_ASCII */
2210 2211 2212 2213 2214 2215
	AH->public.std_strings = false;

	/* sql error handling */
	AH->public.exit_on_error = true;
	AH->public.n_errors = 0;

2216 2217
	AH->archiveDumpVersion = PG_VERSION;

2218 2219
	AH->createDate = time(NULL);

B
Bruce Momjian 已提交
2220
	AH->intSize = sizeof(int);
2221
	AH->offSize = sizeof(pgoff_t);
B
Bruce Momjian 已提交
2222 2223
	if (FileSpec)
	{
2224
		AH->fSpec = pg_strdup(FileSpec);
B
Bruce Momjian 已提交
2225

2226 2227 2228
		/*
		 * Not used; maybe later....
		 *
2229
		 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2230
		 * i--) if (AH->workDir[i-1] == '/')
2231
		 */
B
Bruce Momjian 已提交
2232 2233
	}
	else
2234
		AH->fSpec = NULL;
B
Bruce Momjian 已提交
2235

2236 2237 2238
	AH->currUser = NULL;		/* unknown */
	AH->currSchema = NULL;		/* ditto */
	AH->currTablespace = NULL;	/* ditto */
2239
	AH->currWithOids = -1;		/* force SET */
B
Bruce Momjian 已提交
2240

2241
	AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
B
Bruce Momjian 已提交
2242

B
Bruce Momjian 已提交
2243 2244 2245 2246 2247
	AH->toc->next = AH->toc;
	AH->toc->prev = AH->toc;

	AH->mode = mode;
	AH->compression = compression;
B
Bruce Momjian 已提交
2248

2249 2250
	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));

B
Bruce Momjian 已提交
2251 2252 2253
	/* Open stdout with no compression for AH output handle */
	AH->gzOut = 0;
	AH->OF = stdout;
B
Bruce Momjian 已提交
2254

2255 2256
	/*
	 * On Windows, we need to use binary mode to read/write non-text archive
B
Bruce Momjian 已提交
2257 2258
	 * formats.  Force stdin/stdout into binary mode if that is what we are
	 * using.
2259 2260
	 */
#ifdef WIN32
2261 2262
	if (fmt != archNull &&
		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2263 2264 2265 2266 2267 2268 2269 2270
	{
		if (mode == archModeWrite)
			setmode(fileno(stdout), O_BINARY);
		else
			setmode(fileno(stdin), O_BINARY);
	}
#endif

A
Andrew Dunstan 已提交
2271 2272
	AH->SetupWorkerPtr = setupWorkerPtr;

B
Bruce Momjian 已提交
2273
	if (fmt == archUnknown)
2274 2275 2276
		AH->format = _discoverArchiveFormat(AH);
	else
		AH->format = fmt;
B
Bruce Momjian 已提交
2277

2278 2279
	AH->promptPassword = TRI_DEFAULT;

B
Bruce Momjian 已提交
2280 2281
	switch (AH->format)
	{
2282 2283 2284
		case archCustom:
			InitArchiveFmt_Custom(AH);
			break;
B
Bruce Momjian 已提交
2285

2286 2287 2288
		case archNull:
			InitArchiveFmt_Null(AH);
			break;
B
Bruce Momjian 已提交
2289

2290 2291 2292 2293
		case archDirectory:
			InitArchiveFmt_Directory(AH);
			break;

2294 2295 2296 2297 2298
		case archTar:
			InitArchiveFmt_Tar(AH);
			break;

		default:
2299
			exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
B
Bruce Momjian 已提交
2300
	}
B
Bruce Momjian 已提交
2301

B
Bruce Momjian 已提交
2302
	return AH;
B
Bruce Momjian 已提交
2303 2304
}

B
Bruce Momjian 已提交
2305
void
2306
WriteDataChunks(ArchiveHandle *AH, DumpOptions *dopt, ParallelState *pstate)
B
Bruce Momjian 已提交
2307
{
2308
	TocEntry   *te;
B
Bruce Momjian 已提交
2309

2310
	for (te = AH->toc->next; te != AH->toc; te = te->next)
B
Bruce Momjian 已提交
2311
	{
A
Andrew Dunstan 已提交
2312 2313
		if (!te->dataDumper)
			continue;
B
Bruce Momjian 已提交
2314

A
Andrew Dunstan 已提交
2315 2316
		if ((te->reqs & REQ_DATA) == 0)
			continue;
B
Bruce Momjian 已提交
2317

A
Andrew Dunstan 已提交
2318 2319
		if (pstate && pstate->numWorkers > 1)
		{
B
Bruce Momjian 已提交
2320
			/*
A
Andrew Dunstan 已提交
2321 2322
			 * If we are in a parallel backup, then we are always the master
			 * process.
B
Bruce Momjian 已提交
2323
			 */
A
Andrew Dunstan 已提交
2324 2325 2326 2327 2328
			EnsureIdleWorker(AH, pstate);
			Assert(GetIdleWorker(pstate) != NO_SLOT);
			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
		}
		else
2329
			WriteDataChunksForTocEntry(AH, dopt, te);
A
Andrew Dunstan 已提交
2330 2331 2332
	}
	EnsureWorkersFinished(AH, pstate);
}
B
Bruce Momjian 已提交
2333

A
Andrew Dunstan 已提交
2334
void
2335
WriteDataChunksForTocEntry(ArchiveHandle *AH, DumpOptions *dopt, TocEntry *te)
A
Andrew Dunstan 已提交
2336 2337 2338
{
	StartDataPtr startPtr;
	EndDataPtr	endPtr;
B
Bruce Momjian 已提交
2339

A
Andrew Dunstan 已提交
2340 2341 2342 2343 2344 2345
	AH->currToc = te;

	if (strcmp(te->desc, "BLOBS") == 0)
	{
		startPtr = AH->StartBlobsPtr;
		endPtr = AH->EndBlobsPtr;
B
Bruce Momjian 已提交
2346
	}
A
Andrew Dunstan 已提交
2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358
	else
	{
		startPtr = AH->StartDataPtr;
		endPtr = AH->EndDataPtr;
	}

	if (startPtr != NULL)
		(*startPtr) (AH, te);

	/*
	 * The user-provided DataDumper routine needs to call AH->WriteData
	 */
2359
	(*te->dataDumper) ((Archive *) AH, dopt, te->dataDumperArg);
A
Andrew Dunstan 已提交
2360 2361 2362 2363 2364

	if (endPtr != NULL)
		(*endPtr) (AH, te);

	AH->currToc = NULL;
B
Bruce Momjian 已提交
2365 2366
}

B
Bruce Momjian 已提交
2367 2368
/*
 * Write the table of contents.
 *
 * Only entries with at least one of REQ_SCHEMA, REQ_DATA or REQ_SPECIAL set
 * in te->reqs are written, preceded by their count.  Each entry's fields are
 * written in the order ReadToc() reads them.  The dependency list follows,
 * terminated by a NULL string, and then any format-specific extra data.
 */
void
WriteToc(ArchiveHandle *AH)
{
	const int	wantedReqs = REQ_SCHEMA | REQ_DATA | REQ_SPECIAL;
	TocEntry   *te;
	char		workbuf[32];
	int			nEntries = 0;
	int			d;

	/* count entries that will actually be dumped */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if (te->reqs & wantedReqs)
			nEntries++;
	}

	WriteInt(AH, nEntries);

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if (!(te->reqs & wantedReqs))
			continue;

		WriteInt(AH, te->dumpId);
		WriteInt(AH, te->dataDumper ? 1 : 0);

		/* OID is recorded as a string for historical reasons */
		snprintf(workbuf, sizeof(workbuf), "%u", te->catalogId.tableoid);
		WriteStr(AH, workbuf);
		snprintf(workbuf, sizeof(workbuf), "%u", te->catalogId.oid);
		WriteStr(AH, workbuf);

		WriteStr(AH, te->tag);
		WriteStr(AH, te->desc);
		WriteInt(AH, te->section);
		WriteStr(AH, te->defn);
		WriteStr(AH, te->dropStmt);
		WriteStr(AH, te->copyStmt);
		WriteStr(AH, te->namespace);
		WriteStr(AH, te->tablespace);
		WriteStr(AH, te->owner);
		WriteStr(AH, te->withOids ? "true" : "false");

		/* Dump list of dependencies, terminated by a NULL string */
		for (d = 0; d < te->nDeps; d++)
		{
			snprintf(workbuf, sizeof(workbuf), "%d", te->dependencies[d]);
			WriteStr(AH, workbuf);
		}
		WriteStr(AH, NULL);

		if (AH->WriteExtraTocPtr)
			(*AH->WriteExtraTocPtr) (AH, te);
	}
}

B
Bruce Momjian 已提交
2425 2426
/*
 * Read the table of contents written by WriteToc().
 *
 * Each entry is allocated and filled in from the archive.  Fields that are
 * absent in older archive versions get fallback values.  The entry is then
 * linked onto the end of AH->toc's circular list, and AH->maxDumpId tracks
 * the largest dump ID seen.  ENCODING and STDSTRINGS entries are processed
 * as soon as they are read.
 */
void
ReadToc(ArchiveHandle *AH)
{
	int			i;
	char	   *tmp;
	DumpId	   *deps;
	int			depIdx;
	int			depSize;
	TocEntry   *te;

	AH->tocCount = ReadInt(AH);
	AH->maxDumpId = 0;

	for (i = 0; i < AH->tocCount; i++)
	{
		te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
		te->dumpId = ReadInt(AH);

		if (te->dumpId > AH->maxDumpId)
			AH->maxDumpId = te->dumpId;

		/* Sanity check */
		if (te->dumpId <= 0)
			exit_horribly(modulename,
					   "entry ID %d out of range -- perhaps a corrupt TOC\n",
						  te->dumpId);

		te->hadDumper = ReadInt(AH);

		if (AH->version >= K_VERS_1_8)
		{
			tmp = ReadStr(AH);
			sscanf(tmp, "%u", &te->catalogId.tableoid);
			free(tmp);
		}
		else
			te->catalogId.tableoid = InvalidOid;
		tmp = ReadStr(AH);
		sscanf(tmp, "%u", &te->catalogId.oid);
		free(tmp);

		te->tag = ReadStr(AH);
		te->desc = ReadStr(AH);

		if (AH->version >= K_VERS_1_11)
		{
			te->section = ReadInt(AH);
		}
		else
		{
			/*
			 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
			 * the entries into sections.  This list need not cover entry
			 * types added later than 8.4.
			 */
			if (strcmp(te->desc, "COMMENT") == 0 ||
				strcmp(te->desc, "ACL") == 0 ||
				strcmp(te->desc, "ACL LANGUAGE") == 0)
				te->section = SECTION_NONE;
			else if (strcmp(te->desc, "TABLE DATA") == 0 ||
					 strcmp(te->desc, "BLOBS") == 0 ||
					 strcmp(te->desc, "BLOB COMMENTS") == 0)
				te->section = SECTION_DATA;
			else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
					 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "INDEX") == 0 ||
					 strcmp(te->desc, "RULE") == 0 ||
					 strcmp(te->desc, "TRIGGER") == 0)
				te->section = SECTION_POST_DATA;
			else
				te->section = SECTION_PRE_DATA;
		}

		te->defn = ReadStr(AH);
		te->dropStmt = ReadStr(AH);

		if (AH->version >= K_VERS_1_3)
			te->copyStmt = ReadStr(AH);

		if (AH->version >= K_VERS_1_6)
			te->namespace = ReadStr(AH);

		if (AH->version >= K_VERS_1_10)
			te->tablespace = ReadStr(AH);

		te->owner = ReadStr(AH);
		if (AH->version >= K_VERS_1_9)
		{
			/*
			 * ReadStr() returns a malloc'd string that we must free; a NULL
			 * string (possible only in a damaged archive) counts as false.
			 */
			tmp = ReadStr(AH);
			te->withOids = (tmp != NULL && strcmp(tmp, "true") == 0);
			free(tmp);
		}
		else
			te->withOids = true;

		/* Read TOC entry dependencies */
		if (AH->version >= K_VERS_1_5)
		{
			depSize = 100;
			deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
			depIdx = 0;
			for (;;)
			{
				tmp = ReadStr(AH);
				if (!tmp)
					break;		/* end of list */
				if (depIdx >= depSize)
				{
					depSize *= 2;
					deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
				}
				sscanf(tmp, "%d", &deps[depIdx]);
				free(tmp);
				depIdx++;
			}

			if (depIdx > 0)		/* We have a non-null entry */
			{
				deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
				te->dependencies = deps;
				te->nDeps = depIdx;
			}
			else
			{
				free(deps);
				te->dependencies = NULL;
				te->nDeps = 0;
			}
		}
		else
		{
			te->dependencies = NULL;
			te->nDeps = 0;
		}

		if (AH->ReadExtraTocPtr)
			(*AH->ReadExtraTocPtr) (AH, te);

		ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
			  i, te->dumpId, te->desc, te->tag);

		/* link completed entry into TOC circular list */
		te->prev = AH->toc->prev;
		AH->toc->prev->next = te;
		AH->toc->prev = te;
		te->next = AH->toc;

		/* special processing immediately upon read for some items */
		if (strcmp(te->desc, "ENCODING") == 0)
			processEncodingEntry(AH, te);
		else if (strcmp(te->desc, "STDSTRINGS") == 0)
			processStdStringsEntry(AH, te);
	}
}

2582 2583 2584 2585
/*
 * Apply an ENCODING TOC entry to the archive handle.
 *
 * te->defn should have the form SET client_encoding = 'foo'; the text
 * between the first pair of single quotes is looked up with
 * pg_char_to_encoding() and stored in AH->public.encoding.  A malformed
 * entry or an unknown encoding name is reported through exit_horribly().
 */
static void
processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
{
	char	   *defn = pg_strdup(te->defn);
	char	   *openQuote;
	char	   *closeQuote = NULL;
	int			encoding;

	openQuote = strchr(defn, '\'');
	if (openQuote != NULL)
	{
		openQuote++;			/* step past the opening quote */
		closeQuote = strchr(openQuote, '\'');
	}

	if (closeQuote == NULL)
		exit_horribly(modulename, "invalid ENCODING item: %s\n",
					  te->defn);

	*closeQuote = '\0';			/* terminate the encoding name */
	encoding = pg_char_to_encoding(openQuote);
	if (encoding < 0)
		exit_horribly(modulename, "unrecognized encoding \"%s\"\n",
					  openQuote);
	AH->public.encoding = encoding;

	free(defn);
}

/*
 * Apply a STDSTRINGS TOC entry to the archive handle.
 *
 * te->defn should have the form SET standard_conforming_strings = 'x';
 * where x is on or off; the result is stored in AH->public.std_strings.
 * Any other form is reported through exit_horribly().
 */
static void
processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
{
	const char *value = strchr(te->defn, '\'');

	if (value == NULL)
		exit_horribly(modulename, "invalid STDSTRINGS item: %s\n",
					  te->defn);

	if (strncmp(value, "'on'", 4) == 0)
		AH->public.std_strings = true;
	else if (strncmp(value, "'off'", 5) == 0)
		AH->public.std_strings = false;
	else
		exit_horribly(modulename, "invalid STDSTRINGS item: %s\n",
					  te->defn);
}

2626
static teReqs
2627
_tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt)
B
Bruce Momjian 已提交
2628
{
2629
	teReqs		res = REQ_SCHEMA | REQ_DATA;
B
Bruce Momjian 已提交
2630

2631
	/* ENCODING and STDSTRINGS items are treated specially */
2632 2633
	if (strcmp(te->desc, "ENCODING") == 0 ||
		strcmp(te->desc, "STDSTRINGS") == 0)
2634
		return REQ_SPECIAL;
2635

B
Bruce Momjian 已提交
2636
	/* If it's an ACL, maybe ignore it */
2637
	if (ropt->aclsSkip && _tocEntryIsACL(te))
2638
		return 0;
B
Bruce Momjian 已提交
2639

R
Robert Haas 已提交
2640
	/* If it's security labels, maybe ignore it */
2641
	if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
R
Robert Haas 已提交
2642 2643
		return 0;

2644 2645
	/* Ignore it if section is not to be dumped/restored */
	switch (curSection)
2646
	{
2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660
		case SECTION_PRE_DATA:
			if (!(ropt->dumpSections & DUMP_PRE_DATA))
				return 0;
			break;
		case SECTION_DATA:
			if (!(ropt->dumpSections & DUMP_DATA))
				return 0;
			break;
		case SECTION_POST_DATA:
			if (!(ropt->dumpSections & DUMP_POST_DATA))
				return 0;
			break;
		default:
			/* shouldn't get here, really, but ignore it */
2661 2662 2663
			return 0;
	}

2664
	/* Check options for selective dump/restore */
2665
	if (ropt->schemaNames.head != NULL)
2666 2667 2668 2669
	{
		/* If no namespace is specified, it means all. */
		if (!te->namespace)
			return 0;
2670
		if (!(simple_string_list_member(&ropt->schemaNames, te->namespace)))
2671 2672 2673
			return 0;
	}

B
Bruce Momjian 已提交
2674 2675
	if (ropt->selTypes)
	{
2676 2677
		if (strcmp(te->desc, "TABLE") == 0 ||
			strcmp(te->desc, "TABLE DATA") == 0)
2678 2679 2680
		{
			if (!ropt->selTable)
				return 0;
2681
			if (ropt->tableNames.head != NULL && (!(simple_string_list_member(&ropt->tableNames, te->tag))))
2682
				return 0;
B
Bruce Momjian 已提交
2683 2684 2685
		}
		else if (strcmp(te->desc, "INDEX") == 0)
		{
2686 2687
			if (!ropt->selIndex)
				return 0;
2688
			if (ropt->indexNames.head != NULL && (!(simple_string_list_member(&ropt->indexNames, te->tag))))
2689
				return 0;
B
Bruce Momjian 已提交
2690 2691 2692
		}
		else if (strcmp(te->desc, "FUNCTION") == 0)
		{
2693 2694
			if (!ropt->selFunction)
				return 0;
2695
			if (ropt->functionNames.head != NULL && (!(simple_string_list_member(&ropt->functionNames, te->tag))))
2696
				return 0;
B
Bruce Momjian 已提交
2697 2698 2699
		}
		else if (strcmp(te->desc, "TRIGGER") == 0)
		{
2700 2701
			if (!ropt->selTrigger)
				return 0;
2702
			if (ropt->triggerNames.head != NULL && (!(simple_string_list_member(&ropt->triggerNames, te->tag))))
2703 2704
				return 0;
		}
B
Bruce Momjian 已提交
2705 2706
		else
			return 0;
B
Bruce Momjian 已提交
2707 2708
	}

2709
	/*
B
Bruce Momjian 已提交
2710
	 * Check if we had a dataDumper. Indicates if the entry is schema or data
2711 2712 2713 2714
	 */
	if (!te->hadDumper)
	{
		/*
B
Bruce Momjian 已提交
2715 2716 2717 2718 2719
		 * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
		 * it is considered a data entry.  We don't need to check for the
		 * BLOBS entry or old-style BLOB COMMENTS, because they will have
		 * hadDumper = true ... but we do need to check new-style BLOB
		 * comments.
2720
		 */
2721 2722 2723 2724 2725
		if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
			strcmp(te->desc, "BLOB") == 0 ||
			(strcmp(te->desc, "ACL") == 0 &&
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			(strcmp(te->desc, "COMMENT") == 0 &&
R
Robert Haas 已提交
2726 2727
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			(strcmp(te->desc, "SECURITY LABEL") == 0 &&
2728
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
2729 2730
			res = res & REQ_DATA;
		else
2731 2732
			res = res & ~REQ_DATA;
	}
2733

2734
	/*
B
Bruce Momjian 已提交
2735 2736
	 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
	 * always ignore it.
2737
	 */
2738
	if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2739
		return 0;
2740

B
Bruce Momjian 已提交
2741 2742
	/* Mask it if we only want schema */
	if (ropt->schemaOnly)
2743
		res = res & REQ_SCHEMA;
B
Bruce Momjian 已提交
2744

2745
	/* Mask it if we only want data */
2746
	if (ropt->dataOnly)
2747
		res = res & REQ_DATA;
B
Bruce Momjian 已提交
2748

2749
	/* Mask it if we don't have a schema contribution */
B
Bruce Momjian 已提交
2750
	if (!te->defn || strlen(te->defn) == 0)
2751
		res = res & ~REQ_SCHEMA;
B
Bruce Momjian 已提交
2752

2753 2754
	/* Finally, if there's a per-ID filter, limit based on that as well */
	if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2755
		return 0;
B
Bruce Momjian 已提交
2756

B
Bruce Momjian 已提交
2757
	return res;
B
Bruce Momjian 已提交
2758 2759
}

2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773
/*
 * Identify TOC entries that are ACLs.
 *
 * Returns true when te->desc is "ACL", "ACL LANGUAGE" or "DEFAULT ACL".
 * ("ACL LANGUAGE" was a crock emitted only in PG 7.4.)
 */
static bool
_tocEntryIsACL(TocEntry *te)
{
	static const char *const aclDescs[] = {
		"ACL",
		"ACL LANGUAGE",
		"DEFAULT ACL"
	};
	size_t		i;

	for (i = 0; i < sizeof(aclDescs) / sizeof(aclDescs[0]); i++)
	{
		if (strcmp(te->desc, aclDescs[i]) == 0)
			return true;
	}
	return false;
}

2774 2775 2776 2777 2778 2779 2780
/*
 * Issue SET commands for parameters that we want to have set the same way
 * at all times during execution of a restore script.
 *
 * The commands go through ahprintf() in this order: the statement and lock
 * timeouts, client encoding, standard_conforming_strings, an optional
 * SET ROLE, check_function_bodies and client_min_messages.  When standard
 * strings are off, escape_string_warning is also disabled.  A blank line
 * ends the block.
 */
static void
_doSetFixedOutputState(ArchiveHandle *AH)
{
	/* No timeouts: a restore is probably slow */
	ahprintf(AH, "SET statement_timeout = 0;\n");
	ahprintf(AH, "SET lock_timeout = 0;\n");

	/* Match the archive's character set encoding and literal syntax */
	ahprintf(AH, "SET client_encoding = '%s';\n",
			 pg_encoding_to_char(AH->public.encoding));
	ahprintf(AH, "SET standard_conforming_strings = %s;\n",
			 AH->public.std_strings ? "on" : "off");

	/* Switch to the requested role, if any */
	if (AH->ropt != NULL && AH->ropt->use_role != NULL)
		ahprintf(AH, "SET ROLE %s;\n", fmtId(AH->ropt->use_role));

	/* Skip function body checking and suppress annoying notices */
	ahprintf(AH, "SET check_function_bodies = false;\n");
	ahprintf(AH, "SET client_min_messages = warning;\n");
	if (!AH->public.std_strings)
		ahprintf(AH, "SET escape_string_warning = off;\n");

	ahprintf(AH, "\n");
}

2810 2811
/*
 * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
2812 2813
 * for updating state if appropriate.  If user is NULL or an empty string,
 * the specification DEFAULT will be used.
2814 2815
 */
static void
2816
_doSetSessionAuth(ArchiveHandle *AH, const char *user)
2817
{
2818
	PQExpBuffer cmd = createPQExpBuffer();
B
Bruce Momjian 已提交
2819

2820
	appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
B
Bruce Momjian 已提交
2821

2822
	/*
B
Bruce Momjian 已提交
2823
	 * SQL requires a string literal here.  Might as well be correct.
2824 2825
	 */
	if (user && *user)
2826
		appendStringLiteralAHX(cmd, user, AH);
2827
	else
2828 2829
		appendPQExpBufferStr(cmd, "DEFAULT");
	appendPQExpBufferChar(cmd, ';');
2830

2831 2832 2833 2834
	if (RestoringToDB(AH))
	{
		PGresult   *res;

2835
		res = PQexec(AH->connection, cmd->data);
2836 2837

		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2838 2839 2840
			/* NOT warn_or_exit_horribly... use -O instead to skip this. */
			exit_horribly(modulename, "could not set session user to \"%s\": %s",
						  user, PQerrorMessage(AH->connection));
2841 2842 2843 2844

		PQclear(res);
	}
	else
2845 2846 2847
		ahprintf(AH, "%s\n\n", cmd->data);

	destroyPQExpBuffer(cmd);
2848 2849
}

2850

2851 2852 2853 2854 2855 2856 2857 2858 2859 2860
/*
 * Issue a SET default_with_oids command.  Caller is responsible
 * for updating state if appropriate.
 *
 * When restoring directly to a database the command is executed there, and
 * a failure goes through warn_or_exit_horribly().  Otherwise the command is
 * written to the script output.
 */
static void
_doSetWithOids(ArchiveHandle *AH, const bool withOids)
{
	PQExpBuffer cmd = createPQExpBuffer();
	const char *setting = withOids ? "true" : "false";

	appendPQExpBuffer(cmd, "SET default_with_oids = %s;", setting);

	if (!RestoringToDB(AH))
		ahprintf(AH, "%s\n\n", cmd->data);
	else
	{
		PGresult   *res = PQexec(AH->connection, cmd->data);

		if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
			warn_or_exit_horribly(AH, modulename,
								  "could not set default_with_oids: %s",
								  PQerrorMessage(AH->connection));

		PQclear(res);
	}

	destroyPQExpBuffer(cmd);
}


2883
/*
2884
 * Issue the commands to connect to the specified database.
2885 2886
 *
 * If we're currently restoring right into a database, this will
B
Bruce Momjian 已提交
2887
 * actually establish a connection. Otherwise it puts a \connect into
2888
 * the script output.
2889 2890
 *
 * NULL dbname implies reconnecting to the current DB (pretty useless).
2891
 */
B
Bruce Momjian 已提交
2892
static void
2893
_reconnectToDB(ArchiveHandle *AH, const char *dbname)
2894
{
2895
	if (RestoringToDB(AH))
2896
		ReconnectToServer(AH, dbname, NULL);
2897
	else
2898 2899 2900
	{
		PQExpBuffer qry = createPQExpBuffer();

2901
		appendPQExpBuffer(qry, "\\connect %s\n\n",
2902
						  dbname ? fmtId(dbname) : "-");
2903
		ahprintf(AH, "%s", qry->data);
2904 2905
		destroyPQExpBuffer(qry);
	}
2906

2907
	/*
B
Bruce Momjian 已提交
2908 2909
	 * NOTE: currUser keeps track of what the imaginary session user in our
	 * script is.  It's now effectively reset to the original userID.
2910
	 */
2911 2912
	if (AH->currUser)
		free(AH->currUser);
2913
	AH->currUser = NULL;
2914

2915
	/* don't assume we still know the output schema, tablespace, etc either */
2916 2917
	if (AH->currSchema)
		free(AH->currSchema);
2918 2919 2920 2921
	AH->currSchema = NULL;
	if (AH->currTablespace)
		free(AH->currTablespace);
	AH->currTablespace = NULL;
2922
	AH->currWithOids = -1;
B
Bruce Momjian 已提交
2923

2924 2925
	/* re-establish fixed state */
	_doSetFixedOutputState(AH);
2926 2927
}

2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944
/*
 * Become the specified user, and update state to avoid redundant commands
 *
 * NULL or empty argument is taken to mean restoring the session default.
 * Nothing is issued if AH->currUser already equals the requested user.
 */
static void
_becomeUser(ArchiveHandle *AH, const char *user)
{
	const char *target = user ? user : "";	/* avoid null pointers */

	/* no need to do anything if we are already that user */
	if (AH->currUser != NULL && strcmp(AH->currUser, target) == 0)
		return;

	_doSetSessionAuth(AH, target);

	/*
	 * NOTE: currUser keeps track of what the imaginary session user in our
	 * script is
	 */
	free(AH->currUser);
	AH->currUser = pg_strdup(target);
}
2952 2953

/*
B
Bruce Momjian 已提交
2954
 * Become the owner of the given TOC entry object.  If
2955 2956
 * changes in ownership are not allowed, this doesn't do anything.
 */
B
Bruce Momjian 已提交
2957
static void
2958
_becomeOwner(ArchiveHandle *AH, TocEntry *te)
2959
{
2960
	if (AH->ropt && (AH->ropt->noOwner || !AH->ropt->use_setsessauth))
2961 2962
		return;

2963
	_becomeUser(AH, te->owner);
2964 2965
}

2966

2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980
/*
 * Set the proper default_with_oids value for the table.
 *
 * A SET is issued only when te->withOids differs from the remembered
 * AH->currWithOids; the remembered value is updated afterwards.
 */
static void
_setWithOids(ArchiveHandle *AH, TocEntry *te)
{
	if (AH->currWithOids == te->withOids)
		return;					/* already in effect */

	_doSetWithOids(AH, te->withOids);
	AH->currWithOids = te->withOids;
}


2981 2982 2983 2984 2985 2986 2987
/*
 * Make "schemaName" the current schema of the target database by setting
 * search_path.  pg_catalog is appended after it unless it is pg_catalog
 * itself.
 *
 * When restoring directly to a database the command is executed (a failure
 * goes to warn_or_exit_horribly()); otherwise it is written to the script.
 * A NULL/empty name, or the schema already recorded in AH->currSchema,
 * results in no command at all.
 */
static void
_selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
{
	PQExpBuffer cmd;

	/* no schema given: leave search_path as it is */
	if (schemaName == NULL || schemaName[0] == '\0')
		return;

	/* already the current schema: avoid a redundant SET */
	if (AH->currSchema != NULL && strcmp(AH->currSchema, schemaName) == 0)
		return;

	cmd = createPQExpBuffer();
	appendPQExpBuffer(cmd, "SET search_path = %s", fmtId(schemaName));
	if (strcmp(schemaName, "pg_catalog") != 0)
		appendPQExpBufferStr(cmd, ", pg_catalog");

	if (!RestoringToDB(AH))
		ahprintf(AH, "%s;\n\n", cmd->data);
	else
	{
		PGresult   *res = PQexec(AH->connection, cmd->data);

		if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
			warn_or_exit_horribly(AH, modulename,
								  "could not set search_path to \"%s\": %s",
								  schemaName, PQerrorMessage(AH->connection));
		PQclear(res);
	}

	/* remember the new current schema (free(NULL) is harmless) */
	free(AH->currSchema);
	AH->currSchema = pg_strdup(schemaName);

	destroyPQExpBuffer(cmd);
}

/*
 * Make "tablespace" the default tablespace of the target database by
 * setting default_tablespace.  An empty name selects the database's own
 * default (default_tablespace = '').
 *
 * Nothing is done in --no-tablespaces mode, for objects without a
 * tablespace (NULL), or when the tablespace already recorded in
 * AH->currTablespace is requested again.  When restoring directly to a
 * database the command is executed (a failure goes to
 * warn_or_exit_horribly()); otherwise it is written to the script.
 */
static void
_selectTablespace(ArchiveHandle *AH, const char *tablespace)
{
	PQExpBuffer cmd;

	/* do nothing in --no-tablespaces mode */
	if (AH->ropt->noTablespace)
		return;

	/* object has no tablespace at all */
	if (tablespace == NULL)
		return;

	/* already the current tablespace */
	if (AH->currTablespace != NULL &&
		strcmp(tablespace, AH->currTablespace) == 0)
		return;

	cmd = createPQExpBuffer();
	if (tablespace[0] == '\0')
		appendPQExpBufferStr(cmd, "SET default_tablespace = ''");
	else
		appendPQExpBuffer(cmd, "SET default_tablespace = %s",
						  fmtId(tablespace));

	if (!RestoringToDB(AH))
		ahprintf(AH, "%s;\n\n", cmd->data);
	else
	{
		PGresult   *res = PQexec(AH->connection, cmd->data);

		if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
			warn_or_exit_horribly(AH, modulename,
								"could not set default_tablespace to %s: %s",
								fmtId(tablespace), PQerrorMessage(AH->connection));
		PQclear(res);
	}

	/* remember the new current tablespace (free(NULL) is harmless) */
	free(AH->currTablespace);
	AH->currTablespace = pg_strdup(tablespace);

	destroyPQExpBuffer(cmd);
}

/*
 * Extract an object description for a TOC entry, and append it to buf.
 *
3088
 * This is used for ALTER ... OWNER TO.
3089
 */
3090
static void
3091
_getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
3092
{
3093 3094 3095
	const char *type = te->desc;

	/* Use ALTER TABLE for views and sequences */
A
Andrew Dunstan 已提交
3096
	if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
3097
		strcmp(type, "MATERIALIZED VIEW") == 0)
3098 3099
		type = "TABLE";

3100
	/* objects that don't require special decoration */
P
Peter Eisentraut 已提交
3101 3102
	if (strcmp(type, "COLLATION") == 0 ||
		strcmp(type, "CONVERSION") == 0 ||
3103 3104
		strcmp(type, "DOMAIN") == 0 ||
		strcmp(type, "TABLE") == 0 ||
3105
		strcmp(type, "TYPE") == 0 ||
R
Robert Haas 已提交
3106
		strcmp(type, "FOREIGN TABLE") == 0 ||
3107
		strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3108
		strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
B
Bruce Momjian 已提交
3109
	/* non-schema-specified objects */
3110
		strcmp(type, "DATABASE") == 0 ||
3111
		strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3112 3113 3114 3115
		strcmp(type, "SCHEMA") == 0 ||
		strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
		strcmp(type, "SERVER") == 0 ||
		strcmp(type, "USER MAPPING") == 0)
3116
	{
3117
		/* We already know that search_path was set properly */
3118 3119 3120
		appendPQExpBuffer(buf, "%s %s", type, fmtId(te->tag));
		return;
	}
3121

3122 3123 3124 3125 3126 3127 3128
	/* BLOBs just have a name, but it's numeric so must not use fmtId */
	if (strcmp(type, "BLOB") == 0)
	{
		appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
		return;
	}

B
Bruce Momjian 已提交
3129
	/*
B
Bruce Momjian 已提交
3130 3131
	 * These object types require additional decoration.  Fortunately, the
	 * information needed is exactly what's in the DROP command.
B
Bruce Momjian 已提交
3132
	 */
3133 3134 3135
	if (strcmp(type, "AGGREGATE") == 0 ||
		strcmp(type, "FUNCTION") == 0 ||
		strcmp(type, "OPERATOR") == 0 ||
3136 3137
		strcmp(type, "OPERATOR CLASS") == 0 ||
		strcmp(type, "OPERATOR FAMILY") == 0)
B
Bruce Momjian 已提交
3138
	{
3139
		/* Chop "DROP " off the front and make a modifiable copy */
3140
		char	   *first = pg_strdup(te->dropStmt + 5);
3141
		char	   *last;
3142

3143 3144
		/* point to last character in string */
		last = first + strlen(first) - 1;
3145

3146 3147 3148 3149
		/* Strip off any ';' or '\n' at the end */
		while (last >= first && (*last == '\n' || *last == ';'))
			last--;
		*(last + 1) = '\0';
B
Bruce Momjian 已提交
3150

3151
		appendPQExpBufferStr(buf, first);
B
Bruce Momjian 已提交
3152 3153

		free(first);
3154
		return;
3155 3156
	}

3157 3158
	write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
			  type);
3159 3160 3161
}

static void
3162
_printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass)
B
Bruce Momjian 已提交
3163
{
3164 3165 3166
	/* ACLs are dumped only during acl pass */
	if (acl_pass)
	{
3167
		if (!_tocEntryIsACL(te))
3168 3169 3170 3171
			return;
	}
	else
	{
3172
		if (_tocEntryIsACL(te))
3173 3174 3175 3176 3177
			return;
	}

	/*
	 * Avoid dumping the public schema, as it will already be created ...
B
Bruce Momjian 已提交
3178
	 * unless we are using --clean mode, in which case it's been deleted and
3179
	 * we'd better recreate it.  Likewise for its comment, if any.
3180
	 */
3181 3182 3183 3184 3185
	if (!ropt->dropSchema)
	{
		if (strcmp(te->desc, "SCHEMA") == 0 &&
			strcmp(te->tag, "public") == 0)
			return;
3186
		/* The comment restore would require super-user privs, so avoid it. */
3187 3188 3189 3190
		if (strcmp(te->desc, "COMMENT") == 0 &&
			strcmp(te->tag, "SCHEMA public") == 0)
			return;
	}
3191

3192
	/* Select owner, schema, and tablespace as necessary */
3193 3194
	_becomeOwner(AH, te);
	_selectOutputSchema(AH, te->namespace);
3195
	_selectTablespace(AH, te->tablespace);
3196 3197 3198 3199 3200 3201

	/* Set up OID mode too */
	if (strcmp(te->desc, "TABLE") == 0)
		_setWithOids(AH, te);

	/* Emit header comment for item */
3202
	if (!AH->noTocComments)
3203
	{
3204
		const char *pfx;
3205 3206 3207
		char	   *sanitized_name;
		char	   *sanitized_schema;
		char	   *sanitized_owner;
3208 3209 3210 3211 3212 3213 3214 3215

		if (isData)
			pfx = "Data for ";
		else
			pfx = "";

		ahprintf(AH, "--\n");
		if (AH->public.verbose)
3216
		{
3217 3218 3219 3220 3221
			ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
					 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
			if (te->nDeps > 0)
			{
				int			i;
3222

3223 3224 3225 3226 3227
				ahprintf(AH, "-- Dependencies:");
				for (i = 0; i < te->nDeps; i++)
					ahprintf(AH, " %d", te->dependencies[i]);
				ahprintf(AH, "\n");
			}
3228
		}
3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245

		/*
		 * Zap any line endings embedded in user-supplied fields, to prevent
		 * corruption of the dump (which could, in the worst case, present an
		 * SQL injection vulnerability if someone were to incautiously load a
		 * dump containing objects with maliciously crafted names).
		 */
		sanitized_name = replace_line_endings(te->tag);
		if (te->namespace)
			sanitized_schema = replace_line_endings(te->namespace);
		else
			sanitized_schema = pg_strdup("-");
		if (!ropt->noOwner)
			sanitized_owner = replace_line_endings(te->owner);
		else
			sanitized_owner = pg_strdup("-");

3246
		ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3247 3248 3249 3250 3251 3252 3253
				 pfx, sanitized_name, te->desc, sanitized_schema,
				 sanitized_owner);

		free(sanitized_name);
		free(sanitized_schema);
		free(sanitized_owner);

3254
		if (te->tablespace && !ropt->noTablespace)
3255
		{
3256
			char	   *sanitized_tablespace;
3257 3258 3259 3260 3261

			sanitized_tablespace = replace_line_endings(te->tablespace);
			ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
			free(sanitized_tablespace);
		}
3262 3263
		ahprintf(AH, "\n");

B
Bruce Momjian 已提交
3264
		if (AH->PrintExtraTocPtr !=NULL)
3265 3266
			(*AH->PrintExtraTocPtr) (AH, te);
		ahprintf(AH, "--\n\n");
3267
	}
B
Bruce Momjian 已提交
3268

3269 3270 3271
	/*
	 * Actually print the definition.
	 *
B
Bruce Momjian 已提交
3272 3273 3274
	 * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
	 * versions put into CREATE SCHEMA.  We have to do this when --no-owner
	 * mode is selected.  This is ugly, but I see no other good way ...
3275
	 */
3276
	if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
3277
	{
3278
		ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3279
	}
3280
	else
3281
	{
3282 3283
		if (strlen(te->defn) > 0)
			ahprintf(AH, "%s\n\n", te->defn);
3284
	}
3285 3286 3287

	/*
	 * If we aren't using SET SESSION AUTH to determine ownership, we must
3288 3289 3290
	 * instead issue an ALTER OWNER command.  We assume that anything without
	 * a DROP command is not a separately ownable object.  All the categories
	 * with DROP commands must appear in one list or the other.
3291 3292
	 */
	if (!ropt->noOwner && !ropt->use_setsessauth &&
3293 3294 3295
		strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
	{
		if (strcmp(te->desc, "AGGREGATE") == 0 ||
3296
			strcmp(te->desc, "BLOB") == 0 ||
P
Peter Eisentraut 已提交
3297
			strcmp(te->desc, "COLLATION") == 0 ||
3298 3299 3300 3301 3302 3303
			strcmp(te->desc, "CONVERSION") == 0 ||
			strcmp(te->desc, "DATABASE") == 0 ||
			strcmp(te->desc, "DOMAIN") == 0 ||
			strcmp(te->desc, "FUNCTION") == 0 ||
			strcmp(te->desc, "OPERATOR") == 0 ||
			strcmp(te->desc, "OPERATOR CLASS") == 0 ||
3304
			strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
3305
			strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
3306 3307 3308 3309
			strcmp(te->desc, "SCHEMA") == 0 ||
			strcmp(te->desc, "TABLE") == 0 ||
			strcmp(te->desc, "TYPE") == 0 ||
			strcmp(te->desc, "VIEW") == 0 ||
3310
			strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3311
			strcmp(te->desc, "SEQUENCE") == 0 ||
R
Robert Haas 已提交
3312
			strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3313
			strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
3314 3315 3316
			strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
			strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
			strcmp(te->desc, "SERVER") == 0)
3317 3318 3319
		{
			PQExpBuffer temp = createPQExpBuffer();

3320
			appendPQExpBufferStr(temp, "ALTER ");
3321
			_getObjectDescription(temp, te, AH);
3322 3323 3324 3325 3326 3327
			appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
			ahprintf(AH, "%s\n\n", temp->data);
			destroyPQExpBuffer(temp);
		}
		else if (strcmp(te->desc, "CAST") == 0 ||
				 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3328
				 strcmp(te->desc, "CONSTRAINT") == 0 ||
3329 3330
				 strcmp(te->desc, "DEFAULT") == 0 ||
				 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3331
				 strcmp(te->desc, "INDEX") == 0 ||
3332
				 strcmp(te->desc, "RULE") == 0 ||
3333
				 strcmp(te->desc, "TRIGGER") == 0 ||
3334
				 strcmp(te->desc, "ROW SECURITY") == 0 ||
3335
				 strcmp(te->desc, "POLICY") == 0 ||
3336
				 strcmp(te->desc, "USER MAPPING") == 0)
3337 3338 3339 3340 3341 3342 3343 3344
		{
			/* these object types don't have separate owners */
		}
		else
		{
			write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
					  te->desc);
		}
3345
	}
B
Bruce Momjian 已提交
3346

3347 3348
	/*
	 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
B
Bruce Momjian 已提交
3349
	 * commands, so we can no longer assume we know the current auth setting.
3350
	 */
3351
	if (acl_pass)
3352 3353 3354 3355 3356
	{
		if (AH->currUser)
			free(AH->currUser);
		AH->currUser = NULL;
	}
B
Bruce Momjian 已提交
3357 3358
}

3359 3360 3361 3362 3363 3364 3365
/*
 * Return a newly allocated copy of "str" with every '\r' and '\n' turned
 * into a space, so that the text can be placed inside a single-line SQL
 * comment.  The caller owns (and must free) the result.
 */
static char *
replace_line_endings(const char *str)
{
	char	   *copy = pg_strdup(str);
	size_t		i;

	for (i = 0; copy[i] != '\0'; i++)
	{
		if (copy[i] == '\r' || copy[i] == '\n')
			copy[i] = ' ';
	}

	return copy;
}

/*
 * Write the archive header.
 *
 * Fields are written in this order:
 *   - the magic string "PGDMP";
 *   - via WriteBytePtr: the archive version (vmaj, vmin, vrev), the integer
 *     size, the offset size and the archive format;
 *   - via WriteInt: the compression level, then the broken-down local
 *     creation time (sec, min, hour, mday, mon, year, isdst);
 *   - via WriteStr: the database name, the remote server version string and
 *     PG_VERSION.
 *
 * ReadHead() reads the fields back in this same order.  Without libz the
 * compression level is forced to 0, with a warning if one was requested.
 */
void
WriteHead(ArchiveHandle *AH)
{
	struct tm	crtm;
	struct tm  *lt;

	(*AH->WriteBufPtr) (AH, "PGDMP", 5);		/* Magic code */
	(*AH->WriteBytePtr) (AH, AH->vmaj);
	(*AH->WriteBytePtr) (AH, AH->vmin);
	(*AH->WriteBytePtr) (AH, AH->vrev);
	(*AH->WriteBytePtr) (AH, AH->intSize);
	(*AH->WriteBytePtr) (AH, AH->offSize);
	(*AH->WriteBytePtr) (AH, AH->format);

#ifndef HAVE_LIBZ
	if (AH->compression != 0)
		write_msg(modulename, "WARNING: requested compression not available in this "
				  "installation -- archive will be uncompressed\n");

	AH->compression = 0;
#endif

	WriteInt(AH, AH->compression);

	/*
	 * localtime() returns NULL if the time cannot be converted; never
	 * dereference that.  Fall back to all-zero fields so the header keeps
	 * its fixed layout.
	 */
	lt = localtime(&AH->createDate);
	if (lt != NULL)
		crtm = *lt;
	else
		memset(&crtm, 0, sizeof(crtm));

	WriteInt(AH, crtm.tm_sec);
	WriteInt(AH, crtm.tm_min);
	WriteInt(AH, crtm.tm_hour);
	WriteInt(AH, crtm.tm_mday);
	WriteInt(AH, crtm.tm_mon);
	WriteInt(AH, crtm.tm_year);
	WriteInt(AH, crtm.tm_isdst);
	WriteStr(AH, PQdb(AH->connection));
	WriteStr(AH, AH->public.remoteVersionStr);
	WriteStr(AH, PG_VERSION);
}

/*
 * Read the archive header (as laid out by WriteHead) into AH.
 *
 * The leading fields (magic string, version bytes, integer and offset sizes,
 * format byte) are consumed only when AH->readHeader is false.  The fields
 * after them (compression level, creation time, database name, version
 * strings) are read in every case, each only if AH->version is new enough
 * to contain it.
 *
 * These are fatal, via exit_horribly():
 *   - a missing magic string;
 *   - an unsupported version;
 *   - an integer size above 32;
 *   - a format mismatch.
 * Recoverable oddities only produce warnings.
 */
void
ReadHead(ArchiveHandle *AH)
{
	/*
	 * NB: this code must agree with _discoverArchiveFormat().  Maybe find a
	 * way to unify the cases?
	 */
	if (!AH->readHeader)
	{
		char		magic[7];
		int			fileFormat;

		(*AH->ReadBufPtr) (AH, magic, 5);
		if (strncmp(magic, "PGDMP", 5) != 0)
			exit_horribly(modulename, "did not find magic string in file header\n");

		AH->vmaj = (*AH->ReadBytePtr) (AH);
		AH->vmin = (*AH->ReadBytePtr) (AH);

		/* only archives newer than version 1.0 carry a revision byte */
		AH->vrev = (AH->vmaj > 1 || (AH->vmaj == 1 && AH->vmin > 0)) ?
			(*AH->ReadBytePtr) (AH) : 0;

		AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
		if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
			exit_horribly(modulename, "unsupported version (%d.%d) in file header\n",
						  AH->vmaj, AH->vmin);

		AH->intSize = (*AH->ReadBytePtr) (AH);
		if (AH->intSize > 32)
			exit_horribly(modulename, "sanity check on integer size (%lu) failed\n",
						  (unsigned long) AH->intSize);
		if (AH->intSize > sizeof(int))
			write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");

		/* a separate offset-size byte exists from version 1.7 on */
		AH->offSize = (AH->version >= K_VERS_1_7) ?
			(*AH->ReadBytePtr) (AH) : AH->intSize;

		fileFormat = (*AH->ReadBytePtr) (AH);
		if (AH->format != fileFormat)
			exit_horribly(modulename, "expected format (%d) differs from format found in file (%d)\n",
						  AH->format, fileFormat);
	}

	/* compression: absent before 1.2, one byte before 1.4, an int since */
	if (AH->version < K_VERS_1_2)
		AH->compression = Z_DEFAULT_COMPRESSION;
	else if (AH->version < K_VERS_1_4)
		AH->compression = (*AH->ReadBytePtr) (AH);
	else
		AH->compression = ReadInt(AH);

#ifndef HAVE_LIBZ
	if (AH->compression != 0)
		write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
#endif

	/* creation time and database name are present from version 1.4 on */
	if (AH->version >= K_VERS_1_4)
	{
		struct tm	crtm;

		crtm.tm_sec = ReadInt(AH);
		crtm.tm_min = ReadInt(AH);
		crtm.tm_hour = ReadInt(AH);
		crtm.tm_mday = ReadInt(AH);
		crtm.tm_mon = ReadInt(AH);
		crtm.tm_year = ReadInt(AH);
		crtm.tm_isdst = ReadInt(AH);

		AH->archdbname = ReadStr(AH);

		AH->createDate = mktime(&crtm);
		if (AH->createDate == (time_t) -1)
			write_msg(modulename, "WARNING: invalid creation date in header\n");
	}

	/* server and pg_dump version strings are present from version 1.10 on */
	if (AH->version >= K_VERS_1_10)
	{
		AH->archiveRemoteVersion = ReadStr(AH);
		AH->archiveDumpVersion = ReadStr(AH);
	}
}


/*
 * checkSeek
3513
 *	  check to see if ftell/fseek can be performed.
3514 3515 3516 3517
 */
bool
checkSeek(FILE *fp)
{
3518 3519 3520
	pgoff_t		tpos;

	/*
B
Bruce Momjian 已提交
3521 3522
	 * If pgoff_t is wider than long, we must have "real" fseeko and not an
	 * emulation using fseek.  Otherwise report no seek capability.
3523 3524 3525
	 */
#ifndef HAVE_FSEEKO
	if (sizeof(pgoff_t) > sizeof(long))
3526 3527
		return false;
#endif
3528 3529 3530

	/* Check that ftello works on this file */
	tpos = ftello(fp);
3531
	if (tpos < 0)
3532 3533 3534
		return false;

	/*
B
Bruce Momjian 已提交
3535
	 * Check that fseeko(SEEK_SET) works, too.  NB: we used to try to test
3536 3537 3538 3539 3540 3541 3542
	 * this with fseeko(fp, 0, SEEK_CUR).  But some platforms treat that as a
	 * successful no-op even on files that are otherwise unseekable.
	 */
	if (fseeko(fp, tpos, SEEK_SET) != 0)
		return false;

	return true;
3543
}
3544 3545 3546 3547 3548 3549 3550 3551


/*
 * dumpTimestamp
 *
 * Print "-- <msg> <time>" followed by a blank line.  The time is "tim"
 * converted to local time and formatted with PGDUMP_STRFTIME_FMT.
 *
 * Nothing is printed in two cases:
 *   - the time cannot be converted to local time;
 *   - strftime() produces nothing (e.g. the result would not fit in the
 *     buffer).
 */
static void
dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
{
	char		buf[64];
	struct tm  *ltm;

	/* localtime() may return NULL, which strftime() must never be given */
	ltm = localtime(&tim);
	if (ltm == NULL)
		return;

	if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, ltm) != 0)
		ahprintf(AH, "-- %s %s\n\n", msg, buf);
}

/*
 * Main engine for parallel restore.
 *
 * Work is done in three phases.
3562
 * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
B
Bruce Momjian 已提交
3563
 * just as for a standard restore.  Second we process the remaining non-ACL
3564 3565 3566 3567
 * steps in parallel worker children (threads on Windows, processes on Unix),
 * each of which connects separately to the database.  Finally we process all
 * the ACL entries in a single connection (that happens back in
 * RestoreArchive).
3568 3569
 */
static void
A
Andrew Dunstan 已提交
3570
restore_toc_entries_prefork(ArchiveHandle *AH)
3571 3572
{
	RestoreOptions *ropt = AH->ropt;
3573
	bool		skipped_some;
3574 3575
	TocEntry   *next_work_item;

A
Andrew Dunstan 已提交
3576
	ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
3577 3578 3579 3580 3581

	/* Adjust dependency information */
	fix_dependencies(AH);

	/*
3582 3583 3584
	 * Do all the early stuff in a single connection in the parent. There's no
	 * great point in running it in parallel, in fact it will actually run
	 * faster in a single connection because we avoid all the connection and
B
Bruce Momjian 已提交
3585
	 * setup overhead.  Also, pre-9.2 pg_dump versions were not very good
3586 3587 3588 3589 3590 3591 3592
	 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
	 * not risk trying to process them out-of-order.
	 *
	 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
	 * before DATA items, and all DATA items before POST_DATA items.  That is
	 * not certain to be true in older archives, though, so this loop is coded
	 * to not assume it.
3593
	 */
3594
	skipped_some = false;
3595
	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3596
	{
3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618
		/* NB: process-or-continue logic must be the inverse of loop below */
		if (next_work_item->section != SECTION_PRE_DATA)
		{
			/* DATA and POST_DATA items are just ignored for now */
			if (next_work_item->section == SECTION_DATA ||
				next_work_item->section == SECTION_POST_DATA)
			{
				skipped_some = true;
				continue;
			}
			else
			{
				/*
				 * SECTION_NONE items, such as comments, can be processed now
				 * if we are still in the PRE_DATA part of the archive.  Once
				 * we've skipped any items, we have to consider whether the
				 * comment's dependencies are satisfied, so skip it for now.
				 */
				if (skipped_some)
					continue;
			}
		}
3619 3620 3621 3622 3623 3624 3625

		ahlog(AH, 1, "processing item %d %s %s\n",
			  next_work_item->dumpId,
			  next_work_item->desc, next_work_item->tag);

		(void) restore_toc_entry(AH, next_work_item, ropt, false);

3626 3627
		/* there should be no touch of ready_list here, so pass NULL */
		reduce_dependencies(AH, next_work_item, NULL);
3628 3629 3630
	}

	/*
B
Bruce Momjian 已提交
3631
	 * Now close parent connection in prep for parallel steps.  We do this
3632 3633 3634
	 * mainly to ensure that we don't exceed the specified number of parallel
	 * connections.
	 */
R
Robert Haas 已提交
3635
	DisconnectDatabase(&AH->public);
3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647

	/* blow away any transient state from the old connection */
	if (AH->currUser)
		free(AH->currUser);
	AH->currUser = NULL;
	if (AH->currSchema)
		free(AH->currSchema);
	AH->currSchema = NULL;
	if (AH->currTablespace)
		free(AH->currTablespace);
	AH->currTablespace = NULL;
	AH->currWithOids = -1;
A
Andrew Dunstan 已提交
3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672
}

/*
 * Second phase of a parallel restore: hand the remaining non-ACL work to the
 * worker children (threads on Windows, processes on Unix), whose
 * connections are already set up by the time this runs.
 *
 * restore_toc_entries_prefork() has already processed the PRE_DATA entries,
 * plus any SECTION_NONE entries that preceded the first DATA/POST_DATA
 * entry.  Every other entry is sorted into one of two lists:
 *   - the caller-initialized pending_list, if it still has unmet
 *     dependencies;
 *   - a local ready list, otherwise.
 * Ready items are then dispatched to idle workers until nothing is ready
 * and every worker is idle.  Entries left on pending_list are handled by
 * restore_toc_entries_postfork(); ACL entries are handled back in
 * RestoreArchive.
 *
 * Entries that won't be restored are not filtered out while the lists are
 * built: they may be links in dependency chains between entries that will
 * be restored.  They are therefore treated as live until they come up for
 * processing, and are then skipped without launching a worker.
 */
static void
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
							 TocEntry *pending_list)
{
	int			work_status;
	bool		skipped_some = false;
	TocEntry	ready_list;
	TocEntry   *next_work_item;
	int			ret_child;

	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

	par_list_header_init(&ready_list);

	for (next_work_item = AH->toc->next; next_work_item != AH->toc;
		 next_work_item = next_work_item->next)
	{
		/*
		 * NB: process-or-continue logic must be the inverse of the loop in
		 * restore_toc_entries_prefork().
		 */
		if (next_work_item->section == SECTION_PRE_DATA)
			continue;			/* all PRE_DATA items were dealt with already */

		if (next_work_item->section == SECTION_DATA ||
			next_work_item->section == SECTION_POST_DATA)
			skipped_some = true;	/* same point at which prefork set it */
		else if (!skipped_some)
			continue;			/* a SECTION_NONE item prefork already did */

		par_list_append(next_work_item->depCount > 0 ? pending_list : &ready_list,
						next_work_item);
	}

	ahlog(AH, 1, "entering main parallel loop\n");

	/*
	 * Main parent loop: keep going until there is no worker still running
	 * AND there is no work left to be done.
	 */
	while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
		   !IsEveryWorkerIdle(pstate))
	{
		if (next_work_item == NULL)
		{
			/* at least one child is working and we have nothing ready. */
			Assert(!IsEveryWorkerIdle(pstate));
		}
		else if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0 ||
				 _tocEntryIsACL(next_work_item))
		{
			/* If not to be restored, don't waste time launching a worker */
			ahlog(AH, 1, "skipping item %d %s %s\n",
				  next_work_item->dumpId,
				  next_work_item->desc, next_work_item->tag);

			par_list_remove(next_work_item);
			reduce_dependencies(AH, next_work_item, &ready_list);

			continue;
		}
		else
		{
			ahlog(AH, 1, "launching item %d %s %s\n",
				  next_work_item->dumpId,
				  next_work_item->desc, next_work_item->tag);

			par_list_remove(next_work_item);

			Assert(GetIdleWorker(pstate) != NO_SLOT);
			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
		}

		for (;;)
		{
			bool		reaped_any = false;

			/*
			 * Reduce dependencies as soon as possible, in particular by
			 * reaping workers whose items pending entries depend on.  The
			 * check does not block while we still have a work item in hand.
			 * With nothing else to hand out, it waits for at least one
			 * worker to finish instead of busy-looping.
			 */
			ListenToWorkers(AH, pstate, next_work_item == NULL);

			while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
			{
				reaped_any = true;
				mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
			}

			/*
			 * We need an idle worker before re-running the outer loop; a
			 * reaped worker is one (quick check), else look explicitly.
			 */
			if (reaped_any || GetIdleWorker(pstate) != NO_SLOT)
				break;

			/*
			 * No idle worker: read the result of one or more workers, then
			 * go around again to call ReapWorkerStatus() on them.
			 */
			ListenToWorkers(AH, pstate, true);
		}
	}

	ahlog(AH, 1, "finished main parallel loop\n");
}

/*
 * Final non-ACL step of a parallel restore, run in the parent process.
 *
 * Reopens the single parent connection and re-establishes the fixed output
 * state.  Every entry still left on pending_list is then restored, one at a
 * time, in that connection.  Such entries are work that was never
 * dispatched, e.g. because of circular dependencies or some other
 * pathological condition.  ACL entries are handled back in RestoreArchive.
 */
static void
restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
{
	RestoreOptions *ropt = AH->ropt;
	TocEntry   *missed;

	ahlog(AH, 2, "entering restore_toc_entries_postfork\n");

	/* Now reconnect the single parent connection. */
	ConnectDatabase((Archive *) AH, ropt->dbname,
					ropt->pghost, ropt->pgport, ropt->username,
					ropt->promptPassword);

	_doSetFixedOutputState(AH);

	/* do any leftover non-ACL work here, in the single parent connection */
	missed = pending_list->par_next;
	while (missed != pending_list)
	{
		ahlog(AH, 1, "processing missed item %d %s %s\n",
			  missed->dumpId, missed->desc, missed->tag);
		(void) restore_toc_entry(AH, missed, ropt, false);
		missed = missed->par_next;
	}
}

/*
 * Return true if any object that te1 needs an exclusive lock on (an entry of
 * te1->lockDeps) also appears among te2's dependencies, whatever kind of
 * lock te2 needs on it.
 *
 * The test is one-directional; callers wanting a symmetric check call it
 * twice with the arguments swapped.
 */
static bool
has_lock_conflicts(TocEntry *te1, TocEntry *te2)
{
	int			lockIdx;

	for (lockIdx = 0; lockIdx < te1->nLockDeps; lockIdx++)
	{
		int			depIdx;

		for (depIdx = 0; depIdx < te2->nDeps; depIdx++)
		{
			if (te2->dependencies[depIdx] == te1->lockDeps[lockIdx])
				return true;
		}
	}

	return false;
}

/*
 * Initialize the header of a parallel-processing list.
 *
 * These lists are circular with a dummy TocEntry as header, just like the
 * main TOC list.  They use their own links (par_prev/par_next), so an entry
 * can be on the main TOC list and on a parallel-processing list at once.
 * An empty list is a header that points to itself in both directions.
 */
static void
par_list_header_init(TocEntry *l)
{
	l->par_next = l;
	l->par_prev = l;
}

/*
 * Append te to the end of the parallel-processing list headed by l, i.e.
 * splice it in between the current tail and the header.
 */
static void
par_list_append(TocEntry *l, TocEntry *te)
{
	TocEntry   *tail = l->par_prev;

	te->par_prev = tail;
	te->par_next = l;
	tail->par_next = te;
	l->par_prev = te;
}

/*
 * Remove te from whatever parallel-processing list it is in, and clear its
 * list links.
 */
static void
par_list_remove(TocEntry *te)
{
	TocEntry   *before = te->par_prev;
	TocEntry   *after = te->par_next;

	before->par_next = after;
	after->par_prev = before;

	te->par_prev = NULL;
	te->par_next = NULL;
}


/*
 * Find the next work item (if any) that is capable of being run now.
 *
 * To qualify, the item must have no remaining dependencies
3894 3895 3896
 * and no requirements for locks that are incompatible with
 * items currently running.  Items in the ready_list are known to have
 * no remaining dependencies, but we have to check for lock conflicts.
3897
 *
3898 3899
 * Note that the returned item has *not* been removed from ready_list.
 * The caller must do that after successfully dispatching the item.
3900 3901 3902 3903 3904 3905
 *
 * pref_non_data is for an alternative selection algorithm that gives
 * preference to non-data items if there is already a data load running.
 * It is currently disabled.
 */
static TocEntry *
3906
get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
A
Andrew Dunstan 已提交
3907
				   ParallelState *pstate)
3908
{
3909 3910 3911 3912 3913
	bool		pref_non_data = false;	/* or get from AH->ropt */
	TocEntry   *data_te = NULL;
	TocEntry   *te;
	int			i,
				k;
3914 3915 3916 3917 3918 3919

	/*
	 * Bogus heuristics for pref_non_data
	 */
	if (pref_non_data)
	{
3920
		int			count = 0;
3921

A
Andrew Dunstan 已提交
3922 3923 3924
		for (k = 0; k < pstate->numWorkers; k++)
			if (pstate->parallelSlot[k].args->te != NULL &&
				pstate->parallelSlot[k].args->te->section == SECTION_DATA)
3925
				count++;
A
Andrew Dunstan 已提交
3926
		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
3927 3928 3929 3930
			pref_non_data = false;
	}

	/*
3931
	 * Search the ready_list until we find a suitable item.
3932
	 */
3933
	for (te = ready_list->par_next; te != ready_list; te = te->par_next)
3934
	{
3935
		bool		conflicts = false;
3936 3937 3938

		/*
		 * Check to see if the item would need exclusive lock on something
3939 3940
		 * that a currently running item also needs lock on, or vice versa. If
		 * so, we don't want to schedule them together.
3941
		 */
A
Andrew Dunstan 已提交
3942
		for (i = 0; i < pstate->numWorkers && !conflicts; i++)
3943
		{
3944
			TocEntry   *running_te;
3945

A
Andrew Dunstan 已提交
3946
			if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
3947
				continue;
A
Andrew Dunstan 已提交
3948
			running_te = pstate->parallelSlot[i].args->te;
3949 3950 3951

			if (has_lock_conflicts(te, running_te) ||
				has_lock_conflicts(running_te, te))
3952
			{
3953 3954
				conflicts = true;
				break;
3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974
			}
		}

		if (conflicts)
			continue;

		if (pref_non_data && te->section == SECTION_DATA)
		{
			if (data_te == NULL)
				data_te = te;
			continue;
		}

		/* passed all tests, so this item can run */
		return te;
	}

	if (data_te != NULL)
		return data_te;

3975
	ahlog(AH, 2, "no item ready\n");
3976 3977 3978 3979 3980 3981 3982
	return NULL;
}


/*
 * Restore a single TOC item in parallel with others
 *
A
Andrew Dunstan 已提交
3983 3984 3985 3986
 * this is run in the worker, i.e. in a thread (Windows) or a separate process
 * (everything else). A worker process executes several such work items during
 * a parallel backup or restore. Once we terminate here and report back that
 * our work is finished, the master process will assign us a new work item.
3987
 */
A
Andrew Dunstan 已提交
3988
int
B
Bruce Momjian 已提交
3989
parallel_restore(ParallelArgs *args)
3990 3991
{
	ArchiveHandle *AH = args->AH;
3992
	TocEntry   *te = args->te;
3993
	RestoreOptions *ropt = AH->ropt;
A
Andrew Dunstan 已提交
3994
	int			status;
3995 3996 3997

	_doSetFixedOutputState(AH);

A
Andrew Dunstan 已提交
3998
	Assert(AH->connection != NULL);
3999

A
Andrew Dunstan 已提交
4000
	AH->public.n_errors = 0;
4001

A
Andrew Dunstan 已提交
4002 4003
	/* Restore the TOC item */
	status = restore_toc_entry(AH, te, ropt, true);
4004

A
Andrew Dunstan 已提交
4005
	return status;
4006 4007 4008 4009 4010 4011 4012 4013 4014 4015
}


/*
 * Housekeeping to be done after a step has been parallel restored.
 *
 * Clear the appropriate slot, free all the extra memory we allocated,
 * update status, and reduce the dependency count of any dependent items.
 */
static void
4016
mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
A
Andrew Dunstan 已提交
4017 4018
			   int worker, int status,
			   ParallelState *pstate)
4019
{
4020
	TocEntry   *te = NULL;
4021

A
Andrew Dunstan 已提交
4022
	te = pstate->parallelSlot[worker].args->te;
4023 4024

	if (te == NULL)
4025
		exit_horribly(modulename, "could not find slot of finished worker\n");
4026 4027 4028 4029 4030 4031 4032 4033 4034 4035 4036 4037 4038 4039

	ahlog(AH, 1, "finished item %d %s %s\n",
		  te->dumpId, te->desc, te->tag);

	if (status == WORKER_CREATE_DONE)
		mark_create_done(AH, te);
	else if (status == WORKER_INHIBIT_DATA)
	{
		inhibit_data_for_failed_table(AH, te);
		AH->public.n_errors++;
	}
	else if (status == WORKER_IGNORED_ERRORS)
		AH->public.n_errors++;
	else if (status != 0)
4040 4041
		exit_horribly(modulename, "worker process failed: exit code %d\n",
					  status);
4042

4043
	reduce_dependencies(AH, te, ready_list);
4044 4045 4046 4047 4048 4049
}


/*
 * Process the dependency information into a form useful for parallel restore.
 *
 * First, some missing or badly designed dependencies are repaired.  Then the
 * subsidiary data structures used by the main parallel-restore logic are
 * prepared:
 *
 * 1. depCount: for each TOC entry, the number of as-yet-unprocessed
 *	  dependencies.  Dependencies on items absent from the archive are not
 *	  counted.
 * 2. revDeps[]/nRevDeps: for each TOC entry, the dumpIds of the entries that
 *	  depend on it.
 * 3. lockDeps[]/nLockDeps: objects an entry needs exclusive lock on, so that
 *	  conflicting items are not scheduled at the same time.
 *
 * Every TOC entry is also marked as not being in any parallel-processing
 * list (par_prev/par_next set to NULL).
 */
static void
fix_dependencies(ArchiveHandle *AH)
{
	TocEntry   *te;
	int			i;

	/* Reset all of the per-entry fields that are (re)computed below. */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		te->depCount = te->nDeps;
		te->revDeps = NULL;
		te->nRevDeps = 0;
		te->par_prev = NULL;
		te->par_next = NULL;
	}

	/*
	 * A POST_DATA item that depends on a table should not run until that
	 * table's data has been loaded.  Re-point such dependencies at the
	 * TABLE DATA items.
	 */
	repoint_table_dependencies(AH);

	/*
	 * pg_dump before 8.4 did not make BLOB COMMENTS depend on BLOBS.  Supply
	 * that dependency here, assuming such an archive has only one BLOBS and
	 * only one BLOB COMMENTS entry.
	 */
	if (AH->version < K_VERS_1_11)
	{
		TocEntry   *comments = NULL;

		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
			{
				comments = te;
				break;
			}
		}

		if (comments != NULL)
		{
			TocEntry   *blobs;

			for (blobs = AH->toc->next; blobs != AH->toc; blobs = blobs->next)
			{
				if (strcmp(blobs->desc, "BLOBS") != 0)
					continue;

				comments->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
				comments->dependencies[0] = blobs->dumpId;
				comments->nDeps++;
				comments->depCount++;
				break;
			}
		}
	}

	/*
	 * From here on the dependency lists must not change: the revDeps arrays
	 * built below are derived from them.
	 */

	/*
	 * Count the incoming dependencies of each item.  A dependency may name
	 * an item that is not in the archive at all.  That should not happen
	 * with 9.2 and later, but is highly likely in older archives.  Such a
	 * dependency can never be satisfied, so it is subtracted from the
	 * depending item's depCount.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		for (i = 0; i < te->nDeps; i++)
		{
			DumpId		depid = te->dependencies[i];

			if (depid > AH->maxDumpId || AH->tocsByDumpId[depid] == NULL)
				te->depCount--;
			else
				AH->tocsByDumpId[depid]->nRevDeps++;
		}
	}

	/*
	 * Size each revDeps[] array from the count just gathered, then zero
	 * nRevDeps so it can serve as the fill index below.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if (te->nRevDeps > 0)
			te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
		te->nRevDeps = 0;
	}

	/*
	 * Fill in the revDeps[] arrays.  The membership test must match the
	 * counting loop above exactly, or the arrays would be overrun.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		for (i = 0; i < te->nDeps; i++)
		{
			DumpId		depid = te->dependencies[i];
			TocEntry   *target;

			if (depid > AH->maxDumpId)
				continue;
			target = AH->tocsByDumpId[depid];
			if (target == NULL)
				continue;

			target->revDeps[target->nRevDeps++] = te->dumpId;
		}
	}

	/* Lastly, work out the locking dependencies. */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		te->lockDeps = NULL;
		te->nLockDeps = 0;
		identify_locking_dependencies(AH, te);
	}
}

/*
4181
 * Change dependencies on table items to depend on table data items instead,
4182 4183 4184
 * but only in POST_DATA items.
 */
static void
4185
repoint_table_dependencies(ArchiveHandle *AH)
4186 4187
{
	TocEntry   *te;
4188
	int			i;
4189
	DumpId		olddep;
4190 4191 4192 4193 4194 4195 4196

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if (te->section != SECTION_POST_DATA)
			continue;
		for (i = 0; i < te->nDeps; i++)
		{
4197 4198 4199
			olddep = te->dependencies[i];
			if (olddep <= AH->maxDumpId &&
				AH->tableDataId[olddep] != 0)
4200
			{
4201
				te->dependencies[i] = AH->tableDataId[olddep];
4202
				ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
4203
					  te->dumpId, olddep, AH->tableDataId[olddep]);
4204 4205 4206 4207 4208 4209 4210 4211 4212 4213 4214
			}
		}
	}
}

/*
 * Identify which objects we'll need exclusive lock on in order to restore
 * the given TOC entry (*other* than the one identified by the TOC entry
 * itself).  Record their dump IDs in the entry's lockDeps[] array.
 */
static void
4215
identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229 4230 4231 4232 4233
{
	DumpId	   *lockids;
	int			nlockids;
	int			i;

	/* Quick exit if no dependencies at all */
	if (te->nDeps == 0)
		return;

	/* Exit if this entry doesn't need exclusive lock on other objects */
	if (!(strcmp(te->desc, "CONSTRAINT") == 0 ||
		  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
		  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
		  strcmp(te->desc, "RULE") == 0 ||
		  strcmp(te->desc, "TRIGGER") == 0))
		return;

	/*
4234 4235 4236 4237 4238 4239 4240 4241
	 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
	 * item listed among its dependencies.  Originally all of these would have
	 * been TABLE items, but repoint_table_dependencies would have repointed
	 * them to the TABLE DATA items if those are present (which they might not
	 * be, eg in a schema-only dump).  Note that all of the entries we are
	 * processing here are POST_DATA; otherwise there might be a significant
	 * difference between a dependency on a table and a dependency on its
	 * data, so that closer analysis would be needed here.
4242
	 */
4243
	lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4244 4245 4246
	nlockids = 0;
	for (i = 0; i < te->nDeps; i++)
	{
4247
		DumpId		depid = te->dependencies[i];
4248

4249
		if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4250
			((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
T
Tom Lane 已提交
4251
			 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4252 4253 4254 4255 4256 4257 4258 4259 4260
			lockids[nlockids++] = depid;
	}

	if (nlockids == 0)
	{
		free(lockids);
		return;
	}

T
Tom Lane 已提交
4261
	te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4262 4263 4264 4265 4266
	te->nLockDeps = nlockids;
}

/*
 * Remove the specified TOC entry from the depCounts of items that depend on
4267 4268
 * it, thereby possibly making them ready-to-run.  Any pending item that
 * becomes ready should be moved to the ready list.
4269 4270
 */
static void
4271
reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
4272
{
4273
	int			i;
4274

4275
	ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId);
4276

4277
	for (i = 0; i < te->nRevDeps; i++)
4278
	{
4279
		TocEntry   *otherte = AH->tocsByDumpId[te->revDeps[i]];
4280 4281 4282

		otherte->depCount--;
		if (otherte->depCount == 0 && otherte->par_prev != NULL)
4283
		{
4284 4285 4286 4287
			/* It must be in the pending list, so remove it ... */
			par_list_remove(otherte);
			/* ... and add to ready_list */
			par_list_append(ready_list, otherte);
4288 4289 4290 4291 4292 4293 4294 4295 4296 4297 4298
		}
	}
}

/*
 * Set the created flag on the DATA member corresponding to the given
 * TABLE member.
 *
 * Does nothing if AH->tableDataId[] holds 0 for the table, i.e. if it has no
 * corresponding data item.
 */
static void
mark_create_done(ArchiveHandle *AH, TocEntry *te)
{
	DumpId		dataId = AH->tableDataId[te->dumpId];

	if (dataId != 0)
		AH->tocsByDumpId[dataId]->created = true;
}

/*
 * Mark the DATA member corresponding to the given TABLE member as not
 * wanted, by clearing its reqs, after the table itself could not be created.
 *
 * Always logs the failure at level 1.  Has no further effect if
 * AH->tableDataId[] holds 0 for the table.
 */
static void
inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
{
	DumpId		dataId;

	ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
		  te->tag);

	dataId = AH->tableDataId[te->dumpId];
	if (dataId != 0)
		AH->tocsByDumpId[dataId]->reqs = 0;
}

/*
 * Clone and de-clone routines used in parallel restoration.
 *
 * Enough of the structure is cloned to ensure that there is no
 * conflict between different threads each with their own clone.
 */
A
Andrew Dunstan 已提交
4331
ArchiveHandle *
4332 4333 4334 4335 4336
CloneArchive(ArchiveHandle *AH)
{
	ArchiveHandle *clone;

	/* Make a "flat" copy */
4337
	clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4338 4339
	memcpy(clone, AH, sizeof(ArchiveHandle));

4340 4341
	/* Handle format-independent fields */
	memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4342 4343 4344 4345 4346 4347 4348 4349 4350 4351

	/* The clone will have its own connection, so disregard connection state */
	clone->connection = NULL;
	clone->currUser = NULL;
	clone->currSchema = NULL;
	clone->currTablespace = NULL;
	clone->currWithOids = -1;

	/* savedPassword must be local in case we change it while connecting */
	if (clone->savedPassword)
4352
		clone->savedPassword = pg_strdup(clone->savedPassword);
4353 4354 4355 4356

	/* clone has its own error count, too */
	clone->public.n_errors = 0;

A
Andrew Dunstan 已提交
4357 4358 4359 4360 4361 4362 4363 4364 4365 4366 4367 4368 4369 4370 4371 4372 4373 4374 4375 4376 4377 4378 4379 4380 4381 4382 4383 4384 4385 4386 4387 4388 4389 4390 4391 4392 4393 4394 4395 4396 4397 4398 4399 4400 4401 4402 4403 4404 4405 4406
	/*
	 * Connect our new clone object to the database: In parallel restore the
	 * parent is already disconnected, because we can connect the worker
	 * processes independently to the database (no snapshot sync required). In
	 * parallel backup we clone the parent's existing connection.
	 */
	if (AH->mode == archModeRead)
	{
		RestoreOptions *ropt = AH->ropt;

		Assert(AH->connection == NULL);
		/* this also sets clone->connection */
		ConnectDatabase((Archive *) clone, ropt->dbname,
						ropt->pghost, ropt->pgport, ropt->username,
						ropt->promptPassword);
	}
	else
	{
		char	   *dbname;
		char	   *pghost;
		char	   *pgport;
		char	   *username;
		const char *encname;

		Assert(AH->connection != NULL);

		/*
		 * Even though we are technically accessing the parent's database
		 * object here, these functions are fine to be called like that
		 * because all just return a pointer and do not actually send/receive
		 * any data to/from the database.
		 */
		dbname = PQdb(AH->connection);
		pghost = PQhost(AH->connection);
		pgport = PQport(AH->connection);
		username = PQuser(AH->connection);
		encname = pg_encoding_to_char(AH->public.encoding);

		/* this also sets clone->connection */
		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);

		/*
		 * Set the same encoding, whatever we set here is what we got from
		 * pg_encoding_to_char(), so we really shouldn't run into an error
		 * setting that very same value. Also see the comment in
		 * SetupConnection().
		 */
		PQsetClientEncoding(clone->connection, encname);
	}

4407 4408 4409
	/* Let the format-specific code have a chance too */
	(clone->ClonePtr) (clone);

A
Andrew Dunstan 已提交
4410
	Assert(clone->connection != NULL);
4411 4412 4413 4414 4415 4416 4417 4418
	return clone;
}

/*
 * Release clone-local storage.
 *
 * Note: we assume any clone-local connection was already closed.
 */
A
Andrew Dunstan 已提交
4419
void
4420 4421 4422 4423 4424
DeCloneArchive(ArchiveHandle *AH)
{
	/* Clear format-specific state */
	(AH->DeClonePtr) (AH);

4425 4426 4427
	/* Clear state allocated by CloneArchive */
	if (AH->sqlparse.curCmd)
		destroyPQExpBuffer(AH->sqlparse.curCmd);
4428 4429 4430 4431 4432 4433 4434 4435 4436 4437 4438 4439 4440

	/* Clear any connection-local state */
	if (AH->currUser)
		free(AH->currUser);
	if (AH->currSchema)
		free(AH->currSchema);
	if (AH->currTablespace)
		free(AH->currTablespace);
	if (AH->savedPassword)
		free(AH->savedPassword);

	free(AH);
}