pg_backup_archiver.c 107.3 KB
Newer Older
B
Bruce Momjian 已提交
1 2 3 4 5 6 7 8 9 10
/*-------------------------------------------------------------------------
 *
 * pg_backup_archiver.c
 *
 *	Private implementation of the archiver routines.
 *
 *	See the headers to pg_restore for more details.
 *
 * Copyright (c) 2000, Philip Warner
 *	Rights are granted to use this software in any way so long
B
Bruce Momjian 已提交
11
 *	as this notice is not removed.
B
Bruce Momjian 已提交
12 13
 *
 *	The author is not responsible for loss or damages that may
14
 *	result from its use.
B
Bruce Momjian 已提交
15 16 17
 *
 *
 * IDENTIFICATION
18
 *		src/bin/pg_dump/pg_backup_archiver.c
19
 *
B
Bruce Momjian 已提交
20 21 22
 *-------------------------------------------------------------------------
 */

23
#include "pg_backup_db.h"
24
#include "pg_backup_utils.h"
A
Andrew Dunstan 已提交
25
#include "parallel.h"
26

27
#include <ctype.h>
A
Andrew Dunstan 已提交
28
#include <fcntl.h>
29
#include <unistd.h>
30
#include <sys/stat.h>
31 32
#include <sys/types.h>
#include <sys/wait.h>
B
Bruce Momjian 已提交
33

34 35 36 37
#ifdef WIN32
#include <io.h>
#endif

38
#include "libpq/libpq-fs.h"
39

40 41 42
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"

43 44 45 46 47 48 49
/*
 * state needed to save/restore an archive's output target
 *
 * A value of this type is returned by SaveOutput() and handed back to
 * RestoreOutput() (see the prototypes further down in this file).
 */
typedef struct _outputContext
{
	void	   *OF;				/* saved output handle; presumably a copy of
								 * the archive handle's OF -- TODO confirm */
	int			gzOut;			/* saved flag; presumably a copy of the
								 * archive handle's gzOut -- TODO confirm */
} OutputContext;

50
/* translator: this is a module name */
51
static const char *modulename = gettext_noop("archiver");
52 53


B
Bruce Momjian 已提交
54
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
A
Andrew Dunstan 已提交
55
	 const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr);
56
static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
B
Bruce Momjian 已提交
57
					  ArchiveHandle *AH);
58
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);
59
static char *replace_line_endings(const char *str);
60
static void _doSetFixedOutputState(ArchiveHandle *AH);
61
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
62
static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
63
static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
64 65
static void _becomeUser(ArchiveHandle *AH, const char *user);
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
66
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
67
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
68 69
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
70
static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt);
71
static bool _tocEntryIsACL(TocEntry *te);
B
Bruce Momjian 已提交
72 73
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
74
static void buildTocEntryArrays(ArchiveHandle *AH);
75
static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
B
Bruce Momjian 已提交
76
static int	_discoverArchiveFormat(ArchiveHandle *AH);
B
Bruce Momjian 已提交
77

78
static int	RestoringToDB(ArchiveHandle *AH);
79
static void dump_lo_buf(ArchiveHandle *AH);
80
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
81
static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
82 83
static OutputContext SaveOutput(ArchiveHandle *AH);
static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
84

85
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
86
				  RestoreOptions *ropt, bool is_parallel);
A
Andrew Dunstan 已提交
87 88 89 90
static void restore_toc_entries_prefork(ArchiveHandle *AH);
static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
							 TocEntry *pending_list);
static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
91 92 93
static void par_list_header_init(TocEntry *l);
static void par_list_append(TocEntry *l, TocEntry *te);
static void par_list_remove(TocEntry *te);
94
static TocEntry *get_next_work_item(ArchiveHandle *AH,
95
				   TocEntry *ready_list,
A
Andrew Dunstan 已提交
96
				   ParallelState *pstate);
97
static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
A
Andrew Dunstan 已提交
98 99
			   int worker, int status,
			   ParallelState *pstate);
100
static void fix_dependencies(ArchiveHandle *AH);
101
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
102 103
static void repoint_table_dependencies(ArchiveHandle *AH);
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
104
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
B
Bruce Momjian 已提交
105
					TocEntry *ready_list);
106 107
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
108

B
Bruce Momjian 已提交
109
/*
B
Bruce Momjian 已提交
110 111 112 113
 *	Wrapper functions.
 *
 *	The objective is to make writing new formats and dumpers as simple
 *	as possible, if necessary at the expense of extra function calls etc.
B
Bruce Momjian 已提交
114 115 116
 *
 */

A
Andrew Dunstan 已提交
117 118 119 120 121 122 123 124 125 126 127 128 129
/*
 * Worker setup callback used when restoring.
 *
 * The dump-side worker setup needs lots of knowledge of the internals of
 * pg_dump, so it's defined in pg_dump.c and passed into CreateArchive.  The
 * restore-side setup doesn't need to know anything much, so it lives here:
 * all it does is invoke the format's Reopen callback on the handle.
 *
 * ropt is accepted to match the SetupWorkerPtr signature but is not used.
 */
static void
setupRestoreWorker(Archive *AHX, RestoreOptions *ropt)
{
	ArchiveHandle *handle = (ArchiveHandle *) AHX;

	(*handle->ReopenPtr) (handle);
}

B
Bruce Momjian 已提交
130 131 132

/* Create a new archive */
/* Public */
133
Archive *
B
Bruce Momjian 已提交
134
CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
A
Andrew Dunstan 已提交
135
	 const int compression, ArchiveMode mode, SetupWorkerPtr setupDumpWorker)
136

B
Bruce Momjian 已提交
137
{
A
Andrew Dunstan 已提交
138
	ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode, setupDumpWorker);
B
Bruce Momjian 已提交
139 140

	return (Archive *) AH;
B
Bruce Momjian 已提交
141 142 143 144
}

/* Open an existing archive */
/* Public */
145
Archive *
B
Bruce Momjian 已提交
146
OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
B
Bruce Momjian 已提交
147
{
A
Andrew Dunstan 已提交
148
	ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead, setupRestoreWorker);
B
Bruce Momjian 已提交
149 150

	return (Archive *) AH;
B
Bruce Momjian 已提交
151 152 153
}

/* Public */
B
Bruce Momjian 已提交
154 155
void
CloseArchive(Archive *AHX)
B
Bruce Momjian 已提交
156
{
B
Bruce Momjian 已提交
157 158 159 160
	int			res = 0;
	ArchiveHandle *AH = (ArchiveHandle *) AHX;

	(*AH->ClosePtr) (AH);
B
Bruce Momjian 已提交
161

B
Bruce Momjian 已提交
162 163
	/* Close the output */
	if (AH->gzOut)
164
		res = GZCLOSE(AH->OF);
B
Bruce Momjian 已提交
165
	else if (AH->OF != stdout)
166 167 168
		res = fclose(AH->OF);

	if (res != 0)
169 170
		exit_horribly(modulename, "could not close output file: %s\n",
					  strerror(errno));
B
Bruce Momjian 已提交
171 172 173
}

/* Public */
B
Bruce Momjian 已提交
174
void
175
SetArchiveRestoreOptions(Archive *AHX, RestoreOptions *ropt)
B
Bruce Momjian 已提交
176
{
B
Bruce Momjian 已提交
177
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
178 179 180 181 182 183 184 185 186 187
	TocEntry   *te;
	teSection	curSection;

	/* Save options for later access */
	AH->ropt = ropt;

	/* Decide which TOC entries will be dumped/restored, and mark them */
	curSection = SECTION_PRE_DATA;
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
		/*
		 * When writing an archive, we also take this opportunity to check
		 * that we have generated the entries in a sane order that respects
		 * the section divisions.  When reading, don't complain, since buggy
		 * old versions of pg_dump might generate out-of-order archives.
		 */
		if (AH->mode != archModeRead)
		{
			switch (te->section)
			{
				case SECTION_NONE:
					/* ok to be anywhere */
					break;
				case SECTION_PRE_DATA:
					if (curSection != SECTION_PRE_DATA)
						write_msg(modulename,
								  "WARNING: archive items not in correct section order\n");
					break;
				case SECTION_DATA:
					if (curSection == SECTION_POST_DATA)
						write_msg(modulename,
								  "WARNING: archive items not in correct section order\n");
					break;
				case SECTION_POST_DATA:
					/* ok no matter which section we were in */
					break;
				default:
					exit_horribly(modulename, "unexpected section code %d\n",
								  (int) te->section);
					break;
			}
		}

221 222
		if (te->section != SECTION_NONE)
			curSection = te->section;
223

224 225 226 227 228 229 230 231 232 233
		te->reqs = _tocEntryRequired(te, curSection, ropt);
	}
}

/* Public */
void
RestoreArchive(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->ropt;
234
	bool		parallel_mode;
235
	TocEntry   *te;
B
Bruce Momjian 已提交
236
	OutputContext sav;
B
Bruce Momjian 已提交
237

238
	AH->stage = STAGE_INITIALIZING;
239

240 241 242
	/*
	 * Check for nonsensical option combinations.
	 *
243
	 * -C is not compatible with -1, because we can't create a database inside
244
	 * a transaction block.
245
	 */
246
	if (ropt->createDB && ropt->single_txn)
247
		exit_horribly(modulename, "-C and -1 are incompatible options\n");
248

249 250 251
	/*
	 * If we're going to do parallel restore, there are some restrictions.
	 */
A
Andrew Dunstan 已提交
252
	parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
253 254 255 256
	if (parallel_mode)
	{
		/* We haven't got round to making this work for all archive formats */
		if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
257
			exit_horribly(modulename, "parallel restore is not supported with this archive file format\n");
258 259 260

		/* Doesn't work if the archive represents dependencies as OIDs */
		if (AH->version < K_VERS_1_8)
261
			exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
262 263 264 265 266 267 268 269

		/*
		 * It's also not gonna work if we can't reopen the input file, so
		 * let's try that immediately.
		 */
		(AH->ReopenPtr) (AH);
	}

270 271 272 273
	/*
	 * Make sure we won't need (de)compression we haven't got
	 */
#ifndef HAVE_LIBZ
274
	if (AH->compression != 0 && AH->PrintTocDataPtr !=NULL)
275 276 277
	{
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
278
			if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
279
				exit_horribly(modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
280 281 282 283
		}
	}
#endif

284 285 286 287 288 289 290
	/*
	 * Prepare index arrays, so we can assume we have them throughout restore.
	 * It's possible we already did this, though.
	 */
	if (AH->tocsByDumpId == NULL)
		buildTocEntryArrays(AH);

291 292 293 294 295
	/*
	 * If we're using a DB connection, then connect it.
	 */
	if (ropt->useDB)
	{
296
		ahlog(AH, 1, "connecting to database for restore\n");
297
		if (AH->version < K_VERS_1_3)
298
			exit_horribly(modulename, "direct database connections are not supported in pre-1.3 archives\n");
299

300 301 302 303 304 305
		/*
		 * We don't want to guess at whether the dump will successfully
		 * restore; allow the attempt regardless of the version of the restore
		 * target.
		 */
		AHX->minRemoteVersion = 0;
306 307
		AHX->maxRemoteVersion = 999999;

308 309
		ConnectDatabase(AHX, ropt->dbname,
						ropt->pghost, ropt->pgport, ropt->username,
310
						ropt->promptPassword);
B
Bruce Momjian 已提交
311 312

		/*
B
Bruce Momjian 已提交
313 314
		 * If we're talking to the DB directly, don't send comments since they
		 * obscure SQL when displaying errors
B
Bruce Momjian 已提交
315
		 */
316
		AH->noTocComments = 1;
317 318
	}

319
	/*
B
Bruce Momjian 已提交
320 321 322 323
	 * Work out if we have an implied data-only restore. This can happen if
	 * the dump was data only or if the user has used a toc list to exclude
	 * all of the schema data. All we do is look for schema entries - if none
	 * are found then we set the dataOnly flag.
324
	 *
B
Bruce Momjian 已提交
325
	 * We could scan for wanted TABLE entries, but that is not the same as
326
	 * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
B
Bruce Momjian 已提交
327 328 329
	 */
	if (!ropt->dataOnly)
	{
B
Bruce Momjian 已提交
330
		int			impliedDataOnly = 1;
331 332

		for (te = AH->toc->next; te != AH->toc; te = te->next)
B
Bruce Momjian 已提交
333
		{
334
			if ((te->reqs & REQ_SCHEMA) != 0)
B
Bruce Momjian 已提交
335
			{					/* It's schema, and it's wanted */
336 337 338 339 340 341 342
				impliedDataOnly = 0;
				break;
			}
		}
		if (impliedDataOnly)
		{
			ropt->dataOnly = impliedDataOnly;
343
			ahlog(AH, 1, "implied data-only restore\n");
344
		}
B
Bruce Momjian 已提交
345
	}
346

347
	/*
B
Bruce Momjian 已提交
348
	 * Setup the output file if necessary.
B
Bruce Momjian 已提交
349
	 */
350
	sav = SaveOutput(AH);
B
Bruce Momjian 已提交
351
	if (ropt->filename || ropt->compression)
352
		SetOutput(AH, ropt->filename, ropt->compression);
B
Bruce Momjian 已提交
353

354
	ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
B
Bruce Momjian 已提交
355

356 357
	if (AH->public.verbose)
	{
358 359 360 361 362 363
		if (AH->archiveRemoteVersion)
			ahprintf(AH, "-- Dumped from database version %s\n",
					 AH->archiveRemoteVersion);
		if (AH->archiveDumpVersion)
			ahprintf(AH, "-- Dumped by pg_dump version %s\n",
					 AH->archiveDumpVersion);
364
		dumpTimestamp(AH, "Started on", AH->createDate);
365
	}
366

367
	if (ropt->single_txn)
368 369 370 371 372 373
	{
		if (AH->connection)
			StartTransaction(AH);
		else
			ahprintf(AH, "BEGIN;\n\n");
	}
374

375 376 377 378 379
	/*
	 * Establish important parameter values right away.
	 */
	_doSetFixedOutputState(AH);

380 381
	AH->stage = STAGE_PROCESSING;

B
Bruce Momjian 已提交
382 383
	/*
	 * Drop the items at the start, in reverse order
384
	 */
B
Bruce Momjian 已提交
385 386
	if (ropt->dropSchema)
	{
387
		for (te = AH->toc->prev; te != AH->toc; te = te->prev)
B
Bruce Momjian 已提交
388
		{
389 390
			AH->currentTE = te;

391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
			/*
			 * In createDB mode, issue a DROP *only* for the database as a
			 * whole.  Issuing drops against anything else would be wrong,
			 * because at this point we're connected to the wrong database.
			 * Conversely, if we're not in createDB mode, we'd better not
			 * issue a DROP against the database at all.
			 */
			if (ropt->createDB)
			{
				if (strcmp(te->desc, "DATABASE") != 0)
					continue;
			}
			else
			{
				if (strcmp(te->desc, "DATABASE") == 0)
					continue;
			}

			/* Otherwise, drop anything that's selected and has a dropStmt */
410
			if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
411
			{
412
				ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
413
				/* Select owner and schema as necessary */
414
				_becomeOwner(AH, te);
415
				_selectOutputSchema(AH, te->namespace);
416 417 418 419 420 421 422 423 424 425

				/*
				 * Now emit the DROP command, if the object has one.  Note we
				 * don't necessarily emit it verbatim; at this point we add an
				 * appropriate IF EXISTS clause, if the user requested it.
				 */
				if (*te->dropStmt != '\0')
				{
					if (!ropt->if_exists)
					{
B
Bruce Momjian 已提交
426
						/* No --if-exists?	Then just use the original */
427 428 429 430 431 432 433 434
						ahprintf(AH, "%s", te->dropStmt);
					}
					else
					{
						char		buffer[40];
						char	   *mark;
						char	   *dropStmt = pg_strdup(te->dropStmt);
						char	   *dropStmtPtr = dropStmt;
B
Bruce Momjian 已提交
435
						PQExpBuffer ftStmt = createPQExpBuffer();
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451

						/*
						 * Need to inject IF EXISTS clause after ALTER TABLE
						 * part in ALTER TABLE .. DROP statement
						 */
						if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
						{
							appendPQExpBuffer(ftStmt,
											  "ALTER TABLE IF EXISTS");
							dropStmt = dropStmt + 11;
						}

						/*
						 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does not
						 * support the IF EXISTS clause, and therefore we
						 * simply emit the original command for such objects.
B
Bruce Momjian 已提交
452 453 454 455 456 457
						 * For other objects, we need to extract the first
						 * part of the DROP which includes the object type.
						 * Most of the time this matches te->desc, so search
						 * for that; however for the different kinds of
						 * CONSTRAINTs, we know to search for hardcoded "DROP
						 * CONSTRAINT" instead.
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
						 */
						if (strcmp(te->desc, "DEFAULT") == 0)
							appendPQExpBuffer(ftStmt, "%s", dropStmt);
						else
						{
							if (strcmp(te->desc, "CONSTRAINT") == 0 ||
								strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
								strcmp(te->desc, "FK CONSTRAINT") == 0)
								strcpy(buffer, "DROP CONSTRAINT");
							else
								snprintf(buffer, sizeof(buffer), "DROP %s",
										 te->desc);

							mark = strstr(dropStmt, buffer);
							Assert(mark != NULL);

							*mark = '\0';
							appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
											  dropStmt, buffer,
											  mark + strlen(buffer));
						}

						ahprintf(AH, "%s", ftStmt->data);

						destroyPQExpBuffer(ftStmt);

						pg_free(dropStmtPtr);
					}
				}
487 488
			}
		}
489 490 491 492 493 494 495 496 497 498 499 500 501 502 503

		/*
		 * _selectOutputSchema may have set currSchema to reflect the effect
		 * of a "SET search_path" command it emitted.  However, by now we may
		 * have dropped that schema; or it might not have existed in the first
		 * place.  In either case the effective value of search_path will not
		 * be what we think.  Forcibly reset currSchema so that we will
		 * re-establish the search_path setting when needed (after creating
		 * the schema).
		 *
		 * If we treated users as pg_dump'able objects then we'd need to reset
		 * currUser here too.
		 */
		if (AH->currSchema)
			free(AH->currSchema);
504
		AH->currSchema = NULL;
B
Bruce Momjian 已提交
505
	}
B
Bruce Momjian 已提交
506

507
	/*
508 509 510
	 * In serial mode, we now process each non-ACL TOC entry.
	 *
	 * In parallel mode, turn control over to the parallel-restore logic.
511
	 */
512
	if (parallel_mode)
A
Andrew Dunstan 已提交
513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531
	{
		ParallelState *pstate;
		TocEntry	pending_list;

		par_list_header_init(&pending_list);

		/* This runs PRE_DATA items and then disconnects from the database */
		restore_toc_entries_prefork(AH);
		Assert(AH->connection == NULL);

		/* ParallelBackupStart() will actually fork the processes */
		pstate = ParallelBackupStart(AH, ropt);
		restore_toc_entries_parallel(AH, pstate, &pending_list);
		ParallelBackupEnd(AH, pstate);

		/* reconnect the master and see if we missed something */
		restore_toc_entries_postfork(AH, &pending_list);
		Assert(AH->connection != NULL);
	}
532
	else
B
Bruce Momjian 已提交
533
	{
534 535 536
		for (te = AH->toc->next; te != AH->toc; te = te->next)
			(void) restore_toc_entry(AH, te, ropt, false);
	}
B
Bruce Momjian 已提交
537

538 539 540
	/*
	 * Scan TOC again to output ownership commands and ACLs
	 */
541
	for (te = AH->toc->next; te != AH->toc; te = te->next)
