提交 c962ef2c 编写于 作者: A Adam Lee

s3ext: amend comments and signatures

上级 cb84ba41
......@@ -4,7 +4,7 @@
#include <zlib.h>
#include "reader.h"
// 256K by default
// 2MB by default
extern uint64_t S3_ZIP_CHUNKSIZE;
class DecompressReader : public Reader {
......
......@@ -5,7 +5,7 @@
#include <signal.h>
#include <sys/types.h>
// GPDB's global val
// GPDB's global variable
extern volatile bool QueryCancelPending;
// TODO change to functions getgpsegmentId() and getgpsegmentCount()
......
......@@ -14,7 +14,7 @@ using std::vector;
using std::map;
enum ResponseStatus {
RESPONSE_OK, // everything is ok
RESPONSE_OK, // everything is OK
RESPONSE_FAIL, // curl failed (i.e., the status is not CURLE_OK)
RESPONSE_ERROR, // server error (server return code is not 200)
};
......
......@@ -27,10 +27,10 @@ class S3BucketReader : public Reader {
this->upstreamReader = reader;
}
void validateURL();
void validateURL(const string &url) {
void parseURL();
void parseURL(const string &url) {
this->url = url;
validateURL();
parseURL();
};
ListBucketResult *getKeyList() {
......
......@@ -24,10 +24,8 @@ enum Method { GET, PUT, POST, DELETE, HEAD };
void SignRequestV4(const string& method, HTTPHeaders* h, const string& orig_region,
const string& path, const string& query, const S3Credential& cred);
char* get_opt_s3(const char* url, const char* key);
string get_opt_s3(const string& options, const string& key);
char* truncate_options(const char* url_with_options);
string truncate_options(const string& url_with_options);
#endif // __S3_COMMON_H__
#endif // __S3_COMMON_H__
\ No newline at end of file
......@@ -6,7 +6,7 @@
class S3CommonReader : public Reader {
public:
S3CommonReader() {
S3CommonReader() : upstreamReader(NULL), s3service(NULL) {
}
virtual ~S3CommonReader() {
......
......@@ -30,6 +30,7 @@ struct BucketContent {
}
~BucketContent() {
}
string getName() const {
return this->name;
};
......
#ifndef __S3LOG__
#define __S3LOG__
#ifndef __S3_LOG_H__
#define __S3_LOG_H__
#include <cstdarg>
#include <cstdio>
......@@ -13,10 +13,10 @@ enum LOGLEVEL { EXT_FATAL, EXT_ERROR, EXT_WARNING, EXT_INFO, EXT_DEBUG };
// log type
enum LOGTYPE {
REMOTE_LOG, // log to remote udp server
LOCAL_LOG, // log to local unix dgram domain socket
INTERNAL_LOG, // use pg elog
STDERR_LOG // use stderr
REMOTE_LOG, // log to remote UDP server
LOCAL_LOG, // log to local Unix domain socket
INTERNAL_LOG, // use PostgreSQL's elog()
STDERR_LOG // use STDERR
};
void LogMessage(LOGLEVEL level, const char* fmt, ...);
......@@ -48,4 +48,4 @@ LOGLEVEL getLogLevel(const char* v);
void InitRemoteLog();
#endif // __S3LOG__
#endif // __S3_LOG_H__
\ No newline at end of file
......@@ -24,6 +24,7 @@ class UrlParser {
}
private:
// get the string of URL field
string extractField(const struct http_parser_url *u, http_parser_url_fields i);
string schema;
......
#ifndef __S3_UTILFUNCTIONS__
#define __S3_UTILFUNCTIONS__
#ifndef __S3_UTILS_H__
#define __S3_UTILS_H__
#include <stdint.h>
#include <sys/types.h>
......@@ -69,12 +69,12 @@ class Config {
ini_t* _conf;
};
bool to_bool(std::string str);
bool to_bool(string str);
std::string uri_encode(const std::string& src);
string uri_encode(const string& src);
std::string uri_decode(const std::string& src);
string uri_decode(const string& src);
void find_replace(string& str, const string& find, const string& replace);
#endif // _UTILFUNCTIONS_
#endif // __S3_UTILS_H__
#include <algorithm>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <string.h>
......@@ -36,7 +37,7 @@ void DecompressReader::setReader(Reader *reader) {
}
void DecompressReader::open(const ReaderParams &params) {
// allocate inflate state
// allocate inflate state for zlib
zstream.zalloc = Z_NULL;
zstream.zfree = Z_NULL;
zstream.opaque = Z_NULL;
......
......@@ -11,6 +11,8 @@
#include "s3macros.h"
#include "s3utils.h"
string gpReaderErrorMessage;
// Thread related functions, called only by gpreader and gpcheckcloud
#define MUTEX_TYPE pthread_mutex_t
#define MUTEX_SETUP(x) pthread_mutex_init(&(x), NULL)
......@@ -61,8 +63,6 @@ int thread_cleanup(void) {
return 1;
}
string gpReaderErrorMessage;
GPReader::GPReader(const string& url) {
constructReaderParam(url);
restfulServicePtr = &restfulService;
......
// Required by building cpp dynamic library via Makefile of GPDB.
// Required by building CPP dynamic library via Makefile of GPDB.
#define PGDLLIMPORT "C"
#include <signal.h>
......@@ -24,7 +24,6 @@
#include "s3utils.h"
/* Do the module magic dance */
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(s3_export);
PG_FUNCTION_INFO_V1(s3_import);
......
......@@ -44,7 +44,7 @@ void S3BucketReader::open(const ReaderParams &params) {
this->chunkSize = params.getChunkSize();
this->numOfChunks = params.getNumOfChunks();
this->validateURL();
this->parseURL();
CHECK_OR_DIE(this->s3interface != NULL);
......@@ -188,7 +188,7 @@ void S3BucketReader::SetBucketAndPrefix() {
this->prefix = url.substr(ibegin + 1, url.length() - ibegin - 1);
}
void S3BucketReader::validateURL() {
void S3BucketReader::parseURL() {
this->SetSchema();
this->SetRegion();
this->SetBucketAndPrefix();
......
......@@ -3,8 +3,10 @@
void S3CommonReader::open(const ReaderParams &params) {
this->keyReader.setS3interface(s3service);
S3CompressionType compressionType =
s3service->checkCompressionType(params.getKeyUrl(), params.getRegion(), params.getCred());
switch (compressionType) {
case S3_COMPRESSION_GZIP:
this->upstreamReader = &this->decompressReader;
......@@ -16,6 +18,7 @@ void S3CommonReader::open(const ReaderParams &params) {
default:
CHECK_OR_DIE_MSG(false, "%s", "unknown file type");
};
this->upstreamReader->open(params);
}
......
......@@ -50,7 +50,7 @@ int32_t s3ext_segnum = -1;
int32_t s3ext_logsock_udp = -1;
struct sockaddr_in s3ext_logserveraddr;
// not thread safe!!
// not thread safe
bool InitConfig(const string& conf_path, const string section = "default") {
if (conf_path == "") {
#ifndef S3_STANDALONE
......
......@@ -23,7 +23,7 @@ HTTPHeaders::~HTTPHeaders() {
this->FreeList();
}
bool HTTPHeaders::Add(HeaderField f, const std::string &v) {
bool HTTPHeaders::Add(HeaderField f, const string &v) {
if (v.empty()) {
return false;
} else {
......
......@@ -19,6 +19,7 @@
#include "s3interface.h"
using std::stringstream;
// use destructor ~XMLContextHolder() to do the cleanup
class XMLContextHolder {
public:
XMLContextHolder(xmlParserCtxtPtr ctx) : context(ctx) {
......@@ -88,7 +89,7 @@ HTTPHeaders S3Service::composeHTTPHeaders(const string &url, const string &marke
UrlParser p(url);
std::stringstream query;
stringstream query;
if (!marker.empty()) {
query << "marker=" << marker;
if (!prefix.empty()) {
......
......@@ -109,7 +109,7 @@ uint64_t ChunkBuffer::read(char* buf, uint64_t len) {
return lenToRead;
}
// returning -1 means error
// returning uint64_t(-1) means error
uint64_t ChunkBuffer::fill() {
pthread_mutex_lock(&this->statusMutex);
while (this->status != ReadyToFill) {
......@@ -249,6 +249,7 @@ uint64_t S3KeyReader::read(char* buf, uint64_t count) {
return readLen;
}
// reset marks before reading next key
void S3KeyReader::reset() {
this->sharedError = false;
this->curReadingChunk = 0;
......
......@@ -169,8 +169,8 @@ const char *GetUploadId(const char *host, const char *bucket, const char *obj_na
// <Key>example-object</Key>
// <UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>
// </InitiateMultipartUploadResult>
std::stringstream url;
std::stringstream path_with_query;
stringstream url;
stringstream path_with_query;
XMLInfo xml;
xml.ctxt = NULL;
......@@ -248,8 +248,8 @@ const char *GetUploadId(const char *host, const char *bucket, const char *obj_na
const char *PartPutS3Object(const char *host, const char *bucket, const char *obj_name,
const S3Credential &cred, const char *data, uint64_t data_size,
uint64_t part_number, const char *upload_id) {
std::stringstream url;
std::stringstream path_with_query;
stringstream url;
stringstream path_with_query;
XMLInfo xml;
xml.ctxt = NULL;
......@@ -353,7 +353,7 @@ const char *PartPutS3Object(const char *host, const char *bucket, const char *ob
// TODO general header content extracting func
uint64_t etag_start_pos = out.str().find("ETag: ") + 6;
std::string etag_to_end = out.str().substr(etag_start_pos);
string etag_to_end = out.str().substr(etag_start_pos);
// RFC 2616 states "HTTP/1.1 defines the sequence CR LF as the end-of-line
// marker for all protocol elements except the entity-body"
uint64_t etag_len = etag_to_end.find("\r");
......@@ -413,8 +413,8 @@ const char *PartPutS3Object(const char *host, const char *bucket, const char *ob
bool CompleteMultiPutS3(const char *host, const char *bucket, const char *obj_name,
const char *upload_id, const char **etag_array, uint64_t count,
const S3Credential &cred) {
std::stringstream url;
std::stringstream path_with_query;
stringstream url;
stringstream path_with_query;
XMLInfo xml;
xml.ctxt = NULL;
......@@ -452,7 +452,7 @@ bool CompleteMultiPutS3(const char *host, const char *bucket, const char *obj_na
// <ETag>"acbd18db4cc2f85cedef654fccc4a4d8"</ETag>
// </Part>
// </CompleteMultipartUpload>
std::stringstream body;
stringstream body;
body << "<CompleteMultipartUpload>\n";
for (uint64_t i = 0; i < count; ++i) {
......
......@@ -226,7 +226,7 @@ bool Config::Scan(const string &sec, const string &key, const char *scanfmt, voi
return ini_sget(this->_conf, sec.c_str(), key.c_str(), scanfmt, dst);
}
bool to_bool(std::string str) {
bool to_bool(string str) {
std::transform(str.begin(), str.end(), str.begin(), ::tolower);
if ((str == "yes") || (str == "true") || (str == "y") || (str == "t") || (str == "1")) {
return true;
......@@ -274,7 +274,7 @@ const char uri_mapping[256] = {
/* F */ -1, -1, -1, -1, -1, -1, -1, -1,
/* */ -1, -1, -1, -1, -1, -1, -1, -1};
// alpha, num and - _ . ~ are reserved(RFC 3986).
// alpha, numbers and - _ . ~ are reserved(RFC 3986).
const char uri_reserved[256] = {
/* 0 1 2 3 4 5 6 7 8 9 A B C D E F */
/* 0 */ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
......@@ -297,7 +297,7 @@ const char uri_reserved[256] = {
/* E */ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/* F */ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
std::string uri_encode(const std::string &src) {
string uri_encode(const string &src) {
const unsigned char *src_str = (const unsigned char *)src.c_str();
const int src_len = src.length();
......@@ -319,12 +319,12 @@ std::string uri_encode(const std::string &src) {
src_str++;
}
std::string ret_str((char *)sub_start, (char *)sub_end);
string ret_str((char *)sub_start, (char *)sub_end);
delete[] sub_start;
return ret_str;
}
std::string uri_decode(const std::string &src) {
string uri_decode(const string &src) {
const unsigned char *src_str = (const unsigned char *)src.c_str();
const int src_len = src.length();
......@@ -353,7 +353,7 @@ std::string uri_decode(const std::string &src) {
while (src_str < src_end) *sub_end++ = *src_str++;
std::string ret_str(sub_start, sub_end);
string ret_str(sub_start, sub_end);
delete[] sub_start;
return ret_str;
}
......
......@@ -237,7 +237,7 @@ TEST_F(GPReaderTest, ReadFromEmptyURL) {
MockS3RESTfulService mockRestfulService;
MockGPReader gpreader(url, &mockRestfulService);
// an exception should be throwed in validateURL()
// an exception should be throwed in parseURL()
// with message "'' is not valid"
ReaderParams params;
EXPECT_THROW(gpreader.open(params), std::runtime_error);
......@@ -249,7 +249,7 @@ TEST_F(GPReaderTest, ReadFromInvalidURL) {
MockS3RESTfulService mockRestfulService;
MockGPReader gpreader(url, &mockRestfulService);
// an exception should be throwed in validateURL()
// an exception should be throwed in parseURL()
// with message "'s3://' is not valid,"
ReaderParams params;
EXPECT_THROW(gpreader.open(params), std::runtime_error);
......
......@@ -66,70 +66,70 @@ TEST_F(S3BucketReaderTest, OpenThrowExceptionWhenS3InterfaceIsNULL) {
EXPECT_THROW(bucketReader->open(params), std::runtime_error);
}
TEST_F(S3BucketReaderTest, ValidateURL_normal) {
EXPECT_NO_THROW(this->bucketReader->validateURL(
TEST_F(S3BucketReaderTest, ParseURL_normal) {
EXPECT_NO_THROW(this->bucketReader->parseURL(
"s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/normal"));
EXPECT_EQ("us-west-2", this->bucketReader->getRegion());
EXPECT_EQ("s3test.pivotal.io", this->bucketReader->getBucket());
EXPECT_EQ("dataset1/normal", this->bucketReader->getPrefix());
}
TEST_F(S3BucketReaderTest, ValidateURL_NoPrefixAndSlash) {
TEST_F(S3BucketReaderTest, ParseURL_NoPrefixAndSlash) {
EXPECT_NO_THROW(
this->bucketReader->validateURL("s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io"));
this->bucketReader->parseURL("s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io"));
EXPECT_EQ("s3test.pivotal.io", this->bucketReader->getBucket());
EXPECT_EQ("", this->bucketReader->getPrefix());
}
TEST_F(S3BucketReaderTest, ValidateURL_NoPrefix) {
TEST_F(S3BucketReaderTest, ParseURL_NoPrefix) {
EXPECT_NO_THROW(
this->bucketReader->validateURL("s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/"));
this->bucketReader->parseURL("s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/"));
EXPECT_EQ("s3test.pivotal.io", this->bucketReader->getBucket());
EXPECT_EQ("", this->bucketReader->getPrefix());
}
TEST_F(S3BucketReaderTest, ValidateURL_default) {
TEST_F(S3BucketReaderTest, ParseURL_default) {
EXPECT_NO_THROW(
this->bucketReader->validateURL("s3://s3.amazonaws.com/s3test.pivotal.io/dataset1/normal"));
this->bucketReader->parseURL("s3://s3.amazonaws.com/s3test.pivotal.io/dataset1/normal"));
EXPECT_EQ("external-1", this->bucketReader->getRegion());
EXPECT_EQ("s3test.pivotal.io", this->bucketReader->getBucket());
EXPECT_EQ("dataset1/normal", this->bucketReader->getPrefix());
}
TEST_F(S3BucketReaderTest, ValidateURL_useast1) {
EXPECT_NO_THROW(this->bucketReader->validateURL(
TEST_F(S3BucketReaderTest, ParseURL_useast1) {
EXPECT_NO_THROW(this->bucketReader->parseURL(
"s3://s3-us-east-1.amazonaws.com/s3test.pivotal.io/dataset1/normal"));
EXPECT_EQ("external-1", this->bucketReader->getRegion());
EXPECT_EQ("s3test.pivotal.io", this->bucketReader->getBucket());
EXPECT_EQ("dataset1/normal", this->bucketReader->getPrefix());
}
TEST_F(S3BucketReaderTest, ValidateURL_eucentral1) {
EXPECT_NO_THROW(this->bucketReader->validateURL(
TEST_F(S3BucketReaderTest, ParseURL_eucentral1) {
EXPECT_NO_THROW(this->bucketReader->parseURL(
"s3://s3.eu-central-1.amazonaws.com/s3test.pivotal.io/dataset1/normal"));
EXPECT_EQ("eu-central-1", this->bucketReader->getRegion());
EXPECT_EQ("s3test.pivotal.io", this->bucketReader->getBucket());
EXPECT_EQ("dataset1/normal", this->bucketReader->getPrefix());
}
TEST_F(S3BucketReaderTest, ValidateURL_eucentral11) {
EXPECT_NO_THROW(this->bucketReader->validateURL(
TEST_F(S3BucketReaderTest, ParseURL_eucentral11) {
EXPECT_NO_THROW(this->bucketReader->parseURL(
"s3://s3-eu-central-1.amazonaws.com/s3test.pivotal.io/dataset1/normal"));
EXPECT_EQ("eu-central-1", this->bucketReader->getRegion());
EXPECT_EQ("s3test.pivotal.io", this->bucketReader->getBucket());
EXPECT_EQ("dataset1/normal", this->bucketReader->getPrefix());
}
TEST_F(S3BucketReaderTest, ValidateURL_apnortheast2) {
EXPECT_NO_THROW(this->bucketReader->validateURL(
TEST_F(S3BucketReaderTest, ParseURL_apnortheast2) {
EXPECT_NO_THROW(this->bucketReader->parseURL(
"s3://s3.ap-northeast-2.amazonaws.com/s3test.pivotal.io/dataset1/normal"));
EXPECT_EQ("ap-northeast-2", this->bucketReader->getRegion());
EXPECT_EQ("s3test.pivotal.io", this->bucketReader->getBucket());
EXPECT_EQ("dataset1/normal", this->bucketReader->getPrefix());
}
TEST_F(S3BucketReaderTest, ValidateURL_apnortheast21) {
EXPECT_NO_THROW(this->bucketReader->validateURL(
TEST_F(S3BucketReaderTest, ParseURL_apnortheast21) {
EXPECT_NO_THROW(this->bucketReader->parseURL(
"s3://s3-ap-northeast-2.amazonaws.com/s3test.pivotal.io/dataset1/normal"));
EXPECT_EQ("ap-northeast-2", this->bucketReader->getRegion());
EXPECT_EQ("s3test.pivotal.io", this->bucketReader->getBucket());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册