未验证 提交 1f27eb60 编写于 作者: C Cai Yudong 提交者: GitHub

Remove all index when drop collection (#12227)

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 f95ef226
......@@ -882,6 +882,39 @@ func (c *Core) BuildIndex(ctx context.Context, segID typeutil.UniqueID, field *s
return bldID, nil
}
// RemoveIndex will call drop index service
func (c *Core) RemoveIndex(ctx context.Context, collName string, indexName string) error {
_, indexInfos, err := c.MetaTable.GetIndexByName(collName, indexName)
if err != nil {
log.Error("GetIndexByName failed,", zap.String("collection name", collName),
zap.String("index name", indexName), zap.Error(err))
return err
}
for _, indexInfo := range indexInfos {
if err = c.CallDropIndexService(ctx, indexInfo.IndexID); err != nil {
log.Error("CallDropIndexService failed,", zap.String("collection name", collName), zap.Error(err))
return err
}
}
return nil
}
// ExpireMetaCache will call invalidate collection meta cache
func (c *Core) ExpireMetaCache(ctx context.Context, collNames []string, ts typeutil.Timestamp) {
for _, collName := range collNames {
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: c.session.ServerID,
},
CollectionName: collName,
}
c.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
}
}
// Register register rootcoord at etcd
func (c *Core) Register() error {
c.session = sessionutil.NewSession(c.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
......
......@@ -22,7 +22,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
......@@ -296,7 +295,10 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("encodeDdOperation fail, error = %w", err)
}
aliases := t.core.MetaTable.ListAliases(collMeta.ID)
// drop all indices
if err = t.core.RemoveIndex(ctx, t.Req.CollectionName, ""); err != nil {
return err
}
// use lambda function here to guarantee all resources to be released
dropCollectionFn := func() error {
......@@ -346,36 +348,14 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
//notify query service to release collection
if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.ID); err != nil {
log.Error("Failed to CallReleaseCollectionService", zap.String("error", err.Error()))
log.Error("Failed to CallReleaseCollectionService", zap.Error(err))
return err
}
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: t.core.session.ServerID,
},
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
}
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
for _, alias := range aliases {
req = proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: t.core.session.ServerID,
},
DbName: t.Req.DbName,
CollectionName: alias,
}
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
}
t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts)
aliases := t.core.MetaTable.ListAliases(collMeta.ID)
t.core.ExpireMetaCache(ctx, aliases, ts)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
......@@ -566,18 +546,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
return err
}
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: t.core.session.ServerID,
},
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
}
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
......@@ -662,22 +631,11 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
return err
}
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: t.core.session.ServerID,
},
DbName: t.Req.DbName,
CollectionName: t.Req.CollectionName,
}
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts)
//notify query service to release partition
if err = t.core.CallReleasePartitionService(t.core.ctx, ts, 0, collInfo.ID, []typeutil.UniqueID{partID}); err != nil {
log.Error("Failed to CallReleaseCollectionService", zap.String("error", err.Error()))
log.Error("Failed to CallReleaseCollectionService", zap.Error(err))
return err
}
......@@ -969,22 +927,10 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropIndex {
return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
}
_, info, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.IndexName)
if err != nil {
log.Warn("GetIndexByName failed,", zap.String("collection name", t.Req.CollectionName), zap.String("field name", t.Req.FieldName), zap.String("index name", t.Req.IndexName), zap.Error(err))
return err
}
if len(info) == 0 {
return nil
}
if len(info) != 1 {
return fmt.Errorf("len(index) = %d", len(info))
}
err = t.core.CallDropIndexService(ctx, info[0].IndexID)
if err != nil {
if err := t.core.RemoveIndex(ctx, t.Req.CollectionName, t.Req.IndexName); err != nil {
return err
}
_, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
_, _, err := t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
return err
}
......@@ -1043,17 +989,7 @@ func (t *DropAliasReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("meta table drop alias failed, error = %w", err)
}
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: t.core.session.ServerID,
},
CollectionName: t.Req.Alias,
}
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
t.core.ExpireMetaCache(ctx, []string{t.Req.Alias}, ts)
return nil
}
......@@ -1084,17 +1020,7 @@ func (t *AlterAliasReqTask) Execute(ctx context.Context) error {
return fmt.Errorf("meta table alter alias failed, error = %w", err)
}
req := proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO, msg type
MsgID: 0, //TODO, msg id
Timestamp: ts,
SourceID: t.core.session.ServerID,
},
CollectionName: t.Req.Alias,
}
// error doesn't matter here
t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
t.core.ExpireMetaCache(ctx, []string{t.Req.Alias}, ts)
return nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册