提交 34d311e4 编写于 作者: A Adam Lee

s3ext: construct WriterParams, introduce genUniqueKeyName()

genUniqueKeyName() makes sure the file for every segment to upload has a
unique name.
Signed-off-by: NAdam Lee <ali@pivotal.io>
Signed-off-by: NHaozhou Wang <hawang@pivotal.io>
上级 a8a3f5c7
......@@ -14,8 +14,7 @@ class DecompressReader : public Reader {
void open(const ReaderParams &params);
// read() attempts to read up to count bytes into the buffer starting at
// buffer.
// read() attempts to read up to count bytes into the buffer.
// Return 0 if EOF. Throw exception if encounters errors.
uint64_t read(char *buf, uint64_t count);
......
......@@ -3,6 +3,7 @@
#include <string>
#include "reader.h"
#include "s3bucket_reader.h"
#include "s3common_reader.h"
#include "s3interface.h"
......@@ -18,8 +19,7 @@ class GPReader : public Reader {
virtual void open(const ReaderParams &params);
// read() attempts to read up to count bytes into the buffer starting at
// buffer.
// read() attempts to read up to count bytes into the buffer.
// Return 0 if EOF. Throw exception if encounters errors.
virtual uint64_t read(char *buf, uint64_t count);
......@@ -31,7 +31,7 @@ class GPReader : public Reader {
}
private:
void constructReaderParam(const string &url);
void constructReaderParams(const string &url);
protected:
S3BucketReader bucketReader;
......@@ -48,6 +48,8 @@ class GPReader : public Reader {
S3RESTfulService *restfulServicePtr;
};
void CheckEssentialConfig();
// Following 3 functions are invoked by s3_import(), need to be exception safe
GPReader *reader_init(const char *url_with_options);
bool reader_transfer_data(GPReader *reader, char *data_buf, int &data_len);
......
#ifndef INCLUDE_GPWRITER_H_
#define INCLUDE_GPWRITER_H_
#include <string>
#include <string.h>
#include "s3common.h"
#include "s3interface.h"
#include "s3restful_service.h"
#include "writer.h"
extern string s3extErrorMessage;
#include "writer.h"
class GPWriter : public Writer {
public:
GPWriter(const string &url);
virtual ~GPWriter();
virtual ~GPWriter() {
}
virtual void open(const ReaderParams &params);
virtual void open(const WriterParams &params);
// write() attempts to write up to count bytes from the buffer.
// Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters
// errors.
virtual uint64_t write(char *buf, uint64_t count);
// This should be reentrant, has no side effects when called multiple times.
virtual void close();
const string &getKeyToUpload() const {
return keyToUpload;
}
void setKeyToUpload(const string &keyToUpload) {
this->keyToUpload = keyToUpload;
}
private:
void constructWriterParam(const string &url);
void constructWriterParams(const string &url);
string constructKeyName(const string &url);
string genUniqueKeyName(const string &url);
protected:
WriterParams params;
S3RESTfulService restfulService;
S3Service s3service;
ReaderParams params;
S3Credential cred;
S3RESTfulService restfulService;
string keyToUpload;
// it links to itself by default
// but the pointer here leaves a chance to mock it in unit test
......
......@@ -10,8 +10,7 @@ class Reader {
virtual void open(const ReaderParams &params) = 0;
// read() attempts to read up to count bytes into the buffer starting at
// buffer.
// read() attempts to read up to count bytes into the buffer.
// Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters
// errors.
virtual uint64_t read(char *buf, uint64_t count) = 0;
......
......@@ -14,8 +14,7 @@ class S3CommonReader : public Reader {
virtual void open(const ReaderParams& params);
// read() attempts to read up to count bytes into the buffer starting at
// buffer.
// read() attempts to read up to count bytes into the buffer.
// Return 0 if EOF. Throw exception if encounters errors.
virtual uint64_t read(char* buf, uint64_t count);
......
......@@ -52,4 +52,3 @@ extern int32_t s3ext_low_speed_time;
// not thread safe!! call it only once.
bool InitConfig(const string &path, const string section);
void CheckEssentialConfig();
#ifndef __GP_EXT_WRITER_H__
#define __GP_EXT_WRITER_H__
#include "reader_params.h"
#include "writer_params.h"
class Writer {
public:
virtual ~Writer() {
}
virtual void open(const ReaderParams &params) = 0;
virtual void open(const WriterParams &params) = 0;
// read() attempts to read up to count bytes into the buffer starting at
// buffer.
// write() attempts to write up to count bytes from the buffer.
// Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters
// errors.
virtual uint64_t write(char *buf, uint64_t count) = 0;
......
#ifndef INCLUDE_WRITER_PARAMS_H_
#define INCLUDE_WRITER_PARAMS_H_
#include <string>
#include "s3common.h"
using std::string;
class WriterParams {
public:
WriterParams() : chunkSize(0), numOfChunks(0), segId(0), segNum(1) {
}
virtual ~WriterParams() {
}
uint64_t getChunkSize() const {
return chunkSize;
}
void setChunkSize(uint64_t chunkSize) {
this->chunkSize = chunkSize;
}
const S3Credential& getCred() const {
return cred;
}
void setCred(const S3Credential& cred) {
this->cred = cred;
}
const string& getKeyUrl() const {
return keyUrl;
}
void setKeyUrl(const string& keyUrl) {
this->keyUrl = keyUrl;
}
const string& getRegion() const {
return region;
}
void setRegion(const string& region) {
this->region = region;
}
const string& getUrlToUpload() const {
return urlToUpload;
}
void setUrlToUpload(const string& url) {
this->urlToUpload = url;
}
uint64_t getSegId() const {
return segId;
}
void setSegId(uint64_t segId) {
this->segId = segId;
}
uint64_t getSegNum() const {
return segNum;
}
void setSegNum(uint64_t segNum) {
this->segNum = segNum;
}
uint64_t getNumOfChunks() const {
return numOfChunks;
}
void setNumOfChunks(uint64_t numOfChunks) {
this->numOfChunks = numOfChunks;
}
private:
string urlToUpload; // original url to read/write.
string keyUrl; // key url in s3 bucket.
string region;
uint64_t chunkSize; // chunk size
uint64_t numOfChunks; // number of chunks(threads).
S3Credential cred;
uint64_t segId;
uint64_t segNum;
};
#endif /* INCLUDE_WRITER_PARAMS_H_ */
......@@ -64,11 +64,11 @@ int thread_cleanup(void) {
}
GPReader::GPReader(const string& url) {
constructReaderParam(url);
constructReaderParams(url);
restfulServicePtr = &restfulService;
}
void GPReader::constructReaderParam(const string& url) {
void GPReader::constructReaderParams(const string& url) {
this->params.setUrlToLoad(url);
this->params.setSegId(s3ext_segid);
this->params.setSegNum(s3ext_segnum);
......@@ -88,8 +88,7 @@ void GPReader::open(const ReaderParams& params) {
this->bucketReader.open(this->params);
}
// read() attempts to read up to count bytes into the buffer starting at
// buffer.
// read() attempts to read up to count bytes into the buffer.
// Return 0 if EOF. Throw exception if encounters errors.
uint64_t GPReader::read(char* buf, uint64_t count) {
return this->bucketReader.read(buf, count);
......@@ -149,8 +148,8 @@ GPReader* reader_init(const char* url_with_options) {
return NULL;
}
ReaderParams param;
reader->open(param);
ReaderParams params;
reader->open(params);
return reader;
} catch (std::exception& e) {
......
#include <fcntl.h>
#include <openssl/crypto.h>
#include <pthread.h>
#include <sstream>
#include <string>
#include "gpwriter.h"
#include "gpcommon.h"
#include "gpreader.h"
#include "gpwriter.h"
#include "s3common.h"
#include "s3conf.h"
#include "s3log.h"
#include "s3macros.h"
#include "s3utils.h"
GPWriter::GPWriter(const string &url) {
string file = replaceSchemaFromURL(url);
using std::string;
using std::stringstream;
constructWriterParam(file);
GPWriter::GPWriter(const string& url) {
string file = replaceSchemaFromURL(url);
constructWriterParams(file);
restfulServicePtr = &restfulService;
}
GPWriter::~GPWriter() {
}
void GPWriter::constructWriterParam(const string &url) {
this->params.setUrlToLoad(url);
void GPWriter::constructWriterParams(const string& url) {
this->params.setUrlToUpload(url);
this->params.setSegId(s3ext_segid);
this->params.setSegNum(s3ext_segnum);
this->params.setNumOfChunks(s3ext_threadnum);
......@@ -30,26 +36,63 @@ void GPWriter::constructWriterParam(const string &url) {
this->params.setCred(this->cred);
}
void GPWriter::open(const ReaderParams &params) {
void GPWriter::open(const WriterParams& params) {
this->s3service.setRESTfulService(this->restfulServicePtr);
this->setKeyToUpload(this->genUniqueKeyName(this->params.getUrlToUpload()));
}
uint64_t GPWriter::write(char *buf, uint64_t count) {
printf("GPWriter::write\n");
uint64_t GPWriter::write(char* buf, uint64_t count) {
vector<uint8_t> data;
data.insert(data.end(), buf, buf + count);
return this->s3service.uploadData(data, this->params.getUrlToLoad(), this->params.getRegion(),
return this->s3service.uploadData(data, this->params.getUrlToUpload(), this->params.getRegion(),
this->params.getCred());
}
void GPWriter::close() {
}
GPWriter *writer_init(const char *url_with_options) {
GPWriter *gpwriter = NULL;
string GPWriter::genUniqueKeyName(const string& url) {
string keyName;
do {
keyName = this->constructKeyName(url);
} while (this->s3service.checkKeyExistence(keyName, this->params.getRegion(),
this->params.getCred()));
return keyName;
}
string GPWriter::constructKeyName(const string& url) {
int randomDevice = ::open("/dev/urandom", O_RDONLY);
char randomData[32];
size_t randomDataLen = 0;
while (randomDataLen < sizeof(randomData)) {
ssize_t result =
::read(randomDevice, randomData + randomDataLen, sizeof(randomData) - randomDataLen);
if (result < 0) {
break;
}
randomDataLen += result;
}
::close(randomDevice);
char out_hash_hex[SHA256_DIGEST_STRING_LENGTH];
sha256_hex(randomData, out_hash_hex);
stringstream ss;
ss << url << s3ext_segid << out_hash_hex + SHA256_DIGEST_STRING_LENGTH - 8 - 1 << ".data";
return ss.str();
}
// invoked by s3_export(), need to be exception safe
GPWriter* writer_init(const char* url_with_options) {
GPWriter* writer = NULL;
s3extErrorMessage.clear();
try {
if (!url_with_options) {
return NULL;
......@@ -76,18 +119,18 @@ GPWriter *writer_init(const char *url_with_options) {
InitRemoteLog();
gpwriter = new GPWriter(url);
if (gpwriter == NULL) {
writer = new GPWriter(url);
if (writer == NULL) {
return NULL;
}
ReaderParams param;
gpwriter->open(param);
return gpwriter;
WriterParams params;
writer->open(params);
return writer;
} catch (std::exception &e) {
if (gpwriter != NULL) {
delete gpwriter;
} catch (std::exception& e) {
if (writer != NULL) {
delete writer;
}
S3ERROR("writer_init caught an exception: %s", e.what());
s3extErrorMessage = e.what();
......@@ -95,7 +138,8 @@ GPWriter *writer_init(const char *url_with_options) {
}
}
bool writer_transfer_data(GPWriter *writer, char *data_buf, int &data_len) {
// invoked by s3_export(), need to be exception safe
bool writer_transfer_data(GPWriter* writer, char* data_buf, int& data_len) {
try {
if (!writer || !data_buf || (data_len < 0)) {
return false;
......@@ -107,9 +151,9 @@ bool writer_transfer_data(GPWriter *writer, char *data_buf, int &data_len) {
uint64_t write_len = writer->write(data_buf, data_len);
// sure read_len <= data_len here, hence truncation will never happen
// sure write_len <= data_len here, hence truncation will never happen
data_len = (int)write_len;
} catch (std::exception &e) {
} catch (std::exception& e) {
S3ERROR("writer_transfer_data caught an exception: %s", e.what());
s3extErrorMessage = e.what();
return false;
......@@ -118,7 +162,8 @@ bool writer_transfer_data(GPWriter *writer, char *data_buf, int &data_len) {
return true;
}
bool writer_cleanup(GPWriter **writer) {
// invoked by s3_export(), need to be exception safe
bool writer_cleanup(GPWriter** writer) {
bool result = true;
try {
if (*writer) {
......@@ -128,7 +173,7 @@ bool writer_cleanup(GPWriter **writer) {
} else {
result = false;
}
} catch (std::exception &e) {
} catch (std::exception& e) {
S3ERROR("writer_cleanup caught an exception: %s", e.what());
s3extErrorMessage = e.what();
result = false;
......
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <sstream>
#include <iostream>
#include <map>
......
......@@ -22,8 +22,7 @@ void S3CommonReader::open(const ReaderParams &params) {
this->upstreamReader->open(params);
}
// read() attempts to read up to count bytes into the buffer starting at
// buffer.
// read() attempts to read up to count bytes into the buffer.
// Return 0 if EOF. Throw exception if encounters errors.
uint64_t S3CommonReader::read(char *buf, uint64_t count) {
return this->upstreamReader->read(buf, count);
......@@ -32,4 +31,4 @@ uint64_t S3CommonReader::read(char *buf, uint64_t count) {
// This should be reentrant, has no side effects when called multiple times.
void S3CommonReader::close() {
this->upstreamReader->close();
}
\ No newline at end of file
}
#include <cstdarg>
#include <cstdio>
#include <cstring>
#include <sstream>
#include <string>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
......@@ -6,11 +12,6 @@
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include <cstdarg>
#include <cstdio>
#include <cstring>
#include <sstream>
#include <string>
#include "gpcommon.h"
#include "s3conf.h"
......
#include <unistd.h>
#include <map>
#include <string>
#include <unistd.h>
#define __STDC_FORMAT_MACROS
#include <curl/curl.h>
#include <inttypes.h>
#include <string.h>
#include <map>
#include <string>
#include <string.h>
#include "gpcommon.h"
#include "s3http_headers.h"
......
......@@ -55,7 +55,7 @@ $(TEST_APP): $(TEST_OBJS) gtest_main.a
test: $(TEST_APP)
@-rm -f *.gcda test/*.gcda # workaround for XCode/Clang
@-./$(TEST_APP) --gtest_filter=$(gtest_filter)
@./$(TEST_APP) --gtest_filter=$(gtest_filter)
coverage: test
@gcov $(TEST_SRC) | grep -A 1 "src/.*.cpp"
......
#include "gpwriter.h"
#include "gpwriter.cpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "mock_classes.h"
using ::testing::AtMost;
using ::testing::AtLeast;
using ::testing::Return;
using ::testing::Invoke;
using ::testing::Throw;
using ::testing::_;
class MockGPWriter : public GPWriter {
public:
MockGPWriter(const string& urlWithOptions, S3RESTfulService* mockService)
: GPWriter(urlWithOptions) {
restfulServicePtr = mockService;
}
};
class GPWriterTest : public testing::Test {
protected:
virtual void SetUp() {
InitConfig("data/s3test.conf", "default");
}
virtual void TearDown() {
}
};
TEST_F(GPWriterTest, ConstructKeyName) {
string url = "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/normal";
MockS3RESTfulService mockRestfulService;
MockGPWriter gpwriter(url, &mockRestfulService);
EXPECT_CALL(mockRestfulService, head(_, _, _)).WillOnce(Return(404));
WriterParams params;
params.setUrlToUpload(url);
gpwriter.open(params);
// "0"+".data"'s length is 6
EXPECT_EQ(8, gpwriter.getKeyToUpload().length() - url.length() - 6);
}
TEST_F(GPWriterTest, GenerateUniqueKeyName) {
string url = "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/normal";
MockS3RESTfulService mockRestfulService;
MockGPWriter gpwriter(url, &mockRestfulService);
EXPECT_CALL(mockRestfulService, head(_, _, _)).Times(AtLeast(1)).WillRepeatedly(Return(404));
WriterParams params;
params.setUrlToUpload(url);
gpwriter.open(params);
MockGPWriter gpwriter2(url, &mockRestfulService);
EXPECT_CALL(mockRestfulService, head(gpwriter.getKeyToUpload(), _, _))
.Times(AtMost(1))
.WillOnce(Return(200));
gpwriter2.open(params);
EXPECT_NE(gpwriter.getKeyToUpload(), gpwriter2.getKeyToUpload());
}
TEST_F(GPWriterTest, ReGenerateKeyName) {
string url = "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/normal";
MockS3RESTfulService mockRestfulService;
MockGPWriter gpwriter(url, &mockRestfulService);
EXPECT_CALL(mockRestfulService, head(_, _, _)).WillOnce(Return(200)).WillOnce(Return(404));
WriterParams params;
params.setUrlToUpload(url);
gpwriter.open(params);
// expect the restfulService->head() was called twice
}
......@@ -24,6 +24,9 @@ class MockS3Interface : public S3Interface {
MOCK_METHOD3(checkCompressionType, S3CompressionType(const string& keyUrl, const string& region,
const S3Credential& cred));
MOCK_METHOD3(checkKeyExistence, bool(const string& keyUrl, const string& region,
const S3Credential& cred));
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册