提交 ce22b327 编写于 作者: A Andreas Scherbaum 提交者: Andreas Scherbaum

SPI 64 bit changes for pl/Python (#4154)

SPI 64 bit changes for pl/Python

Includes fault injection tests
上级 0c0782fe
...@@ -126,6 +126,8 @@ typedef int Py_ssize_t; ...@@ -126,6 +126,8 @@ typedef int Py_ssize_t;
#define pyelog(...) #define pyelog(...)
#endif #endif
#include "utils/faultinjector.h"
PG_MODULE_MAGIC; PG_MODULE_MAGIC;
/* convert Postgresql Datum or tuple into a PyObject. /* convert Postgresql Datum or tuple into a PyObject.
...@@ -3390,9 +3392,9 @@ static int PLy_result_ass_slice(PyObject *, Py_ssize_t, Py_ssize_t, PyObject *); ...@@ -3390,9 +3392,9 @@ static int PLy_result_ass_slice(PyObject *, Py_ssize_t, Py_ssize_t, PyObject *);
static PyObject *PLy_spi_prepare(PyObject *, PyObject *); static PyObject *PLy_spi_prepare(PyObject *, PyObject *);
static PyObject *PLy_spi_execute(PyObject *, PyObject *); static PyObject *PLy_spi_execute(PyObject *, PyObject *);
static PyObject *PLy_spi_execute_query(char *query, long limit); static PyObject *PLy_spi_execute_query(char *query, int64 limit);
static PyObject *PLy_spi_execute_plan(PyObject *, PyObject *, long); static PyObject *PLy_spi_execute_plan(PyObject *, PyObject *, int64);
static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *, int, int); static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *, int64, int);
static PyObject *PLy_quote_literal(PyObject *self, PyObject *args); static PyObject *PLy_quote_literal(PyObject *self, PyObject *args);
static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args); static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args);
...@@ -3951,7 +3953,7 @@ PLy_spi_execute(PyObject *self __attribute__((unused)), PyObject *args) ...@@ -3951,7 +3953,7 @@ PLy_spi_execute(PyObject *self __attribute__((unused)), PyObject *args)
PLy_enter_python_intepreter = false; PLy_enter_python_intepreter = false;
if (PyArg_ParseTuple(args, "s|l", &query, &limit)) if (PyArg_ParseTuple(args, "s|l", &query, &limit))
{ {
ret = PLy_spi_execute_query(query, limit); ret = PLy_spi_execute_query(query, (int64) limit);
PLy_enter_python_intepreter = true; PLy_enter_python_intepreter = true;
return ret; return ret;
} }
...@@ -3960,7 +3962,7 @@ PLy_spi_execute(PyObject *self __attribute__((unused)), PyObject *args) ...@@ -3960,7 +3962,7 @@ PLy_spi_execute(PyObject *self __attribute__((unused)), PyObject *args)
if (PyArg_ParseTuple(args, "O|Ol", &plan, &list, &limit) && if (PyArg_ParseTuple(args, "O|Ol", &plan, &list, &limit) &&
is_PLyPlanObject(plan)) is_PLyPlanObject(plan))
{ {
ret = PLy_spi_execute_plan(plan, list, limit); ret = PLy_spi_execute_plan(plan, list, (int64) limit);
PLy_enter_python_intepreter = true; PLy_enter_python_intepreter = true;
return ret; return ret;
} }
...@@ -3971,7 +3973,7 @@ PLy_spi_execute(PyObject *self __attribute__((unused)), PyObject *args) ...@@ -3971,7 +3973,7 @@ PLy_spi_execute(PyObject *self __attribute__((unused)), PyObject *args)
} }
static PyObject * static PyObject *
PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) PLy_spi_execute_plan(PyObject *ob, PyObject *list, int64 limit)
{ {
volatile int nargs; volatile int nargs;
int i, int i,
...@@ -4155,7 +4157,7 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) ...@@ -4155,7 +4157,7 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
} }
static PyObject * static PyObject *
PLy_spi_execute_query(char *query, long limit) PLy_spi_execute_query(char *query, int64 limit)
{ {
int rv; int rv;
volatile MemoryContext oldcontext; volatile MemoryContext oldcontext;
...@@ -4233,11 +4235,30 @@ PLy_spi_execute_query(char *query, long limit) ...@@ -4233,11 +4235,30 @@ PLy_spi_execute_query(char *query, long limit)
} }
static PyObject * static PyObject *
PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status) PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int64 rows, int status)
{ {
PLyResultObject *result; PLyResultObject *result;
volatile MemoryContext oldcontext; volatile MemoryContext oldcontext;
#ifdef FAULT_INJECTOR
if (rows >= 10000 && rows <= 1000000)
{
if (FaultInjector_InjectFaultIfSet(ExecutorRunHighProcessed,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */) == FaultInjectorTypeSkip)
{
/*
* For testing purposes, pretend that we have already processed
* almost 2^32 rows.
*/
rows = UINT_MAX - 10;
}
}
#endif /* FAULT_INJECTOR */
result = (PLyResultObject *) PLy_result_new(); result = (PLyResultObject *) PLy_result_new();
Py_DECREF(result->status); Py_DECREF(result->status);
result->status = PyInt_FromLong(status); result->status = PyInt_FromLong(status);
...@@ -4245,15 +4266,17 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status) ...@@ -4245,15 +4266,17 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
if (status > 0 && tuptable == NULL) if (status > 0 && tuptable == NULL)
{ {
Py_DECREF(result->nrows); Py_DECREF(result->nrows);
result->nrows = PyInt_FromLong(rows); /* rows is 64 bit, Python Integer holds sys.maxint = 2^63 - 1 */
result->nrows = PyInt_FromLong((long) rows);
} }
else if (status > 0 && tuptable != NULL) else if (status > 0 && tuptable != NULL)
{ {
PLyTypeInfo args; PLyTypeInfo args;
int i; int64 i;
Py_DECREF(result->nrows); Py_DECREF(result->nrows);
result->nrows = PyInt_FromLong(rows); /* rows is 64 bit, Python Integer holds sys.maxint = 2^63 - 1 */
result->nrows = PyInt_FromLong((long) rows);
PLy_typeinfo_init(&args); PLy_typeinfo_init(&args);
oldcontext = CurrentMemoryContext; oldcontext = CurrentMemoryContext;
...@@ -4262,7 +4285,7 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status) ...@@ -4262,7 +4285,7 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
if (rows) if (rows)
{ {
Py_DECREF(result->rows); Py_DECREF(result->rows);
result->rows = PyList_New(rows); result->rows = PyList_New((Py_ssize_t) rows);
PLy_input_tuple_funcs(&args, tuptable->tupdesc); PLy_input_tuple_funcs(&args, tuptable->tupdesc);
...@@ -4272,7 +4295,7 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status) ...@@ -4272,7 +4295,7 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
PyObject *row = PLyDict_FromTuple(&args, tuptable->vals[i], PyObject *row = PLyDict_FromTuple(&args, tuptable->vals[i],
tuptable->tupdesc); tuptable->tupdesc);
PyList_SetItem(result->rows, i, row); PyList_SetItem(result->rows, (Py_ssize_t) i, row);
} }
} }
} }
......
--start_ignore
CREATE EXTENSION IF NOT EXISTS gp_inject_fault;
--end_ignore
--start_ignore
CREATE LANGUAGE plpythonu;
--end_ignore
DROP FUNCTION IF EXISTS public.test_bigint_python();
NOTICE: function public.test_bigint_python() does not exist, skipping
DROP TABLE IF EXISTS public.spi64bittestpython;
NOTICE: table "spi64bittestpython" does not exist, skipping
CREATE TABLE public.spi64bittestpython (id BIGSERIAL PRIMARY KEY, data BIGINT);
NOTICE: CREATE TABLE will create implicit sequence "spi64bittestpython_id_seq" for serial column "spi64bittestpython.id"
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "spi64bittestpython_pkey" for table "spi64bittestpython"
CREATE FUNCTION public.test_bigint_python()
RETURNS BIGINT
AS $$
res = plpy.execute("INSERT INTO public.spi64bittestpython (data) SELECT g FROM generate_series(1, 30000) g")
return res.nrows()
$$ LANGUAGE plpythonu;
-- insert 30k rows without fault injection framework
SELECT public.test_bigint_python();
test_bigint_python
--------------------
30000
(1 row)
SELECT COUNT(*) AS count
FROM public.spi64bittestpython;
count
-------
30000
(1 row)
-- activate fault injection framework
SELECT gp_inject_fault('executor_run_high_processed', 'skip', dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success: (seg0 192.168.0.65:25432 pid=121194)
NOTICE: Success: (seg1 192.168.0.65:25433 pid=121196)
NOTICE: Success: (seg2 192.168.0.65:25434 pid=121195)
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
-- and insert another 30k rows, this time overflowing the 2^32 counter
SELECT public.test_bigint_python();
test_bigint_python
--------------------
4294987275
(1 row)
-- reset fault injection framework
SELECT gp_inject_fault('executor_run_high_processed', 'reset', dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
NOTICE: Success:
NOTICE: Success: (seg0 192.168.0.65:25432 pid=121194)
NOTICE: Success: (seg1 192.168.0.65:25433 pid=121196)
NOTICE: Success: (seg2 192.168.0.65:25434 pid=121195)
gp_inject_fault
-----------------
t
t
t
t
(4 rows)
SELECT COUNT(*) AS count
FROM public.spi64bittestpython;
count
-------
60000
(1 row)
-- drop test table
DROP TABLE public.spi64bittestpython;
...@@ -22,6 +22,7 @@ test: instr_in_shmem ...@@ -22,6 +22,7 @@ test: instr_in_shmem
test: gp_tablespace gp_aggregates gp_metadata variadic_parameters default_parameters function_extensions spi gp_xml pgoptions shared_scan update_gp test: gp_tablespace gp_aggregates gp_metadata variadic_parameters default_parameters function_extensions spi gp_xml pgoptions shared_scan update_gp
test: spi_processed64bit test: spi_processed64bit
test: python_processed64bit
test: leastsquares opr_sanity_gp decode_expr bitmapscan bitmapscan_ao case_gp limit_gp notin percentile join_gp union_gp gpcopy gp_create_table gp_create_view window_views test: leastsquares opr_sanity_gp decode_expr bitmapscan bitmapscan_ao case_gp limit_gp notin percentile join_gp union_gp gpcopy gp_create_table gp_create_view window_views
test: filter gpctas gpdist matrix toast sublink table_functions olap_setup complex opclass_ddl information_schema guc_env_var guc_gp gp_explain test: filter gpctas gpdist matrix toast sublink table_functions olap_setup complex opclass_ddl information_schema guc_env_var guc_gp gp_explain
......
--start_ignore
CREATE EXTENSION IF NOT EXISTS gp_inject_fault;
--end_ignore
--start_ignore
CREATE LANGUAGE plpythonu;
--end_ignore
DROP FUNCTION IF EXISTS public.test_bigint_python();
DROP TABLE IF EXISTS public.spi64bittestpython;
CREATE TABLE public.spi64bittestpython (id BIGSERIAL PRIMARY KEY, data BIGINT);
CREATE FUNCTION public.test_bigint_python()
RETURNS BIGINT
AS $$
res = plpy.execute("INSERT INTO public.spi64bittestpython (data) SELECT g FROM generate_series(1, 30000) g")
return res.nrows()
$$ LANGUAGE plpythonu;
-- insert 30k rows without fault injection framework
SELECT public.test_bigint_python();
SELECT COUNT(*) AS count
FROM public.spi64bittestpython;
-- activate fault injection framework
SELECT gp_inject_fault('executor_run_high_processed', 'skip', dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
-- and insert another 30k rows, this time overflowing the 2^32 counter
SELECT public.test_bigint_python();
-- reset fault injection framework
SELECT gp_inject_fault('executor_run_high_processed', 'reset', dbid)
FROM pg_catalog.gp_segment_configuration
WHERE role = 'p';
SELECT COUNT(*) AS count
FROM public.spi64bittestpython;
-- drop test table
DROP TABLE public.spi64bittestpython;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册