提交 6a28c978 编写于 作者: H Heikki Linnakangas

Clean up the way the results array is allocated in cdbdisp_returnResults().

I saw the "nresults < nslots" assertion fail, while hacking on something
else. It happened when a Distributed Prepare command failed, and there were
several error result sets from a segment. I'm not sure how normal it is to
receive multiple ERROR responses to a single query, but the protocol
certainly allows it, and I don't see any explanation for why the code used
to assume that there can be at most 2 result sets from each segment.

Remove that assumption, and make the code cope with more than two result
sets from a segment, by calculating the required size of the array
accurately.

In the passing, remove the NULL-terminator from the array, and change the
callers that depended on it to use the returned size variable instead.
Makes the loops in the callers look less funky.
上级 c04d827d
......@@ -658,7 +658,6 @@ cdbdisp_returnResults(CdbDispatchResults *primaryResults,
StringInfo errmsgbuf,
int *numresults)
{
CdbDispatchResults *gangResults;
CdbDispatchResult *dispatchResult;
PGresult **resultSets = NULL;
int nslots;
......@@ -667,12 +666,15 @@ cdbdisp_returnResults(CdbDispatchResults *primaryResults,
int totalResultCount=0;
/*
* Allocate result set ptr array. Make room for one PGresult ptr per
* primary segment db, plus a null terminator slot after the
* last entry. The caller must PQclear() each PGresult and free() the
* array.
* Allocate result set ptr array. The caller must PQclear() each PGresult
* and free() the array.
*/
nslots = 2 * largestGangsize() + 1;
nslots = 0;
if (primaryResults)
{
for (i = 0; i < primaryResults->resultCount; ++i)
nslots += cdbdisp_numPGresult(&primaryResults->resultArray[i]);
}
resultSets = (struct pg_result **)calloc(nslots, sizeof(*resultSets));
if (!resultSets)
......@@ -680,14 +682,13 @@ cdbdisp_returnResults(CdbDispatchResults *primaryResults,
errmsg("cdbdisp_returnResults failed: out of memory")));
/* Collect results from primary gang. */
gangResults = primaryResults;
if (gangResults)
if (primaryResults)
{
totalResultCount = gangResults->resultCount;
totalResultCount = primaryResults->resultCount;
for (i = 0; i < gangResults->resultCount; ++i)
for (i = 0; i < primaryResults->resultCount; ++i)
{
dispatchResult = &gangResults->resultArray[i];
dispatchResult = &primaryResults->resultArray[i];
/* Append error messages to caller's buffer. */
cdbdisp_dumpDispatchResult(dispatchResult, false, errmsgbuf);
......@@ -695,17 +696,13 @@ cdbdisp_returnResults(CdbDispatchResults *primaryResults,
/* Take ownership of this QE's PGresult object(s). */
nresults += cdbdisp_snatchPGresults(dispatchResult,
resultSets + nresults,
nslots - nresults - 1);
nslots - nresults);
}
}
Assert(nresults == nslots);
/* Put a stopper at the end of the array. */
Assert(nresults < nslots);
resultSets[nresults] = NULL;
/* If our caller is interested, tell them how many sets we're returning. */
if (numresults != NULL)
*numresults = totalResultCount;
/* tell the caller how many sets we're returning. */
*numresults = totalResultCount;
return resultSets;
}
......@@ -715,7 +712,6 @@ cdbdisp_returnResults(CdbDispatchResults *primaryResults,
*
* Returns a malloc'ed array containing the PGresult objects thus
* produced; the caller must PQclear() them and free() the array.
* A NULL entry follows the last used entry in the array.
*
* Any error messages - whether or not they are associated with
* PGresult objects - are appended to a StringInfo buffer provided
......
......@@ -1125,15 +1125,15 @@ Persistent_PostDTMRecv_NonDBSpecificPTCatVerification(void)
{
PGresult **pgresultSets = NULL;
StringInfoData errmsgbuf;
PGresult *rs = NULL;
int i;
int nresults;
int i;
const char *cmdbuf = "select * from gp_nondbspecific_ptcat_verification()";
initStringInfo(&errmsgbuf);
/* call to all QE to perform verifications */
pgresultSets = cdbdisp_dispatchRMCommand(cmdbuf, /* withSnapshot */false,
&errmsgbuf, NULL);
&errmsgbuf, &nresults);
if (errmsgbuf.len > 0)
{
......@@ -1144,13 +1144,8 @@ Persistent_PostDTMRecv_NonDBSpecificPTCatVerification(void)
pfree(errmsgbuf.data);
}
rs = pgresultSets[0];
i=0;
do
{
PQclear(rs);
} while ((rs = pgresultSets[++i]) != NULL);
for (i = 0; i < nresults; i++)
PQclear(pgresultSets[i]);
pfree(errmsgbuf.data);
free(pgresultSets);
......@@ -1165,15 +1160,15 @@ Persistent_PostDTMRecv_DBSpecificPTCatVerification(void)
{
PGresult **pgresultSets = NULL;
StringInfoData errmsgbuf;
PGresult *rs = NULL;
int i =0;
int nresults;
int i;
const char *cmdbuf = "select * from gp_dbspecific_ptcat_verification()";
initStringInfo(&errmsgbuf);
/* call to all QE to perform verifications */
pgresultSets = cdbdisp_dispatchRMCommand(cmdbuf, /* withSnapshot */false,
&errmsgbuf, NULL);
&errmsgbuf, &nresults);
/* display error messages */
if (errmsgbuf.len > 0)
......@@ -1185,12 +1180,8 @@ Persistent_PostDTMRecv_DBSpecificPTCatVerification(void)
pfree(errmsgbuf.data);
}
rs = pgresultSets[0];
i=0;
do
{
PQclear(rs);
} while ((rs = pgresultSets[++i]) != NULL);
for (i = 0; i < nresults; i++)
PQclear(pgresultSets[i]);
pfree(errmsgbuf.data);
free(pgresultSets);
......
......@@ -131,7 +131,6 @@ static void recoverTM(void);
static bool recoverInDoubtTransactions(void);
static HTAB *gatherRMInDoubtTransactions(void);
static void abortRMInDoubtTransactions(HTAB *htab);
static void cleanRMResultSets(PGresult **pgresultSets);
static void dumpAllDtx(void);
/* static void resolveInDoubtDtx(void); */
......@@ -2286,7 +2285,9 @@ doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags,
pfree(errbuf.data);
/* Now we clean up the results array. */
cleanRMResultSets(results);
for (i = 0; i < resultCount; i++)
PQclear(results[i]);
free(results);
return (numOfFailed == 0);
}
......@@ -2356,7 +2357,9 @@ dispatchDtxCommand(const char *cmd, bool withSnapshot, bool raiseError)
pfree(errbuf.data);
/* Now we clean up the results array. */
cleanRMResultSets(results);
for (i = 0; i < resultCount; i++)
PQclear(results[i]);
free(results);
return (numOfFailed == 0);
}
......@@ -3408,7 +3411,6 @@ determineSegmentMaxDistributedXid(void)
for (i = 0; i < resultCount; i++)
PQclear(results[i]);
free(results);
return max;
......@@ -3427,16 +3429,17 @@ determineSegmentMaxDistributedXid(void)
static HTAB *
gatherRMInDoubtTransactions(void)
{
PGresult **pgresultSets = NULL;
PGresult **pgresultSets;
int nresultSets;
StringInfoData errmsgbuf;
const char *cmdbuf = "select gid from pg_prepared_xacts";
PGresult **rs;
PGresult *rs;
InDoubtDtx *lastDtx = NULL;
HASHCTL hctl;
HTAB *htab = NULL;
int i;
int j,
rows;
bool found;
......@@ -3445,7 +3448,7 @@ gatherRMInDoubtTransactions(void)
/* call to all QE to get in-doubt transactions */
pgresultSets = cdbdisp_dispatchRMCommand(cmdbuf, /* withSnapshot */false,
&errmsgbuf, NULL);
&errmsgbuf, &nresultSets);
/* display error messages */
if (errmsgbuf.len > 0)
......@@ -3464,9 +3467,10 @@ gatherRMInDoubtTransactions(void)
pfree(errmsgbuf.data);
/* If any result set is nonempty, there are in-doubt transactions. */
for (rs = pgresultSets; *rs; ++rs)
for (i = 0; i < nresultSets; i++)
{
rows = PQntuples(*rs);
rs = pgresultSets[i];
rows = PQntuples(rs);
for (j = 0; j < rows; j++)
{
......@@ -3492,7 +3496,7 @@ gatherRMInDoubtTransactions(void)
}
}
gid = PQgetvalue(*rs, j, 0);
gid = PQgetvalue(rs, j, 0);
/* Now we can add entry to hash table */
lastDtx = (InDoubtDtx *) hash_search(htab, gid, HASH_ENTER, &found);
......@@ -3511,7 +3515,9 @@ gatherRMInDoubtTransactions(void)
}
}
cleanRMResultSets(pgresultSets);
for (i = 0; i < nresultSets; i++)
PQclear(pgresultSets[i]);
free(pgresultSets);
return htab;
}
......@@ -3577,27 +3583,6 @@ abortRMInDoubtTransactions(HTAB *htab)
}
}
/*
* cleanRMResultSets:
* Goes the set of PGresults passed in and clears and frees them.
*/
static void
cleanRMResultSets(PGresult **pgresultSets)
{
/* TODO: need to handle multiple dbs in a segment case */
/*
int tolaldbs = CdbClusterDefinition->segment_count * CdbClusterDefinition->db_count;
*/
int i = 0;
PGresult *rs = pgresultSets[0];
do
{
PQclear(rs);
} while ((rs = pgresultSets[++i]) != NULL);
free(pgresultSets);
}
/*
* dumpAllDtx:
* used to log the current state (according to the DTM) of all the
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册