/*------------------------------------------------------------------------- * * copy.c * Implements the COPY utility command * * Portions Copyright (c) 2005-2008, Greenplum inc * Portions Copyright (c) 2012-Present Pivotal Software, Inc. * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * src/backend/commands/copy.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include "libpq-int.h" #include #include #include #include "access/heapam.h" #include "access/htup_details.h" #include "access/sysattr.h" #include "access/tableam.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/dependency.h" #include "catalog/pg_authid.h" #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" #include "commands/trigger.h" #include "executor/execPartition.h" #include "executor/executor.h" #include "executor/nodeModifyTable.h" #include "executor/tuptable.h" #include "foreign/fdwapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "optimizer/optimizer.h" #include "nodes/makefuncs.h" #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_relation.h" #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/partcache.h" #include "utils/portal.h" #include "utils/rel.h" #include "utils/rls.h" #include "utils/snapmgr.h" #include "access/external.h" #include "access/url.h" #include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_extprotocol.h" #include "cdb/cdbappendonlyam.h" #include "cdb/cdbaocsam.h" #include "cdb/cdbconn.h" #include "cdb/cdbcopy.h" #include 
"cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbvars.h"
#include "commands/queue.h"
#include "nodes/makefuncs.h"
#include "postmaster/autostats.h"
#include "utils/metrics_utils.h"
#include "utils/resscheduler.h"
#include "utils/string_utils.h"

/* Helpers for parsing octal escape sequences (e.g. "\123") in COPY text data. */
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')

/*
 * Represents the heap insert method to be used during COPY FROM.
 */
typedef enum CopyInsertMethod
{
	CIM_SINGLE,					/* use table_tuple_insert or fdw routine */
	CIM_MULTI,					/* always use table_multi_insert */
	CIM_MULTI_CONDITIONAL		/* use table_multi_insert only if valid */
} CopyInsertMethod;

/*
 * No more than this many tuples per CopyMultiInsertBuffer
 *
 * Caution: Don't make this too big, as we could end up with this many
 * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
 * multiInsertBuffers list.  Increasing this can cause quadratic growth in
 * memory requirements during copies into partitioned tables with a large
 * number of partitions.
 */
#define MAX_BUFFERED_TUPLES 1000

/*
 * Flush buffers if there are >= this many bytes, as counted by the input
 * size, of tuples stored.
 */
#define MAX_BUFFERED_BYTES 65535

/* Trim the list of buffers back down to this number after flushing */
#define MAX_PARTITION_BUFFERS 32

/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
	BulkInsertState bistate;	/* BulkInsertState for this rel */
	int			nused;			/* number of 'slots' containing tuples */
	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
												 * stream */
} CopyMultiInsertBuffer;

/*
 * Stores one or many CopyMultiInsertBuffers and details about the size and
 * number of tuples which are stored in them.  This allows multiple buffers to
 * exist at once when COPYing into a partitioned table.
 */
typedef struct CopyMultiInsertInfo
{
	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
	int			bufferedTuples; /* number of tuples buffered over all buffers */
	int			bufferedBytes;	/* number of bytes from all buffered tuples */
	CopyState	cstate;			/* Copy state for this CopyMultiInsertInfo */
	EState	   *estate;			/* Executor state used for COPY */
	CommandId	mycid;			/* Command Id used for COPY */
	int			ti_options;		/* table insert options */
} CopyMultiInsertInfo;


/*
 * These macros centralize code used to process line_buf and raw_buf buffers.
 * They are macros because they often do continue/break control and to avoid
 * function call overhead in tight COPY loops.
 *
 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
 * prevent the continue/break processing from working.  We end the "if (1)"
 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
 * any "else" in the calling code, and to avoid any compiler warnings about
 * empty statements.  See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
 */

/*
 * This keeps the character read at the top of the loop in the buffer
 * even if there is more than one read-ahead.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
		need_data = true; \
		continue; \
	} \
} else ((void) 0)

/* This consumes the remainder of the buffer and breaks */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		break; \
	} \
} else ((void) 0)

/*
 * Transfer any approved data to line_buf; must do this to be sure
 * there is some room in raw_buf.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)

/* Undo any read-ahead and jump out of the block. */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else ((void) 0)

/* Magic header emitted at the start of a binary-format COPY file. */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";


/* non-export function prototypes */
static void EndCopy(CopyState cstate);
static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
							 Oid queryRelId, const char *filename, bool is_program,
							 List *attnamelist, List *options);
static void EndCopyTo(CopyState cstate, uint64 *processed);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyToDispatch(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static uint64 CopyDispatchOnSegment(CopyState cstate, const CopyStmt *stmt);
static uint64 CopyToQueryOnSegment(CopyState cstate);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int	CopyReadAttributesText(CopyState cstate, int stop_processing_at_field);
static int	CopyReadAttributesCSV(CopyState cstate, int stop_processing_at_field);
static Datum CopyReadBinaryAttribute(CopyState cstate,
									 int column_no, FmgrInfo *flinfo,
									 Oid typioparam, int32 typmod,
									 bool *isnull);
static void CopyAttributeOutText(CopyState cstate, char *string);
static void CopyAttributeOutCSV(CopyState cstate, char *string,
								bool use_quote, bool single_attr);

/* Low-level communications functions */
static void SendCopyBegin(CopyState cstate);
static void ReceiveCopyBegin(CopyState cstate);
static void SendCopyEnd(CopyState cstate);
static void CopySendData(CopyState cstate, const void *databuf, int datasize);
static void CopySendString(CopyState cstate, const char *str);
static void CopySendChar(CopyState cstate, char c);
static int	CopyGetData(CopyState cstate, void *databuf, int datasize);
static void CopySendInt32(CopyState cstate, int32 val);
static bool CopyGetInt32(CopyState cstate, int32 *val);
static void CopySendInt16(CopyState cstate, int16 val);
static bool CopyGetInt16(CopyState cstate, int16 *val);

/* GPDB: QD->QE tuple forwarding for dispatched COPY FROM */
static void SendCopyFromForwardedTuple(CopyState cstate,
									   CdbCopy *cdbCopy,
									   bool toAll,
									   int target_seg,
									   Relation rel,
									   int64 lineno,
									   char *line,
									   int line_len,
									   Datum *values,
									   bool *nulls);
static void SendCopyFromForwardedHeader(CopyState cstate, CdbCopy *cdbCopy);
static void SendCopyFromForwardedError(CopyState cstate, CdbCopy *cdbCopy,
									   char *errmsg);
static bool NextCopyFromDispatch(CopyState cstate, ExprContext *econtext,
								 Datum *values, bool *nulls);
static bool NextCopyFromExecute(CopyState cstate, ExprContext *econtext,
								Datum *values, bool *nulls);
static bool NextCopyFromRawFieldsX(CopyState cstate, char ***fields, int *nfields,
								   int stop_processing_at_field);
static bool NextCopyFromX(CopyState cstate, ExprContext *econtext,
						  Datum *values, bool *nulls);
static void HandleCopyError(CopyState cstate);
static void HandleQDErrorFrame(CopyState cstate, char *p, int len);
static void CopyInitDataParser(CopyState cstate);
static void setEncodingConversionProc(CopyState cstate, int encoding,
									  bool iswritable);
static GpDistributionData *InitDistributionData(CopyState cstate, EState *estate);
static void FreeDistributionData(GpDistributionData *distData);
static void InitCopyFromDispatchSplit(CopyState cstate,
									  GpDistributionData *distData,
									  EState *estate);
static unsigned int GetTargetSeg(GpDistributionData *distData,
								 TupleTableSlot *slot);
static ProgramPipes *open_program_pipes(char *command, bool forwrite);
static void close_program_pipes(CopyState cstate, bool ifThrow);
CopyIntoClause* MakeCopyIntoClause(CopyStmt *stmt);
static List *parse_joined_option_list(char *str, char *delimiter);

/* ==========================================================================
 * The
 * following macros aid in major refactoring of data processing code (in
 * CopyFrom(+Dispatch)). We use macros because in some cases the code must be in
 * line in order to work (for example elog_dismiss() in PG_CATCH) while in
 * other cases we'd like to inline the code for performance reasons.
 *
 * NOTE that an almost identical set of macros exists in fileam.c. If you make
 * changes here you may want to consider taking a look there as well.
 * ==========================================================================
 */

#define RESET_LINEBUF \
cstate->line_buf.len = 0; \
cstate->line_buf.data[0] = '\0'; \
cstate->line_buf.cursor = 0;

#define RESET_ATTRBUF \
cstate->attribute_buf.len = 0; \
cstate->attribute_buf.data[0] = '\0'; \
cstate->attribute_buf.cursor = 0;

#define RESET_LINEBUF_WITH_LINENO \
line_buf_with_lineno.len = 0; \
line_buf_with_lineno.data[0] = '\0'; \
line_buf_with_lineno.cursor = 0;

/* Currently-active COPY state, for error-cleanup paths. */
static volatile CopyState glob_cstate = NULL;

/* GPDB_91_MERGE_FIXME: passing through a global variable like this is ugly */
static CopyStmt *glob_copystmt = NULL;

/*
 * Testing GUC: When enabled, COPY FROM prints an INFO line to indicate which
 * fields are processed in the QD, and which in the QE.
 */
extern bool Test_copy_qd_qe_split;

/*
 * When doing a COPY FROM through the dispatcher, the QD reads the input from
 * the input file (or stdin or program), and forwards the data to the QE nodes,
 * where they will actually be inserted.
 *
 * Ideally, the QD would just pass through each line to the QE as is, and let
 * the QEs to do all the processing. Because the more processing the QD has
 * to do, the more likely it is to become a bottleneck.
 *
 * However, the QD needs to figure out which QE to send each row to. For that,
 * it needs to at least parse the distribution key. The distribution key might
 * also be a DEFAULTed column, in which case the DEFAULT value needs to be
 * evaluated in the QD. In that case, the QD must send the computed value
 * to the QE - we cannot assume that the QE can re-evaluate the expression and
 * arrive at the same value, at least not if the DEFAULT expression is volatile.
 *
 * Therefore, we need a flexible format between the QD and QE, where the QD
 * processes just enough of each input line to figure out where to send it.
 * It must send the values it had to parse and evaluate to the QE, as well
 * as the rest of the original input line, so that the QE can parse the rest
 * of it.
 *
 * The 'copy_from_dispatch_*' structs are used in the QD->QE stream. For each
 * input line, the QD constructs a 'copy_from_dispatch_row' struct, and sends
 * it to the QE. Before any rows, a QDtoQESignature is sent first, followed by
 * a 'copy_from_dispatch_header'. When QD encounters a recoverable error that
 * needs to be logged in the error log (LOG ERRORS SEGMENT REJECT LIMIT), it
 * sends the erroneous raw to a QE, in a 'copy_from_dispatch_error' struct.
 *
 *
 * COPY TO is simpler: The QEs form the output rows in the final form, and the QD
 * just collects and forwards them to the client. The QD doesn't need to parse
 * the rows at all.
 */
static const char QDtoQESignature[] = "PGCOPY-QD-TO-QE\n\377\r\n";

/* Header contains information that applies to all the rows that follow. */
typedef struct
{
	/*
	 * First field that should be processed in the QE. Any fields before
	 * this will be included as Datums in the rows that follow.
	 */
	int16		first_qe_processed_field;
} copy_from_dispatch_header;

typedef struct
{
	/*
	 * Information about this input line.
	 *
	 * 'relid' is the target relation's OID. Normally, the same as
	 * cstate->relid, but for a partitioned relation, it indicates the target
	 * partition. Note: this must be the first field, because InvalidOid means
	 * that this is actually a 'copy_from_dispatch_error' struct.
	 *
	 * NOTE(review): the claim above looks stale — 'lineno' is declared before
	 * 'relid' below, and 'copy_from_dispatch_error' is distinguished by its
	 * leading int64 'error_marker' (-1).  Confirm against the frame
	 * read/write code before relying on this comment.
	 *
	 * 'lineno' is the input line number, for error reporting.
	 */
	int64		lineno;
	Oid			relid;

	uint32		line_len;		/* size of the included input line */
	uint32		residual_off;	/* offset in the line, where QE should
								 * process remaining fields */
	bool		delim_seen_at_end;	/* conveys to QE if QD saw a delim at end
									 * of its processing */
	uint16		fld_count;		/* # of fields that were processed in the
								 * QD. */

	/* The input line follows. */

	/*
	 * For each field that was parsed in the QD already, the following data
	 * follows:
	 *
	 * int16 fieldnum;
	 * <data>
	 *
	 * NULL values are not included, any attributes that are not included in
	 * the message are implicitly NULL.
	 *
	 * For pass-by-value datatypes, the <data> is the raw Datum. For
	 * simplicity, it is always sent as a full-width 8-byte Datum, regardless
	 * of the datatype's length.
	 *
	 * For other fixed width datatypes, <data> is the datatype's value.
	 *
	 * For variable-length datatypes, <data> begins with a 4-byte length field,
	 * followed by the data. Cstrings (typlen = -2) are also sent in this
	 * format.
	 */
} copy_from_dispatch_row;

/* Size of the struct, without padding at the end. */
#define SizeOfCopyFromDispatchRow (offsetof(copy_from_dispatch_row, fld_count) + sizeof(uint16))

typedef struct
{
	int64		error_marker;	/* constant -1, to mark that this is an error
								 * frame rather than 'copy_from_dispatch_row' */
	int64		lineno;
	uint32		errmsg_len;
	uint32		line_len;
	bool		line_buf_converted;

	/* 'errmsg' follows */
	/* 'line' follows */
} copy_from_dispatch_error;

/* Size of the struct, without padding at the end. */
#define SizeOfCopyFromDispatchError (offsetof(copy_from_dispatch_error, line_buf_converted) + sizeof(bool))


/*
 * Send copy start/stop messages for frontend copies.  These have changed
 * in past protocol redesigns.
 */
static void
SendCopyBegin(CopyState cstate)
{
	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
	{
		/* new way: CopyOutResponse ('H') carries per-column format codes */
		StringInfoData buf;
		int			natts = list_length(cstate->attnumlist);
		int16		format = (cstate->binary ? 1 : 0);
		int			i;

		pq_beginmessage(&buf, 'H');
		pq_sendbyte(&buf, format);	/* overall format */
		pq_sendint16(&buf, natts);
		for (i = 0; i < natts; i++)
			pq_sendint16(&buf, format); /* per-column formats */
		pq_endmessage(&buf);
		cstate->copy_dest = COPY_NEW_FE;
	}
	else
	{
		/* old way */
		if (cstate->binary)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
		pq_putemptymessage('H');
		/* grottiness needed for old COPY OUT protocol */
		pq_startcopyout();
		cstate->copy_dest = COPY_OLD_FE;
	}
}

static void
ReceiveCopyBegin(CopyState cstate)
{
	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
	{
		/* new way: CopyInResponse ('G') carries per-column format codes */
		StringInfoData buf;
		int			natts = list_length(cstate->attnumlist);
		int16		format = (cstate->binary ? 1 : 0);
		int			i;

		pq_beginmessage(&buf, 'G');
		pq_sendbyte(&buf, format);	/* overall format */
		pq_sendint16(&buf, natts);
		for (i = 0; i < natts; i++)
			pq_sendint16(&buf, format); /* per-column formats */
		pq_endmessage(&buf);
		cstate->copy_dest = COPY_NEW_FE;
		cstate->fe_msgbuf = makeStringInfo();
	}
	else
	{
		/* old way */
		if (cstate->binary)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY BINARY is not supported to stdout or from stdin")));
		pq_putemptymessage('G');
		/* any error in old protocol will make us lose sync */
		pq_startmsgread();
		cstate->copy_dest = COPY_OLD_FE;
	}

	/* We *must* flush here to ensure FE knows it can send.
	 */
	pq_flush();
}

static void
SendCopyEnd(CopyState cstate)
{
	if (cstate->copy_dest == COPY_NEW_FE)
	{
		/* Shouldn't have any unsent data */
		Assert(cstate->fe_msgbuf->len == 0);
		/* Send Copy Done message */
		pq_putemptymessage('c');
	}
	else
	{
		/* old protocol: terminate the data stream with the "\." marker */
		CopySendData(cstate, "\\.", 2);
		/* Need to flush out the trailer (this also appends a newline) */
		CopySendEndOfRow(cstate);
		pq_endcopyout(false);
	}
}

/*----------
 * CopySendData sends output data to the destination (file or frontend)
 * CopySendString does the same for null-terminated strings
 * CopySendChar does the same for single characters
 * CopySendEndOfRow does the appropriate thing at end of each data row
 *	(data is not actually flushed except by CopySendEndOfRow)
 *
 * NB: no data conversion is applied by these functions
 *----------
 */
static void
CopySendData(CopyState cstate, const void *databuf, int datasize)
{
	appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
}

static void
CopySendString(CopyState cstate, const char *str)
{
	appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
}

static void
CopySendChar(CopyState cstate, char c)
{
	appendStringInfoCharMacro(cstate->fe_msgbuf, c);
}

/* AXG: Note that this will both add a newline AND flush the data.
 * For the dispatcher COPY TO we don't want to use this method since
 * our newlines already exist. We use another new method similar to
 * this one to flush the data
 */
void
CopySendEndOfRow(CopyState cstate)
{
	StringInfo	fe_msgbuf = cstate->fe_msgbuf;

	switch (cstate->copy_dest)
	{
		case COPY_FILE:
			if (!cstate->binary)
			{
				/* Default line termination depends on platform */
#ifndef WIN32
				CopySendChar(cstate, '\n');
#else
				CopySendString(cstate, "\r\n");
#endif
			}

			if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
					   cstate->copy_file) != 1 ||
				ferror(cstate->copy_file))
			{
				if (cstate->is_program)
				{
					if (errno == EPIPE)
					{
						/*
						 * The pipe will be closed automatically on error at
						 * the end of transaction, but we might get a better
						 * error message from the subprocess' exit code than
						 * just "Broken Pipe"
						 */
						close_program_pipes(cstate, true);

						/*
						 * If close_program_pipes() didn't throw an error,
						 * the program terminated normally, but closed the
						 * pipe first. Restore errno, and throw an error.
						 */
						errno = EPIPE;
					}
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not write to COPY program: %m")));
				}
				else
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not write to COPY file: %m")));
			}
			break;
		case COPY_OLD_FE:
			/* The FE/BE protocol uses \n as newline for all platforms */
			if (!cstate->binary)
				CopySendChar(cstate, '\n');

			if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
			{
				/* no hope of recovering connection sync, so FATAL */
				ereport(FATAL,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("connection lost during COPY to stdout")));
			}
			break;
		case COPY_NEW_FE:
			/* The FE/BE protocol uses \n as newline for all platforms */
			if (!cstate->binary)
				CopySendChar(cstate, '\n');

			/* Dump the accumulated row as one CopyData message */
			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
			break;
		case COPY_CALLBACK:
			/* we don't actually do the write here, we let the caller do it */
#ifndef WIN32
			CopySendChar(cstate, '\n');
#else
			CopySendString(cstate, "\r\n");
#endif
			/* early return: callback consumes fe_msgbuf, so no reset here */
			return;				/* don't want to reset msgbuf quite yet */
	}

	resetStringInfo(fe_msgbuf);
}

/*
 * AXG: This one is equivalent to
 * CopySendEndOfRow() besides that
 * it doesn't send end of row - it just flushed the data. We need
 * this method for the dispatcher COPY TO since it already has data
 * with newlines (from the executors).
 */
static void
CopyToDispatchFlush(CopyState cstate)
{
	StringInfo	fe_msgbuf = cstate->fe_msgbuf;

	switch (cstate->copy_dest)
	{
		case COPY_FILE:
			(void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
						  1, cstate->copy_file);
			if (ferror(cstate->copy_file))
			{
				if (cstate->is_program)
				{
					if (errno == EPIPE)
					{
						/*
						 * The pipe will be closed automatically on error at
						 * the end of transaction, but we might get a better
						 * error message from the subprocess' exit code than
						 * just "Broken Pipe"
						 */
						close_program_pipes(cstate, true);

						/*
						 * If close_program_pipes() didn't throw an error,
						 * the program terminated normally, but closed the
						 * pipe first. Restore errno, and throw an error.
						 */
						errno = EPIPE;
					}
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not write to COPY program: %m")));
				}
				else
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not write to COPY file: %m")));
			}
			break;
		case COPY_OLD_FE:
			if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
			{
				/* no hope of recovering connection sync, so FATAL */
				ereport(FATAL,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("connection lost during COPY to stdout")));
			}
			break;
		case COPY_NEW_FE:
			/* Dump the accumulated row as one CopyData message */
			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
			break;
		case COPY_CALLBACK:
			elog(ERROR, "unexpected destination COPY_CALLBACK to flush data");
			break;
	}

	resetStringInfo(fe_msgbuf);
}

/*
 * CopyGetData reads data from the source (file or frontend)
 *
 * Note: when copying from the frontend, we expect a proper EOF mark per
 * protocol; if the frontend simply drops the connection, we raise error.
 * It seems unwise to allow the COPY IN to complete normally in that case.
 *
 * NB: no data conversion is applied here.
 *
 * Returns: the number of bytes that were successfully read
 * into the data buffer.
 */
static int
CopyGetData(CopyState cstate, void *databuf, int datasize)
{
	size_t		bytesread = 0;

	switch (cstate->copy_dest)
	{
		case COPY_FILE:
			bytesread = fread(databuf, 1, datasize, cstate->copy_file);
			if (feof(cstate->copy_file))
				cstate->reached_eof = true;
			if (ferror(cstate->copy_file))
			{
				if (cstate->is_program)
				{
					/* save errno: close_program_pipes() may clobber it */
					int			olderrno = errno;

					close_program_pipes(cstate, true);

					/*
					 * If close_program_pipes() didn't throw an error, the
					 * program terminated normally, but closed the pipe
					 * first. Restore errno, and throw an error.
					 */
					errno = olderrno;

					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not read from COPY program: %m")));
				}
				else
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not read from COPY file: %m")));
			}
			break;
		case COPY_OLD_FE:
			if (pq_getbytes((char *) databuf, datasize))
			{
				/* Only a \. terminator is legal EOF in old protocol */
				ereport(ERROR,
						(errcode(ERRCODE_CONNECTION_FAILURE),
						 errmsg("unexpected EOF on client connection with an open transaction")));
			}
			bytesread += datasize;	/* update the count of bytes that were
									 * read so far */
			break;
		case COPY_NEW_FE:
			while (datasize > 0 && !cstate->reached_eof)
			{
				int			avail;

				/* drain fe_msgbuf; fetch another CopyData message if empty */
				while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
				{
					/* Try to receive another message */
					int			mtype;

			readmessage:
					HOLD_CANCEL_INTERRUPTS();
					pq_startmsgread();
					mtype = pq_getbyte();
					if (mtype == EOF)
						ereport(ERROR,
								(errcode(ERRCODE_CONNECTION_FAILURE),
								 errmsg("unexpected EOF on client connection with an open transaction")));
					if (pq_getmessage(cstate->fe_msgbuf, 0))
						ereport(ERROR,
								(errcode(ERRCODE_CONNECTION_FAILURE),
								 errmsg("unexpected EOF on client connection with an open transaction")));
					RESUME_CANCEL_INTERRUPTS();
					switch (mtype)
					{
						case 'd':	/* CopyData */
							break;
						case 'c':	/* CopyDone */
							/* COPY IN correctly terminated by frontend */
							cstate->reached_eof = true;
							return bytesread;
						case 'f':	/* CopyFail */
							ereport(ERROR,
									(errcode(ERRCODE_QUERY_CANCELED),
									 errmsg("COPY from stdin failed: %s",
											pq_getmsgstring(cstate->fe_msgbuf))));
							break;
						case 'H':	/* Flush */
						case 'S':	/* Sync */

							/*
							 * Ignore Flush/Sync for the convenience of client
							 * libraries (such as libpq) that may send those
							 * without noticing that the command they just
							 * sent was COPY.
							 */
							goto readmessage;
						default:
							ereport(ERROR,
									(errcode(ERRCODE_PROTOCOL_VIOLATION),
									 errmsg("unexpected message type 0x%02X during COPY from stdin",
											mtype)));
							break;
					}
				}
				avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
				if (avail > datasize)
					avail = datasize;
				pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
				databuf = (void *) ((char *) databuf + avail);
				bytesread += avail; /* update the count of bytes that were
									 * read so far */
				datasize -= avail;
			}
			break;
		case COPY_CALLBACK:
			/*
			 * NOTE(review): 'datasize' is passed as both the 2nd and 3rd
			 * argument — presumably a (min, max) request pair; confirm
			 * against the data_source_cb contract.
			 */
			bytesread = cstate->data_source_cb(databuf, datasize, datasize,
											   cstate->data_source_cb_extra);
			break;
	}

	return bytesread;
}


/*
 * These functions do apply some data conversion
 */

/*
 * CopySendInt32 sends an int32 in network byte order
 */
static void
CopySendInt32(CopyState cstate, int32 val)
{
	uint32		buf;

	buf = pg_hton32((uint32) val);
	CopySendData(cstate, &buf, sizeof(buf));
}

/*
 * CopyGetInt32 reads an int32 that appears in network byte order
 *
 * Returns true if OK, false if EOF
 */
static bool
CopyGetInt32(CopyState cstate, int32 *val)
{
	uint32		buf;

	if (CopyGetData(cstate, &buf, sizeof(buf)) != sizeof(buf))
	{
		*val = 0;				/* suppress compiler warning */
		return false;
	}
	*val = (int32) pg_ntoh32(buf);
	return true;
}

/*
 * CopySendInt16 sends an int16 in network byte order
 */
static void
CopySendInt16(CopyState cstate, int16 val)
{
	uint16		buf;

	buf = pg_hton16((uint16) val);
	CopySendData(cstate, &buf, sizeof(buf));
}

