From 51b363ec2499a308bb7af23382d25befe6ef5d6c Mon Sep 17 00:00:00 2001 From: Bruce Momjian Date: Tue, 17 Feb 2004 03:34:35 +0000 Subject: [PATCH] Please apply this patch to contrib/dbmirror In incorperates changes from myself and a number of contributors. This update to dbmirror provides: -replication of sequence operations via setval/nextval -DBMirror.pl support for logging to syslog -changed the names of the tables to dbmirror_* (no quotes required) -Support for writitng SQL statements to files instead of directly to a slave database -More options for DBMirror.pl in the config files. Steven Singer --- contrib/dbmirror/DBMirror.pl | 335 +++++++++++++++-------- contrib/dbmirror/MirrorSetup.sql | 70 +++-- contrib/dbmirror/README.dbmirror | 75 ++++-- contrib/dbmirror/pending.c | 445 ++++++++++++++++++++++++------- 4 files changed, 671 insertions(+), 254 deletions(-) diff --git a/contrib/dbmirror/DBMirror.pl b/contrib/dbmirror/DBMirror.pl index 4a951d0a45..1eb917bf18 100755 --- a/contrib/dbmirror/DBMirror.pl +++ b/contrib/dbmirror/DBMirror.pl @@ -33,7 +33,7 @@ # # ############################################################################## -# $PostgreSQL: pgsql/contrib/dbmirror/DBMirror.pl,v 1.7 2003/11/29 22:39:19 pgsql Exp $ +# $PostgreSQL: pgsql/contrib/dbmirror/DBMirror.pl,v 1.8 2004/02/17 03:34:35 momjian Exp $ # ############################################################################## @@ -79,17 +79,17 @@ sub mirrorCommand($$$$$$); sub mirrorInsert($$$$$); sub mirrorDelete($$$$$); sub mirrorUpdate($$$$$); -sub sendQueryToSlaves($$); sub logErrorMessage($); -sub openSlaveConnection($); +sub setupSlave($); sub updateMirrorHostTable($$); - sub extractData($$); +sub extractData($$); local $::masterHost; local $::masterDb; local $::masterUser; local $::masterPassword; local $::errorThreshold=5; local $::errorEmailAddr=undef; +local $::sleepInterval=60; my %slaveInfoHash; local $::slaveInfo = \%slaveInfoHash; @@ -115,8 +115,25 @@ sub Main() { die; } - - my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; + if (defined($::syslog)) + { + # log with syslog + require Sys::Syslog; + import Sys::Syslog qw(openlog syslog); + openlog($0, 'cons,pid', 'user'); + syslog("info", '%s', "starting $0 script with $ARGV[0]"); + } + + my $connectString; + if(defined($::masterHost)) + { + $connectString .= "host=$::masterHost "; + } + if(defined($::masterPort)) + { + $connectString .= "port=$::masterPort "; + } + $connectString .= "dbname=$::masterDb user=$::masterUser password=$::masterPassword"; $masterConn = Pg::connectdb($connectString); @@ -138,33 +155,29 @@ sub Main() { my $firstTime = 1; while(1) { if($firstTime == 0) { - sleep 60; + sleep $::sleepInterval; } $firstTime = 0; -# Open up the connection to the slave. - if(! defined $::slaveInfo->{"status"} || - $::slaveInfo->{"status"} == -1) { - openSlaveConnection($::slaveInfo); - } - + setupSlave($::slaveInfo); + - sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); - sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED"); #Obtain a list of pending transactions using ordering by our approximation #to the commit time. The commit time approximation is taken to be the #SeqId of the last row edit in the transaction. - my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd"; - $pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN"; - $pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = "; - $pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"="; - $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; - $pendingTransQuery .= " ON pd.\"XID\""; - $pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null "; - $pendingTransQuery .= " GROUP BY pd.\"XID\" "; - $pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")"; + my $pendingTransQuery = "SELECT pd.XID,MAX(SeqId) FROM dbmirror_Pending pd"; + $pendingTransQuery .= " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN"; + $pendingTransQuery .= " dbmirror_MirrorHost mh ON mt.MirrorHostId = "; + $pendingTransQuery .= " mh.MirrorHostId AND mh.SlaveName="; + $pendingTransQuery .= " '$::slaveInfo->{\"slaveName\"}' "; + $pendingTransQuery .= " ON pd.XID"; + $pendingTransQuery .= " = mt.XID WHERE mt.XID is null "; + + + $pendingTransQuery .= " GROUP BY pd.XID"; + $pendingTransQuery .= " ORDER BY MAX(pd.SeqId)"; my $pendingTransResults = $masterConn->exec($pendingTransQuery); @@ -185,13 +198,21 @@ sub Main() { my $XID = $pendingTransResults->getvalue($curTransTuple,0); my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); my $seqId; - - my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\","; - $pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" "; - $pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata "; - $pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND "; - - $pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC"; + + + if($::slaveInfo->{'status'} eq 'FileClosed') + { + openTransactionFile($::slaveInfo,$XID); + } + + + + my $pendingQuery = "SELECT pnd.SeqId,pnd.TableName,"; + $pendingQuery .= " pnd.Op,pnddata.IsKey, pnddata.Data AS Data "; + $pendingQuery .= " FROM dbmirror_Pending pnd, dbmirror_PendingData pnddata "; + $pendingQuery .= " WHERE pnd.SeqId = pnddata.SeqId "; + + $pendingQuery .= " AND pnd.XID=$XID ORDER BY SeqId, IsKey DESC"; my $pendingResults = $masterConn->exec($pendingQuery); @@ -200,40 +221,47 @@ sub Main() { die; } - + sendQueryToSlaves($XID,"BEGIN"); my $numPending = $pendingResults->ntuples; my $curTuple = 0; - sendQueryToSlaves(undef,"BEGIN"); while ($curTuple < $numPending) { $seqId = $pendingResults->getvalue($curTuple,0); my $tableName = $pendingResults->getvalue($curTuple,1); my $op = $pendingResults->getvalue($curTuple,2); - $curTuple = mirrorCommand($seqId,$tableName,$op,$XID, $pendingResults,$curTuple) +1; - if($::slaveInfo->{"status"}==-1) { - last; - } } - #Now commit the transaction. - if($::slaveInfo->{"status"}==-1) { + + if($::slaveInfo->{'status'} ne 'DBOpen' && + $::slaveInfo->{'status'} ne 'FileOpen') + { last; } sendQueryToSlaves(undef,"COMMIT"); + #Now commit the transaction. updateMirrorHostTable($XID,$seqId); - if($commandCount > 5000) { - $commandCount = 0; - $::slaveInfo->{"status"} = -1; - $::slaveInfo->{"slaveConn"}->reset; - #Open the connection right away. - openSlaveConnection($::slaveInfo); - - } $pendingResults = undef; $curTransTuple = $curTransTuple +1; + + if($::slaveInfo->{'status'} eq 'FileOpen') + { + close ($::slaveInfo->{'TransactionFile'}); + } + elsif($::slaveInfo->{'status'} eq 'DBOpen') + { + if($commandCount > 5000) { + $commandCount = 0; + $::slaveInfo->{"status"} = 'DBClosed'; + $::slaveInfo->{"slaveConn"}->reset; + #Open the connection right away. + openSlaveConnection($::slaveInfo); + + } + } + }#while transactions left. $pendingTransResults = undef; @@ -303,6 +331,7 @@ sub mirrorCommand($$$$$$) { my $pendingResults = $_[4]; my $currentTuple = $_[5]; + if($op eq 'i') { $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults ,$currentTuple); @@ -315,6 +344,10 @@ sub mirrorCommand($$$$$$) { $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults, $currentTuple); } + if($op eq 's') { + $currentTuple = mirrorSequence($seqId,$tableName,$transId,$pendingResults, + $currentTuple); + } $commandCount = $commandCount +1; if($commandCount % 100 == 0) { # print "Sent 100 commmands on SeqId $seqId \n"; @@ -411,7 +444,7 @@ sub mirrorInsert($$$$$) { $firstIteration=0; } $valuesQuery .= ")"; - sendQueryToSlaves(undef,$insertQuery . $valuesQuery); + sendQueryToSlaves($transId,$insertQuery . $valuesQuery); return $currentTuple; } @@ -491,7 +524,6 @@ sub mirrorDelete($$$$$) { $counter++; $firstField=0; } - sendQueryToSlaves($transId,$deleteQuery); return $currentTuple; } @@ -554,14 +586,12 @@ sub mirrorUpdate($$$$$) { my $transId = $_[2]; my $pendingResult = $_[3]; my $currentTuple = $_[4]; - + my $counter; my $quotedValue; my $updateQuery = "UPDATE $tableName SET "; my $currentField; - - my %keyValueHash; my %dataValueHash; my $firstIteration=1; @@ -615,12 +645,27 @@ sub mirrorUpdate($$$$$) { } $firstIteration=0; } - sendQueryToSlaves($transId,$updateQuery); return $currentTuple+1; } +sub mirrorSequence($$$$$) { + my $seqId = $_[0]; + my $sequenceName = $_[1]; + my $transId = $_[2]; + my $pendingResult = $_[3]; + my $currentTuple = $_[4]; + + + my $query; + my $sequenceValue = $pendingResult->getvalue($currentTuple,4); + $query = sprintf("select setval(%s,%s)",$sequenceName,$sequenceValue); + + sendQueryToSlaves($transId,$query); + return $currentTuple; + +} =item sendQueryToSlaves(seqId,sqlQuery) @@ -647,7 +692,7 @@ sub sendQueryToSlaves($$) { my $seqId = $_[0]; my $sqlQuery = $_[1]; - if($::slaveInfo->{"status"} == 0) { + if($::slaveInfo->{"status"} eq 'DBOpen') { my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery); unless($queryResult->resultStatus == PGRES_COMMAND_OK) { my $errorMessage; @@ -660,10 +705,18 @@ sub sendQueryToSlaves($$) { $::slaveInfo->{"status"} = -1; } } + elsif($::slaveInfo->{"status"} eq 'FileOpen' ) { + my $xfile = $::slaveInfo->{'TransactionFile'}; + print $xfile $sqlQuery . ";\n"; + } + + } + + =item logErrorMessage(error) Mails an error message to the users specified $errorEmailAddr @@ -707,41 +760,30 @@ sub logErrorMessage($) { print mailPipe "\n\n\n=================================================\n"; close mailPipe; } + + if (defined($::syslog)) + { + syslog('err', '%s (%m)', $error); + } + warn($error); $lastErrorMsg = $error; } -sub openSlaveConnection($) { +sub setupSlave($) { my $slavePtr = $_[0]; - my $slaveConn; - my $slaveConnString = "host=" . $slavePtr->{"slaveHost"}; - $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"}; - $slaveConnString .= " user=" . $slavePtr->{"slaveUser"}; - $slaveConnString .= " password=" . $slavePtr->{"slavePassword"}; - - $slaveConn = Pg::connectdb($slaveConnString); - - if($slaveConn->status != PGRES_CONNECTION_OK) { - my $errorMessage = "Can't connect to slave database " ; - $errorMessage .= $slavePtr->{"slaveHost"} . "\n"; - $errorMessage .= $slaveConn->errorMessage; - logErrorMessage($errorMessage); - $slavePtr->{"status"} = -1; - } - else { - $slavePtr->{"slaveConn"} = $slaveConn; $slavePtr->{"status"} = 0; #Determine the MirrorHostId for the slave from the master's database - my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM ' - . ' "MirrorHost" WHERE "HostName"' - . '=\'' . $slavePtr->{"slaveHost"} + my $resultSet = $masterConn->exec('SELECT MirrorHostId FROM ' + . ' dbmirror_MirrorHost WHERE SlaveName' + . '=\'' . $slavePtr->{"slaveName"} . '\''); if($resultSet->ntuples !=1) { - my $errorMessage .= $slavePtr->{"slaveHost"} ."\n"; + my $errorMessage .= $slavePtr->{"slaveName"} ."\n"; $errorMessage .= "Has no MirrorHost entry on master\n"; logErrorMessage($errorMessage); $slavePtr->{"status"}=-1; @@ -749,14 +791,24 @@ sub openSlaveConnection($) { } $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); - - - + + if(defined($::slaveInfo->{'slaveDb'})) { + # We talk directly to a slave database. + # + if($::slaveInfo->{"status"} ne 'DBOpen') + { + openSlaveConnection($::slaveInfo); + } + sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); + sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED"); + } + else { + $::slaveInfo->{"status"} = 'FileClosed'; } + } - =item updateMirrorHostTable(lastTransId,lastSeqId) Updates the MirroredTransaction table to reflect the fact that @@ -783,39 +835,40 @@ sub updateMirrorHostTable($$) { my $lastTransId = shift; my $lastSeqId = shift; - if($::slaveInfo->{"status"}==0) { - my $deleteTransactionQuery; - my $deleteResult; - my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" "; - $updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")"; - $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; - - my $updateResult = $masterConn->exec($updateMasterQuery); - unless($updateResult->resultStatus == PGRES_COMMAND_OK) { - my $errorMessage = $masterConn->errorMessage . "\n"; - $errorMessage .= $updateMasterQuery; - logErrorMessage($errorMessage); - die; - } + + + my $deleteTransactionQuery; + my $deleteResult; + my $updateMasterQuery = "INSERT INTO dbmirror_MirroredTransaction "; + $updateMasterQuery .= " (XID,LastSeqId,MirrorHostId)"; + $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; + + my $updateResult = $masterConn->exec($updateMasterQuery); + unless($updateResult->resultStatus == PGRES_COMMAND_OK) { + my $errorMessage = $masterConn->errorMessage . "\n"; + $errorMessage .= $updateMasterQuery; + logErrorMessage($errorMessage); + die; + } # print "Updated slaves to transaction $lastTransId\n" ; # flush STDOUT; - #If this transaction has now been mirrored to all mirror hosts - #then it can be deleted. - $deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"=' - . $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"' - . ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM' - . ' "MirrorHost")'; - - $deleteResult = $masterConn->exec($deleteTransactionQuery); - if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { - logErrorMessage($masterConn->errorMessage . "\n" . - $deleteTransactionQuery); - die; - } - + #If this transaction has now been mirrored to all mirror hosts + #then it can be deleted. + $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID=' + . $lastTransId . ' AND (SELECT COUNT(*) FROM dbmirror_MirroredTransaction' + . ' WHERE XID=' . $lastTransId . ')=(SELECT COUNT(*) FROM' + . ' dbmirror_MirrorHost)'; + + $deleteResult = $masterConn->exec($deleteTransactionQuery); + if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { + logErrorMessage($masterConn->errorMessage . "\n" . + $deleteTransactionQuery); + die; } + + } @@ -889,3 +942,69 @@ sub extractData($$) { return %valuesHash; } + + +sub openTransactionFile($$) +{ + my $slaveInfo = shift; + my $XID =shift; +# my $now_str = localtime; + my $nowsec; + my $nowmin; + my $nowhour; + my $nowmday; + my $nowmon; + my $nowyear; + my $nowwday; + my $nowyday; + my $nowisdst; + ($nowsec,$nowmin,$nowhour,$nowmday,$nowmon,$nowyear,$nowwday,$nowyday,$nowisdst) = + localtime; + my $fileName=sprintf(">%s/%s_%d-%d-%d_%d:%d:%dXID%d.sql", $::slaveInfo->{'TransactionFileDirectory'}, + $::slaveInfo->{"MirrorHostId"},($nowyear+1900),($nowmon+1),$nowmday,$nowhour,$nowmin, + $nowsec,$XID); + + my $xfile; + open($xfile,$fileName) or die "Can't open $fileName : $!"; + + $slaveInfo->{'TransactionFile'} = $xfile; + $slaveInfo->{'status'} = 'FileOpen'; +} + + + +sub openSlaveConnection($) { + my $slavePtr = $_[0]; + my $slaveConn; + + + my $slaveConnString; + if(defined($slavePtr->{"slaveHost"})) + { + $slaveConnString .= "host=" . $slavePtr->{"slaveHost"} . " "; + } + if(defined($slavePtr->{"slavePort"})) + { + $slaveConnString .= "port=" . $slavePtr->{"slavePort"} . " "; + } + + $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"}; + $slaveConnString .= " user=" . $slavePtr->{"slaveUser"}; + $slaveConnString .= " password=" . $slavePtr->{"slavePassword"}; + + $slaveConn = Pg::connectdb($slaveConnString); + + if($slaveConn->status != PGRES_CONNECTION_OK) { + my $errorMessage = "Can't connect to slave database " ; + $errorMessage .= $slavePtr->{"slaveHost"} . "\n"; + $errorMessage .= $slaveConn->errorMessage; + logErrorMessage($errorMessage); + $slavePtr->{"status"} = 'DBFailed'; + } + else { + $slavePtr->{"slaveConn"} = $slaveConn; + $slavePtr->{"status"} = 'DBOpen'; + } + + +} diff --git a/contrib/dbmirror/MirrorSetup.sql b/contrib/dbmirror/MirrorSetup.sql index 4227ca5f39..cae686f929 100644 --- a/contrib/dbmirror/MirrorSetup.sql +++ b/contrib/dbmirror/MirrorSetup.sql @@ -1,43 +1,61 @@ +BEGIN; + +SET autocommit TO 'on'; CREATE FUNCTION "recordchange" () RETURNS trigger AS -'/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C'; +'$libdir/pending.so', 'recordchange' LANGUAGE 'C'; + -CREATE TABLE "MirrorHost" ( -"MirrorHostId" serial, -"HostName" varchar NOT NULL, -PRIMARY KEY("MirrorHostId") -); +CREATE TABLE dbmirror_MirrorHost ( +MirrorHostId serial not null, +SlaveName varchar NOT NULL, +PRIMARY KEY(MirrorHostId) +); -CREATE TABLE "Pending" ( -"SeqId" serial, -"TableName" varchar NOT NULL, -"Op" character, -"XID" int4 NOT NULL, -PRIMARY KEY ("SeqId") +CREATE TABLE dbmirror_Pending ( +SeqId serial, +TableName Name NOT NULL, +Op character, +XID int4 NOT NULL, +PRIMARY KEY (SeqId) ); -CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID"); +CREATE INDEX "dbmirror_Pending_XID_Index" ON dbmirror_Pending (XID); -CREATE TABLE "PendingData" ( -"SeqId" int4 NOT NULL, -"IsKey" bool NOT NULL, -"Data" varchar, -PRIMARY KEY ("SeqId", "IsKey") , -FOREIGN KEY ("SeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE +CREATE TABLE dbmirror_PendingData ( +SeqId int4 NOT NULL, +IsKey bool NOT NULL, +Data varchar, +PRIMARY KEY (SeqId, IsKey) , +FOREIGN KEY (SeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE CASCADE ON DELETE CASCADE ); -CREATE TABLE "MirroredTransaction" ( -"XID" int4 NOT NULL, -"LastSeqId" int4 NOT NULL, -"MirrorHostId" int4 NOT NULL, -PRIMARY KEY ("XID","MirrorHostId"), -FOREIGN KEY ("MirrorHostId") REFERENCES "MirrorHost" ("MirrorHostId") ON UPDATE CASCADE ON DELETE CASCADE, -FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId") ON UPDATE +CREATE TABLE dbmirror_MirroredTransaction ( +XID int4 NOT NULL, +LastSeqId int4 NOT NULL, +MirrorHostId int4 NOT NULL, +PRIMARY KEY (XID,MirrorHostId), +FOREIGN KEY (MirrorHostId) REFERENCES dbmirror_MirrorHost (MirrorHostId) ON UPDATE CASCADE ON DELETE CASCADE, +FOREIGN KEY (LastSeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE CASCADE ON DELETE CASCADE ); + + +UPDATE pg_proc SET proname='nextval_pg' WHERE proname='nextval'; + +CREATE FUNCTION pg_catalog.nextval(text) RETURNS int8 AS +'/usr/local/postgresql-7.4/lib/pending.so', 'nextval' LANGUAGE 'C' STRICT; + + +UPDATE pg_proc set proname='setval_pg' WHERE proname='setval'; + +CREATE FUNCTION pg_catalog.setval(text,int4) RETURNS int8 AS +'/usr/local/postgresql-7.4/lib/pending.so', 'setval' LANGUAGE 'C' STRICT; + +COMMIT; \ No newline at end of file diff --git a/contrib/dbmirror/README.dbmirror b/contrib/dbmirror/README.dbmirror index 993bbb1f94..ea38e3b3ac 100644 --- a/contrib/dbmirror/README.dbmirror +++ b/contrib/dbmirror/README.dbmirror @@ -6,7 +6,7 @@ DBMirror is a database mirroring system developed for the PostgreSQL database Written and maintained by Steven Singer(ssinger@navtechinc.com) -(c) 2001-2002 Navtech Systems Support Inc. +(c) 2001-2004 Navtech Systems Support Inc. ALL RIGHTS RESERVED Permission to use, copy, modify, and distribute this software and its @@ -57,7 +57,7 @@ Pending tables. Requirments: --------------------------------- -PostgreSQL-7.4 (Older versions are no longer supported) --Perl 5.6(Other versions might work) +-Perl 5.6 or 5.8 (Other versions might work) -PgPerl (http://gborg.postgresql.org/project/pgperl/projdisplay.php) @@ -81,13 +81,8 @@ PostgreSQL-7.4 Make Instructions: You should now have a file named pending.so that contains the trigger. -Install this file in /usr/local/pgsql/lib (or another suitable location). +Install this file in your Postgresql lib directory (/usr/local/pgsql/lib) -If you choose a different location the MirrorSetup.sql script will need -to be modified to reflect your new location. The CREATE FUNCTION command -in the MirrorSetup.sql script associates the trigger function with the -pending.so shared library. Modify the arguments to this command if you -choose to install the trigger elsewhere. 2) Run MirrorSetup.sql @@ -95,7 +90,8 @@ This file contains SQL commands to setup the Mirroring environment. This includes -Telling PostgreSQL about the "recordchange" trigger function. --Creating the Pending,PendingData, MirrorHost, MirroredTransaction tables +-Creating the dbmirror_Pending,dbmirror_PendingData,dbmirror_MirrorHost, +dbmirror_MirroredTransaction tables To execute the script use psql as follows @@ -114,17 +110,34 @@ DBMirror.pl script. See slaveDatabase.conf for a sample. The master settings refer to the master database(The one that is being mirrored). -The slave settings refer to the database that the data is being mirrored to. -The slaveHost parameter must refer to the machine name of the slave (Either -a resolvable hostname or an IP address). The value for slave host -must match the Hostname field in the MirrorHost table(See step 6). +The slave settings refer to the database that the data is being +mirrored to. -The master user must have sufficient permissions to modify the Pending -tables and to read all of the tables being mirrored. +The slaveName setting in the configuration file must match the slave +name specified in the dbmirror_MirrorHost table. + +DBMirror.pl can be run in two modes of operation: + + A) It can connect directly to the slave database. To do this specify + a slave database name and optional host and port along with a username + and password. See slaveDatabase.conf for details. + + + The master user must have sufficient permissions to modify the Pending + tables and to read all of the tables being mirrored. + + The slave user must have enough permissions on the slave database to + modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being + mirrored. + + B) The SQL statements that should be executed on the slave can be + written to files which can then be executed slave database through + psql. This would be suitable for setups where their is no direct + connection between the slave database and the master. A file is + generated for each transaction in the directory specified by + TransactionFileDirectory. The file name contains the date/time the + file was created along with the transaction id. -The slave user must have enough permissions on the slave database to -modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being -mirrored. 4) Add the trigger to tables. @@ -153,7 +166,7 @@ The name of the host in the MirrorHost table must exactly match the slaveHost variable for that slave in the configuration file. For example -INSERT INTO "MirrorHost" ("HostName") VALUES ('mySlaveMachine.mycompany.com'); +INSERT INTO "MirrorHost" ("SlaveName") VALUES ('backup_system'); 6) Start DBMirror.pl @@ -171,7 +184,8 @@ Any errors are printed to standard out and emailed to the address specified in the configuration file. DBMirror can be run from the master, the slave, or a third machine as long -as it is able to access both the master and slave databases. +as it is able to access both the master and slave databases(not +required if SQL files are being generated) 7) Periodically run clean_pending.pl clean_pending.pl cleans out any entries from the Pending tables that @@ -194,11 +208,28 @@ TODO(Current Limitations) ---------- -Support for selective mirroring based on the content of data. -Support for BLOB's. --Support for conflict resolution. --Batching SQL commands in DBMirror for better performance over WAN's. +-Support for multi-master mirroring with conflict resolution. -Better support for dealing with Schema changes. + +Significant Changes Since 7.4 +---------------- +-Support for mirroring SEQUENCE's +-Support for unix domain sockets +-Support for outputting slave SQL statements to a file +-Changed the names of replication tables are now named +dbmirror_pending etc.. + + + +Credits +----------- +Achilleus Mantzios + + + + Steven Singer Navtech Systems Support Inc. ssinger@navtechinc.com diff --git a/contrib/dbmirror/pending.c b/contrib/dbmirror/pending.c index 4703d30f3c..24fb71b9e2 100644 --- a/contrib/dbmirror/pending.c +++ b/contrib/dbmirror/pending.c @@ -1,6 +1,7 @@ /**************************************************************************** * pending.c - * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.15 2003/11/29 22:39:19 pgsql Exp $ + * $Id: pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $ + * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $ * * This file contains a trigger for Postgresql-7.x to record changes to tables * to a pending table for mirroring. @@ -34,35 +35,60 @@ #include #include #include +#include + enum FieldUsage { PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE }; int storePending(char *cpTableName, HeapTuple tBeforeTuple, - HeapTuple tAfterTuple, - TupleDesc tTupdesc, - TriggerData *tpTrigdata, char cOp); + HeapTuple tAfterTuple, + TupleDesc tTupdesc, + Oid tableOid, + char cOp); + + int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, - TriggerData *tpTrigdata); -int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, - TriggerData *tpTrigData, int iIncludeKeyData); + Oid tableOid); +int storeData(char *cpTableName, HeapTuple tTupleData, + TupleDesc tTupleDesc,Oid tableOid,int iIncludeKeyData); int2vector *getPrimaryKey(Oid tblOid); -char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, - TriggerData *tTrigData, +char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid, enum FieldUsage eKeyUsage); + #define BUFFER_SIZE 256 #define MAX_OID_LEN 10 -/*#define DEBUG_OUTPUT 1 */ +#define DEBUG_OUTPUT 1 extern Datum recordchange(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(recordchange); +#if defined DEBUG_OUTPUT +#define debug_msg2(x,y) elog(NOTICE,x,y) +#define debug_msg(x) elog(NOTICE,x) +#define debug_msg3(x,y,z) elog(NOTICE,x,y,z) +#else +#define debug_msg2(x,y) +#define debug_msg(x) +#define debug_msg(x,y,z) + +#endif + + + +extern Datum nextval(PG_FUNCTION_ARGS); +extern Datum setval(PG_FUNCTION_ARGS); + +int saveSequenceUpdate(const text * sequenceName, + int nextSequenceValue); + + /***************************************************************************** * The entry point for the trigger function. * The Trigger takes a single SQL 'text' argument indicating the name of the @@ -81,13 +107,15 @@ recordchange(PG_FUNCTION_ARGS) char op = 0; char *schemaname; char *fullyqualtblname; + char *pkxpress=NULL; if (fcinfo->context != NULL) { if (SPI_connect() < 0) { - elog(NOTICE, "storePending could not connect to SPI"); + ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("dbmirror:recordchange could not connect to SPI"))); return -1; } trigdata = (TriggerData *) fcinfo->context; @@ -124,8 +152,15 @@ recordchange(PG_FUNCTION_ARGS) beforeTuple = trigdata->tg_trigtuple; op = 'd'; } + else + { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("dbmirror:recordchange Unknown operation"))); + + } - if (storePending(fullyqualtblname, beforeTuple, afterTuple, tupdesc, trigdata, op)) + if (storePending(fullyqualtblname, beforeTuple, afterTuple, + tupdesc, retTuple->t_tableOid, op)) { /* An error occoured. Skip the operation. */ ereport(ERROR, @@ -135,10 +170,11 @@ recordchange(PG_FUNCTION_ARGS) return PointerGetDatum(NULL); } -#if defined DEBUG_OUTPUT - elog(NOTICE, "returning on success"); -#endif + debug_msg("dbmirror:recordchange returning on success"); + SPI_pfree(fullyqualtblname); + if(pkxpress != NULL) + SPI_pfree(pkxpress); SPI_finish(); return PointerGetDatum(retTuple); } @@ -160,41 +196,45 @@ int storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupDesc, - TriggerData *tpTrigData, char cOp) + Oid tableOid, + char cOp) { - char *cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)"; + char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID) VALUES ($1,$2,$3)"; int iResult = 0; HeapTuple tCurTuple; + char nulls[3]=" "; /* Points the current tuple(before or after) */ - Datum saPlanData[4]; - Oid taPlanArgTypes[3] = {NAMEOID, CHAROID, INT4OID}; + Datum saPlanData[3]; + Oid taPlanArgTypes[4] = {NAMEOID, + CHAROID, + INT4OID}; void *vpPlan; tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; - vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes); if (vpPlan == NULL) - elog(NOTICE, "error creating plan"); - /* SPI_saveplan(vpPlan); */ + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("dbmirror:storePending error creating plan"))); + saPlanData[0] = PointerGetDatum(cpTableName); saPlanData[1] = CharGetDatum(cOp); saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); - - iResult = SPI_execp(vpPlan, saPlanData, NULL, 1); + iResult = SPI_execp(vpPlan, saPlanData, nulls, 1); if (iResult < 0) - elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult); + elog(NOTICE, "storedPending fired (%s) returned %d", + cpQueryBase, iResult); -#if defined DEBUG_OUTPUT - elog(NOTICE, "row successfully stored in pending table"); -#endif + + debug_msg("dbmirror:storePending row successfully stored in pending table"); + if (cOp == 'd') { @@ -202,7 +242,8 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple, * This is a record of a delete operation. * Just store the key data. */ - iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); + iResult = storeKeyInfo(cpTableName, + tBeforeTuple, tTupDesc, tableOid); } else if (cOp == 'i') { @@ -210,20 +251,22 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple, * An Insert operation. * Store all data */ - iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tpTrigData, TRUE); + iResult = storeData(cpTableName, tAfterTuple, + tTupDesc, tableOid,TRUE); } else { /* op must be an update. */ - iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); - iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc, - tpTrigData, TRUE); + iResult = storeKeyInfo(cpTableName, tBeforeTuple, + tTupDesc, tableOid); + iResult = iResult ? iResult : + storeData(cpTableName, tAfterTuple, tTupDesc, + tableOid,TRUE); } -#if defined DEBUG_OUTPUT - elog(NOTICE, "done storing keyinfo"); -#endif + + debug_msg("dbmirror:storePending done storing keyinfo"); return iResult; @@ -231,12 +274,11 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple, int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, - TupleDesc tTupleDesc, - TriggerData *tpTrigData) + TupleDesc tTupleDesc, Oid tableOid) { Oid saPlanArgTypes[1] = {NAMEOID}; - char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)"; + char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)"; void *pplan; Datum saPlanData[1]; char *cpKeyData; @@ -250,7 +292,7 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, } /* pplan = SPI_saveplan(pplan); */ - cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, PRIMARY); + cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, PRIMARY); if (cpKeyData == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), @@ -258,9 +300,9 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, errmsg("there is no PRIMARY KEY for table %s", cpTableName))); -#if defined DEBUG_OUTPUT - elog(NOTICE, "key data: %s", cpKeyData); -#endif + + debug_msg2("dbmirror:storeKeyInfo key data: %s", cpKeyData); + saPlanData[0] = PointerGetDatum(cpKeyData); iRetCode = SPI_execp(pplan, saPlanData, NULL, 1); @@ -270,12 +312,12 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, if (iRetCode != SPI_OK_INSERT) { - elog(NOTICE, "error inserting row in pendingDelete"); + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION) + ,errmsg("error inserting row in pendingDelete"))); return -1; } -#if defined DEBUG_OUTPUT - elog(NOTICE, "insert successful"); -#endif + + debug_msg("insert successful"); return 0; @@ -318,12 +360,12 @@ getPrimaryKey(Oid tblOid) * Stores a copy of the non-key data for the row. *****************************************************************************/ int -storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, - TriggerData *tpTrigData, int iIncludeKeyData) +storeData(char *cpTableName, HeapTuple tTupleData, + TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData) { Oid planArgTypes[1] = {NAMEOID}; - char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)"; + char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)"; void *pplan; Datum planData[1]; char *cpKeyData; @@ -338,9 +380,10 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, /* pplan = SPI_saveplan(pplan); */ if (iIncludeKeyData == 0) - cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, NONPRIMARY); + cpKeyData = packageData(tTupleData, tTupleDesc, + tableOid, NONPRIMARY); else - cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, ALL); + cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, ALL); planData[0] = PointerGetDatum(cpKeyData); iRetValue = SPI_execp(pplan, planData, NULL, 1); @@ -353,9 +396,9 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, elog(NOTICE, "error inserting row in pendingDelete"); return -1; } -#if defined DEBUG_OUTPUT - elog(NOTICE, "insert successful"); -#endif + + debug_msg("dbmirror:storeKeyData insert successful"); + return 0; @@ -376,8 +419,7 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, * ALL implies include all fields. */ char * -packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, - TriggerData *tpTrigData, +packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid, enum FieldUsage eKeyUsage) { int iNumCols; @@ -391,14 +433,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, if (eKeyUsage != ALL) { - tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id); + tpPKeys = getPrimaryKey(tableOid); if (tpPKeys == NULL) return NULL; } -#if defined DEBUG_OUTPUT + if (tpPKeys != NULL) - elog(NOTICE, "have primary keys"); -#endif + { + debug_msg("dbmirror:packageData have primary keys"); + + } + cpDataBlock = SPI_palloc(BUFFER_SIZE); iDataBlockSize = BUFFER_SIZE; iUsedDataBlock = 0; /* To account for the null */ @@ -417,49 +462,58 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, { /* Determine if this is a primary key or not. */ iIsPrimaryKey = 0; - for (iPrimaryKeyIndex = 0; (*tpPKeys)[iPrimaryKeyIndex] != 0; + for (iPrimaryKeyIndex = 0; + (*tpPKeys)[iPrimaryKeyIndex] != 0; iPrimaryKeyIndex++) { - if ((*tpPKeys)[iPrimaryKeyIndex] == iColumnCounter) + if ((*tpPKeys)[iPrimaryKeyIndex] + == iColumnCounter) { iIsPrimaryKey = 1; break; } } - if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY)) + if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : + (eKeyUsage != NONPRIMARY)) { /** * Don't use. */ -#if defined DEBUG_OUTPUT - elog(NOTICE, "skipping column"); -#endif + + debug_msg("dbmirror:packageData skipping column"); + continue; } } /* KeyUsage!=ALL */ -#ifndef NODROPCOLUMN - if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped) - { - /** - * This column has been dropped. - * Do not mirror it. - */ - continue; - } -#endif - cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs - [iColumnCounter - 1]->attname)); -#if defined DEBUG_OUTPUT - elog(NOTICE, "field name: %s", cpFieldName); -#endif - while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6) + + if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped) + { + /** + * This column has been dropped. + * Do not mirror it. + */ + continue; + } + + cpFieldName = DatumGetPointer(NameGetDatum + + (&tTupleDesc->attrs + [iColumnCounter - 1]->attname)); + + debug_msg2("dbmirror:packageData field name: %s", cpFieldName); + + while (iDataBlockSize - iUsedDataBlock < + strlen(cpFieldName) + 6) { - cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); + cpDataBlock = SPI_repalloc(cpDataBlock, + iDataBlockSize + + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; } sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName); iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3; - cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter); + cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, + iColumnCounter); cpUnFormatedPtr = cpFieldData; cpFormatedPtr = cpDataBlock + iUsedDataBlock; @@ -477,15 +531,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, continue; } -#if defined DEBUG_OUTPUT - elog(NOTICE, "field data: \"%s\"", cpFieldData); - elog(NOTICE, "starting format loop"); -#endif + debug_msg2("dbmirror:packageData field data: \"%s\"", + cpFieldData); + debug_msg("dbmirror:packageData starting format loop"); + while (*cpUnFormatedPtr != 0) { while (iDataBlockSize - iUsedDataBlock < 2) { - cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); + cpDataBlock = SPI_repalloc(cpDataBlock, + iDataBlockSize + + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } @@ -505,25 +561,218 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, while (iDataBlockSize - iUsedDataBlock < 3) { - cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); + cpDataBlock = SPI_repalloc(cpDataBlock, + iDataBlockSize + + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } sprintf(cpFormatedPtr, "' "); iUsedDataBlock = iUsedDataBlock + 2; -#if defined DEBUG_OUTPUT - elog(NOTICE, "data block: \"%s\"", cpDataBlock); -#endif + + debug_msg2("dbmirror:packageData data block: \"%s\"", + cpDataBlock); } /* for iColumnCounter */ if (tpPKeys != NULL) SPI_pfree(tpPKeys); -#if defined DEBUG_OUTPUT - elog(NOTICE, "returning DataBlockSize:%d iUsedDataBlock:%d", iDataBlockSize, - iUsedDataBlock); -#endif + + debug_msg3("dbmirror:packageData returning DataBlockSize:%d iUsedDataBlock:%d", + iDataBlockSize, + iUsedDataBlock); + memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock); return cpDataBlock; } + + +PG_FUNCTION_INFO_V1(setval); + +Datum setval(PG_FUNCTION_ARGS) +{ + + + text * sequenceName; + + Oid setvalArgTypes[2] = {TEXTOID,INT4OID}; + int nextValue; + void * setvalPlan=NULL; + Datum setvalData[2]; + const char * setvalQuery = "SELECT setval_pg($1,$2)"; + int ret; + + sequenceName = PG_GETARG_TEXT_P(0); + nextValue = PG_GETARG_INT32(1); + + setvalData[0] = PointerGetDatum(sequenceName); + setvalData[1] = Int32GetDatum(nextValue); + + if (SPI_connect() < 0) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:setval could not connect to SPI"))); + return -1; + } + + setvalPlan = SPI_prepare(setvalQuery,2,setvalArgTypes); + if(setvalPlan == NULL) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:setval could not prepare plan"))); + return -1; + } + + ret = SPI_execp(setvalPlan,setvalData,NULL,1); + + if(ret != SPI_OK_SELECT || SPI_processed != 1) + return -1; + + debug_msg2("dbmirror:setval: setval_pg returned ok:%d",nextValue); + + ret = saveSequenceUpdate(sequenceName,nextValue); + + SPI_pfree(setvalPlan); + + SPI_finish(); + debug_msg("dbmirror:setval about to return"); + return Int64GetDatum(nextValue); + +} + + + +PG_FUNCTION_INFO_V1(nextval); + +Datum +nextval(PG_FUNCTION_ARGS) +{ + text * sequenceName; + + const char * nextvalQuery = "SELECT nextval_pg($1)"; + Oid nextvalArgTypes[1] = {TEXTOID}; + void * nextvalPlan=NULL; + Datum nextvalData[1]; + + + int ret; + HeapTuple resTuple; + char isNull; + int nextSequenceValue; + + + + debug_msg("dbmirror:nextval Starting pending.so:nextval"); + + + sequenceName = PG_GETARG_TEXT_P(0); + + if (SPI_connect() < 0) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:nextval could not connect to SPI"))); + return -1; + } + + nextvalPlan = SPI_prepare(nextvalQuery,1,nextvalArgTypes); + + + debug_msg("prepared plan to call nextval_pg"); + + + if(nextvalPlan==NULL) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:nextval error creating plan"))); + return -1; + } + nextvalData[0] = PointerGetDatum(sequenceName); + + ret = SPI_execp(nextvalPlan,nextvalData,NULL,1); + + debug_msg("dbmirror:Executed call to nextval_pg"); + + + if(ret != SPI_OK_SELECT || SPI_processed != 1) + return -1; + + resTuple = SPI_tuptable->vals[0]; + + debug_msg("dbmirror:nextval Set resTuple"); + + nextSequenceValue =*(DatumGetPointer(SPI_getbinval(resTuple, + SPI_tuptable->tupdesc, + 1,&isNull))); + + + + debug_msg2("dbmirror:nextval Set SPI_getbinval:%d",nextSequenceValue); + + + saveSequenceUpdate(sequenceName,nextSequenceValue); + SPI_pfree(resTuple); + SPI_pfree(nextvalPlan); + + SPI_finish(); + + return Int64GetDatum(nextSequenceValue); +} + + +int +saveSequenceUpdate(const text * sequenceName, + int nextSequenceVal) +{ + + Oid insertArgTypes[2] = {TEXTOID,INT4OID}; + Oid insertDataArgTypes[1] = {NAMEOID}; + void * insertPlan=NULL; + void * insertDataPlan=NULL; + Datum insertDatum[2]; + Datum insertDataDatum[1]; + char nextSequenceText[32]; + + const char * insertQuery = + "INSERT INTO dbmirror_Pending (TableName,Op,XID) VALUES" \ + "($1,'s',$2)"; + const char * insertDataQuery = + "INSERT INTO dbmirror_PendingData(SeqId,IsKey,Data) VALUES " \ + "(currval('dbmirror_pending_seqid_seq'),'t',$1)"; + + int ret; + + + insertPlan = SPI_prepare(insertQuery,2,insertArgTypes); + insertDataPlan = SPI_prepare(insertDataQuery,1,insertDataArgTypes); + + debug_msg("Prepared insert query"); + + + if(insertPlan == NULL || insertDataPlan == NULL) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("dbmirror:nextval error creating plan"))); + } + + insertDatum[1] = Int32GetDatum(GetCurrentTransactionId()); + insertDatum[0] = PointerGetDatum(sequenceName); + + sprintf(nextSequenceText,"%d",nextSequenceVal); + insertDataDatum[0] = PointerGetDatum(nextSequenceText); + debug_msg2("dbmirror:savesequenceupdate: Setting value %s", + nextSequenceText); + + debug_msg("dbmirror:About to execute insert query"); + + ret = SPI_execp(insertPlan,insertDatum,NULL,1); + + ret = SPI_execp(insertDataPlan,insertDataDatum,NULL,1); + + debug_msg("dbmirror:Insert query finished"); + SPI_pfree(insertPlan); + SPI_pfree(insertDataPlan); + + return ret; + +} + -- GitLab