542
	{
543 544
		AH->currentTE = te;

545
		/* Both schema and data objects might now have ownership/ACLs */
546
		if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
547
		{
P
Peter Eisentraut 已提交
548
			ahlog(AH, 1, "setting owner and privileges for %s %s\n",
549
				  te->desc, te->tag);
550 551 552 553
			_printTocEntry(AH, te, ropt, false, true);
		}
	}

554
	if (ropt->single_txn)
555 556 557 558 559 560
	{
		if (AH->connection)
			CommitTransaction(AH);
		else
			ahprintf(AH, "COMMIT;\n\n");
	}
B
Bruce Momjian 已提交
561

562 563 564
	if (AH->public.verbose)
		dumpTimestamp(AH, "Completed on", time(NULL));

565 566
	ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");

567
	/*
568
	 * Clean up & we're done.
569
	 */
570 571
	AH->stage = STAGE_FINALIZING;

572
	if (ropt->filename || ropt->compression)
573
		RestoreOutput(AH, sav);
574 575

	if (ropt->useDB)
R
Robert Haas 已提交
576
		DisconnectDatabase(&AH->public);
B
Bruce Momjian 已提交
577 578
}

579 580 581 582 583 584 585 586 587 588 589
/*
 * Restore one TOC item.
 *
 * Shared by the serial and parallel restore paths; is_parallel is true when
 * running inside a worker child process.
 *
 * The result is WORKER_OK in the ordinary case.  WORKER_CREATE_DONE or
 * WORKER_INHIBIT_DATA is returned instead when the parallel parent must
 * perform the corresponding status update itself, and WORKER_IGNORED_ERRORS
 * when the status is otherwise WORKER_OK but the archive's error counter is
 * nonzero.
 */
static int
restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
				  RestoreOptions *ropt, bool is_parallel)
{
	int			result = WORKER_OK;
	teReqs		wanted;
	bool		defnEmitted = false;

	AH->currentTE = te;

	/*
	 * Work out what, if anything, we want from this entry.  ACLs are never
	 * restored here.
	 */
	wanted = _tocEntryIsACL(te) ? 0 : te->reqs;

	/*
	 * Ignore DATABASE entry unless we should create it.  We must check this
	 * here, not in _tocEntryRequired, because the createDB option should not
	 * affect emitting a DATABASE entry to an archive file.
	 */
	if (strcmp(te->desc, "DATABASE") == 0 && !ropt->createDB)
		wanted = 0;

	/* Relay any warning recorded in the dump file to stderr */
	if (strcmp(te->desc, "WARNING") == 0 && !ropt->suppressDumpWarnings)
	{
		const char *warning = NULL;

		if (!ropt->dataOnly && te->defn != NULL && te->defn[0] != '\0')
			warning = te->defn;
		else if (te->copyStmt != NULL && te->copyStmt[0] != '\0')
			warning = te->copyStmt;

		if (warning != NULL)
			write_msg(modulename, "warning from original dump file: %s\n", warning);
	}

	/*
	 * Schema component, if wanted
	 */
	if ((wanted & REQ_SCHEMA) != 0)
	{
		ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);

		_printTocEntry(AH, te, ropt, false, false);
		defnEmitted = true;

		if (strcmp(te->desc, "TABLE") == 0)
		{
			if (AH->lastErrorTE != te)
			{
				/*
				 * The table was created successfully.  Mark the matching
				 * TABLE DATA for possible truncation.  In the parallel case
				 * the parent has to do that, so just report it.
				 */
				if (is_parallel)
					result = WORKER_CREATE_DONE;
				else
					mark_create_done(AH, te);
			}
			else if (ropt->noDataForFailedTables)
			{
				/*
				 * Table creation failed and --no-data-for-failed-tables was
				 * given: the matching TABLE DATA must be ignored.  Again, in
				 * the parallel case the parent does the bookkeeping.
				 */
				if (is_parallel)
					result = WORKER_INHIBIT_DATA;
				else
					inhibit_data_for_failed_table(AH, te);
			}
		}

		/* If we created a DB, connect to it... */
		if (strcmp(te->desc, "DATABASE") == 0)
		{
			ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
			_reconnectToDB(AH, te->tag);
			ropt->dbname = pg_strdup(te->tag);
		}
	}

	/*
	 * Data component, if wanted.
	 *
	 * hadDumper is set when the entry has a genuine data component.  When it
	 * is not set, the defn field may still hold statements that must be
	 * executed in data-only restores.
	 */
	if ((wanted & REQ_DATA) != 0)
	{
		if (!te->hadDumper)
		{
			/* Emit the defn part now, unless the schema pass already did */
			if (!defnEmitted)
			{
				ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
				_printTocEntry(AH, te, ropt, false, false);
			}
		}
		else if (AH->PrintTocDataPtr !=NULL)
		{
			/* We are able to output the data, so restore it */
			_printTocEntry(AH, te, ropt, true, false);

			if (strcmp(te->desc, "BLOBS") == 0 ||
				strcmp(te->desc, "BLOB COMMENTS") == 0)
			{
				ahlog(AH, 1, "processing %s\n", te->desc);

				_selectOutputSchema(AH, "pg_catalog");

				(*AH->PrintTocDataPtr) (AH, te, ropt);
			}
			else
			{
				_disableTriggersIfNecessary(AH, te, ropt);

				/* Select owner and schema as necessary */
				_becomeOwner(AH, te);
				_selectOutputSchema(AH, te->namespace);

				ahlog(AH, 1, "processing data for table \"%s\"\n",
					  te->tag);

				/*
				 * In parallel restore, if the table was created earlier in
				 * this run, wrap the COPY in a transaction and precede it
				 * with a TRUNCATE.  If archiving is not on this prevents
				 * WAL-logging the COPY, giving a speedup similar to
				 * single_txn mode in non-parallel restores.
				 */
				if (is_parallel && te->created)
				{
					/*
					 * Parallel restore always talks directly to a server, so
					 * there is no need to consider emitting BEGIN as text.
					 */
					StartTransaction(AH);

					/*
					 * On servers >= 8.4, use TRUNCATE ... ONLY so that child
					 * tables are not wiped.
					 */
					ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
							 (PQserverVersion(AH->connection) >= 80400 ?
							  "ONLY " : ""),
							 fmtId(te->tag));
				}

				/* Use the stored COPY statement when there is one */
				if (te->copyStmt && strlen(te->copyStmt) > 0)
				{
					ahprintf(AH, "%s", te->copyStmt);
					AH->outputKind = OUTPUT_COPYDATA;
				}
				else
					AH->outputKind = OUTPUT_OTHERDATA;

				(*AH->PrintTocDataPtr) (AH, te, ropt);

				/* Terminate COPY if needed */
				if (AH->outputKind == OUTPUT_COPYDATA &&
					RestoringToDB(AH))
					EndDBCopyMode(AH, te);
				AH->outputKind = OUTPUT_SQLCMDS;

				/* Close out the transaction opened before the TRUNCATE */
				if (is_parallel && te->created)
					CommitTransaction(AH);

				_enableTriggersIfNecessary(AH, te, ropt);
			}
		}
	}

	/* Report ignored errors, unless a more specific status is pending */
	if (result == WORKER_OK && AH->public.n_errors > 0)
		result = WORKER_IGNORED_ERRORS;

	return result;
}

782 783 784 785
/*
 * Allocate a new RestoreOptions block.
 * This is mainly so we can initialize it, but also for future expansion,
 */
B
Bruce Momjian 已提交
786 787
RestoreOptions *
NewRestoreOptions(void)
B
Bruce Momjian 已提交
788
{
B
Bruce Momjian 已提交
789
	RestoreOptions *opts;
B
Bruce Momjian 已提交
790

791
	opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
B
Bruce Momjian 已提交
792

793
	/* set any fields that shouldn't default to zeroes */
B
Bruce Momjian 已提交
794
	opts->format = archUnknown;
795
	opts->promptPassword = TRI_DEFAULT;
796
	opts->dumpSections = DUMP_UNSECTIONED;
B
Bruce Momjian 已提交
797 798 799 800

	return opts;
}

B
Bruce Momjian 已提交
801 802
/*
 * Emit "ALTER TABLE ... DISABLE TRIGGER ALL" for the entry's table, but only
 * in a data-only restore with disable_triggers requested.
 */
static void
_disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
{
	/* This hack is only needed in a data-only restore */
	if (!(ropt->dataOnly && ropt->disable_triggers))
		return;

	ahlog(AH, 1, "disabling triggers for %s\n", te->tag);

	/*
	 * Switch to the superuser if possible, since only superusers can disable
	 * constraint triggers.  If -S was not given, assume the initial user
	 * identity is a superuser.  (XXX would it be better to become the table
	 * owner?)
	 */
	_becomeUser(AH, ropt->superuser);

	/* Make sure the right schema is selected, then disable the triggers */
	_selectOutputSchema(AH, te->namespace);

	ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
			 fmtId(te->tag));
}

B
Bruce Momjian 已提交
827 828
/*
 * Emit "ALTER TABLE ... ENABLE TRIGGER ALL" for the table named by te->tag.
 *
 * Counterpart of _disableTriggersIfNecessary(); does nothing unless this is
 * a data-only restore for which disable_triggers was requested.
 */
static void
_enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
{
	const char *tableName = te->tag;

	/* This hack is only needed in a data-only restore */
	if (!(ropt->dataOnly && ropt->disable_triggers))
		return;

	ahlog(AH, 1, "enabling triggers for %s\n", tableName);

	/*
	 * Become superuser if possible, since they are the only ones who can
	 * alter constraint triggers.  If -S was not given, assume the initial
	 * user identity is a superuser.  (XXX would it be better to become the
	 * table owner?)
	 */
	_becomeUser(AH, ropt->superuser);

	/* Switch to the table's schema, then re-enable its triggers. */
	_selectOutputSchema(AH, te->namespace);
	ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
			 fmtId(tableName));
}
B
Bruce Momjian 已提交
852 853

/*
854
 * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
B
Bruce Momjian 已提交
855 856 857
 */

/* Public */
858
void
P
Peter Eisentraut 已提交
859
WriteData(Archive *AHX, const void *data, size_t dLen)
B
Bruce Momjian 已提交
860
{
B
Bruce Momjian 已提交
861
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
B
Bruce Momjian 已提交
862

863
	if (!AH->currToc)
864
		exit_horribly(modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
865

866 867 868
	(*AH->WriteDataPtr) (AH, data, dLen);

	return;
B
Bruce Momjian 已提交
869 870 871
}

/*
B
Bruce Momjian 已提交
872
 * Create a new TOC entry. The TOC was designed as a TOC, but is now the
B
Bruce Momjian 已提交
873 874 875 876
 * repository for all metadata. But the name has stuck.
 */

/* Public */
B
Bruce Momjian 已提交
877
void
878 879 880
ArchiveEntry(Archive *AHX,
			 CatalogId catalogId, DumpId dumpId,
			 const char *tag,
881
			 const char *namespace,
B
Bruce Momjian 已提交
882
			 const char *tablespace,
883
			 const char *owner, bool withOids,
884 885
			 const char *desc, teSection section,
			 const char *defn,
886 887
			 const char *dropStmt, const char *copyStmt,
			 const DumpId *deps, int nDeps,
B
Bruce Momjian 已提交
888
			 DataDumperPtr dumpFn, void *dumpArg)
B
Bruce Momjian 已提交
889
{
B
Bruce Momjian 已提交
890 891 892
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	TocEntry   *newToc;

893
	newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
B
Bruce Momjian 已提交
894

895 896 897 898
	AH->tocCount++;
	if (dumpId > AH->maxDumpId)
		AH->maxDumpId = dumpId;

B
Bruce Momjian 已提交
899 900 901 902 903
	newToc->prev = AH->toc->prev;
	newToc->next = AH->toc;
	AH->toc->prev->next = newToc;
	AH->toc->prev = newToc;

904 905
	newToc->catalogId = catalogId;
	newToc->dumpId = dumpId;
906
	newToc->section = section;
907

908 909 910 911
	newToc->tag = pg_strdup(tag);
	newToc->namespace = namespace ? pg_strdup(namespace) : NULL;
	newToc->tablespace = tablespace ? pg_strdup(tablespace) : NULL;
	newToc->owner = pg_strdup(owner);
912
	newToc->withOids = withOids;
913 914 915 916
	newToc->desc = pg_strdup(desc);
	newToc->defn = pg_strdup(defn);
	newToc->dropStmt = pg_strdup(dropStmt);
	newToc->copyStmt = copyStmt ? pg_strdup(copyStmt) : NULL;
917

918 919
	if (nDeps > 0)
	{
920
		newToc->dependencies = (DumpId *) pg_malloc(nDeps * sizeof(DumpId));
921 922 923 924 925 926 927 928
		memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
		newToc->nDeps = nDeps;
	}
	else
	{
		newToc->dependencies = NULL;
		newToc->nDeps = 0;
	}
929

930 931
	newToc->dataDumper = dumpFn;
	newToc->dataDumperArg = dumpArg;
932
	newToc->hadDumper = dumpFn ? true : false;
B
Bruce Momjian 已提交
933

934
	newToc->formatData = NULL;
B
Bruce Momjian 已提交
935

B
Bruce Momjian 已提交
936
	if (AH->ArchiveEntryPtr !=NULL)
B
Bruce Momjian 已提交
937
		(*AH->ArchiveEntryPtr) (AH, newToc);
B
Bruce Momjian 已提交
938 939 940
}

/* Public */
B
Bruce Momjian 已提交
941 942
void
PrintTOCSummary(Archive *AHX, RestoreOptions *ropt)
B
Bruce Momjian 已提交
943
{
B
Bruce Momjian 已提交
944
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
945
	TocEntry   *te;
946
	teSection	curSection;
B
Bruce Momjian 已提交
947
	OutputContext sav;
948
	const char *fmtName;
B
Bruce Momjian 已提交
949

950
	sav = SaveOutput(AH);
B
Bruce Momjian 已提交
951
	if (ropt->filename)
952
		SetOutput(AH, ropt->filename, 0 /* no compression */ );
B
Bruce Momjian 已提交
953

954 955
	ahprintf(AH, ";\n; Archive created at %s", ctime(&AH->createDate));
	ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
B
Bruce Momjian 已提交
956
			 AH->archdbname, AH->tocCount, AH->compression);
957

B
Bruce Momjian 已提交
958 959
	switch (AH->format)
	{
960 961 962
		case archCustom:
			fmtName = "CUSTOM";
			break;
963 964 965
		case archDirectory:
			fmtName = "DIRECTORY";
			break;
966 967 968 969 970 971
		case archTar:
			fmtName = "TAR";
			break;
		default:
			fmtName = "UNKNOWN";
	}
972 973

	ahprintf(AH, ";     Dump Version: %d.%d-%d\n", AH->vmaj, AH->vmin, AH->vrev);
974
	ahprintf(AH, ";     Format: %s\n", fmtName);
T
Tom Lane 已提交
975 976
	ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
	ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
977 978 979 980 981 982
	if (AH->archiveRemoteVersion)
		ahprintf(AH, ";     Dumped from database version: %s\n",
				 AH->archiveRemoteVersion);
	if (AH->archiveDumpVersion)
		ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
				 AH->archiveDumpVersion);
983

984
	ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
B
Bruce Momjian 已提交
985

986
	curSection = SECTION_PRE_DATA;
987
	for (te = AH->toc->next; te != AH->toc; te = te->next)
B
Bruce Momjian 已提交
988
	{
989 990 991 992
		if (te->section != SECTION_NONE)
			curSection = te->section;
		if (ropt->verbose ||
			(_tocEntryRequired(te, curSection, ropt) & (REQ_SCHEMA | REQ_DATA)) != 0)
993
			ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
994
					 te->catalogId.tableoid, te->catalogId.oid,
995 996
					 te->desc, te->namespace ? te->namespace : "-",
					 te->tag, te->owner);
997 998
		if (ropt->verbose && te->nDeps > 0)
		{
999
			int			i;
1000 1001 1002 1003 1004 1005

			ahprintf(AH, ";\tdepends on:");
			for (i = 0; i < te->nDeps; i++)
				ahprintf(AH, " %d", te->dependencies[i]);
			ahprintf(AH, "\n");
		}
B
Bruce Momjian 已提交
1006
	}
B
Bruce Momjian 已提交
1007

B
Bruce Momjian 已提交
1008
	if (ropt->filename)
1009
		RestoreOutput(AH, sav);
B
Bruce Momjian 已提交
1010 1011
}

1012 1013 1014 1015 1016
/***********
 * BLOB Archival
 ***********/

/* Called by a dumper to signal start of a BLOB */
B
Bruce Momjian 已提交
1017
int
1018
StartBlob(Archive *AHX, Oid oid)
1019
{
B
Bruce Momjian 已提交
1020
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1021

B
Bruce Momjian 已提交
1022
	if (!AH->StartBlobPtr)
1023
		exit_horribly(modulename, "large-object output not supported in chosen format\n");
1024

B
Bruce Momjian 已提交
1025
	(*AH->StartBlobPtr) (AH, AH->currToc, oid);
1026

B
Bruce Momjian 已提交
1027
	return 1;
1028 1029 1030
}

/* Called by a dumper to signal end of a BLOB */
B
Bruce Momjian 已提交
1031
int
1032
EndBlob(Archive *AHX, Oid oid)
1033
{
B
Bruce Momjian 已提交
1034
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
1035

B
Bruce Momjian 已提交
1036 1037
	if (AH->EndBlobPtr)
		(*AH->EndBlobPtr) (AH, AH->currToc, oid);
1038

B
Bruce Momjian 已提交
1039
	return 1;
1040 1041 1042 1043 1044 1045
}

/**********
 * BLOB Restoration
 **********/

1046
/*
B
Bruce Momjian 已提交
1047
 * Called by a format handler before any blobs are restored
1048
 */
B
Bruce Momjian 已提交
1049 1050
void
StartRestoreBlobs(ArchiveHandle *AH)
1051
{
1052 1053 1054 1055 1056 1057 1058
	if (!AH->ropt->single_txn)
	{
		if (AH->connection)
			StartTransaction(AH);
		else
			ahprintf(AH, "BEGIN;\n\n");
	}
1059

1060 1061 1062 1063
	AH->blobCount = 0;
}

/*
B
Bruce Momjian 已提交
1064
 * Called by a format handler after all blobs are restored
1065
 */
B
Bruce Momjian 已提交
1066 1067
void
EndRestoreBlobs(ArchiveHandle *AH)
1068
{
1069 1070 1071 1072 1073 1074 1075
	if (!AH->ropt->single_txn)
	{
		if (AH->connection)
			CommitTransaction(AH);
		else
			ahprintf(AH, "COMMIT;\n\n");
	}
1076

P
Peter Eisentraut 已提交
1077 1078 1079 1080
	ahlog(AH, 1, ngettext("restored %d large object\n",
						  "restored %d large objects\n",
						  AH->blobCount),
		  AH->blobCount);
1081 1082 1083
}


1084 1085 1086
/*
 * Called by a format handler to initiate restoration of a blob
 */
