dbcommands.c 30.1 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * dbcommands.c
4
 *		Database management commands (create/drop database).
5
 *
6
 *
B
Bruce Momjian 已提交
7
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
B
Add:  
Bruce Momjian 已提交
8
 * Portions Copyright (c) 1994, Regents of the University of California
9 10 11
 *
 *
 * IDENTIFICATION
12
 *	  $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.135 2004/06/10 22:26:18 momjian Exp $
13 14 15 16
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
17 18 19 20 21

#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
B
Bruce Momjian 已提交
22

23
#include "access/genam.h"
24 25
#include "access/heapam.h"
#include "catalog/catname.h"
26
#include "catalog/catalog.h"
27
#include "catalog/pg_database.h"
B
Bruce Momjian 已提交
28
#include "catalog/pg_shadow.h"
29
#include "catalog/indexing.h"
B
Hello.  
Bruce Momjian 已提交
30
#include "commands/comment.h"
31
#include "commands/dbcommands.h"
B
Bruce Momjian 已提交
32
#include "miscadmin.h"
33
#include "storage/fd.h"
34 35
#include "storage/freespace.h"
#include "storage/sinval.h"
36
#include "utils/acl.h"
37
#include "utils/array.h"
38
#include "utils/builtins.h"
39
#include "utils/fmgroids.h"
40
#include "utils/guc.h"
41
#include "utils/lsyscache.h"
B
Bruce Momjian 已提交
42
#include "utils/syscache.h"
43

44
#include "mb/pg_wchar.h"		/* encoding check */
T
Tatsuo Ishii 已提交
45

46 47

/* non-export function prototypes */
48
static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
49 50 51
			int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
			char *dbpath);
52
static bool have_createdb_privilege(void);
53 54
static char *resolve_alt_dbpath(const char *dbpath, Oid dboid);
static bool remove_dbdirs(const char *real_loc, const char *altloc);
55 56 57 58

/*
 * CREATE DATABASE
 */
59 60

void
61
createdb(const CreatedbStmt *stmt)
62
{
63 64 65 66
	char	   *nominal_loc;
	char	   *alt_loc;
	char	   *target_dir;
	char		src_loc[MAXPGPATH];
67
#ifndef WIN32
68
	char		buf[2 * MAXPGPATH + 100];
69
#endif
70
	Oid			src_dboid;
71
	AclId		src_owner;
72 73 74
	int			src_encoding;
	bool		src_istemplate;
	Oid			src_lastsysoid;
75 76
	TransactionId src_vacuumxid;
	TransactionId src_frozenxid;
77
	char		src_dbpath[MAXPGPATH];
78 79
	Relation	pg_database_rel;
	HeapTuple	tuple;
80 81
	TupleDesc	pg_database_dsc;
	Datum		new_record[Natts_pg_database];
82
	char		new_record_nulls[Natts_pg_database];
83
	Oid			dboid;
84
	AclId		datdba;
85
	ListCell   *option;
86
	DefElem    *downer = NULL;
B
Bruce Momjian 已提交
87 88 89
	DefElem    *dpath = NULL;
	DefElem    *dtemplate = NULL;
	DefElem    *dencoding = NULL;
90 91 92 93
	char	   *dbname = stmt->dbname;
	char	   *dbowner = NULL;
	char	   *dbpath = NULL;
	char	   *dbtemplate = NULL;
B
Bruce Momjian 已提交
94
	int			encoding = -1;
95 96 97 98 99 100 101 102 103

	/* Extract options from the statement node tree */
	foreach(option, stmt->options)
	{
		DefElem    *defel = (DefElem *) lfirst(option);

		if (strcmp(defel->defname, "owner") == 0)
		{
			if (downer)
104 105 106
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
107 108 109 110 111
			downer = defel;
		}
		else if (strcmp(defel->defname, "location") == 0)
		{
			if (dpath)
112 113 114
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
115 116 117 118 119
			dpath = defel;
		}
		else if (strcmp(defel->defname, "template") == 0)
		{
			if (dtemplate)
120 121 122
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
123 124 125 126 127
			dtemplate = defel;
		}
		else if (strcmp(defel->defname, "encoding") == 0)
		{
			if (dencoding)
128 129 130
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
131 132 133
			dencoding = defel;
		}
		else
134
			elog(ERROR, "option \"%s\" not recognized",
135 136 137
				 defel->defname);
	}

138
	if (downer && downer->arg)
139
		dbowner = strVal(downer->arg);
140
	if (dpath && dpath->arg)
141
		dbpath = strVal(dpath->arg);
142
	if (dtemplate && dtemplate->arg)
143
		dbtemplate = strVal(dtemplate->arg);
144 145 146 147 148 149 150 151 152 153
	if (dencoding && dencoding->arg)
	{
		const char *encoding_name;

		if (IsA(dencoding->arg, Integer))
		{
			encoding = intVal(dencoding->arg);
			encoding_name = pg_encoding_to_char(encoding);
			if (strcmp(encoding_name, "") == 0 ||
				pg_valid_server_encoding(encoding_name) < 0)
154 155 156 157
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_OBJECT),
						 errmsg("%d is not a valid encoding code",
								encoding)));
