提交 08170d0d 编写于 作者: A Alexey Milovidov

Modifications after removing libzookeeper [#CLICKHOUSE-2]

上级 9379d71f
#pragma once #pragma once
#include <Common/Exception.h>
#include "Types.h"
#include <Common/ProfileEvents.h>
namespace DB
{
namespace ErrorCodes
{
extern const int KEEPER_EXCEPTION;
}
}
namespace ProfileEvents #include "Types.h"
{
extern const Event ZooKeeperExceptions;
}
namespace zkutil namespace zkutil
...@@ -43,42 +28,7 @@ inline bool isUserError(int32_t zk_return_code) ...@@ -43,42 +28,7 @@ inline bool isUserError(int32_t zk_return_code)
} }
class KeeperException : public DB::Exception using KeeperException = ZooKeeperImpl::Exception;
{
private:
/// delegate constructor, used to minimize repetition; last parameter used for overload resolution
KeeperException(const std::string & msg, const int32_t code, int)
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code) { incrementEventCounter(); }
public:
KeeperException(const std::string & msg, const int32_t code)
: KeeperException(msg + " (" + ZooKeeperImpl::ZooKeeper::errorMessage(code) + ")", code, 0) {}
explicit KeeperException(const int32_t code) : KeeperException(ZooKeeperImpl::ZooKeeper::errorMessage(code), code, 0) {}
KeeperException(const int32_t code, const std::string & path)
: KeeperException(std::string{ZooKeeperImpl::ZooKeeper::errorMessage(code)} + ", path: " + path, code, 0) {}
KeeperException(const KeeperException & exc) : DB::Exception(exc), code(exc.code) { incrementEventCounter(); }
const char * name() const throw() override { return "zkutil::KeeperException"; }
const char * className() const throw() override { return "zkutil::KeeperException"; }
KeeperException * clone() const override { return new KeeperException(*this); }
/// Any error related with network or master election
/// In case of these errors you should reinitialize ZooKeeper session.
bool isHardwareError() const
{
return zkutil::isHardwareError(code);
}
const int32_t code;
private:
static void incrementEventCounter()
{
ProfileEvents::increment(ProfileEvents::ZooKeeperExceptions);
}
};
class KeeperMultiException : public KeeperException class KeeperMultiException : public KeeperException
......
#include <Common/ZooKeeper/ZooKeeperImpl.h> #include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Poco/Exception.h> #include <Poco/Exception.h>
...@@ -11,7 +13,19 @@ ...@@ -11,7 +13,19 @@
#include <array> #include <array>
//#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int KEEPER_EXCEPTION;
}
}
namespace ProfileEvents
{
extern const Event ZooKeeperExceptions;
}
/** ZooKeeper wire protocol. /** ZooKeeper wire protocol.
...@@ -228,6 +242,33 @@ after: ...@@ -228,6 +242,33 @@ after:
namespace ZooKeeperImpl namespace ZooKeeperImpl
{ {
Exception::Exception(const std::string & msg, const int32_t code, int)
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code)
{
ProfileEvents::increment(ProfileEvents::ZooKeeperExceptions);
}
Exception::Exception(const std::string & msg, const int32_t code)
: Exception(msg + " (" + ZooKeeperImpl::ZooKeeper::errorMessage(code) + ")", code, 0)
{
}
Exception::Exception(const int32_t code)
: Exception(ZooKeeperImpl::ZooKeeper::errorMessage(code), code, 0)
{
}
Exception::Exception(const int32_t code, const std::string & path)
: Exception(std::string{ZooKeeperImpl::ZooKeeper::errorMessage(code)} + ", path: " + path, code, 0)
{
}
Exception::Exception(const Exception & exc)
: DB::Exception(exc), code(exc.code)
{
}
using namespace DB; using namespace DB;
...@@ -304,10 +345,10 @@ void read(String & s, ReadBuffer & in) ...@@ -304,10 +345,10 @@ void read(String & s, ReadBuffer & in)
static constexpr int32_t max_string_size = 1 << 20; static constexpr int32_t max_string_size = 1 << 20;
int32_t size = 0; int32_t size = 0;
read(size, in); read(size, in);
if (size < 0) if (size < 0) /// TODO Actually it means that zookeeper node have NULL value. Maybe better to treat it like empty string.
throw Exception("Negative size"); /// TODO Actually it means that zookeeper node have NULL value. Maybe better to treat it like empty string. throw Exception("Negative size while reading string from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
if (size > max_string_size) if (size > max_string_size)
throw Exception("Too large string size"); /// TODO error code throw Exception("Too large string size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
s.resize(size); s.resize(size);
in.read(&s[0], size); in.read(&s[0], size);
} }
...@@ -317,7 +358,7 @@ template <size_t N> void read(std::array<char, N> & s, ReadBuffer & in) ...@@ -317,7 +358,7 @@ template <size_t N> void read(std::array<char, N> & s, ReadBuffer & in)
int32_t size = 0; int32_t size = 0;
read(size, in); read(size, in);
if (size != N) if (size != N)
throw Exception("Unexpected array size"); /// TODO error code throw Exception("Unexpected array size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
in.read(&s[0], N); in.read(&s[0], N);
} }
...@@ -347,9 +388,9 @@ template <typename T> void read(std::vector<T> & arr, ReadBuffer & in) ...@@ -347,9 +388,9 @@ template <typename T> void read(std::vector<T> & arr, ReadBuffer & in)
int32_t size = 0; int32_t size = 0;
read(size, in); read(size, in);
if (size < 0) if (size < 0)
throw Exception("Negative size"); throw Exception("Negative size while reading array from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
if (size > max_array_size) if (size > max_array_size)
throw Exception("Too large array size"); /// TODO error code throw Exception("Too large array size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
arr.resize(size); arr.resize(size);
for (auto & elem : arr) for (auto & elem : arr)
read(elem, in); read(elem, in);
...@@ -488,6 +529,7 @@ void ZooKeeper::connect( ...@@ -488,6 +529,7 @@ void ZooKeeper::connect(
static constexpr size_t num_tries = 3; static constexpr size_t num_tries = 3;
bool connected = false; bool connected = false;
WriteBufferFromOwnString fail_reasons;
for (size_t try_no = 0; try_no < num_tries; ++try_no) for (size_t try_no = 0; try_no < num_tries; ++try_no)
{ {
for (const auto & address : addresses) for (const auto & address : addresses)
...@@ -500,10 +542,11 @@ void ZooKeeper::connect( ...@@ -500,10 +542,11 @@ void ZooKeeper::connect(
} }
catch (const Poco::Net::NetException & e) catch (const Poco::Net::NetException & e)
{ {
/// TODO log exception fail_reasons << "\n" << getCurrentExceptionMessage(false);
} }
catch (const Poco::TimeoutException & e) catch (const Poco::TimeoutException & e)
{ {
fail_reasons << "\n" << getCurrentExceptionMessage(false);
} }
} }
...@@ -512,7 +555,22 @@ void ZooKeeper::connect( ...@@ -512,7 +555,22 @@ void ZooKeeper::connect(
} }
if (!connected) if (!connected)
throw Exception("All connection tries failed"); /// TODO more info; error code {
WriteBufferFromOwnString out;
out << "All connection tries failed while connecting to ZooKeeper. Addresses: ";
bool first = true;
for (const auto & address : addresses)
{
if (first)
first = false;
else
out << ", ";
out << address.toString();
}
out << fail_reasons.str();
throw Exception(out.str(), ZCONNECTIONLOSS);
}
socket.setReceiveTimeout(operation_timeout); socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout); socket.setSendTimeout(operation_timeout);
...@@ -553,15 +611,15 @@ void ZooKeeper::receiveHandshake() ...@@ -553,15 +611,15 @@ void ZooKeeper::receiveHandshake()
read(handshake_length); read(handshake_length);
if (handshake_length != 36) if (handshake_length != 36)
throw Exception("Unexpected handshake length received: " + toString(handshake_length)); throw Exception("Unexpected handshake length received: " + toString(handshake_length), ZMARSHALLINGERROR);
read(protocol_version_read); read(protocol_version_read);
if (protocol_version_read != protocol_version) if (protocol_version_read != protocol_version)
throw Exception("Unexpected protocol version: " + toString(protocol_version_read)); throw Exception("Unexpected protocol version: " + toString(protocol_version_read), ZMARSHALLINGERROR);
read(timeout); read(timeout);
if (timeout != session_timeout.totalMilliseconds()) if (timeout != session_timeout.totalMilliseconds())
throw Exception("Received different session timeout from server: " + toString(timeout)); throw Exception("Received different session timeout from server: " + toString(timeout), ZMARSHALLINGERROR);
read(session_id); read(session_id);
read(passwd); read(passwd);
...@@ -588,14 +646,17 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data) ...@@ -588,14 +646,17 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
read(err); read(err);
if (xid != auth_xid) if (xid != auth_xid)
throw Exception("Unexpected event recievent in reply to auth request: " + toString(xid)); throw Exception("Unexpected event recieved in reply to auth request: " + toString(xid),
ZMARSHALLINGERROR);
int32_t actual_length = in->count() - count_before_event; int32_t actual_length = in->count() - count_before_event;
if (length != actual_length) if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length)); throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length),
ZMARSHALLINGERROR);
if (err) if (err)
throw Exception("Error received in reply to auth request. Code: " + toString(err) + ". Message: " + String(errorMessage(err))); throw Exception("Error received in reply to auth request. Code: " + toString(err) + ". Message: " + String(errorMessage(err)),
ZMARSHALLINGERROR);
} }
...@@ -669,7 +730,7 @@ void ZooKeeper::receiveThread() ...@@ -669,7 +730,7 @@ void ZooKeeper::receiveThread()
has_operations = true; has_operations = true;
auto earliest_operation_deadline = operations.begin()->second.time + std::chrono::microseconds(operation_timeout.totalMicroseconds()); auto earliest_operation_deadline = operations.begin()->second.time + std::chrono::microseconds(operation_timeout.totalMicroseconds());
if (now > earliest_operation_deadline) if (now > earliest_operation_deadline)
throw Exception("Operation timeout"); throw Exception("Operation timeout", ZOPERATIONTIMEOUT);
max_wait = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count(); max_wait = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count();
} }
} }
...@@ -685,10 +746,10 @@ void ZooKeeper::receiveThread() ...@@ -685,10 +746,10 @@ void ZooKeeper::receiveThread()
else else
{ {
if (has_operations) if (has_operations)
throw Exception("Operation timeout"); throw Exception("Operation timeout", ZOPERATIONTIMEOUT);
waited += max_wait; waited += max_wait;
if (waited > session_timeout.totalMicroseconds()) if (waited > session_timeout.totalMicroseconds())
throw Exception("Nothing is received in session timeout"); throw Exception("Nothing is received in session timeout", ZOPERATIONTIMEOUT);
} }
} }
...@@ -729,10 +790,10 @@ ZooKeeper::ResponsePtr ZooKeeper::CloseRequest::makeResponse() const { return st ...@@ -729,10 +790,10 @@ ZooKeeper::ResponsePtr ZooKeeper::CloseRequest::makeResponse() const { return st
void addRootPath(String & path, const String & root_path) void addRootPath(String & path, const String & root_path)
{ {
if (path.empty()) if (path.empty())
throw Exception("Path cannot be empty"); throw Exception("Path cannot be empty", ZooKeeper::ZBADARGUMENTS);
if (path[0] != '/') if (path[0] != '/')
throw Exception("Path must begin with /"); throw Exception("Path must begin with /", ZooKeeper::ZBADARGUMENTS);
if (root_path.empty()) if (root_path.empty())
return; return;
...@@ -749,7 +810,7 @@ void removeRootPath(String & path, const String & root_path) ...@@ -749,7 +810,7 @@ void removeRootPath(String & path, const String & root_path)
return; return;
if (path.size() <= root_path.size()) if (path.size() <= root_path.size())
throw Exception("Received path is not longer than root_path"); throw Exception("Received path is not longer than root_path", ZooKeeper::ZDATAINCONSISTENCY);
path = path.substr(root_path.size()); path = path.substr(root_path.size());
} }
...@@ -798,7 +859,7 @@ void ZooKeeper::receiveEvent() ...@@ -798,7 +859,7 @@ void ZooKeeper::receiveEvent()
if (xid == ping_xid) if (xid == ping_xid)
{ {
if (err) if (err)
throw Exception("Received error in heartbeat response: " + String(errorMessage(err))); throw Exception("Received error in heartbeat response: " + String(errorMessage(err)), ZRUNTIMEINCONSISTENCY);
response = std::make_shared<HeartbeatResponse>(); response = std::make_shared<HeartbeatResponse>();
...@@ -845,7 +906,7 @@ void ZooKeeper::receiveEvent() ...@@ -845,7 +906,7 @@ void ZooKeeper::receiveEvent()
auto it = operations.find(xid); auto it = operations.find(xid);
if (it == operations.end()) if (it == operations.end())
throw Exception("Received response for unknown xid"); throw Exception("Received response for unknown xid", ZRUNTIMEINCONSISTENCY);
request_info = std::move(it->second); request_info = std::move(it->second);
operations.erase(it); operations.erase(it);
...@@ -866,7 +927,7 @@ void ZooKeeper::receiveEvent() ...@@ -866,7 +927,7 @@ void ZooKeeper::receiveEvent()
int32_t actual_length = in->count() - count_before_event; int32_t actual_length = in->count() - count_before_event;
if (length != actual_length) if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length)); throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), ZMARSHALLINGERROR);
if (request_info.callback) if (request_info.callback)
request_info.callback(*response); request_info.callback(*response);
...@@ -1058,12 +1119,13 @@ void ZooKeeper::ErrorResponse::readImpl(ReadBuffer & in) ...@@ -1058,12 +1119,13 @@ void ZooKeeper::ErrorResponse::readImpl(ReadBuffer & in)
ZooKeeperImpl::read(read_error, in); ZooKeeperImpl::read(read_error, in);
if (read_error != error) if (read_error != error)
throw Exception("Error code in ErrorResponse (" + toString(read_error) + ") doesn't match error code in header (" + toString(error) + ")"); throw Exception("Error code in ErrorResponse (" + toString(read_error) + ") doesn't match error code in header (" + toString(error) + ")",
ZMARSHALLINGERROR);
} }
void ZooKeeper::CloseResponse::readImpl(ReadBuffer &) void ZooKeeper::CloseResponse::readImpl(ReadBuffer &)
{ {
throw Exception("Received response for close request"); throw Exception("Received response for close request", ZRUNTIMEINCONSISTENCY);
} }
ZooKeeper::MultiResponse::MultiResponse(const Requests & requests) ZooKeeper::MultiResponse::MultiResponse(const Requests & requests)
...@@ -1089,7 +1151,7 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in) ...@@ -1089,7 +1151,7 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
// std::cerr << "Received result for multi: " << op_num << "\n"; // std::cerr << "Received result for multi: " << op_num << "\n";
if (done) if (done)
throw Exception("Not enough results received for multi transaction"); throw Exception("Not enough results received for multi transaction", ZMARSHALLINGERROR);
/// op_num == -1 is special for multi transaction. /// op_num == -1 is special for multi transaction.
/// For unknown reason, error code is duplicated in header and in response body. /// For unknown reason, error code is duplicated in header and in response body.
...@@ -1123,11 +1185,11 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in) ...@@ -1123,11 +1185,11 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
ZooKeeperImpl::read(error, in); ZooKeeperImpl::read(error, in);
if (!done) if (!done)
throw Exception("Too many results received for multi transaction"); throw Exception("Too many results received for multi transaction", ZMARSHALLINGERROR);
if (op_num != -1) if (op_num != -1)
throw Exception("Unexpected op_num received at the end of results for multi transaction"); throw Exception("Unexpected op_num received at the end of results for multi transaction", ZMARSHALLINGERROR);
if (error != -1) if (error != -1)
throw Exception("Unexpected error value received at the end of results for multi transaction"); throw Exception("Unexpected error value received at the end of results for multi transaction", ZMARSHALLINGERROR);
} }
} }
...@@ -1136,7 +1198,7 @@ void ZooKeeper::pushRequest(RequestInfo && info) ...@@ -1136,7 +1198,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
{ {
/// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop. /// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop.
if (expired && info.request->xid != close_xid) if (expired && info.request->xid != close_xid)
throw Exception("Session expired"); throw Exception("Session expired", ZSESSIONEXPIRED);
info.request->addRootPath(root_path); info.request->addRootPath(root_path);
...@@ -1146,7 +1208,7 @@ void ZooKeeper::pushRequest(RequestInfo && info) ...@@ -1146,7 +1208,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
{ {
info.request->xid = xid.fetch_add(1); info.request->xid = xid.fetch_add(1);
if (info.request->xid < 0) if (info.request->xid < 0)
throw Exception("XID overflow"); throw Exception("XID overflow", ZSESSIONEXPIRED);
} }
{ {
...@@ -1161,8 +1223,8 @@ void ZooKeeper::pushRequest(RequestInfo && info) ...@@ -1161,8 +1223,8 @@ void ZooKeeper::pushRequest(RequestInfo && info)
watches[info.request->getPath()].emplace_back(std::move(info.watch)); watches[info.request->getPath()].emplace_back(std::move(info.watch));
} }
if (!requests.tryPush(info.request, session_timeout.totalMilliseconds())) if (!requests.tryPush(info.request, operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within session timeout"); throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
} }
......
...@@ -29,6 +29,26 @@ namespace ZooKeeperImpl ...@@ -29,6 +29,26 @@ namespace ZooKeeperImpl
using namespace DB; using namespace DB;
class Exception : public DB::Exception
{
private:
/// Delegate constructor, used to minimize repetition; last parameter used for overload resolution.
Exception(const std::string & msg, const int32_t code, int);
public:
explicit Exception(const int32_t code);
Exception(const std::string & msg, const int32_t code);
Exception(const int32_t code, const std::string & path);
Exception(const Exception & exc);
const char * name() const throw() override { return "ZooKeeperImpl::Exception"; }
const char * className() const throw() override { return "ZooKeeperImpl::Exception"; }
Exception * clone() const override { return new Exception(*this); }
const int32_t code;
};
/** Usage scenario: /** Usage scenario:
* - create an object and issue commands; * - create an object and issue commands;
* - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap: * - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap:
......
...@@ -859,7 +859,7 @@ void DDLWorker::run() ...@@ -859,7 +859,7 @@ void DDLWorker::run()
} }
catch (const zkutil::KeeperException & e) catch (const zkutil::KeeperException & e)
{ {
if (!e.isHardwareError()) if (!zkutil::isHardwareError(e.code))
throw; throw;
} }
} }
...@@ -887,7 +887,7 @@ void DDLWorker::run() ...@@ -887,7 +887,7 @@ void DDLWorker::run()
} }
catch (zkutil::KeeperException & e) catch (zkutil::KeeperException & e)
{ {
if (e.isHardwareError()) if (zkutil::isHardwareError(e.code))
{ {
LOG_DEBUG(log, "Recovering ZooKeeper session after: " << getCurrentExceptionMessage(false)); LOG_DEBUG(log, "Recovering ZooKeeper session after: " << getCurrentExceptionMessage(false));
......
...@@ -27,6 +27,7 @@ namespace ErrorCodes ...@@ -27,6 +27,7 @@ namespace ErrorCodes
extern const int READONLY; extern const int READONLY;
extern const int UNKNOWN_STATUS_OF_INSERT; extern const int UNKNOWN_STATUS_OF_INSERT;
extern const int INSERT_WAS_DEDUPLICATED; extern const int INSERT_WAS_DEDUPLICATED;
extern const int KEEPER_EXCEPTION;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册