提交 2e1646a9 编写于 作者: K Kuien Liu 提交者: Adam Lee

s3ext: refactor gpcheckcloud codes

Using new GPReader, and make it clean.
Signed-off-by: NKuien Liu <kliu@pivotal.io>
上级 8fc342dc
......@@ -53,7 +53,7 @@ mkgphdfs:
.PHONY:
mks3ext:
PATH=$(INSTLOC)/bin:$(PATH) $(MAKE) -B -C gps3ext USE_PGXS=1 install
PATH=$(INSTLOC)/bin:$(PATH) $(MAKE) -B -C gps3ext -f Makefile.check USE_PGXS=1 install
PATH=$(INSTLOC)/bin:$(PATH) $(MAKE) -B -C gps3ext/bin/gpcheckcloud USE_PGXS=1 install
# Only include include these files when running enterprise build
ENTERPRISE_TARGETS="mkdbgen mkgphdfs mkorafce mkplr mkpljava"
......
......@@ -35,10 +35,10 @@ tags:
-gtags -i
lint:
cppcheck -v --enable=warning src/*.cpp test/*.cpp include/*.h
cppcheck -v --enable=warning src/*.cpp bin/gpcheckcloud/*.cpp test/*.cpp include/*.h
format:
clang-format -style="{BasedOnStyle: Google, IndentWidth: 4, ColumnLimit: 100, AllowShortFunctionsOnASingleLine: None}" -i src/*.cpp test/*.cpp include/*.h
clang-format -style="{BasedOnStyle: Google, IndentWidth: 4, ColumnLimit: 100, AllowShortFunctionsOnASingleLine: None}" -i src/*.cpp bin/gpcheckcloud/*.cpp test/*.cpp include/*.h
cleanall:
@-make clean # incase PGXS not included
......@@ -46,6 +46,6 @@ cleanall:
@make -C test clean
rm -f *.o *.so *.a
rm -f *.gcov src/*.gcov src/*.gcda src/*.gcno
rm -f src/*.o src/*.d test/*.o test/*.d test/*.a lib/*.o lib/*.d
rm -f src/*.o src/*.d bin/gpcheckcloud/*.o bin/gpcheckcloud/*.d test/*.o test/*.d test/*.a lib/*.o lib/*.d
.PHONY: format lint tags test coverage cleanall
......@@ -14,7 +14,7 @@ endif
# Targets
PROGRAM = gpcheckcloud
OBJS = gpcheckcloud.o ../../lib/http_parser.o ../../lib/ini.o $(addprefix ../../,$(COMMON_OBJS))
OBJS = gpcheckcloud.o ../../lib/http_parser.o ../../lib/ini.o $(addprefix ../../src/,$(COMMON_OBJS))
# Launch
PGXS := $(shell pg_config --pgxs)
......
......@@ -6,8 +6,8 @@ int main(int argc, char *argv[]) {
int opt = 0;
bool ret = true;
s3ext_logtype = STDERR_LOG;
s3ext_loglevel = EXT_ERROR;
s3ext_logtype = STDERR_LOG;
if (argc == 1) {
print_usage(stderr);
......@@ -39,7 +39,7 @@ int main(int argc, char *argv[]) {
if (ret) {
exit(EXIT_SUCCESS);
} else {
fprintf(stderr, "Failed. Please check the arguments.\n\n");
fprintf(stderr, "Failed. Please check the arguments and configuration file.\n\n");
print_usage(stderr);
exit(EXIT_FAILURE);
}
......@@ -67,24 +67,6 @@ void print_usage(FILE *stream) {
" gpcheckcloud -h, to show this help.\n");
}
bool read_config(const char *config) {
bool ret = false;
ret = InitConfig(config, "default");
s3ext_logtype = STDERR_LOG;
return ret;
}
ListBucketResult *list_bucket(S3Reader *wrapper) {
S3Credential g_cred = {s3ext_accessid, s3ext_secret};
ListBucketResult *r = ListBucket("https", wrapper->get_region(), wrapper->get_bucket(),
wrapper->get_prefix(), g_cred);
return r;
}
uint8_t print_contents(ListBucketResult *r) {
char urlbuf[256];
uint8_t count = 0;
......@@ -107,63 +89,29 @@ uint8_t print_contents(ListBucketResult *r) {
}
bool check_config(const char *url_with_options) {
char *url_str = truncate_options(url_with_options);
if (!url_str) {
if (!url_with_options) {
return false;
}
char *config_path = get_opt_s3(url_with_options, "config");
if (!config_path) {
free(url_str);
GPReader *reader = reader_init(url_with_options);
if (!reader) {
return false;
}
curl_global_init(CURL_GLOBAL_ALL);
S3Reader *wrapper = NULL;
ListBucketResult *result = NULL;
bool ret = false;
if (!read_config(config_path)) {
goto FAIL;
}
wrapper = new S3Reader(url_str);
if (!wrapper) {
fprintf(stderr, "Failed to allocate wrapper\n");
goto FAIL;
}
if (!wrapper->ValidateURL()) {
fprintf(stderr, "Failed: URL is not valid.\n");
goto FAIL;
}
result = list_bucket(wrapper);
if (!result) {
goto FAIL;
} else {
ListBucketResult *result = reader->getKeyList();
if (result != NULL) {
if (print_contents(result)) {
printf("Yea! Your configuration works well.\n");
printf("\nYea! Your configuration works well.\n");
} else {
printf(
"Your configuration works well, however there is no file "
"\nYour configuration works well, however there is no file "
"matching your prefix.\n");
}
delete result;
}
ret = true;
FAIL:
free(url_str);
free(config_path);
reader_cleanup(&reader);
if (wrapper) {
delete wrapper;
}
return ret;
return true;
}
bool s3_download(const char *url_with_options) {
......@@ -171,57 +119,32 @@ bool s3_download(const char *url_with_options) {
return false;
}
S3Reader *wrapper = NULL;
int data_len = BUF_SIZE;
char data_buf[BUF_SIZE];
bool ret = true;
thread_setup();
char *data_buf = (char *)malloc(BUF_SIZE);
if (!data_buf) {
goto FAIL;
}
s3ext_logtype = STDERR_LOG;
wrapper = reader_init(url_with_options);
if (!wrapper) {
fprintf(stderr, "Failed to init wrapper\n");
goto FAIL;
GPReader *reader = reader_init(url_with_options);
if (!reader) {
return false;
}
s3ext_segid = 0;
s3ext_segnum = 1;
do {
data_len = BUF_SIZE;
if (!reader_transfer_data(wrapper, data_buf, data_len)) {
if (!reader_transfer_data(reader, data_buf, data_len)) {
fprintf(stderr, "Failed to read data\n");
goto FAIL;
ret = false;
break;
}
fwrite(data_buf, data_len, 1, stdout);
} while (data_len);
thread_cleanup();
if (!reader_cleanup(&wrapper)) {
fprintf(stderr, "Failed to cleanup wrapper\n");
goto FAIL;
}
free(data_buf);
return true;
reader_cleanup(&reader);
FAIL:
if (data_buf) {
free(data_buf);
}
if (wrapper) {
delete wrapper;
}
thread_cleanup();
return false;
return ret;
}
......@@ -5,9 +5,11 @@
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include "gpcommon.h"
#include "gpreader.h"
#include "s3common.h"
#include "s3conf.h"
#include "s3interface.h"
#include "s3key_reader.h"
#include "s3log.h"
......@@ -15,16 +17,10 @@
extern volatile bool QueryCancelPending;
extern S3Reader *wrapper;
void print_template();
void print_usage(FILE *stream);
bool read_config(const char *config);
ListBucketResult *list_bucket(S3Reader *wrapper);
uint8_t print_contents(ListBucketResult *r);
bool check_config(const char *url_with_options);
......
......@@ -24,6 +24,10 @@ class GPReader : public Reader {
// This should be reentrant, has no side effects when called multiple times.
virtual void close();
ListBucketResult *getKeyList() {
return bucketReader.getKeyList();
}
private:
void constructReaderParam(const string &url);
......
......@@ -35,7 +35,6 @@ class S3BucketReader : public Reader {
ListBucketResult *listBucketWithRetry(int retries);
// for test only
ListBucketResult *getKeyList() {
return keyList;
}
......
......@@ -92,12 +92,12 @@ class S3Service : public S3Interface {
string getUrl(const string& prefix, const string& schema, const string& host,
const string& bucket, const string& marker);
void parseBucketXML(ListBucketResult* result, xmlParserCtxtPtr xmlcontext, string& marker);
bool parseBucketXML(ListBucketResult* result, xmlParserCtxtPtr xmlcontext, string& marker);
xmlParserCtxtPtr getBucketXML(const string& region, const string& url, const string& prefix,
const S3Credential& cred, const string& marker);
Response getBucketResponse(const string& region, const string& url, const string& prefix,
const S3Credential& cred, const string& marker);
bool checkXMLMessage(xmlParserCtxtPtr xmlcontext);
void parseXMLMessage(xmlParserCtxtPtr xmlcontext);
HTTPHeaders composeHTTPHeaders(const string& url, const string& marker, const string& prefix,
const string& region, const S3Credential& cred);
......
......@@ -137,7 +137,6 @@ GPReader* reader_init(const char* url_with_options) {
} catch (std::exception& e) {
if (reader != NULL) {
delete reader;
return NULL;
}
S3ERROR("reader_init caught an exception: %s, aborting", e.what());
return NULL;
......
......@@ -80,6 +80,9 @@ bool InitConfig(const string& conf_path, const string section = "default") {
#ifndef S3_CHK_CFG
content = s3cfg->Get(section.c_str(), "logtype", "INTERNAL");
s3ext_logtype = getLogType(content.c_str());
content = s3cfg->Get(section.c_str(), "debug_curl", "false");
s3ext_debug_curl = to_bool(content);
#endif
s3ext_accessid = s3cfg->Get(section.c_str(), "accessid", "");
......@@ -136,9 +139,6 @@ bool InitConfig(const string& conf_path, const string section = "default") {
content = s3cfg->Get(section.c_str(), "encryption", "true");
s3ext_encryption = to_bool(content);
content = s3cfg->Get(section.c_str(), "debug_curl", "false");
s3ext_debug_curl = to_bool(content);
#ifdef S3_STANDALONE
s3ext_segid = 0;
s3ext_segnum = 1;
......
......@@ -100,27 +100,24 @@ xmlParserCtxtPtr S3Service::getXMLContext(Response response) {
// require curl 7.17 higher
// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
xmlParserCtxtPtr S3Service::getBucketXML(const string &region, const string &url,
const string &prefix, const S3Credential &cred,
const string &marker) {
Response S3Service::getBucketResponse(const string &region, const string &url, const string &prefix,
const S3Credential &cred, const string &marker) {
HTTPHeaders header = composeHTTPHeaders(url, marker, prefix, region, cred);
std::map<string, string> empty;
Response response = service->get(url, header, empty);
if (!response.isSuccess()) {
S3ERROR("Failed to GET bucket list of '%s'", url.c_str());
return NULL;
}
xmlParserCtxtPtr xmlptr = getXMLContext(response);
return xmlptr;
return service->get(url, header, empty);
}
bool S3Service::checkXMLMessage(xmlParserCtxtPtr xmlcontext) {
// parseXMLMessage must not throw exception, otherwise result is leaked.
void S3Service::parseXMLMessage(xmlParserCtxtPtr xmlcontext) {
if (xmlcontext == NULL) {
return;
}
xmlNode *rootElement = xmlDocGetRootElement(xmlcontext->myDoc);
if (rootElement == NULL) {
S3ERROR("Failed to parse returned xml of bucket list");
return false;
return;
}
xmlNodePtr curNode = rootElement->xmlChildrenNode;
......@@ -131,21 +128,25 @@ bool S3Service::checkXMLMessage(xmlParserCtxtPtr xmlcontext) {
S3ERROR("Amazon S3 returns error \"%s\"", content);
xmlFree(content);
}
return false;
return;
}
curNode = curNode->next;
}
return true;
}
void S3Service::parseBucketXML(ListBucketResult *result, xmlParserCtxtPtr xmlcontext,
// parseBucketXML must not throw exception, otherwise result is leaked.
bool S3Service::parseBucketXML(ListBucketResult *result, xmlParserCtxtPtr xmlcontext,
string &marker) {
CHECK_OR_DIE((result != NULL && xmlcontext != NULL));
if ((result == NULL) || (xmlcontext == NULL)) {
return false;
}
xmlNode *rootElement = xmlDocGetRootElement(xmlcontext->myDoc);
CHECK_OR_DIE_MSG(rootElement != NULL, "%s", "Failed to parse returned xml of bucket list");
if (rootElement == NULL) {
S3WARN("Failed to parse returned xml of bucket list");
return false;
}
xmlNodePtr cur;
bool is_truncated = false;
......@@ -233,7 +234,7 @@ void S3Service::parseBucketXML(ListBucketResult *result, xmlParserCtxtPtr xmlcon
xmlFree(key);
}
return;
return true;
}
// ListBucket list all keys in given bucket with given prefix.
......@@ -249,7 +250,7 @@ ListBucketResult *S3Service::listBucket(const string &schema, const string &regi
host << "s3-" << region << ".amazonaws.com";
S3DEBUG("Host url is %s", host.str().c_str());
// TODO: here we have memory leak.
// NOTE: here we might have memory leak.
ListBucketResult *result = new ListBucketResult();
CHECK_OR_DIE_MSG(result != NULL, "%s", "Failed to allocate bucket list result");
......@@ -258,22 +259,22 @@ ListBucketResult *S3Service::listBucket(const string &schema, const string &regi
// S3 requires query parameters specified alphabetically.
string url = this->getUrl(prefix, schema, host.str(), bucket, marker);
xmlParserCtxtPtr xmlcontext = getBucketXML(region, url, prefix, cred, marker);
if (xmlcontext == NULL) {
S3ERROR("Failed to list bucket for '%s'", url.c_str());
delete result;
return NULL;
}
xmlParserCtxtPtr xmlcontext = NULL;
XMLContextHolder holder(xmlcontext);
// parseBucketXML must not throw exception, otherwise result is leaked.
if (!checkXMLMessage(xmlcontext)) {
delete result;
return NULL;
Response response = getBucketResponse(region, url, prefix, cred, marker);
xmlcontext = getXMLContext(response);
if (response.isSuccess()) {
if (parseBucketXML(result, xmlcontext, marker)) {
continue;
}
} else {
parseXMLMessage(xmlcontext);
}
parseBucketXML(result, xmlcontext, marker);
delete result;
return NULL;
} while (!marker.empty());
return result;
......@@ -307,7 +308,7 @@ uint64_t S3Service::fetchData(uint64_t offset, char *data, uint64_t len, const s
return responseData.size();
} else if (resp.getStatus() == RESPONSE_ERROR) {
xmlParserCtxtPtr xmlptr = getXMLContext(resp);
checkXMLMessage(xmlptr);
parseXMLMessage(xmlptr);
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
return 0;
......@@ -346,10 +347,11 @@ S3CompressionType S3Service::checkCompressionType(const string &keyUrl, const st
if ((responseData[0] == 0x1f) && (responseData[1] == 0x8b)) {
return S3_COMPRESSION_GZIP;
}
} else if (resp.getStatus() == RESPONSE_ERROR) {
xmlParserCtxtPtr xmlptr = getXMLContext(resp);
CHECK_OR_DIE(checkXMLMessage(xmlptr));
} else {
if (resp.getStatus() == RESPONSE_ERROR) {
xmlParserCtxtPtr xmlptr = getXMLContext(resp);
parseXMLMessage(xmlptr);
}
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
}
......
......@@ -82,7 +82,7 @@ TEST_F(S3ServiceTest, ListBucketWithWrongBucketName) {
"hXUzV1VnFbbwNjUQsqWeFiDANkV4EVkh8Kpq5NNAi27P7XDhoA9M9Xhg0=</HostId>"
"</Error>";
vector<uint8_t> raw(xml, xml + sizeof(xml) - 1);
Response response(RESPONSE_OK, raw);
Response response(RESPONSE_ERROR, raw);
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
......@@ -199,7 +199,7 @@ TEST_F(S3ServiceTest, ListBucketWithErrorResponse) {
TEST_F(S3ServiceTest, ListBucketWithErrorReturnedXML) {
uint8_t xml[] = "whatever";
vector<uint8_t> raw(xml, xml + sizeof(xml) - 1);
Response response(RESPONSE_OK, raw);
Response response(RESPONSE_ERROR, raw);
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
......@@ -210,7 +210,7 @@ TEST_F(S3ServiceTest, ListBucketWithErrorReturnedXML) {
TEST_F(S3ServiceTest, ListBucketWithNonRootXML) {
uint8_t xml[] = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
vector<uint8_t> raw(xml, xml + sizeof(xml) - 1);
Response response(RESPONSE_OK, raw);
Response response(RESPONSE_ERROR, raw);
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册