master_service.go 47.7 KB
Newer Older
1 2 3 4
package masterservice

import (
	"context"
5
	"fmt"
6 7 8 9 10
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

B
bigsheeper 已提交
11 12 13
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"

14
	"github.com/zilliztech/milvus-distributed/internal/allocator"
15
	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
N
neza2017 已提交
16
	"github.com/zilliztech/milvus-distributed/internal/log"
Z
zhenshan.cao 已提交
17
	ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
18
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
N
neza2017 已提交
19
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
N
neza2017 已提交
20
	"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
G
godchen 已提交
21
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
22 23
	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
N
neza2017 已提交
24
	"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
25
	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
B
bigsheeper 已提交
26 27 28 29 30
	"github.com/zilliztech/milvus-distributed/internal/tso"
	"github.com/zilliztech/milvus-distributed/internal/types"
	"github.com/zilliztech/milvus-distributed/internal/util/retry"
	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
31 32
)

G
godchen 已提交
33
//  internalpb -> internalpb
//  proxypb(proxy_service)
//  querypb(query_service)
//  datapb(data_service)
//  indexpb(index_service)
//  milvuspb -> milvuspb
//  masterpb2 -> masterpb (master_service)
40 41 42 43 44

// ------------------ struct -----------------------

// master core
type Core struct {
45 46 47
	/*
		ProxyServiceClient Interface:
		get proxy service time tick channel,InvalidateCollectionMetaCache
48

49 50 51
		DataService Interface:
		Segment States Channel, from DataService, if create new segment, data service should put the segment id into this channel, and let the master add the segment id to the collection meta
		Segment Flush Watcher, monitor if segment has flushed into disk
52

G
godchen 已提交
53
		IndexService Interface
Z
zhenshan.cao 已提交
54
		IndexService Sch, tell index service to build index
55
	*/
56 57 58

	MetaTable *metaTable
	//id allocator
N
neza2017 已提交
59 60 61
	idAllocator       func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error)
	idAllocatorUpdate func() error

62
	//tso allocator
N
neza2017 已提交
63 64
	tsoAllocator       func(count uint32) (typeutil.Timestamp, error)
	tsoAllocatorUpdate func() error
65 66 67 68 69 70 71 72

	//inner members
	ctx     context.Context
	cancel  context.CancelFunc
	etcdCli *clientv3.Client
	kvBase  *etcdkv.EtcdKV
	metaKV  *etcdkv.EtcdKV

Z
zhenshan.cao 已提交
73 74
	//setMsgStreams, receive time tick from proxy service time tick channel
	ProxyTimeTickChan chan typeutil.Timestamp
75

Z
zhenshan.cao 已提交
76
	//setMsgStreams, send time tick into dd channel and time tick channel
77 78
	SendTimeTick func(t typeutil.Timestamp) error

Z
zhenshan.cao 已提交
79
	//setMsgStreams, send create collection into dd channel
G
groot 已提交
80
	DdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest) error
81

Z
zhenshan.cao 已提交
82
	//setMsgStreams, send drop collection into dd channel, and notify the proxy to delete this collection
G
groot 已提交
83
	DdDropCollectionReq func(ctx context.Context, req *internalpb.DropCollectionRequest) error
84

Z
zhenshan.cao 已提交
85
	//setMsgStreams, send create partition into dd channel
G
groot 已提交
86
	DdCreatePartitionReq func(ctx context.Context, req *internalpb.CreatePartitionRequest) error
87

Z
zhenshan.cao 已提交
88
	//setMsgStreams, send drop partition into dd channel
G
groot 已提交
89
	DdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest) error
90

N
neza2017 已提交
91 92 93
	//setMsgStreams segment channel, receive segment info from data service, if master create segment
	DataServiceSegmentChan chan *datapb.SegmentInfo

94 95 96
	//setMsgStreams ,if segment flush completed, data node would put segment id into msg stream
	DataNodeSegmentFlushCompletedChan chan typeutil.UniqueID

97
	//get binlog file path from data service,
N
neza2017 已提交
98
	GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
N
neza2017 已提交
99
	GetNumRowsReq                        func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)
N
neza2017 已提交
100

101
	//call index builder's client to build index, return build id
G
groot 已提交
102 103
	BuildIndexReq func(ctx context.Context, binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error)
	DropIndexReq  func(ctx context.Context, indexID typeutil.UniqueID) error
N
neza2017 已提交
104

105
	//proxy service interface, notify proxy service to drop collection
G
groot 已提交
106
	InvalidateCollectionMetaCache func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error
107

108
	//query service interface, notify query service to release collection
G
groot 已提交
109
	ReleaseCollection func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
110

N
neza2017 已提交
111 112 113
	// put create index task into this chan
	indexTaskQueue chan *CreateIndexTask

114 115 116 117 118 119 120 121 122 123 124 125 126
	//dd request scheduler
	ddReqQueue      chan reqTask //dd request will be push into this chan
	lastDdTimeStamp typeutil.Timestamp

	//time tick loop
	lastTimeTick typeutil.Timestamp

	//states code
	stateCode atomic.Value

	//call once
	initOnce  sync.Once
	startOnce sync.Once
127
	//isInit    atomic.Value
G
groot 已提交
128 129

	msFactory ms.Factory
130 131 132 133
}

// --------------------- function --------------------------

G
groot 已提交
134
func NewCore(c context.Context, factory ms.Factory) (*Core, error) {
135 136 137
	ctx, cancel := context.WithCancel(c)
	rand.Seed(time.Now().UnixNano())
	core := &Core{
G
groot 已提交
138 139 140
		ctx:       ctx,
		cancel:    cancel,
		msFactory: factory,
141
	}
G
godchen 已提交
142
	core.UpdateStateCode(internalpb.StateCode_Abnormal)
143 144 145
	return core, nil
}

G
godchen 已提交
146
func (c *Core) UpdateStateCode(code internalpb.StateCode) {
147 148 149
	c.stateCode.Store(code)
}

