提交 b8fb0957 编写于 作者: P Pengzhou Tang 提交者: Tang Pengzhou

Add two helper functions to construct query parms

* cdbdisp_buildUtilityQueryParms
* cdbdisp_buildCommandQueryParms
上级 957629d1
......@@ -97,14 +97,6 @@ typedef struct DispatchCommandQueryParms
int *sliceIndexGangIdMap;
} DispatchCommandQueryParms;
static void cdbdisp_dispatchCommandInternal(const char *strCommand,
char *serializedQuerytree,
int serializedQuerytreelen,
char *serializedQueryDispatchDesc,
int serializedQueryDispatchDesclen,
int flags,
CdbPgResults *cdb_pgresults);
static int fillSliceVector(SliceTable *sliceTable,
int sliceIndex,
SliceVec *sliceVector,
......@@ -114,6 +106,11 @@ static char *buildGpQueryString(DispatchCommandQueryParms *pQueryParms,
int *finalLen);
static DispatchCommandQueryParms *cdbdisp_buildPlanQueryParms(struct QueryDesc *queryDesc, bool planRequiresTxn);
static DispatchCommandQueryParms *cdbdisp_buildUtilityQueryParms(struct Node *stmt, int flags, List *oid_assignments);
static DispatchCommandQueryParms *cdbdisp_buildCommandQueryParms(const char *strCommand, int flags);
static void cdbdisp_dispatchCommandInternal(char *queryText, int queryTextLength,
int flags, CdbPgResults *cdb_pgresults);
static void cdbdisp_destroyQueryParms(DispatchCommandQueryParms *pQueryParms);
......@@ -269,42 +266,32 @@ CdbDispatchSetCommand(const char *strCommand, bool cancelOnError)
MemoryContext oldContext;
ErrorData *qeError = NULL;
dtmPreCommand("CdbDispatchSetCommand", strCommand, NULL,
false /* no two-phase commit needed for SET */,
false, /* no snapshot needed for SET */
false /* inCursor */ );
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"CdbDispatchSetCommand for command = '%s'",
strCommand);
pQueryParms = cdbdisp_buildCommandQueryParms(strCommand, DF_NONE);
queryText = buildGpQueryString(pQueryParms, &queryTextLength);
primaryGang = AllocateWriterGang();
Assert(primaryGang);
idleReaderGangs = getAllIdleReaderGangs();
allocatedReaderGangs = getAllAllocatedReaderGangs();
gangCount = 1 + list_length(idleReaderGangs) + list_length(allocatedReaderGangs);
pQueryParms = palloc0(sizeof(*pQueryParms));
pQueryParms->strCommand = strCommand;
/*
* serialized a version of our snapshot
*/
pQueryParms->serializedDtxContextInfo =
qdSerializeDtxContextInfo(&pQueryParms->serializedDtxContextInfolen,
false /* withSnapshot */ ,
false /* cursor */ ,
mppTxnOptions(false), /* no two-phase commit needed for SET */
"CdbDispatchSetCommand");
ds = cdbdisp_makeDispatcherState();
oldContext = MemoryContextSwitchTo(DispatcherContext);
queryText = buildGpQueryString(pQueryParms, &queryTextLength);
ds->primaryResults = cdbdisp_makeDispatchResults(gangCount, cancelOnError);
ds->dispatchParams = cdbdisp_makeDispatchParams (gangCount, queryText, queryTextLength);
ds->primaryResults->writer_gang = primaryGang;
MemoryContextSwitchTo(oldContext);
cdbdisp_destroyQueryParms(pQueryParms);
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"CdbDispatchSetCommand for command = '%s'",
strCommand);
dtmPreCommand("CdbDispatchSetCommand", strCommand, NULL,
false /* no two-phase commit needed for SET */,
false, /* no snapshot needed for SET */
false /* inCursor */ );
cdbdisp_dispatchToGang(ds, primaryGang, -1, DEFAULT_DISP_DIRECT);
foreach(le, idleReaderGangs)
......@@ -366,13 +353,24 @@ CdbDispatchCommand(const char *strCommand,
int flags,
CdbPgResults *cdb_pgresults)
{
return cdbdisp_dispatchCommandInternal(strCommand,
NULL,
0,
NULL,
0,
flags,
cdb_pgresults);
DispatchCommandQueryParms *pQueryParms;
char *queryText;
int queryTextLength;
bool needTwoPhase = flags & DF_NEED_TWO_PHASE;
bool withSnapshot = flags & DF_WITH_SNAPSHOT;
dtmPreCommand("CdbDispatchCommand", strCommand,
NULL, needTwoPhase, withSnapshot,
false /* inCursor */ );
elogif((Debug_print_full_dtm || log_min_messages <= DEBUG5), LOG,
"CdbDispatchCommand: %s (needTwoPhase = %s)",
strCommand, (needTwoPhase ? "true" : "false"));
pQueryParms = cdbdisp_buildCommandQueryParms(strCommand, flags);
queryText = buildGpQueryString(pQueryParms, &queryTextLength);
return cdbdisp_dispatchCommandInternal(queryText, queryTextLength, flags, cdb_pgresults);
}
/*
......@@ -395,12 +393,114 @@ CdbDispatchUtilityStatement(struct Node *stmt,
List *oid_assignments,
CdbPgResults *cdb_pgresults)
{
char *serializedQuerytree;
int serializedQuerytree_len;
char *serializedQueryDispatchDesc = NULL;
int serializedQueryDispatchDesc_len = 0;
Query *q;
DispatchCommandQueryParms *pQueryParms;
char *queryText;
int queryTextLength;
bool needTwoPhase = flags & DF_NEED_TWO_PHASE;
bool withSnapshot = flags & DF_WITH_SNAPSHOT;
dtmPreCommand("CdbDispatchUtilityStatement", debug_query_string,
NULL, needTwoPhase, withSnapshot,
false /* inCursor */ );
elogif((Debug_print_full_dtm || log_min_messages <= DEBUG5), LOG,
"CdbDispatchUtilityStatement: %s (needTwoPhase = %s)",
debug_query_string, (needTwoPhase ? "true" : "false"));
pQueryParms = cdbdisp_buildUtilityQueryParms(stmt, flags, oid_assignments);
queryText = buildGpQueryString(pQueryParms, &queryTextLength);
return cdbdisp_dispatchCommandInternal(queryText, queryTextLength, flags, cdb_pgresults);
}
static void
cdbdisp_dispatchCommandInternal(char *queryText,
int queryTextLength,
int flags,
CdbPgResults *cdb_pgresults)
{
CdbDispatcherState *ds;
Gang *primaryGang;
MemoryContext oldContext;
CdbDispatchResults *pr;
ErrorData *qeError = NULL;
/*
* Dispatch the command.
*/
ds = cdbdisp_makeDispatcherState();
/*
* Allocate a primary QE for every available segDB in the system.
*/
primaryGang = AllocateWriterGang();
Assert(primaryGang);
oldContext = MemoryContextSwitchTo(DispatcherContext);
ds->primaryResults = cdbdisp_makeDispatchResults(1, flags & DF_CANCEL_ON_ERROR);
ds->dispatchParams = cdbdisp_makeDispatchParams (1, queryText, queryTextLength);
ds->primaryResults->writer_gang = primaryGang;
MemoryContextSwitchTo(oldContext);
cdbdisp_dispatchToGang(ds, primaryGang, -1, DEFAULT_DISP_DIRECT);
cdbdisp_waitDispatchFinish(ds);
cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);
pr = cdbdisp_getDispatchResults(ds, &qeError);
if (qeError)
{
cdbdisp_destroyDispatcherState(ds);
ReThrowError(qeError);
}
cdbdisp_returnResults(pr, cdb_pgresults);
cdbdisp_destroyDispatcherState(ds);
}
static DispatchCommandQueryParms *
cdbdisp_buildCommandQueryParms(const char *strCommand, int flags)
{
bool needTwoPhase = flags & DF_NEED_TWO_PHASE;
bool withSnapshot = flags & DF_WITH_SNAPSHOT;
DispatchCommandQueryParms *pQueryParms;
pQueryParms = palloc0(sizeof(*pQueryParms));
pQueryParms->strCommand = strCommand;
pQueryParms->serializedQuerytree = NULL;
pQueryParms->serializedQuerytreelen = 0;
pQueryParms->serializedQueryDispatchDesc = NULL;
pQueryParms->serializedQueryDispatchDesclen = 0;
/*
* Serialize a version of our DTX Context Info
*/
pQueryParms->serializedDtxContextInfo =
qdSerializeDtxContextInfo(&pQueryParms->serializedDtxContextInfolen,
withSnapshot, false,
mppTxnOptions(needTwoPhase),
"cdbdisp_dispatchCommandInternal");
return pQueryParms;
}
static DispatchCommandQueryParms *
cdbdisp_buildUtilityQueryParms(struct Node *stmt,
int flags,
List *oid_assignments)
{
char *serializedQuerytree = NULL;
char *serializedQueryDispatchDesc = NULL;
int serializedQuerytree_len = 0;
int serializedQueryDispatchDesc_len = 0;
bool needTwoPhase = flags & DF_NEED_TWO_PHASE;
bool withSnapshot = flags & DF_WITH_SNAPSHOT;
QueryDispatchDesc *qddesc;
Query *q;
DispatchCommandQueryParms *pQueryParms;
Assert(stmt != NULL);
Assert(stmt->type < 1000);
......@@ -439,64 +539,12 @@ CdbDispatchUtilityStatement(struct Node *stmt,
NULL /* uncompressed_size */ );
}
/*
* Dispatch serializedQuerytree to primary writer gang.
*/
return cdbdisp_dispatchCommandInternal(debug_query_string,
serializedQuerytree, serializedQuerytree_len,
serializedQueryDispatchDesc, serializedQueryDispatchDesc_len,
flags, cdb_pgresults);
}
/*
* Internal function to send plain command or serialized query tree to all segdbs in
* the cluster
*/
static void
cdbdisp_dispatchCommandInternal(const char *strCommand,
char *serializedQuerytree,
int serializedQuerytreelen,
char *serializedQueryDispatchDesc,
int serializedQueryDispatchDesclen,
int flags,
CdbPgResults *cdb_pgresults)
{
struct CdbDispatcherState *ds;
MemoryContext oldContext;
DispatchCommandQueryParms *pQueryParms;
Gang *primaryGang;
char *queryText = NULL;
int queryTextLength = 0;
ErrorData *qeError = NULL;
CdbDispatchResults *pr = NULL;
if (log_dispatch_stats)
ResetUsage();
bool cancelOnError = flags & DF_CANCEL_ON_ERROR;
bool needTwoPhase = flags & DF_NEED_TWO_PHASE;
bool withSnapshot = flags & DF_WITH_SNAPSHOT;
dtmPreCommand("cdbdisp_dispatchCommandInternal", strCommand,
NULL, needTwoPhase, withSnapshot,
false /* inCursor */ );
elogif((Debug_print_full_dtm || log_min_messages <= DEBUG5), LOG,
"cdbdisp_dispatchCommandInternal: %s (needTwoPhase = %s)",
strCommand, (needTwoPhase ? "true" : "false"));
pQueryParms = palloc0(sizeof(*pQueryParms));
pQueryParms->strCommand = strCommand;
pQueryParms->strCommand = debug_query_string;
pQueryParms->serializedQuerytree = serializedQuerytree;
pQueryParms->serializedQuerytreelen = serializedQuerytreelen;
pQueryParms->serializedQuerytreelen = serializedQuerytree_len;
pQueryParms->serializedQueryDispatchDesc = serializedQueryDispatchDesc;
pQueryParms->serializedQueryDispatchDesclen = serializedQueryDispatchDesclen;
/*
* Allocate a primary QE for every available segDB in the system.
*/
primaryGang = AllocateWriterGang();
Assert(primaryGang);
pQueryParms->serializedQueryDispatchDesclen = serializedQueryDispatchDesc_len;
/*
* Serialize a version of our DTX Context Info
......@@ -507,35 +555,7 @@ cdbdisp_dispatchCommandInternal(const char *strCommand,
mppTxnOptions(needTwoPhase),
"cdbdisp_dispatchCommandInternal");
/*
* Dispatch the command.
*/
ds = cdbdisp_makeDispatcherState();
oldContext = MemoryContextSwitchTo(DispatcherContext);
queryText = buildGpQueryString(pQueryParms, &queryTextLength);
ds->primaryResults = cdbdisp_makeDispatchResults(1, cancelOnError);
ds->dispatchParams = cdbdisp_makeDispatchParams (1, queryText, queryTextLength);
ds->primaryResults->writer_gang = primaryGang;
MemoryContextSwitchTo(oldContext);
cdbdisp_destroyQueryParms(pQueryParms);
cdbdisp_dispatchToGang(ds, primaryGang, -1, DEFAULT_DISP_DIRECT);
cdbdisp_waitDispatchFinish(ds);
cdbdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);
pr = cdbdisp_getDispatchResults(ds, &qeError);
if (qeError)
{
cdbdisp_destroyDispatcherState(ds);
ReThrowError(qeError);
}
cdbdisp_returnResults(pr, cdb_pgresults);
cdbdisp_destroyDispatcherState(ds);
return pQueryParms;
}
static DispatchCommandQueryParms *
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册