diff --git a/gpcontrib/pxf/src/pxfbridge.c b/gpcontrib/pxf/src/pxfbridge.c index efc1a10cb29d9e4b2a19510ce0bdf76640500674..93291246c0598fe0ed5053c893d05e8423ca0919 100644 --- a/gpcontrib/pxf/src/pxfbridge.c +++ b/gpcontrib/pxf/src/pxfbridge.c @@ -25,7 +25,6 @@ /* helper function declarations */ static void build_uri_for_read(gphadoop_context *context); static void build_uri_for_write(gphadoop_context *context); -static void build_file_name_for_write(gphadoop_context *context); static void add_querydata_to_http_headers(gphadoop_context *context); static void set_current_fragment_headers(gphadoop_context *context); static size_t fill_buffer(gphadoop_context *context, char *start, size_t size); @@ -86,7 +85,7 @@ gpbridge_import_start(gphadoop_context *context) void gpbridge_export_start(gphadoop_context *context) { - build_file_name_for_write(context); + elog(DEBUG2, "pxf: file name for write: %s", context->gphd_uri->data); build_uri_for_write(context); context->churl_headers = churl_headers_init(); add_querydata_to_http_headers(context); @@ -139,7 +138,8 @@ gpbridge_write(gphadoop_context *context, char *databuf, int datalen) if (datalen > 0) { n = churl_write(context->churl_handle, databuf, datalen); - elog(DEBUG5, "pxf gpbridge_write: wrote %zu bytes to %s", n, context->write_file_name.data); + elog(DEBUG5, "pxf gpbridge_write: segment %d wrote %zu bytes to %s", + GpIdentity.segindex, n, context->gphd_uri->data); } return (int) n; @@ -165,28 +165,10 @@ build_uri_for_read(gphadoop_context *context) static void build_uri_for_write(gphadoop_context *context) { - appendStringInfo(&context->uri, "http://%s/%s/%s/Writable/stream?path=%s", - get_authority(), PXF_SERVICE_PREFIX, PXF_VERSION, context->write_file_name.data); - elog(DEBUG2, "pxf: uri %s with file name for write: %s", context->uri.data, context->write_file_name.data); -} - -/* - * Builds a unique file name for write per segment, based on - * directory name from the table's URI, the transaction id (XID) and segment id. - * e.g. with path in URI '/data/writable/table1', XID 1234 and segment id 3, - * the file name will be '/data/writable/table1/1234_3'. - */ -static void -build_file_name_for_write(gphadoop_context *context) -{ - char xid[TMGIDSIZE]; - - if (!getDistributedTransactionIdentifier(xid)) - elog(ERROR, "Unable to obtain distributed transaction id"); - - appendStringInfo(&context->write_file_name, "/%s/%s_%d", - context->gphd_uri->data, xid, GpIdentity.segindex); - elog(DEBUG2, "pxf: file name for write: %s", context->write_file_name.data); + appendStringInfo(&context->uri, "http://%s/%s/%s/Writable/stream?path=", + get_authority(), PXF_SERVICE_PREFIX, PXF_VERSION); + elog(DEBUG2, "pxf: uri %s with file name for write: %s", + context->uri.data, context->gphd_uri->data); } diff --git a/gpcontrib/pxf/src/pxfbridge.h b/gpcontrib/pxf/src/pxfbridge.h index 71f1e3b38c82ca185425996b755ee9a0beabf07d..bd9367ce9cdc410d90e425167dfc87a173bf556d 100644 --- a/gpcontrib/pxf/src/pxfbridge.h +++ b/gpcontrib/pxf/src/pxfbridge.h @@ -38,7 +38,6 @@ typedef struct GPHDUri *gphd_uri; StringInfoData uri; ListCell *current_fragment; - StringInfoData write_file_name; Relation relation; char *filterstr; ProjectionInfo *proj_info; diff --git a/gpcontrib/pxf/src/pxfprotocol.c b/gpcontrib/pxf/src/pxfprotocol.c index c9dfc0af737691366e28b941534d32f74c1c37d2..b61c29b8317bce38fd24f7fc08e97c72ac95d319 100644 --- a/gpcontrib/pxf/src/pxfprotocol.c +++ b/gpcontrib/pxf/src/pxfprotocol.c @@ -182,7 +182,6 @@ create_context(PG_FUNCTION_ARGS, bool is_import) context->gphd_uri = uri; initStringInfo(&context->uri); - initStringInfo(&context->write_file_name); context->relation = relation; context->filterstr = filterstr; context->proj_info = proj_info; @@ -200,7 +199,6 @@ cleanup_context(gphadoop_context *context) { gpbridge_cleanup(context); pfree(context->uri.data); - pfree(context->write_file_name.data); pfree(context); } } diff --git a/gpcontrib/pxf/test/pxfbridge_test.c b/gpcontrib/pxf/test/pxfbridge_test.c index 19f20c137cfb3fbe1f74f1e07cfb0a0802b7d7cf..2efbda9de19639f16e929441eae6648f54d96db6 100644 --- a/gpcontrib/pxf/test/pxfbridge_test.c +++ b/gpcontrib/pxf/test/pxfbridge_test.c @@ -137,23 +137,13 @@ test_gpbridge_export_start(void **state) /* init data in context that will be cleaned up */ gphadoop_context *context = (gphadoop_context *) palloc0(sizeof(gphadoop_context)); initStringInfo(&context->uri); - initStringInfo(&context->write_file_name); context->gphd_uri = (GPHDUri *) palloc0(sizeof(GPHDUri)); context->gphd_uri->data = "path"; context->gphd_uri->profile = "profile"; - /* expectations for creating file name for write */ - char xid[TMGIDSIZE]="abcdefghijklmnopqrstu"; - - GpIdentity.segindex = 3; - char* expected_file_name = psprintf("/%s/%s_%d", "path", xid, 3); - expect_any(getDistributedTransactionIdentifier, id); - will_assign_memory(getDistributedTransactionIdentifier, id, xid, TMGIDSIZE); - will_return(getDistributedTransactionIdentifier, true); - /* expectation for remote uri construction */ will_return(get_authority, "abc:123"); - char* expected_uri = psprintf("http://abc:123/pxf/v15/Writable/stream?path=%s", expected_file_name); + char* expected_uri = "http://abc:123/pxf/v15/Writable/stream?path="; CHURL_HEADERS headers = (CHURL_HEADERS) palloc0(sizeof(CHURL_HEADERS)); will_return(churl_headers_init, headers); @@ -171,7 +161,6 @@ test_gpbridge_export_start(void **state) gpbridge_export_start(context); /* assert call results */ - assert_string_equal(context->write_file_name.data, expected_file_name); assert_string_equal(context->uri.data, expected_uri); assert_int_equal(context->churl_headers, headers); assert_int_equal(context->churl_handle, handle); @@ -486,16 +475,21 @@ test_gpbridge_read_last_fragment_finished(void **state) } void -test_gpbridge_write_data(void **state) { +test_gpbridge_write_data(void **state) +{ /* init data in context */ - gphadoop_context *context = (gphadoop_context *) palloc0(sizeof(gphadoop_context)); - initStringInfo(&context->write_file_name); - CHURL_HANDLE handle = (CHURL_HANDLE) palloc0(sizeof(CHURL_HANDLE)); - context->churl_handle = handle; + gphadoop_context + *context = + (gphadoop_context *) palloc0(sizeof(gphadoop_context)); + CHURL_HANDLE handle = (CHURL_HANDLE) palloc0(sizeof(CHURL_HANDLE)); + + context->gphd_uri = (GPHDUri *) palloc0(sizeof(GPHDUri)); + context->gphd_uri->data = "path"; + context->churl_handle = handle; /* set mock behavior */ - char* databuf = "foo"; - int datalen = 3; + char *databuf = "foo"; + int datalen = 3; expect_value(churl_write, handle, context->churl_handle); expect_value(churl_write, buf, databuf); expect_value(churl_write, bufsize, datalen); @@ -508,6 +502,7 @@ test_gpbridge_write_data(void **state) { assert_int_equal(bytes_written, 3); /* cleanup */ + pfree(context->gphd_uri); pfree(handle); pfree(context); } diff --git a/gpcontrib/pxf/test/pxfprotocol_test.c b/gpcontrib/pxf/test/pxfprotocol_test.c index 18fb0141b9db8108898c4ef3e93a61ff332445ec..dc95ff483ebb1f04228a78ec99dda4c349b512f9 100644 --- a/gpcontrib/pxf/test/pxfprotocol_test.c +++ b/gpcontrib/pxf/test/pxfprotocol_test.c @@ -142,7 +142,6 @@ test_pxfprotocol_import_first_call(void **state) /* uri has been initialized */ assert_true(context->uri.data != NULL); /* no write file name for import case */ - assert_int_equal(context->write_file_name.len, 0); assert_true(context->relation != NULL); /* relation pointer is copied */ assert_int_equal(context->relation, relation); @@ -208,7 +207,6 @@ test_pxfprotocol_import_last_call(void **state) /* init data in context that will be cleaned up */ initStringInfo(&call_context->uri); - initStringInfo(&call_context->write_file_name); /* set mock behavior for bridge cleanup */ expect_value(gpbridge_cleanup, context, call_context); @@ -279,7 +277,6 @@ test_pxfprotocol_export_first_call(void **state) /* uri has been initialized */ assert_true(context->uri.data != NULL); /* write file name initialized, but empty, since it is filled by another component */ - assert_int_equal(context->write_file_name.len, 0); assert_true(context->relation != NULL); /* relation pointer is copied */ assert_int_equal(context->relation, relation); @@ -345,7 +342,6 @@ test_pxfprotocol_export_last_call(void **state) /* init data in context that will be cleaned up */ initStringInfo(&call_context->uri); - initStringInfo(&call_context->write_file_name); /* set mock behavior for bridge cleanup */ expect_value(gpbridge_cleanup, context, call_context);