Unverified · Commit 3d6bded1 authored by Xiaofan, committed by GitHub

Fix Row Number Mismatch (#22307)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
Parent 697dedac
......@@ -121,8 +121,14 @@ func (c *Cluster) Import(ctx context.Context, nodeID int64, it *datapb.ImportTas
}
// ReCollectSegmentStats triggers a ReCollectSegmentStats call from session manager.
func (c *Cluster) ReCollectSegmentStats(ctx context.Context, nodeID int64) {
c.sessionManager.ReCollectSegmentStats(ctx, nodeID)
func (c *Cluster) ReCollectSegmentStats(ctx context.Context) error {
for _, node := range c.sessionManager.getLiveNodeIDs() {
err := c.sessionManager.ReCollectSegmentStats(ctx, node)
if err != nil {
return err
}
}
return nil
}
// GetSessions returns all sessions
......
......@@ -617,7 +617,7 @@ func TestCluster_ReCollectSegmentStats(t *testing.T) {
assert.NoError(t, err)
assert.NotPanics(t, func() {
cluster.ReCollectSegmentStats(ctx, 1)
cluster.ReCollectSegmentStats(ctx)
})
time.Sleep(500 * time.Millisecond)
})
......@@ -643,7 +643,7 @@ func TestCluster_ReCollectSegmentStats(t *testing.T) {
assert.NoError(t, err)
assert.NotPanics(t, func() {
cluster.ReCollectSegmentStats(ctx, 1)
cluster.ReCollectSegmentStats(ctx)
})
time.Sleep(500 * time.Millisecond)
})
......
......@@ -910,3 +910,14 @@ func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
}
return l
}
func getFieldBinlogPathsWithEntry(id int64, entry int64, paths ...string) *datapb.FieldBinlog {
l := &datapb.FieldBinlog{
FieldID: id,
Binlogs: make([]*datapb.Binlog, 0, len(paths)),
}
for _, path := range paths {
l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path, EntriesNum: entry})
}
return l
}
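The new test helper mirrors getFieldBinlogPaths but additionally stamps every binlog with an entry count, so tests can exercise the binlog-derived row count path introduced by this commit. A hypothetical usage sketch (test name and paths are illustrative, not part of the commit):
// Hypothetical sketch: two insert binlog paths with 5 entries each, so the
// binlog-derived row count for this field would be 10.
func TestGetFieldBinlogPathsWithEntry_sketch(t *testing.T) {
	fb := getFieldBinlogPathsWithEntry(1, 5, "binlog-a", "binlog-b")
	assert.EqualValues(t, 2, len(fb.GetBinlogs()))
	assert.EqualValues(t, 5, fb.GetBinlogs()[0].GetEntriesNum())
	assert.EqualValues(t, 5, fb.GetBinlogs()[1].GetEntriesNum())
}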
......@@ -40,6 +40,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/metautil"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
......@@ -440,10 +441,6 @@ func (m *meta) UpdateFlushSegmentsInfo(
defer m.Unlock()
segment := m.segments.GetSegment(segmentID)
if importing {
m.segments.SetRowCount(segmentID, segment.currRows)
segment = m.segments.GetSegment(segmentID)
}
if segment == nil || !isSegmentHealthy(segment) {
log.Warn("meta update: update flush segments info - segment not found",
zap.Int64("segment ID", segmentID),
......@@ -532,20 +529,46 @@ func (m *meta) UpdateFlushSegmentsInfo(
s.StartPosition = pos.GetStartPosition()
modSegments[pos.GetSegmentID()] = s
}
for _, cp := range checkpoints {
s := getClonedSegment(cp.GetSegmentID())
if s == nil {
continue
}
if s.DmlPosition != nil && s.DmlPosition.Timestamp >= cp.Position.Timestamp {
// segment position in etcd is larger than checkpoint, then dont change it
continue
if importing {
s := clonedSegment
count := segmentutil.CalcRowCountFromBinLog(s.SegmentInfo)
if count != segment.currRows {
log.Info("check point reported inconsistent with bin log row count",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("current rows (wrong)", segment.currRows),
zap.Int64("segment bin log row count (correct)", count))
}
s.NumOfRows = count
modSegments[segmentID] = s
} else {
for _, cp := range checkpoints {
if cp.SegmentID != segmentID {
// We don't think this is going to happen; ignore for now.
log.Warn("checkpoint in segment is not the same as flush segment to update, ignore", zap.Int64("current", segmentID), zap.Int64("checkpoint segment", cp.SegmentID))
continue
}
s := clonedSegment
if s.DmlPosition != nil && s.DmlPosition.Timestamp >= cp.Position.Timestamp {
log.Warn("checkpoint in segment is larger than reported", zap.Any("current", s.GetDmlPosition()), zap.Any("reported", cp.GetPosition()))
// segment position in etcd is larger than the checkpoint, so don't change it
continue
}
s.DmlPosition = cp.GetPosition()
s.NumOfRows = cp.GetNumOfRows()
modSegments[cp.GetSegmentID()] = s
count := segmentutil.CalcRowCountFromBinLog(s.SegmentInfo)
// count should be smaller than or equal to what the checkpoint reported
if count != cp.NumOfRows {
log.Info("checkpoint reported inconsistent with binlog row count",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("checkpoint (wrong)", cp.NumOfRows),
zap.Int64("segment binlog row count (correct)", count))
}
s.NumOfRows = count
s.DmlPosition = cp.GetPosition()
modSegments[cp.GetSegmentID()] = s
}
}
segments := make([]*datapb.SegmentInfo, 0, len(modSegments))
for _, seg := range modSegments {
......
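Both the importing and the checkpoint branches above treat segmentutil.CalcRowCountFromBinLog as the source of truth for NumOfRows. The helper's body is not part of this diff; a minimal sketch of the idea, assuming every field's insert binlogs cover the same rows so one field's EntriesNum values can be summed, might look like:
// Sketch only, not the actual internal/util/segmentutil implementation.
// Assumption: all fields hold the same number of rows, so summing EntriesNum
// over a single field's insert binlogs yields the segment row count.
func calcRowCountFromBinLogSketch(seg *datapb.SegmentInfo) int64 {
	fieldBinlogs := seg.GetBinlogs()
	if len(fieldBinlogs) == 0 {
		return 0
	}
	var count int64
	for _, b := range fieldBinlogs[0].GetBinlogs() {
		count += b.GetEntriesNum()
	}
	return count
}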
......@@ -436,7 +436,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
err = meta.AddSegment(segment1)
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog1", 1))},
err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(1, 10, getInsertLogPath("binlog1", 1))},
[]*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog1", 1))},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
......
......@@ -662,7 +662,7 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
}
err = s.cluster.Flush(s.ctx, ttMsg.GetBase().GetSourceID(), ch, finfo)
if err != nil {
log.Warn("handle")
log.Warn("failed to handle flush", zap.Any("source", ttMsg.GetBase().GetSourceID()), zap.Error(err))
return err
}
......@@ -671,16 +671,30 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
for _, stat := range stats {
segment := s.meta.GetSegmentUnsafe(stat.GetSegmentID())
if segment == nil {
log.Warn("skip updating row number for not exist segment",
zap.Int64("segment ID", stat.GetSegmentID()),
zap.Int64("new value", stat.GetNumRows()))
continue
}
if isFlushState(segment.GetState()) {
log.Warn("skip updating row number for flushed segment",
zap.Int64("segment ID", stat.GetSegmentID()),
zap.Int64("new value", stat.GetNumRows()))
continue
}
// Log if # of rows is updated.
if s.meta.GetSegmentUnsafe(stat.GetSegmentID()) != nil &&
s.meta.GetSegmentUnsafe(stat.GetSegmentID()).GetNumOfRows() != stat.GetNumRows() {
if segment.currRows < stat.GetNumRows() {
log.Info("Updating segment number of rows",
zap.Int64("segment ID", stat.GetSegmentID()),
zap.Int64("old value", s.meta.GetSegmentUnsafe(stat.GetSegmentID()).GetNumOfRows()),
zap.Int64("new value", stat.GetNumRows()),
)
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
}
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
}
}
......@@ -995,7 +1009,16 @@ func (s *Server) reCollectSegmentStats(ctx context.Context) {
nodes := s.sessionManager.getLiveNodeIDs()
log.Info("re-collecting segment stats from DataNodes",
zap.Int64s("DataNode IDs", nodes))
for _, node := range nodes {
s.cluster.ReCollectSegmentStats(ctx, node)
reCollectFunc := func() error {
err := s.cluster.ReCollectSegmentStats(ctx)
if err != nil {
return err
}
return nil
}
if err := retry.Do(ctx, reCollectFunc, retry.Attempts(20), retry.Sleep(time.Millisecond*100), retry.MaxSleepTime(5*time.Second)); err != nil {
panic(err)
}
}
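Since reCollectFunc does nothing beyond forwarding the error, an equivalent and slightly tighter form with the same retry options would be (no behavior change intended):
// Equivalent to the wrapper above: retry.Do already treats a nil error as success.
if err := retry.Do(ctx, func() error {
	return s.cluster.ReCollectSegmentStats(ctx)
}, retry.Attempts(20), retry.Sleep(time.Millisecond*100), retry.MaxSleepTime(5*time.Second)); err != nil {
	panic(err)
}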
......@@ -1286,24 +1286,26 @@ func TestSaveBinlogPaths(t *testing.T) {
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/by-dev/test/0/1/1/1/Allo1",
LogPath: "/by-dev/test/0/1/1/1/Allo1",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test/0/1/1/1/Allo2",
LogPath: "/by-dev/test/0/1/1/1/Allo2",
EntriesNum: 5,
},
},
},
},
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: 0,
SegmentID: 1,
Position: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
NumOfRows: 10,
NumOfRows: 12,
},
},
Flushed: false,
......@@ -1322,11 +1324,9 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath())
assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath())
segmentInfo := svr.meta.GetSegment(0)
assert.NotNil(t, segmentInfo)
assert.EqualValues(t, segmentInfo.DmlPosition.ChannelName, "ch1")
assert.EqualValues(t, segmentInfo.DmlPosition.MsgID, []byte{1, 2, 3})
assert.EqualValues(t, segmentInfo.NumOfRows, 10)
assert.EqualValues(t, segment.DmlPosition.ChannelName, "ch1")
assert.EqualValues(t, segment.DmlPosition.MsgID, []byte{1, 2, 3})
assert.EqualValues(t, segment.NumOfRows, 10)
})
t.Run("with channel not matched", func(t *testing.T) {
......@@ -3429,7 +3429,62 @@ func TestDataCoord_Import(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
})
}
func TestDataCoord_SegmentStatistics(t *testing.T) {
t.Run("test update imported segment stat", func(t *testing.T) {
svr := newTestServer(t, nil)
seg1 := &datapb.SegmentInfo{
ID: 100,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(101, 1, getInsertLogPath("log1", 100))},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 100))},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 100))},
State: commonpb.SegmentState_Importing,
}
info := NewSegmentInfo(seg1)
svr.meta.AddSegment(info)
status, err := svr.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{
Stats: []*datapb.SegmentStats{{
SegmentID: 100,
NumRows: int64(1),
}},
})
assert.NoError(t, err)
assert.Equal(t, svr.meta.GetSegment(100).currRows, int64(1))
assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
closeTestServer(t, svr)
})
t.Run("test update flushed segment stat", func(t *testing.T) {
svr := newTestServer(t, nil)
seg1 := &datapb.SegmentInfo{
ID: 100,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(101, 1, getInsertLogPath("log1", 100))},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 100))},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 100))},
State: commonpb.SegmentState_Flushed,
}
info := NewSegmentInfo(seg1)
svr.meta.AddSegment(info)
status, err := svr.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{
Stats: []*datapb.SegmentStats{{
SegmentID: 100,
NumRows: int64(1),
}},
})
assert.NoError(t, err)
assert.Equal(t, svr.meta.GetSegment(100).currRows, int64(0))
assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
closeTestServer(t, svr)
})
}
func TestDataCoord_SaveImportSegment(t *testing.T) {
......
......@@ -35,9 +35,8 @@ import (
const (
flushTimeout = 15 * time.Second
// TODO: evaluate and update import timeout.
importTimeout = 3 * time.Hour
reCollectTimeout = 5 * time.Second
addSegmentTimeout = 30 * time.Second
importTimeout = 3 * time.Hour
reCollectTimeout = 5 * time.Second
)
// SessionManager provides the grpc interfaces of cluster
......@@ -207,15 +206,11 @@ func (c *SessionManager) execImport(ctx context.Context, nodeID int64, itr *data
}
// ReCollectSegmentStats collects segment stats info from DataNodes, after DataCoord reboots.
func (c *SessionManager) ReCollectSegmentStats(ctx context.Context, nodeID int64) {
go c.execReCollectSegmentStats(ctx, nodeID)
}
func (c *SessionManager) execReCollectSegmentStats(ctx context.Context, nodeID int64) {
func (c *SessionManager) ReCollectSegmentStats(ctx context.Context, nodeID int64) error {
cli, err := c.getClient(ctx, nodeID)
if err != nil {
log.Warn("failed to get dataNode client", zap.Int64("DataNode ID", nodeID), zap.Error(err))
return
return err
}
ctx, cancel := context.WithTimeout(ctx, reCollectTimeout)
defer cancel()
......@@ -226,13 +221,14 @@ func (c *SessionManager) execReCollectSegmentStats(ctx context.Context, nodeID i
),
})
if err := VerifyResponse(resp, err); err != nil {
log.Error("re-collect segment stats call failed",
log.Warn("re-collect segment stats call failed",
zap.Int64("DataNode ID", nodeID), zap.Error(err))
} else {
log.Info("re-collect segment stats call succeeded",
zap.Int64("DataNode ID", nodeID),
zap.Int64s("segment stat collected", resp.GetSegResent()))
return err
}
log.Info("re-collect segment stats call succeeded",
zap.Int64("DataNode ID", nodeID),
zap.Int64s("segment stat collected", resp.GetSegResent()))
return nil
}
func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateResult {
......
......@@ -472,7 +472,6 @@ func (c *ChannelMeta) updateStatistics(segID UniqueID, numRows int64) {
log.Info("updating segment", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows))
seg, ok := c.segments[segID]
if ok && seg.notFlushed() {
seg.memorySize = 0
seg.numRows += numRows
return
}
......
......@@ -66,12 +66,6 @@ func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp, segmentIDs []int64) {
for _, sid := range segmentIDs {
mt.segmentIDs[sid] = struct{}{}
}
if !mt.lastSent.IsZero() && time.Since(mt.lastSent) > time.Millisecond*100 {
mt.cond.L.Lock()
defer mt.cond.L.Unlock()
mt.cond.Signal()
}
}
func (mt *mergedTimeTickerSender) tick() {
......@@ -118,13 +112,19 @@ func (mt *mergedTimeTickerSender) work() {
for sid := range mt.segmentIDs {
sids = append(sids, sid)
}
mt.segmentIDs = make(map[int64]struct{})
// we will reset the timer but not the segmentIDs, since if sending the timetick fails we may block forever due to a stuck flush
lastTs = mt.ts
mt.lastSent = time.Now()
if err := mt.send(mt.ts, sids); err != nil {
log.Error("send hard time tick failed", zap.Error(err))
mt.mu.Unlock()
continue
}
mt.segmentIDs = make(map[int64]struct{})
}
mt.mu.Unlock()
}
......
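The point of moving the reset is that buffered segment IDs must survive a failed send so the next tick re-reports them; only a successful send may clear the buffer. A small self-contained illustration of that pattern (hypothetical names, not the actual sender code):
// Hypothetical, simplified illustration: a failed send keeps the pending IDs
// so the next tick retries them; only a successful send clears the buffer.
package main

import (
	"errors"
	"fmt"
)

func main() {
	pending := map[int64]struct{}{1: {}, 2: {}}
	send := func(ids []int64) error { return errors.New("send failed") }

	ids := make([]int64, 0, len(pending))
	for id := range pending {
		ids = append(ids, id)
	}
	if err := send(ids); err != nil {
		fmt.Println("send failed, keeping", len(pending), "segment IDs for the next tick")
		return
	}
	pending = make(map[int64]struct{}) // clear only after a successful send
	fmt.Println("send succeeded, buffer cleared:", len(pending))
}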
package datanode
import (
"errors"
"sync"
"testing"
"time"
......@@ -55,3 +56,29 @@ func TestMergedTimeTicker_close10000(t *testing.T) {
case <-done:
}
}
func TestMergedTimeTickerSendFail(t *testing.T) {
var ticks []uint64
var mut sync.Mutex
first := true
mt := newMergedTimeTickerSender(func(ts Timestamp, _ []int64) error {
mut.Lock()
defer mut.Unlock()
if first {
first = false
return errors.New("merged time tick")
}
ticks = append(ticks, ts)
return nil
})
for i := 1; i < 100; i++ {
time.Sleep(time.Millisecond * 10)
mt.bufferTs(uint64(i), nil)
}
mt.close()
mut.Lock()
assert.EqualValues(t, 99, ticks[len(ticks)-1])
assert.Less(t, len(ticks), 20)
mut.Unlock()
}
......@@ -781,6 +781,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
updates, _ := dsService.channel.getSegmentStatisticsUpdates(pack.segmentID)
checkPoints = append(checkPoints, &datapb.CheckPoint{
SegmentID: pack.segmentID,
// this shouldn't be relied on, because we are not sure it is aligned with the flushed binlogs
NumOfRows: updates.GetNumRows(),
Position: pack.pos,
})
......
......@@ -116,7 +116,7 @@ func PrintAccessInfo(ctx context.Context, resp interface{}, err error, rpcInfo *
//get trace ID of task
traceID, ok := getTraceID(ctx)
if !ok {
log.Warn("access log print failed: cloud not get trace ID")
log.Warn("access log print failed: could not get trace ID")
return false
}
fields = append(fields, zap.String("traceId", traceID))
......@@ -124,7 +124,7 @@ func PrintAccessInfo(ctx context.Context, resp interface{}, err error, rpcInfo *
//get response size of task
responseSize, ok := getResponseSize(resp)
if !ok {
log.Warn("access log print failed: cloud not get response size")
log.Warn("access log print failed: could not get response size")
return false
}
fields = append(fields, zap.Int("responseSize", responseSize))
......@@ -132,8 +132,8 @@ func PrintAccessInfo(ctx context.Context, resp interface{}, err error, rpcInfo *
//get err code of task
errCode, ok := getErrCode(resp)
if !ok {
log.Warn("access log print failed: cloud not get error code")
return false
// unknown error code
errCode = -1
}
fields = append(fields, zap.Int("errorCode", errCode))
......