B
Bruce Momjian 已提交
1087
void
1088
StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
1089
{
1090
	bool		old_blob_style = (AH->version < K_VERS_1_12);
1091
	Oid			loOid;
1092

1093 1094
	AH->blobCount++;

1095 1096 1097
	/* Initialize the LO Buffer */
	AH->lo_buf_used = 0;

1098
	ahlog(AH, 1, "restoring large object with OID %u\n", oid);
1099

1100 1101
	/* With an old archive we must do drop and create logic here */
	if (old_blob_style && drop)
1102
		DropBlobIfExists(AH, oid);
1103

1104
	if (AH->connection)
1105
	{
1106 1107 1108 1109
		if (old_blob_style)
		{
			loOid = lo_create(AH->connection, oid);
			if (loOid == 0 || loOid != oid)
1110 1111
				exit_horribly(modulename, "could not create large object %u: %s",
							  oid, PQerrorMessage(AH->connection));
1112
		}
1113 1114
		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
		if (AH->loFd == -1)
1115 1116
			exit_horribly(modulename, "could not open large object %u: %s",
						  oid, PQerrorMessage(AH->connection));
1117 1118 1119
	}
	else
	{
1120 1121 1122 1123 1124 1125
		if (old_blob_style)
			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
					 oid, INV_WRITE);
		else
			ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
					 oid, INV_WRITE);
1126
	}
1127

B
Bruce Momjian 已提交
1128
	AH->writingBlob = 1;
1129 1130
}

B
Bruce Momjian 已提交
1131
void
1132
EndRestoreBlob(ArchiveHandle *AH, Oid oid)
1133
{
1134 1135 1136
	if (AH->lo_buf_used > 0)
	{
		/* Write remaining bytes from the LO buffer */
1137
		dump_lo_buf(AH);
1138
	}
1139

B
Bruce Momjian 已提交
1140
	AH->writingBlob = 0;
1141

1142
	if (AH->connection)
1143
	{
1144 1145 1146 1147 1148
		lo_close(AH->connection, AH->loFd);
		AH->loFd = -1;
	}
	else
	{
1149
		ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1150
	}
1151 1152
}

B
Bruce Momjian 已提交
1153 1154 1155 1156
/***********
 * Sorting and Reordering
 ***********/

B
Bruce Momjian 已提交
1157 1158
/*
 * Reorder the TOC according to a user-supplied list file (ropt->tocFile).
 *
 * Each non-blank, non-comment line of the file must start with a dump ID.
 * Every ID found is marked in ropt->idWanted[] (indexed by ID - 1) and its
 * TOC entry is moved to the end of the list, so the wanted entries end up in
 * file order after all unwanted ones.  Lines with a missing, out-of-range or
 * repeated ID are reported and ignored.  Failure to open or close the file,
 * or an in-range ID with no TOC entry, is fatal.
 */
void
SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	FILE	   *fh;
	char		buf[100];
	bool		incomplete_line;

	/* Allocate space for the 'wanted' array, already zeroed */
	ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);

	/* Setup the file */
	fh = fopen(ropt->tocFile, PG_BINARY_R);
	if (!fh)
		exit_horribly(modulename, "could not open TOC file \"%s\": %s\n",
					  ropt->tocFile, strerror(errno));

	incomplete_line = false;
	while (fgets(buf, sizeof(buf), fh) != NULL)
	{
		bool		prev_incomplete_line = incomplete_line;
		int			buflen;
		char	   *cmnt;
		char	   *endptr;
		long		idval;
		DumpId		id;
		TocEntry   *te;

		/*
		 * Some lines in the file might be longer than sizeof(buf).  This is
		 * no problem, since we only care about the leading numeric ID which
		 * can be at most a few characters; but we have to skip continuation
		 * bufferloads when processing a long line.
		 */
		buflen = strlen(buf);
		if (buflen > 0 && buf[buflen - 1] == '\n')
			incomplete_line = false;
		else
			incomplete_line = true;
		if (prev_incomplete_line)
			continue;

		/* Truncate line at comment, if any */
		cmnt = strchr(buf, ';');
		if (cmnt != NULL)
			cmnt[0] = '\0';

		/* Ignore if all blank */
		if (strspn(buf, " \t\r\n") == strlen(buf))
			continue;

		/*
		 * Get an ID, check it's valid and not already seen.  The range check
		 * is done on the full-width strtol() result: narrowing to DumpId
		 * first could wrap an oversized number into a valid-looking ID.
		 * (On overflow strtol() returns LONG_MAX, which fails the range
		 * check as well.)
		 */
		idval = strtol(buf, &endptr, 10);
		if (endptr == buf || idval <= 0 || idval > AH->maxDumpId ||
			ropt->idWanted[idval - 1])
		{
			write_msg(modulename, "WARNING: line ignored: %s\n", buf);
			continue;
		}
		id = (DumpId) idval;

		/* Find TOC entry */
		te = getTocEntryByDumpId(AH, id);
		if (!te)
			exit_horribly(modulename, "could not find entry for ID %d\n",
						  id);

		/* Mark it wanted */
		ropt->idWanted[id - 1] = true;

		/*
		 * Move each item to the end of the list as it is selected, so that
		 * they are placed in the desired order.  Any unwanted items will end
		 * up at the front of the list, which may seem unintuitive but it's
		 * what we need.  In an ordinary serial restore that makes no
		 * difference, but in a parallel restore we need to mark unrestored
		 * items' dependencies as satisfied before we start examining
		 * restorable items.  Otherwise they could have surprising
		 * side-effects on the order in which restorable items actually get
		 * restored.
		 */
		_moveBefore(AH, AH->toc, te);
	}

	if (fclose(fh) != 0)
		exit_horribly(modulename, "could not close TOC file: %s\n",
					  strerror(errno));
}

/**********************
 * Convenience functions that look like standard IO functions
 * for writing data when in dump mode.
 **********************/

/* Public */
1251
void
B
Bruce Momjian 已提交
1252 1253
archputs(const char *s, Archive *AH)
{
1254 1255
	WriteData(AH, s, strlen(s));
	return;
B
Bruce Momjian 已提交
1256 1257 1258
}

/* Public */
B
Bruce Momjian 已提交
1259 1260
int
archprintf(Archive *AH, const char *fmt,...)
B
Bruce Momjian 已提交
1261
{
1262 1263 1264
	char	   *p;
	size_t		len = 128;		/* initial assumption about buffer size */
	size_t		cnt;
B
Bruce Momjian 已提交
1265

1266
	for (;;)
B
Bruce Momjian 已提交
1267
	{
1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283
		va_list		args;

		/* Allocate work buffer. */
		p = (char *) pg_malloc(len);

		/* Try to format the data. */
		va_start(args, fmt);
		cnt = pvsnprintf(p, len, fmt, args);
		va_end(args);

		if (cnt < len)
			break;				/* success */

		/* Release buffer and loop around to try again with larger len. */
		free(p);
		len = cnt;
B
Bruce Momjian 已提交
1284
	}
1285

B
Bruce Momjian 已提交
1286 1287
	WriteData(AH, p, cnt);
	free(p);
1288
	return (int) cnt;
B
Bruce Momjian 已提交
1289 1290 1291 1292 1293 1294 1295
}


/*******************************
 * Stuff below here should be 'private' to the archiver routines
 *******************************/

1296
static void
1297
SetOutput(ArchiveHandle *AH, const char *filename, int compression)
B
Bruce Momjian 已提交
1298
{
1299
	int			fn;
B
Bruce Momjian 已提交
1300 1301

	if (filename)
1302
		fn = -1;
B
Bruce Momjian 已提交
1303 1304 1305 1306
	else if (AH->FH)
		fn = fileno(AH->FH);
	else if (AH->fSpec)
	{
1307
		fn = -1;
B
Bruce Momjian 已提交
1308 1309 1310 1311 1312 1313
		filename = AH->fSpec;
	}
	else
		fn = fileno(stdout);

	/* If compression explicitly requested, use gzopen */
1314
#ifdef HAVE_LIBZ
B
Bruce Momjian 已提交
1315 1316
	if (compression != 0)
	{
1317 1318 1319
		char		fmode[10];

		/* Don't use PG_BINARY_x since this is zlib */
1320
		sprintf(fmode, "wb%d", compression);
1321 1322
		if (fn >= 0)
			AH->OF = gzdopen(dup(fn), fmode);
B
Bruce Momjian 已提交
1323 1324
		else
			AH->OF = gzopen(filename, fmode);
1325
		AH->gzOut = 1;
B
Bruce Momjian 已提交
1326 1327
	}
	else
B
Bruce Momjian 已提交
1328
#endif
1329
	{							/* Use fopen */
1330 1331 1332 1333 1334 1335 1336
		if (AH->mode == archModeAppend)
		{
			if (fn >= 0)
				AH->OF = fdopen(dup(fn), PG_BINARY_A);
			else
				AH->OF = fopen(filename, PG_BINARY_A);
		}
B
Bruce Momjian 已提交
1337
		else
1338 1339 1340 1341 1342 1343
		{
			if (fn >= 0)
				AH->OF = fdopen(dup(fn), PG_BINARY_W);
			else
				AH->OF = fopen(filename, PG_BINARY_W);
		}
1344
		AH->gzOut = 0;
B
Bruce Momjian 已提交
1345
	}
B
Bruce Momjian 已提交
1346

1347
	if (!AH->OF)
1348 1349
	{
		if (filename)
1350 1351
			exit_horribly(modulename, "could not open output file \"%s\": %s\n",
						  filename, strerror(errno));
1352
		else
1353 1354
			exit_horribly(modulename, "could not open output file: %s\n",
						  strerror(errno));
1355
	}
1356 1357 1358 1359 1360 1361 1362 1363 1364
}

/*
 * Capture the archive's current output target (stream and gzip flag) so it
 * can later be reinstated with RestoreOutput().
 */
static OutputContext
SaveOutput(ArchiveHandle *AH)
{
	OutputContext snapshot;

	snapshot.gzOut = AH->gzOut;
	snapshot.OF = AH->OF;

	return snapshot;
}

1369
static void
1370
RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
B
Bruce Momjian 已提交
1371
{
B
Bruce Momjian 已提交
1372
	int			res;
1373

B
Bruce Momjian 已提交
1374
	if (AH->gzOut)
1375
		res = GZCLOSE(AH->OF);
B
Bruce Momjian 已提交
1376
	else
1377 1378 1379
		res = fclose(AH->OF);

	if (res != 0)
1380
		exit_horribly(modulename, "could not close output file: %s\n",
1381
					  strerror(errno));
B
Bruce Momjian 已提交
1382

1383 1384
	AH->gzOut = savedContext.gzOut;
	AH->OF = savedContext.OF;
B
Bruce Momjian 已提交
1385 1386 1387 1388 1389
}



/*
B
Bruce Momjian 已提交
1390
 *	Print formatted text to the output file (usually stdout).
B
Bruce Momjian 已提交
1391
 */
B
Bruce Momjian 已提交
1392 1393
int
ahprintf(ArchiveHandle *AH, const char *fmt,...)
B
Bruce Momjian 已提交
1394
{
1395 1396 1397
	char	   *p;
	size_t		len = 128;		/* initial assumption about buffer size */
	size_t		cnt;
B
Bruce Momjian 已提交
1398

1399
	for (;;)
1400
	{
1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416
		va_list		args;

		/* Allocate work buffer. */
		p = (char *) pg_malloc(len);

		/* Try to format the data. */
		va_start(args, fmt);
		cnt = pvsnprintf(p, len, fmt, args);
		va_end(args);

		if (cnt < len)
			break;				/* success */

		/* Release buffer and loop around to try again with larger len. */
		free(p);
		len = cnt;
B
Bruce Momjian 已提交
1417
	}
1418

B
Bruce Momjian 已提交
1419 1420
	ahwrite(p, 1, cnt, AH);
	free(p);
1421
	return (int) cnt;
B
Bruce Momjian 已提交
1422 1423
}

B
Bruce Momjian 已提交
1424 1425
/*
 * Emit a log message of the given level.
 *
 * The message is shown when the archive's debugLevel is at least "level",
 * or when verbose mode is on and the level is no higher than 1.
 */
void
ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
{
	va_list		args;
	bool		wanted;

	wanted = (AH->debugLevel >= level) ||
		(AH->public.verbose && level <= 1);
	if (!wanted)
		return;

	va_start(args, fmt);
	vwrite_msg(NULL, fmt, args);
	va_end(args);
}

1437 1438 1439
/*
 * Single place for logic which says 'We are restoring to a direct DB connection'.
 */
1440
static int
B
Bruce Momjian 已提交
1441
RestoringToDB(ArchiveHandle *AH)
1442 1443 1444 1445
{
	return (AH->ropt && AH->ropt->useDB && AH->connection);
}

1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456
/*
 * Dump the current contents of the LO data buffer while writing a BLOB.
 *
 * With a database connection the buffer is written using lo_write() and a
 * short write is fatal; otherwise a "SELECT pg_catalog.lowrite(...)"
 * statement carrying the data as a bytea literal is emitted.  The buffer is
 * marked empty afterwards.
 */
static void
dump_lo_buf(ArchiveHandle *AH)
{
	if (!AH->connection)
	{
		PQExpBuffer literal = createPQExpBuffer();

		appendByteaLiteralAHX(literal,
							  (const unsigned char *) AH->lo_buf,
							  AH->lo_buf_used,
							  AH);

		/* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
		AH->writingBlob = 0;
		ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", literal->data);
		AH->writingBlob = 1;

		destroyPQExpBuffer(literal);
	}
	else
	{
		size_t		written;

		written = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
		ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
					 "wrote %lu bytes of large object data (result = %lu)\n",
							  AH->lo_buf_used),
			  (unsigned long) AH->lo_buf_used, (unsigned long) written);
		if (written != AH->lo_buf_used)
			exit_horribly(modulename,
			"could not write to large object (result: %lu, expected: %lu)\n",
					   (unsigned long) written, (unsigned long) AH->lo_buf_used);
	}

	AH->lo_buf_used = 0;
}


B
Bruce Momjian 已提交
1486
/*
1487
 *	Write buffer to the output file (usually stdout). This is used for
B
Bruce Momjian 已提交
1488 1489
 *	outputting 'restore' scripts etc. It is even possible for an archive
 *	format to create a custom output routine to 'fake' a restore if it
1490
 *	wants to generate a script (see TAR output).
B
Bruce Momjian 已提交
1491
 */
1492
void
B
Bruce Momjian 已提交
1493
ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
B
Bruce Momjian 已提交
1494
{
B
Bruce Momjian 已提交
1495 1496
	int			bytes_written = 0;

B
Bruce Momjian 已提交
1497
	if (AH->writingBlob)
1498
	{
B
Bruce Momjian 已提交
1499
		size_t		remaining = size * nmemb;
1500 1501

		while (AH->lo_buf_used + remaining > AH->lo_buf_size)
P
Peter Eisentraut 已提交
1502
		{
1503 1504 1505 1506 1507 1508 1509
			size_t		avail = AH->lo_buf_size - AH->lo_buf_used;

			memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
			ptr = (const void *) ((const char *) ptr + avail);
			remaining -= avail;
			AH->lo_buf_used += avail;
			dump_lo_buf(AH);
P
Peter Eisentraut 已提交
1510 1511
		}

1512 1513 1514
		memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
		AH->lo_buf_used += remaining;

1515
		bytes_written = size * nmemb;
1516
	}
B
Bruce Momjian 已提交
1517
	else if (AH->gzOut)
1518
		bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
B
Bruce Momjian 已提交
1519
	else if (AH->CustomOutPtr)
1520
		bytes_written = AH->CustomOutPtr (AH, ptr, size * nmemb);
B
Bruce Momjian 已提交
1521

1522 1523 1524
	else
	{
		/*
B
Bruce Momjian 已提交
1525 1526 1527
		 * If we're doing a restore, and it's direct to DB, and we're
		 * connected then send it to the DB.
		 */
1528
		if (RestoringToDB(AH))
B
Bruce Momjian 已提交
1529
			bytes_written = ExecuteSqlCommandBuf(AH, (const char *) ptr, size * nmemb);
1530
		else
1531
			bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
1532
	}
1533 1534 1535 1536 1537

	if (bytes_written != size * nmemb)
		WRITE_ERROR_EXIT;

	return;
B
Bruce Momjian 已提交
1538
}
1539

1540 1541
/* on some error, we may decide to go on... */
void
1542
warn_or_exit_horribly(ArchiveHandle *AH,
1543
					  const char *modulename, const char *fmt,...)
1544
{
B
Bruce Momjian 已提交
1545
	va_list		ap;
1546

B
Bruce Momjian 已提交
1547 1548
	switch (AH->stage)
	{
1549 1550 1551 1552 1553 1554

		case STAGE_NONE:
			/* Do nothing special */
			break;

		case STAGE_INITIALIZING:
B
Bruce Momjian 已提交
1555
			if (AH->stage != AH->lastErrorStage)
1556 1557 1558 1559
				write_msg(modulename, "Error while INITIALIZING:\n");
			break;

		case STAGE_PROCESSING:
B
Bruce Momjian 已提交
1560
			if (AH->stage != AH->lastErrorStage)
1561 1562 1563 1564
				write_msg(modulename, "Error while PROCESSING TOC:\n");
			break;

		case STAGE_FINALIZING:
B
Bruce Momjian 已提交
1565
			if (AH->stage != AH->lastErrorStage)
1566 1567 1568
				write_msg(modulename, "Error while FINALIZING:\n");
			break;
	}
B
Bruce Momjian 已提交
1569 1570
	if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
	{
1571 1572
		write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
				  AH->currentTE->dumpId,
B
Bruce Momjian 已提交
1573 1574
			 AH->currentTE->catalogId.tableoid, AH->currentTE->catalogId.oid,
			  AH->currentTE->desc, AH->currentTE->tag, AH->currentTE->owner);
1575 1576 1577 1578
	}
	AH->lastErrorStage = AH->stage;
	AH->lastErrorTE = AH->currentTE;

1579
	va_start(ap, fmt);
1580 1581 1582
	vwrite_msg(modulename, fmt, ap);
	va_end(ap);

1583
	if (AH->public.exit_on_error)
1584
		exit_nicely(1);
1585 1586 1587
	else
		AH->public.n_errors++;
}
1588

1589 1590
#ifdef NOT_USED

B
Bruce Momjian 已提交
1591 1592
/*
 * Move TOC entry "te" so that it immediately follows "pos" in the doubly
 * linked TOC list.
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *follower;

	/* Unlink te from list */
	te->prev->next = te->next;
	te->next->prev = te->prev;

	/* and insert it after "pos"; pos->next is read only after the unlink */
	follower = pos->next;
	te->prev = pos;
	te->next = follower;
	follower->prev = te;
	pos->next = te;
}
1604
#endif
1605

B
Bruce Momjian 已提交
1606 1607
/*
 * Move TOC entry "te" so that it immediately precedes "pos" in the doubly
 * linked TOC list.
 */
static void
_moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *predecessor;

	/* Unlink te from list */
	te->prev->next = te->next;
	te->next->prev = te->prev;

	/* and insert it before "pos"; pos->prev is read only after the unlink */
	predecessor = pos->prev;
	te->prev = predecessor;
	te->next = pos;
	predecessor->next = te;
	pos->prev = te;
}
1619

1620 1621 1622 1623 1624 1625 1626
/*
 * Build index arrays for the TOC list
 *
 * This should be invoked only after we have created or read in all the TOC
 * items.
 *
 * The arrays are indexed by dump ID (so entry zero is unused).  Note that the
 * array entries run only up to maxDumpId.  We might see dependency dump IDs
 * beyond that (if the dump was partial); so always check the array bound
 * before trying to touch an array entry.
 */
