未验证 提交 fbd266ee 编写于 作者: J jaime 提交者: GitHub

backward compatibility with empty DB name (#24317)

Signed-off-by: Njaime <yun.zhang@zilliz.com>
上级 5b723f9d
......@@ -327,6 +327,12 @@ func (mt *MetaTable) GetDatabaseByName(ctx context.Context, dbName string, ts Ti
}
func (mt *MetaTable) getDatabaseByNameInternal(ctx context.Context, dbName string, ts Timestamp) (*model.Database, error) {
// backward compatibility for rolling upgrade
if dbName == "" {
log.Warn("db name is empty")
dbName = util.DefaultDBName
}
db, ok := mt.dbName2Meta[dbName]
if !ok {
return nil, fmt.Errorf("database:%s not found", dbName)
......@@ -532,6 +538,12 @@ func (mt *MetaTable) GetCollectionByName(ctx context.Context, dbName string, col
}
func (mt *MetaTable) getCollectionByNameInternal(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error) {
// backward compatibility for rolling upgrade
if dbName == "" {
log.Warn("db name is empty", zap.String("collectionName", collectionName), zap.Uint64("ts", ts))
dbName = util.DefaultDBName
}
collectionID, ok := mt.aliases.get(dbName, collectionName)
if ok {
return mt.getCollectionByIDInternal(ctx, dbName, collectionID, ts, false)
......@@ -624,6 +636,12 @@ func (mt *MetaTable) ListCollections(ctx context.Context, dbName string, ts Time
}
func (mt *MetaTable) listCollectionFromCache(dbName string, onlyAvail bool) ([]*model.Collection, error) {
// backward compatibility for rolling upgrade
if dbName == "" {
log.Warn("db name is empty")
dbName = util.DefaultDBName
}
collectionIDs, err := mt.names.listCollectionID(dbName)
if err != nil {
return nil, err
......@@ -675,12 +693,17 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldNam
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
ctx = contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
// backward compatibility for rolling upgrade
log := log.Ctx(ctx).With(
zap.String("db", dbName),
zap.String("oldName", oldName),
zap.String("newName", newName),
)
if dbName == "" {
log.Warn("db name is empty")
dbName = util.DefaultDBName
}
//old collection should not be an alias
_, ok := mt.aliases.get(dbName, oldName)
......@@ -818,6 +841,11 @@ func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collection
func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
// backward compatibility for rolling upgrade
if dbName == "" {
log.Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName))
dbName = util.DefaultDBName
}
// It's ok that we don't read from catalog when cache missed.
// Since cache always keep the latest version, and the ts should always be the latest.
......@@ -881,6 +909,11 @@ func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias strin
func (mt *MetaTable) DropAlias(ctx context.Context, dbName string, alias string, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
// backward compatibility for rolling upgrade
if dbName == "" {
log.Warn("db name is empty", zap.String("alias", alias), zap.Uint64("ts", ts))
dbName = util.DefaultDBName
}
db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp)
if err != nil {
......@@ -905,6 +938,11 @@ func (mt *MetaTable) DropAlias(ctx context.Context, dbName string, alias string,
func (mt *MetaTable) AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
// backward compatibility for rolling upgrade
if dbName == "" {
log.Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName))
dbName = util.DefaultDBName
}
// It's ok that we don't read from catalog when cache missed.
// Since cache always keep the latest version, and the ts should always be the latest.
......
......@@ -518,7 +518,7 @@ func TestMetaTable_GetCollectionByName(t *testing.T) {
},
},
}
meta.aliases.insert("", "alias", 100)
meta.aliases.insert(util.DefaultDBName, "alias", 100)
ctx := context.Background()
coll, err := meta.GetCollectionByName(ctx, "", "alias", 101)
assert.NoError(t, err)
......@@ -542,7 +542,7 @@ func TestMetaTable_GetCollectionByName(t *testing.T) {
},
},
}
meta.names.insert("", "name", 100)
meta.names.insert(util.DefaultDBName, "name", 100)
ctx := context.Background()
coll, err := meta.GetCollectionByName(ctx, "", "name", 101)
assert.NoError(t, err)
......@@ -1196,8 +1196,8 @@ func TestMetaTable_RenameCollection(t *testing.T) {
},
},
}
meta.names.insert("", "old", 1)
meta.names.insert("", "new", 2)
meta.names.insert(util.DefaultDBName, "old", 1)
meta.names.insert(util.DefaultDBName, "new", 2)
err := meta.RenameCollection(context.TODO(), "", "old", "new", 1000)
assert.Error(t, err)
})
......@@ -1423,6 +1423,72 @@ func TestMetaTable_CreateDatabase(t *testing.T) {
})
}
func TestMetaTable_EmtpyDatabaseName(t *testing.T) {
t.Run("getDatabaseByNameInternal with empty db", func(t *testing.T) {
mt := &MetaTable{
dbName2Meta: map[string]*model.Database{
util.DefaultDBName: {ID: 1},
},
}
ret, err := mt.getDatabaseByNameInternal(context.TODO(), "", typeutil.MaxTimestamp)
assert.NoError(t, err)
assert.Equal(t, int64(1), ret.ID)
})
t.Run("getCollectionByNameInternal with empty db", func(t *testing.T) {
mt := &MetaTable{
aliases: newNameDb(),
collID2Meta: map[typeutil.UniqueID]*model.Collection{
1: {CollectionID: 1},
},
}
mt.aliases.insert(util.DefaultDBName, "aliases", 1)
ret, err := mt.getCollectionByNameInternal(context.TODO(), "", "aliases", typeutil.MaxTimestamp)
assert.NoError(t, err)
assert.Equal(t, int64(1), ret.CollectionID)
})
t.Run("listCollectionFromCache with empty db", func(t *testing.T) {
mt := &MetaTable{
names: newNameDb(),
collID2Meta: map[typeutil.UniqueID]*model.Collection{
1: {
CollectionID: 1,
State: pb.CollectionState_CollectionCreated,
},
},
}
mt.names.insert(util.DefaultDBName, "name", 1)
ret, err := mt.listCollectionFromCache("", false)
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
assert.Equal(t, int64(1), ret[0].CollectionID)
})
t.Run("CreateAlias with empty db", func(t *testing.T) {
mt := &MetaTable{
names: newNameDb(),
}
mt.names.insert(util.DefaultDBName, "name", 1)
err := mt.CreateAlias(context.TODO(), "", "name", "name", typeutil.MaxTimestamp)
assert.Error(t, err)
})
t.Run("DropAlias with empty db", func(t *testing.T) {
mt := &MetaTable{
names: newNameDb(),
}
mt.names.insert(util.DefaultDBName, "name", 1)
err := mt.DropAlias(context.TODO(), "", "name", typeutil.MaxTimestamp)
assert.Error(t, err)
})
}
func TestMetaTable_DropDatabase(t *testing.T) {
t.Run("can't drop default database", func(t *testing.T) {
mt := &MetaTable{}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册