提交 c2ca2c27 编写于 作者: G godchen 提交者: yefu.chen

Fix response check error

Signed-off-by: Ngodchen <qingxiang.chen@zilliz.com>
上级 5658f779
......@@ -9,5 +9,5 @@ dataservice:
defaultSizePerRecord: 1024
# old name: segmentExpireDuration: 2000
IDAssignExpiration: 2000 # ms
insertChannelNum: 16
insertChannelNum: 2
dataNodeNum: 1
\ No newline at end of file
......@@ -13,10 +13,12 @@
```go
type IndexService interface {
Service
RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error)
BuildIndex(req BuildIndexRequest) (BuildIndexResponse, error)
GetIndexStates(req IndexStatesRequest) (IndexStatesResponse, error)
GetIndexFilePaths(req IndexFilePathRequest) (IndexFilePathsResponse, error)
RegisterNode(RegisterNodeRequest) (RegisterNodeResponse, error)
BuildIndex(BuildIndexRequest) (BuildIndexResponse, error)
GetIndexStates(IndexStatesRequest) (IndexStatesResponse, error)
GetIndexFilePaths(IndexFilePathRequest) (IndexFilePathsResponse, error)
GetTimeTickChannel() (StringResponse, error)
GetStatisticsChannel() (StringResponse, error)
NotifyTaskState(TaskStateNotification) error
}
......
......@@ -13,7 +13,7 @@ type ProxyService interface {
Service
RegisterLink() (RegisterLinkResponse, error)
RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error)
InvalidateCollectionMetaCache(req InvalidateCollMetaCacheRequest) error
InvalidateCollectionMetaCache(req InvalidateCollMetaCacheRequest) (Status, error)
}
```
......@@ -72,8 +72,10 @@ type ProxyNode interface {
Service
//SetTimeTickChannel(channelName string) error
//SetStatsChannel(channelName string) error
InvalidateCollectionMetaCache(request InvalidateCollMetaCacheRequest) (Status, error)
CreateCollection(req CreateCollectionRequest) error
CreateCollection(req CreateCollectionRequest) error
DropCollection(req DropCollectionRequest) error
HasCollection(req HasCollectionRequest) (bool, error)
LoadCollection(req LoadCollectionRequest) error
......@@ -92,12 +94,16 @@ type ProxyNode interface {
CreateIndex(req CreateIndexRequest) error
DescribeIndex(DescribeIndexRequest) (DescribeIndexResponse, error)
GetIndexState(IndexStateRequest) (IndexStateResponse, error)
Insert(req InsertRequest) (InsertResponse, error)
Search(req SearchRequest) (SearchResults, error)
Flush(req FlushRequest) error
GetPersistentSegmentInfo(req PersistentSegmentInfoRequest) (PersistentSegmentInfoResponse, error)
GetDdChannel(Empty) (StringResponse, error)
GetQuerySegmentInfo(QuerySegmentInfoRequest) (QuerySegmentInfoResponse, error)
GetPersistentSegmentInfo(PersistentSegmentInfoRequest) (PersistentSegmentInfoResponse, error)
}
```
......
......@@ -23,8 +23,9 @@ type DataService interface {
GetSegmentInfo(req SegmentInfoRequest) (SegmentInfoResponse, error)
GetInsertBinlogPaths(req InsertBinlogPathRequest) (InsertBinlogPathsResponse, error)
GetInsertChannels(req InsertChannelRequest) ([]string, error)
GetSegmentInfoChannel(req InsertChannelRequest) (StringResponse, error)
GetInsertChannels(req InsertChannelRequest) (StringList, error)
GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error)
GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
......@@ -253,16 +254,38 @@ type InsertRequest struct {
```go
type DataNode interface {
Service
GetComponentStates() (ComponentStates, error)
GetTimeTickChannel() (StringResponse, error)
GetStatisticsChannel() (StringResponse, error)
WatchDmChannels(req WatchDmChannelRequest) error
WatchDmChannels(WatchDmChannelRequest) error
FlushSegments(FlushSegRequest) (Status, error)
//WatchDdChannel(channelName string) error
//SetTimeTickChannel(channelName string) error
//SetStatisticsChannel(channelName string) error
FlushSegments(req FlushSegRequest) error
SetMasterServiceInterface(MasterServiceInterface) error
SetDataServiceInterface(DataServiceInterface) error
}
```
```go
type DataServiceInterface interface {
GetComponentStates() (ComponentStates, error)
RegisterNode(RegisterNodeRequest) (RegisterNodeResponse, error)
}
```
```go
type MasterServiceInterface interface {
GetComponentStates() (ComponentStates, error)
AllocID(IDRequest) (IDResponse, error)
ShowCollections(ShowCollectionRequest) (ShowCollectionResponse, error)
DescribeCollection(DescribeCollectionRequest) (DescribeCollectionResponse, error)
}
```
* *WatchDmChannels*
......
......@@ -7,10 +7,6 @@ import (
"log"
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
......@@ -33,8 +29,8 @@ type (
// Component
GetComponentStates() (*internalpb2.ComponentStates, error)
GetTimeTickChannel() (string, error) // This function has no effect
GetStatisticsChannel() (string, error) // This function has no effect
GetTimeTickChannel() (*milvuspb.StringResponse, error) // This function has no effect
GetStatisticsChannel() (*milvuspb.StringResponse, error) // This function has no effect
WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
......@@ -72,7 +68,6 @@ type (
flushChan chan *flushMsg
replica collectionReplica
tracer opentracing.Tracer
closer io.Closer
}
)
......@@ -177,25 +172,6 @@ func (node *DataNode) Init() error {
node.replica = replica
// --- Opentracing ---
cfg := &config.Configuration{
ServiceName: "data_node",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
},
}
tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
if err != nil {
return errors.Errorf("ERROR: cannot init Jaeger: %v\n", err)
}
node.tracer = tracer
node.closer = closer
opentracing.SetGlobalTracer(node.tracer)
return nil
}
......
package datanode
import (
"context"
"log"
"math"
"github.com/opentracing/opentracing-go"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
......@@ -35,28 +32,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
// TODO: add error handling
}
var childs []opentracing.Span
tracer := opentracing.GlobalTracer()
if tracer != nil {
for _, msg := range msgStreamMsg.TsMessages() {
if msg.Type() == commonpb.MsgType_kInsert {
var child opentracing.Span
ctx := msg.GetMsgContext()
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("pass filter node",
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan("pass filter node")
}
child.SetTag("hash keys", msg.HashKeys())
child.SetTag("start time", msg.BeginTs())
child.SetTag("end time", msg.EndTs())
msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
childs = append(childs, child)
}
}
}
ddMsg, ok := (*in[1]).(*ddMsg)
if !ok {
log.Println("type assertion failed for ddMsg")
......@@ -77,20 +52,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
iMsg.flushMessages = append(iMsg.flushMessages, ddMsg.flushMessages...)
for key, msg := range msgStreamMsg.TsMessages() {
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_kInsert:
var ctx2 context.Context
if childs != nil {
if childs[key] != nil {
ctx2 = opentracing.ContextWithSpan(msg.GetMsgContext(), childs[key])
} else {
ctx2 = context.Background()
}
}
resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
if resMsg != nil {
resMsg.SetMsgContext(ctx2)
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
// case commonpb.MsgType_kDelete:
......@@ -103,9 +69,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
iMsg.startPositions = append(iMsg.startPositions, msgStreamMsg.StartPositions()...)
iMsg.gcRecord = ddMsg.gcRecord
var res Msg = &iMsg
for _, child := range childs {
child.Finish()
}
return []*Msg{&res}
}
......
......@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"log"
"path"
"strconv"
......@@ -12,9 +11,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
......@@ -155,23 +151,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
// iMsg is insertMsg
// 1. iMsg -> buffer
for _, msg := range iMsg.insertMessages {
ctx := msg.GetMsgContext()
var span opentracing.Span
if ctx != nil {
span, _ = opentracing.StartSpanFromContext(ctx, fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs()))
} else {
span = opentracing.StartSpan(fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs()))
}
span.SetTag("hash keys", msg.HashKeys())
span.SetTag("start time", msg.BeginTs())
span.SetTag("end time", msg.EndTs())
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
log.Println("Error: misaligned messages detected")
continue
}
currentSegID := msg.GetSegmentID()
collectionID := msg.GetCollectionID()
span.LogFields(oplog.Int("segment id", int(currentSegID)))
idata, ok := ibNode.insertBuffer.insertData[currentSegID]
if !ok {
......@@ -426,11 +411,9 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
// 1.3 store in buffer
ibNode.insertBuffer.insertData[currentSegID] = idata
span.LogFields(oplog.String("store in buffer", "store in buffer"))
// 1.4 if full
// 1.4.1 generate binlogs
span.LogFields(oplog.String("generate binlogs", "generate binlogs"))
if ibNode.insertBuffer.full(currentSegID) {
log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID))
......
......@@ -44,8 +44,8 @@ type DataService interface {
ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error)
GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
GetSegmentInfoChannel() (string, error)
GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
GetSegmentInfoChannel() (*milvuspb.StringResponse, error)
GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
GetComponentStates() (*internalpb2.ComponentStates, error)
......@@ -438,12 +438,22 @@ func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
return resp, nil
}
func (s *Server) GetTimeTickChannel() (string, error) {
return Params.TimeTickChannelName, nil
func (s *Server) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: Params.TimeTickChannelName,
}, nil
}
func (s *Server) GetStatisticsChannel() (string, error) {
return Params.StatisticsChannelName, nil
func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: Params.StatisticsChannelName,
}, nil
}
func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
......@@ -687,8 +697,13 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat
return resp, nil
}
func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
return s.insertChannels, nil
func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return &internalpb2.StringList{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Values: s.insertChannels,
}, nil
}
func (s *Server) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
......@@ -718,8 +733,13 @@ func (s *Server) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*dat
return nil, nil
}
func (s *Server) GetSegmentInfoChannel() (string, error) {
return Params.SegmentInfoChannelName, nil
func (s *Server) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: Params.SegmentInfoChannelName,
}, nil
}
func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
......
......@@ -2,9 +2,10 @@ package dataservice
import (
"context"
"errors"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
......@@ -58,26 +59,12 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
return c.grpcClient.GetComponentStates(context.Background(), &commonpb.Empty{})
}
func (c *Client) GetTimeTickChannel() (string, error) {
resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{})
}
func (c *Client) GetStatisticsChannel() (string, error) {
resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
return c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{})
}
func (c *Client) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
......@@ -104,15 +91,8 @@ func (c *Client) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat
return c.grpcClient.GetInsertBinlogPaths(context.Background(), req)
}
func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
resp, err := c.grpcClient.GetInsertChannels(context.Background(), req)
if err != nil {
return nil, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, errors.New(resp.Status.Reason)
}
return resp.Values, nil
func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return c.grpcClient.GetInsertChannels(context.Background(), req)
}
func (c *Client) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
......@@ -123,15 +103,8 @@ func (c *Client) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*dat
return c.grpcClient.GetPartitionStatistics(context.Background(), req)
}
func (c *Client) GetSegmentInfoChannel() (string, error) {
resp, err := c.grpcClient.GetSegmentInfoChannel(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
func (c *Client) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
return c.grpcClient.GetSegmentInfoChannel(context.Background(), &commonpb.Empty{})
}
func (c *Client) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
......
......@@ -105,20 +105,7 @@ func (s *Service) GetInsertBinlogPaths(ctx context.Context, request *datapb.Inse
}
func (s *Service) GetInsertChannels(ctx context.Context, request *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
resp := &internalpb2.StringList{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
channels, err := s.server.GetInsertChannels(request)
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
return resp, nil
}
resp.Values = channels
return resp, nil
return s.server.GetInsertChannels(request)
}
func (s *Service) GetCollectionStatistics(ctx context.Context, request *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
......@@ -134,53 +121,15 @@ func (s *Service) GetComponentStates(ctx context.Context, empty *commonpb.Empty)
}
func (s *Service) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
channel, err := s.server.GetTimeTickChannel()
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
return resp, nil
}
resp.Value = channel
return resp, nil
return s.server.GetTimeTickChannel()
}
func (s *Service) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
channel, err := s.server.GetStatisticsChannel()
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
return resp, nil
}
resp.Value = channel
return resp, nil
return s.server.GetStatisticsChannel()
}
func (s *Service) GetSegmentInfoChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
channel, err := s.server.GetSegmentInfoChannel()
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
return resp, nil
}
resp.Value = channel
return resp, nil
return s.server.GetSegmentInfoChannel()
}
func (s *Service) GetCount(ctx context.Context, request *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
......
......@@ -2,9 +2,10 @@ package grpcindexnodeclient
import (
"context"
"errors"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
......@@ -47,28 +48,12 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
return c.grpcClient.GetComponentStates(context.Background(), &commonpb.Empty{})
}
func (c *Client) GetTimeTickChannel() (string, error) {
resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{})
}
func (c *Client) GetStatisticsChannel() (string, error) {
resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
return c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{})
}
func (c *Client) BuildIndex(req *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
......
......@@ -142,35 +142,11 @@ func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
ret, err := s.impl.GetTimeTickChannel()
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
} else {
resp.Value = ret
}
return resp, nil
return s.impl.GetTimeTickChannel()
}
func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
ret, err := s.impl.GetStatisticsChannel()
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
} else {
resp.Value = ret
}
return resp, nil
return s.impl.GetStatisticsChannel()
}
func NewServer(ctx context.Context) (*Server, error) {
......
......@@ -5,6 +5,8 @@ import (
"log"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
......@@ -50,12 +52,20 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
return c.grpcClient.GetComponentStates(ctx, &commonpb.Empty{})
}
func (c *Client) GetTimeTickChannel() (string, error) {
return "", nil
func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}, nil
}
func (c *Client) GetStatisticsChannel() (string, error) {
return "", nil
func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}, nil
}
func (c *Client) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
......
......@@ -4,6 +4,8 @@ import (
"context"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"google.golang.org/grpc"
......@@ -39,9 +41,8 @@ func (c *Client) Stop() error {
return nil
}
func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
_, err := c.grpcClient.InvalidateCollectionMetaCache(c.ctx, request)
return err
func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return c.grpcClient.InvalidateCollectionMetaCache(c.ctx, request)
}
func NewClient(ctx context.Context, address string) *Client {
......
......@@ -4,6 +4,8 @@ import (
"context"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
......@@ -46,25 +48,25 @@ func (c *Client) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.Re
return c.proxyServiceClient.RegisterNode(c.ctx, request)
}
func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
_, err := c.proxyServiceClient.InvalidateCollectionMetaCache(c.ctx, request)
return err
return nil, err
}
func (c *Client) GetTimeTickChannel() (string, error) {
response, err := c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{})
if err != nil {
return "", err
}
return response.Value, nil
func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{})
}
func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
return c.proxyServiceClient.GetComponentStates(c.ctx, &commonpb.Empty{})
}
func (c *Client) GetStatisticsChannel() (string, error) {
return "", nil
func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}, nil
}
func NewClient(address string) *Client {
......
......@@ -133,23 +133,11 @@ func (s *Server) RegisterNode(ctx context.Context, request *proxypb.RegisterNode
}
func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return &commonpb.Status{}, s.impl.InvalidateCollectionMetaCache(request)
return s.impl.InvalidateCollectionMetaCache(request)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
channel, err := s.impl.GetTimeTickChannel()
if err != nil {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
Value: "",
}, nil
}
return &milvuspb.StringResponse{
Value: channel,
}, nil
return s.impl.GetTimeTickChannel()
}
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
......
......@@ -5,6 +5,8 @@ import (
"log"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
......@@ -209,10 +211,18 @@ func (i *NodeImpl) GetComponentStates() (*internalpb2.ComponentStates, error) {
return ret, nil
}
func (i *NodeImpl) GetTimeTickChannel() (string, error) {
return "", nil
func (i *NodeImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}, nil
}
func (i *NodeImpl) GetStatisticsChannel() (string, error) {
return "", nil
func (i *NodeImpl) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}, nil
}
......@@ -34,13 +34,13 @@ import (
// masterpb2 -> masterpb (master_service)
type ProxyServiceInterface interface {
GetTimeTickChannel() (string, error)
InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error
GetTimeTickChannel() (*milvuspb.StringResponse, error)
InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
type DataServiceInterface interface {
GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
GetSegmentInfoChannel() (string, error)
GetSegmentInfoChannel() (*milvuspb.StringResponse, error)
}
type IndexServiceInterface interface {
......@@ -608,11 +608,11 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error {
if err != nil {
return err
}
Params.ProxyTimeTickChannel = rsp
Params.ProxyTimeTickChannel = rsp.Value
log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel)
c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error {
err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{
status, _ := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO,MsgType
MsgID: 0,
......@@ -622,8 +622,11 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error {
DbName: dbName,
CollectionName: collectionName,
})
if err != nil {
return err
if status == nil {
return errors.New("invalidate collection metacache resp is nil")
}
if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(status.Reason)
}
return nil
}
......@@ -635,7 +638,7 @@ func (c *Core) SetDataService(s DataServiceInterface) error {
if err != nil {
return err
}
Params.DataServiceSegmentChannel = rsp
Params.DataServiceSegmentChannel = rsp.Value
log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel)
c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
......
......@@ -28,14 +28,21 @@ type proxyMock struct {
mutex sync.Mutex
}
func (p *proxyMock) GetTimeTickChannel() (string, error) {
return fmt.Sprintf("proxy-time-tick-%d", p.randVal), nil
func (p *proxyMock) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: fmt.Sprintf("proxy-time-tick-%d", p.randVal),
}, nil
}
func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.collArray = append(p.collArray, request.CollectionName)
return nil
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
}
func (p *proxyMock) GetCollArray() []string {
p.mutex.Lock()
......@@ -72,8 +79,13 @@ func (d *dataMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*d
return rst, nil
}
func (d *dataMock) GetSegmentInfoChannel() (string, error) {
return fmt.Sprintf("segment-info-channel-%d", d.randVal), nil
func (d *dataMock) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: fmt.Sprintf("segment-info-channel-%d", d.randVal),
}, nil
}
type indexMock struct {
......
......@@ -2,9 +2,9 @@ package msgstream
import (
"context"
"errors"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
......@@ -14,8 +14,6 @@ type MsgType = commonpb.MsgType
type MarshalType = interface{}
type TsMsg interface {
GetMsgContext() context.Context
SetMsgContext(context.Context)
BeginTs() Timestamp
EndTs() Timestamp
Type() MsgType
......@@ -59,7 +57,7 @@ func ConvertToByteArray(input interface{}) ([]byte, error) {
case []byte:
return output, nil
default:
return nil, errors.New("Cannot convert interface{} to []byte")
return nil, errors.New("cannot convert interface{} to []byte")
}
}
......@@ -73,14 +71,6 @@ func (it *InsertMsg) Type() MsgType {
return it.Base.MsgType
}
func (it *InsertMsg) GetMsgContext() context.Context {
return it.MsgCtx
}
func (it *InsertMsg) SetMsgContext(ctx context.Context) {
it.MsgCtx = ctx
}
func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error) {
insertMsg := input.(*InsertMsg)
insertRequest := &insertMsg.InsertRequest
......@@ -129,13 +119,6 @@ func (fl *FlushCompletedMsg) Type() MsgType {
return fl.Base.MsgType
}
func (fl *FlushCompletedMsg) GetMsgContext() context.Context {
return fl.MsgCtx
}
func (fl *FlushCompletedMsg) SetMsgContext(ctx context.Context) {
fl.MsgCtx = ctx
}
func (fl *FlushCompletedMsg) Marshal(input TsMsg) (MarshalType, error) {
flushCompletedMsgTask := input.(*FlushCompletedMsg)
flushCompletedMsg := &flushCompletedMsgTask.SegmentFlushCompletedMsg
......@@ -174,13 +157,6 @@ func (fl *FlushMsg) Type() MsgType {
return fl.Base.MsgType
}
func (fl *FlushMsg) GetMsgContext() context.Context {
return fl.MsgCtx
}
func (fl *FlushMsg) SetMsgContext(ctx context.Context) {
fl.MsgCtx = ctx
}
func (fl *FlushMsg) Marshal(input TsMsg) (MarshalType, error) {
flushMsgTask := input.(*FlushMsg)
flushMsg := &flushMsgTask.FlushMsg
......@@ -218,14 +194,6 @@ func (dt *DeleteMsg) Type() MsgType {
return dt.Base.MsgType
}
func (dt *DeleteMsg) GetMsgContext() context.Context {
return dt.MsgCtx
}
func (dt *DeleteMsg) SetMsgContext(ctx context.Context) {
dt.MsgCtx = ctx
}
func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error) {
deleteMsg := input.(*DeleteMsg)
deleteRequest := &deleteMsg.DeleteRequest
......@@ -275,14 +243,6 @@ func (st *SearchMsg) Type() MsgType {
return st.Base.MsgType
}
func (st *SearchMsg) GetMsgContext() context.Context {
return st.MsgCtx
}
func (st *SearchMsg) SetMsgContext(ctx context.Context) {
st.MsgCtx = ctx
}
func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) {
searchTask := input.(*SearchMsg)
searchRequest := &searchTask.SearchRequest
......@@ -320,14 +280,6 @@ func (srt *SearchResultMsg) Type() MsgType {
return srt.Base.MsgType
}
func (srt *SearchResultMsg) GetMsgContext() context.Context {
return srt.MsgCtx
}
func (srt *SearchResultMsg) SetMsgContext(ctx context.Context) {
srt.MsgCtx = ctx
}
func (srt *SearchResultMsg) Marshal(input TsMsg) (MarshalType, error) {
searchResultTask := input.(*SearchResultMsg)
searchResultRequest := &searchResultTask.SearchResults
......@@ -365,14 +317,6 @@ func (tst *TimeTickMsg) Type() MsgType {
return tst.Base.MsgType
}
func (tst *TimeTickMsg) GetMsgContext() context.Context {
return tst.MsgCtx
}
func (tst *TimeTickMsg) SetMsgContext(ctx context.Context) {
tst.MsgCtx = ctx
}
func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error) {
timeTickTask := input.(*TimeTickMsg)
timeTick := &timeTickTask.TimeTickMsg
......@@ -411,14 +355,6 @@ func (qs *QueryNodeStatsMsg) Type() MsgType {
return qs.Base.MsgType
}
func (qs *QueryNodeStatsMsg) GetMsgContext() context.Context {
return qs.MsgCtx
}
func (qs *QueryNodeStatsMsg) SetMsgContext(ctx context.Context) {
qs.MsgCtx = ctx
}
func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) (MarshalType, error) {
queryNodeSegStatsTask := input.(*QueryNodeStatsMsg)
queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeStats
......@@ -454,14 +390,6 @@ func (ss *SegmentStatisticsMsg) Type() MsgType {
return ss.Base.MsgType
}
func (ss *SegmentStatisticsMsg) GetMsgContext() context.Context {
return ss.MsgCtx
}
func (ss *SegmentStatisticsMsg) SetMsgContext(ctx context.Context) {
ss.MsgCtx = ctx
}
func (ss *SegmentStatisticsMsg) Marshal(input TsMsg) (MarshalType, error) {
segStatsTask := input.(*SegmentStatisticsMsg)
segStats := &segStatsTask.SegmentStatistics
......@@ -507,14 +435,6 @@ func (cc *CreateCollectionMsg) Type() MsgType {
return cc.Base.MsgType
}
func (cc *CreateCollectionMsg) GetMsgContext() context.Context {
return cc.MsgCtx
}
func (cc *CreateCollectionMsg) SetMsgContext(ctx context.Context) {
cc.MsgCtx = ctx
}
func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
createCollectionMsg := input.(*CreateCollectionMsg)
createCollectionRequest := &createCollectionMsg.CreateCollectionRequest
......@@ -551,13 +471,6 @@ type DropCollectionMsg struct {
func (dc *DropCollectionMsg) Type() MsgType {
return dc.Base.MsgType
}
func (dc *DropCollectionMsg) GetMsgContext() context.Context {
return dc.MsgCtx
}
func (dc *DropCollectionMsg) SetMsgContext(ctx context.Context) {
dc.MsgCtx = ctx
}
func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
dropCollectionMsg := input.(*DropCollectionMsg)
......@@ -592,14 +505,6 @@ type CreatePartitionMsg struct {
internalpb2.CreatePartitionRequest
}
func (cc *CreatePartitionMsg) GetMsgContext() context.Context {
return cc.MsgCtx
}
func (cc *CreatePartitionMsg) SetMsgContext(ctx context.Context) {
cc.MsgCtx = ctx
}
func (cc *CreatePartitionMsg) Type() MsgType {
return cc.Base.MsgType
}
......@@ -637,14 +542,6 @@ type DropPartitionMsg struct {
internalpb2.DropPartitionRequest
}
func (dc *DropPartitionMsg) GetMsgContext() context.Context {
return dc.MsgCtx
}
func (dc *DropPartitionMsg) SetMsgContext(ctx context.Context) {
dc.MsgCtx = ctx
}
func (dc *DropPartitionMsg) Type() MsgType {
return dc.Base.MsgType
}
......@@ -729,14 +626,6 @@ func (sim *SegmentInfoMsg) Type() MsgType {
return sim.Base.MsgType
}
func (sim *SegmentInfoMsg) GetMsgContext() context.Context {
return sim.MsgCtx
}
func (sim *SegmentInfoMsg) SetMsgContext(ctx context.Context) {
sim.MsgCtx = ctx
}
func (sim *SegmentInfoMsg) Marshal(input TsMsg) (MarshalType, error) {
segInfoMsg := input.(*SegmentInfoMsg)
mb, err := proto.Marshal(&segInfoMsg.SegmentMsg)
......
......@@ -12,10 +12,6 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
......@@ -247,49 +243,12 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
msg := &pulsar.ProducerMessage{Payload: m}
var child opentracing.Span
if v.Msgs[i].Type() == commonpb.MsgType_kInsert ||
v.Msgs[i].Type() == commonpb.MsgType_kSearch ||
v.Msgs[i].Type() == commonpb.MsgType_kSearchResult {
tracer := opentracing.GlobalTracer()
ctx := v.Msgs[i].GetMsgContext()
if ctx == nil {
ctx = context.Background()
}
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("start send pulsar msg",
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan("start send pulsar msg")
}
child.SetTag("hash keys", v.Msgs[i].HashKeys())
child.SetTag("start time", v.Msgs[i].BeginTs())
child.SetTag("end time", v.Msgs[i].EndTs())
child.SetTag("msg type", v.Msgs[i].Type())
msg.Properties = make(map[string]string)
err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
if err != nil {
child.LogFields(oplog.Error(err))
child.Finish()
return err
}
child.LogFields(oplog.String("inject success", "inject success"))
}
if _, err := ms.producers[k].Send(
context.Background(),
msg,
); err != nil {
if child != nil {
child.LogFields(oplog.Error(err))
child.Finish()
}
return err
}
if child != nil {
child.Finish()
}
}
}
return nil
......@@ -309,49 +268,14 @@ func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error {
}
msg := &pulsar.ProducerMessage{Payload: m}
var child opentracing.Span
if v.Type() == commonpb.MsgType_kInsert ||
v.Type() == commonpb.MsgType_kSearch ||
v.Type() == commonpb.MsgType_kSearchResult {
tracer := opentracing.GlobalTracer()
ctx := v.GetMsgContext()
if ctx == nil {
ctx = context.Background()
}
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("start send pulsar msg",
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan("start send pulsar msg, start time: %d")
}
child.SetTag("hash keys", v.HashKeys())
child.SetTag("start time", v.BeginTs())
child.SetTag("end time", v.EndTs())
child.SetTag("msg type", v.Type())
msg.Properties = make(map[string]string)
err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
if err != nil {
child.LogFields(oplog.Error(err))
child.Finish()
return err
}
child.LogFields(oplog.String("inject success", "inject success"))
}
for i := 0; i < producerLen; i++ {
if _, err := ms.producers[i].Send(
context.Background(),
msg,
); err != nil {
if child != nil {
child.LogFields(oplog.Error(err))
child.Finish()
}
return err
}
}
if child != nil {
child.Finish()
}
}
return nil
}
......@@ -411,23 +335,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
continue
}
tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType)
if tsMsg.Type() == commonpb.MsgType_kSearch ||
tsMsg.Type() == commonpb.MsgType_kSearchResult {
tracer := opentracing.GlobalTracer()
spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
if err != nil {
log.Println("extract message err")
log.Println(err.Error())
}
span := opentracing.StartSpan("pulsar msg received",
ext.RPCServerOption(spanContext))
span.SetTag("msg type", tsMsg.Type())
span.SetTag("hash keys", tsMsg.HashKeys())
span.SetTag("start time", tsMsg.BeginTs())
span.SetTag("end time", tsMsg.EndTs())
tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span))
span.Finish()
}
if err != nil {
log.Printf("Failed to unmarshal tsMsg, error = %v", err)
continue
......@@ -521,8 +428,6 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
ms.unsolvedBuf = make(map[Consumer][]TsMsg)
isChannelReady := make(map[Consumer]bool)
eofMsgTimeStamp := make(map[Consumer]Timestamp)
spans := make(map[Timestamp]opentracing.Span)
ctxs := make(map[Timestamp]context.Context)
for _, consumer := range ms.consumers {
ms.unsolvedBuf[consumer] = make([]TsMsg, 0)
}
......@@ -558,22 +463,8 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
timeTickMsg = v
continue
}
var ctx context.Context
var span opentracing.Span
if v.Type() == commonpb.MsgType_kInsert {
if _, ok := spans[v.BeginTs()]; !ok {
span, ctx = opentracing.StartSpanFromContext(v.GetMsgContext(), "after find time tick")
ctxs[v.BeginTs()] = ctx
spans[v.BeginTs()] = span
}
}
if v.EndTs() <= timeStamp {
timeTickBuf = append(timeTickBuf, v)
if v.Type() == commonpb.MsgType_kInsert {
v.SetMsgContext(ctxs[v.BeginTs()])
spans[v.BeginTs()].Finish()
delete(spans, v.BeginTs())
}
} else {
tempBuffer = append(tempBuffer, v)
}
......@@ -643,23 +534,6 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer,
MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()),
})
if tsMsg.Type() == commonpb.MsgType_kInsert {
tracer := opentracing.GlobalTracer()
spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
if err != nil {
log.Println("extract message err")
log.Println(err.Error())
}
span := opentracing.StartSpan("pulsar msg received",
ext.RPCServerOption(spanContext))
span.SetTag("hash keys", tsMsg.HashKeys())
span.SetTag("start time", tsMsg.BeginTs())
span.SetTag("end time", tsMsg.EndTs())
span.SetTag("msg type", tsMsg.Type())
tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span))
span.Finish()
}
mu.Lock()
ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg)
mu.Unlock()
......
......@@ -53,9 +53,6 @@ func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
}
insertMsg := &msgstream.InsertMsg{
BaseMsg: BaseMsg{
MsgCtx: request.GetMsgContext(),
},
InsertRequest: sliceRequest,
}
result[key].Msgs = append(result[key].Msgs, insertMsg)
......@@ -102,9 +99,6 @@ func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
}
deleteMsg := &msgstream.DeleteMsg{
BaseMsg: BaseMsg{
MsgCtx: request.GetMsgContext(),
},
DeleteRequest: sliceRequest,
}
result[key].Msgs = append(result[key].Msgs, deleteMsg)
......
......@@ -2,21 +2,18 @@ package proxynode
import (
"context"
"errors"
"log"
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
const (
......
......@@ -49,7 +49,7 @@ type QueryServiceClient interface {
type DataServiceClient interface {
AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error)
GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
Flush(req *datapb.FlushRequest) (*commonpb.Status, error)
GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
......@@ -58,7 +58,7 @@ type DataServiceClient interface {
}
type ProxyServiceClient interface {
GetTimeTickChannel() (string, error)
GetTimeTickChannel() (*milvuspb.StringResponse, error)
RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
GetComponentStates() (*internalpb2.ComponentStates, error)
}
......
......@@ -19,9 +19,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
......@@ -57,7 +54,6 @@ type NodeImpl struct {
manipulationMsgStream msgstream.MsgStream
queryMsgStream msgstream.MsgStream
tracer opentracing.Tracer
closer io.Closer
// Add callback functions at different stages
......@@ -106,7 +102,6 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string)
}
func (node *NodeImpl) Init() error {
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024)
// todo wait for proxyservice state changed to Healthy
......@@ -136,6 +131,8 @@ func (node *NodeImpl) Init() error {
return err
}
factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024)
// wait for dataservice state changed to Healthy
if node.dataServiceClient != nil {
err = node.waitForServiceReady(node.dataServiceClient, "DataService")
......@@ -182,19 +179,6 @@ func (node *NodeImpl) Init() error {
// return err
//}
cfg := &config.Configuration{
ServiceName: "proxynode",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
node.tracer, node.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(node.tracer)
node.queryMsgStream, _ = factory.NewMsgStream(node.ctx)
node.queryMsgStream.AsProducer(Params.SearchChannelNames)
log.Println("create query message stream ...")
......
......@@ -214,7 +214,6 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
insertMsg := &msgstream.InsertMsg{
InsertRequest: sliceRequest,
}
insertMsg.SetMsgContext(request.GetMsgContext())
if together { // all rows with same hash value are accumulated to only one message
if len(result[key].Msgs) <= 0 {
result[key].Msgs = append(result[key].Msgs, insertMsg)
......
package proxynode
import (
"context"
"errors"
"log"
"math"
......@@ -146,7 +147,6 @@ func (it *InsertTask) Execute() error {
EndTs: it.EndTs(),
Msgs: make([]msgstream.TsMsg, 1),
}
tsMsg.SetMsgContext(it.Ctx())
it.result = &milvuspb.InsertResponse{
Status: &commonpb.Status{
......@@ -160,7 +160,7 @@ func (it *InsertTask) Execute() error {
stream, err := globalInsertChannelsMap.getInsertMsgStream(collID)
if err != nil {
collectionInsertChannels, err := it.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
resp, _ := it.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kInsert, // todo
MsgID: it.Base.MsgID, // todo
......@@ -170,10 +170,13 @@ func (it *InsertTask) Execute() error {
DbID: 0, // todo
CollectionID: collID,
})
if err != nil {
return err
if resp == nil {
return errors.New("get insert channels resp is nil")
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(resp.Status.Reason)
}
err = globalInsertChannelsMap.createInsertMsgStream(collID, collectionInsertChannels)
err = globalInsertChannelsMap.createInsertMsgStream(collID, resp.Values)
if err != nil {
return err
}
......@@ -315,7 +318,7 @@ func (cct *CreateCollectionTask) Execute() error {
if err != nil {
return err
}
collectionInsertChannels, err := cct.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
resp, _ := cct.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kInsert, // todo
MsgID: cct.Base.MsgID, // todo
......@@ -325,10 +328,13 @@ func (cct *CreateCollectionTask) Execute() error {
DbID: 0, // todo
CollectionID: collID,
})
if err != nil {
return err
if resp == nil {
return errors.New("get insert channels resp is nil")
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(resp.Status.Reason)
}
err = globalInsertChannelsMap.createInsertMsgStream(collID, collectionInsertChannels)
err = globalInsertChannelsMap.createInsertMsgStream(collID, resp.Values)
if err != nil {
return err
}
......@@ -387,19 +393,19 @@ func (dct *DropCollectionTask) PreExecute() error {
}
func (dct *DropCollectionTask) Execute() error {
var err error
collID, err := globalMetaCache.GetCollectionID(dct.CollectionName)
if err != nil {
return err
}
dct.result, err = dct.masterClient.DropCollection(dct.DropCollectionRequest)
if dct.result.ErrorCode == commonpb.ErrorCode_SUCCESS {
err = globalInsertChannelsMap.closeInsertMsgStream(collID)
if err != nil {
return err
}
dct.result, _ = dct.masterClient.DropCollection(dct.DropCollectionRequest)
if dct.result.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(dct.result.Reason)
}
return err
err = globalInsertChannelsMap.closeInsertMsgStream(collID)
if err != nil {
return err
}
return nil
}
func (dct *DropCollectionTask) PostExecute() error {
......@@ -507,7 +513,6 @@ func (st *SearchTask) Execute() error {
EndTs: st.Base.Timestamp,
Msgs: make([]msgstream.TsMsg, 1),
}
tsMsg.SetMsgContext(st.Ctx())
msgPack.Msgs[0] = tsMsg
err := st.queryMsgStream.Produce(msgPack)
log.Printf("[NodeImpl] length of searchMsg: %v", len(msgPack.Msgs))
......@@ -719,6 +724,12 @@ func (hct *HasCollectionTask) PreExecute() error {
func (hct *HasCollectionTask) Execute() error {
var err error
hct.result, err = hct.masterClient.HasCollection(hct.HasCollectionRequest)
if hct.result == nil {
return errors.New("has collection resp is nil")
}
if hct.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(hct.result.Status.Reason)
}
return err
}
......@@ -775,10 +786,13 @@ func (dct *DescribeCollectionTask) PreExecute() error {
func (dct *DescribeCollectionTask) Execute() error {
var err error
dct.result, err = dct.masterClient.DescribeCollection(dct.DescribeCollectionRequest)
if err != nil {
return err
if dct.result == nil {
return errors.New("has collection resp is nil")
}
return nil
if dct.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(dct.result.Status.Reason)
}
return err
}
func (dct *DescribeCollectionTask) PostExecute() error {
......@@ -842,9 +856,12 @@ func (g *GetCollectionsStatisticsTask) Execute() error {
CollectionID: collID,
}
result, err := g.dataServiceClient.GetCollectionStatistics(req)
if err != nil {
return err
result, _ := g.dataServiceClient.GetCollectionStatistics(req)
if result == nil {
return errors.New("get collection statistics resp is nil")
}
if result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(result.Status.Reason)
}
g.result = &milvuspb.CollectionStatsResponse{
Status: &commonpb.Status{
......@@ -865,6 +882,7 @@ type ShowCollectionsTask struct {
*milvuspb.ShowCollectionRequest
masterClient MasterClient
result *milvuspb.ShowCollectionResponse
ctx context.Context
}
func (sct *ShowCollectionsTask) OnEnqueue() error {
......@@ -906,6 +924,12 @@ func (sct *ShowCollectionsTask) PreExecute() error {
func (sct *ShowCollectionsTask) Execute() error {
var err error
sct.result, err = sct.masterClient.ShowCollections(sct.ShowCollectionRequest)
if sct.result == nil {
return errors.New("get collection statistics resp is nil")
}
if sct.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(sct.result.Status.Reason)
}
return err
}
......@@ -968,6 +992,12 @@ func (cpt *CreatePartitionTask) PreExecute() error {
func (cpt *CreatePartitionTask) Execute() (err error) {
cpt.result, err = cpt.masterClient.CreatePartition(cpt.CreatePartitionRequest)
if cpt.result == nil {
return errors.New("get collection statistics resp is nil")
}
if cpt.result.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(cpt.result.Reason)
}
return err
}
......@@ -1030,6 +1060,12 @@ func (dpt *DropPartitionTask) PreExecute() error {
func (dpt *DropPartitionTask) Execute() (err error) {
dpt.result, err = dpt.masterClient.DropPartition(dpt.DropPartitionRequest)
if dpt.result == nil {
return errors.New("get collection statistics resp is nil")
}
if dpt.result.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(dpt.result.Reason)
}
return err
}
......@@ -1091,6 +1127,12 @@ func (hpt *HasPartitionTask) PreExecute() error {
func (hpt *HasPartitionTask) Execute() (err error) {
hpt.result, err = hpt.masterClient.HasPartition(hpt.HasPartitionRequest)
if hpt.result == nil {
return errors.New("get collection statistics resp is nil")
}
if hpt.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(hpt.result.Status.Reason)
}
return err
}
......@@ -1147,10 +1189,13 @@ func (spt *ShowPartitionsTask) PreExecute() error {
func (spt *ShowPartitionsTask) Execute() error {
var err error
spt.result, err = spt.masterClient.ShowPartitions(spt.ShowPartitionRequest)
if err != nil {
return err
if spt.result == nil {
return errors.New("get collection statistics resp is nil")
}
return nil
if spt.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(spt.result.Status.Reason)
}
return err
}
func (spt *ShowPartitionsTask) PostExecute() error {
......@@ -1210,8 +1255,15 @@ func (cit *CreateIndexTask) PreExecute() error {
return nil
}
func (cit *CreateIndexTask) Execute() (err error) {
func (cit *CreateIndexTask) Execute() error {
var err error
cit.result, err = cit.masterClient.CreateIndex(cit.CreateIndexRequest)
if cit.result == nil {
return errors.New("get collection statistics resp is nil")
}
if cit.result.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(cit.result.Reason)
}
return err
}
......@@ -1275,6 +1327,12 @@ func (dit *DescribeIndexTask) PreExecute() error {
func (dit *DescribeIndexTask) Execute() error {
var err error
dit.result, err = dit.masterClient.DescribeIndex(dit.DescribeIndexRequest)
if dit.result == nil {
return errors.New("get collection statistics resp is nil")
}
if dit.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(dit.result.Status.Reason)
}
return err
}
......@@ -1495,9 +1553,9 @@ func (ft *FlushTask) Execute() error {
CollectionID: collID,
}
var status *commonpb.Status
status, err = ft.dataServiceClient.Flush(flushReq)
if err != nil {
return nil
status, _ = ft.dataServiceClient.Flush(flushReq)
if status == nil {
return errors.New("flush resp is nil")
}
if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(status.Reason)
......
......@@ -114,7 +114,7 @@ func (s *ServiceImpl) Init() error {
}
insertTickMsgStream, _ := factory.NewMsgStream(s.ctx)
insertTickMsgStream.AsProducer(channels)
log.Println("create service time tick producer channel: ", channels)
log.Println("create insert time tick producer channel: ", channels)
nodeTimeTickMsgStream, _ := factory.NewMsgStream(s.ctx)
nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel,
......@@ -175,11 +175,16 @@ func (s *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) {
s.stateCode = code
}
func (s *ServiceImpl) GetTimeTickChannel() (string, error) {
return Params.ServiceTimeTickChannel, nil
func (s *ServiceImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Value: Params.ServiceTimeTickChannel,
}, nil
}
func (s *ServiceImpl) GetStatisticsChannel() (string, error) {
func (s *ServiceImpl) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
panic("implement me")
}
......@@ -260,7 +265,7 @@ func (s *ServiceImpl) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxy
return t.response, nil
}
func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
log.Println("InvalidateCollectionMetaCache")
ctx, cancel := context.WithTimeout(s.ctx, timeoutInterval)
defer cancel()
......@@ -275,13 +280,13 @@ func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateC
err = s.sched.InvalidateCollectionMetaCacheTaskQueue.Enqueue(t)
if err != nil {
return err
return nil, err
}
err = t.WaitToFinish()
if err != nil {
return err
return nil, err
}
return nil
return nil, nil
}
package proxyservice
import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
......@@ -15,5 +16,5 @@ type ProxyService interface {
RegisterLink() (*milvuspb.RegisterLinkResponse, error)
RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
// TODO: i'm sure it's not a best way to keep consistency, fix me
InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error
InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
......@@ -8,6 +8,8 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
grpcproxynodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode/client"
"github.com/zilliztech/milvus-distributed/internal/errors"
......@@ -25,7 +27,7 @@ type NodeClient interface {
Start() error
Stop() error
InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error
InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
type GlobalNodeInfoTable struct {
......
......@@ -149,9 +149,12 @@ func (t *InvalidateCollectionMetaCacheTask) Execute() error {
return err
}
for _, c := range clients {
err = c.InvalidateCollectionMetaCache(t.request)
if err != nil {
return err
status, _ := c.InvalidateCollectionMetaCache(t.request)
if status == nil {
return errors.New("invalidate collection meta cache error")
}
if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(status.Reason)
}
}
return nil
......
package querynode
import (
"context"
"log"
"math"
"github.com/opentracing/opentracing-go"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
......@@ -34,28 +32,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
// TODO: add error handling
}
var childs []opentracing.Span
tracer := opentracing.GlobalTracer()
if tracer != nil && msgStreamMsg != nil {
for _, msg := range msgStreamMsg.TsMessages() {
if msg.Type() == commonpb.MsgType_kInsert || msg.Type() == commonpb.MsgType_kSearch {
var child opentracing.Span
ctx := msg.GetMsgContext()
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("pass filter node",
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan("pass filter node")
}
child.SetTag("hash keys", msg.HashKeys())
child.SetTag("start time", msg.BeginTs())
child.SetTag("end time", msg.EndTs())
msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
childs = append(childs, child)
}
}
}
ddMsg, ok := (*in[1]).(*ddMsg)
if !ok {
log.Println("type assertion failed for ddMsg")
......@@ -70,20 +46,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
timestampMax: msgStreamMsg.TimestampMax(),
},
}
for key, msg := range msgStreamMsg.TsMessages() {
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_kInsert:
var ctx2 context.Context
if childs != nil {
if childs[key] != nil {
ctx2 = opentracing.ContextWithSpan(msg.GetMsgContext(), childs[key])
} else {
ctx2 = context.Background()
}
}
resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
if resMsg != nil {
resMsg.SetMsgContext(ctx2)
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
// case commonpb.MsgType_kDelete:
......@@ -96,9 +63,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
iMsg.gcRecord = ddMsg.gcRecord
var res Msg = &iMsg
for _, child := range childs {
child.Finish()
}
return []*Msg{&res}
}
......
......@@ -6,8 +6,6 @@ import (
"log"
"sync"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
......@@ -42,30 +40,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
// TODO: add error handling
}
var childs []opentracing.Span
tracer := opentracing.GlobalTracer()
if tracer != nil && iMsg != nil {
for _, msg := range iMsg.insertMessages {
if msg.Type() == commonpb.MsgType_kInsert || msg.Type() == commonpb.MsgType_kSearch {
var child opentracing.Span
ctx := msg.GetMsgContext()
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan("pass insert node",
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan("pass insert node")
}
child.SetTag("hash keys", msg.HashKeys())
child.SetTag("start time", msg.BeginTs())
child.SetTag("end time", msg.EndTs())
msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
childs = append(childs, child)
}
}
}
insertData := InsertData{
insertContext: make(map[int64]context.Context),
insertIDs: make(map[int64][]int64),
insertTimestamps: make(map[int64][]uint64),
insertRecords: make(map[int64][]*commonpb.Blob),
......@@ -74,7 +49,6 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
// 1. hash insertMessages to insertData
for _, task := range iMsg.insertMessages {
insertData.insertContext[task.SegmentID] = task.GetMsgContext()
insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
......@@ -108,7 +82,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
wg := sync.WaitGroup{}
for segmentID := range insertData.insertRecords {
wg.Add(1)
go iNode.insert(insertData.insertContext[segmentID], &insertData, segmentID, &wg)
go iNode.insert(&insertData, segmentID, &wg)
}
wg.Wait()
......@@ -116,21 +90,15 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
gcRecord: iMsg.gcRecord,
timeRange: iMsg.timeRange,
}
for _, child := range childs {
child.Finish()
}
return []*Msg{&res}
}
func (iNode *insertNode) insert(ctx context.Context, insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
span, _ := opentracing.StartSpanFromContext(ctx, "insert node insert function")
defer span.Finish()
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Println("cannot find segment:", segmentID)
// TODO: add error handling
wg.Done()
span.LogFields(oplog.Error(err))
return
}
......@@ -144,7 +112,6 @@ func (iNode *insertNode) insert(ctx context.Context, insertData *InsertData, seg
log.Println(err)
// TODO: add error handling
wg.Done()
span.LogFields(oplog.Error(err))
return
}
......
......@@ -20,9 +20,6 @@ import (
"log"
"sync/atomic"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
......@@ -62,7 +59,6 @@ type QueryNode struct {
statsService *statsService
//opentracing
tracer opentracing.Tracer
closer io.Closer
// clients
......@@ -85,20 +81,6 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
statsService: nil,
}
var err error
cfg := &config.Configuration{
ServiceName: "query_node",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
node.tracer, node.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(node.tracer)
segmentsMap := make(map[int64]*Segment)
collections := make([]*Collection, 0)
......@@ -126,20 +108,6 @@ func NewQueryNodeWithoutID(ctx context.Context) *QueryNode {
statsService: nil,
}
var err error
cfg := &config.Configuration{
ServiceName: "query_node",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
node.tracer, node.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(node.tracer)
segmentsMap := make(map[int64]*Segment)
collections := make([]*Collection, 0)
......
......@@ -9,9 +9,6 @@ import (
"strconv"
"sync"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
......@@ -145,19 +142,14 @@ func (ss *searchService) receiveSearchMsg() {
searchMsg = append(searchMsg, msgPack.Msgs[i])
}
for _, msg := range searchMsg {
span, ctx := opentracing.StartSpanFromContext(msg.GetMsgContext(), "receive search msg")
msg.SetMsgContext(ctx)
err := ss.search(msg)
if err != nil {
log.Println(err)
span.LogFields(oplog.Error(err))
err2 := ss.publishFailedSearchResult(msg, err.Error())
if err2 != nil {
span.LogFields(oplog.Error(err2))
log.Println("publish FailedSearchResult failed, error message: ", err2)
}
}
span.Finish()
}
log.Println("ReceiveSearchMsg, do search done, num of searchMsg = ", len(searchMsg))
}
......@@ -219,12 +211,8 @@ func (ss *searchService) doUnsolvedMsgSearch() {
// TODO:: cache map[dsl]plan
// TODO: reBatched search requests
func (ss *searchService) search(msg msgstream.TsMsg) error {
span, ctx := opentracing.StartSpanFromContext(msg.GetMsgContext(), "do search")
defer span.Finish()
msg.SetMsgContext(ctx)
searchMsg, ok := msg.(*msgstream.SearchMsg)
if !ok {
span.LogFields(oplog.Error(errors.New("invalid request type = " + string(msg.Type()))))
return errors.New("invalid request type = " + string(msg.Type()))
}
......@@ -233,25 +221,21 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
query := milvuspb.SearchRequest{}
err := proto.Unmarshal(queryBlob, &query)
if err != nil {
span.LogFields(oplog.Error(err))
return errors.New("unmarshal query failed")
}
collectionID := searchMsg.CollectionID
collection, err := ss.replica.getCollectionByID(collectionID)
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
dsl := query.Dsl
plan, err := createPlan(*collection, dsl)
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
placeHolderGroupBlob := query.PlaceholderGroup
placeholderGroup, err := parserPlaceholderGroup(plan, placeHolderGroupBlob)
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
placeholderGroups := make([]*PlaceholderGroup, 0)
......@@ -290,7 +274,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
searchResult, err := segment.segmentSearch(plan, placeholderGroups, []Timestamp{searchTimestamp})
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
searchResults = append(searchResults, searchResult)
......@@ -306,7 +289,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
for i := 0; i < int(nq); i++ {
bs, err := proto.Marshal(hit)
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
nilHits[i] = bs
......@@ -329,7 +311,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
}
err = ss.publishSearchResult(searchResultMsg)
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
return nil
......@@ -340,22 +321,18 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
numSegment := int64(len(searchResults))
err2 := reduceSearchResults(searchResults, numSegment, inReduced)
if err2 != nil {
span.LogFields(oplog.Error(err2))
return err2
}
err = fillTargetEntry(plan, searchResults, matchedSegments, inReduced)
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
marshaledHits, err := reorganizeQueryResults(plan, placeholderGroups, searchResults, numSegment, inReduced)
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
hitsBlob, err := marshaledHits.getHitsBlob()
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
......@@ -407,7 +384,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
//}
err = ss.publishSearchResult(searchResultMsg)
if err != nil {
span.LogFields(oplog.Error(err))
return err
}
}
......
......@@ -24,7 +24,7 @@ type MasterServiceInterface interface {
type DataServiceInterface interface {
GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
}
type QueryNodeInterface interface {
......@@ -188,10 +188,17 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
DbID: req.DbID,
CollectionID: req.CollectionID,
}
dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if resp == nil {
err = errors.New("get insert channels resp is nil")
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
err = errors.New(resp.Status.Reason)
}
if err != nil {
return fn(err), err
}
dmChannels := resp.Values
// get partitionIDs
showPartitionRequest := &milvuspb.ShowPartitionRequest{
......@@ -428,10 +435,16 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
CollectionID: collectionID,
}
dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if err != nil {
resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if resp == nil {
err = errors.New("get insert channels resp is nil")
return fn(err), err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
err = errors.New(resp.Status.Reason)
return fn(err), err
}
dmChannels := resp.Values
for _, partitionID := range partitionIDs {
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
......
......@@ -155,8 +155,13 @@ func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datap
return ret, nil
}
func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
return []string{"test-insert"}, nil
func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return &internalpb2.StringList{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Values: []string{"test-insert"},
}, nil
}
func TestQueryService_Init(t *testing.T) {
......
package flowgraph
import (
"fmt"
"log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/opentracing/opentracing-go"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
......@@ -34,28 +30,6 @@ func (inNode *InputNode) Operate([]*Msg) []*Msg {
msgPack := (*inNode.inStream).Consume()
var childs []opentracing.Span
tracer := opentracing.GlobalTracer()
if tracer != nil && msgPack != nil {
for _, msg := range msgPack.Msgs {
if msg.Type() == commonpb.MsgType_kInsert {
var child opentracing.Span
ctx := msg.GetMsgContext()
if parent := opentracing.SpanFromContext(ctx); parent != nil {
child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()),
opentracing.FollowsFrom(parent.Context()))
} else {
child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()))
}
child.SetTag("hash keys", msg.HashKeys())
child.SetTag("start time", msg.BeginTs())
child.SetTag("end time", msg.EndTs())
msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
childs = append(childs, child)
}
}
}
// TODO: add status
if msgPack == nil {
log.Println("null msg pack")
......@@ -69,10 +43,6 @@ func (inNode *InputNode) Operate([]*Msg) []*Msg {
startPositions: msgPack.StartPositions,
}
for _, child := range childs {
child.Finish()
}
return []*Msg{&msgStreamMsg}
}
......
......@@ -4,6 +4,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
......@@ -15,8 +16,8 @@ type Service interface {
type Component interface {
GetComponentStates() (*internalpb2.ComponentStates, error)
GetTimeTickChannel() (string, error)
GetStatisticsChannel() (string, error)
GetTimeTickChannel() (*milvuspb.StringResponse, error)
GetStatisticsChannel() (*milvuspb.StringResponse, error)
}
type IndexNodeInterface interface {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册