static void
buildTocEntryArrays(ArchiveHandle *AH)
{
	DumpId		topId = AH->maxDumpId;
	TocEntry   *entry;

	AH->tocsByDumpId = (TocEntry **) pg_malloc0((topId + 1) * sizeof(TocEntry *));
	AH->tableDataId = (DumpId *) pg_malloc0((topId + 1) * sizeof(DumpId));

	for (entry = AH->toc->next; entry != AH->toc; entry = entry->next)
	{
		DumpId		ownerTableId;

		/* this check is purely paranoia, maxDumpId should be correct */
		if (entry->dumpId <= 0 || entry->dumpId > topId)
			exit_horribly(modulename, "bad dumpId\n");

		/* tocsByDumpId indexes all TOCs by their dump ID */
		AH->tocsByDumpId[entry->dumpId] = entry;

		/*
		 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
		 * TOC entry that has a DATA item.  We compute this by reversing the
		 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
		 * just one dependency and it is the TABLE item.
		 */
		if (strcmp(entry->desc, "TABLE DATA") != 0 || entry->nDeps <= 0)
			continue;

		ownerTableId = entry->dependencies[0];

		/*
		 * The TABLE item might not have been in the archive, if this was a
		 * data-only dump; but its dump ID should be less than its data
		 * item's dump ID, so there should be a place for it in the array.
		 */
		if (ownerTableId <= 0 || ownerTableId > topId)
			exit_horribly(modulename, "bad table dumpId for TABLE DATA item\n");

		AH->tableDataId[ownerTableId] = entry->dumpId;
	}
}

A
Andrew Dunstan 已提交
1672
TocEntry *
1673 1674 1675 1676 1677 1678 1679 1680 1681
getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
{
	/* build index arrays if we didn't already */
	if (AH->tocsByDumpId == NULL)
		buildTocEntryArrays(AH);

	if (id > 0 && id <= AH->maxDumpId)
		return AH->tocsByDumpId[id];

B
Bruce Momjian 已提交
1682
	return NULL;
B
Bruce Momjian 已提交
1683 1684
}

1685
teReqs
1686
TocIDRequired(ArchiveHandle *AH, DumpId id)
B
Bruce Momjian 已提交
1687
{
1688
	TocEntry   *te = getTocEntryByDumpId(AH, id);
B
Bruce Momjian 已提交
1689

B
Bruce Momjian 已提交
1690 1691
	if (!te)
		return 0;
B
Bruce Momjian 已提交
1692

1693
	return te->reqs;
B
Bruce Momjian 已提交
1694 1695
}

1696
size_t
1697
WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1698
{
B
Bruce Momjian 已提交
1699
	int			off;
1700 1701 1702 1703

	/* Save the flag */
	(*AH->WriteBytePtr) (AH, wasSet);

1704 1705
	/* Write out pgoff_t smallest byte first, prevents endian mismatch */
	for (off = 0; off < sizeof(pgoff_t); off++)
1706
	{
B
Bruce Momjian 已提交
1707
		(*AH->WriteBytePtr) (AH, o & 0xFF);
1708 1709
		o >>= 8;
	}
1710
	return sizeof(pgoff_t) + 1;
1711 1712 1713
}

int
B
Bruce Momjian 已提交
1714
ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1715
{
B
Bruce Momjian 已提交
1716 1717 1718
	int			i;
	int			off;
	int			offsetFlg;
1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729

	/* Initialize to zero */
	*o = 0;

	/* Check for old version */
	if (AH->version < K_VERS_1_7)
	{
		/* Prior versions wrote offsets using WriteInt */
		i = ReadInt(AH);
		/* -1 means not set */
		if (i < 0)
B
Bruce Momjian 已提交
1730
			return K_OFFSET_POS_NOT_SET;
1731
		else if (i == 0)
B
Bruce Momjian 已提交
1732
			return K_OFFSET_NO_DATA;
1733

1734 1735
		/* Cast to pgoff_t because it was written as an int. */
		*o = (pgoff_t) i;
1736 1737 1738 1739
		return K_OFFSET_POS_SET;
	}

	/*
B
Bruce Momjian 已提交
1740 1741
	 * Read the flag indicating the state of the data pointer. Check if valid
	 * and die if not.
1742
	 *
1743 1744
	 * This used to be handled by a negative or zero pointer, now we use an
	 * extra byte specifically for the state.
1745 1746 1747 1748 1749 1750 1751 1752 1753
	 */
	offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;

	switch (offsetFlg)
	{
		case K_OFFSET_POS_NOT_SET:
		case K_OFFSET_NO_DATA:
		case K_OFFSET_POS_SET:

B
Bruce Momjian 已提交
1754
			break;
1755 1756

		default:
1757
			exit_horribly(modulename, "unexpected data offset flag %d\n", offsetFlg);
1758 1759 1760 1761 1762 1763 1764
	}

	/*
	 * Read the bytes
	 */
	for (off = 0; off < AH->offSize; off++)
	{
1765 1766
		if (off < sizeof(pgoff_t))
			*o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
1767 1768 1769
		else
		{
			if ((*AH->ReadBytePtr) (AH) != 0)
1770
				exit_horribly(modulename, "file offset in dump file is too large\n");
1771 1772 1773 1774 1775 1776
		}
	}

	return offsetFlg;
}

P
Peter Eisentraut 已提交
1777
size_t
B
Bruce Momjian 已提交
1778
WriteInt(ArchiveHandle *AH, int i)
B
Bruce Momjian 已提交
1779
{
B
Bruce Momjian 已提交
1780 1781 1782
	int			b;

	/*
B
Bruce Momjian 已提交
1783 1784 1785 1786 1787
	 * This is a bit yucky, but I don't want to make the binary format very
	 * dependent on representation, and not knowing much about it, I write out
	 * a sign byte. If you change this, don't forget to change the file
	 * version #, and modify readInt to read the new format AS WELL AS the old
	 * formats.
B
Bruce Momjian 已提交
1788 1789 1790 1791 1792 1793
	 */

	/* SIGN byte */
	if (i < 0)
	{
		(*AH->WriteBytePtr) (AH, 1);
1794
		i = -i;
B
Bruce Momjian 已提交
1795 1796 1797 1798 1799 1800 1801
	}
	else
		(*AH->WriteBytePtr) (AH, 0);

	for (b = 0; b < AH->intSize; b++)
	{
		(*AH->WriteBytePtr) (AH, i & 0xFF);
1802
		i >>= 8;
B
Bruce Momjian 已提交
1803 1804 1805
	}

	return AH->intSize + 1;
B
Bruce Momjian 已提交
1806 1807
}

B
Bruce Momjian 已提交
1808 1809
/*
 * Read an integer written by WriteInt: an optional sign byte (present in
 * archives newer than K_VERS_1_0) followed by AH->intSize magnitude bytes,
 * least significant byte first.
 *
 * The magnitude is assembled in unsigned arithmetic, and bytes that lie
 * beyond the width of an int are read but not shifted in.  This avoids the
 * undefined behavior of left-shifting a signed int into its sign bit or by
 * a count of at least its width.
 */
int
ReadInt(ArchiveHandle *AH)
{
	unsigned int magnitude = 0;
	int			bv,
				b;
	int			sign = 0;		/* Default positive */
	unsigned int bitShift = 0;

	if (AH->version > K_VERS_1_0)
		/* Read a sign byte */
		sign = (*AH->ReadBytePtr) (AH);

	for (b = 0; b < AH->intSize; b++)
	{
		bv = (*AH->ReadBytePtr) (AH) & 0xFF;
		/* the byte is always consumed; only in-range bytes contribute */
		if (bv != 0 && bitShift < sizeof(magnitude) * 8)
			magnitude |= ((unsigned int) bv) << bitShift;
		bitShift += 8;
	}

	/* negate while still unsigned, where wraparound is well defined */
	if (sign)
		magnitude = 0u - magnitude;

	return (int) magnitude;
}

P
Peter Eisentraut 已提交
1835
size_t
1836
WriteStr(ArchiveHandle *AH, const char *c)
B
Bruce Momjian 已提交
1837
{
P
Peter Eisentraut 已提交
1838
	size_t		res;
1839 1840 1841

	if (c)
	{
B
Bruce Momjian 已提交
1842 1843
		int			len = strlen(c);

1844 1845 1846
		res = WriteInt(AH, len);
		(*AH->WriteBufPtr) (AH, c, len);
		res += len;
1847 1848 1849 1850
	}
	else
		res = WriteInt(AH, -1);

B
Bruce Momjian 已提交
1851
	return res;
B
Bruce Momjian 已提交
1852 1853
}

B
Bruce Momjian 已提交
1854 1855
/*
 * Read a string from the archive: a length word (via ReadInt) followed by
 * that many bytes.  A negative length yields NULL; otherwise the result is
 * a freshly pg_malloc'd, NUL-terminated buffer.
 */
char *
ReadStr(ArchiveHandle *AH)
{
	int			len = ReadInt(AH);
	char	   *str;

	/* a negative length marks a NULL string */
	if (len < 0)
		return NULL;

	str = (char *) pg_malloc(len + 1);
	(*AH->ReadBufPtr) (AH, (void *) str, len);
	str[len] = '\0';

	return str;
}

T
Tom Lane 已提交
1874
static int
B
Bruce Momjian 已提交
1875
_discoverArchiveFormat(ArchiveHandle *AH)
B
Bruce Momjian 已提交
1876
{
B
Bruce Momjian 已提交
1877 1878
	FILE	   *fh;
	char		sig[6];			/* More than enough */
P
Peter Eisentraut 已提交
1879
	size_t		cnt;
B
Bruce Momjian 已提交
1880
	int			wantClose = 0;
B
Bruce Momjian 已提交
1881

1882
#if 0
1883
	write_msg(modulename, "attempting to ascertain archive format\n");
1884
#endif
1885 1886 1887 1888 1889

	if (AH->lookahead)
		free(AH->lookahead);

	AH->lookaheadSize = 512;
1890
	AH->lookahead = pg_malloc0(512);
1891 1892
	AH->lookaheadLen = 0;
	AH->lookaheadPos = 0;
1893

B
Bruce Momjian 已提交
1894 1895
	if (AH->fSpec)
	{
1896
		struct stat st;
1897

1898
		wantClose = 1;
1899 1900 1901 1902 1903 1904 1905 1906

		/*
		 * Check if the specified archive is a directory. If so, check if
		 * there's a "toc.dat" (or "toc.dat.gz") file in it.
		 */
		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
		{
			char		buf[MAXPGPATH];
1907

1908
			if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
1909 1910
				exit_horribly(modulename, "directory name too long: \"%s\"\n",
							  AH->fSpec);
1911 1912 1913 1914 1915 1916 1917 1918
			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
			{
				AH->format = archDirectory;
				return AH->format;
			}

#ifdef HAVE_LIBZ
			if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
1919 1920
				exit_horribly(modulename, "directory name too long: \"%s\"\n",
							  AH->fSpec);
1921 1922 1923 1924 1925 1926
			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
			{
				AH->format = archDirectory;
				return AH->format;
			}
#endif
1927 1928
			exit_horribly(modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n",
						  AH->fSpec);
1929
			fh = NULL;			/* keep compiler quiet */
1930 1931 1932 1933 1934
		}
		else
		{
			fh = fopen(AH->fSpec, PG_BINARY_R);
			if (!fh)
1935 1936
				exit_horribly(modulename, "could not open input file \"%s\": %s\n",
							  AH->fSpec, strerror(errno));
1937
		}
B
Bruce Momjian 已提交
1938 1939
	}
	else
1940
	{
1941
		fh = stdin;
1942
		if (!fh)
1943 1944
			exit_horribly(modulename, "could not open input file: %s\n",
						  strerror(errno));
1945
	}
B
Bruce Momjian 已提交
1946

1947
	if ((cnt = fread(sig, 1, 5, fh)) != 5)
1948 1949
	{
		if (ferror(fh))
1950
			exit_horribly(modulename, "could not read input file: %s\n", strerror(errno));
1951
		else
1952 1953
			exit_horribly(modulename, "input file is too short (read %lu, expected 5)\n",
						  (unsigned long) cnt);
1954
	}
B
Bruce Momjian 已提交
1955

B
Bruce Momjian 已提交
1956
	/* Save it, just in case we need it later */
1957 1958
	strncpy(&AH->lookahead[0], sig, 5);
	AH->lookaheadLen = 5;
B
Bruce Momjian 已提交
1959

B
Bruce Momjian 已提交
1960
	if (strncmp(sig, "PGDMP", 5) == 0)
1961
	{
B
Bruce Momjian 已提交
1962
		int			byteread;
S
Stephen Frost 已提交
1963

1964 1965 1966 1967 1968
		/*
		 * Finish reading (most of) a custom-format header.
		 *
		 * NB: this code must agree with ReadHead().
		 */
S
Stephen Frost 已提交
1969
		if ((byteread = fgetc(fh)) == EOF)
1970
			READ_ERROR_EXIT(fh);
S
Stephen Frost 已提交
1971 1972 1973 1974

		AH->vmaj = byteread;

		if ((byteread = fgetc(fh)) == EOF)
1975
			READ_ERROR_EXIT(fh);
S
Stephen Frost 已提交
1976 1977

		AH->vmin = byteread;
1978 1979 1980 1981 1982 1983

		/* Save these too... */
		AH->lookahead[AH->lookaheadLen++] = AH->vmaj;
		AH->lookahead[AH->lookaheadLen++] = AH->vmin;

		/* Check header version; varies from V1.0 */
B
Bruce Momjian 已提交
1984 1985
		if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))		/* Version > 1.0 */
		{
S
Stephen Frost 已提交
1986
			if ((byteread = fgetc(fh)) == EOF)
1987
				READ_ERROR_EXIT(fh);
S
Stephen Frost 已提交
1988 1989

			AH->vrev = byteread;
1990 1991 1992 1993 1994
			AH->lookahead[AH->lookaheadLen++] = AH->vrev;
		}
		else
			AH->vrev = 0;

1995 1996 1997
		/* Make a convenient integer <maj><min><rev>00 */
		AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;

S
Stephen Frost 已提交
1998
		if ((AH->intSize = fgetc(fh)) == EOF)
1999
			READ_ERROR_EXIT(fh);
2000 2001
		AH->lookahead[AH->lookaheadLen++] = AH->intSize;

2002 2003
		if (AH->version >= K_VERS_1_7)
		{
S
Stephen Frost 已提交
2004
			if ((AH->offSize = fgetc(fh)) == EOF)
2005
				READ_ERROR_EXIT(fh);
2006 2007 2008 2009 2010
			AH->lookahead[AH->lookaheadLen++] = AH->offSize;
		}
		else
			AH->offSize = AH->intSize;

S
Stephen Frost 已提交
2011
		if ((byteread = fgetc(fh)) == EOF)
2012
			READ_ERROR_EXIT(fh);
S
Stephen Frost 已提交
2013 2014

		AH->format = byteread;
2015
		AH->lookahead[AH->lookaheadLen++] = AH->format;
B
Bruce Momjian 已提交
2016 2017 2018
	}
	else
	{
2019
		/*
2020 2021
		 * *Maybe* we have a tar archive format file or a text dump ... So,
		 * read first 512 byte header...
2022 2023
		 */
		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2024
		/* read failure is checked below */
2025
		AH->lookaheadLen += cnt;
B
Bruce Momjian 已提交
2026

2027 2028 2029 2030
		if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
			(strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
			 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
		{
2031 2032 2033 2034
			/*
			 * looks like it's probably a text format dump. so suggest they
			 * try psql
			 */
2035
			exit_horribly(modulename, "input file appears to be a text format dump. Please use psql.\n");
2036 2037
		}

2038 2039 2040 2041 2042 2043 2044
		if (AH->lookaheadLen != 512)
		{
			if (feof(fh))
				exit_horribly(modulename, "input file does not appear to be a valid archive (too short?)\n");
			else
				READ_ERROR_EXIT(fh);
		}
B
Bruce Momjian 已提交
2045

2046
		if (!isValidTarHeader(AH->lookahead))
2047
			exit_horribly(modulename, "input file does not appear to be a valid archive\n");
B
Bruce Momjian 已提交
2048

2049 2050
		AH->format = archTar;
	}
B
Bruce Momjian 已提交
2051

B
Bruce Momjian 已提交
2052
	/* If we can't seek, then mark the header as read */
P
Peter Eisentraut 已提交
2053
	if (fseeko(fh, 0, SEEK_SET) != 0)
2054 2055
	{
		/*
B
Bruce Momjian 已提交
2056 2057
		 * NOTE: Formats that use the lookahead buffer can unset this in their
		 * Init routine.
2058 2059 2060 2061
		 */
		AH->readHeader = 1;
	}
	else
B
Bruce Momjian 已提交
2062
		AH->lookaheadLen = 0;	/* Don't bother since we've reset the file */
2063

B
Bruce Momjian 已提交
2064 2065
	/* Close the file */
	if (wantClose)
2066
		if (fclose(fh) != 0)
2067 2068
			exit_horribly(modulename, "could not close input file: %s\n",
						  strerror(errno));
B
Bruce Momjian 已提交
2069

B
Bruce Momjian 已提交
2070
	return AH->format;
B
Bruce Momjian 已提交
2071 2072 2073 2074 2075 2076
}


/*
 * Allocate an archive handle
 */
B
Bruce Momjian 已提交
2077 2078
static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
A
Andrew Dunstan 已提交
2079
	  const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr)
2080
{
B
Bruce Momjian 已提交
2081
	ArchiveHandle *AH;
B
Bruce Momjian 已提交
2082

2083
#if 0
2084
	write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
2085
#endif
2086

2087
	AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
B
Bruce Momjian 已提交
2088

2089 2090
	/* AH->debugLevel = 100; */

B
Bruce Momjian 已提交
2091 2092
	AH->vmaj = K_VERS_MAJOR;
	AH->vmin = K_VERS_MINOR;
2093
	AH->vrev = K_VERS_REV;
B
Bruce Momjian 已提交
2094

2095 2096 2097
	/* Make a convenient integer <maj><min><rev>00 */
	AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;

2098
	/* initialize for backwards compatible string processing */
2099
	AH->public.encoding = 0;	/* PG_SQL_ASCII */
2100 2101 2102 2103 2104 2105
	AH->public.std_strings = false;

	/* sql error handling */
	AH->public.exit_on_error = true;
	AH->public.n_errors = 0;

2106 2107
	AH->archiveDumpVersion = PG_VERSION;

2108 2109
	AH->createDate = time(NULL);

B
Bruce Momjian 已提交
2110
	AH->intSize = sizeof(int);
2111
	AH->offSize = sizeof(pgoff_t);
B
Bruce Momjian 已提交
2112 2113
	if (FileSpec)
	{
2114
		AH->fSpec = pg_strdup(FileSpec);
B
Bruce Momjian 已提交
2115

2116 2117 2118
		/*
		 * Not used; maybe later....
		 *
2119
		 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2120
		 * i--) if (AH->workDir[i-1] == '/')
2121
		 */
B
Bruce Momjian 已提交
2122 2123
	}
	else
2124
		AH->fSpec = NULL;
B
Bruce Momjian 已提交
2125

2126 2127 2128
	AH->currUser = NULL;		/* unknown */
	AH->currSchema = NULL;		/* ditto */
	AH->currTablespace = NULL;	/* ditto */
2129
	AH->currWithOids = -1;		/* force SET */
B
Bruce Momjian 已提交
2130

2131
	AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
B
Bruce Momjian 已提交
2132

B
Bruce Momjian 已提交
2133 2134 2135 2136 2137
	AH->toc->next = AH->toc;
	AH->toc->prev = AH->toc;

	AH->mode = mode;
	AH->compression = compression;
B
Bruce Momjian 已提交
2138

2139 2140
	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));

B
Bruce Momjian 已提交
2141 2142 2143
	/* Open stdout with no compression for AH output handle */
	AH->gzOut = 0;
	AH->OF = stdout;
B
Bruce Momjian 已提交
2144