158 159 160 161 162
		}
		else if (IsA(dencoding->arg, String))
		{
			encoding_name = strVal(dencoding->arg);
			if (pg_valid_server_encoding(encoding_name) < 0)
163 164 165 166
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_OBJECT),
						 errmsg("%s is not a valid encoding name",
								encoding_name)));
167 168 169
			encoding = pg_char_to_encoding(encoding_name);
		}
		else
170 171
			elog(ERROR, "unrecognized node type: %d",
				 nodeTag(dencoding->arg));
172
	}
173

174 175
	/* obtain sysid of proposed owner */
	if (dbowner)
176
		datdba = get_usesysid(dbowner); /* will ereport if no such user */
177 178 179
	else
		datdba = GetUserId();

180
	if (datdba == GetUserId())
181 182
	{
		/* creating database for self: can be superuser or createdb */
183
		if (!superuser() && !have_createdb_privilege())
184 185
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
186
					 errmsg("permission denied to create database")));
187 188 189 190 191
	}
	else
	{
		/* creating database for someone else: must be superuser */
		/* note that the someone else need not have any permissions */
192
		if (!superuser())
193 194
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
195
					 errmsg("must be superuser to create database for another user")));
196
	}
197

198
	/* don't call this in a transaction block */
199
	PreventTransactionChain((void *) stmt, "CREATE DATABASE");
200

201
	/* alternate location requires symlinks */
202 203
#ifndef HAVE_SYMLINK
	if (dbpath != NULL)
204 205
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
206
		   errmsg("cannot use an alternative location on this platform")));
207 208
#endif

209
	/*
B
Bruce Momjian 已提交
210
	 * Check for db name conflict.	There is a race condition here, since
211
	 * another backend could create the same DB name before we commit.
B
Bruce Momjian 已提交
212 213 214 215
	 * However, holding an exclusive lock on pg_database for the whole
	 * time we are copying the source database doesn't seem like a good
	 * idea, so accept possibility of race to create.  We will check again
	 * after we grab the exclusive lock.
216
	 */
217
	if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
218 219 220
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_DATABASE),
				 errmsg("database \"%s\" already exists", dbname)));
221

222 223
	/*
	 * Lookup database (template) to be cloned.
224
	 */
225
	if (!dbtemplate)
B
Bruce Momjian 已提交
226
		dbtemplate = "template1";		/* Default template database name */
227

228
	if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
229 230 231
					 &src_istemplate, &src_lastsysoid,
					 &src_vacuumxid, &src_frozenxid,
					 src_dbpath))
232 233
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
234
				 errmsg("template database \"%s\" does not exist", dbtemplate)));
B
Bruce Momjian 已提交
235

236
	/*
B
Bruce Momjian 已提交
237 238
	 * Permission check: to copy a DB that's not marked datistemplate, you
	 * must be superuser or the owner thereof.
239 240 241
	 */
	if (!src_istemplate)
	{
B
Bruce Momjian 已提交
242
		if (!superuser() && GetUserId() != src_owner)
243 244
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
245
					 errmsg("permission denied to copy database \"%s\"",
246
							dbtemplate)));
247
	}
B
Bruce Momjian 已提交
248

249 250 251 252 253 254 255
	/*
	 * Determine physical path of source database
	 */
	alt_loc = resolve_alt_dbpath(src_dbpath, src_dboid);
	if (!alt_loc)
		alt_loc = GetDatabasePath(src_dboid);
	strcpy(src_loc, alt_loc);
256

257 258 259 260 261 262 263
	/*
	 * The source DB can't have any active backends, except this one
	 * (exception is to allow CREATE DB while connected to template1).
	 * Otherwise we might copy inconsistent data.  This check is not
	 * bulletproof, since someone might connect while we are copying...
	 */
	if (DatabaseHasActiveBackends(src_dboid, true))
264 265
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
B
Bruce Momjian 已提交
266 267
		errmsg("source database \"%s\" is being accessed by other users",
			   dbtemplate)));
268

269 270 271
	/* If encoding is defaulted, use source's encoding */
	if (encoding < 0)
		encoding = src_encoding;
272

T
Tatsuo Ishii 已提交
273
	/* Some encodings are client only */
274
	if (!PG_VALID_BE_ENCODING(encoding))
