test.py 7.0 KB
Newer Older
V
Vladimir Chebotarev 已提交
1 2 3 4 5 6 7
import httplib
import json
import logging
import os
import time
import traceback

8 9 10 11
import pytest

from helpers.cluster import ClickHouseCluster

V
Vladimir Chebotarev 已提交
12 13 14 15 16 17

logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())


def get_communication_data(started_cluster):
V
Vladimir Chebotarev 已提交
18
    conn = httplib.HTTPConnection(started_cluster.instances["dummy"].ip_address, started_cluster.communication_port)
V
Vladimir Chebotarev 已提交
19 20 21 22 23 24 25 26
    conn.request("GET", "/")
    r = conn.getresponse()
    raw_data = r.read()
    conn.close()
    return json.loads(raw_data)


def put_communication_data(started_cluster, body):
V
Vladimir Chebotarev 已提交
27
    conn = httplib.HTTPConnection(started_cluster.instances["dummy"].ip_address, started_cluster.communication_port)
V
Vladimir Chebotarev 已提交
28 29 30 31 32
    conn.request("PUT", "/", body)
    r = conn.getresponse()
    conn.close()


33 34 35 36
@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster = ClickHouseCluster(__file__)
37
        instance = cluster.add_instance("dummy")
38
        cluster.start()
V
Vladimir Chebotarev 已提交
39 40

        cluster.communication_port = 10000
V
Vladimir Chebotarev 已提交
41 42 43
        instance.copy_file_to_container(os.path.join(os.path.dirname(__file__), "test_server.py"), "test_server.py")
        cluster.bucket = "abc"
        instance.exec_in_container(["python", "test_server.py", str(cluster.communication_port), cluster.bucket], detach=True)
V
Vladimir Chebotarev 已提交
44 45 46 47 48
        cluster.mock_host = instance.ip_address

        for i in range(10):
            try:
                data = get_communication_data(cluster)
V
Vladimir Chebotarev 已提交
49 50 51 52
                cluster.redirecting_to_http_port = data["redirecting_to_http_port"]
                cluster.preserving_data_port = data["preserving_data_port"]
                cluster.multipart_preserving_data_port = data["multipart_preserving_data_port"]
                cluster.redirecting_preserving_data_port = data["redirecting_preserving_data_port"]
V
Vladimir Chebotarev 已提交
53 54 55 56 57 58
            except:
                logging.error(traceback.format_exc())
                time.sleep(0.5)
            else:
                break
        else:
V
Vladimir Chebotarev 已提交
59
            assert False, "Could not initialize mock server"
V
Vladimir Chebotarev 已提交
60

61 62 63 64 65 66
        yield cluster

    finally:
        cluster.shutdown()


V
Vladimir Chebotarev 已提交
67
def run_query(instance, query, stdin=None):
V
Vladimir Chebotarev 已提交
68
    logging.info("Running query '{}'...".format(query))
V
Vladimir Chebotarev 已提交
69
    result = instance.query(query, stdin=stdin)
V
Vladimir Chebotarev 已提交
70
    logging.info("Query finished")
V
Vladimir Chebotarev 已提交
71
    return result
72

V
Vladimir Chebotarev 已提交
73

V
Vladimir Chebotarev 已提交
74
def test_get_with_redirect(started_cluster):
V
Vladimir Chebotarev 已提交
75 76
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"
77

V
Vladimir Chebotarev 已提交
78
    put_communication_data(started_cluster, "=== Get with redirect test ===")
V
Vladimir Chebotarev 已提交
79 80
    query = "select *, column1*column2*column3 from s3('http://{}:{}/', 'CSV', '{}')".format(started_cluster.mock_host, started_cluster.redirecting_to_http_port, format)
    stdout = run_query(instance, query)
V
Vladimir Chebotarev 已提交
81 82 83
    data = get_communication_data(started_cluster)
    expected = [ [str(row[0]), str(row[1]), str(row[2]), str(row[0]*row[1]*row[2])] for row in data["redirect_csv_data"] ]
    assert list(map(str.split, stdout.splitlines())) == expected
84
    
V
Vladimir Chebotarev 已提交
85

V
Vladimir Chebotarev 已提交
86
def test_put(started_cluster):
V
Vladimir Chebotarev 已提交
87 88
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"
V
Vladimir Chebotarev 已提交
89

V
Vladimir Chebotarev 已提交
90 91 92
    logging.info("Phase 3")
    put_communication_data(started_cluster, "=== Put test ===")
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
V
Vladimir Chebotarev 已提交
93 94 95
    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(started_cluster.mock_host, started_cluster.preserving_data_port, started_cluster.bucket, format, values)
    run_query(instance, put_query)
    data = get_communication_data(started_cluster)
