提交 c4712f1e 编写于 作者: A Alexey Milovidov

Make the code less bad

上级 859736d9
......@@ -114,6 +114,7 @@ add_headers_and_sources(dbms src/Columns)
add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Storages/LiveView)
add_headers_and_sources(dbms src/Client)
add_headers_and_sources(dbms src/Formats)
add_headers_and_sources(dbms src/Processors)
......
......@@ -12,7 +12,7 @@
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/StorageValues.h>
#include <Storages/StorageLiveView.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
{
......
......@@ -5,7 +5,6 @@
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageLiveView.h>
namespace DB
{
......
......@@ -8,9 +8,9 @@
#include <Storages/AlterCommands.h>
#include <Storages/MutationCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/LiveViewCommands.h>
#include <Storages/LiveView/LiveViewCommands.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Common/typeid_cast.h>
#include <Storages/StorageLiveView.h>
#include <algorithm>
......
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <limits>
#include <Common/ConcurrentBoundedQueue.h>
#include <Poco/Condition.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageLiveView.h>
namespace DB
......@@ -61,8 +44,8 @@ public:
if (isCancelled() || storage->is_dropped)
return;
IBlockInputStream::cancel(kill);
Poco::FastMutex::ScopedLock lock(storage->mutex);
storage->condition.broadcast();
std::lock_guard lock(storage->mutex);
storage->condition.notify_all();
}
Block getHeader() const override { return storage->getHeader(); }
......@@ -92,14 +75,14 @@ public:
NonBlockingResult tryRead()
{
return tryRead_(false);
return tryReadImpl(false);
}
protected:
Block readImpl() override
{
/// try reading
return tryRead_(true).first;
return tryReadImpl(true).first;
}
/** tryRead method attempts to read a block in either blocking
......@@ -107,7 +90,7 @@ protected:
* then method return empty block with flag set to false
* to indicate that method would block to get the next block.
*/
NonBlockingResult tryRead_(bool blocking)
NonBlockingResult tryReadImpl(bool blocking)
{
Block res;
......@@ -118,7 +101,7 @@ protected:
/// If blocks were never assigned get blocks
if (!blocks)
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
std::lock_guard lock(storage->mutex);
if (!active)
return { Block(), false };
blocks = (*blocks_ptr);
......@@ -135,7 +118,7 @@ protected:
if (it == end)
{
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
std::unique_lock lock(storage->mutex);
if (!active)
return { Block(), false };
/// If we are done iterating over our blocks
......@@ -162,7 +145,10 @@ protected:
while (true)
{
UInt64 timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
bool signaled = storage->condition.tryWait(storage->mutex, std::max(static_cast<UInt64>(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec)) / 1000);
/// Or spurious wakeup.
bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
if (isCancelled() || storage->is_dropped)
{
......@@ -181,7 +167,7 @@ protected:
}
}
}
return tryRead_(blocking);
return tryReadImpl(blocking);
}
res = *it;
......
#pragma once
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
{
class LiveViewBlockOutputStream : public IBlockOutputStream
{
public:
explicit LiveViewBlockOutputStream(StorageLiveView & storage_) : storage(storage_) {}
void writePrefix() override
{
new_blocks = std::make_shared<Blocks>();
new_blocks_metadata = std::make_shared<BlocksMetadata>();
new_hash = std::make_shared<SipHash>();
}
void writeSuffix() override
{
UInt128 key;
String key_str;
new_hash->get128(key.low, key.high);
key_str = key.toHexString();
std::lock_guard lock(storage.mutex);
if (storage.getBlocksHashKey() != key_str)
{
new_blocks_metadata->hash = key_str;
new_blocks_metadata->version = storage.getBlocksVersion() + 1;
for (auto & block : *new_blocks)
{
block.insert({DataTypeUInt64().createColumnConst(
block.rows(), new_blocks_metadata->version)->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(),
"_version"});
}
(*storage.blocks_ptr) = new_blocks;
(*storage.blocks_metadata_ptr) = new_blocks_metadata;
storage.condition.notify_all();
}
new_blocks.reset();
new_blocks_metadata.reset();
new_hash.reset();
}
void write(const Block & block) override
{
new_blocks->push_back(block);
block.updateHash(*new_hash);
}
Block getHeader() const override { return storage.getHeader(); }
private:
using SipHashPtr = std::shared_ptr<SipHash>;
BlocksPtr new_blocks;
BlocksMetadataPtr new_blocks_metadata;
SipHashPtr new_hash;
StorageLiveView & storage;
};
}
......@@ -12,9 +12,9 @@ limitations under the License. */
#pragma once
#include <Parsers/ASTAlterQuery.h>
#include <optional>
#include <Storages/StorageLiveView.h>
#include <Parsers/ASTAlterQuery.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
{
......
......@@ -9,11 +9,9 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <limits>
#pragma once
#include <Common/ConcurrentBoundedQueue.h>
#include <Poco/Condition.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
......@@ -21,7 +19,7 @@ limitations under the License. */
#include <Columns/ColumnsNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageLiveView.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
......@@ -66,8 +64,8 @@ public:
if (isCancelled() || storage->is_dropped)
return;
IBlockInputStream::cancel(kill);
Poco::FastMutex::ScopedLock lock(storage->mutex);
storage->condition.broadcast();
std::lock_guard lock(storage->mutex);
storage->condition.notify_all();
}
Block getHeader() const override
......@@ -103,7 +101,7 @@ public:
NonBlockingResult tryRead()
{
return tryRead_(false);
return tryReadImpl(false);
}
Block getEventBlock()
......@@ -120,7 +118,7 @@ protected:
Block readImpl() override
{
/// try reading
return tryRead_(true).first;
return tryReadImpl(true).first;
}
/** tryRead method attempts to read a block in either blocking
......@@ -128,7 +126,7 @@ protected:
* then method return empty block with flag set to false
* to indicate that method would block to get the next block.
*/
NonBlockingResult tryRead_(bool blocking)
NonBlockingResult tryReadImpl(bool blocking)
{
if (has_limit && num_updates == static_cast<Int64>(limit))
{
......@@ -137,7 +135,7 @@ protected:
/// If blocks were never assigned get blocks
if (!blocks)
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
std::lock_guard lock(storage->mutex);
if (!active)
return { Block(), false };
blocks = (*blocks_ptr);
......@@ -155,7 +153,7 @@ protected:
if (it == end)
{
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
std::unique_lock lock(storage->mutex);
if (!active)
return { Block(), false };
/// If we are done iterating over our blocks
......@@ -183,7 +181,10 @@ protected:
while (true)
{
UInt64 timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
bool signaled = storage->condition.tryWait(storage->mutex, std::max(static_cast<UInt64>(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec)) / 1000);
/// Or spurious wakeup.
bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
if (isCancelled() || storage->is_dropped)
{
......@@ -202,7 +203,7 @@ protected:
}
}
}
return tryRead_(blocking);
return tryReadImpl(blocking);
}
// move right to the end
......
......@@ -20,9 +20,6 @@ limitations under the License. */
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/LiveViewBlockInputStream.h>
#include <DataStreams/LiveViewEventsBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/BlocksBlockInputStream.h>
......@@ -30,10 +27,15 @@ limitations under the License. */
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Common/typeid_cast.h>
#include <Common/SipHash.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Storages/LiveView/LiveViewBlockInputStream.h>
#include <Storages/LiveView/LiveViewBlockOutputStream.h>
#include <Storages/LiveView/LiveViewEventsBlockInputStream.h>
#include <Storages/LiveView/ProxyStorage.h>
#include <Storages/StorageLiveView.h>
#include <Storages/StorageFactory.h>
#include <Storages/ProxyStorage.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
......@@ -107,70 +109,6 @@ static void checkAllowedQueries(const ASTSelectQuery & query)
}
class LiveViewBlockOutputStream : public IBlockOutputStream
{
public:
explicit LiveViewBlockOutputStream(StorageLiveView & storage_) : storage(storage_) {}
void writePrefix() override
{
new_blocks = std::make_shared<Blocks>();
new_blocks_metadata = std::make_shared<BlocksMetadata>();
new_hash = std::make_shared<SipHash>();
}
void writeSuffix() override
{
UInt128 key;
String key_str;
new_hash->get128(key.low, key.high);
key_str = key.toHexString();
Poco::FastMutex::ScopedLock lock(storage.mutex);
if (storage.getBlocksHashKey() != key_str)
{
new_blocks_metadata->hash = key_str;
new_blocks_metadata->version = storage.getBlocksVersion() + 1;
for (auto & block : *new_blocks)
{
block.insert({DataTypeUInt64().createColumnConst(
block.rows(), new_blocks_metadata->version)->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(),
"_version"});
}
(*storage.blocks_ptr) = new_blocks;
(*storage.blocks_metadata_ptr) = new_blocks_metadata;
storage.condition.broadcast();
}
new_blocks.reset();
new_blocks_metadata.reset();
new_hash.reset();
}
void write(const Block & block) override
{
new_blocks->push_back(block);
block.updateHash(*new_hash);
}
Block getHeader() const override { return storage.getHeader(); }
private:
using SipHashPtr = std::shared_ptr<SipHash>;
BlocksPtr new_blocks;
BlocksMetadataPtr new_blocks_metadata;
SipHashPtr new_hash;
StorageLiveView & storage;
};
void StorageLiveView::writeIntoLiveView(
StorageLiveView & live_view,
const Block & block,
......@@ -182,7 +120,7 @@ void StorageLiveView::writeIntoLiveView(
/// just reset blocks to empty and do nothing else
/// When first reader comes the blocks will be read.
{
Poco::FastMutex::ScopedLock lock(live_view.mutex);
std::lock_guard lock(live_view.mutex);
if (!live_view.hasActiveUsers())
{
live_view.reset();
......@@ -196,7 +134,7 @@ void StorageLiveView::writeIntoLiveView(
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
{
Poco::FastMutex::ScopedLock lock(live_view.mutex);
std::lock_guard lock(live_view.mutex);
mergeable_blocks = live_view.getMergeableBlocks();
if (!mergeable_blocks || mergeable_blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh)
......@@ -242,7 +180,7 @@ void StorageLiveView::writeIntoLiveView(
return;
{
Poco::FastMutex::ScopedLock lock(live_view.mutex);
std::lock_guard lock(live_view.mutex);
mergeable_blocks = live_view.getMergeableBlocks();
mergeable_blocks->push_back(new_mergeable_blocks);
......@@ -435,8 +373,8 @@ void StorageLiveView::noUsersThread(const UInt64 & timeout)
{
while (1)
{
Poco::FastMutex::ScopedLock lock(no_users_thread_mutex);
if (!no_users_thread_wakeup && !no_users_thread_condition.tryWait(no_users_thread_mutex, timeout * 1000))
std::unique_lock lock(no_users_thread_mutex);
if (!no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return no_users_thread_wakeup; }))
{
no_users_thread_wakeup = false;
if (shutdown_called)
......@@ -487,14 +425,14 @@ void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
if (no_users_thread.joinable())
{
{
Poco::FastMutex::ScopedLock lock(no_users_thread_mutex);
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.signal();
no_users_thread_condition.notify_one();
}
no_users_thread.join();
}
{
Poco::FastMutex::ScopedLock lock(no_users_thread_mutex);
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = false;
}
if (!is_dropped)
......@@ -516,9 +454,9 @@ void StorageLiveView::shutdown()
if (no_users_thread.joinable())
{
Poco::FastMutex::ScopedLock lock(no_users_thread_mutex);
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.signal();
no_users_thread_condition.notify_one();
/// Must detach the no users thread
/// as we can't join it as it will result
/// in a deadlock
......@@ -536,18 +474,19 @@ void StorageLiveView::drop()
global_context.removeDependency(
DatabaseAndTableName(select_database_name, select_table_name),
DatabaseAndTableName(database_name, table_name));
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
is_dropped = true;
condition.broadcast();
condition.notify_all();
}
void StorageLiveView::refresh(const Context & context)
{
auto alter_lock = lockAlterIntention(context.getCurrentQueryId());
{
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
if (getNewBlocks())
condition.broadcast();
condition.notify_all();
}
}
......@@ -562,11 +501,11 @@ BlockInputStreams StorageLiveView::read(
/// add user to the blocks_ptr
std::shared_ptr<BlocksPtr> stream_blocks_ptr = blocks_ptr;
{
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
if (getNewBlocks())
condition.broadcast();
condition.notify_all();
}
}
return { std::make_shared<BlocksBlockInputStream>(stream_blocks_ptr, getHeader()) };
......@@ -598,17 +537,17 @@ BlockInputStreams StorageLiveView::watch(
if (no_users_thread.joinable())
{
Poco::FastMutex::ScopedLock lock(no_users_thread_mutex);
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.signal();
no_users_thread_condition.notify_one();
}
{
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
if (getNewBlocks())
condition.broadcast();
condition.notify_all();
}
}
......@@ -623,17 +562,17 @@ BlockInputStreams StorageLiveView::watch(
if (no_users_thread.joinable())
{
Poco::FastMutex::ScopedLock lock(no_users_thread_mutex);
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.signal();
no_users_thread_condition.notify_one();
}
{
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
if (getNewBlocks())
condition.broadcast();
condition.notify_all();
}
}
......
......@@ -11,12 +11,11 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Poco/Condition.h>
#include <DataTypes/DataTypesNumber.h>
#include <ext/shared_ptr_helper.h>
#include <Common/SipHash.h>
#include <Storages/IStorage.h>
#include <Storages/ProxyStorage.h>
#include <mutex>
#include <condition_variable>
namespace DB
......@@ -35,6 +34,8 @@ using BlocksMetadataPtr = std::shared_ptr<BlocksMetadata>;
class StorageLiveView : public ext::shared_ptr_helper<StorageLiveView>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageLiveView>;
friend class LiveViewBlockInputStream;
friend class LiveViewEventsBlockInputStream;
friend class LiveViewBlockOutputStream;
public:
......@@ -55,12 +56,6 @@ public:
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
/// Mutex for the blocks and ready condition
Poco::FastMutex mutex;
/// New blocks ready condition to broadcast to readers
/// that new blocks are available
Poco::Condition condition;
bool isTemporary() { return is_temporary; }
/// Check if we have any readers
......@@ -79,16 +74,16 @@ public:
/// Background thread for temporary tables
/// which drops this table if there are no users
void startNoUsersThread(const UInt64 & timeout);
Poco::FastMutex no_users_thread_mutex;
std::mutex no_users_thread_mutex;
bool no_users_thread_wakeup{false};
Poco::Condition no_users_thread_condition;
std::condition_variable no_users_thread_condition;
/// Get blocks hash
/// must be called with mutex locked
String getBlocksHashKey()
{
if (*blocks_metadata_ptr)
return (*blocks_metadata_ptr)->hash;
return "";
return {};
}
/// Get blocks version
/// must be called with mutex locked
......@@ -157,6 +152,12 @@ private:
bool is_temporary {false};
mutable Block sample_block;
/// Mutex for the blocks and ready condition
std::mutex mutex;
/// New blocks ready condition to broadcast to readers
/// that new blocks are available
std::condition_variable condition;
/// Active users
std::shared_ptr<bool> active_ptr;
/// Current data blocks that store query result
......
......@@ -68,14 +68,6 @@ StoragePtr StorageFactory::get(
name = "LiveView";
}
else if (query.is_live_channel)
{
if (query.storage)
throw Exception("Specifying ENGINE is not allowed for a LiveChannel", ErrorCodes::INCORRECT_QUERY);
name = "LiveChannel";
}
else
{
/// Check for some special types, that are not allowed to be stored in tables. Example: NULL data type.
......@@ -137,12 +129,6 @@ StoragePtr StorageFactory::get(
"Direct creation of tables with ENGINE LiveView is not supported, use CREATE LIVE VIEW statement",
ErrorCodes::INCORRECT_QUERY);
}
else if (name == "LiveChannel")
{
throw Exception(
"Direct creation of tables with ENGINE LiveChannel is not supported, use CREATE LIVE CHANNEL statement",
ErrorCodes::INCORRECT_QUERY);
}
}
}
......
......@@ -25,7 +25,6 @@ void registerStorageJoin(StorageFactory & factory);
void registerStorageView(StorageFactory & factory);
void registerStorageMaterializedView(StorageFactory & factory);
void registerStorageLiveView(StorageFactory & factory);
//void registerStorageLiveChannel(StorageFactory & factory);
#if USE_HDFS
void registerStorageHDFS(StorageFactory & factory);
......@@ -67,7 +66,6 @@ void registerStorages()
registerStorageView(factory);
registerStorageMaterializedView(factory);
registerStorageLiveView(factory);
//registerStorageLiveChannel(factory);
#if USE_HDFS
registerStorageHDFS(factory);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册