提交 14885872 编写于 作者: A Adam Lee

s3ext: support to create writable external table

Only to create, not functional yet.
Signed-off-by: NAdam Lee <ali@pivotal.io>
Signed-off-by: NHaozhou Wang <hawang@pivotal.io>
上级 7eb0cc92
......@@ -8,7 +8,7 @@
#include "s3interface.h"
#include "s3key_reader.h"
extern string gpReaderErrorMessage;
extern string s3extErrorMessage;
class GPReader : public Reader {
public:
......
#ifndef INCLUDE_GPWRITER_H_
#define INCLUDE_GPWRITER_H_
class GPWriter {
public:
GPWriter();
virtual ~GPWriter();
};
// Following 3 functions are invoked by s3_export(), need to be exception safe
GPWriter *writer_init(const char *url_with_options);
bool writer_transfer_data(GPWriter *writer, char *data_buf, int &data_len);
bool writer_cleanup(GPWriter **writer);
#endif /* INCLUDE_GPWRITER_H_ */
COMMON_OBJS = s3conf.o s3common.o s3utils.o s3log.o s3url_parser.o s3http_headers.o s3interface.o s3restful_service.o decompress_reader.o s3key_reader.o s3bucket_reader.o gpreader.o s3common_reader.o
COMMON_OBJS = gpreader.o gpwriter.o s3conf.o s3common.o s3utils.o s3log.o s3url_parser.o s3http_headers.o s3interface.o s3restful_service.o decompress_reader.o s3key_reader.o s3bucket_reader.o s3common_reader.o
COMMON_LINK_OPTIONS = -lstdc++ -lxml2 -lpthread -lcrypto -lcurl -lz
......
CREATE OR REPLACE FUNCTION write_to_s3() RETURNS integer AS
'$libdir/gps3ext.so', 's3_export' LANGUAGE C STABLE;
CREATE PROTOCOL s3 (
writefunc = write_to_s3
);
DROP EXTERNAL TABLE s3regress;
CREATE WRITABLE EXTERNAL TABLE s3regress (date text, time text, open float, high float,
low float, volume int) LOCATION('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/regress/normal/ config=/home/gpadmin/s3.conf') FORMAT 'csv';
DROP EXTERNAL TABLE s3regress;
DROP PROTOCOL s3;
CREATE OR REPLACE FUNCTION write_to_s3() RETURNS integer AS
'$libdir/gps3ext.so', 's3_export' LANGUAGE C STABLE;
CREATE PROTOCOL s3 (
writefunc = write_to_s3
);
DROP EXTERNAL TABLE s3regress;
CREATE WRITABLE EXTERNAL TABLE s3regress (date text, time text, open float, high float,
low float, volume int) FORMAT 'csv';
DROP EXTERNAL TABLE s3regress;
DROP PROTOCOL s3;
CREATE OR REPLACE FUNCTION write_to_s3() RETURNS integer AS
'$libdir/gps3ext.so', 's3_export' LANGUAGE C STABLE;
CREATE PROTOCOL s3 (
writefunc = write_to_s3
);
DROP EXTERNAL TABLE s3regress;
ERROR: table "s3regress" does not exist
CREATE WRITABLE EXTERNAL TABLE s3regress (date text, time text, open float, high float,
low float, volume int) LOCATION('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/regress/normal/ config=/home/gpadmin/s3.conf') FORMAT 'csv';
DROP EXTERNAL TABLE s3regress;
DROP PROTOCOL s3;
CREATE OR REPLACE FUNCTION write_to_s3() RETURNS integer AS
'$libdir/gps3ext.so', 's3_export' LANGUAGE C STABLE;
CREATE PROTOCOL s3 (
writefunc = write_to_s3
);
DROP EXTERNAL TABLE s3regress;
ERROR: table "s3regress" does not exist
CREATE WRITABLE EXTERNAL TABLE s3regress (date text, time text, open float, high float,
low float, volume int) FORMAT 'csv';
ERROR: syntax error at or near "FORMAT"
LINE 2: low float, volume int) FORMAT 'csv';
^
DROP EXTERNAL TABLE s3regress;
ERROR: table "s3regress" does not exist
DROP PROTOCOL s3;
......@@ -16,3 +16,5 @@ test: 2_01_invalid_syntax
test: 2_02_invalid_region
test: 2_03_invalid_config
test: 2_04_invalid_header
test: 3_01_create_wet
test: 4_01_create_invalid_wet
......@@ -11,7 +11,7 @@
#include "s3macros.h"
#include "s3utils.h"
string gpReaderErrorMessage;
string s3extErrorMessage;
// Thread related functions, called only by gpreader and gpcheckcloud
#define MUTEX_TYPE pthread_mutex_t
......@@ -117,7 +117,7 @@ void CheckEssentialConfig() {
// invoked by s3_import(), need to be exception safe
GPReader* reader_init(const char* url_with_options) {
GPReader* reader = NULL;
gpReaderErrorMessage.clear();
s3extErrorMessage.clear();
try {
if (!url_with_options) {
return NULL;
......@@ -158,7 +158,7 @@ GPReader* reader_init(const char* url_with_options) {
delete reader;
}
S3ERROR("reader_init caught an exception: %s", e.what());
gpReaderErrorMessage = e.what();
s3extErrorMessage = e.what();
return NULL;
}
}
......@@ -180,7 +180,7 @@ bool reader_transfer_data(GPReader* reader, char* data_buf, int& data_len) {
data_len = (int)read_len;
} catch (std::exception& e) {
S3ERROR("reader_transfer_data caught an exception: %s", e.what());
gpReaderErrorMessage = e.what();
s3extErrorMessage = e.what();
return false;
}
......@@ -200,7 +200,7 @@ bool reader_cleanup(GPReader** reader) {
}
} catch (std::exception& e) {
S3ERROR("reader_cleanup caught an exception: %s", e.what());
gpReaderErrorMessage = e.what();
s3extErrorMessage = e.what();
result = false;
}
......
......@@ -18,6 +18,8 @@
#include "utils/memutils.h"
#include "gpreader.h"
#include "gpwriter.h"
#include "s3common.h"
#include "s3conf.h"
#include "s3log.h"
......@@ -50,8 +52,8 @@ Datum s3_import(PG_FUNCTION_ARGS) {
thread_cleanup();
if (!reader_cleanup(&gpreader)) {
ereport(ERROR, (0, errmsg("Failed to cleanup S3 extension: %s",
gpReaderErrorMessage.c_str())));
ereport(ERROR,
(0, errmsg("Failed to cleanup S3 extension: %s", s3extErrorMessage.c_str())));
}
EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL);
......@@ -70,7 +72,7 @@ Datum s3_import(PG_FUNCTION_ARGS) {
ereport(ERROR, (0, errmsg("Failed to init S3 extension, segid = %d, "
"segnum = %d, please check your "
"configurations and net connection: %s",
s3ext_segid, s3ext_segnum, gpReaderErrorMessage.c_str())));
s3ext_segid, s3ext_segnum, s3extErrorMessage.c_str())));
}
EXTPROTOCOL_SET_USER_CTX(fcinfo, gpreader);
......@@ -81,7 +83,7 @@ Datum s3_import(PG_FUNCTION_ARGS) {
if (!reader_transfer_data(gpreader, data_buf, data_len)) {
ereport(ERROR,
(0, errmsg("s3_import: could not read data: %s", gpReaderErrorMessage.c_str())));
(0, errmsg("s3_import: could not read data: %s", s3extErrorMessage.c_str())));
}
PG_RETURN_INT32(data_len);
}
......@@ -91,5 +93,51 @@ Datum s3_import(PG_FUNCTION_ARGS) {
* invoked by GPDB, be careful with C++ exceptions.
*/
Datum s3_export(PG_FUNCTION_ARGS) {
PG_RETURN_INT32(0);
/* Must be called via the external table format manager */
if (!CALLED_AS_EXTPROTOCOL(fcinfo))
elog(ERROR, "extprotocol_import: not called by external protocol manager");
/* Get our internal description of the protocol */
GPWriter *gpwriter = (GPWriter *)EXTPROTOCOL_GET_USER_CTX(fcinfo);
/* last call. destroy writer */
if (EXTPROTOCOL_IS_LAST_CALL(fcinfo)) {
thread_cleanup();
if (!writer_cleanup(&gpwriter)) {
ereport(ERROR,
(0, errmsg("Failed to cleanup S3 extension: %s", s3extErrorMessage.c_str())));
}
EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL);
PG_RETURN_INT32(0);
}
/* first call. do any desired init */
if (gpwriter == NULL) {
const char *url_with_options = EXTPROTOCOL_GET_URL(fcinfo);
thread_setup();
gpwriter = writer_init(url_with_options);
if (!gpwriter) {
ereport(ERROR, (0, errmsg("Failed to init S3 extension, segid = %d, "
"segnum = %d, please check your "
"configurations and net connection: %s",
s3ext_segid, s3ext_segnum, s3extErrorMessage.c_str())));
}
EXTPROTOCOL_SET_USER_CTX(fcinfo, gpwriter);
}
char *data_buf = EXTPROTOCOL_GET_DATABUF(fcinfo);
int32 data_len = EXTPROTOCOL_GET_DATALEN(fcinfo);
if (!writer_transfer_data(gpwriter, data_buf, data_len)) {
ereport(ERROR,
(0, errmsg("s3_export: could not write data: %s", s3extErrorMessage.c_str())));
}
PG_RETURN_INT32(data_len);
}
#include "gpwriter.h"
GPWriter::GPWriter() {
}
GPWriter::~GPWriter() {
}
GPWriter *writer_init(const char *url_with_options) {
GPWriter *gpwriter = new GPWriter();
return gpwriter;
}
bool writer_transfer_data(GPWriter *writer, char *data_buf, int &data_len) {
return true;
}
bool writer_cleanup(GPWriter **writer) {
if (*writer) {
delete *writer;
}
return true;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册