2145 2146
	/*
	 * On Windows, we need to use binary mode to read/write non-text archive
B
Bruce Momjian 已提交
2147 2148
	 * formats.  Force stdin/stdout into binary mode if that is what we are
	 * using.
2149 2150
	 */
#ifdef WIN32
2151 2152
	if (fmt != archNull &&
		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2153 2154 2155 2156 2157 2158 2159 2160
	{
		if (mode == archModeWrite)
			setmode(fileno(stdout), O_BINARY);
		else
			setmode(fileno(stdin), O_BINARY);
	}
#endif

A
Andrew Dunstan 已提交
2161 2162
	AH->SetupWorkerPtr = setupWorkerPtr;

B
Bruce Momjian 已提交
2163
	if (fmt == archUnknown)
2164 2165 2166
		AH->format = _discoverArchiveFormat(AH);
	else
		AH->format = fmt;
B
Bruce Momjian 已提交
2167

2168 2169
	AH->promptPassword = TRI_DEFAULT;

B
Bruce Momjian 已提交
2170 2171
	switch (AH->format)
	{
2172 2173 2174
		case archCustom:
			InitArchiveFmt_Custom(AH);
			break;
B
Bruce Momjian 已提交
2175

2176 2177 2178
		case archNull:
			InitArchiveFmt_Null(AH);
			break;
B
Bruce Momjian 已提交
2179

2180 2181 2182 2183
		case archDirectory:
			InitArchiveFmt_Directory(AH);
			break;

2184 2185 2186 2187 2188
		case archTar:
			InitArchiveFmt_Tar(AH);
			break;

		default:
2189
			exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
B
Bruce Momjian 已提交
2190
	}
B
Bruce Momjian 已提交
2191

B
Bruce Momjian 已提交
2192
	return AH;
B
Bruce Momjian 已提交
2193 2194
}

B
Bruce Momjian 已提交
2195
void
A
Andrew Dunstan 已提交
2196
WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
B
Bruce Momjian 已提交
2197
{
2198
	TocEntry   *te;
B
Bruce Momjian 已提交
2199

2200
	for (te = AH->toc->next; te != AH->toc; te = te->next)
B
Bruce Momjian 已提交
2201
	{
A
Andrew Dunstan 已提交
2202 2203
		if (!te->dataDumper)
			continue;
B
Bruce Momjian 已提交
2204

A
Andrew Dunstan 已提交
2205 2206
		if ((te->reqs & REQ_DATA) == 0)
			continue;
B
Bruce Momjian 已提交
2207

A
Andrew Dunstan 已提交
2208 2209
		if (pstate && pstate->numWorkers > 1)
		{
B
Bruce Momjian 已提交
2210
			/*
A
Andrew Dunstan 已提交
2211 2212
			 * If we are in a parallel backup, then we are always the master
			 * process.
B
Bruce Momjian 已提交
2213
			 */
A
Andrew Dunstan 已提交
2214 2215 2216 2217 2218 2219 2220 2221 2222
			EnsureIdleWorker(AH, pstate);
			Assert(GetIdleWorker(pstate) != NO_SLOT);
			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
		}
		else
			WriteDataChunksForTocEntry(AH, te);
	}
	EnsureWorkersFinished(AH, pstate);
}
B
Bruce Momjian 已提交
2223

A
Andrew Dunstan 已提交
2224 2225 2226 2227 2228
/*
 * Dump the data of a single TOC entry.
 *
 * AH->currToc points at the entry for the duration of the call.  The
 * format's start and end hooks (the blob variants when the entry's desc is
 * "BLOBS", the plain data variants otherwise) are invoked around the
 * entry's dataDumper callback, each only if it is non-NULL.
 */
void
WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
{
	StartDataPtr beginHook;
	EndDataPtr	finishHook;
	bool		isBlobs;

	AH->currToc = te;

	isBlobs = (strcmp(te->desc, "BLOBS") == 0);
	if (!isBlobs)
	{
		beginHook = AH->StartDataPtr;
		finishHook = AH->EndDataPtr;
	}
	else
	{
		beginHook = AH->StartBlobsPtr;
		finishHook = AH->EndBlobsPtr;
	}

	if (beginHook != NULL)
		(*beginHook) (AH, te);

	/*
	 * The user-provided DataDumper routine needs to call AH->WriteData
	 */
	(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);

	if (finishHook != NULL)
		(*finishHook) (AH, te);

	AH->currToc = NULL;
}

B
Bruce Momjian 已提交
2257 2258
/*
 * Write the table of contents to the archive.
 *
 * Only entries with at least one of REQ_SCHEMA, REQ_DATA or REQ_SPECIAL set
 * are written.  The count of such entries comes first, followed by each
 * entry's fields; the dependency list is written as decimal strings and is
 * terminated by a NULL string.  The format-specific WriteExtraTocPtr hook,
 * if set, is called last for each entry.
 */
void
WriteToc(ArchiveHandle *AH)
{
	const int	dumpMask = REQ_SCHEMA | REQ_DATA | REQ_SPECIAL;
	TocEntry   *cur;
	char		numbuf[32];
	int			nEntries = 0;
	int			d;

	/* count entries that will actually be dumped */
	for (cur = AH->toc->next; cur != AH->toc; cur = cur->next)
		if ((cur->reqs & dumpMask) != 0)
			nEntries++;

	/* printf("%d TOC Entries to save\n", nEntries); */

	WriteInt(AH, nEntries);

	for (cur = AH->toc->next; cur != AH->toc; cur = cur->next)
	{
		if ((cur->reqs & dumpMask) == 0)
			continue;

		WriteInt(AH, cur->dumpId);
		WriteInt(AH, cur->dataDumper ? 1 : 0);

		/* OID is recorded as a string for historical reasons */
		sprintf(numbuf, "%u", cur->catalogId.tableoid);
		WriteStr(AH, numbuf);
		sprintf(numbuf, "%u", cur->catalogId.oid);
		WriteStr(AH, numbuf);

		WriteStr(AH, cur->tag);
		WriteStr(AH, cur->desc);
		WriteInt(AH, cur->section);
		WriteStr(AH, cur->defn);
		WriteStr(AH, cur->dropStmt);
		WriteStr(AH, cur->copyStmt);
		WriteStr(AH, cur->namespace);
		WriteStr(AH, cur->tablespace);
		WriteStr(AH, cur->owner);
		WriteStr(AH, cur->withOids ? "true" : "false");

		/* Dump list of dependencies */
		d = 0;
		while (d < cur->nDeps)
		{
			sprintf(numbuf, "%d", cur->dependencies[d]);
			WriteStr(AH, numbuf);
			d++;
		}
		WriteStr(AH, NULL);		/* Terminate List */

		if (AH->WriteExtraTocPtr)
			(*AH->WriteExtraTocPtr) (AH, cur);
	}
}

B
Bruce Momjian 已提交
2315 2316
/*
 * Read the table of contents from the archive into AH's TOC list.
 *
 * The entry count is read first; each entry is then read field by field,
 * with fields introduced in later archive versions read only when
 * AH->version is new enough.  AH->maxDumpId tracks the largest dump ID
 * seen.  An entry ID of zero or less is reported as a corrupt TOC.
 *
 * Each completed entry is appended to the circular TOC list, and ENCODING
 * and STDSTRINGS entries are processed as soon as they are read.
 *
 * Strings returned by ReadStr() that are only parsed here are freed after
 * use, and are checked for NULL before parsing since ReadStr() returns NULL
 * for a string stored as NULL.
 */
void
ReadToc(ArchiveHandle *AH)
{
	int			i;
	char	   *tmp;
	DumpId	   *deps;
	int			depIdx;
	int			depSize;
	TocEntry   *te;

	AH->tocCount = ReadInt(AH);
	AH->maxDumpId = 0;

	for (i = 0; i < AH->tocCount; i++)
	{
		te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
		te->dumpId = ReadInt(AH);

		if (te->dumpId > AH->maxDumpId)
			AH->maxDumpId = te->dumpId;

		/* Sanity check */
		if (te->dumpId <= 0)
			exit_horribly(modulename,
					   "entry ID %d out of range -- perhaps a corrupt TOC\n",
						  te->dumpId);

		te->hadDumper = ReadInt(AH);

		/* OIDs are stored as decimal strings */
		if (AH->version >= K_VERS_1_8)
		{
			tmp = ReadStr(AH);
			if (tmp)
				sscanf(tmp, "%u", &te->catalogId.tableoid);
			else
				te->catalogId.tableoid = InvalidOid;
			free(tmp);
		}
		else
			te->catalogId.tableoid = InvalidOid;
		tmp = ReadStr(AH);
		if (tmp)
			sscanf(tmp, "%u", &te->catalogId.oid);
		else
			te->catalogId.oid = InvalidOid;
		free(tmp);

		te->tag = ReadStr(AH);
		te->desc = ReadStr(AH);

		if (AH->version >= K_VERS_1_11)
		{
			te->section = ReadInt(AH);
		}
		else
		{
			/*
			 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
			 * the entries into sections.  This list need not cover entry
			 * types added later than 8.4.
			 */
			if (strcmp(te->desc, "COMMENT") == 0 ||
				strcmp(te->desc, "ACL") == 0 ||
				strcmp(te->desc, "ACL LANGUAGE") == 0)
				te->section = SECTION_NONE;
			else if (strcmp(te->desc, "TABLE DATA") == 0 ||
					 strcmp(te->desc, "BLOBS") == 0 ||
					 strcmp(te->desc, "BLOB COMMENTS") == 0)
				te->section = SECTION_DATA;
			else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
					 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "INDEX") == 0 ||
					 strcmp(te->desc, "RULE") == 0 ||
					 strcmp(te->desc, "TRIGGER") == 0)
				te->section = SECTION_POST_DATA;
			else
				te->section = SECTION_PRE_DATA;
		}

		te->defn = ReadStr(AH);
		te->dropStmt = ReadStr(AH);

		if (AH->version >= K_VERS_1_3)
			te->copyStmt = ReadStr(AH);

		if (AH->version >= K_VERS_1_6)
			te->namespace = ReadStr(AH);

		if (AH->version >= K_VERS_1_10)
			te->tablespace = ReadStr(AH);

		te->owner = ReadStr(AH);
		if (AH->version >= K_VERS_1_9)
		{
			/*
			 * Keep the string in tmp so it can be freed after the
			 * comparison; a NULL string counts as "not true".
			 */
			tmp = ReadStr(AH);
			if (tmp != NULL && strcmp(tmp, "true") == 0)
				te->withOids = true;
			else
				te->withOids = false;
			free(tmp);
		}
		else
			te->withOids = true;

		/* Read TOC entry dependencies */
		if (AH->version >= K_VERS_1_5)
		{
			depSize = 100;
			deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
			depIdx = 0;
			for (;;)
			{
				tmp = ReadStr(AH);
				if (!tmp)
					break;		/* end of list */
				if (depIdx >= depSize)
				{
					depSize *= 2;
					deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
				}
				sscanf(tmp, "%d", &deps[depIdx]);
				free(tmp);
				depIdx++;
			}

			if (depIdx > 0)		/* We have a non-null entry */
			{
				/* trim the array to the number of entries actually read */
				deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
				te->dependencies = deps;
				te->nDeps = depIdx;
			}
			else
			{
				free(deps);
				te->dependencies = NULL;
				te->nDeps = 0;
			}
		}
		else
		{
			te->dependencies = NULL;
			te->nDeps = 0;
		}

		if (AH->ReadExtraTocPtr)
			(*AH->ReadExtraTocPtr) (AH, te);

		ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
			  i, te->dumpId, te->desc, te->tag);

		/* link completed entry into TOC circular list */
		te->prev = AH->toc->prev;
		AH->toc->prev->next = te;
		AH->toc->prev = te;
		te->next = AH->toc;

		/* special processing immediately upon read for some items */
		if (strcmp(te->desc, "ENCODING") == 0)
			processEncodingEntry(AH, te);
		else if (strcmp(te->desc, "STDSTRINGS") == 0)
			processStdStringsEntry(AH, te);
	}
}

2472 2473 2474 2475
/*
 * Handle an ENCODING TOC entry: extract the single-quoted encoding name
 * from te->defn (expected form: SET client_encoding = 'foo';), look it up
 * with pg_char_to_encoding(), and store the result in AH->public.encoding.
 * A missing quoted name or an unrecognized encoding is reported through
 * exit_horribly().
 */
static void
processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
{
	/* work on a private copy so the closing quote can be overwritten */
	char	   *copy = pg_strdup(te->defn);
	char	   *nameStart;
	char	   *nameEnd = NULL;
	int			enc;

	nameStart = strchr(copy, '\'');
	if (nameStart != NULL)
	{
		nameStart++;
		nameEnd = strchr(nameStart, '\'');
	}

	if (nameEnd == NULL)
		exit_horribly(modulename, "invalid ENCODING item: %s\n",
					  te->defn);
	else
	{
		*nameEnd = '\0';
		enc = pg_char_to_encoding(nameStart);
		if (enc < 0)
			exit_horribly(modulename, "unrecognized encoding \"%s\"\n",
						  nameStart);
		AH->public.encoding = enc;
	}

	free(copy);
}

/*
 * Handle a STDSTRINGS TOC entry: te->defn is expected to have the form
 * SET standard_conforming_strings = 'x';  The text starting at the first
 * single quote must begin with 'on' or 'off', which sets
 * AH->public.std_strings accordingly; anything else is reported through
 * exit_horribly().
 */
static void
processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
{
	char	   *quote = strchr(te->defn, '\'');

	if (quote == NULL)
		exit_horribly(modulename, "invalid STDSTRINGS item: %s\n",
					  te->defn);
	else if (strncmp(quote, "'on'", 4) == 0)
		AH->public.std_strings = true;
	else if (strncmp(quote, "'off'", 5) == 0)
		AH->public.std_strings = false;
	else
		exit_horribly(modulename, "invalid STDSTRINGS item: %s\n",
					  te->defn);
}

