mock_test.go 12.8 KB
Newer Older
S
sunby 已提交
1 2 3 4 5 6 7 8 9 10
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
11
package datacoord
S
sunby 已提交
12 13

import (
G
godchen 已提交
14
	"context"
15
	"errors"
S
sunby 已提交
16 17 18
	"sync/atomic"
	"time"

19 20 21
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/typeutil"

X
Xiangyu Wang 已提交
22 23
	memkv "github.com/milvus-io/milvus/internal/kv/mem"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
N
neza2017 已提交
24

X
Xiangyu Wang 已提交
25 26 27 28
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
N
neza2017 已提交
29
	"github.com/milvus-io/milvus/internal/proto/proxypb"
30
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/proto/schemapb"
S
sunby 已提交
32 33
)

S
sunby 已提交
34
func newMemoryMeta(allocator allocator) (*meta, error) {
S
sunby 已提交
35
	memoryKV := memkv.NewMemoryKV()
36
	return NewMeta(memoryKV)
S
sunby 已提交
37 38
}

C
congqixia 已提交
39 40
// Compile-time assertion that *MockAllocator implements allocator.
var _ allocator = (*MockAllocator)(nil)

S
sunby 已提交
41 42 43 44
// MockAllocator is a test allocator driven by a single in-process counter.
type MockAllocator struct {
	// cnt is incremented atomically by both allocTimestamp and allocID.
	cnt int64
}

C
congqixia 已提交
45
func (m *MockAllocator) allocTimestamp(ctx context.Context) (Timestamp, error) {
S
sunby 已提交
46 47 48 49 50 51
	val := atomic.AddInt64(&m.cnt, 1)
	phy := time.Now().UnixNano() / int64(time.Millisecond)
	ts := tsoutil.ComposeTS(phy, val)
	return ts, nil
}

C
congqixia 已提交
52
func (m *MockAllocator) allocID(ctx context.Context) (UniqueID, error) {
S
sunby 已提交
53 54 55 56
	val := atomic.AddInt64(&m.cnt, 1)
	return val, nil
}

57 58 59 60 61 62 63 64 65 66 67
// FailsAllocator is an allocator whose allocTimestamp and allocID methods
// always return an error.
type FailsAllocator struct{}

// allocTimestamp never succeeds: it reports a zero timestamp together with
// an "always fail" error.
func (a *FailsAllocator) allocTimestamp(_ context.Context) (Timestamp, error) {
	failure := errors.New("always fail")
	return 0, failure
}

// allocID never succeeds: it reports a zero ID together with an
// "always fail" error.
func (a *FailsAllocator) allocID(_ context.Context) (UniqueID, error) {
	failure := errors.New("always fail")
	return 0, failure
}

S
sunby 已提交
68 69 70 71
// newMockAllocator returns a MockAllocator whose counter starts at zero.
func newMockAllocator() *MockAllocator {
	return new(MockAllocator)
}

S
sunby 已提交
72
func newTestSchema() *schemapb.CollectionSchema {
S
sunby 已提交
73 74 75 76 77
	return &schemapb.CollectionSchema{
		Name:        "test",
		Description: "schema for test used",
		AutoID:      false,
		Fields: []*schemapb.FieldSchema{
G
godchen 已提交
78 79
			{FieldID: 1, Name: "field1", IsPrimaryKey: false, Description: "field no.1", DataType: schemapb.DataType_String},
			{FieldID: 2, Name: "field2", IsPrimaryKey: false, Description: "field no.2", DataType: schemapb.DataType_FloatVector},
S
sunby 已提交
80 81 82
		},
	}
}
S
sunby 已提交
83 84

type mockDataNodeClient struct {
S
sunby 已提交
85 86
	id    int64
	state internalpb.StateCode
S
sunby 已提交
87
	ch    chan interface{}
S
sunby 已提交
88 89
}

90
func newMockDataNodeClient(id int64, ch chan interface{}) (*mockDataNodeClient, error) {
S
sunby 已提交
91 92 93
	return &mockDataNodeClient{
		id:    id,
		state: internalpb.StateCode_Initializing,
94
		ch:    ch,
G
godchen 已提交
95
	}, nil
S
sunby 已提交
96 97
}

N
neza2017 已提交
98 99 100 101 102
// Init is a no-op that always returns nil.
func (c *mockDataNodeClient) Init() error {
	return nil
}

func (c *mockDataNodeClient) Start() error {
S
sunby 已提交
103
	c.state = internalpb.StateCode_Healthy
N
neza2017 已提交
104 105 106
	return nil
}

107 108 109 110
// Register is a no-op that always returns nil.
func (c *mockDataNodeClient) Register() error {
	return nil
}

