提交 ec1a4909 编写于 作者: A Alexander Burmak

Merge remote-tracking branch 'refs/remotes/upstream/master' into vfs_log

dbms/* @ClickHouse/core-assigner
utils/* @ClickHouse/core-assigner
docs/* @ClickHouse/docs
docs/zh/* @ClickHouse/docs-zh
此差异已折叠。
......@@ -11,3 +11,7 @@ ClickHouse is an open-source column-oriented database management system that all
* [Blog](https://clickhouse.yandex/blog/en/) contains various ClickHouse-related articles, as well as announcements and reports about events.
* [Contacts](https://clickhouse.yandex/#contacts) can help to get your questions answered if there are any.
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [ClickHouse Meetup in San Francisco](https://www.eventbrite.com/e/clickhouse-february-meetup-registration-88496227599) on February 5.
......@@ -13,12 +13,12 @@ set(CMAKE_C_STANDARD_LIBRARIES ${DEFAULT_LIBS})
# Minimal supported SDK version
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mmacosx-version-min=10.14")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mmacosx-version-min=10.14")
set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} -mmacosx-version-min=10.14")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mmacosx-version-min=10.15")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mmacosx-version-min=10.15")
set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} -mmacosx-version-min=10.15")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -mmacosx-version-min=10.14")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -mmacosx-version-min=10.14")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -mmacosx-version-min=10.15")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -mmacosx-version-min=10.15")
# Global libraries
......
Subproject commit d175c8bf823e709d570772b038757fadf63bc632
Subproject commit 703bd9caab50b139428cea1aaff9974ebee5742e
......@@ -546,7 +546,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
auto & pipeline = state.io.pipeline;
if (pipeline.getMaxThreads())
num_threads = pipeline.getMaxThreads();
num_threads = std::min(num_threads, pipeline.getMaxThreads());
/// Send header-block, to allow client to prepare output format for data to send.
{
......
......@@ -101,9 +101,6 @@ namespace
public:
void add(const ASTPtr & condition, bool is_restrictive)
{
if (!condition)
return;
if (is_restrictive)
restrictions.push_back(condition);
else
......@@ -139,29 +136,32 @@ void RowPolicyContextFactory::PolicyInfo::setPolicy(const RowPolicyPtr & policy_
for (auto index : ext::range_with_static_cast<ConditionIndex>(0, MAX_CONDITION_INDEX))
{
parsed_conditions[index] = nullptr;
const String & condition = policy->conditions[index];
if (condition.empty())
continue;
auto previous_range = std::pair(std::begin(policy->conditions), std::begin(policy->conditions) + index);
auto previous_it = std::find(previous_range.first, previous_range.second, condition);
if (previous_it != previous_range.second)
{
/// The condition is already parsed before.
parsed_conditions[index] = parsed_conditions[previous_it - previous_range.first];
continue;
}
else
/// Try to parse the condition.
try
{
/// Try to parse the condition.
try
{
ParserExpression parser;
parsed_conditions[index] = parseQuery(parser, condition, 0);
}
catch (...)
{
tryLogCurrentException(
&Poco::Logger::get("RowPolicy"),
String("Could not parse the condition ") + RowPolicy::conditionIndexToString(index) + " of row policy "
+ backQuote(policy->getFullName()));
}
ParserExpression parser;
parsed_conditions[index] = parseQuery(parser, condition, 0);
}
catch (...)
{
tryLogCurrentException(
&Poco::Logger::get("RowPolicy"),
String("Could not parse the condition ") + RowPolicy::conditionIndexToString(index) + " of row policy "
+ backQuote(policy->getFullName()));
}
}
}
......@@ -290,7 +290,8 @@ void RowPolicyContextFactory::mixConditionsForContext(RowPolicyContext & context
auto & mixers = map_of_mixers[std::pair{policy.getDatabase(), policy.getTableName()}];
mixers.policy_ids.push_back(policy_id);
for (auto index : ext::range(0, MAX_CONDITION_INDEX))
mixers.mixers[index].add(info.parsed_conditions[index], policy.isRestrictive());
if (info.parsed_conditions[index])
mixers.mixers[index].add(info.parsed_conditions[index], policy.isRestrictive());
}
}
......
......@@ -301,10 +301,6 @@ struct Codec
: codec_statement(std::move(codec_statement_)),
expected_compression_ratio(expected_compression_ratio_)
{}
Codec()
: Codec(std::string())
{}
};
......@@ -314,23 +310,12 @@ struct CodecTestSequence
std::vector<char> serialized_data;
DataTypePtr data_type;
CodecTestSequence()
: name(),
serialized_data(),
data_type()
{}
CodecTestSequence(std::string name_, std::vector<char> serialized_data_, DataTypePtr data_type_)
: name(name_),
serialized_data(serialized_data_),
data_type(data_type_)
{}
CodecTestSequence(const CodecTestSequence &) = default;
CodecTestSequence & operator=(const CodecTestSequence &) = default;
CodecTestSequence(CodecTestSequence &&) = default;
CodecTestSequence & operator=(CodecTestSequence &&) = default;
CodecTestSequence & append(const CodecTestSequence & other)
{
assert(data_type->equals(*other.data_type));
......@@ -819,24 +804,6 @@ std::vector<CodecTestSequence> generatePyramidOfSequences(const size_t sequences
return sequences;
};
// Just as if all sequences from generatePyramidOfSequences were appended one-by-one to the first one.
template <typename T, typename Generator>
CodecTestSequence generatePyramidSequence(const size_t sequences_count, Generator && generator, const char* generator_name)
{
CodecTestSequence sequence;
sequence.data_type = makeDataType<T>();
sequence.serialized_data.reserve(sequences_count * sequences_count * sizeof(T));
for (size_t i = 1; i < sequences_count; ++i)
{
std::string name = generator_name + std::string(" from 0 to ") + std::to_string(i);
sequence.append(generateSeq<T>(std::forward<decltype(generator)>(generator), name.c_str(), 0, i));
}
return sequence;
};
// helper macro to produce human-friendly sequence name from generator
#define G(generator) generator, #generator
......@@ -853,17 +820,17 @@ const auto DefaultCodecsToTest = ::testing::Values(
// test cases
///////////////////////////////////////////////////////////////////////////////////////////////////
INSTANTIATE_TEST_CASE_P(Simple,
INSTANTIATE_TEST_SUITE_P(Simple,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
::testing::Values(
makeSeq<Float64>(1, 2, 3, 5, 7, 11, 13, 17, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97)
)
),
)
);
INSTANTIATE_TEST_CASE_P(SmallSequences,
INSTANTIATE_TEST_SUITE_P(SmallSequences,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -877,10 +844,10 @@ INSTANTIATE_TEST_CASE_P(SmallSequences,
+ generatePyramidOfSequences<UInt32>(42, G(SequentialGenerator(1)))
+ generatePyramidOfSequences<UInt64>(42, G(SequentialGenerator(1)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(Mixed,
INSTANTIATE_TEST_SUITE_P(Mixed,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -894,10 +861,10 @@ INSTANTIATE_TEST_CASE_P(Mixed,
generateSeq<UInt32>(G(MinMaxGenerator()), 1, 5) + generateSeq<UInt32>(G(SequentialGenerator(1)), 1, 1001),
generateSeq<UInt64>(G(MinMaxGenerator()), 1, 5) + generateSeq<UInt64>(G(SequentialGenerator(1)), 1, 1001)
)
),
)
);
INSTANTIATE_TEST_CASE_P(SameValueInt,
INSTANTIATE_TEST_SUITE_P(SameValueInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -911,10 +878,10 @@ INSTANTIATE_TEST_CASE_P(SameValueInt,
generateSeq<UInt32>(G(SameValueGenerator(1000))),
generateSeq<UInt64>(G(SameValueGenerator(1000)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(SameNegativeValueInt,
INSTANTIATE_TEST_SUITE_P(SameNegativeValueInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -928,10 +895,10 @@ INSTANTIATE_TEST_CASE_P(SameNegativeValueInt,
generateSeq<UInt32>(G(SameValueGenerator(-1000))),
generateSeq<UInt64>(G(SameValueGenerator(-1000)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(SameValueFloat,
INSTANTIATE_TEST_SUITE_P(SameValueFloat,
CodecTest,
::testing::Combine(
::testing::Values(
......@@ -942,10 +909,10 @@ INSTANTIATE_TEST_CASE_P(SameValueFloat,
generateSeq<Float32>(G(SameValueGenerator(M_E))),
generateSeq<Float64>(G(SameValueGenerator(M_E)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(SameNegativeValueFloat,
INSTANTIATE_TEST_SUITE_P(SameNegativeValueFloat,
CodecTest,
::testing::Combine(
::testing::Values(
......@@ -956,10 +923,10 @@ INSTANTIATE_TEST_CASE_P(SameNegativeValueFloat,
generateSeq<Float32>(G(SameValueGenerator(-1 * M_E))),
generateSeq<Float64>(G(SameValueGenerator(-1 * M_E)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(SequentialInt,
INSTANTIATE_TEST_SUITE_P(SequentialInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -973,12 +940,12 @@ INSTANTIATE_TEST_CASE_P(SequentialInt,
generateSeq<UInt32>(G(SequentialGenerator(1))),
generateSeq<UInt64>(G(SequentialGenerator(1)))
)
),
)
);
// -1, -2, -3, ... etc for signed
// 0xFF, 0xFE, 0xFD, ... for unsigned
INSTANTIATE_TEST_CASE_P(SequentialReverseInt,
INSTANTIATE_TEST_SUITE_P(SequentialReverseInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -992,10 +959,10 @@ INSTANTIATE_TEST_CASE_P(SequentialReverseInt,
generateSeq<UInt32>(G(SequentialGenerator(-1))),
generateSeq<UInt64>(G(SequentialGenerator(-1)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(SequentialFloat,
INSTANTIATE_TEST_SUITE_P(SequentialFloat,
CodecTest,
::testing::Combine(
::testing::Values(
......@@ -1006,10 +973,10 @@ INSTANTIATE_TEST_CASE_P(SequentialFloat,
generateSeq<Float32>(G(SequentialGenerator(M_E))),
generateSeq<Float64>(G(SequentialGenerator(M_E)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(SequentialReverseFloat,
INSTANTIATE_TEST_SUITE_P(SequentialReverseFloat,
CodecTest,
::testing::Combine(
::testing::Values(
......@@ -1020,10 +987,10 @@ INSTANTIATE_TEST_CASE_P(SequentialReverseFloat,
generateSeq<Float32>(G(SequentialGenerator(-1 * M_E))),
generateSeq<Float64>(G(SequentialGenerator(-1 * M_E)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(MonotonicInt,
INSTANTIATE_TEST_SUITE_P(MonotonicInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -1037,10 +1004,10 @@ INSTANTIATE_TEST_CASE_P(MonotonicInt,
generateSeq<UInt32>(G(MonotonicGenerator(1, 5))),
generateSeq<UInt64>(G(MonotonicGenerator(1, 5)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(MonotonicReverseInt,
INSTANTIATE_TEST_SUITE_P(MonotonicReverseInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -1054,10 +1021,10 @@ INSTANTIATE_TEST_CASE_P(MonotonicReverseInt,
generateSeq<UInt32>(G(MonotonicGenerator(-1, 5))),
generateSeq<UInt64>(G(MonotonicGenerator(-1, 5)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(MonotonicFloat,
INSTANTIATE_TEST_SUITE_P(MonotonicFloat,
CodecTest,
::testing::Combine(
::testing::Values(
......@@ -1067,10 +1034,10 @@ INSTANTIATE_TEST_CASE_P(MonotonicFloat,
generateSeq<Float32>(G(MonotonicGenerator<Float32>(M_E, 5))),
generateSeq<Float64>(G(MonotonicGenerator<Float64>(M_E, 5)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(MonotonicReverseFloat,
INSTANTIATE_TEST_SUITE_P(MonotonicReverseFloat,
CodecTest,
::testing::Combine(
::testing::Values(
......@@ -1080,10 +1047,10 @@ INSTANTIATE_TEST_CASE_P(MonotonicReverseFloat,
generateSeq<Float32>(G(MonotonicGenerator<Float32>(-1 * M_E, 5))),
generateSeq<Float64>(G(MonotonicGenerator<Float64>(-1 * M_E, 5)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(RandomInt,
INSTANTIATE_TEST_SUITE_P(RandomInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -1093,10 +1060,10 @@ INSTANTIATE_TEST_CASE_P(RandomInt,
generateSeq<UInt32>(G(RandomGenerator<UInt32>(0, 0, 1000'000'000))),
generateSeq<UInt64>(G(RandomGenerator<UInt64>(0, 0, 1000'000'000)))
)
),
)
);
INSTANTIATE_TEST_CASE_P(RandomishInt,
INSTANTIATE_TEST_SUITE_P(RandomishInt,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -1108,10 +1075,10 @@ INSTANTIATE_TEST_CASE_P(RandomishInt,
generateSeq<Float32>(G(RandomishGenerator)),
generateSeq<Float64>(G(RandomishGenerator))
)
),
)
);
INSTANTIATE_TEST_CASE_P(RandomishFloat,
INSTANTIATE_TEST_SUITE_P(RandomishFloat,
CodecTest,
::testing::Combine(
DefaultCodecsToTest,
......@@ -1119,11 +1086,11 @@ INSTANTIATE_TEST_CASE_P(RandomishFloat,
generateSeq<Float32>(G(RandomishGenerator)),
generateSeq<Float64>(G(RandomishGenerator))
)
),
)
);
// Double delta overflow case, deltas are out of bounds for target type
INSTANTIATE_TEST_CASE_P(OverflowInt,
INSTANTIATE_TEST_SUITE_P(OverflowInt,
CodecTest,
::testing::Combine(
::testing::Values(
......@@ -1136,10 +1103,10 @@ INSTANTIATE_TEST_CASE_P(OverflowInt,
generateSeq<UInt64>(G(MinMaxGenerator())),
generateSeq<Int64>(G(MinMaxGenerator()))
)
),
)
);
INSTANTIATE_TEST_CASE_P(OverflowFloat,
INSTANTIATE_TEST_SUITE_P(OverflowFloat,
CodecTest,
::testing::Combine(
::testing::Values(
......@@ -1152,7 +1119,7 @@ INSTANTIATE_TEST_CASE_P(OverflowFloat,
generateSeq<Float32>(G(FFand0Generator())),
generateSeq<Float64>(G(FFand0Generator()))
)
),
)
);
template <typename ValueType>
......@@ -1189,7 +1156,7 @@ auto DDCompatibilityTestSequence()
#define BIN_STR(x) std::string{x, sizeof(x) - 1}
INSTANTIATE_TEST_CASE_P(DoubleDelta,
INSTANTIATE_TEST_SUITE_P(DoubleDelta,
CodecTest_Compatibility,
::testing::Combine(
::testing::Values(Codec("DoubleDelta")),
......@@ -1227,7 +1194,7 @@ INSTANTIATE_TEST_CASE_P(DoubleDelta,
BIN_STR("\x94\xd4\x00\x00\x00\x98\x01\x00\x00\x08\x00\x33\x00\x00\x00\x2a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x6b\x65\x5f\x50\x34\xff\x4f\xaf\xbc\xe3\x5d\xa3\xd3\xd9\xf6\x1f\xe2\x07\x7c\x47\x20\x67\x48\x07\x47\xff\x47\xf6\xfe\xf8\x00\x00\x70\x6b\xd0\x00\x02\x83\xd9\xfb\x9f\xdc\x1f\xfc\x20\x1e\x80\x00\x22\xc8\xf0\x00\x00\x66\x67\xa0\x00\x02\x00\x3d\x00\x00\x0f\xff\xe8\x00\x00\x7f\xee\xff\xdf\x00\x00\x70\x0d\x7a\x00\x02\x80\x7b\x9f\xf7\x9f\xfb\xc0\x00\x00\xff\xfe\x00\x00\x08\x00\xfc\x00\x00\x00\x04\x00\x06\xbe\x4f\xbf\xff\xd6\x0c\xff\x00\x00\x00\x01\x00\x00\x00\x03\xf8\x00\x00\x00\x08\x00\x00\x00\x0f\xc0\x00\x00\x00\x3f\xff\xff\xff\xfb\xff\xff\xff\xfb\xe0\x00\x00\x01\xc0\x00\x00\x06\x9f\x80\x00\x00\x0a\x00\x00\x00\x34\xf3\xff\xff\xff\xe7\x9f\xff\xff\xff\x7e\x00\x00\x00\x00\xff\xff\xff\xfd\xf0\x00\x00\x00\x07\xff\xff\xff\xf0")
},
})
),
)
);
template <typename ValueType>
......@@ -1263,7 +1230,7 @@ auto GCompatibilityTestSequence()
return generateSeq<ValueType>(G(PrimesWithMultiplierGenerator(intExp10(sizeof(ValueType)))), 0, 42);
}
INSTANTIATE_TEST_CASE_P(Gorilla,
INSTANTIATE_TEST_SUITE_P(Gorilla,
CodecTest_Compatibility,
::testing::Combine(
::testing::Values(Codec("Gorilla")),
......@@ -1301,14 +1268,31 @@ INSTANTIATE_TEST_CASE_P(Gorilla,
BIN_STR("\x95\x91\x00\x00\x00\x50\x01\x00\x00\x08\x00\x2a\x00\x00\x00\x00\xc2\xeb\x0b\x00\x00\x00\x00\xe3\x2b\xa0\xa6\x19\x85\x98\xdc\x45\x74\x74\x43\xc2\x57\x41\x4c\x6e\x42\x79\xd9\x8f\x88\xa5\x05\xf3\xf1\x94\xa3\x62\x1e\x02\xdf\x05\x10\xf1\x15\x97\x35\x2a\x50\x71\x0f\x09\x6c\x89\xf7\x65\x1d\x11\xb7\xcc\x7d\x0b\x70\xc1\x86\x88\x48\x47\x87\xb6\x32\x26\xa7\x86\x87\x88\xd3\x93\x3d\xfc\x28\x68\x85\x05\x0b\x13\xc6\x5f\xd4\x70\xe1\x5e\x76\xf1\x9f\xf3\x33\x2a\x14\x14\x5e\x40\xc1\x5c\x28\x3f\xec\x43\x03\x05\x11\x91\xe8\xeb\x8e\x0a\x0e\x27\x21\x55\xcb\x39\xbc\x6a\xff\x11\x5d\x81\xa0\xa6\x10")
},
})
),
)
);
// These 'tests' try to measure performance of encoding and decoding, and hence only make sense to be run locally;
// also, they require pretty big data to run against, and generating this data slows down startup of the unit test process.
// So un-comment only at your discretion.
//INSTANTIATE_TEST_CASE_P(DoubleDelta,
// Just as if all sequences from generatePyramidOfSequences were appended to one-by-one to the first one.
//template <typename T, typename Generator>
//CodecTestSequence generatePyramidSequence(const size_t sequences_count, Generator && generator, const char* generator_name)
//{
// CodecTestSequence sequence;
// sequence.data_type = makeDataType<T>();
// sequence.serialized_data.reserve(sequences_count * sequences_count * sizeof(T));
//
// for (size_t i = 1; i < sequences_count; ++i)
// {
// std::string name = generator_name + std::string(" from 0 to ") + std::to_string(i);
// sequence.append(generateSeq<T>(std::forward<decltype(generator)>(generator), name.c_str(), 0, i));
// }
//
// return sequence;
//};
//INSTANTIATE_TEST_SUITE_P(DoubleDelta,
// CodecTest_Performance,
// ::testing::Combine(
// ::testing::Values(Codec("DoubleDelta")),
......@@ -1325,7 +1309,7 @@ INSTANTIATE_TEST_CASE_P(Gorilla,
// ),
//);
//INSTANTIATE_TEST_CASE_P(Gorilla,
//INSTANTIATE_TEST_SUITE_P(Gorilla,
// CodecTest_Performance,
// ::testing::Combine(
// ::testing::Values(Codec("Gorilla")),
......
......@@ -121,19 +121,19 @@ struct SortCursorHelper
SortCursorImpl * operator-> () { return impl; }
const SortCursorImpl * operator-> () const { return impl; }
bool greater(const SortCursorHelper & rhs) const
bool ALWAYS_INLINE greater(const SortCursorHelper & rhs) const
{
return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos);
}
/// Inverted so that the priority queue elements are removed in ascending order.
bool operator< (const SortCursorHelper & rhs) const
bool ALWAYS_INLINE operator< (const SortCursorHelper & rhs) const
{
return derived().greater(rhs.derived());
}
/// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor.
bool totallyLessOrEquals(const SortCursorHelper & rhs) const
bool ALWAYS_INLINE totallyLessOrEquals(const SortCursorHelper & rhs) const
{
if (impl->rows == 0 || rhs.impl->rows == 0)
return false;
......@@ -149,7 +149,7 @@ struct SortCursor : SortCursorHelper<SortCursor>
using SortCursorHelper<SortCursor>::SortCursorHelper;
/// The specified row of this cursor is greater than the specified row of another cursor.
bool greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
bool ALWAYS_INLINE greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
{
for (size_t i = 0; i < impl->sort_columns_size; ++i)
{
......@@ -172,7 +172,7 @@ struct SimpleSortCursor : SortCursorHelper<SimpleSortCursor>
{
using SortCursorHelper<SimpleSortCursor>::SortCursorHelper;
bool greaterAt(const SimpleSortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
bool ALWAYS_INLINE greaterAt(const SimpleSortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
{
const auto & desc = impl->desc[0];
int direction = desc.direction;
......@@ -188,7 +188,7 @@ struct SortCursorWithCollation : SortCursorHelper<SortCursorWithCollation>
{
using SortCursorHelper<SortCursorWithCollation>::SortCursorHelper;
bool greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const
bool ALWAYS_INLINE greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const
{
for (size_t i = 0; i < impl->sort_columns_size; ++i)
{
......@@ -243,7 +243,7 @@ public:
Cursor & nextChild() { return queue[nextChildIndex()]; }
void next()
void ALWAYS_INLINE next()
{
assert(isValid());
......@@ -283,7 +283,7 @@ private:
/// Cache comparison between first and second child if the order in queue has not been changed.
size_t next_idx = 0;
size_t nextChildIndex()
size_t ALWAYS_INLINE nextChildIndex()
{
if (next_idx == 0)
{
......@@ -300,7 +300,7 @@ private:
/// Why cannot simply use std::priority_queue?
/// - because it doesn't support updating the top element and requires pop and push instead.
/// Also look at "Boost.Heap" library.
void updateTop()
void ALWAYS_INLINE updateTop()
{
size_t size = queue.size();
if (size < 2)
......
......@@ -120,7 +120,7 @@ TEST_P(DecimalUtilsSplitAndCombineTest, getFractionalPart_Decimal128)
}
// Intentionally small values that fit into 32-bit in order to cover Decimal32, Decimal64 and Decimal128 with single set of data.
INSTANTIATE_TEST_CASE_P(Basic,
INSTANTIATE_TEST_SUITE_P(Basic,
DecimalUtilsSplitAndCombineTest,
::testing::ValuesIn(std::initializer_list<DecimalUtilsSplitAndCombineTestParam>{
{
......@@ -168,5 +168,5 @@ INSTANTIATE_TEST_CASE_P(Basic,
89
}
}
}
),);
})
);
......@@ -104,7 +104,7 @@ TEST_P(MostSubtypeTest, getLeastSupertype)
}
}
INSTANTIATE_TEST_CASE_P(data_type,
INSTANTIATE_TEST_SUITE_P(data_type,
LeastSuperTypeTest,
::testing::ValuesIn(
std::initializer_list<TypesTestCase>{
......@@ -159,10 +159,10 @@ INSTANTIATE_TEST_CASE_P(data_type,
{"Tuple(Int64,Int8) Tuple(UInt64)", nullptr},
{"Array(Int64) Array(String)", nullptr},
}
),
)
);
INSTANTIATE_TEST_CASE_P(data_type,
INSTANTIATE_TEST_SUITE_P(data_type,
MostSubtypeTest,
::testing::ValuesIn(
std::initializer_list<TypesTestCase>{
......@@ -210,5 +210,6 @@ INSTANTIATE_TEST_CASE_P(data_type,
{"Int8 String", nullptr},
{"Nothing", nullptr},
{"FixedString(16) FixedString(8) String", nullptr},
}),
}
)
);
......@@ -238,7 +238,7 @@ struct StringSource
size_t getElementSize() const
{
return offsets[row_num] - prev_offset;
return offsets[row_num] - prev_offset - 1;
}
Slice getWhole() const
......
......@@ -40,6 +40,8 @@ public:
bool isVariadic() const override { return true; }
bool isStateful() const override { return true; }
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return false; }
......
......@@ -29,7 +29,7 @@ const std::pair<LogsLevel, Message::Priority> & convertLogLevel(Aws::Utils::Logg
return mapping.at(log_level);
}
class AWSLogger : public Aws::Utils::Logging::LogSystemInterface
class AWSLogger final : public Aws::Utils::Logging::LogSystemInterface
{
public:
~AWSLogger() final = default;
......
......@@ -79,7 +79,7 @@ TEST_P(DateTime64StringParseBestEffortTest, parse)
// YYYY-MM-DD HH:MM:SS.NNNNNNNNN
INSTANTIATE_TEST_CASE_P(Basic,
INSTANTIATE_TEST_SUITE_P(Basic,
DateTime64StringParseTest,
::testing::ValuesIn(std::initializer_list<DateTime64StringsTestParam>{
{
......@@ -130,10 +130,10 @@ INSTANTIATE_TEST_CASE_P(Basic,
1568650817'1ULL,
1
}
}),
})
);
INSTANTIATE_TEST_CASE_P(BestEffort,
INSTANTIATE_TEST_SUITE_P(BestEffort,
DateTime64StringParseBestEffortTest,
::testing::ValuesIn(std::initializer_list<DateTime64StringsTestParam>{
{
......@@ -142,13 +142,13 @@ INSTANTIATE_TEST_CASE_P(BestEffort,
1568650817'123456ULL,
6
}
}),
})
);
// TODO: add negative test cases for invalid strings, verifying that error is reported properly
INSTANTIATE_TEST_CASE_P(Basic,
INSTANTIATE_TEST_SUITE_P(Basic,
DateTime64StringWriteTest,
::testing::ValuesIn(std::initializer_list<DateTime64StringsTestParam>{
{
......@@ -181,6 +181,6 @@ INSTANTIATE_TEST_CASE_P(Basic,
1568650817'001ULL,
3
}
}),
})
);
......@@ -177,7 +177,7 @@ TEST_P(BitIO, WriteAndRead)
}
}
INSTANTIATE_TEST_CASE_P(Simple,
INSTANTIATE_TEST_SUITE_P(Simple,
BitIO,
::testing::ValuesIn(std::initializer_list<TestCaseParameter>{
{
......@@ -221,7 +221,7 @@ INSTANTIATE_TEST_CASE_P(Simple,
"10101001 10111010 11101111 10101111 10111010 11101011 10101001 00000000 " // 256
"10101111 10111010 11101011 10101001 00001111 11110000 00001110 11111111 " // 320
}
}),
})
);
TestCaseParameter primes_case(UInt8 repeat_times, UInt64 pattern)
......@@ -241,12 +241,13 @@ TestCaseParameter primes_case(UInt8 repeat_times, UInt64 pattern)
return TestCaseParameter(test_data);
}
INSTANTIATE_TEST_CASE_P(Primes,
BitIO,
::testing::Values(
primes_case(11, 0xFFFFFFFFFFFFFFFFULL),
primes_case(11, BIT_PATTERN)
),);
INSTANTIATE_TEST_SUITE_P(Primes,
BitIO,
::testing::Values(
primes_case(11, 0xFFFFFFFFFFFFFFFFULL),
primes_case(11, BIT_PATTERN)
)
);
TEST(BitHelpers, maskLowBits)
{
......
......@@ -72,4 +72,6 @@ private:
std::vector<DatabaseAndTableWithAlias> getDatabaseAndTables(const ASTSelectQuery & select_query, const String & current_database);
std::optional<DatabaseAndTableWithAlias> getDatabaseAndTable(const ASTSelectQuery & select, size_t table_number);
using TablesWithColumnNames = std::vector<TableWithColumnNames>;
}
......@@ -26,7 +26,6 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/InJoinSubqueriesPreprocessor.h>
#include <Interpreters/LogicalExpressionsOptimizer.h>
#include <Interpreters/PredicateExpressionsOptimizer.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/Set.h>
#include <Interpreters/AnalyzedJoin.h>
......
#include <Interpreters/ExtractExpressionInfoVisitor.h>
#include <Functions/FunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Parsers/ASTSubquery.h>
namespace DB
{
void ExpressionInfoMatcher::visit(const ASTPtr & ast, Data & data)
{
if (const auto * function = ast->as<ASTFunction>())
visit(*function, ast, data);
else if (const auto * identifier = ast->as<ASTIdentifier>())
visit(*identifier, ast, data);
}
/// Classify one function node: record whether it is arrayJoin, an aggregate
/// function, or an ordinary function that is stateful.
void ExpressionInfoMatcher::visit(const ASTFunction & ast_function, const ASTPtr &, Data & data)
{
    const auto & name = ast_function.name;

    if (name == "arrayJoin")
    {
        data.is_array_join = true;
        return;
    }

    if (AggregateFunctionFactory::instance().isAggregateFunctionName(name))
    {
        data.is_aggregate_function = true;
        return;
    }

    /// Skip lambda, tuple and other special functions
    const auto & resolved = FunctionFactory::instance().tryGet(name, data.context);
    if (resolved && resolved->isStateful())
        data.is_stateful_function = true;
}
/// Record which table (as a position in data.tables) an identifier refers to.
void ExpressionInfoMatcher::visit(const ASTIdentifier & identifier, const ASTPtr &, Data & data)
{
    if (identifier.compound())
    {
        /// Qualified name: let the semantic layer pick the matching table.
        size_t table_pos = 0;
        if (IdentifierSemantic::chooseTable(identifier, data.tables, table_pos))
            data.unique_reference_tables_pos.emplace(table_pos);
        return;
    }

    /// Short name: the first table whose column list contains it wins.
    for (size_t i = 0; i < data.tables.size(); ++i)
    {
        const auto & table_columns = data.tables[i].columns;

        // TODO: make sure no collision ever happens
        if (std::find(table_columns.begin(), table_columns.end(), identifier.name) != table_columns.end())
        {
            data.unique_reference_tables_pos.emplace(i);
            break;
        }
    }
}
/// Do not descend into subqueries: their expressions are analysed separately.
bool ExpressionInfoMatcher::needChildVisit(const ASTPtr & node, const ASTPtr &)
{
    return node->as<ASTSubquery>() == nullptr;
}
bool hasStatefulFunction(const ASTPtr & node, const Context & context)
{
for (const auto & select_expression : node->children)
{
ExpressionInfoVisitor::Data expression_info{.context = context, .tables = {}};
ExpressionInfoVisitor(expression_info).visit(select_expression);
if (expression_info.is_stateful_function)
return true;
}
return false;
}
}
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
namespace DB
{
class Context;
/// Matcher for ConstInDepthNodeVisitor: walks an expression AST and collects
/// properties of the expression — presence of arrayJoin, aggregate functions,
/// stateful functions, and which tables its identifiers refer to.
struct ExpressionInfoMatcher
{
    struct Data
    {
        /// Query context; used to resolve ordinary function names via FunctionFactory.
        const Context & context;
        /// Tables of the enclosing query, each with its column list.
        const std::vector<TableWithColumnNames> & tables;

        /// Each flag is set to true when the corresponding construct is found
        /// anywhere in the visited expression; flags are never reset.
        bool is_array_join = false;
        bool is_stateful_function = false;
        bool is_aggregate_function = false;

        /// Positions (indexes into `tables`) of tables referenced by identifiers.
        /// NOTE(review): uses std::unordered_set but no <unordered_set> include is
        /// visible here — presumably included transitively; TODO confirm.
        std::unordered_set<size_t> unique_reference_tables_pos = {};
    };

    /// Dispatches to the typed overloads below.
    static void visit(const ASTPtr & ast, Data & data);

    /// Children of subqueries are not visited.
    static bool needChildVisit(const ASTPtr & node, const ASTPtr &);

    static void visit(const ASTFunction & ast_function, const ASTPtr &, Data & data);
    static void visit(const ASTIdentifier & identifier, const ASTPtr &, Data & data);
};
using ExpressionInfoVisitor = ConstInDepthNodeVisitor<ExpressionInfoMatcher, true>;
bool hasStatefulFunction(const ASTPtr & node, const Context & context);
}
#include <Interpreters/ExtractFunctionDataVisitor.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
namespace DB
{
/// Sort one function node into one of two buckets: aggregate functions vs. all others.
void ExtractFunctionData::visit(ASTFunction & function, ASTPtr &)
{
    const bool is_aggregate = AggregateFunctionFactory::instance().isAggregateFunctionName(function.name);
    auto & bucket = is_aggregate ? aggregate_functions : functions;
    bucket.emplace_back(&function);
}
}
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
/// Collects pointers to every function node seen during an AST walk, split
/// into aggregate functions and ordinary functions (driven by OneTypeMatcher).
struct ExtractFunctionData
{
    using TypeToVisit = ASTFunction;

    /// Non-aggregate function nodes, in visiting order.
    std::vector<ASTFunction *> functions;
    /// Aggregate function nodes, in visiting order.
    std::vector<ASTFunction *> aggregate_functions;

    /// Appends `function` to the matching bucket above.
    /// (Parameter renamed from the misleading `identifier` to match the
    /// definition in the corresponding .cpp; declaration-only rename, no ABI change.)
    void visit(ASTFunction & function, ASTPtr &);
};
using ExtractFunctionMatcher = OneTypeMatcher<ExtractFunctionData>;
using ExtractFunctionVisitor = InDepthNodeVisitor<ExtractFunctionMatcher, true>;
}
#include <Interpreters/FindIdentifierBestTableVisitor.h>
#include <Interpreters/IdentifierSemantic.h>
namespace DB
{
/// Remembers the candidate tables (with their column lists) that identifiers
/// will be resolved against during the visit.
FindIdentifierBestTableData::FindIdentifierBestTableData(const std::vector<TableWithColumnNames> & tables_)
    : tables(tables_)
{
}
/// Resolve one identifier to the table it belongs to (nullptr when no table
/// matches) and record the (identifier, table) pair in identifier_table.
void FindIdentifierBestTableData::visit(ASTIdentifier & identifier, ASTPtr &)
{
    const DatabaseAndTableWithAlias * resolved = nullptr;

    if (identifier.compound())
    {
        /// Qualified name: delegate the choice to the semantic layer.
        size_t pos = 0;
        if (IdentifierSemantic::chooseTable(identifier, tables, pos))
            resolved = &tables[pos].table;
    }
    else
    {
        /// Short name: the first table whose column list contains it wins;
        /// later matches are scanned but never override the first.
        // TODO: make sure no collision ever happens
        for (const auto & candidate : tables)
        {
            const auto & cols = candidate.columns;
            if (!resolved && std::find(cols.begin(), cols.end(), identifier.name) != cols.end())
                resolved = &candidate.table;
        }
    }

    identifier_table.emplace_back(&identifier, resolved);
}
}
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
namespace DB
{
/// Visitor data that maps every identifier in an AST to the table it most
/// likely refers to (resolved against a fixed list of candidate tables).
struct FindIdentifierBestTableData
{
    using TypeToVisit = ASTIdentifier;
    /// Pair of (identifier node, resolved table); table is nullptr when unresolved.
    using IdentifierWithTable = std::pair<ASTIdentifier *, const DatabaseAndTableWithAlias *>;

    /// Candidate tables with their column lists; identifiers are matched against these.
    const std::vector<TableWithColumnNames> & tables;
    /// One entry appended per visited identifier, in visiting order.
    std::vector<IdentifierWithTable> identifier_table;

    /// NOTE(review): single-argument constructor is implicit; consider `explicit`.
    FindIdentifierBestTableData(const std::vector<TableWithColumnNames> & tables_);

    void visit(ASTIdentifier & identifier, ASTPtr &);
};
using FindIdentifierBestTableMatcher = OneTypeMatcher<FindIdentifierBestTableData>;
using FindIdentifierBestTableVisitor = InDepthNodeVisitor<FindIdentifierBestTableMatcher, true>;
}
......@@ -59,7 +59,13 @@ public:
using Data = Data_;
using TypeToVisit = typename Data::TypeToVisit;
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return visit_children; }
static bool needChildVisit(const ASTPtr & node, const ASTPtr &)
{
if (node && node->as<TypeToVisit>())
return visit_children;
return true;
}
static void visit(T & ast, Data & data)
{
......
......@@ -488,8 +488,8 @@ BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams(QueryPipeli
QueryPipeline InterpreterSelectQuery::executeWithProcessors()
{
QueryPipeline query_pipeline;
query_pipeline.setMaxThreads(context->getSettingsRef().max_threads);
executeImpl(query_pipeline, input, query_pipeline);
query_pipeline.setMaxThreads(max_streams);
query_pipeline.addInterpreterContext(context);
query_pipeline.addStorageHolder(storage);
return query_pipeline;
......@@ -503,28 +503,31 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
/// Do all AST changes here, because actions from analysis_result will be used later in readImpl.
/// PREWHERE optimization.
/// Turn off, if the table filter (row-level security) is applied.
if (storage && !context->getRowPolicy()->getCondition(storage->getDatabaseName(), storage->getTableName(), RowPolicy::SELECT_FILTER))
if (storage)
{
query_analyzer->makeSetsForIndex(query.where());
query_analyzer->makeSetsForIndex(query.prewhere());
auto optimize_prewhere = [&](auto & merge_tree)
/// PREWHERE optimization.
/// Turn off, if the table filter (row-level security) is applied.
if (!context->getRowPolicy()->getCondition(storage->getDatabaseName(), storage->getTableName(), RowPolicy::SELECT_FILTER))
{
SelectQueryInfo current_info;
current_info.query = query_ptr;
current_info.syntax_analyzer_result = syntax_analyzer_result;
current_info.sets = query_analyzer->getPreparedSets();
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where() && !query.prewhere() && !query.final())
MergeTreeWhereOptimizer{current_info, *context, merge_tree,
syntax_analyzer_result->requiredSourceColumns(), log};
};
if (const auto * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
optimize_prewhere(*merge_tree_data);
auto optimize_prewhere = [&](auto & merge_tree)
{
SelectQueryInfo current_info;
current_info.query = query_ptr;
current_info.syntax_analyzer_result = syntax_analyzer_result;
current_info.sets = query_analyzer->getPreparedSets();
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where() && !query.prewhere() && !query.final())
MergeTreeWhereOptimizer{current_info, *context, merge_tree,
syntax_analyzer_result->requiredSourceColumns(), log};
};
if (const auto * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
optimize_prewhere(*merge_tree_data);
}
}
if (storage && !options.only_analyze)
......@@ -1180,7 +1183,6 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (expressions.second_stage)
{
bool need_second_distinct_pass = false;
bool need_merge_streams = false;
if (expressions.need_aggregate)
{
......@@ -1241,13 +1243,11 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
executePreLimit(pipeline);
}
if (need_second_distinct_pass
|| query.limitLength()
|| query.limitBy()
|| pipeline.hasDelayedStream())
{
need_merge_streams = true;
}
bool need_merge_streams = need_second_distinct_pass || query.limitLength() || query.limitBy();
if constexpr (!pipeline_with_processors)
if (pipeline.hasDelayedStream())
need_merge_streams = true;
if (need_merge_streams)
{
......@@ -1793,6 +1793,9 @@ void InterpreterSelectQuery::executeFetchColumns(
// pipes[i].pinSources(i);
// }
for (auto & pipe : pipes)
pipe.enableQuota();
pipeline.init(std::move(pipes));
}
else
......@@ -1930,7 +1933,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
*/
bool allow_to_use_two_level_group_by = pipeline.getNumMainStreams() > 1 || settings.max_bytes_before_external_group_by != 0;
bool allow_to_use_two_level_group_by = pipeline.getNumStreams() > 1 || settings.max_bytes_before_external_group_by != 0;
Aggregator::Params params(header_before_aggregation, keys, aggregates,
overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
......@@ -1944,12 +1947,12 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
pipeline.dropTotalsIfHas();
/// If there are several sources, then we perform parallel aggregation
if (pipeline.getNumMainStreams() > 1)
if (pipeline.getNumStreams() > 1)
{
/// Add resize transform to uniformly distribute data between aggregating streams.
pipeline.resize(pipeline.getNumMainStreams(), true);
pipeline.resize(pipeline.getNumStreams(), true);
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumMainStreams());
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
auto merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
......@@ -1971,6 +1974,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
return std::make_shared<AggregatingTransform>(header, transform_params);
});
}
pipeline.enableQuotaForCurrentStreams();
}
......@@ -2084,6 +2089,8 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bo
pipeline.addPipe(std::move(pipe));
}
pipeline.enableQuotaForCurrentStreams();
}
......@@ -2317,6 +2324,8 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
pipeline.addPipe({ std::move(transform) });
}
pipeline.enableQuotaForCurrentStreams();
if (need_finish_sorting)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
......@@ -2342,9 +2351,6 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit, do_count_rows);
});
/// If there are several streams, we merge them into one
pipeline.resize(1);
/// Merge the sorted blocks.
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
......@@ -2353,9 +2359,12 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
return std::make_shared<MergeSortingTransform>(
header, output_order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_remerge_sort / pipeline.getNumStreams(),
settings.max_bytes_before_external_sort, context->getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
});
/// If there are several streams, we merge them into one
executeMergeSorted(pipeline, output_order_descr, limit);
}
......@@ -2417,6 +2426,8 @@ void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline, const
settings.max_block_size, limit);
pipeline.addPipe({ std::move(transform) });
pipeline.enableQuotaForCurrentStreams();
}
}
......@@ -2794,11 +2805,7 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, SubqueriesForSets & subqueries_for_sets)
{
if (query_info.input_sorting_info)
{
if (pipeline.hasDelayedStream())
throw Exception("Using read in order optimization, but has delayed stream in pipeline", ErrorCodes::LOGICAL_ERROR);
executeMergeSorted(pipeline, query_info.input_sorting_info->order_key_prefix_descr, 0);
}
const Settings & settings = context->getSettingsRef();
......@@ -2815,7 +2822,7 @@ void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline, Block header)
{
/// Unify streams in case they have different headers.
/// TODO: remove previos addition of _dummy column.
/// TODO: remove previous addition of _dummy column.
if (header.columns() > 1 && header.has("_dummy"))
header.erase("_dummy");
......
#pragma once
#include "DatabaseAndTableWithAlias.h"
#include <Parsers/ASTSelectQuery.h>
#include <map>
#include <Interpreters/DatabaseAndTableWithAlias.h>
namespace DB
{
class ASTIdentifier;
class ASTSubquery;
class Context;
struct Settings;
/** This class provides functions for Push-Down predicate expressions
*
* The Example:
* - Query before optimization :
* SELECT id_1, name_1 FROM (SELECT id_1, name_1 FROM table_a UNION ALL SELECT id_2, name_2 FROM table_b)
* WHERE id_1 = 1
* - Query after optimization :
* SELECT id_1, name_1 FROM (SELECT id_1, name_1 FROM table_a WHERE id_1 = 1 UNION ALL SELECT id_2, name_2 FROM table_b WHERE id_2 = 1)
* WHERE id_1 = 1
/** Predicate optimization based on rewriting ast rules
* For more details : https://github.com/ClickHouse/ClickHouse/pull/2015#issuecomment-374283452
* The optimizer does two different optimizations
* - Move predicates from having to where
* - Push the predicate down from the current query to the having of the subquery
*/
class PredicateExpressionsOptimizer
{
using ProjectionWithAlias = std::pair<ASTPtr, String>;
using SubqueriesProjectionColumns = std::map<ASTSelectQuery *, std::vector<ProjectionWithAlias>>;
using IdentifierWithQualifier = std::pair<ASTIdentifier *, String>;
public:
PredicateExpressionsOptimizer(const Context & context_, const TablesWithColumnNames & tables_with_columns_, const Settings & settings_);
bool optimize(ASTSelectQuery & select_query);
private:
/// Extracts settings, mostly to show which are used and which are not.
struct ExtractedSettings
{
/// QueryNormalizer settings
const UInt64 max_ast_depth;
const UInt64 max_expanded_ast_elements;
const String count_distinct_implementation;
/// for PredicateExpressionsOptimizer
const bool enable_optimize_predicate_expression;
const bool enable_optimize_predicate_expression_to_final_subquery;
const bool join_use_nulls;
template<typename T>
ExtractedSettings(const T & settings_)
: max_ast_depth(settings_.max_ast_depth),
max_expanded_ast_elements(settings_.max_expanded_ast_elements),
count_distinct_implementation(settings_.count_distinct_implementation),
enable_optimize_predicate_expression(settings_.enable_optimize_predicate_expression),
enable_optimize_predicate_expression_to_final_subquery(settings_.enable_optimize_predicate_expression_to_final_subquery),
join_use_nulls(settings_.join_use_nulls)
: enable_optimize_predicate_expression(settings_.enable_optimize_predicate_expression),
enable_optimize_predicate_expression_to_final_subquery(settings_.enable_optimize_predicate_expression_to_final_subquery)
{}
};
public:
PredicateExpressionsOptimizer(ASTSelectQuery * ast_select_, ExtractedSettings && settings_, const Context & context_);
bool optimize();
private:
ASTSelectQuery * ast_select;
const ExtractedSettings settings;
const Context & context;
const std::vector<TableWithColumnNames> & tables_with_columns;
enum OptimizeKind
{
NONE,
PUSH_TO_PREWHERE,
PUSH_TO_WHERE,
PUSH_TO_HAVING,
};
bool isArrayJoinFunction(const ASTPtr & node);
std::vector<ASTPtr> splitConjunctionPredicate(const ASTPtr & predicate_expression);
std::vector<IdentifierWithQualifier> getDependenciesAndQualifiers(ASTPtr & expression,
std::vector<TableWithColumnNames> & tables_with_aliases);
bool optimizeExpression(const ASTPtr & outer_expression, ASTSelectQuery * subquery, ASTSelectQuery::Expression expr);
bool optimizeImpl(const ASTPtr & outer_expression, const SubqueriesProjectionColumns & subqueries_projection_columns, OptimizeKind optimize_kind);
bool allowPushDown(
const ASTSelectQuery * subquery,
const ASTPtr & outer_predicate,
const std::vector<ProjectionWithAlias> & subquery_projection_columns,
const std::vector<IdentifierWithQualifier> & outer_predicate_dependencies,
OptimizeKind & optimize_kind);
bool checkDependencies(
const std::vector<ProjectionWithAlias> & projection_columns,
const std::vector<IdentifierWithQualifier> & dependencies,
OptimizeKind & optimize_kind);
void setNewAliasesForInnerPredicate(const std::vector<ProjectionWithAlias> & projection_columns,
const std::vector<IdentifierWithQualifier> & inner_predicate_dependencies);
SubqueriesProjectionColumns getAllSubqueryProjectionColumns();
const ExtractedSettings settings;
void getSubqueryProjectionColumns(const ASTPtr & subquery, SubqueriesProjectionColumns & all_subquery_projection_columns);
std::vector<ASTs> extractTablesPredicates(const ASTPtr & where, const ASTPtr & prewhere);
ASTs getSelectQueryProjectionColumns(ASTPtr & ast);
bool tryRewritePredicatesToTables(ASTs & tables_element, const std::vector<ASTs> & tables_predicates);
ASTs evaluateAsterisk(ASTSelectQuery * select_query, const ASTPtr & asterisk);
bool tryRewritePredicatesToTable(ASTPtr & table_element, const ASTs & table_predicates, const Names & table_column) const;
void cleanExpressionAlias(ASTPtr & expression);
bool tryMovePredicatesFromHavingToWhere(ASTSelectQuery & select_query);
};
}
#include <Interpreters/PredicateRewriteVisitor.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTColumnsMatcher.h>
#include <Parsers/ASTQualifiedAsterisk.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/ExtractExpressionInfoVisitor.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
namespace DB
{
/// Captures everything needed to push `predicates_` down into the selects of a UNION:
/// the interpreting context, the outer predicates, and the outer projection column names.
/// `optimize_final_` permits rewriting subqueries that use FINAL (see rewriteSubquery).
/// NOTE: stores references only — all arguments must outlive this visitor data.
PredicateRewriteVisitorData::PredicateRewriteVisitorData(
    const Context & context_, const ASTs & predicates_, const Names & column_names_, bool optimize_final_)
    : context(context_), predicates(predicates_), column_names(column_names_), optimize_final(optimize_final_)
{
}
/// Rewrite every SELECT inside a UNION: push the outer predicates into each internal select.
/// The first select exposes exactly the outer column names; subsequent selects may use
/// different names/aliases, so they are handled by a separate path.
void PredicateRewriteVisitorData::visit(ASTSelectWithUnionQuery & union_select_query, ASTPtr &)
{
    auto & internal_select_list = union_select_query.list_of_selects->children;

    if (!internal_select_list.empty())
        visitFirstInternalSelect(*internal_select_list[0]->as<ASTSelectQuery>(), internal_select_list[0]);

    for (size_t index = 1; index < internal_select_list.size(); ++index)
        visitOtherInternalSelect(*internal_select_list[index]->as<ASTSelectQuery>(), internal_select_list[index]);
}
/// Push the predicates into the first SELECT of the UNION.
/// Its projection carries exactly the outer column names, so the outer and inner
/// name lists passed to rewriteSubquery coincide.
void PredicateRewriteVisitorData::visitFirstInternalSelect(ASTSelectQuery & select_query, ASTPtr &)
{
    if (rewriteSubquery(select_query, column_names, column_names))
        is_rewrite = true;
}
/// Push predicates into a non-first SELECT of a UNION.
/// Its projection may use names different from the outer query, so the rewrite is done
/// on a clone first, and SELECT/HAVING are committed back only if the rewrite succeeded.
void PredicateRewriteVisitorData::visitOtherInternalSelect(ASTSelectQuery & select_query, ASTPtr &)
{
    /// For non first select, its alias has no more significance, so we can set a temporary alias for them
    ASTPtr temp_internal_select = select_query.clone();
    ASTSelectQuery * temp_select_query = temp_internal_select->as<ASTSelectQuery>();

    size_t alias_index = 0;
    for (auto & ref_select : temp_select_query->refSelect()->children)
    {
        /// Identifiers and asterisks keep their names; only expressions get a synthetic alias,
        /// so that outer predicate columns can be mapped onto the inner projection by position.
        if (!ref_select->as<ASTAsterisk>() && !ref_select->as<ASTQualifiedAsterisk>() && !ref_select->as<ASTColumnsMatcher>() &&
            !ref_select->as<ASTIdentifier>())
        {
            if (const auto & alias = ref_select->tryGetAlias(); alias.empty())
                ref_select->setAlias("--predicate_optimizer_" + toString(alias_index++));
        }
    }

    /// Analyze the clone to obtain its effective output column names (including the new aliases).
    const Names & internal_columns = InterpreterSelectQuery(
        temp_internal_select, context, SelectQueryOptions().analyze()).getSampleBlock().getNames();

    if (rewriteSubquery(*temp_select_query, column_names, internal_columns))
    {
        is_rewrite = true;
        select_query.setExpression(ASTSelectQuery::Expression::SELECT, std::move(temp_select_query->refSelect()));
        select_query.setExpression(ASTSelectQuery::Expression::HAVING, std::move(temp_select_query->refHaving()));
    }
}
/// Recursively strips aliases from a predicate AST and collects every identifier found in it.
/// Subqueries are not descended into (e.g. WHERE x IN (SELECT ...)).
static void cleanAliasAndCollectIdentifiers(ASTPtr & predicate, std::vector<ASTIdentifier *> & identifiers)
{
    const bool is_subquery = predicate->as<ASTSubquery>() != nullptr;

    if (!is_subquery)
        for (auto & child : predicate->children)
            cleanAliasAndCollectIdentifiers(child, identifiers);

    if (!predicate->tryGetAlias().empty())
        predicate->setAlias("");

    if (auto * identifier = predicate->as<ASTIdentifier>())
        identifiers.emplace_back(identifier);
}
/// Push all outer predicates down into `subquery`'s HAVING clause.
/// `outer_columns` are the column names as seen by the outer query; `inner_columns` are the
/// corresponding names inside the subquery at the same positions.
/// Returns false if the subquery cannot be rewritten safely: FINAL (unless optimize_final),
/// WITH, WITH FILL, LIMIT/LIMIT BY, or stateful functions in the projection.
bool PredicateRewriteVisitorData::rewriteSubquery(ASTSelectQuery & subquery, const Names & outer_columns, const Names & inner_columns)
{
    if ((!optimize_final && subquery.final())
        || subquery.with() || subquery.withFill()
        || subquery.limitBy() || subquery.limitLength()
        || hasStatefulFunction(subquery.select(), context))
        return false;

    for (const auto & predicate : predicates)
    {
        std::vector<ASTIdentifier *> identifiers;
        ASTPtr optimize_predicate = predicate->clone();
        cleanAliasAndCollectIdentifiers(optimize_predicate, identifiers);

        /// Rename outer column references to the matching inner names (matched by position).
        for (auto * identifier : identifiers)
        {
            const auto & column_name = identifier->shortName();
            const auto outer_column_it = std::find(outer_columns.begin(), outer_columns.end(), column_name);

            /// For lambda functions, we can't always find them in the list of columns
            /// For example: SELECT * FROM system.one WHERE arrayMap(x -> x, [dummy]) = [0]
            if (outer_column_it != outer_columns.end())
                identifier->setShortName(inner_columns[outer_column_it - outer_columns.begin()]);
        }

        /// We only need to push all the predicates to subquery having
        /// The subquery optimizer will move the appropriate predicates from having to where
        subquery.setExpression(ASTSelectQuery::Expression::HAVING,
            subquery.having() ? makeASTFunction("and", optimize_predicate, subquery.having()) : optimize_predicate);
    }

    return true;
}
}
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
/// Visitor data that pushes a set of outer predicates down into every SELECT of a
/// UNION subquery. Used via InDepthNodeVisitor (see aliases below).
class PredicateRewriteVisitorData
{
public:
    /// Set to true if at least one internal select was actually rewritten.
    bool is_rewrite = false;
    using TypeToVisit = ASTSelectWithUnionQuery;

    void visit(ASTSelectWithUnionQuery & union_select_query, ASTPtr &);

    /// All reference arguments must outlive this object — only references are stored.
    PredicateRewriteVisitorData(const Context & context_, const ASTs & predicates_, const Names & column_names_, bool optimize_final_);

private:
    const Context & context;
    /// Predicates of the outer query to be pushed down.
    const ASTs & predicates;
    /// Column names of the outer projection (matched by position to inner columns).
    const Names & column_names;
    /// If true, subqueries with FINAL may also be rewritten.
    bool optimize_final;

    /// The first select keeps the outer column names as-is.
    void visitFirstInternalSelect(ASTSelectQuery & select_query, ASTPtr &);
    /// Other selects may have different projections and need temporary aliases.
    void visitOtherInternalSelect(ASTSelectQuery & select_query, ASTPtr &);
    /// Pushes the predicates into the subquery's HAVING; returns false if not safe.
    bool rewriteSubquery(ASTSelectQuery & subquery, const Names & outer_columns, const Names & inner_columns);
};

using PredicateRewriteMatcher = OneTypeMatcher<PredicateRewriteVisitorData, false>;
using PredicateRewriteVisitor = InDepthNodeVisitor<PredicateRewriteMatcher, true>;
}
......@@ -920,6 +920,9 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
if (select_query)
{
/// Push the predicate expression down to the subqueries.
result.rewrite_subqueries = PredicateExpressionsOptimizer(context, tables_with_columns, settings).optimize(*select_query);
/// GROUP BY injective function elimination.
optimizeGroupBy(select_query, source_columns_set, context);
......@@ -935,9 +938,6 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns(query, result, select_query, result.source_columns, source_columns_set);
/// Push the predicate expression down to the subqueries.
result.rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings, context).optimize();
setJoinStrictness(*select_query, settings.join_default_strictness, settings.any_join_distinct_right_table_keys,
result.analyzed_join->table_join);
collectJoinedColumns(*result.analyzed_join, *select_query, tables_with_columns, result.aliases);
......
#include <Processors/DelayedPortsProcessor.h>
namespace DB
{
/// Creates N input/output port pairs over the same header and marks the ports listed
/// in `delayed_ports` as delayed (processed only after all other pairs finish).
DelayedPortsProcessor::DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports)
    : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header))
    , num_delayed(delayed_ports.size())
{
    port_pairs.resize(num_ports);

    auto input = inputs.begin();
    auto output = outputs.begin();

    /// Pair up the i-th input with the i-th output.
    for (auto & pair : port_pairs)
    {
        pair.input_port = &*input;
        pair.output_port = &*output;
        ++input;
        ++output;
    }

    for (const auto & delayed_port_number : delayed_ports)
        port_pairs[delayed_port_number].is_delayed = true;
}
/// Moves one chunk of data from the pair's input to its output, as-is.
/// Returns true if the pair may need more data, false if it is finished or blocked.
bool DelayedPortsProcessor::processPair(PortsPair & pair)
{
    const bool output_finished = pair.output_port->isFinished();
    const bool input_finished = pair.input_port->isFinished();

    if (output_finished || input_finished)
    {
        /// Propagate termination to the opposite side of the pair.
        if (output_finished)
            pair.input_port->close();
        else
            pair.output_port->finish();

        /// Count each pair as finished exactly once.
        if (!pair.is_finished)
        {
            pair.is_finished = true;
            ++num_finished;
        }

        return false;
    }

    if (!pair.output_port->canPush())
        return false;

    pair.input_port->setNeeded();

    if (pair.input_port->hasData())
        pair.output_port->pushData(pair.input_port->pullData());

    return true;
}
/// Processes updated port pairs. While any non-delayed pair is still alive, delayed
/// pairs are skipped; once only delayed pairs remain, they are all processed.
IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
{
    const bool skip_delayed = (num_finished + num_delayed) < port_pairs.size();
    bool need_data = false;

    auto process_if_allowed = [&](PortsPair & pair)
    {
        if (!skip_delayed || !pair.is_delayed)
            need_data = processPair(pair) || need_data;
    };

    for (const auto & output_number : updated_outputs)
        process_if_allowed(port_pairs[output_number]);

    for (const auto & input_number : updated_inputs)
        process_if_allowed(port_pairs[input_number]);

    /// In case if main streams are finished at current iteration, start processing delayed streams.
    if (skip_delayed && (num_finished + num_delayed) >= port_pairs.size())
    {
        for (auto & pair : port_pairs)
            if (pair.is_delayed)
                need_data = processPair(pair) || need_data;
    }

    if (num_finished == port_pairs.size())
        return Status::Finished;

    return need_data ? Status::NeedData : Status::PortFull;
}
}
#pragma once
#include <Processors/IProcessor.h>
namespace DB
{
/// Processor with N inputs and N outputs. Only moves data from i-th input to i-th output as is.
/// Some ports are delayed. Delayed ports are processed after other outputs are all finished.
/// Data between ports is not mixed. It is important because this processor can be used before MergingSortedTransform.
/// Delayed ports are appeared after joins, when some non-matched data need to be processed at the end.
class DelayedPortsProcessor : public IProcessor
{
public:
    /// `delayed_ports` holds indices of the pairs to process last.
    DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports);

