diff --git a/dbms/tests/integration/test_storage_s3/test.py b/dbms/tests/integration/test_storage_s3/test.py
index c383a2a7bea3ba22ecf53e0e1a361d8589f76947..b975c4c92d56d69f384604cb967fac5a502e3322 100644
--- a/dbms/tests/integration/test_storage_s3/test.py
+++ b/dbms/tests/integration/test_storage_s3/test.py
@@ -14,112 +14,122 @@ def started_cluster():
         cluster.shutdown()
 
 
+import httplib
 import json
+import logging
 import os
 import time
+import traceback
 
 
-def test_sophisticated_default(started_cluster):
+logging.getLogger().setLevel(logging.INFO)
+logging.getLogger().addHandler(logging.StreamHandler())
+
+def test_simple(started_cluster):
     instance = started_cluster.instances['dummy']
     instance.copy_file_to_container(os.path.join(os.path.dirname(__file__), 'test_server.py'), 'test_server.py')
-    communication_path = '/test_sophisticated_default'
+    communication_port = 10000
     bucket = 'abc'
-    instance.exec_in_container(['python', 'test_server.py', communication_path, bucket], detach=True)
+    instance.exec_in_container(['python', 'test_server.py', str(communication_port), bucket], detach=True)
+
+    def get_data():
+        conn = httplib.HTTPConnection(started_cluster.instances['dummy'].ip_address, communication_port)
+        conn.request("GET", "/")
+        r = conn.getresponse()
+        raw_data = r.read()
+        conn.close()
+        return json.loads(raw_data)
 
     format = 'column1 UInt32, column2 UInt32, column3 UInt32'
     values = '(1, 2, 3), (3, 2, 1), (78, 43, 45)'
     other_values = '(1, 1, 1), (1, 1, 1), (11, 11, 11)'
     for i in range(10):
         try:
-            raw = instance.exec_in_container(['cat', communication_path])
-            data = json.loads(instance.exec_in_container(['cat', communication_path]))
+            data = get_data()
             redirecting_to_http_port = data['redirecting_to_http_port']
-            redirecting_to_https_port = data['redirecting_to_https_port']
             preserving_data_port = data['preserving_data_port']
             redirecting_preserving_data_port = data['redirecting_preserving_data_port']
-            localhost = data['localhost']
         except:
+            logging.error(traceback.format_exc())
             time.sleep(0.5)
         else:
             break
     else:
-        assert False, 'Could not initialize mock server' + str(raw)
+        assert False, 'Could not initialize mock server'
 
-    redirecting_host = localhost
+    mock_host = started_cluster.instances['dummy'].ip_address
 
     def run_query(query):
-        print('Running query "{}"...'.format(query))
+        logging.info('Running query "{}"...'.format(query))
         result = instance.query(query)
-        print('Query finished')
+        logging.info('Query finished')
         return result
 
     prepare_put_queries = [
-        "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(localhost, preserving_data_port, bucket, format, values),
+        "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(mock_host, preserving_data_port, bucket, format, values),
     ]
 
     queries = [
-        "select *, column1*column2*column3 from s3('http://{}:{}/', 'CSV', '{}')".format(redirecting_host, redirecting_to_http_port, format),
+        "select *, column1*column2*column3 from s3('http://{}:{}/', 'CSV', '{}')".format(mock_host, redirecting_to_http_port, format),
     ]
 
-    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(redirecting_host, preserving_data_port, bucket, format, values)
+    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(mock_host, preserving_data_port, bucket, format, values)
 
-    redirect_put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(redirecting_host, redirecting_preserving_data_port, bucket, format, other_values)
+    redirect_put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(mock_host, redirecting_preserving_data_port, bucket, format, other_values)
 
     check_queries = [
-        "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(localhost, preserving_data_port, bucket, format),
+        "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(mock_host, preserving_data_port, bucket, format),
     ]
 
     try:
-        print('Phase 1')
+        logging.info('Phase 1')
         for query in prepare_put_queries:
             run_query(query)
 
