未验证 提交 45c66b3c 编写于 作者: J Jin Hai 提交者: GitHub

Merge pull request #1585 from cydrain/caiyd_1548_move_store

Caiyd 1548 move store
......@@ -7,6 +7,17 @@ Please mark all change in change log and use the issue from GitHub
## Bug
- \#1635 Vectors can be returned by searching after vectors deleted if `cache_insert_data` set true
## Feature
## Improvement
- \#1537 Optimize raw vector and uids read/write
- \#1546 Move Config.cpp to config directory
- \#1547 Rename storage/file to storage/disk and rename classes
- \#1548 Move store/Directory to storage/Operation and add FSHandler
## Task
# Milvus 0.7.0 (2020-03-11)
## Bug
......@@ -113,9 +124,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1448 General proto api for NNS libraries
- \#1480 Add return code for AVX512 selection
- \#1524 Update config "preload_table" description
- \#1537 Optimize raw vector and uids read/write
- \#1544 Update resources name in HTTP module
- \#1546 Move Config.cpp to config directory
- \#1567 Update yaml config description
## Task
......
......@@ -106,11 +106,11 @@ set(web_server_files
)
aux_source_directory(${MILVUS_ENGINE_SRC}/storage storage_main_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/storage/file storage_file_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/storage/disk storage_disk_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/storage/s3 storage_s3_files)
set(storage_files
${storage_main_files}
${storage_file_files}
${storage_disk_files}
${storage_s3_files}
)
......@@ -125,8 +125,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/store store_files)
set(engine_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
${cache_files}
......@@ -143,7 +141,6 @@ set(engine_files
${codecs_files}
${codecs_default_files}
${segment_files}
${store_files}
)
if (MILVUS_WITH_PROMETHEUS)
......
......@@ -20,7 +20,7 @@
#include <memory>
#include "segment/DeletedDocs.h"
#include "store/Directory.h"
#include "storage/FSHandler.h"
namespace milvus {
namespace codec {
......@@ -28,10 +28,10 @@ namespace codec {
class DeletedDocsFormat {
public:
virtual void
read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) = 0;
read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) = 0;
virtual void
write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0;
write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0;
};
using DeletedDocsFormatPtr = std::shared_ptr<DeletedDocsFormat>;
......
......@@ -20,7 +20,7 @@
#include <memory>
#include "segment/IdBloomFilter.h"
#include "store/Directory.h"
#include "storage/FSHandler.h"
namespace milvus {
namespace codec {
......@@ -28,13 +28,13 @@ namespace codec {
class IdBloomFilterFormat {
public:
virtual void
read(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
virtual void
write(const store::DirectoryPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
virtual void
create(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
};
using IdBloomFilterFormatPtr = std::shared_ptr<IdBloomFilterFormat>;
......
......@@ -21,7 +21,7 @@
#include <vector>
#include "segment/Vectors.h"
#include "store/Directory.h"
#include "storage/FSHandler.h"
namespace milvus {
namespace codec {
......@@ -29,16 +29,16 @@ namespace codec {
class VectorsFormat {
public:
virtual void
read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) = 0;
read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) = 0;
virtual void
write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) = 0;
write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) = 0;
virtual void
read_uids(const store::DirectoryPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) = 0;
read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector<segment::doc_id_t>& uids) = 0;
virtual void
read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes,
read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes,
std::vector<uint8_t>& raw_vectors) = 0;
};
......
......@@ -35,10 +35,10 @@ namespace milvus {
namespace codec {
void
DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) {
DefaultDeletedDocsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) {
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath();
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664);
......@@ -75,10 +75,10 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment
}
void
DefaultDeletedDocsFormat::write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) {
DefaultDeletedDocsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) {
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath();
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
// Create a temporary file from the existing file
......
......@@ -30,10 +30,10 @@ class DefaultDeletedDocsFormat : public DeletedDocsFormat {
DefaultDeletedDocsFormat() = default;
void
read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) override;
read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) override;
void
write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) override;
write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) override;
// No copy and move
DefaultDeletedDocsFormat(const DefaultDeletedDocsFormat&) = delete;
......
......@@ -30,11 +30,10 @@ constexpr unsigned int bloom_filter_capacity = 500000;
constexpr double bloom_filter_error_rate = 0.01;
void
DefaultIdBloomFilterFormat::read(const store::DirectoryPtr& directory_ptr,
segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath();
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
scaling_bloom_t* bloom_filter =
new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
......@@ -48,11 +47,11 @@ DefaultIdBloomFilterFormat::read(const store::DirectoryPtr& directory_ptr,
}
void
DefaultIdBloomFilterFormat::write(const store::DirectoryPtr& directory_ptr,
DefaultIdBloomFilterFormat::write(const storage::FSHandlerPtr& fs_ptr,
const segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath();
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) {
std::string err_msg =
......@@ -63,9 +62,9 @@ DefaultIdBloomFilterFormat::write(const store::DirectoryPtr& directory_ptr,
}
void
DefaultIdBloomFilterFormat::create(const store::DirectoryPtr& directory_ptr,
DefaultIdBloomFilterFormat::create(const storage::FSHandlerPtr& fs_ptr,
segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
std::string dir_path = directory_ptr->GetDirPath();
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
scaling_bloom_t* bloom_filter =
new_scaling_bloom(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
......
......@@ -22,7 +22,7 @@
#include "codecs/IdBloomFilterFormat.h"
#include "segment/IdBloomFilter.h"
#include "store/Directory.h"
#include "storage/disk/DiskOperation.h"
namespace milvus {
namespace codec {
......@@ -32,13 +32,13 @@ class DefaultIdBloomFilterFormat : public IdBloomFilterFormat {
DefaultIdBloomFilterFormat() = default;
void
read(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
void
write(const store::DirectoryPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
void
create(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
// No copy and move
DefaultIdBloomFilterFormat(const DefaultIdBloomFilterFormat&) = delete;
......
......@@ -102,10 +102,10 @@ DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vect
}
void
DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) {
DefaultVectorsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) {
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath();
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
if (!boost::filesystem::is_directory(dir_path)) {
std::string err_msg = "Directory: " + dir_path + "does not exist";
ENGINE_LOG_ERROR << err_msg;
......@@ -134,10 +134,10 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve
}
void
DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) {
DefaultVectorsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) {
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath();
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_;
const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_;
......@@ -197,10 +197,10 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm
}
void
DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) {
DefaultVectorsFormat::read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector<segment::doc_id_t>& uids) {
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath();
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
if (!boost::filesystem::is_directory(dir_path)) {
std::string err_msg = "Directory: " + dir_path + "does not exist";
ENGINE_LOG_ERROR << err_msg;
......@@ -221,11 +221,11 @@ DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::v
}
void
DefaultVectorsFormat::read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes,
DefaultVectorsFormat::read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes,
std::vector<uint8_t>& raw_vectors) {
const std::lock_guard<std::mutex> lock(mutex_);
std::string dir_path = directory_ptr->GetDirPath();
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
if (!boost::filesystem::is_directory(dir_path)) {
std::string err_msg = "Directory: " + dir_path + "does not exist";
ENGINE_LOG_ERROR << err_msg;
......
......@@ -32,16 +32,17 @@ class DefaultVectorsFormat : public VectorsFormat {
DefaultVectorsFormat() = default;
void
read(const store::DirectoryPtr&, segment::VectorsPtr&) override;
read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) override;
void
write(const store::DirectoryPtr&, const segment::VectorsPtr&) override;
write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) override;
void
read_vectors(const store::DirectoryPtr&, off_t, size_t, std::vector<uint8_t>&) override;
read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector<segment::doc_id_t>& uids) override;
void
read_uids(const store::DirectoryPtr&, std::vector<segment::doc_id_t>&) override;
read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes,
std::vector<uint8_t>& raw_vectors) override;
// No copy and move
DefaultVectorsFormat(const DefaultVectorsFormat&) = delete;
......
......@@ -21,14 +21,19 @@
#include "Vectors.h"
#include "codecs/default/DefaultCodec.h"
#include "store/Directory.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
#include "utils/Log.h"
namespace milvus {
namespace segment {
SegmentReader::SegmentReader(const std::string& directory) {
directory_ptr_ = std::make_shared<store::Directory>(directory);
storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
storage::OperationPtr operation_ptr = std::make_shared<storage::DiskOperation>(directory);
fs_ptr_ = std::make_shared<storage::FSHandler>(reader_ptr, writer_ptr, operation_ptr);
segment_ptr_ = std::make_shared<Segment>();
}
......@@ -43,9 +48,9 @@ SegmentReader::Load() {
// TODO(zhiru)
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_);
default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_);
fs_ptr_->operation_ptr_->CreateDirectory();
default_codec.GetVectorsFormat()->read(fs_ptr_, segment_ptr_->vectors_ptr_);
default_codec.GetDeletedDocsFormat()->read(fs_ptr_, segment_ptr_->deleted_docs_ptr_);
} catch (std::exception& e) {
return Status(DB_ERROR, e.what());
}
......@@ -56,8 +61,8 @@ Status
SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector<uint8_t>& raw_vectors) {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors);
fs_ptr_->operation_ptr_->CreateDirectory();
default_codec.GetVectorsFormat()->read_vectors(fs_ptr_, offset, num_bytes, raw_vectors);
} catch (std::exception& e) {
std::string err_msg = "Failed to load raw vectors: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
......@@ -70,8 +75,8 @@ Status
SegmentReader::LoadUids(std::vector<doc_id_t>& uids) {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids);
fs_ptr_->operation_ptr_->CreateDirectory();
default_codec.GetVectorsFormat()->read_uids(fs_ptr_, uids);
} catch (std::exception& e) {
std::string err_msg = "Failed to load uids: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
......@@ -90,8 +95,8 @@ Status
SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr);
fs_ptr_->operation_ptr_->CreateDirectory();
default_codec.GetIdBloomFilterFormat()->read(fs_ptr_, id_bloom_filter_ptr);
} catch (std::exception& e) {
std::string err_msg = "Failed to load bloom filter: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
......@@ -104,8 +109,8 @@ Status
SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr);
fs_ptr_->operation_ptr_->CreateDirectory();
default_codec.GetDeletedDocsFormat()->read(fs_ptr_, deleted_docs_ptr);
} catch (std::exception& e) {
std::string err_msg = "Failed to load deleted docs: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
......
......@@ -22,7 +22,7 @@
#include <vector>
#include "segment/Types.h"
#include "store/Directory.h"
#include "storage/FSHandler.h"
#include "utils/Status.h"
namespace milvus {
......@@ -55,7 +55,7 @@ class SegmentReader {
GetSegment(SegmentPtr& segment_ptr);
private:
store::DirectoryPtr directory_ptr_;
storage::FSHandlerPtr fs_ptr_;
SegmentPtr segment_ptr_;
};
......
......@@ -23,14 +23,19 @@
#include "SegmentReader.h"
#include "Vectors.h"
#include "codecs/default/DefaultCodec.h"
#include "store/Directory.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
#include "utils/Log.h"
namespace milvus {
namespace segment {
SegmentWriter::SegmentWriter(const std::string& directory) {
directory_ptr_ = std::make_shared<store::Directory>(directory);
storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
storage::OperationPtr operation_ptr = std::make_shared<storage::DiskOperation>(directory);
fs_ptr_ = std::make_shared<storage::FSHandler>(reader_ptr, writer_ptr, operation_ptr);
segment_ptr_ = std::make_shared<Segment>();
}
......@@ -84,8 +89,8 @@ Status
SegmentWriter::WriteVectors() {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_);
fs_ptr_->operation_ptr_->CreateDirectory();
default_codec.GetVectorsFormat()->write(fs_ptr_, segment_ptr_->vectors_ptr_);
} catch (std::exception& e) {
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
......@@ -98,11 +103,11 @@ Status
SegmentWriter::WriteBloomFilter() {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
fs_ptr_->operation_ptr_->CreateDirectory();
auto start = std::chrono::high_resolution_clock::now();
default_codec.GetIdBloomFilterFormat()->create(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_);
default_codec.GetIdBloomFilterFormat()->create(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_);
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
......@@ -121,7 +126,7 @@ SegmentWriter::WriteBloomFilter() {
start = std::chrono::high_resolution_clock::now();
default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_);
default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_);
end = std::chrono::high_resolution_clock::now();
diff = end - start;
......@@ -138,9 +143,9 @@ Status
SegmentWriter::WriteDeletedDocs() {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
fs_ptr_->operation_ptr_->CreateDirectory();
DeletedDocsPtr deleted_docs_ptr = std::make_shared<DeletedDocs>();
default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr);
default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs_ptr);
} catch (std::exception& e) {
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
......@@ -153,8 +158,8 @@ Status
SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs);
fs_ptr_->operation_ptr_->CreateDirectory();
default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs);
} catch (std::exception& e) {
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
......@@ -167,8 +172,8 @@ Status
SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) {
codec::DefaultCodec default_codec;
try {
directory_ptr_->Create();
default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr);
fs_ptr_->operation_ptr_->CreateDirectory();
default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, id_bloom_filter_ptr);
} catch (std::exception& e) {
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
ENGINE_LOG_ERROR << err_msg;
......@@ -191,11 +196,11 @@ SegmentWriter::GetSegment(SegmentPtr& segment_ptr) {
Status
SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) {
if (dir_to_merge == directory_ptr_->GetDirPath()) {
if (dir_to_merge == fs_ptr_->operation_ptr_->GetDirectory()) {
return Status(DB_ERROR, "Cannot Merge Self");
}
ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << directory_ptr_->GetDirPath();
ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory();
auto start = std::chrono::high_resolution_clock::now();
......@@ -234,7 +239,7 @@ SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) {
ENGINE_LOG_DEBUG << "Adding " << segment_to_merge->vectors_ptr_->GetCount() << " vectors and uids took "
<< diff.count() << " s";
ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << directory_ptr_->GetDirPath();
ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory();
return Status::OK();
}
......
......@@ -22,7 +22,7 @@
#include <vector>
#include "segment/Types.h"
#include "store/Directory.h"
#include "storage/FSHandler.h"
#include "utils/Status.h"
namespace milvus {
......@@ -70,7 +70,7 @@ class SegmentWriter {
WriteDeletedDocs();
private:
store::DirectoryPtr directory_ptr_;
storage::FSHandlerPtr fs_ptr_;
SegmentPtr segment_ptr_;
};
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include "storage/IOReader.h"
#include "storage/IOWriter.h"
#include "storage/Operation.h"
namespace milvus {
namespace storage {
struct FSHandler {
IOReaderPtr reader_ptr_ = nullptr;
IOWriterPtr writer_ptr_ = nullptr;
OperationPtr operation_ptr_ = nullptr;
FSHandler(IOReaderPtr& reader_ptr, IOWriterPtr& writer_ptr, OperationPtr& operation_ptr)
: reader_ptr_(reader_ptr), writer_ptr_(writer_ptr), operation_ptr_(operation_ptr) {
}
};
using FSHandlerPtr = std::shared_ptr<FSHandler>;
} // namespace storage
} // namespace milvus
......@@ -11,6 +11,7 @@
#pragma once
#include <memory>
#include <string>
namespace milvus {
......@@ -18,9 +19,8 @@ namespace storage {
class IOReader {
public:
explicit IOReader(const std::string& name) : name_(name) {
}
~IOReader() = default;
virtual void
open(const std::string& name) = 0;
virtual void
read(void* ptr, size_t size) = 0;
......@@ -31,9 +31,11 @@ class IOReader {
virtual size_t
length() = 0;
public:
std::string name_;
virtual void
close() = 0;
};
using IOReaderPtr = std::shared_ptr<IOReader>;
} // namespace storage
} // namespace milvus
......@@ -11,6 +11,7 @@
#pragma once
#include <memory>
#include <string>
namespace milvus {
......@@ -18,9 +19,8 @@ namespace storage {
class IOWriter {
public:
explicit IOWriter(const std::string& name) : name_(name), len_(0) {
}
~IOWriter() = default;
virtual void
open(const std::string& name) = 0;
virtual void
write(void* ptr, size_t size) = 0;
......@@ -28,10 +28,11 @@ class IOWriter {
virtual size_t
length() = 0;
public:
std::string name_;
size_t len_;
virtual void
close() = 0;
};
using IOWriterPtr = std::shared_ptr<IOWriter>;
} // namespace storage
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "utils/Status.h"
namespace milvus {
namespace storage {
class IStorage {
public:
virtual Status
CreateBucket() = 0;
virtual Status
DeleteBucket() = 0;
virtual Status
PutObjectFile(const std::string& object_name, const std::string& file_path) = 0;
virtual Status
PutObjectStr(const std::string& object_name, const std::string& content) = 0;
virtual Status
GetObjectFile(const std::string& object_name, const std::string& file_path) = 0;
virtual Status
GetObjectStr(const std::string& object_name, std::string& content) = 0;
virtual Status
ListObjects(std::vector<std::string>& object_list, const std::string& marker = "") = 0;
virtual Status
DeleteObject(const std::string& object_name) = 0;
virtual Status
DeleteObjects(const std::string& marker) = 0;
};
} // namespace storage
} // namespace milvus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace storage {
class Operation {
public:
virtual void
CreateDirectory() = 0;
virtual const std::string&
GetDirectory() const = 0;
virtual void
ListDirectory(std::vector<std::string>& file_paths) = 0;
virtual bool
DeleteFile(const std::string& file_path) = 0;
// TODO(zhiru):
// open(), sync(), close()
// function that opens a stream for reading file
// function that creates a new, empty file and returns an stream for appending data to this file
// function that creates a new, empty, temporary file and returns an stream for appending data to this file
};
using OperationPtr = std::shared_ptr<Operation>;
} // namespace storage
} // namespace milvus
......@@ -9,33 +9,39 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "storage/file/FileIOReader.h"
#include "storage/disk/DiskIOReader.h"
namespace milvus {
namespace storage {
FileIOReader::FileIOReader(const std::string& name) : IOReader(name) {
void
DiskIOReader::open(const std::string& name) {
name_ = name;
fs_ = std::fstream(name_, std::ios::in | std::ios::binary);
}
FileIOReader::~FileIOReader() {
fs_.close();
}
void
FileIOReader::read(void* ptr, size_t size) {
DiskIOReader::read(void* ptr, size_t size) {
fs_.read(reinterpret_cast<char*>(ptr), size);
}
void
FileIOReader::seekg(size_t pos) {
DiskIOReader::seekg(size_t pos) {
fs_.seekg(pos);
}
size_t
FileIOReader::length() {
DiskIOReader::length() {
fs_.seekg(0, fs_.end);
return fs_.tellg();
size_t len = fs_.tellg();
fs_.seekg(0, fs_.beg);
return len;
}
void
DiskIOReader::close() {
fs_.close();
}
} // namespace storage
} // namespace milvus
......@@ -18,10 +18,22 @@
namespace milvus {
namespace storage {
class FileIOReader : public IOReader {
class DiskIOReader : public IOReader {
public:
explicit FileIOReader(const std::string& name);
~FileIOReader();
DiskIOReader() = default;
~DiskIOReader() = default;
// No copy and move
DiskIOReader(const DiskIOReader&) = delete;
DiskIOReader(DiskIOReader&&) = delete;
DiskIOReader&
operator=(const DiskIOReader&) = delete;
DiskIOReader&
operator=(DiskIOReader&&) = delete;
void
open(const std::string& name) override;
void
read(void* ptr, size_t size) override;
......@@ -32,7 +44,11 @@ class FileIOReader : public IOReader {
size_t
length() override;
void
close() override;
public:
std::string name_;
std::fstream fs_;
};
......
......@@ -9,29 +9,33 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "storage/file/FileIOWriter.h"
#include "storage/disk/DiskIOWriter.h"
namespace milvus {
namespace storage {
FileIOWriter::FileIOWriter(const std::string& name) : IOWriter(name) {
void
DiskIOWriter::open(const std::string& name) {
name_ = name;
len_ = 0;
fs_ = std::fstream(name_, std::ios::out | std::ios::binary);
}
FileIOWriter::~FileIOWriter() {
fs_.close();
}
void
FileIOWriter::write(void* ptr, size_t size) {
DiskIOWriter::write(void* ptr, size_t size) {
fs_.write(reinterpret_cast<char*>(ptr), size);
len_ += size;
}
size_t
FileIOWriter::length() {
DiskIOWriter::length() {
return len_;
}
void
DiskIOWriter::close() {
fs_.close();
}
} // namespace storage
} // namespace milvus
......@@ -18,10 +18,22 @@
namespace milvus {
namespace storage {
class FileIOWriter : public IOWriter {
class DiskIOWriter : public IOWriter {
public:
explicit FileIOWriter(const std::string& name);
~FileIOWriter();
DiskIOWriter() = default;
~DiskIOWriter() = default;
// No copy and move
DiskIOWriter(const DiskIOWriter&) = delete;
DiskIOWriter(DiskIOWriter&&) = delete;
DiskIOWriter&
operator=(const DiskIOWriter&) = delete;
DiskIOWriter&
operator=(DiskIOWriter&&) = delete;
void
open(const std::string& name) override;
void
write(void* ptr, size_t size) override;
......@@ -29,7 +41,12 @@ class FileIOWriter : public IOWriter {
size_t
length() override;
void
close() override;
public:
std::string name_;
size_t len_;
std::fstream fs_;
};
......
......@@ -15,21 +15,20 @@
// specific language governing permissions and limitations
// under the License.
#include "store/Directory.h"
#include <boost/filesystem.hpp>
#include "storage/disk/DiskOperation.h"
#include "utils/Exception.h"
#include "utils/Log.h"
namespace milvus {
namespace store {
namespace storage {
Directory::Directory(const std::string& dir_path) : dir_path_(dir_path) {
DiskOperation::DiskOperation(const std::string& dir_path) : dir_path_(dir_path) {
}
void
Directory::Create() {
DiskOperation::CreateDirectory() {
if (!boost::filesystem::is_directory(dir_path_)) {
auto ret = boost::filesystem::create_directory(dir_path_);
if (!ret) {
......@@ -40,8 +39,13 @@ Directory::Create() {
}
}
const std::string&
DiskOperation::GetDirectory() const {
return dir_path_;
}
void
Directory::ListAll(std::vector<std::string>& file_paths) {
DiskOperation::ListDirectory(std::vector<std::string>& file_paths) {
boost::filesystem::path target_path(dir_path_);
typedef boost::filesystem::directory_iterator d_it;
d_it it_end;
......@@ -54,14 +58,9 @@ Directory::ListAll(std::vector<std::string>& file_paths) {
}
bool
Directory::DeleteFile(const std::string& file_path) {
DiskOperation::DeleteFile(const std::string& file_path) {
return boost::filesystem::remove(file_path);
}
const std::string&
Directory::GetDirPath() const {
return dir_path_;
}
} // namespace store
} // namespace storage
} // namespace milvus
......@@ -21,25 +21,27 @@
#include <string>
#include <vector>
#include "storage/Operation.h"
namespace milvus {
namespace store {
namespace storage {
class Directory {
class DiskOperation : public Operation {
public:
explicit Directory(const std::string& dir_path);
explicit DiskOperation(const std::string& dir_path);
void
Create();
CreateDirectory();
const std::string&
GetDirectory() const;
void
ListAll(std::vector<std::string>& file_paths);
ListDirectory(std::vector<std::string>& file_paths);
bool
DeleteFile(const std::string& file_path);
const std::string&
GetDirPath() const;
// TODO(zhiru):
// open(), sync(), close()
// function that opens a stream for reading file
......@@ -50,7 +52,7 @@ class Directory {
const std::string dir_path_;
};
using DirectoryPtr = std::shared_ptr<Directory>;
using DiskOperationPtr = std::shared_ptr<DiskOperation>;
} // namespace store
} // namespace storage
} // namespace milvus
......@@ -16,12 +16,13 @@
#include <memory>
#include <string>
#include <vector>
#include "storage/IStorage.h"
#include "utils/Status.h"
namespace milvus {
namespace storage {
class S3ClientWrapper : public IStorage {
class S3ClientWrapper {
public:
static S3ClientWrapper&
GetInstance() {
......@@ -35,23 +36,23 @@ class S3ClientWrapper : public IStorage {
StopService();
Status
CreateBucket() override;
CreateBucket();
Status
DeleteBucket() override;
DeleteBucket();
Status
PutObjectFile(const std::string& object_key, const std::string& file_path) override;
PutObjectFile(const std::string& object_key, const std::string& file_path);
Status
PutObjectStr(const std::string& object_key, const std::string& content) override;
PutObjectStr(const std::string& object_key, const std::string& content);
Status
GetObjectFile(const std::string& object_key, const std::string& file_path) override;
GetObjectFile(const std::string& object_key, const std::string& file_path);
Status
GetObjectStr(const std::string& object_key, std::string& content) override;
GetObjectStr(const std::string& object_key, std::string& content);
Status
ListObjects(std::vector<std::string>& object_list, const std::string& marker = "") override;
ListObjects(std::vector<std::string>& object_list, const std::string& marker = "");
Status
DeleteObject(const std::string& object_key) override;
DeleteObject(const std::string& object_key);
Status
DeleteObjects(const std::string& marker) override;
DeleteObjects(const std::string& marker);
private:
std::shared_ptr<Aws::S3::S3Client> client_ptr_;
......
......@@ -15,13 +15,13 @@
namespace milvus {
namespace storage {
S3IOReader::S3IOReader(const std::string& name) : IOReader(name), pos_(0) {
void
S3IOReader::open(const std::string& name) {
name_ = name;
pos_ = 0;
S3ClientWrapper::GetInstance().GetObjectStr(name_, buffer_);
}
S3IOReader::~S3IOReader() {
}
void
S3IOReader::read(void* ptr, size_t size) {
memcpy(ptr, buffer_.data() + pos_, size);
......@@ -37,5 +37,9 @@ S3IOReader::length() {
return buffer_.length();
}
void
S3IOReader::close() {
}
} // namespace storage
} // namespace milvus
......@@ -19,8 +19,20 @@ namespace storage {
class S3IOReader : public IOReader {
public:
explicit S3IOReader(const std::string& name);
~S3IOReader();
S3IOReader() = default;
~S3IOReader() = default;
// No copy and move
S3IOReader(const S3IOReader&) = delete;
S3IOReader(S3IOReader&&) = delete;
S3IOReader&
operator=(const S3IOReader&) = delete;
S3IOReader&
operator=(S3IOReader&&) = delete;
void
open(const std::string& name) override;
void
read(void* ptr, size_t size) override;
......@@ -31,7 +43,11 @@ class S3IOReader : public IOReader {
size_t
length() override;
void
close() override;
public:
std::string name_;
std::string buffer_;
size_t pos_;
};
......
......@@ -15,14 +15,13 @@
namespace milvus {
namespace storage {
S3IOWriter::S3IOWriter(const std::string& name) : IOWriter(name) {
void
S3IOWriter::open(const std::string& name) {
name_ = name;
len_ = 0;
buffer_ = "";
}
S3IOWriter::~S3IOWriter() {
S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_);
}
void
S3IOWriter::write(void* ptr, size_t size) {
buffer_ += std::string(reinterpret_cast<char*>(ptr), size);
......@@ -34,5 +33,10 @@ S3IOWriter::length() {
return len_;
}
void
S3IOWriter::close() {
S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_);
}
} // namespace storage
} // namespace milvus
......@@ -19,8 +19,20 @@ namespace storage {
class S3IOWriter : public IOWriter {
public:
explicit S3IOWriter(const std::string& name);
~S3IOWriter();
S3IOWriter() = default;
~S3IOWriter() = default;
// No copy and move
S3IOWriter(const S3IOWriter&) = delete;
S3IOWriter(S3IOWriter&&) = delete;
S3IOWriter&
operator=(const S3IOWriter&) = delete;
S3IOWriter&
operator=(S3IOWriter&&) = delete;
void
open(const std::string& name) override;
void
write(void* ptr, size_t size) override;
......@@ -28,7 +40,12 @@ class S3IOWriter : public IOWriter {
size_t
length() override;
void
close() override;
public:
std::string name_;
size_t len_;
std::string buffer_;
};
......
......@@ -22,8 +22,8 @@
#include "knowhere/index/vector_index/IndexIVFSQ.h"
#include "knowhere/index/vector_index/IndexNSG.h"
#include "knowhere/index/vector_index/IndexSPTAG.h"
#include "storage/file/FileIOReader.h"
#include "storage/file/FileIOWriter.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/s3/S3IOReader.h"
#include "storage/s3/S3IOWriter.h"
#include "utils/Exception.h"
......@@ -179,12 +179,13 @@ read_index(const std::string& location) {
std::shared_ptr<storage::IOReader> reader_ptr;
if (s3_enable) {
reader_ptr = std::make_shared<storage::S3IOReader>(location);
reader_ptr = std::make_shared<storage::S3IOReader>();
} else {
reader_ptr = std::make_shared<storage::FileIOReader>(location);
reader_ptr = std::make_shared<storage::DiskIOReader>();
}
recorder.RecordSection("Start");
reader_ptr->open(location);
size_t length = reader_ptr->length();
if (length <= 0) {
......@@ -226,6 +227,8 @@ read_index(const std::string& location) {
delete[] meta;
}
reader_ptr->close();
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
STORAGE_LOG_DEBUG << "read_index(" << location << ") rate " << rate << "MB/s";
......@@ -252,12 +255,13 @@ write_index(VecIndexPtr index, const std::string& location) {
std::shared_ptr<storage::IOWriter> writer_ptr;
if (s3_enable) {
writer_ptr = std::make_shared<storage::S3IOWriter>(location);
writer_ptr = std::make_shared<storage::S3IOWriter>();
} else {
writer_ptr = std::make_shared<storage::FileIOWriter>(location);
writer_ptr = std::make_shared<storage::DiskIOWriter>();
}
recorder.RecordSection("Start");
writer_ptr->open(location);
writer_ptr->write(&index_type, sizeof(IndexType));
......@@ -273,6 +277,8 @@ write_index(VecIndexPtr index, const std::string& location) {
writer_ptr->write((void*)binary->data.get(), binary_length);
}
writer_ptr->close();
double span = recorder.RecordSection("End");
double rate = writer_ptr->length() * 1000000.0 / span / 1024 / 1024;
STORAGE_LOG_DEBUG << "write_index(" << location << ") rate " << rate << "MB/s";
......
......@@ -98,11 +98,11 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/tracing tracing_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/storage storage_main_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/storage/file storage_file_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/storage/disk storage_disk_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/storage/s3 storage_s3_files)
set(storage_files
${storage_main_files}
${storage_file_files}
${storage_disk_files}
${storage_s3_files}
)
......@@ -111,8 +111,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/store store_files)
set(entry_file
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp)
......@@ -146,7 +144,6 @@ set(common_files
${codecs_files}
${codecs_default_files}
${segment_files}
${store_files}
)
set(unittest_libs
......
......@@ -21,7 +21,6 @@
#include "storage/s3/S3ClientWrapper.h"
#include "storage/s3/S3IOReader.h"
#include "storage/s3/S3IOWriter.h"
#include "storage/IStorage.h"
#include "storage/utils.h"
INITIALIZE_EASYLOGGINGPP
......@@ -91,15 +90,18 @@ TEST_F(StorageTest, S3_RW_TEST) {
ASSERT_TRUE(storage_inst.StartService().ok());
{
milvus::storage::S3IOWriter writer(index_name);
milvus::storage::S3IOWriter writer;
writer.open(index_name);
size_t len = content.length();
writer.write(&len, sizeof(len));
writer.write((void*)(content.data()), len);
ASSERT_TRUE(len + sizeof(len) == writer.length());
writer.close();
}
{
milvus::storage::S3IOReader reader(index_name);
milvus::storage::S3IOReader reader;
reader.open(index_name);
size_t length = reader.length();
size_t rp = 0;
reader.seekg(rp);
......@@ -121,6 +123,7 @@ TEST_F(StorageTest, S3_RW_TEST) {
}
ASSERT_TRUE(content == content_out);
reader.close();
}
storage_inst.StopService();
......
......@@ -27,8 +27,8 @@ set(wrapper_files
)
set(storage_files
${MILVUS_ENGINE_SRC}/storage/file/FileIOReader.cpp
${MILVUS_ENGINE_SRC}/storage/file/FileIOWriter.cpp
${MILVUS_ENGINE_SRC}/storage/disk/DiskIOReader.cpp
${MILVUS_ENGINE_SRC}/storage/disk/DiskIOWriter.cpp
)
set(util_files
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册