2516
static teReqs
2517
_tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt)
B
Bruce Momjian 已提交
2518
{
2519
	teReqs		res = REQ_SCHEMA | REQ_DATA;
B
Bruce Momjian 已提交
2520

2521
	/* ENCODING and STDSTRINGS items are treated specially */
2522 2523
	if (strcmp(te->desc, "ENCODING") == 0 ||
		strcmp(te->desc, "STDSTRINGS") == 0)
2524
		return REQ_SPECIAL;
2525

B
Bruce Momjian 已提交
2526
	/* If it's an ACL, maybe ignore it */
2527
	if (ropt->aclsSkip && _tocEntryIsACL(te))
2528
		return 0;
B
Bruce Momjian 已提交
2529

R
Robert Haas 已提交
2530
	/* If it's security labels, maybe ignore it */
2531
	if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
R
Robert Haas 已提交
2532 2533
		return 0;

2534 2535
	/* Ignore it if section is not to be dumped/restored */
	switch (curSection)
2536
	{
2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550
		case SECTION_PRE_DATA:
			if (!(ropt->dumpSections & DUMP_PRE_DATA))
				return 0;
			break;
		case SECTION_DATA:
			if (!(ropt->dumpSections & DUMP_DATA))
				return 0;
			break;
		case SECTION_POST_DATA:
			if (!(ropt->dumpSections & DUMP_POST_DATA))
				return 0;
			break;
		default:
			/* shouldn't get here, really, but ignore it */
2551 2552 2553
			return 0;
	}

2554
	/* Check options for selective dump/restore */
2555
	if (ropt->schemaNames.head != NULL)
2556 2557 2558 2559
	{
		/* If no namespace is specified, it means all. */
		if (!te->namespace)
			return 0;
2560
		if (!(simple_string_list_member(&ropt->schemaNames, te->namespace)))
2561 2562 2563
			return 0;
	}

B
Bruce Momjian 已提交
2564 2565
	if (ropt->selTypes)
	{
2566 2567
		if (strcmp(te->desc, "TABLE") == 0 ||
			strcmp(te->desc, "TABLE DATA") == 0)
2568 2569 2570
		{
			if (!ropt->selTable)
				return 0;
2571
			if (ropt->tableNames.head != NULL && (!(simple_string_list_member(&ropt->tableNames, te->tag))))
2572
				return 0;
B
Bruce Momjian 已提交
2573 2574 2575
		}
		else if (strcmp(te->desc, "INDEX") == 0)
		{
2576 2577
			if (!ropt->selIndex)
				return 0;
2578
			if (ropt->indexNames.head != NULL && (!(simple_string_list_member(&ropt->indexNames, te->tag))))
2579
				return 0;
B
Bruce Momjian 已提交
2580 2581 2582
		}
		else if (strcmp(te->desc, "FUNCTION") == 0)
		{
2583 2584
			if (!ropt->selFunction)
				return 0;
2585
			if (ropt->functionNames.head != NULL && (!(simple_string_list_member(&ropt->functionNames, te->tag))))
2586
				return 0;
B
Bruce Momjian 已提交
2587 2588 2589
		}
		else if (strcmp(te->desc, "TRIGGER") == 0)
		{
2590 2591
			if (!ropt->selTrigger)
				return 0;
2592
			if (ropt->triggerNames.head != NULL && (!(simple_string_list_member(&ropt->triggerNames, te->tag))))
2593 2594
				return 0;
		}
B
Bruce Momjian 已提交
2595 2596
		else
			return 0;
B
Bruce Momjian 已提交
2597 2598
	}

2599
	/*
B
Bruce Momjian 已提交
2600
	 * Check if we had a dataDumper. Indicates if the entry is schema or data
2601 2602 2603 2604
	 */
	if (!te->hadDumper)
	{
		/*
B
Bruce Momjian 已提交
2605 2606 2607 2608 2609
		 * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
		 * it is considered a data entry.  We don't need to check for the
		 * BLOBS entry or old-style BLOB COMMENTS, because they will have
		 * hadDumper = true ... but we do need to check new-style BLOB
		 * comments.
2610
		 */
2611 2612 2613 2614 2615
		if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
			strcmp(te->desc, "BLOB") == 0 ||
			(strcmp(te->desc, "ACL") == 0 &&
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			(strcmp(te->desc, "COMMENT") == 0 &&
R
Robert Haas 已提交
2616 2617
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
			(strcmp(te->desc, "SECURITY LABEL") == 0 &&
2618
			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
2619 2620
			res = res & REQ_DATA;
		else
2621 2622
			res = res & ~REQ_DATA;
	}
2623

2624
	/*
B
Bruce Momjian 已提交
2625 2626
	 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
	 * always ignore it.
2627
	 */
2628
	if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2629
		return 0;
2630

B
Bruce Momjian 已提交
2631 2632
	/* Mask it if we only want schema */
	if (ropt->schemaOnly)
2633
		res = res & REQ_SCHEMA;
B
Bruce Momjian 已提交
2634

2635
	/* Mask it if we only want data */
2636
	if (ropt->dataOnly)
2637
		res = res & REQ_DATA;
B
Bruce Momjian 已提交
2638

2639
	/* Mask it if we don't have a schema contribution */
B
Bruce Momjian 已提交
2640
	if (!te->defn || strlen(te->defn) == 0)
2641
		res = res & ~REQ_SCHEMA;
B
Bruce Momjian 已提交
2642

2643 2644
	/* Finally, if there's a per-ID filter, limit based on that as well */
	if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2645
		return 0;
B
Bruce Momjian 已提交
2646

B
Bruce Momjian 已提交
2647
	return res;
B
Bruce Momjian 已提交
2648 2649
}

2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663
/*
 * Report whether a TOC entry is an ACL entry, judged by its desc string:
 * "ACL", "ACL LANGUAGE" or "DEFAULT ACL".
 */
static bool
_tocEntryIsACL(TocEntry *te)
{
	const char *kind = te->desc;

	/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
	return strcmp(kind, "ACL") == 0 ||
		strcmp(kind, "ACL LANGUAGE") == 0 ||
		strcmp(kind, "DEFAULT ACL") == 0;
}

2664 2665 2666 2667 2668 2669 2670
/*
 * Emit the SET commands that must hold throughout a restore script:
 * timeouts off, client encoding, string literal syntax, the optional
 * restore role, function body checking off, and reduced message noise.
 */
static void
_doSetFixedOutputState(ArchiveHandle *AH)
{
	const char *stdStrings;

	/* Disable statement_timeout since restore is probably slow */
	ahprintf(AH, "SET statement_timeout = 0;\n");

	/* Likewise for lock_timeout */
	ahprintf(AH, "SET lock_timeout = 0;\n");

	/* Select the correct character set encoding */
	ahprintf(AH, "SET client_encoding = '%s';\n",
			 pg_encoding_to_char(AH->public.encoding));

	/* Select the correct string literal syntax */
	stdStrings = AH->public.std_strings ? "on" : "off";
	ahprintf(AH, "SET standard_conforming_strings = %s;\n", stdStrings);

	/* Select the role to be used during restore */
	if (AH->ropt != NULL && AH->ropt->use_role)
		ahprintf(AH, "SET ROLE %s;\n", fmtId(AH->ropt->use_role));

	/* Make sure function checking is disabled */
	ahprintf(AH, "SET check_function_bodies = false;\n");

	/* Avoid annoying notices etc */
	ahprintf(AH, "SET client_min_messages = warning;\n");
	if (!AH->public.std_strings)
		ahprintf(AH, "SET escape_string_warning = off;\n");

	ahprintf(AH, "\n");
}

2700 2701
/*
 * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
2702 2703
 * for updating state if appropriate.  If user is NULL or an empty string,
 * the specification DEFAULT will be used.
2704 2705
 */
static void
2706
_doSetSessionAuth(ArchiveHandle *AH, const char *user)
2707
{
2708
	PQExpBuffer cmd = createPQExpBuffer();
B
Bruce Momjian 已提交
2709

2710
	appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
B
Bruce Momjian 已提交
2711

2712
	/*
B
Bruce Momjian 已提交
2713
	 * SQL requires a string literal here.  Might as well be correct.
2714 2715
	 */
	if (user && *user)
2716
		appendStringLiteralAHX(cmd, user, AH);
2717
	else
2718 2719
		appendPQExpBufferStr(cmd, "DEFAULT");
	appendPQExpBufferChar(cmd, ';');
2720

2721 2722 2723 2724
	if (RestoringToDB(AH))
	{
		PGresult   *res;

2725
		res = PQexec(AH->connection, cmd->data);
2726 2727

		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2728 2729 2730
			/* NOT warn_or_exit_horribly... use -O instead to skip this. */
			exit_horribly(modulename, "could not set session user to \"%s\": %s",
						  user, PQerrorMessage(AH->connection));
2731 2732 2733 2734

		PQclear(res);
	}
	else
2735 2736 2737
		ahprintf(AH, "%s\n\n", cmd->data);

	destroyPQExpBuffer(cmd);
2738 2739
}

2740

2741 2742 2743 2744 2745 2746 2747 2748 2749 2750
/*
 * Issue a SET default_with_oids command, either by executing it on the open
 * connection or by writing it to the script output.  The caller is
 * responsible for updating any state that tracks the current setting.
 */
static void
_doSetWithOids(ArchiveHandle *AH, const bool withOids)
{
	PQExpBuffer buf = createPQExpBuffer();
	const char *value = withOids ? "true" : "false";

	appendPQExpBuffer(buf, "SET default_with_oids = %s;", value);

	if (RestoringToDB(AH))
	{
		PGresult   *result = PQexec(AH->connection, buf->data);

		if (!result || PQresultStatus(result) != PGRES_COMMAND_OK)
			warn_or_exit_horribly(AH, modulename,
								  "could not set default_with_oids: %s",
								  PQerrorMessage(AH->connection));

		PQclear(result);
	}
	else
		ahprintf(AH, "%s\n\n", buf->data);

	destroyPQExpBuffer(buf);
}


2773
/*
2774
 * Issue the commands to connect to the specified database.
2775 2776
 *
 * If we're currently restoring right into a database, this will
B
Bruce Momjian 已提交
2777
 * actually establish a connection. Otherwise it puts a \connect into
2778
 * the script output.
2779 2780
 *
 * NULL dbname implies reconnecting to the current DB (pretty useless).
2781
 */
B
Bruce Momjian 已提交
2782
static void
2783
_reconnectToDB(ArchiveHandle *AH, const char *dbname)
2784
{
2785
	if (RestoringToDB(AH))
2786
		ReconnectToServer(AH, dbname, NULL);
2787
	else
2788 2789 2790
	{
		PQExpBuffer qry = createPQExpBuffer();

2791
		appendPQExpBuffer(qry, "\\connect %s\n\n",
2792
						  dbname ? fmtId(dbname) : "-");
2793
		ahprintf(AH, "%s", qry->data);
2794 2795
		destroyPQExpBuffer(qry);
	}
2796

2797
	/*
B
Bruce Momjian 已提交
2798 2799
	 * NOTE: currUser keeps track of what the imaginary session user in our
	 * script is.  It's now effectively reset to the original userID.
2800
	 */
2801 2802
	if (AH->currUser)
		free(AH->currUser);
2803
	AH->currUser = NULL;
2804

2805
	/* don't assume we still know the output schema, tablespace, etc either */
2806 2807
	if (AH->currSchema)
		free(AH->currSchema);
2808 2809 2810 2811
	AH->currSchema = NULL;
	if (AH->currTablespace)
		free(AH->currTablespace);
	AH->currTablespace = NULL;
2812
	AH->currWithOids = -1;
B
Bruce Momjian 已提交
2813

2814 2815
	/* re-establish fixed state */
	_doSetFixedOutputState(AH);
2816 2817
}

2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834
/*
 * Switch the session to the given user unless AH->currUser already names
 * that user, then remember the new user in AH->currUser.
 *
 * A NULL argument is treated as the empty string, which
 * _doSetSessionAuth() turns into the session default.
 */
static void
_becomeUser(ArchiveHandle *AH, const char *user)
{
	const char *target = (user != NULL) ? user : "";	/* avoid null pointers */

	/* nothing to do if that user is already in effect */
	if (AH->currUser != NULL && strcmp(AH->currUser, target) == 0)
		return;

	_doSetSessionAuth(AH, target);

	/*
	 * NOTE: currUser keeps track of what the imaginary session user in our
	 * script is
	 */
	free(AH->currUser);
	AH->currUser = pg_strdup(target);
}
2842 2843

/*
B
Bruce Momjian 已提交
2844
 * Become the owner of the given TOC entry object.  If
2845 2846
 * changes in ownership are not allowed, this doesn't do anything.
 */
B
Bruce Momjian 已提交
2847
static void
2848
_becomeOwner(ArchiveHandle *AH, TocEntry *te)
2849
{
2850
	if (AH->ropt && (AH->ropt->noOwner || !AH->ropt->use_setsessauth))
2851 2852
		return;

2853
	_becomeUser(AH, te->owner);
2854 2855
}

2856

2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870
/*
 * Make default_with_oids match the TOC entry's withOids flag, issuing the
 * SET command only when the remembered value differs.
 */
static void
_setWithOids(ArchiveHandle *AH, TocEntry *te)
{
	/* already in the wanted state */
	if (AH->currWithOids == te->withOids)
		return;

	_doSetWithOids(AH, te->withOids);
	AH->currWithOids = te->withOids;
}


2871 2872 2873 2874 2875 2876 2877
/*
 * Make schemaName the current schema of the restore target by setting
 * search_path (with pg_catalog appended unless it is the schema itself).
 *
 * Nothing happens for a NULL or empty name, or when AH->currSchema already
 * equals schemaName.  When restoring directly to a database the command is
 * executed over AH->connection; otherwise it is written to the output.
 */
static void
_selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
{
	PQExpBuffer cmd;

	/* objects without a schema need no search_path change */
	if (schemaName == NULL || schemaName[0] == '\0')
		return;

	/* already selected? */
	if (AH->currSchema != NULL && strcmp(AH->currSchema, schemaName) == 0)
		return;

	cmd = createPQExpBuffer();
	appendPQExpBuffer(cmd, "SET search_path = %s", fmtId(schemaName));
	if (strcmp(schemaName, "pg_catalog") != 0)
		appendPQExpBufferStr(cmd, ", pg_catalog");

	if (RestoringToDB(AH))
	{
		PGresult   *res = PQexec(AH->connection, cmd->data);

		if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
			warn_or_exit_horribly(AH, modulename,
								  "could not set search_path to \"%s\": %s",
								  schemaName, PQerrorMessage(AH->connection));

		PQclear(res);
	}
	else
	{
		ahprintf(AH, "%s;\n\n", cmd->data);
	}

	/* record the new current schema */
	free(AH->currSchema);
	AH->currSchema = pg_strdup(schemaName);

	destroyPQExpBuffer(cmd);
}

2914 2915 2916 2917 2918 2919 2920 2921
/*
 * Make "tablespace" the default tablespace of the restore target.
 *
 * Does nothing when AH->ropt->noTablespace is set, when tablespace is NULL
 * (a non-tablespace object), or when AH->currTablespace already matches.
 * An empty string selects the database's default tablespace.  When
 * restoring directly to a database the command is executed over
 * AH->connection; otherwise it is written to the output.
 */
static void
_selectTablespace(ArchiveHandle *AH, const char *tablespace)
{
	PQExpBuffer cmd;

	/* do nothing in --no-tablespaces mode */
	if (AH->ropt->noTablespace)
		return;

	/* non-tablespace object: nothing to select */
	if (tablespace == NULL)
		return;

	/* already the current default? */
	if (AH->currTablespace != NULL &&
		strcmp(tablespace, AH->currTablespace) == 0)
		return;

	cmd = createPQExpBuffer();

	if (tablespace[0] == '\0')
	{
		/* empty name means the database's default tablespace */
		appendPQExpBufferStr(cmd, "SET default_tablespace = ''");
	}
	else
	{
		/* an explicit tablespace was requested */
		appendPQExpBuffer(cmd, "SET default_tablespace = %s",
						  fmtId(tablespace));
	}

	if (RestoringToDB(AH))
	{
		PGresult   *res = PQexec(AH->connection, cmd->data);

		if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
			warn_or_exit_horribly(AH, modulename,
								"could not set default_tablespace to %s: %s",
								fmtId(tablespace), PQerrorMessage(AH->connection));

		PQclear(res);
	}
	else
	{
		ahprintf(AH, "%s;\n\n", cmd->data);
	}

	/* record the new current tablespace */
	free(AH->currTablespace);
	AH->currTablespace = pg_strdup(tablespace);

	destroyPQExpBuffer(cmd);
}
2974

2975 2976 2977
/*
 * Extract an object description for a TOC entry, and append it to buf.
 *
2978
 * This is used for ALTER ... OWNER TO.
2979
 */
2980
static void
2981
_getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
2982
{
2983 2984 2985
	const char *type = te->desc;

	/* Use ALTER TABLE for views and sequences */
A
Andrew Dunstan 已提交
2986
	if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
2987
		strcmp(type, "MATERIALIZED VIEW") == 0)
2988 2989
		type = "TABLE";

2990
	/* objects that don't require special decoration */
P
Peter Eisentraut 已提交
2991 2992
	if (strcmp(type, "COLLATION") == 0 ||
		strcmp(type, "CONVERSION") == 0 ||
2993 2994
		strcmp(type, "DOMAIN") == 0 ||
		strcmp(type, "TABLE") == 0 ||
2995
		strcmp(type, "TYPE") == 0 ||
R
Robert Haas 已提交
2996
		strcmp(type, "FOREIGN TABLE") == 0 ||
2997
		strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
2998
		strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
B
Bruce Momjian 已提交
2999
	/* non-schema-specified objects */
3000
		strcmp(type, "DATABASE") == 0 ||
3001
		strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3002 3003 3004 3005
		strcmp(type, "SCHEMA") == 0 ||
		strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
		strcmp(type, "SERVER") == 0 ||
		strcmp(type, "USER MAPPING") == 0)
3006
	{
3007
		/* We already know that search_path was set properly */
3008 3009 3010
		appendPQExpBuffer(buf, "%s %s", type, fmtId(te->tag));
		return;
	}
3011

3012 3013 3014 3015 3016 3017 3018
	/* BLOBs just have a name, but it's numeric so must not use fmtId */
	if (strcmp(type, "BLOB") == 0)
	{
		appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
		return;
	}

B
Bruce Momjian 已提交
3019
	/*
B
Bruce Momjian 已提交
3020 3021
	 * These object types require additional decoration.  Fortunately, the
	 * information needed is exactly what's in the DROP command.
B
Bruce Momjian 已提交
3022
	 */
3023 3024 3025
	if (strcmp(type, "AGGREGATE") == 0 ||
		strcmp(type, "FUNCTION") == 0 ||
		strcmp(type, "OPERATOR") == 0 ||
3026 3027
		strcmp(type, "OPERATOR CLASS") == 0 ||
		strcmp(type, "OPERATOR FAMILY") == 0)
B
Bruce Momjian 已提交
3028
	{
3029
		/* Chop "DROP " off the front and make a modifiable copy */
3030
		char	   *first = pg_strdup(te->dropStmt + 5);
3031
		char	   *last;
3032

3033 3034
		/* point to last character in string */
		last = first + strlen(first) - 1;
3035

3036 3037 3038 3039
		/* Strip off any ';' or '\n' at the end */
		while (last >= first && (*last == '\n' || *last == ';'))
			last--;
		*(last + 1) = '\0';
B
Bruce Momjian 已提交
3040

3041
		appendPQExpBufferStr(buf, first);
B
Bruce Momjian 已提交
3042 3043

		free(first);
3044
		return;
3045 3046
	}

3047 3048
	write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
			  type);
3049 3050 3051
}

static void
3052
_printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass)
B
Bruce Momjian 已提交
3053
{
3054 3055 3056
	/* ACLs are dumped only during acl pass */
	if (acl_pass)
	{
3057
		if (!_tocEntryIsACL(te))
3058 3059 3060 3061
			return;
	}
	else
	{
3062
		if (_tocEntryIsACL(te))
3063 3064 3065 3066 3067
			return;
	}

	/*
	 * Avoid dumping the public schema, as it will already be created ...
B
Bruce Momjian 已提交
3068
	 * unless we are using --clean mode, in which case it's been deleted and
3069
	 * we'd better recreate it.  Likewise for its comment, if any.
3070
	 */
3071 3072 3073 3074 3075
	if (!ropt->dropSchema)
	{
		if (strcmp(te->desc, "SCHEMA") == 0 &&
			strcmp(te->tag, "public") == 0)
			return;
3076
		/* The comment restore would require super-user privs, so avoid it. */
3077 3078 3079 3080
		if (strcmp(te->desc, "COMMENT") == 0 &&
			strcmp(te->tag, "SCHEMA public") == 0)
			return;
	}
3081

3082
	/* Select owner, schema, and tablespace as necessary */
3083 3084
	_becomeOwner(AH, te);
	_selectOutputSchema(AH, te->namespace);
3085
	_selectTablespace(AH, te->tablespace);
3086 3087 3088 3089 3090 3091

	/* Set up OID mode too */
	if (strcmp(te->desc, "TABLE") == 0)
		_setWithOids(AH, te);

	/* Emit header comment for item */
3092
	if (!AH->noTocComments)
3093
	{
3094
		const char *pfx;
3095 3096 3097
		char	   *sanitized_name;
		char	   *sanitized_schema;
		char	   *sanitized_owner;
3098 3099 3100 3101 3102 3103 3104 3105

		if (isData)
			pfx = "Data for ";
		else
			pfx = "";

		ahprintf(AH, "--\n");
		if (AH->public.verbose)
3106
		{
3107 3108 3109 3110 3111
			ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
					 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
			if (te->nDeps > 0)
			{
				int			i;
3112

3113 3114 3115 3116 3117
				ahprintf(AH, "-- Dependencies:");
				for (i = 0; i < te->nDeps; i++)
					ahprintf(AH, " %d", te->dependencies[i]);
				ahprintf(AH, "\n");
			}
3118
		}
3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135

		/*
		 * Zap any line endings embedded in user-supplied fields, to prevent
		 * corruption of the dump (which could, in the worst case, present an
		 * SQL injection vulnerability if someone were to incautiously load a
		 * dump containing objects with maliciously crafted names).
		 */
		sanitized_name = replace_line_endings(te->tag);
		if (te->namespace)
			sanitized_schema = replace_line_endings(te->namespace);
		else
			sanitized_schema = pg_strdup("-");
		if (!ropt->noOwner)
			sanitized_owner = replace_line_endings(te->owner);
		else
			sanitized_owner = pg_strdup("-");

3136
		ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3137 3138 3139 3140 3141 3142 3143
				 pfx, sanitized_name, te->desc, sanitized_schema,
				 sanitized_owner);

		free(sanitized_name);
		free(sanitized_schema);
		free(sanitized_owner);

3144
		if (te->tablespace && !ropt->noTablespace)
3145
		{
3146
			char	   *sanitized_tablespace;
3147 3148 3149 3150 3151

			sanitized_tablespace = replace_line_endings(te->tablespace);
			ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
			free(sanitized_tablespace);
		}
3152 3153
		ahprintf(AH, "\n");

B
Bruce Momjian 已提交
3154
		if (AH->PrintExtraTocPtr !=NULL)
3155 3156
			(*AH->PrintExtraTocPtr) (AH, te);
		ahprintf(AH, "--\n\n");
3157
	}
B
Bruce Momjian 已提交
3158

3159 3160 3161
	/*
	 * Actually print the definition.
	 *
B
Bruce Momjian 已提交
3162 3163 3164
	 * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
	 * versions put into CREATE SCHEMA.  We have to do this when --no-owner
	 * mode is selected.  This is ugly, but I see no other good way ...
3165
	 */
3166
	if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
3167
	{
3168
		ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3169
	}
3170
	else
3171
	{
3172 3173
		if (strlen(te->defn) > 0)
			ahprintf(AH, "%s\n\n", te->defn);
3174
	}