G
godchen 已提交
111
func (c *mockDataNodeClient) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
S
sunby 已提交
112 113 114 115 116 117
	return &internalpb.ComponentStates{
		State: &internalpb.ComponentInfo{
			NodeID:    c.id,
			StateCode: c.state,
		},
	}, nil
N
neza2017 已提交
118 119 120 121 122 123
}

// GetStatisticsChannel is a stub that returns a nil response and a nil error.
func (c *mockDataNodeClient) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return nil, nil
}

G
godchen 已提交
124
func (c *mockDataNodeClient) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
125
	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
S
sunby 已提交
126 127
}

G
godchen 已提交
128
func (c *mockDataNodeClient) FlushSegments(ctx context.Context, in *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
S
sunby 已提交
129 130 131
	if c.ch != nil {
		c.ch <- in
	}
132
	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
S
sunby 已提交
133 134
}

135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
// GetMetrics returns marshaled DataNode component infos for a fixed node ID.
//
// If marshaling fails, the response carries an UnexpectedError status with
// the error text and an empty Response; otherwise it carries a Success status
// and the marshaled payload. The Go error result is nil in both cases.
func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	// TODO(dragondriver): change the id, though it's not important in ut
	nodeID := UniqueID(20210819)

	infos := metricsinfo.DataNodeInfos{
		BaseComponentInfos: metricsinfo.BaseComponentInfos{
			Name: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
		},
	}

	// Start from the success status and downgrade it if marshaling fails.
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}
	payload, marshalErr := metricsinfo.MarshalComponentInfos(infos)
	if marshalErr != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = marshalErr.Error()
		payload = ""
	}

	return &milvuspb.GetMetricsResponse{
		Status:        status,
		Response:      payload,
		ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
	}, nil
}

S
sunby 已提交
166
func (c *mockDataNodeClient) Stop() error {
S
sunby 已提交
167
	c.state = internalpb.StateCode_Abnormal
S
sunby 已提交
168 169
	return nil
}
S
sunby 已提交
170

171
type mockRootCoordService struct {
C
congqixia 已提交
172 173
	state internalpb.StateCode
	cnt   int64
S
sunby 已提交
174 175
}

176
func newMockRootCoordService() *mockRootCoordService {
C
congqixia 已提交
177
	return &mockRootCoordService{state: internalpb.StateCode_Healthy}
S
sunby 已提交
178 179
}

180
func (m *mockRootCoordService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
181 182 183
	return nil, nil
}

184
func (m *mockRootCoordService) Init() error {
S
sunby 已提交
185 186 187
	return nil
}

188
func (m *mockRootCoordService) Start() error {
S
sunby 已提交
189 190 191
	return nil
}

192
func (m *mockRootCoordService) Stop() error {
C
congqixia 已提交
193
	m.state = internalpb.StateCode_Abnormal
S
sunby 已提交
194 195 196
	return nil
}

197
func (m *mockRootCoordService) Register() error {
198 199 200
	return nil
}

201
func (m *mockRootCoordService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
S
sunby 已提交
202 203 204 205
	return &internalpb.ComponentStates{
		State: &internalpb.ComponentInfo{
			NodeID:    0,
			Role:      "",
C
congqixia 已提交
206
			StateCode: m.state,
S
sunby 已提交
207 208 209 210 211 212 213 214 215 216
			ExtraInfo: []*commonpb.KeyValuePair{},
		},
		SubcomponentStates: []*internalpb.ComponentInfo{},
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}, nil
}

217
func (m *mockRootCoordService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
S
sunby 已提交
218 219 220 221
	panic("not implemented") // TODO: Implement
}

// DDL requests
222
func (m *mockRootCoordService) CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
S
sunby 已提交
223 224 225
	panic("not implemented") // TODO: Implement
}

226
func (m *mockRootCoordService) DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
S
sunby 已提交
227 228 229
	panic("not implemented") // TODO: Implement
}

230
func (m *mockRootCoordService) HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
S
sunby 已提交
231 232 233
	panic("not implemented") // TODO: Implement
}

234
func (m *mockRootCoordService) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
S
sunby 已提交
235 236
	return &milvuspb.DescribeCollectionResponse{
		Status: &commonpb.Status{
237
			ErrorCode: commonpb.ErrorCode_Success,
S
sunby 已提交
238 239
			Reason:    "",
		},
240 241 242
		Schema: &schemapb.CollectionSchema{
			Name: "test",
		},
243 244
		CollectionID:        1314,
		VirtualChannelNames: []string{"vchan1"},
S
sunby 已提交
245
	}, nil
S
sunby 已提交
246 247
}

