未验证 提交 0978b93a 编写于 作者: S sunby 提交者: GitHub

Refactor data coordinator (#5982)

Rename variable name and make error msg more clear
Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 0f05622c
......@@ -25,18 +25,19 @@ type allocator interface {
}
type rootCoordAllocator struct {
ctx context.Context
rootCoordClient types.RootCoord
}
func newAllocator(rootCoordClient types.RootCoord) *rootCoordAllocator {
func newRootCoordAllocator(ctx context.Context, rootCoordClient types.RootCoord) *rootCoordAllocator {
return &rootCoordAllocator{
ctx: ctx,
rootCoordClient: rootCoordClient,
}
}
func (allocator *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
ctx := context.TODO()
resp, err := allocator.rootCoordClient.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
func (alloc *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
resp, err := alloc.rootCoordClient.AllocTimestamp(alloc.ctx, &rootcoordpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestTSO,
MsgID: -1, // todo add msg id
......@@ -51,9 +52,8 @@ func (allocator *rootCoordAllocator) allocTimestamp() (Timestamp, error) {
return resp.Timestamp, nil
}
func (allocator *rootCoordAllocator) allocID() (UniqueID, error) {
ctx := context.TODO()
resp, err := allocator.rootCoordClient.AllocID(ctx, &rootcoordpb.AllocIDRequest{
func (alloc *rootCoordAllocator) allocID() (UniqueID, error) {
resp, err := alloc.rootCoordClient.AllocID(alloc.ctx, &rootcoordpb.AllocIDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestID,
MsgID: -1, // todo add msg id
......
......@@ -15,11 +15,12 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
)
func TestAllocator_Basic(t *testing.T) {
ms := newMockRootCoordService()
allocator := newAllocator(ms)
allocator := newRootCoordAllocator(context.Background(), ms)
t.Run("Test allocTimestamp", func(t *testing.T) {
_, err := allocator.allocTimestamp()
......
......@@ -11,6 +11,7 @@
package datacoord
import (
"context"
"sync"
"github.com/milvus-io/milvus/internal/types"
......@@ -26,12 +27,14 @@ type sessionManager interface {
type clusterSessionManager struct {
sync.RWMutex
ctx context.Context
sessions map[string]types.DataNode
dataClientCreator func(addr string) (types.DataNode, error)
dataClientCreator dataNodeCreatorFunc
}
func newClusterSessionManager(dataClientCreator func(addr string) (types.DataNode, error)) *clusterSessionManager {
func newClusterSessionManager(ctx context.Context, dataClientCreator dataNodeCreatorFunc) *clusterSessionManager {
return &clusterSessionManager{
ctx: ctx,
sessions: make(map[string]types.DataNode),
dataClientCreator: dataClientCreator,
}
......@@ -39,7 +42,7 @@ func newClusterSessionManager(dataClientCreator func(addr string) (types.DataNod
// lock acquired
func (m *clusterSessionManager) createSession(addr string) (types.DataNode, error) {
cli, err := m.dataClientCreator(addr)
cli, err := m.dataClientCreator(m.ctx, addr)
if err != nil {
return nil, err
}
......
......@@ -42,15 +42,15 @@ func TestFlushMonitor(t *testing.T) {
// create seg0 for partition0, seg0/seg1 for partition1
segID0_0, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo0_0, err := BuildSegment(collID, partID0, segID0_0, channelName)
segInfo0_0, err := buildSegment(collID, partID0, segID0_0, channelName)
assert.Nil(t, err)
segID1_0, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo1_0, err := BuildSegment(collID, partID1, segID1_0, channelName)
segInfo1_0, err := buildSegment(collID, partID1, segID1_0, channelName)
assert.Nil(t, err)
segID1_1, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo1_1, err := BuildSegment(collID, partID1, segID1_1, channelName)
segInfo1_1, err := buildSegment(collID, partID1, segID1_1, channelName)
assert.Nil(t, err)
// check AddSegment
......@@ -77,7 +77,7 @@ func TestFlushMonitor(t *testing.T) {
fm.segmentPolicy = estSegmentSizePolicy(1024*1024, 1024*1024*2) // row size 1Mib Limit 2 MB
segID3Rows, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo3Rows, err := BuildSegment(collID, partID1, segID3Rows, channelName)
segInfo3Rows, err := buildSegment(collID, partID1, segID3Rows, channelName)
segInfo3Rows.NumOfRows = 3
assert.Nil(t, err)
......@@ -95,7 +95,7 @@ func TestFlushMonitor(t *testing.T) {
for i := 0; i < 100; i++ {
segID, err := mockAllocator.allocID()
assert.Nil(t, err)
seg, err := BuildSegment(collID, partID0, segID, channelName2)
seg, err := buildSegment(collID, partID0, segID, channelName2)
assert.Nil(t, err)
seg.DmlPosition = &internalpb.MsgPosition{
Timestamp: uint64(i + 1),
......@@ -108,7 +108,7 @@ func TestFlushMonitor(t *testing.T) {
exSegID, err := mockAllocator.allocID()
assert.Nil(t, err)
seg, err := BuildSegment(collID, partID0, exSegID, channelName2)
seg, err := buildSegment(collID, partID0, exSegID, channelName2)
assert.Nil(t, err)
seg.DmlPosition = &internalpb.MsgPosition{
Timestamp: uint64(0), // the oldest
......
......@@ -16,6 +16,8 @@ import (
"go.uber.org/zap"
)
const serverNotServingErrMsg = "server is not serving"
func (s *Server) isClosed() bool {
return atomic.LoadInt64(&s.isServing) != 2
}
......@@ -44,11 +46,11 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
if s.isClosed() {
resp.Reason = "server is closed"
resp.Reason = serverNotServingErrMsg
return resp, nil
}
if err := s.segmentManager.SealAllSegments(ctx, req.CollectionID); err != nil {
resp.Reason = fmt.Sprintf("Seal all segments error %s", err)
resp.Reason = fmt.Sprintf("Failed to flush %d, %s", req.CollectionID, err)
return resp, nil
}
return &commonpb.Status{
......@@ -60,6 +62,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
if s.isClosed() {
return &datapb.AssignSegmentIDResponse{
Status: &commonpb.Status{
Reason: serverNotServingErrMsg,
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, nil
......@@ -85,7 +88,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
if !s.meta.HasCollection(r.CollectionID) {
if err := s.loadCollectionFromRootCoord(ctx, r.CollectionID); err != nil {
errMsg := fmt.Sprintf("can not load collection %d", r.CollectionID)
errMsg := fmt.Sprintf("Can not load collection %d", r.CollectionID)
appendFailedAssignment(errMsg)
log.Error("load collection from rootcoord error",
zap.Int64("collectionID", r.CollectionID),
......@@ -104,7 +107,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
segmentID, retCount, expireTs, err := s.segmentManager.AllocSegment(ctx,
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
if err != nil {
errMsg := fmt.Sprintf("allocation of collection %d, partition %d, channel %s, count %d error: %s",
errMsg := fmt.Sprintf("Allocation of collection %d, partition %d, channel %s, count %d error: %s",
r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
appendFailedAssignment(errMsg)
continue
......@@ -142,7 +145,7 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta
},
}
if s.isClosed() {
resp.Status.Reason = "server is initializing"
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
......@@ -154,7 +157,7 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta
segmentInfo, err := s.meta.GetSegment(segmentID)
if err != nil {
state.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
state.Status.Reason = "get segment states error: " + err.Error()
state.Status.Reason = fmt.Sprintf("Failed to get segment %d, %s", segmentID, err.Error())
} else {
state.Status.ErrorCode = commonpb.ErrorCode_Success
state.State = segmentInfo.GetState()
......@@ -174,7 +177,7 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
},
}
if s.isClosed() {
resp.Status.Reason = "server is initializing"
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
p := path.Join(Params.SegmentBinlogSubPath, strconv.FormatInt(req.SegmentID, 10)) + "/" // prefix/id/ instead of prefix/id
......@@ -212,7 +215,7 @@ func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol
},
}
if s.isClosed() {
resp.Status.Reason = "server is initializing"
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID)
......@@ -232,7 +235,7 @@ func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPart
},
}
if s.isClosed() {
resp.Status.Reason = "server is initializing"
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
nums, err := s.meta.GetNumRowsOfPartition(req.CollectionID, req.PartitionID)
......@@ -261,7 +264,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
},
}
if s.isClosed() {
resp.Status.Reason = "data service is not healthy"
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
infos := make([]*datapb.SegmentInfo, 0, len(req.SegmentIDs))
......@@ -283,7 +286,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
if s.isClosed() {
resp.Reason = "server is closed"
resp.Reason = serverNotServingErrMsg
return resp, nil
}
log.Debug("Receive SaveBinlogPaths request",
......@@ -364,7 +367,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
},
}
if s.isClosed() {
resp.Status.Reason = "server is initializing"
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
segmentIDs := s.meta.GetSegmentsOfPartition(collectionID, partitionID)
......
......@@ -32,15 +32,6 @@ const (
type errSegmentNotFound struct {
segmentID UniqueID
}
type errCollectionNotFound struct {
collectionID UniqueID
}
type meta struct {
sync.RWMutex
client kv.TxnKV // client of a reliable kv service, i.e. etcd client
collections map[UniqueID]*datapb.CollectionInfo // collection id to collection info
segments map[UniqueID]*datapb.SegmentInfo // segment id to segment info
}
func newErrSegmentNotFound(segmentID UniqueID) errSegmentNotFound {
return errSegmentNotFound{segmentID: segmentID}
......@@ -50,6 +41,10 @@ func (err errSegmentNotFound) Error() string {
return fmt.Sprintf("segment %d not found", err.segmentID)
}
type errCollectionNotFound struct {
collectionID UniqueID
}
func newErrCollectionNotFound(collectionID UniqueID) errCollectionNotFound {
return errCollectionNotFound{collectionID: collectionID}
}
......@@ -58,6 +53,62 @@ func (err errCollectionNotFound) Error() string {
return fmt.Sprintf("collection %d not found", err.collectionID)
}
type errPartitionNotFound struct {
partitionID UniqueID
}
func newErrPartitionNotFound(partitionID UniqueID) errPartitionNotFound {
return errPartitionNotFound{partitionID: partitionID}
}
func (err errPartitionNotFound) Error() string {
return fmt.Sprintf("partition %d not found", err.partitionID)
}
type errCollectionExist struct {
name string
id UniqueID
}
func newErrCollectionExist(collectionName string, collectionID UniqueID) errCollectionExist {
return errCollectionExist{name: collectionName, id: collectionID}
}
func (err errCollectionExist) Error() string {
return fmt.Sprintf("collection %s with id %d already exist", err.name, err.id)
}
type errPartitionExist struct {
id UniqueID
}
func newErrPartitionExist(partitionID UniqueID) errPartitionExist {
return errPartitionExist{id: partitionID}
}
func (err errPartitionExist) Error() string {
return fmt.Sprintf("partition %d already exist", err.id)
}
type errSegmentExist struct {
id UniqueID
}
func newErrSegmentExist(segmentID UniqueID) errSegmentExist {
return errSegmentExist{id: segmentID}
}
func (err errSegmentExist) Error() string {
return fmt.Sprintf("segment %d already exist", err.id)
}
type meta struct {
sync.RWMutex
client kv.TxnKV // client of a reliable kv service, i.e. etcd client
collections map[UniqueID]*datapb.CollectionInfo // collection id to collection info
segments map[UniqueID]*datapb.SegmentInfo // segment id to segment info
}
func newMeta(kv kv.TxnKV) (*meta, error) {
mt := &meta{
client: kv,
......@@ -93,27 +144,27 @@ func (m *meta) AddCollection(collection *datapb.CollectionInfo) error {
m.Lock()
defer m.Unlock()
if _, ok := m.collections[collection.ID]; ok {
return fmt.Errorf("collection %s with id %d already exist", collection.Schema.Name, collection.ID)
return newErrCollectionExist(collection.GetSchema().GetName(), collection.GetID())
}
m.collections[collection.ID] = collection
return nil
}
func (m *meta) DropCollection(collID UniqueID) error {
func (m *meta) DropCollection(collectionID UniqueID) error {
m.Lock()
defer m.Unlock()
if _, ok := m.collections[collID]; !ok {
return newErrCollectionNotFound(collID)
if _, ok := m.collections[collectionID]; !ok {
return newErrCollectionNotFound(collectionID)
}
key := fmt.Sprintf("%s/%d/", segmentPrefix, collID)
key := buildCollectionPath(collectionID)
if err := m.client.RemoveWithPrefix(key); err != nil {
return err
}
delete(m.collections, collID)
delete(m.collections, collectionID)
for i, info := range m.segments {
if info.CollectionID == collID {
if info.CollectionID == collectionID {
delete(m.segments, i)
}
}
......@@ -141,9 +192,9 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) (int64, error) {
m.RLock()
defer m.RUnlock()
var ret int64 = 0
for _, info := range m.segments {
if info.CollectionID == collectionID {
ret += info.NumOfRows
for _, segment := range m.segments {
if segment.CollectionID == collectionID {
ret += segment.GetNumOfRows()
}
}
return ret, nil
......@@ -153,7 +204,7 @@ func (m *meta) AddSegment(segment *datapb.SegmentInfo) error {
m.Lock()
defer m.Unlock()
if _, ok := m.segments[segment.ID]; ok {
return fmt.Errorf("segment %d already exist", segment.ID)
return newErrSegmentExist(segment.GetID())
}
m.segments[segment.ID] = segment
if err := m.saveSegmentInfo(segment); err != nil {
......@@ -179,13 +230,13 @@ func (m *meta) UpdateSegmentStatistic(stats *internalpb.SegmentStatisticsUpdates
func (m *meta) SetLastExpireTime(segmentID UniqueID, expireTs Timestamp) error {
m.Lock()
defer m.Unlock()
seg, ok := m.segments[segmentID]
segment, ok := m.segments[segmentID]
if !ok {
return newErrSegmentNotFound(segmentID)
}
seg.LastExpireTime = expireTs
segment.LastExpireTime = expireTs
if err := m.saveSegmentInfo(seg); err != nil {
if err := m.saveSegmentInfo(segment); err != nil {
return err
}
return nil
......@@ -238,7 +289,7 @@ func (m *meta) SaveBinlogAndCheckPoints(segID UniqueID, flushed bool,
startPositions []*datapb.SegmentStartPosition) error {
m.Lock()
defer m.Unlock()
segInfo, ok := m.segments[segID]
segment, ok := m.segments[segID]
if !ok {
return newErrSegmentNotFound(segID)
}
......@@ -247,7 +298,7 @@ func (m *meta) SaveBinlogAndCheckPoints(segID UniqueID, flushed bool,
kv[k] = v
}
if flushed {
segInfo.State = commonpb.SegmentState_Flushing
segment.State = commonpb.SegmentState_Flushing
}
modifiedSegments := make(map[UniqueID]struct{})
......@@ -281,9 +332,9 @@ func (m *meta) SaveBinlogAndCheckPoints(segID UniqueID, flushed bool,
}
for id := range modifiedSegments {
segInfo = m.segments[id]
segBytes := proto.MarshalTextString(segInfo)
key := m.prepareSegmentPath(segInfo)
segment = m.segments[id]
segBytes := proto.MarshalTextString(segment)
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
kv[key] = segBytes
}
......@@ -355,20 +406,20 @@ func (m *meta) AddPartition(collectionID UniqueID, partitionID UniqueID) error {
for _, t := range coll.Partitions {
if t == partitionID {
return fmt.Errorf("partition %d already exists", partitionID)
return newErrPartitionExist(partitionID)
}
}
coll.Partitions = append(coll.Partitions, partitionID)
return nil
}
func (m *meta) DropPartition(collID UniqueID, partitionID UniqueID) error {
func (m *meta) DropPartition(collectionID UniqueID, partitionID UniqueID) error {
m.Lock()
defer m.Unlock()
collection, ok := m.collections[collID]
collection, ok := m.collections[collectionID]
if !ok {
return newErrCollectionNotFound(collID)
return newErrCollectionNotFound(collectionID)
}
idx := -1
for i, id := range collection.Partitions {
......@@ -378,10 +429,10 @@ func (m *meta) DropPartition(collID UniqueID, partitionID UniqueID) error {
}
}
if idx == -1 {
return fmt.Errorf("cannot find partition id %d", partitionID)
return newErrPartitionNotFound(partitionID)
}
prefix := fmt.Sprintf("%s/%d/%d/", segmentPrefix, collID, partitionID)
prefix := buildPartitionPath(collectionID, partitionID)
if err := m.client.RemoveWithPrefix(prefix); err != nil {
return err
}
......@@ -451,24 +502,32 @@ func (m *meta) GetFlushingSegments() []*datapb.SegmentInfo {
func (m *meta) saveSegmentInfo(segment *datapb.SegmentInfo) error {
segBytes := proto.MarshalTextString(segment)
key := m.prepareSegmentPath(segment)
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
return m.client.Save(key, segBytes)
}
func (m *meta) removeSegmentInfo(segment *datapb.SegmentInfo) error {
key := m.prepareSegmentPath(segment)
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
return m.client.Remove(key)
}
func (m *meta) prepareSegmentPath(segInfo *datapb.SegmentInfo) string {
return fmt.Sprintf("%s/%d/%d/%d", segmentPrefix, segInfo.CollectionID, segInfo.PartitionID, segInfo.ID)
}
func (m *meta) saveKvTxn(kv map[string]string) error {
return m.client.MultiSave(kv)
}
func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*datapb.SegmentInfo, error) {
func buildSegmentPath(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", segmentPrefix, collectionID, partitionID, segmentID)
}
func buildCollectionPath(collectionID UniqueID) string {
return fmt.Sprintf("%s/%d/", segmentPrefix, collectionID)
}
func buildPartitionPath(collectionID UniqueID, partitionID UniqueID) string {
return fmt.Sprintf("%s/%d/%d/", segmentPrefix, collectionID, partitionID)
}
func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*datapb.SegmentInfo, error) {
return &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
......
......@@ -122,15 +122,15 @@ func TestMeta_Basic(t *testing.T) {
// create seg0 for partition0, seg0/seg1 for partition1
segID0_0, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo0_0, err := BuildSegment(collID, partID0, segID0_0, channelName)
segInfo0_0, err := buildSegment(collID, partID0, segID0_0, channelName)
assert.Nil(t, err)
segID1_0, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo1_0, err := BuildSegment(collID, partID1, segID1_0, channelName)
segInfo1_0, err := buildSegment(collID, partID1, segID1_0, channelName)
assert.Nil(t, err)
segID1_1, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo1_1, err := BuildSegment(collID, partID1, segID1_1, channelName)
segInfo1_1, err := buildSegment(collID, partID1, segID1_1, channelName)
assert.Nil(t, err)
// check AddSegment
......@@ -202,7 +202,7 @@ func TestMeta_Basic(t *testing.T) {
// add seg1 with 100 rows
segID0, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo0, err := BuildSegment(collID, partID0, segID0, channelName)
segInfo0, err := buildSegment(collID, partID0, segID0, channelName)
assert.Nil(t, err)
segInfo0.NumOfRows = rowCount0
err = meta.AddSegment(segInfo0)
......@@ -211,7 +211,7 @@ func TestMeta_Basic(t *testing.T) {
// add seg2 with 300 rows
segID1, err := mockAllocator.allocID()
assert.Nil(t, err)
segInfo1, err := BuildSegment(collID, partID0, segID1, channelName)
segInfo1, err := buildSegment(collID, partID0, segID1, channelName)
assert.Nil(t, err)
segInfo1.NumOfRows = rowCount1
err = meta.AddSegment(segInfo1)
......
......@@ -70,7 +70,7 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
return func(status *segmentStatus, ts Timestamp) bool {
var allocSize int64
for _, allocation := range status.allocations {
allocSize += allocation.rowNums
allocSize += allocation.numOfRows
}
// max, written, allocated := status.total, status.currentRows, allocSize
// float64(writtenCount) >= Params.SegmentSizeFactor*float64(maxCount)
......
......@@ -68,7 +68,7 @@ type segmentStatus struct {
}
type allocation struct {
rowNums int64
numOfRows int64
expireTime Timestamp
}
......@@ -250,16 +250,15 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
segID = status.id
retCount = requestRows
expireTime = status.lastExpireTime
return
}
func (s *SegmentManager) alloc(segStatus *segmentStatus, numRows int64) (bool, error) {
func (s *SegmentManager) alloc(status *segmentStatus, numOfRows int64) (bool, error) {
var allocSize int64
for _, allocation := range segStatus.allocations {
allocSize += allocation.rowNums
for _, allocItem := range status.allocations {
allocSize += allocItem.numOfRows
}
if !s.allocPolicy.apply(segStatus.total, segStatus.currentRows, allocSize, numRows) {
if !s.allocPolicy.apply(status.total, status.currentRows, allocSize, numOfRows) {
return false, nil
}
......@@ -269,13 +268,13 @@ func (s *SegmentManager) alloc(segStatus *segmentStatus, numRows int64) (bool, e
}
alloc := &allocation{
rowNums: numRows,
numOfRows: numOfRows,
expireTime: expireTs,
}
segStatus.lastExpireTime = expireTs
segStatus.allocations = append(segStatus.allocations, alloc)
status.lastExpireTime = expireTs
status.allocations = append(status.allocations, alloc)
if err := s.meta.SetLastExpireTime(segStatus.id, expireTs); err != nil {
if err := s.meta.SetLastExpireTime(status.id, expireTs); err != nil {
return false, err
}
return true, nil
......@@ -299,22 +298,22 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
if err != nil {
return nil, err
}
totalRows, err := s.estimateTotalRows(collectionID)
maxNumOfRows, err := s.estimateMaxNumOfRows(collectionID)
if err != nil {
return nil, err
}
segStatus := &segmentStatus{
status := &segmentStatus{
id: id,
collectionID: collectionID,
partitionID: partitionID,
sealed: false,
total: int64(totalRows),
total: int64(maxNumOfRows),
insertChannel: channelName,
allocations: []*allocation{},
lastExpireTime: 0,
currentRows: 0,
}
s.stats[id] = segStatus
s.stats[id] = status
segmentInfo := &datapb.SegmentInfo{
ID: id,
......@@ -323,7 +322,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
InsertChannel: channelName,
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
MaxRowNum: int64(totalRows),
MaxRowNum: int64(maxNumOfRows),
LastExpireTime: 0,
StartPosition: &internalpb.MsgPosition{
ChannelName: channelName,
......@@ -339,15 +338,14 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
log.Debug("datacoord: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
zap.Int("Rows", totalRows),
zap.String("channel", segmentInfo.InsertChannel))
zap.Int("Rows", maxNumOfRows),
zap.String("Channel", segmentInfo.InsertChannel))
s.helper.afterCreateSegment(segmentInfo)
return segStatus, nil
return status, nil
}
func (s *SegmentManager) estimateTotalRows(collectionID UniqueID) (int, error) {
func (s *SegmentManager) estimateMaxNumOfRows(collectionID UniqueID) (int, error) {
collMeta, err := s.meta.GetCollection(collectionID)
if err != nil {
return -1, err
......@@ -391,12 +389,12 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
}
ret := make([]UniqueID, 0)
for _, segStatus := range s.stats {
if segStatus.insertChannel != channel {
for _, status := range s.stats {
if status.insertChannel != channel {
continue
}
if s.flushPolicy.apply(segStatus, t) {
ret = append(ret, segStatus.id)
if s.flushPolicy.apply(status, t) {
ret = append(ret, status.id)
}
}
......@@ -416,34 +414,34 @@ func (s *SegmentManager) UpdateSegmentStats(stat *internalpb.SegmentStatisticsUp
// tryToSealSegment applies segment & channel seal policies
func (s *SegmentManager) tryToSealSegment(ts Timestamp) error {
channelInfo := make(map[string][]*segmentStatus)
for _, segStatus := range s.stats {
channelInfo[segStatus.insertChannel] = append(channelInfo[segStatus.insertChannel], segStatus)
if segStatus.sealed {
for _, status := range s.stats {
channelInfo[status.insertChannel] = append(channelInfo[status.insertChannel], status)
if status.sealed {
continue
}
// change shouldSeal to segment seal policy logic
for _, policy := range s.segmentSealPolicies {
if policy(segStatus, ts) {
if err := s.meta.SealSegment(segStatus.id); err != nil {
if policy(status, ts) {
if err := s.meta.SealSegment(status.id); err != nil {
return err
}
segStatus.sealed = true
status.sealed = true
break
}
}
}
for channel, segs := range channelInfo {
for channel, segmentStats := range channelInfo {
for _, policy := range s.channelSealPolicies {
vs := policy(channel, segs, ts)
for _, seg := range vs {
if seg.sealed {
vs := policy(channel, segmentStats, ts)
for _, status := range vs {
if status.sealed {
continue
}
if err := s.meta.SealSegment(seg.id); err != nil {
if err := s.meta.SealSegment(status.id); err != nil {
return err
}
seg.sealed = true
status.sealed = true
}
}
}
......
......@@ -37,12 +37,20 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
)
const rootCoordClientTimout = 20 * time.Second
const (
rootCoordClientTimout = 20 * time.Second
connEtcdMaxRetryTime = 100000
connEtcdRetryInterval = 200 * time.Millisecond
)
type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
)
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context) (types.RootCoord, error)
type Server struct {
ctx context.Context
serverLoopCtx context.Context
......@@ -67,27 +75,30 @@ type Server struct {
activeCh <-chan bool
eventCh <-chan *sessionutil.SessionEvent
dataClientCreator func(addr string) (types.DataNode, error)
rootCoordClientCreator func(addr string) (types.RootCoord, error)
dataClientCreator dataNodeCreatorFunc
rootCoordClientCreator rootCoordCreatorFunc
}
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
rand.Seed(time.Now().UnixNano())
s := &Server{
ctx: ctx,
msFactory: factory,
flushCh: make(chan UniqueID, 1024),
ctx: ctx,
msFactory: factory,
flushCh: make(chan UniqueID, 1024),
dataClientCreator: defaultDataNodeCreatorFunc,
rootCoordClientCreator: defaultRootCoordCreatorFunc,
}
s.dataClientCreator = func(addr string) (types.DataNode, error) {
return datanodeclient.NewClient(addr, 3*time.Second)
}
s.rootCoordClientCreator = func(addr string) (types.RootCoord, error) {
return rootcoordclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, rootCoordClientTimout)
}
return s, nil
}
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
return datanodeclient.NewClient(ctx, addr, 3*time.Second)
}
func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoord, error) {
return rootcoordclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, rootCoordClientTimout)
}
// Register register data service at etcd
func (s *Server) Register() error {
s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
......@@ -127,7 +138,7 @@ func (s *Server) Start() error {
return err
}
s.allocator = newAllocator(s.rootCoordClient)
s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient)
s.startSegmentManager()
if err = s.initFlushMsgStream(); err != nil {
......@@ -141,7 +152,7 @@ func (s *Server) Start() error {
s.startServerLoop()
atomic.StoreInt64(&s.isServing, 2)
log.Debug("start success")
log.Debug("DataCoordinator startup success")
return nil
}
......@@ -150,7 +161,7 @@ func (s *Server) initCluster() error {
if err != nil {
return err
}
sManager := newClusterSessionManager(s.dataClientCreator)
sManager := newClusterSessionManager(s.ctx, s.dataClientCreator)
s.cluster = newCluster(s.ctx, dManager, sManager, s)
return nil
}
......@@ -211,7 +222,7 @@ func (s *Server) initMeta() error {
}
return nil
}
return retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
return retry.Retry(connEtcdMaxRetryTime, connEtcdRetryInterval, connectEtcdFn)
}
func (s *Server) initFlushMsgStream() error {
......@@ -224,7 +235,6 @@ func (s *Server) initFlushMsgStream() error {
s.flushMsgStream.AsProducer([]string{Params.SegmentInfoChannelName})
log.Debug("DataCoord AsProducer:" + Params.SegmentInfoChannelName)
s.flushMsgStream.Start()
return nil
}
......@@ -251,6 +261,7 @@ func (s *Server) startStatsChannel(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Debug("stats channel shutdown")
return
default:
}
......@@ -290,7 +301,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Debug("data node tt loop done")
log.Debug("data node tt loop shutdown")
return
default:
}
......@@ -381,7 +392,7 @@ func (s *Server) startActiveCheck(ctx context.Context) {
continue
}
s.Stop()
log.Debug("disconnect with etcd")
log.Debug("disconnect with etcd and shutdown data coordinator")
return
case <-ctx.Done():
log.Debug("connection check shutdown")
......@@ -438,8 +449,7 @@ func (s *Server) handleFlushingSegments(ctx context.Context) {
func (s *Server) initRootCoordClient() error {
var err error
s.rootCoordClient, err = s.rootCoordClientCreator("")
if err != nil {
if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx); err != nil {
return err
}
if err = s.rootCoordClient.Init(); err != nil {
......
......@@ -636,7 +636,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
svr.rootCoordClientCreator = func(addr string) (types.RootCoord, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
......@@ -773,10 +773,10 @@ func newTestServer(t *testing.T, receiveCh chan interface{}) *Server {
svr, err := CreateServer(context.TODO(), factory)
assert.Nil(t, err)
svr.dataClientCreator = func(addr string) (types.DataNode, error) {
svr.dataClientCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.rootCoordClientCreator = func(addr string) (types.RootCoord, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoord, error) {
return newMockRootCoordService(), nil
}
assert.Nil(t, err)
......
......@@ -45,7 +45,7 @@ type Client struct {
recallTry int
}
func NewClient(addr string, timeout time.Duration) (*Client, error) {
func NewClient(ctx context.Context, addr string, timeout time.Duration) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
......@@ -54,7 +54,7 @@ func NewClient(addr string, timeout time.Duration) (*Client, error) {
grpc: nil,
conn: nil,
addr: addr,
ctx: context.Background(),
ctx: ctx,
timeout: timeout,
recallTry: 3,
reconnTry: 10,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册