提交 565c273e 编写于 作者: A Alexander Tokmakov

Merge branch 'master' into merging_values_with_expressions

......@@ -2,7 +2,7 @@ if (NOT ARCH_ARM AND NOT OS_FREEBSD)
option (ENABLE_FASTOPS "Enable fast vectorized mathematical functions library by Mikhail Parakhin" ${NOT_UNBUNDLED})
endif ()
if (ENABLE_FASTOPS)
if (ENABLE_FASTOPS AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/fastops/fastops/fastops.h")
message(FATAL_ERROR "submodule contrib/fastops is missing. to fix try run: \n git submodule update --init --recursive")
set(USE_FASTOPS 0)
......
option(ENABLE_ICU "Enable ICU" ON)
if(ENABLE_ICU)
if(ENABLE_ICU AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
if (APPLE)
set(ICU_ROOT "/usr/local/opt/icu4c" CACHE STRING "")
endif()
......
option (ENABLE_PARQUET "Enable parquet" ON)
if (ENABLE_PARQUET)
if (ENABLE_PARQUET AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
if (NOT OS_FREEBSD AND NOT APPLE) # Freebsd: ../contrib/arrow/cpp/src/arrow/util/bit-util.h:27:10: fatal error: endian.h: No such file or directory
option(USE_INTERNAL_PARQUET_LIBRARY "Set to FALSE to use system parquet library instead of bundled" ${NOT_UNBUNDLED})
......
option (ENABLE_PROTOBUF "Enable protobuf" ON)
if (ENABLE_PROTOBUF)
if (ENABLE_PROTOBUF AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
option(USE_INTERNAL_PROTOBUF_LIBRARY "Set to FALSE to use system protobuf instead of bundled" ${NOT_UNBUNDLED})
......
......@@ -2,7 +2,7 @@ include (CMakePushCheckState)
cmake_push_check_state ()
option (ENABLE_READLINE "Enable readline" 1)
if (ENABLE_READLINE)
if (ENABLE_READLINE AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
set (READLINE_PATHS "/usr/local/opt/readline/lib")
# First try find custom lib for macos users (default lib without history support)
......
option(USE_SNAPPY "Enable support of snappy library" ON)
if (USE_SNAPPY)
if (USE_SNAPPY AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
option (USE_INTERNAL_SNAPPY_LIBRARY "Set to FALSE to use system snappy library instead of bundled" ${NOT_UNBUNDLED})
if(NOT USE_INTERNAL_SNAPPY_LIBRARY)
......
option (ENABLE_SSL "Enable ssl" ON)
if (ENABLE_SSL)
if (ENABLE_SSL AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
if(NOT ARCH_32)
option(USE_INTERNAL_SSL_LIBRARY "Set to FALSE to use system *ssl library instead of bundled" ${NOT_UNBUNDLED})
......
......@@ -65,14 +65,19 @@ if (CMAKE_CROSSCOMPILING)
set (HAS_POST_2038_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)
# CMake < 3.13 doesn't respect same-name variables as values for options.
# FIXME: broken dependencies
set (USE_SNAPPY OFF)
set (ENABLE_SSL OFF)
set (ENABLE_PROTOBUF OFF)
set (ENABLE_PARQUET OFF)
set (ENABLE_READLINE OFF)
set (ENABLE_ICU OFF)
set (ENABLE_FASTOPS OFF)
# set (USE_SNAPPY OFF)
# set (ENABLE_SSL OFF)
# set (ENABLE_PROTOBUF OFF)
# set (ENABLE_PARQUET OFF)
# set (ENABLE_READLINE OFF)
# set (ENABLE_ICU OFF)
# set (ENABLE_FASTOPS OFF)
message (STATUS "Cross-compiling for Darwin")
else ()
message (FATAL_ERROR "Trying to cross-compile to unsupported target: ${CMAKE_SYSTEM_NAME}!")
endif ()
# Don't know why but CXX_STANDARD doesn't work for cross-compilation
......
......@@ -44,7 +44,7 @@ target_include_directories(cxx SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_S
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
target_compile_options(cxx PUBLIC -nostdinc++ -Wno-reserved-id-macro)
if (OS_DARWIN)
if (OS_DARWIN AND NOT CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9)
target_compile_options(cxx PUBLIC -Wno-ctad-maybe-unsupported)
endif ()
......
......@@ -24,6 +24,8 @@ MetricsTransmitter::MetricsTransmitter(
send_events_cumulative = config.getBool(config_name + ".events_cumulative", false);
send_metrics = config.getBool(config_name + ".metrics", true);
send_asynchronous_metrics = config.getBool(config_name + ".asynchronous_metrics", true);
thread = ThreadFromGlobalPool{&MetricsTransmitter::run, this};
}
......@@ -38,7 +40,7 @@ MetricsTransmitter::~MetricsTransmitter()
cond.notify_one();
thread.join();
thread->join();
}
catch (...)
{
......
......@@ -5,8 +5,10 @@
#include <string>
#include <thread>
#include <vector>
#include <Common/ProfileEvents.h>
#include <optional>
#include <Core/Types.h>
#include <Common/ThreadPool.h>
#include <Common/ProfileEvents.h>
namespace Poco
......@@ -52,7 +54,7 @@ private:
bool quit = false;
std::mutex mutex;
std::condition_variable cond;
ThreadFromGlobalPool thread{&MetricsTransmitter::run, this};
std::optional<ThreadFromGlobalPool> thread;
static inline constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents.";
static inline constexpr auto profile_events_cumulative_path_prefix = "ClickHouse.ProfileEventsCumulative.";
......
......@@ -111,7 +111,7 @@ public:
using IteratorType = typename MapType::iterator;
array_column.getData().get(values_vec_offset + i, value);
const auto & key = keys_vec.getData()[keys_vec_offset + i];
const auto & key = keys_vec.getElement(keys_vec_offset + i);
if (!keepKey(key))
{
......
......@@ -171,7 +171,7 @@ struct OneAdder
{
if constexpr (!std::is_same_v<T, String>)
{
const auto & value = assert_cast<const ColumnVector<T> &>(column).getData()[row_num];
const auto & value = assert_cast<const ColumnVector<T> &>(column).getElement(row_num);
data.set.insert(AggregateFunctionUniqTraits<T>::hash(value));
}
else
......
......@@ -132,7 +132,7 @@ public:
{
if constexpr (!std::is_same_v<T, String>)
{
const auto & value = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
const auto & value = assert_cast<const ColumnVector<T> &>(*columns[0]).getElement(row_num);
this->data(place).set.insert(detail::AggregateFunctionUniqCombinedTraits<T>::hash(value));
}
else
......
#include <Common/filesystemHelpers.h>
#include <common/config_common.h>
#include <Poco/File.h>
#include <Poco/Path.h>
namespace DB
{
bool enoughSpaceInDirectory(const std::string & path, size_t data_size)
{
#if !UNBUNDLED
auto free_space = Poco::File(path).freeSpace();
if (data_size > free_space)
return false;
#endif
return true;
}
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path)
{
Poco::File(path).createDirectories();
/// NOTE: std::make_shared cannot use protected constructors
return std::make_unique<TemporaryFile>(path);
}
}
#pragma once
#include <string>
#include <memory>
#include <Poco/TemporaryFile.h>
namespace DB
{
using TemporaryFile = Poco::TemporaryFile;
bool enoughSpaceInDirectory(const std::string & path, size_t data_size);
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
}
......@@ -305,8 +305,9 @@ struct Settings : public SettingsCollection<Settings>
M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.") \
M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join for LEFT and INNER JOINs.") \
M(SettingBool, partial_merge_join_optimizations, false, "Enable optimisations in partial merge join") \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size.") \
M(SettingBool, partial_merge_join_optimizations, false, "Enable optimizations in partial merge join") \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.") \
M(SettingFloat, partial_merge_join_rows_in_left_blocks, 10000, "Group left-hand joining data in bigger blocks. Setting it to a bigger value increase JOIN performance and memory usage.") \
\
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
......
......@@ -74,9 +74,4 @@ Block AggregatingBlockInputStream::readImpl()
return impl->read();
}
AggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
}
......@@ -4,6 +4,7 @@
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
namespace DB
......@@ -41,15 +42,6 @@ protected:
bool executed = false;
/// To read the data that was flushed into the temporary data file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path);
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
/** From here we will get the completed blocks after the aggregation. */
......
......@@ -4,7 +4,6 @@
#include <DataStreams/copyData.h>
#include <DataStreams/processConstants.h>
#include <Common/formatReadable.h>
#include <common/config_common.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
......@@ -79,14 +78,10 @@ Block MergeSortingBlockInputStream::readImpl()
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
#if !UNBUNDLED
auto free_space = Poco::File(tmp_path).freeSpace();
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
......
#pragma once
#include <queue>
#include <Poco/TemporaryFile.h>
#include <common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
......@@ -114,19 +115,7 @@ private:
Block header_without_constants;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
/// For reading data from temporary file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0)) {}
};
std::vector<std::unique_ptr<TemporaryFile>> temporary_files;
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
BlockInputStreams inputs_to_merge;
......
......@@ -101,12 +101,6 @@ Block ParallelAggregatingBlockInputStream::readImpl()
}
ParallelAggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
{
parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num],
......
......@@ -5,6 +5,7 @@
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ParallelInputsProcessor.h>
#include <DataStreams/TemporaryFileStream.h>
namespace DB
......@@ -57,16 +58,6 @@ private:
bool no_more_keys = false;
std::atomic<bool> executed {false};
/// To read the data stored into the temporary data file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path);
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
Logger * log = &Logger::get("ParallelAggregatingBlockInputStream");
......
......@@ -5,8 +5,8 @@ namespace DB
{
SquashingBlockInputStream::SquashingBlockInputStream(
const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes)
: header(src->getHeader()), transform(min_block_size_rows, min_block_size_bytes)
const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory)
: header(src->getHeader()), transform(min_block_size_rows, min_block_size_bytes, reserve_memory)
{
children.emplace_back(src);
}
......
......@@ -12,7 +12,8 @@ namespace DB
class SquashingBlockInputStream : public IBlockInputStream
{
public:
SquashingBlockInputStream(const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);
SquashingBlockInputStream(const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes,
bool reserve_memory = false);
String getName() const override { return "Squashing"; }
......
......@@ -4,8 +4,10 @@
namespace DB
{
SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_), min_block_size_bytes(min_block_size_bytes_)
SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
, reserve_memory(reserve_memory_)
{
}
......@@ -59,7 +61,12 @@ void SquashingTransform::append(MutableColumns && columns)
}
for (size_t i = 0, size = columns.size(); i < size; ++i)
accumulated_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
{
auto & column = accumulated_columns[i];
if (reserve_memory)
column->reserve(min_block_size_bytes);
column->insertRangeFrom(*columns[i], 0, columns[i]->size());
}
}
......
......@@ -23,7 +23,7 @@ class SquashingTransform
{
public:
/// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored.
SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_);
SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_ = false);
/// When not ready, you need to pass more blocks to add function.
struct Result
......@@ -43,6 +43,7 @@ public:
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
bool reserve_memory;
MutableColumns accumulated_columns;
......
#pragma once
#include <Common/ClickHouseRevision.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
namespace DB
{
/// To read the data that was flushed into the temporary data file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get()))
{}
TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
{}
};
}
......@@ -111,7 +111,7 @@ void DataTypeDecimal<T>::serializeBinary(const Field & field, WriteBuffer & ostr
template <typename T>
void DataTypeDecimal<T>::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
{
const FieldType & x = assert_cast<const ColumnType &>(column).getData()[row_num];
const FieldType & x = assert_cast<const ColumnType &>(column).getElement(row_num);
writeBinary(x, ostr);
}
......
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Poco/Path.h>
#include <fcntl.h>
......@@ -15,17 +14,14 @@ namespace ErrorCodes
}
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<Poco::TemporaryFile> && tmp_file_)
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<TemporaryFile> && tmp_file_)
: WriteBufferFromFile(tmp_file_->path(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, 0600), tmp_file(std::move(tmp_file_))
{}
WriteBufferFromTemporaryFile::Ptr WriteBufferFromTemporaryFile::create(const std::string & tmp_dir)
{
Poco::File(tmp_dir).createDirectories();
/// NOTE: std::make_shared cannot use protected constructors
return Ptr{new WriteBufferFromTemporaryFile(std::make_unique<Poco::TemporaryFile>(tmp_dir))};
return Ptr{new WriteBufferFromTemporaryFile(createTemporaryFile(tmp_dir))};
}
......@@ -45,11 +41,11 @@ public:
return std::make_shared<ReadBufferFromTemporaryWriteBuffer>(fd, file_name, std::move(origin->tmp_file));
}
ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr<Poco::TemporaryFile> && tmp_file_)
ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr<TemporaryFile> && tmp_file_)
: ReadBufferFromFile(fd_, file_name_), tmp_file(std::move(tmp_file_))
{}
std::unique_ptr<Poco::TemporaryFile> tmp_file;
std::unique_ptr<TemporaryFile> tmp_file;
};
......
#include <IO/WriteBuffer.h>
#include <IO/IReadableWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Poco/TemporaryFile.h>
#include <Common/filesystemHelpers.h>
namespace DB
......@@ -20,13 +20,13 @@ public:
protected:
WriteBufferFromTemporaryFile(std::unique_ptr<Poco::TemporaryFile> && tmp_file);
WriteBufferFromTemporaryFile(std::unique_ptr<TemporaryFile> && tmp_file);
std::shared_ptr<ReadBuffer> getReadBufferImpl() override;
protected:
std::unique_ptr<Poco::TemporaryFile> tmp_file;
std::unique_ptr<TemporaryFile> tmp_file;
friend class ReadBufferFromTemporaryWriteBuffer;
};
......
......@@ -646,11 +646,8 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
#if !UNBUNDLED
auto free_space = Poco::File(params.tmp_path).freeSpace();
if (current_memory_usage + params.min_free_disk_space > free_space)
if (!enoughSpaceInDirectory(params.tmp_path, current_memory_usage + params.min_free_disk_space))
throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
writeToTemporaryFile(result);
}
......@@ -664,8 +661,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
Stopwatch watch;
size_t rows = data_variants.size();
Poco::File(params.tmp_path).createDirectories();
auto file = std::make_unique<Poco::TemporaryFile>(params.tmp_path);
auto file = createTemporaryFile(params.tmp_path);
const std::string & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
......
......@@ -4,8 +4,6 @@
#include <memory>
#include <functional>
#include <Poco/TemporaryFile.h>
#include <common/logger_useful.h>
#include <common/StringRef.h>
......@@ -18,6 +16,7 @@
#include <Common/LRUCache.h>
#include <Common/ColumnsHashing.h>
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
......
......@@ -21,6 +21,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int PARAMETER_OUT_OF_BOUND;
}
AnalyzedJoin::AnalyzedJoin(const Settings & settings)
......@@ -272,4 +273,11 @@ JoinPtr makeJoin(std::shared_ptr<AnalyzedJoin> table_join, const Block & right_s
return std::make_shared<Join>(table_join, right_sample_block);
}
bool isMergeJoin(const JoinPtr & join)
{
if (join)
return typeid_cast<const MergeJoin *>(join.get());
return false;
}
}
......@@ -39,9 +39,9 @@ class AnalyzedJoin
const SizeLimits size_limits;
const bool join_use_nulls;
const bool partial_merge_join;
const bool partial_merge_join_optimizations;
const size_t partial_merge_join_rows_in_right_blocks;
const bool partial_merge_join = false;
const bool partial_merge_join_optimizations = false;
const size_t partial_merge_join_rows_in_right_blocks = 0;
Names key_names_left;
Names key_names_right; /// Duplicating names are qualified.
......@@ -67,9 +67,6 @@ public:
const Names & key_names_right_)
: size_limits(limits)
, join_use_nulls(use_nulls)
, partial_merge_join(false)
, partial_merge_join_optimizations(false)
, partial_merge_join_rows_in_right_blocks(0)
, key_names_right(key_names_right_)
{
table_join.kind = kind;
......@@ -119,4 +116,6 @@ public:
struct ASTTableExpression;
NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpression & table_expression, const Context & context);
bool isMergeJoin(const JoinPtr &);
}
......@@ -23,7 +23,7 @@ public:
const NameSet & source_columns;
const NameSet & joined_columns;
const Aliases & aliases;
const bool is_asof;
const bool is_asof{false};
ASTPtr asof_left_key{};
ASTPtr asof_right_key{};
bool has_some{false};
......
......@@ -25,6 +25,7 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/ReverseBlockInputStream.h>
#include <DataStreams/FillingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
......@@ -1116,6 +1117,13 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
/// Applies to all sources except stream_with_non_joined_data.
for (auto & stream : pipeline.streams)
stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join);
if (isMergeJoin(expressions.before_join->getTableJoinAlgo()) && settings.partial_merge_join_optimizations)
{
if (size_t rows_in_block = settings.partial_merge_join_rows_in_left_blocks)
for (auto & stream : pipeline.streams)
stream = std::make_shared<SquashingBlockInputStream>(stream, rows_in_block, 0, true);
}
}
if (JoinPtr join = expressions.before_join->getTableJoinAlgo())
......
......@@ -168,14 +168,10 @@ void MergeSortingTransform::consume(Chunk chunk)
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
#if !UNBUNDLED
auto free_space = Poco::File(tmp_path).freeSpace();
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);
......
#pragma once
#include <Processors/Transforms/SortingTransform.h>
#include <Core/SortDescription.h>
#include <Poco/TemporaryFile.h>
#include <Common/filesystemHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
......@@ -52,7 +52,7 @@ private:
bool remerge_is_useful = true;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
std::vector<std::unique_ptr<TemporaryFile>> temporary_files;
/// Merge all accumulated blocks to keep no more than limit rows.
void remerge();
......
......@@ -8,7 +8,6 @@ CREATE TABLE t2 (x UInt32, y UInt64) engine = MergeTree ORDER BY (x,y);
SET partial_merge_join = 1;
SET partial_merge_join_optimizations = 1;
SET partial_merge_join_rows_in_right_blocks = 2;
SET any_join_distinct_right_table_keys = 1;
INSERT INTO t1 (x, y) VALUES (0, 0);
......
......@@ -107,7 +107,10 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, cache, di
result = []
cmake_flags = ['$CMAKE_FLAGS', '-DADD_GDB_INDEX_FOR_GOLD=1']
cc = compiler
if compiler.endswith("-darwin"):
cc = compiler[:-len("-darwin")]
else:
cc = compiler
cxx = cc.replace('gcc', 'g++').replace('clang', 'clang++')
if package_type == "deb":
result.append("DEB_CC={}".format(cc))
......@@ -119,11 +122,11 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, cache, di
cmake_flags.append('-DCMAKE_CXX_COMPILER=`which {}`'.format(cxx))
if "darwin" in compiler:
cmake_flags.append("-DCMAKE_AR:FILEPATH=/cctools/bin/x86_64-apple-darwin-ar") \
.append("-DCMAKE_RANLIB:FILEPATH=/cctools/bin/x86_64-apple-darwin-ranlib") \
.append("-DCMAKE_SYSTEM_NAME=Darwin") \
.append("-DSDK_PATH=/cctools/MacOSX10.14.sdk") \
.append("-DLINKER_NAME=/cctools/bin/x86_64-apple-darwin-ld")
cmake_flags.append("-DCMAKE_AR:FILEPATH=/cctools/bin/x86_64-apple-darwin-ar")
cmake_flags.append("-DCMAKE_RANLIB:FILEPATH=/cctools/bin/x86_64-apple-darwin-ranlib")
cmake_flags.append("-DCMAKE_SYSTEM_NAME=Darwin")
cmake_flags.append("-DSDK_PATH=/cctools/MacOSX10.14.sdk")
cmake_flags.append("-DLINKER_NAME=/cctools/bin/x86_64-apple-darwin-ld")
if sanitizer:
result.append("SANITIZER={}".format(sanitizer))
......
......@@ -14,39 +14,37 @@ sudo apt-get install clang-8
# Install Cross-Compilation Toolset
Let's remember the path where we install `cctools` as ${CCTOOLS}
```bash
mkdir cctools
mkdir ${CCTOOLS}
git clone https://github.com/tpoechtrager/apple-libtapi.git
cd apple-libtapi
INSTALLPREFIX=../cctools ./build.sh
INSTALLPREFIX=${CCTOOLS} ./build.sh
./install.sh
cd ..
git clone https://github.com/tpoechtrager/cctools-port.git
cd cctools-port/cctools
./configure --prefix=../cctools --with-libtapi=../cctools --target=x86_64-apple-darwin
./configure --prefix=${CCTOOLS} --with-libtapi=${CCTOOLS} --target=x86_64-apple-darwin
make install
cd ..
cd cctools
cd ${CCTOOLS}
wget https://github.com/phracker/MacOSX-SDKs/releases/download/10.14-beta4/MacOSX10.14.sdk.tar.xz
tar xJf MacOSX10.14.sdk.tar.xz
```
Let's remember the path where we created `cctools` directory as ${CCTOOLS_PARENT}
# Build ClickHouse
```bash
cd ClickHouse
mkdir build-osx
CC=clang-8 CXX=clang++-8 cmake . -Bbuild-osx -DCMAKE_SYSTEM_NAME=Darwin \
-DCMAKE_AR:FILEPATH=${CCTOOLS_PARENT}/cctools/bin/x86_64-apple-darwin-ar \
-DCMAKE_RANLIB:FILEPATH=${CCTOOLS_PARENT}/cctools/bin/x86_64-apple-darwin-ranlib \
-DLINKER_NAME=${CCTOOLS_PARENT}/cctools/bin/x86_64-apple-darwin-ld \
-DSDK_PATH=${CCTOOLS_PARENT}/cctools/MacOSX10.14.sdk \
-DUSE_SNAPPY=OFF -DENABLE_SSL=OFF -DENABLE_PROTOBUF=OFF -DENABLE_PARQUET=OFF -DENABLE_READLINE=OFF -DENABLE_ICU=OFF -DENABLE_FASTOPS=OFF
-DCMAKE_AR:FILEPATH=${CCTOOLS}/bin/x86_64-apple-darwin-ar \
-DCMAKE_RANLIB:FILEPATH=${CCTOOLS}/bin/x86_64-apple-darwin-ranlib \
-DLINKER_NAME=${CCTOOLS}/bin/x86_64-apple-darwin-ld \
-DSDK_PATH=${CCTOOLS}/MacOSX10.14.sdk
ninja -C build-osx
```
......
......@@ -649,9 +649,11 @@ If you want to get a list of unique items in an array, you can use arrayReduce('
A special function. See the section ["ArrayJoin function"](array_join.md#functions_arrayjoin).
## arrayDifference(arr)
## arrayDifference(arr) {#array_functions-arraydifference}
Takes an array, returns an array with the difference between all pairs of neighboring elements. For example:
Takes an array, returns an array of differences between adjacent elements. The first element will be 0, the second is the difference between the second and first elements of the original array, etc. The type of elements in the resulting array is determined by the type inference rules for subtraction (e.g. UInt8 - UInt8 = Int16). UInt*/Int*/Float* types are supported (type Decimal is not supported).
Example:
```sql
SELECT arrayDifference([1, 2, 3, 4])
......@@ -663,9 +665,23 @@ SELECT arrayDifference([1, 2, 3, 4])
└───────────────────────────────┘
```
## arrayDistinct(arr)
Example of the overflow due to result type Int64:
```sql
SELECT arrayDifference([0, 10000000000000000000])
```
```text
┌─arrayDifference([0, 10000000000000000000])─┐
│ [0,-8446744073709551616] │
└────────────────────────────────────────────┘
```
Takes an array, returns an array containing the distinct elements. For example:
## arrayDistinct(arr) {#array_functions-arraydistinct}
Takes an array, returns an array containing the distinct elements.
Example:
```sql
SELECT arrayDistinct([1, 2, 2, 3, 1])
......@@ -677,13 +693,27 @@ SELECT arrayDistinct([1, 2, 2, 3, 1])
└────────────────────────────────┘
```
## arrayEnumerateDense(arr)
## arrayEnumerateDense(arr) {#array_functions-arrayenumeratedense}
Returns an array of the same size as the source array, indicating where each element first appears in the source array. For example: arrayEnumerateDense([10,20,10,30]) = [1,2,1,3].
Returns an array of the same size as the source array, indicating where each element first appears in the source array.
## arrayIntersect(arr)
Example:
Takes an array, returns the intersection of all array elements. For example:
```sql
SELECT arrayEnumerateDense([10, 20, 10, 30])
```
```text
┌─arrayEnumerateDense([10, 20, 10, 30])─┐
│ [1,2,1,3] │
└───────────────────────────────────────┘
```
## arrayIntersect(arr) {#array_functions-arrayintersect}
Takes multiple arrays, returns an array with elements that are present in all source arrays. Elements order in the resulting array is the same as in the first array.
Example:
```sql
SELECT
......@@ -697,16 +727,67 @@ SELECT
└──────────────┴───────────┘
```
## arrayReduce(agg_func, arr1, ...)
## arrayReduce(agg_func, arr1, ...) {#array_functions-arrayreduce}
Applies an aggregate function to array and returns its result.If aggregate function has multiple arguments, then this function can be applied to multiple arrays of the same size.
Applies an aggregate function to array elements and returns its result. The name of the aggregation function is passed as a string in single quotes `'max'`, `'sum'`. When using parametric aggregate functions, the parameter is indicated after the function name in parentheses `'uniqUpTo(6)'`.
arrayReduce('agg_func', arr1, ...) - apply the aggregate function `agg_func` to arrays `arr1...`. If multiple arrays passed, then elements on corresponding positions are passed as multiple arguments to the aggregate function. For example: SELECT arrayReduce('max', [1,2,3]) = 3
Example:
```sql
SELECT arrayReduce('max', [1, 2, 3])
```
## arrayReverse(arr)
```text
┌─arrayReduce('max', [1, 2, 3])─┐
│ 3 │
└───────────────────────────────┘
```
If an aggregate function takes multiple arguments, then this function must be applied to multiple arrays of the same size.
Example:
```sql
SELECT arrayReduce('maxIf', [3, 5], [1, 0])
```
```text
┌─arrayReduce('maxIf', [3, 5], [1, 0])─┐
│ 3 │
└──────────────────────────────────────┘
```
Example with a parametric aggregate function:
```sql
SELECT arrayReduce('uniqUpTo(3)', [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
```
```text
┌─arrayReduce('uniqUpTo(3)', [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])─┐
│ 4 │
└─────────────────────────────────────────────────────────────┘
```
## arrayReverse(arr) {#array_functions-arrayreverse}
Returns an array of the same size as the original array containing the elements in reverse order.
Example:
```sql
SELECT arrayReverse([1, 2, 3])
```
```text
┌─arrayReverse([1, 2, 3])─┐
│ [3,2,1] │
└─────────────────────────┘
```
Returns an array of the same size as the source array, containing the result of inverting all elements of the source array.
## reverse(arr) {#array_functions-reverse}
Synonym for ["arrayReverse"](#array_functions-arrayreverse)
[Original article](https://clickhouse.yandex/docs/en/query_language/functions/array_functions/) <!--hide-->
......@@ -422,7 +422,7 @@ if __name__ == '__main__':
parser.add_argument('--token', help='Github token. Use it to increase github api query limit.')
parser.add_argument('--directory', help='ClickHouse repo directory. Script dir by default.')
parser.add_argument('--state', help='File to dump inner states result.', default='changelog_state.json')
parser.add_argument('--repo', help='ClickHouse repo on GitHub.', default='yandex/ClickHouse')
parser.add_argument('--repo', help='ClickHouse repo on GitHub.', default='ClickHouse/ClickHouse')
parser.add_argument('--max_retry', default=100, type=int,
help='Max number of retries pre api query in case of API rate limit exceeded error.')
parser.add_argument('--retry_timeout', help='Timeout after retry in seconds.', type=int, default=5)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册