提交 be0219e6 编写于 作者: A Ashwin Agrawal

Basic walreceiver unit test.

Just a start to have wal replication tests in ICW. This has simple protocol
functions whichs kind-of mocks walreceiver side to help validate walsender and
xlog stream. Mainly to portray something on these lines can be easily leveraged
to validate like xlog generation and stream for AO tables when done, avoiding to
fully instantiate a mirror or something on similar lines at ease.
上级 9015c6cf
......@@ -131,6 +131,7 @@ distclean maintainer-clean:
installcheck-world:
$(MAKE) -C src/test installcheck-good
$(MAKE) -C src/test/fsync install && $(MAKE) -C src/test/fsync installcheck
$(MAKE) -C src/test/walrep install && $(MAKE) -C src/test/walrep installcheck
$(MAKE) -C src/test/isolation installcheck
$(MAKE) -C src/test/isolation2 installcheck
$(MAKE) -C src/pl installcheck
......
MODULES = gplibpq
PG_CONFIG = pg_config
PGXS = $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
-- start_ignore
SET gp_create_table_random_default_distribution=off;
-- end_ignore
CREATE OR REPLACE FUNCTION test_connect(text) RETURNS bool AS
'$libdir/gplibpq' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_disconnect() RETURNS bool AS
'$libdir/gplibpq' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_receive() RETURNS bool AS
'$libdir/gplibpq' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_send() RETURNS bool AS
'$libdir/gplibpq' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_scenario1(text) RETURNS bool AS
'$libdir/gplibpq' LANGUAGE C;
"""
Copyright (C) 2004-2015 Pivotal Software, Inc. All rights reserved.
This program and the accompanying materials are made available under
the terms of the under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest2 as unittest
from tinctest import logger
from gppylib.commands.base import Command
from mpp.gpdb.tests.storage.walrepl import lib as walrepl
from mpp.gpdb.tests.storage.walrepl.walreceiver import GPLibPQTestCase
from mpp.gpdb.tests.storage.walrepl.lib.pqwrap import *
from mpp.gpdb.tests.storage.walrepl.lib.pg_util import GpUtility
from mpp.lib.config import GPDBConfig
import select
gputil = GpUtility()
gputil.check_and_start_gpdb()
config = GPDBConfig()
class case(GPLibPQTestCase):
"""Basic test cases to see gplibpq and walsender are talking correctly."""
def setUp(self):
# cleanup
cmd = Command('gpinitstandby', 'gpinitstandby -ar')
# don't care the result in case standby is not configured
cmd.run()
@unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster")
def test_connect(self):
"""
@tags sanity
"""
with PGconn("") as conn:
res = conn.execute("SELECT test_connect('')")
self.assertEqual(res.status(), PGRES_TUPLES_OK)
self.assertTrue(res.getvalue(0, 0, convert=True))
cnt = self.count_walsender(conn)
self.assertEqual(cnt, 1)
@unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster")
def test_connect_fail(self):
"""
@tags sanity
"""
with PGconn("") as conn:
# the connection failure when conninfo is wrong.
res = conn.execute("SELECT test_connect('host=none')")
self.assertEqual(res.status(), PGRES_FATAL_ERROR)
errmsg = res.error_message()
self.assertIn("could not connect", errmsg)
@unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster")
def test_connect_host(self):
"""
@tags sanity
"""
with PGconn("") as conn:
res = conn.execute("SELECT test_connect('host=localhost')")
self.assertEqual(res.status(), PGRES_TUPLES_OK)
cnt = self.count_walsender(conn)
self.assertEqual(cnt, 1)
@unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster")
def test_disconnect(self):
"""
@tags sanity
"""
with PGconn("") as conn:
# check disconnect does not fail even before connect
res = conn.execute("SELECT test_disconnect()")
self.assertEqual(res.status(), PGRES_TUPLES_OK)
self.assertTrue(res.getvalue(0, 0, convert=True))
# check if disconnect clears the walsender
res = conn.execute("SELECT test_connect('')")
self.assertEqual(res.status(), PGRES_TUPLES_OK)
res = conn.execute("SELECT test_disconnect()")
self.assertEqual(res.status(), PGRES_TUPLES_OK)
# wait for the backend terminates...
for i in walrepl.polling(10, 0.5):
cnt = self.count_walsender(conn)
if cnt == 0:
break
self.assertEqual(cnt, 0)
@unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster")
def test_receive_fail(self):
"""
@tags sanity
"""
with PGconn("") as conn:
# Simply fails if not connected.
res = conn.execute("SELECT test_receive()")
self.assertEqual(res.status(), PGRES_FATAL_ERROR)
errmsg = res.error_message()
self.assertIn("connection pointer is NULL", errmsg)
@unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster")
def test_receive(self):
"""
@tags sanity
"""
with PGconn("") as conn:
res = conn.execute("SELECT test_connect('')")
self.assertEqual(res.status(), PGRES_TUPLES_OK)
# the first call should get reply.
res = conn.execute("SELECT test_receive()")
# the second call should time out.
res = conn.execute("SELECT test_receive()")
res = conn.execute("SELECT test_receive()")
res = conn.execute("SELECT test_receive()")
res = conn.execute("SELECT test_receive()")
res = conn.execute("SELECT test_receive()")
res = conn.execute("SELECT test_receive()")
self.assertFalse(res.getvalue(0, 0, convert=True))
@unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster")
def test_send_fail(self):
"""
@tags sanity
"""
with PGconn("") as conn:
# Simply fails if not connected.
res = conn.execute("SELECT test_send()")
self.assertEqual(res.status(), PGRES_FATAL_ERROR)
errmsg = res.error_message()
self.assertIn("connection pointer is NULL", errmsg)
@unittest.skipIf(config.is_multinode(), "Test applies only to a singlenode cluster")
def test_send(self):
"""
@tags sanity
"""
with PGconn("") as conn:
res = conn.execute("SELECT test_connect('')")
self.assertEqual(res.status(), PGRES_TUPLES_OK)
res = conn.execute("SELECT test_send()")
self.assertTrue(res.getvalue(0, 0, convert=True))
if __name__ == '__main__':
# just a test of async interface.
def printer(arg, res):
res = PGresult(res)
print res.error_message()
with PGconn("") as conn:
handler = NoticeReceiverFunc(printer)
conn.set_notice_receiver(handler, None)
while True:
conn.send_query("SELECT test_scenario1('')")
fd = [conn]
ready = select.select(fd, [], [], 0.1)[0]
if len(ready) > 0:
if conn.consume_input() == 0:
raise StandardError("cannot read input")
if conn.is_busy() == 0:
res = conn.get_result()
# Generated directories and files
/results/
/expected/setup.out
/sql/setup.sql
MODULES=gplibpq
PG_CONFIG=pg_config
REGRESS = setup walreceiver
REGRESS_OPTS = --dbname="walrep_regression"
subdir = src/test/walrep/
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
NO_PGXS = 1
include $(top_srcdir)/src/makefiles/pgxs.mk
-- negative cases
SELECT test_receive();
ERROR: could not receive data from WAL stream: connection pointer is NULL (gp_libpqwalreceiver.c:403)
SELECT test_send();
ERROR: could not send data to WAL stream: connection pointer is NULL (gp_libpqwalreceiver.c:430)
SELECT test_disconnect();
test_disconnect
-----------------
t
(1 row)
-- Test connection
SELECT test_connect('', pg_current_xlog_location());
test_connect
--------------
t
(1 row)
-- Should report 1 replication
SELECT count(*) FROM pg_stat_replication;
count
-------
1
(1 row)
SELECT test_disconnect();
test_disconnect
-----------------
t
(1 row)
SELECT count(*) FROM pg_stat_replication;
count
-------
0
(1 row)
-- Test connection passing hostname
SELECT test_connect('host=localhost', pg_current_xlog_location());
test_connect
--------------
t
(1 row)
SELECT count(*) FROM pg_stat_replication;
count
-------
1
(1 row)
SELECT test_disconnect();
test_disconnect
-----------------
t
(1 row)
SELECT count(*) FROM pg_stat_replication;
count
-------
0
(1 row)
-- create table and store current_xlog_location.
create TEMP table tmp(startpoint text);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'startpoint' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE FUNCTION select_tmp() RETURNS text AS $$
select startpoint from tmp;
$$ LANGUAGE SQL;
insert into tmp select pg_current_xlog_location();
-- lets generate some xlogs
create table testwalreceiver(a int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
insert into testwalreceiver select * from generate_series(0, 9);
-- Connect and receive the xlogs, validate everything was received from start to
-- end
SELECT test_connect('', select_tmp());
test_connect
--------------
t
(1 row)
SELECT test_receive_and_verify(select_tmp(), pg_current_xlog_location());
test_receive_and_verify
-------------------------
t
(1 row)
SELECT test_send();
test_send
-----------
t
(1 row)
SELECT test_receive();
test_receive
--------------
f
(1 row)
SELECT test_disconnect();
test_disconnect
-----------------
t
(1 row)
......@@ -21,32 +21,50 @@ static struct
XLogRecPtr Flush;
} LogstreamResult;
static void test_XLogWalRcvProcessMsg(unsigned char type,
char *buf, Size len);
static void test_XLogWalRcvProcessMsg(unsigned char type, char *buf,
Size len, XLogRecPtr *logStreamStart);
static void test_XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void test_XLogWalRcvSendReply(void);
static void test_ProcessWalSndrMessage(char *type, XLogRecPtr walEnd,
TimestampTz sendTime);
static void test_PrintLog(char *type, XLogRecPtr walPtr,
TimestampTz sendTime);
Datum test_connect(PG_FUNCTION_ARGS);
Datum test_disconnect(PG_FUNCTION_ARGS);
Datum test_receive(PG_FUNCTION_ARGS);
Datum test_send(PG_FUNCTION_ARGS);
Datum test_scenario1(PG_FUNCTION_ARGS);
Datum test_receive_and_verify(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(test_connect);
PG_FUNCTION_INFO_V1(test_disconnect);
PG_FUNCTION_INFO_V1(test_receive);
PG_FUNCTION_INFO_V1(test_send);
PG_FUNCTION_INFO_V1(test_scenario1);
PG_FUNCTION_INFO_V1(test_receive_and_verify);
static void
string_to_xlogrecptr(text *location, XLogRecPtr *rec)
{
char *locationstr = DatumGetCString(
DirectFunctionCall1(textout,
PointerGetDatum(location)));
if (sscanf(locationstr, "%X/%X", &rec->xlogid, &rec->xrecoff) != 2)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse transaction log location \"%s\"",
locationstr)));
}
Datum
test_connect(PG_FUNCTION_ARGS)
{
char *conninfo = TextDatumGetCString(PG_GETARG_DATUM(0));
bool result;
text *location = PG_GETARG_TEXT_P(1);
bool result;
XLogRecPtr startpoint;
result = walrcv_connect(conninfo, GetRedoRecPtr());
string_to_xlogrecptr(location, &startpoint);
result = walrcv_connect(conninfo, startpoint);
PG_RETURN_BOOL(result);
}
......@@ -80,52 +98,50 @@ test_send(PG_FUNCTION_ARGS)
}
Datum
test_scenario1(PG_FUNCTION_ARGS)
test_receive_and_verify(PG_FUNCTION_ARGS)
{
char *conninfo = TextDatumGetCString(PG_GETARG_DATUM(0));
unsigned char type;
text *start_location = PG_GETARG_TEXT_P(0);
text *end_location = PG_GETARG_TEXT_P(1);
XLogRecPtr startpoint;
XLogRecPtr endpoint;
unsigned char type;
char *buf;
int len;
int len;
if (!walrcv_connect(conninfo, GetRedoRecPtr()))
elog(ERROR, "could not connect");
string_to_xlogrecptr(start_location, &startpoint);
string_to_xlogrecptr(end_location, &endpoint);
for (;;)
/* Pending to check why first walrcv_receive returns nothing */
walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len);
if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
CHECK_FOR_INTERRUPTS();
if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
elog(INFO, "got message: type = %c", type);
XLogRecPtr logStreamStart;
/* Accept the received data, and process it */
test_XLogWalRcvProcessMsg(type, buf, len, &logStreamStart);
/* Accept the received data, and process it */
test_XLogWalRcvProcessMsg(type, buf, len);
/* Compare received everthing from start */
if (startpoint.xlogid != logStreamStart.xlogid ||
startpoint.xrecoff != logStreamStart.xrecoff)
PG_RETURN_BOOL(false);
/* Receive any more data we can without sleeping */
while (walrcv_receive(0, &type, &buf, &len))
test_XLogWalRcvProcessMsg(type, buf, len);
/* Compare received everthing till end */
if (endpoint.xlogid != LogstreamResult.Write.xlogid ||
endpoint.xrecoff != LogstreamResult.Write.xrecoff)
PG_RETURN_BOOL(false);
/* Let the primary know that we received some data. */
test_XLogWalRcvSendReply();
}
else
{
/*
*
* We didn't receive anything new, but send a status update to the
* master anyway, to report any progress in applying WAL.
*/
test_XLogWalRcvSendReply();
}
PG_RETURN_BOOL(true);
}
PG_RETURN_NULL();
PG_RETURN_BOOL(false);
}
/*
* Accept the message from XLOG stream, and process it.
*/
static void
test_XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
test_XLogWalRcvProcessMsg(unsigned char type, char *buf,
Size len, XLogRecPtr *logStreamStart)
{
switch (type)
{
......@@ -139,9 +155,13 @@ test_XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
errmsg_internal("invalid WAL message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
logStreamStart->xlogid = msghdr.dataStart.xlogid;
logStreamStart->xrecoff = msghdr.dataStart.xrecoff;
test_ProcessWalSndrMessage("wal records",
msghdr.walEnd, msghdr.sendTime);
test_PrintLog("wal end records",
msghdr.dataStart, msghdr.sendTime);
test_PrintLog("wal end records",
msghdr.walEnd, msghdr.sendTime);
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
......@@ -163,8 +183,8 @@ test_XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
keepalive.walEnd.xlogid,
keepalive.walEnd.xrecoff,
timestamptz_to_str(keepalive.sendTime));
test_ProcessWalSndrMessage("keep alive",
keepalive.walEnd, keepalive.sendTime);
test_PrintLog("keep alive",
keepalive.walEnd, keepalive.sendTime);
break;
}
default:
......@@ -175,7 +195,6 @@ test_XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
}
}
/*
* Write XLOG data to disk.
*/
......@@ -202,7 +221,7 @@ test_XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
XLByteToSeg(recptr, recvId, recvSeg);
XLogFileName(recvFilePath, recvFileTLI, recvId, recvSeg);
elog(INFO, "would open: %s", recvFilePath);
elog(DEBUG1, "would open: %s", recvFilePath);
recvFileTLI = 1;
}
......@@ -259,10 +278,10 @@ test_XLogWalRcvSendReply(void)
* Just show the walEnd/sendTime information
*/
static void
test_ProcessWalSndrMessage(char *type, XLogRecPtr walEnd,
test_PrintLog(char *type, XLogRecPtr walPtr,
TimestampTz sendTime)
{
elog(INFO, "%s: %X/%X at %s", type,
walEnd.xlogid, walEnd.xrecoff,
elog(DEBUG1, "%s: %X/%X at %s", type,
walPtr.xlogid, walPtr.xrecoff,
timestamptz_to_str(sendTime));
}
create or replace function test_connect(text, text) RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_disconnect() RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_receive() RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_send() RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_receive_and_verify(text, text) RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_connect(text, text) RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_disconnect() RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_receive() RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_send() RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_receive_and_verify(text, text) RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
-- negative cases
SELECT test_receive();
SELECT test_send();
SELECT test_disconnect();
-- Test connection
SELECT test_connect('', pg_current_xlog_location());
-- Should report 1 replication
SELECT count(*) FROM pg_stat_replication;
SELECT test_disconnect();
SELECT count(*) FROM pg_stat_replication;
-- Test connection passing hostname
SELECT test_connect('host=localhost', pg_current_xlog_location());
SELECT count(*) FROM pg_stat_replication;
SELECT test_disconnect();
SELECT count(*) FROM pg_stat_replication;
-- create table and store current_xlog_location.
create TEMP table tmp(startpoint text);
CREATE FUNCTION select_tmp() RETURNS text AS $$
select startpoint from tmp;
$$ LANGUAGE SQL;
insert into tmp select pg_current_xlog_location();
-- lets generate some xlogs
create table testwalreceiver(a int);
insert into testwalreceiver select * from generate_series(0, 9);
-- Connect and receive the xlogs, validate everything was received from start to
-- end
SELECT test_connect('', select_tmp());
SELECT test_receive_and_verify(select_tmp(), pg_current_xlog_location());
SELECT test_send();
SELECT test_receive();
SELECT test_disconnect();
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册