    String getName() const override { return "DelayedPorts"; }

    Status prepare(const PortNumbers &, const PortNumbers &) override;

private:
    /// One input/output pair; data moves from input_port to output_port unchanged.
    struct PortsPair
    {
        InputPort * input_port = nullptr;
        OutputPort * output_port = nullptr;
        bool is_delayed = false;
        /// Set once when either side of the pair terminates.
        bool is_finished = false;
    };

    std::vector<PortsPair> port_pairs;
    /// Number of pairs marked as delayed (fixed at construction).
    size_t num_delayed;
    /// Number of pairs finished so far; when it equals port_pairs.size() the processor is done.
    size_t num_finished = 0;

    /// Moves data for one pair; returns true if the pair may need more data.
    bool processPair(PortsPair & pair);
};
}
......@@ -64,13 +64,6 @@ bool PipelineExecutor::addEdges(UInt64 node)
throwUnknownProcessor(to_proc, cur, true);
UInt64 proc_num = it->second;
for (auto & edge : edges)
{
if (edge.to == proc_num)
throw Exception("Multiple edges are not allowed for the same processors.", ErrorCodes::LOGICAL_ERROR);
}
auto & edge = edges.emplace_back(proc_num, is_backward, input_port_number, output_port_number, update_list);
from_port.setUpdateInfo(&edge.update_info);
......@@ -177,10 +170,20 @@ void PipelineExecutor::addJob(ExecutionState * execution_state)
execution_state->job = std::move(job);
}
void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
bool PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
{
auto & cur_node = graph[pid];
auto new_processors = cur_node.processor->expandPipeline();
Processors new_processors;
try
{
new_processors = cur_node.processor->expandPipeline();
}
catch (...)
{
cur_node.execution_state->exception = std::current_exception();
return false;
}
for (const auto & processor : new_processors)
{
......@@ -220,20 +223,22 @@ void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
}
}
}
return true;
}
bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack)
bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Queue & queue, size_t thread_number)
{
/// In this method we have ownership on edge, but node can be concurrently accessed.
auto & node = graph[edge.to];
std::lock_guard guard(node.status_mutex);
std::unique_lock lock(node.status_mutex);
ExecStatus status = node.status;
if (status == ExecStatus::Finished)
return false;
return true;
if (edge.backward)
node.updated_output_ports.push_back(edge.output_port_number);
......@@ -243,14 +248,13 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stac
if (status == ExecStatus::Idle)
{
node.status = ExecStatus::Preparing;
stack.push(edge.to);
return true;
return prepareProcessor(edge.to, thread_number, queue, std::move(lock));
}
return false;
return true;
}
bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async)
bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, std::unique_lock<std::mutex> node_lock)
{
/// In this method we have ownership on node.
auto & node = graph[pid];
......@@ -264,14 +268,22 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & pa
{
/// Stopwatch watch;
std::lock_guard guard(node.status_mutex);
std::unique_lock<std::mutex> lock(std::move(node_lock));
auto status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
node.updated_input_ports.clear();
node.updated_output_ports.clear();
try
{
node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
}
catch (...)
{
node.execution_state->exception = std::current_exception();
return false;
}
/// node.execution_state->preparation_time_ns += watch.elapsed();
node.last_processor_status = status;
node.updated_input_ports.clear();
node.updated_output_ports.clear();
switch (node.last_processor_status)
{
......@@ -291,7 +303,8 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & pa
case IProcessor::Status::Ready:
{
node.status = ExecStatus::Executing;
return true;
queue.push(node.execution_state.get());
break;
}
case IProcessor::Status::Async:
{
......@@ -303,9 +316,7 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & pa
}
case IProcessor::Status::Wait:
{
if (!async)
throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
break;
throw Exception("Wait is temporary not supported.", ErrorCodes::LOGICAL_ERROR);
}
case IProcessor::Status::ExpandPipeline:
{
......@@ -337,18 +348,26 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & pa
if (need_traverse)
{
for (auto & edge : updated_back_edges)
tryAddProcessorToStackIfUpdated(*edge, parents);
for (auto & edge : updated_direct_edges)
tryAddProcessorToStackIfUpdated(*edge, children);
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, thread_number))
return false;
}
for (auto & edge : updated_back_edges)
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, thread_number))
return false;
}
}
if (need_expand_pipeline)
{
Stack stack;
executor_contexts[thread_number]->task_list.emplace_back(
node.execution_state.get(),
&parents
&stack
);
ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back();
......@@ -356,20 +375,32 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & pa
while (!expand_pipeline_task.compare_exchange_strong(expected, desired))
{
doExpandPipeline(expected, true);
if (!doExpandPipeline(expected, true))
return false;
expected = nullptr;
}
doExpandPipeline(desired, true);
if (!doExpandPipeline(desired, true))
return false;
/// Add itself back to be prepared again.
children.push(pid);
stack.push(pid);
while (!stack.empty())
{
auto item = stack.top();
if (!prepareProcessor(item, thread_number, queue, std::unique_lock<std::mutex>(graph[item].status_mutex)))
return false;
stack.pop();
}
}
return false;
return true;
}
void PipelineExecutor::doExpandPipeline(ExpandPipelineTask * task, bool processing)
bool PipelineExecutor::doExpandPipeline(ExpandPipelineTask * task, bool processing)
{
std::unique_lock lock(task->mutex);
......@@ -381,16 +412,20 @@ void PipelineExecutor::doExpandPipeline(ExpandPipelineTask * task, bool processi
return task->num_waiting_processing_threads >= num_processing_executors || expand_pipeline_task != task;
});
bool result = true;
/// After condvar.wait() task may point to trash. Can change it only if it is still in expand_pipeline_task.
if (expand_pipeline_task == task)
{
expandPipeline(*task->stack, task->node_to_expand->processors_id);
result = expandPipeline(*task->stack, task->node_to_expand->processors_id);
expand_pipeline_task = nullptr;
lock.unlock();
task->condvar.notify_all();
}
return result;
}
void PipelineExecutor::cancel()
......@@ -459,49 +494,31 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
#if !defined(__APPLE__) && !defined(__FreeBSD__)
/// Specify CPU core for thread if can.
/// It may reduce the number of context swithches.
cpu_set_t cpu_set;
CPU_ZERO(&cpu_set);
CPU_SET(thread_num, &cpu_set);
if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
LOG_TRACE(log, "Cannot set affinity for thread " << num_threads);
/*
if (num_threads > 1)
{
cpu_set_t cpu_set;
CPU_ZERO(&cpu_set);
CPU_SET(thread_num, &cpu_set);
if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
LOG_TRACE(log, "Cannot set affinity for thread " << num_threads);
}
*/
#endif
UInt64 total_time_ns = 0;
UInt64 execution_time_ns = 0;
UInt64 processing_time_ns = 0;
UInt64 wait_time_ns = 0;
// UInt64 total_time_ns = 0;
// UInt64 execution_time_ns = 0;
// UInt64 processing_time_ns = 0;
// UInt64 wait_time_ns = 0;
Stopwatch total_time_watch;
// Stopwatch total_time_watch;
ExecutionState * state = nullptr;
auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents)
auto prepare_processor = [&](UInt64 pid, Queue & queue)
{
try
{
return prepareProcessor(pid, children, parents, thread_num, false);
}
catch (...)
{
graph[pid].execution_state->exception = std::current_exception();
if (!prepareProcessor(pid, thread_num, queue, std::unique_lock<std::mutex>(graph[pid].status_mutex)))
finish();
}
return false;
};
using Queue = std::queue<ExecutionState *>;
auto prepare_all_processors = [&](Queue & queue, Stack & stack, Stack & children, Stack & parents)
{
while (!stack.empty() && !finished)
{
auto current_processor = stack.top();
stack.pop();
if (prepare_processor(current_processor, children, parents))
queue.push(graph[current_processor].execution_state.get());
}
};
auto wake_up_executor = [&](size_t executor)
......@@ -511,63 +528,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
executor_contexts[executor]->condvar.notify_one();
};
auto process_pinned_tasks = [&](Queue & queue)
{
Queue tmp_queue;
struct PinnedTask
{
ExecutionState * task;
size_t thread_num;
};
std::stack<PinnedTask> pinned_tasks;
while (!queue.empty())
{
auto task = queue.front();
queue.pop();
auto stream = task->processor->getStream();
if (stream != IProcessor::NO_STREAM)
pinned_tasks.push({.task = task, .thread_num = stream % num_threads});
else
tmp_queue.push(task);
}
if (!pinned_tasks.empty())
{
std::stack<size_t> threads_to_wake;
{
std::lock_guard lock(task_queue_mutex);
while (!pinned_tasks.empty())
{
auto & pinned_task = pinned_tasks.top();
auto thread = pinned_task.thread_num;
executor_contexts[thread]->pinned_tasks.push(pinned_task.task);
pinned_tasks.pop();
if (threads_queue.has(thread))
{
threads_queue.pop(thread);
threads_to_wake.push(thread);
}
}
}
while (!threads_to_wake.empty())
{
wake_up_executor(threads_to_wake.top());
threads_to_wake.pop();
}
}
queue.swap(tmp_queue);
};
while (!finished)
{
/// First, find any processor to execute.
......@@ -577,20 +537,11 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
{
std::unique_lock lock(task_queue_mutex);
if (!executor_contexts[thread_num]->pinned_tasks.empty())
{
state = executor_contexts[thread_num]->pinned_tasks.front();
executor_contexts[thread_num]->pinned_tasks.pop();
break;
}
if (!task_queue.empty())
{
state = task_queue.front();
task_queue.pop();
state = task_queue.pop(thread_num);
if (!task_queue.empty() && !threads_queue.empty())
if (!task_queue.empty() && !threads_queue.empty() /*&& task_queue.quota() > threads_queue.size()*/)
{
auto thread_to_wake = threads_queue.pop_any();
lock.unlock();
......@@ -648,8 +599,6 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
/// Try to execute neighbour processor.
{
Stack children;
Stack parents;
Queue queue;
++num_processing_executors;
......@@ -657,36 +606,16 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
doExpandPipeline(task, true);
/// Execute again if can.
if (!prepare_processor(state->processors_id, children, parents))
state = nullptr;
/// Process all neighbours. Children will be on the top of stack, then parents.
prepare_all_processors(queue, children, children, parents);
process_pinned_tasks(queue);
prepare_processor(state->processors_id, queue);
state = nullptr;
/// Take local task from queue if has one.
if (!state && !queue.empty())
if (!queue.empty())
{
state = queue.front();
queue.pop();
}
prepare_all_processors(queue, parents, parents, parents);
process_pinned_tasks(queue);
/// Take pinned task if has one.
{
std::lock_guard guard(task_queue_mutex);
if (!executor_contexts[thread_num]->pinned_tasks.empty())
{
if (state)
queue.push(state);
state = executor_contexts[thread_num]->pinned_tasks.front();
executor_contexts[thread_num]->pinned_tasks.pop();
}
}
/// Push other tasks to global queue.
if (!queue.empty())
{
......@@ -694,14 +623,15 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
while (!queue.empty() && !finished)
{
task_queue.push(queue.front());
task_queue.push(queue.front(), thread_num);
queue.pop();
}
if (!threads_queue.empty())
if (!threads_queue.empty() /* && task_queue.quota() > threads_queue.size()*/)
{
auto thread_to_wake = threads_queue.pop_any();
lock.unlock();
wake_up_executor(thread_to_wake);
}
}
......@@ -715,14 +645,15 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
}
}
total_time_ns = total_time_watch.elapsed();
wait_time_ns = total_time_ns - execution_time_ns - processing_time_ns;
// total_time_ns = total_time_watch.elapsed();
// wait_time_ns = total_time_ns - execution_time_ns - processing_time_ns;
/*
LOG_TRACE(log, "Thread finished."
<< " Total time: " << (total_time_ns / 1e9) << " sec."
<< " Execution time: " << (execution_time_ns / 1e9) << " sec."
<< " Processing time: " << (processing_time_ns / 1e9) << " sec."
<< " Wait time: " << (wait_time_ns / 1e9) << "sec.");
*/
}
void PipelineExecutor::executeImpl(size_t num_threads)
......@@ -730,6 +661,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
Stack stack;
threads_queue.init(num_threads);
task_queue.init(num_threads);
{
std::lock_guard guard(executor_contexts_mutex);
......@@ -763,42 +695,57 @@ void PipelineExecutor::executeImpl(size_t num_threads)
{
std::lock_guard lock(task_queue_mutex);
Queue queue;
size_t next_thread = 0;
while (!stack.empty())
{
UInt64 proc = stack.top();
stack.pop();
if (prepareProcessor(proc, stack, stack, 0, false))
prepareProcessor(proc, 0, queue, std::unique_lock<std::mutex>(graph[proc].status_mutex));
while (!queue.empty())
{
auto cur_state = graph[proc].execution_state.get();
task_queue.push(cur_state);
task_queue.push(queue.front(), next_thread);
queue.pop();
++next_thread;
if (next_thread >= num_threads)
next_thread = 0;
}
}
}
for (size_t i = 0; i < num_threads; ++i)
if (num_threads > 1)
{
threads.emplace_back([this, thread_group, thread_num = i, num_threads]
for (size_t i = 0; i < num_threads; ++i)
{
/// ThreadStatus thread_status;
threads.emplace_back([this, thread_group, thread_num = i, num_threads]
{
/// ThreadStatus thread_status;
setThreadName("QueryPipelineEx");
setThreadName("QueryPipelineEx");
if (thread_group)
CurrentThread::attachTo(thread_group);
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
executeSingleThread(thread_num, num_threads);
});
}
executeSingleThread(thread_num, num_threads);
});
}
for (auto & thread : threads)
if (thread.joinable())
thread.join();
for (auto & thread : threads)
if (thread.joinable())
thread.join();
}
else
executeSingleThread(0, num_threads);
finished_flag = true;
}
......
......@@ -84,6 +84,7 @@ private:
IProcessor * processor = nullptr;
UInt64 processors_id = 0;
bool has_quota = false;
/// Counters for profiling.
size_t num_executed_jobs = 0;
......@@ -117,6 +118,7 @@ private:
execution_state = std::make_unique<ExecutionState>();
execution_state->processor = processor;
execution_state->processors_id = processor_id;
execution_state->has_quota = processor->hasQuota();
}
Node(Node && other) noexcept
......@@ -132,7 +134,59 @@ private:
using Stack = std::stack<UInt64>;
using TaskQueue = std::queue<ExecutionState *>;
/// Per-thread queues of ready-to-execute tasks with work stealing.
/// push() targets a preferred thread; pop() first drains the caller's own
/// queue, then scans the other threads' queues round-robin to steal work.
/// Not thread-safe by itself: callers synchronize via task_queue_mutex.
class TaskQueue
{
public:
    /// Allocate one queue per executing thread. Must be called before push/pop.
    void init(size_t num_threads) { queues.resize(num_threads); }