V
Vladimir Chebotarev 已提交
96 97 98 99 100
    received_data_completed = data["received_data_completed"]
    received_data = data["received_data"]
    finalize_data = data["finalize_data"]
    finalize_data_query = data["finalize_data_query"]
    assert received_data[-1].decode() == "1,2,3\n3,2,1\n78,43,45\n"
V
Vladimir Chebotarev 已提交
101
    assert received_data_completed
V
Vladimir Chebotarev 已提交
102 103 104
    assert finalize_data == "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>hello-etag</ETag></Part></CompleteMultipartUpload>"
    assert finalize_data_query == "uploadId=TEST"

105
    
V
Vladimir Chebotarev 已提交
106
def test_put_csv(started_cluster):
V
Vladimir Chebotarev 已提交
107 108
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"
V
Vladimir Chebotarev 已提交
109

V
Vladimir Chebotarev 已提交
110
    put_communication_data(started_cluster, "=== Put test CSV ===")
V
Vladimir Chebotarev 已提交
111
    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(started_cluster.mock_host, started_cluster.preserving_data_port, started_cluster.bucket, format)
V
Vladimir Chebotarev 已提交
112
    csv_data = "8,9,16\n11,18,13\n22,14,2\n"
V
Vladimir Chebotarev 已提交
113 114
    run_query(instance, put_query, stdin=csv_data)
    data = get_communication_data(started_cluster)
V
Vladimir Chebotarev 已提交
115 116 117 118
    received_data_completed = data["received_data_completed"]
    received_data = data["received_data"]
    finalize_data = data["finalize_data"]
    finalize_data_query = data["finalize_data_query"]
V
Vladimir Chebotarev 已提交
119 120
    assert received_data[-1].decode() == csv_data
    assert received_data_completed
V
Vladimir Chebotarev 已提交
121 122 123
    assert finalize_data == "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>hello-etag</ETag></Part></CompleteMultipartUpload>"
    assert finalize_data_query == "uploadId=TEST"

124
    
V
Vladimir Chebotarev 已提交
125
def test_put_with_redirect(started_cluster):
V
Vladimir Chebotarev 已提交
126 127
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"
V
Vladimir Chebotarev 已提交
128

V
Vladimir Chebotarev 已提交
129 130
    put_communication_data(started_cluster, "=== Put with redirect test ===")
    other_values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
V
Vladimir Chebotarev 已提交
131 132 133 134 135 136
    query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(started_cluster.mock_host, started_cluster.redirecting_preserving_data_port, started_cluster.bucket, format, other_values)
    run_query(instance, query)

    query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(started_cluster.mock_host, started_cluster.preserving_data_port, started_cluster.bucket, format)
    stdout = run_query(instance, query)
    assert list(map(str.split, stdout.splitlines())) == [
V
Vladimir Chebotarev 已提交
137 138 139
        ["1", "1", "1", "1"],
        ["1", "1", "1", "1"],
        ["11", "11", "11", "1331"],
140
    ]
V
Vladimir Chebotarev 已提交
141
    data = get_communication_data(started_cluster)
V
Vladimir Chebotarev 已提交
142 143 144
    received_data = data["received_data"]
    assert received_data[-1].decode() == "1,1,1\n1,1,1\n11,11,11\n"

V
Vladimir Chebotarev 已提交
145 146

def test_multipart_put(started_cluster):
V
Vladimir Chebotarev 已提交
147 148
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"
V
Vladimir Chebotarev 已提交
149

V
Vladimir Chebotarev 已提交
150
    put_communication_data(started_cluster, "=== Multipart test ===")
V
Vladimir Chebotarev 已提交
151
    long_data = [[i, i+1, i+2] for i in range(100000)]
V
Vladimir Chebotarev 已提交
152
    long_values = "".join([ "{},{},{}\n".format(x,y,z) for x, y, z in long_data ])
153
    put_query = "set s3_min_upload_part_size = 1000000; insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(started_cluster.mock_host, started_cluster.multipart_preserving_data_port, started_cluster.bucket, format)
V
Vladimir Chebotarev 已提交
154 155
    run_query(instance, put_query, stdin=long_values)
    data = get_communication_data(started_cluster)
V
Vladimir Chebotarev 已提交
156 157 158 159
    assert "multipart_received_data" in data
    received_data = data["multipart_received_data"]
    assert received_data[-1].decode() == "".join([ "{},{},{}\n".format(x, y, z) for x, y, z in long_data ])
    assert 1 < data["multipart_parts"] < 10000