提交 ceb102f7 编写于 作者: F Francisco Guerrero 提交者: Francisco Guerrero

Remove XID and segmentID from the write path sent to PXF (#7234)

- The transaction ID and segment ID are only used by a subset of
  profiles. PXF should be able to append the transaction and segment IDs
  depending on the profile using the header information.
- Remove write_file_name, do not send path
上级 8a66ad18
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
/* helper function declarations */ /* helper function declarations */
static void build_uri_for_read(gphadoop_context *context); static void build_uri_for_read(gphadoop_context *context);
static void build_uri_for_write(gphadoop_context *context); static void build_uri_for_write(gphadoop_context *context);
static void build_file_name_for_write(gphadoop_context *context);
static void add_querydata_to_http_headers(gphadoop_context *context); static void add_querydata_to_http_headers(gphadoop_context *context);
static void set_current_fragment_headers(gphadoop_context *context); static void set_current_fragment_headers(gphadoop_context *context);
static size_t fill_buffer(gphadoop_context *context, char *start, size_t size); static size_t fill_buffer(gphadoop_context *context, char *start, size_t size);
...@@ -86,7 +85,7 @@ gpbridge_import_start(gphadoop_context *context) ...@@ -86,7 +85,7 @@ gpbridge_import_start(gphadoop_context *context)
void void
gpbridge_export_start(gphadoop_context *context) gpbridge_export_start(gphadoop_context *context)
{ {
build_file_name_for_write(context); elog(DEBUG2, "pxf: file name for write: %s", context->gphd_uri->data);
build_uri_for_write(context); build_uri_for_write(context);
context->churl_headers = churl_headers_init(); context->churl_headers = churl_headers_init();
add_querydata_to_http_headers(context); add_querydata_to_http_headers(context);
...@@ -139,7 +138,8 @@ gpbridge_write(gphadoop_context *context, char *databuf, int datalen) ...@@ -139,7 +138,8 @@ gpbridge_write(gphadoop_context *context, char *databuf, int datalen)
if (datalen > 0) if (datalen > 0)
{ {
n = churl_write(context->churl_handle, databuf, datalen); n = churl_write(context->churl_handle, databuf, datalen);
elog(DEBUG5, "pxf gpbridge_write: wrote %zu bytes to %s", n, context->write_file_name.data); elog(DEBUG5, "pxf gpbridge_write: segment %d wrote %zu bytes to %s",
GpIdentity.segindex, n, context->gphd_uri->data);
} }
return (int) n; return (int) n;
...@@ -165,28 +165,10 @@ build_uri_for_read(gphadoop_context *context) ...@@ -165,28 +165,10 @@ build_uri_for_read(gphadoop_context *context)
static void static void
build_uri_for_write(gphadoop_context *context) build_uri_for_write(gphadoop_context *context)
{ {
appendStringInfo(&context->uri, "http://%s/%s/%s/Writable/stream?path=%s", appendStringInfo(&context->uri, "http://%s/%s/%s/Writable/stream?path=",
get_authority(), PXF_SERVICE_PREFIX, PXF_VERSION, context->write_file_name.data); get_authority(), PXF_SERVICE_PREFIX, PXF_VERSION);
elog(DEBUG2, "pxf: uri %s with file name for write: %s", context->uri.data, context->write_file_name.data); elog(DEBUG2, "pxf: uri %s with file name for write: %s",
} context->uri.data, context->gphd_uri->data);
/*
* Builds a unique file name for write per segment, based on
* directory name from the table's URI, the transaction id (XID) and segment id.
* e.g. with path in URI '/data/writable/table1', XID 1234 and segment id 3,
* the file name will be '/data/writable/table1/1234_3'.
*/
static void
build_file_name_for_write(gphadoop_context *context)
{
char xid[TMGIDSIZE];
if (!getDistributedTransactionIdentifier(xid))
elog(ERROR, "Unable to obtain distributed transaction id");
appendStringInfo(&context->write_file_name, "/%s/%s_%d",
context->gphd_uri->data, xid, GpIdentity.segindex);
elog(DEBUG2, "pxf: file name for write: %s", context->write_file_name.data);
} }
......
...@@ -38,7 +38,6 @@ typedef struct ...@@ -38,7 +38,6 @@ typedef struct
GPHDUri *gphd_uri; GPHDUri *gphd_uri;
StringInfoData uri; StringInfoData uri;
ListCell *current_fragment; ListCell *current_fragment;
StringInfoData write_file_name;
Relation relation; Relation relation;
char *filterstr; char *filterstr;
ProjectionInfo *proj_info; ProjectionInfo *proj_info;
......
...@@ -182,7 +182,6 @@ create_context(PG_FUNCTION_ARGS, bool is_import) ...@@ -182,7 +182,6 @@ create_context(PG_FUNCTION_ARGS, bool is_import)
context->gphd_uri = uri; context->gphd_uri = uri;
initStringInfo(&context->uri); initStringInfo(&context->uri);
initStringInfo(&context->write_file_name);
context->relation = relation; context->relation = relation;
context->filterstr = filterstr; context->filterstr = filterstr;
context->proj_info = proj_info; context->proj_info = proj_info;
...@@ -200,7 +199,6 @@ cleanup_context(gphadoop_context *context) ...@@ -200,7 +199,6 @@ cleanup_context(gphadoop_context *context)
{ {
gpbridge_cleanup(context); gpbridge_cleanup(context);
pfree(context->uri.data); pfree(context->uri.data);
pfree(context->write_file_name.data);
pfree(context); pfree(context);
} }
} }
......
...@@ -137,23 +137,13 @@ test_gpbridge_export_start(void **state) ...@@ -137,23 +137,13 @@ test_gpbridge_export_start(void **state)
/* init data in context that will be cleaned up */ /* init data in context that will be cleaned up */
gphadoop_context *context = (gphadoop_context *) palloc0(sizeof(gphadoop_context)); gphadoop_context *context = (gphadoop_context *) palloc0(sizeof(gphadoop_context));
initStringInfo(&context->uri); initStringInfo(&context->uri);
initStringInfo(&context->write_file_name);
context->gphd_uri = (GPHDUri *) palloc0(sizeof(GPHDUri)); context->gphd_uri = (GPHDUri *) palloc0(sizeof(GPHDUri));
context->gphd_uri->data = "path"; context->gphd_uri->data = "path";
context->gphd_uri->profile = "profile"; context->gphd_uri->profile = "profile";
/* expectations for creating file name for write */
char xid[TMGIDSIZE]="abcdefghijklmnopqrstu";
GpIdentity.segindex = 3;
char* expected_file_name = psprintf("/%s/%s_%d", "path", xid, 3);
expect_any(getDistributedTransactionIdentifier, id);
will_assign_memory(getDistributedTransactionIdentifier, id, xid, TMGIDSIZE);
will_return(getDistributedTransactionIdentifier, true);
/* expectation for remote uri construction */ /* expectation for remote uri construction */
will_return(get_authority, "abc:123"); will_return(get_authority, "abc:123");
char* expected_uri = psprintf("http://abc:123/pxf/v15/Writable/stream?path=%s", expected_file_name); char* expected_uri = "http://abc:123/pxf/v15/Writable/stream?path=";
CHURL_HEADERS headers = (CHURL_HEADERS) palloc0(sizeof(CHURL_HEADERS)); CHURL_HEADERS headers = (CHURL_HEADERS) palloc0(sizeof(CHURL_HEADERS));
will_return(churl_headers_init, headers); will_return(churl_headers_init, headers);
...@@ -171,7 +161,6 @@ test_gpbridge_export_start(void **state) ...@@ -171,7 +161,6 @@ test_gpbridge_export_start(void **state)
gpbridge_export_start(context); gpbridge_export_start(context);
/* assert call results */ /* assert call results */
assert_string_equal(context->write_file_name.data, expected_file_name);
assert_string_equal(context->uri.data, expected_uri); assert_string_equal(context->uri.data, expected_uri);
assert_int_equal(context->churl_headers, headers); assert_int_equal(context->churl_headers, headers);
assert_int_equal(context->churl_handle, handle); assert_int_equal(context->churl_handle, handle);
...@@ -486,16 +475,21 @@ test_gpbridge_read_last_fragment_finished(void **state) ...@@ -486,16 +475,21 @@ test_gpbridge_read_last_fragment_finished(void **state)
} }
void void
test_gpbridge_write_data(void **state) { test_gpbridge_write_data(void **state)
{
/* init data in context */ /* init data in context */
gphadoop_context *context = (gphadoop_context *) palloc0(sizeof(gphadoop_context)); gphadoop_context
initStringInfo(&context->write_file_name); *context =
CHURL_HANDLE handle = (CHURL_HANDLE) palloc0(sizeof(CHURL_HANDLE)); (gphadoop_context *) palloc0(sizeof(gphadoop_context));
context->churl_handle = handle; CHURL_HANDLE handle = (CHURL_HANDLE) palloc0(sizeof(CHURL_HANDLE));
context->gphd_uri = (GPHDUri *) palloc0(sizeof(GPHDUri));
context->gphd_uri->data = "path";
context->churl_handle = handle;
/* set mock behavior */ /* set mock behavior */
char* databuf = "foo"; char *databuf = "foo";
int datalen = 3; int datalen = 3;
expect_value(churl_write, handle, context->churl_handle); expect_value(churl_write, handle, context->churl_handle);
expect_value(churl_write, buf, databuf); expect_value(churl_write, buf, databuf);
expect_value(churl_write, bufsize, datalen); expect_value(churl_write, bufsize, datalen);
...@@ -508,6 +502,7 @@ test_gpbridge_write_data(void **state) { ...@@ -508,6 +502,7 @@ test_gpbridge_write_data(void **state) {
assert_int_equal(bytes_written, 3); assert_int_equal(bytes_written, 3);
/* cleanup */ /* cleanup */
pfree(context->gphd_uri);
pfree(handle); pfree(handle);
pfree(context); pfree(context);
} }
......
...@@ -142,7 +142,6 @@ test_pxfprotocol_import_first_call(void **state) ...@@ -142,7 +142,6 @@ test_pxfprotocol_import_first_call(void **state)
/* uri has been initialized */ /* uri has been initialized */
assert_true(context->uri.data != NULL); assert_true(context->uri.data != NULL);
/* no write file name for import case */ /* no write file name for import case */
assert_int_equal(context->write_file_name.len, 0);
assert_true(context->relation != NULL); assert_true(context->relation != NULL);
/* relation pointer is copied */ /* relation pointer is copied */
assert_int_equal(context->relation, relation); assert_int_equal(context->relation, relation);
...@@ -208,7 +207,6 @@ test_pxfprotocol_import_last_call(void **state) ...@@ -208,7 +207,6 @@ test_pxfprotocol_import_last_call(void **state)
/* init data in context that will be cleaned up */ /* init data in context that will be cleaned up */
initStringInfo(&call_context->uri); initStringInfo(&call_context->uri);
initStringInfo(&call_context->write_file_name);
/* set mock behavior for bridge cleanup */ /* set mock behavior for bridge cleanup */
expect_value(gpbridge_cleanup, context, call_context); expect_value(gpbridge_cleanup, context, call_context);
...@@ -279,7 +277,6 @@ test_pxfprotocol_export_first_call(void **state) ...@@ -279,7 +277,6 @@ test_pxfprotocol_export_first_call(void **state)
/* uri has been initialized */ /* uri has been initialized */
assert_true(context->uri.data != NULL); assert_true(context->uri.data != NULL);
/* write file name initialized, but empty, since it is filled by another component */ /* write file name initialized, but empty, since it is filled by another component */
assert_int_equal(context->write_file_name.len, 0);
assert_true(context->relation != NULL); assert_true(context->relation != NULL);
/* relation pointer is copied */ /* relation pointer is copied */
assert_int_equal(context->relation, relation); assert_int_equal(context->relation, relation);
...@@ -345,7 +342,6 @@ test_pxfprotocol_export_last_call(void **state) ...@@ -345,7 +342,6 @@ test_pxfprotocol_export_last_call(void **state)
/* init data in context that will be cleaned up */ /* init data in context that will be cleaned up */
initStringInfo(&call_context->uri); initStringInfo(&call_context->uri);
initStringInfo(&call_context->write_file_name);
/* set mock behavior for bridge cleanup */ /* set mock behavior for bridge cleanup */
expect_value(gpbridge_cleanup, context, call_context); expect_value(gpbridge_cleanup, context, call_context);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册