diff --git a/dbms/src/Parsers/ASTInsertQuery.cpp b/dbms/src/Parsers/ASTInsertQuery.cpp index fc1a7af94e1bef63a2f781019083ec0955b90892..e3d047b42a41b8b17ca03e9b49e68a6725b85d5d 100644 --- a/dbms/src/Parsers/ASTInsertQuery.cpp +++ b/dbms/src/Parsers/ASTInsertQuery.cpp @@ -12,10 +12,6 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s settings.ostr << (settings.hilite ? hilite_keyword : "") << "INSERT INTO " << (settings.hilite ? hilite_none : "") << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table); - if (!insert_id.empty()) - settings.ostr << (settings.hilite ? hilite_keyword : "") << " ID = " << (settings.hilite ? hilite_none : "") - << std::quoted(insert_id, '\''); - if (columns) { settings.ostr << " ("; diff --git a/dbms/src/Parsers/ASTInsertQuery.h b/dbms/src/Parsers/ASTInsertQuery.h index e65c349745c6b4f61899d398e90fc9d7ea8695c9..c6f584a7d5e8ca9824c4b279442b78f47973ae35 100644 --- a/dbms/src/Parsers/ASTInsertQuery.h +++ b/dbms/src/Parsers/ASTInsertQuery.h @@ -17,8 +17,6 @@ public: ASTPtr columns; String format; ASTPtr select; - /// INSERT query identifier. Used for replication. - String insert_id; /// Data to insert const char * data = nullptr; const char * end = nullptr; diff --git a/dbms/src/Parsers/ParserInsertQuery.cpp b/dbms/src/Parsers/ParserInsertQuery.cpp index 6b116e4277f54e8aa47eec61072dde8386bb8a42..1f8790e0848311cc3a92cf654eacdd2bf6c51e1e 100644 --- a/dbms/src/Parsers/ParserInsertQuery.cpp +++ b/dbms/src/Parsers/ParserInsertQuery.cpp @@ -29,9 +29,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p ParserString s_insert("INSERT", true, true); ParserString s_into("INTO", true, true); ParserString s_dot("."); - ParserString s_id("ID"); - ParserString s_eq("="); - ParserStringLiteral id_p; ParserString s_values("VALUES", true, true); ParserString s_format("FORMAT", true, true); ParserString s_select("SELECT", true, true); @@ -45,7 +42,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p ASTPtr columns; ASTPtr format; ASTPtr select; - ASTPtr id; /// Insertion data const char * data = nullptr; @@ -75,19 +71,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p ws.ignore(pos, end); - if (s_id.ignore(pos, end, max_parsed_pos, expected)) - { - if (!s_eq.ignore(pos, end, max_parsed_pos, expected)) - return false; - - ws.ignore(pos, end); - - if (!id_p.parse(pos, end, id, max_parsed_pos, expected)) - return false; - } - - ws.ignore(pos, end); - /// Is there a list of columns if (s_lparen.ignore(pos, end, max_parsed_pos, expected)) { @@ -159,9 +142,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p query->table = typeid_cast(*table).name; - if (id) - query->insert_id = safeGet(typeid_cast(*id).value); - if (format) query->format = typeid_cast(*format).name; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 6ea311318fed90579583bc20696274e664f6c28b..9fd1d413b07c3fc031dce0fad5504bfc2d1398d4 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -23,8 +23,8 @@ namespace ErrorCodes ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream( - StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_, size_t quorum_timeout_ms_) - : storage(storage_), insert_id(insert_id_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), + StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_) + : storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)")) { /// The quorum value `1` has the same meaning as if it is disabled. @@ -110,7 +110,6 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) assertSessionIsNotExpired(zookeeper); ++block_index; - String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index); String month_name = toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(current_block.min_date)) / 100); AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name); /// 2 RTT @@ -132,16 +131,8 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) String checksum(hash_value.bytes, 16); - /// If no ID is specified in query, we take the hash from the data as ID. That is, do not insert the same data twice. - /// NOTE: If you do not need this deduplication, you can leave `block_id` empty instead. - /// Setting or syntax in the query (for example, `ID = null`) could be done for this. - if (block_id.empty()) - { - block_id = toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]); - - if (block_id.empty()) - throw Exception("Logical error: block_id is empty.", ErrorCodes::LOGICAL_ERROR); - } + /// We take the hash from the data as ID. That is, do not insert the same data twice. + String block_id = toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]); LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id << ", " << current_block.block.rows() << " rows"); @@ -253,12 +244,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) /// If the data is different from the ones that were inserted earlier with the same ID, throw an exception. if (expected_checksum != checksum) - { - if (!insert_id.empty()) - throw Exception("Attempt to insert block with same ID but different checksum", ErrorCodes::CHECKSUM_DOESNT_MATCH); - else - throw Exception("Logical error: got ZNODEEXISTS while inserting data, block ID is derived from checksum but checksum doesn't match", ErrorCodes::LOGICAL_ERROR); - } + throw Exception("Logical error: got ZNODEEXISTS while inserting data, block ID is derived from checksum but checksum doesn't match", ErrorCodes::LOGICAL_ERROR); transaction.rollback(); } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 46f286f6ea1ae1763d8b0cfe3066871ec8bb831f..f3374df55c1fc4ed696eda54059fb423edb5a778 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -14,14 +14,13 @@ class StorageReplicatedMergeTree; class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream { public: - ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_, + ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_); void write(const Block & block) override; private: StorageReplicatedMergeTree & storage; - String insert_id; size_t quorum; size_t quorum_timeout_ms; size_t block_index = 0; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 50e9cbc23180ae257c5a09c40c18e186d655ab0f..53e2030057ba156327436d3d6e4be4ce6415959c 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include @@ -2339,12 +2338,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & query, con { assertNotReadonly(); - String insert_id; - if (query) - if (ASTInsertQuery * insert = typeid_cast(&*query)) - insert_id = insert->insert_id; - - return std::make_shared(*this, insert_id, + return std::make_shared(*this, settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds()); }