/*
 * CopyGetInt16 reads an int16 that appears in network byte order
 */
static bool
CopyGetInt16(CopyState cstate, int16 *val)
{
	uint16		buf;

	if (CopyGetData(cstate, &buf, sizeof(buf)) != sizeof(buf))
	{
		*val = 0;				/* suppress compiler warning */
		return false;
	}
	*val = (int16) pg_ntoh16(buf);
	return true;
}

/*
 * CopyLoadRawBuf loads
 * some more data into raw_buf
 *
 * Returns true if able to obtain at least one more byte, else false.
 *
 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
 * down to the start of the buffer and then we load more data after that.
 * This case is used only when a frontend multibyte character crosses a
 * bufferload boundary.
 */
static bool
CopyLoadRawBuf(CopyState cstate)
{
	int			nbytes;
	int			inbytes;

	if (cstate->raw_buf_index < cstate->raw_buf_len)
	{
		/* Copy down the unprocessed data */
		nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
		memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
				nbytes);
	}
	else
		nbytes = 0;				/* no data need be saved */

	inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
						  RAW_BUF_SIZE - nbytes);
	nbytes += inbytes;
	/* keep the buffer NUL-terminated so string scans can't run off the end */
	cstate->raw_buf[nbytes] = '\0';
	cstate->raw_buf_index = 0;
	cstate->raw_buf_len = nbytes;
	return (inbytes > 0);
}

/*
 * DoCopy executes the SQL COPY statement
 *
 * Either unload or reload contents of table <relation>, depending on
 * <is_from>. (<is_from> = true means we are inserting into the table.)
 * In the "TO" case we also support copying the output of an arbitrary
 * SELECT, INSERT, UPDATE or DELETE query.
 *
 * If <pipe> is false, transfer is between the table and the file named
 * <filename>. Otherwise, transfer is between the table and our regular
 * input/output stream. The latter could be either stdin/stdout or a
 * socket, depending on whether we're running under Postmaster control.
 *
 * Do not allow a Postgres user without the 'pg_read_server_files' or
 * 'pg_write_server_files' role to read from or write to a file.
 *
 * Do not allow the copy if user doesn't have proper permission to access
 * the table or the specifically requested columns.
*/ void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed) { CopyState cstate; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL || Gp_role == GP_ROLE_EXECUTE); Relation rel; Oid relid; RawStmt *query = NULL; Node *whereClause = NULL; List *attnamelist = stmt->attlist; List *options; glob_cstate = NULL; glob_copystmt = (CopyStmt *) stmt; options = stmt->options; if (stmt->sreh && !is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY single row error handling only available using COPY FROM"))); /* GPDB_91_MERGE_FIXME: this should probably be done earlier, e.g. in parser */ /* Transfer any SREH options to the options list, so that BeginCopy can see them. */ if (stmt->sreh) { SingleRowErrorDesc *sreh = (SingleRowErrorDesc *) stmt->sreh; options = list_copy(options); options = lappend(options, makeDefElem("sreh", (Node *) sreh, -1)); } /* * Disallow COPY to/from file or program except to users with the * appropriate role. */ if (!pipe) { if (stmt->is_program) { if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); } else { if (is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"), errhint("Anyone can COPY to stdout or from stdin. 
" "psql's \\copy command also works for anyone."))); if (!is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); } } if (stmt->relation) { LOCKMODE lockmode = is_from ? RowExclusiveLock : AccessShareLock; RangeTblEntry *rte; TupleDesc tupDesc; List *attnums; ListCell *cur; Assert(!stmt->query); /* Open and lock the relation, using the appropriate lock type. */ rel = table_openrv(stmt->relation, lockmode); if (is_from && !allowSystemTableMods && IsUnderPostmaster && IsSystemRelation(rel)) { ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied: \"%s\" is a system catalog", RelationGetRelationName(rel)), errhint("Make sure the configuration parameter allow_system_table_mods is set."))); } relid = RelationGetRelid(rel); rte = addRangeTableEntryForRelation(pstate, rel, lockmode, NULL, false, false); rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT); if (stmt->whereClause) { /* add rte to column namespace */ addRTEtoQuery(pstate, rte, false, true, true); /* Transform the raw expression tree */ whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE); /* Make sure it yields a boolean result. 
*/ whereClause = coerce_to_boolean(pstate, whereClause, "WHERE"); /* we have to fix its collations too */ assign_expr_collations(pstate, whereClause); whereClause = eval_const_expressions(NULL, whereClause); whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false); whereClause = (Node *) make_ands_implicit((Expr *) whereClause); } tupDesc = RelationGetDescr(rel); attnums = CopyGetAttnums(tupDesc, rel, attnamelist); foreach (cur, attnums) { int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber; if (is_from) rte->insertedCols = bms_add_member(rte->insertedCols, attno); else rte->selectedCols = bms_add_member(rte->selectedCols, attno); } ExecCheckRTPerms(pstate->p_rtable, true); /* * Permission check for row security policies. * * check_enable_rls will ereport(ERROR) if the user has requested * something invalid and will otherwise indicate if we should enable * RLS (returns RLS_ENABLED) or not for this COPY statement. * * If the relation has a row security policy and we are to apply it * then perform a "query" copy and allow the normal query processing * to handle the policies. * * If RLS is not enabled for this, then just fall through to the * normal non-filtering relation handling. * * GPDB: Also do this for partitioned tables. In PostgreSQL, you get * an error: * * ERROR: cannot copy from partitioned table "foo" * HINT: Try the COPY (SELECT ...) TO variant. * * In GPDB 6 and before, support for COPYing partitioned table was * implemented deenop in the COPY processing code. In GPDB 7, * partitiong was replaced with upstream impementation, but for * backwards-compatibility, we do the translation to "COPY (SELECT * ...)" variant automatically, just like PostgreSQL does for RLS. 
*/ if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED || (!is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)) { SelectStmt *select; ColumnRef *cr; ResTarget *target; RangeVar *from; List *targetList = NIL; if (is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY FROM not supported with row-level security"), errhint("Use INSERT statements instead."))); /* * Build target list * * If no columns are specified in the attribute list of the COPY * command, then the target list is 'all' columns. Therefore, '*' * should be used as the target list for the resulting SELECT * statement. * * In the case that columns are specified in the attribute list, * create a ColumnRef and ResTarget for each column and add them * to the target list for the resulting SELECT statement. */ if (!stmt->attlist) { cr = makeNode(ColumnRef); cr->fields = list_make1(makeNode(A_Star)); cr->location = -1; target = makeNode(ResTarget); target->name = NULL; target->indirection = NIL; target->val = (Node *) cr; target->location = -1; targetList = list_make1(target); } else { ListCell *lc; foreach(lc, stmt->attlist) { /* * Build the ColumnRef for each column. The ColumnRef * 'fields' property is a String 'Value' node (see * nodes/value.h) that corresponds to the column name * respectively. */ cr = makeNode(ColumnRef); cr->fields = list_make1(lfirst(lc)); cr->location = -1; /* Build the ResTarget and add the ColumnRef to it. */ target = makeNode(ResTarget); target->name = NULL; target->indirection = NIL; target->val = (Node *) cr; target->location = -1; /* Add each column to the SELECT statement's target list */ targetList = lappend(targetList, target); } } /* * Build RangeVar for from clause, fully qualified based on the * relation which we have opened and locked. 
*/ from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), pstrdup(RelationGetRelationName(rel)), -1); /* Build query */ select = makeNode(SelectStmt); select->targetList = targetList; select->fromClause = list_make1(from); query = makeNode(RawStmt); query->stmt = (Node *) select; query->stmt_location = stmt_location; query->stmt_len = stmt_len; /* * Close the relation for now, but keep the lock on it to prevent * changes between now and when we start the query-based COPY. * * We'll reopen it later as part of the query-based COPY. */ table_close(rel, NoLock); rel = NULL; } } else { Assert(stmt->query); query = makeNode(RawStmt); query->stmt = stmt->query; query->stmt_location = stmt_location; query->stmt_len = stmt_len; relid = InvalidOid; rel = NULL; } if (is_from) { Assert(rel); if (stmt->sreh && Gp_role != GP_ROLE_EXECUTE && !rel->rd_cdbpolicy) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY single row error handling only available for distributed user tables"))); /* * GPDB_91_MERGE_FIXME: is it possible to get to this point in the code * with a temporary relation that belongs to another session? If so, the * following code doesn't function as expected. */ /* check read-only transaction and parallel mode */ if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); PreventCommandIfParallelMode("COPY FROM"); cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, NULL, stmt->attlist, options); cstate->whereClause = whereClause; /* * Error handling setup */ if (cstate->sreh) { /* Single row error handling requested */ SingleRowErrorDesc *sreh = cstate->sreh; char log_to_file = LOG_ERRORS_DISABLE; if (IS_LOG_TO_FILE(sreh->log_error_type)) { cstate->errMode = SREH_LOG; /* LOG ERRORS PERSISTENTLY for COPY is not allowed for now. 
*/ log_to_file = LOG_ERRORS_ENABLE; } else { cstate->errMode = SREH_IGNORE; } cstate->cdbsreh = makeCdbSreh(sreh->rejectlimit, sreh->is_limit_in_rows, cstate->filename, stmt->relation->relname, log_to_file); if (rel) cstate->cdbsreh->relid = RelationGetRelid(rel); } else { /* No single row error handling requested. Use "all or nothing" */ cstate->cdbsreh = NULL; /* default - no SREH */ cstate->errMode = ALL_OR_NOTHING; /* default */ } PG_TRY(); { if (Gp_role == GP_ROLE_DISPATCH && cstate->on_segment) *processed = CopyDispatchOnSegment(cstate, stmt); else *processed = CopyFrom(cstate); /* copy from file to database */ } PG_CATCH(); { if (cstate->cdbCopy) { MemoryContext oldcontext = MemoryContextSwitchTo(cstate->copycontext); cdbCopyAbort(cstate->cdbCopy); cstate->cdbCopy = NULL; MemoryContextSwitchTo(oldcontext); } PG_RE_THROW(); } PG_END_TRY(); EndCopyFrom(cstate); } else { /* * GPDB_91_MERGE_FIXME: ExecutorStart() is called in BeginCopyTo, * but the TRY-CATCH block only starts here. If an error is * thrown in-between, we would fail to call mppExecutorCleanup. We * really should be using a ResourceOwner or something else for * cleanup, instead of TRY-CATCH blocks... * * Update: I tried to fix this using the glob_cstate hack. It's ugly, * but fixes at least some cases that came up in regression tests. 
*/ PG_TRY(); { cstate = BeginCopyTo(pstate, rel, query, relid, stmt->filename, stmt->is_program, stmt->attlist, options); /* * "copy t to file on segment" CopyDispatchOnSegment * "copy (select * from t) to file on segment" CopyToQueryOnSegment * "copy t/(select * from t) to file" DoCopyTo */ if (Gp_role == GP_ROLE_DISPATCH && cstate->on_segment) { if (cstate->rel) *processed = CopyDispatchOnSegment(cstate, stmt); else *processed = CopyToQueryOnSegment(cstate); } else *processed = DoCopyTo(cstate); /* copy from database to file */ } PG_CATCH(); { if (glob_cstate && glob_cstate->queryDesc) { /* should shutdown the mpp stuff such as interconnect and dispatch thread */ mppExecutorCleanup(glob_cstate->queryDesc); } PG_RE_THROW(); } PG_END_TRY(); EndCopyTo(cstate, processed); } /* * Close the relation. If reading, we can release the AccessShareLock we * got; if writing, we should hold the lock until end of transaction to * ensure that updates will be committed before lock is released. */ if (rel != NULL) table_close(rel, (is_from ? NoLock : AccessShareLock)); /* Issue automatic ANALYZE if conditions are satisfied (MPP-4082). */ if (Gp_role == GP_ROLE_DISPATCH && is_from) auto_stats(AUTOSTATS_CMDTYPE_COPY, relid, *processed, false /* inFunction */); } /* * Process the statement option list for COPY. * * Scan the options list (a list of DefElem) and transpose the information * into cstate, applying appropriate error checking. * * cstate is assumed to be filled with zeroes initially. * * This is exported so that external users of the COPY API can sanity-check * a list of options. In that usage, cstate should be passed as NULL * (since external users don't know sizeof(CopyStateData)) and the collected * data is just leaked until CurrentMemoryContext is reset. * * Note that additional checking, such as whether column names listed in FORCE * QUOTE actually exist, has to be applied later. This just checks for * self-consistency of the options list. 
*/
void
ProcessCopyOptions(ParseState *pstate,
                   CopyState cstate,
                   bool is_from,
                   List *options)
{
    bool        format_specified = false;
    ListCell   *option;

    /* Support external use for option sanity checking */
    if (cstate == NULL)
        cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));

    /* GPDB-specific fields: establish defaults before scanning options */
    cstate->escape_off = false;
    cstate->skip_ext_partition = false;
    cstate->is_copy_from = is_from;

    cstate->delim_off = false;
    cstate->file_encoding = -1; /* -1 = "not specified"; resolved by caller */

    /* Extract options from the statement node tree */
    foreach(option, options)
    {
        DefElem    *defel = lfirst_node(DefElem, option);

        if (strcmp(defel->defname, "format") == 0)
        {
            char       *fmt = defGetString(defel);

            if (format_specified)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            format_specified = true;
            if (strcmp(fmt, "text") == 0)
                 /* default format */ ;
            else if (strcmp(fmt, "csv") == 0)
                cstate->csv_mode = true;
            else if (strcmp(fmt, "binary") == 0)
                cstate->binary = true;
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("COPY format \"%s\" not recognized", fmt),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "freeze") == 0)
        {
            if (cstate->freeze)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->freeze = defGetBoolean(defel);
        }
        else if (strcmp(defel->defname, "delimiter") == 0)
        {
            if (cstate->delim)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->delim = defGetString(defel);
            /* GPDB extension: DELIMITER 'off' disables column delimiting */
            if (cstate->delim && pg_strcasecmp(cstate->delim, "off") == 0)
                cstate->delim_off = true;
        }
        else if (strcmp(defel->defname, "null") == 0)
        {
            if (cstate->null_print)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->null_print = defGetString(defel);

            /*
             * MPP-2010: unfortunately serialization function doesn't
             * distinguish between 0x0 and empty string. Therefore we
             * must assume that if NULL AS was indicated and has no value
             * the actual value is an empty string.
             */
            if (!cstate->null_print)
                cstate->null_print = "";
        }
        else if (strcmp(defel->defname, "header") == 0)
        {
            if (cstate->header_line)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->header_line = defGetBoolean(defel);
        }
        else if (strcmp(defel->defname, "quote") == 0)
        {
            if (cstate->quote)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->quote = defGetString(defel);
        }
        else if (strcmp(defel->defname, "escape") == 0)
        {
            if (cstate->escape)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->escape = defGetString(defel);
        }
        else if (strcmp(defel->defname, "force_quote") == 0)
        {
            if (cstate->force_quote || cstate->force_quote_all)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            /* Accept A_Star ('*'), a List of names, or a joined string */
            if (defel->arg && IsA(defel->arg, A_Star))
                cstate->force_quote_all = true;
            else if (defel->arg && IsA(defel->arg, List))
                cstate->force_quote = castNode(List, defel->arg);
            else if (defel->arg && IsA(defel->arg, String))
            {
                if (strcmp(strVal(defel->arg), "*") == 0)
                    cstate->force_quote_all = true;
                else
                {
                    /* OPTIONS (force_quote 'c1,c2') */
                    cstate->force_quote = parse_joined_option_list(strVal(defel->arg), ",");
                }
            }
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a list of column names",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "force_not_null") == 0)
        {
            if (cstate->force_notnull)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            if (defel->arg && IsA(defel->arg, List))
                cstate->force_notnull = castNode(List, defel->arg);
            else if (defel->arg && IsA(defel->arg, String))
            {
                /* OPTIONS (force_not_null 'c1,c2') */
                cstate->force_notnull = parse_joined_option_list(strVal(defel->arg), ",");
            }
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a list of column names",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "force_null") == 0)
        {
            if (cstate->force_null)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            if (defel->arg && IsA(defel->arg, List))
                cstate->force_null = castNode(List, defel->arg);
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a list of column names",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "convert_selectively") == 0)
        {
            /*
             * Undocumented, not-accessible-from-SQL option: convert only the
             * named columns to binary form, storing the rest as NULLs. It's
             * allowed for the column list to be NIL.
             */
            if (cstate->convert_selectively)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->convert_selectively = true;
            if (defel->arg == NULL || IsA(defel->arg, List))
                cstate->convert_select = castNode(List, defel->arg);
            else
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a list of column names",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "encoding") == 0)
        {
            if (cstate->file_encoding >= 0)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options"),
                         parser_errposition(pstate, defel->location)));
            cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
            if (cstate->file_encoding < 0)
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a valid encoding name",
                                defel->defname),
                         parser_errposition(pstate, defel->location)));
        }
        else if (strcmp(defel->defname, "fill_missing_fields") == 0)
        {
            /* GPDB extension: pad missing trailing fields with NULLs */
            if (cstate->fill_missing)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            cstate->fill_missing = defGetBoolean(defel);
        }
        else if (strcmp(defel->defname, "newline") == 0)
        {
            /* GPDB extension: explicit newline style; validated below */
            if (cstate->eol_str)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            cstate->eol_str = strVal(defel->arg);
        }
        else if (strcmp(defel->defname, "sreh") == 0)
        {
            /* GPDB extension: single row error handling descriptor */
            if (defel->arg == NULL || !IsA(defel->arg, SingleRowErrorDesc))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("argument to option \"%s\" must be a list of column names",
                                defel->defname)));
            if (cstate->sreh)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            cstate->sreh = (SingleRowErrorDesc *) defel->arg;
        }
        else if (strcmp(defel->defname, "on_segment") == 0)
        {
            /* GPDB extension: COPY ... ON SEGMENT */
            if (cstate->on_segment)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            cstate->on_segment = true;
        }
        else if (strcmp(defel->defname, "skip_ext_partition") == 0)
        {
            if (cstate->skip_ext_partition)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            cstate->skip_ext_partition = true;
        }
        /*
         * Unknown options are tolerated for external tables (they may be
         * consumed by the external protocol), rejected otherwise.
         *
         * NOTE(review): cstate->rel is dereferenced here; when this function
         * is called for external option sanity checking (cstate passed as
         * NULL above) rel is not set — confirm callers never reach this
         * branch in that mode.
         */
        else if (!rel_is_external_table(cstate->rel->rd_id))
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
                     errmsg("option \"%s\" not recognized",
                            defel->defname),
                     parser_errposition(pstate, defel->location)));
    }

    /*
     * Check for incompatible options (must do these two before inserting
     * defaults)
     */
    if (cstate->binary && cstate->delim)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 errmsg("COPY cannot specify DELIMITER in BINARY mode")));

    if (cstate->binary && cstate->null_print)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 errmsg("COPY cannot specify NULL in BINARY mode")));

    cstate->eol_type = EOL_UNKNOWN;

    /* Set defaults for omitted options */
    if (!cstate->delim)
        cstate->delim = cstate->csv_mode ? "," : "\t";

    if (!cstate->null_print)
        cstate->null_print = cstate->csv_mode ? "" : "\\N";
    cstate->null_print_len = strlen(cstate->null_print);

    if (cstate->csv_mode)
    {
        if (!cstate->quote)
            cstate->quote = "\"";
        if (!cstate->escape)
            cstate->escape = cstate->quote;
    }

    if (!cstate->csv_mode && !cstate->escape)
        cstate->escape = "\\";  /* default escape for text mode */

    /* Only single-byte delimiter strings are supported. */
    /* GPDB: This is checked later */
#if 0
    if (strlen(cstate->delim) != 1)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY delimiter must be a single one-byte character")));
#endif

    /* Disallow end-of-line characters */
    if (strchr(cstate->delim, '\r') != NULL ||
        strchr(cstate->delim, '\n') != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY delimiter cannot be newline or carriage return")));

    if (strchr(cstate->null_print, '\r') != NULL ||
        strchr(cstate->null_print, '\n') != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY null representation cannot use newline or carriage return")));

    /*
     * Disallow unsafe delimiter characters in non-CSV mode.  We can't allow
     * backslash because it would be ambiguous.  We can't allow the other
     * cases because data characters matching the delimiter must be
     * backslashed, and certain backslash combinations are interpreted
     * non-literally by COPY IN.  Disallowing all lower case ASCII letters is
     * more than strictly necessary, but seems best for consistency and
     * future-proofing.  Likewise we disallow all digits though only octal
     * digits are actually dangerous.
     */
    if (!cstate->csv_mode && !cstate->delim_off &&
        strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
               cstate->delim[0]) != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));

    /* Check header */
    /*
     * In PostgreSQL, HEADER is not allowed in text mode either, but in GPDB,
     * only forbid it with BINARY.
     */
    if (cstate->binary && cstate->header_line)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 errmsg("COPY cannot specify HEADER in BINARY mode")));

    /* Check quote */
    if (!cstate->csv_mode && cstate->quote != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY quote available only in CSV mode")));

    if (cstate->csv_mode && strlen(cstate->quote) != 1)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY quote must be a single one-byte character")));

    if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0] && !cstate->delim_off)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY delimiter and quote must be different")));

    /* Check escape */
    if (cstate->csv_mode && cstate->escape != NULL && strlen(cstate->escape) != 1)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY escape in CSV format must be a single character")));

    if (!cstate->csv_mode && cstate->escape != NULL &&
        (strchr(cstate->escape, '\r') != NULL ||
         strchr(cstate->escape, '\n') != NULL))
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY escape representation in text format cannot use newline or carriage return")));

    /* GPDB extension: in text mode a multi-char escape is allowed iff it is "off" */
    if (!cstate->csv_mode && cstate->escape != NULL && strlen(cstate->escape) != 1)
    {
        if (pg_strcasecmp(cstate->escape, "off") != 0)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY escape must be a single character, or [OFF/off] to disable escapes")));
    }

    /* Check force_quote */
    if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force quote available only in CSV mode")));
    if ((cstate->force_quote || cstate->force_quote_all) && is_from)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force quote only available using COPY TO")));

    /* Check force_notnull */
    if (!cstate->csv_mode && cstate->force_notnull != NIL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force not null available only in CSV mode")));
    if (cstate->force_notnull != NIL && !is_from)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force not null only available using COPY FROM")));

    /* Check force_null */
    if (!cstate->csv_mode && cstate->force_null != NIL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force null available only in CSV mode")));

    if (cstate->force_null != NIL && !is_from)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY force null only available using COPY FROM")));

    /* Don't allow the delimiter to appear in the null string. */
    if (strchr(cstate->null_print, cstate->delim[0]) != NULL && !cstate->delim_off)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("COPY delimiter must not appear in the NULL specification")));

    /* Don't allow the CSV quote char to appear in the null string. */
    if (cstate->csv_mode &&
        strchr(cstate->null_print, cstate->quote[0]) != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("CSV quote character must not appear in the NULL specification")));

    /*
     * DELIMITER
     *
     * Only single-byte delimiter strings are supported. In addition, if the
     * server encoding is a multibyte character encoding we only allow the
     * delimiter to be an ASCII character (like postgresql. For more info
     * on this see discussion and comments in MPP-3756).
     */
    if (pg_database_encoding_max_length() == 1)
    {
        /* single byte encoding such as ascii, latinx and other */
        if (strlen(cstate->delim) != 1 && !cstate->delim_off)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY delimiter must be a single one-byte character, or \'off\'")));
    }
    else
    {
        /* multi byte encoding such as utf8 */
        if ((strlen(cstate->delim) != 1 || IS_HIGHBIT_SET(cstate->delim[0])) && !cstate->delim_off)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY delimiter must be a single one-byte character, or \'off\'")));
    }

    if (!cstate->csv_mode && strchr(cstate->delim, '\\') != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("COPY delimiter cannot be backslash")));

    if (cstate->fill_missing && !is_from)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("fill missing fields only available for data loading, not unloading")));

    /*
     * NEWLINE
     */
    if (cstate->eol_str)
    {
        if (!is_from)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_GP_FEATURE_NOT_YET),
                     errmsg("newline currently available for data loading only, not unloading")));
        }
        else
        {
            if (pg_strcasecmp(cstate->eol_str, "lf") == 0)
                cstate->eol_type = EOL_NL;
            else if (pg_strcasecmp(cstate->eol_str, "cr") == 0)
                cstate->eol_type = EOL_CR;
            else if (pg_strcasecmp(cstate->eol_str, "crlf") == 0)
                cstate->eol_type = EOL_CRNL;
            else
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("invalid value for NEWLINE \"%s\"",
                                cstate->eol_str),
                         errhint("Valid options are: 'LF', 'CRLF' and 'CR'.")));
        }
    }

    /* ESCAPE 'off' requested (text mode); record it as a flag */
    if (cstate->escape != NULL && pg_strcasecmp(cstate->escape, "off") == 0)
    {
        cstate->escape_off = true;
    }
}

/*
 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
 *
 * Iff <binary>, unload or reload in the binary format, as opposed to the
 * more wasteful but more robust and portable text format.
 *
 * Iff <oids>, unload or reload the format that includes OID information.
 * On input, we accept OIDs whether or not the table has an OID column,
 * but silently drop them if it does not.  On output, we report an error
 * if the user asks for OIDs in a table that has none (not providing an
 * OID column might seem friendlier, but could seriously confuse programs).
 *
 * If in the text format, delimit columns with delimiter <delim> and print
 * NULL values as <null_print>.
 */
