未验证 提交 1b1d4e50 编写于 作者: C Cai Yudong 提交者: GitHub

Remove session from timetickSync (#15255)

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 ac3e30db
...@@ -1032,7 +1032,7 @@ func (c *Core) Init() error { ...@@ -1032,7 +1032,7 @@ func (c *Core) Init() error {
} }
chanMap := c.MetaTable.ListCollectionPhysicalChannels() chanMap := c.MetaTable.ListCollectionPhysicalChannels()
c.chanTimeTick = newTimeTickSync(c.ctx, c.session, c.msFactory, chanMap) c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.msFactory, chanMap)
c.chanTimeTick.addProxy(c.session) c.chanTimeTick.addProxy(c.session)
c.proxyClientManager = newProxyClientManager(c) c.proxyClientManager = newProxyClientManager(c)
...@@ -1040,7 +1040,7 @@ func (c *Core) Init() error { ...@@ -1040,7 +1040,7 @@ func (c *Core) Init() error {
c.proxyManager = newProxyManager( c.proxyManager = newProxyManager(
c.ctx, c.ctx,
c.etcdCli, c.etcdCli,
c.chanTimeTick.getProxy, c.chanTimeTick.clearProxy,
c.proxyClientManager.GetProxyClients, c.proxyClientManager.GetProxyClients,
) )
c.proxyManager.AddSession(c.chanTimeTick.addProxy, c.proxyClientManager.AddProxyClient) c.proxyManager.AddSession(c.chanTimeTick.addProxy, c.proxyClientManager.AddProxyClient)
......
...@@ -44,8 +44,8 @@ var ( ...@@ -44,8 +44,8 @@ var (
) )
type timetickSync struct { type timetickSync struct {
ctx context.Context ctx context.Context
session *sessionutil.Session sourceID int64
dmlChannels *dmlChannels // used for insert dmlChannels *dmlChannels // used for insert
deltaChannels *dmlChannels // used for delete deltaChannels *dmlChannels // used for delete
...@@ -85,7 +85,7 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp { ...@@ -85,7 +85,7 @@ func (c *chanTsMsg) getTimetick(channelName string) typeutil.Timestamp {
return c.defaultTs return c.defaultTs
} }
func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync { func newTimeTickSync(ctx context.Context, sourceID int64, factory msgstream.Factory, chanMap map[typeutil.UniqueID][]string) *timetickSync {
// initialize dml channels used for insert // initialize dml channels used for insert
dmlChannels := newDmlChannels(ctx, factory, Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DmlChannelNum) dmlChannels := newDmlChannels(ctx, factory, Params.RootCoordCfg.DmlChannelName, Params.RootCoordCfg.DmlChannelNum)
// initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels // initialize delta channels used for delete, share Params.DmlChannelNum with dmlChannels
...@@ -110,8 +110,8 @@ func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory ...@@ -110,8 +110,8 @@ func newTimeTickSync(ctx context.Context, session *sessionutil.Session, factory
} }
return &timetickSync{ return &timetickSync{
ctx: ctx, ctx: ctx,
session: session, sourceID: sourceID,
dmlChannels: dmlChannels, dmlChannels: dmlChannels,
deltaChannels: deltaChannels, deltaChannels: deltaChannels,
...@@ -236,7 +236,7 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason ...@@ -236,7 +236,7 @@ func (t *timetickSync) updateTimeTick(in *internalpb.ChannelTimeTickMsg, reason
return nil return nil
} }
if in.Base.SourceID == t.session.ServerID { if in.Base.SourceID == t.sourceID {
if prev != nil && in.DefaultTimestamp <= prev.defaultTs { if prev != nil && in.DefaultTimestamp <= prev.defaultTs {
log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID), log.Debug("timestamp go back", zap.Int64("source id", in.Base.SourceID),
zap.Uint64("curr ts", in.DefaultTimestamp), zap.Uint64("curr ts", in.DefaultTimestamp),
...@@ -273,7 +273,7 @@ func (t *timetickSync) delProxy(sess *sessionutil.Session) { ...@@ -273,7 +273,7 @@ func (t *timetickSync) delProxy(sess *sessionutil.Session) {
} }
} }
func (t *timetickSync) getProxy(sess []*sessionutil.Session) { func (t *timetickSync) clearProxy(sess []*sessionutil.Session) {
t.lock.Lock() t.lock.Lock()
defer t.lock.Unlock() defer t.lock.Unlock()
for _, s := range sess { for _, s := range sess {
...@@ -304,7 +304,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) { ...@@ -304,7 +304,7 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
} }
// reduce each channel to get min timestamp // reduce each channel to get min timestamp
local := proxyTimetick[t.session.ServerID] local := proxyTimetick[t.sourceID]
if len(local.chanTs) == 0 { if len(local.chanTs) == 0 {
continue continue
} }
...@@ -356,7 +356,7 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim ...@@ -356,7 +356,7 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim
MsgType: commonpb.MsgType_TimeTick, MsgType: commonpb.MsgType_TimeTick,
MsgID: 0, MsgID: 0,
Timestamp: ts, Timestamp: ts,
SourceID: t.session.ServerID, SourceID: t.sourceID,
}, },
} }
timeTickMsg := &msgstream.TimeTickMsg{ timeTickMsg := &msgstream.TimeTickMsg{
......
...@@ -26,15 +26,11 @@ import ( ...@@ -26,15 +26,11 @@ import (
"github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
) )
func TestTimetickSync(t *testing.T) { func TestTimetickSync(t *testing.T) {
ctx := context.Background() ctx := context.Background()
sourceID := int64(100)
session := &sessionutil.Session{
ServerID: 100,
}
factory := msgstream.NewPmsFactory() factory := msgstream.NewPmsFactory()
m := map[string]interface{}{ m := map[string]interface{}{
...@@ -51,7 +47,7 @@ func TestTimetickSync(t *testing.T) { ...@@ -51,7 +47,7 @@ func TestTimetickSync(t *testing.T) {
Params.RootCoordCfg.DmlChannelNum = 2 Params.RootCoordCfg.DmlChannelNum = 2
Params.RootCoordCfg.DmlChannelName = "rootcoord-dml" Params.RootCoordCfg.DmlChannelName = "rootcoord-dml"
Params.RootCoordCfg.DeltaChannelName = "rootcoord-delta" Params.RootCoordCfg.DeltaChannelName = "rootcoord-delta"
ttSync := newTimeTickSync(ctx, session, factory, nil) ttSync := newTimeTickSync(ctx, sourceID, factory, nil)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
...@@ -108,7 +104,7 @@ func TestTimetickSync(t *testing.T) { ...@@ -108,7 +104,7 @@ func TestTimetickSync(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
ttSync.ddlMinTs = uint64(300) ttSync.ddlMinTs = uint64(300)
ttSync.session.ServerID = int64(1) ttSync.sourceID = int64(1)
err = ttSync.updateTimeTick(msg, "1") err = ttSync.updateTimeTick(msg, "1")
assert.Nil(t, err) assert.Nil(t, err)
}) })
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册