提交 7ccf0444 编写于 作者: C CurtizJ

better code in Redis external dictionary

上级 df82e4bd
......@@ -451,7 +451,7 @@ namespace ErrorCodes
extern const int INVALID_TEMPLATE_FORMAT = 474;
extern const int INVALID_WITH_FILL_EXPRESSION = 475;
extern const int WITH_TIES_WITHOUT_ORDER_BY = 476;
extern const int INVALID_USAGE_OF_INPUT = 477;
extern const int INTERNAL_REDIS_ERROR = 477;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -26,8 +26,8 @@ namespace DB
{
extern const int TYPE_MISMATCH;
extern const int LOGICAL_ERROR;
extern const int LIMIT_EXCEEDED;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int INTERNAL_REDIS_ERROR;
}
......@@ -49,42 +49,18 @@ namespace DB
{
using ValueType = ExternalResultDescription::ValueType;
bool isNullString(const Poco::Redis::RedisType::Ptr & value)
{
return value->isBulkString() &&
static_cast<const Poco::Redis::Type<Poco::Redis::BulkString> *>(value.get())->value().isNull();
}
std::string getStringOrThrow(const Poco::Redis::RedisType::Ptr & value, const std::string & column_name)
{
switch (value->type())
{
case Poco::Redis::RedisTypeTraits<Poco::Redis::BulkString>::TypeId:
{
const auto & bs = static_cast<const Poco::Redis::Type<Poco::Redis::BulkString> *>(value.get())->value();
if (bs.isNull())
throw Exception{"Type mismatch, expected not null String for column " + column_name,
ErrorCodes::TYPE_MISMATCH};
return bs.value();
}
case Poco::Redis::RedisTypeTraits<std::string>::TypeId:
return static_cast<const Poco::Redis::Type<std::string> *>(value.get())->value();
default:
throw Exception{"Type mismatch, expected std::string, got type id = " + toString(value->type()) + " for column " + column_name,
ErrorCodes::TYPE_MISMATCH};
}
}
template <typename T>
inline void insert(IColumn & column, const String & stringValue)
{
static_cast<ColumnVector<T> &>(column).insertValue(parse<T>(stringValue));
assert_cast<ColumnVector<T> &>(column).insertValue(parse<T>(stringValue));
}
void insertValue(IColumn & column, const ValueType type, const Poco::Redis::RedisType::Ptr & value, const std::string & name)
void insertValue(IColumn & column, const ValueType type, const Poco::Redis::BulkString & bulk_string)
{
String stringValue = getStringOrThrow(value, name);
if (bulk_string.isNull())
throw Exception{"Type mismatch, expected not Null String", ErrorCodes::TYPE_MISMATCH};
String stringValue = bulk_string.value();
switch (type)
{
case ValueType::vtUInt8:
......@@ -118,16 +94,16 @@ namespace DB
insert<Float64>(column, stringValue);
break;
case ValueType::vtString:
static_cast<ColumnString &>(column).insert(parse<String>(stringValue));
assert_cast<ColumnString &>(column).insert(parse<String>(stringValue));
break;
case ValueType::vtDate:
static_cast<ColumnUInt16 &>(column).insertValue(parse<LocalDate>(stringValue).getDayNum());
assert_cast<ColumnUInt16 &>(column).insertValue(parse<LocalDate>(stringValue).getDayNum());
break;
case ValueType::vtDateTime:
static_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(parse<LocalDateTime>(stringValue)));
assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(parse<LocalDateTime>(stringValue)));
break;
case ValueType::vtUUID:
static_cast<ColumnUInt128 &>(column).insertValue(parse<UUID>(stringValue));
assert_cast<ColumnUInt128 &>(column).insertValue(parse<UUID>(stringValue));
break;
}
}
......@@ -150,25 +126,21 @@ namespace DB
const auto insertValueByIdx = [this, &columns](size_t idx, const auto & value)
{
const auto & name = description.sample_block.getByPosition(idx).name;
if (description.types[idx].second)
{
ColumnNullable & column_nullable = static_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value, name);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[idx], description.types[idx].first, value, name);
insertValue(*columns[idx], description.types[idx].first, value);
};
if (storage_type == RedisStorageType::HASH_MAP)
{
size_t num_rows = 0;
while (num_rows < max_block_size && !all_read)
for (; cursor < keys.size(); ++cursor)
{
if (cursor >= keys.size())
break;
const auto & keys_array = keys.get<RedisArray>(cursor);
if (keys_array.size() < 2)
{
......@@ -183,24 +155,20 @@ namespace DB
for (auto it = keys_array.begin(); it != keys_array.end(); ++it)
command_for_values.addRedisType(*it);
++cursor;
auto values = client->execute<RedisArray>(command_for_values);
if (keys_array.size() != values.size() + 1) // 'HMGET' primary_key secondary_keys
throw Exception{"Inconsistent sizes of keys and values in Redis request",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
const auto & primary_key = *keys_array.begin();
const auto & primary_key = keys_array.get<RedisBulkString>(0);
for (size_t i = 0; i < values.size(); ++i)
{
const auto & secondary_key = *(keys_array.begin() + i + 1);
const auto & value = *(values.begin() + i);
if (value.isNull())
throw Exception("Got NULL value in response from Redis", ErrorCodes::LOGICAL_ERROR);
const auto & secondary_key = keys_array.get<RedisBulkString>(i + 1);
const auto & value = values.get<RedisBulkString>(i);
/// null string means 'no value for requested key'
if (!isNullString(value))
if (!value.isNull())
{
insertValueByIdx(0, primary_key);
insertValueByIdx(1, secondary_key);
......@@ -214,34 +182,27 @@ namespace DB
{
Poco::Redis::Command command_for_values("MGET");
// keys.size() > 0
for (size_t i = 0; i < max_block_size && cursor < keys.size(); ++i)
{
const auto & key = *(keys.begin() + cursor);
command_for_values.addRedisType(key);
++cursor;
}
size_t need_values = std::min(max_block_size, keys.size() - cursor);
for (size_t i = 0; i < need_values; ++i)
command_for_values.add(keys.get<RedisBulkString>(cursor + i));
auto values = client->execute<RedisArray>(command_for_values);
if (command_for_values.size() != values.size() + 1) // 'MGET' keys
throw Exception{"Inconsistent sizes of keys and values in Redis request",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
if (values.size() != need_values)
throw Exception{"Inconsistent sizes of keys and values in Redis request", ErrorCodes::INTERNAL_REDIS_ERROR};
for (size_t i = 0; i < values.size(); ++i)
{
const auto & key = *(keys.begin() + cursor - i - 1);
const auto & value = *(values.begin() + values.size() - i - 1);
if (value.isNull())
throw Exception("Got NULL value in response from Redis", ErrorCodes::LOGICAL_ERROR);
const auto & key = keys.get<RedisBulkString>(cursor + i);
const auto & value = values.get<RedisBulkString>(i);
/// null string means 'no value for requested key'
if (!isNullString(value))
/// Null string means 'no value for requested key'
if (!value.isNull())
{
insertValueByIdx(0, key);
insertValueByIdx(1, value);
}
}
cursor += need_values;
}
return description.sample_block.cloneWithColumns(std::move(columns));
......
......@@ -8,14 +8,13 @@
# include <DataStreams/IBlockInputStream.h>
# include "RedisDictionarySource.h"
# include <Poco/Redis/Array.h>
# include <Poco/Redis/Type.h>
namespace Poco
{
namespace Redis
{
class Array;
class Client;
class RedisType;
}
}
......@@ -26,6 +25,7 @@ namespace DB
{
public:
using RedisArray = Poco::Redis::Array;
using RedisBulkString = Poco::Redis::BulkString;
RedisBlockInputStream(
const std::shared_ptr<Poco::Redis::Client> & client_,
......
......@@ -13,7 +13,7 @@ namespace DB
{
auto createTableSource = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const String & config_prefix,
Block & sample_block,
const Context & /* context */) -> DictionarySourcePtr {
#if USE_POCO_REDIS
......@@ -52,8 +52,8 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int CANNOT_SELECT;
extern const int INVALID_CONFIG_PARAMETER;
extern const int INTERNAL_REDIS_ERROR;
}
......@@ -61,7 +61,7 @@ namespace DB
RedisDictionarySource::RedisDictionarySource(
const DictionaryStructure & dict_struct_,
const std::string & host_,
const String & host_,
UInt16 port_,
UInt8 db_index_,
RedisStorageType storage_type_,
......@@ -81,12 +81,12 @@ namespace DB
if (storage_type == RedisStorageType::HASH_MAP)
{
if (!dict_struct.key.has_value())
if (!dict_struct.key)
throw Exception{"Redis source with storage type \'hash_map\' must have key",
ErrorCodes::INVALID_CONFIG_PARAMETER};
if (dict_struct.key.value().size() > 2)
throw Exception{"Redis source with complex keys having more than 2 attributes are unsupported",
if (dict_struct.key->size() != 2)
throw Exception{"Redis source with storage type \'hash_map\' requiers 2 keys",
ErrorCodes::INVALID_CONFIG_PARAMETER};
// suppose key[0] is primary key, key[1] is secondary key
}
......@@ -95,10 +95,10 @@ namespace DB
{
RedisCommand command("SELECT");
command << static_cast<Int64>(db_index);
std::string reply = client->execute<std::string>(command);
String reply = client->execute<String>(command);
if (reply != "+OK\r\n")
throw Exception{"Selecting db with index " + DB::toString(db_index) + " failed with reason " + reply,
ErrorCodes::CANNOT_SELECT};
throw Exception{"Selecting database with index " + DB::toString(db_index)
+ " failed with reason " + reply, ErrorCodes::INTERNAL_REDIS_ERROR};
}
}
......@@ -106,7 +106,7 @@ namespace DB
RedisDictionarySource::RedisDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config_,
const std::string & config_prefix_,
const String & config_prefix_,
Block & sample_block_)
: RedisDictionarySource(
dict_struct_,
......@@ -132,7 +132,7 @@ namespace DB
RedisDictionarySource::~RedisDictionarySource() = default;
static std::string storageTypeToKeyType(RedisStorageType type)
static String storageTypeToKeyType(RedisStorageType type)
{
switch (type)
{
......@@ -160,7 +160,7 @@ namespace DB
RedisArray keys;
auto key_type = storageTypeToKeyType(storage_type);
for (auto & key : all_keys)
if (key_type == client->execute<std::string>(RedisCommand("TYPE").addRedisType(key)))
if (key_type == client->execute<String>(RedisCommand("TYPE").addRedisType(key)))
keys.addRedisType(std::move(key));
if (storage_type == RedisStorageType::HASH_MAP)
......@@ -213,12 +213,12 @@ namespace DB
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), storage_type, sample_block, max_block_size);
}
std::string RedisDictionarySource::toString() const
String RedisDictionarySource::toString() const
{
return "Redis: " + host + ':' + DB::toString(port);
}
RedisStorageType RedisDictionarySource::parseStorageType(const std::string & storage_type_str)
RedisStorageType RedisDictionarySource::parseStorageType(const String & storage_type_str)
{
if (storage_type_str == "hash_map")
return RedisStorageType::HASH_MAP;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册