test.py 9.5 KB
Newer Older
V
Vladimir Chebotarev 已提交
1 2 3
import json
import logging

4 5
import pytest

6
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
V
Vladimir Chebotarev 已提交
7

8 9 10
import helpers.client


V
Vladimir Chebotarev 已提交
11 12 13 14
# Route all records at INFO and above from the root logger to stderr so the
# integration-test output is verbose by default.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())


15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
# Creates S3 bucket for tests and allows anonymous read-write access to it.
def prepare_s3_bucket(cluster):
    # type: (ClickHouseCluster) -> None
    minio_client = cluster.minio_client

    # Recreate the bucket from scratch so tests never observe stale objects
    # from a previous run.
    if minio_client.bucket_exists(cluster.minio_bucket):
        minio_client.remove_bucket(cluster.minio_bucket)

    minio_client.make_bucket(cluster.minio_bucket)

    # Allows read-write access for bucket without authorization.
    # Resource ARNs are derived from the actual bucket name instead of a
    # hard-coded "root", so the policy stays correct if the bucket is renamed.
    bucket_arn = "arn:aws:s3:::{}".format(cluster.minio_bucket)
    bucket_read_write_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": action,
                "Resource": resource,
            }
            for action, resource in [
                # Bucket-level actions apply to the bucket ARN itself,
                # object-level actions to every key inside it.
                ("s3:GetBucketLocation", bucket_arn),
                ("s3:ListBucket", bucket_arn),
                ("s3:GetObject", bucket_arn + "/*"),
                ("s3:PutObject", bucket_arn + "/*"),
            ]
        ],
    }

    minio_client.set_bucket_policy(cluster.minio_bucket, json.dumps(bucket_read_write_policy))

    # A second bucket with no anonymous policy: requests must be authenticated.
    cluster.minio_restricted_bucket = "{}-with-auth".format(cluster.minio_bucket)
    if minio_client.bucket_exists(cluster.minio_restricted_bucket):
        minio_client.remove_bucket(cluster.minio_restricted_bucket)

    minio_client.make_bucket(cluster.minio_restricted_bucket)

65 66

# Returns content of given S3 file as string.
def get_s3_file_content(cluster, bucket, filename):
    # type: (ClickHouseCluster, str, str) -> str
    data = cluster.minio_client.get_object(bucket, filename)
    # Chunks yielded by the minio stream are bytes; accumulate as bytes and
    # decode once at the end (concatenating bytes onto "" raises TypeError
    # under Python 3).
    data_bytes = b""
    for chunk in data.stream():
        data_bytes += chunk
    return data_bytes.decode()
V
Vladimir Chebotarev 已提交
75 76


77 78 79 80 81 82 83 84
# Returns nginx access log lines.
def get_nginx_access_logs():
    # type: () -> list
    # Context manager guarantees the handle is closed even if readlines()
    # raises (the manual open/close pair leaked on error).
    with open("/nginx/access.log", "r") as handle:
        return handle.readlines()


85
@pytest.fixture(scope="module")
def cluster():
    # Module-scoped fixture: starts a ClickHouse instance backed by Minio,
    # prepares the test buckets, and tears everything down afterwards.
    #
    # Construction happens OUTSIDE the try block: if it raised inside,
    # `finally` would call shutdown() on the name `cluster`, which at that
    # point still resolves to this fixture function and would raise
    # AttributeError, masking the original error.
    cluster = ClickHouseCluster(__file__)
    cluster.add_instance("dummy", with_minio=True)
    logging.info("Starting cluster...")
    try:
        cluster.start()
        logging.info("Cluster started")

        prepare_s3_bucket(cluster)
        logging.info("S3 bucket created")

        yield cluster
    finally:
        cluster.shutdown()


V
Fixes.  
Vladimir Chebotarev 已提交
102
def run_query(instance, query, stdin=None, settings=None):
    # type: (ClickHouseInstance, str, object, dict) -> str
    # Runs *query* on *instance*, logging before and after, and returns the
    # query output as a string.
    logging.info("Running query '%s'...", query)  # lazy formatting
    output = instance.query(query, stdin=stdin, settings=settings)
    logging.info("Query finished")
    return output
V
Vladimir Chebotarev 已提交
110 111


112
# Test simple put.
@pytest.mark.parametrize("maybe_auth,positive", [
    ("", True),
    ("'minio','minio123',", True),
    ("'wrongid','wrongkey',", False)
])
def test_put(cluster, maybe_auth, positive):
    # type: (ClickHouseCluster) -> None

    # Credentialed cases target the restricted bucket; anonymous ones use the
    # bucket with the open read-write policy.
    bucket = cluster.minio_restricted_bucket if maybe_auth else cluster.minio_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    values_csv = "1,2,3\n3,2,1\n78,43,45\n"
    query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') values {}".format(
        cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format, values)

    # The insert must succeed exactly when the credentials are acceptable.
    try:
        run_query(instance, query)
    except helpers.client.QueryRuntimeException:
        assert not positive
    else:
        assert positive
        assert values_csv == get_s3_file_content(cluster, bucket, filename)
