From 7ac1821cf1b785af9ebe732886372fb0b090e9f3 Mon Sep 17 00:00:00 2001 From: BossZou Date: Sat, 20 Feb 2021 18:30:37 +0800 Subject: [PATCH] Add RocksMQ throughout test Signed-off-by: BossZou --- internal/distributed/proxynode/service.go | 8 +-- internal/msgstream/pulsarms/msg_test.go | 6 +- internal/proxynode/impl.go | 26 +++++++++ internal/proxynode/interface.go | 2 + internal/proxynode/proxy_node.go | 2 +- internal/proxynode/task.go | 69 +++++++++++++++++++++++ internal/util/rocksmq/rocksmq_test.go | 66 ++++++++++++++++++++++ tests/python/test_index.py | 2 +- 8 files changed, 174 insertions(+), 7 deletions(-) diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index b04ac95d9..43a1e27c0 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -48,10 +48,6 @@ type Server struct { indexServiceClient *grpcindexserviceclient.Client } -func (s *Server) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) { - panic("implement me") -} - func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { server := &Server{ @@ -307,6 +303,10 @@ func (s *Server) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexR return s.impl.CreateIndex(request) } +func (s *Server) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) { + return s.impl.DropIndex(request) +} + func (s *Server) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { return s.impl.DescribeIndex(request) } diff --git a/internal/msgstream/pulsarms/msg_test.go b/internal/msgstream/pulsarms/msg_test.go index 51d999c4d..abee2ee0b 100644 --- a/internal/msgstream/pulsarms/msg_test.go +++ b/internal/msgstream/pulsarms/msg_test.go @@ -164,7 +164,11 @@ func TestStream_task_Insert(t *testing.T) { msgs := result.Msgs for _, v := range msgs { receiveCount++ - fmt.Println("msg type: ", v.Type(), ", msg value: ", v, "msg tag: ", v.(*InsertTask).Tag) + // variable v could be type of '*msgstream.TimeTickMsg', here need to check + // if type conversation is successful + if task, ok := v.(*InsertTask); ok { + fmt.Println("msg type: ", v.Type(), ", msg value: ", v, "msg tag: ", task.Tag) + } } } if receiveCount >= len(msgPack.Msgs) { diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index c57bd0da2..8b67f2ddf 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -540,6 +540,32 @@ func (node *NodeImpl) DescribeIndex(request *milvuspb.DescribeIndexRequest) (*mi return dit.result, nil } +func (node *NodeImpl) DropIndex(request *milvuspb.DropIndexRequest) (*commonpb.Status, error) { + log.Println("Drop index for: ", request) + ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + defer cancel() + dit := &DropIndexTask{ + Condition: NewTaskCondition(ctx), + DropIndexRequest: request, + masterClient: node.masterClient, + } + err := node.sched.DdQueue.Enqueue(dit) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + err = dit.WaitToFinish() + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + return dit.result, nil +} + func (node *NodeImpl) GetIndexState(request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) { // log.Println("Describe index progress for: ", request) ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) diff --git a/internal/proxynode/interface.go b/internal/proxynode/interface.go index 509c73c6c..32471183e 100644 --- a/internal/proxynode/interface.go +++ b/internal/proxynode/interface.go @@ -22,6 +22,7 @@ type MasterClient interface { ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) + DropIndex(in *milvuspb.DropIndexRequest) (*commonpb.Status, error) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) } @@ -90,6 +91,7 @@ type ProxyNode interface { CreateIndex(request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) DescribeIndex(request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) GetIndexState(request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) + DropIndex(request *milvuspb.DropIndexRequest) (*commonpb.Status, error) Insert(request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) Search(request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 2408eb0e6..fbf25b803 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -93,7 +93,7 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string) return nil } // wait for 10 seconds - err := retry.Retry(10, time.Second, checkFunc) + err := retry.Retry(10, time.Millisecond*200, checkFunc) if err != nil { errMsg := fmt.Sprintf("ProxyNode wait for %s ready failed", serviceName) return errors.New(errMsg) diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index a05110547..cfd6ebb7b 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -1350,6 +1350,75 @@ func (dit *DescribeIndexTask) PostExecute() error { return nil } +type DropIndexTask struct { + Condition + *milvuspb.DropIndexRequest + masterClient MasterClient + result *commonpb.Status +} + +func (dit *DropIndexTask) OnEnqueue() error { + dit.Base = &commonpb.MsgBase{} + return nil +} + +func (dit *DropIndexTask) ID() UniqueID { + return dit.Base.MsgID +} + +func (dit *DropIndexTask) SetID(uid UniqueID) { + dit.Base.MsgID = uid +} + +func (dit *DropIndexTask) Type() commonpb.MsgType { + return dit.Base.MsgType +} + +func (dit *DropIndexTask) BeginTs() Timestamp { + return dit.Base.Timestamp +} + +func (dit *DropIndexTask) EndTs() Timestamp { + return dit.Base.Timestamp +} + +func (dit *DropIndexTask) SetTs(ts Timestamp) { + dit.Base.Timestamp = ts +} + +func (dit *DropIndexTask) PreExecute() error { + dit.Base.MsgType = commonpb.MsgType_kDropIndex + dit.Base.SourceID = Params.ProxyID + + collName, fieldName := dit.CollectionName, dit.FieldName + + if err := ValidateCollectionName(collName); err != nil { + return err + } + + if err := ValidateFieldName(fieldName); err != nil { + return err + } + + return nil +} + +func (dit *DropIndexTask) Execute() error { + var err error + dit.result, err = dit.masterClient.DropIndex(dit.DropIndexRequest) + if dit.result == nil { + return errors.New("drop index resp is nil") + } + if dit.result.ErrorCode != commonpb.ErrorCode_SUCCESS { + return errors.New(dit.result.Reason) + } + return err +} + +func (dit *DropIndexTask) PostExecute() error { + return nil +} + type GetIndexStateTask struct { Condition *milvuspb.IndexStateRequest diff --git a/internal/util/rocksmq/rocksmq_test.go b/internal/util/rocksmq/rocksmq_test.go index f8e0d58c7..9f5869db5 100644 --- a/internal/util/rocksmq/rocksmq_test.go +++ b/internal/util/rocksmq/rocksmq_test.go @@ -1,10 +1,12 @@ package rocksmq import ( + "log" "os" "strconv" "sync" "testing" + "time" "github.com/stretchr/testify/assert" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" @@ -203,3 +205,67 @@ func TestRocksMQ_Goroutines(t *testing.T) { } wg.Wait() } + +/** + This test is aim to measure RocksMq throughout. + Hardware: + CPU Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz + Disk SSD + + Test with 1,000,000 message, result is as follow: + Produce: 190000 message / s + Consume: 90000 message / s +*/ +func TestRocksMQ_Throughout(t *testing.T) { + etcdAddr := os.Getenv("ETCD_ADDRESS") + if etcdAddr == "" { + etcdAddr = "localhost:2379" + } + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") + defer etcdKV.Close() + idAllocator := NewGlobalIDAllocator("dummy", etcdKV) + _ = idAllocator.Initialize() + + name := "/tmp/rocksmq_3" + defer os.RemoveAll(name) + rmq, err := NewRocksMQ(name, idAllocator) + assert.Nil(t, err) + + channelName := "channel_throughout_test" + err = rmq.CreateChannel(channelName) + assert.Nil(t, err) + defer rmq.DestroyChannel(channelName) + + entityNum := 1000000 + + pt0 := time.Now().UnixNano() / int64(time.Millisecond) + for i := 0; i < entityNum; i++ { + msg := "message_" + strconv.Itoa(i) + pMsg := ProducerMessage{payload: []byte(msg)} + assert.Nil(t, idAllocator.UpdateID()) + err := rmq.Produce(channelName, []ProducerMessage{pMsg}) + assert.Nil(t, err) + } + pt1 := time.Now().UnixNano() / int64(time.Millisecond) + pDuration := pt1 - pt0 + log.Printf("Total produce %d item, cost %v ms, throughout %v / s", entityNum, pDuration, int64(entityNum)*1000/pDuration) + + groupName := "test_throughout_group" + _ = rmq.DestroyConsumerGroup(groupName, channelName) + _, err = rmq.CreateConsumerGroup(groupName, channelName) + assert.Nil(t, err) + defer rmq.DestroyConsumerGroup(groupName, channelName) + + // Consume one message in each goroutine + ct0 := time.Now().UnixNano() / int64(time.Millisecond) + for i := 0; i < entityNum; i++ { + cMsgs, err := rmq.Consume(groupName, channelName, 1) + assert.Nil(t, err) + assert.Equal(t, len(cMsgs), 1) + } + ct1 := time.Now().UnixNano() / int64(time.Millisecond) + cDuration := ct1 - ct0 + log.Printf("Total consume %d item, cost %v ms, throughout %v / s", entityNum, cDuration, int64(entityNum)*1000/cDuration) +} diff --git a/tests/python/test_index.py b/tests/python/test_index.py index cad7695c9..ee7b36c50 100644 --- a/tests/python/test_index.py +++ b/tests/python/test_index.py @@ -524,6 +524,7 @@ class TestIndexBase: connect.drop_index(collection, field_name) +@pytest.mark.skip("r0.3-test") class TestIndexBinary: @pytest.fixture( scope="function", @@ -593,7 +594,6 @@ class TestIndexBinary: ids = connect.bulk_insert(binary_collection, default_binary_entities, partition_tag=default_tag) connect.create_index(binary_collection, binary_field_name, get_jaccard_index) - @pytest.mark.skip("r0.3-test") @pytest.mark.timeout(BUILD_TIMEOUT) def test_create_index_search_with_query_vectors(self, connect, binary_collection, get_jaccard_index, get_nq): ''' -- GitLab