未验证 提交 23f8671a 编写于 作者: ( (Jerome)Junfeng Yang 提交者: GitHub

Use foreign data wraper routines to replace external insert in COPY FROM. (#10169)

Enable Copy Form for foreign tables to remove the external table
dependency in copy.c.
This commit backports small part of the commit 3d956d95 from Postgres.

Remove the fileam.h including from non-external code. So we can extract
external table into extension later.
Move function external_set_env_vars to URL component since extvar_t is
defined in url.h.
Implement external table fdw's BeginForeignInsert and EndForeignInsert,
so COPY FROM will go through the fdw routine instead of the external
insert.
Reviewed-by: NHeikki Linnakangas <heikki.linnakangas@iki.fi>
上级 36b1c7d0
......@@ -132,6 +132,11 @@ DELETE FROM agg_csv WHERE a = 100;
-- but this should be allowed
SELECT * FROM agg_csv FOR UPDATE;
-- copy from isn't supported either
COPY agg_csv FROM STDIN;
12 3.4
\.
-- constraint exclusion tests
\t on
EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv WHERE a < 0;
......
......@@ -217,6 +217,9 @@ SELECT * FROM agg_csv FOR UPDATE;
42 | 324.78
(3 rows)
-- copy from isn't supported either
COPY agg_csv FROM STDIN;
ERROR: cannot insert into foreign table "agg_csv"
-- constraint exclusion tests
\t on
EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv WHERE a < 0;
......
......@@ -20,7 +20,7 @@
#include "pxf_filter.h"
#include "pxf_header.h"
#include "access/fileam.h"
#include "access/url.h"
#include "utils/builtins.h"
#include "catalog/pg_exttable.h"
#include "commands/defrem.h"
......
......@@ -963,6 +963,30 @@ exttable_EndForeignModify(EState *estate, ResultRelInfo *rinfo)
external_insert_finish(extInsertDesc);
}
static void exttable_BeginForeignInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo)
{
/*
* This would be the natural place to call external_insert_init(), but we
* delay that until the first actual insert. That's because we don't want
* to open the external resource if we don't end up actually inserting any
* rows in this segment. In particular, we don't want to initialize the
* external resource in the QD node, when all the actual insertions happen
* in the segments.
*/
}
static void exttable_EndForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo)
{
ExternalInsertDescData *extInsertDesc = (ExternalInsertDescData *) resultRelInfo->ri_FdwState;
if (extInsertDesc == NULL)
return;
external_insert_finish(extInsertDesc);
}
Datum
exttable_fdw_handler(PG_FUNCTION_ARGS)
{
......@@ -980,6 +1004,8 @@ exttable_fdw_handler(PG_FUNCTION_ARGS)
routine->BeginForeignModify = exttable_BeginForeignModify;
routine->ExecForeignInsert = exttable_ExecForeignInsert;
routine->EndForeignModify = exttable_EndForeignModify;
routine->BeginForeignInsert = exttable_BeginForeignInsert;
routine->EndForeignInsert = exttable_EndForeignInsert;
PG_RETURN_POINTER(routine);
};
......
......@@ -34,35 +34,27 @@
#include "postgres.h"
#include <fstream/gfile.h>
#include <tcop/tcopprot.h>
#include "access/fileam.h"
#include "access/formatter.h"
#include "access/heapam.h"
#include "access/url.h"
#include "access/valid.h"
#include "catalog/pg_exttable.h"
#include "catalog/pg_proc.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbtm.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "commands/copy.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq-be.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "parser/parse_func.h"
#include "postmaster/postmaster.h" /* postmaster port */
#include "utils/relcache.h"
#include "utils/lsyscache.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/uri.h"
static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir);
static void InitParseState(CopyState pstate, Relation relation,
......@@ -91,11 +83,6 @@ static void external_scan_error_callback(void *arg);
static Oid lookupCustomFormatter(List **options, bool iswritable);
static void justifyDatabuf(StringInfo buf);
static void base16_encode(char *raw, int len, char *encoded);
static char *get_eol_delimiter(List *params);
static void external_set_env_vars_ext(extvar_t *extvar, char *uri, bool csv, char *escape,
char *quote, int eol_type, bool header, uint32 scancounter, List *params);
/* ----------------------------------------------------------------
* external_ interface functions
......@@ -1418,6 +1405,17 @@ external_senddata(URL_FILE *extfile, CopyState pstate)
}
}
static char *
linenumber_atoi(char *buffer, size_t bufsz, int64 linenumber)
{
if (linenumber < 0)
snprintf(buffer, bufsz, "%s", "N/A");
else
snprintf(buffer, bufsz, INT64_FORMAT, linenumber);
return buffer;
}
/*
* error context callback for external table scan
*/
......@@ -1571,162 +1569,6 @@ justifyDatabuf(StringInfo buf)
buf->cursor = 0;
}
char *
linenumber_atoi(char *buffer, size_t bufsz, int64 linenumber)
{
if (linenumber < 0)
snprintf(buffer, bufsz, "%s", "N/A");
else
snprintf(buffer, bufsz, INT64_FORMAT, linenumber);
return buffer;
}
static char *
get_eol_delimiter(List *params)
{
ListCell *lc = params->head;
while (lc)
{
if (pg_strcasecmp(((DefElem *) lc->data.ptr_value)->defname, "line_delim") == 0)
return pstrdup(((Value *) ((DefElem *) lc->data.ptr_value)->arg)->val.str);
lc = lc->next;
}
return pstrdup("");
}
static void
base16_encode(char *raw, int len, char *encoded)
{
const char *raw_bytes = raw;
char *encoded_bytes = encoded;
int remaining = len;
for (; remaining--; encoded_bytes += 2)
{
sprintf(encoded_bytes, "%02x", *(raw_bytes++));
}
}
void
external_set_env_vars(extvar_t *extvar, char *uri, bool csv, char *escape, char *quote, bool header, uint32 scancounter)
{
external_set_env_vars_ext(extvar, uri, csv, escape, quote, EOL_UNKNOWN, header, scancounter, NULL);
}
static void
external_set_env_vars_ext(extvar_t *extvar, char *uri, bool csv, char *escape, char *quote, int eol_type, bool header,
uint32 scancounter, List *params)
{
time_t now = time(0);
struct tm *tm = localtime(&now);
char *result = (char *) palloc(7); /* sign, 5 digits, '\0' */
char *encoded_delim;
int line_delim_len;
snprintf(extvar->GP_CSVOPT, sizeof(extvar->GP_CSVOPT),
"m%dx%dq%dn%dh%d",
csv ? 1 : 0,
escape ? 255 & *escape : 0,
quote ? 255 & *quote : 0,
eol_type,
header ? 1 : 0);
if (Gp_role != GP_ROLE_DISPATCH)
{
pg_ltoa(qdPostmasterPort, result);
extvar->GP_MASTER_PORT = result;
extvar->GP_MASTER_HOST = qdHostname;
}
else
{
CdbComponentDatabaseInfo *qdinfo =
cdbcomponent_getComponentInfo(MASTER_CONTENT_ID);
pg_ltoa(qdinfo->config->port, result);
extvar->GP_MASTER_PORT = result;
if (qdinfo->config->hostip != NULL)
extvar->GP_MASTER_HOST = pstrdup(qdinfo->config->hostip);
else
extvar->GP_MASTER_HOST = pstrdup(qdinfo->config->hostname);
}
if (MyProcPort)
extvar->GP_USER = MyProcPort->user_name;
else
extvar->GP_USER = "";
extvar->GP_DATABASE = get_database_name(MyDatabaseId);
extvar->GP_SEG_PG_CONF = ConfigFileName; /* location of the segments
* pg_conf file */
extvar->GP_SEG_DATADIR = DataDir; /* location of the segments
* datadirectory */
sprintf(extvar->GP_DATE, "%04d%02d%02d",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday);
sprintf(extvar->GP_TIME, "%02d%02d%02d",
tm->tm_hour, tm->tm_min, tm->tm_sec);
/*
* read-only query don't have a valid distributed transaction ID, use
* "session id"-"command id" to identify the transaction.
*/
if (!getDistributedTransactionIdentifier(extvar->GP_XID))
sprintf(extvar->GP_XID, "%u-%.10u", gp_session_id, gp_command_count);
sprintf(extvar->GP_CID, "%x", QEDtxContextInfo.curcid);
sprintf(extvar->GP_SN, "%x", scancounter);
sprintf(extvar->GP_SEGMENT_ID, "%d", GpIdentity.segindex);
sprintf(extvar->GP_SEG_PORT, "%d", PostPortNumber);
sprintf(extvar->GP_SESSION_ID, "%d", gp_session_id);
sprintf(extvar->GP_SEGMENT_COUNT, "%d", getgpsegmentCount());
extvar->GP_QUERY_STRING = (char *)debug_query_string;
if (NULL != params)
{
char *line_delim_str = get_eol_delimiter(params);
line_delim_len = (int) strlen(line_delim_str);
if (line_delim_len > 0)
{
encoded_delim = (char *) (palloc(line_delim_len * 2 + 1));
base16_encode(line_delim_str, line_delim_len, encoded_delim);
}
else
{
line_delim_len = -1;
encoded_delim = "";
}
}
else
{
switch(eol_type)
{
case EOL_CR:
encoded_delim = "0D";
line_delim_len = 1;
break;
case EOL_NL:
encoded_delim = "0A";
line_delim_len = 1;
break;
case EOL_CRNL:
encoded_delim = "0D0A";
line_delim_len = 2;
break;
default:
encoded_delim = "";
line_delim_len = -1;
break;
}
}
extvar->GP_LINE_DELIM_STR = pstrdup(encoded_delim);
sprintf(extvar->GP_LINE_DELIM_LENGTH, "%d", line_delim_len);
}
List *
appendCopyEncodingOption(List *copyFmtOpts, int encoding)
{
......
......@@ -15,11 +15,169 @@
#include "postgres.h"
#include "access/url.h"
#include "cdb/cdbdtxcontextinfo.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbtm.h"
#include "commands/dbcommands.h"
#include "libpq/libpq-be.h"
#include "miscadmin.h"
#include "postmaster/postmaster.h" /* postmaster port */
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/uri.h"
/* GUC */
int readable_external_table_timeout = 0;
static void base16_encode(char *raw, int len, char *encoded);
static char *get_eol_delimiter(List *params);
void
external_set_env_vars(extvar_t *extvar, char *uri, bool csv, char *escape, char *quote, bool header, uint32 scancounter)
{
external_set_env_vars_ext(extvar, uri, csv, escape, quote, EOL_UNKNOWN, header, scancounter, NULL);
}
void
external_set_env_vars_ext(extvar_t *extvar, char *uri, bool csv, char *escape, char *quote, int eol_type, bool header,
uint32 scancounter, List *params)
{
time_t now = time(0);
struct tm *tm = localtime(&now);
char *result = (char *) palloc(7); /* sign, 5 digits, '\0' */
char *encoded_delim;
int line_delim_len;
snprintf(extvar->GP_CSVOPT, sizeof(extvar->GP_CSVOPT),
"m%dx%dq%dn%dh%d",
csv ? 1 : 0,
escape ? 255 & *escape : 0,
quote ? 255 & *quote : 0,
eol_type,
header ? 1 : 0);
if (Gp_role != GP_ROLE_DISPATCH)
{
pg_ltoa(qdPostmasterPort, result);
extvar->GP_MASTER_PORT = result;
extvar->GP_MASTER_HOST = qdHostname;
}
else
{
CdbComponentDatabaseInfo *qdinfo =
cdbcomponent_getComponentInfo(MASTER_CONTENT_ID);
pg_ltoa(qdinfo->config->port, result);
extvar->GP_MASTER_PORT = result;
if (qdinfo->config->hostip != NULL)
extvar->GP_MASTER_HOST = pstrdup(qdinfo->config->hostip);
else
extvar->GP_MASTER_HOST = pstrdup(qdinfo->config->hostname);
}
if (MyProcPort)
extvar->GP_USER = MyProcPort->user_name;
else
extvar->GP_USER = "";
extvar->GP_DATABASE = get_database_name(MyDatabaseId);
extvar->GP_SEG_PG_CONF = ConfigFileName; /* location of the segments
* pg_conf file */
extvar->GP_SEG_DATADIR = DataDir; /* location of the segments
* datadirectory */
sprintf(extvar->GP_DATE, "%04d%02d%02d",
1900 + tm->tm_year, 1 + tm->tm_mon, tm->tm_mday);
sprintf(extvar->GP_TIME, "%02d%02d%02d",
tm->tm_hour, tm->tm_min, tm->tm_sec);
/*
* read-only query don't have a valid distributed transaction ID, use
* "session id"-"command id" to identify the transaction.
*/
if (!getDistributedTransactionIdentifier(extvar->GP_XID))
sprintf(extvar->GP_XID, "%u-%.10u", gp_session_id, gp_command_count);
sprintf(extvar->GP_CID, "%x", QEDtxContextInfo.curcid);
sprintf(extvar->GP_SN, "%x", scancounter);
sprintf(extvar->GP_SEGMENT_ID, "%d", GpIdentity.segindex);
sprintf(extvar->GP_SEG_PORT, "%d", PostPortNumber);
sprintf(extvar->GP_SESSION_ID, "%d", gp_session_id);
sprintf(extvar->GP_SEGMENT_COUNT, "%d", getgpsegmentCount());
extvar->GP_QUERY_STRING = (char *)debug_query_string;
if (NULL != params)
{
char *line_delim_str = get_eol_delimiter(params);
line_delim_len = (int) strlen(line_delim_str);
if (line_delim_len > 0)
{
encoded_delim = (char *) (palloc(line_delim_len * 2 + 1));
base16_encode(line_delim_str, line_delim_len, encoded_delim);
}
else
{
line_delim_len = -1;
encoded_delim = "";
}
}
else
{
switch(eol_type)
{
case EOL_CR:
encoded_delim = "0D";
line_delim_len = 1;
break;
case EOL_NL:
encoded_delim = "0A";
line_delim_len = 1;
break;
case EOL_CRNL:
encoded_delim = "0D0A";
line_delim_len = 2;
break;
default:
encoded_delim = "";
line_delim_len = -1;
break;
}
}
extvar->GP_LINE_DELIM_STR = pstrdup(encoded_delim);
sprintf(extvar->GP_LINE_DELIM_LENGTH, "%d", line_delim_len);
}
static void
base16_encode(char *raw, int len, char *encoded)
{
const char *raw_bytes = raw;
char *encoded_bytes = encoded;
int remaining = len;
for (; remaining--; encoded_bytes += 2)
{
sprintf(encoded_bytes, "%02x", *(raw_bytes++));
}
}
static char *
get_eol_delimiter(List *params)
{
ListCell *lc = params->head;
while (lc)
{
if (pg_strcasecmp(((DefElem *) lc->data.ptr_value)->defname, "line_delim") == 0)
return pstrdup(((Value *) ((DefElem *) lc->data.ptr_value)->arg)->val.str);
lc = lc->next;
}
return pstrdup("");
}
/*
* url_fopen
*
......
......@@ -15,7 +15,7 @@
#include "postgres.h"
#include "access/extprotocol.h"
#include "access/fileam.h"
#include "access/url.h"
#include "catalog/pg_extprotocol.h"
#include "commands/copy.h"
#include "utils/memutils.h"
......
......@@ -18,7 +18,7 @@
#include <sys/wait.h>
#include <unistd.h>
#include "access/fileam.h"
#include "access/url.h"
#include "cdb/cdbtimer.h"
#include "cdb/cdbvars.h"
#include "libpq/pqsignal.h"
......
......@@ -37,6 +37,7 @@
#include "commands/defrem.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "foreign/fdwapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
......@@ -56,7 +57,7 @@
#include "utils/snapmgr.h"
#include "access/appendonlywriter.h"
#include "access/fileam.h"
#include "access/url.h"
#include "catalog/namespace.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbaocsam.h"
......@@ -3446,6 +3447,16 @@ CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
MemoryContextSwitchTo(oldcontext);
}
static char *
linenumber_atoi(char *buffer, size_t bufsz, int64 linenumber)
{
if (linenumber < 0)
snprintf(buffer, bufsz, "%s", "N/A");
else
snprintf(buffer, bufsz, INT64_FORMAT, linenumber);
return buffer;
}
/*
* error context callback for COPY FROM
......@@ -3620,8 +3631,9 @@ CopyFrom(CopyState cstate)
ResultRelInfo *parentResultRelInfo;
List *resultRelInfoList = NULL;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
TupleTableSlot *baseSlot;
ModifyTableState *mtstate;
ExprContext *econtext; /* used for ExecEvalExpr for default atts */
TupleTableSlot *baseSlot;
MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcallback;
......@@ -3640,13 +3652,14 @@ CopyFrom(CopyState cstate)
bool *baseNulls;
GpDistributionData *part_distData = NULL;
int firstBufferedLineNo = 0;
bool is_external_table;
Assert(cstate->rel);
is_external_table = (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE &&
rel_is_external_table(RelationGetRelid(cstate->rel)));
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION && !is_external_table)
/*
* The target must be a plain or foreign relation.
*/
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
......@@ -3658,11 +3671,6 @@ CopyFrom(CopyState cstate)
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to materialized view \"%s\"",
RelationGetRelationName(cstate->rel))));
else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to foreign table \"%s\"",
RelationGetRelationName(cstate->rel))));
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
......@@ -3782,6 +3790,9 @@ CopyFrom(CopyState cstate)
parentResultRelInfo = resultRelInfo;
/* Verify the named relation is a valid target for INSERT */
CheckValidResultRel(resultRelInfo->ri_RelationDesc, CMD_INSERT);
ExecOpenIndices(resultRelInfo, false);
resultRelInfo->ri_resultSlot = MakeSingleTupleTableSlot(resultRelInfo->ri_RelationDesc->rd_att);
......@@ -3818,11 +3829,13 @@ CopyFrom(CopyState cstate)
* BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
* expressions. Such triggers or expressions might query the table we're
* inserting to, and act differently if the tuples that have already been
* processed and prepared for insertion are not there.
* processed and prepared for insertion are not there. We also can't do
* it if the table is foreign.
*/
if ((resultRelInfo->ri_TrigDesc != NULL &&
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
resultRelInfo->ri_FdwRoutine != NULL ||
cstate->volatile_defexprs || cstate->oids)
{
useHeapMultiInsert = false;
......@@ -3832,6 +3845,21 @@ CopyFrom(CopyState cstate)
useHeapMultiInsert = true;
}
/*
* Set up a ModifyTableState so we can let FDW(s) init themselves for
* foreign-table result relation(s).
*/
mtstate = makeNode(ModifyTableState);
mtstate->ps.plan = NULL;
mtstate->ps.state = estate;
mtstate->operation = CMD_INSERT;
mtstate->resultRelInfo = estate->es_result_relations;
if (resultRelInfo->ri_FdwRoutine != NULL &&
resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
resultRelInfo);
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
......@@ -4156,12 +4184,6 @@ CopyFrom(CopyState cstate)
aocs_insert_init(resultRelInfo->ri_RelationDesc,
resultRelInfo->ri_aosegno, false);
}
else if (is_external_table &&
resultRelInfo->ri_extInsertDesc == NULL)
{
resultRelInfo->ri_extInsertDesc =
external_insert_init(resultRelInfo->ri_RelationDesc);
}
}
if (cstate->dispatch_mode == COPY_DISPATCH)
......@@ -4207,7 +4229,8 @@ CopyFrom(CopyState cstate)
ItemPointerData insertedTid;
/* Check the constraints of the tuple */
if (resultRelInfo->ri_RelationDesc->rd_att->constr)
if (resultRelInfo->ri_FdwRoutine == NULL &&
resultRelInfo->ri_RelationDesc->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
/* OK, store the tuple and create index entries for it */
......@@ -4272,12 +4295,27 @@ CopyFrom(CopyState cstate)
aocs_insert(resultRelInfo->ri_aocsInsertDesc, slot);
insertedTid = *slot_get_ctid(slot);
}
else if (is_external_table)
else if (resultRelInfo->ri_FdwRoutine != NULL)
{
HeapTuple tuple;
tuple = ExecFetchSlotHeapTuple(slot);
external_insert(resultRelInfo->ri_extInsertDesc, tuple);
slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
resultRelInfo,
slot,
NULL);
if (slot == NULL) /* "do nothing" */
continue;
/*
* AFTER ROW Triggers might reference the tableoid
* column, so (re-)initialize tts_tableOid before
* evaluating them.
*/
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
/* FDW might have changed tuple */
tuple = ExecMaterializeSlot(slot);
ItemPointerSetInvalid(&insertedTid);
}
else
......@@ -4296,7 +4334,7 @@ CopyFrom(CopyState cstate)
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot, &insertedTid,
estate, false, NULL,
estate, false, NULL,
NIL);
/* AFTER ROW INSERT Triggers */
......@@ -4444,6 +4482,12 @@ CopyFrom(CopyState cstate)
* would be freed by FreeExecutorState anyhow */
ExecResetTupleTable(estate->es_tupleTable, false);
/* Allow the FDW to shut down */
if (parentResultRelInfo->ri_FdwRoutine != NULL &&
parentResultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
parentResultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
parentResultRelInfo);
/*
* Finalize appends and close relations we opened.
*
......
......@@ -40,7 +40,6 @@
#include "postgres.h"
#include "access/appendonlywriter.h"
#include "access/fileam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/transam.h"
......@@ -2404,7 +2403,6 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_projectReturning = NULL;
resultRelInfo->ri_aoInsertDesc = NULL;
resultRelInfo->ri_aocsInsertDesc = NULL;
resultRelInfo->ri_extInsertDesc = NULL;
resultRelInfo->ri_deleteDesc = NULL;
resultRelInfo->ri_updateDesc = NULL;
resultRelInfo->ri_aosegno = InvalidFileSegNumber;
......@@ -2423,8 +2421,6 @@ CloseResultRelInfo(ResultRelInfo *resultRelInfo)
appendonly_insert_finish(resultRelInfo->ri_aoInsertDesc);
if (resultRelInfo->ri_aocsInsertDesc)
aocs_insert_finish(resultRelInfo->ri_aocsInsertDesc);
if (resultRelInfo->ri_extInsertDesc)
external_insert_finish(resultRelInfo->ri_extInsertDesc);
if (resultRelInfo->ri_deleteDesc != NULL)
{
......
......@@ -53,7 +53,6 @@
#include "utils/rel.h"
#include "utils/tqual.h"
#include "access/fileam.h"
#include "access/transam.h"
#include "cdb/cdbaocsam.h"
#include "cdb/cdbappendonlyam.h"
......@@ -2311,6 +2310,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
List *rlist = (List *) lfirst(l);
List *rliststate;
resultRelInfo->ri_returningList = rlist;
rliststate = (List *) ExecInitExpr((Expr *) rlist, &mtstate->ps);
resultRelInfo->ri_projectReturning =
ExecBuildProjectionInfo(rliststate, econtext, slot,
......
......@@ -84,13 +84,6 @@ external_getnext(FileScanDesc scan,
extern ExternalInsertDesc external_insert_init(Relation rel);
extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup);
extern void external_insert_finish(ExternalInsertDesc extInsertDesc);
extern void external_set_env_vars(extvar_t *extvar, char *uri, bool csv, char *escape, char *quote, bool header, uint32 scancounter);
extern char *linenumber_atoi(char *buffer, size_t bufsz, int64 linenumber);
/* prototypes for functions in url_execute.c */
extern int popen_with_stderr(int *rwepipe, const char *exe, bool forwrite);
extern int pclose_with_stderr(int pid, int *rwepipe, StringInfo sinfo);
extern char *make_command(const char *cmd, extvar_t *ev);
extern List *appendCopyEncodingOption(List *copyFmtOpts, int encoding);
......
......@@ -73,6 +73,12 @@ typedef struct extvar_t
/* an EXECUTE string will always be prefixed like this */
#define EXEC_URL_PREFIX "execute:"
extern void external_set_env_vars(extvar_t *extvar, char *uri, bool csv, char *escape,
char *quote, bool header, uint32 scancounter);
extern void external_set_env_vars_ext(extvar_t *extvar, char *uri, bool csv, char *escape,
char *quote, int eol_type, bool header,
uint32 scancounter, List *params);
/* exported functions */
extern URL_FILE *url_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pstate, ExternalSelectDesc desc);
extern void url_fclose(URL_FILE *file, bool failOnError, const char *relname);
......@@ -82,6 +88,11 @@ extern size_t url_fread(void *ptr, size_t size, URL_FILE *file, CopyState pstate
extern size_t url_fwrite(void *ptr, size_t size, URL_FILE *file, CopyState pstate);
extern void url_fflush(URL_FILE *file, CopyState pstate);
/* prototypes for functions in url_execute.c */
extern int popen_with_stderr(int *rwepipe, const char *exe, bool forwrite);
extern int pclose_with_stderr(int pid, int *rwepipe, StringInfo sinfo);
extern char *make_command(const char *cmd, extvar_t *ev);
/* implementation-specific functions. */
extern URL_FILE *url_curl_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pstate);
extern void url_curl_fclose(URL_FILE *file, bool failOnError, const char *relname);
......
......@@ -97,6 +97,12 @@ typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate,
typedef void (*EndForeignModify_function) (EState *estate,
ResultRelInfo *rinfo);
typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate,
ResultRelInfo *rinfo);
typedef void (*EndForeignInsert_function) (EState *estate,
ResultRelInfo *rinfo);
typedef int (*IsForeignRelUpdatable_function) (Relation rel);
typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
......@@ -199,6 +205,8 @@ typedef struct FdwRoutine
ExecForeignUpdate_function ExecForeignUpdate;
ExecForeignDelete_function ExecForeignDelete;
EndForeignModify_function EndForeignModify;
BeginForeignInsert_function BeginForeignInsert;
EndForeignInsert_function EndForeignInsert;
IsForeignRelUpdatable_function IsForeignRelUpdatable;
PlanDirectModify_function PlanDirectModify;
BeginDirectModify_function BeginDirectModify;
......
......@@ -383,6 +383,9 @@ typedef struct ResultRelInfo
AttrNumber ri_action_attno; /* is this an INSERT or DELETE ? */
AttrNumber ri_tupleoid_attno; /* old OID, when updating table with OIDs */
/* list of RETURNING expressions */
List *ri_returningList;
ProjectionInfo *ri_projectReturning;
ProjectionInfo *ri_onConflictSetProj;
List *ri_onConflictSetWhere;
......@@ -390,7 +393,6 @@ typedef struct ResultRelInfo
struct AppendOnlyInsertDescData *ri_aoInsertDesc;
struct AOCSInsertDescData *ri_aocsInsertDesc;
struct ExternalInsertDescData *ri_extInsertDesc;
RelationDeleteDesc ri_deleteDesc;
RelationUpdateDesc ri_updateDesc;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册