提交 6596c3f6 编写于 作者: I Igor Mineev

DataPartsExchange new protocol with data size....

DataPartsExchange new protocol with data size. StorageReplicatedMergeTree::fetchPartition multipath fix. Choosing on mutate any disk to write mutation file.
上级 af300c66
......@@ -4,7 +4,6 @@
#include <Common/NetException.h>
#include <Common/typeid_cast.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
#include <ext/scope_guard.h>
#include <Poco/Net/HTTPServerResponse.h>
......@@ -27,6 +26,7 @@ namespace ErrorCodes
extern const int CANNOT_WRITE_TO_OSTREAM;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int UNKNOWN_TABLE;
extern const int UNKNOWN_ACTION;
}
namespace DataPartsExchange
......@@ -52,7 +52,17 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
String part_name = params.get("part");
/// "0" for backward compatibility
String protocol_version = params.get("protocol_version", "0");
String part_name;
if (protocol_version == "0")
part_name = params.get("part");
else if (protocol_version == "1")
part_name = params.get("part_name");
else
throw Exception("Unsupported protocol version", ErrorCodes::UNKNOWN_ACTION); ///@TODO_IGR ASK Is it true error code?
static std::atomic_uint total_sends {0};
......@@ -95,7 +105,21 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
MergeTreeData::DataPart::Checksums data_checksums;
if (protocol_version == "1")
{
/// Get size of all files
UInt64 all_part_files_size = 0;
for (const auto &it : checksums.files)
{
String file_name = it.first;
String path = part->getFullPath() + part_name + "/" + file_name;
all_part_files_size += Poco::File(path).getSize();
}
writeBinary(all_part_files_size, out);
}
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
String file_name = it.first;
......@@ -174,9 +198,10 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
uri.setPort(port);
uri.setQueryParameters(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"compress", "false"}
{"endpoint", getEndpointId(replica_path)},
{"part_name", part_name},
{"protocol_version", "1"},
{"compress", "false"}
});
Poco::Net::HTTPBasicCredentials creds{};
......@@ -186,22 +211,81 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
creds.setPassword(password);
}
bool protocol_error = true;
try
{
PooledReadWriteBufferFromHTTP in{
uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
data.settings.replicated_max_parallel_fetches_for_host
};
UInt64 sum_files_size;
readBinary(sum_files_size, in);
protocol_error = false;
auto reservation = data.reserveSpaceForPart(sum_files_size);
return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, reservation, in);
}
catch (...) ///@TODO_IGR catch exception
{
if (!protocol_error)
throw;
}
/// Protocol error
/// Seems to be replica without protocol_version "1" supporting
/// Try to use old one
Poco::URI uri_v0;
uri_v0.setScheme(interserver_scheme);
uri_v0.setHost(host);
uri_v0.setPort(port);
uri_v0.setQueryParameters(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"compress", "false"}
});
PooledReadWriteBufferFromHTTP in{
uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
data.settings.replicated_max_parallel_fetches_for_host
uri_v0,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
data.settings.replicated_max_parallel_fetches_for_host
};
/// We don't know real size of part
auto reservation = data.reserveOnMaxDiskWithoutReservation();
return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, reservation, in);
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
const String & part_name,
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
const DiskSpaceMonitor::ReservationPtr & reservation,
PooledReadWriteBufferFromHTTP & in)
{
size_t files;
readBinary(files, in);
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
auto reservation = data.reserveSpaceForPart(0); ///@TODO_IGR ASK What size should be there?
String part_path = data.getFullPathOnDisk(reservation->getDisk());
String absolute_part_path = part_path + relative_part_path + "/";
Poco::File part_file(absolute_part_path);
......@@ -217,8 +301,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
new_data_part->relative_path = relative_part_path;
new_data_part->is_temp = true;
size_t files;
readBinary(files, in);
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
{
......
......@@ -6,6 +6,7 @@
#include <IO/HashingWriteBuffer.h>
#include <IO/copyData.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ReadWriteBufferFromHTTP.h>
namespace DB
......@@ -64,6 +65,14 @@ public:
ActionBlocker blocker;
private:
MergeTreeData::MutableDataPartPtr downloadPart(
const String & part_name,
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
const DiskSpaceMonitor::ReservationPtr & reservation,
PooledReadWriteBufferFromHTTP & in);
MergeTreeData & data;
Logger * log;
};
......
......@@ -183,6 +183,25 @@ DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const
return {};
}
DiskSpaceMonitor::ReservationPtr Schema::reserveOnMaxDiskWithoutReservation() const
{
UInt64 max_space = 0;
DiskPtr max_disk;
for (const auto & volume : volumes)
{
for (const auto &disk : volume.disks)
{
auto avail_space = disk->getAvailableSpace();
if (avail_space > max_space)
{
max_space = avail_space;
max_disk = disk;
}
}
}
return DiskSpaceMonitor::tryToReserve(max_disk, 0);
}
SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks)
{
Poco::Util::AbstractConfiguration::Keys keys;
......
......@@ -324,6 +324,10 @@ public:
DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const;
/// Reserves 0 bytes on disk with max available space
/// Do not use this function when it is possible to predict size!!!
DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() const;
private:
Volumes volumes;
};
......
......@@ -605,6 +605,10 @@ public:
DiskSpaceMonitor::ReservationPtr reserveSpaceForPart(UInt64 expected_size);
/// Choose disk with max available free space
/// Reserves 0 bytes
DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() { return schema.reserveOnMaxDiskWithoutReservation(); }
MergeTreeDataFormatVersion format_version;
Context global_context;
......
......@@ -334,8 +334,10 @@ public:
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
{
auto reservation = reserveSpaceForPart(0); ///@TODO_IGR ASK What expected size of mutated part? what size should we reserve?
MergeTreeMutationEntry entry(commands, getFullPathOnDisk(reservation->getDisk()), insert_increment.get());
///@TODO_IGR ASK What should i do here?
/// Choose any disk.
auto disk = schema.getDisks()[0];
MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get());
String file_name;
{
std::lock_guard lock(currently_merging_mutex);
......
......@@ -4172,12 +4172,16 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
* Unreliable (there is a race condition) - such a partition may appear a little later.
*/
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it{getDataPaths()[0] + "detached/"}; dir_it != dir_end; ++dir_it) ///@TODO IGR
for (const std::string & path : getDataPaths())
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version)
&& part_info.partition_id == partition_id)
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version)
&& part_info.partition_id == partition_id)
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
}
}
zkutil::Strings replicas;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册