提交 9f4497fd 编写于 作者: D Daniel Gustafsson 提交者: Lav Jain

Use built-in JSON parser for PXF fragments (#3185)

* Use built-in JSON parser for PXF fragments

Instead of relying on an external library, use the built-in JSON
parser in the backend for the PXF fragments parsing. Since this
replaces the current implementation with an event-based callback
parser, the code is more complicated, but dogfooding the parser
that we want extension writers to use is a good thing.

This removes the dependency on json-c from autoconf, and enables
building PXF on Travis for extra coverage.

* Use elog for internal errors, and ereport for user errors

Internal errors where we are interested in source filename should
use elog() which will decorate the error messages automatically
with this information. The connection error is interesting for the
user however, use ereport() instead there.
上级 8cc820d3
......@@ -84,7 +84,7 @@ before_script:
script:
- cd ${TRAVIS_BUILD_DIR}
- ./configure --with-openssl --with-ldap --with-libcurl --prefix=${TRAVIS_BUILD_DIR}/gpsql --with-apr-config=${TRAVIS_BUILD_DIR}/tools/bin/apr-1-config --disable-orca --disable-gpcloud
- ./configure --with-openssl --with-ldap --with-libcurl --prefix=${TRAVIS_BUILD_DIR}/gpsql --with-apr-config=${TRAVIS_BUILD_DIR}/tools/bin/apr-1-config --disable-orca --disable-gpcloud --enable-pxf
- make
- make install
......
......@@ -17,7 +17,6 @@ brew install apr # gpperfmon
brew install apr-util # gpperfmon
brew link --force apr
brew link --force apr-util
brew install json-c # pxf
brew install curl # pxf
# Installing Golang
......
......@@ -173,10 +173,11 @@ make distclean
PXF is an extension framework for GPDB to enable fast access to external hadoop datasets.
Refer to [PXF extension](https://github.com/greenplum-db/gpdb/tree/master/gpAux/extensions/pxf) for more information.
Currently, GPDB isn't built with PXF by default.
PXF requires curl version >= 7.21.3 and also has an additional dependency on json-c library
In order to build GPDB with pxf, simply invoke `./configure` with the additional option `--enable-pxf`.
PXF requires curl version >= 7.21.3, so `--enable-pxf` is not compatible with
the `--without-libcurl` option.
```
# Configure build environment to build/install PXF at /usr/local/gpdb
# Configure build environment to additionally build PXF, and install at /usr/local/gpdb
./configure --with-perl --with-python --with-libxml --prefix=/usr/local/gpdb --enable-pxf
```
......
......@@ -13169,86 +13169,6 @@ done
fi
# json-c
if test "$enable_pxf" = yes ; then
if test "$PORTNAME" != "win32" ; then
for ac_header in json-c/json.h
do :
ac_fn_c_check_header_mongrel "$LINENO" "json-c/json.h" "ac_cv_header_json_c_json_h" "$ac_includes_default"
if test "x$ac_cv_header_json_c_json_h" = xyes; then :
cat >>confdefs.h <<_ACEOF
#define HAVE_JSON_C_JSON_H 1
_ACEOF
else
as_fn_error $? "header file <json-c/json.h> is required.
Check config.log for details. It is possible the compiler isn't looking in the proper directory." "$LINENO" 5
fi
done
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing json_tokener_parse" >&5
$as_echo_n "checking for library containing json_tokener_parse... " >&6; }
if ${ac_cv_search_json_tokener_parse+:} false; then :
$as_echo_n "(cached) " >&6
else
ac_func_search_save_LIBS=$LIBS
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
/* end confdefs.h. */
/* Override any GCC internal prototype to avoid an error.
Use char because int might match the return type of a GCC
builtin and then its argument prototype would still apply. */
#ifdef __cplusplus
extern "C"
#endif
char json_tokener_parse ();
int
main ()
{
return json_tokener_parse ();
;
return 0;
}
_ACEOF
for ac_lib in '' json-c json; do
if test -z "$ac_lib"; then
ac_res="none required"
else
ac_res=-l$ac_lib
LIBS="-l$ac_lib $ac_func_search_save_LIBS"
fi
if ac_fn_c_try_link "$LINENO"; then :
ac_cv_search_json_tokener_parse=$ac_res
fi
rm -f core conftest.err conftest.$ac_objext \
conftest$ac_exeext
if ${ac_cv_search_json_tokener_parse+:} false; then :
break
fi
done
if ${ac_cv_search_json_tokener_parse+:} false; then :
else
ac_cv_search_json_tokener_parse=no
fi
rm conftest.$ac_ext
LIBS=$ac_func_search_save_LIBS
fi
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_search_json_tokener_parse" >&5
$as_echo "$ac_cv_search_json_tokener_parse" >&6; }
ac_res=$ac_cv_search_json_tokener_parse
if test "$ac_res" != no; then :
test "$ac_res" = "none required" || LIBS="$ac_res $LIBS"
else
as_fn_error $? "library 'json-c' is required.
Check config.log for details. It is possible the compiler isn't looking in the proper directory." "$LINENO" 5
fi
fi
fi
##
## Types, structures, compiler characteristics
##
......
......@@ -1518,16 +1518,6 @@ if test "$enable_mapreduce" = yes; then
AC_CHECK_HEADERS(yaml.h, [], [AC_MSG_ERROR([YAML includes required for Greenplum Mapreduce])])
fi
# json-c
if test "$enable_pxf" = yes ; then
if test "$PORTNAME" != "win32" ; then
AC_CHECK_HEADERS(json-c/json.h, [], [AC_MSG_ERROR([header file <json-c/json.h> is required.
Check config.log for details. It is possible the compiler isn't looking in the proper directory.])])
AC_SEARCH_LIBS(json_tokener_parse, json-c json, [], [AC_MSG_ERROR([library 'json-c' is required.
Check config.log for details. It is possible the compiler isn't looking in the proper directory.])], [])
fi
fi
##
## Types, structures, compiler characteristics
##
......
......@@ -15,7 +15,7 @@ endif
CURL_DIR = /usr/local/opt/curl
SHLIB_LINK += -lcurl -ljson-c
SHLIB_LINK += -lcurl
unittest-check:
$(MAKE) -C test check
......
#include "postgres.h"
#include "pxffragment.h"
#include "pxfutils.h"
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"
#include "commands/copy.h"
#include "postgres.h"
#include "lib/stringinfo.h"
#include <json-c/json.h>
#include "utils/jsonapi.h"
static List *get_data_fragment_list(GPHDUri *hadoop_uri, ClientContext *client_context);
static void rest_request(GPHDUri *hadoop_uri, ClientContext *client_context, char *rest_msg);
......@@ -19,6 +19,10 @@ static void init_client_context(ClientContext *client_context);
static void assign_pxf_location_to_fragments(List *fragments);
static void call_rest(GPHDUri *hadoop_uri, ClientContext *client_context, char *rest_msg);
static void process_request(ClientContext *client_context, char *uri);
static void pxf_fragment_scalar(void *state, char *token, JsonTokenType type);
static void pxf_fragment_object_start(void *state, char *name, bool isnull);
static void pxf_array_element_start(void *state, bool isnull);
static void pxf_array_element_end(void *state, bool isnull);
/* Get List of fragments using PXF
* Returns selected fragments that have been allocated to the current segment
......@@ -156,76 +160,189 @@ rest_request(GPHDUri *hadoop_uri, ClientContext *client_context, char *rest_msg)
* Response (left as a single line purposefully):
* {"PXFFragments":[{"index":0,"userData":null,"sourceName":"demo/text2.csv","metadata":"rO0ABXcQAAAAAAAAAAAAAAAAAAAABXVyABNbTGphdmEubGFuZy5TdHJpbmc7rdJW5+kde0cCAAB4cAAAAAN0ABxhZXZjZWZlcm5hczdtYnAuY29ycC5lbWMuY29tdAAcYWV2Y2VmZXJuYXM3bWJwLmNvcnAuZW1jLmNvbXQAHGFldmNlZmVybmFzN21icC5jb3JwLmVtYy5jb20=","replicas":["10.207.4.23","10.207.4.23","10.207.4.23"]},{"index":0,"userData":null,"sourceName":"demo/text_csv.csv","metadata":"rO0ABXcQAAAAAAAAAAAAAAAAAAAABnVyABNbTGphdmEubGFuZy5TdHJpbmc7rdJW5+kde0cCAAB4cAAAAAN0ABxhZXZjZWZlcm5hczdtYnAuY29ycC5lbWMuY29tdAAcYWV2Y2VmZXJuYXM3bWJwLmNvcnAuZW1jLmNvbXQAHGFldmNlZmVybmFzN21icC5jb3JwLmVtYy5jb20=","replicas":["10.207.4.23","10.207.4.23","10.207.4.23"]}]}
*/
static List *
parse_get_fragments_response(List *fragments, StringInfo rest_buf)
typedef enum pxf_fragment_object
{
struct json_object *whole = json_tokener_parse(rest_buf->data);
PXF_PARSE_START,
PXF_PARSE_INDEX,
PXF_PARSE_USERDATA,
PXF_PARSE_PROFILE,
PXF_PARSE_SOURCENAME,
PXF_PARSE_METADATA,
PXF_PARSE_REPLICAS
} pxf_fragment_object;
typedef struct FragmentState
{
JsonLexContext *lex;
pxf_fragment_object object;
List *fragments;
bool has_replicas;
int arraydepth;
} FragmentState;
static void
pxf_fragment_object_start(void *state, char *name, bool isnull)
{
FragmentState *s = (FragmentState *) state;
if ((whole == NULL) || is_error(whole))
if (s->lex->token_type == JSON_TOKEN_NUMBER)
{
elog(ERROR, "Failed to parse fragments list from PXF");
if (pg_strcasecmp(name, "index") == 0)
s->object = PXF_PARSE_INDEX;
}
struct json_object *head;
List *ret_frags = fragments;
if (!json_object_object_get_ex(whole, "PXFFragments", &head))
else if (s->lex->token_type == JSON_TOKEN_STRING || s->lex->token_type == JSON_TOKEN_NULL)
{
elog(INFO, "No Data Fragments available for the resource");
return ret_frags;
if (pg_strcasecmp(name, "userData") == 0)
s->object = PXF_PARSE_USERDATA;
else if (pg_strcasecmp(name, "sourceName") == 0)
s->object = PXF_PARSE_SOURCENAME;
else if (pg_strcasecmp(name, "metadata") == 0)
s->object = PXF_PARSE_METADATA;
else if (pg_strcasecmp(name, "profile") == 0)
s->object = PXF_PARSE_PROFILE;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unrecognized object in PXF fragment: \"%s\"",
name)));
}
else if (s->lex->token_type == JSON_TOKEN_ARRAY_START)
{
if (pg_strcasecmp(name, "PXFFragments") == 0)
{
if (s->object != PXF_PARSE_START)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("malformed PXF fragment")));
}
else if (pg_strcasecmp(name, "replicas") == 0)
{
s->object = PXF_PARSE_REPLICAS;
s->has_replicas = false;
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unrecognized array in PXF fragment: \"%s\"",
name)));
}
}
int length = json_object_array_length(head);
static void
check_and_assign(char **field, JsonTokenType type, char *token,
JsonTokenType expected_type, bool nullable)
{
if (type == JSON_TOKEN_NULL && nullable)
return;
/* obtain split information from the block */
for (int i = 0; i < length; i++)
{
struct json_object *js_fragment = json_object_array_get_idx(head, i);
FragmentData *fragment = (FragmentData *) palloc0(sizeof(FragmentData));
if (type == expected_type)
*field = pstrdup(token);
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unexpected value \"%s\" for attribute", token)));
}
/* source name */
struct json_object *block_data;
static void
pxf_fragment_scalar(void *state, char *token, JsonTokenType type)
{
FragmentState *s = (FragmentState *) state;
FragmentData *d = (FragmentData *) llast(s->fragments);
if (json_object_object_get_ex(js_fragment, "sourceName", &block_data))
fragment->source_name = pstrdup(json_object_get_string(block_data));
/* Populate the fragment depending on the type of the scalar and the current object */
switch(s->object)
{
case PXF_PARSE_USERDATA:
check_and_assign(&(d->user_data), type, token, JSON_TOKEN_STRING, true);
break;
case PXF_PARSE_METADATA:
check_and_assign(&(d->fragment_md), type, token, JSON_TOKEN_STRING, true);
break;
case PXF_PARSE_PROFILE:
check_and_assign(&(d->profile), type, token, JSON_TOKEN_STRING, true);
break;
case PXF_PARSE_SOURCENAME:
check_and_assign(&(d->source_name), type, token, JSON_TOKEN_STRING, false);
break;
case PXF_PARSE_INDEX:
check_and_assign(&(d->index), type, token, JSON_TOKEN_NUMBER, true);
break;
case PXF_PARSE_REPLICAS:
if (type == JSON_TOKEN_STRING)
s->has_replicas = true;
break;
case PXF_PARSE_START:
break;
default:
elog(ERROR, "Unexpected PXF object state: %d", s->object);
break;
}
}
/* fragment index, incremented per source name */
struct json_object *index;
static void
pxf_array_element_start(void *state, bool isnull)
{
FragmentState *s = (FragmentState *) state;
FragmentData *data;
if (json_object_object_get_ex(js_fragment, "index", &index) && index)
fragment->index = pstrdup(json_object_get_string(index));
/*
* If we are entering a nested array, an array inside the object in the
* main PXFFragments array, then we are still inside the current fragment
* and there is nothing but book keeping to do
*/
if (++s->arraydepth > 1)
return;
/* location - fragment meta data */
struct json_object *js_fragment_metadata;
/*
* Reaching here means we are entering a new fragment in the PXFFragments
* array, allocate a new fragment on the list to populate during parsing.
*/
data = palloc0(sizeof(FragmentData));
s->fragments = lappend(s->fragments, data);
s->has_replicas = false;
s->object = PXF_PARSE_START;
}
if (json_object_object_get_ex(js_fragment, "metadata", &js_fragment_metadata) && js_fragment_metadata)
fragment->fragment_md = pstrdup(json_object_get_string(js_fragment_metadata));
static void
pxf_array_element_end(void *state, bool isnull)
{
FragmentState *s = (FragmentState *) state;
/* userdata - additional user information */
struct json_object *js_user_data;
if (--s->arraydepth == 0)
{
if (!s->has_replicas)
s->fragments = list_truncate(s->fragments,
list_length(s->fragments) - 1);
}
}
if (json_object_object_get_ex(js_fragment, "userData", &js_user_data) && js_user_data)
fragment->user_data = pstrdup(json_object_get_string(js_user_data));
static List *
parse_get_fragments_response(List *fragments, StringInfo rest_buf)
{
JsonSemAction *sem;
FragmentState *state;
/* profile - recommended profile to work with fragment */
struct json_object *js_profile;
sem = palloc0(sizeof(JsonSemAction));
state = palloc0(sizeof(FragmentState));
if (json_object_object_get_ex(js_fragment, "profile", &js_profile) && js_profile)
fragment->profile = pstrdup(json_object_get_string(js_profile));
state->fragments = NIL;
state->lex = makeJsonLexContext(cstring_to_text(rest_buf->data), true);
state->object = PXF_PARSE_START;
state->arraydepth = 0;
state->has_replicas = false;
/*
* Ignore fragment if it doesn't contain any host locations, for
* example if the file is empty.
*/
struct json_object *js_replica;
sem->semstate = state;
sem->scalar = pxf_fragment_scalar;
sem->object_field_start = pxf_fragment_object_start;
sem->array_element_start = pxf_array_element_start;
sem->array_element_end = pxf_array_element_end;
if (json_object_object_get_ex(js_fragment, "replicas", &js_replica) && js_replica)
ret_frags = lappend(ret_frags, fragment);
else
free_fragment(fragment);
pg_parse_json(state->lex, sem);
}
pfree(state->lex);
return ret_frags;
return state->fragments;
}
/*
......@@ -237,16 +354,12 @@ static List *
filter_fragments_for_segment(List *list)
{
if (!list)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("internal error in pxffragment.c:filter_fragments_for_segment. Parameter list is null.")));
elog(ERROR, "Parameter list is null in filter_fragments_for_segment");
DistributedTransactionId xid = getDistributedTransactionId();
if (xid == InvalidDistributedTransactionId)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("internal error in pxffragment.c:filter_fragments_for_segment. Cannot get distributed transaction identifier.")));
elog(ERROR, "Cannot get distributed transaction identifier in filter_fragments_for_segment");
/*
* to determine which segment S should process an element at a given index
......@@ -414,7 +527,9 @@ process_request(ClientContext *client_context, char *uri)
print_http_headers(client_context->http_headers);
client_context->handle = churl_init_download(uri, client_context->http_headers);
if (client_context->handle == NULL)
elog(ERROR, "Unsuccessful connection to uri: %s", uri);
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("Unsuccessful connection to uri: \"%s\"", uri)));
memset(buffer, 0, RAW_BUF_SIZE);
resetStringInfo(&(client_context->the_rest_buf));
......
......@@ -4,10 +4,8 @@ include $(top_builddir)/src/Makefile.global
TARGETS= libchurl pxfprotocol pxfbridge pxfheaders pxfuriparser pxfutils pxffragment
LIBS += -ljson-c
include $(top_builddir)/src/backend/mock.mk
pxfheaders.t: $(MOCK_DIR)/backend/access/external/fileam_mock.o $(MOCK_DIR)/backend/catalog/pg_exttable_mock.o
pxffragment.t: $(MOCK_DIR)/backend/cdb/cdbtm_mock.o
pxffragment.t: $(MOCK_DIR)/backend/cdb/cdbtm_mock.o $(top_builddir)/src/backend/utils/adt/json.o
......@@ -155,7 +155,7 @@ test_filter_fragments_for_segment(void **state)
ErrorData *edata = CopyErrorData();
assert_true(edata->elevel == ERROR);
char *expected_message = pstrdup("internal error in pxffragment.c:filter_fragments_for_segment. Parameter list is null.");
char *expected_message = pstrdup("Parameter list is null in filter_fragments_for_segment");
assert_string_equal(edata->message, expected_message);
pfree(expected_message);
......@@ -175,7 +175,7 @@ test_filter_fragments_for_segment(void **state)
ErrorData *edata = CopyErrorData();
assert_true(edata->elevel == ERROR);
char *expected_message = pstrdup("internal error in pxffragment.c:filter_fragments_for_segment. Cannot get distributed transaction identifier.");
char *expected_message = pstrdup("Cannot get distributed transaction identifier in filter_fragments_for_segment");
assert_string_equal(edata->message, expected_message);
pfree(expected_message);
......
......@@ -291,9 +291,6 @@
/* Define to 1 if you have isinf(). */
#undef HAVE_ISINF
/* Define to 1 if you have the <json-c/json.h> header file. */
#undef HAVE_JSON_C_JSON_H
/* Define to 1 if you have the <kernel/image.h> header file. */
#undef HAVE_KERNEL_IMAGE_H
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册