提交 26511b79 编写于 作者: A Alexander Burmak

Added DiskMemory and tests

上级 ec1a4909
......@@ -277,15 +277,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
Poco::File(path + "data/" + default_database).createDirectories();
Poco::File(path + "metadata/" + default_database).createDirectories();
/// Check that we have read and write access to all data paths
auto disk_selector = global_context->getDiskSelector();
for (const auto & [name, disk] : disk_selector.getDisksMap())
{
Poco::File disk_path(disk->getPath());
if (!disk_path.canRead() || !disk_path.canWrite())
throw Exception("There is no RW access to disk " + name + " (" + disk->getPath() + ")", ErrorCodes::PATH_ACCESS_DENIED);
}
StatusFile status{path + "status"};
SCOPE_EXIT({
......
......@@ -476,6 +476,8 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_DATABASE = 501;
extern const int CANNOT_SIGQUEUE = 502;
extern const int AGGREGATE_FUNCTION_THROW = 503;
extern const int FILE_ALREADY_EXISTS = 504;
extern const int CANNOT_DELETE_DIRECTORY = 505;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
if (ENABLE_TESTS)
add_subdirectory (tests)
endif ()
......@@ -12,6 +12,7 @@ namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int PATH_ACCESS_DENIED;
}
std::mutex DiskLocal::mutex;
......@@ -121,7 +122,7 @@ void DiskLocal::moveDirectory(const String & from_path, const String & to_path)
DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
{
return std::make_unique<DiskLocalDirectoryIterator>(disk_path + path);
return std::make_unique<DiskLocalDirectoryIterator>(disk_path, path);
}
void DiskLocal::moveFile(const String & from_path, const String & to_path)
......@@ -219,6 +220,11 @@ void registerDiskLocal(DiskFactory & factory)
throw Exception("Disk path must end with /. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (Poco::File disk{path}; !disk.canRead() || !disk.canWrite())
{
throw Exception("There is no RW access to disk " + name + " (" + path + ")", ErrorCodes::PATH_ACCESS_DENIED);
}
bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio");
if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio)
......
......@@ -91,15 +91,23 @@ using DiskLocalPtr = std::shared_ptr<DiskLocal>;
class DiskLocalDirectoryIterator : public IDiskDirectoryIterator
{
public:
explicit DiskLocalDirectoryIterator(const String & path) : iter(path) {}
explicit DiskLocalDirectoryIterator(const String & disk_path_, const String & dir_path_) :
dir_path(dir_path_), iter(disk_path_ + dir_path_) {}
void next() override { ++iter; }
bool isValid() const override { return iter != Poco::DirectoryIterator(); }
String name() const override { return iter.name(); }
String path() const override
{
if (iter->isDirectory())
return dir_path + iter.name() + '/';
else
return dir_path + iter.name();
}
private:
String dir_path;
Poco::DirectoryIterator iter;
};
......
#include "DiskMemory.h"
#include "DiskFactory.h"
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int FILE_DOESNT_EXIST;
extern const int FILE_ALREADY_EXISTS;
extern const int DIRECTORY_DOESNT_EXIST;
extern const int CANNOT_DELETE_DIRECTORY;
}
ReservationPtr DiskMemory::reserve(UInt64 /*bytes*/)
{
throw Exception("Method reserve is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
UInt64 DiskMemory::getTotalSpace() const
{
return 0;
}
UInt64 DiskMemory::getAvailableSpace() const
{
return 0;
}
UInt64 DiskMemory::getUnreservedSpace() const
{
return 0;
}
bool DiskMemory::exists(const String & path) const
{
std::lock_guard lock(mutex);
return files.find(path) != files.end();
}
bool DiskMemory::isFile(const String & path) const
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
return false;
return iter->second.type == FileType::File;
}
bool DiskMemory::isDirectory(const String & path) const
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
return false;
return iter->second.type == FileType::Directory;
}
size_t DiskMemory::getFileSize(const String & path) const
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
throw Exception("File " + path + " does not exist", ErrorCodes::FILE_DOESNT_EXIST);
return iter->second.data.size();
}
void DiskMemory::createDirectory(const String & path)
{
std::lock_guard lock(mutex);
if (files.find(path) != files.end())
return;
String parent_path = parentPath(path);
if (!parent_path.empty() && files.find(parent_path) == files.end())
throw Exception(
"Failed to create directory " + path + ". Parent directory " + parent_path + " does not exist",
ErrorCodes::DIRECTORY_DOESNT_EXIST);
files.emplace(path, FileData{FileType::Directory});
}
void DiskMemory::createDirectories(const String & path)
{
std::lock_guard lock(mutex);
createDirectoriesImpl(path);
}
void DiskMemory::createDirectoriesImpl(const String & path)
{
if (files.find(path) != files.end())
return;
String parent_path = parentPath(path);
if (!parent_path.empty())
createDirectoriesImpl(parent_path);
files.emplace(path, FileData{FileType::Directory});
}
void DiskMemory::clearDirectory(const String & path)
{
std::lock_guard lock(mutex);
if (files.find(path) == files.end())
throw Exception("Directory " + path + " does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
for (auto iter = files.begin(); iter != files.end();)
{
if (parentPath(iter->first) != path)
{
++iter;
continue;
}
if (iter->second.type == FileType::Directory)
throw Exception(
"Failed to clear directory " + path + ". " + iter->first + " is a directory", ErrorCodes::CANNOT_DELETE_DIRECTORY);
files.erase(iter++);
}
}
void DiskMemory::moveDirectory(const String & /*from_path*/, const String & /*to_path*/)
{
throw Exception("Method moveDirectory is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
DiskDirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path)
{
std::lock_guard lock(mutex);
if (!path.empty() && files.find(path) == files.end())
throw Exception("Directory " + path + " does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
std::vector<String> dir_file_paths;
for (const auto & file : files)
if (parentPath(file.first) == path)
dir_file_paths.push_back(file.first);
return std::make_unique<DiskMemoryDirectoryIterator>(std::move(dir_file_paths));
}
void DiskMemory::moveFile(const String & from_path, const String & to_path)
{
std::lock_guard lock(mutex);
if (files.find(to_path) != files.end())
throw Exception(
"Failed to move file from " + from_path + " to " + to_path + ". File " + to_path + " already exist",
ErrorCodes::FILE_ALREADY_EXISTS);
replaceFileImpl(from_path, to_path);
}
void DiskMemory::replaceFile(const String & from_path, const String & to_path)
{
std::lock_guard lock(mutex);
replaceFileImpl(from_path, to_path);
}
void DiskMemory::replaceFileImpl(const String & from_path, const String & to_path)
{
String to_parent_path = parentPath(to_path);
if (!to_parent_path.empty() && files.find(to_parent_path) == files.end())
throw Exception(
"Failed to move file from " + from_path + " to " + to_path + ". Directory " + to_parent_path + " does not exist",
ErrorCodes::DIRECTORY_DOESNT_EXIST);
auto iter = files.find(from_path);
if (iter == files.end())
throw Exception(
"Failed to move file from " + from_path + " to " + to_path + ". File " + from_path + " does not exist",
ErrorCodes::FILE_DOESNT_EXIST);
auto node = files.extract(iter);
node.key() = to_path;
files.insert(std::move(node));
}
void DiskMemory::copyFile(const String & /*from_path*/, const String & /*to_path*/)
{
throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
std::unique_ptr<ReadBuffer> DiskMemory::readFile(const String & path, size_t /*buf_size*/) const
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
throw Exception("File " + path + " does not exist", ErrorCodes::FILE_DOESNT_EXIST);
return std::make_unique<ReadBufferFromString>(iter->second.data);
}
std::unique_ptr<WriteBuffer> DiskMemory::writeFile(const String & path, size_t /*buf_size*/, WriteMode mode)
{
std::lock_guard lock(mutex);
auto iter = files.find(path);
if (iter == files.end())
{
String parent_path = parentPath(path);
if (!parent_path.empty() && files.find(parent_path) == files.end())
throw Exception(
"Failed to create file " + path + ". Directory " + parent_path + " does not exist", ErrorCodes::DIRECTORY_DOESNT_EXIST);
iter = files.emplace(path, FileData{FileType::File}).first;
}
if (mode == WriteMode::Append)
return std::make_unique<WriteBufferFromString>(iter->second.data, WriteBufferFromString::AppendModeTag{});
else
return std::make_unique<WriteBufferFromString>(iter->second.data);
}
void registerDiskMemory(DiskFactory & factory)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & /*config*/,
const String & /*config_prefix*/,
const Context & /*context*/) -> DiskPtr { return std::make_shared<DiskMemory>(name); };
factory.registerDiskType("memory", creator);
}
}
#pragma once
#include <Disks/IDisk.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <mutex>
#include <unordered_map>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class DiskMemory : public IDisk
{
public:
DiskMemory(const String & name_) : name(name_), disk_path("memory://" + name_ + '/') { }
const String & getName() const override { return name; }
const String & getPath() const override { return disk_path; }
ReservationPtr reserve(UInt64 bytes) override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE) const override;
std::unique_ptr<WriteBuffer> writeFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
WriteMode mode = WriteMode::Rewrite) override;
private:
void createDirectoriesImpl(const String & path);
void replaceFileImpl(const String & from_path, const String & to_path);
private:
enum class FileType
{
File,
Directory
};
struct FileData
{
FileType type;
String data;
explicit FileData(FileType type_) : type(type_) { }
};
using Files = std::unordered_map<String, FileData>; /// file path -> file data
const String name;
const String disk_path;
Files files;
mutable std::mutex mutex;
};
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;
class DiskMemoryDirectoryIterator : public IDiskDirectoryIterator
{
public:
explicit DiskMemoryDirectoryIterator(std::vector<String> && dir_file_paths_)
: dir_file_paths(std::move(dir_file_paths_)), iter(dir_file_paths.begin())
{
}
void next() override { ++iter; }
bool isValid() const override { return iter != dir_file_paths.end(); }
String path() const override { return *iter; }
private:
std::vector<String> dir_file_paths;
std::vector<String>::iterator iter;
};
class DiskFactory;
void registerDiskMemory(DiskFactory & factory);
}
......@@ -142,8 +142,8 @@ public:
/// Return `true` if the iterator points to a valid element.
virtual bool isValid() const = 0;
/// Name of the file that the iterator currently points to.
virtual String name() const = 0;
/// Path to the file that the iterator currently points to.
virtual String path() const = 0;
virtual ~IDiskDirectoryIterator() = default;
};
......
......@@ -4,12 +4,14 @@
namespace DB
{
void registerDiskLocal(DiskFactory & factory);
void registerDiskMemory(DiskFactory & factory);
void registerDisks()
{
auto & factory = DiskFactory::instance();
registerDiskLocal(factory);
registerDiskMemory(factory);
}
}
#include <gtest/gtest.h>
#include <Disks/DiskLocal.h>
#include <Disks/DiskMemory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
template <typename T>
DB::DiskPtr createDisk();
template <>
DB::DiskPtr createDisk<DB::DiskMemory>()
{
return std::make_shared<DB::DiskMemory>("memory_disk");
}
template <>
DB::DiskPtr createDisk<DB::DiskLocal>()
{
Poco::File("tmp/").createDirectory();
return std::make_shared<DB::DiskLocal>("local_disk", "tmp/", 0);
}
template <typename T>
void destroyDisk(DB::DiskPtr & disk)
{
disk.reset();
}
template <>
void destroyDisk<DB::DiskLocal>(DB::DiskPtr & disk)
{
disk.reset();
Poco::File("tmp/").remove(true);
}
template <typename T>
class DiskTest : public testing::Test
{
public:
void SetUp() override { disk_ = createDisk<T>(); }
void TearDown() override { destroyDisk<T>(disk_); }
const DB::DiskPtr & getDisk() { return disk_; }
private:
DB::DiskPtr disk_;
};
typedef testing::Types<DB::DiskMemory, DB::DiskLocal> DiskImplementations;
TYPED_TEST_SUITE(DiskTest, DiskImplementations);
TYPED_TEST(DiskTest, createDirectories)
{
const auto & disk = this->getDisk();
disk->createDirectories("test_dir1/");
EXPECT_TRUE(disk->isDirectory("test_dir1/"));
disk->createDirectories("test_dir2/nested_dir/");
EXPECT_TRUE(disk->isDirectory("test_dir2/nested_dir/"));
}
TYPED_TEST(DiskTest, writeFile)
{
const auto & disk = this->getDisk();
{
std::unique_ptr<DB::WriteBuffer> out = disk->writeFile("test_file");
writeString("test data", *out);
}
DB::String data;
{
std::unique_ptr<DB::ReadBuffer> in = disk->readFile("test_file");
readString(data, *in);
}
EXPECT_EQ("test data", data);
EXPECT_EQ(data.size(), disk->getFileSize("test_file"));
}
TYPED_TEST(DiskTest, iterateDirectory)
{
const auto & disk = this->getDisk();
disk->createDirectories("test_dir/nested_dir/");
{
auto iter = disk->iterateDirectory("");
EXPECT_TRUE(iter->isValid());
EXPECT_EQ("test_dir/", iter->path());
iter->next();
EXPECT_FALSE(iter->isValid());
}
{
auto iter = disk->iterateDirectory("test_dir/");
EXPECT_TRUE(iter->isValid());
EXPECT_EQ("test_dir/nested_dir/", iter->path());
iter->next();
EXPECT_FALSE(iter->isValid());
}
}
#include <gtest/gtest.h>
#include <Disks/IDisk.h>
TEST(DiskTest, parentPath)
{
EXPECT_EQ("", DB::parentPath("test_dir/"));
EXPECT_EQ("test_dir/", DB::parentPath("test_dir/nested_dir/"));
EXPECT_EQ("test_dir/", DB::parentPath("test_dir/nested_file"));
}
TEST(DiskTest, fileName)
{
EXPECT_EQ("test_file", DB::fileName("test_file"));
EXPECT_EQ("nested_file", DB::fileName("test_dir/nested_file"));
EXPECT_EQ("", DB::fileName("test_dir/"));
EXPECT_EQ("", DB::fileName(""));
}
......@@ -390,7 +390,7 @@ struct ConvertImpl<FromDataType, std::enable_if_t<!std::is_same_v<FromDataType,
offsets_to[i] = write_buffer.count();
}
write_buffer.finish();
write_buffer.finalize();
block.getByPosition(result).column = std::move(col_to);
}
else
......@@ -430,7 +430,7 @@ struct ConvertImplGenericToString
offsets_to[i] = write_buffer.count();
}
write_buffer.finish();
write_buffer.finalize();
block.getByPosition(result).column = std::move(col_to);
}
};
......
......@@ -113,7 +113,7 @@ private:
offsets_to[i] = buf_to.count();
}
buf_to.finish();
buf_to.finalize();
block.getByPosition(result).column = std::move(col_to);
}
else
......@@ -192,7 +192,7 @@ private:
offsets_to[i] = buf_to.count();
}
buf_to.finish();
buf_to.finalize();
block.getByPosition(result).column = std::move(col_to);
return true;
}
......
......@@ -993,7 +993,7 @@ public:
auto & chars = col_str.getChars();
WriteBufferFromVector<ColumnString::Chars> buf(chars, WriteBufferFromVector<ColumnString::Chars>::AppendModeTag());
traverse(it, buf);
buf.finish();
buf.finalize();
chars.push_back(0);
col_str.getOffsets().push_back(chars.size());
return true;
......
......@@ -121,7 +121,7 @@ struct ToValidUTF8Impl
res_offsets[i] = write_buffer.count();
prev_offset = offsets[i];
}
write_buffer.finish();
write_buffer.finalize();
}
[[noreturn]] static void vector_fixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)
......
......@@ -50,6 +50,11 @@ public:
/// Close file before destruction of object.
void close();
void finalize() override
{
close();
}
std::string getFileName() const override
{
return file_name;
......
......@@ -59,7 +59,7 @@ void WriteBufferFromS3::nextImpl()
if (last_part_size > minimum_upload_part_size)
{
temporary_buffer->finish();
temporary_buffer->finalize();
writePart(buffer_string);
last_part_size = 0;
temporary_buffer = std::make_unique<WriteBufferFromString>(buffer_string);
......@@ -69,7 +69,7 @@ void WriteBufferFromS3::nextImpl()
void WriteBufferFromS3::finalize()
{
temporary_buffer->finish();
temporary_buffer->finalize();
if (!buffer_string.empty())
{
writePart(buffer_string);
......
......@@ -34,7 +34,7 @@ public:
std::string & str()
{
finish();
finalize();
return value;
}
};
......
......@@ -65,7 +65,7 @@ public:
set(reinterpret_cast<Position>(vector.data() + old_size), (size - old_size) * sizeof(typename VectorType::value_type));
}
void finish()
void finalize() override
{
if (is_finished)
return;
......@@ -91,7 +91,7 @@ public:
{
try
{
finish();
finalize();
}
catch (...)
{
......
......@@ -620,7 +620,7 @@ void executeQuery(
WriteBufferFromVector<PODArray<char>> out(parse_buf);
LimitReadBuffer limit(istr, max_query_size + 1, false);
copyData(limit, out);
out.finish();
out.finalize();
begin = parse_buf.data();
end = begin + parse_buf.size();
......
......@@ -774,7 +774,7 @@ String PipelineExecutor::dumpPipeline() const
WriteBufferFromOwnString out;
printPipeline(processors, statuses, out);
out.finish();
out.finalize();
return out.str();
}
......
......@@ -144,7 +144,7 @@ private:
void finalize()
{
compressed.next();
plain->next();
plain->finalize();
}
};
......
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
<storage_configuration>
<disks>
<default>
<type>memory</type>
</default>
</disks>
</storage_configuration>
</yandex>
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=['configs/config.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_tinylog(started_cluster):
node.query('''CREATE DATABASE IF NOT EXISTS test''')
node.query('''CREATE TABLE test.tinylog (s String, n UInt8) ENGINE = TinyLog''')
node.query('''INSERT INTO test.tinylog SELECT toString(number), number * 2 FROM system.numbers LIMIT 5''')
assert TSV(node.query('''SELECT * FROM test.tinylog''')) == TSV('0\t0\n1\t2\n2\t4\n3\t6\n4\t8')
node.query('''TRUNCATE TABLE test.tinylog''')
assert TSV(node.query('''SELECT * FROM test.tinylog''')) == TSV('')
node.query('''DROP TABLE test.tinylog''')
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册