提交 42a45941 编写于 作者: S Shoaib Lari

Generate Xlog for AO/CO operations.

Define a xlog format for AO operations. The xlog is generated when an AO block
is written.

A test is added to receive the AO log from the WAL Sender process after an
INSERT operation and verify that the the received xlog has the AO xlog record in
it.
Signed-off-by: NAbhijit Subramanya <asubramanya@pivotal.io>
上级 dde52e58
......@@ -542,6 +542,13 @@ dumpXLogRecord(XLogRecord *record, bool header_only)
case RM_SEQ_ID:
print_rmgr_seq(curRecPtr, record, info);
break;
#ifdef USE_SEGWALREP
case RM_APPEND_ONLY_ID:
print_rmgr_ao(curRecPtr, record, info);
break;
#endif /* USE_SEGWALREP */
default:
fprintf(stderr, "Unknown RMID %d.\n", record->xl_rmid);
break;
......
......@@ -18,6 +18,7 @@
#include "access/xact.h"
#include "catalog/pg_control.h"
#include "commands/dbcommands.h"
#include "cdb/cdbappendonlyam.h"
#if PG_VERSION_NUM >=90000
#include "utils/relmapper.h"
......@@ -53,7 +54,15 @@ const char * const RM_names[RM_MAX_ID+1] = {
"Sequence", /* 15 */
#if PG_VERSION_NUM >=90200
"SPGist" /* 16 */
#endif
#else
"Reserved 16",
#endif /* PG_VERSION_NUM >=90200 */
"DistributedLog", /* 17 */
"MasterMirrorLog", /* 18 */
#ifdef USE_SEGWALREP
"Appendonly" /* 19 */
#endif /* USE_SEGWALREP */
};
/* copy from utils/timestamp.h */
......@@ -1392,3 +1401,29 @@ print_rmgr_seq(XLogRecPtr cur, XLogRecord *record, uint8 info)
print_rmgr_record(cur, record, "seq");
}
#ifdef USE_SEGWALREP
void
print_rmgr_ao(XLogRecPtr cur, XLogRecord *record, uint8 info)
{
char spaceName[NAMEDATALEN];
char dbName[NAMEDATALEN];
char relName[NAMEDATALEN];
char buf[1024];
xl_ao_insert xlrec;
uint64 len;
memcpy(&xlrec, XLogRecGetData(record), sizeof(xlrec));
len = record->xl_len - SizeOfAOInsert;
getSpaceName(xlrec.node.spcNode, spaceName, sizeof(spaceName));
getDbName(xlrec.node.dbNode, dbName, sizeof(dbName));
getRelName(xlrec.node.relNode, relName, sizeof(relName));
snprintf(buf, sizeof(buf), "insert: s/d/r:%s/%s/%s segfile/off:%u/%lu, len: %lu",
spaceName, dbName, relName,
xlrec.segment_filenum,
xlrec.offset, len);
print_rmgr_record(cur, record, buf);
}
#endif /* USE_SEGWALREP */
......@@ -77,4 +77,8 @@ void print_rmgr_gin(XLogRecPtr, XLogRecord *, uint8);
void print_rmgr_gist(XLogRecPtr, XLogRecord *, uint8);
void print_rmgr_seq(XLogRecPtr, XLogRecord *, uint8);
#ifdef USE_SEGWALREP
void print_rmgr_ao(XLogRecPtr, XLogRecord *, uint8);
#endif /* USE_SEGWALREP */
#endif /* __XLOGDUMP_RMGR_H__ */
......@@ -23,7 +23,7 @@
#include "commands/sequence.h"
#include "commands/tablespace.h"
#include "storage/smgr.h"
#include "cdb/cdbappendonlyam.h"
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
{"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL},
......@@ -44,5 +44,10 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = {
{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL},
{"Bitmap", bitmap_redo, bitmap_desc, bitmap_xlog_startup, bitmap_xlog_cleanup, bitmap_safe_restartpoint},
{"DistributedLog", DistributedLog_redo, DistributedLog_desc, NULL, NULL, NULL},
{"Master Mirror Log Records", mmxlog_redo, mmxlog_desc, NULL, NULL, NULL}
{"Master Mirror Log Records", mmxlog_redo, mmxlog_desc, NULL, NULL, NULL},
#ifdef USE_SEGWALREP
{"Appendonly Table Log Records", appendonly_redo, appendonly_desc, NULL, NULL, NULL}
#endif /* USE_SEGWALREP */
};
......@@ -12,6 +12,11 @@
#include "cdb/cdbappendonlystorage.h"
#include "utils/guc.h"
#ifdef USE_SEGWALREP
#include "cdb/cdbappendonlyam.h"
#endif /* USE_SEGWALREP */
int32 AppendOnlyStorage_GetUsableBlockSize(int32 configBlockSize)
{
int32 result;
......@@ -28,3 +33,29 @@ int32 AppendOnlyStorage_GetUsableBlockSize(int32 configBlockSize)
return result;
}
#ifdef USE_SEGWALREP
void
appendonly_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record)
{
/* TODO add logic here to replay AO xlog records */
}
void
appendonly_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record)
{
xl_ao_insert *xlrec = (xl_ao_insert *)XLogRecGetData(record);
uint8 xl_info = record->xl_info;
uint8 info = xl_info & ~XLR_INFO_MASK;
if (info == XLOG_APPENDONLY_INSERT)
{
appendStringInfo(buf, "insert: rel %u/%u/%u seg/offset:%u/%lu len:%lu",
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode, xlrec->segment_filenum,
xlrec->offset, record->xl_len - SizeOfAOInsert);
}
else
appendStringInfo(buf, "UNKNOWN");
}
#endif /* USE_SEGWALREP */
......@@ -29,6 +29,14 @@
#include "cdb/cdbpersistentstore.h"
#include "cdb/cdbpersistentrecovery.h"
#ifdef USE_SEGWALREP
#include "cdb/cdbappendonlyam.h"
static void insert_ao_xlog(MirroredAppendOnlyOpen *open, void *buffer,
int32 bufferLen);
#endif /* USE_SEGWALREP */
static void MirroredAppendOnly_SetUpMirrorAccess(
RelFileNode *relFileNode,
/* The tablespace, database, and relation OIDs for the open. */
......@@ -2189,6 +2197,12 @@ void MirroredAppendOnly_Append(
if (StorageManagerMirrorMode_DoPrimaryWork(open->mirrorMode) &&
!open->copyToMirror)
{
#ifdef USE_SEGWALREP
/* Log each varblock to the XLog. */
insert_ao_xlog(open, buffer, bufferLen);
#endif /* USE_SEGWALREP */
errno = 0;
if ((int) FileWrite(open->primaryFile, buffer, bufferLen) != bufferLen)
......@@ -2209,6 +2223,42 @@ void MirroredAppendOnly_Append(
}
#ifdef USE_SEGWALREP
/*
* Insert an AO XLOG/AOCO record
*/
static void insert_ao_xlog(MirroredAppendOnlyOpen *open, void *buffer,
int32 bufferLen)
{
xl_ao_insert xlaoinsert;
XLogRecData rdata[2];
xlaoinsert.node = open->relFileNode;
xlaoinsert.segment_filenum = open->segmentFileNum;
/*
* Using FileSeek to fetch the current write offset.
* Passing 0 offset with SEEK_CUR avoids actual disk-io,
* as it just returns from VFDCache the current file position value.
* Make sure to populate this before the FileWrite call else the file
* pointer has moved forward.
*/
xlaoinsert.offset = FileSeek(open->primaryFile, 0, SEEK_CUR);
rdata[0].data = (char*) &xlaoinsert;
rdata[0].len = SizeOfAOInsert;
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char*) buffer;
rdata[1].len = bufferLen;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
XLogInsert(RM_APPEND_ONLY_ID, XLOG_APPENDONLY_INSERT, rdata);
}
#endif /* USE_SEGWALREP */
// -----------------------------------------------------------------------------
// Truncate
// ----------------------------------------------------------------------------
......
......@@ -584,8 +584,13 @@ void ChangeTracking_GetRelationChangeInfoFromXlog(
case RM_DBASE_ID:
case RM_TBLSPC_ID:
case RM_MMXLOG_ID:
#ifdef USE_SEGWALREP
case RM_APPEND_ONLY_ID:
#endif /* USE_SEGWALREP */
break;
/*
* These aren't supported in GPDB
*/
......
......@@ -33,6 +33,12 @@ typedef uint8 RmgrId;
#define RM_BITMAP_ID 16
#define RM_DISTRIBUTEDLOG_ID 17
#define RM_MMXLOG_ID 18
#define RM_MAX_ID RM_MMXLOG_ID
#ifdef USE_SEGWALREP
#define RM_APPEND_ONLY_ID 19
#define RM_MAX_ID RM_APPEND_ONLY_ID
#else
#define RM_MAX_ID RM_MMXLOG_ID
#endif /* USE_SEGWALREP */
#endif /* RMGR_H */
......@@ -367,5 +367,25 @@ extern HTSU_Result appendonly_update(
MemTuple memTuple,
AOTupleId* aoTupleId,
AOTupleId* newAoTupleId);
#ifdef USE_SEGWALREP
#define XLOG_APPENDONLY_INSERT 0x00
typedef struct xl_ao_insert
{
/* meta data about the inserted block of AO data*/
RelFileNode node;
uint segment_filenum;
uint64 offset;
/* BLOCK DATA FOLLOWS AT END OF STRUCT */
} xl_ao_insert;
#define SizeOfAOInsert (offsetof(xl_ao_insert, offset) + sizeof(uint64))
extern void appendonly_redo(XLogRecPtr beginLoc, XLogRecPtr lsn, XLogRecord *record);
extern void appendonly_desc(StringInfo buf, XLogRecPtr beginLoc, XLogRecord *record);
#endif /* USE_SEGWALREP */
extern void appendonly_update_finish(AppendOnlyUpdateDesc aoUpdateDesc);
#endif /* CDBAPPENDONLYAM_H */
MODULES=gplibpq
PG_CONFIG=pg_config
REGRESS = setup walreceiver
REGRESS_OPTS = --dbname="walrep_regression"
subdir = src/test/walrep/
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
REGRESS = setup walreceiver
ifeq ($(enable_segwalrep), yes)
REGRESS += generate_ao_xlog generate_aoco_xlog
endif
REGRESS_OPTS = --dbname="walrep_regression"
NO_PGXS = 1
include $(top_srcdir)/src/makefiles/pgxs.mk
-- Test AO XLogging
CREATE OR REPLACE FUNCTION get_ao_eof(tablename TEXT) RETURNS BIGINT[] AS
$$
DECLARE
eofval BIGINT[];
eof_scalar BIGINT;
BEGIN
SELECT eof INTO eof_scalar FROM gp_toolkit.__gp_aoseg_name(tablename);
eofval[0] := eof_scalar;
RETURN eofval;
END;
$$ LANGUAGE plpgsql;
CREATE TABLE generate_ao_xlog_table(a INT, b INT) WITH (APPENDONLY=TRUE);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
-- Store the location of xlog in a temporary table so that we can
-- use it to request walsender to start streaming from this point
CREATE TEMP TABLE tmp(startpoint TEXT);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'startpoint' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
INSERT INTO tmp SELECT pg_current_xlog_location() FROM
gp_dist_random('gp_id') WHERE gp_segment_id = 0;
-- Generate some xlog records for AO
INSERT INTO generate_ao_xlog_table VALUES(1, 10);
-- Verify that AO xlog record was received
SELECT test_xlog_ao((SELECT 'port=' || port FROM gp_segment_configuration
WHERE dbid=2),
(SELECT startpoint FROM tmp),
(SELECT oid FROM pg_tablespace WHERE spcname = 'pg_default'),
(SELECT oid FROM pg_database
WHERE datname = current_database()),
(SELECT relfilenode FROM gp_dist_random('pg_class')
WHERE relname = 'generate_ao_xlog_table'
AND gp_segment_id = 0),
(SELECT get_ao_eof('generate_ao_xlog_table')
FROM gp_dist_random('gp_id')
WHERE gp_segment_id = 0),
false) FROM
gp_dist_random('gp_id') WHERE gp_segment_id = 0;
test_xlog_ao
--------------
1
(1 row)
-- Test AOCO XLogging
CREATE OR REPLACE FUNCTION get_aoco_eofs(tablename TEXT) RETURNS BIGINT[] AS
$$
DECLARE
eofvals BIGINT[];
i INT;
result RECORD;
BEGIN
i := 0;
FOR result IN SELECT * FROM gp_toolkit.__gp_aocsseg_name(tablename)
LOOP
eofvals[i] := result.eof;
i := i + 1;
END LOOP;
RETURN eofvals;
END;
$$ LANGUAGE plpgsql;
CREATE TABLE generate_aoco_xlog_table(a INT, b INT) WITH (APPENDONLY=TRUE, ORIENTATION=COLUMN);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
-- Store the location of xlog in a temporary table so that we can
-- use it to request walsender to start streaming from this point
CREATE TEMP TABLE tmp(startpoint TEXT);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'startpoint' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
INSERT INTO tmp SELECT pg_current_xlog_location() FROM
gp_dist_random('gp_id') WHERE gp_segment_id = 0;
-- Generate some xlog records for AOCO
INSERT INTO generate_aoco_xlog_table VALUES(1, 10);
-- Verify that AOCO xlog record was received
SELECT test_xlog_ao((SELECT 'port=' || port FROM gp_segment_configuration
WHERE dbid=2),
(SELECT startpoint FROM tmp),
(SELECT oid FROM pg_tablespace WHERE spcname = 'pg_default'),
(SELECT oid FROM pg_database
WHERE datname = current_database()),
(SELECT relfilenode FROM gp_dist_random('pg_class')
WHERE relname = 'generate_aoco_xlog_table'
AND gp_segment_id = 0),
(SELECT get_aoco_eofs('generate_aoco_xlog_table')
FROM gp_dist_random('gp_id')
WHERE gp_segment_id = 0),
true) FROM
gp_dist_random('gp_id') WHERE gp_segment_id = 0;
test_xlog_ao
--------------
2
(1 row)
......@@ -10,6 +10,7 @@
#include "access/xlog_internal.h"
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "cdb/cdbappendonlyam.h"
PG_MODULE_MAGIC;
......@@ -28,17 +29,25 @@ static void test_XLogWalRcvSendReply(void);
static void test_PrintLog(char *type, XLogRecPtr walPtr,
TimestampTz sendTime);
#ifdef USE_SEGWALREP
static int check_ao_record_present(unsigned char type, char *buf, Oid spc_node,
Oid db_node, Oid rel_node, Size len,
uint64 eof[], bool aoco);
#endif /* USE_SEGWALREP */
Datum test_connect(PG_FUNCTION_ARGS);
Datum test_disconnect(PG_FUNCTION_ARGS);
Datum test_receive(PG_FUNCTION_ARGS);
Datum test_send(PG_FUNCTION_ARGS);
Datum test_receive_and_verify(PG_FUNCTION_ARGS);
Datum test_xlog_ao(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(test_connect);
PG_FUNCTION_INFO_V1(test_disconnect);
PG_FUNCTION_INFO_V1(test_receive);
PG_FUNCTION_INFO_V1(test_send);
PG_FUNCTION_INFO_V1(test_receive_and_verify);
PG_FUNCTION_INFO_V1(test_xlog_ao);
static void
string_to_xlogrecptr(text *location, XLogRecPtr *rec)
......@@ -170,13 +179,14 @@ test_XLogWalRcvProcessMsg(unsigned char type, char *buf,
logStreamStart->xlogid = msghdr.dataStart.xlogid;
logStreamStart->xrecoff = msghdr.dataStart.xrecoff;
test_PrintLog("wal end records",
test_PrintLog("wal start records",
msghdr.dataStart, msghdr.sendTime);
test_PrintLog("wal end records",
msghdr.walEnd, msghdr.sendTime);
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
test_XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
......@@ -293,7 +303,123 @@ static void
test_PrintLog(char *type, XLogRecPtr walPtr,
TimestampTz sendTime)
{
elog(DEBUG1, "%s: %X/%X at %s", type,
walPtr.xlogid, walPtr.xrecoff,
timestamptz_to_str(sendTime));
elog(DEBUG1, "%s: %X/%X at %s", type, walPtr.xlogid, walPtr.xrecoff,
timestamptz_to_str(sendTime));
}
/*
* Verify that XLOG records are being generated for AO tables and are getting
* shipped to the WAL receiver.
*/
Datum
test_xlog_ao(PG_FUNCTION_ARGS)
{
int num_found = 0;
#ifdef USE_SEGWALREP
char *conninfo = TextDatumGetCString(PG_GETARG_DATUM(0));
text *start_location = PG_GETARG_TEXT_P(1);
Oid spc_node = PG_GETARG_OID(2);
Oid db_node = PG_GETARG_OID(3);
Oid rel_node = PG_GETARG_OID(4);
uint64 *eof = (uint64 *)ARR_DATA_PTR(PG_GETARG_ARRAYTYPE_P(5));
bool aoco = PG_GETARG_BOOL(6);
XLogRecPtr startpoint;
unsigned char type;
char *buf;
int len;
string_to_xlogrecptr(start_location, &startpoint);
if (!walrcv_connect(conninfo, startpoint))
elog(ERROR, "could not connect");
for (int i = 0; i < NUM_RETRIES; i++)
{
if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
num_found = check_ao_record_present(type, buf, spc_node, db_node,
rel_node, len, eof, aoco);
break;
}
else
elog(LOG, "walrcv_receive didn't return anything, retry...%d", i);
}
walrcv_disconnect();
#endif /* USE_SEGWALREP */
PG_RETURN_INT32(num_found);
}
#ifdef USE_SEGWALREP
/*
* Verify that AO/AOCO XLOG record is present in buf.
* Returns the number of AO/AOCO XLOG records found in buf.
*/
static int
check_ao_record_present(unsigned char type, char *buf, Oid spc_node,
Oid db_node, Oid rel_node, Size len, uint64 eof[100],
bool aoco)
{
WalDataMessageHeader msghdr;
uint32 i = 0;
int num_found = 0;
int segfilenum = 0;
if (type != 'w')
return false;
if (len < sizeof(WalDataMessageHeader))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid WAL message received from primary")));
Assert(buf != NULL);
/* memcpy is required here for alignment reasons */
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
test_PrintLog("wal start record", msghdr.dataStart, msghdr.sendTime);
test_PrintLog("wal end record", msghdr.walEnd, msghdr.sendTime);
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
/* process the xlog records one at a time and check if it is an AO/AOCO record */
while (i < len)
{
XLogRecord *xlrec = (XLogRecord *)(buf + i);
i += MAXALIGN(xlrec->xl_tot_len);
if (xlrec->xl_rmid == RM_APPEND_ONLY_ID)
{
xl_ao_insert *xlaoinsert = (xl_ao_insert *)XLogRecGetData(xlrec);
if (aoco)
segfilenum = AOTupleId_MultiplierSegmentFileNum * num_found + 1;
else
segfilenum = 1;
if (xlaoinsert->node.spcNode == spc_node &&
xlaoinsert->node.dbNode == db_node &&
xlaoinsert->node.relNode == rel_node &&
xlaoinsert->segment_filenum == segfilenum &&
xlaoinsert->offset == 0 &&
xlrec->xl_len - SizeOfAOInsert == eof[num_found])
num_found++;
else
{
elog(INFO, "Expected values: relfile %u/%u/%u segfile/offset %u/%u eof %lu",
spc_node, db_node, rel_node, segfilenum, 0, eof[num_found]);
elog(INFO, "Actual values: relfile %u/%u/%u segfile/offset %u/%lu eof %lu",
xlaoinsert->node.spcNode, xlaoinsert->node.dbNode,
xlaoinsert->node.relNode, xlaoinsert->segment_filenum,
xlaoinsert->offset, xlrec->xl_len - SizeOfAOInsert);
}
}
}
return num_found;
}
#endif /* USE_SEGWALREP */
......@@ -12,3 +12,7 @@ create or replace function test_send() RETURNS bool AS
create or replace function test_receive_and_verify(text, text) RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_xlog_ao(text, text, oid, oid, oid, bigint[], bool)
RETURNS int AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
......@@ -8,3 +8,6 @@ create or replace function test_send() RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_receive_and_verify(text, text) RETURNS bool AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
create or replace function test_xlog_ao(text, text, oid, oid, oid, bigint[], bool)
RETURNS int AS
'@abs_builddir@/gplibpq@DLSUFFIX@' LANGUAGE C VOLATILE STRICT NO SQL;
-- Test AO XLogging
CREATE OR REPLACE FUNCTION get_ao_eof(tablename TEXT) RETURNS BIGINT[] AS
$$
DECLARE
eofval BIGINT[];
eof_scalar BIGINT;
BEGIN
SELECT eof INTO eof_scalar FROM gp_toolkit.__gp_aoseg_name(tablename);
eofval[0] := eof_scalar;
RETURN eofval;
END;
$$ LANGUAGE plpgsql;
CREATE TABLE generate_ao_xlog_table(a INT, b INT) WITH (APPENDONLY=TRUE);
-- Store the location of xlog in a temporary table so that we can
-- use it to request walsender to start streaming from this point
CREATE TEMP TABLE tmp(startpoint TEXT);
INSERT INTO tmp SELECT pg_current_xlog_location() FROM
gp_dist_random('gp_id') WHERE gp_segment_id = 0;
-- Generate some xlog records for AO
INSERT INTO generate_ao_xlog_table VALUES(1, 10);
-- Verify that AO xlog record was received
SELECT test_xlog_ao((SELECT 'port=' || port FROM gp_segment_configuration
WHERE dbid=2),
(SELECT startpoint FROM tmp),
(SELECT oid FROM pg_tablespace WHERE spcname = 'pg_default'),
(SELECT oid FROM pg_database
WHERE datname = current_database()),
(SELECT relfilenode FROM gp_dist_random('pg_class')
WHERE relname = 'generate_ao_xlog_table'
AND gp_segment_id = 0),
(SELECT get_ao_eof('generate_ao_xlog_table')
FROM gp_dist_random('gp_id')
WHERE gp_segment_id = 0),
false) FROM
gp_dist_random('gp_id') WHERE gp_segment_id = 0;
-- Test AOCO XLogging
CREATE OR REPLACE FUNCTION get_aoco_eofs(tablename TEXT) RETURNS BIGINT[] AS
$$
DECLARE
eofvals BIGINT[];
i INT;
result RECORD;
BEGIN
i := 0;
FOR result IN SELECT * FROM gp_toolkit.__gp_aocsseg_name(tablename)
LOOP
eofvals[i] := result.eof;
i := i + 1;
END LOOP;
RETURN eofvals;
END;
$$ LANGUAGE plpgsql;
CREATE TABLE generate_aoco_xlog_table(a INT, b INT) WITH (APPENDONLY=TRUE, ORIENTATION=COLUMN);
-- Store the location of xlog in a temporary table so that we can
-- use it to request walsender to start streaming from this point
CREATE TEMP TABLE tmp(startpoint TEXT);
INSERT INTO tmp SELECT pg_current_xlog_location() FROM
gp_dist_random('gp_id') WHERE gp_segment_id = 0;
-- Generate some xlog records for AOCO
INSERT INTO generate_aoco_xlog_table VALUES(1, 10);
-- Verify that AOCO xlog record was received
SELECT test_xlog_ao((SELECT 'port=' || port FROM gp_segment_configuration
WHERE dbid=2),
(SELECT startpoint FROM tmp),
(SELECT oid FROM pg_tablespace WHERE spcname = 'pg_default'),
(SELECT oid FROM pg_database
WHERE datname = current_database()),
(SELECT relfilenode FROM gp_dist_random('pg_class')
WHERE relname = 'generate_aoco_xlog_table'
AND gp_segment_id = 0),
(SELECT get_aoco_eofs('generate_aoco_xlog_table')
FROM gp_dist_random('gp_id')
WHERE gp_segment_id = 0),
true) FROM
gp_dist_random('gp_id') WHERE gp_segment_id = 0;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册