248
func (m *mockRootCoordService) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
S
sunby 已提交
249 250 251 252 253
	return &milvuspb.ShowCollectionsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
254
		CollectionNames: []string{"test"},
S
sunby 已提交
255 256 257
	}, nil
}

258
func (m *mockRootCoordService) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
S
sunby 已提交
259 260 261
	panic("not implemented") // TODO: Implement
}

262
func (m *mockRootCoordService) DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
S
sunby 已提交
263 264 265
	panic("not implemented") // TODO: Implement
}

266
func (m *mockRootCoordService) HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
S
sunby 已提交
267 268 269
	panic("not implemented") // TODO: Implement
}

270
func (m *mockRootCoordService) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
271 272 273 274 275 276 277 278
	return &milvuspb.ShowPartitionsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		PartitionNames: []string{"_default"},
		PartitionIDs:   []int64{0},
	}, nil
S
sunby 已提交
279 280 281
}

// Index builder service
282
func (m *mockRootCoordService) CreateIndex(ctx context.Context, req *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
S
sunby 已提交
283 284 285
	panic("not implemented") // TODO: Implement
}

286
func (m *mockRootCoordService) DescribeIndex(ctx context.Context, req *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
S
sunby 已提交
287 288 289
	panic("not implemented") // TODO: Implement
}

290
func (m *mockRootCoordService) DropIndex(ctx context.Context, req *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
S
sunby 已提交
291 292 293 294
	panic("not implemented") // TODO: Implement
}

// Global timestamp and ID allocator
295
func (m *mockRootCoordService) AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
C
congqixia 已提交
296 297 298 299
	if m.state != internalpb.StateCode_Healthy {
		return &rootcoordpb.AllocTimestampResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil
	}

S
sunby 已提交
300 301 302
	val := atomic.AddInt64(&m.cnt, int64(req.Count))
	phy := time.Now().UnixNano() / int64(time.Millisecond)
	ts := tsoutil.ComposeTS(phy, val)
303
	return &rootcoordpb.AllocTimestampResponse{
S
sunby 已提交
304 305 306 307 308 309 310
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Timestamp: ts,
		Count:     req.Count,
	}, nil
S
sunby 已提交
311 312
}

313
func (m *mockRootCoordService) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
C
congqixia 已提交
314 315 316
	if m.state != internalpb.StateCode_Healthy {
		return &rootcoordpb.AllocIDResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}, nil
	}
S
sunby 已提交
317
	val := atomic.AddInt64(&m.cnt, int64(req.Count))
318
	return &rootcoordpb.AllocIDResponse{
S
sunby 已提交
319 320 321 322 323 324 325
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		ID:    val,
		Count: req.Count,
	}, nil
S
sunby 已提交
326 327 328
}

// Segment requests
329
func (m *mockRootCoordService) DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
S
sunby 已提交
330 331 332
	panic("not implemented") // TODO: Implement
}

333
func (m *mockRootCoordService) ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
S
sunby 已提交
334 335 336
	panic("not implemented") // TODO: Implement
}

337
func (m *mockRootCoordService) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
S
sunby 已提交
338 339 340 341 342 343 344 345
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "ddchannel",
	}, nil
}
346

347
func (m *mockRootCoordService) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
348 349
	panic("not implemented") // TODO: Implement
}
350

351
func (m *mockRootCoordService) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
N
neza2017 已提交
352 353
	panic("not implemented") // TODO: Implement
}
354 355
func (m *mockRootCoordService) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
356 357 358 359
}
// AddNewSegment is not implemented by this mock and panics when called.
func (m *mockRootCoordService) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) {
	panic("not implemented") // TODO: Implement
}
360

361
func (m *mockRootCoordService) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
362
	// TODO(dragondriver): change the id, though it's not important in ut
363
	nodeID := UniqueID(20210901)
364

365 366 367 368 369 370 371 372 373 374
	rootCoordTopology := metricsinfo.RootCoordTopology{
		Self: metricsinfo.RootCoordInfos{
			BaseComponentInfos: metricsinfo.BaseComponentInfos{
				Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID),
			},
		},
		Connections: metricsinfo.ConnTopology{
			Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID),
			// TODO(dragondriver): fill ConnectedComponents if necessary
			ConnectedComponents: []metricsinfo.ConnectionInfo{},
375 376
		},
	}
377 378

	resp, err := metricsinfo.MarshalTopology(rootCoordTopology)
379 380 381 382 383 384 385
	if err != nil {
		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response:      "",
386
			ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID),
387 388 389 390 391 392 393 394 395
		}, nil
	}

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Response:      resp,
396
		ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID),
397 398
	}, nil
}