275 276
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
277
				 errmsg("invalid server encoding %d", encoding)));
278

B
Bruce Momjian 已提交
279 280 281
	/*
	 * Preassign OID for pg_database tuple, so that we can compute db
	 * path.
282 283
	 */
	dboid = newoid();
284

285
	/*
B
Bruce Momjian 已提交
286 287 288
	 * Compute nominal location (where we will try to access the
	 * database), and resolve alternate physical location if one is
	 * specified.
289
	 *
B
Bruce Momjian 已提交
290 291 292 293
	 * If an alternate location is specified but is the same as the normal
	 * path, just drop the alternate-location spec (this seems friendlier
	 * than erroring out).	We must test this case to avoid creating a
	 * circular symlink below.
294
	 */
295 296
	nominal_loc = GetDatabasePath(dboid);
	alt_loc = resolve_alt_dbpath(dbpath, dboid);
297

298 299 300 301 302 303
	if (alt_loc && strcmp(alt_loc, nominal_loc) == 0)
	{
		alt_loc = NULL;
		dbpath = NULL;
	}

304
	if (strchr(nominal_loc, '\''))
305 306 307
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_NAME),
				 errmsg("database path may not contain single quotes")));
308
	if (alt_loc && strchr(alt_loc, '\''))
309 310 311
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_NAME),
				 errmsg("database path may not contain single quotes")));
312
	if (strchr(src_loc, '\''))
313 314 315
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_NAME),
				 errmsg("database path may not contain single quotes")));
316
	/* ... otherwise we'd be open to shell exploits below */
317

318 319
	/*
	 * Force dirty buffers out to disk, to ensure source database is
B
Bruce Momjian 已提交
320 321
	 * up-to-date for the copy.  (We really only need to flush buffers for
	 * the source database...)
322
	 */
J
Jan Wieck 已提交
323
	BufferSync(-1, -1);
324

325 326 327 328 329 330
	/*
	 * Close virtual file descriptors so the kernel has more available for
	 * the mkdir() and system() calls below.
	 */
	closeAllVfds();

331 332 333 334 335
	/*
	 * Check we can create the target directory --- but then remove it
	 * because we rely on cp(1) to create it for real.
	 */
	target_dir = alt_loc ? alt_loc : nominal_loc;
336

337
	if (mkdir(target_dir, S_IRWXU) != 0)
338 339
		ereport(ERROR,
				(errcode_for_file_access(),
340
				 errmsg("could not create database directory \"%s\": %m",
341
						target_dir)));
342
	if (rmdir(target_dir) != 0)
343 344
		ereport(ERROR,
				(errcode_for_file_access(),
345
				 errmsg("could not remove temporary directory \"%s\": %m",
346
						target_dir)));
347

348 349
	/* Make the symlink, if needed */
	if (alt_loc)
350
	{
B
Bruce Momjian 已提交
351
#ifdef HAVE_SYMLINK				/* already throws error above */
352
		if (symlink(alt_loc, nominal_loc) != 0)
353
#endif
354 355
			ereport(ERROR,
					(errcode_for_file_access(),
356
					 errmsg("could not link file \"%s\" to \"%s\": %m",
357
							nominal_loc, alt_loc)));
358
	}
359

360 361 362 363 364 365 366
	/*
	 * Copy the template database to the new location
	 *
	 * XXX use of cp really makes this code pretty grotty, particularly
	 * with respect to lack of ability to report errors well.  Someday
	 * rewrite to do it for ourselves.
	 */
367
#ifndef WIN32
368
	/* We might need to use cp -R one day for portability */
369
	snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", src_loc, target_dir);
370
	if (system(buf) != 0)
371 372 373 374 375 376 377 378 379 380 381 382 383
	{
		if (remove_dbdirs(nominal_loc, alt_loc))
			ereport(ERROR,
					(errmsg("could not initialize database directory"),
					 errdetail("Failing system command was: %s", buf),
					 errhint("Look in the postmaster's stderr log for more information.")));
		else
			ereport(ERROR,
					(errmsg("could not initialize database directory; delete failed as well"),
					 errdetail("Failing system command was: %s", buf),
					 errhint("Look in the postmaster's stderr log for more information.")));
	}
#else	/* WIN32 */
384
	if (copydir(src_loc, target_dir) != 0)
385
	{
386
		/* copydir should already have given details of its troubles */
387
		if (remove_dbdirs(nominal_loc, alt_loc))
388 389
			ereport(ERROR,
					(errmsg("could not initialize database directory")));
390
		else
391 392
			ereport(ERROR,
					(errmsg("could not initialize database directory; delete failed as well")));
393
	}
394
#endif	/* WIN32 */
395

396 397 398 399 400 401
	/*
	 * Now OK to grab exclusive lock on pg_database.
	 */
	pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);

	/* Check to see if someone else created same DB name meanwhile. */
