未验证 提交 fdae3eef 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #3805 from proller/fix4

clang-format of dbms/src/Dictionaries/*
......@@ -50,7 +50,8 @@ IncludeCategories:
- Regex: '.*'
Priority: 40
ReflowComments: false
AlignEscapedNewlinesLeft: true
AlignEscapedNewlinesLeft: false
AlignEscapedNewlines: DontAlign
# Not changed:
AccessModifierOffset: -4
......
#pragma once
#include "IDictionary.h"
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <ext/bit_cast.h>
#include <cmath>
#include <atomic>
#include <chrono>
#include <vector>
#include <cmath>
#include <map>
#include <shared_mutex>
#include <variant>
#include <vector>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <pcg_random.hpp>
#include <shared_mutex>
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
#include <ext/bit_cast.h>
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
namespace DB
{
class CacheDictionary final : public IDictionary
{
public:
CacheDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime,
CacheDictionary(
const std::string & name,
const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime,
const size_t size);
CacheDictionary(const CacheDictionary & other);
......@@ -42,16 +44,12 @@ public:
double getHitRate() const override
{
return static_cast<double>(hit_count.load(std::memory_order_acquire)) /
query_count.load(std::memory_order_relaxed);
return static_cast<double>(hit_count.load(std::memory_order_acquire)) / query_count.load(std::memory_order_relaxed);
}
size_t getElementCount() const override { return element_count.load(std::memory_order_relaxed); }
double getLoadFactor() const override
{
return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size;
}
double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size; }
bool isCached() const override { return true; }
......@@ -63,10 +61,7 @@ public:
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override
{
return creation_time;
}
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
......@@ -77,14 +72,15 @@ public:
void toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const override;
void isInVectorVector(const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
void isInVectorVector(
const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
void isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const override;
void isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
#define DECLARE(TYPE)\
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
......@@ -104,9 +100,11 @@ public:
void getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const;
#define DECLARE(TYPE)\
void get##TYPE(\
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const PaddedPODArray<TYPE> & def,\
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const PaddedPODArray<Key> & ids, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
......@@ -124,11 +122,11 @@ public:
DECLARE(Decimal128)
#undef DECLARE
void getString(
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def,
ColumnString * const out) const;
void
getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out)
const;
#define DECLARE(TYPE)\
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const TYPE def, ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
......@@ -146,17 +144,17 @@ public:
DECLARE(Decimal128)
#undef DECLARE
void getString(
const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def,
ColumnString * const out) const;
void getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const;
void has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
private:
template <typename Value> using ContainerType = Value[];
template <typename Value> using ContainerPtrType = std::unique_ptr<ContainerType<Value>>;
template <typename Value>
using ContainerType = Value[];
template <typename Value>
using ContainerPtrType = std::unique_ptr<ContainerType<Value>>;
struct CellMetadata final
{
......@@ -183,19 +181,39 @@ private:
{
AttributeUnderlyingType type;
std::variant<
UInt8, UInt16, UInt32, UInt64,
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8, Int16, Int32, Int64,
Decimal32, Decimal64, Decimal128,
Float32, Float64,
String> null_values;
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String>
null_values;
std::variant<
ContainerPtrType<UInt8>, ContainerPtrType<UInt16>, ContainerPtrType<UInt32>, ContainerPtrType<UInt64>,
ContainerPtrType<UInt8>,
ContainerPtrType<UInt16>,
ContainerPtrType<UInt32>,
ContainerPtrType<UInt64>,
ContainerPtrType<UInt128>,
ContainerPtrType<Int8>, ContainerPtrType<Int16>, ContainerPtrType<Int32>, ContainerPtrType<Int64>,
ContainerPtrType<Decimal32>, ContainerPtrType<Decimal64>, ContainerPtrType<Decimal128>,
ContainerPtrType<Float32>, ContainerPtrType<Float64>,
ContainerPtrType<StringRef>> arrays;
ContainerPtrType<Int8>,
ContainerPtrType<Int16>,
ContainerPtrType<Int32>,
ContainerPtrType<Int64>,
ContainerPtrType<Decimal32>,
ContainerPtrType<Decimal64>,
ContainerPtrType<Decimal128>,
ContainerPtrType<Float32>,
ContainerPtrType<Float64>,
ContainerPtrType<StringRef>>
arrays;
};
void createAttributes();
......@@ -205,29 +223,17 @@ private:
template <typename OutputType, typename DefaultGetter>
void getItemsNumber(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ResultArrayType<OutputType> & out,
DefaultGetter && get_default) const;
Attribute & attribute, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const;
template <typename AttributeType, typename OutputType, typename DefaultGetter>
void getItemsNumberImpl(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ResultArrayType<OutputType> & out,
DefaultGetter && get_default) const;
Attribute & attribute, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const;
template <typename DefaultGetter>
void getItemsString(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ColumnString * out,
DefaultGetter && get_default) const;
void getItemsString(Attribute & attribute, const PaddedPODArray<Key> & ids, ColumnString * out, DefaultGetter && get_default) const;
template <typename PresentIdHandler, typename AbsentIdHandler>
void update(
const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated,
AbsentIdHandler && on_id_not_found) const;
void update(const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const;
PaddedPODArray<Key> getCachedIds() const;
......@@ -251,10 +257,7 @@ private:
FindResult findCellIdx(const Key & id, const CellMetadata::time_point_t now) const;
template <typename AncestorType>
void isInImpl(
const PaddedPODArray<Key> & child_ids,
const AncestorType & ancestor_ids,
PaddedPODArray<UInt8> & out) const;
void isInImpl(const PaddedPODArray<Key> & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const;
const std::string name;
const DictionaryStructure dict_struct;
......
#include "CacheDictionary.h"
#include <ext/size.h>
#include <ext/map.h>
#include <ext/range.h>
#include <Columns/ColumnsNumber.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnsNumber.h>
#include <ext/map.h>
#include <ext/range.h>
#include <ext/size.h>
namespace ProfileEvents
{
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
}
namespace CurrentMetrics
{
extern const Metric DictCacheRequests;
extern const Metric DictCacheRequests;
}
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
......@@ -36,12 +35,11 @@ namespace ErrorCodes
template <typename OutputType, typename DefaultGetter>
void CacheDictionary::getItemsNumber(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ResultArrayType<OutputType> & out,
DefaultGetter && get_default) const
Attribute & attribute, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const
{
if (false) {}
if (false)
{
}
#define DISPATCH(TYPE) \
else if (attribute.type == AttributeUnderlyingType::TYPE) \
getItemsNumberImpl<TYPE, OutputType>(attribute, ids, out, std::forward<DefaultGetter>(get_default));
......@@ -60,16 +58,12 @@ void CacheDictionary::getItemsNumber(
DISPATCH(Decimal64)
DISPATCH(Decimal128)
#undef DISPATCH
else
throw Exception("Unexpected type of attribute: " + toString(attribute.type), ErrorCodes::LOGICAL_ERROR);
else throw Exception("Unexpected type of attribute: " + toString(attribute.type), ErrorCodes::LOGICAL_ERROR);
}
template <typename AttributeType, typename OutputType, typename DefaultGetter>
void CacheDictionary::getItemsNumberImpl(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ResultArrayType<OutputType> & out,
DefaultGetter && get_default) const
Attribute & attribute, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const
{
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> outdated_ids;
......@@ -122,31 +116,28 @@ void CacheDictionary::getItemsNumberImpl(
return;
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids),
[] (auto & pair) { return pair.first; });
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
/// request new values
update(required_ids,
[&] (const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
update(
required_ids,
[&](const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
for (const size_t row : outdated_ids[id])
out[row] = static_cast<OutputType>(attribute_value);
},
[&] (const auto id, const auto)
{
for (const size_t row : outdated_ids[id])
out[row] = get_default(row);
});
for (const size_t row : outdated_ids[id])
out[row] = static_cast<OutputType>(attribute_value);
},
[&](const auto id, const auto)
{
for (const size_t row : outdated_ids[id])
out[row] = get_default(row);
});
}
template <typename DefaultGetter>
void CacheDictionary::getItemsString(
Attribute & attribute,
const PaddedPODArray<Key> & ids,
ColumnString * out,
DefaultGetter && get_default) const
Attribute & attribute, const PaddedPODArray<Key> & ids, ColumnString * out, DefaultGetter && get_default) const
{
const auto rows = ext::size(ids);
......@@ -245,22 +236,22 @@ void CacheDictionary::getItemsString(
if (!outdated_ids.empty())
{
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids),
[] (auto & pair) { return pair.first; });
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
update(required_ids,
[&] (const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
update(
required_ids,
[&](const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
map[id] = String{attribute_value};
total_length += (attribute_value.size + 1) * outdated_ids[id].size();
},
[&] (const auto id, const auto)
{
for (const auto row : outdated_ids[id])
total_length += get_default(row).size + 1;
});
map[id] = String{attribute_value};
total_length += (attribute_value.size + 1) * outdated_ids[id].size();
},
[&](const auto id, const auto)
{
for (const auto row : outdated_ids[id])
total_length += get_default(row).size + 1;
});
}
out->getChars().reserve(total_length);
......@@ -277,19 +268,13 @@ void CacheDictionary::getItemsString(
template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheDictionary::update(
const std::vector<Key> & requested_ids,
PresentIdHandler && on_cell_updated,
AbsentIdHandler && on_id_not_found) const
const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const
{
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
for (const auto id : requested_ids)
remaining_ids.insert({ id, 0 });
remaining_ids.insert({id, 0});
std::uniform_int_distribution<UInt64> distribution
{
dict_lifetime.min_sec,
dict_lifetime.max_sec
};
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
......@@ -310,10 +295,8 @@ void CacheDictionary::update(
const auto & ids = id_column->getData();
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(ext::range(0, attributes.size()), [&block] (size_t i)
{
return block.safeGetByPosition(i + 1).column.get();
});
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
for (const auto i : ext::range(0, ids.size()))
{
......
#include "ClickHouseDictionarySource.h"
#include "ExternalQueryBuilder.h"
#include "writeParenthesisedString.h"
#include <memory>
#include <Client/ConnectionPool.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include "readInvalidateQuery.h"
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/executeQuery.h>
#include <Common/isLocalAddress.h>
#include <memory>
#include <ext/range.h>
#include <IO/ConnectionTimeouts.h>
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include "ExternalQueryBuilder.h"
#include "readInvalidateQuery.h"
#include "writeParenthesisedString.h"
namespace DB
{
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
......@@ -25,61 +24,81 @@ namespace ErrorCodes
static const size_t MAX_CONNECTIONS = 16;
static ConnectionPoolWithFailoverPtr createPool(
const std::string & host, UInt16 port, bool secure, const std::string & db,
const std::string & user, const std::string & password, const Context & context)
const std::string & host,
UInt16 port,
bool secure,
const std::string & db,
const std::string & user,
const std::string & password,
const Context & context)
{
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(context.getSettingsRef());
ConnectionPoolPtrs pools;
pools.emplace_back(std::make_shared<ConnectionPool>(
MAX_CONNECTIONS, host, port, db, user, password, timeouts, "ClickHouseDictionarySource",
Protocol::Compression::Enable,
secure ? Protocol::Secure::Enable : Protocol::Secure::Disable));
MAX_CONNECTIONS,
host,
port,
db,
user,
password,
timeouts,
"ClickHouseDictionarySource",
Protocol::Compression::Enable,
secure ? Protocol::Secure::Enable : Protocol::Secure::Disable));
return std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM);
}
ClickHouseDictionarySource::ClickHouseDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Block & sample_block, Context & context)
: update_time{std::chrono::system_clock::from_time_t(0)},
dict_struct{dict_struct_},
host{config.getString(config_prefix + ".host")},
port(config.getInt(config_prefix + ".port")),
secure(config.getBool(config_prefix + ".secure", false)),
user{config.getString(config_prefix + ".user", "")},
password{config.getString(config_prefix + ".password", "")},
db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")},
where{config.getString(config_prefix + ".where", "")},
update_field{config.getString(config_prefix + ".update_field", "")},
invalidate_query{config.getString(config_prefix + ".invalidate_query", "")},
query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks},
sample_block{sample_block}, context(context),
is_local{isLocalAddress({ host, port }, config.getInt("tcp_port", 0))},
pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)},
load_all_query{query_builder.composeLoadAllQuery()}
{}
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Block & sample_block,
Context & context)
: update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, host{config.getString(config_prefix + ".host")}
, port(config.getInt(config_prefix + ".port"))
, secure(config.getBool(config_prefix + ".secure", false))
, user{config.getString(config_prefix + ".user", "")}
, password{config.getString(config_prefix + ".password", "")}
, db{config.getString(config_prefix + ".db", "")}
, table{config.getString(config_prefix + ".table")}
, where{config.getString(config_prefix + ".where", "")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, sample_block{sample_block}
, context(context)
, is_local{isLocalAddress({host, port}, config.getInt("tcp_port", 0))}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)}
, load_all_query{query_builder.composeLoadAllQuery()}
{
}
ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
: update_time{other.update_time},
dict_struct{other.dict_struct},
host{other.host}, port{other.port},
secure{other.secure},
user{other.user}, password{other.password},
db{other.db}, table{other.table},
where{other.where},
update_field{other.update_field},
invalidate_query{other.invalidate_query},
invalidate_query_response{other.invalidate_query_response},
query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks},
sample_block{other.sample_block}, context(other.context),
is_local{other.is_local},
pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)},
load_all_query{other.load_all_query}
{}
: update_time{other.update_time}
, dict_struct{other.dict_struct}
, host{other.host}
, port{other.port}
, secure{other.secure}
, user{other.user}
, password{other.password}
, db{other.db}
, table{other.table}
, where{other.where}
, update_field{other.update_field}
, invalidate_query{other.invalidate_query}
, invalidate_query_response{other.invalidate_query_response}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, sample_block{other.sample_block}
, context(other.context)
, is_local{other.is_local}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)}
, load_all_query{other.load_all_query}
{
}
std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
{
......@@ -119,17 +138,14 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
return createStreamForSelectiveLoad(
query_builder.composeLoadIdsQuery(ids));
return createStreamForSelectiveLoad(query_builder.composeLoadIdsQuery(ids));
}
BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows)
BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
return createStreamForSelectiveLoad(
query_builder.composeLoadKeysQuery(
key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES));
query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES));
}
bool ClickHouseDictionarySource::isModified() const
......@@ -167,7 +183,7 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
if (is_local)
{
auto input_block = executeQuery(request, context, true).in;
return readInvalidateQuery(dynamic_cast<IProfilingBlockInputStream&>((*input_block)));
return readInvalidateQuery(dynamic_cast<IProfilingBlockInputStream &>((*input_block)));
}
else
{
......
#pragma once
#include "IDictionarySource.h"
#include <memory>
#include <Client/ConnectionPoolWithFailover.h>
#include "DictionaryStructure.h"
#include "ExternalQueryBuilder.h"
#include <Client/ConnectionPoolWithFailover.h>
#include <memory>
#include "IDictionarySource.h"
namespace DB
{
/** Allows loading dictionaries from local or remote ClickHouse instance
* @todo use ConnectionPoolWithFailover
* @todo invent a way to keep track of source modifications
......@@ -17,10 +16,12 @@ namespace DB
class ClickHouseDictionarySource final : public IDictionarySource
{
public:
ClickHouseDictionarySource(const DictionaryStructure & dict_struct_,
ClickHouseDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Block & sample_block, Context & context);
const Block & sample_block,
Context & context);
/// copy-constructor is provided in order to support cloneability
ClickHouseDictionarySource(const ClickHouseDictionarySource & other);
......@@ -31,8 +32,7 @@ public:
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
bool supportsSelectiveLoad() const override { return true; }
......
#include "ComplexKeyCacheDictionary.h"
#include "DictionaryBlockInputStream.h"
#include <Common/Arena.h>
#include <Common/BitHelpers.h>
#include <Common/randomSeed.h>
#include <Common/Stopwatch.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <ext/range.h>
#include <Common/ProfileEvents.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>
#include <ext/map.h>
#include <ext/range.h>
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
namespace ProfileEvents
{
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
}
namespace CurrentMetrics
{
extern const Metric DictCacheRequests;
extern const Metric DictCacheRequests;
}
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
......@@ -52,13 +50,19 @@ inline UInt64 ComplexKeyCacheDictionary::getCellIdx(const StringRef key) const
}
ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime,
ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(
const std::string & name,
const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime,
const size_t size)
: name{name}, dict_struct(dict_struct), source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
size{roundUpToPowerOfTwoOrZero(std::max(size, size_t(max_collision_length)))},
size_overlap_mask{this->size - 1},
rnd_engine(randomSeed())
: name{name}
, dict_struct(dict_struct)
, source_ptr{std::move(source_ptr)}
, dict_lifetime(dict_lifetime)
, size{roundUpToPowerOfTwoOrZero(std::max(size, size_t(max_collision_length)))}
, size_overlap_mask{this->size - 1}
, rnd_engine(randomSeed())
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{name + ": source cannot be used with ComplexKeyCacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
......@@ -68,47 +72,56 @@ ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(const std::string & name, c
ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(const ComplexKeyCacheDictionary & other)
: ComplexKeyCacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
{}
{
}
void ComplexKeyCacheDictionary::getString(
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,
ColumnString * out) const
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ColumnString * out) const
{
dict_struct.validateKeyTypes(key_types);
auto & attribute = getAttribute(attribute_name);
if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::String))
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), ErrorCodes::TYPE_MISMATCH};
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),
ErrorCodes::TYPE_MISMATCH};
const auto null_value = StringRef{std::get<String>(attribute.null_values)};
getItemsString(attribute, key_columns, out, [&] (const size_t) { return null_value; });
getItemsString(attribute, key_columns, out, [&](const size_t) { return null_value; });
}
void ComplexKeyCacheDictionary::getString(
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,
const ColumnString * const def, ColumnString * const out) const
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes & key_types,
const ColumnString * const def,
ColumnString * const out) const
{
dict_struct.validateKeyTypes(key_types);
auto & attribute = getAttribute(attribute_name);
if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::String))
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), ErrorCodes::TYPE_MISMATCH};
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),
ErrorCodes::TYPE_MISMATCH};
getItemsString(attribute, key_columns, out, [&] (const size_t row) { return def->getDataAt(row); });
getItemsString(attribute, key_columns, out, [&](const size_t row) { return def->getDataAt(row); });
}
void ComplexKeyCacheDictionary::getString(
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,
const String & def, ColumnString * const out) const
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes & key_types,
const String & def,
ColumnString * const out) const
{
dict_struct.validateKeyTypes(key_types);
auto & attribute = getAttribute(attribute_name);
if (!isAttributeTypeConvertibleTo(attribute.type, AttributeUnderlyingType::String))
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), ErrorCodes::TYPE_MISMATCH};
throw Exception{name + ": type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),
ErrorCodes::TYPE_MISMATCH};
getItemsString(attribute, key_columns, out, [&] (const size_t) { return StringRef{def}; });
getItemsString(attribute, key_columns, out, [&](const size_t) { return StringRef{def}; });
}
/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag,
......@@ -118,7 +131,8 @@ void ComplexKeyCacheDictionary::getString(
/// true true impossible
///
/// todo: split this func to two: find_for_get and find_for_set
ComplexKeyCacheDictionary::FindResult ComplexKeyCacheDictionary::findCellIdx(const StringRef & key, const CellMetadata::time_point_t now, const size_t hash) const
ComplexKeyCacheDictionary::FindResult
ComplexKeyCacheDictionary::findCellIdx(const StringRef & key, const CellMetadata::time_point_t now, const size_t hash) const
{
auto pos = hash;
auto oldest_id = pos;
......@@ -211,17 +225,20 @@ void ComplexKeyCacheDictionary::has(const Columns & key_columns, const DataTypes
return;
std::vector<size_t> required_rows(outdated_keys.size());
std::transform(std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows),
[] (auto & pair) { return pair.second.front(); });
std::transform(
std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), [](auto & pair) { return pair.second.front(); });
/// request new values
update(key_columns, keys_array, required_rows,
[&] (const StringRef key, const auto)
update(
key_columns,
keys_array,
required_rows,
[&](const StringRef key, const auto)
{
for (const auto out_idx : outdated_keys[key])
out[out_idx] = true;
},
[&] (const StringRef key, const auto)
[&](const StringRef key, const auto)
{
for (const auto out_idx : outdated_keys[key])
out[out_idx] = false;
......@@ -242,7 +259,8 @@ void ComplexKeyCacheDictionary::createAttributes()
attributes.push_back(createAttributeWithType(attribute.underlying_type, attribute.null_value));
if (attribute.hierarchical)
throw Exception{name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(), ErrorCodes::TYPE_MISMATCH};
throw Exception{name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(),
ErrorCodes::TYPE_MISMATCH};
}
}
......@@ -273,8 +291,7 @@ void ComplexKeyCacheDictionary::freeKey(const StringRef key) const
template <typename Pool>
StringRef ComplexKeyCacheDictionary::placeKeysInPool(
const size_t row, const Columns & key_columns, StringRefs & keys,
const std::vector<DictionaryAttribute> & key_attributes, Pool & pool)
const size_t row, const Columns & key_columns, StringRefs & keys, const std::vector<DictionaryAttribute> & key_attributes, Pool & pool)
{
const auto keys_size = key_columns.size();
size_t sum_keys_size{};
......@@ -313,22 +330,27 @@ StringRef ComplexKeyCacheDictionary::placeKeysInPool(
}
}
return { place, sum_keys_size };
return {place, sum_keys_size};
}
/// Explicit instantiations.
template StringRef ComplexKeyCacheDictionary::placeKeysInPool<Arena>(
const size_t row, const Columns & key_columns, StringRefs & keys,
const std::vector<DictionaryAttribute> & key_attributes, Arena & pool);
const size_t row,
const Columns & key_columns,
StringRefs & keys,
const std::vector<DictionaryAttribute> & key_attributes,
Arena & pool);
template StringRef ComplexKeyCacheDictionary::placeKeysInPool<ArenaWithFreeLists>(
const size_t row, const Columns & key_columns, StringRefs & keys,
const std::vector<DictionaryAttribute> & key_attributes, ArenaWithFreeLists & pool);
const size_t row,
const Columns & key_columns,
StringRefs & keys,
const std::vector<DictionaryAttribute> & key_attributes,
ArenaWithFreeLists & pool);
StringRef ComplexKeyCacheDictionary::placeKeysInFixedSizePool(
const size_t row, const Columns & key_columns) const
StringRef ComplexKeyCacheDictionary::placeKeysInFixedSizePool(const size_t row, const Columns & key_columns) const
{
const auto res = fixed_size_keys_pool->alloc();
auto place = res;
......@@ -340,14 +362,14 @@ StringRef ComplexKeyCacheDictionary::placeKeysInFixedSizePool(
place += key.size;
}
return { res, key_size };
return {res, key_size};
}
StringRef ComplexKeyCacheDictionary::copyIntoArena(StringRef src, Arena & arena)
{
char * allocated = arena.alloc(src.size);
memcpy(allocated, src.data, src.size);
return { allocated, src.size };
return {allocated, src.size};
}
StringRef ComplexKeyCacheDictionary::copyKey(const StringRef key) const
......@@ -355,13 +377,14 @@ StringRef ComplexKeyCacheDictionary::copyKey(const StringRef key) const
const auto res = key_size_is_fixed ? fixed_size_keys_pool->alloc() : keys_pool->alloc(key.size);
memcpy(res, key.data, key.size);
return { res, key.size };
return {res, key.size};
}
bool ComplexKeyCacheDictionary::isEmptyCell(const UInt64 idx) const
{
return (cells[idx].key == StringRef{} && (idx != zero_cell_idx
|| cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t())));
return (
cells[idx].key == StringRef{}
&& (idx != zero_cell_idx || cells[idx].data == ext::safe_bit_cast<CellMetadata::time_point_urep_t>(CellMetadata::time_point_t())));
}
BlockInputStreamPtr ComplexKeyCacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
......@@ -371,8 +394,7 @@ BlockInputStreamPtr ComplexKeyCacheDictionary::getBlockInputStream(const Names &
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
for (auto idx : ext::range(0, cells.size()))
if (!isEmptyCell(idx)
&& !cells[idx].isDefault())
if (!isEmptyCell(idx) && !cells[idx].isDefault())
keys.push_back(cells[idx].key);
}
......@@ -382,26 +404,25 @@ BlockInputStreamPtr ComplexKeyCacheDictionary::getBlockInputStream(const Names &
void registerDictionaryComplexKeyCache(DictionaryFactory & factory)
{
auto create_layout = [=](
const std::string & name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr
) -> DictionaryPtr {
auto create_layout = [=](const std::string & name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
if (!dict_struct.key)
throw Exception {"'key' is required for dictionary of layout 'complex_key_hashed'", ErrorCodes::BAD_ARGUMENTS};
throw Exception{"'key' is required for dictionary of layout 'complex_key_hashed'", ErrorCodes::BAD_ARGUMENTS};
const auto & layout_prefix = config_prefix + ".layout";
const auto size = config.getInt(layout_prefix + ".complex_key_cache.size_in_cells");
if (size == 0)
throw Exception {name + ": dictionary of layout 'cache' cannot have 0 cells", ErrorCodes::TOO_SMALL_BUFFER_SIZE};
throw Exception{name + ": dictionary of layout 'cache' cannot have 0 cells", ErrorCodes::TOO_SMALL_BUFFER_SIZE};
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
if (require_nonempty)
throw Exception {name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
ErrorCodes::BAD_ARGUMENTS};
throw Exception{name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
ErrorCodes::BAD_ARGUMENTS};
const DictionaryLifetime dict_lifetime {config, config_prefix + ".lifetime"};
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
return std::make_unique<ComplexKeyCacheDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, size);
};
factory.registerLayout("complex_key_cache", create_layout);
......
......@@ -3,23 +3,23 @@
#include <atomic>
#include <chrono>
#include <map>
#include <shared_mutex>
#include <variant>
#include <vector>
#include <shared_mutex>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <pcg_random.hpp>
#include <Common/ArenaWithFreeLists.h>
#include <Common/HashTable/HashMap.h>
#include <Common/ProfilingScopedRWLock.h>
#include <Common/SmallObjectPool.h>
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
#include <common/StringRef.h>
#include <ext/bit_cast.h>
#include <ext/map.h>
#include <ext/scope_guard.h>
#include <pcg_random.hpp>
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
namespace ProfileEvents
......@@ -40,7 +40,8 @@ namespace DB
class ComplexKeyCacheDictionary final : public IDictionaryBase
{
public:
ComplexKeyCacheDictionary(const std::string & name,
ComplexKeyCacheDictionary(
const std::string & name,
const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime,
......@@ -48,25 +49,13 @@ public:
ComplexKeyCacheDictionary(const ComplexKeyCacheDictionary & other);
std::string getKeyDescription() const
{
return key_description;
}
std::string getKeyDescription() const { return key_description; }
std::exception_ptr getCreationException() const override
{
return {};
}
std::exception_ptr getCreationException() const override { return {}; }
std::string getName() const override
{
return name;
}
std::string getName() const override { return name; }
std::string getTypeName() const override
{
return "ComplexKeyCache";
}
std::string getTypeName() const override { return "ComplexKeyCache"; }
size_t getBytesAllocated() const override
{
......@@ -74,55 +63,28 @@ public:
+ (string_arena ? string_arena->size() : 0);
}
size_t getQueryCount() const override
{
return query_count.load(std::memory_order_relaxed);
}
size_t getQueryCount() const override { return query_count.load(std::memory_order_relaxed); }
double getHitRate() const override
{
return static_cast<double>(hit_count.load(std::memory_order_acquire)) / query_count.load(std::memory_order_relaxed);
}
size_t getElementCount() const override
{
return element_count.load(std::memory_order_relaxed);
}
size_t getElementCount() const override { return element_count.load(std::memory_order_relaxed); }
double getLoadFactor() const override
{
return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size;
}
double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size; }
bool isCached() const override
{
return true;
}
bool isCached() const override { return true; }
std::unique_ptr<IExternalLoadable> clone() const override
{
return std::make_unique<ComplexKeyCacheDictionary>(*this);
}
std::unique_ptr<IExternalLoadable> clone() const override { return std::make_unique<ComplexKeyCacheDictionary>(*this); }
const IDictionarySource * getSource() const override
{
return source_ptr.get();
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override
{
return dict_lifetime;
}
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryStructure & getStructure() const override
{
return dict_struct;
}
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override
{
return creation_time;
}
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
......@@ -135,7 +97,7 @@ public:
/// In all functions below, key_columns must be full (non-constant) columns.
/// See the requirement in IDataType.h for text-serialization functions.
#define DECLARE(TYPE) \
void get##TYPE( \
void get##TYPE( \
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
......@@ -155,11 +117,12 @@ public:
void getString(const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ColumnString * out) const;
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const PaddedPODArray<TYPE> & def, \
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
......@@ -177,17 +140,19 @@ public:
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name,
void getString(
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes & key_types,
const ColumnString * const def,
ColumnString * const out) const;
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const TYPE def, \
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const TYPE def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
......@@ -205,7 +170,8 @@ public:
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name,
void getString(
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes & key_types,
const String & def,
......@@ -216,9 +182,12 @@ public:
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
private:
template <typename Value> using MapType = HashMapWithSavedHash<StringRef, Value, StringRefHash>;
template <typename Value> using ContainerType = Value[];
template <typename Value> using ContainerPtrType = std::unique_ptr<ContainerType<Value>>;
template <typename Value>
using MapType = HashMapWithSavedHash<StringRef, Value, StringRefHash>;
template <typename Value>
using ContainerType = Value[];
template <typename Value>
using ContainerPtrType = std::unique_ptr<ContainerType<Value>>;
struct CellMetadata final
{
......@@ -235,32 +204,35 @@ private:
time_point_urep_t data;
/// Sets expiration time, resets `is_default` flag to false
time_point_t expiresAt() const
{
return ext::safe_bit_cast<time_point_t>(data & EXPIRES_AT_MASK);
}
void setExpiresAt(const time_point_t & t)
{
data = ext::safe_bit_cast<time_point_urep_t>(t);
}
time_point_t expiresAt() const { return ext::safe_bit_cast<time_point_t>(data & EXPIRES_AT_MASK); }
void setExpiresAt(const time_point_t & t) { data = ext::safe_bit_cast<time_point_urep_t>(t); }
bool isDefault() const
{
return (data & IS_DEFAULT_MASK) == IS_DEFAULT_MASK;
}
void setDefault()
{
data |= IS_DEFAULT_MASK;
}
bool isDefault() const { return (data & IS_DEFAULT_MASK) == IS_DEFAULT_MASK; }
void setDefault() { data |= IS_DEFAULT_MASK; }
};
struct Attribute final
{
AttributeUnderlyingType type;
std::variant<UInt8, UInt16, UInt32, UInt64, UInt128, Int8, Int16, Int32, Int64,
Decimal32, Decimal64, Decimal128,
Float32, Float64, String> null_values;
std::variant<ContainerPtrType<UInt8>,
std::variant<
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String>
null_values;
std::variant<
ContainerPtrType<UInt8>,
ContainerPtrType<UInt16>,
ContainerPtrType<UInt32>,
ContainerPtrType<UInt64>,
......@@ -283,13 +255,13 @@ private:
Attribute createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value);
template <typename OutputType, typename DefaultGetter>
void getItemsNumber(
Attribute & attribute, const Columns & key_columns, PaddedPODArray<OutputType> & out, DefaultGetter && get_default) const
void
getItemsNumber(Attribute & attribute, const Columns & key_columns, PaddedPODArray<OutputType> & out, DefaultGetter && get_default) const
{
if (false)
{
}
#define DISPATCH(TYPE) \
#define DISPATCH(TYPE) \
else if (attribute.type == AttributeUnderlyingType::TYPE) \
getItemsNumberImpl<TYPE, OutputType>(attribute, key_columns, out, std::forward<DefaultGetter>(get_default));
DISPATCH(UInt8)
......@@ -372,7 +344,8 @@ private:
std::begin(outdated_keys), std::end(outdated_keys), std::begin(required_rows), [](auto & pair) { return pair.second.front(); });
/// request new values
update(key_columns,
update(
key_columns,
keys_array,
required_rows,
[&](const StringRef key, const size_t cell_idx)
......@@ -497,7 +470,8 @@ private:
return pair.second.front();
});
update(key_columns,
update(
key_columns,
keys_array,
required_rows,
[&](const StringRef key, const size_t cell_idx)
......@@ -531,7 +505,8 @@ private:
}
template <typename PresentKeyHandler, typename AbsentKeyHandler>
void update(const Columns & in_key_columns,
void update(
const Columns & in_key_columns,
const PODArray<StringRef> & in_keys,
const std::vector<size_t> & in_requested_rows,
PresentKeyHandler && on_cell_updated,
......@@ -561,8 +536,10 @@ private:
const auto key_columns = ext::map<Columns>(
ext::range(0, keys_size), [&](const size_t attribute_idx) { return block.safeGetByPosition(attribute_idx).column; });
const auto attribute_columns = ext::map<Columns>(ext::range(0, attributes_size),
[&](const size_t attribute_idx) { return block.safeGetByPosition(keys_size + attribute_idx).column; });
const auto attribute_columns = ext::map<Columns>(ext::range(0, attributes_size), [&](const size_t attribute_idx)
{
return block.safeGetByPosition(keys_size + attribute_idx).column;
});
const auto rows_num = block.rows();
......@@ -693,7 +670,8 @@ private:
void freeKey(const StringRef key) const;
template <typename Arena>
static StringRef placeKeysInPool(const size_t row,
static StringRef placeKeysInPool(
const size_t row,
const Columns & key_columns,
StringRefs & keys,
const std::vector<DictionaryAttribute> & key_attributes,
......
......@@ -2,19 +2,19 @@
namespace DB
{
ComplexKeyCacheDictionary::Attribute ComplexKeyCacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
ComplexKeyCacheDictionary::Attribute
ComplexKeyCacheDictionary::createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value)
{
Attribute attr{type, {}, {}};
switch (type)
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::TYPE: \
attr.null_values = TYPE(null_value.get<NearestFieldType<TYPE>>()); \
attr.arrays = std::make_unique<ContainerType<TYPE>>(size); \
bytes_allocated += size * sizeof(TYPE); \
break;
case AttributeUnderlyingType::TYPE: \
attr.null_values = TYPE(null_value.get<NearestFieldType<TYPE>>()); \
attr.arrays = std::make_unique<ContainerType<TYPE>>(size); \
bytes_allocated += size * sizeof(TYPE); \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
......
......@@ -2,26 +2,53 @@
namespace DB
{
void ComplexKeyCacheDictionary::setAttributeValue(Attribute & attribute, const size_t idx, const Field & value) const
{
switch (attribute.type)
{
case AttributeUnderlyingType::UInt8: std::get<ContainerPtrType<UInt8>>(attribute.arrays)[idx] = value.get<UInt64>(); break;
case AttributeUnderlyingType::UInt16: std::get<ContainerPtrType<UInt16>>(attribute.arrays)[idx] = value.get<UInt64>(); break;
case AttributeUnderlyingType::UInt32: std::get<ContainerPtrType<UInt32>>(attribute.arrays)[idx] = value.get<UInt64>(); break;
case AttributeUnderlyingType::UInt64: std::get<ContainerPtrType<UInt64>>(attribute.arrays)[idx] = value.get<UInt64>(); break;
case AttributeUnderlyingType::UInt128: std::get<ContainerPtrType<UInt128>>(attribute.arrays)[idx] = value.get<UInt128>(); break;
case AttributeUnderlyingType::Int8: std::get<ContainerPtrType<Int8>>(attribute.arrays)[idx] = value.get<Int64>(); break;
case AttributeUnderlyingType::Int16: std::get<ContainerPtrType<Int16>>(attribute.arrays)[idx] = value.get<Int64>(); break;
case AttributeUnderlyingType::Int32: std::get<ContainerPtrType<Int32>>(attribute.arrays)[idx] = value.get<Int64>(); break;
case AttributeUnderlyingType::Int64: std::get<ContainerPtrType<Int64>>(attribute.arrays)[idx] = value.get<Int64>(); break;
case AttributeUnderlyingType::Float32: std::get<ContainerPtrType<Float32>>(attribute.arrays)[idx] = value.get<Float64>(); break;
case AttributeUnderlyingType::Float64: std::get<ContainerPtrType<Float64>>(attribute.arrays)[idx] = value.get<Float64>(); break;
case AttributeUnderlyingType::UInt8:
std::get<ContainerPtrType<UInt8>>(attribute.arrays)[idx] = value.get<UInt64>();
break;
case AttributeUnderlyingType::UInt16:
std::get<ContainerPtrType<UInt16>>(attribute.arrays)[idx] = value.get<UInt64>();
break;
case AttributeUnderlyingType::UInt32:
std::get<ContainerPtrType<UInt32>>(attribute.arrays)[idx] = value.get<UInt64>();
break;
case AttributeUnderlyingType::UInt64:
std::get<ContainerPtrType<UInt64>>(attribute.arrays)[idx] = value.get<UInt64>();
break;
case AttributeUnderlyingType::UInt128:
std::get<ContainerPtrType<UInt128>>(attribute.arrays)[idx] = value.get<UInt128>();
break;
case AttributeUnderlyingType::Int8:
std::get<ContainerPtrType<Int8>>(attribute.arrays)[idx] = value.get<Int64>();
break;
case AttributeUnderlyingType::Int16:
std::get<ContainerPtrType<Int16>>(attribute.arrays)[idx] = value.get<Int64>();
break;
case AttributeUnderlyingType::Int32:
std::get<ContainerPtrType<Int32>>(attribute.arrays)[idx] = value.get<Int64>();
break;
case AttributeUnderlyingType::Int64:
std::get<ContainerPtrType<Int64>>(attribute.arrays)[idx] = value.get<Int64>();
break;
case AttributeUnderlyingType::Float32:
std::get<ContainerPtrType<Float32>>(attribute.arrays)[idx] = value.get<Float64>();
break;
case AttributeUnderlyingType::Float64:
std::get<ContainerPtrType<Float64>>(attribute.arrays)[idx] = value.get<Float64>();
break;
case AttributeUnderlyingType::Decimal32: std::get<ContainerPtrType<Decimal32>>(attribute.arrays)[idx] = value.get<Decimal32>(); break;
case AttributeUnderlyingType::Decimal64: std::get<ContainerPtrType<Decimal64>>(attribute.arrays)[idx] = value.get<Decimal64>(); break;
case AttributeUnderlyingType::Decimal128: std::get<ContainerPtrType<Decimal128>>(attribute.arrays)[idx] = value.get<Decimal128>(); break;
case AttributeUnderlyingType::Decimal32:
std::get<ContainerPtrType<Decimal32>>(attribute.arrays)[idx] = value.get<Decimal32>();
break;
case AttributeUnderlyingType::Decimal64:
std::get<ContainerPtrType<Decimal64>>(attribute.arrays)[idx] = value.get<Decimal64>();
break;
case AttributeUnderlyingType::Decimal128:
std::get<ContainerPtrType<Decimal128>>(attribute.arrays)[idx] = value.get<Decimal128>();
break;
case AttributeUnderlyingType::String:
{
......
......@@ -2,22 +2,43 @@
namespace DB
{
void ComplexKeyCacheDictionary::setDefaultAttributeValue(Attribute & attribute, const size_t idx) const
{
switch (attribute.type)
{
case AttributeUnderlyingType::UInt8: std::get<ContainerPtrType<UInt8>>(attribute.arrays)[idx] = std::get<UInt8>(attribute.null_values); break;
case AttributeUnderlyingType::UInt16: std::get<ContainerPtrType<UInt16>>(attribute.arrays)[idx] = std::get<UInt16>(attribute.null_values); break;
case AttributeUnderlyingType::UInt32: std::get<ContainerPtrType<UInt32>>(attribute.arrays)[idx] = std::get<UInt32>(attribute.null_values); break;
case AttributeUnderlyingType::UInt64: std::get<ContainerPtrType<UInt64>>(attribute.arrays)[idx] = std::get<UInt64>(attribute.null_values); break;
case AttributeUnderlyingType::UInt128: std::get<ContainerPtrType<UInt128>>(attribute.arrays)[idx] = std::get<UInt128>(attribute.null_values); break;
case AttributeUnderlyingType::Int8: std::get<ContainerPtrType<Int8>>(attribute.arrays)[idx] = std::get<Int8>(attribute.null_values); break;
case AttributeUnderlyingType::Int16: std::get<ContainerPtrType<Int16>>(attribute.arrays)[idx] = std::get<Int16>(attribute.null_values); break;
case AttributeUnderlyingType::Int32: std::get<ContainerPtrType<Int32>>(attribute.arrays)[idx] = std::get<Int32>(attribute.null_values); break;
case AttributeUnderlyingType::Int64: std::get<ContainerPtrType<Int64>>(attribute.arrays)[idx] = std::get<Int64>(attribute.null_values); break;
case AttributeUnderlyingType::Float32: std::get<ContainerPtrType<Float32>>(attribute.arrays)[idx] = std::get<Float32>(attribute.null_values); break;
case AttributeUnderlyingType::Float64: std::get<ContainerPtrType<Float64>>(attribute.arrays)[idx] = std::get<Float64>(attribute.null_values); break;
case AttributeUnderlyingType::UInt8:
std::get<ContainerPtrType<UInt8>>(attribute.arrays)[idx] = std::get<UInt8>(attribute.null_values);
break;
case AttributeUnderlyingType::UInt16:
std::get<ContainerPtrType<UInt16>>(attribute.arrays)[idx] = std::get<UInt16>(attribute.null_values);
break;
case AttributeUnderlyingType::UInt32:
std::get<ContainerPtrType<UInt32>>(attribute.arrays)[idx] = std::get<UInt32>(attribute.null_values);
break;
case AttributeUnderlyingType::UInt64:
std::get<ContainerPtrType<UInt64>>(attribute.arrays)[idx] = std::get<UInt64>(attribute.null_values);
break;
case AttributeUnderlyingType::UInt128:
std::get<ContainerPtrType<UInt128>>(attribute.arrays)[idx] = std::get<UInt128>(attribute.null_values);
break;
case AttributeUnderlyingType::Int8:
std::get<ContainerPtrType<Int8>>(attribute.arrays)[idx] = std::get<Int8>(attribute.null_values);
break;
case AttributeUnderlyingType::Int16:
std::get<ContainerPtrType<Int16>>(attribute.arrays)[idx] = std::get<Int16>(attribute.null_values);
break;
case AttributeUnderlyingType::Int32:
std::get<ContainerPtrType<Int32>>(attribute.arrays)[idx] = std::get<Int32>(attribute.null_values);
break;
case AttributeUnderlyingType::Int64:
std::get<ContainerPtrType<Int64>>(attribute.arrays)[idx] = std::get<Int64>(attribute.null_values);
break;
case AttributeUnderlyingType::Float32:
std::get<ContainerPtrType<Float32>>(attribute.arrays)[idx] = std::get<Float32>(attribute.null_values);
break;
case AttributeUnderlyingType::Float64:
std::get<ContainerPtrType<Float64>>(attribute.arrays)[idx] = std::get<Float64>(attribute.null_values);
break;
case AttributeUnderlyingType::Decimal32:
std::get<ContainerPtrType<Decimal32>>(attribute.arrays)[idx] = std::get<Decimal32>(attribute.null_values);
......
#pragma once
#include "IDictionary.h"
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
#include <common/StringRef.h>
#include <Common/HashTable/HashMap.h>
#include <atomic>
#include <memory>
#include <variant>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Common/Arena.h>
#include <Common/HashTable/HashMap.h>
#include <common/StringRef.h>
#include <ext/range.h>
#include <atomic>
#include <memory>
#include <variant>
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
namespace DB
{
using BlockPtr = std::shared_ptr<Block>;
class ComplexKeyHashedDictionary final : public IDictionaryBase
{
public:
ComplexKeyHashedDictionary(
const std::string & name, const DictionaryStructure & dict_struct, DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime, bool require_nonempty, BlockPtr saved_block = nullptr);
const std::string & name,
const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr,
const DictionaryLifetime dict_lifetime,
bool require_nonempty,
BlockPtr saved_block = nullptr);
ComplexKeyHashedDictionary(const ComplexKeyHashedDictionary & other);
......@@ -56,10 +59,7 @@ public:
const DictionaryStructure & getStructure() const override { return dict_struct; }
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override
{
return creation_time;
}
std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
bool isInjective(const std::string & attribute_name) const override
{
......@@ -69,10 +69,9 @@ public:
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
#define DECLARE(TYPE)\
void get##TYPE(\
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,\
ResultArrayType<TYPE> & out) const;
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
......@@ -89,14 +88,15 @@ public:
DECLARE(Decimal128)
#undef DECLARE
void getString(
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,
ColumnString * out) const;
void getString(const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types, ColumnString * out) const;
#define DECLARE(TYPE)\
void get##TYPE(\
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,\
const PaddedPODArray<TYPE> & def, ResultArrayType<TYPE> & out) const;
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
......@@ -114,13 +114,19 @@ public:
#undef DECLARE
void getString(
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,
const ColumnString * const def, ColumnString * const out) const;
#define DECLARE(TYPE)\
void get##TYPE(\
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,\
const TYPE def, ResultArrayType<TYPE> & out) const;
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes & key_types,
const ColumnString * const def,
ColumnString * const out) const;
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const Columns & key_columns, \
const DataTypes & key_types, \
const TYPE def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
......@@ -138,33 +144,57 @@ public:
#undef DECLARE
void getString(
const std::string & attribute_name, const Columns & key_columns, const DataTypes & key_types,
const String & def, ColumnString * const out) const;
const std::string & attribute_name,
const Columns & key_columns,
const DataTypes & key_types,
const String & def,
ColumnString * const out) const;
void has(const Columns & key_columns, const DataTypes & key_types, PaddedPODArray<UInt8> & out) const;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
private:
template <typename Value> using ContainerType = HashMapWithSavedHash<StringRef, Value, StringRefHash>;
template <typename Value>
using ContainerType = HashMapWithSavedHash<StringRef, Value, StringRefHash>;
struct Attribute final
{
AttributeUnderlyingType type;
std::variant<
UInt8, UInt16, UInt32, UInt64,
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8, Int16, Int32, Int64,
Decimal32, Decimal64, Decimal128,
Float32, Float64,
String> null_values;
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String>
null_values;
std::variant<
ContainerType<UInt8>, ContainerType<UInt16>, ContainerType<UInt32>, ContainerType<UInt64>,
ContainerType<UInt8>,
ContainerType<UInt16>,
ContainerType<UInt32>,
ContainerType<UInt64>,
ContainerType<UInt128>,
ContainerType<Int8>, ContainerType<Int16>, ContainerType<Int32>, ContainerType<Int64>,
ContainerType<Decimal32>, ContainerType<Decimal64>, ContainerType<Decimal128>,
ContainerType<Float32>, ContainerType<Float64>,
ContainerType<StringRef>> maps;
ContainerType<Int8>,
ContainerType<Int16>,
ContainerType<Int32>,
ContainerType<Int64>,
ContainerType<Decimal32>,
ContainerType<Decimal64>,
ContainerType<Decimal128>,
ContainerType<Float32>,
ContainerType<Float64>,
ContainerType<StringRef>>
maps;
std::unique_ptr<Arena> string_arena;
};
......@@ -188,18 +218,12 @@ private:
template <typename OutputType, typename ValueSetter, typename DefaultGetter>
void getItemsNumber(
const Attribute & attribute,
const Columns & key_columns,
ValueSetter && set_value,
DefaultGetter && get_default) const;
void
getItemsNumber(const Attribute & attribute, const Columns & key_columns, ValueSetter && set_value, DefaultGetter && get_default) const;
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultGetter>
void getItemsImpl(
const Attribute & attribute,
const Columns & key_columns,
ValueSetter && set_value,
DefaultGetter && get_default) const;
void
getItemsImpl(const Attribute & attribute, const Columns & key_columns, ValueSetter && set_value, DefaultGetter && get_default) const;
template <typename T>
......@@ -209,8 +233,7 @@ private:
const Attribute & getAttribute(const std::string & attribute_name) const;
static StringRef placeKeysInPool(
const size_t row, const Columns & key_columns, StringRefs & keys, Arena & pool);
static StringRef placeKeysInPool(const size_t row, const Columns & key_columns, StringRefs & keys, Arena & pool);
template <typename T>
void has(const Attribute & attribute, const Columns & key_columns, PaddedPODArray<UInt8> & out) const;
......
......@@ -2,7 +2,6 @@
namespace DB
{
DictionaryBlockInputStreamBase::DictionaryBlockInputStreamBase(size_t rows_count, size_t max_block_size)
: rows_count(rows_count), max_block_size(max_block_size)
{
......
......@@ -4,7 +4,6 @@
namespace DB
{
class DictionaryBlockInputStreamBase : public IProfilingBlockInputStream
{
protected:
......
......@@ -27,10 +27,10 @@ DictionaryPtr DictionaryFactory::create(
const auto & layout_prefix = config_prefix + ".layout";
config.keys(layout_prefix, keys);
if (keys.size() != 1)
throw Exception {name + ": element dictionary.layout should have exactly one child element",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG};
throw Exception{name + ": element dictionary.layout should have exactly one child element",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG};
const DictionaryStructure dict_struct {config, config_prefix + ".structure"};
const DictionaryStructure dict_struct{config, config_prefix + ".structure"};
auto source_ptr = DictionarySourceFactory::instance().create(name, config, config_prefix + ".source", dict_struct, context);
......@@ -45,7 +45,7 @@ DictionaryPtr DictionaryFactory::create(
}
}
throw Exception {name + ": unknown dictionary layout type: " + layout_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
throw Exception{name + ": unknown dictionary layout type: " + layout_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
}
}
......@@ -26,7 +26,7 @@ namespace
Block block;
if (dict_struct.id)
block.insert(ColumnWithTypeAndName {ColumnUInt64::create(1, 0), std::make_shared<DataTypeUInt64>(), dict_struct.id->name});
block.insert(ColumnWithTypeAndName{ColumnUInt64::create(1, 0), std::make_shared<DataTypeUInt64>(), dict_struct.id->name});
if (dict_struct.key)
{
......@@ -35,7 +35,7 @@ namespace
auto column = attribute.type->createColumn();
column->insertDefault();
block.insert(ColumnWithTypeAndName {std::move(column), attribute.type, attribute.name});
block.insert(ColumnWithTypeAndName{std::move(column), attribute.type, attribute.name});
}
}
......@@ -47,7 +47,7 @@ namespace
auto column = type->createColumn();
column->insertDefault();
block.insert(ColumnWithTypeAndName {std::move(column), type, attribute->name});
block.insert(ColumnWithTypeAndName{std::move(column), type, attribute->name});
}
}
......@@ -56,7 +56,7 @@ namespace
auto column = attribute.type->createColumn();
column->insert(attribute.null_value);
block.insert(ColumnWithTypeAndName {std::move(column), attribute.type, attribute.name});
block.insert(ColumnWithTypeAndName{std::move(column), attribute.type, attribute.name});
}
return block;
......@@ -86,8 +86,8 @@ DictionarySourcePtr DictionarySourceFactory::create(
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
if (keys.size() != 1)
throw Exception {name + ": element dictionary.source should have exactly one child element",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG};
throw Exception{name + ": element dictionary.source should have exactly one child element",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG};
auto sample_block = createSampleBlock(dict_struct);
......@@ -102,7 +102,7 @@ DictionarySourcePtr DictionarySourceFactory::create(
}
}
throw Exception {name + ": unknown dictionary source type: " + source_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
throw Exception{name + ": unknown dictionary source type: " + source_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
}
}
#include "DictionarySourceHelpers.h"
#include "DictionaryStructure.h"
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Block.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h>
#include "DictionaryStructure.h"
namespace DB
{
/// For simple key
void formatIDs(BlockOutputStreamPtr & out, const std::vector<UInt64> & ids)
{
auto column = ColumnUInt64::create(ids.size());
memcpy(column->getData().data(), ids.data(), ids.size() * sizeof(ids.front()));
Block block{{ std::move(column), std::make_shared<DataTypeUInt64>(), "id" }};
Block block{{std::move(column), std::make_shared<DataTypeUInt64>(), "id"}};
out->writePrefix();
out->write(block);
......@@ -26,8 +25,11 @@ void formatIDs(BlockOutputStreamPtr & out, const std::vector<UInt64> & ids)
}
/// For composite key
void formatKeys(const DictionaryStructure & dict_struct, BlockOutputStreamPtr & out,
const Columns & key_columns, const std::vector<size_t> & requested_rows)
void formatKeys(
const DictionaryStructure & dict_struct,
BlockOutputStreamPtr & out,
const Columns & key_columns,
const std::vector<size_t> & requested_rows)
{
Block block;
for (size_t i = 0, size = key_columns.size(); i < size; ++i)
......@@ -39,7 +41,7 @@ void formatKeys(const DictionaryStructure & dict_struct, BlockOutputStreamPtr &
for (size_t idx : requested_rows)
filtered_column->insertFrom(*source_column, idx);
block.insert({ std::move(filtered_column), (*dict_struct.key)[i].type, toString(i) });
block.insert({std::move(filtered_column), (*dict_struct.key)[i].type, toString(i)});
}
out->writePrefix();
......
#pragma once
#include <vector>
#include <common/Types.h>
#include <Columns/IColumn.h>
#include <common/Types.h>
namespace DB
{
class IBlockOutputStream;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
......@@ -19,7 +18,10 @@ struct DictionaryStructure;
void formatIDs(BlockOutputStreamPtr & out, const std::vector<UInt64> & ids);
/// For composite key
void formatKeys(const DictionaryStructure & dict_struct, BlockOutputStreamPtr & out,
const Columns & key_columns, const std::vector<size_t> & requested_rows);
void formatKeys(
const DictionaryStructure & dict_struct,
BlockOutputStreamPtr & out,
const Columns & key_columns,
const std::vector<size_t> & requested_rows);
}
#include "DictionaryStructure.h"
#include <Formats/FormatSettings.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/IColumn.h>
#include <Common/StringUtils/StringUtils.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteHelpers.h>
#include <Common/StringUtils/StringUtils.h>
#include <ext/range.h>
#include <numeric>
#include <unordered_set>
#include <unordered_map>
#include <unordered_set>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
......@@ -25,20 +24,18 @@ namespace ErrorCodes
namespace
{
DictionaryTypedSpecialAttribute makeDictionaryTypedSpecialAttribute(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const std::string& default_type)
{
const auto name = config.getString(config_prefix + ".name", "");
const auto expression = config.getString(config_prefix + ".expression", "");
DictionaryTypedSpecialAttribute makeDictionaryTypedSpecialAttribute(
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const std::string & default_type)
{
const auto name = config.getString(config_prefix + ".name", "");
const auto expression = config.getString(config_prefix + ".expression", "");
if (name.empty() && !expression.empty())
throw Exception{"Element " + config_prefix + ".name is empty", ErrorCodes::BAD_ARGUMENTS};
if (name.empty() && !expression.empty())
throw Exception{"Element " + config_prefix + ".name is empty", ErrorCodes::BAD_ARGUMENTS};
const auto type_name = config.getString(config_prefix + ".type", default_type);
return DictionaryTypedSpecialAttribute{std::move(name), std::move(expression), DataTypeFactory::instance().get(type_name)};
}
const auto type_name = config.getString(config_prefix + ".type", default_type);
return DictionaryTypedSpecialAttribute{std::move(name), std::move(expression), DataTypeFactory::instance().get(type_name)};
}
} // namespace
......@@ -52,27 +49,27 @@ bool isAttributeTypeConvertibleTo(AttributeUnderlyingType from, AttributeUnderly
* (for example, because integers can not be converted to floats)
* This is normal for a limited usage scope.
*/
if ( (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::UInt16)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::UInt32)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::UInt64)
|| (from == AttributeUnderlyingType::UInt16 && to == AttributeUnderlyingType::UInt32)
|| (from == AttributeUnderlyingType::UInt16 && to == AttributeUnderlyingType::UInt64)
|| (from == AttributeUnderlyingType::UInt32 && to == AttributeUnderlyingType::UInt64)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::Int16)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::Int32)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::UInt16 && to == AttributeUnderlyingType::Int32)
|| (from == AttributeUnderlyingType::UInt16 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::UInt32 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::Int8 && to == AttributeUnderlyingType::Int16)
|| (from == AttributeUnderlyingType::Int8 && to == AttributeUnderlyingType::Int32)
|| (from == AttributeUnderlyingType::Int8 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::Int16 && to == AttributeUnderlyingType::Int32)
|| (from == AttributeUnderlyingType::Int16 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::Int32 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::Float32 && to == AttributeUnderlyingType::Float64))
if ((from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::UInt16)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::UInt32)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::UInt64)
|| (from == AttributeUnderlyingType::UInt16 && to == AttributeUnderlyingType::UInt32)
|| (from == AttributeUnderlyingType::UInt16 && to == AttributeUnderlyingType::UInt64)
|| (from == AttributeUnderlyingType::UInt32 && to == AttributeUnderlyingType::UInt64)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::Int16)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::Int32)
|| (from == AttributeUnderlyingType::UInt8 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::UInt16 && to == AttributeUnderlyingType::Int32)
|| (from == AttributeUnderlyingType::UInt16 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::UInt32 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::Int8 && to == AttributeUnderlyingType::Int16)
|| (from == AttributeUnderlyingType::Int8 && to == AttributeUnderlyingType::Int32)
|| (from == AttributeUnderlyingType::Int8 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::Int16 && to == AttributeUnderlyingType::Int32)
|| (from == AttributeUnderlyingType::Int16 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::Int32 && to == AttributeUnderlyingType::Int64)
|| (from == AttributeUnderlyingType::Float32 && to == AttributeUnderlyingType::Float64))
{
return true;
}
......@@ -84,20 +81,20 @@ bool isAttributeTypeConvertibleTo(AttributeUnderlyingType from, AttributeUnderly
AttributeUnderlyingType getAttributeUnderlyingType(const std::string & type)
{
static const std::unordered_map<std::string, AttributeUnderlyingType> dictionary{
{ "UInt8", AttributeUnderlyingType::UInt8 },
{ "UInt16", AttributeUnderlyingType::UInt16 },
{ "UInt32", AttributeUnderlyingType::UInt32 },
{ "UInt64", AttributeUnderlyingType::UInt64 },
{ "UUID", AttributeUnderlyingType::UInt128 },
{ "Int8", AttributeUnderlyingType::Int8 },
{ "Int16", AttributeUnderlyingType::Int16 },
{ "Int32", AttributeUnderlyingType::Int32 },
{ "Int64", AttributeUnderlyingType::Int64 },
{ "Float32", AttributeUnderlyingType::Float32 },
{ "Float64", AttributeUnderlyingType::Float64 },
{ "String", AttributeUnderlyingType::String },
{ "Date", AttributeUnderlyingType::UInt16 },
{ "DateTime", AttributeUnderlyingType::UInt32 },
{"UInt8", AttributeUnderlyingType::UInt8},
{"UInt16", AttributeUnderlyingType::UInt16},
{"UInt32", AttributeUnderlyingType::UInt32},
{"UInt64", AttributeUnderlyingType::UInt64},
{"UUID", AttributeUnderlyingType::UInt128},
{"Int8", AttributeUnderlyingType::Int8},
{"Int16", AttributeUnderlyingType::Int16},
{"Int32", AttributeUnderlyingType::Int32},
{"Int64", AttributeUnderlyingType::Int64},
{"Float32", AttributeUnderlyingType::Float32},
{"Float64", AttributeUnderlyingType::Float64},
{"String", AttributeUnderlyingType::String},
{"Date", AttributeUnderlyingType::UInt16},
{"DateTime", AttributeUnderlyingType::UInt32},
};
const auto it = dictionary.find(type);
......@@ -123,21 +120,36 @@ std::string toString(const AttributeUnderlyingType type)
{
switch (type)
{
case AttributeUnderlyingType::UInt8: return "UInt8";
case AttributeUnderlyingType::UInt16: return "UInt16";
case AttributeUnderlyingType::UInt32: return "UInt32";
case AttributeUnderlyingType::UInt64: return "UInt64";
case AttributeUnderlyingType::UInt128: return "UUID";
case AttributeUnderlyingType::Int8: return "Int8";
case AttributeUnderlyingType::Int16: return "Int16";
case AttributeUnderlyingType::Int32: return "Int32";
case AttributeUnderlyingType::Int64: return "Int64";
case AttributeUnderlyingType::Float32: return "Float32";
case AttributeUnderlyingType::Float64: return "Float64";
case AttributeUnderlyingType::Decimal32: return "Decimal32";
case AttributeUnderlyingType::Decimal64: return "Decimal64";
case AttributeUnderlyingType::Decimal128: return "Decimal128";
case AttributeUnderlyingType::String: return "String";
case AttributeUnderlyingType::UInt8:
return "UInt8";
case AttributeUnderlyingType::UInt16:
return "UInt16";
case AttributeUnderlyingType::UInt32:
return "UInt32";
case AttributeUnderlyingType::UInt64:
return "UInt64";
case AttributeUnderlyingType::UInt128:
return "UUID";
case AttributeUnderlyingType::Int8:
return "Int8";
case AttributeUnderlyingType::Int16:
return "Int16";
case AttributeUnderlyingType::Int32:
return "Int32";
case AttributeUnderlyingType::Int64:
return "Int64";
case AttributeUnderlyingType::Float32:
return "Float32";
case AttributeUnderlyingType::Float64:
return "Float64";
case AttributeUnderlyingType::Decimal32:
return "Decimal32";
case AttributeUnderlyingType::Decimal64:
return "Decimal64";
case AttributeUnderlyingType::Decimal128:
return "Decimal128";
case AttributeUnderlyingType::String:
return "String";
}
throw Exception{"Unknown attribute_type " + toString(static_cast<int>(type)), ErrorCodes::ARGUMENT_OUT_OF_BOUND};
......@@ -145,8 +157,7 @@ std::string toString(const AttributeUnderlyingType type)
DictionarySpecialAttribute::DictionarySpecialAttribute(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
: name{config.getString(config_prefix + ".name", "")},
expression{config.getString(config_prefix + ".expression", "")}
: name{config.getString(config_prefix + ".name", "")}, expression{config.getString(config_prefix + ".expression", "")}
{
if (name.empty() && !expression.empty())
throw Exception{"Element " + config_prefix + ".name is empty", ErrorCodes::BAD_ARGUMENTS};
......@@ -186,28 +197,31 @@ DictionaryStructure::DictionaryStructure(const Poco::Util::AbstractConfiguration
if (range_min.has_value() != range_max.has_value())
{
throw Exception{"Dictionary structure should have both 'range_min' and 'range_max' either specified or not.", ErrorCodes::BAD_ARGUMENTS};
throw Exception{"Dictionary structure should have both 'range_min' and 'range_max' either specified or not.",
ErrorCodes::BAD_ARGUMENTS};
}
if (range_min && range_max && !range_min->type->equals(*range_max->type))
{
throw Exception{"Dictionary structure 'range_min' and 'range_max' should have same type, "
"'range_min' type: " + range_min->type->getName() + ", "
"'range_max' type: " + range_max->type->getName(),
ErrorCodes::BAD_ARGUMENTS};
"'range_min' type: "
+ range_min->type->getName()
+ ", "
"'range_max' type: "
+ range_max->type->getName(),
ErrorCodes::BAD_ARGUMENTS};
}
if (range_min)
{
if (!range_min->type->isValueRepresentedByInteger())
throw Exception{"Dictionary structure type of 'range_min' and 'range_max' should be an integer, Date, DateTime, or Enum."
" Actual 'range_min' and 'range_max' type is " + range_min->type->getName(),
ErrorCodes::BAD_ARGUMENTS};
" Actual 'range_min' and 'range_max' type is "
+ range_min->type->getName(),
ErrorCodes::BAD_ARGUMENTS};
}
if (!id->expression.empty() ||
(range_min && !range_min->expression.empty()) ||
(range_max && !range_max->expression.empty()))
if (!id->expression.empty() || (range_min && !range_min->expression.empty()) || (range_max && !range_max->expression.empty()))
has_expressions = true;
}
......@@ -228,8 +242,9 @@ void DictionaryStructure::validateKeyTypes(const DataTypes & key_types) const
const auto & actual_type = key_types[i]->getName();
if (expected_type != actual_type)
throw Exception{"Key type at position " + std::to_string(i) + " does not match, expected " + expected_type +
", found " + actual_type, ErrorCodes::TYPE_MISMATCH};
throw Exception{"Key type at position " + std::to_string(i) + " does not match, expected " + expected_type + ", found "
+ actual_type,
ErrorCodes::TYPE_MISMATCH};
}
}
......@@ -274,15 +289,17 @@ bool DictionaryStructure::isKeySizeFixed() const
size_t DictionaryStructure::getKeySize() const
{
return std::accumulate(std::begin(*key), std::end(*key), size_t{},
[] (const auto running_size, const auto & key_i) {return running_size + key_i.type->getSizeOfValueInMemory(); });
return std::accumulate(std::begin(*key), std::end(*key), size_t{}, [](const auto running_size, const auto & key_i)
{
return running_size + key_i.type->getSizeOfValueInMemory();
});
}
static void checkAttributeKeys(const Poco::Util::AbstractConfiguration::Keys & keys)
{
static const std::unordered_set<std::string> valid_keys =
{ "name", "type", "expression", "null_value", "hierarchical", "injective", "is_object_id" };
static const std::unordered_set<std::string> valid_keys
= {"name", "type", "expression", "null_value", "hierarchical", "injective", "is_object_id"};
for (const auto & key : keys)
{
......@@ -293,8 +310,10 @@ static void checkAttributeKeys(const Poco::Util::AbstractConfiguration::Keys & k
std::vector<DictionaryAttribute> DictionaryStructure::getAttributes(
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const bool hierarchy_allowed, const bool allow_null_values)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const bool hierarchy_allowed,
const bool allow_null_values)
{
Poco::Util::AbstractConfiguration::Keys config_elems;
config.keys(config_prefix, config_elems);
......@@ -361,9 +380,8 @@ std::vector<DictionaryAttribute> DictionaryStructure::getAttributes(
has_hierarchy = has_hierarchy || hierarchical;
res_attributes.emplace_back(DictionaryAttribute{
name, underlying_type, type, expression, null_value, hierarchical, injective, is_object_id
});
res_attributes.emplace_back(
DictionaryAttribute{name, underlying_type, type, expression, null_value, hierarchical, injective, is_object_id});
}
return res_attributes;
......
......@@ -5,15 +5,14 @@
#include <Interpreters/IExternalLoadable.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <vector>
#include <string>
#include <map>
#include <optional>
#include <string>
#include <vector>
namespace DB
{
enum class AttributeUnderlyingType
{
UInt8,
......@@ -104,8 +103,10 @@ struct DictionaryStructure final
private:
std::vector<DictionaryAttribute> getAttributes(
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const bool hierarchy_allowed = true, const bool allow_null_values = true);
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const bool hierarchy_allowed = true,
const bool allow_null_values = true);
};
}
......@@ -4,8 +4,7 @@
#include "GeodataProviders/HierarchiesProvider.h"
#include "GeodataProviders/NamesProvider.h"
std::unique_ptr<RegionsHierarchies> GeoDictionariesLoader::reloadRegionsHierarchies(
const Poco::Util::AbstractConfiguration & config)
std::unique_ptr<RegionsHierarchies> GeoDictionariesLoader::reloadRegionsHierarchies(const Poco::Util::AbstractConfiguration & config)
{
static constexpr auto config_key = "path_to_regions_hierarchy_file";
......@@ -17,8 +16,7 @@ std::unique_ptr<RegionsHierarchies> GeoDictionariesLoader::reloadRegionsHierarch
return std::make_unique<RegionsHierarchies>(std::move(data_provider));
}
std::unique_ptr<RegionsNames> GeoDictionariesLoader::reloadRegionsNames(
const Poco::Util::AbstractConfiguration & config)
std::unique_ptr<RegionsNames> GeoDictionariesLoader::reloadRegionsNames(const Poco::Util::AbstractConfiguration & config)
{
static constexpr auto config_key = "path_to_regions_names_files";
......
......@@ -7,9 +7,7 @@
class GeoDictionariesLoader : public IGeoDictionariesLoader
{
public:
std::unique_ptr<RegionsHierarchies> reloadRegionsHierarchies(
const Poco::Util::AbstractConfiguration & config) override;
std::unique_ptr<RegionsHierarchies> reloadRegionsHierarchies(const Poco::Util::AbstractConfiguration & config) override;
std::unique_ptr<RegionsNames> reloadRegionsNames(
const Poco::Util::AbstractConfiguration & config) override;
std::unique_ptr<RegionsNames> reloadRegionsNames(const Poco::Util::AbstractConfiguration & config) override;
};
#pragma once
#include "Types.h"
#include <string>
#include "Types.h"
struct RegionEntry
{
......@@ -17,4 +17,3 @@ struct RegionNameEntry
RegionID id;
std::string name;
};
#include "HierarchiesProvider.h"
#include "HierarchyFormatReader.h"
#include <IO/ReadBufferFromFile.h>
#include <Poco/Util/Application.h>
#include <Poco/Exception.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Exception.h>
#include <Poco/Util/Application.h>
#include "HierarchyFormatReader.h"
bool RegionsHierarchyDataSource::isModified() const
......@@ -20,8 +20,7 @@ IRegionsHierarchyReaderPtr RegionsHierarchyDataSource::createReader()
}
RegionsHierarchiesDataProvider::RegionsHierarchiesDataProvider(const std::string & path)
: path(path)
RegionsHierarchiesDataProvider::RegionsHierarchiesDataProvider(const std::string & path) : path(path)
{
discoverFilesWithCustomHierarchies();
}
......@@ -37,9 +36,8 @@ void RegionsHierarchiesDataProvider::discoverFilesWithCustomHierarchies()
{
std::string candidate_basename = dir_it.path().getBaseName();
if ((0 == candidate_basename.compare(0, basename.size(), basename)) &&
(candidate_basename.size() > basename.size() + 1) &&
(candidate_basename[basename.size()] == '_'))
if ((0 == candidate_basename.compare(0, basename.size(), basename)) && (candidate_basename.size() > basename.size() + 1)
&& (candidate_basename[basename.size()] == '_'))
{
const std::string suffix = candidate_basename.substr(basename.size() + 1);
hierarchy_files.emplace(suffix, dir_it->path());
......
......@@ -2,23 +2,19 @@
#include "IHierarchiesProvider.h"
#include <Common/FileUpdatesTracker.h>
#include <unordered_map>
#include <Common/FileUpdatesTracker.h>
// Represents local file with regions hierarchy dump
class RegionsHierarchyDataSource
: public IRegionsHierarchyDataSource
class RegionsHierarchyDataSource : public IRegionsHierarchyDataSource
{
private:
std::string path;
FileUpdatesTracker updates_tracker;
public:
RegionsHierarchyDataSource(const std::string & path_)
: path(path_)
, updates_tracker(path_)
{}
RegionsHierarchyDataSource(const std::string & path_) : path(path_), updates_tracker(path_) {}
bool isModified() const override;
......@@ -27,8 +23,7 @@ public:
// Provides access to directory with multiple data source files: one file per regions hierarchy
class RegionsHierarchiesDataProvider
: public IRegionsHierarchiesDataProvider
class RegionsHierarchiesDataProvider : public IRegionsHierarchiesDataProvider
{
private:
// path to file with default regions hierarchy
......@@ -55,4 +50,3 @@ public:
private:
void discoverFilesWithCustomHierarchies();
};
......@@ -30,9 +30,8 @@ bool RegionsHierarchyFormatReader::readNext(RegionEntry & entry)
++input->position();
UInt64 population_big = 0;
DB::readIntText(population_big, *input);
population = population_big > std::numeric_limits<RegionPopulation>::max()
? std::numeric_limits<RegionPopulation>::max()
: population_big;
population = population_big > std::numeric_limits<RegionPopulation>::max() ? std::numeric_limits<RegionPopulation>::max()
: population_big;
}
DB::assertChar('\n', *input);
......
#pragma once
#include "IHierarchiesProvider.h"
#include <IO/ReadBuffer.h>
#include "IHierarchiesProvider.h"
// Reads regions hierarchy in geoexport format
......@@ -11,10 +11,7 @@ private:
DB::ReadBufferPtr input;
public:
RegionsHierarchyFormatReader(DB::ReadBufferPtr input_)
: input(std::move(input_))
{}
RegionsHierarchyFormatReader(DB::ReadBufferPtr input_) : input(std::move(input_)) {}
bool readNext(RegionEntry & entry) override;
};
#pragma once
#include "Entries.h"
#include <memory>
#include <string>
#include <vector>
#include "Entries.h"
// Iterates over all regions in data source
......@@ -46,4 +46,3 @@ public:
};
using IRegionsHierarchiesDataProviderPtr = std::shared_ptr<IRegionsHierarchiesDataProvider>;
#pragma once
#include "Entries.h"
#include <memory>
#include "Entries.h"
// Iterates over all name entries in data source
......@@ -42,11 +42,9 @@ using ILanguageRegionsNamesDataSourcePtr = std::unique_ptr<ILanguageRegionsNames
class IRegionsNamesDataProvider
{
public:
virtual ILanguageRegionsNamesDataSourcePtr getLanguageRegionsNamesSource(
const std::string & language) const = 0;
virtual ILanguageRegionsNamesDataSourcePtr getLanguageRegionsNamesSource(const std::string & language) const = 0;
virtual ~IRegionsNamesDataProvider() {}
};
using IRegionsNamesDataProviderPtr = std::unique_ptr<IRegionsNamesDataProvider>;
#pragma once
#include "INamesProvider.h"
#include <IO/ReadBuffer.h>
#include "INamesProvider.h"
// Reads regions names list in geoexport format
......@@ -11,9 +11,7 @@ private:
DB::ReadBufferPtr input;
public:
LanguageRegionsNamesFormatReader(DB::ReadBufferPtr input_)
: input(std::move(input_))
{}
LanguageRegionsNamesFormatReader(DB::ReadBufferPtr input_) : input(std::move(input_)) {}
bool readNext(RegionNameEntry & entry) override;
};
#include "NamesProvider.h"
#include "NamesFormatReader.h"
#include <IO/ReadBufferFromFile.h>
#include "NamesFormatReader.h"
bool LanguageRegionsNamesDataSource::isModified() const
......@@ -32,12 +32,11 @@ std::string LanguageRegionsNamesDataSource::getSourceName() const
}
RegionsNamesDataProvider::RegionsNamesDataProvider(const std::string & directory_)
: directory(directory_)
{}
RegionsNamesDataProvider::RegionsNamesDataProvider(const std::string & directory_) : directory(directory_)
{
}
ILanguageRegionsNamesDataSourcePtr RegionsNamesDataProvider::getLanguageRegionsNamesSource(
const std::string & language) const
ILanguageRegionsNamesDataSourcePtr RegionsNamesDataProvider::getLanguageRegionsNamesSource(const std::string & language) const
{
const auto data_file = getDataFilePath(language);
return std::make_unique<LanguageRegionsNamesDataSource>(data_file, language);
......
#pragma once
#include "INamesProvider.h"
#include <Common/FileUpdatesTracker.h>
#include "INamesProvider.h"
// Represents local file with list of regions ids / names
......@@ -14,10 +14,9 @@ private:
public:
LanguageRegionsNamesDataSource(const std::string & path_, const std::string & language_)
: path(path_)
, updates_tracker(path_)
, language(language_)
{}
: path(path_), updates_tracker(path_), language(language_)
{
}
bool isModified() const override;
......@@ -42,8 +41,7 @@ private:
public:
RegionsNamesDataProvider(const std::string & directory_);
ILanguageRegionsNamesDataSourcePtr getLanguageRegionsNamesSource(
const std::string & language) const override;
ILanguageRegionsNamesDataSourcePtr getLanguageRegionsNamesSource(const std::string & language) const override;
private:
std::string getDataFilePath(const std::string & language) const;
......
#pragma once
#include <memory>
#include "RegionsHierarchies.h"
#include "RegionsNames.h"
#include <memory>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
namespace Util
{
class AbstractConfiguration;
}
class Logger;
class Logger;
}
......@@ -20,11 +20,9 @@ namespace Poco
class IGeoDictionariesLoader
{
public:
virtual std::unique_ptr<RegionsHierarchies> reloadRegionsHierarchies(
const Poco::Util::AbstractConfiguration & config) = 0;
virtual std::unique_ptr<RegionsHierarchies> reloadRegionsHierarchies(const Poco::Util::AbstractConfiguration & config) = 0;
virtual std::unique_ptr<RegionsNames> reloadRegionsNames(
const Poco::Util::AbstractConfiguration & config) = 0;
virtual std::unique_ptr<RegionsNames> reloadRegionsNames(const Poco::Util::AbstractConfiguration & config) = 0;
virtual ~IGeoDictionariesLoader() {}
};
#include "RegionsHierarchies.h"
#include <common/logger_useful.h>
#include <Poco/DirectoryIterator.h>
#include <common/logger_useful.h>
RegionsHierarchies::RegionsHierarchies(IRegionsHierarchiesDataProviderPtr data_provider)
......
#pragma once
#include "RegionsHierarchy.h"
#include "GeodataProviders/IHierarchiesProvider.h"
#include <Poco/Exception.h>
#include <unordered_map>
#include <Poco/Exception.h>
#include "GeodataProviders/IHierarchiesProvider.h"
#include "RegionsHierarchy.h"
/** Contains several hierarchies of regions.
......
#include "RegionsHierarchy.h"
#include "GeodataProviders/IHierarchiesProvider.h"
#include <Poco/Util/Application.h>
#include <IO/WriteHelpers.h>
#include <Poco/Exception.h>
#include <Poco/Util/Application.h>
#include <common/logger_useful.h>
#include <ext/singleton.h>
#include <IO/WriteHelpers.h>
#include "GeodataProviders/IHierarchiesProvider.h"
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
}
RegionsHierarchy::RegionsHierarchy(IRegionsHierarchyDataSourcePtr data_source_)
: data_source(data_source_)
RegionsHierarchy::RegionsHierarchy(IRegionsHierarchyDataSourcePtr data_source_) : data_source(data_source_)
{
}
......@@ -56,7 +55,8 @@ void RegionsHierarchy::reload()
if (region_entry.id > max_region_id)
{
if (region_entry.id > max_size)
throw DB::Exception("Region id is too large: " + DB::toString(region_entry.id) + ", should be not more than " + DB::toString(max_size),
throw DB::Exception(
"Region id is too large: " + DB::toString(region_entry.id) + ", should be not more than " + DB::toString(max_size),
DB::ErrorCodes::INCORRECT_DATA);
max_region_id = region_entry.id;
......@@ -74,16 +74,16 @@ void RegionsHierarchy::reload()
types[region_entry.id] = region_entry.type;
}
new_parents .resize(max_region_id + 1);
new_city .resize(max_region_id + 1);
new_country .resize(max_region_id + 1);
new_area .resize(max_region_id + 1);
new_district .resize(max_region_id + 1);
new_continent .resize(max_region_id + 1);
new_parents.resize(max_region_id + 1);
new_city.resize(max_region_id + 1);
new_country.resize(max_region_id + 1);
new_area.resize(max_region_id + 1);
new_district.resize(max_region_id + 1);
new_continent.resize(max_region_id + 1);
new_top_continent.resize(max_region_id + 1);
new_populations .resize(max_region_id + 1);
new_depths .resize(max_region_id + 1);
types .resize(max_region_id + 1);
new_populations.resize(max_region_id + 1);
new_depths.resize(max_region_id + 1);
types.resize(max_region_id + 1);
/// prescribe the cities and countries for the regions
for (RegionID i = 0; i <= max_region_id; ++i)
......@@ -113,14 +113,16 @@ void RegionsHierarchy::reload()
++depth;
if (depth == std::numeric_limits<RegionDepth>::max())
throw Poco::Exception("Logical error in regions hierarchy: region " + DB::toString(current) + " possible is inside infinite loop");
throw Poco::Exception(
"Logical error in regions hierarchy: region " + DB::toString(current) + " possible is inside infinite loop");
current = new_parents[current];
if (current == 0)
break;
if (current > max_region_id)
throw Poco::Exception("Logical error in regions hierarchy: region " + DB::toString(current) + " (specified as parent) doesn't exist");
throw Poco::Exception(
"Logical error in regions hierarchy: region " + DB::toString(current) + " (specified as parent) doesn't exist");
if (types[current] == RegionType::City)
new_city[i] = current;
......
#pragma once
#include "GeodataProviders/IHierarchiesProvider.h"
#include <vector>
#include <boost/noncopyable.hpp>
#include <common/Types.h>
#include "GeodataProviders/IHierarchiesProvider.h"
class IRegionsHierarchyDataProvider;
......
#include "RegionsNames.h"
#include "GeodataProviders/INamesProvider.h"
#include <Poco/Util/Application.h>
#include <IO/WriteHelpers.h>
#include <Poco/Exception.h>
#include <Poco/Util/Application.h>
#include <common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include "GeodataProviders/INamesProvider.h"
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
}
......@@ -84,7 +84,8 @@ void RegionsNames::reload()
max_region_id = name_entry.id;
if (name_entry.id > max_size)
throw DB::Exception("Region id is too large: " + DB::toString(name_entry.id) + ", should be not more than " + DB::toString(max_size),
throw DB::Exception(
"Region id is too large: " + DB::toString(name_entry.id) + ", should be not more than " + DB::toString(max_size),
DB::ErrorCodes::INCORRECT_DATA);
}
......
#pragma once
#include "GeodataProviders/INamesProvider.h"
#include <Poco/Exception.h>
#include <common/Types.h>
#include <common/StringRef.h>
#include <string>
#include <vector>
#include <Poco/Exception.h>
#include <common/StringRef.h>
#include <common/Types.h>
#include "GeodataProviders/INamesProvider.h"
/** A class that allows you to recognize by region id its text name in one of the supported languages: ru, en, ua, by, kz, tr.
......@@ -37,23 +37,24 @@ private:
static const char ** getSupportedLanguages()
{
static const char * res[] { "ru", "en", "ua", "by", "kz", "tr" };
static const char * res[]{"ru", "en", "ua", "by", "kz", "tr"};
return res;
}
struct language_alias { const char * const name; const Language lang; };
struct language_alias
{
const char * const name;
const Language lang;
};
static const language_alias * getLanguageAliases()
{
static constexpr const language_alias language_aliases[]
{
{ "ru", Language::RU },
{ "en", Language::EN },
{ "ua", Language::UA },
{ "uk", Language::UA },
{ "by", Language::BY },
{ "kz", Language::KZ },
{ "tr", Language::TR }
};
static constexpr const language_alias language_aliases[]{{"ru", Language::RU},
{"en", Language::EN},
{"ua", Language::UA},
{"uk", Language::UA},
{"by", Language::BY},
{"kz", Language::KZ},
{"tr", Language::TR}};
return language_aliases;
}
......@@ -79,7 +80,7 @@ public:
while (ref.size == 0 && language_id != ROOT_LANGUAGE)
{
static const size_t FALLBACK[] = { 0, 0, 0, 0, 0, 1 };
static const size_t FALLBACK[] = {0, 0, 0, 0, 0, 1};
language_id = FALLBACK[language_id];
ref = names_refs[language_id][region_id];
}
......
#include <Common/config.h>
#if USE_MYSQL
#include "TechDataHierarchy.h"
# include "TechDataHierarchy.h"
#include <common/logger_useful.h>
#include <mysqlxx/PoolWithFailover.h>
# include <common/logger_useful.h>
# include <mysqlxx/PoolWithFailover.h>
static constexpr auto config_key = "mysql_metrica";
......
......@@ -5,12 +5,12 @@
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
namespace Util
{
class AbstractConfiguration;
}
class Logger;
class Logger;
}
......@@ -21,8 +21,8 @@ namespace Poco
class TechDataHierarchy
{
private:
UInt8 os_parent[256] {};
UInt8 se_parent[256] {};
UInt8 os_parent[256]{};
UInt8 se_parent[256]{};
public:
void reload();
......@@ -49,15 +49,9 @@ public:
}
UInt8 OSToParent(UInt8 x) const
{
return os_parent[x];
}
UInt8 OSToParent(UInt8 x) const { return os_parent[x]; }
UInt8 SEToParent(UInt8 x) const
{
return se_parent[x];
}
UInt8 SEToParent(UInt8 x) const { return se_parent[x]; }
/// To the topmost ancestor.
......@@ -77,4 +71,6 @@ public:
};
class TechDataHierarchySingleton : public ext::singleton<TechDataHierarchySingleton>, public TechDataHierarchy {};
class TechDataHierarchySingleton : public ext::singleton<TechDataHierarchySingleton>, public TechDataHierarchy
{
};
#include "ExecutableDictionarySource.h"
#include <thread>
#include <future>
#include <Common/ShellCommand.h>
#include <Interpreters/Context.h>
#include <DataStreams/OwningBlockInputStream.h>
#include "DictionarySourceHelpers.h"
#include <thread>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Common/ShellCommand.h>
#include <common/logger_useful.h>
#include "DictionarySourceFactory.h"
#include "DictionarySourceHelpers.h"
#include "DictionaryStructure.h"
namespace DB
{
static const size_t max_block_size = 8192;
namespace
{
/// Owns ShellCommand and calls wait for it.
class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand>
{
public:
ShellCommandOwningBlockInputStream(const BlockInputStreamPtr & stream, std::unique_ptr<ShellCommand> own)
: OwningBlockInputStream(std::move(stream), std::move(own))
/// Owns ShellCommand and calls wait for it.
class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand>
{
}
public:
ShellCommandOwningBlockInputStream(const BlockInputStreamPtr & stream, std::unique_ptr<ShellCommand> own)
: OwningBlockInputStream(std::move(stream), std::move(own))
{
}
void readSuffix() override
{
OwningBlockInputStream<ShellCommand>::readSuffix();
own->wait();
}
};
void readSuffix() override
{
OwningBlockInputStream<ShellCommand>::readSuffix();
own->wait();
}
};
}
ExecutableDictionarySource::ExecutableDictionarySource(const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, const Context & context)
: log(&Logger::get("ExecutableDictionarySource")),
update_time{std::chrono::system_clock::from_time_t(0)},
dict_struct{dict_struct_},
command{config.getString(config_prefix + ".command")},
update_field{config.getString(config_prefix + ".update_field", "")},
format{config.getString(config_prefix + ".format")},
sample_block{sample_block},
context(context)
ExecutableDictionarySource::ExecutableDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
const Context & context)
: log(&Logger::get("ExecutableDictionarySource"))
, update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, command{config.getString(config_prefix + ".command")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, format{config.getString(config_prefix + ".format")}
, sample_block{sample_block}
, context(context)
{
}
ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other)
: log(&Logger::get("ExecutableDictionarySource")),
update_time{other.update_time},
dict_struct{other.dict_struct},
command{other.command},
update_field{other.update_field},
format{other.format},
sample_block{other.sample_block},
context(other.context)
: log(&Logger::get("ExecutableDictionarySource"))
, update_time{other.update_time}
, dict_struct{other.dict_struct}
, command{other.command}
, update_field{other.update_field}
, format{other.format}
, sample_block{other.sample_block}
, context(other.context)
{
}
......@@ -73,12 +74,12 @@ std::string ExecutableDictionarySource::getUpdateFieldAndDate()
auto tmp_time = update_time;
update_time = std::chrono::system_clock::now();
time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
char buffer [80];
char buffer[80];
struct tm * timeinfo;
timeinfo = localtime (&hr_time);
timeinfo = localtime(&hr_time);
strftime(buffer, 80, "\"%Y-%m-%d %H:%M:%S\"", timeinfo);
std::string str_time(buffer);
return command + " " + update_field + " "+ str_time;
return command + " " + update_field + " " + str_time;
///Example case: command -T "2018-02-12 12:44:04"
///should return all entries after mentioned date
///if executable is eligible to return entries according to date.
......@@ -87,7 +88,7 @@ std::string ExecutableDictionarySource::getUpdateFieldAndDate()
else
{
std::string str_time("\"0000-00-00 00:00:00\""); ///for initial load
return command + " " + update_field + " "+ str_time;
return command + " " + update_field + " " + str_time;
}
}
......@@ -110,63 +111,63 @@ BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
namespace
{
/** A stream, that also runs and waits for background thread
/** A stream, that also runs and waits for background thread
* (that will feed data into pipe to be read from the other side of the pipe).
*/
class BlockInputStreamWithBackgroundThread final : public IProfilingBlockInputStream
{
public:
BlockInputStreamWithBackgroundThread(
const BlockInputStreamPtr & stream_, std::unique_ptr<ShellCommand> && command_,
std::packaged_task<void()> && task_)
: stream{stream_}, command{std::move(command_)}, task(std::move(task_)),
thread([this]{ task(); command->in.close(); })
class BlockInputStreamWithBackgroundThread final : public IProfilingBlockInputStream
{
children.push_back(stream);
}
public:
BlockInputStreamWithBackgroundThread(
const BlockInputStreamPtr & stream_, std::unique_ptr<ShellCommand> && command_, std::packaged_task<void()> && task_)
: stream{stream_}, command{std::move(command_)}, task(std::move(task_)), thread([this] {
task();
command->in.close();
})
{
children.push_back(stream);
}
~BlockInputStreamWithBackgroundThread() override
{
if (thread.joinable())
~BlockInputStreamWithBackgroundThread() override
{
try
{
readSuffix();
}
catch (...)
if (thread.joinable())
{
tryLogCurrentException(__PRETTY_FUNCTION__);
try
{
readSuffix();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
}
Block getHeader() const override { return stream->getHeader(); }
Block getHeader() const override { return stream->getHeader(); }
private:
Block readImpl() override { return stream->read(); }
private:
Block readImpl() override { return stream->read(); }
void readSuffix() override
{
IProfilingBlockInputStream::readSuffix();
if (!wait_called)
void readSuffix() override
{
wait_called = true;
command->wait();
IProfilingBlockInputStream::readSuffix();
if (!wait_called)
{
wait_called = true;
command->wait();
}
thread.join();
/// To rethrow an exception, if any.
task.get_future().get();
}
thread.join();
/// To rethrow an exception, if any.
task.get_future().get();
}
String getName() const override { return "WithBackgroundThread"; }
String getName() const override { return "WithBackgroundThread"; }
BlockInputStreamPtr stream;
std::unique_ptr<ShellCommand> command;
std::packaged_task<void()> task;
std::thread thread;
bool wait_called = false;
};
BlockInputStreamPtr stream;
std::unique_ptr<ShellCommand> command;
std::packaged_task<void()> task;
std::thread thread;
bool wait_called = false;
};
}
......@@ -180,15 +181,10 @@ BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<BlockInputStreamWithBackgroundThread>(
input_stream, std::move(process), std::packaged_task<void()>(
[output_stream, &ids]() mutable
{
formatIDs(output_stream, ids);
}));
input_stream, std::move(process), std::packaged_task<void()>([output_stream, &ids]() mutable { formatIDs(output_stream, ids); }));
}
BlockInputStreamPtr ExecutableDictionarySource::loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows)
BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys " << toString() << " size = " << requested_rows.size());
auto process = ShellCommand::execute(command);
......@@ -197,8 +193,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadKeys(
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<BlockInputStreamWithBackgroundThread>(
input_stream, std::move(process), std::packaged_task<void()>(
[output_stream, key_columns, &requested_rows, this]() mutable
input_stream, std::move(process), std::packaged_task<void()>([output_stream, key_columns, &requested_rows, this]() mutable
{
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
}));
......@@ -240,7 +235,7 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
Block & sample_block,
const Context & context) -> DictionarySourcePtr {
if (dict_struct.has_expressions)
throw Exception {"Dictionary source of type `executable` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
throw Exception{"Dictionary source of type `executable` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
return std::make_unique<ExecutableDictionarySource>(dict_struct, config, config_prefix + ".executable", sample_block, context);
};
......
#pragma once
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
#include "IDictionarySource.h"
namespace Poco { class Logger; }
namespace Poco
{
class Logger;
}
namespace DB
{
/// Allows loading dictionaries from executable
class ExecutableDictionarySource final : public IDictionarySource
{
......@@ -29,8 +31,7 @@ public:
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
......
#include <ext/range.h>
#include <boost/range/join.hpp>
#include "ExternalQueryBuilder.h"
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include "writeParenthesisedString.h"
#include <boost/range/join.hpp>
#include <ext/range.h>
#include "DictionaryStructure.h"
#include "ExternalQueryBuilder.h"
#include "writeParenthesisedString.h"
namespace DB
{
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
......@@ -164,7 +163,7 @@ std::string ExternalQueryBuilder::composeUpdateQuery(const std::string & update_
else
update_query = " WHERE " + update_field + " >= '" + time_point + "'";
return out.insert(out.size()-1, update_query); ///This is done to insert "update_query" before "out"'s semicolon
return out.insert(out.size() - 1, update_query); ///This is done to insert "update_query" before "out"'s semicolon
}
......@@ -238,10 +237,8 @@ std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64>
}
std::string ExternalQueryBuilder::composeLoadKeysQuery(
const Columns & key_columns,
const std::vector<size_t> & requested_rows,
LoadKeysMethod method)
std::string
ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method)
{
if (!dict_struct.key)
throw Exception{"Composite key required for method", ErrorCodes::UNSUPPORTED_METHOD};
......
#pragma once
#include <string>
#include <Formats/FormatSettings.h>
#include <Columns/IColumn.h>
#include <Formats/FormatSettings.h>
#include <Parsers/IdentifierQuotingStyle.h>
namespace DB
{
struct DictionaryStructure;
class WriteBuffer;
......@@ -37,7 +36,7 @@ struct ExternalQueryBuilder
std::string composeLoadAllQuery() const;
/** Generate a query to load data after certain time point*/
std::string composeUpdateQuery(const std::string &update_field, const std::string &time_point) const;
std::string composeUpdateQuery(const std::string & update_field, const std::string & time_point) const;
/** Generate a query to load data by set of UInt64 keys. */
std::string composeLoadIdsQuery(const std::vector<UInt64> & ids);
......@@ -53,10 +52,7 @@ struct ExternalQueryBuilder
IN_WITH_TUPLES,
};
std::string composeLoadKeysQuery(
const Columns & key_columns,
const std::vector<size_t> & requested_rows,
LoadKeysMethod method);
std::string composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method);
private:
......
#include <ext/range.h>
#include "ExternalResultDescription.h"
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
......
......@@ -5,7 +5,6 @@
namespace DB
{
/** Common part for implementation of MySQLBlockInputStream, MongoDBBlockInputStream and others.
*/
struct ExternalResultDescription
......
#include "FileDictionarySource.h"
#include <Interpreters/Context.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/Context.h>
#include <Poco/File.h>
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
namespace DB
{
static const size_t max_block_size = 8192;
FileDictionarySource::FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block,
const Context & context)
FileDictionarySource::FileDictionarySource(
const std::string & filename, const std::string & format, Block & sample_block, const Context & context)
: filename{filename}, format{format}, sample_block{sample_block}, context(context)
{}
{
}
FileDictionarySource::FileDictionarySource(const FileDictionarySource & other)
: filename{other.filename}, format{other.format},
sample_block{other.sample_block}, context(other.context),
last_modification{other.last_modification}
{}
: filename{other.filename}
, format{other.format}
, sample_block{other.sample_block}
, context(other.context)
, last_modification{other.last_modification}
{
}
BlockInputStreamPtr FileDictionarySource::loadAll()
{
auto in_ptr = std::make_unique<ReadBufferFromFile>(filename);
auto stream = context.getInputFormat(
format, *in_ptr, sample_block, max_block_size);
auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size);
last_modification = getLastModification();
return std::make_shared<OwningBlockInputStream<ReadBuffer>>(stream, std::move(in_ptr));
......@@ -56,7 +58,7 @@ void registerDictionarySourceFile(DictionarySourceFactory & factory)
Block & sample_block,
const Context & context) -> DictionarySourcePtr {
if (dict_struct.has_expressions)
throw Exception {"Dictionary source of type `file` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
throw Exception{"Dictionary source of type `file` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
const auto filename = config.getString(config_prefix + ".file.path");
const auto format = config.getString(config_prefix + ".file.format");
......
#pragma once
#include "IDictionarySource.h"
#include <Poco/Timestamp.h>
#include "IDictionarySource.h"
namespace DB
{
class Context;
/// Allows loading dictionaries from a file with given format, does not support "random access"
class FileDictionarySource final : public IDictionarySource
{
public:
FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block,
const Context & context);
FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block, const Context & context);
FileDictionarySource(const FileDictionarySource & other);
......@@ -30,8 +28,7 @@ public:
throw Exception{"Method loadIds is unsupported for FileDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
}
BlockInputStreamPtr loadKeys(
const Columns & /*key_columns*/, const std::vector<size_t> & /*requested_rows*/) override
BlockInputStreamPtr loadKeys(const Columns & /*key_columns*/, const std::vector<size_t> & /*requested_rows*/) override
{
throw Exception{"Method loadKeys is unsupported for FileDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
}
......
#pragma once
#include <IO/ConnectionTimeouts.h>
#include <Poco/URI.h>
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
#include <common/LocalDateTime.h>
#include <IO/ConnectionTimeouts.h>
#include "DictionaryStructure.h"
#include "IDictionarySource.h"
namespace Poco { class Logger; }
namespace Poco
{
class Logger;
}
namespace DB
{
/// Allows loading dictionaries from http[s] source
class HTTPDictionarySource final : public IDictionarySource
{
public:
HTTPDictionarySource(const DictionaryStructure & dict_struct_,
HTTPDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
......@@ -30,8 +33,7 @@ public:
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
bool isModified() const override;
......
此差异已折叠。
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <vector>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class IDictionarySource;
using DictionarySourcePtr = std::unique_ptr<IDictionarySource>;
......@@ -36,8 +35,7 @@ public:
* `requested_rows` contains indices of all rows containing unique keys.
* It must be guaranteed, that 'requested_rows' array will live at least until all data will be read from returned stream.
*/
virtual BlockInputStreamPtr loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows) = 0;
virtual BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) = 0;
/// indicates whether the source has been modified since last load* operation
virtual bool isModified() const = 0;
......
......@@ -3,12 +3,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_COLUMNS;
extern const int TOO_MANY_ROWS;
extern const int RECEIVED_EMPTY_DATA;
extern const int TOO_MANY_COLUMNS;
extern const int TOO_MANY_ROWS;
extern const int RECEIVED_EMPTY_DATA;
}
std::string readInvalidateQuery(IProfilingBlockInputStream & block_input_stream)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册