150 151
func (c *Core) checkInit() error {
	if c.MetaTable == nil {
N
neza2017 已提交
152
		return fmt.Errorf("MetaTable is nil")
153 154
	}
	if c.idAllocator == nil {
N
neza2017 已提交
155
		return fmt.Errorf("idAllocator is nil")
156
	}
N
neza2017 已提交
157 158 159
	if c.idAllocatorUpdate == nil {
		return fmt.Errorf("idAllocatorUpdate is nil")
	}
160
	if c.tsoAllocator == nil {
N
neza2017 已提交
161
		return fmt.Errorf("tsoAllocator is nil")
162
	}
N
neza2017 已提交
163 164 165
	if c.tsoAllocatorUpdate == nil {
		return fmt.Errorf("tsoAllocatorUpdate is nil")
	}
166
	if c.etcdCli == nil {
N
neza2017 已提交
167
		return fmt.Errorf("etcdCli is nil")
168 169
	}
	if c.metaKV == nil {
N
neza2017 已提交
170
		return fmt.Errorf("metaKV is nil")
171 172
	}
	if c.kvBase == nil {
N
neza2017 已提交
173
		return fmt.Errorf("kvBase is nil")
174 175
	}
	if c.ProxyTimeTickChan == nil {
N
neza2017 已提交
176
		return fmt.Errorf("ProxyTimeTickChan is nil")
177 178
	}
	if c.ddReqQueue == nil {
N
neza2017 已提交
179
		return fmt.Errorf("ddReqQueue is nil")
180 181
	}
	if c.DdCreateCollectionReq == nil {
N
neza2017 已提交
182
		return fmt.Errorf("DdCreateCollectionReq is nil")
183 184
	}
	if c.DdDropCollectionReq == nil {
N
neza2017 已提交
185
		return fmt.Errorf("DdDropCollectionReq is nil")
186 187
	}
	if c.DdCreatePartitionReq == nil {
N
neza2017 已提交
188
		return fmt.Errorf("DdCreatePartitionReq is nil")
189 190
	}
	if c.DdDropPartitionReq == nil {
N
neza2017 已提交
191
		return fmt.Errorf("DdDropPartitionReq is nil")
192
	}
N
neza2017 已提交
193
	if c.DataServiceSegmentChan == nil {
N
neza2017 已提交
194
		return fmt.Errorf("DataServiceSegmentChan is nil")
N
neza2017 已提交
195 196
	}
	if c.GetBinlogFilePathsFromDataServiceReq == nil {
N
neza2017 已提交
197
		return fmt.Errorf("GetBinlogFilePathsFromDataServiceReq is nil")
N
neza2017 已提交
198
	}
199
	if c.GetNumRowsReq == nil {
N
neza2017 已提交
200
		return fmt.Errorf("GetNumRowsReq is nil")
201
	}
N
neza2017 已提交
202
	if c.BuildIndexReq == nil {
N
neza2017 已提交
203
		return fmt.Errorf("BuildIndexReq is nil")
N
neza2017 已提交
204
	}
N
neza2017 已提交
205
	if c.DropIndexReq == nil {
N
neza2017 已提交
206
		return fmt.Errorf("DropIndexReq is nil")
N
neza2017 已提交
207
	}
208
	if c.InvalidateCollectionMetaCache == nil {
N
neza2017 已提交
209
		return fmt.Errorf("InvalidateCollectionMetaCache is nil")
210
	}
N
neza2017 已提交
211
	if c.indexTaskQueue == nil {
N
neza2017 已提交
212
		return fmt.Errorf("indexTaskQueue is nil")
N
neza2017 已提交
213
	}
214
	if c.DataNodeSegmentFlushCompletedChan == nil {
N
neza2017 已提交
215
		return fmt.Errorf("DataNodeSegmentFlushCompletedChan is nil")
216
	}
217
	if c.ReleaseCollection == nil {
N
neza2017 已提交
218
		return fmt.Errorf("ReleaseCollection is nil")
219
	}
220 221 222 223 224 225 226
	return nil
}

func (c *Core) startDdScheduler() {
	for {
		select {
		case <-c.ctx.Done():
B
bigsheeper 已提交
227
			log.Debug("close dd scheduler, exit task execution loop")
228 229 230
			return
		case task, ok := <-c.ddReqQueue:
			if !ok {
B
bigsheeper 已提交
231
				log.Debug("dd chan is closed, exit task execution loop")
232 233 234 235 236 237 238
				return
			}
			ts, err := task.Ts()
			if err != nil {
				task.Notify(err)
				break
			}
239
			if !task.IgnoreTimeStamp() && ts <= c.lastDdTimeStamp {
S
sunby 已提交
240
				task.Notify(fmt.Errorf("input timestamp = %d, last dd time stamp = %d", ts, c.lastDdTimeStamp))
241 242
				break
			}
G
groot 已提交
243
			err = task.Execute(task.Ctx())
244
			task.Notify(err)
245 246 247
			if ts > c.lastDdTimeStamp {
				c.lastDdTimeStamp = ts
			}
248 249 250 251 252 253 254 255
		}
	}
}

func (c *Core) startTimeTickLoop() {
	for {
		select {
		case <-c.ctx.Done():
B
bigsheeper 已提交
256
			log.Debug("close master time tick loop")
257 258 259
			return
		case tt, ok := <-c.ProxyTimeTickChan:
			if !ok {
B
bigsheeper 已提交
260
				log.Warn("proxyTimeTickStream is closed, exit time tick loop")
261 262 263
				return
			}
			if tt <= c.lastTimeTick {
N
neza2017 已提交
264
				log.Warn("master time tick go back", zap.Uint64("last time tick", c.lastTimeTick), zap.Uint64("input time tick ", tt))
265 266
			}
			if err := c.SendTimeTick(tt); err != nil {
N
neza2017 已提交
267
				log.Warn("master send time tick into dd and time_tick channel failed", zap.String("error", err.Error()))
268 269 270 271 272 273
			}
			c.lastTimeTick = tt
		}
	}
}

N
neza2017 已提交
274 275 276 277 278
//data service send segment info to master when create segment
func (c *Core) startDataServiceSegmentLoop() {
	for {
		select {
		case <-c.ctx.Done():
B
bigsheeper 已提交
279
			log.Debug("close data service segment loop")
N
neza2017 已提交
280 281 282
			return
		case seg, ok := <-c.DataServiceSegmentChan:
			if !ok {
B
bigsheeper 已提交
283
				log.Debug("data service segment is closed, exit loop")
N
neza2017 已提交
284 285 286
				return
			}
			if seg == nil {
N
neza2017 已提交
287
				log.Warn("segment from data service is nil")
N
neza2017 已提交
288 289
			} else if err := c.MetaTable.AddSegment(seg); err != nil {
				//what if master add segment failed, but data service success?
N
neza2017 已提交
290
				log.Warn("add segment info meta table failed ", zap.String("error", err.Error()))
N
neza2017 已提交
291
			} else {
S
sunby 已提交
292
				log.Debug("add segment", zap.Int64("collection id", seg.CollectionID), zap.Int64("partition id", seg.PartitionID), zap.Int64("segment id", seg.ID))
N
neza2017 已提交
293 294 295 296 297 298 299 300 301 302
			}
		}
	}
}

//create index loop
func (c *Core) startCreateIndexLoop() {
	for {
		select {
		case <-c.ctx.Done():
B
bigsheeper 已提交
303
			log.Debug("close create index loop")
N
neza2017 已提交
304 305 306
			return
		case t, ok := <-c.indexTaskQueue:
			if !ok {
B
bigsheeper 已提交
307
				log.Debug("index task chan has closed, exit loop")
N
neza2017 已提交
308 309 310
				return
			}
			if err := t.BuildIndex(); err != nil {
N
neza2017 已提交
311
				log.Warn("create index failed", zap.String("error", err.Error()))
N
neza2017 已提交
312
			} else {
N
neza2017 已提交
313
				log.Debug("create index", zap.String("index name", t.indexName), zap.String("field name", t.fieldSchema.Name), zap.Int64("segment id", t.segmentID))
N
neza2017 已提交
314 315 316 317 318
			}
		}
	}
}

