提交 0f0d4687 编写于 作者: L Lei Wang 提交者: Lav Jain

Add pxfprotocol_validate_urls for validating URLs and corresponding unit tests (#2952)

Signed-off-by: NAlexandra Wang <lewang@pivotal.io>
上级 6268ba99
......@@ -2,7 +2,7 @@ EXTENSION = pxf
DATA = pxf--1.0.sql
MODULE_big = pxf
OBJS = src/pxfprotocol.o src/pxfbridge.o src/pxfuriparser.o src/libchurl.o src/pxfutils.o src/pxfheaders.o
REGRESS = setup pxf
REGRESS = setup pxf pxfinvalid
ifdef USE_PGXS
PGXS := $(shell pg_config --pgxs)
......
------------------------------------------------------------------
-- PXF invalid test
------------------------------------------------------------------
CREATE EXTERNAL TABLE pxf_invalid_test (a TEXT, b TEXT, c TEXT)
LOCATION ('pxf://default/tmp/dummy1?FRAGMENTER=DemoFragmenter&ACCESSOR=&RESOLVER=DemoTextResolver')
FORMAT 'TEXT' (DELIMITER ',');
ERROR: Invalid URI pxf://default/tmp/dummy1?FRAGMENTER=DemoFragmenter&ACCESSOR=&RESOLVER=DemoTextResolver: option 'ACCESSOR=' missing value after '='
......@@ -3,9 +3,8 @@
------------------------------------------------------------------
CREATE EXTENSION pxf;
CREATE EXTERNAL TABLE pxf_read_test (a TEXT, b TEXT, c TEXT)
LOCATION ('pxf://namenode:51200/tmp/dummy1'
LOCATION ('pxf://default/tmp/dummy1'
'?FRAGMENTER=org.apache.hawq.pxf.api.examples.DemoFragmenter'
'&ACCESSOR=org.apache.hawq.pxf.api.examples.DemoAccessor'
'&RESOLVER=org.apache.hawq.pxf.api.examples.DemoTextResolver')
FORMAT 'TEXT' (DELIMITER ',');
INFO: Dummy PXF protocol validate
\ No newline at end of file
------------------------------------------------------------------
-- PXF invalid test
------------------------------------------------------------------
CREATE EXTERNAL TABLE pxf_invalid_test (a TEXT, b TEXT, c TEXT)
LOCATION ('pxf://default/tmp/dummy1?FRAGMENTER=DemoFragmenter&ACCESSOR=&RESOLVER=DemoTextResolver')
FORMAT 'TEXT' (DELIMITER ',');
......@@ -5,7 +5,7 @@
CREATE EXTENSION pxf;
CREATE EXTERNAL TABLE pxf_read_test (a TEXT, b TEXT, c TEXT)
LOCATION ('pxf://namenode:51200/tmp/dummy1'
LOCATION ('pxf://default/tmp/dummy1'
'?FRAGMENTER=org.apache.hawq.pxf.api.examples.DemoFragmenter'
'&ACCESSOR=org.apache.hawq.pxf.api.examples.DemoAccessor'
'&RESOLVER=org.apache.hawq.pxf.api.examples.DemoTextResolver')
......
......@@ -18,6 +18,7 @@
*
*/
#include "pxfutils.h"
#include "pxfbridge.h"
#include "access/extprotocol.h"
......@@ -46,9 +47,44 @@ static void check_caller(PG_FUNCTION_ARGS, const char* func_name);
Datum
pxfprotocol_validate_urls(PG_FUNCTION_ARGS)
{
//TODO: provide real implementation
elog(INFO, "Dummy PXF protocol validate");
PG_RETURN_VOID();
/* Must be called via the external table format manager */
if (!CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo))
elog(ERROR, "cannot execute pxfprotocol_validate_urls outside protocol manager");
/* There must be only ONE url. */
if (EXTPROTOCOL_VALIDATOR_GET_NUM_URLS(fcinfo) != 1)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("number of URLs must be one")));
char *uri_string = EXTPROTOCOL_VALIDATOR_GET_NTH_URL(fcinfo, 1);
elog(DEBUG2, "pxfprotocol_validate_urls: uri %s", uri_string);
GPHDUri *uri = parseGPHDUri(uri_string);
/* Test that Fragmenter or Profile was specified in the URI */
if (!GPHDUri_opt_exists(uri, FRAGMENTER) && !GPHDUri_opt_exists(uri, PXF_PROFILE))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("FRAGMENTER or PROFILE option must exist in %s", uri->uri)));
/* Check for valid cluster name */
GPHDUri_verify_cluster_exists(uri, PXF_CLUSTER);
/* No duplicate options. */
GPHDUri_verify_no_duplicate_options(uri);
/* Check for existence of core options if profile wasn't supplied */
if (!GPHDUri_opt_exists(uri, PXF_PROFILE))
{
List *coreOptions = list_make2(ACCESSOR, RESOLVER);
if (EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo) != EXT_VALIDATE_WRITE)
coreOptions = lcons(FRAGMENTER, coreOptions);
GPHDUri_verify_core_options_exist(uri, coreOptions);
list_free(coreOptions);
}
freeGPHDUri(uri);
PG_RETURN_VOID();
}
/*
......@@ -58,7 +94,7 @@ Datum
pxfprotocol_export(PG_FUNCTION_ARGS)
{
//TODO: provide real implementation
elog(INFO, "Dummy PXF protocol write");
elog(INFO, "Dummy PXF protocol write");
PG_RETURN_INT32(0);
}
......@@ -89,7 +125,6 @@ pxfprotocol_import(PG_FUNCTION_ARGS)
}
int bytes_read = gpbridge_read(context, EXTPROTOCOL_GET_DATABUF(fcinfo), EXTPROTOCOL_GET_DATALEN(fcinfo));
PG_RETURN_INT32(bytes_read);
}
......@@ -113,7 +148,6 @@ create_context(PG_FUNCTION_ARGS, bool is_import)
fragment 2: segwork=46@127.0.0.1@51200@tmp/dummy1.2@0@ZnJhZ21lbnQy@@@
fragment 3: segwork=46@127.0.0.1@51200@tmp/dummy1.3@0@ZnJhZ21lbnQz@@@
*/
appendStringInfo(&uri_with_segwork,
"%s&segwork=46@127.0.0.1@51200@tmp/dummy1.%d@0@ZnJhZ21lbnQ%c@@@",
original_uri,
......@@ -159,5 +193,5 @@ check_caller(PG_FUNCTION_ARGS, const char* func_name)
if (!CALLED_AS_EXTPROTOCOL(fcinfo))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("%s not called by external protocol manager", func_name)));
errmsg("%s not called by external protocol manager", func_name)));
}
......@@ -42,12 +42,12 @@
*/
typedef struct FragmentData
{
char *authority;
char *index;
char *source_name;
char *fragment_md;
char *user_data;
char *profile;
char *authority;
char *index;
char *source_name;
char *fragment_md;
char *user_data;
char *profile;
} FragmentData;
/*
......@@ -55,8 +55,8 @@ typedef struct FragmentData
*/
typedef struct OptionData
{
char *key;
char *value;
char *key;
char *value;
} OptionData;
/*
......@@ -64,24 +64,34 @@ typedef struct OptionData
*/
typedef struct GPHDUri
{
char *uri; /* the unparsed user uri */
char *protocol; /* the protocol name */
char *host; /* host name str */
char *port; /* port number as string */
char *data; /* data location (path) */
char *profile; /* profile option */
List *fragments; /* list of FragmentData */
List *options; /* list of OptionData */
char *uri; /* the unparsed user uri */
char *protocol; /* the protocol name */
char *cluster; /* cluster name str */
char *host; /* host name str */
char *port; /* port number as string */
char *data; /* data location (path) */
char *profile; /* profile option */
List *fragments; /* list of FragmentData */
List *options; /* list of OptionData */
} GPHDUri;
/*
* Parses a string URI into a data structure
*/
GPHDUri *parseGPHDUri(const char *uri_str);
GPHDUri *parseGPHDUri(const char *uri_str);
GPHDUri *parseGPHDUriHostPort(const char *uri_str, const char *host, const int port);
/*
* Validation functions
*/
bool GPHDUri_opt_exists(GPHDUri *uri, char *key);
void GPHDUri_verify_no_duplicate_options(GPHDUri *uri);
void GPHDUri_verify_core_options_exist(GPHDUri *uri, List *coreOptions);
void GPHDUri_verify_cluster_exists(GPHDUri *uri, char* cluster);
/*
* Frees the elements of the data structure
*/
void freeGPHDUri(GPHDUri *uri);
void freeGPHDUri(GPHDUri *uri);
#endif // _PXFURIPARSER_H_
#endif // _PXFURIPARSER_H_
......@@ -18,22 +18,18 @@
*/
#include "pxfutils.h"
#include "commands/copy.h"
#include "utils/formatting.h"
#include "utils/syscache.h"
/* public function declarations */
static void process_request(ClientContext* client_context, char *uri);
/*
* Checks if two strings are equal
*/
bool
are_ips_equal(char *ip1, char *ip2)
{
if ((ip1 == NULL) || (ip2 == NULL))
return false;
return (strcmp(ip1, ip2) == 0);
if ((ip1 == NULL) || (ip2 == NULL))
return false;
return (strcmp(ip1, ip2) == 0);
}
/*
......@@ -42,80 +38,16 @@ are_ips_equal(char *ip1, char *ip2)
void
port_to_str(char **port, int new_port)
{
char tmp[10];
if (!port)
elog(ERROR, "unexpected internal error in pxfutils.c");
if (*port)
pfree(*port);
char tmp[10];
Assert((new_port <= 65535) && (new_port >= 1)); /* legal port range */
pg_ltoa(new_port, tmp);
*port = pstrdup(tmp);
}
if (!port)
elog(ERROR, "unexpected internal error in pxfutils.c");
if (*port)
pfree(*port);
/*
* call_rest
*
* Creates the REST message and sends it to the PXF service located on
* <hadoop_uri->host>:<hadoop_uri->port>
*/
void
call_rest(GPHDUri *hadoop_uri, ClientContext *client_context, char *rest_msg)
{
StringInfoData request;
initStringInfo(&request);
appendStringInfo(&request, rest_msg,
hadoop_uri->host,
hadoop_uri->port,
PXF_SERVICE_PREFIX,
PXF_VERSION);
/* send the request. The response will exist in rest_buf.data */
process_request(client_context, request.data);
pfree(request.data);
}
/*
* Reads from churl in chunks of 64K and copies data to the context's buffer
*/
static void
process_request(ClientContext* client_context, char *uri)
{
size_t n = 0;
char buffer[RAW_BUF_SIZE];
print_http_headers(client_context->http_headers);
client_context->handle = churl_init_download(uri, client_context->http_headers);
if (client_context->handle == NULL)
elog(ERROR, "Unsuccessful connection to uri: %s", uri);
memset(buffer, 0, RAW_BUF_SIZE);
resetStringInfo(&(client_context->the_rest_buf));
/*
* This try-catch ensures that in case of an exception during the "communication with PXF and the accumulation of
* PXF data in client_context->the_rest_buf", we still get to terminate the libcurl connection nicely and avoid
* leaving the PXF server connection hung.
*/
PG_TRY();
{
/* read some bytes to make sure the connection is established */
churl_read_check_connectivity(client_context->handle);
while ((n = churl_read(client_context->handle, buffer, sizeof(buffer))) != 0)
{
appendBinaryStringInfo(&(client_context->the_rest_buf), buffer, n);
memset(buffer, 0, RAW_BUF_SIZE);
}
churl_cleanup(client_context->handle, false);
}
PG_CATCH();
{
if (client_context->handle)
churl_cleanup(client_context->handle, true);
PG_RE_THROW();
}
PG_END_TRY();
Assert((new_port <= 65535) && (new_port >= 1)); /* legal port range */
pg_ltoa(new_port, tmp);
*port = pstrdup(tmp);
}
/*
......@@ -126,20 +58,20 @@ process_request(ClientContext* client_context, char *uri)
char*
normalize_key_name(const char* key)
{
if (!key || strlen(key) == 0)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("internal error in pxfutils.c:normalize_key_name. Parameter key is null or empty.")));
}
StringInfoData formatter;
initStringInfo(&formatter);
char* upperCasedKey = str_toupper(pstrdup(key), strlen(key));
appendStringInfo(&formatter, "X-GP-%s", upperCasedKey);
pfree(upperCasedKey);
return formatter.data;
if (!key || strlen(key) == 0)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("internal error in pxfutils.c:normalize_key_name. Parameter key is null or empty.")));
}
StringInfoData formatter;
initStringInfo(&formatter);
char* upperCasedKey = str_toupper(pstrdup(key), strlen(key));
appendStringInfo(&formatter, "X-GP-%s", upperCasedKey);
pfree(upperCasedKey);
return formatter.data;
}
/*
......@@ -150,26 +82,26 @@ char*
TypeOidGetTypename(Oid typid)
{
Assert(OidIsValid(typid));
Assert(OidIsValid(typid));
HeapTuple typtup;
Form_pg_type typform;
HeapTuple typtup;
Form_pg_type typform;
typtup = SearchSysCache(TYPEOID,
ObjectIdGetDatum(typid),
0, 0, 0);
if (!HeapTupleIsValid(typtup))
elog(ERROR, "cache lookup failed for type %u", typid);
typtup = SearchSysCache(TYPEOID,
ObjectIdGetDatum(typid),
0, 0, 0);
if (!HeapTupleIsValid(typtup))
elog(ERROR, "cache lookup failed for type %u", typid);
typform = (Form_pg_type) GETSTRUCT(typtup);
typform = (Form_pg_type) GETSTRUCT(typtup);
char *typname = NameStr(typform->typname);
char *typname = NameStr(typform->typname);
StringInfoData tname;
initStringInfo(&tname);
appendStringInfo(&tname, "%s", typname);
StringInfoData tname;
initStringInfo(&tname);
appendStringInfo(&tname, "%s", typname);
ReleaseSysCache(typtup);
ReleaseSysCache(typtup);
return tname.data;
return tname.data;
}
......@@ -21,31 +21,28 @@
#define _PXFUTILS_H_
#include "postgres.h"
#include "libchurl.h"
#include "pxfuriparser.h"
typedef struct sClientContext
{
CHURL_HEADERS http_headers;
CHURL_HANDLE handle;
/* part of the HTTP response - received */
/* from one call to churl_read */
StringInfoData the_rest_buf; /* contains the complete HTTP response */
} ClientContext;
/* checks if two ip strings are equal */
bool are_ips_equal(char *ip1, char *ip2);
/* override port str with given new port int */
void port_to_str(char** port, int new_port);
/* parse the REST message and issue the libchurl call */
void call_rest(GPHDUri *hadoop_uri, ClientContext *client_context, char* rest_msg);
/* convert input string to upper case and prepend "X-GP-" prefix */
char* normalize_key_name(const char* key);
/* get the name of the type, given the OID */
char* TypeOidGetTypename(Oid typid);
#endif // _PXFUTILS_H_
#define PXF_CLUSTER "default"
#define PXF_PROFILE "PROFILE"
#define FRAGMENTER "FRAGMENTER"
#define ACCESSOR "ACCESSOR"
#define RESOLVER "RESOLVER"
#define ANALYZER "ANALYZER"
#define PxfDefaultHost "localhost"
#define PxfDefaultPortStr "51200"
#define PxfDefaultPort 51200
#endif // _PXFUTILS_H_
......@@ -12,3 +12,34 @@ freeGPHDUri(GPHDUri* uri)
check_expected(uri);
mock();
}
bool
GPHDUri_opt_exists(GPHDUri *uri, char *key)
{
check_expected(uri);
check_expected(key);
return (bool) mock();
}
void
GPHDUri_verify_no_duplicate_options(GPHDUri *uri)
{
check_expected(uri);
mock();
}
void
GPHDUri_verify_core_options_exist(GPHDUri *uri, List *coreoptions)
{
check_expected(uri);
check_expected(coreoptions);
mock();
}
void
GPHDUri_verify_cluster_exists(GPHDUri *uri, char* cluster)
{
check_expected(uri);
check_expected(cluster);
mock();
}
\ No newline at end of file
......@@ -18,24 +18,6 @@ port_to_str(char** port, int new_port)
mock();
}
void
call_rest(GPHDUri* hadoop_uri, ClientContext* client_context, char* rest_msg)
{
check_expected(hadoop_uri);
check_expected(client_context);
check_expected(rest_msg);
optional_assignment(hadoop_uri);
optional_assignment(client_context);
optional_assignment(rest_msg);
mock();
}
static void
process_request(ClientContext* client_context, char* uri)
{
mock();
}
char*
normalize_key_name(const char* key)
{
......
......@@ -32,14 +32,55 @@
#include "mock/pxfbridge_mock.c"
#include "mock/pxfuriparser_mock.c"
const char* uri_no_profile = "pxf://default/tmp/dummy1?FRAGMENTER=xxx&RESOLVER=yyy&ACCESSOR=zzz";
const char* uri_param = "pxf://localhost:51200/tmp/dummy1";
const char* uri_param_segwork = "pxf://localhost:51200/tmp/dummy1&segwork=46@127.0.0.1@51200@tmp/dummy1.1@0@ZnJhZ21lbnQx@@@";
void
test_pxfprotocol_validate_urls(void **state)
{
Datum d = pxfprotocol_validate_urls(NULL);
/* setup call info with no call context */
PG_FUNCTION_ARGS = palloc(sizeof(FunctionCallInfoData));
fcinfo->context = palloc(sizeof(ExtProtocolValidatorData));
fcinfo->context->type = T_ExtProtocolValidatorData;
Value *v = makeString(uri_no_profile);
List* list = list_make1(v);
((ExtProtocolValidatorData*) fcinfo->context)->url_list = list;
/* set mock behavior for uri parsing */
GPHDUri* gphd_uri = palloc0(sizeof(GPHDUri));
expect_string(parseGPHDUri, uri_str, uri_no_profile);
will_return(parseGPHDUri, gphd_uri);
expect_value(GPHDUri_verify_cluster_exists, uri, gphd_uri);
expect_string(GPHDUri_verify_cluster_exists, cluster, PXF_CLUSTER);
will_be_called(GPHDUri_verify_cluster_exists);
expect_value(GPHDUri_verify_no_duplicate_options, uri, gphd_uri);
will_be_called(GPHDUri_verify_no_duplicate_options);
expect_value(GPHDUri_opt_exists, uri, gphd_uri);
expect_string(GPHDUri_opt_exists, key, FRAGMENTER);
will_return(GPHDUri_opt_exists, true);
expect_value(GPHDUri_opt_exists, uri, gphd_uri);
expect_string(GPHDUri_opt_exists, key, PXF_PROFILE);
will_return(GPHDUri_opt_exists, false);
expect_value(GPHDUri_verify_core_options_exist, uri, gphd_uri);
expect_any(GPHDUri_verify_core_options_exist, coreoptions);
will_be_called(GPHDUri_verify_core_options_exist);
expect_value(freeGPHDUri, uri, gphd_uri);
will_be_called(freeGPHDUri);
Datum d = pxfprotocol_validate_urls(fcinfo);
assert_int_equal(DatumGetInt32(d), 0);
/* cleanup */
list_free_deep(list);
pfree(fcinfo->context);
pfree(fcinfo);
}
void
......
......@@ -26,19 +26,16 @@
#define UNIT_TESTING
/* include unit under test */
#include "../src/pxfutils.c"
#include "../src/pxfuriparser.c"
/* include mock files */
#include "mock/pxfutils_mock.c"
static void expect_normalize_key_name(const char *key);
static void test_parseGPHDUri_helper(const char* uri, const char* message);
static void test_parseFragment_helper(const char* fragment, const char* message);
static void test_verify_cluster_exception_helper(const char* uri_str);
static char uri_no_segwork[] = "pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=SomeFragmenter&ACCESSOR=SomeAccessor&RESOLVER=SomeResolver&ANALYZER=SomeAnalyzer";
static char uri_with_segwork_1[] = "pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=SomeFragmenter&ACCESSOR=SomeAccessor&RESOLVER=SomeResolver&ANALYZER=SomeAnalyzer&segwork=42@127.0.0.1@51200@tmp/test@0@ZnJhZ21lbnQx@@@";
static char uri_with_segwork_2[] = "pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=SomeFragmenter&ACCESSOR=SomeAccessor&RESOLVER=SomeResolver&ANALYZER=SomeAnalyzer&segwork=42@127.0.0.1@51200@tmp/test@0@ZnJhZ21lbnQx@@@41@127.0.0.1@51200@tmp/foo@0@ZnJhZ21lbnQx@@@";
static char uri_with_no_segwork[] = "pxf://default/some/path/and/table.tbl?FRAGMENTER=SomeFragmenter&ACCESSOR=SomeAccessor&RESOLVER=SomeResolver&ANALYZER=SomeAnalyzer";
static char uri_with_segwork_1[] = "pxf://default/some/path/and/table.tbl?FRAGMENTER=SomeFragmenter&ACCESSOR=SomeAccessor&RESOLVER=SomeResolver&ANALYZER=SomeAnalyzer&segwork=42@127.0.0.1@51200@tmp/test@0@ZnJhZ21lbnQx@@@";
static char uri_with_segwork_2[] = "pxf://default/some/path/and/table.tbl?FRAGMENTER=SomeFragmenter&ACCESSOR=SomeAccessor&RESOLVER=SomeResolver&ANALYZER=SomeAnalyzer&segwork=42@127.0.0.1@51200@tmp/test@0@ZnJhZ21lbnQx@@@41@127.0.0.1@51200@tmp/foo@0@ZnJhZ21lbnQx@@@";
/*
* Test parsing of valid uri as given in LOCATION in a PXF external table.
......@@ -46,46 +43,37 @@ static char uri_with_segwork_2[] = "pxf://1.2.3.4:5678/some/path/and/table.tbl?F
void
test_parseGPHDUri_ValidURI(void **state)
{
List* options = NIL;
ListCell* cell = NULL;
OptionData* option = NULL;
expect_normalize_key_name("FRAGMENTER");
expect_normalize_key_name("ACCESSOR");
expect_normalize_key_name("RESOLVER");
expect_normalize_key_name("ANALYZER");
GPHDUri* parsed = parseGPHDUri(uri_with_segwork_1);
assert_true(parsed != NULL);
assert_string_equal(parsed->uri, uri_no_segwork);
assert_string_equal(parsed->uri, uri_with_no_segwork);
assert_string_equal(parsed->protocol, "pxf");
assert_string_equal(parsed->host, "1.2.3.4");
assert_string_equal(parsed->port, "5678");
assert_string_equal(parsed->host, PxfDefaultHost);
assert_string_equal(parsed->port, PxfDefaultPortStr);
assert_string_equal(parsed->data, "some/path/and/table.tbl");
options = parsed->options;
List *options = parsed->options;
assert_int_equal(list_length(options), 4);
cell = list_nth_cell(options, 0);
option = lfirst(cell);
assert_string_equal(option->key, "FRAGMENTER");
ListCell* cell = list_nth_cell(options, 0);
OptionData* option = lfirst(cell);
assert_string_equal(option->key, FRAGMENTER);
assert_string_equal(option->value, "SomeFragmenter");
cell = list_nth_cell(options, 1);
option = lfirst(cell);
assert_string_equal(option->key, "ACCESSOR");
assert_string_equal(option->key, ACCESSOR);
assert_string_equal(option->value, "SomeAccessor");
cell = list_nth_cell(options, 2);
option = lfirst(cell);
assert_string_equal(option->key, "RESOLVER");
assert_string_equal(option->key, RESOLVER);
assert_string_equal(option->value, "SomeResolver");
cell = list_nth_cell(options, 3);
option = lfirst(cell);
assert_string_equal(option->key, "ANALYZER");
assert_string_equal(option->key, ANALYZER);
assert_string_equal(option->value, "SomeAnalyzer");
assert_true(parsed->fragments != NULL);
......@@ -104,7 +92,7 @@ test_parseGPHDUri_ValidURI(void **state)
void
test_parseGPHDUri_NegativeTestNoProtocol(void **state)
{
char* uri = "pxf:/1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter";
char* uri = "pxf:/default/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter";
test_parseGPHDUri_helper(uri, "");
}
......@@ -114,17 +102,27 @@ test_parseGPHDUri_NegativeTestNoProtocol(void **state)
void
test_parseGPHDUri_NegativeTestNoOptions(void **state)
{
char *uri = "pxf://1.2.3.4:5678/some/path/and/table.tbl";
char *uri = "pxf://default/some/path/and/table.tbl";
test_parseGPHDUri_helper(uri, ": missing options section");
}
/*
* Negative test: parsing of uri without cluster part
*/
void
test_parseGPHDUri_NegativeTestNoCluster(void **state)
{
char *uri = "pxf:///default/some/path/and/table.tbl";
test_parseGPHDUri_helper(uri, ": missing cluster section");
}
/*
* Negative test: parsing of a uri with a missing equal
*/
void
test_parseGPHDUri_NegativeTestMissingEqual(void **state)
{
char* uri = "pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER";
char* uri = "pxf://default/some/path/and/table.tbl?FRAGMENTER";
test_parseGPHDUri_helper(uri, ": option 'FRAGMENTER' missing '='");
}
......@@ -134,7 +132,7 @@ test_parseGPHDUri_NegativeTestMissingEqual(void **state)
void
test_parseGPHDUri_NegativeTestDuplicateEquals(void **state)
{
char* uri = "pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter=DuplicateFragmenter";
char* uri = "pxf://default/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter=DuplicateFragmenter";
test_parseGPHDUri_helper(uri, ": option 'FRAGMENTER=HdfsDataFragmenter=DuplicateFragmenter' contains duplicate '='");
}
......@@ -144,7 +142,7 @@ test_parseGPHDUri_NegativeTestDuplicateEquals(void **state)
void
test_parseGPHDUri_NegativeTestMissingKey(void **state)
{
char* uri = "pxf://1.2.3.4:5678/some/path/and/table.tbl?=HdfsDataFragmenter";
char* uri = "pxf://default/some/path/and/table.tbl?=HdfsDataFragmenter";
test_parseGPHDUri_helper(uri, ": option '=HdfsDataFragmenter' missing key before '='");
}
......@@ -154,7 +152,7 @@ test_parseGPHDUri_NegativeTestMissingKey(void **state)
void
test_parseGPHDUri_NegativeTestMissingValue(void **state)
{
char* uri = "pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=";
char* uri = "pxf://default/some/path/and/table.tbl?FRAGMENTER=";
test_parseGPHDUri_helper(uri, ": option 'FRAGMENTER=' missing value after '='");
}
......@@ -209,7 +207,6 @@ test_GPHDUri_parse_fragment_EmptyProfile(void **state)
GPHDUri_free_fragment(fragment_data);
list_free(fragments);
}
/*
......@@ -219,7 +216,7 @@ void
test_GPHDUri_parse_fragment_EmptyString(void **state)
{
char* fragment = "";
test_parseFragment_helper(fragment, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
test_parseFragment_helper(fragment, "internal error in GPHDUri_parse_fragment. Fragment string is invalid.");
}
/*
......@@ -229,7 +226,7 @@ void
test_GPHDUri_parse_fragment_NullFragment(void **state)
{
char *fragment = NULL;
test_parseFragment_helper(fragment, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is null.");
test_parseFragment_helper(fragment, "internal error in GPHDUri_parse_fragment. Fragment string is null.");
}
/*
......@@ -239,7 +236,7 @@ void
test_GPHDUri_parse_fragment_MissingIpHost(void **state)
{
char* fragment = "@";
test_parseFragment_helper(fragment, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
test_parseFragment_helper(fragment, "internal error in GPHDUri_parse_fragment. Fragment string is invalid.");
}
/*
......@@ -249,7 +246,7 @@ void
test_GPHDUri_parse_fragment_MissingPort(void **state)
{
char* fragment = "@HOST@";
test_parseFragment_helper(fragment, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
test_parseFragment_helper(fragment, "internal error in GPHDUri_parse_fragment. Fragment string is invalid.");
}
/*
......@@ -259,7 +256,7 @@ void
test_GPHDUri_parse_fragment_MissingSourceName(void **state)
{
char* fragment = "@HOST@PORT@";
test_parseFragment_helper(fragment, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
test_parseFragment_helper(fragment, "internal error in GPHDUri_parse_fragment. Fragment string is invalid.");
}
/*
......@@ -269,7 +266,7 @@ void
test_GPHDUri_parse_fragment_MissingIndex(void **state)
{
char* fragment = "@HOST@PORT@SOURCE_NAME@";
test_parseFragment_helper(fragment, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
test_parseFragment_helper(fragment, "internal error in GPHDUri_parse_fragment. Fragment string is invalid.");
}
/*
......@@ -279,7 +276,7 @@ void
test_GPHDUri_parse_fragment_MissingFragmentMetadata(void **state)
{
char* fragment = "@HOST@PORT@SOURCE_NAME@42@";
test_parseFragment_helper(fragment, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
test_parseFragment_helper(fragment, "internal error in GPHDUri_parse_fragment. Fragment string is invalid.");
}
/*
......@@ -289,7 +286,7 @@ void
test_GPHDUri_parse_fragment_MissingUserData(void **state)
{
char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@";
test_parseFragment_helper(fragment, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
test_parseFragment_helper(fragment, "internal error in GPHDUri_parse_fragment. Fragment string is invalid.");
}
/*
......@@ -299,23 +296,28 @@ void
test_GPHDUri_parse_fragment_MissingProfile(void **state)
{
char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_METADATA@";
test_parseFragment_helper(fragment, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
test_parseFragment_helper(fragment, "internal error in GPHDUri_parse_fragment. Fragment string is invalid.");
}
/*
* Test GPHDUri_parse_fragment when there is no segwork in the URI
*/
void
test_GPHDUri_parse_segwork_NoSegwork(void **state)
{
GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_segwork(uri, uri_no_segwork);
GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_segwork(uri, uri_with_no_segwork);
assert_true(uri->fragments == NULL);
pfree(uri);
}
/*
* Test GPHDUri_parse_fragment when there are more than one fragments in segwork
*/
void
test_GPHDUri_parse_segwork_TwoFragments(void **state)
{
GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_segwork(uri, uri_with_segwork_2);
assert_true(uri->fragments != NULL);
......@@ -327,6 +329,183 @@ test_GPHDUri_parse_segwork_TwoFragments(void **state)
pfree(uri);
}
/*
* Test GPHDUri_opt_exists to check if a specified option is in the URI
*/
void
test_GPHDUri_opt_exists(void **state)
{
char* uri_str = "xyz?FRAGMENTER=HdfsDataFragmenter&RESOLVER=SomeResolver";
char* cursor = strstr(uri_str, "?");
GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_options(uri, &cursor);
bool exists = GPHDUri_opt_exists(uri, "FRAGMENTER");
assert_true(exists);
exists = GPHDUri_opt_exists(uri, "RESOLVER");
assert_true(exists);
exists = GPHDUri_opt_exists(uri, "ACCESSOR");
assert_false(exists);
pfree(uri);
}
/*
* Test GPHDUri_verify_no_duplicate_options to check that there are no duplicate options
*/
void
test_GPHDUri_verify_no_duplicate_options(void **state)
{
/* No duplicates */
char* uri_no_dup_opts = "xyz?FRAGMENTER=HdfsDataFragmenter&RESOLVER=SomeResolver";
char* cursor = strstr(uri_no_dup_opts, "?");
GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_options(uri, &cursor);
GPHDUri_verify_no_duplicate_options(uri);
pfree(uri);
/* Expect error if duplicate options specified */
char* uri_dup_opts = "xyz?FRAGMENTER=HdfsDataFragmenter&FRAGMENTER=SomeFragmenter";
cursor = strstr(uri_dup_opts, "?");
uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_options(uri, &cursor);
MemoryContext old_context = CurrentMemoryContext;
PG_TRY();
{
GPHDUri_verify_no_duplicate_options(uri);
assert_false("Expected Exception");
}
PG_CATCH();
{
MemoryContextSwitchTo(old_context);
ErrorData *edata = CopyErrorData();
FlushErrorState();
/*Validate the type of expected error */
assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
assert_true(edata->elevel == ERROR);
StringInfoData expected_message;
initStringInfo(&expected_message);
appendStringInfo(&expected_message, "Invalid URI %s: Duplicate option(s): %s", uri->uri, "FRAGMENTER");
assert_string_equal(edata->message, expected_message.data);
pfree(expected_message.data);
elog_dismiss(INFO);
}
PG_END_TRY();
pfree(uri);
}
/*
* Test GPHDUri_verify_core_options_exist to check that all options in the expected list are present
*/
void
test_GPHDUri_verify_core_options_exist(void **state)
{
List *coreOptions = list_make3("FRAGMENTER", "ACCESSOR", "RESOLVER");
/* Check for presence of options in the above list */
char* uri_core_opts = "xyz?FRAGMENTER=HdfsDataFragmenter&ACCESSOR=SomeAccesor&RESOLVER=SomeResolver";
char* cursor = strstr(uri_core_opts, "?");
GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_options(uri, &cursor);
GPHDUri_verify_core_options_exist(uri, coreOptions);
pfree(uri);
/* Option RESOLVER is missing. Expect validation error */
char* uri_miss_core_opts = "xyz?FRAGMENTER=HdfsDataFragmenter&ACCESSOR=SomeAccesor";
cursor = strstr(uri_miss_core_opts, "?");
uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_options(uri, &cursor);
MemoryContext old_context = CurrentMemoryContext;
PG_TRY();
{
GPHDUri_verify_core_options_exist(uri, coreOptions);
assert_false("Expected Exception");
}
PG_CATCH();
{
MemoryContextSwitchTo(old_context);
ErrorData *edata = CopyErrorData();
FlushErrorState();
/*Validate the type of expected error */
assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
assert_true(edata->elevel == ERROR);
StringInfoData expected_message;
initStringInfo(&expected_message);
appendStringInfo(&expected_message, "Invalid URI %s: %s option(s) missing", uri->uri, "RESOLVER");
assert_string_equal(edata->message, expected_message.data);
pfree(expected_message.data);
elog_dismiss(INFO);
}
PG_END_TRY();
pfree(uri);
}
/*
* Test GPHDUri_verify_cluster_exists to check if the specified cluster is present in the URI
*/
void
test_GPHDUri_verify_cluster_exists(void **state)
{
char* uri_with_cluster = "pxf://default/some/file/path?key=value";
char* cursor = strstr(uri_with_cluster, PTC_SEP) + strlen(PTC_SEP);
GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_cluster(uri, &cursor);
GPHDUri_verify_cluster_exists(uri, "default");
pfree(uri);
char* uri_different_cluster = "pxf://asdf:1034/some/file/path?key=value";
test_verify_cluster_exception_helper(uri_different_cluster);
char* uri_invalid_cluster = "pxf://asdf/default/file/path?key=value";
test_verify_cluster_exception_helper(uri_invalid_cluster);
}
/*
* Test GPHDUri_verify_cluster_exists to check if the specified cluster is present in the URI
*/
static void
test_verify_cluster_exception_helper(const char* uri_str)
{
char *cursor = strstr(uri_str, PTC_SEP) + strlen(PTC_SEP);
GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri));
GPHDUri_parse_cluster(uri, &cursor);
MemoryContext old_context = CurrentMemoryContext;
PG_TRY();
{
GPHDUri_verify_cluster_exists(uri, "default");
assert_false("Expected Exception");
}
PG_CATCH();
{
MemoryContextSwitchTo(old_context);
ErrorData *edata = CopyErrorData();
FlushErrorState();
/*Validate the type of expected error */
assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
assert_true(edata->elevel == ERROR);
StringInfoData expected_message;
initStringInfo(&expected_message);
appendStringInfo(&expected_message, "Invalid URI %s: CLUSTER NAME %s not found", uri->uri, "default");
assert_string_equal(edata->message, expected_message.data);
pfree(expected_message.data);
elog_dismiss(INFO);
}
PG_END_TRY();
pfree(uri);
}
/*
* Helper function for parse fragment test cases
*/
......@@ -398,16 +577,6 @@ test_parseGPHDUri_helper(const char* uri, const char* message)
PG_END_TRY();
}
static void
expect_normalize_key_name(const char *key)
{
StringInfoData keyname;
initStringInfo(&keyname);
appendStringInfo(&keyname, "X-GP-%s", key);
expect_string(normalize_key_name, key, key);
will_return(normalize_key_name, keyname.data);
}
int
main(int argc, char* argv[])
{
......@@ -417,6 +586,7 @@ main(int argc, char* argv[])
unit_test(test_parseGPHDUri_ValidURI),
unit_test(test_parseGPHDUri_NegativeTestNoProtocol),
unit_test(test_parseGPHDUri_NegativeTestNoOptions),
unit_test(test_parseGPHDUri_NegativeTestNoCluster),
unit_test(test_parseGPHDUri_NegativeTestMissingEqual),
unit_test(test_parseGPHDUri_NegativeTestDuplicateEquals),
unit_test(test_parseGPHDUri_NegativeTestMissingKey),
......@@ -433,7 +603,11 @@ main(int argc, char* argv[])
unit_test(test_GPHDUri_parse_fragment_MissingUserData),
unit_test(test_GPHDUri_parse_fragment_MissingProfile),
unit_test(test_GPHDUri_parse_segwork_NoSegwork),
unit_test(test_GPHDUri_parse_segwork_TwoFragments)
unit_test(test_GPHDUri_parse_segwork_TwoFragments),
unit_test(test_GPHDUri_opt_exists),
unit_test(test_GPHDUri_verify_no_duplicate_options),
unit_test(test_GPHDUri_verify_core_options_exist),
unit_test(test_GPHDUri_verify_cluster_exists)
};
MemoryContextInit();
......
......@@ -28,9 +28,6 @@
/* include unit under test */
#include "../src/pxfutils.c"
/* include mock files */
#include "mock/libchurl_mock.c"
void
test_are_ips_equal(void **state)
{
......@@ -118,75 +115,6 @@ test_normalize_key_name(void **state)
}
void
test_call_rest(void **state)
{
GPHDUri *hadoop_uri = (GPHDUri*) palloc0(sizeof(GPHDUri));
hadoop_uri->host = "host";
hadoop_uri->port = "123";
ClientContext *client_context = (ClientContext*) palloc0(sizeof(ClientContext));
client_context->http_headers = (CHURL_HEADERS) palloc0(sizeof(CHURL_HEADERS));
initStringInfo(&(client_context->the_rest_buf));
CHURL_HANDLE handle = (CHURL_HANDLE) palloc0(sizeof(CHURL_HANDLE));
char *rest_msg = "http://%s:%s/%s/%s";
expect_value(print_http_headers, headers, client_context->http_headers);
will_be_called(print_http_headers);(client_context->http_headers);
StringInfoData expected_url;
initStringInfo(&expected_url);
appendStringInfo(&expected_url, "http://host:123/%s/%s", PXF_SERVICE_PREFIX, PXF_VERSION);
expect_string(churl_init_download, url, expected_url.data);
expect_value(churl_init_download, headers, client_context->http_headers);
will_return(churl_init_download, handle);
expect_value(churl_read_check_connectivity, handle, handle);
will_be_called(churl_read_check_connectivity);
/* first call to read should return Hello */
expect_value(churl_read, handle, handle);
expect_any(churl_read, buf);
expect_any(churl_read, max_size);
char* str = pstrdup("Hello ");
//will_assign_memory(churl_read, buf, str, 7);
will_return(churl_read, 7);
/* second call to read should return World */
expect_value(churl_read, handle, handle);
expect_any(churl_read, buf);
expect_any(churl_read, max_size);
//will_assign_memory(churl_read, buf, "World", 6);
will_return(churl_read, 6);
/* third call will return nothing */
expect_value(churl_read, handle, handle);
expect_any(churl_read, buf);
expect_any(churl_read, max_size);
will_return(churl_read, 0);
expect_value(churl_cleanup, handle, handle);
expect_value(churl_cleanup, after_error, false);
will_be_called(churl_cleanup);
call_rest(hadoop_uri, client_context, rest_msg);
//TODO: debug this, seems will_assign_memory is not quite working
//assert_string_equal(client_context->the_rest_buf.data, "Hello World");
pfree(expected_url.data);
pfree(client_context->http_headers);
pfree(client_context);
pfree(hadoop_uri);
}
int
main(int argc, char* argv[])
{
......@@ -195,8 +123,7 @@ main(int argc, char* argv[])
const UnitTest tests[] = {
unit_test(test_are_ips_equal),
unit_test(test_port_to_str),
unit_test(test_normalize_key_name),
unit_test(test_call_rest)
unit_test(test_normalize_key_name)
};
MemoryContextInit();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册