-        print('Phase 2')
+        logging.info('Phase 2')
         for query in queries:
             stdout = run_query(query)
             assert list(map(str.split, stdout.splitlines())) == [
-                ['1', '2', '3', '6'],
-                ['3', '2', '1', '6'],
-                ['78', '43', '45', '150930'],
+                ['42', '87', '44', '160776'],
+                ['55', '33', '81', '147015'],
+                ['1', '0', '9', '0'],
             ]
 
-        print('Phase 3')
+        logging.info('Phase 3')
         query = put_query
         run_query(query)
-        for i in range(10):
-            try:
-                data = json.loads(instance.exec_in_container(['cat', communication_path]))
-                received_data_completed = data['received_data_completed']
-                received_data = data['received_data']
-                finalize_data = data['finalize_data']
-                finalize_data_query = data['finalize_data_query']
-            except:
-                time.sleep(0.5)
-            else:
-                break
-        else:
-            assert False, 'Could not read data from mock server'+str(data)
+        data = get_data()
+        received_data_completed = data['received_data_completed']
+        received_data = data['received_data']
+        finalize_data = data['finalize_data']
+        finalize_data_query = data['finalize_data_query']
         assert received_data[-1].decode() == '1,2,3\n3,2,1\n78,43,45\n'
         assert received_data_completed
         assert finalize_data == '1hello-etag'
         assert finalize_data_query == 'uploadId=TEST'
 
-        print('Phase 4')
+        logging.info('Phase 4')
         query = redirect_put_query
         run_query(query)
 
         for query in check_queries:
-            print(query)
+            logging.info(query)
             stdout = run_query(query)
             assert list(map(str.split, stdout.splitlines())) == [
                 ['1', '1', '1', '1'],
                 ['1', '1', '1', '1'],
                 ['11', '11', '11', '1331'],
             ]
-        # FIXME check result
+        data = get_data()
+        received_data = data['received_data']
+        assert received_data[-1].decode() == '1,1,1\n1,1,1\n11,11,11\n'
 
         # FIXME tests for multipart
 
-    finally:
-        print('Done')
+    except:
+        logging.error(traceback.format_exc())
+        raise
+
+    else:
+        logging.info('Done')
diff --git a/dbms/tests/integration/test_storage_s3/test_server.py b/dbms/tests/integration/test_storage_s3/test_server.py
index aed5996212b5646e9d09cbf219f84e6c1bc7c76b..bc22b0df085a7b58cb3635ee7281dc2ce98b9906 100644
--- a/dbms/tests/integration/test_storage_s3/test_server.py
+++ b/dbms/tests/integration/test_storage_s3/test_server.py
@@ -28,7 +28,7 @@ file_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
 logging.getLogger().addHandler(file_handler)
 logging.getLogger().addHandler(logging.StreamHandler())
 
-comm_path = sys.argv[1]
+communication_port = int(sys.argv[1])
 bucket = sys.argv[2]
 
 def GetFreeTCPPortsAndIP(n):
@@ -43,41 +43,34 @@ def GetFreeTCPPortsAndIP(n):
     [ s.close() for s in sockets ]
     return result, addr
 
-(redirecting_to_http_port, redirecting_to_https_port, preserving_data_port, redirecting_preserving_data_port), localhost = GetFreeTCPPortsAndIP(4)
+(redirecting_to_http_port, simple_server_port, preserving_data_port, redirecting_preserving_data_port), localhost = GetFreeTCPPortsAndIP(4)
 data = {
     'redirecting_to_http_port': redirecting_to_http_port,
-    'redirecting_to_https_port': redirecting_to_https_port,
     'preserving_data_port': preserving_data_port,
     'redirecting_preserving_data_port': redirecting_preserving_data_port,
-    'localhost': localhost
 }
 redirecting_host = localhost
 
-with open(comm_path, 'w') as f:
-    f.write(json.dumps(data))
-
-class RedirectingToHTTPHandler(BaseHTTPRequestHandler):
+class SimpleHTTPServerHandler(BaseHTTPRequestHandler):
     def do_GET(self):
