未验证 提交 cd4c7a70 编写于 作者: A Alexander Denissov 提交者: GitHub

support for writable external tables with PXF (#3795)

上级 f7f726fb
......@@ -23,3 +23,101 @@ SELECT * from pxf_readcustom_test order by a;
fragment3 row2 | value1 | value2
(6 rows)
------------------------------------------------------------------
-- PXF write test
------------------------------------------------------------------
\!rm -rf /tmp/pxf
INSERT INTO pxf_write_test SELECT * from origin;
\!ls -1 /tmp/pxf/ | wc -l | sed -e 's/^[[:space:]]*//'
3
\!cat /tmp/pxf/* | sort
10,data_10
11,data_11
12,data_12
13,data_13
14,data_14
15,data_15
16,data_16
17,data_17
18,data_18
19,data_19
20,data_20
21,data_21
22,data_22
23,data_23
24,data_24
25,data_25
26,data_26
27,data_27
28,data_28
29,data_29
30,data_30
31,data_31
32,data_32
33,data_33
34,data_34
35,data_35
36,data_36
37,data_37
38,data_38
39,data_39
40,data_40
41,data_41
42,data_42
43,data_43
44,data_44
45,data_45
46,data_46
47,data_47
48,data_48
49,data_49
50,data_50
51,data_51
52,data_52
53,data_53
54,data_54
55,data_55
56,data_56
57,data_57
58,data_58
59,data_59
60,data_60
61,data_61
62,data_62
63,data_63
64,data_64
65,data_65
66,data_66
67,data_67
68,data_68
69,data_69
70,data_70
71,data_71
72,data_72
73,data_73
74,data_74
75,data_75
76,data_76
77,data_77
78,data_78
79,data_79
80,data_80
81,data_81
82,data_82
83,data_83
84,data_84
85,data_85
86,data_86
87,data_87
88,data_88
89,data_89
90,data_90
91,data_91
92,data_92
93,data_93
94,data_94
95,data_95
96,data_96
97,data_97
98,data_98
99,data_99
\ No newline at end of file
......@@ -19,3 +19,10 @@ LOCATION ('pxf://tmp/dummy1'
'&ACCESSOR=org.apache.hawq.pxf.api.examples.DemoAccessor'
'&RESOLVER=org.apache.hawq.pxf.api.examples.DemoResolver')
FORMAT 'CUSTOM' (formatter='pxfwritable_import');
CREATE WRITABLE EXTERNAL TABLE pxf_write_test (a int, b TEXT)
LOCATION ('pxf:///tmp/pxf?'
'&ACCESSOR=org.apache.hawq.pxf.api.examples.DemoFileWritableAccessor'
'&RESOLVER=org.apache.hawq.pxf.api.examples.DemoTextResolver')
FORMAT 'TEXT' (DELIMITER ',') DISTRIBUTED BY (a);
CREATE TABLE origin (a int, b TEXT) DISTRIBUTED BY (a);
INSERT INTO origin SELECT i, 'data_' || i FROM generate_series(10,99) AS i;
......@@ -4,3 +4,13 @@
SELECT * from pxf_read_test order by a;
SELECT * from pxf_readcustom_test order by a;
------------------------------------------------------------------
-- PXF write test
------------------------------------------------------------------
\!rm -rf /tmp/pxf
INSERT INTO pxf_write_test SELECT * from origin;
\!ls -1 /tmp/pxf/ | wc -l | sed -e 's/^[[:space:]]*//'
\!cat /tmp/pxf/* | sort
\ No newline at end of file
......@@ -24,3 +24,12 @@ LOCATION ('pxf://tmp/dummy1'
'&ACCESSOR=org.apache.hawq.pxf.api.examples.DemoAccessor'
'&RESOLVER=org.apache.hawq.pxf.api.examples.DemoResolver')
FORMAT 'CUSTOM' (formatter='pxfwritable_import');
CREATE WRITABLE EXTERNAL TABLE pxf_write_test (a int, b TEXT)
LOCATION ('pxf:///tmp/pxf?'
'&ACCESSOR=org.apache.hawq.pxf.api.examples.DemoFileWritableAccessor'
'&RESOLVER=org.apache.hawq.pxf.api.examples.DemoTextResolver')
FORMAT 'TEXT' (DELIMITER ',') DISTRIBUTED BY (a);
CREATE TABLE origin (a int, b TEXT) DISTRIBUTED BY (a);
INSERT INTO origin SELECT i, 'data_' || i FROM generate_series(10,99) AS i;
\ No newline at end of file
......@@ -21,9 +21,14 @@
#include "pxfbridge.h"
#include "pxfheaders.h"
#include "pxffragment.h"
#include "pxfutils.h"
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"
/* helper function declarations */
static void build_uri_for_read(gphadoop_context *context);
static void build_uri_for_write(gphadoop_context *context);
static void build_file_name_for_write(gphadoop_context *context);
static void add_querydata_to_http_headers(gphadoop_context *context);
static void set_current_fragment_headers(gphadoop_context *context);
static size_t fill_buffer(gphadoop_context *context, char *start, size_t size);
......@@ -73,7 +78,21 @@ gpbridge_import_start(gphadoop_context *context)
}
/*
* Reads data from PXF into the given buffer of a given size
* Sets up data before starting export
*/
void
gpbridge_export_start(gphadoop_context *context)
{
build_file_name_for_write(context);
build_uri_for_write(context);
context->churl_headers = churl_headers_init();
add_querydata_to_http_headers(context);
context->churl_handle = churl_init_upload(context->uri.data, context->churl_headers);
}
/*
* Reads data from the PXF server into the given buffer of a given size
*/
int
gpbridge_read(gphadoop_context *context, char *databuf, int datalen)
......@@ -107,7 +126,24 @@ gpbridge_read(gphadoop_context *context, char *databuf, int datalen)
}
/*
* Format the URI by adding PXF service endpoint details
* Writes data from the given buffer of a given size to the PXF server
*/
int
gpbridge_write(gphadoop_context *context, char *databuf, int datalen)
{
size_t n = 0;
if (datalen > 0)
{
n = churl_write(context->churl_handle, databuf, datalen);
elog(DEBUG5, "pxf gpbridge_write: wrote %zu bytes to %s", n, context->write_file_name.data);
}
return (int) n;
}
/*
* Format the URI for reading by adding PXF service endpoint details
*/
static void
build_uri_for_read(gphadoop_context *context)
......@@ -120,6 +156,37 @@ build_uri_for_read(gphadoop_context *context)
elog(DEBUG2, "pxf: uri %s for read", context->uri.data);
}
/*
* Format the URI for writing by adding PXF service endpoint details
*/
static void
build_uri_for_write(gphadoop_context *context)
{
appendStringInfo(&context->uri, "http://%s/%s/%s/Writable/stream?path=%s",
get_authority(), PXF_SERVICE_PREFIX, PXF_VERSION, context->write_file_name.data);
elog(DEBUG2, "pxf: uri %s with file name for write: %s", context->uri.data, context->write_file_name.data);
}
/*
* Builds a unique file name for write per segment, based on
* directory name from the table's URI, the transaction id (XID) and segment id.
* e.g. with path in URI '/data/writable/table1', XID 1234 and segment id 3,
* the file name will be '/data/writable/table1/1234_3'.
*/
static void
build_file_name_for_write(gphadoop_context *context)
{
char xid[TMGIDSIZE];
if (!getDistributedTransactionIdentifier(xid))
elog(ERROR, "Unable to obtain distributed transaction id");
appendStringInfo(&context->write_file_name, "/%s/%s_%d",
context->gphd_uri->data, xid, GpIdentity.segindex);
elog(DEBUG2, "pxf: file name for write: %s", context->write_file_name.data);
}
/*
* Add key/value pairs to connection header. These values are the context of the query and used
* by the remote component.
......
......@@ -53,8 +53,18 @@ void gpbridge_cleanup(gphadoop_context *context);
void gpbridge_import_start(gphadoop_context *context);
/*
* Reads data from PXF into the given buffer of a given size
* Sets up data before starting export
*/
void gpbridge_export_start(gphadoop_context *context);
/*
* Reads data from the PXF server into the given buffer of a given size
*/
int gpbridge_read(gphadoop_context *context, char *databuf, int datalen);
/*
* Writes data from the given buffer of a given size to the PXF server
*/
int gpbridge_write(gphadoop_context *context, char *databuf, int datalen);
#endif /* _PXFBRIDGE_H */
......@@ -98,16 +98,10 @@ static void
assign_pxf_location_to_fragments(List *fragments)
{
ListCell *frag_c = NULL;
StringInfoData authority;
initStringInfo(&authority);
appendStringInfo(&authority, "%s:%d", PxfDefaultHost, PxfDefaultPort);
foreach(frag_c, fragments)
{
FragmentData *fragment = (FragmentData *) lfirst(frag_c);
fragment->authority = pstrdup(authority.data);
fragment->authority = get_authority();
}
return;
}
......
......@@ -65,12 +65,6 @@ pxfprotocol_validate_urls(PG_FUNCTION_ARGS)
elog(DEBUG2, "pxfprotocol_validate_urls: uri %s", uri_string);
GPHDUri *uri = parseGPHDUri(uri_string);
/* Test that Fragmenter or Profile was specified in the URI */
if (!GPHDUri_opt_exists(uri, FRAGMENTER) && !GPHDUri_opt_exists(uri, PXF_PROFILE))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("FRAGMENTER or PROFILE option must exist in %s", uri->uri)));
/* No duplicate options. */
GPHDUri_verify_no_duplicate_options(uri);
......@@ -78,9 +72,9 @@ pxfprotocol_validate_urls(PG_FUNCTION_ARGS)
if (!GPHDUri_opt_exists(uri, PXF_PROFILE))
{
List *coreOptions = list_make2(ACCESSOR, RESOLVER);
if (EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo) != EXT_VALIDATE_WRITE)
if (EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo) != EXT_VALIDATE_WRITE)
coreOptions = lcons(FRAGMENTER, coreOptions);
GPHDUri_verify_core_options_exist(uri, coreOptions);
list_free(coreOptions);
}
......@@ -95,9 +89,30 @@ pxfprotocol_validate_urls(PG_FUNCTION_ARGS)
Datum
pxfprotocol_export(PG_FUNCTION_ARGS)
{
/* TODO: provide real implementation */
elog(INFO, "Dummy PXF protocol write");
PG_RETURN_INT32(0);
/* Must be called via the external table format manager */
check_caller(fcinfo, "pxfprotocol_export");
/* retrieve user context required for data write */
gphadoop_context *context = (gphadoop_context *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
/* last call -- cleanup */
if (EXTPROTOCOL_IS_LAST_CALL(fcinfo))
{
cleanup_context(context);
EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL);
PG_RETURN_INT32(0);
}
/* first call -- do any desired init */
if (context == NULL)
{
context = create_context(fcinfo, false);
EXTPROTOCOL_SET_USER_CTX(fcinfo, context);
gpbridge_export_start(context);
}
/* Read data */
int bytes_written = gpbridge_write(context, EXTPROTOCOL_GET_DATABUF(fcinfo), EXTPROTOCOL_GET_DATALEN(fcinfo));
PG_RETURN_INT32(bytes_written);
}
/*
......@@ -141,8 +156,10 @@ create_context(PG_FUNCTION_ARGS, bool is_import)
GPHDUri *uri = parseGPHDUri(EXTPROTOCOL_GET_URL(fcinfo));
Relation relation = EXTPROTOCOL_GET_RELATION(fcinfo);
/* fetch data fragments */
get_fragments(uri, relation);
if (is_import) {
/* fetch data fragments */
get_fragments(uri, relation);
}
/* set context */
gphadoop_context *context = palloc0(sizeof(gphadoop_context));
......
......@@ -80,3 +80,10 @@ concat(int num_args,...)
return str.data;
}
/* Get authority (host:port) for the PXF server URL */
char *
get_authority(void)
{
return psprintf("%s:%d", PxfDefaultHost, PxfDefaultPort);
}
......@@ -13,6 +13,9 @@ char *TypeOidGetTypename(Oid typid);
/* Concatenate multiple literal strings using stringinfo */
char *concat(int num_args,...);
/* Get authority (host:port) for the PXF server URL */
char *get_authority(void);
#define PXF_PROFILE "PROFILE"
#define FRAGMENTER "FRAGMENTER"
#define ACCESSOR "ACCESSOR"
......
......@@ -9,3 +9,5 @@ include $(top_builddir)/src/backend/mock.mk
pxfheaders.t: $(MOCK_DIR)/backend/access/external/fileam_mock.o $(MOCK_DIR)/backend/catalog/pg_exttable_mock.o
pxffragment.t: $(MOCK_DIR)/backend/cdb/cdbtm_mock.o $(top_builddir)/src/backend/utils/adt/json.o
pxfbridge.t: $(MOCK_DIR)/backend/cdb/cdbtm_mock.o
......@@ -2,22 +2,38 @@
void
gpbridge_cleanup(gphadoop_context* context)
{
check_expected(context);
mock();
check_expected(context);
mock();
}
void
gpbridge_import_start(gphadoop_context* context)
{
check_expected(context);
mock();
check_expected(context);
mock();
}
void
gpbridge_export_start(gphadoop_context *context)
{
check_expected(context);
mock();
}
int
gpbridge_read(gphadoop_context* context, char* databuf, int datalen)
{
check_expected(context);
check_expected(databuf);
check_expected(datalen);
return (int) mock();
check_expected(context);
check_expected(databuf);
check_expected(datalen);
return (int) mock();
}
int
gpbridge_write(gphadoop_context *context, char *databuf, int datalen)
{
check_expected(context);
check_expected(databuf);
check_expected(datalen);
return (int) mock();
}
\ No newline at end of file
char*
normalize_key_name(const char* key)
{
check_expected(key);
return (char*) mock();
check_expected(key);
return (char*) mock();
}
char*
......@@ -15,6 +15,12 @@ TypeOidGetTypename(Oid typid)
char*
concat(int num_args, ...)
{
check_expected(num_args);
return (char*) mock();
check_expected(num_args);
return (char*) mock();
}
char*
get_authority(void)
{
return (char*) mock();
}
\ No newline at end of file
......@@ -13,6 +13,7 @@
#include "mock/libchurl_mock.c"
#include "mock/pxfuriparser_mock.c"
#include "mock/pxfheaders_mock.c"
#include "mock/pxfutils_mock.c"
#include "../src/pxfbridge.h"
/* helper functions */
......@@ -125,6 +126,58 @@ test_gpbridge_import_start(void **state)
pfree(context);
}
void
test_gpbridge_export_start(void **state)
{
/* init data in context that will be cleaned up */
gphadoop_context *context = (gphadoop_context *) palloc0(sizeof(gphadoop_context));
initStringInfo(&context->uri);
initStringInfo(&context->write_file_name);
context->gphd_uri = (GPHDUri *) palloc0(sizeof(GPHDUri));
context->gphd_uri->data = "path";
context->gphd_uri->profile = "profile";
/* expectations for creating file name for write */
char xid[TMGIDSIZE]="abcdefghijklmnopqrstu";
GpIdentity.segindex = 3;
char* expected_file_name = psprintf("/%s/%s_%d", "path", xid, 3);
expect_any(getDistributedTransactionIdentifier, id);
will_assign_memory(getDistributedTransactionIdentifier, id, xid, TMGIDSIZE);
will_return(getDistributedTransactionIdentifier, true);
/* expectation for remote uri construction */
will_return(get_authority, "abc:123");
char* expected_uri = psprintf("http://abc:123/pxf/v15/Writable/stream?path=%s", expected_file_name);
CHURL_HEADERS headers = (CHURL_HEADERS) palloc0(sizeof(CHURL_HEADERS));
will_return(churl_headers_init, headers);
expect_any(build_http_headers, input);
/* might verify params later */
will_be_called(build_http_headers);
CHURL_HANDLE handle = (CHURL_HANDLE) palloc0(sizeof(CHURL_HANDLE));
expect_value(churl_init_upload, url, context->uri.data);
expect_value(churl_init_upload, headers, headers);
will_return(churl_init_upload, handle);
/* call function under test */
gpbridge_export_start(context);
/* assert call results */
assert_string_equal(context->write_file_name.data, expected_file_name);
assert_string_equal(context->uri.data, expected_uri);
assert_int_equal(context->churl_headers, headers);
assert_int_equal(context->churl_handle, handle);
/* cleanup */
pfree(handle);
pfree(headers);
pfree(context->gphd_uri);
pfree(context);
}
void
test_gpbridge_read_one_fragment_less_than_buffer(void **state)
{
......@@ -343,6 +396,50 @@ test_gpbridge_read_last_fragment_finished(void **state)
pfree(context);
}
void
test_gpbridge_write_data(void **state) {
/* init data in context */
gphadoop_context *context = (gphadoop_context *) palloc0(sizeof(gphadoop_context));
initStringInfo(&context->write_file_name);
CHURL_HANDLE handle = (CHURL_HANDLE) palloc0(sizeof(CHURL_HANDLE));
context->churl_handle = handle;
/* set mock behavior */
char* databuf = "foo";
int datalen = 3;
expect_value(churl_write, handle, context->churl_handle);
expect_value(churl_write, buf, databuf);
expect_value(churl_write, bufsize, datalen);
will_return(churl_write, 3);
/* call function under test */
int bytes_written = gpbridge_write(context, databuf, datalen);
/* assert call results */
assert_int_equal(bytes_written, 3);
/* cleanup */
pfree(handle);
pfree(context);
}
void
test_gpbridge_write_no_data(void **state) {
/* init data in context */
gphadoop_context *context = (gphadoop_context *) palloc0(sizeof(gphadoop_context));
/* call function under test */
char* databuf;
int datalen = 0;
int bytes_written = gpbridge_write(context, databuf, datalen);
/* assert call results */
assert_int_equal(bytes_written, 0);
/* cleanup */
pfree(context);
}
static void
expect_set_headers_call(CHURL_HEADERS headers_handle, const char *header_key, const char *header_value)
{
......@@ -363,7 +460,10 @@ main(int argc, char *argv[])
unit_test(test_gpbridge_read_one_fragment_less_than_buffer),
unit_test(test_gpbridge_read_one_fragment_buffer),
unit_test(test_gpbridge_read_next_fragment_buffer),
unit_test(test_gpbridge_read_last_fragment_finished)
unit_test(test_gpbridge_read_last_fragment_finished),
unit_test(test_gpbridge_export_start),
unit_test(test_gpbridge_write_data),
unit_test(test_gpbridge_write_no_data)
};
MemoryContextInit();
......
......@@ -40,8 +40,8 @@ void
test_pxfprotocol_validate_urls(void **state)
{
/* setup call info with no call context */
PG_FUNCTION_ARGS = palloc(sizeof(FunctionCallInfoData));
fcinfo->context = palloc(sizeof(ExtProtocolValidatorData));
PG_FUNCTION_ARGS = palloc0(sizeof(FunctionCallInfoData));
fcinfo->context = palloc0(sizeof(ExtProtocolValidatorData));
fcinfo->context->type = T_ExtProtocolValidatorData;
Value *v = makeString(uri_no_profile);
List *list = list_make1(v);
......@@ -57,10 +57,6 @@ test_pxfprotocol_validate_urls(void **state)
expect_value(GPHDUri_verify_no_duplicate_options, uri, gphd_uri);
will_be_called(GPHDUri_verify_no_duplicate_options);
expect_value(GPHDUri_opt_exists, uri, gphd_uri);
expect_string(GPHDUri_opt_exists, key, FRAGMENTER);
will_return(GPHDUri_opt_exists, true);
expect_value(GPHDUri_opt_exists, uri, gphd_uri);
expect_string(GPHDUri_opt_exists, key, PXF_PROFILE);
will_return(GPHDUri_opt_exists, false);
......@@ -82,20 +78,12 @@ test_pxfprotocol_validate_urls(void **state)
pfree(fcinfo);
}
void
test_pxfprotocol_export(void **state)
{
Datum d = pxfprotocol_export(NULL);
assert_int_equal(DatumGetInt32(d), 0);
}
void
test_pxfprotocol_import_first_call(void **state)
{
/* setup call info with no call context */
PG_FUNCTION_ARGS = palloc(sizeof(FunctionCallInfoData));
fcinfo->context = palloc(sizeof(ExtProtocolData));
PG_FUNCTION_ARGS = palloc0(sizeof(FunctionCallInfoData));
fcinfo->context = palloc0(sizeof(ExtProtocolData));
fcinfo->context->type = T_ExtProtocolData;
EXTPROTOCOL_GET_DATALEN(fcinfo) = 100;
EXTPROTOCOL_GET_DATABUF(fcinfo) = palloc0(EXTPROTOCOL_GET_DATALEN(fcinfo));
......@@ -163,8 +151,8 @@ void
test_pxfprotocol_import_second_call(void **state)
{
/* setup call info with call context */
PG_FUNCTION_ARGS = palloc(sizeof(FunctionCallInfoData));
fcinfo->context = palloc(sizeof(ExtProtocolData));
PG_FUNCTION_ARGS = palloc0(sizeof(FunctionCallInfoData));
fcinfo->context = palloc0(sizeof(ExtProtocolData));
fcinfo->context->type = T_ExtProtocolData;
EXTPROTOCOL_GET_DATALEN(fcinfo) = 100;
EXTPROTOCOL_GET_DATABUF(fcinfo) = palloc0(EXTPROTOCOL_GET_DATALEN(fcinfo));
......@@ -200,10 +188,10 @@ void
test_pxfprotocol_import_last_call(void **state)
{
/* setup call info with a call context and last call indicator */
PG_FUNCTION_ARGS = palloc(sizeof(FunctionCallInfoData));
fcinfo->context = palloc(sizeof(ExtProtocolData));
PG_FUNCTION_ARGS = palloc0(sizeof(FunctionCallInfoData));
fcinfo->context = palloc0(sizeof(ExtProtocolData));
fcinfo->context->type = T_ExtProtocolData;
gphadoop_context *call_context = palloc(sizeof(gphadoop_context));
gphadoop_context *call_context = palloc0(sizeof(gphadoop_context));
EXTPROTOCOL_SET_USER_CTX(fcinfo, call_context);
EXTPROTOCOL_SET_LAST_CALL(fcinfo);
......@@ -228,6 +216,136 @@ test_pxfprotocol_import_last_call(void **state)
pfree(fcinfo);
}
void
test_pxfprotocol_export_first_call(void **state)
{
/* setup call info with no call context */
PG_FUNCTION_ARGS = palloc0(sizeof(FunctionCallInfoData));
fcinfo->context = palloc0(sizeof(ExtProtocolData));
fcinfo->context->type = T_ExtProtocolData;
EXTPROTOCOL_GET_DATALEN(fcinfo) = 100;
EXTPROTOCOL_GET_DATABUF(fcinfo) = palloc0(EXTPROTOCOL_GET_DATALEN(fcinfo));
((ExtProtocolData *) fcinfo->context)->prot_last_call = false;
((ExtProtocolData *) fcinfo->context)->prot_url = uri_param;
Relation relation = (Relation) palloc0(sizeof(Relation));
((ExtProtocolData *) fcinfo->context)->prot_relation = relation;
/* set mock behavior for uri parsing */
GPHDUri *gphd_uri = palloc0(sizeof(GPHDUri));
expect_string(parseGPHDUri, uri_str, uri_param);
will_return(parseGPHDUri, gphd_uri);
/* set mock behavior for bridge export start -- nothing here */
expect_any(gpbridge_export_start, context);
will_be_called(gpbridge_export_start);
/* set mock behavior for bridge write */
const int EXPECTED_SIZE = 31;
/* expected number of bytes written to the bridge */
expect_any(gpbridge_write, context);
expect_value(gpbridge_write, databuf, EXTPROTOCOL_GET_DATABUF(fcinfo));
expect_value(gpbridge_write, datalen, EXTPROTOCOL_GET_DATALEN(fcinfo));
will_return(gpbridge_write, EXPECTED_SIZE);
Datum d = pxfprotocol_export(fcinfo);
/* return number of bytes written to the bridge */
assert_int_equal(DatumGetInt32(d), EXPECTED_SIZE);
gphadoop_context *context = (gphadoop_context *) EXTPROTOCOL_GET_USER_CTX(fcinfo);
/* context has been created */
assert_true(context != NULL);
/* gphduri has been parsed */
assert_true(context->gphd_uri != NULL);
/* uri has been initialized */
assert_true(context->uri.data != NULL);
/* write file name initialized, but empty, since it is filled by another component */
assert_int_equal(context->write_file_name.len, 0);
assert_true(context->relation != NULL);
/* relation pointer is copied */
assert_int_equal(context->relation, relation);
/* cleanup */
pfree(relation);
pfree(gphd_uri);
pfree(EXTPROTOCOL_GET_USER_CTX(fcinfo));
pfree(EXTPROTOCOL_GET_DATABUF(fcinfo));
pfree(fcinfo->context);
pfree(fcinfo);
}
void
test_pxfprotocol_export_second_call(void **state)
{
/* setup call info with call context */
PG_FUNCTION_ARGS = palloc0(sizeof(FunctionCallInfoData));
fcinfo->context = palloc0(sizeof(ExtProtocolData));
fcinfo->context->type = T_ExtProtocolData;
EXTPROTOCOL_GET_DATALEN(fcinfo) = 100;
EXTPROTOCOL_GET_DATABUF(fcinfo) = palloc0(EXTPROTOCOL_GET_DATALEN(fcinfo));
((ExtProtocolData *) fcinfo->context)->prot_last_call = false;
gphadoop_context *call_context = palloc0(sizeof(gphadoop_context));
EXTPROTOCOL_SET_USER_CTX(fcinfo, call_context);
/* set mock behavior for bridge write */
const int EXPECTED_SIZE = 0;
/* expected number of bytes written to the bridge */
expect_value(gpbridge_write, context, call_context);
expect_value(gpbridge_write, databuf, EXTPROTOCOL_GET_DATABUF(fcinfo));
expect_value(gpbridge_write, datalen, EXTPROTOCOL_GET_DATALEN(fcinfo));
will_return(gpbridge_write, EXPECTED_SIZE);
Datum d = pxfprotocol_export(fcinfo);
assert_int_equal(DatumGetInt32(d), EXPECTED_SIZE);
/* return number of bytes written to the bridge */
assert_true(EXTPROTOCOL_GET_USER_CTX(fcinfo) == call_context);
/* context is still the same */
/* cleanup */
pfree(call_context);
pfree(EXTPROTOCOL_GET_DATABUF(fcinfo));
pfree(fcinfo->context);
pfree(fcinfo);
}
void
test_pxfprotocol_export_last_call(void **state)
{
/* setup call info with a call context and last call indicator */
PG_FUNCTION_ARGS = palloc0(sizeof(FunctionCallInfoData));
fcinfo->context = palloc0(sizeof(ExtProtocolData));
fcinfo->context->type = T_ExtProtocolData;
gphadoop_context *call_context = palloc0(sizeof(gphadoop_context));
EXTPROTOCOL_SET_USER_CTX(fcinfo, call_context);
EXTPROTOCOL_SET_LAST_CALL(fcinfo);
/* init data in context that will be cleaned up */
initStringInfo(&call_context->uri);
initStringInfo(&call_context->write_file_name);
/* set mock behavior for bridge cleanup */
expect_value(gpbridge_cleanup, context, call_context);
will_be_called(gpbridge_cleanup);
Datum d = pxfprotocol_export(fcinfo);
assert_int_equal(DatumGetInt32(d), 0);
/* 0 is returned from function */
assert_true(EXTPROTOCOL_GET_USER_CTX(fcinfo) == NULL);
/* call context is cleaned up */
/* cleanup */
pfree(fcinfo->context);
pfree(fcinfo);
}
/* test setup and teardown methods */
void
before_test(void)
......@@ -252,10 +370,13 @@ main(int argc, char *argv[])
const UnitTest tests[] = {
unit_test(test_pxfprotocol_validate_urls),
unit_test(test_pxfprotocol_export),
unit_test_setup_teardown(test_pxfprotocol_import_first_call, before_test, after_test),
unit_test_setup_teardown(test_pxfprotocol_import_second_call, before_test, after_test),
unit_test_setup_teardown(test_pxfprotocol_import_last_call, before_test, after_test)
unit_test_setup_teardown(test_pxfprotocol_import_last_call, before_test, after_test),
unit_test_setup_teardown(test_pxfprotocol_export_first_call, before_test, after_test),
unit_test_setup_teardown(test_pxfprotocol_export_second_call, before_test, after_test),
unit_test_setup_teardown(test_pxfprotocol_export_last_call, before_test, after_test)
};
MemoryContextInit();
......
......@@ -63,13 +63,22 @@ test_normalize_key_name(void **state)
}
void
test_get_authority(void **state)
{
char *authority = get_authority();
assert_string_equal(authority, "localhost:51200");
pfree(authority);
}
int
main(int argc, char *argv[])
{
cmockery_parse_arguments(argc, argv);
const UnitTest tests[] = {
unit_test(test_normalize_key_name)
unit_test(test_normalize_key_name),
unit_test(test_get_authority)
};
MemoryContextInit();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册