未验证 提交 8d5ab34a 编写于 作者: G groot 提交者: GitHub

#1900 (#1923)

* add log
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* fix #1900
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 e22ba03f
......@@ -968,7 +968,7 @@ DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segmen
Status
DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id, VectorsData& vector,
const meta::SegmentsSchema& files) {
ENGINE_LOG_DEBUG << "Getting vector by id in " << files.size() << " files";
ENGINE_LOG_DEBUG << "Getting vector by id in " << files.size() << " files, id = " << vector_id;
vector.vector_count_ = 0;
vector.float_data_.clear();
......
......@@ -651,6 +651,9 @@ MySQLMetaImpl::DeleteCollectionFiles(const std::string& collection_id) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// soft delete collection files
mysqlpp::Query statement = connectionPtr->query();
//
......@@ -726,6 +729,9 @@ MySQLMetaImpl::CreateCollectionFile(SegmentSchema& file_schema) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "INSERT INTO " << META_TABLEFILES << " VALUES(" << id << ", " << mysqlpp::quote
......@@ -777,6 +783,9 @@ MySQLMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::v
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement
<< "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
......@@ -834,6 +843,9 @@ MySQLMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
<< "row_count, date, created_on"
......@@ -897,15 +909,14 @@ MySQLMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Col
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query updateCollectionIndexParamQuery = connectionPtr->query();
updateCollectionIndexParamQuery << "SELECT id, state, dimension, created_on"
<< " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
<< collection_id << " AND state <> "
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, state, dimension, created_on"
<< " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND state <> " << std::to_string(CollectionSchema::TO_DELETE) << ";";
ENGINE_LOG_DEBUG << "UpdateCollectionIndex: " << updateCollectionIndexParamQuery.str();
ENGINE_LOG_DEBUG << "UpdateCollectionIndex: " << statement.str();
mysqlpp::StoreQueryResult res = updateCollectionIndexParamQuery.store();
mysqlpp::StoreQueryResult res = statement.store();
if (res.num_rows() == 1) {
const mysqlpp::Row& resRow = res[0];
......@@ -915,18 +926,16 @@ MySQLMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Col
uint16_t dimension = resRow["dimension"];
int64_t created_on = resRow["created_on"];
updateCollectionIndexParamQuery
<< "UPDATE " << META_TABLES << " SET id = " << id << " ,state = " << state
<< " ,dimension = " << dimension << " ,created_on = " << created_on
<< " ,engine_type = " << index.engine_type_ << " ,index_params = " << mysqlpp::quote
<< index.extra_params_.dump() << " ,metric_type = " << index.metric_type_
<< " WHERE table_id = " << mysqlpp::quote << collection_id << ";";
statement << "UPDATE " << META_TABLES << " SET id = " << id << " ,state = " << state
<< " ,dimension = " << dimension << " ,created_on = " << created_on
<< " ,engine_type = " << index.engine_type_ << " ,index_params = " << mysqlpp::quote
<< index.extra_params_.dump() << " ,metric_type = " << index.metric_type_
<< " WHERE table_id = " << mysqlpp::quote << collection_id << ";";
ENGINE_LOG_DEBUG << "UpdateCollectionIndex: " << updateCollectionIndexParamQuery.str();
ENGINE_LOG_DEBUG << "UpdateCollectionIndex: " << statement.str();
if (!updateCollectionIndexParamQuery.exec()) {
return HandleException("Failed to update collection index",
updateCollectionIndexParamQuery.error());
if (!statement.exec()) {
return HandleException("Failed to update collection index", statement.error());
}
} else {
return Status(DB_NOT_FOUND, "Collection " + collection_id + " not found");
......@@ -1019,7 +1028,7 @@ MySQLMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t&
}
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT flush_lsn FROM " << META_TABLES << " WHERE collection_id = " << mysqlpp::quote
statement << "SELECT flush_lsn FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
<< collection_id << ";";
ENGINE_LOG_DEBUG << "GetCollectionFlushLSN: " << statement.str();
......@@ -1054,6 +1063,9 @@ MySQLMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
// if the collection has been deleted, just mark the collection file as TO_DELETE
......@@ -1120,6 +1132,9 @@ MySQLMetaImpl::UpdateCollectionFilesToIndex(const std::string& collection_id) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "UPDATE " << META_TABLEFILES << " SET file_type = " << std::to_string(SegmentSchema::TO_INDEX)
......@@ -1155,6 +1170,9 @@ MySQLMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
std::map<std::string, bool> has_collections;
......@@ -1229,6 +1247,9 @@ MySQLMetaImpl::UpdateCollectionFilesRowCount(SegmentsSchema& files) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
for (auto& file : files) {
......@@ -1442,7 +1463,7 @@ MySQLMetaImpl::ShowPartitions(const std::string& collection_id,
<< " WHERE owner_table = " << mysqlpp::quote << collection_id << " AND state <> "
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
ENGINE_LOG_DEBUG << "AllCollections: " << statement.str();
ENGINE_LOG_DEBUG << "ShowPartitions: " << statement.str();
res = statement.store();
} // Scoped Connection
......@@ -1498,7 +1519,7 @@ MySQLMetaImpl::GetPartitionName(const std::string& collection_id, const std::str
<< collection_id << " AND partition_tag = " << mysqlpp::quote << valid_tag << " AND state <> "
<< std::to_string(CollectionSchema::TO_DELETE) << ";";
ENGINE_LOG_DEBUG << "AllCollections: " << statement.str();
ENGINE_LOG_DEBUG << "GetPartitionName: " << statement.str();
res = statement.store();
} // Scoped Connection
......@@ -1533,6 +1554,9 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& f
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id;
......@@ -1615,6 +1639,9 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& fi
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date, "
"engine_type, created_on"
......@@ -1684,6 +1711,9 @@ MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
"row_count, date, created_on"
......@@ -1773,16 +1803,19 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
types += std::to_string(type);
}
mysqlpp::Query hasNonIndexFilesQuery = connectionPtr->query();
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
// since collection_id is a unique column we just need to check whether it exists or not
hasNonIndexFilesQuery
statement
<< "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND file_type in (" << types << ");";
ENGINE_LOG_DEBUG << "FilesByType: " << hasNonIndexFilesQuery.str();
ENGINE_LOG_DEBUG << "FilesByType: " << statement.str();
res = hasNonIndexFilesQuery.store();
res = statement.store();
} // Scoped Connection
CollectionSchema collection_schema;
......@@ -1906,6 +1939,9 @@ MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
<< " FROM " << META_TABLEFILES;
......@@ -2054,6 +2090,9 @@ MySQLMetaImpl::Size(uint64_t& result) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT IFNULL(SUM(file_size),0) AS sum"
<< " FROM " << META_TABLEFILES << " WHERE file_type <> "
......@@ -2143,14 +2182,21 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/)
}
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type IN ("
<< std::to_string(SegmentSchema::TO_DELETE) << "," << std::to_string(SegmentSchema::BACKUP) << ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::StoreQueryResult res;
{
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ENGINE_LOG_DEBUG << "CleanUpFilesWithTTL: " << statement.str();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type IN ("
<< std::to_string(SegmentSchema::TO_DELETE) << "," << std::to_string(SegmentSchema::BACKUP)
<< ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::StoreQueryResult res = statement.store();
ENGINE_LOG_DEBUG << "CleanUpFilesWithTTL: " << statement.str();
res = statement.store();
}
SegmentSchema collection_file;
std::vector<std::string> delete_ids;
......@@ -2385,6 +2431,9 @@ MySQLMetaImpl::Count(const std::string& collection_id, uint64_t& result) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
// to ensure UpdateCollectionFiles to be a atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT row_count"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
......
......@@ -162,6 +162,7 @@ class MySQLMetaImpl : public Meta {
std::shared_ptr<MySQLConnectionPool> mysql_connection_pool_;
bool safe_grab_ = false;
std::mutex meta_mutex_;
std::mutex genid_mutex_;
// std::mutex connectionMutex_;
}; // DBMetaImpl
......
......@@ -253,6 +253,9 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
try {
fiu_do_on("SqliteMetaImpl.HasCollection.throw_exception", throw std::exception());
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto collections = ConnectorPtr->select(
columns(&CollectionSchema::id_),
where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
......@@ -273,6 +276,9 @@ SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_
try {
fiu_do_on("SqliteMetaImpl.AllCollections.throw_exception", throw std::exception());
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto selected = ConnectorPtr->select(
columns(&CollectionSchema::id_, &CollectionSchema::collection_id_, &CollectionSchema::dimension_, &CollectionSchema::created_on_,
&CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_,
......@@ -403,12 +409,7 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
fiu_do_on("SqliteMetaImpl.GetCollectionFiles.throw_exception", throw std::exception());
collection_files.clear();
auto files = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_),
where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
......@@ -416,8 +417,20 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
return status;
}
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
}
Status result;
for (auto& file : files) {
for (auto& file : selected) {
SegmentSchema file_schema;
file_schema.collection_id_ = collection_id;
file_schema.id_ = std::get<0>(file);
......@@ -451,23 +464,29 @@ SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
milvus::engine::meta::SegmentsSchema& collection_files) {
try {
collection_files.clear();
auto files = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_),
where(c(&SegmentSchema::segment_id_) == segment_id and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
if (!files.empty()) {
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::segment_id_) == segment_id and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
}
if (!selected.empty()) {
CollectionSchema collection_schema;
collection_schema.collection_id_ = std::get<1>(files[0]);
collection_schema.collection_id_ = std::get<1>(selected[0]);
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
for (auto& file : files) {
for (auto& file : selected) {
SegmentSchema file_schema;
file_schema.collection_id_ = collection_schema.collection_id_;
file_schema.id_ = std::get<0>(file);
......@@ -502,6 +521,9 @@ SqliteMetaImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t f
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.UpdateCollectionFlag.throw_exception", throw std::exception());
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// set all backup file to raw
ConnectorPtr->update_all(set(c(&CollectionSchema::flag_) = flag), where(c(&CollectionSchema::collection_id_) == collection_id));
ENGINE_LOG_DEBUG << "Successfully update collection flag, collection id = " << collection_id;
......@@ -518,6 +540,9 @@ SqliteMetaImpl::UpdateCollectionFlushLSN(const std::string& collection_id, uint6
try {
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
ConnectorPtr->update_all(set(c(&CollectionSchema::flush_lsn_) = flush_lsn),
where(c(&CollectionSchema::collection_id_) == collection_id));
ENGINE_LOG_DEBUG << "Successfully update collection flush_lsn, collection id = " << collection_id << " flush_lsn = " << flush_lsn;;
......@@ -534,6 +559,9 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t
try {
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto selected =
ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_id));
......@@ -920,6 +948,14 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema&
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception());
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
// perform query
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
......@@ -930,18 +966,13 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema&
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
auto match_type = in(&SegmentSchema::file_type_, file_types);
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
// perform query
decltype(ConnectorPtr->select(select_columns)) selected;
auto filter = where(match_collectionid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto filter = where(match_collectionid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
}
Status ret;
for (auto& file : selected) {
......@@ -998,13 +1029,18 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& f
}
// get files to merge
auto selected = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::created_on_),
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and
c(&SegmentSchema::collection_id_) == collection_id),
order_by(&SegmentSchema::file_size_).desc());
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and
c(&SegmentSchema::collection_id_) == collection_id),
order_by(&SegmentSchema::file_size_).desc());
}
Status result;
int64_t to_merge_files = 0;
......@@ -1055,12 +1091,17 @@ SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) {
server::MetricCollector metric;
auto selected = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_),
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX));
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX));
}
std::map<std::string, CollectionSchema> groups;
SegmentSchema collection_file;
......@@ -1118,22 +1159,33 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector<
Status ret = Status::OK();
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
try {
fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());
files.clear();
auto selected = ConnectorPtr->select(
columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_),
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
if (!status.ok()) {
return status;
}
// get files by type
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_,
&SegmentSchema::file_type_,
&SegmentSchema::file_size_,
&SegmentSchema::row_count_,
&SegmentSchema::date_,
&SegmentSchema::engine_type_,
&SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
}
if (selected.size() >= 1) {
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
......@@ -1232,7 +1284,6 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
auto match_type = in(&SegmentSchema::file_type_, file_types);
......@@ -1241,7 +1292,11 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
decltype(ConnectorPtr->select(select_columns)) selected;
auto match_fileid = in(&SegmentSchema::id_, ids);
auto filter = where(match_fileid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns, filter);
}
std::map<std::string, meta::CollectionSchema> collections;
Status ret;
......@@ -1573,9 +1628,14 @@ SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) {
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
auto selected = ConnectorPtr->select(
columns(&SegmentSchema::row_count_),
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
auto select_columns = columns(&SegmentSchema::row_count_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
}
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册