Unverified commit 0323aa1a, authored by Jin Hai, committed by GitHub

Merge 080 (#1940)

* #1910 C++ SDK GetIDsInSegment could not work for large dataset (#1911)
Signed-off-by: groot <yihua.mo@zilliz.com>

* #1903 Fix invalid annoy result (#1912)
Signed-off-by: shengjun.li <shengjun.li@zilliz.com>

* #1914: Partition max size should be 4096 (#1915)
Signed-off-by: jinhai <hai.jin@zilliz.com>

* add log (#1913)

* add log
Signed-off-by: groot <yihua.mo@zilliz.com>

* add log
Signed-off-by: groot <yihua.mo@zilliz.com>

* fix ut
Signed-off-by: groot <yihua.mo@zilliz.com>

* partition limit 4096
Signed-off-by: groot <yihua.mo@zilliz.com>

* fix py test
Signed-off-by: groot <yihua.mo@zilliz.com>

* update server version (#1916)
Signed-off-by: zw <zw@zilliz.com>

* Update to 0.8.0 (#1918)

* Create new branch 0.8.0 and change preload_table to preload_collection
Signed-off-by: jinhai <hai.jin@zilliz.com>

* Fix format
Signed-off-by: JinHai-CN <hai.jin@zilliz.com>

* Update CHANGELOG
Signed-off-by: jinhai <hai.jin@zilliz.com>

* Update CHANGELOG
Signed-off-by: jinhai <hai.jin@zilliz.com>

* update helm version
Signed-off-by: zw <zw@zilliz.com>

* Update CHANGELOG
Signed-off-by: jinhai <hai.jin@zilliz.com>
Co-authored-by: zw <zw@zilliz.com>

* fix issue 1901 (#1920)

* fix issue 1901
Signed-off-by: cmli <chengming.li@zilliz.com>

* update change log
Signed-off-by: cmli <chengming.li@zilliz.com>
Co-authored-by: cmli <chengming.li@zilliz.com>

* #1900 (#1923)

* add log
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* fix #1900
Signed-off-by: groot <yihua.mo@zilliz.com>

* Upgrade mishards to 0.8.0 (#1933)

* update grpc server of milvus & rename table name to collection
Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* update changlog
Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* [skip ci] Skip CI
Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* [skip ci] Update changlog
Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* Caiyd 1883 fix rw (#1926)

* #1883 use DiskIO
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix logic error
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update changelog
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* retry CI
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* Update CHANGELOG
Signed-off-by: JinHai-CN <hai.jin@zilliz.com>

* update changelog
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
Co-authored-by: JinHai-CN <hai.jin@zilliz.com>

* #1928 Too many data and uid copies when loading files (#1931)
Signed-off-by: shengjun.li <shengjun.li@zilliz.com>
Co-authored-by: Jin Hai <hai.jin@zilliz.com>

* Update mishards configure files (#1938)

* Update web readme
Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* [skip ci] update configure files
Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* [skip ci] rename table to collection
Signed-off-by: Yhz <yinghao.zou@zilliz.com>

* Update test.groovy
Signed-off-by: jinhai <hai.jin@zilliz.com>

* Update test.groovy
Signed-off-by: jinhai <hai.jin@zilliz.com>

* Fix lint
Signed-off-by: JinHai-CN <hai.jin@zilliz.com>

* Fix compiling error
Signed-off-by: jinhai <hai.jin@zilliz.com>
Co-authored-by: groot <yhmo@zeronedata.com>
Co-authored-by: shengjun.li <49774184+shengjun1985@users.noreply.github.com>
Co-authored-by: del-zhenwu <56623710+del-zhenwu@users.noreply.github.com>
Co-authored-by: zw <zw@zilliz.com>
Co-authored-by: op-hunter <ophunter52@gmail.com>
Co-authored-by: cmli <chengming.li@zilliz.com>
Co-authored-by: BossZou <40255591+BossZou@users.noreply.github.com>
Co-authored-by: Cai Yudong <yudong.cai@zilliz.com>
Parent 890fe08e
......@@ -13,7 +13,7 @@ Please mark all change in change log and use the issue from GitHub
## Task
# Milvus 0.8.0 (TBD)
# Milvus 0.8.0 (2020-04-15)
## Bug
- \#1276 SQLite throw exception after create 50000+ partitions in a table
......@@ -22,6 +22,10 @@ Please mark all change in change log and use the issue from GitHub
- \#1832 Fix crash in tracing module
- \#1873 Fix index file serialize to incorrect path
- \#1881 Fix bad alloc when index files lost
- \#1883 Fix inserted vectors becomes all zero when index_file_size >= 2GB
- \#1901 Search failed with flat index
- \#1903 Fix invalid annoy result
- \#1910 C++ SDK GetIDsInSegment could not work for large dataset
## Feature
- \#261 Integrate ANNOY into Milvus
......@@ -41,10 +45,11 @@ Please mark all change in change log and use the issue from GitHub
- \#1886 Refactor log on search and insert request
- \#1897 Heap pop and push can be realized by heap_swap_top
- \#1921 Use TimeRecorder instead of chrono
- \#1928 Fix too many data and uid copies when loading files
- \#1930 Upgrade mishards to v0.8.0
## Task
# Milvus 0.7.1 (2020-03-29)
## Bug
......@@ -708,14 +713,14 @@ Please mark all change in change log and use the issue from GitHub
- MS-16 Implement metrics without prometheus
- MS-21 Implement SDK interface part 2
- MS-26 CMake. Add thirdparty packages
- MS-31 cmake: add prometheus
- MS-31 CMake: add prometheus
- MS-33 cmake: add -j4 to make third party packages build faster
- MS-27 Support gpu config and disable license build config in cmake
- MS-47 Add query vps metrics
- MS-37 Add query, cache usage, disk write speed and file data size metrics
- MS-30 Use faiss v1.5.2
- MS-54 cmake: Change Thrift third party URL to github.com
- MS-69 prometheus: add all proposed metrics
- MS-69 Prometheus: add all proposed metrics
## Task
......
......@@ -2,7 +2,7 @@ dir ('milvus-helm') {
sh 'helm version'
sh 'helm repo add stable https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts'
sh 'helm repo update'
checkout([$class: 'GitSCM', branches: [[name: "0.8.0"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/0.8.0:refs/remotes/origin/0.8.0"]]])
checkout([$class: 'GitSCM', branches: [[name: "master"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/master:refs/remotes/origin/master"]]])
sh "helm install --wait --timeout 600s --set image.repository=registry.zilliz.com/milvus/engine --set image.tag=${DOCKER_VERSION} --set image.pullPolicy=Always --set service.type=ClusterIP -f ci/db_backend/mysql_${BINARY_VERSION}_values.yaml -f ci/filebeat/values.yaml --namespace milvus ${env.HELM_RELEASE_NAME} ."
}
......@@ -16,7 +16,7 @@ load "ci/jenkins/step/cleanupSingleDev.groovy"
if (!fileExists('milvus-helm')) {
dir ("milvus-helm") {
checkout([$class: 'GitSCM', branches: [[name: "0.8.0"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/0.8.0:refs/remotes/origin/0.8.0"]]])
checkout([$class: 'GitSCM', branches: [[name: "master"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/master:refs/remotes/origin/master"]]])
}
}
dir ("milvus-helm") {
......
......@@ -2,7 +2,7 @@ dir ('milvus-helm') {
sh 'helm version'
sh 'helm repo add stable https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts'
sh 'helm repo update'
checkout([$class: 'GitSCM', branches: [[name: "0.8.0"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/0.8.0:refs/remotes/origin/0.8.0"]]])
checkout([$class: 'GitSCM', branches: [[name: "master"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/master:refs/remotes/origin/master"]]])
sh "helm install --wait --timeout 600s --set image.repository=registry.zilliz.com/milvus/engine --set image.tag=${DOCKER_VERSION} --set image.pullPolicy=Always --set service.type=ClusterIP -f ci/db_backend/mysql_${BINARY_VERSION}_values.yaml -f ci/filebeat/values.yaml --namespace milvus ${env.HELM_RELEASE_NAME} ."
}
......
......@@ -31,74 +31,44 @@ namespace milvus {
namespace codec {
void
DefaultVectorsFormat::read_vectors_internal(const std::string& file_path, off_t offset, size_t num,
std::vector<uint8_t>& raw_vectors) {
int rv_fd = open(file_path.c_str(), O_RDONLY, 00664);
if (rv_fd == -1) {
DefaultVectorsFormat::read_vectors_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
off_t offset, size_t num, std::vector<uint8_t>& raw_vectors) {
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
}
size_t num_bytes;
if (::read(rv_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
num = std::min(num, num_bytes - offset);
offset += sizeof(size_t); // Beginning of file is num_bytes
int off = lseek(rv_fd, offset, SEEK_SET);
if (off == -1) {
std::string err_msg = "Failed to seek file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->seekg(offset);
raw_vectors.resize(num / sizeof(uint8_t));
if (::read(rv_fd, raw_vectors.data(), num) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->read(raw_vectors.data(), num);
if (::close(rv_fd) == -1) {
std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->close();
}
void
DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vector<segment::doc_id_t>& uids) {
int uid_fd = open(file_path.c_str(), O_RDONLY, 00664);
if (uid_fd == -1) {
DefaultVectorsFormat::read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
std::vector<segment::doc_id_t>& uids) {
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg);
}
size_t num_bytes;
if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
uids.resize(num_bytes / sizeof(segment::doc_id_t));
if (::read(uid_fd, uids.data(), num_bytes) == -1) {
std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->read(uids.data(), num_bytes);
if (::close(uid_fd) == -1) {
std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->reader_ptr_->close();
}
void
......@@ -120,15 +90,12 @@ DefaultVectorsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::Vectors
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == raw_vector_extension_) {
std::vector<uint8_t> vector_list;
read_vectors_internal(path.string(), 0, INT64_MAX, vector_list);
vectors_read->AddData(vector_list);
auto& vector_list = vectors_read->GetMutableData();
read_vectors_internal(fs_ptr, path.string(), 0, INT64_MAX, vector_list);
vectors_read->SetName(path.stem().string());
}
if (path.extension().string() == user_id_extension_) {
std::vector<segment::doc_id_t> uids;
read_uids_internal(path.string(), uids);
vectors_read->AddUids(uids);
} else if (path.extension().string() == user_id_extension_) {
auto& uids = vectors_read->GetMutableUids();
read_uids_internal(fs_ptr, path.string(), uids);
}
}
}
......@@ -144,54 +111,28 @@ DefaultVectorsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::
TimeRecorder rc("write vectors");
int rv_fd = open(rv_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664);
if (rv_fd == -1) {
if (!fs_ptr->writer_ptr_->open(rv_file_path.c_str())) {
std::string err_msg = "Failed to open file: " + rv_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t rv_num_bytes = vectors->GetData().size() * sizeof(uint8_t);
if (::write(rv_fd, &rv_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(rv_fd, vectors->GetData().data(), rv_num_bytes) == -1) {
std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(rv_fd) == -1) {
std::string err_msg = "Failed to close file: " + rv_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->writer_ptr_->write(&rv_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->write((void*)vectors->GetData().data(), rv_num_bytes);
fs_ptr->writer_ptr_->close();
rc.RecordSection("write rv done");
int uid_fd = open(uid_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664);
if (uid_fd == -1) {
if (!fs_ptr->writer_ptr_->open(uid_file_path.c_str())) {
std::string err_msg = "Failed to open file: " + uid_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t uid_num_bytes = vectors->GetUids().size() * sizeof(segment::doc_id_t);
if (::write(uid_fd, &uid_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(uid_fd, vectors->GetUids().data(), uid_num_bytes) == -1) {
std::string err_msg = "Failed to write to file" + uid_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(uid_fd) == -1) {
std::string err_msg = "Failed to close file: " + uid_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
fs_ptr->writer_ptr_->write(&uid_num_bytes, sizeof(size_t));
fs_ptr->writer_ptr_->write((void*)vectors->GetUids().data(), uid_num_bytes);
fs_ptr->writer_ptr_->close();
rc.RecordSection("write uids done");
}
......@@ -215,7 +156,7 @@ DefaultVectorsFormat::read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == user_id_extension_) {
read_uids_internal(path.string(), uids);
read_uids_internal(fs_ptr, path.string(), uids);
}
}
}
......@@ -240,7 +181,7 @@ DefaultVectorsFormat::read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t of
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == raw_vector_extension_) {
read_vectors_internal(path.string(), offset, num_bytes, raw_vectors);
read_vectors_internal(fs_ptr, path.string(), offset, num_bytes, raw_vectors);
}
}
}
......
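For context on the hunks above: the raw-vector and uid files are size-prefixed, and the refactor only swaps the raw POSIX calls (open/lseek/read/close) for the storage reader abstraction. Below is a minimal standalone sketch of reading that layout, with std::ifstream standing in for fs_ptr->reader_ptr_; it is an illustration, not the project code.

```cpp
// Sketch only: read a size-prefixed payload (a size_t byte count, then raw
// bytes), mirroring what read_vectors_internal expects on disk.
#include <algorithm>
#include <cstdint>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>

std::vector<uint8_t>
ReadSizePrefixedFile(const std::string& path, size_t offset, size_t num) {
    std::ifstream in(path, std::ios::binary);
    if (!in.is_open()) {
        throw std::runtime_error("Failed to open file: " + path);
    }
    size_t num_bytes = 0;
    in.read(reinterpret_cast<char*>(&num_bytes), sizeof(size_t));  // payload size header
    num = std::min(num, num_bytes - offset);                       // clamp request to what exists
    in.seekg(sizeof(size_t) + offset);                             // skip the header plus caller offset
    std::vector<uint8_t> payload(num);
    in.read(reinterpret_cast<char*>(payload.data()), num);
    return payload;
}
```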
......@@ -55,10 +55,12 @@ class DefaultVectorsFormat : public VectorsFormat {
private:
void
read_vectors_internal(const std::string&, off_t, size_t, std::vector<uint8_t>&);
read_vectors_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, off_t offset, size_t num,
std::vector<uint8_t>& raw_vectors);
void
read_uids_internal(const std::string&, std::vector<segment::doc_id_t>&);
read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
std::vector<segment::doc_id_t>& uids);
private:
std::mutex mutex_;
......
......@@ -861,6 +861,8 @@ DBImpl::GetVectorByID(const std::string& collection_id, const IDNumber& vector_i
return status;
}
OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query);
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
......@@ -871,17 +873,18 @@ DBImpl::GetVectorByID(const std::string& collection_id, const IDNumber& vector_i
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
OngoingFileChecker::GetInstance().MarkOngoingFiles(files);
files_to_query.insert(files_to_query.end(), std::make_move_iterator(files.begin()),
std::make_move_iterator(files.end()));
}
if (files_to_query.empty()) {
LOG_ENGINE_DEBUG_ << "No files to get vector by id from";
return Status::OK();
return Status(DB_NOT_FOUND, "Collection is empty");
}
cache::CpuCacheMgr::GetInstance()->PrintInfo();
OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query);
status = GetVectorByIdHelper(collection_id, vector_id, vector, files_to_query);
......@@ -965,7 +968,11 @@ DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segmen
Status
DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id, VectorsData& vector,
const meta::SegmentsSchema& files) {
LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files";
LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id = " << vector_id;
vector.vector_count_ = 0;
vector.float_data_.clear();
vector.binary_data_.clear();
for (auto& file : files) {
// Load bloom filter
......@@ -993,6 +1000,7 @@ DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id
segment::DeletedDocsPtr deleted_docs_ptr;
status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << status.message();
return status;
}
auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();
......@@ -1005,6 +1013,7 @@ DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id
std::vector<uint8_t> raw_vector;
status = segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << status.message();
return status;
}
......@@ -1025,6 +1034,11 @@ DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id
}
}
if (vector.binary_data_.empty() && vector.float_data_.empty()) {
std::string msg = "Vector with id " + std::to_string(vector_id) + " not found in collection " + collection_id;
LOG_ENGINE_DEBUG_ << msg;
}
return Status::OK();
}
......
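The GetVectorByIdHelper change above clears the output before scanning files and treats an empty result as "id not found" rather than returning stale data. A rough, self-contained sketch of that reset-then-scan shape; the callback stands in for the per-file bloom-filter / deleted-docs / raw-vector lookup and is not a Milvus API.

```cpp
// Hedged sketch: reset the output, scan files until one yields the id,
// and let vector_count_ == 0 signal a miss to the caller.
#include <cstdint>
#include <functional>
#include <vector>

struct VectorsData {
    uint64_t vector_count_ = 0;
    std::vector<float> float_data_;
    std::vector<uint8_t> binary_data_;
};

VectorsData
GetVectorById(int64_t id, size_t file_count,
              const std::function<bool(size_t, int64_t, VectorsData&)>& look_up) {
    VectorsData vector;                  // freshly reset: count 0, empty buffers
    for (size_t i = 0; i < file_count; ++i) {
        if (look_up(i, id, vector)) {    // fills `vector` when the id is found
            break;
        }
    }
    return vector;                       // empty result means the id was not found
}
```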
......@@ -375,8 +375,6 @@ ExecutionEngineImpl::Serialize() {
Status
ExecutionEngineImpl::Load(bool to_cache) {
// TODO(zhiru): refactor
index_ = std::static_pointer_cast<knowhere::VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
bool already_in_cache = (index_ != nullptr);
if (!already_in_cache) {
......@@ -411,21 +409,19 @@ ExecutionEngineImpl::Load(bool to_cache) {
auto& vectors = segment_ptr->vectors_ptr_;
auto& deleted_docs = segment_ptr->deleted_docs_ptr_->GetDeletedDocs();
auto vectors_uids = vectors->GetUids();
auto& vectors_uids = vectors->GetMutableUids();
auto count = vectors_uids.size();
index_->SetUids(vectors_uids);
LOG_ENGINE_DEBUG_ << "set uids " << index_->GetUids().size() << " for index " << location_;
auto vectors_data = vectors->GetData();
auto& vectors_data = vectors->GetData();
faiss::ConcurrentBitsetPtr concurrent_bitset_ptr =
std::make_shared<faiss::ConcurrentBitset>(vectors->GetCount());
faiss::ConcurrentBitsetPtr concurrent_bitset_ptr = std::make_shared<faiss::ConcurrentBitset>(count);
for (auto& offset : deleted_docs) {
if (!concurrent_bitset_ptr->test(offset)) {
concurrent_bitset_ptr->set(offset);
}
concurrent_bitset_ptr->set(offset);
}
auto dataset = knowhere::GenDataset(vectors->GetCount(), this->dim_, vectors_data.data());
auto dataset = knowhere::GenDataset(count, this->dim_, vectors_data.data());
if (index_type_ == EngineType::FAISS_IDMAP) {
auto bf_index = std::static_pointer_cast<knowhere::IDMAP>(index_);
bf_index->Train(knowhere::DatasetPtr(), conf);
......
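The Load() changes above avoid copying the uid and data buffers by taking references, and drop the redundant test() before set() when marking deleted offsets. A toy sketch of the deleted-docs bitset step, with std::vector<bool> standing in for faiss::ConcurrentBitset:

```cpp
// Sketch only: mark deleted offsets in a bitset before handing data to the
// index; setting a bit twice is harmless, so no test-before-set is needed.
#include <cstdint>
#include <vector>

std::vector<bool>
BuildDeletedBitset(size_t row_count, const std::vector<int32_t>& deleted_docs) {
    std::vector<bool> bitset(row_count, false);
    for (auto offset : deleted_docs) {
        bitset[static_cast<size_t>(offset)] = true;
    }
    return bitset;
}
```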
......@@ -651,6 +651,9 @@ MySQLMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// soft delete collection files
mysqlpp::Query statement = connectionPtr->query();
//
......@@ -726,6 +729,9 @@ MySQLMetaImpl::CreateCollectionFile(SegmentSchema& file_schema) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "INSERT INTO " << META_TABLEFILES << " VALUES(" << id << ", " << mysqlpp::quote
......@@ -777,6 +783,9 @@ MySQLMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::v
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement
<< "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
......@@ -834,6 +843,9 @@ MySQLMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
<< "row_count, date, created_on"
......@@ -897,15 +909,14 @@ MySQLMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Col
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query updateCollectionIndexParamQuery = connectionPtr->query();
updateCollectionIndexParamQuery << "SELECT id, state, dimension, created_on"
<< " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
<< collection_id << " AND state <> "
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, state, dimension, created_on"
<< " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND state <> " << std::to_string(CollectionSchema::TO_DELETE) << ";";
LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << updateCollectionIndexParamQuery.str();
LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << statement.str();
mysqlpp::StoreQueryResult res = updateCollectionIndexParamQuery.store();
mysqlpp::StoreQueryResult res = statement.store();
if (res.num_rows() == 1) {
const mysqlpp::Row& resRow = res[0];
......@@ -915,18 +926,16 @@ MySQLMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Col
uint16_t dimension = resRow["dimension"];
int64_t created_on = resRow["created_on"];
updateCollectionIndexParamQuery
<< "UPDATE " << META_TABLES << " SET id = " << id << " ,state = " << state
<< " ,dimension = " << dimension << " ,created_on = " << created_on
<< " ,engine_type = " << index.engine_type_ << " ,index_params = " << mysqlpp::quote
<< index.extra_params_.dump() << " ,metric_type = " << index.metric_type_
<< " WHERE table_id = " << mysqlpp::quote << collection_id << ";";
statement << "UPDATE " << META_TABLES << " SET id = " << id << " ,state = " << state
<< " ,dimension = " << dimension << " ,created_on = " << created_on
<< " ,engine_type = " << index.engine_type_ << " ,index_params = " << mysqlpp::quote
<< index.extra_params_.dump() << " ,metric_type = " << index.metric_type_
<< " WHERE table_id = " << mysqlpp::quote << collection_id << ";";
LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << updateCollectionIndexParamQuery.str();
LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << statement.str();
if (!updateCollectionIndexParamQuery.exec()) {
return HandleException("Failed to update collection index",
updateCollectionIndexParamQuery.error());
if (!statement.exec()) {
return HandleException("Failed to update collection index", statement.error());
}
} else {
return Status(DB_NOT_FOUND, "Collection " + collection_id + " not found");
......@@ -1019,7 +1028,7 @@ MySQLMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t&
}
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT flush_lsn FROM " << META_TABLES << " WHERE collection_id = " << mysqlpp::quote
statement << "SELECT flush_lsn FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
<< collection_id << ";";
LOG_ENGINE_DEBUG_ << "GetCollectionFlushLSN: " << statement.str();
......@@ -1054,6 +1063,9 @@ MySQLMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
// if the collection has been deleted, just mark the collection file as TO_DELETE
......@@ -1120,6 +1132,9 @@ MySQLMetaImpl::UpdateCollectionFilesToIndex(const std::string& collection_id) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "UPDATE " << META_TABLEFILES << " SET file_type = " << std::to_string(SegmentSchema::TO_INDEX)
......@@ -1155,6 +1170,9 @@ MySQLMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
std::map<std::string, bool> has_collections;
......@@ -1229,6 +1247,9 @@ MySQLMetaImpl::UpdateCollectionFilesRowCount(SegmentsSchema& files) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
for (auto& file : files) {
......@@ -1442,7 +1463,7 @@ MySQLMetaImpl::ShowPartitions(const std::string& collection_id,
<< " WHERE owner_table = " << mysqlpp::quote << collection_id << " AND state <> "
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
LOG_ENGINE_DEBUG_ << "AllCollections: " << statement.str();
LOG_ENGINE_DEBUG_ << "ShowPartitions: " << statement.str();
res = statement.store();
} // Scoped Connection
......@@ -1498,7 +1519,7 @@ MySQLMetaImpl::GetPartitionName(const std::string& collection_id, const std::str
<< collection_id << " AND partition_tag = " << mysqlpp::quote << valid_tag << " AND state <> "
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
LOG_ENGINE_DEBUG_ << "AllCollections: " << statement.str();
LOG_ENGINE_DEBUG_ << "GetPartitionName: " << statement.str();
res = statement.store();
} // Scoped Connection
......@@ -1533,6 +1554,9 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& f
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id;
......@@ -1615,6 +1639,9 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& fi
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date, "
"engine_type, created_on"
......@@ -1684,6 +1711,9 @@ MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
"row_count, date, created_on"
......@@ -1773,16 +1803,19 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
types += std::to_string(type);
}
mysqlpp::Query hasNonIndexFilesQuery = connectionPtr->query();
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
// since collection_id is a unique column we just need to check whether it exists or not
hasNonIndexFilesQuery
statement
<< "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND file_type in (" << types << ");";
LOG_ENGINE_DEBUG_ << "FilesByType: " << hasNonIndexFilesQuery.str();
LOG_ENGINE_DEBUG_ << "FilesByType: " << statement.str();
res = hasNonIndexFilesQuery.store();
res = statement.store();
} // Scoped Connection
CollectionSchema collection_schema;
......@@ -1906,6 +1939,9 @@ MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
<< " FROM " << META_TABLEFILES;
......@@ -2054,6 +2090,9 @@ MySQLMetaImpl::Size(uint64_t& result) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT IFNULL(SUM(file_size),0) AS sum"
<< " FROM " << META_TABLEFILES << " WHERE file_type <> "
......@@ -2143,14 +2182,21 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/)
}
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type IN ("
<< std::to_string(SegmentSchema::TO_DELETE) << "," << std::to_string(SegmentSchema::BACKUP) << ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::StoreQueryResult res;
{
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type IN ("
<< std::to_string(SegmentSchema::TO_DELETE) << "," << std::to_string(SegmentSchema::BACKUP)
<< ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::StoreQueryResult res = statement.store();
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
res = statement.store();
}
SegmentSchema collection_file;
std::vector<std::string> delete_ids;
......@@ -2385,6 +2431,9 @@ MySQLMetaImpl::Count(const std::string& collection_id, uint64_t& result) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT row_count"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
......
......@@ -162,6 +162,7 @@ class MySQLMetaImpl : public Meta {
std::shared_ptr<MySQLConnectionPool> mysql_connection_pool_;
bool safe_grab_ = false;
std::mutex meta_mutex_;
std::mutex genid_mutex_;
// std::mutex connectionMutex_;
}; // DBMetaImpl
......
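The meta_mutex_ member added above is the lock taken in every MySQLMetaImpl hunk earlier in this diff: each meta read or update holds it for the duration of the statement so concurrent calls behave atomically. A minimal sketch of the pattern, with RunQuery as a placeholder for the mysqlpp statement execution:

```cpp
// Illustrative sketch of the locking pattern, not the real MySQLMetaImpl.
#include <mutex>
#include <string>

class MetaImplSketch {
 public:
    void UpdateCollectionFiles(const std::string& sql) {
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);  // held until return
        RunQuery(sql);
    }

    void FilesToSearch(const std::string& sql) {
        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
        RunQuery(sql);
    }

 private:
    void RunQuery(const std::string& /*sql*/) { /* execute the statement here */ }
    std::mutex meta_mutex_;
};
```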
......@@ -253,6 +253,9 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
try {
fiu_do_on("SqliteMetaImpl.HasCollection.throw_exception", throw std::exception());
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto collections = ConnectorPtr->select(
columns(&CollectionSchema::id_),
where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
......@@ -273,6 +276,9 @@ SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_
try {
fiu_do_on("SqliteMetaImpl.AllCollections.throw_exception", throw std::exception());
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto selected = ConnectorPtr->select(
columns(&CollectionSchema::id_, &CollectionSchema::collection_id_, &CollectionSchema::dimension_, &CollectionSchema::created_on_,
&CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_,
......@@ -403,12 +409,7 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
fiu_do_on("SqliteMetaImpl.GetCollectionFiles.throw_exception", throw std::exception());
collection_files.clear();
auto files = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_),
where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
......@@ -416,8 +417,20 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
return status;
}
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
}
Status result;
for (auto& file : files) {
for (auto& file : selected) {
SegmentSchema file_schema;
file_schema.collection_id_ = collection_id;
file_schema.id_ = std::get<0>(file);
......@@ -451,23 +464,29 @@ SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
milvus::engine::meta::SegmentsSchema& collection_files) {
try {
collection_files.clear();
auto files = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_),
where(c(&SegmentSchema::segment_id_) == segment_id and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
if (!files.empty()) {
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::segment_id_) == segment_id and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
}
if (!selected.empty()) {
CollectionSchema collection_schema;
collection_schema.collection_id_ = std::get<1>(files[0]);
collection_schema.collection_id_ = std::get<1>(selected[0]);
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
for (auto& file : files) {
for (auto& file : selected) {
SegmentSchema file_schema;
file_schema.collection_id_ = collection_schema.collection_id_;
file_schema.id_ = std::get<0>(file);
......@@ -502,6 +521,9 @@ SqliteMetaImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t f
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.UpdateCollectionFlag.throw_exception", throw std::exception());
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// set all backup file to raw
ConnectorPtr->update_all(set(c(&CollectionSchema::flag_) = flag), where(c(&CollectionSchema::collection_id_) == collection_id));
LOG_ENGINE_DEBUG_ << "Successfully update collection flag, collection id = " << collection_id;
......@@ -518,6 +540,9 @@ SqliteMetaImpl::UpdateCollectionFlushLSN(const std::string& collection_id, uint6
try {
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(set(c(&CollectionSchema::flush_lsn_) = flush_lsn),
where(c(&CollectionSchema::collection_id_) == collection_id));
LOG_ENGINE_DEBUG_ << "Successfully update collection flush_lsn, collection id = " << collection_id << " flush_lsn = " << flush_lsn;;
......@@ -534,6 +559,9 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t
try {
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto selected =
ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_id));
......@@ -920,6 +948,14 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema&
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception());
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
// perform query
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
......@@ -930,18 +966,13 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema&
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
auto match_type = in(&SegmentSchema::file_type_, file_types);
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
// perform query
decltype(ConnectorPtr->select(select_columns)) selected;
auto filter = where(match_collectionid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto filter = where(match_collectionid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
}
Status ret;
for (auto& file : selected) {
......@@ -998,13 +1029,18 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& f
}
// get files to merge
auto selected = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::created_on_),
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and
c(&SegmentSchema::collection_id_) == collection_id),
order_by(&SegmentSchema::file_size_).desc());
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and
c(&SegmentSchema::collection_id_) == collection_id),
order_by(&SegmentSchema::file_size_).desc());
}
Status result;
int64_t to_merge_files = 0;
......@@ -1055,12 +1091,17 @@ SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) {
server::MetricCollector metric;
auto selected = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_),
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX));
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX));
}
std::map<std::string, CollectionSchema> groups;
SegmentSchema collection_file;
......@@ -1118,22 +1159,33 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector<
Status ret = Status::OK();
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
try {
fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());
files.clear();
auto selected = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_),
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
// get files by type
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_,
&SegmentSchema::file_type_,
&SegmentSchema::file_size_,
&SegmentSchema::row_count_,
&SegmentSchema::date_,
&SegmentSchema::engine_type_,
&SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
}
if (selected.size() >= 1) {
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
......@@ -1232,7 +1284,6 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
auto match_type = in(&SegmentSchema::file_type_, file_types);
......@@ -1241,7 +1292,11 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
decltype(ConnectorPtr->select(select_columns)) selected;
auto match_fileid = in(&SegmentSchema::id_, ids);
auto filter = where(match_fileid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns, filter);
}
std::map<std::string, meta::CollectionSchema> collections;
Status ret;
......@@ -1573,9 +1628,14 @@ SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) {
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
auto selected = ConnectorPtr->select(
columns(&SegmentSchema::row_count_),
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
auto select_columns = columns(&SegmentSchema::row_count_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
}
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
......
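Most of the SqliteMetaImpl hunks above follow one shape: pre-declare the result with decltype so it outlives the block, take meta_mutex_ only around the select, and process rows after the lock is released. A self-contained sketch of that scoping, with Query() as a stand-in for ConnectorPtr->select(...):

```cpp
// Sketch of the decltype-plus-scoped-lock pattern used in the sqlite meta code.
#include <mutex>
#include <vector>

struct Row { int id; };

std::vector<Row> Query() { return {}; }   // placeholder for the sqlite select

static std::mutex g_meta_mutex;

void FilesToSearchSketch() {
    decltype(Query()) selected;           // declared before the lock scope
    {
        std::lock_guard<std::mutex> meta_lock(g_meta_mutex);
        selected = Query();               // only the DB access is serialized
    }
    for (auto& row : selected) {
        (void)row;                        // row post-processing happens unlocked
    }
}
```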
......@@ -128,8 +128,15 @@ IndexAnnoy::Query(const DatasetPtr& dataset_ptr, const Config& config) {
distances.reserve(k);
index_->get_nns_by_vector((const float*)p_data + i * dim, k, search_k, &result, &distances, blacklist);
memcpy(p_id + k * i, result.data(), k * sizeof(int64_t));
memcpy(p_dist + k * i, distances.data(), k * sizeof(float));
size_t result_num = result.size();
auto local_p_id = p_id + k * i;
auto local_p_dist = p_dist + k * i;
memcpy(local_p_id, result.data(), result_num * sizeof(int64_t));
memcpy(local_p_dist, distances.data(), result_num * sizeof(float));
for (; result_num < k; result_num++) {
local_p_id[result_num] = -1;
local_p_dist[result_num] = 1.0 / 0.0;
}
}
auto ret_ds = std::make_shared<Dataset>();
......
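The IndexAnnoy change above pads short result lists so every query returns exactly k entries. A standalone sketch of the padding step (using std::numeric_limits<float>::infinity() where the diff writes 1.0 / 0.0):

```cpp
// Sketch of the #1903 fix: when Annoy returns fewer than k neighbours, fill
// the remaining slots with id -1 and an infinite distance.
#include <cstdint>
#include <cstring>
#include <limits>
#include <vector>

void
CopyTopK(const std::vector<int64_t>& result, const std::vector<float>& distances,
         size_t k, int64_t* p_id, float* p_dist) {
    size_t result_num = result.size();
    std::memcpy(p_id, result.data(), result_num * sizeof(int64_t));
    std::memcpy(p_dist, distances.data(), result_num * sizeof(float));
    for (; result_num < k; result_num++) {
        p_id[result_num] = -1;                                        // sentinel id
        p_dist[result_num] = std::numeric_limits<float>::infinity();  // "no neighbour"
    }
}
```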
......@@ -63,7 +63,7 @@ FaissFlatPass::Run(const TaskPtr& task) {
} else {
auto best_device_id = count_ % search_gpus_.size();
LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissFlatPass: nq > gpu_search_threshold, specify gpu %d to search!",
best_device_id, "search", 0);
"search", 0, best_device_id);
++count_;
res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]);
}
......
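The FaissFlatPass hunk above only reorders the log arguments so they match the "[%s][%d] ... gpu %d" placeholders. A tiny illustration of the corrected order using snprintf:

```cpp
// Arguments must follow placeholder order: %s -> "search", %d -> 0, %d -> gpu id.
#include <cstdio>

void LogGpuChoice(int best_device_id) {
    char buf[128];
    std::snprintf(buf, sizeof(buf),
                  "[%s][%d] FaissFlatPass: nq > gpu_search_threshold, specify gpu %d to search!",
                  "search", 0, best_device_id);
    std::puts(buf);
}
```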
......@@ -28,10 +28,6 @@
namespace milvus {
namespace segment {
Vectors::Vectors(std::vector<uint8_t> data, std::vector<doc_id_t> uids, const std::string& name)
: data_(std::move(data)), uids_(std::move(uids)), name_(name) {
}
void
Vectors::AddData(const std::vector<uint8_t>& data) {
data_.reserve(data_.size() + data.size());
......@@ -113,6 +109,16 @@ Vectors::Erase(std::vector<int32_t>& offsets) {
recorder.RecordSection(msg);
}
std::vector<uint8_t>&
Vectors::GetMutableData() {
return data_;
}
std::vector<doc_id_t>&
Vectors::GetMutableUids() {
return uids_;
}
const std::vector<uint8_t>&
Vectors::GetData() const {
return data_;
......
......@@ -28,8 +28,6 @@ using doc_id_t = int64_t;
class Vectors {
public:
Vectors(std::vector<uint8_t> data, std::vector<doc_id_t> uids, const std::string& name);
Vectors() = default;
void
......@@ -41,6 +39,12 @@ class Vectors {
void
SetName(const std::string& name);
std::vector<uint8_t>&
GetMutableData();
std::vector<doc_id_t>&
GetMutableUids();
const std::vector<uint8_t>&
GetData() const;
......
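The new GetMutableData()/GetMutableUids() accessors shown above let the codec read files directly into the members instead of filling a temporary and copying it in through AddData()/AddUids(). A stripped-down sketch of the idea:

```cpp
// Sketch only: pair a const getter with a mutable accessor so callers can
// fill the buffer in place without an extra copy.
#include <cstdint>
#include <vector>

class VectorsSketch {
 public:
    std::vector<uint8_t>& GetMutableData() { return data_; }       // written in place
    const std::vector<uint8_t>& GetData() const { return data_; }  // read-only view

 private:
    std::vector<uint8_t> data_;
};
```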
......@@ -23,7 +23,7 @@
namespace milvus {
namespace server {
constexpr uint64_t MAX_PARTITION_LIMIT = 5000;
constexpr uint64_t MAX_PARTITION_LIMIT = 4096;
CreatePartitionRequest::CreatePartitionRequest(const std::shared_ptr<milvus::server::Context>& context,
const std::string& collection_name, const std::string& tag)
......@@ -83,7 +83,7 @@ CreatePartitionRequest::OnExecute() {
std::vector<engine::meta::CollectionSchema> schema_array;
status = DBWrapper::DB()->ShowPartitions(collection_name_, schema_array);
if (schema_array.size() >= MAX_PARTITION_LIMIT) {
return Status(SERVER_UNSUPPORTED_ERROR, "The number of partitions exceeds the upper limit(5000)");
return Status(SERVER_UNSUPPORTED_ERROR, "The number of partitions exceeds the upper limit(4096)");
}
rc.RecordSection("check validation");
......
......@@ -422,12 +422,21 @@ Creates a collection.
##### Body Parameters
| Parameter | Description | Required? |
| ----------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------- |
| `collection_name` | The name of the collection to create, which must be unique within its database. | Yes |
| `dimension` | The dimension of the vectors that are to be inserted into the created collection. | Yes |
| `index_file_size` | Threshold value that triggers index building for raw data files. The default is 1024. | No |
| `metric_type` | The method vector distances are compared in Milvus. The default is L2. Currently supported metrics include `L2` (Euclidean distance), `IP` (Inner Product), `HAMMING` (Hamming distance), `JACCARD` (Jaccard distance), and `TANIMOTO` (Tanomoto distance). | No |
| Parameter | Description | Required? |
| ----------------- | ----------------------------------------------------------------------------------------- | --------- |
| `collection_name` | The name of the collection to create, which must be unique within its database. | Yes |
| `dimension` | The dimension of the vectors that are to be inserted into the created collection. | Yes |
| `index_file_size` | Threshold value that triggers index building for raw data files. The default is 1024. | No |
| `metric_type` | The method vector distances are compared in Milvus. The default is L2. | No |
* Currently supported metrics include:
- `L2` (Euclidean distance),
- `IP` (Inner Product)
- `HAMMING` (Hamming distance)
- `JACCARD` (Jaccard distance)
- `TANIMOTO` (Tanomoto distance)
- `SUBSTRUCTURE` (Sub structure distance)
- `SUPERSTRUCTURE` (Super structure distance)
#### Response
......@@ -1541,6 +1550,11 @@ For each index type, the RESTful API has specific index parameters and search pa
<td><pre><code>{"M": $int, "efConstruction": $int}</code></pre></td>
<td><pre><code>{"ef": $int}</code></pre></td>
</tr>
<tr>
<td> ANNOY</td>
<td><pre><code>{"n_trees": $int}</code></pre></td>
<td><pre><code>{"search_k": $int}</code></pre></td>
</tr>
</table>
For detailed information about the parameters above, refer to [Milvus Indexes](https://milvus.io/docs/guides/index.md)
......
......@@ -63,6 +63,7 @@ constexpr ErrorCode SERVER_CANNOT_CREATE_FILE = ToServerErrorCode(9);
constexpr ErrorCode SERVER_CANNOT_DELETE_FOLDER = ToServerErrorCode(10);
constexpr ErrorCode SERVER_CANNOT_DELETE_FILE = ToServerErrorCode(11);
constexpr ErrorCode SERVER_BUILD_INDEX_ERROR = ToServerErrorCode(12);
constexpr ErrorCode SERVER_CANNOT_OPEN_FILE = ToServerErrorCode(13);
constexpr ErrorCode SERVER_COLLECTION_NOT_EXIST = ToServerErrorCode(100);
constexpr ErrorCode SERVER_INVALID_COLLECTION_NAME = ToServerErrorCode(101);
......
......@@ -79,7 +79,10 @@ Status
ClientProxy::Connect(const ConnectParam& param) {
std::string uri = param.ip_address + ":" + param.port;
channel_ = ::grpc::CreateChannel(uri, ::grpc::InsecureChannelCredentials());
::grpc::ChannelArguments args;
args.SetMaxSendMessageSize(-1);
args.SetMaxReceiveMessageSize(-1);
channel_ = ::grpc::CreateCustomChannel(uri, ::grpc::InsecureChannelCredentials(), args);
if (channel_ != nullptr) {
connected_ = true;
client_ptr_ = std::make_shared<GrpcClient>(channel_);
......
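The ClientProxy::Connect change above is what lets large responses such as a big GetIDsInSegment result (#1910) reach the client: the channel is built with unlimited send/receive message sizes instead of gRPC's 4 MB default. A small sketch of the same construction:

```cpp
// Sketch of building a gRPC channel with unlimited message sizes (-1).
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>

std::shared_ptr<::grpc::Channel>
MakeUnlimitedChannel(const std::string& ip, const std::string& port) {
    std::string uri = ip + ":" + port;
    ::grpc::ChannelArguments args;
    args.SetMaxSendMessageSize(-1);      // -1 means no limit
    args.SetMaxReceiveMessageSize(-1);
    return ::grpc::CreateCustomChannel(uri, ::grpc::InsecureChannelCredentials(), args);
}
```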
......@@ -54,7 +54,7 @@ Follow below steps to start a standalone Milvus instance with Mishards from sour
3. Start Milvus server.
```shell
$ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.6.0-gpu-d120719-2b40dd
$ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.8.0-gpu-d041520-464400
```
4. Update path permissions.
......
......@@ -48,7 +48,7 @@ Python 版本为3.6及以上。
3. 启动 Milvus 服务。
```shell
$ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.6.0-gpu-d120719-2b40dd
$ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.8.0-gpu-d041520-464400
```
4. 更改目录权限。
......
......@@ -3,7 +3,7 @@ services:
milvus_wr:
runtime: nvidia
restart: always
image: milvusdb/milvus:0.7.1-gpu-d032920-3cdba5
image: milvusdb/milvus:0.8.0-gpu-d041520-464400
ports:
- "0.0.0.0:19540:19530"
volumes:
......@@ -13,7 +13,7 @@ services:
milvus_ro:
runtime: nvidia
restart: always
image: milvusdb/milvus:0.7.1-gpu-d032920-3cdba5
image: milvusdb/milvus:0.8.0-gpu-d041520-464400
ports:
- "0.0.0.0:19541:19530"
volumes:
......
......@@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
version: 0.2
version: 0.3
#----------------------+------------------------------------------------------------+------------+-----------------+
# Server Config | Description | Type | Default |
......@@ -42,14 +42,19 @@ server_config:
# | Keep 'dialect://:@:/', 'dialect' can be either 'sqlite' or | | |
# | 'mysql', replace other texts with real values. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# preload_table | A comma-separated list of table names that need to be pre- | StringList | |
# | loaded when Milvus server starts up. | | |
# preload_collection | A comma-separated list of collection names that need to | StringList | |
# | be pre-loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables (single-quote or | | |
# | double-quote required). | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |
# | 0 means disable the regular flush. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
db_config:
backend_url: sqlite://:@:/
preload_table:
preload_collection:
auto_flush_interval: 1
#----------------------+------------------------------------------------------------+------------+-----------------+
# Storage Config | Description | Type | Default |
......
......@@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
version: 0.2
version: 0.3
#----------------------+------------------------------------------------------------+------------+-----------------+
# Server Config | Description | Type | Default |
......@@ -42,14 +42,19 @@ server_config:
# | Keep 'dialect://:@:/', 'dialect' can be either 'sqlite' or | | |
# | 'mysql', replace other texts with real values. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# preload_table | A comma-separated list of table names that need to be pre- | StringList | |
# | loaded when Milvus server starts up. | | |
# preload_collection | A comma-separated list of collection names that need to | StringList | |
# | be pre-loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables (single-quote or | | |
# | double-quote required). | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |
# | 0 means disable the regular flush. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
db_config:
backend_url: sqlite://:@:/
preload_table:
preload_collection:
auto_flush_interval: 1
#----------------------+------------------------------------------------------------+------------+-----------------+
# Storage Config | Description | Type | Default |
......
......@@ -18,7 +18,7 @@ services:
milvus_wr:
runtime: nvidia
restart: always
image: milvusdb/milvus:0.7.1-gpu-d032920-3cdba5
image: milvusdb/milvus:0.8.0-gpu-d041520-464400
volumes:
- /tmp/milvus/db:/var/lib/milvus/db
- ./wr_server.yml:/var/lib/milvus/conf/server_config.yaml
......@@ -29,7 +29,7 @@ services:
milvus_ro:
runtime: nvidia
restart: always
image: milvusdb/milvus:0.7.1-gpu-d032920-3cdba5
image: milvusdb/milvus:0.8.0-gpu-d041520-464400
volumes:
- /tmp/milvus/db:/var/lib/milvus/db
- ./ro_server.yml:/var/lib/milvus/conf/server_config.yaml
......
......@@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
version: 0.2
version: 0.3
#----------------------+------------------------------------------------------------+------------+-----------------+
# Server Config | Description | Type | Default |
......@@ -42,14 +42,19 @@ server_config:
# | Keep 'dialect://:@:/', 'dialect' can be either 'sqlite' or | | |
# | 'mysql', replace other texts with real values. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# preload_table | A comma-separated list of table names that need to be pre- | StringList | |
# | loaded when Milvus server starts up. | | |
# preload_collection | A comma-separated list of collection names that need to | StringList | |
# | be pre-loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables (single-quote or | | |
# | double-quote required). | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |
# | 0 means disable the regular flush. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
db_config:
backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus
preload_table:
preload_collection:
auto_flush_interval: 1
#----------------------+------------------------------------------------------------+------------+-----------------+
# Storage Config | Description | Type | Default |
......
......@@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
version: 0.2
version: 0.3
#----------------------+------------------------------------------------------------+------------+-----------------+
# Server Config | Description | Type | Default |
......@@ -42,14 +42,19 @@ server_config:
# | Keep 'dialect://:@:/', 'dialect' can be either 'sqlite' or | | |
# | 'mysql', replace other texts with real values. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# preload_table | A comma-separated list of table names that need to be pre- | StringList | |
# | loaded when Milvus server starts up. | | |
# preload_collection | A comma-separated list of collection names that need to | StringList | |
# | be pre-loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables (single-quote or | | |
# | double-quote required). | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |
# | 0 means disable the regular flush. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
db_config:
backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus
preload_table:
preload_collection:
auto_flush_interval: 1
#----------------------+------------------------------------------------------------+------------+-----------------+
# Storage Config | Description | Type | Default |
......
......@@ -4,7 +4,7 @@ CONNECT_ERROR_CODE = 10001
CONNECTTION_NOT_FOUND_CODE = 10002
DB_ERROR_CODE = 10003
TABLE_NOT_FOUND_CODE = 20001
COLLECTION_NOT_FOUND_CODE = 20001
INVALID_ARGUMENT_CODE = 20002
INVALID_DATE_RANGE_CODE = 20003
INVALID_TOPK_CODE = 20004
......
......@@ -30,23 +30,23 @@ def resp_handler(err, error_code):
ids=[],
distances=[])
if resp_class == milvus_pb2.TableRowCount:
return resp_class(status=status, table_row_count=-1)
if resp_class == milvus_pb2.CollectionRowCount:
return resp_class(status=status, collection_row_count=-1)
if resp_class == milvus_pb2.TableName:
return resp_class(status=status, table_name=[])
if resp_class == milvus_pb2.CollectionName:
return resp_class(status=status, collection_name=[])
if resp_class == milvus_pb2.StringReply:
return resp_class(status=status, string_reply='')
if resp_class == milvus_pb2.TableSchema:
return milvus_pb2.TableSchema(
if resp_class == milvus_pb2.CollectionSchema:
return milvus_pb2.CollectionSchema(
status=status
)
if resp_class == milvus_pb2.IndexParam:
return milvus_pb2.IndexParam(
table_name=milvus_pb2.TableName(
collection_name=milvus_pb2.CollectionName(
status=status
)
)
......@@ -55,10 +55,10 @@ def resp_handler(err, error_code):
return status
@server.errorhandler(exceptions.TableNotFoundError)
def TableNotFoundErrorHandler(err):
@server.errorhandler(exceptions.CollectionNotFoundError)
def CollectionNotFoundErrorHandler(err):
logger.error(err)
return resp_handler(err, status_pb2.TABLE_NOT_EXISTS)
return resp_handler(err, status_pb2.COLLECTION_NOT_EXISTS)
@server.errorhandler(exceptions.InvalidTopKError)
......
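As an illustration of the handler pattern in the hunk above, here is a standalone sketch of how a `CollectionNotFoundError` could be turned into a default reply that carries only an error status. The classes and the numeric error code below are simplified stand-ins, not the real `milvus_pb2` / `status_pb2` messages or the mishards `@server.errorhandler` machinery.
```python
# Simplified stand-ins for the generated protobuf messages; the real code
# uses milvus_pb2 / status_pb2 and the @server.errorhandler decorator.
from dataclasses import dataclass

COLLECTION_NOT_EXISTS = 4  # hypothetical numeric value, for illustration only

@dataclass
class Status:
    error_code: int
    reason: str

@dataclass
class CollectionRowCount:
    status: Status
    collection_row_count: int = -1

class CollectionNotFoundError(Exception):
    pass

def resp_handler(err, error_code, resp_class):
    # Build a default reply of the requested type with an error status attached.
    status = Status(error_code=error_code, reason=str(err))
    if resp_class is CollectionRowCount:
        return resp_class(status=status, collection_row_count=-1)
    return status

# Usage: what the error handler would return for a missing collection.
reply = resp_handler(CollectionNotFoundError("demo_collection"),
                     COLLECTION_NOT_EXISTS, CollectionRowCount)
print(reply.collection_row_count)  # -1
```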
......@@ -22,8 +22,8 @@ class DBError(BaseException):
code = codes.DB_ERROR_CODE
class TableNotFoundError(BaseException):
code = codes.TABLE_NOT_FOUND_CODE
class CollectionNotFoundError(BaseException):
code = codes.COLLECTION_NOT_FOUND_CODE
class InvalidTopKError(BaseException):
......
......@@ -20,25 +20,25 @@ class GrpcArgsParser(object):
@classmethod
@error_status
def parse_proto_TableSchema(cls, param):
_table_schema = {
'collection_name': param.table_name,
def parse_proto_CollectionSchema(cls, param):
_collection_schema = {
'collection_name': param.collection_name,
'dimension': param.dimension,
'index_file_size': param.index_file_size,
'metric_type': param.metric_type
}
return param.status, _table_schema
return param.status, _collection_schema
@classmethod
@error_status
def parse_proto_TableName(cls, param):
return param.table_name
def parse_proto_CollectionName(cls, param):
return param.collection_name
@classmethod
@error_status
def parse_proto_FlushParam(cls, param):
return list(param.table_name_array)
return list(param.collection_name_array)
@classmethod
@error_status
......@@ -53,7 +53,7 @@ class GrpcArgsParser(object):
@classmethod
@error_status
def parse_proto_IndexParam(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_index_type = param.index_type
_index_param = {}
......@@ -61,7 +61,7 @@ class GrpcArgsParser(object):
if params.key == 'params':
_index_param = ujson.loads(str(params.value))
return _table_name, _index_type, _index_param
return _collection_name, _index_type, _index_param
@classmethod
@error_status
......@@ -77,15 +77,15 @@ class GrpcArgsParser(object):
@classmethod
def parse_proto_PartitionParam(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_tag = param.tag
return _table_name, _tag
return _collection_name, _tag
@classmethod
@error_status
def parse_proto_SearchParam(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_topk = param.topk
if len(param.extra_params) == 0:
......@@ -102,28 +102,28 @@ class GrpcArgsParser(object):
else:
raise Exception("Search argument parse error: record array is empty")
return _table_name, _query_record_array, _topk, _params
return _collection_name, _query_record_array, _topk, _params
@classmethod
@error_status
def parse_proto_DeleteByIDParam(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_id_array = list(param.id_array)
return _table_name, _id_array
return _collection_name, _id_array
@classmethod
@error_status
def parse_proto_VectorIdentity(cls, param):
_table_name = param.table_name
_collection_name = param.collection_name
_id = param.id
return _table_name, _id
return _collection_name, _id
@classmethod
@error_status
def parse_proto_GetVectorIDsParam(cls, param):
_table__name = param.table_name
_collection__name = param.collection_name
_segment_name = param.segment_name
return _table__name, _segment_name
return _collection__name, _segment_name
# class GrpcArgsWrapper(object):
# @classmethod
# def proto_TableName(cls):
# def proto_CollectionName(cls):
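To make the renamed parser entry points above concrete, here is a small self-contained sketch of the same field mapping, using a namedtuple as a stand-in for the gRPC `CollectionSchema` message; the real code reads the generated protobuf objects and wraps the result with `@error_status`.
```python
# Stand-in for the generated CollectionSchema message; illustration only.
from collections import namedtuple

FakeCollectionSchema = namedtuple(
    "FakeCollectionSchema",
    ["status", "collection_name", "dimension", "index_file_size", "metric_type"])

def parse_collection_schema(param):
    # Mirrors parse_proto_CollectionSchema: proto fields -> plain dict.
    schema = {
        "collection_name": param.collection_name,
        "dimension": param.dimension,
        "index_file_size": param.index_file_size,
        "metric_type": param.metric_type,
    }
    return param.status, schema

status, schema = parse_collection_schema(
    FakeCollectionSchema(status="OK", collection_name="demo",
                         dimension=128, index_file_size=1024, metric_type=1))
print(status, schema["collection_name"])  # OK demo
```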
......@@ -6,7 +6,7 @@ class RouterMixin:
self.writable_topo = writable_topo
self.readonly_topo = readonly_topo
def routing(self, table_name, metadata=None, **kwargs):
def routing(self, collection_name, metadata=None, **kwargs):
raise NotImplemented()
def connection(self, metadata=None):
......
......@@ -16,53 +16,53 @@ class Factory(RouterMixin):
super(Factory, self).__init__(writable_topo=writable_topo,
readonly_topo=readonly_topo)
def routing(self, table_name, partition_tags=None, metadata=None, **kwargs):
def routing(self, collection_name, partition_tags=None, metadata=None, **kwargs):
range_array = kwargs.pop('range_array', None)
return self._route(table_name, range_array, partition_tags, metadata, **kwargs)
return self._route(collection_name, range_array, partition_tags, metadata, **kwargs)
def _route(self, table_name, range_array, partition_tags=None, metadata=None, **kwargs):
def _route(self, collection_name, range_array, partition_tags=None, metadata=None, **kwargs):
# PXU TODO: Implement Thread-local Context
# PXU TODO: Session life mgt
if not partition_tags:
cond = and_(
or_(Tables.table_id == table_name, Tables.owner_table == table_name),
or_(Tables.table_id == collection_name, Tables.owner_table == collection_name),
Tables.state != Tables.TO_DELETE)
else:
# TODO: collection default partition is '_default'
cond = and_(Tables.state != Tables.TO_DELETE,
Tables.owner_table == table_name,
Tables.owner_table == collection_name,
Tables.partition_tag.in_(partition_tags))
if '_default' in partition_tags:
default_par_cond = and_(Tables.table_id == table_name, Tables.state != Tables.TO_DELETE)
default_par_cond = and_(Tables.table_id == collection_name, Tables.state != Tables.TO_DELETE)
cond = or_(cond, default_par_cond)
try:
tables = db.Session.query(Tables).filter(cond).all()
collections = db.Session.query(Tables).filter(cond).all()
except sqlalchemy_exc.SQLAlchemyError as e:
raise exceptions.DBError(message=str(e), metadata=metadata)
if not tables:
logger.error("Cannot find table {} / {} in metadata".format(table_name, partition_tags))
raise exceptions.TableNotFoundError('{}:{}'.format(table_name, partition_tags), metadata=metadata)
if not collections:
logger.error("Cannot find collection {} / {} in metadata".format(collection_name, partition_tags))
raise exceptions.CollectionNotFoundError('{}:{}'.format(collection_name, partition_tags), metadata=metadata)
table_list = [str(table.table_id) for table in tables]
collection_list = [str(collection.table_id) for collection in collections]
file_type_cond = or_(
TableFiles.file_type == TableFiles.FILE_TYPE_RAW,
TableFiles.file_type == TableFiles.FILE_TYPE_TO_INDEX,
TableFiles.file_type == TableFiles.FILE_TYPE_INDEX,
)
file_cond = and_(file_type_cond, TableFiles.table_id.in_(table_list))
file_cond = and_(file_type_cond, TableFiles.table_id.in_(collection_list))
try:
files = db.Session.query(TableFiles).filter(file_cond).all()
except sqlalchemy_exc.SQLAlchemyError as e:
raise exceptions.DBError(message=str(e), metadata=metadata)
if not files:
logger.warning("Table file is empty. {}".format(table_list))
# logger.error("Cannot find table file id {} / {} in metadata".format(table_name, partition_tags))
# raise exceptions.TableNotFoundError('Table file id not found. {}:{}'.format(table_name, partition_tags),
# metadata=metadata)
logger.warning("Collection file is empty. {}".format(collection_list))
# logger.error("Cannot find collection file id {} / {} in metadata".format(collection_name, partition_tags))
# raise exceptions.CollectionNotFoundError('Collection file id not found. {}:{}'.format(collection_name, partition_tags),
# metadata=metadata)
db.remove_session()
......
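The `_route` logic above builds its SQLAlchemy filter from the collection name, its `owner_table`, and the partition tags. Below is a rough in-memory sketch of the same selection rules applied to plain dicts, meant only to show which metadata rows get routed to; the real implementation queries the `Tables` model and then collects the matching `TableFiles`.
```python
# Rough in-memory illustration of the _route selection rules; the real code
# filters the Tables metadata via SQLAlchemy instead of scanning dicts.
TO_DELETE, NORMAL = "TO_DELETE", "NORMAL"

rows = [
    {"table_id": "demo",    "owner_table": "",     "partition_tag": "",   "state": NORMAL},
    {"table_id": "demo_p0", "owner_table": "demo", "partition_tag": "p0", "state": NORMAL},
    {"table_id": "demo_p1", "owner_table": "demo", "partition_tag": "p1", "state": TO_DELETE},
]

def route(collection_name, partition_tags=None):
    selected = []
    for row in rows:
        if row["state"] == TO_DELETE:
            continue
        if not partition_tags:
            # No tags: the collection itself plus all of its partitions.
            if collection_name in (row["table_id"], row["owner_table"]):
                selected.append(row["table_id"])
        else:
            # Tags given: matching partitions; '_default' also selects the base collection.
            if row["owner_table"] == collection_name and row["partition_tag"] in partition_tags:
                selected.append(row["table_id"])
            elif "_default" in partition_tags and row["table_id"] == collection_name:
                selected.append(row["table_id"])
    return selected

print(route("demo"))                      # ['demo', 'demo_p0']
print(route("demo", ["p0", "_default"]))  # ['demo', 'demo_p0']
```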
......@@ -12,7 +12,7 @@ else:
env.read_env()
SERVER_VERSIONS = ['0.7.0', '0.7.1']
SERVER_VERSIONS = ['0.8.0']
DEBUG = env.bool('DEBUG', False)
MAX_RETRY = env.int('MAX_RETRY', 3)
......
......@@ -7,7 +7,7 @@ import faker
import inspect
from milvus import Milvus
from milvus.client.types import Status, IndexType, MetricType
from milvus.client.abstract import IndexParam, TableSchema
from milvus.client.abstract import IndexParam, CollectionSchema
from milvus.grpc_gen import status_pb2, milvus_pb2
from mishards import db, create_app, settings
from mishards.service_handler import ServiceHandler
......@@ -43,55 +43,55 @@ class TestServer:
assert not status.OK()
def test_drop_index(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
ServiceHandler._drop_index = mock.MagicMock(return_value=OK)
status = self.client.drop_index(table_name)
status = self.client.drop_index(collection_name)
assert status.OK()
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status = self.client.drop_index(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status = self.client.drop_index(collection_name)
assert not status.OK()
def test_describe_index(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
index_type = IndexType.FLAT
nlist = 1
index_param = IndexParam(table_name=table_name,
params = {'nlist': 1}
index_param = IndexParam(collection_name=collection_name,
index_type=index_type,
nlist=nlist)
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
params=params)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._describe_index = mock.MagicMock(
return_value=(OK, index_param))
status, ret = self.client.describe_index(table_name)
status, ret = self.client.describe_index(collection_name)
assert status.OK()
assert ret._table_name == index_param._table_name
assert ret._collection_name == index_param._collection_name
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status, _ = self.client.describe_index(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status, _ = self.client.describe_index(collection_name)
assert not status.OK()
def test_preload(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
ServiceHandler._preload_table = mock.MagicMock(return_value=OK)
status = self.client.preload_table(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._preload_collection = mock.MagicMock(return_value=OK)
status = self.client.preload_collection(collection_name)
assert status.OK()
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status = self.client.preload_table(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status = self.client.preload_collection(collection_name)
assert not status.OK()
@pytest.mark.skip
def test_delete_by_range(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
unpacked = table_name, datetime.datetime.today(
unpacked = collection_name, datetime.datetime.today(
), datetime.datetime.today()
Parser.parse_proto_DeleteByRangeParam = mock.MagicMock(
......@@ -107,122 +107,122 @@ class TestServer:
*unpacked)
assert not status.OK()
def test_count_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_count_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
count = random.randint(100, 200)
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
ServiceHandler._count_table = mock.MagicMock(return_value=(OK, count))
status, ret = self.client.get_table_row_count(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._count_collection = mock.MagicMock(return_value=(OK, count))
status, ret = self.client.count_collection(collection_name)
assert status.OK()
assert ret == count
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status, _ = self.client.get_table_row_count(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status, _ = self.client.count_collection(collection_name)
assert not status.OK()
def test_show_tables(self, started_app):
tables = ['t1', 't2']
ServiceHandler._show_tables = mock.MagicMock(return_value=(OK, tables))
status, ret = self.client.show_tables()
def test_show_collections(self, started_app):
collections = ['t1', 't2']
ServiceHandler._show_collections = mock.MagicMock(return_value=(OK, collections))
status, ret = self.client.show_collections()
assert status.OK()
assert ret == tables
assert ret == collections
def test_describe_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_describe_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
dimension = 128
nlist = 1
table_schema = TableSchema(table_name=table_name,
collection_schema = CollectionSchema(collection_name=collection_name,
index_file_size=100,
metric_type=MetricType.L2,
dimension=dimension)
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_schema.table_name))
ServiceHandler._describe_table = mock.MagicMock(
return_value=(OK, table_schema))
status, _ = self.client.describe_table(table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_schema.collection_name))
ServiceHandler._describe_collection = mock.MagicMock(
return_value=(OK, collection_schema))
status, _ = self.client.describe_collection(collection_name)
assert status.OK()
ServiceHandler._describe_table = mock.MagicMock(
return_value=(BAD, table_schema))
status, _ = self.client.describe_table(table_name)
ServiceHandler._describe_collection = mock.MagicMock(
return_value=(BAD, collection_schema))
status, _ = self.client.describe_collection(collection_name)
assert not status.OK()
Parser.parse_proto_TableName = mock.MagicMock(return_value=(BAD,
Parser.parse_proto_CollectionName = mock.MagicMock(return_value=(BAD,
'cmd'))
status, ret = self.client.describe_table(table_name)
status, ret = self.client.describe_collection(collection_name)
assert not status.OK()
def test_insert(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
vectors = [[random.random() for _ in range(16)] for _ in range(10)]
ids = [random.randint(1000000, 20000000) for _ in range(10)]
ServiceHandler._add_vectors = mock.MagicMock(return_value=(OK, ids))
status, ret = self.client.add_vectors(
table_name=table_name, records=vectors)
collection_name=collection_name, records=vectors)
assert status.OK()
assert ids == ret
def test_create_index(self, started_app):
table_name = inspect.currentframe().f_code.co_name
unpacks = table_name, None
collection_name = inspect.currentframe().f_code.co_name
unpacks = collection_name, None
Parser.parse_proto_IndexParam = mock.MagicMock(return_value=(OK,
unpacks))
ServiceHandler._create_index = mock.MagicMock(return_value=OK)
status = self.client.create_index(table_name=table_name)
status = self.client.create_index(collection_name=collection_name)
assert status.OK()
Parser.parse_proto_IndexParam = mock.MagicMock(return_value=(BAD,
None))
status = self.client.create_index(table_name=table_name)
status = self.client.create_index(collection_name=collection_name)
assert not status.OK()
def test_drop_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_drop_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
ServiceHandler._delete_table = mock.MagicMock(return_value=OK)
status = self.client.delete_table(table_name=table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._drop_collection = mock.MagicMock(return_value=OK)
status = self.client.drop_collection(collection_name=collection_name)
assert status.OK()
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status = self.client.delete_table(table_name=table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status = self.client.drop_collection(collection_name=collection_name)
assert not status.OK()
def test_has_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_has_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(OK, table_name))
ServiceHandler._has_table = mock.MagicMock(return_value=(OK, True))
has = self.client.has_table(table_name=table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(OK, collection_name))
ServiceHandler._has_collection = mock.MagicMock(return_value=(OK, True))
has = self.client.has_collection(collection_name=collection_name)
assert has
Parser.parse_proto_TableName = mock.MagicMock(
return_value=(BAD, table_name))
status, has = self.client.has_table(table_name=table_name)
Parser.parse_proto_CollectionName = mock.MagicMock(
return_value=(BAD, collection_name))
status, has = self.client.has_collection(collection_name=collection_name)
assert not status.OK()
assert not has
def test_create_table(self, started_app):
table_name = inspect.currentframe().f_code.co_name
def test_create_collection(self, started_app):
collection_name = inspect.currentframe().f_code.co_name
dimension = 128
table_schema = dict(table_name=table_name,
collection_schema = dict(collection_name=collection_name,
index_file_size=100,
metric_type=MetricType.L2,
dimension=dimension)
ServiceHandler._create_table = mock.MagicMock(return_value=OK)
status = self.client.create_table(table_schema)
ServiceHandler._create_collection = mock.MagicMock(return_value=OK)
status = self.client.create_collection(collection_schema)
assert status.OK()
Parser.parse_proto_TableSchema = mock.MagicMock(return_value=(BAD,
Parser.parse_proto_CollectionSchema = mock.MagicMock(return_value=(BAD,
None))
status = self.client.create_table(table_schema)
status = self.client.create_collection(collection_schema)
assert not status.OK()
def random_data(self, n, dimension):
......@@ -230,18 +230,18 @@ class TestServer:
@pytest.mark.skip
def test_search(self, started_app):
table_name = inspect.currentframe().f_code.co_name
collection_name = inspect.currentframe().f_code.co_name
to_index_cnt = random.randint(10, 20)
table = TablesFactory(table_id=table_name, state=Tables.NORMAL)
collection = TablesFactory(collection_id=collection_name, state=Tables.NORMAL)
to_index_files = TableFilesFactory.create_batch(
to_index_cnt, table=table, file_type=TableFiles.FILE_TYPE_TO_INDEX)
to_index_cnt, collection=collection, file_type=TableFiles.FILE_TYPE_TO_INDEX)
topk = random.randint(5, 10)
nq = random.randint(5, 10)
param = {
'table_name': table_name,
'query_records': self.random_data(nq, table.dimension),
'collection_name': collection_name,
'query_records': self.random_data(nq, collection.dimension),
'top_k': topk,
'nprobe': 2049
'params': {'nprobe': 2049}
}
result = [
......@@ -255,23 +255,23 @@ class TestServer:
error_code=status_pb2.SUCCESS, reason="Success"),
topk_query_result=result)
table_schema = TableSchema(table_name=table_name,
index_file_size=table.index_file_size,
metric_type=table.metric_type,
dimension=table.dimension)
collection_schema = CollectionSchema(collection_name=collection_name,
index_file_size=collection.index_file_size,
metric_type=collection.metric_type,
dimension=collection.dimension)
status, _ = self.client.search_vectors(**param)
assert status.code == Status.ILLEGAL_ARGUMENT
param['nprobe'] = 2048
param['params']['nprobe'] = 2048
RouterMixin.connection = mock.MagicMock(return_value=Milvus())
RouterMixin.query_conn.conn = mock.MagicMock(return_value=Milvus())
Milvus.describe_table = mock.MagicMock(return_value=(BAD,
table_schema))
Milvus.describe_collection = mock.MagicMock(return_value=(BAD,
collection_schema))
status, ret = self.client.search_vectors(**param)
assert status.code == Status.TABLE_NOT_EXISTS
assert status.code == Status.COLLECTION_NOT_EXISTS
Milvus.describe_table = mock.MagicMock(return_value=(OK, table_schema))
Milvus.describe_collection = mock.MagicMock(return_value=(OK, collection_schema))
Milvus.search_vectors_in_files = mock.MagicMock(
return_value=mock_results)
......
......@@ -14,7 +14,7 @@ py==1.8.0
pyasn1==0.4.7
pyasn1-modules==0.2.6
pylint==2.3.1
pymilvus==0.2.9
pymilvus==0.2.10
#pymilvus-test==0.3.3
pyparsing==2.4.0
pytest==4.6.3
......