提交 3aad307c 编写于 作者: ( (Jerome)Junfeng Yang 提交者: Adam Lee

Provide a pg_exttable view for extension compatibility

pg_exttable catalog was removed because we use the FDW to implement
external table now. But other extensions may still rely on the
pg_exttable catalog.

So we create a view base on this UDF to extract pg_exttable catalog
info.
Signed-off-by: NAdam Lee <ali@pivotal.io>
上级 044c4c0d
......@@ -24,6 +24,7 @@
#include "access/exttable_fdw_shim.h"
#include "access/fileam.h"
#include "access/reloptions.h"
#include "access/relscan.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbvars.h"
......@@ -31,7 +32,10 @@
#include "catalog/pg_extprotocol.h"
#include "catalog/pg_exttable.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_foreign_table.h"
#include "commands/defrem.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "nodes/execnodes.h"
#include "nodes/relation.h"
#include "optimizer/clauses.h"
......@@ -40,10 +44,34 @@
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/syscache.h"
#include "utils/uri.h"
#define GP_EXTTABLE_ATTRNUM 12
/*
* PGExtTableEntry is used in pg_exttable(). It reflects each external
* table entry in the foreign table catalog.
*/
typedef struct PGExtTableEntry
{
Oid reloid;
Oid serveroid;
List *ftoptions;
} PGExtTableEntry;
/*
* PGExtTableEntriesContext is used in pg_exttable() as user_fctx.
*/
typedef struct PGExtTableEntriesContext
{
int entryIdx;
List *ftentries;
} PGExtTableEntriesContext;
typedef struct
{
FileScanDescData *ess_ScanDesc;
......@@ -57,6 +85,265 @@ static List *create_external_scan_uri_list(ExtTableEntry *ext, bool *ismasteronl
static void cost_externalscan(ForeignPath *path, PlannerInfo *root,
RelOptInfo *baserel, ParamPathInfo *param_info);
/*
* strListToArray - String Value to Text array datum
*/
static Datum
strListToArray(List *stringlist)
{
ArrayBuildState *astate = NULL;
ListCell *cell;
foreach(cell, stringlist)
{
Value *val = lfirst(cell);
astate = accumArrayResult(astate, CStringGetTextDatum(strVal(val)),
false, TEXTOID,
CurrentMemoryContext);
}
if (astate)
return makeArrayResult(astate, CurrentMemoryContext);
return PointerGetDatum(NULL);
}
/*
* formatOptionsToTextDatum
* Convert format options to text datum. The text datum format is same with
* the original pg_exttable catalog's fmtopt field.
*/
static Datum
formatOptionsToTextDatum(List *options, char formattype)
{
ListCell *option;
Datum result;
StringInfoData cfbuf;
initStringInfo(&cfbuf);
if (fmttype_is_text(formattype) || fmttype_is_csv(formattype))
{
bool isfirst = true;
/*
* Note: the order of the options should be same with the original
* pg_exttable catalog's fmtopt field.
*/
foreach(option, options)
{
DefElem *defel = (DefElem *) lfirst(option);
char *key = defel->defname;
char *val = (char *) defGetString(defel);
if (strcmp(defel->defname, "format") == 0)
continue;
if (isfirst)
isfirst = false;
else
appendStringInfo(&cfbuf, " ");
if (strcmp(defel->defname, "header") == 0)
appendStringInfo(&cfbuf, "header");
else if (strcmp(defel->defname, "fill_missing_fields") == 0)
appendStringInfo(&cfbuf, "fill missing fields");
else if (strcmp(defel->defname, "force_not_null") == 0)
appendStringInfo(&cfbuf, "force not null %s", val);
else if (strcmp(defel->defname, "force_quote") == 0)
appendStringInfo(&cfbuf, "force quote %s", val);
else
appendStringInfo(&cfbuf, "%s '%s'", key, val);
}
}
else
{
foreach(option, options)
{
DefElem *defel = (DefElem *) lfirst(option);
char *key = defel->defname;
char *val = (char *) defGetString(defel);
appendStringInfo(&cfbuf, "%s '%s'", key, val);
}
}
result = CStringGetTextDatum(cfbuf.data);
pfree(cfbuf.data);
return result;
}
/*
* Use the pg_exttable UDF to extract pg_exttable catalog info for
* extension compatibility.
*
* pg_exttable catalog was removed because we use the FDW to implement
* external table now. But other extensions may still rely on the pg_exttable
* catalog. So we create a view base on this UDF to extract pg_exttable catalog
* info.
*/
Datum pg_exttable(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
PGExtTableEntriesContext *context;
Datum values[GP_EXTTABLE_ATTRNUM];
bool nulls[GP_EXTTABLE_ATTRNUM] = {false};
/*
* First call setup
*/
if (SRF_IS_FIRSTCALL())
{
MemoryContext oldcontext;
Relation pg_foreign_table_rel;
ScanKeyData ftkey;
SysScanDesc ftscan;
HeapTuple fttuple;
Form_pg_foreign_table fttableform;
List *ftentries = NIL;
funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* Build tuple descriptor */
TupleDesc tupdesc =
CreateTemplateTupleDesc(GP_EXTTABLE_ATTRNUM, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "reloid", OIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "urilocation", TEXTARRAYOID, -1, 1);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "execlocation", TEXTARRAYOID, -1, 1);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "fmttype", CHAROID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "fmtopts", TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "options", TEXTARRAYOID, -1, 1);
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "command", TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "rejectlimit", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "rejectlimittype", CHAROID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "logerrors", CHAROID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 11, "encoding", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 12, "writable", BOOLOID, -1, 0);
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
/* Retrieve external table in foreign table catalog */
pg_foreign_table_rel = heap_open(ForeignTableRelationId, AccessShareLock);
ScanKeyInit(&ftkey,
Anum_pg_foreign_table_ftserver,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(PG_EXTTABLE_SERVER_OID));
ftscan = systable_beginscan(pg_foreign_table_rel, InvalidOid,
false, NULL, 1, &ftkey);
while (HeapTupleIsValid(fttuple = systable_getnext(ftscan)))
{
Datum ftoptions;
bool isNull;
PGExtTableEntry *entry= (PGExtTableEntry *) palloc0(sizeof(PGExtTableEntry));
fttableform = (Form_pg_foreign_table) GETSTRUCT(fttuple);
entry->reloid = fttableform->ftrelid;
entry->serveroid = fttableform->ftserver;
/* get the foreign table options */
ftoptions = heap_getattr(fttuple,
Anum_pg_foreign_table_ftoptions,
RelationGetDescr(pg_foreign_table_rel),
&isNull);
if (!isNull)
entry->ftoptions = untransformRelOptions(ftoptions);
ftentries = lappend(ftentries, entry);
}
systable_endscan(ftscan);
heap_close(pg_foreign_table_rel, AccessShareLock);
context = (PGExtTableEntriesContext *)palloc0(sizeof(PGExtTableEntriesContext));
context->entryIdx = 0;
context->ftentries = ftentries;
funcctx->user_fctx = (void *) context;
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
context = (PGExtTableEntriesContext *) funcctx->user_fctx;
while (context->entryIdx < list_length(context->ftentries))
{
PGExtTableEntry *entry;
ExtTableEntry *extentry;
Datum datum;
HeapTuple tuple;
Datum result;
entry = (PGExtTableEntry *)list_nth(context->ftentries, context->entryIdx);
context->entryIdx++;
extentry = GetExtFromForeignTableOptions(entry->ftoptions, entry->reloid);
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
/* reloid */
values[0] = ObjectIdGetDatum(entry->reloid);
/* urilocations */
datum = strListToArray(extentry->urilocations);
if (DatumGetPointer(datum) != NULL)
values[1] = datum;
else
nulls[1] = true;
/* execlocations */
datum = strListToArray(extentry->execlocations);
if (DatumGetPointer(datum) != NULL)
values[2] = datum;
else
nulls[2] = true;
/* fmtcode */
values[3] = CharGetDatum(extentry->fmtcode);
/* fmtopts */
if (extentry->options)
values[4] = formatOptionsToTextDatum(extentry->options, extentry->fmtcode);
else
nulls[4] = true;
/*
* options. Since our document not contains the OPTION caluse, so we
* assume no external table options in used for now. Except
* gpextprotocol.c.
*/
nulls[5] = true;
/* command */
if (extentry->command)
values[6] = CStringGetTextDatum(extentry->command);
else
nulls[6] = true;
/* rejectlimit */
values[7] = Int32GetDatum(extentry->rejectlimit);
if (values[7] == -1)
nulls[7] = true;
/* rejectlimittype */
values[8] = CharGetDatum(extentry->rejectlimittype);
if (values[8] == -1)
nulls[8] = true;
/* logerrors */
values[9] = CharGetDatum(extentry->logerrors);
/* encoding */
values[10] = Int32GetDatum(extentry->encoding);
/* iswritable */
values[11] = BoolGetDatum(extentry->iswritable);
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}
SRF_RETURN_DONE(funcctx);
}
/* FDW validator for external tables */
void
gp_exttable_permission_check(PG_FUNCTION_ARGS)
......
......@@ -936,6 +936,8 @@ CREATE VIEW pg_resqueue_status AS
ON (s.queueid = q.oid);
-- External table views
CREATE VIEW pg_exttable AS
SELECT * FROM pg_exttable();
CREATE VIEW pg_max_external_files AS
SELECT address::name as hostname, count(*) as maxfiles
......
......@@ -694,11 +694,12 @@ transformFormatOpts(char formattype, List *formatOpts, int numcols, bool iswrita
numcols,
false /* is_copy */);
if (fmttype_is_csv(formattype))
cslist = lappend(cslist, makeDefElem("quote", (Node *) makeString(cstate->quote)));
/* keep the same order with the original pg_exttable catalog's fmtopt field */
cslist = lappend(cslist, makeDefElem("delimiter", (Node *) makeString(cstate->delim)));
cslist = lappend(cslist, makeDefElem("null", (Node *) makeString(cstate->null_print)));
cslist = lappend(cslist, makeDefElem("escape", (Node *) makeString(cstate->escape)));
if (fmttype_is_csv(formattype))
cslist = lappend(cslist, makeDefElem("quote", (Node *) makeString(cstate->quote)));
if (cstate->header_line)
cslist = lappend(cslist, makeDefElem("header", (Node *) makeString("true")));
if (cstate->fill_missing)
......@@ -919,18 +920,7 @@ GetExtTableEntryIfExists(Oid relid)
HeapTuple fttuple;
ExtTableEntry *extentry;
bool isNull;
ListCell *lc;
char *arg;
List *entryOptions = NIL;
List *ftoptions_list = NIL;;
bool fmtcode_found = false;
bool rejectlimit_found = false;
bool rejectlimittype_found = false;
bool logerrors_found = false;
bool encoding_found = false;
bool iswritable_found = false;
bool locationuris_found = false;
bool command_found = false;
pg_foreign_table_rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
......@@ -951,8 +941,6 @@ GetExtTableEntryIfExists(Oid relid)
return NULL;
}
extentry = (ExtTableEntry *) palloc0(sizeof(ExtTableEntry));
/* get the foreign table options */
Datum ftoptions = heap_getattr(fttuple,
Anum_pg_foreign_table_ftoptions,
......@@ -969,7 +957,34 @@ GetExtTableEntryIfExists(Oid relid)
ftoptions_list = untransformRelOptions(ftoptions);
}
foreach(lc, ftoptions_list)
extentry = GetExtFromForeignTableOptions(ftoptions_list, relid);
/* Finish up scan and close catalogs */
systable_endscan(ftscan);
heap_close(pg_foreign_table_rel, RowExclusiveLock);
return extentry;
}
ExtTableEntry *
GetExtFromForeignTableOptions(List *ftoptons, Oid relid)
{
ExtTableEntry *extentry;
ListCell *lc;
List *entryOptions = NIL;
char *arg;
bool fmtcode_found = false;
bool rejectlimit_found = false;
bool rejectlimittype_found = false;
bool logerrors_found = false;
bool encoding_found = false;
bool iswritable_found = false;
bool locationuris_found = false;
bool command_found = false;
extentry = (ExtTableEntry *) palloc0(sizeof(ExtTableEntry));
foreach(lc, ftoptons)
{
DefElem *def = (DefElem *) lfirst(lc);
......@@ -1085,9 +1100,5 @@ GetExtTableEntryIfExists(Oid relid)
extentry->options = entryOptions;
/* Finish up scan and close catalogs */
systable_endscan(ftscan);
heap_close(pg_foreign_table_rel, RowExclusiveLock);
return extentry;
}
......@@ -20,6 +20,8 @@
extern Datum exttable_fdw_handler(PG_FUNCTION_ARGS);
extern Datum pg_exttable(PG_FUNCTION_ARGS);
extern void gp_exttable_permission_check(PG_FUNCTION_ARGS);
extern ForeignScan *create_foreignscan_for_external_table(Oid relid, Index scanrelid, List *qual, List *targetlist);
......
......@@ -56,6 +56,6 @@
*/
/* 3yyymmddN */
#define CATALOG_VERSION_NO 302005181
#define CATALOG_VERSION_NO 302005182
#endif
......@@ -44,6 +44,8 @@ extern ExtTableEntry *GetExtTableEntry(Oid relid);
extern ExtTableEntry *GetExtTableEntryIfExists(Oid relid);
extern ExtTableEntry *GetExtFromForeignTableOptions(List *ftoptons, Oid relid);
#define fmttype_is_custom(c) (c == 'b')
#define fmttype_is_text(c) (c == 't')
#define fmttype_is_csv(c) (c == 'c')
......
......@@ -168,6 +168,9 @@
CREATE FUNCTION gp_list_backend_priorities() RETURNS SETOF record LANGUAGE internal VOLATILE AS 'gp_list_backend_priorities' WITH (OID=5042, DESCRIPTION="list priorities of backends");
-- Functions to extract external table info
CREATE FUNCTION pg_exttable(OUT reloid oid, OUT urilocation _text, OUT execlocation _text, OUT fmttype char, OUT fmtopts text, OUT options _text, OUT command text, OUT rejectlimit int4, OUT rejectlimittype char, OUT logerrors char, OUT encoding int4, OUT writable bool) RETURNS SETOF record LANGUAGE internal VOLATILE EXECUTE ON MASTER AS 'pg_exttable' WITH (OID=7061, DESCRIPTION="original pg_exttable catalog info");
CREATE FUNCTION gp_exttable_permission_check(text, oid) RETURNS void LANGUAGE internal VOLATILE NO SQL AS 'gp_exttable_permission_check' WITH (OID=7070, DESCRIPTION="validator for external tables");
-- Functions to deal with SREH error logs
......
......@@ -22,7 +22,7 @@
WARNING: DO NOT MODIFY THE FOLLOWING SECTION:
Generated by catullus.pl version 8
on Mon May 18 10:05:36 2020
on Mon May 18 10:15:00 2020
Please make your changes in pg_proc.sql
*/
......@@ -356,6 +356,11 @@ DATA(insert OID = 5042 ( gp_list_backend_priorities PGNSP PGUID 12 1 1000 0 0 f
DESCR("list priorities of backends");
/* Functions to extract external table info */
/* pg_exttable(OUT reloid oid, OUT urilocation _text, OUT execlocation _text, OUT fmttype char, OUT fmtopts text, OUT options _text, OUT command text, OUT rejectlimit int4, OUT rejectlimittype char, OUT logerrors char, OUT encoding int4, OUT writable bool) => SETOF record */
DATA(insert OID = 7061 ( pg_exttable PGNSP PGUID 12 1 1000 0 0 f f f f f t v u 0 0 2249 "" "{26,1009,1009,18,25,1009,25,23,18,18,23,16}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{reloid,urilocation,execlocation,fmttype,fmtopts,options,command,rejectlimit,rejectlimittype,logerrors,encoding,writable}" _null_ _null_ pg_exttable _null_ _null_ _null_ n m ));
DESCR("original pg_exttable catalog info");
/* gp_exttable_permission_check(text, oid) => void */
DATA(insert OID = 7070 ( gp_exttable_permission_check PGNSP PGUID 12 1 0 0 0 f f f f f f v u 2 0 2278 "25 26" _null_ _null_ _null_ _null_ _null_ gp_exttable_permission_check _null_ _null_ _null_ n a ));
DESCR("validator for external tables");
......
......@@ -103,6 +103,6 @@ WHERE
---------+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
t_ext | f | {format=text,"delimiter= ","null=\\N","escape=\\",format_type=t,location_uris=file://127.0.0.1/tmp/foo,execute_on=ALL_SEGMENTS,log_errors=f,encoding=6,is_writable=false}
t_ext_b | f | {format=text,"delimiter= ","null=\\N","escape=\\",format_type=t,location_uris=file://127.0.0.1/tmp/foo,execute_on=ALL_SEGMENTS,log_errors=f,encoding=6,is_writable=false}
t_ext_r | f | {format=csv,"quote=\"","delimiter= ",null=,"escape=\"",format_type=c,location_uris=gpfdist://127.0.0.1:8080/tmp/dummy,execute_on=ALL_SEGMENTS,log_errors=f,encoding=6,is_writable=false}
t_ext_r | f | {format=csv,"delimiter= ",null=,"escape=\"","quote=\"",format_type=c,location_uris=gpfdist://127.0.0.1:8080/tmp/dummy,execute_on=ALL_SEGMENTS,log_errors=f,encoding=6,is_writable=false}
(3 rows)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册