319 320 321 322
func (c *Core) startSegmentFlushCompletedLoop() {
	for {
		select {
		case <-c.ctx.Done():
B
bigsheeper 已提交
323
			log.Debug("close segment flush completed loop")
324 325 326
			return
		case seg, ok := <-c.DataNodeSegmentFlushCompletedChan:
			if !ok {
B
bigsheeper 已提交
327
				log.Debug("data node segment flush completed chan has colsed, exit loop")
328
			}
329
			log.Debug("flush segment", zap.Int64("id", seg))
N
neza2017 已提交
330
			coll, err := c.MetaTable.GetCollectionBySegmentID(seg)
331
			if err != nil {
332
				log.Warn("GetCollectionBySegmentID error", zap.Error(err))
N
neza2017 已提交
333
				break
334
			}
335 336 337 338
			err = c.MetaTable.AddFlushedSegment(seg)
			if err != nil {
				log.Warn("AddFlushedSegment error", zap.Error(err))
			}
N
neza2017 已提交
339
			for _, f := range coll.FieldIndexes {
N
neza2017 已提交
340 341
				idxInfo, err := c.MetaTable.GetIndexByID(f.IndexID)
				if err != nil {
N
neza2017 已提交
342
					log.Warn("index not found", zap.Int64("index id", f.IndexID))
N
neza2017 已提交
343
					continue
N
neza2017 已提交
344 345
				}

N
neza2017 已提交
346 347 348
				fieldSch, err := GetFieldSchemaByID(coll, f.FiledID)
				if err == nil {
					t := &CreateIndexTask{
N
neza2017 已提交
349 350 351 352 353 354 355 356
						ctx:               c.ctx,
						core:              c,
						segmentID:         seg,
						indexName:         idxInfo.IndexName,
						indexID:           idxInfo.IndexID,
						fieldSchema:       fieldSch,
						indexParams:       idxInfo.IndexParams,
						isFromFlushedChan: true,
N
neza2017 已提交
357 358
					}
					c.indexTaskQueue <- t
359 360 361 362 363 364
				}
			}
		}
	}
}

365
func (c *Core) tsLoop() {
366
	tsoTicker := time.NewTicker(tso.UpdateTimestampStep)
367 368 369 370 371 372
	defer tsoTicker.Stop()
	ctx, cancel := context.WithCancel(c.ctx)
	defer cancel()
	for {
		select {
		case <-tsoTicker.C:
N
neza2017 已提交
373
			if err := c.tsoAllocatorUpdate(); err != nil {
N
neza2017 已提交
374 375
				log.Warn("failed to update timestamp: ", zap.Error(err))
				continue
376
			}
N
neza2017 已提交
377
			if err := c.idAllocatorUpdate(); err != nil {
N
neza2017 已提交
378 379
				log.Warn("failed to update id: ", zap.Error(err))
				continue
380 381 382
			}
		case <-ctx.Done():
			// Server is closed and it should return nil.
B
bigsheeper 已提交
383
			log.Debug("tsLoop is closed")
384 385 386 387
			return
		}
	}
}
Z
zhenshan.cao 已提交
388
func (c *Core) setMsgStreams() error {
N
neza2017 已提交
389
	if Params.PulsarAddress == "" {
N
neza2017 已提交
390
		return fmt.Errorf("PulsarAddress is empty")
N
neza2017 已提交
391 392
	}
	if Params.MsgChannelSubName == "" {
N
neza2017 已提交
393
		return fmt.Errorf("MsgChannelSubName is emptyr")
N
neza2017 已提交
394 395
	}

Z
zhenshan.cao 已提交
396
	//proxy time tick stream,
N
neza2017 已提交
397
	if Params.ProxyTimeTickChannel == "" {
N
neza2017 已提交
398
		return fmt.Errorf("ProxyTimeTickChannel is empty")
N
neza2017 已提交
399
	}
Z
zhenshan.cao 已提交
400

G
groot 已提交
401 402 403 404 405 406 407 408 409 410 411
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = c.msFactory.SetParams(m)
	if err != nil {
		return err
	}

	proxyTimeTickStream, _ := c.msFactory.NewMsgStream(c.ctx)
Z
zhenshan.cao 已提交
412
	proxyTimeTickStream.AsConsumer([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName)
X
Xiangyu Wang 已提交
413
	log.Debug("master AsConsumer: " + Params.ProxyTimeTickChannel + " : " + Params.MsgChannelSubName)
Z
zhenshan.cao 已提交
414 415 416
	proxyTimeTickStream.Start()

	// master time tick channel
N
neza2017 已提交
417
	if Params.TimeTickChannel == "" {
N
neza2017 已提交
418
		return fmt.Errorf("TimeTickChannel is empty")
N
neza2017 已提交
419
	}
G
groot 已提交
420
	timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx)
Z
zhenshan.cao 已提交
421
	timeTickStream.AsProducer([]string{Params.TimeTickChannel})
X
Xiangyu Wang 已提交
422
	log.Debug("masterservice AsProducer: " + Params.TimeTickChannel)
Z
zhenshan.cao 已提交
423 424

	// master dd channel
N
neza2017 已提交
425
	if Params.DdChannel == "" {
N
neza2017 已提交
426
		return fmt.Errorf("DdChannel is empty")
N
neza2017 已提交
427
	}
G
groot 已提交
428
	ddStream, _ := c.msFactory.NewMsgStream(c.ctx)
Z
zhenshan.cao 已提交
429
	ddStream.AsProducer([]string{Params.DdChannel})
X
Xiangyu Wang 已提交
430
	log.Debug("masterservice AsProducer: " + Params.DdChannel)
Z
zhenshan.cao 已提交
431 432 433 434 435 436 437 438

	c.SendTimeTick = func(t typeutil.Timestamp) error {
		msgPack := ms.MsgPack{}
		baseMsg := ms.BaseMsg{
			BeginTimestamp: t,
			EndTimestamp:   t,
			HashValues:     []uint32{0},
		}
G
godchen 已提交
439
		timeTickResult := internalpb.TimeTickMsg{
Z
zhenshan.cao 已提交
440
			Base: &commonpb.MsgBase{
441
				MsgType:   commonpb.MsgType_TimeTick,
Z
zhenshan.cao 已提交
442 443 444 445 446 447 448 449 450 451
				MsgID:     0,
				Timestamp: t,
				SourceID:  int64(Params.NodeID),
			},
		}
		timeTickMsg := &ms.TimeTickMsg{
			BaseMsg:     baseMsg,
			TimeTickMsg: timeTickResult,
		}
		msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
452
		if err := timeTickStream.Broadcast(&msgPack); err != nil {
Z
zhenshan.cao 已提交
453 454
			return err
		}
455
		if err := ddStream.Broadcast(&msgPack); err != nil {
Z
zhenshan.cao 已提交
456 457 458 459 460
			return err
		}
		return nil
	}

G
groot 已提交
461
	c.DdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest) error {
Z
zhenshan.cao 已提交
462 463
		msgPack := ms.MsgPack{}
		baseMsg := ms.BaseMsg{
464
			Ctx:            ctx,
Z
zhenshan.cao 已提交
465 466 467 468 469 470 471 472 473
			BeginTimestamp: req.Base.Timestamp,
			EndTimestamp:   req.Base.Timestamp,
			HashValues:     []uint32{0},
		}
		collMsg := &ms.CreateCollectionMsg{
			BaseMsg:                 baseMsg,
			CreateCollectionRequest: *req,
		}
		msgPack.Msgs = append(msgPack.Msgs, collMsg)
474
		if err := ddStream.Broadcast(&msgPack); err != nil {
Z
zhenshan.cao 已提交
475 476 477 478 479
			return err
		}
		return nil
	}