    /// Enqueue a task for the given (preferred) thread.
    void push(ExecutionState * state, size_t thread_num)
    {
        queues[thread_num].push(state);

        ++size_;

        if (state->has_quota)
            ++quota_;
    }

    /// Dequeue a task, preferring thread_num's own queue and falling back to
    /// stealing from the other queues. Throws LOGICAL_ERROR if the queue is
    /// empty (callers must check empty() / size() first).
    ExecutionState * pop(size_t thread_num)
    {
        if (size_ == 0)
            throw Exception("TaskQueue is empty.", ErrorCodes::LOGICAL_ERROR);

        for (size_t i = 0; i < queues.size(); ++i)
        {
            if (!queues[thread_num].empty())
            {
                ExecutionState * state = queues[thread_num].front();
                queues[thread_num].pop();

                --size_;

                /// Fix: a popped quota task must decrement the counter
                /// (was ++quota_, which made quota_ grow monotonically and
                /// never reflect the number of quota tasks still queued).
                if (state->has_quota)
                    --quota_;

                return state;
            }

            ++thread_num;
            if (thread_num >= queues.size())
                thread_num = 0;
        }

        /// Unreachable if size_ accounting is consistent: size_ != 0 implies
        /// at least one per-thread queue is non-empty.
        throw Exception("TaskQueue is inconsistent: size is non-zero but all queues are empty.",
                        ErrorCodes::LOGICAL_ERROR);
    }

