Commit 751c9d4e authored by sunby, committed by yefu.chen

Refactor meta.go

Signed-off-by: sunby <bingyi.sun@zilliz.com>
Parent 964e7e0a
......@@ -41,7 +41,6 @@ require (
google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150 // indirect
google.golang.org/grpc v1.31.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.3.0 // indirect
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
......
......@@ -3,6 +3,8 @@ package dataservice
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
......@@ -45,7 +47,7 @@ func (handler *ddHandler) handleCreateCollection(msg *msgstream.CreateCollection
if err := proto.Unmarshal(msg.Schema, schema); err != nil {
return err
}
err := handler.meta.AddCollection(&collectionInfo{
err := handler.meta.AddCollection(&datapb.CollectionInfo{
ID: msg.CollectionID,
Schema: schema,
})
......
......@@ -2,7 +2,6 @@ package dataservice
import (
"fmt"
"strconv"
"sync"
"github.com/golang/protobuf/proto"
......@@ -11,7 +10,11 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
const (
metaPrefix = "dataservice-meta"
segmentPrefix = metaPrefix + "/s"
)
type errSegmentNotFound struct {
......@@ -20,16 +23,11 @@ type errSegmentNotFound struct {
type errCollectionNotFound struct {
collectionID UniqueID
}
type collectionInfo struct {
ID UniqueID
Schema *schemapb.CollectionSchema
Partitions []UniqueID
}
type meta struct {
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
collID2Info map[UniqueID]*collectionInfo // collection id to collection info
segID2Info map[UniqueID]*datapb.SegmentInfo // segment id to segment info
ddLock sync.RWMutex
sync.RWMutex
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
collections map[UniqueID]*datapb.CollectionInfo // collection id to collection info
segments map[UniqueID]*datapb.SegmentInfo // segment id to segment info
}
func newErrSegmentNotFound(segmentID UniqueID) errSegmentNotFound {
......@@ -51,8 +49,8 @@ func (err errCollectionNotFound) Error() string {
func newMeta(kv kv.TxnBase) (*meta, error) {
mt := &meta{
client: kv,
collID2Info: make(map[UniqueID]*collectionInfo),
segID2Info: make(map[UniqueID]*datapb.SegmentInfo),
collections: make(map[UniqueID]*datapb.CollectionInfo),
segments: make(map[UniqueID]*datapb.SegmentInfo),
}
err := mt.reloadFromKV()
if err != nil {
......@@ -62,7 +60,7 @@ func newMeta(kv kv.TxnBase) (*meta, error) {
}
func (meta *meta) reloadFromKV() error {
_, values, err := meta.client.LoadWithPrefix("segment")
_, values, err := meta.client.LoadWithPrefix(segmentPrefix)
if err != nil {
return err
}
......@@ -73,66 +71,65 @@ func (meta *meta) reloadFromKV() error {
if err != nil {
return fmt.Errorf("DataService reloadFromKV UnMarshalText datapb.SegmentInfo err:%w", err)
}
meta.segID2Info[segmentInfo.SegmentID] = segmentInfo
meta.segments[segmentInfo.ID] = segmentInfo
}
return nil
}
func (meta *meta) AddCollection(collectionInfo *collectionInfo) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
if _, ok := meta.collID2Info[collectionInfo.ID]; ok {
return fmt.Errorf("collection %s with id %d already exist", collectionInfo.Schema.Name, collectionInfo.ID)
func (meta *meta) AddCollection(collection *datapb.CollectionInfo) error {
meta.Lock()
defer meta.Unlock()
if _, ok := meta.collections[collection.ID]; ok {
return fmt.Errorf("collection %s with id %d already exist", collection.Schema.Name, collection.ID)
}
meta.collID2Info[collectionInfo.ID] = collectionInfo
meta.collections[collection.ID] = collection
return nil
}
func (meta *meta) DropCollection(collID UniqueID) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
meta.Lock()
defer meta.Unlock()
if _, ok := meta.collID2Info[collID]; !ok {
if _, ok := meta.collections[collID]; !ok {
return newErrCollectionNotFound(collID)
}
ids := make([]UniqueID, 0)
for i, info := range meta.segID2Info {
key := fmt.Sprintf("%s/%d/", segmentPrefix, collID)
if err := meta.client.RemoveWithPrefix(key); err != nil {
return err
}
delete(meta.collections, collID)
for i, info := range meta.segments {
if info.CollectionID == collID {
delete(meta.segID2Info, i)
ids = append(ids, i)
delete(meta.segments, i)
}
}
if err := meta.removeSegments(ids); err != nil {
_ = meta.reloadFromKV()
return err
}
delete(meta.collID2Info, collID)
return nil
}
func (meta *meta) HasCollection(collID UniqueID) bool {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
_, ok := meta.collID2Info[collID]
meta.RLock()
defer meta.RUnlock()
_, ok := meta.collections[collID]
return ok
}
func (meta *meta) GetCollection(collectionID UniqueID) (*collectionInfo, error) {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
func (meta *meta) GetCollection(collectionID UniqueID) (*datapb.CollectionInfo, error) {
meta.RLock()
defer meta.RUnlock()
collectionInfo, ok := meta.collID2Info[collectionID]
collection, ok := meta.collections[collectionID]
if !ok {
return nil, newErrCollectionNotFound(collectionID)
}
return collectionInfo, nil
return proto.Clone(collection).(*datapb.CollectionInfo), nil
}
func (meta *meta) GetNumRowsOfCollection(collectionID UniqueID) (int64, error) {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
meta.RLock()
defer meta.RUnlock()
var ret int64 = 0
for _, info := range meta.segID2Info {
for _, info := range meta.segments {
if info.CollectionID == collectionID {
ret += info.NumRows
}
......@@ -141,10 +138,10 @@ func (meta *meta) GetNumRowsOfCollection(collectionID UniqueID) (int64, error) {
}
func (meta *meta) GetMemSizeOfCollection(collectionID UniqueID) (int64, error) {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
meta.RLock()
defer meta.RUnlock()
var ret int64 = 0
for _, info := range meta.segID2Info {
for _, info := range meta.segments {
if info.CollectionID == collectionID {
ret += info.MemSize
}
......@@ -152,161 +149,158 @@ func (meta *meta) GetMemSizeOfCollection(collectionID UniqueID) (int64, error) {
return ret, nil
}
func (meta *meta) AddSegment(segmentInfo *datapb.SegmentInfo) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
if _, ok := meta.segID2Info[segmentInfo.SegmentID]; ok {
return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID)
func (meta *meta) AddSegment(segment *datapb.SegmentInfo) error {
meta.Lock()
defer meta.Unlock()
if _, ok := meta.segments[segment.ID]; ok {
return fmt.Errorf("segment %d already exist", segment.ID)
}
meta.segID2Info[segmentInfo.SegmentID] = segmentInfo
if err := meta.saveSegmentInfo(segmentInfo); err != nil {
_ = meta.reloadFromKV()
meta.segments[segment.ID] = segment
if err := meta.saveSegmentInfo(segment); err != nil {
return err
}
return nil
}
func (meta *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
meta.segID2Info[segmentInfo.SegmentID] = segmentInfo
if err := meta.saveSegmentInfo(segmentInfo); err != nil {
_ = meta.reloadFromKV()
func (meta *meta) UpdateSegment(segment *datapb.SegmentInfo) error {
meta.Lock()
defer meta.Unlock()
seg, ok := meta.segments[segment.ID]
if !ok {
return newErrSegmentNotFound(segment.ID)
}
seg.OpenTime = segment.OpenTime
seg.SealedTime = segment.SealedTime
seg.NumRows = segment.NumRows
seg.MemSize = segment.MemSize
seg.StartPosition = proto.Clone(segment.StartPosition).(*internalpb.MsgPosition)
seg.EndPosition = proto.Clone(segment.EndPosition).(*internalpb.MsgPosition)
if err := meta.saveSegmentInfo(segment); err != nil {
return err
}
return nil
}
func (meta *meta) DropSegment(segmentID UniqueID) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
meta.Lock()
defer meta.Unlock()
if _, ok := meta.segID2Info[segmentID]; !ok {
segment, ok := meta.segments[segmentID]
if !ok {
return newErrSegmentNotFound(segmentID)
}
delete(meta.segID2Info, segmentID)
if err := meta.removeSegmentInfo(segmentID); err != nil {
_ = meta.reloadFromKV()
if err := meta.removeSegmentInfo(segment); err != nil {
return err
}
delete(meta.segments, segmentID)
return nil
}
func (meta *meta) GetSegment(segID UniqueID) (*datapb.SegmentInfo, error) {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
meta.RLock()
defer meta.RUnlock()
segmentInfo, ok := meta.segID2Info[segID]
segment, ok := meta.segments[segID]
if !ok {
return nil, newErrSegmentNotFound(segID)
}
return segmentInfo, nil
return proto.Clone(segment).(*datapb.SegmentInfo), nil
}
func (meta *meta) OpenSegment(segmentID UniqueID, timetick Timestamp) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
meta.Lock()
defer meta.Unlock()
segInfo, ok := meta.segID2Info[segmentID]
segInfo, ok := meta.segments[segmentID]
if !ok {
return newErrSegmentNotFound(segmentID)
}
segInfo.OpenTime = timetick
err := meta.saveSegmentInfo(segInfo)
if err != nil {
_ = meta.reloadFromKV()
if err := meta.saveSegmentInfo(segInfo); err != nil {
return err
}
return nil
}
func (meta *meta) SealSegment(segID UniqueID, timetick Timestamp) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
meta.Lock()
defer meta.Unlock()
segInfo, ok := meta.segID2Info[segID]
segInfo, ok := meta.segments[segID]
if !ok {
return newErrSegmentNotFound(segID)
}
segInfo.SealedTime = timetick
err := meta.saveSegmentInfo(segInfo)
if err != nil {
_ = meta.reloadFromKV()
if err := meta.saveSegmentInfo(segInfo); err != nil {
return err
}
return nil
}
func (meta *meta) FlushSegment(segID UniqueID, timetick Timestamp) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
meta.Lock()
defer meta.Unlock()
segInfo, ok := meta.segID2Info[segID]
segInfo, ok := meta.segments[segID]
if !ok {
return newErrSegmentNotFound(segID)
}
segInfo.FlushedTime = timetick
segInfo.State = commonpb.SegmentState_Flushed
err := meta.saveSegmentInfo(segInfo)
if err != nil {
_ = meta.reloadFromKV()
if err := meta.saveSegmentInfo(segInfo); err != nil {
return err
}
return nil
}
func (meta *meta) SetSegmentState(segmentID UniqueID, state commonpb.SegmentState) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
meta.Lock()
defer meta.Unlock()
segInfo, ok := meta.segID2Info[segmentID]
segInfo, ok := meta.segments[segmentID]
if !ok {
return newErrSegmentNotFound(segmentID)
}
segInfo.State = state
err := meta.saveSegmentInfo(segInfo)
if err != nil {
_ = meta.reloadFromKV()
if err := meta.saveSegmentInfo(segInfo); err != nil {
return err
}
return nil
}
func (meta *meta) GetSegmentsOfCollection(collectionID UniqueID) []UniqueID {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
meta.RLock()
defer meta.RUnlock()
ret := make([]UniqueID, 0)
for _, info := range meta.segID2Info {
for _, info := range meta.segments {
if info.CollectionID == collectionID {
ret = append(ret, info.SegmentID)
ret = append(ret, info.ID)
}
}
return ret
}
func (meta *meta) GetSegmentsOfPartition(collectionID, partitionID UniqueID) []UniqueID {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
meta.RLock()
defer meta.RUnlock()
ret := make([]UniqueID, 0)
for _, info := range meta.segID2Info {
for _, info := range meta.segments {
if info.CollectionID == collectionID && info.PartitionID == partitionID {
ret = append(ret, info.SegmentID)
ret = append(ret, info.ID)
}
}
return ret
}
func (meta *meta) AddPartition(collectionID UniqueID, partitionID UniqueID) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
coll, ok := meta.collID2Info[collectionID]
meta.Lock()
defer meta.Unlock()
coll, ok := meta.collections[collectionID]
if !ok {
return newErrCollectionNotFound(collectionID)
}
......@@ -321,10 +315,10 @@ func (meta *meta) AddPartition(collectionID UniqueID, partitionID UniqueID) erro
}
func (meta *meta) DropPartition(collID UniqueID, partitionID UniqueID) error {
meta.ddLock.Lock()
defer meta.ddLock.Unlock()
meta.Lock()
defer meta.Unlock()
collection, ok := meta.collID2Info[collID]
collection, ok := meta.collections[collID]
if !ok {
return newErrCollectionNotFound(collID)
}
......@@ -339,26 +333,25 @@ func (meta *meta) DropPartition(collID UniqueID, partitionID UniqueID) error {
return fmt.Errorf("cannot find partition id %d", partitionID)
}
ids := make([]UniqueID, 0)
for i, info := range meta.segID2Info {
if info.PartitionID == partitionID {
delete(meta.segID2Info, i)
ids = append(ids, i)
}
}
if err := meta.removeSegments(ids); err != nil {
_ = meta.reloadFromKV()
prefix := fmt.Sprintf("%s/%d/%d/", segmentPrefix, collID, partitionID)
if err := meta.client.RemoveWithPrefix(prefix); err != nil {
return err
}
collection.Partitions = append(collection.Partitions[:idx], collection.Partitions[idx+1:]...)
for i, info := range meta.segments {
if info.PartitionID == partitionID {
delete(meta.segments, i)
}
}
return nil
}
func (meta *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID) (int64, error) {
meta.ddLock.RLock()
defer meta.ddLock.RUnlock()
meta.RLock()
defer meta.RUnlock()
var ret int64 = 0
for _, info := range meta.segID2Info {
for _, info := range meta.segments {
if info.CollectionID == collectionID && info.PartitionID == partitionID {
ret += info.NumRows
}
......@@ -366,27 +359,21 @@ func (meta *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID Uniqu
return ret, nil
}
func (meta *meta) saveSegmentInfo(segmentInfo *datapb.SegmentInfo) error {
segBytes := proto.MarshalTextString(segmentInfo)
func (meta *meta) saveSegmentInfo(segment *datapb.SegmentInfo) error {
segBytes := proto.MarshalTextString(segment)
return meta.client.Save("/segment/"+strconv.FormatInt(segmentInfo.SegmentID, 10), segBytes)
key := fmt.Sprintf("%s/%d/%d/%d", segmentPrefix, segment.CollectionID, segment.PartitionID, segment.ID)
return meta.client.Save(key, segBytes)
}
func (meta *meta) removeSegmentInfo(segID UniqueID) error {
return meta.client.Remove("/segment/" + strconv.FormatInt(segID, 10))
}
func (meta *meta) removeSegments(segIDs []UniqueID) error {
segmentPaths := make([]string, len(segIDs))
for i, id := range segIDs {
segmentPaths[i] = "/segment/" + strconv.FormatInt(id, 10)
}
return meta.client.MultiRemove(segmentPaths)
func (meta *meta) removeSegmentInfo(segment *datapb.SegmentInfo) error {
key := fmt.Sprintf("%s/%d/%d/%d", segmentPrefix, segment.CollectionID, segment.PartitionID, segment.ID)
return meta.client.Remove(key)
}
func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*datapb.SegmentInfo, error) {
return &datapb.SegmentInfo{
SegmentID: segmentID,
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channelName,
......
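Aside: the refactored meta stores every segment under a hierarchical key, `dataservice-meta/s/<collectionID>/<partitionID>/<segmentID>`, so DropCollection and DropPartition become a single RemoveWithPrefix call instead of collecting IDs and issuing a MultiRemove. A minimal sketch of that key scheme follows; the helper names are illustrative and not part of the patch, only the format strings are taken from it.

```go
package main

import "fmt"

const (
	metaPrefix    = "dataservice-meta"
	segmentPrefix = metaPrefix + "/s"
)

// segmentKey mirrors the key built in saveSegmentInfo and removeSegmentInfo.
func segmentKey(collID, partID, segID int64) string {
	return fmt.Sprintf("%s/%d/%d/%d", segmentPrefix, collID, partID, segID)
}

// collectionPrefix mirrors the prefix removed by DropCollection.
func collectionPrefix(collID int64) string {
	return fmt.Sprintf("%s/%d/", segmentPrefix, collID)
}

// partitionPrefix mirrors the prefix removed by DropPartition.
func partitionPrefix(collID, partID int64) string {
	return fmt.Sprintf("%s/%d/%d/", segmentPrefix, collID, partID)
}

func main() {
	fmt.Println(segmentKey(1, 10, 100)) // dataservice-meta/s/1/10/100
	fmt.Println(collectionPrefix(1))    // dataservice-meta/s/1/
	fmt.Println(partitionPrefix(1, 10)) // dataservice-meta/s/1/10/
}
```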
......@@ -3,6 +3,10 @@ package dataservice
import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/stretchr/testify/assert"
)
......@@ -13,13 +17,13 @@ func TestCollection(t *testing.T) {
testSchema := newTestSchema()
id, err := mockAllocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
err = meta.AddCollection(&datapb.CollectionInfo{
ID: id,
Schema: testSchema,
Partitions: []UniqueID{100},
})
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
err = meta.AddCollection(&datapb.CollectionInfo{
ID: id,
Schema: testSchema,
})
......@@ -52,20 +56,20 @@ func TestSegment(t *testing.T) {
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
info, err := meta.GetSegment(segmentInfo.SegmentID)
info, err := meta.GetSegment(segmentInfo.ID)
assert.Nil(t, err)
assert.EqualValues(t, segmentInfo, info)
assert.True(t, proto.Equal(info, segmentInfo))
ids := meta.GetSegmentsOfCollection(id)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, segmentInfo.SegmentID, ids[0])
assert.EqualValues(t, segmentInfo.ID, ids[0])
ids = meta.GetSegmentsOfPartition(id, 100)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, segmentInfo.SegmentID, ids[0])
err = meta.SealSegment(segmentInfo.SegmentID, 100)
assert.EqualValues(t, segmentInfo.ID, ids[0])
err = meta.SealSegment(segmentInfo.ID, 100)
assert.Nil(t, err)
err = meta.FlushSegment(segmentInfo.SegmentID, 200)
err = meta.FlushSegment(segmentInfo.ID, 200)
assert.Nil(t, err)
info, err = meta.GetSegment(segmentInfo.SegmentID)
info, err = meta.GetSegment(segmentInfo.ID)
assert.Nil(t, err)
assert.NotZero(t, info.SealedTime)
assert.NotZero(t, info.FlushedTime)
......@@ -81,7 +85,7 @@ func TestPartition(t *testing.T) {
err = meta.AddPartition(id, 10)
assert.NotNil(t, err)
err = meta.AddCollection(&collectionInfo{
err = meta.AddCollection(&datapb.CollectionInfo{
ID: id,
Schema: testSchema,
Partitions: []UniqueID{},
......
......@@ -90,8 +90,8 @@ func (allocator *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID)
if _, ok := allocator.segments[segmentInfo.ID]; ok {
return fmt.Errorf("segment %d already exist", segmentInfo.ID)
}
totalRows, err := allocator.estimateTotalRows(segmentInfo.CollectionID)
if err != nil {
......@@ -99,10 +99,10 @@ func (allocator *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo
}
log.Debug("dataservice: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.SegmentID),
zap.Int64("SegmentID", segmentInfo.ID),
zap.Int("Rows", totalRows))
allocator.segments[segmentInfo.SegmentID] = &segmentStatus{
id: segmentInfo.SegmentID,
allocator.segments[segmentInfo.ID] = &segmentStatus{
id: segmentInfo.ID,
collectionID: segmentInfo.CollectionID,
partitionID: segmentInfo.PartitionID,
total: totalRows,
......
......@@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/stretchr/testify/assert"
......@@ -24,7 +26,7 @@ func TestAllocSegment(t *testing.T) {
schema := newTestSchema()
collID, err := mockAllocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
err = meta.AddCollection(&datapb.CollectionInfo{
ID: collID,
Schema: schema,
})
......@@ -75,7 +77,7 @@ func TestSealSegment(t *testing.T) {
schema := newTestSchema()
collID, err := mockAllocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
err = meta.AddCollection(&datapb.CollectionInfo{
ID: collID,
Schema: schema,
})
......@@ -90,7 +92,7 @@ func TestSealSegment(t *testing.T) {
assert.Nil(t, err)
err = segAllocator.OpenSegment(ctx, segmentInfo)
assert.Nil(t, err)
lastSegID = segmentInfo.SegmentID
lastSegID = segmentInfo.ID
}
err = segAllocator.SealSegment(ctx, lastSegID)
......@@ -112,7 +114,7 @@ func TestExpireSegment(t *testing.T) {
schema := newTestSchema()
collID, err := mockAllocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
err = meta.AddCollection(&datapb.CollectionInfo{
ID: collID,
Schema: schema,
})
......
......@@ -246,7 +246,7 @@ func (s *Server) loadMetaFromMaster() error {
log.Error("show partitions error", zap.String("collectionName", collectionName), zap.Int64("collectionID", collection.CollectionID), zap.Error(err))
continue
}
err = s.meta.AddCollection(&collectionInfo{
err = s.meta.AddCollection(&datapb.CollectionInfo{
ID: collection.CollectionID,
Schema: collection.Schema,
Partitions: partitions.PartitionIDs,
......@@ -615,7 +615,7 @@ func (s *Server) loadCollectionFromMaster(ctx context.Context, collectionID int6
if err = VerifyResponse(resp, err); err != nil {
return err
}
collInfo := &collectionInfo{
collInfo := &datapb.CollectionInfo{
ID: resp.CollectionID,
Schema: resp.Schema,
}
......@@ -797,14 +797,14 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
resp.Status.Reason = "data service is not healthy"
return resp, nil
}
infos := make([]*datapb.SegmentInfo, len(req.SegmentIDs))
for i, id := range req.SegmentIDs {
segmentInfo, err := s.meta.GetSegment(id)
infos := make([]*datapb.SegmentInfo, 0, len(req.SegmentIDs))
for _, id := range req.SegmentIDs {
info, err := s.meta.GetSegment(id)
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil
}
infos[i] = proto.Clone(segmentInfo).(*datapb.SegmentInfo)
infos = append(infos, info)
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.Infos = infos
......
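Aside: GetCollection and GetSegment now hand back proto.Clone copies, which is why GetSegmentInfo can append the returned value directly instead of cloning it in the server. A minimal sketch of the cloning pattern, with made-up field values:

```go
package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
)

func main() {
	// The value meta would hold internally (field values are made up for the example).
	stored := &datapb.SegmentInfo{ID: 1000, CollectionID: 1, PartitionID: 10}

	// proto.Clone returns proto.Message, so a type assertion recovers the concrete type.
	returned := proto.Clone(stored).(*datapb.SegmentInfo)

	// Mutating the returned copy leaves meta's internal state untouched.
	returned.NumRows = 42
	fmt.Println(stored.NumRows, returned.NumRows) // 0 42
}
```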
......@@ -34,6 +34,6 @@ func (handler *statsHandler) HandleSegmentStat(segStats *internalpb.SegmentStati
segMeta.SealedTime = segStats.EndTime
segMeta.NumRows = segStats.NumRows
segMeta.MemSize = segStats.MemorySize
log.Debug("stats_handler update segment", zap.Any("segmentID", segMeta.SegmentID), zap.Any("State", segMeta.State))
log.Debug("stats_handler update segment", zap.Any("segmentID", segMeta.ID), zap.Any("State", segMeta.State))
return handler.meta.UpdateSegment(segMeta)
}
......@@ -102,7 +102,7 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic
log.Error("set segment state error", zap.Int64("segmentID", id), zap.Error(err))
continue
}
collID, segID := sInfo.CollectionID, sInfo.SegmentID
collID, segID := sInfo.CollectionID, sInfo.ID
coll2Segs[collID] = append(coll2Segs[collID], segID)
watcher.allocator.DropSegment(ctx, id)
}
......
......@@ -6,6 +6,8 @@ import (
"testing"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
......@@ -29,7 +31,7 @@ func TestDataNodeTTWatcher(t *testing.T) {
id, err := allocator.allocID()
assert.Nil(t, err)
err = meta.AddCollection(&collectionInfo{
err = meta.AddCollection(&datapb.CollectionInfo{
Schema: schema,
ID: id,
})
......
......@@ -407,7 +407,7 @@ func TestGrpcService(t *testing.T) {
assert.Nil(t, err)
assert.Zero(t, len(part.SegmentIDs))
seg := &datapb.SegmentInfo{
SegmentID: 1000,
ID: 1000,
CollectionID: coll.ID,
PartitionID: part.PartitionID,
}
......@@ -521,7 +521,7 @@ func TestGrpcService(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, len(part.SegmentIDs), 1)
seg := &datapb.SegmentInfo{
SegmentID: 1001,
ID: 1001,
CollectionID: coll.ID,
PartitionID: part.PartitionID,
}
......
......@@ -8,6 +8,7 @@ type Base interface {
MultiSave(kvs map[string]string) error
Remove(key string) error
MultiRemove(keys []string) error
RemoveWithPrefix(key string) error
Close()
}
......
......@@ -116,6 +116,7 @@ func (kv *MemoryKV) LoadWithPrefix(key string) ([]string, []string, error) {
keys := make([]string, 0)
values := make([]string, 0)
kv.tree.Ascend(func(i btree.Item) bool {
if strings.HasPrefix(i.(memoryKVItem).key, key) {
keys = append(keys, i.(memoryKVItem).key)
......@@ -135,3 +136,21 @@ func (kv *MemoryKV) MultiRemoveWithPrefix(keys []string) error {
func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
panic("not implement")
}
func (kv *MemoryKV) RemoveWithPrefix(key string) error {
kv.Lock()
defer kv.Unlock()
keys := make([]btree.Item, 0)
kv.tree.Ascend(func(i btree.Item) bool {
if strings.HasPrefix(i.(memoryKVItem).key, key) {
keys = append(keys, i.(memoryKVItem))
}
return true
})
for _, item := range keys {
kv.tree.Delete(item)
}
return nil
}
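Aside: RemoveWithPrefix, added to the kv.Base interface and implemented for MemoryKV above with a btree scan, deletes every key that starts with the given prefix; the meta code relies on it to drop all segment keys of a collection or partition in one call. A toy map-based sketch of that contract, not the project's implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// mapKV is a stand-in used only to illustrate the RemoveWithPrefix semantics.
type mapKV struct{ data map[string]string }

func (kv *mapKV) Save(key, value string) error { kv.data[key] = value; return nil }

func (kv *mapKV) RemoveWithPrefix(prefix string) error {
	for k := range kv.data {
		if strings.HasPrefix(k, prefix) {
			delete(kv.data, k)
		}
	}
	return nil
}

func main() {
	kv := &mapKV{data: map[string]string{}}
	_ = kv.Save("dataservice-meta/s/1/10/100", "seg-100")
	_ = kv.Save("dataservice-meta/s/1/10/101", "seg-101")
	_ = kv.Save("dataservice-meta/s/2/20/200", "seg-200")

	// Dropping collection 1 removes both of its segment keys in one call.
	_ = kv.RemoveWithPrefix("dataservice-meta/s/1/")
	fmt.Println(len(kv.data)) // 1
}
```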
......@@ -289,7 +289,7 @@ func (c *Core) startDataServiceSegmentLoop() {
//what if master add segment failed, but data service success?
log.Warn("add segment info meta table failed ", zap.String("error", err.Error()))
} else {
log.Debug("add segment", zap.Int64("collection id", seg.CollectionID), zap.Int64("partition id", seg.PartitionID), zap.Int64("segment id", seg.SegmentID))
log.Debug("add segment", zap.Int64("collection id", seg.CollectionID), zap.Int64("partition id", seg.PartitionID), zap.Int64("segment id", seg.ID))
}
}
}
......
......@@ -563,7 +563,7 @@ func TestMasterService(t *testing.T) {
assert.Zero(t, len(part.SegmentIDs))
seg := &datapb.SegmentInfo{
SegmentID: 1000,
ID: 1000,
CollectionID: coll.ID,
PartitionID: part.PartitionID,
}
......@@ -722,7 +722,7 @@ func TestMasterService(t *testing.T) {
assert.Equal(t, len(part.SegmentIDs), 1)
seg := &datapb.SegmentInfo{
SegmentID: 1001,
ID: 1001,
CollectionID: coll.ID,
PartitionID: part.PartitionID,
}
......
......@@ -543,17 +543,17 @@ func (mt *metaTable) AddSegment(seg *datapb.SegmentInfo) error {
}
exist = false
for _, segID := range partMeta.SegmentIDs {
if segID == seg.SegmentID {
if segID == seg.ID {
exist = true
}
}
if exist {
return fmt.Errorf("segment id = %d exist", seg.SegmentID)
return fmt.Errorf("segment id = %d exist", seg.ID)
}
partMeta.SegmentIDs = append(partMeta.SegmentIDs, seg.SegmentID)
partMeta.SegmentIDs = append(partMeta.SegmentIDs, seg.ID)
mt.partitionID2Meta[seg.PartitionID] = partMeta
mt.segID2CollID[seg.SegmentID] = seg.CollectionID
mt.segID2PartitionID[seg.SegmentID] = seg.PartitionID
mt.segID2CollID[seg.ID] = seg.CollectionID
mt.segID2PartitionID[seg.ID] = seg.PartitionID
k := fmt.Sprintf("%s/%d/%d", PartitionMetaPrefix, seg.CollectionID, seg.PartitionID)
v := proto.MarshalTextString(&partMeta)
......
......@@ -126,13 +126,13 @@ func TestMetaTable(t *testing.T) {
t.Run("add segment", func(t *testing.T) {
seg := &datapb.SegmentInfo{
SegmentID: 100,
ID: 100,
CollectionID: 1,
PartitionID: 10,
}
assert.Nil(t, mt.AddSegment(seg))
assert.NotNil(t, mt.AddSegment(seg))
seg.SegmentID = 101
seg.ID = 101
seg.CollectionID = 2
assert.NotNil(t, mt.AddSegment(seg))
seg.CollectionID = 1
......
......@@ -7,6 +7,7 @@ option go_package = "github.com/zilliztech/milvus-distributed/internal/proto/dat
import "common.proto";
import "internal.proto";
import "milvus.proto";
import "schema.proto";
service DataService {
rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {}
......@@ -120,21 +121,6 @@ message GetSegmentInfoRequest {
repeated int64 segmentIDs = 2;
}
message SegmentInfo {
int64 segmentID = 1;
int64 collectionID = 2;
int64 partitionID = 3;
string insert_channel = 4;
uint64 open_time = 5;
uint64 sealed_time = 6;
uint64 flushed_time = 7;
int64 num_rows = 8;
int64 mem_size = 9;
common.SegmentState state = 10;
internal.MsgPosition start_position = 11;
internal.MsgPosition end_position = 12;
}
message GetSegmentInfoResponse {
common.Status status = 1;
repeated SegmentInfo infos = 2;
......@@ -227,3 +213,23 @@ message DDLFlushMeta {
int64 collectionID = 1;
repeated string binlog_paths = 2;
}
message CollectionInfo {
int64 ID = 1;
schema.CollectionSchema schema = 2;
repeated int64 partitions = 3;
}
message SegmentInfo {
int64 ID = 1;
int64 collectionID = 2;
int64 partitionID = 3;
string insert_channel = 4;
uint64 open_time = 5;
uint64 sealed_time = 6;
uint64 flushed_time = 7;
int64 num_rows = 8;
int64 mem_size = 9;
common.SegmentState state = 10;
internal.MsgPosition start_position = 11;
internal.MsgPosition end_position = 12;
}
......@@ -1140,7 +1140,7 @@ func (node *ProxyNode) GetPersistentSegmentInfo(ctx context.Context, req *milvus
persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
for i, info := range infoResp.Infos {
persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
SegmentID: info.SegmentID,
SegmentID: info.ID,
CollectionID: info.CollectionID,
PartitionID: info.PartitionID,
OpenTime: info.OpenTime,
......
......@@ -152,7 +152,7 @@ func (mService *metaService) processSegmentCreate(id string, value string) {
// TODO: what if seg == nil? We need to notify master and return rpc request failed
if seg != nil {
// TODO: get partition id from segment meta
err := mService.replica.addSegment(seg.SegmentID, seg.PartitionID, seg.CollectionID, segmentTypeGrowing)
err := mService.replica.addSegment(seg.ID, seg.PartitionID, seg.CollectionID, segmentTypeGrowing)
if err != nil {
log.Error(err.Error())
return
......
......@@ -72,7 +72,7 @@ func TestMetaService_printCollectionStruct(t *testing.T) {
func TestMetaService_printSegmentStruct(t *testing.T) {
var s = datapb.SegmentInfo{
SegmentID: UniqueID(0),
ID: UniqueID(0),
CollectionID: UniqueID(0),
PartitionID: defaultPartitionID,
OpenTime: Timestamp(0),
......