提交 82329ca1 编写于 作者: M Ming LI 提交者: GitHub

Restrict only one reader of pipe on Linux. (#2771)

If there are two external tables refer to the same PIPE file using gpfdist
or file protocol directly, concurrent read will result in wrong data format
or hang for gpfdist. Now before read the pipe, it will firstly flock the
pipe file (Windows not supported yet), other requests from gpdb will report
error.
Signed-off-by: NMing LI <mli@apache.org>
Signed-off-by: NXiaoran Wang <xiwang@pivotal.io>
上级 41b5d563
......@@ -31,7 +31,7 @@
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/file.h> /* for flock */
#ifdef WIN32
#include <io.h>
......@@ -696,6 +696,10 @@ int gfile_open(gfile_t* fd, const char* fpath, int flags, int* response_code, co
#define S_IFDIR _S_IFDIR
#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR)
#endif
#if !defined(S_ISFIFO)
#define S_IFIFO _S_IFIFO
#define S_ISFIFO(m) (((m) & S_IFMT) == S_IFIFO)
#endif
#define strcasecmp stricmp
#endif
......@@ -886,6 +890,28 @@ int gfile_open(gfile_t* fd, const char* fpath, int flags, int* response_code, co
return 1;
}
#ifndef WIN32
if (!is_win_pipe && (flags == GFILE_OPEN_FOR_READ))
{
/* Restrict only one reader session for each PIPE */
if (S_ISFIFO(sta.st_mode))
{
if (flock (fd->fd.filefd, LOCK_EX | LOCK_NB) != 0)
{
fd->held_pipe_lock = FALSE;
gfile_printf_then_putc_newline("gfile %s is a pipe", fpath);
*response_code = 404;
*response_string = "Multiple reader to a pipe is forbidden.";
return 1;
}
else
{
fd->held_pipe_lock = TRUE;
}
}
}
#endif
/*
* prepare to use the appropriate i/o routines
*/
......@@ -999,6 +1025,14 @@ gfile_close(gfile_t*fd)
}
else
{
int i;
if(fd->held_pipe_lock)
{
#ifndef WIN32
flock (fd->fd.filefd, LOCK_UN);
#endif
}
do
{
//fsync(fd->fd.filefd);
......
......@@ -1166,6 +1166,9 @@ static int local_send(request_t *r, const char* buf, int buflen)
if ( e == EPIPE || e == ECONNRESET )
{
gwarning(r, "gpfdist_send failed - the connection was terminated by the client (%d: %s)", e, strerror(e));
/* close stream and release fd & flock on pipe file*/
if (r->session)
session_end(r->session, 0);
} else {
gdebug(r, "gpfdist_send failed - due to (%d: %s)", e, strerror(e));
}
......
......@@ -6,6 +6,10 @@
-- check platform and start 'gpfdist'
-- --------------------------------------
set optimizer_print_missing_stats = off;
CREATE EXTERNAL WEB TABLE gpfdist_status (x text)
execute E'( python @bindir@/lib/gppinggpfdist.py @hostname@:7575 2>&1 || echo) '
on SEGMENT 0
FORMAT 'text' (delimiter '|');
CREATE EXTERNAL WEB TABLE gpfdist_start (x text)
execute E'((@bindir@/gpfdist -p 7575 -d @abs_srcdir@/data/fixedwidth/ </dev/null >/dev/null 2>&1 &); sleep 1; echo "starting...") '
......
......@@ -31,6 +31,11 @@ CREATE TABLE REG_REGION (R_REGIONKEY INT, R_NAME CHAR(25), R_COMMENT VARCHAR(152
-- 'gpfdist' protocol
-- --------------------------------------
CREATE EXTERNAL WEB TABLE exttab1_gpfdist_status (x text)
execute E'( python @bindir@/lib/gppinggpfdist.py @hostname@:7070 2>&1 || echo) '
on SEGMENT 0
FORMAT 'text' (delimiter '|');
CREATE EXTERNAL WEB TABLE exttab1_gpfdist_start (x text)
execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data </dev/null >/dev/null 2>&1 &); sleep 1; echo "starting...") '
on SEGMENT 0
......@@ -81,6 +86,42 @@ SELECT * FROM EXT_REGION as r, EXT_NATION as n WHERE n.N_REGIONKEY = r.R_REGIONK
\! wget http://@hostname@:7070/exttab1/missing_fields1.data >/dev/null 2>&1 || echo "Execute error";
\! wget --header='X-GP-PROTO:0' http://@hostname@:7070/exttab1/missing_fields1.data >/dev/null 2>&1 && echo "Execute successully";
-- test: multiple reader for pipe is forbidden
DROP EXTERNAL WEB TABLE IF EXISTS create_pipe;
CREATE EXTERNAL WEB TABLE create_pipe (x text)
execute E'(rm @abs_srcdir@/data/test_pipe; mkfifo @abs_srcdir@/data/test_pipe) > /dev/null 2>&1; echo "creating pipe..."'
on SEGMENT 0
FORMAT 'text' (delimiter '|');
DROP EXTERNAL WEB TABLE IF EXISTS write_pipe;
CREATE EXTERNAL WEB TABLE write_pipe (x text)
execute E'((for i in `seq 1 5000`;do cat @abs_srcdir@/data/exttab1/nation.tbl;done > @abs_srcdir@/data/test_pipe); echo "writing pipe...")'
on SEGMENT 0
FORMAT 'text' (delimiter '|');
DROP EXTERNAL WEB TABLE IF EXISTS cat_sqlout_result;
CREATE EXTERNAL WEB TABLE cat_sqlout_result (x text)
execute E'(grep -q "Multiple reader to a pipe is forbidden" @abs_srcdir@/data/sql1.out @abs_srcdir@/data/sql2.out && echo "Report error: Multiple reader to a pipe is forbidden";)'
on SEGMENT 0
FORMAT 'text' (delimiter '|');
DROP EXTERNAL TABLE IF EXISTS pipe_ext1;
CREATE EXTERNAL TABLE pipe_ext1(LIKE EXT_NATION)
location ('gpfdist://@hostname@:7070/test_pipe' )
FORMAT 'text' (delimiter '|');
DROP EXTERNAL TABLE IF EXISTS pipe_ext2;
CREATE EXTERNAL TABLE pipe_ext2(LIKE EXT_NATION)
location ('file://@hostname@@abs_srcdir@/data/test_pipe' )
FORMAT 'text' (delimiter '|');
select * from create_pipe;
\! psql -d gpfdist_regression -c 'select count(*) from pipe_ext1;' > @abs_srcdir@/data/sql1.out 2>&1 &
\! psql -d gpfdist_regression -c 'select count(*) from pipe_ext2;' > @abs_srcdir@/data/sql2.out 2>&1 &
select * from write_pipe;
select * from cat_sqlout_result;
-- start_ignore
select * from exttab1_gpfdist_stop;
-- end_ignore
......
......@@ -5,6 +5,10 @@
-- check platform and start 'gpfdist'
-- --------------------------------------
set optimizer_print_missing_stats = off;
CREATE EXTERNAL WEB TABLE gpfdist_status (x text)
execute E'( python @bindir@/lib/gppinggpfdist.py @hostname@:7575 2>&1 || echo) '
on SEGMENT 0
FORMAT 'text' (delimiter '|');
CREATE EXTERNAL WEB TABLE gpfdist_start (x text)
execute E'((@bindir@/gpfdist -p 7575 -d @abs_srcdir@/data/fixedwidth/ </dev/null >/dev/null 2>&1 &); sleep 1; echo "starting...") '
on SEGMENT 0
......@@ -9,6 +9,10 @@ CREATE TABLE REG_REGION (R_REGIONKEY INT, R_NAME CHAR(25), R_COMMENT VARCHAR(152
-- --------------------------------------
-- 'gpfdist' protocol
-- --------------------------------------
CREATE EXTERNAL WEB TABLE exttab1_gpfdist_status (x text)
execute E'( python @bindir@/lib/gppinggpfdist.py @hostname@:7070 2>&1 || echo) '
on SEGMENT 0
FORMAT 'text' (delimiter '|');
CREATE EXTERNAL WEB TABLE exttab1_gpfdist_start (x text)
execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data </dev/null >/dev/null 2>&1 &); sleep 1; echo "starting...") '
on SEGMENT 0
......@@ -151,6 +155,65 @@ SELECT * FROM EXT_REGION as r, EXT_NATION as n WHERE n.N_REGIONKEY = r.R_REGIONK
Execute error
\! wget --header='X-GP-PROTO:0' http://@hostname@:7070/exttab1/missing_fields1.data >/dev/null 2>&1 && echo "Execute successully";
Execute successully
-- test: multiple reader for pipe is forbidden
DROP EXTERNAL WEB TABLE IF EXISTS create_pipe;
-- start_ignore
NOTICE: table "create_pipe" does not exist, skipping
-- end_ignore
CREATE EXTERNAL WEB TABLE create_pipe (x text)
execute E'(rm @abs_srcdir@/data/test_pipe; mkfifo @abs_srcdir@/data/test_pipe) > /dev/null 2>&1; echo "creating pipe..."'
on SEGMENT 0
FORMAT 'text' (delimiter '|');
DROP EXTERNAL WEB TABLE IF EXISTS write_pipe;
-- start_ignore
NOTICE: table "write_pipe" does not exist, skipping
-- end_ignore
CREATE EXTERNAL WEB TABLE write_pipe (x text)
execute E'((for i in `seq 1 5000`;do cat @abs_srcdir@/data/exttab1/nation.tbl;done > @abs_srcdir@/data/test_pipe); echo "writing pipe...")'
on SEGMENT 0
FORMAT 'text' (delimiter '|');
DROP EXTERNAL WEB TABLE IF EXISTS cat_sqlout_result;
-- start_ignore
NOTICE: table "cat_sqlout_result" does not exist, skipping
-- end_ignore
CREATE EXTERNAL WEB TABLE cat_sqlout_result (x text)
execute E'(grep -q "Multiple reader to a pipe is forbidden" @abs_srcdir@/data/sql1.out @abs_srcdir@/data/sql2.out && echo "Report error: Multiple reader to a pipe is forbidden";)'
on SEGMENT 0
FORMAT 'text' (delimiter '|');
DROP EXTERNAL TABLE IF EXISTS pipe_ext1;
-- start_ignore
NOTICE: table "pipe_ext1" does not exist, skipping
-- end_ignore
CREATE EXTERNAL TABLE pipe_ext1(LIKE EXT_NATION)
location ('gpfdist://@hostname@:7070/test_pipe' )
FORMAT 'text' (delimiter '|');
DROP EXTERNAL TABLE IF EXISTS pipe_ext2;
-- start_ignore
NOTICE: table "pipe_ext2" does not exist, skipping
-- end_ignore
CREATE EXTERNAL TABLE pipe_ext2(LIKE EXT_NATION)
location ('file://@hostname@@abs_srcdir@/data/test_pipe' )
FORMAT 'text' (delimiter '|');
select * from create_pipe;
x
------------------
creating pipe...
(1 row)
\! psql -d gpfdist_regression -c 'select count(*) from pipe_ext1;' > @abs_srcdir@/data/sql1.out 2>&1 &
\! psql -d gpfdist_regression -c 'select count(*) from pipe_ext2;' > @abs_srcdir@/data/sql2.out 2>&1 &
select * from write_pipe;
x
-----------------
writing pipe...
(1 row)
select * from cat_sqlout_result;
x
------------------------------------------------------
Report error: Multiple reader to a pipe is forbidden
(1 row)
-- start_ignore
-- end_ignore
-- drop tables
......
......@@ -48,6 +48,7 @@ typedef struct gfile_t
int(*close)(struct gfile_t*);
off_t compressed_size,compressed_position;
bool_t is_win_pipe;
bool_t held_pipe_lock; /* Whether held flock on pipe file, used to restrict only one reader of pipe */
union
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册