G
groot 已提交
480
	c.DdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest) error {
Z
zhenshan.cao 已提交
481 482
		msgPack := ms.MsgPack{}
		baseMsg := ms.BaseMsg{
483
			Ctx:            ctx,
Z
zhenshan.cao 已提交
484 485 486 487 488 489 490 491 492
			BeginTimestamp: req.Base.Timestamp,
			EndTimestamp:   req.Base.Timestamp,
			HashValues:     []uint32{0},
		}
		collMsg := &ms.DropCollectionMsg{
			BaseMsg:               baseMsg,
			DropCollectionRequest: *req,
		}
		msgPack.Msgs = append(msgPack.Msgs, collMsg)
493
		if err := ddStream.Broadcast(&msgPack); err != nil {
Z
zhenshan.cao 已提交
494 495 496 497 498
			return err
		}
		return nil
	}

G
groot 已提交
499
	c.DdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest) error {
Z
zhenshan.cao 已提交
500 501
		msgPack := ms.MsgPack{}
		baseMsg := ms.BaseMsg{
502
			Ctx:            ctx,
Z
zhenshan.cao 已提交
503 504 505 506 507 508 509 510 511
			BeginTimestamp: req.Base.Timestamp,
			EndTimestamp:   req.Base.Timestamp,
			HashValues:     []uint32{0},
		}
		collMsg := &ms.CreatePartitionMsg{
			BaseMsg:                baseMsg,
			CreatePartitionRequest: *req,
		}
		msgPack.Msgs = append(msgPack.Msgs, collMsg)
512
		if err := ddStream.Broadcast(&msgPack); err != nil {
Z
zhenshan.cao 已提交
513 514 515 516 517
			return err
		}
		return nil
	}

G
groot 已提交
518
	c.DdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest) error {
Z
zhenshan.cao 已提交
519 520
		msgPack := ms.MsgPack{}
		baseMsg := ms.BaseMsg{
521
			Ctx:            ctx,
Z
zhenshan.cao 已提交
522 523 524 525 526 527 528 529 530
			BeginTimestamp: req.Base.Timestamp,
			EndTimestamp:   req.Base.Timestamp,
			HashValues:     []uint32{0},
		}
		collMsg := &ms.DropPartitionMsg{
			BaseMsg:              baseMsg,
			DropPartitionRequest: *req,
		}
		msgPack.Msgs = append(msgPack.Msgs, collMsg)
531
		if err := ddStream.Broadcast(&msgPack); err != nil {
Z
zhenshan.cao 已提交
532 533 534 535 536 537
			return err
		}
		return nil
	}

	// receive time tick from msg stream
538
	c.ProxyTimeTickChan = make(chan typeutil.Timestamp, 1024)
Z
zhenshan.cao 已提交
539 540 541 542 543 544 545
	go func() {
		for {
			select {
			case <-c.ctx.Done():
				return
			case ttmsgs, ok := <-proxyTimeTickStream.Chan():
				if !ok {
N
neza2017 已提交
546
					log.Warn("proxy time tick msg stream closed")
Z
zhenshan.cao 已提交
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562
					return
				}
				if len(ttmsgs.Msgs) > 0 {
					for _, ttm := range ttmsgs.Msgs {
						ttmsg, ok := ttm.(*ms.TimeTickMsg)
						if !ok {
							continue
						}
						c.ProxyTimeTickChan <- ttmsg.Base.Timestamp
					}
				}
			}
		}
	}()

	//segment channel, data service create segment,or data node flush segment will put msg in this channel
N
neza2017 已提交
563
	if Params.DataServiceSegmentChannel == "" {
N
neza2017 已提交
564
		return fmt.Errorf("DataServiceSegmentChannel is empty")
N
neza2017 已提交
565
	}
G
groot 已提交
566
	dataServiceStream, _ := c.msFactory.NewMsgStream(c.ctx)
Z
zhenshan.cao 已提交
567
	dataServiceStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName)
X
Xiangyu Wang 已提交
568
	log.Debug("master AsConsumer: " + Params.DataServiceSegmentChannel + " : " + Params.MsgChannelSubName)
Z
zhenshan.cao 已提交
569
	dataServiceStream.Start()
N
neza2017 已提交
570
	c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024)
571
	c.DataNodeSegmentFlushCompletedChan = make(chan typeutil.UniqueID, 1024)
Z
zhenshan.cao 已提交
572 573 574 575 576 577 578 579 580

	// receive segment info from msg stream
	go func() {
		for {
			select {
			case <-c.ctx.Done():
				return
			case segMsg, ok := <-dataServiceStream.Chan():
				if !ok {
N
neza2017 已提交
581
					log.Warn("data service segment msg closed")
Z
zhenshan.cao 已提交
582 583 584 585 586
				}
				if len(segMsg.Msgs) > 0 {
					for _, segm := range segMsg.Msgs {
						segInfoMsg, ok := segm.(*ms.SegmentInfoMsg)
						if ok {
N
neza2017 已提交
587
							c.DataServiceSegmentChan <- segInfoMsg.Segment
588
						} else {
N
neza2017 已提交
589
							flushMsg, ok := segm.(*ms.FlushCompletedMsg)
590 591 592
							if ok {
								c.DataNodeSegmentFlushCompletedChan <- flushMsg.SegmentFlushCompletedMsg.SegmentID
							} else {
N
neza2017 已提交
593
								log.Debug("receive unexpected msg from data service stream", zap.Stringer("segment", segInfoMsg.SegmentMsg.Segment))
594
							}
Z
zhenshan.cao 已提交
595 596 597 598 599 600 601 602 603 604
						}
					}
				}
			}
		}
	}()

	return nil
}

N
neza2017 已提交
605
func (c *Core) SetProxyService(ctx context.Context, s types.ProxyService) error {
G
godchen 已提交
606
	rsp, err := s.GetTimeTickChannel(ctx)
N
neza2017 已提交
607 608 609
	if err != nil {
		return err
	}
G
godchen 已提交
610
	Params.ProxyTimeTickChannel = rsp.Value
B
bigsheeper 已提交
611
	log.Debug("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel))
N
neza2017 已提交
612

G
groot 已提交
613
	c.InvalidateCollectionMetaCache = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error {
G
godchen 已提交
614
		status, _ := s.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
N
neza2017 已提交
615 616 617 618 619 620 621 622 623
			Base: &commonpb.MsgBase{
				MsgType:   0, //TODO,MsgType
				MsgID:     0,
				Timestamp: ts,
				SourceID:  int64(Params.NodeID),
			},
			DbName:         dbName,
			CollectionName: collectionName,
		})
G
godchen 已提交
624
		if status == nil {
N
neza2017 已提交
625
			return fmt.Errorf("invalidate collection metacache resp is nil")
G
godchen 已提交
626
		}
627
		if status.ErrorCode != commonpb.ErrorCode_Success {
N
neza2017 已提交
628
			return fmt.Errorf(status.Reason)
N
neza2017 已提交
629 630 631 632 633 634
		}
		return nil
	}
	return nil
}

N
neza2017 已提交
635
func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
G
godchen 已提交
636
	rsp, err := s.GetSegmentInfoChannel(ctx)
N
neza2017 已提交
637 638 639
	if err != nil {
		return err
	}
G
godchen 已提交
640
	Params.DataServiceSegmentChannel = rsp.Value
B
bigsheeper 已提交
641
	log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel))
X
Xiangyu Wang 已提交
642

N
neza2017 已提交
643
	c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
N
neza2017 已提交
644
		ts, err := c.tsoAllocator(1)
N
neza2017 已提交
645 646 647
		if err != nil {
			return nil, err
		}
