提交 23af9ddd 编写于 作者: V Vitaliy Lyudvichenko 提交者: alexey-milovidov

Fixed segfault: the future owns source ops. [#CLICKHOUSE-3207]

上级 351a0905
......@@ -19,6 +19,8 @@ public:
Op() : data(new zoo_op_t) {}
virtual ~Op() {}
virtual std::unique_ptr<Op> clone() const = 0;
virtual std::string describe() = 0;
std::unique_ptr<zoo_op_t> data;
......@@ -31,21 +33,32 @@ public:
struct Op::Remove : public Op
{
Remove(const std::string & path_, int32_t version) :
path(path_)
Remove(const std::string & path_, int32_t version_) :
path(path_), version(version_)
{
zoo_delete_op_init(data.get(), path.c_str(), version);
}
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Remove(path, version));
}
std::string describe() override { return "command: remove, path: " + path; }
private:
std::string path;
int32_t version;
};
struct Op::Create : public Op
{
Create(const std::string & path_, const std::string & value_, ACLPtr acl, int32_t flags);
Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_);
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Create(path, value, acl, flags));
}
std::string getPathCreated()
{
......@@ -62,17 +75,24 @@ struct Op::Create : public Op
private:
std::string path;
std::string value;
ACLPtr acl;
int32_t flags;
std::vector<char> created_path;
};
struct Op::SetData : public Op
{
SetData(const std::string & path_, const std::string & value_, int32_t version) :
path(path_), value(value_)
SetData(const std::string & path_, const std::string & value_, int32_t version_) :
path(path_), value(value_), version(version_)
{
zoo_set_op_init(data.get(), path.c_str(), value.c_str(), value.size(), version, &stat);
}
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new SetData(path, value, version));
}
std::string describe() override
{
return
......@@ -85,21 +105,28 @@ struct Op::SetData : public Op
private:
std::string path;
std::string value;
int32_t version;
Stat stat;
};
struct Op::Check : public Op
{
Check(const std::string & path_, int32_t version) :
path(path_)
Check(const std::string & path_, int32_t version_) :
path(path_), version(version_)
{
zoo_check_op_init(data.get(), path.c_str(), version);
}
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Check(path, version));
}
std::string describe() override { return "command: check, path: " + path; }
private:
std::string path;
int32_t version;
};
struct OpResult : public zoo_op_result_t
......
......@@ -710,8 +710,8 @@ ZooKeeperPtr ZooKeeper::startNewSession() const
return std::make_shared<ZooKeeper>(hosts, session_timeout_ms);
}
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl, int32_t flags)
: path(path_), value(value_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_)
: path(path_), value(value_), acl(acl_), flags(flags_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
{
zoo_create_op_init(data.get(), path.c_str(), value.c_str(), value.size(), acl, flags, created_path.data(), created_path.size());
}
......@@ -907,10 +907,24 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool
size_t count = ops_.size();
OpResultsPtr results(new OpResults(count));
MultiFuture future{ [throw_exception, results] (int rc) {
/// We need to hold all references to ops data until the end of multi callback
struct OpsHolder
{
std::shared_ptr<zkutil::Ops> ops_ptr = std::make_shared<zkutil::Ops>();
std::shared_ptr<std::vector<zoo_op_t>> ops_raw_ptr = std::make_shared<std::vector<zoo_op_t>>();;
} holder;
for (const auto & op : ops_)
{
holder.ops_ptr->emplace_back(op->clone());
holder.ops_raw_ptr->push_back(*holder.ops_ptr->back()->data);
}
MultiFuture future{ [throw_exception, results, holder] (int rc) {
OpResultsAndCode res;
res.code = rc;
res.results = results;
res.ops_ptr = holder.ops_ptr;
if (throw_exception && rc != ZOK)
throw zkutil::KeeperException(rc);
return res;
......@@ -927,10 +941,7 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool
if (expired())
throw KeeperException(ZINVALIDSTATE);
/// There is no need to hold these ops until the end of the passed callback
std::vector<zoo_op_t> ops;
for (const auto & op : ops_)
ops.push_back(*(op->data));
auto & ops = *holder.ops_raw_ptr;
int32_t code = zoo_amulti(impl, static_cast<int>(ops.size()), ops.data(), results->data(),
[] (int rc, const void * data)
......
......@@ -326,14 +326,14 @@ public:
struct OpResultsAndCode
{
OpResultsPtr results;
Ops ops;
std::shared_ptr<Ops> ops_ptr;
int code;
};
using MultiFuture = Future<OpResultsAndCode, int>;
MultiFuture asyncMulti(const zkutil::Ops & ops);
MultiFuture asyncMulti(const Ops & ops);
/// Like the previous one but don't throw any exceptions on future.get()
MultiFuture tryAsyncMulti(const zkutil::Ops & ops);
MultiFuture tryAsyncMulti(const Ops & ops);
static std::string error2string(int32_t code);
......
#include <iostream>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <iostream>
#include <chrono>
#include <gtest/gtest.h>
using namespace DB;
......@@ -67,8 +68,33 @@ TEST(zkutil, multi_async)
auto res = fut.get();
ASSERT_TRUE(res.code == ZOK);
ASSERT_EQ(res.results->size(), 2);
ASSERT_EQ(res.ops_ptr->size(), 2);
}
EXPECT_ANY_THROW
(
std::vector<zkutil::ZooKeeper::MultiFuture> futures;
for (size_t i = 0; i < 10000; ++i)
{
ops.clear();
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test_zkutil_multi", -1));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Check("/clickhouse_test_zkutil_multi", -1));
ops.emplace_back(new zkutil::Op::SetData("/clickhouse_test_zkutil_multi", "xxx", 42));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
futures.emplace_back(zookeeper->asyncMulti(ops));
}
futures[0].get();
);
/// Check there are no segfaults for remaining 999 futures
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
......@@ -80,5 +106,6 @@ TEST(zkutil, multi_async)
auto res = fut.get();
ASSERT_TRUE(res.code == ZNODEEXISTS);
ASSERT_EQ(res.results->size(), 2);
ASSERT_EQ(res.ops_ptr->size(), 2);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册