提交 ff07ffbb 编写于 作者: A Alexey Milovidov

Merge branch 'merging_redis' of https://github.com/CurtizJ/ClickHouse into CurtizJ-merging_redis

......@@ -18,6 +18,9 @@ if (NOT DEFINED ENABLE_POCO_MONGODB OR ENABLE_POCO_MONGODB)
else ()
set(ENABLE_POCO_MONGODB 0 CACHE BOOL "")
endif ()
if (NOT DEFINED ENABLE_POCO_REDIS OR ENABLE_POCO_REDIS)
list (APPEND POCO_COMPONENTS Redis)
endif ()
# TODO: after new poco release with SQL library rename ENABLE_POCO_ODBC -> ENABLE_POCO_SQLODBC
if (NOT DEFINED ENABLE_POCO_ODBC OR ENABLE_POCO_ODBC)
list (APPEND POCO_COMPONENTS DataODBC)
......@@ -35,7 +38,6 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
set (ENABLE_ZIP 0 CACHE BOOL "")
set (ENABLE_PAGECOMPILER 0 CACHE BOOL "")
set (ENABLE_PAGECOMPILER_FILE2PAGE 0 CACHE BOOL "")
set (ENABLE_REDIS 0 CACHE BOOL "")
set (ENABLE_DATA_SQLITE 0 CACHE BOOL "")
set (ENABLE_DATA_MYSQL 0 CACHE BOOL "")
set (ENABLE_DATA_POSTGRESQL 0 CACHE BOOL "")
......@@ -46,7 +48,6 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
set (POCO_ENABLE_ZIP 0 CACHE BOOL "")
set (POCO_ENABLE_PAGECOMPILER 0 CACHE BOOL "")
set (POCO_ENABLE_PAGECOMPILER_FILE2PAGE 0 CACHE BOOL "")
set (POCO_ENABLE_REDIS 0 CACHE BOOL "")
set (POCO_ENABLE_SQL_SQLITE 0 CACHE BOOL "")
set (POCO_ENABLE_SQL_MYSQL 0 CACHE BOOL "")
set (POCO_ENABLE_SQL_POSTGRESQL 0 CACHE BOOL "")
......@@ -69,6 +70,11 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
set (Poco_MongoDB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/poco/MongoDB/include/")
endif ()
if (NOT DEFINED ENABLE_POCO_REDIS OR ENABLE_POCO_REDIS)
set (Poco_Redis_LIBRARY PocoRedis)
set (Poco_Redis_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/poco/Redis/include/")
endif ()
if (EXISTS "${ClickHouse_SOURCE_DIR}/contrib/poco/SQL/ODBC/include/")
set (Poco_SQL_FOUND 1)
set (Poco_SQL_LIBRARY PocoSQL)
......@@ -122,6 +128,9 @@ endif ()
if (Poco_MongoDB_LIBRARY)
set (USE_POCO_MONGODB 1)
endif ()
if (Poco_Redis_LIBRARY)
set (USE_POCO_REDIS 1)
endif ()
if (Poco_DataODBC_LIBRARY AND ODBC_FOUND)
set (USE_POCO_DATAODBC 1)
endif ()
......@@ -129,7 +138,7 @@ if (Poco_SQLODBC_LIBRARY AND ODBC_FOUND)
set (USE_POCO_SQLODBC 1)
endif ()
message(STATUS "Using Poco: ${Poco_INCLUDE_DIRS} : ${Poco_Foundation_LIBRARY},${Poco_Util_LIBRARY},${Poco_Net_LIBRARY},${Poco_NetSSL_LIBRARY},${Poco_Crypto_LIBRARY},${Poco_XML_LIBRARY},${Poco_Data_LIBRARY},${Poco_DataODBC_LIBRARY},${Poco_SQL_LIBRARY},${Poco_SQLODBC_LIBRARY},${Poco_MongoDB_LIBRARY}; MongoDB=${USE_POCO_MONGODB}, DataODBC=${USE_POCO_DATAODBC}, NetSSL=${USE_POCO_NETSSL}")
message(STATUS "Using Poco: ${Poco_INCLUDE_DIRS} : ${Poco_Foundation_LIBRARY},${Poco_Util_LIBRARY},${Poco_Net_LIBRARY},${Poco_NetSSL_LIBRARY},${Poco_Crypto_LIBRARY},${Poco_XML_LIBRARY},${Poco_Data_LIBRARY},${Poco_DataODBC_LIBRARY},${Poco_SQL_LIBRARY},${Poco_SQLODBC_LIBRARY},${Poco_MongoDB_LIBRARY},${Poco_Redis_LIBRARY}; MongoDB=${USE_POCO_MONGODB}, Redis=${USE_POCO_REDIS}, DataODBC=${USE_POCO_DATAODBC}, NetSSL=${USE_POCO_NETSSL}")
# How to make sutable poco:
# use branch:
......
......@@ -458,6 +458,7 @@ namespace ErrorCodes
extern const int PATH_ACCESS_DENIED = 481;
extern const int DICTIONARY_ACCESS_DENIED = 482;
extern const int TOO_MANY_REDIRECTS = 483;
extern const int INTERNAL_REDIS_ERROR = 484;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -9,6 +9,7 @@
#cmakedefine01 USE_POCO_SQLODBC
#cmakedefine01 USE_POCO_DATAODBC
#cmakedefine01 USE_POCO_MONGODB
#cmakedefine01 USE_POCO_REDIS
#cmakedefine01 USE_INTERNAL_LLVM_LIBRARY
#cmakedefine01 USE_SSL
......@@ -39,6 +39,14 @@ if(USE_POCO_MONGODB)
target_link_libraries(clickhouse_dictionaries PRIVATE ${Poco_MongoDB_LIBRARY})
endif()
if(USE_POCO_REDIS)
# for code highlighting in CLion
# target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${Poco_Redis_INCLUDE_DIR})
# for build
target_link_libraries(clickhouse_dictionaries PRIVATE ${Poco_Redis_LIBRARY})
endif()
add_subdirectory(Embedded)
target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${SPARSEHASH_INCLUDE_DIR})
#include "RedisBlockInputStream.h"
#if USE_POCO_REDIS
# include <string>
# include <vector>
# include <Poco/Redis/Array.h>
# include <Poco/Redis/Client.h>
# include <Poco/Redis/Command.h>
# include <Poco/Redis/Type.h>
# include <Columns/ColumnNullable.h>
# include <Columns/ColumnString.h>
# include <Columns/ColumnsNumber.h>
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <ext/range.h>
# include "DictionaryStructure.h"
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int INTERNAL_REDIS_ERROR;
}
RedisBlockInputStream::RedisBlockInputStream(
const std::shared_ptr<Poco::Redis::Client> & client_,
const RedisArray & keys_,
const RedisStorageType & storage_type_,
const DB::Block & sample_block,
const size_t max_block_size_)
: client(client_), keys(keys_), storage_type(storage_type_), max_block_size{max_block_size_}
{
description.init(sample_block);
}
RedisBlockInputStream::~RedisBlockInputStream() = default;
namespace
{
using ValueType = ExternalResultDescription::ValueType;
template <typename T>
inline void insert(IColumn & column, const String & stringValue)
{
assert_cast<ColumnVector<T> &>(column).insertValue(parse<T>(stringValue));
}
void insertValue(IColumn & column, const ValueType type, const Poco::Redis::BulkString & bulk_string)
{
if (bulk_string.isNull())
throw Exception{"Type mismatch, expected not Null String", ErrorCodes::TYPE_MISMATCH};
String stringValue = bulk_string.value();
switch (type)
{
case ValueType::vtUInt8:
insert<UInt8>(column, stringValue);
break;
case ValueType::vtUInt16:
insert<UInt16>(column, stringValue);
break;
case ValueType::vtUInt32:
insert<UInt32>(column, stringValue);
break;
case ValueType::vtUInt64:
insert<UInt64>(column, stringValue);
break;
case ValueType::vtInt8:
insert<Int8>(column, stringValue);
break;
case ValueType::vtInt16:
insert<Int16>(column, stringValue);
break;
case ValueType::vtInt32:
insert<Int32>(column, stringValue);
break;
case ValueType::vtInt64:
insert<Int64>(column, stringValue);
break;
case ValueType::vtFloat32:
insert<Float32>(column, stringValue);
break;
case ValueType::vtFloat64:
insert<Float64>(column, stringValue);
break;
case ValueType::vtString:
assert_cast<ColumnString &>(column).insert(parse<String>(stringValue));
break;
case ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(parse<LocalDate>(stringValue).getDayNum());
break;
case ValueType::vtDateTime:
assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(parse<LocalDateTime>(stringValue)));
break;
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insertValue(parse<UUID>(stringValue));
break;
}
}
}
Block RedisBlockInputStream::readImpl()
{
if (keys.isNull() || description.sample_block.rows() == 0 || cursor >= keys.size())
all_read = true;
if (all_read)
return {};
const size_t size = description.sample_block.columns();
MutableColumns columns(size);
for (const auto i : ext::range(0, size))
columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
const auto insertValueByIdx = [this, &columns](size_t idx, const auto & value)
{
if (description.types[idx].second)
{
ColumnNullable & column_nullable = static_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[idx], description.types[idx].first, value);
};
if (storage_type == RedisStorageType::HASH_MAP)
{
size_t num_rows = 0;
for (; cursor < keys.size(); ++cursor)
{
const auto & keys_array = keys.get<RedisArray>(cursor);
if (keys_array.size() < 2)
{
throw Exception{"Too low keys in request to source: " + DB::toString(keys_array.size())
+ ", expected 2 or more", ErrorCodes::LOGICAL_ERROR};
}
if (num_rows + keys_array.size() - 1 > max_block_size)
break;
Poco::Redis::Command command_for_values("HMGET");
for (auto it = keys_array.begin(); it != keys_array.end(); ++it)
command_for_values.addRedisType(*it);
auto values = client->execute<RedisArray>(command_for_values);
if (keys_array.size() != values.size() + 1) // 'HMGET' primary_key secondary_keys
throw Exception{"Inconsistent sizes of keys and values in Redis request",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
const auto & primary_key = keys_array.get<RedisBulkString>(0);
for (size_t i = 0; i < values.size(); ++i)
{
const auto & secondary_key = keys_array.get<RedisBulkString>(i + 1);
const auto & value = values.get<RedisBulkString>(i);
/// null string means 'no value for requested key'
if (!value.isNull())
{
insertValueByIdx(0, primary_key);
insertValueByIdx(1, secondary_key);
insertValueByIdx(2, value);
++num_rows;
}
}
}
}
else
{
Poco::Redis::Command command_for_values("MGET");
size_t need_values = std::min(max_block_size, keys.size() - cursor);
for (size_t i = 0; i < need_values; ++i)
command_for_values.add(keys.get<RedisBulkString>(cursor + i));
auto values = client->execute<RedisArray>(command_for_values);
if (values.size() != need_values)
throw Exception{"Inconsistent sizes of keys and values in Redis request", ErrorCodes::INTERNAL_REDIS_ERROR};
for (size_t i = 0; i < values.size(); ++i)
{
const auto & key = keys.get<RedisBulkString>(cursor + i);
const auto & value = values.get<RedisBulkString>(i);
/// Null string means 'no value for requested key'
if (!value.isNull())
{
insertValueByIdx(0, key);
insertValueByIdx(1, value);
}
}
cursor += need_values;
}
return description.sample_block.cloneWithColumns(std::move(columns));
}
}
#endif
#pragma once
#include "config_core.h"
#include <Core/Block.h>
#if USE_POCO_REDIS
# include <Core/ExternalResultDescription.h>
# include <DataStreams/IBlockInputStream.h>
# include "RedisDictionarySource.h"
# include <Poco/Redis/Array.h>
# include <Poco/Redis/Type.h>
namespace Poco
{
namespace Redis
{
class Client;
}
}
namespace DB
{
class RedisBlockInputStream final : public IBlockInputStream
{
public:
using RedisArray = Poco::Redis::Array;
using RedisBulkString = Poco::Redis::BulkString;
RedisBlockInputStream(
const std::shared_ptr<Poco::Redis::Client> & client_,
const Poco::Redis::Array & keys_,
const RedisStorageType & storage_type_,
const Block & sample_block,
const size_t max_block_size);
~RedisBlockInputStream() override;
String getName() const override { return "Redis"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
Block readImpl() override;
std::shared_ptr<Poco::Redis::Client> client;
Poco::Redis::Array keys;
RedisStorageType storage_type;
const size_t max_block_size;
ExternalResultDescription description;
size_t cursor = 0;
bool all_read = false;
};
}
#endif
#include "RedisDictionarySource.h"
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
void registerDictionarySourceRedis(DictionarySourceFactory & factory)
{
auto createTableSource = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
Block & sample_block,
const Context & /* context */) -> DictionarySourcePtr {
#if USE_POCO_REDIS
return std::make_unique<RedisDictionarySource>(dict_struct, config, config_prefix + ".redis", sample_block);
#else
UNUSED(dict_struct);
UNUSED(config);
UNUSED(config_prefix);
UNUSED(sample_block);
throw Exception{"Dictionary source of type `redis` is disabled because poco library was built without redis support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
};
factory.registerSource("redis", createTableSource);
}
}
#if USE_POCO_REDIS
# include <Poco/Redis/Array.h>
# include <Poco/Redis/Client.h>
# include <Poco/Redis/Command.h>
# include <Poco/Redis/Type.h>
# include <Poco/Util/AbstractConfiguration.h>
# include <Common/FieldVisitors.h>
# include <IO/WriteHelpers.h>
# include "RedisBlockInputStream.h"
namespace DB
{
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int INVALID_CONFIG_PARAMETER;
extern const int INTERNAL_REDIS_ERROR;
}
static const size_t max_block_size = 8192;
RedisDictionarySource::RedisDictionarySource(
const DictionaryStructure & dict_struct_,
const String & host_,
UInt16 port_,
UInt8 db_index_,
RedisStorageType storage_type_,
const Block & sample_block_)
: dict_struct{dict_struct_}
, host{host_}
, port{port_}
, db_index{db_index_}
, storage_type{storage_type_}
, sample_block{sample_block_}
, client{std::make_shared<Poco::Redis::Client>(host, port)}
{
if (dict_struct.attributes.size() != 1)
throw Exception{"Invalid number of non key columns for Redis source: " +
DB::toString(dict_struct.attributes.size()) + ", expected 1",
ErrorCodes::INVALID_CONFIG_PARAMETER};
if (storage_type == RedisStorageType::HASH_MAP)
{
if (!dict_struct.key)
throw Exception{"Redis source with storage type \'hash_map\' must have key",
ErrorCodes::INVALID_CONFIG_PARAMETER};
if (dict_struct.key->size() != 2)
throw Exception{"Redis source with storage type \'hash_map\' requiers 2 keys",
ErrorCodes::INVALID_CONFIG_PARAMETER};
// suppose key[0] is primary key, key[1] is secondary key
}
if (db_index != 0)
{
RedisCommand command("SELECT");
command << static_cast<Int64>(db_index);
String reply = client->execute<String>(command);
if (reply != "+OK\r\n")
throw Exception{"Selecting database with index " + DB::toString(db_index)
+ " failed with reason " + reply, ErrorCodes::INTERNAL_REDIS_ERROR};
}
}
RedisDictionarySource::RedisDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const String & config_prefix_,
Block & sample_block_)
: RedisDictionarySource(
dict_struct_,
config_.getString(config_prefix_ + ".host"),
config_.getUInt(config_prefix_ + ".port"),
config_.getUInt(config_prefix_ + ".db_index", 0),
parseStorageType(config_.getString(config_prefix_ + ".storage_type", "")),
sample_block_)
{
}
RedisDictionarySource::RedisDictionarySource(const RedisDictionarySource & other)
: RedisDictionarySource{other.dict_struct,
other.host,
other.port,
other.db_index,
other.storage_type,
other.sample_block}
{
}
RedisDictionarySource::~RedisDictionarySource() = default;
static String storageTypeToKeyType(RedisStorageType type)
{
switch (type)
{
case RedisStorageType::SIMPLE:
return "string";
case RedisStorageType::HASH_MAP:
return "hash";
default:
return "none";
}
__builtin_unreachable();
}
BlockInputStreamPtr RedisDictionarySource::loadAll()
{
RedisCommand command_for_keys("KEYS");
command_for_keys << "*";
/// Get only keys for specified storage type.
auto all_keys = client->execute<RedisArray>(command_for_keys);
if (all_keys.isNull())
return std::make_shared<RedisBlockInputStream>(client, RedisArray{}, storage_type, sample_block, max_block_size);
RedisArray keys;
auto key_type = storageTypeToKeyType(storage_type);
for (auto & key : all_keys)
if (key_type == client->execute<String>(RedisCommand("TYPE").addRedisType(key)))
keys.addRedisType(std::move(key));
if (storage_type == RedisStorageType::HASH_MAP)
{
RedisArray hkeys;
for (const auto & key : keys)
{
RedisCommand command_for_secondary_keys("HKEYS");
command_for_secondary_keys.addRedisType(key);
auto secondary_keys = client->execute<RedisArray>(command_for_secondary_keys);
RedisArray primary_with_secondary;
primary_with_secondary.addRedisType(key);
for (const auto & secondary_key : secondary_keys)
{
primary_with_secondary.addRedisType(secondary_key);
/// Do not store more than max_block_size values for one request.
if (primary_with_secondary.size() == max_block_size + 1)
{
hkeys.add(std::move(primary_with_secondary));
primary_with_secondary.clear();
primary_with_secondary.addRedisType(key);
}
}
if (primary_with_secondary.size() > 1)
hkeys.add(std::move(primary_with_secondary));
}
keys = std::move(hkeys);
}
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
}
BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
if (storage_type != RedisStorageType::SIMPLE)
throw Exception{"Cannot use loadIds with \'simple\' storage type", ErrorCodes::UNSUPPORTED_METHOD};
if (!dict_struct.id)
throw Exception{"'id' is required for selective loading", ErrorCodes::UNSUPPORTED_METHOD};
RedisArray keys;
for (UInt64 id : ids)
keys << DB::toString(id);
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
}
String RedisDictionarySource::toString() const
{
return "Redis: " + host + ':' + DB::toString(port);
}
RedisStorageType RedisDictionarySource::parseStorageType(const String & storage_type_str)
{
if (storage_type_str == "hash_map")
return RedisStorageType::HASH_MAP;
else if (!storage_type_str.empty() && storage_type_str != "simple")
throw Exception("Unknown storage type " + storage_type_str + " for Redis dictionary", ErrorCodes::INVALID_CONFIG_PARAMETER);
return RedisStorageType::SIMPLE;
}
}
#endif
#pragma once
#include "config_core.h"
#include <Core/Block.h>
#if USE_POCO_REDIS
# include "DictionaryStructure.h"
# include "IDictionarySource.h"
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
namespace Redis
{
class Client;
class Array;
class Command;
}
}
namespace DB
{
enum class RedisStorageType
{
SIMPLE,
HASH_MAP,
UNKNOWN
};
class RedisDictionarySource final : public IDictionarySource
{
RedisDictionarySource(
const DictionaryStructure & dict_struct,
const std::string & host,
UInt16 port,
UInt8 db_index,
RedisStorageType storage_type,
const Block & sample_block);
public:
using RedisArray = Poco::Redis::Array;
using RedisCommand = Poco::Redis::Command;
RedisDictionarySource(
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block);
RedisDictionarySource(const RedisDictionarySource & other);
~RedisDictionarySource() override;
BlockInputStreamPtr loadAll() override;
BlockInputStreamPtr loadUpdatedAll() override
{
throw Exception{"Method loadUpdatedAll is unsupported for RedisDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
}
bool supportsSelectiveLoad() const override { return true; }
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & /* key_columns */, const std::vector<size_t> & /* requested_rows */) override
{
// Redis does not support native indexing
throw Exception{"Method loadKeys is unsupported for RedisDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
}
bool isModified() const override { return true; }
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override { return std::make_unique<RedisDictionarySource>(*this); }
std::string toString() const override;
private:
static RedisStorageType parseStorageType(const std::string& storage_type);
private:
const DictionaryStructure dict_struct;
const std::string host;
const UInt16 port;
const UInt8 db_index;
const RedisStorageType storage_type;
Block sample_block;
std::shared_ptr<Poco::Redis::Client> client;
};
}
#endif
......@@ -7,6 +7,7 @@ void registerDictionarySourceFile(DictionarySourceFactory & source_factory);
void registerDictionarySourceMysql(DictionarySourceFactory & source_factory);
void registerDictionarySourceClickHouse(DictionarySourceFactory & source_factory);
void registerDictionarySourceMongoDB(DictionarySourceFactory & source_factory);
void registerDictionarySourceRedis(DictionarySourceFactory & source_factory);
void registerDictionarySourceXDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourceJDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourceExecutable(DictionarySourceFactory & source_factory);
......@@ -30,6 +31,7 @@ void registerDictionaries()
registerDictionarySourceMysql(source_factory);
registerDictionarySourceClickHouse(source_factory);
registerDictionarySourceMongoDB(source_factory);
registerDictionarySourceRedis(source_factory);
registerDictionarySourceXDBC(source_factory);
registerDictionarySourceJDBC(source_factory);
registerDictionarySourceExecutable(source_factory);
......
......@@ -61,6 +61,7 @@ const char * auto_config_build[]
"USE_SSL", "@USE_SSL@",
"USE_HYPERSCAN", "@USE_HYPERSCAN@",
"USE_SIMDJSON", "@USE_SIMDJSON@",
"USE_POCO_REDIS", "@USE_POCO_REDIS@",
nullptr, nullptr
};
......@@ -103,6 +103,7 @@ class ClickHouseCluster:
self.with_hdfs = False
self.with_mongo = False
self.with_net_trics = False
self.with_redis = False
self.docker_client = None
self.is_up = False
......@@ -114,7 +115,7 @@ class ClickHouseCluster:
cmd += " client"
return cmd
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=[]):
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macros={}, with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None, with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False, with_redis=False, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test", stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=[]):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
......@@ -132,7 +133,7 @@ class ClickHouseCluster:
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper,
self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, self.base_configs_dir, self.server_bin_path,
self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, self.base_configs_dir, self.server_bin_path,
self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address,
with_installed_binary=with_installed_binary, tmpfs=tmpfs)
......@@ -208,6 +209,13 @@ class ClickHouseCluster:
for cmd in cmds:
cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])
if with_redis and not self.with_redis:
self.with_redis = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_redis.yml')])
self.base_redis_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_redis.yml')]
return instance
......@@ -367,6 +375,11 @@ class ClickHouseCluster:
subprocess_check_call(self.base_mongo_cmd + common_opts)
self.wait_mongo_to_start(30)
if self.with_redis and self.base_redis_cmd:
subprocess_check_call(self.base_redis_cmd + ['up', '-d', '--force-recreate'])
time.sleep(10)
subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate'])
start_deadline = time.time() + 20.0 # seconds
......@@ -459,7 +472,7 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, base_configs_dir, server_bin_path, odbc_bridge_bin_path,
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, base_configs_dir, server_bin_path, odbc_bridge_bin_path,
clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables={}, image="yandex/clickhouse-integration-test",
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=[]):
......@@ -485,6 +498,7 @@ class ClickHouseInstance:
self.with_mysql = with_mysql
self.with_kafka = with_kafka
self.with_mongo = with_mongo
self.with_redis = with_redis
self.path = p.join(self.cluster.instances_dir, name)
self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
......
version: '2.2'
services:
redis1:
image: redis
restart: always
ports:
- 6380:6379
#-*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import copy
......@@ -8,8 +8,10 @@ class Layout(object):
'hashed': '<hashed/>',
'cache': '<cache><size_in_cells>128</size_in_cells></cache>',
'complex_key_hashed': '<complex_key_hashed/>',
'complex_key_hashed_one_key': '<complex_key_hashed/>',
'complex_key_hashed_two_keys': '<complex_key_hashed/>',
'complex_key_cache': '<complex_key_cache><size_in_cells>128</size_in_cells></complex_key_cache>',
'range_hashed': '<range_hashed/>'
'range_hashed': '<range_hashed/>',
}
def __init__(self, name):
......@@ -18,13 +20,13 @@ class Layout(object):
self.is_simple = False
self.is_ranged = False
if self.name.startswith('complex'):
self.layout_type = "complex"
self.layout_type = 'complex'
self.is_complex = True
elif name.startswith("range"):
self.layout_type = "ranged"
elif name.startswith('range'):
self.layout_type = 'ranged'
self.is_ranged = True
else:
self.layout_type = "simple"
self.layout_type = 'simple'
self.is_simple = True
def get_str(self):
......@@ -33,8 +35,7 @@ class Layout(object):
def get_key_block_name(self):
if self.is_complex:
return 'key'
else:
return 'id'
return 'id'
class Row(object):
......@@ -43,9 +44,15 @@ class Row(object):
for field, value in zip(fields, values):
self.data[field.name] = value
def has_field(self, name):
return name in self.data
def get_value_by_name(self, name):
return self.data[name]
def set_value(self, name, value):
self.data[name] = value
class Field(object):
def __init__(self, name, field_type, is_key=False, is_range_key=False, default=None, hierarchical=False, range_hash_type=None, default_value_for_get=None):
......@@ -93,6 +100,8 @@ class DictionaryStructure(object):
self.range_key = None
self.ordinary_fields = []
self.range_fields = []
self.has_hierarchy = False
for field in fields:
if field.is_key:
self.keys.append(field)
......@@ -100,6 +109,9 @@ class DictionaryStructure(object):
self.range_fields.append(field)
else:
self.ordinary_fields.append(field)
if field.hierarchical:
self.has_hierarchy = True
if field.is_range_key:
if self.range_key is not None:
......@@ -116,11 +128,12 @@ class DictionaryStructure(object):
fields_strs = []
for field in self.ordinary_fields:
fields_strs.append(field.get_attribute_str())
key_strs = []
if self.layout.is_complex:
for key_field in self.keys:
key_strs.append(key_field.get_attribute_str())
else: # same for simple and ranged
else: # same for simple and ranged
for key_field in self.keys:
key_strs.append(key_field.get_simple_index_str())
......@@ -179,7 +192,7 @@ class DictionaryStructure(object):
if isinstance(val, str):
val = "'" + val + "'"
key_exprs_strs.append('to{type}({value})'.format(type=key.field_type, value=val))
key_expr = ', (' + ','.join(key_exprs_strs) + ')'
key_expr = ', tuple(' + ','.join(key_exprs_strs) + ')'
date_expr = ''
if self.layout.is_ranged:
......@@ -280,12 +293,13 @@ class DictionaryStructure(object):
class Dictionary(object):
def __init__(self, name, structure, source, config_path, table_name):
def __init__(self, name, structure, source, config_path, table_name, fields):
self.name = name
self.structure = copy.deepcopy(structure)
self.source = copy.deepcopy(source)
self.config_path = config_path
self.table_name = table_name
self.fields = fields
def generate_config(self):
with open(self.config_path, 'w') as result:
......@@ -335,3 +349,6 @@ class Dictionary(object):
def is_complex(self):
return self.structure.layout.is_complex
def get_fields(self):
return self.fields
......@@ -2,6 +2,8 @@
import warnings
import pymysql.cursors
import pymongo
import redis
import aerospike
from tzlocal import get_localzone
import datetime
import os
......@@ -372,3 +374,122 @@ class SourceHTTP(SourceHTTPBase):
class SourceHTTPS(SourceHTTPBase):
def _get_schema(self):
return "https"
class SourceRedis(ExternalSource):
def __init__(
self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password, storage_type
):
super(SourceRedis, self).__init__(
name, internal_hostname, internal_port, docker_hostname, docker_port, user, password
)
self.storage_type = storage_type
def get_source_str(self, table_name):
return '''
<redis>
<host>{host}</host>
<port>{port}</port>
<db_index>0</db_index>
<storage_type>{storage_type}</storage_type>
</redis>
'''.format(
host=self.docker_hostname,
port=self.docker_port,
storage_type=self.storage_type, # simple or hash_map
)
def prepare(self, structure, table_name, cluster):
self.client = redis.StrictRedis(host=self.internal_hostname, port=self.internal_port)
self.prepared = True
self.ordered_names = structure.get_ordered_names()
def load_data(self, data, table_name):
self.client.flushdb()
for row in list(data):
values = []
for name in self.ordered_names:
values.append(str(row.data[name]))
print 'values: ', values
if len(values) == 2:
self.client.set(*values)
print 'kek: ', self.client.get(values[0])
else:
self.client.hset(*values)
def compatible_with_layout(self, layout):
if (
layout.is_simple and self.storage_type == "simple" or
layout.is_complex and self.storage_type == "simple" and layout.name == "complex_key_hashed_one_key" or
layout.is_complex and self.storage_type == "hash_map" and layout.name == "complex_key_hashed_two_keys"
):
return True
return False
class SourceAerospike(ExternalSource):
def __init__(self, name, internal_hostname, internal_port,
docker_hostname, docker_port, user, password):
ExternalSource.__init__(self, name, internal_hostname, internal_port,
docker_hostname, docker_port, user, password)
self.namespace = "test"
self.set = "test_set"
def get_source_str(self, table_name):
print("AEROSPIKE get source str")
return '''
<aerospike>
<host>{host}</host>
<port>{port}</port>
</aerospike>
'''.format(
host=self.docker_hostname,
port=self.docker_port,
)
def prepare(self, structure, table_name, cluster):
config = {
'hosts': [ (self.internal_hostname, self.internal_port) ]
}
self.client = aerospike.client(config).connect()
self.prepared = True
print("PREPARED AEROSPIKE")
print(config)
def compatible_with_layout(self, layout):
print("compatible AEROSPIKE")
return layout.is_simple
def _flush_aerospike_db(self):
keys = []
def handle_record((key, metadata, record)):
print("Handle record {} {}".format(key, record))
keys.append(key)
def print_record((key, metadata, record)):
print("Print record {} {}".format(key, record))
scan = self.client.scan(self.namespace, self.set)
scan.foreach(handle_record)
[self.client.remove(key) for key in keys]
def load_kv_data(self, values):
self._flush_aerospike_db()
print("Load KV Data Aerospike")
if len(values[0]) == 2:
for value in values:
key = (self.namespace, self.set, value[0])
print(key)
self.client.put(key, {"bin_value": value[1]}, policy={"key": aerospike.POLICY_KEY_SEND})
assert self.client.exists(key)
else:
assert("VALUES SIZE != 2")
# print(values)
def load_data(self, data, table_name):
print("Load Data Aerospike")
# print(data)
import pytest
import os
import time
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed, SourceMongo
from external_sources import SourceHTTP, SourceHTTPS
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed
from external_sources import SourceMongo, SourceHTTP, SourceHTTPS, SourceRedis
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
dict_configs_path = os.path.join(SCRIPT_DIR, 'configs/dictionaries')
FIELDS = {
"simple": [
......@@ -66,9 +66,44 @@ FIELDS = {
Field("Float32_", 'Float32', default_value_for_get=555.11),
Field("Float64_", 'Float64', default_value_for_get=777.11),
]
}
VALUES = {
"simple": [
[1, 22, 333, 4444, 55555, -6, -77,
-888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4, 0],
[2, 3, 4, 5, 6, -7, -8,
-9, -10, '550e8400-e29b-41d4-a716-446655440002',
'1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4, 1]
],
"complex": [
[1, 'world', 22, 333, 4444, 55555, -6,
-77, -888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25',
'hello', 22.543, 3332154213.4],
[2, 'qwerty2', 52, 2345, 6544, 9191991, -2,
-717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007',
'1975-09-28', '2000-02-28 23:33:24',
'my', 255.543, 3332221.44]
],
"ranged": [
[1, '2019-02-10', '2019-02-01', '2019-02-28',
22, 333, 4444, 55555, -6, -77, -888, -999,
'550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello',
22.543, 3332154213.4],
[2, '2019-04-10', '2019-04-01', '2019-04-28',
11, 3223, 41444, 52515, -65, -747, -8388, -9099,
'550e8400-e29b-41d4-a716-446655440004',
'1973-06-29', '2002-02-28 23:23:25', '!!!!',
32.543, 3332543.4]
]
}
LAYOUTS = [
Layout("hashed"),
Layout("cache"),
......@@ -92,42 +127,67 @@ SOURCES = [
DICTIONARIES = []
# Key-value dictionaries with onle one possible field for key
SOURCES_KV = [
SourceRedis("RedisSimple", "localhost", "6380", "redis1", "6379", "", "", storage_type="simple"),
SourceRedis("RedisHash", "localhost", "6380", "redis1", "6379", "", "", storage_type="hash_map"),
]
DICTIONARIES_KV = []
cluster = None
node = None
def get_dict(source, layout, fields, suffix_name=''):
global dict_configs_path
structure = DictionaryStructure(layout, fields)
dict_name = source.name + "_" + layout.name + '_' + suffix_name
dict_path = os.path.join(dict_configs_path, dict_name + '.xml')
dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name, fields)
dictionary.generate_config()
return dictionary
def setup_module(module):
global DICTIONARIES
global cluster
global node
global dict_configs_path
dict_configs_path = os.path.join(SCRIPT_DIR, 'configs/dictionaries')
for f in os.listdir(dict_configs_path):
os.remove(os.path.join(dict_configs_path, f))
for layout in LAYOUTS:
for source in SOURCES:
if source.compatible_with_layout(layout):
structure = DictionaryStructure(layout, FIELDS[layout.layout_type])
dict_name = source.name + "_" + layout.name
dict_path = os.path.join(dict_configs_path, dict_name + '.xml')
dictionary = Dictionary(dict_name, structure, source, dict_path, "table_" + dict_name)
dictionary.generate_config()
DICTIONARIES.append(dictionary)
DICTIONARIES.append(get_dict(source, layout, FIELDS[layout.layout_type]))
else:
print "Source", source.name, "incompatible with layout", layout.name
for layout in LAYOUTS:
field_keys = list(filter(lambda x: x.is_key, FIELDS[layout.layout_type]))
for source in SOURCES_KV:
if not source.compatible_with_layout(layout):
print "Source", source.name, "incompatible with layout", layout.name
continue
for field in FIELDS[layout.layout_type]:
if not (field.is_key or field.is_range or field.is_range_key):
DICTIONARIES_KV.append(get_dict(source, layout, field_keys + [field], field.name))
main_configs = []
for fname in os.listdir(dict_configs_path):
main_configs.append(os.path.join(dict_configs_path, fname))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True)
node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True, with_redis=True)
cluster.add_instance('clickhouse1')
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for dictionary in DICTIONARIES:
for dictionary in DICTIONARIES + DICTIONARIES_KV:
print "Preparing", dictionary.name
dictionary.prepare_source(cluster)
print "Prepared"
......@@ -140,16 +200,8 @@ def started_cluster():
def test_simple_dictionaries(started_cluster):
fields = FIELDS["simple"]
data = [
Row(fields,
[1, 22, 333, 4444, 55555, -6, -77,
-888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4, 0]),
Row(fields,
[2, 3, 4, 5, 6, -7, -8,
-9, -10, '550e8400-e29b-41d4-a716-446655440002',
'1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4, 1]),
]
values = VALUES["simple"]
data = [Row(fields, vals) for vals in values]
simple_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "simple"]
for dct in simple_dicts:
......@@ -188,20 +240,11 @@ def test_simple_dictionaries(started_cluster):
answer = str(answer).replace(' ', '')
assert node.query(query) == str(answer) + '\n'
def test_complex_dictionaries(started_cluster):
fields = FIELDS["complex"]
data = [
Row(fields,
[1, 'world', 22, 333, 4444, 55555, -6,
-77, -888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25',
'hello', 22.543, 3332154213.4]),
Row(fields,
[2, 'qwerty2', 52, 2345, 6544, 9191991, -2,
-717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007',
'1975-09-28', '2000-02-28 23:33:24',
'my', 255.543, 3332221.44]),
]
values = VALUES["complex"]
data = [Row(fields, vals) for vals in values]
complex_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex"]
for dct in complex_dicts:
......@@ -227,22 +270,11 @@ def test_complex_dictionaries(started_cluster):
print query
assert node.query(query) == str(answer) + '\n'
def test_ranged_dictionaries(started_cluster):
fields = FIELDS["ranged"]
data = [
Row(fields,
[1, '2019-02-10', '2019-02-01', '2019-02-28',
22, 333, 4444, 55555, -6, -77, -888, -999,
'550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello',
22.543, 3332154213.4]),
Row(fields,
[2, '2019-04-10', '2019-04-01', '2019-04-28',
11, 3223, 41444, 52515, -65, -747, -8388, -9099,
'550e8400-e29b-41d4-a716-446655440004',
'1973-06-29', '2002-02-28 23:23:25', '!!!!',
32.543, 3332543.4]),
]
values = VALUES["ranged"]
data = [Row(fields, vals) for vals in values]
ranged_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "ranged"]
for dct in ranged_dicts:
......@@ -261,3 +293,98 @@ def test_ranged_dictionaries(started_cluster):
for query, answer in queries_with_answers:
print query
assert node.query(query) == str(answer) + '\n'
def test_key_value_simple_dictionaries(started_cluster):
fields = FIELDS["simple"]
values = VALUES["simple"]
data = [Row(fields, vals) for vals in values]
simple_dicts = [d for d in DICTIONARIES_KV if d.structure.layout.layout_type == "simple"]
for dct in simple_dicts:
queries_with_answers = []
local_data = []
for row in data:
local_fields = dct.get_fields()
local_values = [row.get_value_by_name(field.name) for field in local_fields if row.has_field(field.name)]
local_data.append(Row(local_fields, local_values))
dct.load_data(local_data)
node.query("system reload dictionary {}".format(dct.name))
print 'name: ', dct.name
for row in local_data:
print dct.get_fields()
for field in dct.get_fields():
print field.name, field.is_key
if not field.is_key:
for query in dct.get_select_get_queries(field, row):
queries_with_answers.append((query, row.get_value_by_name(field.name)))
for query in dct.get_select_has_queries(field, row):
queries_with_answers.append((query, 1))
for query in dct.get_select_get_or_default_queries(field, row):
queries_with_answers.append((query, field.default_value_for_get))
if dct.structure.has_hierarchy:
for query in dct.get_hierarchical_queries(data[0]):
queries_with_answers.append((query, [1]))
for query in dct.get_hierarchical_queries(data[1]):
queries_with_answers.append((query, [2, 1]))
for query in dct.get_is_in_queries(data[0], data[1]):
queries_with_answers.append((query, 0))
for query in dct.get_is_in_queries(data[1], data[0]):
queries_with_answers.append((query, 1))
for query, answer in queries_with_answers:
print query
if isinstance(answer, list):
answer = str(answer).replace(' ', '')
assert node.query(query) == str(answer) + '\n'
def test_key_value_complex_dictionaries(started_cluster):
fields = FIELDS["complex"]
values = VALUES["complex"]
data = [Row(fields, vals) for vals in values]
complex_dicts = [d for d in DICTIONARIES if d.structure.layout.layout_type == "complex"]
for dct in complex_dicts:
dct.load_data(data)
node.query("system reload dictionaries")
for dct in complex_dicts:
queries_with_answers = []
local_data = []
for row in data:
local_fields = dct.get_fields()
local_values = [row.get_value_by_name(field.name) for field in local_fields if row.has_field(field.name)]
local_data.append(Row(local_fields, local_values))
dct.load_data(local_data)
node.query("system reload dictionary {}".format(dct.name))
for row in local_data:
for field in dct.get_fields():
if not field.is_key:
for query in dct.get_select_get_queries(field, row):
queries_with_answers.append((query, row.get_value_by_name(field.name)))
for query in dct.get_select_has_queries(field, row):
queries_with_answers.append((query, 1))
for query in dct.get_select_get_or_default_queries(field, row):
queries_with_answers.append((query, field.default_value_for_get))
for query, answer in queries_with_answers:
print query
assert node.query(query) == str(answer) + '\n'
......@@ -27,10 +27,11 @@ Types of sources (`source_type`):
- [Executable file](#dicts-external_dicts_dict_sources-executable)
- [HTTP(s)](#dicts-external_dicts_dict_sources-http)
- DBMS
- [ODBC](#dicts-external_dicts_dict_sources-odbc)
- [MySQL](#dicts-external_dicts_dict_sources-mysql)
- [ClickHouse](#dicts-external_dicts_dict_sources-clickhouse)
- [MongoDB](#dicts-external_dicts_dict_sources-mongodb)
- [ODBC](#dicts-external_dicts_dict_sources-odbc)
- [Redis](#dicts-external_dicts_dict_sources-redis)
## Local File {#dicts-external_dicts_dict_sources-local_file}
......@@ -424,4 +425,27 @@ Setting fields:
- `db` – Name of the database.
- `collection` – Name of the collection.
### Redis {#dicts-external_dicts_dict_sources-redis}
Example of settings:
```xml
<source>
<redis>
<host>localhost</host>
<port>6379</port>
<storage_type>simple</storage_type>
<db_index>0</db_index>
</redis>
</source>
```
Setting fields:
- `host` – The Redis host.
- `port` – The port on the Redis server.
- `storage_type` – The structure of internal Redis storage using for work with keys. `simple` is for simple sources and for hashed single key sources, `hash_map` is for hashed sources with two keys. Ranged sources and cache sources with complex key are unsupported. May be omitted, default value is `simple`.
- `db_index` – The specific numeric index of Redis logical database. May be omitted, default value is 0.
[Original article](https://clickhouse.yandex/docs/en/query_language/dicts/external_dicts_dict_sources/) <!--hide-->
# Источники внешних словарей
# Источники внешних словарей {#dicts-external_dicts_dict_sources}
Внешний словарь можно подключить из множества источников.
......@@ -24,17 +24,18 @@
Типы источников (`source_type`):
- [Локальный файл](#ispolniaemyi-fail)
- [Исполняемый файл](#ispolniaemyi-fail)
- [HTTP(s)](#http-s)
- [Локальный файл](#dicts-external_dicts_dict_sources-local_file)
- [Исполняемый файл](#dicts-external_dicts_dict_sources-executable)
- [HTTP(s)](#dicts-external_dicts_dict_sources-http)
- СУБД:
- [ODBC](#dicts-external_dicts_dict_sources-odbc)
- [MySQL](#mysql)
- [ClickHouse](#clickhouse)
- [MongoDB](#mongodb)
- [MySQL](#dicts-external_dicts_dict_sources-mysql)
- [ClickHouse](#dicts-external_dicts_dict_sources-clickhouse)
- [MongoDB](#dicts-external_dicts_dict_sources-mongodb)
- [Redis](#dicts-external_dicts_dict_sources-redis)
## Локальный файл
## Локальный файл {#dicts-external_dicts_dict_sources-local_file}
Пример настройки:
......@@ -53,7 +54,7 @@
- `format` - Формат файла. Поддерживаются все форматы, описанные в разделе "[Форматы](../../interfaces/formats.md#formats)".
## Исполняемый файл
## Исполняемый файл {#dicts-external_dicts_dict_sources-executable}
Работа с исполняемым файлом зависит от [размещения словаря в памяти](external_dicts_dict_layout.md). Если тип размещения словаря `cache` и `complex_key_cache`, то ClickHouse запрашивает необходимые ключи, отправляя запрос в `STDIN` исполняемого файла.
......@@ -74,7 +75,7 @@
- `format` - Формат файла. Поддерживаются все форматы, описанные в разделе "[Форматы](../../interfaces/formats.md#formats)".
## HTTP(s)
## HTTP(s) {#dicts-external_dicts_dict_sources-http}
Работа с HTTP(s) сервером зависит от [размещения словаря в памяти](external_dicts_dict_layout.md). Если тип размещения словаря `cache` и `complex_key_cache`, то ClickHouse запрашивает необходимые ключи, отправляя запрос методом `POST`.
......@@ -362,7 +363,7 @@ MySQL можно подключить на локальном хосте чер
```
### ClickHouse
### ClickHouse {#dicts-external_dicts_dict_sources-clickhouse}
Пример настройки:
......@@ -392,7 +393,7 @@ MySQL можно подключить на локальном хосте чер
- `invalidate_query` - запрос для проверки статуса словаря. Необязательный параметр. Читайте подробнее в разделе [Обновление словарей](external_dicts_dict_lifetime.md).
### MongoDB
### MongoDB {#dicts-external_dicts_dict_sources-mongodb}
Пример настройки:
......@@ -418,4 +419,26 @@ MySQL можно подключить на локальном хосте чер
- `db` - имя базы данных.
- `collection` - имя коллекции.
### Redis {#dicts-external_dicts_dict_sources-redis}
Пример настройки:
```xml
<source>
<redis>
<host>localhost</host>
<port>6379</port>
<storage_type>simple</storage_type>
<db_index>0</db_index>
</redis>
</source>
```
Поля настройки:
- `host` – хост Redis.
- `port` – порт сервера Redis.
- `storage_type` – способ хранения ключей. Необходимо использовать `simple` для источников с одним столбцом ключей, `hash_map` -- для источников с двумя столбцами ключей. Источники с более, чем двумя столбцами ключей, не поддерживаются. Может отсутствовать, значение по умолчанию `simple`.
- `db_index` – номер базы данных. Может отсутствовать, значение по умолчанию 0.
[Оригинальная статья](https://clickhouse.yandex/docs/ru/query_language/dicts/external_dicts_dict_sources/) <!--hide-->
......@@ -21,7 +21,7 @@ BUILD_TARGETS=clickhouse
BUILD_TYPE=Debug
ENABLE_EMBEDDED_COMPILER=0
CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_JEMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_SSL=0 -D ENABLE_POCO_NETSSL=0"
CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_JEMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_REDIS=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_SSL=0 -D ENABLE_POCO_NETSSL=0"
[[ $(uname) == "FreeBSD" ]] && COMPILER_PACKAGE_VERSION=devel && export COMPILER_PATH=/usr/local/bin
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册