未验证 提交 b3eb2b1d 编写于 作者: L Letian Jiang 提交者: GitHub

Support deltaLog loading on growing segment (#16903)

Signed-off-by: NLetian Jiang <letian.jiang@zilliz.com>
上级 a8b81e21
......@@ -194,6 +194,28 @@ SegmentGrowingImpl::GetMemoryUsageInBytes() const {
return total_bytes;
}
void
SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
AssertInfo(info.row_count > 0, "The row count of deleted record is 0");
AssertInfo(info.primary_keys, "Deleted primary keys is null");
AssertInfo(info.timestamps, "Deleted timestamps is null");
// step 1: get pks and timestamps
auto field_id = schema_->get_primary_field_id().value_or(FieldId(INVALID_FIELD_ID));
AssertInfo(field_id.get() != INVALID_FIELD_ID, "Primary key has invalid field id");
auto& field_meta = schema_->operator[](field_id);
int64_t size = info.row_count;
std::vector<PkType> pks(size);
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
// step 2: fill pks and timestamps
deleted_record_.pks_.set_data_raw(0, pks.data(), size);
deleted_record_.timestamps_.set_data_raw(0, timestamps, size);
deleted_record_.ack_responder_.AddSegment(0, size);
deleted_record_.reserved.fetch_add(size);
deleted_record_.record_size_ = size;
}
SpanBase
SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
auto vec = get_insert_record().get_field_data_base(field_id);
......
......@@ -59,6 +59,9 @@ class SegmentGrowingImpl : public SegmentGrowing {
int64_t
GetMemoryUsageInBytes() const override;
void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) override;
std::string
debug() const override;
......
......@@ -24,6 +24,7 @@
#include "common/Span.h"
#include "common/SystemProperty.h"
#include "common/Types.h"
#include "common/LoadInfo.h"
#include "common/BitsetView.h"
#include "common/QueryResult.h"
#include "knowhere/index/vector_index/VecIndex.h"
......@@ -66,6 +67,9 @@ class SegmentInterface {
virtual Status
Delete(int64_t reserved_offset, int64_t size, const IdArray* pks, const Timestamp* timestamps) = 0;
virtual void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0;
};
// internal API for DSL calculation
......
......@@ -29,8 +29,6 @@ class SegmentSealed : public SegmentInternalInterface {
virtual void
LoadFieldData(const LoadFieldDataInfo& info) = 0;
virtual void
LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0;
virtual void
DropIndex(const FieldId field_id) = 0;
virtual void
DropFieldData(const FieldId field_id) = 0;
......
......@@ -210,8 +210,7 @@ CStatus
LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info) {
try {
auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
AssertInfo(segment != nullptr, "segment conversion failed");
AssertInfo(segment_interface != nullptr, "segment conversion failed");
auto proto = std::string(deleted_record_info.primary_keys);
Assert(!proto.empty());
auto pks = std::make_unique<milvus::proto::schema::IDs>();
......@@ -219,7 +218,7 @@ LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_re
AssertInfo(suc, "unmarshal field data string failed");
auto load_info =
LoadDeletedRecordInfo{deleted_record_info.timestamps, pks.get(), deleted_record_info.row_count};
segment->LoadDeletedRecord(load_info);
segment_interface->LoadDeletedRecord(load_info);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(UnexpectedError, e.what());
......
......@@ -1062,13 +1062,15 @@ func saveDeltaLog(collectionID UniqueID,
kvs := make(map[string][]byte, 1)
// write insert binlog
// write delta log
pkFieldID := UniqueID(106)
fieldBinlog := make([]*datapb.FieldBinlog, 0)
log.Debug("[query node unittest] save delta log", zap.Int64("fieldID", 999))
key := JoinIDPath(collectionID, partitionID, segmentID, 999)
log.Debug("[query node unittest] save delta log", zap.Int64("fieldID", pkFieldID))
key := JoinIDPath(collectionID, partitionID, segmentID, pkFieldID)
key += "delta" // append suffix 'delta' to avoid conflicts against binlog
kvs[key] = blob.Value[:]
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
FieldID: 999,
FieldID: pkFieldID,
Binlogs: []*datapb.Binlog{{LogPath: key}},
})
log.Debug("[query node unittest] save delta log file to MinIO/S3")
......
......@@ -774,10 +774,6 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps
if s.segmentPtr == nil {
return errors.New("null seg core pointer")
}
if s.segmentType != segmentTypeSealed {
errMsg := fmt.Sprintln("segmentLoadFieldData failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID())
return errors.New(errMsg)
}
if len(primaryKeys) <= 0 {
return fmt.Errorf("empty pks to delete")
......
......@@ -25,6 +25,9 @@ import (
"strconv"
"sync"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
......@@ -41,8 +44,6 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// segmentLoader is only responsible for loading the field data from binlog
......@@ -97,10 +98,8 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segme
switch segmentType {
case segmentTypeGrowing:
metaReplica = loader.streamingReplica
case segmentTypeSealed:
metaReplica = loader.historicalReplica
default:
err := fmt.Errorf("illegal segment type when load segment, collectionID = %d", req.CollectionID)
log.Error("load segment failed, illegal segment type",
......
......@@ -430,7 +430,7 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
deltaLogs, err := saveDeltaLog(defaultCollectionID, defaultPartitionID, defaultSegmentID)
assert.NoError(t, err)
t.Run("test load growing and sealed segments", func(t *testing.T) {
t.Run("test load sealed segments", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
......@@ -451,7 +451,6 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
Deltalogs: deltaLogs,
},
},
}
......@@ -459,6 +458,10 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
err = loader.loadSegment(req1, segmentTypeSealed)
assert.NoError(t, err)
segment1, err := loader.historicalReplica.getSegmentByID(segmentID1)
assert.NoError(t, err)
assert.Equal(t, segment1.getRowCount(), int64(100))
segmentID2 := UniqueID(101)
req2 := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
......@@ -473,25 +476,54 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
Deltalogs: deltaLogs,
},
},
}
err = loader.loadSegment(req2, segmentTypeGrowing)
err = loader.loadSegment(req2, segmentTypeSealed)
assert.NoError(t, err)
segment1, err := loader.historicalReplica.getSegmentByID(segmentID1)
segment2, err := loader.historicalReplica.getSegmentByID(segmentID2)
assert.NoError(t, err)
// Note: getRowCount currently does not return accurate counts. The deleted rows are also counted.
assert.Equal(t, segment2.getRowCount(), int64(100)) // accurate counts should be 98
})
segment2, err := loader.streamingReplica.getSegmentByID(segmentID2)
t.Run("test load growing segments", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
loader := node.loader
assert.NotNil(t, loader)
segmentID1 := UniqueID(100)
req1 := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: segmentID1,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
},
},
}
err = loader.loadSegment(req1, segmentTypeGrowing)
assert.NoError(t, err)
assert.Equal(t, segment1.getRowCount(), segment2.getRowCount())
segment1, err := loader.streamingReplica.getSegmentByID(segmentID1)
assert.NoError(t, err)
assert.Equal(t, segment1.getRowCount(), int64(100))
// Loading growing segments with delta log, expect to fail (this is a bug).
// See: https://github.com/milvus-io/milvus/issues/16821
segmentID3 := UniqueID(102)
req3 := &querypb.LoadSegmentsRequest{
segmentID2 := UniqueID(101)
req2 := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
......@@ -500,7 +532,7 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: segmentID3,
SegmentID: segmentID2,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
......@@ -509,8 +541,13 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
},
}
err = loader.loadSegment(req3, segmentTypeGrowing)
assert.Error(t, err)
err = loader.loadSegment(req2, segmentTypeGrowing)
assert.NoError(t, err)
segment2, err := loader.streamingReplica.getSegmentByID(segmentID2)
assert.NoError(t, err)
// Note: getRowCount currently does not return accurate counts. The deleted rows are also counted.
assert.Equal(t, segment2.getRowCount(), int64(100)) // accurate counts should be 98
})
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册