ClickHouseDictionarySource.cpp 9.3 KB
Newer Older
1
#include "ClickHouseDictionarySource.h"
P
proller 已提交
2
#include <memory>
3 4
#include <Client/ConnectionPool.h>
#include <DataStreams/RemoteBlockInputStream.h>
5
#include <DataStreams/ConvertingBlockInputStream.h>
P
proller 已提交
6
#include <IO/ConnectionTimeouts.h>
7 8
#include <Interpreters/executeQuery.h>
#include <Common/isLocalAddress.h>
9
#include <common/logger_useful.h>
10 11
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
P
proller 已提交
12 13 14
#include "ExternalQueryBuilder.h"
#include "readInvalidateQuery.h"
#include "writeParenthesisedString.h"
K
kreuzerkrieg 已提交
15
#include "DictionaryFactory.h"
16
#include "DictionarySourceHelpers.h"
A
Alexey Milovidov 已提交
17 18 19

namespace DB
{
20

21
namespace ErrorCodes
A
Alexey Milovidov 已提交
22
{
23
    extern const int BAD_ARGUMENTS;
A
Alexey Milovidov 已提交
24 25
}

26
namespace
27
{
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
    constexpr size_t MAX_CONNECTIONS = 16;

    inline UInt16 getPortFromContext(const Context & context, bool secure)
    {
        return secure ? context.getTCPPortSecure().value_or(0) : context.getTCPPort();
    }

    ConnectionPoolWithFailoverPtr createPool(const ClickHouseDictionarySource::Configuration & configuration)
    {
        if (configuration.is_local)
            return nullptr;

        ConnectionPoolPtrs pools;
        pools.emplace_back(std::make_shared<ConnectionPool>(
            MAX_CONNECTIONS,
            configuration.host,
            configuration.port,
            configuration.db,
            configuration.user,
            configuration.password,
            "", /* cluster */
            "", /* cluster_secret */
            "ClickHouseDictionarySource",
            Protocol::Compression::Enable,
            configuration.secure ? Protocol::Secure::Enable : Protocol::Secure::Disable));

        return std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM);
    }
A
Alexey Milovidov 已提交
56

57
}
A
Alexey Milovidov 已提交
58

59
/// Builds a source from the dictionary structure, the parsed source configuration,
/// the sample block and a context.
/// update_time starts at the epoch (from_time_t(0)), which getUpdateFieldAndDate() treats as "never loaded";
/// the full-load query is composed once here and stored in load_all_query.
ClickHouseDictionarySource::ClickHouseDictionarySource(
    const DictionaryStructure & dict_struct_,
    const Configuration & configuration_,
    const Block & sample_block_,
    const Context & context_)
    : update_time{std::chrono::system_clock::from_time_t(0)}
    , dict_struct{dict_struct_}
    , configuration{configuration_}
    , query_builder{dict_struct, configuration.db, "", configuration.table, configuration.where, IdentifierQuotingStyle::Backticks}
    , sample_block{sample_block_}
    , context{context_}
    , pool{createPool(configuration)}
    , load_all_query{query_builder.composeLoadAllQuery()}
{
    /// executeQuery may assume that a query context exists
    /// (for example Context::getSampleBlockCache, reached from InterpreterSelectWithUnionQuery::getSampleBlock),
    /// so one is created up front.
    context.makeQueryContext();
}
A
Alexey Milovidov 已提交
77 78

/// Copy constructor.
/// Plain state (update time, structure, configuration, last invalidate response, sample block, context,
/// load-all query) is copied from `other`; the query builder and the connection pool are constructed anew
/// from this object's own configuration instead of being copied.
ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
    : update_time{other.update_time}
    , dict_struct{other.dict_struct}
    , configuration{other.configuration}
    , invalidate_query_response{other.invalidate_query_response}
    , query_builder{dict_struct, configuration.db, "", configuration.table, configuration.where, IdentifierQuotingStyle::Backticks}
    , sample_block{other.sample_block}
    , context{other.context}
    , pool{createPool(configuration)}
    , load_all_query{other.load_all_query}
{
    /// Same as in the main constructor: make sure a query context exists on the copied context.
    context.makeQueryContext();
}
A
Alexey Milovidov 已提交
91

92 93 94 95
std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
{
    if (update_time != std::chrono::system_clock::from_time_t(0))
    {
96 97
        time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - 1;
        std::string str_time = DateLUT::instance().timeToString(hr_time);
98
        update_time = std::chrono::system_clock::now();
99
        return query_builder.composeUpdateQuery(configuration.update_field, str_time);
100 101 102 103
    }
    else
    {
        update_time = std::chrono::system_clock::now();
104
        return query_builder.composeLoadAllQuery();
105 106
    }
}
A
Alexey Milovidov 已提交
107 108 109

/// Returns a stream for the stored load_all_query.
BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
{
    const auto & full_load_query = load_all_query;
    return createStreamForQuery(full_load_query);
}

113 114
/// Returns a stream for the query produced by getUpdateFieldAndDate()
/// (which also advances update_time as a side effect).
BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
{
    return createStreamForQuery(getUpdateFieldAndDate());
}
A
Alexey Milovidov 已提交
118 119 120

/// Returns a stream for the query that the query builder composes for the given ids.
BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
    const auto ids_query = query_builder.composeLoadIdsQuery(ids);
    return createStreamForQuery(ids_query);
}


P
proller 已提交
125
/// Returns a stream for the query that the query builder composes for the requested rows of the key columns,
/// using the ExternalQueryBuilder::IN_WITH_TUPLES method.
BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
    return createStreamForQuery(
        query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES));
}

