Commit 9643bac7 authored by Andreas Scherbaum, committed by Andreas Scherbaum

Make SPI work with 64 bit counters (#2679)

* Make SPI work with 64 bit counters

Fix GET DIAGNOSTICS
Remove the earlier introduced SPI_processed64 variable

This includes the following upstream patches:
https://github.com/greenplum-db/gpdb/commit/23a27b039d94ba359286694831eafe03cd970eef
https://github.com/greenplum-db/gpdb/commit/f3f3aae4b7841f4dc51129691a7404a03eb55449
https://github.com/greenplum-db/gpdb/commit/ab737f6ba9fc0a26d32a95b115d5cd0e24a63191

The upstream patch https://github.com/greenplum-db/gpdb/commit/74a379b984d4df91acec2436a16c51caee3526af
is not yet included, because repalloc_huge() has not been backported yet.
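
For illustration only, here is a minimal caller-side sketch (not part of this commit) of how SPI code is expected to use the widened counter: SPI_processed is now a uint64, the tcount arguments take int64, and counts are printed with UINT64_FORMAT. The function name and error messages below are hypothetical.

#include "executor/spi.h"

/* hypothetical example, assuming an already-established backend context */
static void
count_rows_sketch(const char *sql)
{
	uint64		nrows;

	if (SPI_connect() != SPI_OK_CONNECT)
		elog(ERROR, "SPI_connect failed");

	/* tcount is now int64; 0 means "no row limit" */
	if (SPI_execute(sql, true, 0) < 0)
		elog(ERROR, "SPI_execute failed");

	nrows = SPI_processed;		/* 64-bit row counter, valid until SPI_finish() */
	elog(NOTICE, "query processed " UINT64_FORMAT " row(s)", nrows);

	SPI_finish();
}

Callers that must store the count in a 32-bit variable (as several Greenplum internals below do) cast it explicitly, e.g. proc = (int) SPI_processed, optionally after asserting that the value fits.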
Parent 32f099fd
......@@ -578,7 +578,7 @@ check_foreign_key(PG_FUNCTION_ARGS)
snprintf(ident, sizeof(ident), "%s$%u", trigger->tgname, rel->rd_id);
plan = find_plan(ident, &FPlans, &nFPlans);
ret = SPI_execp(plan->splan[r], kvals, NULL, tcount);
ret = SPI_execp(plan->splan[r], kvals, NULL, (int64) tcount);
/* we have no NULLs - so we pass ^^^^ here */
if (ret < 0)
......@@ -599,7 +599,7 @@ check_foreign_key(PG_FUNCTION_ARGS)
else
{
#ifdef REFINT_VERBOSE
elog(NOTICE, "%s: %d tuple(s) of %s are %s",
elog(NOTICE, "%s: " UINT64_FORMAT " tuple(s) of %s are %s",
trigger->tgname, SPI_processed, relname,
(action == 'c') ? "deleted" : "set to null");
#endif
......
......@@ -522,9 +522,9 @@ xpath_table(PG_FUNCTION_ARGS)
int numpaths;
int ret;
int proc;
int i;
int j;
uint64 proc;
uint64 i; /* rows */
int j; /* columns */
int rownr; /* For issuing multiple rows from one original
* document */
bool had_values; /* To determine end of nodeset results */
......@@ -631,7 +631,6 @@ xpath_table(PG_FUNCTION_ARGS)
query_buf.data);
proc = SPI_processed;
/* elog(DEBUG1,"xpath_table: SPI returned %d rows",proc); */
tuptable = SPI_tuptable;
spi_tupdesc = tuptable->tupdesc;
......
......@@ -1534,7 +1534,7 @@ gp_update_aocol_master_stats_internal(Relation parentrel, Snapshot appendOnlyMet
Relation aosegrel;
bool connected = false;
char aoseg_relname[NAMEDATALEN];
int proc;
int proc; /* 32 bit, only holds number of segments */
int ret;
int64 total_count = 0;
MemoryContext oldcontext = CurrentMemoryContext;
......@@ -1569,7 +1569,7 @@ gp_update_aocol_master_stats_internal(Relation parentrel, Snapshot appendOnlyMet
/* Do the query. */
ret = SPI_execute(sqlstmt.data, false, 0);
proc = SPI_processed;
proc = (int) SPI_processed;
if (ret > 0 && SPI_tuptable != NULL)
......@@ -1724,7 +1724,7 @@ aocol_compression_ratio_internal(Relation parentrel)
Relation aosegrel;
bool connected = false;
char aocsseg_relname[NAMEDATALEN];
int proc;
int proc; /* 32 bit, only holds number of segments */
int ret;
int64 eof = 0;
int64 eof_uncompressed = 0;
......@@ -1773,7 +1773,7 @@ aocol_compression_ratio_internal(Relation parentrel)
/* Do the query. */
ret = SPI_execute(sqlstmt.data, false, 0);
proc = SPI_processed;
proc = (int) SPI_processed;
if (ret > 0 && SPI_tuptable != NULL)
{
......
......@@ -1165,7 +1165,7 @@ gp_update_aorow_master_stats_internal(Relation parentrel, Snapshot appendOnlyMet
Relation aosegrel;
bool connected = false;
char aoseg_relname[NAMEDATALEN];
int proc;
int proc; /* 32 bit, only holds number of segments */
int ret;
int64 total_count = 0;
MemoryContext oldcontext = CurrentMemoryContext;
......@@ -1200,7 +1200,7 @@ gp_update_aorow_master_stats_internal(Relation parentrel, Snapshot appendOnlyMet
/* Do the query. */
ret = SPI_execute(sqlstmt.data, false, 0);
proc = SPI_processed;
proc = (int) SPI_processed;
if (ret > 0 && SPI_tuptable != NULL)
......@@ -1554,8 +1554,10 @@ gp_update_ao_master_stats_oid(PG_FUNCTION_ARGS)
typedef struct
{
int index;
int rows;
uint64 index;
/* There is a chance the count will return more than 2^32 rows;
 * besides, SPI_processed is 64 bit anyway. */
uint64 rows;
} QueryInfo;
......@@ -1982,7 +1984,7 @@ aorow_compression_ratio_internal(Relation parentrel)
Relation aosegrel;
bool connected = false;
char aoseg_relname[NAMEDATALEN];
int proc;
int proc; /* 32 bit, only holds number of segments */
int ret;
float8 compress_ratio = -1; /* the default, meaning "not
* available" */
......@@ -2026,7 +2028,7 @@ aorow_compression_ratio_internal(Relation parentrel)
/* Do the query. */
ret = SPI_execute(sqlstmt.data, false, 0);
proc = SPI_processed;
proc = (int) SPI_processed;
if (ret > 0 && SPI_tuptable != NULL)
{
......
......@@ -981,7 +981,7 @@ Persistent_ExecuteQuery(char const *query, bool readOnlyQuery)
{
StringInfoData sqlstmt;
int ret;
int proc = 0;
int proc = 0; /* 32 bit, only holds metadata */
Assert(query);
Insist(connected);
......@@ -1000,7 +1000,7 @@ Persistent_ExecuteQuery(char const *query, bool readOnlyQuery)
/* Run the query. */
ret = SPI_execute(sqlstmt.data, readOnlyQuery, 0);
proc = SPI_processed;
proc = (int) SPI_processed;
if (ret > 0 && SPI_tuptable != NULL)
{
......
......@@ -1376,7 +1376,7 @@ ChangeTracking_GetIncrementalChangeList(void)
IncrementalChangeList *result = NULL;
StringInfoData sqlstmt;
int ret;
int proc;
int proc; /* 32 bit, only holds metadata */
volatile bool connected = false;
ResourceOwner save = CurrentResourceOwner;
MemoryContext oldcontext = CurrentMemoryContext;
......@@ -1432,7 +1432,7 @@ ChangeTracking_GetIncrementalChangeList(void)
/* Do the query. */
ret = SPI_execute(sqlstmt.data, true, 0);
proc = SPI_processed;
proc = (int) SPI_processed;
if (ret > 0 && SPI_tuptable != NULL)
......@@ -1653,7 +1653,7 @@ ChangeTracking_GetChanges(ChangeTrackingRequest *request)
ChangeTrackingResult *result = NULL;
StringInfoData sqlstmt;
int ret;
int proc;
int proc; /* 32 bit, only holds metadata */
int i;
volatile bool connected = false; /* needs to survive PG_TRY()/CATCH() */
ResourceOwner save = CurrentResourceOwner;
......@@ -1727,7 +1727,7 @@ ChangeTracking_GetChanges(ChangeTrackingRequest *request)
/* Do the query. */
ret = SPI_execute(sqlstmt.data, true, 0);
proc = SPI_processed;
proc = (int) SPI_processed;
if (ret > 0 && SPI_tuptable != NULL)
......@@ -3193,7 +3193,7 @@ ChangeTracking_CompactLogFile(CTFType source, CTFType dest, XLogRecPtr *uptolsn)
{
StringInfoData sqlstmt;
int ret;
int proc;
int proc; /* 32 bit, only holds metadata */
bool connected = false;
int64 count = 0;
ResourceOwner save = CurrentResourceOwner;
......@@ -3240,7 +3240,7 @@ ChangeTracking_CompactLogFile(CTFType source, CTFType dest, XLogRecPtr *uptolsn)
/* Do the query. */
ret = SPI_execute(sqlstmt.data, true, 0);
proc = SPI_processed;
proc = (int) SPI_processed;
if ((segmentState != SegmentStateChangeTrackingDisabled) &&
ret > 0 && SPI_tuptable != NULL)
......
......@@ -1447,7 +1447,7 @@ acquire_sample_rows_by_query(Relation onerel, int nattrs, VacAttrStats **attrsta
float4 relTuples;
float4 relPages;
int ret;
int sampleTuples;
int sampleTuples; /* 32 bit - assume that the number of sample tuples will not exceed 2 billion */
Datum *vals;
bool *nulls;
MemoryContext oldcxt;
......@@ -1587,7 +1587,12 @@ acquire_sample_rows_by_query(Relation onerel, int nattrs, VacAttrStats **attrsta
*/
ret = SPI_execute(str.data, false, 0);
Assert(ret > 0);
sampleTuples = SPI_processed;
/*
* targrows in analyze_rel_internal() is an int,
* it's unlikely that this query will return more rows
*/
Assert(SPI_processed < INT_MAX);
sampleTuples = (int) SPI_processed;
/* Ok, read in the tuples to *rows */
MemoryContextSwitchTo(oldcxt);
......
......@@ -186,7 +186,7 @@ PerformPortalFetch(FetchStmt *stmt,
char *completionTag)
{
Portal portal;
long nprocessed;
uint64 nprocessed;
/*
* Disallow empty-string cursor name (conflicts with protocol-level
......@@ -219,7 +219,7 @@ PerformPortalFetch(FetchStmt *stmt,
/* Return command status if wanted */
if (completionTag)
snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s %ld",
snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s " UINT64_FORMAT,
stmt->ismove ? "MOVE" : "FETCH",
nprocessed);
}
......@@ -451,7 +451,7 @@ PersistHoldablePortal(Portal portal)
true);
/* Fetch the result set into the tuplestore */
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
ExecutorRun(queryDesc, ForwardScanDirection, 0);
(*queryDesc->dest->rDestroy) (queryDesc->dest);
queryDesc->dest = NULL;
......@@ -485,7 +485,10 @@ PersistHoldablePortal(Portal portal)
{
if (portal->atEnd)
{
/* we can handle this case even if posOverflow */
/*
* Just force the tuplestore forward to its end. The size of the
* skip request here is arbitrary.
*/
while (tuplestore_advance(portal->holdStore, true))
/* continue */ ;
}
......@@ -493,11 +496,6 @@ PersistHoldablePortal(Portal portal)
{
int64 store_pos;
if (portal->posOverflow) /* oops, cannot trust portalPos */
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not reposition held cursor")));
tuplestore_rescan(portal->holdStore);
for (store_pos = 0; store_pos < portal->portalPos; store_pos++)
......
......@@ -988,6 +988,32 @@ standard_ExecutorRun(QueryDesc *queryDesc,
}
PG_END_TRY();
/*
* Allow testing of very high number of processed rows, without spending
* hours actually processing that many rows.
*
* Somewhat arbitrarily, only trigger this if more than 10000 rows were truly
* processed. This screens out some internal queries that the system might
* issue during planning.
*/
if (estate->es_processed >= 10000 && estate->es_processed <= 1000000)
//if (estate->es_processed >= 10000)
{
if (FaultInjector_InjectFaultIfSet(ExecutorRunHighProcessed,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */))
{
/*
* For testing purposes, pretend that we have already processed
* almost 2^32 rows.
*/
estate->es_processed = UINT_MAX - 10;
}
}
/*
* shutdown tuple receiver, if we started it
*/
......@@ -3226,6 +3252,24 @@ ExecSelect(TupleTableSlot *slot,
{
(*dest->receiveSlot) (slot, dest);
(estate->es_processed)++;
/*
* Bump es_processed using the fault injector, but only if the number of rows
* is in a certain range; this avoids bumping the counter again every time
* after we have bumped it once.
*/
if (estate->es_processed >= 10000 && estate->es_processed <= 1000000)
{
if (FaultInjector_InjectFaultIfSet(ExecutorRunHighProcessed,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */))
{
/*
* For testing purposes, pretend that we have already processed
* almost 2^32 rows.
*/
estate->es_processed = UINT_MAX - 10;
}
}
}
/* ----------------------------------------------------------------
......
......@@ -33,6 +33,7 @@
#include "utils/typcache.h"
#include "utils/resource_manager.h"
#include "utils/resscheduler.h"
#include "utils/faultinjector.h"
#include "cdb/cdbvars.h"
#include "miscadmin.h"
......@@ -43,18 +44,7 @@
#include "cdb/memquota.h"
#include "parser/analyze.h"
/*
* Update the legacy 32-bit processed counter, but handle overflow.
*/
#define SET_SPI_PROCESSED \
if (SPI_processed64 > UINT_MAX) \
SPI_processed = UINT_MAX; \
else \
SPI_processed = (uint32)SPI_processed64
uint64 SPI_processed64 = 0;
uint32 SPI_processed = 0;
uint64 SPI_processed = 0;
Oid SPI_lastoid = InvalidOid;
SPITupleTable *SPI_tuptable = NULL;
int SPI_result;
......@@ -74,7 +64,7 @@ static void _SPI_prepare_plan(const char *src, SPIPlanPtr plan,
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
bool read_only, bool fire_triggers, long tcount);
bool read_only, bool fire_triggers, int64 tcount);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls,
......@@ -82,12 +72,12 @@ static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
static void _SPI_assign_query_mem(QueryDesc *queryDesc);
static int _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount);
static int _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, int64 tcount);
static void _SPI_error_callback(void *arg);
static void _SPI_cursor_operation(Portal portal,
FetchDirection direction, long count,
FetchDirection direction, int64 count,
DestReceiver *dest);
static SPIPlanPtr _SPI_copy_plan(SPIPlanPtr plan, MemoryContext parentcxt);
......@@ -204,7 +194,6 @@ SPI_finish(void)
* Reset result variables, especially SPI_tuptable which is probably
* pointing at a just-deleted tuptable
*/
SPI_processed64 = 0;
SPI_processed = 0;
SPI_lastoid = InvalidOid;
SPI_tuptable = NULL;
......@@ -244,7 +233,6 @@ AtEOXact_SPI(bool isCommit)
_SPI_current = _SPI_stack = NULL;
_SPI_stack_depth = 0;
_SPI_connected = _SPI_curid = -1;
SPI_processed64 = 0;
SPI_processed = 0;
SPI_lastoid = InvalidOid;
SPI_tuptable = NULL;
......@@ -295,7 +283,6 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
_SPI_current = NULL;
else
_SPI_current = &(_SPI_stack[_SPI_connected]);
SPI_processed64 = 0;
SPI_processed = 0;
SPI_lastoid = InvalidOid;
SPI_tuptable = NULL;
......@@ -371,7 +358,7 @@ SPI_restore_connection(void)
/* Parse, plan, and execute a query string */
int
SPI_execute(const char *src, bool read_only, long tcount)
SPI_execute(const char *src, bool read_only, int64 tcount)
{
_SPI_plan plan;
int res;
......@@ -399,7 +386,7 @@ SPI_execute(const char *src, bool read_only, long tcount)
/* Obsolete version of SPI_execute */
int
SPI_exec(const char *src, long tcount)
SPI_exec(const char *src, int64 tcount)
{
return SPI_execute(src, false, tcount);
}
......@@ -407,7 +394,7 @@ SPI_exec(const char *src, long tcount)
/* Execute a previously prepared plan */
int
SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount)
bool read_only, int64 tcount)
{
int res;
......@@ -434,7 +421,7 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
/* Obsolete version of SPI_execute_plan */
int
SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls, long tcount)
SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls, int64 tcount)
{
return SPI_execute_plan(plan, Values, Nulls, false, tcount);
}
......@@ -456,7 +443,7 @@ int
SPI_execute_snapshot(SPIPlanPtr plan,
Datum *Values, const char *Nulls,
Snapshot snapshot, Snapshot crosscheck_snapshot,
bool read_only, bool fire_triggers, long tcount)
bool read_only, bool fire_triggers, int64 tcount)
{
int res;
......@@ -494,7 +481,7 @@ int
SPI_execute_with_args(const char *src,
int nargs, Oid *argtypes,
Datum *Values, const char *Nulls,
bool read_only, long tcount)
bool read_only, int64 tcount)
{
int res;
_SPI_plan plan;
......@@ -1114,7 +1101,6 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
elog(ERROR, "SPI_cursor_open called while not connected");
/* Reset SPI result (note we deliberately don't touch lastoid) */
SPI_processed64 = 0;
SPI_processed = 0;
SPI_tuptable = NULL;
_SPI_current->processed = 0;
......@@ -1315,7 +1301,7 @@ void
SPI_cursor_fetch(Portal portal, bool forward, long count)
{
_SPI_cursor_operation(portal,
forward ? FETCH_FORWARD : FETCH_BACKWARD, count,
forward ? FETCH_FORWARD : FETCH_BACKWARD, (int64) count,
CreateDestReceiver(DestSPI));
/* we know that the DestSPI receiver doesn't need a destroy call */
}
......@@ -1330,7 +1316,7 @@ void
SPI_cursor_move(Portal portal, bool forward, long count)
{
_SPI_cursor_operation(portal,
forward ? FETCH_FORWARD : FETCH_BACKWARD, count,
forward ? FETCH_FORWARD : FETCH_BACKWARD, (int64) count,
None_Receiver);
}
......@@ -1344,7 +1330,7 @@ void
SPI_scroll_cursor_fetch(Portal portal, FetchDirection direction, long count)
{
_SPI_cursor_operation(portal,
direction, count,
direction, (int64) count,
CreateDestReceiver(DestSPI));
/* we know that the DestSPI receiver doesn't need a destroy call */
}
......@@ -1358,7 +1344,7 @@ SPI_scroll_cursor_fetch(Portal portal, FetchDirection direction, long count)
void
SPI_scroll_cursor_move(Portal portal, FetchDirection direction, long count)
{
_SPI_cursor_operation(portal, direction, count, None_Receiver);
_SPI_cursor_operation(portal, direction, (int64) count, None_Receiver);
}
......@@ -1629,6 +1615,10 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self)
{
tuptable->free = 256;
tuptable->alloced += tuptable->free;
/*
* 74a379b984d4df91acec2436a16c51caee3526af uses repalloc_huge(),
* but this is not yet backported from PG
*/
tuptable->vals = (HeapTuple *) repalloc(tuptable->vals,
tuptable->alloced * sizeof(HeapTuple));
}
......@@ -1768,7 +1758,7 @@ _SPI_prepare_plan(const char *src, SPIPlanPtr plan, ParamListInfo boundParams)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot crosscheck_snapshot,
bool read_only, bool fire_triggers, long tcount)
bool read_only, bool fire_triggers, int64 tcount)
{
int my_res = 0;
uint64 my_processed = 0;
......@@ -2007,8 +1997,7 @@ fail:
error_context_stack = spierrcontext.previous;
/* Save results for caller */
SPI_processed64 = my_processed;
SET_SPI_PROCESSED;
SPI_processed = my_processed;
SPI_lastoid = my_lastoid;
SPI_tuptable = my_tuptable;
......@@ -2094,7 +2083,7 @@ _SPI_assign_query_mem(QueryDesc * queryDesc)
}
static int
_SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount)
_SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, int64 tcount)
{
int operation = queryDesc->operation;
int res;
......@@ -2212,8 +2201,17 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount)
if ((res == SPI_OK_SELECT || queryDesc->plannedstmt->returningLists) &&
queryDesc->dest->mydest == DestSPI)
{
if (_SPI_checktuples())
insist_log(false, "consistency check on SPI tuple count failed");
/*
* Only check the number of tuples if the SPI 64 bit test is NOT running.
*/
if (!FaultInjector_InjectFaultIfSet(ExecutorRunHighProcessed,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */))
{
if (_SPI_checktuples())
insist_log(false, "consistency check on SPI tuple count failed");
}
}
if (!cdbpathlocus_querysegmentcatalogs)
......@@ -2287,10 +2285,10 @@ _SPI_error_callback(void *arg)
* Do a FETCH or MOVE in a cursor
*/
static void
_SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
_SPI_cursor_operation(Portal portal, FetchDirection direction, int64 count,
DestReceiver *dest)
{
int64 nfetched;
uint64 nfetched;
/* Check that the portal is valid */
if (!PortalIsValid(portal))
......@@ -2301,7 +2299,6 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
elog(ERROR, "SPI cursor operation called while not connected");
/* Reset the SPI result (note we deliberately don't touch lastoid) */
SPI_processed64 = 0;
SPI_processed = 0;
SPI_tuptable = NULL;
_SPI_current->processed = 0;
......@@ -2327,8 +2324,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
elog(ERROR, "consistency check on SPI tuple count failed");
/* Put the result into place for access by caller */
SPI_processed64 = _SPI_current->processed;
SET_SPI_PROCESSED;
SPI_processed = _SPI_current->processed;
SPI_tuptable = _SPI_current->tuptable;
......@@ -2396,7 +2392,7 @@ _SPI_end_call(bool procmem)
static bool
_SPI_checktuples(void)
{
uint32 processed = _SPI_current->processed;
uint64 processed = _SPI_current->processed;
SPITupleTable *tuptable = _SPI_current->tuptable;
bool failed = false;
......
......@@ -51,16 +51,16 @@ static void ProcessQuery(Portal portal, /* Resource queueing need SQL, so we pas
DestReceiver *dest,
char *completionTag);
static void FillPortalStore(Portal portal, bool isTopLevel);
static uint64 RunFromStore(Portal portal, ScanDirection direction, int64 count,
static uint64 RunFromStore(Portal portal, ScanDirection direction, uint64 count,
DestReceiver *dest);
static int64 PortalRunSelect(Portal portal, bool forward, int64 count,
static uint64 PortalRunSelect(Portal portal, bool forward, int64 count,
DestReceiver *dest);
static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel,
DestReceiver *dest, char *completionTag);
static void PortalRunMulti(Portal portal, bool isTopLevel,
DestReceiver *dest, DestReceiver *altdest,
char *completionTag);
static int64 DoPortalRunFetch(Portal portal,
static uint64 DoPortalRunFetch(Portal portal,
FetchDirection fdirection,
int64 count,
DestReceiver *dest);
......@@ -288,7 +288,7 @@ ProcessQuery(Portal portal,
/*
* Run the plan to completion.
*/
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
ExecutorRun(queryDesc, ForwardScanDirection, 0);
/* Now take care of any queued AFTER triggers */
AfterTriggerEndQuery(queryDesc->estate);
......@@ -730,7 +730,6 @@ PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot,
portal->atStart = true;
portal->atEnd = false; /* allow fetches */
portal->portalPos = 0;
portal->posOverflow = false;
PopActiveSnapshot();
break;
......@@ -758,7 +757,6 @@ PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot,
portal->atStart = true;
portal->atEnd = false; /* allow fetches */
portal->portalPos = 0;
portal->posOverflow = false;
break;
case PORTAL_UTIL_SELECT:
......@@ -780,7 +778,6 @@ PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot,
portal->atStart = true;
portal->atEnd = false; /* allow fetches */
portal->portalPos = 0;
portal->posOverflow = false;
break;
case PORTAL_MULTI_QUERY:
......@@ -871,8 +868,8 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
*
* count <= 0 is interpreted as a no-op: the destination gets started up
* and shut down, but nothing else happens. Also, count == FETCH_ALL is
* interpreted as "all rows". Note that count is ignored in multi-query
* situations, where we always run the portal to completion.
* interpreted as "all rows". (cf FetchStmt.howMany)
* Note that count is ignored in multi-query
*
* isTopLevel: true if query is being executed at backend "top level"
* (that is, directly from a client command message)
......@@ -1082,7 +1079,7 @@ PortalRun(Portal portal, int64 count, bool isTopLevel,
*
* Returns number of rows processed (suitable for use in result tag)
*/
static int64
static uint64
PortalRunSelect(Portal portal,
bool forward,
int64 count,
......@@ -1124,7 +1121,9 @@ PortalRunSelect(Portal portal,
if (forward)
{
if (portal->atEnd || count <= 0)
{
direction = NoMovementScanDirection;
}
else
direction = ForwardScanDirection;
......@@ -1133,7 +1132,7 @@ PortalRunSelect(Portal portal,
count = 0;
if (portal->holdStore)
nprocessed = RunFromStore(portal, direction, count, dest);
nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
else
{
PushActiveSnapshot(queryDesc->snapshot);
......@@ -1144,18 +1143,11 @@ PortalRunSelect(Portal portal,
if (!ScanDirectionIsNoMovement(direction))
{
long oldPos;
if (nprocessed > 0)
portal->atStart = false; /* OK to go backward now */
if (count == 0 ||
(unsigned long) nprocessed < (unsigned long) count)
if (count == 0 || nprocessed < count)
portal->atEnd = true; /* we retrieved 'em all */
oldPos = portal->portalPos;
portal->portalPos += nprocessed;
/* portalPos doesn't advance when we fall off the end */
if (portal->portalPos < oldPos)
portal->posOverflow = true;
}
}
else
......@@ -1167,7 +1159,9 @@ PortalRunSelect(Portal portal,
errhint("Declare it with SCROLL option to enable backward scan.")));
if (portal->atStart || count <= 0)
{
direction = NoMovementScanDirection;
}
else
direction = BackwardScanDirection;
......@@ -1176,7 +1170,7 @@ PortalRunSelect(Portal portal,
count = 0;
if (portal->holdStore)
nprocessed = RunFromStore(portal, direction, count, dest);
nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
else
{
PushActiveSnapshot(queryDesc->snapshot);
......@@ -1192,22 +1186,14 @@ PortalRunSelect(Portal portal,
portal->atEnd = false; /* OK to go forward now */
portal->portalPos++; /* adjust for endpoint case */
}
if (count == 0 ||
(unsigned long) nprocessed < (unsigned long) count)
if (count == 0 || nprocessed < count)
{
portal->atStart = true; /* we retrieved 'em all */
portal->portalPos = 0;
portal->posOverflow = false;
}
else
{
int64 oldPos;
oldPos = portal->portalPos;
portal->portalPos -= nprocessed;
if (portal->portalPos > oldPos ||
portal->portalPos <= 0)
portal->posOverflow = true;
}
}
}
......@@ -1280,10 +1266,10 @@ FillPortalStore(Portal portal, bool isTopLevel)
* out for memory leaks.
*/
static uint64
RunFromStore(Portal portal, ScanDirection direction, int64 count,
RunFromStore(Portal portal, ScanDirection direction, uint64 count,
DestReceiver *dest)
{
int64 current_tuple_count = 0;
uint64 current_tuple_count = 0;
TupleTableSlot *slot;
slot = MakeSingleTupleTableSlot(portal->tupDesc);
......@@ -1332,7 +1318,7 @@ RunFromStore(Portal portal, ScanDirection direction, int64 count,
ExecDropSingleTupleTableSlot(slot);
return (uint32) current_tuple_count;
return current_tuple_count;
}
/*
......@@ -1534,13 +1520,13 @@ PortalRunMulti(Portal portal, bool isTopLevel,
*
* Returns number of rows processed (suitable for use in result tag)
*/
int64
uint64
PortalRunFetch(Portal portal,
FetchDirection fdirection,
int64 count,
DestReceiver *dest)
{
int64 result = 0;
uint64 result = 0;
Portal saveActivePortal;
ResourceOwner saveResourceOwner;
MemoryContext savePortalContext;
......@@ -1634,9 +1620,13 @@ PortalRunFetch(Portal portal,
* DoPortalRunFetch
* Guts of PortalRunFetch --- the portal context is already set up
*
* count <= 0 is interpreted as a no-op: the destination gets started up
* and shut down, but nothing else happens. Also, count == FETCH_ALL is
* interpreted as "all rows". (cf FetchStmt.howMany)
*
* Returns number of rows processed (suitable for use in result tag)
*/
static int64
static uint64
DoPortalRunFetch(Portal portal,
FetchDirection fdirection,
int64 count,
......@@ -1683,14 +1673,21 @@ DoPortalRunFetch(Portal portal,
{
/*
* Definition: Rewind to start, advance count-1 rows, return
* next row (if any). In practice, if the goal is less than
* halfway back to the start, it's better to scan from where
* we are. In any case, we arrange to fetch the target row
* going forwards.
* next row (if any).
*
* In practice, if the goal is less than halfway back to the
* start, it's better to scan from where we are.
*
* Also, if current portalPos is outside the range of "long",
* do it the hard way to avoid possible overflow of the count
* argument to PortalRunSelect. We must exclude exactly
* LONG_MAX, as well, lest the count look like FETCH_ALL.
*
* In any case, we arrange to fetch the target row going
* forwards.
*/
if (portal->posOverflow ||
portal->portalPos == INT64CONST(0x7FFFFFFFFFFFFFFF) ||
count - 1 <= portal->portalPos / 2)
if ((uint64) (count - 1) <= portal->portalPos / 2 ||
portal->portalPos >= (uint64) LONG_MAX)
{
/* until we enable backward scan - bail out here */
if(portal->portalPos > 0)
......@@ -1705,7 +1702,7 @@ DoPortalRunFetch(Portal portal,
}
else
{
int64 pos = portal->portalPos;
uint64 pos = portal->portalPos;
if (portal->atEnd)
pos++; /* need one extra fetch if off end */
......@@ -1716,7 +1713,7 @@ DoPortalRunFetch(Portal portal,
PortalRunSelect(portal, true, count - pos - 1,
None_Receiver);
}
return PortalRunSelect(portal, true, 1L, dest);
return PortalRunSelect(portal, true, 1, dest);
}
else if (count < 0)
{
......@@ -1736,7 +1733,7 @@ DoPortalRunFetch(Portal portal,
PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
if (count < -1)
PortalRunSelect(portal, false, -count - 1, None_Receiver);
return PortalRunSelect(portal, false, 1L, dest);
return PortalRunSelect(portal, false, 1, dest);
}
else
{
......@@ -1749,7 +1746,7 @@ DoPortalRunFetch(Portal portal,
/* Rewind to start, return zero rows */
DoPortalRewind(portal);
return PortalRunSelect(portal, true, 0L, dest);
return PortalRunSelect(portal, true, 0, dest);
}
break;
case FETCH_RELATIVE:
......@@ -1760,7 +1757,7 @@ DoPortalRunFetch(Portal portal,
*/
if (count > 1)
PortalRunSelect(portal, true, count - 1, None_Receiver);
return PortalRunSelect(portal, true, 1L, dest);
return PortalRunSelect(portal, true, 1, dest);
}
else if (count < 0)
{
......@@ -1776,7 +1773,7 @@ DoPortalRunFetch(Portal portal,
if (count < -1)
PortalRunSelect(portal, false, -count - 1, None_Receiver);
return PortalRunSelect(portal, false, 1L, dest);
return PortalRunSelect(portal, false, 1, dest);
}
else
{
......@@ -1809,7 +1806,7 @@ DoPortalRunFetch(Portal portal,
if (dest->mydest == DestNone)
{
/* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
return on_row ? 1L : 0L;
return on_row ? 1 : 0;
}
else
{
......@@ -1822,7 +1819,7 @@ DoPortalRunFetch(Portal portal,
*/
if (on_row)
{
PortalRunSelect(portal, false, 1L, None_Receiver);
PortalRunSelect(portal, false, 1, None_Receiver);
/* Set up to fetch one row forward */
count = 1;
forward = true;
......@@ -1835,7 +1832,7 @@ DoPortalRunFetch(Portal portal,
*/
if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
{
int64 result = portal->portalPos;
uint64 result = portal->portalPos;
/* until we enable backward scan - bail out here */
ereport(ERROR,
......@@ -1845,7 +1842,6 @@ DoPortalRunFetch(Portal portal,
if (result > 0 && !portal->atEnd)
result--;
DoPortalRewind(portal);
/* result is bogus if pos had overflowed, but it's best we can do */
return result;
}
......@@ -1872,7 +1868,6 @@ DoPortalRewind(Portal portal)
portal->atStart = true;
portal->atEnd = false;
portal->portalPos = 0;
portal->posOverflow = false;
}
/*
......
......@@ -263,7 +263,7 @@ tsquery_rewrite_query(PG_FUNCTION_ARGS)
SPIPlanPtr plan;
Portal portal;
bool isnull;
int i;
int64 i;
if (query->size == 0)
{
......
......@@ -1097,7 +1097,7 @@ static TSVectorStat *
ts_stat_sql(MemoryContext persistentContext, text *txt, text *ws)
{
char *query = text_to_cstring(txt);
int i;
int64 i;
TSVectorStat *stat;
bool isnull;
Portal portal;
......
......@@ -128,7 +128,7 @@ static const char *map_sql_catalog_to_xmlschema_types(List *nspid_list,
static const char *map_sql_type_to_xml_name(Oid typeoid, int typmod);
static const char *map_sql_typecoll_to_xmlschema_types(List *tupdesc_list);
static const char *map_sql_type_to_xmlschema_type(Oid typeoid, int typmod);
static void SPI_sql_row_to_xmlelement(int rownum, StringInfo result,
static void SPI_sql_row_to_xmlelement(uint64 rownum, StringInfo result,
char *tablename, bool nulls, bool tableforest,
const char *targetns, bool top_level);
......@@ -1942,7 +1942,7 @@ _SPI_strdup(const char *s)
static List *
query_to_oid_list(const char *query)
{
int i;
uint64 i;
List *list = NIL;
SPI_execute(query, true, 0);
......@@ -2061,7 +2061,7 @@ cursor_to_xml(PG_FUNCTION_ARGS)
StringInfoData result;
Portal portal;
int i;
uint64 i;
initStringInfo(&result);
......@@ -2136,7 +2136,7 @@ query_to_xml_internal(const char *query, char *tablename,
{
StringInfo result;
char *xmltn;
int i;
uint64 i;
if (tablename)
xmltn = map_sql_identifier_to_xml_name(tablename, true, false);
......@@ -3211,7 +3211,7 @@ map_sql_type_to_xmlschema_type(Oid typeoid, int typmod)
* SPI cursor. See also SQL/XML:2008 section 9.10.
*/
static void
SPI_sql_row_to_xmlelement(int rownum, StringInfo result, char *tablename,
SPI_sql_row_to_xmlelement(uint64 rownum, StringInfo result, char *tablename,
bool nulls, bool tableforest,
const char *targetns, bool top_level)
{
......
......@@ -258,6 +258,8 @@ FaultInjectorIdentifierEnumToString[] = {
_("workfile_write_failure"),
/* inject fault to simulate workfile write failure */
_("workfile_hashjoin_failure"),
/* pretend that a query processed billions of rows */
_("executor_run_high_processed"),
/* inject fault before we close workfile in ExecHashJoinNewBatch */
_("update_committed_eof_in_persistent_table"),
/* inject fault before committed EOF is updated in gp_persistent_relation_node for Append Only segment files */
......@@ -552,7 +554,7 @@ FaultInjector_InjectFaultNameIfSet(
* ok given this framework is purely for dev/testing.
*/
if (faultInjectorShmem->faultInjectorSlots == 0)
return FALSE;
return FaultInjectorTypeNotSpecified;
snprintf(databaseNameLocal, sizeof(databaseNameLocal), "%s", databaseName);
snprintf(tableNameLocal, sizeof(tableNameLocal), "%s", tableName);
......@@ -1059,6 +1061,7 @@ FaultInjector_NewHashEntry(
case InterconnectStopAckIsLost:
case SendQEDetailsInitBackend:
case ExecutorRunHighProcessed:
break;
default:
......
......@@ -238,7 +238,7 @@ typedef struct ScanMethod
extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
extern void ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, long count);
ScanDirection direction, int64 count);
extern void standard_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, long count);
extern void ExecutorEnd(QueryDesc *queryDesc);
......
......@@ -22,8 +22,8 @@
typedef struct SPITupleTable
{
MemoryContext tuptabcxt; /* memory context of result table */
uint32 alloced; /* # of alloced vals */
uint32 free; /* # of free vals */
uint64 alloced; /* # of alloced vals */
uint64 free; /* # of free vals */
TupleDesc tupdesc; /* tuple descriptor */
HeapTuple *vals; /* tuples */
} SPITupleTable;
......@@ -58,8 +58,7 @@ typedef struct _SPI_plan *SPIPlanPtr;
#define SPI_OK_UPDATE_RETURNING 13
#define SPI_OK_REWRITTEN 14
extern PGDLLIMPORT uint32 SPI_processed;
extern PGDLLIMPORT uint64 SPI_processed64;
extern PGDLLIMPORT uint64 SPI_processed;
extern PGDLLIMPORT Oid SPI_lastoid;
extern PGDLLIMPORT SPITupleTable *SPI_tuptable;
extern PGDLLIMPORT int SPI_result;
......@@ -71,21 +70,21 @@ extern void SPI_pop(void);
extern bool SPI_push_conditional(void);
extern void SPI_pop_conditional(bool pushed);
extern void SPI_restore_connection(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
extern int SPI_execute(const char *src, bool read_only, int64 tcount);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_exec(const char *src, long tcount);
bool read_only, int64 tcount);
extern int SPI_exec(const char *src, int64 tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
int64 tcount);
extern int SPI_execute_snapshot(SPIPlanPtr plan,
Datum *Values, const char *Nulls,
Snapshot snapshot,
Snapshot crosscheck_snapshot,
bool read_only, bool fire_triggers, long tcount);
bool read_only, bool fire_triggers, int64 tcount);
extern int SPI_execute_with_args(const char *src,
int nargs, Oid *argtypes,
Datum *Values, const char *Nulls,
bool read_only, long tcount);
bool read_only, int64 tcount);
extern SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes);
extern SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
int cursorOptions);
......
......@@ -296,6 +296,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena * datum);
#define PG_RETURN_FLOAT4(x) return Float4GetDatum(x)
#define PG_RETURN_FLOAT8(x) return Float8GetDatum(x)
#define PG_RETURN_INT64(x) return Int64GetDatum(x)
#define PG_RETURN_UINT64(x) return UInt64GetDatum(x)
/* RETURN macros for other pass-by-ref types will typically look like this: */
#define PG_RETURN_BYTEA_P(x) PG_RETURN_POINTER(x)
#define PG_RETURN_TEXT_P(x) PG_RETURN_POINTER(x)
......
......@@ -306,6 +306,19 @@ typedef union Datum_U
typedef Datum *DatumPtr;
#define GET_1_BYTE(datum) (((Datum) (datum)) & 0x000000ff)
#define GET_2_BYTES(datum) (((Datum) (datum)) & 0x0000ffff)
#define GET_4_BYTES(datum) (((Datum) (datum)) & 0xffffffff)
#if SIZEOF_DATUM == 8
#define GET_8_BYTES(datum) ((Datum) (datum))
#endif
#define SET_1_BYTE(value) (((Datum) (value)) & 0x000000ff)
#define SET_2_BYTES(value) (((Datum) (value)) & 0x0000ffff)
#define SET_4_BYTES(value) (((Datum) (value)) & 0xffffffff)
#if SIZEOF_DATUM == 8
#define SET_8_BYTES(value) ((Datum) (value))
#endif
/*
* Conversion between Datum and type X. Changed from Macro to static inline
* functions to get proper type checking.
......@@ -347,8 +360,32 @@ static inline int64 DatumGetInt64(Datum d) { return (int64) d; }
static inline Datum Int64GetDatum(int64 i64) { return (Datum) i64; }
static inline Datum Int64GetDatumFast(int64 x) { return Int64GetDatum(x); }
static inline uint64 DatumGetUInt64(Datum d) { return (uint64) d; }
static inline Datum UInt64GetDatum(uint64 ui64) { return (Datum) ui64; }
/*
* DatumGetUInt64
* Returns 64-bit unsigned integer value of a datum.
*
* Note: this macro hides whether int64 is pass by value or by reference.
*/
#ifdef USE_FLOAT8_BYVAL
#define DatumGetUInt64(X) ((uint64) GET_8_BYTES(X))
#else
#define DatumGetUInt64(X) (* ((uint64 *) DatumGetPointer(X)))
#endif
/*
* UInt64GetDatum
* Returns datum representation for a 64-bit unsigned integer.
*
* Note: if int64 is pass by reference, this function returns a reference
* to palloc'd space.
*/
#ifdef USE_FLOAT8_BYVAL
#define UInt64GetDatum(X) ((Datum) SET_8_BYTES(X))
#else
#define UInt64GetDatum(X) Int64GetDatum((int64) (X))
#endif
static inline Oid DatumGetObjectId(Datum d) { return (Oid) d; }
static inline Datum ObjectIdGetDatum(Oid oid) { return (Datum) oid; }
......
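As a hedged illustration (not part of this commit), the PG_RETURN_UINT64 macro added above, together with UInt64GetDatum()/DatumGetUInt64(), could be used by a C-language function that returns a 64-bit row count; the function name and query are made up for this sketch.

#include "postgres.h"
#include "fmgr.h"
#include "executor/spi.h"

/* hypothetical V1 function returning the 64-bit SPI row counter */
Datum
spi64_demo_count(PG_FUNCTION_ARGS)
{
	uint64		nrows = 0;

	if (SPI_connect() == SPI_OK_CONNECT)
	{
		SPI_execute("SELECT oid FROM pg_class", true, 0);
		nrows = SPI_processed;	/* save before SPI_finish() resets it */
		SPI_finish();
	}

	/* PG_RETURN_UINT64(x) expands to return UInt64GetDatum(x) */
	PG_RETURN_UINT64(nrows);
}

DatumGetUInt64() performs the reverse conversion when such a Datum is read back on the C side.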
......@@ -38,7 +38,7 @@ extern bool PortalRun(Portal portal, int64 count, bool isTopLevel,
DestReceiver *dest, DestReceiver *altdest,
char *completionTag);
extern int64 PortalRunFetch(Portal portal,
extern uint64 PortalRunFetch(Portal portal,
FetchDirection fdirection,
int64 count,
DestReceiver *dest);
......
......@@ -166,6 +166,8 @@ typedef enum FaultInjectorIdentifier_e {
WorkfileWriteFail,
WorkfileHashJoinFailure,
ExecutorRunHighProcessed,
UpdateCommittedEofInPersistentTable,
MultiExecHashLargeVmem,
......
......@@ -169,15 +169,14 @@ typedef struct PortalData
* atStart, atEnd and portalPos indicate the current cursor position.
* portalPos is zero before the first row, N after fetching N'th row of
* query. After we run off the end, portalPos = # of rows in query, and
* atEnd is true. If portalPos overflows, set posOverflow (this causes us
* to stop relying on its value for navigation). Note that atStart
* implies portalPos == 0, but not the reverse (portalPos could have
* overflowed).
* atEnd is true. Note that atStart implies portalPos == 0, but not the
* reverse: we might have backed up only as far as the first row, not to
* the start. Also note that various code inspects atStart and atEnd, but
* only the portal movement routines should touch portalPos.
*/
bool atStart;
bool atEnd;
bool posOverflow;
int64 portalPos;
uint64 portalPos;
/* Presentation data, primarily used by the pg_cursors system view */
TimestampTz creation_time; /* time at which this portal was defined */
......
......@@ -167,7 +167,7 @@ static Datum exec_eval_expr(PLpgSQL_execstate *estate,
bool *isNull,
Oid *rettype);
static int exec_run_select(PLpgSQL_execstate *estate,
PLpgSQL_expr *expr, long maxtuples, Portal *portalP);
PLpgSQL_expr *expr, int64 maxtuples, Portal *portalP);
static int exec_for_query(PLpgSQL_execstate *estate, PLpgSQL_stmt_forq *stmt,
Portal portal, bool prefetch_ok);
static void eval_expr_params(PLpgSQL_execstate *estate,
......@@ -2741,7 +2741,7 @@ exec_stmt_execsql(PLpgSQL_execstate *estate,
{
Datum *values;
char *nulls;
long tcount;
int64 tcount;
int rc;
PLpgSQL_expr *expr = stmt->sqlstmt;
......@@ -2870,14 +2870,14 @@ exec_stmt_execsql(PLpgSQL_execstate *estate,
}
/* All variants should save result info for GET DIAGNOSTICS */
estate->eval_processed = SPI_processed64;
estate->eval_processed = SPI_processed;
estate->eval_lastoid = SPI_lastoid;
/* Process INTO if present */
if (stmt->into)
{
SPITupleTable *tuptab = SPI_tuptable;
uint32 n = SPI_processed;
uint64 n = SPI_processed;
PLpgSQL_rec *rec = NULL;
PLpgSQL_row *row = NULL;
......@@ -3060,7 +3060,7 @@ exec_stmt_dynexecute(PLpgSQL_execstate *estate,
if (stmt->into)
{
SPITupleTable *tuptab = SPI_tuptable;
uint32 n = SPI_processed;
uint64 n = SPI_processed;
PLpgSQL_rec *rec = NULL;
PLpgSQL_row *row = NULL;
......@@ -4205,7 +4205,7 @@ exec_eval_expr(PLpgSQL_execstate *estate,
*/
static int
exec_run_select(PLpgSQL_execstate *estate,
PLpgSQL_expr *expr, long maxtuples, Portal *portalP)
PLpgSQL_expr *expr, int64 maxtuples, Portal *portalP)
{
Datum *values;
char *nulls;
......
CREATE EXTENSION IF NOT EXISTS gp_inject_fault;
DROP TABLE IF EXISTS public.spi64bittest;
NOTICE: table "spi64bittest" does not exist, skipping
-- use a sequence as primary key, so we can update the data later on
CREATE TABLE public.spi64bittest (id BIGSERIAL PRIMARY KEY, data BIGINT);
NOTICE: CREATE TABLE will create implicit sequence "spi64bittest_id_seq" for serial column "spi64bittest.id"
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "spi64bittest_pkey" for table "spi64bittest"
-- general test case first, user test case afterwards
-- Pretend that the INSERT below inserted more than 4 billion rows in a plpgsql function
--
-- Use type 'skip', because we don't want to throw an ERROR or worse. There
-- is special handling at the code that checks for this fault, to bump up
-- the row counter regardless of the fault type.
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
-- insert enough rows to trigger the fault injector
SELECT gp_inject_fault('executor_run_high_processed', 'skip', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
DO $$
declare
num_rows int8;
begin
INSERT INTO public.spi64bittest (data)
SELECT g
FROM generate_series(1, 30000) g;
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'Inserted % rows', num_rows;
end;
$$;
NOTICE: Inserted 4294987269 rows
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
SELECT COUNT(*) AS count
FROM public.spi64bittest;
count
-------
30000
(1 row)
-- update all rows, and trigger the fault injector
SELECT gp_inject_fault('executor_run_high_processed', 'skip', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
DO $$
declare
num_rows int8;
begin
UPDATE public.spi64bittest
SET data = data + 1;
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'Updated % rows', num_rows;
end;
$$;
NOTICE: Updated 4294987269 rows
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
SELECT COUNT(*) AS count
FROM public.spi64bittest;
count
-------
30000
(1 row)
-- delete all rows, and trigger the fault injector
SELECT gp_inject_fault('executor_run_high_processed', 'skip', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
DO $$
declare
num_rows int8;
begin
DELETE FROM public.spi64bittest;
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'Deleted % rows', num_rows;
end;
$$;
NOTICE: Deleted 4294987269 rows
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
SELECT COUNT(*) AS count
FROM public.spi64bittest;
count
-------
0
(1 row)
DROP TABLE public.spi64bittest;
-- user test case
-- create a function which executes SQL statements and processes the number of touched rows
CREATE OR REPLACE FUNCTION sql_exec_stmt(sql_stmt TEXT)
RETURNS BIGINT AS $$
DECLARE
num_rows BIGINT;
BEGIN
EXECUTE sql_stmt;
GET DIAGNOSTICS num_rows := ROW_COUNT;
RETURN (num_rows);
END
$$
LANGUAGE 'plpgsql' VOLATILE;
SELECT sql_exec_stmt('SELECT 1');
sql_exec_stmt
---------------
1
(1 row)
DROP TABLE IF EXISTS public.spi64bittest_2;
NOTICE: table "spi64bittest_2" does not exist, skipping
CREATE TABLE public.spi64bittest_2 (id BIGINT);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
-- insert some data
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT generate_series(1,5000)');
sql_exec_stmt
---------------
5000
(1 row)
-- activate fault injector
SELECT gp_inject_fault('executor_run_high_processed', 'skip', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
-- double the data
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT id FROM public.spi64bittest_2');
sql_exec_stmt
---------------
5000
(1 row)
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT id FROM public.spi64bittest_2');
sql_exec_stmt
---------------
10000
(1 row)
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT id FROM public.spi64bittest_2');
sql_exec_stmt
---------------
20000
(1 row)
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT id FROM public.spi64bittest_2');
sql_exec_stmt
---------------
12884901855
(1 row)
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
NOTICE: Success:
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
SELECT COUNT(*) AS count
FROM public.spi64bittest_2;
count
-------
80000
(1 row)
DROP TABLE public.spi64bittest_2;
......@@ -15,7 +15,7 @@
# hitting max_connections limit on segments.
#
test: gp_aggregates gp_metadata variadic_parameters default_parameters function_extensions spi gp_xml pgoptions shared_scan null
test: gp_aggregates gp_metadata variadic_parameters default_parameters function_extensions spi spi_processed64bit gp_xml pgoptions shared_scan null
test: leastsquares opr_sanity_gp decode_expr bitmapscan bitmapscan_ao case_gp limit_gp notin percentile join_gp union_gp gpcopy gp_create_table gp_create_view window_views
test: filter gpctas gpdist matrix toast sublink table_functions olap_setup complex opclass_ddl information_schema guc_env_var guc_gp gp_explain
......
CREATE EXTENSION IF NOT EXISTS gp_inject_fault;
DROP TABLE IF EXISTS public.spi64bittest;
-- use a sequence as primary key, so we can update the data later on
CREATE TABLE public.spi64bittest (id BIGSERIAL PRIMARY KEY, data BIGINT);
-- general test case first, user test case afterwards
-- Pretend that the INSERT below inserted more than 4 billion rows in a plpgsql function
--
-- Use type 'skip', because we don't want to throw an ERROR or worse. There
-- is special handling at the code that checks for this fault, to bump up
-- the row counter regardless of the fault type.
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
-- insert enough rows to trigger the fault injector
SELECT gp_inject_fault('executor_run_high_processed', 'skip', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
DO $$
declare
num_rows int8;
begin
INSERT INTO public.spi64bittest (data)
SELECT g
FROM generate_series(1, 30000) g;
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'Inserted % rows', num_rows;
end;
$$;
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
SELECT COUNT(*) AS count
FROM public.spi64bittest;
-- update all rows, and trigger the fault injector
SELECT gp_inject_fault('executor_run_high_processed', 'skip', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
DO $$
declare
num_rows int8;
begin
UPDATE public.spi64bittest
SET data = data + 1;
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'Updated % rows', num_rows;
end;
$$;
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
SELECT COUNT(*) AS count
FROM public.spi64bittest;
-- delete all rows, and trigger the fault injector
SELECT gp_inject_fault('executor_run_high_processed', 'skip', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
DO $$
declare
num_rows int8;
begin
DELETE FROM public.spi64bittest;
GET DIAGNOSTICS num_rows = ROW_COUNT;
RAISE NOTICE 'Deleted % rows', num_rows;
end;
$$;
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
SELECT COUNT(*) AS count
FROM public.spi64bittest;
DROP TABLE public.spi64bittest;
-- user test case
-- create a function which executes SQL statements and processes the number of touched rows
CREATE OR REPLACE FUNCTION sql_exec_stmt(sql_stmt TEXT)
RETURNS BIGINT AS $$
DECLARE
num_rows BIGINT;
BEGIN
EXECUTE sql_stmt;
GET DIAGNOSTICS num_rows := ROW_COUNT;
RETURN (num_rows);
END
$$
LANGUAGE 'plpgsql' VOLATILE;
SELECT sql_exec_stmt('SELECT 1');
DROP TABLE IF EXISTS public.spi64bittest_2;
CREATE TABLE public.spi64bittest_2 (id BIGINT);
-- insert some data
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT generate_series(1,5000)');
-- activate fault injector
SELECT gp_inject_fault('executor_run_high_processed', 'skip', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
-- double the data
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT id FROM public.spi64bittest_2');
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT id FROM public.spi64bittest_2');
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT id FROM public.spi64bittest_2');
SELECT sql_exec_stmt('INSERT INTO public.spi64bittest_2 (id) SELECT id FROM public.spi64bittest_2');
SELECT gp_inject_fault('executor_run_high_processed', 'reset', '', '', '', 0, 0, dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
SELECT COUNT(*) AS count
FROM public.spi64bittest_2;
DROP TABLE public.spi64bittest_2;