From 7a82462da8b28a9d414b3c781abbff3ba75dc0fc Mon Sep 17 00:00:00 2001 From: congqixia <84113973+congqixia@users.noreply.github.com> Date: Fri, 21 May 2021 16:54:29 +0800 Subject: [PATCH] DataService publishes flush-completed message into its channel (#5320) DataService publishes flush-completed message into related msgstream See also: #5220 Signed-off-by: Congqi Xia --- internal/dataservice/server.go | 46 +++++++++++++++++++++++++++++ internal/dataservice/server_test.go | 3 ++ 2 files changed, 49 insertions(+) diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index af6ece432..f7dfd031c 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -70,6 +70,7 @@ type Server struct { name string } segmentInfoStream msgstream.MsgStream + flushMsgStream msgstream.MsgStream insertChannels []string msFactory msgstream.Factory ttBarrier timesync.TimeTickBarrier @@ -194,6 +195,15 @@ func (s *Server) initMsgProducer() error { return err } s.msgProducer.Start(s.ctx) + // segment flush stream + s.flushMsgStream, err = s.msFactory.NewMsgStream(s.ctx) + if err != nil { + return err + } + s.flushMsgStream.AsProducer([]string{Params.SegmentInfoChannelName}) + log.Debug("dataservice AsProducer:" + Params.SegmentInfoChannelName) + s.flushMsgStream.Start() + return nil } @@ -891,6 +901,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath resp.Reason = "server is initializing" return resp, nil } + if s.flushMsgStream == nil { + resp.Reason = "flush msg stream nil" + return resp, nil + } // check segment id & collection id matched _, err := s.meta.GetCollection(req.GetCollectionID()) @@ -926,12 +940,44 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath for k, v := range ddlMeta { meta[k] = v } + // Save into k-v store err = s.SaveBinLogMetaTxn(meta) if err != nil { resp.Reason = err.Error() return resp, err } + // write flush msg into segmentInfo/flush stream + msgPack := composeSegmentFlushMsgPack(req.SegmentID) + err = s.flushMsgStream.Produce(&msgPack) + if err != nil { + resp.Reason = err.Error() + return resp, err + } resp.ErrorCode = commonpb.ErrorCode_Success return resp, nil } + +func composeSegmentFlushMsgPack(segmentID UniqueID) msgstream.MsgPack { + msgPack := msgstream.MsgPack{ + Msgs: make([]msgstream.TsMsg, 0, 1), + } + completeFlushMsg := internalpb.SegmentFlushCompletedMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SegmentFlushDone, + MsgID: 0, // TODO + Timestamp: 0, // TODO + SourceID: Params.NodeID, + }, + SegmentID: segmentID, + } + var msg msgstream.TsMsg = &msgstream.FlushCompletedMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{0}, + }, + SegmentFlushCompletedMsg: completeFlushMsg, + } + + msgPack.Msgs = append(msgPack.Msgs, msg) + return msgPack +} diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index 82d2b7637..dcac6a039 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -525,6 +525,9 @@ func TestSaveBinlogPaths(t *testing.T) { t.Run("Normal SaveRequest", func(t *testing.T) { ctx := context.Background() resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ + Base: &commonpb.MsgBase{ + Timestamp: uint64(time.Now().Unix()), + }, SegmentID: 2, CollectionID: 0, Field2BinlogPaths: &datapb.ID2PathList{ -- GitLab