402
	if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
403
	{
404 405
		/* Don't hold lock while doing recursive remove */
		heap_close(pg_database_rel, AccessExclusiveLock);
406
		remove_dbdirs(nominal_loc, alt_loc);
407 408 409
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_DATABASE),
				 errmsg("database \"%s\" already exists", dbname)));
410 411 412 413 414 415 416 417
	}

	/*
	 * Insert a new tuple into pg_database
	 */
	pg_database_dsc = RelationGetDescr(pg_database_rel);

	/* Form tuple */
418 419 420
	MemSet(new_record, 0, sizeof(new_record));
	MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));

421 422
	new_record[Anum_pg_database_datname - 1] =
		DirectFunctionCall1(namein, CStringGetDatum(dbname));
423
	new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
424 425 426 427
	new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
	new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
	new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
	new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
428 429
	new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
	new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
430
	/* do not set datpath to null, GetRawDatabaseInfo won't cope */
431 432
	new_record[Anum_pg_database_datpath - 1] =
		DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : ""));
B
Bruce Momjian 已提交
433

434 435 436
	/*
	 * We deliberately set datconfig and datacl to defaults (NULL), rather
	 * than copying them from the template database.  Copying datacl would
B
Bruce Momjian 已提交
437 438
	 * be a bad idea when the owner is not the same as the template's
	 * owner. It's more debatable whether datconfig should be copied.
439
	 */
440
	new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
441
	new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
442 443 444

	tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);

445
	HeapTupleSetOid(tuple, dboid);		/* override heap_insert's OID
B
Bruce Momjian 已提交
446
										 * selection */
447

448
	simple_heap_insert(pg_database_rel, tuple);
449

450 451
	/* Update indexes */
	CatalogUpdateIndexes(pg_database_rel, tuple);
452

453 454
	/* Close pg_database, but keep lock till commit */
	heap_close(pg_database_rel, NoLock);
455

456 457
	/*
	 * Force dirty buffers out to disk, so that newly-connecting backends
458 459 460
	 * will see the new database in pg_database right away.  (They'll see
	 * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
	 */
J
Jan Wieck 已提交
461
	BufferSync(-1, -1);
462
}
463 464


465 466 467
/*
 * DROP DATABASE
 */
468
void
469
dropdb(const char *dbname)
470
{
471
	int4		db_owner;
472
	bool		db_istemplate;
473
	Oid			db_id;
B
Bruce Momjian 已提交
474 475 476
	char	   *alt_loc;
	char	   *nominal_loc;
	char		dbpath[MAXPGPATH];
477
	Relation	pgdbrel;
B
Bruce Momjian 已提交
478
	SysScanDesc pgdbscan;
479
	ScanKeyData key;
480
	HeapTuple	tup;
481

482
	AssertArg(dbname);
483

484
	if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
485 486 487
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("cannot drop the currently open database")));
488

489
	PreventTransactionChain((void *) dbname, "DROP DATABASE");
490

491
	/*
492 493 494 495 496 497 498
	 * Obtain exclusive lock on pg_database.  We need this to ensure that
	 * no new backend starts up in the target database while we are
	 * deleting it.  (Actually, a new backend might still manage to start
	 * up, because it will read pg_database without any locking to
	 * discover the database's OID.  But it will detect its error in
	 * ReverifyMyDatabase and shut down before any serious damage is done.
	 * See postinit.c.)
499
	 */
500 501
	pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);

502
	if (!get_db_info(dbname, &db_id, &db_owner, NULL,
503
					 &db_istemplate, NULL, NULL, NULL, dbpath))
504 505 506
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
				 errmsg("database \"%s\" does not exist", dbname)));
507

508
	if (GetUserId() != db_owner && !superuser())
509 510
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
					   dbname);
511 512

	/*
B
Bruce Momjian 已提交
513 514
	 * Disallow dropping a DB that is marked istemplate.  This is just to
	 * prevent people from accidentally dropping template0 or template1;
515 516 517
	 * they can do so if they're really determined ...
	 */
	if (db_istemplate)
518 519 520
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("cannot drop a template database")));
521 522 523 524

	nominal_loc = GetDatabasePath(db_id);
	alt_loc = resolve_alt_dbpath(dbpath, db_id);

525 526 527
	/*
	 * Check for active backends in the target database.
	 */
528
	if (DatabaseHasActiveBackends(db_id, false))
529 530
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
B
Bruce Momjian 已提交
531 532
			   errmsg("database \"%s\" is being accessed by other users",
					  dbname)));
533 534

	/*
535
	 * Find the database's tuple by OID (should be unique).
536
	 */
537 538 539 540
	ScanKeyInit(&key,
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(db_id));
541

