未验证 提交 7e43b568 编写于 作者: C congqixia 提交者: GitHub

Add dropped ts and apply when gc scan dropped segments (#11838)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 27d711e5
......@@ -106,14 +106,14 @@ func (gc *garbageCollector) close() {
// if drop found or missing found, performs gc cleanup
func (gc *garbageCollector) scan() {
var v, d, m, e int
valid, dropped := gc.meta.ListSegmentFiles()
valid, dropped, droppedAt := gc.meta.ListSegmentFiles()
vm := make(map[string]struct{})
dm := make(map[string]struct{})
dm := make(map[string]uint64)
for _, k := range valid {
vm[k] = struct{}{}
}
for _, k := range dropped {
dm[k] = struct{}{}
for i, k := range dropped {
dm[k] = droppedAt[i]
}
for info := range gc.option.cli.ListObjects(context.TODO(), gc.option.bucketName, minio.ListObjectsOptions{
......@@ -127,11 +127,12 @@ func (gc *garbageCollector) scan() {
continue
}
// dropped
_, has = dm[info.Key]
droppedTs, has := dm[info.Key]
if has {
d++
droppedTime := time.Unix(0, int64(droppedTs))
// check file last modified time exceeds tolerance duration
if time.Since(info.LastModified) > gc.option.dropTolerance {
if time.Since(droppedTime) > gc.option.dropTolerance {
e++
// ignore error since it could be cleaned up next time
_ = gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, info.Key, minio.RemoveObjectOptions{})
......
......@@ -87,7 +87,7 @@ func Test_garbageCollector_scan(t *testing.T) {
bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8))
rootPath := `gc` + funcutil.RandomString(8)
//TODO change to Params
cli, files, err := initUtOSSEnv(bucketName, rootPath, 3)
cli, files, err := initUtOSSEnv(bucketName, rootPath, 6)
require.NoError(t, err)
mockAllocator := newMockAllocator()
......@@ -106,13 +106,13 @@ func Test_garbageCollector_scan(t *testing.T) {
})
gc.scan()
current := make([]string, 0, 3)
current := make([]string, 0, 6)
for info := range cli.ListObjects(context.TODO(), bucketName, minio.ListObjectsOptions{Prefix: rootPath, Recursive: true}) {
current = append(current, info.Key)
}
assert.ElementsMatch(t, files, current)
})
t.Run("all hit, no gc", func(t *testing.T) {
t.Run("hit, no gc", func(t *testing.T) {
segment := buildSegment(1, 10, 100, "ch")
segment.State = commonpb.SegmentState_Flushed
segment.Binlogs = []*datapb.FieldBinlog{{FieldID: 0, Binlogs: []string{files[0]}}}
......@@ -133,7 +133,7 @@ func Test_garbageCollector_scan(t *testing.T) {
gc.start()
gc.scan()
current := make([]string, 0, 3)
current := make([]string, 0, 6)
for info := range cli.ListObjects(context.TODO(), bucketName, minio.ListObjectsOptions{Prefix: rootPath, Recursive: true}) {
current = append(current, info.Key)
}
......@@ -144,6 +144,9 @@ func Test_garbageCollector_scan(t *testing.T) {
t.Run("dropped gc one", func(t *testing.T) {
segment := buildSegment(1, 10, 100, "ch")
segment.State = commonpb.SegmentState_Dropped
segment.DroppedAt = uint64(time.Now().Add(-time.Hour).UnixNano())
segment.Binlogs = []*datapb.FieldBinlog{{FieldID: 0, Binlogs: []string{files[0]}}}
segment.Statslogs = []*datapb.FieldBinlog{{FieldID: 0, Binlogs: []string{files[1]}}}
segment.Deltalogs = []*datapb.DeltaLogInfo{{DeltaLogPath: files[2]}}
err = meta.AddSegment(segment)
require.NoError(t, err)
......@@ -164,7 +167,7 @@ func Test_garbageCollector_scan(t *testing.T) {
for info := range cli.ListObjects(context.TODO(), bucketName, minio.ListObjectsOptions{Prefix: rootPath, Recursive: true}) {
current = append(current, info.Key)
}
assert.ElementsMatch(t, files[:2], current)
assert.ElementsMatch(t, files[3:], current)
gc.close()
})
t.Run("missing gc all", func(t *testing.T) {
......
......@@ -231,6 +231,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
if dropped {
clonedSegment.State = commonpb.SegmentState_Dropped
clonedSegment.DroppedAt = uint64(time.Now().UnixNano())
modSegments[segmentID] = clonedSegment
}
......@@ -334,19 +335,17 @@ func (m *meta) UpdateFlushSegmentsInfo(
}
// ListSegmentFiles lists all segment related file paths in valid & dropped list
func (m *meta) ListSegmentFiles() ([]string, []string) {
func (m *meta) ListSegmentFiles() (valid []string, dropped []string, droppedAt []uint64) {
m.RLock()
defer m.RUnlock()
var valid []string
var dropped []string
for _, segment := range m.segments.GetSegments() {
for _, binlog := range segment.GetBinlogs() {
if segment.State != commonpb.SegmentState_Dropped {
valid = append(valid, binlog.Binlogs...)
} else {
dropped = append(valid, binlog.Binlogs...)
dropped = append(dropped, binlog.Binlogs...)
droppedAt = append(droppedAt, segment.DroppedAt)
}
}
......@@ -354,7 +353,8 @@ func (m *meta) ListSegmentFiles() ([]string, []string) {
if segment.State != commonpb.SegmentState_Dropped {
valid = append(valid, statLog.Binlogs...)
} else {
dropped = append(valid, statLog.Binlogs...)
dropped = append(dropped, statLog.Binlogs...)
droppedAt = append(droppedAt, segment.DroppedAt)
}
}
......@@ -362,12 +362,13 @@ func (m *meta) ListSegmentFiles() ([]string, []string) {
if segment.State != commonpb.SegmentState_Dropped {
valid = append(valid, deltaLog.GetDeltaLogPath())
} else {
dropped = append(valid, deltaLog.GetDeltaLogPath())
dropped = append(dropped, deltaLog.GetDeltaLogPath())
droppedAt = append(droppedAt, segment.DroppedAt)
}
}
}
return valid, dropped
return valid, dropped, droppedAt
}
// GetSegmentsByChannel returns all segment info which insert channel equals provided `dmlCh`
......
......@@ -211,6 +211,7 @@ message SegmentInfo {
bool createdByCompaction = 14;
repeated int64 compactionFrom = 15;
uint64 dropped_at = 16; // timestamp when segment marked drop
}
message SegmentStartPosition {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册