提交 628de0a3 编写于 作者: V Vitaliy Lyudvichenko 提交者: alexey-milovidov

Fixed logging of DDL queries. [#CLICKHOUSE-3128]

上级 736feab7
......@@ -258,7 +258,8 @@ bool ExecutionStatus::tryDeserializeText(const std::string & data)
ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_of_message)
{
return ExecutionStatus(getCurrentExceptionCode(), start_of_message + ": " + getCurrentExceptionMessage(false, true));
String msg = start_of_message.empty() ? "" : (start_of_message + ": " + getCurrentExceptionMessage(false, true));
return ExecutionStatus(getCurrentExceptionCode(), msg);
}
......
......@@ -119,10 +119,10 @@ void Cluster::Address::fromString(const String & host_port_string, String & host
{
auto pos = host_port_string.find_last_of(':');
if (pos == std::string::npos)
throw Exception("Incorrect host ID format " + host_port_string, ErrorCodes::SYNTAX_ERROR);
throw Exception("Incorrect <host>:<port> format " + host_port_string, ErrorCodes::SYNTAX_ERROR);
host_name = unescapeForFileName(host_port_string.substr(0, pos));
port = parse<UInt16>(host_port_string.substr(0, pos));
port = parse<UInt16>(host_port_string.substr(pos + 1));
}
......
......@@ -328,25 +328,29 @@ void DDLWorker::processTasks()
}
static bool tryExecuteQuery(const String & query, Context & context, ExecutionStatus & status, Logger * log = nullptr)
bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status)
{
String query_prefix = "/*ddl_entry=" + task.entry_name + "*/ ";
String query_to_execute = query_prefix + query;
ReadBufferFromString istr(query_to_execute);
String dummy_string;
WriteBufferFromString ostr(dummy_string);
try
{
executeQuery(query, context);
executeQuery(istr, ostr, false, context, nullptr);
}
catch (...)
{
status = ExecutionStatus::fromCurrentException();
if (log)
tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");
tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");
return false;
}
status = ExecutionStatus(0);
if (log)
LOG_DEBUG(log, "Executed query: " << query);
LOG_DEBUG(log, "Executed query: " << query);
return true;
}
......@@ -372,7 +376,6 @@ void DDLWorker::processTask(DDLTask & task)
{
ASTPtr rewritten_ast = task.query_on_cluster->getRewrittenASTWithoutOnCluster(task.host_address.default_database);
String rewritten_query = queryToString(rewritten_ast);
LOG_DEBUG(log, "Executing query: " << rewritten_query);
if (auto query_alter = dynamic_cast<const ASTAlterQuery *>(rewritten_ast.get()))
......@@ -381,7 +384,7 @@ void DDLWorker::processTask(DDLTask & task)
}
else
{
tryExecuteQuery(rewritten_query, context, current_node_execution_status, log);
tryExecuteQuery(rewritten_query, task, current_node_execution_status);
}
}
catch (const zkutil::KeeperException & e)
......@@ -487,7 +490,7 @@ void DDLWorker::processTaskAlter(
if (lock.tryLock())
{
tryExecuteQuery(rewritten_query, context, current_node_execution_status, log);
tryExecuteQuery(rewritten_query, task, current_node_execution_status);
if (execute_on_leader_replica && current_node_execution_status.code == ErrorCodes::NOT_IMPLEMENTED)
{
......@@ -509,7 +512,7 @@ void DDLWorker::processTaskAlter(
}
else
{
tryExecuteQuery(rewritten_query, context, current_node_execution_status, log);
tryExecuteQuery(rewritten_query, task, current_node_execution_status);
}
}
......@@ -565,7 +568,7 @@ void DDLWorker::cleanupQueue(const Strings * node_names_to_check)
}
catch (...)
{
tryLogCurrentException(log, "An error occured while checking and cleaning node " + node_name + " from queue");
LOG_INFO(log, "An error occured while checking and cleaning node " + node_name + " from queue: " + getCurrentExceptionMessage(false));
}
}
}
......@@ -614,7 +617,7 @@ void DDLWorker::run()
{
processTasks();
LOG_DEBUG(log, "Waiting watch");
LOG_DEBUG(log, "Waiting a watch");
event_queue_updated->wait();
if (stop_flag)
......@@ -624,7 +627,7 @@ void DDLWorker::run()
}
catch (zkutil::KeeperException &)
{
LOG_DEBUG(log, "Recovering ZooKeeper session after " << getCurrentExceptionMessage());
LOG_DEBUG(log, "Recovering ZooKeeper session after " << getCurrentExceptionMessage(true));
zookeeper = context.getZooKeeper();
}
catch (...)
......@@ -730,6 +733,15 @@ public:
return res;
}
Block getSampleBlock() const
{
return sample.cloneEmpty();
}
~DDLQueryStatusInputSream() override = default;
private:
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
{
Strings res;
......@@ -764,8 +776,6 @@ public:
return diff;
}
~DDLQueryStatusInputSream() override = default;
private:
String node_path;
Context & context;
......@@ -821,7 +831,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context)
return io;
auto stream = std::make_shared<DDLQueryStatusInputSream>(node_path, entry, context);
io.in_sample = stream->sample.cloneEmpty();
io.in_sample = stream->getSampleBlock();
io.in = std::move(stream);
return io;
}
......
......@@ -48,6 +48,8 @@ private:
const String & rewritten_query,
const String & node_path);
bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status);
/// Checks and cleanups queue's nodes
void cleanupQueue(const Strings * node_names_to_check = nullptr);
......
......@@ -147,7 +147,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ast = parseQuery(parser, begin, end, "");
/// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion.
query_size = ast->range.second - ast->range.first;
if (!(begin <= ast->range.first && ast->range.second <= end))
throw Exception("Unexpected behavior: AST chars range is not inside source range", ErrorCodes::LOGICAL_ERROR);
query_size = ast->range.second - begin;
if (max_query_size && query_size > max_query_size)
throw Exception("Query is too large (" + toString(query_size) + ")."
......
......@@ -10,10 +10,10 @@ namespace DB
/// Parse and execute a query.
void executeQuery(
ReadBuffer & istr, /// Where to read query from (and data for INSERT, if present).
WriteBuffer & ostr, /// Where to write query output to.
ReadBuffer & istr, /// Where to read query from (and data for INSERT, if present).
WriteBuffer & ostr, /// Where to write query output to.
bool allow_into_outfile, /// If true and the query contains INTO OUTFILE section, redirect output to that file.
Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions...
Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions...
std::function<void(const String &)> set_content_type /// If non-empty callback is passed, it will be called with the Content-Type of the result.
);
......
......@@ -89,6 +89,12 @@ class ClickHouseCluster:
shutil.rmtree(self.instances_dir)
for instance in self.instances.values():
# Kill unstopped containers from previous launch
try:
subprocess.check_call(self.base_cmd + ['kill'])
subprocess.check_call(self.base_cmd + ['down', '--volumes'])
except:
pass
instance.create_dir(destroy_dir=destroy_dirs)
subprocess.check_call(self.base_cmd + ['up', '-d'])
......
......@@ -27,6 +27,11 @@ def ddl_check_query(instance, query, num_hosts=None):
check_all_hosts_sucesfully_executed(contents, num_hosts)
return contents
def ddl_check_there_are_no_dublicates(instance):
answer = instance.query("SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/*ddl_entry=query-%' GROUP BY query)")
row = TSV.toMat(answer)[0]
assert row[0] == "1", "dublicates on {} {}, query {}".format(instance.name, instance.ip_address, row[1])
TEST_REPLICATED_ALTERS=True
......@@ -61,7 +66,10 @@ CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
finally:
pass
# Check query log to ensure that DDL queries are not executed twice
time.sleep(1)
for instance in cluster.instances.values():
ddl_check_there_are_no_dublicates(instance)
#cluster.shutdown()
......@@ -98,7 +106,7 @@ def test_on_server_fail(started_cluster):
contents = instance.query("SELECT hostName() AS h FROM all_tables WHERE database='test' AND name='test_server_fail' ORDER BY h")
assert TSV(contents) == TSV("ch1\nch2\nch3\nch4\n")
ddl_check_query(instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'")
ddl_check_query(instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")
def _test_on_connection_losses(cluster, zk_timeout):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册