提交 2b5267fe 编写于 作者: H Heikki Linnakangas

Refactor reshuffle tests, using gp_execute_on_segment() helper function.

This enhances gp_execute_on_segment(), so that it can be used to execute
queries that return a result, and the result is passed back to the caller.
With that, we can avoid the pl/python hack to connect to segments in
utility mode, in reshuffle_ao test.

I imagine that this would be useful for many other tests too, but I
haven't yet searched around for more use cases.
Reviewed-by: NAsim R P <apraveen@pivotal.io>
Reviewed-by: NZhenghua Lyu <zlv@pivotal.io>
上级 990941ff
......@@ -108,7 +108,8 @@ static DispatchCommandQueryParms *cdbdisp_buildUtilityQueryParms(struct Node *st
static DispatchCommandQueryParms *cdbdisp_buildCommandQueryParms(const char *strCommand, int flags);
static void cdbdisp_dispatchCommandInternal(DispatchCommandQueryParms *pQueryParms,
int flags, CdbPgResults *cdb_pgresults);
int flags, List *segments,
CdbPgResults *cdb_pgresults);
static void
cdbdisp_dispatchX(QueryDesc *queryDesc,
......@@ -311,12 +312,27 @@ CdbDispatchSetCommand(const char *strCommand, bool cancelOnError)
*
* -flags:
* Is the combination of DF_NEED_TWO_PHASE, DF_WITH_SNAPSHOT,DF_CANCEL_ON_ERROR
*
*/
void
CdbDispatchCommand(const char *strCommand,
int flags,
CdbPgResults *cdb_pgresults)
{
return CdbDispatchCommandToSegments(strCommand,
flags,
cdbcomponent_getCdbComponentsList(),
cdb_pgresults);
}
/*
* Like CdbDispatchCommand, but sends the command only to the
* specified segments.
*/
void
CdbDispatchCommandToSegments(const char *strCommand,
int flags,
List *segments,
CdbPgResults *cdb_pgresults)
{
DispatchCommandQueryParms *pQueryParms;
bool needTwoPhase = flags & DF_NEED_TWO_PHASE;
......@@ -332,7 +348,10 @@ CdbDispatchCommand(const char *strCommand,
pQueryParms = cdbdisp_buildCommandQueryParms(strCommand, flags);
return cdbdisp_dispatchCommandInternal(pQueryParms, flags, cdb_pgresults);
return cdbdisp_dispatchCommandInternal(pQueryParms,
flags,
segments,
cdb_pgresults);
}
/*
......@@ -369,12 +388,16 @@ CdbDispatchUtilityStatement(struct Node *stmt,
pQueryParms = cdbdisp_buildUtilityQueryParms(stmt, flags, oid_assignments);
return cdbdisp_dispatchCommandInternal(pQueryParms, flags, cdb_pgresults);
return cdbdisp_dispatchCommandInternal(pQueryParms,
flags,
cdbcomponent_getCdbComponentsList(),
cdb_pgresults);
}
static void
cdbdisp_dispatchCommandInternal(DispatchCommandQueryParms *pQueryParms,
int flags,
List *segments,
CdbPgResults *cdb_pgresults)
{
CdbDispatcherState *ds;
......@@ -394,8 +417,7 @@ cdbdisp_dispatchCommandInternal(DispatchCommandQueryParms *pQueryParms,
/*
* Allocate a primary QE for every available segDB in the system.
*/
primaryGang = AllocateGang(ds, GANGTYPE_PRIMARY_WRITER,
cdbcomponent_getCdbComponentsList());
primaryGang = AllocateGang(ds, GANGTYPE_PRIMARY_WRITER, segments);
Assert(primaryGang);
cdbdisp_makeDispatchResults(ds, 1, flags & DF_CANCEL_ON_ERROR);
......
......@@ -84,6 +84,12 @@ CdbDispatchCommand(const char* strCommand,
int flags,
struct CdbPgResults* cdb_pgresults);
void
CdbDispatchCommandToSegments(const char* strCommand,
int flags,
List *segments,
struct CdbPgResults* cdb_pgresults);
/*
* CdbDispatchUtilityStatement
*
......
......@@ -388,60 +388,11 @@ select numsegments from gp_distribution_policy where localoid='r1_reshuffle'::re
drop table r1_reshuffle;
-- Replicated tables
-- We have to make sure replicated table successfully reshuffled.
-- Currently we could only connect to the specific segments in utility mode
-- to see if the data is consistent there. We create some python function here.
-- The case can only work under the assumption: it's running on 3-segment cluster
-- in a single machine.
-- start_ignore
create language plpythonu;
-- end_ignore
create function update_on_segment(tabname text, segid int, numseg int) returns boolean as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
conn.query("set allow_system_table_mods = true")
conn.query("update gp_distribution_policy set numsegments = %d where localoid = '%s'::regclass" % (numseg, tabname))
conn.close()
return True
$$
LANGUAGE plpythonu;
create function select_on_segment(sql text, segid int) returns int as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
r = conn.query(sql).getresult()
conn.close()
return r[0][0]
$$
LANGUAGE plpythonu;
Create table r1_reshuffle(a int, b int, c int) distributed replicated;
update gp_distribution_policy set numsegments=1 where localoid='r1_reshuffle'::regclass;
select update_on_segment('r1_reshuffle', 0, 1);
update_on_segment
-------------------
t
(1 row)
select update_on_segment('r1_reshuffle', 1, 1);
update_on_segment
-------------------
t
(1 row)
select update_on_segment('r1_reshuffle', 2, 1);
update_on_segment
-------------------
t
select update_numsegments_in_policy('r1_reshuffle', 1);
update_numsegments_in_policy
------------------------------
(1 row)
insert into r1_reshuffle select i,i,0 from generate_series(1,100) I;
......@@ -451,16 +402,16 @@ Select count(*) from r1_reshuffle;
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
select_on_segment
-------------------
0
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
0
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
select_on_segment
-------------------
0
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
0
(1 row)
begin;
......@@ -478,16 +429,16 @@ Select count(*) from r1_reshuffle;
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
select_on_segment
-------------------
0
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
0
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
select_on_segment
-------------------
0
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
0
(1 row)
Alter table r1_reshuffle set with (reshuffle);
......@@ -497,16 +448,16 @@ Select count(*) from r1_reshuffle;
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
select_on_segment
-------------------
100
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
select_on_segment
-------------------
100
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
100
(1 row)
select numsegments from gp_distribution_policy where localoid='r1_reshuffle'::regclass;
......@@ -518,23 +469,10 @@ select numsegments from gp_distribution_policy where localoid='r1_reshuffle'::re
drop table r1_reshuffle;
Create table r1_reshuffle(a int, b int, c int) with OIDS distributed replicated;
NOTICE: OIDS=TRUE is not recommended for user-created tables. Use OIDS=FALSE to prevent wrap-around of the OID counter
update gp_distribution_policy set numsegments=2 where localoid='r1_reshuffle'::regclass;
select update_on_segment('r1_reshuffle', 0, 2);
update_on_segment
-------------------
t
(1 row)
select update_on_segment('r1_reshuffle', 1, 2);
update_on_segment
-------------------
t
(1 row)
select update_on_segment('r1_reshuffle', 2, 2);
update_on_segment
-------------------
t
select update_numsegments_in_policy('r1_reshuffle', 2);
update_numsegments_in_policy
------------------------------
(1 row)
insert into r1_reshuffle select i,i,0 from generate_series(1,100) I;
......@@ -544,16 +482,16 @@ Select count(*) from r1_reshuffle;
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
select_on_segment
-------------------
100
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
select_on_segment
-------------------
0
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
0
(1 row)
begin;
......@@ -571,16 +509,16 @@ Select count(*) from r1_reshuffle;
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
select_on_segment
-------------------
100
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
select_on_segment
-------------------
0
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
0
(1 row)
Alter table r1_reshuffle set with (reshuffle);
......@@ -590,16 +528,16 @@ Select count(*) from r1_reshuffle;
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
select_on_segment
-------------------
100
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
100
(1 row)
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
select_on_segment
-------------------
100
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
gp_execute_on_server
----------------------
100
(1 row)
select numsegments from gp_distribution_policy where localoid='r1_reshuffle'::regclass;
......
......@@ -388,61 +388,11 @@ select numsegments from gp_distribution_policy where localoid='r1_reshuffle_ao':
drop table r1_reshuffle_ao;
-- Replicated tables
-- We have to make sure replicated table successfully reshuffled.
-- Currently we could only connect to the specific segments in utility mode
-- to see if the data is consistent there. We create some python function here.
-- The case can only work under the assumption: it's running on 3-segment cluster
-- in a single machine.
-- start_ignore
create language plpythonu;
ERROR: language "plpythonu" already exists
-- end_ignore
create function update_on_segment_ao(tabname text, segid int, numseg int) returns boolean as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
conn.query("set allow_system_table_mods = true")
conn.query("update gp_distribution_policy set numsegments = %d where localoid = '%s'::regclass" % (numseg, tabname))
conn.close()
return True
$$
LANGUAGE plpythonu;
create function select_on_segment_ao(sql text, segid int) returns int as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
r = conn.query(sql).getresult()
conn.close()
return r[0][0]
$$
LANGUAGE plpythonu;
Create table r1_reshuffle_ao(a int, b int, c int) with (appendonly = true) distributed replicated;
update gp_distribution_policy set numsegments=1 where localoid='r1_reshuffle_ao'::regclass;
select update_on_segment_ao('r1_reshuffle_ao', 0, 1);
update_on_segment_ao
----------------------
t
(1 row)
select update_on_segment_ao('r1_reshuffle_ao', 1, 1);
update_on_segment_ao
----------------------
t
(1 row)
select update_on_segment_ao('r1_reshuffle_ao', 2, 1);
update_on_segment_ao
----------------------
t
select update_numsegments_in_policy('r1_reshuffle_ao', 1);
update_numsegments_in_policy
------------------------------
(1 row)
insert into r1_reshuffle_ao select i,i,0 from generate_series(1,100) I;
......@@ -452,16 +402,16 @@ Select count(*) from r1_reshuffle_ao;
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
select_on_segment_ao
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
0
0
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
select_on_segment_ao
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
0
0
(1 row)
begin;
......@@ -479,16 +429,16 @@ Select count(*) from r1_reshuffle_ao;
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
select_on_segment_ao
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
0
0
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
select_on_segment_ao
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
0
0
(1 row)
Alter table r1_reshuffle_ao set with (reshuffle);
......@@ -498,16 +448,16 @@ Select count(*) from r1_reshuffle_ao;
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
select_on_segment_ao
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
100
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
select_on_segment_ao
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
100
100
(1 row)
select numsegments from gp_distribution_policy where localoid='r1_reshuffle_ao'::regclass;
......@@ -519,23 +469,10 @@ select numsegments from gp_distribution_policy where localoid='r1_reshuffle_ao':
drop table r1_reshuffle_ao;
Create table r1_reshuffle_ao(a int, b int, c int) with (appendonly = true, OIDS = true) distributed replicated;
NOTICE: OIDS=TRUE is not recommended for user-created tables. Use OIDS=FALSE to prevent wrap-around of the OID counter
update gp_distribution_policy set numsegments=2 where localoid='r1_reshuffle_ao'::regclass;
select update_on_segment_ao('r1_reshuffle_ao', 0, 2);
update_on_segment_ao
----------------------
t
(1 row)
select update_on_segment_ao('r1_reshuffle_ao', 1, 2);
update_on_segment_ao
----------------------
t
(1 row)
select update_on_segment_ao('r1_reshuffle_ao', 2, 2);
update_on_segment_ao
----------------------
t
select update_numsegments_in_policy('r1_reshuffle_ao', 2);
update_numsegments_in_policy
------------------------------
(1 row)
insert into r1_reshuffle_ao select i,i,0 from generate_series(1,100) I;
......@@ -545,16 +482,16 @@ Select count(*) from r1_reshuffle_ao;
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
select_on_segment_ao
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
100
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
select_on_segment_ao
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
0
0
(1 row)
begin;
......@@ -572,16 +509,16 @@ Select count(*) from r1_reshuffle_ao;
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
select_on_segment_ao
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
100
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
select_on_segment_ao
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
0
0
(1 row)
Alter table r1_reshuffle_ao set with (reshuffle);
......@@ -591,16 +528,16 @@ Select count(*) from r1_reshuffle_ao;
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
select_on_segment_ao
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
100
100
(1 row)
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
select_on_segment_ao
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
gp_execute_on_server
----------------------
100
100
(1 row)
select numsegments from gp_distribution_policy where localoid='r1_reshuffle_ao'::regclass;
......
......@@ -280,61 +280,11 @@ select numsegments from gp_distribution_policy where localoid='r1_reshuffle_aoco
drop table r1_reshuffle_aoco;
-- Replicated tables
-- We have to make sure replicated table successfully reshuffled.
-- Currently we could only connect to the specific segments in utility mode
-- to see if the data is consistent there. We create some python function here.
-- The case can only work under the assumption: it's running on 3-segment cluster
-- in a single machine.
-- start_ignore
create language plpythonu;
ERROR: language "plpythonu" already exists
-- end_ignore
create function update_on_segment_aoco(tabname text, segid int, numseg int) returns boolean as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
conn.query("set allow_system_table_mods = true")
conn.query("update gp_distribution_policy set numsegments = %d where localoid = '%s'::regclass" % (numseg, tabname))
conn.close()
return True
$$
LANGUAGE plpythonu;
create function select_on_segment_aoco(sql text, segid int) returns int as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
r = conn.query(sql).getresult()
conn.close()
return r[0][0]
$$
LANGUAGE plpythonu;
Create table r1_reshuffle_aoco(a int, b int, c int) with (appendonly = true, orientation = column) distributed replicated;
update gp_distribution_policy set numsegments=1 where localoid='r1_reshuffle_aoco'::regclass;
select update_on_segment_aoco('r1_reshuffle_aoco', 0, 1);
update_on_segment_aoco
------------------------
t
(1 row)
select update_on_segment_aoco('r1_reshuffle_aoco', 1, 1);
update_on_segment_aoco
------------------------
t
(1 row)
select update_on_segment_aoco('r1_reshuffle_aoco', 2, 1);
update_on_segment_aoco
------------------------
t
select update_numsegments_in_policy('r1_reshuffle_aoco', 1);
update_numsegments_in_policy
------------------------------
(1 row)
insert into r1_reshuffle_aoco select i,i,0 from generate_series(1,100) I;
......@@ -344,16 +294,16 @@ Select count(*) from r1_reshuffle_aoco;
100
(1 row)
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 1);
select_on_segment_aoco
------------------------
0
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_aoco;');
gp_execute_on_server
----------------------
0
(1 row)
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 2);
select_on_segment_aoco
------------------------
0
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_aoco;');
gp_execute_on_server
----------------------
0
(1 row)
begin;
......@@ -371,16 +321,16 @@ Select count(*) from r1_reshuffle_aoco;
100
(1 row)
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 1);
select_on_segment_aoco
------------------------
0
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_aoco;');
gp_execute_on_server
----------------------
0
(1 row)
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 2);
select_on_segment_aoco
------------------------
0
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_aoco;');
gp_execute_on_server
----------------------
0
(1 row)
Alter table r1_reshuffle_aoco set with (reshuffle);
......@@ -390,16 +340,16 @@ Select count(*) from r1_reshuffle_aoco;
100
(1 row)
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 1);
select_on_segment_aoco
------------------------
100
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_aoco;');
gp_execute_on_server
----------------------
100
(1 row)
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 2);
select_on_segment_aoco
------------------------
100
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_aoco;');
gp_execute_on_server
----------------------
100
(1 row)
select numsegments from gp_distribution_policy where localoid='r1_reshuffle_aoco'::regclass;
......
......@@ -15,6 +15,7 @@
# hitting max_connections limit on segments.
#
# test reshuffle
test: reshuffle_helper
test: reshuffle reshuffle_ao reshuffle_aoco
# enable query metrics cluster GUC
......
CREATE OR REPLACE FUNCTION gp_execute_on_server(content int, query text) returns text
language C as '@abs_builddir@/regress@DLSUFFIX@', 'gp_execute_on_server';
-- A helper function, to forcibly update gp_distribution_policy.numsegments
-- for a table, in the QD as well as all QEs.
create function update_numsegments_in_policy(relid regclass, numseg int) returns void as
$$
declare
update_sql text;
begin
-- Update the entry in QD.
update_sql = 'update gp_distribution_policy set numsegments=' || numseg || ' where localoid=' || relid::oid;
execute update_sql;
-- And the same in each QE.
-- XXX: This works under the assumption that this is a 3-segment cluster
perform gp_execute_on_server(0, update_sql);
perform gp_execute_on_server(1, update_sql);
perform gp_execute_on_server(2, update_sql);
end;
$$
LANGUAGE plpgsql;
set log_min_messages to ERROR;
CREATE FUNCTION gp_execute_on_server(content smallint, query cstring) returns boolean
CREATE OR REPLACE FUNCTION gp_execute_on_server(content int, query text) returns text
language C as '@abs_builddir@/regress@DLSUFFIX@', 'gp_execute_on_server';
-- terminate backend process for this session on segment with content ID = 0
select gp_execute_on_server(0::smallint, 'select pg_terminate_backend(pg_backend_pid())'::cstring);
select gp_execute_on_server(0, 'select pg_terminate_backend(pg_backend_pid())');
-- session is reset and gp_session_id value changes
-- sess_id in pg_stat_activity should be consistent with gp_session_id
......
CREATE OR REPLACE FUNCTION gp_execute_on_server(content int, query text) returns text
language C as '@abs_builddir@/regress@DLSUFFIX@', 'gp_execute_on_server';
-- A helper function, to forcibly update gp_distribution_policy.numsegments
-- for a table, in the QD as well as all QEs.
create function update_numsegments_in_policy(relid regclass, numseg int) returns void as
$$
declare
update_sql text;
begin
-- Update the entry in QD.
update_sql = 'update gp_distribution_policy set numsegments=' || numseg || ' where localoid=' || relid::oid;
execute update_sql;
-- And the same in each QE.
-- XXX: This works under the assumption that this is a 3-segment cluster
perform gp_execute_on_server(0, update_sql);
perform gp_execute_on_server(1, update_sql);
perform gp_execute_on_server(2, update_sql);
end;
$$
LANGUAGE plpgsql;
set log_min_messages to ERROR;
CREATE FUNCTION gp_execute_on_server(content smallint, query cstring) returns boolean
CREATE OR REPLACE FUNCTION gp_execute_on_server(content int, query text) returns text
language C as '@abs_builddir@/regress@DLSUFFIX@', 'gp_execute_on_server';
-- terminate backend process for this session on segment with content ID = 0
select gp_execute_on_server(0::smallint, 'select pg_terminate_backend(pg_backend_pid())'::cstring);
select gp_execute_on_server(0, 'select pg_terminate_backend(pg_backend_pid())');
ERROR: terminating connection due to administrator command (seg0 10.152.10.149:25432 pid=32665)
-- session is reset and gp_session_id value changes
-- sess_id in pg_stat_activity should be consistent with gp_session_id
......
......@@ -1978,57 +1978,62 @@ test_consume_xids(PG_FUNCTION_ARGS)
/*
* Function to execute a DML/DDL command on segment with specified content id.
* Returns true on success or error on failure.
* To use:
*
* CREATE FUNCTION gp_execute_on_server(content int, query text) returns text
* language C as '$libdir/regress.so', 'gp_execute_on_server';
*/
PG_FUNCTION_INFO_V1(gp_execute_on_server);
Datum
gp_execute_on_server(PG_FUNCTION_ARGS)
{
int16 content = PG_GETARG_INT16(0);
char *query = PG_GETARG_CSTRING(1);
int ret;
int proc;
if (GpIdentity.segindex == content)
int32 content = PG_GETARG_INT32(0);
char *query = TextDatumGetCString(PG_GETARG_TEXT_PP(1));
CdbPgResults cdb_pgresults;
StringInfoData result_str;
if (!IS_QUERY_DISPATCHER())
elog(ERROR, "cannot use gp_execute_on_server() when not in QD mode");
CdbDispatchCommandToSegments(query,
DF_CANCEL_ON_ERROR | DF_WITH_SNAPSHOT,
list_make1_int(content),
&cdb_pgresults);
/*
* Collect the results.
*
* All the result fields are appended to a string, with minimal
* formatting. That's not very pretty, but is good enough for
* regression tests.
*/
initStringInfo(&result_str);
for (int resultno = 0; resultno < cdb_pgresults.numResults; resultno++)
{
if ((ret = SPI_connect()) < 0)
/* internal error */
elog(ERROR, "SPI_connect returned %d", ret);
struct pg_result *pgresult = cdb_pgresults.pg_results[resultno];
/* Retrieve the desired rows */
ret = SPI_execute(query, false, 0);
proc = SPI_processed;
if (ret != SPI_OK_SELECT || proc <= 0)
if (PQresultStatus(pgresult) != PGRES_TUPLES_OK &&
PQresultStatus(pgresult) != PGRES_COMMAND_OK)
{
SPI_finish();
elog(ERROR, "SPI failed on segment %d, return code %d",
GpIdentity.segindex, ret);
cdbdisp_clearCdbPgResults(&cdb_pgresults);
elog(ERROR, "execution failed with status %d", PQresultStatus(pgresult));
}
SPI_finish();
PG_RETURN_BOOL(true);
}
else if (IS_QUERY_DISPATCHER())
{
proc = 0;
CdbPgResults cdb_pgresults;
int i;
CdbDispatchCommand(query, DF_CANCEL_ON_ERROR | DF_WITH_SNAPSHOT, &cdb_pgresults);
for (i = 0; i < cdb_pgresults.numResults; i++)
{
struct pg_result *pgresult = cdb_pgresults.pg_results[i];
if (PQresultStatus(pgresult) != PGRES_TUPLES_OK &&
PQresultStatus(pgresult) != PGRES_COMMAND_OK)
for (int rowno = 0; rowno < PQntuples(pgresult); rowno++)
{
if (rowno > 0)
appendStringInfoString(&result_str, "\n");
for (int colno = 0; colno < PQnfields(pgresult); colno++)
{
cdbdisp_clearCdbPgResults(&cdb_pgresults);
elog(ERROR, "execution failed with status %d", PQresultStatus(pgresult));
if (colno > 0)
appendStringInfoString(&result_str, " ");
appendStringInfoString(&result_str, PQgetvalue(pgresult, rowno, colno));
}
}
cdbdisp_clearCdbPgResults(&cdb_pgresults);
PG_RETURN_BOOL(true);
}
else
PG_RETURN_BOOL(true);
cdbdisp_clearCdbPgResults(&cdb_pgresults);
PG_RETURN_TEXT_P(CStringGetTextDatum(result_str.data));
}
/*
......
......@@ -175,54 +175,14 @@ drop table r1_reshuffle;
-- Replicated tables
-- We have to make sure replicated table successfully reshuffled.
-- Currently we could only connect to the specific segments in utility mode
-- to see if the data is consistent there. We create some python function here.
-- The case can only work under the assumption: it's running on 3-segment cluster
-- in a single machine.
-- start_ignore
create language plpythonu;
-- end_ignore
create function update_on_segment(tabname text, segid int, numseg int) returns boolean as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
conn.query("set allow_system_table_mods = true")
conn.query("update gp_distribution_policy set numsegments = %d where localoid = '%s'::regclass" % (numseg, tabname))
conn.close()
return True
$$
LANGUAGE plpythonu;
create function select_on_segment(sql text, segid int) returns int as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
r = conn.query(sql).getresult()
conn.close()
return r[0][0]
$$
LANGUAGE plpythonu;
Create table r1_reshuffle(a int, b int, c int) distributed replicated;
update gp_distribution_policy set numsegments=1 where localoid='r1_reshuffle'::regclass;
select update_on_segment('r1_reshuffle', 0, 1);
select update_on_segment('r1_reshuffle', 1, 1);
select update_on_segment('r1_reshuffle', 2, 1);
select update_numsegments_in_policy('r1_reshuffle', 1);
insert into r1_reshuffle select i,i,0 from generate_series(1,100) I;
Select count(*) from r1_reshuffle;
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
begin;
Alter table r1_reshuffle set with (reshuffle);
......@@ -230,28 +190,25 @@ Select count(*) from r1_reshuffle;
abort;
Select count(*) from r1_reshuffle;
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
Alter table r1_reshuffle set with (reshuffle);
Select count(*) from r1_reshuffle;
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
select numsegments from gp_distribution_policy where localoid='r1_reshuffle'::regclass;
drop table r1_reshuffle;
Create table r1_reshuffle(a int, b int, c int) with OIDS distributed replicated;
update gp_distribution_policy set numsegments=2 where localoid='r1_reshuffle'::regclass;
select update_on_segment('r1_reshuffle', 0, 2);
select update_on_segment('r1_reshuffle', 1, 2);
select update_on_segment('r1_reshuffle', 2, 2);
select update_numsegments_in_policy('r1_reshuffle', 2);
insert into r1_reshuffle select i,i,0 from generate_series(1,100) I;
Select count(*) from r1_reshuffle;
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
begin;
Alter table r1_reshuffle set with (reshuffle);
......@@ -259,14 +216,14 @@ Select count(*) from r1_reshuffle;
abort;
Select count(*) from r1_reshuffle;
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
Alter table r1_reshuffle set with (reshuffle);
Select count(*) from r1_reshuffle;
Select select_on_segment('Select count(*) from r1_reshuffle;', 1);
Select select_on_segment('Select count(*) from r1_reshuffle;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle;');
select numsegments from gp_distribution_policy where localoid='r1_reshuffle'::regclass;
drop table r1_reshuffle;
......
......@@ -175,54 +175,14 @@ drop table r1_reshuffle_ao;
-- Replicated tables
-- We have to make sure replicated table successfully reshuffled.
-- Currently we could only connect to the specific segments in utility mode
-- to see if the data is consistent there. We create some python function here.
-- The case can only work under the assumption: it's running on 3-segment cluster
-- in a single machine.
-- start_ignore
create language plpythonu;
-- end_ignore
create function update_on_segment_ao(tabname text, segid int, numseg int) returns boolean as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
conn.query("set allow_system_table_mods = true")
conn.query("update gp_distribution_policy set numsegments = %d where localoid = '%s'::regclass" % (numseg, tabname))
conn.close()
return True
$$
LANGUAGE plpythonu;
create function select_on_segment_ao(sql text, segid int) returns int as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
r = conn.query(sql).getresult()
conn.close()
return r[0][0]
$$
LANGUAGE plpythonu;
Create table r1_reshuffle_ao(a int, b int, c int) with (appendonly = true) distributed replicated;
update gp_distribution_policy set numsegments=1 where localoid='r1_reshuffle_ao'::regclass;
select update_on_segment_ao('r1_reshuffle_ao', 0, 1);
select update_on_segment_ao('r1_reshuffle_ao', 1, 1);
select update_on_segment_ao('r1_reshuffle_ao', 2, 1);
select update_numsegments_in_policy('r1_reshuffle_ao', 1);
insert into r1_reshuffle_ao select i,i,0 from generate_series(1,100) I;
Select count(*) from r1_reshuffle_ao;
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
begin;
Alter table r1_reshuffle_ao set with (reshuffle);
......@@ -230,28 +190,25 @@ Select count(*) from r1_reshuffle_ao;
abort;
Select count(*) from r1_reshuffle_ao;
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
Alter table r1_reshuffle_ao set with (reshuffle);
Select count(*) from r1_reshuffle_ao;
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
select numsegments from gp_distribution_policy where localoid='r1_reshuffle_ao'::regclass;
drop table r1_reshuffle_ao;
Create table r1_reshuffle_ao(a int, b int, c int) with (appendonly = true, OIDS = true) distributed replicated;
update gp_distribution_policy set numsegments=2 where localoid='r1_reshuffle_ao'::regclass;
select update_on_segment_ao('r1_reshuffle_ao', 0, 2);
select update_on_segment_ao('r1_reshuffle_ao', 1, 2);
select update_on_segment_ao('r1_reshuffle_ao', 2, 2);
select update_numsegments_in_policy('r1_reshuffle_ao', 2);
insert into r1_reshuffle_ao select i,i,0 from generate_series(1,100) I;
Select count(*) from r1_reshuffle_ao;
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
begin;
Alter table r1_reshuffle_ao set with (reshuffle);
......@@ -259,14 +216,14 @@ Select count(*) from r1_reshuffle_ao;
abort;
Select count(*) from r1_reshuffle_ao;
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
Alter table r1_reshuffle_ao set with (reshuffle);
Select count(*) from r1_reshuffle_ao;
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 1);
Select select_on_segment_ao('Select count(*) from r1_reshuffle_ao;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_ao;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_ao;');
select numsegments from gp_distribution_policy where localoid='r1_reshuffle_ao'::regclass;
drop table r1_reshuffle_ao;
......@@ -128,54 +128,14 @@ drop table r1_reshuffle_aoco;
-- Replicated tables
-- We have to make sure replicated table successfully reshuffled.
-- Currently we could only connect to the specific segments in utility mode
-- to see if the data is consistent there. We create some python function here.
-- The case can only work under the assumption: it's running on 3-segment cluster
-- in a single machine.
-- start_ignore
create language plpythonu;
-- end_ignore
create function update_on_segment_aoco(tabname text, segid int, numseg int) returns boolean as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
conn.query("set allow_system_table_mods = true")
conn.query("update gp_distribution_policy set numsegments = %d where localoid = '%s'::regclass" % (numseg, tabname))
conn.close()
return True
$$
LANGUAGE plpythonu;
create function select_on_segment_aoco(sql text, segid int) returns int as
$$
import pygresql.pg as pg
conn = pg.connect(dbname='regression')
port = conn.query("select port from gp_segment_configuration where content = %d and role = 'p'" % segid).getresult()[0][0]
conn.close()
conn = pg.connect(dbname='regression', opt='-c gp_session_role=utility', port=port)
r = conn.query(sql).getresult()
conn.close()
return r[0][0]
$$
LANGUAGE plpythonu;
Create table r1_reshuffle_aoco(a int, b int, c int) with (appendonly = true, orientation = column) distributed replicated;
update gp_distribution_policy set numsegments=1 where localoid='r1_reshuffle_aoco'::regclass;
select update_on_segment_aoco('r1_reshuffle_aoco', 0, 1);
select update_on_segment_aoco('r1_reshuffle_aoco', 1, 1);
select update_on_segment_aoco('r1_reshuffle_aoco', 2, 1);
select update_numsegments_in_policy('r1_reshuffle_aoco', 1);
insert into r1_reshuffle_aoco select i,i,0 from generate_series(1,100) I;
Select count(*) from r1_reshuffle_aoco;
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 1);
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_aoco;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_aoco;');
begin;
Alter table r1_reshuffle_aoco set with (reshuffle);
......@@ -183,14 +143,14 @@ Select count(*) from r1_reshuffle_aoco;
abort;
Select count(*) from r1_reshuffle_aoco;
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 1);
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_aoco;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_aoco;');
Alter table r1_reshuffle_aoco set with (reshuffle);
Select count(*) from r1_reshuffle_aoco;
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 1);
Select select_on_segment_aoco('Select count(*) from r1_reshuffle_aoco;', 2);
Select gp_execute_on_server(1, 'Select count(*) from r1_reshuffle_aoco;');
Select gp_execute_on_server(2, 'Select count(*) from r1_reshuffle_aoco;');
select numsegments from gp_distribution_policy where localoid='r1_reshuffle_aoco'::regclass;
drop table r1_reshuffle_aoco;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册