3175 3176 3177

	/*
	 * If we aren't using SET SESSION AUTH to determine ownership, we must
3178 3179 3180
	 * instead issue an ALTER OWNER command.  We assume that anything without
	 * a DROP command is not a separately ownable object.  All the categories
	 * with DROP commands must appear in one list or the other.
3181 3182
	 */
	if (!ropt->noOwner && !ropt->use_setsessauth &&
3183 3184 3185
		strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
	{
		if (strcmp(te->desc, "AGGREGATE") == 0 ||
3186
			strcmp(te->desc, "BLOB") == 0 ||
P
Peter Eisentraut 已提交
3187
			strcmp(te->desc, "COLLATION") == 0 ||
3188 3189 3190 3191 3192 3193
			strcmp(te->desc, "CONVERSION") == 0 ||
			strcmp(te->desc, "DATABASE") == 0 ||
			strcmp(te->desc, "DOMAIN") == 0 ||
			strcmp(te->desc, "FUNCTION") == 0 ||
			strcmp(te->desc, "OPERATOR") == 0 ||
			strcmp(te->desc, "OPERATOR CLASS") == 0 ||
3194
			strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
3195
			strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
3196 3197 3198 3199
			strcmp(te->desc, "SCHEMA") == 0 ||
			strcmp(te->desc, "TABLE") == 0 ||
			strcmp(te->desc, "TYPE") == 0 ||
			strcmp(te->desc, "VIEW") == 0 ||
3200
			strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3201
			strcmp(te->desc, "SEQUENCE") == 0 ||
R
Robert Haas 已提交
3202
			strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3203
			strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
3204 3205 3206
			strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
			strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
			strcmp(te->desc, "SERVER") == 0)
3207 3208 3209
		{
			PQExpBuffer temp = createPQExpBuffer();

3210
			appendPQExpBufferStr(temp, "ALTER ");
3211
			_getObjectDescription(temp, te, AH);
3212 3213 3214 3215 3216 3217
			appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
			ahprintf(AH, "%s\n\n", temp->data);
			destroyPQExpBuffer(temp);
		}
		else if (strcmp(te->desc, "CAST") == 0 ||
				 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3218
				 strcmp(te->desc, "CONSTRAINT") == 0 ||
3219 3220
				 strcmp(te->desc, "DEFAULT") == 0 ||
				 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3221
				 strcmp(te->desc, "INDEX") == 0 ||
3222
				 strcmp(te->desc, "RULE") == 0 ||
3223 3224
				 strcmp(te->desc, "TRIGGER") == 0 ||
				 strcmp(te->desc, "USER MAPPING") == 0)
3225 3226 3227 3228 3229 3230 3231 3232
		{
			/* these object types don't have separate owners */
		}
		else
		{
			write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
					  te->desc);
		}
3233
	}
B
Bruce Momjian 已提交
3234

3235 3236
	/*
	 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
B
Bruce Momjian 已提交
3237
	 * commands, so we can no longer assume we know the current auth setting.
3238
	 */
3239
	if (acl_pass)
3240 3241 3242 3243 3244
	{
		if (AH->currUser)
			free(AH->currUser);
		AH->currUser = NULL;
	}
B
Bruce Momjian 已提交
3245 3246
}

3247 3248 3249 3250 3251 3252 3253
/*
 * Return a freshly allocated copy of str in which every '\n' and '\r' has
 * been replaced by a space, making it safe to embed in a one-line SQL
 * comment.  The caller is responsible for freeing the result.
 */
static char *
replace_line_endings(const char *str)
{
	char	   *copy = pg_strdup(str);
	char	   *p = copy;

	while (*p != '\0')
	{
		if (*p == '\n' || *p == '\r')
			*p = ' ';
		p++;
	}

	return copy;
}

B
Bruce Momjian 已提交
3268 3269
/*
 * Write the archive header through the format's write callbacks: the
 * "PGDMP" magic string, version bytes, integer and offset sizes, format
 * code, compression level, creation timestamp fields, database name, and
 * the remote and local version strings.
 *
 * When built without zlib, AH->compression is forced to 0 (with a warning
 * if a non-zero level had been requested) before it is written.
 *
 * Exits via exit_horribly() if the creation date cannot be converted to
 * local time.
 */
void
WriteHead(ArchiveHandle *AH)
{
	struct tm	crtm;
	struct tm  *ltm;

	(*AH->WriteBufPtr) (AH, "PGDMP", 5);		/* Magic code */
	(*AH->WriteBytePtr) (AH, AH->vmaj);
	(*AH->WriteBytePtr) (AH, AH->vmin);
	(*AH->WriteBytePtr) (AH, AH->vrev);
	(*AH->WriteBytePtr) (AH, AH->intSize);
	(*AH->WriteBytePtr) (AH, AH->offSize);
	(*AH->WriteBytePtr) (AH, AH->format);

#ifndef HAVE_LIBZ
	if (AH->compression != 0)
		write_msg(modulename, "WARNING: requested compression not available in this "
				  "installation -- archive will be uncompressed\n");

	AH->compression = 0;
#endif

	WriteInt(AH, AH->compression);

	/*
	 * localtime() may return NULL if the time cannot be represented; do not
	 * dereference it blindly.  Copy the result since it may point to static
	 * storage.
	 */
	ltm = localtime(&AH->createDate);
	if (ltm == NULL)
		exit_horribly(modulename, "could not convert archive creation date to local time\n");
	crtm = *ltm;

	WriteInt(AH, crtm.tm_sec);
	WriteInt(AH, crtm.tm_min);
	WriteInt(AH, crtm.tm_hour);
	WriteInt(AH, crtm.tm_mday);
	WriteInt(AH, crtm.tm_mon);
	WriteInt(AH, crtm.tm_year);
	WriteInt(AH, crtm.tm_isdst);
	WriteStr(AH, PQdb(AH->connection));
	WriteStr(AH, AH->public.remoteVersionStr);
	WriteStr(AH, PG_VERSION);
}

B
Bruce Momjian 已提交
3304 3305
/*
 * Read the archive header into AH.
 *
 * The fixed leading part (magic string, version bytes, integer/offset
 * sizes, format code) is read only if AH->readHeader is not already set.
 * The version-dependent remainder (compression level, creation date and
 * database name, remote/dump version strings) is always read.
 *
 * Exits via exit_horribly() on a bad magic string, unsupported version,
 * implausible integer size, or a format code that differs from AH->format.
 */
void
ReadHead(ArchiveHandle *AH)
{
	char		magic[7];
	int			file_format;
	struct tm	crtm;

	/*
	 * If we haven't already read the header, do so.
	 *
	 * NB: this code must agree with _discoverArchiveFormat().  Maybe find a
	 * way to unify the cases?
	 */
	if (!AH->readHeader)
	{
		(*AH->ReadBufPtr) (AH, magic, 5);
		if (strncmp(magic, "PGDMP", 5) != 0)
			exit_horribly(modulename, "did not find magic string in file header\n");

		AH->vmaj = (*AH->ReadBytePtr) (AH);
		AH->vmin = (*AH->ReadBytePtr) (AH);

		/* a revision byte is present only in versions after 1.0 */
		if (AH->vmaj > 1 || (AH->vmaj == 1 && AH->vmin > 0))
			AH->vrev = (*AH->ReadBytePtr) (AH);
		else
			AH->vrev = 0;

		AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;

		if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
			exit_horribly(modulename, "unsupported version (%d.%d) in file header\n",
						  AH->vmaj, AH->vmin);

		AH->intSize = (*AH->ReadBytePtr) (AH);
		if (AH->intSize > 32)
			exit_horribly(modulename, "sanity check on integer size (%lu) failed\n",
						  (unsigned long) AH->intSize);

		if (AH->intSize > sizeof(int))
			write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");

		/* a separate offset size is stored from version 1.7 on */
		if (AH->version >= K_VERS_1_7)
			AH->offSize = (*AH->ReadBytePtr) (AH);
		else
			AH->offSize = AH->intSize;

		file_format = (*AH->ReadBytePtr) (AH);
		if (AH->format != file_format)
			exit_horribly(modulename, "expected format (%d) differs from format found in file (%d)\n",
						  AH->format, file_format);
	}

	/* compression: absent before 1.2, one byte before 1.4, an int after */
	if (AH->version < K_VERS_1_2)
		AH->compression = Z_DEFAULT_COMPRESSION;
	else if (AH->version < K_VERS_1_4)
		AH->compression = (*AH->ReadBytePtr) (AH);
	else
		AH->compression = ReadInt(AH);

#ifndef HAVE_LIBZ
	if (AH->compression != 0)
		write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
#endif

	if (AH->version >= K_VERS_1_4)
	{
		/* fields are stored in exactly this order */
		crtm.tm_sec = ReadInt(AH);
		crtm.tm_min = ReadInt(AH);
		crtm.tm_hour = ReadInt(AH);
		crtm.tm_mday = ReadInt(AH);
		crtm.tm_mon = ReadInt(AH);
		crtm.tm_year = ReadInt(AH);
		crtm.tm_isdst = ReadInt(AH);

		AH->archdbname = ReadStr(AH);

		AH->createDate = mktime(&crtm);
		if (AH->createDate == (time_t) -1)
			write_msg(modulename, "WARNING: invalid creation date in header\n");
	}

	if (AH->version >= K_VERS_1_10)
	{
		AH->archiveRemoteVersion = ReadStr(AH);
		AH->archiveDumpVersion = ReadStr(AH);
	}
}


3399 3400
/*
 * checkSeek
3401
 *	  check to see if ftell/fseek can be performed.
3402 3403 3404 3405
 */
bool
checkSeek(FILE *fp)
{
3406 3407 3408
	pgoff_t		tpos;

	/*
B
Bruce Momjian 已提交
3409 3410
	 * If pgoff_t is wider than long, we must have "real" fseeko and not an
	 * emulation using fseek.  Otherwise report no seek capability.
3411 3412 3413
	 */
#ifndef HAVE_FSEEKO
	if (sizeof(pgoff_t) > sizeof(long))
3414 3415
		return false;
#endif
3416 3417 3418

	/* Check that ftello works on this file */
	tpos = ftello(fp);
3419
	if (tpos < 0)
3420 3421 3422
		return false;

	/*
B
Bruce Momjian 已提交
3423
	 * Check that fseeko(SEEK_SET) works, too.  NB: we used to try to test
3424 3425 3426 3427 3428 3429 3430
	 * this with fseeko(fp, 0, SEEK_CUR).  But some platforms treat that as a
	 * successful no-op even on files that are otherwise unseekable.
	 */
	if (fseeko(fp, tpos, SEEK_SET) != 0)
		return false;

	return true;
3431
}
3432 3433 3434 3435 3436 3437 3438 3439 3440 3441


/*
 * dumpTimestamp
 */
static void
dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
{
	char		buf[256];

3442 3443 3444
	/*
	 * We don't print the timezone on Win32, because the names are long and
	 * localized, which means they may contain characters in various random
B
Bruce Momjian 已提交
3445 3446
	 * encodings; this has been seen to cause encoding errors when reading the
	 * dump script.
3447 3448 3449 3450 3451 3452 3453 3454
	 */
	if (strftime(buf, sizeof(buf),
#ifndef WIN32
				 "%Y-%m-%d %H:%M:%S %Z",
#else
				 "%Y-%m-%d %H:%M:%S",
#endif
				 localtime(&tim)) != 0)
3455 3456
		ahprintf(AH, "-- %s %s\n\n", msg, buf);
}
3457 3458 3459 3460 3461

/*
 * Main engine for parallel restore.
 *
 * Work is done in three phases.
3462
 * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
B
Bruce Momjian 已提交
3463
 * just as for a standard restore.  Second we process the remaining non-ACL
3464 3465 3466 3467
 * steps in parallel worker children (threads on Windows, processes on Unix),
 * each of which connects separately to the database.  Finally we process all
 * the ACL entries in a single connection (that happens back in
 * RestoreArchive).
3468 3469
 */
static void
A
Andrew Dunstan 已提交
3470
restore_toc_entries_prefork(ArchiveHandle *AH)
3471 3472
{
	RestoreOptions *ropt = AH->ropt;
3473
	bool		skipped_some;
3474 3475
	TocEntry   *next_work_item;

A
Andrew Dunstan 已提交
3476
	ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
3477 3478 3479 3480 3481

	/* Adjust dependency information */
	fix_dependencies(AH);

	/*
3482 3483 3484
	 * Do all the early stuff in a single connection in the parent. There's no
	 * great point in running it in parallel, in fact it will actually run
	 * faster in a single connection because we avoid all the connection and
B
Bruce Momjian 已提交
3485
	 * setup overhead.  Also, pre-9.2 pg_dump versions were not very good
3486 3487 3488 3489 3490 3491 3492
	 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
	 * not risk trying to process them out-of-order.
	 *
	 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
	 * before DATA items, and all DATA items before POST_DATA items.  That is
	 * not certain to be true in older archives, though, so this loop is coded
	 * to not assume it.
3493
	 */
3494
	skipped_some = false;
3495
	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3496
	{
3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518
		/* NB: process-or-continue logic must be the inverse of loop below */
		if (next_work_item->section != SECTION_PRE_DATA)
		{
			/* DATA and POST_DATA items are just ignored for now */
			if (next_work_item->section == SECTION_DATA ||
				next_work_item->section == SECTION_POST_DATA)
			{
				skipped_some = true;
				continue;
			}
			else
			{
				/*
				 * SECTION_NONE items, such as comments, can be processed now
				 * if we are still in the PRE_DATA part of the archive.  Once
				 * we've skipped any items, we have to consider whether the
				 * comment's dependencies are satisfied, so skip it for now.
				 */
				if (skipped_some)
					continue;
			}
		}
3519 3520 3521 3522 3523 3524 3525

		ahlog(AH, 1, "processing item %d %s %s\n",
			  next_work_item->dumpId,
			  next_work_item->desc, next_work_item->tag);

		(void) restore_toc_entry(AH, next_work_item, ropt, false);

3526 3527
		/* there should be no touch of ready_list here, so pass NULL */
		reduce_dependencies(AH, next_work_item, NULL);
3528 3529 3530
	}

	/*
B
Bruce Momjian 已提交
3531
	 * Now close parent connection in prep for parallel steps.  We do this
3532 3533 3534
	 * mainly to ensure that we don't exceed the specified number of parallel
	 * connections.
	 */
R
Robert Haas 已提交
3535
	DisconnectDatabase(&AH->public);
3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547

	/* blow away any transient state from the old connection */
	if (AH->currUser)
		free(AH->currUser);
	AH->currUser = NULL;
	if (AH->currSchema)
		free(AH->currSchema);
	AH->currSchema = NULL;
	if (AH->currTablespace)
		free(AH->currTablespace);
	AH->currTablespace = NULL;
	AH->currWithOids = -1;
A
Andrew Dunstan 已提交
3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572
}

/*
 * Main engine for parallel restore.
 *
 * Work is done in three phases.
 * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
 * just as for a standard restore. This is done in restore_toc_entries_prefork().
 * Second we process the remaining non-ACL steps in parallel worker children
 * (threads on Windows, processes on Unix), these fork off and set up their
 * connections before we call restore_toc_entries_parallel_forked.
 * Finally we process all the ACL entries in a single connection (that happens
 * back in RestoreArchive).
 *
 * This function is the second phase: it sorts the remaining TOC entries
 * into the caller's pending_list and a local ready list, then dispatches
 * ready items to workers until no item is available and every worker is
 * idle.
 */
