From d7f5c6b1efe7b887e9f14946454fe27d7e721bbc Mon Sep 17 00:00:00 2001 From: neza2017 Date: Thu, 1 Apr 2021 18:05:43 +0800 Subject: [PATCH] Add benchmark for timtick Signed-off-by: neza2017 --- internal/masterservice/master_service.go | 4 +- internal/masterservice/timestamp_test.go | 136 +++++++++++++++++++++++ 2 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 internal/masterservice/timestamp_test.go diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index b226e3de4..dc68f76ba 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -1436,7 +1436,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) { ts, err := c.tsoAllocator.Alloc(in.Count) if err != nil { - log.Debug("AllocTimestamp failed", zap.Error(err)) + log.Debug("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &masterpb.AllocTimestampResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1460,7 +1460,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRe func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) { start, _, err := c.idAllocator.Alloc(in.Count) if err != nil { - log.Debug("AllocID failed", zap.Error(err)) + log.Debug("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &masterpb.AllocIDResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, diff --git a/internal/masterservice/timestamp_test.go b/internal/masterservice/timestamp_test.go new file mode 100644 index 000000000..afc793314 --- /dev/null +++ b/internal/masterservice/timestamp_test.go @@ -0,0 +1,136 @@ +package masterservice + +import ( + "context" + "fmt" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" + "github.com/zilliztech/milvus-distributed/internal/types" +) + +type tbp struct { + types.ProxyService +} + +func (*tbp) GetTimeTickChannel(context.Context) (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Value: fmt.Sprintf("tbp-%d", rand.Int()), + }, nil +} + +func (*tbp) InvalidateCollectionMetaCache(context.Context, *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { + return nil, nil +} + +type tbd struct { + types.DataService +} + +func (*tbd) GetInsertBinlogPaths(context.Context, *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) { + return nil, nil +} + +func (*tbd) GetSegmentInfo(context.Context, *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { + return nil, nil +} + +func (*tbd) GetSegmentInfoChannel(context.Context) (*milvuspb.StringResponse, error) { + return &milvuspb.StringResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + Value: fmt.Sprintf("tbd-%d", rand.Int()), + }, nil +} + +type tbq struct { + types.QueryService +} + +func (*tbq) ReleaseCollection(context.Context, *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { + return nil, nil +} + +type tbi struct { + types.IndexService +} + +func (*tbi) BuildIndex(context.Context, *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { + return nil, nil +} + +func (*tbi) DropIndex(context.Context, *indexpb.DropIndexRequest) (*commonpb.Status, error) { + return nil, nil +} + +func BenchmarkAllocTimestamp(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + msFactory := pulsarms.NewFactory() + Params.Init() + core, err := NewCore(ctx, msFactory) + + assert.Nil(b, err) + + randVal := rand.Int() + + Params.TimeTickChannel = fmt.Sprintf("master-time-tick-%d", randVal) + Params.DdChannel = fmt.Sprintf("master-dd-%d", randVal) + Params.StatisticsChannel = fmt.Sprintf("master-statistics-%d", randVal) + Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath) + Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath) + Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) + + err = core.SetProxyService(ctx, &tbp{}) + assert.Nil(b, err) + + err = core.SetDataService(ctx, &tbd{}) + assert.Nil(b, err) + + err = core.SetIndexService(&tbi{}) + assert.Nil(b, err) + + err = core.SetQueryService(&tbq{}) + assert.Nil(b, err) + + err = core.Init() + assert.Nil(b, err) + + err = core.Start() + assert.Nil(b, err) + + m := map[string]interface{}{ + "receiveBufSize": 1024, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err = msFactory.SetParams(m) + assert.Nil(b, err) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + req := masterpb.AllocTimestampRequest{ + Base: &commonpb.MsgBase{ + MsgID: int64(i), + }, + Count: 1, + } + _, err := core.AllocTimestamp(ctx, &req) + assert.Nil(b, err) + + } + b.StopTimer() +} -- GitLab