提交 f3aa5e30 编写于 作者: P Pengzhou Tang

Fix few defects when creating gang in asynchronous way

1. Remove retry mechanism for reader gang and non "in recovery mode" error, gp_segment_connect_timeout is default
set to 10 mins, so it should be long enough to say we temporary lost the segments.

2. Fix "in recovery mode" retry mechanism, original codes can not recognize a in-recovery-mode error.

3. Add failure details. "failed to acquire resources on one or more segments" hide too many details.

4. Only destroy all gangs when create writer gang failed, otherwise it may clean cursor opened gangs and cause
unexpected error.
上级 eb40e073
......@@ -406,8 +406,7 @@ void cdbconn_doConnect(SegmentDatabaseDescriptor *segdbDesc,
{
segdbDesc->errcode = ERRCODE_GP_INTERNAL_ERROR;
appendPQExpBuffer(&segdbDesc->error_message,
"Internal error: No motion listener port for %s\n",
segdbDesc->whoami);
"Internal error: No motion listener port");
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("%s\n", segdbDesc->error_message.data);
......@@ -517,17 +516,15 @@ cdbconn_doConnectComplete(SegmentDatabaseDescriptor *segdbDesc)
*/
segdbDesc->motionListener = PQgetQEdetail(segdbDesc->conn);
segdbDesc->backendPid = PQbackendPID(segdbDesc->conn);
if (segdbDesc->motionListener == -1)
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Internal error: No motion listener port for %s\n",
segdbDesc->whoami)));
else if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
if (segdbDesc->motionListener != -1 && gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
{
elog(LOG, "Connected to %s motionListenerPorts=%d/%d with options %s",
segdbDesc->whoami,
(segdbDesc->motionListener & 0x0ffff),
((segdbDesc->motionListener >> 16) & 0x0ffff),
PQoptions(segdbDesc->conn));
}
}
/* Disconnect from QE */
......
......@@ -280,7 +280,8 @@ createGang(GangType type, int gang_id, int size, int content)
/*
* Test if the connections of the primary writer gang are alive.
*/
bool isPrimaryWriterGangAlive(void)
bool
isPrimaryWriterGangAlive(void)
{
if (primaryWriterGang == NULL)
return false;
......@@ -303,15 +304,15 @@ bool isPrimaryWriterGangAlive(void)
/*
* Check the segment failure reason by comparing connection error message.
*/
bool segment_failure_due_to_recovery(SegmentDatabaseDescriptor *segdbDesc)
bool segment_failure_due_to_recovery(struct PQExpBufferData* error_message)
{
char *fatal = NULL, *message = NULL, *ptr = NULL;
int fatal_len = 0;
if (segdbDesc == NULL)
if (error_message == NULL)
return false;
message = segdbDesc->error_message.data;
message = error_message->data;
if (message == NULL)
return false;
......@@ -1692,13 +1693,6 @@ bool gangsExist(void)
availableReaderGangs1 != NIL);
}
bool readerGangsExist(void)
{
return (allocatedReaderGangsN != NIL ||
availableReaderGangsN != NIL ||
allocatedReaderGangs1 != NIL||
availableReaderGangs1 != NIL);
}
int largestGangsize(void)
{
......
......@@ -47,6 +47,7 @@ createGang_async(GangType type, int gang_id, int size, int content)
int create_gang_retry_counter = 0;
int in_recovery_mode_count = 0;
int successful_connections = 0;
bool retry = false;
ELOG_DISPATCHER_DEBUG("createGang type = %d, gang_id = %d, size = %d, content = %d",
type, gang_id, size, content);
......@@ -55,32 +56,36 @@ createGang_async(GangType type, int gang_id, int size, int content)
Assert(size == 1 || size == getgpsegmentCount());
Assert(CurrentResourceOwner != NULL);
Assert(CurrentMemoryContext == GangContext);
/* Writer gang is created before reader gangs. */
if (type == GANGTYPE_PRIMARY_WRITER)
Insist(!gangsExist());
/* Check writer gang firstly*/
if (type != GANGTYPE_PRIMARY_WRITER && !isPrimaryWriterGangAlive())
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to create gang on one or more segments"),
errdetail("writer gang got broken before creating reader gangs")));
create_gang_retry:
/* If we're in a retry, we may need to reset our initial state, a bit */
newGangDefinition = NULL;
successful_connections = 0;
in_recovery_mode_count = 0;
/* Check the writer gang first. */
if (type != GANGTYPE_PRIMARY_WRITER && !isPrimaryWriterGangAlive())
{
elog(LOG, "primary writer gang is broken");
goto exit;
}
retry = false;
/* allocate and initialize a gang structure */
newGangDefinition = buildGangDefinition(type, gang_id, size, content);
Assert(newGangDefinition != NULL);
Assert(newGangDefinition->size == size);
Assert(newGangDefinition->perGangContext != NULL);
MemoryContextSwitchTo(newGangDefinition->perGangContext);
struct pollfd *fds;
struct timeval startTS;
PG_TRY();
{
struct pollfd *fds;
struct timeval startTS;
for (i = 0; i < size; i++)
{
char gpqeid[100];
......@@ -105,26 +110,13 @@ create_gang_retry:
options = makeOptions();
/* start connection in asynchronous way */
cdbconn_doConnectStart(segdbDesc, gpqeid, options);
if (cdbconn_isBadConnection(segdbDesc))
{
/*
* Log the details to the server log, but give a more
* generic error to the client. XXX: The user would
* probably prefer to see a bit more details too.
*/
ereport(LOG, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Master unable to connect to %s with options %s: %s",
segdbDesc->whoami,
options,
PQerrorMessage(segdbDesc->conn))));
if (!segment_failure_due_to_recovery(segdbDesc))
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to acquire resources on one or more segments")));
else
in_recovery_mode_count++;
}
if(cdbconn_isBadConnection(segdbDesc))
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to create gang on one or more segments"),
errdetail("%s (%s)", PQerrorMessage(segdbDesc->conn), segdbDesc->whoami)));
}
/*
......@@ -147,7 +139,8 @@ create_gang_retry:
segdbDesc = &newGangDefinition->db_descriptors[i];
if (cdbconn_isConnectionOk(segdbDesc))
/* Skip established connections and broken connections*/
if (cdbconn_isConnectionOk(segdbDesc) || cdbconn_isBadConnection(segdbDesc))
continue;
pollStatus = PQconnectPoll(segdbDesc->conn);
......@@ -155,6 +148,10 @@ create_gang_retry:
{
case PGRES_POLLING_OK:
cdbconn_doConnectComplete(segdbDesc);
if (segdbDesc->motionListener == -1)
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to create gang on one or more segments"),
errdetail("Internal error: No motion listener port (%s)", segdbDesc->whoami)));
successful_connections++;
break;
......@@ -171,17 +168,24 @@ create_gang_retry:
break;
case PGRES_POLLING_FAILED:
elog(LOG, "Failed to connect to %s", segdbDesc->whoami);
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to acquire resources on one or more segments")));
if (segment_failure_due_to_recovery(&segdbDesc->conn->errorMessage))
{
in_recovery_mode_count++;
elog(LOG, "segment is in recovery mode (%s)", segdbDesc->whoami);
}
else
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to create gang on one or more segments"),
errdetail("%s (%s)", PQerrorMessage(segdbDesc->conn), segdbDesc->whoami)));
}
break;
default:
elog(LOG, "Get wrong poll status when connect to %s", segdbDesc->whoami);
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to acquire resources on one or more segments")));
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to create gang on one or more segments"),
errdetail("unknown pollStatus")));
break;
}
}
......@@ -189,108 +193,91 @@ create_gang_retry:
if (nfds == 0)
break;
CHECK_FOR_INTERRUPTS();
timeout = getTimeout(&startTS);
/* Wait until something happens */
nready = poll(fds, nfds, timeout);
if (nready < 0)
{
int sock_errno = SOCK_ERRNO;
if (sock_errno == EINTR)
continue;
ereport(LOG, (errcode_for_socket_access(),
errmsg("poll() failed while connecting to segments")));
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to acquire resources on one or more segments")));
errmsg("failed to create gang on one or more segments"),
errdetail("poll() failed: errno = %d", sock_errno)));
}
else if (nready == 0)
{
if (timeout != 0)
continue;
elog(LOG, "poll() timeout while connecting to segments");
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to acquire resources on one or more segments")));
errmsg("failed to create gang on one or more segments"),
errdetail("createGang timeout after %d seconds", gp_segment_connect_timeout)));
}
}
if (successful_connections != size)
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to acquire resources on one or more segments")));
ELOG_DISPATCHER_DEBUG("createGang: %d processes requested; %d successful connections %d in recovery",
size, successful_connections, in_recovery_mode_count);
MemoryContextSwitchTo(GangContext);
}
PG_CATCH();
{
MemoryContextSwitchTo(GangContext);
/*
* If this is a reader gang and the writer gang is invalid, destroy all gangs.
* This happens when one segment is reset.
*/
if (type != GANGTYPE_PRIMARY_WRITER && !isPrimaryWriterGangAlive())
{
elog(LOG, "primary writer gang is broken");
goto exit;
}
/* FTS shows some segment DBs are down, destroy all gangs. */
if (isFTSEnabled() &&
FtsTestSegmentDBIsDown(newGangDefinition->db_descriptors, size))
/* some segments are in recovery mode*/
if (successful_connections != size)
{
elog(LOG, "FTS detected some segments are down");
goto exit;
}
Assert(successful_connections + in_recovery_mode_count == size);
/* Writer gang is created before reader gangs. */
if (type == GANGTYPE_PRIMARY_WRITER)
Insist(!gangsExist());
ELOG_DISPATCHER_DEBUG("createGang: %d processes requested; %d successful connections %d in recovery",
size, successful_connections, in_recovery_mode_count);
/* FTS shows some segment DBs are down */
if (isFTSEnabled() &&
FtsTestSegmentDBIsDown(newGangDefinition->db_descriptors, size))
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to create gang on one or more segments"),
errdetail("FTS detected one or more segments are down")));
/*
* Retry when any of the following condition is met:
* 1) This is the writer gang.
* 2) This is the first reader gang.
* 3) All failed segments are in recovery mode.
*/
if(gp_gang_creation_retry_count &&
create_gang_retry_counter++ < gp_gang_creation_retry_count &&
(type == GANGTYPE_PRIMARY_WRITER ||
!readerGangsExist() ||
successful_connections + in_recovery_mode_count == size))
{
disconnectAndDestroyGang(newGangDefinition);
newGangDefinition = NULL;
if ( gp_gang_creation_retry_count <= 0 ||
create_gang_retry_counter++ >= gp_gang_creation_retry_count ||
type != GANGTYPE_PRIMARY_WRITER)
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to create gang on one or more segments"),
errdetail("segments is in recovery mode")));
ELOG_DISPATCHER_DEBUG("createGang: gang creation failed, but retryable.");
CHECK_FOR_INTERRUPTS();
pg_usleep(gp_gang_creation_retry_timer * 1000);
CHECK_FOR_INTERRUPTS();
goto create_gang_retry;
disconnectAndDestroyGang(newGangDefinition);
newGangDefinition = NULL;
retry = true;
}
else
}
PG_CATCH();
{
MemoryContextSwitchTo(GangContext);
disconnectAndDestroyGang(newGangDefinition);
newGangDefinition = NULL;
if (type == GANGTYPE_PRIMARY_WRITER)
{
goto exit;
disconnectAndDestroyAllGangs(true);
CheckForResetSession();
}
PG_RE_THROW();
}
PG_END_TRY();
if (retry)
{
CHECK_FOR_INTERRUPTS();
pg_usleep(gp_gang_creation_retry_timer * 1000);
CHECK_FOR_INTERRUPTS();
goto create_gang_retry;
}
setLargestGangsize(size);
return newGangDefinition;
exit:
disconnectAndDestroyGang(newGangDefinition);
disconnectAndDestroyAllGangs(true);
CheckForResetSession();
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to acquire resources on one or more segments")));
return NULL;
}
static int getTimeout(const struct timeval* startTS)
......
......@@ -17,6 +17,7 @@ struct Port;
struct QueryDesc;
struct DirectDispatchInfo;
struct EState;
struct PQExpBufferData;
/*
* A gang represents a single group of workers on each connected segDB
......@@ -79,15 +80,15 @@ extern List *getAllAllocatedReaderGangs(void);
extern CdbComponentDatabases *getComponentDatabases(void);
extern bool gangsExist(void);
extern bool readerGangsExist(void);
extern struct SegmentDatabaseDescriptor *getSegmentDescriptorFromGang(const Gang *gp, int seg);
bool isPrimaryWriterGangAlive(void);
Gang *buildGangDefinition(GangType type, int gang_id, int size, int content);
void build_gpqeid_param(char *buf, int bufsz, int segIndex, bool is_writer, int gangId);
char *makeOptions(void);
bool segment_failure_due_to_recovery(struct SegmentDatabaseDescriptor *segdbDesc);
bool segment_failure_due_to_recovery(struct PQExpBufferData *segdbDesc);
/*
* disconnectAndDestroyIdleReaderGangs()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册