提交 a8ce7530 编写于 作者: C comunodi

Put keys in result block only if value exists

上级 b3d8ec3e
......@@ -36,6 +36,7 @@ namespace DB
extern const int TYPE_MISMATCH;
extern const int LOGICAL_ERROR;
extern const int LIMIT_EXCEEDED;
extern const int SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT;
}
......@@ -59,11 +60,8 @@ namespace DB
bool isNull(const Poco::Redis::RedisType::Ptr & value)
{
if (value.isNull())
return true;
if (value->isBulkString())
return static_cast<const Poco::Redis::Type<Poco::Redis::BulkString> *>(value.get())->value().isNull();
return false;
return value->isBulkString() &&
static_cast<const Poco::Redis::Type<Poco::Redis::BulkString> *>(value.get())->value().isNull();
}
std::string getStringOrThrow(const Poco::Redis::RedisType::Ptr & value, const std::string & column_name)
......@@ -158,10 +156,6 @@ namespace DB
if (all_read)
return {};
for (size_t i = 0; i < 5; ++i)
if (description.sample_block.columns() >= i + 1)
LOG_INFO(&Logger::get("Redis"), description.sample_block.getByPosition(i).dumpStructure());
const size_t size = description.sample_block.columns();
MutableColumns columns(description.sample_block.columns());
......@@ -220,6 +214,7 @@ namespace DB
commandForValues.addRedisType(secondary_key);
}
// FIXME: fix insert
Poco::Redis::Array values = client->execute<Poco::Redis::Array>(commandForValues);
for (const auto & value : values)
{
......@@ -235,10 +230,10 @@ namespace DB
}
else
{
size_t num_rows = 0;
Poco::Redis::Command commandForValues("MGET");
while (num_rows < max_block_size)
// keys.size() > 0
for (size_t num_rows = 0; num_rows < max_block_size; ++num_rows)
{
if (cursor >= keys.size())
{
......@@ -247,23 +242,29 @@ namespace DB
}
const auto & key = *(keys.begin() + cursor);
insertValueByIdx(0, key);
commandForValues.addRedisType(key);
++num_rows;
++cursor;
}
if (num_rows == 0)
return {};
Poco::Redis::Array values = client->execute<Poco::Redis::Array>(commandForValues);
for (const auto & value : values)
if (commandForValues.size() != values.size() + 1)
throw Exception{"Inconsistent sizes of keys and values in Redis request",
ErrorCodes::SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT};
for (size_t num_rows = 0; num_rows < values.size(); ++num_rows)
{
if (isNull(value))
const auto & key = *(keys.begin() + cursor - num_rows - 1);
const auto & value = *(values.begin() + values.size() - num_rows - 1);
if (value.isNull())
{
insertValueByIdx(0, key);
insertDefaultValue(*columns[1], *description.sample_block.getByPosition(1).column);
else
}
else if (!isNull(value)) // null string means 'no value for requested key'
{
insertValueByIdx(0, key);
insertValueByIdx(1, value);
}
}
}
......
......@@ -52,9 +52,6 @@ namespace DB
# include <ext/enumerate.h>
# include "RedisBlockInputStream.h"
# include "Poco/Logger.h"
# include "common/logger_useful.h"
namespace DB
{
......@@ -84,18 +81,13 @@ namespace DB
, sample_block{sample_block}
, client{std::make_shared<Poco::Redis::Client>(host, port)}
{
LOG_INFO(&Logger::get("Redis"), "in ctor");
LOG_INFO(&Logger::get("Redis"), dict_struct.attributes.size());
if (dict_struct.attributes.size() != 1)
throw Exception{"Invalid number of non key columns for Redis source: " +
DB::toString(dict_struct.attributes.size()) + ", expected 1",
ErrorCodes::INVALID_CONFIG_PARAMETER};
LOG_INFO(&Logger::get("Redis"), "After first check");
if (storage_type == RedisStorageType::HASH_MAP)
{
LOG_INFO(&Logger::get("Redis"), "SET STORAGE_TYPE");
if (!dict_struct.key.has_value())
throw Exception{"Redis source with storage type \'hash_map\' must have key",
ErrorCodes::INVALID_CONFIG_PARAMETER};
......@@ -105,11 +97,8 @@ namespace DB
// suppose key[0] is primary key, key[1] is secondary key
}
LOG_INFO(&Logger::get("Redis"), "After second check");
if (db_index != 0)
{
LOG_INFO(&Logger::get("Redis"), "SET DB_INDEX");
Poco::Redis::Command command("SELECT");
command << static_cast<Int64>(db_index);
std::string reply = client->execute<std::string>(command);
......@@ -117,8 +106,6 @@ namespace DB
throw Exception{"Selecting db with index " + DB::toString(db_index) + " failed with reason " + reply,
ErrorCodes::CANNOT_SELECT};
}
LOG_INFO(&Logger::get("Redis"), "After third check");
}
......@@ -154,17 +141,11 @@ namespace DB
BlockInputStreamPtr RedisDictionarySource::loadAll()
{
LOG_INFO(&Logger::get("Redis"), "Redis in loadAll");
Poco::Redis::Command command_for_keys("KEYS");
command_for_keys << "*";
LOG_INFO(&Logger::get("Redis"), "Command for keys: " + command_for_keys.toString());
Poco::Redis::Array keys = client->execute<Poco::Redis::Array>(command_for_keys);
LOG_INFO(&Logger::get("Redis"), "Command for keys executed");
LOG_INFO(&Logger::get("Redis"), "KEYS: " + keys.toString());
if (storage_type == RedisStorageType::HASH_MAP && dict_struct.key->size() == 2)
{
Poco::Redis::Array hkeys;
......@@ -173,13 +154,12 @@ namespace DB
Poco::Redis::Command command_for_secondary_keys("HKEYS");
command_for_secondary_keys.addRedisType(key);
Poco::Redis::Array reply_for_primary_key = client->execute<Poco::Redis::Array>(command_for_secondary_keys);
LOG_INFO(&Logger::get("Redis"), "Command for hkeys executed");
Poco::SharedPtr<Poco::Redis::Array> primary_with_secondary;
primary_with_secondary->addRedisType(key);
for (const auto & secondary_key : reply_for_primary_key)
primary_with_secondary->addRedisType(secondary_key);
LOG_INFO(&Logger::get("Redis"), "HKEYS: " + primary_with_secondary->toString());
hkeys.add(*primary_with_secondary);
}
keys = hkeys;
......@@ -191,8 +171,6 @@ namespace DB
BlockInputStreamPtr RedisDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_INFO(&Logger::get("Redis"), "Redis in loadIds");
if (storage_type != RedisStorageType::SIMPLE)
throw Exception{"Cannot use loadIds with \'simple\' storage type", ErrorCodes::UNSUPPORTED_METHOD};
......@@ -204,8 +182,6 @@ namespace DB
for (UInt64 id : ids)
keys << DB::toString(id);
LOG_INFO(&Logger::get("Redis"), "KEYS: " + keys.toString());
return std::make_shared<RedisBlockInputStream>(client, std::move(keys), sample_block, max_block_size);
}
......
......@@ -2,6 +2,7 @@
#include <Common/config.h>
#include <Core/Block.h>
#if USE_POCO_REDIS
# include "DictionaryStructure.h"
......@@ -95,7 +96,7 @@ namespace DB
const DictionaryStructure dict_struct;
const std::string host;
const UInt16 port;
const UInt8 db_index; // [0..15]
const UInt8 db_index;
const RedisStorageType::Id storage_type;
Block sample_block;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册