提交 75480b8c 编写于 作者: P Pengzhou Tang

Abandon cdbdisp_dispatchUtilityStatement

cdbdisp_dispatchUtilityStatement is designed to dispatch a utility statement synchronously used by
DefineIndex(), DefineExternalRelation(), DefineRelation() and createdb(). The formor three function
actually use it in asynchronous way, so replace it with CdbDispatchUtilityStatement.

createdb() use it synchronously, but the performance improvement is little. To make code clean, abandon
this function and replace it with asynchronous version.
上级 f7078db2
......@@ -82,14 +82,6 @@ typedef struct DispatchCommandQueryParms
int *sliceIndexGangIdMap;
} DispatchCommandQueryParms;
static void
cdbdisp_dispatchCommand(const char *strCommand,
char *serializedQuerytree,
int serializedQuerytreelen,
bool cancelOnError,
bool needTwoPhase,
bool withSnapshot, CdbDispatcherState *ds);
static void
cdbdisp_dispatchCommandInternal(const char *strCommand,
char *serializedQuerytree,
......@@ -474,169 +466,6 @@ CdbSetGucOnAllGangs(const char *strCommand,
PG_END_TRY();
}
/*
* cdbdisp_dispatchCommand:
* Send the strCommand SQL statement to all segdbs in the cluster
* cancelOnError indicates whether an error
* occurring on one of the qExec segdbs should cause all still-executing commands to cancel
* on other qExecs. Normally this would be true. The commands are sent over the libpq
* connections that were established during gang creation. They are run inside of threads.
* The number of segdbs handled by any one thread is determined by the
* guc variable gp_connections_per_thread.
*
* The CdbDispatchResults objects allocated for the command
* are returned in *pPrimaryResults
* The caller, after calling CdbCheckDispatchResult(), can
* examine the CdbDispatchResults objects, can keep them as
* long as needed, and ultimately must free them with
* cdbdisp_destroyDispatcherState() prior to deallocation
* of the memory context from which they were allocated.
*
* NB: Callers should use PG_TRY()/PG_CATCH() if needed to make
* certain that the CdbDispatchResults objects are destroyed by
* cdbdisp_destroyDispatcherState() in case of error.
* To wait for completion, check for errors, and clean up, it is
* suggested that the caller use cdbdisp_finishCommand().
*/
static void
cdbdisp_dispatchCommand(const char *strCommand,
char *serializedQuerytree,
int serializedQuerytreelen,
bool cancelOnError,
bool needTwoPhase,
bool withSnapshot, CdbDispatcherState *ds)
{
DispatchCommandQueryParms queryParms;
Gang *primaryGang;
CdbComponentDatabaseInfo *qdinfo;
char *queryText = NULL;
int queryTextLength = 0;
if (log_dispatch_stats)
ResetUsage();
if (DEBUG5 >= log_min_messages)
elog(DEBUG3, "cdbdisp_dispatchCommand: %s (needTwoPhase = %s)",
strCommand, (needTwoPhase ? "true" : "false"));
else
elog((Debug_print_full_dtm ? LOG : DEBUG3),
"cdbdisp_dispatchCommand: %.50s (needTwoPhase = %s)", strCommand,
(needTwoPhase ? "true" : "false"));
MemSet(&queryParms, 0, sizeof(queryParms));
queryParms.strCommand = strCommand;
queryParms.serializedQuerytree = serializedQuerytree;
queryParms.serializedQuerytreelen = serializedQuerytreelen;
/*
* Allocate a primary QE for every available segDB in the system.
*/
primaryGang = allocateWriterGang();
Assert(primaryGang);
/*
* Serialize a version of our DTX Context Info
*/
queryParms.serializedDtxContextInfo =
qdSerializeDtxContextInfo(&queryParms.serializedDtxContextInfolen,
withSnapshot, false,
mppTxnOptions(needTwoPhase),
"cdbdisp_dispatchCommand");
/*
* sequence server info
*/
qdinfo = &(getComponentDatabases()->entry_db_info[0]);
Assert(qdinfo != NULL && qdinfo->hostip != NULL);
queryParms.seqServerHost = pstrdup(qdinfo->hostip);
queryParms.seqServerHostlen = strlen(qdinfo->hostip) + 1;
queryParms.seqServerPort = seqServerCtl->seqServerPort;
/*
* Dispatch the command.
*/
ds->primaryResults = NULL;
ds->dispatchParams = NULL;
queryText = buildGpQueryString(ds, &queryParms, &queryTextLength);
cdbdisp_destroyQueryParms(&queryParms);
cdbdisp_makeDispatcherState(ds, /*slice count*/1, cancelOnError, queryText, queryTextLength);
ds->primaryResults->writer_gang = primaryGang;
cdbdisp_dispatchToGang(ds, primaryGang, -1, DEFAULT_DISP_DIRECT);
/*
* don't pfree serializedShapshot here, it will be pfree'd when
* the first thread is destroyed.
*/
}
/*
* Dispatch a command - already parsed and in the form of a Node tree
* - to all primary segdbs. Does not wait for completion. Does not
* start a global transaction.
*
* NB: Callers should use PG_TRY()/PG_CATCH() if needed to make
* certain that the CdbDispatchResults objects are destroyed by
* cdbdisp_destroyDispatcherState() in case of error.
* To wait for completion, check for errors, and clean up, it is
* suggested that the caller use cdbdisp_finishCommand().
*/
void
cdbdisp_dispatchUtilityStatement(struct Node *stmt,
bool cancelOnError,
bool needTwoPhase,
bool withSnapshot,
struct CdbDispatcherState *ds,
char *debugCaller)
{
char *serializedQuerytree;
int serializedQuerytree_len;
Query *q = makeNode(Query);
StringInfoData buffer;
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"cdbdisp_dispatchUtilityStatement debug_query_string = %s (needTwoPhase = %s, debugCaller = %s)",
debug_query_string, (needTwoPhase ? "true" : "false"), debugCaller);
dtmPreCommand("cdbdisp_dispatchUtilityStatement", "(none)", NULL,
needTwoPhase, withSnapshot, false /* inCursor */ );
initStringInfo(&buffer);
q->commandType = CMD_UTILITY;
Assert(stmt != NULL);
Assert(stmt->type < 1000);
Assert(stmt->type > 0);
q->utilityStmt = stmt;
q->querySource = QSRC_ORIGINAL;
/*
* We must set q->canSetTag = true. False would be used to hide a command
* introduced by rule expansion which is not allowed to return its
* completion status in the command tag (PQcmdStatus/PQcmdTuples). For
* example, if the original unexpanded command was SELECT, the status
* should come back as "SELECT n" and should not reflect other commands
* inserted by rewrite rules. True means we want the status.
*/
q->canSetTag = true;
/*
* serialized the stmt tree, and create the sql statement: mppexec ....
*/
serializedQuerytree =
serializeNode((Node *) q, &serializedQuerytree_len, NULL /*uncompressed_size */);
Assert(serializedQuerytree != NULL);
cdbdisp_dispatchCommand(debug_query_string, serializedQuerytree,
serializedQuerytree_len, cancelOnError,
needTwoPhase, withSnapshot, ds);
}
/*
* CdbDispatchCommand:
*
......
......@@ -93,8 +93,6 @@ static bool get_db_info(const char *name, LOCKMODE lockmode,
static bool have_createdb_privilege(void);
static bool check_db_file_conflict(Oid db_id);
static void createdb_int(CreatedbStmt *stmt, CdbDispatcherState *ds);
/*
* Create target database directories (under transaction).
*/
......@@ -602,28 +600,6 @@ static void copy_append_only_segment_file(
*/
void
createdb(CreatedbStmt *stmt)
{
volatile struct CdbDispatcherState ds = {NULL, NULL, NULL};
PG_TRY();
{
createdb_int(stmt, (struct CdbDispatcherState *)&ds);
cdbdisp_finishCommand((struct CdbDispatcherState *)&ds);
}
PG_CATCH();
{
/* If dispatched, stop QEs and clean up after them. */
if (ds.primaryResults)
cdbdisp_handleError((struct CdbDispatcherState *)&ds);
PG_RE_THROW();
/* not reached */
}
PG_END_TRY();
}
static void
createdb_int(CreatedbStmt *stmt, CdbDispatcherState *ds)
{
Oid src_dboid = InvalidOid;
Oid src_owner;
......@@ -985,12 +961,11 @@ createdb_int(CreatedbStmt *stmt, CdbDispatcherState *ds)
*
* Doesn't wait for the QEs to finish execution.
*/
cdbdisp_dispatchUtilityStatement((Node *)stmt,
true, /* cancelOnError */
true, /* startTransaction */
true, /* withSnapshot */
ds,
"createdb_int");
CdbDispatchUtilityStatement((Node *)stmt,
DF_CANCEL_ON_ERROR |
DF_NEED_TWO_PHASE |
DF_WITH_SNAPSHOT,
NULL);
}
/*
......
......@@ -727,34 +727,11 @@ DefineIndex(RangeVar *heapRelation,
*/
if (shouldDispatch)
{
volatile struct CdbDispatcherState ds = {NULL, NULL};
PG_TRY();
{
/*
* Dispatch the command to all primary and mirror segdbs.
* Doesn't start a global transaction. Doesn't wait for
* the QEs to finish execution.
*/
cdbdisp_dispatchUtilityStatement((Node *) stmt,
true, /* cancelOnError */
false, /* startTransaction */
true, /* withSnapshot */
(struct CdbDispatcherState *)&ds,
"DefineIndex");
/* Wait for all QEs to finish. Throw up if error. */
cdbdisp_finishCommand((struct CdbDispatcherState *)&ds);
}
PG_CATCH();
{
/* If dispatched, stop QEs and clean up after them. */
if (ds.primaryResults)
cdbdisp_handleError((struct CdbDispatcherState *)&ds);
PG_RE_THROW();
/* not reached */
}
PG_END_TRY();
CdbDispatchUtilityStatement((Node *)stmt,
DF_CANCEL_ON_ERROR|
DF_WITH_SNAPSHOT,
NULL);
}
StartTransactionCommand();
......
......@@ -370,8 +370,6 @@ static Datum transformExecOnClause(List *on_clause);
static char transformFormatType(char *formatname);
static Datum transformFormatOpts(char formattype, List *formatOpts, int numcols, bool iswritable);
static Oid DefineRelation_int(CreateStmt *stmt, char relkind, char relstorage, CdbDispatcherState *ds);
static void ATExecPartAddInternal(Relation rel, Node *def);
static void
......@@ -400,43 +398,6 @@ static char *alterTableCmdString(AlterTableType subtype);
*/
Oid
DefineRelation(CreateStmt *stmt, char relkind, char relstorage)
{
volatile struct CdbDispatcherState ds = {NULL, NULL, NULL};
Oid reloid = 0;
Assert(stmt->relation->schemaname == NULL || strlen(stmt->relation->schemaname)>0);
PG_TRY();
{
reloid = DefineRelation_int(stmt, relkind, relstorage, (struct CdbDispatcherState *)&ds);
}
PG_CATCH();
{
/* If dispatched, stop QEs and clean up after them. */
if (ds.primaryResults)
cdbdisp_handleError((struct CdbDispatcherState *)&ds);
/* Carry on with error handling. */
PG_RE_THROW();
}
PG_END_TRY();
/*
* We successfully completed our work. Now check the results from the
* qExecs, if dispatched. This waits for them to all finish, and exits
* via ereport(ERROR,...) if unsuccessful.
*/
cdbdisp_finishCommand((struct CdbDispatcherState *)&ds);
return reloid;
}
Oid
DefineRelation_int(CreateStmt *stmt,
char relkind,
char relstorage,
CdbDispatcherState *ds)
{
char relname[NAMEDATALEN];
Oid namespaceId;
......@@ -836,12 +797,11 @@ DefineRelation_int(CreateStmt *stmt,
/* Dispatch the statement tree to all primary and mirror segdbs.
* Doesn't wait for the QEs to finish execution.
*/
cdbdisp_dispatchUtilityStatement((Node *)stmt,
true, /* cancelOnError */
true, /* startTransaction */
true, /* withSnapshot */
ds,
"DefineRelation_int");
CdbDispatchUtilityStatement((Node *)stmt,
DF_CANCEL_ON_ERROR |
DF_NEED_TWO_PHASE |
DF_WITH_SNAPSHOT,
NULL);
}
......@@ -872,7 +832,6 @@ DefineRelation_int(CreateStmt *stmt,
extern void
DefineExternalRelation(CreateExternalStmt *createExtStmt)
{
volatile struct CdbDispatcherState ds = {NULL, NULL, NULL};
CreateStmt *createStmt = makeNode(CreateStmt);
ExtTableTypeDesc *exttypeDesc = (ExtTableTypeDesc *)createExtStmt->exttypedesc;
SingleRowErrorDesc *singlerowerrorDesc = NULL;
......@@ -1173,86 +1132,63 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt)
/*
* Now we take care of pg_exttable.
*
* get our pg_class external rel OID. If we're the QD we just created
* it above. If we're a QE DefineRelation() was already dispatched to
* us and therefore we have a local entry in pg_class. get the OID
* from cache.
*/
PG_TRY();
{
/*
* get our pg_class external rel OID. If we're the QD we just created
* it above. If we're a QE DefineRelation() was already dispatched to
* us and therefore we have a local entry in pg_class. get the OID
* from cache.
*/
if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY)
Assert(reloid != InvalidOid);
else
reloid = RangeVarGetRelid(createExtStmt->relation, true);
/*
* In the case of error log file, set fmtErrorTblOid to the external table itself.
*/
if (issreh)
fmtErrTblOid = reloid;
if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY)
Assert(reloid != InvalidOid);
else
reloid = RangeVarGetRelid(createExtStmt->relation, true);
/*
* create a pg_exttable entry for this external table.
*/
InsertExtTableEntry(reloid,
iswritable,
isweb,
issreh,
formattype,
rejectlimittype,
commandString,
rejectlimit,
fmtErrTblOid,
encoding,
formatOptStr,
locationExec,
locationUris);
/*
* In the case of error log file, set fmtErrorTblOid to the external table itself.
*/
if (issreh)
fmtErrTblOid = reloid;
/*
* DefineRelation loaded the new relation into relcache, but the
* relcache contains the distribution policy, which in turn depends on
* the contents of pg_exttable, for EXECUTE-type external tables
* (see GpPolicyFetch()). Now that we have created the pg_exttable
* entry, invalidate the relcache, so that it gets loaded with the
* correct information.
*/
CacheInvalidateRelcacheByRelid(reloid);
/*
* create a pg_exttable entry for this external table.
*/
InsertExtTableEntry(reloid,
iswritable,
isweb,
issreh,
formattype,
rejectlimittype,
commandString,
rejectlimit,
fmtErrTblOid,
encoding,
formatOptStr,
locationExec,
locationUris);
if (shouldDispatch)
{
/*
* DefineRelation loaded the new relation into relcache, but the
* relcache contains the distribution policy, which in turn depends on
* the contents of pg_exttable, for EXECUTE-type external tables
* (see GpPolicyFetch()). Now that we have created the pg_exttable
* entry, invalidate the relcache, so that it gets loaded with the
* correct information.
*/
CacheInvalidateRelcacheByRelid(reloid);
/*
* Dispatch the statement tree to all primary segdbs.
* Doesn't wait for the QEs to finish execution.
*/
cdbdisp_dispatchUtilityStatement((Node *)createExtStmt,
true, /* cancelOnError */
true, /* startTransaction */
true, /* withSnapshot */
(struct CdbDispatcherState *)&ds,
"DefineExternalRelation");
}
}
PG_CATCH();
if (shouldDispatch)
{
/* If dispatched, stop QEs and clean up after them. */
if (ds.primaryResults)
cdbdisp_handleError((struct CdbDispatcherState *)&ds);
/* Carry on with error handling. */
PG_RE_THROW();
/*
* Dispatch the statement tree to all primary segdbs.
* Doesn't wait for the QEs to finish execution.
*/
CdbDispatchUtilityStatement((Node *)createExtStmt,
DF_CANCEL_ON_ERROR|
DF_WITH_SNAPSHOT|
DF_NEED_TWO_PHASE,
NULL);
}
PG_END_TRY();
/*
* We successfully completed our work. Now check the results from the
* qExecs, if dispatched. This waits for them to all finish, and exits
* via ereport(ERROR,...) if unsuccessful.
*/
cdbdisp_finishCommand((struct CdbDispatcherState *)&ds);
if(customProtName)
pfree(customProtName);
......
......@@ -89,11 +89,4 @@ CdbDispatchUtilityStatement(struct Node *stmt,
int flags,
struct CdbPgResults* cdb_pgresults);
void
cdbdisp_dispatchUtilityStatement(struct Node *stmt,
bool cancelOnError,
bool needTwoPhase,
bool withSnapshot,
struct CdbDispatcherState *ds,
char *debugCaller);
#endif /* CDBDISP_QUERY_H */
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册