G
godchen 已提交
648
		binlog, err := s.GetInsertBinlogPaths(ctx, &datapb.GetInsertBinlogPathsRequest{
N
neza2017 已提交
649
			Base: &commonpb.MsgBase{
650
				MsgType:   0, //TODO, msg type
N
neza2017 已提交
651 652 653 654 655 656 657 658 659
				MsgID:     0,
				Timestamp: ts,
				SourceID:  int64(Params.NodeID),
			},
			SegmentID: segID,
		})
		if err != nil {
			return nil, err
		}
660
		if binlog.Status.ErrorCode != commonpb.ErrorCode_Success {
S
sunby 已提交
661
			return nil, fmt.Errorf("GetInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason)
N
neza2017 已提交
662 663 664 665 666 667
		}
		for i := range binlog.FieldIDs {
			if binlog.FieldIDs[i] == fieldID {
				return binlog.Paths[i].Values, nil
			}
		}
S
sunby 已提交
668
		return nil, fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID)
N
neza2017 已提交
669
	}
670

N
neza2017 已提交
671
	c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
N
neza2017 已提交
672
		ts, err := c.tsoAllocator(1)
673 674 675
		if err != nil {
			return 0, err
		}
G
godchen 已提交
676
		segInfo, err := s.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
677 678 679 680 681 682 683 684 685 686 687
			Base: &commonpb.MsgBase{
				MsgType:   0, //TODO, msg type
				MsgID:     0,
				Timestamp: ts,
				SourceID:  int64(Params.NodeID),
			},
			SegmentIDs: []typeutil.UniqueID{segID},
		})
		if err != nil {
			return 0, err
		}
688
		if segInfo.Status.ErrorCode != commonpb.ErrorCode_Success {
689 690 691 692 693 694
			return 0, fmt.Errorf("GetSegmentInfo from data service failed, error = %s", segInfo.Status.Reason)
		}
		if len(segInfo.Infos) != 1 {
			log.Debug("get segment info empty")
			return 0, nil
		}
N
neza2017 已提交
695
		if !isFromFlushedChan && segInfo.Infos[0].State != commonpb.SegmentState_Flushed {
696 697 698 699 700
			log.Debug("segment id not flushed", zap.Int64("segment id", segID))
			return 0, nil
		}
		return segInfo.Infos[0].NumRows, nil
	}
N
neza2017 已提交
701 702 703
	return nil
}

G
groot 已提交
704 705
func (c *Core) SetIndexService(s types.IndexService) error {
	c.BuildIndexReq = func(ctx context.Context, binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) {
G
godchen 已提交
706
		rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{
N
neza2017 已提交
707 708 709
			DataPaths:   binlog,
			TypeParams:  typeParams,
			IndexParams: indexParams,
B
bigsheeper 已提交
710 711
			IndexID:     indexID,
			IndexName:   indexName,
N
neza2017 已提交
712 713 714 715
		})
		if err != nil {
			return 0, err
		}
716
		if rsp.Status.ErrorCode != commonpb.ErrorCode_Success {
S
sunby 已提交
717
			return 0, fmt.Errorf("BuildIndex from index service failed, error = %s", rsp.Status.Reason)
N
neza2017 已提交
718
		}
719
		return rsp.IndexBuildID, nil
N
neza2017 已提交
720
	}
N
neza2017 已提交
721

G
groot 已提交
722
	c.DropIndexReq = func(ctx context.Context, indexID typeutil.UniqueID) error {
G
godchen 已提交
723
		rsp, err := s.DropIndex(ctx, &indexpb.DropIndexRequest{
N
neza2017 已提交
724 725 726 727 728
			IndexID: indexID,
		})
		if err != nil {
			return err
		}
729
		if rsp.ErrorCode != commonpb.ErrorCode_Success {
N
neza2017 已提交
730
			return fmt.Errorf(rsp.Reason)
N
neza2017 已提交
731 732 733 734
		}
		return nil
	}

N
neza2017 已提交
735 736 737
	return nil
}

G
groot 已提交
738 739
func (c *Core) SetQueryService(s types.QueryService) error {
	c.ReleaseCollection = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
740 741
		req := &querypb.ReleaseCollectionRequest{
			Base: &commonpb.MsgBase{
742
				MsgType:   commonpb.MsgType_ReleaseCollection,
743 744 745 746 747 748 749
				MsgID:     0, //TODO, msg ID
				Timestamp: ts,
				SourceID:  int64(Params.NodeID),
			},
			DbID:         dbID,
			CollectionID: collectionID,
		}
G
godchen 已提交
750
		rsp, err := s.ReleaseCollection(ctx, req)
751 752 753
		if err != nil {
			return err
		}
754
		if rsp.ErrorCode != commonpb.ErrorCode_Success {
S
sunby 已提交
755
			return fmt.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason)
756 757 758 759 760 761
		}
		return nil
	}
	return nil
}

N
neza2017 已提交
762
func (c *Core) Init() error {
763 764
	var initError error = nil
	c.initOnce.Do(func() {
765 766 767 768 769 770 771 772 773 774
		connectEtcdFn := func() error {
			if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}); initError != nil {
				return initError
			}
			c.metaKV = etcdkv.NewEtcdKV(c.etcdCli, Params.MetaRootPath)
			if c.MetaTable, initError = NewMetaTable(c.metaKV); initError != nil {
				return initError
			}
			c.kvBase = etcdkv.NewEtcdKV(c.etcdCli, Params.KvRootPath)
			return nil
775
		}
Z
zhenshan.cao 已提交
776
		err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
777
		if err != nil {
778 779 780
			return
		}

N
neza2017 已提交
781 782
		idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid"))
		if initError = idAllocator.Initialize(); initError != nil {
783 784
			return
		}
N
neza2017 已提交
785 786 787 788 789 790 791 792 793
		c.idAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
			return idAllocator.Alloc(count)
		}
		c.idAllocatorUpdate = func() error {
			return idAllocator.UpdateID()
		}

		tsoAllocator := tso.NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso"))
		if initError = tsoAllocator.Initialize(); initError != nil {
794 795
			return
		}
N
neza2017 已提交
796 797 798 799 800 801 802
		c.tsoAllocator = func(count uint32) (typeutil.Timestamp, error) {
			return tsoAllocator.Alloc(count)
		}
		c.tsoAllocatorUpdate = func() error {
			return tsoAllocator.UpdateTSO()
		}

803
		c.ddReqQueue = make(chan reqTask, 1024)
N
neza2017 已提交
804
		c.indexTaskQueue = make(chan *CreateIndexTask, 1024)
Z
zhenshan.cao 已提交
805
		initError = c.setMsgStreams()
806
	})
N
neza2017 已提交
807
	if initError == nil {
G
godchen 已提交
808
		log.Debug("Master service", zap.String("State Code", internalpb.StateCode_name[int32(internalpb.StateCode_Initializing)]))
N
neza2017 已提交
809
	}
810 811 812 813 814 815 816
	return initError
}

func (c *Core) Start() error {
	if err := c.checkInit(); err != nil {
		return err
	}
N
neza2017 已提交
817 818 819 820 821

	log.Debug("master", zap.Int64("node id", int64(Params.NodeID)))
	log.Debug("master", zap.String("dd channel name", Params.DdChannel))
	log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel))

822 823 824
	c.startOnce.Do(func() {
		go c.startDdScheduler()
		go c.startTimeTickLoop()
N
neza2017 已提交
825 826
		go c.startDataServiceSegmentLoop()
		go c.startCreateIndexLoop()
827
		go c.startSegmentFlushCompletedLoop()
828
		go c.tsLoop()
G
godchen 已提交
829
		c.stateCode.Store(internalpb.StateCode_Healthy)
830
	})
