Unverified commit 3b99b723 authored by alexey-milovidov, committed by GitHub

Merge pull request #1245 from yandex/CLICKHOUSE-3178

Atomic parts addition and removing
......@@ -35,6 +35,7 @@ cmake_install.cmake
CTestTestfile.cmake
*.a
*.o
cmake-build-*
# Python cache
*.pyc
......
......@@ -21,6 +21,27 @@ namespace ProfileEvents
namespace zkutil
{
/// You should reinitialize ZooKeeper session in case of these errors
inline bool isUnrecoverableErrorCode(int32_t zk_return_code)
{
return zk_return_code == ZINVALIDSTATE || zk_return_code == ZSESSIONEXPIRED || zk_return_code == ZSESSIONMOVED;
}
/// Errors related to temporary network problems
inline bool isTemporaryErrorCode(int32_t zk_return_code)
{
return zk_return_code == ZCONNECTIONLOSS || zk_return_code == ZOPERATIONTIMEOUT;
}
/// Any error related to the network or master election
/// In case of these errors you should retry the query or reinitialize the ZooKeeper session (see isUnrecoverable())
inline bool isHardwareErrorCode(int32_t zk_return_code)
{
return isUnrecoverableErrorCode(zk_return_code) || isTemporaryErrorCode(zk_return_code);
}
class KeeperException : public DB::Exception
{
private:
......@@ -29,35 +50,36 @@ private:
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code) { incrementEventCounter(); }
public:
KeeperException(const std::string & msg) : KeeperException(msg, ZOK, 0) {}
explicit KeeperException(const std::string & msg) : KeeperException(msg, ZOK, 0) {}
KeeperException(const std::string & msg, const int32_t code)
: KeeperException(msg + " (" + zerror(code) + ")", code, 0) {}
KeeperException(const int32_t code) : KeeperException(zerror(code), code, 0) {}
explicit KeeperException(const int32_t code) : KeeperException(zerror(code), code, 0) {}
KeeperException(const int32_t code, const std::string & path)
: KeeperException(std::string{zerror(code)} + ", path: " + path, code, 0) {}
KeeperException(const KeeperException & exc) : DB::Exception(exc), code(exc.code) { incrementEventCounter(); }
const char * name() const throw() { return "zkutil::KeeperException"; }
const char * className() const throw() { return "zkutil::KeeperException"; }
KeeperException * clone() const { return new KeeperException(*this); }
const char * name() const throw() override { return "zkutil::KeeperException"; }
const char * className() const throw() override { return "zkutil::KeeperException"; }
KeeperException * clone() const override { return new KeeperException(*this); }
/// You should reinitialize the session with ZooKeeper in case of these errors
/// You should reinitialize ZooKeeper session in case of these errors
bool isUnrecoverable() const
{
return code == ZINVALIDSTATE || code == ZSESSIONEXPIRED || code == ZSESSIONMOVED;
return isUnrecoverableErrorCode(code);
}
/// any error related to network operation or master re-election
/// in case of these errors you should either retry the query or reinitialize the session (see isUnrecoverable())
bool isHardwareError() const
/// Errors related to temporary network problems
bool isTemporaryError() const
{
return isUnrecoverable() || code == ZCONNECTIONLOSS || code == ZOPERATIONTIMEOUT;
return isTemporaryErrorCode(code);
}
bool isTemporaryError() const
/// Any error related to the network or master election
/// In case of these errors you should retry the query or reinitialize the ZooKeeper session (see isUnrecoverable())
bool isHardwareError() const
{
return code == ZCONNECTIONLOSS || code == ZOPERATIONTIMEOUT;
return isHardwareErrorCode(code);
}
const int32_t code;
......
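The free functions above factor the error classification out of KeeperException, so retry policy can be written against raw ZooKeeper return codes. A minimal sketch of such a policy follows; it assumes only the classifiers from this hunk plus the ZOK constant from the ZooKeeper C client headers, and the retryOnTemporaryErrors helper itself is hypothetical, not part of this PR.
#include <chrono>
#include <functional>
#include <thread>
/// Hypothetical helper: retry an operation while it fails with a temporary
/// network error; stop immediately on success or on any other error.
int32_t retryOnTemporaryErrors(const std::function<int32_t()> & operation, size_t max_tries = 3)
{
    int32_t code = ZOK;
    for (size_t attempt = 0; attempt < max_tries; ++attempt)
    {
        code = operation();
        if (!zkutil::isTemporaryErrorCode(code))
            break;  /// only ZCONNECTIONLOSS / ZOPERATIONTIMEOUT are worth retrying
        std::this_thread::sleep_for(std::chrono::milliseconds(100 * (attempt + 1)));
    }
    return code;
}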
......@@ -157,10 +157,13 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
MergeTreeData::DataPartPtr part = data.getPartIfExists(name);
/// It is important to include PreCommitted parts here,
/// because a part could already be committed to ZooKeeper while the response from ZooKeeper to the server is delayed
auto part = data.getPartIfExists(name, {MergeTreeDataPart::State::PreCommitted, MergeTreeDataPart::State::Committed});
if (part)
return part;
throw Exception("No part " + name + " in table");
throw Exception("No part " + name + " in table", ErrorCodes::NO_SUCH_DATA_PART);
}
MergeTreeData::DataPartPtr Service::findShardedPart(const String & name, size_t shard_no)
......
......@@ -15,6 +15,7 @@
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <common/RangeFiltered.h>
namespace DB
{
......@@ -99,6 +100,10 @@ public:
/// After the DataPart is added to the working set, it cannot be changed.
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartState = MergeTreeDataPart::State;
using DataPartStates = std::initializer_list<DataPartState>;
using DataPartStateVector = std::vector<DataPartState>;
struct DataPartPtrLess
{
using is_transparent = void;
......@@ -122,10 +127,7 @@ public:
public:
Transaction() {}
void commit()
{
clear();
}
void commit();
void rollback();
......@@ -155,6 +157,8 @@ public:
parts_to_remove_on_rollback.clear();
parts_to_add_on_rollback.clear();
}
void replaceParts(DataPartState move_precommitted_to, DataPartState move_committed_to, bool remove_without_delay);
};
/// An object that stores the names of temporary files created in the part directory during ALTER of its
......@@ -305,10 +309,29 @@ public:
String getLogName() const { return log_name; }
/// Returns a copy of the list so that the caller shouldn't worry about locks.
DataParts getDataParts(const DataPartStates & affordable_states) const;
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states) const;
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector & out_states_snapshot) const;
/// Returns a virtual container that iterates only over parts with the specified states
decltype(auto) getDataPartsRange(const DataPartStates & affordable_states) const
{
return createRangeFiltered(DataPart::getStatesFilter(affordable_states), data_parts);
}
/// Returns Committed parts
DataParts getDataParts() const;
DataPartsVector getDataPartsVector() const;
/// Returns all parts except Temporary and Deleting ones
DataParts getAllDataParts() const;
/// Returns a Committed part with the given name or a part containing it. If there is no such part, returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name);
/// Returns the part with the given name (and state) or nullptr if no such part.
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states = {DataPartState::Committed});
/// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const;
......@@ -318,12 +341,6 @@ public:
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertIfNeeded(Poco::Event * until = nullptr);
/// Returns an active part with the given name or a part containing it. If there is no such part,
/// returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name);
/// Returns the part with the given name or nullptr if no such part.
DataPartPtr getPartIfExists(const String & part_name);
DataPartPtr getShardedPartIfExists(const String & part_name, size_t shard_no);
/// Renames temporary part to a permanent part and adds it to the working set.
......@@ -337,26 +354,26 @@ public:
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
/// Removes from the working set parts in remove and adds parts in add. Parts in add must already be in
/// all_data_parts.
/// Removes parts from the working set.
/// Parts in remove must already be in data_parts with the PreCommitted, Committed, or Outdated state.
/// If clear_without_timeout is true, the parts will be deleted at once, or during the next call to
/// clearOldParts (ignoring old_parts_lifetime).
void replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout);
void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout);
/// Renames the part to detached/<prefix>_<part> and forgets about it. The data won't be deleted in
/// clearOldParts.
/// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
void renameAndDetachPart(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false, bool move_to_detached = true);
/// Removes the part from the list of parts (including all_data_parts), but doesn't move the directory.
void detachPartInPlace(const DataPartPtr & part);
/// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts
/// but not from the disk.
DataPartsVector grabOldParts();
/// Reverts the changes made by grabOldParts().
void addOldParts(const DataPartsVector & parts);
/// Reverts the changes made by grabOldParts(); the parts should be in the Deleting state.
void rollbackDeletingParts(const DataPartsVector & parts);
/// Removes parts from data_parts; they should be in the Deleting state.
void removePartsFinally(const DataPartsVector & parts);
/// Delete irrelevant parts.
void clearOldParts();
......@@ -400,12 +417,6 @@ public:
broken_part_callback(name);
}
/// Delete old parts from disk and ZooKeeper (in replicated case)
void clearOldPartsAndRemoveFromZK()
{
parts_clean_callback();
}
ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; }
SortDescription getSortDescription() const { return sort_descr; }
......@@ -523,8 +534,6 @@ private:
/// Engine-specific methods
BrokenPartCallback broken_part_callback;
/// Used to delete outdated parts immediately from memory, disk and ZooKeeper
PartsCleanCallback parts_clean_callback;
String log_name;
Logger * log;
......@@ -536,8 +545,8 @@ private:
/// The set of all data parts including already merged but not yet deleted. Usually it is small (tens of elements).
/// The part is referenced from here, from the list of current parts and from each thread reading from it.
/// This means that if reference count is 1 - the part is not used right now and can be deleted.
DataParts all_data_parts;
mutable std::mutex all_data_parts_mutex;
// DataParts all_data_parts;
// mutable std::mutex all_data_parts_mutex;
/// Used to serialize calls to grabOldParts.
std::mutex grab_old_parts_mutex;
......@@ -572,7 +581,7 @@ private:
void addPartContributionToColumnSizes(const DataPartPtr & part);
void removePartContributionToColumnSizes(const DataPartPtr & part);
/// If there is no part in the partition with ID `partition_id`, returns empty ptr.
/// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
DataPartPtr getAnyPartInPartition(const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock);
};
......
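The Transaction introduced above is the core of the atomic addition: renameTempPartAndReplace() puts a part into the working set in the PreCommitted state, and the part only becomes visible to SELECTs once the transaction commits. A minimal sketch of the intended call pattern, assuming data is a MergeTreeData, part is a freshly written MutableDataPartPtr, and commitToZooKeeperSucceeded() is a hypothetical stand-in for the real ZooKeeper multi-op.
MergeTreeData::Transaction transaction;
/// Renames tmp_<name> into the working set; the part becomes PreCommitted:
/// it is in data_parts but still invisible to SELECTs.
data.renameTempPartAndReplace(part, nullptr, &transaction);
if (commitToZooKeeperSucceeded())
    transaction.commit();    /// PreCommitted -> Committed; replaced parts -> Outdated
else
    transaction.rollback();  /// PreCommitted -> Outdated; the part never became visible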
......@@ -500,8 +500,8 @@ size_t MergeTreeDataPart::calcTotalSize(const String & from)
std::vector<std::string> files;
cur.list(files);
size_t res = 0;
for (size_t i = 0; i < files.size(); ++i)
res += calcTotalSize(from + files[i]);
for (const auto & file : files)
res += calcTotalSize(from + file);
return res;
}
......@@ -541,6 +541,7 @@ void MergeTreeDataPart::remove() const
LOG_WARNING(storage.log, "Directory " << from << " (part to remove) doesn't exist or one of nested files has gone."
" Most likely this is due to manual removing. This should be discouraged. Ignoring.");
std::terminate();
return;
}
......@@ -576,7 +577,7 @@ void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_n
}
}
from_file.setLastModified(Poco::Timestamp::fromEpochTime(time(0)));
from_file.setLastModified(Poco::Timestamp::fromEpochTime(time(nullptr)));
from_file.renameTo(to);
relative_path = new_relative_path;
}
......@@ -910,4 +911,28 @@ size_t MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
return res;
}
String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state)
{
switch (state)
{
case State::Temporary:
return "Temporary";
case State::PreCommitted:
return "PreCommitted";
case State::Committed:
return "Committed";
case State::Outdated:
return "Outdated";
case State::Deleting:
return "Deleting";
default:
throw Exception("Unknown part state " + std::to_string(static_cast<int>(state)), ErrorCodes::LOGICAL_ERROR);
}
}
String MergeTreeDataPart::stateString() const
{
return stateToString(state);
}
}
......@@ -139,9 +139,87 @@ struct MergeTreeDataPart
/// If true, the destructor will delete the directory with the part.
bool is_temp = false;
/// If true, there is no ZooKeeper node for this part, so it should be deleted only from the filesystem
bool is_duplicate = false;
/// For resharding.
size_t shard_no = 0;
/**
* Part state is a stage of its lifetime. States are ordered and the state of a part can only increase.
* The part state should be modified under the data_parts mutex.
*
* Possible state transitions:
* Temporary -> PreCommitted: we are trying to commit a fetched, inserted or merged part to the active set
* PreCommitted -> Outdated: we could not add the part to the active set and are rolling back (for example, it is a duplicated part)
* PreCommitted -> Committed: we successfully committed the part to the active dataset
* Committed -> Outdated: the part was replaced by a covering part or removed by DROP PARTITION
* Outdated -> Deleting: a cleaner selected this part for deletion
* Deleting -> Outdated: if a ZooKeeper error occurred during deletion, we will retry the deletion
*/
enum class State
{
Temporary, /// the part is being generated now; it is not in the data_parts list
PreCommitted, /// the part is in data_parts, but not used for SELECTs
Committed, /// active data part, used by current and upcoming SELECTs
Outdated, /// not an active data part, but it can still be used by currently running SELECTs; it can be deleted after these SELECTs finish
Deleting /// not an active data part with a reference count of one; it is being deleted right now by a cleaner
};
/// Current state of the part. If the part is already in the working set, it should be accessed under the data_parts mutex
mutable State state{State::Temporary};
/// Returns name of state
static String stateToString(State state);
String stateString() const;
String getNameWithState() const
{
return name + " (state " + stateString() + ")";
}
/// Returns true if state of part is one of affordable_states
bool checkState(const std::initializer_list<State> & affordable_states) const
{
for (auto affordable_state : affordable_states)
{
if (state == affordable_state)
return true;
}
return false;
}
/// Throws an exception if state of the part is not in affordable_states
void assertState(const std::initializer_list<State> & affordable_states) const
{
if (!checkState(affordable_states))
{
String states_str;
for (auto state : affordable_states)
states_str += stateToString(state) + " ";
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str);
}
}
/// In contrast to lambdas, it is move-assignable and can have several overloads of operator()
struct StatesFilter
{
std::initializer_list<State> affordable_states;
StatesFilter(const std::initializer_list<State> & affordable_states) : affordable_states(affordable_states) {}
bool operator() (const std::shared_ptr<const MergeTreeDataPart> & part) const
{
return part->checkState(affordable_states);
}
};
/// Returns a functor that returns true only for parts with states from the specified list
static inline StatesFilter getStatesFilter(const std::initializer_list<State> & affordable_states)
{
return StatesFilter(affordable_states);
}
/// Primary key (correspond to primary.idx file).
/// Always loaded in RAM. Contains each index_granularity-th value of primary key tuple.
/// Note that marks (also correspond to primary key) is not always in RAM, but cached. See MarkCache.h.
......
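getStatesFilter() is meant to plug into the RangeFiltered helper added at the end of this PR; MergeTreeData::getDataPartsRange() shown earlier is exactly this combination. A standalone sketch under those assumptions (the parts container and the function are illustrative). Note that StatesFilter stores the initializer_list by value, and an initializer_list only references its backing array, so the list must outlive the loop.
#include <common/RangeFiltered.h>
#include <initializer_list>
#include <iostream>
#include <set>
void printCommittedParts(const std::set<MergeTreeData::DataPartPtr, MergeTreeData::DataPartPtrLess> & parts)
{
    using State = MergeTreeDataPart::State;
    /// Keep the state list alive for the whole loop (see the note above).
    std::initializer_list<State> committed = {State::Committed};
    for (const auto & part : createRangeFiltered(MergeTreeDataPart::getStatesFilter(committed), parts))
        std::cout << part->getNameWithState() << '\n';  /// e.g. "all_1_1_0 (state Committed)"
}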
......@@ -305,6 +305,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
part->is_duplicate = true;
transaction.rollback();
last_block_is_duplicate = true;
}
......
......@@ -37,7 +37,7 @@ void ReplicatedMergeTreeCleanupThread::run()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
storage.shutdown_event.tryWait(CLEANUP_SLEEP_MS);
storage.cleanup_thread_event.tryWait(CLEANUP_SLEEP_MS);
}
LOG_DEBUG(log, "Cleanup thread finished");
......@@ -46,7 +46,7 @@ void ReplicatedMergeTreeCleanupThread::run()
void ReplicatedMergeTreeCleanupThread::iterate()
{
storage.clearOldPartsAndRemoveFromZK(log);
storage.clearOldPartsAndRemoveFromZK();
storage.data.clearOldTemporaryDirectories();
if (storage.is_leader_node)
......
......@@ -367,6 +367,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.merge_selecting_event.set();
storage.queue_updating_event->set();
storage.alter_query_event->set();
storage.cleanup_thread_event.set();
storage.replica_is_active_node = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
......
......@@ -482,7 +482,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & query, const ASTPtr & partit
if (detach)
data.renameAndDetachPart(part, "");
else
data.replaceParts({part}, {}, false);
data.removePartsFromWorkingSet({part}, false);
}
LOG_INFO(log, (detach ? "Detached " : "Removed ") << removed_parts << " parts inside partition ID " << partition_id << ".");
......
......@@ -1468,7 +1468,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
/// If the part needs to be removed, it is more reliable to delete the directory after the changes in ZooKeeper.
if (!entry.detach)
data.replaceParts({part}, {}, true);
data.removePartsFromWorkingSet({part}, true);
}
LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << entry.new_part_name << ".");
......@@ -2155,6 +2155,14 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum)
{
if (auto part = data.getPartIfExists(part_name, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting}))
{
LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
/// Force premature parts cleanup
cleanup_thread_event.set();
return false;
}
{
std::lock_guard<std::mutex> lock(currently_fetching_parts_mutex);
if (!currently_fetching_parts.insert(part_name).second)
......@@ -3536,10 +3544,10 @@ void StorageReplicatedMergeTree::reshardPartitions(
/// Make a list of local partitions that need to be resharded.
std::set<std::string> unique_partition_list;
const MergeTreeData::DataParts & data_parts = data.getDataParts();
for (MergeTreeData::DataParts::iterator it = data_parts.cbegin(); it != data_parts.cend(); ++it)
auto data_parts = data.getDataParts();
for (auto & part : data_parts)
{
const String & current_partition_id = (*it)->info.partition_id;
const String & current_partition_id = part->info.partition_id;
if (include_all || partition_id == current_partition_id)
unique_partition_list.insert(current_partition_id);
}
......@@ -3855,64 +3863,99 @@ bool StorageReplicatedMergeTree::checkSpaceForResharding(const ReplicaToSpaceInf
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
/// Critical section is not required (since grabOldParts() returns a unique part set on each call)
Logger * log = log_ ? log_ : this->log;
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
auto zookeeper = getZooKeeper();
MergeTreeData::DataPartsVector parts = data.grabOldParts();
size_t count = parts.size();
if (!count)
if (parts.empty())
return;
/// Part names that were successfully deleted from filesystem and should be deleted from ZooKeeper
Strings part_names;
auto remove_from_zookeeper = [&] ()
MergeTreeData::DataPartsVector parts_to_delete_only_from_filesystem; // Only duplicates
MergeTreeData::DataPartsVector parts_to_delete_completely; // All parts except duplicates
MergeTreeData::DataPartsVector parts_to_retry_deletion; // Parts that should be retried due to network problems
MergeTreeData::DataPartsVector parts_to_remove_from_filesystem; // Parts removed from ZK
for (const auto & part : parts)
{
LOG_DEBUG(log, "Removed " << part_names.size() << " old parts from filesystem. Removing them from ZooKeeper.");
if (!part->is_duplicate)
parts_to_delete_completely.emplace_back(part);
else
parts_to_delete_only_from_filesystem.emplace_back(part);
}
parts.clear();
try
{
removePartsFromZooKeeper(zookeeper, part_names);
}
catch (...)
auto remove_parts_from_filesystem = [log=log] (const MergeTreeData::DataPartsVector & parts_to_remove)
{
for (auto & part : parts_to_remove)
{
LOG_ERROR(log, "There is a problem with deleting parts from ZooKeeper: " << getCurrentExceptionMessage(false));
try
{
part->remove();
}
catch (...)
{
tryLogCurrentException(log, "There is a problem with deleting part " + part->name + " from filesystem");
}
}
};
/// Delete duplicate parts from filesystem
if (!parts_to_delete_only_from_filesystem.empty())
{
remove_parts_from_filesystem(parts_to_delete_only_from_filesystem);
data.removePartsFinally(parts_to_delete_only_from_filesystem);
LOG_DEBUG(log, "Removed " << parts_to_delete_only_from_filesystem.size() << " old duplicate parts");
}
/// Delete normal parts from ZooKeeper
NameSet part_names_to_retry_deletion;
try
{
LOG_DEBUG(log, "Removing " << parts.size() << " old parts from filesystem");
Strings part_names_to_delete_completely;
for (const auto & part : parts_to_delete_completely)
part_names_to_delete_completely.emplace_back(part->name);
while (!parts.empty())
{
MergeTreeData::DataPartPtr & part = parts.back();
part->remove();
part_names.emplace_back(part->name);
parts.pop_back();
}
LOG_DEBUG(log, "Removing " << parts_to_delete_completely.size() << " old parts from ZooKeeper");
removePartsFromZooKeeper(zookeeper, part_names_to_delete_completely, &part_names_to_retry_deletion);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_ERROR(log, "There is a problem with deleting parts from ZooKeeper: " << getCurrentExceptionMessage(false));
}
/// Finalize deletion of parts already deleted from filesystem, rollback remaining parts
data.addOldParts(parts);
remove_from_zookeeper();
/// Part names that were reliably deleted from ZooKeeper should be deleted from filesystem
auto num_reliably_deleted_parts = parts_to_delete_completely.size() - part_names_to_retry_deletion.size();
LOG_DEBUG(log, "Removed " << num_reliably_deleted_parts << " old parts from ZooKeeper. Removing them from filesystem.");
throw;
/// Split normal parts into two sets
for (auto & part : parts_to_delete_completely)
{
if (part_names_to_retry_deletion.count(part->name) == 0)
parts_to_remove_from_filesystem.emplace_back(part);
else
parts_to_retry_deletion.emplace_back(part);
}
/// Finalize deletion
remove_from_zookeeper();
/// Will retry deletion
if (!parts_to_retry_deletion.empty())
{
data.rollbackDeletingParts(parts_to_retry_deletion);
LOG_DEBUG(log, "Will retry deletion of " << parts_to_retry_deletion.size() << " parts in the next time");
}
LOG_DEBUG(log, "Removed " << count << " old parts");
/// Remove parts from filesystem and finally from data_parts
if (!parts_to_remove_from_filesystem.empty())
{
remove_parts_from_filesystem(parts_to_remove_from_filesystem);
data.removePartsFinally(parts_to_remove_from_filesystem);
LOG_DEBUG(log, "Removed " << parts_to_remove_from_filesystem.size() << " old parts");
}
}
......@@ -3932,7 +3975,8 @@ static int32_t tryMultiWithRetries(zkutil::ZooKeeperPtr & zookeeper, zkutil::Ops
}
void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names)
void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
NameSet * parts_should_be_retried)
{
zkutil::Ops ops;
auto it_first_node_in_batch = part_names.cbegin();
......@@ -3960,11 +4004,24 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr &
auto cur_code = tryMultiWithRetries(zookeeper, cur_ops);
if (cur_code == ZNONODE)
{
LOG_DEBUG(log, "There is no part " << *it_in_batch << " in ZooKeeper, it was only in filesystem");
}
else if (parts_should_be_retried && zkutil::isHardwareErrorCode(cur_code))
{
parts_should_be_retried->emplace(*it_in_batch);
}
else if (cur_code != ZOK)
{
LOG_WARNING(log, "Cannot remove part " << *it_in_batch << " from ZooKeeper: " << ::zerror(cur_code));
}
}
}
else if (parts_should_be_retried && zkutil::isHardwareErrorCode(code))
{
for (auto it_in_batch = it_first_node_in_batch; it_in_batch != it_next; ++it_in_batch)
parts_should_be_retried->emplace(*it_in_batch);
}
else if (code != ZOK)
{
LOG_WARNING(log, "There was a problem with deleting " << (it_next - it_first_node_in_batch)
......
......@@ -205,7 +205,7 @@ public:
private:
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK(Logger * log_ = nullptr);
void clearOldPartsAndRemoveFromZK();
friend class ReplicatedMergeTreeBlockOutputStream;
friend class ReplicatedMergeTreeRestartingThread;
......@@ -307,6 +307,8 @@ private:
/// A thread that removes old parts, log entries, and blocks.
std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread;
/// Used to wake up cleanup_thread
Poco::Event cleanup_thread_event;
/// A thread that processes reconnection to ZooKeeper when the session expires.
std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
......@@ -380,7 +382,8 @@ private:
void removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops);
/// Quickly removes a big set of parts from ZooKeeper (using async multi queries)
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names);
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
NameSet * parts_should_be_retried = nullptr);
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void removePartAndEnqueueFetch(const String & part_name);
......
......@@ -174,33 +174,41 @@ BlockInputStreams StorageSystemParts::read(
*/
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
continue;
else
throw;
throw;
}
String engine = storage->getName();
MergeTreeData * data = nullptr;
if (StorageMergeTree * merge_tree = dynamic_cast<StorageMergeTree *>(&*storage))
if (auto merge_tree = dynamic_cast<StorageMergeTree *>(&*storage))
{
data = &merge_tree->getData();
}
else if (StorageReplicatedMergeTree * replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
else if (auto replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
{
data = &replicated_merge_tree->getData();
}
else
{
throw Exception("Unknown engine " + engine, ErrorCodes::LOGICAL_ERROR);
}
MergeTreeData::DataParts active_parts = data->getDataParts();
MergeTreeData::DataParts all_parts;
using State = MergeTreeDataPart::State;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;
if (need[0])
all_parts = data->getAllDataParts();
all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, all_parts_state);
else
all_parts = active_parts;
all_parts = data->getDataPartsVector({State::Committed}, all_parts_state);
/// Finally, we'll go through the list of parts.
for (const MergeTreeData::DataPartPtr & part : all_parts)
for (size_t part_number = 0; part_number < all_parts.size(); ++part_number)
{
const auto & part = all_parts[part_number];
auto part_state = all_parts_state[part_number];
size_t i = 0;
{
WriteBufferFromOwnString out;
......@@ -208,7 +216,7 @@ BlockInputStreams StorageSystemParts::read(
block.getByPosition(i++).column->insert(out.str());
}
block.getByPosition(i++).column->insert(part->name);
block.getByPosition(i++).column->insert(static_cast<UInt64>(active_parts.count(part)));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part_state == State::Committed));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->marks_count));
size_t marks_size = 0;
......@@ -227,7 +235,7 @@ BlockInputStreams StorageSystemParts::read(
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->remove_time));
/// For convenience, the returned refcount does not include references due to local variables in this method: all_parts, active_parts.
block.getByPosition(i++).column->insert(static_cast<UInt64>(part.use_count() - (active_parts.count(part) ? 2 : 1)));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part.use_count() - 1));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMinDate()));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMaxDate()));
......
#include <gtest/gtest.h>
#include <common/RangeFiltered.h>
#include <vector>
#include <set>
TEST(RangeFiltered, simple)
{
std::vector<int> v;
for (int i = 0; i < 10; ++i)
v.push_back(i);
auto v30 = createRangeFiltered([] (int i) { return i % 3 == 0;}, v);
auto v31 = createRangeFiltered([] (int i) { return i % 3 != 0;}, v);
for (const int & i : v30)
ASSERT_EQ(i % 3, 0);
for (const int & i : v31)
ASSERT_NE(i % 3, 0);
{
auto it = v30.begin();
ASSERT_EQ(*it, 0);
auto it2 = std::next(it);
ASSERT_EQ(*it2, 3);
auto it3 = it;
it = std::next(it2);
ASSERT_EQ(*it, 6);
}
{
auto it = std::next(v30.begin());
ASSERT_EQ(*it, 3);
*it = 2; /// it becomes invalid
ASSERT_EQ(*(++it), 6); /// but iteration is successful
*v30.begin() = 1;
ASSERT_EQ(*v30.begin(), 6);
}
}
......@@ -3,5 +3,6 @@
<replicated_deduplication_window>999999999</replicated_deduplication_window>
<replicated_deduplication_window_seconds>1</replicated_deduplication_window_seconds>
<cleanup_delay_period>1</cleanup_delay_period>
<old_parts_lifetime>1</old_parts_lifetime>
</merge_tree>
</yandex>
......@@ -7,31 +7,19 @@ from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
from helpers.client import CommandRequest
from helpers.client import QueryTimeoutExceedException
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', config_dir='configs', with_zookeeper=True)
node2 = cluster.add_instance('node2', config_dir='configs', with_zookeeper=True)
node1 = cluster.add_instance('node1', config_dir='configs', with_zookeeper=True, macroses={"layer": 0, "shard": 0, "replica": 1})
node2 = cluster.add_instance('node2', config_dir='configs', with_zookeeper=True, macroses={"layer": 0, "shard": 0, "replica": 2})
nodes = [node1, node2]
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node in nodes:
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
'''.format(replica=node.name))
node.query("INSERT INTO simple VALUES (0, 0)")
node.query('''
CREATE TABLE simple2 (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple2', '{replica}', date, id, 8192);
'''.format(replica=node.name))
yield cluster
finally:
......@@ -42,24 +30,24 @@ ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple2', '{replica}', date,
def test_deduplication_window_in_seconds(started_cluster):
node = node1
node.query("INSERT INTO simple2 VALUES (0, 0)")
node1.query("""
CREATE TABLE simple ON CLUSTER test_cluster (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/simple', '{replica}', date, id, 8192)""")
node.query("INSERT INTO simple VALUES (0, 0)")
time.sleep(1)
node.query("INSERT INTO simple2 VALUES (0, 0)") # deduplication works here
node.query("INSERT INTO simple2 VALUES (0, 1)")
assert TSV(node.query("SELECT count() FROM simple2")) == TSV("2\n")
node.query("INSERT INTO simple VALUES (0, 0)") # deduplication works here
node.query("INSERT INTO simple VALUES (0, 1)")
assert TSV(node.query("SELECT count() FROM simple")) == TSV("2\n")
# wait for the cleanup thread
time.sleep(2)
assert TSV.toMat(node.query("SELECT count() FROM system.zookeeper WHERE path='/clickhouse/tables/0/simple2/blocks'"))[0][0] == "1"
node.query("INSERT INTO simple2 VALUES (0, 0)") # deduplication doesn't works here, the first hash node was deleted
assert TSV.toMat(node.query("SELECT count() FROM simple2"))[0][0] == "3"
assert TSV.toMat(node.query("SELECT count() FROM system.zookeeper WHERE path='/clickhouse/tables/0/simple/blocks'"))[0][0] == "1"
node.query("INSERT INTO simple VALUES (0, 0)") # deduplication doesn't works here, the first hash node was deleted
assert TSV.toMat(node.query("SELECT count() FROM simple"))[0][0] == "3"
def check_timeout_exception(e):
s = str(e)
#print s
assert s.find('timed out!') >= 0 or s.find('Return code: -9') >= 0
node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")
# Currently this test just reproduces incorrect behavior that should be fixed
......@@ -67,6 +55,12 @@ def test_deduplication_works_in_case_of_intensive_inserts(started_cluster):
inserters = []
fetchers = []
node1.query("""
CREATE TABLE simple ON CLUSTER test_cluster (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/simple', '{replica}', date, id, 8192)""")
node1.query("INSERT INTO simple VALUES (0, 0)")
for node in nodes:
host = node.ip_address
......@@ -81,7 +75,7 @@ done
set -e
for i in `seq 1000`; do
res=`clickhouse-client --host {} -q "SELECT count() FROM simple"`
if [[ $res -ne 1 ]]; then
if [[ $? -ne 0 || $res -ne 1 ]]; then
echo "Selected $res elements! Host: {}" 1>&2
exit -1
fi;
......@@ -92,14 +86,16 @@ done
for inserter in inserters:
try:
inserter.get_answer()
except Exception as e:
check_timeout_exception(e)
except QueryTimeoutExceedException:
# Only timeout is accepted
pass
# There were no errors during SELECTs
for fetcher in fetchers:
try:
fetcher.get_answer()
except Exception as e:
except QueryTimeoutExceedException:
# Only timeout is accepted
pass
# Uncomment when the problem is fixed
# check_timeout_exception(e)
node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")
<yandex>
<merge_tree>
<replicated_deduplication_window>999999999</replicated_deduplication_window>
<replicated_deduplication_window_seconds>999999999</replicated_deduplication_window_seconds>
<cleanup_delay_period>10</cleanup_delay_period>
<old_parts_lifetime>1</old_parts_lifetime>
</merge_tree>
</yandex>
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>
import time
import os
from contextlib import contextmanager
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
from helpers.client import CommandRequest
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', config_dir='configs', with_zookeeper=True, macroses={"layer": 0, "shard": 0, "replica": 1})
node2 = cluster.add_instance('node2', config_dir='configs', with_zookeeper=True, macroses={"layer": 0, "shard": 0, "replica": 2})
nodes = [node1, node2]
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
pass
cluster.shutdown()
def test_random_inserts(started_cluster):
# Duration of the test; reduce it if you don't want to wait
DURATION_SECONDS = 10  # * 60
node1.query("""
CREATE TABLE simple ON CLUSTER test_cluster (date Date, i UInt32, s String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/simple', '{replica}', date, i, 8192)""")
with PartitionManager() as pm_random_drops:
for sacrifice in nodes:
pass  # This test still doesn't work with network partition problems
#pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
#pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
min_timestamp = int(time.time())
max_timestamp = min_timestamp + DURATION_SECONDS
num_timestamps = max_timestamp - min_timestamp + 1
bash_script = os.path.join(os.path.dirname(__file__), "test.sh")
inserters = []
for node in nodes:
cmd = ['/bin/bash', bash_script, node.ip_address, str(min_timestamp), str(max_timestamp)]
inserters.append(CommandRequest(cmd, timeout=DURATION_SECONDS * 2, stdin=''))
print node.name, node.ip_address
for inserter in inserters:
inserter.get_answer()
answer="{}\t{}\t{}\t{}\n".format(num_timestamps, num_timestamps, min_timestamp, max_timestamp)
for node in nodes:
assert TSV(node.query("SELECT count(), uniqExact(i), min(i), max(i) FROM simple")) == TSV(answer), node.name + " : " + node.query("SELECT groupArray(_part), i, count() AS c FROM simple GROUP BY i ORDER BY c DESC LIMIT 1")
node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")
#!/bin/bash
#set -e
[[ -n "$1" ]] && host="$1" || host="127.0.0.1"
[[ -n "$2" ]] && min_timestamp="$2" || min_timestamp=$(( $(date +%s) - 10 ))
[[ -n "$3" ]] && max_timestamp="$3" || max_timestamp=$(( $(date +%s) + 10 ))
timestamps=`seq $min_timestamp $max_timestamp`
function reliable_insert {
local ts="$1"
num_tries=0
while true; do
if (( $num_tries > 20 )); then
echo "Too many retries" 1>&2
exit -1
fi
#echo clickhouse-client --host $host -q "INSERT INTO simple VALUES (0, $ts, '$ts')"
res=`clickhouse-client --host $host -q "INSERT INTO simple VALUES (0, $ts, '$ts')" 2>&1`
rt=$?
num_tries=$(($num_tries+1))
if (( $rt == 0 )); then break; fi
if [[ $res == *"Code: 319. "*"Unknown status, client must retry"* || $res == *"Code: 999. "* ]]; then
continue
else
echo FAIL "$res" 1>&2
exit -1
fi
done;
}
for i in $timestamps; do
cur_timestamp=$(date +%s)
while (( $cur_timestamp < $i )); do
ts=`shuf -i $min_timestamp-$cur_timestamp -n 1`
reliable_insert "$ts"
cur_timestamp=$(date +%s)
done
#echo $i >> $host".txt"
reliable_insert "$i"
done
\ No newline at end of file
#pragma once
#include <type_traits>
/// Similar to boost::filtered_range, but a little simpler; it also allows converting ordinary iterators to filtered ones
template <typename F, typename C>
struct RangeFiltered
{
using RawIterator = typename C::iterator;
class Iterator;
/// Will iterate over elements for which filter(*it) == true
RangeFiltered(F && filter, const C & container)
: filter(std::move(filter)), container(container) {}
Iterator begin() const
{
return {*this, std::begin(container)};
}
Iterator end() const
{
return {*this, std::end(container)};
}
/// Convert ordinary iterator to filtered one
/// The real position will be in the range [ordinary_iterator; end()], so it is suitable for use with lower_bound()/upper_bound()
inline Iterator convert(RawIterator ordinary_iterator) const
{
return {*this, ordinary_iterator};
}
/// It is similar to boost::filtered_iterator, but has additional features:
/// it doesn't store end() iterator
/// it doesn't store the predicate, so it allows implementing operator=()
/// it guarantees that operator++() works properly in case of filter(*it) == false
class Iterator
{
public:
using Range = RangeFiltered<F, C>;
typedef Iterator self_type;
typedef typename std::iterator_traits<RawIterator>::value_type value_type;
typedef typename std::iterator_traits<RawIterator>::reference reference;
typedef const value_type & const_reference;
typedef typename std::iterator_traits<RawIterator>::pointer pointer;
typedef const value_type * const_pointer;
typedef typename std::iterator_traits<RawIterator>::difference_type difference_type;
typedef std::bidirectional_iterator_tag iterator_category;
Iterator(const Range & range_, RawIterator iter_)
: range(&range_), iter(iter_)
{
for (; iter != std::end(range->container) && !range->filter(*iter); ++iter);
}
Iterator(const Iterator & rhs) = default;
Iterator(Iterator && rhs) noexcept = default;
Iterator operator++()
{
++iter;
for (; iter != std::end(range->container) && !range->filter(*iter); ++iter);
return *this;
}
Iterator operator--()
{
--iter;
for (; !range->filter(*iter); --iter); /// Don't check std::begin() bound
return *this;
}
pointer operator->()
{
return iter.operator->();
}
const_pointer operator->() const
{
return iter.operator->();
}
reference operator*()
{
return *iter;
}
const_reference operator*() const
{
return *iter;
}
bool operator==(const self_type & rhs) const
{
return iter == rhs.iter;
}
bool operator!=(const self_type & rhs) const
{
return iter != rhs.iter;
}
self_type & operator=(const self_type & rhs) = default;
self_type & operator=(self_type && rhs) noexcept = default;
~Iterator() = default;
private:
const Range * range = nullptr;
RawIterator iter;
};
protected:
F filter;
const C & container;
};
template <typename F, typename C>
inline RangeFiltered<std::decay_t<F>, std::decay_t<C>> createRangeFiltered(F && filter, C && container)
{
return {std::forward<F>(filter), std::forward<C>(container)};
}