V
VadimPE 已提交
131 132
bool ClickHouseDictionarySource::isModified() const
{
133
    if (!configuration.invalidate_query.empty())
V
VadimPE 已提交
134
    {
135
        auto response = doInvalidateQuery(configuration.invalidate_query);
A
Alexey Milovidov 已提交
136
        LOG_TRACE(log, "Invalidate query has returned: {}, previous value: {}", response, invalidate_query_response);
V
VadimPE 已提交
137 138 139 140 141 142 143
        if (invalidate_query_response == response)
            return false;
        invalidate_query_response = response;
    }
    return true;
}

144 145
bool ClickHouseDictionarySource::hasUpdateField() const
{
146
    return !configuration.update_field.empty();
147
}
A
Alexey Milovidov 已提交
148 149 150

/// Human-readable description of the source: "ClickHouse: <db>.<table>",
/// followed by ", where: <where>" when a where clause is configured.
std::string ClickHouseDictionarySource::toString() const
{
    std::string description = "ClickHouse: " + configuration.db + '.' + configuration.table;

    if (!configuration.where.empty())
        description += ", where: " + configuration.where;

    return description;
}

M
Maksim Kita 已提交
155
/// Creates the input stream that executes `query` against the source.
/// A remote source is read through RemoteBlockInputStream over the connection pool.
/// A local source is executed in-process via executeQuery, and the result is converted to the sample block
/// structure by column position.
BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query)
{
    /// Sample block should not contain first row default values, hence the empty clone.
    auto header = sample_block.cloneEmpty();

    if (!configuration.is_local)
        return std::make_shared<RemoteBlockInputStream>(pool, query, header, context);

    auto local_stream = executeQuery(query, context, true).getInputStream();
    return std::make_shared<ConvertingBlockInputStream>(
        local_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Position);
}

V
VadimPE 已提交
170 171
std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const
{
A
Alexey Milovidov 已提交
172
    LOG_TRACE(log, "Performing invalidate query");
173
    if (configuration.is_local)
V
VadimPE 已提交
174
    {
175
        Context query_context = context;
176
        auto input_block = executeQuery(request, query_context, true).getInputStream();
177
        return readInvalidateQuery(*input_block);
V
VadimPE 已提交
178
    }
V
VadimPE 已提交
179 180
    else
    {
181
        /// We pass empty block to RemoteBlockInputStream, because we don't know the structure of the result.
V
VadimPE 已提交
182 183 184 185
        Block invalidate_sample_block;
        RemoteBlockInputStream invalidate_stream(pool, request, invalidate_sample_block, context);
        return readInvalidateQuery(invalidate_stream);
    }
V
VadimPE 已提交
186 187
}

188 189
/// Registers the "clickhouse" dictionary source in the factory.
/// The registered creator parses the source settings from "<config_prefix>.clickhouse", decides whether the
/// target address is the local server, and builds a ClickHouseDictionarySource.
/// Throws BAD_ARGUMENTS when the source table is the dictionary itself.
void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
{
    auto create_table_source = [=](const DictionaryStructure & dict_struct,
                                 const Poco::Util::AbstractConfiguration & config,
                                 const std::string & config_prefix,
                                 Block & sample_block,
                                 const Context & context,
                                 const std::string & default_database [[maybe_unused]],
                                 bool /* check_config */) -> DictionarySourcePtr
    {
        /// All settings of this source are read from the ".clickhouse" subsection.
        const std::string settings_config_prefix = config_prefix + ".clickhouse";

        /// `secure` must come from the same subsection as the other settings. It was previously read from
        /// config_prefix + ".secure" for the default port only, so a secure source without an explicit port
        /// fell back to the non-secure TCP port (also used by the isLocalAddress check below),
        /// while configuration.secure was read from the ".clickhouse" subsection.
        const bool secure = config.getBool(settings_config_prefix + ".secure", false);

        Context context_copy = context;

        /// Port used when the configuration does not specify one, and for the locality check.
        const UInt16 default_port = getPortFromContext(context_copy, secure);

        std::string host = config.getString(settings_config_prefix + ".host", "localhost");
        UInt16 port = static_cast<UInt16>(config.getUInt(settings_config_prefix + ".port", default_port));

        ClickHouseDictionarySource::Configuration configuration {
            .secure = secure,
            .host = host,
            .port = port,
            .user = config.getString(settings_config_prefix + ".user", "default"),
            .password = config.getString(settings_config_prefix + ".password", ""),
            .db = config.getString(settings_config_prefix + ".db", default_database),
            .table = config.getString(settings_config_prefix + ".table"),
            .where = config.getString(settings_config_prefix + ".where", ""),
            .update_field = config.getString(settings_config_prefix + ".update_field", ""),
            .invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
            .is_local = isLocalAddress({host, port}, default_port)
        };

        /// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
        if (configuration.is_local)
        {
            context_copy.setUser(configuration.user, configuration.password, Poco::Net::SocketAddress("127.0.0.1", 0));
            context_copy = copyContextAndApplySettings(config_prefix, context_copy, config);
        }

        String dictionary_name = config.getString(".dictionary.name", "");
        String dictionary_database = config.getString(".dictionary.database", "");

        /// A dictionary must not use itself as its source table.
        if (dictionary_name == configuration.table && dictionary_database == configuration.db)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouseDictionarySource table cannot be dictionary table");

        return std::make_unique<ClickHouseDictionarySource>(dict_struct, configuration, sample_block, context_copy);
    };

    factory.registerSource("clickhouse", create_table_source);
}

A
Alexey Milovidov 已提交
240
}