test.py 7.0 KB
Newer Older
V
Vladimir Chebotarev 已提交
1 2 3 4 5 6 7
import httplib
import json
import logging
import os
import time
import traceback

8 9 10 11
import pytest

from helpers.cluster import ClickHouseCluster

V
Vladimir Chebotarev 已提交
12 13 14 15 16 17

# Configure the root logger once at import time: accept INFO and above, and
# emit records through a plain StreamHandler (which writes to stderr by default).
_root_logger = logging.getLogger()
_root_logger.setLevel(logging.INFO)
_root_logger.addHandler(logging.StreamHandler())


def get_communication_data(started_cluster):
    """Fetch the test server's current state as parsed JSON.

    Sends ``GET /`` to the communication port on the "dummy" instance and
    decodes the response body with ``json.loads``.

    :param started_cluster: cluster object providing
        ``instances["dummy"].ip_address`` and ``communication_port``.
    :return: the decoded JSON document returned by the server.
    :raises: whatever ``httplib`` or ``json.loads`` raise on connection or
        decoding failure; the connection is closed in either case.
    """
    conn = httplib.HTTPConnection(started_cluster.instances["dummy"].ip_address, started_cluster.communication_port)
    try:
        conn.request("GET", "/")
        raw_data = conn.getresponse().read()
    finally:
        # Close even when the request fails, so callers that retry in a loop
        # do not leave a connection behind for every failed attempt.
        conn.close()
    return json.loads(raw_data)


def put_communication_data(started_cluster, body):
    """Send ``body`` to the test server's communication port with ``PUT /``.

    :param started_cluster: cluster object providing
        ``instances["dummy"].ip_address`` and ``communication_port``.
    :param body: request payload passed straight to ``HTTPConnection.request``.
    :return: None.
    :raises: whatever ``httplib`` raises on connection failure; the connection
        is closed in either case.
    """
    conn = httplib.HTTPConnection(started_cluster.instances["dummy"].ip_address, started_cluster.communication_port)
    try:
        conn.request("PUT", "/", body)
        # The response itself is not inspected; getresponse() is still called
        # so the function returns only after the server has answered.
        conn.getresponse()
    finally:
        # Release the connection even if the request or response fails.
        conn.close()