static void
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
							 TocEntry *pending_list)
{
	TocEntry	ready_list;
	TocEntry   *item;
	bool		seen_later_section = false;
	int			status;
	int			slot;

	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

	/*
	 * Initialize the lists of ready items, the list for pending items has
	 * already been initialized in the caller.  After this setup, the pending
	 * list is everything that needs to be done but is blocked by one or more
	 * dependencies, while the ready list contains items that have no
	 * remaining dependencies. Note: we don't yet filter out entries that
	 * aren't going to be restored. They might participate in dependency
	 * chains connecting entries that should be restored, so we treat them as
	 * live until we actually process them.
	 */
	par_list_header_init(&ready_list);

	for (item = AH->toc->next; item != AH->toc; item = item->next)
	{
		/*
		 * NB: this take-or-skip logic must be the inverse of the loop in
		 * restore_toc_entries_prefork.
		 */

		/* All PRE_DATA items were dealt with in the prefork phase */
		if (item->section == SECTION_PRE_DATA)
			continue;

		if (item->section == SECTION_DATA ||
			item->section == SECTION_POST_DATA)
		{
			/* set this flag at same point that the prefork loop did */
			seen_later_section = true;
		}
		else if (!seen_later_section)
		{
			/* SECTION_NONE item that the prefork loop already processed */
			continue;
		}

		/* blocked items wait in pending_list, the rest are ready to go */
		if (item->depCount > 0)
			par_list_append(pending_list, item);
		else
			par_list_append(&ready_list, item);
	}

	/*
	 * main parent loop
	 *
	 * Keep going until there is no worker still running AND there is no work
	 * left to be done.
	 */

	ahlog(AH, 1, "entering main parallel loop\n");

	while ((item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
		   !IsEveryWorkerIdle(pstate))
	{
		if (item == NULL)
		{
			/* at least one child is working and we have nothing ready. */
			Assert(!IsEveryWorkerIdle(pstate));
		}
		else if ((item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0 ||
				 _tocEntryIsACL(item))
		{
			/* If not to be restored, don't waste time launching a worker */
			ahlog(AH, 1, "skipping item %d %s %s\n",
				  item->dumpId,
				  item->desc, item->tag);

			par_list_remove(item);
			reduce_dependencies(AH, item, &ready_list);

			continue;
		}
		else
		{
			ahlog(AH, 1, "launching item %d %s %s\n",
				  item->dumpId,
				  item->desc, item->tag);

			par_list_remove(item);

			Assert(GetIdleWorker(pstate) != NO_SLOT);
			DispatchJobForTocEntry(AH, pstate, item, ACT_RESTORE);
		}

		for (;;)
		{
			bool		reaped_any = false;

			/*
			 * In order to reduce dependencies as soon as possible and
			 * especially to reap the status of workers who are working on
			 * items that pending items depend on, we do a non-blocking check
			 * for ended workers first.
			 *
			 * However, if we do not have any other work items currently that
			 * workers can work on, we do not busy-loop here but instead
			 * really wait for at least one worker to terminate. Hence we call
			 * ListenToWorkers(..., ..., do_wait = true) in this case.
			 */
			ListenToWorkers(AH, pstate, item == NULL);

			while ((slot = ReapWorkerStatus(pstate, &status)) != NO_SLOT)
			{
				reaped_any = true;
				mark_work_done(AH, &ready_list, slot, status, pstate);
			}

			/*
			 * We need to make sure that we have an idle worker before
			 * re-running the loop. If a worker just terminated we already
			 * have that (quick check); otherwise explicitly look for an
			 * idle worker.
			 */
			if (reaped_any || GetIdleWorker(pstate) != NO_SLOT)
				break;

			/*
			 * If we have no idle worker, read the result of one or more
			 * workers and loop the loop to call ReapWorkerStatus() on them.
			 */
			ListenToWorkers(AH, pstate, true);
		}
	}

	ahlog(AH, 1, "finished main parallel loop\n");
}
3701

A
Andrew Dunstan 已提交
3702 3703 3704 3705 3706 3707 3708
/*
 * Final step of the parallel part of a restore: reconnect the parent to
 * the database, re-establish the fixed output state, and restore in that
 * single connection every entry still left on pending_list.
 */
static void
restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
{
	RestoreOptions *ropt = AH->ropt;
	TocEntry   *missed;

	ahlog(AH, 2, "entering restore_toc_entries_postfork\n");

	/*
	 * Now reconnect the single parent connection.
	 */
	ConnectDatabase((Archive *) AH, ropt->dbname,
					ropt->pghost, ropt->pgport, ropt->username,
					ropt->promptPassword);

	_doSetFixedOutputState(AH);

	/*
	 * Make sure there is no non-ACL work left due to, say, circular
	 * dependencies, or some other pathological condition. If so, do it in the
	 * single parent connection.
	 */
	missed = pending_list->par_next;
	while (missed != pending_list)
	{
		ahlog(AH, 1, "processing missed item %d %s %s\n",
			  missed->dumpId, missed->desc, missed->tag);
		(void) restore_toc_entry(AH, missed, ropt, false);

		missed = missed->par_next;
	}

	/* The ACLs will be handled back in RestoreArchive. */
}

3734 3735 3736 3737 3738 3739 3740
/*
 * Report whether any entry of te1's lockDeps array also appears in te2's
 * dependencies array, i.e. whether te1 needs an exclusive lock on something
 * te2 also requires (whether or not te2's requirement is for an exclusive
 * lock).
 */
static bool
has_lock_conflicts(TocEntry *te1, TocEntry *te2)
{
	int			lk;

	for (lk = 0; lk < te1->nLockDeps; lk++)
	{
		int			dep = 0;

		while (dep < te2->nDeps)
		{
			if (te2->dependencies[dep] == te1->lockDeps[lk])
				return true;
			dep++;
		}
	}

	return false;
}


3756 3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785 3786 3787 3788
/*
 * Initialize the header of a parallel-processing list.
 *
 * These are circular lists with a dummy TocEntry as header, just like the
 * main TOC list; but we use separate list links (par_prev/par_next) so that
 * an entry can be in the main TOC list as well as in a parallel-processing
 * list.  An empty list is a header whose links point back at itself.
 */
static void
par_list_header_init(TocEntry *l)
{
	l->par_next = l;
	l->par_prev = l;
}

/*
 * Append te to the end of the parallel-processing list headed by l, i.e.
 * link it in between the current last element and the header.
 */
static void
par_list_append(TocEntry *l, TocEntry *te)
{
	TocEntry   *tail = l->par_prev;

	te->par_prev = tail;
	tail->par_next = te;
	l->par_prev = te;
	te->par_next = l;
}

/*
 * Remove te from whatever parallel-processing list it's in: join its two
 * neighbours together and clear te's own list links.
 */
static void
par_list_remove(TocEntry *te)
{
	TocEntry   *before = te->par_prev;
	TocEntry   *after = te->par_next;

	before->par_next = after;
	after->par_prev = before;

	te->par_prev = NULL;
	te->par_next = NULL;
}

3789

3790 3791 3792 3793
/*
 * Find the next work item (if any) that is capable of being run now.
 *
 * To qualify, the item must have no remaining dependencies
3794 3795 3796
 * and no requirements for locks that are incompatible with
 * items currently running.  Items in the ready_list are known to have
 * no remaining dependencies, but we have to check for lock conflicts.
3797
 *
3798 3799
 * Note that the returned item has *not* been removed from ready_list.
 * The caller must do that after successfully dispatching the item.
3800 3801 3802 3803 3804 3805
 *
 * pref_non_data is for an alternative selection algorithm that gives
 * preference to non-data items if there is already a data load running.
 * It is currently disabled.
 */
static TocEntry *
3806
get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
A
Andrew Dunstan 已提交
3807
				   ParallelState *pstate)
3808
{
3809 3810 3811 3812 3813
	bool		pref_non_data = false;	/* or get from AH->ropt */
	TocEntry   *data_te = NULL;
	TocEntry   *te;
	int			i,
				k;
3814 3815 3816 3817 3818 3819

	/*
	 * Bogus heuristics for pref_non_data
	 */
	if (pref_non_data)
	{
3820
		int			count = 0;
3821

A
Andrew Dunstan 已提交
3822 3823 3824
		for (k = 0; k < pstate->numWorkers; k++)
			if (pstate->parallelSlot[k].args->te != NULL &&
				pstate->parallelSlot[k].args->te->section == SECTION_DATA)
3825
				count++;
A
Andrew Dunstan 已提交
3826
		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
3827 3828 3829 3830
			pref_non_data = false;
	}

	/*
3831
	 * Search the ready_list until we find a suitable item.
3832
	 */
3833
	for (te = ready_list->par_next; te != ready_list; te = te->par_next)
3834
	{
3835
		bool		conflicts = false;
3836 3837 3838

		/*
		 * Check to see if the item would need exclusive lock on something
3839 3840
		 * that a currently running item also needs lock on, or vice versa. If
		 * so, we don't want to schedule them together.
3841
		 */
A
Andrew Dunstan 已提交
3842
		for (i = 0; i < pstate->numWorkers && !conflicts; i++)
3843
		{
3844
			TocEntry   *running_te;
3845

A
Andrew Dunstan 已提交
3846
			if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
3847
				continue;
A
Andrew Dunstan 已提交
3848
			running_te = pstate->parallelSlot[i].args->te;
3849 3850 3851

			if (has_lock_conflicts(te, running_te) ||
				has_lock_conflicts(running_te, te))
3852
			{
3853 3854
				conflicts = true;
				break;
3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865 3866 3867 3868 3869 3870 3871 3872 3873 3874
			}
		}

		if (conflicts)
			continue;

		if (pref_non_data && te->section == SECTION_DATA)
		{
			if (data_te == NULL)
				data_te = te;
			continue;
		}

		/* passed all tests, so this item can run */
		return te;
	}

	if (data_te != NULL)
		return data_te;

3875
	ahlog(AH, 2, "no item ready\n");
3876 3877 3878 3879 3880 3881 3882
	return NULL;
}


/*
 * Restore a single TOC item in parallel with others
 *
A
Andrew Dunstan 已提交
3883 3884 3885 3886
 * this is run in the worker, i.e. in a thread (Windows) or a separate process
 * (everything else). A worker process executes several such work items during
 * a parallel backup or restore. Once we terminate here and report back that
 * our work is finished, the master process will assign us a new work item.
3887
 */
A
Andrew Dunstan 已提交
3888
int
B
Bruce Momjian 已提交
3889
parallel_restore(ParallelArgs *args)
3890 3891
{
	ArchiveHandle *AH = args->AH;
3892
	TocEntry   *te = args->te;
3893
	RestoreOptions *ropt = AH->ropt;
A
Andrew Dunstan 已提交
3894
	int			status;
3895 3896 3897

	_doSetFixedOutputState(AH);

A
Andrew Dunstan 已提交
3898
	Assert(AH->connection != NULL);
3899

A
Andrew Dunstan 已提交
3900
	AH->public.n_errors = 0;
3901

A
Andrew Dunstan 已提交
3902 3903
	/* Restore the TOC item */
	status = restore_toc_entry(AH, te, ropt, true);
3904

A
Andrew Dunstan 已提交
3905
	return status;
3906 3907 3908 3909 3910 3911 3912 3913 3914 3915
}


/*
 * Housekeeping to be done after a step has been parallel restored.
 *
 * Clear the appropriate slot, free all the extra memory we allocated,
 * update status, and reduce the dependency count of any dependent items.
 */
static void
3916
mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
A
Andrew Dunstan 已提交
3917 3918
			   int worker, int status,
			   ParallelState *pstate)
3919
{
3920
	TocEntry   *te = NULL;
3921

A
Andrew Dunstan 已提交
3922
	te = pstate->parallelSlot[worker].args->te;
3923 3924

	if (te == NULL)
3925
		exit_horribly(modulename, "could not find slot of finished worker\n");
3926 3927 3928 3929 3930 3931 3932 3933 3934 3935 3936 3937 3938 3939

	ahlog(AH, 1, "finished item %d %s %s\n",
		  te->dumpId, te->desc, te->tag);

	if (status == WORKER_CREATE_DONE)
		mark_create_done(AH, te);
	else if (status == WORKER_INHIBIT_DATA)
	{
		inhibit_data_for_failed_table(AH, te);
		AH->public.n_errors++;
	}
	else if (status == WORKER_IGNORED_ERRORS)
		AH->public.n_errors++;
	else if (status != 0)
3940 3941
		exit_horribly(modulename, "worker process failed: exit code %d\n",
					  status);
3942

3943
	reduce_dependencies(AH, te, ready_list);
3944 3945 3946 3947 3948 3949
}


/*
 * Convert the dependency information into the form parallel restore needs.
 *
 * Some dependencies recorded in the archive are missing or badly designed;
 * those are repaired first.  After that the subsidiary data structures used
 * by the main parallel-restore logic are prepared:
 * 1. the revDeps[] arrays, listing the dumpIds of incoming dependencies;
 * 2. the depCount fields, holding the number of as-yet-unprocessed
 *	  dependencies of each TOC entry.
 *
 * Finally the locking dependencies are identified, so that conflicting
 * items are never scheduled at the same time.
 */
static void
fix_dependencies(ArchiveHandle *AH)
{
	TocEntry   *entry;
	int			d;

	/*
	 * Start every entry off with depCount equal to its dependency count, no
	 * reverse dependencies, and membership in no parallel-processing list.
	 */
	for (entry = AH->toc->next; entry != AH->toc; entry = entry->next)
	{
		entry->depCount = entry->nDeps;
		entry->nRevDeps = 0;
		entry->revDeps = NULL;
		entry->par_next = NULL;
		entry->par_prev = NULL;
	}

	/*
	 * A POST_DATA item recorded as depending on a table really has to wait
	 * for that table's data.  Re-point such dependencies so the item cannot
	 * be scheduled before the data has been loaded.
	 */
	repoint_table_dependencies(AH);

	/*
	 * pg_dump versions before 8.4 did not record that BLOB COMMENTS depends
	 * on BLOBS, so add that dependency here.  (Such files are assumed to
	 * contain just one BLOBS and one BLOB COMMENTS entry, which is why both
	 * searches stop at the first hit.)
	 */
	if (AH->version < K_VERS_1_11)
	{
		for (entry = AH->toc->next; entry != AH->toc; entry = entry->next)
		{
			TocEntry   *blobs;

			if (strcmp(entry->desc, "BLOB COMMENTS") != 0 || entry->nDeps != 0)
				continue;

			for (blobs = AH->toc->next; blobs != AH->toc; blobs = blobs->next)
			{
				if (strcmp(blobs->desc, "BLOBS") != 0)
					continue;

				entry->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
				entry->dependencies[0] = blobs->dumpId;
				entry->nDeps++;
				entry->depCount++;
				break;
			}
			break;
		}
	}

	/*
	 * From here on the revDeps reverse-dependency arrays are being built, so
	 * no dependency may be changed any more.
	 */

	/*
	 * First pass: count each item's incoming dependencies.  A dependency may
	 * name an item that is not in the archive at all (this should not occur
	 * in 9.2 and later, but is highly likely in older archives); those are
	 * deducted from the depending entry's depCount instead.
	 */
	for (entry = AH->toc->next; entry != AH->toc; entry = entry->next)
	{
		for (d = 0; d < entry->nDeps; d++)
		{
			DumpId		depid = entry->dependencies[d];
			TocEntry   *target;

			target = (depid <= AH->maxDumpId) ? AH->tocsByDumpId[depid] : NULL;

			if (target != NULL)
				target->nRevDeps++;
			else
				entry->depCount--;
		}
	}

	/*
	 * Second pass: allocate each revDeps[] array at its final size, then
	 * zero nRevDeps so it can act as the fill index in the next pass.
	 */
	for (entry = AH->toc->next; entry != AH->toc; entry = entry->next)
	{
		if (entry->nRevDeps > 0)
			entry->revDeps = (DumpId *) pg_malloc(entry->nRevDeps * sizeof(DumpId));
		entry->nRevDeps = 0;
	}

	/*
	 * Third pass: store the incoming-dependency dumpIds in the revDeps[]
	 * arrays.  The test used here must match the one in the counting pass,
	 * or the arrays would be mis-sized.
	 */
	for (entry = AH->toc->next; entry != AH->toc; entry = entry->next)
	{
		for (d = 0; d < entry->nDeps; d++)
		{
			DumpId		depid = entry->dependencies[d];
			TocEntry   *target;

			target = (depid <= AH->maxDumpId) ? AH->tocsByDumpId[depid] : NULL;

			if (target != NULL)
				target->revDeps[target->nRevDeps++] = entry->dumpId;
		}
	}

	/*
	 * Last of all, determine the locking dependencies of every entry.
	 */
	for (entry = AH->toc->next; entry != AH->toc; entry = entry->next)
	{
		entry->nLockDeps = 0;
		entry->lockDeps = NULL;
		identify_locking_dependencies(AH, entry);
	}
}

/*
4081
 * Change dependencies on table items to depend on table data items instead,
4082 4083 4084
 * but only in POST_DATA items.
 */
static void
4085
repoint_table_dependencies(ArchiveHandle *AH)
4086 4087
{
	TocEntry   *te;
4088
	int			i;
4089
	DumpId		olddep;
4090 4091 4092 4093 4094 4095 4096

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if (te->section != SECTION_POST_DATA)
			continue;
		for (i = 0; i < te->nDeps; i++)
		{
4097 4098 4099
			olddep = te->dependencies[i];
			if (olddep <= AH->maxDumpId &&
				AH->tableDataId[olddep] != 0)
4100
			{
4101
				te->dependencies[i] = AH->tableDataId[olddep];
4102
				ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
4103
					  te->dumpId, olddep, AH->tableDataId[olddep]);
4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114
			}
		}
	}
}

/*
 * Identify which objects we'll need exclusive lock on in order to restore
 * the given TOC entry (*other* than the one identified by the TOC entry
 * itself).  Record their dump IDs in the entry's lockDeps[] array.
 */
static void
4115
identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133
{
	DumpId	   *lockids;
	int			nlockids;
	int			i;

	/* Quick exit if no dependencies at all */
	if (te->nDeps == 0)
		return;

	/* Exit if this entry doesn't need exclusive lock on other objects */
	if (!(strcmp(te->desc, "CONSTRAINT") == 0 ||
		  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
		  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
		  strcmp(te->desc, "RULE") == 0 ||
		  strcmp(te->desc, "TRIGGER") == 0))
		return;

	/*
4134
	 * We assume the item requires exclusive lock on each TABLE DATA item
4135 4136 4137 4138
	 * listed among its dependencies.  (This was originally a dependency on
	 * the TABLE, but fix_dependencies repointed it to the data item. Note
	 * that all the entry types we are interested in here are POST_DATA, so
	 * they will all have been changed this way.)
4139
	 */
4140
	lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4141 4142 4143
	nlockids = 0;
	for (i = 0; i < te->nDeps; i++)
	{
4144
		DumpId		depid = te->dependencies[i];
4145

4146 4147
		if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
			strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0)
4148 4149 4150 4151 4152 4153 4154 4155 4156
			lockids[nlockids++] = depid;
	}

	if (nlockids == 0)
	{
		free(lockids);
		return;
	}

T
Tom Lane 已提交
4157
	te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4158 4159 4160 4161 4162
	te->nLockDeps = nlockids;
}

/*
 * Remove the specified TOC entry from the depCounts of items that depend on
4163 4164
 * it, thereby possibly making them ready-to-run.  Any pending item that
 * becomes ready should be moved to the ready list.
4165 4166
 */
static void
4167
reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
4168
{
4169
	int			i;
4170

4171
	ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId);
4172

4173
	for (i = 0; i < te->nRevDeps; i++)
4174
	{
4175
		TocEntry   *otherte = AH->tocsByDumpId[te->revDeps[i]];
4176 4177 4178

		otherte->depCount--;
		if (otherte->depCount == 0 && otherte->par_prev != NULL)
4179
		{
4180 4181 4182 4183
			/* It must be in the pending list, so remove it ... */
			par_list_remove(otherte);
			/* ... and add to ready_list */
			par_list_append(ready_list, otherte);
4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194
		}
	}
}

/*
 * Given a TABLE member, set the created flag on its corresponding DATA
 * member.  Does nothing when no data member is recorded for the table
 * (its tableDataId[] slot is zero).
 */
static void
mark_create_done(ArchiveHandle *AH, TocEntry *te)
{
	if (AH->tableDataId[te->dumpId] == 0)
		return;

	AH->tocsByDumpId[AH->tableDataId[te->dumpId]]->created = true;
}

/*
 * Given a TABLE member that could not be created, mark its corresponding
 * DATA member as not wanted by zeroing that member's reqs.  The decision is
 * logged in any case; nothing else happens when no data member is recorded
 * for the table (its tableDataId[] slot is zero).
 */
static void
inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
{
	ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
		  te->tag);

	if (AH->tableDataId[te->dumpId] == 0)
		return;

	AH->tocsByDumpId[AH->tableDataId[te->dumpId]]->reqs = 0;
}

/*
 * Clone and de-clone routines used in parallel restoration.
 *
 * Enough of the structure is cloned to ensure that there is no
 * conflict between different threads each with their own clone.
 */
A
Andrew Dunstan 已提交
4227
ArchiveHandle *
4228 4229 4230 4231 4232
CloneArchive(ArchiveHandle *AH)
{
	ArchiveHandle *clone;

	/* Make a "flat" copy */
4233
	clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4234 4235
	memcpy(clone, AH, sizeof(ArchiveHandle));

4236 4237
	/* Handle format-independent fields */
	memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4238 4239 4240 4241 4242 4243 4244 4245 4246 4247

	/* The clone will have its own connection, so disregard connection state */
	clone->connection = NULL;
	clone->currUser = NULL;
	clone->currSchema = NULL;
	clone->currTablespace = NULL;
	clone->currWithOids = -1;

	/* savedPassword must be local in case we change it while connecting */
	if (clone->savedPassword)
4248
		clone->savedPassword = pg_strdup(clone->savedPassword);
4249 4250 4251 4252

	/* clone has its own error count, too */
	clone->public.n_errors = 0;

A
Andrew Dunstan 已提交
4253 4254 4255 4256 4257 4258 4259 4260 4261 4262 4263 4264 4265 4266 4267 4268 4269 4270 4271 4272 4273 4274 4275 4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286 4287 4288 4289 4290 4291 4292 4293 4294 4295 4296 4297 4298 4299 4300 4301 4302
	/*
	 * Connect our new clone object to the database: In parallel restore the
	 * parent is already disconnected, because we can connect the worker
	 * processes independently to the database (no snapshot sync required). In
	 * parallel backup we clone the parent's existing connection.
	 */
	if (AH->mode == archModeRead)
	{
		RestoreOptions *ropt = AH->ropt;

		Assert(AH->connection == NULL);
		/* this also sets clone->connection */
		ConnectDatabase((Archive *) clone, ropt->dbname,
						ropt->pghost, ropt->pgport, ropt->username,
						ropt->promptPassword);
	}
	else
	{
		char	   *dbname;
		char	   *pghost;
		char	   *pgport;
		char	   *username;
		const char *encname;

		Assert(AH->connection != NULL);

		/*
		 * Even though we are technically accessing the parent's database
		 * object here, these functions are fine to be called like that
		 * because all just return a pointer and do not actually send/receive
		 * any data to/from the database.
		 */
		dbname = PQdb(AH->connection);
		pghost = PQhost(AH->connection);
		pgport = PQport(AH->connection);
		username = PQuser(AH->connection);
		encname = pg_encoding_to_char(AH->public.encoding);

		/* this also sets clone->connection */
		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);

		/*
		 * Set the same encoding, whatever we set here is what we got from
		 * pg_encoding_to_char(), so we really shouldn't run into an error
		 * setting that very same value. Also see the comment in
		 * SetupConnection().
		 */
		PQsetClientEncoding(clone->connection, encname);
	}

4303 4304 4305
	/* Let the format-specific code have a chance too */
	(clone->ClonePtr) (clone);

A
Andrew Dunstan 已提交
4306
	Assert(clone->connection != NULL);
4307 4308 4309 4310 4311 4312 4313 4314
	return clone;
}

/*
 * Release clone-local storage.
 *
 * Note: we assume any clone-local connection was already closed.
 */
A
Andrew Dunstan 已提交
4315
void
4316 4317 4318 4319 4320
DeCloneArchive(ArchiveHandle *AH)
{
	/* Clear format-specific state */
	(AH->DeClonePtr) (AH);

4321 4322 4323
	/* Clear state allocated by CloneArchive */
	if (AH->sqlparse.curCmd)
		destroyPQExpBuffer(AH->sqlparse.curCmd);
4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335 4336

	/* Clear any connection-local state */
	if (AH->currUser)
		free(AH->currUser);
	if (AH->currSchema)
		free(AH->currSchema);
	if (AH->currTablespace)
		free(AH->currTablespace);
	if (AH->savedPassword)
		free(AH->savedPassword);

	free(AH);
}