542 543
	pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
								  SnapshotNow, 1, &key);
544

545
	tup = systable_getnext(pgdbscan);
546 547
	if (!HeapTupleIsValid(tup))
	{
548 549 550 551
		/*
		 * This error should never come up since the existence of the
		 * database is checked earlier
		 */
552
		elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
553
			 dbname);
554 555
	}

556
	/* Remove the database's tuple from pg_database */
557
	simple_heap_delete(pgdbrel, &tup->t_self);
558

559
	systable_endscan(pgdbscan);
560

561 562 563 564 565 566 567
	/*
	 * Delete any comments associated with the database
	 *
	 * NOTE: this is probably dead code since any such comments should have
	 * been in that database, not mine.
	 */
	DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
568

569 570 571 572 573 574 575 576 577 578 579
	/*
	 * Close pg_database, but keep exclusive lock till commit to ensure
	 * that any new backend scanning pg_database will see the tuple dead.
	 */
	heap_close(pgdbrel, NoLock);

	/*
	 * Drop pages for this database that are in the shared buffer cache.
	 * This is important to ensure that no remaining backend tries to
	 * write out a dirty buffer to the dead database later...
	 */
580 581
	DropBuffers(db_id);

582 583 584 585 586
	/*
	 * Also, clean out any entries in the shared free space map.
	 */
	FreeSpaceMapForgetDatabase(db_id);

587
	/*
588
	 * Remove the database's subdirectory and everything in it.
589
	 */
590
	remove_dbdirs(nominal_loc, alt_loc);
591 592 593 594 595 596 597

	/*
	 * Force dirty buffers out to disk, so that newly-connecting backends
	 * will see the database tuple marked dead in pg_database right away.
	 * (They'll see an uncommitted deletion, but they don't care; see
	 * GetRawDatabaseInfo.)
	 */
J
Jan Wieck 已提交
598
	BufferSync(-1, -1);
599 600
}

601

602 603 604 605 606 607
/*
 * Rename database
 */
void
RenameDatabase(const char *oldname, const char *newname)
{
B
Bruce Momjian 已提交
608 609
	HeapTuple	tup,
				newtup;
610
	Relation	rel;
B
Bruce Momjian 已提交
611 612 613 614
	SysScanDesc scan,
				scan2;
	ScanKeyData key,
				key2;
615 616 617 618 619 620 621

	/*
	 * Obtain AccessExclusiveLock so that no new session gets started
	 * while the rename is in progress.
	 */
	rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);

622 623 624 625
	ScanKeyInit(&key,
				Anum_pg_database_datname,
				BTEqualStrategyNumber, F_NAMEEQ,
				NameGetDatum(oldname));
626 627
	scan = systable_beginscan(rel, DatabaseNameIndex, true,
							  SnapshotNow, 1, &key);
628 629 630 631

	tup = systable_getnext(scan);
	if (!HeapTupleIsValid(tup))
		ereport(ERROR,
632
				(errcode(ERRCODE_UNDEFINED_DATABASE),
633 634 635 636 637 638 639 640 641 642
				 errmsg("database \"%s\" does not exist", oldname)));

	/*
	 * XXX Client applications probably store the current database
	 * somewhere, so renaming it could cause confusion.  On the other
	 * hand, there may not be an actual problem besides a little
	 * confusion, so think about this and decide.
	 */
	if (HeapTupleGetOid(tup) == MyDatabaseId)
		ereport(ERROR,
643
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
644 645 646
				 errmsg("current database may not be renamed")));

	/*
B
Bruce Momjian 已提交
647 648
	 * Make sure the database does not have active sessions.  Might not be
	 * necessary, but it's consistent with other database operations.
649 650
	 */
	if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
651 652
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
B
Bruce Momjian 已提交
653 654
			   errmsg("database \"%s\" is being accessed by other users",
					  oldname)));
655 656

	/* make sure the new name doesn't exist */
657 658 659 660
	ScanKeyInit(&key2,
				Anum_pg_database_datname,
				BTEqualStrategyNumber, F_NAMEEQ,
				NameGetDatum(newname));
661 662
	scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
							   SnapshotNow, 1, &key2);
663 664
	if (HeapTupleIsValid(systable_getnext(scan2)))
		ereport(ERROR,
665
				(errcode(ERRCODE_DUPLICATE_DATABASE),
666 667 668 669 670
				 errmsg("database \"%s\" already exists", newname)));
	systable_endscan(scan2);

	/* must be owner */
	if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
671 672
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
					   oldname);
673 674 675 676

	/* must have createdb */
	if (!have_createdb_privilege())
		ereport(ERROR,
677
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
678
				 errmsg("permission denied to rename database")));
