Commit c951ec7b authored by alesapin


CLICKHOUSE-4067: Fixes in ODBC dictionaries. Dictionaries now receive the bridge path from the global config (as it was before). Quoting is performed on the table and schema independently. Add tests for Postgres interaction. Update documentation.
Parent cb6e8409
@@ -254,7 +254,8 @@ struct ODBCBridgeMixin
     static void startBridge(const Poco::Util::AbstractConfiguration & config, Poco::Logger * log, const Poco::Timespan & http_timeout)
     {
-        Poco::Path path{config.getString("application.dir", "")};
+        /// Path to executable folder
+        Poco::Path path{config.getString("application.dir", "/usr/bin")};
         path.setFileName(
 #if CLICKHOUSE_SPLIT_BINARY
@@ -264,9 +265,6 @@ struct ODBCBridgeMixin
 #endif
         );
-        if (!Poco::File(path).exists())
-            throw Exception("clickhouse binary (" + path.toString() + ") is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);
         std::stringstream command;
         command << path.toString() <<
......
@@ -155,7 +155,8 @@ DictionarySourcePtr DictionarySourceFactory::create(
     else if ("odbc" == source_type)
     {
 #if USE_POCO_SQLODBC || USE_POCO_DATAODBC
-        BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".odbc.connection_string"));
+        const auto & global_config = context.getConfigRef();
+        BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(global_config, context.getSettings().http_connection_timeout, config.getString(config_prefix + ".odbc.connection_string"));
         return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".odbc", sample_block, context, bridge);
 #else
         throw Exception{"Dictionary source of type `odbc` is disabled because poco library was built without ODBC support.",
......
@@ -14,17 +14,28 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int UNSUPPORTED_METHOD;
+    extern const int LOGICAL_ERROR;
 }
 
 ExternalQueryBuilder::ExternalQueryBuilder(
-    const DictionaryStructure & dict_struct,
-    const std::string & db,
-    const std::string & table,
-    const std::string & where,
-    IdentifierQuotingStyle quoting_style)
-    : dict_struct(dict_struct), db(db), table(table), where(where), quoting_style(quoting_style)
+    const DictionaryStructure & dict_struct_,
+    const std::string & db_,
+    const std::string & table_,
+    const std::string & where_,
+    IdentifierQuotingStyle quoting_style_)
+    : dict_struct(dict_struct_), db(db_), where(where_), quoting_style(quoting_style_)
 {
+    if (auto pos = table_.find('.'); pos != std::string::npos)
+    {
+        schema = table_.substr(0, pos);
+        table = table_.substr(pos + 1);
+    }
+    else
+    {
+        schema = "";
+        table = table_;
+    }
 }
@@ -124,6 +135,11 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
         writeQuoted(db, out);
         writeChar('.', out);
     }
+    if (!schema.empty())
+    {
+        writeQuoted(schema, out);
+        writeChar('.', out);
+    }
     writeQuoted(table, out);
 
     if (!where.empty())
@@ -187,6 +203,12 @@ std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64>
         writeQuoted(db, out);
         writeChar('.', out);
     }
+    if (!schema.empty())
+    {
+        writeQuoted(schema, out);
+        writeChar('.', out);
+    }
     writeQuoted(table, out);
     writeString(" WHERE ", out);
@@ -250,6 +272,12 @@ std::string ExternalQueryBuilder::composeLoadKeysQuery(
         writeQuoted(db, out);
         writeChar('.', out);
     }
+    if (!schema.empty())
+    {
+        writeQuoted(schema, out);
+        writeChar('.', out);
+    }
     writeQuoted(table, out);
     writeString(" WHERE ", out);
......
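Taken together, the hunks above apply one pattern: if the `table` setting contains a dot, the part before the first dot is treated as a schema, and each non-empty component (database, schema, table) is quoted independently when queries are composed. A minimal Python sketch of that behavior (helper names are hypothetical; quoting is shown for the ANSI double-quote style only, without escaping):

```python
def split_schema(table_setting):
    # Mirrors the new ExternalQueryBuilder constructor: split on the FIRST '.'.
    schema, sep, table = table_setting.partition('.')
    if not sep:
        return '', table_setting
    return schema, table

def compose_from_clause(db, table_setting):
    # Quote each non-empty component independently, as composeLoadAllQuery() now does.
    schema, table = split_schema(table_setting)
    parts = [p for p in (db, schema, table) if p]
    return 'FROM ' + '.'.join('"{}"'.format(p) for p in parts)

print(compose_from_clause('postgres', 'clickhouse.test_table'))
# FROM "postgres"."clickhouse"."test_table"
```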
@@ -18,19 +18,20 @@ class WriteBuffer;
 struct ExternalQueryBuilder
 {
     const DictionaryStructure & dict_struct;
-    const std::string & db;
-    const std::string & table;
+    std::string db;
+    std::string table;
+    std::string schema;
     const std::string & where;
     IdentifierQuotingStyle quoting_style;
 
     ExternalQueryBuilder(
-        const DictionaryStructure & dict_struct,
-        const std::string & db,
-        const std::string & table,
-        const std::string & where,
-        IdentifierQuotingStyle quoting_style);
+        const DictionaryStructure & dict_struct_,
+        const std::string & db_,
+        const std::string & table_,
+        const std::string & where_,
+        IdentifierQuotingStyle quoting_style_);
 
     /** Generate a query to load all data. */
     std::string composeLoadAllQuery() const;
@@ -14,9 +14,9 @@ Don't use Docker from your system repository.
 * [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
 * [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
-* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL`
+* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml kazoo PyMySQL psycopg2`
 
-(highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo`
+(highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python-pytest python-dicttoxml python-docker python-pymysql python-kazoo python-psycopg2`
 
 If you want to run the tests under a non-privileged user, you must add this user to `docker` group: `sudo usermod -aG docker $USER` and re-login.
 (You must close all your sessions (for example, restart your computer))
......
@@ -13,6 +13,7 @@ import pymysql
 import xml.dom.minidom
 from kazoo.client import KazooClient
 from kazoo.exceptions import KazooException
+import psycopg2
 import docker
 from docker.errors import ContainerError
@@ -79,6 +80,7 @@ class ClickHouseCluster:
         self.instances = {}
         self.with_zookeeper = False
         self.with_mysql = False
+        self.with_postgres = False
         self.with_kafka = False
         self.with_odbc_drivers = False
@@ -92,7 +94,7 @@ class ClickHouseCluster:
             cmd += " client"
         return cmd
 
-    def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, hostname=None, env_variables={}, image="ubuntu:14.04"):
+    def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, hostname=None, env_variables={}, image="ubuntu:14.04"):
         """Add an instance to the cluster.
 
         name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
@@ -127,6 +129,12 @@ class ClickHouseCluster:
             self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
 
+        if with_postgres and not self.with_postgres:
+            self.with_postgres = True
+            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
+            self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
+                                      self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
+
         if with_odbc_drivers and not self.with_odbc_drivers:
             self.with_odbc_drivers = True
             if not self.with_mysql:
@@ -134,6 +142,12 @@ class ClickHouseCluster:
                 self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
                 self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                        self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
 
+            if not self.with_postgres:
+                self.with_postgres = True
+                self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
+                self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
+                                          self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
+
         if with_kafka and not self.with_kafka:
             self.with_kafka = True
@@ -168,6 +182,21 @@ class ClickHouseCluster:
 
         raise Exception("Cannot wait MySQL container")
 
+    def wait_postgres_to_start(self, timeout=60):
+        start = time.time()
+        while time.time() - start < timeout:
+            try:
+                conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
+                conn = psycopg2.connect(conn_string)
+                conn.close()
+                print "Postgres Started"
+                return
+            except Exception as ex:
+                print "Can't connect to Postgres " + str(ex)
+                time.sleep(0.5)
+
+        raise Exception("Cannot wait Postgres container")
+
     def wait_zookeeper_to_start(self, timeout=60):
         start = time.time()
         while time.time() - start < timeout:
@@ -204,20 +233,24 @@ class ClickHouseCluster:
         self.docker_client = docker.from_env(version=self.docker_api_version)
 
         if self.with_zookeeper and self.base_zookeeper_cmd:
-            subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
+            subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate'])
             for command in self.pre_zookeeper_commands:
                 self.run_kazoo_commands_with_retries(command, repeats=5)
             self.wait_zookeeper_to_start(120)
 
         if self.with_mysql and self.base_mysql_cmd:
-            subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
+            subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate'])
             self.wait_mysql_to_start(120)
 
+        if self.with_postgres and self.base_postgres_cmd:
+            subprocess_check_call(self.base_postgres_cmd + ['up', '-d', '--force-recreate'])
+            self.wait_postgres_to_start(120)
+
         if self.with_kafka and self.base_kafka_cmd:
-            subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
+            subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate'])
             self.kafka_docker_id = self.get_instance_docker_id('kafka1')
 
-        subprocess_check_call(self.base_cmd + ['up', '-d', '--force-recreate', '--remove-orphans'])
+        subprocess_check_call(self.base_cmd + ['up', '-d', '--force-recreate'])
 
         start_deadline = time.time() + 20.0 # seconds
         for instance in self.instances.itervalues():
@@ -444,8 +477,18 @@ class ClickHouseInstance:
                 },
                 "PostgreSQL": {
                     "DSN": "postgresql_odbc",
+                    "Database": "postgres",
+                    "UserName": "postgres",
+                    "Password": "mysecretpassword",
+                    "Port": "5432",
+                    "Servername": "postgres1",
+                    "Protocol": "9.3",
+                    "ReadOnly": "No",
+                    "RowVersioning": "No",
+                    "ShowSystemTables": "No",
                     "Driver": "/usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so",
                     "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so",
+                    "ConnSettings": "",
                 }
             }
         else:
......
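For orientation, these per-driver settings end up in the instance's `odbc.ini`. A rough Python sketch of that rendering (the helper below is hypothetical; the actual file generation lives elsewhere in cluster.py):

```python
def render_odbc_ini(drivers):
    # Render a {section: {key: value}} mapping into odbc.ini text, one [section] per driver.
    lines = []
    for section, settings in drivers.items():
        lines.append("[{}]".format(section))
        for key, value in settings.items():
            lines.append("{} = {}".format(key, value))
        lines.append("")
    return "\n".join(lines)

print(render_odbc_ini({
    "PostgreSQL": {
        "DSN": "postgresql_odbc",
        "Servername": "postgres1",
        "Port": "5432",
    },
}))
# [PostgreSQL]
# DSN = postgresql_odbc
# Servername = postgres1
# Port = 5432
```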
version: '2'
services:
    postgres1:
        image: postgres
        restart: always
        environment:
            POSTGRES_PASSWORD: mysecretpassword
        ports:
            - 5432:5432
@@ -3,12 +3,14 @@ import pytest
 import os
 
 import pymysql.cursors
+import psycopg2
+from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 from helpers.cluster import ClickHouseCluster
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
-node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True, image='alesapin/ubuntu_with_odbc:14.04', main_configs=['configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml', 'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml'])
+node1 = cluster.add_instance('node1', with_odbc_drivers=True, with_mysql=True, image='alesapin/ubuntu_with_odbc:14.04', main_configs=['configs/dictionaries/sqlite3_odbc_hashed_dictionary.xml', 'configs/dictionaries/sqlite3_odbc_cached_dictionary.xml', 'configs/dictionaries/postgres_odbc_hashed_dictionary.xml'])
 
 create_table_sql_template = """
     CREATE TABLE `clickhouse`.`{}` (
@@ -31,6 +33,17 @@ def create_mysql_table(conn, table_name):
     with conn.cursor() as cursor:
         cursor.execute(create_table_sql_template.format(table_name))
 
+def get_postgres_conn():
+    conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
+    conn = psycopg2.connect(conn_string)
+    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+    return conn
+
+def create_postgres_db(conn, name):
+    with conn.cursor() as cursor:
+        cursor.execute(
+            "CREATE SCHEMA {}".format(name))
+
 @pytest.fixture(scope="module")
 def started_cluster():
     try:
@@ -41,9 +54,13 @@ def started_cluster():
         node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t2(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
         node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t3(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
         node1.exec_in_container(["bash", "-c", "echo 'CREATE TABLE t4(X INTEGER PRIMARY KEY ASC, Y, Z);' | sqlite3 {}".format(sqlite_db)], privileged=True, user='root')
-        conn = get_mysql_conn()
+        mysql_conn = get_mysql_conn()
         ## create mysql db and table
-        create_mysql_db(conn, 'clickhouse')
+        create_mysql_db(mysql_conn, 'clickhouse')
+
+        postgres_conn = get_postgres_conn()
+        create_postgres_db(postgres_conn, 'clickhouse')
 
         yield cluster
@@ -141,3 +158,13 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
     time.sleep(5)
     assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "12\n"
 
+def test_postgres_odbc_hashed_dictionary_with_schema(started_cluster):
+    conn = get_postgres_conn()
+    cursor = conn.cursor()
+    cursor.execute("drop table if exists clickhouse.test_table")
+    cursor.execute("create table if not exists clickhouse.test_table (column1 int primary key, column2 varchar(40) not null)")
+    cursor.execute("insert into clickhouse.test_table values(1, 'hello'),(2, 'world')")
+    time.sleep(5)
+    assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))") == "hello\n"
+    assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))") == "world\n"
@@ -111,7 +111,7 @@ Example of settings:
 ```xml
 <odbc>
     <db>DatabaseName</db>
-    <table>TableName</table>
+    <table>SchemaName.TableName</table>
     <connection_string>DSN=some_parameters</connection_string>
     <invalidate_query>SQL_QUERY</invalidate_query>
 </odbc>
@@ -120,10 +120,11 @@ Example of settings:
 Setting fields:
 
 - `db` – Name of the database. Omit it if the database name is set in the `<connection_string>` parameters.
-- `table` – Name of the table.
+- `table` – Name of the table, optionally prefixed with the schema name (for example, `SchemaName.TableName`).
 - `connection_string` – Connection string.
 - `invalidate_query` – Query for checking the dictionary status. Optional parameter. Read more in the section [Updating dictionaries](external_dicts_dict_lifetime.md#dicts-external_dicts_dict_lifetime).
 
+ClickHouse receives the quoting symbol from the ODBC driver and quotes all identifiers in queries to the driver, so the table name must be specified with the same letter case as it is stored in the database. For example, with ANSI double-quote quoting, `SchemaName.TableName` is queried as `FROM "SchemaName"."TableName"`.
+
 ### Known vulnerability of the ODBC dictionary functionality
......
@@ -111,7 +111,7 @@
 ```xml
 <odbc>
     <db>DatabaseName</db>
-    <table>TableName</table>
+    <table>SchemaName.TableName</table>
     <connection_string>DSN=some_parameters</connection_string>
     <invalidate_query>SQL_QUERY</invalidate_query>
 </odbc>
@@ -119,11 +119,13 @@
 Setting fields:
 
 - `db` – Name of the database. Omit it if the database name is set in the `<connection_string>` parameters.
-- `table` – Name of the table.
+- `table` – Name of the table, optionally prefixed with the schema name.
 - `connection_string` – Connection string.
 - `invalidate_query` – Query for checking the dictionary status. Optional parameter. Read more in the section [Updating dictionaries](external_dicts_dict_lifetime.md#dicts-external_dicts_dict_lifetime).
 
+ClickHouse receives the quoting symbol from the ODBC driver and quotes all identifiers in queries to the driver, so the table name must be specified with the same letter case as it is stored in the database.
+
 ### Known vulnerability of the ODBC dictionary functionality
 
 !!! attention
......