提交 1c9fb699 编写于 作者: H Heikki Linnakangas

Fix rescan support for external scan nodes.

Instead of pushing the responsibility of rescanning down to each different
kind of external table implement rescanning in fileam.c in a generic
fashion, by closing and reopening the underlying "url". This gets us rescan
support for custom and EXECUTE-type external tables, which was missing
before, and also makes the code simpler.

There are no known cases where the rescan support is currently needed (hence
no test case included), because the planner puts Materialize nodes on top of
external scans, but in principle every plan node is supposed to be
rescannable. I tested this by reverting the previous patch that fixed
using external scans in a subplan; without that patch, an external table
in a subplan would get rescanned.
上级 8ae5a93f
......@@ -299,15 +299,10 @@ void
external_rescan(FileScanDesc scan)
{
if (!scan->fs_noop)
{
/* may need to open file since beginscan doens't do it for us */
if (!scan->fs_file)
open_external_readable_source(scan);
/* Close previous scan if it was already open */
external_stopscan(scan);
/* seek to beginning of data source so we can start over */
url_rewind((URL_FILE*)scan->fs_file, RelationGetRelationName(scan->fs_rd));
}
/* The first call to external_getnext will re-open the scan */
/* reset some parse state variables */
scan->fs_pstate->fe_eof = false;
......
......@@ -2481,64 +2481,6 @@ url_fflush(URL_FILE *file, CopyState pstate)
}
}
void
url_rewind(URL_FILE *file, const char *relname)
{
switch(file->type)
{
case CFTYPE_FILE:
fstream_rewind(file->u.file.fp);
break;
case CFTYPE_EXEC:
{
elog(ERROR, "Rescan is not supported for web external table: %s. "
"Please use 'set optimizer=on' as a work around "
"and 'set optimizer_enable_master_only_queries=on' if accessing catalog tables.", relname);
}
break;
#ifdef USE_CURL
case CFTYPE_CURL:
/* halt transaction */
{
CURLMcode e;
if (!file->u.curl.for_write)
{
// TODO: Is this for reading only?
e = curl_multi_remove_handle(multi_handle, file->u.curl.handle);
if (CURLM_OK != e)
elog(ERROR, "internal error curl_multi_remove_handle (%d - %s)", e, curl_easy_strerror(e));
/* restart */
e = curl_multi_add_handle(multi_handle, file->u.curl.handle);
if (CURLM_OK != e)
elog(ERROR, "internal error curl_multi_add_handle (%d - %s)", e, curl_easy_strerror(e));
}
/* ditch buffer - write will recreate - resets stream pos*/
if (file->u.curl.in.ptr)
free(file->u.curl.in.ptr);
file->u.curl.gp_proto = 0;
file->u.curl.error = file->u.curl.eof = 0;
memset(&file->u.curl.in, 0, sizeof(file->u.curl.in));
memset(&file->u.curl.block, 0, sizeof(file->u.curl.block));
}
break;
#endif
case CFTYPE_CUSTOM:
elog(ERROR, "rewind support not yet implemented in custom protocol");
break;
default: /* unknown or supported type - oh dear */
break;
}
}
/*
* interpretError - formats a brief message and/or the exit code from pclose()
* (or wait4()).
......
......@@ -2073,7 +2073,6 @@ _outRelOptInfo(StringInfo str, RelOptInfo *node)
WRITE_CHAR_FIELD(rejectlimittype);
WRITE_OID_FIELD(fmterrtbl);
WRITE_INT_FIELD(ext_encoding);
WRITE_BOOL_FIELD(isrescannable);
WRITE_BOOL_FIELD(writable);
WRITE_NODE_FIELD(baserestrictinfo);
WRITE_NODE_FIELD(joininfo);
......
......@@ -1111,7 +1111,13 @@ create_external_path(PlannerInfo *root, RelOptInfo *rel)
pathnode->path.locus = cdbpathlocus_from_baserel(root, rel);
pathnode->path.motionHazard = false;
pathnode->path.rescannable = rel->isrescannable;
/*
* Mark external tables as non-rescannable. While rescan is possible,
* it can lead to surprising results if the external table produces
* different results when invoked twice.
*/
pathnode->path.rescannable = false;
cost_externalscan(pathnode, root, rel);
......
......@@ -358,9 +358,6 @@ get_external_relation_info(Oid relationObjectId, RelOptInfo *rel)
rel->ext_encoding = extentry->encoding;
rel->writable = extentry->iswritable;
/* any external tables are non-rescannable. */
rel->isrescannable = false;
}
/*
......
......@@ -99,7 +99,6 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptKind reloptkind)
rel->rejectlimittype = '\0';
rel->fmterrtbl = InvalidOid;
rel->ext_encoding = -1;
rel->isrescannable = true;
rel->writable = false;
rel->baserestrictinfo = NIL;
rel->baserestrictcost.startup = 0;
......
......@@ -998,39 +998,6 @@ int64_t fstream_get_compressed_position(fstream_t *fs)
return p;
}
/*
* fstream_rewind
*
* close the currently open file. open the first file in the file stream
* chain, reset state and start from scratch.
*/
int fstream_rewind(fstream_t *fs)
{
int response_code;
const char* response_string;
struct gpfxdist_t* transform = fs->options.transform;
fs->fidx = 0;
fs->foff = 0;
fs->line_number = 1;
fs->ferror = 0;
fs->skip_header_line = fs->options.header;
fs->buffer_cur_size = 0;
fs->compressed_position = 0;
gfile_close(&fs->fd);
if (gfile_open(&fs->fd, fs->glob.gl_pathv[0], GFILE_OPEN_FOR_READ,
&response_code, &response_string, transform))
{
gfile_printf_then_putc_newline("fstream unable to open file %s",
fs->glob.gl_pathv[0]);
fs->ferror = "unable to open file";
return -1;
}
return 0;
}
bool_t fstream_is_win_pipe(fstream_t *fs)
{
return fs->fd.is_win_pipe;
......
......@@ -159,7 +159,6 @@ extern bool url_feof(URL_FILE *file, int bytesread);
extern bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen);
extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate);
extern size_t url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate);
extern void url_rewind(URL_FILE *file, const char *relname);
extern void url_fflush(URL_FILE *file, CopyState pstate);
extern URL_FILE *url_execute_fopen(char* url, char *cmd, bool forwrite, extvar_t *ev);
......
......@@ -46,7 +46,6 @@ int fstream_write(fstream_t *fs,
int fstream_eof(fstream_t* fs);
int64_t fstream_get_compressed_size(fstream_t* fs);
int64_t fstream_get_compressed_position(fstream_t* fs);
int fstream_rewind(fstream_t* fs);
const char* fstream_get_error(fstream_t* fs);
fstream_t* fstream_open(const char* path, const struct fstream_options* options,
int* response_code, const char** response_string);
......
......@@ -456,7 +456,6 @@ typedef struct RelOptInfo
char rejectlimittype;
Oid fmterrtbl;
int32 ext_encoding;
bool isrescannable; /* false for ext web tables */
bool writable; /* true for writable, false for readable ext tables*/
/* used by various scans and joins: */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册