-        self.send_response(307)
-        self.send_header('Content-type', 'text/xml')
-        self.send_header('Location', 'http://storage.yandexcloud.net/milovidov/test.csv')
-        self.end_headers()
-        self.wfile.write(r'''<?xml version="1.0" encoding="UTF-8"?>
-<Error>
-  <Code>TemporaryRedirect</Code>
-  <Message>Please re-send this request to the specified temporary endpoint.
-  Continue to use the original request endpoint for future requests.</Message>
-  <Endpoint>storage.yandexcloud.net</Endpoint>
-</Error>'''.encode())
+        logging.info('GET {}'.format(self.path))
+        if self.path == '/milovidov/test.csv':
+            self.send_response(200)
+            self.send_header('Content-type', 'text/plain')
+            self.end_headers()
+            self.wfile.write('42,87,44\n55,33,81\n1,0,9\n')
+        else:
+            self.send_response(404)
+            self.end_headers()
         self.finish()
 
 
-class RedirectingToHTTPSHandler(BaseHTTPRequestHandler):
+class RedirectingToHTTPHandler(BaseHTTPRequestHandler):
     def do_GET(self):
         self.send_response(307)
         self.send_header('Content-type', 'text/xml')
-        self.send_header('Location', 'https://storage.yandexcloud.net/milovidov/test.csv')
+        self.send_header('Location', 'http://{}:{}/milovidov/test.csv'.format(localhost, simple_server_port))
         self.end_headers()
         self.wfile.write(r'''<?xml version="1.0" encoding="UTF-8"?>
 <Error>
@@ -135,8 +128,6 @@ class PreservingDataHandler(BaseHTTPRequestHandler):
         data['received_data_completed'] = True
         data['finalize_data'] = post_data
         data['finalize_data_query'] = query
-        with open(comm_path, 'w') as f:
-            f.write(json.dumps(data))
         self.finish()
 
     def do_PUT(self):
@@ -152,8 +143,6 @@ class PreservingDataHandler(BaseHTTPRequestHandler):
         assert self.headers['Expect'] == '100-continue'
         put_data = self.rfile.read()
         data.setdefault('received_data', []).append(put_data)
-        with open(comm_path, 'w') as f:
-            f.write(json.dumps(data))
         logging.info('PUT to {}'.format(path))
         self.server.storage[path] = put_data
         self.finish()
@@ -233,12 +222,21 @@ class RedirectingPreservingDataHandler(BaseHTTPRequestHandler):
         self.finish()
 
 
+class CommunicationServerHandler(BaseHTTPRequestHandler):
+    def do_GET(self):
+        self.send_response(200)
+        self.end_headers()
+        self.wfile.write(json.dumps(data))
+        self.finish()
+
+
 servers = []
-servers.append(HTTPServer((redirecting_host, redirecting_to_https_port), RedirectingToHTTPSHandler))
-servers.append(HTTPServer((redirecting_host, redirecting_to_http_port), RedirectingToHTTPHandler))
-servers.append(HTTPServer((redirecting_host, preserving_data_port), PreservingDataHandler))
+servers.append(HTTPServer((localhost, communication_port), CommunicationServerHandler))
+servers.append(HTTPServer((localhost, redirecting_to_http_port), RedirectingToHTTPHandler))
+servers.append(HTTPServer((localhost, preserving_data_port), PreservingDataHandler))
 servers[-1].storage = {}
-servers.append(HTTPServer((redirecting_host, redirecting_preserving_data_port), RedirectingPreservingDataHandler))
+servers.append(HTTPServer((localhost, simple_server_port), SimpleHTTPServerHandler))
+servers.append(HTTPServer((localhost, redirecting_preserving_data_port), RedirectingPreservingDataHandler))
 jobs = [ threading.Thread(target=server.serve_forever) for server in servers ]
 [ job.start() for job in jobs ]