G
godchen 已提交
831
	log.Debug("Master service", zap.String("State Code", internalpb.StateCode_name[int32(internalpb.StateCode_Healthy)]))
832 833 834 835 836
	return nil
}

func (c *Core) Stop() error {
	c.cancel()
G
godchen 已提交
837
	c.stateCode.Store(internalpb.StateCode_Abnormal)
838 839 840
	return nil
}

G
godchen 已提交
841 842 843
func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	code := c.stateCode.Load().(internalpb.StateCode)
	log.Debug("GetComponentStates", zap.String("State Code", internalpb.StateCode_name[int32(code)]))
N
neza2017 已提交
844

G
godchen 已提交
845 846
	return &internalpb.ComponentStates{
		State: &internalpb.ComponentInfo{
847
			NodeID:    int64(Params.NodeID),
X
XuanYang-cn 已提交
848
			Role:      typeutil.MasterServiceRole,
849 850
			StateCode: code,
			ExtraInfo: nil,
851
		},
N
neza2017 已提交
852
		Status: &commonpb.Status{
853
			ErrorCode: commonpb.ErrorCode_Success,
N
neza2017 已提交
854 855
			Reason:    "",
		},
G
godchen 已提交
856
		SubcomponentStates: []*internalpb.ComponentInfo{
N
neza2017 已提交
857 858 859 860 861 862 863
			{
				NodeID:    int64(Params.NodeID),
				Role:      typeutil.MasterServiceRole,
				StateCode: code,
				ExtraInfo: nil,
			},
		},
864 865 866
	}, nil
}

G
godchen 已提交
867 868 869
func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
870
			ErrorCode: commonpb.ErrorCode_Success,
G
godchen 已提交
871 872 873 874
			Reason:    "",
		},
		Value: Params.TimeTickChannel,
	}, nil
875 876
}

G
godchen 已提交
877 878 879
func (c *Core) GetDdChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
880
			ErrorCode: commonpb.ErrorCode_Success,
G
godchen 已提交
881 882 883 884
			Reason:    "",
		},
		Value: Params.DdChannel,
	}, nil
885 886
}

G
godchen 已提交
887 888 889
func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
890
			ErrorCode: commonpb.ErrorCode_Success,
G
godchen 已提交
891 892 893 894
			Reason:    "",
		},
		Value: Params.StatisticsChannel,
	}, nil
895 896
}

G
godchen 已提交
897
func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
898 899
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
900
		return &commonpb.Status{
901
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
902
			Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
903 904
		}, nil
	}
N
neza2017 已提交
905
	log.Debug("CreateCollection ", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
906 907
	t := &CreateCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
908
			ctx:  ctx,
N
neza2017 已提交
909
			cv:   make(chan error, 1),
910 911 912 913 914 915 916
			core: c,
		},
		Req: in,
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
917
		log.Debug("CreateCollection failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
918
		return &commonpb.Status{
919
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
920 921 922
			Reason:    "Create collection failed: " + err.Error(),
		}, nil
	}
N
neza2017 已提交
923
	log.Debug("CreateCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
924
	return &commonpb.Status{
925
		ErrorCode: commonpb.ErrorCode_Success,
926 927 928 929
		Reason:    "",
	}, nil
}

G
godchen 已提交
930
func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
931 932
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
933
		return &commonpb.Status{
934
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
935
			Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
936 937
		}, nil
	}
N
neza2017 已提交
938
	log.Debug("DropCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
939 940
	t := &DropCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
941
			ctx:  ctx,
N
neza2017 已提交
942
			cv:   make(chan error, 1),
943 944 945 946 947 948 949
			core: c,
		},
		Req: in,
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
950
		log.Debug("DropCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
951
		return &commonpb.Status{
952
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
953
			Reason:    "Drop collection failed: " + err.Error(),
954 955
		}, nil
	}
N
neza2017 已提交
956
	log.Debug("DropCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
957
	return &commonpb.Status{
958
		ErrorCode: commonpb.ErrorCode_Success,
959 960 961 962
		Reason:    "",
	}, nil
}

G
godchen 已提交
963
func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
964 965
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
966 967
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
968
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
969
				Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
970 971 972 973
			},
			Value: false,
		}, nil
	}
N
neza2017 已提交
974
	log.Debug("HasCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
975 976
	t := &HasCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
977
			ctx:  ctx,
N
neza2017 已提交
978
			cv:   make(chan error, 1),
979 980 981 982 983 984 985 986
			core: c,
		},
		Req:           in,
		HasCollection: false,
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
987
		log.Debug("HasCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
988 989
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
990
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
991 992 993 994 995
				Reason:    "Has collection failed: " + err.Error(),
			},
			Value: false,
		}, nil
	}
N
neza2017 已提交
996
	log.Debug("HasCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
997 998
	return &milvuspb.BoolResponse{
		Status: &commonpb.Status{
999
			ErrorCode: commonpb.ErrorCode_Success,
1000 1001 1002 1003 1004 1005
			Reason:    "",
		},
		Value: t.HasCollection,
	}, nil
}

G
godchen 已提交
1006
func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
G
godchen 已提交
1007 1008
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
1009 1010
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
1011
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1012
				Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
1013 1014 1015 1016 1017
			},
			Schema:       nil,
			CollectionID: 0,
		}, nil
	}
N
neza2017 已提交
1018
	log.Debug("DescribeCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
1019 1020
	t := &DescribeCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1021
			ctx:  ctx,
N
neza2017 已提交
1022
			cv:   make(chan error, 1),
1023 1024 1025 1026 1027 1028 1029 1030
			core: c,
		},
		Req: in,
		Rsp: &milvuspb.DescribeCollectionResponse{},
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1031
		log.Debug("DescribeCollection Failed", zap.String("name", in.CollectionName), zap.Error(err), zap.Int64("msgID", in.Base.MsgID))
1032 1033
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
1034
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1035 1036 1037 1038 1039
				Reason:    "describe collection failed: " + err.Error(),
			},
			Schema: nil,
		}, nil
	}
N
neza2017 已提交
1040
	log.Debug("DescribeCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
1041
	t.Rsp.Status = &commonpb.Status{
1042
		ErrorCode: commonpb.ErrorCode_Success,
1043 1044 1045 1046 1047
		Reason:    "",
	}
	return t.Rsp, nil
}

G
godchen 已提交
1048 1049 1050 1051
func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		return &milvuspb.ShowCollectionsResponse{
1052
			Status: &commonpb.Status{
1053
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1054
				Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
1055 1056 1057 1058
			},
			CollectionNames: nil,
		}, nil
	}
N
neza2017 已提交
1059
	log.Debug("ShowCollections", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID))
1060 1061
	t := &ShowCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1062
			ctx:  ctx,
N
neza2017 已提交
1063
			cv:   make(chan error, 1),
1064 1065 1066
			core: c,
		},
		Req: in,
G
godchen 已提交
1067
		Rsp: &milvuspb.ShowCollectionsResponse{
1068 1069 1070 1071 1072 1073
			CollectionNames: nil,
		},
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1074
		log.Debug("ShowCollections failed", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID))
G
godchen 已提交
1075
		return &milvuspb.ShowCollectionsResponse{
1076 1077
			CollectionNames: nil,
			Status: &commonpb.Status{
1078
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1079 1080 1081 1082
				Reason:    "ShowCollections failed: " + err.Error(),
			},
		}, nil
	}