679 680 681 682 683 684 685 686 687 688 689

	/* rename */
	newtup = heap_copytuple(tup);
	namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
	simple_heap_update(rel, &tup->t_self, newtup);
	CatalogUpdateIndexes(rel, newtup);

	systable_endscan(scan);
	heap_close(rel, NoLock);

	/*
B
Bruce Momjian 已提交
690 691 692 693
	 * Force dirty buffers out to disk, so that newly-connecting backends
	 * will see the renamed database in pg_database right away.  (They'll
	 * see an uncommitted tuple, but they don't care; see
	 * GetRawDatabaseInfo.)
694
	 */
J
Jan Wieck 已提交
695
	BufferSync(-1, -1);
696 697
}

698

699 700 701 702 703 704 705 706 707 708
/*
 * ALTER DATABASE name SET ...
 */
void
AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
{
	char	   *valuestr;
	HeapTuple	tuple,
				newtuple;
	Relation	rel;
B
Bruce Momjian 已提交
709
	ScanKeyData scankey;
B
Bruce Momjian 已提交
710
	SysScanDesc scan;
711 712 713 714
	Datum		repl_val[Natts_pg_database];
	char		repl_null[Natts_pg_database];
	char		repl_repl[Natts_pg_database];

715
	valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
716 717

	rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
718 719 720 721
	ScanKeyInit(&scankey,
				Anum_pg_database_datname,
				BTEqualStrategyNumber, F_NAMEEQ,
				NameGetDatum(stmt->dbname));
722 723
	scan = systable_beginscan(rel, DatabaseNameIndex, true,
							  SnapshotNow, 1, &scankey);
724
	tuple = systable_getnext(scan);
725
	if (!HeapTupleIsValid(tuple))
726 727 728
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
				 errmsg("database \"%s\" does not exist", stmt->dbname)));
729 730

	if (!(superuser()
B
Bruce Momjian 已提交
731
		|| ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
732
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
B
Bruce Momjian 已提交
733
					   stmt->dbname);
734

735
	MemSet(repl_repl, ' ', sizeof(repl_repl));
B
Bruce Momjian 已提交
736
	repl_repl[Anum_pg_database_datconfig - 1] = 'r';
737

B
Bruce Momjian 已提交
738
	if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
739
	{
740
		/* RESET ALL */
B
Bruce Momjian 已提交
741 742
		repl_null[Anum_pg_database_datconfig - 1] = 'n';
		repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
743
	}
744 745
	else
	{
B
Bruce Momjian 已提交
746 747 748
		Datum		datum;
		bool		isnull;
		ArrayType  *a;
749

B
Bruce Momjian 已提交
750
		repl_null[Anum_pg_database_datconfig - 1] = ' ';
751 752 753 754

		datum = heap_getattr(tuple, Anum_pg_database_datconfig,
							 RelationGetDescr(rel), &isnull);

755
		a = isnull ? NULL : DatumGetArrayTypeP(datum);
756

757
		if (valuestr)
758
			a = GUCArrayAdd(a, stmt->variable, valuestr);
759
		else
760
			a = GUCArrayDelete(a, stmt->variable);
761

762 763 764 765
		if (a)
			repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
		else
			repl_null[Anum_pg_database_datconfig - 1] = 'n';
766 767 768 769 770
	}

	newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
	simple_heap_update(rel, &tuple->t_self, newtuple);

771 772
	/* Update indexes */
	CatalogUpdateIndexes(rel, newtuple);
773

774
	systable_endscan(scan);
775 776 777 778
	heap_close(rel, RowExclusiveLock);
}


779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824
/*
 * ALTER DATABASE name OWNER TO newowner
 */
void
AlterDatabaseOwner(const char *dbname, const char *newowner)
{
	AclId		newdatdba;
	HeapTuple	tuple,
				newtuple;
	Relation	rel;
	ScanKeyData scankey;
	SysScanDesc scan;

	rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
	ScanKeyInit(&scankey,
				Anum_pg_database_datname,
				BTEqualStrategyNumber, F_NAMEEQ,
				NameGetDatum(dbname));
	scan = systable_beginscan(rel, DatabaseNameIndex, true,
							  SnapshotNow, 1, &scankey);
	tuple = systable_getnext(scan);
	if (!HeapTupleIsValid(tuple))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
				 errmsg("database \"%s\" does not exist", dbname)));

	/* obtain sysid of proposed owner */
	newdatdba = get_usesysid(newowner); /* will ereport if no such user */

	/* changing owner's database for someone else: must be superuser */
	/* note that the someone else need not have any permissions */
	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("must be superuser to change owner's database for another user")));

	/* change owner */
	newtuple = heap_copytuple(tuple);
	((Form_pg_database) GETSTRUCT(newtuple))->datdba = newdatdba;
	simple_heap_update(rel, &tuple->t_self, newtuple);
	CatalogUpdateIndexes(rel, newtuple);

	systable_endscan(scan);
	heap_close(rel, NoLock);
}

