提交 a38c7ff2 编写于 作者: V Vladimir Chebotarev

Somehow uncompressed PUT works and gzipped PUT doesn't, in S3 storage.

上级 b7a6f15f
......@@ -197,7 +197,7 @@ StorageS3::StorageS3(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_,
const String & compression_method_ = "")
const String & compression_method_)
: IStorage(table_id_)
, uri(uri_)
, context_global(context_)
......@@ -372,7 +372,18 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
format_name = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
}
return StorageS3::create(s3_uri, access_key_id, secret_access_key, args.table_id, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
return StorageS3::create(
s3_uri,
access_key_id,
secret_access_key,
args.table_id,
format_name,
min_upload_part_size,
args.columns,
args.constraints,
args.context,
compression_method
);
},
{
.source_access_type = AccessType::S3,
......
......@@ -34,7 +34,7 @@ public:
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_,
const String & compression_method_);
const String & compression_method_ = "");
String getName() const override
{
......
......@@ -392,6 +392,41 @@ def test_storage_s3_get_gzip(cluster):
run_query(instance, "DROP TABLE {}".format(name))
def test_storage_s3_put_uncompressed(cluster):
bucket = cluster.minio_bucket
instance = cluster.instances["dummy"]
filename = "test_put_uncompressed.bin"
name = "test_put_uncompressed"
data = [
"'Gloria Thompson',99",
"'Matthew Tang',98",
"'Patsy Anderson',23",
"'Nancy Badillo',93",
"'Roy Hunt',5",
"'Adam Kirk',51",
"'Joshua Douds',28",
"'Jolene Ryan',0",
"'Roxanne Padilla',50",
"'Howard Roberts',41",
"'Ricardo Broughton',13",
"'Roland Speer',83",
"'Cathy Cohan',58",
"'Kathie Dawson',100",
"'Gregg Mcquistion',11",
]
try:
run_query(instance, "CREATE TABLE {} (name String, id UInt32) ENGINE = S3('http://{}:{}/{}/{}', 'CSV')".format(
name, cluster.minio_host, cluster.minio_port, bucket, filename))
run_query(instance, "INSERT INTO {} VALUES ({})".format(name, "),(".join(data)))
run_query(instance, "SELECT sum(id) FROM {}".format(name)).splitlines() == ["753"]
uncompressed_content = get_s3_file_content(cluster, bucket, filename)
assert sum([ int(i.split(',')[1]) for i in uncompressed_content.splitlines() ]) == 753
finally:
run_query(instance, "DROP TABLE {}".format(name))
def test_storage_s3_put_gzip(cluster):
bucket = cluster.minio_bucket
instance = cluster.instances["dummy"]
......@@ -419,6 +454,7 @@ def test_storage_s3_put_gzip(cluster):
name, cluster.minio_host, cluster.minio_port, bucket, filename))
run_query(instance, "INSERT INTO {} VALUES ({})".format(name, "),(".join(data)))
run_query(instance, "SELECT sum(id) FROM {}".format(name)).splitlines() == ["708"]
buf = StringIO.StringIO(get_s3_file_content(cluster, bucket, filename))
f = gzip.GzipFile(fileobj=buf, mode="rb")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册