CopyState
BeginCopy(ParseState *pstate,
          bool is_from,
          Relation rel,
          RawStmt *raw_query,
          Oid queryRelId,
          List *attnamelist,
          List *options,
          TupleDesc tupDesc)
{
    CopyState   cstate;
    int         num_phys_attrs;
    MemoryContext oldcontext;

    /* Allocate workspace and zero all fields */
    cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));

    /* GPDB: remember the current cstate for TRY/CATCH cleanup in DoCopy */
    glob_cstate = cstate;

    /*
     * We allocate everything used by a cstate in a new memory context. This
     * avoids memory leaks during repeated use of COPY in a query.
     */
    cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
                                                "COPY",
                                                ALLOCSET_DEFAULT_SIZES);

    oldcontext = MemoryContextSwitchTo(cstate->copycontext);

    /* Greenplum needs this to detect custom protocol */
    if (rel)
        cstate->rel = rel;

    /* Extract options from the statement node tree */
    ProcessCopyOptions(pstate, cstate, is_from, options);

    /*
     * NOTE(review): rel is NULL in the COPY (query) case, so this would
     * dereference NULL if delim_off were set there — confirm the grammar
     * prevents DELIMITER 'off' for the query form.
     */
    if (cstate->delim_off && !rel_is_external_table(rel->rd_id))
    {
        /*
         * We don't support delimiter 'off' for COPY because the QD COPY
         * sometimes internally adds columns to the data that it sends to
         * the QE COPY modules, and it uses the delimiter for it. There
         * are ways to work around this but for now it's not important and
         * we simply don't support it.
         */
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("using no delimiter is only supported for external tables")));
    }

    /* Process the source/target relation or query */
    if (rel)
    {
        Assert(!raw_query);

        tupDesc = RelationGetDescr(cstate->rel);
    }
    else if (raw_query)
    {
        List       *rewritten;
        Query      *query;
        PlannedStmt *plan;
        DestReceiver *dest;

        Assert(!is_from);
        cstate->rel = NULL;

        /*
         * Run parse analysis and rewrite.  Note this also acquires sufficient
         * locks on the source table(s).
         *
         * Because the parser and planner tend to scribble on their input, we
         * make a preliminary copy of the source querytree.  This prevents
         * problems in the case that the COPY is in a portal or plpgsql
         * function and is executed repeatedly.  (See also the same hack in
         * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
         */
        rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
                                           pstate->p_sourcetext, NULL, 0,
                                           NULL);

        /* check that we got back something we can work with */
        if (rewritten == NIL)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
        }
        else if (list_length(rewritten) > 1)
        {
            ListCell   *lc;

            /* examine queries to determine which error message to issue */
            foreach(lc, rewritten)
            {
                Query      *q = lfirst_node(Query, lc);

                if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
                    ereport(ERROR,
                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                             errmsg("conditional DO INSTEAD rules are not supported for COPY")));
                if (q->querySource == QSRC_NON_INSTEAD_RULE)
                    ereport(ERROR,
                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                             errmsg("DO ALSO rules are not supported for the COPY")));
            }

            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
        }

        query = linitial_node(Query, rewritten);

        /* GPDB: mark the query as the child of a COPY ... ON SEGMENT */
        if (cstate->on_segment && IsA(query, Query))
        {
            query->parentStmtType = PARENTSTMTTYPE_COPY;
        }

        /* Query mustn't use INTO, either */
        if (query->utilityStmt != NULL &&
            IsA(query->utilityStmt, CreateTableAsStmt))
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY (SELECT INTO) is not supported")));
        Assert(query->utilityStmt == NULL);

        /*
         * Similarly the grammar doesn't enforce the presence of a RETURNING
         * clause, but this is required here.
         */
        if (query->commandType != CMD_SELECT &&
            query->returningList == NIL)
        {
            Assert(query->commandType == CMD_INSERT ||
                   query->commandType == CMD_UPDATE ||
                   query->commandType == CMD_DELETE);

            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("COPY query must have a RETURNING clause")));
        }

        /* plan the query */
        int         cursorOptions = CURSOR_OPT_PARALLEL_OK;

        /* GPDB: Pass the IGNORE EXTERNAL PARTITION option to the planner. */
        if (cstate->skip_ext_partition)
            cursorOptions |= CURSOR_OPT_SKIP_FOREIGN_PARTITIONS;

        plan = pg_plan_query(query, cursorOptions, NULL);

        /*
         * With row level security and a user using "COPY relation TO", we
         * have to convert the "COPY relation TO" to a query-based COPY (eg:
         * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
         * in any RLS clauses.
         *
         * When this happens, we are passed in the relid of the originally
         * found relation (which we have locked).  As the planner will look up
         * the relation again, we double-check here to make sure it found the
         * same one that we have locked.
         */
        if (queryRelId != InvalidOid)
        {
            /*
             * Note that with RLS involved there may be multiple relations,
             * and while the one we need is almost certainly first, we don't
             * make any guarantees of that in the planner, so check the whole
             * list and make sure we find the original relation.
             */
            if (!list_member_oid(plan->relationOids, queryRelId))
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                         errmsg("relation referenced by COPY statement has changed")));
        }

        /*
         * Use a snapshot with an updated command ID to ensure this query sees
         * results of any previously executed queries.
         */
        PushCopiedSnapshot(GetActiveSnapshot());
        UpdateActiveSnapshotCommandId();

        /* Create dest receiver for COPY OUT */
        dest = CreateDestReceiver(DestCopyOut);
        ((DR_copy *) dest)->cstate = cstate;

        /* Create a QueryDesc requesting no output */
        cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
                                            GetActiveSnapshot(),
                                            InvalidSnapshot,
                                            dest, NULL, NULL,
                                            GP_INSTRUMENT_OPTS);

        /* GPDB: tell the QEs where/how to write their output files */
        if (cstate->on_segment)
            cstate->queryDesc->plannedstmt->copyIntoClause =
                MakeCopyIntoClause(glob_copystmt);

        /* GPDB hook for collecting query info */
        if (query_info_collect_hook)
            (*query_info_collect_hook)(METRICS_QUERY_SUBMIT, cstate->queryDesc);

        /*
         * Call ExecutorStart to prepare the plan for execution.
         *
         * ExecutorStart computes a result tupdesc for us
         */
        ExecutorStart(cstate->queryDesc, 0);

        tupDesc = cstate->queryDesc->tupDesc;
    }

    cstate->attnamelist = attnamelist;
    /* Generate or convert list of attributes to process */
    cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);

    num_phys_attrs = tupDesc->natts;

    /* Convert FORCE_QUOTE name list to per-column flags, check validity */
    cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    if (cstate->force_quote_all)
    {
        int         i;

        for (i = 0; i < num_phys_attrs; i++)
            cstate->force_quote_flags[i] = true;
    }
    else if (cstate->force_quote)
    {
        List       *attnums;
        ListCell   *cur;

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);

        foreach(cur, attnums)
        {
            int         attnum = lfirst_int(cur);
            Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
                                NameStr(attr->attname))));
            cstate->force_quote_flags[attnum - 1] = true;
        }
    }

    /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    if (cstate->force_notnull)
    {
        List       *attnums;
        ListCell   *cur;

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);

        foreach(cur, attnums)
        {
            int         attnum = lfirst_int(cur);
            Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
                                NameStr(attr->attname))));
            cstate->force_notnull_flags[attnum - 1] = true;
        }
    }

    /* Convert FORCE_NULL name list to per-column flags, check validity */
    cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    if (cstate->force_null)
    {
        List       *attnums;
        ListCell   *cur;

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);

        foreach(cur, attnums)
        {
            int         attnum = lfirst_int(cur);
            Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
                                NameStr(attr->attname))));
            cstate->force_null_flags[attnum - 1] = true;
        }
    }

    /* Convert convert_selectively name list to per-column flags */
    if (cstate->convert_selectively)
    {
        List       *attnums;
        ListCell   *cur;

        cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));

        attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);

        foreach(cur, attnums)
        {
            int         attnum = lfirst_int(cur);
            Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);

            if (!list_member_int(cstate->attnumlist, attnum))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
                         errmsg_internal("selected column \"%s\" not referenced by COPY",
                                         NameStr(attr->attname))));
            cstate->convert_select_flags[attnum - 1] = true;
        }
    }

    /* Use client encoding when ENCODING option is not specified. */
    if (cstate->file_encoding < 0)
        cstate->file_encoding = pg_get_client_encoding();

    /*
     * Set up encoding conversion info.  Even if the file and server encodings
     * are the same, we must apply pg_any_to_server() to validate data in
     * multibyte encodings.
     *
     * In COPY_EXECUTE mode, the dispatcher has already done the conversion.
     *
     * NOTE(review): the comment above says COPY_EXECUTE but the test below
     * is against COPY_DISPATCH — confirm which dispatch mode is the one that
     * must skip transcoding.
     */
    if (cstate->dispatch_mode != COPY_DISPATCH)
    {
        cstate->need_transcoding =
            ((cstate->file_encoding != GetDatabaseEncoding() ||
              pg_database_encoding_max_length() > 1));
        /* See Multibyte encoding comment above */
        cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
        setEncodingConversionProc(cstate, cstate->file_encoding, !is_from);
    }
    else
    {
        cstate->need_transcoding = false;
        cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
    }

    cstate->copy_dest = COPY_FILE;  /* default */

    MemoryContextSwitchTo(oldcontext);

    return cstate;
}

/*
 * Dispatch a COPY ON SEGMENT statement to QEs.
 */
static uint64
CopyDispatchOnSegment(CopyState cstate, const CopyStmt *stmt)
{
    CopyStmt   *dispatchStmt;
    CdbPgResults pgresults = {0};
    int         i;
    uint64      processed = 0;
    uint64      rejected = 0;

    /* copy the statement so the dispatcher can't scribble on the original */
    dispatchStmt = copyObject((CopyStmt *) stmt);

    CdbDispatchUtilityStatement((Node *) dispatchStmt,
                                DF_NEED_TWO_PHASE |
                                DF_WITH_SNAPSHOT |
                                DF_CANCEL_ON_ERROR,
                                NIL,
                                &pgresults);

    /*
     * GPDB_91_MERGE_FIXME: SREH handling seems to be handled in a different
     * place for every type of copy. This should be consolidated with the
     * others.
     */
    /* Sum processed/rejected row counts over all segment results */
    for (i = 0; i < pgresults.numResults; ++i)
    {
        struct pg_result *result = pgresults.pg_results[i];

        processed += result->numCompleted;
        rejected += result->numRejected;
    }

    if (rejected)
        ReportSrehResults(NULL, rejected);

    cdbdisp_clearCdbPgResults(&pgresults);
    return processed;
}

/*
 * Modify the filename in cstate->filename, and cstate->cdbsreh if any,
 * for COPY ON SEGMENT.
 *
 * Replaces the "<SEGID>" token in the filename with this segment's ID.
*/ static void MangleCopyFileName(CopyState cstate) { char *filename = cstate->filename; StringInfoData filepath; initStringInfo(&filepath); appendStringInfoString(&filepath, filename); replaceStringInfoString(&filepath, "", DataDir); if (strstr(filename, "") == NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg(" is required for file name"))); char segid_buf[8]; snprintf(segid_buf, 8, "%d", GpIdentity.segindex); replaceStringInfoString(&filepath, "", segid_buf); cstate->filename = filepath.data; /* Rename filename if error log needed */ if (NULL != cstate->cdbsreh) { snprintf(cstate->cdbsreh->filename, sizeof(cstate->cdbsreh->filename), "%s", filepath.data); } } /* * Release resources allocated in a cstate for COPY TO/FROM. */ static void EndCopy(CopyState cstate) { if (cstate->is_program) { close_program_pipes(cstate, true); } else { if (cstate->filename != NULL && FreeFile(cstate->copy_file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not close file \"%s\": %m", cstate->filename))); } /* Clean up single row error handling related memory */ if (cstate->cdbsreh) destroyCdbSreh(cstate->cdbsreh); MemoryContextDelete(cstate->copycontext); pfree(cstate); } CopyIntoClause* MakeCopyIntoClause(CopyStmt *stmt) { CopyIntoClause *copyIntoClause; copyIntoClause = makeNode(CopyIntoClause); copyIntoClause->is_program = stmt->is_program; copyIntoClause->filename = stmt->filename; copyIntoClause->options = stmt->options; copyIntoClause->attlist = stmt->attlist; return copyIntoClause; } CopyState BeginCopyToOnSegment(QueryDesc *queryDesc) { CopyState cstate; MemoryContext oldcontext; ListCell *cur; TupleDesc tupDesc; int num_phys_attrs; FormData_pg_attribute *attr; char *filename; CopyIntoClause *copyIntoClause; Assert(Gp_role == GP_ROLE_EXECUTE); copyIntoClause = queryDesc->plannedstmt->copyIntoClause; tupDesc = queryDesc->tupDesc; cstate = BeginCopy(false, NULL, NULL, NULL, InvalidOid, copyIntoClause->attlist, copyIntoClause->options, tupDesc); 
oldcontext = MemoryContextSwitchTo(cstate->copycontext); cstate->null_print_client = cstate->null_print; /* default */ /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ cstate->fe_msgbuf = makeStringInfo(); cstate->filename = pstrdup(copyIntoClause->filename); cstate->is_program = copyIntoClause->is_program; if (cstate->on_segment) MangleCopyFileName(cstate); filename = cstate->filename; if (cstate->is_program) { cstate->program_pipes = open_program_pipes(cstate->filename, true); cstate->copy_file = fdopen(cstate->program_pipes->pipes[0], PG_BINARY_W); if (cstate->copy_file == NULL) ereport(ERROR, (errmsg("could not execute command \"%s\": %m", cstate->filename))); } else { mode_t oumask; /* Pre-existing umask value */ struct stat st; /* * Prevent write to relative path ... too easy to shoot oneself in * the foot by overwriting a database file ... */ if (!is_absolute_path(filename)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); oumask = umask(S_IWGRP | S_IWOTH); cstate->copy_file = AllocateFile(filename, PG_BINARY_W); umask(oumask); if (cstate->copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for writing: %m", filename))); // Increase buffer size to improve performance (cmcdevitt) setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes fstat(fileno(cstate->copy_file), &st); if (S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", filename))); } attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; /* Get info about the columns we need to process. 
*/ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); Oid out_func_oid; bool isvarlena; if (cstate->binary) getTypeBinaryOutputInfo(attr[attnum - 1].atttypid, &out_func_oid, &isvarlena); else getTypeOutputInfo(attr[attnum - 1].atttypid, &out_func_oid, &isvarlena); fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); } /* * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks inside * datatype output routines, and should be faster than retail pfree's * anyway. (We don't need a whole econtext as CopyFrom does.) */ cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, "COPY TO", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); if (cstate->binary) { /* Generate header for a binary copy */ int32 tmp; /* Signature */ CopySendData(cstate, BinarySignature, 11); /* Flags field */ tmp = 0; CopySendInt32(cstate, tmp); /* No header extension */ tmp = 0; CopySendInt32(cstate, tmp); } else { /* if a header has been requested send the line */ if (cstate->header_line) { bool hdr_delim = false; foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); char *colname; if (hdr_delim) CopySendChar(cstate, cstate->delim[0]); hdr_delim = true; colname = NameStr(attr[attnum - 1].attname); CopyAttributeOutCSV(cstate, colname, false, list_length(cstate->attnumlist) == 1); } CopySendEndOfRow(cstate); } } MemoryContextSwitchTo(oldcontext); return cstate; } /* * Setup CopyState to read tuples from a table or a query for COPY TO. 
*/ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, List *attnamelist, List *options) { CopyState cstate; MemoryContext oldcontext; if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION) { if (rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from view \"%s\"", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) TO variant."))); else if (rel->rd_rel->relkind == RELKIND_MATVIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from materialized view \"%s\"", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) TO variant."))); else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from foreign table \"%s\"", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) TO variant."))); else if (rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from sequence \"%s\"", RelationGetRelationName(rel)))); /* * GPDB: This is not reached in GPDB, because we transform the command * to the COPY (SELECT ...) TO variant automatically earlier already. */ else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from partitioned table \"%s\"", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) 
TO variant."))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from non-table relation \"%s\"", RelationGetRelationName(rel)))); } cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist, options, NULL); oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* Determine the mode */ if (Gp_role == GP_ROLE_DISPATCH && !cstate->on_segment && cstate->rel && cstate->rel->rd_cdbpolicy) { cstate->dispatch_mode = COPY_DISPATCH; } else cstate->dispatch_mode = COPY_DIRECT; /* * GPDB_12_MERGE_FIXME: We used to have this limitation, but now that we * handle partitioned tables by invoking the planner (we basically do * the "COPY (SELECT ...) TO" trick automatically now), we handle it * just fine. Do we still want to restrict it? If not, we can just * remove this. * * Whether we resurrect or remove this, we should add a test for it; no * existing test covered this error. */ #if 0 if (rel != NULL && rel_has_external_partition(rel->rd_id)) { if (!cstate->skip_ext_partition) { ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy from relation \"%s\" which has external partition(s)", RelationGetRelationName(rel)), errhint("Try the COPY (SELECT ...) TO variant."))); } } #endif bool pipe = (filename == NULL || (Gp_role == GP_ROLE_EXECUTE && !cstate->on_segment)); if (cstate->on_segment && Gp_role == GP_ROLE_DISPATCH) { /* in ON SEGMENT mode, we don't open anything on the dispatcher. 
*/ if (filename == NULL) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("STDOUT is not supported by 'COPY ON SEGMENT'"))); } else if (pipe) { Assert(!is_program || Gp_role == GP_ROLE_EXECUTE); /* the grammar does not allow this */ if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } else { cstate->filename = pstrdup(filename); cstate->is_program = is_program; if (cstate->on_segment) MangleCopyFileName(cstate); filename = cstate->filename; if (is_program) { cstate->program_pipes = open_program_pipes(cstate->filename, true); cstate->copy_file = fdopen(cstate->program_pipes->pipes[0], PG_BINARY_W); if (cstate->copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not execute command \"%s\": %m", cstate->filename))); } else { mode_t oumask; /* Pre-existing umask value */ struct stat st; /* * Prevent write to relative path ... too easy to shoot oneself in * the foot by overwriting a database file ... */ if (!is_absolute_path(filename)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); oumask = umask(S_IWGRP | S_IWOTH); PG_TRY(); { cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); } PG_CATCH(); { umask(oumask); PG_RE_THROW(); } PG_END_TRY(); umask(oumask); if (cstate->copy_file == NULL) { /* copy errno because ereport subfunctions might change it */ int save_errno = errno; ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for writing: %m", cstate->filename), (save_errno == ENOENT || save_errno == EACCES) ? errhint("COPY TO instructs the PostgreSQL server process to write a file. 
" "You may want a client-side facility such as psql's \\copy.") : 0)); // Increase buffer size to improve performance (cmcdevitt) setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes } if (fstat(fileno(cstate->copy_file), &st)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", cstate->filename))); if (S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", filename))); } } MemoryContextSwitchTo(oldcontext); return cstate; } /* * Set up CopyState for writing to a foreign or external table. */ CopyState BeginCopyToForeignTable(Relation forrel, List *options) { CopyState cstate; Assert(forrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE); cstate = BeginCopy(NULL, false, forrel, NULL, /* raw_query */ InvalidOid, NIL, options, RelationGetDescr(forrel)); cstate->dispatch_mode = COPY_DIRECT; /* * We use COPY_CALLBACK to mean that the each line should be * left in fe_msgbuf. There is no actual callback! */ cstate->copy_dest = COPY_CALLBACK; /* * Some more initialization, that in the normal COPY TO codepath, is done * in CopyTo() itself. */ cstate->null_print_client = cstate->null_print; /* default */ if (cstate->need_transcoding) cstate->null_print_client = pg_server_to_custom(cstate->null_print, cstate->null_print_len, cstate->file_encoding, cstate->enc_conversion_proc); return cstate; } /* * This intermediate routine exists mainly to localize the effects of setjmp * so we don't need to plaster a lot of variables with "volatile". 
 */
static uint64
DoCopyTo(CopyState cstate)
{
	bool		pipe = (cstate->filename == NULL);
	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
	uint64		processed;

#ifdef FAULT_INJECTOR
	FaultInjector_InjectFaultIfSet("DoCopyToFail", DDLNotSpecified, "", "");
#endif

	PG_TRY();
	{
		if (fe_copy)
			SendCopyBegin(cstate);

		/*
		 * We want to dispatch COPY TO commands only in the case that
		 * we are the dispatcher and we are copying from a user relation
		 * (a relation where data is distributed in the segment databases).
		 * Otherwize, if we are not the dispatcher *or* if we are
		 * doing COPY (SELECT) we just go straight to work, without
		 * dispatching COPY commands to executors.
		 */
		if (Gp_role == GP_ROLE_DISPATCH && cstate->rel && cstate->rel->rd_cdbpolicy)
			processed = CopyToDispatch(cstate);
		else
			processed = CopyTo(cstate);

		if (fe_copy)
			SendCopyEnd(cstate);
		else if (Gp_role == GP_ROLE_EXECUTE && cstate->on_segment)
		{
			/*
			 * For COPY ON SEGMENT command, switch back to front end
			 * before sending copy end which is "\."
			 */
			cstate->copy_dest = COPY_NEW_FE;
			SendCopyEnd(cstate);
		}
	}
	PG_CATCH();
	{
		/*
		 * Make sure we turn off old-style COPY OUT mode upon error. It is
		 * okay to do this in all cases, since it does nothing if the mode is
		 * not on.
		 */
		if (Gp_role == GP_ROLE_EXECUTE && cstate->on_segment)
			cstate->copy_dest = COPY_NEW_FE;
		pq_endcopyout(true);
		PG_RE_THROW();
	}
	PG_END_TRY();

	return processed;
}

/*
 * Finish a QE-side COPY ... TO ... ON SEGMENT: emit the binary trailer if
 * needed and release all COPY resources.
 */
void
EndCopyToOnSegment(CopyState cstate)
{
	Assert(Gp_role == GP_ROLE_EXECUTE);

	if (cstate->binary)
	{
		/* Generate trailer for a binary copy */
		CopySendInt16(cstate, -1);

		/* Need to flush out the trailer */
		CopySendEndOfRow(cstate);
	}

	MemoryContextDelete(cstate->rowcontext);

	EndCopy(cstate);
}

/*
 * Clean up storage and release resources for COPY TO.
 */
static void
EndCopyTo(CopyState cstate, uint64 *processed)
{
	if (cstate->queryDesc != NULL)
	{
		/* Close down the query and free resources.
		 */
		ExecutorFinish(cstate->queryDesc);
		ExecutorEnd(cstate->queryDesc);
		/* report the executor's row count back to the caller, if any */
		if (cstate->queryDesc->es_processed > 0)
			*processed = cstate->queryDesc->es_processed;
		FreeQueryDesc(cstate->queryDesc);
		PopActiveSnapshot();
	}

	/* Clean up storage */
	EndCopy(cstate);
}

/*
 * Copy FROM relation TO file, in the dispatcher. Starts a COPY TO command on
 * each of the executors and gathers all the results and writes it out.
 */
static uint64
CopyToDispatch(CopyState cstate)
{
	CopyStmt   *stmt = glob_copystmt;
	TupleDesc	tupDesc;
	int			num_phys_attrs;
	int			attr_count;
	FormData_pg_attribute *attr;
	CdbCopy    *cdbCopy;
	uint64		processed = 0;

	tupDesc = cstate->rel->rd_att;
	attr = tupDesc->attrs;
	num_phys_attrs = tupDesc->natts;
	attr_count = list_length(cstate->attnumlist);

	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
	cstate->fe_msgbuf = makeStringInfo();

	cdbCopy = makeCdbCopy(cstate, false);

	/* XXX: lock all partitions */

	/*
	 * Start a COPY command in every db of every segment in Greenplum Database.
	 *
	 * From this point in the code we need to be extra careful
	 * about error handling. ereport() must not be called until
	 * the COPY command sessions are closed on the executors.
	 * Calling ereport() will leave the executors hanging in
	 * COPY state.
	 */
	elog(DEBUG5, "COPY command sent to segdbs");

	PG_TRY();
	{
		bool		done;

		cdbCopyStart(cdbCopy, stmt, cstate->file_encoding);

		if (cstate->binary)
		{
			/* Generate header for a binary copy */
			int32		tmp;

			/* Signature */
			CopySendData(cstate, (char *) BinarySignature, 11);
			/* Flags field */
			tmp = 0;
			CopySendInt32(cstate, tmp);
			/* No header extension */
			tmp = 0;
			CopySendInt32(cstate, tmp);
		}

		/* if a header has been requested send the line */
		if (cstate->header_line)
		{
			ListCell   *cur;
			bool		hdr_delim = false;

			/*
			 * For non-binary copy, we need to convert null_print to client
			 * encoding, because it will be sent directly with CopySendString.
			 *
			 * MPP: in here we only care about this if we need to print the
			 * header. We rely on the segdb server copy out to do the conversion
			 * before sending the data rows out. We don't need to repeat it here
			 */
			if (cstate->need_transcoding)
				cstate->null_print = (char *)
					pg_server_to_custom(cstate->null_print,
										strlen(cstate->null_print),
										cstate->file_encoding,
										cstate->enc_conversion_proc);

			foreach(cur, cstate->attnumlist)
			{
				int			attnum = lfirst_int(cur);
				char	   *colname;

				if (hdr_delim)
					CopySendChar(cstate, cstate->delim[0]);
				hdr_delim = true;

				colname = NameStr(attr[attnum - 1].attname);

				CopyAttributeOutCSV(cstate, colname, false,
									list_length(cstate->attnumlist) == 1);
			}

			/* add a newline and flush the data */
			CopySendEndOfRow(cstate);
		}

		/*
		 * This is the main work-loop. In here we keep collecting data from the
		 * COPY commands on the segdbs, until no more data is available. We
		 * keep writing data out a chunk at a time.
		 */
		do
		{
			bool		copy_cancel = (QueryCancelPending ? true : false);

			/* get a chunk of data rows from the QE's */
			done = cdbCopyGetData(cdbCopy, copy_cancel, &processed);

			/* send the chunk of data rows to destination (file or stdout) */
			if (cdbCopy->copy_out_buf.len > 0) /* conditional is important! */
			{
				/*
				 * in the dispatcher we receive chunks of whole rows with row endings.
				 * We don't want to use CopySendEndOfRow() b/c it adds row endings and
				 * also b/c it's intended for a single row at a time. Therefore we need
				 * to fill in the out buffer and just flush it instead.
				 */
				CopySendData(cstate, (void *) cdbCopy->copy_out_buf.data,
							 cdbCopy->copy_out_buf.len);
				CopyToDispatchFlush(cstate);
			}
		} while (!done);

		cdbCopyEnd(cdbCopy, NULL, NULL);

		/* now it's safe to destroy the whole dispatcher state */
		CdbDispatchCopyEnd(cdbCopy);
	}
	/* catch error from CopyStart, CopySendEndOfRow or CopyToDispatchFlush */
	PG_CATCH();
	{
		MemoryContext oldcontext = MemoryContextSwitchTo(cstate->copycontext);

		cdbCopyAbort(cdbCopy);

		MemoryContextSwitchTo(oldcontext);
		PG_RE_THROW();
	}
	PG_END_TRY();

	if (cstate->binary)
	{
		/* Generate trailer for a binary copy */
		CopySendInt16(cstate, -1);
		/* Need to flush out the trailer */
		CopySendEndOfRow(cstate);
	}

	/* we can throw the error now if QueryCancelPending was set previously */
	CHECK_FOR_INTERRUPTS();

	pfree(cdbCopy);

	return processed;
}

/*
 * QE side of COPY (query) TO ... ON SEGMENT: run the plan; the dest
 * receiver writes the rows.  Row count is reported elsewhere, hence 0.
 */
static uint64
CopyToQueryOnSegment(CopyState cstate)
{
	Assert(Gp_role != GP_ROLE_EXECUTE);

	/* run the plan --- the dest receiver will send tuples */
	ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
	return 0;
}

/*
 * Copy from relation or query TO file.
 */
static uint64
CopyTo(CopyState cstate)
{
	TupleDesc	tupDesc;
	int			num_phys_attrs;
	ListCell   *cur;
	uint64		processed = 0;

	if (cstate->rel)
		tupDesc = RelationGetDescr(cstate->rel);
	else
		tupDesc = cstate->queryDesc->tupDesc;

	num_phys_attrs = tupDesc->natts;
	cstate->null_print_client = cstate->null_print; /* default */

	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
	cstate->fe_msgbuf = makeStringInfo();

	/* Get info about the columns we need to process.
	 */
	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
	foreach(cur, cstate->attnumlist)
	{
		int			attnum = lfirst_int(cur);
		Oid			out_func_oid;
		bool		isvarlena;
		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);

		if (cstate->binary)
			getTypeBinaryOutputInfo(attr->atttypid,
									&out_func_oid,
									&isvarlena);
		else
			getTypeOutputInfo(attr->atttypid,
							  &out_func_oid,
							  &isvarlena);
		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
	}

	/*
	 * Create a temporary memory context that we can reset once per row to
	 * recover palloc'd memory. This avoids any problems with leaks inside
	 * datatype output routines, and should be faster than retail pfree's
	 * anyway. (We don't need a whole econtext as CopyFrom does.)
	 */
	cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
											   "COPY TO",
											   ALLOCSET_DEFAULT_SIZES);

	if (!cstate->binary)
	{
		/*
		 * For non-binary copy, we need to convert null_print to file
		 * encoding, because it will be sent directly with CopySendString.
		 */
		if (cstate->need_transcoding)
			cstate->null_print_client = pg_server_to_custom(cstate->null_print,
															cstate->null_print_len,
															cstate->file_encoding,
															cstate->enc_conversion_proc);
	}

	if (Gp_role == GP_ROLE_EXECUTE && !cstate->on_segment)
	{
		/* header should not be printed in execute mode. */
	}
	else if (cstate->binary)
	{
		/* Generate header for a binary copy */
		int32		tmp;

		/* Signature */
		CopySendData(cstate, BinarySignature, 11);
		/* Flags field */
		tmp = 0;
		CopySendInt32(cstate, tmp);
		/* No header extension */
		tmp = 0;
		CopySendInt32(cstate, tmp);
	}
	else
	{
		/* if a header has been requested send the line */
		if (cstate->header_line)
		{
			bool		hdr_delim = false;

			foreach(cur, cstate->attnumlist)
			{
				int			attnum = lfirst_int(cur);
				char	   *colname;

				if (hdr_delim)
					CopySendChar(cstate, cstate->delim[0]);
				hdr_delim = true;

				colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);

				CopyAttributeOutCSV(cstate, colname, false,
									list_length(cstate->attnumlist) == 1);
			}

			CopySendEndOfRow(cstate);
		}
	}

	if (cstate->rel)
	{
		/* table case: sequential scan, one CopyOneRowTo per tuple */
		TupleTableSlot *slot;
		TableScanDesc scandesc;

		scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
		slot = table_slot_create(cstate->rel, NULL);

		processed = 0;
		while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
		{
			CHECK_FOR_INTERRUPTS();

			/* Deconstruct the tuple ... */
			slot_getallattrs(slot);

			/* Format and send the data */
			CopyOneRowTo(cstate, slot);
			processed++;
		}

		ExecDropSingleTupleTableSlot(slot);
		table_endscan(scandesc);
	}
	else
	{
		Assert(Gp_role != GP_ROLE_EXECUTE);

		/* run the plan --- the dest receiver will send tuples */
		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
	}

	if (Gp_role == GP_ROLE_EXECUTE && !cstate->on_segment)
	{
		/*
		 * Trailer should not be printed in execute mode. The dispatcher will
		 * write it once.
		 */
	}
	else if (cstate->binary)
	{
		/* Generate trailer for a binary copy */
		CopySendInt16(cstate, -1);
		/* Need to flush out the trailer */
		CopySendEndOfRow(cstate);
	}

	/* ON SEGMENT QEs report their row count back to the dispatcher */
	if (Gp_role == GP_ROLE_EXECUTE && cstate->on_segment)
		SendNumRows(0, processed);

	MemoryContextDelete(cstate->rowcontext);

	return processed;
}

