提交 eee73dac 编写于 作者: A Alexey Zatelepin

Merge branch 'master' into fix-concurrent-alters

......@@ -19,6 +19,7 @@ endif()
# Select the googletest copy under contrib/ when the user supplied neither a
# gtest source directory nor gtest include directories.
# NOTE(review): MISSING_INTERNAL_GTEST_LIBRARY is presumably set earlier when the
# bundled copy is absent - confirm against the part of the file not shown here.
if (NOT GTEST_SRC_DIR AND NOT GTEST_INCLUDE_DIRS AND NOT MISSING_INTERNAL_GTEST_LIBRARY)
set (USE_INTERNAL_GTEST_LIBRARY 1)
# Both values are target names (gtest_main, gtest), not file paths.
set (GTEST_MAIN_LIBRARIES gtest_main)
set (GTEST_LIBRARIES gtest)
set (GTEST_INCLUDE_DIRS ${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest)
endif ()
......@@ -26,4 +27,4 @@ if((GTEST_INCLUDE_DIRS AND GTEST_MAIN_LIBRARIES) OR GTEST_SRC_DIR)
set(USE_GTEST 1)
endif()
message (STATUS "Using gtest=${USE_GTEST}: ${GTEST_INCLUDE_DIRS} : ${GTEST_MAIN_LIBRARIES} : ${GTEST_SRC_DIR}")
message (STATUS "Using gtest=${USE_GTEST}: ${GTEST_INCLUDE_DIRS} : ${GTEST_LIBRARIES}, ${GTEST_MAIN_LIBRARIES} : ${GTEST_SRC_DIR}")
# Freebsd: TODO: use system devel/xxhash. now error: undefined reference to `XXH32'
if (LZ4_INCLUDE_DIR)
if (NOT EXISTS "${LZ4_INCLUDE_DIR}/xxhash.h")
message (WARNING "LZ4 library does not have XXHash. Support for XXHash will be disabled.")
set (USE_XXHASH 0)
else ()
set (USE_XXHASH 1)
endif ()
option (USE_INTERNAL_XXHASH_LIBRARY "Set to FALSE to use system xxHash library instead of bundled" ${NOT_UNBUNDLED})
if (USE_INTERNAL_XXHASH_LIBRARY AND NOT USE_INTERNAL_LZ4_LIBRARY)
message (WARNING "can not use internal xxhash without internal lz4")
set (USE_INTERNAL_XXHASH_LIBRARY 0)
endif ()
if (USE_INTERNAL_XXHASH_LIBRARY)
set (XXHASH_LIBRARY lz4)
set (XXHASH_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/lz4/lib)
else ()
find_library (XXHASH_LIBRARY xxhash)
find_path (XXHASH_INCLUDE_DIR NAMES xxhash.h PATHS ${XXHASH_INCLUDE_PATHS})
endif ()
if (OS_FREEBSD AND NOT USE_INTERNAL_LZ4_LIBRARY)
if (XXHASH_LIBRARY AND XXHASH_INCLUDE_DIR)
set (USE_XXHASH 1)
else ()
set (USE_XXHASH 0)
endif ()
message (STATUS "Using xxhash=${USE_XXHASH}")
message (STATUS "Using xxhash=${USE_XXHASH}: ${XXHASH_INCLUDE_DIR} : ${XXHASH_LIBRARY}")
......@@ -24,7 +24,7 @@ if (OS_LINUX AND COMPILER_CLANG)
endif ()
if (LIBCXX_PATH)
# include_directories (SYSTEM BEFORE "${LIBCXX_PATH}/include" "${LIBCXX_PATH}/include/c++/v1")
include_directories (SYSTEM BEFORE "${LIBCXX_PATH}/include" "${LIBCXX_PATH}/include/c++/v1")
link_directories ("${LIBCXX_PATH}/lib")
endif ()
endif ()
......
......@@ -37,7 +37,8 @@ ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str)
std::lock_guard lock(mutex);
if (!pool_map->count(connection_str))
{
pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str] {
pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str]
{
return std::make_shared<Poco::Data::SessionPool>("ODBC", validateODBCConnectionString(connection_str));
}));
}
......
......@@ -364,7 +364,7 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng
const ColumnArray & src_concrete = static_cast<const ColumnArray &>(src);
if (start + length > src_concrete.getOffsets().size())
throw Exception("Parameter out of bound in ColumnArray::insertRangeFrom method.",
throw Exception("Parameter out of bound in ColumnArray::insertRangeFrom method. [start(" + std::to_string(start) + ") + length(" + std::to_string(length) + ") > offsets.size(" + std::to_string(src_concrete.getOffsets().size()) + ")]",
ErrorCodes::PARAMETER_OUT_OF_BOUND);
size_t nested_offset = src_concrete.offsetAt(start);
......
......@@ -188,6 +188,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
if (threads.size() > scheduled_jobs + max_free_threads)
{
thread_it->detach();
threads.erase(thread_it);
job_finished.notify_all();
return;
......
......@@ -134,10 +134,11 @@ public:
template <typename Function, typename... Args>
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
{
mutex = std::make_unique<std::mutex>();
mutex = std::make_shared<std::mutex>();
/// The function object must be copyable, so we wrap lock_guard in shared_ptr.
GlobalThreadPool::instance().scheduleOrThrow([
mutex = mutex,
lock = std::make_shared<std::lock_guard<std::mutex>>(*mutex),
func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]
......@@ -154,7 +155,7 @@ public:
ThreadFromGlobalPool & operator=(ThreadFromGlobalPool && rhs)
{
if (mutex)
if (joinable())
std::terminate();
mutex = std::move(rhs.mutex);
return *this;
......@@ -162,25 +163,34 @@ public:
~ThreadFromGlobalPool()
{
if (mutex)
if (joinable())
std::terminate();
}
void join()
{
if (!joinable())
std::terminate();
{
std::lock_guard lock(*mutex);
}
mutex.reset();
}
void detach()
{
if (!joinable())
std::terminate();
mutex.reset();
}
bool joinable() const
{
return static_cast<bool>(mutex);
}
private:
std::unique_ptr<std::mutex> mutex; /// Object must be moveable.
std::shared_ptr<std::mutex> mutex; /// Object must be moveable.
};
......
......@@ -7,7 +7,7 @@
# include <Common/Exception.h>
namespace DB { namespace ErrorCodes { extern const int CPUID_ERROR; }}
#elif USE_CPUINFO
# include <cpuinfo.h>
# include <cpuinfo.h> // Y_IGNORE
#endif
......
......@@ -56,6 +56,9 @@ target_link_libraries (thread_pool PRIVATE clickhouse_common_io)
add_executable (thread_pool_2 thread_pool_2.cpp)
target_link_libraries (thread_pool_2 PRIVATE clickhouse_common_io)
add_executable (thread_pool_3 thread_pool_3.cpp)
target_link_libraries (thread_pool_3 PRIVATE clickhouse_common_io)
add_executable (multi_version multi_version.cpp)
target_link_libraries (multi_version PRIVATE clickhouse_common_io)
add_check(multi_version)
......
#include <atomic>
#include <iostream>
#include <Common/ThreadPool.h>
/// Test for thread self-removal when number of free threads in pool is too large.
/// Just checks that nothing weird happens.
/// Schedules ten trivial jobs on a freshly constructed pool and waits for all of them.
/// Each job prints a single dot to stderr.
template <typename Pool>
void test()
{
    /// NOTE(review): the three constructor arguments are positional; presumably
    /// max threads, max free threads and queue size - confirm against Common/ThreadPool.h.
    Pool pool(10, 2, 10);

    const auto print_dot = [] { std::cerr << '.'; };
    for (size_t jobs_left = 10; jobs_left > 0; --jobs_left)
        pool.schedule(print_dot);

    pool.wait();
}
/// Runs the scenario once per pool flavour; a newline terminates the dots printed by each run.
int main(int, char **)
{
    const auto end_line = [] { std::cerr << '\n'; };

    test<FreeThreadPool>();
    end_line();

    test<ThreadPool>();
    end_line();

    return 0;
}
......@@ -117,3 +117,9 @@
#define NO_SANITIZE_UNDEFINED
#define NO_SANITIZE_ADDRESS
#endif
/// OPTIMIZE(x): per-function optimization level override.
/// On GCC (but not clang, which also defines __GNUC__) it expands to the `optimize`
/// function attribute with argument x; on every other compiler it expands to nothing.
#if defined __GNUC__ && !defined __clang__
#define OPTIMIZE(x) __attribute__((__optimize__(x)))
#else
#define OPTIMIZE(x)
#endif
......@@ -5,19 +5,18 @@
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Field.h>
#include <Core/NamesAndTypes.h>
#include <Common/FieldVisitors.h>
#include <Common/COWPtr.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/IDataType.h>
#include <Functions/IFunction.h>
#include <Storages/IStorage.h>
#include <IO/WriteBufferFromOStream.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Parsers/IAST.h>
#include <Storages/IStorage.h>
#include <Common/COWPtr.h>
#include <Common/FieldVisitors.h>
namespace DB
{
std::ostream & operator<<(std::ostream & stream, const IBlockInputStream & what)
{
stream << "IBlockInputStream(name = " << what.getName() << ")";
......@@ -45,8 +44,7 @@ std::ostream & operator<<(std::ostream & stream, const IDataType & what)
std::ostream & operator<<(std::ostream & stream, const IStorage & what)
{
stream << "IStorage(name = " << what.getName() << ", tableName = " << what.getTableName() << ") {"
<< what.getColumns().getAllPhysical().toString()
<< "}";
<< what.getColumns().getAllPhysical().toString() << "}";
return stream;
}
......@@ -58,16 +56,15 @@ std::ostream & operator<<(std::ostream & stream, const TableStructureReadLock &)
std::ostream & operator<<(std::ostream & stream, const IFunctionBuilder & what)
{
stream << "IFunction(name = " << what.getName() << ", variadic = " << what.isVariadic() << ", args = "
<< what.getNumberOfArguments() << ")";
stream << "IFunction(name = " << what.getName() << ", variadic = " << what.isVariadic() << ", args = " << what.getNumberOfArguments()
<< ")";
return stream;
}
std::ostream & operator<<(std::ostream & stream, const Block & what)
{
stream << "Block("
<< "num_columns = " << what.columns()
<< "){" << what.dumpStructure() << "}";
<< "num_columns = " << what.columns() << "){" << what.dumpStructure() << "}";
return stream;
}
......@@ -80,14 +77,23 @@ std::ostream & operator<<(std::ostream & stream, const ColumnWithTypeAndName & w
/// Debug dump of a column: "IColumn(<structure>)" followed by all values
/// rendered with FieldVisitorDump, comma-separated, inside braces.
std::ostream & operator<<(std::ostream & stream, const IColumn & what)
{
    stream << "IColumn(" << what.dumpStructure() << ")";
    stream << "{";
    for (size_t i = 0; i < what.size(); ++i)
    {
        /// The separator belongs *between* elements: emit it before every element
        /// except the first. Emitting it after the element (as before) produced
        /// "ab, c, " instead of "a, b, c".
        if (i)
            stream << ", ";
        stream << applyVisitor(FieldVisitorDump(), what[i]);
    }
    stream << "}";
    return stream;
}
std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what)
{
stream << "Connection::Packet("
<< "type = " << what.type;
// types description: Core/Protocol.h
<< "type = " << what.type;
// types description: Core/Protocol.h
if (what.exception)
stream << "exception = " << what.exception.get();
// TODO: profile_info
......
......@@ -6,7 +6,6 @@
namespace DB
{
class IBlockInputStream;
std::ostream & operator<<(std::ostream & stream, const IBlockInputStream & what);
......
......@@ -250,9 +250,12 @@ void DataTypeAggregateFunction::deserializeTextCSV(IColumn & column, ReadBuffer
}
void DataTypeAggregateFunction::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeAggregateFunction::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
protobuf.writeAggregateFunction(function, static_cast<const ColumnAggregateFunction &>(column).getData()[row_num]);
if (value_index)
return;
value_index = static_cast<bool>(
protobuf.writeAggregateFunction(function, static_cast<const ColumnAggregateFunction &>(column).getData()[row_num]));
}
void DataTypeAggregateFunction::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
......
......@@ -56,7 +56,7 @@ public:
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
......
......@@ -431,15 +431,22 @@ void DataTypeArray::deserializeTextCSV(IColumn & column, ReadBuffer & istr, cons
}
void DataTypeArray::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeArray::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
const ColumnArray & column_array = static_cast<const ColumnArray &>(column);
const ColumnArray::Offsets & offsets = column_array.getOffsets();
size_t offset = offsets[row_num - 1];
size_t offset = offsets[row_num - 1] + value_index;
size_t next_offset = offsets[row_num];
const IColumn & nested_column = column_array.getData();
for (size_t i = offset; i < next_offset; ++i)
nested->serializeProtobuf(nested_column, i, protobuf);
size_t i;
for (i = offset; i < next_offset; ++i)
{
size_t element_stored = 0;
nested->serializeProtobuf(nested_column, i, protobuf, element_stored);
if (!element_stored)
break;
}
value_index += i - offset;
}
......
......@@ -86,7 +86,8 @@ public:
void serializeProtobuf(const IColumn & column,
size_t row_num,
ProtobufWriter & protobuf) const override;
ProtobufWriter & protobuf,
size_t & value_index) const override;
void deserializeProtobuf(IColumn & column,
ProtobufReader & protobuf,
bool allow_add_row,
......
......@@ -74,9 +74,11 @@ void DataTypeDate::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const
static_cast<ColumnUInt16 &>(column).getData().push_back(value.getDayNum());
}
void DataTypeDate::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeDate::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
protobuf.writeDate(DayNum(static_cast<const ColumnUInt16 &>(column).getData()[row_num]));
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeDate(DayNum(static_cast<const ColumnUInt16 &>(column).getData()[row_num])));
}
void DataTypeDate::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
......
......@@ -21,7 +21,7 @@ public:
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool canBeUsedAsVersion() const override { return true; }
......
......@@ -140,9 +140,11 @@ void DataTypeDateTime::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
static_cast<ColumnUInt32 &>(column).getData().push_back(x);
}
void DataTypeDateTime::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeDateTime::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
protobuf.writeDateTime(static_cast<const ColumnUInt32 &>(column).getData()[row_num]);
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeDateTime(static_cast<const ColumnUInt32 &>(column).getData()[row_num]));
}
void DataTypeDateTime::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
......
......@@ -46,7 +46,7 @@ public:
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool canBeUsedAsVersion() const override { return true; }
......
......@@ -225,10 +225,12 @@ void DataTypeEnum<Type>::deserializeBinaryBulk(
}
template <typename Type>
void DataTypeEnum<Type>::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeEnum<Type>::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
protobuf.prepareEnumMapping(values);
protobuf.writeEnum(static_cast<const ColumnType &>(column).getData()[row_num]);
value_index = static_cast<bool>(protobuf.writeEnum(static_cast<const ColumnType &>(column).getData()[row_num]));
}
template<typename Type>
......
......@@ -105,7 +105,7 @@ public:
void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, const size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, const size_t limit, const double avg_value_size_hint) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override { return ColumnType::create(); }
......
......@@ -209,10 +209,12 @@ void DataTypeFixedString::deserializeTextCSV(IColumn & column, ReadBuffer & istr
}
void DataTypeFixedString::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeFixedString::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
if (value_index)
return;
const char * pos = reinterpret_cast<const char *>(&static_cast<const ColumnFixedString &>(column).getChars()[n * row_num]);
protobuf.writeString(StringRef(pos, n));
value_index = static_cast<bool>(protobuf.writeString(StringRef(pos, n)));
}
......
......@@ -64,7 +64,7 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
......
......@@ -110,9 +110,9 @@ public:
serializeImpl(column, row_num, &IDataType::serializeAsTextXML, ostr, settings);
}
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override
{
serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf);
serializeImpl(column, row_num, &IDataType::serializeProtobuf, protobuf, value_index);
}
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
......
......@@ -311,11 +311,11 @@ void DataTypeNullable::serializeTextXML(const IColumn & column, size_t row_num,
nested_data_type->serializeAsTextXML(col.getNestedColumn(), row_num, ostr, settings);
}
void DataTypeNullable::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeNullable::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
const ColumnNullable & col = static_cast<const ColumnNullable &>(column);
if (!col.isNullAt(row_num))
nested_data_type->serializeProtobuf(col.getNestedColumn(), row_num, protobuf);
nested_data_type->serializeProtobuf(col.getNestedColumn(), row_num, protobuf, value_index);
}
void DataTypeNullable::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
......
......@@ -70,7 +70,7 @@ public:
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
......
......@@ -205,9 +205,11 @@ void DataTypeNumberBase<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer &
template <typename T>
void DataTypeNumberBase<T>::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeNumberBase<T>::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
protobuf.writeNumber(static_cast<const ColumnVector<T> &>(column).getData()[row_num]);
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeNumber(static_cast<const ColumnVector<T> &>(column).getData()[row_num]));
}
......
......@@ -36,7 +36,7 @@ public:
void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
......
......@@ -303,9 +303,11 @@ void DataTypeString::deserializeTextCSV(IColumn & column, ReadBuffer & istr, con
}
void DataTypeString::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeString::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
protobuf.writeString(static_cast<const ColumnString &>(column).getDataAt(row_num));
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeString(static_cast<const ColumnString &>(column).getDataAt(row_num)));
}
......
......@@ -45,7 +45,7 @@ public:
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
......
......@@ -407,10 +407,15 @@ void DataTypeTuple::deserializeBinaryBulkWithMultipleStreams(
settings.path.pop_back();
}
void DataTypeTuple::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeTuple::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
for (const auto i : ext::range(0, ext::size(elems)))
elems[i]->serializeProtobuf(extractElementColumn(column, i), row_num, protobuf);
for (; value_index < elems.size(); ++value_index)
{
size_t stored = 0;
elems[value_index]->serializeProtobuf(extractElementColumn(column, value_index), row_num, protobuf, stored);
if (!stored)
break;
}
}
void DataTypeTuple::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
......
......@@ -77,7 +77,7 @@ public:
DeserializeBinaryBulkSettings & settings,
DeserializeBinaryBulkStatePtr & state) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & reader, bool allow_add_row, bool & row_added) const override;
MutableColumnPtr createColumn() const override;
......
......@@ -73,9 +73,11 @@ void DataTypeUUID::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const
static_cast<ColumnUInt128 &>(column).getData().push_back(value);
}
void DataTypeUUID::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeUUID::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
protobuf.writeUUID(UUID(static_cast<const ColumnUInt128 &>(column).getData()[row_num]));
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeUUID(UUID(static_cast<const ColumnUInt128 &>(column).getData()[row_num])));
}
void DataTypeUUID::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const
......
......@@ -24,7 +24,7 @@ public:
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
bool canBeUsedInBitOperations() const override { return true; }
......
......@@ -136,9 +136,11 @@ void DataTypeDecimal<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer & is
template <typename T>
void DataTypeDecimal<T>::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
void DataTypeDecimal<T>::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const
{
protobuf.writeDecimal(static_cast<const ColumnType &>(column).getData()[row_num], scale);
if (value_index)
return;
value_index = static_cast<bool>(protobuf.writeDecimal(static_cast<const ColumnType &>(column).getData()[row_num], scale));
}
......
......@@ -100,7 +100,7 @@ public:
void deserializeBinary(IColumn & column, ReadBuffer & istr) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const override;
void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const override;
Field getDefault() const override;
......
......@@ -254,7 +254,7 @@ public:
virtual void serializeAsTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const;
/** Serialize to a protobuf. */
virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const = 0;
virtual void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const = 0;
virtual void deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const = 0;
protected:
......
......@@ -28,7 +28,7 @@ public:
void deserializeBinaryBulk(IColumn &, ReadBuffer &, size_t, double) const override { throwNoSerialization(); }
void serializeText(const IColumn &, size_t, WriteBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
void deserializeText(IColumn &, ReadBuffer &, const FormatSettings &) const override { throwNoSerialization(); }
void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &) const override { throwNoSerialization(); }
void serializeProtobuf(const IColumn &, size_t, ProtobufWriter &, size_t &) const override { throwNoSerialization(); }
void deserializeProtobuf(IColumn &, ProtobufReader &, bool, bool &) const override { throwNoSerialization(); }
MutableColumnPtr createColumn() const override
......
......@@ -111,7 +111,7 @@ void CacheDictionary::isInImpl(const PaddedPODArray<Key> & child_ids, const Ance
const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
PaddedPODArray<Key> children(out_size);
PaddedPODArray<Key> children(out_size, 0);
PaddedPODArray<Key> parents(child_ids.begin(), child_ids.end());
while (true)
......
......@@ -12,26 +12,12 @@ BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOu
void BlockOutputStreamFromRowOutputStream::write(const Block & block)
{
size_t rows = block.rows();
size_t columns = block.columns();
for (size_t i = 0; i < rows; ++i)
{
if (!first_row)
row_output->writeRowBetweenDelimiter();
first_row = false;
row_output->writeRowStartDelimiter();
for (size_t j = 0; j < columns; ++j)
{
if (j != 0)
row_output->writeFieldDelimiter();
auto & col = block.getByPosition(j);
row_output->writeField(*col.column, *col.type, i);
}
row_output->writeRowEndDelimiter();
row_output->write(block, i);
}
}
......
#include <Core/Defines.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
......@@ -189,7 +191,13 @@ String CSVRowInputStream::getDiagnosticInfo()
}
bool CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
/** gcc-7 generates wrong code with an optimization level greater than 1.
* See tests: dbms/src/IO/tests/write_int.cpp
* and dbms/tests/queries/0_stateless/00898_parsing_bad_diagnostic_message.sh
* This is a compiler bug. The bug is not present in gcc-8 and clang-8.
* Nevertheless, we don't need high optimization of this function.
*/
bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
{
const char delimiter = format_settings.csv.delimiter;
......
#include <Common/Exception.h>
#include <Core/Block.h>
#include <Formats/IRowOutputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
void IRowOutputStream::write(const Block & block, size_t row_num)
{
......@@ -23,4 +29,9 @@ void IRowOutputStream::write(const Block & block, size_t row_num)
writeRowEndDelimiter();
}
/// Default implementation of writeField: ignores all arguments and always throws
/// an Exception with code NOT_IMPLEMENTED.
void IRowOutputStream::writeField(const IColumn &, const IDataType &, size_t)
{
throw Exception("Method writeField is not implemented for output format", ErrorCodes::NOT_IMPLEMENTED);
}
}
......@@ -28,7 +28,7 @@ public:
virtual void write(const Block & block, size_t row_num);
/** Write single value. */
virtual void writeField(const IColumn & column, const IDataType & type, size_t row_num) = 0;
virtual void writeField(const IColumn & column, const IDataType & type, size_t row_num);
/** Write delimiter. */
virtual void writeFieldDelimiter() {} /// delimiter between values
......
#include <Formats/FormatFactory.h>
#include <Common/config.h>
#if USE_PROTOBUF
#include "ProtobufBlockOutputStream.h"
#include <Core/Block.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufSchemas.h>
#include <Interpreters/Context.h>
#include <google/protobuf/descriptor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD;
}
/// Builds the stream: the protobuf writer is bound to the output buffer and the message type,
/// while the header block and the format settings are stored as given.
ProtobufBlockOutputStream::ProtobufBlockOutputStream(
    WriteBuffer & buffer_,
    const Block & header_,
    const google::protobuf::Descriptor * message_type,
    const FormatSettings & format_settings_)
    : writer(buffer_, message_type)
    , header(header_)
    , format_settings(format_settings_)
{
}
/// Writes every row of `block` as a separate protobuf message.
/// Columns are looked up by the names of the protobuf fields, in the order given by
/// writer.fieldsInWriteOrder(). If the block has no column for a field, nothing is
/// serialized for that field, unless the field is required: in that case an exception
/// with the NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD code is thrown before any row is written.
void ProtobufBlockOutputStream::write(const Block & block)
{
    const auto & fields_in_write_order = writer.fieldsInWriteOrder();
    const size_t num_fields = fields_in_write_order.size();

    /// One entry per protobuf field; nullptr marks a non-required field without a column in the block.
    std::vector<const ColumnWithTypeAndName *> columns_in_write_order;
    columns_in_write_order.reserve(num_fields);

    for (size_t field_idx = 0; field_idx < num_fields; ++field_idx)
    {
        const auto * field = fields_in_write_order[field_idx];

        if (block.has(field->name()))
        {
            columns_in_write_order.emplace_back(&block.getByName(field->name()));
            continue;
        }

        if (field->is_required())
            throw Exception(
                "Output doesn't have a column named '" + field->name() + "' which is required to write the output in the protobuf format.",
                ErrorCodes::NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD);

        columns_in_write_order.emplace_back(nullptr);
    }

    const size_t num_rows = block.rows();
    for (size_t row_num = 0; row_num < num_rows; ++row_num)
    {
        writer.newMessage();
        for (const auto * column : columns_in_write_order)
        {
            if (column != nullptr)
            {
                assert(column->name == writer.currentField()->name());
                column->type->serializeProtobuf(*(column->column), row_num, writer);
            }
            /// The writer is advanced for every field, including the ones without a column.
            writer.nextField();
        }
    }
}
/// Registers the "Protobuf" output format in `factory`.
/// The registered creator resolves the message type through ProtobufSchemas using a
/// FormatSchemaInfo built from the context and the "proto" argument, then constructs
/// a ProtobufBlockOutputStream.
void registerOutputFormatProtobuf(FormatFactory & factory)
{
    auto create_stream = [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings & format_settings)
    {
        const FormatSchemaInfo schema_info(context, "proto");
        const auto * message_type = ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info);
        return std::make_shared<ProtobufBlockOutputStream>(buf, header, message_type, format_settings);
    };

    factory.registerOutputFormat("Protobuf", create_stream);
}
}
#else
namespace DB
{
class FormatFactory;
/// No-op variant compiled in the #else branch of `#if USE_PROTOBUF`:
/// the function exists, but registers nothing in the factory.
void registerOutputFormatProtobuf(FormatFactory &) {}
}
#endif
......@@ -69,6 +69,12 @@ namespace ProtobufColumnMatcher
const std::vector<String> & column_names,
const google::protobuf::Descriptor * message_type);
template <typename Traits = DefaultTraits>
static std::unique_ptr<Message<Traits>> matchColumns(
const std::vector<String> & column_names,
const google::protobuf::Descriptor * message_type,
std::vector<const google::protobuf::FieldDescriptor *> & field_descriptors_without_match);
namespace details
{
void throwNoCommonColumns();
......@@ -88,7 +94,8 @@ namespace ProtobufColumnMatcher
std::unique_ptr<Message<Traits>> matchColumnsRecursive(
ColumnNameMatcher & name_matcher,
const google::protobuf::Descriptor * message_type,
const String & field_name_prefix)
const String & field_name_prefix,
std::vector<const google::protobuf::FieldDescriptor *> * field_descriptors_without_match)
{
auto message = std::make_unique<Message<Traits>>();
for (int i = 0; i != message_type->field_count(); ++i)
......@@ -98,7 +105,10 @@ namespace ProtobufColumnMatcher
|| (field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_GROUP))
{
auto nested_message = matchColumnsRecursive<Traits>(
name_matcher, field_descriptor->message_type(), field_name_prefix + field_descriptor->name() + ".");
name_matcher,
field_descriptor->message_type(),
field_name_prefix + field_descriptor->name() + ".",
field_descriptors_without_match);
if (nested_message)
{
message->fields.emplace_back();
......@@ -112,7 +122,12 @@ namespace ProtobufColumnMatcher
else
{
size_t column_index = name_matcher.findColumn(field_name_prefix + field_descriptor->name());
if (column_index != static_cast<size_t>(-1))
if (column_index == static_cast<size_t>(-1))
{
if (field_descriptors_without_match)
field_descriptors_without_match->emplace_back(field_descriptor);
}
else
{
message->fields.emplace_back();
auto & current_field = message->fields.back();
......@@ -144,16 +159,34 @@ namespace ProtobufColumnMatcher
}
template <typename Data>
static std::unique_ptr<Message<Data>> matchColumns(
static std::unique_ptr<Message<Data>> matchColumnsImpl(
const std::vector<String> & column_names,
const google::protobuf::Descriptor * message_type)
const google::protobuf::Descriptor * message_type,
std::vector<const google::protobuf::FieldDescriptor *> * field_descriptors_without_match)
{
details::ColumnNameMatcher name_matcher(column_names);
auto message = details::matchColumnsRecursive<Data>(name_matcher, message_type, "");
auto message = details::matchColumnsRecursive<Data>(name_matcher, message_type, "", field_descriptors_without_match);
if (!message)
details::throwNoCommonColumns();
return message;
}
/// Overload which does not collect unmatched field descriptors:
/// it forwards to matchColumnsImpl() with a null pointer as the last argument.
template <typename Data>
static std::unique_ptr<Message<Data>> matchColumns(
    const std::vector<String> & column_names,
    const google::protobuf::Descriptor * message_type)
{
    std::vector<const google::protobuf::FieldDescriptor *> * const no_unmatched_fields_sink = nullptr;
    return matchColumnsImpl<Data>(column_names, message_type, no_unmatched_fields_sink);
}
/// Overload which forwards to matchColumnsImpl() with the address of
/// `field_descriptors_without_match`, so that the implementation can append to that vector.
template <typename Data>
static std::unique_ptr<Message<Data>> matchColumns(
    const std::vector<String> & column_names,
    const google::protobuf::Descriptor * message_type,
    std::vector<const google::protobuf::FieldDescriptor *> & field_descriptors_without_match)
{
    auto * unmatched_fields_sink = &field_descriptors_without_match;
    return matchColumnsImpl<Data>(column_names, message_type, unmatched_fields_sink);
}
}
}
......
......@@ -48,7 +48,8 @@ namespace
}
// SimpleReader is a utility class to deserialize protobufs.
// Knows nothing about protobuf schemas, just provides useful functions to deserialize data.
ProtobufReader::SimpleReader::SimpleReader(ReadBuffer & in_)
: in(in_)
, cursor(1 /* We starts at cursor == 1 to keep any cursor value > REACHED_END, this allows to simplify conditions */)
......@@ -374,6 +375,7 @@ void ProtobufReader::SimpleReader::ignoreGroup()
}
// Implementation for a converter from any protobuf field type to any DB data type.
class ProtobufReader::ConverterBaseImpl : public ProtobufReader::IConverter
{
public:
......@@ -688,19 +690,19 @@ private:
std::optional<std::unordered_map<StringRef, Int16>> enum_name_to_value_map;
};
#define PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_STRINGS(field_type_id) \
template<> \
class ProtobufReader::ConverterImpl<field_type_id> : public ConverterFromString \
#define PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_STRINGS(field_type_id) \
template <> \
std::unique_ptr<ProtobufReader::IConverter> ProtobufReader::createConverter<field_type_id>( \
const google::protobuf::FieldDescriptor * field) \
{ \
using ConverterFromString::ConverterFromString; \
return std::make_unique<ConverterFromString>(simple_reader, field); \
}
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_STRINGS(google::protobuf::FieldDescriptor::TYPE_STRING);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_STRINGS(google::protobuf::FieldDescriptor::TYPE_BYTES);
#undef PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_STRINGS
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_STRINGS(google::protobuf::FieldDescriptor::TYPE_STRING)
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_STRINGS(google::protobuf::FieldDescriptor::TYPE_BYTES)
#undef PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_STRINGS
template <int field_type_id, typename T>
template <int field_type_id, typename FromType>
class ProtobufReader::ConverterFromNumber : public ConverterBaseImpl
{
public:
......@@ -708,7 +710,7 @@ public:
bool readStringInto(PaddedPODArray<UInt8> & str) override
{
T number;
FromType number;
if (!readField(number))
return false;
WriteBufferFromVector<PaddedPODArray<UInt8>> buf(str);
......@@ -765,7 +767,7 @@ private:
template <typename To>
bool readNumeric(To & value)
{
T number;
FromType number;
if (!readField(number))
return false;
value = numericCast<To>(number);
......@@ -775,9 +777,9 @@ private:
template<typename EnumType>
bool readEnum(EnumType & value)
{
if constexpr (!std::is_integral_v<T>)
if constexpr (!std::is_integral_v<FromType>)
cannotConvertType("Enum"); // It's not correct to convert floating point to enum.
T number;
FromType number;
if (!readField(number))
return false;
value = numericCast<EnumType>(number);
......@@ -799,40 +801,39 @@ private:
template <typename S>
bool readDecimal(Decimal<S> & decimal, UInt32 scale)
{
T number;
FromType number;
if (!readField(number))
return false;
decimal.value = convertToDecimal<DataTypeNumber<T>, DataTypeDecimal<Decimal<S>>>(number, scale);
decimal.value = convertToDecimal<DataTypeNumber<FromType>, DataTypeDecimal<Decimal<S>>>(number, scale);
return true;
}
bool readField(T & value)
bool readField(FromType & value)
{
if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_INT32) && std::is_same_v<T, Int64>)
if constexpr (((field_type_id == google::protobuf::FieldDescriptor::TYPE_INT32) && std::is_same_v<FromType, Int64>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_INT64) && std::is_same_v<FromType, Int64>))
{
return simple_reader.readInt(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SINT32) && std::is_same_v<T, Int64>)
return simple_reader.readSInt(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_UINT32) && std::is_same_v<T, UInt64>)
}
else if constexpr (((field_type_id == google::protobuf::FieldDescriptor::TYPE_UINT32) && std::is_same_v<FromType, UInt64>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_UINT64) && std::is_same_v<FromType, UInt64>))
{
return simple_reader.readUInt(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_INT64) && std::is_same_v<T, Int64>)
return simple_reader.readInt(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SINT64) && std::is_same_v<T, Int64>)
}
else if constexpr (((field_type_id == google::protobuf::FieldDescriptor::TYPE_SINT32) && std::is_same_v<FromType, Int64>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SINT64) && std::is_same_v<FromType, Int64>))
{
return simple_reader.readSInt(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_UINT64) && std::is_same_v<T, UInt64>)
return simple_reader.readUInt(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FIXED32) && std::is_same_v<T, UInt32>)
return simple_reader.readFixed(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SFIXED32) && std::is_same_v<T, Int32>)
return simple_reader.readFixed(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FIXED64) && std::is_same_v<T, UInt64>)
return simple_reader.readFixed(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SFIXED64) && std::is_same_v<T, Int64>)
return simple_reader.readFixed(value);
else if constexpr ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FLOAT) && std::is_same_v<T, float>)
return simple_reader.readFixed(value);
}
else
{
static_assert((field_type_id == google::protobuf::FieldDescriptor::TYPE_DOUBLE) && std::is_same_v<T, double>);
static_assert(((field_type_id == google::protobuf::FieldDescriptor::TYPE_FIXED32) && std::is_same_v<FromType, UInt32>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SFIXED32) && std::is_same_v<FromType, Int32>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FIXED64) && std::is_same_v<FromType, UInt64>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_SFIXED64) && std::is_same_v<FromType, Int64>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_FLOAT) && std::is_same_v<FromType, float>)
|| ((field_type_id == google::protobuf::FieldDescriptor::TYPE_DOUBLE) && std::is_same_v<FromType, double>));
return simple_reader.readFixed(value);
}
}
......@@ -840,30 +841,30 @@ private:
std::optional<std::unordered_set<Int16>> set_of_enum_values;
};
#define PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(field_type_id, field_type) \
template<> \
class ProtobufReader::ConverterImpl<field_type_id> : public ConverterFromNumber<field_type_id, field_type> \
#define PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(field_type_id, field_type) \
template <> \
std::unique_ptr<ProtobufReader::IConverter> ProtobufReader::createConverter<field_type_id>( \
const google::protobuf::FieldDescriptor * field) \
{ \
using ConverterFromNumber::ConverterFromNumber; \
}
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_INT32, Int64);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SINT32, Int64);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_UINT32, UInt64);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_INT64, Int64);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SINT64, Int64);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_UINT64, UInt64);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FIXED32, UInt32);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SFIXED32, Int32);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FIXED64, UInt64);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SFIXED64, Int64);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FLOAT, float);
PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_DOUBLE, double);
#undef PROTOBUF_READER_CONVERTER_IMPL_SPECIALIZATION_FOR_NUMBERS
template<>
class ProtobufReader::ConverterImpl<google::protobuf::FieldDescriptor::TYPE_BOOL> : public ConverterBaseImpl
return std::make_unique<ConverterFromNumber<field_type_id, field_type>>(simple_reader, field); \
}
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_INT32, Int64);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SINT32, Int64);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_UINT32, UInt64);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_INT64, Int64);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SINT64, Int64);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_UINT64, UInt64);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FIXED32, UInt32);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SFIXED32, Int32);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FIXED64, UInt64);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_SFIXED64, Int64);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_FLOAT, float);
PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::FieldDescriptor::TYPE_DOUBLE, double);
#undef PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS
class ProtobufReader::ConverterFromBool : public ConverterBaseImpl
{
public:
using ConverterBaseImpl::ConverterBaseImpl;
......@@ -913,10 +914,15 @@ private:
}
};
/// Specialization for TYPE_BOOL: creates a ConverterFromBool bound to this reader's simple_reader and to `field`.
template <>
std::unique_ptr<ProtobufReader::IConverter> ProtobufReader::createConverter<google::protobuf::FieldDescriptor::TYPE_BOOL>(
    const google::protobuf::FieldDescriptor * field)
{
    std::unique_ptr<IConverter> converter = std::make_unique<ConverterFromBool>(simple_reader, field);
    return converter;
}
template<>
class ProtobufReader::ConverterImpl<google::protobuf::FieldDescriptor::TYPE_ENUM> : public ConverterBaseImpl
class ProtobufReader::ConverterFromEnum : public ConverterBaseImpl
{
public:
using ConverterBaseImpl::ConverterBaseImpl;
......@@ -973,10 +979,15 @@ private:
Int64 pbnumber;
if (!readField(pbnumber))
return false;
auto it = enum_pbnumber_to_value_map->find(pbnumber);
if (it == enum_pbnumber_to_value_map->end())
cannotConvertValue(toString(pbnumber), "Enum");
value = static_cast<T>(it->second);
if (enum_pbnumber_always_equals_value)
value = static_cast<T>(pbnumber);
else
{
auto it = enum_pbnumber_to_value_map->find(pbnumber);
if (it == enum_pbnumber_to_value_map->end())
cannotConvertValue(toString(pbnumber), "Enum");
value = static_cast<T>(it->second);
}
return true;
}
......@@ -999,12 +1010,19 @@ private:
if (likely(enum_pbnumber_to_value_map.has_value()))
return;
enum_pbnumber_to_value_map.emplace();
enum_pbnumber_always_equals_value = true;
for (const auto & name_value_pair : name_value_pairs)
{
Int16 value = name_value_pair.second;
const auto * enum_descriptor = field->enum_type()->FindValueByName(name_value_pair.first);
if (enum_descriptor)
{
enum_pbnumber_to_value_map->emplace(enum_descriptor->number(), value);
if (enum_descriptor->number() != value)
enum_pbnumber_always_equals_value = false;
}
else
enum_pbnumber_always_equals_value = false;
}
}
......@@ -1015,8 +1033,16 @@ private:
std::optional<std::unordered_map<Int64, StringRef>> enum_pbnumber_to_name_map;
std::optional<std::unordered_map<Int64, Int16>> enum_pbnumber_to_value_map;
bool enum_pbnumber_always_equals_value;
};
/// Specialization for TYPE_ENUM: creates a ConverterFromEnum bound to this reader's simple_reader and to `field`.
template <>
std::unique_ptr<ProtobufReader::IConverter> ProtobufReader::createConverter<google::protobuf::FieldDescriptor::TYPE_ENUM>(
    const google::protobuf::FieldDescriptor * field)
{
    std::unique_ptr<IConverter> converter = std::make_unique<ConverterFromEnum>(simple_reader, field);
    return converter;
}
ProtobufReader::ProtobufReader(
ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names)
......@@ -1041,7 +1067,7 @@ void ProtobufReader::setTraitsDataAfterMatchingColumns(Message * message)
{
#define PROTOBUF_READER_CONVERTER_CREATING_CASE(field_type_id) \
case field_type_id: \
field.data.converter = std::make_unique<ConverterImpl<field_type_id>>(simple_reader, field.field_descriptor); \
field.data.converter = createConverter<field_type_id>(field.field_descriptor); \
break
PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_STRING);
PROTOBUF_READER_CONVERTER_CREATING_CASE(google::protobuf::FieldDescriptor::TYPE_BYTES);
......
......@@ -158,9 +158,10 @@ private:
};
class ConverterBaseImpl;
template <int type_id> class ConverterImpl;
class ConverterFromString;
template<int field_type_id, typename FromType> class ConverterFromNumber;
class ConverterFromBool;
class ConverterFromEnum;
struct ColumnMatcherTraits
{
......@@ -178,6 +179,9 @@ private:
void setTraitsDataAfterMatchingColumns(Message * message);
template <int field_type_id>
std::unique_ptr<IConverter> createConverter(const google::protobuf::FieldDescriptor * field);
SimpleReader simple_reader;
std::unique_ptr<Message> root_message;
Message* current_message = nullptr;
......@@ -202,30 +206,30 @@ class ProtobufReader
public:
bool startMessage() { return false; }
void endMessage() {}
bool readColumnIndex(size_t & column_index) { return false; }
bool readNumber(Int8 & value) { return false; }
bool readNumber(UInt8 & value) { return false; }
bool readNumber(Int16 & value) { return false; }
bool readNumber(UInt16 & value) { return false; }
bool readNumber(Int32 & value) { return false; }
bool readNumber(UInt32 & value) { return false; }
bool readNumber(Int64 & value) { return false; }
bool readNumber(UInt64 & value) { return false; }
bool readNumber(UInt128 & value) { return false; }
bool readNumber(Float32 & value) { return false; }
bool readNumber(Float64 & value) { return false; }
bool readStringInto(PaddedPODArray<UInt8> & str) { return false; }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & name_value_pairs) {}
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & name_value_pairs) {}
bool readEnum(Int8 & value) { return false; }
bool readEnum(Int16 & value) { return false; }
bool readUUID(UUID & uuid) { return false; }
bool readDate(DayNum & date) { return false; }
bool readDateTime(time_t & tm) { return false; }
bool readDecimal(Decimal32 & decimal, UInt32 precision, UInt32 scale) { return false; }
bool readDecimal(Decimal64 & decimal, UInt32 precision, UInt32 scale) { return false; }
bool readDecimal(Decimal128 & decimal, UInt32 precision, UInt32 scale) { return false; }
bool readAggregateFunction(const AggregateFunctionPtr & function, AggregateDataPtr place, Arena & arena) { return false; }
bool readColumnIndex(size_t &) { return false; }
bool readNumber(Int8 &) { return false; }
bool readNumber(UInt8 &) { return false; }
bool readNumber(Int16 &) { return false; }
bool readNumber(UInt16 &) { return false; }
bool readNumber(Int32 &) { return false; }
bool readNumber(UInt32 &) { return false; }
bool readNumber(Int64 &) { return false; }
bool readNumber(UInt64 &) { return false; }
bool readNumber(UInt128 &) { return false; }
bool readNumber(Float32 &) { return false; }
bool readNumber(Float64 &) { return false; }
bool readStringInto(PaddedPODArray<UInt8> &) { return false; }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> &) {}
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> &) {}
bool readEnum(Int8 &) { return false; }
bool readEnum(Int16 &) { return false; }
bool readUUID(UUID &) { return false; }
bool readDate(DayNum &) { return false; }
bool readDateTime(time_t &) { return false; }
bool readDecimal(Decimal32 &, UInt32, UInt32) { return false; }
bool readDecimal(Decimal64 &, UInt32, UInt32) { return false; }
bool readDecimal(Decimal128 &, UInt32, UInt32) { return false; }
bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) { return false; }
bool maybeCanReadValue() const { return false; }
};
......
......@@ -12,8 +12,8 @@
namespace DB
{
ProtobufRowInputStream::ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & info)
: data_types(header.getDataTypes()), reader(in_, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info), header.getNames())
ProtobufRowInputStream::ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & format_schema)
: data_types(header.getDataTypes()), reader(in_, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
{
}
......@@ -87,7 +87,7 @@ void registerInputFormatProtobuf(FormatFactory & factory)
namespace DB
{
class FormatFactory;
void registerInputFormatProtobuf(FormatFactory & factory) {}
void registerInputFormatProtobuf(FormatFactory &) {}
}
#endif
......@@ -13,12 +13,18 @@ class Block;
class FormatSchemaInfo;
/** Interface of stream, that allows to read data by rows.
/** Stream designed to deserialize data from the google protobuf format.
* Each row is read as a separate message.
* These messages are delimited according to the documentation
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h
* Deserializing from the protobuf format requires the 'format_schema' setting to be set, e.g.
* INSERT INTO table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
* where schema is the name of the "schema.proto" file specifying the protobuf schema.
*/
class ProtobufRowInputStream : public IRowInputStream
{
public:
ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & info);
ProtobufRowInputStream(ReadBuffer & in_, const Block & header, const FormatSchemaInfo & format_schema);
~ProtobufRowInputStream() override;
bool read(MutableColumns & columns, RowReadExtension & extra) override;
......
#include <Formats/FormatFactory.h>
#include <Common/config.h>
#if USE_PROTOBUF
#include "ProtobufRowOutputStream.h"
#include <Core/Block.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufSchemas.h>
#include <google/protobuf/descriptor.h>
namespace DB
{
/// Takes the data types and the column names from `header`, resolves the protobuf message type
/// for `format_schema` through ProtobufSchemas and prepares one zero-initialized value index per header column.
ProtobufRowOutputStream::ProtobufRowOutputStream(WriteBuffer & out, const Block & header, const FormatSchemaInfo & format_schema)
    : data_types(header.getDataTypes())
    , writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
{
    value_indices.assign(header.columns(), 0);
}
/// Writes the row `row_num` of `block` as one message.
/// All value indices are reset to zero first; then, for as long as writer.writeField() returns true,
/// the column whose index it reported is serialized with the data type of that column.
void ProtobufRowOutputStream::write(const Block & block, size_t row_num)
{
    writer.startMessage();
    value_indices.assign(value_indices.size(), 0);

    size_t column_index; /// Filled in by writer.writeField().
    while (writer.writeField(column_index))
    {
        const auto & column = *block.getByPosition(column_index).column;
        auto & value_index = value_indices[column_index];
        data_types[column_index]->serializeProtobuf(column, row_num, writer, value_index);
    }

    writer.endMessage();
}
/// Registers the "Protobuf" output format in `factory`.
/// The registered creator builds a ProtobufRowOutputStream (with a FormatSchemaInfo made from the context
/// and the "proto" argument) and wraps it into a BlockOutputStreamFromRowOutputStream.
/// The format settings argument is not used.
void registerOutputFormatProtobuf(FormatFactory & factory)
{
    auto create_stream = [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &)
    {
        auto row_stream = std::make_shared<ProtobufRowOutputStream>(buf, header, FormatSchemaInfo(context, "proto"));
        return std::make_shared<BlockOutputStreamFromRowOutputStream>(row_stream, header);
    };

    factory.registerOutputFormat("Protobuf", create_stream);
}
}
#else
namespace DB
{
class FormatFactory;
/// No-op variant compiled in the #else branch of `#if USE_PROTOBUF`:
/// the function exists, but registers nothing in the factory.
void registerOutputFormatProtobuf(FormatFactory &) {}
}
#endif
#pragma once
#include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Formats/FormatSettings.h>
#include <DataTypes/IDataType.h>
#include <Formats/IRowOutputStream.h>
#include <Formats/ProtobufWriter.h>
......@@ -17,6 +16,9 @@ namespace protobuf
namespace DB
{
class Block;
class FormatSchemaInfo;
/** Stream designed to serialize data in the google protobuf format.
* Each row is written as a separate message.
* These messages are delimited according to documentation
......@@ -25,23 +27,18 @@ namespace DB
* SELECT * from table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
* where schema is the name of "schema.proto" file specifying protobuf schema.
*/
class ProtobufBlockOutputStream : public IBlockOutputStream
class ProtobufRowOutputStream : public IRowOutputStream
{
public:
ProtobufBlockOutputStream(
WriteBuffer & buffer_,
const Block & header_,
const google::protobuf::Descriptor * message_prototype_,
const FormatSettings & format_settings_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
ProtobufRowOutputStream(WriteBuffer & out, const Block & header, const FormatSchemaInfo & format_schema);
void write(const Block & block, size_t row_num) override;
std::string getContentType() const override { return "application/octet-stream"; }
private:
DataTypes data_types;
ProtobufWriter writer;
const Block header;
const FormatSettings format_settings;
std::vector<size_t> value_indices;
};
}
......@@ -2,8 +2,8 @@
#if USE_PROTOBUF
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufSchemas.h>
#include <google/protobuf/compiler/importer.h>
#include <Formats/ProtobufSchemas.h> // Y_IGNORE
#include <google/protobuf/compiler/importer.h> // Y_IGNORE
#include <Common/Exception.h>
......@@ -12,7 +12,6 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
}
......@@ -74,11 +73,6 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch
return importer->import(info.schemaPath(), info.messageName());
}
const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForColumns(const std::vector<ColumnWithTypeAndName> & /*columns*/)
{
throw Exception("Using the 'Protobuf' format without schema is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
}
#endif
......@@ -5,7 +5,6 @@
#include <memory>
#include <unordered_map>
#include <vector>
#include <Core/Types.h>
#include <ext/singleton.h>
......@@ -21,9 +20,8 @@ namespace protobuf
namespace DB
{
class FormatSchemaInfo;
struct ColumnWithTypeAndName;
/** Keeps parsed google protobuf schemas either parsed from files or generated from DB columns.
/** Keeps parsed google protobuf schemas parsed from files.
* This class is used to handle the "Protobuf" input/output formats.
*/
class ProtobufSchemas : public ext::singleton<ProtobufSchemas>
......@@ -36,10 +34,6 @@ public:
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info);
/// Generates a message type with suitable types of fields to store a block with |header|, then returns the descriptor
/// of the generated message type.
const google::protobuf::Descriptor * getMessageTypeForColumns(const std::vector<ColumnWithTypeAndName> & columns);
private:
class ImporterWithSourceTree;
std::unordered_map<String, std::unique_ptr<ImporterWithSourceTree>> importers;
......
#include <cassert>
#include <Formats/ProtobufSimpleWriter.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace
{
// Writes `value` to `buf` as a variable-length integer: seven bits per byte, starting from the lowest bits,
// with the bit 0x80 set in every byte except the last one.
// Note: We cannot simply use writeVarUInt() from IO/VarInt.h here because there is one small difference:
// Google protobuf's representation of a 64-bit integer takes from 1 to 10 bytes, whereas writeVarUInt() writes
// from 1 to 9 bytes because it omits the tenth byte (which is not actually necessary for decoding).
void writePbVarUInt(UInt64 value, WriteBuffer & buf)
{
    for (; value >= 0x80; value >>= 7)
        buf.write(static_cast<char>(value | 0x80));
    buf.write(static_cast<char>(value));
}
// Writes a signed integer as a varint after mapping it to an unsigned one:
// the value is shifted left by one bit and XOR-ed with the value shifted right by 63 bits.
void writePbVarInt(Int64 value, WriteBuffer & buf)
{
    const UInt64 shifted = static_cast<UInt64>(value) << 1;
    const UInt64 sign_mask = static_cast<UInt64>(value >> 63);
    writePbVarUInt(shifted ^ sign_mask, buf);
}
// Writes a signed integer as a varint without any remapping: the value is just cast to UInt64.
void writePbVarIntNoZigZagEncoding(Int64 value, WriteBuffer & buf)
{
    const auto as_unsigned = static_cast<UInt64>(value);
    writePbVarUInt(as_unsigned, buf);
}
}
// Wire type of a written value; writeKey() combines it with the current field number shifted left by 3 bits.
enum ProtobufSimpleWriter::WireType : UInt32
{
// Used by writeInt32/UInt32/SInt32/Int64/UInt64/SInt64.
VARINT = 0,
// Used by writeFixed64/SFixed64/Double.
BITS64 = 1,
// Used by writeString and for the packed buffer flushed in finishCurrentField().
LENGTH_DELIMITED = 2,
// Used by writeFixed32/SFixed32/Float.
BITS32 = 5
};
// Binds the writer to `out_`; finished messages are written to that buffer by finishCurrentMessage().
ProtobufSimpleWriter::ProtobufSimpleWriter(WriteBuffer & out_)
    : out(out_)
{
}
// Finishes the message being built (if any message was started) before the writer is destroyed.
// NOTE(review): finishCurrentMessage() writes to `out`; if such a write can throw,
// the exception would leave a destructor - confirm this is acceptable.
ProtobufSimpleWriter::~ProtobufSimpleWriter()
{
    finishCurrentMessage();
}
// Starts a new message: the previous one (if any) is finished first,
// then the writer remembers that at least one message has been started.
void ProtobufSimpleWriter::newMessage()
{
    finishCurrentMessage();
    were_messages = true;
}
void ProtobufSimpleWriter::finishCurrentMessage()
{
if (!were_messages)
return;
finishCurrentField();
current_field_number = 0;
StringRef str = message_buffer.stringRef();
writePbVarUInt(str.size, out);
out.write(str.data, str.size);
message_buffer.restart();
}
// Finishes the previous field and switches to the field `field_number`, resetting both value counters.
// The assertion expects field numbers to be passed in strictly increasing order within a message.
void ProtobufSimpleWriter::setCurrentField(UInt32 field_number)
{
    finishCurrentField();
    assert(current_field_number < field_number);

    current_field_number = field_number;
    num_packed_values = 0;
    num_normal_values = 0;
}
void ProtobufSimpleWriter::finishCurrentField()
{
if (num_packed_values)
{
assert(!num_normal_values);
StringRef str = repeated_packing_buffer.stringRef();
if (str.size)
{
writeKey(LENGTH_DELIMITED, message_buffer);
writePbVarUInt(str.size, message_buffer);
message_buffer.write(str.data, str.size);
repeated_packing_buffer.restart();
}
}
}
// Writes the key of the current field to `buf` as a varint:
// the field number shifted left by 3 bits, OR-ed with the wire type.
void ProtobufSimpleWriter::writeKey(WireType wire_type, WriteBuffer & buf)
{
    const auto key = (current_field_number << 3) | wire_type;
    writePbVarUInt(key, buf);
}
// Writers of varint-encoded values. Each of them asserts that a current field is set, writes the field key
// with the VARINT wire type to the message buffer, writes the value and increments num_normal_values.

// The value is widened to Int64 and written without the shift/XOR remapping.
void ProtobufSimpleWriter::writeInt32(Int32 value)
{
assert(current_field_number);
writeKey(VARINT, message_buffer);
writePbVarIntNoZigZagEncoding(value, message_buffer);
++num_normal_values;
}
// The value is written as an unsigned varint.
void ProtobufSimpleWriter::writeUInt32(UInt32 value)
{
assert(current_field_number);
writeKey(VARINT, message_buffer);
writePbVarUInt(value, message_buffer);
++num_normal_values;
}
// The value is written with writePbVarInt(), i.e. with the shift/XOR remapping applied.
void ProtobufSimpleWriter::writeSInt32(Int32 value)
{
assert(current_field_number);
writeKey(VARINT, message_buffer);
writePbVarInt(value, message_buffer);
++num_normal_values;
}
// The value is written without the shift/XOR remapping.
void ProtobufSimpleWriter::writeInt64(Int64 value)
{
assert(current_field_number);
writeKey(VARINT, message_buffer);
writePbVarIntNoZigZagEncoding(value, message_buffer);
++num_normal_values;
}
// The value is written as an unsigned varint.
void ProtobufSimpleWriter::writeUInt64(UInt64 value)
{
assert(current_field_number);
writeKey(VARINT, message_buffer);
writePbVarUInt(value, message_buffer);
++num_normal_values;
}
// The value is written with writePbVarInt(), i.e. with the shift/XOR remapping applied.
void ProtobufSimpleWriter::writeSInt64(Int64 value)
{
assert(current_field_number);
writeKey(VARINT, message_buffer);
writePbVarInt(value, message_buffer);
++num_normal_values;
}
void ProtobufSimpleWriter::writeFixed32(UInt32 value)
{
assert(current_field_number);
writeKey(BITS32, message_buffer);
writePODBinary(value, message_buffer);
++num_normal_values;
}
void ProtobufSimpleWriter::writeSFixed32(Int32 value)
{
assert(current_field_number);
writeKey(BITS32, message_buffer);
writePODBinary(value, message_buffer);
++num_normal_values;
}
void ProtobufSimpleWriter::writeFloat(float value)
{
assert(current_field_number);
writeKey(BITS32, message_buffer);
writePODBinary(value, message_buffer);
++num_normal_values;
}
/// Appends one non-packed value to the current field: a BITS64 key followed by the
/// in-memory bytes of `value` (writePODBinary). The value is counted in num_normal_values.
void ProtobufSimpleWriter::writeFixed64(UInt64 value)
{
    assert(current_field_number != 0);

    writeKey(BITS64, message_buffer);
    writePODBinary(value, message_buffer);

    num_normal_values += 1;
}
/// Signed variant of writeFixed64: BITS64 key, then the in-memory bytes of `value`
/// (writePODBinary); bumps num_normal_values.
void ProtobufSimpleWriter::writeSFixed64(Int64 value)
{
    assert(current_field_number != 0);

    writeKey(BITS64, message_buffer);
    writePODBinary(value, message_buffer);

    num_normal_values += 1;
}
/// Writes a `double` as a non-packed value: BITS64 key, then the in-memory bytes of
/// `value` (writePODBinary); bumps num_normal_values.
void ProtobufSimpleWriter::writeDouble(double value)
{
    assert(current_field_number != 0);

    writeKey(BITS64, message_buffer);
    writePODBinary(value, message_buffer);

    num_normal_values += 1;
}
/// Writes `str` as a length-delimited value: LENGTH_DELIMITED key, the byte count as a
/// varuint, then the bytes themselves. The value counter is bumped before anything is written.
void ProtobufSimpleWriter::writeString(const StringRef & str)
{
    assert(current_field_number != 0);
    num_normal_values += 1;

    const auto payload_size = str.size;
    writeKey(LENGTH_DELIMITED, message_buffer);
    writePbVarUInt(payload_size, message_buffer);
    message_buffer.write(str.data, payload_size);
}
/// Forwards to writeInt32 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeInt32IfNonZero(Int32 value)
{
    if (value == 0)
        return;
    writeInt32(value);
}
/// Forwards to writeUInt32 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeUInt32IfNonZero(UInt32 value)
{
    if (value == 0)
        return;
    writeUInt32(value);
}
/// Forwards to writeSInt32 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeSInt32IfNonZero(Int32 value)
{
    if (value == 0)
        return;
    writeSInt32(value);
}
/// Forwards to writeInt64 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeInt64IfNonZero(Int64 value)
{
    if (value == 0)
        return;
    writeInt64(value);
}
/// Forwards to writeUInt64 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeUInt64IfNonZero(UInt64 value)
{
    if (value == 0)
        return;
    writeUInt64(value);
}
/// Forwards to writeSInt64 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeSInt64IfNonZero(Int64 value)
{
    if (value == 0)
        return;
    writeSInt64(value);
}
/// Forwards to writeFixed32 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeFixed32IfNonZero(UInt32 value)
{
    if (value == 0)
        return;
    writeFixed32(value);
}
/// Forwards to writeSFixed32 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeSFixed32IfNonZero(Int32 value)
{
    if (value == 0)
        return;
    writeSFixed32(value);
}
/// Forwards to writeFloat unless `value` compares equal to zero (this includes -0.0f);
/// NaN does not compare equal to zero and is therefore written.
void ProtobufSimpleWriter::writeFloatIfNonZero(float value)
{
    if (value == 0)
        return;
    writeFloat(value);
}
/// Forwards to writeFixed64 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeFixed64IfNonZero(UInt64 value)
{
    if (value == 0)
        return;
    writeFixed64(value);
}
/// Forwards to writeSFixed64 unless `value` is zero, in which case nothing is written.
void ProtobufSimpleWriter::writeSFixed64IfNonZero(Int64 value)
{
    if (value == 0)
        return;
    writeSFixed64(value);
}
/// Forwards to writeDouble unless `value` compares equal to zero (this includes -0.0);
/// NaN does not compare equal to zero and is therefore written.
void ProtobufSimpleWriter::writeDoubleIfNonZero(double value)
{
    if (value == 0)
        return;
    writeDouble(value);
}
/// Forwards to writeString unless `str` has zero length, in which case nothing is written.
void ProtobufSimpleWriter::writeStringIfNotEmpty(const StringRef & str)
{
    if (str.size == 0)
        return;
    writeString(str);
}
/// Adds `value` to the pending packed payload: it is encoded with
/// writePbVarIntNoZigZagEncoding into repeated_packing_buffer (no key is written here)
/// and counted in num_packed_values.
void ProtobufSimpleWriter::packRepeatedInt32(Int32 value)
{
    assert(current_field_number != 0);

    writePbVarIntNoZigZagEncoding(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// Adds `value` to the pending packed payload: it is encoded with writePbVarUInt into
/// repeated_packing_buffer (no key is written here) and counted in num_packed_values.
void ProtobufSimpleWriter::packRepeatedUInt32(UInt32 value)
{
    assert(current_field_number != 0);

    writePbVarUInt(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// Adds `value` to the pending packed payload: it is encoded with writePbVarInt into
/// repeated_packing_buffer (no key is written here) and counted in num_packed_values.
void ProtobufSimpleWriter::packRepeatedSInt32(Int32 value)
{
    assert(current_field_number != 0);

    writePbVarInt(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// 64-bit counterpart of packRepeatedInt32: encodes `value` with
/// writePbVarIntNoZigZagEncoding into repeated_packing_buffer and bumps num_packed_values.
void ProtobufSimpleWriter::packRepeatedInt64(Int64 value)
{
    assert(current_field_number != 0);

    writePbVarIntNoZigZagEncoding(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// 64-bit counterpart of packRepeatedUInt32: encodes `value` with writePbVarUInt into
/// repeated_packing_buffer and bumps num_packed_values.
void ProtobufSimpleWriter::packRepeatedUInt64(UInt64 value)
{
    assert(current_field_number != 0);

    writePbVarUInt(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// 64-bit counterpart of packRepeatedSInt32: encodes `value` with writePbVarInt into
/// repeated_packing_buffer and bumps num_packed_values.
void ProtobufSimpleWriter::packRepeatedSInt64(Int64 value)
{
    assert(current_field_number != 0);

    writePbVarInt(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// Adds the in-memory bytes of `value` (writePODBinary) to repeated_packing_buffer and
/// counts it in num_packed_values; no key is written here.
void ProtobufSimpleWriter::packRepeatedFixed32(UInt32 value)
{
    assert(current_field_number != 0);

    writePODBinary(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// Signed variant of packRepeatedFixed32: appends the in-memory bytes of `value`
/// (writePODBinary) to repeated_packing_buffer and bumps num_packed_values.
void ProtobufSimpleWriter::packRepeatedSFixed32(Int32 value)
{
    assert(current_field_number != 0);

    writePODBinary(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// Appends the in-memory bytes of a `float` (writePODBinary) to repeated_packing_buffer
/// and bumps num_packed_values; no key is written here.
void ProtobufSimpleWriter::packRepeatedFloat(float value)
{
    assert(current_field_number != 0);

    writePODBinary(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// Adds the in-memory bytes of `value` (writePODBinary) to repeated_packing_buffer and
/// counts it in num_packed_values; no key is written here.
void ProtobufSimpleWriter::packRepeatedFixed64(UInt64 value)
{
    assert(current_field_number != 0);

    writePODBinary(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// Signed variant of packRepeatedFixed64: appends the in-memory bytes of `value`
/// (writePODBinary) to repeated_packing_buffer and bumps num_packed_values.
void ProtobufSimpleWriter::packRepeatedSFixed64(Int64 value)
{
    assert(current_field_number != 0);

    writePODBinary(value, repeated_packing_buffer);
    num_packed_values += 1;
}
/// Appends the in-memory bytes of a `double` (writePODBinary) to repeated_packing_buffer
/// and bumps num_packed_values; no key is written here.
void ProtobufSimpleWriter::packRepeatedDouble(double value)
{
    assert(current_field_number != 0);

    writePODBinary(value, repeated_packing_buffer);
    num_packed_values += 1;
}
}
#pragma once
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
#include "IO/WriteBufferFromString.h"
namespace DB
{
/** Utility class to serialize protobufs.
  * Knows nothing about protobuf schemas, just provides useful functions to serialize data.
  * This class is written following the documentation: https://developers.google.com/protocol-buffers/docs/encoding
  *
  * Expected call sequence: newMessage(), then for every field setCurrentField() followed by
  * any number of write*() / packRepeated*() calls for that field.
  */
class ProtobufSimpleWriter : private boost::noncopyable
{
public:
    /// `explicit` so that a WriteBuffer is never silently converted into a writer.
    explicit ProtobufSimpleWriter(WriteBuffer & out_);
    ~ProtobufSimpleWriter();

    /// Should be called when we start writing a new message.
    void newMessage();

    /// Should be called when we start writing a new field.
    /// A passed 'field_number' should be positive and greater than any previous 'field_number'.
    void setCurrentField(UInt32 field_number);
    UInt32 currentFieldNumber() const { return current_field_number; }

    /// Returns number of values added to the current field
    /// (values written with write*() plus values added with packRepeated*()).
    size_t numValues() const { return num_normal_values + num_packed_values; }

    /// Write a single value of the current field together with its key.
    void writeInt32(Int32 value);
    void writeUInt32(UInt32 value);
    void writeSInt32(Int32 value);
    void writeInt64(Int64 value);
    void writeUInt64(UInt64 value);
    void writeSInt64(Int64 value);
    void writeFixed32(UInt32 value);
    void writeSFixed32(Int32 value);
    void writeFloat(float value);
    void writeFixed64(UInt64 value);
    void writeSFixed64(Int64 value);
    void writeDouble(double value);
    void writeString(const StringRef & str);

    /// Same as the functions above, but nothing is written for a zero value / an empty string.
    void writeInt32IfNonZero(Int32 value);
    void writeUInt32IfNonZero(UInt32 value);
    void writeSInt32IfNonZero(Int32 value);
    void writeInt64IfNonZero(Int64 value);
    void writeUInt64IfNonZero(UInt64 value);
    void writeSInt64IfNonZero(Int64 value);
    void writeFixed32IfNonZero(UInt32 value);
    void writeSFixed32IfNonZero(Int32 value);
    void writeFloatIfNonZero(float value);
    void writeFixed64IfNonZero(UInt64 value);
    void writeSFixed64IfNonZero(Int64 value);
    void writeDoubleIfNonZero(double value);
    void writeStringIfNotEmpty(const StringRef & str);

    /// Add a value to the packed representation of the current (repeated) field.
    /// The values are accumulated in 'repeated_packing_buffer' without a key.
    void packRepeatedInt32(Int32 value);
    void packRepeatedUInt32(UInt32 value);
    void packRepeatedSInt32(Int32 value);
    void packRepeatedInt64(Int64 value);
    void packRepeatedUInt64(UInt64 value);
    void packRepeatedSInt64(Int64 value);
    void packRepeatedFixed32(UInt32 value);
    void packRepeatedSFixed32(Int32 value);
    void packRepeatedFloat(float value);
    void packRepeatedFixed64(UInt64 value);
    void packRepeatedSFixed64(Int64 value);
    void packRepeatedDouble(double value);

private:
    void finishCurrentMessage();
    void finishCurrentField();

    /// Wire type stored in the low three bits of a field key; enumerators are defined in the .cpp file.
    enum WireType : UInt32;

    /// Writes the key of the current field, (current_field_number << 3) | wire_type, to 'buf'.
    void writeKey(WireType wire_type, WriteBuffer & buf);

    /// NOTE: data members are kept in their original declaration order (it is also the initialization order).
    WriteBuffer & out;
    bool were_messages = false;
    WriteBufferFromOwnString message_buffer;          /// Receives keys and values written by write*().
    UInt32 current_field_number = 0;                  /// Zero means that no field is selected (see the asserts in write*()).
    size_t num_normal_values = 0;                     /// Values written to the current field with write*().
    size_t num_packed_values = 0;                     /// Values added to the current field with packRepeated*().
    WriteBufferFromOwnString repeated_packing_buffer; /// Receives key-less values added by packRepeated*().
};
}
此差异已折叠。
#pragma once
#include <Core/UUID.h>
#include <Formats/ProtobufSimpleWriter.h>
#include <boost/noncopyable.hpp>
#include <Common/PODArray.h>
#include <Common/UInt128.h>
#include <common/DayNum.h>
#include <Common/config.h>
#if USE_PROTOBUF
#include <Formats/ProtobufColumnMatcher.h>
#include <IO/WriteBufferFromString.h>
#include <boost/noncopyable.hpp>
#include <Common/PODArray.h>
namespace google
......@@ -18,20 +22,6 @@ namespace protobuf
}
}
#if USE_PROTOBUF
# define EMPTY_DEF
# define EMPTY_DEF_RET(a)
#else
# define EMPTY_DEF {}
# define EMPTY_DEF_RET(a) {return a;}
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-parameter"
# if defined(__clang__)
# pragma GCC diagnostic ignored "-Wextra-semi"
# endif
#endif
namespace DB
{
class IAggregateFunction;
......@@ -44,85 +34,234 @@ using ConstAggregateDataPtr = const char *;
class ProtobufWriter : private boost::noncopyable
{
public:
ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type) EMPTY_DEF;
~ProtobufWriter() EMPTY_DEF;
ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names);
~ProtobufWriter();
/// Returns fields of the protobuf schema sorted by their numbers.
const std::vector<const google::protobuf::FieldDescriptor *> & fieldsInWriteOrder() const;
/// Should be called at the beginning of writing a message.
void startMessage();
/// Should be called when we start writing a new message.
void newMessage() EMPTY_DEF;
/// Should be called at the end of writing a message.
void endMessage();
/// Should be called when we start writing a new field.
/// Returns false if there is no more fields in the message type.
bool nextField() EMPTY_DEF_RET(false);
/// Prepares for writing values of a field.
/// Returns true and sets 'column_index' to the corresponding column's index.
/// Returns false if there are no more fields to write in the message type (call endMessage() in this case).
bool writeField(size_t & column_index);
/// Returns the current field of the message type.
/// The value returned by this function changes after calling nextField() or newMessage().
#if USE_PROTOBUF
const google::protobuf::FieldDescriptor * currentField() const { return current_field; }
#endif
/// Writes a value. This function should be called one or multiple times after writeField().
/// Returns false if there are no more place for the values in the protobuf's field.
/// This can happen if the protobuf's field is not declared as repeated in the protobuf schema.
bool writeNumber(Int8 value) { return writeValueIfPossible(&IConverter::writeInt8, value); }
bool writeNumber(UInt8 value) { return writeValueIfPossible(&IConverter::writeUInt8, value); }
bool writeNumber(Int16 value) { return writeValueIfPossible(&IConverter::writeInt16, value); }
bool writeNumber(UInt16 value) { return writeValueIfPossible(&IConverter::writeUInt16, value); }
bool writeNumber(Int32 value) { return writeValueIfPossible(&IConverter::writeInt32, value); }
bool writeNumber(UInt32 value) { return writeValueIfPossible(&IConverter::writeUInt32, value); }
bool writeNumber(Int64 value) { return writeValueIfPossible(&IConverter::writeInt64, value); }
bool writeNumber(UInt64 value) { return writeValueIfPossible(&IConverter::writeUInt64, value); }
bool writeNumber(UInt128 value) { return writeValueIfPossible(&IConverter::writeUInt128, value); }
bool writeNumber(Float32 value) { return writeValueIfPossible(&IConverter::writeFloat32, value); }
bool writeNumber(Float64 value) { return writeValueIfPossible(&IConverter::writeFloat64, value); }
bool writeString(const StringRef & str) { return writeValueIfPossible(&IConverter::writeString, str); }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & enum_values) { current_converter->prepareEnumMapping8(enum_values); }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & enum_values) { current_converter->prepareEnumMapping16(enum_values); }
bool writeEnum(Int8 value) { return writeValueIfPossible(&IConverter::writeEnum8, value); }
bool writeEnum(Int16 value) { return writeValueIfPossible(&IConverter::writeEnum16, value); }
bool writeUUID(const UUID & uuid) { return writeValueIfPossible(&IConverter::writeUUID, uuid); }
bool writeDate(DayNum date) { return writeValueIfPossible(&IConverter::writeDate, date); }
bool writeDateTime(time_t tm) { return writeValueIfPossible(&IConverter::writeDateTime, tm); }
bool writeDecimal(Decimal32 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal32, decimal, scale); }
bool writeDecimal(Decimal64 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal64, decimal, scale); }
bool writeDecimal(const Decimal128 & decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal128, decimal, scale); }
bool writeAggregateFunction(const AggregateFunctionPtr & function, ConstAggregateDataPtr place) { return writeValueIfPossible(&IConverter::writeAggregateFunction, function, place); }
void writeNumber(Int8 value) EMPTY_DEF;
void writeNumber(UInt8 value) EMPTY_DEF;
void writeNumber(Int16 value) EMPTY_DEF;
void writeNumber(UInt16 value) EMPTY_DEF;
void writeNumber(Int32 value) EMPTY_DEF;
void writeNumber(UInt32 value) EMPTY_DEF;
void writeNumber(Int64 value) EMPTY_DEF;
void writeNumber(UInt64 value) EMPTY_DEF;
void writeNumber(UInt128 value) EMPTY_DEF;
void writeNumber(Float32 value) EMPTY_DEF;
void writeNumber(Float64 value) EMPTY_DEF;
private:
class SimpleWriter
{
public:
SimpleWriter(WriteBuffer & out_);
~SimpleWriter();
void writeString(const StringRef & value) EMPTY_DEF;
void startMessage();
void endMessage();
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & name_value_pairs) EMPTY_DEF;
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & name_value_pairs) EMPTY_DEF;
void writeEnum(Int8 value) EMPTY_DEF;
void writeEnum(Int16 value) EMPTY_DEF;
void startNestedMessage();
void endNestedMessage(UInt32 field_number, bool is_group, bool skip_if_empty);
void writeUUID(const UUID & value) EMPTY_DEF;
void writeDate(DayNum date) EMPTY_DEF;
void writeDateTime(time_t tm) EMPTY_DEF;
void writeInt(UInt32 field_number, Int64 value);
void writeUInt(UInt32 field_number, UInt64 value);
void writeSInt(UInt32 field_number, Int64 value);
template <typename T>
void writeFixed(UInt32 field_number, T value);
void writeString(UInt32 field_number, const StringRef & str);
void writeDecimal(Decimal32 decimal, UInt32 scale) EMPTY_DEF;
void writeDecimal(Decimal64 decimal, UInt32 scale) EMPTY_DEF;
void writeDecimal(const Decimal128 & decimal, UInt32 scale) EMPTY_DEF;
void startRepeatedPack();
void addIntToRepeatedPack(Int64 value);
void addUIntToRepeatedPack(UInt64 value);
void addSIntToRepeatedPack(Int64 value);
template <typename T>
void addFixedToRepeatedPack(T value);
void endRepeatedPack(UInt32 field_number);
void writeAggregateFunction(const AggregateFunctionPtr & function, ConstAggregateDataPtr place) EMPTY_DEF;
private:
/// Holds two positions, `start` and `end`.
/// NOTE(review): presumably offsets into SimpleWriter's `buffer` - confirm against the implementation file.
struct Piece
{
    size_t start;
    size_t end;

    Piece() = default;
    Piece(size_t start_, size_t end_) : start{start_}, end{end_} {}
};
private:
#if USE_PROTOBUF
/// Snapshot of two counters taken at a single moment: the number of pieces and the number of skipped bytes.
/// NOTE(review): presumably recorded by startNestedMessage() - confirm against the implementation file.
struct NestedInfo
{
    size_t num_pieces_at_start;
    size_t num_bytes_skipped_at_start;

    NestedInfo(size_t pieces_count, size_t skipped_bytes)
        : num_pieces_at_start{pieces_count}
        , num_bytes_skipped_at_start{skipped_bytes}
    {
    }
};
void enumerateFieldsInWriteOrder(const google::protobuf::Descriptor * message_type);
void createConverters();
WriteBuffer & out;
PODArray<UInt8> buffer;
std::vector<Piece> pieces;
size_t current_piece_start;
size_t num_bytes_skipped;
std::vector<NestedInfo> nested_infos;
};
void finishCurrentMessage();
void finishCurrentField();
/// Abstract interface with one pure virtual method per kind of value that can be written.
/// The public write*() wrappers of ProtobufWriter pass pointers to these methods to writeValueIfPossible(),
/// which invokes them on `current_converter`.
class IConverter
{
public:
virtual ~IConverter() = default;
/// Strings.
virtual void writeString(const StringRef &) = 0;
/// Integers, 128-bit unsigned integers and floating point numbers.
virtual void writeInt8(Int8) = 0;
virtual void writeUInt8(UInt8) = 0;
virtual void writeInt16(Int16) = 0;
virtual void writeUInt16(UInt16) = 0;
virtual void writeInt32(Int32) = 0;
virtual void writeUInt32(UInt32) = 0;
virtual void writeInt64(Int64) = 0;
virtual void writeUInt64(UInt64) = 0;
virtual void writeUInt128(const UInt128 &) = 0;
virtual void writeFloat32(Float32) = 0;
virtual void writeFloat64(Float64) = 0;
/// Enums: the prepare* methods receive the (name, value) pairs of the enum type,
/// the write* methods receive a single enum value.
virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) = 0;
virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) = 0;
virtual void writeEnum8(Int8) = 0;
virtual void writeEnum16(Int16) = 0;
/// UUIDs, dates and date-times.
virtual void writeUUID(const UUID &) = 0;
virtual void writeDate(DayNum) = 0;
virtual void writeDateTime(time_t) = 0;
/// Decimals: the second argument is the scale (see ProtobufWriter::writeDecimal).
virtual void writeDecimal32(Decimal32, UInt32) = 0;
virtual void writeDecimal64(Decimal64, UInt32) = 0;
virtual void writeDecimal128(const Decimal128 &, UInt32) = 0;
/// Aggregate function states: the function and a pointer to its data.
virtual void writeAggregateFunction(const AggregateFunctionPtr &, ConstAggregateDataPtr) = 0;
};
ProtobufSimpleWriter simple_writer;
std::vector<const google::protobuf::FieldDescriptor *> fields_in_write_order;
size_t current_field_index = -1;
const google::protobuf::FieldDescriptor * current_field = nullptr;
class ConverterBaseImpl;
template <bool skip_null_value>
class ConverterToString;
template <int field_type_id, typename ToType, bool skip_null_value, bool pack_repeated>
class ConverterToNumber;
template <bool skip_null_value, bool pack_repeated>
class ConverterToBool;
template <bool skip_null_value, bool pack_repeated>
class ConverterToEnum;
class Converter;
class ToStringConverter;
template <typename T>
class ToNumberConverter;
class ToBoolConverter;
class ToEnumConverter;
/// Traits type passed as the template argument of ProtobufColumnMatcher::Message / ProtobufColumnMatcher::Field
/// (see the `Message` and `Field` aliases). It bundles the extra data kept per field and per message.
struct ColumnMatcherTraits
{
/// Extra data kept for every field.
struct FieldData
{
/// Converter used to write values of this field.
std::unique_ptr<IConverter> converter;
bool is_required;
/// Checked by writeValueIfPossible(): when false, a second value for the field is rejected.
bool is_repeatable;
bool should_pack_repeated;
/// NOTE(review): presumably the enclosing message that can be repeated to hold extra values - confirm.
ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message;
};
/// Extra data kept for every message.
struct MessageData
{
UInt32 parent_field_number;
bool is_group;
bool is_required;
/// NOTE(review): same role as FieldData::repeatable_container_message, presumably - confirm.
ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message;
bool need_repeat;
};
};
using Message = ProtobufColumnMatcher::Message<ColumnMatcherTraits>;
using Field = ProtobufColumnMatcher::Field<ColumnMatcherTraits>;
std::vector<std::unique_ptr<Converter>> converters;
Converter * current_converter = nullptr;
void setTraitsDataAfterMatchingColumns(Message * message);
#endif
template <int field_type_id>
std::unique_ptr<IConverter> createConverter(const google::protobuf::FieldDescriptor * field);
template <typename... Params>
using WriteValueFunctionPtr = void (IConverter::*)(Params...);
/// Writes one value into the current field by invoking `func` on `current_converter`.
/// Returns true if the value was passed to the converter. Returns false, after calling
/// setNestedMessageNeedsRepeat(), when the field already holds a value and is not repeatable.
template <typename... Params, typename... Args>
bool writeValueIfPossible(WriteValueFunctionPtr<Params...> func, Args &&... args)
{
    /// `current_field` is only inspected once at least one value has been written.
    const bool has_room = (num_values == 0) || current_field->data.is_repeatable;
    if (has_room)
    {
        (current_converter->*func)(std::forward<Args>(args)...);
        ++num_values;
        return true;
    }

    setNestedMessageNeedsRepeat();
    return false;
}
void setNestedMessageNeedsRepeat();
void endWritingField();
SimpleWriter simple_writer;
std::unique_ptr<Message> root_message;
Message * current_message;
size_t current_field_index = 0;
const Field * current_field = nullptr;
IConverter * current_converter = nullptr;
size_t num_values = 0;
};
}
#if !USE_PROTOBUF
# undef EMPTY_DEF
# undef EMPTY_DEF_RET
# pragma GCC diagnostic pop
#else
namespace DB
{
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
using ConstAggregateDataPtr = const char *;
/// Do-nothing replacement for ProtobufWriter: every write method ignores its arguments and returns false,
/// and the prepareEnumMapping() overloads are no-ops.
/// NOTE(review): appears to be the branch compiled when USE_PROTOBUF is off - confirm against the surrounding #if.
/// Parameter names are commented out so that the stub does not trigger -Wunused-parameter.
class ProtobufWriter
{
public:
    bool writeNumber(Int8 /* value */) { return false; }
    bool writeNumber(UInt8 /* value */) { return false; }
    bool writeNumber(Int16 /* value */) { return false; }
    bool writeNumber(UInt16 /* value */) { return false; }
    bool writeNumber(Int32 /* value */) { return false; }
    bool writeNumber(UInt32 /* value */) { return false; }
    bool writeNumber(Int64 /* value */) { return false; }
    bool writeNumber(UInt64 /* value */) { return false; }
    bool writeNumber(UInt128 /* value */) { return false; }
    bool writeNumber(Float32 /* value */) { return false; }
    bool writeNumber(Float64 /* value */) { return false; }
    bool writeString(const StringRef & /* value */) { return false; }
    void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & /* name_value_pairs */) {}
    void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & /* name_value_pairs */) {}
    bool writeEnum(Int8 /* value */) { return false; }
    bool writeEnum(Int16 /* value */) { return false; }
    bool writeUUID(const UUID & /* value */) { return false; }
    bool writeDate(DayNum /* date */) { return false; }
    bool writeDateTime(time_t /* tm */) { return false; }
    bool writeDecimal(Decimal32 /* decimal */, UInt32 /* scale */) { return false; }
    bool writeDecimal(Decimal64 /* decimal */, UInt32 /* scale */) { return false; }
    bool writeDecimal(const Decimal128 & /* decimal */, UInt32 /* scale */) { return false; }
    bool writeAggregateFunction(const AggregateFunctionPtr & /* function */, ConstAggregateDataPtr /* place */) { return false; }
};
}
#endif
#include <Core/Defines.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
......@@ -162,8 +164,14 @@ String TabSeparatedRowInputStream::getDiagnosticInfo()
}
bool TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
/** gcc-7 generates wrong code with optimization level greater than 1.
* See tests: dbms/src/IO/tests/write_int.cpp
* and dbms/tests/queries/0_stateless/00898_parsing_bad_diagnostic_message.sh
* This is a compiler bug. The bug is not present in gcc-8 and clang-8.
* In any case, this function does not need a high optimization level.
*/
bool OPTIMIZE(1) TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(
MutableColumns & columns, WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
{
size_t size = data_types.size();
for (size_t i = 0; i < size; ++i)
......
......@@ -20,8 +20,7 @@ target_link_libraries(clickhouse_functions
${METROHASH_LIBRARIES}
murmurhash
${BASE64_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${LZ4_LIBRARY})
${OPENSSL_CRYPTO_LIBRARY})
target_include_directories (clickhouse_functions SYSTEM BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR} ${METROHASH_INCLUDE_DIR})
......@@ -56,3 +55,8 @@ endif ()
# Optional base64 support: when USE_BASE64 is set, add the library's headers
# as private SYSTEM include directories of clickhouse_functions.
if(USE_BASE64)
target_include_directories(clickhouse_functions SYSTEM PRIVATE ${BASE64_INCLUDE_DIR})
endif()
# Optional xxHash support: when USE_XXHASH is set, link ${XXHASH_LIBRARY} privately
# and add ${XXHASH_INCLUDE_DIR} as a private SYSTEM include directory.
if (USE_XXHASH)
target_link_libraries(clickhouse_functions PRIVATE ${XXHASH_LIBRARY})
target_include_directories(clickhouse_functions SYSTEM PRIVATE ${XXHASH_INCLUDE_DIR})
endif()
......@@ -7,7 +7,7 @@
#include <Functions/FunctionHelpers.h>
#include <Functions/GatherUtils/Algorithms.h>
#include <IO/WriteHelpers.h>
#include <libbase64.h>
#include <libbase64.h> // Y_IGNORE
namespace DB
......
......@@ -14,7 +14,7 @@
#include <Common/config.h>
#if USE_XXHASH
#include <xxhash.h>
#include <xxhash.h> // Y_IGNORE
#endif
#include <Poco/ByteOrder.h>
......
#include <Functions/FunctionsStringSimilarity.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsHashing.h>
#include <Common/HashTable/ClearableHashMap.h>
#include <Common/HashTable/Hash.h>
#include <Common/UTF8Helpers.h>
#include <Core/Defines.h>
#include <algorithm>
#include <cstring>
#include <limits>
#include <memory>
#ifdef __SSE4_2__
#include <nmmintrin.h>
#endif
namespace DB
{
/** Distance function implementation.
* First we extract all the trigrams of the needle and count them in a map
* indexed by a 16-bit hash of each trigram.
* Then we extract all the trigrams of the haystack and calculate
* the trigram distance on the fly by incrementing and decrementing the counters in that map.
* Finally we restore the map to the state it had after the needle was processed,
* so that it can be reused for the next haystack. If the haystack is big (more than 2**15 bytes),
* the strings are considered not similar at all and we return 1.
*/
/// Trigram-distance kernel used as the `Impl` parameter of FunctionsStringSimilarity.
/// All members are static; the only mutable state is the TrigramStats table supplied by the caller.
struct TrigramDistanceImpl
{
using ResultType = Float32;
/// Raw bytes of one UTF-8 sequence packed into an integer (see readCodePoint); the bytes are not decoded.
using CodePoint = UInt32;
/// Number of counters in the trigram table: one per possible value of the 16-bit trigram hash.
static constexpr size_t map_size = 1u << 16;
/// A haystack bigger than this (in bytes) is not analysed: constant_constant and vector_constant
/// report a distance of 1 for it.
static constexpr size_t max_string_size = 1u << 15;
/** Table of trigram counters indexed by trigram hash (map_size * sizeof(UInt16) = 128 KiB).
* According to the original author it fits mostly in L2 cache all the time.
* UInt16 is used so that additions and subtractions wrap around instead of being signed-overflow UB,
* but the values are interpreted as signed integers (see the Int16 cast in calculateHaystackStatsAndMetric).
*/
using TrigramStats = UInt16[map_size];
/// Hashes three code points into 16 bits: `one` and `two` are packed into a 64-bit value,
/// which is combined with `three` by a CRC32-based hash; the low 16 bits of the result are returned.
static ALWAYS_INLINE UInt16 trigramHash(CodePoint one, CodePoint two, CodePoint three)
{
UInt64 combined = (static_cast<UInt64>(one) << 32) | two;
#ifdef __SSE4_2__
return _mm_crc32_u64(three, combined) & 0xFFFFu;
#else
return (intHashCRC32(combined) ^ intHashCRC32(three)) & 0xFFFFu;
#endif
}
/// Reads one UTF-8 sequence starting at `pos` and advances `pos` past it.
/// The sequence length comes from UTF8::seqLength and is clamped to the bytes left before `end`.
/// At most four bytes are copied into the result even if the sequence is longer.
/// Callers in this struct only call it while `pos != end`.
static ALWAYS_INLINE CodePoint readCodePoint(const char *& pos, const char * end) noexcept
{
size_t length = UTF8::seqLength(*pos);
if (pos + length > end)
length = end - pos;
CodePoint res;
/// A switch with constant-size memcpy calls is used because, per the original author,
/// it is faster than a single memcpy with a variable size (compiler optimizations with moving bytes).
switch (length)
{
case 1:
res = 0;
memcpy(&res, pos, 1);
break;
case 2:
res = 0;
memcpy(&res, pos, 2);
break;
case 3:
res = 0;
memcpy(&res, pos, 3);
break;
default:
/// All four bytes of `res` are overwritten, so no zeroing is needed here.
memcpy(&res, pos, 4);
}
pos += length;
return res;
}
/// Increments the counter of every trigram of the string [data, data + size) in `trigram_stats`.
/// Returns the number of trigrams, i.e. max(0, number of code points - 2).
static inline size_t calculateNeedleStats(const char * data, const size_t size, TrigramStats & trigram_stats) noexcept
{
size_t len = 0;
const char * start = data;
const char * end = data + size;
/// Sliding window of the last three code points.
CodePoint cp1 = 0;
CodePoint cp2 = 0;
CodePoint cp3 = 0;
while (start != end)
{
cp1 = cp2;
cp2 = cp3;
cp3 = readCodePoint(start, end);
++len;
/// The window is not full until three code points have been read.
if (len < 3)
continue;
++trigram_stats[trigramHash(cp1, cp2, cp3)];
}
return std::max(static_cast<Int64>(0), static_cast<Int64>(len) - 2);
}
/// Walks the trigrams of the string [data, data + size) and updates `distance`:
/// a trigram whose counter is still positive decreases it, any other trigram increases it.
/// The counters in `trigram_stats` are decremented during the walk and restored before returning.
/// Returns the number of trigrams in the string.
static inline UInt64 calculateHaystackStatsAndMetric(const char * data, const size_t size, TrigramStats & trigram_stats, size_t & distance)
{
size_t len = 0;
size_t trigram_cnt = 0;
const char * start = data;
const char * end = data + size;
CodePoint cp1 = 0;
CodePoint cp2 = 0;
CodePoint cp3 = 0;
/// Allocation trick, most strings are relatively small: hashes of the visited trigrams are kept
/// in a stack buffer when `size` fits into it, otherwise in a heap buffer of `size` elements.
static constexpr size_t small_buffer_size = 256;
std::unique_ptr<UInt16[]> big_buffer;
UInt16 small_buffer[small_buffer_size];
UInt16 * trigram_storage = small_buffer;
if (size > small_buffer_size)
{
trigram_storage = new UInt16[size];
big_buffer.reset(trigram_storage);
}
while (start != end)
{
cp1 = cp2;
cp2 = cp3;
cp3 = readCodePoint(start, end);
++len;
if (len < 3)
continue;
UInt16 hash = trigramHash(cp1, cp2, cp3);
/// The counter is read as a signed value: it may already have wrapped below zero.
if (static_cast<Int16>(trigram_stats[hash]) > 0)
--distance;
else
++distance;
/// Remember the hash so that the decrement below can be undone afterwards.
trigram_storage[trigram_cnt++] = hash;
--trigram_stats[hash];
}
/// Return the hash map to the state it had on entry.
for (size_t i = 0; i < trigram_cnt; ++i)
++trigram_stats[trigram_storage[i]];
return trigram_cnt;
}
/// Computes the distance between two constant strings and stores it in `res`:
/// distance / max(number of trigrams in both strings, 1), or 1 if `data` is bigger than max_string_size.
static void constant_constant(const std::string & data, const std::string & needle, Float32 & res)
{
TrigramStats common_stats;
memset(common_stats, 0, sizeof(common_stats));
size_t second_size = calculateNeedleStats(needle.data(), needle.size(), common_stats);
/// The distance starts at the number of needle trigrams.
size_t distance = second_size;
if (data.size() <= max_string_size)
{
size_t first_size = calculateHaystackStatsAndMetric(data.data(), data.size(), common_stats, distance);
res = distance * 1.f / std::max(first_size + second_size, size_t(1));
}
else
{
res = 1.f;
}
}
/// Computes the distance between every string of a column (`data` + `offsets`) and a constant `needle`.
/// `res` must already have one element per entry of `offsets`. The needle statistics are computed once
/// and reused for all rows.
static void vector_constant(
const ColumnString::Chars & data, const ColumnString::Offsets & offsets, const std::string & needle, PaddedPODArray<Float32> & res)
{
TrigramStats common_stats;
memset(common_stats, 0, sizeof(common_stats));
const size_t needle_stats_size = calculateNeedleStats(needle.data(), needle.size(), common_stats);
size_t distance = needle_stats_size;
size_t prev_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
const UInt8 * haystack = &data[prev_offset];
/// NOTE(review): the extra -1 presumably excludes a terminating zero byte stored after each string - confirm.
const size_t haystack_size = offsets[i] - prev_offset - 1;
if (haystack_size <= max_string_size)
{
size_t haystack_stats_size
= calculateHaystackStatsAndMetric(reinterpret_cast<const char *>(haystack), haystack_size, common_stats, distance);
res[i] = distance * 1.f / std::max(haystack_stats_size + needle_stats_size, size_t(1));
}
else
{
res[i] = 1.f;
}
/// Reset the running distance for the next row.
distance = needle_stats_size;
prev_offset = offsets[i];
}
}
};
/// Name tag passed as the `Name` parameter of FunctionsStringSimilarity; `name` is the function's name.
struct TrigramDistanceName
{
static constexpr auto name = "trigramDistance";
};
using FunctionTrigramsDistance = FunctionsStringSimilarity<TrigramDistanceImpl, TrigramDistanceName>;
/// Registers the string similarity functions defined in this file (currently FunctionTrigramsDistance) in `factory`.
void registerFunctionsStringSimilarity(FunctionFactory & factory)
{
factory.registerFunction<FunctionTrigramsDistance>();
}
}
#pragma once
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
namespace DB
{
/** Calculate similarity metrics:
*
* trigramDistance(haystack, needle) --- calculate so called 3-gram distance between haystack and needle.
* Returns float number from 0 to 1 - the closer to zero, the more strings are similar to each other.
*/
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int TOO_LARGE_STRING_SIZE;
}
template <typename Impl, typename Name>
class FunctionsStringSimilarity : public IFunction
{
public:
static constexpr auto name = Name::name;
static FunctionPtr create(const Context &) { return std::make_shared<FunctionsStringSimilarity>(); }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isString(arguments[1]))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeNumber<typename Impl::ResultType>>();
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
{
using ResultType = typename Impl::ResultType;
const ColumnPtr & column_haystack = block.getByPosition(arguments[0]).column;
const ColumnPtr & column_needle = block.getByPosition(arguments[1]).column;
const ColumnConst * col_haystack_const = typeid_cast<const ColumnConst *>(&*column_haystack);
const ColumnConst * col_needle_const = typeid_cast<const ColumnConst *>(&*column_needle);
if (!col_needle_const)
throw Exception("Second argument of function " + getName() + " must be constant string.", ErrorCodes::ILLEGAL_COLUMN);
if (col_haystack_const)
{
ResultType res{};
const String & needle = col_needle_const->getValue<String>();
if (needle.size() > Impl::max_string_size)
{
throw Exception(
"String size of needle is too big for function " + getName() + ". Should be at most " + std::to_string(Impl::max_string_size),
ErrorCodes::TOO_LARGE_STRING_SIZE);
}
Impl::constant_constant(col_haystack_const->getValue<String>(), needle, res);
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(col_haystack_const->size(), toField(res));
return;
}
auto col_res = ColumnVector<ResultType>::create();
typename ColumnVector<ResultType>::Container & vec_res = col_res->getData();
vec_res.resize(column_haystack->size());
const ColumnString * col_haystack_vector = checkAndGetColumn<ColumnString>(&*column_haystack);
if (col_haystack_vector)
{
const String & needle = col_needle_const->getValue<String>();
if (needle.size() > Impl::max_string_size)
{
throw Exception(
"String size of needle is too big for function " + getName() + ". Should be at most " + std::to_string(Impl::max_string_size),
ErrorCodes::TOO_LARGE_STRING_SIZE);
}
Impl::vector_constant(
col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), needle, vec_res);
}
else
{
throw Exception(
"Illegal columns " + block.getByPosition(arguments[0]).column->getName() + " and "
+ block.getByPosition(arguments[1]).column->getName() + " of arguments of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
block.getByPosition(result).column = std::move(col_res);
}
};
}
......@@ -5,7 +5,6 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
......
......@@ -6,7 +6,6 @@
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
......
#include "FunctionFactory.h"
#include "arrayEnumerateRanked.h"
namespace DB
{
/// The function `arrayEnumerateDenseRanked`. The implementation comes from the CRTP base
/// FunctionArrayEnumerateRankedExtended; this class only supplies the name and re-exports the base's create().
class FunctionArrayEnumerateDenseRanked : public FunctionArrayEnumerateRankedExtended<FunctionArrayEnumerateDenseRanked>
{
using Base = FunctionArrayEnumerateRankedExtended<FunctionArrayEnumerateDenseRanked>;
public:
static constexpr auto name = "arrayEnumerateDenseRanked";
using Base::create;
};
/// Registers FunctionArrayEnumerateDenseRanked in `factory`.
void registerFunctionArrayEnumerateDenseRanked(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayEnumerateDenseRanked>();
}
}
#include "arrayEnumerateRanked.h"
namespace DB
{
/** Extract depth information from the arguments of arrayEnumerateUniqRanked / arrayEnumerateDenseRanked.
  *
  * Accepted layout: an optional leading constant (clear_depth), then arrays, each of which may be
  * followed by a constant with the depth to use for that array. An array without an explicit depth
  * gets the nesting depth of its type.
  *
  * Arguments whose column is not set are only inspected by type. A non-array column that is not
  * a constant is skipped.
  *
  * Throws BAD_ARGUMENTS if a depth is 0, if a depth does not follow an array, if there are no arrays,
  * or if clear_depth is larger than the maximum depth.
  */
ArraysDepths getArraysDepths(const ColumnsWithTypeAndName & arguments)
{
    DepthType clear_depth = 1;
    DepthTypes depths;

    /// Number of array arguments seen so far and the type-derived depth of the latest of them.
    /// The latter is used if no explicit depth follows the array.
    size_t arrays_seen = 0;
    DepthType implicit_depth = 0;

    for (size_t arg_num = 0; arg_num < arguments.size(); ++arg_num)
    {
        const auto & argument = arguments[arg_num];

        if (isArray(argument.type))
        {
            /// The previous array was not followed by an explicit depth: use the depth of its type.
            if (implicit_depth && depths.size() < arrays_seen)
            {
                depths.emplace_back(implicit_depth);
                implicit_depth = 0;
            }

            /// Count nesting levels by unwrapping Array types one by one.
            DepthType type_depth = 0;
            auto nested_type = argument.type;
            while (const auto * array_type = typeid_cast<const DataTypeArray *>(nested_type.get()))
            {
                nested_type = array_type->getNestedType();
                ++type_depth;
            }

            implicit_depth = type_depth;
            ++arrays_seen;
        }

        const auto & column = argument.column;
        if (!column)
            continue;

        /// Look through a constant wrapper to find out whether this is an array column.
        const IColumn * plain_column = column.get();
        if (const auto * const_column = typeid_cast<const ColumnConst *>(plain_column))
            plain_column = const_column->getDataColumnPtr().get();

        if (typeid_cast<const ColumnArray *>(plain_column))
            continue;

        /// Not an array. Only a constant is interpreted as a depth.
        if (!column->isColumnConst())
            continue;

        auto value = column->getUInt(0);
        if (!value)
            throw Exception(
                "Arguments for function arrayEnumerateUniqRanked/arrayEnumerateDenseRanked incorrect: depth ("
                    + std::to_string(value) + ") cant be 0.",
                ErrorCodes::BAD_ARGUMENTS);

        /// A constant in the very first position is the clear depth, not the depth of an array.
        if (arg_num == 0)
        {
            clear_depth = value;
            continue;
        }

        /// Every explicit depth must belong to an array that does not have a depth yet.
        if (depths.size() >= arrays_seen)
        {
            throw Exception(
                "Arguments for function arrayEnumerateUniqRanked/arrayEnumerateDenseRanked incorrect: depth ("
                    + std::to_string(value) + ") for missing array.",
                ErrorCodes::BAD_ARGUMENTS);
        }

        depths.emplace_back(value);
    }

    /// The last array may have been left without an explicit depth as well.
    if (depths.size() < arrays_seen)
        depths.emplace_back(implicit_depth);

    DepthType max_array_depth = 0;
    for (const DepthType depth : depths)
        max_array_depth = max_array_depth < depth ? depth : max_array_depth;

    if (depths.empty())
        throw Exception(
            "Arguments for function arrayEnumerateUniqRanked/arrayEnumerateDenseRanked incorrect: At least one array should be passed.",
            ErrorCodes::BAD_ARGUMENTS);

    if (clear_depth > max_array_depth)
        throw Exception(
            "Arguments for function arrayEnumerateUniqRanked/arrayEnumerateDenseRanked incorrect: clear_depth ("
                + std::to_string(clear_depth) + ") cant be larger than max_array_depth (" + std::to_string(max_array_depth) + ").",
            ErrorCodes::BAD_ARGUMENTS);

    return {clear_depth, depths, max_array_depth};
}
}
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/AggregationCommon.h>
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/ClearableHashMap.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
}
class FunctionArrayEnumerateUniqRanked;
class FunctionArrayEnumerateDenseRanked;
using DepthType = uint32_t;
using DepthTypes = std::vector<DepthType>;
/// Depth parameters of one call, as computed by getArraysDepths().
struct ArraysDepths
{
/// Nesting level (1 = outermost) at whose array boundaries the enumeration state is reset. Defaults to 1.
DepthType clear_depth;
/// One entry per array argument: the depth down to which that array is looked at.
DepthTypes depths;
/// Maximum of `depths`; also the nesting depth of the result type.
DepthType max_array_depth;
};
/// Return depth info about passed arrays
ArraysDepths getArraysDepths(const ColumnsWithTypeAndName & arguments);
/** Common implementation of arrayEnumerateUniqRanked and arrayEnumerateDenseRanked (CRTP).
  * Derived supplies the function name; the numbering scheme is chosen by the type of Derived.
  */
template <typename Derived>
class FunctionArrayEnumerateRankedExtended : public IFunction
{
public:
    static FunctionPtr create(const Context & /* context */) { return std::make_shared<Derived>(); }

    String getName() const override { return Derived::name; }

    /// The argument list is variable; its layout is checked in getArraysDepths().
    bool isVariadic() const override { return true; }
    size_t getNumberOfArguments() const override { return 0; }

    /// The result is UInt32 wrapped into as many Array levels as the deepest requested depth.
    DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
    {
        if (arguments.empty())
            throw Exception(
                "Number of arguments for function " + getName() + " doesn't match: passed " + std::to_string(arguments.size())
                    + ", should be at least 1.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        const ArraysDepths arrays_depths = getArraysDepths(arguments);

        DataTypePtr result_type = std::make_shared<DataTypeUInt32>();
        for (DepthType levels_left = arrays_depths.max_array_depth; levels_left > 0; --levels_left)
            result_type = std::make_shared<DataTypeArray>(result_type);

        return result_type;
    }

    void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override;

private:
    /// Initial size of the hash table is 2^9 = 512 elements. NOTE: This is just a guess.
    static constexpr size_t INITIAL_SIZE_DEGREE = 9;

    /// Fills res_values with the numbers for all elements at the deepest level.
    void executeMethodImpl(
        const std::vector<const ColumnArray::Offsets *> & offsets_by_depth,
        const ColumnRawPtrs & columns,
        const ArraysDepths & arrays_depths,
        ColumnUInt32::Container & res_values);
};
/// Combine one value from every key column into a single 128-bit SipHash digest.
/// For the column number n the value is taken at position indexes[n].
static inline UInt128 ALWAYS_INLINE hash128depths(const std::vector<size_t> & indexes, const ColumnRawPtrs & key_columns)
{
    SipHash hasher;

    size_t column_num = 0;
    for (const auto * key_column : key_columns)
    {
        key_column->updateHashWithValue(indexes[column_num], hasher);
        ++column_num;
    }

    UInt128 digest;
    hasher.get128(digest.low, digest.high);
    return digest;
}
/** Collects the offsets of every nesting level and the innermost data column of every array argument,
  * checks that all arrays have the same shape on the levels they share, lets executeMethodImpl
  * compute the numbers and wraps them back into nested arrays using the collected offsets.
  *
  * Throws SIZES_OF_ARRAYS_DOESNT_MATCH if the shapes differ or a requested depth exceeds
  * the actual depth of an array, and NUMBER_OF_ARGUMENTS_DOESNT_MATCH if no array was passed.
  */
template <typename Derived>
void FunctionArrayEnumerateRankedExtended<Derived>::executeImpl(
    Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/)
{
    size_t num_arguments = arguments.size();
    ColumnRawPtrs data_columns;

    /// Keeps alive the full columns materialized from constant arrays.
    Columns array_holders;

    ColumnsWithTypeAndName args;
    for (size_t i = 0; i < arguments.size(); ++i)
        args.emplace_back(block.getByPosition(arguments[i]));

    const auto & arrays_depths = getArraysDepths(args);

    /// Returns the column as ColumnArray (materializing a constant array), or nullptr if it is not an array.
    auto get_array_column = [&](const auto & column) -> const DB::ColumnArray * {
        const ColumnArray * array = checkAndGetColumn<ColumnArray>(column);
        if (!array)
        {
            const ColumnConst * const_array = checkAndGetColumnConst<ColumnArray>(column);
            if (!const_array)
                return nullptr;
            array_holders.emplace_back(const_array->convertToFullColumn());
            array = checkAndGetColumn<ColumnArray>(array_holders.back().get());
        }
        return array;
    };

    /// Offsets of each nesting level (index 0 is the outermost one), taken from the first array that reaches it.
    std::vector<const ColumnArray::Offsets *> offsets_by_depth;
    std::vector<ColumnPtr> offsetsptr_by_depth;

    size_t array_num = 0;
    for (size_t i = 0; i < num_arguments; ++i)
    {
        const auto * array = get_array_column(block.getByPosition(arguments[i]).column.get());
        if (!array)
            continue;

        if (array_num == 0) // TODO check with prev
        {
            offsets_by_depth.emplace_back(&array->getOffsets());
            offsetsptr_by_depth.emplace_back(array->getOffsetsPtr());
        }
        else
        {
            if (*offsets_by_depth[0] != array->getOffsets())
            {
                throw Exception(
                    "Lengths and depths of all arrays passed to " + getName() + " must be equal.",
                    ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
            }
        }

        /// Descend into the nested arrays down to the depth requested for this argument.
        DepthType col_depth = 1;
        for (; col_depth < arrays_depths.depths[array_num]; ++col_depth)
        {
            auto sub_array = get_array_column(&array->getData());
            if (!sub_array)
                break;
            array = sub_array;

            if (offsets_by_depth.size() <= col_depth)
            {
                offsets_by_depth.emplace_back(&array->getOffsets());
                offsetsptr_by_depth.emplace_back(array->getOffsetsPtr());
            }
            else
            {
                if (*offsets_by_depth[col_depth] != array->getOffsets())
                {
                    throw Exception(
                        "Lengths and depths of all arrays passed to " + getName() + " must be equal.",
                        ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
                }
            }
        }

        /// The loop stopped early: the column is not nested as deeply as requested.
        if (col_depth < arrays_depths.depths[array_num])
        {
            throw Exception(
                getName() + ": Passed array number " + std::to_string(array_num) + " depth ("
                    + std::to_string(arrays_depths.depths[array_num]) + ") more than actual array depth (" + std::to_string(col_depth)
                    + ").",
                ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
        }

        auto * array_data = &array->getData();
        data_columns.emplace_back(array_data);
        ++array_num;
    }

    if (offsets_by_depth.empty())
        throw Exception("No arrays passed to function " + getName(), ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    auto res_nested = ColumnUInt32::create();

    ColumnUInt32::Container & res_values = res_nested->getData();

    /// The number of values at the deepest level is the last offset of that level.
    /// There is no last offset when the block has no rows, so do not call back() in that case.
    const auto & deepest_offsets = *offsets_by_depth[arrays_depths.max_array_depth - 1];
    res_values.resize(deepest_offsets.empty() ? 0 : deepest_offsets.back());

    executeMethodImpl(offsets_by_depth, data_columns, arrays_depths, res_values);

    /// Wrap the flat result into arrays, from the innermost level to the outermost one.
    ColumnPtr result_nested_array = std::move(res_nested);
    for (int depth = arrays_depths.max_array_depth - 1; depth >= 0; --depth)
        result_nested_array = ColumnArray::create(std::move(result_nested_array), offsetsptr_by_depth[depth]);

    block.getByPosition(result).column = result_nested_array;
}
/*
(2, [[1,2,3],[2,2,1],[3]], 2, [4,5,6], 1)
; 1 2 3; 2 2 1; 3 4 5 6
; 4 4 4; 5 5 5; 6 <-
(1, [[1,2,3],[2,2,1],[3]], 1, [4,5,6], 1)
;[1,2,3] [2,2,1] [3] 4 5 6
;4 5 6 <-
(1, [[1,2,3],[2,2,1],[3]], 1, [4,5,6], 0)
;[1,2,3] [2,2,1] [3] 4 5 6
;[4,5,6] [4,5,6] [4,5,6] <-
. - get data
; - clean index
(1, [[[1,2,3],[1,2,3],[1,2,3]],[[1,2,3],[1,2,3],[1,2,3]],[[1,2]]], 1)
;. . .
(1, [[[1,2,3],[1,2,3],[1,2,3]],[[1,2,3],[1,2,3],[1,2,3]],[[1,2]]], 2)
; . . . . . . .
(2, [[[1,2,3],[1,2,3],[1,2,3]],[[1,2,3],[1,2,3],[1,2,3]],[[1,2]]], 2)
; . . . ; . . . ; .
(1, [[[1,2,3],[1,2,3],[1,2,3]],[[1,2,3],[1,2,3],[1,2,3]],[[1,2]]], 3)
; . . . . . . . . . . . . . . . . . . . .
(2, [[[1,2,3],[1,2,3],[1,2,3]],[[1,2,3],[1,2,3],[1,2,3]],[[1,2]]], 3)
; . . . . . . . . . ; . . . . . . . . . ; . .
(3, [[[1,2,3],[1,2,3],[1,2,3]],[[1,2,3],[1,2,3],[1,2,3]],[[1,2]]], 3)
; . . . ; . . . ; . . . ; . . . ; . . . ; . . . ; . .
*/
/** Computes the result numbers for all elements of the deepest level.
  *
  * offsets_by_depth - offsets of every nesting level, index 0 is the outermost level;
  * columns          - one data column per array argument, taken at the depth given in arrays_depths.depths;
  * arrays_depths    - depths as returned by getArraysDepths();
  * res_values       - output; res_values[j] is written for every element j of the deepest level,
  *                    the container is expected to be already sized by the caller.
  *
  * For FunctionArrayEnumerateUniqRanked the number is how many times the same key has been seen
  * since the last reset; otherwise (dense) it is the rank given to the key when it was first seen.
  */
template <typename Derived>
void FunctionArrayEnumerateRankedExtended<Derived>::executeMethodImpl(
const std::vector<const ColumnArray::Offsets *> & offsets_by_depth,
const ColumnRawPtrs & columns,
const ArraysDepths & arrays_depths,
ColumnUInt32::Container & res_values)
{
/// Iteration goes over the arrays of the deepest level.
const size_t current_offset_depth = arrays_depths.max_array_depth;
const auto & offsets = *offsets_by_depth[current_offset_depth - 1];
ColumnArray::Offset prev_off = 0;
/// Keys are 128-bit hashes of the values, see hash128depths.
using Map = ClearableHashMap<
UInt128,
UInt32,
UInt128TrivialHash,
HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
Map indices;
/// For every level: the position of the current element of that level ...
std::vector<size_t> indexes_by_depth(arrays_depths.max_array_depth);
/// ... and the number of the offset (i.e. of the array) that position is compared against.
std::vector<size_t> current_offset_n_by_depth(arrays_depths.max_array_depth);
/// Last rank handed out; used only by the dense variant.
UInt32 rank = 0;
std::vector<size_t> columns_indexes(columns.size());
for (size_t off : offsets)
{
bool want_clear = false;
for (size_t j = prev_off; j < off; ++j)
{
/// Every column is read at the current position of its own depth.
for (size_t col_n = 0; col_n < columns.size(); ++col_n)
columns_indexes[col_n] = indexes_by_depth[arrays_depths.depths[col_n] - 1];
auto hash = hash128depths(columns_indexes, columns);
if constexpr (std::is_same_v<Derived, FunctionArrayEnumerateUniqRanked>)
{
auto idx = ++indices[hash];
res_values[j] = idx;
}
else // FunctionArrayEnumerateDenseRanked
{
auto idx = indices[hash];
if (!idx)
{
idx = ++rank;
indices[hash] = idx;
}
res_values[j] = idx;
}
// Debug: DUMP(off, prev_off, j, columns_indexes, res_values[j], columns);
/// Advance the positions starting from the deepest level. When the position reaches the end
/// of the current array of a level, move on to the next array of that level and carry over
/// to the level above; otherwise stop.
/// NOTE(review): an empty array produces no iteration here, so its offset entry is never passed;
/// it looks like array ends after an empty array are then not detected - verify with empty arrays.
for (int depth = current_offset_depth - 1; depth >= 0; --depth)
{
++indexes_by_depth[depth];
if (indexes_by_depth[depth] == (*offsets_by_depth[depth])[current_offset_n_by_depth[depth]])
{
/// An array of the level clear_depth has just ended: request a reset of the state.
if (static_cast<int>(arrays_depths.clear_depth) == depth + 1)
want_clear = true;
++current_offset_n_by_depth[depth];
}
else
{
break;
}
}
}
/// The reset is applied only after the whole deepest-level array has been processed.
if (want_clear)
{
want_clear = false;
indices.clear();
rank = 0;
}
prev_off = off;
}
}
}
#include "Functions/FunctionFactory.h"
#include "arrayEnumerateRanked.h"
namespace DB
{

/// SQL function arrayEnumerateUniqRanked.
/// All of the work is done by FunctionArrayEnumerateRankedExtended; this class only selects
/// the "uniq" numbering (how many times the value has been seen so far) and provides the name.
class FunctionArrayEnumerateUniqRanked : public FunctionArrayEnumerateRankedExtended<FunctionArrayEnumerateUniqRanked>
{
public:
    static constexpr auto name = "arrayEnumerateUniqRanked";

    /// Re-export the factory method of the CRTP base.
    using FunctionArrayEnumerateRankedExtended<FunctionArrayEnumerateUniqRanked>::create;
};


/// Makes arrayEnumerateUniqRanked available through the function factory.
void registerFunctionArrayEnumerateUniqRanked(FunctionFactory & factory)
{
    factory.registerFunction<FunctionArrayEnumerateUniqRanked>();
}

}
......@@ -33,6 +33,7 @@ void registerFunctionsRound(FunctionFactory &);
void registerFunctionsString(FunctionFactory &);
void registerFunctionsStringArray(FunctionFactory &);
void registerFunctionsStringSearch(FunctionFactory &);
void registerFunctionsStringSimilarity(FunctionFactory &);
void registerFunctionsURL(FunctionFactory &);
void registerFunctionsVisitParam(FunctionFactory &);
void registerFunctionsMath(FunctionFactory &);
......@@ -72,6 +73,7 @@ void registerFunctions()
registerFunctionsString(factory);
registerFunctionsStringArray(factory);
registerFunctionsStringSearch(factory);
registerFunctionsStringSimilarity(factory);
registerFunctionsURL(factory);
registerFunctionsVisitParam(factory);
registerFunctionsMath(factory);
......
......@@ -26,6 +26,8 @@ void registerFunctionEmptyArrayToSingle(FunctionFactory &);
void registerFunctionArrayEnumerate(FunctionFactory &);
void registerFunctionArrayEnumerateUniq(FunctionFactory &);
void registerFunctionArrayEnumerateDense(FunctionFactory &);
void registerFunctionArrayEnumerateUniqRanked(FunctionFactory &);
void registerFunctionArrayEnumerateDenseRanked(FunctionFactory &);
void registerFunctionArrayUniq(FunctionFactory &);
void registerFunctionArrayDistinct(FunctionFactory &);
void registerFunctionArrayWithConstant(FunctionFactory &);
......@@ -55,6 +57,8 @@ void registerFunctionsArray(FunctionFactory & factory)
registerFunctionArrayEnumerate(factory);
registerFunctionArrayEnumerateUniq(factory);
registerFunctionArrayEnumerateDense(factory);
registerFunctionArrayEnumerateUniqRanked(factory);
registerFunctionArrayEnumerateDenseRanked(factory);
registerFunctionArrayUniq(factory);
registerFunctionArrayDistinct(factory);
registerFunctionArrayWithConstant(factory);
......
......@@ -2,7 +2,7 @@
#if USE_BROTLI
#include "BrotliReadBuffer.h"
#include <brotli/decode.h>
#include <brotli/decode.h> // Y_IGNORE
namespace DB
{
......
......@@ -4,7 +4,7 @@
#include <Poco/URI.h>
#if USE_HDFS
#include <hdfs/hdfs.h>
#include <hdfs/hdfs.h> // Y_IGNORE
namespace DB
{
......
......@@ -2,10 +2,10 @@
#if USE_HDFS
#include <IO/ReadBufferFromHDFS.h>
#include <IO/ReadBufferFromHDFS.h> // Y_IGNORE
#include <IO/HDFSCommon.h>
#include <Poco/URI.h>
#include <hdfs/hdfs.h>
#include <hdfs/hdfs.h> // Y_IGNORE
namespace DB
......
......@@ -210,12 +210,41 @@ void readStringUntilEOFInto(Vector & s, ReadBuffer & buf)
}
}
/// Replaces the contents of `s` with whatever readStringUntilEOFInto reads from `buf`.
void readStringUntilEOF(String & s, ReadBuffer & buf)
{
s.clear();
readStringUntilEOFInto(s, buf);
}
/// Appends data from `buf` to `s` up to, but not including, the first line feed that is not part
/// of an escape sequence. Escape sequences are decoded by parseComplexEscapeSequence.
/// The line feed itself is left in the buffer. Stops at end of data if there is no line feed.
template <typename Vector>
void readEscapedStringUntilEOLInto(Vector & s, ReadBuffer & buf)
{
    while (!buf.eof())
    {
        /// Copy everything up to the next special character (or the end of the current buffer) as is.
        char * const stop_pos = find_first_symbols<'\n', '\\'>(buf.position(), buf.buffer().end());
        appendToStringOrVector(s, buf, stop_pos);
        buf.position() = stop_pos;

        /// The current buffer is exhausted without a special character: go on with the next portion.
        if (buf.hasPendingData())
        {
            const char symbol = *buf.position();

            if (symbol == '\n')
                return;

            if (symbol == '\\')
                parseComplexEscapeSequence(s, buf);
        }
    }
}
/// Replaces the contents of `s` with whatever readEscapedStringUntilEOLInto reads from `buf`.
void readEscapedStringUntilEOL(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringUntilEOLInto(s, buf);
}
template void readStringUntilEOFInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
......
......@@ -403,6 +403,7 @@ void readBackQuotedString(String & s, ReadBuffer & buf);
void readBackQuotedStringWithSQLStyle(String & s, ReadBuffer & buf);
void readStringUntilEOF(String & s, ReadBuffer & buf);
void readEscapedStringUntilEOL(String & s, ReadBuffer & buf);
/** Read string in CSV format.
......
......@@ -3,9 +3,9 @@
#if USE_HDFS
#include <Poco/URI.h>
#include <IO/WriteBufferFromHDFS.h>
#include <IO/WriteBufferFromHDFS.h> // Y_IGNORE
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>
#include <hdfs/hdfs.h> // Y_IGNORE
namespace DB
......
#pragma once
#include <limits>
#include <type_traits>
#include <Core/Types.h>
#include <Core/Defines.h>
#include <IO/WriteBuffer.h>
#include <common/itoa.h>
......@@ -11,6 +9,7 @@
namespace DB
{
namespace detail
{
template <typename T>
......
......@@ -35,7 +35,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(const std::
ProfileEvents::increment(ProfileEvents::CreatedReadBufferAIO);
return std::make_unique<ReadBufferAIO>(filename_, buffer_size_, flags_, existing_memory_);
#else
throw Exception("AIO is not implemented yet on non-Linux OS", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("AIO is implemented only on Linux and FreeBSD", ErrorCodes::NOT_IMPLEMENTED);
#endif
}
}
......
......@@ -37,7 +37,7 @@ std::unique_ptr<WriteBufferFromFileBase> createWriteBufferFromFileBase(const std
ProfileEvents::increment(ProfileEvents::CreatedWriteBufferAIO);
return std::make_unique<WriteBufferAIO>(filename_, buffer_size_, flags_, mode, existing_memory_);
#else
throw Exception("AIO is not implemented yet on non-Linux OS", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("AIO is implemented only on Linux and FreeBSD", ErrorCodes::NOT_IMPLEMENTED);
#endif
}
}
......
......@@ -51,7 +51,7 @@ add_executable (hashing_read_buffer hashing_read_buffer.cpp)
target_link_libraries (hashing_read_buffer PRIVATE clickhouse_common_io)
add_check (hashing_read_buffer)
add_executable (io_operators operators.cpp)
add_executable (io_operators io_operators.cpp)
target_link_libraries (io_operators PRIVATE clickhouse_common_io)
if (OS_LINUX)
......
......@@ -31,5 +31,11 @@ int main(int, char **)
std::cerr << hello << '\n';
}
{
DB::WriteBufferFromFileDescriptor buf(STDOUT_FILENO);
size_t x = 11;
buf << "Column " << x << ", \n";
}
return 0;
}
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
/** gcc-7 generates wrong code with -O1 -finline-small-functions -ftree-vrp
* This is a compiler bug. The issue does not exist in gcc-8 or clang-8.
*/
using namespace DB;
/// Writes the numbers 0 .. size - 1 to `out` as text, each followed by a space.
/// NOTE(review): this is the code the miscompilation described above is about, so its exact shape
/// (including NO_INLINE) presumably matters - do not refactor without re-checking with the affected compiler.
void NO_INLINE write(WriteBuffer & out, size_t size)
{
for (size_t i = 0; i < size; ++i)
{
writeIntText(i, out);
writeChar(' ', out);
}
}
/// Prints the numbers 0 .. 79 to standard output through a WriteBufferFromFileDescriptor.
int main(int, char **)
{
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
write(out, 80);
return 0;
}
......@@ -190,7 +190,7 @@ struct DDLTask
static std::unique_ptr<zkutil::Lock> createSimpleZooKeeperLock(
std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
zookeeper_holder->initFromInstance(zookeeper);
......@@ -239,21 +239,39 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const
host_fqdn = getFQDNOrHostName();
host_fqdn_id = Cluster::Address::toString(host_fqdn, context.getTCPPort());
event_queue_updated = std::make_shared<Poco::Event>();
thread = ThreadFromGlobalPool(&DDLWorker::run, this);
main_thread = ThreadFromGlobalPool(&DDLWorker::runMainThread, this);
cleanup_thread = ThreadFromGlobalPool(&DDLWorker::runCleanupThread, this);
}
DDLWorker::~DDLWorker()
{
stop_flag = true;
event_queue_updated->set();
thread.join();
queue_updated_event->set();
cleanup_event->set();
main_thread.join();
cleanup_thread.join();
}
/// Returns the cached session as is, under zookeeper_mutex. Never creates or replaces the session.
/// NOTE(review): the result is presumably null until getAndSetZooKeeper() has been called once - confirm.
DDLWorker::ZooKeeperPtr DDLWorker::tryGetZooKeeper() const
{
std::lock_guard lock(zookeeper_mutex);
return current_zookeeper;
}
/// Returns the cached session. If there is none yet or the cached one has expired,
/// a session is first requested from the context and stored in the cache. Guarded by zookeeper_mutex.
DDLWorker::ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
{
    std::lock_guard lock(zookeeper_mutex);

    const bool session_is_usable = current_zookeeper && !current_zookeeper->expired();
    if (!session_is_usable)
        current_zookeeper = context.getZooKeeper();

    return current_zookeeper;
}
bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason)
bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
{
String node_data;
String entry_path = queue_dir + "/" + entry_name;
......@@ -277,14 +295,14 @@ bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason)
{
/// What should we do if we even cannot parse host name and therefore cannot properly submit execution status?
/// We can try to create fail node using FQDN if it equal to host name in cluster config attempt will be successful.
/// Otherwise, that node will be ignored by DDLQueryStatusInputSream.
/// Otherwise, that node will be ignored by DDLQueryStatusInputStream.
tryLogCurrentException(log, "Cannot parse DDL task " + entry_name + ", will try to send error status");
String status = ExecutionStatus::fromCurrentException().serializeText();
try
{
createStatusDirs(entry_path);
createStatusDirs(entry_path, zookeeper);
zookeeper->tryCreate(entry_path + "/finished/" + host_fqdn_id, status, zkutil::CreateMode::Persistent);
}
catch (...)
......@@ -341,8 +359,9 @@ static void filterAndSortQueueNodes(Strings & all_nodes)
void DDLWorker::processTasks()
{
LOG_DEBUG(log, "Processing tasks");
auto zookeeper = tryGetZooKeeper();
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, event_queue_updated);
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
filterAndSortQueueNodes(queue_nodes);
if (queue_nodes.empty())
return;
......@@ -373,7 +392,7 @@ void DDLWorker::processTasks()
if (!current_task)
{
String reason;
if (!initAndCheckTask(entry_name, reason))
if (!initAndCheckTask(entry_name, reason, zookeeper))
{
LOG_DEBUG(log, "Will not execute task " << entry_name << ": " << reason);
last_processed_task_name = entry_name;
......@@ -395,7 +414,7 @@ void DDLWorker::processTasks()
{
try
{
processTask(task);
processTask(task, zookeeper);
}
catch (...)
{
......@@ -559,7 +578,7 @@ void DDLWorker::attachToThreadGroup()
}
void DDLWorker::processTask(DDLTask & task)
void DDLWorker::processTask(DDLTask & task, const ZooKeeperPtr & zookeeper)
{
LOG_DEBUG(log, "Processing task " << task.entry_name << " (" << task.entry.query << ")");
......@@ -576,7 +595,7 @@ void DDLWorker::processTask(DDLTask & task)
else if (code == Coordination::ZNONODE)
{
/// There is no parent
createStatusDirs(task.entry_path);
createStatusDirs(task.entry_path, zookeeper);
if (Coordination::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
throw Coordination::Exception(code, active_node_path);
}
......@@ -595,7 +614,7 @@ void DDLWorker::processTask(DDLTask & task)
if (auto ast_alter = dynamic_cast<const ASTAlterQuery *>(rewritten_ast.get()))
{
processTaskAlter(task, ast_alter, rewritten_query, task.entry_path);
processTaskAlter(task, ast_alter, rewritten_query, task.entry_path, zookeeper);
}
else
{
......@@ -629,7 +648,8 @@ void DDLWorker::processTaskAlter(
DDLTask & task,
const ASTAlterQuery * ast_alter,
const String & rewritten_query,
const String & node_path)
const String & node_path,
const ZooKeeperPtr & zookeeper)
{
String database = ast_alter->database.empty() ? context.getCurrentDatabase() : ast_alter->database;
StoragePtr storage = context.getTable(database, ast_alter->table);
......@@ -746,18 +766,8 @@ void DDLWorker::processTaskAlter(
}
void DDLWorker::cleanupQueue()
void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper)
{
/// Both ZK and Poco use Unix epoch
Int64 current_time_seconds = Poco::Timestamp().epochTime();
constexpr UInt64 zookeeper_time_resolution = 1000;
/// Too early to check
if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period)
return;
last_cleanup_time_seconds = current_time_seconds;
LOG_DEBUG(log, "Cleaning queue");
Strings queue_nodes = zookeeper->getChildren(queue_dir);
......@@ -768,6 +778,9 @@ void DDLWorker::cleanupQueue()
for (auto it = queue_nodes.cbegin(); it < queue_nodes.cend(); ++it)
{
if (stop_flag)
return;
String node_name = *it;
String node_path = queue_dir + "/" + node_name;
String lock_path = node_path + "/lock";
......@@ -782,6 +795,7 @@ void DDLWorker::cleanupQueue()
continue;
/// Delete node if its lifetime is expired (according to task_max_lifetime parameter)
constexpr UInt64 zookeeper_time_resolution = 1000;
Int64 zookeeper_time_seconds = stat.ctime / zookeeper_time_resolution;
bool node_lifetime_is_expired = zookeeper_time_seconds + task_max_lifetime < current_time_seconds;
......@@ -839,7 +853,7 @@ void DDLWorker::cleanupQueue()
/// Try to create nonexisting "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path)
void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper)
{
Coordination::Requests ops;
{
......@@ -864,6 +878,8 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
if (entry.hosts.empty())
throw Exception("Empty host list in a distributed DDL task", ErrorCodes::LOGICAL_ERROR);
auto zookeeper = getAndSetZooKeeper();
String query_path_prefix = queue_dir + "/query-";
zookeeper->createAncestors(query_path_prefix);
......@@ -872,7 +888,7 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
/// Optional step
try
{
createStatusDirs(node_path);
createStatusDirs(node_path, zookeeper);
}
catch (...)
{
......@@ -884,7 +900,7 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
}
void DDLWorker::run()
void DDLWorker::runMainThread()
{
setThreadName("DDLWorker");
LOG_DEBUG(log, "Started DDLWorker thread");
......@@ -896,7 +912,7 @@ void DDLWorker::run()
{
try
{
zookeeper = context.getZooKeeper();
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(queue_dir + "/");
initialized = true;
}
......@@ -916,7 +932,8 @@ void DDLWorker::run()
tryLogCurrentException(log, "Terminating. Cannot initialize DDL queue.");
return;
}
} while (!initialized && !stop_flag);
}
while (!initialized && !stop_flag);
while (!stop_flag)
{
......@@ -924,16 +941,11 @@ void DDLWorker::run()
{
attachToThreadGroup();
cleanup_event->set();
processTasks();
LOG_DEBUG(log, "Waiting a watch");
event_queue_updated->wait();
if (stop_flag)
break;
/// TODO: it might delay the execution, move it to separate thread.
cleanupQueue();
queue_updated_event->wait();
}
catch (const Coordination::Exception & e)
{
......@@ -945,7 +957,7 @@ void DDLWorker::run()
{
try
{
zookeeper = context.getZooKeeper();
getAndSetZooKeeper();
break;
}
catch (...)
......@@ -962,9 +974,6 @@ void DDLWorker::run()
LOG_ERROR(log, "Unexpected ZooKeeper error: " << getCurrentExceptionMessage(true) << ". Terminating.");
return;
}
/// Unlock the processing just in case
event_queue_updated->set();
}
catch (...)
{
......@@ -975,12 +984,48 @@ void DDLWorker::run()
}
class DDLQueryStatusInputSream : public IBlockInputStream
/** Body of the cleanup thread.
  * Sleeps until cleanup_event is signalled, then cleans the queue unless the previous cleaning
  * was made less than cleanup_delay_period seconds ago. Exits when stop_flag is set.
  * Exceptions are logged and do not terminate the thread.
  */
void DDLWorker::runCleanupThread()
{
    setThreadName("DDLWorkerClnr");
    LOG_DEBUG(log, "Started DDLWorker cleanup thread");

    Int64 last_cleanup_time_seconds = 0;
    while (!stop_flag)
    {
        try
        {
            cleanup_event->wait();

            /// The event is also signalled to wake the thread up for shutdown.
            if (stop_flag)
                break;

            /// Both ZK and Poco use Unix epoch
            Int64 current_time_seconds = Poco::Timestamp().epochTime();
            if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period)
            {
                LOG_TRACE(log, "Too early to clean queue, will do it later.");
                continue;
            }

            /// Only the cached session is used here; a new one is never created by this thread.
            /// Skip this round if there is no session at all or it has expired.
            auto zookeeper = tryGetZooKeeper();
            if (!zookeeper || zookeeper->expired())
                continue;

            cleanupQueue(current_time_seconds, zookeeper);
            last_cleanup_time_seconds = current_time_seconds;
        }
        catch (...)
        {
            tryLogCurrentException(log, __PRETTY_FUNCTION__);
        }
    }
}
class DDLQueryStatusInputStream : public IBlockInputStream
{
public:
DDLQueryStatusInputSream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context)
: node_path(zk_node_path), context(context), watch(CLOCK_MONOTONIC_COARSE), log(&Logger::get("DDLQueryStatusInputSream"))
DDLQueryStatusInputStream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context)
: node_path(zk_node_path), context(context), watch(CLOCK_MONOTONIC_COARSE), log(&Logger::get("DDLQueryStatusInputStream"))
{
sample = Block{
{std::make_shared<DataTypeString>(), "host"},
......@@ -1001,7 +1046,7 @@ public:
String getName() const override
{
return "DDLQueryStatusInputSream";
return "DDLQueryStatusInputStream";
}
Block getHeader() const override { return sample; }
......@@ -1101,7 +1146,7 @@ public:
return sample.cloneEmpty();
}
~DDLQueryStatusInputSream() override = default;
~DDLQueryStatusInputStream() override = default;
private:
......@@ -1244,7 +1289,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
if (context.getSettingsRef().distributed_ddl_task_timeout == 0)
return io;
auto stream = std::make_shared<DDLQueryStatusInputSream>(node_path, entry, context);
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context);
io.in = std::move(stream);
return io;
}
......
......@@ -41,33 +41,42 @@ public:
}
private:
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
/// Returns cached ZooKeeper session (possibly expired).
ZooKeeperPtr tryGetZooKeeper() const;
/// If necessary, creates a new session and caches it.
ZooKeeperPtr getAndSetZooKeeper();
void processTasks();
/// Reads entry and check that the host belongs to host list of the task
/// Returns true and sets current_task if entry parsed and the check is passed
bool initAndCheckTask(const String & entry_name, String & out_reason);
bool initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper);
void processTask(DDLTask & task);
void processTask(DDLTask & task, const ZooKeeperPtr & zookeeper);
void processTaskAlter(
DDLTask & task,
const ASTAlterQuery * ast_alter,
const String & rewritten_query,
const String & node_path);
const String & node_path,
const ZooKeeperPtr & zookeeper);
void parseQueryAndResolveHost(DDLTask & task);
bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status);
/// Checks and cleanups queue's nodes
void cleanupQueue();
void cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper);
/// Init task node
void createStatusDirs(const std::string & node_name);
void createStatusDirs(const std::string & node_name, const ZooKeeperPtr & zookeeper);
void run();
void runMainThread();
void runCleanupThread();
void attachToThreadGroup();
......@@ -83,17 +92,19 @@ private:
/// Name of last task that was skipped or successfully executed
std::string last_processed_task_name;
std::shared_ptr<zkutil::ZooKeeper> zookeeper;
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper;
/// Save state of executed task to avoid duplicate execution on ZK error
using DDLTaskPtr = std::unique_ptr<DDLTask>;
DDLTaskPtr current_task;
std::shared_ptr<Poco::Event> event_queue_updated;
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>();
std::atomic<bool> stop_flag{false};
ThreadFromGlobalPool thread;
Int64 last_cleanup_time_seconds = 0;
ThreadFromGlobalPool main_thread;
ThreadFromGlobalPool cleanup_thread;
/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
Int64 cleanup_delay_period = 60; // minute (in seconds)
......@@ -104,7 +115,7 @@ private:
ThreadGroupStatusPtr thread_group;
friend class DDLQueryStatusInputSream;
friend class DDLQueryStatusInputStream;
friend struct DDLTask;
};
......
......@@ -298,8 +298,8 @@ struct Settings
M(SettingBool, enable_debug_queries, false, "Enables debug queries such as AST.") \
M(SettingBool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this settings is enabled, arrays will be resized to the longest one.") \
M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.") \
M(SettingBool, allow_experimental_multiple_joins_emulation, false, "Emulate multiple joins using subselects") \
M(SettingBool, allow_experimental_cross_to_join_conversion, false, "Convert CROSS JOIN to INNER JOIN if possible") \
M(SettingBool, allow_experimental_multiple_joins_emulation, true, "Emulate multiple joins using subselects") \
M(SettingBool, allow_experimental_cross_to_join_conversion, true, "Convert CROSS JOIN to INNER JOIN if possible") \
M(SettingBool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.") \
M(SettingBool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only for 'mysql' table function.") \
M(SettingBool, allow_experimental_data_skipping_indices, false, "If it is set to true, data skipping indices can be used in CREATE TABLE/ALTER TABLE queries.")\
......
......@@ -41,15 +41,7 @@ struct ColumnsDescription
explicit ColumnsDescription(NamesAndTypesList ordinary_) : ordinary(std::move(ordinary_)) {}
bool operator==(const ColumnsDescription & other) const
{
return ordinary == other.ordinary
&& materialized == other.materialized
&& aliases == other.aliases
&& defaults == other.defaults
&& comments == other.comments
&& codecs == other.codecs;
}
bool operator==(const ColumnsDescription & other) const;
bool operator!=(const ColumnsDescription & other) const { return !(*this == other); }
......
......@@ -3,12 +3,12 @@
#if USE_HDFS
#include <Storages/StorageFactory.h>
#include <Storages/StorageHDFS.h>
#include <Storages/StorageHDFS.h> // Y_IGNORE
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadBufferFromHDFS.h>
#include <IO/WriteBufferFromHDFS.h>
#include <IO/ReadBufferFromHDFS.h> // Y_IGNORE
#include <IO/WriteBufferFromHDFS.h> // Y_IGNORE
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
......
#include <Common/config.h>
#if USE_HDFS
#include <Storages/StorageHDFS.h>
#include <Storages/StorageHDFS.h> // Y_IGNORE
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionHDFS.h>
#include <TableFunctions/TableFunctionHDFS.h> // Y_IGNORE
namespace DB
{
......
......@@ -9,7 +9,7 @@ endif ()
install (PROGRAMS clickhouse-test clickhouse-test-server DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
install (
DIRECTORY queries performance external_dictionaries
DIRECTORY queries performance
DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/clickhouse-test
USE_SOURCE_PERMISSIONS
COMPONENT clickhouse
......@@ -20,8 +20,6 @@ install (
install (FILES server-test.xml DESTINATION ${CLICKHOUSE_ETC_DIR}/clickhouse-server COMPONENT clickhouse)
install (FILES client-test.xml DESTINATION ${CLICKHOUSE_ETC_DIR}/clickhouse-client COMPONENT clickhouse)
add_subdirectory (external_dictionaries)
if (ENABLE_TESTS)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/CTestCustom.cmake ${CMAKE_BINARY_DIR})
......
# Automatic tests for external dictionaries
## Prerequisites:
```
sudo apt install python-lxml python-termcolor
```
## Example
```
MYSQL_OPTIONS=--user=root ./run.sh
```
<?xml version="1.0"?>
<!-- Server configuration. NOTE(review): appears to belong to the external dictionaries test setup; confirm against the run script. -->
<yandex>
<!-- Logging: trace level; the main log and the error log are written under ./data/. -->
<logger>
<level>trace</level>
<log>./data/clickhouse-server.log</log>
<errorlog>./data/clickhouse-server.err.log</errorlog>
<!-- NOTE(review): size=never presumably disables size-based rotation; confirm. -->
<size>never</size>
<count>50</count>
</logger>
<!-- TCP port 9001 on localhost only. -->
<tcp_port>9001</tcp_port>
<listen_host>localhost</listen_host>
<!-- Client-side SSL settings: verification mode is "none" and invalid certificates go to AcceptCertificateHandler. -->
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<!-- 5368709120 = 5 * 1024^3 (presumably bytes, i.e. 5 GiB; confirm the unit). -->
<mark_cache_size>5368709120</mark_cache_size>
<!-- Relative paths: data directory, users file, and a glob for the dictionary definition files. -->
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<dictionaries_config>generated/*.xml</dictionaries_config>
</yandex>
# Three shared libraries: one built from C++, one from C, and one named "empty".
add_library (dictionary_library SHARED "dictionary_library.cpp")
add_library (dictionary_library_c SHARED "dictionary_library_c.c")
add_library (dictionary_library_empty SHARED "dictionary_library_empty.cpp")

# Every library gets the same include path and the same output-name properties.
foreach (dictionary_target dictionary_library dictionary_library_c dictionary_library_empty)
    target_include_directories (${dictionary_target} PRIVATE ${DBMS_INCLUDE_DIR})

    # The resulting .so is referenced by name from dictionary_*.xml, so the file
    # name must stay exactly the target name: no "lib" prefix and no extra
    # postfix in debug builds.
    set_target_properties (${dictionary_target} PROPERTIES PREFIX "" DEBUG_POSTFIX "")
endforeach ()
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册