Commit 6606a5c6 authored by Kenan Yao

Code format and comment change in dispatcher; no code logic change involved.

Parent 72eaf899
......@@ -68,8 +68,7 @@ static List *availableReaderGangs1 = NIL;
static Gang *primaryWriterGang = NULL;
/*
* Every gang created must have a unique identifier, so the QD and Dispatch Agents can agree
* about what they are talking about.
* Every gang created must have a unique identifier
*/
#define PRIMARY_WRITER_GANG_ID 1
static int gang_id_counter = 2;
......
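The comment above pairs with the two definitions that follow it: the writer gang always uses the reserved id 1, while reader gangs draw their ids from a counter starting at 2. A minimal sketch of that scheme, using a hypothetical assign_gang_id() helper that is not part of this commit:

static int
assign_gang_id(GangType type)
{
	if (type == GANGTYPE_PRIMARY_WRITER)
		return PRIMARY_WRITER_GANG_ID;	/* the writer gang is always id 1 */

	return gang_id_counter++;			/* reader gangs get 2, 3, 4, ... */
}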
......@@ -343,7 +343,7 @@ cdbdisp_dtxParmsInit(struct CdbDispatcherState *ds,
}
/*
* Special Greenplum Database-only method for executing DTX protocol commands.
* Build a dtx protocol command string to be dispatched to QE.
*/
static char *
PQbuildGpDtxProtocolCommand(MemoryContext cxt,
......@@ -373,7 +373,7 @@ PQbuildGpDtxProtocolCommand(MemoryContext cxt,
*pos++ = 'T';
pos += 4; /* place holder for message length */
pos += 4; /* placeholder for message length */
tmp = htonl(dtxProtocolCommand);
memcpy(pos, &tmp, 4);
......
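The "placeholder for message length" idiom above is the usual two-pass way of building a length-prefixed protocol message: write the type byte, skip four bytes, append the body fields in network byte order, then backfill the length once the end of the message is known. A hedged sketch of the backfill step; the helper name and the exact length convention are assumptions, not taken from this file:

#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>

/*
 * msg points at the 1-byte message type; end points one past the last byte
 * written.  The 4-byte length is assumed to cover everything after the type
 * byte, including the length field itself.
 */
static void
backfill_message_length(char *msg, char *end)
{
	uint32_t	len = htonl((uint32_t) (end - (msg + 1)));

	memcpy(msg + 1, &len, 4);
}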
......@@ -673,14 +673,13 @@ cdbdisp_dispatchUtilityStatement(struct Node *stmt,
* should come back as "SELECT n" and should not reflect other commands
* inserted by rewrite rules. True means we want the status.
*/
q->canSetTag = true; /* ? */
q->canSetTag = true;
/*
* serialized the stmt tree, and create the sql statement: mppexec ....
*/
serializedQuerytree =
serializeNode((Node *) q, &serializedQuerytree_len,
NULL /*uncompressed_size */);
serializeNode((Node *) q, &serializedQuerytree_len, NULL /*uncompressed_size */);
Assert(serializedQuerytree != NULL);
......@@ -733,24 +732,21 @@ cdbdisp_dispatchRMCommand(const char *strCommand,
/*
* Wait for all QEs to finish. Don't cancel.
*/
CdbCheckDispatchResult((struct CdbDispatcherState *) &ds,
DISPATCH_WAIT_NONE);
CdbCheckDispatchResult((struct CdbDispatcherState *) &ds, DISPATCH_WAIT_NONE);
}
PG_CATCH();
{
/*
* Something happend, clean up after ourselves
*/
CdbCheckDispatchResult((struct CdbDispatcherState *) &ds,
DISPATCH_WAIT_NONE);
CdbCheckDispatchResult((struct CdbDispatcherState *) &ds, DISPATCH_WAIT_NONE);
cdbdisp_destroyDispatcherState((struct CdbDispatcherState *) &ds);
PG_RE_THROW();
}
PG_END_TRY();
resultSets =
cdbdisp_returnResults(ds.primaryResults, errmsgbuf, numresults);
resultSets = cdbdisp_returnResults(ds.primaryResults, errmsgbuf, numresults);
cdbdisp_destroyDispatcherState((struct CdbDispatcherState *) &ds);
......@@ -1017,18 +1013,7 @@ fillSliceVector(SliceTable *sliceTbl, int rootIdx,
}
/*
* Special Greenplum-only method for executing SQL statements. Specifies a global
* transaction context that the statement should be executed within.
*
* This should *ONLY* ever be used by the Greenplum Database Query Dispatcher and NEVER (EVER)
* by anyone else.
*
* snapshot - serialized form of Snapshot data.
* xid - Global Transaction Id to use.
* flags - specifies additional processing instructions to the remote server.
* e.g. Auto explicitly start a transaction before executing this
* statement.
* gp_command_count - Client request serial# to be passed to the qExec.
* Build a query string to be dispatched to QE.
*/
static char *
PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms,
......@@ -1046,8 +1031,8 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms,
int sliceinfo_len = pQueryParms->serializedSliceInfolen;
const char *snapshotInfo = pQueryParms->serializedDtxContextInfo;
int snapshotInfo_len = pQueryParms->serializedDtxContextInfolen;
int flags = 0; /* unused flags */
int localSlice = 0; /* localSlice; place holder; set later in dupQueryTextAndSetSliceId */
int flags = 0; /* unused flags */
int localSlice = 0; /* localSlice; placeholder; set later in dupQueryTextAndSetSliceId */
int rootIdx = pQueryParms->rootIdx;
const char *seqServerHost = pQueryParms->seqServerHost;
int seqServerHostlen = pQueryParms->seqServerHostlen;
......@@ -1099,7 +1084,7 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms,
*pos++ = 'M';
pos += 4; /* place holder for message length */
pos += 4; /* placeholder for message length */
tmp = htonl(localSlice);
memcpy(pos, &tmp, sizeof(localSlice));
......@@ -1331,17 +1316,6 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms,
}
}
/*
* Now we need to call CDBDispatchCommand once per slice. Each such
* call dispatches a MPPEXEC command to each of the QEs assigned to
* the slice.
*
* The QE information goes in two places: (1) in the argument to the
* function CDBDispatchCommand, and (2) in the serialized
* command sent to the QEs.
*
* So, for each slice in the tree...
*/
for (iSlice = 0; iSlice < nSlices; iSlice++)
{
CdbDispatchDirectDesc direct;
......
......@@ -109,16 +109,6 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
Assert(gangSize <= largestGangsize());
db_descriptors = gp->db_descriptors;
/*
* The most threads we could have is segdb_count / gp_connections_per_thread, rounded up.
* This is equivalent to 1 + (segdb_count-1) / gp_connections_per_thread.
* We allocate enough memory for this many DispatchCommandParms structures,
* even though we may not use them all.
*
* We can only use gp->size here if we're not dealing with a
* singleton gang. It is safer to always use the max number of segments we are
* controlling (largestGangsize).
*/
Assert(gp_connections_per_thread >= 0);
Assert(ds->dispatchThreads != NULL);
/*
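The removed comment above leans on the integer identity ceil(a / b) == 1 + (a - 1) / b, valid for a >= 1 and b >= 1 with truncating division. A small worked sketch of the bound it describes; the function name is illustrative only:

/*
 * Upper bound on dispatcher threads: one thread drives up to
 * gp_connections_per_thread connections, so segdb_count = 10 with
 * gp_connections_per_thread = 4 gives 1 + 9/4 = 3 threads.
 */
static int
max_dispatch_threads(int segdb_count, int gp_connections_per_thread)
{
	return 1 + (segdb_count - 1) / gp_connections_per_thread;
}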
......@@ -590,7 +580,7 @@ thread_DispatchOut(DispatchCommandParms * pParms)
if (PQstatus(dispatchResult->segdbDesc->conn) == CONNECTION_BAD)
{
char *msg;
char *msg;
msg = PQerrorMessage(dispatchResult->segdbDesc->conn);
......@@ -1161,7 +1151,7 @@ dispatchCommand(CdbDispatchResult * dispatchResult,
*/
if (PQsendGpQuery_shared(conn, (char *) query_text, query_text_len) == 0)
{
char *msg = PQerrorMessage(segdbDesc->conn);
char *msg = PQerrorMessage(segdbDesc->conn);
if (DEBUG3 >= log_min_messages)
write_log("PQsendMPPQuery_shared error %s %s",
......
......@@ -37,7 +37,7 @@
#endif
/* keep this in same order as ExecStatusType in libpq-fe.h */
char *const pgresStatus[] = {
char *const pgresStatus[] = {
"PGRES_EMPTY_QUERY",
"PGRES_COMMAND_OK",
"PGRES_TUPLES_OK",
......@@ -1660,7 +1660,7 @@ PQisBusy(PGconn *conn)
PGresult *
PQgetResult(PGconn *conn)
{
PGresult *res;
PGresult *res;
if (!conn)
return NULL;
......@@ -1671,7 +1671,7 @@ PQgetResult(PGconn *conn)
/* If not ready to return something, block until we are. */
while (conn->asyncStatus == PGASYNC_BUSY)
{
int flushResult;
int flushResult;
/*
* If data remains unsent, send it. Else we might be waiting for the
......
......@@ -65,11 +65,10 @@
#include "nodes/pg_list.h"
#include "cdb/cdbpartition.h"
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
static int pqSendSome(PGconn *conn, int len);
static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
time_t end_time);
static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
static int pqSendSome(PGconn *conn, int len);
static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time);
static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
/*
......@@ -382,8 +381,8 @@ pqPutInt(int value, size_t bytes, PGconn *conn)
int
pqCheckOutBufferSpace(size_t bytes_needed, PGconn *conn)
{
int newsize = conn->outBufSize;
char *newbuf;
int newsize = conn->outBufSize;
char *newbuf;
if (bytes_needed <= (size_t) newsize)
return 0;
......@@ -839,9 +838,9 @@ definitelyFailed:
static int
pqSendSome(PGconn *conn, int len)
{
char *ptr = conn->outBuffer;
int remaining = conn->outCount;
int result = 0;
char *ptr = conn->outBuffer;
int remaining = conn->outCount;
int result = 0;
if (conn->sock < 0)
{
......@@ -853,8 +852,8 @@ pqSendSome(PGconn *conn, int len)
/* while there's still data to send */
while (len > 0)
{
int sent;
char sebuf[256];
int sent;
char sebuf[256];
#ifndef WIN32
sent = send(conn->sock, ptr, len, 0);
......
......@@ -13,60 +13,57 @@
#include "executor/execdesc.h"
#include <pthread.h>
struct Port; /* #include "libpq/libpq-be.h" */
struct QueryDesc; /* #include "executor/execdesc.h" */
struct Port;
struct QueryDesc;
struct DirectDispatchInfo;
struct EState;
/* GangType enumeration is used in several structures related to CDB
* slice plan support.
*/
typedef enum GangType
{
GANGTYPE_UNALLOCATED, /* a root slice executed by the qDisp */
GANGTYPE_ENTRYDB_READER, /* a 1-gang with read access to the entry db */
GANGTYPE_SINGLETON_READER, /* a 1-gang to read the segment dbs */
GANGTYPE_PRIMARY_READER, /* a N-gang to read the segment dbs */
GANGTYPE_PRIMARY_WRITER /* the N-gang that can update the segment dbs */
GANGTYPE_UNALLOCATED, /* a root slice executed by the qDisp */
GANGTYPE_ENTRYDB_READER, /* a 1-gang with read access to the entry db */
GANGTYPE_SINGLETON_READER, /* a 1-gang to read the segment dbs */
GANGTYPE_PRIMARY_READER, /* a N-gang to read the segment dbs */
GANGTYPE_PRIMARY_WRITER /* the N-gang that can update the segment dbs */
} GangType;
/*
* A gang represents a single worker on each connected segDB
*
* A gang represents a single group of workers on each connected segDB
*/
typedef struct Gang
{
GangType type;
int gang_id;
int size; /* segment_count or segdb_count ? */
GangType type;
int gang_id;
int size;
/* MPP-6253: on *writer* gangs keep track of dispatcher use
* (reader gangs already track this properly, since they get
* allocated from a list of available gangs.*/
bool dispatcherActive;
/*
* Keep track of dispatcher use for writer gang. (reader gangs already track
* this properly, since they get allocated from a list of available gangs.)
*/
bool dispatcherActive;
/* the named portal that owns this gang, NULL if none */
char *portal_name;
char *portal_name;
/*
* Array of QEs/segDBs that make up this gang.
* Sorted by segment index.
* Array of QEs/segDBs that make up this gang. Sorted by segment index.
*/
struct SegmentDatabaseDescriptor *db_descriptors;
/* For debugging purposes only. These do not add any actual functionality. */
bool allocated;
bool allocated;
/* should be destroyed in cleanupGang() if set*/
bool noReuse;
/* should be destroyed if set */
bool noReuse;
/* memory context */
MemoryContext perGangContext;
} Gang;
extern Gang *allocateReaderGang(GangType type, char *portal_name);
extern Gang *allocateWriterGang(void);
struct DirectDispatchInfo;
extern List *getCdbProcessList(Gang *gang, int sliceIndex, struct DirectDispatchInfo *directDispatch);
extern bool gangOK(Gang *gp);
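As a point of orientation, here is a hedged usage sketch of the allocation API declared above; the wrapper function is invented for illustration, and a NULL portal_name is assumed to mean "no named portal":

static void
example_gang_allocation(void)
{
	/*
	 * The session's single writer gang, then a one-segment reader gang
	 * that is not bound to any named portal.
	 */
	Gang	   *writer = allocateWriterGang();
	Gang	   *reader = allocateReaderGang(GANGTYPE_SINGLETON_READER, NULL);

	Assert(gangOK(writer));
	Assert(gangOK(reader));
}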
......@@ -79,16 +76,15 @@ extern void disconnectAndDestroyAllGangs(bool resetSession);
extern void CheckForResetSession(void);
extern List * getAllIdleReaderGangs(void);
extern List * getAllBusyReaderGangs(void);
extern List *getAllIdleReaderGangs(void);
extern List *getAllBusyReaderGangs(void);
extern CdbComponentDatabases *getComponentDatabases(void);
extern bool gangsExist(void);
struct SegmentDatabaseDescriptor *getSegmentDescriptorFromGang(const Gang *gp, int seg);
extern struct SegmentDatabaseDescriptor *getSegmentDescriptorFromGang(const Gang *gp, int seg);
extern Gang *findGangById(int gang_id);
......@@ -97,20 +93,20 @@ extern Gang *findGangById(int gang_id);
* disconnectAndDestroyIdleReaderGangs()
*
* This routine is used when a session has been idle for a while (waiting for the
* client to send us SQL to execute). The idea is to consume less resources while sitting idle.
* client to send us SQL to execute). The idea is to consume less resources while sitting idle.
*
* The expectation is that if the session is logged on, but nobody is sending us work to do,
* we want to free up whatever resources we can. Usually it means there is a human being at the
* we want to free up whatever resources we can. Usually it means there is a human being at the
* other end of the connection, and that person has walked away from their terminal, or just hasn't
* decided what to do next. We could be idle for a very long time (many hours).
* decided what to do next. We could be idle for a very long time (many hours).
*
* Of course, freeing gangs means that the next time the user does send in an SQL statement,
* we need to allocate gangs (at least the writer gang) to do anything. This entails extra work,
* we need to allocate gangs (at least the writer gang) to do anything. This entails extra work,
* so we don't want to do this if we don't think the session has gone idle.
*
* Only call these routines from an idle session.
*
* This routine is called from the sigalarm signal handler (hopefully that is safe to do).
* This routine is also called from the sigalarm signal handler (hopefully that is safe to do).
*/
extern void disconnectAndDestroyIdleReaderGangs(void);
......@@ -119,6 +115,7 @@ extern void cleanupPortalGangs(Portal portal);
extern int largestGangsize(void);
extern int gp_pthread_create(pthread_t *thread, void *(*start_routine)(void *), void *arg, const char *caller);
/*
* cdbgang_parse_gpqeid_params
*
......@@ -129,110 +126,97 @@ extern int gp_pthread_create(pthread_t *thread, void *(*start_routine)(void *),
* command line options have not been processed; GUCs have the settings
* inherited from the postmaster; etc; so don't try to do too much in here.
*/
void
cdbgang_parse_gpqeid_params(struct Port *port, const char* gpqeid_value);
extern void cdbgang_parse_gpqeid_params(struct Port *port, const char *gpqeid_value);
void
cdbgang_parse_gpqdid_params(struct Port *port, const char* gpqdid_value);
extern void cdbgang_parse_gpqdid_params(struct Port *port, const char *gpqdid_value);
void
cdbgang_parse_gpdaid_params(struct Port *port, const char* gpdaid_value);
extern void cdbgang_parse_gpdaid_params(struct Port *port, const char *gpdaid_value);
/* ----------------
/*
* MPP Worker Process information
*
* This structure represents the global information about a worker process.
* It is constructed on the entry process (QD) and transmitted as part of
* the global slice table to the involved QEs. Note that this is an
* the global slice table to the involved QEs. Note that this is an
* immutable, fixed-size structure so it can be held in a contiguous
* array. In the Slice node, however, it is held in a List.
* ----------------
*/
typedef struct CdbProcess
{
NodeTag type;
/* These fields are established at connection (libpq) time and are
/*
* These fields are established at connection (libpq) time and are
* available to the QD in PGconn structure associated with connected
* QE. It needs to be explicitly transmitted to QE's, however,
* QE. It needs to be explicitly transmitted to QE's
*/
char *listenerAddr; /* Interconnect listener IPv4 address, a C-string */
int listenerPort; /* Interconnect listener port */
int pid; /* Backend PID of the process. */
/* Unclear about why we need these, however, it is no trouble to carry
* them.
*/
int contentid;
} CdbProcess;
/* ----------------
/*
* MPP Plan Slice information
*
* These structures summarize how a plan tree is sliced up into separate
* units of execution or slices. A slice will execute on a each worker within
* a gang of processes. Some gangs have a worker process on each of several
* databases, others have a single worker.
* ----------------
*
*/
typedef struct Slice
{
NodeTag type;
/*
* The index in the global slice table of this
* slice. The root slice of the main plan is
* always 0. Slices that have senders at their
* local root have a sliceIndex equal to the
* motionID of their sender Motion.
* The index in the global slice table of this slice. The root slice of
* the main plan is always 0. Slices that have senders at their local
* root have a sliceIndex equal to the motionID of their sender Motion.
*
* Undefined slices should have this set to
* -1.
* Undefined slices should have this set to -1.
*/
int sliceIndex;
/*
* The root slice of the slice tree of which
* this slice is a part.
* The root slice of the slice tree of which this slice is a part.
*/
int rootIndex;
/*
* the index of parent in global slice table (origin 0)
* or -1 if this is root slice.
* the index of parent in global slice table (origin 0) or -1 if
* this is root slice.
*/
int parentIndex;
/*
* An integer list of indices in the global slice
* table (origin 0) of the child slices of
* this slice, or -1 if this is a leaf slice.
* A child slice corresponds to a receiving
* motion in this slice.
* An integer list of indices in the global slice table (origin 0)
* of the child slices of this slice, or -1 if this is a leaf slice.
* A child slice corresponds to a receiving motion in this slice.
*/
List *children;
/* What kind of gang does this slice need? */
/* What kind of gang does this slice need */
GangType gangType;
/* How many gang members needed?
/*
* How many gang members needed
*
* This may seem redundant, but it is set before
* the process lists below and used to decide how
* It is set before the process lists below and used to decide how
* to initialize them.
*/
int gangSize;
/* how many of the gang members will actually be used?
* This takes into account directDispatch information
/*
* How many of the gang members will actually be used. This takes into
* account directDispatch information
*/
int numGangMembersToBeActive;
/**
/*
* directDispatch->isDirectDispatch should ONLY be set for a slice when it requires an n-gang.
*/
DirectDispatchInfo directDispatch;
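The sliceIndex/parentIndex comments above describe a tree encoded by indices into the global slice table. A hedged sketch of following those links; it assumes, as the comments imply, that SliceTable.slices is ordered so that list position equals sliceIndex, and no such helper exists in this header:

static Slice *
slice_tree_root(SliceTable *table, Slice *slice)
{
	/* walk parentIndex links until we reach a slice with no parent */
	while (slice->parentIndex != -1)
		slice = (Slice *) list_nth(table->slices, slice->parentIndex);

	return slice;
}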
......@@ -240,27 +224,14 @@ typedef struct Slice
struct Gang *primaryGang;
/* tell dispatch agents which gang we're talking about.*/
int primary_gang_id;
int primary_gang_id;
/*
* A list of CDBProcess nodes corresponding to
* the worker processes allocated to implement
* this plan slice.
* A list of CDBProcess nodes corresponding to the worker processes allocated
* to implement this plan slice.
*
* The number of processes must agree with the
* the plan slice to be implemented. In MPP 2
* the possibilities are 1 for a slice that
* operates on a single stream of tuples (e.g.,
* the receiver of a fixed motion, a 1-gang), or
* the number of segments (e.g., a parallel
* operation or the receiver of the results of a
* parallel operation, an N-gang).
*
* The processes of an N-gang must be in hash
* value order -- a hash value of H used to direct
* an input tuple to this slice will direct it to
* the H+1st process in the list.
* The number of processes must agree with the the plan slice to be implemented.
*/
List *primaryProcesses;
} Slice;
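The removed text above also recorded the ordering rule for an N-gang's process list: processes sit in hash-value order, so a tuple whose hash value is H goes to the "H+1st process in the list", i.e. index H with 0-based indexing. A hedged illustration using the standard PostgreSQL list accessor; the helper itself is not part of this header, and real tuple routing in the executor is more involved:

static CdbProcess *
process_for_hash(Slice *slice, int hashValue)
{
	/* 0-based index H is the (H+1)-st element of primaryProcesses */
	return (CdbProcess *) list_nth(slice->primaryProcesses, hashValue);
}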
......@@ -271,29 +242,27 @@ typedef struct Slice
*
* Slice 0 is the root slice of plan as a whole.
* Slices 1 through nMotion are motion slices with a sending motion at
* the root of the slice.
* the root of the slice.
* Slices nMotion+1 and on are root slices of initPlans.
*
* There may be unused slices in case the plan contains subplans that
* are not initPlans. (This won't happen unless MPP decides to support
* are not initPlans. (This won't happen unless MPP decides to support
* subplans similarly to PostgreSQL, which isn't the current plan.)
*/
typedef struct SliceTable
{
NodeTag type;
int nMotions; /* The number Motion nodes in the entire plan */
int nInitPlans; /* The number of initplan slices allocated */
int localSlice; /* Index of the slice to execute. */
List *slices; /* List of slices */
bool doInstrument; /* true => collect stats for EXPLAIN ANALYZE */
int nMotions; /* The number Motion nodes in the entire plan */
int nInitPlans; /* The number of initplan slices allocated */
int localSlice; /* Index of the slice to execute */
List *slices; /* List of slices */
bool doInstrument; /* true => collect stats for EXPLAIN ANALYZE */
uint32 ic_instance_id;
} SliceTable;
struct EState;
extern void InitSliceTable(struct EState *estate, int nMotions, int nSubplans);
extern Slice *getCurrentSlice(struct EState* estate, int sliceIndex);
extern Slice *getCurrentSlice(struct EState *estate, int sliceIndex);
extern bool sliceRunsOnQD(Slice *slice);
extern bool sliceRunsOnQE(Slice *slice);
extern int sliceCalculateNumSendingProcesses(Slice *slice);
......
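The slice-index layout spelled out in the SliceTable comment (slice 0 is the main plan's root, slices 1..nMotions are motion slices, slices nMotions+1..nMotions+nInitPlans are initPlan roots) can be captured in a small predicate. This is an illustrative sketch only; no such helper exists in this header:

static bool
slice_is_initplan_root(SliceTable *table, int sliceIndex)
{
	/* initPlan root slices occupy the index range just past the motion slices */
	return sliceIndex > table->nMotions &&
		sliceIndex <= table->nMotions + table->nInitPlans;
}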