N
neza2017 已提交
1083
	log.Debug("ShowCollections Success", zap.String("dbname", in.DbName), zap.Strings("collection names", t.Rsp.CollectionNames), zap.Int64("msgID", in.Base.MsgID))
1084
	t.Rsp.Status = &commonpb.Status{
1085
		ErrorCode: commonpb.ErrorCode_Success,
1086 1087 1088 1089 1090
		Reason:    "",
	}
	return t.Rsp, nil
}

G
godchen 已提交
1091
func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
1092 1093
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
1094
		return &commonpb.Status{
1095
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1096
			Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
1097 1098
		}, nil
	}
N
neza2017 已提交
1099
	log.Debug("CreatePartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
1100 1101
	t := &CreatePartitionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1102
			ctx:  ctx,
N
neza2017 已提交
1103
			cv:   make(chan error, 1),
1104 1105 1106 1107 1108 1109 1110
			core: c,
		},
		Req: in,
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1111
		log.Debug("CreatePartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
1112
		return &commonpb.Status{
1113
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1114 1115 1116
			Reason:    "create partition failed: " + err.Error(),
		}, nil
	}
N
neza2017 已提交
1117
	log.Debug("CreatePartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
1118
	return &commonpb.Status{
1119
		ErrorCode: commonpb.ErrorCode_Success,
1120 1121 1122 1123
		Reason:    "",
	}, nil
}

G
godchen 已提交
1124
func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
1125 1126
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
1127
		return &commonpb.Status{
1128
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1129
			Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
1130 1131
		}, nil
	}
N
neza2017 已提交
1132
	log.Debug("DropPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
1133 1134
	t := &DropPartitionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1135
			ctx:  ctx,
N
neza2017 已提交
1136
			cv:   make(chan error, 1),
1137 1138 1139 1140 1141 1142 1143
			core: c,
		},
		Req: in,
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1144
		log.Debug("DropPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
1145
		return &commonpb.Status{
1146
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1147 1148 1149
			Reason:    "DropPartition failed: " + err.Error(),
		}, nil
	}
N
neza2017 已提交
1150
	log.Debug("DropPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
1151
	return &commonpb.Status{
1152
		ErrorCode: commonpb.ErrorCode_Success,
1153 1154 1155 1156
		Reason:    "",
	}, nil
}

G
godchen 已提交
1157
func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
1158 1159
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
1160 1161
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1162
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1163
				Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
1164 1165 1166 1167
			},
			Value: false,
		}, nil
	}
N
neza2017 已提交
1168
	log.Debug("HasPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
1169 1170
	t := &HasPartitionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1171
			ctx:  ctx,
N
neza2017 已提交
1172
			cv:   make(chan error, 1),
1173 1174 1175 1176 1177 1178 1179 1180
			core: c,
		},
		Req:          in,
		HasPartition: false,
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1181
		log.Debug("HasPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
1182 1183
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1184
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1185 1186 1187 1188 1189
				Reason:    "HasPartition failed: " + err.Error(),
			},
			Value: false,
		}, nil
	}
N
neza2017 已提交
1190
	log.Debug("HasPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
1191 1192
	return &milvuspb.BoolResponse{
		Status: &commonpb.Status{
1193
			ErrorCode: commonpb.ErrorCode_Success,
1194 1195 1196 1197 1198 1199
			Reason:    "",
		},
		Value: t.HasPartition,
	}, nil
}

G
godchen 已提交
1200
func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
S
sunby 已提交
1201 1202
	log.Debug("ShowPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID),
		zap.String("collection", in.CollectionName))
G
godchen 已提交
1203 1204
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
S
sunby 已提交
1205 1206
		log.Error("ShowPartitionRequest failed: master is not healthy", zap.String("role", Params.RoleName),
			zap.Int64("msgID", in.Base.MsgID), zap.String("state", internalpb.StateCode_name[int32(code)]))
G
godchen 已提交
1207
		return &milvuspb.ShowPartitionsResponse{
1208
			Status: &commonpb.Status{
1209
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
S
sunby 已提交
1210
				Reason:    fmt.Sprintf("master is not healthy, state code = %s", internalpb.StateCode_name[int32(code)]),
1211 1212 1213 1214 1215
			},
			PartitionNames: nil,
			PartitionIDs:   nil,
		}, nil
	}
1216 1217
	t := &ShowPartitionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1218
			ctx:  ctx,
N
neza2017 已提交
1219
			cv:   make(chan error, 1),
1220 1221 1222
			core: c,
		},
		Req: in,
G
godchen 已提交
1223
		Rsp: &milvuspb.ShowPartitionsResponse{
1224 1225 1226 1227 1228 1229 1230
			PartitionNames: nil,
			Status:         nil,
		},
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
S
sunby 已提交
1231
		log.Error("ShowPartitionsRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
G
godchen 已提交
1232
		return &milvuspb.ShowPartitionsResponse{
1233 1234
			PartitionNames: nil,
			Status: &commonpb.Status{
1235
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
S
sunby 已提交
1236
				Reason:    err.Error(),
1237 1238 1239
			},
		}, nil
	}
S
sunby 已提交
1240 1241 1242
	log.Debug("ShowPartitions succeed", zap.String("role", Params.RoleName), zap.Int64("msgID", t.Req.Base.MsgID),
		zap.String("collection name", in.CollectionName), zap.Strings("partition names", t.Rsp.PartitionNames),
		zap.Int64s("partition ids", t.Rsp.PartitionIDs))
1243
	t.Rsp.Status = &commonpb.Status{
1244
		ErrorCode: commonpb.ErrorCode_Success,
1245 1246 1247 1248 1249
		Reason:    "",
	}
	return t.Rsp, nil
}

G
godchen 已提交
1250
func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
1251 1252
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
1253
		return &commonpb.Status{
1254
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1255
			Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
1256 1257
		}, nil
	}
N
neza2017 已提交
1258
	log.Debug("CreateIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1259 1260
	t := &CreateIndexReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1261
			ctx:  ctx,
N
neza2017 已提交
1262
			cv:   make(chan error, 1),
N
neza2017 已提交
1263 1264 1265 1266 1267 1268 1269
			core: c,
		},
		Req: in,
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1270
		log.Debug("CreateIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1271
		return &commonpb.Status{
1272
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
N
neza2017 已提交
1273 1274 1275
			Reason:    "CreateIndex failed, error = " + err.Error(),
		}, nil
	}
N
neza2017 已提交
1276
	log.Debug("CreateIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1277
	return &commonpb.Status{
1278
		ErrorCode: commonpb.ErrorCode_Success,
N
neza2017 已提交
1279 1280
		Reason:    "",
	}, nil
1281 1282
}

G
godchen 已提交
1283
func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
G
godchen 已提交
1284 1285
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
1286 1287
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1288
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1289
				Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
1290 1291 1292 1293
			},
			IndexDescriptions: nil,
		}, nil
	}
N
neza2017 已提交
1294
	log.Debug("DescribeIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1295 1296
	t := &DescribeIndexReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1297
			ctx:  ctx,
N
neza2017 已提交
1298
			cv:   make(chan error, 1),
N
neza2017 已提交
1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309
			core: c,
		},
		Req: in,
		Rsp: &milvuspb.DescribeIndexResponse{
			Status:            nil,
			IndexDescriptions: nil,
		},
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1310
		log.Debug("DescribeIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1311 1312
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1313
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
N
neza2017 已提交
1314 1315 1316 1317 1318
				Reason:    "DescribeIndex failed, error = " + err.Error(),
			},
			IndexDescriptions: nil,
		}, nil
	}
