未验证 提交 6f5be4b5 编写于 作者: G groot 提交者: GitHub

remove wal logic (#3245)

* remove wal logic
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* validate insert data size
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* typo
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* preload collections
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* refine code
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* avoid test fail
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* db proxy
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 ae5f518f
......@@ -99,9 +99,6 @@ ConfigMgr::ConfigMgr() {
{"storage.auto_flush_interval",
CreateIntegerConfig("storage.auto_flush_interval", true, 0, std::numeric_limits<int64_t>::max(),
&config.storage.auto_flush_interval.value, 1, nullptr, nullptr)},
{"storage.file_cleanup_timeout",
CreateIntegerConfig("storage.file_cleanup_timeout", false, 0, 3600, &config.storage.file_cleanup_timeout.value,
10, nullptr, nullptr)},
/* wal */
{"wal.enable", CreateBoolConfig("wal.enable", false, &config.wal.enable.value, true, nullptr, nullptr)},
......@@ -178,14 +175,6 @@ ConfigMgr::ConfigMgr() {
&config.engine.omp_thread_num.value, 0, nullptr, nullptr)},
{"engine.simd_type", CreateEnumConfig("engine.simd_type", false, &SimdMap, &config.engine.simd_type.value,
SimdType::AUTO, nullptr, nullptr)},
/* db */
{"db.archive_disk_threshold",
CreateFloatingConfig("db.archive_disk_threshold", false, 0.0, 1.0, &config.db.archive_disk_threshold.value,
0.0, nullptr, nullptr)},
{"db.archive_days_threshold",
CreateIntegerConfig("db.archive_days_threshold", false, 0, std::numeric_limits<int64_t>::max(),
&config.db.archive_days_threshold.value, 0, nullptr, nullptr)},
};
}
......
......@@ -92,15 +92,9 @@ struct ServerConfig {
} http;
} network;
struct DB {
Floating archive_disk_threshold{0.0};
Integer archive_days_threshold{0};
} db;
struct Storage {
String path{"unknown"};
Integer auto_flush_interval{0};
Integer file_cleanup_timeout{0};
} storage;
struct Cache {
......
......@@ -16,9 +16,9 @@
#include <unordered_map>
#include <vector>
#include "db/Options.h"
#include "db/SimpleWaitNotify.h"
#include "db/SnapshotHandlers.h"
#include "db/Types.h"
#include "db/insert/MemManager.h"
#include "db/merge/MergeManager.h"
#include "db/snapshot/Context.h"
......@@ -82,7 +82,7 @@ class DB {
ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) = 0;
virtual Status
CreateIndex(const server::ContextPtr& context, const std::string& collection_id, const std::string& field_name,
CreateIndex(const server::ContextPtr& context, const std::string& collection_name, const std::string& field_name,
const CollectionIndex& index) = 0;
virtual Status
......@@ -103,7 +103,7 @@ class DB {
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) = 0;
virtual Status
ListIDInSegment(const std::string& collection_id, int64_t segment_id, IDNumbers& entity_ids) = 0;
ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) = 0;
virtual Status
Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) = 0;
......
......@@ -12,7 +12,6 @@
#pragma once
#include "DB.h"
#include "Options.h"
#include <memory>
#include <string>
......
......@@ -38,7 +38,6 @@
#include "utils/Exception.h"
#include "utils/StringHelpFunctions.h"
#include "utils/TimeRecorder.h"
#include "wal/WalDefinations.h"
#include <fiu-local.h>
#include <src/scheduler/job/BuildIndexJob.h>
......@@ -134,9 +133,7 @@ DBImpl::Stop() {
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
// flush all without merge
wal::MXLogRecord record;
record.type = wal::MXLogType::Flush;
ExecWalRecord(record);
InternalFlush("", false);
// wait flush thread finish
swn_flush_.Notify();
......@@ -467,15 +464,26 @@ DBImpl::Insert(const std::string& collection_name, const std::string& partition_
}
// insert entities: collection_name is field id
wal::MXLogRecord record;
record.lsn = 0;
record.collection_id = collection_name;
record.partition_tag = partition_name;
record.data_chunk = data_chunk;
record.length = data_chunk->count_;
record.type = wal::MXLogType::Entity;
snapshot::PartitionPtr part = ss->GetPartition(partition_name);
if (part == nullptr) {
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << partition_name;
return Status(DB_ERROR, "Invalid partiiton name");
}
int64_t collection_id = ss->GetCollectionId();
int64_t partition_id = part->GetID();
STATUS_CHECK(ExecWalRecord(record));
auto status = mem_mgr_->InsertEntities(collection_id, partition_id, data_chunk, 0);
if (!status.ok()) {
return status;
}
if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
InternalFlush();
}
// metrics
milvus::server::CollectInsertMetrics metrics(data_chunk->count_, status);
return Status::OK();
}
......@@ -504,16 +512,14 @@ Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
CHECK_INITIALIZED;
Status status;
wal::MXLogRecord record;
record.lsn = 0; // need to get from meta ?
record.type = wal::MXLogType::Delete;
record.collection_id = collection_name;
record.ids = entity_ids.data();
record.length = entity_ids.size();
status = ExecWalRecord(record);
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "delete", 0) << "Get snapshot fail: " << status.message();
return status;
}
status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), entity_ids, 0);
return status;
}
......@@ -771,11 +777,54 @@ DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::stri
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
void
DBImpl::InternalFlush(const std::string& collection_name) {
wal::MXLogRecord record;
record.type = wal::MXLogType::Flush;
record.collection_id = collection_name;
ExecWalRecord(record);
DBImpl::InternalFlush(const std::string& collection_name, bool merge) {
Status status;
std::set<std::string> flushed_collections;
if (!collection_name.empty()) {
// flush one collection
snapshot::ScopedSnapshotT ss;
status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
return;
}
{
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
int64_t collection_id = ss->GetCollectionId();
status = mem_mgr_->Flush(collection_id);
if (!status.ok()) {
return;
}
}
flushed_collections.insert(collection_name);
} else {
// flush all collections
std::set<int64_t> collection_ids;
{
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
status = mem_mgr_->Flush(collection_ids);
if (!status.ok()) {
return;
}
}
for (auto id : collection_ids) {
snapshot::ScopedSnapshotT ss;
status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, id);
if (!status.ok()) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
return;
}
flushed_collections.insert(ss->GetName());
}
}
if (merge) {
StartMergeTask(flushed_collections);
}
}
void
......@@ -932,188 +981,6 @@ DBImpl::WaitBuildIndexFinish() {
// LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
}
void
DBImpl::TimingWalThread() {
// SetThreadName("wal_thread");
// server::SystemInfo::GetInstance().Init();
//
// std::chrono::system_clock::time_point next_auto_flush_time;
// auto get_next_auto_flush_time = [&]() {
// return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
// };
// if (options_.auto_flush_interval_ > 0) {
// next_auto_flush_time = get_next_auto_flush_time();
// }
//
// InternalFlush();
// while (true) {
// if (options_.auto_flush_interval_ > 0) {
// if (std::chrono::system_clock::now() >= next_auto_flush_time) {
// InternalFlush();
// next_auto_flush_time = get_next_auto_flush_time();
// }
// }
//
// wal::MXLogRecord record;
// auto error_code = wal_mgr_->GetNextRecord(record);
// if (error_code != WAL_SUCCESS) {
// LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
// break;
// }
//
// if (record.type != wal::MXLogType::None) {
// ExecWalRecord(record);
// if (record.type == wal::MXLogType::Flush) {
// // notify flush request to return
// flush_req_swn_.Notify();
//
// // if user flush all manually, update auto flush also
// if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
// next_auto_flush_time = get_next_auto_flush_time();
// }
// }
//
// } else {
// if (!initialized_.load(std::memory_order_acquire)) {
// InternalFlush();
// flush_req_swn_.Notify();
// // SS TODO
// // WaitMergeFileFinish();
// // WaitBuildIndexFinish();
// LOG_ENGINE_DEBUG_ << "WAL background thread exit";
// break;
// }
//
// if (options_.auto_flush_interval_ > 0) {
// swn_wal_.Wait_Until(next_auto_flush_time);
// } else {
// swn_wal_.Wait();
// }
// }
// }
}
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
auto force_flush_if_mem_full = [&]() -> void {
if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
InternalFlush();
}
};
auto get_collection_partition_id = [&](const wal::MXLogRecord& record, int64_t& col_id,
int64_t& part_id) -> Status {
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get snapshot fail: " << status.message();
return status;
}
col_id = ss->GetCollectionId();
snapshot::PartitionPtr part = ss->GetPartition(record.partition_tag);
if (part == nullptr) {
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
return status;
}
part_id = part->GetID();
return Status::OK();
};
Status status;
switch (record.type) {
case wal::MXLogType::Entity: {
int64_t collection_name = 0, partition_id = 0;
status = get_collection_partition_id(record, collection_name, partition_id);
if (!status.ok()) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << status.message();
return status;
}
status = mem_mgr_->InsertEntities(collection_name, partition_id, record.data_chunk, record.lsn);
force_flush_if_mem_full();
// metrics
milvus::server::CollectInsertMetrics metrics(record.length, status);
break;
}
case wal::MXLogType::Delete: {
snapshot::ScopedSnapshotT ss;
status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
if (!status.ok()) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "delete", 0) << "Get snapshot fail: " << status.message();
return status;
}
std::vector<id_t> delete_ids;
delete_ids.resize(record.length);
memcpy(delete_ids.data(), record.ids, record.length * sizeof(id_t));
status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), delete_ids, record.lsn);
if (!status.ok()) {
return status;
}
break;
}
case wal::MXLogType::Flush: {
if (!record.collection_id.empty()) {
// flush one collection
snapshot::ScopedSnapshotT ss;
status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
if (!status.ok()) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
return status;
}
{
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
int64_t collection_id = ss->GetCollectionId();
status = mem_mgr_->Flush(collection_id);
if (!status.ok()) {
return status;
}
}
std::set<std::string> flushed_collections;
flushed_collections.insert(record.collection_id);
StartMergeTask(flushed_collections);
} else {
// flush all collections
std::set<int64_t> collection_ids;
{
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
status = mem_mgr_->Flush(collection_ids);
}
std::set<std::string> flushed_collections;
for (auto id : collection_ids) {
snapshot::ScopedSnapshotT ss;
status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, id);
if (!status.ok()) {
LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
return status;
}
flushed_collections.insert(ss->GetName());
}
StartMergeTask(flushed_collections);
}
break;
}
default:
break;
}
return status;
}
void
DBImpl::StartMergeTask(const std::set<std::string>& collection_names, bool force_merge_all) {
// LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
......
......@@ -25,7 +25,6 @@
#include "config/ConfigMgr.h"
#include "utils/ThreadPool.h"
#include "wal/WalManager.h"
namespace milvus {
namespace engine {
......@@ -37,10 +36,10 @@ class DBImpl : public DB, public ConfigObserver {
~DBImpl();
Status
Start();
Start() override;
Status
Stop();
Stop() override;
Status
CreateCollection(const snapshot::CreateCollectionContext& context) override;
......@@ -103,6 +102,7 @@ class DBImpl : public DB, public ConfigObserver {
Status
ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override;
// if the input field_names is empty, will load all fields of this collection
Status
LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force = false) override;
......@@ -121,7 +121,7 @@ class DBImpl : public DB, public ConfigObserver {
private:
void
InternalFlush(const std::string& collection_name = "");
InternalFlush(const std::string& collection_name = "", bool merge = true);
void
TimingFlushThread();
......@@ -144,12 +144,6 @@ class DBImpl : public DB, public ConfigObserver {
void
WaitBuildIndexFinish();
void
TimingWalThread();
Status
ExecWalRecord(const wal::MXLogRecord& record);
void
StartMergeTask(const std::set<std::string>& collection_names, bool force_merge_all = false);
......@@ -172,14 +166,10 @@ class DBImpl : public DB, public ConfigObserver {
MemManagerPtr mem_mgr_;
MergeManagerPtr merge_mgr_ptr_;
// std::shared_ptr<wal::WalManager> wal_mgr_;
std::thread bg_wal_thread_;
std::thread bg_flush_thread_;
std::thread bg_metric_thread_;
std::thread bg_index_thread_;
SimpleWaitNotify swn_wal_;
SimpleWaitNotify swn_flush_;
SimpleWaitNotify swn_metric_;
SimpleWaitNotify swn_index_;
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/DBProxy.h"
namespace milvus {
namespace engine {
#define DB_CHECK \
if (db_ == nullptr) { \
return Status::OK(); \
}
DBProxy::DBProxy(const DBPtr& db) : db_(db) {
}
Status
DBProxy::Stop() {
DB_CHECK
return db_->Stop();
}
Status
DBProxy::CreateCollection(const snapshot::CreateCollectionContext& context) {
DB_CHECK
return db_->CreateCollection(context);
}
Status
DBProxy::DropCollection(const std::string& name) {
DB_CHECK
return db_->DropCollection(name);
}
Status
DBProxy::HasCollection(const std::string& collection_name, bool& has_or_not) {
DB_CHECK
return db_->HasCollection(collection_name, has_or_not);
}
Status
DBProxy::ListCollections(std::vector<std::string>& names) {
DB_CHECK
return db_->ListCollections(names);
}
Status
DBProxy::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema) {
DB_CHECK
return db_->GetCollectionInfo(collection_name, collection, fields_schema);
}
Status
DBProxy::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
DB_CHECK
return db_->GetCollectionStats(collection_name, collection_stats);
}
Status
DBProxy::CountEntities(const std::string& collection_name, int64_t& row_count) {
DB_CHECK
return db_->CountEntities(collection_name, row_count);
}
Status
DBProxy::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
DB_CHECK
return db_->CreatePartition(collection_name, partition_name);
}
Status
DBProxy::DropPartition(const std::string& collection_name, const std::string& partition_name) {
DB_CHECK
return db_->DropPartition(collection_name, partition_name);
}
Status
DBProxy::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
DB_CHECK
return db_->HasPartition(collection_name, partition_tag, exist);
}
Status
DBProxy::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
DB_CHECK
return db_->ListPartitions(collection_name, partition_names);
}
Status
DBProxy::CreateIndex(const server::ContextPtr& context, const std::string& collection_name,
const std::string& field_name, const CollectionIndex& index) {
DB_CHECK
return db_->CreateIndex(context, collection_name, field_name, index);
}
Status
DBProxy::DropIndex(const std::string& collection_name, const std::string& field_name) {
DB_CHECK
return db_->DropIndex(collection_name, field_name);
}
Status
DBProxy::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
DB_CHECK
return db_->DescribeIndex(collection_name, field_name, index);
}
Status
DBProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
DB_CHECK
return db_->Insert(collection_name, partition_name, data_chunk);
}
Status
DBProxy::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) {
DB_CHECK
return db_->GetEntityByID(collection_name, id_array, field_names, valid_row, data_chunk);
}
Status
DBProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
DB_CHECK
return db_->DeleteEntityByID(collection_name, entity_ids);
}
Status
DBProxy::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
DB_CHECK
return db_->ListIDInSegment(collection_name, segment_id, entity_ids);
}
Status
DBProxy::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
DB_CHECK
return db_->Query(context, query_ptr, result);
}
Status
DBProxy::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force) {
DB_CHECK
return db_->LoadCollection(context, collection_name, field_names, force);
}
Status
DBProxy::Flush(const std::string& collection_name) {
DB_CHECK
return db_->Flush(collection_name);
}
Status
DBProxy::Flush() {
DB_CHECK
return db_->Flush();
}
Status
DBProxy::Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold) {
DB_CHECK
return db_->Compact(context, collection_name, threshold);
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "DB.h"
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace engine {
class DBProxy : public DB {
public:
explicit DBProxy(const DBPtr& db);
Status
Start() override;
Status
Stop() override;
Status
CreateCollection(const snapshot::CreateCollectionContext& context) override;
Status
DropCollection(const std::string& name) override;
Status
HasCollection(const std::string& collection_name, bool& has_or_not) override;
Status
ListCollections(std::vector<std::string>& names) override;
Status
GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
snapshot::FieldElementMappings& fields_schema) override;
Status
GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) override;
Status
CountEntities(const std::string& collection_name, int64_t& row_count) override;
Status
CreatePartition(const std::string& collection_name, const std::string& partition_name) override;
Status
DropPartition(const std::string& collection_name, const std::string& partition_name) override;
Status
HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) override;
Status
ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) override;
Status
CreateIndex(const server::ContextPtr& context, const std::string& collection_name, const std::string& field_name,
const CollectionIndex& index) override;
Status
DropIndex(const std::string& collection_name, const std::string& field_name = "") override;
Status
DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) override;
Status
Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) override;
Status
GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
DataChunkPtr& data_chunk) override;
Status
DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override;
Status
ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override;
Status
Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) override;
Status
LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
const std::vector<std::string>& field_names, bool force = false) override;
Status
Flush(const std::string& collection_name) override;
Status
Flush() override;
Status
Compact(const server::ContextPtr& context, const std::string& collection_name, double threshold = 0.0) override;
private:
DBPtr db_;
};
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/Options.h"
#include <fiu-local.h>
#include <limits>
#include "utils/Exception.h"
#include "utils/Log.h"
#include <assert.h>
#include <stdlib.h>
#include <boost/algorithm/string.hpp>
namespace milvus {
namespace engine {
const char* ARCHIVE_CONF_DISK = "disk";
const char* ARCHIVE_CONF_DAYS = "days";
const char* DEFAULT_PARTITON_TAG = "_default";
ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias) {
ParseType(type);
ParseCritirias(criterias);
}
void
ArchiveConf::SetCriterias(const ArchiveConf::CriteriaT& criterial) {
for (auto& pair : criterial) {
criterias_[pair.first] = pair.second;
}
}
void
ArchiveConf::ParseCritirias(const std::string& criterias) {
std::stringstream ss(criterias);
std::vector<std::string> tokens;
boost::algorithm::split(tokens, criterias, boost::is_any_of(";"));
fiu_do_on("ArchiveConf.ParseCritirias.empty_tokens", tokens.clear());
if (tokens.empty()) {
return;
}
for (auto& token : tokens) {
if (token.empty()) {
continue;
}
std::vector<std::string> kv;
boost::algorithm::split(kv, token, boost::is_any_of(":"));
if (kv.size() != 2) {
LOG_ENGINE_WARNING_ << "Invalid ArchiveConf Criterias: " << token << " Ignore!";
continue;
}
if (kv[0] != "disk" && kv[0] != "days") {
LOG_ENGINE_WARNING_ << "Invalid ArchiveConf Criterias: " << token << " Ignore!";
continue;
}
try {
fiu_do_on("ArchiveConf.ParseCritirias.OptionsParseCritiriasOutOfRange",
kv[1] = std::to_string(std::numeric_limits<int>::max() + 1UL));
auto value = std::stoi(kv[1]);
criterias_[kv[0]] = value;
} catch (std::out_of_range&) {
std::string msg = "Out of range: '" + kv[1] + "'";
LOG_ENGINE_ERROR_ << msg;
throw InvalidArgumentException(msg);
} catch (...) {
std::string msg = "Invalid argument: '" + kv[1] + "'";
LOG_ENGINE_ERROR_ << msg;
throw InvalidArgumentException(msg);
}
}
}
void
ArchiveConf::ParseType(const std::string& type) {
if (type != "delete" && type != "swap") {
std::string msg = "Invalid argument: type='" + type + "'";
throw InvalidArgumentException(msg);
}
type_ = type;
}
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "Constants.h"
#include <map>
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace engine {
class Env;
extern const char* ARCHIVE_CONF_DISK;
extern const char* ARCHIVE_CONF_DAYS;
extern const char* DEFAULT_PARTITON_TAG;
struct ArchiveConf {
using CriteriaT = std::map<std::string, int64_t>;
explicit ArchiveConf(const std::string& type, const std::string& criterias = std::string());
const std::string&
GetType() const {
return type_;
}
const CriteriaT
GetCriterias() const {
return criterias_;
}
void
SetCriterias(const ArchiveConf::CriteriaT& criterial);
private:
void
ParseCritirias(const std::string& criterias);
void
ParseType(const std::string& type);
std::string type_;
CriteriaT criterias_;
};
struct DBMetaOptions {
std::string path_;
std::string backend_uri_;
ArchiveConf archive_conf_ = ArchiveConf("delete");
}; // DBMetaOptions
struct DBOptions {
typedef enum { SINGLE = 0, CLUSTER_READONLY, CLUSTER_WRITABLE } MODE;
uint16_t merge_trigger_number_ = 2;
DBMetaOptions meta_;
int mode_ = MODE::SINGLE;
size_t insert_buffer_size_ = 4 * GB;
bool insert_cache_immediately_ = false;
int64_t auto_flush_interval_ = 1;
int64_t file_cleanup_timeout_ = 10;
bool metric_enable_ = false;
// wal relative configurations
bool wal_enable_ = true;
bool recovery_error_ignore_ = true;
int64_t buffer_size_ = 256;
std::string mxlog_path_ = "/tmp/milvus/wal/";
}; // Options
} // namespace engine
} // namespace milvus
......@@ -30,6 +30,7 @@ const char* PARAM_INDEX_EXTRA_PARAMS = "params";
const char* PARAM_SEGMENT_ROW_COUNT = "segment_row_count";
const char* DEFAULT_STRUCTURED_INDEX = "SORTED"; // this string should be defined in knowhere::IndexEnum
const char* DEFAULT_PARTITON_TAG = "_default";
} // namespace engine
} // namespace milvus
......@@ -30,6 +30,24 @@
namespace milvus {
namespace engine {
extern const char* FIELD_UID;
extern const char* ELEMENT_RAW_DATA;
extern const char* ELEMENT_BLOOM_FILTER;
extern const char* ELEMENT_DELETED_DOCS;
extern const char* ELEMENT_INDEX_COMPRESS;
extern const char* PARAM_UID_AUTOGEN;
extern const char* PARAM_DIMENSION;
extern const char* PARAM_INDEX_TYPE;
extern const char* PARAM_INDEX_METRIC_TYPE;
extern const char* PARAM_INDEX_EXTRA_PARAMS;
extern const char* PARAM_SEGMENT_ROW_COUNT;
extern const char* DEFAULT_STRUCTURED_INDEX;
extern const char* DEFAULT_PARTITON_TAG;
///////////////////////////////////////////////////////////////////////////////////////////////////
using id_t = int64_t;
using offset_t = int32_t;
using date_t = int32_t;
......@@ -42,6 +60,7 @@ using VectorDistances = std::vector<VectorDistance>;
using ResultIds = std::vector<faiss::Index::idx_t>;
using ResultDistances = std::vector<faiss::Index::distance_t>;
///////////////////////////////////////////////////////////////////////////////////////////////////
enum DataType {
NONE = 0,
BOOL = 1,
......@@ -59,6 +78,17 @@ enum DataType {
VECTOR_FLOAT = 101,
};
///////////////////////////////////////////////////////////////////////////////////////////////////
enum FieldElementType {
FET_NONE = 0,
FET_RAW = 1,
FET_BLOOM_FILTER = 2,
FET_DELETED_DOCS = 3,
FET_INDEX = 4,
FET_COMPRESS_SQ8 = 5,
};
///////////////////////////////////////////////////////////////////////////////////////////////////
class BinaryData : public cache::DataObj {
public:
int64_t
......@@ -71,6 +101,7 @@ class BinaryData : public cache::DataObj {
};
using BinaryDataPtr = std::shared_ptr<BinaryData>;
///////////////////////////////////////////////////////////////////////////////////////////////////
class VaribleData : public cache::DataObj {
public:
int64_t
......@@ -84,6 +115,7 @@ class VaribleData : public cache::DataObj {
};
using VaribleDataPtr = std::shared_ptr<VaribleData>;
///////////////////////////////////////////////////////////////////////////////////////////////////
using FIELD_TYPE_MAP = std::unordered_map<std::string, DataType>;
using FIELD_WIDTH_MAP = std::unordered_map<std::string, int64_t>;
using FIXEDX_FIELD_MAP = std::unordered_map<std::string, BinaryDataPtr>;
......@@ -91,14 +123,15 @@ using VARIABLE_FIELD_MAP = std::unordered_map<std::string, VaribleDataPtr>;
using VECTOR_INDEX_MAP = std::unordered_map<std::string, knowhere::VecIndexPtr>;
using STRUCTURED_INDEX_MAP = std::unordered_map<std::string, knowhere::IndexPtr>;
///////////////////////////////////////////////////////////////////////////////////////////////////
struct DataChunk {
int64_t count_ = 0;
FIXEDX_FIELD_MAP fixed_fields_;
VARIABLE_FIELD_MAP variable_fields_;
};
using DataChunkPtr = std::shared_ptr<DataChunk>;
///////////////////////////////////////////////////////////////////////////////////////////////////
struct CollectionIndex {
std::string index_name_;
std::string index_type_;
......@@ -106,6 +139,7 @@ struct CollectionIndex {
milvus::json extra_params_ = {{"nlist", 2048}};
};
///////////////////////////////////////////////////////////////////////////////////////////////////
struct VectorsData {
uint64_t vector_count_ = 0;
std::vector<float> float_data_;
......@@ -113,13 +147,7 @@ struct VectorsData {
IDNumbers id_array_;
};
struct Entity {
int64_t entity_count_ = 0;
std::vector<uint8_t> attr_value_;
std::unordered_map<std::string, VectorsData> vector_data_;
IDNumbers id_array_;
};
///////////////////////////////////////////////////////////////////////////////////////////////////
struct AttrsData {
uint64_t attr_count_ = 0;
std::unordered_map<std::string, engine::DataType> attr_type_;
......@@ -127,6 +155,7 @@ struct AttrsData {
IDNumbers id_array_;
};
///////////////////////////////////////////////////////////////////////////////////////////////////
struct QueryResult {
uint64_t row_num_;
engine::ResultIds result_ids_;
......@@ -135,33 +164,31 @@ struct QueryResult {
};
using QueryResultPtr = std::shared_ptr<QueryResult>;
using File2ErrArray = std::map<std::string, std::vector<std::string>>;
using Table2FileErr = std::map<std::string, File2ErrArray>;
///////////////////////////////////////////////////////////////////////////////////////////////////
struct DBMetaOptions {
std::string path_;
std::string backend_uri_;
}; // DBMetaOptions
extern const char* FIELD_UID;
///////////////////////////////////////////////////////////////////////////////////////////////////
struct DBOptions {
typedef enum { SINGLE = 0, CLUSTER_READONLY, CLUSTER_WRITABLE } MODE;
extern const char* ELEMENT_RAW_DATA;
extern const char* ELEMENT_BLOOM_FILTER;
extern const char* ELEMENT_DELETED_DOCS;
extern const char* ELEMENT_INDEX_COMPRESS;
DBMetaOptions meta_;
int mode_ = MODE::SINGLE;
extern const char* PARAM_UID_AUTOGEN;
extern const char* PARAM_DIMENSION;
extern const char* PARAM_INDEX_TYPE;
extern const char* PARAM_INDEX_METRIC_TYPE;
extern const char* PARAM_INDEX_EXTRA_PARAMS;
extern const char* PARAM_SEGMENT_ROW_COUNT;
size_t insert_buffer_size_ = 4 * GB;
bool insert_cache_immediately_ = false;
extern const char* DEFAULT_STRUCTURED_INDEX;
int64_t auto_flush_interval_ = 1;
enum FieldElementType {
FET_NONE = 0,
FET_RAW = 1,
FET_BLOOM_FILTER = 2,
FET_DELETED_DOCS = 3,
FET_INDEX = 4,
FET_COMPRESS_SQ8 = 5,
};
bool metric_enable_ = false;
// wal relative configurations
bool wal_enable_ = true;
int64_t buffer_size_ = 256;
std::string mxlog_path_ = "/tmp/milvus/wal/";
}; // Options
} // namespace engine
} // namespace milvus
......@@ -14,7 +14,6 @@
#include <ctime>
#include <string>
#include "Options.h"
#include "db/Types.h"
#include "utils/Status.h"
......
......@@ -12,7 +12,6 @@
#pragma once
#include "MemManager.h"
#include "db/Options.h"
#include <memory>
......
......@@ -12,7 +12,6 @@
#pragma once
#include "MergeManager.h"
#include "db/Options.h"
#include <memory>
......
......@@ -11,11 +11,11 @@
#pragma once
#include <string>
#include "db/Options.h"
#include "db/Types.h"
#include "db/meta/MetaAdapter.h"
#include <string>
namespace milvus::engine {
class MetaFactory {
......
......@@ -15,7 +15,7 @@
#include <mutex>
#include <vector>
#include "db/Options.h"
#include "db/Types.h"
#include "db/meta/backend/MetaEngine.h"
#include "db/meta/backend/MySQLConnectionPool.h"
......
......@@ -16,7 +16,6 @@
#include <sqlite3.h>
#include "db/Options.h"
#include "db/meta/backend/MetaEngine.h"
#include "utils/Status.h"
......
......@@ -41,7 +41,6 @@ DBWrapper::StartService() {
opt.meta_.path_ = path + "/db";
opt.auto_flush_interval_ = config.storage.auto_flush_interval();
opt.file_cleanup_timeout_ = config.storage.file_cleanup_timeout();
opt.metric_enable_ = config.metric.enable();
opt.insert_cache_immediately_ = config.cache.cache_insert_data();
opt.insert_buffer_size_ = config.cache.insert_buffer_size();
......@@ -63,7 +62,6 @@ DBWrapper::StartService() {
opt.wal_enable_ = false;
if (opt.wal_enable_) {
opt.recovery_error_ignore_ = config.wal.recovery_error_ignore();
int64_t wal_buffer_size = config.wal.buffer_size();
wal_buffer_size /= (1024 * 1024);
opt.buffer_size_ = wal_buffer_size;
......@@ -88,20 +86,6 @@ DBWrapper::StartService() {
int64_t use_blas_threshold = config.engine.use_blas_threshold();
faiss::distance_compute_blas_threshold = use_blas_threshold;
// set archive config
engine::ArchiveConf::CriteriaT criterial;
int64_t disk = config.db.archive_disk_threshold();
int64_t days = config.db.archive_days_threshold();
if (disk > 0) {
criterial[engine::ARCHIVE_CONF_DISK] = disk;
}
if (days > 0) {
criterial[engine::ARCHIVE_CONF_DAYS] = days;
}
opt.meta_.archive_conf_.SetCriterias(criterial);
// create db root folder
s = CommonUtil::CreateDirectory(opt.meta_.path_);
if (!s.ok()) {
......@@ -121,20 +105,14 @@ DBWrapper::StartService() {
kill(0, SIGUSR1);
}
// // preload collection
// std::string preload_collections;
// s = config.GetCacheConfigPreloadCollection(preload_collections);
// if (!s.ok()) {
// std::cerr << s.ToString() << std::endl;
// return s;
// }
//
// s = PreloadCollections(preload_collections);
// if (!s.ok()) {
// std::cerr << "ERROR! Failed to preload tables: " << preload_collections << std::endl;
// std::cerr << s.ToString() << std::endl;
// kill(0, SIGUSR1);
// }
// preload collection
std::string preload_collections = config.cache.preload_collection();
s = PreloadCollections(preload_collections);
if (!s.ok()) {
std::cerr << "ERROR! Failed to preload collections: " << preload_collections << std::endl;
std::cerr << s.ToString() << std::endl;
kill(0, SIGUSR1);
}
return Status::OK();
}
......@@ -150,38 +128,39 @@ DBWrapper::StopService() {
return Status::OK();
}
// Status
// DBWrapper::PreloadCollections(const std::string& preload_collections) {
// if (preload_collections.empty()) {
// // do nothing
// } else if (preload_collections == "*") {
// // load all tables
// // SS TODO: Replace name with id
// std::vector<std::string> names;
// auto status = db_->AllCollections(names);
// if (!status.ok()) {
// return status;
// }
//
// for (auto& name : names) {
// auto status = db_->PreloadCollection(nullptr, name);
// if (!status.ok()) {
// return status;
// }
// }
// } else {
// std::vector<std::string> collection_names;
// StringHelpFunctions::SplitStringByDelimeter(preload_collections, ",", collection_names);
// for (auto& name : collection_names) {
// auto status = db_->PreloadCollection(nullptr, name);
// if (!status.ok()) {
// return status;
// }
// }
// }
//
// return Status::OK();
//}
Status
DBWrapper::PreloadCollections(const std::string& preload_collections) {
if (preload_collections.empty()) {
// do nothing
} else if (preload_collections == "*") {
// load all collections
std::vector<std::string> names;
auto status = db_->ListCollections(names);
if (!status.ok()) {
return status;
}
for (auto& name : names) {
std::vector<std::string> field_names; // input empty field names will load all fileds
auto status = db_->LoadCollection(nullptr, name, field_names);
if (!status.ok()) {
return status;
}
}
} else {
std::vector<std::string> collection_names;
StringHelpFunctions::SplitStringByDelimeter(preload_collections, ",", collection_names);
for (auto& name : collection_names) {
std::vector<std::string> field_names; // input empty field names will load all fileds
auto status = db_->LoadCollection(nullptr, name, field_names);
if (!status.ok()) {
return status;
}
}
}
return Status::OK();
}
} // namespace server
} // namespace milvus
......@@ -46,9 +46,9 @@ class DBWrapper {
return db_;
}
// private:
// Status
// PreloadCollections(const std::string& preload_collections);
private:
Status
PreloadCollections(const std::string& preload_collections);
private:
engine::DBPtr db_;
......
......@@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "server/ValidationUtil.h"
#include "db/Constants.h"
#include "db/Utils.h"
#include "knowhere/index/vector_index/ConfAdapter.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
......@@ -405,5 +406,33 @@ ValidatePartitionTags(const std::vector<std::string>& partition_tags) {
return Status::OK();
}
Status
ValidateInsertDataSize(const engine::DataChunkPtr& data) {
int64_t total_size = 0;
for (auto& pair : data->fixed_fields_) {
if (pair.second == nullptr) {
continue;
}
total_size += pair.second->Size();
}
for (auto& pair : data->variable_fields_) {
if (pair.second == nullptr) {
continue;
}
total_size += pair.second->Size();
}
if (total_size > engine::MAX_INSERT_DATA_SIZE) {
std::string msg = "The amount of data inserted each time cannot exceed " +
std::to_string(engine::MAX_INSERT_DATA_SIZE / engine::MB) + " MB";
return Status(SERVER_INVALID_ROWRECORD_ARRAY, msg);
}
return Status::OK();
}
} // namespace server
} // namespace milvus
......@@ -49,5 +49,8 @@ ValidateSearchTopk(int64_t top_k);
extern Status
ValidatePartitionTags(const std::vector<std::string>& partition_tags);
extern Status
ValidateInsertDataSize(const engine::DataChunkPtr& data);
} // namespace server
} // namespace milvus
......@@ -59,12 +59,14 @@ InsertReq::OnExecute() {
"The vector field is empty, Make sure you have entered vector records"};
}
// step 1: check collection existence
bool exist = false;
auto status = DBWrapper::DB()->HasCollection(collection_name_, exist);
if (!exist) {
return Status(SERVER_COLLECTION_NOT_EXIST, "Collection not exist: " + collection_name_);
}
// step 2: construct insert data
engine::DataChunkPtr data_chunk = std::make_shared<engine::DataChunk>();
data_chunk->count_ = row_count_;
for (auto& pair : chunk_data_) {
......@@ -72,11 +74,22 @@ InsertReq::OnExecute() {
bin->data_.swap(pair.second);
data_chunk->fixed_fields_.insert(std::make_pair(pair.first, bin));
}
// step 3: check insert data limitation
status = ValidateInsertDataSize(data_chunk);
if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%d] Invalid vector data: %s", "insert", 0, status.message().c_str());
return status;
}
// step 4: insert data into db
status = DBWrapper::DB()->Insert(collection_name_, partition_name_, data_chunk);
if (!status.ok()) {
LOG_SERVER_ERROR_ << LogOut("[%s][%ld] %s", "Insert", 0, status.message().c_str());
return status;
}
// step 5: return entity id to client
chunk_data_[engine::FIELD_UID] = data_chunk->fixed_fields_[engine::FIELD_UID]->data_;
rc.ElapseFromBegin("done");
......
......@@ -16,8 +16,6 @@ set( TEST_FILES ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_segment.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_db.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_meta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_job.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_task.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_event.cpp
)
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <gtest/gtest.h>
#include "scheduler/job/BuildIndexJob.h"
#include "scheduler/job/SearchJob.h"
namespace milvus {
namespace scheduler {
class TestJob : public Job {
public:
TestJob() : Job(JobType::INVALID) {}
};
TEST(JobTest, TestJob) {
// engine::DBOptions options;
// auto build_index_ptr = std::make_shared<SSBuildIndexJob>(options);
// build_index_ptr->Dump();
// build_index_ptr->AddSegmentVisitor(nullptr);
//
// TestJob test_job;
// test_job.Dump();
//
// /* collect all valid segment */
// std::vector<milvus::engine::SegmentVisitorPtr> segment_visitors;
// auto executor = [&](const SegmentPtr& segment, SegmentIterator* handler) -> Status {
// auto visitor = SegmentVisitor::Build(ss, segment->GetID());
// if (visitor == nullptr) {
// return Status(milvus::SS_ERROR, "Cannot build segment visitor");
// }
// segment_visitors.push_back(visitor);
// return Status::OK();
// };
//
// auto segment_iter = std::make_shared<SegmentIterator>(ss, executor);
// segment_iter->Iterate();
// ASSERT_TRUE(segment_iter->GetStatus().ok());
// ASSERT_EQ(segment_visitors.size(), 2);
/* create BuildIndexJob */
// milvus::scheduler::BuildIndexJobPtr build_index_job =
// std::make_shared<milvus::scheduler::SSBuildIndexJob>("");
// for (auto& sv : segment_visitors) {
// build_index_job->AddSegmentVisitor(sv);
// }
/* put search job to scheduler and wait result */
// milvus::scheduler::JobMgrInst::GetInstance()->Put(build_index_job);
// build_index_job->WaitFinish();
// /* create SearchJob */
// milvus::scheduler::SearchJobPtr search_job =
// std::make_shared<milvus::scheduler::SSSearchJob>(nullptr, "", nullptr);
// for (auto& sv : segment_visitors) {
// search_job->AddSegmentVisitor(sv);
// }
//
// /* put search job to scheduler and wait result */
// milvus::scheduler::JobMgrInst::GetInstance()->Put(search_job);
// search_job->WaitFinish();
}
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <fiu-local.h>
#include <fiu-control.h>
#include <gtest/gtest.h>
#include <opentracing/mocktracer/tracer.h>
#include "db/DBFactory.h"
#include "scheduler/SchedInst.h"
#include "scheduler/job/BuildIndexJob.h"
#include "scheduler/job/SearchJob.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "scheduler/task/BuildIndexTask.h"
#include "scheduler/task/SearchTask.h"
namespace milvus {
namespace scheduler {
TEST(TaskTest, INVALID_INDEX) {
auto dummy_context = std::make_shared<milvus::server::Context>("dummy_request_id");
opentracing::mocktracer::MockTracerOptions tracer_options;
auto mock_tracer =
std::shared_ptr<opentracing::Tracer>{new opentracing::mocktracer::MockTracer{std::move(tracer_options)}};
auto mock_span = mock_tracer->StartSpan("mock_span");
auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span);
dummy_context->SetTraceContext(trace_context);
}
TEST(TaskTest, TEST_PATH) {
Path path;
auto empty_path = path.Current();
ASSERT_TRUE(empty_path.empty());
empty_path = path.Next();
ASSERT_TRUE(empty_path.empty());
empty_path = path.Last();
ASSERT_TRUE(empty_path.empty());
}
} // namespace scheduler
} // namespace milvus
......@@ -28,7 +28,6 @@
#include "config/ServerConfig.h"
#include "codecs/Codec.h"
#include "db/DBFactory.h"
#include "db/Options.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
......@@ -48,69 +47,6 @@ INITIALIZE_EASYLOGGINGPP
namespace {
static const char* CONFIG_STR =
"version: 0.5\n"
"\n"
"cluster:\n"
" enable: false\n"
" role: rw\n"
"\n"
"general:\n"
" timezone: UTC+8\n"
" meta_uri: mock://:@:/\n"
"\n"
"network:\n"
" bind.address: 0.0.0.0\n"
" bind.port: 19530\n"
" http.enable: true\n"
" http.port: 19121\n"
"\n"
"storage:\n"
" path: /tmp/milvus\n"
" auto_flush_interval: 1\n"
"\n"
"wal:\n"
" enable: true\n"
" recovery_error_ignore: false\n"
" buffer_size: 256MB\n"
" path: /tmp/milvus/wal\n"
"\n"
"cache:\n"
" cache_size: 4GB\n"
" insert_buffer_size: 1GB\n"
" preload_collection:\n"
"\n"
"gpu:\n"
" enable: true\n"
" cache_size: 1GB\n"
" gpu_search_threshold: 1000\n"
" search_devices:\n"
" - gpu0\n"
" build_index_devices:\n"
" - gpu0\n"
"\n"
"logs:\n"
" level: debug\n"
" trace.enable: true\n"
" path: /tmp/milvus/logs\n"
" max_log_file_size: 1024MB\n"
" log_rotate_num: 0\n"
"\n"
"metric:\n"
" enable: false\n"
" address: 127.0.0.1\n"
" port: 9091\n"
"\n";
void
WriteToFile(const std::string &file_path, const char *content) {
std::fstream fs(file_path.c_str(), std::ios_base::out);
// write data to file
fs << content;
fs.close();
}
class DBTestEnvironment : public ::testing::Environment {
public:
explicit DBTestEnvironment(const std::string &uri) : uri_(uri) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册