/*-------------------------------------------------------------------------
 *
 * slotfuncs.c
 *	   Support functions for replication slots
 *
 * Copyright (c) 2012-2014, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/slotfuncs.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "funcapi.h"
#include "miscadmin.h"

#include "access/htup_details.h"
#include "replication/slot.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"

/*
 * Raise an insufficient-privilege error unless the current user is a
 * superuser or has the replication role attribute.
 */
static void
check_permissions(void)
{
	/* Either privilege is sufficient; nothing more to do. */
	if (superuser() || has_rolreplication(GetUserId()))
		return;

	ereport(ERROR,
			(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
			 errmsg("must be superuser or replication role to use replication slots")));
}

/*
 * SQL function for creating a new physical (streaming replication)
 * replication slot.
 */
Datum
pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
{
	Name		name = PG_GETARG_NAME(0);
	Datum		values[2];
	bool		nulls[2];
	TupleDesc	tupdesc;
	HeapTuple	tuple;
	Datum		result;

49
	Assert(!MyReplicationSlot);
R
Robert Haas 已提交
50 51 52 53

	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

54 55 56 57
	check_permissions();

	CheckSlotRequirements();

B
Bruce Momjian 已提交
58
	/* acquire replication slot, this will check for conflicting names */
R
Robert Haas 已提交
59
	ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
R
Robert Haas 已提交
60

61
	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
R
Robert Haas 已提交
62 63 64 65 66 67 68 69 70 71 72 73

	nulls[0] = false;
	nulls[1] = true;

	tuple = heap_form_tuple(tupdesc, values, nulls);
	result = HeapTupleGetDatum(tuple);

	ReplicationSlotRelease();

	PG_RETURN_DATUM(result);
}

R
Robert Haas 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91

/*
 * SQL function for creating a new logical replication slot.
 */
Datum
pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
{
	Name		name = PG_GETARG_NAME(0);
	Name		plugin = PG_GETARG_NAME(1);

	LogicalDecodingContext *ctx = NULL;

	TupleDesc	tupdesc;
	HeapTuple	tuple;
	Datum		result;
	Datum		values[2];
	bool		nulls[2];

92 93
	Assert(!MyReplicationSlot);

R
Robert Haas 已提交
94 95 96 97 98 99 100 101
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	check_permissions();

	CheckLogicalDecodingRequirements();

	/*
102 103 104 105
	 * Acquire a logical decoding slot, this will check for conflicting
	 * names. Initially create it as ephemeral - that allows us to nicely
	 * handle errors during initialization because it'll get dropped if this
	 * transaction fails. We'll make it persistent at the end.
R
Robert Haas 已提交
106 107 108 109 110 111
	 */
	ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);

	/*
	 * Create logical decoding context, to build the initial snapshot.
	 */
112 113
	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
									false, /* do not build snapshot */
B
Bruce Momjian 已提交
114
									logical_read_local_xlog_page, NULL, NULL);
R
Robert Haas 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137

	/* build initial snapshot, might take a while */
	DecodingContextFindStartpoint(ctx);

	values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);

	/* don't need the decoding context anymore */
	FreeDecodingContext(ctx);

	memset(nulls, 0, sizeof(nulls));

	tuple = heap_form_tuple(tupdesc, values, nulls);
	result = HeapTupleGetDatum(tuple);

	/* ok, slot is now fully created, mark it as persistent */
	ReplicationSlotPersist();
	ReplicationSlotRelease();

	PG_RETURN_DATUM(result);
}


R
Robert Haas 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
/*
 * SQL function for dropping a replication slot.
 *
 * Argument 0 is the name of the slot to drop.  The caller must pass
 * check_permissions(); the function returns void.
 */
Datum
pg_drop_replication_slot(PG_FUNCTION_ARGS)
{
	const char *slotname = NameStr(*PG_GETARG_NAME(0));

	/* privilege check first, then general slot prerequisites */
	check_permissions();
	CheckSlotRequirements();

	ReplicationSlotDrop(slotname);

	PG_RETURN_VOID();
}

/*
 * pg_get_replication_slots - SQL SRF showing active replication slots.
 */
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
R
Robert Haas 已提交
161
#define PG_GET_REPLICATION_SLOTS_COLS 8
R
Robert Haas 已提交
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;
	int			slotno;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not " \
						"allowed in this context")));

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	/*
	 * We don't require any special permission to see this function's data
	 * because nothing should be sensitive. The most critical being the slot
	 * name, which shouldn't contain anything particularly sensitive.
	 */

	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	MemoryContextSwitchTo(oldcontext);

	for (slotno = 0; slotno < max_replication_slots; slotno++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
R
Robert Haas 已提交
203 204
		Datum		values[PG_GET_REPLICATION_SLOTS_COLS];
		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
R
Robert Haas 已提交
205 206

		TransactionId xmin;
R
Robert Haas 已提交
207
		TransactionId catalog_xmin;
R
Robert Haas 已提交
208 209 210
		XLogRecPtr	restart_lsn;
		bool		active;
		Oid			database;
211
		NameData	slot_name;
R
Robert Haas 已提交
212
		NameData	plugin;
R
Robert Haas 已提交
213 214 215 216 217 218 219 220 221 222 223
		int			i;

		SpinLockAcquire(&slot->mutex);
		if (!slot->in_use)
		{
			SpinLockRelease(&slot->mutex);
			continue;
		}
		else
		{
			xmin = slot->data.xmin;
R
Robert Haas 已提交
224
			catalog_xmin = slot->data.catalog_xmin;
R
Robert Haas 已提交
225 226
			database = slot->data.database;
			restart_lsn = slot->data.restart_lsn;
227
			namecpy(&slot_name, &slot->data.name);
R
Robert Haas 已提交
228
			namecpy(&plugin, &slot->data.plugin);
R
Robert Haas 已提交
229 230 231 232 233 234 235 236

			active = slot->active;
		}
		SpinLockRelease(&slot->mutex);

		memset(nulls, 0, sizeof(nulls));

		i = 0;
237
		values[i++] = NameGetDatum(&slot_name);
R
Robert Haas 已提交
238 239 240 241 242 243

		if (database == InvalidOid)
			nulls[i++] = true;
		else
			values[i++] = NameGetDatum(&plugin);

R
Robert Haas 已提交
244 245 246 247
		if (database == InvalidOid)
			values[i++] = CStringGetTextDatum("physical");
		else
			values[i++] = CStringGetTextDatum("logical");
R
Robert Haas 已提交
248

249 250 251 252
		if (database == InvalidOid)
			nulls[i++] = true;
		else
			values[i++] = database;
R
Robert Haas 已提交
253

R
Robert Haas 已提交
254
		values[i++] = BoolGetDatum(active);
R
Robert Haas 已提交
255

R
Robert Haas 已提交
256 257 258 259
		if (xmin != InvalidTransactionId)
			values[i++] = TransactionIdGetDatum(xmin);
		else
			nulls[i++] = true;
R
Robert Haas 已提交
260 261 262 263 264 265

		if (catalog_xmin != InvalidTransactionId)
			values[i++] = TransactionIdGetDatum(catalog_xmin);
		else
			nulls[i++] = true;

266
		if (restart_lsn != InvalidXLogRecPtr)
267
			values[i++] = LSNGetDatum(restart_lsn);
R
Robert Haas 已提交
268 269 270 271 272 273 274 275 276 277
		else
			nulls[i++] = true;

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
	}

	tuplestore_donestoring(tupstore);

	return (Datum) 0;
}