diff --git a/CHANGELOG.md b/CHANGELOG.md index b9ca1bc7fc51af57ca21ab8de6f7b48044b12f15..8cb65f086d1a6b27ced83b968ee685c884433add 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ Please mark all change in change log and use the issue from GitHub ## Task -# Milvus 0.8.0 (TBD) +# Milvus 0.8.0 (2020-04-15) ## Bug - \#1276 SQLite throw exception after create 50000+ partitions in a table @@ -22,6 +22,10 @@ Please mark all change in change log and use the issue from GitHub - \#1832 Fix crash in tracing module - \#1873 Fix index file serialize to incorrect path - \#1881 Fix bad alloc when index files lost +- \#1883 Fix inserted vectors becomes all zero when index_file_size >= 2GB +- \#1901 Search failed with flat index +- \#1903 Fix invalid annoy result +- \#1910 C++ SDK GetIDsInSegment could not work for large dataset ## Feature - \#261 Integrate ANNOY into Milvus @@ -41,10 +45,11 @@ Please mark all change in change log and use the issue from GitHub - \#1886 Refactor log on search and insert request - \#1897 Heap pop and push can be realized by heap_swap_top - \#1921 Use TimeRecorder instead of chrono +- \#1928 Fix too many data and uid copies when loading files +- \#1930 Upgrade mishards to v0.8.0 ## Task - # Milvus 0.7.1 (2020-03-29) ## Bug @@ -708,14 +713,14 @@ Please mark all change in change log and use the issue from GitHub - MS-16 Implement metrics without prometheus - MS-21 Implement SDK interface part 2 - MS-26 CMake. Add thirdparty packages -- MS-31 cmake: add prometheus +- MS-31 CMake: add prometheus - MS-33 cmake: add -j4 to make third party packages build faster - MS-27 Support gpu config and disable license build config in cmake - MS-47 Add query vps metrics - MS-37 Add query, cache usage, disk write speed and file data size metrics - MS-30 Use faiss v1.5.2 - MS-54 cmake: Change Thrift third party URL to github.com -- MS-69 prometheus: add all proposed metrics +- MS-69 Prometheus: add all proposed metrics ## Task diff --git a/ci/jenkins/step/singleDevNightlyTest.groovy b/ci/jenkins/step/singleDevNightlyTest.groovy index ae388c886583c88e633fe8d936ce692658420c8c..4ab2985644eb547b9e15ef5cdb64c107226e23aa 100644 --- a/ci/jenkins/step/singleDevNightlyTest.groovy +++ b/ci/jenkins/step/singleDevNightlyTest.groovy @@ -2,7 +2,7 @@ dir ('milvus-helm') { sh 'helm version' sh 'helm repo add stable https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts' sh 'helm repo update' - checkout([$class: 'GitSCM', branches: [[name: "0.8.0"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/0.8.0:refs/remotes/origin/0.8.0"]]]) + checkout([$class: 'GitSCM', branches: [[name: "master"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/master:refs/remotes/origin/master"]]]) sh "helm install --wait --timeout 600s --set image.repository=registry.zilliz.com/milvus/engine --set image.tag=${DOCKER_VERSION} --set image.pullPolicy=Always --set service.type=ClusterIP -f ci/db_backend/mysql_${BINARY_VERSION}_values.yaml -f ci/filebeat/values.yaml --namespace milvus ${env.HELM_RELEASE_NAME} ." } @@ -16,7 +16,7 @@ load "ci/jenkins/step/cleanupSingleDev.groovy" if (!fileExists('milvus-helm')) { dir ("milvus-helm") { - checkout([$class: 'GitSCM', branches: [[name: "0.8.0"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/0.8.0:refs/remotes/origin/0.8.0"]]]) + checkout([$class: 'GitSCM', branches: [[name: "master"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/master:refs/remotes/origin/master"]]]) } } dir ("milvus-helm") { diff --git a/ci/jenkins/step/singleDevTest.groovy b/ci/jenkins/step/singleDevTest.groovy index 36d4144a8a2b39645ecd45b3695f38a2e1a46800..7596e9cf6de88377d2a33de6ccc2a7b500c7a164 100644 --- a/ci/jenkins/step/singleDevTest.groovy +++ b/ci/jenkins/step/singleDevTest.groovy @@ -2,7 +2,7 @@ dir ('milvus-helm') { sh 'helm version' sh 'helm repo add stable https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts' sh 'helm repo update' - checkout([$class: 'GitSCM', branches: [[name: "0.8.0"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/0.8.0:refs/remotes/origin/0.8.0"]]]) + checkout([$class: 'GitSCM', branches: [[name: "master"]], userRemoteConfigs: [[url: "https://github.com/milvus-io/milvus-helm.git", name: 'origin', refspec: "+refs/heads/master:refs/remotes/origin/master"]]]) sh "helm install --wait --timeout 600s --set image.repository=registry.zilliz.com/milvus/engine --set image.tag=${DOCKER_VERSION} --set image.pullPolicy=Always --set service.type=ClusterIP -f ci/db_backend/mysql_${BINARY_VERSION}_values.yaml -f ci/filebeat/values.yaml --namespace milvus ${env.HELM_RELEASE_NAME} ." } diff --git a/core/src/codecs/default/DefaultVectorsFormat.cpp b/core/src/codecs/default/DefaultVectorsFormat.cpp index 6b00d2612581cfcafeda8fb3239e64a1c8c25c48..9e6e50d2f00d70e13c8afb03d77961fb086cbd3d 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.cpp +++ b/core/src/codecs/default/DefaultVectorsFormat.cpp @@ -31,74 +31,44 @@ namespace milvus { namespace codec { void -DefaultVectorsFormat::read_vectors_internal(const std::string& file_path, off_t offset, size_t num, - std::vector& raw_vectors) { - int rv_fd = open(file_path.c_str(), O_RDONLY, 00664); - if (rv_fd == -1) { +DefaultVectorsFormat::read_vectors_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, + off_t offset, size_t num, std::vector& raw_vectors) { + if (!fs_ptr->reader_ptr_->open(file_path.c_str())) { std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); + throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg); } size_t num_bytes; - if (::read(rv_fd, &num_bytes, sizeof(size_t)) == -1) { - std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t)); num = std::min(num, num_bytes - offset); offset += sizeof(size_t); // Beginning of file is num_bytes - int off = lseek(rv_fd, offset, SEEK_SET); - if (off == -1) { - std::string err_msg = "Failed to seek file: " + file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + fs_ptr->reader_ptr_->seekg(offset); raw_vectors.resize(num / sizeof(uint8_t)); - if (::read(rv_fd, raw_vectors.data(), num) == -1) { - std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + fs_ptr->reader_ptr_->read(raw_vectors.data(), num); - if (::close(rv_fd) == -1) { - std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + fs_ptr->reader_ptr_->close(); } void -DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vector& uids) { - int uid_fd = open(file_path.c_str(), O_RDONLY, 00664); - if (uid_fd == -1) { +DefaultVectorsFormat::read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, + std::vector& uids) { + if (!fs_ptr->reader_ptr_->open(file_path.c_str())) { std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno); LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); + throw Exception(SERVER_CANNOT_OPEN_FILE, err_msg); } size_t num_bytes; - if (::read(uid_fd, &num_bytes, sizeof(size_t)) == -1) { - std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t)); uids.resize(num_bytes / sizeof(segment::doc_id_t)); - if (::read(uid_fd, uids.data(), num_bytes) == -1) { - std::string err_msg = "Failed to read from file: " + file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + fs_ptr->reader_ptr_->read(uids.data(), num_bytes); - if (::close(uid_fd) == -1) { - std::string err_msg = "Failed to close file: " + file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + fs_ptr->reader_ptr_->close(); } void @@ -120,15 +90,12 @@ DefaultVectorsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::Vectors for (; it != it_end; ++it) { const auto& path = it->path(); if (path.extension().string() == raw_vector_extension_) { - std::vector vector_list; - read_vectors_internal(path.string(), 0, INT64_MAX, vector_list); - vectors_read->AddData(vector_list); + auto& vector_list = vectors_read->GetMutableData(); + read_vectors_internal(fs_ptr, path.string(), 0, INT64_MAX, vector_list); vectors_read->SetName(path.stem().string()); - } - if (path.extension().string() == user_id_extension_) { - std::vector uids; - read_uids_internal(path.string(), uids); - vectors_read->AddUids(uids); + } else if (path.extension().string() == user_id_extension_) { + auto& uids = vectors_read->GetMutableUids(); + read_uids_internal(fs_ptr, path.string(), uids); } } } @@ -144,54 +111,28 @@ DefaultVectorsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment:: TimeRecorder rc("write vectors"); - int rv_fd = open(rv_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664); - if (rv_fd == -1) { + if (!fs_ptr->writer_ptr_->open(rv_file_path.c_str())) { std::string err_msg = "Failed to open file: " + rv_file_path + ", error: " + std::strerror(errno); LOG_ENGINE_ERROR_ << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } size_t rv_num_bytes = vectors->GetData().size() * sizeof(uint8_t); - if (::write(rv_fd, &rv_num_bytes, sizeof(size_t)) == -1) { - std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - if (::write(rv_fd, vectors->GetData().data(), rv_num_bytes) == -1) { - std::string err_msg = "Failed to write to file: " + rv_file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - if (::close(rv_fd) == -1) { - std::string err_msg = "Failed to close file: " + rv_file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + fs_ptr->writer_ptr_->write(&rv_num_bytes, sizeof(size_t)); + fs_ptr->writer_ptr_->write((void*)vectors->GetData().data(), rv_num_bytes); + fs_ptr->writer_ptr_->close(); rc.RecordSection("write rv done"); - int uid_fd = open(uid_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664); - if (uid_fd == -1) { + if (!fs_ptr->writer_ptr_->open(uid_file_path.c_str())) { std::string err_msg = "Failed to open file: " + uid_file_path + ", error: " + std::strerror(errno); LOG_ENGINE_ERROR_ << err_msg; throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg); } size_t uid_num_bytes = vectors->GetUids().size() * sizeof(segment::doc_id_t); - if (::write(uid_fd, &uid_num_bytes, sizeof(size_t)) == -1) { - std::string err_msg = "Failed to write to file" + rv_file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - if (::write(uid_fd, vectors->GetUids().data(), uid_num_bytes) == -1) { - std::string err_msg = "Failed to write to file" + uid_file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } - if (::close(uid_fd) == -1) { - std::string err_msg = "Failed to close file: " + uid_file_path + ", error: " + std::strerror(errno); - LOG_ENGINE_ERROR_ << err_msg; - throw Exception(SERVER_WRITE_ERROR, err_msg); - } + fs_ptr->writer_ptr_->write(&uid_num_bytes, sizeof(size_t)); + fs_ptr->writer_ptr_->write((void*)vectors->GetUids().data(), uid_num_bytes); + fs_ptr->writer_ptr_->close(); rc.RecordSection("write uids done"); } @@ -215,7 +156,7 @@ DefaultVectorsFormat::read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector for (; it != it_end; ++it) { const auto& path = it->path(); if (path.extension().string() == user_id_extension_) { - read_uids_internal(path.string(), uids); + read_uids_internal(fs_ptr, path.string(), uids); } } } @@ -240,7 +181,7 @@ DefaultVectorsFormat::read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t of for (; it != it_end; ++it) { const auto& path = it->path(); if (path.extension().string() == raw_vector_extension_) { - read_vectors_internal(path.string(), offset, num_bytes, raw_vectors); + read_vectors_internal(fs_ptr, path.string(), offset, num_bytes, raw_vectors); } } } diff --git a/core/src/codecs/default/DefaultVectorsFormat.h b/core/src/codecs/default/DefaultVectorsFormat.h index bfb20f221b7d44ba57cf474a780af0b8b4de3829..ac5fc89a5a25a92d60ea7e9eae245e2af2635003 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.h +++ b/core/src/codecs/default/DefaultVectorsFormat.h @@ -55,10 +55,12 @@ class DefaultVectorsFormat : public VectorsFormat { private: void - read_vectors_internal(const std::string&, off_t, size_t, std::vector&); + read_vectors_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, off_t offset, size_t num, + std::vector& raw_vectors); void - read_uids_internal(const std::string&, std::vector&); + read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, + std::vector& uids); private: std::mutex mutex_; diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 9fe5785aa2b98324619c9676ad5920f6290e39ba..c1f61fc81a0a3d3726b7a674ab78b915543b7d19 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -861,6 +861,8 @@ DBImpl::GetVectorByID(const std::string& collection_id, const IDNumber& vector_i return status; } + OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query); + std::vector partition_array; status = meta_ptr_->ShowPartitions(collection_id, partition_array); for (auto& schema : partition_array) { @@ -871,17 +873,18 @@ DBImpl::GetVectorByID(const std::string& collection_id, const IDNumber& vector_i LOG_ENGINE_ERROR_ << err_msg; return status; } + + OngoingFileChecker::GetInstance().MarkOngoingFiles(files); files_to_query.insert(files_to_query.end(), std::make_move_iterator(files.begin()), std::make_move_iterator(files.end())); } if (files_to_query.empty()) { LOG_ENGINE_DEBUG_ << "No files to get vector by id from"; - return Status::OK(); + return Status(DB_NOT_FOUND, "Collection is empty"); } cache::CpuCacheMgr::GetInstance()->PrintInfo(); - OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query); status = GetVectorByIdHelper(collection_id, vector_id, vector, files_to_query); @@ -965,7 +968,11 @@ DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segmen Status DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id, VectorsData& vector, const meta::SegmentsSchema& files) { - LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files"; + LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id = " << vector_id; + + vector.vector_count_ = 0; + vector.float_data_.clear(); + vector.binary_data_.clear(); for (auto& file : files) { // Load bloom filter @@ -993,6 +1000,7 @@ DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id segment::DeletedDocsPtr deleted_docs_ptr; status = segment_reader.LoadDeletedDocs(deleted_docs_ptr); if (!status.ok()) { + LOG_ENGINE_ERROR_ << status.message(); return status; } auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs(); @@ -1005,6 +1013,7 @@ DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id std::vector raw_vector; status = segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector); if (!status.ok()) { + LOG_ENGINE_ERROR_ << status.message(); return status; } @@ -1025,6 +1034,11 @@ DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id } } + if (vector.binary_data_.empty() && vector.float_data_.empty()) { + std::string msg = "Vector with id " + std::to_string(vector_id) + " not found in collection " + collection_id; + LOG_ENGINE_DEBUG_ << msg; + } + return Status::OK(); } diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index d47fa327ad2baa96eec7ab952d224800a956296c..801cd709d8a8b962a6cc60d2c82080726db522b7 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -375,8 +375,6 @@ ExecutionEngineImpl::Serialize() { Status ExecutionEngineImpl::Load(bool to_cache) { - // TODO(zhiru): refactor - index_ = std::static_pointer_cast(cache::CpuCacheMgr::GetInstance()->GetIndex(location_)); bool already_in_cache = (index_ != nullptr); if (!already_in_cache) { @@ -411,21 +409,19 @@ ExecutionEngineImpl::Load(bool to_cache) { auto& vectors = segment_ptr->vectors_ptr_; auto& deleted_docs = segment_ptr->deleted_docs_ptr_->GetDeletedDocs(); - auto vectors_uids = vectors->GetUids(); + auto& vectors_uids = vectors->GetMutableUids(); + auto count = vectors_uids.size(); index_->SetUids(vectors_uids); LOG_ENGINE_DEBUG_ << "set uids " << index_->GetUids().size() << " for index " << location_; - auto vectors_data = vectors->GetData(); + auto& vectors_data = vectors->GetData(); - faiss::ConcurrentBitsetPtr concurrent_bitset_ptr = - std::make_shared(vectors->GetCount()); + faiss::ConcurrentBitsetPtr concurrent_bitset_ptr = std::make_shared(count); for (auto& offset : deleted_docs) { - if (!concurrent_bitset_ptr->test(offset)) { - concurrent_bitset_ptr->set(offset); - } + concurrent_bitset_ptr->set(offset); } - auto dataset = knowhere::GenDataset(vectors->GetCount(), this->dim_, vectors_data.data()); + auto dataset = knowhere::GenDataset(count, this->dim_, vectors_data.data()); if (index_type_ == EngineType::FAISS_IDMAP) { auto bf_index = std::static_pointer_cast(index_); bf_index->Train(knowhere::DatasetPtr(), conf); diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index d974cc7b7407e3c18b6ccfb2a5b6431c87cfda8b..5c75c8c1232f749012389e2e5dfc7092efad4ecc 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -651,6 +651,9 @@ MySQLMetaImpl::DeleteCollectionFiles(const std::string& collection_id) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + // soft delete collection files mysqlpp::Query statement = connectionPtr->query(); // @@ -726,6 +729,9 @@ MySQLMetaImpl::CreateCollectionFile(SegmentSchema& file_schema) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "INSERT INTO " << META_TABLEFILES << " VALUES(" << id << ", " << mysqlpp::quote @@ -777,6 +783,9 @@ MySQLMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::v return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on" @@ -834,6 +843,9 @@ MySQLMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, " << "row_count, date, created_on" @@ -897,15 +909,14 @@ MySQLMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Col return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } - mysqlpp::Query updateCollectionIndexParamQuery = connectionPtr->query(); - updateCollectionIndexParamQuery << "SELECT id, state, dimension, created_on" - << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote - << collection_id << " AND state <> " - << std::to_string(CollectionSchema::TO_DELETE) << ";"; + mysqlpp::Query statement = connectionPtr->query(); + statement << "SELECT id, state, dimension, created_on" + << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << collection_id + << " AND state <> " << std::to_string(CollectionSchema::TO_DELETE) << ";"; - LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << updateCollectionIndexParamQuery.str(); + LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << statement.str(); - mysqlpp::StoreQueryResult res = updateCollectionIndexParamQuery.store(); + mysqlpp::StoreQueryResult res = statement.store(); if (res.num_rows() == 1) { const mysqlpp::Row& resRow = res[0]; @@ -915,18 +926,16 @@ MySQLMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Col uint16_t dimension = resRow["dimension"]; int64_t created_on = resRow["created_on"]; - updateCollectionIndexParamQuery - << "UPDATE " << META_TABLES << " SET id = " << id << " ,state = " << state - << " ,dimension = " << dimension << " ,created_on = " << created_on - << " ,engine_type = " << index.engine_type_ << " ,index_params = " << mysqlpp::quote - << index.extra_params_.dump() << " ,metric_type = " << index.metric_type_ - << " WHERE table_id = " << mysqlpp::quote << collection_id << ";"; + statement << "UPDATE " << META_TABLES << " SET id = " << id << " ,state = " << state + << " ,dimension = " << dimension << " ,created_on = " << created_on + << " ,engine_type = " << index.engine_type_ << " ,index_params = " << mysqlpp::quote + << index.extra_params_.dump() << " ,metric_type = " << index.metric_type_ + << " WHERE table_id = " << mysqlpp::quote << collection_id << ";"; - LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << updateCollectionIndexParamQuery.str(); + LOG_ENGINE_DEBUG_ << "UpdateCollectionIndex: " << statement.str(); - if (!updateCollectionIndexParamQuery.exec()) { - return HandleException("Failed to update collection index", - updateCollectionIndexParamQuery.error()); + if (!statement.exec()) { + return HandleException("Failed to update collection index", statement.error()); } } else { return Status(DB_NOT_FOUND, "Collection " + collection_id + " not found"); @@ -1019,7 +1028,7 @@ MySQLMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t& } mysqlpp::Query statement = connectionPtr->query(); - statement << "SELECT flush_lsn FROM " << META_TABLES << " WHERE collection_id = " << mysqlpp::quote + statement << "SELECT flush_lsn FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << collection_id << ";"; LOG_ENGINE_DEBUG_ << "GetCollectionFlushLSN: " << statement.str(); @@ -1054,6 +1063,9 @@ MySQLMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); // if the collection has been deleted, just mark the collection file as TO_DELETE @@ -1120,6 +1132,9 @@ MySQLMetaImpl::UpdateCollectionFilesToIndex(const std::string& collection_id) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "UPDATE " << META_TABLEFILES << " SET file_type = " << std::to_string(SegmentSchema::TO_INDEX) @@ -1155,6 +1170,9 @@ MySQLMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); std::map has_collections; @@ -1229,6 +1247,9 @@ MySQLMetaImpl::UpdateCollectionFilesRowCount(SegmentsSchema& files) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); for (auto& file : files) { @@ -1442,7 +1463,7 @@ MySQLMetaImpl::ShowPartitions(const std::string& collection_id, << " WHERE owner_table = " << mysqlpp::quote << collection_id << " AND state <> " << std::to_string(CollectionSchema::TO_DELETE) << ";"; - LOG_ENGINE_DEBUG_ << "AllCollections: " << statement.str(); + LOG_ENGINE_DEBUG_ << "ShowPartitions: " << statement.str(); res = statement.store(); } // Scoped Connection @@ -1498,7 +1519,7 @@ MySQLMetaImpl::GetPartitionName(const std::string& collection_id, const std::str << collection_id << " AND partition_tag = " << mysqlpp::quote << valid_tag << " AND state <> " << std::to_string(CollectionSchema::TO_DELETE) << ";"; - LOG_ENGINE_DEBUG_ << "AllCollections: " << statement.str(); + LOG_ENGINE_DEBUG_ << "GetPartitionName: " << statement.str(); res = statement.store(); } // Scoped Connection @@ -1533,6 +1554,9 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& f return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date" << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id; @@ -1615,6 +1639,9 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& fi return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date, " "engine_type, created_on" @@ -1684,6 +1711,9 @@ MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, " "row_count, date, created_on" @@ -1773,16 +1803,19 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vectorquery(); + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + + mysqlpp::Query statement = connectionPtr->query(); // since collection_id is a unique column we just need to check whether it exists or not - hasNonIndexFilesQuery + statement << "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on" << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id << " AND file_type in (" << types << ");"; - LOG_ENGINE_DEBUG_ << "FilesByType: " << hasNonIndexFilesQuery.str(); + LOG_ENGINE_DEBUG_ << "FilesByType: " << statement.str(); - res = hasNonIndexFilesQuery.store(); + res = statement.store(); } // Scoped Connection CollectionSchema collection_schema; @@ -1906,6 +1939,9 @@ MySQLMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date" << " FROM " << META_TABLEFILES; @@ -2054,6 +2090,9 @@ MySQLMetaImpl::Size(uint64_t& result) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT IFNULL(SUM(file_size),0) AS sum" << " FROM " << META_TABLEFILES << " WHERE file_type <> " @@ -2143,14 +2182,21 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) } mysqlpp::Query statement = connectionPtr->query(); - statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, date" - << " FROM " << META_TABLEFILES << " WHERE file_type IN (" - << std::to_string(SegmentSchema::TO_DELETE) << "," << std::to_string(SegmentSchema::BACKUP) << ")" - << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; + mysqlpp::StoreQueryResult res; + { + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); - LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str(); + statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, date" + << " FROM " << META_TABLEFILES << " WHERE file_type IN (" + << std::to_string(SegmentSchema::TO_DELETE) << "," << std::to_string(SegmentSchema::BACKUP) + << ")" + << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; - mysqlpp::StoreQueryResult res = statement.store(); + LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str(); + + res = statement.store(); + } SegmentSchema collection_file; std::vector delete_ids; @@ -2385,6 +2431,9 @@ MySQLMetaImpl::Count(const std::string& collection_id, uint64_t& result) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + mysqlpp::Query statement = connectionPtr->query(); statement << "SELECT row_count" << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index f449ebf2a6cb462e3fdaa4f2aaae262138bea869..c14c215af2df55da1ac6c02f8fd2de46c2cf12ea 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -162,6 +162,7 @@ class MySQLMetaImpl : public Meta { std::shared_ptr mysql_connection_pool_; bool safe_grab_ = false; + std::mutex meta_mutex_; std::mutex genid_mutex_; // std::mutex connectionMutex_; }; // DBMetaImpl diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 708b542e25cc97ab2ec28f469937f68b734c1acf..fbe958d617f37b5c365a63f17c858c5ce9afa518 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -253,6 +253,9 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not try { fiu_do_on("SqliteMetaImpl.HasCollection.throw_exception", throw std::exception()); server::MetricCollector metric; + + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); auto collections = ConnectorPtr->select( columns(&CollectionSchema::id_), where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); @@ -273,6 +276,9 @@ SqliteMetaImpl::AllCollections(std::vector& collection_schema_ try { fiu_do_on("SqliteMetaImpl.AllCollections.throw_exception", throw std::exception()); server::MetricCollector metric; + + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); auto selected = ConnectorPtr->select( columns(&CollectionSchema::id_, &CollectionSchema::collection_id_, &CollectionSchema::dimension_, &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_, @@ -403,12 +409,7 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std:: fiu_do_on("SqliteMetaImpl.GetCollectionFiles.throw_exception", throw std::exception()); collection_files.clear(); - auto files = ConnectorPtr->select( - columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, - &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_, - &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_), - where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and - c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE)); + CollectionSchema collection_schema; collection_schema.collection_id_ = collection_id; auto status = DescribeCollection(collection_schema); @@ -416,8 +417,20 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std:: return status; } + auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, + &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_, + &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_); + decltype(ConnectorPtr->select(select_columns)) selected; + { + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + selected = ConnectorPtr->select(select_columns, + where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and + c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE)); + } + Status result; - for (auto& file : files) { + for (auto& file : selected) { SegmentSchema file_schema; file_schema.collection_id_ = collection_id; file_schema.id_ = std::get<0>(file); @@ -451,23 +464,29 @@ SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, milvus::engine::meta::SegmentsSchema& collection_files) { try { collection_files.clear(); - auto files = ConnectorPtr->select( - columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_), - where(c(&SegmentSchema::segment_id_) == segment_id and - c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE)); - if (!files.empty()) { + auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_); + decltype(ConnectorPtr->select(select_columns)) selected; + { + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + selected = ConnectorPtr->select(select_columns, + where(c(&SegmentSchema::segment_id_) == segment_id and + c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE)); + } + + if (!selected.empty()) { CollectionSchema collection_schema; - collection_schema.collection_id_ = std::get<1>(files[0]); + collection_schema.collection_id_ = std::get<1>(selected[0]); auto status = DescribeCollection(collection_schema); if (!status.ok()) { return status; } - for (auto& file : files) { + for (auto& file : selected) { SegmentSchema file_schema; file_schema.collection_id_ = collection_schema.collection_id_; file_schema.id_ = std::get<0>(file); @@ -502,6 +521,9 @@ SqliteMetaImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t f server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.UpdateCollectionFlag.throw_exception", throw std::exception()); + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + // set all backup file to raw ConnectorPtr->update_all(set(c(&CollectionSchema::flag_) = flag), where(c(&CollectionSchema::collection_id_) == collection_id)); LOG_ENGINE_DEBUG_ << "Successfully update collection flag, collection id = " << collection_id; @@ -518,6 +540,9 @@ SqliteMetaImpl::UpdateCollectionFlushLSN(const std::string& collection_id, uint6 try { server::MetricCollector metric; + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + ConnectorPtr->update_all(set(c(&CollectionSchema::flush_lsn_) = flush_lsn), where(c(&CollectionSchema::collection_id_) == collection_id)); LOG_ENGINE_DEBUG_ << "Successfully update collection flush_lsn, collection id = " << collection_id << " flush_lsn = " << flush_lsn;; @@ -534,6 +559,9 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t try { server::MetricCollector metric; + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto selected = ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_id)); @@ -920,6 +948,14 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception()); + CollectionSchema collection_schema; + collection_schema.collection_id_ = collection_id; + auto status = DescribeCollection(collection_schema); + if (!status.ok()) { + return status; + } + + // perform query auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, @@ -930,18 +966,13 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& std::vector file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX, (int)SegmentSchema::INDEX}; auto match_type = in(&SegmentSchema::file_type_, file_types); - - CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_id; - auto status = DescribeCollection(collection_schema); - if (!status.ok()) { - return status; - } - - // perform query decltype(ConnectorPtr->select(select_columns)) selected; - auto filter = where(match_collectionid and match_type); - selected = ConnectorPtr->select(select_columns, filter); + { + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto filter = where(match_collectionid and match_type); + selected = ConnectorPtr->select(select_columns, filter); + } Status ret; for (auto& file : selected) { @@ -998,13 +1029,18 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& f } // get files to merge - auto selected = ConnectorPtr->select( - columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::created_on_), - where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and - c(&SegmentSchema::collection_id_) == collection_id), - order_by(&SegmentSchema::file_size_).desc()); + auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::created_on_); + decltype(ConnectorPtr->select(select_columns)) selected; + { + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + selected = ConnectorPtr->select(select_columns, + where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and + c(&SegmentSchema::collection_id_) == collection_id), + order_by(&SegmentSchema::file_size_).desc()); + } Status result; int64_t to_merge_files = 0; @@ -1055,12 +1091,17 @@ SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) { server::MetricCollector metric; - auto selected = ConnectorPtr->select( - columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_), - where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX)); + auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_); + decltype(ConnectorPtr->select(select_columns)) selected; + { + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + selected = ConnectorPtr->select(select_columns, + where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX)); + } std::map groups; SegmentSchema collection_file; @@ -1118,22 +1159,33 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector< Status ret = Status::OK(); - CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_id; - auto status = DescribeCollection(collection_schema); - if (!status.ok()) { - return status; - } - try { fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception()); - files.clear(); - auto selected = ConnectorPtr->select( - columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, - &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_, - &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_), - where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id)); + + CollectionSchema collection_schema; + collection_schema.collection_id_ = collection_id; + auto status = DescribeCollection(collection_schema); + if (!status.ok()) { + return status; + } + + // get files by type + auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, + &SegmentSchema::file_type_, + &SegmentSchema::file_size_, + &SegmentSchema::row_count_, + &SegmentSchema::date_, + &SegmentSchema::engine_type_, + &SegmentSchema::created_on_); + decltype(ConnectorPtr->select(select_columns)) selected; + { + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + selected = ConnectorPtr->select(select_columns, + where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id)); + } if (selected.size() >= 1) { int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0; @@ -1232,7 +1284,6 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_); - std::vector file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX, (int)SegmentSchema::INDEX}; auto match_type = in(&SegmentSchema::file_type_, file_types); @@ -1241,7 +1292,11 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) decltype(ConnectorPtr->select(select_columns)) selected; auto match_fileid = in(&SegmentSchema::id_, ids); auto filter = where(match_fileid and match_type); - selected = ConnectorPtr->select(select_columns, filter); + { + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + selected = ConnectorPtr->select(select_columns, filter); + } std::map collections; Status ret; @@ -1573,9 +1628,14 @@ SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) { std::vector file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX, (int)SegmentSchema::INDEX}; - auto selected = ConnectorPtr->select( - columns(&SegmentSchema::row_count_), - where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id)); + auto select_columns = columns(&SegmentSchema::row_count_); + decltype(ConnectorPtr->select(select_columns)) selected; + { + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + selected = ConnectorPtr->select(select_columns, + where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id)); + } CollectionSchema collection_schema; collection_schema.collection_id_ = collection_id; diff --git a/core/src/index/knowhere/knowhere/index/vector_index/IndexAnnoy.cpp b/core/src/index/knowhere/knowhere/index/vector_index/IndexAnnoy.cpp index 5ef6d27f623a146600b5d0c56c380a681c45c7d2..59d425b3bcccc09409dfe61023256f57e51a5418 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/IndexAnnoy.cpp +++ b/core/src/index/knowhere/knowhere/index/vector_index/IndexAnnoy.cpp @@ -128,8 +128,15 @@ IndexAnnoy::Query(const DatasetPtr& dataset_ptr, const Config& config) { distances.reserve(k); index_->get_nns_by_vector((const float*)p_data + i * dim, k, search_k, &result, &distances, blacklist); - memcpy(p_id + k * i, result.data(), k * sizeof(int64_t)); - memcpy(p_dist + k * i, distances.data(), k * sizeof(float)); + size_t result_num = result.size(); + auto local_p_id = p_id + k * i; + auto local_p_dist = p_dist + k * i; + memcpy(local_p_id, result.data(), result_num * sizeof(int64_t)); + memcpy(local_p_dist, distances.data(), result_num * sizeof(float)); + for (; result_num < k; result_num++) { + local_p_id[result_num] = -1; + local_p_dist[result_num] = 1.0 / 0.0; + } } auto ret_ds = std::make_shared(); diff --git a/core/src/scheduler/optimizer/FaissFlatPass.cpp b/core/src/scheduler/optimizer/FaissFlatPass.cpp index 871a833eacf6f97b4794feab0cb2b66bd6af7808..bb9f0814f9da09adf4e340ceecdbcde68c761f6c 100644 --- a/core/src/scheduler/optimizer/FaissFlatPass.cpp +++ b/core/src/scheduler/optimizer/FaissFlatPass.cpp @@ -63,7 +63,7 @@ FaissFlatPass::Run(const TaskPtr& task) { } else { auto best_device_id = count_ % search_gpus_.size(); LOG_SERVER_DEBUG_ << LogOut("[%s][%d] FaissFlatPass: nq > gpu_search_threshold, specify gpu %d to search!", - best_device_id, "search", 0); + "search", 0, best_device_id); ++count_; res_ptr = ResMgrInst::GetInstance()->GetResource(ResourceType::GPU, search_gpus_[best_device_id]); } diff --git a/core/src/segment/Vectors.cpp b/core/src/segment/Vectors.cpp index a8555089aff0e6cbec46fab6644d1fd20a878ebc..d0a7622e2ef79c2c68e4af7af76acaa91b263853 100644 --- a/core/src/segment/Vectors.cpp +++ b/core/src/segment/Vectors.cpp @@ -28,10 +28,6 @@ namespace milvus { namespace segment { -Vectors::Vectors(std::vector data, std::vector uids, const std::string& name) - : data_(std::move(data)), uids_(std::move(uids)), name_(name) { -} - void Vectors::AddData(const std::vector& data) { data_.reserve(data_.size() + data.size()); @@ -113,6 +109,16 @@ Vectors::Erase(std::vector& offsets) { recorder.RecordSection(msg); } +std::vector& +Vectors::GetMutableData() { + return data_; +} + +std::vector& +Vectors::GetMutableUids() { + return uids_; +} + const std::vector& Vectors::GetData() const { return data_; diff --git a/core/src/segment/Vectors.h b/core/src/segment/Vectors.h index 2be6e62646c30472cb61671626729b5b9bed2f29..b5594cc9570abf6ec6ddf5b9ae9c10c188b47a07 100644 --- a/core/src/segment/Vectors.h +++ b/core/src/segment/Vectors.h @@ -28,8 +28,6 @@ using doc_id_t = int64_t; class Vectors { public: - Vectors(std::vector data, std::vector uids, const std::string& name); - Vectors() = default; void @@ -41,6 +39,12 @@ class Vectors { void SetName(const std::string& name); + std::vector& + GetMutableData(); + + std::vector& + GetMutableUids(); + const std::vector& GetData() const; diff --git a/core/src/server/delivery/request/CreatePartitionRequest.cpp b/core/src/server/delivery/request/CreatePartitionRequest.cpp index e33c71a8d51e69a9e10c5d3d88b9132bf2c5f442..0b0687063c3961becebc94020be45af1ca7f2c92 100644 --- a/core/src/server/delivery/request/CreatePartitionRequest.cpp +++ b/core/src/server/delivery/request/CreatePartitionRequest.cpp @@ -23,7 +23,7 @@ namespace milvus { namespace server { -constexpr uint64_t MAX_PARTITION_LIMIT = 5000; +constexpr uint64_t MAX_PARTITION_LIMIT = 4096; CreatePartitionRequest::CreatePartitionRequest(const std::shared_ptr& context, const std::string& collection_name, const std::string& tag) @@ -83,7 +83,7 @@ CreatePartitionRequest::OnExecute() { std::vector schema_array; status = DBWrapper::DB()->ShowPartitions(collection_name_, schema_array); if (schema_array.size() >= MAX_PARTITION_LIMIT) { - return Status(SERVER_UNSUPPORTED_ERROR, "The number of partitions exceeds the upper limit(5000)"); + return Status(SERVER_UNSUPPORTED_ERROR, "The number of partitions exceeds the upper limit(4096)"); } rc.RecordSection("check validation"); diff --git a/core/src/server/web_impl/README.md b/core/src/server/web_impl/README.md index 7e102caccf58d3bc6c64e4364fa9ef5171e650f2..00db5f053d41298808e09d96fd22318619968eb9 100644 --- a/core/src/server/web_impl/README.md +++ b/core/src/server/web_impl/README.md @@ -422,12 +422,21 @@ Creates a collection. ##### Body Parameters -| Parameter | Description | Required? | -| ----------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------- | -| `collection_name` | The name of the collection to create, which must be unique within its database. | Yes | -| `dimension` | The dimension of the vectors that are to be inserted into the created collection. | Yes | -| `index_file_size` | Threshold value that triggers index building for raw data files. The default is 1024. | No | -| `metric_type` | The method vector distances are compared in Milvus. The default is L2. Currently supported metrics include `L2` (Euclidean distance), `IP` (Inner Product), `HAMMING` (Hamming distance), `JACCARD` (Jaccard distance), and `TANIMOTO` (Tanomoto distance). | No | +| Parameter | Description | Required? | +| ----------------- | ----------------------------------------------------------------------------------------- | --------- | +| `collection_name` | The name of the collection to create, which must be unique within its database. | Yes | +| `dimension` | The dimension of the vectors that are to be inserted into the created collection. | Yes | +| `index_file_size` | Threshold value that triggers index building for raw data files. The default is 1024. | No | +| `metric_type` | The method vector distances are compared in Milvus. The default is L2. | No | + +* Currently supported metrics include: + - `L2` (Euclidean distance), + - `IP` (Inner Product) + - `HAMMING` (Hamming distance) + - `JACCARD` (Jaccard distance) + - `TANIMOTO` (Tanomoto distance) + - `SUBSTRUCTURE` (Sub structure distance) + - `SUPERSTRUCTURE` (Super structure distance) #### Response @@ -1541,6 +1550,11 @@ For each index type, the RESTful API has specific index parameters and search pa
{"M": $int, "efConstruction": $int}
{"ef": $int}
+ + ANNOY +
{"n_trees": $int}
+
{"search_k": $int}
+ For detailed information about the parameters above, refer to [Milvus Indexes](https://milvus.io/docs/guides/index.md) diff --git a/core/src/utils/Error.h b/core/src/utils/Error.h index aaf3612f03f7ef22f00cee21d882974080800291..31c401dca108488243d9b5d8be55590bd7c08f20 100644 --- a/core/src/utils/Error.h +++ b/core/src/utils/Error.h @@ -63,6 +63,7 @@ constexpr ErrorCode SERVER_CANNOT_CREATE_FILE = ToServerErrorCode(9); constexpr ErrorCode SERVER_CANNOT_DELETE_FOLDER = ToServerErrorCode(10); constexpr ErrorCode SERVER_CANNOT_DELETE_FILE = ToServerErrorCode(11); constexpr ErrorCode SERVER_BUILD_INDEX_ERROR = ToServerErrorCode(12); +constexpr ErrorCode SERVER_CANNOT_OPEN_FILE = ToServerErrorCode(13); constexpr ErrorCode SERVER_COLLECTION_NOT_EXIST = ToServerErrorCode(100); constexpr ErrorCode SERVER_INVALID_COLLECTION_NAME = ToServerErrorCode(101); diff --git a/sdk/grpc/ClientProxy.cpp b/sdk/grpc/ClientProxy.cpp index 3a8d5319c6a435afeba99fd1e4e8fbc91973f71d..b0dc36aa42555e4beb69e5aa30d8c85feb5bfb5c 100644 --- a/sdk/grpc/ClientProxy.cpp +++ b/sdk/grpc/ClientProxy.cpp @@ -79,7 +79,10 @@ Status ClientProxy::Connect(const ConnectParam& param) { std::string uri = param.ip_address + ":" + param.port; - channel_ = ::grpc::CreateChannel(uri, ::grpc::InsecureChannelCredentials()); + ::grpc::ChannelArguments args; + args.SetMaxSendMessageSize(-1); + args.SetMaxReceiveMessageSize(-1); + channel_ = ::grpc::CreateCustomChannel(uri, ::grpc::InsecureChannelCredentials(), args); if (channel_ != nullptr) { connected_ = true; client_ptr_ = std::make_shared(channel_); diff --git a/shards/README.md b/shards/README.md index c74c58ac32c7dbb73b56b3fca19781505fc0fa99..e9fd56716ce3881579d8c2285c0dd56202a1621a 100644 --- a/shards/README.md +++ b/shards/README.md @@ -54,7 +54,7 @@ Follow below steps to start a standalone Milvus instance with Mishards from sour 3. Start Milvus server. ```shell - $ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.6.0-gpu-d120719-2b40dd + $ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.8.0-gpu-d041520-464400 ``` 4. Update path permissions. diff --git a/shards/README_CN.md b/shards/README_CN.md index 5ee079c72a35314836999c98d8ee08d00faef951..a60ec20d3892c5acbc5e0e0851b1117068b8a25e 100644 --- a/shards/README_CN.md +++ b/shards/README_CN.md @@ -48,7 +48,7 @@ Python 版本为3.6及以上。 3. 启动 Milvus 服务。 ```shell - $ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.6.0-gpu-d120719-2b40dd + $ sudo nvidia-docker run --rm -d -p 19530:19530 -v /tmp/milvus/db:/opt/milvus/db milvusdb/milvus:0.8.0-gpu-d041520-464400 ``` 4. 更改目录权限。 diff --git a/shards/all_in_one/all_in_one.yml b/shards/all_in_one/all_in_one.yml index 67dd3b5f600985df00f7267d6a533f32f28561dd..9ecf43792e09fa493f4d6b85c33deab012c308f3 100644 --- a/shards/all_in_one/all_in_one.yml +++ b/shards/all_in_one/all_in_one.yml @@ -3,7 +3,7 @@ services: milvus_wr: runtime: nvidia restart: always - image: milvusdb/milvus:0.7.1-gpu-d032920-3cdba5 + image: milvusdb/milvus:0.8.0-gpu-d041520-464400 ports: - "0.0.0.0:19540:19530" volumes: @@ -13,7 +13,7 @@ services: milvus_ro: runtime: nvidia restart: always - image: milvusdb/milvus:0.7.1-gpu-d032920-3cdba5 + image: milvusdb/milvus:0.8.0-gpu-d041520-464400 ports: - "0.0.0.0:19541:19530" volumes: diff --git a/shards/all_in_one/ro_server.yml b/shards/all_in_one/ro_server.yml index 64725ac02030163ed3e61dcfe6a2bfed508c12b8..00540672ada2b5e2fc9be45af1ef598c295deb59 100644 --- a/shards/all_in_one/ro_server.yml +++ b/shards/all_in_one/ro_server.yml @@ -9,7 +9,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. -version: 0.2 +version: 0.3 #----------------------+------------------------------------------------------------+------------+-----------------+ # Server Config | Description | Type | Default | @@ -42,14 +42,19 @@ server_config: # | Keep 'dialect://:@:/', 'dialect' can be either 'sqlite' or | | | # | 'mysql', replace other texts with real values. | | | #----------------------+------------------------------------------------------------+------------+-----------------+ -# preload_table | A comma-separated list of table names that need to be pre- | StringList | | -# | loaded when Milvus server starts up. | | | +# preload_collection | A comma-separated list of collection names that need to | StringList | | +# | be pre-loaded when Milvus server starts up. | | | # | '*' means preload all existing tables (single-quote or | | | # | double-quote required). | | | #----------------------+------------------------------------------------------------+------------+-----------------+ +# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) | +# | flushes data to disk. | | | +# | 0 means disable the regular flush. | | | +#----------------------+------------------------------------------------------------+------------+-----------------+ db_config: backend_url: sqlite://:@:/ - preload_table: + preload_collection: + auto_flush_interval: 1 #----------------------+------------------------------------------------------------+------------+-----------------+ # Storage Config | Description | Type | Default | diff --git a/shards/all_in_one/wr_server.yml b/shards/all_in_one/wr_server.yml index d13e945b7199cd06b10eb994a965fbd453f24e7e..4127fc9ece392a469079a98c2c1d108852066b00 100644 --- a/shards/all_in_one/wr_server.yml +++ b/shards/all_in_one/wr_server.yml @@ -9,7 +9,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. -version: 0.2 +version: 0.3 #----------------------+------------------------------------------------------------+------------+-----------------+ # Server Config | Description | Type | Default | @@ -42,14 +42,19 @@ server_config: # | Keep 'dialect://:@:/', 'dialect' can be either 'sqlite' or | | | # | 'mysql', replace other texts with real values. | | | #----------------------+------------------------------------------------------------+------------+-----------------+ -# preload_table | A comma-separated list of table names that need to be pre- | StringList | | -# | loaded when Milvus server starts up. | | | +# preload_collection | A comma-separated list of collection names that need to | StringList | | +# | be pre-loaded when Milvus server starts up. | | | # | '*' means preload all existing tables (single-quote or | | | # | double-quote required). | | | #----------------------+------------------------------------------------------------+------------+-----------------+ +# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) | +# | flushes data to disk. | | | +# | 0 means disable the regular flush. | | | +#----------------------+------------------------------------------------------------+------------+-----------------+ db_config: backend_url: sqlite://:@:/ - preload_table: + preload_collection: + auto_flush_interval: 1 #----------------------+------------------------------------------------------------+------------+-----------------+ # Storage Config | Description | Type | Default | diff --git a/shards/all_in_one_with_mysql/all_in_one.yml b/shards/all_in_one_with_mysql/all_in_one.yml index a1d6a199ae46e4b21250441cda1be4252acda82f..dfa71b5d983587e7d989b19b6b63ce5dd335b107 100644 --- a/shards/all_in_one_with_mysql/all_in_one.yml +++ b/shards/all_in_one_with_mysql/all_in_one.yml @@ -18,7 +18,7 @@ services: milvus_wr: runtime: nvidia restart: always - image: milvusdb/milvus:0.7.1-gpu-d032920-3cdba5 + image: milvusdb/milvus:0.8.0-gpu-d041520-464400 volumes: - /tmp/milvus/db:/var/lib/milvus/db - ./wr_server.yml:/var/lib/milvus/conf/server_config.yaml @@ -29,7 +29,7 @@ services: milvus_ro: runtime: nvidia restart: always - image: milvusdb/milvus:0.7.1-gpu-d032920-3cdba5 + image: milvusdb/milvus:0.8.0-gpu-d041520-464400 volumes: - /tmp/milvus/db:/var/lib/milvus/db - ./ro_server.yml:/var/lib/milvus/conf/server_config.yaml diff --git a/shards/all_in_one_with_mysql/ro_server.yml b/shards/all_in_one_with_mysql/ro_server.yml index 26121d0560b59559033be1fab58d7fd06af4c860..2de7edbed73c3e1c46cc8604ba9d3404660a4cc7 100644 --- a/shards/all_in_one_with_mysql/ro_server.yml +++ b/shards/all_in_one_with_mysql/ro_server.yml @@ -9,7 +9,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. -version: 0.2 +version: 0.3 #----------------------+------------------------------------------------------------+------------+-----------------+ # Server Config | Description | Type | Default | @@ -42,14 +42,19 @@ server_config: # | Keep 'dialect://:@:/', 'dialect' can be either 'sqlite' or | | | # | 'mysql', replace other texts with real values. | | | #----------------------+------------------------------------------------------------+------------+-----------------+ -# preload_table | A comma-separated list of table names that need to be pre- | StringList | | -# | loaded when Milvus server starts up. | | | +# preload_collection | A comma-separated list of collection names that need to | StringList | | +# | be pre-loaded when Milvus server starts up. | | | # | '*' means preload all existing tables (single-quote or | | | # | double-quote required). | | | #----------------------+------------------------------------------------------------+------------+-----------------+ +# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) | +# | flushes data to disk. | | | +# | 0 means disable the regular flush. | | | +#----------------------+------------------------------------------------------------+------------+-----------------+ db_config: backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus - preload_table: + preload_collection: + auto_flush_interval: 1 #----------------------+------------------------------------------------------------+------------+-----------------+ # Storage Config | Description | Type | Default | diff --git a/shards/all_in_one_with_mysql/wr_server.yml b/shards/all_in_one_with_mysql/wr_server.yml index e6fa55fbdd1bdccc1c108d3e2baf192ebe3cfed5..40e0b0b71e117eb8d3f89d021eed97bfa810a0e6 100644 --- a/shards/all_in_one_with_mysql/wr_server.yml +++ b/shards/all_in_one_with_mysql/wr_server.yml @@ -9,7 +9,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. -version: 0.2 +version: 0.3 #----------------------+------------------------------------------------------------+------------+-----------------+ # Server Config | Description | Type | Default | @@ -42,14 +42,19 @@ server_config: # | Keep 'dialect://:@:/', 'dialect' can be either 'sqlite' or | | | # | 'mysql', replace other texts with real values. | | | #----------------------+------------------------------------------------------------+------------+-----------------+ -# preload_table | A comma-separated list of table names that need to be pre- | StringList | | -# | loaded when Milvus server starts up. | | | +# preload_collection | A comma-separated list of collection names that need to | StringList | | +# | be pre-loaded when Milvus server starts up. | | | # | '*' means preload all existing tables (single-quote or | | | # | double-quote required). | | | #----------------------+------------------------------------------------------------+------------+-----------------+ +# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) | +# | flushes data to disk. | | | +# | 0 means disable the regular flush. | | | +#----------------------+------------------------------------------------------------+------------+-----------------+ db_config: backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus - preload_table: + preload_collection: + auto_flush_interval: 1 #----------------------+------------------------------------------------------------+------------+-----------------+ # Storage Config | Description | Type | Default | diff --git a/shards/mishards/exception_codes.py b/shards/mishards/exception_codes.py index 7c06f445fd303aa012ea26b0280b7a1b5d9b6d60..d0451c9ecd138adf12b5bfe39a3dfd58f5cecd31 100644 --- a/shards/mishards/exception_codes.py +++ b/shards/mishards/exception_codes.py @@ -4,7 +4,7 @@ CONNECT_ERROR_CODE = 10001 CONNECTTION_NOT_FOUND_CODE = 10002 DB_ERROR_CODE = 10003 -TABLE_NOT_FOUND_CODE = 20001 +COLLECTION_NOT_FOUND_CODE = 20001 INVALID_ARGUMENT_CODE = 20002 INVALID_DATE_RANGE_CODE = 20003 INVALID_TOPK_CODE = 20004 diff --git a/shards/mishards/exception_handlers.py b/shards/mishards/exception_handlers.py index e7e0bfc4cb33e7c8213940b46995d03c56727ba8..77cc99f22d9290d7d052461a76338038288afe7a 100644 --- a/shards/mishards/exception_handlers.py +++ b/shards/mishards/exception_handlers.py @@ -30,23 +30,23 @@ def resp_handler(err, error_code): ids=[], distances=[]) - if resp_class == milvus_pb2.TableRowCount: - return resp_class(status=status, table_row_count=-1) + if resp_class == milvus_pb2.CollectionRowCount: + return resp_class(status=status, collection_row_count=-1) - if resp_class == milvus_pb2.TableName: - return resp_class(status=status, table_name=[]) + if resp_class == milvus_pb2.CollectionName: + return resp_class(status=status, collection_name=[]) if resp_class == milvus_pb2.StringReply: return resp_class(status=status, string_reply='') - if resp_class == milvus_pb2.TableSchema: - return milvus_pb2.TableSchema( + if resp_class == milvus_pb2.CollectionSchema: + return milvus_pb2.CollectionSchema( status=status ) if resp_class == milvus_pb2.IndexParam: return milvus_pb2.IndexParam( - table_name=milvus_pb2.TableName( + collection_name=milvus_pb2.CollectionName( status=status ) ) @@ -55,10 +55,10 @@ def resp_handler(err, error_code): return status -@server.errorhandler(exceptions.TableNotFoundError) -def TableNotFoundErrorHandler(err): +@server.errorhandler(exceptions.CollectionNotFoundError) +def CollectionNotFoundErrorHandler(err): logger.error(err) - return resp_handler(err, status_pb2.TABLE_NOT_EXISTS) + return resp_handler(err, status_pb2.COLLECTION_NOT_EXISTS) @server.errorhandler(exceptions.InvalidTopKError) diff --git a/shards/mishards/exceptions.py b/shards/mishards/exceptions.py index 1661b06263a7fd1d29d9cf5da42524d40771b5e7..f254f6a68c13de3ce728aa5ab210ff676c6c5050 100644 --- a/shards/mishards/exceptions.py +++ b/shards/mishards/exceptions.py @@ -22,8 +22,8 @@ class DBError(BaseException): code = codes.DB_ERROR_CODE -class TableNotFoundError(BaseException): - code = codes.TABLE_NOT_FOUND_CODE +class CollectionNotFoundError(BaseException): + code = codes.COLLECTION_NOT_FOUND_CODE class InvalidTopKError(BaseException): diff --git a/shards/mishards/grpc_utils/grpc_args_parser.py b/shards/mishards/grpc_utils/grpc_args_parser.py index 73c427744af078a96cdc45e32a3be124d16e3394..67ca043b7f135a3c5e72b51fb3010b19e030fede 100644 --- a/shards/mishards/grpc_utils/grpc_args_parser.py +++ b/shards/mishards/grpc_utils/grpc_args_parser.py @@ -20,25 +20,25 @@ class GrpcArgsParser(object): @classmethod @error_status - def parse_proto_TableSchema(cls, param): - _table_schema = { - 'collection_name': param.table_name, + def parse_proto_CollectionSchema(cls, param): + _collection_schema = { + 'collection_name': param.collection_name, 'dimension': param.dimension, 'index_file_size': param.index_file_size, 'metric_type': param.metric_type } - return param.status, _table_schema + return param.status, _collection_schema @classmethod @error_status - def parse_proto_TableName(cls, param): - return param.table_name + def parse_proto_CollectionName(cls, param): + return param.collection_name @classmethod @error_status def parse_proto_FlushParam(cls, param): - return list(param.table_name_array) + return list(param.collection_name_array) @classmethod @error_status @@ -53,7 +53,7 @@ class GrpcArgsParser(object): @classmethod @error_status def parse_proto_IndexParam(cls, param): - _table_name = param.table_name + _collection_name = param.collection_name _index_type = param.index_type _index_param = {} @@ -61,7 +61,7 @@ class GrpcArgsParser(object): if params.key == 'params': _index_param = ujson.loads(str(params.value)) - return _table_name, _index_type, _index_param + return _collection_name, _index_type, _index_param @classmethod @error_status @@ -77,15 +77,15 @@ class GrpcArgsParser(object): @classmethod def parse_proto_PartitionParam(cls, param): - _table_name = param.table_name + _collection_name = param.collection_name _tag = param.tag - return _table_name, _tag + return _collection_name, _tag @classmethod @error_status def parse_proto_SearchParam(cls, param): - _table_name = param.table_name + _collection_name = param.collection_name _topk = param.topk if len(param.extra_params) == 0: @@ -102,28 +102,28 @@ class GrpcArgsParser(object): else: raise Exception("Search argument parse error: record array is empty") - return _table_name, _query_record_array, _topk, _params + return _collection_name, _query_record_array, _topk, _params @classmethod @error_status def parse_proto_DeleteByIDParam(cls, param): - _table_name = param.table_name + _collection_name = param.collection_name _id_array = list(param.id_array) - return _table_name, _id_array + return _collection_name, _id_array @classmethod @error_status def parse_proto_VectorIdentity(cls, param): - _table_name = param.table_name + _collection_name = param.collection_name _id = param.id - return _table_name, _id + return _collection_name, _id @classmethod @error_status def parse_proto_GetVectorIDsParam(cls, param): - _table__name = param.table_name + _collection__name = param.collection_name _segment_name = param.segment_name - return _table__name, _segment_name + return _collection__name, _segment_name diff --git a/shards/mishards/grpc_utils/grpc_args_wrapper.py b/shards/mishards/grpc_utils/grpc_args_wrapper.py index 7447dbd99558a5ea97ab700cc01db0fcac0ee38b..f61ea0fa808fac903efbd1b2004991c02b7f184a 100644 --- a/shards/mishards/grpc_utils/grpc_args_wrapper.py +++ b/shards/mishards/grpc_utils/grpc_args_wrapper.py @@ -1,4 +1,4 @@ # class GrpcArgsWrapper(object): # @classmethod -# def proto_TableName(cls): +# def proto_CollectionName(cls): diff --git a/shards/mishards/router/__init__.py b/shards/mishards/router/__init__.py index 2567682fdaa4e3c35629e85499dd557a57925388..4c0fc815da3d22c49fc79031737ccd698089d85c 100644 --- a/shards/mishards/router/__init__.py +++ b/shards/mishards/router/__init__.py @@ -6,7 +6,7 @@ class RouterMixin: self.writable_topo = writable_topo self.readonly_topo = readonly_topo - def routing(self, table_name, metadata=None, **kwargs): + def routing(self, collection_name, metadata=None, **kwargs): raise NotImplemented() def connection(self, metadata=None): diff --git a/shards/mishards/router/plugins/file_based_hash_ring_router.py b/shards/mishards/router/plugins/file_based_hash_ring_router.py index 3f945a837a36275318808e0bb2b24d6fb5497f3e..d4c66cce647c0fc2401186c177ba10d57c72c7f4 100644 --- a/shards/mishards/router/plugins/file_based_hash_ring_router.py +++ b/shards/mishards/router/plugins/file_based_hash_ring_router.py @@ -16,53 +16,53 @@ class Factory(RouterMixin): super(Factory, self).__init__(writable_topo=writable_topo, readonly_topo=readonly_topo) - def routing(self, table_name, partition_tags=None, metadata=None, **kwargs): + def routing(self, collection_name, partition_tags=None, metadata=None, **kwargs): range_array = kwargs.pop('range_array', None) - return self._route(table_name, range_array, partition_tags, metadata, **kwargs) + return self._route(collection_name, range_array, partition_tags, metadata, **kwargs) - def _route(self, table_name, range_array, partition_tags=None, metadata=None, **kwargs): + def _route(self, collection_name, range_array, partition_tags=None, metadata=None, **kwargs): # PXU TODO: Implement Thread-local Context # PXU TODO: Session life mgt if not partition_tags: cond = and_( - or_(Tables.table_id == table_name, Tables.owner_table == table_name), + or_(Tables.table_id == collection_name, Tables.owner_table == collection_name), Tables.state != Tables.TO_DELETE) else: # TODO: collection default partition is '_default' cond = and_(Tables.state != Tables.TO_DELETE, - Tables.owner_table == table_name, + Tables.owner_table == collection_name, Tables.partition_tag.in_(partition_tags)) if '_default' in partition_tags: - default_par_cond = and_(Tables.table_id == table_name, Tables.state != Tables.TO_DELETE) + default_par_cond = and_(Tables.table_id == collection_name, Tables.state != Tables.TO_DELETE) cond = or_(cond, default_par_cond) try: - tables = db.Session.query(Tables).filter(cond).all() + collections = db.Session.query(Tables).filter(cond).all() except sqlalchemy_exc.SQLAlchemyError as e: raise exceptions.DBError(message=str(e), metadata=metadata) - if not tables: - logger.error("Cannot find table {} / {} in metadata".format(table_name, partition_tags)) - raise exceptions.TableNotFoundError('{}:{}'.format(table_name, partition_tags), metadata=metadata) + if not collections: + logger.error("Cannot find collection {} / {} in metadata".format(collection_name, partition_tags)) + raise exceptions.CollectionNotFoundError('{}:{}'.format(collection_name, partition_tags), metadata=metadata) - table_list = [str(table.table_id) for table in tables] + collection_list = [str(collection.table_id) for collection in collections] file_type_cond = or_( TableFiles.file_type == TableFiles.FILE_TYPE_RAW, TableFiles.file_type == TableFiles.FILE_TYPE_TO_INDEX, TableFiles.file_type == TableFiles.FILE_TYPE_INDEX, ) - file_cond = and_(file_type_cond, TableFiles.table_id.in_(table_list)) + file_cond = and_(file_type_cond, TableFiles.table_id.in_(collection_list)) try: files = db.Session.query(TableFiles).filter(file_cond).all() except sqlalchemy_exc.SQLAlchemyError as e: raise exceptions.DBError(message=str(e), metadata=metadata) if not files: - logger.warning("Table file is empty. {}".format(table_list)) - # logger.error("Cannot find table file id {} / {} in metadata".format(table_name, partition_tags)) - # raise exceptions.TableNotFoundError('Table file id not found. {}:{}'.format(table_name, partition_tags), - # metadata=metadata) + logger.warning("Collection file is empty. {}".format(collection_list)) + # logger.error("Cannot find collection file id {} / {} in metadata".format(collection_name, partition_tags)) + # raise exceptions.CollectionNotFoundError('Collection file id not found. {}:{}'.format(collection_name, partition_tags), + # metadata=metadata) db.remove_session() diff --git a/shards/mishards/service_handler.py b/shards/mishards/service_handler.py index b8adfb0c891415219a0a37c257ea530011c0011a..e43125a6a722e24841b1bdef54cb4030c7f2c9af 100644 --- a/shards/mishards/service_handler.py +++ b/shards/mishards/service_handler.py @@ -25,7 +25,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): MAX_TOPK = 2048 def __init__(self, tracer, router, max_workers=multiprocessing.cpu_count(), **kwargs): - self.table_meta = {} + self.collection_meta = {} self.error_handlers = {} self.tracer = tracer self.router = router @@ -106,8 +106,8 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): def _do_query(self, context, - table_id, - table_meta, + collection_id, + collection_meta, vectors, topk, search_params, @@ -119,7 +119,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): p_span = None if self.tracer.empty else context.get_active_span( ).context with self.tracer.start_span('get_routing', child_of=p_span): - routing = self.router.routing(table_id, + routing = self.router.routing(collection_id, partition_tags=partition_tags, metadata=metadata) logger.info('Routing: {}'.format(routing)) @@ -129,10 +129,10 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): rs = [] all_topk_results = [] - def search(addr, table_id, file_ids, vectors, topk, params, **kwargs): + def search(addr, collection_id, file_ids, vectors, topk, params, **kwargs): logger.info( - 'Send Search Request: addr={};table_id={};ids={};nq={};topk={};params={}' - .format(addr, table_id, file_ids, len(vectors), topk, params)) + 'Send Search Request: addr={};collection_id={};ids={};nq={};topk={};params={}' + .format(addr, collection_id, file_ids, len(vectors), topk, params)) conn = self.router.query_conn(addr, metadata=metadata) start = time.time() @@ -142,7 +142,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): with self.tracer.start_span('search_{}'.format(addr), child_of=span): - ret = conn.conn.search_vectors_in_files(collection_name=table_id, + ret = conn.conn.search_vectors_in_files(collection_name=collection_id, file_ids=file_ids, query_records=vectors, top_k=topk, @@ -158,7 +158,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): for addr, file_ids in routing.items(): res = pool.submit(search, addr, - table_id, + collection_id, file_ids, vectors, topk, @@ -169,51 +169,51 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): for res in rs: res.result() - reverse = table_meta.metric_type == Types.MetricType.IP + reverse = collection_meta.metric_type == Types.MetricType.IP with self.tracer.start_span('do_merge', child_of=p_span): return self._do_merge(all_topk_results, topk, reverse=reverse, metadata=metadata) - def _create_table(self, table_schema): - return self.router.connection().create_collection(table_schema) + def _create_collection(self, collection_schema): + return self.router.connection().create_collection(collection_schema) @mark_grpc_method - def CreateTable(self, request, context): - _status, unpacks = Parser.parse_proto_TableSchema(request) + def CreateCollection(self, request, context): + _status, unpacks = Parser.parse_proto_CollectionSchema(request) if not _status.OK(): return status_pb2.Status(error_code=_status.code, reason=_status.message) - _status, _table_schema = unpacks + _status, _collection_schema = unpacks # if _status.error_code != 0: - # logging.warning('[CreateTable] table schema error occurred: {}'.format(_status)) + # logging.warning('[CreateCollection] collection schema error occurred: {}'.format(_status)) # return _status - logger.info('CreateTable {}'.format(_table_schema['collection_name'])) + logger.info('CreateCollection {}'.format(_collection_schema['collection_name'])) - _status = self._create_table(_table_schema) + _status = self._create_collection(_collection_schema) return status_pb2.Status(error_code=_status.code, reason=_status.message) - def _has_table(self, table_name, metadata=None): - return self.router.connection(metadata=metadata).has_collection(table_name) + def _has_collection(self, collection_name, metadata=None): + return self.router.connection(metadata=metadata).has_collection(collection_name) @mark_grpc_method - def HasTable(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + def HasCollection(self, request, context): + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): return milvus_pb2.BoolReply(status=status_pb2.Status( error_code=_status.code, reason=_status.message), bool_reply=False) - logger.info('HasTable {}'.format(_table_name)) + logger.info('HasCollection {}'.format(_collection_name)) - _status, _bool = self._has_table(_table_name, + _status, _bool = self._has_collection(_collection_name, metadata={'resp_class': milvus_pb2.BoolReply}) return milvus_pb2.BoolReply(status=status_pb2.Status( @@ -222,55 +222,55 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): @mark_grpc_method def CreatePartition(self, request, context): - _table_name, _tag = Parser.parse_proto_PartitionParam(request) - _status = self.router.connection().create_partition(_table_name, _tag) + _collection_name, _tag = Parser.parse_proto_PartitionParam(request) + _status = self.router.connection().create_partition(_collection_name, _tag) return status_pb2.Status(error_code=_status.code, reason=_status.message) @mark_grpc_method def DropPartition(self, request, context): - _table_name, _tag = Parser.parse_proto_PartitionParam(request) + _collection_name, _tag = Parser.parse_proto_PartitionParam(request) - _status = self.router.connection().drop_partition(_table_name, _tag) + _status = self.router.connection().drop_partition(_collection_name, _tag) return status_pb2.Status(error_code=_status.code, reason=_status.message) @mark_grpc_method def ShowPartitions(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): return milvus_pb2.PartitionList(status=status_pb2.Status( error_code=_status.code, reason=_status.message), partition_array=[]) - logger.info('ShowPartitions {}'.format(_table_name)) + logger.info('ShowPartitions {}'.format(_collection_name)) - _status, partition_array = self.router.connection().show_partitions(_table_name) + _status, partition_array = self.router.connection().show_partitions(_collection_name) return milvus_pb2.PartitionList(status=status_pb2.Status( error_code=_status.code, reason=_status.message), partition_tag_array=[param.tag for param in partition_array]) - def _delete_table(self, table_name): - return self.router.connection().drop_collection(table_name) + def _drop_collection(self, collection_name): + return self.router.connection().drop_collection(collection_name) @mark_grpc_method - def DropTable(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + def DropCollection(self, request, context): + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): return status_pb2.Status(error_code=_status.code, reason=_status.message) - logger.info('DropTable {}'.format(_table_name)) + logger.info('DropCollection {}'.format(_collection_name)) - _status = self._delete_table(_table_name) + _status = self._drop_collection(_collection_name) return status_pb2.Status(error_code=_status.code, reason=_status.message) - def _create_index(self, table_name, index_type, param): - return self.router.connection().create_index(table_name, index_type, param) + def _create_index(self, collection_name, index_type, param): + return self.router.connection().create_index(collection_name, index_type, param) @mark_grpc_method def CreateIndex(self, request, context): @@ -280,12 +280,12 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): return status_pb2.Status(error_code=_status.code, reason=_status.message) - _table_name, _index_type, _index_param = unpacks + _collection_name, _index_type, _index_param = unpacks - logger.info('CreateIndex {}'.format(_table_name)) + logger.info('CreateIndex {}'.format(_collection_name)) - # TODO: interface create_table incompleted - _status = self._create_index(_table_name, _index_type, _index_param) + # TODO: interface create_collection incompleted + _status = self._create_index(_collection_name, _index_type, _index_param) return status_pb2.Status(error_code=_status.code, reason=_status.message) @@ -309,7 +309,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): metadata = {'resp_class': milvus_pb2.TopKQueryResult} - table_name = request.table_name + collection_name = request.collection_name topk = request.topk @@ -318,7 +318,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): params = ujson.loads(str(request.extra_params[0].value)) logger.info('Search {}: topk={} params={}'.format( - table_name, topk, params)) + collection_name, topk, params)) # if nprobe > self.MAX_NPROBE or nprobe <= 0: # raise exceptions.InvalidArgumentError( @@ -328,22 +328,22 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): raise exceptions.InvalidTopKError( message='Invalid topk: {}'.format(topk), metadata=metadata) - table_meta = self.table_meta.get(table_name, None) + collection_meta = self.collection_meta.get(collection_name, None) - if not table_meta: + if not collection_meta: status, info = self.router.connection( - metadata=metadata).describe_collection(table_name) + metadata=metadata).describe_collection(collection_name) if not status.OK(): - raise exceptions.TableNotFoundError(table_name, + raise exceptions.CollectionNotFoundError(collection_name, metadata=metadata) - self.table_meta[table_name] = info - table_meta = info + self.collection_meta[collection_name] = info + collection_meta = info start = time.time() query_record_array = [] - if int(table_meta.metric_type) >= MetricType.HAMMING.value: + if int(collection_meta.metric_type) >= MetricType.HAMMING.value: for query_record in request.query_record_array: query_record_array.append(bytes(query_record.binary_data)) else: @@ -351,8 +351,8 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): query_record_array.append(list(query_record.float_data)) status, id_results, dis_results = self._do_query(context, - table_name, - table_meta, + collection_name, + collection_meta, query_record_array, topk, params, @@ -374,57 +374,57 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): def SearchInFiles(self, request, context): raise NotImplemented() - def _describe_table(self, table_name, metadata=None): - return self.router.connection(metadata=metadata).describe_collection(table_name) + def _describe_collection(self, collection_name, metadata=None): + return self.router.connection(metadata=metadata).describe_collection(collection_name) @mark_grpc_method - def DescribeTable(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + def DescribeCollection(self, request, context): + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): - return milvus_pb2.TableSchema(status=status_pb2.Status( + return milvus_pb2.CollectionSchema(status=status_pb2.Status( error_code=_status.code, reason=_status.message), ) - metadata = {'resp_class': milvus_pb2.TableSchema} + metadata = {'resp_class': milvus_pb2.CollectionSchema} - logger.info('DescribeTable {}'.format(_table_name)) - _status, _table = self._describe_table(metadata=metadata, - table_name=_table_name) + logger.info('DescribeCollection {}'.format(_collection_name)) + _status, _collection = self._describe_collection(metadata=metadata, + collection_name=_collection_name) if _status.OK(): - return milvus_pb2.TableSchema( - table_name=_table_name, - index_file_size=_table.index_file_size, - dimension=_table.dimension, - metric_type=_table.metric_type, + return milvus_pb2.CollectionSchema( + collection_name=_collection_name, + index_file_size=_collection.index_file_size, + dimension=_collection.dimension, + metric_type=_collection.metric_type, status=status_pb2.Status(error_code=_status.code, reason=_status.message), ) - return milvus_pb2.TableSchema( - table_name=_table_name, + return milvus_pb2.CollectionSchema( + collection_name=_collection_name, status=status_pb2.Status(error_code=_status.code, reason=_status.message), ) - def _table_info(self, table_name, metadata=None): - return self.router.connection(metadata=metadata).collection_info(table_name) + def _collection_info(self, collection_name, metadata=None): + return self.router.connection(metadata=metadata).collection_info(collection_name) @mark_grpc_method - def ShowTableInfo(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + def ShowCollectionInfo(self, request, context): + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): - return milvus_pb2.TableInfo(status=status_pb2.Status( + return milvus_pb2.CollectionInfo(status=status_pb2.Status( error_code=_status.code, reason=_status.message), ) - metadata = {'resp_class': milvus_pb2.TableInfo} + metadata = {'resp_class': milvus_pb2.CollectionInfo} - logger.info('ShowTableInfo {}'.format(_table_name)) - _status, _info = self._table_info(metadata=metadata, table_name=_table_name) + logger.info('ShowCollectionInfo {}'.format(_collection_name)) + _status, _info = self._collection_info(metadata=metadata, collection_name=_collection_name) if _status.OK(): - _table_info = milvus_pb2.TableInfo( + _collection_info = milvus_pb2.CollectionInfo( status=status_pb2.Status(error_code=_status.code, reason=_status.message), total_row_count=_info.count @@ -443,37 +443,37 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): data_size=seg_stat.data_size, ) - _table_info.partitions_stat.append(_par) - return _table_info + _collection_info.partitions_stat.append(_par) + return _collection_info - return milvus_pb2.TableInfo( + return milvus_pb2.CollectionInfo( status=status_pb2.Status(error_code=_status.code, reason=_status.message), ) - def _count_table(self, table_name, metadata=None): + def _count_collection(self, collection_name, metadata=None): return self.router.connection( - metadata=metadata).count_collection(table_name) + metadata=metadata).count_collection(collection_name) @mark_grpc_method - def CountTable(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + def CountCollection(self, request, context): + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): status = status_pb2.Status(error_code=_status.code, reason=_status.message) - return milvus_pb2.TableRowCount(status=status) + return milvus_pb2.CollectionRowCount(status=status) - logger.info('CountTable {}'.format(_table_name)) + logger.info('CountCollection {}'.format(_collection_name)) - metadata = {'resp_class': milvus_pb2.TableRowCount} - _status, _count = self._count_table(_table_name, metadata=metadata) + metadata = {'resp_class': milvus_pb2.CollectionRowCount} + _status, _count = self._count_collection(_collection_name, metadata=metadata) - return milvus_pb2.TableRowCount( + return milvus_pb2.CollectionRowCount( status=status_pb2.Status(error_code=_status.code, reason=_status.message), - table_row_count=_count if isinstance(_count, int) else -1) + collection_row_count=_count if isinstance(_count, int) else -1) def _get_server_version(self, metadata=None): return self.router.connection(metadata=metadata).server_version() @@ -509,41 +509,41 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): error_code=_status.code, reason=_status.message), string_reply=_reply) - def _show_tables(self, metadata=None): + def _show_collections(self, metadata=None): return self.router.connection(metadata=metadata).show_collections() @mark_grpc_method - def ShowTables(self, request, context): - logger.info('ShowTables') - metadata = {'resp_class': milvus_pb2.TableName} - _status, _results = self._show_tables(metadata=metadata) + def ShowCollections(self, request, context): + logger.info('ShowCollections') + metadata = {'resp_class': milvus_pb2.CollectionName} + _status, _results = self._show_collections(metadata=metadata) - return milvus_pb2.TableNameList(status=status_pb2.Status( + return milvus_pb2.CollectionNameList(status=status_pb2.Status( error_code=_status.code, reason=_status.message), - table_names=_results) + collection_names=_results) - def _preload_table(self, table_name): - return self.router.connection().preload_collection(table_name) + def _preload_collection(self, collection_name): + return self.router.connection().preload_collection(collection_name) @mark_grpc_method - def PreloadTable(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + def PreloadCollection(self, request, context): + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): return status_pb2.Status(error_code=_status.code, reason=_status.message) - logger.info('PreloadTable {}'.format(_table_name)) - _status = self._preload_table(_table_name) + logger.info('PreloadCollection {}'.format(_collection_name)) + _status = self._preload_collection(_collection_name) return status_pb2.Status(error_code=_status.code, reason=_status.message) - def _describe_index(self, table_name, metadata=None): - return self.router.connection(metadata=metadata).describe_index(table_name) + def _describe_index(self, collection_name, metadata=None): + return self.router.connection(metadata=metadata).describe_index(collection_name) @mark_grpc_method def DescribeIndex(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): return milvus_pb2.IndexParam(status=status_pb2.Status( @@ -551,8 +551,8 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): metadata = {'resp_class': milvus_pb2.IndexParam} - logger.info('DescribeIndex {}'.format(_table_name)) - _status, _index_param = self._describe_index(table_name=_table_name, + logger.info('DescribeIndex {}'.format(_collection_name)) + _status, _index_param = self._describe_index(collection_name=_collection_name, metadata=metadata) if not _index_param: @@ -563,13 +563,13 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): grpc_index = milvus_pb2.IndexParam(status=status_pb2.Status( error_code=_status.code, reason=_status.message), - table_name=_table_name, index_type=_index_type) + collection_name=_collection_name, index_type=_index_type) grpc_index.extra_params.add(key='params', value=ujson.dumps(_index_param._params)) return grpc_index - def _get_vector_by_id(self, table_name, vec_id, metadata): - return self.router.connection(metadata=metadata).get_vector_by_id(table_name, vec_id) + def _get_vector_by_id(self, collection_name, vec_id, metadata): + return self.router.connection(metadata=metadata).get_vector_by_id(collection_name, vec_id) @mark_grpc_method def GetVectorByID(self, request, context): @@ -580,9 +580,9 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): metadata = {'resp_class': milvus_pb2.VectorData} - _table_name, _id = unpacks - logger.info('GetVectorByID {}'.format(_table_name)) - _status, vector = self._get_vector_by_id(_table_name, _id, metadata) + _collection_name, _id = unpacks + logger.info('GetVectorByID {}'.format(_collection_name)) + _status, vector = self._get_vector_by_id(_collection_name, _id, metadata) if not vector: return milvus_pb2.VectorData(status=status_pb2.Status( @@ -598,8 +598,8 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): vector_data=records ) - def _get_vector_ids(self, table_name, segment_name, metadata): - return self.router.connection(metadata=metadata).get_vector_ids(table_name, segment_name) + def _get_vector_ids(self, collection_name, segment_name, metadata): + return self.router.connection(metadata=metadata).get_vector_ids(collection_name, segment_name) @mark_grpc_method def GetVectorIDs(self, request, context): @@ -611,9 +611,9 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): metadata = {'resp_class': milvus_pb2.VectorIds} - _table_name, _segment_name = unpacks - logger.info('GetVectorIDs {}'.format(_table_name)) - _status, ids = self._get_vector_ids(_table_name, _segment_name, metadata) + _collection_name, _segment_name = unpacks + logger.info('GetVectorIDs {}'.format(_collection_name)) + _status, ids = self._get_vector_ids(_collection_name, _segment_name, metadata) if not ids: return milvus_pb2.VectorIds(status=status_pb2.Status( @@ -624,8 +624,8 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): vector_id_array=ids ) - def _delete_by_id(self, table_name, id_array): - return self.router.connection().delete_by_id(table_name, id_array) + def _delete_by_id(self, collection_name, id_array): + return self.router.connection().delete_by_id(collection_name, id_array) @mark_grpc_method def DeleteByID(self, request, context): @@ -636,57 +636,57 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): return status_pb2.Status(error_code=_status.code, reason=_status.message) - _table_name, _ids = unpacks - logger.info('DeleteByID {}'.format(_table_name)) - _status = self._delete_by_id(_table_name, _ids) + _collection_name, _ids = unpacks + logger.info('DeleteByID {}'.format(_collection_name)) + _status = self._delete_by_id(_collection_name, _ids) return status_pb2.Status(error_code=_status.code, reason=_status.message) - def _drop_index(self, table_name): - return self.router.connection().drop_index(table_name) + def _drop_index(self, collection_name): + return self.router.connection().drop_index(collection_name) @mark_grpc_method def DropIndex(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): return status_pb2.Status(error_code=_status.code, reason=_status.message) - logger.info('DropIndex {}'.format(_table_name)) - _status = self._drop_index(_table_name) + logger.info('DropIndex {}'.format(_collection_name)) + _status = self._drop_index(_collection_name) return status_pb2.Status(error_code=_status.code, reason=_status.message) - def _flush(self, table_names): - return self.router.connection().flush(table_names) + def _flush(self, collection_names): + return self.router.connection().flush(collection_names) @mark_grpc_method def Flush(self, request, context): - _status, _table_names = Parser.parse_proto_FlushParam(request) + _status, _collection_names = Parser.parse_proto_FlushParam(request) if not _status.OK(): return status_pb2.Status(error_code=_status.code, reason=_status.message) - logger.info('Flush {}'.format(_table_names)) - _status = self._flush(_table_names) + logger.info('Flush {}'.format(_collection_names)) + _status = self._flush(_collection_names) return status_pb2.Status(error_code=_status.code, reason=_status.message) - def _compact(self, table_name): - return self.router.connection().compact(table_name) + def _compact(self, collection_name): + return self.router.connection().compact(collection_name) @mark_grpc_method def Compact(self, request, context): - _status, _table_name = Parser.parse_proto_TableName(request) + _status, _collection_name = Parser.parse_proto_CollectionName(request) if not _status.OK(): return status_pb2.Status(error_code=_status.code, reason=_status.message) - logger.info('Compact {}'.format(_table_name)) - _status = self._compact(_table_name) + logger.info('Compact {}'.format(_collection_name)) + _status = self._compact(_collection_name) return status_pb2.Status(error_code=_status.code, reason=_status.message) diff --git a/shards/mishards/settings.py b/shards/mishards/settings.py index 94c89c9d83d19278674ed6165f074c8da6194717..89c24ab60d4436db2e07dc8c003530e21bec9b65 100644 --- a/shards/mishards/settings.py +++ b/shards/mishards/settings.py @@ -12,7 +12,7 @@ else: env.read_env() -SERVER_VERSIONS = ['0.7.0', '0.7.1'] +SERVER_VERSIONS = ['0.8.0'] DEBUG = env.bool('DEBUG', False) MAX_RETRY = env.int('MAX_RETRY', 3) diff --git a/shards/mishards/test_server.py b/shards/mishards/test_server.py index b7a3ad370da46af5ba1b935381244c72be1b858a..f7a1a63cacbb9080be23502cb144c46a0c9a4e18 100644 --- a/shards/mishards/test_server.py +++ b/shards/mishards/test_server.py @@ -7,7 +7,7 @@ import faker import inspect from milvus import Milvus from milvus.client.types import Status, IndexType, MetricType -from milvus.client.abstract import IndexParam, TableSchema +from milvus.client.abstract import IndexParam, CollectionSchema from milvus.grpc_gen import status_pb2, milvus_pb2 from mishards import db, create_app, settings from mishards.service_handler import ServiceHandler @@ -43,55 +43,55 @@ class TestServer: assert not status.OK() def test_drop_index(self, started_app): - table_name = inspect.currentframe().f_code.co_name + collection_name = inspect.currentframe().f_code.co_name ServiceHandler._drop_index = mock.MagicMock(return_value=OK) - status = self.client.drop_index(table_name) + status = self.client.drop_index(collection_name) assert status.OK() - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(BAD, table_name)) - status = self.client.drop_index(table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(BAD, collection_name)) + status = self.client.drop_index(collection_name) assert not status.OK() def test_describe_index(self, started_app): - table_name = inspect.currentframe().f_code.co_name + collection_name = inspect.currentframe().f_code.co_name index_type = IndexType.FLAT - nlist = 1 - index_param = IndexParam(table_name=table_name, + params = {'nlist': 1} + index_param = IndexParam(collection_name=collection_name, index_type=index_type, - nlist=nlist) - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(OK, table_name)) + params=params) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(OK, collection_name)) ServiceHandler._describe_index = mock.MagicMock( return_value=(OK, index_param)) - status, ret = self.client.describe_index(table_name) + status, ret = self.client.describe_index(collection_name) assert status.OK() - assert ret._table_name == index_param._table_name + assert ret._collection_name == index_param._collection_name - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(BAD, table_name)) - status, _ = self.client.describe_index(table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(BAD, collection_name)) + status, _ = self.client.describe_index(collection_name) assert not status.OK() def test_preload(self, started_app): - table_name = inspect.currentframe().f_code.co_name + collection_name = inspect.currentframe().f_code.co_name - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(OK, table_name)) - ServiceHandler._preload_table = mock.MagicMock(return_value=OK) - status = self.client.preload_table(table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(OK, collection_name)) + ServiceHandler._preload_collection = mock.MagicMock(return_value=OK) + status = self.client.preload_collection(collection_name) assert status.OK() - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(BAD, table_name)) - status = self.client.preload_table(table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(BAD, collection_name)) + status = self.client.preload_collection(collection_name) assert not status.OK() @pytest.mark.skip def test_delete_by_range(self, started_app): - table_name = inspect.currentframe().f_code.co_name + collection_name = inspect.currentframe().f_code.co_name - unpacked = table_name, datetime.datetime.today( + unpacked = collection_name, datetime.datetime.today( ), datetime.datetime.today() Parser.parse_proto_DeleteByRangeParam = mock.MagicMock( @@ -107,122 +107,122 @@ class TestServer: *unpacked) assert not status.OK() - def test_count_table(self, started_app): - table_name = inspect.currentframe().f_code.co_name + def test_count_collection(self, started_app): + collection_name = inspect.currentframe().f_code.co_name count = random.randint(100, 200) - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(OK, table_name)) - ServiceHandler._count_table = mock.MagicMock(return_value=(OK, count)) - status, ret = self.client.get_table_row_count(table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(OK, collection_name)) + ServiceHandler._count_collection = mock.MagicMock(return_value=(OK, count)) + status, ret = self.client.count_collection(collection_name) assert status.OK() assert ret == count - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(BAD, table_name)) - status, _ = self.client.get_table_row_count(table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(BAD, collection_name)) + status, _ = self.client.count_collection(collection_name) assert not status.OK() - def test_show_tables(self, started_app): - tables = ['t1', 't2'] - ServiceHandler._show_tables = mock.MagicMock(return_value=(OK, tables)) - status, ret = self.client.show_tables() + def test_show_collections(self, started_app): + collections = ['t1', 't2'] + ServiceHandler._show_collections = mock.MagicMock(return_value=(OK, collections)) + status, ret = self.client.show_collections() assert status.OK() - assert ret == tables + assert ret == collections - def test_describe_table(self, started_app): - table_name = inspect.currentframe().f_code.co_name + def test_describe_collection(self, started_app): + collection_name = inspect.currentframe().f_code.co_name dimension = 128 nlist = 1 - table_schema = TableSchema(table_name=table_name, + collection_schema = CollectionSchema(collection_name=collection_name, index_file_size=100, metric_type=MetricType.L2, dimension=dimension) - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(OK, table_schema.table_name)) - ServiceHandler._describe_table = mock.MagicMock( - return_value=(OK, table_schema)) - status, _ = self.client.describe_table(table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(OK, collection_schema.collection_name)) + ServiceHandler._describe_collection = mock.MagicMock( + return_value=(OK, collection_schema)) + status, _ = self.client.describe_collection(collection_name) assert status.OK() - ServiceHandler._describe_table = mock.MagicMock( - return_value=(BAD, table_schema)) - status, _ = self.client.describe_table(table_name) + ServiceHandler._describe_collection = mock.MagicMock( + return_value=(BAD, collection_schema)) + status, _ = self.client.describe_collection(collection_name) assert not status.OK() - Parser.parse_proto_TableName = mock.MagicMock(return_value=(BAD, + Parser.parse_proto_CollectionName = mock.MagicMock(return_value=(BAD, 'cmd')) - status, ret = self.client.describe_table(table_name) + status, ret = self.client.describe_collection(collection_name) assert not status.OK() def test_insert(self, started_app): - table_name = inspect.currentframe().f_code.co_name + collection_name = inspect.currentframe().f_code.co_name vectors = [[random.random() for _ in range(16)] for _ in range(10)] ids = [random.randint(1000000, 20000000) for _ in range(10)] ServiceHandler._add_vectors = mock.MagicMock(return_value=(OK, ids)) status, ret = self.client.add_vectors( - table_name=table_name, records=vectors) + collection_name=collection_name, records=vectors) assert status.OK() assert ids == ret def test_create_index(self, started_app): - table_name = inspect.currentframe().f_code.co_name - unpacks = table_name, None + collection_name = inspect.currentframe().f_code.co_name + unpacks = collection_name, None Parser.parse_proto_IndexParam = mock.MagicMock(return_value=(OK, unpacks)) ServiceHandler._create_index = mock.MagicMock(return_value=OK) - status = self.client.create_index(table_name=table_name) + status = self.client.create_index(collection_name=collection_name) assert status.OK() Parser.parse_proto_IndexParam = mock.MagicMock(return_value=(BAD, None)) - status = self.client.create_index(table_name=table_name) + status = self.client.create_index(collection_name=collection_name) assert not status.OK() - def test_drop_table(self, started_app): - table_name = inspect.currentframe().f_code.co_name + def test_drop_collection(self, started_app): + collection_name = inspect.currentframe().f_code.co_name - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(OK, table_name)) - ServiceHandler._delete_table = mock.MagicMock(return_value=OK) - status = self.client.delete_table(table_name=table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(OK, collection_name)) + ServiceHandler._drop_collection = mock.MagicMock(return_value=OK) + status = self.client.drop_collection(collection_name=collection_name) assert status.OK() - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(BAD, table_name)) - status = self.client.delete_table(table_name=table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(BAD, collection_name)) + status = self.client.drop_collection(collection_name=collection_name) assert not status.OK() - def test_has_table(self, started_app): - table_name = inspect.currentframe().f_code.co_name + def test_has_collection(self, started_app): + collection_name = inspect.currentframe().f_code.co_name - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(OK, table_name)) - ServiceHandler._has_table = mock.MagicMock(return_value=(OK, True)) - has = self.client.has_table(table_name=table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(OK, collection_name)) + ServiceHandler._has_collection = mock.MagicMock(return_value=(OK, True)) + has = self.client.has_collection(collection_name=collection_name) assert has - Parser.parse_proto_TableName = mock.MagicMock( - return_value=(BAD, table_name)) - status, has = self.client.has_table(table_name=table_name) + Parser.parse_proto_CollectionName = mock.MagicMock( + return_value=(BAD, collection_name)) + status, has = self.client.has_collection(collection_name=collection_name) assert not status.OK() assert not has - def test_create_table(self, started_app): - table_name = inspect.currentframe().f_code.co_name + def test_create_collection(self, started_app): + collection_name = inspect.currentframe().f_code.co_name dimension = 128 - table_schema = dict(table_name=table_name, + collection_schema = dict(collection_name=collection_name, index_file_size=100, metric_type=MetricType.L2, dimension=dimension) - ServiceHandler._create_table = mock.MagicMock(return_value=OK) - status = self.client.create_table(table_schema) + ServiceHandler._create_collection = mock.MagicMock(return_value=OK) + status = self.client.create_collection(collection_schema) assert status.OK() - Parser.parse_proto_TableSchema = mock.MagicMock(return_value=(BAD, + Parser.parse_proto_CollectionSchema = mock.MagicMock(return_value=(BAD, None)) - status = self.client.create_table(table_schema) + status = self.client.create_collection(collection_schema) assert not status.OK() def random_data(self, n, dimension): @@ -230,18 +230,18 @@ class TestServer: @pytest.mark.skip def test_search(self, started_app): - table_name = inspect.currentframe().f_code.co_name + collection_name = inspect.currentframe().f_code.co_name to_index_cnt = random.randint(10, 20) - table = TablesFactory(table_id=table_name, state=Tables.NORMAL) + collection = TablesFactory(collection_id=collection_name, state=Tables.NORMAL) to_index_files = TableFilesFactory.create_batch( - to_index_cnt, table=table, file_type=TableFiles.FILE_TYPE_TO_INDEX) + to_index_cnt, collection=collection, file_type=TableFiles.FILE_TYPE_TO_INDEX) topk = random.randint(5, 10) nq = random.randint(5, 10) param = { - 'table_name': table_name, - 'query_records': self.random_data(nq, table.dimension), + 'collection_name': collection_name, + 'query_records': self.random_data(nq, collection.dimension), 'top_k': topk, - 'nprobe': 2049 + 'params': {'nprobe': 2049} } result = [ @@ -255,23 +255,23 @@ class TestServer: error_code=status_pb2.SUCCESS, reason="Success"), topk_query_result=result) - table_schema = TableSchema(table_name=table_name, - index_file_size=table.index_file_size, - metric_type=table.metric_type, - dimension=table.dimension) + collection_schema = CollectionSchema(collection_name=collection_name, + index_file_size=collection.index_file_size, + metric_type=collection.metric_type, + dimension=collection.dimension) status, _ = self.client.search_vectors(**param) assert status.code == Status.ILLEGAL_ARGUMENT - param['nprobe'] = 2048 + param['params']['nprobe'] = 2048 RouterMixin.connection = mock.MagicMock(return_value=Milvus()) RouterMixin.query_conn.conn = mock.MagicMock(return_value=Milvus()) - Milvus.describe_table = mock.MagicMock(return_value=(BAD, - table_schema)) + Milvus.describe_collection = mock.MagicMock(return_value=(BAD, + collection_schema)) status, ret = self.client.search_vectors(**param) - assert status.code == Status.TABLE_NOT_EXISTS + assert status.code == Status.COLLECTION_NOT_EXISTS - Milvus.describe_table = mock.MagicMock(return_value=(OK, table_schema)) + Milvus.describe_collection = mock.MagicMock(return_value=(OK, collection_schema)) Milvus.search_vectors_in_files = mock.MagicMock( return_value=mock_results) diff --git a/shards/requirements.txt b/shards/requirements.txt index 399c6fd8453d14cef24a159a6a3536691fcb2f77..65960e127a5c5a58e575ce901f5e71222b73747d 100644 --- a/shards/requirements.txt +++ b/shards/requirements.txt @@ -14,7 +14,7 @@ py==1.8.0 pyasn1==0.4.7 pyasn1-modules==0.2.6 pylint==2.3.1 -pymilvus==0.2.9 +pymilvus==0.2.10 #pymilvus-test==0.3.3 pyparsing==2.4.0 pytest==4.6.3