N
neza2017 已提交
1319 1320 1321 1322
	idxNames := make([]string, 0, len(t.Rsp.IndexDescriptions))
	for _, i := range t.Rsp.IndexDescriptions {
		idxNames = append(idxNames, i.IndexName)
	}
N
neza2017 已提交
1323
	log.Debug("DescribeIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Strings("index names", idxNames), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1324 1325
	if len(t.Rsp.IndexDescriptions) == 0 {
		t.Rsp.Status = &commonpb.Status{
1326
			ErrorCode: commonpb.ErrorCode_IndexNotExist,
N
neza2017 已提交
1327 1328 1329 1330
			Reason:    "index not exist",
		}
	} else {
		t.Rsp.Status = &commonpb.Status{
1331
			ErrorCode: commonpb.ErrorCode_Success,
N
neza2017 已提交
1332 1333
			Reason:    "",
		}
N
neza2017 已提交
1334 1335
	}
	return t.Rsp, nil
1336 1337
}

G
godchen 已提交
1338
func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
1339 1340
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
N
neza2017 已提交
1341
		return &commonpb.Status{
1342
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1343
			Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
N
neza2017 已提交
1344 1345
		}, nil
	}
N
neza2017 已提交
1346
	log.Debug("DropIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1347 1348
	t := &DropIndexReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1349
			ctx:  ctx,
N
neza2017 已提交
1350
			cv:   make(chan error, 1),
N
neza2017 已提交
1351 1352 1353 1354 1355 1356 1357
			core: c,
		},
		Req: in,
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1358
		log.Debug("DropIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1359
		return &commonpb.Status{
1360
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
1361
			Reason:    "DropIndex failed, error = " + err.Error(),
N
neza2017 已提交
1362 1363
		}, nil
	}
N
neza2017 已提交
1364
	log.Debug("DropIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1365
	return &commonpb.Status{
1366
		ErrorCode: commonpb.ErrorCode_Success,
N
neza2017 已提交
1367 1368 1369 1370
		Reason:    "",
	}, nil
}

G
godchen 已提交
1371
func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
G
godchen 已提交
1372 1373
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
1374 1375
		return &milvuspb.DescribeSegmentResponse{
			Status: &commonpb.Status{
1376
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1377
				Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
1378 1379 1380 1381
			},
			IndexID: 0,
		}, nil
	}
N
neza2017 已提交
1382
	log.Debug("DescribeSegment", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1383 1384
	t := &DescribeSegmentReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1385
			ctx:  ctx,
N
neza2017 已提交
1386
			cv:   make(chan error, 1),
N
neza2017 已提交
1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397
			core: c,
		},
		Req: in,
		Rsp: &milvuspb.DescribeSegmentResponse{
			Status:  nil,
			IndexID: 0,
		},
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1398
		log.Debug("DescribeSegment Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1399 1400
		return &milvuspb.DescribeSegmentResponse{
			Status: &commonpb.Status{
1401
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
N
neza2017 已提交
1402 1403 1404 1405 1406
				Reason:    "DescribeSegment failed, error = " + err.Error(),
			},
			IndexID: 0,
		}, nil
	}
N
neza2017 已提交
1407
	log.Debug("DescribeSegment Success", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1408
	t.Rsp.Status = &commonpb.Status{
1409
		ErrorCode: commonpb.ErrorCode_Success,
N
neza2017 已提交
1410 1411 1412
		Reason:    "",
	}
	return t.Rsp, nil
1413 1414
}

G
godchen 已提交
1415 1416 1417 1418
func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
	code := c.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		return &milvuspb.ShowSegmentsResponse{
1419
			Status: &commonpb.Status{
1420
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
G
godchen 已提交
1421
				Reason:    fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
1422 1423 1424 1425
			},
			SegmentIDs: nil,
		}, nil
	}
N
neza2017 已提交
1426
	log.Debug("ShowSegments", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1427 1428
	t := &ShowSegmentReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1429
			ctx:  ctx,
N
neza2017 已提交
1430
			cv:   make(chan error, 1),
N
neza2017 已提交
1431 1432 1433
			core: c,
		},
		Req: in,
G
godchen 已提交
1434
		Rsp: &milvuspb.ShowSegmentsResponse{
N
neza2017 已提交
1435 1436 1437 1438 1439 1440 1441
			Status:     nil,
			SegmentIDs: nil,
		},
	}
	c.ddReqQueue <- t
	err := t.WaitToFinish()
	if err != nil {
N
neza2017 已提交
1442
		log.Debug("ShowSegments Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID))
G
godchen 已提交
1443
		return &milvuspb.ShowSegmentsResponse{
N
neza2017 已提交
1444
			Status: &commonpb.Status{
1445
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
N
neza2017 已提交
1446 1447 1448 1449 1450
				Reason:    "ShowSegments failed, error: " + err.Error(),
			},
			SegmentIDs: nil,
		}, nil
	}
N
neza2017 已提交
1451
	log.Debug("ShowSegments Success", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1452
	t.Rsp.Status = &commonpb.Status{
1453
		ErrorCode: commonpb.ErrorCode_Success,
N
neza2017 已提交
1454 1455 1456
		Reason:    "",
	}
	return t.Rsp, nil
1457 1458
}

G
godchen 已提交
1459
func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) {
N
neza2017 已提交
1460
	ts, err := c.tsoAllocator(in.Count)
1461
	if err != nil {
N
neza2017 已提交
1462
		log.Debug("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
G
godchen 已提交
1463
		return &masterpb.AllocTimestampResponse{
1464
			Status: &commonpb.Status{
1465
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1466 1467 1468 1469 1470 1471
				Reason:    "AllocTimestamp failed: " + err.Error(),
			},
			Timestamp: 0,
			Count:     0,
		}, nil
	}
Z
zhenshan.cao 已提交
1472
	// log.Printf("AllocTimestamp : %d", ts)
G
godchen 已提交
1473
	return &masterpb.AllocTimestampResponse{
1474
		Status: &commonpb.Status{
1475
			ErrorCode: commonpb.ErrorCode_Success,
1476 1477 1478 1479 1480 1481 1482
			Reason:    "",
		},
		Timestamp: ts,
		Count:     in.Count,
	}, nil
}

G
godchen 已提交
1483
func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
N
neza2017 已提交
1484
	start, _, err := c.idAllocator(in.Count)
1485
	if err != nil {
N
neza2017 已提交
1486
		log.Debug("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
G
godchen 已提交
1487
		return &masterpb.AllocIDResponse{
1488
			Status: &commonpb.Status{
1489
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1490 1491 1492 1493 1494 1495
				Reason:    "AllocID failed: " + err.Error(),
			},
			ID:    0,
			Count: in.Count,
		}, nil
	}
N
neza2017 已提交
1496
	log.Debug("AllocID", zap.Int64("id start", start), zap.Uint32("count", in.Count))
G
godchen 已提交
1497
	return &masterpb.AllocIDResponse{
1498
		Status: &commonpb.Status{
1499
			ErrorCode: commonpb.ErrorCode_Success,
1500 1501 1502 1503 1504 1505
			Reason:    "",
		},
		ID:    start,
		Count: in.Count,
	}, nil
}