未验证 提交 d91facc8 编写于 作者: C Cai Yudong 提交者: GitHub

fix collection ci (#3061)

* fix set config issue
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add auto_flush_interval observer in DBImpl
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix some issues in SegmentReader
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update CreateCollection interface
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix clang-format
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
上级 412eac3e
......@@ -69,107 +69,132 @@ ConfigMgr ConfigMgr::instance;
/**
 * Build the registry of configuration items.
 *
 * Each entry maps the item's dotted name to the config object created for it, so that
 * Set()/Get() can look an item up by name. The name is passed twice on purpose: once as
 * the map key and once to the Create*Config factory.
 *
 * NOTE(review): the boolean second argument of the Create*Config factories looks like the
 * "modifiable at runtime" flag that Set() checks via `modifiable_` -- confirm against the
 * factory declarations.
 */
ConfigMgr::ConfigMgr() {
    config_list_ = {
        /* version */
        {"version", CreateStringConfig("version", false, &config.version.value, "unknown", nullptr, nullptr)},

        /* cluster */
        {"cluster.enable",
         CreateBoolConfig("cluster.enable", false, &config.cluster.enable.value, false, nullptr, nullptr)},
        {"cluster.role", CreateEnumConfig("cluster.role", false, &ClusterRoleMap, &config.cluster.role.value,
                                          ClusterRole::RW, nullptr, nullptr)},

        /* general */
        {"general.timezone",
         CreateStringConfig("general.timezone", false, &config.general.timezone.value, "UTC+8", nullptr, nullptr)},
        {"general.meta_uri", CreateStringConfig("general.meta_uri", false, &config.general.meta_uri.value,
                                                "sqlite://:@:/", nullptr, nullptr)},

        /* network */
        {"network.bind.address", CreateStringConfig("network.bind.address", false, &config.network.bind.address.value,
                                                    "0.0.0.0", nullptr, nullptr)},
        {"network.bind.port", CreateIntegerConfig("network.bind.port", false, 0, 65535, &config.network.bind.port.value,
                                                  19530, nullptr, nullptr)},
        {"network.http.enable",
         CreateBoolConfig("network.http.enable", false, &config.network.http.enable.value, true, nullptr, nullptr)},
        {"network.http.port", CreateIntegerConfig("network.http.port", false, 0, 65535, &config.network.http.port.value,
                                                  19121, nullptr, nullptr)},

        /* storage */
        {"storage.path",
         CreateStringConfig("storage.path", false, &config.storage.path.value, "/var/lib/milvus", nullptr, nullptr)},
        {"storage.auto_flush_interval",
         CreateIntegerConfig("storage.auto_flush_interval", true, 0, std::numeric_limits<int64_t>::max(),
                             &config.storage.auto_flush_interval.value, 1, nullptr, nullptr)},
        {"storage.file_cleanup_timeout",
         CreateIntegerConfig("storage.file_cleanup_timeout", false, 0, 3600, &config.storage.file_cleanup_timeout.value,
                             10, nullptr, nullptr)},

        /* wal */
        {"wal.enable", CreateBoolConfig("wal.enable", false, &config.wal.enable.value, true, nullptr, nullptr)},
        {"wal.recovery_error_ignore",
         CreateBoolConfig("wal.recovery_error_ignore", false, &config.wal.recovery_error_ignore.value, false, nullptr,
                          nullptr)},
        {"wal.buffer_size", CreateSizeConfig("wal.buffer_size", false, 64 * MB, 4096 * MB,
                                             &config.wal.buffer_size.value, 256 * MB, nullptr, nullptr)},
        {"wal.path",
         CreateStringConfig("wal.path", false, &config.wal.path.value, "/var/lib/milvus/wal", nullptr, nullptr)},

        /* cache */
        {"cache.cache_size", CreateSizeConfig("cache.cache_size", true, 0, std::numeric_limits<int64_t>::max(),
                                              &config.cache.cache_size.value, 4 * GB, nullptr, nullptr)},
        {"cache.cpu_cache_threshold",
         CreateFloatingConfig("cache.cpu_cache_threshold", false, 0.0, 1.0, &config.cache.cpu_cache_threshold.value,
                              0.7, nullptr, nullptr)},
        {"cache.insert_buffer_size",
         CreateSizeConfig("cache.insert_buffer_size", false, 0, std::numeric_limits<int64_t>::max(),
                          &config.cache.insert_buffer_size.value, 1 * GB, nullptr, nullptr)},
        {"cache.cache_insert_data", CreateBoolConfig("cache.cache_insert_data", false,
                                                     &config.cache.cache_insert_data.value, false, nullptr, nullptr)},
        {"cache.preload_collection", CreateStringConfig("cache.preload_collection", false,
                                                        &config.cache.preload_collection.value, "", nullptr, nullptr)},

        /* gpu */
        {"gpu.enable", CreateBoolConfig("gpu.enable", false, &config.gpu.enable.value, false, nullptr, nullptr)},
        {"gpu.cache_size", CreateSizeConfig("gpu.cache_size", true, 0, std::numeric_limits<int64_t>::max(),
                                            &config.gpu.cache_size.value, 1 * GB, nullptr, nullptr)},
        {"gpu.cache_threshold", CreateFloatingConfig("gpu.cache_threshold", false, 0.0, 1.0,
                                                     &config.gpu.cache_threshold.value, 0.7, nullptr, nullptr)},
        {"gpu.gpu_search_threshold",
         CreateIntegerConfig("gpu.gpu_search_threshold", true, 0, std::numeric_limits<int64_t>::max(),
                             &config.gpu.gpu_search_threshold.value, 1000, nullptr, nullptr)},
        {"gpu.search_devices",
         CreateStringConfig("gpu.search_devices", false, &config.gpu.search_devices.value, "gpu0", nullptr, nullptr)},
        {"gpu.build_index_devices",
         CreateStringConfig("gpu.build_index_devices", false, &config.gpu.build_index_devices.value, "gpu0", nullptr,
                            nullptr)},

        /* log */
        {"logs.level", CreateStringConfig("logs.level", false, &config.logs.level.value, "debug", nullptr, nullptr)},
        {"logs.trace.enable",
         CreateBoolConfig("logs.trace.enable", false, &config.logs.trace.enable.value, true, nullptr, nullptr)},
        {"logs.path",
         CreateStringConfig("logs.path", false, &config.logs.path.value, "/var/lib/milvus/logs", nullptr, nullptr)},
        {"logs.max_log_file_size", CreateSizeConfig("logs.max_log_file_size", false, 512 * MB, 4096 * MB,
                                                    &config.logs.max_log_file_size.value, 1024 * MB, nullptr, nullptr)},
        {"logs.log_rotate_num", CreateIntegerConfig("logs.log_rotate_num", false, 0, 1024,
                                                    &config.logs.log_rotate_num.value, 0, nullptr, nullptr)},

        /* metric */
        {"metric.enable",
         CreateBoolConfig("metric.enable", false, &config.metric.enable.value, false, nullptr, nullptr)},
        {"metric.address",
         CreateStringConfig("metric.address", false, &config.metric.address.value, "127.0.0.1", nullptr, nullptr)},
        {"metric.port",
         CreateIntegerConfig("metric.port", false, 1024, 65535, &config.metric.port.value, 9091, nullptr, nullptr)},

        /* tracing */
        {"tracing.json_config_path", CreateStringConfig("tracing.json_config_path", false,
                                                        &config.tracing.json_config_path.value, "", nullptr, nullptr)},

        /* invisible */
        /* engine */
        {"engine.search_combine_nq",
         CreateIntegerConfig("engine.search_combine_nq", true, 0, std::numeric_limits<int64_t>::max(),
                             &config.engine.search_combine_nq.value, 64, nullptr, nullptr)},
        {"engine.use_blas_threshold",
         CreateIntegerConfig("engine.use_blas_threshold", true, 0, std::numeric_limits<int64_t>::max(),
                             &config.engine.use_blas_threshold.value, 1100, nullptr, nullptr)},
        {"engine.omp_thread_num",
         CreateIntegerConfig("engine.omp_thread_num", true, 0, std::numeric_limits<int64_t>::max(),
                             &config.engine.omp_thread_num.value, 0, nullptr, nullptr)},
        {"engine.simd_type", CreateEnumConfig("engine.simd_type", false, &SimdMap, &config.engine.simd_type.value,
                                              SimdType::AUTO, nullptr, nullptr)},

        /* db */
        {"db.archive_disk_threshold",
         CreateFloatingConfig("db.archive_disk_threshold", false, 0.0, 1.0, &config.db.archive_disk_threshold.value,
                              0.0, nullptr, nullptr)},
        {"db.archive_days_threshold",
         CreateIntegerConfig("db.archive_days_threshold", false, 0, std::numeric_limits<int64_t>::max(),
                             &config.db.archive_days_threshold.value, 0, nullptr, nullptr)},
    };
}
/**
 * Call Init() on every registered configuration item while holding the config mutex.
 *
 * config_list_ is keyed by name, so each element is a (name, config) pair and the config
 * object is reached through `.second`.
 */
void
ConfigMgr::Init() {
    std::lock_guard<std::mutex> lock(GetConfigMutex());
    for (auto& kv : config_list_) {
        kv.second->Init();
    }
}
void
......@@ -187,33 +212,41 @@ ConfigMgr::Load(const std::string& path) {
/**
 * Set the configuration item called `name` to `value`.
 *
 * @param name   key of the item in config_list_.
 * @param value  new value in string form; forwarded to the item's Set().
 * @param update false when the value is loaded from the config file, true when it is set manually.
 *
 * Behaviour:
 *  - unknown `name`: throws the std::string "Config <name> not found.";
 *  - update == false: the value is applied, no observer is notified;
 *  - update == true and the item is modifiable: the value is applied and Notify(name) is called;
 *  - update == true and the item is not modifiable: nothing happens.
 *
 * Errors raised by ThrowIfNotSuccess() or Notify() propagate unchanged. The lookup uses
 * find() rather than at() inside a catch-all handler, so such errors are no longer
 * re-reported as "not found".
 */
void
ConfigMgr::Set(const std::string& name, const std::string& value, bool update) {
    auto iter = config_list_.find(name);
    if (iter == config_list_.end()) {
        throw "Config " + name + " not found.";
    }
    auto& item = iter->second;

    std::unique_lock<std::mutex> lock(GetConfigMutex());
    /* update=false when loading from config file */
    if (not update) {
        ThrowIfNotSuccess(item->Set(value, update));
    } else if (item->modifiable_) {
        /* set manually */
        ThrowIfNotSuccess(item->Set(value, update));
        // Release the config mutex before notifying observers.
        // NOTE(review): presumably so an observer can call back into ConfigMgr without deadlocking -- confirm.
        lock.unlock();
        Notify(name);
    }
}
/**
 * Return the current value of the configuration item called `name`, as produced by the item's Get().
 *
 * @param name key of the item in config_list_.
 * @return the item's value in string form.
 *
 * Throws the std::string "Config <name> not found." when `name` is not registered.
 * The lookup uses find() instead of at() inside a catch-all handler, so an error raised
 * by the item's own Get() is not re-reported as "not found".
 */
std::string
ConfigMgr::Get(const std::string& name) const {
    auto iter = config_list_.find(name);
    if (iter == config_list_.end()) {
        throw "Config " + name + " not found.";
    }

    std::lock_guard<std::mutex> lock(GetConfigMutex());
    return iter->second->Get();
}
/**
 * Render every registered configuration item as one "<name>: <value>" line.
 *
 * config_list_ is an unordered_map, so the order of the lines is unspecified.
 *
 * @return the concatenated lines.
 */
std::string
ConfigMgr::Dump() const {
    std::stringstream ss;
    for (auto& kv : config_list_) {
        auto& item = kv.second;
        ss << item->name_ << ": " << item->Get() << std::endl;
    }
    return ss.str();
}
......
......@@ -82,7 +82,7 @@ class ConfigMgr {
Notify(const std::string& name);
private:
std::vector<BaseConfigPtr> config_list_;
std::unordered_map<std::string, BaseConfigPtr> config_list_;
std::mutex mutex_;
std::unordered_map<std::string, std::list<ConfigObserver*>> observers_;
......
......@@ -73,10 +73,16 @@ DBImpl::DBImpl(const DBOptions& options)
// mxlog_config.mxlog_path = options_.mxlog_path_;
// wal_mgr_ = std::make_shared<wal::WalManager>(mxlog_config);
}
/* watch on storage.auto_flush_interval */
ConfigMgr::GetInstance().Attach("storage.auto_flush_interval", this);
Start();
}
// Destructor: detaches this object from the "storage.auto_flush_interval" configuration item
// (the constructor attaches it under the same name) and then calls Stop().
DBImpl::~DBImpl() {
// Detach first, before Stop() runs.
// NOTE(review): presumably so no ConfigUpdate() callback arrives during shutdown -- confirm.
ConfigMgr::GetInstance().Detach("storage.auto_flush_interval", this);
Stop();
}
......@@ -1284,5 +1290,12 @@ DBImpl::ResumeIfLast() {
}
}
// Configuration-change callback.
// When the changed item is "storage.auto_flush_interval", copy its current value from the
// global config into options_.auto_flush_interval_. Any other name is ignored.
void
DBImpl::ConfigUpdate(const std::string& name) {
    if (name != "storage.auto_flush_interval") {
        return;
    }

    options_.auto_flush_interval_ = config.storage.auto_flush_interval();
}
} // namespace engine
} // namespace milvus
......@@ -23,13 +23,14 @@
#include "db/DB.h"
#include "config/ConfigMgr.h"
#include "utils/ThreadPool.h"
#include "wal/WalManager.h"
namespace milvus {
namespace engine {
class DBImpl : public DB {
class DBImpl : public DB, public ConfigObserver {
public:
explicit DBImpl(const DBOptions& options);
......@@ -115,6 +116,9 @@ class DBImpl : public DB {
Status
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold = 0.0) override;
void
ConfigUpdate(const std::string& name) override;
private:
void
InternalFlush(const std::string& collection_name = "");
......
......@@ -22,7 +22,7 @@ const char* DEFAULT_DELETED_DOCS_NAME = "_del";
const char* DEFAULT_INDEX_NAME = "_idx";
const char* DEFAULT_INDEX_COMPRESS_NAME = "_compress";
const char* PARAM_COLLECTION_DIMENSION = "dim";
const char* PARAM_DIMENSION = "dim";
const char* PARAM_INDEX_METRIC_TYPE = "metric_type";
const char* PARAM_INDEX_EXTRA_PARAMS = "extra_params";
const char* PARAM_SEGMENT_SIZE = "segment_size";
......
......@@ -85,7 +85,7 @@ extern const char* DEFAULT_DELETED_DOCS_NAME;
extern const char* DEFAULT_INDEX_NAME;
extern const char* DEFAULT_INDEX_COMPRESS_NAME;
extern const char* PARAM_COLLECTION_DIMENSION;
extern const char* PARAM_DIMENSION;
extern const char* PARAM_INDEX_METRIC_TYPE;
extern const char* PARAM_INDEX_EXTRA_PARAMS;
extern const char* PARAM_SEGMENT_SIZE;
......
......@@ -72,9 +72,9 @@ SegmentReader::Initialize() {
int64_t field_width = 0;
int64_t dimension = params[knowhere::meta::DIM];
if (ftype == engine::FIELD_TYPE::VECTOR_BINARY) {
field_width += (dimension / 8);
field_width = (dimension / 8);
} else {
field_width += (dimension * sizeof(float));
field_width = (dimension * sizeof(float));
}
segment_ptr_->AddField(name, ftype, field_width);
} else {
......@@ -241,12 +241,8 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
std::make_shared<faiss::ConcurrentBitset>(segment_commit->GetRowCount());
segment::DeletedDocsPtr deleted_docs_ptr;
auto status = LoadDeletedDocs(deleted_docs_ptr);
if (!status.ok()) {
return status;
}
if (deleted_docs_ptr) {
STATUS_CHECK(LoadDeletedDocs(deleted_docs_ptr));
if (deleted_docs_ptr != nullptr) {
auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();
for (auto& offset : deleted_docs) {
concurrent_bitset_ptr->set(offset);
......@@ -255,10 +251,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
// load uids
std::vector<int64_t> uids;
status = LoadUids(uids);
if (!status.ok()) {
return status;
}
STATUS_CHECK(LoadUids(uids));
knowhere::BinarySet index_data;
knowhere::BinaryPtr raw_data, compress_data;
......@@ -278,14 +271,13 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
// if index not specified, or index file not created, return IDMAP
auto index_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (index_visitor == nullptr || index_visitor->GetFile() == nullptr) {
auto& snapshot = segment_visitor_->GetSnapshot();
auto& json = field->GetParams();
if (json.find(knowhere::meta::DIM) == json.end()) {
return Status(DB_ERROR, "Vector field dimension undefined");
}
int64_t dimension = json[knowhere::meta::DIM];
std::vector<uint8_t> raw;
LoadField(field_name, raw);
STATUS_CHECK(LoadField(field_name, raw));
auto dataset = knowhere::GenDataset(segment_commit->GetRowCount(), dimension, raw.data());
knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance();
......
......@@ -43,11 +43,8 @@ namespace server {
// Create a CreateCollectionReq for `collection_name`, run it through ReqScheduler::ExecReq()
// and return the status stored on the request.
//
// context          request context forwarded to the request object
// collection_name  name of the collection to create
// fields           per-field schema (type, field params, index params), keyed by field name
// json_param       collection-level extra parameters
Status
ReqHandler::CreateCollection(const std::shared_ptr<Context>& context, const std::string& collection_name,
                             std::unordered_map<std::string, FieldSchema>& fields, milvus::json& json_param) {
    BaseReqPtr req_ptr = CreateCollectionReq::Create(context, collection_name, fields, json_param);
    ReqScheduler::ExecReq(req_ptr);
    return req_ptr->status();
}
......
......@@ -32,9 +32,7 @@ class ReqHandler {
Status
CreateCollection(const std::shared_ptr<Context>& context, const std::string& collection_name,
std::unordered_map<std::string, engine::meta::DataType>& field_types,
std::unordered_map<std::string, milvus::json>& field_index_params,
std::unordered_map<std::string, std::string>& field_params, milvus::json& json_params);
std::unordered_map<std::string, FieldSchema>& fields, milvus::json& json_params);
Status
DropCollection(const std::shared_ptr<Context>& context, const std::string& collection_name);
......
......@@ -32,11 +32,15 @@
namespace milvus {
namespace server {
// Description of one field of a collection: its type plus two JSON parameter objects.
struct FieldSchema {
    // Value-initialized so that a FieldSchema which is default-constructed and stored without
    // its type being assigned holds a deterministic value rather than an indeterminate one.
    // NOTE(review): confirm that the zero value of engine::FieldType is an acceptable "unset" marker.
    engine::FieldType field_type_{};
    // Field-level parameters (the vector dimension is looked up here under engine::PARAM_DIMENSION).
    milvus::json field_params_;
    // Index parameters for the field; an optional "name" entry is used as the index name.
    milvus::json index_params_;
};
// Description of a collection: its name, the schema of each field keyed by field name,
// and collection-level extra parameters.
struct CollectionSchema {
    std::string collection_name_;
    std::unordered_map<std::string, FieldSchema> fields_;
    milvus::json extra_params_;
};
......
......@@ -30,25 +30,18 @@ namespace server {
// Construct a create-collection request.
// The collection name, field schemas and extra parameters are stored in the request's own
// members (collection_name_, fields_, extra_params_) for use by OnExecute().
CreateCollectionReq::CreateCollectionReq(const std::shared_ptr<milvus::server::Context>& context,
                                         const std::string& collection_name,
                                         std::unordered_map<std::string, FieldSchema>& fields,
                                         milvus::json& extra_params)
    : BaseReq(context, BaseReq::kCreateCollection),
      collection_name_(collection_name),
      fields_(fields),
      extra_params_(extra_params) {
}
// Factory: build a CreateCollectionReq from the given arguments and return it as a shared BaseReq pointer.
BaseReqPtr
CreateCollectionReq::Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
                            std::unordered_map<std::string, FieldSchema>& fields, milvus::json& extra_params) {
    return std::shared_ptr<BaseReq>(new CreateCollectionReq(context, collection_name, fields, extra_params));
}
Status
......@@ -69,37 +62,42 @@ CreateCollectionReq::OnExecute() {
// step 2: create snapshot collection context
engine::snapshot::CreateCollectionContext create_collection_context;
auto ss_collection_schema = std::make_shared<engine::snapshot::Collection>(collection_name_, extra_params_);
create_collection_context.collection = ss_collection_schema;
for (auto& field_type : field_types_) {
std::string field_name = field_type.first;
auto index_params = field_index_params_.at(field_name);
auto collection_schema = std::make_shared<engine::snapshot::Collection>(collection_name_, extra_params_);
create_collection_context.collection = collection_schema;
for (auto& field_kv : fields_) {
auto& field_name = field_kv.first;
auto& field_schema = field_kv.second;
auto& field_type = field_schema.field_type_;
auto& field_params = field_schema.field_params_;
auto& index_params = field_schema.index_params_;
std::cout << index_params.dump() << std::endl;
std::string index_name;
if (index_params.contains("name")) {
index_name = index_params["name"];
}
int64_t dimension = 0;
json field_params;
if (!field_params_.at(field_name).empty()) {
field_params = json::parse(field_params_.at(field_name));
if (field_type.second == engine::meta::DataType::VECTOR_FLOAT ||
field_type.second == engine::meta::DataType::VECTOR_BINARY) {
if (!field_params.contains(engine::PARAM_COLLECTION_DIMENSION)) {
return Status{milvus::SERVER_INVALID_VECTOR_DIMENSION,
"Dimension should be defined in vector field extra_params"};
}
std::cout << field_params.dump() << std::endl;
if (field_type == engine::FieldType::VECTOR_FLOAT || field_type == engine::FieldType::VECTOR_BINARY) {
if (!field_params.contains(engine::PARAM_DIMENSION)) {
return Status(SERVER_INVALID_VECTOR_DIMENSION, "Dimension not defined in field_params");
}
}
auto field = std::make_shared<engine::snapshot::Field>(field_name, 0, (engine::FieldType)field_type.second,
field_params);
auto field = std::make_shared<engine::snapshot::Field>(field_name, 0, field_type, field_params);
auto field_element = std::make_shared<engine::snapshot::FieldElement>(
0, 0, index_name, engine::FieldElementType::FET_INDEX, index_params);
create_collection_context.fields_schema[field] = {field_element};
}
if (!extra_params_.contains(engine::PARAM_SEGMENT_SIZE)) {
return Status(SERVER_UNEXPECTED_ERROR, "Segment size not defined");
} else {
auto segment_size = extra_params_[engine::PARAM_SEGMENT_SIZE].get<int64_t>();
STATUS_CHECK(ValidateCollectionIndexFileSize(segment_size));
}
// step 3: create collection
status = DBWrapper::DB()->CreateCollection(create_collection_context);
fiu_do_on("CreateCollectionReq.OnExecute.invalid_db_execute",
......
......@@ -26,24 +26,18 @@ class CreateCollectionReq : public BaseReq {
public:
static BaseReqPtr
Create(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
std::unordered_map<std::string, engine::meta::DataType>& field_types,
std::unordered_map<std::string, milvus::json>& field_index_params,
std::unordered_map<std::string, std::string>& field_params, milvus::json& extra_params);
std::unordered_map<std::string, FieldSchema>& fields, milvus::json& extra_params);
protected:
CreateCollectionReq(const std::shared_ptr<milvus::server::Context>& context, const std::string& collection_name,
std::unordered_map<std::string, engine::meta::DataType>& field_types,
std::unordered_map<std::string, milvus::json>& field_index_params,
std::unordered_map<std::string, std::string>& field_params, milvus::json& extra_params);
std::unordered_map<std::string, FieldSchema>& fields, milvus::json& extra_params);
Status
OnExecute() override;
private:
const std::string collection_name_;
std::unordered_map<std::string, engine::meta::DataType> field_types_;
std::unordered_map<std::string, milvus::json> field_index_params_;
std::unordered_map<std::string, std::string> field_params_;
std::unordered_map<std::string, FieldSchema> fields_;
milvus::json extra_params_;
};
......
......@@ -57,21 +57,22 @@ GetCollectionInfoReq::OnExecute() {
continue;
}
milvus::json json_index_param;
milvus::json field_index_param;
auto field_elements = field_kv.second;
for (const auto& element : field_elements) {
if (element->GetFtype() == (engine::snapshot::FTYPE_TYPE)engine::FieldElementType::FET_INDEX) {
json_index_param = element->GetParams().dump();
field_index_param = element->GetParams();
break;
}
}
auto field_name = field->GetName();
collection_schema_.field_types_.insert(
std::make_pair(field_name, (engine::meta::DataType)field->GetFtype()));
collection_schema_.index_params_.insert(std::make_pair(field_name, json_index_param));
milvus::json json_extra_param = field->GetParams();
collection_schema_.field_params_.insert(std::make_pair(field_name, json_extra_param));
FieldSchema field_schema;
field_schema.field_type_ = (engine::FieldType)field->GetFtype();
field_schema.field_params_ = field->GetParams();
field_schema.index_params_ = field_index_param;
collection_schema_.fields_.insert(std::make_pair(field_name, field_schema));
}
rc.ElapseFromBegin("done");
......
......@@ -626,34 +626,31 @@ GrpcRequestHandler::CreateCollection(::grpc::ServerContext* context, const ::mil
CHECK_NULLPTR_RETURN(request);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->ReqID().c_str(), __func__);
std::unordered_map<std::string, engine::meta::DataType> field_types;
std::unordered_map<std::string, milvus::json> field_index_params;
std::unordered_map<std::string, std::string> field_params;
std::unordered_map<std::string, FieldSchema> fields;
if (request->fields_size() > MAXIMUM_FIELD_NUM) {
Status status = Status{SERVER_INVALID_FIELD_NUM, "Maximum field's number should be limited to 64"};
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
SET_RESPONSE(response, status, context);
return ::grpc::Status::OK;
}
for (int i = 0; i < request->fields_size(); ++i) {
const auto& field = request->fields(i);
auto field_name = field.name();
field_types.insert(std::make_pair(field_name, (engine::meta::DataType)field.type()));
milvus::json index_param;
for (int j = 0; j < field.index_params_size(); j++) {
index_param[field.index_params(j).key()] = field.index_params(j).value();
}
field_index_params.insert(std::make_pair(field_name, index_param));
FieldSchema field_schema;
field_schema.field_type_ = (engine::FieldType)field.type();
// Currently only one extra_param
if (request->fields(i).extra_params_size() != 0) {
auto extra_params = std::make_pair(request->fields(i).name(), request->fields(i).extra_params(0).value());
field_params.insert(extra_params);
} else {
auto extra_params = std::make_pair(request->fields(i).name(), "");
field_params.insert(extra_params);
if (field.extra_params_size() != 0) {
field_schema.field_params_ = json::parse(field.extra_params(0).value());
}
for (int j = 0; j < field.index_params_size(); j++) {
field_schema.index_params_[field.index_params(j).key()] = field.index_params(j).value();
}
fields[field.name()] = field_schema;
}
milvus::json json_params;
......@@ -664,8 +661,7 @@ GrpcRequestHandler::CreateCollection(::grpc::ServerContext* context, const ::mil
}
}
Status status = req_handler_.CreateCollection(GetContext(context), request->collection_name(), field_types,
field_index_params, field_params, json_params);
Status status = req_handler_.CreateCollection(GetContext(context), request->collection_name(), fields, json_params);
LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->ReqID().c_str(), __func__);
SET_RESPONSE(response, status, context)
......@@ -981,20 +977,25 @@ GrpcRequestHandler::DescribeCollection(::grpc::ServerContext* context, const ::m
}
response->set_collection_name(request->collection_name());
auto field_it = collection_schema.field_types_.begin();
for (; field_it != collection_schema.field_types_.end(); field_it++) {
for (auto& field_kv : collection_schema.fields_) {
auto field = response->add_fields();
field->set_name(field_it->first);
field->set_type((milvus::grpc::DataType)field_it->second);
for (auto& json_param : collection_schema.index_params_.at(field_it->first).items()) {
auto grpc_index_param = field->add_index_params();
grpc_index_param->set_key(json_param.key());
grpc_index_param->set_value(json_param.value());
}
auto& field_name = field_kv.first;
auto& field_schema = field_kv.second;
field->set_name(field_name);
field->set_type((milvus::grpc::DataType)field_schema.field_type_);
auto grpc_field_param = field->add_extra_params();
grpc_field_param->set_key(EXTRA_PARAM_KEY);
grpc_field_param->set_value(collection_schema.field_params_.at(field_it->first).dump());
grpc_field_param->set_value(field_schema.field_params_.dump());
for (auto& item : field_schema.index_params_.items()) {
auto grpc_index_param = field->add_index_params();
grpc_index_param->set_key(item.key());
grpc_index_param->set_value(item.value());
}
}
auto grpc_extra_param = response->add_extra_params();
grpc_extra_param->set_key(EXTRA_PARAM_KEY);
grpc_extra_param->set_value(collection_schema.extra_params_.dump());
......@@ -1699,8 +1700,6 @@ GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc:
CollectionSchema collection_schema;
status = req_handler_.GetCollectionInfo(GetContext(context), request->collection_name(), collection_schema);
field_type_ = collection_schema.field_types_;
auto grpc_entity = response->mutable_entities();
if (!status.ok()) {
SET_RESPONSE(response->mutable_status(), status, context);
......
......@@ -333,11 +333,8 @@ class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service,
private:
ReqHandler req_handler_;
// std::unordered_map<::grpc::ServerContext*, std::shared_ptr<Context>> context_map_;
std::unordered_map<std::string, std::shared_ptr<Context>> context_map_;
std::shared_ptr<opentracing::Tracer> tracer_;
std::unordered_map<std::string, engine::meta::DataType> field_type_;
// std::unordered_map<::grpc::ServerContext*, std::unique_ptr<opentracing::Span>> span_map_;
mutable std::mt19937_64 random_num_generator_;
mutable std::mutex random_mutex_;
......
......@@ -158,7 +158,7 @@ WebRequestHandler::GetCollectionMetaInfo(const std::string& collection_name, nlo
STATUS_CHECK(req_handler_.CountEntities(context_ptr_, collection_name, count));
json_out["collection_name"] = schema.collection_name_;
json_out["dimension"] = schema.extra_params_[engine::PARAM_COLLECTION_DIMENSION].get<int64_t>();
json_out["dimension"] = schema.extra_params_[engine::PARAM_DIMENSION].get<int64_t>();
json_out["index_file_size"] = schema.extra_params_[engine::PARAM_SEGMENT_SIZE].get<int64_t>();
json_out["metric_type"] = schema.extra_params_[engine::PARAM_INDEX_METRIC_TYPE].get<int64_t>();
json_out["index_params"] = schema.extra_params_[engine::PARAM_INDEX_EXTRA_PARAMS].get<std::string>();
......@@ -699,7 +699,6 @@ WebRequestHandler::Search(const std::string& collection_name, const nlohmann::js
if (!status.ok()) {
return Status{UNEXPECTED_ERROR, "DescribeHybridCollection failed"};
}
field_type_ = collection_schema.field_types_;
milvus::json extra_params;
if (json.contains("fields")) {
......@@ -1202,38 +1201,38 @@ WebRequestHandler::CreateHybridCollection(const milvus::server::web::OString& bo
std::string collection_name = json_str["collection_name"];
// TODO(yukun): do checking
std::unordered_map<std::string, engine::meta::DataType> field_types;
std::unordered_map<std::string, milvus::json> field_index_params;
std::unordered_map<std::string, std::string> field_extra_params;
std::unordered_map<std::string, FieldSchema> fields;
for (auto& field : json_str["fields"]) {
FieldSchema field_schema;
std::string field_name = field["field_name"];
std::string field_type = field["field_type"];
auto extra_params = field["extra_params"];
field_schema.field_params_ = field["extra_params"];
const std::string& field_type = field["field_type"];
if (field_type == "int8") {
field_types.insert(std::make_pair(field_name, engine::meta::DataType::INT8));
field_schema.field_type_ = engine::FieldType::INT8;
} else if (field_type == "int16") {
field_types.insert(std::make_pair(field_name, engine::meta::DataType::INT16));
field_schema.field_type_ = engine::FieldType::INT16;
} else if (field_type == "int32") {
field_types.insert(std::make_pair(field_name, engine::meta::DataType::INT32));
field_schema.field_type_ = engine::FieldType::INT32;
} else if (field_type == "int64") {
field_types.insert(std::make_pair(field_name, engine::meta::DataType::INT64));
field_schema.field_type_ = engine::FieldType::INT64;
} else if (field_type == "float") {
field_types.insert(std::make_pair(field_name, engine::meta::DataType::FLOAT));
field_schema.field_type_ = engine::FieldType::FLOAT;
} else if (field_type == "double") {
field_types.insert(std::make_pair(field_name, engine::meta::DataType::DOUBLE));
field_schema.field_type_ = engine::FieldType::DOUBLE;
} else if (field_type == "vector") {
} else {
std::string msg = field_name + " has wrong field_type";
RETURN_STATUS_DTO(BODY_PARSE_FAIL, msg.c_str());
}
field_extra_params.insert(std::make_pair(field_name, extra_params.dump()));
fields[field_name] = field_schema;
}
milvus::json json_params;
auto status = req_handler_.CreateCollection(context_ptr_, collection_name, field_types, field_index_params,
field_extra_params, json_params);
auto status = req_handler_.CreateCollection(context_ptr_, collection_name, fields, json_params);
ASSIGN_RETURN_STATUS_DTO(status)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册