Unverified Commit 87394a7b authored by Ning Yu, committed by GitHub

Retire threaded dispatcher

Now there is only the async dispatcher. The dispatcher API is kept so
that new backends can be added in the future.

The GUC gp_connections_per_thread, which was used to switch between the
async and threaded backends, is also retired.
Parent e0b06678
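The commit message notes that the dispatcher API is kept for future backends. A minimal sketch of that design, loosely modeled on the `DispatcherInternalFuncs` table visible in the diff below (the struct layout and payload types here are simplified assumptions, not the actual Greenplum definitions): dispatch entry points call through a table of function pointers, so adding a backend means supplying a new table. Before this commit the pointer started out `NULL` and was flipped at runtime by `cdbdisp_setAsync()`; after it, the pointer is statically bound to the async table.

```c
#include <assert.h>

/* Simplified stand-in for Greenplum's DispatcherInternalFuncs vtable. */
typedef struct DispatcherFuncs
{
	const char *name;
	int			(*dispatchToGang) (int sliceIndex);
} DispatcherFuncs;

/* The async backend: the only implementation left after this commit. */
static int
async_dispatchToGang(int sliceIndex)
{
	/* the real backend queues non-blocking libpq sends here */
	return sliceIndex;
}

static DispatcherFuncs DispatcherAsyncFuncs = {
	"async", async_dispatchToGang
};

/* Statically bound now; a future backend would point this elsewhere. */
static DispatcherFuncs *pDispatchFuncs = &DispatcherAsyncFuncs;

/* Public entry point: always dispatches through the backend table. */
int
cdbdisp_dispatchToGang(int sliceIndex)
{
	return pDispatchFuncs->dispatchToGang(sliceIndex);
}
```

This mirrors the change in cdbdisp.c below, where `static DispatcherInternalFuncs *pDispatchFuncs = NULL;` becomes `= &DispatcherAsyncFuncs;` and the `cdbdisp_setAsync()` switch is deleted.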
......@@ -1219,9 +1219,6 @@
<p>
<codeph>gp_cached_segworkers_threshold</codeph>
</p>
<p>
<codeph>gp_connections_per_thread</codeph>
</p>
<p>
<codeph>gp_enable_direct_dispatch</codeph>
</p>
......
......@@ -176,9 +176,6 @@
<li>
<xref href="#gp_connection_send_timeout"/>
</li>
<li>
<xref href="#gp_connections_per_thread"/>
</li>
<li>
<xref href="#gp_content"/>
</li>
......@@ -2536,48 +2533,6 @@
</table>
</body>
</topic>
<topic id="gp_connections_per_thread">
<title>gp_connections_per_thread</title>
<body>
<p>Controls the number of asynchronous threads (worker threads) that a Greenplum Database
query dispatcher (QD) generates when dispatching work to query executor processes on segment
instances when processing SQL queries. The value sets the number of primary segment
instances that a worker thread connects to when processing a query. For example, when the
value is 2 and there are 64 segment instances, a QD generates 32 worker threads to dispatch
a query plan work. Each thread is assigned to two segments.</p>
<p>For the default value, 0, a query dispatcher generates two types of threads: a main thread
that manages the dispatch of query plan work, and an interconnect thread. The main thread
also acts as a worker thread.</p>
<p>For a value greater than 0, a QD generates three types of threads: a main thread, one or
more worker threads, and an interconnect thread. When the value is equal to or greater than
the number of segment instances, a QD generates three threads: a main thread, a single
worker thread, and an interconnect thread.</p>
<p>The value does not need to be changed from the default unless there are known throughput
performance issues.</p>
<p>This parameter is master only and changing it requires a server restart.</p>
<table id="gp_connections_per_thread_table">
<tgroup cols="3">
<colspec colnum="1" colname="col1" colwidth="1*"/>
<colspec colnum="2" colname="col2" colwidth="1*"/>
<colspec colnum="3" colname="col3" colwidth="1*"/>
<thead>
<row>
<entry colname="col1">Value Range</entry>
<entry colname="col2">Default</entry>
<entry colname="col3">Set Classifications</entry>
</row>
</thead>
<tbody>
<row>
<entry colname="col1">integer >= 0</entry>
<entry colname="col2">0</entry>
<entry colname="col3">master<p>restart</p></entry>
</row>
</tbody>
</tgroup>
</table>
</body>
</topic>
<topic id="gp_content">
<title>gp_content</title>
<body>
......
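For reference, the worker-thread count that the removed documentation topic describes (value 2 with 64 segments yields 32 worker threads; a value at or above the segment count yields a single worker thread) follows the rounding-up division that appears later in the removed `createGang_thread()` code. A one-line sketch of that retired formula, assuming `gp_connections_per_thread > 0` as the removed code asserts:

```c
/*
 * Worker threads used by the retired threaded gang creation:
 * ceil(size / gp_connections_per_thread), written as in the
 * removed code: 1 + (size - 1) / gp_connections_per_thread.
 */
static int
threads_for(int size, int gp_connections_per_thread)
{
	return 1 + (size - 1) / gp_connections_per_thread;
}
```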
......@@ -1463,10 +1463,6 @@
<xref href="guc-list.xml#gp_cached_segworkers_threshold" type="section"
>gp_cached_segworkers_threshold</xref>
</p>
<p>
<xref href="guc-list.xml#gp_connections_per_thread" type="section"
>gp_connections_per_thread</xref>
</p>
<p>
<xref href="guc-list.xml#gp_enable_direct_dispatch" type="section"
>gp_enable_direct_dispatch</xref>
......
......@@ -81,7 +81,6 @@
<topicref href="guc-list.xml#gp_cached_segworkers_threshold"/>
<topicref href="guc-list.xml#gp_command_count"/>
<topicref href="guc-list.xml#gp_connection_send_timeout"/>
<topicref href="guc-list.xml#gp_connections_per_thread"/>
<topicref href="guc-list.xml#gp_content"/>
<topicref href="guc-list.xml#gp_create_table_random_default_distribution"/>
<topicref href="guc-list.xml#gp_dbid"/>
......
......@@ -84,9 +84,6 @@ int gp_external_max_segs; /* max segdbs per gpfdist/gpfdists URI */
int gp_safefswritesize; /* set for safe AO writes in non-mature fs */
int gp_connections_per_thread; /* How many libpq connections are
* handled in each thread */
int gp_cached_gang_threshold; /* How many gangs to keep around from
* stmt to stmt. */
......@@ -553,26 +550,6 @@ assign_gp_role(const char *newval, void *extra)
}
/*
* Assign hook routine for "gp_connections_per_thread" option. This variable has context
* PGC_SUSET so that is can only be set by a superuser via the SET command.
* (It can also be set in config file, but not inside of PGOPTIONS.)
*
* See src/backend/util/misc/guc.c for option definition.
*/
void
assign_gp_connections_per_thread(int newval, void *extra)
{
#if FALSE
elog(DEBUG1, "assign_gp_connections_per_thread: gp_connections_per_thread=%s, newval=%d",
show_gp_connections_per_thread(), newval);
#endif
cdbdisp_setAsync(newval == 0);
cdbgang_setAsync(newval == 0);
gp_connections_per_thread = newval;
}
/*
* Show hook routine for "gp_session_role" option.
*
......
......@@ -11,5 +11,5 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS += -I$(libpq_srcdir) -I$(top_srcdir)/src/port -I$(top_srcdir)/src/backend/utils/misc
OBJS = cdbconn.o cdbdisp.o cdbdisp_thread.o cdbdisp_async.o cdbdispatchresult.o cdbdisp_dtx.o cdbdisp_query.o cdbgang.o cdbgang_thread.o cdbgang_async.o cdbpq.o
OBJS = cdbconn.o cdbdisp.o cdbdisp_async.o cdbdispatchresult.o cdbdisp_dtx.o cdbdisp_query.o cdbgang.o cdbgang_async.o cdbpq.o
include $(top_srcdir)/src/backend/common.mk
......@@ -37,9 +37,6 @@ For a query/plan, QD would build one `GANGTYPE_PRIMARY_WRITER` Gang, and several
* `CdbDispatchSetCommand`: send SET commands to all existing Gangs except those allocated for extended queries, and block to get results or error
* `CdbDispDtxProtocolCommand`: send DTX commands to the writer Gang, and block to get results or error
### Dispatcher Mode:
To improve parallelism, Dispatcher has two different implementations internally, one is using threads, the other leverages asynchronous network programming. When GUC `gp_connections_per_thread` is 0, async dispatcher is used, which is the default configuration
### Dispatcher routines:
All dispatcher routines contains few standard steps:
* CdbDispatchPlan/CdbDispatchUtilityStatement/CdbDispatchCommand/CdbDispatchSetCommand/CdbDispDtxProtocolCommand
......
......@@ -264,10 +264,6 @@ cdbconn_createSegmentDescriptor(struct CdbComponentDatabaseInfo *cdbinfo, int id
segdbDesc->identifier = identifier;
segdbDesc->isWriter = isWriter;
/* Connection error info */
segdbDesc->errcode = 0;
initPQExpBuffer(&segdbDesc->error_message);
MemoryContextSwitchTo(oldContext);
return segdbDesc;
}
......@@ -280,10 +276,6 @@ cdbconn_termSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc)
cdbconn_disconnect(segdbDesc);
/* Free the error message buffer. */
segdbDesc->errcode = 0;
termPQExpBuffer(&segdbDesc->error_message);
if (segdbDesc->whoami != NULL)
{
pfree(segdbDesc->whoami);
......@@ -291,152 +283,6 @@ cdbconn_termSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc)
}
} /* cdbconn_termSegmentDescriptor */
/*
* Connect to a QE as a client via libpq. Returns a PGconn object in
* segdbDesc->conn which can be tested for connection status.
*/
void
cdbconn_doConnect(SegmentDatabaseDescriptor *segdbDesc,
const char *gpqeid,
const char *options)
{
#define MAX_KEYWORDS 10
#define MAX_INT_STRING_LEN 20
CdbComponentDatabaseInfo *cdbinfo = segdbDesc->segment_database_info;
const char *keywords[MAX_KEYWORDS];
const char *values[MAX_KEYWORDS];
char portstr[MAX_INT_STRING_LEN];
char timeoutstr[MAX_INT_STRING_LEN];
int nkeywords = 0;
keywords[nkeywords] = "gpqeid";
values[nkeywords] = gpqeid;
nkeywords++;
/*
* Build the connection string
*/
if (options)
{
keywords[nkeywords] = "options";
values[nkeywords] = options;
nkeywords++;
}
/*
* For entry DB connection, we make sure both "hostaddr" and "host" are
* empty string. Or else, it will fall back to environment variables and
* won't use domain socket in function connectDBStart.
*
* For other QE connections, we set "hostaddr". "host" is not used.
*/
if (segdbDesc->segindex == MASTER_CONTENT_ID &&
IS_QUERY_DISPATCHER())
{
keywords[nkeywords] = "hostaddr";
values[nkeywords] = "";
nkeywords++;
}
else
{
Assert(cdbinfo->hostip != NULL);
keywords[nkeywords] = "hostaddr";
values[nkeywords] = cdbinfo->hostip;
nkeywords++;
}
keywords[nkeywords] = "host";
values[nkeywords] = "";
nkeywords++;
snprintf(portstr, sizeof(portstr), "%u", cdbinfo->port);
keywords[nkeywords] = "port";
values[nkeywords] = portstr;
nkeywords++;
if (MyProcPort->database_name)
{
keywords[nkeywords] = "dbname";
values[nkeywords] = MyProcPort->database_name;
nkeywords++;
}
Assert(MyProcPort->user_name);
keywords[nkeywords] = "user";
values[nkeywords] = MyProcPort->user_name;
nkeywords++;
snprintf(timeoutstr, sizeof(timeoutstr), "%d", gp_segment_connect_timeout);
keywords[nkeywords] = "connect_timeout";
values[nkeywords] = timeoutstr;
nkeywords++;
keywords[nkeywords] = NULL;
values[nkeywords] = NULL;
Assert(nkeywords < MAX_KEYWORDS);
/*
* Call libpq to connect
*/
segdbDesc->conn = PQconnectdbParams(keywords, values, false);
/*
* Check for connection failure.
*/
if (PQstatus(segdbDesc->conn) == CONNECTION_BAD)
{
if (!segdbDesc->errcode)
segdbDesc->errcode = ERRCODE_GP_INTERCONNECTION_ERROR;
appendPQExpBuffer(&segdbDesc->error_message, "%s", PQerrorMessage(segdbDesc->conn));
/* Don't use elog, it's not thread-safe */
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("%s\n", segdbDesc->error_message.data);
PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL;
}
/*
* Successfully connected.
*/
else
{
PQsetNoticeReceiver(segdbDesc->conn, &MPPnoticeReceiver, segdbDesc);
/*
* Command the QE to initialize its motion layer. Wait for it to
* respond giving us the TCP port number where it listens for
* connections from the gang below.
*/
segdbDesc->motionListener = cdbconn_get_motion_listener_port(segdbDesc->conn);
segdbDesc->backendPid = PQbackendPID(segdbDesc->conn);
if (segdbDesc->motionListener == 0)
{
segdbDesc->errcode = ERRCODE_INTERNAL_ERROR;
appendPQExpBuffer(&segdbDesc->error_message,
"Internal error: No motion listener port");
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("%s\n", segdbDesc->error_message.data);
PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL;
}
else
{
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("Connected to %s motionListener=%u/%u with options: %s\n",
segdbDesc->whoami,
(segdbDesc->motionListener & 0x0ffff),
((segdbDesc->motionListener >> 16) & 0x0ffff),
options);
}
}
}
/*
* Establish socket connection via libpq.
* Caller should call PQconnectPoll to finish it up.
......@@ -644,14 +490,6 @@ cdbconn_isConnectionOk(SegmentDatabaseDescriptor *segdbDesc)
return (PQstatus(segdbDesc->conn) == CONNECTION_OK);
}
/* Reset error message buffer */
void
cdbconn_resetQEErrorMessage(SegmentDatabaseDescriptor *segdbDesc)
{
segdbDesc->errcode = 0;
resetPQExpBuffer(&segdbDesc->error_message);
}
/*
* Build text to identify this QE in error messages.
* Don't call this function in threads.
......
......@@ -21,7 +21,6 @@
#include "storage/ipc.h" /* For proc_exit_inprogress */
#include "tcop/tcopprot.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbdisp_thread.h"
#include "cdb/cdbdisp_async.h"
#include "cdb/cdbdispatchresult.h"
#include "executor/execUtils.h"
......@@ -56,7 +55,7 @@ static void destroy_dispatcher_handle(dispatcher_handle_t *h);
*/
CdbDispatchDirectDesc default_dispatch_direct_desc = {false, 0, {0}};
static DispatcherInternalFuncs *pDispatchFuncs = NULL;
static DispatcherInternalFuncs *pDispatchFuncs = &DispatcherAsyncFuncs;
/*
* cdbdisp_dispatchToGang:
......@@ -64,9 +63,7 @@ static DispatcherInternalFuncs *pDispatchFuncs = NULL;
* specified by the gang parameter. cancelOnError indicates whether an error
* occurring on one of the qExec segdbs should cause all still-executing commands to cancel
* on other qExecs. Normally this would be true. The commands are sent over the libpq
* connections that were established during cdblink_setup. They are run inside of threads.
* The number of segdbs handled by any one thread is determined by the
* guc variable gp_connections_per_thread.
* connections that were established during cdblink_setup.
*
* The caller must provide a CdbDispatchResults object having available
* resultArray slots sufficient for the number of QEs to be dispatched:
......@@ -98,11 +95,6 @@ cdbdisp_dispatchToGang(struct CdbDispatcherState *ds,
Assert(gp && gp->size > 0);
Assert(dispatchResults && dispatchResults->resultArray);
/*
* WIP: will use a function pointer for implementation later, currently
* just use an internal function to move dispatch thread related code into
* a separate file.
*/
(pDispatchFuncs->dispatchToGang) (ds, gp, sliceIndex);
markCurrentGxactDispatched();
......@@ -111,7 +103,7 @@ cdbdisp_dispatchToGang(struct CdbDispatcherState *ds,
/*
* For asynchronous dispatcher, we have to wait all dispatch to finish before we move on to query execution,
* otherwise we may get into a deadlock situation, e.g, gather motion node waiting for data,
* while segments waiting for plan. This is skipped in threaded dispatcher as data is sent in blocking style.
* while segments waiting for plan.
*/
void
cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds)
......@@ -430,22 +422,6 @@ cdbdisp_getWaitSocketFd(CdbDispatcherState *ds)
return (pDispatchFuncs->getWaitSocketFd) (ds);
}
void
cdbdisp_onProcExit(void)
{
if (pDispatchFuncs != NULL && pDispatchFuncs->procExitCallBack != NULL)
(pDispatchFuncs->procExitCallBack) ();
}
void
cdbdisp_setAsync(bool async)
{
if (async)
pDispatchFuncs = &DispatcherAsyncFuncs;
else
pDispatchFuncs = &DispatcherSyncFuncs;
}
dispatcher_handle_t *
allocate_dispatcher_handle(void)
{
......
......@@ -97,7 +97,6 @@ static int cdbdisp_getWaitSocketFd_async(struct CdbDispatcherState *ds);
DispatcherInternalFuncs DispatcherAsyncFuncs =
{
NULL,
cdbdisp_checkForCancel_async,
cdbdisp_getWaitSocketFd_async,
cdbdisp_makeDispatchParams_async,
......
......@@ -41,8 +41,6 @@
#include "cdb/cdbdisp.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdisp_thread.h" /* for CdbDispatchCmdThreads and
* DispatchCommandParms */
#include "cdb/cdbdisp_dtx.h" /* for qdSerializeDtxContextInfo() */
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbcopy.h"
......
This diff is collapsed.
......@@ -15,7 +15,6 @@
*/
#include "postgres.h"
#include <pthread.h>
#include "libpq-fe.h" /* prerequisite for libpq-int.h */
#include "libpq-int.h" /* PQExpBufferData */
......@@ -30,12 +29,6 @@
#include "cdb/cdbdispatchresult.h"
#include "commands/tablecmds.h"
/*
* This mutex serializes writes by dispatcher threads to the
* iFirstError and errcode fields of CdbDispatchResults objects.
*/
static pthread_mutex_t setErrcodeMutex = PTHREAD_MUTEX_INITIALIZER;
static int cdbdisp_snatchPGresults(CdbDispatchResult *dispatchResult,
struct pg_result **pgresultptrs, int maxresults);
......@@ -203,8 +196,6 @@ cdbdisp_resetResult(CdbDispatchResult *dispatchResult)
/*
* Take note of an error.
* 'errcode' is the ERRCODE_xxx value for setting the client's SQLSTATE.
* NB: This can be called from a dispatcher thread, so it must not use
* palloc/pfree or elog/ereport because they are not thread safe.
*/
void
cdbdisp_seterrcode(int errcode, /* ERRCODE_xxx or 0 */
......@@ -257,69 +248,9 @@ cdbdisp_seterrcode(int errcode, /* ERRCODE_xxx or 0 */
(meleeResults->errcode == ERRCODE_GP_INTERCONNECTION_ERROR &&
errcode != ERRCODE_GP_INTERCONNECTION_ERROR))
{
pthread_mutex_lock(&setErrcodeMutex);
if (meleeResults->errcode == 0 ||
(meleeResults->errcode == ERRCODE_GP_INTERCONNECTION_ERROR &&
errcode != ERRCODE_GP_INTERCONNECTION_ERROR))
{
meleeResults->errcode = errcode;
meleeResults->iFirstError = dispatchResult->meleeIndex;
}
pthread_mutex_unlock(&setErrcodeMutex);
}
}
/*
* Format a message, printf-style, and append to the error_message buffer.
* Also write it to stderr if logging is enabled for messages of the
* given severity level 'elevel' (for example, DEBUG1; or 0 to suppress).
* 'errcode' is the ERRCODE_xxx value for setting the client's SQLSTATE.
* NB: This can be called from a dispatcher thread, so it must not use
* palloc/pfree or elog/ereport because they are not thread safe.
*/
void
cdbdisp_appendMessage(CdbDispatchResult *dispatchResult,
int elevel, const char *fmt,...)
{
va_list args;
int msgoff;
/*
* Remember first error.
*/
cdbdisp_seterrcode(ERRCODE_GP_INTERCONNECTION_ERROR, -1, dispatchResult);
/*
* Allocate buffer if first message. Insert newline between previous
* message and new one.
*/
Assert(dispatchResult->error_message != NULL);
oneTrailingNewlinePQ(dispatchResult->error_message);
msgoff = dispatchResult->error_message->len;
/*
* Format the message and append it to the buffer.
*/
va_start(args, fmt);
appendPQExpBufferVA(dispatchResult->error_message, fmt, args);
va_end(args);
/*
* Display the message on stderr for debugging, if requested. This helps
* to clarify the actual timing of threaded events.
*/
if (elevel >= log_min_messages)
{
oneTrailingNewlinePQ(dispatchResult->error_message);
write_log("%s", dispatchResult->error_message->data + msgoff);
meleeResults->errcode = errcode;
meleeResults->iFirstError = dispatchResult->meleeIndex;
}
/*
* In case the caller wants to hand the buffer to ereport(), follow the
* ereport() convention of not ending with a newline.
*/
noTrailingNewlinePQ(dispatchResult->error_message);
}
......
......@@ -15,7 +15,6 @@
#include "postgres.h"
#include <unistd.h> /* getpid() */
#include <pthread.h>
#include <limits.h>
#include "libpq-fe.h"
......@@ -42,7 +41,6 @@
#include "cdb/cdbdisp.h" /* me */
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbgang.h" /* me */
#include "cdb/cdbgang_thread.h"
#include "cdb/cdbgang_async.h"
#include "cdb/cdbtm.h" /* discardDtxTransaction() */
#include "cdb/cdbutil.h" /* CdbComponentDatabaseInfo */
......@@ -72,13 +70,29 @@ int host_segments = 0;
Gang *CurrentGangCreating = NULL;
CreateGangFunc pCreateGangFunc = NULL;
CreateGangFunc pCreateGangFunc = cdbgang_createGang_async;
static bool NeedResetSession = false;
static Oid OldTempNamespace = InvalidOid;
static void resetSessionForPrimaryGangLoss(void);
/*
* cdbgang_createGang:
*
* Creates a new gang by logging on a session to each segDB involved.
*
* call this function in GangContext memory context.
* elog ERROR or return a non-NULL gang.
*/
Gang *
cdbgang_createGang(List *segments, SegmentType segmentType)
{
Assert(pCreateGangFunc);
return pCreateGangFunc(segments, segmentType);
}
/*
* Creates a new gang by logging on a session to each segDB involved.
*
......@@ -113,7 +127,7 @@ AllocateGang(CdbDispatcherState *ds, GangType type, List *segments)
else
segmentType = SEGMENTTYPE_ANY;
newGang = pCreateGangFunc(segments, segmentType);
newGang = cdbgang_createGang(segments, segmentType);
newGang->allocated = true;
newGang->type = type;
......@@ -763,54 +777,6 @@ resetSessionForPrimaryGangLoss(void)
* Helper functions
*/
int gp_pthread_create(pthread_t *thread, void *(*start_routine) (void *),
void *arg, const char *caller)
{
int pthread_err = 0;
pthread_attr_t t_atts;
/*
* Call some init function. Before any thread is created, we need to init
* some static stuff. The main purpose is to guarantee the non-thread safe
* stuff are called in main thread, before any child thread get running.
* Note these staic data structure should be read only after init. Thread
* creation is a barrier, so there is no need to get lock before we use
* these data structures.
*
* So far, we know we need to do this for getpwuid_r (See MPP-1971, glibc
* getpwuid_r is not thread safe).
*/
#ifndef WIN32
get_gp_passwdptr();
#endif
/*
* save ourselves some memory: the defaults for thread stack size are
* large (1M+)
*/
pthread_err = pthread_attr_init(&t_atts);
if (pthread_err != 0)
{
elog(LOG, "%s: pthread_attr_init failed. Error %d", caller, pthread_err);
return pthread_err;
}
pthread_err = pthread_attr_setstacksize(&t_atts,
Max(PTHREAD_STACK_MIN, (256 * 1024)));
if (pthread_err != 0)
{
elog(LOG, "%s: pthread_attr_setstacksize failed. Error %d", caller, pthread_err);
pthread_attr_destroy(&t_atts);
return pthread_err;
}
pthread_err = pthread_create(thread, &t_atts, start_routine, arg);
pthread_attr_destroy(&t_atts);
return pthread_err;
}
const char *
gangTypeToString(GangType type)
{
......@@ -864,15 +830,6 @@ GangOK(Gang *gp)
return true;
}
void
cdbgang_setAsync(bool async)
{
if (async)
pCreateGangFunc = pCreateGangFuncAsync;
else
pCreateGangFunc = pCreateGangFuncThreaded;
}
void
RecycleGang(Gang *gp, bool forceDestroy)
{
......
......@@ -30,14 +30,12 @@
#include "libpq-int.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbgang_async.h"
#include "cdb/cdbvars.h"
#include "miscadmin.h"
#include "utils/resowner.h"
static int getPollTimeout(const struct timeval *startTS);
static Gang *createGang_async(List *segments, SegmentType segmentType);
CreateGangFunc pCreateGangFuncAsync = createGang_async;
/*
* Creates a new gang by logging on a session to each segDB involved.
......@@ -45,8 +43,8 @@ CreateGangFunc pCreateGangFuncAsync = createGang_async;
* call this function in GangContext memory context.
* elog ERROR or return a non-NULL gang.
*/
static Gang *
createGang_async(List *segments, SegmentType segmentType)
Gang *
cdbgang_createGang_async(List *segments, SegmentType segmentType)
{
PostgresPollingStatusType *pollingStatus = NULL;
SegmentDatabaseDescriptor *segdbDesc = NULL;
......
/*-------------------------------------------------------------------------
*
* cdbgang_thread.c
* Functions for multi-thread implementation of creating gang.
*
* Portions Copyright (c) 2005-2008, Greenplum inc
* Portions Copyright (c) 2012-Present Pivotal Software, Inc.
*
*
* IDENTIFICATION
* src/backend/cdb/dispatcher/cdbgang_thread.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <pthread.h>
#include <limits.h>
#include "storage/ipc.h" /* For proc_exit_inprogress */
#include "tcop/tcopprot.h"
#include "libpq-fe.h"
#include "libpq-int.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbvars.h"
#include "miscadmin.h"
#include "utils/resowner.h"
/*
* Parameter structure for the DoConnect threads
*/
typedef struct DoConnectParms
{
/*
* db_count: The number of segdbs that this thread is responsible for
* connecting to. Equals the count of segdbDescPtrArray below.
*/
int db_count;
/*
* segdbDescPtrArray: Array of SegmentDatabaseDescriptor* 's that this
* thread is responsible for connecting to. Has size equal to db_count.
*/
SegmentDatabaseDescriptor **segdbDescPtrArray;
/* type of gang. */
GangType type;
int gangId;
/* connect options. GUC etc. */
char *connectOptions;
/* The pthread_t thread handle. */
pthread_t thread;
} DoConnectParms;
static DoConnectParms *makeConnectParms(int parmsCount, GangType type, int gangId);
static void destroyConnectParms(DoConnectParms *doConnectParmsAr, int count);
static void *thread_DoConnect(void *arg);
static void checkConnectionStatus(Gang *gp,
int *countInRecovery,
int *countSuccessful,
struct PQExpBufferData *errorMessage);
static Gang *createGang_thread(List *segments, SegmentType segmentType);
CreateGangFunc pCreateGangFuncThreaded = createGang_thread;
/*
* Creates a new gang by logging on a session to each segDB involved.
*
* call this function in GangContext memory context.
* elog ERROR or return a non-NULL gang.
*/
static Gang *
createGang_thread(List *segments, SegmentType segmentType)
{
Gang *newGangDefinition = NULL;
SegmentDatabaseDescriptor *segdbDesc = NULL;
DoConnectParms *doConnectParmsAr = NULL;
DoConnectParms *pParms = NULL;
int parmIndex = 0;
int threadCount = 0;
int i = 0;
int create_gang_retry_counter = 0;
int in_recovery_mode_count = 0;
int successful_connections = 0;
int size;
PQExpBufferData create_gang_error;
size = list_length(segments);
ELOG_DISPATCHER_DEBUG("createGang :size %d", size);
/* check arguments */
Assert(gp_connections_per_thread > 0);
initPQExpBuffer(&create_gang_error);
Assert(CurrentGangCreating == NULL);
/*
* If we're in a retry, we may need to reset our initial state a bit. We
* also want to ensure that all resources have been released.
*/
Assert(newGangDefinition == NULL);
Assert(doConnectParmsAr == NULL);
successful_connections = 0;
in_recovery_mode_count = 0;
threadCount = 0;
/* allocate and initialize a gang structure */
newGangDefinition = buildGangDefinition(segments, segmentType);
CurrentGangCreating = newGangDefinition;
create_gang_retry:
Assert(newGangDefinition != NULL);
resetPQExpBuffer(&create_gang_error);
/*
* The most threads we could have is segdb_count /
* gp_connections_per_thread, rounded up. This is equivalent to 1 +
* (segdb_count-1) / gp_connections_per_thread. We allocate enough memory
* for this many DoConnectParms structures, even though we may not use
* them all.
*/
threadCount = 1 + (size - 1) / gp_connections_per_thread;
Assert(threadCount > 0);
/* initialize connect parameters */
doConnectParmsAr = makeConnectParms(threadCount, GANGTYPE_UNALLOCATED, -1);
for (i = 0; i < size; i++)
{
parmIndex = i / gp_connections_per_thread;
pParms = &doConnectParmsAr[parmIndex];
segdbDesc = newGangDefinition->db_descriptors[i];
pParms->segdbDescPtrArray[pParms->db_count++] = segdbDesc;
}
/* start threads and doing the connect */
for (i = 0; i < threadCount; i++)
{
int pthread_err;
pParms = &doConnectParmsAr[i];
ELOG_DISPATCHER_DEBUG("createGang creating thread %d of %d for libpq connections",
i + 1, threadCount);
pthread_err = gp_pthread_create(&pParms->thread, thread_DoConnect, pParms, "createGang");
if (pthread_err != 0)
{
int j;
/*
* Error during thread create (this should be caused by resource
* constraints). If we leave the threads running, they'll
* immediately have some problems -- so we need to join them, and
* *then* we can issue our FATAL error
*/
for (j = 0; j < i; j++)
{
pthread_join(doConnectParmsAr[j].thread, NULL);
}
ereport(FATAL, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("failed to create thread %d of %d", i + 1, threadCount),
errdetail("pthread_create() failed with err %d", pthread_err)));
}
}
/*
* wait for all of the DoConnect threads to complete.
*/
for (i = 0; i < threadCount; i++)
{
ELOG_DISPATCHER_DEBUG("joining to thread %d of %d for libpq connections",
i + 1, threadCount);
if (0 != pthread_join(doConnectParmsAr[i].thread, NULL))
{
elog(FATAL, "could not create segworker group");
}
}
/*
* Free the memory allocated for the threadParms array
*/
destroyConnectParms(doConnectParmsAr, threadCount);
doConnectParmsAr = NULL;
SIMPLE_FAULT_INJECTOR(GangCreated);
/* find out the successful connections and the failed ones */
checkConnectionStatus(newGangDefinition, &in_recovery_mode_count,
&successful_connections, &create_gang_error);
ELOG_DISPATCHER_DEBUG("createGang: %d processes requested; %d successful connections %d in recovery",
size, successful_connections, in_recovery_mode_count);
if (size == successful_connections)
{
termPQExpBuffer(&create_gang_error);
CurrentGangCreating = NULL;
return newGangDefinition;
}
/* there'er failed connections */
FtsNotifyProber();
/* FTS shows some segment DBs are down, destroy all gangs. */
if (FtsTestSegmentDBIsDown(newGangDefinition->db_descriptors, size))
{
appendPQExpBuffer(&create_gang_error, "FTS detected one or more segments are down\n");
goto exit;
}
/* failure due to recovery */
if (successful_connections + in_recovery_mode_count == size)
{
if (gp_gang_creation_retry_count &&
create_gang_retry_counter++ < gp_gang_creation_retry_count)
{
ELOG_DISPATCHER_DEBUG("createGang: gang creation failed, but retryable.");
CHECK_FOR_INTERRUPTS();
pg_usleep(gp_gang_creation_retry_timer * 1000);
CHECK_FOR_INTERRUPTS();
goto create_gang_retry;
}
appendPQExpBuffer(&create_gang_error, "segment(s) are in recovery mode\n");
}
exit:
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
errmsg("failed to acquire resources on one or more segments"),
errdetail("%s", create_gang_error.data)));
return NULL;
}
/*
* Thread procedure.
* Perform the connect.
*/
static void *
thread_DoConnect(void *arg)
{
DoConnectParms *pParms = (DoConnectParms *) arg;
SegmentDatabaseDescriptor **segdbDescPtrArray = pParms->segdbDescPtrArray;
int db_count = pParms->db_count;
SegmentDatabaseDescriptor *segdbDesc = NULL;
int i = 0;
gp_set_thread_sigmasks();
/*
* The pParms contains an array of SegmentDatabaseDescriptors to connect
* to.
*/
for (i = 0; i < db_count; i++)
{
bool ret;
char gpqeid[100];
segdbDesc = segdbDescPtrArray[i];
if (segdbDesc == NULL || segdbDesc->segment_database_info == NULL)
{
write_log("thread_DoConnect: bad segment definition during gang creation %d/%d\n", i, db_count);
continue;
}
/* if it's a cached QE, skip */
if (segdbDesc->conn != NULL && !cdbconn_isBadConnection(segdbDesc))
continue;
/*
* Build the connection string. Writer-ness needs to be processed
* early enough now some locks are taken before command line options
* are recognized.
*/
ret = build_gpqeid_param(gpqeid, sizeof(gpqeid),
segdbDesc->isWriter,
segdbDesc->identifier,
segdbDesc->segment_database_info->hostSegs);
if (!ret)
{
segdbDesc->errcode = ERRCODE_INTERNAL_ERROR;
appendPQExpBuffer(&segdbDesc->error_message,
"Internal error: unable to construct connection string");
write_log("thread_DoConnect: unable to construct connection string for segdb %i", i);
continue;
}
/* check the result in createGang */
cdbconn_doConnect(segdbDesc, gpqeid, pParms->connectOptions);
}
return (NULL);
}
/*
* Initialize a DoConnectParms structure.
*
* Including initialize the connect option string.
*/
static DoConnectParms *
makeConnectParms(int parmsCount, GangType type, int gangId)
{
DoConnectParms *doConnectParmsAr =
(DoConnectParms *) palloc0(parmsCount * sizeof(DoConnectParms));
DoConnectParms *pParms = NULL;
int segdbPerThread = gp_connections_per_thread;
int i = 0;
for (i = 0; i < parmsCount; i++)
{
pParms = &doConnectParmsAr[i];
pParms->segdbDescPtrArray =
(SegmentDatabaseDescriptor **) palloc0(segdbPerThread * sizeof(SegmentDatabaseDescriptor *));
MemSet(&pParms->thread, 0, sizeof(pthread_t));
pParms->db_count = 0;
pParms->type = type;
pParms->connectOptions = makeOptions();
pParms->gangId = gangId;
}
return doConnectParmsAr;
}
/*
* Free all the memory allocated in DoConnectParms.
*/
static void
destroyConnectParms(DoConnectParms *doConnectParmsAr, int count)
{
if (doConnectParmsAr != NULL)
{
int i = 0;
for (i = 0; i < count; i++)
{
DoConnectParms *pParms = &doConnectParmsAr[i];
if (pParms->connectOptions != NULL)
{
pfree(pParms->connectOptions);
pParms->connectOptions = NULL;
}
pfree(pParms->segdbDescPtrArray);
pParms->segdbDescPtrArray = NULL;
}
pfree(doConnectParmsAr);
}
}
/*
* Check all the connections of a gang.
*
* return the count of successful connections and
* the count of failed connections due to recovery.
*/
static void
checkConnectionStatus(Gang *gp,
int *countInRecovery,
int *countSuccessful,
struct PQExpBufferData *errorMessage)
{
SegmentDatabaseDescriptor *segdbDesc = NULL;
int size = gp->size;
int i = 0;
/*
* In this loop, we check whether the connections were successful. If not,
* we recreate the error message with palloc and report it.
*/
for (i = 0; i < size; i++)
{
segdbDesc = gp->db_descriptors[i];
/*
* check connection established or not, if not, we may have to
* re-build this gang.
*/
if (segdbDesc->errcode && segdbDesc->error_message.len > 0)
{
/*
* Log failed connections. Complete failures are taken care of
* later.
*/
Assert(segdbDesc->whoami != NULL);
elog(LOG, "Failed connection to %s", segdbDesc->whoami);
insist_log(segdbDesc->errcode != 0 && segdbDesc->error_message.len != 0,
"connection is null, but no error code or error message, for segDB %d", i);
ereport(LOG, (errcode(segdbDesc->errcode), errmsg("%s", segdbDesc->error_message.data)));
/* this connect failed -- but why ? */
if (segment_failure_due_to_recovery(segdbDesc->error_message.data))
{
elog(LOG, "segment is in recovery mode (%s)", segdbDesc->whoami);
(*countInRecovery)++;
}
else
{
appendPQExpBuffer(errorMessage, "%s (%s)\n", segdbDesc->error_message.data, segdbDesc->whoami);
}
cdbconn_resetQEErrorMessage(segdbDesc);
}
else
{
Assert(segdbDesc->errcode == 0 && segdbDesc->error_message.len == 0);
/* We have a live connection! */
(*countSuccessful)++;
}
}
}
......@@ -179,7 +179,6 @@ main(int argc, char *argv[])
GpIdentity.numsegments = TOTOAL_SEGMENTS;
GpIdentity.dbid = 1;
GpIdentity.segindex = -1;
gp_connections_per_thread = 64;
Port procport;
......
@@ -189,20 +189,6 @@ proc_exit_prepare(int code)
/* For the same reason, reset debug_query_string before it's clobbered */
debug_query_string = NULL;
/*
* Make sure threads get cleaned up: there might be still ongoing
* dispatch threads with something that will be cleaned up during
* shmem_exit. And this should be after proc_exit_inprogress = true above
* so that threads will recognize we are dying and break-out from the loop
* even if they're in the middle of work. Note this call will block for
* a certain time until threads get cleaned up. While it is generally
* expected for a process to die immediately in this code path, it should
* be ok to block as we are most likely not in signal handler or
* something. Actually, I cannot find any better option to do the
* correct work.
*/
cdbdisp_onProcExit();
/*
* Make sure the interconnect thread quits before shmem_exit() in the FATAL case.
* Otherwise, shmem_exit() may free the MemoryContext of MotionConns in connHtab unexpectedly;
......
@@ -3562,17 +3562,6 @@ struct config_int ConfigureNamesInt_gp[] =
0, 0, INT_MAX, NULL, NULL
},
{
{"gp_connections_per_thread", PGC_BACKEND, GP_ARRAY_TUNING,
gettext_noop("Sets the number of client connections handled in each thread."),
NULL,
GUC_GPDB_ADDOPT
},
&gp_connections_per_thread,
0, 0, INT_MAX,
NULL, assign_gp_connections_per_thread, NULL
},
{
{"gp_subtrans_warn_limit", PGC_POSTMASTER, RESOURCES,
gettext_noop("Sets the warning limit on number of subtransactions in a transaction."),
......
@@ -650,7 +650,6 @@ gp_interconnect_type=udpifc
# - Worker Process Creation -
gp_connections_per_thread = 0
gp_segment_connect_timeout = 600s
# - Resource limits -
......
@@ -42,17 +42,6 @@ typedef struct SegmentDatabaseDescriptor
* established connection to the segment database.
*/
PGconn *conn;
/*
* Error info saved when connection cannot be established.
* ERRCODE_xxx (sqlstate encoded as an int) of first error, or 0.
*
* errcode and error_message are only used in threaded implementation.
*/
int errcode;
/* message text; '\n' at end */
PQExpBufferData error_message;
/*
* Connection info saved at most recent PQconnectdb.
@@ -76,11 +65,6 @@ cdbconn_termSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc);
/* Connect to a QE as a client via libpq. */
void
cdbconn_doConnect(SegmentDatabaseDescriptor *segdbDesc,
const char *gpqeid,
const char *options);
void
cdbconn_doConnectStart(SegmentDatabaseDescriptor *segdbDesc,
const char *gpqeid,
@@ -104,9 +88,6 @@ bool cdbconn_isBadConnection(SegmentDatabaseDescriptor *segdbDesc);
/* Return if it's a connection OK */
bool cdbconn_isConnectionOk(SegmentDatabaseDescriptor *segdbDesc);
/* Reset error message buffer */
void cdbconn_resetQEErrorMessage(SegmentDatabaseDescriptor *segdbDesc);
/* Set the slice index for error messages related to this QE. */
void cdbconn_setQEIdentifier(SegmentDatabaseDescriptor *segdbDesc, int sliceIndex);
......
@@ -60,7 +60,6 @@ typedef struct CdbDispatcherState
typedef struct DispatcherInternalFuncs
{
void (*procExitCallBack)(void);
bool (*checkForCancel)(struct CdbDispatcherState *ds);
int (*getWaitSocketFd)(struct CdbDispatcherState *ds);
void* (*makeDispatchParams)(int maxSlices, int largestGangSize, char *queryText, int queryTextLen);
@@ -77,9 +76,7 @@ typedef struct DispatcherInternalFuncs
* specified by the gang parameter. cancelOnError indicates whether an error
* occurring on one of the qExec segdbs should cause all still-executing commands to cancel
* on other qExecs. Normally this would be true. The commands are sent over the libpq
* connections that were established during cdblink_setup. They are run inside of threads.
* The number of segdbs handled by any one thread is determined by the
* guc variable gp_connections_per_thread.
* connections that were established during cdblink_setup.
*
* The caller must provide a CdbDispatchResults object having available
* resultArray slots sufficient for the number of QEs to be dispatched:
@@ -110,7 +107,7 @@ cdbdisp_dispatchToGang(struct CdbDispatcherState *ds,
*
* For asynchronous dispatcher, we have to wait all dispatch to finish before we move on to query execution,
* otherwise we may get into a deadlock situation, e.g, gather motion node waiting for data,
* while segments waiting for plan. This is skipped in threaded dispatcher as data is sent in blocking style.
* while segments waiting for plan.
*/
void
cdbdisp_waitDispatchFinish(struct CdbDispatcherState *ds);
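The deadlock the comment warns about (QD waiting for tuples while a segment still waits for its plan) can be modeled with per-connection send buffers that drain incrementally, in the spirit of a nonblocking libpq connection flushed via PQflush(). The types and buffer mechanics below are illustrative assumptions, not the real dispatcher:

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative model of an async dispatch connection: the query text
 * is queued in a send buffer and drained a chunk at a time. */
typedef struct MockConn
{
	int			pending;	/* bytes of the dispatched plan not yet sent */
} MockConn;

/* One flush step; returns true when this connection is fully drained. */
static bool
flush_step(MockConn *conn, int chunk)
{
	if (conn->pending > 0)
		conn->pending = conn->pending > chunk ? conn->pending - chunk : 0;
	return conn->pending == 0;
}

/* Model of cdbdisp_waitDispatchFinish(): loop until every connection
 * has drained its buffer, so no segment is left waiting for its plan
 * while the QD blocks waiting for data from a gather motion. */
static void
wait_dispatch_finish(MockConn *conns, int n)
{
	bool		done = false;

	while (!done)
	{
		done = true;
		for (int i = 0; i < n; i++)
			if (!flush_step(&conns[i], 1024))
				done = false;
	}
}
```

Only after this wait returns may the QD safely start consuming query results.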
@@ -182,10 +179,6 @@ cdbdisp_makeDispatchParams(CdbDispatcherState *ds,
bool cdbdisp_checkForCancel(CdbDispatcherState * ds);
int cdbdisp_getWaitSocketFd(CdbDispatcherState *ds);
void cdbdisp_onProcExit(void);
void cdbdisp_setAsync(bool async);
void cdbdisp_markNamedPortalGangsDestroyed(void);
void cdbdisp_cleanupDispatcherHandle(const struct ResourceOwnerData * owner);
......
/*-------------------------------------------------------------------------
*
* cdbdisp_thread.h
* routines for multi-thread implementation of dispatching commands
* to the qExec processes.
*
* Portions Copyright (c) 2005-2008, Greenplum inc
* Portions Copyright (c) 2012-Present Pivotal Software, Inc.
*
*
* IDENTIFICATION
* src/include/cdb/cdbdisp_thread.h
*
*-------------------------------------------------------------------------
*/
#ifndef CDBDISP_THREAD_H
#define CDBDISP_THREAD_H
extern DispatcherInternalFuncs DispatcherSyncFuncs;
#endif
@@ -197,22 +197,6 @@ cdbdisp_seterrcode(int errcode, /* ERRCODE_xxx or 0 */
int resultIndex, /* -1 if no PGresult */
CdbDispatchResult *dispatchResult);
/*
* Format a message, printf-style, and append to the error_message buffer.
* Also write it to stderr if logging is enabled for messages of the
* given severity level 'elevel' (for example, DEBUG1; or 0 to suppress).
* 'errcode' is the ERRCODE_xxx value for setting the client's SQLSTATE.
* NB: This can be called from a dispatcher thread, so it must not use
* palloc/pfree or elog/ereport because they are not thread safe.
*/
void
cdbdisp_appendMessage(CdbDispatchResult *dispatchResult,
int errcode,
const char *fmt,
...)
/* This extension allows gcc to check the format string */
__attribute__((format(printf, 3, 4)));
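The removed comment explains why the threaded dispatcher's variant had to avoid palloc/pfree and elog/ereport. A palloc-free, printf-style append of that kind can be sketched with a fixed buffer and vsnprintf; this is illustrative only and not the Greenplum code:

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Fixed-size message buffer; no allocator involved, so appending is
 * safe from a thread that must not call palloc or elog. */
typedef struct MsgBuf
{
	char		data[1024];
	size_t		len;
} MsgBuf;

/* printf-style append using only vsnprintf, mirroring the thread-safety
 * constraint described above for dispatcher threads. */
static void
append_message(MsgBuf *buf, const char *fmt, ...)
	__attribute__((format(printf, 2, 3)));

static void
append_message(MsgBuf *buf, const char *fmt, ...)
{
	va_list		ap;
	int			n;

	va_start(ap, fmt);
	n = vsnprintf(buf->data + buf->len, sizeof(buf->data) - buf->len,
				  fmt, ap);
	va_end(ap);
	if (n > 0)
	{
		buf->len += (size_t) n;
		if (buf->len >= sizeof(buf->data))
			buf->len = sizeof(buf->data) - 1;	/* output was truncated */
	}
}
```

With the threaded backend retired, the surviving variant is free to use the normal palloc-backed PQExpBuffer machinery instead.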
/*
* Format a message, printf-style, and append to the error_message buffer.
* Also write it to stderr if logging is enabled for messages of the
......
@@ -16,11 +16,6 @@
#include "cdb/cdbutil.h"
#include "executor/execdesc.h"
#ifdef WIN32
#include "pthread-win32.h"
#else
#include <pthread.h>
#endif
#include "utils/faultinjector.h"
#include "utils/portal.h"
@@ -65,6 +60,17 @@ extern int host_segments;
extern MemoryContext GangContext;
extern Gang *CurrentGangCreating;
/*
* cdbgang_createGang:
*
* Creates a new gang by logging on a session to each segDB involved.
*
* call this function in GangContext memory context.
* elog ERROR or return a non-NULL gang.
*/
extern Gang *
cdbgang_createGang(List *segments, SegmentType segmentType);
extern const char *gangTypeToString(GangType type);
extern void setupCdbProcessList(Slice *slice);
@@ -93,31 +99,6 @@ bool build_gpqeid_param(char *buf, int bufsz, bool is_writer, int identifier, in
char *makeOptions(void);
extern bool segment_failure_due_to_recovery(const char *error_message);
/*
* DisconnectAndDestroyIdleQEs()
*
* This routine is used when a session has been idle for a while (waiting for the
* client to send us SQL to execute). The idea is to consume fewer resources while sitting idle.
*
* The expectation is that if the session is logged on, but nobody is sending us work to do,
* we want to free up whatever resources we can. Usually it means there is a human being at the
* other end of the connection, and that person has walked away from their terminal, or just hasn't
* decided what to do next. We could be idle for a very long time (many hours).
*
* Of course, freeing QEs means that the next time the user does send in an SQL statement,
* we need to allocate QEs (at least the writer QEs) to do anything. This entails extra work,
* so we don't want to do this if we don't think the session has gone idle.
*
* Only call these routines from an idle session.
*
* This routine is also called from the sigalarm signal handler (hopefully that is safe to do).
*/
#ifdef WIN32
extern int gp_pthread_create(DWORD *thread, void *(*start_routine)(void *), void *arg, const char *caller);
#else
extern int gp_pthread_create(pthread_t *thread, void *(*start_routine)(void *), void *arg, const char *caller);
#endif
/*
* cdbgang_parse_gpqeid_params
*
@@ -158,7 +139,6 @@ typedef struct CdbProcess
typedef Gang *(*CreateGangFunc)(List *segments, SegmentType segmentType);
extern void cdbgang_setAsync(bool async);
extern void cdbgang_resetPrimaryWriterGang(void);
extern void cdbgang_decreaseNumReaderGang(void);
#endif /* _CDBGANG_H_ */
@@ -17,6 +17,6 @@
#include "cdb/cdbgang.h"
extern CreateGangFunc pCreateGangFuncAsync;
extern Gang *cdbgang_createGang_async(List *segments, SegmentType segmentType);
#endif
/*-------------------------------------------------------------------------
*
 * cdbgang_thread.h
 *	  routines for the multi-threaded implementation of gang creation
*
* Portions Copyright (c) 2005-2008, Greenplum inc
* Portions Copyright (c) 2012-Present Pivotal Software, Inc.
*
*
* IDENTIFICATION
* src/include/cdb/cdbgang_thread.h
*
*-------------------------------------------------------------------------
*/
#ifndef CDBGANG_THREAD_H
#define CDBGANG_THREAD_H
#include "cdb/cdbgang.h"
extern CreateGangFunc pCreateGangFuncThreaded;
#endif
@@ -156,6 +156,25 @@ extern void cdb_cleanup(int code, Datum arg __attribute__((unused)) );
CdbComponentDatabases * cdbcomponent_getCdbComponents(bool DNSLookupAsError);
void cdbcomponent_destroyCdbComponents(void);
/*
* cdbcomponent_cleanupIdleQEs()
*
* This routine is used when a session has been idle for a while (waiting for the
* client to send us SQL to execute). The idea is to consume fewer resources while sitting idle.
*
* The expectation is that if the session is logged on, but nobody is sending us work to do,
* we want to free up whatever resources we can. Usually it means there is a human being at the
* other end of the connection, and that person has walked away from their terminal, or just hasn't
* decided what to do next. We could be idle for a very long time (many hours).
*
* Of course, freeing QEs means that the next time the user does send in an SQL statement,
* we need to allocate QEs (at least the writer QEs) to do anything. This entails extra work,
* so we don't want to do this if we don't think the session has gone idle.
*
* Only call these routines from an idle session.
*
* This routine is also called from the sigalarm signal handler (hopefully that is safe to do).
*/
void cdbcomponent_cleanupIdleQEs(bool includeWriter);
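The policy described above (release idle QEs, keeping writer QEs unless explicitly requested) can be sketched as follows; the mock types and field names are assumptions for illustration, not the real component pool:

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative model of a cached QE: whether it is a writer, and
 * whether it is currently allocated to a query. */
typedef struct MockQE
{
	bool		isWriter;
	bool		allocated;
	bool		freed;
} MockQE;

/* Model of cdbcomponent_cleanupIdleQEs(): release every idle
 * (unallocated) QE; keep writer QEs unless includeWriter is set,
 * since writers hold session state that is expensive to recreate.
 * Returns the number of QEs released. */
static int
cleanup_idle_qes(MockQE *qes, int n, bool includeWriter)
{
	int			freed = 0;

	for (int i = 0; i < n; i++)
	{
		if (qes[i].freed || qes[i].allocated)
			continue;			/* already gone, or still in use */
		if (qes[i].isWriter && !includeWriter)
			continue;			/* keep writers by default */
		qes[i].freed = true;
		freed++;
	}
	return freed;
}
```

A session idling for hours would typically call this with `includeWriter` false first, paying the writer re-allocation cost only when the session is being torn down.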
CdbComponentDatabaseInfo * cdbcomponent_getComponentInfo(int contentId);
@@ -196,8 +215,4 @@ extern int getgpsegmentCount(void);
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG) elog(LOG, __VA_ARGS__); \
} while(false);
#define WRITE_LOG_DISPATCHER_DEBUG(...) do { \
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG) write_log(__VA_ARGS__); \
} while(false);
#endif /* CDBUTIL_H */
@@ -291,31 +291,6 @@ extern bool Gp_write_shared_snapshot;
extern int gp_fts_transition_retries;
extern int gp_fts_transition_timeout;
/*
* Parameter gp_connections_per_thread
*
* The run-time parameter (GUC variables) gp_connections_per_thread
* controls how many libpq connections to qExecs are processed in each
* thread.
*
* Any number >= 1 is valid.
*
* 1 means each connection has its own thread.
*
* This can be set in the config file, or at runtime by a superuser using
* SQL: set gp_connections_per_thread = x;
*
* The default is 256. So, if there are fewer than 256 segdbs, all would be handled
* by the same thread.
*
* Currently, this is used in two situation:
* 1) In cdblink_setup, when the libpq connections are obtained by the dispatcher
* to the qExecs.
* 2) In CdbDispatchCommand, when commands are sent from the dispatcher to the qExecs.
*
*/
extern int gp_connections_per_thread; /* GUC var - server operating mode. */
/*
* If number of subtransactions within a transaction exceed this limit,
* then a warning is given to the user.
......
@@ -764,7 +764,6 @@ extern const char *show_gp_session_role(void);
extern bool check_gp_role(char **newval, void **extra, GucSource source);
extern void assign_gp_role(const char *newval, void *extra);
extern const char *show_gp_role(void);
extern void assign_gp_connections_per_thread(int newval, void *extra);
extern void assign_gp_write_shared_snapshot(bool newval, void *extra);
extern bool gpvars_check_gp_resource_manager_policy(char **newval, void **extra, GucSource source);
extern void gpvars_assign_gp_resource_manager_policy(const char *newval, void *extra);
......