提交 6db8da82 编写于 作者: V Vitaliy Lyudvichenko

Add requested changes. [#CLICKHOUSE-5]

上级 73e2aab9
...@@ -259,7 +259,7 @@ std::string ExecutionStatus::serializeText() const ...@@ -259,7 +259,7 @@ std::string ExecutionStatus::serializeText() const
std::string res; std::string res;
{ {
WriteBufferFromString wb(res); WriteBufferFromString wb(res);
wb << code << "\n" << message; wb << code << "\n" << escape << message;
} }
return res; return res;
} }
...@@ -267,7 +267,7 @@ std::string ExecutionStatus::serializeText() const ...@@ -267,7 +267,7 @@ std::string ExecutionStatus::serializeText() const
void ExecutionStatus::deserializeText(const std::string & data) void ExecutionStatus::deserializeText(const std::string & data)
{ {
ReadBufferFromString rb(data); ReadBufferFromString rb(data);
rb >> code >> "\n" >> message; rb >> code >> "\n" >> escape >> message;
} }
ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_of_message) ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_of_message)
......
...@@ -91,7 +91,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded ...@@ -91,7 +91,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
int getCurrentExceptionCode(); int getCurrentExceptionCode();
/// An execution status of any piece of code /// An execution status of any piece of code, contains return code and optional error
struct ExecutionStatus struct ExecutionStatus
{ {
int code = 0; int code = 0;
......
...@@ -50,6 +50,10 @@ namespace ErrorCodes ...@@ -50,6 +50,10 @@ namespace ErrorCodes
} }
const size_t DDLWorker::node_max_lifetime_seconds = 7 * 24 * 60 * 60; // week
const size_t DDLWorker::cleanup_min_period_seconds = 60; // minute
struct DDLLogEntry struct DDLLogEntry
{ {
String query; String query;
...@@ -65,7 +69,7 @@ struct DDLLogEntry ...@@ -65,7 +69,7 @@ struct DDLLogEntry
WriteBufferFromString wb(res); WriteBufferFromString wb(res);
wb << "version: " << CURRENT_VERSION << "\n"; wb << "version: " << CURRENT_VERSION << "\n";
wb << "query: " << query << "\n"; wb << "query: " << escape << query << "\n";
wb << "hosts: " << hosts << "\n"; wb << "hosts: " << hosts << "\n";
wb << "initiator: " << initiator << "\n"; wb << "initiator: " << initiator << "\n";
} }
...@@ -83,7 +87,7 @@ struct DDLLogEntry ...@@ -83,7 +87,7 @@ struct DDLLogEntry
if (version != CURRENT_VERSION) if (version != CURRENT_VERSION)
throw Exception("Unknown DDLLogEntry format version: " + version, ErrorCodes::UNKNOWN_FORMAT_VERSION); throw Exception("Unknown DDLLogEntry format version: " + version, ErrorCodes::UNKNOWN_FORMAT_VERSION);
rb >> "query: " >> query >> "\n"; rb >> "query: " >> escape >> query >> "\n";
rb >> "hosts: " >> hosts >> "\n"; rb >> "hosts: " >> hosts >> "\n";
if (!rb.eof()) if (!rb.eof())
...@@ -128,7 +132,7 @@ static bool isSupportedAlterType(int type) ...@@ -128,7 +132,7 @@ static bool isSupportedAlterType(int type)
DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_) DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
: context(context_), stop_flag(false) : context(context_)
{ {
queue_dir = zk_root_dir; queue_dir = zk_root_dir;
if (queue_dir.back() == '/') if (queue_dir.back() == '/')
...@@ -169,7 +173,9 @@ void DDLWorker::processTasks() ...@@ -169,7 +173,9 @@ void DDLWorker::processTasks()
for (auto it = begin_node; it != queue_nodes.end(); ++it) for (auto it = begin_node; it != queue_nodes.end(); ++it)
{ {
String node_data, node_name = *it, node_path = queue_dir + "/" + node_name; const String & node_name = *it;
String node_path = queue_dir + "/" + node_name;
String node_data;
if (!zookeeper->tryGet(node_path, node_data)) if (!zookeeper->tryGet(node_path, node_data))
{ {
......
...@@ -65,7 +65,7 @@ private: ...@@ -65,7 +65,7 @@ private:
std::string queue_dir; /// dir with queue of queries std::string queue_dir; /// dir with queue of queries
std::string master_dir; /// dir with queries was initiated by the server std::string master_dir; /// dir with queries was initiated by the server
/// Used to omit already processed nodes; /// Used to omit already processed nodes. Maybe usage of set is more obvious.
std::string last_processed_node_name; std::string last_processed_node_name;
std::shared_ptr<zkutil::ZooKeeper> zookeeper; std::shared_ptr<zkutil::ZooKeeper> zookeeper;
...@@ -76,12 +76,15 @@ private: ...@@ -76,12 +76,15 @@ private:
ExecutionStatus current_node_execution_status; ExecutionStatus current_node_execution_status;
std::shared_ptr<Poco::Event> event_queue_updated; std::shared_ptr<Poco::Event> event_queue_updated;
std::atomic<bool> stop_flag; std::atomic<bool> stop_flag{false};
std::thread thread; std::thread thread;
size_t last_cleanup_time_seconds = 0; size_t last_cleanup_time_seconds = 0;
static constexpr size_t node_max_lifetime_seconds = 60; // 7 * 24 * 60 * 60;
static constexpr size_t cleanup_min_period_seconds = 60; /// Delete node if its age is greater than that
static const size_t node_max_lifetime_seconds;
/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
static const size_t cleanup_min_period_seconds;
friend class DDLQueryStatusInputSream; friend class DDLQueryStatusInputSream;
}; };
......
...@@ -177,11 +177,7 @@ private: ...@@ -177,11 +177,7 @@ private:
}; };
<<<<<<< HEAD
/// Quota the identifier with backquotes, if required.
=======
/// Surrounds an identifier by back quotes if it is necessary. /// Surrounds an identifier by back quotes if it is necessary.
>>>>>>> Parsers refactoring. [#CLICKHOUSE-5]
String backQuoteIfNeed(const String & x); String backQuoteIfNeed(const String & x);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册