提交 4c4ce14c 编写于 作者: F Francisco Guerrero 提交者: Francisco Guerrero

Enable parallel writes for Foreign Data Wrappers

This commit enables parallel writes for Foreign Data Wrapper. This
feature is currently missing from the FDW framework, whilst parallel
scans are supported, parallel writes are missing. FDW parallel writes
are analogous to writing to writable external tables that run on all
segments.

One caveat is that in the external table framework, writable tables
support a distribution policy:

    CREATE WRITABLE EXTERNAL TABLE foo (id int)
    LOCATION ('....')
    FORMAT 'CSV'
    DISTRIBUTED BY (id);

In foreign tables, the distribution policy cannot be defined during the
table definition, so we assume random distribution for all foreign
tables.

Parallel writes are enabled when the foreign table's exec_location is
set to FTEXECLOCATION_ALL_SEGMENTS only. For foreign tables that run on
master or any segment, the current policy behavior remains.
上级 1bc2396b
...@@ -10,7 +10,7 @@ SHLIB_LINK_INTERNAL = $(libpq) ...@@ -10,7 +10,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
EXTENSION = postgres_fdw EXTENSION = postgres_fdw
DATA = postgres_fdw--1.0.sql DATA = postgres_fdw--1.0.sql
REGRESS = postgres_fdw REGRESS = postgres_fdw gp_postgres_fdw
ifdef USE_PGXS ifdef USE_PGXS
PG_CONFIG = pg_config PG_CONFIG = pg_config
......
此差异已折叠。
-- ===================================================================
-- Greenplum-specific features for postgres_fdw
-- ===================================================================
-- ===================================================================
-- Create source tables and populate with data
-- ===================================================================
CREATE TABLE table_dist_rand
(
f1 int,
f2 text,
f3 text
) DISTRIBUTED RANDOMLY;
CREATE TABLE table_dist_repl
(
f1 int,
f2 text,
f3 text
) DISTRIBUTED REPLICATED;
CREATE TABLE table_dist_int
(
f1 int,
f2 text,
f3 text
) DISTRIBUTED BY (f1);
CREATE TABLE table_dist_text
(
f1 int,
f2 text,
f3 text
) DISTRIBUTED BY (f2);
CREATE TABLE table_dist_int_text
(
f1 int,
f2 text,
f3 text
) DISTRIBUTED BY (f1, f2);
INSERT INTO table_dist_rand
VALUES (1, 'a', 'aa'),
(2, 'b', 'bb'),
(3, 'c', 'cc'),
(4, 'd', 'dd'),
(5, 'e', 'ee'),
(6, 'f', 'ff'),
(7, 'g', 'gg'),
(8, 'h', 'hh'),
(9, 'i', 'ii'),
(10, 'j', 'jj'),
(11, 'k', 'kk'),
(12, 'l', 'll');
INSERT INTO table_dist_repl SELECT * FROM table_dist_rand;
INSERT INTO table_dist_int SELECT * FROM table_dist_rand;
INSERT INTO table_dist_text SELECT * FROM table_dist_rand;
INSERT INTO table_dist_int_text SELECT * FROM table_dist_rand;
-- ===================================================================
-- create target table
-- ===================================================================
CREATE TABLE "S 1"."GP 1" (
f1 int,
f2 text,
f3 text
);
-- ===================================================================
-- create foreign tables
-- ===================================================================
CREATE FOREIGN TABLE gp_ft1 (
f1 int,
f2 text,
f3 text
) SERVER loopback OPTIONS (schema_name 'S 1', table_name 'GP 1', mpp_execute 'all segments');
-- ===================================================================
-- validate parallel writes (mpp_execute set to all segments)
-- ===================================================================
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_rand;
INSERT INTO gp_ft1 SELECT * FROM table_dist_rand;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_repl;
INSERT INTO gp_ft1 SELECT * FROM table_dist_repl;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_int;
INSERT INTO gp_ft1 SELECT * FROM table_dist_int;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_text;
INSERT INTO gp_ft1 SELECT * FROM table_dist_text;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_int_text;
INSERT INTO gp_ft1 SELECT * FROM table_dist_int_text;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1
SELECT id,
'AAA' || to_char(id, 'FM000'),
'BBB' || to_char(id, 'FM000')
FROM generate_series(1, 100) id;
INSERT INTO gp_ft1
SELECT id,
'AAA' || to_char(id, 'FM000'),
'BBB' || to_char(id, 'FM000')
FROM generate_series(1, 100) id;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
-- ===================================================================
-- validate writes on any segment (mpp_execute set to any)
-- ===================================================================
ALTER FOREIGN TABLE gp_ft1 OPTIONS ( SET mpp_execute 'any' );
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_rand;
INSERT INTO gp_ft1 SELECT * FROM table_dist_rand;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_repl;
INSERT INTO gp_ft1 SELECT * FROM table_dist_repl;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_int;
INSERT INTO gp_ft1 SELECT * FROM table_dist_int;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_text;
INSERT INTO gp_ft1 SELECT * FROM table_dist_text;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_int_text;
INSERT INTO gp_ft1 SELECT * FROM table_dist_int_text;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1
SELECT id,
'AAA' || to_char(id, 'FM000'),
'BBB' || to_char(id, 'FM000')
FROM generate_series(1, 100) id;
INSERT INTO gp_ft1
SELECT id,
'AAA' || to_char(id, 'FM000'),
'BBB' || to_char(id, 'FM000')
FROM generate_series(1, 100) id;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
-- ===================================================================
-- validate writes on master (mpp_execute set to master)
-- ===================================================================
ALTER FOREIGN TABLE gp_ft1 OPTIONS ( SET mpp_execute 'master' );
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_rand;
INSERT INTO gp_ft1 SELECT * FROM table_dist_rand;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_repl;
INSERT INTO gp_ft1 SELECT * FROM table_dist_repl;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_int;
INSERT INTO gp_ft1 SELECT * FROM table_dist_int;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_text;
INSERT INTO gp_ft1 SELECT * FROM table_dist_text;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1 SELECT * FROM table_dist_int_text;
INSERT INTO gp_ft1 SELECT * FROM table_dist_int_text;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
EXPLAIN (COSTS FALSE) INSERT INTO gp_ft1
SELECT id,
'AAA' || to_char(id, 'FM000'),
'BBB' || to_char(id, 'FM000')
FROM generate_series(1, 100) id;
INSERT INTO gp_ft1
SELECT id,
'AAA' || to_char(id, 'FM000'),
'BBB' || to_char(id, 'FM000')
FROM generate_series(1, 100) id;
SELECT * FROM "S 1"."GP 1" ORDER BY f1;
TRUNCATE TABLE "S 1"."GP 1";
\ No newline at end of file
...@@ -316,6 +316,35 @@ GpPolicyFetch(Oid tbloid) ...@@ -316,6 +316,35 @@ GpPolicyFetch(Oid tbloid)
return createRandomPartitionedPolicy(getgpsegmentCount()); return createRandomPartitionedPolicy(getgpsegmentCount());
} }
} }
else if (get_rel_relstorage(tbloid) == RELSTORAGE_FOREIGN)
{
/*
* Similar to the external table creation, there is a transient state
* during creation of a foreign table, where the pg_class entry has
* been created, before the pg_foreign_table entry has been created.
*/
HeapTuple tp = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(tbloid));
if (HeapTupleIsValid(tp))
{
ReleaseSysCache(tp);
ForeignTable *f = GetForeignTable(tbloid);
if (f->exec_location == FTEXECLOCATION_ALL_SEGMENTS)
{
/*
* Currently, foreign tables do not support a distribution
* policy, as opposed to writable external tables. For now,
* we will create a random partitioned policy for foreign
* tables that run on all segments. This will allow writing
* to foreign tables from all segments when the mpp_execute
* option is set to 'all segments'
*/
return createRandomPartitionedPolicy(getgpsegmentCount());
}
}
}
/* /*
* Select by value of the localoid field * Select by value of the localoid field
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册