未验证 提交 0141a327 编写于 作者: L Lav Jain 提交者: GitHub

Set last segment's fragment header for PXF fragments (#5279)

* Set fragment index for PXF fragments per segment

* Added X-GP-LAST-FRAGMENT header to the request

to indicate that this is the last fragment for the segment.
The header is only sent for the last fragment, with the value "true".
Co-authored-by: NBen Christel <bchristel@pivotal.io>
Co-authored-by: NFrancisco Guerrero <aguerrero@pivotal.io>
上级 bc91185f
......@@ -214,6 +214,7 @@ static void
set_current_fragment_headers(gphadoop_context *context)
{
FragmentData *frag_data = (FragmentData *) lfirst(context->current_fragment);
int fragment_count = list_length(context->gphd_uri->fragments);
elog(DEBUG2, "pxf: set_current_fragment_source_name: source_name %s, index %s, has user data: %s ",
frag_data->source_name, frag_data->index, frag_data->user_data ? "TRUE" : "FALSE");
......@@ -223,6 +224,11 @@ set_current_fragment_headers(gphadoop_context *context)
churl_headers_override(context->churl_headers, "X-GP-FRAGMENT-METADATA", frag_data->fragment_md);
churl_headers_override(context->churl_headers, "X-GP-FRAGMENT-INDEX", frag_data->index);
if (frag_data->fragment_idx == fragment_count)
{
churl_headers_override(context->churl_headers, "X-GP-LAST-FRAGMENT", "true");
}
if (frag_data->user_data)
{
churl_headers_override(context->churl_headers, "X-GP-FRAGMENT-USER-DATA", frag_data->user_data);
......
......@@ -372,6 +372,7 @@ filter_fragments_for_segment(List *list)
*current = NULL;
int index = 0;
int frag_index = 1;
int4 shift = xid % GpIdentity.numsegments;
for (current = list_head(list); current != NULL; index++)
......@@ -382,6 +383,8 @@ filter_fragments_for_segment(List *list)
* current segment is the one that should process, keep the
* element, adjust cursor pointers
*/
FragmentData *frag = (FragmentData *) current->data.ptr_value;
frag->fragment_idx = frag_index++;
previous = current;
current = lnext(current);
}
......
......@@ -55,6 +55,7 @@ typedef struct FragmentData
char *fragment_md;
char *user_data;
char *profile;
int fragment_idx;
} FragmentData;
/*
......
......@@ -74,6 +74,7 @@ test_gpbridge_import_start(void **state)
fragment->profile = NULL;
fragment->source_name = "source";
fragment->user_data = "user_data";
fragment->fragment_idx = 1;
context->gphd_uri = (GPHDUri *) palloc0(sizeof(GPHDUri));
List *list = list_make1(fragment);
......@@ -93,6 +94,7 @@ test_gpbridge_import_start(void **state)
expect_set_headers_call(headers, "X-GP-DATA-FRAGMENT", fragment->index);
expect_set_headers_call(headers, "X-GP-FRAGMENT-METADATA", fragment->fragment_md);
expect_set_headers_call(headers, "X-GP-FRAGMENT-INDEX", fragment->index);
expect_set_headers_call(headers, "X-GP-LAST-FRAGMENT", "true");
expect_set_headers_call(headers, "X-GP-FRAGMENT-USER-DATA", fragment->user_data);
expect_set_headers_call(headers, "X-GP-PROFILE", context->gphd_uri->profile);
......@@ -256,6 +258,88 @@ test_gpbridge_read_one_fragment_buffer(void **state)
pfree(context);
}
void
test_gpbridge_read_first_fragment_buffer(void **state)
{
/* init data in context that will be cleaned up */
gphadoop_context *context = (gphadoop_context *) palloc0(sizeof(gphadoop_context));
initStringInfo(&context->uri);
/* setup list of fragments */
FragmentData *fragment = (FragmentData *) palloc0(sizeof(FragmentData));
FragmentData *next_fragment = (FragmentData *) palloc0(sizeof(FragmentData));
fragment->authority = AUTHORITY;
fragment->fragment_md = "md";
fragment->index = "1";
fragment->profile = NULL;
fragment->source_name = "source";
fragment->user_data = "user_data";
fragment->fragment_idx = 1;
next_fragment->authority = AUTHORITY;
next_fragment->fragment_md = "md";
next_fragment->index = "1";
next_fragment->profile = NULL;
next_fragment->source_name = "next_source";
next_fragment->user_data = "next_user_data";
next_fragment->fragment_idx = 2;
context->gphd_uri = (GPHDUri *) palloc0(sizeof(GPHDUri));
List *list = list_make2(fragment, next_fragment);
context->gphd_uri->fragments = list;
context->gphd_uri->profile = "profile";
CHURL_HEADERS headers = (CHURL_HEADERS) palloc0(sizeof(CHURL_HEADERS));
will_return(churl_headers_init, headers);
expect_any(build_http_headers, input);
/* might verify params later */
will_be_called(build_http_headers);
expect_set_headers_call(headers, "X-GP-DATA-DIR", fragment->source_name);
expect_set_headers_call(headers, "X-GP-DATA-FRAGMENT", fragment->index);
expect_set_headers_call(headers, "X-GP-FRAGMENT-METADATA", fragment->fragment_md);
expect_set_headers_call(headers, "X-GP-FRAGMENT-INDEX", fragment->index);
expect_set_headers_call(headers, "X-GP-FRAGMENT-USER-DATA", fragment->user_data);
expect_set_headers_call(headers, "X-GP-PROFILE", context->gphd_uri->profile);
CHURL_HANDLE handle = (CHURL_HANDLE) palloc0(sizeof(CHURL_HANDLE));
expect_value(churl_init_download, url, context->uri.data);
expect_value(churl_init_download, headers, headers);
will_return(churl_init_download, handle);
expect_value(churl_read_check_connectivity, handle, handle);
will_be_called(churl_read_check_connectivity);
/* call function under test */
gpbridge_import_start(context);
/* assert call results */
assert_int_equal(context->current_fragment, list_head(context->gphd_uri->fragments));
StringInfoData expected_uri;
initStringInfo(&expected_uri);
appendStringInfo(&expected_uri,
"http://%s/%s/%s/Bridge/",
AUTHORITY, PXF_SERVICE_PREFIX, PXF_VERSION);
assert_string_equal(context->uri.data, expected_uri.data);
assert_int_equal(context->churl_headers, headers);
assert_int_equal(context->churl_handle, handle);
/* cleanup */
list_free_deep(list);
pfree(handle);
pfree(headers);
pfree(context->gphd_uri);
pfree(context);
}
void
test_gpbridge_read_next_fragment_buffer(void **state)
{
......@@ -280,6 +364,7 @@ test_gpbridge_read_next_fragment_buffer(void **state)
fragment->profile = NULL;
fragment->source_name = "source";
fragment->user_data = "user_data";
fragment->fragment_idx = 2;
List *list = list_make2(prev_fragment, fragment);
......@@ -287,7 +372,7 @@ test_gpbridge_read_next_fragment_buffer(void **state)
context->gphd_uri = (GPHDUri *) palloc0(sizeof(GPHDUri));
context->gphd_uri->profile = "profile";
context->gphd_uri->fragments = (FragmentData *) palloc0(sizeof(FragmentData));
context->gphd_uri->fragments = list;
int datalen = 10;
char *databuf = (char *) palloc0(datalen);
......@@ -305,6 +390,7 @@ test_gpbridge_read_next_fragment_buffer(void **state)
expect_set_headers_call(headers, "X-GP-DATA-FRAGMENT", fragment->index);
expect_set_headers_call(headers, "X-GP-FRAGMENT-METADATA", fragment->fragment_md);
expect_set_headers_call(headers, "X-GP-FRAGMENT-INDEX", fragment->index);
expect_set_headers_call(headers, "X-GP-LAST-FRAGMENT", "true");
expect_set_headers_call(headers, "X-GP-FRAGMENT-USER-DATA", fragment->user_data);
expect_set_headers_call(headers, "X-GP-PROFILE", context->gphd_uri->profile);
......@@ -462,6 +548,7 @@ main(int argc, char *argv[])
unit_test(test_gpbridge_import_start),
unit_test(test_gpbridge_read_one_fragment_less_than_buffer),
unit_test(test_gpbridge_read_one_fragment_buffer),
unit_test(test_gpbridge_read_first_fragment_buffer),
unit_test(test_gpbridge_read_next_fragment_buffer),
unit_test(test_gpbridge_read_last_fragment_finished),
unit_test(test_gpbridge_export_start),
......
......@@ -33,7 +33,7 @@ static bool compareFragment(ListCell *fragment_cell1, ListCell *fragment_cell2);
void
test_filter_fragments_for_segment(void **state)
{
/* --- 1 segment, all fragements should be processed by it */
/* --- 1 segment, all fragments should be processed by it */
char *expected_1_1_0[1] = {"0"};
/* 1 fragment */
......@@ -206,6 +206,7 @@ test_list(int segindex, int segtotal, int xid, int fragtotal, char *expected[],
foreach_with_count(cell, filtered, i)
{
assert_true(compareString(((FragmentData *) lfirst(cell))->index, expected[i]));
assert_int_equal(i + 1, ((FragmentData *) lfirst(cell))->fragment_idx);
}
}
else
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册