825

826
/*
827
 * Helper functions
828 829
 */

830
static bool
831
get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
832 833 834
			int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
			char *dbpath)
835
{
836 837
	Relation	relation;
	ScanKeyData scanKey;
B
Bruce Momjian 已提交
838
	SysScanDesc scan;
839
	HeapTuple	tuple;
840
	bool		gottuple;
841

842
	AssertArg(name);
843

844 845
	/* Caller may wish to grab a better lock on pg_database beforehand... */
	relation = heap_openr(DatabaseRelationName, AccessShareLock);
846

847 848 849 850
	ScanKeyInit(&scanKey,
				Anum_pg_database_datname,
				BTEqualStrategyNumber, F_NAMEEQ,
				NameGetDatum(name));
851

852 853
	scan = systable_beginscan(relation, DatabaseNameIndex, true,
							  SnapshotNow, 1, &scanKey);
854

855
	tuple = systable_getnext(scan);
856

857 858
	gottuple = HeapTupleIsValid(tuple);
	if (gottuple)
859
	{
860
		Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
861 862 863

		/* oid of the database */
		if (dbIdP)
864
			*dbIdP = HeapTupleGetOid(tuple);
865
		/* sysid of the owner */
866
		if (ownerIdP)
867
			*ownerIdP = dbform->datdba;
868
		/* character encoding */
869 870 871 872 873 874 875 876
		if (encodingP)
			*encodingP = dbform->encoding;
		/* allowed as template? */
		if (dbIsTemplateP)
			*dbIsTemplateP = dbform->datistemplate;
		/* last system OID used in database */
		if (dbLastSysOidP)
			*dbLastSysOidP = dbform->datlastsysoid;
877 878 879 880 881 882
		/* limit of vacuumed XIDs */
		if (dbVacuumXidP)
			*dbVacuumXidP = dbform->datvacuumxid;
		/* limit of frozen XIDs */
		if (dbFrozenXidP)
			*dbFrozenXidP = dbform->datfrozenxid;
883 884 885
		/* database path (as registered in pg_database) */
		if (dbpath)
		{
886 887 888 889 890 891 892
			Datum		datum;
			bool		isnull;

			datum = heap_getattr(tuple,
								 Anum_pg_database_datpath,
								 RelationGetDescr(relation),
								 &isnull);
893 894
			if (!isnull)
			{
895 896
				text	   *pathtext = DatumGetTextP(datum);
				int			pathlen = VARSIZE(pathtext) - VARHDRSZ;
897

898 899 900
				Assert(pathlen >= 0 && pathlen < MAXPGPATH);
				strncpy(dbpath, VARDATA(pathtext), pathlen);
				*(dbpath + pathlen) = '\0';
901 902 903 904
			}
			else
				strcpy(dbpath, "");
		}
905 906
	}

907
	systable_endscan(scan);
908
	heap_close(relation, AccessShareLock);
909

910
	return gottuple;
911
}
912

913
static bool
914
have_createdb_privilege(void)
915
{
916
	HeapTuple	utup;
917
	bool		retval;
918

919
	utup = SearchSysCache(SHADOWSYSID,
920
						  Int32GetDatum(GetUserId()),
921
						  0, 0, 0);
922

923
	if (!HeapTupleIsValid(utup))
924
		retval = false;
925 926
	else
		retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
927

928 929
	ReleaseSysCache(utup);

930
	return retval;
931
}
932 933 934


static char *
B
Bruce Momjian 已提交
935
resolve_alt_dbpath(const char *dbpath, Oid dboid)
936
{
B
Bruce Momjian 已提交
937 938 939
	const char *prefix;
	char	   *ret;
	size_t		len;
940 941 942 943

	if (dbpath == NULL || dbpath[0] == '\0')
		return NULL;

944
	if (first_dir_separator(dbpath))
945
	{
946
		if (!is_absolute_path(dbpath))
947 948 949
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("relative paths are not allowed as database locations")));
T
Tom Lane 已提交
950
#ifndef ALLOW_ABSOLUTE_DBPATHS
951 952
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
B
Bruce Momjian 已提交
953
		errmsg("absolute paths are not allowed as database locations")));
954
#endif
T
Tom Lane 已提交
955
		prefix = dbpath;
956 957 958 959
	}
	else
	{
		/* must be environment variable */
B
Bruce Momjian 已提交
960 961
		char	   *var = getenv(dbpath);

962
		if (!var)
963 964
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_OBJECT),
B
Bruce Momjian 已提交
965 966
			   errmsg("postmaster environment variable \"%s\" not found",
					  dbpath)));
