cluster.c 10.1 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * cluster.c
4
 *	  Paul Brown's implementation of cluster index.
5
 *
6 7 8 9 10 11
 *	  I am going to use the rename function as a model for this in the
 *	  parser and executor, and the vacuum code as an example in this
 *	  file. As I go - in contrast to the rest of postgres - there will
 *	  be BUCKETS of comments. This is to allow reviewers to understand
 *	  my (probably bogus) assumptions about the way this works.
 *														[pbrown '94]
12 13 14 15 16
 *
 * Copyright (c) 1994-5, Regents of the University of California
 *
 *
 * IDENTIFICATION
17
 *	  $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.38 1999/02/13 23:15:02 momjian Exp $
18 19 20
 *
 *-------------------------------------------------------------------------
 */
21 22
#include <string.h>

M
Marc G. Fournier 已提交
23
#include <postgres.h>
24

M
Marc G. Fournier 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
#include <catalog/pg_index.h>
#include <catalog/heap.h>
#include <access/heapam.h>
#include <access/genam.h>
#include <access/xact.h>
#include <catalog/catname.h>
#include <utils/syscache.h>
#include <catalog/index.h>
#include <catalog/indexing.h>
#include <catalog/pg_type.h>
#include <commands/copy.h>
#include <commands/cluster.h>
#include <commands/rename.h>
#include <storage/bufmgr.h>
#include <miscadmin.h>
#include <tcop/dest.h>
#include <commands/command.h>
#include <utils/builtins.h>
#include <utils/excid.h>
#include <utils/mcxt.h>
#include <catalog/pg_proc.h>
#include <catalog/pg_class.h>
#include <optimizer/internal.h>
48
#ifndef NO_SECURITY
M
Marc G. Fournier 已提交
49
#include <utils/acl.h>
50
#endif	 /* !NO_SECURITY */
51

52
static Relation copy_heap(Oid OIDOldHeap);
53 54
static void copy_index(Oid OIDOldIndex, Oid OIDNewHeap);
static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
55

56 57 58
/*
 * cluster
 *
59 60 61
 *	 Check that the relation is a relation in the appropriate user
 *	 ACL. I will use the same security that limits users on the
 *	 renamerel() function.
62
 *
63 64
 *	 Check that the index specified is appropriate for the task
 *	 ( ie it's an index over this relation ). This is trickier.
65
 *
66 67 68 69 70 71 72
 *	 Create a list of all the other indicies on this relation. Because
 *	 the cluster will wreck all the tids, I'll need to destroy bogus
 *	 indicies. The user will have to re-create them. Not nice, but
 *	 I'm not a nice guy. The alternative is to try some kind of post
 *	 destroy re-build. This may be possible. I'll check out what the
 *	 index create functiond want in the way of paramaters. On the other
 *	 hand, re-creating n indicies may blow out the space.
73
 *
74 75 76 77 78 79
 *	 Create new (temporary) relations for the base heap and the new
 *	 index.
 *
 *	 Exclusively lock the relations.
 *
 *	 Create new clustered index and base heap relation.
80 81 82
 *
 */
void
83
cluster(char *oldrelname, char *oldindexname)
84
{
85 86 87
	Oid			OIDOldHeap,
				OIDOldIndex,
				OIDNewHeap;
88

89 90 91
	Relation	OldHeap,
				OldIndex;
	Relation	NewHeap;
92

93 94 95 96
	char		NewIndexName[NAMEDATALEN];
	char		NewHeapName[NAMEDATALEN];
	char		saveoldrelname[NAMEDATALEN];
	char		saveoldindexname[NAMEDATALEN];
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127


	/*
	 * Save the old names because they will get lost when the old
	 * relations are destroyed.
	 */
	strcpy(saveoldrelname, oldrelname);
	strcpy(saveoldindexname, oldindexname);

	/*
	 * I'm going to force all checking back into the commands.c function.
	 *
	 * Get the list if indicies for this relation. If the index we want is
	 * among them, do not add it to the 'kill' list, as it will be handled
	 * by the 'clean up' code which commits this transaction.
	 *
	 * I'm not using the SysCache, because this will happen but once, and the
	 * slow way is the sure way in this case.
	 *
	 */

	/*
	 * Like vacuum, cluster spans transactions, so I'm going to handle it
	 * in the same way.
	 */

	/* matches the StartTransaction in PostgresMain() */

	OldHeap = heap_openr(oldrelname);
	if (!RelationIsValid(OldHeap))
	{
128
		elog(ERROR, "cluster: unknown relation: \"%s\"",
129 130
			 oldrelname);
	}
131 132
	OIDOldHeap = RelationGetRelid(OldHeap);		/* Get OID for the index
												 * scan    */
133 134 135 136

	OldIndex = index_openr(oldindexname);		/* Open old index relation	*/
	if (!RelationIsValid(OldIndex))
	{
137
		elog(ERROR, "cluster: unknown index: \"%s\"",
138 139
			 oldindexname);
	}
140
	OIDOldIndex = RelationGetRelid(OldIndex);	/* OID for the index scan		  */
141 142 143 144 145 146 147 148 149 150 151 152 153

	heap_close(OldHeap);
	index_close(OldIndex);

	/*
	 * I need to build the copies of the heap and the index. The Commit()
	 * between here is *very* bogus. If someone is appending stuff, they
	 * will get the lock after being blocked and add rows which won't be
	 * present in the new table. Bleagh! I'd be best to try and ensure
	 * that no-one's in the tables for the entire duration of this process
	 * with a pg_vlock.
	 */
	NewHeap = copy_heap(OIDOldHeap);
154
	OIDNewHeap = RelationGetRelid(NewHeap);
155 156 157 158 159 160 161 162 163 164 165 166 167
	strcpy(NewHeapName, NewHeap->rd_rel->relname.data);


	/* To make the new heap visible (which is until now empty). */
	CommandCounterIncrement();

	rebuildheap(OIDNewHeap, OIDOldHeap, OIDOldIndex);

	/* To flush the filled new heap (and the statistics about it). */
	CommandCounterIncrement();

	/* Create new index over the tuples of the new heap. */
	copy_index(OIDOldIndex, OIDNewHeap);
M
 
Marc G. Fournier 已提交
168
	snprintf(NewIndexName, NAMEDATALEN, "temp_%x", OIDOldIndex);
169 170 171 172 173 174 175 176 177 178

	/*
	 * make this really happen. Flush all the buffers. (Believe me, it is
	 * necessary ... ended up in a mess without it.)
	 */
	CommitTransactionCommand();
	StartTransactionCommand();


	/* Destroy old heap (along with its index) and rename new. */
179
	heap_destroy_with_catalog(oldrelname);
180

B
Bruce Momjian 已提交
181 182 183
	CommitTransactionCommand();
	StartTransactionCommand();

184 185 186 187 188 189 190 191 192 193
	renamerel(NewHeapName, saveoldrelname);
	TypeRename(NewHeapName, saveoldrelname);

	renamerel(NewIndexName, saveoldindexname);

	/*
	 * Again flush all the buffers.
	 */
	CommitTransactionCommand();
	StartTransactionCommand();
194 195
}

196
static Relation
197 198
copy_heap(Oid OIDOldHeap)
{
199 200 201 202 203 204
	char		NewName[NAMEDATALEN];
	TupleDesc	OldHeapDesc,
				tupdesc;
	Oid			OIDNewHeap;
	Relation	NewHeap,
				OldHeap;
205 206 207 208 209

	/*
	 * Create a new heap relation with a temporary name, which has the
	 * same tuple description as the old one.
	 */
M
 
Marc G. Fournier 已提交
210
	snprintf(NewName, NAMEDATALEN, "temp_%x", OIDOldHeap);
211 212

	OldHeap = heap_open(OIDOldHeap);
213
	OldHeapDesc = RelationGetDescr(OldHeap);
214 215

	/*
216 217
	 * Need to make a copy of the tuple descriptor,
	 * heap_create_with_catalog modifies it.
218 219 220 221
	 */

	tupdesc = CreateTupleDescCopy(OldHeapDesc);

222 223
	OIDNewHeap = heap_create_with_catalog(NewName, tupdesc,
										  RELKIND_RELATION, false);
224 225

	if (!OidIsValid(OIDNewHeap))
226
		elog(ERROR, "clusterheap: cannot create temporary heap relation\n");
227 228 229 230 231 232 233

	NewHeap = heap_open(OIDNewHeap);

	heap_close(NewHeap);
	heap_close(OldHeap);

	return NewHeap;
234 235
}

236
static void
237 238
copy_index(Oid OIDOldIndex, Oid OIDNewHeap)
{
M
 
Marc G. Fournier 已提交
239 240 241 242 243
	Relation			OldIndex,
								NewHeap;
	HeapTuple			Old_pg_index_Tuple,
								Old_pg_index_relation_Tuple,
								pg_proc_Tuple;
244
	Form_pg_index Old_pg_index_Form;
245
	Form_pg_class Old_pg_index_relation_Form;
M
 
Marc G. Fournier 已提交
246 247 248 249
	Form_pg_proc	pg_proc_Form;
	char					*NewIndexName;
	AttrNumber		*attnumP;
	int						natts;
250
	FuncIndexInfo *finfo;
251 252 253 254 255 256 257 258 259

	NewHeap = heap_open(OIDNewHeap);
	OldIndex = index_open(OIDOldIndex);

	/*
	 * OK. Create a new (temporary) index for the one that's already here.
	 * To do this I get the info from pg_index, re-build the FunctInfo if
	 * I have to, and add a new index with a temporary name.
	 */
260
	Old_pg_index_Tuple = SearchSysCacheTuple(INDEXRELID,
261
							ObjectIdGetDatum(RelationGetRelid(OldIndex)),
262 263 264
							0, 0, 0);

	Assert(Old_pg_index_Tuple);
265
	Old_pg_index_Form = (Form_pg_index) GETSTRUCT(Old_pg_index_Tuple);
266

267
	Old_pg_index_relation_Tuple = SearchSysCacheTuple(RELOID,
268
							ObjectIdGetDatum(RelationGetRelid(OldIndex)),
269 270 271
							0, 0, 0);

	Assert(Old_pg_index_relation_Tuple);
272
	Old_pg_index_relation_Form = (Form_pg_class) GETSTRUCT(Old_pg_index_relation_Tuple);
273

M
 
Marc G. Fournier 已提交
274
	/* Set the name. */
275
	NewIndexName = palloc(NAMEDATALEN); /* XXX */
M
 
Marc G. Fournier 已提交
276
	snprintf(NewIndexName, NAMEDATALEN, "temp_%x", OIDOldIndex);
277 278 279 280 281 282 283

	/*
	 * Ugly as it is, the only way I have of working out the number of
	 * attribues is to count them. Mostly there'll be just one but I've
	 * got to be sure.
	 */
	for (attnumP = &(Old_pg_index_Form->indkey[0]), natts = 0;
B
Bruce Momjian 已提交
284
		 natts < INDEX_MAX_KEYS && *attnumP != InvalidAttrNumber;
285 286 287 288 289 290 291 292 293 294 295 296
		 attnumP++, natts++);

	/*
	 * If this is a functional index, I need to rebuild the functional
	 * component to pass it to the defining procedure.
	 */
	if (Old_pg_index_Form->indproc != InvalidOid)
	{
		finfo = (FuncIndexInfo *) palloc(sizeof(FuncIndexInfo));
		FIgetnArgs(finfo) = natts;
		FIgetProcOid(finfo) = Old_pg_index_Form->indproc;

297
		pg_proc_Tuple = SearchSysCacheTuple(PROOID,
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
							ObjectIdGetDatum(Old_pg_index_Form->indproc),
								0, 0, 0);

		Assert(pg_proc_Tuple);
		pg_proc_Form = (Form_pg_proc) GETSTRUCT(pg_proc_Tuple);
		namecpy(&(finfo->funcName), &(pg_proc_Form->proname));
	}
	else
	{
		finfo = (FuncIndexInfo *) NULL;
		natts = 1;
	}

	index_create((NewHeap->rd_rel->relname).data,
				 NewIndexName,
				 finfo,
				 NULL,			/* type info is in the old index */
				 Old_pg_index_relation_Form->relam,
				 natts,
				 Old_pg_index_Form->indkey,
				 Old_pg_index_Form->indclass,
				 (uint16) 0, (Datum) NULL, NULL,
				 Old_pg_index_Form->indislossy,
321 322
				 Old_pg_index_Form->indisunique,
                 Old_pg_index_Form->indisprimary);
323 324 325

	heap_close(OldIndex);
	heap_close(NewHeap);
326 327 328
}


329
static void
330 331
rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
{
332 333 334 335 336 337 338 339
	Relation			LocalNewHeap,
						LocalOldHeap,
						LocalOldIndex;
	IndexScanDesc		ScanDesc;
	RetrieveIndexResult	ScanResult;
	HeapTupleData		LocalHeapTuple;
	Buffer				LocalBuffer;
	Oid					OIDNewHeapInsert;
340 341 342 343 344 345 346 347 348 349 350

	/*
	 * Open the relations I need. Scan through the OldHeap on the OldIndex
	 * and insert each tuple into the NewHeap.
	 */
	LocalNewHeap = (Relation) heap_open(OIDNewHeap);
	LocalOldHeap = (Relation) heap_open(OIDOldHeap);
	LocalOldIndex = (Relation) index_open(OIDOldIndex);

	ScanDesc = index_beginscan(LocalOldIndex, false, 0, (ScanKey) NULL);

B
Bruce Momjian 已提交
351
	while ((ScanResult = index_getnext(ScanDesc, ForwardScanDirection)) != NULL)
352 353
	{

354 355
		LocalHeapTuple.t_self = ScanResult->heap_iptr;
		heap_fetch(LocalOldHeap, SnapshotNow, &LocalHeapTuple, &LocalBuffer);
356
		OIDNewHeapInsert = heap_insert(LocalNewHeap, &LocalHeapTuple);
357 358 359 360 361 362 363 364
		pfree(ScanResult);
		ReleaseBuffer(LocalBuffer);
	}
	index_endscan(ScanDesc);

	index_close(LocalOldIndex);
	heap_close(LocalOldHeap);
	heap_close(LocalNewHeap);
365
}