From be0219e644d1763f979ec07c9e2ed7e3b1103ccd Mon Sep 17 00:00:00 2001 From: Ashwin Agrawal Date: Fri, 12 May 2017 09:24:18 -0700 Subject: [PATCH] Basic walreceiver unit test. Just a start to have wal replication tests in ICW. This has simple protocol functions whichs kind-of mocks walreceiver side to help validate walsender and xlog stream. Mainly to portray something on these lines can be easily leveraged to validate like xlog generation and stream for AO tables when done, avoiding to fully instantiate a mirror or something on similar lines at ease. --- GNUmakefile.in | 1 + .../walrepl/walreceiver/gplibpq/Makefile | 5 - .../walrepl/walreceiver/gplibpq/install.sql | 17 -- .../storage/walrepl/walreceiver/test_basic.py | 186 ------------------ src/test/walrep/.gitignore | 4 + src/test/walrep/Makefile | 13 ++ src/test/walrep/expected/walreceiver.out | 107 ++++++++++ .../walreceiver/gplibpq => walrep}/gplibpq.c | 113 ++++++----- src/test/walrep/input/setup.source | 14 ++ src/test/walrep/output/setup.source | 10 + src/test/walrep/sql/walreceiver.sql | 36 ++++ 11 files changed, 251 insertions(+), 255 deletions(-) delete mode 100644 src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/Makefile delete mode 100644 src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/install.sql delete mode 100644 src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/test_basic.py create mode 100644 src/test/walrep/.gitignore create mode 100644 src/test/walrep/Makefile create mode 100644 src/test/walrep/expected/walreceiver.out rename src/test/{tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq => walrep}/gplibpq.c (65%) create mode 100644 src/test/walrep/input/setup.source create mode 100644 src/test/walrep/output/setup.source create mode 100644 src/test/walrep/sql/walreceiver.sql diff --git a/GNUmakefile.in b/GNUmakefile.in index 67aae82774..2b594b0496 100644 --- a/GNUmakefile.in +++ b/GNUmakefile.in @@ -131,6 +131,7 @@ distclean maintainer-clean: installcheck-world: $(MAKE) -C src/test installcheck-good $(MAKE) -C src/test/fsync install && $(MAKE) -C src/test/fsync installcheck + $(MAKE) -C src/test/walrep install && $(MAKE) -C src/test/walrep installcheck $(MAKE) -C src/test/isolation installcheck $(MAKE) -C src/test/isolation2 installcheck $(MAKE) -C src/pl installcheck diff --git a/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/Makefile b/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/Makefile deleted file mode 100644 index 7cc3caf9e8..0000000000 --- a/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/Makefile +++ /dev/null @@ -1,5 +0,0 @@ -MODULES = gplibpq - -PG_CONFIG = pg_config -PGXS = $(shell $(PG_CONFIG) --pgxs) -include $(PGXS) diff --git a/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/install.sql b/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/install.sql deleted file mode 100644 index a8c3b4f63e..0000000000 --- a/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/install.sql +++ /dev/null @@ -1,17 +0,0 @@ --- start_ignore -SET gp_create_table_random_default_distribution=off; --- end_ignore -CREATE OR REPLACE FUNCTION test_connect(text) RETURNS bool AS -'$libdir/gplibpq' LANGUAGE C; - -CREATE OR REPLACE FUNCTION test_disconnect() RETURNS bool AS -'$libdir/gplibpq' LANGUAGE C; - -CREATE OR REPLACE FUNCTION test_receive() RETURNS bool AS -'$libdir/gplibpq' LANGUAGE C; - -CREATE OR REPLACE FUNCTION test_send() RETURNS bool AS -'$libdir/gplibpq' LANGUAGE C; - -CREATE OR REPLACE FUNCTION test_scenario1(text) RETURNS bool AS -'$libdir/gplibpq' LANGUAGE C; diff --git a/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/test_basic.py b/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/test_basic.py deleted file mode 100644 index d69c45c2a9..0000000000 --- a/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/test_basic.py +++ /dev/null @@ -1,186 +0,0 @@ -""" -Copyright (C) 2004-2015 Pivotal Software, Inc. All rights reserved. - -This program and the accompanying materials are made available under -the terms of the under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import unittest2 as unittest -from tinctest import logger - -from gppylib.commands.base import Command -from mpp.gpdb.tests.storage.walrepl import lib as walrepl -from mpp.gpdb.tests.storage.walrepl.walreceiver import GPLibPQTestCase -from mpp.gpdb.tests.storage.walrepl.lib.pqwrap import * -from mpp.gpdb.tests.storage.walrepl.lib.pg_util import GpUtility -from mpp.lib.config import GPDBConfig - -import select - -gputil = GpUtility() -gputil.check_and_start_gpdb() -config = GPDBConfig() - -class case(GPLibPQTestCase): - """Basic test cases to see gplibpq and walsender are talking correctly.""" - - def setUp(self): - # cleanup - cmd = Command('gpinitstandby', 'gpinitstandby -ar') - # don't care the result in case standby is not configured - cmd.run() - - @unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster") - def test_connect(self): - """ - @tags sanity - """ - - with PGconn("") as conn: - res = conn.execute("SELECT test_connect('')") - self.assertEqual(res.status(), PGRES_TUPLES_OK) - self.assertTrue(res.getvalue(0, 0, convert=True)) - - cnt = self.count_walsender(conn) - self.assertEqual(cnt, 1) - - @unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster") - def test_connect_fail(self): - """ - @tags sanity - """ - - with PGconn("") as conn: - # the connection failure when conninfo is wrong. - res = conn.execute("SELECT test_connect('host=none')") - self.assertEqual(res.status(), PGRES_FATAL_ERROR) - - errmsg = res.error_message() - self.assertIn("could not connect", errmsg) - - @unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster") - def test_connect_host(self): - """ - @tags sanity - """ - - with PGconn("") as conn: - res = conn.execute("SELECT test_connect('host=localhost')") - self.assertEqual(res.status(), PGRES_TUPLES_OK) - - cnt = self.count_walsender(conn) - self.assertEqual(cnt, 1) - - @unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster") - def test_disconnect(self): - """ - @tags sanity - """ - - with PGconn("") as conn: - # check disconnect does not fail even before connect - res = conn.execute("SELECT test_disconnect()") - self.assertEqual(res.status(), PGRES_TUPLES_OK) - self.assertTrue(res.getvalue(0, 0, convert=True)) - - # check if disconnect clears the walsender - res = conn.execute("SELECT test_connect('')") - self.assertEqual(res.status(), PGRES_TUPLES_OK) - res = conn.execute("SELECT test_disconnect()") - self.assertEqual(res.status(), PGRES_TUPLES_OK) - - # wait for the backend terminates... - for i in walrepl.polling(10, 0.5): - cnt = self.count_walsender(conn) - if cnt == 0: - break - - self.assertEqual(cnt, 0) - - @unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster") - def test_receive_fail(self): - """ - @tags sanity - """ - - with PGconn("") as conn: - # Simply fails if not connected. - res = conn.execute("SELECT test_receive()") - self.assertEqual(res.status(), PGRES_FATAL_ERROR) - errmsg = res.error_message() - self.assertIn("connection pointer is NULL", errmsg) - - @unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster") - def test_receive(self): - """ - @tags sanity - """ - - with PGconn("") as conn: - res = conn.execute("SELECT test_connect('')") - self.assertEqual(res.status(), PGRES_TUPLES_OK) - - # the first call should get reply. - res = conn.execute("SELECT test_receive()") - # the second call should time out. - res = conn.execute("SELECT test_receive()") - res = conn.execute("SELECT test_receive()") - res = conn.execute("SELECT test_receive()") - res = conn.execute("SELECT test_receive()") - res = conn.execute("SELECT test_receive()") - res = conn.execute("SELECT test_receive()") - self.assertFalse(res.getvalue(0, 0, convert=True)) - - @unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster") - def test_send_fail(self): - """ - @tags sanity - """ - - with PGconn("") as conn: - # Simply fails if not connected. - res = conn.execute("SELECT test_send()") - self.assertEqual(res.status(), PGRES_FATAL_ERROR) - errmsg = res.error_message() - self.assertIn("connection pointer is NULL", errmsg) - - @unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster") - def test_send(self): - """ - @tags sanity - """ - - with PGconn("") as conn: - res = conn.execute("SELECT test_connect('')") - self.assertEqual(res.status(), PGRES_TUPLES_OK) - - res = conn.execute("SELECT test_send()") - self.assertTrue(res.getvalue(0, 0, convert=True)) - -if __name__ == '__main__': - # just a test of async interface. - def printer(arg, res): - res = PGresult(res) - print res.error_message() - with PGconn("") as conn: - handler = NoticeReceiverFunc(printer) - conn.set_notice_receiver(handler, None) - while True: - conn.send_query("SELECT test_scenario1('')") - fd = [conn] - ready = select.select(fd, [], [], 0.1)[0] - if len(ready) > 0: - if conn.consume_input() == 0: - raise StandardError("cannot read input") - if conn.is_busy() == 0: - res = conn.get_result() diff --git a/src/test/walrep/.gitignore b/src/test/walrep/.gitignore new file mode 100644 index 0000000000..1d72afaa75 --- /dev/null +++ b/src/test/walrep/.gitignore @@ -0,0 +1,4 @@ +# Generated directories and files +/results/ +/expected/setup.out +/sql/setup.sql diff --git a/src/test/walrep/Makefile b/src/test/walrep/Makefile new file mode 100644 index 0000000000..9a37c163be --- /dev/null +++ b/src/test/walrep/Makefile @@ -0,0 +1,13 @@ +MODULES=gplibpq +PG_CONFIG=pg_config + +REGRESS = setup walreceiver +REGRESS_OPTS = --dbname="walrep_regression" + +subdir = src/test/walrep/ +top_builddir = ../../.. + +include $(top_builddir)/src/Makefile.global + +NO_PGXS = 1 +include $(top_srcdir)/src/makefiles/pgxs.mk diff --git a/src/test/walrep/expected/walreceiver.out b/src/test/walrep/expected/walreceiver.out new file mode 100644 index 0000000000..f42d027338 --- /dev/null +++ b/src/test/walrep/expected/walreceiver.out @@ -0,0 +1,107 @@ +-- negative cases +SELECT test_receive(); +ERROR: could not receive data from WAL stream: connection pointer is NULL (gp_libpqwalreceiver.c:403) +SELECT test_send(); +ERROR: could not send data to WAL stream: connection pointer is NULL (gp_libpqwalreceiver.c:430) +SELECT test_disconnect(); + test_disconnect +----------------- + t +(1 row) + +-- Test connection +SELECT test_connect('', pg_current_xlog_location()); + test_connect +-------------- + t +(1 row) + +-- Should report 1 replication +SELECT count(*) FROM pg_stat_replication; + count +------- + 1 +(1 row) + +SELECT test_disconnect(); + test_disconnect +----------------- + t +(1 row) + +SELECT count(*) FROM pg_stat_replication; + count +------- + 0 +(1 row) + +-- Test connection passing hostname +SELECT test_connect('host=localhost', pg_current_xlog_location()); + test_connect +-------------- + t +(1 row) + +SELECT count(*) FROM pg_stat_replication; + count +------- + 1 +(1 row) + +SELECT test_disconnect(); + test_disconnect +----------------- + t +(1 row) + +SELECT count(*) FROM pg_stat_replication; + count +------- + 0 +(1 row) + +-- create table and store current_xlog_location. +create TEMP table tmp(startpoint text); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'startpoint' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +CREATE FUNCTION select_tmp() RETURNS text AS $$ +select startpoint from tmp; +$$ LANGUAGE SQL; +insert into tmp select pg_current_xlog_location(); +-- lets generate some xlogs +create table testwalreceiver(a int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +insert into testwalreceiver select * from generate_series(0, 9); +-- Connect and receive the xlogs, validate everything was received from start to +-- end +SELECT test_connect('', select_tmp()); + test_connect +-------------- + t +(1 row) + +SELECT test_receive_and_verify(select_tmp(), pg_current_xlog_location()); + test_receive_and_verify +------------------------- + t +(1 row) + +SELECT test_send(); + test_send +----------- + t +(1 row) + +SELECT test_receive(); + test_receive +-------------- + f +(1 row) + +SELECT test_disconnect(); + test_disconnect +----------------- + t +(1 row) + diff --git a/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/gplibpq.c b/src/test/walrep/gplibpq.c similarity index 65% rename from src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/gplibpq.c rename to src/test/walrep/gplibpq.c index 3909f5c307..f884117e14 100644 --- a/src/test/tinc/tincrepo/mpp/gpdb/tests/storage/walrepl/walreceiver/gplibpq/gplibpq.c +++ b/src/test/walrep/gplibpq.c @@ -21,32 +21,50 @@ static struct XLogRecPtr Flush; } LogstreamResult; -static void test_XLogWalRcvProcessMsg(unsigned char type, - char *buf, Size len); +static void test_XLogWalRcvProcessMsg(unsigned char type, char *buf, + Size len, XLogRecPtr *logStreamStart); static void test_XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void test_XLogWalRcvSendReply(void); -static void test_ProcessWalSndrMessage(char *type, XLogRecPtr walEnd, - TimestampTz sendTime); +static void test_PrintLog(char *type, XLogRecPtr walPtr, + TimestampTz sendTime); Datum test_connect(PG_FUNCTION_ARGS); Datum test_disconnect(PG_FUNCTION_ARGS); Datum test_receive(PG_FUNCTION_ARGS); Datum test_send(PG_FUNCTION_ARGS); -Datum test_scenario1(PG_FUNCTION_ARGS); +Datum test_receive_and_verify(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(test_connect); PG_FUNCTION_INFO_V1(test_disconnect); PG_FUNCTION_INFO_V1(test_receive); PG_FUNCTION_INFO_V1(test_send); -PG_FUNCTION_INFO_V1(test_scenario1); +PG_FUNCTION_INFO_V1(test_receive_and_verify); + +static void +string_to_xlogrecptr(text *location, XLogRecPtr *rec) +{ + char *locationstr = DatumGetCString( + DirectFunctionCall1(textout, + PointerGetDatum(location))); + + if (sscanf(locationstr, "%X/%X", &rec->xlogid, &rec->xrecoff) != 2) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse transaction log location \"%s\"", + locationstr))); +} Datum test_connect(PG_FUNCTION_ARGS) { char *conninfo = TextDatumGetCString(PG_GETARG_DATUM(0)); - bool result; + text *location = PG_GETARG_TEXT_P(1); + bool result; + XLogRecPtr startpoint; - result = walrcv_connect(conninfo, GetRedoRecPtr()); + string_to_xlogrecptr(location, &startpoint); + + result = walrcv_connect(conninfo, startpoint); PG_RETURN_BOOL(result); } @@ -80,52 +98,50 @@ test_send(PG_FUNCTION_ARGS) } Datum -test_scenario1(PG_FUNCTION_ARGS) +test_receive_and_verify(PG_FUNCTION_ARGS) { - char *conninfo = TextDatumGetCString(PG_GETARG_DATUM(0)); - unsigned char type; + text *start_location = PG_GETARG_TEXT_P(0); + text *end_location = PG_GETARG_TEXT_P(1); + XLogRecPtr startpoint; + XLogRecPtr endpoint; + unsigned char type; char *buf; - int len; + int len; - if (!walrcv_connect(conninfo, GetRedoRecPtr())) - elog(ERROR, "could not connect"); + string_to_xlogrecptr(start_location, &startpoint); + string_to_xlogrecptr(end_location, &endpoint); - for (;;) + /* Pending to check why first walrcv_receive returns nothing */ + walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len); + + if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { - CHECK_FOR_INTERRUPTS(); - if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) - { - elog(INFO, "got message: type = %c", type); + XLogRecPtr logStreamStart; + /* Accept the received data, and process it */ + test_XLogWalRcvProcessMsg(type, buf, len, &logStreamStart); - /* Accept the received data, and process it */ - test_XLogWalRcvProcessMsg(type, buf, len); + /* Compare received everthing from start */ + if (startpoint.xlogid != logStreamStart.xlogid || + startpoint.xrecoff != logStreamStart.xrecoff) + PG_RETURN_BOOL(false); - /* Receive any more data we can without sleeping */ - while (walrcv_receive(0, &type, &buf, &len)) - test_XLogWalRcvProcessMsg(type, buf, len); + /* Compare received everthing till end */ + if (endpoint.xlogid != LogstreamResult.Write.xlogid || + endpoint.xrecoff != LogstreamResult.Write.xrecoff) + PG_RETURN_BOOL(false); - /* Let the primary know that we received some data. */ - test_XLogWalRcvSendReply(); - } - else - { - /* - * - * We didn't receive anything new, but send a status update to the - * master anyway, to report any progress in applying WAL. - */ - test_XLogWalRcvSendReply(); - } + PG_RETURN_BOOL(true); } - PG_RETURN_NULL(); + PG_RETURN_BOOL(false); } /* * Accept the message from XLOG stream, and process it. */ static void -test_XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) +test_XLogWalRcvProcessMsg(unsigned char type, char *buf, + Size len, XLogRecPtr *logStreamStart) { switch (type) { @@ -139,9 +155,13 @@ test_XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) errmsg_internal("invalid WAL message received from primary"))); /* memcpy is required here for alignment reasons */ memcpy(&msghdr, buf, sizeof(WalDataMessageHeader)); + logStreamStart->xlogid = msghdr.dataStart.xlogid; + logStreamStart->xrecoff = msghdr.dataStart.xrecoff; - test_ProcessWalSndrMessage("wal records", - msghdr.walEnd, msghdr.sendTime); + test_PrintLog("wal end records", + msghdr.dataStart, msghdr.sendTime); + test_PrintLog("wal end records", + msghdr.walEnd, msghdr.sendTime); buf += sizeof(WalDataMessageHeader); len -= sizeof(WalDataMessageHeader); @@ -163,8 +183,8 @@ test_XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) keepalive.walEnd.xlogid, keepalive.walEnd.xrecoff, timestamptz_to_str(keepalive.sendTime)); - test_ProcessWalSndrMessage("keep alive", - keepalive.walEnd, keepalive.sendTime); + test_PrintLog("keep alive", + keepalive.walEnd, keepalive.sendTime); break; } default: @@ -175,7 +195,6 @@ test_XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) } } - /* * Write XLOG data to disk. */ @@ -202,7 +221,7 @@ test_XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) XLByteToSeg(recptr, recvId, recvSeg); XLogFileName(recvFilePath, recvFileTLI, recvId, recvSeg); - elog(INFO, "would open: %s", recvFilePath); + elog(DEBUG1, "would open: %s", recvFilePath); recvFileTLI = 1; } @@ -259,10 +278,10 @@ test_XLogWalRcvSendReply(void) * Just show the walEnd/sendTime information */ static void -test_ProcessWalSndrMessage(char *type, XLogRecPtr walEnd, +test_PrintLog(char *type, XLogRecPtr walPtr, TimestampTz sendTime) { - elog(INFO, "%s: %X/%X at %s", type, - walEnd.xlogid, walEnd.xrecoff, + elog(DEBUG1, "%s: %X/%X at %s", type, + walPtr.xlogid, walPtr.xrecoff, timestamptz_to_str(sendTime)); } diff --git a/src/test/walrep/input/setup.source b/src/test/walrep/input/setup.source new file mode 100644 index 0000000000..d932e5a8ba --- /dev/null +++ b/src/test/walrep/input/setup.source @@ -0,0 +1,14 @@ +create or replace function test_connect(text, text) RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; + +create or replace function test_disconnect() RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; + +create or replace function test_receive() RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; + +create or replace function test_send() RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; + +create or replace function test_receive_and_verify(text, text) RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; diff --git a/src/test/walrep/output/setup.source b/src/test/walrep/output/setup.source new file mode 100644 index 0000000000..d2d1614b40 --- /dev/null +++ b/src/test/walrep/output/setup.source @@ -0,0 +1,10 @@ +create or replace function test_connect(text, text) RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; +create or replace function test_disconnect() RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; +create or replace function test_receive() RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; +create or replace function test_send() RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; +create or replace function test_receive_and_verify(text, text) RETURNS bool AS + '@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL; diff --git a/src/test/walrep/sql/walreceiver.sql b/src/test/walrep/sql/walreceiver.sql new file mode 100644 index 0000000000..52ee9b303a --- /dev/null +++ b/src/test/walrep/sql/walreceiver.sql @@ -0,0 +1,36 @@ +-- negative cases +SELECT test_receive(); +SELECT test_send(); +SELECT test_disconnect(); + +-- Test connection +SELECT test_connect('', pg_current_xlog_location()); +-- Should report 1 replication +SELECT count(*) FROM pg_stat_replication; +SELECT test_disconnect(); +SELECT count(*) FROM pg_stat_replication; + +-- Test connection passing hostname +SELECT test_connect('host=localhost', pg_current_xlog_location()); +SELECT count(*) FROM pg_stat_replication; +SELECT test_disconnect(); +SELECT count(*) FROM pg_stat_replication; + +-- create table and store current_xlog_location. +create TEMP table tmp(startpoint text); +CREATE FUNCTION select_tmp() RETURNS text AS $$ +select startpoint from tmp; +$$ LANGUAGE SQL; +insert into tmp select pg_current_xlog_location(); + +-- lets generate some xlogs +create table testwalreceiver(a int); +insert into testwalreceiver select * from generate_series(0, 9); + +-- Connect and receive the xlogs, validate everything was received from start to +-- end +SELECT test_connect('', select_tmp()); +SELECT test_receive_and_verify(select_tmp(), pg_current_xlog_location()); +SELECT test_send(); +SELECT test_receive(); +SELECT test_disconnect(); -- GitLab