    /// Total number of queued tasks across all per-thread queues.
    size_t size() const { return size_; }
    bool empty() const { return size_ == 0; }

    /// Number of queued tasks whose processor has a quota attached.
    size_t quota() const { return quota_; }

private:
    using Queue = std::queue<ExecutionState *>;
    std::vector<Queue> queues;
    size_t size_ = 0;
    size_t quota_ = 0;
};
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
/// Stores processors need to be prepared. Preparing status is already set for them.
......@@ -173,7 +227,7 @@ private:
std::mutex mutex;
bool wake_flag = false;
std::queue<ExecutionState *> pinned_tasks;
/// std::queue<ExecutionState *> pinned_tasks;
};
std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
......@@ -186,19 +240,21 @@ private:
/// Graph related methods.
bool addEdges(UInt64 node);
void buildGraph();
void expandPipeline(Stack & stack, UInt64 pid);
bool expandPipeline(Stack & stack, UInt64 pid);
using Queue = std::queue<ExecutionState *>;
/// Pipeline execution related methods.
void addChildlessProcessorsToStack(Stack & stack);
bool tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack);
bool tryAddProcessorToStackIfUpdated(Edge & edge, Queue & queue, size_t thread_number);
static void addJob(ExecutionState * execution_state);
// TODO: void addAsyncJob(UInt64 pid);
/// Prepare processor with pid number.
/// Check parents and children of current processor and push them to stacks if they also need to be prepared.
/// If processor wants to be expanded, ExpandPipelineTask from thread_number's execution context will be used.
bool prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async);
void doExpandPipeline(ExpandPipelineTask * task, bool processing);
bool prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, std::unique_lock<std::mutex> node_lock);
bool doExpandPipeline(ExpandPipelineTask * task, bool processing);
void executeImpl(size_t num_threads);
void executeSingleThread(size_t thread_num, size_t num_threads);
......
此差异已折叠。
此差异已折叠。
......@@ -42,6 +42,8 @@ public:
/// Set information about preferred executor number for sources.
void pinSources(size_t executor_number);
void enableQuota();
void setTotalsPort(OutputPort * totals_) { totals = totals_; }
OutputPort * getTotalsPort() const { return totals; }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册