未验证 提交 5357e301 编写于 作者: G godchen 提交者: GitHub

Fix retrieve search error (#11501)

Signed-off-by: Ngodchen <qingxiang.chen@zilliz.com>
上级 f01063a0
...@@ -779,3 +779,34 @@ func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState ...@@ -779,3 +779,34 @@ func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState
} }
return return
} }
func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
log.Debug("receive watch channels request", zap.Any("channels", req.GetChannelNames()))
resp := &datapb.WatchChannelsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
if s.isClosed() {
log.Warn("failed to watch channels request", zap.Any("channels", req.GetChannelNames()),
zap.Error(errDataCoordIsUnhealthy(Params.NodeID)))
resp.Status.Reason = msgDataCoordIsUnhealthy(Params.NodeID)
return resp, nil
}
for _, channelName := range req.GetChannelNames() {
ch := &channel{
Name: channelName,
CollectionID: req.GetCollectionID(),
}
err := s.channelManager.Watch(ch)
if err != nil {
log.Warn("fail to watch channelName", zap.String("channelName", channelName), zap.Error(err))
resp.Status.Reason = err.Error()
return resp, nil
}
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
...@@ -184,9 +184,6 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool { ...@@ -184,9 +184,6 @@ func (ddn *ddNode) isFlushed(segmentID UniqueID) bool {
} }
func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error { func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error {
if err := ddn.sendDeltaTimeTick(minTs); err != nil {
return err
}
if len(msgs) != 0 { if len(msgs) != 0 {
var msgPack = msgstream.MsgPack{ var msgPack = msgstream.MsgPack{
Msgs: msgs, Msgs: msgs,
......
...@@ -590,3 +590,18 @@ func (c *Client) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb. ...@@ -590,3 +590,18 @@ func (c *Client) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.
} }
return ret.(*milvuspb.GetCompactionPlansResponse), err return ret.(*milvuspb.GetCompactionPlansResponse), err
} }
func (c *Client) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
client, err := c.getGrpcClient()
if err != nil {
return nil, err
}
return client.WatchChannels(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*datapb.WatchChannelsResponse), err
}
...@@ -110,6 +110,10 @@ func (m *MockDataCoordClient) GetCompactionStateWithPlans(ctx context.Context, r ...@@ -110,6 +110,10 @@ func (m *MockDataCoordClient) GetCompactionStateWithPlans(ctx context.Context, r
return &milvuspb.GetCompactionPlansResponse{}, m.err return &milvuspb.GetCompactionPlansResponse{}, m.err
} }
func (m *MockDataCoordClient) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest, opts ...grpc.CallOption) (*datapb.WatchChannelsResponse, error) {
return &datapb.WatchChannelsResponse{}, m.err
}
func Test_NewClient(t *testing.T) { func Test_NewClient(t *testing.T) {
proxy.Params.InitOnce() proxy.Params.InitOnce()
...@@ -194,6 +198,9 @@ func Test_NewClient(t *testing.T) { ...@@ -194,6 +198,9 @@ func Test_NewClient(t *testing.T) {
r19, err := client.GetCompactionStateWithPlans(ctx, nil) r19, err := client.GetCompactionStateWithPlans(ctx, nil)
retCheck(retNotNil, r19, err) retCheck(retNotNil, r19, err)
r20, err := client.WatchChannels(ctx, nil)
retCheck(retNotNil, r20, err)
} }
client.getGrpcClient = func() (datapb.DataCoordClient, error) { client.getGrpcClient = func() (datapb.DataCoordClient, error) {
......
...@@ -278,3 +278,7 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac ...@@ -278,3 +278,7 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac
func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) { func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
return s.dataCoord.GetCompactionStateWithPlans(ctx, req) return s.dataCoord.GetCompactionStateWithPlans(ctx, req)
} }
func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
return s.dataCoord.WatchChannels(ctx, req)
}
...@@ -51,6 +51,7 @@ type MockDataCoord struct { ...@@ -51,6 +51,7 @@ type MockDataCoord struct {
compactionStateResp *milvuspb.GetCompactionStateResponse compactionStateResp *milvuspb.GetCompactionStateResponse
manualCompactionResp *milvuspb.ManualCompactionResponse manualCompactionResp *milvuspb.ManualCompactionResponse
compactionPlansResp *milvuspb.GetCompactionPlansResponse compactionPlansResp *milvuspb.GetCompactionPlansResponse
watchChannelsResp *datapb.WatchChannelsResponse
} }
func (m *MockDataCoord) Init() error { func (m *MockDataCoord) Init() error {
...@@ -145,6 +146,10 @@ func (m *MockDataCoord) GetCompactionStateWithPlans(ctx context.Context, req *mi ...@@ -145,6 +146,10 @@ func (m *MockDataCoord) GetCompactionStateWithPlans(ctx context.Context, req *mi
return m.compactionPlansResp, m.err return m.compactionPlansResp, m.err
} }
func (m *MockDataCoord) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
return m.watchChannelsResp, m.err
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) { func Test_NewServer(t *testing.T) {
ctx := context.Background() ctx := context.Background()
...@@ -293,6 +298,15 @@ func Test_NewServer(t *testing.T) { ...@@ -293,6 +298,15 @@ func Test_NewServer(t *testing.T) {
assert.NotNil(t, resp) assert.NotNil(t, resp)
}) })
t.Run("WatchChannels", func(t *testing.T) {
server.dataCoord = &MockDataCoord{
watchChannelsResp: &datapb.WatchChannelsResponse{},
}
resp, err := server.WatchChannels(ctx, nil)
assert.Nil(t, err)
assert.NotNil(t, resp)
})
err = server.Stop() err = server.Stop()
assert.Nil(t, err) assert.Nil(t, err)
} }
......
...@@ -395,6 +395,10 @@ func (m *MockDataCoord) GetCompactionStateWithPlans(ctx context.Context, req *mi ...@@ -395,6 +395,10 @@ func (m *MockDataCoord) GetCompactionStateWithPlans(ctx context.Context, req *mi
return nil, nil return nil, nil
} }
func (m *MockDataCoord) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
return nil, nil
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockProxy struct { type MockProxy struct {
MockBase MockBase
......
...@@ -165,6 +165,10 @@ func TestGrpcService(t *testing.T) { ...@@ -165,6 +165,10 @@ func TestGrpcService(t *testing.T) {
core.CallGetNumRowsService = func(ctx context.Context, segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { core.CallGetNumRowsService = func(ctx context.Context, segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
return rootcoord.Params.MinSegmentSizeToEnableIndex, nil return rootcoord.Params.MinSegmentSizeToEnableIndex, nil
} }
core.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string) error {
return nil
}
segs := []typeutil.UniqueID{} segs := []typeutil.UniqueID{}
segLock := sync.Mutex{} segLock := sync.Mutex{}
core.CallGetFlushedSegmentsService = func(ctx context.Context, collID, partID typeutil.UniqueID) ([]typeutil.UniqueID, error) { core.CallGetFlushedSegmentsService = func(ctx context.Context, collID, partID typeutil.UniqueID) ([]typeutil.UniqueID, error) {
......
...@@ -37,6 +37,8 @@ service DataCoord { ...@@ -37,6 +37,8 @@ service DataCoord {
rpc ManualCompaction(milvus.ManualCompactionRequest) returns (milvus.ManualCompactionResponse) {} rpc ManualCompaction(milvus.ManualCompactionRequest) returns (milvus.ManualCompactionResponse) {}
rpc GetCompactionState(milvus.GetCompactionStateRequest) returns (milvus.GetCompactionStateResponse) {} rpc GetCompactionState(milvus.GetCompactionStateRequest) returns (milvus.GetCompactionStateResponse) {}
rpc GetCompactionStateWithPlans(milvus.GetCompactionPlansRequest) returns (milvus.GetCompactionPlansResponse) {} rpc GetCompactionStateWithPlans(milvus.GetCompactionPlansRequest) returns (milvus.GetCompactionPlansResponse) {}
rpc WatchChannels(WatchChannelsRequest) returns (WatchChannelsResponse) {}
} }
service DataNode { service DataNode {
...@@ -350,3 +352,12 @@ message SegmentFieldBinlogMeta { ...@@ -350,3 +352,12 @@ message SegmentFieldBinlogMeta {
int64 fieldID = 1; int64 fieldID = 1;
string binlog_path = 2; string binlog_path = 2;
} }
message WatchChannelsRequest {
int64 collectionID = 1;
repeated string channelNames = 2;
}
message WatchChannelsResponse {
common.Status status = 1;
}
...@@ -194,6 +194,10 @@ func (coord *DataCoordMock) GetCompactionStateWithPlans(ctx context.Context, req ...@@ -194,6 +194,10 @@ func (coord *DataCoordMock) GetCompactionStateWithPlans(ctx context.Context, req
return &milvuspb.GetCompactionPlansResponse{}, nil return &milvuspb.GetCompactionPlansResponse{}, nil
} }
func (coord *DataCoordMock) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
return &datapb.WatchChannelsResponse{}, nil
}
func NewDataCoordMock() *DataCoordMock { func NewDataCoordMock() *DataCoordMock {
return &DataCoordMock{ return &DataCoordMock{
nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
......
...@@ -342,11 +342,8 @@ type dqTaskQueue struct { ...@@ -342,11 +342,8 @@ type dqTaskQueue struct {
} }
func (queue *ddTaskQueue) Enqueue(t task) error { func (queue *ddTaskQueue) Enqueue(t task) error {
log.Debug("get mutex")
queue.lock.Lock() queue.lock.Lock()
log.Debug("get mutex end")
defer queue.lock.Unlock() defer queue.lock.Unlock()
log.Debug("get mutex enqueue")
return queue.baseTaskQueue.Enqueue(t) return queue.baseTaskQueue.Enqueue(t)
} }
......
...@@ -161,7 +161,7 @@ func (q *queryCollection) registerCollectionTSafe() error { ...@@ -161,7 +161,7 @@ func (q *queryCollection) registerCollectionTSafe() error {
if err != nil { if err != nil {
return err return err
} }
for _, channel := range historicalCollection.getVChannels() { for _, channel := range historicalCollection.getVDeltaChannels() {
err := q.addTSafeWatcher(channel) err := q.addTSafeWatcher(channel)
if err != nil { if err != nil {
return err return err
...@@ -169,7 +169,7 @@ func (q *queryCollection) registerCollectionTSafe() error { ...@@ -169,7 +169,7 @@ func (q *queryCollection) registerCollectionTSafe() error {
} }
log.Debug("register tSafe watcher and init watcher select case", log.Debug("register tSafe watcher and init watcher select case",
zap.Any("collectionID", historicalCollection.ID()), zap.Any("collectionID", historicalCollection.ID()),
zap.Any("delta channels", historicalCollection.getVChannels())) zap.Any("delta channels", historicalCollection.getVDeltaChannels()))
return nil return nil
} }
...@@ -426,7 +426,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error { ...@@ -426,7 +426,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {
serviceTime := q.getServiceableTime() serviceTime := q.getServiceableTime()
gt, _ := tsoutil.ParseTS(guaranteeTs) gt, _ := tsoutil.ParseTS(guaranteeTs)
st, _ := tsoutil.ParseTS(serviceTime) st, _ := tsoutil.ParseTS(serviceTime)
if guaranteeTs > serviceTime && len(collection.getVChannels()) > 0 { if guaranteeTs > serviceTime && (len(collection.getVChannels()) > 0 || len(collection.getVDeltaChannels()) > 0) {
log.Debug("query node::receiveQueryMsg: add to unsolvedMsg", log.Debug("query node::receiveQueryMsg: add to unsolvedMsg",
zap.Any("collectionID", q.collectionID), zap.Any("collectionID", q.collectionID),
zap.Any("sm.GuaranteeTimestamp", gt), zap.Any("sm.GuaranteeTimestamp", gt),
......
...@@ -172,14 +172,28 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { ...@@ -172,14 +172,28 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
sCol.addVChannels(vChannels) sCol.addVChannels(vChannels)
sCol.addPChannels(pChannels) sCol.addPChannels(pChannels)
sCol.setLoadType(l) sCol.setLoadType(l)
hCol, err := w.node.historical.replica.getCollectionByID(collectionID)
if err != nil {
return err
}
hCol.addVChannels(vChannels)
hCol.addPChannels(pChannels)
hCol.setLoadType(l)
if loadPartition { if loadPartition {
sCol.deleteReleasedPartition(partitionID) sCol.deleteReleasedPartition(partitionID)
hCol.deleteReleasedPartition(partitionID)
if hasPartitionInStreaming := w.node.streaming.replica.hasPartition(partitionID); !hasPartitionInStreaming { if hasPartitionInStreaming := w.node.streaming.replica.hasPartition(partitionID); !hasPartitionInStreaming {
err := w.node.streaming.replica.addPartition(collectionID, partitionID) err := w.node.streaming.replica.addPartition(collectionID, partitionID)
if err != nil { if err != nil {
return err return err
} }
} }
if hasPartitionInHistorical := w.node.historical.replica.hasPartition(partitionID); !hasPartitionInHistorical {
err := w.node.historical.replica.addPartition(collectionID, partitionID)
if err != nil {
return err
}
}
} }
log.Debug("watchDMChannel, init replica done", zap.Any("collectionID", collectionID)) log.Debug("watchDMChannel, init replica done", zap.Any("collectionID", collectionID))
...@@ -366,6 +380,16 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error { ...@@ -366,6 +380,16 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
hCol.addVDeltaChannels(vDeltaChannels) hCol.addVDeltaChannels(vDeltaChannels)
hCol.addPDeltaChannels(pDeltaChannels) hCol.addPDeltaChannels(pDeltaChannels)
if hasCollectionInStreaming := w.node.streaming.replica.hasCollection(collectionID); !hasCollectionInStreaming {
return fmt.Errorf("cannot find collection with collectionID, %d", collectionID)
}
sCol, err := w.node.streaming.replica.getCollectionByID(collectionID)
if err != nil {
return err
}
sCol.addVDeltaChannels(vDeltaChannels)
sCol.addPDeltaChannels(pDeltaChannels)
// get subscription name // get subscription name
getUniqueSubName := func() string { getUniqueSubName := func() string {
prefixName := Params.MsgChannelSubName prefixName := Params.MsgChannelSubName
...@@ -582,12 +606,14 @@ const ( ...@@ -582,12 +606,14 @@ const (
func (r *releaseCollectionTask) Execute(ctx context.Context) error { func (r *releaseCollectionTask) Execute(ctx context.Context) error {
log.Debug("Execute release collection task", zap.Any("collectionID", r.req.CollectionID)) log.Debug("Execute release collection task", zap.Any("collectionID", r.req.CollectionID))
errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = " errMsg := "release collection failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
log.Debug("release streaming", zap.Any("collectionID", r.req.CollectionID))
err := r.releaseReplica(r.node.streaming.replica, replicaStreaming) err := r.releaseReplica(r.node.streaming.replica, replicaStreaming)
if err != nil { if err != nil {
return errors.New(errMsg + err.Error()) return errors.New(errMsg + err.Error())
} }
// remove collection metas in streaming and historical // remove collection metas in streaming and historical
log.Debug("release historical", zap.Any("collectionID", r.req.CollectionID))
err = r.releaseReplica(r.node.historical.replica, replicaHistorical) err = r.releaseReplica(r.node.historical.replica, replicaHistorical)
if err != nil { if err != nil {
return errors.New(errMsg + err.Error()) return errors.New(errMsg + err.Error())
...@@ -606,6 +632,7 @@ func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replica ...@@ -606,6 +632,7 @@ func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replica
return err return err
} }
// set release time // set release time
log.Debug("set release time", zap.Any("collectionID", r.req.CollectionID))
collection.setReleaseTime(r.req.Base.Timestamp) collection.setReleaseTime(r.req.Base.Timestamp)
// sleep to wait for query tasks done // sleep to wait for query tasks done
...@@ -706,6 +733,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { ...@@ -706,6 +733,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
log.Debug("start release partition", zap.Any("collectionID", r.req.CollectionID))
// release partitions // release partitions
vChannels := sCol.getVChannels() vChannels := sCol.getVChannels()
...@@ -748,6 +776,26 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error { ...@@ -748,6 +776,26 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
hCol.addReleasedPartition(id) hCol.addReleasedPartition(id)
sCol.addReleasedPartition(id) sCol.addReleasedPartition(id)
} }
pids, err := r.node.historical.replica.getPartitionIDs(r.req.CollectionID)
if err != nil {
return err
}
log.Debug("start release history pids", zap.Any("pids", pids))
if len(pids) == 0 && hCol.getLoadType() == loadTypePartition {
r.node.dataSyncService.removeCollectionDeltaFlowGraph(r.req.CollectionID)
vChannels := hCol.getVDeltaChannels()
for _, channel := range vChannels {
log.Debug("Releasing tSafe in releasePartitionTask...",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("vChannel", channel),
)
// no tSafe in tSafeReplica, don't return error
err = r.node.tSafeReplica.removeTSafe(channel)
if err != nil {
log.Warn(err.Error())
}
}
}
// release global segment info // release global segment info
r.node.historical.removeGlobalSegmentIDsByPartitionIds(r.req.PartitionIDs) r.node.historical.removeGlobalSegmentIDsByPartitionIds(r.req.PartitionIDs)
......
...@@ -79,6 +79,7 @@ func newTSafe(ctx context.Context, channel Channel) tSafer { ...@@ -79,6 +79,7 @@ func newTSafe(ctx context.Context, channel Channel) tSafer {
watcherList: make([]*tSafeWatcher, 0), watcherList: make([]*tSafeWatcher, 0),
tSafeChan: make(chan tSafeMsg, channelSize), tSafeChan: make(chan tSafeMsg, channelSize),
tSafeRecord: make(map[UniqueID]Timestamp), tSafeRecord: make(map[UniqueID]Timestamp),
tSafe: math.MaxUint64,
} }
return t return t
} }
......
...@@ -124,6 +124,8 @@ type Core struct { ...@@ -124,6 +124,8 @@ type Core struct {
CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error
CallReleasePartitionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) error CallReleasePartitionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) error
CallWatchChannels func(ctx context.Context, collectionID int64, channelNames []string) error
// dml channels used for insert // dml channels used for insert
dmlChannels *dmlChannels dmlChannels *dmlChannels
...@@ -233,6 +235,9 @@ func (c *Core) checkInit() error { ...@@ -233,6 +235,9 @@ func (c *Core) checkInit() error {
if c.CallGetFlushedSegmentsService == nil { if c.CallGetFlushedSegmentsService == nil {
return fmt.Errorf("CallGetFlushedSegments is nil") return fmt.Errorf("CallGetFlushedSegments is nil")
} }
if c.CallWatchChannels == nil {
return fmt.Errorf("WatchChannelReq is nil")
}
if c.NewProxyClient == nil { if c.NewProxyClient == nil {
return fmt.Errorf("NewProxyClient is nil") return fmt.Errorf("NewProxyClient is nil")
} }
...@@ -672,6 +677,26 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error { ...@@ -672,6 +677,26 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
return rsp.Segments, nil return rsp.Segments, nil
} }
c.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string) (retErr error) {
defer func() {
if err := recover(); err != nil {
retErr = fmt.Errorf("watch channels panic, msg = %v", err)
}
}()
<-initCh
req := &datapb.WatchChannelsRequest{
CollectionID: collectionID,
ChannelNames: channelNames,
}
rsp, err := s.WatchChannels(ctx, req)
if err != nil {
return err
}
if rsp.Status.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("data coord watch channels failed, reason = %s", rsp.Status.Reason)
}
return nil
}
return nil return nil
} }
......
...@@ -148,6 +148,13 @@ func (d *dataMock) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushe ...@@ -148,6 +148,13 @@ func (d *dataMock) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushe
return rsp, nil return rsp, nil
} }
func (d *dataMock) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
return &datapb.WatchChannelsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}}, nil
}
type queryMock struct { type queryMock struct {
types.QueryCoord types.QueryCoord
collID []typeutil.UniqueID collID []typeutil.UniqueID
...@@ -2438,7 +2445,14 @@ func TestCheckInit(t *testing.T) { ...@@ -2438,7 +2445,14 @@ func TestCheckInit(t *testing.T) {
return nil return nil
} }
err = c.checkInit() err = c.checkInit()
assert.NotNil(t, err)
c.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string) error {
return nil
}
err = c.checkInit()
assert.Nil(t, err) assert.Nil(t, err)
err = c.Stop() err = c.Stop()
assert.Nil(t, err) assert.Nil(t, err)
} }
......
...@@ -231,6 +231,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { ...@@ -231,6 +231,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason) t.core.chanTimeTick.RemoveDdlTimeTick(ts, reason)
t.core.SendTimeTick(ts, reason) t.core.SendTimeTick(ts, reason)
return nil return nil
} }
...@@ -239,6 +240,11 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { ...@@ -239,6 +240,11 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
return err return err
} }
err = t.core.CallWatchChannels(ctx, collID, vchanNames)
if err != nil {
return err
}
// Update DDOperation in etcd // Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true) return t.core.setDdMsgSendFlag(true)
} }
......
...@@ -228,6 +228,8 @@ type DataCoord interface { ...@@ -228,6 +228,8 @@ type DataCoord interface {
// GetCompactionState gets the state of a compaction // GetCompactionState gets the state of a compaction
GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error)
GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error)
WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error)
} }
// IndexNode is the interface `indexnode` package implements // IndexNode is the interface `indexnode` package implements
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册