/*
 * Append an already-formatted row (a bytea from a custom formatter) to the
 * per-row output buffer.
 */
void
CopyOneCustomRowTo(CopyState cstate, bytea *value)
{
	appendBinaryStringInfo(cstate->fe_msgbuf,
						   VARDATA_ANY((void *) value),
						   VARSIZE_ANY_EXHDR((void *) value));
}

/*
 * Emit one row during CopyTo().
 */
void
CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
{
	bool		need_delim = false;
	FmgrInfo   *out_functions = cstate->out_functions;
	MemoryContext oldcontext;
	ListCell   *cur;
	char	   *string;

	/* rowcontext is reset per row, so output functions may leak freely */
	MemoryContextReset(cstate->rowcontext);
	oldcontext = MemoryContextSwitchTo(cstate->rowcontext);

	if (cstate->binary)
	{
		/* Binary per-tuple header */
		CopySendInt16(cstate, list_length(cstate->attnumlist));
	}

	/* Make sure the tuple is fully deconstructed */
	slot_getallattrs(slot);

	foreach(cur, cstate->attnumlist)
	{
		int			attnum = lfirst_int(cur);
		Datum		value = slot->tts_values[attnum - 1];
		bool		isnull = slot->tts_isnull[attnum - 1];

		if (!cstate->binary)
		{
			if (need_delim)
				CopySendChar(cstate, cstate->delim[0]);
			need_delim = true;
		}

		if (isnull)
		{
			if (!cstate->binary)
				CopySendString(cstate, cstate->null_print_client);
			else
				CopySendInt32(cstate, -1);
		}
		else
		{
			if (!cstate->binary)
			{
				char		quotec = cstate->quote ? cstate->quote[0] : '\0';

				/* int2out or int4out ? */
				if (out_functions[attnum - 1].fn_oid == 39 ||	/* int2out or int4out */
					out_functions[attnum - 1].fn_oid == 43)
				{
					char		tmp[33];

					/*
					 * The standard postgres way is to call the output function, but that involves one or more pallocs,
					 * and a call to sprintf, followed by a conversion to client charset.
					 * Do a fast conversion to string instead.
					 */
					if (out_functions[attnum - 1].fn_oid == 39)
						pg_itoa(DatumGetInt16(value), tmp);
					else
						pg_ltoa(DatumGetInt32(value), tmp);

					/*
					 * Integers don't need quoting, or transcoding to client char
					 * set.  We still quote them if FORCE QUOTE was used, though.
					 */
					if (cstate->force_quote_flags[attnum - 1])
						CopySendChar(cstate, quotec);
					CopySendData(cstate, tmp, strlen(tmp));
					if (cstate->force_quote_flags[attnum - 1])
						CopySendChar(cstate, quotec);
				}
				else if (out_functions[attnum - 1].fn_oid == 1702)	/* numeric_out */
				{
					string = OutputFunctionCall(&out_functions[attnum - 1],
												value);
					/*
					 * Numerics don't need quoting, or transcoding to client char
					 * set. We still quote them if FORCE QUOTE was used, though.
					 */
					if (cstate->force_quote_flags[attnum - 1])
						CopySendChar(cstate, quotec);
					CopySendData(cstate, string, strlen(string));
					if (cstate->force_quote_flags[attnum - 1])
						CopySendChar(cstate, quotec);
				}
				else
				{
					/* general path: call the type's output function */
					string = OutputFunctionCall(&out_functions[attnum - 1],
												value);
					if (cstate->csv_mode)
						CopyAttributeOutCSV(cstate, string,
											cstate->force_quote_flags[attnum - 1],
											list_length(cstate->attnumlist) == 1);
					else
						CopyAttributeOutText(cstate, string);
				}
			}
			else
			{
				/* binary: length word followed by the send-function bytes */
				bytea	   *outputbytes;

				outputbytes = SendFunctionCall(&out_functions[attnum - 1],
											   value);
				CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
				CopySendData(cstate, VARDATA(outputbytes),
							 VARSIZE(outputbytes) - VARHDRSZ);
			}
		}
	}

	/*
	 * Finish off the row: write it to the destination, and update the count.
	 * However, if we're in the context of a writable external table, we let
	 * the caller do it - send the data to its local external source (see
	 * external_insert() ).
	 */
	if (cstate->copy_dest != COPY_CALLBACK)
	{
		CopySendEndOfRow(cstate);
	}

	MemoryContextSwitchTo(oldcontext);
}

/*
 * error context callback for COPY FROM
 *
 * The argument for the error context must be CopyState.
 */
void
CopyFromErrorCallback(void *arg)
{
	CopyState	cstate = (CopyState) arg;
	char		curlineno_str[32];

	snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
			 cstate->cur_lineno);

	if (cstate->binary)
	{
		/* can't usefully display the data */
		if (cstate->cur_attname)
			errcontext("COPY %s, line %s, column %s",
					   cstate->cur_relname, curlineno_str,
					   cstate->cur_attname);
		else
			errcontext("COPY %s, line %s",
					   cstate->cur_relname, curlineno_str);
	}
	else
	{
		if (cstate->cur_attname && cstate->cur_attval)
		{
			/* error is relevant to a particular column */
			char	   *attval;

			attval = limit_printout_length(cstate->cur_attval);
			errcontext("COPY %s, line %s, column %s: \"%s\"",
					   cstate->cur_relname, curlineno_str,
					   cstate->cur_attname, attval);
			pfree(attval);
		}
		else if (cstate->cur_attname)
		{
			/* error is relevant to a particular column, value is NULL */
			errcontext("COPY %s, line %s, column %s: null input",
					   cstate->cur_relname, curlineno_str,
					   cstate->cur_attname);
		}
		else
		{
			/*
			 * Error is relevant to a particular line.
			 *
			 * If line_buf still contains the correct line, and it's already
			 * transcoded, print it. If it's still in a foreign encoding, it's
			 * quite likely that the error is precisely a failure to do
			 * encoding conversion (ie, bad data). We dare not try to convert
			 * it, and at present there's no way to regurgitate it without
			 * conversion. So we have to punt and just report the line number.
			 */
			if (cstate->line_buf_valid &&
				(cstate->line_buf_converted || !cstate->need_transcoding))
			{
				char	   *lineval;

				lineval = limit_printout_length(cstate->line_buf.data);
				errcontext("COPY %s, line %s: \"%s\"",
						   cstate->cur_relname, curlineno_str, lineval);
				pfree(lineval);
			}
			else
			{
				/*
				 * Here, the line buffer is still in a foreign encoding,
				 * and indeed it's quite likely that the error is precisely
				 * a failure to do encoding conversion (ie, bad data). We
				 * dare not try to convert it, and at present there's no way
				 * to regurgitate it without conversion. So we have to punt
				 * and just report the line number.
				 */
				errcontext("COPY %s, line %s",
						   cstate->cur_relname, curlineno_str);
			}
		}
	}
}

/*
 * Make sure we don't print an unreasonable amount of COPY data in a message.
 *
 * It would seem a lot easier to just use the sprintf "precision" limit to
 * truncate the string. However, some versions of glibc have a bug/misfeature
 * that vsnprintf will always fail (return -1) if it is asked to truncate
 * a string that contains invalid byte sequences for the current encoding.
 * So, do our own truncation. We return a pstrdup'd copy of the input.
 */
char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			slen = strlen(str);
	int			len;
	char	   *res;

	/* Fast path if definitely okay */
	if (slen <= MAX_COPY_DATA_DISPLAY)
		return pstrdup(str);

	/* Apply encoding-dependent truncation */
	len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);

	/*
	 * Truncate, and add "..." to show we truncated the input.
	 */
	res = (char *) palloc(len + 4);
	memcpy(res, str, len);
	strcpy(res + len, "...");

	return res;
}

/*
 * Allocate memory and initialize a new CopyMultiInsertBuffer for this
 * ResultRelInfo.
 */
static CopyMultiInsertBuffer *
CopyMultiInsertBufferInit(ResultRelInfo *rri)
{
	CopyMultiInsertBuffer *buffer;

	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
	/* slots are created lazily; NULL marks an as-yet-unallocated slot */
	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
	buffer->resultRelInfo = rri;
	buffer->bistate = GetBulkInsertState();
	buffer->nused = 0;

	return buffer;
}

/*
 * Make a new buffer for this ResultRelInfo.
 */
static inline void
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
							   ResultRelInfo *rri)
{
	CopyMultiInsertBuffer *buffer;

	buffer = CopyMultiInsertBufferInit(rri);

	/* Setup back-link so we can easily find this buffer again */
	rri->ri_CopyMultiInsertBuffer = buffer;
	/* Record that we're tracking this buffer */
	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
}

