提交 1fa09f5b 编写于 作者: F Francisco Guerrero

PXF: Do not rely on getDistributedTransactionId()

PXF relies on getDistributedTransactionId() to distribute load among
segments. This is important when the fragmentation call returns a small
list of fragments. Recently, the behavior of
getDistributedTransactionId() changed, and it no longer returns a value
for SELECT queries. As an alternative, we will use a combination of the
session id and the command count to balance the load on PXF.

This is a backport from master.
上级 a3c1f876
......@@ -109,7 +109,6 @@ assign_pxf_location_to_fragments(List *fragments)
FragmentData *fragment = (FragmentData *) lfirst(frag_c);
fragment->authority = get_authority();
}
return;
}
/*
......@@ -356,20 +355,15 @@ filter_fragments_for_segment(List *list)
if (!list)
elog(ERROR, "Parameter list is null in filter_fragments_for_segment");
DistributedTransactionId xid = getDistributedTransactionId();
if (xid == InvalidDistributedTransactionId)
elog(ERROR, "Cannot get distributed transaction identifier in filter_fragments_for_segment");
/*
* to determine which segment S should process an element at a given index
* I, use a randomized MOD function
*
* S = MOD(I + MOD(XID, N), N)
* S = MOD(I + MOD(gp_session_id, N) + gp_command_count, N)
*
* which ensures more fair work distribution for small lists of just a few
* elements across N segments global transaction ID is used as a
* randomizer, as it is different for every query while being the same
* elements across N segments global session ID and command count is used
* as a randomizer, as it is different for every query while being the same
* across all segments for a given query
*/
......@@ -380,11 +374,11 @@ filter_fragments_for_segment(List *list)
int index = 0;
int frag_index = 1;
int numsegments = getgpsegmentCount();
int32 shift = xid % numsegments;
int shift = gp_session_id % numsegments;
for (current = list_head(list); current != NULL; index++)
{
if (GpIdentity.segindex == (index + shift) % numsegments)
if (GpIdentity.segindex == (index + shift + gp_command_count) % numsegments)
{
/*
* current segment is the one that should process, keep the
......
......@@ -5,6 +5,7 @@
#include "postgres.h"
#include "utils/memutils.h"
#include "cdb/cdbvars.h"
/* Define UNIT_TESTING so that the extension can skip declaring PG_MODULE_MAGIC */
#define UNIT_TESTING
......@@ -23,7 +24,7 @@
/* helper functions */
static List *prepare_fragment_list(int fragtotal, int sefgindex, int segtotal, int xid);
static void test_list(int segindex, int segtotal, int xid, int fragtotal, char *expected[], int expected_total);
static void test_list(int segindex, int segtotal, int session_id, int fragtotal, char *expected[], int expected_total);
static FragmentData *buildFragment(char *index, char *source, char *userdata, char *metadata, char *profile);
static bool compareLists(List *list1, List *list2, bool (*compareType) (void *, void *));
static bool compareString(char *str1, char *str2);
......@@ -37,9 +38,9 @@ test_filter_fragments_for_segment(void **state)
/* 1 fragment */
test_list(0, 1, 1, 1, expected_1_1_0, ARRSIZE(expected_1_1_0));
/* xid = 1 */
/* session_id = 1 */
test_list(0, 1, 2, 1, expected_1_1_0, ARRSIZE(expected_1_1_0));
/* xid = 2 */
/* session_id = 2 */
char *expected_1_2_0[2] = {"0", "1"};
......@@ -163,36 +164,18 @@ test_filter_fragments_for_segment(void **state)
pfree(expected_message);
}
PG_END_TRY();
/* special case -- invalid transaction id */
old_context = CurrentMemoryContext;
PG_TRY();
{
test_list(0, 1, 0, 1, NULL, 0);
assert_false("Expected Exception");
}
PG_CATCH();
{
MemoryContextSwitchTo(old_context);
ErrorData *edata = CopyErrorData();
assert_true(edata->elevel == ERROR);
char *expected_message = pstrdup("Cannot get distributed transaction identifier in filter_fragments_for_segment");
assert_string_equal(edata->message, expected_message);
pfree(expected_message);
}
PG_END_TRY();
}
static void
test_list(int segindex, int segtotal, int xid, int fragtotal, char *expected[], int expected_total)
test_list(int segindex, int segtotal, int session_id, int fragtotal, char *expected[], int expected_total)
{
/* prepare the input list */
List *list = prepare_fragment_list(fragtotal, segindex, segtotal, xid);
List *list = prepare_fragment_list(fragtotal, segindex, segtotal, session_id);
if (list && xid != InvalidDistributedTransactionId)
if (list)
{
will_return(getgpsegmentCount, segtotal);
}
/* filter the list */
List *filtered = filter_fragments_for_segment(list);
......@@ -218,12 +201,11 @@ test_list(int segindex, int segtotal, int xid, int fragtotal, char *expected[],
}
static List *
prepare_fragment_list(int fragtotal, int segindex, int segtotal, int xid)
prepare_fragment_list(int fragtotal, int segindex, int segtotal, int session_id)
{
GpIdentity.segindex = segindex;
if (fragtotal > 0)
will_return(getDistributedTransactionId, xid);
gp_session_id = session_id;
gp_command_count = 0;
List *result = NIL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册