From c2ca2c276fded7a8524822cf611fad86c50bb022 Mon Sep 17 00:00:00 2001 From: godchen Date: Thu, 4 Feb 2021 19:34:35 +0800 Subject: [PATCH] Fix response check error Signed-off-by: godchen --- configs/advanced/data_service.yaml | 2 +- docs/developer_guides/chap03_index_service.md | 10 +- docs/developer_guides/chap05_proxy.md | 14 +- docs/developer_guides/chap09_data_service.md | 31 ++++- internal/datanode/data_node.go | 28 +--- .../datanode/flow_graph_filter_dm_node.go | 39 +----- .../datanode/flow_graph_insert_buffer_node.go | 17 --- internal/dataservice/server.go | 40 ++++-- internal/distributed/dataservice/client.go | 47 ++----- .../distributed/dataservice/grpc_service.go | 59 +------- .../distributed/indexnode/client/client.go | 27 +--- internal/distributed/indexnode/service.go | 28 +--- .../distributed/indexservice/client/client.go | 18 ++- .../distributed/proxynode/client/client.go | 7 +- .../distributed/proxyservice/client/client.go | 22 +-- internal/distributed/proxyservice/service.go | 16 +-- internal/indexnode/indexnode.go | 18 ++- internal/masterservice/master_service.go | 19 +-- internal/masterservice/master_service_test.go | 24 +++- internal/msgstream/msg.go | 115 +--------------- .../msgstream/pulsarms/pulsar_msgstream.go | 126 ------------------ internal/msgstream/util/repack_func.go | 6 - internal/proxynode/impl.go | 11 +- internal/proxynode/interface.go | 4 +- internal/proxynode/proxy_node.go | 20 +-- internal/proxynode/repack_func.go | 1 - internal/proxynode/task.go | 120 ++++++++++++----- internal/proxyservice/impl.go | 21 +-- internal/proxyservice/interface.go | 3 +- internal/proxyservice/node_info.go | 4 +- internal/proxyservice/task.go | 9 +- .../querynode/flow_graph_filter_dm_node.go | 38 +----- internal/querynode/flow_graph_insert_node.go | 37 +---- internal/querynode/query_node.go | 32 ----- internal/querynode/search_service.go | 24 ---- internal/queryservice/queryservice.go | 21 ++- internal/queryservice/queryservice_test.go | 9 +- internal/util/flowgraph/input_node.go | 30 ----- internal/util/typeutil/interface.go | 5 +- 39 files changed, 327 insertions(+), 775 deletions(-) diff --git a/configs/advanced/data_service.yaml b/configs/advanced/data_service.yaml index addbe35bb..c82a0cea0 100644 --- a/configs/advanced/data_service.yaml +++ b/configs/advanced/data_service.yaml @@ -9,5 +9,5 @@ dataservice: defaultSizePerRecord: 1024 # old name: segmentExpireDuration: 2000 IDAssignExpiration: 2000 # ms - insertChannelNum: 16 + insertChannelNum: 2 dataNodeNum: 1 \ No newline at end of file diff --git a/docs/developer_guides/chap03_index_service.md b/docs/developer_guides/chap03_index_service.md index f114c9f05..5add6dc82 100644 --- a/docs/developer_guides/chap03_index_service.md +++ b/docs/developer_guides/chap03_index_service.md @@ -13,10 +13,12 @@ ```go type IndexService interface { Service - RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error) - BuildIndex(req BuildIndexRequest) (BuildIndexResponse, error) - GetIndexStates(req IndexStatesRequest) (IndexStatesResponse, error) - GetIndexFilePaths(req IndexFilePathRequest) (IndexFilePathsResponse, error) + RegisterNode(RegisterNodeRequest) (RegisterNodeResponse, error) + BuildIndex(BuildIndexRequest) (BuildIndexResponse, error) + GetIndexStates(IndexStatesRequest) (IndexStatesResponse, error) + GetIndexFilePaths(IndexFilePathRequest) (IndexFilePathsResponse, error) + GetTimeTickChannel() (StringResponse, error) + GetStatisticsChannel() (StringResponse, error) NotifyTaskState(TaskStateNotification) error } diff --git a/docs/developer_guides/chap05_proxy.md b/docs/developer_guides/chap05_proxy.md index 6689f187e..e73fe9d65 100644 --- a/docs/developer_guides/chap05_proxy.md +++ b/docs/developer_guides/chap05_proxy.md @@ -13,7 +13,7 @@ type ProxyService interface { Service RegisterLink() (RegisterLinkResponse, error) RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error) - InvalidateCollectionMetaCache(req InvalidateCollMetaCacheRequest) error + InvalidateCollectionMetaCache(req InvalidateCollMetaCacheRequest) (Status, error) } ``` @@ -72,8 +72,10 @@ type ProxyNode interface { Service //SetTimeTickChannel(channelName string) error //SetStatsChannel(channelName string) error + + InvalidateCollectionMetaCache(request InvalidateCollMetaCacheRequest) (Status, error) - CreateCollection(req CreateCollectionRequest) error + CreateCollection(req CreateCollectionRequest) error DropCollection(req DropCollectionRequest) error HasCollection(req HasCollectionRequest) (bool, error) LoadCollection(req LoadCollectionRequest) error @@ -92,12 +94,16 @@ type ProxyNode interface { CreateIndex(req CreateIndexRequest) error DescribeIndex(DescribeIndexRequest) (DescribeIndexResponse, error) - + GetIndexState(IndexStateRequest) (IndexStateResponse, error) + Insert(req InsertRequest) (InsertResponse, error) Search(req SearchRequest) (SearchResults, error) Flush(req FlushRequest) error - GetPersistentSegmentInfo(req PersistentSegmentInfoRequest) (PersistentSegmentInfoResponse, error) + GetDdChannel(Empty) (StringResponse, error) + + GetQuerySegmentInfo(QuerySegmentInfoRequest) (QuerySegmentInfoResponse, error) + GetPersistentSegmentInfo(PersistentSegmentInfoRequest) (PersistentSegmentInfoResponse, error) } ``` diff --git a/docs/developer_guides/chap09_data_service.md b/docs/developer_guides/chap09_data_service.md index d92c88284..a2f693228 100644 --- a/docs/developer_guides/chap09_data_service.md +++ b/docs/developer_guides/chap09_data_service.md @@ -23,8 +23,9 @@ type DataService interface { GetSegmentInfo(req SegmentInfoRequest) (SegmentInfoResponse, error) GetInsertBinlogPaths(req InsertBinlogPathRequest) (InsertBinlogPathsResponse, error) - - GetInsertChannels(req InsertChannelRequest) ([]string, error) + + GetSegmentInfoChannel(req InsertChannelRequest) (StringResponse, error) + GetInsertChannels(req InsertChannelRequest) (StringList, error) GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error) GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error) @@ -253,16 +254,38 @@ type InsertRequest struct { ```go type DataNode interface { Service + + GetComponentStates() (ComponentStates, error) + GetTimeTickChannel() (StringResponse, error) + GetStatisticsChannel() (StringResponse, error) - WatchDmChannels(req WatchDmChannelRequest) error + WatchDmChannels(WatchDmChannelRequest) error + FlushSegments(FlushSegRequest) (Status, error) //WatchDdChannel(channelName string) error //SetTimeTickChannel(channelName string) error //SetStatisticsChannel(channelName string) error - FlushSegments(req FlushSegRequest) error + SetMasterServiceInterface(MasterServiceInterface) error + SetDataServiceInterface(DataServiceInterface) error } ``` +```go +type DataServiceInterface interface { + GetComponentStates() (ComponentStates, error) + RegisterNode(RegisterNodeRequest) (RegisterNodeResponse, error) +} +``` +```go +type MasterServiceInterface interface { + GetComponentStates() (ComponentStates, error) + AllocID(IDRequest) (IDResponse, error) + ShowCollections(ShowCollectionRequest) (ShowCollectionResponse, error) + DescribeCollection(DescribeCollectionRequest) (DescribeCollectionResponse, error) +} + +``` + * *WatchDmChannels* diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 5599216be..b86f64e08 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -7,10 +7,6 @@ import ( "log" "time" - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go" - "github.com/uber/jaeger-client-go/config" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -33,8 +29,8 @@ type ( // Component GetComponentStates() (*internalpb2.ComponentStates, error) - GetTimeTickChannel() (string, error) // This function has no effect - GetStatisticsChannel() (string, error) // This function has no effect + GetTimeTickChannel() (*milvuspb.StringResponse, error) // This function has no effect + GetStatisticsChannel() (*milvuspb.StringResponse, error) // This function has no effect WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) @@ -72,7 +68,6 @@ type ( flushChan chan *flushMsg replica collectionReplica - tracer opentracing.Tracer closer io.Closer } ) @@ -177,25 +172,6 @@ func (node *DataNode) Init() error { node.replica = replica - // --- Opentracing --- - cfg := &config.Configuration{ - ServiceName: "data_node", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - Reporter: &config.ReporterConfig{ - LogSpans: true, - }, - } - tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger)) - if err != nil { - return errors.Errorf("ERROR: cannot init Jaeger: %v\n", err) - } - node.tracer = tracer - node.closer = closer - opentracing.SetGlobalTracer(node.tracer) - return nil } diff --git a/internal/datanode/flow_graph_filter_dm_node.go b/internal/datanode/flow_graph_filter_dm_node.go index 4663635a4..c5171a6be 100644 --- a/internal/datanode/flow_graph_filter_dm_node.go +++ b/internal/datanode/flow_graph_filter_dm_node.go @@ -1,12 +1,9 @@ package datanode import ( - "context" "log" "math" - "github.com/opentracing/opentracing-go" - "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -35,28 +32,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - var childs []opentracing.Span - tracer := opentracing.GlobalTracer() - if tracer != nil { - for _, msg := range msgStreamMsg.TsMessages() { - if msg.Type() == commonpb.MsgType_kInsert { - var child opentracing.Span - ctx := msg.GetMsgContext() - if parent := opentracing.SpanFromContext(ctx); parent != nil { - child = tracer.StartSpan("pass filter node", - opentracing.FollowsFrom(parent.Context())) - } else { - child = tracer.StartSpan("pass filter node") - } - child.SetTag("hash keys", msg.HashKeys()) - child.SetTag("start time", msg.BeginTs()) - child.SetTag("end time", msg.EndTs()) - msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child)) - childs = append(childs, child) - } - } - } - ddMsg, ok := (*in[1]).(*ddMsg) if !ok { log.Println("type assertion failed for ddMsg") @@ -77,20 +52,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { iMsg.flushMessages = append(iMsg.flushMessages, ddMsg.flushMessages...) - for key, msg := range msgStreamMsg.TsMessages() { + for _, msg := range msgStreamMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_kInsert: - var ctx2 context.Context - if childs != nil { - if childs[key] != nil { - ctx2 = opentracing.ContextWithSpan(msg.GetMsgContext(), childs[key]) - } else { - ctx2 = context.Background() - } - } resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg)) if resMsg != nil { - resMsg.SetMsgContext(ctx2) iMsg.insertMessages = append(iMsg.insertMessages, resMsg) } // case commonpb.MsgType_kDelete: @@ -103,9 +69,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { iMsg.startPositions = append(iMsg.startPositions, msgStreamMsg.StartPositions()...) iMsg.gcRecord = ddMsg.gcRecord var res Msg = &iMsg - for _, child := range childs { - child.Finish() - } return []*Msg{&res} } diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 1b5336926..59a292a91 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/binary" - "fmt" "log" "path" "strconv" @@ -12,9 +11,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/opentracing/opentracing-go" - oplog "github.com/opentracing/opentracing-go/log" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" @@ -155,23 +151,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { // iMsg is insertMsg // 1. iMsg -> buffer for _, msg := range iMsg.insertMessages { - ctx := msg.GetMsgContext() - var span opentracing.Span - if ctx != nil { - span, _ = opentracing.StartSpanFromContext(ctx, fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs())) - } else { - span = opentracing.StartSpan(fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs())) - } - span.SetTag("hash keys", msg.HashKeys()) - span.SetTag("start time", msg.BeginTs()) - span.SetTag("end time", msg.EndTs()) if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { log.Println("Error: misaligned messages detected") continue } currentSegID := msg.GetSegmentID() collectionID := msg.GetCollectionID() - span.LogFields(oplog.Int("segment id", int(currentSegID))) idata, ok := ibNode.insertBuffer.insertData[currentSegID] if !ok { @@ -426,11 +411,9 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { // 1.3 store in buffer ibNode.insertBuffer.insertData[currentSegID] = idata - span.LogFields(oplog.String("store in buffer", "store in buffer")) // 1.4 if full // 1.4.1 generate binlogs - span.LogFields(oplog.String("generate binlogs", "generate binlogs")) if ibNode.insertBuffer.full(currentSegID) { log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID)) diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 5b1c54e09..2116478e9 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -44,8 +44,8 @@ type DataService interface { ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) - GetSegmentInfoChannel() (string, error) - GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) + GetSegmentInfoChannel() (*milvuspb.StringResponse, error) + GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) GetComponentStates() (*internalpb2.ComponentStates, error) @@ -438,12 +438,22 @@ func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) { return resp, nil } -func (s *Server) GetTimeTickChannel() (string, error) { - return Params.TimeTickChannelName, nil +func (s *Server) GetTimeTickChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Value: Params.TimeTickChannelName, + }, nil } -func (s *Server) GetStatisticsChannel() (string, error) { - return Params.StatisticsChannelName, nil +func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Value: Params.StatisticsChannelName, + }, nil } func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { @@ -687,8 +697,13 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat return resp, nil } -func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) { - return s.insertChannels, nil +func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { + return &internalpb2.StringList{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Values: s.insertChannels, + }, nil } func (s *Server) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) { @@ -718,8 +733,13 @@ func (s *Server) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*dat return nil, nil } -func (s *Server) GetSegmentInfoChannel() (string, error) { - return Params.SegmentInfoChannelName, nil +func (s *Server) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Value: Params.SegmentInfoChannelName, + }, nil } func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) { diff --git a/internal/distributed/dataservice/client.go b/internal/distributed/dataservice/client.go index 4507502ef..cc70a3ed3 100644 --- a/internal/distributed/dataservice/client.go +++ b/internal/distributed/dataservice/client.go @@ -2,9 +2,10 @@ package dataservice import ( "context" - "errors" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "google.golang.org/grpc" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -58,26 +59,12 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { return c.grpcClient.GetComponentStates(context.Background(), &commonpb.Empty{}) } -func (c *Client) GetTimeTickChannel() (string, error) { - resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{}) - if err != nil { - return "", err - } - if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return "", errors.New(resp.Status.Reason) - } - return resp.Value, nil +func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) { + return c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{}) } -func (c *Client) GetStatisticsChannel() (string, error) { - resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{}) - if err != nil { - return "", err - } - if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return "", errors.New(resp.Status.Reason) - } - return resp.Value, nil +func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) { + return c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{}) } func (c *Client) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { @@ -104,15 +91,8 @@ func (c *Client) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat return c.grpcClient.GetInsertBinlogPaths(context.Background(), req) } -func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) { - resp, err := c.grpcClient.GetInsertChannels(context.Background(), req) - if err != nil { - return nil, err - } - if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return nil, errors.New(resp.Status.Reason) - } - return resp.Values, nil +func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { + return c.grpcClient.GetInsertChannels(context.Background(), req) } func (c *Client) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) { @@ -123,15 +103,8 @@ func (c *Client) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*dat return c.grpcClient.GetPartitionStatistics(context.Background(), req) } -func (c *Client) GetSegmentInfoChannel() (string, error) { - resp, err := c.grpcClient.GetSegmentInfoChannel(context.Background(), &commonpb.Empty{}) - if err != nil { - return "", err - } - if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return "", errors.New(resp.Status.Reason) - } - return resp.Value, nil +func (c *Client) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) { + return c.grpcClient.GetSegmentInfoChannel(context.Background(), &commonpb.Empty{}) } func (c *Client) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) { diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go index ff3951282..13a8fab20 100644 --- a/internal/distributed/dataservice/grpc_service.go +++ b/internal/distributed/dataservice/grpc_service.go @@ -105,20 +105,7 @@ func (s *Service) GetInsertBinlogPaths(ctx context.Context, request *datapb.Inse } func (s *Service) GetInsertChannels(ctx context.Context, request *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { - resp := &internalpb2.StringList{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - } - channels, err := s.server.GetInsertChannels(request) - if err != nil { - resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - resp.Status.Reason = err.Error() - return resp, nil - } - - resp.Values = channels - return resp, nil + return s.server.GetInsertChannels(request) } func (s *Service) GetCollectionStatistics(ctx context.Context, request *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) { @@ -134,53 +121,15 @@ func (s *Service) GetComponentStates(ctx context.Context, empty *commonpb.Empty) } func (s *Service) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - resp := &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - } - channel, err := s.server.GetTimeTickChannel() - if err != nil { - resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - resp.Status.Reason = err.Error() - return resp, nil - } - - resp.Value = channel - return resp, nil + return s.server.GetTimeTickChannel() } func (s *Service) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - resp := &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - } - channel, err := s.server.GetStatisticsChannel() - if err != nil { - resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - resp.Status.Reason = err.Error() - return resp, nil - } - - resp.Value = channel - return resp, nil + return s.server.GetStatisticsChannel() } func (s *Service) GetSegmentInfoChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - resp := &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - } - channel, err := s.server.GetSegmentInfoChannel() - if err != nil { - resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - resp.Status.Reason = err.Error() - return resp, nil - } - resp.Value = channel - return resp, nil + return s.server.GetSegmentInfoChannel() } func (s *Service) GetCount(ctx context.Context, request *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) { diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 201bcb2e8..05dbc7957 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -2,9 +2,10 @@ package grpcindexnodeclient import ( "context" - "errors" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/util/retry" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -47,28 +48,12 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { return c.grpcClient.GetComponentStates(context.Background(), &commonpb.Empty{}) } -func (c *Client) GetTimeTickChannel() (string, error) { - resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{}) - - if err != nil { - return "", err - } - if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return "", errors.New(resp.Status.Reason) - } - return resp.Value, nil +func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) { + return c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{}) } -func (c *Client) GetStatisticsChannel() (string, error) { - resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{}) - - if err != nil { - return "", err - } - if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - return "", errors.New(resp.Status.Reason) - } - return resp.Value, nil +func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) { + return c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{}) } func (c *Client) BuildIndex(req *indexpb.BuildIndexCmd) (*commonpb.Status, error) { diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 38cc6c9d3..ab9a7300e 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -142,35 +142,11 @@ func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) } func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - ret, err := s.impl.GetTimeTickChannel() - resp := &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - } - if err != nil { - resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - resp.Status.Reason = err.Error() - } else { - resp.Value = ret - } - return resp, nil + return s.impl.GetTimeTickChannel() } func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - ret, err := s.impl.GetStatisticsChannel() - resp := &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - } - if err != nil { - resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR - resp.Status.Reason = err.Error() - } else { - resp.Value = ret - } - return resp, nil + return s.impl.GetStatisticsChannel() } func NewServer(ctx context.Context) (*Server, error) { diff --git a/internal/distributed/indexservice/client/client.go b/internal/distributed/indexservice/client/client.go index f3fd99e99..b84177a44 100644 --- a/internal/distributed/indexservice/client/client.go +++ b/internal/distributed/indexservice/client/client.go @@ -5,6 +5,8 @@ import ( "log" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "google.golang.org/grpc" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -50,12 +52,20 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { return c.grpcClient.GetComponentStates(ctx, &commonpb.Empty{}) } -func (c *Client) GetTimeTickChannel() (string, error) { - return "", nil +func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + }, nil } -func (c *Client) GetStatisticsChannel() (string, error) { - return "", nil +func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + }, nil } func (c *Client) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) { diff --git a/internal/distributed/proxynode/client/client.go b/internal/distributed/proxynode/client/client.go index 637f912d1..a915c2767 100644 --- a/internal/distributed/proxynode/client/client.go +++ b/internal/distributed/proxynode/client/client.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/util/retry" "google.golang.org/grpc" @@ -39,9 +41,8 @@ func (c *Client) Stop() error { return nil } -func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error { - _, err := c.grpcClient.InvalidateCollectionMetaCache(c.ctx, request) - return err +func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { + return c.grpcClient.InvalidateCollectionMetaCache(c.ctx, request) } func NewClient(ctx context.Context, address string) *Client { diff --git a/internal/distributed/proxyservice/client/client.go b/internal/distributed/proxyservice/client/client.go index 13621a2db..99ad11e5c 100644 --- a/internal/distributed/proxyservice/client/client.go +++ b/internal/distributed/proxyservice/client/client.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "google.golang.org/grpc" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -46,25 +48,25 @@ func (c *Client) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.Re return c.proxyServiceClient.RegisterNode(c.ctx, request) } -func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error { +func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { _, err := c.proxyServiceClient.InvalidateCollectionMetaCache(c.ctx, request) - return err + return nil, err } -func (c *Client) GetTimeTickChannel() (string, error) { - response, err := c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{}) - if err != nil { - return "", err - } - return response.Value, nil +func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) { + return c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{}) } func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { return c.proxyServiceClient.GetComponentStates(c.ctx, &commonpb.Empty{}) } -func (c *Client) GetStatisticsChannel() (string, error) { - return "", nil +func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + }, nil } func NewClient(address string) *Client { diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index 565905ff8..19fe18ecd 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -133,23 +133,11 @@ func (s *Server) RegisterNode(ctx context.Context, request *proxypb.RegisterNode } func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { - return &commonpb.Status{}, s.impl.InvalidateCollectionMetaCache(request) + return s.impl.InvalidateCollectionMetaCache(request) } func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - channel, err := s.impl.GetTimeTickChannel() - if err != nil { - return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), - }, - Value: "", - }, nil - } - return &milvuspb.StringResponse{ - Value: channel, - }, nil + return s.impl.GetTimeTickChannel() } func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 11af320e4..97b1ad55b 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -5,6 +5,8 @@ import ( "log" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" @@ -209,10 +211,18 @@ func (i *NodeImpl) GetComponentStates() (*internalpb2.ComponentStates, error) { return ret, nil } -func (i *NodeImpl) GetTimeTickChannel() (string, error) { - return "", nil +func (i *NodeImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + }, nil } -func (i *NodeImpl) GetStatisticsChannel() (string, error) { - return "", nil +func (i *NodeImpl) GetStatisticsChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + }, nil } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 545c0c401..8f7e5f73a 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -34,13 +34,13 @@ import ( // masterpb2 -> masterpb (master_service) type ProxyServiceInterface interface { - GetTimeTickChannel() (string, error) - InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error + GetTimeTickChannel() (*milvuspb.StringResponse, error) + InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) } type DataServiceInterface interface { GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) - GetSegmentInfoChannel() (string, error) + GetSegmentInfoChannel() (*milvuspb.StringResponse, error) } type IndexServiceInterface interface { @@ -608,11 +608,11 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { if err != nil { return err } - Params.ProxyTimeTickChannel = rsp + Params.ProxyTimeTickChannel = rsp.Value log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel) c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error { - err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ + status, _ := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ MsgType: 0, //TODO,MsgType MsgID: 0, @@ -622,8 +622,11 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { DbName: dbName, CollectionName: collectionName, }) - if err != nil { - return err + if status == nil { + return errors.New("invalidate collection metacache resp is nil") + } + if status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(status.Reason) } return nil } @@ -635,7 +638,7 @@ func (c *Core) SetDataService(s DataServiceInterface) error { if err != nil { return err } - Params.DataServiceSegmentChannel = rsp + Params.DataServiceSegmentChannel = rsp.Value log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel) c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 5333dbf45..0b42e4020 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -28,14 +28,21 @@ type proxyMock struct { mutex sync.Mutex } -func (p *proxyMock) GetTimeTickChannel() (string, error) { - return fmt.Sprintf("proxy-time-tick-%d", p.randVal), nil +func (p *proxyMock) GetTimeTickChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Value: fmt.Sprintf("proxy-time-tick-%d", p.randVal), + }, nil } -func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error { +func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { p.mutex.Lock() defer p.mutex.Unlock() p.collArray = append(p.collArray, request.CollectionName) - return nil + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, nil } func (p *proxyMock) GetCollArray() []string { p.mutex.Lock() @@ -72,8 +79,13 @@ func (d *dataMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*d return rst, nil } -func (d *dataMock) GetSegmentInfoChannel() (string, error) { - return fmt.Sprintf("segment-info-channel-%d", d.randVal), nil +func (d *dataMock) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Value: fmt.Sprintf("segment-info-channel-%d", d.randVal), + }, nil } type indexMock struct { diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go index 6e9a5ae2e..f23bf92e9 100644 --- a/internal/msgstream/msg.go +++ b/internal/msgstream/msg.go @@ -2,9 +2,9 @@ package msgstream import ( "context" + "errors" "github.com/golang/protobuf/proto" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -14,8 +14,6 @@ type MsgType = commonpb.MsgType type MarshalType = interface{} type TsMsg interface { - GetMsgContext() context.Context - SetMsgContext(context.Context) BeginTs() Timestamp EndTs() Timestamp Type() MsgType @@ -59,7 +57,7 @@ func ConvertToByteArray(input interface{}) ([]byte, error) { case []byte: return output, nil default: - return nil, errors.New("Cannot convert interface{} to []byte") + return nil, errors.New("cannot convert interface{} to []byte") } } @@ -73,14 +71,6 @@ func (it *InsertMsg) Type() MsgType { return it.Base.MsgType } -func (it *InsertMsg) GetMsgContext() context.Context { - return it.MsgCtx -} - -func (it *InsertMsg) SetMsgContext(ctx context.Context) { - it.MsgCtx = ctx -} - func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error) { insertMsg := input.(*InsertMsg) insertRequest := &insertMsg.InsertRequest @@ -129,13 +119,6 @@ func (fl *FlushCompletedMsg) Type() MsgType { return fl.Base.MsgType } -func (fl *FlushCompletedMsg) GetMsgContext() context.Context { - return fl.MsgCtx -} -func (fl *FlushCompletedMsg) SetMsgContext(ctx context.Context) { - fl.MsgCtx = ctx -} - func (fl *FlushCompletedMsg) Marshal(input TsMsg) (MarshalType, error) { flushCompletedMsgTask := input.(*FlushCompletedMsg) flushCompletedMsg := &flushCompletedMsgTask.SegmentFlushCompletedMsg @@ -174,13 +157,6 @@ func (fl *FlushMsg) Type() MsgType { return fl.Base.MsgType } -func (fl *FlushMsg) GetMsgContext() context.Context { - return fl.MsgCtx -} -func (fl *FlushMsg) SetMsgContext(ctx context.Context) { - fl.MsgCtx = ctx -} - func (fl *FlushMsg) Marshal(input TsMsg) (MarshalType, error) { flushMsgTask := input.(*FlushMsg) flushMsg := &flushMsgTask.FlushMsg @@ -218,14 +194,6 @@ func (dt *DeleteMsg) Type() MsgType { return dt.Base.MsgType } -func (dt *DeleteMsg) GetMsgContext() context.Context { - return dt.MsgCtx -} - -func (dt *DeleteMsg) SetMsgContext(ctx context.Context) { - dt.MsgCtx = ctx -} - func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error) { deleteMsg := input.(*DeleteMsg) deleteRequest := &deleteMsg.DeleteRequest @@ -275,14 +243,6 @@ func (st *SearchMsg) Type() MsgType { return st.Base.MsgType } -func (st *SearchMsg) GetMsgContext() context.Context { - return st.MsgCtx -} - -func (st *SearchMsg) SetMsgContext(ctx context.Context) { - st.MsgCtx = ctx -} - func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) { searchTask := input.(*SearchMsg) searchRequest := &searchTask.SearchRequest @@ -320,14 +280,6 @@ func (srt *SearchResultMsg) Type() MsgType { return srt.Base.MsgType } -func (srt *SearchResultMsg) GetMsgContext() context.Context { - return srt.MsgCtx -} - -func (srt *SearchResultMsg) SetMsgContext(ctx context.Context) { - srt.MsgCtx = ctx -} - func (srt *SearchResultMsg) Marshal(input TsMsg) (MarshalType, error) { searchResultTask := input.(*SearchResultMsg) searchResultRequest := &searchResultTask.SearchResults @@ -365,14 +317,6 @@ func (tst *TimeTickMsg) Type() MsgType { return tst.Base.MsgType } -func (tst *TimeTickMsg) GetMsgContext() context.Context { - return tst.MsgCtx -} - -func (tst *TimeTickMsg) SetMsgContext(ctx context.Context) { - tst.MsgCtx = ctx -} - func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error) { timeTickTask := input.(*TimeTickMsg) timeTick := &timeTickTask.TimeTickMsg @@ -411,14 +355,6 @@ func (qs *QueryNodeStatsMsg) Type() MsgType { return qs.Base.MsgType } -func (qs *QueryNodeStatsMsg) GetMsgContext() context.Context { - return qs.MsgCtx -} - -func (qs *QueryNodeStatsMsg) SetMsgContext(ctx context.Context) { - qs.MsgCtx = ctx -} - func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) (MarshalType, error) { queryNodeSegStatsTask := input.(*QueryNodeStatsMsg) queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeStats @@ -454,14 +390,6 @@ func (ss *SegmentStatisticsMsg) Type() MsgType { return ss.Base.MsgType } -func (ss *SegmentStatisticsMsg) GetMsgContext() context.Context { - return ss.MsgCtx -} - -func (ss *SegmentStatisticsMsg) SetMsgContext(ctx context.Context) { - ss.MsgCtx = ctx -} - func (ss *SegmentStatisticsMsg) Marshal(input TsMsg) (MarshalType, error) { segStatsTask := input.(*SegmentStatisticsMsg) segStats := &segStatsTask.SegmentStatistics @@ -507,14 +435,6 @@ func (cc *CreateCollectionMsg) Type() MsgType { return cc.Base.MsgType } -func (cc *CreateCollectionMsg) GetMsgContext() context.Context { - return cc.MsgCtx -} - -func (cc *CreateCollectionMsg) SetMsgContext(ctx context.Context) { - cc.MsgCtx = ctx -} - func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { createCollectionMsg := input.(*CreateCollectionMsg) createCollectionRequest := &createCollectionMsg.CreateCollectionRequest @@ -551,13 +471,6 @@ type DropCollectionMsg struct { func (dc *DropCollectionMsg) Type() MsgType { return dc.Base.MsgType } -func (dc *DropCollectionMsg) GetMsgContext() context.Context { - return dc.MsgCtx -} - -func (dc *DropCollectionMsg) SetMsgContext(ctx context.Context) { - dc.MsgCtx = ctx -} func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { dropCollectionMsg := input.(*DropCollectionMsg) @@ -592,14 +505,6 @@ type CreatePartitionMsg struct { internalpb2.CreatePartitionRequest } -func (cc *CreatePartitionMsg) GetMsgContext() context.Context { - return cc.MsgCtx -} - -func (cc *CreatePartitionMsg) SetMsgContext(ctx context.Context) { - cc.MsgCtx = ctx -} - func (cc *CreatePartitionMsg) Type() MsgType { return cc.Base.MsgType } @@ -637,14 +542,6 @@ type DropPartitionMsg struct { internalpb2.DropPartitionRequest } -func (dc *DropPartitionMsg) GetMsgContext() context.Context { - return dc.MsgCtx -} - -func (dc *DropPartitionMsg) SetMsgContext(ctx context.Context) { - dc.MsgCtx = ctx -} - func (dc *DropPartitionMsg) Type() MsgType { return dc.Base.MsgType } @@ -729,14 +626,6 @@ func (sim *SegmentInfoMsg) Type() MsgType { return sim.Base.MsgType } -func (sim *SegmentInfoMsg) GetMsgContext() context.Context { - return sim.MsgCtx -} - -func (sim *SegmentInfoMsg) SetMsgContext(ctx context.Context) { - sim.MsgCtx = ctx -} - func (sim *SegmentInfoMsg) Marshal(input TsMsg) (MarshalType, error) { segInfoMsg := input.(*SegmentInfoMsg) mb, err := proto.Marshal(&segInfoMsg.SegmentMsg) diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index aedca490c..f70ce3d91 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -12,10 +12,6 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/golang/protobuf/proto" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - oplog "github.com/opentracing/opentracing-go/log" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream/util" @@ -247,49 +243,12 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error { msg := &pulsar.ProducerMessage{Payload: m} - var child opentracing.Span - if v.Msgs[i].Type() == commonpb.MsgType_kInsert || - v.Msgs[i].Type() == commonpb.MsgType_kSearch || - v.Msgs[i].Type() == commonpb.MsgType_kSearchResult { - tracer := opentracing.GlobalTracer() - ctx := v.Msgs[i].GetMsgContext() - if ctx == nil { - ctx = context.Background() - } - - if parent := opentracing.SpanFromContext(ctx); parent != nil { - child = tracer.StartSpan("start send pulsar msg", - opentracing.FollowsFrom(parent.Context())) - } else { - child = tracer.StartSpan("start send pulsar msg") - } - child.SetTag("hash keys", v.Msgs[i].HashKeys()) - child.SetTag("start time", v.Msgs[i].BeginTs()) - child.SetTag("end time", v.Msgs[i].EndTs()) - child.SetTag("msg type", v.Msgs[i].Type()) - msg.Properties = make(map[string]string) - err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties}) - if err != nil { - child.LogFields(oplog.Error(err)) - child.Finish() - return err - } - child.LogFields(oplog.String("inject success", "inject success")) - } - if _, err := ms.producers[k].Send( context.Background(), msg, ); err != nil { - if child != nil { - child.LogFields(oplog.Error(err)) - child.Finish() - } return err } - if child != nil { - child.Finish() - } } } return nil @@ -309,49 +268,14 @@ func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error { } msg := &pulsar.ProducerMessage{Payload: m} - var child opentracing.Span - if v.Type() == commonpb.MsgType_kInsert || - v.Type() == commonpb.MsgType_kSearch || - v.Type() == commonpb.MsgType_kSearchResult { - tracer := opentracing.GlobalTracer() - ctx := v.GetMsgContext() - if ctx == nil { - ctx = context.Background() - } - if parent := opentracing.SpanFromContext(ctx); parent != nil { - child = tracer.StartSpan("start send pulsar msg", - opentracing.FollowsFrom(parent.Context())) - } else { - child = tracer.StartSpan("start send pulsar msg, start time: %d") - } - child.SetTag("hash keys", v.HashKeys()) - child.SetTag("start time", v.BeginTs()) - child.SetTag("end time", v.EndTs()) - child.SetTag("msg type", v.Type()) - msg.Properties = make(map[string]string) - err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties}) - if err != nil { - child.LogFields(oplog.Error(err)) - child.Finish() - return err - } - child.LogFields(oplog.String("inject success", "inject success")) - } for i := 0; i < producerLen; i++ { if _, err := ms.producers[i].Send( context.Background(), msg, ); err != nil { - if child != nil { - child.LogFields(oplog.Error(err)) - child.Finish() - } return err } } - if child != nil { - child.Finish() - } } return nil } @@ -411,23 +335,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() { continue } tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) - if tsMsg.Type() == commonpb.MsgType_kSearch || - tsMsg.Type() == commonpb.MsgType_kSearchResult { - tracer := opentracing.GlobalTracer() - spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()}) - if err != nil { - log.Println("extract message err") - log.Println(err.Error()) - } - span := opentracing.StartSpan("pulsar msg received", - ext.RPCServerOption(spanContext)) - span.SetTag("msg type", tsMsg.Type()) - span.SetTag("hash keys", tsMsg.HashKeys()) - span.SetTag("start time", tsMsg.BeginTs()) - span.SetTag("end time", tsMsg.EndTs()) - tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span)) - span.Finish() - } if err != nil { log.Printf("Failed to unmarshal tsMsg, error = %v", err) continue @@ -521,8 +428,6 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { ms.unsolvedBuf = make(map[Consumer][]TsMsg) isChannelReady := make(map[Consumer]bool) eofMsgTimeStamp := make(map[Consumer]Timestamp) - spans := make(map[Timestamp]opentracing.Span) - ctxs := make(map[Timestamp]context.Context) for _, consumer := range ms.consumers { ms.unsolvedBuf[consumer] = make([]TsMsg, 0) } @@ -558,22 +463,8 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { timeTickMsg = v continue } - var ctx context.Context - var span opentracing.Span - if v.Type() == commonpb.MsgType_kInsert { - if _, ok := spans[v.BeginTs()]; !ok { - span, ctx = opentracing.StartSpanFromContext(v.GetMsgContext(), "after find time tick") - ctxs[v.BeginTs()] = ctx - spans[v.BeginTs()] = span - } - } if v.EndTs() <= timeStamp { timeTickBuf = append(timeTickBuf, v) - if v.Type() == commonpb.MsgType_kInsert { - v.SetMsgContext(ctxs[v.BeginTs()]) - spans[v.BeginTs()].Finish() - delete(spans, v.BeginTs()) - } } else { tempBuffer = append(tempBuffer, v) } @@ -643,23 +534,6 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer, MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), }) - if tsMsg.Type() == commonpb.MsgType_kInsert { - tracer := opentracing.GlobalTracer() - spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()}) - if err != nil { - log.Println("extract message err") - log.Println(err.Error()) - } - span := opentracing.StartSpan("pulsar msg received", - ext.RPCServerOption(spanContext)) - span.SetTag("hash keys", tsMsg.HashKeys()) - span.SetTag("start time", tsMsg.BeginTs()) - span.SetTag("end time", tsMsg.EndTs()) - span.SetTag("msg type", tsMsg.Type()) - tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span)) - span.Finish() - } - mu.Lock() ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg) mu.Unlock() diff --git a/internal/msgstream/util/repack_func.go b/internal/msgstream/util/repack_func.go index 176778065..45e58f70c 100644 --- a/internal/msgstream/util/repack_func.go +++ b/internal/msgstream/util/repack_func.go @@ -53,9 +53,6 @@ func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e } insertMsg := &msgstream.InsertMsg{ - BaseMsg: BaseMsg{ - MsgCtx: request.GetMsgContext(), - }, InsertRequest: sliceRequest, } result[key].Msgs = append(result[key].Msgs, insertMsg) @@ -102,9 +99,6 @@ func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e } deleteMsg := &msgstream.DeleteMsg{ - BaseMsg: BaseMsg{ - MsgCtx: request.GetMsgContext(), - }, DeleteRequest: sliceRequest, } result[key].Msgs = append(result[key].Msgs, deleteMsg) diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index 6e951f477..de9bfac82 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -2,21 +2,18 @@ package proxynode import ( "context" - "errors" "log" "strconv" "time" - "github.com/zilliztech/milvus-distributed/internal/proto/querypb" - - "github.com/zilliztech/milvus-distributed/internal/proto/datapb" - + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) const ( diff --git a/internal/proxynode/interface.go b/internal/proxynode/interface.go index fcb44aa13..509c73c6c 100644 --- a/internal/proxynode/interface.go +++ b/internal/proxynode/interface.go @@ -49,7 +49,7 @@ type QueryServiceClient interface { type DataServiceClient interface { AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) - GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) + GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) @@ -58,7 +58,7 @@ type DataServiceClient interface { } type ProxyServiceClient interface { - GetTimeTickChannel() (string, error) + GetTimeTickChannel() (*milvuspb.StringResponse, error) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) GetComponentStates() (*internalpb2.ComponentStates, error) } diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index ee999d4d2..850027122 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -19,9 +19,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" - "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" @@ -57,7 +54,6 @@ type NodeImpl struct { manipulationMsgStream msgstream.MsgStream queryMsgStream msgstream.MsgStream - tracer opentracing.Tracer closer io.Closer // Add callback functions at different stages @@ -106,7 +102,6 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string) } func (node *NodeImpl) Init() error { - factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024) // todo wait for proxyservice state changed to Healthy @@ -136,6 +131,8 @@ func (node *NodeImpl) Init() error { return err } + factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024) + // wait for dataservice state changed to Healthy if node.dataServiceClient != nil { err = node.waitForServiceReady(node.dataServiceClient, "DataService") @@ -182,19 +179,6 @@ func (node *NodeImpl) Init() error { // return err //} - cfg := &config.Configuration{ - ServiceName: "proxynode", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - node.tracer, node.closer, err = cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(node.tracer) - node.queryMsgStream, _ = factory.NewMsgStream(node.ctx) node.queryMsgStream.AsProducer(Params.SearchChannelNames) log.Println("create query message stream ...") diff --git a/internal/proxynode/repack_func.go b/internal/proxynode/repack_func.go index 01a1a67ad..3837338f1 100644 --- a/internal/proxynode/repack_func.go +++ b/internal/proxynode/repack_func.go @@ -214,7 +214,6 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg, insertMsg := &msgstream.InsertMsg{ InsertRequest: sliceRequest, } - insertMsg.SetMsgContext(request.GetMsgContext()) if together { // all rows with same hash value are accumulated to only one message if len(result[key].Msgs) <= 0 { result[key].Msgs = append(result[key].Msgs, insertMsg) diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index bfa92e0ed..9aff4e257 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -1,6 +1,7 @@ package proxynode import ( + "context" "errors" "log" "math" @@ -146,7 +147,6 @@ func (it *InsertTask) Execute() error { EndTs: it.EndTs(), Msgs: make([]msgstream.TsMsg, 1), } - tsMsg.SetMsgContext(it.Ctx()) it.result = &milvuspb.InsertResponse{ Status: &commonpb.Status{ @@ -160,7 +160,7 @@ func (it *InsertTask) Execute() error { stream, err := globalInsertChannelsMap.getInsertMsgStream(collID) if err != nil { - collectionInsertChannels, err := it.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{ + resp, _ := it.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kInsert, // todo MsgID: it.Base.MsgID, // todo @@ -170,10 +170,13 @@ func (it *InsertTask) Execute() error { DbID: 0, // todo CollectionID: collID, }) - if err != nil { - return err + if resp == nil { + return errors.New("get insert channels resp is nil") + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(resp.Status.Reason) } - err = globalInsertChannelsMap.createInsertMsgStream(collID, collectionInsertChannels) + err = globalInsertChannelsMap.createInsertMsgStream(collID, resp.Values) if err != nil { return err } @@ -315,7 +318,7 @@ func (cct *CreateCollectionTask) Execute() error { if err != nil { return err } - collectionInsertChannels, err := cct.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{ + resp, _ := cct.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kInsert, // todo MsgID: cct.Base.MsgID, // todo @@ -325,10 +328,13 @@ func (cct *CreateCollectionTask) Execute() error { DbID: 0, // todo CollectionID: collID, }) - if err != nil { - return err + if resp == nil { + return errors.New("get insert channels resp is nil") + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(resp.Status.Reason) } - err = globalInsertChannelsMap.createInsertMsgStream(collID, collectionInsertChannels) + err = globalInsertChannelsMap.createInsertMsgStream(collID, resp.Values) if err != nil { return err } @@ -387,19 +393,19 @@ func (dct *DropCollectionTask) PreExecute() error { } func (dct *DropCollectionTask) Execute() error { - var err error collID, err := globalMetaCache.GetCollectionID(dct.CollectionName) if err != nil { return err } - dct.result, err = dct.masterClient.DropCollection(dct.DropCollectionRequest) - if dct.result.ErrorCode == commonpb.ErrorCode_SUCCESS { - err = globalInsertChannelsMap.closeInsertMsgStream(collID) - if err != nil { - return err - } + dct.result, _ = dct.masterClient.DropCollection(dct.DropCollectionRequest) + if dct.result.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(dct.result.Reason) } - return err + err = globalInsertChannelsMap.closeInsertMsgStream(collID) + if err != nil { + return err + } + return nil } func (dct *DropCollectionTask) PostExecute() error { @@ -507,7 +513,6 @@ func (st *SearchTask) Execute() error { EndTs: st.Base.Timestamp, Msgs: make([]msgstream.TsMsg, 1), } - tsMsg.SetMsgContext(st.Ctx()) msgPack.Msgs[0] = tsMsg err := st.queryMsgStream.Produce(msgPack) log.Printf("[NodeImpl] length of searchMsg: %v", len(msgPack.Msgs)) @@ -719,6 +724,12 @@ func (hct *HasCollectionTask) PreExecute() error { func (hct *HasCollectionTask) Execute() error { var err error hct.result, err = hct.masterClient.HasCollection(hct.HasCollectionRequest) + if hct.result == nil { + return errors.New("has collection resp is nil") + } + if hct.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(hct.result.Status.Reason) + } return err } @@ -775,10 +786,13 @@ func (dct *DescribeCollectionTask) PreExecute() error { func (dct *DescribeCollectionTask) Execute() error { var err error dct.result, err = dct.masterClient.DescribeCollection(dct.DescribeCollectionRequest) - if err != nil { - return err + if dct.result == nil { + return errors.New("has collection resp is nil") } - return nil + if dct.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(dct.result.Status.Reason) + } + return err } func (dct *DescribeCollectionTask) PostExecute() error { @@ -842,9 +856,12 @@ func (g *GetCollectionsStatisticsTask) Execute() error { CollectionID: collID, } - result, err := g.dataServiceClient.GetCollectionStatistics(req) - if err != nil { - return err + result, _ := g.dataServiceClient.GetCollectionStatistics(req) + if result == nil { + return errors.New("get collection statistics resp is nil") + } + if result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(result.Status.Reason) } g.result = &milvuspb.CollectionStatsResponse{ Status: &commonpb.Status{ @@ -865,6 +882,7 @@ type ShowCollectionsTask struct { *milvuspb.ShowCollectionRequest masterClient MasterClient result *milvuspb.ShowCollectionResponse + ctx context.Context } func (sct *ShowCollectionsTask) OnEnqueue() error { @@ -906,6 +924,12 @@ func (sct *ShowCollectionsTask) PreExecute() error { func (sct *ShowCollectionsTask) Execute() error { var err error sct.result, err = sct.masterClient.ShowCollections(sct.ShowCollectionRequest) + if sct.result == nil { + return errors.New("get collection statistics resp is nil") + } + if sct.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(sct.result.Status.Reason) + } return err } @@ -968,6 +992,12 @@ func (cpt *CreatePartitionTask) PreExecute() error { func (cpt *CreatePartitionTask) Execute() (err error) { cpt.result, err = cpt.masterClient.CreatePartition(cpt.CreatePartitionRequest) + if cpt.result == nil { + return errors.New("get collection statistics resp is nil") + } + if cpt.result.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(cpt.result.Reason) + } return err } @@ -1030,6 +1060,12 @@ func (dpt *DropPartitionTask) PreExecute() error { func (dpt *DropPartitionTask) Execute() (err error) { dpt.result, err = dpt.masterClient.DropPartition(dpt.DropPartitionRequest) + if dpt.result == nil { + return errors.New("get collection statistics resp is nil") + } + if dpt.result.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(dpt.result.Reason) + } return err } @@ -1091,6 +1127,12 @@ func (hpt *HasPartitionTask) PreExecute() error { func (hpt *HasPartitionTask) Execute() (err error) { hpt.result, err = hpt.masterClient.HasPartition(hpt.HasPartitionRequest) + if hpt.result == nil { + return errors.New("get collection statistics resp is nil") + } + if hpt.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(hpt.result.Status.Reason) + } return err } @@ -1147,10 +1189,13 @@ func (spt *ShowPartitionsTask) PreExecute() error { func (spt *ShowPartitionsTask) Execute() error { var err error spt.result, err = spt.masterClient.ShowPartitions(spt.ShowPartitionRequest) - if err != nil { - return err + if spt.result == nil { + return errors.New("get collection statistics resp is nil") } - return nil + if spt.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(spt.result.Status.Reason) + } + return err } func (spt *ShowPartitionsTask) PostExecute() error { @@ -1210,8 +1255,15 @@ func (cit *CreateIndexTask) PreExecute() error { return nil } -func (cit *CreateIndexTask) Execute() (err error) { +func (cit *CreateIndexTask) Execute() error { + var err error cit.result, err = cit.masterClient.CreateIndex(cit.CreateIndexRequest) + if cit.result == nil { + return errors.New("get collection statistics resp is nil") + } + if cit.result.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(cit.result.Reason) + } return err } @@ -1275,6 +1327,12 @@ func (dit *DescribeIndexTask) PreExecute() error { func (dit *DescribeIndexTask) Execute() error { var err error dit.result, err = dit.masterClient.DescribeIndex(dit.DescribeIndexRequest) + if dit.result == nil { + return errors.New("get collection statistics resp is nil") + } + if dit.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(dit.result.Status.Reason) + } return err } @@ -1495,9 +1553,9 @@ func (ft *FlushTask) Execute() error { CollectionID: collID, } var status *commonpb.Status - status, err = ft.dataServiceClient.Flush(flushReq) - if err != nil { - return nil + status, _ = ft.dataServiceClient.Flush(flushReq) + if status == nil { + return errors.New("flush resp is nil") } if status.ErrorCode != commonpb.ErrorCode_SUCCESS { return errors.New(status.Reason) diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index f03afa130..2f49f6717 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -114,7 +114,7 @@ func (s *ServiceImpl) Init() error { } insertTickMsgStream, _ := factory.NewMsgStream(s.ctx) insertTickMsgStream.AsProducer(channels) - log.Println("create service time tick producer channel: ", channels) + log.Println("create insert time tick producer channel: ", channels) nodeTimeTickMsgStream, _ := factory.NewMsgStream(s.ctx) nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel, @@ -175,11 +175,16 @@ func (s *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) { s.stateCode = code } -func (s *ServiceImpl) GetTimeTickChannel() (string, error) { - return Params.ServiceTimeTickChannel, nil +func (s *ServiceImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Value: Params.ServiceTimeTickChannel, + }, nil } -func (s *ServiceImpl) GetStatisticsChannel() (string, error) { +func (s *ServiceImpl) GetStatisticsChannel() (*milvuspb.StringResponse, error) { panic("implement me") } @@ -260,7 +265,7 @@ func (s *ServiceImpl) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxy return t.response, nil } -func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error { +func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { log.Println("InvalidateCollectionMetaCache") ctx, cancel := context.WithTimeout(s.ctx, timeoutInterval) defer cancel() @@ -275,13 +280,13 @@ func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateC err = s.sched.InvalidateCollectionMetaCacheTaskQueue.Enqueue(t) if err != nil { - return err + return nil, err } err = t.WaitToFinish() if err != nil { - return err + return nil, err } - return nil + return nil, nil } diff --git a/internal/proxyservice/interface.go b/internal/proxyservice/interface.go index 7171a4465..0f21af5b9 100644 --- a/internal/proxyservice/interface.go +++ b/internal/proxyservice/interface.go @@ -1,6 +1,7 @@ package proxyservice import ( + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" @@ -15,5 +16,5 @@ type ProxyService interface { RegisterLink() (*milvuspb.RegisterLinkResponse, error) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) // TODO: i'm sure it's not a best way to keep consistency, fix me - InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error + InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) } diff --git a/internal/proxyservice/node_info.go b/internal/proxyservice/node_info.go index 0bceb22fb..9b51c5892 100644 --- a/internal/proxyservice/node_info.go +++ b/internal/proxyservice/node_info.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + grpcproxynodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode/client" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -25,7 +27,7 @@ type NodeClient interface { Start() error Stop() error - InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error + InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) } type GlobalNodeInfoTable struct { diff --git a/internal/proxyservice/task.go b/internal/proxyservice/task.go index 1195eb39f..17fab676b 100644 --- a/internal/proxyservice/task.go +++ b/internal/proxyservice/task.go @@ -149,9 +149,12 @@ func (t *InvalidateCollectionMetaCacheTask) Execute() error { return err } for _, c := range clients { - err = c.InvalidateCollectionMetaCache(t.request) - if err != nil { - return err + status, _ := c.InvalidateCollectionMetaCache(t.request) + if status == nil { + return errors.New("invalidate collection meta cache error") + } + if status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(status.Reason) } } return nil diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index d166d11f5..171875b5d 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -1,11 +1,9 @@ package querynode import ( - "context" "log" "math" - "github.com/opentracing/opentracing-go" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" ) @@ -34,28 +32,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - var childs []opentracing.Span - tracer := opentracing.GlobalTracer() - if tracer != nil && msgStreamMsg != nil { - for _, msg := range msgStreamMsg.TsMessages() { - if msg.Type() == commonpb.MsgType_kInsert || msg.Type() == commonpb.MsgType_kSearch { - var child opentracing.Span - ctx := msg.GetMsgContext() - if parent := opentracing.SpanFromContext(ctx); parent != nil { - child = tracer.StartSpan("pass filter node", - opentracing.FollowsFrom(parent.Context())) - } else { - child = tracer.StartSpan("pass filter node") - } - child.SetTag("hash keys", msg.HashKeys()) - child.SetTag("start time", msg.BeginTs()) - child.SetTag("end time", msg.EndTs()) - msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child)) - childs = append(childs, child) - } - } - } - ddMsg, ok := (*in[1]).(*ddMsg) if !ok { log.Println("type assertion failed for ddMsg") @@ -70,20 +46,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { timestampMax: msgStreamMsg.TimestampMax(), }, } - for key, msg := range msgStreamMsg.TsMessages() { + for _, msg := range msgStreamMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_kInsert: - var ctx2 context.Context - if childs != nil { - if childs[key] != nil { - ctx2 = opentracing.ContextWithSpan(msg.GetMsgContext(), childs[key]) - } else { - ctx2 = context.Background() - } - } resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg)) if resMsg != nil { - resMsg.SetMsgContext(ctx2) iMsg.insertMessages = append(iMsg.insertMessages, resMsg) } // case commonpb.MsgType_kDelete: @@ -96,9 +63,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { iMsg.gcRecord = ddMsg.gcRecord var res Msg = &iMsg - for _, child := range childs { - child.Finish() - } return []*Msg{&res} } diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index dcb6216ae..a47facef0 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -6,8 +6,6 @@ import ( "log" "sync" - "github.com/opentracing/opentracing-go" - oplog "github.com/opentracing/opentracing-go/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" ) @@ -42,30 +40,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - var childs []opentracing.Span - tracer := opentracing.GlobalTracer() - if tracer != nil && iMsg != nil { - for _, msg := range iMsg.insertMessages { - if msg.Type() == commonpb.MsgType_kInsert || msg.Type() == commonpb.MsgType_kSearch { - var child opentracing.Span - ctx := msg.GetMsgContext() - if parent := opentracing.SpanFromContext(ctx); parent != nil { - child = tracer.StartSpan("pass insert node", - opentracing.FollowsFrom(parent.Context())) - } else { - child = tracer.StartSpan("pass insert node") - } - child.SetTag("hash keys", msg.HashKeys()) - child.SetTag("start time", msg.BeginTs()) - child.SetTag("end time", msg.EndTs()) - msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child)) - childs = append(childs, child) - } - } - } - insertData := InsertData{ - insertContext: make(map[int64]context.Context), insertIDs: make(map[int64][]int64), insertTimestamps: make(map[int64][]uint64), insertRecords: make(map[int64][]*commonpb.Blob), @@ -74,7 +49,6 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { // 1. hash insertMessages to insertData for _, task := range iMsg.insertMessages { - insertData.insertContext[task.SegmentID] = task.GetMsgContext() insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...) insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...) insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...) @@ -108,7 +82,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { wg := sync.WaitGroup{} for segmentID := range insertData.insertRecords { wg.Add(1) - go iNode.insert(insertData.insertContext[segmentID], &insertData, segmentID, &wg) + go iNode.insert(&insertData, segmentID, &wg) } wg.Wait() @@ -116,21 +90,15 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { gcRecord: iMsg.gcRecord, timeRange: iMsg.timeRange, } - for _, child := range childs { - child.Finish() - } return []*Msg{&res} } -func (iNode *insertNode) insert(ctx context.Context, insertData *InsertData, segmentID int64, wg *sync.WaitGroup) { - span, _ := opentracing.StartSpanFromContext(ctx, "insert node insert function") - defer span.Finish() +func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) { var targetSegment, err = iNode.replica.getSegmentByID(segmentID) if err != nil { log.Println("cannot find segment:", segmentID) // TODO: add error handling wg.Done() - span.LogFields(oplog.Error(err)) return } @@ -144,7 +112,6 @@ func (iNode *insertNode) insert(ctx context.Context, insertData *InsertData, seg log.Println(err) // TODO: add error handling wg.Done() - span.LogFields(oplog.Error(err)) return } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 9f4629c1b..2be99ad95 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -20,9 +20,6 @@ import ( "log" "sync/atomic" - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -62,7 +59,6 @@ type QueryNode struct { statsService *statsService //opentracing - tracer opentracing.Tracer closer io.Closer // clients @@ -85,20 +81,6 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { statsService: nil, } - var err error - cfg := &config.Configuration{ - ServiceName: "query_node", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - node.tracer, node.closer, err = cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(node.tracer) - segmentsMap := make(map[int64]*Segment) collections := make([]*Collection, 0) @@ -126,20 +108,6 @@ func NewQueryNodeWithoutID(ctx context.Context) *QueryNode { statsService: nil, } - var err error - cfg := &config.Configuration{ - ServiceName: "query_node", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - } - node.tracer, node.closer, err = cfg.NewTracer() - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(node.tracer) - segmentsMap := make(map[int64]*Segment) collections := make([]*Collection, 0) diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index c44e9406d..b663d9e90 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -9,9 +9,6 @@ import ( "strconv" "sync" - "github.com/opentracing/opentracing-go" - oplog "github.com/opentracing/opentracing-go/log" - "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -145,19 +142,14 @@ func (ss *searchService) receiveSearchMsg() { searchMsg = append(searchMsg, msgPack.Msgs[i]) } for _, msg := range searchMsg { - span, ctx := opentracing.StartSpanFromContext(msg.GetMsgContext(), "receive search msg") - msg.SetMsgContext(ctx) err := ss.search(msg) if err != nil { log.Println(err) - span.LogFields(oplog.Error(err)) err2 := ss.publishFailedSearchResult(msg, err.Error()) if err2 != nil { - span.LogFields(oplog.Error(err2)) log.Println("publish FailedSearchResult failed, error message: ", err2) } } - span.Finish() } log.Println("ReceiveSearchMsg, do search done, num of searchMsg = ", len(searchMsg)) } @@ -219,12 +211,8 @@ func (ss *searchService) doUnsolvedMsgSearch() { // TODO:: cache map[dsl]plan // TODO: reBatched search requests func (ss *searchService) search(msg msgstream.TsMsg) error { - span, ctx := opentracing.StartSpanFromContext(msg.GetMsgContext(), "do search") - defer span.Finish() - msg.SetMsgContext(ctx) searchMsg, ok := msg.(*msgstream.SearchMsg) if !ok { - span.LogFields(oplog.Error(errors.New("invalid request type = " + string(msg.Type())))) return errors.New("invalid request type = " + string(msg.Type())) } @@ -233,25 +221,21 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { query := milvuspb.SearchRequest{} err := proto.Unmarshal(queryBlob, &query) if err != nil { - span.LogFields(oplog.Error(err)) return errors.New("unmarshal query failed") } collectionID := searchMsg.CollectionID collection, err := ss.replica.getCollectionByID(collectionID) if err != nil { - span.LogFields(oplog.Error(err)) return err } dsl := query.Dsl plan, err := createPlan(*collection, dsl) if err != nil { - span.LogFields(oplog.Error(err)) return err } placeHolderGroupBlob := query.PlaceholderGroup placeholderGroup, err := parserPlaceholderGroup(plan, placeHolderGroupBlob) if err != nil { - span.LogFields(oplog.Error(err)) return err } placeholderGroups := make([]*PlaceholderGroup, 0) @@ -290,7 +274,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { searchResult, err := segment.segmentSearch(plan, placeholderGroups, []Timestamp{searchTimestamp}) if err != nil { - span.LogFields(oplog.Error(err)) return err } searchResults = append(searchResults, searchResult) @@ -306,7 +289,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { for i := 0; i < int(nq); i++ { bs, err := proto.Marshal(hit) if err != nil { - span.LogFields(oplog.Error(err)) return err } nilHits[i] = bs @@ -329,7 +311,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { } err = ss.publishSearchResult(searchResultMsg) if err != nil { - span.LogFields(oplog.Error(err)) return err } return nil @@ -340,22 +321,18 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { numSegment := int64(len(searchResults)) err2 := reduceSearchResults(searchResults, numSegment, inReduced) if err2 != nil { - span.LogFields(oplog.Error(err2)) return err2 } err = fillTargetEntry(plan, searchResults, matchedSegments, inReduced) if err != nil { - span.LogFields(oplog.Error(err)) return err } marshaledHits, err := reorganizeQueryResults(plan, placeholderGroups, searchResults, numSegment, inReduced) if err != nil { - span.LogFields(oplog.Error(err)) return err } hitsBlob, err := marshaledHits.getHitsBlob() if err != nil { - span.LogFields(oplog.Error(err)) return err } @@ -407,7 +384,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { //} err = ss.publishSearchResult(searchResultMsg) if err != nil { - span.LogFields(oplog.Error(err)) return err } } diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index cb55a836d..f80b92e6a 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -24,7 +24,7 @@ type MasterServiceInterface interface { type DataServiceInterface interface { GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) - GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) + GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) } type QueryNodeInterface interface { @@ -188,10 +188,17 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com DbID: req.DbID, CollectionID: req.CollectionID, } - dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) + resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) + if resp == nil { + err = errors.New("get insert channels resp is nil") + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + err = errors.New(resp.Status.Reason) + } if err != nil { return fn(err), err } + dmChannels := resp.Values // get partitionIDs showPartitionRequest := &milvuspb.ShowPartitionRequest{ @@ -428,10 +435,16 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm CollectionID: collectionID, } - dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) - if err != nil { + resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) + if resp == nil { + err = errors.New("get insert channels resp is nil") + return fn(err), err + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + err = errors.New(resp.Status.Reason) return fn(err), err } + dmChannels := resp.Values for _, partitionID := range partitionIDs { loadSegmentRequest := &querypb.LoadSegmentRequest{ CollectionID: collectionID, diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go index a6e36bbee..3c7ca3616 100644 --- a/internal/queryservice/queryservice_test.go +++ b/internal/queryservice/queryservice_test.go @@ -155,8 +155,13 @@ func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datap return ret, nil } -func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) { - return []string{"test-insert"}, nil +func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { + return &internalpb2.StringList{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + Values: []string{"test-insert"}, + }, nil } func TestQueryService_Init(t *testing.T) { diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index e79f350b8..0c0730f7f 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -1,12 +1,8 @@ package flowgraph import ( - "fmt" "log" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - - "github.com/opentracing/opentracing-go" "github.com/zilliztech/milvus-distributed/internal/msgstream" ) @@ -34,28 +30,6 @@ func (inNode *InputNode) Operate([]*Msg) []*Msg { msgPack := (*inNode.inStream).Consume() - var childs []opentracing.Span - tracer := opentracing.GlobalTracer() - if tracer != nil && msgPack != nil { - for _, msg := range msgPack.Msgs { - if msg.Type() == commonpb.MsgType_kInsert { - var child opentracing.Span - ctx := msg.GetMsgContext() - if parent := opentracing.SpanFromContext(ctx); parent != nil { - child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()), - opentracing.FollowsFrom(parent.Context())) - } else { - child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs())) - } - child.SetTag("hash keys", msg.HashKeys()) - child.SetTag("start time", msg.BeginTs()) - child.SetTag("end time", msg.EndTs()) - msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child)) - childs = append(childs, child) - } - } - } - // TODO: add status if msgPack == nil { log.Println("null msg pack") @@ -69,10 +43,6 @@ func (inNode *InputNode) Operate([]*Msg) []*Msg { startPositions: msgPack.StartPositions, } - for _, child := range childs { - child.Finish() - } - return []*Msg{&msgStreamMsg} } diff --git a/internal/util/typeutil/interface.go b/internal/util/typeutil/interface.go index 2116a9e09..dc9c3f52b 100644 --- a/internal/util/typeutil/interface.go +++ b/internal/util/typeutil/interface.go @@ -4,6 +4,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) @@ -15,8 +16,8 @@ type Service interface { type Component interface { GetComponentStates() (*internalpb2.ComponentStates, error) - GetTimeTickChannel() (string, error) - GetStatisticsChannel() (string, error) + GetTimeTickChannel() (*milvuspb.StringResponse, error) + GetStatisticsChannel() (*milvuspb.StringResponse, error) } type IndexNodeInterface interface { -- GitLab