/*
 * Initialize an already allocated CopyMultiInsertInfo.
* * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up * for that table. */ static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyState cstate, EState *estate, CommandId mycid, int ti_options) { miinfo->multiInsertBuffers = NIL; miinfo->bufferedTuples = 0; miinfo->bufferedBytes = 0; miinfo->cstate = cstate; miinfo->estate = estate; miinfo->mycid = mycid; miinfo->ti_options = ti_options; /* * Only setup the buffer when not dealing with a partitioned table. * Buffers for partitioned tables will just be setup when we need to send * tuples their way for the first time. */ if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) CopyMultiInsertInfoSetupBuffer(miinfo, rri); } /* * Returns true if the buffers are full */ static inline bool CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo) { if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES || miinfo->bufferedBytes >= MAX_BUFFERED_BYTES) return true; return false; } /* * Returns true if we have no buffered tuples */ static inline bool CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo) { return miinfo->bufferedTuples == 0; } /* * Write the tuples stored in 'buffer' out to the table. */ static inline void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer) { MemoryContext oldcontext; int i; uint64 save_cur_lineno; CopyState cstate = miinfo->cstate; EState *estate = miinfo->estate; CommandId mycid = miinfo->mycid; int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; int nused = buffer->nused; ResultRelInfo *resultRelInfo = buffer->resultRelInfo; TupleTableSlot **slots = buffer->slots; /* Set es_result_relation_info to the ResultRelInfo we're flushing. */ estate->es_result_relation_info = resultRelInfo; /* * Print error context information correctly, if one of the operations * below fail. 
*/ cstate->line_buf_valid = false; save_cur_lineno = cstate->cur_lineno; /* * table_multi_insert may leak memory, so switch to short-lived memory * context before calling it. */ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); table_multi_insert(resultRelInfo->ri_RelationDesc, slots, nused, mycid, ti_options, buffer->bistate); MemoryContextSwitchTo(oldcontext); for (i = 0; i < nused; i++) { /* * If there are any indexes, update them for all the inserted tuples, * and run AFTER ROW INSERT triggers. */ if (resultRelInfo->ri_NumIndices > 0) { List *recheckIndexes; cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL, NIL); ExecARInsertTriggers(estate, resultRelInfo, slots[i], recheckIndexes, cstate->transition_capture); list_free(recheckIndexes); } /* * There's no indexes, but see if we need to run AFTER ROW INSERT * triggers anyway. */ else if (resultRelInfo->ri_TrigDesc != NULL && (resultRelInfo->ri_TrigDesc->trig_insert_after_row || resultRelInfo->ri_TrigDesc->trig_insert_new_table)) { cstate->cur_lineno = buffer->linenos[i]; ExecARInsertTriggers(estate, resultRelInfo, slots[i], NIL, cstate->transition_capture); } ExecClearTuple(slots[i]); } /* Mark that all slots are free */ buffer->nused = 0; /* reset cur_lineno and line_buf_valid to what they were */ cstate->line_buf_valid = line_buf_valid; cstate->cur_lineno = save_cur_lineno; } /* * Drop used slots and free member for this buffer. * * The buffer must be flushed before cleanup. */ static inline void CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer) { int i; /* Ensure buffer was flushed */ Assert(buffer->nused == 0); /* Remove back-link to ourself */ buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL; FreeBulkInsertState(buffer->bistate); /* Since we only create slots on demand, just drop the non-null ones. 
*/ for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) ExecDropSingleTupleTableSlot(buffer->slots[i]); pfree(buffer); } /* * Write out all stored tuples in all buffers out to the tables. * * Once flushed we also trim the tracked buffers list down to size by removing * the buffers created earliest first. * * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being * used. When cleaning up old buffers we'll never remove the one for * 'curr_rri'. */ static inline void CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) { ListCell *lc; foreach(lc, miinfo->multiInsertBuffers) { CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc); CopyMultiInsertBufferFlush(miinfo, buffer); } miinfo->bufferedTuples = 0; miinfo->bufferedBytes = 0; /* * Trim the list of tracked buffers down if it exceeds the limit. Here we * remove buffers starting with the ones we created first. It seems more * likely that these older ones are less likely to be needed than ones * that were just created. */ while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); /* * We never want to remove the buffer that's currently being used, so * if we happen to find that then move it to the end of the list. 
*/ if (buffer->resultRelInfo == curr_rri) { miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); } CopyMultiInsertBufferCleanup(buffer); miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); } } /* * Cleanup allocated buffers and free memory */ static inline void CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo) { ListCell *lc; foreach(lc, miinfo->multiInsertBuffers) CopyMultiInsertBufferCleanup(lfirst(lc)); list_free(miinfo->multiInsertBuffers); } /* * Get the next TupleTableSlot that the next tuple should be stored in. * * Callers must ensure that the buffer is not full. */ static inline TupleTableSlot * CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri) { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused = buffer->nused; Assert(buffer != NULL); Assert(nused < MAX_BUFFERED_TUPLES); if (buffer->slots[nused] == NULL) buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); return buffer->slots[nused]; } /* * Record the previously reserved TupleTableSlot that was reserved by * CopyMultiInsertInfoNextFreeSlot as being consumed. */ static inline void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno) { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; Assert(buffer != NULL); Assert(slot == buffer->slots[buffer->nused]); /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; /* Record this slot as being used */ buffer->nused++; /* Update how many tuples are stored and their size */ miinfo->bufferedTuples++; miinfo->bufferedBytes += tuplen; } /* * Copy FROM file to relation. 
*/ uint64 CopyFrom(CopyState cstate) { ResultRelInfo *resultRelInfo; ResultRelInfo *target_resultRelInfo; ResultRelInfo *prevResultRelInfo = NULL; EState *estate = CreateExecutorState(); /* for ExecConstraints() */ ModifyTableState *mtstate; ExprContext *econtext; TupleTableSlot *singleslot = NULL; MemoryContext oldcontext = CurrentMemoryContext; PartitionTupleRouting *proute = NULL; ErrorContextCallback errcallback; CommandId mycid = GetCurrentCommandId(true); int ti_options = 0; /* start with default options for insert */ BulkInsertState bistate = NULL; CopyInsertMethod insertMethod; CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ uint64 processed = 0; bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; CdbCopy *cdbCopy = NULL; bool is_check_distkey; GpDistributionData *distData = NULL; /* distribution data used to compute target seg */ Assert(cstate->rel); /* * The target must be a plain, foreign, or partitioned relation, or have * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only * allowed on views, so we only hint about them in the view case.) 
*/ if (cstate->rel->rd_rel->relkind != RELKIND_RELATION && cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE && cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE && !(cstate->rel->trigdesc && cstate->rel->trigdesc->trig_insert_instead_row)) { if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to view \"%s\"", RelationGetRelationName(cstate->rel)), errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger."))); else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to materialized view \"%s\"", RelationGetRelationName(cstate->rel)))); else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to sequence \"%s\"", RelationGetRelationName(cstate->rel)))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to non-table relation \"%s\"", RelationGetRelationName(cstate->rel)))); } /*---------- * Check to see if we can avoid writing WAL * * If archive logging/streaming is not enabled *and* either * - table was created in same transaction as this COPY * - data is being written to relfilenode created in this transaction * then we can skip writing WAL. It's safe because if the transaction * doesn't commit, we'll discard the table (or the new relfilenode file). * If it does commit, we'll have done the table_finish_bulk_insert() at * the bottom of this routine first. * * As mentioned in comments in utils/rel.h, the in-same-transaction test * is not always set correctly, since in rare cases rd_newRelfilenodeSubid * can be cleared before the end of the transaction. The exact case is * when a relation sets a new relfilenode twice in same transaction, yet * the second one fails in an aborted subtransaction, e.g. * * BEGIN; * TRUNCATE t; * SAVEPOINT save; * TRUNCATE t; * ROLLBACK TO save; * COPY ... 
* * Also, if the target file is new-in-transaction, we assume that checking * FSM for free space is a waste of time, even if we must use WAL because * of archiving. This could possibly be wrong, but it's unlikely. * * The comments for table_tuple_insert and RelationGetBufferForTuple * specify that skipping WAL logging is only safe if we ensure that our * tuples do not go into pages containing tuples from any other * transactions --- but this must be the case if we have a new table or * new relfilenode, so we need no additional work to enforce that. * * We currently don't support this optimization if the COPY target is a * partitioned table as we currently only lazily initialize partition * information when routing the first tuple to the partition. We cannot * know at this stage if we can perform this optimization. It should be * possible to improve on this, but it does mean maintaining heap insert * option flags per partition and setting them when we first open the * partition. * * This optimization is not supported for relation types which do not * have any physical storage, with foreign tables and views using * INSTEAD OF triggers entering in this category. Partitioned tables * are not supported as per the description above. *---------- */ /* createSubid is creation check, newRelfilenodeSubid is truncation check */ if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) && (cstate->rel->rd_createSubid != InvalidSubTransactionId || cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)) { ti_options |= TABLE_INSERT_SKIP_FSM; /* * The optimization to skip WAL has been disabled in GPDB. wal_level * is hardcoded to 'archive' in GPDB, so it wouldn't have any effect * anyway. */ #if 0 if (!XLogIsNeeded()) ti_options |= TABLE_INSERT_SKIP_WAL; #endif } /* * Optimize if new relfilenode was created in this subxact or one of its * committed children and we won't see those rows later as part of an * earlier scan or command. 
The subxact test ensures that if this subxact * aborts then the frozen rows won't be visible after xact cleanup. Note * that the stronger test of exactly which subtransaction created it is * crucial for correctness of this optimization. The test for an earlier * scan or command tolerates false negatives. FREEZE causes other sessions * to see rows they would not see under MVCC, and a false negative merely * spreads that anomaly to the current session. */ if (cstate->freeze) { /* * We currently disallow COPY FREEZE on partitioned tables. The * reason for this is that we've simply not yet opened the partitions * to determine if the optimization can be applied to them. We could * go and open them all here, but doing so may be quite a costly * overhead for small copies. In any case, we may just end up routing * tuples to a small number of partitions. It seems better just to * raise an ERROR for partitioned tables. */ if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform COPY FREEZE on a partitioned table"))); } /* * Tolerate one registration for the benefit of FirstXactSnapshot. * Scan-bearing queries generally create at least two registrations, * though relying on that is fragile, as is ignoring ActiveSnapshot. * Clear CatalogSnapshot to avoid counting its registration. We'll * still detect ongoing catalog scans, each of which separately * registers the snapshot it uses. 
*/ InvalidateCatalogSnapshot(); if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot perform COPY FREEZE because of prior transaction activity"))); if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() && cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction"))); ti_options |= TABLE_INSERT_FROZEN; } /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code * here that basically duplicated execUtils.c ...) */ resultRelInfo = makeNode(ResultRelInfo); InitResultRelInfo(resultRelInfo, cstate->rel, 1, /* must match rel's position in range_table */ NULL, 0); target_resultRelInfo = resultRelInfo; /* Verify the named relation is a valid target for INSERT */ CheckValidResultRel(resultRelInfo, CMD_INSERT); ExecOpenIndices(resultRelInfo, false); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; estate->es_result_relation_info = resultRelInfo; ExecInitRangeTable(estate, cstate->range_table); /* * Set up a ModifyTableState so we can let FDW(s) init themselves for * foreign-table result relation(s). */ mtstate = makeNode(ModifyTableState); mtstate->ps.plan = NULL; mtstate->ps.state = estate; mtstate->operation = CMD_INSERT; mtstate->resultRelInfo = estate->es_result_relations; if (resultRelInfo->ri_FdwRoutine != NULL && resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, resultRelInfo); /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); /* * If there are any triggers with transition tables on the named relation, * we need to be prepared to capture transition tuples. 
* * Because partition tuple routing would like to know about whether * transition capture is active, we also set it in mtstate, which is * passed to ExecFindPartition() below. */ cstate->transition_capture = mtstate->mt_transition_capture = MakeTransitionCaptureState(cstate->rel->trigdesc, RelationGetRelid(cstate->rel), CMD_INSERT); /* * If the named relation is a partitioned table, initialize state for * CopyFrom tuple routing. */ if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) proute = ExecSetupPartitionTupleRouting(estate, NULL, cstate->rel); if (cstate->whereClause) cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause), &mtstate->ps); /* * It's generally more efficient to prepare a bunch of tuples for * insertion, and insert them in one table_multi_insert() call, than call * table_tuple_insert() separately for every tuple. However, there are a * number of reasons why we might not be able to do this. These are * explained below. */ if (resultRelInfo->ri_TrigDesc != NULL && (resultRelInfo->ri_TrigDesc->trig_insert_before_row || resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) { /* * Can't support multi-inserts when there are any BEFORE/INSTEAD OF * triggers on the table. Such triggers might query the table we're * inserting into and act differently if the tuples that have already * been processed and prepared for insertion are not there. */ insertMethod = CIM_SINGLE; } else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL && resultRelInfo->ri_TrigDesc->trig_insert_new_table) { /* * For partitioned tables we can't support multi-inserts when there * are any statement level insert triggers. It might be possible to * allow partitioned tables with such triggers in the future, but for * now, CopyMultiInsertInfoFlush expects that any before row insert * and statement level insert triggers are on the same relation. 
*/ insertMethod = CIM_SINGLE; } else if (resultRelInfo->ri_FdwRoutine != NULL || cstate->volatile_defexprs) { /* * Can't support multi-inserts to foreign tables or if there are any * volatile default expressions in the table. Similarly to the * trigger case above, such expressions may query the table we're * inserting into. * * Note: It does not matter if any partitions have any volatile * default expressions as we use the defaults from the target of the * COPY command. */ insertMethod = CIM_SINGLE; } else if (contain_volatile_functions(cstate->whereClause)) { /* * Can't support multi-inserts if there are any volatile function * expressions in WHERE clause. Similarly to the trigger case above, * such expressions may query the table we're inserting into. */ insertMethod = CIM_SINGLE; } else { /* * For partitioned tables, we may still be able to perform bulk * inserts. However, the possibility of this depends on which types * of triggers exist on the partition. We must disable bulk inserts * if the partition is a foreign table or it has any before row insert * or insert instead triggers (same as we checked above for the parent * table). Since the partition's resultRelInfos are initialized only * when we actually need to insert the first tuple into them, we must * have the intermediate insert method of CIM_MULTI_CONDITIONAL to * flag that we must later determine if we can use bulk-inserts for * the partition being inserted into. */ if (proute) insertMethod = CIM_MULTI_CONDITIONAL; else insertMethod = CIM_MULTI; CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, estate, mycid, ti_options); } /* * If not using batch mode (which allocates slots as needed) set up a * tuple slot too. When inserting into a partitioned table, we also need * one, even if we might batch insert, to read the tuple in the root * partition's form. 
*/ if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL) { singleslot = table_slot_create(resultRelInfo->ri_RelationDesc, &estate->es_tupleTable); bistate = GetBulkInsertState(); } has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row); has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_instead_row); /* * Check BEFORE STATEMENT insertion triggers. It's debatable whether we * should do this for COPY, since it's not really an "INSERT" statement as * such. However, executing these triggers maintains consistency with the * EACH ROW triggers that we already fire on COPY. */ ExecBSInsertTriggers(estate, resultRelInfo); econtext = GetPerTupleExprContext(estate); /* Set up callback to identify error line number */ errcallback.callback = CopyFromErrorCallback; errcallback.arg = (void *) cstate; errcallback.previous = error_context_stack; error_context_stack = &errcallback; /* * Do we need to check the distribution keys? Normally, the QD computes the * target segment and sends the data to the correct segment. We don't need to * verify that in the QE anymore. But in ON SEGMENT, we're reading data * directly from a file, and there's no guarantee on what it contains, so we * need to do the checking in the QE. */ is_check_distkey = (cstate->on_segment && Gp_role == GP_ROLE_EXECUTE && gp_enable_segment_copy_checking) ? true : false; /* * Initialize information about distribution keys, needed to compute target * segment for each row. */ if (cstate->dispatch_mode == COPY_DISPATCH || is_check_distkey) { distData = InitDistributionData(cstate, estate); /* * If this table is distributed randomly, there is nothing to check, * after all. */ if (distData->policy == NULL || distData->policy->nattrs == 0) is_check_distkey = false; } /* Determine which fields we need to parse in the QD. 
*/ if (cstate->dispatch_mode == COPY_DISPATCH) InitCopyFromDispatchSplit(cstate, distData, estate); if (cstate->dispatch_mode == COPY_DISPATCH || cstate->dispatch_mode == COPY_EXECUTOR) { /* * Now split the attnumlist into the parts that are parsed in the QD, and * in QE. */ ListCell *lc; int i = 0; List *qd_attnumlist = NIL; List *qe_attnumlist = NIL; int first_qe_processed_field; first_qe_processed_field = cstate->first_qe_processed_field; foreach(lc, cstate->attnumlist) { int attnum = lfirst_int(lc); if (i < first_qe_processed_field) qd_attnumlist = lappend_int(qd_attnumlist, attnum); else qe_attnumlist = lappend_int(qe_attnumlist, attnum); i++; } cstate->qd_attnumlist = qd_attnumlist; cstate->qe_attnumlist = qe_attnumlist; } if (cstate->dispatch_mode == COPY_DISPATCH) { /* * We are the QD node, and we are receiving rows from client, or * reading them from a file. We are not writing any data locally, * instead, we determine the correct target segment for row, * and forward each to the correct segment. */ /* * pre-allocate buffer for constructing a message. */ cstate->dispatch_msgbuf = makeStringInfo(); enlargeStringInfo(cstate->dispatch_msgbuf, SizeOfCopyFromDispatchRow); /* * prepare to COPY data into segDBs: * - set table partitioning information * - set append only table relevant info for dispatch. * - get the distribution policy for this table. * - build a COPY command to dispatch to segdbs. * - dispatch the modified COPY command to all segment databases. * - prepare cdbhash for hashing on row values. */ cdbCopy = makeCdbCopy(cstate, true); /* * Dispatch the COPY command. * * From this point in the code we need to be extra careful about error * handling. ereport() must not be called until the COPY command sessions * are closed on the executors. Calling ereport() will leave the executors * hanging in COPY state. 
* * For errors detected by the dispatcher, we save the error message in * cdbcopy_err StringInfo, move on to closing all COPY sessions on the * executors and only then raise an error. We need to make sure to TRY/CATCH * all other errors that may be raised from elsewhere in the backend. All * error during COPY on the executors will be detected only when we end the * COPY session there, so we are fine there. */ elog(DEBUG5, "COPY command sent to segdbs"); cdbCopyStart(cdbCopy, glob_copystmt, cstate->file_encoding); /* * Skip header processing if dummy file get from master for COPY FROM ON * SEGMENT */ if (!cstate->on_segment) { /* * Send QD->QE header to all segments except: * dummy file on master for COPY FROM ON SEGMENT */ if (!cstate->on_segment) SendCopyFromForwardedHeader(cstate, cdbCopy); } } CopyInitDataParser(cstate); /* * GPDB_12_MERGE_FIXME: We still have to perform the initialization * here for AO relations. It is preferreable by all means to perform the * initialization via the table AP API, however it simply does not * provide a good enough interface for this yet. */ if (RelationIsAoRows(resultRelInfo->ri_RelationDesc)) appendonly_dml_init(resultRelInfo->ri_RelationDesc, CMD_INSERT); else if (RelationIsAoCols(resultRelInfo->ri_RelationDesc)) aoco_dml_init(resultRelInfo->ri_RelationDesc, CMD_INSERT); for (;;) { TupleTableSlot *myslot; bool skip_tuple; unsigned int target_seg = 0; /* result segment of cdbhash */ CHECK_FOR_INTERRUPTS(); /* * Reset the per-tuple exprcontext. We do this after every tuple, to * clean-up after expression evaluations etc. 
*/ ResetPerTupleExprContext(estate); /* select slot to (initially) load row into */ if (insertMethod == CIM_SINGLE || proute) { myslot = singleslot; Assert(myslot != NULL); } else { Assert(resultRelInfo == target_resultRelInfo); Assert(insertMethod == CIM_MULTI); myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, resultRelInfo); } /* * Switch to per-tuple context before calling NextCopyFrom, which does * evaluate default expressions etc. and requires per-tuple context. */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); ExecClearTuple(myslot); if (cstate->dispatch_mode == COPY_EXECUTOR) { if (!NextCopyFromExecute(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) break; /* * NextCopyFromExecute set up estate->es_result_relation_info, * and stored the tuple in the correct slot. */ resultRelInfo = estate->es_result_relation_info; } else { /* Directly store the values/nulls array in the slot */ if (cstate->dispatch_mode == COPY_DISPATCH) { if (!NextCopyFromDispatch(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) break; } else { if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) break; } } ExecStoreVirtualTuple(myslot); /* * Constraints and where clause might reference the tableoid column, * so (re-)initialize tts_tableOid before evaluating them. */ myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc); /* Triggers and stuff need to be invoked in query context. */ MemoryContextSwitchTo(oldcontext); if (cstate->whereClause) { econtext->ecxt_scantuple = myslot; /* Skip items that don't match COPY's WHERE clause */ if (!ExecQual(cstate->qualexpr, econtext)) continue; } /* Determine the partition to insert the tuple into */ if (proute && cstate->dispatch_mode != COPY_DISPATCH) { TupleConversionMap *map; bool got_error = false; /* * Attempt to find a partition suitable for this tuple. 
* ExecFindPartition() will raise an error if none can be found or * if the found partition is not suitable for INSERTs. */ PG_TRY(); { resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo, proute, myslot, estate); } PG_CATCH(); { /* after all the prep work let cdbsreh do the real work */ HandleCopyError(cstate); got_error = true; MemoryContextSwitchTo(oldcontext); } PG_END_TRY(); if (got_error) continue; if (prevResultRelInfo != resultRelInfo) { /* Determine which triggers exist on this partition */ has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row); has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_instead_row); /* * Disable multi-inserts when the partition has BEFORE/INSTEAD * OF triggers, or if the partition is a foreign partition. */ leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && !has_before_insert_row_trig && !has_instead_insert_row_trig && resultRelInfo->ri_FdwRoutine == NULL; /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert) { if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, resultRelInfo); } else if (insertMethod == CIM_MULTI_CONDITIONAL && !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) { /* * Flush pending inserts if this partition can't use * batching, so rows are visible to triggers etc. */ CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); } if (bistate != NULL) ReleaseBulkInsertStatePin(bistate); prevResultRelInfo = resultRelInfo; } /* * For ExecInsertIndexTuples() to work on the partition's indexes */ estate->es_result_relation_info = resultRelInfo; /* * If we're capturing transition tuples, we might need to convert * from the partition rowtype to root rowtype. 
*/ if (cstate->transition_capture != NULL) { if (has_before_insert_row_trig) { /* * If there are any BEFORE triggers on the partition, * we'll have to be ready to convert their result back to * tuplestore format. */ cstate->transition_capture->tcs_original_insert_tuple = NULL; cstate->transition_capture->tcs_map = resultRelInfo->ri_PartitionInfo->pi_PartitionToRootMap; } else { /* * Otherwise, just remember the original unconverted * tuple, to avoid a needless round trip conversion. */ cstate->transition_capture->tcs_original_insert_tuple = myslot; cstate->transition_capture->tcs_map = NULL; } } /* * We might need to convert from the root rowtype to the partition * rowtype. */ map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap; if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert) { /* non batch insert */ if (map != NULL) { TupleTableSlot *new_slot; new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot; myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot); } } else { /* * Prepare to queue up tuple for later batch insert into * current partition. */ TupleTableSlot *batchslot; /* no other path available for partitioned table */ Assert(insertMethod == CIM_MULTI_CONDITIONAL); batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, resultRelInfo); if (map != NULL) myslot = execute_attr_map_slot(map->attrMap, myslot, batchslot); else { /* * This looks more expensive than it is (Believe me, I * optimized it away. Twice.). The input is in virtual * form, and we'll materialize the slot below - for most * slot types the copy performs the work materialization * would later require anyway. */ ExecCopySlot(batchslot, myslot); myslot = batchslot; } } /* ensure that triggers etc see the right relation */ myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } skip_tuple = false; /* * Compute which segment this row belongs to. 
*/ if (cstate->dispatch_mode == COPY_DISPATCH) { /* In QD, compute the target segment to send this row to. */ target_seg = GetTargetSeg(distData, myslot); } else if (is_check_distkey) { /* * In COPY FROM ON SEGMENT, check the distribution key in the * QE. */ if (distData->policy->nattrs != 0) { target_seg = GetTargetSeg(distData, myslot); if (GpIdentity.segindex != target_seg) { PG_TRY(); { ereport(ERROR, (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), errmsg("value of distribution key doesn't belong to segment with ID %d, it belongs to segment with ID %d", GpIdentity.segindex, target_seg))); } PG_CATCH(); { HandleCopyError(cstate); } PG_END_TRY(); } } } if (cstate->dispatch_mode == COPY_DISPATCH) { bool send_to_all = distData && GpPolicyIsReplicated(distData->policy); /* in the QD, forward the row to the correct segment(s). */ SendCopyFromForwardedTuple(cstate, cdbCopy, send_to_all, send_to_all ? 0 : target_seg, resultRelInfo->ri_RelationDesc, cstate->cur_lineno, cstate->line_buf.data, cstate->line_buf.len, myslot->tts_values, myslot->tts_isnull); skip_tuple = true; processed++; } /* BEFORE ROW INSERT Triggers */ if (has_before_insert_row_trig) { /* * If the tuple was dispatched to segments, do not execute trigger * on master. */ if (!skip_tuple && !ExecBRInsertTriggers(estate, resultRelInfo, myslot)) skip_tuple = true; /* "do nothing" */ } if (!skip_tuple) { /* * If there is an INSTEAD OF INSERT ROW trigger, let it handle the * tuple. Otherwise, proceed with inserting the tuple into the * table or foreign table. */ if (has_instead_insert_row_trig) { ExecIRInsertTriggers(estate, resultRelInfo, myslot); } else { /* Compute stored generated columns */ if (resultRelInfo->ri_RelationDesc->rd_att->constr && resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored) ExecComputeStoredGenerated(estate, myslot); /* * If the target is a plain table, check the constraints of * the tuple. 
*/ if (resultRelInfo->ri_FdwRoutine == NULL && resultRelInfo->ri_RelationDesc->rd_att->constr) ExecConstraints(resultRelInfo, myslot, estate); /* * Also check the tuple against the partition constraint, if * there is one; except that if we got here via tuple-routing, * we don't need to if there's no BR trigger defined on the * partition. */ if (resultRelInfo->ri_PartitionCheck && (proute == NULL || has_before_insert_row_trig)) ExecPartitionCheck(resultRelInfo, myslot, estate, true); /* Store the slot in the multi-insert buffer, when enabled. */ if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) { /* * The slot previously might point into the per-tuple * context. For batching it needs to be longer lived. */ ExecMaterializeSlot(myslot); /* Add this tuple to the tuple buffer */ CopyMultiInsertInfoStore(&multiInsertInfo, resultRelInfo, myslot, cstate->line_buf.len, cstate->cur_lineno); /* * If enough inserts have queued up, then flush all * buffers out to their tables. */ if (CopyMultiInsertInfoIsFull(&multiInsertInfo)) CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); } else { List *recheckIndexes = NIL; /* OK, store the tuple */ if (resultRelInfo->ri_FdwRoutine != NULL) { myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, resultRelInfo, myslot, NULL); if (myslot == NULL) /* "do nothing" */ continue; /* next tuple please */ /* * AFTER ROW Triggers might reference the tableoid * column, so (re-)initialize tts_tableOid before * evaluating them. 
*/ myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } else { /* OK, store the tuple and create index entries for it */ table_tuple_insert(resultRelInfo->ri_RelationDesc, myslot, mycid, ti_options, bistate); if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(myslot, estate, false, NULL, NIL); } /* AFTER ROW INSERT Triggers */ ExecARInsertTriggers(estate, resultRelInfo, myslot, recheckIndexes, cstate->transition_capture); list_free(recheckIndexes); } } /* * We count only tuples not suppressed by a BEFORE INSERT trigger * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. * * MPP: incrementing this counter here only matters for utility * mode. in dispatch mode only the dispatcher COPY collects row * count, so this counter is meaningless. */ processed++; if (cstate->cdbsreh) cstate->cdbsreh->processed++; } } /* * After processed data from QD, which is empty and just for workflow, now * to process the data on segment, only one shot if cstate->on_segment && * Gp_role == GP_ROLE_DISPATCH */ if (cstate->on_segment && Gp_role == GP_ROLE_EXECUTE) { CopyInitDataParser(cstate); } elog(DEBUG1, "Segment %u, Copied %lu rows.", GpIdentity.segindex, processed); /* Flush any remaining buffered tuples */ if (insertMethod != CIM_SINGLE) { if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); /* Tear down the multi-insert buffer data */ CopyMultiInsertInfoCleanup(&multiInsertInfo); } /* Done, clean up */ error_context_stack = errcallback.previous; if (bistate != NULL) FreeBulkInsertState(bistate); MemoryContextSwitchTo(oldcontext); /* * Done reading input data and sending it off to the segment * databases Now we would like to end the copy command on * all segment databases across the cluster. 
*/ if (cstate->dispatch_mode == COPY_DISPATCH) { int64 total_completed_from_qes; int64 total_rejected_from_qes; cdbCopyEnd(cdbCopy, &total_completed_from_qes, &total_rejected_from_qes); if (cstate->cdbsreh) { /* emit a NOTICE with number of rejected rows */ uint64 total_rejected = 0; uint64 total_rejected_from_qd = cstate->cdbsreh->rejectcount; /* * If error log has been requested, then we send the row to the segment * so that it can be written in the error log file. The segment process * counts it again as a rejected row. So we ignore the reject count * from the master and only consider the reject count from segments. */ if (IS_LOG_TO_FILE(cstate->cdbsreh->logerrors)) total_rejected_from_qd = 0; total_rejected = total_rejected_from_qd + total_rejected_from_qes; ReportSrehResults(cstate->cdbsreh, total_rejected); } } /* * In the old protocol, tell pqcomm that we can process normal protocol * messages again. */ if (cstate->copy_dest == COPY_OLD_FE) pq_endmsgread(); /* Execute AFTER STATEMENT insertion triggers */ ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture); /* Handle queued AFTER triggers */ AfterTriggerEndQuery(estate); /* * If SREH and in executor mode send the number of rejected * rows to the client (QD COPY). * If COPY ... FROM/TO ... ON SEGMENT, then we need to send the number of * completed rows as well. */ if ((cstate->errMode != ALL_OR_NOTHING && cstate->dispatch_mode == COPY_EXECUTOR) || cstate->on_segment) { SendNumRows((cstate->errMode != ALL_OR_NOTHING) ? cstate->cdbsreh->rejectcount : 0, cstate->on_segment ? 
processed : 0); } ExecResetTupleTable(estate->es_tupleTable, false); /* Allow the FDW to shut down */ if (target_resultRelInfo->ri_FdwRoutine != NULL && target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, target_resultRelInfo); ExecCloseIndices(target_resultRelInfo); /* Close all the partitioned tables, leaf partitions, and their indices */ if (proute) ExecCleanupTupleRouting(mtstate, proute); /* Close any trigger target relations */ ExecCleanUpTriggerState(estate); FreeDistributionData(distData); FreeExecutorState(estate); table_finish_bulk_insert(cstate->rel, ti_options); return processed; } /* * Setup to read tuples from a file for COPY FROM. * * 'rel': Used as a template for the tuples * 'filename': Name of server-local file to read * 'attnamelist': List of char *, columns to include. NIL selects all cols. * 'options': List of DefElem. See copy_opt_item in gram.y for selections. * * Returns a CopyState, to be passed to NextCopyFrom and related functions. 
*/ CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, bool is_program, copy_data_source_cb data_source_cb, void *data_source_cb_extra, List *attnamelist, List *options) { CopyState cstate; TupleDesc tupDesc; AttrNumber num_phys_attrs, num_defaults; FmgrInfo *in_functions; Oid *typioparams; int attnum; Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; bool volatile_defexprs; cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options, NULL); oldcontext = MemoryContextSwitchTo(cstate->copycontext); /* * Determine the mode */ if (cstate->on_segment || data_source_cb) cstate->dispatch_mode = COPY_DIRECT; else if (Gp_role == GP_ROLE_DISPATCH && cstate->rel && cstate->rel->rd_cdbpolicy && cstate->rel->rd_cdbpolicy->ptype != POLICYTYPE_ENTRY) cstate->dispatch_mode = COPY_DISPATCH; else if (Gp_role == GP_ROLE_EXECUTE) cstate->dispatch_mode = COPY_EXECUTOR; else cstate->dispatch_mode = COPY_DIRECT; /* Initialize state variables */ cstate->reached_eof = false; // cstate->eol_type = EOL_UNKNOWN; /* GPDB: don't overwrite value set in ProcessCopyOptions */ cstate->cur_relname = RelationGetRelationName(cstate->rel); cstate->cur_lineno = 0; cstate->cur_attname = NULL; cstate->cur_attval = NULL; /* Set up variables to avoid per-attribute overhead. */ initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; /* Assign range table, we'll need it in CopyFrom. */ if (pstate) cstate->range_table = pstate->p_rtable; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; num_defaults = 0; volatile_defexprs = false; /* * Pick up the required catalog information for each attribute in the * relation, including the input function, the element type (to pass to * the input function), and info about defaults and constraints. 
(Which * input function we use depends on text/binary format choice.) */ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); defmap = (int *) palloc(num_phys_attrs * sizeof(int)); defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); for (attnum = 1; attnum <= num_phys_attrs; attnum++) { Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); /* We don't need info for dropped attributes */ if (att->attisdropped) continue; /* Fetch the input function and typioparam info */ if (cstate->binary) getTypeBinaryInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); else getTypeInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); fmgr_info(in_func_oid, &in_functions[attnum - 1]); /* TODO: is force quote array necessary for default conversion */ /* Get default info if needed */ if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated) { /* attribute is NOT to be copied from input */ /* use default value if one exists */ Expr *defexpr = (Expr *) build_column_default(cstate->rel, attnum); if (defexpr != NULL) { /* Run the expression through planner */ defexpr = expression_planner(defexpr); /* Initialize executable expression in copycontext */ defexprs[num_defaults] = ExecInitExpr(defexpr, NULL); defmap[num_defaults] = attnum - 1; num_defaults++; /* * If a default expression looks at the table being loaded, * then it could give the wrong answer when using * multi-insert. Since database access can be dynamic this is * hard to test for exactly, so we use the much wider test of * whether the default expression is volatile. We allow for * the special case of when the default expression is the * nextval() of a sequence which in this specific case is * known to be safe for use with the multi-insert * optimization. Hence we use this special case function * checker rather than the standard check for * contain_volatile_functions(). 
*/ if (!volatile_defexprs) volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr); } } } /* We keep those variables in cstate. */ cstate->in_functions = in_functions; cstate->typioparams = typioparams; cstate->defmap = defmap; cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; cstate->num_defaults = num_defaults; cstate->is_program = is_program; bool pipe = (filename == NULL || cstate->dispatch_mode == COPY_EXECUTOR); if (cstate->on_segment && Gp_role == GP_ROLE_DISPATCH) { /* open nothing */ if (filename == NULL) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("STDIN is not supported by 'COPY ON SEGMENT'"))); } else if (data_source_cb) { cstate->copy_dest = COPY_CALLBACK; cstate->data_source_cb = data_source_cb; cstate->data_source_cb_extra = data_source_cb_extra; } else if (pipe) { Assert(!is_program || cstate->dispatch_mode == COPY_EXECUTOR); /* the grammar does not allow this */ if (whereToSendOutput == DestRemote) ReceiveCopyBegin(cstate); else cstate->copy_file = stdin; } else { cstate->filename = pstrdup(filename); if (cstate->on_segment) MangleCopyFileName(cstate); if (cstate->is_program) { cstate->program_pipes = open_program_pipes(cstate->filename, false); cstate->copy_file = fdopen(cstate->program_pipes->pipes[0], PG_BINARY_R); if (cstate->copy_file == NULL) ereport(ERROR, (errcode_for_file_access(), errmsg("could not execute command \"%s\": %m", cstate->filename))); } else { struct stat st; char *filename = cstate->filename; cstate->copy_file = AllocateFile(filename, PG_BINARY_R); if (cstate->copy_file == NULL) { /* copy errno because ereport subfunctions might change it */ int save_errno = errno; ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\" for reading: %m", cstate->filename), (save_errno == ENOENT || save_errno == EACCES) ? errhint("COPY FROM instructs the PostgreSQL server process to read a file. 
" "You may want a client-side facility such as psql's \\copy.") : 0)); } // Increase buffer size to improve performance (cmcdevitt) setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes if (fstat(fileno(cstate->copy_file), &st)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", cstate->filename))); if (S_ISDIR(st.st_mode)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", filename))); } } if (cstate->on_segment && Gp_role == GP_ROLE_DISPATCH) { /* nothing to do */ } else if (cstate->dispatch_mode == COPY_EXECUTOR && cstate->copy_dest != COPY_CALLBACK) { /* Read special header from QD */ static const size_t sigsize = sizeof(QDtoQESignature); char readSig[sizeof(QDtoQESignature)]; copy_from_dispatch_header header_frame; if (CopyGetData(cstate, &readSig, sigsize) != sigsize || memcmp(readSig, QDtoQESignature, sigsize) != 0) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("QD->QE COPY communication signature not recognized"))); if (CopyGetData(cstate, &header_frame, sizeof(header_frame)) != sizeof(header_frame)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid QD->QD COPY communication header"))); cstate->first_qe_processed_field = header_frame.first_qe_processed_field; } else if (cstate->binary) { /* Read and verify binary header */ char readSig[11]; int32 tmp; /* Signature */ if (CopyGetData(cstate, readSig, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("COPY file signature not recognized"))); /* Flags field */ if (!CopyGetInt32(cstate, &tmp)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); if ((tmp & (1 << 16)) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (WITH OIDS)"))); tmp &= ~(1 << 16); if ((tmp >> 16) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), 
errmsg("unrecognized critical flags in COPY file header"))); /* Header extension length */ if (!CopyGetInt32(cstate, &tmp) || tmp < 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing length)"))); /* Skip extension header, if present */ while (tmp-- > 0) { if (CopyGetData(cstate, readSig, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); } } /* create workspace for CopyReadAttributes results */ if (!cstate->binary) { AttrNumber attr_count = list_length(cstate->attnumlist); cstate->max_fields = attr_count; cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); } MemoryContextSwitchTo(oldcontext); return cstate; } /* * Read raw fields in the next line for COPY FROM in text or csv mode. * Return false if no more lines. * * An internal temporary buffer is returned via 'fields'. It is valid until * the next call of the function. Since the function returns all raw fields * in the input file, 'nfields' could be different from the number of columns * in the relation. * * NOTE: force_not_null option are not applied to the returned fields. */ bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields) { return NextCopyFromRawFieldsX(cstate, fields, nfields, -1); } static bool NextCopyFromRawFieldsX(CopyState cstate, char ***fields, int *nfields, int stop_processing_at_field) { int fldct; bool done; /* only available for text or csv input */ Assert(!cstate->binary); /* on input just throw the header line away */ if (cstate->cur_lineno == 0 && cstate->header_line) { cstate->cur_lineno++; if (CopyReadLine(cstate)) return false; /* done */ } cstate->cur_lineno++; /* Actually read the line into memory here */ done = CopyReadLine(cstate); /* * EOF at start of line means we're done. If we see EOF after some * characters, we act as though it was newline followed by EOF, ie, * process the line and then exit loop on next iteration. 
*/ if (done && cstate->line_buf.len == 0) return false; /* Parse the line into de-escaped field values */ if (cstate->csv_mode) fldct = CopyReadAttributesCSV(cstate, stop_processing_at_field); else fldct = CopyReadAttributesText(cstate, stop_processing_at_field); *fields = cstate->raw_fields; *nfields = fldct; return true; } bool NextCopyFrom(CopyState cstate, ExprContext *econtext, Datum *values, bool *nulls) { if (!cstate->cdbsreh) return NextCopyFromX(cstate, econtext, values, nulls); else { MemoryContext oldcontext = CurrentMemoryContext; for (;;) { bool got_error = false; bool result = false; PG_TRY(); { result = NextCopyFromX(cstate, econtext, values, nulls); } PG_CATCH(); { HandleCopyError(cstate); /* cdbsreh->processed is updated inside here */ got_error = true; MemoryContextSwitchTo(oldcontext); } PG_END_TRY(); if (result) cstate->cdbsreh->processed++; if (!got_error) return result; } } } /* * A data error happened. This code block will always be inside a PG_CATCH() * block right when a higher stack level produced an error. We handle the error * by checking which error mode is set (SREH or all-or-nothing) and do the right * thing accordingly. Note that we MUST have this code in a macro (as opposed * to a function) as elog_dismiss() has to be inlined with PG_CATCH in order to * access local error state variables. * * changing me? take a look at FILEAM_HANDLE_ERROR in fileam.c as well. */ static void HandleCopyError(CopyState cstate) { if (cstate->errMode == ALL_OR_NOTHING) { /* re-throw error and abort */ PG_RE_THROW(); } /* SREH must only handle data errors. 
all other errors must not be caught */ if (ERRCODE_TO_CATEGORY(elog_geterrcode()) != ERRCODE_DATA_EXCEPTION) { /* re-throw error and abort */ PG_RE_THROW(); } else { /* SREH - release error state and handle error */ MemoryContext oldcontext; ErrorData *edata; char *errormsg; CdbSreh *cdbsreh = cstate->cdbsreh; cdbsreh->processed++; oldcontext = MemoryContextSwitchTo(cstate->cdbsreh->badrowcontext); /* save a copy of the error info */ edata = CopyErrorData(); FlushErrorState(); /* * set the error message. Use original msg and add column name if available. * We do this even if we're not logging the errors, because * ErrorIfRejectLimit() below will use this information in the error message, * if the error count is reached. */ cdbsreh->rawdata = cstate->line_buf.data; cdbsreh->is_server_enc = cstate->line_buf_converted; cdbsreh->linenumber = cstate->cur_lineno; if (cstate->cur_attname) { errormsg = psprintf("%s, column %s", edata->message, cstate->cur_attname); } else { errormsg = edata->message; } cstate->cdbsreh->errmsg = errormsg; if (IS_LOG_TO_FILE(cstate->cdbsreh->logerrors)) { if (Gp_role == GP_ROLE_DISPATCH && !cstate->on_segment) { cstate->cdbsreh->rejectcount++; SendCopyFromForwardedError(cstate, cstate->cdbCopy, errormsg); } else { /* after all the prep work let cdbsreh do the real work */ if (Gp_role == GP_ROLE_DISPATCH) { cstate->cdbsreh->rejectcount++; } else { HandleSingleRowError(cstate->cdbsreh); } } } else cstate->cdbsreh->rejectcount++; ErrorIfRejectLimitReached(cstate->cdbsreh); MemoryContextSwitchTo(oldcontext); MemoryContextReset(cstate->cdbsreh->badrowcontext); } } /* * Read next tuple from file for COPY FROM. Return false if no more tuples. * * 'econtext' is used to evaluate default expression for each columns not * read from the file. It can be NULL when no default values are used, i.e. * when all columns are read from the file. * * 'values' and 'nulls' arrays must be the same length as columns of the * relation passed to BeginCopyFrom. 
This function fills the arrays. */ bool NextCopyFromX(CopyState cstate, ExprContext *econtext, Datum *values, bool *nulls) { TupleDesc tupDesc; AttrNumber num_phys_attrs, attr_count, num_defaults = cstate->num_defaults; FmgrInfo *in_functions = cstate->in_functions; Oid *typioparams = cstate->typioparams; int i; int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; List *attnumlist; int stop_processing_at_field; /* * Figure out what fields we're going to process in this process. * * In the QD, set 'stop_processing_at_field' so that we only those * fields that are needed in the QD. */ switch (cstate->dispatch_mode) { case COPY_DIRECT: stop_processing_at_field = -1; attnumlist = cstate->attnumlist; break; case COPY_DISPATCH: stop_processing_at_field = cstate->first_qe_processed_field; attnumlist = cstate->qd_attnumlist; break; case COPY_EXECUTOR: stop_processing_at_field = -1; attnumlist = cstate->qe_attnumlist; break; default: elog(ERROR, "unexpected COPY dispatch mode %d", cstate->dispatch_mode); } tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; attr_count = list_length(attnumlist); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); if (!cstate->binary) { char **field_strings; ListCell *cur; int fldct; int fieldno; char *string; /* read raw fields in the next line */ if (cstate->dispatch_mode != COPY_EXECUTOR) { if (!NextCopyFromRawFieldsX(cstate, &field_strings, &fldct, stop_processing_at_field)) return false; } else { /* * We have received the raw line from the QD, and we just * need to split it into raw fields. 
*/ if (cstate->stopped_processing_at_delim && cstate->line_buf.cursor <= cstate->line_buf.len) { if (cstate->csv_mode) fldct = CopyReadAttributesCSV(cstate, -1); else fldct = CopyReadAttributesText(cstate, -1); } else fldct = 0; field_strings = cstate->raw_fields; } /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("extra data after last expected column"))); /* * A completely empty line is not allowed with FILL MISSING FIELDS. Without * FILL MISSING FIELDS, it's almost surely an error, but not always: * a table with a single text column, for example, needs to accept empty * lines. */ if (cstate->line_buf.len == 0 && cstate->fill_missing && list_length(cstate->attnumlist) > 1) { ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("missing data for column \"%s\", found empty data line", NameStr(TupleDescAttr(tupDesc, 1)->attname)))); } fieldno = 0; /* Loop to read the user attributes on the line. */ foreach(cur, attnumlist) { int attnum = lfirst_int(cur); int m = attnum - 1; Form_pg_attribute att = TupleDescAttr(tupDesc, m); if (fieldno >= fldct) { /* * Some attributes are missing. In FILL MISSING FIELDS mode, * treat them as NULLs. */ if (!cstate->fill_missing) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("missing data for column \"%s\"", NameStr(att->attname)))); fieldno++; string = NULL; } else string = field_strings[fieldno++]; if (cstate->convert_select_flags && !cstate->convert_select_flags[m]) { /* ignore input field, leaving column as NULL */ continue; } if (cstate->csv_mode) { if (string == NULL && cstate->force_notnull_flags[m]) { /* * FORCE_NOT_NULL option is set and column is NULL - * convert it to the NULL string. */ string = cstate->null_print; } else if (string != NULL && cstate->force_null_flags[m] && strcmp(string, cstate->null_print) == 0) { /* * FORCE_NULL option is set and column matches the NULL * string. 
It must have been quoted, or otherwise the * string would already have been set to NULL. Convert it * to NULL as specified. */ string = NULL; } } cstate->cur_attname = NameStr(att->attname); cstate->cur_attval = string; values[m] = InputFunctionCall(&in_functions[m], string, typioparams[m], att->atttypmod); if (string != NULL) nulls[m] = false; cstate->cur_attname = NULL; cstate->cur_attval = NULL; } Assert(fieldno == attr_count); } else if (attr_count) { /* binary */ int16 fld_count; ListCell *cur; cstate->cur_lineno++; if (!CopyGetInt16(cstate, &fld_count)) { /* EOF detected (end of file, or protocol-level EOF) */ return false; } if (fld_count == -1) { /* * Received EOF marker. In a V3-protocol copy, wait for the * protocol-level EOF, and complain if it doesn't come * immediately. This ensures that we correctly handle CopyFail, * if client chooses to send that now. * * Note that we MUST NOT try to read more data in an old-protocol * copy, since there is no protocol-level EOF marker then. We * could go either way for copy from file, but choose to throw * error if there's data after the EOF marker, for consistency * with the new-protocol case. */ char dummy; if (cstate->copy_dest != COPY_OLD_FE && CopyGetData(cstate, &dummy, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("received copy data after EOF marker"))); return false; } if (fld_count != attr_count) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("row field count is %d, expected %d", (int) fld_count, attr_count))); i = 0; foreach(cur, attnumlist) { int attnum = lfirst_int(cur); int m = attnum - 1; Form_pg_attribute att = TupleDescAttr(tupDesc, m); cstate->cur_attname = NameStr(att->attname); i++; values[m] = CopyReadBinaryAttribute(cstate, i, &in_functions[m], typioparams[m], att->atttypmod, &nulls[m]); cstate->cur_attname = NULL; } } /* * Now compute and insert any defaults available for the columns not * provided by the input data. 
Anything not processed here or above will * remain NULL. * * GPDB: The defaults are always computed in the QD, and are included * in the QD->QE stream as pre-computed Datums. Funny indentation, to * keep the indentation of the code inside the same as in upstream. * (We could improve this, and compute immutable defaults that don't * affect which segment the row belongs to, in the QE.) */ if (cstate->dispatch_mode != COPY_EXECUTOR) { for (i = 0; i < num_defaults; i++) { /* * The caller must supply econtext and have switched into the * per-tuple memory context in it. */ Assert(econtext != NULL); Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, &nulls[defmap[i]]); } } return true; } /* * Like NextCopyFrom(), but used in the QD, when we want to parse the * input line only partially. We only want to parse enough fields needed * to determine which target segment to forward the row to. * * (The logic is actually within NextCopyFrom(). This is a separate * function just for symmetry with NextCopyFromExecute()). */ static bool NextCopyFromDispatch(CopyState cstate, ExprContext *econtext, Datum *values, bool *nulls) { return NextCopyFrom(cstate, econtext, values, nulls); } /* * Like NextCopyFrom(), but used in the QE, when we're reading pre-processed * rows from the QD. */ static bool NextCopyFromExecute(CopyState cstate, ExprContext *econtext, Datum *values, bool *nulls) { TupleDesc tupDesc; AttrNumber num_phys_attrs, attr_count; FormData_pg_attribute *attr; int i; copy_from_dispatch_row frame; int r; bool got_error; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; attr_count = list_length(cstate->attnumlist); /* * The code below reads the 'copy_from_dispatch_row' struct, and only * then checks if it was actually a 'copy_from_dispatch_error' struct. * That only works when 'copy_from_dispatch_error' is larger than *'copy_from_dispatch_row'. 
*/ StaticAssertStmt(SizeOfCopyFromDispatchError >= SizeOfCopyFromDispatchRow, "copy_from_dispatch_error must be larger than copy_from_dispatch_row"); /* * If we encounter an error while parsing the row (or we receive a row from * the QD that was already marked as an erroneous row), we loop back here * until we get a good row. */ retry: got_error = false; r = CopyGetData(cstate, (char *) &frame, SizeOfCopyFromDispatchRow); if (r == 0) return false; if (r != SizeOfCopyFromDispatchRow) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); if (frame.lineno == -1) { HandleQDErrorFrame(cstate, (char *) &frame, SizeOfCopyFromDispatchRow); goto retry; } /* Prepare for parsing the input line */ attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); /* check for overflowing fields */ if (frame.fld_count < 0 || frame.fld_count > num_phys_attrs) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("extra data after last expected column"))); /* * Read the input line into 'line_buf'. */ resetStringInfo(&cstate->line_buf); enlargeStringInfo(&cstate->line_buf, frame.line_len); if (CopyGetData(cstate, cstate->line_buf.data, frame.line_len) != frame.line_len) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); cstate->line_buf.data[frame.line_len] = '\0'; cstate->line_buf.len = frame.line_len; cstate->line_buf.cursor = frame.residual_off; cstate->line_buf_valid = true; cstate->line_buf_converted = true; cstate->cur_lineno = frame.lineno; cstate->stopped_processing_at_delim = frame.delim_seen_at_end; /* * Parse any fields from the input line that were not processed in the * QD already. 
*/ if (!cstate->cdbsreh) { if (!NextCopyFromX(cstate, econtext, values, nulls)) { ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); } } else { MemoryContext oldcontext = CurrentMemoryContext; bool result; PG_TRY(); { result = NextCopyFromX(cstate, econtext, values, nulls); if (!result) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); } PG_CATCH(); { HandleCopyError(cstate); got_error = true; MemoryContextSwitchTo(oldcontext); } PG_END_TRY(); } /* * Read any attributes that were processed in the QD already. The attribute * numbers in the message are already in terms of the target partition, so * we do this after remapping and switching to the partition slot. */ for (i = 0; i < frame.fld_count; i++) { int16 attnum; int m; int32 len; Datum value; if (CopyGetData(cstate, &attnum, sizeof(attnum)) != sizeof(attnum)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); if (attnum < 1 || attnum > num_phys_attrs) elog(ERROR, "invalid attnum received from QD: %d (num physical attributes: %d)", attnum, num_phys_attrs); m = attnum - 1; cstate->cur_attname = NameStr(attr[m].attname); if (attr[attnum - 1].attbyval) { if (CopyGetData(cstate, &value, sizeof(Datum)) != sizeof(Datum)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); } else { char *p; if (attr[attnum - 1].attlen > 0) { len = attr[attnum - 1].attlen; p = palloc(len); if (CopyGetData(cstate, p, len) != len) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); } else if (attr[attnum - 1].attlen == -1) { /* For simplicity, varlen's are always transmitted in "long" format */ if (CopyGetData(cstate, &len, sizeof(len)) != sizeof(len)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); if (len < VARHDRSZ) elog(ERROR, "invalid varlen length received from QD: 
%d", len); p = palloc(len); SET_VARSIZE(p, len); if (CopyGetData(cstate, p + VARHDRSZ, len - VARHDRSZ) != len - VARHDRSZ) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); } else if (attr[attnum - 1].attlen == -2) { /* * Like the varlen case above, cstrings are sent with a length * prefix and no terminator, so we have to NULL-terminate in * memory after reading them in. */ if (CopyGetData(cstate, &len, sizeof(len)) != sizeof(len)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); p = palloc(len + 1); if (CopyGetData(cstate, p, len) != len) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); p[len] = '\0'; } else { elog(ERROR, "attribute %d has invalid length %d", attnum, attr[attnum - 1].attlen); } value = PointerGetDatum(p); } cstate->cur_attname = NULL; values[m] = value; nulls[m] = false; } if (got_error) goto retry; /* * Here we should compute defaults for any columns for which we didn't * get a default from the QD. But at the moment, all defaults are evaluated * in the QD. */ return true; } /* * Parse and handle an "error frame" from QD. * * The caller has already read part of the frame; 'p' points to that part, * of length 'len'. */ static void HandleQDErrorFrame(CopyState cstate, char *p, int len) { CdbSreh *cdbsreh = cstate->cdbsreh; MemoryContext oldcontext; copy_from_dispatch_error errframe; char *errormsg; char *line; int r; Assert(len <= SizeOfCopyFromDispatchError); Assert(Gp_role == GP_ROLE_EXECUTE); oldcontext = MemoryContextSwitchTo(cdbsreh->badrowcontext); /* * Copy the part of the struct that the caller had already read, and * read the rest. 
*/ memcpy(&errframe, p, len); r = CopyGetData(cstate, ((char *) &errframe) + len, SizeOfCopyFromDispatchError - len); if (r != SizeOfCopyFromDispatchError - len) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); errormsg = palloc(errframe.errmsg_len + 1); line = palloc(errframe.line_len + 1); r = CopyGetData(cstate, errormsg, errframe.errmsg_len); if (r != errframe.errmsg_len) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); errormsg[errframe.errmsg_len] = '\0'; r = CopyGetData(cstate, line, errframe.line_len); if (r != errframe.line_len) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); line[errframe.line_len] = '\0'; cdbsreh->linenumber = errframe.lineno; cdbsreh->rawdata = line; cdbsreh->errmsg = errormsg; cdbsreh->is_server_enc = errframe.line_buf_converted; HandleSingleRowError(cdbsreh); MemoryContextSwitchTo(oldcontext); } /* * Inlined versions of appendBinaryStringInfo and enlargeStringInfo, for * speed. * * NOTE: These versions don't NULL-terminate the string. We don't need * it here. */ #define APPEND_MSGBUF_NOCHECK(buf, ptr, datalen) \ do { \ memcpy((buf)->data + (buf)->len, ptr, (datalen)); \ (buf)->len += (datalen); \ } while(0) #define APPEND_MSGBUF(buf, ptr, datalen) \ do { \ if ((buf)->len + (datalen) >= (buf)->maxlen) \ enlargeStringInfo((buf), (datalen)); \ memcpy((buf)->data + (buf)->len, ptr, (datalen)); \ (buf)->len += (datalen); \ } while(0) #define ENLARGE_MSGBUF(buf, needed) \ do { \ if ((buf)->len + (needed) >= (buf)->maxlen) \ enlargeStringInfo((buf), (needed)); \ } while(0) /* * This is the sending counterpart of NextCopyFromExecute. Used in the QD, * to send a row to a QE. 
 */
