未验证 提交 2e15ce67 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #2132 from yandex/small-enhancements

Small enhancements
......@@ -12,6 +12,7 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Client/Connection.h>
#include <Client/TimeoutSetter.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
......@@ -228,32 +229,6 @@ void Connection::forceConnected()
}
}
struct TimeoutSetter
{
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_)
: socket(socket_), timeout(timeout_)
{
old_send_timeout = socket.getSendTimeout();
old_receive_timeout = socket.getReceiveTimeout();
if (old_send_timeout > timeout)
socket.setSendTimeout(timeout);
if (old_receive_timeout > timeout)
socket.setReceiveTimeout(timeout);
}
~TimeoutSetter()
{
socket.setSendTimeout(old_send_timeout);
socket.setReceiveTimeout(old_receive_timeout);
}
Poco::Net::StreamSocket & socket;
Poco::Timespan timeout;
Poco::Timespan old_send_timeout;
Poco::Timespan old_receive_timeout;
};
bool Connection::ping()
{
// LOG_TRACE(log_wrapper.get(), "Ping");
......
#pragma once
#include <Poco/Timespan.h>
#include <Poco/Net/StreamSocket.h>
namespace DB
{
/// Temporarily overrides socket send/recieve timeouts and reset them back into destructor
/// Timeouts could be only decreased
struct TimeoutSetter
{
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_)
: socket(socket_), send_timeout(send_timeout_), recieve_timeout(recieve_timeout_)
{
old_send_timeout = socket.getSendTimeout();
old_receive_timeout = socket.getReceiveTimeout();
if (old_send_timeout > send_timeout)
socket.setSendTimeout(send_timeout);
if (old_receive_timeout > recieve_timeout)
socket.setReceiveTimeout(recieve_timeout);
}
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_)
: TimeoutSetter(socket_, timeout_, timeout_) {}
~TimeoutSetter()
{
socket.setSendTimeout(old_send_timeout);
socket.setReceiveTimeout(old_receive_timeout);
}
Poco::Net::StreamSocket & socket;
Poco::Timespan send_timeout;
Poco::Timespan recieve_timeout;
Poco::Timespan old_send_timeout;
Poco::Timespan old_receive_timeout;
};
}
......@@ -31,6 +31,7 @@
#include "TCPHandler.h"
#include <Common/NetException.h>
#include <ext/scope_guard.h>
namespace DB
......@@ -139,6 +140,9 @@ void TCPHandler::runImpl()
/// Restore context of request.
query_context = connection_context;
/// If a user passed query-local timeouts, reset socket to initial state at the end of the query
SCOPE_EXIT({state.timeout_setter.reset();});
/** If Query - process it. If Ping or Cancel - go back to the beginning.
* There may come settings for a separate query that modify `query_context`.
*/
......@@ -600,7 +604,12 @@ void TCPHandler::receiveQuery()
}
/// Per query settings.
query_context.getSettingsRef().deserialize(*in);
Settings & settings = query_context.getSettingsRef();
settings.deserialize(*in);
/// Sync timeouts on client and server during current query to avoid dangling queries on server
/// NOTE: these settings are applied only for current connection (not for distributed tables' connections)
state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), settings.send_timeout, settings.receive_timeout);
readVarUInt(stage, *in);
state.stage = QueryProcessingStage::Enum(stage);
......
......@@ -11,6 +11,7 @@
#include <DataStreams/BlockIO.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Client/TimeoutSetter.h>
#include "IServer.h"
......@@ -59,6 +60,9 @@ struct QueryState
/// To output progress, the difference after the previous sending of progress.
Progress progress;
/// Timeouts setter for current query
std::unique_ptr<TimeoutSetter> timeout_setter;
void reset()
{
......
......@@ -94,7 +94,12 @@ void ReplicatedMergeTreeQueue::initialize(
void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, std::lock_guard<std::mutex> &)
{
virtual_parts.add(entry->new_part_name);
queue.push_back(entry);
/// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
if (entry->type != LogEntry::DROP_RANGE)
queue.push_back(entry);
else
queue.push_front(entry);
if (entry->type == LogEntry::GET_PART)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册