提交 43fedfec 编写于 作者: A Alexey Milovidov

Get rid of INSERT ID [#CLICKHOUSE-31].

上级 1790eeeb
......@@ -12,10 +12,6 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
settings.ostr << (settings.hilite ? hilite_keyword : "") << "INSERT INTO " << (settings.hilite ? hilite_none : "")
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
if (!insert_id.empty())
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ID = " << (settings.hilite ? hilite_none : "")
<< std::quoted(insert_id, '\'');
if (columns)
{
settings.ostr << " (";
......
......@@ -17,8 +17,6 @@ public:
ASTPtr columns;
String format;
ASTPtr select;
/// INSERT query identifier. Used for replication.
String insert_id;
/// Data to insert
const char * data = nullptr;
const char * end = nullptr;
......
......@@ -29,9 +29,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ParserString s_insert("INSERT", true, true);
ParserString s_into("INTO", true, true);
ParserString s_dot(".");
ParserString s_id("ID");
ParserString s_eq("=");
ParserStringLiteral id_p;
ParserString s_values("VALUES", true, true);
ParserString s_format("FORMAT", true, true);
ParserString s_select("SELECT", true, true);
......@@ -45,7 +42,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ASTPtr columns;
ASTPtr format;
ASTPtr select;
ASTPtr id;
/// Insertion data
const char * data = nullptr;
......@@ -75,19 +71,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ws.ignore(pos, end);
if (s_id.ignore(pos, end, max_parsed_pos, expected))
{
if (!s_eq.ignore(pos, end, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
if (!id_p.parse(pos, end, id, max_parsed_pos, expected))
return false;
}
ws.ignore(pos, end);
/// Is there a list of columns
if (s_lparen.ignore(pos, end, max_parsed_pos, expected))
{
......@@ -159,9 +142,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
query->table = typeid_cast<ASTIdentifier &>(*table).name;
if (id)
query->insert_id = safeGet<const String &>(typeid_cast<ASTLiteral &>(*id).value);
if (format)
query->format = typeid_cast<ASTIdentifier &>(*format).name;
......
......@@ -23,8 +23,8 @@ namespace ErrorCodes
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_, size_t quorum_timeout_ms_)
: storage(storage_), insert_id(insert_id_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_),
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_)
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_),
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)"))
{
/// The quorum value `1` has the same meaning as if it is disabled.
......@@ -110,7 +110,6 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
assertSessionIsNotExpired(zookeeper);
++block_index;
String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);
String month_name = toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(current_block.min_date)) / 100);
AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name); /// 2 RTT
......@@ -132,16 +131,8 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
String checksum(hash_value.bytes, 16);
/// If no ID is specified in query, we take the hash from the data as ID. That is, do not insert the same data twice.
/// NOTE: If you do not need this deduplication, you can leave `block_id` empty instead.
/// Setting or syntax in the query (for example, `ID = null`) could be done for this.
if (block_id.empty())
{
block_id = toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
if (block_id.empty())
throw Exception("Logical error: block_id is empty.", ErrorCodes::LOGICAL_ERROR);
}
/// We take the hash from the data as ID. That is, do not insert the same data twice.
String block_id = toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id << ", " << current_block.block.rows() << " rows");
......@@ -253,12 +244,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
/// If the data is different from the ones that were inserted earlier with the same ID, throw an exception.
if (expected_checksum != checksum)
{
if (!insert_id.empty())
throw Exception("Attempt to insert block with same ID but different checksum", ErrorCodes::CHECKSUM_DOESNT_MATCH);
else
throw Exception("Logical error: got ZNODEEXISTS while inserting data, block ID is derived from checksum but checksum doesn't match", ErrorCodes::LOGICAL_ERROR);
}
throw Exception("Logical error: got ZNODEEXISTS while inserting data, block ID is derived from checksum but checksum doesn't match", ErrorCodes::LOGICAL_ERROR);
transaction.rollback();
}
......
......@@ -14,14 +14,13 @@ class StorageReplicatedMergeTree;
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_,
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_,
size_t quorum_, size_t quorum_timeout_ms_);
void write(const Block & block) override;
private:
StorageReplicatedMergeTree & storage;
String insert_id;
size_t quorum;
size_t quorum_timeout_ms;
size_t block_index = 0;
......
......@@ -14,7 +14,6 @@
#include <Databases/IDatabase.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/queryToString.h>
......@@ -2339,12 +2338,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & query, con
{
assertNotReadonly();
String insert_id;
if (query)
if (ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(&*query))
insert_id = insert->insert_id;
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this, insert_id,
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册