Commit 014f002d authored by Adam Lee

Update S3 extension code

Be compatible with the s3.amazonaws.com and s3-us-east-1.amazonaws.com endpoints
Catch any exceptions thrown by C++ functions
Set min and max values for threadnum and chunksize

Development will move to the GPDB source tree after this.
Parent 9f1516f5
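For context, below is a minimal standalone sketch of the region handling this commit introduces (the behavior exercised by the new ValidateURL unit tests): both s3.amazonaws.com and s3-us-east-1.amazonaws.com map to the "external-1" region, while any other s3-&lt;region&gt;.amazonaws.com endpoint keeps its region. parse_s3_region is an illustrative helper name, not part of the extension.

#include <iostream>
#include <string>

// Illustrative helper (not from the extension): derive the region name used
// for request signing from the host part of an S3 URL, mirroring the mapping
// this commit adds to ValidateURL().
static std::string parse_s3_region(const std::string &url) {
    const std::string domain = ".amazonaws.com";
    size_t scheme = url.find("://");
    if (scheme == std::string::npos) return "";
    size_t dash = url.find('-', scheme + 3);   // first '-' in the host, if any
    size_t dom = url.find(domain);
    if (dom == std::string::npos) return "";   // not an AWS endpoint
    if (dash == std::string::npos || dash > dom) {
        return "external-1";                   // plain s3.amazonaws.com
    }
    std::string region = url.substr(dash + 1, dom - dash - 1);
    if (region == "us-east-1") return "external-1";
    return region;
}

int main() {
    std::cout << parse_s3_region("s3://s3.amazonaws.com/bucket/key") << "\n";            // external-1
    std::cout << parse_s3_region("s3://s3-us-east-1.amazonaws.com/bucket/key") << "\n";  // external-1
    std::cout << parse_s3_region("s3://s3-us-west-2.amazonaws.com/bucket/key") << "\n";  // us-west-2
    return 0;
}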
......@@ -235,6 +235,7 @@ uint64_t ParserCallback(void *contents, uint64_t size, uint64_t nmemb,
return realsize;
}
// invoked by s3_import(), need to be exception safe
char *get_opt_s3(const char *url, const char *key) {
const char *key_f = NULL;
const char *key_tailing = NULL;
......@@ -303,6 +304,7 @@ FAIL:
}
// returned memory needs to be freed
// invoked by s3_import(), need to be exception safe
char *truncate_options(const char *url_with_options) {
const char *delimiter = " ";
char *options = strstr((char *)url_with_options, delimiter);
......
......@@ -9,9 +9,14 @@
using std::stringstream;
using std::string;
// invoked by s3_import(), need to be exception safe
S3ExtBase *CreateExtWrapper(const char *url) {
S3ExtBase *ret = new S3Reader(url);
return ret;
try {
S3ExtBase *ret = new S3Reader(url);
return ret;
} catch (...) {
return NULL;
}
}
S3ExtBase::S3ExtBase(string url) {
......@@ -41,55 +46,62 @@ S3Reader::S3Reader(string url) : S3ExtBase(url) {
this->keylist = NULL;
}
// invoked by s3_import(), need to be exception safe
bool S3Reader::Init(int segid, int segnum, int chunksize) {
// set segment id and num
this->segid = s3ext_segid;
this->segnum = s3ext_segnum;
this->contentindex = this->segid;
this->chunksize = chunksize;
// Validate url first
if (!this->ValidateURL()) {
S3ERROR("The given URL(%s) is invalid", this->url.c_str());
return false;
}
try {
// set segment id and num
this->segid = s3ext_segid;
this->segnum = s3ext_segnum;
this->contentindex = this->segid;
this->chunksize = chunksize;
// Validate url first
if (!this->ValidateURL()) {
S3ERROR("The given URL(%s) is invalid", this->url.c_str());
return false;
}
// TODO: As separated function for generating url
stringstream sstr;
sstr << "s3-" << this->region << ".amazonaws.com";
S3DEBUG("Host url is %s", sstr.str().c_str());
int initretry = 3;
while (initretry--) {
this->keylist =
ListBucket(this->schema.c_str(), sstr.str().c_str(),
this->bucket.c_str(), this->prefix.c_str(), this->cred);
if (!this->keylist) {
S3INFO("Can't get keylist from bucket %s", this->bucket.c_str());
if (initretry) {
S3INFO("Retrying");
continue;
} else {
S3ERROR("Quit initialization because ListBucket keeps failing");
return false;
// TODO: As separated function for generating url
stringstream sstr;
sstr << "s3-" << this->region << ".amazonaws.com";
S3DEBUG("Host url is %s", sstr.str().c_str());
int initretry = 3;
while (initretry--) {
this->keylist = ListBucket(this->schema.c_str(), sstr.str().c_str(),
this->bucket.c_str(),
this->prefix.c_str(), this->cred);
if (!this->keylist) {
S3INFO("Can't get keylist from bucket %s",
this->bucket.c_str());
if (initretry) {
S3INFO("Retrying");
continue;
} else {
S3ERROR(
"Quit initialization because ListBucket keeps failing");
return false;
}
}
}
if (this->keylist->contents.size() == 0) {
S3INFO("Keylist of bucket is empty");
if (initretry) {
S3INFO("Retry listing bucket");
continue;
} else {
S3ERROR("Quit initialization because keylist is empty");
return false;
if (this->keylist->contents.size() == 0) {
S3INFO("Keylist of bucket is empty");
if (initretry) {
S3INFO("Retry listing bucket");
continue;
} else {
S3ERROR("Quit initialization because keylist is empty");
return false;
}
}
break;
}
break;
S3INFO("Got %d files to download", this->keylist->contents.size());
this->getNextDownloader();
} catch (...) {
return false;
}
S3INFO("Got %d files to download", this->keylist->contents.size());
this->getNextDownloader();
// return this->filedownloader ? true : false;
return true;
......@@ -106,7 +118,13 @@ void S3Reader::getNextDownloader() {
S3DEBUG("No more files to download");
return;
}
this->filedownloader = new Downloader(this->concurrent_num);
if (this->concurrent_num > 0) {
this->filedownloader = new Downloader(this->concurrent_num);
} else {
S3ERROR("Failed to create filedownloader due to threadnum");
return;
}
if (!this->filedownloader) {
S3ERROR("Failed to create filedownloader");
......@@ -116,9 +134,8 @@ void S3Reader::getNextDownloader() {
string keyurl = this->getKeyURL(c->Key());
S3DEBUG("key: %s, size: %llu", keyurl.c_str(), c->Size());
// XXX don't use strdup()
if (!filedownloader->init(keyurl.c_str(), c->Size(),
this->chunksize, &this->cred)) {
if (!filedownloader->init(keyurl, c->Size(), this->chunksize,
&this->cred)) {
delete this->filedownloader;
this->filedownloader = NULL;
} else { // move to next file
......@@ -137,50 +154,72 @@ string S3Reader::getKeyURL(const string key) {
return sstr.str();
}
// invoked by s3_import(), need to be exception safe
bool S3Reader::TransferData(char *data, uint64_t &len) {
if (!this->filedownloader) {
S3INFO("No files to download, exit");
// not initialized?
len = 0;
return true;
}
uint64_t buflen;
RETRY:
buflen = len;
// S3DEBUG("getlen is %d", len);
bool result = filedownloader->get(data, buflen);
if (!result) { // read fail
S3ERROR("Failed to get data from filedownloader");
return false;
}
// S3DEBUG("getlen is %lld", buflen);
if (buflen == 0) {
// change to next downloader
this->getNextDownloader();
if (this->filedownloader) { // download next file
S3INFO("Time to download new file");
goto RETRY;
try {
if (!this->filedownloader) {
S3INFO("No files to download, exit");
// not initialized?
len = 0;
return true;
}
uint64_t buflen;
RETRY:
buflen = len;
// S3DEBUG("getlen is %d", len);
bool result = filedownloader->get(data, buflen);
if (!result) { // read fail
S3ERROR("Failed to get data from filedownloader");
return false;
}
// S3DEBUG("getlen is %lld", buflen);
if (buflen == 0) {
// change to next downloader
this->getNextDownloader();
if (this->filedownloader) { // download next file
S3INFO("Time to download new file");
goto RETRY;
}
}
len = buflen;
} catch (...) {
return false;
}
len = buflen;
return true;
}
// invoked by s3_import(), need to be exception safe
bool S3Reader::Destroy() {
// reset filedownloader
if (this->filedownloader) {
this->filedownloader->destroy();
delete this->filedownloader;
}
try {
// reset filedownloader
if (this->filedownloader) {
this->filedownloader->destroy();
delete this->filedownloader;
}
// Free keylist
if (this->keylist) {
delete this->keylist;
// Free keylist
if (this->keylist) {
delete this->keylist;
}
} catch (...) {
return false;
}
return true;
}
bool S3ExtBase::ValidateURL() {
// TODO http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
// As the documentation above says, some regions use domains in forms other
// than the "standard" ones, need to cover them here.
//
// Still have two to take care of, do it later since they only support
// signature v4, s3ext doesn't support v4 yet.
//
// s3.eu-central-1.amazonaws.com -> s3-eu-central-1.amazonaws.com
// s3.ap-northeast-2.amazonaws.com -> s3-ap-northeast-2.amazonaws.com
const char *awsdomain = ".amazonaws.com";
int ibegin = 0;
int iend = url.find("://");
......@@ -199,10 +238,18 @@ bool S3ExtBase::ValidateURL() {
ibegin = url.find("-");
iend = url.find(awsdomain);
if ((iend == string::npos) || (ibegin == string::npos)) {
if (iend == string::npos) {
return false;
} else if (ibegin == string::npos) {
this->region = "external-1";
} else {
this->region = url.substr(ibegin + 1, iend - ibegin - 1);
}
if (this->region.compare("us-east-1") == 0) {
this->region = "external-1";
}
this->region = url.substr(ibegin + 1, iend - ibegin - 1);
ibegin = find_Nth(url, 3, "/");
iend = find_Nth(url, 4, "/");
......
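The try/catch(...) wrappers added above keep C++ exceptions from unwinding into the C caller (s3_import() inside GPDB), which would otherwise crash the backend. A minimal sketch of that boundary pattern, under hypothetical names (do_work and extension_entry_point are not part of this code):

#include <cstdio>
#include <stdexcept>

// Hypothetical C++ helper that may throw (e.g. std::bad_alloc from new, or
// a std::runtime_error from a failed network call).
static int do_work() {
    throw std::runtime_error("simulated failure");
}

// Hypothetical C-callable entry point: convert every exception into an error
// return value instead of letting it propagate across the C boundary.
extern "C" int extension_entry_point() {
    try {
        return do_work();
    } catch (const std::exception &e) {
        std::fprintf(stderr, "caught: %s\n", e.what());
        return -1;
    } catch (...) {
        return -1;  // never let unknown exceptions cross into the C caller
    }
}

int main() { return extension_entry_point() == -1 ? 0 : 1; }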
......@@ -14,6 +14,9 @@ class S3ExtBase {
virtual bool Init(int segid, int segnum, int chunksize) = 0;
virtual bool TransferData(char* data, uint64_t& len) = 0;
virtual bool Destroy() = 0;
virtual bool ValidateURL();
string get_region() { return this->region; }
protected:
S3Credential cred;
......@@ -29,8 +32,6 @@ class S3ExtBase {
int concurrent_num;
int chunksize;
virtual bool ValidateURL();
};
class S3Reader : public S3ExtBase {
......
......@@ -7,10 +7,10 @@ class S3Reader_fake : public S3Reader {
virtual ~S3Reader_fake();
virtual bool Init(int segid, int segnum, int chunksize);
virtual bool Destroy();
virtual bool ValidateURL();
protected:
virtual string getKeyURL(const string key);
virtual bool ValidateURL();
};
S3Reader_fake::S3Reader_fake(string url) : S3Reader(url) {}
......@@ -117,8 +117,53 @@ void ExtWrapperTest(const char *url, uint64_t buffer_size, const char *md5_str,
free(buf);
}
TEST(ExtWrapper, ValidateURL_normal) {
S3ExtBase *myData;
myData = new S3Reader(
"s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/normal");
ASSERT_TRUE(myData->ValidateURL());
EXPECT_STREQ("us-west-2", myData->get_region().c_str());
delete myData;
}
TEST(ExtWrapper, ValidateURL_default) {
S3ExtBase *myData;
myData =
new S3Reader("s3://s3.amazonaws.com/s3test.pivotal.io/dataset1/normal");
ASSERT_TRUE(myData->ValidateURL());
EXPECT_STREQ("external-1", myData->get_region().c_str());
delete myData;
}
TEST(ExtWrapper, ValidateURL_useast1) {
S3ExtBase *myData;
myData = new S3Reader(
"s3://s3-us-east-1.amazonaws.com/s3test.pivotal.io/dataset1/normal");
ASSERT_TRUE(myData->ValidateURL());
EXPECT_STREQ("external-1", myData->get_region().c_str());
delete myData;
}
#ifdef AWSTEST
TEST(ExtWrapper, normal_region_default) {
ExtWrapperTest(
"https://s3.amazonaws.com/useast1.s3test.pivotal.io/small17/",
64 * 1024, "138fc555074671912125ba692c678246", 0, 1, 64 * 1024 * 1024);
}
TEST(ExtWrapper, normal_region_useast1) {
ExtWrapperTest(
"https://s3-us-east-1.amazonaws.com/useast1.s3test.pivotal.io/small17/",
64 * 1024, "138fc555074671912125ba692c678246", 0, 1, 64 * 1024 * 1024);
}
TEST(ExtWrapper, normal) {
ExtWrapperTest(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/"
......@@ -175,19 +220,19 @@ TEST(ExtWrapper, huge_1seg) {
ExtWrapperTest(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/"
"hugefile/",
64 * 1024, "b87b5d79e2bcb8dc1d0fd289fbfa5829", 0, 1, 64 * 1024 * 1024);
64 * 1024, "75baaa39f2b1544ed8af437c2cad86b7", 0, 1, 64 * 1024 * 1024);
}
TEST(ExtWrapper, normal2_3segs) {
ExtWrapperTest(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/normal/",
64 * 1024, "b87b5d79e2bcb8dc1d0fd289fbfa5829", 0, 3, 64 * 1024 * 1024);
64 * 1024, "1c1b198b246160733f7a3491bff5cd52", 0, 3, 64 * 1024 * 1024);
ExtWrapperTest(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/normal/",
64 * 1024, "b87b5d79e2bcb8dc1d0fd289fbfa5829", 1, 3, 64 * 1024 * 1024);
64 * 1024, "296856eb9739d3022b3e9d8bf3b1ea2e", 1, 3, 64 * 1024 * 1024);
ExtWrapperTest(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/normal/",
64 * 1024, "b87b5d79e2bcb8dc1d0fd289fbfa5829", 2, 3, 64 * 1024 * 1024);
64 * 1024, "00675684b6d6697571f22baaf407c6df", 2, 3, 64 * 1024 * 1024);
}
#endif // AWSTEST
......
......@@ -89,8 +89,13 @@ void LogMessage(LOGLEVEL loglevel, const char* fmt, ...) {
}
static bool loginited = false;
// invoked by s3_import(), need to be exception safe
void InitLog() {
if (!loginited) {
try {
if (loginited) {
return;
}
s3ext_logsock_local = socket(PF_UNIX, SOCK_DGRAM, 0);
if (s3ext_logsock_local < 0) {
perror("Failed to create socket while InitLog()");
......@@ -113,6 +118,8 @@ void InitLog() {
inet_aton(s3ext_logserverhost.c_str(), &s3ext_logserveraddr.sin_addr);
loginited = true;
} catch (...) {
return;
}
}
......
......@@ -58,97 +58,126 @@ int32_t s3ext_logsock_udp = -1;
Config* s3cfg = NULL;
// not thread safe!!
// invoked by s3_import(), need to be exception safe
bool InitConfig(string conf_path, string section /*not used currently*/) {
if (conf_path == "") {
try {
if (conf_path == "") {
#ifndef DEBUGS3
write_log("Config file is not specified\n");
write_log("Config file is not specified\n");
#endif
return false;
}
return false;
}
if (s3cfg) delete s3cfg;
if (s3cfg) delete s3cfg;
s3cfg = new Config(conf_path);
if (!s3cfg || !s3cfg->Handle()) {
s3cfg = new Config(conf_path);
if (!s3cfg || !s3cfg->Handle()) {
#ifndef DEBUGS3
write_log("Failed to parse config file\n");
write_log("Failed to parse config file\n");
#endif
if (s3cfg) {
delete s3cfg;
s3cfg = NULL;
if (s3cfg) {
delete s3cfg;
s3cfg = NULL;
}
return false;
}
return false;
}
Config* cfg = s3cfg;
bool ret = false;
string content;
content = cfg->Get("default", "loglevel", "INFO");
s3ext_loglevel = getLogLevel(content.c_str());
Config* cfg = s3cfg;
bool ret = false;
string content;
content = cfg->Get("default", "loglevel", "INFO");
s3ext_loglevel = getLogLevel(content.c_str());
content = cfg->Get("default", "logtype", "INTERNAL");
s3ext_logtype = getLogType(content.c_str());
content = cfg->Get("default", "logtype", "INTERNAL");
s3ext_logtype = getLogType(content.c_str());
s3ext_accessid = cfg->Get("default", "accessid", "");
s3ext_secret = cfg->Get("default", "secret", "");
s3ext_token = cfg->Get("default", "token", "");
s3ext_accessid = cfg->Get("default", "accessid", "");
s3ext_secret = cfg->Get("default", "secret", "");
s3ext_token = cfg->Get("default", "token", "");
#ifdef DEBUGS3
// s3ext_loglevel = EXT_DEBUG;
// s3ext_logtype = LOCAL_LOG;
#endif
s3ext_logpath = cfg->Get("default", "logpath", "/tmp/.s3log.sock");
s3ext_logserverhost = cfg->Get("default", "logserverhost", "127.0.0.1");
s3ext_logpath = cfg->Get("default", "logpath", "/tmp/.s3log.sock");
s3ext_logserverhost = cfg->Get("default", "logserverhost", "127.0.0.1");
ret = cfg->Scan("default", "logserverport", "%d", &s3ext_logserverport);
if (!ret) {
s3ext_logserverport = 1111;
}
ret = cfg->Scan("default", "logserverport", "%d", &s3ext_logserverport);
if (!ret) {
s3ext_logserverport = 1111;
}
ret = cfg->Scan("default", "threadnum", "%d", &s3ext_threadnum);
if (!ret) {
S3INFO("Failed to get thread number, use default value 4");
s3ext_threadnum = 4;
}
ret = cfg->Scan("default", "threadnum", "%d", &s3ext_threadnum);
if (!ret) {
S3INFO("Failed to get thread number, use default value 4");
s3ext_threadnum = 4;
}
if (s3ext_threadnum > 8) {
S3INFO("The given thread number is too big, use max value 8");
s3ext_threadnum = 8;
}
if (s3ext_threadnum < 1) {
S3INFO("The given thread number is too small, use min value 1");
s3ext_threadnum = 1;
}
ret = cfg->Scan("default", "chunksize", "%d", &s3ext_chunksize);
if (!ret) {
S3INFO("Failed to get chunksize, use default value %d",
64 * 1024 * 1024);
s3ext_chunksize = 64 * 1024 * 1024;
}
ret = cfg->Scan("default", "chunksize", "%d", &s3ext_chunksize);
if (!ret) {
S3INFO("Failed to get chunksize, use default value 64MB");
s3ext_chunksize = 64 * 1024 * 1024;
}
if (s3ext_chunksize > 128 * 1024 * 1024) {
S3INFO("The given chunksize is too large, use max value 128MB");
s3ext_chunksize = 128 * 1024 * 1024;
}
if (s3ext_chunksize < 2 * 1024 * 1024) {
S3INFO("The given chunksize is too small, use min value 2MB");
s3ext_chunksize = 2 * 1024 * 1024;
}
ret = cfg->Scan("default", "low_speed_limit", "%d", &s3ext_low_speed_limit);
if (!ret) {
S3INFO("Failed to get low_speed_limit, use default value %d bytes/s",
10240);
s3ext_low_speed_limit = 10240;
}
ret = cfg->Scan("default", "low_speed_limit", "%d",
&s3ext_low_speed_limit);
if (!ret) {
S3INFO(
"Failed to get low_speed_limit, use default value %d bytes/s",
10240);
s3ext_low_speed_limit = 10240;
}
ret = cfg->Scan("default", "low_speed_time", "%d", &s3ext_low_speed_time);
if (!ret) {
S3INFO("Failed to get low_speed_time, use default value %d seconds",
60);
s3ext_low_speed_time = 60;
}
ret =
cfg->Scan("default", "low_speed_time", "%d", &s3ext_low_speed_time);
if (!ret) {
S3INFO("Failed to get low_speed_time, use default value %d seconds",
60);
s3ext_low_speed_time = 60;
}
content = cfg->Get("default", "encryption", "true");
s3ext_encryption = to_bool(content);
content = cfg->Get("default", "encryption", "true");
s3ext_encryption = to_bool(content);
#ifdef DEBUGS3
s3ext_segid = 0;
s3ext_segnum = 1;
s3ext_segid = 0;
s3ext_segnum = 1;
#else
s3ext_segid = GpIdentity.segindex;
s3ext_segnum = GpIdentity.numsegments;
s3ext_segid = GpIdentity.segindex;
s3ext_segnum = GpIdentity.numsegments;
#endif
} catch (...) {
return false;
}
return true;
}
// invoked by s3_import(), need to be exception safe
void ClearConfig() {
if (s3cfg) {
delete s3cfg;
s3cfg = NULL;
try {
if (s3cfg) {
delete s3cfg;
s3cfg = NULL;
}
} catch (...) {
return;
}
}
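The threadnum and chunksize handling above first falls back to a default when the key is missing, then clamps the value into a fixed range (1..8 threads, 2 MB..128 MB chunks). A small sketch of that clamping, with clamp_int as an illustrative helper rather than anything from the extension:

#include <cstdio>

// Illustrative helper: clamp a configured value into [min, max], mirroring
// the bounds this commit applies to threadnum and chunksize.
static int clamp_int(int value, int min, int max) {
    if (value < min) return min;
    if (value > max) return max;
    return value;
}

int main() {
    int threadnum = clamp_int(64, 1, 8);                                  // clamped to 8
    int chunksize = clamp_int(1024, 2 * 1024 * 1024, 128 * 1024 * 1024);  // clamped to 2 MB
    std::printf("threadnum=%d chunksize=%d\n", threadnum, chunksize);
    return 0;
}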
......@@ -36,55 +36,44 @@ Datum s3_import(PG_FUNCTION_ARGS);
Datum s3_validate_urls(PG_FUNCTION_ARGS);
}
#define MUTEX_TYPE pthread_mutex_t
#define MUTEX_SETUP(x) pthread_mutex_init(&(x), NULL)
#define MUTEX_TYPE pthread_mutex_t
#define MUTEX_SETUP(x) pthread_mutex_init(&(x), NULL)
#define MUTEX_CLEANUP(x) pthread_mutex_destroy(&(x))
#define MUTEX_LOCK(x) pthread_mutex_lock(&(x))
#define MUTEX_UNLOCK(x) pthread_mutex_unlock(&(x))
#define THREAD_ID pthread_self( )
/* This array will store all of the mutexes available to OpenSSL. */
static MUTEX_TYPE *mutex_buf= NULL;
static void locking_function(int mode, int n, const char * file, int line)
{
#define MUTEX_LOCK(x) pthread_mutex_lock(&(x))
#define MUTEX_UNLOCK(x) pthread_mutex_unlock(&(x))
#define THREAD_ID pthread_self()
/* This array will store all of the mutexes available to OpenSSL. */
static MUTEX_TYPE *mutex_buf = NULL;
static void locking_function(int mode, int n, const char *file, int line) {
if (mode & CRYPTO_LOCK)
MUTEX_LOCK(mutex_buf[n]);
else
MUTEX_UNLOCK(mutex_buf[n]);
}
static unsigned long id_function(void)
{
return ((unsigned long)THREAD_ID);
}
int thread_setup(void)
{
static unsigned long id_function(void) { return ((unsigned long)THREAD_ID); }
int thread_setup(void) {
int i;
mutex_buf = (pthread_mutex_t*)palloc(CRYPTO_num_locks() * sizeof(MUTEX_TYPE));
if (!mutex_buf)
return 0;
for (i = 0; i < CRYPTO_num_locks( ); i++)
MUTEX_SETUP(mutex_buf[i]);
mutex_buf =
(pthread_mutex_t *)palloc(CRYPTO_num_locks() * sizeof(MUTEX_TYPE));
if (!mutex_buf) return 0;
for (i = 0; i < CRYPTO_num_locks(); i++) MUTEX_SETUP(mutex_buf[i]);
CRYPTO_set_id_callback(id_function);
CRYPTO_set_locking_callback(locking_function);
return 1;
}
int thread_cleanup(void)
{
int thread_cleanup(void) {
int i;
if (!mutex_buf)
return 0;
if (!mutex_buf) return 0;
CRYPTO_set_id_callback(NULL);
CRYPTO_set_locking_callback(NULL);
for (i = 0; i < CRYPTO_num_locks( ); i++)
MUTEX_CLEANUP(mutex_buf[i]);
for (i = 0; i < CRYPTO_num_locks(); i++) MUTEX_CLEANUP(mutex_buf[i]);
pfree(mutex_buf);
mutex_buf = NULL;
return 1;
......@@ -92,6 +81,7 @@ int thread_cleanup(void)
/*
* Import data into GPDB.
* invoked by GPDB, be careful with C++ exceptions.
*/
Datum s3_import(PG_FUNCTION_ARGS) {
S3ExtBase *myData;
......@@ -111,7 +101,7 @@ Datum s3_import(PG_FUNCTION_ARGS) {
if (myData) {
thread_cleanup();
if (!myData->Destroy()) {
ereport(ERROR, (0, errmsg("Cleanup S3 extention failed")));
ereport(ERROR, (0, errmsg("Failed to cleanup S3 extention")));
}
delete myData;
}
......@@ -122,6 +112,7 @@ Datum s3_import(PG_FUNCTION_ARGS) {
/* first call. do any desired init */
curl_global_init(CURL_GLOBAL_ALL);
thread_setup();
const char *p_name = "s3";
char *url_with_options = EXTPROTOCOL_GET_URL(fcinfo);
char *url = truncate_options(url_with_options);
......@@ -135,9 +126,8 @@ Datum s3_import(PG_FUNCTION_ARGS) {
bool result = InitConfig(config_path, "");
if (!result) {
ereport(ERROR,
(0, errmsg("Can't find config file %s", config_path)));
free(config_path);
ereport(ERROR, (0, errmsg("Can't find config file, please check")));
} else {
ClearConfig();
free(config_path);
......@@ -202,6 +192,7 @@ Datum s3_import(PG_FUNCTION_ARGS) {
/*
* Export data out of GPDB.
* invoked by GPDB, be careful with C++ exceptions.
*/
Datum s3_export(PG_FUNCTION_ARGS) { PG_RETURN_INT32(0); }
......