33 34 35 36
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture that yields a started cluster with a running test server.

    Steps:
      1. Create a cluster with a single instance named "dummy" and start it.
      2. Copy ``test_server.py`` into the container and launch it detached,
         passing the communication port (10000) and the bucket name ("abc").
      3. Poll the server up to 10 times (sleeping 0.5 s after each failure)
         and store the ports it reports as attributes on the cluster object.

    Attributes set on the yielded cluster: ``communication_port``, ``bucket``,
    ``mock_host``, ``redirecting_to_http_port``, ``preserving_data_port``,
    ``multipart_preserving_data_port``, ``redirecting_preserving_data_port``.

    The cluster is shut down when the fixture is finalized or setup fails.
    """
    # Constructed outside the ``try`` so that ``finally`` never touches an
    # unbound name (which would mask the real error) if construction fails.
    cluster = ClickHouseCluster(__file__)
    try:
        instance = cluster.add_instance("dummy", config_dir="configs", main_configs=["configs/min_chunk_size.xml"])
        cluster.start()

        cluster.communication_port = 10000
        instance.copy_file_to_container(os.path.join(os.path.dirname(__file__), "test_server.py"), "test_server.py")
        cluster.bucket = "abc"
        instance.exec_in_container(["python", "test_server.py", str(cluster.communication_port), cluster.bucket], detach=True)
        cluster.mock_host = instance.ip_address

        # The server is started detached, so it may not be listening yet:
        # retry until it answers with the full set of port numbers.
        for _ in range(10):
            try:
                data = get_communication_data(cluster)
                cluster.redirecting_to_http_port = data["redirecting_to_http_port"]
                cluster.preserving_data_port = data["preserving_data_port"]
                cluster.multipart_preserving_data_port = data["multipart_preserving_data_port"]
                cluster.redirecting_preserving_data_port = data["redirecting_preserving_data_port"]
            except Exception:
                # Exception (not a bare except) so Ctrl-C / SystemExit still
                # interrupt the retry loop instead of being logged and retried.
                logging.error(traceback.format_exc())
                time.sleep(0.5)
            else:
                break
        else:
            # Every attempt failed.
            assert False, "Could not initialize mock server"

        yield cluster

    finally:
        cluster.shutdown()


V
Vladimir Chebotarev 已提交
67
def run_query(instance, query, stdin=None):
    """Execute ``query`` on ``instance`` and return its result, logging before and after.

    :param instance: object exposing ``query(query, stdin=...)``.
    :param query: query text to run.
    :param stdin: optional data forwarded to ``instance.query`` as ``stdin``.
    :return: whatever ``instance.query`` returns.
    """
    start_message = "Running query '{}'...".format(query)
    logging.info(start_message)
    output = instance.query(query, stdin=stdin)
    # Logged only on success: an exception from query() propagates before this line.
    logging.info("Query finished")
    return output
72

V
Vladimir Chebotarev 已提交
73

V
Vladimir Chebotarev 已提交
74
def test_get_with_redirect(started_cluster):
    """Select through the s3 table function at ``redirecting_to_http_port`` and check the rows.

    Each output line is split on whitespace and compared with the expected
    three input columns plus their product.
    """
    node = started_cluster.instances["dummy"]
    table_structure = "column1 UInt32, column2 UInt32, column3 UInt32"
    expected_rows = [
        ["42", "87", "44", "160776"],
        ["55", "33", "81", "147015"],
        ["1", "0", "9", "0"],
    ]

    # Marker sent to the test server before this test's requests.
    put_communication_data(started_cluster, "=== Get with redirect test ===")

    select_query = "select *, column1*column2*column3 from s3('http://{}:{}/', 'CSV', '{}')".format(
        started_cluster.mock_host,
        started_cluster.redirecting_to_http_port,
        table_structure
    )
    stdout = run_query(node, select_query)

    actual_rows = [line.split() for line in stdout.splitlines()]
    assert actual_rows == expected_rows
    
V
Vladimir Chebotarev 已提交
87

V
Vladimir Chebotarev 已提交
88
def test_put(started_cluster):
    """Insert three rows with ``values`` via the s3 table function and check the server's record.

    After the insert, the state reported by the test server must contain the
    rows as CSV in the last received chunk, a truthy completion flag, and the
    expected finalize body and query string.
    """
    node = started_cluster.instances["dummy"]
    table_structure = "column1 UInt32, column2 UInt32, column3 UInt32"

    logging.info("Phase 3")
    # Marker sent to the test server before this test's requests.
    put_communication_data(started_cluster, "=== Put test ===")

    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    insert_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(
        started_cluster.mock_host,
        started_cluster.preserving_data_port,
        started_cluster.bucket,
        table_structure,
        values
    )
    run_query(node, insert_query)

    # Read every field up front so a missing key fails before any assertion runs.
    state = get_communication_data(started_cluster)
    upload_completed = state["received_data_completed"]
    chunks = state["received_data"]
    finalize_body = state["finalize_data"]
    finalize_query_string = state["finalize_data_query"]

    assert chunks[-1].decode() == "1,2,3\n3,2,1\n78,43,45\n"
    assert upload_completed
    assert finalize_body == "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>hello-etag</ETag></Part></CompleteMultipartUpload>"
    assert finalize_query_string == "uploadId=TEST"

107
    
V
Vladimir Chebotarev 已提交
108
def test_put_csv(started_cluster):
    """Insert CSV supplied on stdin via the s3 table function and check the server's record.

    The last chunk recorded by the test server must equal the CSV that was
    sent, and the completion flag, finalize body and finalize query string
    must have the expected values.
    """
    node = started_cluster.instances["dummy"]
    table_structure = "column1 UInt32, column2 UInt32, column3 UInt32"

    # Marker sent to the test server before this test's requests.
    put_communication_data(started_cluster, "=== Put test CSV ===")

    insert_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(
        started_cluster.mock_host,
        started_cluster.preserving_data_port,
        started_cluster.bucket,
        table_structure
    )
    csv_data = "8,9,16\n11,18,13\n22,14,2\n"
    run_query(node, insert_query, stdin=csv_data)

    # Read every field up front so a missing key fails before any assertion runs.
    state = get_communication_data(started_cluster)
    upload_completed = state["received_data_completed"]
    chunks = state["received_data"]
    finalize_body = state["finalize_data"]
    finalize_query_string = state["finalize_data_query"]

    assert chunks[-1].decode() == csv_data
    assert upload_completed
    assert finalize_body == "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>hello-etag</ETag></Part></CompleteMultipartUpload>"
    assert finalize_query_string == "uploadId=TEST"

126
    
V
Vladimir Chebotarev 已提交
127
def test_put_with_redirect(started_cluster):
    """Insert through ``redirecting_preserving_data_port``, then read back and verify.

    The rows are written via the redirecting port, selected back via
    ``preserving_data_port``, and finally compared with the last chunk the
    test server reports having received.
    """
    node = started_cluster.instances["dummy"]
    table_structure = "column1 UInt32, column2 UInt32, column3 UInt32"

    # Marker sent to the test server before this test's requests.
    put_communication_data(started_cluster, "=== Put with redirect test ===")

    other_values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    insert_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(
        started_cluster.mock_host,
        started_cluster.redirecting_preserving_data_port,
        started_cluster.bucket,
        table_structure,
        other_values
    )
    run_query(node, insert_query)

    select_query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(
        started_cluster.mock_host,
        started_cluster.preserving_data_port,
        started_cluster.bucket,
        table_structure
    )
    stdout = run_query(node, select_query)
    selected_rows = [line.split() for line in stdout.splitlines()]
    assert selected_rows == [
        ["1", "1", "1", "1"],
        ["1", "1", "1", "1"],
        ["11", "11", "11", "1331"],
    ]

    state = get_communication_data(started_cluster)
    last_chunk = state["received_data"][-1]
    assert last_chunk.decode() == "1,1,1\n1,1,1\n11,11,11\n"

V
Vladimir Chebotarev 已提交
147 148

def test_multipart_put(started_cluster):
    """Insert a 100000-row CSV via ``multipart_preserving_data_port`` and verify the upload.

    Checks that the test server reports ``multipart_received_data``, that its
    last entry equals the CSV that was sent, and that the reported number of
    parts is strictly between 1 and 10000.
    """
    instance = started_cluster.instances["dummy"]
    # Named ``table_format`` to avoid shadowing the ``format`` builtin.
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    # Marker sent to the test server before this test's requests.
    put_communication_data(started_cluster, "=== Multipart test ===")

    # Presumably sized to exceed the configured minimum chunk size so that
    # the upload is split into several parts -- confirm against the config.
    long_data = [[i, i + 1, i + 2] for i in range(100000)]
    long_values = "".join(["{},{},{}\n".format(x, y, z) for x, y, z in long_data])

    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(started_cluster.mock_host, started_cluster.multipart_preserving_data_port, started_cluster.bucket, table_format)
    run_query(instance, put_query, stdin=long_values)

    data = get_communication_data(started_cluster)
    assert "multipart_received_data" in data
    received_data = data["multipart_received_data"]
    # Compare with the payload that was actually sent instead of rebuilding
    # an identical 100000-row string a second time.
    assert received_data[-1].decode() == long_values
    assert 1 < data["multipart_parts"] < 10000