Unverified commit 08ad77c7, authored by xige-16, committed by GitHub

Delete all repeated primary keys (#16863)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
Parent: b3eb2b1d
......@@ -21,6 +21,7 @@
#include <string>
#include <utility>
#include <vector>
#include <tbb/concurrent_unordered_map.h>
#include <boost/align/aligned_allocator.hpp>
#include <boost/container/vector.hpp>
#include <boost/dynamic_bitset.hpp>
......@@ -67,6 +68,7 @@ using IdArray = proto::schema::IDs;
using MetricType = faiss::MetricType;
using InsertData = proto::segcore::InsertRecord;
using PkType = std::variant<std::monostate, int64_t, std::string>;
using Pk2OffsetType = tbb::concurrent_unordered_multimap<PkType, int64_t, std::hash<PkType>>;
MetricType
GetMetricType(const std::string& type);
......
......@@ -21,7 +21,7 @@ get_barrier(const RecordType& record, Timestamp timestamp) {
int64_t end = record.ack_responder_.GetAck();
while (beg < end) {
auto mid = (beg + end) / 2;
if (vec[mid] < timestamp) {
if (vec[mid] <= timestamp) {
beg = mid + 1;
} else {
end = mid;
......
......@@ -23,18 +23,6 @@
namespace milvus::segcore {
using SearchResult = milvus::SearchResult;
// Raw insert payload laid out row by row in one contiguous buffer.
struct RowBasedRawData {
void* raw_data; // non-owning pointer to the row-major buffer; field layout follows the segment schema
int sizeof_per_row; // bytes occupied by a single row, including alignment padding
int64_t count; // number of rows stored in raw_data
};
// Raw insert payload laid out column by column.
struct ColumnBasedRawData {
std::vector<aligned_vector<uint8_t>> columns_; // one aligned byte buffer per field/column
int64_t count; // number of rows represented by each column buffer
};
class SegmentGrowing : public SegmentInternalInterface {
public:
virtual void
......
......@@ -38,65 +38,14 @@ SegmentGrowingImpl::PreDelete(int64_t size) {
return reserved_begin;
}
// Compute (or reuse from the LRU cache) the bitmap of deleted insert rows for a
// growing segment, valid as of query_timestamp.
//   del_barrier:    deletes in delete record[0 : del_barrier) are candidates to apply
//   insert_barrier: number of insert rows the bitmap covers
//   force:          not referenced in this body — presumably kept for the declared interface; confirm
std::shared_ptr<DeletedRecord::TmpBitmap>
SegmentGrowingImpl::get_deleted_bitmap(int64_t del_barrier,
Timestamp query_timestamp,
int64_t insert_barrier,
bool force) const {
auto old = deleted_record_.get_lru_entry();
// cache hit: same row count and same delete barrier means the cached bitmap is still valid
if (old->bitmap_ptr->size() == insert_barrier) {
if (old->del_barrier == del_barrier) {
return old;
}
}
auto current = old->clone(insert_barrier);
current->del_barrier = del_barrier;
auto bitmap = current->bitmap_ptr;
// replay only the delete-log range between the cached barrier and the requested one
int64_t start, end;
if (del_barrier < old->del_barrier) {
start = del_barrier;
end = old->del_barrier;
} else {
start = old->del_barrier;
end = del_barrier;
}
for (auto del_index = start; del_index < end; ++del_index) {
// get uid in delete logs
auto uid = deleted_record_.pks_[del_index];
// map uid to corresponding offsets, select the max one, which should be the target
// the max one should be closest to query_timestamp, so the delete log should refer to it
int64_t the_offset = -1;
auto [iter_b, iter_e] = pk2offset_.equal_range(uid);
for (auto iter = iter_b; iter != iter_e; ++iter) {
auto offset = iter->second;
AssertInfo(offset < insert_barrier, "Timestamp offset is larger than insert barrier");
the_offset = std::max(the_offset, offset);
// NOTE(review): row offsets appear to be non-negative, so after the max() above
// the_offset can no longer be -1 — this guard looks unreachable; confirm.
if (the_offset == -1) {
continue;
}
// a row inserted at or after the query time is not yet visible, so the delete
// cannot apply to it; otherwise mark the row as deleted
if (insert_record_.timestamps_[the_offset] >= query_timestamp) {
bitmap->reset(the_offset);
} else {
bitmap->set(the_offset);
}
}
}
this->deleted_record_.insert_lru_entry(current);
return current;
}
void
SegmentGrowingImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
auto bitmap_holder =
get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, pk2offset_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
......
......@@ -195,12 +195,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
}
protected:
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
Timestamp query_timestamp,
int64_t insert_barrier,
bool force = false) const;
int64_t
num_chunk() const override;
......@@ -227,7 +221,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
mutable DeletedRecord deleted_record_;
// pks to row offset
tbb::concurrent_unordered_multimap<PkType, int64_t, std::hash<PkType>> pk2offset_;
Pk2OffsetType pk2offset_;
int64_t id_;
private:
......
......@@ -279,67 +279,14 @@ SegmentSealedImpl::get_schema() const {
return *schema_;
}
// Compute (or extend from the LRU cache) the bitmap of deleted insert rows for a
// sealed segment, valid as of query_timestamp.
//   force: not referenced in this body — presumably kept for the declared interface; confirm
std::shared_ptr<DeletedRecord::TmpBitmap>
SegmentSealedImpl::get_deleted_bitmap(int64_t del_barrier,
Timestamp query_timestamp,
int64_t insert_barrier,
bool force) const {
auto old = deleted_record_.get_lru_entry();
auto current = old->clone(insert_barrier);
current->del_barrier = del_barrier;
auto bitmap = current->bitmap_ptr;
// Sealed segment only has one chunk with chunk_id 0
auto delete_pks_data = deleted_record_.pks_.get_chunk_data(0);
auto delete_pks = reinterpret_cast<const PkType*>(delete_pks_data);
auto del_size = deleted_record_.reserved.load();
// collect every insert-row offset whose pk appears anywhere in the delete log
std::vector<SegOffset> seg_offsets;
std::vector<PkType> pks;
for (int i = 0; i < del_size; ++i) {
auto [iter_b, iter_e] = pk2offset_.equal_range(delete_pks[i]);
for (auto iter = iter_b; iter != iter_e; ++iter) {
auto [entry_pk, entry_offset] = *iter;
pks.emplace_back(entry_pk);
seg_offsets.emplace_back(SegOffset(entry_offset));
}
}
// first pass: mark every matched row deleted
for (int i = 0; i < pks.size(); ++i) {
bitmap->set(seg_offsets[i].get());
}
if (pks.size() == 0 || seg_offsets.size() == 0) {
return current;
}
int64_t start, end;
if (del_barrier < old->del_barrier) {
start = del_barrier;
end = old->del_barrier;
} else {
start = old->del_barrier;
end = del_barrier;
}
// NOTE(review): del_index ranges over delete-log positions, but it is used below to
// index seg_offsets and deleted_record_.timestamps_. With repeated primary keys,
// equal_range can contribute several seg_offsets entries per delete-log row, so
// seg_offsets is NOT parallel to the delete log and this indexing can hit the wrong
// row (or run past the end). Confirm — this looks like the defect the repeated-pk
// rewrite addresses.
for (auto del_index = start; del_index < end; ++del_index) {
int64_t the_offset = seg_offsets[del_index].get();
AssertInfo(the_offset >= 0, "Seg offset is invalid");
if (deleted_record_.timestamps_[del_index] >= query_timestamp) {
bitmap->reset(the_offset);
} else {
bitmap->set(the_offset);
}
}
this->deleted_record_.insert_lru_entry(current);
return current;
}
void
SegmentSealedImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}
auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier);
auto bitmap_holder =
get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, pk2offset_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
......@@ -463,7 +410,6 @@ SegmentSealedImpl::check_search(const query::Plan* plan) const {
SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, int64_t segment_id)
: schema_(schema),
// fields_data_(schema->size()),
insert_record_(*schema, MAX_ROW_COUNT),
field_data_ready_bitset_(schema->size()),
vecindex_ready_bitset_(schema->size()),
......
......@@ -19,7 +19,6 @@
#include <utility>
#include <vector>
#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_unordered_map.h>
#include <tbb/concurrent_vector.h>
#include "ConcurrentVector.h"
......@@ -105,12 +104,6 @@ class SegmentSealedImpl : public SegmentSealed {
int64_t
get_active_count(Timestamp ts) const override;
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
Timestamp query_timestamp,
int64_t insert_barrier,
bool force = false) const;
private:
template <typename T>
static void
......@@ -194,7 +187,7 @@ class SegmentSealedImpl : public SegmentSealed {
mutable DeletedRecord deleted_record_;
// pks to row offset
tbb::concurrent_unordered_multimap<PkType, int64_t, std::hash<PkType>> pk2offset_;
Pk2OffsetType pk2offset_;
// std::unique_ptr<ScalarIndexBase> primary_key_index_;
SchemaPtr schema_;
......
......@@ -255,4 +255,68 @@ MergeDataArray(std::vector<std::pair<milvus::SearchResult*, int64_t>>& result_of
return data_array;
}
// Build (or fetch from the LRU cache) the bitmap marking which insert rows are
// deleted as of query_timestamp.
//
// insert_barrier: number of rows of insert data in the segment (size of the bitmap)
// del_barrier:    deletes in delete record[0 : del_barrier) may hide insert rows when
//                 searching/querying; per get_barrier, all ts in delete record[0 : del_barrier]
//                 are <= query_timestamp
//
// worked example:
//   insert record pks = [5, 2, 4, 1, 3, 8, 7, 6]
//   delete record pks = [2, 4, 3, 8, 5], delete record ts = [100, 100, 150, 200, 400, 500, 500, 500]
//   with del_barrier = 3 and query time = 180, insert rows with pks {2, 4, 3} are deleted,
//   giving the bitmap [0, 1, 1, 0, 1, 0, 0, 0]
//
// Returned bitmap: bit set => row deleted at query_timestamp, bit clear => row visible.
// NOTE(review): insert_record is not referenced in this body — confirm whether the
// parameter is kept only for interface symmetry with the segment implementations.
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord& insert_record,
const Pk2OffsetType& pk2offset,
Timestamp query_timestamp) {
auto old = delete_record.get_lru_entry();
// if insert_barrier and del_barrier have not changed, use the cached bitmap directly
if (old->bitmap_ptr->size() == insert_barrier) {
if (old->del_barrier == del_barrier) {
return old;
}
}
auto current = old->clone(insert_barrier);
current->del_barrier = del_barrier;
auto bitmap = current->bitmap_ptr;
// replay only the delete-log range between the cached barrier and the requested one
int64_t start, end;
if (del_barrier < old->del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp,
// so those deletion records do not take effect for this query/search and the bits for
// their pks are reset to 0.
// e.g. current_del_barrier = 2, query_time = 120: the bitmap becomes [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old->del_barrier;
} else {
// the cache covers too little, so apply the new pks in delete record[old_del_barrier : current_del_barrier].
// e.g. current_del_barrier = 4, query_time = 300: the bitmap becomes [0, 1, 1, 0, 1, 1, 0, 0]
start = old->del_barrier;
end = del_barrier;
}
for (auto del_index = start; del_index < end; ++del_index) {
// get pk in delete logs
auto pk = delete_record.pks_[del_index];
// find ALL insert rows sharing this pk (this is what handles repeated primary keys)
auto [iter_b, iter_e] = pk2offset.equal_range(pk);
for (auto iter = iter_b; iter != iter_e; ++iter) {
auto insert_row_offset = iter->second;
AssertInfo(insert_row_offset < insert_barrier, "Timestamp offset is larger than insert barrier");
if (delete_record.timestamps_[del_index] > query_timestamp) {
// the deletion happens after the query time, so it does not take effect; clear the bit
bitmap->reset(insert_row_offset);
} else {
// the row at insert_row_offset is deleted and will be ignored in search/query
bitmap->set(insert_row_offset);
}
}
}
delete_record.insert_lru_entry(current);
return current;
}
} // namespace milvus::segcore
......@@ -18,6 +18,8 @@
#include <stdexcept>
#include <knowhere/common/MetricType.h>
#include "common/QueryResult.h"
#include "DeletedRecord.h"
#include "InsertRecord.h"
namespace milvus::segcore {
......@@ -79,4 +81,12 @@ CreateDataArrayFrom(const void* data_raw, int64_t count, const FieldMeta& field_
std::unique_ptr<DataArray>
MergeDataArray(std::vector<std::pair<milvus::SearchResult*, int64_t>>& result_offsets, const FieldMeta& field_meta);
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord& insert_record,
const Pk2OffsetType& pk2offset,
Timestamp query_timestamp);
} // namespace milvus::segcore
......@@ -240,6 +240,168 @@ TEST(CApiTest, DeleteTest) {
DeleteSegment(segment);
}
// Verify that deleting a pk from a growing segment removes EVERY row carrying
// that pk, even when the pk was inserted more than once.
TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) {
    auto collection = NewCollection(get_default_schema_config());
    auto segment = NewSegment(collection, Growing, -1);
    auto col = (milvus::segcore::Collection*)collection;

    int N = 10;
    auto dataset = DataGen(col->get_schema(), N);

    // serialize the generated rows once; the same payload is inserted twice to
    // produce duplicated primary keys
    std::string insert_data;
    auto marshal = google::protobuf::TextFormat::PrintToString(*dataset.raw_, &insert_data);
    assert(marshal == true);

    // first insert, pks = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    int64_t offset;
    PreInsert(segment, N, &offset);
    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.c_str());
    assert(res.error_code == Success);

    // second insert, pks = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    PreInsert(segment, N, &offset);
    res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.c_str());
    assert(res.error_code == Success);

    // create retrieve plan for pks in {1, 2, 3}
    std::vector<int64_t> retrive_row_ids = {1, 2, 3};
    auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
    auto plan = std::make_unique<query::RetrievePlan>(*schema);
    auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(FieldId(101), DataType::INT64, retrive_row_ids);
    plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
    plan->plan_node_->predicate_ = std::move(term_expr);
    std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
    plan->field_ids_ = target_field_ids;

    CRetrieveResult retrieve_result;
    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
    ASSERT_EQ(res.error_code, Success);
    auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
    auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
    ASSERT_TRUE(suc);
    // both inserts are visible, so each of the 3 pks matches twice
    ASSERT_EQ(query_result->ids().int_id().data().size(), 6);
    // BUG FIX: release the first result before reusing retrieve_result, otherwise
    // the proto blob of the first Retrieve leaks when it is overwritten below
    DeleteRetrieveResult(&retrieve_result);

    // delete data pks = {1, 2, 3}
    std::vector<int64_t> delete_row_ids = {1, 2, 3};
    auto ids = std::make_unique<IdArray>();
    ids->mutable_int_id()->mutable_data()->Add(delete_row_ids.begin(), delete_row_ids.end());
    std::string delete_data;
    marshal = google::protobuf::TextFormat::PrintToString(*ids.get(), &delete_data);
    assert(marshal == true);
    std::vector<uint64_t> delete_timestamps(3, dataset.timestamps_[N - 1]);

    offset = PreDelete(segment, 3);
    auto del_res = Delete(segment, offset, 3, delete_data.c_str(), delete_timestamps.data());
    assert(del_res.error_code == Success);

    // retrieve pks in {1, 2, 3} again: every copy of a deleted pk must be gone
    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
    ASSERT_EQ(res.error_code, Success);
    query_result = std::make_unique<proto::segcore::RetrieveResults>();
    suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
    ASSERT_TRUE(suc);
    ASSERT_EQ(query_result->ids().int_id().data().size(), 0);

    DeleteRetrievePlan(plan.release());
    DeleteRetrieveResult(&retrieve_result);
    DeleteCollection(collection);
    DeleteSegment(segment);
}
// Verify that deleting a pk from a sealed segment removes EVERY row carrying
// that pk; DataGen(..., repeat_count = 2) makes each pk appear twice.
TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) {
    auto collection = NewCollection(get_default_schema_config());
    auto segment = NewSegment(collection, Sealed, -1);
    auto col = (milvus::segcore::Collection*)collection;

    int N = 20;
    auto dataset = DataGen(col->get_schema(), N, 42, 0, 2);

    // load every user field declared in the schema
    for (auto& [field_id, field_meta] : col->get_schema()->get_fields()) {
        auto array = dataset.get_col(field_id);
        std::string data;
        auto marshal = google::protobuf::TextFormat::PrintToString(*array.get(), &data);
        assert(marshal == true);

        auto load_info = CLoadFieldDataInfo{field_id.get(), data.c_str(), N};
        auto res = LoadFieldData(segment, load_info);
        assert(res.error_code == Success);
        auto count = GetRowCount(segment);
        assert(count == N);
    }

    // load the system Timestamp column
    FieldMeta ts_field_meta(FieldName("Timestamp"), TimestampFieldID, DataType::INT64);
    auto ts_array = CreateScalarDataArrayFrom(dataset.timestamps_.data(), N, ts_field_meta);
    std::string ts_data;
    auto marshal = google::protobuf::TextFormat::PrintToString(*ts_array.get(), &ts_data);
    assert(marshal == true);
    auto load_info = CLoadFieldDataInfo{TimestampFieldID.get(), ts_data.c_str(), N};
    auto res = LoadFieldData(segment, load_info);
    assert(res.error_code == Success);
    auto count = GetRowCount(segment);
    assert(count == N);

    // load the system RowID column
    FieldMeta row_id_field_meta(FieldName("RowID"), RowFieldID, DataType::INT64);
    auto row_id_array = CreateScalarDataArrayFrom(dataset.row_ids_.data(), N, row_id_field_meta);
    std::string row_id_data;
    marshal = google::protobuf::TextFormat::PrintToString(*row_id_array.get(), &row_id_data);
    assert(marshal == true);
    // BUG FIX: this load previously passed ts_data, so the RowID column was filled
    // with timestamps; feed the serialized row-id payload instead
    load_info = CLoadFieldDataInfo{RowFieldID.get(), row_id_data.c_str(), N};
    res = LoadFieldData(segment, load_info);
    assert(res.error_code == Success);
    count = GetRowCount(segment);
    assert(count == N);

    // create retrieve plan for pks in {1, 2, 3}
    std::vector<int64_t> retrive_row_ids = {1, 2, 3};
    auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
    auto plan = std::make_unique<query::RetrievePlan>(*schema);
    auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(FieldId(101), DataType::INT64, retrive_row_ids);
    plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
    plan->plan_node_->predicate_ = std::move(term_expr);
    std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
    plan->field_ids_ = target_field_ids;

    CRetrieveResult retrieve_result;
    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
    ASSERT_EQ(res.error_code, Success);
    auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
    auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
    ASSERT_TRUE(suc);
    // every pk is duplicated, so the 3 pks match 6 rows
    ASSERT_EQ(query_result->ids().int_id().data().size(), 6);
    // BUG FIX: release the first result before reusing retrieve_result, otherwise
    // the proto blob of the first Retrieve leaks when it is overwritten below
    DeleteRetrieveResult(&retrieve_result);

    // delete data pks = {1, 2, 3}
    std::vector<int64_t> delete_row_ids = {1, 2, 3};
    auto ids = std::make_unique<IdArray>();
    ids->mutable_int_id()->mutable_data()->Add(delete_row_ids.begin(), delete_row_ids.end());
    std::string delete_data;
    marshal = google::protobuf::TextFormat::PrintToString(*ids.get(), &delete_data);
    assert(marshal == true);
    std::vector<uint64_t> delete_timestamps(3, dataset.timestamps_[N - 1]);

    auto offset = PreDelete(segment, 3);
    auto del_res = Delete(segment, offset, 3, delete_data.c_str(), delete_timestamps.data());
    assert(del_res.error_code == Success);

    // retrieve pks in {1, 2, 3} again: every copy of a deleted pk must be gone
    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
    ASSERT_EQ(res.error_code, Success);
    query_result = std::make_unique<proto::segcore::RetrieveResults>();
    suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
    ASSERT_TRUE(suc);
    ASSERT_EQ(query_result->ids().int_id().data().size(), 0);

    DeleteRetrievePlan(plan.release());
    DeleteRetrieveResult(&retrieve_result);
    DeleteCollection(collection);
    DeleteSegment(segment);
}
TEST(CApiTest, SearchTest) {
auto c_collection = NewCollection(get_default_schema_config());
auto segment = NewSegment(c_collection, Growing, -1);
......@@ -388,7 +550,7 @@ TEST(CApiTest, RetrieveTestWithExpr) {
plan->field_ids_ = target_field_ids;
CRetrieveResult retrieve_result;
auto res = Retrieve(segment, plan.release(), dataset.timestamps_[0], &retrieve_result);
auto res = Retrieve(segment, plan.get(), dataset.timestamps_[0], &retrieve_result);
ASSERT_EQ(res.error_code, Success);
DeleteRetrievePlan(plan.release());
......
......@@ -123,11 +123,11 @@ struct GeneratedData {
private:
GeneratedData() = default;
friend GeneratedData
DataGen(SchemaPtr schema, int64_t N, uint64_t seed, uint64_t ts_offset);
DataGen(SchemaPtr schema, int64_t N, uint64_t seed, uint64_t ts_offset, int repeat_count);
};
inline GeneratedData
DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0) {
DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0, int repeat_count = 1) {
using std::vector;
std::default_random_engine er(seed);
std::normal_distribution<> distr(0, 1);
......@@ -181,19 +181,8 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0)
}
case DataType::INT64: {
vector<int64_t> data(N);
// begin with counter
if (starts_with(field_meta.get_name().get(), "counter")) {
int64_t index = 0;
for (auto& x : data) {
x = index++;
}
} else {
int i = 0;
for (auto& x : data) {
x = er() % (2 * N);
x = i;
i++;
}
for (int i = 0; i < N; i++) {
data[i] = i / repeat_count;
}
insert_cols(data, N, field_meta);
break;
......@@ -240,8 +229,11 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0)
}
case DataType::VARCHAR: {
vector<std::string> data(N);
for (auto& x : data) {
x = std::to_string(er());
for (int i = 0; i < N / repeat_count; i++) {
auto str = std::to_string(er());
for (int j = 0; j < repeat_count; j++) {
data[i * repeat_count + j] = str;
}
}
insert_cols(data, N, field_meta);
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register to comment