static void
SendCopyFromForwardedTuple(CopyState cstate,
						   CdbCopy *cdbCopy,
						   bool toAll,
						   int target_seg,
						   Relation rel,
						   int64 lineno,
						   char *line,
						   int line_len,
						   Datum *values,
						   bool *nulls)
{
	TupleDesc	tupDesc;
	FormData_pg_attribute *attr;
	copy_from_dispatch_row *frame;
	StringInfo	msgbuf;
	int			num_sent_fields;
	AttrNumber	num_phys_attrs;
	int			i;

	if (!OidIsValid(RelationGetRelid(rel)))
		elog(ERROR, "invalid target table OID in COPY");

	tupDesc = RelationGetDescr(rel);
	attr = tupDesc->attrs;
	num_phys_attrs = tupDesc->natts;

	/*
	 * Reset the message buffer, and reserve enough space for the header,
	 * the OID if any, and the residual line.
	 */
	msgbuf = cstate->dispatch_msgbuf;
	ENLARGE_MSGBUF(msgbuf, SizeOfCopyFromDispatchRow + sizeof(Oid) + cstate->line_buf.len);

	/* the header goes to the beginning of the struct, but it will be filled in later. */
	msgbuf->len = SizeOfCopyFromDispatchRow;

	/*
	 * Next, any residual text that we didn't process in the QD.
	 */
	APPEND_MSGBUF_NOCHECK(msgbuf, cstate->line_buf.data, cstate->line_buf.len);

	/*
	 * Append attributes to the buffer.
	 *
	 * Each non-NULL field is serialized as: attribute number (int16),
	 * followed by the raw Datum (by-value types), the fixed-length payload
	 * (attlen > 0), or an int32 length prefix plus payload (varlena and
	 * cstring types).  NextCopyFromExecute() must decode in exactly this
	 * order.
	 */
	num_sent_fields = 0;
	for (i = 0; i < num_phys_attrs; i++)
	{
		int16		attnum = i + 1;

		/* NULLs are simply left out of the message. */
		if (nulls[i])
			continue;

		/*
		 * Make sure we have room for the attribute number. While we're at it,
		 * also reserve room for the Datum, if it's a by-value datatype, or for
		 * the length field, if it's a varlena. Allocating both in one call
		 * saves one size-check.
		 */
		ENLARGE_MSGBUF(msgbuf, sizeof(int16) + sizeof(Datum));

		/* attribute number comes first */
		APPEND_MSGBUF_NOCHECK(msgbuf, &attnum, sizeof(int16));

		if (attr[i].attbyval)
		{
			/* we already reserved space for this above, so we can just memcpy */
			APPEND_MSGBUF_NOCHECK(msgbuf, &values[i], sizeof(Datum));
		}
		else
		{
			if (attr[i].attlen > 0)
			{
				APPEND_MSGBUF(msgbuf, DatumGetPointer(values[i]), attr[i].attlen);
			}
			else if (attr[i].attlen == -1)
			{
				int32		len;
				char	   *ptr;

				/* For simplicity, varlen's are always transmitted in "long" format */
				Assert(!VARATT_IS_SHORT(values[i]));
				len = VARSIZE(values[i]);
				ptr = VARDATA(values[i]);

				/* we already reserved space for this int */
				APPEND_MSGBUF_NOCHECK(msgbuf, &len, sizeof(int32));
				/* NB: the VARHDRSZ header bytes are NOT sent, only the payload */
				APPEND_MSGBUF(msgbuf, ptr, len - VARHDRSZ);
			}
			else if (attr[i].attlen == -2)
			{
				/*
				 * These attrs are NULL-terminated in memory, but we send
				 * them length-prefixed (like the varlen case above) so that
				 * the receiver can preallocate a data buffer.
				 */
				int32		len;
				size_t		slen;
				char	   *ptr;

				ptr = DatumGetPointer(values[i]);
				slen = strlen(ptr);

				if (slen > PG_INT32_MAX)
				{
					elog(ERROR, "attribute %d is too long (%lld bytes)",
						 attnum, (long long) slen);
				}

				len = (int32) slen;
				APPEND_MSGBUF_NOCHECK(msgbuf, &len, sizeof(int32));
				APPEND_MSGBUF(msgbuf, ptr, len);
			}
			else
			{
				elog(ERROR, "attribute %d has invalid length %d",
					 attnum, attr[i].attlen);
			}
		}

		num_sent_fields++;
	}

	/*
	 * Fill in the header. We reserved room for this at the beginning of the
	 * buffer.
	 */
	frame = (copy_from_dispatch_row *) msgbuf->data;
	frame->lineno = lineno;
	frame->relid = RelationGetRelid(rel);
	frame->line_len = cstate->line_buf.len;
	frame->residual_off = cstate->line_buf.cursor;
	frame->fld_count = num_sent_fields;
	frame->delim_seen_at_end = cstate->stopped_processing_at_delim;

	if (toAll)
		cdbCopySendDataToAll(cdbCopy, msgbuf->data, msgbuf->len);
	else
		cdbCopySendData(cdbCopy, target_seg, msgbuf->data, msgbuf->len);
}

/*
 * Send the QD->QE COPY stream header (signature + header frame) to every
 * segment, announcing which input fields the QD has already processed.
 */
