提交 6ea11f67 编写于 作者: D dragondriver 提交者: zhenshan.cao

Dock channelsTimeTicker, channelsMgr with master service (#5509)

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 2cc330b1
......@@ -64,16 +64,16 @@ func getUniqueIntGeneratorIns() uniqueIntGenerator {
type getChannelsFuncType = func(collectionID UniqueID) (map[vChan]pChan, error)
type masterService interface {
type getChannelsService interface {
GetChannels(collectionID UniqueID) (map[vChan]pChan, error)
}
type mockMaster struct {
type mockGetChannelsService struct {
collectionID2Channels map[UniqueID]map[vChan]pChan
}
func newMockMaster() *mockMaster {
return &mockMaster{
func newMockGetChannelsService() *mockGetChannelsService {
return &mockGetChannelsService{
collectionID2Channels: make(map[UniqueID]map[vChan]pChan),
}
}
......@@ -87,37 +87,7 @@ func genUniqueStr() string {
return fmt.Sprintf("%X", b)
}
func (m *mockMaster) GetChannels(collectionID UniqueID) (map[vChan]pChan, error) {
channels, ok := m.collectionID2Channels[collectionID]
if ok {
return channels, nil
}
channels = make(map[vChan]pChan)
l := rand.Uint64()%10 + 1
for i := 0; uint64(i) < l; i++ {
channels[genUniqueStr()] = genUniqueStr()
}
m.collectionID2Channels[collectionID] = channels
return channels, nil
}
type queryService interface {
GetChannels(collectionID UniqueID) (map[vChan]pChan, error)
}
type mockQueryService struct {
collectionID2Channels map[UniqueID]map[vChan]pChan
}
func newMockQueryService() *mockQueryService {
return &mockQueryService{
collectionID2Channels: make(map[UniqueID]map[vChan]pChan),
}
}
func (m *mockQueryService) GetChannels(collectionID UniqueID) (map[vChan]pChan, error) {
func (m *mockGetChannelsService) GetChannels(collectionID UniqueID) (map[vChan]pChan, error) {
channels, ok := m.collectionID2Channels[collectionID]
if ok {
return channels, nil
......@@ -475,9 +445,9 @@ func (mgr *channelsMgrImpl) removeAllDMLStream() error {
return mgr.dmlChannelsMgr.removeAllStream()
}
func newChannelsMgr(master masterService, query queryService, msgStreamFactory msgstream.Factory) channelsMgr {
func newChannelsMgr(getDmlChannelsFunc getChannelsFuncType, getDqlChannelsFunc getChannelsFuncType, msgStreamFactory msgstream.Factory) channelsMgr {
return &channelsMgrImpl{
dmlChannelsMgr: newSingleTypeChannelsMgr(master.GetChannels, msgStreamFactory),
dqlChannelsMgr: newSingleTypeChannelsMgr(query.GetChannels, msgStreamFactory),
dmlChannelsMgr: newSingleTypeChannelsMgr(getDmlChannelsFunc, msgStreamFactory),
dqlChannelsMgr: newSingleTypeChannelsMgr(getDqlChannelsFunc, msgStreamFactory),
}
}
......@@ -23,10 +23,10 @@ func TestNaiveUniqueIntGenerator_get(t *testing.T) {
}
func TestChannelsMgrImpl_getChannels(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
collID := UniqueID(getUniqueIntGeneratorIns().get())
......@@ -41,10 +41,10 @@ func TestChannelsMgrImpl_getChannels(t *testing.T) {
}
func TestChannelsMgrImpl_getVChannels(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
collID := UniqueID(getUniqueIntGeneratorIns().get())
......@@ -59,10 +59,10 @@ func TestChannelsMgrImpl_getVChannels(t *testing.T) {
}
func TestChannelsMgrImpl_createDMLMsgStream(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
collID := UniqueID(getUniqueIntGeneratorIns().get())
......@@ -81,10 +81,10 @@ func TestChannelsMgrImpl_createDMLMsgStream(t *testing.T) {
}
func TestChannelsMgrImpl_getDMLMsgStream(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
collID := UniqueID(getUniqueIntGeneratorIns().get())
......@@ -99,10 +99,10 @@ func TestChannelsMgrImpl_getDMLMsgStream(t *testing.T) {
}
func TestChannelsMgrImpl_removeDMLMsgStream(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
collID := UniqueID(getUniqueIntGeneratorIns().get())
......@@ -126,10 +126,10 @@ func TestChannelsMgrImpl_removeDMLMsgStream(t *testing.T) {
}
func TestChannelsMgrImpl_removeAllDMLMsgStream(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
num := 10
......@@ -141,11 +141,11 @@ func TestChannelsMgrImpl_removeAllDMLMsgStream(t *testing.T) {
}
func TestChannelsMgrImpl_createDQLMsgStream(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
defer mgr.removeAllDQLStream()
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
collID := UniqueID(getUniqueIntGeneratorIns().get())
_, err := mgr.getChannels(collID)
......@@ -163,11 +163,11 @@ func TestChannelsMgrImpl_createDQLMsgStream(t *testing.T) {
}
func TestChannelsMgrImpl_getDQLMsgStream(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
defer mgr.removeAllDQLStream()
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
collID := UniqueID(getUniqueIntGeneratorIns().get())
_, err := mgr.getDQLStream(collID)
......@@ -181,11 +181,11 @@ func TestChannelsMgrImpl_getDQLMsgStream(t *testing.T) {
}
func TestChannelsMgrImpl_removeDQLMsgStream(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
defer mgr.removeAllDQLStream()
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
collID := UniqueID(getUniqueIntGeneratorIns().get())
_, err := mgr.getDQLStream(collID)
......@@ -208,11 +208,11 @@ func TestChannelsMgrImpl_removeDQLMsgStream(t *testing.T) {
}
func TestChannelsMgrImpl_removeAllDQLMsgStream(t *testing.T) {
master := newMockMaster()
query := newMockQueryService()
master := newMockGetChannelsService()
query := newMockGetChannelsService()
factory := msgstream.NewSimpleMsgStreamFactory()
mgr := newChannelsMgr(master, query, factory)
defer mgr.removeAllDQLStream()
mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory)
defer mgr.removeAllDMLStream()
num := 10
for i := 0; i < num; i++ {
......
......@@ -35,19 +35,24 @@ type channelsTimeTicker interface {
close() error
addPChan(pchan pChan) error
getLastTick(pchan pChan) (Timestamp, error)
getMinTsStatistics() (map[pChan]Timestamp, error)
}
type channelsTimeTickerImpl struct {
interval time.Duration // interval to synchronize
minTsStatistics map[pChan]Timestamp // pchan -> min Timestamp
statisticsMtx sync.RWMutex
getStatistics getPChanStatisticsFuncType
tso tsoAllocator
currents map[pChan]Timestamp
currentsMtx sync.RWMutex
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
interval time.Duration // interval to synchronize
minTsStatistics map[pChan]Timestamp // pchan -> min Timestamp
statisticsMtx sync.RWMutex
getStatisticsFunc getPChanStatisticsFuncType
tso tsoAllocator
currents map[pChan]Timestamp
currentsMtx sync.RWMutex
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func (ticker *channelsTimeTickerImpl) getMinTsStatistics() (map[pChan]Timestamp, error) {
panic("implement me")
}
func (ticker *channelsTimeTickerImpl) initStatistics() {
......@@ -86,7 +91,7 @@ func (ticker *channelsTimeTickerImpl) tick() error {
for pchan := range ticker.currents {
current := ticker.currents[pchan]
stats, err := ticker.getStatistics(pchan)
stats, err := ticker.getStatisticsFunc(pchan)
if err != nil {
continue
}
......@@ -174,20 +179,20 @@ func newChannelsTimeTicker(
ctx context.Context,
interval time.Duration,
pchans []pChan,
getStatistics getPChanStatisticsFuncType,
getStatisticsFunc getPChanStatisticsFuncType,
tso tsoAllocator,
) *channelsTimeTickerImpl {
ctx1, cancel := context.WithCancel(ctx)
ticker := &channelsTimeTickerImpl{
interval: interval,
minTsStatistics: make(map[pChan]Timestamp),
getStatistics: getStatistics,
tso: tso,
currents: make(map[pChan]Timestamp),
ctx: ctx1,
cancel: cancel,
interval: interval,
minTsStatistics: make(map[pChan]Timestamp),
getStatisticsFunc: getStatisticsFunc,
tso: tso,
currents: make(map[pChan]Timestamp),
ctx: ctx1,
cancel: cancel,
}
for _, pchan := range pchans {
......
......@@ -152,3 +152,50 @@ func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) {
time.Sleep(time.Second)
}
func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) {
interval := time.Millisecond * 10
pchanNum := rand.Uint64()%10 + 1
pchans := make([]pChan, 0, pchanNum)
for i := 0; uint64(i) < pchanNum; i++ {
pchans = append(pchans, genUniqueStr())
}
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
err := ticker.start()
assert.Equal(t, nil, err)
var wg sync.WaitGroup
wg.Add(1)
b := make(chan struct{}, 1)
go func() {
defer wg.Done()
timer := time.NewTicker(interval * 40)
for {
select {
case <-b:
return
case <-timer.C:
stats, err := ticker.getMinTsStatistics()
assert.Equal(t, nil, err)
for pchan, ts := range stats {
log.Debug("TestChannelsTimeTickerImpl_getLastTick",
zap.Any("pchan", pchan),
zap.Any("minTs", ts))
}
}
}
}()
time.Sleep(time.Second)
b <- struct{}{}
wg.Wait()
defer func() {
err := ticker.close()
assert.Equal(t, nil, err)
}()
time.Sleep(time.Second)
}
......@@ -1064,6 +1064,7 @@ func (node *ProxyNode) Insert(ctx context.Context, request *milvuspb.InsertReque
rowIDAllocator: node.idAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
}
if len(it.PartitionName) <= 0 {
it.PartitionName = Params.DefaultPartitionName
......
......@@ -14,11 +14,14 @@ package proxynode
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
......@@ -228,10 +231,55 @@ func (node *ProxyNode) Init() error {
node.segAssigner = segAssigner
node.segAssigner.PeerID = Params.ProxyID
// TODO(dragondriver): use real master service & query service instance
mockMasterIns := newMockMaster()
mockQueryIns := newMockQueryService()
chMgr := newChannelsMgr(mockMasterIns, mockQueryIns, node.msFactory)
getDmlChannelsFunc := func(collectionID UniqueID) (map[vChan]pChan, error) {
req := &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
MsgID: 0, // todo
Timestamp: 0, // todo
SourceID: 0, // todo
},
DbName: "", // todo
CollectionName: "", // todo
CollectionID: collectionID,
TimeStamp: 0, // todo
}
resp, err := node.masterService.DescribeCollection(node.ctx, req)
if err != nil {
log.Warn("DescribeCollection", zap.Error(err))
return nil, err
}
if resp.Status.ErrorCode != 0 {
log.Warn("DescribeCollection",
zap.Any("ErrorCode", resp.Status.ErrorCode),
zap.Any("Reason", resp.Status.Reason))
return nil, err
}
if len(resp.VirtualChannelNames) != len(resp.PhysicalChannelNames) {
err := fmt.Errorf(
"len(VirtualChannelNames): %v, len(PhysicalChannelNames): %v",
len(resp.VirtualChannelNames),
len(resp.PhysicalChannelNames))
log.Warn("GetDmlChannels", zap.Error(err))
return nil, err
}
ret := make(map[vChan]pChan)
for idx, name := range resp.VirtualChannelNames {
if _, ok := ret[name]; ok {
err := fmt.Errorf(
"duplicated virtual channel found, vchan: %v, pchan: %v",
name,
resp.PhysicalChannelNames[idx])
return nil, err
}
ret[name] = resp.PhysicalChannelNames[idx]
}
return ret, nil
}
mockQueryService := newMockGetChannelsService()
chMgr := newChannelsMgr(getDmlChannelsFunc, mockQueryService.GetChannels, node.msFactory)
node.chMgr = chMgr
node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory)
......@@ -243,15 +291,61 @@ func (node *ProxyNode) Init() error {
// TODO(dragondriver): read this from config
interval := time.Millisecond * 200
// TODO(dragondriver): use scheduler's method
getStats := func(ch pChan) (pChanStatistics, error) {
return pChanStatistics{}, nil
}
node.chTicker = newChannelsTimeTicker(node.ctx, interval, []string{}, getStats, tsoAllocator)
node.chTicker = newChannelsTimeTicker(node.ctx, interval, []string{}, node.sched.getPChanStatistics, tsoAllocator)
return nil
}
func (node *ProxyNode) sendChannelsTimeTickLoop() {
node.wg.Add(1)
go func() {
defer node.wg.Done()
// TODO(dragondriver): read this from config
interval := time.Millisecond * 200
timer := time.NewTicker(interval)
for {
select {
case <-node.ctx.Done():
return
case <-timer.C:
stats, err := node.chTicker.getMinTsStatistics()
if err != nil {
log.Warn("sendChannelsTimeTickLoop.getMinTsStatistics", zap.Error(err))
continue
}
channels := make([]pChan, len(stats))
tss := make([]Timestamp, len(stats))
req := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Undefined, // todo
MsgID: 0, // todo
Timestamp: 0, // todo
SourceID: node.session.ServerID,
},
ChannelNames: channels,
Timestamps: tss,
}
status, err := node.masterService.UpdateChannelTimeTick(node.ctx, req)
if err != nil {
log.Warn("sendChannelsTimeTickLoop.UpdateChannelTimeTick", zap.Error(err))
continue
}
if status.ErrorCode != 0 {
log.Warn("sendChannelsTimeTickLoop.UpdateChannelTimeTick",
zap.Any("ErrorCode", status.ErrorCode),
zap.Any("Reason", status.Reason))
continue
}
}
}
}()
}
func (node *ProxyNode) Start() error {
err := InitMetaCache(node.masterService)
if err != nil {
......@@ -283,6 +377,8 @@ func (node *ProxyNode) Start() error {
}
log.Debug("start channelsTimeTicker")
node.sendChannelsTimeTickLoop()
// Start callbacks
for _, cb := range node.startCallbacks {
cb()
......
......@@ -116,6 +116,7 @@ type InsertTask struct {
rowIDAllocator *allocator.IDAllocator
segIDAssigner *SegIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
}
func (it *InsertTask) TraceCtx() context.Context {
......@@ -739,6 +740,14 @@ func (it *InsertTask) Execute(ctx context.Context) error {
return err
}
pchans, err := it.chMgr.getChannels(collID)
if err != nil {
return err
}
for _, pchan := range pchans {
_ = it.chTicker.addPChan(pchan)
}
// Assign SegmentID
var pack *msgstream.MsgPack
pack, err = it._assignSegmentID(stream, &msgPack)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册