967
		if (!is_absolute_path(var))
968 969 970 971
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_NAME),
					 errmsg("postmaster environment variable \"%s\" must be absolute path",
							dbpath)));
972 973 974 975
		prefix = var;
	}

	len = strlen(prefix) + 6 + sizeof(Oid) * 8 + 1;
976
	if (len >= MAXPGPATH - 100)
977 978
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_NAME),
979
				 errmsg("alternative path is too long")));
980

981 982 983 984 985 986 987 988
	ret = palloc(len);
	snprintf(ret, len, "%s/base/%u", prefix, dboid);

	return ret;
}


static bool
B
Bruce Momjian 已提交
989
remove_dbdirs(const char *nominal_loc, const char *alt_loc)
990
{
B
Bruce Momjian 已提交
991 992 993
	const char *target_dir;
	char		buf[MAXPGPATH + 100];
	bool		success = true;
994

995 996 997 998 999 1000 1001 1002 1003 1004
	target_dir = alt_loc ? alt_loc : nominal_loc;

	/*
	 * Close virtual file descriptors so the kernel has more available for
	 * the system() call below.
	 */
	closeAllVfds();

	if (alt_loc)
	{
1005
		/* remove symlink */
1006
		if (unlink(nominal_loc) != 0)
1007
		{
1008 1009
			ereport(WARNING,
					(errcode_for_file_access(),
1010
					 errmsg("could not remove file \"%s\": %m", nominal_loc)));
1011 1012
			success = false;
		}
1013 1014
	}

1015
#ifndef WIN32
1016
	snprintf(buf, sizeof(buf), "rm -rf '%s'", target_dir);
1017 1018 1019
#else
	snprintf(buf, sizeof(buf), "rmdir /s /q \"%s\"", target_dir);
#endif
1020

1021
	if (system(buf) != 0)
1022
	{
1023
		ereport(WARNING,
1024 1025 1026 1027
				(errmsg("could not remove database directory \"%s\"",
						target_dir),
				 errdetail("Failing system command was: %s", buf),
				 errhint("Look in the postmaster's stderr log for more information.")));
1028 1029 1030 1031 1032
		success = false;
	}

	return success;
}
1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046


/*
 * get_database_oid - given a database name, look up the OID
 *
 * Returns InvalidOid if database name not found.
 *
 * This is not actually used in this file, but is exported for use elsewhere.
 */
Oid
get_database_oid(const char *dbname)
{
	Relation	pg_database;
	ScanKeyData entry[1];
B
Bruce Momjian 已提交
1047
	SysScanDesc scan;
1048 1049 1050 1051 1052
	HeapTuple	dbtuple;
	Oid			oid;

	/* There's no syscache for pg_database, so must look the hard way */
	pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1053 1054 1055 1056
	ScanKeyInit(&entry[0],
				Anum_pg_database_datname,
				BTEqualStrategyNumber, F_NAMEEQ,
				CStringGetDatum(dbname));
1057 1058
	scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
							  SnapshotNow, 1, entry);
1059

1060
	dbtuple = systable_getnext(scan);
1061 1062 1063 1064 1065 1066 1067

	/* We assume that there can be at most one matching tuple */
	if (HeapTupleIsValid(dbtuple))
		oid = HeapTupleGetOid(dbtuple);
	else
		oid = InvalidOid;

1068
	systable_endscan(scan);
1069 1070 1071 1072 1073
	heap_close(pg_database, AccessShareLock);

	return oid;
}

1074

1075
/*
1076
 * get_database_name - given a database OID, look up the name
1077
 *
1078
 * Returns InvalidOid if database name not found.
1079 1080 1081
 *
 * This is not actually used in this file, but is exported for use elsewhere.
 */
1082 1083
char *
get_database_name(Oid dbid)
1084 1085 1086
{
	Relation	pg_database;
	ScanKeyData entry[1];
B
Bruce Momjian 已提交
1087
	SysScanDesc scan;
1088
	HeapTuple	dbtuple;
1089
	char	   *result;
1090 1091 1092

	/* There's no syscache for pg_database, so must look the hard way */
	pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1093 1094 1095 1096
	ScanKeyInit(&entry[0],
				ObjectIdAttributeNumber,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(dbid));
1097 1098
	scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
							  SnapshotNow, 1, entry);
1099

1100
	dbtuple = systable_getnext(scan);
1101

1102 1103 1104 1105 1106
	/* We assume that there can be at most one matching tuple */
	if (HeapTupleIsValid(dbtuple))
		result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
	else
		result = NULL;
1107

1108
	systable_endscan(scan);
1109 1110
	heap_close(pg_database, AccessShareLock);

1111
	return result;
1112
}