未验证 提交 a1d0e5c9 编写于 作者: N neza2017 提交者: GitHub

remove task scheduler (#6123)

Signed-off-by: Nyefu.chen <yefu.chen@zilliz.com>
上级 0d0c1901
......@@ -172,7 +172,10 @@ func TestGrpcService(t *testing.T) {
core.DataCoordSegmentChan = SegmentInfoChan
timeTickArray := make([]typeutil.Timestamp, 0, 16)
timeTickLock := sync.Mutex{}
core.SendTimeTick = func(ts typeutil.Timestamp) error {
timeTickLock.Lock()
defer timeTickLock.Unlock()
t.Logf("send time tick %d", ts)
timeTickArray = append(timeTickArray, ts)
return nil
......
......@@ -120,9 +120,6 @@ type Core struct {
CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error
CallReleasePartitionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) error
//dd request scheduler
ddReqQueue chan reqTask //dd request will be push into this chan
//dml channels
dmlChannels *dmlChannels
......@@ -192,9 +189,6 @@ func (c *Core) checkInit() error {
if c.kvBase == nil {
return fmt.Errorf("kvBase is nil")
}
if c.ddReqQueue == nil {
return fmt.Errorf("ddReqQueue is nil")
}
if c.SendDdCreateCollectionReq == nil {
return fmt.Errorf("SendDdCreateCollectionReq is nil")
}
......@@ -238,23 +232,6 @@ func (c *Core) checkInit() error {
return nil
}
func (c *Core) startDdScheduler() {
for {
select {
case <-c.ctx.Done():
log.Debug("close dd scheduler, exit task execution loop")
return
case task, ok := <-c.ddReqQueue:
if !ok {
log.Debug("dd chan is closed, exit task execution loop")
return
}
err := task.Execute(task.Ctx())
task.Notify(err)
}
}
}
func (c *Core) startTimeTickLoop() {
ticker := time.NewTicker(time.Duration(Params.TimeTickInterval) * time.Millisecond)
for {
......@@ -1043,7 +1020,6 @@ func (c *Core) Init() error {
c.proxyManager.AddSession(c.chanTimeTick.AddProxy, c.proxyClientManager.AddProxyClient)
c.proxyManager.DelSession(c.chanTimeTick.DelProxy, c.proxyClientManager.DelProxyClient)
c.ddReqQueue = make(chan reqTask, 1024)
initError = c.setMsgStreams()
})
if initError == nil {
......@@ -1188,7 +1164,6 @@ func (c *Core) Start() error {
log.Debug("RootCoord Start reSendDdMsg failed", zap.Error(err))
return
}
go c.startDdScheduler()
go c.startTimeTickLoop()
go c.startDataCoordSegmentLoop()
go c.startDataNodeFlushedSegmentLoop()
......@@ -1266,13 +1241,11 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
t := &CreateCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("CreateCollection failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
......@@ -1301,13 +1274,11 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
t := &DropCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("DropCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
......@@ -1339,14 +1310,12 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
t := &HasCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
HasCollection: false,
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("HasCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.BoolResponse{
......@@ -1385,14 +1354,12 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
t := &DescribeCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
Rsp: &milvuspb.DescribeCollectionResponse{},
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("DescribeCollection Failed", zap.String("name", in.CollectionName), zap.Error(err), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.DescribeCollectionResponse{
......@@ -1429,7 +1396,6 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
t := &ShowCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1438,8 +1404,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
CollectionIds: nil,
},
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("ShowCollections failed", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.ShowCollectionsResponse{
......@@ -1472,13 +1437,11 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
t := &CreatePartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("CreatePartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
......@@ -1507,13 +1470,11 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
t := &DropPartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("DropPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
......@@ -1545,14 +1506,12 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
t := &HasPartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
HasPartition: false,
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("HasPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.BoolResponse{
......@@ -1594,7 +1553,6 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
t := &ShowPartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1603,8 +1561,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
Status: nil,
},
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("ShowPartitionsRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
return &milvuspb.ShowPartitionsResponse{
......@@ -1639,13 +1596,11 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest)
t := &CreateIndexReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("CreateIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
......@@ -1677,7 +1632,6 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
t := &DescribeIndexReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1686,8 +1640,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
IndexDescriptions: nil,
},
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("DescribeIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.DescribeIndexResponse{
......@@ -1731,13 +1684,11 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c
t := &DropIndexReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("DropIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
......@@ -1769,7 +1720,6 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
t := &DescribeSegmentReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1778,8 +1728,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
IndexID: 0,
},
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("DescribeSegment Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.DescribeSegmentResponse{
......@@ -1815,7 +1764,6 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
t := &ShowSegmentReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
cv: make(chan error, 1),
core: c,
},
Req: in,
......@@ -1824,8 +1772,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
SegmentIDs: nil,
},
}
c.ddReqQueue <- t
err := t.WaitToFinish()
err := executeTask(t)
if err != nil {
log.Debug("ShowSegments Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.ShowSegmentsResponse{
......
......@@ -1445,6 +1445,7 @@ func TestRootCoord(t *testing.T) {
})
assert.Nil(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, rsp8.Status.ErrorCode)
time.Sleep(5 * time.Second)
})
......@@ -1499,6 +1500,7 @@ func TestRootCoord(t *testing.T) {
ts1 = uint64(120)
ts2 = uint64(150)
)
numChan := core.chanTimeTick.GetChanNum()
p1 := sessionutil.Session{
ServerID: 100,
}
......@@ -1561,8 +1563,8 @@ func TestRootCoord(t *testing.T) {
// 2 proxy, 1 rootcoord
assert.Equal(t, 3, core.chanTimeTick.GetProxyNum())
// 3 proxy channels, 2 rootcoord channels
assert.Equal(t, 5, core.chanTimeTick.GetChanNum())
// add 3 proxy channels
assert.Equal(t, 3, core.chanTimeTick.GetChanNum()-numChan)
})
err = core.Stop()
......@@ -1929,10 +1931,6 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.NotNil(t, err)
c.ddReqQueue = make(chan reqTask)
err = c.checkInit()
assert.NotNil(t, err)
c.SendDdCreateCollectionReq = func(context.Context, *internalpb.CreateCollectionRequest, []string) error {
return nil
}
......
......@@ -31,30 +31,35 @@ type reqTask interface {
Ctx() context.Context
Type() commonpb.MsgType
Execute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
Core() *Core
}
type baseReqTask struct {
ctx context.Context
cv chan error
core *Core
}
func (bt *baseReqTask) Notify(err error) {
bt.cv <- err
func (b *baseReqTask) Core() *Core {
return b.core
}
func (bt *baseReqTask) WaitToFinish() error {
func (b *baseReqTask) Ctx() context.Context {
return b.ctx
}
func executeTask(t reqTask) error {
errChan := make(chan error)
go func() {
err := t.Execute(t.Ctx())
errChan <- err
}()
select {
case <-bt.core.ctx.Done():
return fmt.Errorf("core context done, %w", bt.core.ctx.Err())
case <-bt.ctx.Done():
return fmt.Errorf("request context done, %w", bt.ctx.Err())
case err, ok := <-bt.cv:
if !ok {
return fmt.Errorf("notify chan closed")
}
case <-t.Core().ctx.Done():
return fmt.Errorf("context canceled")
case <-t.Ctx().Done():
return fmt.Errorf("context canceled")
case err := <-errChan:
return err
}
}
......@@ -64,10 +69,6 @@ type CreateCollectionReqTask struct {
Req *milvuspb.CreateCollectionRequest
}
func (t *CreateCollectionReqTask) Ctx() context.Context {
return t.ctx
}
func (t *CreateCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -244,10 +245,6 @@ type DropCollectionReqTask struct {
Req *milvuspb.DropCollectionRequest
}
func (t *DropCollectionReqTask) Ctx() context.Context {
return t.ctx
}
func (t *DropCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -322,10 +319,6 @@ type HasCollectionReqTask struct {
HasCollection bool
}
func (t *HasCollectionReqTask) Ctx() context.Context {
return t.ctx
}
func (t *HasCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -349,10 +342,6 @@ type DescribeCollectionReqTask struct {
Rsp *milvuspb.DescribeCollectionResponse
}
func (t *DescribeCollectionReqTask) Ctx() context.Context {
return t.ctx
}
func (t *DescribeCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -397,10 +386,6 @@ type ShowCollectionReqTask struct {
Rsp *milvuspb.ShowCollectionsResponse
}
func (t *ShowCollectionReqTask) Ctx() context.Context {
return t.ctx
}
func (t *ShowCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -425,10 +410,6 @@ type CreatePartitionReqTask struct {
Req *milvuspb.CreatePartitionRequest
}
func (t *CreatePartitionReqTask) Ctx() context.Context {
return t.ctx
}
func (t *CreatePartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -497,10 +478,6 @@ type DropPartitionReqTask struct {
Req *milvuspb.DropPartitionRequest
}
func (t *DropPartitionReqTask) Ctx() context.Context {
return t.ctx
}
func (t *DropPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -577,10 +554,6 @@ type HasPartitionReqTask struct {
HasPartition bool
}
func (t *HasPartitionReqTask) Ctx() context.Context {
return t.ctx
}
func (t *HasPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -603,10 +576,6 @@ type ShowPartitionReqTask struct {
Rsp *milvuspb.ShowPartitionsResponse
}
func (t *ShowPartitionReqTask) Ctx() context.Context {
return t.ctx
}
func (t *ShowPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -642,10 +611,6 @@ type DescribeSegmentReqTask struct {
Rsp *milvuspb.DescribeSegmentResponse //TODO,return repeated segment id in the future
}
func (t *DescribeSegmentReqTask) Ctx() context.Context {
return t.ctx
}
func (t *DescribeSegmentReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -696,10 +661,6 @@ type ShowSegmentReqTask struct {
Rsp *milvuspb.ShowSegmentsResponse
}
func (t *ShowSegmentReqTask) Ctx() context.Context {
return t.ctx
}
func (t *ShowSegmentReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -735,10 +696,6 @@ type CreateIndexReqTask struct {
Req *milvuspb.CreateIndexRequest
}
func (t *CreateIndexReqTask) Ctx() context.Context {
return t.ctx
}
func (t *CreateIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -796,10 +753,6 @@ type DescribeIndexReqTask struct {
Rsp *milvuspb.DescribeIndexResponse
}
func (t *DescribeIndexReqTask) Ctx() context.Context {
return t.ctx
}
func (t *DescribeIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......@@ -834,10 +787,6 @@ type DropIndexReqTask struct {
Req *milvuspb.DropIndexRequest
}
func (t *DropIndexReqTask) Ctx() context.Context {
return t.ctx
}
func (t *DropIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册