Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Greenplum
Gpdb
提交
f58f8093
G
Gpdb
项目概览
Greenplum
/
Gpdb
通知
7
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
G
Gpdb
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
f58f8093
编写于
3月 02, 2016
作者:
A
Adam Lee
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #459 from adam8157/update_s3_v2
Update S3 extention codes v2
上级
9f1516f5
4becc37f
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
351 addition
and
198 deletion
+351
-198
gpAux/extensions/gps3ext/README.md
gpAux/extensions/gps3ext/README.md
+15
-0
gpAux/extensions/gps3ext/README.org
gpAux/extensions/gps3ext/README.org
+0
-11
gpAux/extensions/gps3ext/src/S3Common.cpp
gpAux/extensions/gps3ext/src/S3Common.cpp
+2
-0
gpAux/extensions/gps3ext/src/S3Downloader.cpp
gpAux/extensions/gps3ext/src/S3Downloader.cpp
+16
-0
gpAux/extensions/gps3ext/src/S3Downloader_test.cpp
gpAux/extensions/gps3ext/src/S3Downloader_test.cpp
+2
-0
gpAux/extensions/gps3ext/src/S3ExtWrapper.cpp
gpAux/extensions/gps3ext/src/S3ExtWrapper.cpp
+132
-81
gpAux/extensions/gps3ext/src/S3ExtWrapper.h
gpAux/extensions/gps3ext/src/S3ExtWrapper.h
+3
-2
gpAux/extensions/gps3ext/src/S3ExtWrapper_test.cpp
gpAux/extensions/gps3ext/src/S3ExtWrapper_test.cpp
+50
-5
gpAux/extensions/gps3ext/src/S3Log.cpp
gpAux/extensions/gps3ext/src/S3Log.cpp
+8
-1
gpAux/extensions/gps3ext/src/gps3conf.cpp
gpAux/extensions/gps3ext/src/gps3conf.cpp
+90
-61
gpAux/extensions/gps3ext/src/gps3ext.cpp
gpAux/extensions/gps3ext/src/gps3ext.cpp
+29
-37
gpAux/extensions/gps3ext/src/gps3ext.h
gpAux/extensions/gps3ext/src/gps3ext.h
+4
-0
gpAux/extensions/gps3ext/test/s3test.conf
gpAux/extensions/gps3ext/test/s3test.conf
+0
-0
未找到文件。
gpAux/extensions/gps3ext/README.md
0 → 100644
浏览文件 @
f58f8093
## Build
`make`
to build the extension for GPDB.
## Test
### Run Unit Tests
`make -f Makefile.test test`
Some tests need
[
unite
](
https://github.com/lij55/unite
)
to serve the test contents and recevie log.
### Test Code Coverage
`make -f Makefile.test coverage`
gpAux/extensions/gps3ext/README.org
已删除
100644 → 0
浏览文件 @
9f1516f5
* Build
=make= to build the extension for GPDB.
* Test
** Run Unit Test
In order to running test, it need [[https://github.com/GPDBUnite/unite][uniteserver]] to serve the test content and recevie log.
=make -f Makefile.test test=
** Test Coverage
=make -f Makefile.test coverage=
gpAux/extensions/gps3ext/src/S3Common.cpp
浏览文件 @
f58f8093
...
...
@@ -235,6 +235,7 @@ uint64_t ParserCallback(void *contents, uint64_t size, uint64_t nmemb,
return
realsize
;
}
// invoked by s3_import(), need to be exception safe
char
*
get_opt_s3
(
const
char
*
url
,
const
char
*
key
)
{
const
char
*
key_f
=
NULL
;
const
char
*
key_tailing
=
NULL
;
...
...
@@ -303,6 +304,7 @@ FAIL:
}
// returned memory needs to be freed
// invoked by s3_import(), need to be exception safe
char
*
truncate_options
(
const
char
*
url_with_options
)
{
const
char
*
delimiter
=
" "
;
char
*
options
=
strstr
((
char
*
)
url_with_options
,
delimiter
);
...
...
gpAux/extensions/gps3ext/src/S3Downloader.cpp
浏览文件 @
f58f8093
...
...
@@ -7,6 +7,7 @@
#include <libxml/parser.h>
#include <libxml/tree.h>
#include "gps3ext.h"
#include "utils.h"
#include "S3Log.h"
#include <curl/curl.h>
...
...
@@ -184,6 +185,11 @@ void *DownloadThreadfunc(void *data) {
uint64_t
filled_size
=
0
;
S3INFO
(
"Downloading thread starts"
);
do
{
if
(
QueryCancelPending
)
{
S3INFO
(
"Downloading thread is interrupted by GPDB"
);
return
NULL
;
}
filled_size
=
buffer
->
Fill
();
// XXX fix the returning type
if
(
filled_size
==
-
1
)
{
...
...
@@ -303,6 +309,10 @@ static uint64_t WriterCallback(void *contents, uint64_t size, uint64_t nmemb,
uint64_t
realsize
=
size
*
nmemb
;
Bufinfo
*
p
=
reinterpret_cast
<
Bufinfo
*>
(
userp
);
if
(
QueryCancelPending
)
{
return
-
1
;
}
memcpy
(
p
->
buf
+
p
->
len
,
contents
,
realsize
);
p
->
len
+=
realsize
;
return
realsize
;
...
...
@@ -391,6 +401,12 @@ uint64_t HTTPFetcher::fetchdata(uint64_t offset, char *data, uint64_t len) {
CURLcode
res
=
curl_easy_perform
(
curl_handle
);
if
(
res
==
CURLE_WRITE_ERROR
)
{
S3INFO
(
"Curl downloading is interrupted by GPDB"
);
bi
.
len
=
-
1
;
break
;
}
if
(
res
==
CURLE_OPERATION_TIMEDOUT
)
{
S3WARN
(
"Net speed is too slow, retry"
);
bi
.
len
=
-
1
;
...
...
gpAux/extensions/gps3ext/src/S3Downloader_test.cpp
浏览文件 @
f58f8093
#include "gtest/gtest.h"
#include "S3Downloader.cpp"
volatile
bool
QueryCancelPending
=
false
;
TEST
(
OffsetMgr
,
simple
)
{
OffsetMgr
*
o
=
new
OffsetMgr
(
4096
,
1000
);
Range
r
=
o
->
NextOffset
();
...
...
gpAux/extensions/gps3ext/src/S3ExtWrapper.cpp
浏览文件 @
f58f8093
...
...
@@ -9,9 +9,15 @@
using
std
::
stringstream
;
using
std
::
string
;
// invoked by s3_import(), need to be exception safe
S3ExtBase
*
CreateExtWrapper
(
const
char
*
url
)
{
S3ExtBase
*
ret
=
new
S3Reader
(
url
);
return
ret
;
try
{
S3ExtBase
*
ret
=
new
S3Reader
(
url
);
return
ret
;
}
catch
(...)
{
S3ERROR
(
"Caught an exception, aborting"
);
return
NULL
;
}
}
S3ExtBase
::
S3ExtBase
(
string
url
)
{
...
...
@@ -41,55 +47,63 @@ S3Reader::S3Reader(string url) : S3ExtBase(url) {
this
->
keylist
=
NULL
;
}
// invoked by s3_import(), need to be exception safe
bool
S3Reader
::
Init
(
int
segid
,
int
segnum
,
int
chunksize
)
{
// set segment id and num
this
->
segid
=
s3ext_segid
;
this
->
segnum
=
s3ext_segnum
;
this
->
contentindex
=
this
->
segid
;
this
->
chunksize
=
chunksize
;
// Validate url first
if
(
!
this
->
ValidateURL
())
{
S3ERROR
(
"The given URL(%s) is invalid"
,
this
->
url
.
c_str
());
return
false
;
}
try
{
// set segment id and num
this
->
segid
=
s3ext_segid
;
this
->
segnum
=
s3ext_segnum
;
this
->
contentindex
=
this
->
segid
;
this
->
chunksize
=
chunksize
;
// Validate url first
if
(
!
this
->
ValidateURL
())
{
S3ERROR
(
"The given URL(%s) is invalid"
,
this
->
url
.
c_str
());
return
false
;
}
// TODO: As separated function for generating url
stringstream
sstr
;
sstr
<<
"s3-"
<<
this
->
region
<<
".amazonaws.com"
;
S3DEBUG
(
"Host url is %s"
,
sstr
.
str
().
c_str
());
int
initretry
=
3
;
while
(
initretry
--
)
{
this
->
keylist
=
ListBucket
(
this
->
schema
.
c_str
(),
sstr
.
str
().
c_str
(),
this
->
bucket
.
c_str
(),
this
->
prefix
.
c_str
(),
this
->
cred
);
if
(
!
this
->
keylist
)
{
S3INFO
(
"Can't get keylist from bucket %s"
,
this
->
bucket
.
c_str
());
if
(
initretry
)
{
S3INFO
(
"Retrying"
);
continue
;
}
else
{
S3ERROR
(
"Quit initialization because ListBucket keeps failing"
);
return
false
;
// TODO: As separated function for generating url
stringstream
sstr
;
sstr
<<
"s3-"
<<
this
->
region
<<
".amazonaws.com"
;
S3DEBUG
(
"Host url is %s"
,
sstr
.
str
().
c_str
());
int
initretry
=
3
;
while
(
initretry
--
)
{
this
->
keylist
=
ListBucket
(
this
->
schema
.
c_str
(),
sstr
.
str
().
c_str
(),
this
->
bucket
.
c_str
(),
this
->
prefix
.
c_str
(),
this
->
cred
);
if
(
!
this
->
keylist
)
{
S3INFO
(
"Can't get keylist from bucket %s"
,
this
->
bucket
.
c_str
());
if
(
initretry
)
{
S3INFO
(
"Retrying"
);
continue
;
}
else
{
S3ERROR
(
"Quit initialization because ListBucket keeps failing"
);
return
false
;
}
}
}
if
(
this
->
keylist
->
contents
.
size
()
==
0
)
{
S3INFO
(
"Keylist of bucket is empty"
);
if
(
initretry
)
{
S3INFO
(
"Retry listing bucket"
);
continue
;
}
else
{
S3ERROR
(
"Quit initialization because keylist is empty"
);
return
false
;
if
(
this
->
keylist
->
contents
.
size
()
==
0
)
{
S3INFO
(
"Keylist of bucket is empty"
);
if
(
initretry
)
{
S3INFO
(
"Retry listing bucket"
);
continue
;
}
else
{
S3ERROR
(
"Quit initialization because keylist is empty"
);
return
false
;
}
}
break
;
}
break
;
S3INFO
(
"Got %d files to download"
,
this
->
keylist
->
contents
.
size
());
this
->
getNextDownloader
();
}
catch
(...)
{
S3ERROR
(
"Caught an exception, aborting"
);
return
false
;
}
S3INFO
(
"Got %d files to download"
,
this
->
keylist
->
contents
.
size
());
this
->
getNextDownloader
();
// return this->filedownloader ? true : false;
return
true
;
...
...
@@ -106,7 +120,13 @@ void S3Reader::getNextDownloader() {
S3DEBUG
(
"No more files to download"
);
return
;
}
this
->
filedownloader
=
new
Downloader
(
this
->
concurrent_num
);
if
(
this
->
concurrent_num
>
0
)
{
this
->
filedownloader
=
new
Downloader
(
this
->
concurrent_num
);
}
else
{
S3ERROR
(
"Failed to create filedownloader due to threadnum"
);
return
;
}
if
(
!
this
->
filedownloader
)
{
S3ERROR
(
"Failed to create filedownloader"
);
...
...
@@ -116,9 +136,8 @@ void S3Reader::getNextDownloader() {
string
keyurl
=
this
->
getKeyURL
(
c
->
Key
());
S3DEBUG
(
"key: %s, size: %llu"
,
keyurl
.
c_str
(),
c
->
Size
());
// XXX don't use strdup()
if
(
!
filedownloader
->
init
(
keyurl
.
c_str
(),
c
->
Size
(),
this
->
chunksize
,
&
this
->
cred
))
{
if
(
!
filedownloader
->
init
(
keyurl
,
c
->
Size
(),
this
->
chunksize
,
&
this
->
cred
))
{
delete
this
->
filedownloader
;
this
->
filedownloader
=
NULL
;
}
else
{
// move to next file
...
...
@@ -137,50 +156,74 @@ string S3Reader::getKeyURL(const string key) {
return
sstr
.
str
();
}
// invoked by s3_import(), need to be exception safe
bool
S3Reader
::
TransferData
(
char
*
data
,
uint64_t
&
len
)
{
if
(
!
this
->
filedownloader
)
{
S3INFO
(
"No files to download, exit"
);
// not initialized?
len
=
0
;
return
true
;
}
uint64_t
buflen
;
RETRY:
buflen
=
len
;
// S3DEBUG("getlen is %d", len);
bool
result
=
filedownloader
->
get
(
data
,
buflen
);
if
(
!
result
)
{
// read fail
S3ERROR
(
"Failed to get data from filedownloader"
);
return
false
;
}
// S3DEBUG("getlen is %lld", buflen);
if
(
buflen
==
0
)
{
// change to next downloader
this
->
getNextDownloader
();
if
(
this
->
filedownloader
)
{
// download next file
S3INFO
(
"Time to download new file"
);
goto
RETRY
;
try
{
if
(
!
this
->
filedownloader
)
{
S3INFO
(
"No files to download, exit"
);
// not initialized?
len
=
0
;
return
true
;
}
uint64_t
buflen
;
RETRY:
buflen
=
len
;
// S3DEBUG("getlen is %d", len);
bool
result
=
filedownloader
->
get
(
data
,
buflen
);
if
(
!
result
)
{
// read fail
S3ERROR
(
"Failed to get data from filedownloader"
);
return
false
;
}
// S3DEBUG("getlen is %lld", buflen);
if
(
buflen
==
0
)
{
// change to next downloader
this
->
getNextDownloader
();
if
(
this
->
filedownloader
)
{
// download next file
S3INFO
(
"Time to download new file"
);
goto
RETRY
;
}
}
len
=
buflen
;
}
catch
(...)
{
S3ERROR
(
"Caught an exception, aborting"
);
return
false
;
}
len
=
buflen
;
return
true
;
}
// invoked by s3_import(), need to be exception safe
bool
S3Reader
::
Destroy
()
{
// reset filedownloader
if
(
this
->
filedownloader
)
{
this
->
filedownloader
->
destroy
();
delete
this
->
filedownloader
;
}
try
{
// reset filedownloader
if
(
this
->
filedownloader
)
{
this
->
filedownloader
->
destroy
();
delete
this
->
filedownloader
;
}
// Free keylist
if
(
this
->
keylist
)
{
delete
this
->
keylist
;
// Free keylist
if
(
this
->
keylist
)
{
delete
this
->
keylist
;
}
}
catch
(...)
{
S3ERROR
(
"Caught an exception, aborting"
);
return
false
;
}
return
true
;
}
bool
S3ExtBase
::
ValidateURL
()
{
// TODO http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
// As the documentation above says, some regions use domains in forms other
// than the "standard" ones, need to cover them here.
//
// Still have two to take care of, do it later since they only support
// signature v4, s3ext doesn't support v4 yet.
//
// s3.eu-central-1.amazonaws.com -> s3-eu-central-1.amazonaws.com
// s3.ap-northeast-2.amazonaws.com -> s3-ap-northeast-2.amazonaws.com
const
char
*
awsdomain
=
".amazonaws.com"
;
int
ibegin
=
0
;
int
iend
=
url
.
find
(
"://"
);
...
...
@@ -199,10 +242,18 @@ bool S3ExtBase::ValidateURL() {
ibegin
=
url
.
find
(
"-"
);
iend
=
url
.
find
(
awsdomain
);
if
((
iend
==
string
::
npos
)
||
(
ibegin
==
string
::
npos
))
{
if
(
iend
==
string
::
npos
)
{
return
false
;
}
else
if
(
ibegin
==
string
::
npos
)
{
this
->
region
=
"external-1"
;
}
else
{
this
->
region
=
url
.
substr
(
ibegin
+
1
,
iend
-
ibegin
-
1
);
}
if
(
this
->
region
.
compare
(
"us-east-1"
)
==
0
)
{
this
->
region
=
"external-1"
;
}
this
->
region
=
url
.
substr
(
ibegin
+
1
,
iend
-
ibegin
-
1
);
ibegin
=
find_Nth
(
url
,
3
,
"/"
);
iend
=
find_Nth
(
url
,
4
,
"/"
);
...
...
gpAux/extensions/gps3ext/src/S3ExtWrapper.h
浏览文件 @
f58f8093
...
...
@@ -14,6 +14,9 @@ class S3ExtBase {
virtual
bool
Init
(
int
segid
,
int
segnum
,
int
chunksize
)
=
0
;
virtual
bool
TransferData
(
char
*
data
,
uint64_t
&
len
)
=
0
;
virtual
bool
Destroy
()
=
0
;
virtual
bool
ValidateURL
();
string
get_region
()
{
return
this
->
region
;
}
protected:
S3Credential
cred
;
...
...
@@ -29,8 +32,6 @@ class S3ExtBase {
int
concurrent_num
;
int
chunksize
;
virtual
bool
ValidateURL
();
};
class
S3Reader
:
public
S3ExtBase
{
...
...
gpAux/extensions/gps3ext/src/S3ExtWrapper_test.cpp
浏览文件 @
f58f8093
...
...
@@ -7,10 +7,10 @@ class S3Reader_fake : public S3Reader {
virtual
~
S3Reader_fake
();
virtual
bool
Init
(
int
segid
,
int
segnum
,
int
chunksize
);
virtual
bool
Destroy
();
virtual
bool
ValidateURL
();
protected:
virtual
string
getKeyURL
(
const
string
key
);
virtual
bool
ValidateURL
();
};
S3Reader_fake
::
S3Reader_fake
(
string
url
)
:
S3Reader
(
url
)
{}
...
...
@@ -117,8 +117,53 @@ void ExtWrapperTest(const char *url, uint64_t buffer_size, const char *md5_str,
free
(
buf
);
}
TEST
(
ExtWrapper
,
ValidateURL_normal
)
{
S3ExtBase
*
myData
;
myData
=
new
S3Reader
(
"s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/normal"
);
ASSERT_TRUE
(
myData
->
ValidateURL
());
EXPECT_STREQ
(
"us-west-2"
,
myData
->
get_region
().
c_str
());
delete
myData
;
}
TEST
(
ExtWrapper
,
ValidateURL_default
)
{
S3ExtBase
*
myData
;
myData
=
new
S3Reader
(
"s3://s3.amazonaws.com/s3test.pivotal.io/dataset1/normal"
);
ASSERT_TRUE
(
myData
->
ValidateURL
());
EXPECT_STREQ
(
"external-1"
,
myData
->
get_region
().
c_str
());
delete
myData
;
}
TEST
(
ExtWrapper
,
ValidateURL_useast1
)
{
S3ExtBase
*
myData
;
myData
=
new
S3Reader
(
"s3://s3-us-east-1.amazonaws.com/s3test.pivotal.io/dataset1/normal"
);
ASSERT_TRUE
(
myData
->
ValidateURL
());
EXPECT_STREQ
(
"external-1"
,
myData
->
get_region
().
c_str
());
delete
myData
;
}
#ifdef AWSTEST
TEST
(
ExtWrapper
,
normal_region_default
)
{
ExtWrapperTest
(
"https://s3.amazonaws.com/useast1.s3test.pivotal.io/small17/"
,
64
*
1024
,
"138fc555074671912125ba692c678246"
,
0
,
1
,
64
*
1024
*
1024
);
}
TEST
(
ExtWrapper
,
normal_region_useast1
)
{
ExtWrapperTest
(
"https://s3-us-east-1.amazonaws.com/useast1.s3test.pivotal.io/small17/"
,
64
*
1024
,
"138fc555074671912125ba692c678246"
,
0
,
1
,
64
*
1024
*
1024
);
}
TEST
(
ExtWrapper
,
normal
)
{
ExtWrapperTest
(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/"
...
...
@@ -175,19 +220,19 @@ TEST(ExtWrapper, huge_1seg) {
ExtWrapperTest
(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/"
"hugefile/"
,
64
*
1024
,
"
b87b5d79e2bcb8dc1d0fd289fbfa5829
"
,
0
,
1
,
64
*
1024
*
1024
);
64
*
1024
,
"
75baaa39f2b1544ed8af437c2cad86b7
"
,
0
,
1
,
64
*
1024
*
1024
);
}
TEST
(
ExtWrapper
,
normal2_3segs
)
{
ExtWrapperTest
(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/normal/"
,
64
*
1024
,
"
b87b5d79e2bcb8dc1d0fd289fbfa5829
"
,
0
,
3
,
64
*
1024
*
1024
);
64
*
1024
,
"
1c1b198b246160733f7a3491bff5cd52
"
,
0
,
3
,
64
*
1024
*
1024
);
ExtWrapperTest
(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/normal/"
,
64
*
1024
,
"
b87b5d79e2bcb8dc1d0fd289fbfa5829
"
,
1
,
3
,
64
*
1024
*
1024
);
64
*
1024
,
"
296856eb9739d3022b3e9d8bf3b1ea2e
"
,
1
,
3
,
64
*
1024
*
1024
);
ExtWrapperTest
(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/normal/"
,
64
*
1024
,
"
b87b5d79e2bcb8dc1d0fd289fbfa5829
"
,
2
,
3
,
64
*
1024
*
1024
);
64
*
1024
,
"
00675684b6d6697571f22baaf407c6df
"
,
2
,
3
,
64
*
1024
*
1024
);
}
#endif // AWSTEST
...
...
gpAux/extensions/gps3ext/src/S3Log.cpp
浏览文件 @
f58f8093
...
...
@@ -89,8 +89,13 @@ void LogMessage(LOGLEVEL loglevel, const char* fmt, ...) {
}
static
bool
loginited
=
false
;
// invoked by s3_import(), need to be exception safe
void
InitLog
()
{
if
(
!
loginited
)
{
try
{
if
(
loginited
)
{
return
;
}
s3ext_logsock_local
=
socket
(
PF_UNIX
,
SOCK_DGRAM
,
0
);
if
(
s3ext_logsock_local
<
0
)
{
perror
(
"Failed to create socket while InitLog()"
);
...
...
@@ -113,6 +118,8 @@ void InitLog() {
inet_aton
(
s3ext_logserverhost
.
c_str
(),
&
s3ext_logserveraddr
.
sin_addr
);
loginited
=
true
;
}
catch
(...)
{
return
;
}
}
...
...
gpAux/extensions/gps3ext/src/gps3conf.cpp
浏览文件 @
f58f8093
...
...
@@ -58,97 +58,126 @@ int32_t s3ext_logsock_udp = -1;
Config
*
s3cfg
=
NULL
;
// not thread safe!!
// invoked by s3_import(), need to be exception safe
bool
InitConfig
(
string
conf_path
,
string
section
/*not used currently*/
)
{
if
(
conf_path
==
""
)
{
try
{
if
(
conf_path
==
""
)
{
#ifndef DEBUGS3
write_log
(
"Config file is not specified
\n
"
);
write_log
(
"Config file is not specified
\n
"
);
#endif
return
false
;
}
return
false
;
}
if
(
s3cfg
)
delete
s3cfg
;
if
(
s3cfg
)
delete
s3cfg
;
s3cfg
=
new
Config
(
conf_path
);
if
(
!
s3cfg
||
!
s3cfg
->
Handle
())
{
s3cfg
=
new
Config
(
conf_path
);
if
(
!
s3cfg
||
!
s3cfg
->
Handle
())
{
#ifndef DEBUGS3
write_log
(
"Failed to parse config file
\n
"
);
write_log
(
"Failed to parse config file
\n
"
);
#endif
if
(
s3cfg
)
{
delete
s3cfg
;
s3cfg
=
NULL
;
if
(
s3cfg
)
{
delete
s3cfg
;
s3cfg
=
NULL
;
}
return
false
;
}
return
false
;
}
Config
*
cfg
=
s3cfg
;
bool
ret
=
false
;
string
content
;
content
=
cfg
->
Get
(
"default"
,
"loglevel"
,
"INFO"
);
s3ext_loglevel
=
getLogLevel
(
content
.
c_str
());
Config
*
cfg
=
s3cfg
;
bool
ret
=
false
;
string
content
;
content
=
cfg
->
Get
(
"default"
,
"loglevel"
,
"INFO"
);
s3ext_loglevel
=
getLogLevel
(
content
.
c_str
());
content
=
cfg
->
Get
(
"default"
,
"logtype"
,
"INTERNAL"
);
s3ext_logtype
=
getLogType
(
content
.
c_str
());
content
=
cfg
->
Get
(
"default"
,
"logtype"
,
"INTERNAL"
);
s3ext_logtype
=
getLogType
(
content
.
c_str
());
s3ext_accessid
=
cfg
->
Get
(
"default"
,
"accessid"
,
""
);
s3ext_secret
=
cfg
->
Get
(
"default"
,
"secret"
,
""
);
s3ext_token
=
cfg
->
Get
(
"default"
,
"token"
,
""
);
s3ext_accessid
=
cfg
->
Get
(
"default"
,
"accessid"
,
""
);
s3ext_secret
=
cfg
->
Get
(
"default"
,
"secret"
,
""
);
s3ext_token
=
cfg
->
Get
(
"default"
,
"token"
,
""
);
#ifdef DEBUGS3
// s3ext_loglevel = EXT_DEBUG;
// s3ext_logtype = LOCAL_LOG;
#endif
s3ext_logpath
=
cfg
->
Get
(
"default"
,
"logpath"
,
"/tmp/.s3log.sock"
);
s3ext_logserverhost
=
cfg
->
Get
(
"default"
,
"logserverhost"
,
"127.0.0.1"
);
s3ext_logpath
=
cfg
->
Get
(
"default"
,
"logpath"
,
"/tmp/.s3log.sock"
);
s3ext_logserverhost
=
cfg
->
Get
(
"default"
,
"logserverhost"
,
"127.0.0.1"
);
ret
=
cfg
->
Scan
(
"default"
,
"logserverport"
,
"%d"
,
&
s3ext_logserverport
);
if
(
!
ret
)
{
s3ext_logserverport
=
1111
;
}
ret
=
cfg
->
Scan
(
"default"
,
"logserverport"
,
"%d"
,
&
s3ext_logserverport
);
if
(
!
ret
)
{
s3ext_logserverport
=
1111
;
}
ret
=
cfg
->
Scan
(
"default"
,
"threadnum"
,
"%d"
,
&
s3ext_threadnum
);
if
(
!
ret
)
{
S3INFO
(
"Failed to get thread number, use default value 4"
);
s3ext_threadnum
=
4
;
}
ret
=
cfg
->
Scan
(
"default"
,
"threadnum"
,
"%d"
,
&
s3ext_threadnum
);
if
(
!
ret
)
{
S3INFO
(
"Failed to get thread number, use default value 4"
);
s3ext_threadnum
=
4
;
}
if
(
s3ext_threadnum
>
8
)
{
S3INFO
(
"The given thread number is too big, use max value 8"
);
s3ext_threadnum
=
8
;
}
if
(
s3ext_threadnum
<
1
)
{
S3INFO
(
"The given thread number is too small, use min value 1"
);
s3ext_threadnum
=
1
;
}
ret
=
cfg
->
Scan
(
"default"
,
"chunksize"
,
"%d"
,
&
s3ext_chunksize
);
if
(
!
ret
)
{
S3INFO
(
"Failed to get chunksize, use default value %d"
,
64
*
1024
*
1024
);
s3ext_chunksize
=
64
*
1024
*
1024
;
}
ret
=
cfg
->
Scan
(
"default"
,
"chunksize"
,
"%d"
,
&
s3ext_chunksize
);
if
(
!
ret
)
{
S3INFO
(
"Failed to get chunksize, use default value 64MB"
);
s3ext_chunksize
=
64
*
1024
*
1024
;
}
if
(
s3ext_chunksize
>
128
*
1024
*
1024
)
{
S3INFO
(
"The given chunksize is too large, use max value 128MB"
);
s3ext_chunksize
=
128
*
1024
*
1024
;
}
if
(
s3ext_chunksize
<
2
*
1024
*
1024
)
{
S3INFO
(
"The given chunksize is too small, use min value 2MB"
);
s3ext_chunksize
=
2
*
1024
*
1024
;
}
ret
=
cfg
->
Scan
(
"default"
,
"low_speed_limit"
,
"%d"
,
&
s3ext_low_speed_limit
);
if
(
!
ret
)
{
S3INFO
(
"Failed to get low_speed_limit, use default value %d bytes/s"
,
10240
);
s3ext_low_speed_limit
=
10240
;
}
ret
=
cfg
->
Scan
(
"default"
,
"low_speed_limit"
,
"%d"
,
&
s3ext_low_speed_limit
);
if
(
!
ret
)
{
S3INFO
(
"Failed to get low_speed_limit, use default value %d bytes/s"
,
10240
);
s3ext_low_speed_limit
=
10240
;
}
ret
=
cfg
->
Scan
(
"default"
,
"low_speed_time"
,
"%d"
,
&
s3ext_low_speed_time
);
if
(
!
ret
)
{
S3INFO
(
"Failed to get low_speed_time, use default value %d seconds"
,
60
);
s3ext_low_speed_time
=
60
;
}
ret
=
cfg
->
Scan
(
"default"
,
"low_speed_time"
,
"%d"
,
&
s3ext_low_speed_time
);
if
(
!
ret
)
{
S3INFO
(
"Failed to get low_speed_time, use default value %d seconds"
,
60
);
s3ext_low_speed_time
=
60
;
}
content
=
cfg
->
Get
(
"default"
,
"encryption"
,
"true"
);
s3ext_encryption
=
to_bool
(
content
);
content
=
cfg
->
Get
(
"default"
,
"encryption"
,
"true"
);
s3ext_encryption
=
to_bool
(
content
);
#ifdef DEBUGS3
s3ext_segid
=
0
;
s3ext_segnum
=
1
;
s3ext_segid
=
0
;
s3ext_segnum
=
1
;
#else
s3ext_segid
=
GpIdentity
.
segindex
;
s3ext_segnum
=
GpIdentity
.
numsegments
;
s3ext_segid
=
GpIdentity
.
segindex
;
s3ext_segnum
=
GpIdentity
.
numsegments
;
#endif
}
catch
(...)
{
return
false
;
}
return
true
;
}
// invoked by s3_import(), need to be exception safe
void
ClearConfig
()
{
if
(
s3cfg
)
{
delete
s3cfg
;
s3cfg
=
NULL
;
try
{
if
(
s3cfg
)
{
delete
s3cfg
;
s3cfg
=
NULL
;
}
}
catch
(...)
{
return
;
}
}
gpAux/extensions/gps3ext/src/gps3ext.cpp
浏览文件 @
f58f8093
...
...
@@ -20,6 +20,7 @@
#include "gps3ext.h"
#include "gps3conf.h"
#include <signal.h>
#include <pthread.h>
#include <openssl/err.h>
...
...
@@ -36,55 +37,44 @@ Datum s3_import(PG_FUNCTION_ARGS);
Datum
s3_validate_urls
(
PG_FUNCTION_ARGS
);
}
#define MUTEX_TYPE
pthread_mutex_t
#define MUTEX_SETUP(x)
pthread_mutex_init(&(x), NULL)
#define MUTEX_TYPE pthread_mutex_t
#define MUTEX_SETUP(x) pthread_mutex_init(&(x), NULL)
#define MUTEX_CLEANUP(x) pthread_mutex_destroy(&(x))
#define MUTEX_LOCK(x) pthread_mutex_lock(&(x))
#define MUTEX_UNLOCK(x) pthread_mutex_unlock(&(x))
#define THREAD_ID pthread_self( )
/* This array will store all of the mutexes available to OpenSSL. */
static
MUTEX_TYPE
*
mutex_buf
=
NULL
;
static
void
locking_function
(
int
mode
,
int
n
,
const
char
*
file
,
int
line
)
{
#define MUTEX_LOCK(x) pthread_mutex_lock(&(x))
#define MUTEX_UNLOCK(x) pthread_mutex_unlock(&(x))
#define THREAD_ID pthread_self()
/* This array will store all of the mutexes available to OpenSSL. */
static
MUTEX_TYPE
*
mutex_buf
=
NULL
;
static
void
locking_function
(
int
mode
,
int
n
,
const
char
*
file
,
int
line
)
{
if
(
mode
&
CRYPTO_LOCK
)
MUTEX_LOCK
(
mutex_buf
[
n
]);
else
MUTEX_UNLOCK
(
mutex_buf
[
n
]);
}
static
unsigned
long
id_function
(
void
)
{
return
((
unsigned
long
)
THREAD_ID
);
}
int
thread_setup
(
void
)
{
static
unsigned
long
id_function
(
void
)
{
return
((
unsigned
long
)
THREAD_ID
);
}
int
thread_setup
(
void
)
{
int
i
;
mutex_buf
=
(
pthread_mutex_t
*
)
palloc
(
CRYPTO_num_locks
()
*
sizeof
(
MUTEX_TYPE
));
if
(
!
mutex_buf
)
return
0
;
for
(
i
=
0
;
i
<
CRYPTO_num_locks
(
);
i
++
)
MUTEX_SETUP
(
mutex_buf
[
i
]);
mutex_buf
=
(
pthread_mutex_t
*
)
palloc
(
CRYPTO_num_locks
()
*
sizeof
(
MUTEX_TYPE
));
if
(
!
mutex_buf
)
return
0
;
for
(
i
=
0
;
i
<
CRYPTO_num_locks
();
i
++
)
MUTEX_SETUP
(
mutex_buf
[
i
]);
CRYPTO_set_id_callback
(
id_function
);
CRYPTO_set_locking_callback
(
locking_function
);
return
1
;
}
int
thread_cleanup
(
void
)
{
int
thread_cleanup
(
void
)
{
int
i
;
if
(
!
mutex_buf
)
return
0
;
if
(
!
mutex_buf
)
return
0
;
CRYPTO_set_id_callback
(
NULL
);
CRYPTO_set_locking_callback
(
NULL
);
for
(
i
=
0
;
i
<
CRYPTO_num_locks
(
);
i
++
)
MUTEX_CLEANUP
(
mutex_buf
[
i
]);
for
(
i
=
0
;
i
<
CRYPTO_num_locks
();
i
++
)
MUTEX_CLEANUP
(
mutex_buf
[
i
]);
pfree
(
mutex_buf
);
mutex_buf
=
NULL
;
return
1
;
...
...
@@ -92,6 +82,7 @@ int thread_cleanup(void)
/*
* Import data into GPDB.
* invoked by GPDB, be careful with C++ exceptions.
*/
Datum
s3_import
(
PG_FUNCTION_ARGS
)
{
S3ExtBase
*
myData
;
...
...
@@ -111,7 +102,7 @@ Datum s3_import(PG_FUNCTION_ARGS) {
if
(
myData
)
{
thread_cleanup
();
if
(
!
myData
->
Destroy
())
{
ereport
(
ERROR
,
(
0
,
errmsg
(
"
Cleanup S3 extention failed
"
)));
ereport
(
ERROR
,
(
0
,
errmsg
(
"
Failed to cleanup S3 extention
"
)));
}
delete
myData
;
}
...
...
@@ -122,6 +113,7 @@ Datum s3_import(PG_FUNCTION_ARGS) {
/* first call. do any desired init */
curl_global_init
(
CURL_GLOBAL_ALL
);
thread_setup
();
const
char
*
p_name
=
"s3"
;
char
*
url_with_options
=
EXTPROTOCOL_GET_URL
(
fcinfo
);
char
*
url
=
truncate_options
(
url_with_options
);
...
...
@@ -135,9 +127,8 @@ Datum s3_import(PG_FUNCTION_ARGS) {
bool
result
=
InitConfig
(
config_path
,
""
);
if
(
!
result
)
{
ereport
(
ERROR
,
(
0
,
errmsg
(
"Can't find config file %s"
,
config_path
)));
free
(
config_path
);
ereport
(
ERROR
,
(
0
,
errmsg
(
"Can't find config file, please check"
)));
}
else
{
ClearConfig
();
free
(
config_path
);
...
...
@@ -202,6 +193,7 @@ Datum s3_import(PG_FUNCTION_ARGS) {
/*
* Export data out of GPDB.
* invoked by GPDB, be careful with C++ exceptions.
*/
Datum
s3_export
(
PG_FUNCTION_ARGS
)
{
PG_RETURN_INT32
(
0
);
}
...
...
gpAux/extensions/gps3ext/src/gps3ext.h
浏览文件 @
f58f8093
#ifndef _GPS3EXT_H_
#define _GPS3EXT_H_
#include <string>
#include <signal.h>
// GPDB's global val
extern
volatile
bool
QueryCancelPending
;
// TODO include GpId from proper place
typedef
int32_t
int32
;
...
...
gpAux/extensions/gps3ext/test/s3.conf
→
gpAux/extensions/gps3ext/test/s3
test
.conf
浏览文件 @
f58f8093
文件已移动
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录