Commit bd1ce567, authored by Vladimir Chebotarev

Tests decomposition.

Parent: d53872c3
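
configs/min_chunk_size.xml — new config file (its path matches the main_configs entry in test.py below). Presumably the 1,000,000-byte minimum upload part size keeps multipart parts small so the multipart code path triggers on test-sized data: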
<yandex>
    <s3_minimum_upload_part_size>1000000</s3_minimum_upload_part_size>
</yandex>
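
test.py — the integration test; the former monolithic test_simple is decomposed into one test per scenario: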
import httplib
import json
import logging
import os
import time
import traceback

import pytest

from helpers.cluster import ClickHouseCluster

logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())


def get_communication_data(started_cluster):
    conn = httplib.HTTPConnection(started_cluster.instances['dummy'].ip_address, started_cluster.communication_port)
    conn.request("GET", "/")
    r = conn.getresponse()
    raw_data = r.read()
    conn.close()
    return json.loads(raw_data)


def put_communication_data(started_cluster, body):
    conn = httplib.HTTPConnection(started_cluster.instances['dummy'].ip_address, started_cluster.communication_port)
    conn.request("PUT", "/", body)
    r = conn.getresponse()
    conn.close()
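
# The two helpers above form the control channel to the mock server started in
# the fixture below: GET returns the server's state (ports, captured uploads)
# as JSON, and PUT sends a free-form marker that the server just logs, labeling
# test phases in its output.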
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('dummy', config_dir="configs", main_configs=['configs/min_chunk_size.xml'])
cluster.start()
cluster.communication_port = 10000
instance.copy_file_to_container(os.path.join(os.path.dirname(__file__), 'test_server.py'), 'test_server.py')
cluster.bucket = 'abc'
instance.exec_in_container(['python', 'test_server.py', str(cluster.communication_port), cluster.bucket], detach=True)
cluster.mock_host = instance.ip_address
for i in range(10):
try:
data = get_data()
redirecting_to_http_port = data['redirecting_to_http_port']
preserving_data_port = data['preserving_data_port']
redirecting_preserving_data_port = data['redirecting_preserving_data_port']
data = get_communication_data(cluster)
cluster.redirecting_to_http_port = data['redirecting_to_http_port']
cluster.preserving_data_port = data['preserving_data_port']
cluster.multipart_preserving_data_port = data['multipart_preserving_data_port']
cluster.redirecting_preserving_data_port = data['redirecting_preserving_data_port']
except:
logging.error(traceback.format_exc())
time.sleep(0.5)
......@@ -57,49 +58,41 @@ def test_simple(started_cluster):
else:
assert False, 'Could not initialize mock server'
mock_host = started_cluster.instances['dummy'].ip_address
yield cluster
def run_query(query):
finally:
cluster.shutdown()
def run_query(instance, query, stdin=None):
logging.info('Running query "{}"...'.format(query))
result = instance.query(query)
result = instance.query(query, stdin=stdin)
logging.info('Query finished')
return result


def test_get_with_redirect(started_cluster):
    instance = started_cluster.instances['dummy']
    format = 'column1 UInt32, column2 UInt32, column3 UInt32'

    put_communication_data(started_cluster, '=== Get with redirect test ===')
    query = "select *, column1*column2*column3 from s3('http://{}:{}/', 'CSV', '{}')".format(started_cluster.mock_host, started_cluster.redirecting_to_http_port, format)
    stdout = run_query(instance, query)
    assert list(map(str.split, stdout.splitlines())) == [
        ['42', '87', '44', '160776'],
        ['55', '33', '81', '147015'],
        ['1', '0', '9', '0'],
    ]


def test_put(started_cluster):
    instance = started_cluster.instances['dummy']
    format = 'column1 UInt32, column2 UInt32, column3 UInt32'

    put_communication_data(started_cluster, '=== Put test ===')
    values = '(1, 2, 3), (3, 2, 1), (78, 43, 45)'
    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(started_cluster.mock_host, started_cluster.preserving_data_port, started_cluster.bucket, format, values)
    run_query(instance, put_query)
    data = get_communication_data(started_cluster)
    received_data_completed = data['received_data_completed']
    received_data = data['received_data']
    finalize_data = data['finalize_data']
    finalize_data_query = data['finalize_data_query']
    assert received_data[-1].decode() == '1,2,3\n3,2,1\n78,43,45\n'
    assert received_data_completed
    assert finalize_data == '<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>hello-etag</ETag></Part></CompleteMultipartUpload>'
    assert finalize_data_query == 'uploadId=TEST'


def test_put_csv(started_cluster):
    instance = started_cluster.instances['dummy']
    format = 'column1 UInt32, column2 UInt32, column3 UInt32'

    put_communication_data(started_cluster, '=== Put test CSV ===')
    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(started_cluster.mock_host, started_cluster.preserving_data_port, started_cluster.bucket, format)
    csv_data = '8,9,16\n11,18,13\n22,14,2\n'
    run_query(instance, put_query, stdin=csv_data)
    data = get_communication_data(started_cluster)
    received_data_completed = data['received_data_completed']
    received_data = data['received_data']
    finalize_data = data['finalize_data']
    finalize_data_query = data['finalize_data_query']
    assert received_data[-1].decode() == csv_data
    assert received_data_completed
    assert finalize_data == '<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>hello-etag</ETag></Part></CompleteMultipartUpload>'
    assert finalize_data_query == 'uploadId=TEST'


def test_put_with_redirect(started_cluster):
    instance = started_cluster.instances['dummy']
    format = 'column1 UInt32, column2 UInt32, column3 UInt32'

    put_communication_data(started_cluster, '=== Put with redirect test ===')
    other_values = '(1, 1, 1), (1, 1, 1), (11, 11, 11)'
    query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(started_cluster.mock_host, started_cluster.redirecting_preserving_data_port, started_cluster.bucket, format, other_values)
    run_query(instance, query)

    query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(started_cluster.mock_host, started_cluster.preserving_data_port, started_cluster.bucket, format)
    stdout = run_query(instance, query)
    assert list(map(str.split, stdout.splitlines())) == [
        ['1', '1', '1', '1'],
        ['1', '1', '1', '1'],
        ['11', '11', '11', '1331'],
    ]
    data = get_communication_data(started_cluster)
    received_data = data['received_data']
    assert received_data[-1].decode() == '1,1,1\n1,1,1\n11,11,11\n'


def test_multipart_put(started_cluster):
    instance = started_cluster.instances['dummy']
    format = 'column1 UInt32, column2 UInt32, column3 UInt32'

    put_communication_data(started_cluster, '=== Multipart test ===')
    long_data = [[i, i+1, i+2] for i in range(100000)]
    long_values = ''.join([ '{},{},{}\n'.format(x, y, z) for x, y, z in long_data ])
    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(started_cluster.mock_host, started_cluster.multipart_preserving_data_port, started_cluster.bucket, format)
    run_query(instance, put_query, stdin=long_values)
    data = get_communication_data(started_cluster)
    assert 'multipart_received_data' in data
    received_data = data['multipart_received_data']
    assert received_data[-1].decode() == ''.join([ '{},{},{}\n'.format(x, y, z) for x, y, z in long_data ])
    assert 1 < data['multipart_parts'] < 10000
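
test_server.py — the mock S3 server run inside the 'dummy' container (only the changed hunks are shown):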
@@ -20,6 +20,8 @@ import socket
import sys
import threading
import time
import uuid
import xml.etree.ElementTree
logging.getLogger().setLevel(logging.INFO)
@@ -43,13 +45,20 @@ def GetFreeTCPPortsAndIP(n):
    [ s.close() for s in sockets ]
    return result, addr
(
    redirecting_to_http_port,
    simple_server_port,
    preserving_data_port,
    multipart_preserving_data_port,
    redirecting_preserving_data_port
), localhost = GetFreeTCPPortsAndIP(5)
data = {
    'redirecting_to_http_port': redirecting_to_http_port,
    'preserving_data_port': preserving_data_port,
    'multipart_preserving_data_port': multipart_preserving_data_port,
    'redirecting_preserving_data_port': redirecting_preserving_data_port,
}
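# This module-level dict is what CommunicationServerHandler serves back as
# JSON; test.py polls it via get_communication_data() to discover the ports
# chosen above and, later, the data the mock handlers recorded.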
redirecting_host = localhost
class SimpleHTTPServerHandler(BaseHTTPRequestHandler):
@@ -113,7 +122,7 @@ class PreservingDataHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        self.send_response(200)
        query = urlparse.urlparse(self.path).query
        logging.info('PreservingDataHandler POST ?' + query)
        if query == 'uploads':
            post_data = r'''<?xml version="1.0" encoding="UTF-8"?>
<hi><UploadId>TEST</UploadId></hi>'''.encode()
@@ -161,6 +170,104 @@ class PreservingDataHandler(BaseHTTPRequestHandler):
        self.finish()


class MultipartPreservingDataHandler(BaseHTTPRequestHandler):
    protocol_version = 'HTTP/1.1'

    def parse_request(self):
        result = BaseHTTPRequestHandler.parse_request(self)
        # Python 2's BaseHTTPRequestHandler does not handle 'Expect: 100-continue'
        # during parse_request(); emulate the Python 3 behaviour here.
        if sys.version_info.major == 2 and result == True:
            expect = self.headers.get('Expect', "")
            if (expect.lower() == "100-continue" and self.protocol_version >= "HTTP/1.1" and self.request_version >= "HTTP/1.1"):
                if not self.handle_expect_100():
                    return False
        return result

    def send_response_only(self, code, message=None):
        # Python 2 has no send_response_only(); write the status line manually.
        if message is None:
            if code in self.responses:
                message = self.responses[code][0]
            else:
                message = ''
        if self.request_version != 'HTTP/0.9':
            self.wfile.write("%s %d %s\r\n" % (self.protocol_version, code, message))

    def handle_expect_100(self):
        logging.info('Received Expect-100')
        self.send_response_only(100)
        self.end_headers()
        return True

    def do_POST(self):
        query = urlparse.urlparse(self.path).query
        logging.info('MultipartPreservingDataHandler POST ?' + query)
        if query == 'uploads':
            # Initiation of a multipart upload: reply with a fixed UploadId.
            self.send_response(200)
            post_data = r'''<?xml version="1.0" encoding="UTF-8"?>
<hi><UploadId>TEST</UploadId></hi>'''.encode()
            self.send_header('Content-length', str(len(post_data)))
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            self.wfile.write(post_data)
        else:
            # Completion request: validate the part list and reassemble the
            # object from the parts stored by do_PUT.
            try:
                assert query == 'uploadId=TEST'
                logging.info('Content-Length = ' + self.headers.get('Content-Length'))
                post_data = self.rfile.read(int(self.headers.get('Content-Length')))
                root = xml.etree.ElementTree.fromstring(post_data)
                assert root.tag == 'CompleteMultipartUpload'
                assert len(root) > 1
                content = ''
                for i, part in enumerate(root):
                    assert part.tag == 'Part'
                    assert len(part) == 2
                    assert part[0].tag == 'PartNumber'
                    assert part[1].tag == 'ETag'
                    assert int(part[0].text) == i + 1
                    content += self.server.storage['@' + part[1].text]
                data.setdefault('multipart_received_data', []).append(content)
                data['multipart_parts'] = len(root)
                self.send_response(200)
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                logging.info('Sending 200')
            except:
                logging.error('Sending 500')
                self.send_response(500)
        self.finish()

    def do_PUT(self):
        # Store each uploaded part under its ETag (a fresh uuid4).
        uid = uuid.uuid4()
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.send_header('ETag', str(uid))
        self.end_headers()
        query = urlparse.urlparse(self.path).query
        path = urlparse.urlparse(self.path).path
        logging.info('Content-Length = ' + self.headers.get('Content-Length'))
        logging.info('PUT ' + query)
        assert self.headers.get('Content-Length')
        assert self.headers['Expect'] == '100-continue'
        put_data = self.rfile.read()  # reads until the client closes the stream
        data.setdefault('received_data', []).append(put_data)
        logging.info('PUT to {}'.format(path))
        self.server.storage['@' + str(uid)] = put_data
        self.finish()

    def do_GET(self):
        path = urlparse.urlparse(self.path).path
        if path in self.server.storage:
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.send_header('Content-length', str(len(self.server.storage[path])))
            self.end_headers()
            self.wfile.write(self.server.storage[path])
        else:
            self.send_response(404)
            self.end_headers()
        self.finish()
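
# A sketch of the multipart exchange mocked above, inferred from this handler
# (not an authoritative S3 reference; the partNumber parameter is an assumption,
# since the handler logs but never parses the PUT query string):
#
#   POST /bucket/key?uploads
#       -> 200, body carries <UploadId>TEST</UploadId>
#   PUT /bucket/key?partNumber=N&uploadId=TEST   (Expect: 100-continue)
#       -> 200 with a fresh uuid4 as ETag; the body is stored under '@' + ETag
#   POST /bucket/key?uploadId=TEST with a CompleteMultipartUpload XML body
#       -> 200; parts are concatenated in PartNumber order into
#          data['multipart_received_data'] for test.py to assert on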


class RedirectingPreservingDataHandler(BaseHTTPRequestHandler):
    protocol_version = 'HTTP/1.1'

@@ -229,12 +336,20 @@ class CommunicationServerHandler(BaseHTTPRequestHandler):
        self.wfile.write(json.dumps(data))
        self.finish()

    def do_PUT(self):
        self.send_response(200)
        self.end_headers()
        logging.info(self.rfile.read())
        self.finish()
servers = []
servers.append(HTTPServer((localhost, communication_port), CommunicationServerHandler))
servers.append(HTTPServer((localhost, redirecting_to_http_port), RedirectingToHTTPHandler))
servers.append(HTTPServer((localhost, preserving_data_port), PreservingDataHandler))
servers[-1].storage = {}
servers.append(HTTPServer((localhost, multipart_preserving_data_port), MultipartPreservingDataHandler))
servers[-1].storage = {}
servers.append(HTTPServer((localhost, simple_server_port), SimpleHTTPServerHandler))
servers.append(HTTPServer((localhost, redirecting_preserving_data_port), RedirectingPreservingDataHandler))
jobs = [ threading.Thread(target=server.serve_forever) for server in servers ]