Commit 753a5326 authored by Adam Lee

s3ext: refactor s3wrapper and s3downloader codes

refactor, optimize and rename them to gpreader and s3reader.
Parent d42233e2
......@@ -16,7 +16,7 @@ endif
# Targets
MODULE_big = gps3ext
OBJS = lib/http_parser.o lib/ini.o src/gps3ext.o src/s3conf.o src/s3common.o src/s3wrapper.o src/s3downloader.o src/s3utils.o src/s3log.o src/s3url_parser.o src/s3http_headers.o src/s3thread.o
OBJS = lib/http_parser.o lib/ini.o src/gps3ext.o src/s3conf.o src/s3common.o src/gpreader.o src/s3utils.o src/s3log.o src/s3url_parser.o src/s3http_headers.o src/s3thread.o src/s3extbase.o src/s3reader.o
# Launch
PGXS := $(shell pg_config --pgxs)
......
......@@ -16,7 +16,7 @@ endif
# Targets
PROGRAM = gpcheckcloud
OBJS = src/gpcheckcloud.o src/s3conf.o src/s3downloader.o src/s3wrapper.o src/s3utils.o src/s3log.o src/s3common.o lib/http_parser.o lib/ini.o src/s3url_parser.o src/s3http_headers.o src/s3thread.o
OBJS = src/gpcheckcloud.o src/s3conf.o src/gpreader.o src/s3utils.o src/s3log.o src/s3common.o lib/http_parser.o lib/ini.o src/s3url_parser.o src/s3http_headers.o src/s3thread.o src/s3extbase.o src/s3reader.o
# Launch
PGXS := $(shell pg_config --pgxs)
......
......@@ -14,7 +14,7 @@ endif
all: test
# Google TEST
TEST_SRC_FILES = test/s3conf_test.cpp test/s3utils_test.cpp test/s3downloader_test.cpp test/s3common_test.cpp test/s3wrapper_test.cpp test/s3log_test.cpp test/s3url_parser_test.cpp test/s3http_headers_test.cpp test/s3thread_test.cpp
TEST_SRC_FILES = test/s3conf_test.cpp test/s3utils_test.cpp test/gpreader_test.cpp test/s3common_test.cpp test/s3log_test.cpp test/s3url_parser_test.cpp test/s3http_headers_test.cpp test/s3thread_test.cpp test/s3reader_test.cpp
TEST_OBJS = $(TEST_SRC_FILES:.cpp=.o)
TEST_APP = s3test
......
......@@ -7,9 +7,10 @@
#include "s3common.h"
#include "s3conf.h"
#include "s3downloader.h"
#include "s3reader.h"
#include "s3log.h"
#include "s3wrapper.h"
#include "gpreader.h"
#include "s3thread.h"
#define BUF_SIZE 64 * 1024
......
#ifndef _GPS3EXT_H_
#define _GPS3EXT_H_
#ifndef __GP_S3EXT_H_
#define __GP_S3EXT_H_
#include <string>
#include <signal.h>
......@@ -15,8 +15,8 @@ typedef struct GpId {
int4 numsegments; /* count of distinct segindexes */
int4 dbid; /* the dbid of this database */
int4 segindex; /* content indicator: -1 for entry database,
* 0, ..., n-1 for segment database *
* a primary and its mirror have the same segIndex */
* 0, ..., n-1 for segment database *
* a primary and its mirror have the same segIndex */
} GpId;
extern GpId GpIdentity;
......
#ifndef __GP_EXT_READER_H__
#define __GP_EXT_READER_H__
class Reader {
public:
virtual ~Reader() {}
virtual bool open() = 0;
// read() attempts to read up to count bytes into the buffer starting at
// buf.
virtual uint64_t read(char *buf, uint64_t count) = 0;
virtual void close() = 0;
};
#endif
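The Reader interface above only pins down the open/read/close contract. A minimal sketch of a conforming implementation follows; it is illustrative only and not part of this commit, and reporting EOF as a 0 return value is an assumption that mirrors the ret.len == 0 convention used by OffsetMgr later in this diff.
// Minimal sketch of a Reader implementation; illustrative only, not part of
// this commit. Assumes the header above is included.
#include <stdint.h>
#include <cstring>
#include <string>
class StringReader : public Reader {
   public:
    StringReader(const std::string& content) : data(content), offset(0) {}
    virtual ~StringReader() {}
    virtual bool open() {
        this->offset = 0;
        return true;
    }
    // Copy up to count bytes into buf and return the number actually copied;
    // a 0 return is used here to signal EOF (assumption, see lead-in).
    virtual uint64_t read(char* buf, uint64_t count) {
        uint64_t left = this->data.size() - this->offset;
        uint64_t n = (count < left) ? count : left;
        memcpy(buf, this->data.data() + this->offset, n);
        this->offset += n;
        return n;
    }
    virtual void close() {}
   private:
    std::string data;
    uint64_t offset;
};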
......@@ -19,7 +19,13 @@ struct S3Credential {
string secret;
};
enum Method { GET, PUT, POST, DELETE, HEAD };
enum Method {
GET,
PUT,
POST,
DELETE,
HEAD
};
bool SignRequestV4(const string& method, HTTPHeaders* h,
const string& orig_region, const string& path,
......
#ifndef __S3_EXT_WRAPPER__
#define __S3_EXT_WRAPPER__
#ifndef __S3_EXTBASE__
#define __S3_EXTBASE__
#include <string>
#include "s3downloader.h"
#include "s3uploader.h"
#include "s3common.h"
using std::string;
// Base class for reader and writer wrapper for GPDB, includes common
// data and functions for both reader and writer.
class S3ExtBase {
public:
S3ExtBase(const string& url);
virtual ~S3ExtBase();
// Init initializes segment information, chunk size and threads.
virtual bool Init(int segid, int segnum, int chunksize) = 0;
// Transfer data between endpoints including download and upload.
virtual bool TransferData(char* data, uint64_t& len) = 0;
// Destroy allocated resources during initialization.
virtual bool Destroy() = 0;
// Check whether URL is valid or not.
virtual bool ValidateURL();
string get_region() { return this->region; }
......@@ -35,34 +41,11 @@ class S3ExtBase {
int concurrent_num;
int chunksize;
};
class S3Reader : public S3ExtBase {
public:
S3Reader(const string& url);
virtual ~S3Reader();
virtual bool Init(int segid, int segnum, int chunksize);
virtual bool TransferData(char* data, uint64_t& len);
virtual bool Destroy();
protected:
virtual string getKeyURL(const string& key);
bool getNextDownloader();
// private:
unsigned int contentindex;
Downloader* filedownloader;
ListBucketResult* keylist;
private:
void SetSchema();
void SetRegion();
void SetBucketAndPrefix();
};
class S3Writer : public S3ExtBase {};
extern "C" S3ExtBase* CreateExtWrapper(const char* url);
S3Reader* reader_init(const char* url_with_options);
bool reader_transfer_data(S3Reader* reader, char* data_buf, int& data_len);
bool reader_cleanup(S3Reader** reader);
#endif
......@@ -9,7 +9,13 @@
#include "s3conf.h"
// log level
enum LOGLEVEL { EXT_FATAL, EXT_ERROR, EXT_WARNING, EXT_INFO, EXT_DEBUG };
enum LOGLEVEL {
EXT_FATAL,
EXT_ERROR,
EXT_WARNING,
EXT_INFO,
EXT_DEBUG
};
// log type
enum LOGTYPE {
......
#ifndef __S3DOWNLOADER_H__
#define __S3DOWNLOADER_H__
#ifndef __S3_READER_H__
#define __S3_READER_H__
#include <fcntl.h>
#include <pthread.h>
......@@ -37,11 +37,17 @@ typedef enum compression_type {
class OffsetMgr {
public:
OffsetMgr(uint64_t maxsize, uint64_t chunksize);
~OffsetMgr() { pthread_mutex_destroy(&this->offset_lock); };
~OffsetMgr() {
pthread_mutex_destroy(&this->offset_lock);
};
Range NextOffset(); // ret.len == 0 means EOF
void Reset(uint64_t n);
uint64_t Chunksize() { return this->chunksize; };
uint64_t Size() { return this->maxsize; };
uint64_t Chunksize() {
return this->chunksize;
};
uint64_t Size() {
return this->maxsize;
};
private:
pthread_mutex_t offset_lock;
......@@ -57,8 +63,12 @@ class BlockingBuffer {
BlockingBuffer(const string& url, OffsetMgr* o);
virtual ~BlockingBuffer();
bool Init();
bool EndOfFile() { return this->eof; };
bool Error() { return this->error; };
bool EndOfFile() {
return this->eof;
};
bool Error() {
return this->error;
};
uint64_t Read(char* buf, uint64_t len);
uint64_t Fill();
......@@ -138,7 +148,9 @@ class HTTPFetcher : public BlockingBuffer {
protected:
uint64_t fetchdata(uint64_t offset, char* data, uint64_t len);
virtual bool processheader() { return true; };
virtual bool processheader() {
return true;
};
CURL* curl;
Method method;
HTTPHeaders headers;
......@@ -149,7 +161,7 @@ class S3Fetcher : public HTTPFetcher {
public:
S3Fetcher(const string& url, const string& region, OffsetMgr* o,
const S3Credential& cred);
~S3Fetcher(){};
~S3Fetcher() {};
protected:
virtual bool processheader();
......@@ -177,8 +189,12 @@ struct BucketContent {
uint64_t size);
BucketContent();
~BucketContent();
string Key() const { return this->key; };
uint64_t Size() const { return this->size; };
string Key() const {
return this->key;
};
uint64_t Size() const {
return this->size;
};
private:
// BucketContent(const BucketContent& b) = delete;
......
......@@ -7,9 +7,15 @@ class UrlParser {
public:
UrlParser(const char* url);
~UrlParser();
const char* Schema() { return this->schema; };
const char* Host() { return this->host; };
const char* Path() { return this->path; };
const char* Schema() {
return this->schema;
};
const char* Host() {
return this->host;
};
const char* Path() {
return this->path;
};
/* data */
private:
......
#ifndef __S3_UTILFUNCTIONS__
#define __S3_UTILFUNCTIONS__
#include <cstdio>
#include <cstdlib>
#include <stdint.h>
#include <sys/types.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
......@@ -44,7 +44,7 @@ size_t find_Nth(const string& str, // where to work
class MD5Calc {
public:
MD5Calc();
~MD5Calc(){};
~MD5Calc() {};
bool Update(const char* data, int len);
const char* Get();
......@@ -58,13 +58,23 @@ class DataBuffer {
public:
DataBuffer(uint64_t size);
~DataBuffer();
void reset() { length = 0; };
void reset() {
length = 0;
};
uint64_t append(const char* buf, uint64_t len); // ret < len means full
const char* getdata() { return data; };
uint64_t len() { return this->length; };
bool full() { return maxsize == length; };
bool empty() { return 0 == length; };
const char* getdata() {
return data;
};
uint64_t len() {
return this->length;
};
bool full() {
return maxsize == length;
};
bool empty() {
return 0 == length;
};
private:
const uint64_t maxsize;
......@@ -81,7 +91,9 @@ class Config {
const string& defaultvalue);
bool Scan(const string& sec, const string& key, const char* scanfmt,
void* dst);
void* Handle() { return (void*)this->_conf; };
void* Handle() {
return (void*)this->_conf;
};
private:
ini_t* _conf;
......
......@@ -4,112 +4,75 @@
#include "gps3ext.h"
#include "s3conf.h"
#include "s3log.h"
#include "gpreader.h"
#include "s3utils.h"
#include "s3wrapper.h"
using std::string;
using std::stringstream;
// invoked by s3_import(), need to be exception safe
S3ExtBase *CreateExtWrapper(const char *url) {
try {
return new S3Reader(url);
} catch (...) {
S3ERROR("Caught an exception, aborting");
return NULL;
}
}
S3ExtBase::S3ExtBase(const string &url) {
this->url = url;
// get following from config
this->cred.secret = s3ext_secret;
this->cred.keyid = s3ext_accessid;
this->segid = s3ext_segid;
this->segnum = s3ext_segnum;
this->chunksize = s3ext_chunksize;
this->concurrent_num = s3ext_threadnum;
S3INFO("Created %d threads for downloading", s3ext_threadnum);
S3INFO("File is splited to %d each", s3ext_chunksize);
}
S3ExtBase::~S3ExtBase() {}
S3Reader::~S3Reader() {}
S3Reader::S3Reader(const string &url) : S3ExtBase(url) {
this->contentindex = -1;
this->filedownloader = NULL;
this->keylist = NULL;
}
// invoked by s3_import(), need to be exception safe
S3Reader::~S3Reader() {}
bool S3Reader::Init(int segid, int segnum, int chunksize) {
try {
// set segment id and num
this->segid = s3ext_segid;
this->segnum = s3ext_segnum;
this->contentindex = this->segid;
// set segment id and num
this->segid = s3ext_segid;
this->segnum = s3ext_segnum;
this->contentindex = this->segid;
this->chunksize = chunksize;
this->chunksize = chunksize;
// Validate url first
if (!this->ValidateURL()) {
S3ERROR("The given URL(%s) is invalid", this->url.c_str());
return false;
}
// validate url first
if (!this->ValidateURL()) {
S3ERROR("The given URL(%s) is invalid", this->url.c_str());
return false;
}
int initretry = 3;
while (initretry--) {
this->keylist = ListBucket(this->schema, this->region, this->bucket,
this->prefix, this->cred);
if (!this->keylist) {
S3INFO("Can't get keylist from bucket %s",
this->bucket.c_str());
if (initretry) {
S3INFO("Retrying");
continue;
} else {
S3ERROR(
"Quit initialization because ListBucket keeps failing");
return false;
}
int initretry = 3;
while (initretry--) {
this->keylist = ListBucket(this->schema, this->region, this->bucket,
this->prefix, this->cred);
if (!this->keylist) {
S3INFO("Can't get keylist from bucket %s", this->bucket.c_str());
if (initretry) {
S3INFO("Retrying");
continue;
} else {
S3ERROR("Quit initialization because ListBucket keeps failing");
return false;
}
}
if (this->keylist->contents.size() == 0) {
S3INFO("Keylist of bucket is empty");
if (initretry) {
S3INFO("Retry listing bucket");
delete this->keylist;
this->keylist = NULL;
continue;
} else {
S3ERROR("Quit initialization because keylist is empty");
return false;
}
if (this->keylist->contents.size() == 0) {
S3INFO("Keylist of bucket is empty");
if (initretry) {
S3INFO("Retry listing bucket");
delete this->keylist;
this->keylist = NULL;
continue;
} else {
S3ERROR("Quit initialization because keylist is empty");
return false;
}
break;
}
S3INFO("Got %d files to download", this->keylist->contents.size());
if (!this->getNextDownloader()) {
return false;
}
} catch (...) {
S3ERROR("Caught an exception, aborting");
break;
}
S3INFO("Got %d files to download", this->keylist->contents.size());
if (!this->getNextDownloader()) {
return false;
}
// return this->filedownloader ? true : false;
return true;
}
bool S3Reader::getNextDownloader() {
if (this->filedownloader) { // reset old downloader
// delete previous downloader
if (this->filedownloader) {
filedownloader->destroy();
delete this->filedownloader;
this->filedownloader = NULL;
......@@ -120,6 +83,7 @@ bool S3Reader::getNextDownloader() {
return true;
}
// construct a new downloader
if (this->concurrent_num > 0) {
this->filedownloader = new Downloader(this->concurrent_num);
} else {
......@@ -157,127 +121,50 @@ string S3Reader::getKeyURL(const string &key) {
return sstr.str();
}
// invoked by s3_import(), need to be exception safe
// Read data from downloader
bool S3Reader::TransferData(char *data, uint64_t &len) {
try {
if (!this->filedownloader) {
S3INFO("No files to download, exit");
// not initialized?
len = 0;
return true;
}
uint64_t buflen;
RETRY:
buflen = len;
// S3DEBUG("getlen is %d", len);
bool result = filedownloader->get(data, buflen);
if (!result) { // read fail
S3ERROR("Failed to get data from filedownloader");
return false;
}
// S3DEBUG("getlen is %lld", buflen);
if (buflen == 0) {
// change to next downloader
if (!this->getNextDownloader()) {
return false;
}
if (this->filedownloader) { // download next file
S3INFO("Time to download new file");
goto RETRY;
}
}
len = buflen;
} catch (...) {
S3ERROR("Caught an exception, aborting");
if (!this->filedownloader) {
S3INFO("No files to download, exit");
len = 0;
return true;
}
uint64_t buflen;
RETRY:
buflen = len;
bool result = this->filedownloader->get(data, buflen);
if (!result) {
S3ERROR("Failed to get data from filedownloader");
return false;
}
return true;
}
// invoked by s3_import(), need to be exception safe
bool S3Reader::Destroy() {
try {
if (this->filedownloader) {
this->filedownloader->destroy();
delete this->filedownloader;
this->filedownloader = NULL;
if (buflen == 0) {
if (!this->getNextDownloader()) {
return false;
}
if (this->keylist) {
delete this->keylist;
this->keylist = NULL;
if (this->filedownloader) {
S3INFO("Time to download new file");
goto RETRY;
}
} catch (...) {
S3ERROR("Caught an exception, aborting");
return false;
}
len = buflen;
return true;
}
bool S3ExtBase::ValidateURL() {
// http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
const char *awsdomain = ".amazonaws.com";
size_t ibegin = 0;
size_t iend = url.find("://");
if (iend == string::npos) { // Error
return false;
}
this->schema = url.substr(ibegin, iend);
if (this->schema == "s3") {
if (s3ext_encryption)
this->schema = "https";
else
this->schema = "http";
}
ibegin = url.find("://s3") + 4; // "3"
iend = url.find(awsdomain);
if (iend == string::npos) {
return false;
} else if (ibegin + 1 == iend) { // "s3.amazonaws.com"
this->region = "external-1";
} else {
this->region = url.substr(ibegin + 2, iend - ibegin - 2);
}
if (this->region.compare("us-east-1") == 0) {
this->region = "external-1";
}
ibegin = find_Nth(url, 3, "/");
iend = find_Nth(url, 4, "/");
if ((iend == string::npos) || (ibegin == string::npos)) {
return false;
}
this->bucket = url.substr(ibegin + 1, iend - ibegin - 1);
this->prefix = url.substr(iend + 1, url.length() - iend - 1);
return true;
}
/*
bool S3Protocol_t::Write(char *data, size_t &len) {
if (!this->fileuploader) {
// not initialized?
return false;
bool S3Reader::Destroy() {
if (this->filedownloader) {
this->filedownloader->destroy();
delete this->filedownloader;
this->filedownloader = NULL;
}
bool result = fileuploader->write(data, len);
if (!result) {
S3DEBUG("Failed to write data via fileuploader");
return false;
if (this->keylist) {
delete this->keylist;
this->keylist = NULL;
}
return true;
}
*/
// invoked by s3_import(), need to be exception safe
S3Reader *reader_init(const char *url_with_options) {
......@@ -311,7 +198,7 @@ S3Reader *reader_init(const char *url_with_options) {
InitRemoteLog();
S3Reader *reader = (S3Reader *)CreateExtWrapper(url);
S3Reader *reader = new S3Reader(url);
free(url);
......@@ -326,7 +213,8 @@ S3Reader *reader_init(const char *url_with_options) {
}
return reader;
} catch (...) {
}
catch (...) {
S3ERROR("Caught an exception, aborting");
return NULL;
}
......@@ -350,7 +238,8 @@ bool reader_transfer_data(S3Reader *reader, char *data_buf, int &data_len) {
// sure read_len <= data_len here, hence truncation will never happen
data_len = (int)read_len;
} catch (...) {
}
catch (...) {
S3ERROR("Caught an exception, aborting");
return false;
}
......@@ -374,11 +263,10 @@ bool reader_cleanup(S3Reader **reader) {
return false;
}
/*
* Cleanup function for the XML library.
*/
// Cleanup function for the XML library.
xmlCleanupParser();
} catch (...) {
}
catch (...) {
S3ERROR("Caught an exception, aborting");
return false;
}
......
#ifndef __GP_READER__
#define __GP_READER__
#include <string>
#include "s3reader.h"
#include "s3extbase.h"
using std::string;
// S3Reader implements readable external table.
class S3Reader : public S3ExtBase {
public:
S3Reader(const string& url);
virtual ~S3Reader();
virtual bool Init(int segid, int segnum, int chunksize);
virtual bool TransferData(char* data, uint64_t& len);
virtual bool Destroy();
protected:
// Get URL for a S3 object/file.
virtual string getKeyURL(const string& key);
// Get downloading handler for next file.
bool getNextDownloader();
// private:
unsigned int contentindex;
// Downloader is responsible for downloading one file using multiple
// threads.
Downloader* filedownloader;
// List of matched keys/files.
ListBucketResult* keylist;
};
// Following 3 functions are invoked by s3_import(), need to be exception safe
S3Reader* reader_init(const char* url_with_options);
bool reader_transfer_data(S3Reader* reader, char* data_buf, int& data_len);
bool reader_cleanup(S3Reader** reader);
#endif
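As context for those three entry points, here is a rough sketch of the expected call sequence. It is illustrative only: the URL and buffer size are placeholders, and treating data_len == 0 as end of data is an assumption based on S3Reader::TransferData() setting len to 0 once no downloader is left.
// Hypothetical driver for the gpreader API; not part of this commit.
#include <cstdlib>
static bool download_all(const char* url_with_options) {
    S3Reader* reader = reader_init(url_with_options);
    if (!reader) {
        return false;
    }
    const int BUF_SIZE = 64 * 1024;
    char* buf = (char*)malloc(BUF_SIZE);
    bool ok = true;
    while (true) {
        int data_len = BUF_SIZE;
        if (!reader_transfer_data(reader, buf, data_len)) {
            ok = false;  // transfer failed
            break;
        }
        if (data_len == 0) {
            break;  // assumed: no more data from any key
        }
        // consume data_len bytes from buf here
    }
    free(buf);
    // reader_cleanup() also cleans up the XML library (see gpreader.cpp)
    ok = reader_cleanup(&reader) && ok;
    return ok;
}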
......@@ -18,11 +18,12 @@
#include "utils/memutils.h"
#include "gps3ext.h"
#include "gpreader.h"
#include "s3common.h"
#include "s3conf.h"
#include "s3log.h"
#include "s3thread.h"
#include "s3utils.h"
#include "s3wrapper.h"
/* Do the module magic dance */
......@@ -82,10 +83,11 @@ Datum s3_import(PG_FUNCTION_ARGS) {
s3reader = reader_init(url_with_options);
if (!s3reader) {
ereport(ERROR, (0, errmsg("Failed to init S3 extension, segid = "
"%d, segnum = %d, please check your "
"configurations and net connection",
s3ext_segid, s3ext_segnum)));
ereport(ERROR, (0, errmsg(
"Failed to init S3 extension, segid = "
"%d, segnum = %d, please check your "
"configurations and net connection",
s3ext_segid, s3ext_segnum)));
}
check_essential_config();
......
......@@ -63,9 +63,8 @@ bool SignRequestV4(const string &method, HTTPHeaders *h,
string query_encoded = encode_query_str(query);
stringstream canonical_str;
canonical_str << method << "\n"
<< path << "\n"
<< query_encoded << "\nhost:" << h->Get(HOST)
canonical_str << method << "\n" << path << "\n" << query_encoded
<< "\nhost:" << h->Get(HOST)
<< "\nx-amz-content-sha256:" << h->Get(X_AMZ_CONTENT_SHA256)
<< "\nx-amz-date:" << h->Get(X_AMZ_DATE) << "\n\n"
<< "host;x-amz-content-sha256;x-amz-date\n"
......@@ -79,8 +78,7 @@ bool SignRequestV4(const string &method, HTTPHeaders *h,
find_replace(region, "external-1", "us-east-1");
stringstream string2sign_str;
string2sign_str << "AWS4-HMAC-SHA256\n"
<< h->Get(X_AMZ_DATE) << "\n"
string2sign_str << "AWS4-HMAC-SHA256\n" << h->Get(X_AMZ_DATE) << "\n"
<< date_str << "/" << region << "/s3/aws4_request\n"
<< canonical_hex;
......@@ -126,7 +124,7 @@ uint64_t XMLParserCallback(void *contents, uint64_t size, uint64_t nmemb,
}
// Returns the string length until the next occurrence of the given character.
static int strlen_to_next_char(const char* ptr, char ch) {
static int strlen_to_next_char(const char *ptr, char ch) {
int len = 0;
while ((*ptr != '\0') && (*ptr != ch)) {
len++;
......
......@@ -52,111 +52,104 @@ struct sockaddr_in s3ext_logserveraddr;
int32_t s3ext_logsock_udp = -1;
// not thread safe!!
// invoked by s3_import(), need to be exception safe
bool InitConfig(const string& conf_path, const string section = "default") {
try {
if (conf_path == "") {
if (conf_path == "") {
#ifndef S3_STANDALONE
write_log("Config file is not specified\n");
write_log("Config file is not specified\n");
#else
S3ERROR("Config file is not specified");
S3ERROR("Config file is not specified");
#endif
return false;
}
return false;
}
Config* s3cfg = new Config(conf_path);
if (s3cfg == NULL || !s3cfg->Handle()) {
Config* s3cfg = new Config(conf_path);
if (s3cfg == NULL || !s3cfg->Handle()) {
#ifndef S3_STANDALONE
write_log(
"Failed to parse config file \"%s\", or it doesn't exist\n",
conf_path.c_str());
write_log("Failed to parse config file \"%s\", or it doesn't exist\n",
conf_path.c_str());
#else
S3ERROR("Failed to parse config file \"%s\", or it doesn't exist",
conf_path.c_str());
S3ERROR("Failed to parse config file \"%s\", or it doesn't exist",
conf_path.c_str());
#endif
delete s3cfg;
return false;
}
delete s3cfg;
return false;
}
string content = s3cfg->Get(section.c_str(), "loglevel", "WARNING");
s3ext_loglevel = getLogLevel(content.c_str());
string content = s3cfg->Get(section.c_str(), "loglevel", "WARNING");
s3ext_loglevel = getLogLevel(content.c_str());
#ifndef S3_CHK_CFG
content = s3cfg->Get(section.c_str(), "logtype", "INTERNAL");
s3ext_logtype = getLogType(content.c_str());
content = s3cfg->Get(section.c_str(), "logtype", "INTERNAL");
s3ext_logtype = getLogType(content.c_str());
#endif
s3ext_accessid = s3cfg->Get(section.c_str(), "accessid", "");
s3ext_secret = s3cfg->Get(section.c_str(), "secret", "");
s3ext_token = s3cfg->Get(section.c_str(), "token", "");
s3ext_logserverhost =
s3cfg->Get(section.c_str(), "logserverhost", "127.0.0.1");
bool ret = s3cfg->Scan(section.c_str(), "logserverport", "%d",
&s3ext_logserverport);
if (!ret) {
s3ext_logserverport = 1111;
}
ret = s3cfg->Scan(section.c_str(), "threadnum", "%d", &s3ext_threadnum);
if (!ret) {
S3INFO("The thread number is set to default value 4");
s3ext_threadnum = 4;
}
if (s3ext_threadnum > 8) {
S3INFO("The given thread number is too big, use max value 8");
s3ext_threadnum = 8;
}
if (s3ext_threadnum < 1) {
S3INFO("The given thread number is too small, use min value 1");
s3ext_threadnum = 1;
}
ret = s3cfg->Scan(section.c_str(), "chunksize", "%d", &s3ext_chunksize);
if (!ret) {
S3INFO("The chunksize is set to default value 64MB");
s3ext_chunksize = 64 * 1024 * 1024;
}
if (s3ext_chunksize > 128 * 1024 * 1024) {
S3INFO("The given chunksize is too large, use max value 128MB");
s3ext_chunksize = 128 * 1024 * 1024;
}
if (s3ext_chunksize < 2 * 1024 * 1024) {
S3INFO("The given chunksize is too small, use min value 2MB");
s3ext_chunksize = 2 * 1024 * 1024;
}
ret = s3cfg->Scan(section.c_str(), "low_speed_limit", "%d",
&s3ext_low_speed_limit);
if (!ret) {
S3INFO("The low_speed_limit is set to default value %d bytes/s",
10240);
s3ext_low_speed_limit = 10240;
}
ret = s3cfg->Scan(section.c_str(), "low_speed_time", "%d",
&s3ext_low_speed_time);
if (!ret) {
S3INFO("The low_speed_time is set to default value %d seconds", 60);
s3ext_low_speed_time = 60;
}
content = s3cfg->Get(section.c_str(), "encryption", "true");
s3ext_encryption = to_bool(content);
s3ext_accessid = s3cfg->Get(section.c_str(), "accessid", "");
s3ext_secret = s3cfg->Get(section.c_str(), "secret", "");
s3ext_token = s3cfg->Get(section.c_str(), "token", "");
s3ext_logserverhost =
s3cfg->Get(section.c_str(), "logserverhost", "127.0.0.1");
bool ret = s3cfg->Scan(section.c_str(), "logserverport", "%d",
&s3ext_logserverport);
if (!ret) {
s3ext_logserverport = 1111;
}
ret = s3cfg->Scan(section.c_str(), "threadnum", "%d", &s3ext_threadnum);
if (!ret) {
S3INFO("The thread number is set to default value 4");
s3ext_threadnum = 4;
}
if (s3ext_threadnum > 8) {
S3INFO("The given thread number is too big, use max value 8");
s3ext_threadnum = 8;
}
if (s3ext_threadnum < 1) {
S3INFO("The given thread number is too small, use min value 1");
s3ext_threadnum = 1;
}
ret = s3cfg->Scan(section.c_str(), "chunksize", "%d", &s3ext_chunksize);
if (!ret) {
S3INFO("The chunksize is set to default value 64MB");
s3ext_chunksize = 64 * 1024 * 1024;
}
if (s3ext_chunksize > 128 * 1024 * 1024) {
S3INFO("The given chunksize is too large, use max value 128MB");
s3ext_chunksize = 128 * 1024 * 1024;
}
if (s3ext_chunksize < 2 * 1024 * 1024) {
S3INFO("The given chunksize is too small, use min value 2MB");
s3ext_chunksize = 2 * 1024 * 1024;
}
ret = s3cfg->Scan(section.c_str(), "low_speed_limit", "%d",
&s3ext_low_speed_limit);
if (!ret) {
S3INFO("The low_speed_limit is set to default value %d bytes/s", 10240);
s3ext_low_speed_limit = 10240;
}
ret = s3cfg->Scan(section.c_str(), "low_speed_time", "%d",
&s3ext_low_speed_time);
if (!ret) {
S3INFO("The low_speed_time is set to default value %d seconds", 60);
s3ext_low_speed_time = 60;
}
content = s3cfg->Get(section.c_str(), "encryption", "true");
s3ext_encryption = to_bool(content);
#ifdef S3_STANDALONE
s3ext_segid = 0;
s3ext_segnum = 1;
s3ext_segid = 0;
s3ext_segnum = 1;
#else
s3ext_segid = GpIdentity.segindex;
s3ext_segnum = GpIdentity.numsegments;
s3ext_segid = GpIdentity.segindex;
s3ext_segnum = GpIdentity.numsegments;
#endif
delete s3cfg;
} catch (...) {
return false;
}
delete s3cfg;
return true;
}
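For reference, a configuration sketch covering the keys read by InitConfig() above. The values are placeholders, the defaults shown in the code apply when a key is omitted, and mapping the "default" section name to a [default] header is an assumption about the ini layout.
[default]
accessid = YOUR_ACCESS_KEY_ID
secret = YOUR_SECRET_ACCESS_KEY
token =
loglevel = WARNING
logtype = INTERNAL
logserverhost = 127.0.0.1
logserverport = 1111
threadnum = 4
chunksize = 67108864
low_speed_limit = 10240
low_speed_time = 60
encryption = true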
#include <sstream>
#include <string>
#include "gps3ext.h"
#include "s3conf.h"
#include "s3extbase.h"
#include "s3log.h"
#include "s3utils.h"
using std::string;
using std::stringstream;
S3ExtBase::S3ExtBase(const string &url) {
this->url = url;
// TODO: move into config
this->cred.secret = s3ext_secret;
this->cred.keyid = s3ext_accessid;
this->segid = s3ext_segid;
this->segnum = s3ext_segnum;
this->chunksize = s3ext_chunksize;
this->concurrent_num = s3ext_threadnum;
S3INFO("Created %d threads for downloading", s3ext_threadnum);
S3INFO("File is splited to %d each", s3ext_chunksize);
}
S3ExtBase::~S3ExtBase() {}
// Set schema to 'https' or 'http'
void S3ExtBase::SetSchema() {
size_t iend = this->url.find("://");
if (iend == string::npos) {
return;
}
this->schema = this->url.substr(0, iend);
if (this->schema == "s3") {
this->schema = s3ext_encryption ? "https" : "http";
}
}
// Set AWS region, use 'external-1' if it is 'us-east-1' or not present
// http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
void S3ExtBase::SetRegion() {
size_t ibegin =
this->url.find("://s3") +
strlen("://s3"); // index of character('.' or '-') after "3"
size_t iend = this->url.find(".amazonaws.com");
if (iend == string::npos) {
return;
} else if (ibegin == iend) { // "s3.amazonaws.com"
this->region = "external-1";
} else {
// ibegin + 1 is the character after "s3." or "s3-"
// for case: s3-us-west-2.amazonaws.com
this->region = this->url.substr(ibegin + 1, iend - (ibegin + 1));
}
if (this->region.compare("us-east-1") == 0) {
this->region = "external-1";
}
}
void S3ExtBase::SetBucketAndPrefix() {
size_t ibegin = find_Nth(this->url, 3, "/");
size_t iend = find_Nth(this->url, 4, "/");
if ((iend == string::npos) || (ibegin == string::npos)) {
return;
}
this->bucket = this->url.substr(ibegin + 1, iend - ibegin - 1);
this->prefix = this->url.substr(iend + 1, this->url.length() - iend - 1);
}
bool S3ExtBase::ValidateURL() {
this->SetSchema();
this->SetRegion();
this->SetBucketAndPrefix();
return !(this->schema.empty() || this->region.empty() ||
this->bucket.empty());
}
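To make SetSchema/SetRegion/SetBucketAndPrefix concrete, the sample URL used by the unit tests below decomposes as follows; the https value assumes s3ext_encryption is true (with encryption off the schema would be http).
// Illustration only: expected fields after ValidateURL() on the test URL.
//   url    = "s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/normal"
//   schema = "https"              ("s3" plus s3ext_encryption == true)
//   region = "us-west-2"
//   bucket = "s3test.pivotal.io"
//   prefix = "dataset1/normal"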
......@@ -46,9 +46,7 @@ void HTTPHeaders::CreateList() {
this->header_list = headers;
}
struct curl_slist *HTTPHeaders::GetList() {
return this->header_list;
}
struct curl_slist *HTTPHeaders::GetList() { return this->header_list; }
void HTTPHeaders::FreeList() {
if (this->header_list) {
......
......@@ -62,27 +62,22 @@ void LogMessage(LOGLEVEL loglevel, const char* fmt, ...) {
static bool loginited = false;
// invoked by s3_import(), need to be exception safe
void InitRemoteLog() {
try {
if (loginited) {
return;
}
if (loginited) {
return;
}
s3ext_logsock_udp = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (s3ext_logsock_udp < 0) {
perror("Failed to create socket while InitRemoteLog()");
}
s3ext_logsock_udp = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (s3ext_logsock_udp < 0) {
perror("Failed to create socket while InitRemoteLog()");
}
memset(&s3ext_logserveraddr, 0, sizeof(struct sockaddr_in));
s3ext_logserveraddr.sin_family = AF_INET;
s3ext_logserveraddr.sin_port = htons(s3ext_logserverport);
inet_aton(s3ext_logserverhost.c_str(), &s3ext_logserveraddr.sin_addr);
memset(&s3ext_logserveraddr, 0, sizeof(struct sockaddr_in));
s3ext_logserveraddr.sin_family = AF_INET;
s3ext_logserveraddr.sin_port = htons(s3ext_logserverport);
inet_aton(s3ext_logserverhost.c_str(), &s3ext_logserveraddr.sin_addr);
loginited = true;
} catch (...) {
return;
}
loginited = true;
}
LOGTYPE getLogType(const char* v) {
......
......@@ -12,7 +12,7 @@
#include <zlib.h>
#include "gps3ext.h"
#include "s3downloader.h"
#include "s3reader.h"
#include "s3http_headers.h"
#include "s3log.h"
#include "s3url_parser.h"
......@@ -144,7 +144,7 @@ uint64_t BlockingBuffer::Fill() {
if (leftlen != 0) {
readlen = this->fetchdata(offset, this->bufferdata + this->realsize,
leftlen);
if (readlen == (uint64_t)-1) {
if (readlen == (uint64_t) - 1) {
S3DEBUG("Failed to fetch data from libcurl");
} else {
S3DEBUG("Got %llu bytes from libcurl", readlen);
......@@ -157,7 +157,7 @@ uint64_t BlockingBuffer::Fill() {
this->eof = true;
S3DEBUG("Reached the end of file");
break;
} else if (readlen == (uint64_t)-1) { // Error
} else if (readlen == (uint64_t) - 1) { // Error
this->error = true;
S3ERROR("Failed to download file");
break;
......@@ -171,7 +171,7 @@ uint64_t BlockingBuffer::Fill() {
pthread_cond_signal(&this->stat_cond);
pthread_mutex_unlock(&this->stat_mutex);
return (readlen == (uint64_t)-1) ? -1 : this->realsize;
return (readlen == (uint64_t) - 1) ? -1 : this->realsize;
}
BlockingBuffer *BlockingBuffer::CreateBuffer(const string &url,
......@@ -200,7 +200,7 @@ void *DownloadThreadfunc(void *data) {
}
filled_size = buffer->Fill();
if (filled_size == (uint64_t)-1) {
if (filled_size == (uint64_t) - 1) {
S3DEBUG("Failed to fill downloading buffer");
} else {
S3DEBUG("Size of filled data is %llu", filled_size);
......@@ -208,7 +208,7 @@ void *DownloadThreadfunc(void *data) {
if (buffer->EndOfFile()) break;
if (filled_size == (uint64_t)-1) { // Error
if (filled_size == (uint64_t) - 1) { // Error
if (buffer->Error()) {
break;
} else {
......
#include <pthread.h>
#include <openssl/crypto.h>
#include <pthread.h>
#define MUTEX_TYPE pthread_mutex_t
#define MUTEX_SETUP(x) pthread_mutex_init(&(x), NULL)
......
......@@ -9,7 +9,7 @@
#include <sys/stat.h>
#include "s3common.h"
#include "s3downloader.h"
#include "s3reader.h"
#include "s3http_headers.h"
#include "s3uploader.h"
#include "s3url_parser.h"
......
#include "s3wrapper.cpp"
#include "gpreader.cpp"
#include "gtest/gtest.h"
#include "s3extbase.cpp"
TEST(ExtWrapper, ValidateURL_normal) {
TEST(ExtBase, ValidateURL_normal) {
S3ExtBase *myData;
myData = new S3Reader(
"s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/normal");
......@@ -12,7 +13,7 @@ TEST(ExtWrapper, ValidateURL_normal) {
delete myData;
}
TEST(ExtWrapper, ValidateURL_default) {
TEST(ExtBase, ValidateURL_default) {
S3ExtBase *myData;
myData =
new S3Reader("s3://s3.amazonaws.com/s3test.pivotal.io/dataset1/normal");
......@@ -23,7 +24,7 @@ TEST(ExtWrapper, ValidateURL_default) {
delete myData;
}
TEST(ExtWrapper, ValidateURL_useast1) {
TEST(ExtBase, ValidateURL_useast1) {
S3ExtBase *myData;
myData = new S3Reader(
"s3://s3-us-east-1.amazonaws.com/s3test.pivotal.io/dataset1/normal");
......@@ -34,7 +35,7 @@ TEST(ExtWrapper, ValidateURL_useast1) {
delete myData;
}
TEST(ExtWrapper, ValidateURL_eucentral1) {
TEST(ExtBase, ValidateURL_eucentral1) {
S3ExtBase *myData;
myData = new S3Reader(
"s3://s3.eu-central-1.amazonaws.com/s3test.pivotal.io/dataset1/normal");
......@@ -45,7 +46,7 @@ TEST(ExtWrapper, ValidateURL_eucentral1) {
delete myData;
}
TEST(ExtWrapper, ValidateURL_eucentral11) {
TEST(ExtBase, ValidateURL_eucentral11) {
S3ExtBase *myData;
myData = new S3Reader(
"s3://s3-eu-central-1.amazonaws.com/s3test.pivotal.io/dataset1/normal");
......@@ -56,7 +57,7 @@ TEST(ExtWrapper, ValidateURL_eucentral11) {
delete myData;
}
TEST(ExtWrapper, ValidateURL_apnortheast2) {
TEST(ExtBase, ValidateURL_apnortheast2) {
S3ExtBase *myData;
myData = new S3Reader(
"s3://s3.ap-northeast-2.amazonaws.com/s3test.pivotal.io/dataset1/"
......@@ -68,7 +69,7 @@ TEST(ExtWrapper, ValidateURL_apnortheast2) {
delete myData;
}
TEST(ExtWrapper, ValidateURL_apnortheast21) {
TEST(ExtBase, ValidateURL_apnortheast21) {
S3ExtBase *myData;
myData = new S3Reader(
"s3://s3-ap-northeast-2.amazonaws.com/s3test.pivotal.io/dataset1/"
......
......@@ -86,53 +86,53 @@ TEST(Common, UrlOptions) {
option = NULL;
}
EXPECT_STREQ(
"secret_test",
option = get_opt_s3("s3://neverland.amazonaws.com secret=secret_test "
"blah=whatever accessid=\".\\!@#$%^&*()DFGHJK\" "
"chunksize=3456789 KingOfTheWorld=sanpang",
"secret"));
EXPECT_STREQ("secret_test",
option = get_opt_s3(
"s3://neverland.amazonaws.com secret=secret_test "
"blah=whatever accessid=\".\\!@#$%^&*()DFGHJK\" "
"chunksize=3456789 KingOfTheWorld=sanpang",
"secret"));
if (option) {
free(option);
option = NULL;
}
EXPECT_STREQ(
"secret_test",
option = get_opt_s3("s3://neverland.amazonaws.com secret=secret_test "
"blah= accessid=\".\\!@#$%^&*()DFGHJK\" "
"chunksize=3456789 KingOfTheWorld=sanpang",
"secret"));
EXPECT_STREQ("secret_test",
option = get_opt_s3(
"s3://neverland.amazonaws.com secret=secret_test "
"blah= accessid=\".\\!@#$%^&*()DFGHJK\" "
"chunksize=3456789 KingOfTheWorld=sanpang",
"secret"));
if (option) {
free(option);
option = NULL;
}
EXPECT_STREQ(
"3456789",
option = get_opt_s3("s3://neverland.amazonaws.com secret=secret_test "
"chunksize=3456789 KingOfTheWorld=sanpang ",
"chunksize"));
EXPECT_STREQ("3456789",
option = get_opt_s3(
"s3://neverland.amazonaws.com secret=secret_test "
"chunksize=3456789 KingOfTheWorld=sanpang ",
"chunksize"));
if (option) {
free(option);
option = NULL;
}
EXPECT_STREQ(
"3456789",
option = get_opt_s3("s3://neverland.amazonaws.com secret=secret_test "
"chunksize=3456789 KingOfTheWorld=sanpang ",
"chunksize"));
EXPECT_STREQ("3456789",
option = get_opt_s3(
"s3://neverland.amazonaws.com secret=secret_test "
"chunksize=3456789 KingOfTheWorld=sanpang ",
"chunksize"));
if (option) {
free(option);
option = NULL;
}
EXPECT_STREQ(
"=sanpang",
option = get_opt_s3("s3://neverland.amazonaws.com secret=secret_test "
"chunksize=3456789 KingOfTheWorld==sanpang ",
"KingOfTheWorld"));
EXPECT_STREQ("=sanpang",
option = get_opt_s3(
"s3://neverland.amazonaws.com secret=secret_test "
"chunksize=3456789 KingOfTheWorld==sanpang ",
"KingOfTheWorld"));
if (option) {
free(option);
option = NULL;
......@@ -145,27 +145,29 @@ TEST(Common, UrlOptions) {
EXPECT_THROW(get_opt_s3("s3://neverland.amazonaws.com", "secret"),
std::runtime_error);
EXPECT_THROW(get_opt_s3("s3://neverland.amazonaws.com secret=secret_test "
"blah=whatever accessid= chunksize=3456789 "
"KingOfTheWorld=sanpang",
"accessid"),
EXPECT_THROW(get_opt_s3(
"s3://neverland.amazonaws.com secret=secret_test "
"blah=whatever accessid= chunksize=3456789 "
"KingOfTheWorld=sanpang",
"accessid"),
std::runtime_error);
EXPECT_THROW(get_opt_s3(
"s3://neverland.amazonaws.com secret=secret_test "
"blah=whatever chunksize=3456789 KingOfTheWorld=sanpang",
""),
std::runtime_error);
EXPECT_THROW(
get_opt_s3("s3://neverland.amazonaws.com secret=secret_test "
"blah=whatever chunksize=3456789 KingOfTheWorld=sanpang",
""),
std::runtime_error);
EXPECT_THROW(
get_opt_s3("s3://neverland.amazonaws.com secret=secret_test "
"blah=whatever chunksize=3456789 KingOfTheWorld=sanpang",
NULL),
std::runtime_error);
EXPECT_THROW(get_opt_s3("s3://neverland.amazonaws.com secret=secret_test "
"chunksize=3456789 KingOfTheWorld=sanpang ",
"chunk size"),
EXPECT_THROW(get_opt_s3(
"s3://neverland.amazonaws.com secret=secret_test "
"blah=whatever chunksize=3456789 KingOfTheWorld=sanpang",
NULL),
std::runtime_error);
EXPECT_THROW(get_opt_s3(
"s3://neverland.amazonaws.com secret=secret_test "
"chunksize=3456789 KingOfTheWorld=sanpang ",
"chunk size"),
std::runtime_error);
}
......
#include "s3downloader.cpp"
#include "s3reader.cpp"
#include "gtest/gtest.h"
volatile bool QueryCancelPending = false;
......