提交 675cb961 编写于 作者: P Pengcheng Tang

Adding segment fault detection for gpcrondump.

When the segment dump process crashes, master ends up waiting
infinitely. So adding a fault detection mechanism to periodically
check if the segment dump process is alive.

Fix DDBoost restore to retrieve dump file from Data Domain Server;

DDBoost restore incorrectly assumes that the post schema dump file is
located locally, and failed the restore process.

Gpcrondump now sets the correct schema for schema altered sub partition tables;

If sub partition tables got altered and set to different schema,
gpcrondump incorrectly sets the schema for these modified tables.

Fix the usage of fmtId() to correctly consume the returned value;

fmtId() reuses the same buffer; so before calling fmtId() for the 2nd time,
consume the value in the returned buffer.

Fix restore failure when exchange partition with external tables has 'LOG ERRORS' statement.

If an external table has a "LOG ERRORS" statement in the definition, it explicitly
creates the error table. Hence, restore fails due to name conflicts with the subpartition table name.

Authors : Pengcheng Tang, Nikhil Kak, Isobel Redelmeier
上级 239d6124
......@@ -54,8 +54,8 @@ all: submake-libpq submake-libpgport cdb_dump cdb_dump_agent cdb_restore cdb_res
cdb_dump: cdb_dump.o cdb_backup_status.o cdb_seginst.o cdb_backup_state.o cdb_table.o cdb_backup_archiver.o cdb_dump_util.o cdb_dump_include.o $(PGDUMP_DIR)/common.o $(OBJS) $(KEYWRDOBJS) $(libpq_builddir)/libpq.a $(EXTRA_OBJS)
$(CC) $(CFLAGS) cdb_dump.o cdb_backup_status.o cdb_seginst.o cdb_backup_state.o cdb_table.o cdb_backup_archiver.o cdb_dump_util.o cdb_dump_include.o $(PGDUMP_DIR)/common.o $(OBJS) $(KEYWRDOBJS) $(libpq) $(LDFLAGS) $(LIBS) $(DDBOOSTLIB) -o $@
cdb_dump_agent: cdb_dump_agent.o $(PGDUMP_DIR)/common.o $(PGDUMP_DIR)/pg_dump_sort.o cdb_backup_archiver.o cdb_dump_util.o cdb_seginst.o cdb_table.o cdb_backup_status.o cdb_dump_include.o $(OBJS) $(KEYWRDOBJS) $(libpq_builddir)/libpq.a
$(CC) $(CFLAGS) cdb_dump_agent.o $(PGDUMP_DIR)/common.o $(PGDUMP_DIR)/pg_dump_sort.o cdb_dump_util.o cdb_seginst.o cdb_table.o cdb_backup_archiver.o cdb_backup_status.o cdb_dump_include.o $(OBJS) $(KEYWRDOBJS) $(libpq) $(LDFLAGS) $(LIBS) $(DDBOOSTLIB) -o $@
cdb_dump_agent: cdb_dump_agent.o $(PGDUMP_DIR)/common.o $(PGDUMP_DIR)/pg_dump_sort.o cdb_backup_archiver.o cdb_dump_util.o cdb_seginst.o cdb_table.o cdb_backup_status.o cdb_backup_state.o cdb_dump_include.o $(OBJS) $(KEYWRDOBJS) $(libpq_builddir)/libpq.a
$(CC) $(CFLAGS) cdb_dump_agent.o $(PGDUMP_DIR)/common.o $(PGDUMP_DIR)/pg_dump_sort.o cdb_dump_util.o cdb_seginst.o cdb_table.o cdb_backup_archiver.o cdb_backup_status.o cdb_backup_state.o cdb_dump_include.o $(OBJS) $(KEYWRDOBJS) $(libpq) $(LDFLAGS) $(LIBS) $(DDBOOSTLIB) -o $@
#cdb_dumpall_agent: cdb_dumpall_agent.o ../common.o cdb_backup_archiver.o cdb_dump_util.o cdb_seginst.o cdb_table.o cdb_backup_status.o $(OBJS) $(KEYWRDOBJS) $(libpq_builddir)/libpq.a
# $(CC) $(CFLAGS) cdb_dumpall_agent.o cdb_backup_archiver.o $(OBJS) $(KEYWRDOBJS) $(libpq) $(LDFLAGS) $(LIBS) -o $@
......
......@@ -11,6 +11,7 @@
#include "postgres_fe.h"
#include <assert.h>
#include "libpq-fe.h"
#include "pqexpbuffer.h"
#include "cdb_backup_status.h"
#include "cdb_dump_util.h"
#include "cdb_backup_state.h"
......@@ -92,6 +93,8 @@ CreateBackupStateMachine(const char *pszKey, int instid, int segid)
pStateMachine->pszNotifyRelNameGotLocks = MakeString("%s_%s", pStateMachine->pszNotifyRelName, SUFFIX_GOTLOCKS);
pStateMachine->pszNotifyRelNameSucceed = MakeString("%s_%s", pStateMachine->pszNotifyRelName, SUFFIX_SUCCEED);
pStateMachine->pszNotifyRelNameFail = MakeString("%s_%s", pStateMachine->pszNotifyRelName, SUFFIX_FAIL);
pStateMachine->pszNotifyRelNameMasterProbe = MakeString("%s_%s", pStateMachine->pszNotifyRelName, SUFFIX_MASTER_PROBE);
pStateMachine->pszNotifyRelNameSegmentProbe = MakeString("%s_%s", pStateMachine->pszNotifyRelName, SUFFIX_SEGMENT_PROBE);
pStateMachine->nArCount = 0;
pStateMachine->nArSize = 10;
......
......@@ -34,18 +34,20 @@ typedef enum backup_state
*/
typedef struct backup_state_machine
{
BackupState currentState;
BackupState currentState;
int nWaits;
bool bStatus;
bool bReceivedSetSerializable;
bool bReceivedGotLocks;
char *pszNotifyRelName; /* "N<backupkey>_<contentid>_<dbid>" */
char *pszNotifyRelNameStart; /* "<pszNotifyRelName>_Start" */
char* pszNotifyRelNameSetSerializable; /* "<pszNotifyRelName>_SetSerializable" */
char* pszNotifyRelNameGotLocks; /* "<pszNotifyRelName>_SetGotLocks" */
char* pszNotifyRelNameSucceed; /* "<pszNotifyRelName>_Success" */
char *pszNotifyRelNameFail; /* "<pszNotifyRelName>_Fail" */
PGnotify **ppNotifyAr;
char *pszNotifyRelName; /* "N<backupkey>_<contentid>_<dbid>" */
char *pszNotifyRelNameStart; /* "<pszNotifyRelName>_Start" */
char *pszNotifyRelNameSetSerializable; /* "<pszNotifyRelName>_SetSerializable" */
char *pszNotifyRelNameGotLocks; /* "<pszNotifyRelName>_SetGotLocks" */
char *pszNotifyRelNameSucceed; /* "<pszNotifyRelName>_Success" */
char *pszNotifyRelNameFail; /* "<pszNotifyRelName>_Fail" */
char *pszNotifyRelNameMasterProbe; /* "<pszNotifyRelName>_MasterProbe" */
char *pszNotifyRelNameSegmentProbe; /* "<pszNotifyRelName>_SegmentProbe" */
PGnotify **ppNotifyAr;
int nArSize;
int nArCount;
} BackupStateMachine;
......
......@@ -63,6 +63,8 @@ typedef struct status_op_list
#define SUFFIX_GOTLOCKS "GotLocks"
#define SUFFIX_SUCCEED "Succeed"
#define SUFFIX_FAIL "Fail"
#define SUFFIX_MASTER_PROBE "MasterProbe"
#define SUFFIX_SEGMENT_PROBE "SegmentProbe"
/* Adds a StatusOp* to the list */
extern void AddToStatusOpList(StatusOpList * pList, StatusOp * pOp);
......
......@@ -141,7 +141,6 @@ static const char *logError = "ERROR";
const char *progname;
static char * addPassThroughLongParm(const char *Parm, const char *pszValue, char *pszPassThroughParmString);
static char * shellEscape(const char *shellArg, PQExpBuffer escapeBuf);
static int dd_boost_enabled = 1;
extern ddp_client_info_t dd_client_info;
......@@ -900,48 +899,6 @@ addPassThroughLongParm(const char *Parm, const char *pszValue, char *pszPassThro
return pszRtn;
}
/*
* shellEscape: Returns a string in which the shell-significant quoted-string characters are
* escaped. The resulting string, if used as a SQL statement component, should be quoted
* using the PG $$ delimiter (or as an E-string with the '\' characters escaped again).
*
* This function escapes the following characters: '"', '$', '`', '\', '!'.
*
* The PQExpBuffer escapeBuf is used for assembling the escaped string and is reset at the
* start of this function.
*
* The return value of this function is the data area from excapeBuf.
*/
static char *
shellEscape(const char *shellArg, PQExpBuffer escapeBuf)
{
const char *s = shellArg;
const char escape = '\\';
resetPQExpBuffer(escapeBuf);
/*
* Copy the shellArg into the escapeBuf prepending any characters
* requiring an escape with the escape character.
*/
while (*s != '\0')
{
switch (*s)
{
case '"':
case '$':
case '\\':
case '`':
case '!':
appendPQExpBufferChar(escapeBuf, escape);
}
appendPQExpBufferChar(escapeBuf, *s);
s++;
}
return escapeBuf->data;
}
int
readFromDDFile(FILE *fp, char *ddBoostFileName)
{
......
......@@ -26,6 +26,7 @@
#include "cdb_dump.h"
#include "cdb_dump_include.h"
#include <poll.h>
#include <time.h>
/* This is necessary on platforms where optreset variable is not available.
* Look at "man getopt" documentation for details.
......@@ -41,7 +42,6 @@ int optreset;
*/
static char *addPassThroughParm(char Parm, const char *pszValue, char *pszPassThroughParmString);
static char *addPassThroughLongParm(const char *Parm, const char *pszValue, char *pszPassThroughParmString);
static char *shellEscape(const char *shellArg, PQExpBuffer escapeBuf);
static bool createThreadParmArray(int nCount, ThreadParmArray * pParmAr);
static void decrementFinishedLaunchCount(void);
static void decrementFinishedLockingCount(void);
......@@ -73,6 +73,8 @@ extern int optind,
opterr;
static int dump_inserts; /* dump data using proper insert strings */
static int column_inserts; /* put attr names into insert strings */
static int max_probe_retries = 10;
static int probe_interval = 60; /* 1 minute interval */
static char *dumpencoding = NULL;
static bool schemaOnly;
static bool incremental_backup;
......@@ -503,48 +505,6 @@ copyFilesToSegments(InputOptions *pInputOpts, SegmentDatabaseArray *segDBAr)
return true;
}
/*
* shellEscape: Returns a string in which the shell-significant quoted-string characters are
* escaped. The resulting string, if used as a SQL statement component, should be quoted
* using the PG $$ delimiter (or as an E-string with the '\' characters escaped again).
*
* This function escapes the following characters: '"', '$', '`', '\', '!'.
*
* The PQExpBuffer escapeBuf is used for assembling the escaped string and is reset at the
* start of this function.
*
* The return value of this function is the data area from excapeBuf.
*/
static char *
shellEscape(const char *shellArg, PQExpBuffer escapeBuf)
{
const char *s = shellArg;
const char escape = '\\';
resetPQExpBuffer(escapeBuf);
/*
* Copy the shellArg into the escapeBuf prepending any characters
* requiring an escape with the escape character.
*/
while (*s != '\0')
{
switch (*s)
{
case '"':
case '$':
case '\\':
case '`':
case '!':
appendPQExpBufferChar(escapeBuf, escape);
}
appendPQExpBufferChar(escapeBuf, *s);
s++;
}
return escapeBuf->data;
}
/*
* createThreadParmArray: This function initializes the count and pData elements of the ThreadParmArray.
......@@ -1961,6 +1921,15 @@ spinOffThreads(PGconn *pConn,
mpp_msg(logInfo, progname, "Committing transaction on the master database, thereby releasing locks.\n");
execCommit(pConn);
/* The main purpose of probing is to detect segment crashes during the
* the dump process, to prevent master from keeping pg_class lock
* or waiting for status update from segment infinitely. So once the
* pg_class lock has been released, we can increase the probing interval
* to reduce the overhead.
*/
max_probe_retries = 5;
probe_interval = 120; /* 2 minute interval */
// Create a file on the master to signal to gpcrondump that it can release its pg_class lock
// Only do this if no-lock is passed, otherwise it can cause problems if gp_dump is called directly and does its own locks
if (pParm->pTargetSegDBData->role == ROLE_MASTER && no_lock)
......@@ -2021,6 +1990,8 @@ threadProc(void *arg)
bool decrementedLunchCount = false;
bool decrementedLockCount = false;
time_t now, last;
/*
* The argument is a pointer to a ThreadParm structure that stays around
* for the entire time the program is running. so we need not worry about
......@@ -2040,8 +2011,10 @@ threadProc(void *arg)
bool bSentCancelMessage;
BackupStateMachine *pState;
PGnotify *pNotify;
int pollResult = 0;
int pollTimeout;
int retryCnt = 0;
bool notifyReceived = false;
int pollResult = 0;
int pollTimeout;
struct pollfd *pollInput;
/*
......@@ -2074,6 +2047,7 @@ threadProc(void *arg)
DoCancelNotifyListen(pConn, true, pszKey, pSegDB->role, pSegDB->dbid, -1, SUFFIX_GOTLOCKS);
DoCancelNotifyListen(pConn, true, pszKey, pSegDB->role, pSegDB->dbid, -1, SUFFIX_SUCCEED);
DoCancelNotifyListen(pConn, true, pszKey, pSegDB->role, pSegDB->dbid, -1, SUFFIX_FAIL);
DoCancelNotifyListen(pConn, true, pszKey, pSegDB->role, pSegDB->dbid, -1, SUFFIX_SEGMENT_PROBE);
mpp_msg(logInfo, progname, "Listening for messages from server on dbid %d connection\n", pSegDB->dbid);
......@@ -2156,6 +2130,9 @@ threadProc(void *arg)
* receiving these notifications
*/
time(&now);
time(&last);
while (!IsFinalState(pState))
{
/*
......@@ -2173,6 +2150,7 @@ threadProc(void *arg)
*/
DoCancelNotifyListen(pConn, false, pszKey, pSegDB->role, pSegDB->dbid, -1, NULL);
bSentCancelMessage = true;
goto cleanup;
}
/* Replacing select() by poll() here to overcome the limitations of
......@@ -2191,21 +2169,7 @@ threadProc(void *arg)
pParm->pszErrorMsg = MakeString("poll failed for backup key %s, role %d, dbid %d failed\n",
pszKey, pSegDB->role, pSegDB->dbid);
mpp_err_msg(logFatal, progname, pParm->pszErrorMsg);
PQfinish(pConn);
DestroyBackupStateMachine(pState);
if (!decrementedLunchCount)
{
decrementFinishedLaunchCount();
decrementedLunchCount = true;
}
if (!decrementedLockCount)
{
decrementFinishedLockingCount();
decrementedLockCount = true;
}
return NULL;
goto cleanup;
}
/* See whether the connection went down */
......@@ -2215,21 +2179,7 @@ threadProc(void *arg)
pParm->pszErrorMsg = MakeString("connection went down for backup key %s, role %d, dbid %d\n",
pszKey, pSegDB->role, pSegDB->dbid);
mpp_err_msg(logError, progname, pParm->pszErrorMsg);
PQfinish(pConn);
DestroyBackupStateMachine(pState);
if (!decrementedLunchCount)
{
decrementFinishedLaunchCount();
decrementedLunchCount = true;
}
if (!decrementedLockCount)
{
decrementFinishedLockingCount();
decrementedLockCount = true;
}
return NULL;
goto cleanup;
}
/* try to get any notification from the server */
......@@ -2245,7 +2195,9 @@ threadProc(void *arg)
* expect
*/
if (strncasecmp(pState->pszNotifyRelName, pNotify->relname,
strlen(pState->pszNotifyRelName)) == 0)
strlen(pState->pszNotifyRelName)) == 0 &&
strncasecmp(pState->pszNotifyRelNameSegmentProbe, pNotify->relname,
strlen(pState->pszNotifyRelNameSegmentProbe)))
{
/* add this notification to our state notification array */
if (!AddNotificationtoBackupStateMachine(pState, pNotify))
......@@ -2253,23 +2205,11 @@ threadProc(void *arg)
g_b_SendCancelMessage = true;
pParm->pszErrorMsg = MakeString("error allocating memory for Greenplum Database backup\n");
mpp_err_msg(logError, progname, pParm->pszErrorMsg);
PQfinish(pConn);
DestroyBackupStateMachine(pState);
if (!decrementedLunchCount)
{
decrementFinishedLaunchCount();
decrementedLunchCount = true;
}
if (!decrementedLockCount)
{
decrementFinishedLockingCount();
decrementedLockCount = true;
}
return NULL;
goto cleanup;
}
}
/* As long as the segment agent is responding with notifications, don't try cancelling */
notifyReceived = true;
}
ProcessInput(pState);
......@@ -2285,22 +2225,36 @@ threadProc(void *arg)
decrementedLockCount = true;
}
}
time(&now);
/*
* make sure to decrement if we haven't already, to release mutex if in
* error state
*/
if (!decrementedLunchCount)
{
decrementFinishedLaunchCount();
decrementedLunchCount = true;
}
/* Wait here if no response received from segment. Once we reach the
* probe_interval, send out next probe notification to the segment.
*/
if (notifyReceived)
{
/* reset and start next probing cycle */
notifyReceived = false;
retryCnt = 0;
last = now;
}
else if(difftime(now, last) >= probe_interval)
{
retryCnt += 1;
last = now;
if (!decrementedLockCount)
{
decrementFinishedLockingCount();
decrementedLockCount = true;
/* Sending probe notification to the segment every probe interval */
DoCancelNotifyListen(pConn, false, pszKey, pSegDB->role, pSegDB->dbid, -1, SUFFIX_MASTER_PROBE);
}
/* mark cancellation for dump agent if not receiving response for probe notifications after 10 min. */
if (retryCnt >= max_probe_retries)
{
g_b_SendCancelMessage = true;
pParm->pszErrorMsg = MakeString("Lost response from dump agent with dbid %d on host %s after 10 minutes.\n",
pSegDB->dbid, StringNotNull(pSegDB->pszHost, "localhost"));
mpp_err_msg(logError, progname, pParm->pszErrorMsg);
goto cleanup;
}
}
/*
......@@ -2361,6 +2315,21 @@ threadProc(void *arg)
pSegDB->dbid, StringNotNull(pSegDB->pszHost, "localhost"));
}
cleanup:
/*
* make sure to decrement if we haven't already, to release mutex if in
* error state
*/
if (!decrementedLunchCount)
{
decrementFinishedLaunchCount();
}
if (!decrementedLockCount)
{
decrementFinishedLockingCount();
}
PQfinish(pConn);
DestroyBackupStateMachine(pState);
......
......@@ -77,6 +77,7 @@ int optreset;
#include "dumputils.h"
#include "cdb_dump_include.h"
#include "cdb_backup_status.h"
#include "cdb_backup_state.h"
#include "cdb_dump_util.h"
#include "cdb_table.h"
#include <assert.h>
......@@ -180,7 +181,6 @@ static char *dump_prefix = NULL;
static const char *logInfo = "INFO";
static const char *logWarn = "WARN";
static const char *logError = "ERROR";
static const char *c_PUBLIC = "public";
/* MPP Additions end */
static void help(const char *progname);
......@@ -5218,7 +5218,8 @@ dumpUserMappings(Archive *fout, const char *target,
appendPQExpBuffer(q, ";\n");
resetPQExpBuffer(delq);
appendPQExpBuffer(delq, "DROP USER MAPPING FOR %s SERVER %s;\n", fmtId(umuser), fmtId(servername));
appendPQExpBuffer(delq, "DROP USER MAPPING FOR %s ", fmtId(umuser)); /* Separate as fmtId reuses same buffer. */
appendPQExpBuffer(delq, "SERVER %s;\n", fmtId(servername));
resetPQExpBuffer(tag);
appendPQExpBuffer(tag, "USER MAPPING %s %s", fmtId(umuser), target);
......@@ -5564,11 +5565,28 @@ dumpExternal(TableInfo *tbinfo, PQExpBuffer query, PQExpBuffer q, PQExpBuffer de
if (errtblname && strlen(errtblname) > 0)
{
appendPQExpBuffer(q, "LOG ERRORS ");
if(strcmp(fmtId(errtblname), fmtId(tbinfo->dobj.name)))
char *errtablename = Safe_strdup(fmtId(errtblname));
char *tablename = Safe_strdup(fmtId(tbinfo->dobj.name));
/* Check error table was not generated by LOG ERRORS statement.
* To do: deprecate the use of LOG ERRORS INTO
*/
PQExpBuffer buf = createPQExpBuffer();
appendPQExpBuffer(buf, "%s", errtblname);
appendPQExpBuffer(buf, "%s", EXT_PARTITION_NAME_POSTFIX);
char *tmpStr = Safe_strdup(fmtId(buf->data));
if(strcmp(errtablename, tablename) && strcmp(tmpStr, tablename))
{
appendPQExpBuffer(q, "INTO %s.", fmtId(errnspname));
appendPQExpBuffer(q, "%s ", fmtId(errtblname));
appendPQExpBuffer(q, "%s ", errtablename);
}
free(errtablename);
free(tablename);
free(tmpStr);
destroyPQExpBuffer(buf);
}
/* reject limit */
......@@ -5980,7 +5998,6 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
appendPQExpBuffer(q, "\n");
appendPQExpBuffer(q, "DROP TABLE %s; ", fmtId(tmpExtTable));
appendPQExpBuffer(q, "DROP EXTERNAL TABLE %s; ", fmtId(tmpExtTable));
appendPQExpBuffer(q, "\n");
}
......@@ -6004,10 +6021,12 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
char *relname = NULL;
resetPQExpBuffer(query);
/* Prefixing the quoted object name in where clause with 'E' to avoid backslash escape warnings. */
appendPQExpBuffer(query, "SELECT "
"partitionschemaname, partitiontablename FROM pg_partitions "
"WHERE partitionschemaname != schemaname AND tablename = '%s';", tbinfo->dobj.name);
"partitionschemaname, partitiontablename FROM pg_catalog.pg_partitions "
"WHERE partitionschemaname != schemaname AND tablename = ");
appendStringLiteralConn(query, tbinfo->dobj.name, g_conn);
res = PQexec(g_conn, query->data);
check_sql_result(res, g_conn, query->data, PGRES_TUPLES_OK);
......@@ -6019,10 +6038,12 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
{
schemaname = strdup(PQgetvalue(res, i, i_schemaname));
relname = strdup(PQgetvalue(res, i, i_relname));
appendPQExpBuffer(q, "ALTER TABLE %s ", relname);
appendPQExpBuffer(q, "SET SCHEMA %s;", schemaname);
appendPQExpBuffer(q, "ALTER TABLE %s ", fmtId(relname));
appendPQExpBuffer(q, "SET SCHEMA %s;", fmtId(schemaname));
appendPQExpBuffer(q, "\n");
free(schemaname);
free(relname);
}
PQclear(res);
......@@ -7215,6 +7236,7 @@ monitorThreadProc(void *arg __attribute__((unused)))
int PID;
PGnotify *notify;
StatusOp *pOp;
BackupStateMachine *pState;
char *pszMsg;
struct pollfd *pollInput;
int pollResult = 0;
......@@ -7230,8 +7252,15 @@ monitorThreadProc(void *arg __attribute__((unused)))
mpp_err_msg(logInfo, progname, "Starting monitor thread\n");
pState = CreateBackupStateMachine(g_CDBDumpKey, g_role, g_dbID);
if (pState == NULL)
{
mpp_err_msg(logError, progname, "Failed to allocate memory, canceling dump agent on segment with dbid %d\n", g_dbID);
pthread_kill(g_main_tid, SIGINT);
}
/* Issue Listen command */
DoCancelNotifyListen(g_conn_status, true, g_CDBDumpKey, g_role, g_dbID, -1, NULL);
DoCancelNotifyListen(g_conn_status, true, g_CDBDumpKey, g_role, g_dbID, -1, SUFFIX_MASTER_PROBE);
/* Now wait for a cancel notification */
sock = PQsocket(g_conn_status);
......@@ -7278,11 +7307,22 @@ monitorThreadProc(void *arg __attribute__((unused)))
* since this processes also issues notifies that aren't cancel
* requests.
*/
if (notify->be_pid != PID)
{
mpp_err_msg(logInfo, progname, "Notification received that we need to cancel for backup key %s\n",
if (strncasecmp(pState->pszNotifyRelNameMasterProbe, notify->relname,
strlen(pState->pszNotifyRelNameMasterProbe)) == 0)
{
/* Notify status alive */
DoCancelNotifyListen(g_conn_status, false, g_CDBDumpKey, g_role, g_dbID, -1, SUFFIX_SEGMENT_PROBE);
}
else
{
mpp_err_msg(logInfo, progname, "Notification received that we need to cancel for backup key %s\n",
g_CDBDumpKey /* , g_dbID */ );
bGotCancelRequest = true;
bGotCancelRequest = true;
break;
}
}
PQfreemem(notify);
......@@ -7291,6 +7331,7 @@ monitorThreadProc(void *arg __attribute__((unused)))
if (bGotCancelRequest)
{
mpp_err_msg(logInfo, progname, "Canceling seg with dbid %d\n", /* g_CDBDumpKey, */ g_dbID);
DestroyBackupStateMachine(pState);
pthread_kill(g_main_tid, SIGINT);
}
......@@ -7334,6 +7375,7 @@ monitorThreadProc(void *arg __attribute__((unused)))
/* Close the g_conn_status connection. */
PQfinish(g_conn_status);
DestroyBackupStateMachine(pState);
return NULL;
}
......@@ -7375,12 +7417,20 @@ addDistributedBy(PQExpBuffer q, TableInfo *tbinfo, int actual_atts)
char *policycol = NULL;
appendPQExpBuffer(query,
"SELECT attrnums from pg_namespace as n, pg_class as c, gp_distribution_policy as p "
"WHERE c.relname = '%s' "
"AND n.nspname='%s' "
"AND c.relnamespace=n.oid "
"AND c.oid = p.localoid",
tbinfo->dobj.name, (tbinfo->dobj.namespace->dobj.name != NULL ? tbinfo->dobj.namespace->dobj.name : c_PUBLIC));
"SELECT attrnums from pg_namespace as n, pg_class as c, gp_distribution_policy as p "
"WHERE c.relname = ");
appendStringLiteralConn(query, tbinfo->dobj.name, g_conn);
appendPQExpBuffer(query,
"AND c.relnamespace=n.oid "
"AND c.oid = p.localoid "
"AND n.nspname= ");
if (tbinfo->dobj.namespace->dobj.name)
appendStringLiteralConn(query, tbinfo->dobj.namespace->dobj.name, g_conn);
else
appendStringLiteralConn(query, "public", g_conn);
res = PQexec(g_conn, query->data);
check_sql_result(res, g_conn, query->data, PGRES_TUPLES_OK);
......
......@@ -549,7 +549,14 @@ Safe_strdup(const char *s)
if (s == NULL)
return NULL;
return (strdup(s));
char *res = strdup(s);
if(res == NULL)
{
mpp_err_msg("ERROR", "Safe_strdup", "Out of memory\n");
exit(1);
}
return res;
}
/* stringNotNull: This function simply returns either the Input parameter if not NULL, or the
......@@ -1031,6 +1038,7 @@ formPostDataSchemaOnlyPsqlCommandLine(char** retVal, const char* inputFileSpec,
}
}
void
formSegmentPsqlCommandLine(char** retVal, const char* inputFileSpec, bool compUsed, const char* compProg,
const char* filter_script, const char* table_filter_file,
......@@ -1053,11 +1061,11 @@ formSegmentPsqlCommandLine(char** retVal, const char* inputFileSpec, bool compUs
strncat(pszCmdLine, netbackupBlockSize, strlen(netbackupBlockSize));
}
strncat(pszCmdLine, " | ", strlen(" | "));
strncat(pszCmdLine, compProg, strlen(compProg));
strncat(pszCmdLine, compProg, strlen(compProg)); /* add compression program */
}
else
{
strcpy(pszCmdLine, catPg);
strcpy(pszCmdLine, catPg); /* add 'cat' command */
strcat(pszCmdLine, " ");
strcat(pszCmdLine, inputFileSpec);
strcat(pszCmdLine, " | ");
......@@ -1091,8 +1099,14 @@ formSegmentPsqlCommandLine(char** retVal, const char* inputFileSpec, bool compUs
{
strcat(pszCmdLine, " | ");
strcat(pszCmdLine, filter_script);
/* Add filter option for gprestore_filter.py to
* process schemas only (no data) on master.
*/
if (role == ROLE_MASTER)
strcat(pszCmdLine, " -m");
/* Add filter option with table file to filter data only for specified tables. */
strcat(pszCmdLine, " -t ");
strcat(pszCmdLine, table_filter_file);
}
......@@ -1738,7 +1752,7 @@ void
formDDBoostPsqlCommandLine(char** retVal, bool compUsed, const char* ddboostPg, const char* compProg,
const char* ddp_file_name, const char* dd_boost_buf_size,
const char* filter_script, const char* table_filter_file,
int role, const char* psqlPg)
int role, const char* psqlPg, bool postSchemaOnly)
{
char* pszCmdLine = *retVal;
......@@ -1765,7 +1779,7 @@ formDDBoostPsqlCommandLine(char** retVal, bool compUsed, const char* ddboostPg,
{
strcat(pszCmdLine, " | ");
strcat(pszCmdLine, filter_script);
if (role == ROLE_MASTER)
if (role == ROLE_MASTER && !postSchemaOnly)
{
strcat(pszCmdLine, " -m");
}
......@@ -1922,3 +1936,44 @@ void cleanUpTable()
}
/*
* shellEscape: Returns a string in which the shell-significant quoted-string characters are
* escaped. The resulting string, if used as a SQL statement component, should be quoted
* using the PG $$ delimiter (or as an E-string with the '\' characters escaped again).
*
* This function escapes the following characters: '"', '$', '`', '\', '!'.
*
* The PQExpBuffer escapeBuf is used for assembling the escaped string and is reset at the
* start of this function.
*
* The return value of this function is the data area from excapeBuf.
*/
char *
shellEscape(const char *shellArg, PQExpBuffer escapeBuf)
{
const char *s = shellArg;
const char escape = '\\';
resetPQExpBuffer(escapeBuf);
/*
* Copy the shellArg into the escapeBuf prepending any characters
* requiring an escape with the escape character.
*/
while (*s != '\0')
{
switch (*s)
{
case '"':
case '$':
case '\\':
case '`':
case '!':
appendPQExpBufferChar(escapeBuf, escape);
}
appendPQExpBufferChar(escapeBuf, *s);
s++;
}
return escapeBuf->data;
}
......@@ -161,7 +161,7 @@ extern char* formCompressionProgramString(char* compPg);
extern void formDDBoostPsqlCommandLine(char** retVal, bool compUsed, const char* ddboostPg, const char* compProg,
const char* ddp_file_name, const char* dd_boost_buf_size,
const char* filter_script, const char* table_filter_file,
int role, const char* psqlPg);
int role, const char* psqlPg, bool postSchemaOnly);
extern void formSegmentPsqlCommandLine(char** retVal, const char* inputFileSpec,
bool compUsed, const char* compProg, const char* filter_script,
const char* table_filter_file, int role, const char* psqlPg, const char* catPg,
......@@ -185,4 +185,5 @@ char getTypstorage(Oid o);
int removeNode(Oid o);
extern char* shellEscape(const char *shellArg, PQExpBuffer escapeBuf);
#endif /* CDB_DUMP_UTIL_H */
......@@ -69,7 +69,6 @@ static const char *logError = "ERROR";
static const char *logFatal = "FATAL";
const char *progname;
static char * addPassThroughLongParm(const char *Parm, const char *pszValue, char *pszPassThroughParmString);
static char * shellEscape(const char *shellArg, PQExpBuffer escapeBuf);
static char *dump_prefix = NULL;
static char *status_file = NULL;
......@@ -1880,46 +1879,3 @@ addPassThroughLongParm(const char *Parm, const char *pszValue, char *pszPassThro
return pszRtn;
}
/*
* shellEscape: Returns a string in which the shell-significant quoted-string characters are
* escaped. The resulting string, if used as a SQL statement component, should be quoted
* using the PG $$ delimiter (or as an E-string with the '\' characters escaped again).
*
* This function escapes the following characters: '"', '$', '`', '\', '!'.
*
* The PQExpBuffer escapeBuf is used for assembling the escaped string and is reset at the
* start of this function.
*
* The return value of this function is the data area from excapeBuf.
*/
static char *
shellEscape(const char *shellArg, PQExpBuffer escapeBuf)
{
const char *s = shellArg;
const char escape = '\\';
resetPQExpBuffer(escapeBuf);
/*
* Copy the shellArg into the escapeBuf prepending any characters
* requiring an escape with the escape character.
*/
while (*s != '\0')
{
switch (*s)
{
case '"':
case '$':
case '\\':
case '`':
case '!':
appendPQExpBufferChar(escapeBuf, escape);
}
appendPQExpBufferChar(escapeBuf, *s);
s++;
}
return escapeBuf->data;
}
......@@ -153,11 +153,12 @@ static char *netbackup_block_size = NULL;
int
main(int argc, char **argv)
{
PQExpBuffer valueBuf = NULL;
RestoreOptions *opts;
int c;
int exit_code = 0;
Archive *AH;
char *inputFileSpec;
char *inputFileSpec = NULL;
extern int optind;
extern char *optarg;
static int use_setsessauth = 0;
......@@ -262,9 +263,6 @@ main(int argc, char **argv)
}
}
#ifdef USE_DDBOOST
dd_boost_enabled = 0;
#endif
while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:Op:P:RsS:t:T:uU:vwWxX:",
cmdopts, NULL)) != -1)
{
......@@ -456,29 +454,6 @@ main(int argc, char **argv)
}
}
/* backup file name */
/* TODO: use findAcceptableBackupFilePathName(...) to look for the file name
* if user invoked gp_restore_agent directly without supplying a file name.
* If the agent is invoked from gp_restore_launch, then we are ok.
*/
if (optind < argc)
inputFileSpec = argv[optind];
else
inputFileSpec = NULL;
if (postDataSchemaOnly && inputFileSpec != NULL)
{
if (strstr(inputFileSpec,"_post_data") == NULL)
{
fprintf(stderr,"Adding _post_data to the end of the file name?\n");
char * newFS = malloc(strlen(inputFileSpec) + strlen("_post_data") + 1);
strcpy(newFS, inputFileSpec);
strcat(newFS, "_post_data");
inputFileSpec = newFS;
}
}
/* Should get at most one of -d and -f, else user is confused */
if (opts->dbname)
{
......@@ -555,16 +530,12 @@ main(int argc, char **argv)
mpp_err_msg(logInfo, progname, "ddboost is initialized\n");
if (!postDataSchemaOnly)
{
ddp_file_name = formDDBoostFileName(g_gpdumpKey, false, dd_boost_dir);
if (ddp_file_name == NULL)
{
mpp_err_msg(logInfo, progname, "Error in opening ddboost file\n");
exit(1);
}
}
ddp_file_name = formDDBoostFileName(g_gpdumpKey, postDataSchemaOnly, dd_boost_dir);
if (ddp_file_name == NULL)
{
mpp_err_msg(logInfo, progname, "Error in opening ddboost file\n");
exit(1);
}
}
#endif
......@@ -772,6 +743,39 @@ main(int argc, char **argv)
}
}
/* backup file name */
/* TODO: use findAcceptableBackupFilePathName(...) to look for the file name
* if user invoked gp_restore_agent directly without supplying a file name.
* If the agent is invoked from gp_restore_launch, then we are ok.
*/
if (optind < argc)
{
char *rawInputFile = argv[optind];
valueBuf = createPQExpBuffer();
inputFileSpec = shellEscape(rawInputFile, valueBuf);
}
if (inputFileSpec == NULL || inputFileSpec[0] == '\0')
{
mpp_err_msg(logError, progname, "dump file path is empty");
exit(1);
}
if (postDataSchemaOnly)
{
if (strstr(inputFileSpec,"_post_data") == NULL)
{
fprintf(stderr,"Adding _post_data to the end of the file name?\n");
char * newFS = malloc(strlen(inputFileSpec) + strlen("_post_data") + 1);
strcpy(newFS, inputFileSpec);
strcat(newFS, "_post_data");
inputFileSpec = newFS;
}
}
#ifdef USE_DDBOOST
/* find if gpddboost is present in PATH or PGPATH */
if ((ddboostPg = testProgramExists("gpddboost")) == NULL)
......@@ -783,37 +787,36 @@ main(int argc, char **argv)
/* add all the psql args to the command string */
/* Its too error prone to pre-calc the exact command line length
just allocate a chunk of memory that is not likely to be exceeded. */
pszCmdLine = (char *) calloc(MAX_COMMANDLINE_LEN, 1);
if (!postDataSchemaOnly)
{
pszCmdLine = (char *) calloc(MAX_COMMANDLINE_LEN, 1);
#ifdef USE_DDBOOST
if (dd_boost_enabled)
if (dd_boost_enabled)
{
formDDBoostPsqlCommandLine(&pszCmdLine, bCompUsed, ddboostPg, g_compPg,
ddp_file_name, dd_boost_buf_size,
filterScript, table_filter_file,
g_role, psqlPg, postDataSchemaOnly);
}
else
{
#endif
if(postDataSchemaOnly)
{
formDDBoostPsqlCommandLine(&pszCmdLine, bCompUsed, ddboostPg, g_compPg,
ddp_file_name, dd_boost_buf_size,
filterScript, table_filter_file,
g_role, psqlPg);
formPostDataSchemaOnlyPsqlCommandLine(&pszCmdLine, inputFileSpec, bCompUsed, g_compPg,
postDataFilterScript, table_filter_file, psqlPg, catPg,
gpNBURestorePg, netbackup_service_host, netbackup_block_size);
}
else
else
{
#endif
/* Non ddboost restore */
formSegmentPsqlCommandLine(&pszCmdLine, inputFileSpec, bCompUsed, g_compPg,
filterScript, table_filter_file,
filterScript, table_filter_file,
g_role, psqlPg, catPg,
gpNBURestorePg, netbackup_service_host, netbackup_block_size);
#ifdef USE_DDBOOST
}
#endif
}
else if (postDataSchemaOnly)
{
/* Right now the postdata files will be synced to segment directories before restore */
formPostDataSchemaOnlyPsqlCommandLine(&pszCmdLine, inputFileSpec, bCompUsed, g_compPg,
postDataFilterScript, table_filter_file, psqlPg, catPg,
gpNBURestorePg, netbackup_service_host, netbackup_block_size);
#ifdef USE_DDBOOST
}
#endif
strcat(pszCmdLine, " -h ");
strcat(pszCmdLine, g_targetHost);
......@@ -1004,13 +1007,14 @@ main(int argc, char **argv)
free(SegDB.pszDBUser);
if (SegDB.pszDBPswd)
free(SegDB.pszDBPswd);
if (valueBuf)
destroyPQExpBuffer(valueBuf);
PQfinish(g_conn);
if (exit_code == 0)
mpp_err_msg(logInfo, progname, "Finished successfully\n");
else
mpp_err_msg(logError, progname, "Finished with errors\n");
return exit_code;
}
......
......@@ -531,11 +531,12 @@ void test__formDDBoostPsqlCommandLine1(void **state)
const char* ddboostPg = "ddboostPg";
const char* ddp_file_name = "ddb_filename";
const char* dd_boost_buf_size = "512MB";
bool postSchemaOnly = false;
formDDBoostPsqlCommandLine(&cmdLine, compUsed, ddboostPg, compProg,
ddp_file_name, dd_boost_buf_size,
filter_script, table_filter_file,
role, psqlPg);
role, psqlPg, postSchemaOnly);
char *e = "ddboostPg --readFile --from-file=ddb_filename.gz --dd_boost_buf_size=512MB | gzip -c | filter.py -t filter.conf | psql";
assert_string_equal(cmdLine, e);
......@@ -553,11 +554,12 @@ void test__formDDBoostPsqlCommandLine2(void **state)
const char* ddboostPg = "ddboostPg";
const char* ddp_file_name = "ddb_filename";
const char* dd_boost_buf_size = "512MB";
bool postSchemaOnly = false;
formDDBoostPsqlCommandLine(&cmdLine, compUsed, ddboostPg, compProg,
ddp_file_name, dd_boost_buf_size,
NULL, NULL,
role, psqlPg);
role, psqlPg, postSchemaOnly);
char *e = "ddboostPg --readFile --from-file=ddb_filename.gz --dd_boost_buf_size=512MB | gzip -c | psql";
printf("cmdLine is %s", cmdLine);
......@@ -579,11 +581,12 @@ void test__formDDBoostPsqlCommandLine3(void **state)
const char* ddboostPg = "ddboostPg";
const char* ddp_file_name = "ddb_filename";
const char* dd_boost_buf_size = "512MB";
bool postSchemaOnly = false;
formDDBoostPsqlCommandLine(&cmdLine, compUsed, ddboostPg, compProg,
ddp_file_name, dd_boost_buf_size,
filter_script, table_filter_file,
role, psqlPg);
role, psqlPg, postSchemaOnly);
char *e = "ddboostPg --readFile --from-file=ddb_filename --dd_boost_buf_size=512MB | filter.py -t filter.conf | psql";
......@@ -602,11 +605,12 @@ void test__formDDBoostPsqlCommandLine4(void **state)
const char* ddboostPg = "ddboostPg";
const char* ddp_file_name = "ddb_filename";
const char* dd_boost_buf_size = "512MB";
bool postSchemaOnly = false;
formDDBoostPsqlCommandLine(&cmdLine, compUsed, ddboostPg, compProg,
ddp_file_name, dd_boost_buf_size,
NULL, NULL,
role, psqlPg);
role, psqlPg, postSchemaOnly);
char *e = "ddboostPg --readFile --from-file=ddb_filename --dd_boost_buf_size=512MB | psql";
assert_string_equal(cmdLine, e);
......@@ -626,11 +630,12 @@ void test__formDDBoostPsqlCommandLine5(void **state)
const char* ddboostPg = "ddboostPg";
const char* ddp_file_name = "ddb_filename";
const char* dd_boost_buf_size = "512MB";
bool postSchemaOnly = false;
formDDBoostPsqlCommandLine(&cmdLine, compUsed, ddboostPg, compProg,
ddp_file_name, dd_boost_buf_size,
filter_script, table_filter_file,
role, psqlPg);
role, psqlPg, postSchemaOnly);
char *e = "ddboostPg --readFile --from-file=ddb_filename.gz --dd_boost_buf_size=512MB | gzip -c | filter.py -m -t filter.conf | psql";
assert_string_equal(cmdLine, e);
......@@ -648,11 +653,12 @@ void test__formDDBoostPsqlCommandLine6(void **state)
const char* ddboostPg = "ddboostPg";
const char* ddp_file_name = "ddb_filename";
const char* dd_boost_buf_size = "512MB";
bool postSchemaOnly = false;
formDDBoostPsqlCommandLine(&cmdLine, compUsed, ddboostPg, compProg,
ddp_file_name, dd_boost_buf_size,
NULL, NULL,
role, psqlPg);
role, psqlPg, postSchemaOnly);
char *e = "ddboostPg --readFile --from-file=ddb_filename.gz --dd_boost_buf_size=512MB | gzip -c | psql";
assert_string_equal(cmdLine, e);
......@@ -672,11 +678,12 @@ void test__formDDBoostPsqlCommandLine7(void **state)
const char* ddboostPg = "ddboostPg";
const char* ddp_file_name = "ddb_filename";
const char* dd_boost_buf_size = "512MB";
bool postSchemaOnly =false;
formDDBoostPsqlCommandLine(&cmdLine, compUsed, ddboostPg, compProg,
ddp_file_name, dd_boost_buf_size,
filter_script, table_filter_file,
role, psqlPg);
role, psqlPg, postSchemaOnly);
char *e = "ddboostPg --readFile --from-file=ddb_filename --dd_boost_buf_size=512MB | filter.py -m -t filter.conf | psql";
assert_string_equal(cmdLine, e);
......@@ -694,16 +701,43 @@ void test__formDDBoostPsqlCommandLine8(void **state)
const char* ddboostPg = "ddboostPg";
const char* ddp_file_name = "ddb_filename";
const char* dd_boost_buf_size = "512MB";
bool postSchemaOnly = false;
formDDBoostPsqlCommandLine(&cmdLine, compUsed, ddboostPg, compProg,
ddp_file_name, dd_boost_buf_size,
NULL, NULL,
role, psqlPg);
role, psqlPg, postSchemaOnly);
char *e = "ddboostPg --readFile --from-file=ddb_filename --dd_boost_buf_size=512MB | psql";
assert_string_equal(cmdLine, e);
free(cmdLine);
}
void test__formDDBoostPsqlCommandLine_with_postSchemaOnly_and_master_role(void **state)
{
char *cmdLine = calloc(1000000, 1);
char *inputFileSpec = "fileSpec";
bool compUsed = false;
const char* compProg = "gzip -c";
int role = ROLE_MASTER;
const char* psqlPg = "psql";
const char* ddboostPg = "ddboostPg";
const char* ddp_file_name = "ddb_filename";
const char* postDataFilterScript = "gprestore_post_data_filter.py";
const char* tableFilterFile = "tablefilter";
const char* dd_boost_buf_size = "512MB";
bool postSchemaOnly = true;
formDDBoostPsqlCommandLine(&cmdLine, compUsed, ddboostPg, compProg,
ddp_file_name, dd_boost_buf_size,
postDataFilterScript, tableFilterFile,
role, psqlPg, postSchemaOnly);
char *e = "ddboostPg --readFile --from-file=ddb_filename --dd_boost_buf_size=512MB | gprestore_post_data_filter.py -t tablefilter | psql";
assert_string_equal(cmdLine, e);
free(cmdLine);
}
#endif
void test__shouldExpandChildren1(void **state)
......@@ -1053,6 +1087,7 @@ main(int argc, char* argv[])
unit_test(test__formDDBoostPsqlCommandLine6),
unit_test(test__formDDBoostPsqlCommandLine7),
unit_test(test__formDDBoostPsqlCommandLine8),
unit_test(test__formDDBoostPsqlCommandLine_with_postSchemaOnly_and_master_role),
#endif
unit_test(test__shouldExpandChildren1),
unit_test(test__shouldExpandChildren2),
......@@ -1076,7 +1111,7 @@ main(int argc, char* argv[])
unit_test(test__remove_node),
unit_test(test__remove_node_not_present),
unit_test(test__remove_node_not_present_in_list),
unit_test(test__clean_up_table),
unit_test(test__clean_up_table),
};
return run_tests(tests);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册