137 138 139


# Test put values in CSV format.
@pytest.mark.parametrize("maybe_auth,positive", [
    ("", True),
    ("'minio','minio123',", True),
    ("'wrongid','wrongkey',", False)
])
def test_put_csv(cluster, maybe_auth, positive):
    # type: (ClickHouseCluster) -> None

    # Credentialed cases target the restricted bucket; anonymous ones use the
    # bucket with the open read-write policy.
    bucket = cluster.minio_restricted_bucket if maybe_auth else cluster.minio_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    csv_data = "8,9,16\n11,18,13\n22,14,2\n"
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
        cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format)

    # Success is expected exactly when the credentials are acceptable.
    failed = False
    try:
        run_query(instance, put_query, stdin=csv_data)
    except helpers.client.QueryRuntimeException:
        failed = True
    assert failed != positive
    if not failed:
        assert csv_data == get_s3_file_content(cluster, bucket, filename)
163 164 165 166 167 168


# Test put and get with S3 server redirect.
def test_put_get_with_redirect(cluster):
    # type: (ClickHouseCluster) -> None

    bucket = cluster.minio_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    values_csv = "1,1,1\n1,1,1\n11,11,11\n"

    # Insert through the redirecting proxy, then verify the object landed in
    # the real bucket.
    put_query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format, values)
    run_query(instance, put_query)
    assert values_csv == get_s3_file_content(cluster, bucket, filename)

    # Read the same data back through the proxy with a computed column.
    select_query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/{}', 'CSV', '{}')".format(
        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format)
    stdout = run_query(instance, select_query)

    expected_rows = [
        ["1", "1", "1", "1"],
        ["1", "1", "1", "1"],
        ["11", "11", "11", "1331"],
    ]
    assert [line.split() for line in stdout.splitlines()] == expected_rows
190 191 192


# Test multipart put.
@pytest.mark.parametrize("maybe_auth,positive", [
    ("", True),
    ("'minio','minio123',", True),
    ("'wrongid','wrongkey',", False)
])
def test_multipart_put(cluster, maybe_auth, positive):
    # type: (ClickHouseCluster) -> None

    bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    # Minimum size of part is 5 Mb for Minio.
    # See: https://github.com/minio/minio/blob/master/docs/minio-limits.md
    min_part_size_bytes = 5 * 1024 * 1024
    csv_size_bytes = int(min_part_size_bytes * 1.5)  # To have 2 parts.

    one_line_length = 6  # 3 digits, 2 commas, 1 line separator.

    # Generate data having size more than one part.
    # Floor division: range() needs an int, and true division would yield a
    # float and raise TypeError under Python 3.
    int_data = [[1, 2, 3] for _ in range(csv_size_bytes // one_line_length)]
    csv_data = "".join(["{},{},{}\n".format(x, y, z) for x, y, z in int_data])

    assert len(csv_data) > min_part_size_bytes

    filename = "test_multipart.csv"
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)

    try:
        run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes})
    except helpers.client.QueryRuntimeException:
        assert not positive
    else:
        assert positive

        # Use Nginx access logs to count number of parts uploaded to Minio.
        # Materialize the matches and compare the count: filter() is lazy in
        # Python 3, and comparing the filter object itself to an int raises
        # TypeError (and was vacuously truthy under Python 2).
        nginx_logs = get_nginx_access_logs()
        uploaded_parts = [log_line for log_line in nginx_logs
                          if filename in log_line and "PUT" in log_line]
        assert len(uploaded_parts) > 1

        assert csv_data == get_s3_file_content(cluster, bucket, filename)
235 236 237 238 239 240 241 242 243 244 245 246

# Checks that S3 endpoints on hosts not whitelisted by the server's remote
# host filter are rejected for both reading and writing.
# NOTE(review): this test requests a `started_cluster` fixture, but this file
# only defines a `cluster` fixture — confirm `started_cluster` exists in a
# conftest.py, otherwise pytest will error at collection time.
# NOTE(review): `put_communication_data` is not defined anywhere in this file;
# presumably a helper from another revision — verify it is importable.
# NOTE(review): `redirecting_to_http_port`, `redirecting_preserving_data_port`
# and `bucket` are not set on the cluster anywhere visible here — confirm.
def test_remote_host_filter(started_cluster):
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"  # NOTE(review): shadows the `format` builtin

    put_communication_data(started_cluster, "=== RemoteHostFilter test ===")
    # Reading from a non-whitelisted host must be refused by the server.
    query = "select *, column1*column2*column3 from s3('http://{}:{}/', 'CSV', '{}')".format("invalid_host", started_cluster.redirecting_to_http_port, format)
    assert "not allowed in config.xml" in instance.query_and_get_error(query)

    other_values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    # Writing to a non-whitelisted host must be refused as well.
    query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format("invalid_host", started_cluster.redirecting_preserving_data_port, started_cluster.bucket, format, other_values)
    assert "not allowed in config.xml" in instance.query_and_get_error(query)