未验证 提交 f9968fbc 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #6554 from yandex/add-table-finctions-number-mt

Add table function numbers_mt
......@@ -43,8 +43,63 @@ private:
};
StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_, UInt64 offset_)
: name(name_), multithreaded(multithreaded_), limit(limit_), offset(offset_)
struct NumbersMtState
{
std::atomic<UInt64> counter;
explicit NumbersMtState(UInt64 offset) : counter(offset) {}
};
using NumbersMtStatePtr = std::shared_ptr<NumbersMtState>;
class NumbersMultiThreadedBlockInputStream : public IBlockInputStream
{
public:
NumbersMultiThreadedBlockInputStream(NumbersMtStatePtr state_, UInt64 block_size_, UInt64 max_counter_)
: state(std::move(state_)), counter(state->counter), block_size(block_size_), max_counter(max_counter_) {}
String getName() const override { return "NumbersMt"; }
Block getHeader() const override
{
return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
}
protected:
Block readImpl() override
{
if (block_size == 0)
return {};
UInt64 curr = counter.fetch_add(block_size, std::memory_order_acquire);
if (curr >= max_counter)
return {};
if (curr + block_size > max_counter)
block_size = max_counter - curr;
auto column = ColumnUInt64::create(block_size);
ColumnUInt64::Container & vec = column->getData();
UInt64 * pos = vec.data();
UInt64 * end = &vec[block_size];
while (pos < end)
*pos++ = curr++;
return { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeUInt64>(), "number") };
}
private:
NumbersMtStatePtr state;
std::atomic<UInt64> & counter;
UInt64 block_size;
UInt64 max_counter;
};
StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_, UInt64 offset_, bool even_distribution_)
: name(name_), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_)
{
setColumns(ColumnsDescription({{"number", std::make_shared<DataTypeUInt64>()}}));
}
......@@ -69,6 +124,18 @@ BlockInputStreams StorageSystemNumbers::read(
num_streams = 1;
BlockInputStreams res(num_streams);
if (num_streams > 1 && !even_distribution && *limit)
{
auto state = std::make_shared<NumbersMtState>(offset);
UInt64 max_counter = offset + *limit;
for (size_t i = 0; i < num_streams; ++i)
res[i] = std::make_shared<NumbersMultiThreadedBlockInputStream>(state, max_block_size, max_counter);
return res;
}
for (size_t i = 0; i < num_streams; ++i)
{
res[i] = std::make_shared<NumbersBlockInputStream>(max_block_size, offset + i * max_block_size, num_streams * max_block_size);
......
......@@ -19,6 +19,9 @@ class Context;
* If multithreaded is specified, numbers will be generated in several streams
* (and result could be out of order). If both multithreaded and limit are specified,
* the table could give you not exactly 1..limit range, but some arbitrary 'limit' numbers.
*
* In multithreaded case, if even_distributed is False, implementation with atomic is used,
* and result is always in [0 ... limit - 1] range.
*/
class StorageSystemNumbers : public ext::shared_ptr_helper<StorageSystemNumbers>, public IStorage
{
......@@ -38,11 +41,14 @@ public:
private:
const std::string name;
bool multithreaded;
bool even_distribution;
std::optional<UInt64> limit;
UInt64 offset;
protected:
StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_ = std::nullopt, UInt64 offset_ = 0);
/// If even_distribution is true, numbers are distributed evenly between streams.
/// Otherwise, streams concurrently increment atomic.
StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_ = std::nullopt, UInt64 offset_ = 0, bool even_distribution_ = true);
};
}
......@@ -16,34 +16,35 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
template <bool multithreaded>
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
if (const auto * function = ast_function->as<ASTFunction>())
{
auto arguments = function->arguments->children;
if (arguments.size() != 1 && arguments.size() != 2)
throw Exception("Table function 'numbers' requires 'length' or 'offset, length'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception("Table function '" + getName() + "' requires 'length' or 'offset, length'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0;
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
auto res = StorageSystemNumbers::create(table_name, false, length, offset);
auto res = StorageSystemNumbers::create(table_name, multithreaded, length, offset, false);
res->startup();
return res;
}
throw Exception("Table function 'numbers' requires 'limit' or 'offset, limit'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception("Table function '" + getName() + "' requires 'limit' or 'offset, limit'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
void registerTableFunctionNumbers(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionNumbers>();
factory.registerFunction<TableFunctionNumbers<true>>();
factory.registerFunction<TableFunctionNumbers<false>>();
}
UInt64 TableFunctionNumbers::evaluateArgument(const Context & context, ASTPtr & argument) const
template <bool multithreaded>
UInt64 TableFunctionNumbers<multithreaded>::evaluateArgument(const Context & context, ASTPtr & argument) const
{
return evaluateConstantExpressionOrIdentifierAsLiteral(argument, context)->as<ASTLiteral &>().value.safeGet<UInt64>();
}
......
......@@ -7,14 +7,15 @@
namespace DB
{
/* numbers(limit)
/* numbers(limit), numbers_mt(limit)
* - the same as SELECT number FROM system.numbers LIMIT limit.
* Used for testing purposes, as a simple example of table function.
*/
template <bool multithreaded>
class TableFunctionNumbers : public ITableFunction
{
public:
static constexpr auto name = "numbers";
static constexpr auto name = multithreaded ? "numbers_mt" : "numbers";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
......
<test>
<type>once</type>
<type>loop</type>
<stop_conditions>
<all_of>
<total_time_ms>10000</total_time_ms>
<all_of>
<iterations>3</iterations>
<min_time_not_changing_for_ms>10000</min_time_not_changing_for_ms>
</all_of>
<any_of>
<average_speed_not_changing_for_ms>5000</average_speed_not_changing_for_ms>
<total_time_ms>20000</total_time_ms>
<iterations>5</iterations>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<max_bytes_per_second/>
<min_time/>
</main_metric>
<substitutions>
......@@ -45,11 +46,11 @@
<substitution>
<name>table</name>
<values>
<value>numbers</value>
<value>numbers_mt</value>
<value>numbers(1000000)</value>
<value>numbers_mt(10000000)</value>
</values>
</substitution>
</substitutions>
<query>SELECT ignore({gp_hash_func}({string})) FROM system.{table}</query>
<query>SELECT count() from {table} where not ignore({gp_hash_func}({string}))</query>
</test>
......@@ -21,8 +21,8 @@ $CLICKHOUSE_CLIENT -q "select multisearchallposicionutf7casesensitive('abc');" 2
$CLICKHOUSE_CLIENT -q "select multiSearchAllposicionutf7sensitive('abc');" 2>&1 | grep "Maybe you meant: \['multiSearchAllPositionsCaseInsensitive','multiSearchAnyCaseInsensitive'\]." &>/dev/null;
$CLICKHOUSE_CLIENT -q "select multiSearchAllPosicionSensitiveUTF8('abc');" 2>&1 | grep "Maybe you meant: \['multiSearchAnyCaseInsensitiveUTF8','multiSearchAllPositionsCaseInsensitiveUTF8'\]." &>/dev/null;
$CLICKHOUSE_CLIENT -q "select * FROM numberss(10);" 2>&1 | grep "Maybe you meant: \['numbers'\]." &>/dev/null
$CLICKHOUSE_CLIENT -q "select * FROM anothernumbers(10);" 2>&1 | grep -v "Maybe you meant: \['numbers'\]." &>/dev/null
$CLICKHOUSE_CLIENT -q "select * FROM numberss(10);" 2>&1 | grep "Maybe you meant: \['numbers'\,'numbers_mt'\]." &>/dev/null
$CLICKHOUSE_CLIENT -q "select * FROM anothernumbers(10);" 2>&1 | grep -v "Maybe you meant: \['numbers'\,'numbers_mt'\]." &>/dev/null
$CLICKHOUSE_CLIENT -q "select * FROM mynumbers(10);" 2>&1 | grep "Maybe you meant: \['numbers'\]." &>/dev/null
$CLICKHOUSE_CLIENT -q "CREATE TABLE stored_aggregates (d Date, Uniq AggregateFunction(uniq, UInt64)) ENGINE = MergeTre(d, d, 8192);" 2>&1 | grep "Maybe you meant: \['MergeTree'\]." &>/dev/null
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册