static void
SendCopyFromForwardedHeader(CopyState cstate, CdbCopy *cdbCopy)
{
	copy_from_dispatch_header header_frame;

	cdbCopySendDataToAll(cdbCopy, QDtoQESignature, sizeof(QDtoQESignature));

	memset(&header_frame, 0, sizeof(header_frame));
	header_frame.first_qe_processed_field = cstate->first_qe_processed_field;

	cdbCopySendDataToAll(cdbCopy, (char *) &header_frame, sizeof(header_frame));
}

/*
 * Forward a rejected row (with its error message) from the QD to a segment,
 * so the segment can record it in its error log.  Decoded by
 * HandleQDErrorFrame() on the QE side.
 */
static void
SendCopyFromForwardedError(CopyState cstate, CdbCopy *cdbCopy, char *errormsg)
{
	copy_from_dispatch_error *errframe;
	StringInfo	msgbuf;
	int			target_seg;
	int			errormsg_len = strlen(errormsg);

	msgbuf = cstate->dispatch_msgbuf;
	resetStringInfo(msgbuf);
	enlargeStringInfo(msgbuf, SizeOfCopyFromDispatchError);
	/* allocate space for the header (we'll fill it in last). */
	msgbuf->len = SizeOfCopyFromDispatchError;

	appendBinaryStringInfo(msgbuf, errormsg, errormsg_len);
	appendBinaryStringInfo(msgbuf, cstate->line_buf.data, cstate->line_buf.len);

	errframe = (copy_from_dispatch_error *) msgbuf->data;

	/* error_marker == -1 is what distinguishes this from a row frame */
	errframe->error_marker = -1;
	errframe->lineno = cstate->cur_lineno;
	errframe->line_len = cstate->line_buf.len;
	errframe->errmsg_len = errormsg_len;
	errframe->line_buf_converted = cstate->line_buf_converted;

	/* send the bad data row to a random QE (via roundrobin) */
	if (cstate->lastsegid == cdbCopy->total_segs)
		cstate->lastsegid = 0;	/* start over from first segid */
	target_seg = (cstate->lastsegid++ % cdbCopy->total_segs);

	cdbCopySendData(cdbCopy, target_seg, msgbuf->data, msgbuf->len);
}

/*
 * Clean up storage and release resources for COPY FROM.
*/
void
EndCopyFrom(CopyState cstate)
{
	/* No COPY FROM related resources except memory. */
	EndCopy(cstate);
}

/*
 * Read the next input line and stash it in line_buf, with conversion to
 * server encoding.
 *
 * Result is true if read was terminated by EOF, false if terminated
 * by newline. The terminating newline or EOF marker is not included
 * in the final value of line_buf.
 */
static bool
CopyReadLine(CopyState cstate)
{
	bool		result;

	resetStringInfo(&cstate->line_buf);
	cstate->line_buf_valid = true;

	/* Mark that encoding conversion hasn't occurred yet */
	cstate->line_buf_converted = false;

	/* Parse data and transfer into line_buf */
	result = CopyReadLineText(cstate);

	if (result)
	{
		/*
		 * Reached EOF. In protocol version 3, we should ignore anything
		 * after \. up to the protocol end of copy data. (XXX maybe better
		 * not to treat \. as special?)
		 */
		if (cstate->copy_dest == COPY_NEW_FE)
		{
			/* drain the remaining protocol data without keeping it */
			do
			{
				cstate->raw_buf_index = cstate->raw_buf_len;
			} while (CopyLoadRawBuf(cstate));
		}
	}
	else
	{
		/*
		 * If we didn't hit EOF, then we must have transferred the EOL marker
		 * to line_buf along with the data. Get rid of it.
		 */
		switch (cstate->eol_type)
		{
			case EOL_NL:
				Assert(cstate->line_buf.len >= 1);
				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
				cstate->line_buf.len--;
				cstate->line_buf.data[cstate->line_buf.len] = '\0';
				break;
			case EOL_CR:
				Assert(cstate->line_buf.len >= 1);
				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
				cstate->line_buf.len--;
				cstate->line_buf.data[cstate->line_buf.len] = '\0';
				break;
			case EOL_CRNL:
				Assert(cstate->line_buf.len >= 2);
				Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
				cstate->line_buf.len -= 2;
				cstate->line_buf.data[cstate->line_buf.len] = '\0';
				break;
			case EOL_UNKNOWN:
				/* shouldn't get here */
				Assert(false);
				break;
		}
	}

	/* Done reading the line. Convert it to server encoding.
	 */
	if (cstate->need_transcoding)
	{
		char	   *cvt;

		cvt = pg_any_to_server(cstate->line_buf.data,
							   cstate->line_buf.len,
							   cstate->file_encoding);
		if (cvt != cstate->line_buf.data)
		{
			/* transfer converted data back to line_buf */
			resetStringInfo(&cstate->line_buf);
			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
			pfree(cvt);
		}
	}

	/* Now it's safe to use the buffer in error messages */
	cstate->line_buf_converted = true;

	return result;
}

/*
 * CopyReadLineText - inner loop of CopyReadLine for text mode
 *
 * Transfers one raw input line (text or CSV) from raw_buf into line_buf.
 * Heavily macro-based: REFILL_LINEBUF and the IF_NEED_REFILL_* macros embed
 * control flow (continue/break) into this loop, and NO_END_OF_COPY_GOTO
 * jumps to the not_end_of_copy label below.  Returns true on EOF.
 */
static bool
CopyReadLineText(CopyState cstate)
{
	char	   *copy_raw_buf;
	int			raw_buf_ptr;
	int			copy_buf_len;
	bool		need_data = false;
	bool		hit_eof = false;
	bool		result = false;
	char		mblen_str[2];	/* scratch 1-char string for pg_encoding_mblen() */

	/* CSV variables */
	bool		first_char_in_line = true;
	bool		in_quote = false,
				last_was_esc = false;
	char		quotec = '\0';
	char		escapec = '\0';

	if (cstate->csv_mode)
	{
		quotec = cstate->quote[0];
		escapec = cstate->escape[0];
		/* ignore special escape processing if it's the same as quotec */
		if (quotec == escapec)
			escapec = '\0';
	}

	mblen_str[1] = '\0';

	/*
	 * The objective of this loop is to transfer the entire next input line
	 * into line_buf. Hence, we only care for detecting newlines (\r and/or
	 * \n) and the end-of-copy marker (\.).
	 *
	 * In CSV mode, \r and \n inside a quoted field are just part of the data
	 * value and are put in line_buf. We keep just enough state to know if we
	 * are currently in a quoted field or not.
	 *
	 * These four characters, and the CSV escape and quote characters, are
	 * assumed the same in frontend and backend encodings.
	 *
	 * For speed, we try to move data from raw_buf to line_buf in chunks
	 * rather than one character at a time. raw_buf_ptr points to the next
	 * character to examine; any characters from raw_buf_index to raw_buf_ptr
	 * have been determined to be part of the line, but not yet transferred to
	 * line_buf.
	 *
	 * For a little extra speed within the loop, we copy raw_buf and
	 * raw_buf_len into local variables.
	 */
	copy_raw_buf = cstate->raw_buf;
	raw_buf_ptr = cstate->raw_buf_index;
	copy_buf_len = cstate->raw_buf_len;

	for (;;)
	{
		int			prev_raw_ptr;
		char		c;

		/*
		 * Load more data if needed. Ideally we would just force four bytes
		 * of read-ahead and avoid the many calls to
		 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
		 * does not allow us to read too far ahead or we might read into the
		 * next data, so we read-ahead only as far we know we can. One
		 * optimization would be to read-ahead four byte here if
		 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
		 * considering the size of the buffer.
		 */
		if (raw_buf_ptr >= copy_buf_len || need_data)
		{
			REFILL_LINEBUF;

			/*
			 * Try to read some more data. This will certainly reset
			 * raw_buf_index to zero, and raw_buf_ptr must go with it.
			 */
			if (!CopyLoadRawBuf(cstate))
				hit_eof = true;
			raw_buf_ptr = 0;
			copy_buf_len = cstate->raw_buf_len;

			/*
			 * If we are completely out of data, break out of the loop,
			 * reporting EOF.
			 */
			if (copy_buf_len <= 0)
			{
				result = true;
				break;
			}
			need_data = false;
		}

		/* OK to fetch a character */
		prev_raw_ptr = raw_buf_ptr;
		c = copy_raw_buf[raw_buf_ptr++];

		if (cstate->csv_mode)
		{
			/*
			 * If character is '\\' or '\r', we may need to look ahead below.
			 * Force fetch of the next character if we don't already have it.
			 * We need to do this before changing CSV state, in case one of
			 * these characters is also the quote or escape character.
			 *
			 * Note: old-protocol does not like forced prefetch, but it's OK
			 * here since we cannot validly be at EOF.
			 */
			if (c == '\\' || c == '\r')
			{
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			}

			/*
			 * Dealing with quotes and escapes here is mildly tricky. If the
			 * quote char is also the escape char, there's no problem - we
			 * just use the char as a toggle. If they are different, we need
			 * to ensure that we only take account of an escape inside a
			 * quoted field and immediately preceding a quote char, and not
			 * the second in an escape-escape sequence.
			 */
			if (in_quote && c == escapec)
				last_was_esc = !last_was_esc;
			if (c == quotec && !last_was_esc)
				in_quote = !in_quote;
			if (c != escapec)
				last_was_esc = false;

			/*
			 * Updating the line count for embedded CR and/or LF chars is
			 * necessarily a little fragile - this test is probably about the
			 * best we can do. (XXX it's arguable whether we should do this
			 * at all --- is cur_lineno a physical or logical count?)
			 */
			if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
				cstate->cur_lineno++;
		}

		/* Process \r */
		if (c == '\r' && (!cstate->csv_mode || !in_quote))
		{
			/* Check for \r\n on first line, _and_ handle \r\n. */
			if (cstate->eol_type == EOL_UNKNOWN ||
				cstate->eol_type == EOL_CRNL)
			{
				/*
				 * If need more data, go back to loop top to load it.
				 *
				 * Note that if we are at EOF, c will wind up as '\0' because
				 * of the guaranteed pad of raw_buf.
				 */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);

				/* get next char */
				c = copy_raw_buf[raw_buf_ptr];

				if (c == '\n')
				{
					raw_buf_ptr++;	/* eat newline */
					cstate->eol_type = EOL_CRNL;	/* in case not set yet */

					/*
					 * GPDB: end of line. Since we don't error out if we find a
					 * bare CR or LF in CRLF mode, break here instead.
					 */
					break;
				}
				else
				{
					/*
					 * GPDB_91_MERGE_FIXME: these commented-out blocks (as well
					 * as the restructured newline checks) are here because we
					 * allow the user to manually set the newline mode, and
					 * therefore don't error out on bare CR/LF in the middle of
					 * a column. Instead, they will be included verbatim.
					 *
					 * This probably has other fallout -- but so does changing
					 * the behavior. Discuss.
					 */
#if 0
					/* found \r, but no \n */
					if (cstate->eol_type == EOL_CRNL)
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 !cstate->csv_mode ?
								 errmsg("literal carriage return found in data") :
								 errmsg("unquoted carriage return found in data"),
								 !cstate->csv_mode ?
								 errhint("Use \"\\r\" to represent carriage return.") :
								 errhint("Use quoted CSV field to represent carriage return.")));
#endif
					/* GPDB: only reset eol_type if it's currently unknown.
					 */
					if (cstate->eol_type == EOL_UNKNOWN)
					{
						/*
						 * if we got here, it is the first line and we didn't find
						 * \n, so don't consume the peeked character
						 */
						cstate->eol_type = EOL_CR;
					}
				}
			}
#if 0							/* GPDB_91_MERGE_FIXME: see above. */
			else if (cstate->eol_type == EOL_NL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal carriage return found in data") :
						 errmsg("unquoted carriage return found in data"),
						 !cstate->csv_mode ?
						 errhint("Use \"\\r\" to represent carriage return.") :
						 errhint("Use quoted CSV field to represent carriage return.")));
#endif
			/* GPDB: a CR only ends the line in CR mode. */
			if (cstate->eol_type == EOL_CR)
			{
				/* If reach here, we have found the line terminator */
				break;
			}
		}

		/* Process \n */
		if (c == '\n' && (!cstate->csv_mode || !in_quote))
		{
#if 0							/* GPDB_91_MERGE_FIXME: see above. */
			if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal newline found in data") :
						 errmsg("unquoted newline found in data"),
						 !cstate->csv_mode ?
						 errhint("Use \"\\n\" to represent newline.") :
						 errhint("Use quoted CSV field to represent newline.")));
#endif
			/* GPDB: only reset eol_type if it's currently unknown. */
			if (cstate->eol_type == EOL_UNKNOWN)
				cstate->eol_type = EOL_NL;	/* in case not set yet */
			/* GPDB: a LF only ends the line in LF mode. */
			if (cstate->eol_type == EOL_NL)
			{
				/* If reach here, we have found the line terminator */
				break;
			}
		}

		/*
		 * In CSV mode, we only recognize \. alone on a line. This is because
		 * \. is a valid CSV data value.
		 */
		if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
		{
			char		c2;

			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			IF_NEED_REFILL_AND_EOF_BREAK(0);

			/* -----
			 * get next character
			 * Note: we do not change c so if it isn't \., we can fall
			 * through and continue processing for file encoding.
			 * -----
			 */
			c2 = copy_raw_buf[raw_buf_ptr];

			if (c2 == '.')
			{
				raw_buf_ptr++;	/* consume the '.'
								 */

				/*
				 * Note: if we loop back for more data here, it does not
				 * matter that the CSV state change checks are re-executed; we
				 * will come back here with no important state changed.
				 */
				if (cstate->eol_type == EOL_CRNL)
				{
					/* Get the next character */
					IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
					/* if hit_eof, c2 will become '\0' */
					c2 = copy_raw_buf[raw_buf_ptr++];

					if (c2 == '\n')
					{
						if (!cstate->csv_mode)
						{
							cstate->raw_buf_index = raw_buf_ptr;
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker does not match previous newline style")));
						}
						else
							NO_END_OF_COPY_GOTO;
					}
					else if (c2 != '\r')
					{
						if (!cstate->csv_mode)
						{
							cstate->raw_buf_index = raw_buf_ptr;
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker corrupt")));
						}
						else
							NO_END_OF_COPY_GOTO;
					}
				}

				/* Get the next character */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
				/* if hit_eof, c2 will become '\0' */
				c2 = copy_raw_buf[raw_buf_ptr++];

				if (c2 != '\r' && c2 != '\n')
				{
					if (!cstate->csv_mode)
					{
						cstate->raw_buf_index = raw_buf_ptr;
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 errmsg("end-of-copy marker corrupt")));
					}
					else
						NO_END_OF_COPY_GOTO;
				}

				if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
					(cstate->eol_type == EOL_CRNL && c2 != '\n') ||
					(cstate->eol_type == EOL_CR && c2 != '\r'))
				{
					cstate->raw_buf_index = raw_buf_ptr;
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("end-of-copy marker does not match previous newline style")));
				}

				/*
				 * Transfer only the data before the \. into line_buf, then
				 * discard the data and the \. sequence.
				 */
				if (prev_raw_ptr > cstate->raw_buf_index)
					appendBinaryStringInfo(&cstate->line_buf,
										   cstate->raw_buf + cstate->raw_buf_index,
										   prev_raw_ptr - cstate->raw_buf_index);
				cstate->raw_buf_index = raw_buf_ptr;
				result = true;	/* report EOF */
				break;
			}
			else if (!cstate->csv_mode)

				/*
				 * If we are here, it means we found a backslash followed by
				 * something other than a period.
				 * In non-CSV mode, anything
				 * after a backslash is special, so we skip over that second
				 * character too. If we didn't do that \\. would be
				 * considered an eof-of copy, while in non-CSV mode it is a
				 * literal backslash followed by a period. In CSV mode,
				 * backslashes are not special, so we want to process the
				 * character after the backslash just like a normal character,
				 * so we don't increment in those cases.
				 */
				raw_buf_ptr++;
		}

		/*
		 * This label is for CSV cases where \. appears at the start of a
		 * line, but there is more text after it, meaning it was a data value.
		 * We are more strict for \. in CSV mode because \. could be a data
		 * value, while in non-CSV mode, \. cannot be a data value.
		 */
not_end_of_copy:

		/*
		 * Process all bytes of a multi-byte character as a group.
		 *
		 * We only support multi-byte sequences where the first byte has the
		 * high-bit set, so as an optimization we can avoid this block
		 * entirely if it is not set.
		 */
		if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
		{
			int			mblen;

			/*
			 * It is enough to look at the first byte in all our encodings, to
			 * get the length. (GB18030 is a bit special, but still works for
			 * our purposes; see comment in pg_gb18030_mblen())
			 */
			mblen_str[0] = c;
			mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);

			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
			IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
			raw_buf_ptr += mblen - 1;
		}
		first_char_in_line = false;
	}							/* end of outer loop */

	/*
	 * Transfer any still-uncopied data to line_buf.
	 */
	REFILL_LINEBUF;

	return result;
}

/*
 * Return decimal value for a hexadecimal digit
 *
 * Caller must ensure 'hex' is a valid hex digit (isxdigit); for any other
 * input the result is meaningless.
 */
static int
GetDecimalFromHex(char hex)
{
	if (isdigit((unsigned char) hex))
		return hex - '0';
	else
		return tolower((unsigned char) hex) - 'a' + 10;
}

/*
 * Parse the current line into separate attributes (fields),
 * performing de-escaping as needed.
 *
 * The input is in line_buf. We use attribute_buf to hold the result
 * strings.
cstate->raw_fields[k] is set to point to the k'th attribute
 * string, or NULL when the input matches the null marker string.
 * This array is expanded as necessary.
 *
 * (Note that the caller cannot check for nulls since the returned
 * string would be the post-de-escaping equivalent, which may look
 * the same as some valid data string.)
 *
 * delim is the column delimiter string (must be just one byte for now).
 * null_print is the null marker string. Note that this is compared to
 * the pre-de-escaped input string.
 *
 * The return value is the number of fields actually read.
 */
static int
CopyReadAttributesText(CopyState cstate, int stop_processing_at_field)
{
	char		delimc = cstate->delim[0];

	/*
	 * With ESCAPE 'OFF', set escapec equal to delimc: the delimiter test in
	 * the scan loop runs first, so the escape branch can then never match
	 * and backslash processing is effectively disabled.
	 */
	char		escapec = cstate->escape_off ? delimc : cstate->escape[0];
	bool		delim_off = cstate->delim_off;
	int			fieldno;
	char	   *output_ptr;
	char	   *cur_ptr;
	char	   *line_end_ptr;

	/*
	 * We need a special case for zero-column tables: check that the input
	 * line is empty, and return.
	 */
	if (cstate->max_fields <= 0)
	{
		if (cstate->line_buf.len != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("extra data after last expected column")));
		return 0;
	}

	resetStringInfo(&cstate->attribute_buf);

	/*
	 * The de-escaped attributes will certainly not be longer than the input
	 * data line, so we can just force attribute_buf to be large enough and
	 * then transfer data without any checks for enough space. We need to do
	 * it this way because enlarging attribute_buf mid-stream would invalidate
	 * pointers already stored into cstate->raw_fields[].
	 */
	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
	output_ptr = cstate->attribute_buf.data;

	/* set pointer variables for loop */
	cur_ptr = cstate->line_buf.data + cstate->line_buf.cursor;
	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;

	/* Outer loop iterates over fields */
	fieldno = 0;
	for (;;)
	{
		bool		found_delim = false;
		char	   *start_ptr;
		char	   *end_ptr;
		int			input_len;
		bool		saw_non_ascii = false;

		/*
		 * In QD, stop once we have processed the last field we need in the QD.
		 */
		if (fieldno == stop_processing_at_field)
		{
			cstate->stopped_processing_at_delim = true;
			break;
		}

		/* Make sure there is enough space for the next value */
		if (fieldno >= cstate->max_fields)
		{
			cstate->max_fields *= 2;
			cstate->raw_fields =
				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
		}

		/* Remember start of field on both input and output sides */
		start_ptr = cur_ptr;
		cstate->raw_fields[fieldno] = output_ptr;

		/*
		 * Scan data for field.
		 *
		 * Note that in this loop, we are scanning to locate the end of field
		 * and also speculatively performing de-escaping. Once we find the
		 * end-of-field, we can match the raw field contents against the null
		 * marker string. Only after that comparison fails do we know that
		 * de-escaping is actually the right thing to do; therefore we *must
		 * not* throw any syntax errors before we've done the null-marker
		 * check.
		 */
		for (;;)
		{
			char		c;

			end_ptr = cur_ptr;
			if (cur_ptr >= line_end_ptr)
				break;
			c = *cur_ptr++;
			if (c == delimc && !delim_off)
			{
				found_delim = true;
				break;
			}
			if (c == escapec)
			{
				if (cur_ptr >= line_end_ptr)
					break;
				c = *cur_ptr++;
				switch (c)
				{
					case '0':
					case '1':
					case '2':
					case '3':
					case '4':
					case '5':
					case '6':
					case '7':
						{
							/* handle \013 */
							int			val;

							val = OCTVALUE(c);
							if (cur_ptr < line_end_ptr)
							{
								c = *cur_ptr;
								if (ISOCTAL(c))
								{
									cur_ptr++;
									val = (val << 3) + OCTVALUE(c);
									if (cur_ptr < line_end_ptr)
									{
										c = *cur_ptr;
										if (ISOCTAL(c))
										{
											cur_ptr++;
											val = (val << 3) + OCTVALUE(c);
										}
									}
								}
							}
							c = val & 0377;
							if (c == '\0' || IS_HIGHBIT_SET(c))
								saw_non_ascii = true;
						}
						break;
					case 'x':
						/* Handle \x3F */
						if (cur_ptr < line_end_ptr)
						{
							char		hexchar = *cur_ptr;

							if (isxdigit((unsigned char) hexchar))
							{
								int			val = GetDecimalFromHex(hexchar);

								cur_ptr++;
								if (cur_ptr < line_end_ptr)
								{
									hexchar = *cur_ptr;
									if (isxdigit((unsigned char) hexchar))
									{
										cur_ptr++;
										val = (val << 4) + GetDecimalFromHex(hexchar);
									}
								}
								c = val & 0xff;
								if (c == '\0' || IS_HIGHBIT_SET(c))
									saw_non_ascii = true;
							}
						}
						break;
					case 'b':
						c = '\b';
						break;
					case 'f':
						c = '\f';
						break;
					case 'n':
						c = '\n';
						break;
					case 'r':
						c = '\r';
						break;
					case 't':
						c = '\t';
						break;
					case 'v':
						c = '\v';
						break;

						/*
						 * in all other cases, take the char after '\'
						 * literally
						 */
				}
			}

			/* Add c to output string */
			*output_ptr++ = c;
		}

		/* Check whether raw input matched null marker */
		input_len = end_ptr - start_ptr;
		if (input_len == cstate->null_print_len &&
			strncmp(start_ptr, cstate->null_print, input_len) == 0)
			cstate->raw_fields[fieldno] = NULL;
		else
		{
			/*
			 * At this point we know the field is supposed to contain data.
			 *
			 * If we de-escaped any non-7-bit-ASCII chars, make sure the
			 * resulting string is valid data for the db encoding.
			 */
			if (saw_non_ascii)
			{
				char	   *fld = cstate->raw_fields[fieldno];

				pg_verifymbstr(fld, output_ptr - fld, false);
			}
		}

		/* Terminate attribute value in output area */
		*output_ptr++ = '\0';

		fieldno++;
		/* Done if we hit EOL instead of a delim */
		if (!found_delim)
		{
			cstate->stopped_processing_at_delim = false;
			break;
		}
	}

	/*
	 * Make note of the stopping point in 'line_buf.cursor', so that we
	 * can send the rest to the QE later.
	 */
	cstate->line_buf.cursor = cur_ptr - cstate->line_buf.data;

	/* Clean up state of attribute_buf */
	output_ptr--;
	Assert(*output_ptr == '\0');
	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);

	return fieldno;
}

/*
 * Parse the current line into separate attributes (fields),
 * performing de-escaping as needed. This has exactly the same API as
 * CopyReadAttributesText, except we parse the fields according to
 * "standard" (i.e. common) CSV usage.
 */
