diff --git a/src/backend/cdb/cdbgang.c b/src/backend/cdb/cdbgang.c
index 438fd80ff6831128e61e22416009ee4a45f02a1b..4f214f1504f50ff835ea80c775d0f7eb5ae9accb 100644
--- a/src/backend/cdb/cdbgang.c
+++ b/src/backend/cdb/cdbgang.c
@@ -68,8 +68,7 @@ static List *availableReaderGangs1 = NIL;
 static Gang *primaryWriterGang = NULL;
 
 /*
- * Every gang created must have a unique identifier, so the QD and Dispatch Agents can agree
- * about what they are talking about.
+ * Every gang created must have a unique identifier
  */
 #define PRIMARY_WRITER_GANG_ID 1
 static int gang_id_counter = 2;
diff --git a/src/backend/cdb/dispatcher/cdbdisp_dtx.c b/src/backend/cdb/dispatcher/cdbdisp_dtx.c
index f4ad488f4d30c7d0c776e05364cb3c3eb67e04f5..af6ca5ef7b504313e82c0c985c92b50782feec9a 100644
--- a/src/backend/cdb/dispatcher/cdbdisp_dtx.c
+++ b/src/backend/cdb/dispatcher/cdbdisp_dtx.c
@@ -343,7 +343,7 @@ cdbdisp_dtxParmsInit(struct CdbDispatcherState *ds,
 }
 
 /*
- * Special Greenplum Database-only method for executing DTX protocol commands.
+ * Build a dtx protocol command string to be dispatched to QE.
  */
 static char *
 PQbuildGpDtxProtocolCommand(MemoryContext cxt,
@@ -373,7 +373,7 @@ PQbuildGpDtxProtocolCommand(MemoryContext cxt,
 
     *pos++ = 'T';
 
-    pos += 4;               /* place holder for message length */
+    pos += 4;               /* placeholder for message length */
 
     tmp = htonl(dtxProtocolCommand);
     memcpy(pos, &tmp, 4);
diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c
index 9cd1bca9c431ffd5b605ce3c3cd2a6249f34df51..fc79c4687866ca0e3d17eb130b1ca52029d70c96 100644
--- a/src/backend/cdb/dispatcher/cdbdisp_query.c
+++ b/src/backend/cdb/dispatcher/cdbdisp_query.c
@@ -673,14 +673,13 @@ cdbdisp_dispatchUtilityStatement(struct Node *stmt,
     * should come back as "SELECT n" and should not reflect other commands
     * inserted by rewrite rules. True means we want the status.
     */
-    q->canSetTag = true;        /* ? */
+    q->canSetTag = true;
 
    /*
     * serialized the stmt tree, and create the sql statement: mppexec ....
     */
    serializedQuerytree =
-        serializeNode((Node *) q, &serializedQuerytree_len,
-                      NULL /*uncompressed_size */);
+        serializeNode((Node *) q, &serializedQuerytree_len, NULL /*uncompressed_size */);
 
    Assert(serializedQuerytree != NULL);
 
@@ -733,24 +732,21 @@ cdbdisp_dispatchRMCommand(const char *strCommand,
        /*
         * Wait for all QEs to finish. Don't cancel.
         */
-        CdbCheckDispatchResult((struct CdbDispatcherState *) &ds,
-                               DISPATCH_WAIT_NONE);
+        CdbCheckDispatchResult((struct CdbDispatcherState *) &ds, DISPATCH_WAIT_NONE);
    }
    PG_CATCH();
    {
        /*
         * Something happend, clean up after ourselves
         */
-        CdbCheckDispatchResult((struct CdbDispatcherState *) &ds,
-                               DISPATCH_WAIT_NONE);
+        CdbCheckDispatchResult((struct CdbDispatcherState *) &ds, DISPATCH_WAIT_NONE);
 
        cdbdisp_destroyDispatcherState((struct CdbDispatcherState *) &ds);
 
        PG_RE_THROW();
    }
    PG_END_TRY();
 
-    resultSets =
-        cdbdisp_returnResults(ds.primaryResults, errmsgbuf, numresults);
+    resultSets = cdbdisp_returnResults(ds.primaryResults, errmsgbuf, numresults);
 
    cdbdisp_destroyDispatcherState((struct CdbDispatcherState *) &ds);
@@ -1017,18 +1013,7 @@ fillSliceVector(SliceTable *sliceTbl, int rootIdx,
 }
 
 /*
- * Special Greenplum-only method for executing SQL statements. Specifies a global
- * transaction context that the statement should be executed within.
- *
- * This should *ONLY* ever be used by the Greenplum Database Query Dispatcher and NEVER (EVER)
- * by anyone else.
- *
- * snapshot - serialized form of Snapshot data.
- * xid - Global Transaction Id to use.
- * flags - specifies additional processing instructions to the remote server.
- *        e.g. Auto explicitly start a transaction before executing this
- *        statement.
- * gp_command_count - Client request serial# to be passed to the qExec.
+ * Build a query string to be dispatched to QE.
  */
 static char *
 PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms,
@@ -1046,8 +1031,8 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms,
    int sliceinfo_len = pQueryParms->serializedSliceInfolen;
    const char *snapshotInfo = pQueryParms->serializedDtxContextInfo;
    int snapshotInfo_len = pQueryParms->serializedDtxContextInfolen;
-    int flags = 0;      /* unused flags */
-    int localSlice = 0; /* localSlice; place holder; set later in dupQueryTextAndSetSliceId */
+    int flags = 0;              /* unused flags */
+    int localSlice = 0;         /* localSlice; placeholder; set later in dupQueryTextAndSetSliceId */
    int rootIdx = pQueryParms->rootIdx;
    const char *seqServerHost = pQueryParms->seqServerHost;
    int seqServerHostlen = pQueryParms->seqServerHostlen;
@@ -1099,7 +1084,7 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms,
 
    *pos++ = 'M';
 
-    pos += 4;               /* place holder for message length */
+    pos += 4;               /* placeholder for message length */
 
    tmp = htonl(localSlice);
    memcpy(pos, &tmp, sizeof(localSlice));
@@ -1331,17 +1316,6 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms,
        }
    }
 
-    /*
-     * Now we need to call CDBDispatchCommand once per slice. Each such
-     * call dispatches a MPPEXEC command to each of the QEs assigned to
-     * the slice.
-     *
-     * The QE information goes in two places: (1) in the argument to the
-     * function CDBDispatchCommand, and (2) in the serialized
-     * command sent to the QEs.
-     *
-     * So, for each slice in the tree...
-     */
    for (iSlice = 0; iSlice < nSlices; iSlice++)
    {
        CdbDispatchDirectDesc direct;
diff --git a/src/backend/cdb/dispatcher/cdbdisp_thread.c b/src/backend/cdb/dispatcher/cdbdisp_thread.c
index 4cfa15ae92febf6863508e31434dfb01b662f6c0..0462515fe298f0b78eaaecdfbad20b5f7dbf4956 100644
--- a/src/backend/cdb/dispatcher/cdbdisp_thread.c
+++ b/src/backend/cdb/dispatcher/cdbdisp_thread.c
@@ -109,16 +109,6 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
    Assert(gangSize <= largestGangsize());
    db_descriptors = gp->db_descriptors;
 
-    /*
-     * The most threads we could have is segdb_count / gp_connections_per_thread, rounded up.
-     * This is equivalent to 1 + (segdb_count-1) / gp_connections_per_thread.
-     * We allocate enough memory for this many DispatchCommandParms structures,
-     * even though we may not use them all.
-     *
-     * We can only use gp->size here if we're not dealing with a
-     * singleton gang. It is safer to always use the max number of segments we are
-     * controlling (largestGangsize).
-     */
    Assert(gp_connections_per_thread >= 0);
    Assert(ds->dispatchThreads != NULL);
    /*
@@ -590,7 +580,7 @@ thread_DispatchOut(DispatchCommandParms * pParms)
 
        if (PQstatus(dispatchResult->segdbDesc->conn) == CONNECTION_BAD)
        {
-            char       *msg;
+            char *msg;
 
            msg = PQerrorMessage(dispatchResult->segdbDesc->conn);
@@ -1161,7 +1151,7 @@ dispatchCommand(CdbDispatchResult * dispatchResult,
     */
    if (PQsendGpQuery_shared(conn, (char *) query_text, query_text_len) == 0)
    {
-        char       *msg = PQerrorMessage(segdbDesc->conn);
+        char *msg = PQerrorMessage(segdbDesc->conn);
 
        if (DEBUG3 >= log_min_messages)
            write_log("PQsendMPPQuery_shared error %s %s",
diff --git a/src/backend/gp_libpq_fe/fe-exec.c b/src/backend/gp_libpq_fe/fe-exec.c
index 4c065222a0bde5fff95e764492fc17ff5e70c556..93f4764daaef54cd2e8ac20727b9c75483aa1610 100644
--- a/src/backend/gp_libpq_fe/fe-exec.c
+++ b/src/backend/gp_libpq_fe/fe-exec.c
@@ -37,7 +37,7 @@
 #endif
 
 /* keep this in same order as ExecStatusType in libpq-fe.h */
-char       *const pgresStatus[] = {
+char *const pgresStatus[] = {
    "PGRES_EMPTY_QUERY",
    "PGRES_COMMAND_OK",
    "PGRES_TUPLES_OK",
@@ -1660,7 +1660,7 @@ PQisBusy(PGconn *conn)
 PGresult *
 PQgetResult(PGconn *conn)
 {
-    PGresult   *res;
+    PGresult *res;
 
    if (!conn)
        return NULL;
@@ -1671,7 +1671,7 @@ PQgetResult(PGconn *conn)
    /* If not ready to return something, block until we are. */
    while (conn->asyncStatus == PGASYNC_BUSY)
    {
-        int         flushResult;
+        int flushResult;
 
        /*
         * If data remains unsent, send it. Else we might be waiting for the
diff --git a/src/backend/gp_libpq_fe/fe-misc.c b/src/backend/gp_libpq_fe/fe-misc.c
index a7382f00f435f5d79e5840620fc547d9005a87f0..3872480ce714358d68108187a07dc0d2a711fcee 100644
--- a/src/backend/gp_libpq_fe/fe-misc.c
+++ b/src/backend/gp_libpq_fe/fe-misc.c
@@ -65,11 +65,10 @@
 #include "nodes/pg_list.h"
 #include "cdb/cdbpartition.h"
 
-static int  pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
-static int  pqSendSome(PGconn *conn, int len);
-static int  pqSocketCheck(PGconn *conn, int forRead, int forWrite,
-              time_t end_time);
-static int  pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
+static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
+static int pqSendSome(PGconn *conn, int len);
+static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time);
+static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
 
 /*
@@ -382,8 +381,8 @@ pqPutInt(int value, size_t bytes, PGconn *conn)
 int
 pqCheckOutBufferSpace(size_t bytes_needed, PGconn *conn)
 {
-    int         newsize = conn->outBufSize;
-    char       *newbuf;
+    int newsize = conn->outBufSize;
+    char *newbuf;
 
    if (bytes_needed <= (size_t) newsize)
        return 0;
@@ -839,9 +838,9 @@ definitelyFailed:
 static int
 pqSendSome(PGconn *conn, int len)
 {
-    char       *ptr = conn->outBuffer;
-    int         remaining = conn->outCount;
-    int         result = 0;
+    char *ptr = conn->outBuffer;
+    int remaining = conn->outCount;
+    int result = 0;
 
    if (conn->sock < 0)
    {
@@ -853,8 +852,8 @@ pqSendSome(PGconn *conn, int len)
    /* while there's still data to send */
    while (len > 0)
    {
-        int         sent;
-        char        sebuf[256];
+        int sent;
+        char sebuf[256];
 
 #ifndef WIN32
        sent = send(conn->sock, ptr, len, 0);
diff --git a/src/include/cdb/cdbgang.h b/src/include/cdb/cdbgang.h
index 2a1c76c86650bdd6303b38647686ce822b3b77c5..86257df8f3aaadae44ea1c0708b1266f6bc5329b 100644
--- a/src/include/cdb/cdbgang.h
+++ b/src/include/cdb/cdbgang.h
@@ -13,60 +13,57 @@
 #include "executor/execdesc.h"
 #include <pthread.h>
 
-struct Port;            /* #include "libpq/libpq-be.h" */
-struct QueryDesc;       /* #include "executor/execdesc.h" */
"executor/execdesc.h" */ +struct Port; +struct QueryDesc; +struct DirectDispatchInfo; +struct EState; -/* GangType enumeration is used in several structures related to CDB - * slice plan support. - */ typedef enum GangType { - GANGTYPE_UNALLOCATED, /* a root slice executed by the qDisp */ - GANGTYPE_ENTRYDB_READER, /* a 1-gang with read access to the entry db */ - GANGTYPE_SINGLETON_READER, /* a 1-gang to read the segment dbs */ - GANGTYPE_PRIMARY_READER, /* a N-gang to read the segment dbs */ - GANGTYPE_PRIMARY_WRITER /* the N-gang that can update the segment dbs */ + GANGTYPE_UNALLOCATED, /* a root slice executed by the qDisp */ + GANGTYPE_ENTRYDB_READER, /* a 1-gang with read access to the entry db */ + GANGTYPE_SINGLETON_READER, /* a 1-gang to read the segment dbs */ + GANGTYPE_PRIMARY_READER, /* a N-gang to read the segment dbs */ + GANGTYPE_PRIMARY_WRITER /* the N-gang that can update the segment dbs */ } GangType; /* - * A gang represents a single worker on each connected segDB - * + * A gang represents a single group of workers on each connected segDB */ - typedef struct Gang { - GangType type; - int gang_id; - int size; /* segment_count or segdb_count ? */ + GangType type; + int gang_id; + int size; - /* MPP-6253: on *writer* gangs keep track of dispatcher use - * (reader gangs already track this properly, since they get - * allocated from a list of available gangs.*/ - bool dispatcherActive; + /* + * Keep track of dispatcher use for writer gang. (reader gangs already track + * this properly, since they get allocated from a list of available gangs.) + */ + bool dispatcherActive; /* the named portal that owns this gang, NULL if none */ - char *portal_name; + char *portal_name; /* - * Array of QEs/segDBs that make up this gang. - * Sorted by segment index. + * Array of QEs/segDBs that make up this gang. Sorted by segment index. */ struct SegmentDatabaseDescriptor *db_descriptors; /* For debugging purposes only. These do not add any actual functionality. */ - bool allocated; + bool allocated; - /* should be destroyed in cleanupGang() if set*/ - bool noReuse; + /* should be destroyed if set */ + bool noReuse; /* memory context */ MemoryContext perGangContext; } Gang; extern Gang *allocateReaderGang(GangType type, char *portal_name); + extern Gang *allocateWriterGang(void); -struct DirectDispatchInfo; extern List *getCdbProcessList(Gang *gang, int sliceIndex, struct DirectDispatchInfo *directDispatch); extern bool gangOK(Gang *gp); @@ -79,16 +76,15 @@ extern void disconnectAndDestroyAllGangs(bool resetSession); extern void CheckForResetSession(void); -extern List * getAllIdleReaderGangs(void); - -extern List * getAllBusyReaderGangs(void); +extern List *getAllIdleReaderGangs(void); +extern List *getAllBusyReaderGangs(void); extern CdbComponentDatabases *getComponentDatabases(void); extern bool gangsExist(void); -struct SegmentDatabaseDescriptor *getSegmentDescriptorFromGang(const Gang *gp, int seg); +extern struct SegmentDatabaseDescriptor *getSegmentDescriptorFromGang(const Gang *gp, int seg); extern Gang *findGangById(int gang_id); @@ -97,20 +93,20 @@ extern Gang *findGangById(int gang_id); * disconnectAndDestroyIdleReaderGangs() * * This routine is used when a session has been idle for a while (waiting for the - * client to send us SQL to execute). The idea is to consume less resources while sitting idle. + * client to send us SQL to execute). The idea is to consume less resources while sitting idle. 
 *
 * The expectation is that if the session is logged on, but nobody is sending us work to do,
- * we want to free up whatever resources we can.  Usually it means there is a human being at the
+ * we want to free up whatever resources we can. Usually it means there is a human being at the
 * other end of the connection, and that person has walked away from their terminal, or just hasn't
- * decided what to do next.  We could be idle for a very long time (many hours).
+ * decided what to do next. We could be idle for a very long time (many hours).
 *
 * Of course, freeing gangs means that the next time the user does send in an SQL statement,
- * we need to allocate gangs (at least the writer gang) to do anything.  This entails extra work,
+ * we need to allocate gangs (at least the writer gang) to do anything. This entails extra work,
 * so we don't want to do this if we don't think the session has gone idle.
 *
 * Only call these routines from an idle session.
 *
- * This routine is called from the sigalarm signal handler (hopefully that is safe to do).
+ * This routine is also called from the sigalarm signal handler (hopefully that is safe to do).
 */
 extern void disconnectAndDestroyIdleReaderGangs(void);
@@ -119,6 +115,7 @@ extern void cleanupPortalGangs(Portal portal);
 extern int largestGangsize(void);
 
 extern int gp_pthread_create(pthread_t *thread, void *(*start_routine)(void *), void *arg, const char *caller);
+
 /*
  * cdbgang_parse_gpqeid_params
  *
@@ -129,110 +126,97 @@ extern int gp_pthread_create(pthread_t *thread, void *(*start_routine)(void *), void *arg, const char *caller);
 * command line options have not been processed; GUCs have the settings
 * inherited from the postmaster; etc; so don't try to do too much in here.
 */
-void
-cdbgang_parse_gpqeid_params(struct Port *port, const char* gpqeid_value);
+extern void cdbgang_parse_gpqeid_params(struct Port *port, const char *gpqeid_value);
 
-void
-cdbgang_parse_gpqdid_params(struct Port *port, const char* gpqdid_value);
+extern void cdbgang_parse_gpqdid_params(struct Port *port, const char *gpqdid_value);
 
-void
-cdbgang_parse_gpdaid_params(struct Port *port, const char* gpdaid_value);
+extern void cdbgang_parse_gpdaid_params(struct Port *port, const char *gpdaid_value);
 
-/* ----------------
+/*
 * MPP Worker Process information
 *
 * This structure represents the global information about a worker process.
 * It is constructed on the entry process (QD) and transmitted as part of
- * the global slice table to the involved QEs.  Note that this is an
+ * the global slice table to the involved QEs. Note that this is an
 * immutable, fixed-size structure so it can be held in a contiguous
 * array. In the Slice node, however, it is held in a List.
- * ----------------
 */
 typedef struct CdbProcess
 {
    NodeTag type;
 
-    /* These fields are established at connection (libpq) time and are
+    /*
+     * These fields are established at connection (libpq) time and are
     * available to the QD in PGconn structure associated with connected
-     * QE. It needs to be explicitly transmitted to QE's, however,
+     * QE. It needs to be explicitly transmitted to QE's
     */
-
    char *listenerAddr; /* Interconnect listener IPv4 address, a C-string */
    int listenerPort; /* Interconnect listener port */
    int pid; /* Backend PID of the process. */
 
-    /* Unclear about why we need these, however, it is no trouble to carry
-     * them.
-     */
-
    int contentid;
 } CdbProcess;
 
-/* ----------------
+/*
 * MPP Plan Slice information
 *
 * These structures summarize how a plan tree is sliced up into separate
 * units of execution or slices. A slice will execute on a each worker within
 * a gang of processes. Some gangs have a worker process on each of several
 * databases, others have a single worker.
- * ----------------
+ *
 */
 typedef struct Slice
 {
    NodeTag type;
 
    /*
-     * The index in the global slice table of this
-     * slice. The root slice of the main plan is
-     * always 0. Slices that have senders at their
-     * local root have a sliceIndex equal to the
-     * motionID of their sender Motion.
+     * The index in the global slice table of this slice. The root slice of
+     * the main plan is always 0. Slices that have senders at their local
+     * root have a sliceIndex equal to the motionID of their sender Motion.
     *
-     * Undefined slices should have this set to
-     * -1.
+     * Undefined slices should have this set to -1.
     */
    int sliceIndex;
 
    /*
-     * The root slice of the slice tree of which
-     * this slice is a part.
+     * The root slice of the slice tree of which this slice is a part.
     */
    int rootIndex;
 
    /*
-     * the index of parent in global slice table (origin 0)
-     * or -1 if this is root slice.
+     * the index of parent in global slice table (origin 0) or -1 if
+     * this is root slice.
     */
    int parentIndex;
 
    /*
-     * An integer list of indices in the global slice
-     * table (origin 0) of the child slices of
-     * this slice, or -1 if this is a leaf slice.
-     * A child slice corresponds to a receiving
-     * motion in this slice.
+     * An integer list of indices in the global slice table (origin 0)
+     * of the child slices of this slice, or -1 if this is a leaf slice.
+     * A child slice corresponds to a receiving motion in this slice.
     */
    List *children;
 
-    /* What kind of gang does this slice need? */
+    /* What kind of gang does this slice need */
    GangType gangType;
 
-    /* How many gang members needed?
+    /*
+     * How many gang members needed
     *
-     * This may seem redundant, but it is set before
-     * the process lists below and used to decide how
+     * It is set before the process lists below and used to decide how
     * to initialize them.
     */
    int gangSize;
 
-    /* how many of the gang members will actually be used?
-     * This takes into account directDispatch information
+    /*
+     * How many of the gang members will actually be used. This takes into
+     * account directDispatch information
     */
    int numGangMembersToBeActive;
 
-    /**
+    /*
     * directDispatch->isDirectDispatch should ONLY be set for a slice when it requires an n-gang.
     */
    DirectDispatchInfo directDispatch;
@@ -240,27 +224,14 @@ typedef struct Slice
    struct Gang *primaryGang;
 
    /* tell dispatch agents which gang we're talking about.*/
-    int         primary_gang_id;
+    int primary_gang_id;
 
    /*
-     * A list of CDBProcess nodes corresponding to
-     * the worker processes allocated to implement
-     * this plan slice.
+     * A list of CDBProcess nodes corresponding to the worker processes allocated
+     * to implement this plan slice.
     *
-     * The number of processes must agree with the
-     * the plan slice to be implemented. In MPP 2
-     * the possibilities are 1 for a slice that
-     * operates on a single stream of tuples (e.g.,
-     * the receiver of a fixed motion, a 1-gang), or
-     * the number of segments (e.g., a parallel
-     * operation or the receiver of the results of a
-     * parallel operation, an N-gang).
-     *
-     * The processes of an N-gang must be in hash
-     * value order -- a hash value of H used to direct
-     * an input tuple to this slice will direct it to
-     * the H+1st process in the list.
+     * The number of processes must agree with the plan slice to be implemented.
     */
    List *primaryProcesses;
 } Slice;
@@ -271,29 +242,27 @@ typedef struct Slice
 *
 * Slice 0 is the root slice of plan as a whole.
 * Slices 1 through nMotion are motion slices with a sending motion at
- * the root of the slice. 
+ * the root of the slice.
 * Slices nMotion+1 and on are root slices of initPlans.
 *
 * There may be unused slices in case the plan contains subplans that
- * are not initPlans.  (This won't happen unless MPP decides to support
+ * are not initPlans. (This won't happen unless MPP decides to support
 * subplans similarly to PostgreSQL, which isn't the current plan.)
 */
 typedef struct SliceTable
 {
    NodeTag type;
-    int         nMotions;       /* The number Motion nodes in the entire plan */
-    int         nInitPlans;     /* The number of initplan slices allocated */
-    int         localSlice;     /* Index of the slice to execute. */
-    List       *slices;         /* List of slices */
-    bool        doInstrument;   /* true => collect stats for EXPLAIN ANALYZE */
+    int nMotions;               /* The number of Motion nodes in the entire plan */
+    int nInitPlans;             /* The number of initplan slices allocated */
+    int localSlice;             /* Index of the slice to execute */
+    List *slices;               /* List of slices */
+    bool doInstrument;          /* true => collect stats for EXPLAIN ANALYZE */
    uint32 ic_instance_id;
 } SliceTable;
 
-struct EState;
-
 extern void InitSliceTable(struct EState *estate, int nMotions, int nSubplans);
-extern Slice *getCurrentSlice(struct EState* estate, int sliceIndex);
+extern Slice *getCurrentSlice(struct EState *estate, int sliceIndex);
 extern bool sliceRunsOnQD(Slice *slice);
 extern bool sliceRunsOnQE(Slice *slice);
 extern int sliceCalculateNumSendingProcesses(Slice *slice);