提交 927e572d 编写于 作者: A Andrew Onyshchuk

AvroConfluent bugfixes

上级 16d49900
......@@ -189,7 +189,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.", 0) \
M(SettingBool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \
M(SettingBool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \
M(SettingString, input_format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \
M(SettingURI, format_avro_schema_registry_url, {}, "For AvroConfluent format: Confluent Schema Registry URL.", 0) \
\
M(SettingBool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \
\
......
......@@ -396,6 +396,54 @@ void SettingEnum<EnumType, Tag>::set(const Field & x)
}
String SettingURI::toString() const
{
return value.toString();
}
Field SettingURI::toField() const
{
return value.toString();
}
void SettingURI::set(const Poco::URI & x)
{
value = x;
changed = true;
}
void SettingURI::set(const Field & x)
{
const String & s = safeGet<const String &>(x);
set(s);
}
void SettingURI::set(const String & x)
{
try {
Poco::URI uri(x);
set(uri);
}
catch (const Poco::Exception& e)
{
throw Exception{Exception::CreateFromPoco, e};
}
}
void SettingURI::serialize(WriteBuffer & buf, SettingsBinaryFormat) const
{
writeStringBinary(toString(), buf);
}
void SettingURI::deserialize(ReadBuffer & buf, SettingsBinaryFormat)
{
String s;
readStringBinary(s, buf);
set(s);
}
#define IMPLEMENT_SETTING_ENUM(ENUM_NAME, LIST_OF_NAMES_MACRO, ERROR_CODE_FOR_UNEXPECTED_NAME) \
IMPLEMENT_SETTING_ENUM_WITH_TAG(ENUM_NAME, void, LIST_OF_NAMES_MACRO, ERROR_CODE_FOR_UNEXPECTED_NAME)
......
#pragma once
#include <Poco/Timespan.h>
#include <Poco/URI.h>
#include <DataStreams/SizeLimits.h>
#include <Formats/FormatSettings.h>
#include <common/StringRef.h>
......@@ -196,6 +197,26 @@ struct SettingEnum
void deserialize(ReadBuffer & buf, SettingsBinaryFormat format);
};
struct SettingURI
{
Poco::URI value;
bool changed = false;
SettingURI(const Poco::URI & x = Poco::URI{}) : value(x) {}
operator Poco::URI() const { return value; }
SettingURI & operator= (const Poco::URI & x) { set(x); return *this; }
String toString() const;
Field toField() const;
void set(const Poco::URI & x);
void set(const Field & x);
void set(const String & x);
void serialize(WriteBuffer & buf, SettingsBinaryFormat format) const;
void deserialize(ReadBuffer & buf, SettingsBinaryFormat format);
};
enum class LoadBalancing
{
......
......@@ -14,7 +14,7 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Poco/URI.h>
namespace DB
{
......@@ -68,7 +68,15 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.avro.schema_registry_url = settings.input_format_avro_schema_registry_url;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER))
{
const Poco::URI & avro_schema_registry_url = settings.format_avro_schema_registry_url;
if (!avro_schema_registry_url.empty())
context.getRemoteHostFilter().checkURL(avro_schema_registry_url);
}
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
return format_settings;
}
......
......@@ -480,7 +480,7 @@ AvroDeserializer::AvroDeserializer(const ColumnsWithTypeAndName & columns, avro:
}
}
void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder)
void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const
{
for (size_t i = 0; i < field_mapping.size(); i++)
{
......@@ -519,51 +519,49 @@ bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
class AvroConfluentRowInputFormat::SchemaRegistry
{
public:
SchemaRegistry(const std::string & base_url_)
SchemaRegistry(const std::string & base_url_, size_t schema_cache_max_size = 1000)
: base_url(base_url_), schema_cache(schema_cache_max_size)
{
if (base_url_.empty())
{
if (base_url.empty())
throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS);
}
try
{
base_url = base_url_;
}
catch (const Poco::SyntaxException & e)
{
throw Exception("Invalid Schema Registry URL: " + e.displayText(), ErrorCodes::BAD_ARGUMENTS);
}
}
avro::ValidSchema getSchema(uint32_t id) const
avro::ValidSchema getSchema(uint32_t id)
{
auto [schema, loaded] = schema_cache.getOrSet(
id,
[this, id](){ return std::make_shared<avro::ValidSchema>(fetchSchema(id)); }
);
return *schema;
}
private:
avro::ValidSchema fetchSchema(uint32_t id)
{
try
{
try
{
/// TODO Host checking to prevent SSRF
Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id));
LOG_TRACE((&Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = " << id);
/// One second for connect/send/receive. Just in case.
ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0});
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery());
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(url.getHost());
auto session = makePooledHTTPSession(url, timeouts, 1);
session->sendRequest(request);
Poco::Net::HTTPResponse response;
auto & response_body = session->receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
{
throw Exception("HTTP code " + std::to_string(response.getStatus()), ErrorCodes::INCORRECT_DATA);
}
auto response_body = receiveResponse(*session, request, response, false);
Poco::JSON::Parser parser;
auto json_body = parser.parse(response_body).extract<Poco::JSON::Object::Ptr>();
auto json_body = parser.parse(*response_body).extract<Poco::JSON::Object::Ptr>();
auto schema = json_body->getValue<std::string>("schema");
LOG_TRACE((&Logger::get("AvroConfluentRowInputFormat")),
"Succesfully fetched schema id = " << id << "\n" << schema);
return avro::compileJsonSchemaFromString(schema);
}
catch (const Exception &)
......@@ -588,8 +586,27 @@ public:
private:
Poco::URI base_url;
LRUCache<uint32_t, avro::ValidSchema> schema_cache;
};
using ConfluentSchemaRegistry = AvroConfluentRowInputFormat::SchemaRegistry;
#define SCHEMA_REGISTRY_CACHE_MAX_SIZE 1000
/// Cache of Schema Registry URL -> SchemaRegistry
static LRUCache<std::string, ConfluentSchemaRegistry> schema_registry_cache(SCHEMA_REGISTRY_CACHE_MAX_SIZE);
static std::shared_ptr<ConfluentSchemaRegistry> getConfluentSchemaRegistry(const FormatSettings & format_settings)
{
const auto & base_url = format_settings.avro.schema_registry_url;
auto [schema_registry, loaded] = schema_registry_cache.getOrSet(
base_url,
[base_url]()
{
return std::make_shared<ConfluentSchemaRegistry>(base_url);
}
);
return schema_registry;
}
static uint32_t readConfluentSchemaId(ReadBuffer & in)
{
uint8_t magic;
......@@ -611,7 +628,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_.cloneEmpty(), in_, params_)
, header_columns(header_.getColumnsWithTypeAndName())
, schema_registry(std::make_unique<SchemaRegistry>(format_settings_.avro.schema_registry_url))
, schema_registry(getConfluentSchemaRegistry(format_settings_))
, input_stream(std::make_unique<InputStreamReadBufferAdapter>(in))
, decoder(avro::binaryDecoder())
......@@ -632,7 +649,7 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten
return true;
}
AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id)
const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id)
{
auto it = deserializer_cache.find(schema_id);
if (it == deserializer_cache.end())
......@@ -657,12 +674,6 @@ void registerInputFormatProcessorAvro(FormatFactory & factory)
});
#if USE_POCO_JSON
/// AvroConfluent format is disabled for the following reasons:
/// 1. There is no test for it.
/// 2. RemoteHostFilter is not used to prevent CSRF attacks.
#if 0
factory.registerInputFormatProcessor("AvroConfluent",[](
ReadBuffer & buf,
const Block & sample,
......@@ -673,8 +684,6 @@ void registerInputFormatProcessorAvro(FormatFactory & factory)
});
#endif
#endif
}
}
......
......@@ -23,7 +23,7 @@ class AvroDeserializer
{
public:
AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder) const;
private:
using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>;
......@@ -58,6 +58,12 @@ private:
};
#if USE_POCO_JSON
/// Confluent framing + Avro binary datum encoding. Mainly used for Kafka.
/// Uses 3 caches:
/// 1. global: schema registry cache (base_url -> SchemaRegistry)
/// 2. SchemaRegistry: schema cache (schema_id -> schema)
/// 3. AvroConfluentRowInputFormat: deserializer cache (schema_id -> AvroDeserializer)
/// This is needed because KafkaStorage creates a new instance of InputFormat per a batch of messages
class AvroConfluentRowInputFormat : public IRowInputFormat
{
public:
......@@ -65,15 +71,13 @@ public:
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "AvroConfluentRowInputFormat"; }
class SchemaRegistry;
private:
const ColumnsWithTypeAndName header_columns;
class SchemaRegistry;
std::unique_ptr<SchemaRegistry> schema_registry;
std::shared_ptr<SchemaRegistry> schema_registry;
using SchemaId = uint32_t;
std::unordered_map<SchemaId, AvroDeserializer> deserializer_cache;
AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id);
const AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id);
avro::InputStreamPtr input_stream;
avro::DecoderPtr decoder;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册