static int
CopyReadAttributesCSV(CopyState cstate, int stop_processing_at_field)
{
	char		delimc = cstate->delim[0];
	bool		delim_off = cstate->delim_off;
	char		quotec = cstate->quote[0];
	char		escapec = cstate->escape[0];
	int			fieldno;
	char	   *output_ptr;
	char	   *cur_ptr;
	char	   *line_end_ptr;

	/*
	 * We need a special case for zero-column tables: check that the input
	 * line is empty, and return.
	 */
	if (cstate->max_fields <= 0)
	{
		if (cstate->line_buf.len != 0)
			ereport(ERROR,
					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
					 errmsg("extra data after last expected column")));
		return 0;
	}

	resetStringInfo(&cstate->attribute_buf);

	/*
	 * The de-escaped attributes will certainly not be longer than the input
	 * data line, so we can just force attribute_buf to be large enough and
	 * then transfer data without any checks for enough space. We need to do
	 * it this way because enlarging attribute_buf mid-stream would invalidate
	 * pointers already stored into cstate->raw_fields[].
*/
	if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
		enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
	output_ptr = cstate->attribute_buf.data;

	/* set pointer variables for loop */
	cur_ptr = cstate->line_buf.data + cstate->line_buf.cursor;
	line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;

	/* Outer loop iterates over fields */
	fieldno = 0;
	for (;;)
	{
		bool		found_delim = false;
		bool		saw_quote = false;
		char	   *start_ptr;
		char	   *end_ptr;
		int			input_len;

		/*
		 * In QD, stop once we have processed the last field we need in the QD.
		 */
		if (fieldno == stop_processing_at_field)
		{
			cstate->stopped_processing_at_delim = true;
			break;
		}

		/* Make sure there is enough space for the next value */
		if (fieldno >= cstate->max_fields)
		{
			cstate->max_fields *= 2;
			cstate->raw_fields =
				repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
		}

		/* Remember start of field on both input and output sides */
		start_ptr = cur_ptr;
		cstate->raw_fields[fieldno] = output_ptr;

		/*
		 * Scan data for field,
		 *
		 * The loop starts in "not quote" mode and then toggles between that
		 * and "in quote" mode. The loop exits normally if it is in "not
		 * quote" mode and a delimiter or line end is seen.
		 */
		for (;;)
		{
			char		c;

			/* Not in quote */
			for (;;)
			{
				end_ptr = cur_ptr;
				if (cur_ptr >= line_end_ptr)
					goto endfield;
				c = *cur_ptr++;
				/* unquoted field delimiter */
				if (c == delimc && !delim_off)
				{
					found_delim = true;
					goto endfield;
				}
				/* start of quoted field (or part of field) */
				if (c == quotec)
				{
					saw_quote = true;
					break;
				}
				/* Add c to output string */
				*output_ptr++ = c;
			}

			/* In quote */
			for (;;)
			{
				end_ptr = cur_ptr;
				if (cur_ptr >= line_end_ptr)
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("unterminated CSV quoted field")));

				c = *cur_ptr++;

				/* escape within a quoted field */
				if (c == escapec)
				{
					/*
					 * peek at the next char if available, and escape it if it
					 * is an escape char or a quote char
					 */
					if (cur_ptr < line_end_ptr)
					{
						char		nextc = *cur_ptr;

						if (nextc == escapec || nextc == quotec)
						{
							*output_ptr++ = nextc;
							cur_ptr++;
							continue;
						}
					}
				}

				/*
				 * end of quoted field. Must do this test after testing for
				 * escape in case quote char and escape char are the same
				 * (which is the common case).
				 */
				if (c == quotec)
					break;

				/* Add c to output string */
				*output_ptr++ = c;
			}
		}

endfield:

		/* Terminate attribute value in output area */
		*output_ptr++ = '\0';

		/* Check whether raw input matched null marker */
		input_len = end_ptr - start_ptr;
		if (!saw_quote && input_len == cstate->null_print_len &&
			strncmp(start_ptr, cstate->null_print, input_len) == 0)
			cstate->raw_fields[fieldno] = NULL;

		fieldno++;
		/* Done if we hit EOL instead of a delim */
		if (!found_delim)
		{
			cstate->stopped_processing_at_delim = false;
			break;
		}
	}

	/*
	 * Make note of the stopping point in 'line_buf.cursor', so that we
	 * can send the rest to the QE later.
	 */
	cstate->line_buf.cursor = cur_ptr - cstate->line_buf.data;

	/* Clean up state of attribute_buf */
	output_ptr--;
	Assert(*output_ptr == '\0');
	cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);

	return fieldno;
}

/*
 * Read a binary attribute
 *
 * Reads a length-prefixed binary field value into attribute_buf and runs it
 * through the column type's binary receive function.  A length of -1 means
 * NULL; *isnull reports which case applied.
 */
static Datum
CopyReadBinaryAttribute(CopyState cstate, int column_no,
						FmgrInfo *flinfo, Oid typioparam, int32 typmod,
						bool *isnull)
{
	int32		fld_size;
	Datum		result;

	if (!CopyGetInt32(cstate, &fld_size))
		ereport(ERROR,
				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
				 errmsg("unexpected EOF in COPY data")));
	if (fld_size == -1)
	{
		*isnull = true;
		/* call the receive function even for NULL, with a NULL buffer */
		return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
	}
	if (fld_size < 0)
		ereport(ERROR,
				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
				 errmsg("invalid field size")));

	/* reset attribute_buf to empty, and load raw data in it */
	resetStringInfo(&cstate->attribute_buf);

	enlargeStringInfo(&cstate->attribute_buf, fld_size);
	if (CopyGetData(cstate, cstate->attribute_buf.data,
					fld_size) != fld_size)
		ereport(ERROR,
				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
				 errmsg("unexpected EOF in COPY data")));

	cstate->attribute_buf.len = fld_size;
	cstate->attribute_buf.data[fld_size] = '\0';

	/* Call the column type's binary input converter */
	result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
								 typioparam, typmod);

	/* Trouble if it didn't eat the whole buffer */
	if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
				 errmsg("incorrect binary data format")));

	*isnull = false;
	return result;
}

/*
 * Send text representation of one attribute, with conversion and escaping
 */

/*
 * DUMPSOFAR flushes the literal run accumulated between 'start' and 'ptr';
 * it deliberately captures those local variables (and 'cstate') from the
 * enclosing function.
 */
#define DUMPSOFAR() \
	do { \
		if (ptr > start) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)

static void
CopyAttributeOutText(CopyState cstate, char *string)
{
	char	   *ptr;
	char	   *start;
	char		c;
	char		delimc = cstate->delim[0];
	char		escapec = cstate->escape[0];

	if (cstate->need_transcoding)
		ptr = pg_server_to_custom(string, strlen(string),
								  cstate->file_encoding,
								  cstate->enc_conversion_proc);
	else
		ptr = string;

	/* GPDB: with ESCAPE 'OFF', emit the value verbatim */
	if (cstate->escape_off)
	{
		CopySendData(cstate, ptr, strlen(ptr));
		return;
	}

	/*
	 * We have to grovel through the string searching for control characters
	 * and instances of the delimiter character. In most cases, though, these
	 * are infrequent. To avoid overhead from calling CopySendData once per
	 * character, we dump out all characters between escaped characters in a
	 * single call. The loop invariant is that the data from "start" to "ptr"
	 * can be sent literally, but hasn't yet been.
	 *
	 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
	 * in valid backend encodings, extra bytes of a multibyte character never
	 * look like ASCII. This loop is sufficiently performance-critical that
	 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
	 * of the normal safe-encoding path.
	 */
	if (cstate->encoding_embeds_ascii)
	{
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional. We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/* If it's the delimiter, must backslash it */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			else if (c == escapec || c == delimc)
			{
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				start = ptr++;	/* we include char in next run */
			}
			else if (IS_HIGHBIT_SET(c))
				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
			else
				ptr++;
		}
	}
	else
	{
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional. We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/* If it's the delimiter, must backslash it */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			else if (c == escapec || c == delimc)
			{
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				start = ptr++;	/* we include char in next run */
			}
			else
				ptr++;
		}
	}

	DUMPSOFAR();
}

/*
 * Send text representation of one attribute, with conversion and
 * CSV-style escaping
 */
static void
CopyAttributeOutCSV(CopyState cstate, char *string,
					bool use_quote, bool single_attr)
{
	char	   *ptr;
	char	   *start;
	char		c;
	char		delimc = cstate->delim[0];
	char		quotec;
	char		escapec = cstate->escape[0];

	/*
	 * MPP-8075. We may get called with cstate->quote == NULL.
	 */
	if (cstate->quote == NULL)
	{
		quotec = '"';
	}
	else
	{
		quotec = cstate->quote[0];
	}

	/* force quoting if it matches null_print (before conversion!) */
	if (!use_quote && strcmp(string, cstate->null_print) == 0)
		use_quote = true;

	if (cstate->need_transcoding)
		ptr = pg_server_to_custom(string, strlen(string),
								  cstate->file_encoding,
								  cstate->enc_conversion_proc);
	else
		ptr = string;

	/*
	 * Make a preliminary pass to discover if it needs quoting
	 */
	if (!use_quote)
	{
		/*
		 * Because '\.' can be a data value, quote it if it appears alone on a
		 * line so it is not interpreted as the end-of-data marker.
		 */
		if (single_attr && strcmp(ptr, "\\.") == 0)
			use_quote = true;
		else
		{
			char	   *tptr = ptr;

			while ((c = *tptr) != '\0')
			{
				if (c == delimc || c == quotec || c == '\n' || c == '\r')
				{
					use_quote = true;
					break;
				}
				if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
					tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
				else
					tptr++;
			}
		}
	}

	if (use_quote)
	{
		CopySendChar(cstate, quotec);

		/*
		 * We adopt the same optimization strategy as in CopyAttributeOutText
		 */
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if (c == quotec || c == escapec)
			{
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				start = ptr;	/* we include char in next run */
			}
			if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
			else
				ptr++;
		}
		DUMPSOFAR();

		CopySendChar(cstate, quotec);
	}
	else
	{
		/* If it doesn't need quoting, we can just dump it as-is */
		CopySendString(cstate, ptr);
	}
}

/*
 * CopyGetAttnums - build an integer list of attnums to be copied
 *
 * The input attnamelist is either the user-specified column list,
 * or NIL if there was none (in which case we want all the non-dropped
 * columns).
 *
 * We don't include generated columns in the generated full list and we don't
 * allow them to be specified explicitly. They don't make sense for COPY
 * FROM, but we could possibly allow them for COPY TO. But this way it's at
 * least ensured that whatever we copy out can be copied back in.
 *
 * rel can be NULL ... it's only used for error reports.
*/
List *
CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
{
	List	   *attnums = NIL;

	if (attnamelist == NIL)
	{
		/* Generate default column list */
		int			attr_count = tupDesc->natts;
		int			i;

		for (i = 0; i < attr_count; i++)
		{
			/* Dropped and generated columns are omitted from the default list */
			if (TupleDescAttr(tupDesc, i)->attisdropped)
				continue;
			if (TupleDescAttr(tupDesc, i)->attgenerated)
				continue;
			attnums = lappend_int(attnums, i + 1);
		}
	}
	else
	{
		/* Validate the user-supplied list and extract attnums */
		ListCell   *l;

		foreach(l, attnamelist)
		{
			char	   *name = strVal(lfirst(l));
			int			attnum;
			int			i;

			/* Lookup column name */
			attnum = InvalidAttrNumber;
			for (i = 0; i < tupDesc->natts; i++)
			{
				Form_pg_attribute att = TupleDescAttr(tupDesc, i);

				if (att->attisdropped)
					continue;
				if (namestrcmp(&(att->attname), name) == 0)
				{
					/* Explicitly naming a generated column is an error */
					if (att->attgenerated)
						ereport(ERROR,
								(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
								 errmsg("column \"%s\" is a generated column",
										name),
								 errdetail("Generated columns cannot be used in COPY.")));
					attnum = att->attnum;
					break;
				}
			}
			if (attnum == InvalidAttrNumber)
			{
				if (rel != NULL)
					ereport(ERROR,
							(errcode(ERRCODE_UNDEFINED_COLUMN),
							 errmsg("column \"%s\" of relation \"%s\" does not exist",
									name, RelationGetRelationName(rel))));
				else
					ereport(ERROR,
							(errcode(ERRCODE_UNDEFINED_COLUMN),
							 errmsg("column \"%s\" does not exist",
									name)));
			}
			/* Check for duplicates */
			if (list_member_int(attnums, attnum))
				ereport(ERROR,
						(errcode(ERRCODE_DUPLICATE_COLUMN),
						 errmsg("column \"%s\" specified more than once",
								name)));
			attnums = lappend_int(attnums, attnum);
		}
	}

	return attnums;
}

/*
 * remove end of line chars from end of a buffer
 *
 * For EOL_CRNL, only a trailing "\r\n" pair is stripped; for other eol
 * types a single trailing '\r' or '\n' is stripped.  The removed bytes are
 * overwritten with NULs and buf->len is adjusted to match.
 */
void
truncateEol(StringInfo buf, EolType eol_type)
{
	int			one_back = buf->len - 1;
	int			two_back = buf->len - 2;

	if (eol_type == EOL_CRNL)
	{
		if (buf->len < 2)
			return;

		if (buf->data[two_back] == '\r' &&
			buf->data[one_back] == '\n')
		{
			buf->data[two_back] = '\0';
			buf->data[one_back] = '\0';
			buf->len -= 2;
		}
	}
	else
	{
		if (buf->len < 1)
			return;

		if (buf->data[one_back] == '\r' ||
			buf->data[one_back] == '\n')
		{
			buf->data[one_back] = '\0';
			buf->len--;
		}
	}
}

/*
 * wrapper for truncateEol
 *
 * Wraps a NUL-terminated C string in a temporary StringInfoData so that
 * truncateEol can edit it in place.  The string itself is modified.
 */
void
truncateEolStr(char *str, EolType eol_type)
{
	StringInfoData buf;

	buf.data = str;
	buf.len = strlen(str);
	buf.maxlen = buf.len;
	truncateEol(&buf, eol_type);
}

/*
 * copy_dest_startup --- executor startup
 *
 * Only does work on a QE (GP_ROLE_EXECUTE); on other roles this is a no-op.
 */
static void
copy_dest_startup(DestReceiver *self pg_attribute_unused(), int operation pg_attribute_unused(), TupleDesc typeinfo pg_attribute_unused())
{
	if (Gp_role != GP_ROLE_EXECUTE)
		return;
	DR_copy    *myState = (DR_copy *) self;

	myState->cstate = BeginCopyToOnSegment(myState->queryDesc);
}

/*
 * copy_dest_receive --- receive one tuple
 */
static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
	DR_copy    *myState = (DR_copy *) self;
	CopyState	cstate = myState->cstate;

	/* Send the data */
	CopyOneRowTo(cstate, slot);
	myState->processed++;

	return true;
}

/*
 * copy_dest_shutdown --- executor end
 *
 * Mirrors copy_dest_startup: only QEs tear down the per-segment COPY state.
 */
static void
copy_dest_shutdown(DestReceiver *self pg_attribute_unused())
{
	if (Gp_role != GP_ROLE_EXECUTE)
		return;
	DR_copy    *myState = (DR_copy *) self;

	EndCopyToOnSegment(myState->cstate);
}

/*
 * copy_dest_destroy --- release DestReceiver object
 */
static void
copy_dest_destroy(DestReceiver *self)
{
	pfree(self);
}

/*
 * CreateCopyDestReceiver -- create a suitable DestReceiver object
 *
 * cstate and queryDesc are left NULL here; the caller is expected to fill
 * them in before the receiver is started.
 */
DestReceiver *
CreateCopyDestReceiver(void)
{
	DR_copy    *self = (DR_copy *) palloc(sizeof(DR_copy));

	self->pub.receiveSlot = copy_dest_receive;
	self->pub.rStartup = copy_dest_startup;
	self->pub.rShutdown = copy_dest_shutdown;
	self->pub.rDestroy = copy_dest_destroy;
	self->pub.mydest = DestCopyOut;

	self->cstate = NULL;		/* need to be set later */
	self->queryDesc = NULL;		/* need to be set later */
	self->processed = 0;

	return (DestReceiver *) self;
}

/*
 * Initialize data loader parsing state
 */
static void
CopyInitDataParser(CopyState cstate)
{
	cstate->reached_eof = false;
	cstate->cur_relname = RelationGetRelationName(cstate->rel);
	cstate->cur_lineno = 0;
	cstate->cur_attname = NULL;
	cstate->null_print_len = strlen(cstate->null_print);

	/* Set up data buffer to hold a chunk of data */
	MemSet(cstate->raw_buf, ' ', RAW_BUF_SIZE * sizeof(char));
	/*
	 * NOTE(review): writes the terminator at index RAW_BUF_SIZE — assumes
	 * raw_buf was allocated with RAW_BUF_SIZE + 1 bytes; confirm at the
	 * allocation site.
	 */
	cstate->raw_buf[RAW_BUF_SIZE] = '\0';
}

/*
 * setEncodingConversionProc
 *
 * COPY and External tables use a custom path to the encoding conversion
 * API because external tables have their own encoding (which is not
 * necessarily client_encoding). We therefore have to set the correct
 * encoding conversion function pointer ourselves, to be later used in
 * the conversion engine.
 *
 * The code here mimics a part of SetClientEncoding() in mbutils.c
 */
static void
setEncodingConversionProc(CopyState cstate, int encoding, bool iswritable)
{
	Oid			conversion_proc;

	/*
	 * COPY FROM and RET: convert from file to server
	 * COPY TO and WET: convert from server to file
	 */
	if (iswritable)
		conversion_proc = FindDefaultConversionProc(GetDatabaseEncoding(),
													encoding);
	else
		conversion_proc = FindDefaultConversionProc(encoding,
													GetDatabaseEncoding());

	if (OidIsValid(conversion_proc))
	{
		/* conversion proc found */
		cstate->enc_conversion_proc = palloc(sizeof(FmgrInfo));
		fmgr_info(conversion_proc, cstate->enc_conversion_proc);
	}
	else
	{
		/* no conversion function (both encodings are probably the same) */
		cstate->enc_conversion_proc = NULL;
	}
}

/*
 * Build the GpDistributionData (distribution policy + cdbhash state) used
 * to route rows to segments during COPY FROM.  Free with
 * FreeDistributionData().
 */
static GpDistributionData *
InitDistributionData(CopyState cstate, EState *estate)
{
	GpDistributionData *distData;
	GpPolicy   *policy;
	CdbHash    *cdbHash;

	/*
	 * A non-partitioned table, or all the partitions have identical
	 * distribution policies.
	 */
	policy = GpPolicyCopy(cstate->rel->rd_cdbpolicy);
	cdbHash = makeCdbHashForRelation(cstate->rel);

	distData = palloc(sizeof(GpDistributionData));
	distData->policy = policy;
	distData->cdbHash = cdbHash;

	return distData;
}

/*
 * Release a GpDistributionData built by InitDistributionData().
 * Safe to call with NULL.
 */
static void
FreeDistributionData(GpDistributionData *distData)
{
	if (distData)
	{
		if (distData->policy)
			pfree(distData->policy);
		if (distData->cdbHash)
			pfree(distData->cdbHash);
		pfree(distData);
	}
}

/*
 * Compute which fields need to be processed in the QD, and which ones can
 * be delayed to the QE.
 *
 * Sets cstate->first_qe_processed_field: all input fields before that index
 * are parsed on the QD (they are needed there, e.g. for computing the
 * distribution key); the rest can be shipped to a QE unparsed.  In binary
 * mode every attribute is handled in the QD.
 */
static void
InitCopyFromDispatchSplit(CopyState cstate, GpDistributionData *distData,
						  EState *estate)
{
	int			first_qe_processed_field = 0;
	Bitmapset  *needed_cols = NULL;
	ListCell   *lc;

	if (cstate->binary)
	{
		foreach(lc, cstate->attnumlist)
		{
			AttrNumber	attnum = lfirst_int(lc);

			needed_cols = bms_add_member(needed_cols, attnum);
			first_qe_processed_field++;
		}
	}
	else
	{
		int			fieldno;

		/*
		 * We need all the columns that form the distribution key.
		 */
		if (distData->policy)
		{
			for (int i = 0; i < distData->policy->nattrs; i++)
				needed_cols = bms_add_member(needed_cols,
											 distData->policy->attrs[i]);
		}

		/* Get the max fieldno that contains one of the needed attributes.
*/
		fieldno = 0;
		foreach(lc, cstate->attnumlist)
		{
			AttrNumber	attnum = lfirst_int(lc);

			if (bms_is_member(attnum, needed_cols))
				first_qe_processed_field = fieldno + 1;
			fieldno++;
		}
	}

	cstate->first_qe_processed_field = first_qe_processed_field;

	/* Test-only instrumentation: report how the QD/QE split came out */
	if (Test_copy_qd_qe_split)
	{
		if (first_qe_processed_field == list_length(cstate->attnumlist))
			elog(INFO, "all fields will be processed in the QD");
		else
			elog(INFO, "first field processed in the QE: %d",
				 first_qe_processed_field);
	}
}

/*
 * Compute the target segment for one row, by hashing its distribution-key
 * attributes (already final in 'slot').  For a randomly-distributed policy
 * (no key attributes) a random segment is picked instead.
 */
static unsigned int
GetTargetSeg(GpDistributionData *distData, TupleTableSlot *slot)
{
	unsigned int target_seg;
	CdbHash    *cdbHash = distData->cdbHash;
	GpPolicy   *policy = distData->policy;	/* the partitioning policy for
											 * this table */
	AttrNumber	p_nattrs;	/* num of attributes in the distribution policy */

	/*
	 * These might be NULL, if we're called with a "main" GpDistributionData,
	 * for a partitioned table with heterogenous partitions. The caller
	 * should've used GetDistributionPolicyForPartition() to get the right
	 * distdata object for the partition.
	 */
	if (!policy)
		elog(ERROR, "missing distribution policy.");
	if (!cdbHash)
		elog(ERROR, "missing cdbhash");

	/*
	 * At this point in the code, baseValues[x] is final for this
	 * data row -- either the input data, a null or a default
	 * value is in there, and constraints applied.
	 *
	 * Perform a cdbhash on this data row. Perform a hash operation
	 * on each attribute.
	 */
	p_nattrs = policy->nattrs;
	if (p_nattrs > 0)
	{
		cdbhashinit(cdbHash);

		for (int i = 0; i < p_nattrs; i++)
		{
			/* current attno from the policy */
			AttrNumber	h_attnum = policy->attrs[i];
			Datum		d;
			bool		isnull;

			d = slot_getattr(slot, h_attnum, &isnull);
			cdbhash(cdbHash, i + 1, d, isnull);
		}

		target_seg = cdbhashreduce(cdbHash); /* hash result segment */
	}
	else
	{
		/*
		 * Randomly distributed. Pick a segment at random.
		 */
		target_seg = cdbhashrandomseg(policy->numsegments);
	}

	return target_seg;
}

/*
 * Launch the COPY ... PROGRAM command through popen_with_stderr(), returning
 * the pipe fds and child pid in a palloc'd ProgramPipes.  Raises ERROR if
 * the command cannot be started.
 */
static ProgramPipes *
open_program_pipes(char *command, bool forwrite)
{
	int			save_errno;
	pqsigfunc	save_SIGPIPE;

	/* set up extvar */
	extvar_t	extvar;

	memset(&extvar, 0, sizeof(extvar));

	external_set_env_vars(&extvar, command, false, NULL, NULL, false, 0);

	ProgramPipes *program_pipes = palloc(sizeof(ProgramPipes));

	program_pipes->pid = -1;
	program_pipes->pipes[0] = -1;
	program_pipes->pipes[1] = -1;
	program_pipes->shexec = make_command(command, &extvar);

	/*
	 * Preserve the SIGPIPE handler and set to default handling.  This
	 * allows "normal" SIGPIPE handling in the command pipeline.  Normal
	 * for PG is to *ignore* SIGPIPE.
	 */
	save_SIGPIPE = pqsignal(SIGPIPE, SIG_DFL);

	program_pipes->pid = popen_with_stderr(program_pipes->pipes,
										   program_pipes->shexec,
										   forwrite);

	/* save errno before pqsignal() or elog() can clobber it */
	save_errno = errno;

	/* Restore the SIGPIPE handler */
	pqsignal(SIGPIPE, save_SIGPIPE);

	elog(DEBUG5, "COPY ... PROGRAM command: %s", program_pipes->shexec);
	if (program_pipes->pid == -1)
	{
		errno = save_errno;
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 errmsg("can not start command: %s", command)));
	}

	return program_pipes;
}

/*
 * Close the COPY ... PROGRAM pipes and reap the child.  If 'ifThrow' is set
 * and the child exited with a non-zero status, raise an ERROR that includes
 * the child's captured stderr.
 */
static void
close_program_pipes(CopyState cstate, bool ifThrow)
{
	Assert(cstate->is_program);

	int			ret = 0;
	StringInfoData sinfo;

	/*
	 * NOTE(review): sinfo.data is not pfree'd on the return paths below; it
	 * is presumably reclaimed with the current memory context — confirm.
	 */
	initStringInfo(&sinfo);

	if (cstate->copy_file)
	{
		fclose(cstate->copy_file);
		cstate->copy_file = NULL;
	}

	/* just return if pipes not created, like when relation does not exist */
	if (!cstate->program_pipes)
	{
		return;
	}

	ret = pclose_with_stderr(cstate->program_pipes->pid,
							 cstate->program_pipes->pipes,
							 &sinfo);
	if (ret == 0 || !ifThrow)
	{
		return;
	}
	if (ret == -1)
	{
		/* pclose()/wait4() ended with an error; errno should be valid */
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("can not close pipe: %m")));
	}
	else if (!WIFSIGNALED(ret))
	{
		/*
		 * pclose() returned the process termination state.
		 *
		 * NOTE(review): a child killed by a signal (WIFSIGNALED) falls
		 * through with no error even when ifThrow is set — looks
		 * deliberate (e.g. SIGPIPE on early reader exit), but confirm.
		 */
		ereport(ERROR,
				(errcode(ERRCODE_SQL_ROUTINE_EXCEPTION),
				 errmsg("command error message: %s", sinfo.data)));
	}
}

/*
 * Parse a comma-separated, optionally double-quoted list of tokens into a
 * List of String nodes, using strtokx2.  Parsing stops at the first token
 * that is not followed by a comma (or at a leading comma).
 */
static List *
parse_joined_option_list(char *str, char *delimiter)
{
	char	   *token;
	char	   *comma;
	const char *whitespace = " \t\n\r";
	List	   *cols = NIL;
	int			encoding = GetDatabaseEncoding();

	token = strtokx2(str, whitespace, delimiter, "\"",
					 0, false, false, encoding);
	while (token)
	{
		/* an immediate comma means an empty element; stop here */
		if (token[0] == ',')
			break;

		cols = lappend(cols, makeString(pstrdup(token)));

		/* consume the comma if any */
		comma = strtokx2(NULL, whitespace, delimiter, "\"",
						 0, false, false, encoding);
		if (!comma || comma[0] != ',')
			break;

		token = strtokx2(NULL, whitespace, delimiter, "\"",
						 0, false, false, encoding);
	}

	return cols;
}